INCOMPLETE: WIP use streams instead of change notifier for electrumx socket subscriptions

This commit is contained in:
julian 2024-02-04 09:33:49 +07:00
parent 1b81af1e7e
commit 2be13a89c5
2 changed files with 25 additions and 24 deletions

View file

@ -12,17 +12,16 @@ import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:stackwallet/electrumx_rpc/electrumx_client.dart';
import 'package:stackwallet/utilities/logger.dart';
class ElectrumXSubscription with ChangeNotifier {
dynamic _response;
dynamic get response => _response;
set response(dynamic newData) {
_response = newData;
notifyListeners();
}
class ElectrumXSubscription {
final StreamController<dynamic> _controller =
StreamController(); // TODO controller params
Stream<dynamic> get responseStream => _controller.stream;
void addToStream(dynamic data) => _controller.add(data);
}
class SocketTask {
@ -192,13 +191,13 @@ class SubscribableElectrumXClient {
final scripthash = params.first as String;
final taskId = "blockchain.scripthash.subscribe:$scripthash";
_tasks[taskId]?.subscription?.response = params.last;
_tasks[taskId]?.subscription?.addToStream(params.last);
break;
case "blockchain.headers.subscribe":
final params = response["params"];
const taskId = "blockchain.headers.subscribe";
_tasks[taskId]?.subscription?.response = params.first;
_tasks[taskId]?.subscription?.addToStream(params.first);
break;
default:
break;
@ -230,7 +229,7 @@ class SubscribableElectrumXClient {
if (!(_tasks[id]?.isSubscription ?? false)) {
_tasks.remove(id);
} else {
_tasks[id]?.subscription?.response = data;
_tasks[id]?.subscription?.addToStream(data);
}
}

View file

@ -35,7 +35,8 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
int? get maximumFeerate => null;
int? latestHeight;
int? _latestHeight;
StreamSubscription<dynamic>? _heightSubscription;
static const _kServerBatchCutoffVersion = [1, 6];
List<int>? _serverVersion;
@ -800,14 +801,10 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
final Completer<int> completer = Completer<int>();
try {
// Subscribe to block headers.
final subscription =
subscribableElectrumXClient.subscribeToBlockHeaders();
// Don't add a listener if one already exists.
if (subscription.hasListeners) {
if (latestHeight != null) {
return latestHeight!;
// Don't set a stream subscription if one already exists.
if (_heightSubscription != null) {
if (_latestHeight != null) {
return _latestHeight!;
} else {
// Wait for first response.
return completer.future;
@ -817,15 +814,20 @@ mixin ElectrumXInterface<T extends Bip39HDCurrency> on Bip39HDWallet<T> {
// Make sure we only complete once.
bool isFirstResponse = true;
// Add listener.
subscription.addListener(() {
final response = subscription.response;
// Subscribe to block headers.
final subscription =
subscribableElectrumXClient.subscribeToBlockHeaders();
// set stream subscription
_heightSubscription =
subscription.responseStream.asBroadcastStream().listen((event) {
final response = event;
if (response != null &&
response is Map &&
response.containsKey('height')) {
final int chainHeight = response['height'] as int;
// print("Current chain height: $chainHeight");
latestHeight = chainHeight;
_latestHeight = chainHeight;
if (isFirstResponse) {
isFirstResponse = false;