remove recursion to resolve deadlock issue

This commit is contained in:
sneurlax 2024-02-14 15:56:34 -06:00
parent fb79cd867c
commit e58a614729
2 changed files with 52 additions and 118 deletions

View file

@ -11,8 +11,4 @@ abstract class ElectrumxChainHeightService {
// Used to hold chain height completers for each coin as in: // Used to hold chain height completers for each coin as in:
// ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer; // ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer;
static Map<Coin, Completer<int>?> completers = {}; static Map<Coin, Completer<int>?> completers = {};
// Used to hold the time each coin started waiting for chain height as in:
// ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = time;
static Map<Coin, DateTime?> timeStarted = {};
} }

View file

@ -808,13 +808,7 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// Don't set a stream subscription if one already exists. // Don't set a stream subscription if one already exists.
await _manageChainHeightSubscription(); await _manageChainHeightSubscription();
if (_latestHeight == null) { return _latestHeight ?? info.cachedChainHeight;
// Probably waiting on the subscription to receive the latest block
// height, fallback to cached value
return info.cachedChainHeight;
} else {
return _latestHeight!;
}
} catch (e, s) { } catch (e, s) {
Logging.instance.log( Logging.instance.log(
"Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s", "Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s",
@ -829,132 +823,76 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
static final Mutex _subMutex = Mutex(); static final Mutex _subMutex = Mutex();
Future<void> _manageChainHeightSubscription() async { Future<void> _manageChainHeightSubscription() async {
await _subMutex.protect(() async { // Set the timeout period for the chain height subscription.
// Set the timeout period for the chain height subscription. const timeout = Duration(seconds: 10);
const timeout = Duration(seconds: 10);
await _subMutex.protect(() async {
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) { null) {
// No subscription exists for this coin yet, so create one. await _createSubscription();
// } else if (ElectrumxChainHeightService
// Set up to wait for the first response.
final Completer<int> completer = Completer<int>();
ElectrumxChainHeightService.completers[cryptoCurrency.coin] ??=
completer;
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
// Subscribe to block headers.
final subscription =
subscribableElectrumXClient.subscribeToBlockHeaders();
// Set the time the subscription was created.
final subscriptionCreationTime = DateTime.now();
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] =
subscriptionCreationTime;
// Set stream subscription.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
subscription.responseStream.asBroadcastStream().listen((event) {
final response = event;
if (response != null &&
response is Map &&
response.containsKey('height')) {
final int chainHeight = response['height'] as int;
// print("Current chain height: $chainHeight");
_latestHeight = chainHeight;
if (isFirstResponse) {
// If the completer is not completed, complete it.
if (!ElectrumxChainHeightService
.completers[cryptoCurrency.coin]!.isCompleted) {
// Complete the completer, returning the chain height.
ElectrumxChainHeightService.completers[cryptoCurrency.coin]!
.complete(chainHeight);
}
}
} else {
Logging.instance.log(
"blockchain.headers.subscribe returned malformed response\n"
"Response: $response",
level: LogLevel.Error);
}
});
}
// A subscription already exists.
//
// Resume the stream subscription if it's paused.
if (ElectrumxChainHeightService
.subscriptions[cryptoCurrency.coin]!.isPaused) { .subscriptions[cryptoCurrency.coin]!.isPaused) {
// If it's paused, resume it.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.resume(); .resume();
} }
});
// Causes synchronization to stall. // Ensure _latestHeight is updated before proceeding.
// // Check if the stream subscription is active by pinging it. if (_latestHeight == null &&
// if (!(await subscribableElectrumXClient.ping())) { ElectrumxChainHeightService.completers[cryptoCurrency.coin] != null) {
// // If it's not active, reconnect it. try {
// final node = await getCurrentElectrumXNode(); // Use a timeout to wait for the completer to avoid indefinite blocking.
// _latestHeight = await ElectrumxChainHeightService
// await subscribableElectrumXClient.connect( .completers[cryptoCurrency.coin]!.future
// host: node.address, port: node.port); .timeout(timeout);
// } catch (e) {
// // Wait for first response. Logging.instance
// return completer.future; .log("Timeout waiting for chain height", level: LogLevel.Error);
// }
// If there's no completer set for this coin, something's gone wrong.
//
// The completer is always set before the subscription, so this should
// never happen.
if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) {
// Clear this coin's subscription. // Clear this coin's subscription.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.cancel(); .cancel();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null; ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null;
// Retry/recurse.
return await _manageChainHeightSubscription();
} }
}
}
// Check if the subscription has been running for too long. Future<void> _createSubscription() async {
if (ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] != final completer = Completer<int>();
null) { ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer;
final timeRunning = DateTime.now().difference(
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin]!);
// Cancel and retry if we've been waiting too long.
if (timeRunning > timeout) {
// Clear this coin's subscription.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.cancel();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null;
// Clear this coin's completer. // Make sure we only complete once.
ElectrumxChainHeightService.completers[cryptoCurrency.coin] final isFirstResponse = _latestHeight == null;
?.completeError(
Exception(
"Subscription to block headers has been running for too long",
),
);
ElectrumxChainHeightService.completers[cryptoCurrency.coin] = null;
// Reset time started. // Subscribe to block headers.
ElectrumxChainHeightService.timeStarted[cryptoCurrency.coin] = null; final subscription = subscribableElectrumXClient.subscribeToBlockHeaders();
// Retry/recurse. // Set stream subscription.
return await _manageChainHeightSubscription(); ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
subscription.responseStream.asBroadcastStream().listen((event) {
final response = event;
if (response != null &&
response is Map &&
response.containsKey('height')) {
final int chainHeight = response['height'] as int;
// print("Current chain height: $chainHeight");
_latestHeight = chainHeight;
if (isFirstResponse) {
// If the completer is not completed, complete it.
if (!ElectrumxChainHeightService
.completers[cryptoCurrency.coin]!.isCompleted) {
// Complete the completer, returning the chain height.
ElectrumxChainHeightService.completers[cryptoCurrency.coin]!
.complete(chainHeight);
}
} }
} else {
Logging.instance.log(
"blockchain.headers.subscribe returned malformed response\n"
"Response: $response",
level: LogLevel.Error);
} }
// Wait for the first response.
_latestHeight = await ElectrumxChainHeightService
.completers[cryptoCurrency.coin]!.future;
return;
}); });
} }