From be8ef772b0f3eea2bfcb3aa51d88ddcb0367d0bd Mon Sep 17 00:00:00 2001 From: julian Date: Sat, 17 Feb 2024 15:47:53 +0700 Subject: [PATCH] INCOMPLETE: Untested refactor to reduce number of chain subscriptions and simply the management thereof --- .../electrumx_chain_height_service.dart | 59 ++++++++++++++++--- lib/wallets/wallet/wallet.dart | 14 +++-- .../electrumx_interface.dart | 57 ++++-------------- 3 files changed, 70 insertions(+), 60 deletions(-) diff --git a/lib/electrumx_rpc/electrumx_chain_height_service.dart b/lib/electrumx_rpc/electrumx_chain_height_service.dart index cc799d3c5..ed8de6e5c 100644 --- a/lib/electrumx_rpc/electrumx_chain_height_service.dart +++ b/lib/electrumx_rpc/electrumx_chain_height_service.dart @@ -1,14 +1,57 @@ import 'dart:async'; +import 'package:electrum_adapter/electrum_adapter.dart'; import 'package:stackwallet/utilities/enums/coin_enum.dart'; -/// Store chain height subscriptions for each coin. -abstract class ElectrumxChainHeightService { - // Used to hold chain height subscriptions for each coin as in: - // ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = sub; - static Map?> subscriptions = {}; +/// Manage chain height subscriptions for each coin. +abstract class ChainHeightServiceManager { + static final Map _services = {}; - // Used to hold chain height completers for each coin as in: - // ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer; - static Map?> completers = {}; + static ChainHeightService? getService(Coin coin) { + return _services[coin]; + } + + static void add(ChainHeightService service, Coin coin) { + if (_services[coin] == null) { + _services[coin] = service; + } else { + throw Exception("Chain height service for $coin already managed"); + } + } +} + +// Basic untested impl. Needs error handling and branching to handle +// various other scenarios +class ChainHeightService { + ElectrumClient client; + + StreamSubscription? _subscription; + bool get started => _subscription != null; + + int? _height; + int? get height => _height; + + ChainHeightService({required this.client}); + + Future fetchHeightAndStartListenForUpdates() async { + if (_subscription != null) { + throw Exception( + "Attempted to start a chain height service where an existing" + " subscription already exists!", + ); + } + + final completer = Completer(); + _subscription = client.subscribeHeaders().listen((event) { + _height = event.height; + if (!completer.isCompleted) { + completer.complete(_height); + } + }); + + return completer.future; + } + + /// Untested/Unknown implications. USE AT OWN RISK + Future cancelListen() async => await _subscription?.cancel(); } diff --git a/lib/wallets/wallet/wallet.dart b/lib/wallets/wallet/wallet.dart index b4b47da13..2f1691b0a 100644 --- a/lib/wallets/wallet/wallet.dart +++ b/lib/wallets/wallet/wallet.dart @@ -4,7 +4,6 @@ import 'package:isar/isar.dart'; import 'package:meta/meta.dart'; import 'package:mutex/mutex.dart'; import 'package:stackwallet/db/isar/main_db.dart'; -import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/address.dart'; import 'package:stackwallet/models/isar/models/ethereum/eth_contract.dart'; import 'package:stackwallet/models/node_model.dart'; @@ -617,9 +616,10 @@ abstract class Wallet { switch (prefs.syncType) { case SyncingType.currentWalletOnly: - // Close the subscription for this coin's chain height. - await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] - ?.cancel(); + // Close the subscription for this coin's chain height. + // NOTE: This does not work now that the subscription is shared + // await (await ChainHeightServiceManager.getService(cryptoCurrency.coin)) + // ?.cancelListen(); case SyncingType.selectedWalletsAtStartup: // Close the subscription if this wallet is not in the list to be synced. if (!prefs.walletIdsSyncOnStartup.contains(walletId)) { @@ -639,8 +639,10 @@ abstract class Wallet { // If there are no other wallets of this coin, then close the sub. if (walletIds.isEmpty) { - await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] - ?.cancel(); + // NOTE: This does not work now that the subscription is shared + // await (await ChainHeightServiceManager.getService( + // cryptoCurrency.coin)) + // ?.cancelListen(); } } case SyncingType.allWalletsOnStartup: diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 91b227792..1020bf7db 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -44,8 +44,6 @@ mixin ElectrumXInterface on Bip39HDWallet { int? get maximumFeerate => null; - int? _latestHeight; - StreamSubscription? _torPreferenceListener; StreamSubscription? _torStatusListener; @@ -815,53 +813,20 @@ mixin ElectrumXInterface on Bip39HDWallet { Future fetchChainHeight() async { try { - // _checkChainHeightSubscription(); - // TODO above. Make sure that the subscription/stream is alive. + ChainHeightService? service = ChainHeightServiceManager.getService( + cryptoCurrency.coin, + ); - // Don't set a stream subscription if one already exists. - if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == - null) { - final Completer completer = Completer(); - - // Make sure we only complete once. - final isFirstResponse = _latestHeight == null; - - await electrumXClient.checkElectrumAdapter(); - // TODO [prio=extreme]: Does this update anything in this file?? Thinking no. - - final stream = electrumAdapterClient.subscribeHeaders(); - - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = - stream.asBroadcastStream().listen((response) { - final int chainHeight = response.height; - // print("Current chain height: $chainHeight"); - - _latestHeight = chainHeight; - - if (isFirstResponse && !completer.isCompleted) { - // Return the chain height. - completer.complete(chainHeight); - } - }); - } else { - // Don't set a stream subscription if one already exists. - - // Check if the stream subscription is paused. - if (ElectrumxChainHeightService - .subscriptions[cryptoCurrency.coin]!.isPaused) { - // If it's paused, resume it. - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! - .resume(); - } - - if (_latestHeight != null) { - return _latestHeight!; - } + if (service == null) { + service = ChainHeightService(client: electrumAdapterClient); + ChainHeightServiceManager.add(service, cryptoCurrency.coin); } - // Probably waiting on the subscription to receive the latest block height - // fallback to cached value - return info.cachedChainHeight; + if (!service.started) { + return await service.fetchHeightAndStartListenForUpdates(); + } + + return service.height ?? info.cachedChainHeight; } catch (e, s) { Logging.instance.log( "Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s",