use mutex to control race conditions

This commit is contained in:
sneurlax 2024-02-14 15:23:36 -06:00
parent 604f175a43
commit fb79cd867c

View file

@ -5,6 +5,7 @@ import 'package:bip47/src/util.dart';
import 'package:bitcoindart/bitcoindart.dart' as bitcoindart; import 'package:bitcoindart/bitcoindart.dart' as bitcoindart;
import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib; import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib;
import 'package:isar/isar.dart'; import 'package:isar/isar.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
@ -824,33 +825,35 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
} }
} }
// Mutex to control subscription management access.
static final Mutex _subMutex = Mutex();
Future<void> _manageChainHeightSubscription() async { Future<void> _manageChainHeightSubscription() async {
// Set the timeout period for the chain height subscription. await _subMutex.protect(() async {
const timeout = Duration(seconds: 10); // Set the timeout period for the chain height subscription.
const timeout = Duration(seconds: 10);
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) {
// No subscription exists for this coin yet, so create one.
//
// Set up to wait for the first response.
final Completer<int> completer = Completer<int>();
ElectrumxChainHeightService.completers[cryptoCurrency.coin] ??= completer;
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
// Subscribe to block headers.
final subscription =
subscribableElectrumXClient.subscribeToBlockHeaders();
// Set the time the subscription was created.
final subscriptionCreationTime = DateTime.now();
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] =
subscriptionCreationTime;
// Doublecheck to avoid race condition.
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) { null) {
// No subscription exists for this coin yet, so create one.
//
// Set up to wait for the first response.
final Completer<int> completer = Completer<int>();
ElectrumxChainHeightService.completers[cryptoCurrency.coin] ??=
completer;
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
// Subscribe to block headers.
final subscription =
subscribableElectrumXClient.subscribeToBlockHeaders();
// Set the time the subscription was created.
final subscriptionCreationTime = DateTime.now();
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] =
subscriptionCreationTime;
// Set stream subscription. // Set stream subscription.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
subscription.responseStream.asBroadcastStream().listen((event) { subscription.responseStream.asBroadcastStream().listen((event) {
@ -880,77 +883,79 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
} }
}); });
} }
}
// A subscription already exists. // A subscription already exists.
// //
// Resume the stream subscription if it's paused. // Resume the stream subscription if it's paused.
if (ElectrumxChainHeightService if (ElectrumxChainHeightService
.subscriptions[cryptoCurrency.coin]!.isPaused) { .subscriptions[cryptoCurrency.coin]!.isPaused) {
// If it's paused, resume it. // If it's paused, resume it.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!.resume(); ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
} .resume();
}
// Causes synchronization to stall. // Causes synchronization to stall.
// // Check if the stream subscription is active by pinging it. // // Check if the stream subscription is active by pinging it.
// if (!(await subscribableElectrumXClient.ping())) { // if (!(await subscribableElectrumXClient.ping())) {
// // If it's not active, reconnect it. // // If it's not active, reconnect it.
// final node = await getCurrentElectrumXNode(); // final node = await getCurrentElectrumXNode();
// //
// await subscribableElectrumXClient.connect( // await subscribableElectrumXClient.connect(
// host: node.address, port: node.port); // host: node.address, port: node.port);
// //
// // Wait for first response. // // Wait for first response.
// return completer.future; // return completer.future;
// } // }
// If there's no completer set for this coin, something's gone wrong. // If there's no completer set for this coin, something's gone wrong.
// //
// The completer is always set before the subscription, so this should // The completer is always set before the subscription, so this should
// never happen. // never happen.
if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) { if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) {
// Clear this coin's subscription.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.cancel();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null;
// Retry/recurse.
return await _manageChainHeightSubscription();
}
// Check if the subscription has been running for too long.
if (ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] != null) {
final timeRunning = DateTime.now().difference(
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin]!);
// Cancel and retry if we've been waiting too long.
if (timeRunning > timeout) {
// Clear this coin's subscription. // Clear this coin's subscription.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.cancel(); .cancel();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null; ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null;
// Clear this coin's completer.
ElectrumxChainHeightService.completers[cryptoCurrency.coin]
?.completeError(
Exception(
"Subscription to block headers has been running for too long",
),
);
ElectrumxChainHeightService.completers[cryptoCurrency.coin] = null;
// Reset time started.
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = null;
// Retry/recurse. // Retry/recurse.
return await _manageChainHeightSubscription(); return await _manageChainHeightSubscription();
} }
}
// Wait for the first response. // Check if the subscription has been running for too long.
_latestHeight = await ElectrumxChainHeightService if (ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] !=
.completers[cryptoCurrency.coin]!.future; null) {
final timeRunning = DateTime.now().difference(
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin]!);
// Cancel and retry if we've been waiting too long.
if (timeRunning > timeout) {
// Clear this coin's subscription.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.cancel();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null;
return; // Clear this coin's completer.
ElectrumxChainHeightService.completers[cryptoCurrency.coin]
?.completeError(
Exception(
"Subscription to block headers has been running for too long",
),
);
ElectrumxChainHeightService.completers[cryptoCurrency.coin] = null;
// Reset time started.
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = null;
// Retry/recurse.
return await _manageChainHeightSubscription();
}
}
// Wait for the first response.
_latestHeight = await ElectrumxChainHeightService
.completers[cryptoCurrency.coin]!.future;
return;
});
} }
Future<int> fetchTxCount({required String addressScriptHash}) async { Future<int> fetchTxCount({required String addressScriptHash}) async {