diff --git a/lib/electrumx_rpc/electrumx_chain_height_service.dart b/lib/electrumx_rpc/electrumx_chain_height_service.dart index 126a0df72..3696e78f9 100644 --- a/lib/electrumx_rpc/electrumx_chain_height_service.dart +++ b/lib/electrumx_rpc/electrumx_chain_height_service.dart @@ -1,10 +1,149 @@ import 'dart:async'; +import 'package:electrum_adapter/electrum_adapter.dart'; import 'package:stackwallet/utilities/enums/coin_enum.dart'; +import 'package:stackwallet/utilities/logger.dart'; -/// Store chain height subscriptions for each coin. -abstract class ElectrumxChainHeightService { - static Map?> subscriptions = {}; - // Used to hold chain height subscriptions for each coin as in: - // ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = sub; +/// Manage chain height subscriptions for each coin. +abstract class ChainHeightServiceManager { + // A map of chain height services for each coin. + static final Map _services = {}; + // Map get services => _services; + + // Get the chain height service for a specific coin. + static ChainHeightService? getService(Coin coin) { + return _services[coin]; + } + + // Add a chain height service for a specific coin. + static void add(ChainHeightService service, Coin coin) { + // Don't add a new service if one already exists. + if (_services[coin] == null) { + _services[coin] = service; + } else { + throw Exception("Chain height service for $coin already managed"); + } + } + + // Remove a chain height service for a specific coin. + static void remove(Coin coin) { + _services.remove(coin); + } + + // Close all subscriptions and clean up resources. + static Future dispose() async { + // Close each subscription. + // + // Create a list of keys to avoid concurrent modification during iteration + var keys = List.from(_services.keys); + + // Iterate over the copy of the keys + for (final coin in keys) { + final ChainHeightService? service = getService(coin); + await service?.cancelListen(); + remove(coin); + } + } +} + +/// A service to fetch and listen for chain height updates. +/// +/// TODO: Add error handling and branching to handle various other scenarios. +class ChainHeightService { + // The electrum_adapter client to use for fetching chain height updates. + ElectrumClient client; + + // The subscription to listen for chain height updates. + StreamSubscription? _subscription; + + // Whether the service has started listening for updates. + bool get started => _subscription != null; + + // The current chain height. + int? _height; + int? get height => _height; + + // Whether the service is currently reconnecting. + bool _isReconnecting = false; + + // The reconnect timer. + Timer? _reconnectTimer; + + // The reconnection timeout duration. + static const Duration _connectionTimeout = Duration(seconds: 10); + + ChainHeightService({required this.client}); + + /// Fetch the current chain height and start listening for updates. + Future fetchHeightAndStartListenForUpdates() async { + // Don't start a new subscription if one already exists. + if (_subscription != null) { + throw Exception( + "Attempted to start a chain height service where an existing" + " subscription already exists!", + ); + } + + // A completer to wait for the current chain height to be fetched. + final completer = Completer(); + + // Fetch the current chain height. + _subscription = client.subscribeHeaders().listen((BlockHeader event) { + _height = event.height; + + if (!completer.isCompleted) { + completer.complete(_height); + } + }); + + _subscription?.onError((dynamic error) { + _handleError(error); + }); + + // Wait for the current chain height to be fetched. + return completer.future; + } + + /// Handle an error from the subscription. + void _handleError(dynamic error) { + Logging.instance.log( + "Error reconnecting for chain height: ${error.toString()}", + level: LogLevel.Error, + ); + + _subscription?.cancel(); + _subscription = null; + _attemptReconnect(); + } + + /// Attempt to reconnect to the electrum server. + void _attemptReconnect() { + // Avoid multiple reconnection attempts. + if (_isReconnecting) return; + _isReconnecting = true; + + // Attempt to reconnect. + unawaited(fetchHeightAndStartListenForUpdates().then((_) { + _isReconnecting = false; + })); + + // Set a timer to on the reconnection attempt and clean up if it fails. + _reconnectTimer?.cancel(); + _reconnectTimer = Timer(_connectionTimeout, () async { + if (_subscription == null) { + await _subscription?.cancel(); + _subscription = null; // Will also occur on an error via handleError. + _reconnectTimer?.cancel(); + _reconnectTimer = null; + _isReconnecting = false; + } + }); + } + + /// Stop listening for chain height updates. + Future cancelListen() async { + await _subscription?.cancel(); + _subscription = null; + _reconnectTimer?.cancel(); + } } diff --git a/lib/electrumx_rpc/electrumx_client.dart b/lib/electrumx_rpc/electrumx_client.dart index d8c029f36..37509f9a2 100644 --- a/lib/electrumx_rpc/electrumx_client.dart +++ b/lib/electrumx_rpc/electrumx_client.dart @@ -20,6 +20,7 @@ import 'package:event_bus/event_bus.dart'; import 'package:flutter/foundation.dart'; import 'package:flutter_libsparkmobile/flutter_libsparkmobile.dart'; import 'package:mutex/mutex.dart'; +import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/rpc.dart'; import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart'; import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; @@ -127,6 +128,8 @@ class ElectrumXClient { _coin = coin; final bus = globalEventBusForTesting ?? GlobalEventBus.instance; + + // Listen for tor status changes. _torStatusListener = bus.on().listen( (event) async { switch (event.newStatus) { @@ -145,6 +148,8 @@ class ElectrumXClient { } }, ); + + // Listen for tor preference changes. _torPreferenceListener = bus.on().listen( (event) async { // not sure if we need to do anything specific here @@ -157,6 +162,9 @@ class ElectrumXClient { // on the next request sent through this electrumx instance _electrumAdapterChannel = null; _electrumAdapterClient = null; + + // Also close any chain height services that are currently open. + await ChainHeightServiceManager.dispose(); }, ); } diff --git a/lib/models/isar/models/blockchain_data/v2/output_v2.dart b/lib/models/isar/models/blockchain_data/v2/output_v2.dart index f096d8a90..45a8b1329 100644 --- a/lib/models/isar/models/blockchain_data/v2/output_v2.dart +++ b/lib/models/isar/models/blockchain_data/v2/output_v2.dart @@ -68,7 +68,7 @@ class OutputV2 { scriptPubKeyHex: json["scriptPubKey"]["hex"] as String, scriptPubKeyAsm: json["scriptPubKey"]["asm"] as String?, valueStringSats: parseOutputAmountString( - json["value"].toString(), + json["value"] != null ? json["value"].toString(): "0", decimalPlaces: decimalPlaces, isFullAmountNotSats: isFullAmountNotSats, ), diff --git a/lib/wallets/wallet/wallet.dart b/lib/wallets/wallet/wallet.dart index b4b47da13..2f1691b0a 100644 --- a/lib/wallets/wallet/wallet.dart +++ b/lib/wallets/wallet/wallet.dart @@ -4,7 +4,6 @@ import 'package:isar/isar.dart'; import 'package:meta/meta.dart'; import 'package:mutex/mutex.dart'; import 'package:stackwallet/db/isar/main_db.dart'; -import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/address.dart'; import 'package:stackwallet/models/isar/models/ethereum/eth_contract.dart'; import 'package:stackwallet/models/node_model.dart'; @@ -617,9 +616,10 @@ abstract class Wallet { switch (prefs.syncType) { case SyncingType.currentWalletOnly: - // Close the subscription for this coin's chain height. - await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] - ?.cancel(); + // Close the subscription for this coin's chain height. + // NOTE: This does not work now that the subscription is shared + // await (await ChainHeightServiceManager.getService(cryptoCurrency.coin)) + // ?.cancelListen(); case SyncingType.selectedWalletsAtStartup: // Close the subscription if this wallet is not in the list to be synced. if (!prefs.walletIdsSyncOnStartup.contains(walletId)) { @@ -639,8 +639,10 @@ abstract class Wallet { // If there are no other wallets of this coin, then close the sub. if (walletIds.isEmpty) { - await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] - ?.cancel(); + // NOTE: This does not work now that the subscription is shared + // await (await ChainHeightServiceManager.getService( + // cryptoCurrency.coin)) + // ?.cancelListen(); } } case SyncingType.allWalletsOnStartup: diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 407bfa3c0..63d49368b 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -7,7 +7,6 @@ import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib; import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter; import 'package:electrum_adapter/electrum_adapter.dart'; import 'package:isar/isar.dart'; -import 'package:mutex/mutex.dart'; import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; @@ -17,9 +16,6 @@ import 'package:stackwallet/models/isar/models/blockchain_data/v2/transaction_v2 import 'package:stackwallet/models/isar/models/isar_models.dart'; import 'package:stackwallet/models/paymint/fee_object_model.dart'; import 'package:stackwallet/models/signing_data.dart'; -import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; -import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; -import 'package:stackwallet/services/event_bus/global_event_bus.dart'; import 'package:stackwallet/services/tor_service.dart'; import 'package:stackwallet/utilities/amount/amount.dart'; import 'package:stackwallet/utilities/enums/coin_enum.dart'; @@ -42,21 +38,10 @@ mixin ElectrumXInterface on Bip39HDWallet { late ElectrumClient electrumAdapterClient; late CachedElectrumXClient electrumXCachedClient; // late SubscribableElectrumXClient subscribableElectrumXClient; + late ChainHeightServiceManager chainHeightServiceManager; int? get maximumFeerate => null; - int? _latestHeight; - - late Prefs _prefs; - late TorService _torService; - StreamSubscription? _torPreferenceListener; - StreamSubscription? _torStatusListener; - final Mutex _torConnectingLock = Mutex(); - bool _requireMutex = false; - Timer? _aliveTimer; - static const Duration _keepAlive = Duration(minutes: 1); - bool _isConnected = false; - static const _kServerBatchCutoffVersion = [1, 6]; List? _serverVersion; bool get serverCanBatch { @@ -823,118 +808,24 @@ mixin ElectrumXInterface on Bip39HDWallet { Future fetchChainHeight() async { try { - // Don't set a stream subscription if one already exists. - if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == - null) { - final Completer completer = Completer(); + // Get the chain height service for the current coin. + ChainHeightService? service = ChainHeightServiceManager.getService( + cryptoCurrency.coin, + ); - // Make sure we only complete once. - final isFirstResponse = _latestHeight == null; - - // Check Electrum and update internal and cached versions if necessary. - await electrumXClient.checkElectrumAdapter(); - if (electrumAdapterChannel != electrumXClient.electrumAdapterChannel && - electrumXClient.electrumAdapterChannel != null) { - electrumAdapterChannel = electrumXClient.electrumAdapterChannel!; - } - if (electrumAdapterClient != electrumXClient.electrumAdapterClient && - electrumXClient.electrumAdapterClient != null) { - electrumAdapterClient = electrumXClient.electrumAdapterClient!; - } - // electrumXCachedClient.electrumAdapterChannel = electrumAdapterChannel; - if (electrumXCachedClient.electrumAdapterClient != - electrumAdapterClient) { - electrumXCachedClient.electrumAdapterClient = electrumAdapterClient; - } - - // Subscribe to and listen for new block headers. - final stream = electrumAdapterClient.subscribeHeaders(); - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = - stream.asBroadcastStream().listen((response) { - final int chainHeight = response.height; - // print("Current chain height: $chainHeight"); - - _latestHeight = chainHeight; - - if (isFirstResponse && !completer.isCompleted) { - // Return the chain height. - completer.complete(chainHeight); - } - }); - - // If we're testing, use the global event bus for testing. - // final bus = globalEventBusForTesting ?? GlobalEventBus.instance; - // No constructors for mixins, so no globalEventBusForTesting is passed in. - final bus = GlobalEventBus.instance; - - // Listen to global event bus for Tor status changes. - _torStatusListener ??= bus.on().listen( - (event) async { - try { - switch (event.newStatus) { - case TorConnectionStatus.connecting: - // If Tor is connecting, we need to wait. - await _torConnectingLock.acquire(); - _requireMutex = true; - break; - - case TorConnectionStatus.connected: - case TorConnectionStatus.disconnected: - // If Tor is connected or disconnected, we can release the lock. - if (_torConnectingLock.isLocked) { - _torConnectingLock.release(); - } - _requireMutex = false; - break; - } - } finally { - // Ensure the lock is released. - if (_torConnectingLock.isLocked) { - _torConnectingLock.release(); - } - } - }, - ); - - // Listen to global event bus for Tor preference changes. - _torPreferenceListener ??= bus.on().listen( - (event) async { - // Close any open subscriptions. - for (final coinSub - in ElectrumxChainHeightService.subscriptions.entries) { - await coinSub.value?.cancel(); - } - - // Cancel alive timer - _aliveTimer?.cancel(); - }, - ); - - // Set a timer to check if the subscription is still alive. - _aliveTimer?.cancel(); - _aliveTimer = Timer.periodic( - _keepAlive, - (_) async => _updateConnectionStatus(await electrumXClient.ping()), - ); - } else { - // Don't set a stream subscription if one already exists. - - // Check if the stream subscription is paused. - if (ElectrumxChainHeightService - .subscriptions[cryptoCurrency.coin]!.isPaused) { - // If it's paused, resume it. - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! - .resume(); - } - - if (_latestHeight != null) { - return _latestHeight!; - } + // ... or create a new one if it doesn't exist. + if (service == null) { + service = ChainHeightService(client: electrumAdapterClient); + ChainHeightServiceManager.add(service, cryptoCurrency.coin); } - // Probably waiting on the subscription to receive the latest block height - // fallback to cached value - return info.cachedChainHeight; + // If the service hasn't been started, start it and fetch the chain height. + if (!service.started) { + return await service.fetchHeightAndStartListenForUpdates(); + } + + // Return the height as per the service if available or the cached height. + return service.height ?? info.cachedChainHeight; } catch (e, s) { Logging.instance.log( "Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s", @@ -1061,15 +952,6 @@ mixin ElectrumXInterface on Bip39HDWallet { // host: newNode.address, port: newNode.port); } - /// Update the connection status and call the onConnectionStatusChanged callback if it exists. - void _updateConnectionStatus(bool connectionStatus) { - // TODO [prio=low]: Set onConnectionStatusChanged callback. - // if (_isConnected != connectionStatus && onConnectionStatusChanged != null) { - // onConnectionStatusChanged!(connectionStatus); - // } - _isConnected = connectionStatus; - } - //============================================================================ Future<({List
addresses, int index})> checkGapsBatched( diff --git a/pubspec.lock b/pubspec.lock index c0e5b16c1..2bba3135a 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -528,8 +528,8 @@ packages: dependency: "direct main" description: path: "." - ref: "51b7a60e07b0409b361e31da65d98178ee235bed" - resolved-ref: "51b7a60e07b0409b361e31da65d98178ee235bed" + ref: "0a34f7f48d921fb33f551cb11dfc9b2930522240" + resolved-ref: "0a34f7f48d921fb33f551cb11dfc9b2930522240" url: "https://github.com/cypherstack/electrum_adapter.git" source: git version: "3.0.0" @@ -1727,8 +1727,8 @@ packages: dependency: "direct main" description: path: "." - ref: "0a6888282f4e98401051a396e9d2293bd55ac2c2" - resolved-ref: "0a6888282f4e98401051a396e9d2293bd55ac2c2" + ref: e37dc4e22f7acb2746b70bdc935f0eb3c50b8b71 + resolved-ref: e37dc4e22f7acb2746b70bdc935f0eb3c50b8b71 url: "https://github.com/cypherstack/tor.git" source: git version: "0.0.1" diff --git a/pubspec.yaml b/pubspec.yaml index 764c2bd57..472e302fc 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -65,7 +65,7 @@ dependencies: tor_ffi_plugin: git: url: https://github.com/cypherstack/tor.git - ref: 0a6888282f4e98401051a396e9d2293bd55ac2c2 + ref: e37dc4e22f7acb2746b70bdc935f0eb3c50b8b71 fusiondart: git: @@ -176,7 +176,7 @@ dependencies: electrum_adapter: git: url: https://github.com/cypherstack/electrum_adapter.git - ref: 51b7a60e07b0409b361e31da65d98178ee235bed + ref: 0a34f7f48d921fb33f551cb11dfc9b2930522240 stream_channel: ^2.1.0 dev_dependencies: