import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:typed_data'; import 'package:bitcoin_base/bitcoin_base.dart'; import 'package:flutter/foundation.dart'; import 'package:rxdart/rxdart.dart'; String jsonrpc( {required String method, required List params, required int id, double version = 2.0}) => '{"jsonrpc": "$version", "method": "$method", "id": "$id", "params": ${json.encode(params)}}\n'; class SocketTask { SocketTask({required this.isSubscription, this.completer, this.subject}); final Completer? completer; final BehaviorSubject? subject; final bool isSubscription; } class ElectrumClient { ElectrumClient() : _id = 0, _isConnected = false, _tasks = {}, _errors = {}, unterminatedString = ''; static const connectionTimeout = Duration(seconds: 5); static const aliveTimerDuration = Duration(seconds: 4); bool get isConnected => _isConnected; Socket? socket; void Function(ConnectionStatus)? onConnectionStatusChange; int _id; final Map _tasks; Map get tasks => _tasks; final Map _errors; ConnectionStatus _connectionStatus = ConnectionStatus.disconnected; bool _isConnected; Timer? _aliveTimer; String unterminatedString; Uri? uri; bool? useSSL; Future connectToUri(Uri uri, {bool? useSSL}) async { this.uri = uri; if (useSSL != null) { this.useSSL = useSSL; } await connect(host: uri.host, port: uri.port); } Future connect({required String host, required int port}) async { _setConnectionStatus(ConnectionStatus.connecting); try { await socket?.close(); } catch (_) {} socket = null; try { if (useSSL == false || (useSSL == null && uri.toString().contains("btc-electrum"))) { socket = await Socket.connect(host, port, timeout: connectionTimeout); } else { socket = await SecureSocket.connect( host, port, timeout: connectionTimeout, onBadCertificate: (_) => true, ); } } catch (e) { if (e is HandshakeException) { useSSL = !(useSSL ?? false); } if (_connectionStatus != ConnectionStatus.connecting) { _setConnectionStatus(ConnectionStatus.failed); } return; } if (socket == null) { if (_connectionStatus != ConnectionStatus.connecting) { _setConnectionStatus(ConnectionStatus.failed); } return; } // use ping to determine actual connection status since we could've just not timed out yet: // _setConnectionStatus(ConnectionStatus.connected); socket!.listen( (Uint8List event) { try { final msg = utf8.decode(event.toList()); final messagesList = msg.split("\n"); for (var message in messagesList) { if (message.isEmpty) { continue; } _parseResponse(message); } } catch (e) { print("socket.listen: $e"); } }, onError: (Object error) { final errorMsg = error.toString(); print(errorMsg); unterminatedString = ''; }, onDone: () { print("SOCKET CLOSED!!!!!"); unterminatedString = ''; try { if (host == socket?.address.host || socket == null) { _setConnectionStatus(ConnectionStatus.disconnected); socket?.destroy(); } } catch (e) { print("onDone: $e"); } }, cancelOnError: true, ); keepAlive(); } void _parseResponse(String message) { try { final response = json.decode(message) as Map; _handleResponse(response); } on FormatException catch (e) { final msg = e.message.toLowerCase(); if (e.source is String) { unterminatedString += e.source as String; } if (msg.contains("not a subtype of type")) { unterminatedString += e.source as String; return; } if (isJSONStringCorrect(unterminatedString)) { final response = json.decode(unterminatedString) as Map; _handleResponse(response); unterminatedString = ''; } } on TypeError catch (e) { if (!e.toString().contains('Map') && !e.toString().contains('Map')) { return; } unterminatedString += message; if (isJSONStringCorrect(unterminatedString)) { final response = json.decode(unterminatedString) as Map; _handleResponse(response); // unterminatedString = null; unterminatedString = ''; } } catch (e) { print("parse $e"); } } void keepAlive() { _aliveTimer?.cancel(); _aliveTimer = Timer.periodic(aliveTimerDuration, (_) async => ping()); } Future ping() async { try { await callWithTimeout(method: 'server.ping'); _setConnectionStatus(ConnectionStatus.connected); } catch (_) { _setConnectionStatus(ConnectionStatus.disconnected); } } Future> version() => call(method: 'server.version', params: ["", "1.4"]).then((dynamic result) { if (result is List) { return result.map((dynamic val) => val.toString()).toList(); } return []; }); Future> getBalance(String scriptHash) => call(method: 'blockchain.scripthash.get_balance', params: [scriptHash]) .then((dynamic result) { if (result is Map) { return result; } return {}; }); Future>> getHistory(String scriptHash) => call(method: 'blockchain.scripthash.get_history', params: [scriptHash]) .then((dynamic result) { if (result is List) { return result.map((dynamic val) { if (val is Map) { return val; } return {}; }).toList(); } return []; }); Future>> getListUnspent(String scriptHash) => call(method: 'blockchain.scripthash.listunspent', params: [scriptHash]) .then((dynamic result) { if (result is List) { return result.map((dynamic val) { if (val is Map) { return val; } return {}; }).toList(); } return []; }); Future>> getMempool(String scriptHash) => call(method: 'blockchain.scripthash.get_mempool', params: [scriptHash]) .then((dynamic result) { if (result is List) { return result.map((dynamic val) { if (val is Map) { return val; } return {}; }).toList(); } return []; }); Future getTransaction({required String hash, required bool verbose}) async { try { final result = await callWithTimeout( method: 'blockchain.transaction.get', params: [hash, verbose], timeout: 10000); return result; } on RequestFailedTimeoutException catch (_) { return {}; } catch (e) { return {}; } } Future> getTransactionVerbose({required String hash}) => getTransaction(hash: hash, verbose: true).then((dynamic result) { if (result is Map) { return result; } return {}; }); Future getTransactionHex({required String hash}) => getTransaction(hash: hash, verbose: false).then((dynamic result) { if (result is String) { return result; } return ''; }); Future broadcastTransaction( {required String transactionRaw, BasedUtxoNetwork? network, Function(int)? idCallback}) async => call( method: 'blockchain.transaction.broadcast', params: [transactionRaw], idCallback: idCallback) .then((dynamic result) { if (result is String) { return result; } return ''; }); Future> getMerkle({required String hash, required int height}) async => await call(method: 'blockchain.transaction.get_merkle', params: [hash, height]) as Map; Future> getHeader({required int height}) async => await call(method: 'blockchain.block.get_header', params: [height]) as Map; BehaviorSubject? tweaksSubscribe({required int height, required int count}) => subscribe( id: 'blockchain.tweaks.subscribe', method: 'blockchain.tweaks.subscribe', params: [height, count, true], ); Future tweaksRegister({ required String secViewKey, required String pubSpendKey, List labels = const [], }) => call( method: 'blockchain.tweaks.register', params: [secViewKey, pubSpendKey, labels], ); Future tweaksErase({required String pubSpendKey}) => call( method: 'blockchain.tweaks.erase', params: [pubSpendKey], ); BehaviorSubject? tweaksScan({required String pubSpendKey}) => subscribe( id: 'blockchain.tweaks.scan', method: 'blockchain.tweaks.scan', params: [pubSpendKey], ); Future tweaksGet({required String pubSpendKey}) => call( method: 'blockchain.tweaks.get', params: [pubSpendKey], ); Future getTweaks({required int height}) async => await callWithTimeout(method: 'blockchain.tweaks.subscribe', params: [height, 1, false]); Future estimatefee({required int p}) => call(method: 'blockchain.estimatefee', params: [p]).then((dynamic result) { if (result is double) { return result; } if (result is String) { return double.parse(result); } return 0; }); Future>> feeHistogram() => call(method: 'mempool.get_fee_histogram').then((dynamic result) { if (result is List) { // return result.map((dynamic e) { // if (e is List) { // return e.map((dynamic ee) => ee is int ? ee : null).toList(); // } // return null; // }).toList(); final histogram = >[]; for (final e in result) { if (e is List) { final eee = []; for (final ee in e) { if (ee is int) { eee.add(ee); } } histogram.add(eee); } } return histogram; } return []; }); // Future> feeRates({BasedUtxoNetwork? network}) async { // try { // final topDoubleString = await estimatefee(p: 1); // final middleDoubleString = await estimatefee(p: 5); // final bottomDoubleString = await estimatefee(p: 10); // final top = (stringDoubleToBitcoinAmount(topDoubleString.toString()) / 1000).round(); // final middle = (stringDoubleToBitcoinAmount(middleDoubleString.toString()) / 1000).round(); // final bottom = (stringDoubleToBitcoinAmount(bottomDoubleString.toString()) / 1000).round(); // return [bottom, middle, top]; // } catch (_) { // return []; // } // } // https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-headers-subscribe // example response: // { // "height": 520481, // "hex": "00000020890208a0ae3a3892aa047c5468725846577cfcd9b512b50000000000000000005dc2b02f2d297a9064ee103036c14d678f9afc7e3d9409cf53fd58b82e938e8ecbeca05a2d2103188ce804c4" // } Future getCurrentBlockChainTip() async { try { final result = await callWithTimeout(method: 'blockchain.headers.subscribe'); if (result is Map) { return result["height"] as int; } return null; } on RequestFailedTimeoutException catch (_) { return null; } catch (e) { print("getCurrentBlockChainTip: ${e.toString()}"); return null; } } BehaviorSubject? chainTipSubscribe() { _id += 1; return subscribe( id: 'blockchain.headers.subscribe', method: 'blockchain.headers.subscribe'); } BehaviorSubject? scripthashUpdate(String scripthash) { _id += 1; return subscribe( id: 'blockchain.scripthash.subscribe:$scripthash', method: 'blockchain.scripthash.subscribe', params: [scripthash]); } BehaviorSubject? subscribe( {required String id, required String method, List params = const []}) { try { if (socket == null) { return null; } final subscription = BehaviorSubject(); _regisrySubscription(id, subscription); socket!.write(jsonrpc(method: method, id: _id, params: params)); return subscription; } catch (e) { print("subscribe $e"); return null; } } Future call( {required String method, List params = const [], Function(int)? idCallback}) async { if (socket == null) { return null; } final completer = Completer(); _id += 1; final id = _id; idCallback?.call(id); _registryTask(id, completer); socket!.write(jsonrpc(method: method, id: id, params: params)); return completer.future; } Future callWithTimeout( {required String method, List params = const [], int timeout = 5000}) async { try { if (socket == null) { return null; } final completer = Completer(); _id += 1; final id = _id; _registryTask(id, completer); socket!.write(jsonrpc(method: method, id: id, params: params)); Timer(Duration(milliseconds: timeout), () { if (!completer.isCompleted) { completer.completeError(RequestFailedTimeoutException(method, id)); } }); return completer.future; } catch (e) { print("callWithTimeout $e"); rethrow; } } Future close() async { _aliveTimer?.cancel(); try { await socket?.close(); socket = null; } catch (_) {} onConnectionStatusChange = null; } void _registryTask(int id, Completer completer) => _tasks[id.toString()] = SocketTask(completer: completer, isSubscription: false); void _regisrySubscription(String id, BehaviorSubject subject) => _tasks[id] = SocketTask(subject: subject, isSubscription: true); void _finish(String id, Object? data) { if (_tasks[id] == null) { return; } if (!(_tasks[id]?.completer?.isCompleted ?? false)) { _tasks[id]?.completer!.complete(data); } if (!(_tasks[id]?.isSubscription ?? false)) { _tasks.remove(id); } else { _tasks[id]?.subject?.add(data); } } void _methodHandler({required String method, required Map request}) { switch (method) { case 'blockchain.headers.subscribe': final params = request['params'] as List; final id = 'blockchain.headers.subscribe'; _tasks[id]?.subject?.add(params.last); break; case 'blockchain.scripthash.subscribe': final params = request['params'] as List; final scripthash = params.first as String?; final id = 'blockchain.scripthash.subscribe:$scripthash'; _tasks[id]?.subject?.add(params.last); break; case 'blockchain.headers.subscribe': final params = request['params'] as List; _tasks[method]?.subject?.add(params.last); break; case 'blockchain.tweaks.subscribe': case 'blockchain.tweaks.scan': final params = request['params'] as List; _tasks[_tasks.keys.first]?.subject?.add(params.last); break; default: break; } } void _setConnectionStatus(ConnectionStatus status) { onConnectionStatusChange?.call(status); _connectionStatus = status; _isConnected = status == ConnectionStatus.connected; if (!_isConnected) { try { socket?.destroy(); } catch (_) {} socket = null; } } void _handleResponse(Map response) { final method = response['method']; final id = response['id'] as String?; final result = response['result']; try { final error = response['error'] as Map?; if (error != null) { final errorMessage = error['message'] as String?; if (errorMessage != null) { _errors[id!] = errorMessage; } } } catch (_) {} try { final error = response['error'] as String?; if (error != null) { _errors[id!] = error; } } catch (_) {} if (method is String) { _methodHandler(method: method, request: response); return; } if (id != null) { _finish(id, result); } } String getErrorMessage(int id) => _errors[id.toString()] ?? ''; } // FIXME: move me bool isJSONStringCorrect(String source) { try { json.decode(source); return true; } catch (_) { return false; } } class RequestFailedTimeoutException implements Exception { RequestFailedTimeoutException(this.method, this.id); final String method; final int id; }