From d1a236be3330e3b7d1b282da185b21af254a3014 Mon Sep 17 00:00:00 2001
From: julian
Date: Fri, 14 Jun 2024 14:52:01 -0600
Subject: [PATCH] spark mempool check during refresh

---
 .../spark_interface.dart | 474 +++++++++++-------
 1 file changed, 283 insertions(+), 191 deletions(-)

diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart
index 5ac6ade90..496147da1 100644
--- a/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart
+++ b/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart
@@ -635,7 +635,9 @@ mixin SparkInterface
       // been marked as isUsed.
       // TODO: [prio=med] Could (probably should) throw an exception here if txData.usedSparkCoins is null or empty
       if (txData.usedSparkCoins != null && txData.usedSparkCoins!.isNotEmpty) {
-        await _addOrUpdateSparkCoins(txData.usedSparkCoins!);
+        await mainDB.isar.writeTxn(() async {
+          await mainDB.isar.sparkCoins.putAll(txData.usedSparkCoins!);
+        });
       }
 
       return await updateSentCachedTxData(txData: txData);
@@ -648,7 +650,88 @@ mixin SparkInterface
     }
   }
 
+  // in-memory cache
+  Set<String> _mempoolTxids = {};
+  Set<String> _mempoolTxidsChecked = {};
+
+  Future<List<SparkCoin>> _refreshSparkCoinsMempoolCheck({
+    required Set<String> privateKeyHexSet,
+    required int groupId,
+  }) async {
+    final start = DateTime.now();
+    try {
+      // update cache
+      _mempoolTxids = await electrumXClient.getMempoolTxids();
+
+      // remove any checked txids that are no longer in the mempool
+      _mempoolTxidsChecked = _mempoolTxidsChecked.intersection(_mempoolTxids);
+
+      // get all unchecked txids currently in the mempool
+      final txidsToCheck = _mempoolTxids.difference(_mempoolTxidsChecked);
+      if (txidsToCheck.isEmpty) {
+        return [];
+      }
+
+      // fetch spark data to scan for any unconfirmed spark coins we may own
+      final sparkDataToCheck = await electrumXClient.getMempoolSparkData(
+        txids: txidsToCheck.toList(),
+      );
+
+      final Set<String> checkedTxids = {};
+      final List<List<dynamic>> rawCoins = [];
+
+      for (final data in sparkDataToCheck) {
+        for (int i = 0; i < data.coins.length; i++) {
+          rawCoins.add([
+            data.coins[i],
+            data.txid,
+            data.serialContext.first,
+          ]);
+        }
+
+        checkedTxids.add(data.txid);
+      }
+
+      final result = <SparkCoin>[];
+
+      // if there is new data we try to identify the coins
+      if (rawCoins.isNotEmpty) {
+        // run identification off the main isolate
+        final myCoins = await compute(
+          _identifyCoins,
+          (
+            anonymitySetCoins: rawCoins,
+            groupId: groupId,
+            privateKeyHexSet: privateKeyHexSet,
+            walletId: walletId,
+            isTestNet: cryptoCurrency.network == CryptoCurrencyNetwork.test,
+          ),
+        );
+
+        // add checked txids after identification
+        _mempoolTxidsChecked.addAll(checkedTxids);
+
+        result.addAll(myCoins);
+      }
+
+      return result;
+    } catch (e) {
+      Logging.instance.log(
+        "_refreshSparkCoinsMempoolCheck() failed: $e",
+        level: LogLevel.Error,
+      );
+      return [];
+    } finally {
+      Logging.instance.log(
+        "$walletId ${info.name} _refreshSparkCoinsMempoolCheck() run "
+        "duration: ${DateTime.now().difference(start)}",
+        level: LogLevel.Debug,
+      );
+    }
+  }
+
   Future<void> refreshSparkData() async {
+    final start = DateTime.now();
     try {
       // start by checking if any previous sets are missing from db and add the
       // missing groupIds to the list of sets to check and update
@@ -684,15 +767,210 @@ mixin SparkInterface
         ),
       ]);
 
-      await _checkAndUpdateCoins();
-
-      // refresh spark balance
-      await refreshSparkBalance();
+      // Get cached timestamps per groupId. These timestamps are used to
+      // check and try to identify coins that were added to the spark anon
+      // set cache after that timestamp.
+      final groupIdTimestampUTCMap =
+          info.otherData[WalletInfoKeys.firoSparkCacheSetTimestampCache]
+                  as Map? ??
+              {};
+
+      // iterate through the cache, fetching spark coin data that hasn't been
+      // processed by this wallet yet
+      final Map<int, List<List<dynamic>>> rawCoinsBySetId = {};
+      for (int i = 1; i <= latestGroupId; i++) {
+        final lastCheckedTimeStampUTC =
+            groupIdTimestampUTCMap[i.toString()] as int? ?? 0;
+        final info = await FiroCacheCoordinator.getLatestSetInfoForGroupId(
+          i,
+        );
+        final anonymitySetResult =
+            await FiroCacheCoordinator.getSetCoinsForGroupId(
+          i,
+          newerThanTimeStamp: lastCheckedTimeStampUTC,
+        );
+        final coinsRaw = anonymitySetResult
+            .map(
+              (e) => [
+                e.serialized,
+                e.txHash,
+                e.context,
+              ],
+            )
+            .toList();
+
+        if (coinsRaw.isNotEmpty) {
+          rawCoinsBySetId[i] = coinsRaw;
+        }
+
+        // update last checked timestamp data
+        groupIdTimestampUTCMap[i.toString()] = max(
+          lastCheckedTimeStampUTC,
+          info?.timestampUTC ?? lastCheckedTimeStampUTC,
+        );
+      }
+
+      // get spark addresses in order to derive the private key hex strings
+      // required for identifying spark coins
+      final sparkAddresses = await mainDB.isar.addresses
+          .where()
+          .walletIdEqualTo(walletId)
+          .filter()
+          .typeEqualTo(AddressType.spark)
+          .findAll();
+      final root = await getRootHDNode();
+      final Set<String> privateKeyHexSet = sparkAddresses
+          .map(
+            (e) =>
+                root.derivePath(e.derivationPath!.value).privateKey.data.toHex,
+          )
+          .toSet();
+
+      // try to identify any coins in the unchecked set data
+      final List<SparkCoin> newlyIdCoins = [];
+      for (final groupId in rawCoinsBySetId.keys) {
+        final myCoins = await compute(
+          _identifyCoins,
+          (
+            anonymitySetCoins: rawCoinsBySetId[groupId]!,
+            groupId: groupId,
+            privateKeyHexSet: privateKeyHexSet,
+            walletId: walletId,
+            isTestNet: cryptoCurrency.network == CryptoCurrencyNetwork.test,
+          ),
+        );
+        newlyIdCoins.addAll(myCoins);
+      }
+      // if any were found, add them to the database
+      if (newlyIdCoins.isNotEmpty) {
+        await mainDB.isar.writeTxn(() async {
+          await mainDB.isar.sparkCoins.putAll(newlyIdCoins);
+        });
+      }
+
+      // finally, update the cached timestamps in the database
+      await info.updateOtherData(
+        newEntries: {
+          WalletInfoKeys.firoSparkCacheSetTimestampCache:
+              groupIdTimestampUTCMap,
+        },
+        isar: mainDB.isar,
+      );
+
+      // check for spark coins in the mempool
+      final mempoolMyCoins = await _refreshSparkCoinsMempoolCheck(
+        privateKeyHexSet: privateKeyHexSet,
+        groupId: latestGroupId,
+      );
+      // if any were found, add them to the database
+      if (mempoolMyCoins.isNotEmpty) {
+        await mainDB.isar.writeTxn(() async {
+          await mainDB.isar.sparkCoins.putAll(mempoolMyCoins);
+        });
+      }
+
+      // get unused and/or unconfirmed coins from the db
+      final coinsToCheck = await mainDB.isar.sparkCoins
+          .where()
+          .walletIdEqualToAnyLTagHash(walletId)
+          .filter()
+          .heightIsNull()
+          .or()
+          .isUsedEqualTo(false)
+          .findAll();
+
+      Set<String>? spentCoinTags;
+      // only fetch tags from the db if we need them to compare against any
+      // items in coinsToCheck
+      if (coinsToCheck.isNotEmpty) {
+        spentCoinTags = await FiroCacheCoordinator.getUsedCoinTags(0);
+      }
+
+      // check and update coins if required
+      final List<SparkCoin> updatedCoins = [];
+      for (final coin in coinsToCheck) {
+        SparkCoin updated = coin;
+
+        if (updated.height == null) {
+          final tx = await electrumXCachedClient.getTransaction(
+            txHash: updated.txHash,
+            cryptoCurrency: info.coin,
+          );
+          if (tx["height"] is int) {
+            updated = updated.copyWith(height: tx["height"] as int);
+          }
+        }
+
+        if (updated.height != null &&
+            spentCoinTags!.contains(updated.lTagHash)) {
+          updated = updated.copyWith(isUsed: true);
+        }
+
+        updatedCoins.add(updated);
+      }
+      // update the db if any coins have changed
+      if (updatedCoins.isNotEmpty) {
+        await mainDB.isar.writeTxn(() async {
+          await mainDB.isar.sparkCoins.putAll(updatedCoins);
+        });
+      }
+
+      // the current height is used to check whether a coin is spendable yet
+      // or only counts toward the total balance
+      final currentHeight = await chainHeight;
+
+      // get all unused coins to update the wallet's spark balance
+      final unusedCoins = await mainDB.isar.sparkCoins
+          .where()
+          .walletIdEqualToAnyLTagHash(walletId)
+          .filter()
+          .isUsedEqualTo(false)
+          .findAll();
+
+      final total = Amount(
+        rawValue: unusedCoins
+            .map((e) => e.value)
+            .fold(BigInt.zero, (prev, e) => prev + e),
+        fractionDigits: cryptoCurrency.fractionDigits,
+      );
+      final spendable = Amount(
+        rawValue: unusedCoins
+            .where(
+              (e) =>
+                  e.height != null &&
+                  e.height! + cryptoCurrency.minConfirms <= currentHeight,
+            )
+            .map((e) => e.value)
+            .fold(BigInt.zero, (prev, e) => prev + e),
+        fractionDigits: cryptoCurrency.fractionDigits,
+      );
+
+      final sparkBalance = Balance(
+        total: total,
+        spendable: spendable,
+        blockedTotal: Amount(
+          rawValue: BigInt.zero,
+          fractionDigits: cryptoCurrency.fractionDigits,
+        ),
+        pendingSpendable: total - spendable,
+      );
+
+      // finally, update the balance in the db
+      await info.updateBalanceTertiary(
+        newBalance: sparkBalance,
+        isar: mainDB.isar,
+      );
     } catch (e, s) {
       Logging.instance.log(
         "$runtimeType $walletId ${info.name}: $e\n$s",
         level: LogLevel.Error,
       );
       rethrow;
+    } finally {
+      Logging.instance.log(
+        "${info.name} refreshSparkData() duration:"
+        " ${DateTime.now().difference(start)}",
+        level: LogLevel.Debug,
+      );
     }
   }
 
@@ -722,49 +1000,6 @@ mixin SparkInterface
     return pairs.toSet();
   }
 
-  Future<void> refreshSparkBalance() async {
-    final currentHeight = await chainHeight;
-    final unusedCoins = await mainDB.isar.sparkCoins
-        .where()
-        .walletIdEqualToAnyLTagHash(walletId)
-        .filter()
-        .isUsedEqualTo(false)
-        .findAll();
-
-    final total = Amount(
-      rawValue: unusedCoins
-          .map((e) => e.value)
-          .fold(BigInt.zero, (prev, e) => prev + e),
-      fractionDigits: cryptoCurrency.fractionDigits,
-    );
-    final spendable = Amount(
-      rawValue: unusedCoins
-          .where(
-            (e) =>
-                e.height != null &&
-                e.height! + cryptoCurrency.minConfirms <= currentHeight,
-          )
-          .map((e) => e.value)
-          .fold(BigInt.zero, (prev, e) => prev + e),
-      fractionDigits: cryptoCurrency.fractionDigits,
-    );
-
-    final sparkBalance = Balance(
-      total: total,
-      spendable: spendable,
-      blockedTotal: Amount(
-        rawValue: BigInt.zero,
-        fractionDigits: cryptoCurrency.fractionDigits,
-      ),
-      pendingSpendable: total - spendable,
-    );
-
-    await info.updateBalanceTertiary(
-      newBalance: sparkBalance,
-      isar: mainDB.isar,
-    );
-  }
-
   /// Should only be called within the standard wallet [recover] function due to
   /// mutex locking. Otherwise behaviour MAY be undefined.
   Future<void> recoverSparkWallet({
@@ -777,10 +1012,7 @@ mixin SparkInterface
     }
 
     try {
-      await _checkAndUpdateCoins();
-
-      // refresh spark balance
-      await refreshSparkBalance();
+      await refreshSparkData();
     } catch (e, s) {
       Logging.instance.log(
         "$runtimeType $walletId ${info.name}: $e\n$s",
         level: LogLevel.Error,
       );
       rethrow;
     }
   }
 
@@ -790,115 +1022,6 @@ mixin SparkInterface
-  Future<void> _checkAndUpdateCoins() async {
-    final sparkAddresses = await mainDB.isar.addresses
-        .where()
-        .walletIdEqualTo(walletId)
-        .filter()
-        .typeEqualTo(AddressType.spark)
-        .findAll();
-    final root = await getRootHDNode();
-    final Set<String> privateKeyHexSet = sparkAddresses
-        .map(
-          (e) => root.derivePath(e.derivationPath!.value).privateKey.data.toHex,
-        )
-        .toSet();
-
-    final Map<int, List<List<dynamic>>> rawCoinsBySetId = {};
-
-    final groupIdTimestampUTCMap =
-        info.otherData[WalletInfoKeys.firoSparkCacheSetTimestampCache]
-                as Map? ??
-            {};
-
-    final latestSparkCoinId = await electrumXClient.getSparkLatestCoinId();
-    for (int i = 1; i <= latestSparkCoinId; i++) {
-      final lastCheckedTimeStampUTC =
-          groupIdTimestampUTCMap[i.toString()] as int? ?? 0;
-      final info = await FiroCacheCoordinator.getLatestSetInfoForGroupId(
-        i,
-      );
-      final anonymitySetResult =
-          await FiroCacheCoordinator.getSetCoinsForGroupId(
-        i,
-        newerThanTimeStamp: lastCheckedTimeStampUTC,
-      );
-      final coinsRaw = anonymitySetResult
-          .map(
-            (e) => [
-              e.serialized,
-              e.txHash,
-              e.context,
-            ],
-          )
-          .toList();
-
-      if (coinsRaw.isNotEmpty) {
-        rawCoinsBySetId[i] = coinsRaw;
-      }
-
-      groupIdTimestampUTCMap[i.toString()] = max(
-        lastCheckedTimeStampUTC,
-        info?.timestampUTC ?? lastCheckedTimeStampUTC,
-      );
-    }
-
-    await info.updateOtherData(
-      newEntries: {
-        WalletInfoKeys.firoSparkCacheSetTimestampCache: groupIdTimestampUTCMap,
-      },
-      isar: mainDB.isar,
-    );
-
-    final List<SparkCoin> newlyIdCoins = [];
-    for (final groupId in rawCoinsBySetId.keys) {
-      final myCoins = await compute(
-        _identifyCoins,
-        (
-          anonymitySetCoins: rawCoinsBySetId[groupId]!,
-          groupId: groupId,
-          privateKeyHexSet: privateKeyHexSet,
-          walletId: walletId,
-          isTestNet: cryptoCurrency.network == CryptoCurrencyNetwork.test,
-        ),
-      );
-      newlyIdCoins.addAll(myCoins);
-    }
-
-    await _checkAndMarkCoinsUsedInDB(coinsNotInDbYet: newlyIdCoins);
-  }
-
-  Future<void> _checkAndMarkCoinsUsedInDB({
-    List<SparkCoin> coinsNotInDbYet = const [],
-  }) async {
-    final List<SparkCoin> coins = await mainDB.isar.sparkCoins
-        .where()
-        .walletIdEqualToAnyLTagHash(walletId)
-        .filter()
-        .isUsedEqualTo(false)
-        .findAll();
-
-    final List<SparkCoin> coinsToWrite = [];
-
-    final spentCoinTags = await FiroCacheCoordinator.getUsedCoinTags(0);
-
-    for (final coin in coins) {
-      if (spentCoinTags.contains(coin.lTagHash)) {
-        coinsToWrite.add(coin.copyWith(isUsed: true));
-      }
-    }
-    for (final coin in coinsNotInDbYet) {
-      if (spentCoinTags.contains(coin.lTagHash)) {
-        coinsToWrite.add(coin.copyWith(isUsed: true));
-      } else {
-        coinsToWrite.add(coin);
-      }
-    }
-
-    // update wallet spark coins in isar
-    await _addOrUpdateSparkCoins(coinsToWrite);
-  }
-
   // modelled on CSparkWallet::CreateSparkMintTransactions https://github.com/firoorg/firo/blob/39c41e5e7ec634ced3700fe3f4f5509dc2e480d0/src/spark/sparkwallet.cpp#L752
   Future<List<TxData>> _createSparkMintTransactions({
     required List<UTXO> availableUtxos,
@@ -1698,37 +1821,6 @@ mixin SparkInterface
 
   // ====================== Private ============================================
 
-  Future<void> _addOrUpdateSparkCoins(List<SparkCoin> coins) async {
-    if (coins.isNotEmpty) {
-      await mainDB.isar.writeTxn(() async {
-        await mainDB.isar.sparkCoins.putAll(coins);
-      });
-    }
-
-    // update wallet spark coin height
-    final coinsToCheck = await mainDB.isar.sparkCoins
-        .where()
-        .walletIdEqualToAnyLTagHash(walletId)
-        .filter()
-        .heightIsNull()
-        .findAll();
-    final List<SparkCoin> updatedCoins = [];
-    for (final coin in coinsToCheck) {
-      final tx = await electrumXCachedClient.getTransaction(
-        txHash: coin.txHash,
-        cryptoCurrency: info.coin,
-      );
-      if (tx["height"] is int) {
-        updatedCoins.add(coin.copyWith(height: tx["height"] as int));
-      }
-    }
-    if (updatedCoins.isNotEmpty) {
-      await mainDB.isar.writeTxn(() async {
-        await mainDB.isar.sparkCoins.putAll(updatedCoins);
-      });
-    }
-  }
-
   btc.NetworkType get _bitcoinDartNetwork => btc.NetworkType(
         messagePrefix: cryptoCurrency.networkParams.messagePrefix,
         bech32: cryptoCurrency.networkParams.bech32Hrp,
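
Reviewer note: the caching strategy introduced by _refreshSparkCoinsMempoolCheck() reduces to two in-memory sets, a snapshot of the current mempool and the subset of txids already scanned. The following is a minimal sketch of that pattern in plain Dart, not part of the patch. `fetchMempoolTxids` and `scanForOwnedCoins` are hypothetical stand-ins for the electrumXClient.getMempoolTxids()/getMempoolSparkData() calls plus coin identification, and the sketch simplifies by marking every fetched txid as checked in one step.

// Sketch only; assumed names, see note above.
class MempoolScanCache {
  Set<String> _mempoolTxids = {};
  Set<String> _checkedTxids = {};

  Future<List<T>> scan<T>({
    required Future<Set<String>> Function() fetchMempoolTxids,
    required Future<List<T>> Function(List<String> txids) scanForOwnedCoins,
  }) async {
    // Take a fresh snapshot of the mempool.
    _mempoolTxids = await fetchMempoolTxids();

    // Forget txids that have confirmed or been evicted; this keeps the
    // checked set bounded by the size of the live mempool.
    _checkedTxids = _checkedTxids.intersection(_mempoolTxids);

    // Scan only txids not seen on a previous refresh.
    final toCheck = _mempoolTxids.difference(_checkedTxids);
    if (toCheck.isEmpty) {
      return [];
    }

    final found = await scanForOwnedCoins(toCheck.toList());

    // Mark txids as checked only after a successful scan, so a failure
    // leaves them eligible for retry on the next refresh.
    _checkedTxids.addAll(toCheck);
    return found;
  }
}

Intersecting the checked set with the live mempool before diffing is what prevents unbounded growth across long wallet sessions, which appears to be the point of the two-set design in the patch.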