diff --git a/lib/services/cashfusion/fusion.dart b/lib/services/cashfusion/fusion.dart index ecef0f855..c3bf2bc55 100644 --- a/lib/services/cashfusion/fusion.dart +++ b/lib/services/cashfusion/fusion.dart @@ -204,7 +204,7 @@ class Fusion { Map safety_exess_fees = {}; Map> tierOutputs ={}; // not sure if this should be using outputs class. - int inactiveTimeLimit = 0; + int inactiveTimeLimit = 600000; // this is in ms... equates to 10 minutes. int tier = 0; int covertPort = 0; bool covertSSL = false; @@ -322,18 +322,13 @@ class Fusion { await registerAndWait(socketwrapper); - print ("FUSION DEBUG 273"); - print ("RETURNING early in fusion_run...."); - return; - - // launch the covert submitter - CovertSubmitter covert = await start_covert(); + CovertSubmitter covert = await startCovert(); try { // Pool started. Keep running rounds until fail or complete. while (true) { roundcount += 1; - if (await run_round(covert)) { + if (await runRound(covert)) { break; } } @@ -344,6 +339,9 @@ class Fusion { (await connection)?.close(); } + print ("RETURNING early in fusion_run...."); + return; + for (int i = 0; i < 60; i++) { if (stopping) { break; // not an error @@ -389,20 +387,6 @@ class Fusion { - Future start_covert() async { - // Function implementation here... - - // For now, just return a new instance of CovertSubmitter - return CovertSubmitter("dummy",0,true,"some_host",0,0,0,0); - } - - - Future run_round(CovertSubmitter covert) async { - // function implementation here... - - // placeholder return statement - return Future.value(false); - } void notify_server_status(bool b, {Tuple? tup}) { // Function implementation goes here @@ -823,7 +807,7 @@ static double nextDoubleNonZero(Random rng) { Future registerAndWait(SocketWrapper socketwrapper) async { - print ("DEBUG register and wait top."); + var stopwatch = Stopwatch()..start(); // msg can be different classes depending on which protobuf msg is sent. dynamic? msg; @@ -865,11 +849,10 @@ static double nextDoubleNonZero(Random rng) { var tiersStrings = {for (var entry in tierOutputs.entries) entry.key: (entry.key * 1e-8).toStringAsFixed(8).replaceAll(RegExp(r'0+$'), '')}; while (true) { - print ("RECEIVE LOOP 870............DEBUG"); - var msg = await recv2(socketwrapper,['tierstatusupdate', 'fusionbegin'], timeout: Duration(seconds: 10)); - + msg = await recv2(socketwrapper,['tierstatusupdate', 'fusionbegin'], timeout: Duration(seconds: 10)); var fieldInfoFusionBegin = msg.info_.byName["fusionbegin"]; if (fieldInfoFusionBegin != null && msg.hasField(fieldInfoFusionBegin.tagNumber)) { + print ("DEBUG 867 Fusion Begin message..."); break; } @@ -886,7 +869,7 @@ static double nextDoubleNonZero(Random rng) { } bool messageIsTierStatusUpdate = msg.hasField(fieldInfo.tagNumber); - + print ("DEBUG 889 getting tier update."); if (!messageIsTierStatusUpdate) { throw FusionError('Expected a TierStatusUpdate message'); @@ -915,12 +898,8 @@ static double nextDoubleNonZero(Random rng) { } var fieldInfoTimeRemaining = entry.value.info_.byName["timeRemaining"]; - if (fieldInfoTimeRemaining == null) { - throw FusionError('Expected field not found in message: timeRemaining'); - } if (entry.value.hasField(fieldInfoTimeRemaining.tagNumber)) { - int tr = entry.value.timeRemaining.toInt(); if (besttime == null || tr < besttime) { besttime = tr; @@ -933,7 +912,9 @@ static double nextDoubleNonZero(Random rng) { var displayBest = []; var displayMid = []; var displayQueued = []; + for (var tier in tiersSorted) { + if (statuses.containsKey(tier)) { var tierStr = tiersStrings[tier]; if (tierStr == null) { @@ -951,6 +932,7 @@ static double nextDoubleNonZero(Random rng) { } } + var parts = []; if (displayBest.isNotEmpty || displayMid.isNotEmpty) { parts.add("Tiers: ${displayBest.join(', ')} ${displayMid.join(', ')}"); @@ -961,11 +943,16 @@ static double nextDoubleNonZero(Random rng) { var tiersString = parts.join(' '); if (besttime == null && inactiveTimeLimit != null) { - if (DateTime.now().millisecondsSinceEpoch > inactiveTimeLimit) { + if (stopwatch.elapsedMilliseconds > inactiveTimeLimit) { throw FusionError('stopping due to inactivity'); + } else { + ; } + } else { + ; } + if (besttime != null) { status = Tuple('waiting', 'Starting in ${besttime}s. $tiersString'); } else if (maxfraction >= 1) { @@ -977,35 +964,47 @@ static double nextDoubleNonZero(Random rng) { } } + + var fieldInfoFusionBegin = msg.info_.byName["fusionbegin"]; + +// Check if fieldInfoFusionBegin is null if (fieldInfoFusionBegin == null) { throw FusionError('Expected field not found in message: fusionbegin'); } bool messageIsFusionBegin = msg.hasField(fieldInfoFusionBegin.tagNumber); + +// Print whether the message is FusionBegin or not if (!messageIsFusionBegin) { throw FusionError('Expected a FusionBegin message'); } - t_fusionBegin = DateTime.now(); + FusionBegin fusionBeginMsg = msg.fusionbegin; - var clockMismatch = msg.serverTime - DateTime.now().millisecondsSinceEpoch / 1000; - if (clockMismatch.abs() > Protocol.MAX_CLOCK_DISCREPANCY) { - throw FusionError("Clock mismatch too large: ${clockMismatch.toStringAsFixed(3)}."); + var elapsedSeconds = (stopwatch.elapsedMilliseconds / 1000).toInt(); + var clockMismatch = fusionBeginMsg.serverTime.toInt() - DateTime.now().millisecondsSinceEpoch / 1000; + + + + if (clockMismatch.abs().toDouble() > Protocol.MAX_CLOCK_DISCREPANCY) { + throw FusionError("Clock mismatch too large: ${(clockMismatch.toDouble()).toStringAsFixed(3)}."); } - tier = msg.tier; + + tier = fusionBeginMsg.tier.toInt(); + if (msg is FusionBegin) { covertDomainB = Uint8List.fromList(msg.covertDomain); } - covertPort = msg.covertPort; - covertSSL = msg.covertSSL; - beginTime = msg.serverTime; + covertPort = fusionBeginMsg.covertPort; + covertSSL = fusionBeginMsg.covertSsl; + beginTime = fusionBeginMsg.serverTime.toDouble(); lastHash = Util.calcInitialHash(tier, covertDomainB, covertPort, covertSSL, beginTime); @@ -1021,8 +1020,11 @@ static double nextDoubleNonZero(Random rng) { } Future startCovert() async { + + print ("DEBUG START COVERT!"); status = Tuple('running', 'Setting up Tor connections'); + String covertDomain; try { covertDomain = utf8.decode(covertDomainB); @@ -1047,6 +1049,8 @@ static double nextDoubleNonZero(Random rng) { connectTimeout: Protocol.COVERT_CONNECT_TIMEOUT.toInt() ); + print ("DEBUG return early from covert"); + return covert; // loop until a just a bit before we're expecting startRound, watching for status updates final tend = t_fusionBegin.add(Duration(seconds: (Protocol.WARMUP_TIME - Protocol.WARMUP_SLOP - 1).round())); @@ -1074,7 +1078,9 @@ static double nextDoubleNonZero(Random rng) { } - void runRound(CovertSubmitter covert) async { + Future runRound(CovertSubmitter covert) async { + print ("START OF RUN ROUND"); + status = Tuple('running', 'Starting round ${roundcount.toString()}'); int timeoutInSeconds = (2 * Protocol.WARMUP_SLOP + Protocol.STANDARD_TIMEOUT).toInt(); var msg = await recv(['startround'], timeout: Duration(seconds: timeoutInSeconds)); @@ -1158,6 +1164,9 @@ static double nextDoubleNonZero(Random rng) { final blindSigRequests = blindNoncePoints.map((e) => Schnorr.BlindSignatureRequest(roundPubKey, e, sha256(myComponents.elementAt(e)))).toList(); */ + print ("RETURNING EARLY FROM run round ....."); + return true; + final randomNumber = Util.getRandomBytes(32); covert.checkOk(); check_stop(); @@ -1581,8 +1590,8 @@ static double nextDoubleNonZero(Random rng) { // itself needs to check blockchain. await recv(['restartround'], timeout: Duration(seconds: 2 * (Protocol.STANDARD_TIMEOUT.round() + Protocol.BLAME_VERIFY_TIME.round()))); - - } // end of run_round() function. + return true; + } // end of runRound() function.