update fusion.dart to get to runround stage

This commit is contained in:
Jonald Fyookball 2023-08-03 08:50:28 -04:00
parent e91ac3d355
commit 387e4ba99c

View file

@ -204,7 +204,7 @@ class Fusion {
Map<int, int> safety_exess_fees = {};
Map<int, List<int>> tierOutputs ={}; // not sure if this should be using outputs class.
int inactiveTimeLimit = 0;
int inactiveTimeLimit = 600000; // this is in ms... equates to 10 minutes.
int tier = 0;
int covertPort = 0;
bool covertSSL = false;
@ -322,18 +322,13 @@ class Fusion {
await registerAndWait(socketwrapper);
print ("FUSION DEBUG 273");
print ("RETURNING early in fusion_run....");
return;
// launch the covert submitter
CovertSubmitter covert = await start_covert();
CovertSubmitter covert = await startCovert();
try {
// Pool started. Keep running rounds until fail or complete.
while (true) {
roundcount += 1;
if (await run_round(covert)) {
if (await runRound(covert)) {
break;
}
}
@ -344,6 +339,9 @@ class Fusion {
(await connection)?.close();
}
print ("RETURNING early in fusion_run....");
return;
for (int i = 0; i < 60; i++) {
if (stopping) {
break; // not an error
@ -389,20 +387,6 @@ class Fusion {
Future<CovertSubmitter> start_covert() async {
// Function implementation here...
// For now, just return a new instance of CovertSubmitter
return CovertSubmitter("dummy",0,true,"some_host",0,0,0,0);
}
Future<bool> run_round(CovertSubmitter covert) async {
// function implementation here...
// placeholder return statement
return Future.value(false);
}
void notify_server_status(bool b, {Tuple? tup}) {
// Function implementation goes here
@ -823,7 +807,7 @@ static double nextDoubleNonZero(Random rng) {
Future<void> registerAndWait(SocketWrapper socketwrapper) async {
print ("DEBUG register and wait top.");
var stopwatch = Stopwatch()..start();
// msg can be different classes depending on which protobuf msg is sent.
dynamic? msg;
@ -865,11 +849,10 @@ static double nextDoubleNonZero(Random rng) {
var tiersStrings = {for (var entry in tierOutputs.entries) entry.key: (entry.key * 1e-8).toStringAsFixed(8).replaceAll(RegExp(r'0+$'), '')};
while (true) {
print ("RECEIVE LOOP 870............DEBUG");
var msg = await recv2(socketwrapper,['tierstatusupdate', 'fusionbegin'], timeout: Duration(seconds: 10));
msg = await recv2(socketwrapper,['tierstatusupdate', 'fusionbegin'], timeout: Duration(seconds: 10));
var fieldInfoFusionBegin = msg.info_.byName["fusionbegin"];
if (fieldInfoFusionBegin != null && msg.hasField(fieldInfoFusionBegin.tagNumber)) {
print ("DEBUG 867 Fusion Begin message...");
break;
}
@ -886,7 +869,7 @@ static double nextDoubleNonZero(Random rng) {
}
bool messageIsTierStatusUpdate = msg.hasField(fieldInfo.tagNumber);
print ("DEBUG 889 getting tier update.");
if (!messageIsTierStatusUpdate) {
throw FusionError('Expected a TierStatusUpdate message');
@ -915,12 +898,8 @@ static double nextDoubleNonZero(Random rng) {
}
var fieldInfoTimeRemaining = entry.value.info_.byName["timeRemaining"];
if (fieldInfoTimeRemaining == null) {
throw FusionError('Expected field not found in message: timeRemaining');
}
if (entry.value.hasField(fieldInfoTimeRemaining.tagNumber)) {
int tr = entry.value.timeRemaining.toInt();
if (besttime == null || tr < besttime) {
besttime = tr;
@ -933,7 +912,9 @@ static double nextDoubleNonZero(Random rng) {
var displayBest = <String>[];
var displayMid = <String>[];
var displayQueued = <String>[];
for (var tier in tiersSorted) {
if (statuses.containsKey(tier)) {
var tierStr = tiersStrings[tier];
if (tierStr == null) {
@ -951,6 +932,7 @@ static double nextDoubleNonZero(Random rng) {
}
}
var parts = <String>[];
if (displayBest.isNotEmpty || displayMid.isNotEmpty) {
parts.add("Tiers: ${displayBest.join(', ')} ${displayMid.join(', ')}");
@ -961,11 +943,16 @@ static double nextDoubleNonZero(Random rng) {
var tiersString = parts.join(' ');
if (besttime == null && inactiveTimeLimit != null) {
if (DateTime.now().millisecondsSinceEpoch > inactiveTimeLimit) {
if (stopwatch.elapsedMilliseconds > inactiveTimeLimit) {
throw FusionError('stopping due to inactivity');
} else {
;
}
} else {
;
}
if (besttime != null) {
status = Tuple<String, String>('waiting', 'Starting in ${besttime}s. $tiersString');
} else if (maxfraction >= 1) {
@ -977,35 +964,47 @@ static double nextDoubleNonZero(Random rng) {
}
}
var fieldInfoFusionBegin = msg.info_.byName["fusionbegin"];
// Check if fieldInfoFusionBegin is null
if (fieldInfoFusionBegin == null) {
throw FusionError('Expected field not found in message: fusionbegin');
}
bool messageIsFusionBegin = msg.hasField(fieldInfoFusionBegin.tagNumber);
// Print whether the message is FusionBegin or not
if (!messageIsFusionBegin) {
throw FusionError('Expected a FusionBegin message');
}
t_fusionBegin = DateTime.now();
FusionBegin fusionBeginMsg = msg.fusionbegin;
var clockMismatch = msg.serverTime - DateTime.now().millisecondsSinceEpoch / 1000;
if (clockMismatch.abs() > Protocol.MAX_CLOCK_DISCREPANCY) {
throw FusionError("Clock mismatch too large: ${clockMismatch.toStringAsFixed(3)}.");
var elapsedSeconds = (stopwatch.elapsedMilliseconds / 1000).toInt();
var clockMismatch = fusionBeginMsg.serverTime.toInt() - DateTime.now().millisecondsSinceEpoch / 1000;
if (clockMismatch.abs().toDouble() > Protocol.MAX_CLOCK_DISCREPANCY) {
throw FusionError("Clock mismatch too large: ${(clockMismatch.toDouble()).toStringAsFixed(3)}.");
}
tier = msg.tier;
tier = fusionBeginMsg.tier.toInt();
if (msg is FusionBegin) {
covertDomainB = Uint8List.fromList(msg.covertDomain);
}
covertPort = msg.covertPort;
covertSSL = msg.covertSSL;
beginTime = msg.serverTime;
covertPort = fusionBeginMsg.covertPort;
covertSSL = fusionBeginMsg.covertSsl;
beginTime = fusionBeginMsg.serverTime.toDouble();
lastHash = Util.calcInitialHash(tier, covertDomainB, covertPort, covertSSL, beginTime);
@ -1021,8 +1020,11 @@ static double nextDoubleNonZero(Random rng) {
}
Future<CovertSubmitter> startCovert() async {
print ("DEBUG START COVERT!");
status = Tuple('running', 'Setting up Tor connections');
String covertDomain;
try {
covertDomain = utf8.decode(covertDomainB);
@ -1047,6 +1049,8 @@ static double nextDoubleNonZero(Random rng) {
connectTimeout: Protocol.COVERT_CONNECT_TIMEOUT.toInt()
);
print ("DEBUG return early from covert");
return covert;
// loop until a just a bit before we're expecting startRound, watching for status updates
final tend = t_fusionBegin.add(Duration(seconds: (Protocol.WARMUP_TIME - Protocol.WARMUP_SLOP - 1).round()));
@ -1074,7 +1078,9 @@ static double nextDoubleNonZero(Random rng) {
}
void runRound(CovertSubmitter covert) async {
Future<bool> runRound(CovertSubmitter covert) async {
print ("START OF RUN ROUND");
status = Tuple('running', 'Starting round ${roundcount.toString()}');
int timeoutInSeconds = (2 * Protocol.WARMUP_SLOP + Protocol.STANDARD_TIMEOUT).toInt();
var msg = await recv(['startround'], timeout: Duration(seconds: timeoutInSeconds));
@ -1158,6 +1164,9 @@ static double nextDoubleNonZero(Random rng) {
final blindSigRequests = blindNoncePoints.map((e) => Schnorr.BlindSignatureRequest(roundPubKey, e, sha256(myComponents.elementAt(e)))).toList();
*/
print ("RETURNING EARLY FROM run round .....");
return true;
final randomNumber = Util.getRandomBytes(32);
covert.checkOk();
check_stop();
@ -1581,8 +1590,8 @@ static double nextDoubleNonZero(Random rng) {
// itself needs to check blockchain.
await recv(['restartround'], timeout: Duration(seconds: 2 * (Protocol.STANDARD_TIMEOUT.round() + Protocol.BLAME_VERIFY_TIME.round())));
} // end of run_round() function.
return true;
} // end of runRound() function.