mirror of
https://github.com/cypherstack/stack_wallet.git
synced 2024-11-17 01:37:54 +00:00
cleaned up electrumx rpc
This commit is contained in:
parent
5c2465784e
commit
bad1eed7c8
3 changed files with 172 additions and 376 deletions
|
@ -132,13 +132,12 @@ class ElectrumX {
|
|||
|
||||
final response = await _rpcClient!.request(jsonRequestString);
|
||||
|
||||
print("=================================================");
|
||||
print("TYPE: ${response.runtimeType}");
|
||||
print("RESPONSE: $response");
|
||||
print("=================================================");
|
||||
if (response.exception != null) {
|
||||
throw response.exception!;
|
||||
}
|
||||
|
||||
if (response["error"] != null) {
|
||||
if (response["error"]
|
||||
if (response.data["error"] != null) {
|
||||
if (response.data["error"]
|
||||
.toString()
|
||||
.contains("No such mempool or blockchain transaction")) {
|
||||
throw NoSuchTransactionException(
|
||||
|
@ -148,11 +147,15 @@ class ElectrumX {
|
|||
}
|
||||
|
||||
throw Exception(
|
||||
"JSONRPC response \ncommand: $command \nargs: $args \nerror: $response");
|
||||
"JSONRPC response\n"
|
||||
" command: $command\n"
|
||||
" args: $args\n"
|
||||
" error: $response.data",
|
||||
);
|
||||
}
|
||||
|
||||
currentFailoverIndex = -1;
|
||||
return response;
|
||||
return response.data;
|
||||
} on WifiOnlyException {
|
||||
rethrow;
|
||||
} on SocketException {
|
||||
|
@ -233,7 +236,13 @@ class ElectrumX {
|
|||
// Logging.instance.log("batch request: $request");
|
||||
|
||||
// send batch request
|
||||
final response = (await _rpcClient!.request(request)) as List<dynamic>;
|
||||
final jsonRpcResponse = (await _rpcClient!.request(request));
|
||||
|
||||
if (jsonRpcResponse.exception != null) {
|
||||
throw jsonRpcResponse.exception!;
|
||||
}
|
||||
|
||||
final response = jsonRpcResponse.data as List;
|
||||
|
||||
// check for errors, format and throw if there are any
|
||||
final List<String> errors = [];
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:mutex/mutex.dart';
|
||||
import 'package:stackwallet/utilities/logger.dart';
|
||||
|
||||
// hacky fix to receive large jsonrpc responses
|
||||
// Json RPC class to handle connecting to electrumx servers
|
||||
class JsonRPC {
|
||||
JsonRPC({
|
||||
required this.host,
|
||||
|
@ -25,90 +25,60 @@ class JsonRPC {
|
|||
StreamSubscription<Uint8List>? _subscription;
|
||||
|
||||
void _dataHandler(List<int> data) {
|
||||
if (_requestQueue.isEmpty) {
|
||||
// probably just return although this case should never actually hit
|
||||
return;
|
||||
}
|
||||
_requestQueue.nextIncompleteReq.then((req) {
|
||||
if (req != null) {
|
||||
req.appendDataAndCheckIfComplete(data);
|
||||
|
||||
final req = _requestQueue.next;
|
||||
req.appendDataAndCheckIfComplete(data);
|
||||
|
||||
if (req.isComplete) {
|
||||
_onReqCompleted(req);
|
||||
}
|
||||
if (req.isComplete) {
|
||||
_onReqCompleted(req);
|
||||
}
|
||||
} else {
|
||||
Logging.instance.log(
|
||||
"_dataHandler found a null req!",
|
||||
level: LogLevel.Warning,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _errorHandler(Object error, StackTrace trace) {
|
||||
Logging.instance.log(
|
||||
"JsonRPC errorHandler: $error\n$trace",
|
||||
level: LogLevel.Error,
|
||||
);
|
||||
|
||||
final req = _requestQueue.next;
|
||||
req.completer.completeError(error, trace);
|
||||
_onReqCompleted(req);
|
||||
_requestQueue.nextIncompleteReq.then((req) {
|
||||
if (req != null) {
|
||||
req.completer.completeError(error, trace);
|
||||
_onReqCompleted(req);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _doneHandler() {
|
||||
Logging.instance.log(
|
||||
"JsonRPC doneHandler: "
|
||||
"connection closed to $host:$port, destroying socket",
|
||||
level: LogLevel.Info,
|
||||
);
|
||||
|
||||
if (_requestQueue.isNotEmpty) {
|
||||
Logging.instance.log(
|
||||
"JsonRPC doneHandler: queue not empty but connection closed, "
|
||||
"completing pending requests with errors",
|
||||
level: LogLevel.Error,
|
||||
);
|
||||
|
||||
for (final req in _requestQueue.queue) {
|
||||
if (!req.isComplete) {
|
||||
try {
|
||||
throw Exception(
|
||||
"JsonRPC doneHandler: socket closed "
|
||||
"before request could complete",
|
||||
);
|
||||
} catch (e, s) {
|
||||
req.completer.completeError(e, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
_requestQueue.clear();
|
||||
}
|
||||
|
||||
disconnect();
|
||||
disconnect(reason: "JsonRPC _doneHandler() called");
|
||||
}
|
||||
|
||||
void _onReqCompleted(_JsonRPCRequest req) {
|
||||
_requestQueue.remove(req);
|
||||
if (_requestQueue.isNotEmpty) {
|
||||
_requestQueue.remove(req).then((_) {
|
||||
// attempt to send next request
|
||||
_sendNextAvailableRequest();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _sendNextAvailableRequest() {
|
||||
if (_requestQueue.isEmpty) {
|
||||
// TODO handle properly
|
||||
throw Exception("JSON RPC queue empty");
|
||||
}
|
||||
_requestQueue.nextIncompleteReq.then((req) {
|
||||
if (req != null) {
|
||||
// \r\n required by electrumx server
|
||||
_socket!.write('${req.jsonRequest}\r\n');
|
||||
|
||||
final req = _requestQueue.next;
|
||||
|
||||
_socket!.write('${req.jsonRequest}\r\n');
|
||||
|
||||
req.initiateTimeout(const Duration(seconds: 10));
|
||||
// Logging.instance.log(
|
||||
// "JsonRPC request: wrote request ${req.jsonRequest} "
|
||||
// "to socket $host:$port",
|
||||
// level: LogLevel.Info,
|
||||
// );
|
||||
// TODO different timeout length?
|
||||
req.initiateTimeout(
|
||||
const Duration(seconds: 10),
|
||||
onTimedOut: () {
|
||||
_requestQueue.remove(req);
|
||||
},
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Future<dynamic> request(String jsonRpcRequest) async {
|
||||
// todo: handle this better?
|
||||
// Do we need to check the subscription, too?
|
||||
Future<JsonRPCResponse> request(String jsonRpcRequest) async {
|
||||
await _requestMutex.protect(() async {
|
||||
if (_socket == null) {
|
||||
Logging.instance.log(
|
||||
|
@ -121,51 +91,69 @@ class JsonRPC {
|
|||
|
||||
final req = _JsonRPCRequest(
|
||||
jsonRequest: jsonRpcRequest,
|
||||
completer: Completer<dynamic>(),
|
||||
completer: Completer<JsonRPCResponse>(),
|
||||
);
|
||||
|
||||
_requestQueue.add(req);
|
||||
final future = req.completer.future.onError(
|
||||
(error, stackTrace) async {
|
||||
await disconnect(
|
||||
reason: "return req.completer.future.onError: $error\n$stackTrace",
|
||||
);
|
||||
return JsonRPCResponse(
|
||||
exception: error is Exception
|
||||
? error
|
||||
: Exception(
|
||||
"req.completer.future.onError: $error\n$stackTrace",
|
||||
),
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
// if this is the only/first request then send it right away
|
||||
if (_requestQueue.length == 1) {
|
||||
_sendNextAvailableRequest();
|
||||
} else {
|
||||
// Logging.instance.log(
|
||||
// "JsonRPC request: queued request $jsonRpcRequest "
|
||||
// "to socket $host:$port",
|
||||
// level: LogLevel.Info,
|
||||
// );
|
||||
}
|
||||
|
||||
return req.completer.future.onError(
|
||||
(error, stackTrace) =>
|
||||
Exception("return req.completer.future.onError: $error"),
|
||||
await _requestQueue.add(
|
||||
req,
|
||||
onInitialRequestAdded: _sendNextAvailableRequest,
|
||||
);
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
void disconnect() {
|
||||
// TODO: maybe clear req queue here and wrap in mutex?
|
||||
_subscription?.cancel().then((_) => _subscription = null);
|
||||
_socket?.destroy();
|
||||
_socket = null;
|
||||
Future<void> disconnect({required String reason}) async {
|
||||
await _requestMutex.protect(() async {
|
||||
await _subscription?.cancel();
|
||||
_subscription = null;
|
||||
_socket?.destroy();
|
||||
_socket = null;
|
||||
|
||||
// clean up remaining queue
|
||||
await _requestQueue.completeRemainingWithError(
|
||||
"JsonRPC disconnect() called with reason: \"$reason\"",
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> connect() async {
|
||||
if (_socket != null) {
|
||||
throw Exception(
|
||||
"JsonRPC attempted to connect to an already existing socket!",
|
||||
);
|
||||
}
|
||||
|
||||
if (useSSL) {
|
||||
_socket ??= await SecureSocket.connect(
|
||||
_socket = await SecureSocket.connect(
|
||||
host,
|
||||
port,
|
||||
timeout: connectionTimeout,
|
||||
onBadCertificate: (_) => true,
|
||||
); // TODO do not automatically trust bad certificates
|
||||
} else {
|
||||
_socket ??= await Socket.connect(
|
||||
_socket = await Socket.connect(
|
||||
host,
|
||||
port,
|
||||
timeout: connectionTimeout,
|
||||
);
|
||||
}
|
||||
await _subscription?.cancel();
|
||||
|
||||
_subscription = _socket!.listen(
|
||||
_dataHandler,
|
||||
onError: _errorHandler,
|
||||
|
@ -176,36 +164,85 @@ class JsonRPC {
|
|||
}
|
||||
|
||||
class _JsonRPCRequestQueue {
|
||||
final _lock = Mutex();
|
||||
final List<_JsonRPCRequest> _rq = [];
|
||||
|
||||
void add(_JsonRPCRequest req) => _rq.add(req);
|
||||
Future<void> add(
|
||||
_JsonRPCRequest req, {
|
||||
VoidCallback? onInitialRequestAdded,
|
||||
}) async {
|
||||
return await _lock.protect(() async {
|
||||
_rq.add(req);
|
||||
if (_rq.length == 1) {
|
||||
onInitialRequestAdded?.call();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
bool remove(_JsonRPCRequest req) => _rq.remove(req);
|
||||
Future<bool> remove(_JsonRPCRequest req) async {
|
||||
return await _lock.protect(() async {
|
||||
final result = _rq.remove(req);
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
void clear() => _rq.clear();
|
||||
Future<_JsonRPCRequest?> get nextIncompleteReq async {
|
||||
return await _lock.protect(() async {
|
||||
int removeCount = 0;
|
||||
_JsonRPCRequest? returnValue;
|
||||
for (final req in _rq) {
|
||||
if (req.isComplete) {
|
||||
removeCount++;
|
||||
} else {
|
||||
returnValue = req;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool get isEmpty => _rq.isEmpty;
|
||||
bool get isNotEmpty => _rq.isNotEmpty;
|
||||
int get length => _rq.length;
|
||||
_JsonRPCRequest get next => _rq.first;
|
||||
List<_JsonRPCRequest> get queue => _rq.toList(growable: false);
|
||||
_rq.removeRange(0, removeCount);
|
||||
|
||||
return returnValue;
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> completeRemainingWithError(
|
||||
String error, {
|
||||
StackTrace? stackTrace,
|
||||
}) async {
|
||||
await _lock.protect(() async {
|
||||
for (final req in _rq) {
|
||||
if (!req.isComplete) {
|
||||
req.completer.completeError(Exception(error), stackTrace);
|
||||
}
|
||||
}
|
||||
_rq.clear();
|
||||
});
|
||||
}
|
||||
|
||||
Future<bool> get isEmpty async {
|
||||
return await _lock.protect(() async {
|
||||
return _rq.isEmpty;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class _JsonRPCRequest {
|
||||
// 0x0A is newline
|
||||
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
|
||||
static const int separatorByte = 0x0A;
|
||||
|
||||
final String jsonRequest;
|
||||
final Completer<dynamic> completer;
|
||||
final Completer<JsonRPCResponse> completer;
|
||||
final List<int> _responseData = [];
|
||||
|
||||
_JsonRPCRequest({required this.jsonRequest, required this.completer});
|
||||
|
||||
void appendDataAndCheckIfComplete(List<int> data) {
|
||||
_responseData.addAll(data);
|
||||
// 0x0A is newline
|
||||
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
|
||||
if (data.last == 0x0A) {
|
||||
if (data.last == separatorByte) {
|
||||
try {
|
||||
final response = json.decode(String.fromCharCodes(_responseData));
|
||||
completer.complete(response);
|
||||
completer.complete(JsonRPCResponse(data: response));
|
||||
} catch (e, s) {
|
||||
Logging.instance.log(
|
||||
"JsonRPC json.decode: $e\n$s",
|
||||
|
@ -216,13 +253,17 @@ class _JsonRPCRequest {
|
|||
}
|
||||
}
|
||||
|
||||
void initiateTimeout(Duration timeout) {
|
||||
void initiateTimeout(
|
||||
Duration timeout, {
|
||||
VoidCallback? onTimedOut,
|
||||
}) {
|
||||
Future<void>.delayed(timeout).then((_) {
|
||||
if (!isComplete) {
|
||||
try {
|
||||
throw Exception("_JsonRPCRequest timed out: $jsonRequest");
|
||||
} catch (e, s) {
|
||||
completer.completeError(e, s);
|
||||
onTimedOut?.call();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -230,3 +271,10 @@ class _JsonRPCRequest {
|
|||
|
||||
bool get isComplete => completer.isCompleted;
|
||||
}
|
||||
|
||||
class JsonRPCResponse {
|
||||
final dynamic data;
|
||||
final Exception? exception;
|
||||
|
||||
JsonRPCResponse({this.data, this.exception});
|
||||
}
|
||||
|
|
|
@ -1,261 +0,0 @@
|
|||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:mutex/mutex.dart';
|
||||
import 'package:stackwallet/utilities/logger.dart';
|
||||
|
||||
// hacky fix to receive large jsonrpc responses
|
||||
class JsonRPC {
|
||||
JsonRPC({
|
||||
required this.host,
|
||||
required this.port,
|
||||
this.useSSL = false,
|
||||
this.connectionTimeout = const Duration(seconds: 60),
|
||||
});
|
||||
final bool useSSL;
|
||||
final String host;
|
||||
final int port;
|
||||
final Duration connectionTimeout;
|
||||
|
||||
final _requestMutex = Mutex();
|
||||
final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();
|
||||
Socket? _socket;
|
||||
StreamSubscription<Uint8List>? _subscription;
|
||||
|
||||
void _dataHandler(List<int> data) {
|
||||
_requestQueue.nextIncompleteReq.then((req) {
|
||||
if (req != null) {
|
||||
req.appendDataAndCheckIfComplete(data);
|
||||
|
||||
if (req.isComplete) {
|
||||
_onReqCompleted(req);
|
||||
}
|
||||
} else {
|
||||
Logging.instance.log(
|
||||
"_dataHandler found a null req!",
|
||||
level: LogLevel.Warning,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _errorHandler(Object error, StackTrace trace) {
|
||||
_requestQueue.nextIncompleteReq.then((req) {
|
||||
if (req != null) {
|
||||
req.completer.completeError(error, trace);
|
||||
_onReqCompleted(req);
|
||||
} else {
|
||||
Logging.instance.log(
|
||||
"_errorHandler found a null req!",
|
||||
level: LogLevel.Warning,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _doneHandler() {
|
||||
Logging.instance.log(
|
||||
"JsonRPC doneHandler: "
|
||||
"connection closed to $host:$port, destroying socket",
|
||||
level: LogLevel.Info,
|
||||
);
|
||||
|
||||
disconnect(reason: "JsonRPC _doneHandler() called");
|
||||
}
|
||||
|
||||
void _onReqCompleted(_JsonRPCRequest req) {
|
||||
_requestQueue.remove(req).then((value) {
|
||||
if (kDebugMode) {
|
||||
print("Request removed from queue: $value");
|
||||
}
|
||||
// attempt to send next request
|
||||
_sendNextAvailableRequest();
|
||||
});
|
||||
}
|
||||
|
||||
void _sendNextAvailableRequest() {
|
||||
_requestQueue.nextIncompleteReq.then((req) {
|
||||
if (req != null) {
|
||||
// \r\n required by electrumx server
|
||||
_socket!.write('${req.jsonRequest}\r\n');
|
||||
|
||||
// TODO different timeout length?
|
||||
req.initiateTimeout(const Duration(seconds: 10));
|
||||
} else {
|
||||
Logging.instance.log(
|
||||
"_sendNextAvailableRequest found a null req!",
|
||||
level: LogLevel.Warning,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: non dynamic type
|
||||
Future<dynamic> request(String jsonRpcRequest) async {
|
||||
await _requestMutex.protect(() async {
|
||||
if (_socket == null) {
|
||||
Logging.instance.log(
|
||||
"JsonRPC request: opening socket $host:$port",
|
||||
level: LogLevel.Info,
|
||||
);
|
||||
await connect();
|
||||
}
|
||||
});
|
||||
|
||||
final req = _JsonRPCRequest(
|
||||
jsonRequest: jsonRpcRequest,
|
||||
completer: Completer<dynamic>(),
|
||||
);
|
||||
|
||||
// if this is the only/first request then send it right away
|
||||
await _requestQueue.add(
|
||||
req,
|
||||
onInitialRequestAdded: _sendNextAvailableRequest,
|
||||
);
|
||||
|
||||
return req.completer.future.onError(
|
||||
(error, stackTrace) => Exception(
|
||||
"return req.completer.future.onError: $error",
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
Future<void> disconnect({String reason = "none"}) async {
|
||||
await _requestMutex.protect(() async {
|
||||
await _subscription?.cancel();
|
||||
_subscription = null;
|
||||
_socket?.destroy();
|
||||
_socket = null;
|
||||
|
||||
// clean up remaining queue
|
||||
await _requestQueue.completeRemainingWithError(
|
||||
"JsonRPC disconnect() called with reason: \"$reason\"",
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> connect() async {
|
||||
if (_socket != null) {
|
||||
throw Exception(
|
||||
"JsonRPC attempted to connect to an already existing socket!",
|
||||
);
|
||||
}
|
||||
|
||||
if (useSSL) {
|
||||
_socket = await SecureSocket.connect(
|
||||
host,
|
||||
port,
|
||||
timeout: connectionTimeout,
|
||||
onBadCertificate: (_) => true,
|
||||
); // TODO do not automatically trust bad certificates
|
||||
} else {
|
||||
_socket = await Socket.connect(
|
||||
host,
|
||||
port,
|
||||
timeout: connectionTimeout,
|
||||
);
|
||||
}
|
||||
|
||||
_subscription = _socket!.listen(
|
||||
_dataHandler,
|
||||
onError: _errorHandler,
|
||||
onDone: _doneHandler,
|
||||
cancelOnError: true,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
class _JsonRPCRequestQueue {
|
||||
final m = Mutex();
|
||||
|
||||
final List<_JsonRPCRequest> _rq = [];
|
||||
|
||||
Future<void> add(
|
||||
_JsonRPCRequest req, {
|
||||
VoidCallback? onInitialRequestAdded,
|
||||
}) async {
|
||||
return await m.protect(() async {
|
||||
_rq.add(req);
|
||||
if (_rq.length == 1) {
|
||||
onInitialRequestAdded?.call();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Future<bool> remove(_JsonRPCRequest req) async {
|
||||
return await m.protect(() async => _rq.remove(req));
|
||||
}
|
||||
|
||||
Future<_JsonRPCRequest?> get nextIncompleteReq async {
|
||||
return await m.protect(() async {
|
||||
try {
|
||||
return _rq.firstWhere((e) => !e.isComplete);
|
||||
} catch (_) {
|
||||
// no incomplete requests found
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> completeRemainingWithError(
|
||||
String error, {
|
||||
StackTrace? stackTrace,
|
||||
}) async {
|
||||
await m.protect(() async {
|
||||
for (final req in _rq) {
|
||||
if (!req.isComplete) {
|
||||
req.completer.completeError(Exception(error), stackTrace);
|
||||
}
|
||||
}
|
||||
_rq.clear();
|
||||
});
|
||||
}
|
||||
|
||||
Future<bool> get isEmpty async {
|
||||
return await m.protect(() async {
|
||||
return _rq.isEmpty;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class _JsonRPCRequest {
|
||||
final String jsonRequest;
|
||||
final Completer<dynamic> completer;
|
||||
final List<int> _responseData = [];
|
||||
|
||||
_JsonRPCRequest({required this.jsonRequest, required this.completer});
|
||||
|
||||
void appendDataAndCheckIfComplete(List<int> data) {
|
||||
_responseData.addAll(data);
|
||||
// 0x0A is newline
|
||||
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
|
||||
if (data.last == 0x0A) {
|
||||
try {
|
||||
final response = json.decode(String.fromCharCodes(_responseData));
|
||||
completer.complete(response);
|
||||
} catch (e, s) {
|
||||
Logging.instance.log(
|
||||
"JsonRPC json.decode: $e\n$s",
|
||||
level: LogLevel.Error,
|
||||
);
|
||||
completer.completeError(e, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void initiateTimeout(Duration timeout) {
|
||||
Future<void>.delayed(timeout).then((_) {
|
||||
if (!isComplete) {
|
||||
try {
|
||||
throw Exception("_JsonRPCRequest timed out: $jsonRequest");
|
||||
} catch (e, s) {
|
||||
completer.completeError(e, s);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
bool get isComplete => completer.isCompleted;
|
||||
}
|
Loading…
Reference in a new issue