WIP: paginated spark anon set prep

julian 2024-12-14 15:29:17 -06:00 committed by julian-CStack
parent 744f273862
commit d6aec00b58
10 changed files with 420 additions and 69 deletions

View file

@@ -9,6 +9,7 @@ import 'package:sqlite3/sqlite3.dart';
import 'package:uuid/uuid.dart';
import '../../electrumx_rpc/electrumx_client.dart';
import '../../models/electrumx_response/spark_models.dart';
import '../../utilities/extensions/extensions.dart';
import '../../utilities/logger.dart';
import '../../utilities/stack_file_system.dart';
@@ -42,12 +43,17 @@ abstract class _FiroCache {
network == CryptoCurrencyNetwork.main
? "spark_set_v$_setCacheVersion.sqlite3"
: "spark_set_v${_setCacheVersion}_${network.name}.sqlite3";
static String sparkSetMetaCacheFileName(CryptoCurrencyNetwork network) =>
network == CryptoCurrencyNetwork.main
? "spark_set_meta_v$_setCacheVersion.sqlite3"
: "spark_set_meta_v${_setCacheVersion}_${network.name}.sqlite3";
static String sparkUsedTagsCacheFileName(CryptoCurrencyNetwork network) =>
network == CryptoCurrencyNetwork.main
? "spark_tags_v$_tagsCacheVersion.sqlite3"
: "spark_tags_v${_tagsCacheVersion}_${network.name}.sqlite3";
static final Map<CryptoCurrencyNetwork, Database> _setCacheDB = {};
static final Map<CryptoCurrencyNetwork, Database> _setMetaCacheDB = {};
static final Map<CryptoCurrencyNetwork, Database> _usedTagsCacheDB = {};
static Database setCacheDB(CryptoCurrencyNetwork network) {
if (_setCacheDB[network] == null) {
@@ -58,6 +64,15 @@ abstract class _FiroCache {
return _setCacheDB[network]!;
}
static Database setMetaCacheDB(CryptoCurrencyNetwork network) {
if (_setMetaCacheDB[network] == null) {
throw Exception(
"FiroCache.init() must be called before accessing FiroCache.db!",
);
}
return _setMetaCacheDB[network]!;
}
static Database usedTagsCacheDB(CryptoCurrencyNetwork network) {
if (_usedTagsCacheDB[network] == null) {
throw Exception(
@@ -78,12 +93,18 @@ abstract class _FiroCache {
final sparkSetCacheFile =
File("${sqliteDir.path}/${sparkSetCacheFileName(network)}");
final sparkSetMetaCacheFile =
File("${sqliteDir.path}/${sparkSetMetaCacheFileName(network)}");
final sparkUsedTagsCacheFile =
File("${sqliteDir.path}/${sparkUsedTagsCacheFileName(network)}");
if (!(await sparkSetCacheFile.exists())) {
await _createSparkSetCacheDb(sparkSetCacheFile.path);
}
if (!(await sparkSetMetaCacheFile.exists())) {
await _createSparkSetMetaCacheDb(sparkSetMetaCacheFile.path);
}
if (!(await sparkUsedTagsCacheFile.exists())) {
await _createSparkUsedTagsCacheDb(sparkUsedTagsCacheFile.path);
}
@@ -92,6 +113,10 @@ abstract class _FiroCache {
sparkSetCacheFile.path,
mode: OpenMode.readWrite,
);
_setMetaCacheDB[network] = sqlite3.open(
sparkSetMetaCacheFile.path,
mode: OpenMode.readWrite,
);
_usedTagsCacheDB[network] = sqlite3.open(
sparkUsedTagsCacheFile.path,
mode: OpenMode.readWrite,
@@ -109,6 +134,12 @@ abstract class _FiroCache {
VACUUM;
""",
);
setMetaCacheDB(network).execute(
"""
DELETE FROM PreviousMetaFetchResult;
VACUUM;
""",
);
usedTagsCacheDB(network).execute(
"""
DELETE FROM SparkUsedCoinTags;
@@ -159,6 +190,27 @@ abstract class _FiroCache {
db.dispose();
}
static Future<void> _createSparkSetMetaCacheDb(String file) async {
final db = sqlite3.open(
file,
mode: OpenMode.readWriteCreate,
);
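    // Tracks the last successful meta fetch (blockHash, setHash, size) per
    // coin group; coinGroupId is UNIQUE so the table holds at most one row
    // per group.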
db.execute(
"""
CREATE TABLE PreviousMetaFetchResult (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
coinGroupId INTEGER NOT NULL UNIQUE,
blockHash TEXT NOT NULL,
setHash TEXT NOT NULL,
size INTEGER NOT NULL
);
""",
);
db.dispose();
}
static Future<void> _createSparkUsedTagsCacheDb(String file) async {
final db = sqlite3.open(
file,

View file

@@ -6,6 +6,8 @@ typedef LTagPair = ({String tag, String txid});
/// background isolate and [FiroCacheCoordinator] should manage that isolate
abstract class FiroCacheCoordinator {
static final Map<CryptoCurrencyNetwork, _FiroCacheWorker> _workers = {};
static final Map<CryptoCurrencyNetwork, Mutex> _tagLocks = {};
static final Map<CryptoCurrencyNetwork, Mutex> _setLocks = {};
static bool _init = false;
static Future<void> init() async {
@@ -15,6 +17,8 @@ abstract class FiroCacheCoordinator {
_init = true;
await _FiroCache.init();
for (final network in _FiroCache.networks) {
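      // One mutex per network so concurrent refresh calls cannot interleave
      // their read-count / fetch / write sequences against the same cache.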
_tagLocks[network] = Mutex();
_setLocks[network] = Mutex();
_workers[network] = await _FiroCacheWorker.spawn(network);
}
}
@@ -28,14 +32,26 @@ abstract class FiroCacheCoordinator {
final setCacheFile = File(
"${dir.path}/${_FiroCache.sparkSetCacheFileName(network)}",
);
final setMetaCacheFile = File(
"${dir.path}/${_FiroCache.sparkSetMetaCacheFileName(network)}",
);
final usedTagsCacheFile = File(
"${dir.path}/${_FiroCache.sparkUsedTagsCacheFileName(network)}",
);
final setSize =
    (await setCacheFile.exists()) ? await setCacheFile.length() : 0;
final tagsSize = (await usedTagsCacheFile.exists())
    ? await usedTagsCacheFile.length()
    : 0;
final setMetaSize =
    (await setMetaCacheFile.exists()) ? await setMetaCacheFile.length() : 0;
Logging.instance.log(
  "Spark cache sizes: set=$setSize, setMeta=$setMetaSize, tags=$tagsSize",
  level: LogLevel.Debug,
);
final int bytes = tagsSize + setSize + setMetaSize;
if (bytes < 1024) {
return '$bytes B';
@@ -55,43 +71,116 @@ abstract class FiroCacheCoordinator {
ElectrumXClient client,
CryptoCurrencyNetwork network,
) async {
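    // Incremental update: only tags at index >= the locally cached count are
    // requested from the server.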
    await _tagLocks[network]!.protect(() async {
      final count = await FiroCacheCoordinator.getUsedCoinTagsCount(network);
      final unhashedTags =
          await client.getSparkUnhashedUsedCoinsTagsWithTxHashes(
        startNumber: count,
      );
      if (unhashedTags.isNotEmpty) {
        await _workers[network]!.runTask(
          FCTask(
            func: FCFuncName._updateSparkUsedTagsWith,
            data: unhashedTags,
          ),
        );
      }
    });
}
static Future<void> runFetchAndUpdateSparkAnonSetCacheForGroupId(
int groupId,
ElectrumXClient client,
CryptoCurrencyNetwork network,
void Function(int countFetched, int totalCount)? progressUpdated,
) async {
    await _setLocks[network]!.protect(() async {
      Map<String, dynamic> json;
      SparkAnonymitySetMeta? meta;
      if (progressUpdated == null) {
        // Legacy single-request fetch.
        final blockhashResult =
            await FiroCacheCoordinator.getLatestSetInfoForGroupId(
          groupId,
          network,
        );
        final blockHash = blockhashResult?.blockHash ?? "";
        json = await client.getSparkAnonymitySet(
          coinGroupId: groupId.toString(),
          startBlockHash: blockHash.toHexReversedFromBase64,
        );
} else {
const sectorSize = 100; // TODO adjust this?
final prevMetaSize =
await FiroCacheCoordinator.getSparkMetaSetSizeForGroupId(
groupId,
network,
);
final prevSize = prevMetaSize ?? 0;
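        // Coins below prevSize are assumed to be cached already; only the
        // range [prevSize, meta.size) is fetched below.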
meta = await client.getSparkAnonymitySetMeta(
coinGroupId: groupId,
);
progressUpdated.call(prevSize, meta.size);
      // Each sector response contains blockHash (the last block hash),
      // setHash (the hash of the current set), and coins (the list of
      // pairs of serialized coin and tx hash).
final fullSectorCount = (meta.size - prevSize) ~/ sectorSize;
final remainder = (meta.size - prevSize) % sectorSize;
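        // Worked example with hypothetical numbers: prevSize = 250,
        // meta.size = 1003, sectorSize = 100:
        //   fullSectorCount = 753 ~/ 100 = 7   (sectors [250,350) .. [850,950))
        //   remainder       = 753 %  100 = 53  (final fetch [950,1003))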
final List<dynamic> coins = [];
for (int i = 0; i < fullSectorCount; i++) {
final start = (i * sectorSize) + prevSize;
final data = await client.getSparkAnonymitySetBySector(
coinGroupId: groupId,
latestBlock: meta.blockHash,
startIndex: start,
endIndex: start + sectorSize,
);
coins.addAll(data);
}
if (remainder > 0) {
final data = await client.getSparkAnonymitySetBySector(
coinGroupId: groupId,
latestBlock: meta.blockHash,
startIndex: meta.size - remainder,
endIndex: meta.size,
);
coins.addAll(data);
}
json = {
"blockHash": meta.blockHash,
"setHash": meta.setHash,
"coins": coins,
};
}
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._updateSparkAnonSetCoinsWith,
data: (groupId, json),
),
);
if (meta != null) {
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._updateSparkAnonSetMetaWith,
data: meta,
),
);
}
});
}
// ===========================================================================
@@ -228,4 +317,19 @@ abstract class FiroCacheCoordinator {
db: _FiroCache.setCacheDB(network),
);
}
static Future<int?> getSparkMetaSetSizeForGroupId(
int groupId,
CryptoCurrencyNetwork network,
) async {
final result = await _Reader._getSizeForGroupId(
groupId,
db: _FiroCache.setMetaCacheDB(network),
);
if (result.isEmpty) {
return null;
}
return result.first["size"] as int;
}
}
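A progress-aware caller passes a callback as the fourth argument. A minimal, hypothetical sketch, assuming groupId, client, and network are already in scope (not part of this commit):

// Hypothetical usage sketch only.
await FiroCacheCoordinator.runFetchAndUpdateSparkAnonSetCacheForGroupId(
  groupId,
  client,
  network,
  (countFetched, totalCount) {
    Logging.instance.log(
      "Spark set $groupId progress: $countFetched/$totalCount",
      level: LogLevel.Debug,
    );
  },
);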

View file

@@ -56,6 +56,21 @@ abstract class _Reader {
return db.select("$query;").first["setExists"] == 1;
}
// ===========================================================================
// =============== Spark anonymity set meta queries ==========================
static Future<ResultSet> _getSizeForGroupId(
int groupId, {
required Database db,
}) async {
final query = """
SELECT ss.size
FROM PreviousMetaFetchResult ss
WHERE ss.groupId = $groupId;
""";
return db.select("$query;");
}
// ===========================================================================
// =============== Spark used coin tags queries ==============================

View file

@@ -3,6 +3,7 @@ part of 'firo_cache.dart';
enum FCFuncName {
_updateSparkAnonSetCoinsWith,
_updateSparkUsedTagsWith,
_updateSparkAnonSetMetaWith,
}
class FCTask {
@@ -29,6 +30,8 @@ class _FiroCacheWorker {
final dir = await StackFileSystem.applicationFiroCacheSQLiteDirectory();
final setCacheFilePath =
"${dir.path}/${_FiroCache.sparkSetCacheFileName(network)}";
final setMetaCacheFilePath =
"${dir.path}/${_FiroCache.sparkSetMetaCacheFileName(network)}";
final usedTagsCacheFilePath =
"${dir.path}/${_FiroCache.sparkUsedTagsCacheFileName(network)}";
@@ -48,7 +51,12 @@ class _FiroCacheWorker {
try {
await Isolate.spawn(
_startWorkerIsolate,
(
initPort.sendPort,
setCacheFilePath,
setMetaCacheFilePath,
usedTagsCacheFilePath,
),
);
} catch (_) {
initPort.close();
@@ -79,6 +87,7 @@ class _FiroCacheWorker {
ReceivePort receivePort,
SendPort sendPort,
Database setCacheDb,
Database setMetaCacheDb,
Database usedTagsCacheDb,
Mutex mutex,
) {
@@ -104,6 +113,13 @@ class _FiroCacheWorker {
task.data as List<List<dynamic>>,
);
break;
case FCFuncName._updateSparkAnonSetMetaWith:
result = _updateSparkAnonSetMetaWith(
setMetaCacheDb,
task.data as SparkAnonymitySetMeta,
);
break;
}
if (result.success) {
@@ -118,7 +134,7 @@ class _FiroCacheWorker {
});
}
static void _startWorkerIsolate((SendPort, String, String, String) args) {
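    // Record fields: $1 init SendPort, $2 set cache path, $3 set meta cache
    // path, $4 used tags cache path; must stay in sync with the tuple built
    // in spawn() above.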
final receivePort = ReceivePort();
args.$1.send(receivePort.sendPort);
final mutex = Mutex();
@@ -126,14 +142,19 @@ class _FiroCacheWorker {
args.$2,
mode: OpenMode.readWrite,
);
final setMetaCacheDb = sqlite3.open(
args.$3,
mode: OpenMode.readWrite,
);
final usedTagsCacheDb = sqlite3.open(
args.$4,
mode: OpenMode.readWrite,
);
_handleCommandsToIsolate(
receivePort,
args.$1,
setCacheDb,
setMetaCacheDb,
usedTagsCacheDb,
mutex,
);

View file

@@ -48,6 +48,31 @@ FCResult _updateSparkUsedTagsWith(
}
}
// ===========================================================================
// ================ write to spark anon set meta cache =======================
FCResult _updateSparkAnonSetMetaWith(
Database db,
SparkAnonymitySetMeta meta,
) {
db.execute("BEGIN;");
try {
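    // Same BEGIN / COMMIT / ROLLBACK + FCResult pattern as the other cache
    // writers in this file; coinGroupId's UNIQUE constraint makes the insert
    // an upsert.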
db.execute(
"""
INSERT OR REPLACE INTO PreviousMetaFetchResult (coinGroupId, blockHash, setHash, size)
VALUES (?, ?, ?, ?);
""",
[meta.coinGroupId, meta.blockHash, meta.setHash, meta.size],
);
db.execute("COMMIT;");
return FCResult(success: true);
} catch (e) {
db.execute("ROLLBACK;");
return FCResult(success: false, error: e);
}
}
// ===========================================================================
// ================== write to spark anon set cache ==========================

View file

@@ -21,6 +21,7 @@ import 'package:mutex/mutex.dart';
import 'package:stream_channel/stream_channel.dart';
import '../exceptions/electrumx/no_such_transaction.dart';
import '../models/electrumx_response/spark_models.dart';
import '../services/event_bus/events/global/tor_connection_status_changed_event.dart';
import '../services/event_bus/events/global/tor_status_changed_event.dart';
import '../services/event_bus/global_event_bus.dart';
@@ -34,13 +35,6 @@ import '../wallets/crypto_currency/crypto_currency.dart';
import '../wallets/crypto_currency/interfaces/electrumx_currency_interface.dart';
import 'client_manager.dart';
class WifiOnlyException implements Exception {}
class TorOnlyException implements Exception {}
@@ -1037,29 +1031,30 @@ class ElectrumXClient {
/// "b476ed2b374bb081ea51d111f68f0136252521214e213d119b8dc67b92f5a390",
/// ]
/// }
/// NOT USED?
// Future<List<Map<String, dynamic>>> getSparkMintMetaData({
// String? requestID,
// required List<String> sparkCoinHashes,
// }) async {
// try {
// Logging.instance.log(
// "attempting to fetch spark.getsparkmintmetadata...",
// level: LogLevel.Info,
// );
// await checkElectrumAdapter();
// final List<dynamic> response =
// await (getElectrumAdapter() as FiroElectrumClient)
// .getSparkMintMetaData(sparkCoinHashes: sparkCoinHashes);
// Logging.instance.log(
// "Fetching spark.getsparkmintmetadata finished",
// level: LogLevel.Info,
// );
// return List<Map<String, dynamic>>.from(response);
// } catch (e) {
// Logging.instance.log(e, level: LogLevel.Error);
// rethrow;
// }
// }
/// Returns the latest Spark set id
///
@@ -1135,7 +1130,7 @@ class ElectrumXClient {
final List<SparkMempoolData> result = [];
for (final entry in map.entries) {
result.add(
SparkMempoolData(
txid: entry.key,
serialContext:
List<String>.from(entry.value["serial_context"] as List),
@@ -1191,6 +1186,94 @@ class ElectrumXClient {
rethrow;
}
}
// ======== New Paginated Endpoints ==========================================
Future<SparkAnonymitySetMeta> getSparkAnonymitySetMeta({
String? requestID,
required int coinGroupId,
}) async {
try {
const command =
"spark.getsparkanonyumitysetmeta"; // TODO verify this will be correct
final start = DateTime.now();
final response = await request(
requestID: requestID,
command: command,
args: [
"$coinGroupId",
],
);
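      // Assumed response shape (command name unverified, per the TODO above):
      //   {"blockHash": "...", "setHash": "...", "size": <int>}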
final map = Map<String, dynamic>.from(response as Map);
final result = SparkAnonymitySetMeta(
coinGroupId: coinGroupId,
blockHash: map["blockHash"] as String,
setHash: map["setHash"] as String,
size: map["size"] as int,
);
Logging.instance.log(
"Finished ElectrumXClient.getSparkAnonymitySetMeta("
"requestID=$requestID, "
"coinGroupId=$coinGroupId"
"). Set meta=$result, "
"Duration=${DateTime.now().difference(start)}",
level: LogLevel.Debug,
);
return result;
} catch (e) {
Logging.instance.log(e, level: LogLevel.Error);
rethrow;
}
}
Future<List<dynamic>> getSparkAnonymitySetBySector({
String? requestID,
required int coinGroupId,
required String latestBlock,
required int startIndex, // inclusive
required int endIndex, // exclusive
}) async {
try {
const command =
"spark.getsparkanonyumitysetsector"; // TODO verify this will be correct
final start = DateTime.now();
final response = await request(
requestID: requestID,
command: command,
args: [
"$coinGroupId",
latestBlock,
"$startIndex",
"$endIndex",
],
);
final map = Map<String, dynamic>.from(response as Map);
final result = map["coins"] as List;
Logging.instance.log(
"Finished ElectrumXClient.getSparkAnonymitySetBySector("
"requestID=$requestID, "
"coinGroupId=$coinGroupId, "
"latestBlock=$latestBlock, "
"startIndex=$startIndex, "
"endIndex=$endIndex"
"). # of coins=${result.length}, "
"Duration=${DateTime.now().difference(start)}",
level: LogLevel.Debug,
);
return result;
} catch (e) {
Logging.instance.log(e, level: LogLevel.Error);
rethrow;
}
}
// ===========================================================================
Future<bool> isMasterNodeCollateral({

View file

@@ -0,0 +1,47 @@
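/// Plain class replacing the `SparkMempoolData` record typedef previously
/// declared in electrumx_client.dart, giving it a readable toString().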
class SparkMempoolData {
final String txid;
final List<String> serialContext;
final List<String> lTags;
final List<String> coins;
SparkMempoolData({
required this.txid,
required this.serialContext,
required this.lTags,
required this.coins,
});
@override
String toString() {
return "SparkMempoolData{"
"txid: $txid, "
"serialContext: $serialContext, "
"lTags: $lTags, "
"coins: $coins"
"}";
}
}
class SparkAnonymitySetMeta {
final int coinGroupId;
final String blockHash;
final String setHash;
final int size;
SparkAnonymitySetMeta({
required this.coinGroupId,
required this.blockHash,
required this.setHash,
required this.size,
});
@override
String toString() {
return "SparkAnonymitySetMeta{"
"coinGroupId: $coinGroupId, "
"blockHash: $blockHash, "
"setHash: $setHash, "
"size: $size"
"}";
}
}

View file

@@ -725,6 +725,7 @@ class FiroWallet<T extends ElectrumXCurrencyInterface> extends Bip39HDWallet<T>
i,
electrumXClient,
cryptoCurrency.network,
null,
),
);
}

View file

@@ -670,7 +670,7 @@ abstract class Wallet<T extends CryptoCurrency> {
GlobalEventBus.instance.fire(RefreshPercentChangedEvent(0.3, walletId));
if (this is SparkInterface && !viewOnly) {
// this should be called before updateTransactions()
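      // null progress callback selects the legacy full-set fetch path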
await (this as SparkInterface).refreshSparkData(null);
}
_checkAlive();

View file

@@ -795,7 +795,9 @@ mixin SparkInterface<T extends ElectrumXCurrencyInterface>
}
}
Future<void> refreshSparkData(
void Function(int countFetched, int totalCount)? progressUpdated,
) async {
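    // WIP: progressUpdated is accepted but not yet forwarded to the per-group
    // fetches below, which still pass null.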
final start = DateTime.now();
try {
// start by checking if any previous sets are missing from db and add the
@@ -823,6 +825,7 @@ mixin SparkInterface<T extends ElectrumXCurrencyInterface>
e,
electrumXClient,
cryptoCurrency.network,
null,
),
);
@@ -1086,7 +1089,7 @@ mixin SparkInterface<T extends ElectrumXCurrencyInterface>
}
try {
await refreshSparkData(null);
} catch (e, s) {
Logging.instance.log(
"$runtimeType $walletId ${info.name}: $e\n$s",