diff --git a/.gitmodules b/.gitmodules index 7474c8a54..95b02e580 100644 --- a/.gitmodules +++ b/.gitmodules @@ -6,4 +6,4 @@ url = https://github.com/cypherstack/flutter_libmonero.git [submodule "crypto_plugins/flutter_liblelantus"] path = crypto_plugins/flutter_liblelantus - url = https://github.com/cypherstack/flutter_liblelantus.git + url = https://github.com/cypherstack/flutter_liblelantus.git \ No newline at end of file diff --git a/lib/db/db_version_migration.dart b/lib/db/db_version_migration.dart index 21241f630..e81c01c76 100644 --- a/lib/db/db_version_migration.dart +++ b/lib/db/db_version_migration.dart @@ -85,6 +85,7 @@ class DbVersionMigrator with WalletDB { useSSL: node.useSSL), prefs: prefs, failovers: failovers, + coin: Coin.firo, ); try { diff --git a/lib/electrumx_rpc/cached_electrumx_client.dart b/lib/electrumx_rpc/cached_electrumx_client.dart index 021bdf065..b64e2ec7d 100644 --- a/lib/electrumx_rpc/cached_electrumx_client.dart +++ b/lib/electrumx_rpc/cached_electrumx_client.dart @@ -11,6 +11,9 @@ import 'dart:convert'; import 'dart:math'; +import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter; +import 'package:electrum_adapter/electrum_adapter.dart'; +import 'package:electrum_adapter/methods/specific/firo.dart'; import 'package:stackwallet/db/hive/db.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; import 'package:stackwallet/utilities/enums/coin_enum.dart'; @@ -19,20 +22,41 @@ import 'package:string_validator/string_validator.dart'; class CachedElectrumXClient { final ElectrumXClient electrumXClient; + ElectrumClient electrumAdapterClient; + final Future Function() electrumAdapterUpdateCallback; static const minCacheConfirms = 30; - const CachedElectrumXClient({ + CachedElectrumXClient({ required this.electrumXClient, + required this.electrumAdapterClient, + required this.electrumAdapterUpdateCallback, }); factory CachedElectrumXClient.from({ required ElectrumXClient electrumXClient, + required ElectrumClient electrumAdapterClient, + required Future Function() electrumAdapterUpdateCallback, }) => CachedElectrumXClient( electrumXClient: electrumXClient, + electrumAdapterClient: electrumAdapterClient, + electrumAdapterUpdateCallback: electrumAdapterUpdateCallback, ); + /// If the client is closed, use the callback to update it. + _checkElectrumAdapterClient() async { + if (electrumAdapterClient.peer.isClosed) { + Logging.instance.log( + "ElectrumAdapterClient is closed, reopening it...", + level: LogLevel.Info, + ); + ElectrumClient? _electrumAdapterClient = + await electrumAdapterUpdateCallback.call(); + electrumAdapterClient = _electrumAdapterClient; + } + } + Future> getAnonymitySet({ required String groupId, String blockhash = "", @@ -56,9 +80,12 @@ class CachedElectrumXClient { set = Map.from(cachedSet); } - final newSet = await electrumXClient.getLelantusAnonymitySet( + await _checkElectrumAdapterClient(); + + final newSet = await (electrumAdapterClient as FiroElectrumClient) + .getLelantusAnonymitySet( groupId: groupId, - blockhash: set["blockHash"] as String, + blockHash: set["blockHash"] as String, ); // update set with new data @@ -82,7 +109,7 @@ class CachedElectrumXClient { translatedCoin.add(!isHexadecimal(newCoin[2] as String) ? base64ToHex(newCoin[2] as String) : newCoin[2]); - } catch (e, s) { + } catch (e) { translatedCoin.add(newCoin[2]); } translatedCoin.add(!isHexadecimal(newCoin[3] as String) @@ -130,7 +157,10 @@ class CachedElectrumXClient { set = Map.from(cachedSet); } - final newSet = await electrumXClient.getSparkAnonymitySet( + await _checkElectrumAdapterClient(); + + final newSet = await (electrumAdapterClient as FiroElectrumClient) + .getSparkAnonymitySet( coinGroupId: groupId, startBlockHash: set["blockHash"] as String, ); @@ -188,8 +218,10 @@ class CachedElectrumXClient { final cachedTx = box.get(txHash) as Map?; if (cachedTx == null) { - final Map result = await electrumXClient - .getTransaction(txHash: txHash, verbose: verbose); + await _checkElectrumAdapterClient(); + + final Map result = + await electrumAdapterClient.getTransaction(txHash); result.remove("hex"); result.remove("lelantusData"); @@ -231,7 +263,10 @@ class CachedElectrumXClient { cachedSerials.length - 100, // 100 being some arbitrary buffer ); - final serials = await electrumXClient.getLelantusUsedCoinSerials( + await _checkElectrumAdapterClient(); + + final serials = await (electrumAdapterClient as FiroElectrumClient) + .getLelantusUsedCoinSerials( startNumber: startNumber, ); @@ -279,7 +314,10 @@ class CachedElectrumXClient { cachedTags.length - 100, // 100 being some arbitrary buffer ); - final tags = await electrumXClient.getSparkUsedCoinsTags( + await _checkElectrumAdapterClient(); + + final tags = + await (electrumAdapterClient as FiroElectrumClient).getUsedCoinsTags( startNumber: startNumber, ); @@ -287,12 +325,18 @@ class CachedElectrumXClient { // .map((e) => !isHexadecimal(e) ? base64ToHex(e) : e) // .toSet(); + // Convert the Map tags to a Set. + final newTags = (tags["tags"] as List).toSet(); + // ensure we are getting some overlap so we know we are not missing any if (cachedTags.isNotEmpty && tags.isNotEmpty) { - assert(cachedTags.intersection(tags).isNotEmpty); + assert(cachedTags.intersection(newTags).isNotEmpty); } - cachedTags.addAll(tags); + // Make newTags an Iterable. + final Iterable iterableTags = newTags.map((e) => e.toString()); + + cachedTags.addAll(iterableTags); await box.put( "tags", diff --git a/lib/electrumx_rpc/electrumx_client.dart b/lib/electrumx_rpc/electrumx_client.dart index 931c9334f..d8c029f36 100644 --- a/lib/electrumx_rpc/electrumx_client.dart +++ b/lib/electrumx_rpc/electrumx_client.dart @@ -9,11 +9,13 @@ */ import 'dart:async'; -import 'dart:convert'; import 'dart:io'; import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:decimal/decimal.dart'; +import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter; +import 'package:electrum_adapter/electrum_adapter.dart'; +import 'package:electrum_adapter/methods/specific/firo.dart'; import 'package:event_bus/event_bus.dart'; import 'package:flutter/foundation.dart'; import 'package:flutter_libsparkmobile/flutter_libsparkmobile.dart'; @@ -24,9 +26,10 @@ import 'package:stackwallet/services/event_bus/events/global/tor_connection_stat import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; import 'package:stackwallet/services/event_bus/global_event_bus.dart'; import 'package:stackwallet/services/tor_service.dart'; +import 'package:stackwallet/utilities/enums/coin_enum.dart'; import 'package:stackwallet/utilities/logger.dart'; import 'package:stackwallet/utilities/prefs.dart'; -import 'package:uuid/uuid.dart'; +import 'package:stream_channel/stream_channel.dart'; class WifiOnlyException implements Exception {} @@ -73,6 +76,12 @@ class ElectrumXClient { JsonRPC? get rpcClient => _rpcClient; JsonRPC? _rpcClient; + StreamChannel? get electrumAdapterChannel => _electrumAdapterChannel; + StreamChannel? _electrumAdapterChannel; + + ElectrumClient? get electrumAdapterClient => _electrumAdapterClient; + ElectrumClient? _electrumAdapterClient; + late Prefs _prefs; late TorService _torService; @@ -81,6 +90,9 @@ class ElectrumXClient { final Duration connectionTimeoutForSpecialCaseJsonRPCClients; + Coin? get coin => _coin; + late Coin? _coin; + // add finalizer to cancel stream subscription when all references to an // instance of ElectrumX becomes inaccessible static final Finalizer _finalizer = Finalizer( @@ -101,7 +113,7 @@ class ElectrumXClient { required bool useSSL, required Prefs prefs, required List failovers, - JsonRPC? client, + Coin? coin, this.connectionTimeoutForSpecialCaseJsonRPCClients = const Duration(seconds: 60), TorService? torService, @@ -112,7 +124,7 @@ class ElectrumXClient { _host = host; _port = port; _useSSL = useSSL; - _rpcClient = client; + _coin = coin; final bus = globalEventBusForTesting ?? GlobalEventBus.instance; _torStatusListener = bus.on().listen( @@ -141,21 +153,10 @@ class ElectrumXClient { // case TorStatus.disabled: // } - // might be ok to just reset/kill the current _jsonRpcClient - - // since disconnecting is async and we want to ensure instant change over - // we will keep temp reference to current rpc client to call disconnect - // on before awaiting the disconnection future - - final temp = _rpcClient; - // setting to null should force the creation of a new json rpc client // on the next request sent through this electrumx instance - _rpcClient = null; - - await temp?.disconnect( - reason: "Tor status changed to \"${event.status}\"", - ); + _electrumAdapterChannel = null; + _electrumAdapterClient = null; }, ); } @@ -164,6 +165,7 @@ class ElectrumXClient { required ElectrumXNode node, required Prefs prefs, required List failovers, + required Coin coin, TorService? torService, EventBus? globalEventBusForTesting, }) { @@ -175,6 +177,7 @@ class ElectrumXClient { torService: torService, failovers: failovers, globalEventBusForTesting: globalEventBusForTesting, + coin: coin, ); } @@ -186,7 +189,9 @@ class ElectrumXClient { return true; } - void _checkRpcClient() { + Future checkElectrumAdapter() async { + ({InternetAddress host, int port})? proxyInfo; + // If we're supposed to use Tor... if (_prefs.useTor) { // But Tor isn't running... @@ -195,65 +200,93 @@ class ElectrumXClient { if (!_prefs.torKillSwitch) { // Then we'll just proceed and connect to ElectrumX through clearnet at the bottom of this function. Logging.instance.log( - "Tor preference set but Tor is not enabled, killswitch not set, connecting to ElectrumX through clearnet", + "Tor preference set but Tor is not enabled, killswitch not set, connecting to Electrum adapter through clearnet", level: LogLevel.Warning, ); } else { // ... But if the killswitch is set, then we throw an exception. throw Exception( - "Tor preference and killswitch set but Tor is not enabled, not connecting to ElectrumX"); + "Tor preference and killswitch set but Tor is not enabled, not connecting to Electrum adapter"); // TODO [prio=low]: Try to start Tor. } } else { // Get the proxy info from the TorService. - final proxyInfo = _torService.getProxyInfo(); - - if (currentFailoverIndex == -1) { - _rpcClient ??= JsonRPC( - host: host, - port: port, - useSSL: useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: proxyInfo, - ); - } else { - _rpcClient ??= JsonRPC( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - useSSL: failovers![currentFailoverIndex].useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: proxyInfo, - ); - } - - if (_rpcClient!.proxyInfo != proxyInfo) { - _rpcClient!.proxyInfo = proxyInfo; - _rpcClient!.disconnect( - reason: "Tor proxyInfo does not match current info", - ); - } - - return; + proxyInfo = _torService.getProxyInfo(); } } - if (currentFailoverIndex == -1) { - _rpcClient ??= JsonRPC( - host: host, - port: port, - useSSL: useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: null, - ); - } else { - _rpcClient ??= JsonRPC( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - useSSL: failovers![currentFailoverIndex].useSSL, - connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, - proxyInfo: null, - ); + // TODO [prio=med]: Add proxyInfo to StreamChannel (or add to wrapper). + // if (_electrumAdapter!.proxyInfo != proxyInfo) { + // _electrumAdapter!.proxyInfo = proxyInfo; + // _electrumAdapter!.disconnect( + // reason: "Tor proxyInfo does not match current info", + // ); + // } + + // If the current ElectrumAdapterClient is closed, create a new one. + if (_electrumAdapterClient != null && + _electrumAdapterClient!.peer.isClosed) { + _electrumAdapterChannel = null; + _electrumAdapterClient = null; } + + if (currentFailoverIndex == -1) { + _electrumAdapterChannel ??= await electrum_adapter.connect( + host, + port: port, + connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, + aliveTimerDuration: connectionTimeoutForSpecialCaseJsonRPCClients, + acceptUnverified: true, + useSSL: useSSL, + proxyInfo: proxyInfo, + ); + if (_coin == Coin.firo || _coin == Coin.firoTestNet) { + _electrumAdapterClient ??= FiroElectrumClient( + _electrumAdapterChannel!, + host, + port, + useSSL, + proxyInfo, + ); + } else { + _electrumAdapterClient ??= ElectrumClient( + _electrumAdapterChannel!, + host, + port, + useSSL, + proxyInfo, + ); + } + } else { + _electrumAdapterChannel ??= await electrum_adapter.connect( + failovers![currentFailoverIndex].address, + port: failovers![currentFailoverIndex].port, + connectionTimeout: connectionTimeoutForSpecialCaseJsonRPCClients, + aliveTimerDuration: connectionTimeoutForSpecialCaseJsonRPCClients, + acceptUnverified: true, + useSSL: failovers![currentFailoverIndex].useSSL, + proxyInfo: proxyInfo, + ); + if (_coin == Coin.firo || _coin == Coin.firoTestNet) { + _electrumAdapterClient ??= FiroElectrumClient( + _electrumAdapterChannel!, + failovers![currentFailoverIndex].address, + failovers![currentFailoverIndex].port, + failovers![currentFailoverIndex].useSSL, + proxyInfo, + ); + } else { + _electrumAdapterClient ??= ElectrumClient( + _electrumAdapterChannel!, + failovers![currentFailoverIndex].address, + failovers![currentFailoverIndex].port, + failovers![currentFailoverIndex].useSSL, + proxyInfo, + ); + } + } + + return; } /// Send raw rpc command @@ -269,32 +302,22 @@ class ElectrumXClient { } if (_requireMutex) { - await _torConnectingLock.protect(() async => _checkRpcClient()); + await _torConnectingLock + .protect(() async => await checkElectrumAdapter()); } else { - _checkRpcClient(); + await checkElectrumAdapter(); } try { - final requestId = requestID ?? const Uuid().v1(); - final jsonArgs = json.encode(args); - final jsonRequestString = '{"jsonrpc": "2.0", ' - '"id": "$requestId",' - '"method": "$command",' - '"params": $jsonArgs}'; - - // Logging.instance.log("ElectrumX jsonRequestString: $jsonRequestString"); - - final response = await _rpcClient!.request( - jsonRequestString, - requestTimeout, + final response = await _electrumAdapterClient!.request( + command, + args, ); - if (response.exception != null) { - throw response.exception!; - } - - if (response.data is Map && response.data["error"] != null) { - if (response.data["error"] + if (response is Map && + response.keys.contains("error") && + response["error"] != null) { + if (response["error"] .toString() .contains("No such mempool or blockchain transaction")) { throw NoSuchTransactionException( @@ -306,13 +329,19 @@ class ElectrumXClient { throw Exception( "JSONRPC response\n" " command: $command\n" - " error: ${response.data}" + " error: ${response["error"]}\n" " args: $args\n", ); } currentFailoverIndex = -1; - return response.data; + + // If the command is a ping, a good return should always be null. + if (command.contains("ping")) { + return true; + } + + return response; } on WifiOnlyException { rethrow; } on SocketException { @@ -348,7 +377,7 @@ class ElectrumXClient { /// map of /// /// returns a list of json response objects if no errors were found - Future>> batchRequest({ + Future> batchRequest({ required String command, required Map> args, Duration requestTimeout = const Duration(seconds: 60), @@ -359,62 +388,34 @@ class ElectrumXClient { } if (_requireMutex) { - await _torConnectingLock.protect(() async => _checkRpcClient()); + await _torConnectingLock + .protect(() async => await checkElectrumAdapter()); } else { - _checkRpcClient(); + await checkElectrumAdapter(); } try { - final List requestStrings = []; - - for (final entry in args.entries) { - final jsonArgs = json.encode(entry.value); - requestStrings.add( - '{"jsonrpc": "2.0", "id": "${entry.key}","method": "$command","params": $jsonArgs}'); - } - - // combine request strings into json array - String request = "["; - for (int i = 0; i < requestStrings.length - 1; i++) { - request += "${requestStrings[i]},"; - } - request += "${requestStrings.last}]"; - - // Logging.instance.log("batch request: $request"); - - // send batch request - final jsonRpcResponse = - (await _rpcClient!.request(request, requestTimeout)); - - if (jsonRpcResponse.exception != null) { - throw jsonRpcResponse.exception!; - } - - final List response; - try { - if (jsonRpcResponse.data is Map) { - response = [jsonRpcResponse.data]; - - if (requestStrings.length > 1) { - Logging.instance.log( - "ElectrumXClient.batchRequest: Map returned instead of a list and there are ${requestStrings.length} queued.", - level: LogLevel.Error); - } - // Could throw error here. - } else { - response = jsonRpcResponse.data as List; + var futures = >[]; + List? response; + _electrumAdapterClient!.peer.withBatch(() { + for (final entry in args.entries) { + futures.add(_electrumAdapterClient!.request(command, entry.value)); } - } catch (_) { - throw Exception( - "Expected json list or map but got a ${jsonRpcResponse.data.runtimeType}: ${jsonRpcResponse.data}", - ); - } + }); + response = await Future.wait(futures); // check for errors, format and throw if there are any final List errors = []; for (int i = 0; i < response.length; i++) { - final result = response[i]; - if (result["error"] != null || result["result"] == null) { + var result = response[i]; + + if (result == null || (result is List && result.isEmpty)) { + continue; + // TODO [prio=extreme]: Figure out if this is actually an issue. + } + result = result[0]; // Unwrap the list. + if ((result is Map && result.keys.contains("error")) || + result == null) { errors.add(result.toString()); } } @@ -428,7 +429,7 @@ class ElectrumXClient { } currentFailoverIndex = -1; - return List>.from(response, growable: false); + return response; } on WifiOnlyException { rethrow; } on SocketException { @@ -463,13 +464,23 @@ class ElectrumXClient { /// Returns true if ping succeeded Future ping({String? requestID, int retryCount = 1}) async { try { - final response = await request( + // This doesn't work because electrum_adapter only returns the result: + // (which is always `null`). + // await checkElectrumAdapter(); + // final response = await electrumAdapterClient! + // .ping() + // .timeout(const Duration(seconds: 2)); + // return (response as Map).isNotEmpty; + + // Because request() has been updated to use electrum_adapter, and because + // electrum_adapter returns the result of the request, request() has been + // updated to return a bool on a server.ping command as a special case. + return await request( requestID: requestID, command: 'server.ping', requestTimeout: const Duration(seconds: 2), retries: retryCount, - ).timeout(const Duration(seconds: 2)) as Map; - return response.keys.contains("result") && response["result"] == null; + ).timeout(const Duration(seconds: 2)) as bool; } catch (e) { rethrow; } @@ -490,14 +501,14 @@ class ElectrumXClient { requestID: requestID, command: 'blockchain.headers.subscribe', ); - if (response["result"] == null) { + if (response == null) { Logging.instance.log( "getBlockHeadTip returned null response", level: LogLevel.Error, ); throw 'getBlockHeadTip returned null response'; } - return Map.from(response["result"] as Map); + return Map.from(response as Map); } catch (e) { rethrow; } @@ -522,7 +533,7 @@ class ElectrumXClient { requestID: requestID, command: 'server.features', ); - return Map.from(response["result"] as Map); + return Map.from(response as Map); } catch (e) { rethrow; } @@ -543,7 +554,7 @@ class ElectrumXClient { rawTx, ], ); - return response["result"] as String; + return response as String; } catch (e) { rethrow; } @@ -570,7 +581,7 @@ class ElectrumXClient { scripthash, ], ); - return Map.from(response["result"] as Map); + return Map.from(response as Map); } catch (e) { rethrow; } @@ -607,7 +618,7 @@ class ElectrumXClient { scripthash, ], ); - result = response["result"]; + result = response; retryCount--; } @@ -617,17 +628,16 @@ class ElectrumXClient { } } - Future>>> getBatchHistory( + Future>>> getBatchHistory( {required Map> args}) async { try { final response = await batchRequest( command: 'blockchain.scripthash.get_history', args: args, ); - final Map>> result = {}; + final Map>> result = {}; for (int i = 0; i < response.length; i++) { - result[response[i]["id"] as String] = - List>.from(response[i]["result"] as List); + result[i] = List>.from(response[i] as List); } return result; } catch (e) { @@ -665,23 +675,33 @@ class ElectrumXClient { scripthash, ], ); - return List>.from(response["result"] as List); + return List>.from(response as List); } catch (e) { rethrow; } } - Future>>> getBatchUTXOs( + Future>>> getBatchUTXOs( {required Map> args}) async { try { final response = await batchRequest( command: 'blockchain.scripthash.listunspent', args: args, ); - final Map>> result = {}; + final Map>> result = {}; for (int i = 0; i < response.length; i++) { - result[response[i]["id"] as String] = - List>.from(response[i]["result"] as List); + if ((response[i] as List).isNotEmpty) { + try { + // result[i] = response[i] as List>; + result[i] = List>.from(response[i] as List); + } catch (e) { + print(response[i]); + Logging.instance.log( + "getBatchUTXOs failed to parse response", + level: LogLevel.Error, + ); + } + } } return result; } catch (e) { @@ -741,41 +761,18 @@ class ElectrumXClient { bool verbose = true, String? requestID, }) async { - dynamic response; - try { - response = await request( - requestID: requestID, - command: 'blockchain.transaction.get', - args: [ - txHash, - verbose, - ], - ); - if (!verbose) { - return {"rawtx": response["result"] as String}; - } + Logging.instance.log("attempting to fetch blockchain.transaction.get...", + level: LogLevel.Info); + await checkElectrumAdapter(); + dynamic response = await _electrumAdapterClient!.getTransaction(txHash); + Logging.instance.log("Fetching blockchain.transaction.get finished", + level: LogLevel.Info); - if (response is! Map) { - final String msg = "getTransaction($txHash) returned a non-Map response" - " of type ${response.runtimeType}.\nResponse: $response"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - - if (response["result"] == null) { - final String msg = "getTransaction($txHash) returned null result." - "\nResponse: $response"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - return Map.from(response["result"] as Map); - } catch (e, s) { - Logging.instance.log( - "getTransaction($txHash) response: $response" - "\nError: $e\nStack trace: $s", - level: LogLevel.Error); - rethrow; + if (!verbose) { + return {"rawtx": response as String}; } + + return Map.from(response as Map); } /// Returns the whole Lelantus anonymity set for denomination in the groupId. @@ -797,23 +794,15 @@ class ElectrumXClient { String blockhash = "", String? requestID, }) async { - try { - Logging.instance.log("attempting to fetch lelantus.getanonymityset...", - level: LogLevel.Info); - final response = await request( - requestID: requestID, - command: 'lelantus.getanonymityset', - args: [ - groupId, - blockhash, - ], - ); - Logging.instance.log("Fetching lelantus.getanonymityset finished", - level: LogLevel.Info); - return Map.from(response["result"] as Map); - } catch (e) { - rethrow; - } + Logging.instance.log("attempting to fetch lelantus.getanonymityset...", + level: LogLevel.Info); + await checkElectrumAdapter(); + Map response = + await (_electrumAdapterClient as FiroElectrumClient)! + .getLelantusAnonymitySet(groupId: groupId, blockHash: blockhash); + Logging.instance.log("Fetching lelantus.getanonymityset finished", + level: LogLevel.Info); + return response; } //TODO add example to docs @@ -824,18 +813,14 @@ class ElectrumXClient { dynamic mints, String? requestID, }) async { - try { - final response = await request( - requestID: requestID, - command: 'lelantus.getmintmetadata', - args: [ - mints, - ], - ); - return response["result"]; - } catch (e) { - rethrow; - } + Logging.instance.log("attempting to fetch lelantus.getmintmetadata...", + level: LogLevel.Info); + await checkElectrumAdapter(); + dynamic response = await (_electrumAdapterClient as FiroElectrumClient)! + .getLelantusMintData(mints: mints); + Logging.instance.log("Fetching lelantus.getmintmetadata finished", + level: LogLevel.Info); + return response; } //TODO add example to docs @@ -844,45 +829,38 @@ class ElectrumXClient { String? requestID, required int startNumber, }) async { - try { - int retryCount = 3; - dynamic result; + Logging.instance.log("attempting to fetch lelantus.getusedcoinserials...", + level: LogLevel.Info); + await checkElectrumAdapter(); - while (retryCount > 0 && result is! List) { - final response = await request( - requestID: requestID, - command: 'lelantus.getusedcoinserials', - args: [ - "$startNumber", - ], - requestTimeout: const Duration(minutes: 2), - ); + int retryCount = 3; + dynamic response; - result = response["result"]; - retryCount--; - } + while (retryCount > 0 && response is! List) { + response = await (_electrumAdapterClient as FiroElectrumClient)! + .getLelantusUsedCoinSerials(startNumber: startNumber); + // TODO add 2 minute timeout. + Logging.instance.log("Fetching lelantus.getusedcoinserials finished", + level: LogLevel.Info); - return Map.from(result as Map); - } catch (e) { - Logging.instance.log(e, level: LogLevel.Error); - rethrow; + retryCount--; } + + return Map.from(response as Map); } /// Returns the latest Lelantus set id /// /// ex: 1 Future getLelantusLatestCoinId({String? requestID}) async { - try { - final response = await request( - requestID: requestID, - command: 'lelantus.getlatestcoinid', - ); - return response["result"] as int; - } catch (e) { - Logging.instance.log(e, level: LogLevel.Error); - rethrow; - } + Logging.instance.log("attempting to fetch lelantus.getlatestcoinid...", + level: LogLevel.Info); + await checkElectrumAdapter(); + int response = + await (_electrumAdapterClient as FiroElectrumClient).getLatestCoinId(); + Logging.instance.log("Fetching lelantus.getlatestcoinid finished", + level: LogLevel.Info); + return response; } // ============== Spark ====================================================== @@ -908,17 +886,14 @@ class ElectrumXClient { try { Logging.instance.log("attempting to fetch spark.getsparkanonymityset...", level: LogLevel.Info); - final response = await request( - requestID: requestID, - command: 'spark.getsparkanonymityset', - args: [ - coinGroupId, - startBlockHash, - ], - ); + await checkElectrumAdapter(); + Map response = + await (_electrumAdapterClient as FiroElectrumClient) + .getSparkAnonymitySet( + coinGroupId: coinGroupId, startBlockHash: startBlockHash); Logging.instance.log("Fetching spark.getsparkanonymityset finished", level: LogLevel.Info); - return Map.from(response["result"] as Map); + return response; } catch (e) { rethrow; } @@ -931,15 +906,17 @@ class ElectrumXClient { required int startNumber, }) async { try { - final response = await request( - requestID: requestID, - command: 'spark.getusedcoinstags', - args: [ - "$startNumber", - ], - requestTimeout: const Duration(minutes: 2), - ); - final map = Map.from(response["result"] as Map); + // Use electrum_adapter package's getSparkUsedCoinsTags method. + Logging.instance.log("attempting to fetch spark.getusedcoinstags...", + level: LogLevel.Info); + await checkElectrumAdapter(); + Map response = + await (_electrumAdapterClient as FiroElectrumClient) + .getUsedCoinsTags(startNumber: startNumber); + // TODO: Add 2 minute timeout. + Logging.instance.log("Fetching spark.getusedcoinstags finished", + level: LogLevel.Info); + final map = Map.from(response); final set = Set.from(map["tags"] as List); return await compute(_ffiHashTagsComputeWrapper, set); } catch (e) { @@ -963,16 +940,15 @@ class ElectrumXClient { required List sparkCoinHashes, }) async { try { - final response = await request( - requestID: requestID, - command: 'spark.getsparkmintmetadata', - args: [ - { - "coinHashes": sparkCoinHashes, - }, - ], - ); - return List>.from(response["result"] as List); + Logging.instance.log("attempting to fetch spark.getsparkmintmetadata...", + level: LogLevel.Info); + await checkElectrumAdapter(); + List response = + await (_electrumAdapterClient as FiroElectrumClient) + .getSparkMintMetaData(sparkCoinHashes: sparkCoinHashes); + Logging.instance.log("Fetching spark.getsparkmintmetadata finished", + level: LogLevel.Info); + return List>.from(response); } catch (e) { Logging.instance.log(e, level: LogLevel.Error); rethrow; @@ -986,11 +962,14 @@ class ElectrumXClient { String? requestID, }) async { try { - final response = await request( - requestID: requestID, - command: 'spark.getsparklatestcoinid', - ); - return response["result"] as int; + Logging.instance.log("attempting to fetch spark.getsparklatestcoinid...", + level: LogLevel.Info); + await checkElectrumAdapter(); + int response = await (_electrumAdapterClient as FiroElectrumClient) + .getSparkLatestCoinId(); + Logging.instance.log("Fetching spark.getsparklatestcoinid finished", + level: LogLevel.Info); + return response; } catch (e) { Logging.instance.log(e, level: LogLevel.Error); rethrow; @@ -1007,15 +986,8 @@ class ElectrumXClient { /// "rate": 1000, /// } Future> getFeeRate({String? requestID}) async { - try { - final response = await request( - requestID: requestID, - command: 'blockchain.getfeerate', - ); - return Map.from(response["result"] as Map); - } catch (e) { - rethrow; - } + await checkElectrumAdapter(); + return await _electrumAdapterClient!.getFeeRate(); } /// Return the estimated transaction fee per kilobyte for a transaction to be confirmed within a certain number of [blocks]. @@ -1033,10 +1005,10 @@ class ElectrumXClient { ], ); try { - return Decimal.parse(response["result"].toString()); + return Decimal.parse(response.toString()); } catch (e, s) { final String msg = "Error parsing fee rate. Response: $response" - "\nResult: ${response["result"]}\nError: $e\nStack trace: $s"; + "\nResult: ${response}\nError: $e\nStack trace: $s"; Logging.instance.log(msg, level: LogLevel.Fatal); throw Exception(msg); } @@ -1056,7 +1028,7 @@ class ElectrumXClient { requestID: requestID, command: 'blockchain.relayfee', ); - return Decimal.parse(response["result"].toString()); + return Decimal.parse(response.toString()); } catch (e) { rethrow; } diff --git a/lib/electrumx_rpc/subscribable_electrumx_client.dart b/lib/electrumx_rpc/subscribable_electrumx_client.dart index 5db7cefc1..f06771906 100644 --- a/lib/electrumx_rpc/subscribable_electrumx_client.dart +++ b/lib/electrumx_rpc/subscribable_electrumx_client.dart @@ -1,862 +1,862 @@ -/* - * This file is part of Stack Wallet. - * - * Copyright (c) 2023 Cypher Stack - * All Rights Reserved. - * The code is distributed under GPLv3 license, see LICENSE file for details. - * Generated by Cypher Stack on 2023-05-26 - * - */ - -import 'dart:async'; -import 'dart:convert'; -import 'dart:io'; - -import 'package:event_bus/event_bus.dart'; -import 'package:mutex/mutex.dart'; -import 'package:socks_socket/socks_socket.dart'; -import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; -import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart'; -import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; -import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; -import 'package:stackwallet/services/event_bus/global_event_bus.dart'; -import 'package:stackwallet/services/tor_service.dart'; -import 'package:stackwallet/utilities/logger.dart'; -import 'package:stackwallet/utilities/prefs.dart'; - -class ElectrumXSubscription { - final StreamController _controller = - StreamController(); // TODO controller params - - Stream get responseStream => _controller.stream; - - void addToStream(dynamic data) => _controller.add(data); -} - -class SocketTask { - SocketTask({this.completer, this.subscription}); - - final Completer? completer; - final ElectrumXSubscription? subscription; - - bool get isSubscription => subscription != null; -} - -class SubscribableElectrumXClient { - int _currentRequestID = 0; - bool _isConnected = false; - List _responseData = []; - final Map _tasks = {}; - Timer? _aliveTimer; - Socket? _socket; - SOCKSSocket? _socksSocket; - late final bool _useSSL; - late final Duration _connectionTimeout; - late final Duration _keepAlive; - - bool get isConnected => _isConnected; - bool get useSSL => _useSSL; - // Used to reconnect. - String? _host; - int? _port; - - void Function(bool)? onConnectionStatusChanged; - - late Prefs _prefs; - late TorService _torService; - StreamSubscription? _torPreferenceListener; - StreamSubscription? _torStatusListener; - final Mutex _torConnectingLock = Mutex(); - bool _requireMutex = false; - - List? failovers; - int currentFailoverIndex = -1; - - SubscribableElectrumXClient({ - required bool useSSL, - required Prefs prefs, - required List failovers, - TorService? torService, - this.onConnectionStatusChanged, - Duration connectionTimeout = const Duration(seconds: 5), - Duration keepAlive = const Duration(seconds: 10), - EventBus? globalEventBusForTesting, - }) { - _useSSL = useSSL; - _prefs = prefs; - _torService = torService ?? TorService.sharedInstance; - _connectionTimeout = connectionTimeout; - _keepAlive = keepAlive; - - // If we're testing, use the global event bus for testing. - final bus = globalEventBusForTesting ?? GlobalEventBus.instance; - - // Listen to global event bus for Tor status changes. - _torStatusListener = bus.on().listen( - (event) async { - try { - switch (event.newStatus) { - case TorConnectionStatus.connecting: - // If Tor is connecting, we need to wait. - await _torConnectingLock.acquire(); - _requireMutex = true; - break; - - case TorConnectionStatus.connected: - case TorConnectionStatus.disconnected: - // If Tor is connected or disconnected, we can release the lock. - if (_torConnectingLock.isLocked) { - _torConnectingLock.release(); - } - _requireMutex = false; - break; - } - } finally { - // Ensure the lock is released. - if (_torConnectingLock.isLocked) { - _torConnectingLock.release(); - } - } - }, - ); - - // Listen to global event bus for Tor preference changes. - _torPreferenceListener = bus.on().listen( - (event) async { - // Close open socket (if open). - final tempSocket = _socket; - _socket = null; - await tempSocket?.close(); - - // Close open SOCKS socket (if open). - final tempSOCKSSocket = _socksSocket; - _socksSocket = null; - await tempSOCKSSocket?.close(); - - // Clear subscriptions. - _tasks.clear(); - - // Cancel alive timer - _aliveTimer?.cancel(); - }, - ); - } - - factory SubscribableElectrumXClient.from({ - required ElectrumXNode node, - required Prefs prefs, - required List failovers, - TorService? torService, - }) { - return SubscribableElectrumXClient( - useSSL: node.useSSL, - prefs: prefs, - failovers: failovers, - torService: torService ?? TorService.sharedInstance, - ); - } - - // Example for returning a future which completes upon connection. - // static Future from({ - // required ElectrumXNode node, - // TorService? torService, - // }) async { - // final client = SubscribableElectrumXClient( - // useSSL: node.useSSL, - // ); - // - // await client.connect(host: node.address, port: node.port); - // - // return client; - // } - - /// Check if the RPC client is connected and connect if needed. - /// - /// If Tor is enabled but not running, it will attempt to start Tor. - Future _checkSocket({bool connecting = false}) async { - if (_prefs.useTor) { - // If we're supposed to use Tor... - if (_torService.status != TorConnectionStatus.connected) { - // ... but Tor isn't running... - if (!_prefs.torKillSwitch) { - // ... and the killswitch isn't set, then we'll just return below. - Logging.instance.log( - "Tor preference set but Tor is not enabled, killswitch not set, connecting to ElectrumX through clearnet.", - level: LogLevel.Warning, - ); - } else { - // ... but if the killswitch is set, then let's try to start Tor. - await _torService.start(); - // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. - - // Double-check that Tor is running. - if (_torService.status != TorConnectionStatus.connected) { - // If Tor still isn't running, then we'll throw an exception. - throw Exception("SubscribableElectrumXClient._checkRpcClient: " - "Tor preference and killswitch set but Tor not enabled and could not start, not connecting to ElectrumX."); - } - } - } - } - - // Connect if needed. - if (!connecting) { - if ((!_prefs.useTor && _socket == null) || - (_prefs.useTor && _socksSocket == null)) { - if (currentFailoverIndex == -1) { - // Check if we have cached node information - if (_host == null && _port == null) { - throw Exception("SubscribableElectrumXClient._checkRpcClient: " - "No host or port provided and no cached node information."); - } - - // Connect to the server. - await connect(host: _host!, port: _port!); - } else { - // Attempt to connect to the next failover server. - await connect( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - ); - } - } - } - } - - /// Connect to the server. - /// - /// If Tor is enabled, it will attempt to connect through Tor. - Future connect({ - required String host, - required int port, - }) async { - try { - // Cache node information. - _host = host; - _port = port; - - // If we're already connected, disconnect first. - try { - await _socket?.close(); - } catch (_) {} - - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock - .protect(() async => await _checkSocket(connecting: true)); - } else { - await _checkSocket(connecting: true); - } - - if (!Prefs.instance.useTor) { - // If we're not supposed to use Tor, then connect directly. - await connectClearnet(host, port); - } else { - // If we're supposed to use Tor... - if (_torService.status != TorConnectionStatus.connected) { - // ... but Tor isn't running... - if (!_prefs.torKillSwitch) { - // ... and the killswitch isn't set, then we'll connect clearnet. - Logging.instance.log( - "Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet", - level: LogLevel.Warning, - ); - await connectClearnet(host, port); - } else { - // ... but if the killswitch is set, then let's try to start Tor. - await _torService.start(); - // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. - - // Doublecheck that Tor is running. - if (_torService.status != TorConnectionStatus.connected) { - // If Tor still isn't running, then we'll throw an exception. - throw Exception( - "Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); - } - - // Connect via Tor. - await connectTor(host, port); - } - } else { - // Connect via Tor. - await connectTor(host, port); - } - } - - _updateConnectionStatus(true); - - if (_prefs.useTor) { - if (_socksSocket == null) { - final String msg = "SubscribableElectrumXClient.connect(): " - "cannot listen to $host:$port via SOCKSSocket because it is not connected."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - - _socksSocket!.listen( - _dataHandler, - onError: _errorHandler, - onDone: _doneHandler, - cancelOnError: true, - ); - } else { - if (_socket == null) { - final String msg = "SubscribableElectrumXClient.connect(): " - "cannot listen to $host:$port via socket because it is not connected."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - - _socket!.listen( - _dataHandler, - onError: _errorHandler, - onDone: _doneHandler, - cancelOnError: true, - ); - } - - _aliveTimer?.cancel(); - _aliveTimer = Timer.periodic( - _keepAlive, - (_) async => _updateConnectionStatus(await ping()), - ); - } catch (e, s) { - final msg = "SubscribableElectrumXClient.connect: " - "failed to connect to $host:$port." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - - // Ensure cleanup is performed on failure to avoid resource leaks. - await disconnect(); // Use the disconnect method to clean up. - rethrow; // Rethrow the exception to handle it further up the call stack. - } - } - - /// Connect to the server directly. - Future connectClearnet(String host, int port) async { - try { - Logging.instance.log( - "SubscribableElectrumXClient.connectClearnet(): " - "creating a socket to $host:$port (SSL $useSSL)...", - level: LogLevel.Info); - - if (_useSSL) { - _socket = await SecureSocket.connect( - host, - port, - timeout: _connectionTimeout, - onBadCertificate: (_) => - true, // TODO do not automatically trust bad certificates. - ); - } else { - _socket = await Socket.connect( - host, - port, - timeout: _connectionTimeout, - ); - } - - Logging.instance.log( - "SubscribableElectrumXClient.connectClearnet(): " - "created socket to $host:$port...", - level: LogLevel.Info); - } catch (e, s) { - final String msg = "SubscribableElectrumXClient.connectClearnet: " - "failed to connect to $host (SSL: $useSSL)." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - - return; - } - - /// Connect to the server using the Tor service. - Future connectTor(String host, int port) async { - // Get the proxy info from the TorService. - final proxyInfo = _torService.getProxyInfo(); - - try { - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "creating a SOCKS socket at $proxyInfo (SSL $useSSL)...", - level: LogLevel.Info); - - // Create a socks socket using the Tor service's proxy info. - _socksSocket = await SOCKSSocket.create( - proxyHost: proxyInfo.host.address, - proxyPort: proxyInfo.port, - sslEnabled: useSSL, - ); - - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "created SOCKS socket at $proxyInfo...", - level: LogLevel.Info); - } catch (e, s) { - final String msg = "SubscribableElectrumXClient.connectTor(): " - "failed to create a SOCKS socket at $proxyInfo (SSL $useSSL)..." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - - try { - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "connecting to SOCKS socket at $proxyInfo (SSL $useSSL)...", - level: LogLevel.Info); - - await _socksSocket?.connect(); - - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "connected to SOCKS socket at $proxyInfo...", - level: LogLevel.Info); - } catch (e, s) { - final String msg = "SubscribableElectrumXClient.connectTor(): " - "failed to connect to SOCKS socket at $proxyInfo.." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - - try { - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "connecting to $host:$port over SOCKS socket at $proxyInfo...", - level: LogLevel.Info); - - await _socksSocket?.connectTo(host, port); - - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "connected to $host:$port over SOCKS socket at $proxyInfo", - level: LogLevel.Info); - } catch (e, s) { - final String msg = "SubscribableElectrumXClient.connectTor(): " - "failed to connect $host over tor proxy at $proxyInfo." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - - return; - } - - /// Disconnect from the server. - Future disconnect() async { - _aliveTimer?.cancel(); - _aliveTimer = null; - - try { - await _socket?.close(); - } catch (e, s) { - Logging.instance.log( - "SubscribableElectrumXClient.disconnect: failed to close socket." - "\nError: $e\nStack trace: $s", - level: LogLevel.Warning); - } - _socket = null; - - try { - await _socksSocket?.close(); - } catch (e, s) { - Logging.instance.log( - "SubscribableElectrumXClient.disconnect: failed to close SOCKS socket." - "\nError: $e\nStack trace: $s", - level: LogLevel.Warning); - } - _socksSocket = null; - - onConnectionStatusChanged = null; - } - - /// Format JSON request string. - String _buildJsonRequestString({ - required String method, - required String id, - required List params, - }) { - final paramString = jsonEncode(params); - return '{"jsonrpc": "2.0", "id": "$id","method": "$method","params": $paramString}\r\n'; - } - - /// Update the connection status and call the onConnectionStatusChanged callback if it exists. - void _updateConnectionStatus(bool connectionStatus) { - if (_isConnected != connectionStatus && onConnectionStatusChanged != null) { - onConnectionStatusChanged!(connectionStatus); - } - _isConnected = connectionStatus; - } - - /// Called when the socket has data. - void _dataHandler(List data) { - _responseData.addAll(data); - - // 0x0A is newline - // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html - if (data.last == 0x0A) { - try { - final response = jsonDecode(String.fromCharCodes(_responseData)) - as Map; - _responseHandler(response); - } catch (e, s) { - Logging.instance - .log("JsonRPC jsonDecode: $e\n$s", level: LogLevel.Error); - rethrow; - } finally { - _responseData = []; - } - } - } - - /// Called when the socket has a response. - void _responseHandler(Map response) { - // subscriptions will have a method in the response - if (response['method'] is String) { - _subscriptionHandler(response: response); - return; - } - - final id = response['id'] as String; - final result = response['result']; - - _complete(id, result); - } - - /// Called when the subscription has a response. - void _subscriptionHandler({ - required Map response, - }) { - final method = response['method']; - switch (method) { - case "blockchain.scripthash.subscribe": - final params = response["params"] as List; - final scripthash = params.first as String; - final taskId = "blockchain.scripthash.subscribe:$scripthash"; - - _tasks[taskId]?.subscription?.addToStream(params.last); - break; - case "blockchain.headers.subscribe": - final params = response["params"]; - const taskId = "blockchain.headers.subscribe"; - - _tasks[taskId]?.subscription?.addToStream(params.first); - break; - default: - break; - } - } - - /// Called when the socket has an error. - void _errorHandler(Object error, StackTrace trace) { - _updateConnectionStatus(false); - Logging.instance.log( - "SubscribableElectrumXClient called _errorHandler with: $error\n$trace", - level: LogLevel.Info); - } - - /// Called when the socket is closed. - void _doneHandler() { - _updateConnectionStatus(false); - Logging.instance.log("SubscribableElectrumXClient called _doneHandler", - level: LogLevel.Info); - } - - /// Complete a task with the given id and data. - void _complete(String id, dynamic data) { - if (_tasks[id] == null) { - return; - } - - if (!(_tasks[id]?.completer?.isCompleted ?? false)) { - _tasks[id]?.completer?.complete(data); - } - - if (!(_tasks[id]?.isSubscription ?? false)) { - _tasks.remove(id); - } else { - _tasks[id]?.subscription?.addToStream(data); - } - } - - /// Add a task to the task list. - void _addTask({ - required String id, - required Completer completer, - }) { - _tasks[id] = SocketTask(completer: completer, subscription: null); - } - - /// Add a subscription task to the task list. - void _addSubscriptionTask({ - required String id, - required ElectrumXSubscription subscription, - }) { - _tasks[id] = SocketTask(completer: null, subscription: subscription); - } - - /// Write call to socket. - Future _call({ - required String method, - List params = const [], - }) async { - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkSocket()); - } else { - await _checkSocket(); - } - - // Check socket is connected. - if (_prefs.useTor) { - if (_socksSocket == null) { - final msg = "SubscribableElectrumXClient._call: " - "SOCKSSocket is not connected. Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } else { - if (_socket == null) { - final msg = "SubscribableElectrumXClient._call: " - "Socket is not connected. Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } - - final completer = Completer(); - _currentRequestID++; - final id = _currentRequestID.toString(); - - // Write to the socket. - try { - _addTask(id: id, completer: completer); - - if (_prefs.useTor) { - _socksSocket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } else { - _socket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } - - return completer.future; - } catch (e, s) { - final String msg = "SubscribableElectrumXClient._call: " - "failed to request $method with id $id." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - } - - /// Write call to socket with timeout. - Future _callWithTimeout({ - required String method, - List params = const [], - Duration timeout = const Duration(seconds: 2), - }) async { - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkSocket()); - } else { - await _checkSocket(); - } - - // Check socket is connected. - if (_prefs.useTor) { - if (_socksSocket == null) { - try { - if (_host == null || _port == null) { - throw Exception("No host or port provided"); - } - - // Attempt to conect. - await connect( - host: _host!, - port: _port!, - ); - } catch (e, s) { - final msg = "SubscribableElectrumXClient._callWithTimeout: " - "SOCKSSocket not connected and cannot connect. " - "Method $method, params $params." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } - } else { - if (_socket == null) { - try { - if (_host == null || _port == null) { - throw Exception("No host or port provided"); - } - - // Attempt to conect. - await connect( - host: _host!, - port: _port!, - ); - } catch (e, s) { - final msg = "SubscribableElectrumXClient._callWithTimeout: " - "Socket not connected and cannot connect. " - "Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } - } - - final completer = Completer(); - _currentRequestID++; - final id = _currentRequestID.toString(); - - // Write to the socket. - try { - _addTask(id: id, completer: completer); - - if (_prefs.useTor) { - _socksSocket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } else { - _socket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } - - Timer(timeout, () { - if (!completer.isCompleted) { - completer.completeError( - Exception("Request \"id: $id, method: $method\" timed out!"), - ); - } - }); - - return completer.future; - } catch (e, s) { - final String msg = "SubscribableElectrumXClient._callWithTimeout: " - "failed to request $method with id $id (timeout $timeout)." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - } - - ElectrumXSubscription _subscribe({ - required String id, - required String method, - List params = const [], - }) { - try { - final subscription = ElectrumXSubscription(); - _addSubscriptionTask(id: id, subscription: subscription); - _currentRequestID++; - - // Check socket is connected. - if (_prefs.useTor) { - if (_socksSocket == null) { - final msg = "SubscribableElectrumXClient._call: " - "SOCKSSocket is not connected. Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } else { - if (_socket == null) { - final msg = "SubscribableElectrumXClient._call: " - "Socket is not connected. Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } - - // Write to the socket. - if (_prefs.useTor) { - _socksSocket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } else { - _socket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } - - return subscription; - } catch (e, s) { - final String msg = "SubscribableElectrumXClient._subscribe: " - "failed to subscribe to $method with id $id." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - } - - /// Ping the server to ensure it is responding - /// - /// Returns true if ping succeeded - Future ping() async { - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkSocket()); - } else { - await _checkSocket(); - } - - // Write to the socket. - try { - final response = (await _callWithTimeout(method: "server.ping")) as Map; - return response.keys.contains("result") && response["result"] == null; - } catch (_) { - return false; - } - } - - /// Subscribe to a scripthash to receive notifications on status changes - ElectrumXSubscription subscribeToScripthash({required String scripthash}) { - return _subscribe( - id: 'blockchain.scripthash.subscribe:$scripthash', - method: 'blockchain.scripthash.subscribe', - params: [scripthash], - ); - } - - /// Subscribe to block headers to receive notifications on new blocks found - /// - /// Returns the existing subscription if found - ElectrumXSubscription subscribeToBlockHeaders() { - return _tasks["blockchain.headers.subscribe"]?.subscription ?? - _subscribe( - id: "blockchain.headers.subscribe", - method: "blockchain.headers.subscribe", - params: [], - ); - } -} +// /* +// * This file is part of Stack Wallet. +// * +// * Copyright (c) 2023 Cypher Stack +// * All Rights Reserved. +// * The code is distributed under GPLv3 license, see LICENSE file for details. +// * Generated by Cypher Stack on 2023-05-26 +// * +// */ +// +// import 'dart:async'; +// import 'dart:convert'; +// import 'dart:io'; +// +// import 'package:event_bus/event_bus.dart'; +// import 'package:mutex/mutex.dart'; +// import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; +// import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart'; +// import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; +// import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; +// import 'package:stackwallet/services/event_bus/global_event_bus.dart'; +// import 'package:stackwallet/services/tor_service.dart'; +// import 'package:stackwallet/utilities/logger.dart'; +// import 'package:stackwallet/utilities/prefs.dart'; +// import 'package:tor_ffi_plugin/socks_socket.dart'; +// +// class ElectrumXSubscription { +// final StreamController _controller = +// StreamController(); // TODO controller params +// +// Stream get responseStream => _controller.stream; +// +// void addToStream(dynamic data) => _controller.add(data); +// } +// +// class SocketTask { +// SocketTask({this.completer, this.subscription}); +// +// final Completer? completer; +// final ElectrumXSubscription? subscription; +// +// bool get isSubscription => subscription != null; +// } +// +// class SubscribableElectrumXClient { +// int _currentRequestID = 0; +// bool _isConnected = false; +// List _responseData = []; +// final Map _tasks = {}; +// Timer? _aliveTimer; +// Socket? _socket; +// SOCKSSocket? _socksSocket; +// late final bool _useSSL; +// late final Duration _connectionTimeout; +// late final Duration _keepAlive; +// +// bool get isConnected => _isConnected; +// bool get useSSL => _useSSL; +// // Used to reconnect. +// String? _host; +// int? _port; +// +// void Function(bool)? onConnectionStatusChanged; +// +// late Prefs _prefs; +// late TorService _torService; +// StreamSubscription? _torPreferenceListener; +// StreamSubscription? _torStatusListener; +// final Mutex _torConnectingLock = Mutex(); +// bool _requireMutex = false; +// +// List? failovers; +// int currentFailoverIndex = -1; +// +// SubscribableElectrumXClient({ +// required bool useSSL, +// required Prefs prefs, +// required List failovers, +// TorService? torService, +// this.onConnectionStatusChanged, +// Duration connectionTimeout = const Duration(seconds: 5), +// Duration keepAlive = const Duration(seconds: 10), +// EventBus? globalEventBusForTesting, +// }) { +// _useSSL = useSSL; +// _prefs = prefs; +// _torService = torService ?? TorService.sharedInstance; +// _connectionTimeout = connectionTimeout; +// _keepAlive = keepAlive; +// +// // If we're testing, use the global event bus for testing. +// final bus = globalEventBusForTesting ?? GlobalEventBus.instance; +// +// // Listen to global event bus for Tor status changes. +// _torStatusListener = bus.on().listen( +// (event) async { +// try { +// switch (event.newStatus) { +// case TorConnectionStatus.connecting: +// // If Tor is connecting, we need to wait. +// await _torConnectingLock.acquire(); +// _requireMutex = true; +// break; +// +// case TorConnectionStatus.connected: +// case TorConnectionStatus.disconnected: +// // If Tor is connected or disconnected, we can release the lock. +// if (_torConnectingLock.isLocked) { +// _torConnectingLock.release(); +// } +// _requireMutex = false; +// break; +// } +// } finally { +// // Ensure the lock is released. +// if (_torConnectingLock.isLocked) { +// _torConnectingLock.release(); +// } +// } +// }, +// ); +// +// // Listen to global event bus for Tor preference changes. +// _torPreferenceListener = bus.on().listen( +// (event) async { +// // Close open socket (if open). +// final tempSocket = _socket; +// _socket = null; +// await tempSocket?.close(); +// +// // Close open SOCKS socket (if open). +// final tempSOCKSSocket = _socksSocket; +// _socksSocket = null; +// await tempSOCKSSocket?.close(); +// +// // Clear subscriptions. +// _tasks.clear(); +// +// // Cancel alive timer +// _aliveTimer?.cancel(); +// }, +// ); +// } +// +// factory SubscribableElectrumXClient.from({ +// required ElectrumXNode node, +// required Prefs prefs, +// required List failovers, +// TorService? torService, +// }) { +// return SubscribableElectrumXClient( +// useSSL: node.useSSL, +// prefs: prefs, +// failovers: failovers, +// torService: torService ?? TorService.sharedInstance, +// ); +// } +// +// // Example for returning a future which completes upon connection. +// // static Future from({ +// // required ElectrumXNode node, +// // TorService? torService, +// // }) async { +// // final client = SubscribableElectrumXClient( +// // useSSL: node.useSSL, +// // ); +// // +// // await client.connect(host: node.address, port: node.port); +// // +// // return client; +// // } +// +// /// Check if the RPC client is connected and connect if needed. +// /// +// /// If Tor is enabled but not running, it will attempt to start Tor. +// Future _checkSocket({bool connecting = false}) async { +// if (_prefs.useTor) { +// // If we're supposed to use Tor... +// if (_torService.status != TorConnectionStatus.connected) { +// // ... but Tor isn't running... +// if (!_prefs.torKillSwitch) { +// // ... and the killswitch isn't set, then we'll just return below. +// Logging.instance.log( +// "Tor preference set but Tor is not enabled, killswitch not set, connecting to ElectrumX through clearnet.", +// level: LogLevel.Warning, +// ); +// } else { +// // ... but if the killswitch is set, then let's try to start Tor. +// await _torService.start(); +// // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. +// +// // Double-check that Tor is running. +// if (_torService.status != TorConnectionStatus.connected) { +// // If Tor still isn't running, then we'll throw an exception. +// throw Exception("SubscribableElectrumXClient._checkRpcClient: " +// "Tor preference and killswitch set but Tor not enabled and could not start, not connecting to ElectrumX."); +// } +// } +// } +// } +// +// // Connect if needed. +// if (!connecting) { +// if ((!_prefs.useTor && _socket == null) || +// (_prefs.useTor && _socksSocket == null)) { +// if (currentFailoverIndex == -1) { +// // Check if we have cached node information +// if (_host == null && _port == null) { +// throw Exception("SubscribableElectrumXClient._checkRpcClient: " +// "No host or port provided and no cached node information."); +// } +// +// // Connect to the server. +// await connect(host: _host!, port: _port!); +// } else { +// // Attempt to connect to the next failover server. +// await connect( +// host: failovers![currentFailoverIndex].address, +// port: failovers![currentFailoverIndex].port, +// ); +// } +// } +// } +// } +// +// /// Connect to the server. +// /// +// /// If Tor is enabled, it will attempt to connect through Tor. +// Future connect({ +// required String host, +// required int port, +// }) async { +// try { +// // Cache node information. +// _host = host; +// _port = port; +// +// // If we're already connected, disconnect first. +// try { +// await _socket?.close(); +// } catch (_) {} +// +// // If we're connecting to Tor, wait. +// if (_requireMutex) { +// await _torConnectingLock +// .protect(() async => await _checkSocket(connecting: true)); +// } else { +// await _checkSocket(connecting: true); +// } +// +// if (!Prefs.instance.useTor) { +// // If we're not supposed to use Tor, then connect directly. +// await connectClearnet(host, port); +// } else { +// // If we're supposed to use Tor... +// if (_torService.status != TorConnectionStatus.connected) { +// // ... but Tor isn't running... +// if (!_prefs.torKillSwitch) { +// // ... and the killswitch isn't set, then we'll connect clearnet. +// Logging.instance.log( +// "Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet", +// level: LogLevel.Warning, +// ); +// await connectClearnet(host, port); +// } else { +// // ... but if the killswitch is set, then let's try to start Tor. +// await _torService.start(); +// // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. +// +// // Doublecheck that Tor is running. +// if (_torService.status != TorConnectionStatus.connected) { +// // If Tor still isn't running, then we'll throw an exception. +// throw Exception( +// "Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); +// } +// +// // Connect via Tor. +// await connectTor(host, port); +// } +// } else { +// // Connect via Tor. +// await connectTor(host, port); +// } +// } +// +// _updateConnectionStatus(true); +// +// if (_prefs.useTor) { +// if (_socksSocket == null) { +// final String msg = "SubscribableElectrumXClient.connect(): " +// "cannot listen to $host:$port via SOCKSSocket because it is not connected."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// +// _socksSocket!.listen( +// _dataHandler, +// onError: _errorHandler, +// onDone: _doneHandler, +// cancelOnError: true, +// ); +// } else { +// if (_socket == null) { +// final String msg = "SubscribableElectrumXClient.connect(): " +// "cannot listen to $host:$port via socket because it is not connected."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// +// _socket!.listen( +// _dataHandler, +// onError: _errorHandler, +// onDone: _doneHandler, +// cancelOnError: true, +// ); +// } +// +// _aliveTimer?.cancel(); +// _aliveTimer = Timer.periodic( +// _keepAlive, +// (_) async => _updateConnectionStatus(await ping()), +// ); +// } catch (e, s) { +// final msg = "SubscribableElectrumXClient.connect: " +// "failed to connect to $host:$port." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// +// // Ensure cleanup is performed on failure to avoid resource leaks. +// await disconnect(); // Use the disconnect method to clean up. +// rethrow; // Rethrow the exception to handle it further up the call stack. +// } +// } +// +// /// Connect to the server directly. +// Future connectClearnet(String host, int port) async { +// try { +// Logging.instance.log( +// "SubscribableElectrumXClient.connectClearnet(): " +// "creating a socket to $host:$port (SSL $useSSL)...", +// level: LogLevel.Info); +// +// if (_useSSL) { +// _socket = await SecureSocket.connect( +// host, +// port, +// timeout: _connectionTimeout, +// onBadCertificate: (_) => +// true, // TODO do not automatically trust bad certificates. +// ); +// } else { +// _socket = await Socket.connect( +// host, +// port, +// timeout: _connectionTimeout, +// ); +// } +// +// Logging.instance.log( +// "SubscribableElectrumXClient.connectClearnet(): " +// "created socket to $host:$port...", +// level: LogLevel.Info); +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient.connectClearnet: " +// "failed to connect to $host (SSL: $useSSL)." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// +// return; +// } +// +// /// Connect to the server using the Tor service. +// Future connectTor(String host, int port) async { +// // Get the proxy info from the TorService. +// final proxyInfo = _torService.getProxyInfo(); +// +// try { +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "creating a SOCKS socket at $proxyInfo (SSL $useSSL)...", +// level: LogLevel.Info); +// +// // Create a socks socket using the Tor service's proxy info. +// _socksSocket = await SOCKSSocket.create( +// proxyHost: proxyInfo.host.address, +// proxyPort: proxyInfo.port, +// sslEnabled: useSSL, +// ); +// +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "created SOCKS socket at $proxyInfo...", +// level: LogLevel.Info); +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient.connectTor(): " +// "failed to create a SOCKS socket at $proxyInfo (SSL $useSSL)..." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// +// try { +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "connecting to SOCKS socket at $proxyInfo (SSL $useSSL)...", +// level: LogLevel.Info); +// +// await _socksSocket?.connect(); +// +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "connected to SOCKS socket at $proxyInfo...", +// level: LogLevel.Info); +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient.connectTor(): " +// "failed to connect to SOCKS socket at $proxyInfo.." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// +// try { +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "connecting to $host:$port over SOCKS socket at $proxyInfo...", +// level: LogLevel.Info); +// +// await _socksSocket?.connectTo(host, port); +// +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "connected to $host:$port over SOCKS socket at $proxyInfo", +// level: LogLevel.Info); +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient.connectTor(): " +// "failed to connect $host over tor proxy at $proxyInfo." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// +// return; +// } +// +// /// Disconnect from the server. +// Future disconnect() async { +// _aliveTimer?.cancel(); +// _aliveTimer = null; +// +// try { +// await _socket?.close(); +// } catch (e, s) { +// Logging.instance.log( +// "SubscribableElectrumXClient.disconnect: failed to close socket." +// "\nError: $e\nStack trace: $s", +// level: LogLevel.Warning); +// } +// _socket = null; +// +// try { +// await _socksSocket?.close(); +// } catch (e, s) { +// Logging.instance.log( +// "SubscribableElectrumXClient.disconnect: failed to close SOCKS socket." +// "\nError: $e\nStack trace: $s", +// level: LogLevel.Warning); +// } +// _socksSocket = null; +// +// onConnectionStatusChanged = null; +// } +// +// /// Format JSON request string. +// String _buildJsonRequestString({ +// required String method, +// required String id, +// required List params, +// }) { +// final paramString = jsonEncode(params); +// return '{"jsonrpc": "2.0", "id": "$id","method": "$method","params": $paramString}\r\n'; +// } +// +// /// Update the connection status and call the onConnectionStatusChanged callback if it exists. +// void _updateConnectionStatus(bool connectionStatus) { +// if (_isConnected != connectionStatus && onConnectionStatusChanged != null) { +// onConnectionStatusChanged!(connectionStatus); +// } +// _isConnected = connectionStatus; +// } +// +// /// Called when the socket has data. +// void _dataHandler(List data) { +// _responseData.addAll(data); +// +// // 0x0A is newline +// // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html +// if (data.last == 0x0A) { +// try { +// final response = jsonDecode(String.fromCharCodes(_responseData)) +// as Map; +// _responseHandler(response); +// } catch (e, s) { +// Logging.instance +// .log("JsonRPC jsonDecode: $e\n$s", level: LogLevel.Error); +// rethrow; +// } finally { +// _responseData = []; +// } +// } +// } +// +// /// Called when the socket has a response. +// void _responseHandler(Map response) { +// // subscriptions will have a method in the response +// if (response['method'] is String) { +// _subscriptionHandler(response: response); +// return; +// } +// +// final id = response['id'] as String; +// final result = response['result']; +// +// _complete(id, result); +// } +// +// /// Called when the subscription has a response. +// void _subscriptionHandler({ +// required Map response, +// }) { +// final method = response['method']; +// switch (method) { +// case "blockchain.scripthash.subscribe": +// final params = response["params"] as List; +// final scripthash = params.first as String; +// final taskId = "blockchain.scripthash.subscribe:$scripthash"; +// +// _tasks[taskId]?.subscription?.addToStream(params.last); +// break; +// case "blockchain.headers.subscribe": +// final params = response["params"]; +// const taskId = "blockchain.headers.subscribe"; +// +// _tasks[taskId]?.subscription?.addToStream(params.first); +// break; +// default: +// break; +// } +// } +// +// /// Called when the socket has an error. +// void _errorHandler(Object error, StackTrace trace) { +// _updateConnectionStatus(false); +// Logging.instance.log( +// "SubscribableElectrumXClient called _errorHandler with: $error\n$trace", +// level: LogLevel.Info); +// } +// +// /// Called when the socket is closed. +// void _doneHandler() { +// _updateConnectionStatus(false); +// Logging.instance.log("SubscribableElectrumXClient called _doneHandler", +// level: LogLevel.Info); +// } +// +// /// Complete a task with the given id and data. +// void _complete(String id, dynamic data) { +// if (_tasks[id] == null) { +// return; +// } +// +// if (!(_tasks[id]?.completer?.isCompleted ?? false)) { +// _tasks[id]?.completer?.complete(data); +// } +// +// if (!(_tasks[id]?.isSubscription ?? false)) { +// _tasks.remove(id); +// } else { +// _tasks[id]?.subscription?.addToStream(data); +// } +// } +// +// /// Add a task to the task list. +// void _addTask({ +// required String id, +// required Completer completer, +// }) { +// _tasks[id] = SocketTask(completer: completer, subscription: null); +// } +// +// /// Add a subscription task to the task list. +// void _addSubscriptionTask({ +// required String id, +// required ElectrumXSubscription subscription, +// }) { +// _tasks[id] = SocketTask(completer: null, subscription: subscription); +// } +// +// /// Write call to socket. +// Future _call({ +// required String method, +// List params = const [], +// }) async { +// // If we're connecting to Tor, wait. +// if (_requireMutex) { +// await _torConnectingLock.protect(() async => await _checkSocket()); +// } else { +// await _checkSocket(); +// } +// +// // Check socket is connected. +// if (_prefs.useTor) { +// if (_socksSocket == null) { +// final msg = "SubscribableElectrumXClient._call: " +// "SOCKSSocket is not connected. Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } else { +// if (_socket == null) { +// final msg = "SubscribableElectrumXClient._call: " +// "Socket is not connected. Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } +// +// final completer = Completer(); +// _currentRequestID++; +// final id = _currentRequestID.toString(); +// +// // Write to the socket. +// try { +// _addTask(id: id, completer: completer); +// +// if (_prefs.useTor) { +// _socksSocket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } else { +// _socket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } +// +// return completer.future; +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient._call: " +// "failed to request $method with id $id." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// } +// +// /// Write call to socket with timeout. +// Future _callWithTimeout({ +// required String method, +// List params = const [], +// Duration timeout = const Duration(seconds: 2), +// }) async { +// // If we're connecting to Tor, wait. +// if (_requireMutex) { +// await _torConnectingLock.protect(() async => await _checkSocket()); +// } else { +// await _checkSocket(); +// } +// +// // Check socket is connected. +// if (_prefs.useTor) { +// if (_socksSocket == null) { +// try { +// if (_host == null || _port == null) { +// throw Exception("No host or port provided"); +// } +// +// // Attempt to conect. +// await connect( +// host: _host!, +// port: _port!, +// ); +// } catch (e, s) { +// final msg = "SubscribableElectrumXClient._callWithTimeout: " +// "SOCKSSocket not connected and cannot connect. " +// "Method $method, params $params." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } +// } else { +// if (_socket == null) { +// try { +// if (_host == null || _port == null) { +// throw Exception("No host or port provided"); +// } +// +// // Attempt to conect. +// await connect( +// host: _host!, +// port: _port!, +// ); +// } catch (e, s) { +// final msg = "SubscribableElectrumXClient._callWithTimeout: " +// "Socket not connected and cannot connect. " +// "Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } +// } +// +// final completer = Completer(); +// _currentRequestID++; +// final id = _currentRequestID.toString(); +// +// // Write to the socket. +// try { +// _addTask(id: id, completer: completer); +// +// if (_prefs.useTor) { +// _socksSocket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } else { +// _socket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } +// +// Timer(timeout, () { +// if (!completer.isCompleted) { +// completer.completeError( +// Exception("Request \"id: $id, method: $method\" timed out!"), +// ); +// } +// }); +// +// return completer.future; +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient._callWithTimeout: " +// "failed to request $method with id $id (timeout $timeout)." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// } +// +// ElectrumXSubscription _subscribe({ +// required String id, +// required String method, +// List params = const [], +// }) { +// try { +// final subscription = ElectrumXSubscription(); +// _addSubscriptionTask(id: id, subscription: subscription); +// _currentRequestID++; +// +// // Check socket is connected. +// if (_prefs.useTor) { +// if (_socksSocket == null) { +// final msg = "SubscribableElectrumXClient._call: " +// "SOCKSSocket is not connected. Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } else { +// if (_socket == null) { +// final msg = "SubscribableElectrumXClient._call: " +// "Socket is not connected. Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } +// +// // Write to the socket. +// if (_prefs.useTor) { +// _socksSocket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } else { +// _socket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } +// +// return subscription; +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient._subscribe: " +// "failed to subscribe to $method with id $id." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// } +// +// /// Ping the server to ensure it is responding +// /// +// /// Returns true if ping succeeded +// Future ping() async { +// // If we're connecting to Tor, wait. +// if (_requireMutex) { +// await _torConnectingLock.protect(() async => await _checkSocket()); +// } else { +// await _checkSocket(); +// } +// +// // Write to the socket. +// try { +// final response = (await _callWithTimeout(method: "server.ping")) as Map; +// return response.keys.contains("result") && response["result"] == null; +// } catch (_) { +// return false; +// } +// } +// +// /// Subscribe to a scripthash to receive notifications on status changes +// ElectrumXSubscription subscribeToScripthash({required String scripthash}) { +// return _subscribe( +// id: 'blockchain.scripthash.subscribe:$scripthash', +// method: 'blockchain.scripthash.subscribe', +// params: [scripthash], +// ); +// } +// +// /// Subscribe to block headers to receive notifications on new blocks found +// /// +// /// Returns the existing subscription if found +// ElectrumXSubscription subscribeToBlockHeaders() { +// return _tasks["blockchain.headers.subscribe"]?.subscription ?? +// _subscribe( +// id: "blockchain.headers.subscribe", +// method: "blockchain.headers.subscribe", +// params: [], +// ); +// } +// } diff --git a/lib/pages/settings_views/global_settings_view/manage_nodes_views/add_edit_node_view.dart b/lib/pages/settings_views/global_settings_view/manage_nodes_views/add_edit_node_view.dart index e03c3ab21..ddac06aca 100644 --- a/lib/pages/settings_views/global_settings_view/manage_nodes_views/add_edit_node_view.dart +++ b/lib/pages/settings_views/global_settings_view/manage_nodes_views/add_edit_node_view.dart @@ -177,6 +177,7 @@ class _AddEditNodeViewState extends ConsumerState { useSSL: formData.useSSL!, failovers: [], prefs: ref.read(prefsChangeNotifierProvider), + coin: coin, ); try { diff --git a/lib/pages/settings_views/global_settings_view/manage_nodes_views/node_details_view.dart b/lib/pages/settings_views/global_settings_view/manage_nodes_views/node_details_view.dart index 6bf0092e8..bd974b6ec 100644 --- a/lib/pages/settings_views/global_settings_view/manage_nodes_views/node_details_view.dart +++ b/lib/pages/settings_views/global_settings_view/manage_nodes_views/node_details_view.dart @@ -154,6 +154,7 @@ class _NodeDetailsViewState extends ConsumerState { useSSL: node.useSSL, failovers: [], prefs: ref.read(prefsChangeNotifierProvider), + coin: coin, ); try { diff --git a/lib/services/notifications_service.dart b/lib/services/notifications_service.dart index 1512c12c6..e6c49f47f 100644 --- a/lib/services/notifications_service.dart +++ b/lib/services/notifications_service.dart @@ -146,6 +146,7 @@ class NotificationsService extends ChangeNotifier { node: eNode, failovers: failovers, prefs: prefs, + coin: coin, ); final tx = await client.getTransaction(txHash: txid); diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 6a044ff15..407bfa3c0 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -4,40 +4,59 @@ import 'dart:math'; import 'package:bip47/src/util.dart'; import 'package:bitcoindart/bitcoindart.dart' as bitcoindart; import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib; +import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter; +import 'package:electrum_adapter/electrum_adapter.dart'; import 'package:isar/isar.dart'; +import 'package:mutex/mutex.dart'; import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; -import 'package:stackwallet/electrumx_rpc/subscribable_electrumx_client.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/input_v2.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/output_v2.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/transaction_v2.dart'; import 'package:stackwallet/models/isar/models/isar_models.dart'; import 'package:stackwallet/models/paymint/fee_object_model.dart'; import 'package:stackwallet/models/signing_data.dart'; +import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; +import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; +import 'package:stackwallet/services/event_bus/global_event_bus.dart'; +import 'package:stackwallet/services/tor_service.dart'; import 'package:stackwallet/utilities/amount/amount.dart'; import 'package:stackwallet/utilities/enums/coin_enum.dart'; import 'package:stackwallet/utilities/enums/derive_path_type_enum.dart'; import 'package:stackwallet/utilities/enums/fee_rate_type_enum.dart'; import 'package:stackwallet/utilities/logger.dart'; import 'package:stackwallet/utilities/paynym_is_api.dart'; +import 'package:stackwallet/utilities/prefs.dart'; import 'package:stackwallet/wallets/crypto_currency/coins/firo.dart'; import 'package:stackwallet/wallets/crypto_currency/intermediate/bip39_hd_currency.dart'; import 'package:stackwallet/wallets/models/tx_data.dart'; import 'package:stackwallet/wallets/wallet/impl/bitcoin_wallet.dart'; import 'package:stackwallet/wallets/wallet/intermediate/bip39_hd_wallet.dart'; import 'package:stackwallet/wallets/wallet/wallet_mixin_interfaces/paynym_interface.dart'; -import 'package:uuid/uuid.dart'; +import 'package:stream_channel/stream_channel.dart'; mixin ElectrumXInterface on Bip39HDWallet { late ElectrumXClient electrumXClient; + late StreamChannel electrumAdapterChannel; + late ElectrumClient electrumAdapterClient; late CachedElectrumXClient electrumXCachedClient; - late SubscribableElectrumXClient subscribableElectrumXClient; + // late SubscribableElectrumXClient subscribableElectrumXClient; int? get maximumFeerate => null; int? _latestHeight; + late Prefs _prefs; + late TorService _torService; + StreamSubscription? _torPreferenceListener; + StreamSubscription? _torStatusListener; + final Mutex _torConnectingLock = Mutex(); + bool _requireMutex = false; + Timer? _aliveTimer; + static const Duration _keepAlive = Duration(minutes: 1); + bool _isConnected = false; + static const _kServerBatchCutoffVersion = [1, 6]; List? _serverVersion; bool get serverCanBatch { @@ -812,38 +831,94 @@ mixin ElectrumXInterface on Bip39HDWallet { // Make sure we only complete once. final isFirstResponse = _latestHeight == null; - // Subscribe to block headers. - final subscription = - subscribableElectrumXClient.subscribeToBlockHeaders(); + // Check Electrum and update internal and cached versions if necessary. + await electrumXClient.checkElectrumAdapter(); + if (electrumAdapterChannel != electrumXClient.electrumAdapterChannel && + electrumXClient.electrumAdapterChannel != null) { + electrumAdapterChannel = electrumXClient.electrumAdapterChannel!; + } + if (electrumAdapterClient != electrumXClient.electrumAdapterClient && + electrumXClient.electrumAdapterClient != null) { + electrumAdapterClient = electrumXClient.electrumAdapterClient!; + } + // electrumXCachedClient.electrumAdapterChannel = electrumAdapterChannel; + if (electrumXCachedClient.electrumAdapterClient != + electrumAdapterClient) { + electrumXCachedClient.electrumAdapterClient = electrumAdapterClient; + } - // set stream subscription + // Subscribe to and listen for new block headers. + final stream = electrumAdapterClient.subscribeHeaders(); ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = - subscription.responseStream.asBroadcastStream().listen((event) { - final response = event; - if (response != null && - response is Map && - response.containsKey('height')) { - final int chainHeight = response['height'] as int; - // print("Current chain height: $chainHeight"); + stream.asBroadcastStream().listen((response) { + final int chainHeight = response.height; + // print("Current chain height: $chainHeight"); - _latestHeight = chainHeight; + _latestHeight = chainHeight; - if (isFirstResponse && !completer.isCompleted) { - // Return the chain height. - completer.complete(chainHeight); - } - } else { - Logging.instance.log( - "blockchain.headers.subscribe returned malformed response\n" - "Response: $response", - level: LogLevel.Error); + if (isFirstResponse && !completer.isCompleted) { + // Return the chain height. + completer.complete(chainHeight); } }); - return _latestHeight ?? await completer.future; - } - // Don't set a stream subscription if one already exists. - else { + // If we're testing, use the global event bus for testing. + // final bus = globalEventBusForTesting ?? GlobalEventBus.instance; + // No constructors for mixins, so no globalEventBusForTesting is passed in. + final bus = GlobalEventBus.instance; + + // Listen to global event bus for Tor status changes. + _torStatusListener ??= bus.on().listen( + (event) async { + try { + switch (event.newStatus) { + case TorConnectionStatus.connecting: + // If Tor is connecting, we need to wait. + await _torConnectingLock.acquire(); + _requireMutex = true; + break; + + case TorConnectionStatus.connected: + case TorConnectionStatus.disconnected: + // If Tor is connected or disconnected, we can release the lock. + if (_torConnectingLock.isLocked) { + _torConnectingLock.release(); + } + _requireMutex = false; + break; + } + } finally { + // Ensure the lock is released. + if (_torConnectingLock.isLocked) { + _torConnectingLock.release(); + } + } + }, + ); + + // Listen to global event bus for Tor preference changes. + _torPreferenceListener ??= bus.on().listen( + (event) async { + // Close any open subscriptions. + for (final coinSub + in ElectrumxChainHeightService.subscriptions.entries) { + await coinSub.value?.cancel(); + } + + // Cancel alive timer + _aliveTimer?.cancel(); + }, + ); + + // Set a timer to check if the subscription is still alive. + _aliveTimer?.cancel(); + _aliveTimer = Timer.periodic( + _keepAlive, + (_) async => _updateConnectionStatus(await electrumXClient.ping()), + ); + } else { + // Don't set a stream subscription if one already exists. + // Check if the stream subscription is paused. if (ElectrumxChainHeightService .subscriptions[cryptoCurrency.coin]!.isPaused) { @@ -852,19 +927,6 @@ mixin ElectrumXInterface on Bip39HDWallet { .resume(); } - // Causes synchronization to stall. - // // Check if the stream subscription is active by pinging it. - // if (!(await subscribableElectrumXClient.ping())) { - // // If it's not active, reconnect it. - // final node = await getCurrentElectrumXNode(); - // - // await subscribableElectrumXClient.connect( - // host: node.address, port: node.port); - // - // // Wait for first response. - // return completer.future; - // } - if (_latestHeight != null) { return _latestHeight!; } @@ -889,7 +951,7 @@ mixin ElectrumXInterface on Bip39HDWallet { return transactions.length; } - Future> fetchTxCountBatched({ + Future> fetchTxCountBatched({ required Map addresses, }) async { try { @@ -901,7 +963,7 @@ mixin ElectrumXInterface on Bip39HDWallet { } final response = await electrumXClient.getBatchHistory(args: args); - final Map result = {}; + final Map result = {}; for (final entry in response.entries) { result[entry.key] = entry.value.length; } @@ -939,21 +1001,73 @@ mixin ElectrumXInterface on Bip39HDWallet { .toList(); final newNode = await getCurrentElectrumXNode(); + try { + await electrumXClient.electrumAdapterClient?.close(); + } catch (e, s) { + if (e.toString().contains("initialized")) { + // Ignore. This should happen every first time the wallet is opened. + } else { + Logging.instance + .log("Error closing electrumXClient: $e", level: LogLevel.Error); + } + } electrumXClient = ElectrumXClient.from( node: newNode, prefs: prefs, failovers: failovers, + coin: cryptoCurrency.coin, ); + electrumAdapterChannel = await electrum_adapter.connect( + newNode.address, + port: newNode.port, + acceptUnverified: true, + useSSL: newNode.useSSL, + proxyInfo: Prefs.instance.useTor + ? TorService.sharedInstance.getProxyInfo() + : null, + ); + if (electrumXClient.coin == Coin.firo || + electrumXClient.coin == Coin.firoTestNet) { + electrumAdapterClient = FiroElectrumClient( + electrumAdapterChannel, + newNode.address, + newNode.port, + newNode.useSSL, + Prefs.instance.useTor + ? TorService.sharedInstance.getProxyInfo() + : null); + } else { + electrumAdapterClient = ElectrumClient( + electrumAdapterChannel, + newNode.address, + newNode.port, + newNode.useSSL, + Prefs.instance.useTor + ? TorService.sharedInstance.getProxyInfo() + : null); + } electrumXCachedClient = CachedElectrumXClient.from( electrumXClient: electrumXClient, + electrumAdapterClient: electrumAdapterClient, + electrumAdapterUpdateCallback: updateClient, ); - subscribableElectrumXClient = SubscribableElectrumXClient.from( - node: newNode, - prefs: prefs, - failovers: failovers, - ); - await subscribableElectrumXClient.connect( - host: newNode.address, port: newNode.port); + // Replaced using electrum_adapters' SubscribableClient in fetchChainHeight. + // subscribableElectrumXClient = SubscribableElectrumXClient.from( + // node: newNode, + // prefs: prefs, + // failovers: failovers, + // ); + // await subscribableElectrumXClient.connect( + // host: newNode.address, port: newNode.port); + } + + /// Update the connection status and call the onConnectionStatusChanged callback if it exists. + void _updateConnectionStatus(bool connectionStatus) { + // TODO [prio=low]: Set onConnectionStatusChanged callback. + // if (_isConnected != connectionStatus && onConnectionStatusChanged != null) { + // onConnectionStatusChanged!(connectionStatus); + // } + _isConnected = connectionStatus; } //============================================================================ @@ -1115,21 +1229,22 @@ mixin ElectrumXInterface on Bip39HDWallet { List> allTxHashes = []; if (serverCanBatch) { - final Map>> batches = {}; - final Map requestIdToAddressMap = {}; + final Map>> batches = {}; + final Map requestIdToAddressMap = {}; const batchSizeMax = 100; int batchNumber = 0; for (int i = 0; i < allAddresses.length; i++) { - if (batches[batchNumber] == null) { - batches[batchNumber] = {}; + if (batches["$batchNumber"] == null) { + batches["$batchNumber"] = {}; } final scriptHash = cryptoCurrency.addressToScriptHash( address: allAddresses.elementAt(i), ); - final id = Logger.isTestEnv ? "$i" : const Uuid().v1(); - requestIdToAddressMap[id] = allAddresses.elementAt(i); - batches[batchNumber]!.addAll({ - id: [scriptHash] + // final id = Logger.isTestEnv ? "$i" : const Uuid().v1(); + // TODO [prio=???]: Pass request IDs to electrum_adapter. + requestIdToAddressMap[i] = allAddresses.elementAt(i); + batches["$batchNumber"]!.addAll({ + "$i": [scriptHash] }); if (i % batchSizeMax == batchSizeMax - 1) { batchNumber++; @@ -1138,7 +1253,7 @@ mixin ElectrumXInterface on Bip39HDWallet { for (int i = 0; i < batches.length; i++) { final response = - await electrumXClient.getBatchHistory(args: batches[i]!); + await electrumXClient.getBatchHistory(args: batches["$i"]!); for (final entry in response.entries) { for (int j = 0; j < entry.value.length; j++) { entry.value[j]["address"] = requestIdToAddressMap[entry.key]; @@ -1186,6 +1301,8 @@ mixin ElectrumXInterface on Bip39HDWallet { coin: cryptoCurrency.coin, ); + print("txn: $txn"); + final vout = jsonUTXO["tx_pos"] as int; final outputs = txn["vout"] as List; @@ -1254,6 +1371,13 @@ mixin ElectrumXInterface on Bip39HDWallet { await updateElectrumX(newNode: node); } + Future updateClient() async { + Logging.instance.log("Updating electrum node and ElectrumAdapterClient.", + level: LogLevel.Info); + await updateNode(); + return electrumAdapterClient; + } + FeeObject? _cachedFees; @override diff --git a/lib/widgets/node_card.dart b/lib/widgets/node_card.dart index 7576999de..bc7233065 100644 --- a/lib/widgets/node_card.dart +++ b/lib/widgets/node_card.dart @@ -175,6 +175,7 @@ class _NodeCardState extends ConsumerState { useSSL: node.useSSL, failovers: [], prefs: ref.read(prefsChangeNotifierProvider), + coin: widget.coin, ); try { diff --git a/lib/widgets/node_options_sheet.dart b/lib/widgets/node_options_sheet.dart index c14b6a2ec..0a2e5ac4c 100644 --- a/lib/widgets/node_options_sheet.dart +++ b/lib/widgets/node_options_sheet.dart @@ -158,6 +158,7 @@ class NodeOptionsSheet extends ConsumerWidget { failovers: [], prefs: ref.read(prefsChangeNotifierProvider), torService: ref.read(pTorService), + coin: coin, ); try { diff --git a/pubspec.lock b/pubspec.lock index 840efc472..c0e5b16c1 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -524,6 +524,15 @@ packages: url: "https://pub.dev" source: hosted version: "1.0.2" + electrum_adapter: + dependency: "direct main" + description: + path: "." + ref: "51b7a60e07b0409b361e31da65d98178ee235bed" + resolved-ref: "51b7a60e07b0409b361e31da65d98178ee235bed" + url: "https://github.com/cypherstack/electrum_adapter.git" + source: git + version: "3.0.0" emojis: dependency: "direct main" description: @@ -674,10 +683,10 @@ packages: dependency: "direct dev" description: name: flutter_lints - sha256: a25a15ebbdfc33ab1cd26c63a6ee519df92338a9c10f122adda92938253bef04 + sha256: e2a421b7e59244faef694ba7b30562e489c2b489866e505074eb005cd7060db7 url: "https://pub.dev" source: hosted - version: "2.0.3" + version: "3.0.1" flutter_local_notifications: dependency: "direct main" description: @@ -1038,10 +1047,10 @@ packages: dependency: transitive description: name: lints - sha256: "0a217c6c989d21039f1498c3ed9f3ed71b354e69873f13a8dfc3c9fe76f1b452" + sha256: cbf8d4b858bb0134ef3ef87841abdf8d63bfc255c266b7bf6b39daa1085c4290 url: "https://pub.dev" source: hosted - version: "2.1.1" + version: "3.0.0" local_auth: dependency: "direct main" description: @@ -1594,7 +1603,7 @@ packages: source: hosted version: "1.5.3" stream_channel: - dependency: transitive + dependency: "direct main" description: name: stream_channel sha256: ba2aa5d8cc609d96bbb2899c28934f9e1af5cddbd60a827822ea467161eb54e7 diff --git a/pubspec.yaml b/pubspec.yaml index df60947c5..0ec9d9fa7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -173,6 +173,11 @@ dependencies: url: https://github.com/cypherstack/coinlib.git path: coinlib_flutter ref: 376d520b4516d4eb7c3f0bd4b1522f7769f3f2a7 + electrum_adapter: + git: + url: https://github.com/cypherstack/electrum_adapter.git + ref: 51b7a60e07b0409b361e31da65d98178ee235bed + stream_channel: ^2.1.0 dev_dependencies: flutter_test: @@ -189,7 +194,7 @@ dev_dependencies: # lint: ^1.10.0 analyzer: ^5.13.0 import_sorter: ^4.6.0 - flutter_lints: ^2.0.1 + flutter_lints: ^3.0.1 isar_generator: 3.0.5 flutter_launcher_icons: