store completers similarly to subscriptions so we can await them

This commit is contained in:
sneurlax 2024-02-14 11:07:27 -06:00
parent 7363438279
commit fc0d9639b8
2 changed files with 96 additions and 67 deletions

View file

@ -7,4 +7,8 @@ abstract class ElectrumxChainHeightService {
static Map<Coin, StreamSubscription<dynamic>?> subscriptions = {}; static Map<Coin, StreamSubscription<dynamic>?> subscriptions = {};
// Used to hold chain height subscriptions for each coin as in: // Used to hold chain height subscriptions for each coin as in:
// ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = sub; // ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = sub;
static Map<Coin, Completer<int>?> completers = {};
// Used to hold chain height completers for each coin as in:
// ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer;
} }

View file

@ -805,25 +805,79 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
Future<int> fetchChainHeight() async { Future<int> fetchChainHeight() async {
try { try {
// Don't set a stream subscription if one already exists. // Don't set a stream subscription if one already exists.
await _manageChainHeightSubscription();
if (_latestHeight == null) {
// Probably waiting on the subscription to receive the latest block
// height, fallback to cached value
return info.cachedChainHeight;
} else {
return _latestHeight!;
}
} catch (e, s) {
Logging.instance.log(
"Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s",
level: LogLevel.Error);
// completer.completeError(e, s);
// return Future.error(e, s);
rethrow;
}
}
Future<void> _manageChainHeightSubscription() async {
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) { null) {
return _manageChainHeightSubscription(); // No subscription exists for this coin yet, so create one.
//
// Set up to wait for the first response.
final Completer<int> completer = Completer<int>();
ElectrumxChainHeightService.completers[cryptoCurrency.coin] ??= completer;
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
// Subscribe to block headers.
final subscription =
subscribableElectrumXClient.subscribeToBlockHeaders();
// Set stream subscription.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
subscription.responseStream.asBroadcastStream().listen((event) {
final response = event;
if (response != null &&
response is Map &&
response.containsKey('height')) {
final int chainHeight = response['height'] as int;
// print("Current chain height: $chainHeight");
_latestHeight = chainHeight;
if (isFirstResponse) {
// If the completer is not completed, complete it.
if (!ElectrumxChainHeightService
.completers[cryptoCurrency.coin]!.isCompleted) {
// Complete the completer, returning the chain height.
ElectrumxChainHeightService.completers[cryptoCurrency.coin]!
.complete(chainHeight);
} }
// Don't set a stream subscription if one already exists. }
else { } else {
//IF there's already a wallet for a coin the chain height might not be Logging.instance.log(
// stored for current wallet "blockchain.headers.subscribe returned malformed response\n"
// Check if the stream subscription is paused. "Response: $response",
level: LogLevel.Error);
}
});
} else {
// A subscription already exists.
//
// Resume the stream subscription if it's paused.
if (ElectrumxChainHeightService if (ElectrumxChainHeightService
.subscriptions[cryptoCurrency.coin]!.isPaused) { .subscriptions[cryptoCurrency.coin]!.isPaused) {
// If it's paused, resume it. // If it's paused, resume it.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]! ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.resume(); .resume();
} }
if (_latestHeight == null) {
//Get the chain height
return _manageChainHeightSubscription();
}
// Causes synchronization to stall. // Causes synchronization to stall.
// // Check if the stream subscription is active by pinging it. // // Check if the stream subscription is active by pinging it.
@ -838,58 +892,29 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// return completer.future; // return completer.future;
// } // }
if (_latestHeight != null) { // If there's no completer set for this coin, something's gone wrong.
return _latestHeight!; //
// The completer is always set before the subscription, so this should
// never happen.
if (ElectrumxChainHeightService.completers[cryptoCurrency.coin] == null) {
// Clear this coin's subscription.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.cancel();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = null;
// Clear this coin's completer.
ElectrumxChainHeightService.completers[cryptoCurrency.coin] = null;
// Retry/recurse.
return await _manageChainHeightSubscription();
} }
} }
// Probably waiting on the subscription to receive the latest block height // Wait for the first response.
// fallback to cached value _latestHeight = await ElectrumxChainHeightService
return info.cachedChainHeight; .completers[cryptoCurrency.coin]!.future;
} catch (e, s) {
Logging.instance.log(
"Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s",
level: LogLevel.Error);
// completer.completeError(e, s);
// return Future.error(e, s);
rethrow;
}
}
Future<int> _manageChainHeightSubscription() async { return;
final Completer<int> completer = Completer<int>();
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
// Subscribe to block headers.
final subscription =
subscribableElectrumXClient.subscribeToBlockHeaders();
// set stream subscription
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
subscription.responseStream.asBroadcastStream().listen((event) {
final response = event;
if (response != null &&
response is Map &&
response.containsKey('height')) {
final int chainHeight = response['height'] as int;
// print("Current chain height: $chainHeight");
_latestHeight = chainHeight;
if (isFirstResponse && !completer.isCompleted) {
// Return the chain height.
completer.complete(chainHeight);
}
} else {
Logging.instance.log(
"blockchain.headers.subscribe returned malformed response\n"
"Response: $response",
level: LogLevel.Error);
}
});
return _latestHeight ?? await completer.future;
} }
Future<int> fetchTxCount({required String addressScriptHash}) async { Future<int> fetchTxCount({required String addressScriptHash}) async {