electrum/fulcrum batching tweaks and fixes

This commit is contained in:
julian 2024-02-22 12:16:53 +07:00
parent a5299adb39
commit 725d11f9c2
2 changed files with 86 additions and 91 deletions

View file

@ -387,7 +387,7 @@ class ElectrumXClient {
/// returns a list of json response objects if no errors were found
Future<List<dynamic>> batchRequest({
required String command,
required Map<String, List<dynamic>> args,
required List<dynamic> args,
Duration requestTimeout = const Duration(seconds: 60),
int retries = 2,
}) async {
@ -404,37 +404,39 @@ class ElectrumXClient {
try {
var futures = <Future<dynamic>>[];
List? response;
_electrumAdapterClient!.peer.withBatch(() {
for (final entry in args.entries) {
futures.add(_electrumAdapterClient!.request(command, entry.value));
for (final arg in args) {
futures.add(_electrumAdapterClient!.request(command, arg));
}
});
response = await Future.wait(futures);
final response = await Future.wait(futures);
// check for errors, format and throw if there are any
final List<String> errors = [];
for (int i = 0; i < response.length; i++) {
var result = response[i];
if (result == null || (result is List && result.isEmpty)) {
continue;
// TODO [prio=extreme]: Figure out if this is actually an issue.
}
result = result[0]; // Unwrap the list.
if ((result is Map && result.keys.contains("error")) ||
result == null) {
errors.add(result.toString());
}
}
if (errors.isNotEmpty) {
String error = "[\n";
for (int i = 0; i < errors.length; i++) {
error += "${errors[i]}\n";
}
error += "]";
throw Exception("JSONRPC response error: $error");
}
// We cannot modify the response list as the order and length are related
// to the order and length of the batched requests!
//
// // check for errors, format and throw if there are any
// final List<String> errors = [];
// for (int i = 0; i < response.length; i++) {
// var result = response[i];
//
// if (result == null || (result is List && result.isEmpty)) {
// continue;
// // TODO [prio=extreme]: Figure out if this is actually an issue.
// }
// result = result[0]; // Unwrap the list.
// if ((result is Map && result.keys.contains("error")) ||
// result == null) {
// errors.add(result.toString());
// }
// }
// if (errors.isNotEmpty) {
// String error = "[\n";
// for (int i = 0; i < errors.length; i++) {
// error += "${errors[i]}\n";
// }
// error += "]";
// throw Exception("JSONRPC response error: $error");
// }
currentFailoverIndex = -1;
return response;
@ -636,16 +638,17 @@ class ElectrumXClient {
}
}
Future<Map<int, List<Map<String, dynamic>>>> getBatchHistory(
{required Map<String, List<dynamic>> args}) async {
Future<List<List<Map<String, dynamic>>>> getBatchHistory({
required List<dynamic> args,
}) async {
try {
final response = await batchRequest(
command: 'blockchain.scripthash.get_history',
args: args,
);
final Map<int, List<Map<String, dynamic>>> result = {};
final List<List<Map<String, dynamic>>> result = [];
for (int i = 0; i < response.length; i++) {
result[i] = List<Map<String, dynamic>>.from(response[i] as List);
result.add(List<Map<String, dynamic>>.from(response[i] as List));
}
return result;
} catch (e) {
@ -689,23 +692,27 @@ class ElectrumXClient {
}
}
Future<Map<int, List<Map<String, dynamic>>>> getBatchUTXOs(
{required Map<String, List<dynamic>> args}) async {
Future<List<List<Map<String, dynamic>>>> getBatchUTXOs({
required List<dynamic> args,
}) async {
try {
final response = await batchRequest(
command: 'blockchain.scripthash.listunspent',
args: args,
);
final Map<int, List<Map<String, dynamic>>> result = {};
final List<List<Map<String, dynamic>>> result = [];
for (int i = 0; i < response.length; i++) {
if ((response[i] as List).isNotEmpty) {
try {
// result[i] = response[i] as List<Map<String, dynamic>>;
result[i] = List<Map<String, dynamic>>.from(response[i] as List);
final data = List<Map<String, dynamic>>.from(response[i] as List);
result.add(data);
} catch (e) {
print(response[i]);
// to ensure we keep same length of responses as requests/args
// add empty list on error
result.add([]);
Logging.instance.log(
"getBatchUTXOs failed to parse response",
"getBatchUTXOs failed to parse response=${response[i]}: $e",
level: LogLevel.Error,
);
}

View file

@ -842,21 +842,19 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
return transactions.length;
}
Future<Map<int, int>> fetchTxCountBatched({
required Map<String, String> addresses,
/// Should return a list of tx counts matching the list of addresses given
Future<List<int>> fetchTxCountBatched({
required List<String> addresses,
}) async {
try {
final Map<String, List<dynamic>> args = {};
for (final entry in addresses.entries) {
args[entry.key] = [
cryptoCurrency.addressToScriptHash(address: entry.value),
];
}
final response = await electrumXClient.getBatchHistory(args: args);
final response = await electrumXClient.getBatchHistory(
args: addresses
.map((e) => [cryptoCurrency.addressToScriptHash(address: e)])
.toList(growable: false));
final Map<int, int> result = {};
for (final entry in response.entries) {
result[entry.key] = entry.value.length;
final List<int> result = [];
for (final entry in response) {
result.add(entry.length);
}
return result;
} catch (e, s) {
@ -968,13 +966,11 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
index < cryptoCurrency.maxNumberOfIndexesToCheck &&
gapCounter < cryptoCurrency.maxUnusedAddressGap;
index += txCountBatchSize) {
List<String> iterationsAddressArray = [];
Logging.instance.log(
"index: $index, \t GapCounter $chain ${type.name}: $gapCounter",
level: LogLevel.Info);
final _id = "k_$index";
Map<String, String> txCountCallArgs = {};
List<String> txCountCallArgs = [];
for (int j = 0; j < txCountBatchSize; j++) {
final derivePath = cryptoCurrency.constructDerivePath(
@ -1007,9 +1003,9 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
addressArray.add(address);
txCountCallArgs.addAll({
"${_id}_$j": addressString,
});
txCountCallArgs.add(
addressString,
);
}
// get address tx counts
@ -1017,11 +1013,9 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// check and add appropriate addresses
for (int k = 0; k < txCountBatchSize; k++) {
int count = (counts["${_id}_$k"] == null) ? 0 : counts["${_id}_$k"]!;
final count = counts[k];
if (count > 0) {
iterationsAddressArray.add(txCountCallArgs["${_id}_$k"]!);
// update highest
highestIndexWithHistory = index + k;
@ -1111,23 +1105,20 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
List<Map<String, dynamic>> allTxHashes = [];
if (serverCanBatch) {
final Map<String, Map<String, List<dynamic>>> batches = {};
final Map<int, String> requestIdToAddressMap = {};
final Map<int, List<List<dynamic>>> batches = {};
final Map<int, List<String>> batchIndexToAddressListMap = {};
const batchSizeMax = 100;
int batchNumber = 0;
for (int i = 0; i < allAddresses.length; i++) {
if (batches["$batchNumber"] == null) {
batches["$batchNumber"] = {};
}
batches[batchNumber] ??= [];
batchIndexToAddressListMap[batchNumber] ??= [];
final address = allAddresses.elementAt(i);
final scriptHash = cryptoCurrency.addressToScriptHash(
address: allAddresses.elementAt(i),
address: address,
);
// final id = Logger.isTestEnv ? "$i" : const Uuid().v1();
// TODO [prio=???]: Pass request IDs to electrum_adapter.
requestIdToAddressMap[i] = allAddresses.elementAt(i);
batches["$batchNumber"]!.addAll({
"$i": [scriptHash]
});
batches[batchNumber]!.add([scriptHash]);
batchIndexToAddressListMap[batchNumber]!.add(address);
if (i % batchSizeMax == batchSizeMax - 1) {
batchNumber++;
}
@ -1135,13 +1126,14 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
for (int i = 0; i < batches.length; i++) {
final response =
await electrumXClient.getBatchHistory(args: batches["$i"]!);
for (final entry in response.entries) {
for (int j = 0; j < entry.value.length; j++) {
entry.value[j]["address"] = requestIdToAddressMap[entry.key];
if (!allTxHashes.contains(entry.value[j])) {
allTxHashes.add(entry.value[j]);
}
await electrumXClient.getBatchHistory(args: batches[i]!);
for (int j = 0; j < response.length; j++) {
final entry = response[j];
for (int k = 0; k < entry.length; k++) {
entry[k]["address"] = batchIndexToAddressListMap[i]![j];
// if (!allTxHashes.contains(entry[j])) {
allTxHashes.add(entry[k]);
// }
}
}
}
@ -1608,31 +1600,27 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
final fetchedUtxoList = <List<Map<String, dynamic>>>[];
if (serverCanBatch) {
final Map<int, Map<String, List<dynamic>>> batches = {};
final Map<int, List<List<dynamic>>> batchArgs = {};
const batchSizeMax = 10;
int batchNumber = 0;
for (int i = 0; i < allAddresses.length; i++) {
if (batches[batchNumber] == null) {
batches[batchNumber] = {};
}
batchArgs[batchNumber] ??= [];
final scriptHash = cryptoCurrency.addressToScriptHash(
address: allAddresses[i].value,
);
batches[batchNumber]!.addAll({
scriptHash: [scriptHash]
});
batchArgs[batchNumber]!.add([scriptHash]);
if (i % batchSizeMax == batchSizeMax - 1) {
batchNumber++;
}
}
for (int i = 0; i < batches.length; i++) {
for (int i = 0; i < batchArgs.length; i++) {
final response =
await electrumXClient.getBatchUTXOs(args: batches[i]!);
for (final entry in response.entries) {
if (entry.value.isNotEmpty) {
fetchedUtxoList.add(entry.value);
await electrumXClient.getBatchUTXOs(args: batchArgs[i]!);
for (final entry in response) {
if (entry.isNotEmpty) {
fetchedUtxoList.add(entry);
}
}
}