diff --git a/processor/src/main.rs b/processor/src/main.rs index b080c379..84585804 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -116,11 +116,62 @@ async fn prepare_send( } } +// Items which are mutably borrowed by Tributary. +// Any exceptions to this have to be carefully monitored in order to ensure consistency isn't +// violated. +struct TributaryMutable { + // The following are actually mutably borrowed by Substrate as well. + // - Substrate triggers key gens, and determines which to use. + // - SubstrateBlock events cause scheduling which causes signing. + // + // This is still considered Tributary-mutable as most mutation (preprocesses/shares) happens by + // the Tributary. + // + // Creation of tasks is by Substrate, yet this is safe since the mutable borrow is transferred to + // Tributary. + // + // Tributary stops mutating a key gen attempt before Substrate is made aware of it, ensuring + // Tributary drops its mutable borrow before Substrate acquires it. Tributary will maintain a + // mutable borrow on the *key gen task*, yet the finalization code can successfully run for any + // attempt. + // + // The only other note is how the scanner may cause a signer task to be dropped, effectively + // invalidating the Tributary's mutable borrow. The signer is coded to allow for attempted usage + // of a dropped task. + key_gen: KeyGen, + signers: HashMap, Signer>, + + // This isn't mutably borrowed by Substrate. It is also mutably borrowed by the Scanner. + // The safety of this is from the Scanner starting new sign tasks, and Tributary only mutating + // already-created sign tasks. The Scanner does not mutate sign tasks post-creation. + substrate_signers: HashMap, SubstrateSigner>, +} + +// Items which are mutably borrowed by Substrate. +// Any exceptions to this have to be carefully monitored in order to ensure consistency isn't +// violated. +struct SubstrateMutable { + // The scanner is expected to autonomously operate, scanning blocks as they appear. + // When a block is sufficiently confirmed, the scanner mutates the signer to try and get a Batch + // signed. + // The scanner itself only mutates its list of finalized blocks and in-memory state though. + // Disk mutations to the scan-state only happen when Substrate says to. + + // This can't be mutated as soon as a Batch is signed since the mutation which occurs then is + // paired with the mutations caused by Burn events. Substrate's ordering determines if such a + // pairing exists. + scanner: ScannerHandle, + + // Schedulers take in new outputs, from the scanner, and payments, from Burn events on Substrate. + // These are paired when possible, in the name of efficiency. Accordingly, both mutations must + // happen by Substrate. + schedulers: HashMap, Scheduler>, +} + async fn sign_plans( db: &mut MainDb, coin: &C, - scanner: &ScannerHandle, - schedulers: &mut HashMap, Scheduler>, + substrate_mutable: &mut SubstrateMutable, signers: &mut HashMap, Signer>, context: SubstrateContext, plans: Vec>, @@ -129,7 +180,8 @@ async fn sign_plans( let mut block_hash = >::Id::default(); block_hash.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0); - let block_number = scanner + let block_number = substrate_mutable + .scanner .block_number(&block_hash) .await .expect("told to sign_plans on a context we're not synced to"); @@ -153,26 +205,192 @@ async fn sign_plans( // The key_gen/scanner/signer are designed to be deterministic to new data, irrelevant to prior // states. for branch in branches { - schedulers + substrate_mutable + .schedulers .get_mut(key.as_ref()) .expect("didn't have a scheduler for a key we have a plan for") .created_output(branch.expected, branch.actual); } if let Some((tx, eventuality)) = tx { - scanner.register_eventuality(block_number, id, eventuality.clone()).await; + substrate_mutable.scanner.register_eventuality(block_number, id, eventuality.clone()).await; signers.get_mut(key.as_ref()).unwrap().sign_transaction(id, tx, eventuality).await; } } } -async fn run(raw_db: D, coin: C, mut coordinator: Co) { - // We currently expect a contextless bidirectional mapping between these two values - // (which is that any value of A can be interpreted as B and vice versa) - // While we can write a contextual mapping, we have yet to do so - // This check ensures no coin which doesn't have a bidirectional mapping is defined - assert_eq!(>::Id::default().as_ref().len(), BlockHash([0u8; 32]).0.len()); +async fn handle_coordinator_msg( + raw_db: &D, + main_db: &mut MainDb, + coin: &C, + coordinator: &mut Co, + tributary_mutable: &mut TributaryMutable, + substrate_mutable: &mut SubstrateMutable, + msg: Message, +) { + // If this message expects a higher block number than we have, halt until synced + async fn wait(scanner: &ScannerHandle, block_hash: &BlockHash) { + let mut needed_hash = >::Id::default(); + needed_hash.as_mut().copy_from_slice(&block_hash.0); + let block_number; + loop { + // Ensure our scanner has scanned this block, which means our daemon has this block at + // a sufficient depth + let Some(block_number_inner) = scanner.block_number(&needed_hash).await else { + warn!( + "node is desynced. we haven't scanned {} which should happen after {} confirms", + hex::encode(&needed_hash), + C::CONFIRMATIONS, + ); + sleep(Duration::from_secs(10)).await; + continue; + }; + block_number = block_number_inner; + break; + } + + // While the scanner has cemented this block, that doesn't mean it's been scanned for all + // keys + // ram_scanned will return the lowest scanned block number out of all keys + while scanner.ram_scanned().await < block_number { + sleep(Duration::from_secs(1)).await; + } + + // TODO: Sanity check we got an AckBlock (or this is the AckBlock) for the block in + // question + + /* + let synced = |context: &SubstrateContext, key| -> Result<(), ()> { + // Check that we've synced this block and can actually operate on it ourselves + let latest = scanner.latest_scanned(key); + if usize::try_from(context.coin_latest_block_number).unwrap() < latest { + log::warn!( + "coin node disconnected/desynced from rest of the network. \ + our block: {latest:?}, network's acknowledged: {}", + context.coin_latest_block_number + ); + Err(())?; + } + Ok(()) + }; + */ + } + + if let Some(required) = msg.msg.required_block() { + // wait only reads from, it doesn't mutate, the scanner + wait(&substrate_mutable.scanner, &required).await; + } + + // TODO: Shouldn't we create a txn here and pass it around as needed? + // The txn would ack this message ID. If we detect this mesage ID as handled in the DB, + // we'd move on here. Only after committing the TX would we report it as acked. + + match msg.msg.clone() { + CoordinatorMessage::KeyGen(msg) => { + match tributary_mutable.key_gen.handle(msg).await { + // This should only occur when Substrate confirms a key, enabling access of + // substrate_mutable + // TODO: Move this under Substrate accordingly + KeyGenEvent::KeyConfirmed { activation_block, substrate_keys, coin_keys } => { + tributary_mutable.substrate_signers.insert( + substrate_keys.group_key().to_bytes().to_vec(), + SubstrateSigner::new(raw_db.clone(), substrate_keys), + ); + + let key = coin_keys.group_key(); + + let mut activation_block_hash = >::Id::default(); + activation_block_hash.as_mut().copy_from_slice(&activation_block.0); + let activation_number = substrate_mutable + .scanner + .block_number(&activation_block_hash) + .await + .expect("KeyConfirmed from context we haven't synced"); + + substrate_mutable.scanner.rotate_key(activation_number, key).await; + substrate_mutable + .schedulers + .insert(key.to_bytes().as_ref().to_vec(), Scheduler::::new(key)); + tributary_mutable.signers.insert( + key.to_bytes().as_ref().to_vec(), + Signer::new(raw_db.clone(), coin.clone(), coin_keys), + ); + } + + // TODO: This may be fired multiple times. What's our plan for that? + KeyGenEvent::ProcessorMessage(msg) => { + coordinator.send(ProcessorMessage::KeyGen(msg)).await; + } + } + } + + CoordinatorMessage::Sign(msg) => { + tributary_mutable.signers.get_mut(msg.key()).unwrap().handle(msg).await; + } + + CoordinatorMessage::Coordinator(msg) => { + tributary_mutable.substrate_signers.get_mut(msg.key()).unwrap().handle(msg).await; + } + + CoordinatorMessage::Substrate(msg) => { + match msg { + messages::substrate::CoordinatorMessage::SubstrateBlock { + context, + key: key_vec, + burns, + } => { + let mut block_id = >::Id::default(); + block_id.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0); + + let key = ::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); + + // We now have to acknowledge every block for this key up to the acknowledged block + let outputs = substrate_mutable.scanner.ack_up_to_block(key, block_id).await; + + let mut payments = vec![]; + for out in burns { + let OutInstructionWithBalance { + instruction: OutInstruction { address, data }, + balance, + } = out; + if let Ok(address) = C::Address::try_from(address.consume()) { + payments.push(Payment { + address, + data: data.map(|data| data.consume()), + amount: balance.amount.0, + }); + } + } + + let plans = substrate_mutable + .schedulers + .get_mut(&key_vec) + .expect("key we don't have a scheduler for acknowledged a block") + .schedule(outputs, payments); + + sign_plans( + main_db, + coin, + substrate_mutable, + // See commentary in TributaryMutable for why this is safe + &mut tributary_mutable.signers, + context, + plans, + ) + .await; + } + } + } + } + + coordinator.ack(msg).await; +} + +async fn boot( + raw_db: &D, + coin: &C, +) -> (MainDb, TributaryMutable, SubstrateMutable) { let mut entropy_transcript = { let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't provided as an env var")); @@ -189,6 +407,8 @@ async fn run(raw_db: D, coin: C, mut coordinato transcript }; + // TODO: Save a hash of the entropy to the DB and make sure the entropy didn't change + let mut entropy = |label| { let mut challenge = entropy_transcript.challenge(label); let mut res = Zeroizing::new([0; 32]); @@ -200,15 +420,15 @@ async fn run(raw_db: D, coin: C, mut coordinato // We don't need to re-issue GenerateKey orders because the coordinator is expected to // schedule/notify us of new attempts - let mut key_gen = KeyGen::::new(raw_db.clone(), entropy(b"key-gen_entropy")); + let key_gen = KeyGen::::new(raw_db.clone(), entropy(b"key-gen_entropy")); // The scanner has no long-standing orders to re-issue let (mut scanner, active_keys) = Scanner::new(coin.clone(), raw_db.clone()); - let mut schedulers = HashMap::, Scheduler>::new(); + let schedulers = HashMap::, Scheduler>::new(); let mut substrate_signers = HashMap::new(); let mut signers = HashMap::new(); - let mut main_db = MainDb::new(raw_db.clone()); + let main_db = MainDb::new(raw_db.clone()); for key in &active_keys { // TODO: Load existing schedulers @@ -228,15 +448,15 @@ async fn run(raw_db: D, coin: C, mut coordinato for (block_number, plan) in main_db.signing(key.as_ref()) { let block_number = block_number.try_into().unwrap(); - let fee = get_fee(&coin, block_number).await; + let fee = get_fee(coin, block_number).await; let id = plan.id(); info!("reloading plan {}: {:?}", hex::encode(id), plan); let (Some((tx, eventuality)), _) = - prepare_send(&coin, &signer, block_number, fee, plan).await else { - panic!("previously created transaction is no longer being created") - }; + prepare_send(coin, &signer, block_number, fee, plan).await else { + panic!("previously created transaction is no longer being created") + }; scanner.register_eventuality(block_number, id, eventuality.clone()).await; // TODO: Reconsider if the Signer should have the eventuality, or if just the coin/scanner @@ -247,6 +467,22 @@ async fn run(raw_db: D, coin: C, mut coordinato signers.insert(key.as_ref().to_vec(), signer); } + ( + main_db, + TributaryMutable { key_gen, substrate_signers, signers }, + SubstrateMutable { scanner, schedulers }, + ) +} + +async fn run(raw_db: D, coin: C, mut coordinator: Co) { + // We currently expect a contextless bidirectional mapping between these two values + // (which is that any value of A can be interpreted as B and vice versa) + // While we can write a contextual mapping, we have yet to do so + // This check ensures no coin which doesn't have a bidirectional mapping is defined + assert_eq!(>::Id::default().as_ref().len(), BlockHash([0u8; 32]).0.len()); + + let (mut main_db, mut tributary_mutable, mut substrate_mutable) = boot(&raw_db, &coin).await; + // We can't load this from the DB as we can't guarantee atomic increments with the ack function let mut last_coordinator_msg = None; @@ -254,7 +490,7 @@ async fn run(raw_db: D, coin: C, mut coordinato // Check if the signers have events // The signers will only have events after the following select executes, which will then // trigger the loop again, hence why having the code here with no timer is fine - for (key, signer) in signers.iter_mut() { + for (key, signer) in tributary_mutable.signers.iter_mut() { while let Some(msg) = signer.events.pop_front() { match msg { SignerEvent::ProcessorMessage(msg) => { @@ -265,7 +501,9 @@ async fn run(raw_db: D, coin: C, mut coordinato // If we die after calling finish_signing, we'll never fire Completed // TODO: Is that acceptable? Do we need to fire Completed before firing finish_signing? main_db.finish_signing(key, id); - scanner.drop_eventuality(id).await; + // This does mutate the Scanner, yet the eventuality protocol is only run to mutate + // the signer, which is Tributary mutable (and what's currently being mutated) + substrate_mutable.scanner.drop_eventuality(id).await; coordinator .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { key: key.clone(), @@ -286,7 +524,7 @@ async fn run(raw_db: D, coin: C, mut coordinato } } - for (key, signer) in substrate_signers.iter_mut() { + for (key, signer) in tributary_mutable.substrate_signers.iter_mut() { while let Some(msg) = signer.events.pop_front() { match msg { SubstrateSignerEvent::ProcessorMessage(msg) => { @@ -314,158 +552,33 @@ async fn run(raw_db: D, coin: C, mut coordinato assert_eq!(msg.id, (last_coordinator_msg.unwrap_or(msg.id - 1) + 1)); last_coordinator_msg = Some(msg.id); - // If this message expects a higher block number than we have, halt until synced - async fn wait( - scanner: &ScannerHandle, - block_hash: &BlockHash - ) { - let mut needed_hash = >::Id::default(); - needed_hash.as_mut().copy_from_slice(&block_hash.0); + // If we've already handled this message, continue + // TODO - let block_number; - loop { - // Ensure our scanner has scanned this block, which means our daemon has this block at - // a sufficient depth - let Some(block_number_inner) = scanner.block_number(&needed_hash).await else { - warn!( - "node is desynced. we haven't scanned {} which should happen after {} confirms", - hex::encode(&needed_hash), - C::CONFIRMATIONS, - ); - sleep(Duration::from_secs(10)).await; - continue; - }; - block_number = block_number_inner; - break; - } - - // While the scanner has cemented this block, that doesn't mean it's been scanned for all - // keys - // ram_scanned will return the lowest scanned block number out of all keys - while scanner.ram_scanned().await < block_number { - sleep(Duration::from_secs(1)).await; - } - - // TODO: Sanity check we got an AckBlock (or this is the AckBlock) for the block in - // question - - /* - let synced = |context: &SubstrateContext, key| -> Result<(), ()> { - // Check that we've synced this block and can actually operate on it ourselves - let latest = scanner.latest_scanned(key); - if usize::try_from(context.coin_latest_block_number).unwrap() < latest { - log::warn!( - "coin node disconnected/desynced from rest of the network. \ - our block: {latest:?}, network's acknowledged: {}", - context.coin_latest_block_number - ); - Err(())?; - } - Ok(()) - }; - */ - } - - if let Some(required) = msg.msg.required_block() { - wait(&scanner, &required).await; - } - - match msg.msg.clone() { - CoordinatorMessage::KeyGen(msg) => { - match key_gen.handle(msg).await { - KeyGenEvent::KeyConfirmed { activation_block, substrate_keys, coin_keys } => { - substrate_signers.insert( - substrate_keys.group_key().to_bytes().to_vec(), - SubstrateSigner::new(raw_db.clone(), substrate_keys), - ); - - let key = coin_keys.group_key(); - - let mut activation_block_hash = >::Id::default(); - activation_block_hash.as_mut().copy_from_slice(&activation_block.0); - let activation_number = - scanner - .block_number(&activation_block_hash) - .await - .expect("KeyConfirmed from context we haven't synced"); - - scanner.rotate_key(activation_number, key).await; - schedulers.insert(key.to_bytes().as_ref().to_vec(), Scheduler::::new(key)); - signers.insert( - key.to_bytes().as_ref().to_vec(), - Signer::new(raw_db.clone(), coin.clone(), coin_keys) - ); - }, - - // TODO: This may be fired multiple times. What's our plan for that? - KeyGenEvent::ProcessorMessage(msg) => { - coordinator.send(ProcessorMessage::KeyGen(msg)).await; - }, - } - }, - - CoordinatorMessage::Sign(msg) => { - signers.get_mut(msg.key()).unwrap().handle(msg).await; - }, - - CoordinatorMessage::Coordinator(msg) => { - substrate_signers.get_mut(msg.key()).unwrap().handle(msg).await; - }, - - CoordinatorMessage::Substrate(msg) => { - match msg { - messages::substrate::CoordinatorMessage::SubstrateBlock { - context, - key: key_vec, - burns, - } => { - let mut block_id = >::Id::default(); - block_id.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0); - - let key = - ::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); - - // We now have to acknowledge every block for this key up to the acknowledged block - let outputs = scanner.ack_up_to_block(key, block_id).await; - - let mut payments = vec![]; - for out in burns { - let OutInstructionWithBalance { - instruction: OutInstruction { address, data }, - balance, - } = out; - if let Ok(address) = C::Address::try_from(address.consume()) { - payments.push(Payment { - address, - data: data.map(|data| data.consume()), - amount: balance.amount.0, - }); - } - } - - let plans = schedulers - .get_mut(&key_vec) - .expect("key we don't have a scheduler for acknowledged a block") - .schedule(outputs, payments); - - sign_plans( - &mut main_db, - &coin, - &scanner, - &mut schedulers, - &mut signers, - context, - plans - ).await; - } - } - } - } - - coordinator.ack(msg).await; + // This is isolated to better think about how its ordered, or rather, about how the + // following cases aren't ordered + // + // While the coordinator messages are ordered, they're not deterministically ordered + // While Tributary-caused messages are deterministically ordered, and Substrate-caused + // messages are deterministically-ordered, they're both shoved into this singular queue + // The order at which they're shoved in together isn't deterministic + // + // This should be safe so long as Tributary and Substrate messages don't both expect + // mutable references over the same data + // + // TODO: Better assert/guarantee this + handle_coordinator_msg( + &raw_db, + &mut main_db, + &coin, + &mut coordinator, + &mut tributary_mutable, + &mut substrate_mutable, + msg, + ).await; }, - msg = scanner.events.recv() => { + msg = substrate_mutable.scanner.events.recv() => { match msg.unwrap() { ScannerEvent::Block { key, block, batch, outputs } => { let key = key.to_bytes().as_ref().to_vec(); @@ -504,12 +617,13 @@ async fn run(raw_db: D, coin: C, mut coordinato }).collect() }; - substrate_signers.get_mut(&key).unwrap().sign(batch).await; + // Start signing this batch + tributary_mutable.substrate_signers.get_mut(&key).unwrap().sign(batch).await; }, ScannerEvent::Completed(id, tx) => { // We don't know which signer had this plan, so inform all of them - for (_, signer) in signers.iter_mut() { + for (_, signer) in tributary_mutable.signers.iter_mut() { signer.eventuality_completion(id, &tx).await; } }, diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 867e6cf2..419a1b9e 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -240,7 +240,7 @@ impl ScannerHandle { } pub async fn register_eventuality( - &self, + &mut self, block_number: usize, id: [u8; 32], eventuality: C::Eventuality, @@ -248,7 +248,7 @@ impl ScannerHandle { self.scanner.write().await.eventualities.register(block_number, id, eventuality) } - pub async fn drop_eventuality(&self, id: [u8; 32]) { + pub async fn drop_eventuality(&mut self, id: [u8; 32]) { self.scanner.write().await.eventualities.drop(id); } @@ -259,7 +259,7 @@ impl ScannerHandle { /// If a key has been prior set, both keys will be scanned for as detailed in the Multisig /// documentation. The old key will eventually stop being scanned for, leaving just the /// updated-to key. - pub async fn rotate_key(&self, activation_number: usize, key: ::G) { + pub async fn rotate_key(&mut self, activation_number: usize, key: ::G) { let mut scanner = self.scanner.write().await; if !scanner.keys.is_empty() { // Protonet will have a single, static validator set @@ -281,7 +281,7 @@ impl ScannerHandle { /// Acknowledge having handled a block for a key. pub async fn ack_up_to_block( - &self, + &mut self, key: ::G, id: >::Id, ) -> Vec { diff --git a/processor/src/signer.rs b/processor/src/signer.rs index f1a359df..8abb0d8a 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -144,7 +144,8 @@ impl Signer { // Check the attempt lines up match self.attempt.get(&id.id) { // If we don't have an attempt logged, it's because the coordinator is faulty OR because we - // rebooted + // rebooted OR we detected the signed transaction on chain, so there's notable network + // latency/a malicious validator None => { warn!( "not attempting {} #{}. this is an error if we didn't reboot", diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs index 69cd3e8d..7650eb5e 100644 --- a/processor/src/tests/scanner.rs +++ b/processor/src/tests/scanner.rs @@ -27,7 +27,7 @@ pub async fn test_scanner(coin: C) { let first = Arc::new(Mutex::new(true)); let db = MemDb::new(); let new_scanner = || async { - let (scanner, active_keys) = Scanner::new(coin.clone(), db.clone()); + let (mut scanner, active_keys) = Scanner::new(coin.clone(), db.clone()); let mut first = first.lock().unwrap(); if *first { assert!(active_keys.is_empty());