2023-05-24 23:35:45 +00:00
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:mutex/mutex.dart';
import 'package:stackwallet/utilities/logger.dart';
// hacky fix to receive large jsonrpc responses
/// Newline-delimited JSON-RPC client over a plain or TLS socket.
///
/// Callers submit requests through [request]; each one is wrapped in a
/// [_JsonRPCRequest], placed on an internal queue, and resolved through its
/// completer's future.
///
/// NOTE(review): this listing appears to have lost lines (closing braces, the
/// constructor head, the heads of the logging calls, some statements) and has
/// timestamp lines interleaved. The comments below describe only what is
/// visible; confirm against the intact file before relying on them.
class JsonRPC {
// Constructor parameters: host and port are required, useSSL defaults to
// false and connectionTimeout to 60 seconds.
required this.host,
required this.port,
this.useSSL = false,
this.connectionTimeout = const Duration(seconds: 60),
// Whether connect() uses SecureSocket rather than Socket.
final bool useSSL;
final String host;
final int port;
// Passed as the `timeout` argument of the socket connect call.
final Duration connectionTimeout;
// Pending requests; the head (`next`) is the one handlers act on.
final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();
// Null until connect() has assigned a socket.
Socket? _socket;
// Subscription created by `_socket!.listen(...)` in connect().
StreamSubscription<Uint8List>? _subscription;
// Handler for a chunk of incoming bytes: it checks for an empty queue, then
// looks at the head request and tests whether it has completed.
// NOTE(review): the statements inside both `if` bodies, and any call that
// hands `data` to the request, are not visible in this listing.
void _dataHandler(List<int> data) {
if (_requestQueue.isEmpty) {
// probably just return although this case should never actually hit
// TODO anything else here?
final req = _requestQueue.next;
if (req.isComplete) {
// Registered as `onError` in connect(): fails the head request's completer
// with the socket error and stack trace.
// NOTE(review): no emptiness check is visible before `_requestQueue.next`,
// which reads `_rq.first` and throws on an empty list.
void _errorHandler(Object error, StackTrace trace) {
"JsonRPC errorHandler: $error\n$trace",
level: LogLevel.Error,
final req = _requestQueue.next;
req.completer.completeError(error, trace);
// Registered as `onDone` in connect(): reports that the connection closed.
// NOTE(review): the message says the socket is being destroyed and that the
// next queued request is being sent, but neither statement is visible here.
void _doneHandler() {
"JsonRPC doneHandler: "
2023-05-25 18:49:14 +00:00
"connection closed to ${_socket?.address}:${_socket?.port}, destroying socket",
2023-05-24 23:35:45 +00:00
level: LogLevel.Info,
2023-05-25 18:49:14 +00:00
2023-05-25 18:56:18 +00:00
if (_requestQueue.isNotEmpty) {
"JsonRPC doneHandler: sending next request in queue",
level: LogLevel.Warning,
2023-05-24 23:35:45 +00:00
// Removes a finished request from the queue, then checks whether more
// requests are waiting.
// NOTE(review): the body of the `if` is not visible; presumably it sends the
// next request - confirm.
Future<void> _onReqCompleted(_JsonRPCRequest req) async {
await _requestQueue.remove(req);
if (_requestQueue.isNotEmpty) {
// Takes the head of the queue for sending; throws an Exception when the queue
// is empty.
// NOTE(review): the log text says the request was written to the socket, but
// the write statement itself is not visible in this listing.
void _sendNextAvailableRequest() {
if (_requestQueue.isEmpty) {
// TODO handle properly
2023-05-25 16:29:46 +00:00
throw Exception("JSON RPC queue empty");
2023-05-24 23:35:45 +00:00
final req = _requestQueue.next;
"JsonRPC request: wrote request ${req.jsonRequest} "
"to socket ${_socket?.address}:${_socket?.port}",
level: LogLevel.Info,
// Queues [jsonRpcRequest] and returns the future of its completer.
// Connects first when no socket exists yet.
Future<dynamic> request(String jsonRpcRequest) async {
// todo: handle this better?
// Do we need to check the subscription, too?
if (_socket == null) {
"JsonRPC request: opening socket $host:$port",
level: LogLevel.Info,
await connect();
final req = _JsonRPCRequest(
jsonRequest: jsonRpcRequest,
completer: Completer<dynamic>(),
await _requestQueue.add(req);
// if this is the only/first request then send it right away
// NOTE(review): the statement for the `length == 1` branch is not visible.
if (_requestQueue.length == 1) {
} else {
"JsonRPC request: queued request $jsonRpcRequest "
"to socket ${_socket?.address}:${_socket?.port}",
level: LogLevel.Info,
return req.completer.future;
// Cancels the stream subscription and clears the reference.
// NOTE(review): no socket teardown is visible here; it may have been lost.
Future<void> disconnect() async {
await _subscription?.cancel();
_subscription = null;
// Opens the socket if none is assigned (`??=` keeps an existing one), then
// replaces any previous subscription with a fresh listener.
// NOTE(review): the positional arguments to both connect calls and to
// `listen` are not visible in this listing.
Future<void> connect() async {
if (useSSL) {
_socket ??= await SecureSocket.connect(
timeout: connectionTimeout,
// Always returns true, so every certificate that fails validation is accepted.
onBadCertificate: (_) => true,
); // TODO do not automatically trust bad certificates
} else {
_socket ??= await Socket.connect(
timeout: connectionTimeout,
await _subscription?.cancel();
_subscription = _socket!.listen(
onError: _errorHandler,
onDone: _doneHandler,
// The subscription is cancelled after the first stream error.
cancelOnError: true,
// mutex *may* not be needed as the protected functions are not async
/// Ordered queue of pending [_JsonRPCRequest]s.
///
/// [add] and [remove] run inside a [Mutex]; the read-only getters access the
/// backing list directly without taking the lock.
class _JsonRPCRequestQueue {
  final _m = Mutex();
  final List<_JsonRPCRequest> _rq = [];

  /// Appends [req] to the back of the queue.
  Future<void> add(_JsonRPCRequest req) async {
    await _m.protect(() async => _rq.add(req));
  }

  /// Removes the first occurrence of [req]; does nothing if it is absent.
  Future<void> remove(_JsonRPCRequest req) async {
    await _m.protect(() async => _rq.remove(req));
  }

  bool get isEmpty => _rq.isEmpty;
  bool get isNotEmpty => _rq.isNotEmpty;
  int get length => _rq.length;

  /// The request at the front of the queue.
  ///
  /// Throws a [StateError] when the queue is empty, so check [isNotEmpty]
  /// first.
  _JsonRPCRequest get next => _rq.first;
}
/// A single queued JSON-RPC request: the raw request text, the completer that
/// resolves it, and a byte buffer for the response.
///
/// NOTE(review): this listing appears to have lost lines (closing braces, the
/// head of the logging call, some statements); the comments describe only what
/// is visible.
class _JsonRPCRequest {
// Raw JSON-RPC request text supplied by the caller.
final String jsonRequest;
// Failed with the decode error below; its future is what request() returns.
final Completer<dynamic> completer;
// Response bytes that get decoded once a terminating newline is seen.
final List<int> _responseData = [];
_JsonRPCRequest({required this.jsonRequest, required this.completer});
// Checks whether [data] ends the response and, if so, decodes the buffer.
// NOTE(review): despite the method name, no statement that appends `data` to
// `_responseData` is visible, and `response` is never used (for example to
// complete the completer) - those lines look lost; confirm.
void appendDataAndCheckIfComplete(List<int> data) {
// 0x0A is newline
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
// `data.last` throws on an empty list; no emptiness guard is visible here.
if (data.last == 0x0A) {
try {
// String.fromCharCodes maps each byte to one UTF-16 code unit, so multi-byte
// UTF-8 sequences would not be decoded as UTF-8 - NOTE(review): confirm that
// responses are ASCII-only or switch to utf8.decode.
final response = json.decode(String.fromCharCodes(_responseData));
} catch (e, s) {
"JsonRPC json.decode: $e\n$s",
level: LogLevel.Error,
// A decode failure is forwarded to whoever awaits the request's future.
completer.completeError(e, s);
// True once the completer has been completed, with a value or an error.
bool get isComplete => completer.isCompleted;