listen to tor and preferences changes and handle connections accordingly

This commit is contained in:
sneurlax 2024-02-16 16:33:19 -06:00
parent 75ca3d489b
commit a807303eba

View file

@ -7,6 +7,7 @@ import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib;
import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter;
import 'package:electrum_adapter/electrum_adapter.dart';
import 'package:isar/isar.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
@ -18,6 +19,7 @@ import 'package:stackwallet/models/paymint/fee_object_model.dart';
import 'package:stackwallet/models/signing_data.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/global_event_bus.dart';
import 'package:stackwallet/services/tor_service.dart';
import 'package:stackwallet/utilities/amount/amount.dart';
import 'package:stackwallet/utilities/enums/coin_enum.dart';
@ -45,8 +47,15 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
int? _latestHeight;
late Prefs _prefs;
late TorService _torService;
StreamSubscription<TorPreferenceChangedEvent>? _torPreferenceListener;
StreamSubscription<TorConnectionStatusChangedEvent>? _torStatusListener;
final Mutex _torConnectingLock = Mutex();
bool _requireMutex = false;
Timer? _aliveTimer;
static const Duration _keepAlive = Duration(minutes: 1);
bool _isConnected = false;
static const _kServerBatchCutoffVersion = [1, 6];
List<int>? _serverVersion;
@ -814,9 +823,6 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
Future<int> fetchChainHeight() async {
try {
// _checkChainHeightSubscription();
// TODO above. Make sure that the subscription/stream is alive.
// Don't set a stream subscription if one already exists.
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) {
@ -825,11 +831,24 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
// Check Electrum and update internal and cached versions if necessary.
await electrumXClient.checkElectrumAdapter();
// TODO [prio=extreme]: Does this update anything in this file?? Thinking no.
if (electrumAdapterChannel != electrumXClient.electrumAdapterChannel &&
electrumXClient.electrumAdapterChannel != null) {
electrumAdapterChannel = electrumXClient.electrumAdapterChannel!;
}
if (electrumAdapterClient != electrumXClient.electrumAdapterClient &&
electrumXClient.electrumAdapterClient != null) {
electrumAdapterClient = electrumXClient.electrumAdapterClient!;
}
// electrumXCachedClient.electrumAdapterChannel = electrumAdapterChannel;
if (electrumXCachedClient.electrumAdapterClient !=
electrumAdapterClient) {
electrumXCachedClient.electrumAdapterClient = electrumAdapterClient;
}
// Subscribe to and listen for new block headers.
final stream = electrumAdapterClient.subscribeHeaders();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
stream.asBroadcastStream().listen((response) {
final int chainHeight = response.height;
@ -842,6 +861,61 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
completer.complete(chainHeight);
}
});
// If we're testing, use the global event bus for testing.
// final bus = globalEventBusForTesting ?? GlobalEventBus.instance;
// No constructors for mixins, so no globalEventBusForTesting is passed in.
final bus = GlobalEventBus.instance;
// Listen to global event bus for Tor status changes.
_torStatusListener ??= bus.on<TorConnectionStatusChangedEvent>().listen(
(event) async {
try {
switch (event.newStatus) {
case TorConnectionStatus.connecting:
// If Tor is connecting, we need to wait.
await _torConnectingLock.acquire();
_requireMutex = true;
break;
case TorConnectionStatus.connected:
case TorConnectionStatus.disconnected:
// If Tor is connected or disconnected, we can release the lock.
if (_torConnectingLock.isLocked) {
_torConnectingLock.release();
}
_requireMutex = false;
break;
}
} finally {
// Ensure the lock is released.
if (_torConnectingLock.isLocked) {
_torConnectingLock.release();
}
}
},
);
// Listen to global event bus for Tor preference changes.
_torPreferenceListener ??= bus.on<TorPreferenceChangedEvent>().listen(
(event) async {
// Close any open subscriptions.
for (final coinSub
in ElectrumxChainHeightService.subscriptions.entries) {
await coinSub.value?.cancel();
}
// Cancel alive timer
_aliveTimer?.cancel();
},
);
// Set a timer to check if the subscription is still alive.
_aliveTimer?.cancel();
_aliveTimer = Timer.periodic(
_keepAlive,
(_) async => _updateConnectionStatus(await electrumXClient.ping()),
);
} else {
// Don't set a stream subscription if one already exists.
@ -977,6 +1051,15 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// host: newNode.address, port: newNode.port);
}
/// Update the connection status and call the onConnectionStatusChanged callback if it exists.
void _updateConnectionStatus(bool connectionStatus) {
// TODO [prio=low]: Set onConnectionStatusChanged callback.
// if (_isConnected != connectionStatus && onConnectionStatusChanged != null) {
// onConnectionStatusChanged!(connectionStatus);
// }
_isConnected = connectionStatus;
}
//============================================================================
Future<({List<Address> addresses, int index})> checkGapsBatched(