From 725d11f9c2e514589f1ce11506a5bf935f4e1367 Mon Sep 17 00:00:00 2001 From: julian Date: Thu, 22 Feb 2024 12:16:53 +0700 Subject: [PATCH] electrum/fulcrum batching tweaks and fixes --- lib/electrumx_rpc/electrumx_client.dart | 85 +++++++++-------- .../electrumx_interface.dart | 92 ++++++++----------- 2 files changed, 86 insertions(+), 91 deletions(-) diff --git a/lib/electrumx_rpc/electrumx_client.dart b/lib/electrumx_rpc/electrumx_client.dart index 0df6cf787..cacef8178 100644 --- a/lib/electrumx_rpc/electrumx_client.dart +++ b/lib/electrumx_rpc/electrumx_client.dart @@ -387,7 +387,7 @@ class ElectrumXClient { /// returns a list of json response objects if no errors were found Future> batchRequest({ required String command, - required Map> args, + required List args, Duration requestTimeout = const Duration(seconds: 60), int retries = 2, }) async { @@ -404,37 +404,39 @@ class ElectrumXClient { try { var futures = >[]; - List? response; _electrumAdapterClient!.peer.withBatch(() { - for (final entry in args.entries) { - futures.add(_electrumAdapterClient!.request(command, entry.value)); + for (final arg in args) { + futures.add(_electrumAdapterClient!.request(command, arg)); } }); - response = await Future.wait(futures); + final response = await Future.wait(futures); - // check for errors, format and throw if there are any - final List errors = []; - for (int i = 0; i < response.length; i++) { - var result = response[i]; - - if (result == null || (result is List && result.isEmpty)) { - continue; - // TODO [prio=extreme]: Figure out if this is actually an issue. - } - result = result[0]; // Unwrap the list. - if ((result is Map && result.keys.contains("error")) || - result == null) { - errors.add(result.toString()); - } - } - if (errors.isNotEmpty) { - String error = "[\n"; - for (int i = 0; i < errors.length; i++) { - error += "${errors[i]}\n"; - } - error += "]"; - throw Exception("JSONRPC response error: $error"); - } + // We cannot modify the response list as the order and length are related + // to the order and length of the batched requests! + // + // // check for errors, format and throw if there are any + // final List errors = []; + // for (int i = 0; i < response.length; i++) { + // var result = response[i]; + // + // if (result == null || (result is List && result.isEmpty)) { + // continue; + // // TODO [prio=extreme]: Figure out if this is actually an issue. + // } + // result = result[0]; // Unwrap the list. + // if ((result is Map && result.keys.contains("error")) || + // result == null) { + // errors.add(result.toString()); + // } + // } + // if (errors.isNotEmpty) { + // String error = "[\n"; + // for (int i = 0; i < errors.length; i++) { + // error += "${errors[i]}\n"; + // } + // error += "]"; + // throw Exception("JSONRPC response error: $error"); + // } currentFailoverIndex = -1; return response; @@ -636,16 +638,17 @@ class ElectrumXClient { } } - Future>>> getBatchHistory( - {required Map> args}) async { + Future>>> getBatchHistory({ + required List args, + }) async { try { final response = await batchRequest( command: 'blockchain.scripthash.get_history', args: args, ); - final Map>> result = {}; + final List>> result = []; for (int i = 0; i < response.length; i++) { - result[i] = List>.from(response[i] as List); + result.add(List>.from(response[i] as List)); } return result; } catch (e) { @@ -689,23 +692,27 @@ class ElectrumXClient { } } - Future>>> getBatchUTXOs( - {required Map> args}) async { + Future>>> getBatchUTXOs({ + required List args, + }) async { try { final response = await batchRequest( command: 'blockchain.scripthash.listunspent', args: args, ); - final Map>> result = {}; + final List>> result = []; for (int i = 0; i < response.length; i++) { if ((response[i] as List).isNotEmpty) { try { - // result[i] = response[i] as List>; - result[i] = List>.from(response[i] as List); + final data = List>.from(response[i] as List); + result.add(data); } catch (e) { - print(response[i]); + // to ensure we keep same length of responses as requests/args + // add empty list on error + result.add([]); + Logging.instance.log( - "getBatchUTXOs failed to parse response", + "getBatchUTXOs failed to parse response=${response[i]}: $e", level: LogLevel.Error, ); } diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 577d52044..a7bb2a1af 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -842,21 +842,19 @@ mixin ElectrumXInterface on Bip39HDWallet { return transactions.length; } - Future> fetchTxCountBatched({ - required Map addresses, + /// Should return a list of tx counts matching the list of addresses given + Future> fetchTxCountBatched({ + required List addresses, }) async { try { - final Map> args = {}; - for (final entry in addresses.entries) { - args[entry.key] = [ - cryptoCurrency.addressToScriptHash(address: entry.value), - ]; - } - final response = await electrumXClient.getBatchHistory(args: args); + final response = await electrumXClient.getBatchHistory( + args: addresses + .map((e) => [cryptoCurrency.addressToScriptHash(address: e)]) + .toList(growable: false)); - final Map result = {}; - for (final entry in response.entries) { - result[entry.key] = entry.value.length; + final List result = []; + for (final entry in response) { + result.add(entry.length); } return result; } catch (e, s) { @@ -968,13 +966,11 @@ mixin ElectrumXInterface on Bip39HDWallet { index < cryptoCurrency.maxNumberOfIndexesToCheck && gapCounter < cryptoCurrency.maxUnusedAddressGap; index += txCountBatchSize) { - List iterationsAddressArray = []; Logging.instance.log( "index: $index, \t GapCounter $chain ${type.name}: $gapCounter", level: LogLevel.Info); - final _id = "k_$index"; - Map txCountCallArgs = {}; + List txCountCallArgs = []; for (int j = 0; j < txCountBatchSize; j++) { final derivePath = cryptoCurrency.constructDerivePath( @@ -1007,9 +1003,9 @@ mixin ElectrumXInterface on Bip39HDWallet { addressArray.add(address); - txCountCallArgs.addAll({ - "${_id}_$j": addressString, - }); + txCountCallArgs.add( + addressString, + ); } // get address tx counts @@ -1017,11 +1013,9 @@ mixin ElectrumXInterface on Bip39HDWallet { // check and add appropriate addresses for (int k = 0; k < txCountBatchSize; k++) { - int count = (counts["${_id}_$k"] == null) ? 0 : counts["${_id}_$k"]!; + final count = counts[k]; if (count > 0) { - iterationsAddressArray.add(txCountCallArgs["${_id}_$k"]!); - // update highest highestIndexWithHistory = index + k; @@ -1111,23 +1105,20 @@ mixin ElectrumXInterface on Bip39HDWallet { List> allTxHashes = []; if (serverCanBatch) { - final Map>> batches = {}; - final Map requestIdToAddressMap = {}; + final Map>> batches = {}; + final Map> batchIndexToAddressListMap = {}; const batchSizeMax = 100; int batchNumber = 0; for (int i = 0; i < allAddresses.length; i++) { - if (batches["$batchNumber"] == null) { - batches["$batchNumber"] = {}; - } + batches[batchNumber] ??= []; + batchIndexToAddressListMap[batchNumber] ??= []; + + final address = allAddresses.elementAt(i); final scriptHash = cryptoCurrency.addressToScriptHash( - address: allAddresses.elementAt(i), + address: address, ); - // final id = Logger.isTestEnv ? "$i" : const Uuid().v1(); - // TODO [prio=???]: Pass request IDs to electrum_adapter. - requestIdToAddressMap[i] = allAddresses.elementAt(i); - batches["$batchNumber"]!.addAll({ - "$i": [scriptHash] - }); + batches[batchNumber]!.add([scriptHash]); + batchIndexToAddressListMap[batchNumber]!.add(address); if (i % batchSizeMax == batchSizeMax - 1) { batchNumber++; } @@ -1135,13 +1126,14 @@ mixin ElectrumXInterface on Bip39HDWallet { for (int i = 0; i < batches.length; i++) { final response = - await electrumXClient.getBatchHistory(args: batches["$i"]!); - for (final entry in response.entries) { - for (int j = 0; j < entry.value.length; j++) { - entry.value[j]["address"] = requestIdToAddressMap[entry.key]; - if (!allTxHashes.contains(entry.value[j])) { - allTxHashes.add(entry.value[j]); - } + await electrumXClient.getBatchHistory(args: batches[i]!); + for (int j = 0; j < response.length; j++) { + final entry = response[j]; + for (int k = 0; k < entry.length; k++) { + entry[k]["address"] = batchIndexToAddressListMap[i]![j]; + // if (!allTxHashes.contains(entry[j])) { + allTxHashes.add(entry[k]); + // } } } } @@ -1608,31 +1600,27 @@ mixin ElectrumXInterface on Bip39HDWallet { final fetchedUtxoList = >>[]; if (serverCanBatch) { - final Map>> batches = {}; + final Map>> batchArgs = {}; const batchSizeMax = 10; int batchNumber = 0; for (int i = 0; i < allAddresses.length; i++) { - if (batches[batchNumber] == null) { - batches[batchNumber] = {}; - } + batchArgs[batchNumber] ??= []; final scriptHash = cryptoCurrency.addressToScriptHash( address: allAddresses[i].value, ); - batches[batchNumber]!.addAll({ - scriptHash: [scriptHash] - }); + batchArgs[batchNumber]!.add([scriptHash]); if (i % batchSizeMax == batchSizeMax - 1) { batchNumber++; } } - for (int i = 0; i < batches.length; i++) { + for (int i = 0; i < batchArgs.length; i++) { final response = - await electrumXClient.getBatchUTXOs(args: batches[i]!); - for (final entry in response.entries) { - if (entry.value.isNotEmpty) { - fetchedUtxoList.add(entry.value); + await electrumXClient.getBatchUTXOs(args: batchArgs[i]!); + for (final entry in response) { + if (entry.isNotEmpty) { + fetchedUtxoList.add(entry); } } }