From a807303eba3d953dca14ba67772165b3948faa0f Mon Sep 17 00:00:00 2001 From: sneurlax Date: Fri, 16 Feb 2024 16:33:19 -0600 Subject: [PATCH] listen to tor and preferences changes and handle connections accordingly --- .../electrumx_interface.dart | 93 ++++++++++++++++++- 1 file changed, 88 insertions(+), 5 deletions(-) diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 49c3c9ae8..245dd9ed5 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -7,6 +7,7 @@ import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib; import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter; import 'package:electrum_adapter/electrum_adapter.dart'; import 'package:isar/isar.dart'; +import 'package:mutex/mutex.dart'; import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; @@ -18,6 +19,7 @@ import 'package:stackwallet/models/paymint/fee_object_model.dart'; import 'package:stackwallet/models/signing_data.dart'; import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; +import 'package:stackwallet/services/event_bus/global_event_bus.dart'; import 'package:stackwallet/services/tor_service.dart'; import 'package:stackwallet/utilities/amount/amount.dart'; import 'package:stackwallet/utilities/enums/coin_enum.dart'; @@ -45,8 +47,15 @@ mixin ElectrumXInterface on Bip39HDWallet { int? _latestHeight; + late Prefs _prefs; + late TorService _torService; StreamSubscription? _torPreferenceListener; StreamSubscription? _torStatusListener; + final Mutex _torConnectingLock = Mutex(); + bool _requireMutex = false; + Timer? _aliveTimer; + static const Duration _keepAlive = Duration(minutes: 1); + bool _isConnected = false; static const _kServerBatchCutoffVersion = [1, 6]; List? _serverVersion; @@ -814,9 +823,6 @@ mixin ElectrumXInterface on Bip39HDWallet { Future fetchChainHeight() async { try { - // _checkChainHeightSubscription(); - // TODO above. Make sure that the subscription/stream is alive. - // Don't set a stream subscription if one already exists. if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == null) { @@ -825,11 +831,24 @@ mixin ElectrumXInterface on Bip39HDWallet { // Make sure we only complete once. final isFirstResponse = _latestHeight == null; + // Check Electrum and update internal and cached versions if necessary. await electrumXClient.checkElectrumAdapter(); - // TODO [prio=extreme]: Does this update anything in this file?? Thinking no. + if (electrumAdapterChannel != electrumXClient.electrumAdapterChannel && + electrumXClient.electrumAdapterChannel != null) { + electrumAdapterChannel = electrumXClient.electrumAdapterChannel!; + } + if (electrumAdapterClient != electrumXClient.electrumAdapterClient && + electrumXClient.electrumAdapterClient != null) { + electrumAdapterClient = electrumXClient.electrumAdapterClient!; + } + // electrumXCachedClient.electrumAdapterChannel = electrumAdapterChannel; + if (electrumXCachedClient.electrumAdapterClient != + electrumAdapterClient) { + electrumXCachedClient.electrumAdapterClient = electrumAdapterClient; + } + // Subscribe to and listen for new block headers. final stream = electrumAdapterClient.subscribeHeaders(); - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = stream.asBroadcastStream().listen((response) { final int chainHeight = response.height; @@ -842,6 +861,61 @@ mixin ElectrumXInterface on Bip39HDWallet { completer.complete(chainHeight); } }); + + // If we're testing, use the global event bus for testing. + // final bus = globalEventBusForTesting ?? GlobalEventBus.instance; + // No constructors for mixins, so no globalEventBusForTesting is passed in. + final bus = GlobalEventBus.instance; + + // Listen to global event bus for Tor status changes. + _torStatusListener ??= bus.on().listen( + (event) async { + try { + switch (event.newStatus) { + case TorConnectionStatus.connecting: + // If Tor is connecting, we need to wait. + await _torConnectingLock.acquire(); + _requireMutex = true; + break; + + case TorConnectionStatus.connected: + case TorConnectionStatus.disconnected: + // If Tor is connected or disconnected, we can release the lock. + if (_torConnectingLock.isLocked) { + _torConnectingLock.release(); + } + _requireMutex = false; + break; + } + } finally { + // Ensure the lock is released. + if (_torConnectingLock.isLocked) { + _torConnectingLock.release(); + } + } + }, + ); + + // Listen to global event bus for Tor preference changes. + _torPreferenceListener ??= bus.on().listen( + (event) async { + // Close any open subscriptions. + for (final coinSub + in ElectrumxChainHeightService.subscriptions.entries) { + await coinSub.value?.cancel(); + } + + // Cancel alive timer + _aliveTimer?.cancel(); + }, + ); + + // Set a timer to check if the subscription is still alive. + _aliveTimer?.cancel(); + _aliveTimer = Timer.periodic( + _keepAlive, + (_) async => _updateConnectionStatus(await electrumXClient.ping()), + ); } else { // Don't set a stream subscription if one already exists. @@ -977,6 +1051,15 @@ mixin ElectrumXInterface on Bip39HDWallet { // host: newNode.address, port: newNode.port); } + /// Update the connection status and call the onConnectionStatusChanged callback if it exists. + void _updateConnectionStatus(bool connectionStatus) { + // TODO [prio=low]: Set onConnectionStatusChanged callback. + // if (_isConnected != connectionStatus && onConnectionStatusChanged != null) { + // onConnectionStatusChanged!(connectionStatus); + // } + _isConnected = connectionStatus; + } + //============================================================================ Future<({List
addresses, int index})> checkGapsBatched(