Merge pull request #743 from cypherstack/electrumx

ElectrumX fixes: Use subscribable ElectrumX client for subscribing to chain height, resolve issue with sorting by a null blockHeight for unconfirmed tx, and if just one response is returned, return it as a single-item list
This commit is contained in:
Diego Salazar 2024-02-06 20:51:21 -07:00 committed by GitHub
commit e5df9d94bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 1116 additions and 362 deletions

View file

@ -0,0 +1,10 @@
import 'dart:async';
import 'package:stackwallet/utilities/enums/coin_enum.dart';
/// Store chain height subscriptions for each coin.
abstract class ElectrumxChainHeightService {
static Map<Coin, StreamSubscription<dynamic>?> subscriptions = {};
// Used to hold chain height subscriptions for each coin as in:
// ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = sub;
}

View file

@ -202,6 +202,7 @@ class ElectrumXClient {
// ... But if the killswitch is set, then we throw an exception.
throw Exception(
"Tor preference and killswitch set but Tor is not enabled, not connecting to ElectrumX");
// TODO [prio=low]: Try to start Tor.
}
} else {
// Get the proxy info from the TorService.
@ -391,10 +392,21 @@ class ElectrumXClient {
final List<dynamic> response;
try {
response = jsonRpcResponse.data as List;
if (jsonRpcResponse.data is Map) {
response = [jsonRpcResponse.data];
if (requestStrings.length > 1) {
Logging.instance.log(
"Map returned instead of a list and there are ${requestStrings.length} queued.",
level: LogLevel.Error);
}
// Could throw error here.
} else {
response = jsonRpcResponse.data as List;
}
} catch (_) {
throw Exception(
"Expected json list but got a map: ${jsonRpcResponse.data}",
"Expected json list or map but got a ${jsonRpcResponse.data.runtimeType}: ${jsonRpcResponse.data}",
);
}
@ -595,7 +607,6 @@ class ElectrumXClient {
scripthash,
],
);
result = response["result"];
retryCount--;
}
@ -744,20 +755,25 @@ class ElectrumXClient {
return {"rawtx": response["result"] as String};
}
if (response["result"] == null) {
Logging.instance.log(
"getTransaction($txHash) returned null response",
level: LogLevel.Error,
);
throw 'getTransaction($txHash) returned null response';
if (response is! Map) {
final String msg = "getTransaction($txHash) returned a non-Map response"
" of type ${response.runtimeType}.";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
if (response["result"] == null) {
final String msg = "getTransaction($txHash) returned null result."
"\nResponse: $response";
Logging.instance.log(msg, level: LogLevel.Fatal);
throw Exception(msg);
}
return Map<String, dynamic>.from(response["result"] as Map);
} catch (e) {
} catch (e, s) {
Logging.instance.log(
"getTransaction($txHash) response: $response",
level: LogLevel.Error,
);
"getTransaction($txHash) response: $response"
"\nError: $e\nStack trace: $s",
level: LogLevel.Error);
rethrow;
}
}

View file

@ -213,7 +213,7 @@ class JsonRPC {
port,
timeout: connectionTimeout,
onBadCertificate: (_) => true,
); // TODO do not automatically trust bad certificates
); // TODO do not automatically trust bad certificates.
} else {
_socket = await Socket.connect(
host,

File diff suppressed because it is too large Load diff

View file

@ -59,6 +59,7 @@ import 'package:stackwallet/utilities/clipboard_interface.dart';
import 'package:stackwallet/utilities/constants.dart';
import 'package:stackwallet/utilities/enums/backup_frequency_type.dart';
import 'package:stackwallet/utilities/enums/coin_enum.dart';
import 'package:stackwallet/utilities/enums/sync_type_enum.dart';
import 'package:stackwallet/utilities/logger.dart';
import 'package:stackwallet/utilities/show_loading.dart';
import 'package:stackwallet/utilities/text_styles.dart';
@ -305,6 +306,26 @@ class _WalletViewState extends ConsumerState<WalletView> {
BackupFrequencyType.afterClosingAWallet) {
unawaited(ref.read(autoSWBServiceProvider).doBackup());
}
// Close the wallet according to syncing preferences.
switch (ref.read(prefsChangeNotifierProvider).syncType) {
case SyncingType.currentWalletOnly:
// Close the wallet.
unawaited(ref.watch(pWallets).getWallet(walletId).exit());
// unawaited so we don't lag the UI.
case SyncingType.selectedWalletsAtStartup:
// Close if this wallet is not in the list to be synced.
if (!ref
.read(prefsChangeNotifierProvider)
.walletIdsSyncOnStartup
.contains(widget.walletId)) {
unawaited(ref.watch(pWallets).getWallet(walletId).exit());
// unawaited so we don't lag the UI.
}
case SyncingType.allWalletsOnStartup:
// Do nothing.
break;
}
}
Widget _buildNetworkIcon(WalletSyncStatus status) {

View file

@ -10,9 +10,9 @@
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';
import 'package:event_bus/event_bus.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:flutter_svg/svg.dart';
@ -38,7 +38,9 @@ import 'package:stackwallet/themes/coin_icon_provider.dart';
import 'package:stackwallet/themes/stack_colors.dart';
import 'package:stackwallet/utilities/assets.dart';
import 'package:stackwallet/utilities/enums/backup_frequency_type.dart';
import 'package:stackwallet/utilities/enums/sync_type_enum.dart';
import 'package:stackwallet/utilities/text_styles.dart';
import 'package:stackwallet/wallets/isar/providers/wallet_info_provider.dart';
import 'package:stackwallet/wallets/wallet/impl/banano_wallet.dart';
import 'package:stackwallet/widgets/custom_buttons/app_bar_icon_button.dart';
import 'package:stackwallet/widgets/custom_buttons/blue_text_button.dart';
@ -92,6 +94,26 @@ class _DesktopWalletViewState extends ConsumerState<DesktopWalletView> {
unawaited(ref.read(autoSWBServiceProvider).doBackup());
}
// Close the wallet according to syncing preferences.
switch (ref.read(prefsChangeNotifierProvider).syncType) {
case SyncingType.currentWalletOnly:
// Close the wallet.
unawaited(wallet.exit());
// unawaited so we don't lag the UI.
case SyncingType.selectedWalletsAtStartup:
// Close if this wallet is not in the list to be synced.
if (!ref
.read(prefsChangeNotifierProvider)
.walletIdsSyncOnStartup
.contains(widget.walletId)) {
unawaited(wallet.exit());
// unawaited so we don't lag the UI.
}
case SyncingType.allWalletsOnStartup:
// Do nothing.
break;
}
ref.read(currentWalletIdProvider.notifier).state = null;
}
@ -181,6 +203,21 @@ class _DesktopWalletViewState extends ConsumerState<DesktopWalletView> {
),
),
),
if (kDebugMode) const Spacer(),
if (kDebugMode)
Row(
children: [
const Text(
"Debug Height:",
),
const SizedBox(
width: 2,
),
Text(
ref.watch(pWalletChainHeight(widget.walletId)).toString(),
),
],
),
const Spacer(),
Row(
children: [

View file

@ -174,22 +174,27 @@ class BitcoincashWallet extends Bip39HDWallet
coin: cryptoCurrency.coin,
);
final prevOutJson = Map<String, dynamic>.from(
(inputTx["vout"] as List).firstWhere((e) => e["n"] == vout)
as Map);
try {
final prevOutJson = Map<String, dynamic>.from(
(inputTx["vout"] as List).firstWhere((e) => e["n"] == vout)
as Map);
final prevOut = OutputV2.fromElectrumXJson(
prevOutJson,
decimalPlaces: cryptoCurrency.fractionDigits,
walletOwns: false, // doesn't matter here as this is not saved
);
final prevOut = OutputV2.fromElectrumXJson(
prevOutJson,
decimalPlaces: cryptoCurrency.fractionDigits,
walletOwns: false, // doesn't matter here as this is not saved
);
outpoint = OutpointV2.isarCantDoRequiredInDefaultConstructor(
txid: txid,
vout: vout,
);
valueStringSats = prevOut.valueStringSats;
addresses.addAll(prevOut.addresses);
outpoint = OutpointV2.isarCantDoRequiredInDefaultConstructor(
txid: txid,
vout: vout,
);
valueStringSats = prevOut.valueStringSats;
addresses.addAll(prevOut.addresses);
} catch (e, s) {
Logging.instance.log(
"Error getting prevOutJson: $s\nStack trace: $s",
level: LogLevel.Warning);
}
}
InputV2 input = InputV2.isarCantDoRequiredInDefaultConstructor(

View file

@ -4,6 +4,7 @@ import 'package:isar/isar.dart';
import 'package:meta/meta.dart';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/db/isar/main_db.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/address.dart';
import 'package:stackwallet/models/isar/models/ethereum/eth_contract.dart';
import 'package:stackwallet/models/node_model.dart';
@ -17,6 +18,7 @@ import 'package:stackwallet/utilities/amount/amount.dart';
import 'package:stackwallet/utilities/constants.dart';
import 'package:stackwallet/utilities/default_nodes.dart';
import 'package:stackwallet/utilities/enums/coin_enum.dart';
import 'package:stackwallet/utilities/enums/sync_type_enum.dart';
import 'package:stackwallet/utilities/flutter_secure_storage_interface.dart';
import 'package:stackwallet/utilities/logger.dart';
import 'package:stackwallet/utilities/paynym_is_api.dart';
@ -609,7 +611,42 @@ abstract class Wallet<T extends CryptoCurrency> {
Future<void> exit() async {
_periodicRefreshTimer?.cancel();
_networkAliveTimer?.cancel();
// TODO:
// If the syncing pref is currentWalletOnly or selectedWalletsAtStartup (and
// this wallet isn't in walletIdsSyncOnStartup), then we close subscriptions.
switch (prefs.syncType) {
case SyncingType.currentWalletOnly:
// Close the subscription for this coin's chain height.
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]
?.cancel();
case SyncingType.selectedWalletsAtStartup:
// Close the subscription if this wallet is not in the list to be synced.
if (!prefs.walletIdsSyncOnStartup.contains(walletId)) {
// Check if there's another wallet of this coin on the sync list.
List<String> walletIds = [];
for (final id in prefs.walletIdsSyncOnStartup) {
final wallet = mainDB.isar.walletInfo
.where()
.walletIdEqualTo(id)
.findFirstSync()!;
if (wallet.coin == cryptoCurrency.coin) {
walletIds.add(id);
}
}
// TODO [prio=low]: use a query instead of iterating thru wallets.
// If there are no other wallets of this coin, then close the sub.
if (walletIds.isEmpty) {
await ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]
?.cancel();
}
}
case SyncingType.allWalletsOnStartup:
// Do nothing.
break;
}
}
@mustCallSuper

View file

@ -6,7 +6,9 @@ import 'package:bitcoindart/bitcoindart.dart' as bitcoindart;
import 'package:coinlib_flutter/coinlib_flutter.dart' as coinlib;
import 'package:isar/isar.dart';
import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
import 'package:stackwallet/electrumx_rpc/subscribable_electrumx_client.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/v2/input_v2.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/v2/output_v2.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/v2/transaction_v2.dart';
@ -30,9 +32,12 @@ import 'package:uuid/uuid.dart';
mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
late ElectrumXClient electrumXClient;
late CachedElectrumXClient electrumXCachedClient;
late SubscribableElectrumXClient subscribableElectrumXClient;
int? get maximumFeerate => null;
int? _latestHeight;
static const _kServerBatchCutoffVersion = [1, 6];
List<int>? _serverVersion;
bool get serverCanBatch {
@ -123,7 +128,12 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// don't care about sorting if using all utxos
if (!coinControl) {
// sort spendable by age (oldest first)
spendableOutputs.sort((a, b) => b.blockTime!.compareTo(a.blockTime!));
spendableOutputs.sort((a, b) => (b.blockTime ?? currentChainHeight)
.compareTo((a.blockTime ?? currentChainHeight)));
// Null check operator changed to null assignment in order to resolve a
// `Null check operator used on a null value` error. currentChainHeight
// used in order to sort these unconfirmed outputs as the youngest, but we
// could just as well use currentChainHeight + 1.
}
Logging.instance.log("spendableOutputs.length: ${spendableOutputs.length}",
@ -794,9 +804,81 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
Future<int> fetchChainHeight() async {
try {
final result = await electrumXClient.getBlockHeadTip();
return result["height"] as int;
} catch (e) {
// Don't set a stream subscription if one already exists.
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) {
final Completer<int> completer = Completer<int>();
// Make sure we only complete once.
final isFirstResponse = _latestHeight == null;
// Subscribe to block headers.
final subscription =
subscribableElectrumXClient.subscribeToBlockHeaders();
// set stream subscription
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
subscription.responseStream.asBroadcastStream().listen((event) {
final response = event;
if (response != null &&
response is Map &&
response.containsKey('height')) {
final int chainHeight = response['height'] as int;
// print("Current chain height: $chainHeight");
_latestHeight = chainHeight;
if (isFirstResponse) {
// Return the chain height.
completer.complete(chainHeight);
}
} else {
Logging.instance.log(
"blockchain.headers.subscribe returned malformed response\n"
"Response: $response",
level: LogLevel.Error);
}
});
return _latestHeight ?? await completer.future;
}
// Don't set a stream subscription if one already exists.
else {
// Check if the stream subscription is paused.
if (ElectrumxChainHeightService
.subscriptions[cryptoCurrency.coin]!.isPaused) {
// If it's paused, resume it.
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin]!
.resume();
}
// Causes synchronization to stall.
// // Check if the stream subscription is active by pinging it.
// if (!(await subscribableElectrumXClient.ping())) {
// // If it's not active, reconnect it.
// final node = await getCurrentElectrumXNode();
//
// await subscribableElectrumXClient.connect(
// host: node.address, port: node.port);
//
// // Wait for first response.
// return completer.future;
// }
if (_latestHeight != null) {
return _latestHeight!;
}
}
// Probably waiting on the subscription to receive the latest block height
// fallback to cached value
return info.cachedChainHeight;
} catch (e, s) {
Logging.instance.log(
"Exception rethrown in fetchChainHeight\nError: $e\nStack trace: $s",
level: LogLevel.Error);
// completer.completeError(e, s);
// return Future.error(e, s);
rethrow;
}
}
@ -865,6 +947,13 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
electrumXCachedClient = CachedElectrumXClient.from(
electrumXClient: electrumXClient,
);
subscribableElectrumXClient = SubscribableElectrumXClient.from(
node: newNode,
prefs: prefs,
failovers: failovers,
);
await subscribableElectrumXClient.connect(
host: newNode.address, port: newNode.port);
}
//============================================================================
@ -932,7 +1021,8 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// check and add appropriate addresses
for (int k = 0; k < txCountBatchSize; k++) {
int count = counts["${_id}_$k"]!;
int count = (counts["${_id}_$k"] == null) ? 0 : counts["${_id}_$k"]!;
if (count > 0) {
iterationsAddressArray.add(txCountCallArgs["${_id}_$k"]!);
@ -1196,9 +1286,9 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
Logging.instance.log("fetched fees: $feeObject", level: LogLevel.Info);
_cachedFees = feeObject;
return _cachedFees!;
} catch (e) {
} catch (e, s) {
Logging.instance.log(
"Exception rethrown from _getFees(): $e",
"Exception rethrown from _getFees(): $e\nStack trace: $s",
level: LogLevel.Error,
);
if (_cachedFees == null) {