diff --git a/lib/electrumx_rpc/electrumx_client.dart b/lib/electrumx_rpc/electrumx_client.dart index db135427c..bddce3289 100644 --- a/lib/electrumx_rpc/electrumx_client.dart +++ b/lib/electrumx_rpc/electrumx_client.dart @@ -256,7 +256,7 @@ class ElectrumXClient { } } - Future _checkElectrumAdapter() async { + Future checkElectrumAdapter() async { ({InternetAddress host, int port})? proxyInfo; // If we're supposed to use Tor... @@ -370,9 +370,9 @@ class ElectrumXClient { if (_requireMutex) { await _torConnectingLock - .protect(() async => await _checkElectrumAdapter()); + .protect(() async => await checkElectrumAdapter()); } else { - await _checkElectrumAdapter(); + await checkElectrumAdapter(); } try { @@ -450,9 +450,9 @@ class ElectrumXClient { if (_requireMutex) { await _torConnectingLock - .protect(() async => await _checkElectrumAdapter()); + .protect(() async => await checkElectrumAdapter()); } else { - await _checkElectrumAdapter(); + await checkElectrumAdapter(); } try { @@ -814,7 +814,7 @@ class ElectrumXClient { }) async { Logging.instance.log("attempting to fetch blockchain.transaction.get...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); dynamic response = await _electrumAdapterClient!.getTransaction(txHash); Logging.instance.log("Fetching blockchain.transaction.get finished", level: LogLevel.Info); @@ -847,7 +847,7 @@ class ElectrumXClient { }) async { Logging.instance.log("attempting to fetch lelantus.getanonymityset...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); Map response = await (_electrumAdapterClient as FiroElectrumClient)! .getLelantusAnonymitySet(groupId: groupId, blockHash: blockhash); @@ -866,7 +866,7 @@ class ElectrumXClient { }) async { Logging.instance.log("attempting to fetch lelantus.getmintmetadata...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); dynamic response = await (_electrumAdapterClient as FiroElectrumClient)! .getLelantusMintData(mints: mints); Logging.instance.log("Fetching lelantus.getmintmetadata finished", @@ -882,7 +882,7 @@ class ElectrumXClient { }) async { Logging.instance.log("attempting to fetch lelantus.getusedcoinserials...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); int retryCount = 3; dynamic response; @@ -906,7 +906,7 @@ class ElectrumXClient { Future getLelantusLatestCoinId({String? requestID}) async { Logging.instance.log("attempting to fetch lelantus.getlatestcoinid...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); int response = await (_electrumAdapterClient as FiroElectrumClient).getLatestCoinId(); Logging.instance.log("Fetching lelantus.getlatestcoinid finished", @@ -937,7 +937,7 @@ class ElectrumXClient { try { Logging.instance.log("attempting to fetch spark.getsparkanonymityset...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); Map response = await (_electrumAdapterClient as FiroElectrumClient) .getSparkAnonymitySet( @@ -960,7 +960,7 @@ class ElectrumXClient { // Use electrum_adapter package's getSparkUsedCoinsTags method. Logging.instance.log("attempting to fetch spark.getusedcoinstags...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); Map response = await (_electrumAdapterClient as FiroElectrumClient) .getUsedCoinsTags(startNumber: startNumber); @@ -993,7 +993,7 @@ class ElectrumXClient { try { Logging.instance.log("attempting to fetch spark.getsparkmintmetadata...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); List response = await (_electrumAdapterClient as FiroElectrumClient) .getSparkMintMetaData(sparkCoinHashes: sparkCoinHashes); @@ -1015,7 +1015,7 @@ class ElectrumXClient { try { Logging.instance.log("attempting to fetch spark.getsparklatestcoinid...", level: LogLevel.Info); - await _checkElectrumAdapter(); + await checkElectrumAdapter(); int response = await (_electrumAdapterClient as FiroElectrumClient) .getSparkLatestCoinId(); Logging.instance.log("Fetching spark.getsparklatestcoinid finished", @@ -1037,7 +1037,7 @@ class ElectrumXClient { /// "rate": 1000, /// } Future> getFeeRate({String? requestID}) async { - await _checkElectrumAdapter(); + await checkElectrumAdapter(); return await _electrumAdapterClient!.getFeeRate(); } diff --git a/lib/electrumx_rpc/subscribable_electrumx_client.dart b/lib/electrumx_rpc/subscribable_electrumx_client.dart index b17227c8b..f06771906 100644 --- a/lib/electrumx_rpc/subscribable_electrumx_client.dart +++ b/lib/electrumx_rpc/subscribable_electrumx_client.dart @@ -1,862 +1,862 @@ -/* - * This file is part of Stack Wallet. - * - * Copyright (c) 2023 Cypher Stack - * All Rights Reserved. - * The code is distributed under GPLv3 license, see LICENSE file for details. - * Generated by Cypher Stack on 2023-05-26 - * - */ - -import 'dart:async'; -import 'dart:convert'; -import 'dart:io'; - -import 'package:event_bus/event_bus.dart'; -import 'package:mutex/mutex.dart'; -import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; -import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart'; -import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; -import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; -import 'package:stackwallet/services/event_bus/global_event_bus.dart'; -import 'package:stackwallet/services/tor_service.dart'; -import 'package:stackwallet/utilities/logger.dart'; -import 'package:stackwallet/utilities/prefs.dart'; -import 'package:tor_ffi_plugin/socks_socket.dart'; - -class ElectrumXSubscription { - final StreamController _controller = - StreamController(); // TODO controller params - - Stream get responseStream => _controller.stream; - - void addToStream(dynamic data) => _controller.add(data); -} - -class SocketTask { - SocketTask({this.completer, this.subscription}); - - final Completer? completer; - final ElectrumXSubscription? subscription; - - bool get isSubscription => subscription != null; -} - -class SubscribableElectrumXClient { - int _currentRequestID = 0; - bool _isConnected = false; - List _responseData = []; - final Map _tasks = {}; - Timer? _aliveTimer; - Socket? _socket; - SOCKSSocket? _socksSocket; - late final bool _useSSL; - late final Duration _connectionTimeout; - late final Duration _keepAlive; - - bool get isConnected => _isConnected; - bool get useSSL => _useSSL; - // Used to reconnect. - String? _host; - int? _port; - - void Function(bool)? onConnectionStatusChanged; - - late Prefs _prefs; - late TorService _torService; - StreamSubscription? _torPreferenceListener; - StreamSubscription? _torStatusListener; - final Mutex _torConnectingLock = Mutex(); - bool _requireMutex = false; - - List? failovers; - int currentFailoverIndex = -1; - - SubscribableElectrumXClient({ - required bool useSSL, - required Prefs prefs, - required List failovers, - TorService? torService, - this.onConnectionStatusChanged, - Duration connectionTimeout = const Duration(seconds: 5), - Duration keepAlive = const Duration(seconds: 10), - EventBus? globalEventBusForTesting, - }) { - _useSSL = useSSL; - _prefs = prefs; - _torService = torService ?? TorService.sharedInstance; - _connectionTimeout = connectionTimeout; - _keepAlive = keepAlive; - - // If we're testing, use the global event bus for testing. - final bus = globalEventBusForTesting ?? GlobalEventBus.instance; - - // Listen to global event bus for Tor status changes. - _torStatusListener = bus.on().listen( - (event) async { - try { - switch (event.newStatus) { - case TorConnectionStatus.connecting: - // If Tor is connecting, we need to wait. - await _torConnectingLock.acquire(); - _requireMutex = true; - break; - - case TorConnectionStatus.connected: - case TorConnectionStatus.disconnected: - // If Tor is connected or disconnected, we can release the lock. - if (_torConnectingLock.isLocked) { - _torConnectingLock.release(); - } - _requireMutex = false; - break; - } - } finally { - // Ensure the lock is released. - if (_torConnectingLock.isLocked) { - _torConnectingLock.release(); - } - } - }, - ); - - // Listen to global event bus for Tor preference changes. - _torPreferenceListener = bus.on().listen( - (event) async { - // Close open socket (if open). - final tempSocket = _socket; - _socket = null; - await tempSocket?.close(); - - // Close open SOCKS socket (if open). - final tempSOCKSSocket = _socksSocket; - _socksSocket = null; - await tempSOCKSSocket?.close(); - - // Clear subscriptions. - _tasks.clear(); - - // Cancel alive timer - _aliveTimer?.cancel(); - }, - ); - } - - factory SubscribableElectrumXClient.from({ - required ElectrumXNode node, - required Prefs prefs, - required List failovers, - TorService? torService, - }) { - return SubscribableElectrumXClient( - useSSL: node.useSSL, - prefs: prefs, - failovers: failovers, - torService: torService ?? TorService.sharedInstance, - ); - } - - // Example for returning a future which completes upon connection. - // static Future from({ - // required ElectrumXNode node, - // TorService? torService, - // }) async { - // final client = SubscribableElectrumXClient( - // useSSL: node.useSSL, - // ); - // - // await client.connect(host: node.address, port: node.port); - // - // return client; - // } - - /// Check if the RPC client is connected and connect if needed. - /// - /// If Tor is enabled but not running, it will attempt to start Tor. - Future _checkSocket({bool connecting = false}) async { - if (_prefs.useTor) { - // If we're supposed to use Tor... - if (_torService.status != TorConnectionStatus.connected) { - // ... but Tor isn't running... - if (!_prefs.torKillSwitch) { - // ... and the killswitch isn't set, then we'll just return below. - Logging.instance.log( - "Tor preference set but Tor is not enabled, killswitch not set, connecting to ElectrumX through clearnet.", - level: LogLevel.Warning, - ); - } else { - // ... but if the killswitch is set, then let's try to start Tor. - await _torService.start(); - // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. - - // Double-check that Tor is running. - if (_torService.status != TorConnectionStatus.connected) { - // If Tor still isn't running, then we'll throw an exception. - throw Exception("SubscribableElectrumXClient._checkRpcClient: " - "Tor preference and killswitch set but Tor not enabled and could not start, not connecting to ElectrumX."); - } - } - } - } - - // Connect if needed. - if (!connecting) { - if ((!_prefs.useTor && _socket == null) || - (_prefs.useTor && _socksSocket == null)) { - if (currentFailoverIndex == -1) { - // Check if we have cached node information - if (_host == null && _port == null) { - throw Exception("SubscribableElectrumXClient._checkRpcClient: " - "No host or port provided and no cached node information."); - } - - // Connect to the server. - await connect(host: _host!, port: _port!); - } else { - // Attempt to connect to the next failover server. - await connect( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - ); - } - } - } - } - - /// Connect to the server. - /// - /// If Tor is enabled, it will attempt to connect through Tor. - Future connect({ - required String host, - required int port, - }) async { - try { - // Cache node information. - _host = host; - _port = port; - - // If we're already connected, disconnect first. - try { - await _socket?.close(); - } catch (_) {} - - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock - .protect(() async => await _checkSocket(connecting: true)); - } else { - await _checkSocket(connecting: true); - } - - if (!Prefs.instance.useTor) { - // If we're not supposed to use Tor, then connect directly. - await connectClearnet(host, port); - } else { - // If we're supposed to use Tor... - if (_torService.status != TorConnectionStatus.connected) { - // ... but Tor isn't running... - if (!_prefs.torKillSwitch) { - // ... and the killswitch isn't set, then we'll connect clearnet. - Logging.instance.log( - "Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet", - level: LogLevel.Warning, - ); - await connectClearnet(host, port); - } else { - // ... but if the killswitch is set, then let's try to start Tor. - await _torService.start(); - // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. - - // Doublecheck that Tor is running. - if (_torService.status != TorConnectionStatus.connected) { - // If Tor still isn't running, then we'll throw an exception. - throw Exception( - "Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); - } - - // Connect via Tor. - await connectTor(host, port); - } - } else { - // Connect via Tor. - await connectTor(host, port); - } - } - - _updateConnectionStatus(true); - - if (_prefs.useTor) { - if (_socksSocket == null) { - final String msg = "SubscribableElectrumXClient.connect(): " - "cannot listen to $host:$port via SOCKSSocket because it is not connected."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - - _socksSocket!.listen( - _dataHandler, - onError: _errorHandler, - onDone: _doneHandler, - cancelOnError: true, - ); - } else { - if (_socket == null) { - final String msg = "SubscribableElectrumXClient.connect(): " - "cannot listen to $host:$port via socket because it is not connected."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - - _socket!.listen( - _dataHandler, - onError: _errorHandler, - onDone: _doneHandler, - cancelOnError: true, - ); - } - - _aliveTimer?.cancel(); - _aliveTimer = Timer.periodic( - _keepAlive, - (_) async => _updateConnectionStatus(await ping()), - ); - } catch (e, s) { - final msg = "SubscribableElectrumXClient.connect: " - "failed to connect to $host:$port." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - - // Ensure cleanup is performed on failure to avoid resource leaks. - await disconnect(); // Use the disconnect method to clean up. - rethrow; // Rethrow the exception to handle it further up the call stack. - } - } - - /// Connect to the server directly. - Future connectClearnet(String host, int port) async { - try { - Logging.instance.log( - "SubscribableElectrumXClient.connectClearnet(): " - "creating a socket to $host:$port (SSL $useSSL)...", - level: LogLevel.Info); - - if (_useSSL) { - _socket = await SecureSocket.connect( - host, - port, - timeout: _connectionTimeout, - onBadCertificate: (_) => - true, // TODO do not automatically trust bad certificates. - ); - } else { - _socket = await Socket.connect( - host, - port, - timeout: _connectionTimeout, - ); - } - - Logging.instance.log( - "SubscribableElectrumXClient.connectClearnet(): " - "created socket to $host:$port...", - level: LogLevel.Info); - } catch (e, s) { - final String msg = "SubscribableElectrumXClient.connectClearnet: " - "failed to connect to $host (SSL: $useSSL)." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - - return; - } - - /// Connect to the server using the Tor service. - Future connectTor(String host, int port) async { - // Get the proxy info from the TorService. - final proxyInfo = _torService.getProxyInfo(); - - try { - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "creating a SOCKS socket at $proxyInfo (SSL $useSSL)...", - level: LogLevel.Info); - - // Create a socks socket using the Tor service's proxy info. - _socksSocket = await SOCKSSocket.create( - proxyHost: proxyInfo.host.address, - proxyPort: proxyInfo.port, - sslEnabled: useSSL, - ); - - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "created SOCKS socket at $proxyInfo...", - level: LogLevel.Info); - } catch (e, s) { - final String msg = "SubscribableElectrumXClient.connectTor(): " - "failed to create a SOCKS socket at $proxyInfo (SSL $useSSL)..." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - - try { - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "connecting to SOCKS socket at $proxyInfo (SSL $useSSL)...", - level: LogLevel.Info); - - await _socksSocket?.connect(); - - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "connected to SOCKS socket at $proxyInfo...", - level: LogLevel.Info); - } catch (e, s) { - final String msg = "SubscribableElectrumXClient.connectTor(): " - "failed to connect to SOCKS socket at $proxyInfo.." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - - try { - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "connecting to $host:$port over SOCKS socket at $proxyInfo...", - level: LogLevel.Info); - - await _socksSocket?.connectTo(host, port); - - Logging.instance.log( - "SubscribableElectrumXClient.connectTor(): " - "connected to $host:$port over SOCKS socket at $proxyInfo", - level: LogLevel.Info); - } catch (e, s) { - final String msg = "SubscribableElectrumXClient.connectTor(): " - "failed to connect $host over tor proxy at $proxyInfo." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - - return; - } - - /// Disconnect from the server. - Future disconnect() async { - _aliveTimer?.cancel(); - _aliveTimer = null; - - try { - await _socket?.close(); - } catch (e, s) { - Logging.instance.log( - "SubscribableElectrumXClient.disconnect: failed to close socket." - "\nError: $e\nStack trace: $s", - level: LogLevel.Warning); - } - _socket = null; - - try { - await _socksSocket?.close(); - } catch (e, s) { - Logging.instance.log( - "SubscribableElectrumXClient.disconnect: failed to close SOCKS socket." - "\nError: $e\nStack trace: $s", - level: LogLevel.Warning); - } - _socksSocket = null; - - onConnectionStatusChanged = null; - } - - /// Format JSON request string. - String _buildJsonRequestString({ - required String method, - required String id, - required List params, - }) { - final paramString = jsonEncode(params); - return '{"jsonrpc": "2.0", "id": "$id","method": "$method","params": $paramString}\r\n'; - } - - /// Update the connection status and call the onConnectionStatusChanged callback if it exists. - void _updateConnectionStatus(bool connectionStatus) { - if (_isConnected != connectionStatus && onConnectionStatusChanged != null) { - onConnectionStatusChanged!(connectionStatus); - } - _isConnected = connectionStatus; - } - - /// Called when the socket has data. - void _dataHandler(List data) { - _responseData.addAll(data); - - // 0x0A is newline - // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html - if (data.last == 0x0A) { - try { - final response = jsonDecode(String.fromCharCodes(_responseData)) - as Map; - _responseHandler(response); - } catch (e, s) { - Logging.instance - .log("JsonRPC jsonDecode: $e\n$s", level: LogLevel.Error); - rethrow; - } finally { - _responseData = []; - } - } - } - - /// Called when the socket has a response. - void _responseHandler(Map response) { - // subscriptions will have a method in the response - if (response['method'] is String) { - _subscriptionHandler(response: response); - return; - } - - final id = response['id'] as String; - final result = response['result']; - - _complete(id, result); - } - - /// Called when the subscription has a response. - void _subscriptionHandler({ - required Map response, - }) { - final method = response['method']; - switch (method) { - case "blockchain.scripthash.subscribe": - final params = response["params"] as List; - final scripthash = params.first as String; - final taskId = "blockchain.scripthash.subscribe:$scripthash"; - - _tasks[taskId]?.subscription?.addToStream(params.last); - break; - case "blockchain.headers.subscribe": - final params = response["params"]; - const taskId = "blockchain.headers.subscribe"; - - _tasks[taskId]?.subscription?.addToStream(params.first); - break; - default: - break; - } - } - - /// Called when the socket has an error. - void _errorHandler(Object error, StackTrace trace) { - _updateConnectionStatus(false); - Logging.instance.log( - "SubscribableElectrumXClient called _errorHandler with: $error\n$trace", - level: LogLevel.Info); - } - - /// Called when the socket is closed. - void _doneHandler() { - _updateConnectionStatus(false); - Logging.instance.log("SubscribableElectrumXClient called _doneHandler", - level: LogLevel.Info); - } - - /// Complete a task with the given id and data. - void _complete(String id, dynamic data) { - if (_tasks[id] == null) { - return; - } - - if (!(_tasks[id]?.completer?.isCompleted ?? false)) { - _tasks[id]?.completer?.complete(data); - } - - if (!(_tasks[id]?.isSubscription ?? false)) { - _tasks.remove(id); - } else { - _tasks[id]?.subscription?.addToStream(data); - } - } - - /// Add a task to the task list. - void _addTask({ - required String id, - required Completer completer, - }) { - _tasks[id] = SocketTask(completer: completer, subscription: null); - } - - /// Add a subscription task to the task list. - void _addSubscriptionTask({ - required String id, - required ElectrumXSubscription subscription, - }) { - _tasks[id] = SocketTask(completer: null, subscription: subscription); - } - - /// Write call to socket. - Future _call({ - required String method, - List params = const [], - }) async { - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkSocket()); - } else { - await _checkSocket(); - } - - // Check socket is connected. - if (_prefs.useTor) { - if (_socksSocket == null) { - final msg = "SubscribableElectrumXClient._call: " - "SOCKSSocket is not connected. Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } else { - if (_socket == null) { - final msg = "SubscribableElectrumXClient._call: " - "Socket is not connected. Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } - - final completer = Completer(); - _currentRequestID++; - final id = _currentRequestID.toString(); - - // Write to the socket. - try { - _addTask(id: id, completer: completer); - - if (_prefs.useTor) { - _socksSocket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } else { - _socket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } - - return completer.future; - } catch (e, s) { - final String msg = "SubscribableElectrumXClient._call: " - "failed to request $method with id $id." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - } - - /// Write call to socket with timeout. - Future _callWithTimeout({ - required String method, - List params = const [], - Duration timeout = const Duration(seconds: 2), - }) async { - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkSocket()); - } else { - await _checkSocket(); - } - - // Check socket is connected. - if (_prefs.useTor) { - if (_socksSocket == null) { - try { - if (_host == null || _port == null) { - throw Exception("No host or port provided"); - } - - // Attempt to conect. - await connect( - host: _host!, - port: _port!, - ); - } catch (e, s) { - final msg = "SubscribableElectrumXClient._callWithTimeout: " - "SOCKSSocket not connected and cannot connect. " - "Method $method, params $params." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } - } else { - if (_socket == null) { - try { - if (_host == null || _port == null) { - throw Exception("No host or port provided"); - } - - // Attempt to conect. - await connect( - host: _host!, - port: _port!, - ); - } catch (e, s) { - final msg = "SubscribableElectrumXClient._callWithTimeout: " - "Socket not connected and cannot connect. " - "Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } - } - - final completer = Completer(); - _currentRequestID++; - final id = _currentRequestID.toString(); - - // Write to the socket. - try { - _addTask(id: id, completer: completer); - - if (_prefs.useTor) { - _socksSocket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } else { - _socket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } - - Timer(timeout, () { - if (!completer.isCompleted) { - completer.completeError( - Exception("Request \"id: $id, method: $method\" timed out!"), - ); - } - }); - - return completer.future; - } catch (e, s) { - final String msg = "SubscribableElectrumXClient._callWithTimeout: " - "failed to request $method with id $id (timeout $timeout)." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - } - - ElectrumXSubscription _subscribe({ - required String id, - required String method, - List params = const [], - }) { - try { - final subscription = ElectrumXSubscription(); - _addSubscriptionTask(id: id, subscription: subscription); - _currentRequestID++; - - // Check socket is connected. - if (_prefs.useTor) { - if (_socksSocket == null) { - final msg = "SubscribableElectrumXClient._call: " - "SOCKSSocket is not connected. Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } else { - if (_socket == null) { - final msg = "SubscribableElectrumXClient._call: " - "Socket is not connected. Method $method, params $params."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } - } - - // Write to the socket. - if (_prefs.useTor) { - _socksSocket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } else { - _socket?.write( - _buildJsonRequestString( - method: method, - id: id, - params: params, - ), - ); - } - - return subscription; - } catch (e, s) { - final String msg = "SubscribableElectrumXClient._subscribe: " - "failed to subscribe to $method with id $id." - "\nError: $e\nStack trace: $s"; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw JsonRpcException(msg); - } - } - - /// Ping the server to ensure it is responding - /// - /// Returns true if ping succeeded - Future ping() async { - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkSocket()); - } else { - await _checkSocket(); - } - - // Write to the socket. - try { - final response = (await _callWithTimeout(method: "server.ping")) as Map; - return response.keys.contains("result") && response["result"] == null; - } catch (_) { - return false; - } - } - - /// Subscribe to a scripthash to receive notifications on status changes - ElectrumXSubscription subscribeToScripthash({required String scripthash}) { - return _subscribe( - id: 'blockchain.scripthash.subscribe:$scripthash', - method: 'blockchain.scripthash.subscribe', - params: [scripthash], - ); - } - - /// Subscribe to block headers to receive notifications on new blocks found - /// - /// Returns the existing subscription if found - ElectrumXSubscription subscribeToBlockHeaders() { - return _tasks["blockchain.headers.subscribe"]?.subscription ?? - _subscribe( - id: "blockchain.headers.subscribe", - method: "blockchain.headers.subscribe", - params: [], - ); - } -} +// /* +// * This file is part of Stack Wallet. +// * +// * Copyright (c) 2023 Cypher Stack +// * All Rights Reserved. +// * The code is distributed under GPLv3 license, see LICENSE file for details. +// * Generated by Cypher Stack on 2023-05-26 +// * +// */ +// +// import 'dart:async'; +// import 'dart:convert'; +// import 'dart:io'; +// +// import 'package:event_bus/event_bus.dart'; +// import 'package:mutex/mutex.dart'; +// import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; +// import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart'; +// import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; +// import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; +// import 'package:stackwallet/services/event_bus/global_event_bus.dart'; +// import 'package:stackwallet/services/tor_service.dart'; +// import 'package:stackwallet/utilities/logger.dart'; +// import 'package:stackwallet/utilities/prefs.dart'; +// import 'package:tor_ffi_plugin/socks_socket.dart'; +// +// class ElectrumXSubscription { +// final StreamController _controller = +// StreamController(); // TODO controller params +// +// Stream get responseStream => _controller.stream; +// +// void addToStream(dynamic data) => _controller.add(data); +// } +// +// class SocketTask { +// SocketTask({this.completer, this.subscription}); +// +// final Completer? completer; +// final ElectrumXSubscription? subscription; +// +// bool get isSubscription => subscription != null; +// } +// +// class SubscribableElectrumXClient { +// int _currentRequestID = 0; +// bool _isConnected = false; +// List _responseData = []; +// final Map _tasks = {}; +// Timer? _aliveTimer; +// Socket? _socket; +// SOCKSSocket? _socksSocket; +// late final bool _useSSL; +// late final Duration _connectionTimeout; +// late final Duration _keepAlive; +// +// bool get isConnected => _isConnected; +// bool get useSSL => _useSSL; +// // Used to reconnect. +// String? _host; +// int? _port; +// +// void Function(bool)? onConnectionStatusChanged; +// +// late Prefs _prefs; +// late TorService _torService; +// StreamSubscription? _torPreferenceListener; +// StreamSubscription? _torStatusListener; +// final Mutex _torConnectingLock = Mutex(); +// bool _requireMutex = false; +// +// List? failovers; +// int currentFailoverIndex = -1; +// +// SubscribableElectrumXClient({ +// required bool useSSL, +// required Prefs prefs, +// required List failovers, +// TorService? torService, +// this.onConnectionStatusChanged, +// Duration connectionTimeout = const Duration(seconds: 5), +// Duration keepAlive = const Duration(seconds: 10), +// EventBus? globalEventBusForTesting, +// }) { +// _useSSL = useSSL; +// _prefs = prefs; +// _torService = torService ?? TorService.sharedInstance; +// _connectionTimeout = connectionTimeout; +// _keepAlive = keepAlive; +// +// // If we're testing, use the global event bus for testing. +// final bus = globalEventBusForTesting ?? GlobalEventBus.instance; +// +// // Listen to global event bus for Tor status changes. +// _torStatusListener = bus.on().listen( +// (event) async { +// try { +// switch (event.newStatus) { +// case TorConnectionStatus.connecting: +// // If Tor is connecting, we need to wait. +// await _torConnectingLock.acquire(); +// _requireMutex = true; +// break; +// +// case TorConnectionStatus.connected: +// case TorConnectionStatus.disconnected: +// // If Tor is connected or disconnected, we can release the lock. +// if (_torConnectingLock.isLocked) { +// _torConnectingLock.release(); +// } +// _requireMutex = false; +// break; +// } +// } finally { +// // Ensure the lock is released. +// if (_torConnectingLock.isLocked) { +// _torConnectingLock.release(); +// } +// } +// }, +// ); +// +// // Listen to global event bus for Tor preference changes. +// _torPreferenceListener = bus.on().listen( +// (event) async { +// // Close open socket (if open). +// final tempSocket = _socket; +// _socket = null; +// await tempSocket?.close(); +// +// // Close open SOCKS socket (if open). +// final tempSOCKSSocket = _socksSocket; +// _socksSocket = null; +// await tempSOCKSSocket?.close(); +// +// // Clear subscriptions. +// _tasks.clear(); +// +// // Cancel alive timer +// _aliveTimer?.cancel(); +// }, +// ); +// } +// +// factory SubscribableElectrumXClient.from({ +// required ElectrumXNode node, +// required Prefs prefs, +// required List failovers, +// TorService? torService, +// }) { +// return SubscribableElectrumXClient( +// useSSL: node.useSSL, +// prefs: prefs, +// failovers: failovers, +// torService: torService ?? TorService.sharedInstance, +// ); +// } +// +// // Example for returning a future which completes upon connection. +// // static Future from({ +// // required ElectrumXNode node, +// // TorService? torService, +// // }) async { +// // final client = SubscribableElectrumXClient( +// // useSSL: node.useSSL, +// // ); +// // +// // await client.connect(host: node.address, port: node.port); +// // +// // return client; +// // } +// +// /// Check if the RPC client is connected and connect if needed. +// /// +// /// If Tor is enabled but not running, it will attempt to start Tor. +// Future _checkSocket({bool connecting = false}) async { +// if (_prefs.useTor) { +// // If we're supposed to use Tor... +// if (_torService.status != TorConnectionStatus.connected) { +// // ... but Tor isn't running... +// if (!_prefs.torKillSwitch) { +// // ... and the killswitch isn't set, then we'll just return below. +// Logging.instance.log( +// "Tor preference set but Tor is not enabled, killswitch not set, connecting to ElectrumX through clearnet.", +// level: LogLevel.Warning, +// ); +// } else { +// // ... but if the killswitch is set, then let's try to start Tor. +// await _torService.start(); +// // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. +// +// // Double-check that Tor is running. +// if (_torService.status != TorConnectionStatus.connected) { +// // If Tor still isn't running, then we'll throw an exception. +// throw Exception("SubscribableElectrumXClient._checkRpcClient: " +// "Tor preference and killswitch set but Tor not enabled and could not start, not connecting to ElectrumX."); +// } +// } +// } +// } +// +// // Connect if needed. +// if (!connecting) { +// if ((!_prefs.useTor && _socket == null) || +// (_prefs.useTor && _socksSocket == null)) { +// if (currentFailoverIndex == -1) { +// // Check if we have cached node information +// if (_host == null && _port == null) { +// throw Exception("SubscribableElectrumXClient._checkRpcClient: " +// "No host or port provided and no cached node information."); +// } +// +// // Connect to the server. +// await connect(host: _host!, port: _port!); +// } else { +// // Attempt to connect to the next failover server. +// await connect( +// host: failovers![currentFailoverIndex].address, +// port: failovers![currentFailoverIndex].port, +// ); +// } +// } +// } +// } +// +// /// Connect to the server. +// /// +// /// If Tor is enabled, it will attempt to connect through Tor. +// Future connect({ +// required String host, +// required int port, +// }) async { +// try { +// // Cache node information. +// _host = host; +// _port = port; +// +// // If we're already connected, disconnect first. +// try { +// await _socket?.close(); +// } catch (_) {} +// +// // If we're connecting to Tor, wait. +// if (_requireMutex) { +// await _torConnectingLock +// .protect(() async => await _checkSocket(connecting: true)); +// } else { +// await _checkSocket(connecting: true); +// } +// +// if (!Prefs.instance.useTor) { +// // If we're not supposed to use Tor, then connect directly. +// await connectClearnet(host, port); +// } else { +// // If we're supposed to use Tor... +// if (_torService.status != TorConnectionStatus.connected) { +// // ... but Tor isn't running... +// if (!_prefs.torKillSwitch) { +// // ... and the killswitch isn't set, then we'll connect clearnet. +// Logging.instance.log( +// "Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet", +// level: LogLevel.Warning, +// ); +// await connectClearnet(host, port); +// } else { +// // ... but if the killswitch is set, then let's try to start Tor. +// await _torService.start(); +// // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. +// +// // Doublecheck that Tor is running. +// if (_torService.status != TorConnectionStatus.connected) { +// // If Tor still isn't running, then we'll throw an exception. +// throw Exception( +// "Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); +// } +// +// // Connect via Tor. +// await connectTor(host, port); +// } +// } else { +// // Connect via Tor. +// await connectTor(host, port); +// } +// } +// +// _updateConnectionStatus(true); +// +// if (_prefs.useTor) { +// if (_socksSocket == null) { +// final String msg = "SubscribableElectrumXClient.connect(): " +// "cannot listen to $host:$port via SOCKSSocket because it is not connected."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// +// _socksSocket!.listen( +// _dataHandler, +// onError: _errorHandler, +// onDone: _doneHandler, +// cancelOnError: true, +// ); +// } else { +// if (_socket == null) { +// final String msg = "SubscribableElectrumXClient.connect(): " +// "cannot listen to $host:$port via socket because it is not connected."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// +// _socket!.listen( +// _dataHandler, +// onError: _errorHandler, +// onDone: _doneHandler, +// cancelOnError: true, +// ); +// } +// +// _aliveTimer?.cancel(); +// _aliveTimer = Timer.periodic( +// _keepAlive, +// (_) async => _updateConnectionStatus(await ping()), +// ); +// } catch (e, s) { +// final msg = "SubscribableElectrumXClient.connect: " +// "failed to connect to $host:$port." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// +// // Ensure cleanup is performed on failure to avoid resource leaks. +// await disconnect(); // Use the disconnect method to clean up. +// rethrow; // Rethrow the exception to handle it further up the call stack. +// } +// } +// +// /// Connect to the server directly. +// Future connectClearnet(String host, int port) async { +// try { +// Logging.instance.log( +// "SubscribableElectrumXClient.connectClearnet(): " +// "creating a socket to $host:$port (SSL $useSSL)...", +// level: LogLevel.Info); +// +// if (_useSSL) { +// _socket = await SecureSocket.connect( +// host, +// port, +// timeout: _connectionTimeout, +// onBadCertificate: (_) => +// true, // TODO do not automatically trust bad certificates. +// ); +// } else { +// _socket = await Socket.connect( +// host, +// port, +// timeout: _connectionTimeout, +// ); +// } +// +// Logging.instance.log( +// "SubscribableElectrumXClient.connectClearnet(): " +// "created socket to $host:$port...", +// level: LogLevel.Info); +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient.connectClearnet: " +// "failed to connect to $host (SSL: $useSSL)." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// +// return; +// } +// +// /// Connect to the server using the Tor service. +// Future connectTor(String host, int port) async { +// // Get the proxy info from the TorService. +// final proxyInfo = _torService.getProxyInfo(); +// +// try { +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "creating a SOCKS socket at $proxyInfo (SSL $useSSL)...", +// level: LogLevel.Info); +// +// // Create a socks socket using the Tor service's proxy info. +// _socksSocket = await SOCKSSocket.create( +// proxyHost: proxyInfo.host.address, +// proxyPort: proxyInfo.port, +// sslEnabled: useSSL, +// ); +// +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "created SOCKS socket at $proxyInfo...", +// level: LogLevel.Info); +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient.connectTor(): " +// "failed to create a SOCKS socket at $proxyInfo (SSL $useSSL)..." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// +// try { +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "connecting to SOCKS socket at $proxyInfo (SSL $useSSL)...", +// level: LogLevel.Info); +// +// await _socksSocket?.connect(); +// +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "connected to SOCKS socket at $proxyInfo...", +// level: LogLevel.Info); +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient.connectTor(): " +// "failed to connect to SOCKS socket at $proxyInfo.." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// +// try { +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "connecting to $host:$port over SOCKS socket at $proxyInfo...", +// level: LogLevel.Info); +// +// await _socksSocket?.connectTo(host, port); +// +// Logging.instance.log( +// "SubscribableElectrumXClient.connectTor(): " +// "connected to $host:$port over SOCKS socket at $proxyInfo", +// level: LogLevel.Info); +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient.connectTor(): " +// "failed to connect $host over tor proxy at $proxyInfo." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// +// return; +// } +// +// /// Disconnect from the server. +// Future disconnect() async { +// _aliveTimer?.cancel(); +// _aliveTimer = null; +// +// try { +// await _socket?.close(); +// } catch (e, s) { +// Logging.instance.log( +// "SubscribableElectrumXClient.disconnect: failed to close socket." +// "\nError: $e\nStack trace: $s", +// level: LogLevel.Warning); +// } +// _socket = null; +// +// try { +// await _socksSocket?.close(); +// } catch (e, s) { +// Logging.instance.log( +// "SubscribableElectrumXClient.disconnect: failed to close SOCKS socket." +// "\nError: $e\nStack trace: $s", +// level: LogLevel.Warning); +// } +// _socksSocket = null; +// +// onConnectionStatusChanged = null; +// } +// +// /// Format JSON request string. +// String _buildJsonRequestString({ +// required String method, +// required String id, +// required List params, +// }) { +// final paramString = jsonEncode(params); +// return '{"jsonrpc": "2.0", "id": "$id","method": "$method","params": $paramString}\r\n'; +// } +// +// /// Update the connection status and call the onConnectionStatusChanged callback if it exists. +// void _updateConnectionStatus(bool connectionStatus) { +// if (_isConnected != connectionStatus && onConnectionStatusChanged != null) { +// onConnectionStatusChanged!(connectionStatus); +// } +// _isConnected = connectionStatus; +// } +// +// /// Called when the socket has data. +// void _dataHandler(List data) { +// _responseData.addAll(data); +// +// // 0x0A is newline +// // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html +// if (data.last == 0x0A) { +// try { +// final response = jsonDecode(String.fromCharCodes(_responseData)) +// as Map; +// _responseHandler(response); +// } catch (e, s) { +// Logging.instance +// .log("JsonRPC jsonDecode: $e\n$s", level: LogLevel.Error); +// rethrow; +// } finally { +// _responseData = []; +// } +// } +// } +// +// /// Called when the socket has a response. +// void _responseHandler(Map response) { +// // subscriptions will have a method in the response +// if (response['method'] is String) { +// _subscriptionHandler(response: response); +// return; +// } +// +// final id = response['id'] as String; +// final result = response['result']; +// +// _complete(id, result); +// } +// +// /// Called when the subscription has a response. +// void _subscriptionHandler({ +// required Map response, +// }) { +// final method = response['method']; +// switch (method) { +// case "blockchain.scripthash.subscribe": +// final params = response["params"] as List; +// final scripthash = params.first as String; +// final taskId = "blockchain.scripthash.subscribe:$scripthash"; +// +// _tasks[taskId]?.subscription?.addToStream(params.last); +// break; +// case "blockchain.headers.subscribe": +// final params = response["params"]; +// const taskId = "blockchain.headers.subscribe"; +// +// _tasks[taskId]?.subscription?.addToStream(params.first); +// break; +// default: +// break; +// } +// } +// +// /// Called when the socket has an error. +// void _errorHandler(Object error, StackTrace trace) { +// _updateConnectionStatus(false); +// Logging.instance.log( +// "SubscribableElectrumXClient called _errorHandler with: $error\n$trace", +// level: LogLevel.Info); +// } +// +// /// Called when the socket is closed. +// void _doneHandler() { +// _updateConnectionStatus(false); +// Logging.instance.log("SubscribableElectrumXClient called _doneHandler", +// level: LogLevel.Info); +// } +// +// /// Complete a task with the given id and data. +// void _complete(String id, dynamic data) { +// if (_tasks[id] == null) { +// return; +// } +// +// if (!(_tasks[id]?.completer?.isCompleted ?? false)) { +// _tasks[id]?.completer?.complete(data); +// } +// +// if (!(_tasks[id]?.isSubscription ?? false)) { +// _tasks.remove(id); +// } else { +// _tasks[id]?.subscription?.addToStream(data); +// } +// } +// +// /// Add a task to the task list. +// void _addTask({ +// required String id, +// required Completer completer, +// }) { +// _tasks[id] = SocketTask(completer: completer, subscription: null); +// } +// +// /// Add a subscription task to the task list. +// void _addSubscriptionTask({ +// required String id, +// required ElectrumXSubscription subscription, +// }) { +// _tasks[id] = SocketTask(completer: null, subscription: subscription); +// } +// +// /// Write call to socket. +// Future _call({ +// required String method, +// List params = const [], +// }) async { +// // If we're connecting to Tor, wait. +// if (_requireMutex) { +// await _torConnectingLock.protect(() async => await _checkSocket()); +// } else { +// await _checkSocket(); +// } +// +// // Check socket is connected. +// if (_prefs.useTor) { +// if (_socksSocket == null) { +// final msg = "SubscribableElectrumXClient._call: " +// "SOCKSSocket is not connected. Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } else { +// if (_socket == null) { +// final msg = "SubscribableElectrumXClient._call: " +// "Socket is not connected. Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } +// +// final completer = Completer(); +// _currentRequestID++; +// final id = _currentRequestID.toString(); +// +// // Write to the socket. +// try { +// _addTask(id: id, completer: completer); +// +// if (_prefs.useTor) { +// _socksSocket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } else { +// _socket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } +// +// return completer.future; +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient._call: " +// "failed to request $method with id $id." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// } +// +// /// Write call to socket with timeout. +// Future _callWithTimeout({ +// required String method, +// List params = const [], +// Duration timeout = const Duration(seconds: 2), +// }) async { +// // If we're connecting to Tor, wait. +// if (_requireMutex) { +// await _torConnectingLock.protect(() async => await _checkSocket()); +// } else { +// await _checkSocket(); +// } +// +// // Check socket is connected. +// if (_prefs.useTor) { +// if (_socksSocket == null) { +// try { +// if (_host == null || _port == null) { +// throw Exception("No host or port provided"); +// } +// +// // Attempt to conect. +// await connect( +// host: _host!, +// port: _port!, +// ); +// } catch (e, s) { +// final msg = "SubscribableElectrumXClient._callWithTimeout: " +// "SOCKSSocket not connected and cannot connect. " +// "Method $method, params $params." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } +// } else { +// if (_socket == null) { +// try { +// if (_host == null || _port == null) { +// throw Exception("No host or port provided"); +// } +// +// // Attempt to conect. +// await connect( +// host: _host!, +// port: _port!, +// ); +// } catch (e, s) { +// final msg = "SubscribableElectrumXClient._callWithTimeout: " +// "Socket not connected and cannot connect. " +// "Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } +// } +// +// final completer = Completer(); +// _currentRequestID++; +// final id = _currentRequestID.toString(); +// +// // Write to the socket. +// try { +// _addTask(id: id, completer: completer); +// +// if (_prefs.useTor) { +// _socksSocket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } else { +// _socket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } +// +// Timer(timeout, () { +// if (!completer.isCompleted) { +// completer.completeError( +// Exception("Request \"id: $id, method: $method\" timed out!"), +// ); +// } +// }); +// +// return completer.future; +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient._callWithTimeout: " +// "failed to request $method with id $id (timeout $timeout)." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// } +// +// ElectrumXSubscription _subscribe({ +// required String id, +// required String method, +// List params = const [], +// }) { +// try { +// final subscription = ElectrumXSubscription(); +// _addSubscriptionTask(id: id, subscription: subscription); +// _currentRequestID++; +// +// // Check socket is connected. +// if (_prefs.useTor) { +// if (_socksSocket == null) { +// final msg = "SubscribableElectrumXClient._call: " +// "SOCKSSocket is not connected. Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } else { +// if (_socket == null) { +// final msg = "SubscribableElectrumXClient._call: " +// "Socket is not connected. Method $method, params $params."; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw Exception(msg); +// } +// } +// +// // Write to the socket. +// if (_prefs.useTor) { +// _socksSocket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } else { +// _socket?.write( +// _buildJsonRequestString( +// method: method, +// id: id, +// params: params, +// ), +// ); +// } +// +// return subscription; +// } catch (e, s) { +// final String msg = "SubscribableElectrumXClient._subscribe: " +// "failed to subscribe to $method with id $id." +// "\nError: $e\nStack trace: $s"; +// Logging.instance.log(msg, level: LogLevel.Fatal); +// throw JsonRpcException(msg); +// } +// } +// +// /// Ping the server to ensure it is responding +// /// +// /// Returns true if ping succeeded +// Future ping() async { +// // If we're connecting to Tor, wait. +// if (_requireMutex) { +// await _torConnectingLock.protect(() async => await _checkSocket()); +// } else { +// await _checkSocket(); +// } +// +// // Write to the socket. +// try { +// final response = (await _callWithTimeout(method: "server.ping")) as Map; +// return response.keys.contains("result") && response["result"] == null; +// } catch (_) { +// return false; +// } +// } +// +// /// Subscribe to a scripthash to receive notifications on status changes +// ElectrumXSubscription subscribeToScripthash({required String scripthash}) { +// return _subscribe( +// id: 'blockchain.scripthash.subscribe:$scripthash', +// method: 'blockchain.scripthash.subscribe', +// params: [scripthash], +// ); +// } +// +// /// Subscribe to block headers to receive notifications on new blocks found +// /// +// /// Returns the existing subscription if found +// ElectrumXSubscription subscribeToBlockHeaders() { +// return _tasks["blockchain.headers.subscribe"]?.subscription ?? +// _subscribe( +// id: "blockchain.headers.subscribe", +// method: "blockchain.headers.subscribe", +// params: [], +// ); +// } +// } diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 42292184e..d10996a00 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -11,7 +11,6 @@ import 'package:mutex/mutex.dart'; import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; -import 'package:stackwallet/electrumx_rpc/subscribable_electrumx_client.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/input_v2.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/output_v2.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/transaction_v2.dart'; @@ -34,17 +33,23 @@ import 'package:stackwallet/wallets/wallet/intermediate/bip39_hd_wallet.dart'; import 'package:stackwallet/wallets/wallet/wallet_mixin_interfaces/paynym_interface.dart'; import 'package:stream_channel/stream_channel.dart'; +import '../../../services/event_bus/events/global/tor_connection_status_changed_event.dart'; +import '../../../services/event_bus/events/global/tor_status_changed_event.dart'; + mixin ElectrumXInterface on Bip39HDWallet { late ElectrumXClient electrumXClient; late StreamChannel electrumAdapterChannel; late ElectrumClient electrumAdapterClient; late CachedElectrumXClient electrumXCachedClient; - late SubscribableElectrumXClient subscribableElectrumXClient; + // late SubscribableElectrumXClient subscribableElectrumXClient; int? get maximumFeerate => null; int? _latestHeight; + StreamSubscription? _torPreferenceListener; + StreamSubscription? _torStatusListener; + static const _kServerBatchCutoffVersion = [1, 6]; List? _serverVersion; bool get serverCanBatch { @@ -811,6 +816,9 @@ mixin ElectrumXInterface on Bip39HDWallet { Future fetchChainHeight() async { try { + // _checkChainHeightSubscription(); + // TODO above. Make sure that the subscription/stream is alive. + // Don't set a stream subscription if one already exists. await _manageChainHeightSubscription(); @@ -862,36 +870,36 @@ mixin ElectrumXInterface on Bip39HDWallet { } } - Future _createSubscription() async { - final completer = Completer(); - ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer; + await electrumXClient.checkElectrumAdapter(); + // TODO [prio=extreme]: Does this update anything in this file?? Thinking no. - // Make sure we only complete once. - final isFirstResponse = _latestHeight == null; + final stream = electrumAdapterClient.subscribeHeaders(); - // Subscribe to block headers. - final subscription = subscribableElectrumXClient.subscribeToBlockHeaders(); + ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = + stream.asBroadcastStream().listen((response) { + final int chainHeight = response.height; + // print("Current chain height: $chainHeight"); - // Set stream subscription. - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = - subscription.responseStream.asBroadcastStream().listen((event) { - final response = event; - if (response != null && - response is Map && - response.containsKey('height')) { - final int chainHeight = response['height'] as int; - // print("Current chain height: $chainHeight"); + _latestHeight = chainHeight; - _latestHeight = chainHeight; - - if (isFirstResponse) { - // If the completer is not completed, complete it. - if (!ElectrumxChainHeightService - .completers[cryptoCurrency.coin]!.isCompleted) { - // Complete the completer, returning the chain height. - ElectrumxChainHeightService.completers[cryptoCurrency.coin]! - .complete(chainHeight); + if (isFirstResponse && !completer.isCompleted) { + // Return the chain height. + completer.complete(chainHeight); } + }); + } else { + // Don't set a stream subscription if one already exists. + + // Check if the stream subscription is paused. + if (ElectrumxChainHeightService + .subscriptions[cryptoCurrency.coin]!.isPaused) { + // If it's paused, resume it. + ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! + .resume(); + } + + if (_latestHeight != null) { + return _latestHeight!; } } else { Logging.instance.log( @@ -998,13 +1006,14 @@ mixin ElectrumXInterface on Bip39HDWallet { electrumAdapterClient: electrumAdapterClient, electrumAdapterUpdateCallback: updateClient, ); - subscribableElectrumXClient = SubscribableElectrumXClient.from( - node: newNode, - prefs: prefs, - failovers: failovers, - ); - await subscribableElectrumXClient.connect( - host: newNode.address, port: newNode.port); + // Replaced using electrum_adapters' SubscribableClient in fetchChainHeight. + // subscribableElectrumXClient = SubscribableElectrumXClient.from( + // node: newNode, + // prefs: prefs, + // failovers: failovers, + // ); + // await subscribableElectrumXClient.connect( + // host: newNode.address, port: newNode.port); } //============================================================================