diff --git a/processor/src/db.rs b/processor/src/db.rs
index 923a7e84..d9803b77 100644
--- a/processor/src/db.rs
+++ b/processor/src/db.rs
@@ -15,16 +15,24 @@ impl<C: Coin, D: Db> MainDb<C, D> {
     D::key(b"MAIN", dst, key)
   }
 
+  fn handled_key(id: u64) -> Vec<u8> {
+    Self::main_key(b"handled", id.to_le_bytes())
+  }
+  pub fn handled_message(&self, id: u64) -> bool {
+    self.0.get(Self::handled_key(id)).is_some()
+  }
+  pub fn handle_message(txn: &mut D::Transaction<'_>, id: u64) {
+    txn.put(Self::handled_key(id), [])
+  }
+
   fn plan_key(id: &[u8]) -> Vec<u8> {
     Self::main_key(b"plan", id)
   }
   fn signing_key(key: &[u8]) -> Vec<u8> {
     Self::main_key(b"signing", key)
   }
-  pub fn save_signing(&mut self, key: &[u8], block_number: u64, plan: &Plan<C>) {
+  pub fn save_signing(txn: &mut D::Transaction<'_>, key: &[u8], block_number: u64, plan: &Plan<C>) {
     let id = plan.id();
-    // Creating a TXN here is arguably an anti-pattern, yet nothing here expects atomicity
-    let mut txn = self.0.txn();
 
     {
       let mut signing = txn.get(Self::signing_key(key)).unwrap_or(vec![]);
@@ -46,8 +54,6 @@ impl<C: Coin, D: Db> MainDb<C, D> {
       plan.write(&mut buf).unwrap();
       txn.put(Self::plan_key(&id), &buf);
     }
-
-    txn.commit();
   }
 
   pub fn signing(&self, key: &[u8]) -> Vec<(u64, Plan<C>)> {
@@ -68,7 +74,7 @@ impl<C: Coin, D: Db> MainDb<C, D> {
     res
   }
 
-  pub fn finish_signing(&mut self, key: &[u8], id: [u8; 32]) {
+  pub fn finish_signing(&mut self, txn: &mut D::Transaction<'_>, key: &[u8], id: [u8; 32]) {
     let mut signing = self.0.get(Self::signing_key(key)).unwrap_or(vec![]);
     assert_eq!(signing.len() % 32, 0);
 
@@ -87,8 +93,6 @@ impl<C: Coin, D: Db> MainDb<C, D> {
       log::warn!("told to finish signing {} yet wasn't actively signing it", hex::encode(id));
     }
 
-    let mut txn = self.0.txn();
     txn.put(Self::signing_key(key), signing);
-    txn.commit();
   }
 }
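A quick illustration of the bookkeeping save_signing and finish_signing maintain: the "signing" key holds a flat concatenation of 32-byte plan IDs, with each plan body stored under its own "plan" key. The following is a minimal standalone sketch of just that list logic, mirroring the assertions and the warn path above (helper names are illustrative, not the MainDb API):

```rust
// Sketch of the 32-byte-ID list kept under the "signing" key.
// Standalone helpers for illustration; the real logic lives in MainDb.
fn add_signing_id(signing: &mut Vec<u8>, id: [u8; 32]) {
  assert_eq!(signing.len() % 32, 0);
  signing.extend(id);
}

fn finish_signing_id(signing: &mut Vec<u8>, id: [u8; 32]) {
  assert_eq!(signing.len() % 32, 0);
  for i in (0 .. signing.len()).step_by(32) {
    if signing[i .. (i + 32)] == id {
      signing.drain(i .. (i + 32));
      return;
    }
  }
  // Mirrors the log::warn! above: told to finish signing a plan we weren't signing
}
```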
diff --git a/processor/src/key_gen.rs b/processor/src/key_gen.rs
index 1878df36..115a4d1b 100644
--- a/processor/src/key_gen.rs
+++ b/processor/src/key_gen.rs
@@ -28,7 +28,7 @@ pub struct KeyConfirmed<C: Ciphersuite> {
 }
 
 #[derive(Clone, Debug)]
-struct KeyGenDb<C: Coin, D: Db>(D, PhantomData<C>);
+struct KeyGenDb<C: Coin, D: Db>(PhantomData<D>, PhantomData<C>);
 impl<C: Coin, D: Db> KeyGenDb<C, D> {
   fn key_gen_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
     D::key(b"KEY_GEN", dst, key)
   }
@@ -40,9 +40,9 @@ impl<C: Coin, D: Db> KeyGenDb<C, D> {
   fn save_params(txn: &mut D::Transaction<'_>, set: &ValidatorSet, params: &ThresholdParams) {
     txn.put(Self::params_key(set), bincode::serialize(params).unwrap());
   }
-  fn params(&self, set: &ValidatorSet) -> ThresholdParams {
+  fn params<G: Get>(getter: &G, set: &ValidatorSet) -> ThresholdParams {
     // Directly unwraps the .get() as this will only be called after being set
-    bincode::deserialize(&self.0.get(Self::params_key(set)).unwrap()).unwrap()
+    bincode::deserialize(&getter.get(Self::params_key(set)).unwrap()).unwrap()
   }
 
   // Not scoped to the set since that'd have latter attempts overwrite former
@@ -58,9 +58,9 @@
   ) {
     txn.put(Self::commitments_key(id), bincode::serialize(commitments).unwrap());
   }
-  fn commitments(&self, id: &KeyGenId) -> HashMap<Participant, Vec<u8>> {
+  fn commitments<G: Get>(getter: &G, id: &KeyGenId) -> HashMap<Participant, Vec<u8>> {
     bincode::deserialize::<HashMap<Participant, Vec<u8>>>(
-      &self.0.get(Self::commitments_key(id)).unwrap(),
+      &getter.get(Self::commitments_key(id)).unwrap(),
     )
     .unwrap()
   }
@@ -102,11 +102,11 @@
     txn.put(Self::keys_key(&keys.1.group_key()), keys_vec);
     keys
   }
-  fn keys(
-    &self,
+  fn keys<G: Get>(
+    getter: &G,
     key: &<C::Curve as Ciphersuite>::G,
   ) -> (ThresholdKeys<Ristretto>, ThresholdKeys<C::Curve>) {
-    Self::read_keys(&self.0, &Self::keys_key(key)).1
+    Self::read_keys(getter, &Self::keys_key(key)).1
   }
 }
 
@@ -115,7 +115,7 @@ impl<C: Coin, D: Db> KeyGenDb<C, D> {
 /// 2) It did send its response, and has locally saved enough data to continue
 #[derive(Debug)]
 pub struct KeyGen<C: Coin, D: Db> {
-  db: KeyGenDb<C, D>,
+  db: D,
   entropy: Zeroizing<[u8; 32]>,
 
   active_commit:
@@ -126,23 +126,23 @@ impl<C: Coin, D: Db> KeyGen<C, D> {
   #[allow(clippy::new_ret_no_self)]
   pub fn new(db: D, entropy: Zeroizing<[u8; 32]>) -> KeyGen<C, D> {
-    KeyGen {
-      db: KeyGenDb(db, PhantomData::<C>),
-      entropy,
-
-      active_commit: HashMap::new(),
-      active_share: HashMap::new(),
-    }
+    KeyGen { db, entropy, active_commit: HashMap::new(), active_share: HashMap::new() }
   }
 
   pub fn keys(
     &self,
     key: &<C::Curve as Ciphersuite>::G,
   ) -> (ThresholdKeys<Ristretto>, ThresholdKeys<C::Curve>) {
-    self.db.keys(key)
+    // This is safe, despite not having a txn, since it's a static value
+    // At worst, it's not set when it's expected to be set, yet that should be handled contextually
+    KeyGenDb::<C, D>::keys(&self.db, key)
   }
 
-  pub async fn handle(&mut self, msg: CoordinatorMessage) -> ProcessorMessage {
+  pub async fn handle(
+    &mut self,
+    txn: &mut D::Transaction<'_>,
+    msg: CoordinatorMessage,
+  ) -> ProcessorMessage {
     let context = |id: &KeyGenId| {
       // TODO2: Also embed the chain ID/genesis block
       format!(
@@ -177,11 +177,7 @@ impl<C: Coin, D: Db> KeyGen<C, D> {
           self.active_share.remove(&id.set).is_none()
         {
           // If we haven't handled this set before, save the params
-          // This may overwrite previously written params if we rebooted, yet that isn't a
-          // concern
-          let mut txn = self.db.0.txn();
-          KeyGenDb::<C, D>::save_params(&mut txn, &id.set, &params);
-          txn.commit();
+          KeyGenDb::<C, D>::save_params(txn, &id.set, &params);
         }
 
         let (machines, commitments) = key_gen_machines(id, params);
@@ -202,7 +198,7 @@
           panic!("commitments when already handled commitments");
         }
 
-        let params = self.db.params(&id.set);
+        let params = KeyGenDb::<C, D>::params(txn, &id.set);
 
         // Unwrap the machines, rebuilding them if we didn't have them in our cache
         // We won't if the processor rebooted
@@ -256,9 +252,7 @@
           share.extend(coin_shares[i].serialize());
         }
 
-        let mut txn = self.db.0.txn();
-        KeyGenDb::<C, D>::save_commitments(&mut txn, &id, &commitments);
-        txn.commit();
+        KeyGenDb::<C, D>::save_commitments(txn, &id, &commitments);
 
         ProcessorMessage::Shares { id, shares }
       }
@@ -266,13 +260,13 @@
       CoordinatorMessage::Shares { id, shares } => {
         info!("Received shares for {:?}", id);
 
-        let params = self.db.params(&id.set);
+        let params = KeyGenDb::<C, D>::params(txn, &id.set);
 
         // Same commentary on inconsistency as above exists
         let machines = self.active_share.remove(&id.set).unwrap_or_else(|| {
           let machines = key_gen_machines(id, params).0;
           let mut rng = secret_shares_rng(id);
-          let commitments = self.db.commitments(&id);
+          let commitments = KeyGenDb::<C, D>::commitments(txn, &id);
           let mut commitments_ref: HashMap<Participant, &[u8]> =
             commitments.iter().map(|(i, commitments)| (*i, commitments.as_ref())).collect();
@@ -337,9 +331,7 @@
         let substrate_keys = handle_machine(&mut rng, params, machines.0, &mut shares_ref);
         let coin_keys = handle_machine(&mut rng, params, machines.1, &mut shares_ref);
 
-        let mut txn = self.db.0.txn();
-        KeyGenDb::<C, D>::save_keys(&mut txn, &id, &substrate_keys, &coin_keys);
-        txn.commit();
+        KeyGenDb::<C, D>::save_keys(txn, &id, &substrate_keys, &coin_keys);
 
         let mut coin_keys = ThresholdKeys::new(coin_keys);
         C::tweak_keys(&mut coin_keys);
@@ -354,12 +346,11 @@
 
   pub async fn confirm(
     &mut self,
+    txn: &mut D::Transaction<'_>,
     context: SubstrateContext,
     id: KeyGenId,
   ) -> KeyConfirmed<C::Curve> {
-    let mut txn = self.db.0.txn();
-    let (substrate_keys, coin_keys) = KeyGenDb::<C, D>::confirm_keys(&mut txn, &id);
-    txn.commit();
+    let (substrate_keys, coin_keys) = KeyGenDb::<C, D>::confirm_keys(txn, &id);
 
     info!(
       "Confirmed key pair {} {} from {:?}",
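KeyGenDb is now a pure namespace: it is never instantiated, and its readers are static functions generic over a getter, so the same code path reads committed state through the DB or pending state through an open transaction. A minimal sketch of the shape, with a simplified stand-in for the serai_db Get trait (the details here are assumptions, not the real trait):

```rust
use std::marker::PhantomData;

// Stand-in for the Get trait: anything readable, whether a committed DB or a
// transaction layering pending writes over it.
trait Get {
  fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}

// The DB "struct" is never constructed; it only namespaces key derivation and
// (de)serialization. D is kept as a type parameter only; the real code also
// derives key prefixes from it.
struct ExampleDb<D>(PhantomData<D>);
impl<D> ExampleDb<D> {
  fn params_key() -> Vec<u8> {
    b"example_params".to_vec()
  }
  // Callable with &db (committed state) or &txn (pending state) alike
  fn params<G: Get>(getter: &G) -> Option<Vec<u8>> {
    getter.get(&Self::params_key())
  }
}
```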
diff --git a/processor/src/main.rs b/processor/src/main.rs
index a88d9ddb..c98661a2 100644
--- a/processor/src/main.rs
+++ b/processor/src/main.rs
@@ -8,7 +8,7 @@ use zeroize::{Zeroize, Zeroizing};
 use transcript::{Transcript, RecommendedTranscript};
 use group::GroupEncoding;
-use frost::curve::Ciphersuite;
+use frost::{curve::Ciphersuite, ThresholdKeys};
 
 use log::{info, warn, error};
 use tokio::time::sleep;
@@ -90,14 +90,13 @@ async fn get_fee<C: Coin>(coin: &C, block_number: usize) -> C::Fee {
   }
 }
 
-async fn prepare_send<C: Coin, D: Db>(
+async fn prepare_send<C: Coin>(
   coin: &C,
-  signer: &Signer<C, D>,
+  keys: ThresholdKeys<C::Curve>,
   block_number: usize,
   fee: C::Fee,
   plan: Plan<C>,
 ) -> (Option<(C::SignableTransaction, C::Eventuality)>, Vec<PostFeeBranch>) {
-  let keys = signer.keys().await;
   loop {
     match coin.prepare_send(keys.clone(), block_number, plan.clone(), fee).await {
       Ok(prepared) => {
@@ -173,7 +172,7 @@ struct SubstrateMutable<C: Coin, D: Db> {
 }
 
 async fn sign_plans<C: Coin, D: Db>(
-  db: &mut MainDb<C, D>,
+  txn: &mut D::Transaction<'_>,
   coin: &C,
   substrate_mutable: &mut SubstrateMutable<C, D>,
   signers: &mut HashMap<Vec<u8>, Signer<C, D>>,
@@ -197,17 +196,11 @@
     info!("preparing plan {}: {:?}", hex::encode(id), plan);
 
     let key = plan.key.to_bytes();
-    db.save_signing(key.as_ref(), block_number.try_into().unwrap(), &plan);
+    MainDb::<C, D>::save_signing(txn, key.as_ref(), block_number.try_into().unwrap(), &plan);
     let (tx, branches) =
-      prepare_send(coin, signers.get_mut(key.as_ref()).unwrap(), block_number, fee, plan).await;
+      prepare_send(coin, signers.get_mut(key.as_ref()).unwrap().keys(), block_number, fee, plan)
+        .await;
 
-    // TODO: If we reboot mid-sign_plans, for a DB-backed scheduler, these may be partially
-    // executed
-    // Global TXN object for the entire coordinator message?
-    // Re-ser the scheduler after every sign_plans call?
-    // To clarify, the scheduler is distinct as it mutates itself on new data.
-    // The key_gen/scanner/signer are designed to be deterministic to new data, irrelevant to prior
-    // states.
     for branch in branches {
       substrate_mutable
         .schedulers
@@ -218,19 +211,18 @@
 
     if let Some((tx, eventuality)) = tx {
       substrate_mutable.scanner.register_eventuality(block_number, id, eventuality.clone()).await;
-      signers.get_mut(key.as_ref()).unwrap().sign_transaction(id, tx, eventuality).await;
+      signers.get_mut(key.as_ref()).unwrap().sign_transaction(txn, id, tx, eventuality).await;
     }
   }
 }
 
 async fn handle_coordinator_msg<C: Coin, D: Db, Co: Coordinator>(
-  raw_db: &D,
-  main_db: &mut MainDb<C, D>,
+  txn: &mut D::Transaction<'_>,
   coin: &C,
   coordinator: &mut Co,
   tributary_mutable: &mut TributaryMutable<C, D>,
   substrate_mutable: &mut SubstrateMutable<C, D>,
-  msg: Message,
+  msg: &Message,
 ) {
   // If this message expects a higher block number than we have, halt until synced
   async fn wait<C: Coin, D: Db>(scanner: &ScannerHandle<C, D>, block_hash: &BlockHash) {
@@ -293,15 +285,17 @@
   match msg.msg.clone() {
     CoordinatorMessage::KeyGen(msg) => {
       // TODO: This may be fired multiple times. What's our plan for that?
-      coordinator.send(ProcessorMessage::KeyGen(tributary_mutable.key_gen.handle(msg).await)).await;
+      coordinator
+        .send(ProcessorMessage::KeyGen(tributary_mutable.key_gen.handle(txn, msg).await))
+        .await;
     }
 
     CoordinatorMessage::Sign(msg) => {
-      tributary_mutable.signers.get_mut(msg.key()).unwrap().handle(msg).await;
+      tributary_mutable.signers.get_mut(msg.key()).unwrap().handle(txn, msg).await;
     }
 
     CoordinatorMessage::Coordinator(msg) => {
-      tributary_mutable.substrate_signers.get_mut(msg.key()).unwrap().handle(msg).await;
+      tributary_mutable.substrate_signers.get_mut(msg.key()).unwrap().handle(txn, msg).await;
     }
 
     CoordinatorMessage::Substrate(msg) => {
@@ -309,10 +303,10 @@
         messages::substrate::CoordinatorMessage::ConfirmKeyPair { context, id } => {
           // See TributaryMutable's struct definition for why this block is safe
           let KeyConfirmed { activation_block, substrate_keys, coin_keys } =
-            tributary_mutable.key_gen.confirm(context, id).await;
+            tributary_mutable.key_gen.confirm(txn, context, id).await;
           tributary_mutable.substrate_signers.insert(
             substrate_keys.group_key().to_bytes().to_vec(),
-            SubstrateSigner::new(raw_db.clone(), substrate_keys),
+            SubstrateSigner::new(substrate_keys),
           );
 
           let key = coin_keys.group_key();
@@ -325,15 +319,14 @@
             .await
             .expect("KeyConfirmed from context we haven't synced");
 
-          substrate_mutable.scanner.rotate_key(activation_number, key).await;
+          substrate_mutable.scanner.rotate_key(txn, activation_number, key).await;
           substrate_mutable
             .schedulers
             .insert(key.to_bytes().as_ref().to_vec(), Scheduler::<C>::new(key));
 
-          tributary_mutable.signers.insert(
-            key.to_bytes().as_ref().to_vec(),
-            Signer::new(raw_db.clone(), coin.clone(), coin_keys),
-          );
+          tributary_mutable
+            .signers
+            .insert(key.to_bytes().as_ref().to_vec(), Signer::new(coin.clone(), coin_keys));
         }
 
         messages::substrate::CoordinatorMessage::SubstrateBlock {
@@ -347,11 +340,12 @@
           let key = <C::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap();
 
           // We now have to acknowledge every block for this key up to the acknowledged block
-          let (blocks, outputs) = substrate_mutable.scanner.ack_up_to_block(key, block_id).await;
+          let (blocks, outputs) =
+            substrate_mutable.scanner.ack_up_to_block(txn, key, block_id).await;
           // Since this block was acknowledged, we no longer have to sign the batch for it
           for block in blocks {
             for (_, signer) in tributary_mutable.substrate_signers.iter_mut() {
-              signer.batch_signed(block);
+              signer.batch_signed(txn, block);
             }
           }
@@ -377,7 +371,7 @@
             .schedule(outputs, payments);
 
           sign_plans(
-            main_db,
+            txn,
             coin,
             substrate_mutable,
             // See commentary in TributaryMutable for why this is safe
@@ -390,12 +384,10 @@
       }
     }
   }
-
-  coordinator.ack(msg).await;
 }
 
 async fn boot<C: Coin, D: Db>(
-  raw_db: &D,
+  raw_db: &mut D,
   coin: &C,
 ) -> (MainDb<C, D>, TributaryMutable<C, D>, SubstrateMutable<C, D>) {
   let mut entropy_transcript = {
@@ -443,12 +435,12 @@
     let (substrate_keys, coin_keys) = key_gen.keys(key);
     let substrate_key = substrate_keys.group_key();
-    let substrate_signer = SubstrateSigner::new(raw_db.clone(), substrate_keys);
+    let substrate_signer = SubstrateSigner::new(substrate_keys);
     // We don't have to load any state for this since the Scanner will re-fire any events
     // necessary
     substrate_signers.insert(substrate_key.to_bytes().to_vec(), substrate_signer);
 
-    let mut signer = Signer::new(raw_db.clone(), coin.clone(), coin_keys);
+    let mut signer = Signer::new(coin.clone(), coin_keys);
 
     // Load any TXs being actively signed
     let key = key.to_bytes();
@@ -461,14 +453,17 @@
       info!("reloading plan {}: {:?}", hex::encode(id), plan);
 
       let (Some((tx, eventuality)), _) =
-        prepare_send(coin, &signer, block_number, fee, plan).await else {
+        prepare_send(coin, signer.keys(), block_number, fee, plan).await else {
           panic!("previously created transaction is no longer being created")
         };
 
       scanner.register_eventuality(block_number, id, eventuality.clone()).await;
       // TODO: Reconsider if the Signer should have the eventuality, or if just the coin/scanner
       // should
-      signer.sign_transaction(id, tx, eventuality).await;
+      let mut txn = raw_db.txn();
+      signer.sign_transaction(&mut txn, id, tx, eventuality).await;
+      // This should only have re-writes of existing data
+      drop(txn);
     }
 
     signers.insert(key.as_ref().to_vec(), signer);
@@ -481,14 +476,14 @@
   )
 }
 
-async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinator: Co) {
+async fn run<C: Coin, D: Db, Co: Coordinator>(mut raw_db: D, coin: C, mut coordinator: Co) {
   // We currently expect a contextless bidirectional mapping between these two values
   // (which is that any value of A can be interpreted as B and vice versa)
   // While we can write a contextual mapping, we have yet to do so
   // This check ensures no coin which doesn't have a bidirectional mapping is defined
   assert_eq!(<C::Block as Block<C>>::Id::default().as_ref().len(), BlockHash([0u8; 32]).0.len());
 
-  let (mut main_db, mut tributary_mutable, mut substrate_mutable) = boot(&raw_db, &coin).await;
+  let (mut main_db, mut tributary_mutable, mut substrate_mutable) = boot(&mut raw_db, &coin).await;
 
   // We can't load this from the DB as we can't guarantee atomic increments with the ack function
   let mut last_coordinator_msg = None;
@@ -505,12 +500,6 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
           }
 
          SignerEvent::SignedTransaction { id, tx } => {
-            // If we die after calling finish_signing, we'll never fire Completed
-            // TODO: Is that acceptable? Do we need to fire Completed before firing finish_signing?
-            main_db.finish_signing(key, id);
-            // This does mutate the Scanner, yet the eventuality protocol is only run to mutate
-            // the signer, which is Tributary mutable (and what's currently being mutated)
-            substrate_mutable.scanner.drop_eventuality(id).await;
             coordinator
               .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
                 key: key.clone(),
@@ -519,6 +508,13 @@
               }))
               .await;
 
+            let mut txn = raw_db.txn();
+            // This does mutate the Scanner, yet the eventuality protocol is only run to mutate
+            // the signer, which is Tributary mutable (and what's currently being mutated)
+            substrate_mutable.scanner.drop_eventuality(id).await;
+            main_db.finish_signing(&mut txn, key, id);
+            txn.commit();
+
             // TODO
             // 1) We need to stop signing whenever a peer informs us or the chain has an
             //    eventuality
@@ -559,33 +555,39 @@
         assert_eq!(msg.id, (last_coordinator_msg.unwrap_or(msg.id - 1) + 1));
         last_coordinator_msg = Some(msg.id);
 
-        // If we've already handled this message, continue
-        // TODO
+        // Only handle this if we haven't already
+        if !main_db.handled_message(msg.id) {
+          let mut txn = raw_db.txn();
+          MainDb::<C, D>::handle_message(&mut txn, msg.id);
 
-        // This is isolated to better think about how its ordered, or rather, about how the
-        // following cases aren't ordered
-        //
-        // While the coordinator messages are ordered, they're not deterministically ordered
-        // While Tributary-caused messages are deterministically ordered, and Substrate-caused
-        // messages are deterministically-ordered, they're both shoved into this singular queue
-        // The order at which they're shoved in together isn't deterministic
-        //
-        // This should be safe so long as Tributary and Substrate messages don't both expect
-        // mutable references over the same data
-        //
-        // TODO: Better assert/guarantee this
-        handle_coordinator_msg(
-          &raw_db,
-          &mut main_db,
-          &coin,
-          &mut coordinator,
-          &mut tributary_mutable,
-          &mut substrate_mutable,
-          msg,
-        ).await;
+          // This is isolated to better think about how its ordered, or rather, about how the other
+          // cases aren't ordered
+          //
+          // While the coordinator messages are ordered, they're not deterministically ordered
+          // Tributary-caused messages are deterministically ordered, and Substrate-caused messages
+          // are deterministically-ordered, yet they're both shoved into a singular queue
+          // The order at which they're shoved in together isn't deterministic
+          //
+          // This is safe so long as Tributary and Substrate messages don't both expect mutable
+          // references over the same data
+          handle_coordinator_msg(
+            &mut txn,
+            &coin,
+            &mut coordinator,
+            &mut tributary_mutable,
+            &mut substrate_mutable,
+            &msg,
+          ).await;
+
+          txn.commit();
+        }
+
+        coordinator.ack(msg).await;
       },
 
       msg = substrate_mutable.scanner.events.recv() => {
+        let mut txn = raw_db.txn();
+
         match msg.unwrap() {
           ScannerEvent::Block { key, block, batch, outputs } => {
             let key = key.to_bytes().as_ref().to_vec();
@@ -625,16 +627,18 @@
            };
 
            // Start signing this batch
-            tributary_mutable.substrate_signers.get_mut(&key).unwrap().sign(batch).await;
+            tributary_mutable.substrate_signers.get_mut(&key).unwrap().sign(&mut txn, batch).await;
           },
 
           ScannerEvent::Completed(id, tx) => {
             // We don't know which signer had this plan, so inform all of them
             for (_, signer) in tributary_mutable.signers.iter_mut() {
-              signer.eventuality_completion(id, &tx).await;
+              signer.eventuality_completion(&mut txn, id, &tx).await;
             }
           },
         }
+
+        txn.commit();
       },
     }
   }
 }
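This run loop is the heart of the change: one transaction per coordinator message, the handled marker written into that same transaction, and the ack deferred until after the commit. A compressed sketch of the resulting crash semantics, using toy Db/Txn stand-ins rather than the real serai_db types (all names here are illustrative):

```rust
use std::collections::{HashMap, HashSet};

struct Db {
  handled: HashSet<u64>,
  state: HashMap<Vec<u8>, Vec<u8>>,
}
struct Txn<'a> {
  db: &'a mut Db,
  handled: Vec<u64>,
  writes: Vec<(Vec<u8>, Vec<u8>)>,
}

impl Db {
  fn txn(&mut self) -> Txn<'_> {
    Txn { db: self, handled: vec![], writes: vec![] }
  }
}
impl Txn<'_> {
  fn commit(self) {
    let Txn { db, handled, writes } = self;
    db.handled.extend(handled);
    db.state.extend(writes);
  }
}

struct Message {
  id: u64,
}
fn handle(txn: &mut Txn, msg: &Message) {
  // every side effect of this message goes through the shared txn
  txn.writes.push((msg.id.to_le_bytes().to_vec(), vec![]));
}
fn ack(_msg: &Message) { /* lets the coordinator prune the message */
}

fn on_message(db: &mut Db, msg: Message) {
  if !db.handled.contains(&msg.id) {
    let mut txn = db.txn();
    txn.handled.push(msg.id);
    handle(&mut txn, &msg);
    txn.commit();
  }
  // Crash before commit: the message is redelivered and fully re-handled.
  // Crash after commit, before ack: redelivery hits the handled check and just acks.
  ack(&msg);
}
```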
diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs
index 653f5aa7..b65d7ba4 100644
--- a/processor/src/scanner.rs
+++ b/processor/src/scanner.rs
@@ -37,7 +37,7 @@ pub enum ScannerEvent<C: Coin> {
 }
 
 pub type ScannerEventChannel<C> = mpsc::UnboundedReceiver<ScannerEvent<C>>;
 
 #[derive(Clone, Debug)]
-struct ScannerDb<C: Coin, D: Db>(D, PhantomData<C>);
+struct ScannerDb<C: Coin, D: Db>(PhantomData<D>, PhantomData<C>);
 impl<C: Coin, D: Db> ScannerDb<C, D> {
   fn scanner_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
     D::key(b"SCANNER", dst, key)
   }
@@ -60,9 +60,8 @@
       res
     })
   }
-  fn block_number(&self, id: &<C::Block as Block<C>>::Id) -> Option<usize> {
-    self
-      .0
+  fn block_number<G: Get>(getter: &G, id: &<C::Block as Block<C>>::Id) -> Option<usize> {
+    getter
       .get(Self::block_number_key(id))
       .map(|number| u64::from_le_bytes(number.try_into().unwrap()).try_into().unwrap())
   }
@@ -91,8 +90,8 @@
     keys.extend(key_bytes.as_ref());
     txn.put(Self::active_keys_key(), keys);
   }
-  fn active_keys(&self) -> Vec<<C::Curve as Ciphersuite>::G> {
-    let bytes_vec = self.0.get(Self::active_keys_key()).unwrap_or(vec![]);
+  fn active_keys<G: Get>(getter: &G) -> Vec<<C::Curve as Ciphersuite>::G> {
+    let bytes_vec = getter.get(Self::active_keys_key()).unwrap_or(vec![]);
     let mut bytes: &[u8] = bytes_vec.as_ref();
 
     // Assumes keys will be 32 bytes when calculating the capacity
@@ -109,8 +108,8 @@
   fn seen_key(id: &<C::Output as Output>::Id) -> Vec<u8> {
     Self::scanner_key(b"seen", id)
   }
-  fn seen(&self, id: &<C::Output as Output>::Id) -> bool {
-    self.0.get(Self::seen_key(id)).is_some()
+  fn seen<G: Get>(getter: &G, id: &<C::Output as Output>::Id) -> bool {
+    getter.get(Self::seen_key(id)).is_some()
   }
 
   fn next_batch_key() -> Vec<u8> {
@@ -201,9 +200,8 @@
     // Return this block's outputs so they can be pruned from the RAM cache
     (id, outputs)
   }
-  fn latest_scanned_block(&self, key: <C::Curve as Ciphersuite>::G) -> usize {
-    let bytes = self
-      .0
+  fn latest_scanned_block<G: Get>(getter: &G, key: <C::Curve as Ciphersuite>::G) -> usize {
+    let bytes = getter
       .get(Self::scanned_block_key(&key))
       .expect("asking for latest scanned block of key which wasn't rotated to");
     u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap()
@@ -216,7 +214,7 @@
 #[derive(Debug)]
 pub struct Scanner<C: Coin, D: Db> {
   coin: C,
-  db: ScannerDb<C, D>,
+  db: D,
   keys: Vec<<C::Curve as Ciphersuite>::G>,
 
   eventualities: EventualitiesTracker<C::Eventuality>,
@@ -267,7 +265,12 @@
   /// If a key has been prior set, both keys will be scanned for as detailed in the Multisig
   /// documentation. The old key will eventually stop being scanned for, leaving just the
   /// updated-to key.
-  pub async fn rotate_key(&mut self, activation_number: usize, key: <C::Curve as Ciphersuite>::G) {
+  pub async fn rotate_key(
+    &mut self,
+    txn: &mut D::Transaction<'_>,
+    activation_number: usize,
+    key: <C::Curve as Ciphersuite>::G,
+  ) {
     let mut scanner = self.scanner.write().await;
     if !scanner.keys.is_empty() {
       // Protonet will have a single, static validator set
@@ -276,21 +279,25 @@
     }
 
     info!("Rotating to key {}", hex::encode(key.to_bytes()));
-    let mut txn = scanner.db.0.txn();
-    let (_, outputs) = ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, activation_number);
+
+    let (_, outputs) = ScannerDb::<C, D>::save_scanned_block(txn, &key, activation_number);
+
+    scanner.ram_scanned.insert(key.to_bytes().as_ref().to_vec(), activation_number);
+
     assert!(outputs.is_empty());
-    ScannerDb::<C, D>::add_active_key(&mut txn, key);
-    txn.commit();
+
+    ScannerDb::<C, D>::add_active_key(txn, key);
+
     scanner.keys.push(key);
   }
 
   pub async fn block_number(&self, id: &<C::Block as Block<C>>::Id) -> Option<usize> {
-    self.scanner.read().await.db.block_number(id)
+    // This is safe, despite not having a txn, since it's a static value
+    // At worst, it's not set when it's expected to be set, yet that should be handled contextually
+    ScannerDb::<C, D>::block_number(&self.scanner.read().await.db, id)
   }
 
   /// Acknowledge having handled a block for a key.
   pub async fn ack_up_to_block(
     &mut self,
+    txn: &mut D::Transaction<'_>,
     key: <C::Curve as Ciphersuite>::G,
     id: <C::Block as Block<C>>::Id,
   ) -> (Vec<BlockHash>, Vec<C::Output>) {
@@ -298,23 +305,20 @@
     debug!("Block {} acknowledged", hex::encode(&id));
 
     // Get the number for this block
-    let number =
-      scanner.db.block_number(&id).expect("main loop trying to operate on data we haven't scanned");
+    let number = ScannerDb::<C, D>::block_number(txn, &id)
+      .expect("main loop trying to operate on data we haven't scanned");
 
     // Get the number of the last block we acknowledged
-    let prior = scanner.db.latest_scanned_block(key);
+    let prior = ScannerDb::<C, D>::latest_scanned_block(txn, key);
 
     let mut blocks = vec![];
     let mut outputs = vec![];
-    let mut txn = scanner.db.0.txn();
     for number in (prior + 1) ..= number {
-      let (block, these_outputs) = ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, number);
+      let (block, these_outputs) = ScannerDb::<C, D>::save_scanned_block(txn, &key, number);
       let block = BlockHash(block.unwrap().as_ref().try_into().unwrap());
       blocks.push(block);
       outputs.extend(these_outputs);
     }
     assert_eq!(blocks.last().unwrap().as_ref(), id.as_ref());
-    // TODO: This likely needs to be atomic with the scheduler?
-    txn.commit();
 
     for output in &outputs {
       assert!(scanner.ram_outputs.remove(output.id().as_ref()));
@@ -329,8 +333,14 @@
   pub fn new(coin: C, db: D) -> (ScannerHandle<C, D>, Vec<<C::Curve as Ciphersuite>::G>) {
     let (events_send, events_recv) = mpsc::unbounded_channel();
 
-    let db = ScannerDb(db, PhantomData);
-    let keys = db.active_keys();
+    let keys = ScannerDb::<C, D>::active_keys(&db);
+    let mut ram_scanned = HashMap::new();
+    for key in keys.clone() {
+      ram_scanned.insert(
+        key.to_bytes().as_ref().to_vec(),
+        ScannerDb::<C, D>::latest_scanned_block(&db, key),
+      );
+    }
 
     let scanner = Arc::new(RwLock::new(Scanner {
       coin,
@@ -339,7 +349,7 @@
 
       eventualities: EventualitiesTracker::new(),
 
-      ram_scanned: HashMap::new(),
+      ram_scanned,
       ram_outputs: HashSet::new(),
 
       events: events_send,
@@ -380,21 +390,7 @@
       for key in scanner.keys.clone() {
        let key_vec = key.to_bytes().as_ref().to_vec();
 
-        let latest_scanned = {
-          // Grab the latest scanned block according to the DB
-          let db_scanned = scanner.db.latest_scanned_block(key);
-          // We may, within this process's lifetime, have scanned more blocks
-          // If they're still being processed, we will not have officially written them to the DB
-          // as scanned yet
-          // That way, if the process terminates, and is rebooted, we'll rescan from a handled
-          // point, re-firing all events along the way, enabling them to be properly processed
-          // In order to not re-fire them within this process's lifetime, check our RAM cache
-          // of what we've scanned
-          // We are allowed to re-fire them within this lifetime. It's just wasteful
-          let ram_scanned = scanner.ram_scanned.get(&key_vec).cloned().unwrap_or(0);
-          // Pick whichever is higher
-          db_scanned.max(ram_scanned)
-        };
+        let latest_scanned = scanner.ram_scanned[&key_vec];
 
         for i in (latest_scanned + 1) ..= latest {
           // TODO2: Check for key deprecation
@@ -408,14 +404,15 @@
           };
           let block_id = block.id();
 
-          if let Some(id) = ScannerDb::<C, D>::block(&scanner.db.0, i) {
+          // These block calls are safe, despite not having a txn, since they're static values
+          if let Some(id) = ScannerDb::<C, D>::block(&scanner.db, i) {
             if id != block_id {
               panic!("reorg'd from finalized {} to {}", hex::encode(id), hex::encode(block_id));
             }
           } else {
             info!("Found new block: {}", hex::encode(&block_id));
 
-            if let Some(id) = ScannerDb::<C, D>::block(&scanner.db.0, i.saturating_sub(1)) {
+            if let Some(id) = ScannerDb::<C, D>::block(&scanner.db, i.saturating_sub(1)) {
               if id != block.parent() {
                 panic!(
                   "block {} doesn't build off expected parent {}",
@@ -425,7 +422,7 @@
               }
             }
 
-            let mut txn = scanner.db.0.txn();
+            let mut txn = scanner.db.txn();
             ScannerDb::<C, D>::save_block(&mut txn, i, &block_id);
             txn.commit();
           }
@@ -470,7 +467,35 @@
             // On Bitcoin, the output ID should be unique for a given chain
             // On Monero, it's trivial to make an output sharing an ID with another
             // We should only scan outputs with valid IDs however, which will be unique
-            let seen = scanner.db.seen(&id);
+
+            /*
+              The safety of this code must satisfy the following conditions:
+              1) seen is not set for the first occurrence
+              2) seen is set for any future occurrence
+
+              seen is only written to after this code completes. Accordingly, it cannot be set
+              before the first occurrence UNLESS it's set, yet the last scanned block isn't.
+              They are both written in the same database transaction, preventing this.
+
+              As for future occurrences, the RAM entry ensures they're handled properly even if
+              the database has yet to be set.
+
+              On reboot, which will clear the RAM, if seen wasn't set, neither was latest scanned
+              block. Accordingly, this will scan from some prior block, re-populating the RAM.
+
+              If seen was set, then this will be successfully read.
+
+              There's also no concern ram_outputs was pruned, yet seen wasn't set, as pruning
+              from ram_outputs will acquire a write lock (preventing this code from acquiring
+              its own write lock and running), and during its holding of the write lock, it
+              commits the transaction setting seen and the latest scanned block.
+
+              This last case isn't true. Committing seen/latest_scanned_block happens after
+              relinquishing the write lock.
+
+              TODO: Only update ram_outputs after committing the TXN in question.
+            */
+            let seen = ScannerDb::<C, D>::seen(&scanner.db, &id);
             let id = id.as_ref().to_vec();
             if seen || scanner.ram_outputs.contains(&id) {
               panic!("scanned an output multiple times");
             }
@@ -483,7 +508,7 @@
           }
 
           // Save the outputs to disk
-          let mut txn = scanner.db.0.txn();
+          let mut txn = scanner.db.txn();
          let batch = ScannerDb::<C, D>::save_outputs(&mut txn, &key, &block_id, &outputs);
          txn.commit();
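The long safety comment above boils down to a two-set duplicate check: the DB's committed "seen" entries plus a RAM set covering outputs whose transaction hasn't committed yet. A distilled sketch, with HashSets standing in for both stores (illustrative only):

```rust
use std::collections::HashSet;

struct OutputDedup {
  seen_committed: HashSet<Vec<u8>>, // the DB's "seen" keys, set on txn commit
  ram_outputs: HashSet<Vec<u8>>,    // outputs handled this session, pre-commit
}

impl OutputDedup {
  // Duplicate iff either store knows the ID: the DB catches re-occurrences
  // across reboots, the RAM set catches re-occurrences before the txn which
  // sets "seen" (and the latest scanned block) has committed.
  fn is_duplicate(&self, id: &[u8]) -> bool {
    self.seen_committed.contains(id) || self.ram_outputs.contains(id)
  }

  fn track(&mut self, id: &[u8]) {
    // Per the TODO above, ideally this would only happen after the txn commits
    self.ram_outputs.insert(id.to_vec());
  }
}
```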
diff --git a/processor/src/signer.rs b/processor/src/signer.rs
index 8abb0d8a..3e6fb41a 100644
--- a/processor/src/signer.rs
+++ b/processor/src/signer.rs
@@ -57,8 +57,8 @@ impl<C: Coin, D: Db> SignerDb<C, D> {
     existing.extend(tx.as_ref());
     txn.put(Self::completed_key(id), existing);
   }
-  fn completed(&self, id: [u8; 32]) -> Option<Vec<u8>> {
-    self.0.get(Self::completed_key(id))
+  fn completed<G: Get>(getter: &G, id: [u8; 32]) -> Option<Vec<u8>> {
+    getter.get(Self::completed_key(id))
   }
 
   fn eventuality_key(id: [u8; 32]) -> Vec<u8> {
@@ -67,9 +67,9 @@
   fn save_eventuality(txn: &mut D::Transaction<'_>, id: [u8; 32], eventuality: C::Eventuality) {
     txn.put(Self::eventuality_key(id), eventuality.serialize());
   }
-  fn eventuality(&self, id: [u8; 32]) -> Option<C::Eventuality> {
+  fn eventuality<G: Get>(getter: &G, id: [u8; 32]) -> Option<C::Eventuality> {
     Some(
-      C::Eventuality::read::<&[u8]>(&mut self.0.get(Self::eventuality_key(id))?.as_ref()).unwrap(),
+      C::Eventuality::read::<&[u8]>(&mut getter.get(Self::eventuality_key(id))?.as_ref()).unwrap(),
     )
   }
 
   fn attempt_key(id: &SignId) -> Vec<u8> {
@@ -79,8 +79,8 @@
   fn attempt(txn: &mut D::Transaction<'_>, id: &SignId) {
     txn.put(Self::attempt_key(id), []);
   }
-  fn has_attempt(&mut self, id: &SignId) -> bool {
-    self.0.get(Self::attempt_key(id)).is_some()
+  fn has_attempt<G: Get>(getter: &G, id: &SignId) -> bool {
+    getter.get(Self::attempt_key(id)).is_some()
   }
 
   fn save_transaction(txn: &mut D::Transaction<'_>, tx: &C::Transaction) {
@@ -89,8 +89,9 @@
 }
 
 pub struct Signer<C: Coin, D: Db> {
+  db: PhantomData<D>,
+
   coin: C,
-  db: SignerDb<C, D>,
 
   keys: ThresholdKeys<C::Curve>,
@@ -120,10 +121,11 @@ impl<C: Coin, D: Db> fmt::Debug for Signer<C, D> {
 }
 
 impl<C: Coin, D: Db> Signer<C, D> {
-  pub fn new(db: D, coin: C, keys: ThresholdKeys<C::Curve>) -> Signer<C, D> {
+  pub fn new(coin: C, keys: ThresholdKeys<C::Curve>) -> Signer<C, D> {
     Signer {
+      db: PhantomData,
+
       coin,
-      db: SignerDb(db, PhantomData),
 
       keys,
@@ -136,7 +138,7 @@
     }
   }
 
-  pub async fn keys(&self) -> ThresholdKeys<C::Curve> {
+  pub fn keys(&self) -> ThresholdKeys<C::Curve> {
     self.keys.clone()
   }
@@ -172,10 +174,11 @@
 
   pub async fn eventuality_completion(
     &mut self,
+    txn: &mut D::Transaction<'_>,
     id: [u8; 32],
     tx_id: &<C::Transaction as Transaction<C>>::Id,
   ) {
-    if let Some(eventuality) = self.db.eventuality(id) {
+    if let Some(eventuality) = SignerDb::<C, D>::eventuality(txn, id) {
       // Transaction hasn't hit our mempool/was dropped for a different signature
       // The latter can happen given certain latency conditions/a single malicious signer
       // In the case of a single malicious signer, they can drag multiple honest
@@ -193,10 +196,8 @@ impl<C: Coin, D: Db> Signer<C, D> {
       debug!("eventuality for {} resolved in TX {}", hex::encode(id), hex::encode(tx_id));
 
       // Stop trying to sign for this TX
-      let mut txn = self.db.0.txn();
-      SignerDb::<C, D>::save_transaction(&mut txn, &tx);
-      SignerDb::<C, D>::complete(&mut txn, id, tx_id);
-      txn.commit();
+      SignerDb::<C, D>::save_transaction(txn, &tx);
+      SignerDb::<C, D>::complete(txn, id, tx_id);
 
       self.signable.remove(&id);
       self.attempt.remove(&id);
@@ -221,8 +222,8 @@
     }
   }
 
-  async fn check_completion(&mut self, id: [u8; 32]) -> bool {
-    if let Some(txs) = self.db.completed(id) {
+  async fn check_completion(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 32]) -> bool {
+    if let Some(txs) = SignerDb::<C, D>::completed(txn, id) {
       debug!(
         "SignTransaction/Reattempt order for {}, which we've already completed signing",
         hex::encode(id)
@@ -255,8 +256,8 @@
     }
   }
 
-  async fn attempt(&mut self, id: [u8; 32], attempt: u32) {
-    if self.check_completion(id).await {
+  async fn attempt(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 32], attempt: u32) {
+    if self.check_completion(txn, id).await {
       return;
     }
@@ -304,7 +305,7 @@
     // branch again for something we've already attempted
    //
     // Only run if this hasn't already been attempted
-    if self.db.has_attempt(&id) {
+    if SignerDb::<C, D>::has_attempt(txn, &id) {
       warn!(
         "already attempted {} #{}. this is an error if we didn't reboot",
         hex::encode(id.id),
@@ -313,9 +314,7 @@
       return;
     }
 
-    let mut txn = self.db.0.txn();
-    SignerDb::<C, D>::attempt(&mut txn, &id);
-    txn.commit();
+    SignerDb::<C, D>::attempt(txn, &id);
 
     // Attempt to create the TX
     let machine = match self.coin.attempt_send(tx).await {
@@ -338,23 +337,22 @@
 
   pub async fn sign_transaction(
     &mut self,
+    txn: &mut D::Transaction<'_>,
     id: [u8; 32],
     tx: C::SignableTransaction,
     eventuality: C::Eventuality,
   ) {
-    if self.check_completion(id).await {
+    if self.check_completion(txn, id).await {
       return;
     }
 
-    let mut txn = self.db.0.txn();
-    SignerDb::<C, D>::save_eventuality(&mut txn, id, eventuality);
-    txn.commit();
+    SignerDb::<C, D>::save_eventuality(txn, id, eventuality);
 
     self.signable.insert(id, tx);
-    self.attempt(id, 0).await;
+    self.attempt(txn, id, 0).await;
   }
 
-  pub async fn handle(&mut self, msg: CoordinatorMessage) {
+  pub async fn handle(&mut self, txn: &mut D::Transaction<'_>, msg: CoordinatorMessage) {
     match msg {
       CoordinatorMessage::Preprocesses { id, mut preprocesses } => {
         if self.verify_id(&id).is_err() {
@@ -440,11 +438,9 @@
         };
 
         // Save the transaction in case it's needed for recovery
-        let mut txn = self.db.0.txn();
-        SignerDb::<C, D>::save_transaction(&mut txn, &tx);
+        SignerDb::<C, D>::save_transaction(txn, &tx);
         let tx_id = tx.id();
-        SignerDb::<C, D>::complete(&mut txn, id.id, &tx_id);
-        txn.commit();
+        SignerDb::<C, D>::complete(txn, id.id, &tx_id);
 
         // Publish it
         if let Err(e) = self.coin.publish_transaction(&tx).await {
@@ -463,7 +459,7 @@
       }
 
       CoordinatorMessage::Reattempt { id } => {
-        self.attempt(id.id, id.attempt).await;
+        self.attempt(txn, id.id, id.attempt).await;
       }
 
       CoordinatorMessage::Completed { key: _, id, tx: mut tx_vec } => {
@@ -479,7 +475,7 @@
         }
         tx.as_mut().copy_from_slice(&tx_vec);
 
-        self.eventuality_completion(id, &tx).await;
+        self.eventuality_completion(txn, id, &tx).await;
       }
     }
   }
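The has_attempt guard makes each (id, attempt) pair sign at most once; only a reboot legitimately replays one, and the signer then stalls until a Reattempt arrives with a fresh attempt number, presumably since resuming a FROST machine with previously used preprocesses wouldn't be safe. The guard itself reduces to a set insertion (HashSet standing in for the DB; illustrative):

```rust
use std::collections::HashSet;

// Stand-in for the attempt_key entries written by SignerDb::attempt.
struct AttemptGuard(HashSet<([u8; 32], u32)>);

impl AttemptGuard {
  // Returns true exactly once per (id, attempt). A second sighting means we
  // rebooted mid-attempt; the safe response is to wait for a Reattempt with a
  // higher attempt number rather than resume with stale state.
  fn first_attempt(&mut self, id: [u8; 32], attempt: u32) -> bool {
    self.0.insert((id, attempt))
  }
}
```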
diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs
index 368813c1..e32c2570 100644
--- a/processor/src/substrate_signer.rs
+++ b/processor/src/substrate_signer.rs
@@ -1,4 +1,4 @@
-use core::fmt;
+use core::{marker::PhantomData, fmt};
 
 use std::collections::{VecDeque, HashMap};
 
 use rand_core::OsRng;
@@ -24,7 +24,7 @@ use serai_client::{
 };
 
 use messages::{sign::SignId, coordinator::*};
-use crate::{DbTxn, Db};
+use crate::{Get, DbTxn, Db};
 
 #[derive(Debug)]
 pub enum SubstrateSignerEvent {
@@ -45,8 +45,8 @@ impl<D: Db> SubstrateSignerDb<D> {
   fn complete(txn: &mut D::Transaction<'_>, id: [u8; 32]) {
     txn.put(Self::completed_key(id), [1]);
   }
-  fn completed(&self, id: [u8; 32]) -> bool {
-    self.0.get(Self::completed_key(id)).is_some()
+  fn completed<G: Get>(getter: &G, id: [u8; 32]) -> bool {
+    getter.get(Self::completed_key(id)).is_some()
   }
 
   fn attempt_key(id: &SignId) -> Vec<u8> {
@@ -55,8 +55,8 @@
   fn attempt(txn: &mut D::Transaction<'_>, id: &SignId) {
     txn.put(Self::attempt_key(id), []);
   }
-  fn has_attempt(&mut self, id: &SignId) -> bool {
-    self.0.get(Self::attempt_key(id)).is_some()
+  fn has_attempt<G: Get>(getter: &G, id: &SignId) -> bool {
+    getter.get(Self::attempt_key(id)).is_some()
  }
 
   fn save_batch(txn: &mut D::Transaction<'_>, batch: &SignedBatch) {
@@ -65,7 +65,7 @@
 }
 
 pub struct SubstrateSigner<D: Db> {
-  db: SubstrateSignerDb<D>,
+  db: PhantomData<D>,
 
   keys: ThresholdKeys<Ristretto>,
@@ -88,9 +88,9 @@ impl<D: Db> fmt::Debug for SubstrateSigner<D> {
 }
 
 impl<D: Db> SubstrateSigner<D> {
-  pub fn new(db: D, keys: ThresholdKeys<Ristretto>) -> SubstrateSigner<D> {
+  pub fn new(keys: ThresholdKeys<Ristretto>) -> SubstrateSigner<D> {
     SubstrateSigner {
-      db: SubstrateSignerDb(db),
+      db: PhantomData,
 
       keys,
@@ -129,9 +129,9 @@
     Ok(())
   }
 
-  async fn attempt(&mut self, id: [u8; 32], attempt: u32) {
+  async fn attempt(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 32], attempt: u32) {
     // See above commentary for why this doesn't emit SignedBatch
-    if self.db.completed(id) {
+    if SubstrateSignerDb::<D>::completed(txn, id) {
       return;
     }
@@ -176,7 +176,7 @@
     // branch again for something we've already attempted
     //
     // Only run if this hasn't already been attempted
-    if self.db.has_attempt(&id) {
+    if SubstrateSignerDb::<D>::has_attempt(txn, &id) {
       warn!(
         "already attempted {} #{}. this is an error if we didn't reboot",
         hex::encode(id.id),
@@ -185,9 +185,7 @@
       return;
     }
 
-    let mut txn = self.db.0.txn();
-    SubstrateSignerDb::<D>::attempt(&mut txn, &id);
-    txn.commit();
+    SubstrateSignerDb::<D>::attempt(txn, &id);
 
     // b"substrate" is a literal from sp-core
     let machine = AlgorithmMachine::new(Schnorrkel::new(b"substrate"), self.keys.clone());
@@ -201,8 +199,8 @@
     ));
   }
 
-  pub async fn sign(&mut self, batch: Batch) {
-    if self.db.completed(batch.block.0) {
+  pub async fn sign(&mut self, txn: &mut D::Transaction<'_>, batch: Batch) {
+    if SubstrateSignerDb::<D>::completed(txn, batch.block.0) {
       debug!("Sign batch order for ID we've already completed signing");
       // See batch_signed for commentary on why this simply returns
       return;
@@ -210,10 +208,10 @@
     let id = batch.block.0;
     self.signable.insert(id, batch);
-    self.attempt(id, 0).await;
+    self.attempt(txn, id, 0).await;
   }
 
-  pub async fn handle(&mut self, msg: CoordinatorMessage) {
+  pub async fn handle(&mut self, txn: &mut D::Transaction<'_>, msg: CoordinatorMessage) {
     match msg {
       CoordinatorMessage::BatchPreprocesses { id, mut preprocesses } => {
         if self.verify_id(&id).is_err() {
@@ -302,10 +300,8 @@
           SignedBatch { batch: self.signable.remove(&id.id).unwrap(), signature: sig.into() };
 
         // Save the batch in case it's needed for recovery
-        let mut txn = self.db.0.txn();
-        SubstrateSignerDb::<D>::save_batch(&mut txn, &batch);
-        SubstrateSignerDb::<D>::complete(&mut txn, id.id);
-        txn.commit();
+        SubstrateSignerDb::<D>::save_batch(txn, &batch);
+        SubstrateSignerDb::<D>::complete(txn, id.id);
 
         // Stop trying to sign for this batch
         assert!(self.attempt.remove(&id.id).is_some());
@@ -316,16 +312,14 @@
       }
 
       CoordinatorMessage::BatchReattempt { id } => {
-        self.attempt(id.id, id.attempt).await;
+        self.attempt(txn, id.id, id.attempt).await;
       }
     }
   }
 
-  pub fn batch_signed(&mut self, block: BlockHash) {
+  pub fn batch_signed(&mut self, txn: &mut D::Transaction<'_>, block: BlockHash) {
     // Stop trying to sign for this batch
-    let mut txn = self.db.0.txn();
-    SubstrateSignerDb::<D>::complete(&mut txn, block.0);
-    txn.commit();
+    SubstrateSignerDb::<D>::complete(txn, block.0);
 
     self.signable.remove(&block.0);
     self.attempt.remove(&block.0);
diff --git a/processor/src/tests/addresses.rs b/processor/src/tests/addresses.rs
index ad5fb542..4d114dec 100644
--- a/processor/src/tests/addresses.rs
+++ b/processor/src/tests/addresses.rs
@@ -7,7 +7,7 @@ use frost::{Participant, ThresholdKeys};
 
 use tokio::time::timeout;
 
-use serai_db::MemDb;
+use serai_db::{DbTxn, MemDb};
 
 use crate::{
   Plan, Db,
@@ -78,10 +78,12 @@
     coin.mine_block().await;
   }
 
-  let db = MemDb::new();
+  let mut db = MemDb::new();
   let (mut scanner, active_keys) = Scanner::new(coin.clone(), db.clone());
   assert!(active_keys.is_empty());
-  scanner.rotate_key(coin.get_latest_block_number().await.unwrap(), key).await;
+  let mut txn = db.txn();
+  scanner.rotate_key(&mut txn, coin.get_latest_block_number().await.unwrap(), key).await;
+  txn.commit();
 
   // Receive funds to the branch address and make sure it's properly identified
   let block_id = coin.test_send(C::branch_address(key)).await.id();
diff --git a/processor/src/tests/key_gen.rs b/processor/src/tests/key_gen.rs
index 8a33b2b0..592d53ce 100644
--- a/processor/src/tests/key_gen.rs
+++ b/processor/src/tests/key_gen.rs
@@ -7,7 +7,7 @@ use rand_core::{RngCore, OsRng};
 use group::GroupEncoding;
 use frost::{Participant, ThresholdParams, tests::clone_without};
 
-use serai_db::MemDb;
+use serai_db::{DbTxn, Db, MemDb};
 
 use serai_client::{
   primitives::{MONERO_NET_ID, BlockHash},
@@ -31,19 +31,24 @@
     let mut entropy = Zeroizing::new([0; 32]);
     OsRng.fill_bytes(entropy.as_mut());
     entropies.insert(i, entropy);
-    dbs.insert(i, MemDb::new());
-    key_gens.insert(i, KeyGen::<Monero, MemDb>::new(dbs[&i].clone(), entropies[&i].clone()));
+    let db = MemDb::new();
+    dbs.insert(i, db.clone());
+    key_gens.insert(i, KeyGen::<Monero, MemDb>::new(db, entropies[&i].clone()));
   }
 
   let mut all_commitments = HashMap::new();
   for i in 1 ..= 5 {
     let key_gen = key_gens.get_mut(&i).unwrap();
+    let mut txn = dbs.get_mut(&i).unwrap().txn();
     if let ProcessorMessage::Commitments { id, commitments } = key_gen
-      .handle(CoordinatorMessage::GenerateKey {
-        id: ID,
-        params: ThresholdParams::new(3, 5, Participant::new(u16::try_from(i).unwrap()).unwrap())
-          .unwrap(),
-      })
+      .handle(
+        &mut txn,
+        CoordinatorMessage::GenerateKey {
+          id: ID,
+          params: ThresholdParams::new(3, 5, Participant::new(u16::try_from(i).unwrap()).unwrap())
+            .unwrap(),
+        },
+      )
       .await
     {
       assert_eq!(id, ID);
@@ -51,27 +56,32 @@
     } else {
       panic!("didn't get commitments back");
     }
+    txn.commit();
   }
 
   // 1 is rebuilt on every step
   // 2 is rebuilt here
   // 3 ... are rebuilt once, one at each of the following steps
-  let rebuild = |key_gens: &mut HashMap<_, _>, i| {
+  let rebuild = |key_gens: &mut HashMap<_, _>, dbs: &HashMap<_, MemDb>, i| {
     key_gens.remove(&i);
     key_gens.insert(i, KeyGen::<Monero, MemDb>::new(dbs[&i].clone(), entropies[&i].clone()));
   };
-  rebuild(&mut key_gens, 1);
-  rebuild(&mut key_gens, 2);
+  rebuild(&mut key_gens, &dbs, 1);
+  rebuild(&mut key_gens, &dbs, 2);
 
   let mut all_shares = HashMap::new();
   for i in 1 ..= 5 {
     let key_gen = key_gens.get_mut(&i).unwrap();
+    let mut txn = dbs.get_mut(&i).unwrap().txn();
     let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
     if let ProcessorMessage::Shares { id, shares } = key_gen
-      .handle(CoordinatorMessage::Commitments {
-        id: ID,
-        commitments: clone_without(&all_commitments, &i),
-      })
+      .handle(
+        &mut txn,
+        CoordinatorMessage::Commitments {
+          id: ID,
+          commitments: clone_without(&all_commitments, &i),
+        },
+      )
       .await
     {
       assert_eq!(id, ID);
@@ -79,24 +89,29 @@
     } else {
       panic!("didn't get shares back");
     }
+    txn.commit();
   }
 
   // Rebuild 1 and 3
-  rebuild(&mut key_gens, 1);
-  rebuild(&mut key_gens, 3);
+  rebuild(&mut key_gens, &dbs, 1);
+  rebuild(&mut key_gens, &dbs, 3);
 
   let mut res = None;
   for i in 1 ..= 5 {
     let key_gen = key_gens.get_mut(&i).unwrap();
+    let mut txn = dbs.get_mut(&i).unwrap().txn();
     let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
     if let ProcessorMessage::GeneratedKeyPair { id, substrate_key, coin_key } = key_gen
-      .handle(CoordinatorMessage::Shares {
-        id: ID,
-        shares: all_shares
-          .iter()
-          .filter_map(|(l, shares)| if i == *l { None } else { Some((*l, shares[&i].clone())) })
-          .collect(),
-      })
+      .handle(
+        &mut txn,
+        CoordinatorMessage::Shares {
+          id: ID,
+          shares: all_shares
+            .iter()
+            .filter_map(|(l, shares)| if i == *l { None } else { Some((*l, shares[&i].clone())) })
+            .collect(),
+        },
+      )
       .await
     {
       assert_eq!(id, ID);
@@ -107,17 +122,25 @@
     } else {
       panic!("didn't get key back");
     }
+    txn.commit();
   }
 
   // Rebuild 1 and 4
-  rebuild(&mut key_gens, 1);
-  rebuild(&mut key_gens, 4);
+  rebuild(&mut key_gens, &dbs, 1);
+  rebuild(&mut key_gens, &dbs, 4);
 
   for i in 1 ..= 5 {
     let key_gen = key_gens.get_mut(&i).unwrap();
+    let mut txn = dbs.get_mut(&i).unwrap().txn();
     let KeyConfirmed { activation_block, substrate_keys, coin_keys } = key_gen
-      .confirm(SubstrateContext { coin_latest_finalized_block: BlockHash([0x11; 32]) }, ID)
+      .confirm(
+        &mut txn,
+        SubstrateContext { coin_latest_finalized_block: BlockHash([0x11; 32]) },
+        ID,
+      )
       .await;
+    txn.commit();
+
     assert_eq!(activation_block, BlockHash([0x11; 32]));
 
     let params =
       ThresholdParams::new(3, 5, Participant::new(u16::try_from(i).unwrap()).unwrap()).unwrap();
diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs
index f900ed01..bdfb3782 100644
--- a/processor/src/tests/scanner.rs
+++ b/processor/src/tests/scanner.rs
@@ -9,7 +9,7 @@ use tokio::time::timeout;
 
 use serai_client::primitives::BlockHash;
 
-use serai_db::MemDb;
+use serai_db::{DbTxn, Db, MemDb};
 
 use crate::{
   coins::{OutputType, Output, Block, Coin},
@@ -20,6 +20,7 @@
   let mut keys =
     frost::tests::key_gen::<_, C::Curve>(&mut OsRng).remove(&Participant::new(1).unwrap()).unwrap();
   C::tweak_keys(&mut keys);
+  let group_key = keys.group_key();
 
   // Mine blocks so there's a confirmed block
   for _ in 0 .. C::CONFIRMATIONS {
@@ -30,11 +31,14 @@
   let activation_number = coin.get_latest_block_number().await.unwrap();
   let db = MemDb::new();
   let new_scanner = || async {
+    let mut db = db.clone();
     let (mut scanner, active_keys) = Scanner::new(coin.clone(), db.clone());
     let mut first = first.lock().unwrap();
     if *first {
       assert!(active_keys.is_empty());
-      scanner.rotate_key(activation_number, keys.group_key()).await;
+      let mut txn = db.txn();
+      scanner.rotate_key(&mut txn, activation_number, group_key).await;
+      txn.commit();
       *first = false;
     } else {
       assert_eq!(active_keys.len(), 1);
@@ -83,7 +87,14 @@
     }
     curr_block += 1;
   }
-  assert_eq!(scanner.ack_up_to_block(keys.group_key(), block_id).await, (blocks, outputs));
+
+  let mut cloned_db = db.clone();
+  let mut txn = cloned_db.txn();
+  assert_eq!(
+    scanner.ack_up_to_block(&mut txn, keys.group_key(), block_id).await,
+    (blocks, outputs)
+  );
+  txn.commit();
 
   // There should be no more events
   assert!(timeout(Duration::from_secs(30), scanner.events.recv()).await.is_err());
diff --git a/processor/src/tests/signer.rs b/processor/src/tests/signer.rs
index 91694ff8..bcc0a363 100644
--- a/processor/src/tests/signer.rs
+++ b/processor/src/tests/signer.rs
@@ -8,7 +8,7 @@ use frost::{
   dkg::tests::{key_gen, clone_without},
 };
 
-use serai_db::MemDb;
+use serai_db::{DbTxn, Db, MemDb};
 
 use messages::sign::*;
 use crate::{
@@ -39,19 +39,23 @@
   }
 
   let mut signers = HashMap::new();
+  let mut dbs = HashMap::new();
   let mut t = 0;
   for i in 1 ..= keys.len() {
     let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
     let keys = keys.remove(&i).unwrap();
     t = keys.params().t();
-    signers.insert(i, Signer::new(MemDb::new(), coin.clone(), keys));
+    signers.insert(i, Signer::<_, MemDb>::new(coin.clone(), keys));
+    dbs.insert(i, MemDb::new());
   }
   drop(keys);
 
   for i in 1 ..= signers.len() {
     let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
     let (tx, eventuality) = txs.remove(&i).unwrap();
-    signers.get_mut(&i).unwrap().sign_transaction(actual_id.id, tx, eventuality).await;
+    let mut txn = dbs.get_mut(&i).unwrap().txn();
+    signers.get_mut(&i).unwrap().sign_transaction(&mut txn, actual_id.id, tx, eventuality).await;
+    txn.commit();
   }
 
   let mut signing_set = vec![];
@@ -84,14 +88,20 @@
   let mut shares = HashMap::new();
   for i in &signing_set {
+    let mut txn = dbs.get_mut(i).unwrap().txn();
     signers
       .get_mut(i)
       .unwrap()
-      .handle(CoordinatorMessage::Preprocesses {
-        id: actual_id.clone(),
-        preprocesses: clone_without(&preprocesses, i),
-      })
+      .handle(
+        &mut txn,
+        CoordinatorMessage::Preprocesses {
+          id: actual_id.clone(),
+          preprocesses: clone_without(&preprocesses, i),
+        },
+      )
       .await;
+    txn.commit();
+
     if let SignerEvent::ProcessorMessage(ProcessorMessage::Share { id, share }) =
       signers.get_mut(i).unwrap().events.pop_front().unwrap()
     {
@@ -104,14 +114,17 @@
   let mut tx_id = None;
   for i in &signing_set {
+    let mut txn = dbs.get_mut(i).unwrap().txn();
     signers
       .get_mut(i)
       .unwrap()
-      .handle(CoordinatorMessage::Shares {
-        id: actual_id.clone(),
-        shares: clone_without(&shares, i),
-      })
+      .handle(
+        &mut txn,
+        CoordinatorMessage::Shares { id: actual_id.clone(), shares: clone_without(&shares, i) },
+      )
       .await;
+    txn.commit();
+
     if let SignerEvent::SignedTransaction { id, tx } =
       signers.get_mut(i).unwrap().events.pop_front().unwrap()
     {
diff --git a/processor/src/tests/substrate_signer.rs b/processor/src/tests/substrate_signer.rs
index 395d2b01..01cd3787 100644
--- a/processor/src/tests/substrate_signer.rs
+++ b/processor/src/tests/substrate_signer.rs
@@ -12,7 +12,7 @@ use frost::{
 use scale::Encode;
 use sp_application_crypto::{RuntimePublic, sr25519::Public};
 
-use serai_db::MemDb;
+use serai_db::{DbTxn, Db, MemDb};
 
 use serai_client::{primitives::*, in_instructions::primitives::*};
@@ -49,14 +49,21 @@
   };
 
   let mut signers = HashMap::new();
+  let mut dbs = HashMap::new();
   let mut t = 0;
   for i in 1 ..= keys.len() {
     let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
     let keys = keys.remove(&i).unwrap();
     t = keys.params().t();
-    let mut signer = SubstrateSigner::new(MemDb::new(), keys);
-    signer.sign(batch.clone()).await;
+
+    let mut signer = SubstrateSigner::<MemDb>::new(keys);
+    let mut db = MemDb::new();
+    let mut txn = db.txn();
+    signer.sign(&mut txn, batch.clone()).await;
+    txn.commit();
+
     signers.insert(i, signer);
+    dbs.insert(i, db);
   }
   drop(keys);
@@ -92,14 +99,20 @@
   let mut shares = HashMap::new();
   for i in &signing_set {
+    let mut txn = dbs.get_mut(i).unwrap().txn();
     signers
       .get_mut(i)
       .unwrap()
-      .handle(CoordinatorMessage::BatchPreprocesses {
-        id: actual_id.clone(),
-        preprocesses: clone_without(&preprocesses, i),
-      })
+      .handle(
+        &mut txn,
+        CoordinatorMessage::BatchPreprocesses {
+          id: actual_id.clone(),
+          preprocesses: clone_without(&preprocesses, i),
+        },
+      )
       .await;
+    txn.commit();
+
     if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchShare { id, share }) =
       signers.get_mut(i).unwrap().events.pop_front().unwrap()
     {
@@ -111,14 +124,19 @@
   for i in &signing_set {
+    let mut txn = dbs.get_mut(i).unwrap().txn();
     signers
       .get_mut(i)
       .unwrap()
-      .handle(CoordinatorMessage::BatchShares {
-        id: actual_id.clone(),
-        shares: clone_without(&shares, i),
-      })
+      .handle(
+        &mut txn,
+        CoordinatorMessage::BatchShares {
+          id: actual_id.clone(),
+          shares: clone_without(&shares, i),
+        },
+      )
       .await;
+    txn.commit();
 
     if let SubstrateSignerEvent::SignedBatch(signed_batch) =
       signers.get_mut(i).unwrap().events.pop_front().unwrap()
diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs
index 6b33b7fb..2e064b09 100644
--- a/processor/src/tests/wallet.rs
+++ b/processor/src/tests/wallet.rs
@@ -6,7 +6,7 @@ use frost::{Participant, dkg::tests::key_gen};
 
 use tokio::time::timeout;
 
-use serai_db::MemDb;
+use serai_db::{DbTxn, Db, MemDb};
 
 use crate::{
   Payment, Plan,
@@ -24,10 +24,13 @@
   }
 
   let key = keys[&Participant::new(1).unwrap()].group_key();
-  let (mut scanner, active_keys) = Scanner::new(coin.clone(), MemDb::new());
+  let mut db = MemDb::new();
+  let (mut scanner, active_keys) = Scanner::new(coin.clone(), db.clone());
   assert!(active_keys.is_empty());
   let (block_id, outputs) = {
-    scanner.rotate_key(coin.get_latest_block_number().await.unwrap(), key).await;
+    let mut txn = db.txn();
+    scanner.rotate_key(&mut txn, coin.get_latest_block_number().await.unwrap(), key).await;
+    txn.commit();
 
     let block = coin.test_send(C::address(key)).await;
     let block_id = block.id();
@@ -114,8 +117,10 @@
   }
 
   // Check the Scanner DB can reload the outputs
+  let mut txn = db.txn();
   assert_eq!(
-    scanner.ack_up_to_block(key, block.id()).await.1,
+    scanner.ack_up_to_block(&mut txn, key, block.id()).await.1,
     [first_outputs, outputs].concat().to_vec()
   );
+  txn.commit();
 }
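Every updated test now follows the same rhythm: open a transaction on a caller-owned DB, drive the component one step, commit, then assert. Generalized below with a toy in-memory DB (a stand-in, not serai_db::MemDb):

```rust
use std::collections::HashMap;

#[derive(Clone, Default)]
struct MemDb(HashMap<Vec<u8>, Vec<u8>>);
struct Txn<'a>(&'a mut MemDb, Vec<(Vec<u8>, Vec<u8>)>);

impl MemDb {
  fn txn(&mut self) -> Txn<'_> {
    Txn(self, vec![])
  }
}
impl Txn<'_> {
  fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
    self.1.push((key, value));
  }
  fn commit(self) {
    let Txn(db, writes) = self;
    db.0.extend(writes);
  }
}

fn main() {
  let mut db = MemDb::default();
  // open txn -> drive one protocol step -> commit -> assert on durable state
  let mut txn = db.txn();
  txn.put(b"step".to_vec(), b"done".to_vec());
  txn.commit();
  assert!(db.0.contains_key(b"step".as_slice()));
}
```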