From 2be13a89c545991d013a846987311c5cfdcb3400 Mon Sep 17 00:00:00 2001 From: julian Date: Sun, 4 Feb 2024 09:33:49 +0700 Subject: [PATCH] INCOMPLETE: WIP use streams instead of change notifier for electrumx socket subscriptions --- .../subscribable_electrumx_client.dart | 21 +++++++------- .../electrumx_interface.dart | 28 ++++++++++--------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/lib/electrumx_rpc/subscribable_electrumx_client.dart b/lib/electrumx_rpc/subscribable_electrumx_client.dart index 35f20ad09..ca4d8bd9f 100644 --- a/lib/electrumx_rpc/subscribable_electrumx_client.dart +++ b/lib/electrumx_rpc/subscribable_electrumx_client.dart @@ -12,17 +12,16 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; -import 'package:flutter/foundation.dart'; import 'package:stackwallet/electrumx_rpc/electrumx_client.dart'; import 'package:stackwallet/utilities/logger.dart'; -class ElectrumXSubscription with ChangeNotifier { - dynamic _response; - dynamic get response => _response; - set response(dynamic newData) { - _response = newData; - notifyListeners(); - } +class ElectrumXSubscription { + final StreamController _controller = + StreamController(); // TODO controller params + + Stream get responseStream => _controller.stream; + + void addToStream(dynamic data) => _controller.add(data); } class SocketTask { @@ -192,13 +191,13 @@ class SubscribableElectrumXClient { final scripthash = params.first as String; final taskId = "blockchain.scripthash.subscribe:$scripthash"; - _tasks[taskId]?.subscription?.response = params.last; + _tasks[taskId]?.subscription?.addToStream(params.last); break; case "blockchain.headers.subscribe": final params = response["params"]; const taskId = "blockchain.headers.subscribe"; - _tasks[taskId]?.subscription?.response = params.first; + _tasks[taskId]?.subscription?.addToStream(params.first); break; default: break; @@ -230,7 +229,7 @@ class SubscribableElectrumXClient { if (!(_tasks[id]?.isSubscription ?? false)) { _tasks.remove(id); } else { - _tasks[id]?.subscription?.response = data; + _tasks[id]?.subscription?.addToStream(data); } } diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart index 6b90245eb..502ec57ae 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/electrumx_interface.dart @@ -35,7 +35,8 @@ mixin ElectrumXInterface on Bip39HDWallet { int? get maximumFeerate => null; - int? latestHeight; + int? _latestHeight; + StreamSubscription? _heightSubscription; static const _kServerBatchCutoffVersion = [1, 6]; List? _serverVersion; @@ -800,14 +801,10 @@ mixin ElectrumXInterface on Bip39HDWallet { final Completer completer = Completer(); try { - // Subscribe to block headers. - final subscription = - subscribableElectrumXClient.subscribeToBlockHeaders(); - - // Don't add a listener if one already exists. - if (subscription.hasListeners) { - if (latestHeight != null) { - return latestHeight!; + // Don't set a stream subscription if one already exists. + if (_heightSubscription != null) { + if (_latestHeight != null) { + return _latestHeight!; } else { // Wait for first response. return completer.future; @@ -817,15 +814,20 @@ mixin ElectrumXInterface on Bip39HDWallet { // Make sure we only complete once. bool isFirstResponse = true; - // Add listener. - subscription.addListener(() { - final response = subscription.response; + // Subscribe to block headers. + final subscription = + subscribableElectrumXClient.subscribeToBlockHeaders(); + + // set stream subscription + _heightSubscription = + subscription.responseStream.asBroadcastStream().listen((event) { + final response = event; if (response != null && response is Map && response.containsKey('height')) { final int chainHeight = response['height'] as int; // print("Current chain height: $chainHeight"); - latestHeight = chainHeight; + _latestHeight = chainHeight; if (isFirstResponse) { isFirstResponse = false;