capture all grpc errors and much better error handling overall

This commit is contained in:
Matthew Fosse 2024-09-19 15:35:47 -07:00
parent c49f2ed5f1
commit 8d4443e504
2 changed files with 100 additions and 23 deletions
cw_bitcoin/lib
cw_mweb/lib

View file

@ -34,6 +34,7 @@ import 'package:cw_core/transaction_priority.dart';
import 'package:cw_core/wallet_info.dart';
import 'package:cw_core/wallet_keys_file.dart';
import 'package:flutter/foundation.dart';
import 'package:grpc/grpc.dart';
import 'package:hive/hive.dart';
import 'package:mobx/mobx.dart';
import 'package:cw_core/wallet_type.dart';
@ -264,11 +265,17 @@ abstract class LitecoinWalletBase extends ElectrumWallet with Store {
}
await waitForMwebAddresses();
await getStub();
await processMwebUtxos();
await updateTransactions();
await updateUnspent();
await updateBalance();
try {
await getStub();
await processMwebUtxos();
await updateTransactions();
await updateUnspent();
await updateBalance();
} catch (e) {
print("failed to start mweb sync: $e");
syncStatus = FailedSyncStatus();
return;
}
_syncTimer?.cancel();
_syncTimer = Timer.periodic(const Duration(milliseconds: 1500), (timer) async {
@ -276,7 +283,7 @@ abstract class LitecoinWalletBase extends ElectrumWallet with Store {
final nodeHeight =
await electrumClient.getCurrentBlockChainTip() ?? 0; // current block height of our node
final resp = await _stub.status(StatusRequest());
final resp = await CwMweb.status(StatusRequest());
if (resp.blockHeaderHeight < nodeHeight) {
int h = resp.blockHeaderHeight;
@ -487,7 +494,11 @@ abstract class LitecoinWalletBase extends ElectrumWallet with Store {
// process new utxos as they come in:
_utxoStream?.cancel();
_utxoStream = _stub.utxos(req).listen((Utxo sUtxo) async {
ResponseStream<Utxo>? responseStream = await CwMweb.utxos(req);
if (responseStream == null) {
throw Exception("failed to get utxos stream!");
}
_utxoStream = responseStream.listen((Utxo sUtxo) async {
final utxo = MwebUtxo(
address: sUtxo.address,
blockTime: sUtxo.blockTime,
@ -530,10 +541,10 @@ abstract class LitecoinWalletBase extends ElectrumWallet with Store {
final outputIds =
mwebUtxosBox.values.where((utxo) => utxo.height > 0).map((utxo) => utxo.outputId).toList();
final resp = await _stub.spent(SpentRequest(outputId: outputIds));
final resp = await CwMweb.spent(SpentRequest(outputId: outputIds));
final spent = resp.outputId;
if (spent.isEmpty) return;
final status = await _stub.status(StatusRequest());
final status = await CwMweb.status(StatusRequest());
final height = await electrumClient.getCurrentBlockChainTip();
if (height == null || status.blockHeaderHeight != height) return;
if (status.mwebUtxosHeight != height) return;
@ -599,9 +610,9 @@ abstract class LitecoinWalletBase extends ElectrumWallet with Store {
if (outputId.isEmpty) {
return false;
}
final resp = await _stub.spent(SpentRequest(outputId: outputId));
final resp = await CwMweb.spent(SpentRequest(outputId: outputId));
if (!setEquals(resp.outputId.toSet(), target)) return false;
final status = await _stub.status(StatusRequest());
final status = await CwMweb.status(StatusRequest());
if (!tx.isPending) return false;
tx.height = status.mwebUtxosHeight;
tx.confirmations = 1;
@ -799,7 +810,7 @@ abstract class LitecoinWalletBase extends ElectrumWallet with Store {
final fee = utxos.sumOfUtxosValue() - preOutputSum;
final txb =
BitcoinTransactionBuilder(utxos: utxos, outputs: outputs, fee: fee, network: network);
final resp = await _stub.create(CreateRequest(
final resp = await CwMweb.create(CreateRequest(
rawTx: txb.buildTransaction((a, b, c, d) => '').toBytes(),
scanSecret: scanSecret,
spendSecret: spendSecret,
@ -841,7 +852,7 @@ abstract class LitecoinWalletBase extends ElectrumWallet with Store {
await waitForMwebAddresses();
await getStub();
final resp = await _stub.create(CreateRequest(
final resp = await CwMweb.create(CreateRequest(
rawTx: hex.decode(tx.hex),
scanSecret: scanSecret,
spendSecret: spendSecret,
@ -927,8 +938,7 @@ abstract class LitecoinWalletBase extends ElectrumWallet with Store {
}
Future<StatusResponse> getStatusRequest() async {
await getStub();
final resp = await _stub.status(StatusRequest());
final resp = await CwMweb.status(StatusRequest());
return resp;
}

View file

@ -9,8 +9,13 @@ class CwMweb {
static RpcClient? _rpcClient;
static ClientChannel? _clientChannel;
static int? _port;
static const TIMEOUT_DURATION = Duration(seconds: 5);
static Future<void> _initializeClient() async {
await stop();
// wait a few seconds to make sure the server is stopped
await Future.delayed(const Duration(seconds: 5));
final appDir = await getApplicationSupportDirectory();
_port = await CwMwebPlatform.instance.start(appDir.path);
if (_port == null || _port == 0) {
@ -18,8 +23,12 @@ class CwMweb {
}
print("Attempting to connect to server on port: $_port");
_clientChannel = ClientChannel('127.0.0.1',
port: _port!,
// wait for the server to finish starting up before we try to connect to it:
await Future.delayed(const Duration(seconds: 5));
_clientChannel = ClientChannel('127.0.0.1', port: _port!, channelShutdownHandler: () {
print("Channel shutdown!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
},
options: const ChannelOptions(
credentials: ChannelCredentials.insecure(),
keepAlive: ClientKeepAliveOptions(permitWithoutCalls: true),
@ -34,27 +43,35 @@ class CwMweb {
await _initializeClient();
}
final status = await _rpcClient!
.status(StatusRequest(), options: CallOptions(timeout: const Duration(seconds: 3)));
.status(StatusRequest(), options: CallOptions(timeout: TIMEOUT_DURATION));
if (status.blockTime == 0) {
throw Exception("blockTime shouldn't be 0! (this connection is likely broken)");
}
return _rpcClient!;
} catch (e) {
print("Attempt $i failed: $e");
await stop(); // call stop so we create a new instance before retrying
await Future.delayed(const Duration(seconds: 4)); // wait before retrying
_rpcClient = null;
}
}
throw Exception("Failed to connect after $maxRetries attempts");
}
static Future<void> stop() async {
await CwMwebPlatform.instance.stop();
await cleanup();
try {
await CwMwebPlatform.instance.stop();
await cleanup();
} catch (e) {
print("Error stopping server: $e");
}
}
static Future<String?> address(Uint8List scanSecret, Uint8List spendPub, int index) async {
return CwMwebPlatform.instance.address(scanSecret, spendPub, index);
try {
return CwMwebPlatform.instance.address(scanSecret, spendPub, index);
} catch (e) {
print("Error getting address: $e");
return null;
}
}
static Future<void> cleanup() async {
@ -63,4 +80,54 @@ class CwMweb {
_clientChannel = null;
_port = null;
}
// wrappers that handle the connection issues:
static Future<SpentResponse> spent(SpentRequest request) async {
try {
if (_rpcClient == null) {
await _initializeClient();
}
return await _rpcClient!.spent(request, options: CallOptions(timeout: TIMEOUT_DURATION));
} catch (e) {
print("Error getting spent: $e");
return SpentResponse();
}
}
static Future<StatusResponse> status(StatusRequest request) async {
try {
if (_rpcClient == null) {
await _initializeClient();
}
return await _rpcClient!.status(request, options: CallOptions(timeout: TIMEOUT_DURATION));
} catch (e) {
print("Error getting status: $e");
return StatusResponse();
}
}
static Future<CreateResponse> create(CreateRequest request) async {
try {
if (_rpcClient == null) {
await _initializeClient();
}
return await _rpcClient!.create(request, options: CallOptions(timeout: TIMEOUT_DURATION));
} catch (e) {
print("Error getting create: $e");
return CreateResponse();
}
}
static Future<ResponseStream<Utxo>?> utxos(UtxosRequest request) async {
try {
if (_rpcClient == null) {
await _initializeClient();
}
// this is a stream, so we should have an effectively infinite timeout:
return _rpcClient!.utxos(request, options: CallOptions(timeout: const Duration(days: 99)));
} catch (e) {
print("Error getting utxos: $e");
return null;
}
}
}