From fb79cd867cb4e66f7621b0756b1822759400b330 Mon Sep 17 00:00:00 2001 From: sneurlax Date: Wed, 14 Feb 2024 15:23:36 -0600 Subject: [PATCH] use mutex to control race conditions --- .../electrumx_interface.dart | 167 +++++++++--------- 1 file changed, 86 insertions(+), 81 deletions(-) diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index ec244b0b4..0dc7e3fdf 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -5,6 +5,7 @@ import 'package:bip47/src/util.dart'; import 'package:bitcoindart/bitcoindart.dart' as bitcoindart; import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib; import 'package:isar/isar.dart'; +import 'package:mutex/mutex.dart'; import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; @@ -824,33 +825,35 @@ mixin ElectrumXInterface on Bip39HDWallet { } } + // Mutex to control subscription management access. + static final Mutex _subMutex = Mutex(); + Future _manageChainHeightSubscription() async { - // Set the timeout period for the chain height subscription. - const timeout = Duration(seconds: 10); + await _subMutex.protect(() async { + // Set the timeout period for the chain height subscription. + const timeout = Duration(seconds: 10); - if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == - null) { - // No subscription exists for this coin yet, so create one. - // - // Set up to wait for the first response. - final Completer completer = Completer(); - ElectrumxChainHeightService.completers[cryptoCurrency.coin] ??= completer; - - // Make sure we only complete once. - final isFirstResponse = _latestHeight == null; - - // Subscribe to block headers. - final subscription = - subscribableElectrumXClient.subscribeToBlockHeaders(); - - // Set the time the subscription was created. - final subscriptionCreationTime = DateTime.now(); - ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = - subscriptionCreationTime; - - // Doublecheck to avoid race condition. if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == null) { + // No subscription exists for this coin yet, so create one. + // + // Set up to wait for the first response. + final Completer completer = Completer(); + ElectrumxChainHeightService.completers[cryptoCurrency.coin] ??= + completer; + + // Make sure we only complete once. + final isFirstResponse = _latestHeight == null; + + // Subscribe to block headers. + final subscription = + subscribableElectrumXClient.subscribeToBlockHeaders(); + + // Set the time the subscription was created. + final subscriptionCreationTime = DateTime.now(); + ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = + subscriptionCreationTime; + // Set stream subscription. ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = subscription.responseStream.asBroadcastStream().listen((event) { @@ -880,77 +883,79 @@ mixin ElectrumXInterface on Bip39HDWallet { } }); } - } - // A subscription already exists. - // - // Resume the stream subscription if it's paused. - if (ElectrumxChainHeightService - .subscriptions[cryptoCurrency.coin]!.isPaused) { - // If it's paused, resume it. - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!.resume(); - } + // A subscription already exists. + // + // Resume the stream subscription if it's paused. + if (ElectrumxChainHeightService + .subscriptions[cryptoCurrency.coin]!.isPaused) { + // If it's paused, resume it. + ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! + .resume(); + } - // Causes synchronization to stall. - // // Check if the stream subscription is active by pinging it. - // if (!(await subscribableElectrumXClient.ping())) { - // // If it's not active, reconnect it. - // final node = await getCurrentElectrumXNode(); - // - // await subscribableElectrumXClient.connect( - // host: node.address, port: node.port); - // - // // Wait for first response. - // return completer.future; - // } + // Causes synchronization to stall. + // // Check if the stream subscription is active by pinging it. + // if (!(await subscribableElectrumXClient.ping())) { + // // If it's not active, reconnect it. + // final node = await getCurrentElectrumXNode(); + // + // await subscribableElectrumXClient.connect( + // host: node.address, port: node.port); + // + // // Wait for first response. + // return completer.future; + // } - // If there's no completer set for this coin, something's gone wrong. - // - // The completer is always set before the subscription, so this should - // never happen. - if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) { - // Clear this coin's subscription. - await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! - .cancel(); - ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null; - - // Retry/recurse. - return await _manageChainHeightSubscription(); - } - - // Check if the subscription has been running for too long. - if (ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] != null) { - final timeRunning = DateTime.now().difference( - ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin]!); - // Cancel and retry if we've been waiting too long. - if (timeRunning > timeout) { + // If there's no completer set for this coin, something's gone wrong. + // + // The completer is always set before the subscription, so this should + // never happen. + if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) { // Clear this coin's subscription. await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! .cancel(); ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null; - // Clear this coin's completer. - ElectrumxChainHeightService.completers[cryptoCurrency.coin] - ?.completeError( - Exception( - "Subscription to block headers has been running for too long", - ), - ); - ElectrumxChainHeightService.completers[cryptoCurrency.coin] = null; - - // Reset time started. - ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = null; - // Retry/recurse. return await _manageChainHeightSubscription(); } - } - // Wait for the first response. - _latestHeight = await ElectrumxChainHeightService - .completers[cryptoCurrency.coin]!.future; + // Check if the subscription has been running for too long. + if (ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] != + null) { + final timeRunning = DateTime.now().difference( + ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin]!); + // Cancel and retry if we've been waiting too long. + if (timeRunning > timeout) { + // Clear this coin's subscription. + await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! + .cancel(); + ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null; - return; + // Clear this coin's completer. + ElectrumxChainHeightService.completers[cryptoCurrency.coin] + ?.completeError( + Exception( + "Subscription to block headers has been running for too long", + ), + ); + ElectrumxChainHeightService.completers[cryptoCurrency.coin] = null; + + // Reset time started. + ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = null; + + // Retry/recurse. + return await _manageChainHeightSubscription(); + } + } + + // Wait for the first response. + _latestHeight = await ElectrumxChainHeightService + .completers[cryptoCurrency.coin]!.future; + + return; + }); } Future fetchTxCount({required String addressScriptHash}) async {