Merge branch 'staging' into monero_changes

This commit is contained in:
julian-CStack 2024-01-28 22:47:50 -06:00 committed by GitHub
commit 0f8e0db381
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 83 additions and 55 deletions

View file

@ -80,18 +80,32 @@ class JsonRPC {
void _sendNextAvailableRequest() { void _sendNextAvailableRequest() {
_requestQueue.nextIncompleteReq.then((req) { _requestQueue.nextIncompleteReq.then((req) {
if (req != null) { if (req != null) {
// \r\n required by electrumx server if (!Prefs.instance.useTor) {
if (_socket != null) { if (_socket == null) {
Logging.instance.log(
"JsonRPC _sendNextAvailableRequest attempted with"
" _socket=null on $host:$port",
level: LogLevel.Error,
);
}
// \r\n required by electrumx server
_socket!.write('${req.jsonRequest}\r\n'); _socket!.write('${req.jsonRequest}\r\n');
} } else {
if (_socksSocket != null) { if (_socksSocket == null) {
_socksSocket!.write('${req.jsonRequest}\r\n'); Logging.instance.log(
"JsonRPC _sendNextAvailableRequest attempted with"
" _socksSocket=null on $host:$port",
level: LogLevel.Error,
);
}
// \r\n required by electrumx server
_socksSocket?.write('${req.jsonRequest}\r\n');
} }
// TODO different timeout length? // TODO different timeout length?
req.initiateTimeout( req.initiateTimeout(
onTimedOut: () { onTimedOut: () {
_requestQueue.remove(req); _onReqCompleted(req);
}, },
); );
} }
@ -109,7 +123,7 @@ class JsonRPC {
"JsonRPC request: opening socket $host:$port", "JsonRPC request: opening socket $host:$port",
level: LogLevel.Info, level: LogLevel.Info,
); );
await connect().timeout(requestTimeout, onTimeout: () { await _connect().timeout(requestTimeout, onTimeout: () {
throw Exception("Request timeout: $jsonRpcRequest"); throw Exception("Request timeout: $jsonRpcRequest");
}); });
} }
@ -119,7 +133,7 @@ class JsonRPC {
"JsonRPC request: opening SOCKS socket to $host:$port", "JsonRPC request: opening SOCKS socket to $host:$port",
level: LogLevel.Info, level: LogLevel.Info,
); );
await connect().timeout(requestTimeout, onTimeout: () { await _connect().timeout(requestTimeout, onTimeout: () {
throw Exception("Request timeout: $jsonRpcRequest"); throw Exception("Request timeout: $jsonRpcRequest");
}); });
} }
@ -156,23 +170,42 @@ class JsonRPC {
return future; return future;
} }
Future<void> disconnect({required String reason}) async { /// DO NOT set [ignoreMutex] to true unless fully aware of the consequences
await _requestMutex.protect(() async { Future<void> disconnect({
await _subscription?.cancel(); required String reason,
_subscription = null; bool ignoreMutex = false,
_socket?.destroy(); }) async {
_socket = null; if (ignoreMutex) {
await _socksSocket?.close(); await _disconnectHelper(reason: reason);
_socksSocket = null; } else {
await _requestMutex.protect(() async {
// clean up remaining queue await _disconnectHelper(reason: reason);
await _requestQueue.completeRemainingWithError( });
"JsonRPC disconnect() called with reason: \"$reason\"", }
);
});
} }
Future<void> connect() async { Future<void> _disconnectHelper({required String reason}) async {
await _subscription?.cancel();
_subscription = null;
_socket?.destroy();
_socket = null;
await _socksSocket?.close();
_socksSocket = null;
// clean up remaining queue
await _requestQueue.completeRemainingWithError(
"JsonRPC disconnect() called with reason: \"$reason\"",
);
}
Future<void> _connect() async {
// ignore mutex is set to true here as _connect is already called within
// the mutex.protect block. Setting to false here leads to a deadlock
await disconnect(
reason: "New connection requested",
ignoreMutex: true,
);
if (!Prefs.instance.useTor) { if (!Prefs.instance.useTor) {
if (useSSL) { if (useSSL) {
_socket = await SecureSocket.connect( _socket = await SecureSocket.connect(
@ -352,17 +385,20 @@ class _JsonRPCRequest {
} }
void initiateTimeout({ void initiateTimeout({
VoidCallback? onTimedOut, required VoidCallback onTimedOut,
}) { }) {
Future<void>.delayed(requestTimeout).then((_) { Future<void>.delayed(requestTimeout).then((_) {
if (!isComplete) { if (!isComplete) {
try { completer.complete(
throw JsonRpcException("_JsonRPCRequest timed out: $jsonRequest"); JsonRPCResponse(
} catch (e, s) { data: null,
completer.completeError(e, s); exception: JsonRpcException(
onTimedOut?.call(); "_JsonRPCRequest timed out: $jsonRequest",
} ),
),
);
} }
onTimedOut.call();
}); });
} }
@ -375,14 +411,3 @@ class JsonRPCResponse {
JsonRPCResponse({this.data, this.exception}); JsonRPCResponse({this.data, this.exception});
} }
bool isIpAddress(String host) {
try {
// if the string can be parsed into an InternetAddress, it's an IP.
InternetAddress(host);
return true;
} catch (e) {
// if parsing fails, it's not an IP.
return false;
}
}

View file

@ -482,6 +482,11 @@ abstract class Wallet<T extends CryptoCurrency> {
), ),
); );
// add some small buffer before making calls.
// this can probably be removed in the future but was added as a
// debugging feature
await Future<void>.delayed(const Duration(milliseconds: 300));
// TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided.
final Set<String> codesToCheck = {}; final Set<String> codesToCheck = {};
if (this is PaynymInterface) { if (this is PaynymInterface) {

View file

@ -1702,7 +1702,7 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
try { try {
final features = await electrumXClient final features = await electrumXClient
.getServerFeatures() .getServerFeatures()
.timeout(const Duration(seconds: 4)); .timeout(const Duration(seconds: 5));
Logging.instance.log("features: $features", level: LogLevel.Info); Logging.instance.log("features: $features", level: LogLevel.Info);
@ -1715,8 +1715,8 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
} catch (e, s) { } catch (e, s) {
// do nothing, still allow user into wallet // do nothing, still allow user into wallet
Logging.instance.log( Logging.instance.log(
"$runtimeType init() failed: $e\n$s", "$runtimeType init() did not complete: $e\n$s",
level: LogLevel.Error, level: LogLevel.Warning,
); );
} }

View file

@ -140,20 +140,18 @@ class MockJsonRPC extends _i1.Mock implements _i2.JsonRPC {
)), )),
) as _i5.Future<_i2.JsonRPCResponse>); ) as _i5.Future<_i2.JsonRPCResponse>);
@override @override
_i5.Future<void> disconnect({required String? reason}) => (super.noSuchMethod( _i5.Future<void> disconnect({
required String? reason,
bool? ignoreMutex = false,
}) =>
(super.noSuchMethod(
Invocation.method( Invocation.method(
#disconnect, #disconnect,
[], [],
{#reason: reason}, {
), #reason: reason,
returnValue: _i5.Future<void>.value(), #ignoreMutex: ignoreMutex,
returnValueForMissingStub: _i5.Future<void>.value(), },
) as _i5.Future<void>);
@override
_i5.Future<void> connect() => (super.noSuchMethod(
Invocation.method(
#connect,
[],
), ),
returnValue: _i5.Future<void>.value(), returnValue: _i5.Future<void>.value(),
returnValueForMissingStub: _i5.Future<void>.value(), returnValueForMissingStub: _i5.Future<void>.value(),