resolve recursion issue and add more cleanup and logging/error handling

and refactor _checkRpcClient -> _checkSocket
This commit is contained in:
sneurlax 2024-02-05 16:35:18 -06:00
parent 0f665bd602
commit 2fb3034dc0
2 changed files with 159 additions and 112 deletions

View file

@ -94,21 +94,28 @@ class SubscribableElectrumXClient {
// Listen to global event bus for Tor status changes. // Listen to global event bus for Tor status changes.
_torStatusListener = bus.on<TorConnectionStatusChangedEvent>().listen( _torStatusListener = bus.on<TorConnectionStatusChangedEvent>().listen(
(event) async { (event) async {
switch (event.newStatus) { try {
case TorConnectionStatus.connecting: switch (event.newStatus) {
// If Tor is connecting, we need to wait. case TorConnectionStatus.connecting:
await _torConnectingLock.acquire(); // If Tor is connecting, we need to wait.
_requireMutex = true; await _torConnectingLock.acquire();
break; _requireMutex = true;
break;
case TorConnectionStatus.connected: case TorConnectionStatus.connected:
case TorConnectionStatus.disconnected: case TorConnectionStatus.disconnected:
// If Tor is connected or disconnected, we can release the lock. // If Tor is connected or disconnected, we can release the lock.
if (_torConnectingLock.isLocked) { if (_torConnectingLock.isLocked) {
_torConnectingLock.release(); _torConnectingLock.release();
} }
_requireMutex = false; _requireMutex = false;
break; break;
}
} finally {
// Ensure the lock is released.
if (_torConnectingLock.isLocked) {
_torConnectingLock.release();
}
} }
}, },
); );
@ -166,7 +173,7 @@ class SubscribableElectrumXClient {
/// Check if the RPC client is connected and connect if needed. /// Check if the RPC client is connected and connect if needed.
/// ///
/// If Tor is enabled but not running, it will attempt to start Tor. /// If Tor is enabled but not running, it will attempt to start Tor.
Future<void> _checkRpcClient() async { Future<void> _checkSocket({bool connecting = false}) async {
if (_prefs.useTor) { if (_prefs.useTor) {
// If we're supposed to use Tor... // If we're supposed to use Tor...
if (_torService.status != TorConnectionStatus.connected) { if (_torService.status != TorConnectionStatus.connected) {
@ -193,23 +200,25 @@ class SubscribableElectrumXClient {
} }
// Connect if needed. // Connect if needed.
if ((!_prefs.useTor && _socket == null) || if (!connecting) {
(_prefs.useTor && _socksSocket == null)) { if ((!_prefs.useTor && _socket == null) ||
if (currentFailoverIndex == -1) { (_prefs.useTor && _socksSocket == null)) {
// Check if we have cached node information if (currentFailoverIndex == -1) {
if (_host == null && _port == null) { // Check if we have cached node information
throw Exception("SubscribableElectrumXClient._checkRpcClient: " if (_host == null && _port == null) {
"No host or port provided and no cached node information."); throw Exception("SubscribableElectrumXClient._checkRpcClient: "
} "No host or port provided and no cached node information.");
}
// Connect to the server. // Connect to the server.
await connect(host: _host!, port: _port!); await connect(host: _host!, port: _port!);
} else { } else {
// Attempt to connect to the next failover server. // Attempt to connect to the next failover server.
await connect( await connect(
host: failovers![currentFailoverIndex].address, host: failovers![currentFailoverIndex].address,
port: failovers![currentFailoverIndex].port, port: failovers![currentFailoverIndex].port,
); );
}
} }
} }
} }
@ -221,94 +230,106 @@ class SubscribableElectrumXClient {
required String host, required String host,
required int port, required int port,
}) async { }) async {
// Cache node information.
_host = host;
_port = port;
// If we're already connected, disconnect first.
try { try {
await _socket?.close(); // Cache node information.
} catch (_) {} _host = host;
_port = port;
// If we're connecting to Tor, wait. // If we're already connected, disconnect first.
if (_requireMutex) { try {
await _torConnectingLock.protect(() async => await _checkRpcClient()); await _socket?.close();
} else { } catch (_) {}
await _checkRpcClient();
}
if (!Prefs.instance.useTor) { // If we're connecting to Tor, wait.
// If we're not supposed to use Tor, then connect directly. if (_requireMutex) {
await connectClearnet(host, port); await _torConnectingLock
} else { .protect(() async => await _checkSocket(connecting: true));
// If we're supposed to use Tor... } else {
if (_torService.status != TorConnectionStatus.connected) { await _checkSocket(connecting: true);
// ... but Tor isn't running... }
if (!_prefs.torKillSwitch) {
// ... and the killswitch isn't set, then we'll connect clearnet.
Logging.instance.log(
"Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet",
level: LogLevel.Warning,
);
await connectClearnet(host, port);
} else {
// ... but if the killswitch is set, then let's try to start Tor.
await _torService.start();
// TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature.
// Doublecheck that Tor is running. if (!Prefs.instance.useTor) {
if (_torService.status != TorConnectionStatus.connected) { // If we're not supposed to use Tor, then connect directly.
// If Tor still isn't running, then we'll throw an exception. await connectClearnet(host, port);
throw Exception( } else {
"Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); // If we're supposed to use Tor...
if (_torService.status != TorConnectionStatus.connected) {
// ... but Tor isn't running...
if (!_prefs.torKillSwitch) {
// ... and the killswitch isn't set, then we'll connect clearnet.
Logging.instance.log(
"Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet",
level: LogLevel.Warning,
);
await connectClearnet(host, port);
} else {
// ... but if the killswitch is set, then let's try to start Tor.
await _torService.start();
// TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature.
// Doublecheck that Tor is running.
if (_torService.status != TorConnectionStatus.connected) {
// If Tor still isn't running, then we'll throw an exception.
throw Exception(
"Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX");
}
// Connect via Tor.
await connectTor(host, port);
} }
} else {
// Connect via Tor. // Connect via Tor.
await connectTor(host, port); await connectTor(host, port);
} }
}
_updateConnectionStatus(true);
if (_prefs.useTor) {
if (_socksSocket == null) {
final String msg = "SubscribableElectrumXClient.connect(): "
"cannot listen to $host:$port via SOCKSSocket because it is not connected.";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
_socksSocket!.listen(
_dataHandler,
onError: _errorHandler,
onDone: _doneHandler,
cancelOnError: true,
);
} else { } else {
// Connect via Tor. if (_socket == null) {
await connectTor(host, port); final String msg = "SubscribableElectrumXClient.connect(): "
} "cannot listen to $host:$port via socket because it is not connected.";
} Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
_updateConnectionStatus(true); _socket!.listen(
_dataHandler,
if (_prefs.useTor) { onError: _errorHandler,
if (_socksSocket == null) { onDone: _doneHandler,
final String msg = "SubscribableElectrumXClient.connect(): " cancelOnError: true,
"cannot listen to $host:$port via SOCKSSocket because it is not connected."; );
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
} }
_socksSocket!.listen( _aliveTimer?.cancel();
_dataHandler, _aliveTimer = Timer.periodic(
onError: _errorHandler, _keepAlive,
onDone: _doneHandler, (_) async => _updateConnectionStatus(await ping()),
cancelOnError: true,
); );
} else { } catch (e, s) {
if (_socket == null) { final msg = "SubscribableElectrumXClient.connect: "
final String msg = "SubscribableElectrumXClient.connect(): " "failed to connect to $host:$port."
"cannot listen to $host:$port via socket because it is not connected."; "\nError: $e\nStack trace: $s";
Logging.instance.log(msg, level: LogLevel.Fatal); Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
_socket!.listen( // Ensure cleanup is performed on failure to avoid resource leaks.
_dataHandler, await disconnect(); // Use the disconnect method to clean up.
onError: _errorHandler, rethrow; // Rethrow the exception to handle it further up the call stack.
onDone: _doneHandler,
cancelOnError: true,
);
} }
_aliveTimer?.cancel();
_aliveTimer = Timer.periodic(
_keepAlive,
(_) async => _updateConnectionStatus(await ping()),
);
} }
/// Connect to the server directly. /// Connect to the server directly.
@ -426,8 +447,28 @@ class SubscribableElectrumXClient {
/// Disconnect from the server. /// Disconnect from the server.
Future<void> disconnect() async { Future<void> disconnect() async {
_aliveTimer?.cancel(); _aliveTimer?.cancel();
await _socket?.close(); _aliveTimer = null;
await _socksSocket?.close();
try {
await _socket?.close();
} catch (e, s) {
Logging.instance.log(
"SubscribableElectrumXClient.disconnect: failed to close socket."
"\nError: $e\nStack trace: $s",
level: LogLevel.Warning);
}
_socket = null;
try {
await _socksSocket?.close();
} catch (e, s) {
Logging.instance.log(
"SubscribableElectrumXClient.disconnect: failed to close SOCKS socket."
"\nError: $e\nStack trace: $s",
level: LogLevel.Warning);
}
_socksSocket = null;
onConnectionStatusChanged = null; onConnectionStatusChanged = null;
} }
@ -563,9 +604,9 @@ class SubscribableElectrumXClient {
}) async { }) async {
// If we're connecting to Tor, wait. // If we're connecting to Tor, wait.
if (_requireMutex) { if (_requireMutex) {
await _torConnectingLock.protect(() async => await _checkRpcClient()); await _torConnectingLock.protect(() async => await _checkSocket());
} else { } else {
await _checkRpcClient(); await _checkSocket();
} }
// Check socket is connected. // Check socket is connected.
@ -629,9 +670,9 @@ class SubscribableElectrumXClient {
}) async { }) async {
// If we're connecting to Tor, wait. // If we're connecting to Tor, wait.
if (_requireMutex) { if (_requireMutex) {
await _torConnectingLock.protect(() async => await _checkRpcClient()); await _torConnectingLock.protect(() async => await _checkSocket());
} else { } else {
await _checkRpcClient(); await _checkSocket();
} }
// Check socket is connected. // Check socket is connected.
@ -784,9 +825,9 @@ class SubscribableElectrumXClient {
Future<bool> ping() async { Future<bool> ping() async {
// If we're connecting to Tor, wait. // If we're connecting to Tor, wait.
if (_requireMutex) { if (_requireMutex) {
await _torConnectingLock.protect(() async => await _checkRpcClient()); await _torConnectingLock.protect(() async => await _checkSocket());
} else { } else {
await _checkRpcClient(); await _checkSocket();
} }
// Write to the socket. // Write to the socket.

View file

@ -1019,6 +1019,12 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// check and add appropriate addresses // check and add appropriate addresses
for (int k = 0; k < txCountBatchSize; k++) { for (int k = 0; k < txCountBatchSize; k++) {
if (counts["${_id}_$k"] == null) {
print("121212");
print("${_id}_$k");
print("123123123");
print(counts);
}
int count = counts["${_id}_$k"]!; int count = counts["${_id}_$k"]!;
if (count > 0) { if (count > 0) {
iterationsAddressArray.add(txCountCallArgs["${_id}_$k"]!); iterationsAddressArray.add(txCountCallArgs["${_id}_$k"]!);