diff --git a/cw_bitcoin/lib/electrum_wallet.dart b/cw_bitcoin/lib/electrum_wallet.dart index 1d0bcfa2d..1c43c03ac 100644 --- a/cw_bitcoin/lib/electrum_wallet.dart +++ b/cw_bitcoin/lib/electrum_wallet.dart @@ -1,8 +1,10 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'dart:isolate'; import 'package:bitcoin_base/bitcoin_base.dart'; +import 'package:cw_bitcoin/electrum_worker.dart'; import 'package:shared_preferences/shared_preferences.dart'; import 'package:blockchain_utils/blockchain_utils.dart'; import 'package:collection/collection.dart'; @@ -46,6 +48,11 @@ class ElectrumWallet = ElectrumWalletBase with _$ElectrumWallet; abstract class ElectrumWalletBase extends WalletBase with Store, WalletKeysFile { + ReceivePort? receivePort; + SendPort? workerSendPort; + StreamSubscription? _workerSubscription; + Isolate? _workerIsolate; + ElectrumWalletBase({ required String password, required WalletInfo walletInfo, @@ -97,6 +104,45 @@ abstract class ElectrumWalletBase sharedPrefs.complete(SharedPreferences.getInstance()); } + void _handleWorkerResponse(dynamic response) { + print('Main: worker response: $response'); + + final workerResponse = ElectrumWorkerResponse.fromJson( + jsonDecode(response.toString()) as Map, + ); + + if (workerResponse.error != null) { + // Handle error + print('Worker error: ${workerResponse.error}'); + return; + } + + switch (workerResponse.method) { + case 'connectionStatus': + final status = workerResponse.data as String; + final connectionStatus = ConnectionStatus.values.firstWhere( + (e) => e.toString() == status, + ); + _onConnectionStatusChange(connectionStatus); + break; + case 'fetchBalances': + final balance = ElectrumBalance.fromJSON( + jsonDecode(workerResponse.data.toString()).toString(), + ); + // Update the balance state + // this.balance[currency] = balance!; + break; + // Handle other responses... + } + } + + // Don't forget to clean up in the close method + // @override + // Future close({required bool shouldCleanup}) async { + // await _workerSubscription?.cancel(); + // await super.close(shouldCleanup: shouldCleanup); + // } + static Bip32Slip10Secp256k1 getAccountHDWallet(CryptoCurrency? currency, BasedUtxoNetwork network, List? seedBytes, String? xpub, DerivationInfo? derivationInfo) { if (seedBytes == null && xpub == null) { @@ -234,7 +280,6 @@ abstract class ElectrumWalletBase void Function(FlutterErrorDetails)? _onError; Timer? _autoSaveTimer; - StreamSubscription? _receiveStream; Timer? _updateFeeRateTimer; static const int _autoSaveInterval = 1; @@ -256,13 +301,19 @@ abstract class ElectrumWalletBase syncStatus = SynchronizingSyncStatus(); - await subscribeForHeaders(); - await subscribeForUpdates(); + // await subscribeForHeaders(); + // await subscribeForUpdates(); // await updateTransactions(); // await updateAllUnspents(); - await updateBalance(); - await updateFeeRates(); + // await updateBalance(); + // await updateFeeRates(); + workerSendPort?.send( + ElectrumWorkerMessage( + method: 'blockchain.scripthash.get_balance', + params: {'scriptHash': scriptHashes.first}, + ).toJson(), + ); _updateFeeRateTimer ??= Timer.periodic(const Duration(seconds: 5), (timer) async => await updateFeeRates()); @@ -369,28 +420,34 @@ abstract class ElectrumWalletBase @action @override Future connectToNode({required Node node}) async { - scripthashesListening = {}; - _isTransactionUpdating = false; - _chainTipListenerOn = false; this.node = node; try { syncStatus = ConnectingSyncStatus(); - await _receiveStream?.cancel(); - rpc?.disconnect(); + if (_workerIsolate != null) { + _workerIsolate!.kill(priority: Isolate.immediate); + _workerSubscription?.cancel(); + receivePort?.close(); + } - // electrumClient.onConnectionStatusChange = _onConnectionStatusChange; + receivePort = ReceivePort(); - this.electrumClient2 = ElectrumApiProvider( - await ElectrumTCPService.connect( - node.uri, - onConnectionStatusChange: _onConnectionStatusChange, - defaultRequestTimeOut: const Duration(seconds: 5), - connectionTimeOut: const Duration(seconds: 5), - ), - ); - // await electrumClient.connectToUri(node.uri, useSSL: node.useSSL); + _workerIsolate = await Isolate.spawn(ElectrumWorker.run, receivePort!.sendPort); + + _workerSubscription = receivePort!.listen((message) { + if (message is SendPort) { + workerSendPort = message; + workerSendPort!.send( + ElectrumWorkerMessage( + method: 'connect', + params: {'uri': node.uri.toString()}, + ).toJson(), + ); + } else { + _handleWorkerResponse(message); + } + }); } catch (e, stacktrace) { print(stacktrace); print("connectToNode $e"); @@ -1146,7 +1203,6 @@ abstract class ElectrumWalletBase @override Future close({required bool shouldCleanup}) async { try { - await _receiveStream?.cancel(); await electrumClient.close(); } catch (_) {} _autoSaveTimer?.cancel(); diff --git a/cw_bitcoin/lib/electrum_worker.dart b/cw_bitcoin/lib/electrum_worker.dart new file mode 100644 index 000000000..553c4df6a --- /dev/null +++ b/cw_bitcoin/lib/electrum_worker.dart @@ -0,0 +1,171 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:isolate'; + +import 'package:bitcoin_base/bitcoin_base.dart'; +import 'package:cw_bitcoin/electrum_balance.dart'; + +class ElectrumWorkerMessage { + final String method; + final Map params; + + ElectrumWorkerMessage({ + required this.method, + required this.params, + }); + + Map toJson() => { + 'method': method, + 'params': params, + }; + + factory ElectrumWorkerMessage.fromJson(Map json) { + return ElectrumWorkerMessage( + method: json['method'] as String, + params: json['params'] as Map, + ); + } +} + +class ElectrumWorkerResponse { + final String method; + final dynamic data; + final String? error; + + ElectrumWorkerResponse({ + required this.method, + required this.data, + this.error, + }); + + Map toJson() => { + 'method': method, + 'data': data, + 'error': error, + }; + + factory ElectrumWorkerResponse.fromJson(Map json) { + return ElectrumWorkerResponse( + method: json['method'] as String, + data: json['data'], + error: json['error'] as String?, + ); + } +} + +class ElectrumWorker { + final SendPort sendPort; + ElectrumApiProvider? _electrumClient; + + ElectrumWorker._(this.sendPort, {ElectrumApiProvider? electrumClient}) + : _electrumClient = electrumClient; + + static void run(SendPort sendPort) { + final worker = ElectrumWorker._(sendPort); + final receivePort = ReceivePort(); + + sendPort.send(receivePort.sendPort); + + receivePort.listen(worker.handleMessage); + } + + Future _handleConnect({ + required Uri uri, + }) async { + _electrumClient = ElectrumApiProvider( + await ElectrumTCPService.connect( + uri, + onConnectionStatusChange: (status) { + _sendResponse('connectionStatus', status.toString()); + }, + defaultRequestTimeOut: const Duration(seconds: 5), + connectionTimeOut: const Duration(seconds: 5), + ), + ); + } + + void handleMessage(dynamic message) async { + try { + final workerMessage = ElectrumWorkerMessage.fromJson(message as Map); + + switch (workerMessage.method) { + case 'connect': + final uri = Uri.parse(workerMessage.params['uri'] as String); + await _handleConnect(uri: uri); + break; + case 'blockchain.scripthash.get_balance': + await _handleGetBalance(workerMessage); + break; + case 'blockchain.scripthash.get_history': + // await _handleGetHistory(workerMessage); + break; + case 'blockchain.scripthash.listunspent': + // await _handleListUnspent(workerMessage); + break; + // Add other method handlers here + default: + _sendError(workerMessage.method, 'Unsupported method: ${workerMessage.method}'); + } + } catch (e, s) { + print(s); + _sendError('unknown', e.toString()); + } + } + + void _sendResponse(String method, dynamic data) { + final response = ElectrumWorkerResponse( + method: method, + data: data, + ); + sendPort.send(jsonEncode(response.toJson())); + } + + void _sendError(String method, String error) { + final response = ElectrumWorkerResponse( + method: method, + data: null, + error: error, + ); + sendPort.send(jsonEncode(response.toJson())); + } + + Future _handleGetBalance(ElectrumWorkerMessage message) async { + try { + final scriptHash = message.params['scriptHash'] as String; + final result = await _electrumClient!.request( + ElectrumGetScriptHashBalance(scriptHash: scriptHash), + ); + + final balance = ElectrumBalance( + confirmed: result['confirmed'] as int? ?? 0, + unconfirmed: result['unconfirmed'] as int? ?? 0, + frozen: 0, + ); + + _sendResponse(message.method, balance.toJSON()); + } catch (e, s) { + print(s); + _sendError(message.method, e.toString()); + } + } + + // Future _handleGetHistory(ElectrumWorkerMessage message) async { + // try { + // final scriptHash = message.params['scriptHash'] as String; + // final result = await electrumClient.getHistory(scriptHash); + // _sendResponse(message.method, jsonEncode(result)); + // } catch (e) { + // _sendError(message.method, e.toString()); + // } + // } + + // Future _handleListUnspent(ElectrumWorkerMessage message) async { + // try { + // final scriptHash = message.params['scriptHash'] as String; + // final result = await electrumClient.listUnspent(scriptHash); + // _sendResponse(message.method, jsonEncode(result)); + // } catch (e) { + // _sendError(message.method, e.toString()); + // } + // } +}