mirror of
https://github.com/cypherstack/stack_wallet.git
synced 2025-01-18 00:24:31 +00:00
queued json rpc requests per socket connection
This commit is contained in:
parent
6b9337ee10
commit
2b9f4c85f9
1 changed files with 191 additions and 0 deletions
191
lib/electrumx_rpc/rpc2.dart
Normal file
191
lib/electrumx_rpc/rpc2.dart
Normal file
|
@ -0,0 +1,191 @@
|
|||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'package:mutex/mutex.dart';
|
||||
import 'package:stackwallet/utilities/logger.dart';
|
||||
|
||||
// hacky fix to receive large jsonrpc responses
|
||||
class JsonRPC {
|
||||
JsonRPC({
|
||||
required this.host,
|
||||
required this.port,
|
||||
this.useSSL = false,
|
||||
this.connectionTimeout = const Duration(seconds: 60),
|
||||
});
|
||||
final bool useSSL;
|
||||
final String host;
|
||||
final int port;
|
||||
final Duration connectionTimeout;
|
||||
|
||||
final _JsonRPCRequestQueue _requestQueue = _JsonRPCRequestQueue();
|
||||
Socket? _socket;
|
||||
StreamSubscription<Uint8List>? _subscription;
|
||||
|
||||
void _dataHandler(List<int> data) {
|
||||
if (_requestQueue.isEmpty) {
|
||||
// probably just return although this case should never actually hit
|
||||
// TODO anything else here?
|
||||
return;
|
||||
}
|
||||
|
||||
final req = _requestQueue.next;
|
||||
req.appendDataAndCheckIfComplete(data);
|
||||
|
||||
if (req.isComplete) {
|
||||
_onReqCompleted(req);
|
||||
}
|
||||
}
|
||||
|
||||
void _errorHandler(Object error, StackTrace trace) {
|
||||
Logging.instance.log(
|
||||
"JsonRPC errorHandler: $error\n$trace",
|
||||
level: LogLevel.Error,
|
||||
);
|
||||
|
||||
final req = _requestQueue.next;
|
||||
req.completer.completeError(error, trace);
|
||||
_onReqCompleted(req);
|
||||
}
|
||||
|
||||
void _doneHandler() {
|
||||
Logging.instance.log(
|
||||
"JsonRPC doneHandler: "
|
||||
"connection closed to ${_socket?.address}:${_socket?.port}",
|
||||
level: LogLevel.Info,
|
||||
);
|
||||
}
|
||||
|
||||
Future<void> _onReqCompleted(_JsonRPCRequest req) async {
|
||||
await _requestQueue.remove(req);
|
||||
if (_requestQueue.isNotEmpty) {
|
||||
_sendNextAvailableRequest();
|
||||
}
|
||||
}
|
||||
|
||||
void _sendNextAvailableRequest() {
|
||||
if (_requestQueue.isEmpty) {
|
||||
// TODO handle properly
|
||||
throw Exception("reeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee");
|
||||
}
|
||||
|
||||
final req = _requestQueue.next;
|
||||
|
||||
_socket!.write('${req.jsonRequest}\r\n');
|
||||
Logging.instance.log(
|
||||
"JsonRPC request: wrote request ${req.jsonRequest} "
|
||||
"to socket ${_socket?.address}:${_socket?.port}",
|
||||
level: LogLevel.Info,
|
||||
);
|
||||
}
|
||||
|
||||
Future<dynamic> request(String jsonRpcRequest) async {
|
||||
// todo: handle this better?
|
||||
// Do we need to check the subscription, too?
|
||||
if (_socket == null) {
|
||||
Logging.instance.log(
|
||||
"JsonRPC request: opening socket $host:$port",
|
||||
level: LogLevel.Info,
|
||||
);
|
||||
await connect();
|
||||
}
|
||||
|
||||
final req = _JsonRPCRequest(
|
||||
jsonRequest: jsonRpcRequest,
|
||||
completer: Completer<dynamic>(),
|
||||
);
|
||||
|
||||
await _requestQueue.add(req);
|
||||
|
||||
// if this is the only/first request then send it right away
|
||||
if (_requestQueue.length == 1) {
|
||||
_sendNextAvailableRequest();
|
||||
} else {
|
||||
Logging.instance.log(
|
||||
"JsonRPC request: queued request $jsonRpcRequest "
|
||||
"to socket ${_socket?.address}:${_socket?.port}",
|
||||
level: LogLevel.Info,
|
||||
);
|
||||
}
|
||||
|
||||
return req.completer.future;
|
||||
}
|
||||
|
||||
Future<void> disconnect() async {
|
||||
await _subscription?.cancel();
|
||||
_subscription = null;
|
||||
_socket?.destroy();
|
||||
}
|
||||
|
||||
Future<void> connect() async {
|
||||
if (useSSL) {
|
||||
_socket ??= await SecureSocket.connect(
|
||||
host,
|
||||
port,
|
||||
timeout: connectionTimeout,
|
||||
onBadCertificate: (_) => true,
|
||||
); // TODO do not automatically trust bad certificates
|
||||
} else {
|
||||
_socket ??= await Socket.connect(
|
||||
host,
|
||||
port,
|
||||
timeout: connectionTimeout,
|
||||
);
|
||||
}
|
||||
await _subscription?.cancel();
|
||||
_subscription = _socket!.listen(
|
||||
_dataHandler,
|
||||
onError: _errorHandler,
|
||||
onDone: _doneHandler,
|
||||
cancelOnError: true,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// mutex *may* not be needed as the protected functions are not async
|
||||
class _JsonRPCRequestQueue {
|
||||
final _m = Mutex();
|
||||
final List<_JsonRPCRequest> _rq = [];
|
||||
|
||||
Future<void> add(_JsonRPCRequest req) async {
|
||||
await _m.protect(() async => _rq.add(req));
|
||||
}
|
||||
|
||||
Future<void> remove(_JsonRPCRequest req) async {
|
||||
await _m.protect(() async => _rq.remove(req));
|
||||
}
|
||||
|
||||
bool get isEmpty => _rq.isEmpty;
|
||||
bool get isNotEmpty => _rq.isNotEmpty;
|
||||
int get length => _rq.length;
|
||||
_JsonRPCRequest get next => _rq.first;
|
||||
}
|
||||
|
||||
class _JsonRPCRequest {
|
||||
final String jsonRequest;
|
||||
final Completer<dynamic> completer;
|
||||
final List<int> _responseData = [];
|
||||
|
||||
_JsonRPCRequest({required this.jsonRequest, required this.completer});
|
||||
|
||||
void appendDataAndCheckIfComplete(List<int> data) {
|
||||
_responseData.addAll(data);
|
||||
// 0x0A is newline
|
||||
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-basics.html
|
||||
if (data.last == 0x0A) {
|
||||
try {
|
||||
final response = json.decode(String.fromCharCodes(_responseData));
|
||||
completer.complete(response);
|
||||
} catch (e, s) {
|
||||
Logging.instance.log(
|
||||
"JsonRPC json.decode: $e\n$s",
|
||||
level: LogLevel.Error,
|
||||
);
|
||||
completer.completeError(e, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool get isComplete => completer.isCompleted;
|
||||
}
|
Loading…
Reference in a new issue