close chain height subscriptions on tor connection preference change

This commit is contained in:
sneurlax 2024-02-19 14:32:43 -06:00
parent 0f8d51657f
commit 494a1a9ba6
3 changed files with 50 additions and 25 deletions

View file

@ -5,35 +5,63 @@ import 'package:stackwallet/utilities/enums/coin_enum.dart';
/// Manage chain height subscriptions for each coin. /// Manage chain height subscriptions for each coin.
abstract class ChainHeightServiceManager { abstract class ChainHeightServiceManager {
// A map of chain height services for each coin.
static final Map<Coin, ChainHeightService> _services = {}; static final Map<Coin, ChainHeightService> _services = {};
// Map<Coin, ChainHeightService> get services => _services;
// Get the chain height service for a specific coin.
static ChainHeightService? getService(Coin coin) { static ChainHeightService? getService(Coin coin) {
return _services[coin]; return _services[coin];
} }
// Add a chain height service for a specific coin.
static void add(ChainHeightService service, Coin coin) { static void add(ChainHeightService service, Coin coin) {
// Don't add a new service if one already exists.
if (_services[coin] == null) { if (_services[coin] == null) {
_services[coin] = service; _services[coin] = service;
} else { } else {
throw Exception("Chain height service for $coin already managed"); throw Exception("Chain height service for $coin already managed");
} }
} }
// Remove a chain height service for a specific coin.
static void remove(Coin coin) {
_services.remove(coin);
} }
// Basic untested impl. Needs error handling and branching to handle // Close all subscriptions and clean up resources.
// various other scenarios static Future<void> dispose() async {
// Close each subscription.
for (final coin in _services.keys) {
final ChainHeightService? service = getService(coin);
await service?.cancelListen();
remove(coin);
}
}
}
/// A service to fetch and listen for chain height updates.
///
/// TODO: Add error handling and branching to handle various other scenarios.
class ChainHeightService { class ChainHeightService {
// The electrum_adapter client to use for fetching chain height updates.
ElectrumClient client; ElectrumClient client;
// The subscription to listen for chain height updates.
StreamSubscription<dynamic>? _subscription; StreamSubscription<dynamic>? _subscription;
// Whether the service has started listening for updates.
bool get started => _subscription != null; bool get started => _subscription != null;
// The current chain height.
int? _height; int? _height;
int? get height => _height; int? get height => _height;
ChainHeightService({required this.client}); ChainHeightService({required this.client});
/// Fetch the current chain height and start listening for updates.
Future<int> fetchHeightAndStartListenForUpdates() async { Future<int> fetchHeightAndStartListenForUpdates() async {
// Don't start a new subscription if one already exists.
if (_subscription != null) { if (_subscription != null) {
throw Exception( throw Exception(
"Attempted to start a chain height service where an existing" "Attempted to start a chain height service where an existing"
@ -41,17 +69,22 @@ class ChainHeightService {
); );
} }
// A completer to wait for the current chain height to be fetched.
final completer = Completer<int>(); final completer = Completer<int>();
_subscription = client.subscribeHeaders().listen((event) {
// Fetch the current chain height.
_subscription = client.subscribeHeaders().listen((BlockHeader event) {
_height = event.height; _height = event.height;
if (!completer.isCompleted) { if (!completer.isCompleted) {
completer.complete(_height); completer.complete(_height);
} }
}); });
// Wait for the current chain height to be fetched.
return completer.future; return completer.future;
} }
/// Untested/Unknown implications. USE AT OWN RISK /// Stop listening for chain height updates.
Future<void> cancelListen() async => await _subscription?.cancel(); Future<void> cancelListen() async => await _subscription?.cancel();
} }

View file

@ -20,6 +20,7 @@ import 'package:event_bus/event_bus.dart';
import 'package:flutter/foundation.dart'; import 'package:flutter/foundation.dart';
import 'package:flutter_libsparkmobile/flutter_libsparkmobile.dart'; import 'package:flutter_libsparkmobile/flutter_libsparkmobile.dart';
import 'package:mutex/mutex.dart'; import 'package:mutex/mutex.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/electrumx_rpc/rpc.dart'; import 'package:stackwallet/electrumx_rpc/rpc.dart';
import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart'; import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart';
@ -127,6 +128,8 @@ class ElectrumXClient {
_coin = coin; _coin = coin;
final bus = globalEventBusForTesting ?? GlobalEventBus.instance; final bus = globalEventBusForTesting ?? GlobalEventBus.instance;
// Listen for tor status changes.
_torStatusListener = bus.on<TorConnectionStatusChangedEvent>().listen( _torStatusListener = bus.on<TorConnectionStatusChangedEvent>().listen(
(event) async { (event) async {
switch (event.newStatus) { switch (event.newStatus) {
@ -145,6 +148,8 @@ class ElectrumXClient {
} }
}, },
); );
// Listen for tor preference changes.
_torPreferenceListener = bus.on<TorPreferenceChangedEvent>().listen( _torPreferenceListener = bus.on<TorPreferenceChangedEvent>().listen(
(event) async { (event) async {
// not sure if we need to do anything specific here // not sure if we need to do anything specific here
@ -157,6 +162,9 @@ class ElectrumXClient {
// on the next request sent through this electrumx instance // on the next request sent through this electrumx instance
_electrumAdapterChannel = null; _electrumAdapterChannel = null;
_electrumAdapterClient = null; _electrumAdapterClient = null;
// Also close any chain height services that are currently open.
await ChainHeightServiceManager.dispose();
}, },
); );
} }

View file

@ -7,7 +7,6 @@ import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib;
import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter; import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter;
import 'package:electrum_adapter/electrum_adapter.dart'; import 'package:electrum_adapter/electrum_adapter.dart';
import 'package:isar/isar.dart'; import 'package:isar/isar.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
@ -17,9 +16,6 @@ import 'package:stackwallet/models/isar/models/blockchain_data/v2/transaction_v2
import 'package:stackwallet/models/isar/models/isar_models.dart'; import 'package:stackwallet/models/isar/models/isar_models.dart';
import 'package:stackwallet/models/paymint/fee_object_model.dart'; import 'package:stackwallet/models/paymint/fee_object_model.dart';
import 'package:stackwallet/models/signing_data.dart'; import 'package:stackwallet/models/signing_data.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/global_event_bus.dart';
import 'package:stackwallet/services/tor_service.dart'; import 'package:stackwallet/services/tor_service.dart';
import 'package:stackwallet/utilities/amount/amount.dart'; import 'package:stackwallet/utilities/amount/amount.dart';
import 'package:stackwallet/utilities/enums/coin_enum.dart'; import 'package:stackwallet/utilities/enums/coin_enum.dart';
@ -42,17 +38,10 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
late ElectrumClient electrumAdapterClient; late ElectrumClient electrumAdapterClient;
late CachedElectrumXClient electrumXCachedClient; late CachedElectrumXClient electrumXCachedClient;
// late SubscribableElectrumXClient subscribableElectrumXClient; // late SubscribableElectrumXClient subscribableElectrumXClient;
late ChainHeightServiceManager chainHeightServiceManager;
int? get maximumFeerate => null; int? get maximumFeerate => null;
StreamSubscription<TorPreferenceChangedEvent>? _torPreferenceListener;
StreamSubscription<TorConnectionStatusChangedEvent>? _torStatusListener;
final Mutex _torConnectingLock = Mutex();
bool _requireMutex = false;
Timer? _aliveTimer;
static const Duration _keepAlive = Duration(minutes: 1);
bool _isConnected = false;
static const _kServerBatchCutoffVersion = [1, 6]; static const _kServerBatchCutoffVersion = [1, 6];
List<int>? _serverVersion; List<int>? _serverVersion;
bool get serverCanBatch { bool get serverCanBatch {
@ -819,19 +808,23 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
Future<int> fetchChainHeight() async { Future<int> fetchChainHeight() async {
try { try {
// Get the chain height service for the current coin.
ChainHeightService? service = ChainHeightServiceManager.getService( ChainHeightService? service = ChainHeightServiceManager.getService(
cryptoCurrency.coin, cryptoCurrency.coin,
); );
// ... or create a new one if it doesn't exist.
if (service == null) { if (service == null) {
service = ChainHeightService(client: electrumAdapterClient); service = ChainHeightService(client: electrumAdapterClient);
ChainHeightServiceManager.add(service, cryptoCurrency.coin); ChainHeightServiceManager.add(service, cryptoCurrency.coin);
} }
// If the service hasn't been started, start it and fetch the chain height.
if (!service.started) { if (!service.started) {
return await service.fetchHeightAndStartListenForUpdates(); return await service.fetchHeightAndStartListenForUpdates();
} }
// Return the height as per the service if available or the cached height.
return service.height ?? info.cachedChainHeight; return service.height ?? info.cachedChainHeight;
} catch (e, s) { } catch (e, s) {
Logging.instance.log( Logging.instance.log(
@ -959,15 +952,6 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// host: newNode.address, port: newNode.port); // host: newNode.address, port: newNode.port);
} }
/// Update the connection status and call the onConnectionStatusChanged callback if it exists.
void _updateConnectionStatus(bool connectionStatus) {
// TODO [prio=low]: Set onConnectionStatusChanged callback.
// if (_isConnected != connectionStatus && onConnectionStatusChanged != null) {
// onConnectionStatusChanged!(connectionStatus);
// }
_isConnected = connectionStatus;
}
//============================================================================ //============================================================================
Future<({List<Address> addresses, int index})> checkGapsBatched( Future<({List<Address> addresses, int index})> checkGapsBatched(