// /* // * This file is part of Stack Wallet. // * // * Copyright (c) 2023 Cypher Stack // * All Rights Reserved. // * The code is distributed under GPLv3 license, see LICENSE file for details. // * Generated by Cypher Stack on 2023-05-26 // * // */ // // import 'dart:async'; // import 'dart:convert'; // import 'dart:io'; // // import 'package:event_bus/event_bus.dart'; // import 'package:mutex/mutex.dart'; // import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; // import 'package:stackwallet/exceptions/json_rpc/json_rpc_exception.dart'; // import 'package:stackwallet/services/event_bus/events/global/tor_connection_status_changed_event.dart'; // import 'package:stackwallet/services/event_bus/events/global/tor_status_changed_event.dart'; // import 'package:stackwallet/services/event_bus/global_event_bus.dart'; // import 'package:stackwallet/services/tor_service.dart'; // import 'package:stackwallet/utilities/logger.dart'; // import 'package:stackwallet/utilities/prefs.dart'; // import 'package:tor_ffi_plugin/socks_socket.dart'; // // class ElectrumXSubscription { // final StreamController _controller = // StreamController(); // TODO controller params // // Stream get responseStream => _controller.stream; // // void addToStream(dynamic data) => _controller.add(data); // } // // class SocketTask { // SocketTask({this.completer, this.subscription}); // // final Completer? completer; // final ElectrumXSubscription? subscription; // // bool get isSubscription => subscription != null; // } // // class SubscribableElectrumXClient { // int _currentRequestID = 0; // bool _isConnected = false; // List _responseData = []; // final Map _tasks = {}; // Timer? _aliveTimer; // Socket? _socket; // SOCKSSocket? _socksSocket; // late final bool _useSSL; // late final Duration _connectionTimeout; // late final Duration _keepAlive; // // bool get isConnected => _isConnected; // bool get useSSL => _useSSL; // // Used to reconnect. // String? _host; // int? _port; // // void Function(bool)? onConnectionStatusChanged; // // late Prefs _prefs; // late TorService _torService; // StreamSubscription? _torPreferenceListener; // StreamSubscription? _torStatusListener; // final Mutex _torConnectingLock = Mutex(); // bool _requireMutex = false; // // List? failovers; // int currentFailoverIndex = -1; // // SubscribableElectrumXClient({ // required bool useSSL, // required Prefs prefs, // required List failovers, // TorService? torService, // this.onConnectionStatusChanged, // Duration connectionTimeout = const Duration(seconds: 5), // Duration keepAlive = const Duration(seconds: 10), // EventBus? globalEventBusForTesting, // }) { // _useSSL = useSSL; // _prefs = prefs; // _torService = torService ?? TorService.sharedInstance; // _connectionTimeout = connectionTimeout; // _keepAlive = keepAlive; // // // If we're testing, use the global event bus for testing. // final bus = globalEventBusForTesting ?? GlobalEventBus.instance; // // // Listen to global event bus for Tor status changes. // _torStatusListener = bus.on().listen( // (event) async { // try { // switch (event.newStatus) { // case TorConnectionStatus.connecting: // // If Tor is connecting, we need to wait. // await _torConnectingLock.acquire(); // _requireMutex = true; // break; // // case TorConnectionStatus.connected: // case TorConnectionStatus.disconnected: // // If Tor is connected or disconnected, we can release the lock. // if (_torConnectingLock.isLocked) { // _torConnectingLock.release(); // } // _requireMutex = false; // break; // } // } finally { // // Ensure the lock is released. // if (_torConnectingLock.isLocked) { // _torConnectingLock.release(); // } // } // }, // ); // // // Listen to global event bus for Tor preference changes. // _torPreferenceListener = bus.on().listen( // (event) async { // // Close open socket (if open). // final tempSocket = _socket; // _socket = null; // await tempSocket?.close(); // // // Close open SOCKS socket (if open). // final tempSOCKSSocket = _socksSocket; // _socksSocket = null; // await tempSOCKSSocket?.close(); // // // Clear subscriptions. // _tasks.clear(); // // // Cancel alive timer // _aliveTimer?.cancel(); // }, // ); // } // // factory SubscribableElectrumXClient.from({ // required ElectrumXNode node, // required Prefs prefs, // required List failovers, // TorService? torService, // }) { // return SubscribableElectrumXClient( // useSSL: node.useSSL, // prefs: prefs, // failovers: failovers, // torService: torService ?? TorService.sharedInstance, // ); // } // // // Example for returning a future which completes upon connection. // // static Future from({ // // required ElectrumXNode node, // // TorService? torService, // // }) async { // // final client = SubscribableElectrumXClient( // // useSSL: node.useSSL, // // ); // // // // await client.connect(host: node.address, port: node.port); // // // // return client; // // } // // /// Check if the RPC client is connected and connect if needed. // /// // /// If Tor is enabled but not running, it will attempt to start Tor. // Future _checkSocket({bool connecting = false}) async { // if (_prefs.useTor) { // // If we're supposed to use Tor... // if (_torService.status != TorConnectionStatus.connected) { // // ... but Tor isn't running... // if (!_prefs.torKillSwitch) { // // ... and the killswitch isn't set, then we'll just return below. // Logging.instance.log( // "Tor preference set but Tor is not enabled, killswitch not set, connecting to ElectrumX through clearnet.", // level: LogLevel.Warning, // ); // } else { // // ... but if the killswitch is set, then let's try to start Tor. // await _torService.start(); // // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. // // // Double-check that Tor is running. // if (_torService.status != TorConnectionStatus.connected) { // // If Tor still isn't running, then we'll throw an exception. // throw Exception("SubscribableElectrumXClient._checkRpcClient: " // "Tor preference and killswitch set but Tor not enabled and could not start, not connecting to ElectrumX."); // } // } // } // } // // // Connect if needed. // if (!connecting) { // if ((!_prefs.useTor && _socket == null) || // (_prefs.useTor && _socksSocket == null)) { // if (currentFailoverIndex == -1) { // // Check if we have cached node information // if (_host == null && _port == null) { // throw Exception("SubscribableElectrumXClient._checkRpcClient: " // "No host or port provided and no cached node information."); // } // // // Connect to the server. // await connect(host: _host!, port: _port!); // } else { // // Attempt to connect to the next failover server. // await connect( // host: failovers![currentFailoverIndex].address, // port: failovers![currentFailoverIndex].port, // ); // } // } // } // } // // /// Connect to the server. // /// // /// If Tor is enabled, it will attempt to connect through Tor. // Future connect({ // required String host, // required int port, // }) async { // try { // // Cache node information. // _host = host; // _port = port; // // // If we're already connected, disconnect first. // try { // await _socket?.close(); // } catch (_) {} // // // If we're connecting to Tor, wait. // if (_requireMutex) { // await _torConnectingLock // .protect(() async => await _checkSocket(connecting: true)); // } else { // await _checkSocket(connecting: true); // } // // if (!Prefs.instance.useTor) { // // If we're not supposed to use Tor, then connect directly. // await connectClearnet(host, port); // } else { // // If we're supposed to use Tor... // if (_torService.status != TorConnectionStatus.connected) { // // ... but Tor isn't running... // if (!_prefs.torKillSwitch) { // // ... and the killswitch isn't set, then we'll connect clearnet. // Logging.instance.log( // "Tor preference set but Tor not enabled, no killswitch set, connecting to ElectrumX through clearnet", // level: LogLevel.Warning, // ); // await connectClearnet(host, port); // } else { // // ... but if the killswitch is set, then let's try to start Tor. // await _torService.start(); // // TODO [prio=low]: Attempt to restart Tor if needed. Update Tor package for restart feature. // // // Doublecheck that Tor is running. // if (_torService.status != TorConnectionStatus.connected) { // // If Tor still isn't running, then we'll throw an exception. // throw Exception( // "Tor preference and killswitch set but Tor not enabled, not connecting to ElectrumX"); // } // // // Connect via Tor. // await connectTor(host, port); // } // } else { // // Connect via Tor. // await connectTor(host, port); // } // } // // _updateConnectionStatus(true); // // if (_prefs.useTor) { // if (_socksSocket == null) { // final String msg = "SubscribableElectrumXClient.connect(): " // "cannot listen to $host:$port via SOCKSSocket because it is not connected."; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw Exception(msg); // } // // _socksSocket!.listen( // _dataHandler, // onError: _errorHandler, // onDone: _doneHandler, // cancelOnError: true, // ); // } else { // if (_socket == null) { // final String msg = "SubscribableElectrumXClient.connect(): " // "cannot listen to $host:$port via socket because it is not connected."; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw Exception(msg); // } // // _socket!.listen( // _dataHandler, // onError: _errorHandler, // onDone: _doneHandler, // cancelOnError: true, // ); // } // // _aliveTimer?.cancel(); // _aliveTimer = Timer.periodic( // _keepAlive, // (_) async => _updateConnectionStatus(await ping()), // ); // } catch (e, s) { // final msg = "SubscribableElectrumXClient.connect: " // "failed to connect to $host:$port." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // // // Ensure cleanup is performed on failure to avoid resource leaks. // await disconnect(); // Use the disconnect method to clean up. // rethrow; // Rethrow the exception to handle it further up the call stack. // } // } // // /// Connect to the server directly. // Future connectClearnet(String host, int port) async { // try { // Logging.instance.log( // "SubscribableElectrumXClient.connectClearnet(): " // "creating a socket to $host:$port (SSL $useSSL)...", // level: LogLevel.Info); // // if (_useSSL) { // _socket = await SecureSocket.connect( // host, // port, // timeout: _connectionTimeout, // onBadCertificate: (_) => // true, // TODO do not automatically trust bad certificates. // ); // } else { // _socket = await Socket.connect( // host, // port, // timeout: _connectionTimeout, // ); // } // // Logging.instance.log( // "SubscribableElectrumXClient.connectClearnet(): " // "created socket to $host:$port...", // level: LogLevel.Info); // } catch (e, s) { // final String msg = "SubscribableElectrumXClient.connectClearnet: " // "failed to connect to $host (SSL: $useSSL)." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw JsonRpcException(msg); // } // // return; // } // // /// Connect to the server using the Tor service. // Future connectTor(String host, int port) async { // // Get the proxy info from the TorService. // final proxyInfo = _torService.getProxyInfo(); // // try { // Logging.instance.log( // "SubscribableElectrumXClient.connectTor(): " // "creating a SOCKS socket at $proxyInfo (SSL $useSSL)...", // level: LogLevel.Info); // // // Create a socks socket using the Tor service's proxy info. // _socksSocket = await SOCKSSocket.create( // proxyHost: proxyInfo.host.address, // proxyPort: proxyInfo.port, // sslEnabled: useSSL, // ); // // Logging.instance.log( // "SubscribableElectrumXClient.connectTor(): " // "created SOCKS socket at $proxyInfo...", // level: LogLevel.Info); // } catch (e, s) { // final String msg = "SubscribableElectrumXClient.connectTor(): " // "failed to create a SOCKS socket at $proxyInfo (SSL $useSSL)..." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw JsonRpcException(msg); // } // // try { // Logging.instance.log( // "SubscribableElectrumXClient.connectTor(): " // "connecting to SOCKS socket at $proxyInfo (SSL $useSSL)...", // level: LogLevel.Info); // // await _socksSocket?.connect(); // // Logging.instance.log( // "SubscribableElectrumXClient.connectTor(): " // "connected to SOCKS socket at $proxyInfo...", // level: LogLevel.Info); // } catch (e, s) { // final String msg = "SubscribableElectrumXClient.connectTor(): " // "failed to connect to SOCKS socket at $proxyInfo.." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw JsonRpcException(msg); // } // // try { // Logging.instance.log( // "SubscribableElectrumXClient.connectTor(): " // "connecting to $host:$port over SOCKS socket at $proxyInfo...", // level: LogLevel.Info); // // await _socksSocket?.connectTo(host, port); // // Logging.instance.log( // "SubscribableElectrumXClient.connectTor(): " // "connected to $host:$port over SOCKS socket at $proxyInfo", // level: LogLevel.Info); // } catch (e, s) { // final String msg = "SubscribableElectrumXClient.connectTor(): " // "failed to connect $host over tor proxy at $proxyInfo." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw JsonRpcException(msg); // } // // return; // } // // /// Disconnect from the server. // Future disconnect() async { // _aliveTimer?.cancel(); // _aliveTimer = null; // // try { // await _socket?.close(); // } catch (e, s) { // Logging.instance.log( // "SubscribableElectrumXClient.disconnect: failed to close socket." // "\nError: $e\nStack trace: $s", // level: LogLevel.Warning); // } // _socket = null; // // try { // await _socksSocket?.close(); // } catch (e, s) { // Logging.instance.log( // "SubscribableElectrumXClient.disconnect: failed to close SOCKS socket." // "\nError: $e\nStack trace: $s", // level: LogLevel.Warning); // } // _socksSocket = null; // // onConnectionStatusChanged = null; // } // // /// Format JSON request string. // String _buildJsonRequestString({ // required String method, // required String id, // required List params, // }) { // final paramString = jsonEncode(params); // return '{"jsonrpc": "2.0", "id": "$id","method": "$method","params": $paramString}\r\n'; // } // // /// Update the connection status and call the onConnectionStatusChanged callback if it exists. // void _updateConnectionStatus(bool connectionStatus) { // if (_isConnected != connectionStatus && onConnectionStatusChanged != null) { // onConnectionStatusChanged!(connectionStatus); // } // _isConnected = connectionStatus; // } // // /// Called when the socket has data. // void _dataHandler(List data) { // _responseData.addAll(data); // // // 0x0A is newline // // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html // if (data.last == 0x0A) { // try { // final response = jsonDecode(String.fromCharCodes(_responseData)) // as Map; // _responseHandler(response); // } catch (e, s) { // Logging.instance // .log("JsonRPC jsonDecode: $e\n$s", level: LogLevel.Error); // rethrow; // } finally { // _responseData = []; // } // } // } // // /// Called when the socket has a response. // void _responseHandler(Map response) { // // subscriptions will have a method in the response // if (response['method'] is String) { // _subscriptionHandler(response: response); // return; // } // // final id = response['id'] as String; // final result = response['result']; // // _complete(id, result); // } // // /// Called when the subscription has a response. // void _subscriptionHandler({ // required Map response, // }) { // final method = response['method']; // switch (method) { // case "blockchain.scripthash.subscribe": // final params = response["params"] as List; // final scripthash = params.first as String; // final taskId = "blockchain.scripthash.subscribe:$scripthash"; // // _tasks[taskId]?.subscription?.addToStream(params.last); // break; // case "blockchain.headers.subscribe": // final params = response["params"]; // const taskId = "blockchain.headers.subscribe"; // // _tasks[taskId]?.subscription?.addToStream(params.first); // break; // default: // break; // } // } // // /// Called when the socket has an error. // void _errorHandler(Object error, StackTrace trace) { // _updateConnectionStatus(false); // Logging.instance.log( // "SubscribableElectrumXClient called _errorHandler with: $error\n$trace", // level: LogLevel.Info); // } // // /// Called when the socket is closed. // void _doneHandler() { // _updateConnectionStatus(false); // Logging.instance.log("SubscribableElectrumXClient called _doneHandler", // level: LogLevel.Info); // } // // /// Complete a task with the given id and data. // void _complete(String id, dynamic data) { // if (_tasks[id] == null) { // return; // } // // if (!(_tasks[id]?.completer?.isCompleted ?? false)) { // _tasks[id]?.completer?.complete(data); // } // // if (!(_tasks[id]?.isSubscription ?? false)) { // _tasks.remove(id); // } else { // _tasks[id]?.subscription?.addToStream(data); // } // } // // /// Add a task to the task list. // void _addTask({ // required String id, // required Completer completer, // }) { // _tasks[id] = SocketTask(completer: completer, subscription: null); // } // // /// Add a subscription task to the task list. // void _addSubscriptionTask({ // required String id, // required ElectrumXSubscription subscription, // }) { // _tasks[id] = SocketTask(completer: null, subscription: subscription); // } // // /// Write call to socket. // Future _call({ // required String method, // List params = const [], // }) async { // // If we're connecting to Tor, wait. // if (_requireMutex) { // await _torConnectingLock.protect(() async => await _checkSocket()); // } else { // await _checkSocket(); // } // // // Check socket is connected. // if (_prefs.useTor) { // if (_socksSocket == null) { // final msg = "SubscribableElectrumXClient._call: " // "SOCKSSocket is not connected. Method $method, params $params."; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw Exception(msg); // } // } else { // if (_socket == null) { // final msg = "SubscribableElectrumXClient._call: " // "Socket is not connected. Method $method, params $params."; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw Exception(msg); // } // } // // final completer = Completer(); // _currentRequestID++; // final id = _currentRequestID.toString(); // // // Write to the socket. // try { // _addTask(id: id, completer: completer); // // if (_prefs.useTor) { // _socksSocket?.write( // _buildJsonRequestString( // method: method, // id: id, // params: params, // ), // ); // } else { // _socket?.write( // _buildJsonRequestString( // method: method, // id: id, // params: params, // ), // ); // } // // return completer.future; // } catch (e, s) { // final String msg = "SubscribableElectrumXClient._call: " // "failed to request $method with id $id." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw JsonRpcException(msg); // } // } // // /// Write call to socket with timeout. // Future _callWithTimeout({ // required String method, // List params = const [], // Duration timeout = const Duration(seconds: 2), // }) async { // // If we're connecting to Tor, wait. // if (_requireMutex) { // await _torConnectingLock.protect(() async => await _checkSocket()); // } else { // await _checkSocket(); // } // // // Check socket is connected. // if (_prefs.useTor) { // if (_socksSocket == null) { // try { // if (_host == null || _port == null) { // throw Exception("No host or port provided"); // } // // // Attempt to conect. // await connect( // host: _host!, // port: _port!, // ); // } catch (e, s) { // final msg = "SubscribableElectrumXClient._callWithTimeout: " // "SOCKSSocket not connected and cannot connect. " // "Method $method, params $params." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw Exception(msg); // } // } // } else { // if (_socket == null) { // try { // if (_host == null || _port == null) { // throw Exception("No host or port provided"); // } // // // Attempt to conect. // await connect( // host: _host!, // port: _port!, // ); // } catch (e, s) { // final msg = "SubscribableElectrumXClient._callWithTimeout: " // "Socket not connected and cannot connect. " // "Method $method, params $params."; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw Exception(msg); // } // } // } // // final completer = Completer(); // _currentRequestID++; // final id = _currentRequestID.toString(); // // // Write to the socket. // try { // _addTask(id: id, completer: completer); // // if (_prefs.useTor) { // _socksSocket?.write( // _buildJsonRequestString( // method: method, // id: id, // params: params, // ), // ); // } else { // _socket?.write( // _buildJsonRequestString( // method: method, // id: id, // params: params, // ), // ); // } // // Timer(timeout, () { // if (!completer.isCompleted) { // completer.completeError( // Exception("Request \"id: $id, method: $method\" timed out!"), // ); // } // }); // // return completer.future; // } catch (e, s) { // final String msg = "SubscribableElectrumXClient._callWithTimeout: " // "failed to request $method with id $id (timeout $timeout)." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw JsonRpcException(msg); // } // } // // ElectrumXSubscription _subscribe({ // required String id, // required String method, // List params = const [], // }) { // try { // final subscription = ElectrumXSubscription(); // _addSubscriptionTask(id: id, subscription: subscription); // _currentRequestID++; // // // Check socket is connected. // if (_prefs.useTor) { // if (_socksSocket == null) { // final msg = "SubscribableElectrumXClient._call: " // "SOCKSSocket is not connected. Method $method, params $params."; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw Exception(msg); // } // } else { // if (_socket == null) { // final msg = "SubscribableElectrumXClient._call: " // "Socket is not connected. Method $method, params $params."; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw Exception(msg); // } // } // // // Write to the socket. // if (_prefs.useTor) { // _socksSocket?.write( // _buildJsonRequestString( // method: method, // id: id, // params: params, // ), // ); // } else { // _socket?.write( // _buildJsonRequestString( // method: method, // id: id, // params: params, // ), // ); // } // // return subscription; // } catch (e, s) { // final String msg = "SubscribableElectrumXClient._subscribe: " // "failed to subscribe to $method with id $id." // "\nError: $e\nStack trace: $s"; // Logging.instance.log(msg, level: LogLevel.Fatal); // throw JsonRpcException(msg); // } // } // // /// Ping the server to ensure it is responding // /// // /// Returns true if ping succeeded // Future ping() async { // // If we're connecting to Tor, wait. // if (_requireMutex) { // await _torConnectingLock.protect(() async => await _checkSocket()); // } else { // await _checkSocket(); // } // // // Write to the socket. // try { // final response = (await _callWithTimeout(method: "server.ping")) as Map; // return response.keys.contains("result") && response["result"] == null; // } catch (_) { // return false; // } // } // // /// Subscribe to a scripthash to receive notifications on status changes // ElectrumXSubscription subscribeToScripthash({required String scripthash}) { // return _subscribe( // id: 'blockchain.scripthash.subscribe:$scripthash', // method: 'blockchain.scripthash.subscribe', // params: [scripthash], // ); // } // // /// Subscribe to block headers to receive notifications on new blocks found // /// // /// Returns the existing subscription if found // ElectrumXSubscription subscribeToBlockHeaders() { // return _tasks["blockchain.headers.subscribe"]?.subscription ?? // _subscribe( // id: "blockchain.headers.subscribe", // method: "blockchain.headers.subscribe", // params: [], // ); // } // }