Merge pull request #761 from cypherstack/electrum

Update chain height subscription, use new electrum_adapter version with Tor fixes, and update Tor package's SOCKSSocket class
This commit is contained in:
Diego Salazar 2024-02-19 18:02:54 -07:00 committed by GitHub
commit 123dc4bed1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 183 additions and 152 deletions

View file

@ -1,10 +1,149 @@
import 'dart:async';
import 'package:electrum_adapter/electrum_adapter.dart';
import 'package:stackwallet/utilities/enums/coin_enum.dart';
import 'package:stackwallet/utilities/logger.dart';
/// Store chain height subscriptions for each coin.
abstract class ElectrumxChainHeightService {
static Map<Coin, StreamSubscription<dynamic>?> subscriptions = {};
// Used to hold chain height subscriptions for each coin as in:
// ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = sub;
/// Manage chain height subscriptions for each coin.
abstract class ChainHeightServiceManager {
// A map of chain height services for each coin.
static final Map<Coin, ChainHeightService> _services = {};
// Map<Coin, ChainHeightService> get services => _services;
// Get the chain height service for a specific coin.
static ChainHeightService? getService(Coin coin) {
return _services[coin];
}
// Add a chain height service for a specific coin.
static void add(ChainHeightService service, Coin coin) {
// Don't add a new service if one already exists.
if (_services[coin] == null) {
_services[coin] = service;
} else {
throw Exception("Chain height service for $coin already managed");
}
}
// Remove a chain height service for a specific coin.
static void remove(Coin coin) {
_services.remove(coin);
}
// Close all subscriptions and clean up resources.
static Future<void> dispose() async {
// Close each subscription.
//
// Create a list of keys to avoid concurrent modification during iteration
var keys = List<Coin>.from(_services.keys);
// Iterate over the copy of the keys
for (final coin in keys) {
final ChainHeightService? service = getService(coin);
await service?.cancelListen();
remove(coin);
}
}
}
/// A service to fetch and listen for chain height updates.
///
/// TODO: Add error handling and branching to handle various other scenarios.
class ChainHeightService {
// The electrum_adapter client to use for fetching chain height updates.
ElectrumClient client;
// The subscription to listen for chain height updates.
StreamSubscription<dynamic>? _subscription;
// Whether the service has started listening for updates.
bool get started => _subscription != null;
// The current chain height.
int? _height;
int? get height => _height;
// Whether the service is currently reconnecting.
bool _isReconnecting = false;
// The reconnect timer.
Timer? _reconnectTimer;
// The reconnection timeout duration.
static const Duration _connectionTimeout = Duration(seconds: 10);
ChainHeightService({required this.client});
/// Fetch the current chain height and start listening for updates.
Future<int> fetchHeightAndStartListenForUpdates() async {
// Don't start a new subscription if one already exists.
if (_subscription != null) {
throw Exception(
"Attempted to start a chain height service where an existing"
" subscription already exists!",
);
}
// A completer to wait for the current chain height to be fetched.
final completer = Completer<int>();
// Fetch the current chain height.
_subscription = client.subscribeHeaders().listen((BlockHeader event) {
_height = event.height;
if (!completer.isCompleted) {
completer.complete(_height);
}
});
_subscription?.onError((dynamic error) {
_handleError(error);
});
// Wait for the current chain height to be fetched.
return completer.future;
}
/// Handle an error from the subscription.
void _handleError(dynamic error) {
Logging.instance.log(
"Error reconnecting for chain height: ${error.toString()}",
level: LogLevel.Error,
);
_subscription?.cancel();
_subscription = null;
_attemptReconnect();
}
/// Attempt to reconnect to the electrum server.
void _attemptReconnect() {
// Avoid multiple reconnection attempts.
if (_isReconnecting) return;
_isReconnecting = true;
// Attempt to reconnect.
unawaited(fetchHeightAndStartListenForUpdates().then((_) {
_isReconnecting = false;
}));
// Set a timer to on the reconnection attempt and clean up if it fails.
_reconnectTimer?.cancel();
_reconnectTimer = Timer(_connectionTimeout, () async {
if (_subscription == null) {
await _subscription?.cancel();
_subscription = null; // Will also occur on an error via handleError.
_reconnectTimer?.cancel();
_reconnectTimer = null;
_isReconnecting = false;
}
});
}
/// Stop listening for chain height updates.
Future<void> cancelListen() async {
await _subscription?.cancel();
_subscription = null;
_reconnectTimer?.cancel();
}
}

View file

@ -20,6 +20,7 @@ import 'package:event_bus/event_bus.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter_libsparkmobile/flutter_libsparkmobile.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/electrumx_rpc/rpc.dart';
import 'package:stackwallet/exceptions/electrumx/no_such_transaction.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart';
@ -127,6 +128,8 @@ class ElectrumXClient {
_coin = coin;
final bus = globalEventBusForTesting ?? GlobalEventBus.instance;
// Listen for tor status changes.
_torStatusListener = bus.on<TorConnectionStatusChangedEvent>().listen(
(event) async {
switch (event.newStatus) {
@ -145,6 +148,8 @@ class ElectrumXClient {
}
},
);
// Listen for tor preference changes.
_torPreferenceListener = bus.on<TorPreferenceChangedEvent>().listen(
(event) async {
// not sure if we need to do anything specific here
@ -157,6 +162,9 @@ class ElectrumXClient {
// on the next request sent through this electrumx instance
_electrumAdapterChannel = null;
_electrumAdapterClient = null;
// Also close any chain height services that are currently open.
await ChainHeightServiceManager.dispose();
},
);
}

View file

@ -68,7 +68,7 @@ class OutputV2 {
scriptPubKeyHex: json["scriptPubKey"]["hex"] as String,
scriptPubKeyAsm: json["scriptPubKey"]["asm"] as String?,
valueStringSats: parseOutputAmountString(
json["value"].toString(),
json["value"] != null ? json["value"].toString(): "0",
decimalPlaces: decimalPlaces,
isFullAmountNotSats: isFullAmountNotSats,
),

View file

@ -4,7 +4,6 @@ import 'package:isar/isar.dart';
import 'package:meta/meta.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/db/isar/main_db.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/address.dart';
import 'package:stackwallet/models/isar/models/ethereum/eth_contract.dart';
import 'package:stackwallet/models/node_model.dart';
@ -617,9 +616,10 @@ abstract class Wallet<T extends CryptoCurrency> {
switch (prefs.syncType) {
case SyncingType.currentWalletOnly:
// Close the subscription for this coin's chain height.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]
?.cancel();
// Close the subscription for this coin's chain height.
// NOTE: This does not work now that the subscription is shared
// await (await ChainHeightServiceManager.getService(cryptoCurrency.coin))
// ?.cancelListen();
case SyncingType.selectedWalletsAtStartup:
// Close the subscription if this wallet is not in the list to be synced.
if (!prefs.walletIdsSyncOnStartup.contains(walletId)) {
@ -639,8 +639,10 @@ abstract class Wallet<T extends CryptoCurrency> {
// If there are no other wallets of this coin, then close the sub.
if (walletIds.isEmpty) {
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]
?.cancel();
// NOTE: This does not work now that the subscription is shared
// await (await ChainHeightServiceManager.getService(
// cryptoCurrency.coin))
// ?.cancelListen();
}
}
case SyncingType.allWalletsOnStartup:

View file

@ -7,7 +7,6 @@ import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib;
import 'package:electrum_adapter/electrum_adapter.dart' as electrum_adapter;
import 'package:electrum_adapter/electrum_adapter.dart';
import 'package:isar/isar.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
@ -17,9 +16,6 @@ import 'package:stackwallet/models/isar/models/blockchain_data/v2/transaction_v2
import 'package:stackwallet/models/isar/models/isar_models.dart';
import 'package:stackwallet/models/paymint/fee_object_model.dart';
import 'package:stackwallet/models/signing_data.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart';
import 'package:stackwallet/services/event_bus/global_event_bus.dart';
import 'package:stackwallet/services/tor_service.dart';
import 'package:stackwallet/utilities/amount/amount.dart';
import 'package:stackwallet/utilities/enums/coin_enum.dart';
@ -42,21 +38,10 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
late ElectrumClient electrumAdapterClient;
late CachedElectrumXClient electrumXCachedClient;
// late SubscribableElectrumXClient subscribableElectrumXClient;
late ChainHeightServiceManager chainHeightServiceManager;
int? get maximumFeerate => null;
int? _latestHeight;
late Prefs _prefs;
late TorService _torService;
StreamSubscription<TorPreferenceChangedEvent>? _torPreferenceListener;
StreamSubscription<TorConnectionStatusChangedEvent>? _torStatusListener;
final Mutex _torConnectingLock = Mutex();
bool _requireMutex = false;
Timer? _aliveTimer;
static const Duration _keepAlive = Duration(minutes: 1);
bool _isConnected = false;
static const _kServerBatchCutoffVersion = [1, 6];
List<int>? _serverVersion;
bool get serverCanBatch {
@ -823,118 +808,24 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
Future<int> fetchChainHeight() async {
try {
// Don't set a stream subscription if one already exists.
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) {
final Completer<int> completer = Completer<int>();
// Get the chain height service for the current coin.
ChainHeightService? service = ChainHeightServiceManager.getService(
cryptoCurrency.coin,
);
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
// Check Electrum and update internal and cached versions if necessary.
await electrumXClient.checkElectrumAdapter();
if (electrumAdapterChannel != electrumXClient.electrumAdapterChannel &&
electrumXClient.electrumAdapterChannel != null) {
electrumAdapterChannel = electrumXClient.electrumAdapterChannel!;
}
if (electrumAdapterClient != electrumXClient.electrumAdapterClient &&
electrumXClient.electrumAdapterClient != null) {
electrumAdapterClient = electrumXClient.electrumAdapterClient!;
}
// electrumXCachedClient.electrumAdapterChannel = electrumAdapterChannel;
if (electrumXCachedClient.electrumAdapterClient !=
electrumAdapterClient) {
electrumXCachedClient.electrumAdapterClient = electrumAdapterClient;
}
// Subscribe to and listen for new block headers.
final stream = electrumAdapterClient.subscribeHeaders();
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
stream.asBroadcastStream().listen((response) {
final int chainHeight = response.height;
// print("Current chain height: $chainHeight");
_latestHeight = chainHeight;
if (isFirstResponse && !completer.isCompleted) {
// Return the chain height.
completer.complete(chainHeight);
}
});
// If we're testing, use the global event bus for testing.
// final bus = globalEventBusForTesting ?? GlobalEventBus.instance;
// No constructors for mixins, so no globalEventBusForTesting is passed in.
final bus = GlobalEventBus.instance;
// Listen to global event bus for Tor status changes.
_torStatusListener ??= bus.on<TorConnectionStatusChangedEvent>().listen(
(event) async {
try {
switch (event.newStatus) {
case TorConnectionStatus.connecting:
// If Tor is connecting, we need to wait.
await _torConnectingLock.acquire();
_requireMutex = true;
break;
case TorConnectionStatus.connected:
case TorConnectionStatus.disconnected:
// If Tor is connected or disconnected, we can release the lock.
if (_torConnectingLock.isLocked) {
_torConnectingLock.release();
}
_requireMutex = false;
break;
}
} finally {
// Ensure the lock is released.
if (_torConnectingLock.isLocked) {
_torConnectingLock.release();
}
}
},
);
// Listen to global event bus for Tor preference changes.
_torPreferenceListener ??= bus.on<TorPreferenceChangedEvent>().listen(
(event) async {
// Close any open subscriptions.
for (final coinSub
in ElectrumxChainHeightService.subscriptions.entries) {
await coinSub.value?.cancel();
}
// Cancel alive timer
_aliveTimer?.cancel();
},
);
// Set a timer to check if the subscription is still alive.
_aliveTimer?.cancel();
_aliveTimer = Timer.periodic(
_keepAlive,
(_) async => _updateConnectionStatus(await electrumXClient.ping()),
);
} else {
// Don't set a stream subscription if one already exists.
// Check if the stream subscription is paused.
if (ElectrumxChainHeightService
.subscriptions[cryptoCurrency.coin]!.isPaused) {
// If it's paused, resume it.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.resume();
}
if (_latestHeight != null) {
return _latestHeight!;
}
// ... or create a new one if it doesn't exist.
if (service == null) {
service = ChainHeightService(client: electrumAdapterClient);
ChainHeightServiceManager.add(service, cryptoCurrency.coin);
}
// Probably waiting on the subscription to receive the latest block height
// fallback to cached value
return info.cachedChainHeight;
// If the service hasn't been started, start it and fetch the chain height.
if (!service.started) {
return await service.fetchHeightAndStartListenForUpdates();
}
// Return the height as per the service if available or the cached height.
return service.height ?? info.cachedChainHeight;
} catch (e, s) {
Logging.instance.log(
"Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s",
@ -1061,15 +952,6 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// host: newNode.address, port: newNode.port);
}
/// Update the connection status and call the onConnectionStatusChanged callback if it exists.
void _updateConnectionStatus(bool connectionStatus) {
// TODO [prio=low]: Set onConnectionStatusChanged callback.
// if (_isConnected != connectionStatus && onConnectionStatusChanged != null) {
// onConnectionStatusChanged!(connectionStatus);
// }
_isConnected = connectionStatus;
}
//============================================================================
Future<({List<Address> addresses, int index})> checkGapsBatched(

View file

@ -528,8 +528,8 @@ packages:
dependency: "direct main"
description:
path: "."
ref: "51b7a60e07b0409b361e31da65d98178ee235bed"
resolved-ref: "51b7a60e07b0409b361e31da65d98178ee235bed"
ref: "0a34f7f48d921fb33f551cb11dfc9b2930522240"
resolved-ref: "0a34f7f48d921fb33f551cb11dfc9b2930522240"
url: "https://github.com/cypherstack/electrum_adapter.git"
source: git
version: "3.0.0"
@ -1727,8 +1727,8 @@ packages:
dependency: "direct main"
description:
path: "."
ref: "0a6888282f4e98401051a396e9d2293bd55ac2c2"
resolved-ref: "0a6888282f4e98401051a396e9d2293bd55ac2c2"
ref: e37dc4e22f7acb2746b70bdc935f0eb3c50b8b71
resolved-ref: e37dc4e22f7acb2746b70bdc935f0eb3c50b8b71
url: "https://github.com/cypherstack/tor.git"
source: git
version: "0.0.1"

View file

@ -65,7 +65,7 @@ dependencies:
tor_ffi_plugin:
git:
url: https://github.com/cypherstack/tor.git
ref: 0a6888282f4e98401051a396e9d2293bd55ac2c2
ref: e37dc4e22f7acb2746b70bdc935f0eb3c50b8b71
fusiondart:
git:
@ -176,7 +176,7 @@ dependencies:
electrum_adapter:
git:
url: https://github.com/cypherstack/electrum_adapter.git
ref: 51b7a60e07b0409b361e31da65d98178ee235bed
ref: 0a34f7f48d921fb33f551cb11dfc9b2930522240
stream_channel: ^2.1.0
dev_dependencies: