From e58a614729c1015923f0b213282c2c26be1eb5e3 Mon Sep 17 00:00:00 2001 From: sneurlax Date: Wed, 14 Feb 2024 15:56:34 -0600 Subject: [PATCH] remove recursion to resolve deadlock issue --- .../electrumx_chain_height_service.dart | 4 - .../electrumx_interface.dart | 166 ++++++------------ 2 files changed, 52 insertions(+), 118 deletions(-) diff --git a/lib/electrumx_rpc/electrumx_chain_height_service.dart b/lib/electrumx_rpc/electrumx_chain_height_service.dart index b55506dd4..cc799d3c5 100644 --- a/lib/electrumx_rpc/electrumx_chain_height_service.dart +++ b/lib/electrumx_rpc/electrumx_chain_height_service.dart @@ -11,8 +11,4 @@ abstract class ElectrumxChainHeightService { // Used to hold chain height completers for each coin as in: // ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer; static Map?> completers = {}; - - // Used to hold the time each coin started waiting for chain height as in: - // ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = time; - static Map timeStarted = {}; } diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 0dc7e3fdf..566911ca0 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -808,13 +808,7 @@ mixin ElectrumXInterface on Bip39HDWallet { // Don't set a stream subscription if one already exists. await _manageChainHeightSubscription(); - if (_latestHeight == null) { - // Probably waiting on the subscription to receive the latest block - // height, fallback to cached value - return info.cachedChainHeight; - } else { - return _latestHeight!; - } + return _latestHeight ?? info.cachedChainHeight; } catch (e, s) { Logging.instance.log( "Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s", @@ -829,132 +823,76 @@ mixin ElectrumXInterface on Bip39HDWallet { static final Mutex _subMutex = Mutex(); Future _manageChainHeightSubscription() async { - await _subMutex.protect(() async { - // Set the timeout period for the chain height subscription. - const timeout = Duration(seconds: 10); + // Set the timeout period for the chain height subscription. + const timeout = Duration(seconds: 10); + await _subMutex.protect(() async { if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == null) { - // No subscription exists for this coin yet, so create one. - // - // Set up to wait for the first response. - final Completer completer = Completer(); - ElectrumxChainHeightService.completers[cryptoCurrency.coin] ??= - completer; - - // Make sure we only complete once. - final isFirstResponse = _latestHeight == null; - - // Subscribe to block headers. - final subscription = - subscribableElectrumXClient.subscribeToBlockHeaders(); - - // Set the time the subscription was created. - final subscriptionCreationTime = DateTime.now(); - ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = - subscriptionCreationTime; - - // Set stream subscription. - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = - subscription.responseStream.asBroadcastStream().listen((event) { - final response = event; - if (response != null && - response is Map && - response.containsKey('height')) { - final int chainHeight = response['height'] as int; - // print("Current chain height: $chainHeight"); - - _latestHeight = chainHeight; - - if (isFirstResponse) { - // If the completer is not completed, complete it. - if (!ElectrumxChainHeightService - .completers[cryptoCurrency.coin]!.isCompleted) { - // Complete the completer, returning the chain height. - ElectrumxChainHeightService.completers[cryptoCurrency.coin]! - .complete(chainHeight); - } - } - } else { - Logging.instance.log( - "blockchain.headers.subscribe returned malformed response\n" - "Response: $response", - level: LogLevel.Error); - } - }); - } - - // A subscription already exists. - // - // Resume the stream subscription if it's paused. - if (ElectrumxChainHeightService + await _createSubscription(); + } else if (ElectrumxChainHeightService .subscriptions[cryptoCurrency.coin]!.isPaused) { - // If it's paused, resume it. ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! .resume(); } + }); - // Causes synchronization to stall. - // // Check if the stream subscription is active by pinging it. - // if (!(await subscribableElectrumXClient.ping())) { - // // If it's not active, reconnect it. - // final node = await getCurrentElectrumXNode(); - // - // await subscribableElectrumXClient.connect( - // host: node.address, port: node.port); - // - // // Wait for first response. - // return completer.future; - // } - - // If there's no completer set for this coin, something's gone wrong. - // - // The completer is always set before the subscription, so this should - // never happen. - if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) { + // Ensure _latestHeight is updated before proceeding. + if (_latestHeight == null && + ElectrumxChainHeightService.completers[cryptoCurrency.coin] != null) { + try { + // Use a timeout to wait for the completer to avoid indefinite blocking. + _latestHeight = await ElectrumxChainHeightService + .completers[cryptoCurrency.coin]!.future + .timeout(timeout); + } catch (e) { + Logging.instance + .log("Timeout waiting for chain height", level: LogLevel.Error); // Clear this coin's subscription. await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! .cancel(); ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null; - - // Retry/recurse. - return await _manageChainHeightSubscription(); } + } + } - // Check if the subscription has been running for too long. - if (ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] != - null) { - final timeRunning = DateTime.now().difference( - ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin]!); - // Cancel and retry if we've been waiting too long. - if (timeRunning > timeout) { - // Clear this coin's subscription. - await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! - .cancel(); - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null; + Future _createSubscription() async { + final completer = Completer(); + ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer; - // Clear this coin's completer. - ElectrumxChainHeightService.completers[cryptoCurrency.coin] - ?.completeError( - Exception( - "Subscription to block headers has been running for too long", - ), - ); - ElectrumxChainHeightService.completers[cryptoCurrency.coin] = null; + // Make sure we only complete once. + final isFirstResponse = _latestHeight == null; - // Reset time started. - ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = null; + // Subscribe to block headers. + final subscription = subscribableElectrumXClient.subscribeToBlockHeaders(); - // Retry/recurse. - return await _manageChainHeightSubscription(); + // Set stream subscription. + ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = + subscription.responseStream.asBroadcastStream().listen((event) { + final response = event; + if (response != null && + response is Map && + response.containsKey('height')) { + final int chainHeight = response['height'] as int; + // print("Current chain height: $chainHeight"); + + _latestHeight = chainHeight; + + if (isFirstResponse) { + // If the completer is not completed, complete it. + if (!ElectrumxChainHeightService + .completers[cryptoCurrency.coin]!.isCompleted) { + // Complete the completer, returning the chain height. + ElectrumxChainHeightService.completers[cryptoCurrency.coin]! + .complete(chainHeight); + } } + } else { + Logging.instance.log( + "blockchain.headers.subscribe returned malformed response\n" + "Response: $response", + level: LogLevel.Error); } - - // Wait for the first response. - _latestHeight = await ElectrumxChainHeightService - .completers[cryptoCurrency.coin]!.future; - - return; }); }