WIP move subscription over to electrum_adapter

This commit is contained in:
sneurlax 2024-02-15 17:53:39 -06:00
parent d00c205e6c
commit 25ffa1fee6
3 changed files with 908 additions and 924 deletions

View file

@ -256,7 +256,7 @@ class ElectrumXClient {
} }
} }
Future<void> _checkElectrumAdapter() async { Future<void> checkElectrumAdapter() async {
({InternetAddress host, int port})? proxyInfo; ({InternetAddress host, int port})? proxyInfo;
// If we're supposed to use Tor... // If we're supposed to use Tor...
@ -370,9 +370,9 @@ class ElectrumXClient {
if (_requireMutex) { if (_requireMutex) {
await _torConnectingLock await _torConnectingLock
.protect(() async => await _checkElectrumAdapter()); .protect(() async => await checkElectrumAdapter());
} else { } else {
await _checkElectrumAdapter(); await checkElectrumAdapter();
} }
try { try {
@ -450,9 +450,9 @@ class ElectrumXClient {
if (_requireMutex) { if (_requireMutex) {
await _torConnectingLock await _torConnectingLock
.protect(() async => await _checkElectrumAdapter()); .protect(() async => await checkElectrumAdapter());
} else { } else {
await _checkElectrumAdapter(); await checkElectrumAdapter();
} }
try { try {
@ -814,7 +814,7 @@ class ElectrumXClient {
}) async { }) async {
Logging.instance.log("attempting to fetch blockchain.transaction.get...", Logging.instance.log("attempting to fetch blockchain.transaction.get...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
dynamic response = await _electrumAdapterClient!.getTransaction(txHash); dynamic response = await _electrumAdapterClient!.getTransaction(txHash);
Logging.instance.log("Fetching blockchain.transaction.get finished", Logging.instance.log("Fetching blockchain.transaction.get finished",
level: LogLevel.Info); level: LogLevel.Info);
@ -847,7 +847,7 @@ class ElectrumXClient {
}) async { }) async {
Logging.instance.log("attempting to fetch lelantus.getanonymityset...", Logging.instance.log("attempting to fetch lelantus.getanonymityset...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
Map<String, dynamic> response = Map<String, dynamic> response =
await (_electrumAdapterClient as FiroElectrumClient)! await (_electrumAdapterClient as FiroElectrumClient)!
.getLelantusAnonymitySet(groupId: groupId, blockHash: blockhash); .getLelantusAnonymitySet(groupId: groupId, blockHash: blockhash);
@ -866,7 +866,7 @@ class ElectrumXClient {
}) async { }) async {
Logging.instance.log("attempting to fetch lelantus.getmintmetadata...", Logging.instance.log("attempting to fetch lelantus.getmintmetadata...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
dynamic response = await (_electrumAdapterClient as FiroElectrumClient)! dynamic response = await (_electrumAdapterClient as FiroElectrumClient)!
.getLelantusMintData(mints: mints); .getLelantusMintData(mints: mints);
Logging.instance.log("Fetching lelantus.getmintmetadata finished", Logging.instance.log("Fetching lelantus.getmintmetadata finished",
@ -882,7 +882,7 @@ class ElectrumXClient {
}) async { }) async {
Logging.instance.log("attempting to fetch lelantus.getusedcoinserials...", Logging.instance.log("attempting to fetch lelantus.getusedcoinserials...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
int retryCount = 3; int retryCount = 3;
dynamic response; dynamic response;
@ -906,7 +906,7 @@ class ElectrumXClient {
Future<int> getLelantusLatestCoinId({String? requestID}) async { Future<int> getLelantusLatestCoinId({String? requestID}) async {
Logging.instance.log("attempting to fetch lelantus.getlatestcoinid...", Logging.instance.log("attempting to fetch lelantus.getlatestcoinid...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
int response = int response =
await (_electrumAdapterClient as FiroElectrumClient).getLatestCoinId(); await (_electrumAdapterClient as FiroElectrumClient).getLatestCoinId();
Logging.instance.log("Fetching lelantus.getlatestcoinid finished", Logging.instance.log("Fetching lelantus.getlatestcoinid finished",
@ -937,7 +937,7 @@ class ElectrumXClient {
try { try {
Logging.instance.log("attempting to fetch spark.getsparkanonymityset...", Logging.instance.log("attempting to fetch spark.getsparkanonymityset...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
Map<String, dynamic> response = Map<String, dynamic> response =
await (_electrumAdapterClient as FiroElectrumClient) await (_electrumAdapterClient as FiroElectrumClient)
.getSparkAnonymitySet( .getSparkAnonymitySet(
@ -960,7 +960,7 @@ class ElectrumXClient {
// Use electrum_adapter package's getSparkUsedCoinsTags method. // Use electrum_adapter package's getSparkUsedCoinsTags method.
Logging.instance.log("attempting to fetch spark.getusedcoinstags...", Logging.instance.log("attempting to fetch spark.getusedcoinstags...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
Map<String, dynamic> response = Map<String, dynamic> response =
await (_electrumAdapterClient as FiroElectrumClient) await (_electrumAdapterClient as FiroElectrumClient)
.getUsedCoinsTags(startNumber: startNumber); .getUsedCoinsTags(startNumber: startNumber);
@ -993,7 +993,7 @@ class ElectrumXClient {
try { try {
Logging.instance.log("attempting to fetch spark.getsparkmintmetadata...", Logging.instance.log("attempting to fetch spark.getsparkmintmetadata...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
List<dynamic> response = List<dynamic> response =
await (_electrumAdapterClient as FiroElectrumClient) await (_electrumAdapterClient as FiroElectrumClient)
.getSparkMintMetaData(sparkCoinHashes: sparkCoinHashes); .getSparkMintMetaData(sparkCoinHashes: sparkCoinHashes);
@ -1015,7 +1015,7 @@ class ElectrumXClient {
try { try {
Logging.instance.log("attempting to fetch spark.getsparklatestcoinid...", Logging.instance.log("attempting to fetch spark.getsparklatestcoinid...",
level: LogLevel.Info); level: LogLevel.Info);
await _checkElectrumAdapter(); await checkElectrumAdapter();
int response = await (_electrumAdapterClient as FiroElectrumClient) int response = await (_electrumAdapterClient as FiroElectrumClient)
.getSparkLatestCoinId(); .getSparkLatestCoinId();
Logging.instance.log("Fetching spark.getsparklatestcoinid finished", Logging.instance.log("Fetching spark.getsparklatestcoinid finished",
@ -1037,7 +1037,7 @@ class ElectrumXClient {
/// "rate": 1000, /// "rate": 1000,
/// } /// }
Future<Map<String, dynamic>> getFeeRate({String? requestID}) async { Future<Map<String, dynamic>> getFeeRate({String? requestID}) async {
await _checkElectrumAdapter(); await checkElectrumAdapter();
return await _electrumAdapterClient!.getFeeRate(); return await _electrumAdapterClient!.getFeeRate();
} }

File diff suppressed because it is too large Load diff

View file

@ -10,7 +10,6 @@ import 'package:isar/isar.dart';
import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/cached_electrumx_client.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_chain_height_service.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
import 'package:stackwallet/electrumx_rpc/subscribable_electrumx_client.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/v2/input_v2.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/input_v2.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/v2/output_v2.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/output_v2.dart';
import 'package:stackwallet/models/isar/models/blockchain_data/v2/transaction_v2.dart'; import 'package:stackwallet/models/isar/models/blockchain_data/v2/transaction_v2.dart';
@ -33,17 +32,23 @@ import 'package:stackwallet/wallets/wallet/intermediate/bip39_hd_wallet.dart';
import 'package:stackwallet/wallets/wallet/wallet_mixin_interfaces/paynym_interface.dart'; import 'package:stackwallet/wallets/wallet/wallet_mixin_interfaces/paynym_interface.dart';
import 'package:stream_channel/stream_channel.dart'; import 'package:stream_channel/stream_channel.dart';
import '../../../services/event_bus/events/global/tor_connection_status_changed_event.dart';
import '../../../services/event_bus/events/global/tor_status_changed_event.dart';
mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> { mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
late ElectrumXClient electrumXClient; late ElectrumXClient electrumXClient;
late StreamChannel electrumAdapterChannel; late StreamChannel electrumAdapterChannel;
late ElectrumClient electrumAdapterClient; late ElectrumClient electrumAdapterClient;
late CachedElectrumXClient electrumXCachedClient; late CachedElectrumXClient electrumXCachedClient;
late SubscribableElectrumXClient subscribableElectrumXClient; // late SubscribableElectrumXClient subscribableElectrumXClient;
int? get maximumFeerate => null; int? get maximumFeerate => null;
int? _latestHeight; int? _latestHeight;
StreamSubscription<TorPreferenceChangedEvent>? _torPreferenceListener;
StreamSubscription<TorConnectionStatusChangedEvent>? _torStatusListener;
static const _kServerBatchCutoffVersion = [1, 6]; static const _kServerBatchCutoffVersion = [1, 6];
List<int>? _serverVersion; List<int>? _serverVersion;
bool get serverCanBatch { bool get serverCanBatch {
@ -810,6 +815,9 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
Future<int> fetchChainHeight() async { Future<int> fetchChainHeight() async {
try { try {
// _checkChainHeightSubscription();
// TODO above. Make sure that the subscription/stream is alive.
// Don't set a stream subscription if one already exists. // Don't set a stream subscription if one already exists.
if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] == if (ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] ==
null) { null) {
@ -818,18 +826,14 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// Make sure we only complete once. // Make sure we only complete once.
final isFirstResponse = _latestHeight == null; final isFirstResponse = _latestHeight == null;
// Subscribe to block headers. await electrumXClient.checkElectrumAdapter();
final subscription = // TODO [prio=extreme]: Does this update anything in this file?? Thinking no.
subscribableElectrumXClient.subscribeToBlockHeaders();
final stream = electrumAdapterClient.subscribeHeaders();
// set stream subscription
ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] = ElectrumxChainHeightService.subscriptions[cryptoCurrency.coin] =
subscription.responseStream.asBroadcastStream().listen((event) { stream.asBroadcastStream().listen((response) {
final response = event; final int chainHeight = response.height;
if (response != null &&
response is Map &&
response.containsKey('height')) {
final int chainHeight = response['height'] as int;
// print("Current chain height: $chainHeight"); // print("Current chain height: $chainHeight");
_latestHeight = chainHeight; _latestHeight = chainHeight;
@ -838,18 +842,10 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// Return the chain height. // Return the chain height.
completer.complete(chainHeight); completer.complete(chainHeight);
} }
} else {
Logging.instance.log(
"blockchain.headers.subscribe returned malformed response\n"
"Response: $response",
level: LogLevel.Error);
}
}); });
} else {
return _latestHeight ?? await completer.future;
}
// Don't set a stream subscription if one already exists. // Don't set a stream subscription if one already exists.
else {
// Check if the stream subscription is paused. // Check if the stream subscription is paused.
if (ElectrumxChainHeightService if (ElectrumxChainHeightService
.subscriptions[cryptoCurrency.coin]!.isPaused) { .subscriptions[cryptoCurrency.coin]!.isPaused) {
@ -858,19 +854,6 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
.resume(); .resume();
} }
// Causes synchronization to stall.
// // Check if the stream subscription is active by pinging it.
// if (!(await subscribableElectrumXClient.ping())) {
// // If it's not active, reconnect it.
// final node = await getCurrentElectrumXNode();
//
// await subscribableElectrumXClient.connect(
// host: node.address, port: node.port);
//
// // Wait for first response.
// return completer.future;
// }
if (_latestHeight != null) { if (_latestHeight != null) {
return _latestHeight!; return _latestHeight!;
} }
@ -985,13 +968,14 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
electrumAdapterClient: electrumAdapterClient, electrumAdapterClient: electrumAdapterClient,
electrumAdapterUpdateCallback: updateClient, electrumAdapterUpdateCallback: updateClient,
); );
subscribableElectrumXClient = SubscribableElectrumXClient.from( // Replaced using electrum_adapters' SubscribableClient in fetchChainHeight.
node: newNode, // subscribableElectrumXClient = SubscribableElectrumXClient.from(
prefs: prefs, // node: newNode,
failovers: failovers, // prefs: prefs,
); // failovers: failovers,
await subscribableElectrumXClient.connect( // );
host: newNode.address, port: newNode.port); // await subscribableElectrumXClient.connect(
// host: newNode.address, port: newNode.port);
} }
//============================================================================ //============================================================================