INCOMPLETE: Untested refactor to reduce number of chain subscriptions and simply the management thereof

This commit is contained in:
julian 2024-02-17 15:47:53 +07:00
parent f9a8399d05
commit be8ef772b0
3 changed files with 70 additions and 60 deletions

View file

@ -1,14 +1,57 @@
import 'dart:async';
import 'package:electrum_adapter/electrum_adapter.dart';
import 'package:stackwallet/utilities/enums/coin_enum.dart';
/// Store chain height subscriptions for each coin.
abstract class ElectrumxChainHeightService {
// Used to hold chain height subscriptions for each coin as in:
// ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = sub;
static Map<Coin, StreamSubscription<dynamic>?> subscriptions = {};
/// Manage chain height subscriptions for each coin.
abstract class ChainHeightServiceManager {
static final Map<Coin, ChainHeightService> _services = {};
// Used to hold chain height completers for each coin as in:
// ElectrumxChainHeightService.completers[cryptoCurrency.coin] = completer;
static Map<Coin, Completer<int>?> completers = {};
static ChainHeightService? getService(Coin coin) {
return _services[coin];
}
static void add(ChainHeightService service, Coin coin) {
if (_services[coin] == null) {
_services[coin] = service;
} else {
throw Exception("Chain height service for $coin already managed");
}
}
}
// Basic untested impl. Needs error handling and branching to handle
// various other scenarios
class ChainHeightService {
ElectrumClient client;
StreamSubscription<dynamic>? _subscription;
bool get started => _subscription != null;
int? _height;
int? get height => _height;
ChainHeightService({required this.client});
Future<int> fetchHeightAndStartListenForUpdates() async {
if (_subscription != null) {
throw Exception(
"Attempted to start a chain height service where an existing"
" subscription already exists!",
);
}
final completer = Completer<int>();
_subscription = client.subscribeHeaders().listen((event) {
_height = event.height;
if (!completer.isCompleted) {
completer.complete(_height);
}
});
return completer.future;
}
/// Untested/Unknown implications. USE AT OWN RISK
Future<void> cancelListen() async => await _subscription?.cancel();
}

View file

@ -4,7 +4,6 @@ import 'package:isar/isar.dart';
import 'package:meta/meta.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/db/isar/main_db.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/address.dart';
import 'package:stackwallet/models/isar/models/ethereum/eth_contract.dart';
import 'package:stackwallet/models/node_model.dart';
@ -617,9 +616,10 @@ abstract class Wallet<T extends CryptoCurrency> {
switch (prefs.syncType) {
case SyncingType.currentWalletOnly:
// Close the subscription for this coin's chain height.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]
?.cancel();
// Close the subscription for this coin's chain height.
// NOTE: This does not work now that the subscription is shared
// await (await ChainHeightServiceManager.getService(cryptoCurrency.coin))
// ?.cancelListen();
case SyncingType.selectedWalletsAtStartup:
// Close the subscription if this wallet is not in the list to be synced.
if (!prefs.walletIdsSyncOnStartup.contains(walletId)) {
@ -639,8 +639,10 @@ abstract class Wallet<T extends CryptoCurrency> {
// If there are no other wallets of this coin, then close the sub.
if (walletIds.isEmpty) {
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]
?.cancel();
// NOTE: This does not work now that the subscription is shared
// await (await ChainHeightServiceManager.getService(
// cryptoCurrency.coin))
// ?.cancelListen();
}
}
case SyncingType.allWalletsOnStartup:

View file

@ -44,8 +44,6 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
int? get maximumFeerate => null;
int? _latestHeight;
StreamSubscription<TorPreferenceChangedEvent>? _torPreferenceListener;
StreamSubscription<TorConnectionStatusChangedEvent>? _torStatusListener;
@ -815,53 +813,20 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
Future<int> fetchChainHeight() async {
try {
// _checkChainHeightSubscription();
// TODO above. Make sure that the subscription/stream is alive.
ChainHeightService? service = ChainHeightServiceManager.getService(
cryptoCurrency.coin,
);
// Don't set a stream subscription if one already exists.
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) {
final Completer<int> completer = Completer<int>();
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
await electrumXClient.checkElectrumAdapter();
// TODO [prio=extreme]: Does this update anything in this file?? Thinking no.
final stream = electrumAdapterClient.subscribeHeaders();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
stream.asBroadcastStream().listen((response) {
final int chainHeight = response.height;
// print("Current chain height: $chainHeight");
_latestHeight = chainHeight;
if (isFirstResponse && !completer.isCompleted) {
// Return the chain height.
completer.complete(chainHeight);
}
});
} else {
// Don't set a stream subscription if one already exists.
// Check if the stream subscription is paused.
if (ElectrumxChainHeightService
.subscriptions[cryptoCurrency.coin]!.isPaused) {
// If it's paused, resume it.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.resume();
}
if (_latestHeight != null) {
return _latestHeight!;
}
if (service == null) {
service = ChainHeightService(client: electrumAdapterClient);
ChainHeightServiceManager.add(service, cryptoCurrency.coin);
}
// Probably waiting on the subscription to receive the latest block height
// fallback to cached value
return info.cachedChainHeight;
if (!service.started) {
return await service.fetchHeightAndStartListenForUpdates();
}
return service.height ?? info.cachedChainHeight;
} catch (e, s) {
Logging.instance.log(
"Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s",