Compartmentalize the SQLite Firo cache code and add a background isolate worker to handle some processing as well as cache db writes

julian 2024-05-31 16:37:25 -06:00
parent de949efbff
commit 744107b3eb
5 changed files with 576 additions and 498 deletions
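Below, a minimal usage sketch of the API this commit introduces. The helper refreshSparkCache is hypothetical and client is assumed to be a connected ElectrumXClient; FiroCacheCoordinator.init() opens the cache db and spawns the worker isolate, after which the run* methods route cache db writes through that worker.

// Hedged sketch: refreshSparkCache is a hypothetical helper, not in the diff.
Future<void> refreshSparkCache(ElectrumXClient client, int groupId) async {
  await FiroCacheCoordinator.init(); // opens db and spawns the worker isolate
  await FiroCacheCoordinator.runFetchAndUpdateSparkUsedCoinTags(client);
  await FiroCacheCoordinator.runFetchAndUpdateSparkAnonSetCacheForGroupId(
    groupId,
    client,
  );
}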

firo_cache.dart

@@ -1,15 +1,23 @@
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'package:flutter/foundation.dart';
import 'package:flutter_libsparkmobile/flutter_libsparkmobile.dart';
import 'package:mutex/mutex.dart';
import 'package:sqlite3/sqlite3.dart';
import 'package:uuid/uuid.dart';
import '../../electrumx_rpc/electrumx_client.dart';
import '../../utilities/extensions/extensions.dart';
import '../../utilities/logger.dart';
import '../../utilities/stack_file_system.dart';
part 'firo_cache_coordinator.dart';
part 'firo_cache_reader.dart';
part 'firo_cache_writer.dart';
part 'firo_cache_worker.dart';
/// Temporary debugging log function for this file
void _debugLog(Object? object) {
if (kDebugMode) {
@@ -20,146 +28,6 @@ void _debugLog(Object? object) {
}
}
List<String> _ffiHashTagsComputeWrapper(List<String> base64Tags) {
return LibSpark.hashTags(base64Tags: base64Tags);
}
/// Wrapper class for [_FiroCache] as [_FiroCache] should eventually be handled in a
/// background isolate and [FiroCacheCoordinator] should manage that isolate
abstract class FiroCacheCoordinator {
static Future<void> init() => _FiroCache.init();
static Future<void> clearSharedCache() async {
return await _FiroCache._deleteAllCache();
}
static Future<String> getSparkCacheSize() async {
final dir = await StackFileSystem.applicationSQLiteDirectory();
final cacheFile = File("${dir.path}/${_FiroCache.sqliteDbFileName}");
final int bytes;
if (await cacheFile.exists()) {
bytes = await cacheFile.length();
} else {
bytes = 0;
}
if (bytes < 1024) {
return '$bytes B';
} else if (bytes < 1048576) {
final double kbSize = bytes / 1024;
return '${kbSize.toStringAsFixed(2)} KB';
} else if (bytes < 1073741824) {
final double mbSize = bytes / 1048576;
return '${mbSize.toStringAsFixed(2)} MB';
} else {
final double gbSize = bytes / 1073741824;
return '${gbSize.toStringAsFixed(2)} GB';
}
}
static Future<void> runFetchAndUpdateSparkUsedCoinTags(
ElectrumXClient client,
) async {
final count = await FiroCacheCoordinator.getUsedCoinTagsLastAddedRowId();
final unhashedTags = await client.getSparkUnhashedUsedCoinsTags(
startNumber: count,
);
if (unhashedTags.isNotEmpty) {
final hashedTags = await compute(
_ffiHashTagsComputeWrapper,
unhashedTags,
);
await _FiroCache._updateSparkUsedTagsWith(hashedTags);
}
}
static Future<void> runFetchAndUpdateSparkAnonSetCacheForGroupId(
int groupId,
ElectrumXClient client,
) async {
final blockhashResult =
await FiroCacheCoordinator.getLatestSetInfoForGroupId(
groupId,
);
final blockHash = blockhashResult?.blockHash ?? "";
final json = await client.getSparkAnonymitySet(
coinGroupId: groupId.toString(),
startBlockHash: blockHash.toHexReversedFromBase64,
);
await _FiroCache._updateSparkAnonSetCoinsWith(json, groupId);
}
// ===========================================================================
static Future<Set<String>> getUsedCoinTags(int startNumber) async {
final result = await _FiroCache._getSparkUsedCoinTags(
startNumber,
);
return result.map((e) => e["tag"] as String).toSet();
}
/// Assuming the integrity of the data, this should be equivalent to counting
/// the number of tags in the db, and is faster than actually calling COUNT on
/// a table where no records have been deleted. No records should ever be
/// deleted from this table in practice.
static Future<int> getUsedCoinTagsLastAddedRowId() async {
final result = await _FiroCache._getUsedCoinTagsLastAddedRowId();
if (result.isEmpty) {
return 0;
}
return result.first["highestId"] as int? ?? 0;
}
static Future<bool> checkTagIsUsed(
String tag,
) async {
return await _FiroCache._checkTagIsUsed(
tag,
);
}
static Future<ResultSet> getSetCoinsForGroupId(
int groupId, {
int? newerThanTimeStamp,
}) async {
return await _FiroCache._getSetCoinsForGroupId(
groupId,
newerThanTimeStamp: newerThanTimeStamp,
);
}
static Future<
({
String blockHash,
String setHash,
int timestampUTC,
})?> getLatestSetInfoForGroupId(
int groupId,
) async {
final result = await _FiroCache._getLatestSetInfoForGroupId(groupId);
if (result.isEmpty) {
return null;
}
return (
blockHash: result.first["blockHash"] as String,
setHash: result.first["setHash"] as String,
timestampUTC: result.first["timestampUTC"] as int,
);
}
static Future<bool> checkSetInfoForGroupIdExists(
int groupId,
) async {
return await _FiroCache._checkSetInfoForGroupIdExists(
groupId,
);
}
}
abstract class _FiroCache {
static const String sqliteDbFileName = "firo_ex_cache.sqlite3";
@@ -218,14 +86,14 @@ abstract class _FiroCache {
db.execute(
"""
CREATE TABLE SparkSet (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
blockHash TEXT NOT NULL,
setHash TEXT NOT NULL,
groupId INTEGER NOT NULL,
timestampUTC INTEGER NOT NULL,
UNIQUE (blockHash, setHash, groupId)
);
CREATE TABLE SparkCoin (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
serialized TEXT NOT NULL,
@@ -233,366 +101,22 @@
context TEXT NOT NULL,
UNIQUE(serialized, txHash, context)
);
CREATE TABLE SparkSetCoins (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
setId INTEGER NOT NULL,
coinId INTEGER NOT NULL,
FOREIGN KEY (setId) REFERENCES SparkSet(id),
FOREIGN KEY (coinId) REFERENCES SparkCoin(id)
);
CREATE TABLE SparkUsedCoinTags (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
tag TEXT NOT NULL UNIQUE
);
""",
""",
);
db.dispose();
}
// ===========================================================================
// =============== Spark anonymity set queries ===============================
static Future<ResultSet> _getSetCoinsForGroupId(
int groupId, {
int? newerThanTimeStamp,
}) async {
String query = """
SELECT sc.serialized, sc.txHash, sc.context
FROM SparkSet AS ss
JOIN SparkSetCoins AS ssc ON ss.id = ssc.setId
JOIN SparkCoin AS sc ON ssc.coinId = sc.id
WHERE ss.groupId = $groupId
""";
if (newerThanTimeStamp != null) {
query += " AND ss.timestampUTC"
" > $newerThanTimeStamp";
}
return db.select("$query;");
}
static Future<ResultSet> _getLatestSetInfoForGroupId(
int groupId,
) async {
final query = """
SELECT ss.blockHash, ss.setHash, ss.timestampUTC
FROM SparkSet ss
WHERE ss.groupId = $groupId
ORDER BY ss.timestampUTC DESC
LIMIT 1;
""";
return db.select("$query;");
}
static Future<bool> _checkSetInfoForGroupIdExists(
int groupId,
) async {
final query = """
SELECT EXISTS (
SELECT 1
FROM SparkSet
WHERE groupId = $groupId
) AS setExists;
""";
return db.select("$query;").first["setExists"] == 1;
}
// ===========================================================================
// =============== Spark used coin tags queries ==============================
static Future<ResultSet> _getSparkUsedCoinTags(
int startNumber,
) async {
String query = """
SELECT tag
FROM SparkUsedCoinTags
""";
if (startNumber > 0) {
query += " WHERE id >= $startNumber";
}
return db.select("$query;");
}
static Future<ResultSet> _getUsedCoinTagsLastAddedRowId() async {
const query = """
SELECT MAX(id) AS highestId
FROM SparkUsedCoinTags;
""";
return db.select("$query;");
}
static Future<bool> _checkTagIsUsed(String tag) async {
final query = """
SELECT EXISTS (
SELECT 1
FROM SparkUsedCoinTags
WHERE tag = '$tag'
) AS tagExists;
""";
return db.select("$query;").first["tagExists"] == 1;
}
// ===========================================================================
// ================== write to spark used tags cache =========================
// debug log counter var
static int _updateTagsCount = 0;
/// update the sqlite used coin tags cache
///
/// returns true if successful, otherwise false
static Future<bool> _updateSparkUsedTagsWith(
List<String> tags,
) async {
final start = DateTime.now();
_updateTagsCount++;
if (tags.isEmpty) {
_debugLog(
"$_updateTagsCount _updateSparkUsedTagsWith(tags) called "
"where tags is empty",
);
_debugLog(
"$_updateTagsCount _updateSparkUsedTagsWith() "
"duration = ${DateTime.now().difference(start)}",
);
// nothing to add, return early
return true;
} else if (tags.length <= 10) {
_debugLog("$_updateTagsCount _updateSparkUsedTagsWith() called where "
"tags.length=${tags.length}, tags: $tags,");
} else {
_debugLog(
"$_updateTagsCount _updateSparkUsedTagsWith() called where"
" tags.length=${tags.length},"
" first 5 tags: ${tags.sublist(0, 5)},"
" last 5 tags: ${tags.sublist(tags.length - 5, tags.length)}",
);
}
db.execute("BEGIN;");
try {
for (final tag in tags) {
db.execute(
"""
INSERT OR IGNORE INTO SparkUsedCoinTags (tag)
VALUES (?);
""",
[tag],
);
}
db.execute("COMMIT;");
_debugLog("$_updateTagsCount _updateSparkUsedTagsWith() COMMITTED");
_debugLog(
"$_updateTagsCount _updateSparkUsedTagsWith() "
"duration = ${DateTime.now().difference(start)}",
);
return true;
} catch (e, s) {
db.execute("ROLLBACK;");
_debugLog("$_updateTagsCount _updateSparkUsedTagsWith() ROLLBACK");
_debugLog(
"$_updateTagsCount _updateSparkUsedTagsWith() "
"duration = ${DateTime.now().difference(start)}",
);
// NOTE THIS LOGGER MUST BE CALLED ON MAIN ISOLATE FOR NOW
Logging.instance.log(
"$e\n$s",
level: LogLevel.Error,
);
}
return false;
}
// ===========================================================================
// ================== write to spark anon set cache ==========================
// debug log counter var
static int _updateAnonSetCount = 0;
/// update the sqlite cache
/// Expected json format:
/// {
/// "blockHash": "someBlockHash",
/// "setHash": "someSetHash",
/// "coins": [
/// ["serliazed1", "hash1", "context1"],
/// ["serliazed2", "hash2", "context2"],
/// ...
/// ["serliazed3", "hash3", "context3"],
/// ["serliazed4", "hash4", "context4"],
/// ],
/// }
///
/// returns true if successful, otherwise false
static Future<bool> _updateSparkAnonSetCoinsWith(
Map<String, dynamic> json,
int groupId,
) async {
final start = DateTime.now();
_updateAnonSetCount++;
final blockHash = json["blockHash"] as String;
final setHash = json["setHash"] as String;
final coinsRaw = json["coins"] as List;
_debugLog(
"$_updateAnonSetCount _updateSparkAnonSetCoinsWith() "
"called where groupId=$groupId, "
"blockHash=$blockHash (${blockHash.toHexReversedFromBase64}), "
"setHash=$setHash, "
"coins.length: ${coinsRaw.isEmpty ? 0 : coinsRaw.length}",
);
if ((json["coins"] as List).isEmpty) {
_debugLog(
"$_updateAnonSetCount _updateSparkAnonSetCoinsWith()"
" called where json[coins] is Empty",
);
_debugLog(
"$_updateAnonSetCount _updateSparkAnonSetCoinsWith()"
" duration = ${DateTime.now().difference(start)}",
);
// no coins to actually insert
return true;
}
final checkResult = db.select(
"""
SELECT *
FROM SparkSet
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
""",
[
blockHash,
setHash,
groupId,
],
);
_debugLog(
"$_updateAnonSetCount _updateSparkAnonSetCoinsWith()"
" called where checkResult=$checkResult",
);
if (checkResult.isNotEmpty) {
_debugLog(
"$_updateAnonSetCount _updateSparkAnonSetCoinsWith()"
" duration = ${DateTime.now().difference(start)}",
);
// already up to date
return true;
}
final coins = coinsRaw
.map(
(e) => [
e[0] as String,
e[1] as String,
e[2] as String,
],
)
.toList();
final timestamp = DateTime.now().toUtc().millisecondsSinceEpoch ~/ 1000;
db.execute("BEGIN;");
try {
db.execute(
"""
INSERT INTO SparkSet (blockHash, setHash, groupId, timestampUTC)
VALUES (?, ?, ?, ?);
""",
[blockHash, setHash, groupId, timestamp],
);
final setId = db.lastInsertRowId;
for (final coin in coins) {
int coinId;
try {
// try to insert and get row id
db.execute(
"""
INSERT INTO SparkCoin (serialized, txHash, context)
VALUES (?, ?, ?);
""",
coin,
);
coinId = db.lastInsertRowId;
} on SqliteException catch (e) {
// if there already is a matching coin in the db
// just grab its row id
if (e.extendedResultCode == 2067) {
final result = db.select(
"""
SELECT id
FROM SparkCoin
WHERE serialized = ? AND txHash = ? AND context = ?;
""",
coin,
);
coinId = result.first["id"] as int;
} else {
rethrow;
}
}
// finally add the row id to the newly added set
db.execute(
"""
INSERT INTO SparkSetCoins (setId, coinId)
VALUES (?, ?);
""",
[setId, coinId],
);
}
db.execute("COMMIT;");
_debugLog(
"$_updateAnonSetCount _updateSparkAnonSetCoinsWith() COMMITTED",
);
_debugLog(
"$_updateAnonSetCount _updateSparkAnonSetCoinsWith() duration"
" = ${DateTime.now().difference(start)}",
);
return true;
} catch (e, s) {
db.execute("ROLLBACK;");
_debugLog("$_updateAnonSetCount _updateSparkAnonSetCoinsWith() ROLLBACK");
_debugLog(
"$_updateAnonSetCount _updateSparkAnonSetCoinsWith()"
" duration = ${DateTime.now().difference(start)}",
);
// NOTE THIS LOGGER MUST BE CALLED ON MAIN ISOLATE FOR NOW
Logging.instance.log(
"$e\n$s",
level: LogLevel.Error,
);
}
return false;
}
}

firo_cache_coordinator.dart

@@ -0,0 +1,162 @@
part of 'firo_cache.dart';
/// Wrapper class for [_FiroCache]. Cache processing and db writes are handled
/// in a background isolate worker which [FiroCacheCoordinator] manages
abstract class FiroCacheCoordinator {
static _FiroCacheWorker? _worker;
static bool _init = false;
static Future<void> init() async {
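// idempotent: only open the db and spawn the worker isolate once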
if (_init) {
return;
}
_init = true;
await _FiroCache.init();
_worker = await _FiroCacheWorker.spawn();
}
static Future<void> clearSharedCache() async {
return await _FiroCache._deleteAllCache();
}
static Future<String> getSparkCacheSize() async {
final dir = await StackFileSystem.applicationSQLiteDirectory();
final cacheFile = File("${dir.path}/${_FiroCache.sqliteDbFileName}");
final int bytes;
if (await cacheFile.exists()) {
bytes = await cacheFile.length();
} else {
bytes = 0;
}
if (bytes < 1024) {
return '$bytes B';
} else if (bytes < 1048576) {
final double kbSize = bytes / 1024;
return '${kbSize.toStringAsFixed(2)} KB';
} else if (bytes < 1073741824) {
final double mbSize = bytes / 1048576;
return '${mbSize.toStringAsFixed(2)} MB';
} else {
final double gbSize = bytes / 1073741824;
return '${gbSize.toStringAsFixed(2)} GB';
}
}
static Future<void> runFetchAndUpdateSparkUsedCoinTags(
ElectrumXClient client,
) async {
final count = await FiroCacheCoordinator.getUsedCoinTagsLastAddedRowId();
final unhashedTags = await client.getSparkUnhashedUsedCoinsTags(
startNumber: count,
);
if (unhashedTags.isNotEmpty) {
await _worker!.runTask(
FCTask(
func: FCFuncName._updateSparkUsedTagsWith,
data: unhashedTags,
),
);
}
}
static Future<void> runFetchAndUpdateSparkAnonSetCacheForGroupId(
int groupId,
ElectrumXClient client,
) async {
final blockhashResult =
await FiroCacheCoordinator.getLatestSetInfoForGroupId(
groupId,
);
final blockHash = blockhashResult?.blockHash ?? "";
final json = await client.getSparkAnonymitySet(
coinGroupId: groupId.toString(),
startBlockHash: blockHash.toHexReversedFromBase64,
);
await _worker!.runTask(
FCTask(
func: FCFuncName._updateSparkAnonSetCoinsWith,
data: (groupId, json),
),
);
}
// ===========================================================================
static Future<Set<String>> getUsedCoinTags(int startNumber) async {
final result = await _Reader._getSparkUsedCoinTags(
startNumber,
db: _FiroCache.db,
);
return result.map((e) => e["tag"] as String).toSet();
}
/// Assuming the integrity of the data, this should be equivalent to counting
/// the number of tags in the db, and is faster than actually calling COUNT on
/// a table where no records have been deleted. No records should ever be
/// deleted from this table in practice.
static Future<int> getUsedCoinTagsLastAddedRowId() async {
final result = await _Reader._getUsedCoinTagsLastAddedRowId(
db: _FiroCache.db,
);
if (result.isEmpty) {
return 0;
}
return result.first["highestId"] as int? ?? 0;
}
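// For comparison: a hypothetical COUNT-based equivalent (not in this commit).
// MAX(id) on the AUTOINCREMENT primary key is served by an index seek, while
// COUNT(*) must walk the whole table, so the rowid shortcut above is cheaper.
//
//   static Future<int> _countUsedCoinTags({required Database db}) async {
//     final result = db.select("SELECT COUNT(*) AS n FROM SparkUsedCoinTags;");
//     return result.first["n"] as int? ?? 0;
//   }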
static Future<bool> checkTagIsUsed(
String tag,
) async {
return await _Reader._checkTagIsUsed(
tag,
db: _FiroCache.db,
);
}
static Future<ResultSet> getSetCoinsForGroupId(
int groupId, {
int? newerThanTimeStamp,
}) async {
return await _Reader._getSetCoinsForGroupId(
groupId,
db: _FiroCache.db,
newerThanTimeStamp: newerThanTimeStamp,
);
}
static Future<
({
String blockHash,
String setHash,
int timestampUTC,
})?> getLatestSetInfoForGroupId(
int groupId,
) async {
final result = await _Reader._getLatestSetInfoForGroupId(
groupId,
db: _FiroCache.db,
);
if (result.isEmpty) {
return null;
}
return (
blockHash: result.first["blockHash"] as String,
setHash: result.first["setHash"] as String,
timestampUTC: result.first["timestampUTC"] as int,
);
}
static Future<bool> checkSetInfoForGroupIdExists(
int groupId,
) async {
return await _Reader._checkSetInfoForGroupIdExists(
groupId,
db: _FiroCache.db,
);
}
}

firo_cache_reader.dart

@@ -0,0 +1,103 @@
part of 'firo_cache.dart';
/// Keep all fetch queries in this separate file
abstract class _Reader {
// ===========================================================================
// =============== Spark anonymity set queries ===============================
static Future<ResultSet> _getSetCoinsForGroupId(
int groupId, {
required Database db,
int? newerThanTimeStamp,
}) async {
String query = """
SELECT sc.serialized, sc.txHash, sc.context
FROM SparkSet AS ss
JOIN SparkSetCoins AS ssc ON ss.id = ssc.setId
JOIN SparkCoin AS sc ON ssc.coinId = sc.id
WHERE ss.groupId = $groupId
""";
if (newerThanTimeStamp != null) {
query += " AND ss.timestampUTC"
" > $newerThanTimeStamp";
}
return db.select("$query;");
}
static Future<ResultSet> _getLatestSetInfoForGroupId(
int groupId, {
required Database db,
}) async {
final query = """
SELECT ss.blockHash, ss.setHash, ss.timestampUTC
FROM SparkSet ss
WHERE ss.groupId = $groupId
ORDER BY ss.timestampUTC DESC
LIMIT 1;
""";
return db.select("$query;");
}
static Future<bool> _checkSetInfoForGroupIdExists(
int groupId, {
required Database db,
}) async {
final query = """
SELECT EXISTS (
SELECT 1
FROM SparkSet
WHERE groupId = $groupId
) AS setExists;
""";
return db.select("$query;").first["setExists"] == 1;
}
// ===========================================================================
// =============== Spark used coin tags queries ==============================
static Future<ResultSet> _getSparkUsedCoinTags(
int startNumber, {
required Database db,
}) async {
String query = """
SELECT tag
FROM SparkUsedCoinTags
""";
if (startNumber > 0) {
query += " WHERE id >= $startNumber";
}
return db.select("$query;");
}
static Future<ResultSet> _getUsedCoinTagsLastAddedRowId({
required Database db,
}) async {
const query = """
SELECT MAX(id) AS highestId
FROM SparkUsedCoinTags;
""";
return db.select("$query;");
}
static Future<bool> _checkTagIsUsed(
String tag, {
required Database db,
}) async {
final query = """
SELECT EXISTS (
SELECT 1
FROM SparkUsedCoinTags
WHERE tag = '$tag'
) AS tagExists;
""";
return db.select("$query;").first["tagExists"] == 1;
}
}

firo_cache_worker.dart

@@ -0,0 +1,120 @@
part of 'firo_cache.dart';
enum FCFuncName {
_updateSparkAnonSetCoinsWith,
_updateSparkUsedTagsWith,
}
class FCTask {
final id = const Uuid().v4();
final FCFuncName func;
final dynamic data;
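// the type of data depends on func: a List<String> of unhashed tags for
// _updateSparkUsedTagsWith, or an (int groupId, Map<String, dynamic> json)
// record for _updateSparkAnonSetCoinsWith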
FCTask({required this.func, required this.data});
}
class _FiroCacheWorker {
final SendPort _commands;
final ReceivePort _responses;
final Map<String, Completer<Object?>> _activeRequests = {};
Future<Object?> runTask(FCTask task) async {
final completer = Completer<Object?>.sync();
_activeRequests[task.id] = completer;
_commands.send(task);
return await completer.future;
}
static Future<_FiroCacheWorker> spawn() async {
final sqliteDir = await StackFileSystem.applicationSQLiteDirectory();
final dbFilePath = "${sqliteDir.path}/${_FiroCache.sqliteDbFileName}";
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
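// handshake: the first message from the spawned isolate is the SendPort the
// main isolate should use to send it commands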
initPort.handler = (dynamic initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete(
(
ReceivePort.fromRawReceivePort(initPort),
commandPort,
),
);
};
try {
await Isolate.spawn(
_startWorkerIsolate,
(initPort.sendPort, dbFilePath),
);
} catch (_) {
initPort.close();
rethrow;
}
final (receivePort, sendPort) = await connection.future;
return _FiroCacheWorker._(receivePort, sendPort);
}
_FiroCacheWorker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
void _handleResponsesFromIsolate(dynamic message) {
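// responses are (taskId, error) records; a null error means the task
// succeeded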
final (id, error) = message as (String, Object?);
final completer = _activeRequests.remove(id)!;
if (error != null) {
completer.completeError(error);
} else {
completer.complete(id);
}
}
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
Database db,
Mutex mutex,
) {
receivePort.listen((message) {
final task = message as FCTask;
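// protect with the mutex so tasks run one at a time and db writes never
// interleave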
mutex.protect(() async {
try {
final FCResult result;
switch (task.func) {
case FCFuncName._updateSparkAnonSetCoinsWith:
final data = task.data as (int, Map<String, dynamic>);
result = _updateSparkAnonSetCoinsWith(db, data.$2, data.$1);
break;
case FCFuncName._updateSparkUsedTagsWith:
result = _updateSparkUsedTagsWith(db, task.data as List<String>);
break;
}
if (result.success) {
sendPort.send((task.id, null));
} else {
sendPort.send((task.id, result.error!));
}
} catch (e) {
sendPort.send((task.id, e));
}
});
});
}
static void _startWorkerIsolate((SendPort, String) args) {
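// runs in the background isolate: hand the command port back to the main
// isolate, open the shared db file read/write, then process incoming tasks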
final receivePort = ReceivePort();
args.$1.send(receivePort.sendPort);
final mutex = Mutex();
final db = sqlite3.open(
args.$2,
mode: OpenMode.readWrite,
);
_handleCommandsToIsolate(receivePort, args.$1, db, mutex);
}
}

firo_cache_writer.dart

@@ -0,0 +1,169 @@
part of 'firo_cache.dart';
class FCResult {
final bool success;
final Object? error;
FCResult({required this.success, this.error});
}
// ===========================================================================
// ================== write to spark used tags cache =========================
/// update the sqlite used coin tags cache
///
/// returns FCResult(success: true) on success, otherwise an FCResult carrying
/// the error
FCResult _updateSparkUsedTagsWith(
Database db,
List<String> tags,
) {
// hash the tags here since this function is called in a background isolate
final hashedTags = LibSpark.hashTags(base64Tags: tags);
if (hashedTags.isEmpty) {
// nothing to add, return early
return FCResult(success: true);
}
db.execute("BEGIN;");
try {
for (final tag in hashedTags) {
db.execute(
"""
INSERT OR IGNORE INTO SparkUsedCoinTags (tag)
VALUES (?);
""",
[tag],
);
}
db.execute("COMMIT;");
return FCResult(success: true);
} catch (e) {
db.execute("ROLLBACK;");
return FCResult(success: false, error: e);
}
}
// ===========================================================================
// ================== write to spark anon set cache ==========================
/// update the sqlite cache
/// Expected json format:
/// {
/// "blockHash": "someBlockHash",
/// "setHash": "someSetHash",
/// "coins": [
/// ["serliazed1", "hash1", "context1"],
/// ["serliazed2", "hash2", "context2"],
/// ...
/// ["serliazed3", "hash3", "context3"],
/// ["serliazed4", "hash4", "context4"],
/// ],
/// }
///
/// returns FCResult(success: true) on success, otherwise an FCResult carrying
/// the error
FCResult _updateSparkAnonSetCoinsWith(
Database db,
Map<String, dynamic> json,
int groupId,
) {
final blockHash = json["blockHash"] as String;
final setHash = json["setHash"] as String;
final coinsRaw = json["coins"] as List;
if (coinsRaw.isEmpty) {
// no coins to actually insert
return FCResult(success: true);
}
final checkResult = db.select(
"""
SELECT *
FROM SparkSet
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
""",
[
blockHash,
setHash,
groupId,
],
);
if (checkResult.isNotEmpty) {
// already up to date
return FCResult(success: true);
}
final coins = coinsRaw
.map(
(e) => [
e[0] as String,
e[1] as String,
e[2] as String,
],
)
.toList();
final timestamp = DateTime.now().toUtc().millisecondsSinceEpoch ~/ 1000;
db.execute("BEGIN;");
try {
db.execute(
"""
INSERT INTO SparkSet (blockHash, setHash, groupId, timestampUTC)
VALUES (?, ?, ?, ?);
""",
[blockHash, setHash, groupId, timestamp],
);
final setId = db.lastInsertRowId;
for (final coin in coins) {
int coinId;
try {
// try to insert and get row id
db.execute(
"""
INSERT INTO SparkCoin (serialized, txHash, context)
VALUES (?, ?, ?);
""",
coin,
);
coinId = db.lastInsertRowId;
} on SqliteException catch (e) {
// if there already is a matching coin in the db
// just grab its row id
if (e.extendedResultCode == 2067) {
final result = db.select(
"""
SELECT id
FROM SparkCoin
WHERE serialized = ? AND txHash = ? AND context = ?;
""",
coin,
);
coinId = result.first["id"] as int;
} else {
rethrow;
}
}
// finally add the row id to the newly added set
db.execute(
"""
INSERT INTO SparkSetCoins (setId, coinId)
VALUES (?, ?);
""",
[setId, coinId],
);
}
db.execute("COMMIT;");
return FCResult(success: true);
} catch (e) {
db.execute("ROLLBACK;");
return FCResult(success: false, error: e);
}
}