feat: init electrum worker

This commit is contained in:
Rafael Saes 2024-10-30 19:16:00 -03:00
parent 433686bce3
commit f3a0ff7001
2 changed files with 248 additions and 21 deletions

View file

@ -1,8 +1,10 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'package:bitcoin_base/bitcoin_base.dart';
import 'package:cw_bitcoin/electrum_worker.dart';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:blockchain_utils/blockchain_utils.dart';
import 'package:collection/collection.dart';
@ -46,6 +48,11 @@ class ElectrumWallet = ElectrumWalletBase with _$ElectrumWallet;
abstract class ElectrumWalletBase
extends WalletBase<ElectrumBalance, ElectrumTransactionHistory, ElectrumTransactionInfo>
with Store, WalletKeysFile {
ReceivePort? receivePort;
SendPort? workerSendPort;
StreamSubscription? _workerSubscription;
Isolate? _workerIsolate;
ElectrumWalletBase({
required String password,
required WalletInfo walletInfo,
@ -97,6 +104,45 @@ abstract class ElectrumWalletBase
sharedPrefs.complete(SharedPreferences.getInstance());
}
void _handleWorkerResponse(dynamic response) {
print('Main: worker response: $response');
final workerResponse = ElectrumWorkerResponse.fromJson(
jsonDecode(response.toString()) as Map<String, dynamic>,
);
if (workerResponse.error != null) {
// Handle error
print('Worker error: ${workerResponse.error}');
return;
}
switch (workerResponse.method) {
case 'connectionStatus':
final status = workerResponse.data as String;
final connectionStatus = ConnectionStatus.values.firstWhere(
(e) => e.toString() == status,
);
_onConnectionStatusChange(connectionStatus);
break;
case 'fetchBalances':
final balance = ElectrumBalance.fromJSON(
jsonDecode(workerResponse.data.toString()).toString(),
);
// Update the balance state
// this.balance[currency] = balance!;
break;
// Handle other responses...
}
}
// Don't forget to clean up in the close method
// @override
// Future<void> close({required bool shouldCleanup}) async {
// await _workerSubscription?.cancel();
// await super.close(shouldCleanup: shouldCleanup);
// }
static Bip32Slip10Secp256k1 getAccountHDWallet(CryptoCurrency? currency, BasedUtxoNetwork network,
List<int>? seedBytes, String? xpub, DerivationInfo? derivationInfo) {
if (seedBytes == null && xpub == null) {
@ -234,7 +280,6 @@ abstract class ElectrumWalletBase
void Function(FlutterErrorDetails)? _onError;
Timer? _autoSaveTimer;
StreamSubscription<dynamic>? _receiveStream;
Timer? _updateFeeRateTimer;
static const int _autoSaveInterval = 1;
@ -256,13 +301,19 @@ abstract class ElectrumWalletBase
syncStatus = SynchronizingSyncStatus();
await subscribeForHeaders();
await subscribeForUpdates();
// await subscribeForHeaders();
// await subscribeForUpdates();
// await updateTransactions();
// await updateAllUnspents();
await updateBalance();
await updateFeeRates();
// await updateBalance();
// await updateFeeRates();
workerSendPort?.send(
ElectrumWorkerMessage(
method: 'blockchain.scripthash.get_balance',
params: {'scriptHash': scriptHashes.first},
).toJson(),
);
_updateFeeRateTimer ??=
Timer.periodic(const Duration(seconds: 5), (timer) async => await updateFeeRates());
@ -369,28 +420,34 @@ abstract class ElectrumWalletBase
@action
@override
Future<void> connectToNode({required Node node}) async {
scripthashesListening = {};
_isTransactionUpdating = false;
_chainTipListenerOn = false;
this.node = node;
try {
syncStatus = ConnectingSyncStatus();
await _receiveStream?.cancel();
rpc?.disconnect();
if (_workerIsolate != null) {
_workerIsolate!.kill(priority: Isolate.immediate);
_workerSubscription?.cancel();
receivePort?.close();
}
// electrumClient.onConnectionStatusChange = _onConnectionStatusChange;
receivePort = ReceivePort();
this.electrumClient2 = ElectrumApiProvider(
await ElectrumTCPService.connect(
node.uri,
onConnectionStatusChange: _onConnectionStatusChange,
defaultRequestTimeOut: const Duration(seconds: 5),
connectionTimeOut: const Duration(seconds: 5),
),
);
// await electrumClient.connectToUri(node.uri, useSSL: node.useSSL);
_workerIsolate = await Isolate.spawn<SendPort>(ElectrumWorker.run, receivePort!.sendPort);
_workerSubscription = receivePort!.listen((message) {
if (message is SendPort) {
workerSendPort = message;
workerSendPort!.send(
ElectrumWorkerMessage(
method: 'connect',
params: {'uri': node.uri.toString()},
).toJson(),
);
} else {
_handleWorkerResponse(message);
}
});
} catch (e, stacktrace) {
print(stacktrace);
print("connectToNode $e");
@ -1146,7 +1203,6 @@ abstract class ElectrumWalletBase
@override
Future<void> close({required bool shouldCleanup}) async {
try {
await _receiveStream?.cancel();
await electrumClient.close();
} catch (_) {}
_autoSaveTimer?.cancel();

View file

@ -0,0 +1,171 @@
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
import 'package:bitcoin_base/bitcoin_base.dart';
import 'package:cw_bitcoin/electrum_balance.dart';
class ElectrumWorkerMessage {
final String method;
final Map<String, dynamic> params;
ElectrumWorkerMessage({
required this.method,
required this.params,
});
Map<String, dynamic> toJson() => {
'method': method,
'params': params,
};
factory ElectrumWorkerMessage.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerMessage(
method: json['method'] as String,
params: json['params'] as Map<String, dynamic>,
);
}
}
class ElectrumWorkerResponse {
final String method;
final dynamic data;
final String? error;
ElectrumWorkerResponse({
required this.method,
required this.data,
this.error,
});
Map<String, dynamic> toJson() => {
'method': method,
'data': data,
'error': error,
};
factory ElectrumWorkerResponse.fromJson(Map<String, dynamic> json) {
return ElectrumWorkerResponse(
method: json['method'] as String,
data: json['data'],
error: json['error'] as String?,
);
}
}
class ElectrumWorker {
final SendPort sendPort;
ElectrumApiProvider? _electrumClient;
ElectrumWorker._(this.sendPort, {ElectrumApiProvider? electrumClient})
: _electrumClient = electrumClient;
static void run(SendPort sendPort) {
final worker = ElectrumWorker._(sendPort);
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
receivePort.listen(worker.handleMessage);
}
Future<void> _handleConnect({
required Uri uri,
}) async {
_electrumClient = ElectrumApiProvider(
await ElectrumTCPService.connect(
uri,
onConnectionStatusChange: (status) {
_sendResponse('connectionStatus', status.toString());
},
defaultRequestTimeOut: const Duration(seconds: 5),
connectionTimeOut: const Duration(seconds: 5),
),
);
}
void handleMessage(dynamic message) async {
try {
final workerMessage = ElectrumWorkerMessage.fromJson(message as Map<String, dynamic>);
switch (workerMessage.method) {
case 'connect':
final uri = Uri.parse(workerMessage.params['uri'] as String);
await _handleConnect(uri: uri);
break;
case 'blockchain.scripthash.get_balance':
await _handleGetBalance(workerMessage);
break;
case 'blockchain.scripthash.get_history':
// await _handleGetHistory(workerMessage);
break;
case 'blockchain.scripthash.listunspent':
// await _handleListUnspent(workerMessage);
break;
// Add other method handlers here
default:
_sendError(workerMessage.method, 'Unsupported method: ${workerMessage.method}');
}
} catch (e, s) {
print(s);
_sendError('unknown', e.toString());
}
}
void _sendResponse(String method, dynamic data) {
final response = ElectrumWorkerResponse(
method: method,
data: data,
);
sendPort.send(jsonEncode(response.toJson()));
}
void _sendError(String method, String error) {
final response = ElectrumWorkerResponse(
method: method,
data: null,
error: error,
);
sendPort.send(jsonEncode(response.toJson()));
}
Future<void> _handleGetBalance(ElectrumWorkerMessage message) async {
try {
final scriptHash = message.params['scriptHash'] as String;
final result = await _electrumClient!.request(
ElectrumGetScriptHashBalance(scriptHash: scriptHash),
);
final balance = ElectrumBalance(
confirmed: result['confirmed'] as int? ?? 0,
unconfirmed: result['unconfirmed'] as int? ?? 0,
frozen: 0,
);
_sendResponse(message.method, balance.toJSON());
} catch (e, s) {
print(s);
_sendError(message.method, e.toString());
}
}
// Future<void> _handleGetHistory(ElectrumWorkerMessage message) async {
// try {
// final scriptHash = message.params['scriptHash'] as String;
// final result = await electrumClient.getHistory(scriptHash);
// _sendResponse(message.method, jsonEncode(result));
// } catch (e) {
// _sendError(message.method, e.toString());
// }
// }
// Future<void> _handleListUnspent(ElectrumWorkerMessage message) async {
// try {
// final scriptHash = message.params['scriptHash'] as String;
// final result = await electrumClient.listUnspent(scriptHash);
// _sendResponse(message.method, jsonEncode(result));
// } catch (e) {
// _sendError(message.method, e.toString());
// }
// }
}