avoid race condition

This commit is contained in:
sneurlax 2024-02-14 11:51:59 -06:00
parent 98c095b568
commit e979a352fb

View file

@ -848,71 +848,74 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] =
subscriptionCreationTime; subscriptionCreationTime;
// Set stream subscription. // Doublecheck to avoid race condition.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
subscription.responseStream.asBroadcastStream().listen((event) { null) {
final response = event; // Set stream subscription.
if (response != null && ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
response is Map && subscription.responseStream.asBroadcastStream().listen((event) {
response.containsKey('height')) { final response = event;
final int chainHeight = response['height'] as int; if (response != null &&
// print("Current chain height: $chainHeight"); response is Map &&
response.containsKey('height')) {
final int chainHeight = response['height'] as int;
// print("Current chain height: $chainHeight");
_latestHeight = chainHeight; _latestHeight = chainHeight;
if (isFirstResponse) { if (isFirstResponse) {
// If the completer is not completed, complete it. // If the completer is not completed, complete it.
if (!ElectrumxChainHeightService if (!ElectrumxChainHeightService
.completers[cryptoCurrency.coin]!.isCompleted) { .completers[cryptoCurrency.coin]!.isCompleted) {
// Complete the completer, returning the chain height. // Complete the completer, returning the chain height.
ElectrumxChainHeightService.completers[cryptoCurrency.coin]! ElectrumxChainHeightService.completers[cryptoCurrency.coin]!
.complete(chainHeight); .complete(chainHeight);
}
} }
} else {
Logging.instance.log(
"blockchain.headers.subscribe returned malformed response\n"
"Response: $response",
level: LogLevel.Error);
} }
} else { });
Logging.instance.log(
"blockchain.headers.subscribe returned malformed response\n"
"Response: $response",
level: LogLevel.Error);
}
});
} else {
// A subscription already exists.
//
// Resume the stream subscription if it's paused.
if (ElectrumxChainHeightService
.subscriptions[cryptoCurrency.coin]!.isPaused) {
// If it's paused, resume it.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.resume();
} }
}
// Causes synchronization to stall. // A subscription already exists.
// // Check if the stream subscription is active by pinging it. //
// if (!(await subscribableElectrumXClient.ping())) { // Resume the stream subscription if it's paused.
// // If it's not active, reconnect it. if (ElectrumxChainHeightService
// final node = await getCurrentElectrumXNode(); .subscriptions[cryptoCurrency.coin]!.isPaused) {
// // If it's paused, resume it.
// await subscribableElectrumXClient.connect( ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!.resume();
// host: node.address, port: node.port); }
//
// // Wait for first response.
// return completer.future;
// }
// If there's no completer set for this coin, something's gone wrong. // Causes synchronization to stall.
// // // Check if the stream subscription is active by pinging it.
// The completer is always set before the subscription, so this should // if (!(await subscribableElectrumXClient.ping())) {
// never happen. // // If it's not active, reconnect it.
if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) { // final node = await getCurrentElectrumXNode();
// Clear this coin's subscription. //
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! // await subscribableElectrumXClient.connect(
.cancel(); // host: node.address, port: node.port);
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null; //
// // Wait for first response.
// return completer.future;
// }
// Retry/recurse. // If there's no completer set for this coin, something's gone wrong.
return await _manageChainHeightSubscription(); //
} // The completer is always set before the subscription, so this should
// never happen.
if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) {
// Clear this coin's subscription.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.cancel();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null;
// Retry/recurse.
return await _manageChainHeightSubscription();
} }
// Check if the subscription has been running for too long. // Check if the subscription has been running for too long.