From 2fb3034dc0a3f29a043622ed5143017a156c8d32 Mon Sep 17 00:00:00 2001 From: sneurlax Date: Mon, 5 Feb 2024 16:35:18 -0600 Subject: [PATCH] resolve recursion issue and add more cleanup and logging/error handling and refactor _checkRpcClient -> _checkSocket --- .../subscribable_electrumx_client.dart | 265 ++++++++++-------- .../electrumx_interface.dart | 6 + 2 files changed, 159 insertions(+), 112 deletions(-) diff --git a/lib/electrumx_rpc/subscribable_electrumx_client.dart b/lib/electrumx_rpc/subscribable_electrumx_client.dart index 1f21dd906..5db7cefc1 100644 --- a/lib/electrumx_rpc/subscribable_electrumx_client.dart +++ b/lib/electrumx_rpc/subscribable_electrumx_client.dart @@ -94,21 +94,28 @@ class SubscribableElectrumXClient { // Listen to global event bus for Tor status changes. _torStatusListener = bus.on().listen( (event) async { - switch (event.newStatus) { - case TorConnectionStatus.connecting: - // If Tor is connecting, we need to wait. - await _torConnectingLock.acquire(); - _requireMutex = true; - break; + try { + switch (event.newStatus) { + case TorConnectionStatus.connecting: + // If Tor is connecting, we need to wait. + await _torConnectingLock.acquire(); + _requireMutex = true; + break; - case TorConnectionStatus.connected: - case TorConnectionStatus.disconnected: - // If Tor is connected or disconnected, we can release the lock. - if (_torConnectingLock.isLocked) { - _torConnectingLock.release(); - } - _requireMutex = false; - break; + case TorConnectionStatus.connected: + case TorConnectionStatus.disconnected: + // If Tor is connected or disconnected, we can release the lock. + if (_torConnectingLock.isLocked) { + _torConnectingLock.release(); + } + _requireMutex = false; + break; + } + } finally { + // Ensure the lock is released. + if (_torConnectingLock.isLocked) { + _torConnectingLock.release(); + } } }, ); @@ -166,7 +173,7 @@ class SubscribableElectrumXClient { /// Check if the RPC client is connected and connect if needed. /// /// If Tor is enabled but not running, it will attempt to start Tor. - Future _checkRpcClient() async { + Future _checkSocket({bool connecting = false}) async { if (_prefs.useTor) { // If we're supposed to use Tor... if (_torService.status != TorConnectionStatus.connected) { @@ -193,23 +200,25 @@ class SubscribableElectrumXClient { } // Connect if needed. - if ((!_prefs.useTor && _socket == null) || - (_prefs.useTor && _socksSocket == null)) { - if (currentFailoverIndex == -1) { - // Check if we have cached node information - if (_host == null && _port == null) { - throw Exception("SubscribableElectrumXClient._checkRpcClient: " - "No host or port provided and no cached node information."); - } + if (!connecting) { + if ((!_prefs.useTor && _socket == null) || + (_prefs.useTor && _socksSocket == null)) { + if (currentFailoverIndex == -1) { + // Check if we have cached node information + if (_host == null && _port == null) { + throw Exception("SubscribableElectrumXClient._checkRpcClient: " + "No host or port provided and no cached node information."); + } - // Connect to the server. - await connect(host: _host!, port: _port!); - } else { - // Attempt to connect to the next failover server. - await connect( - host: failovers![currentFailoverIndex].address, - port: failovers![currentFailoverIndex].port, - ); + // Connect to the server. + await connect(host: _host!, port: _port!); + } else { + // Attempt to connect to the next failover server. + await connect( + host: failovers![currentFailoverIndex].address, + port: failovers![currentFailoverIndex].port, + ); + } } } } @@ -221,94 +230,106 @@ class SubscribableElectrumXClient { required String host, required int port, }) async { - // Cache node information. - _host = host; - _port = port; - - // If we're already connected, disconnect first. try { - await _socket?.close(); - } catch (_) {} + // Cache node information. + _host = host; + _port = port; - // If we're connecting to Tor, wait. - if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkRpcClient()); - } else { - await _checkRpcClient(); - } + // If we're already connected, disconnect first. + try { + await _socket?.close(); + } catch (_) {} - if (!Prefs.instance.useTor) { - // If we're not supposed to use Tor, then connect directly. - await connectClearnet(host, port); - } else { - // If we're supposed to use Tor... - if (_torService.status != TorConnectionStatus.connected) { - // ... but Tor isn't running... - if (!_prefs.torKillSwitch) { - // ... and the killswitch isn't set, then we'll connect clearnet. - Logging.instance.log( - "Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet", - level: LogLevel.Warning, - ); - await connectClearnet(host, port); - } else { - // ... but if the killswitch is set, then let's try to start Tor. - await _torService.start(); - // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. + // If we're connecting to Tor, wait. + if (_requireMutex) { + await _torConnectingLock + .protect(() async => await _checkSocket(connecting: true)); + } else { + await _checkSocket(connecting: true); + } - // Doublecheck that Tor is running. - if (_torService.status != TorConnectionStatus.connected) { - // If Tor still isn't running, then we'll throw an exception. - throw Exception( - "Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); + if (!Prefs.instance.useTor) { + // If we're not supposed to use Tor, then connect directly. + await connectClearnet(host, port); + } else { + // If we're supposed to use Tor... + if (_torService.status != TorConnectionStatus.connected) { + // ... but Tor isn't running... + if (!_prefs.torKillSwitch) { + // ... and the killswitch isn't set, then we'll connect clearnet. + Logging.instance.log( + "Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet", + level: LogLevel.Warning, + ); + await connectClearnet(host, port); + } else { + // ... but if the killswitch is set, then let's try to start Tor. + await _torService.start(); + // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. + + // Doublecheck that Tor is running. + if (_torService.status != TorConnectionStatus.connected) { + // If Tor still isn't running, then we'll throw an exception. + throw Exception( + "Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); + } + + // Connect via Tor. + await connectTor(host, port); } - + } else { // Connect via Tor. await connectTor(host, port); } + } + + _updateConnectionStatus(true); + + if (_prefs.useTor) { + if (_socksSocket == null) { + final String msg = "SubscribableElectrumXClient.connect(): " + "cannot listen to $host:$port via SOCKSSocket because it is not connected."; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw Exception(msg); + } + + _socksSocket!.listen( + _dataHandler, + onError: _errorHandler, + onDone: _doneHandler, + cancelOnError: true, + ); } else { - // Connect via Tor. - await connectTor(host, port); - } - } + if (_socket == null) { + final String msg = "SubscribableElectrumXClient.connect(): " + "cannot listen to $host:$port via socket because it is not connected."; + Logging.instance.log(msg, level: LogLevel.Fatal); + throw Exception(msg); + } - _updateConnectionStatus(true); - - if (_prefs.useTor) { - if (_socksSocket == null) { - final String msg = "SubscribableElectrumXClient.connect(): " - "cannot listen to $host:$port via SOCKSSocket because it is not connected."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); + _socket!.listen( + _dataHandler, + onError: _errorHandler, + onDone: _doneHandler, + cancelOnError: true, + ); } - _socksSocket!.listen( - _dataHandler, - onError: _errorHandler, - onDone: _doneHandler, - cancelOnError: true, + _aliveTimer?.cancel(); + _aliveTimer = Timer.periodic( + _keepAlive, + (_) async => _updateConnectionStatus(await ping()), ); - } else { - if (_socket == null) { - final String msg = "SubscribableElectrumXClient.connect(): " - "cannot listen to $host:$port via socket because it is not connected."; - Logging.instance.log(msg, level: LogLevel.Fatal); - throw Exception(msg); - } + } catch (e, s) { + final msg = "SubscribableElectrumXClient.connect: " + "failed to connect to $host:$port." + "\nError: $e\nStack trace: $s"; + Logging.instance.log(msg, level: LogLevel.Fatal); - _socket!.listen( - _dataHandler, - onError: _errorHandler, - onDone: _doneHandler, - cancelOnError: true, - ); + // Ensure cleanup is performed on failure to avoid resource leaks. + await disconnect(); // Use the disconnect method to clean up. + rethrow; // Rethrow the exception to handle it further up the call stack. } - - _aliveTimer?.cancel(); - _aliveTimer = Timer.periodic( - _keepAlive, - (_) async => _updateConnectionStatus(await ping()), - ); } /// Connect to the server directly. @@ -426,8 +447,28 @@ class SubscribableElectrumXClient { /// Disconnect from the server. Future disconnect() async { _aliveTimer?.cancel(); - await _socket?.close(); - await _socksSocket?.close(); + _aliveTimer = null; + + try { + await _socket?.close(); + } catch (e, s) { + Logging.instance.log( + "SubscribableElectrumXClient.disconnect: failed to close socket." + "\nError: $e\nStack trace: $s", + level: LogLevel.Warning); + } + _socket = null; + + try { + await _socksSocket?.close(); + } catch (e, s) { + Logging.instance.log( + "SubscribableElectrumXClient.disconnect: failed to close SOCKS socket." + "\nError: $e\nStack trace: $s", + level: LogLevel.Warning); + } + _socksSocket = null; + onConnectionStatusChanged = null; } @@ -563,9 +604,9 @@ class SubscribableElectrumXClient { }) async { // If we're connecting to Tor, wait. if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkRpcClient()); + await _torConnectingLock.protect(() async => await _checkSocket()); } else { - await _checkRpcClient(); + await _checkSocket(); } // Check socket is connected. @@ -629,9 +670,9 @@ class SubscribableElectrumXClient { }) async { // If we're connecting to Tor, wait. if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkRpcClient()); + await _torConnectingLock.protect(() async => await _checkSocket()); } else { - await _checkRpcClient(); + await _checkSocket(); } // Check socket is connected. @@ -784,9 +825,9 @@ class SubscribableElectrumXClient { Future ping() async { // If we're connecting to Tor, wait. if (_requireMutex) { - await _torConnectingLock.protect(() async => await _checkRpcClient()); + await _torConnectingLock.protect(() async => await _checkSocket()); } else { - await _checkRpcClient(); + await _checkSocket(); } // Write to the socket. diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 740b01968..97ea236ba 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -1019,6 +1019,12 @@ mixin ElectrumXInterface on Bip39HDWallet { // check and add appropriate addresses for (int k = 0; k < txCountBatchSize; k++) { + if (counts["${_id}_$k"] == null) { + print("121212"); + print("${_id}_$k"); + print("123123123"); + print(counts); + } int count = counts["${_id}_$k"]!; if (count > 0) { iterationsAddressArray.add(txCountCallArgs["${_id}_$k"]!);