diff --git a/processor/src/main.rs b/processor/src/main.rs index 52916924..17f3b171 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -1,4 +1,4 @@ -use std::{sync::RwLock, time::Duration, collections::HashMap}; +use std::{time::Duration, collections::HashMap}; use zeroize::{Zeroize, Zeroizing}; @@ -499,9 +499,7 @@ async fn run(mut raw_db: D, network: N, mut let mut last_coordinator_msg = None; loop { - // The following select uses this txn in both branches, hence why needing a RwLock to pass it - // around is needed - let txn = RwLock::new(raw_db.txn()); + let mut txn = raw_db.txn(); let mut outer_msg = None; @@ -512,15 +510,12 @@ async fn run(mut raw_db: D, network: N, mut // the other messages in the queue, it may be beneficial to parallelize these // They could likely be parallelized by type (KeyGen, Sign, Substrate) without issue msg = coordinator.recv() => { - let mut txn = txn.write().unwrap(); - let txn = &mut txn; - assert_eq!(msg.id, (last_coordinator_msg.unwrap_or(msg.id - 1) + 1)); last_coordinator_msg = Some(msg.id); // Only handle this if we haven't already if !main_db.handled_message(msg.id) { - MainDb::::handle_message(txn, msg.id); + MainDb::::handle_message(&mut txn, msg.id); // This is isolated to better think about how its ordered, or rather, about how the other // cases aren't ordered @@ -533,7 +528,7 @@ async fn run(mut raw_db: D, network: N, mut // This is safe so long as Tributary and Substrate messages don't both expect mutable // references over the same data handle_coordinator_msg( - &mut **txn, + &mut txn, &network, &mut coordinator, &mut tributary_mutable, @@ -545,9 +540,13 @@ async fn run(mut raw_db: D, network: N, mut outer_msg = Some(msg); }, - msg = substrate_mutable.next_event(&txn) => { - let mut txn = txn.write().unwrap(); - let txn = &mut txn; + scanner_event = substrate_mutable.next_scanner_event() => { + let msg = substrate_mutable.scanner_event_to_multisig_event( + &mut txn, + &network, + scanner_event + 
).await; + match msg { MultisigEvent::Batches(retired_key_new_key, batches) => { // Start signing this batch @@ -559,7 +558,7 @@ async fn run(mut raw_db: D, network: N, mut ).await; if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() { - if let Some(msg) = substrate_signer.sign(txn, batch).await { + if let Some(msg) = substrate_signer.sign(&mut txn, batch).await { coordinator.send(msg).await; } } @@ -577,7 +576,7 @@ async fn run(mut raw_db: D, network: N, mut }, MultisigEvent::Completed(key, id, tx) => { if let Some(signer) = tributary_mutable.signers.get_mut(&key) { - if let Some(msg) = signer.completed(txn, id, tx) { + if let Some(msg) = signer.completed(&mut txn, id, tx) { coordinator.send(msg).await; } } @@ -586,7 +585,7 @@ async fn run(mut raw_db: D, network: N, mut }, } - txn.into_inner().unwrap().commit(); + txn.commit(); if let Some(msg) = outer_msg { coordinator.ack(msg).await; } diff --git a/processor/src/multisigs/db.rs b/processor/src/multisigs/db.rs index 3356cd85..202ce4ee 100644 --- a/processor/src/multisigs/db.rs +++ b/processor/src/multisigs/db.rs @@ -5,10 +5,7 @@ use ciphersuite::Ciphersuite; pub use serai_db::*; use scale::{Encode, Decode}; -use serai_client::{ - primitives::{Balance, ExternalAddress}, - in_instructions::primitives::InInstructionWithBalance, -}; +use serai_client::{primitives::Balance, in_instructions::primitives::InInstructionWithBalance}; use crate::{ Get, Db, Plan, @@ -156,15 +153,33 @@ impl MultisigsDb { txn.put(Self::resolved_key(resolution.as_ref()), plan); } - fn refund_key(id: &[u8]) -> Vec { - Self::multisigs_key(b"refund", id) + fn plans_from_scanning_key(block_number: usize) -> Vec { + Self::multisigs_key(b"plans_from_scanning", u32::try_from(block_number).unwrap().to_le_bytes()) } - pub fn set_refund(txn: &mut D::Transaction<'_>, id: &[u8], address: ExternalAddress) { - txn.put(Self::refund_key(id), address.encode()); + pub fn set_plans_from_scanning( + txn: &mut D::Transaction<'_>, + 
block_number: usize, + plans: Vec>, + ) { + let mut buf = vec![]; + for plan in plans { + plan.write(&mut buf).unwrap(); + } + txn.put(Self::plans_from_scanning_key(block_number), buf); } - pub fn take_refund(txn: &mut D::Transaction<'_>, id: &[u8]) -> Option { - let key = Self::refund_key(id); - let res = txn.get(&key).map(|address| ExternalAddress::decode(&mut address.as_ref()).unwrap()); + pub fn take_plans_from_scanning( + txn: &mut D::Transaction<'_>, + block_number: usize, + ) -> Option>> { + let key = Self::plans_from_scanning_key(block_number); + let res = txn.get(&key).map(|plans| { + let mut plans_ref = plans.as_slice(); + let mut res = vec![]; + while !plans_ref.is_empty() { + res.push(Plan::::read(&mut plans_ref).unwrap()); + } + res + }); if res.is_some() { txn.del(key); } diff --git a/processor/src/multisigs/mod.rs b/processor/src/multisigs/mod.rs index 4b5214c0..3361141a 100644 --- a/processor/src/multisigs/mod.rs +++ b/processor/src/multisigs/mod.rs @@ -1,5 +1,5 @@ use core::time::Duration; -use std::{sync::RwLock, collections::HashMap}; +use std::collections::HashSet; use ciphersuite::{group::GroupEncoding, Ciphersuite}; @@ -270,8 +270,8 @@ impl MultisigManager { fn current_rotation_step(&self, block_number: usize) -> RotationStep { let Some(new) = self.new.as_ref() else { return RotationStep::UseExisting }; - // Period numbering here has no meaning other than these the time values useful here, and the - // order they're built in. They have no reference/shared marker with anything else + // Period numbering here has no meaning other than these are the time values useful here, and + // the order they're calculated in. They have no reference/shared marker with anything else // ESTIMATED_BLOCK_TIME_IN_SECONDS is fine to use here. 
While inaccurate, it shouldn't be // drastically off, and even if it is, it's a hiccup to latency handling only possible when @@ -354,32 +354,27 @@ impl MultisigManager { (existing_outputs, new_outputs) } - fn refund_plan(output: &N::Output, refund_to: N::Address) -> Plan { + fn refund_plan(output: N::Output, refund_to: N::Address) -> Plan { log::info!("creating refund plan for {}", hex::encode(output.id())); assert_eq!(output.kind(), OutputType::External); Plan { key: output.key(), - inputs: vec![output.clone()], // Uses a payment as this will still be successfully sent due to fee amortization, // and because change is currently always a Serai key payments: vec![Payment { address: refund_to, data: None, balance: output.balance() }], + inputs: vec![output], change: None, } } - // Manually creates Plans for all External outputs needing forwarding/refunding. - // - // Returns created Plans and a map of forwarded output IDs to their associated InInstructions. - fn filter_outputs_due_to_forwarding( - &self, - existing_outputs: &mut Vec, - ) -> (Vec>, HashMap, InInstructionWithBalance>) { - // Manually create a Plan for all External outputs needing forwarding/refunding + fn forward_plan(&self, output: N::Output) -> Plan { + log::info!("creating forwarding plan for {}", hex::encode(output.id())); /* Sending a Plan, with arbitrary data proxying the InInstruction, would require adding a flow for networks which drop their data to still embed arbitrary data. It'd also have - edge cases causing failures. + edge cases causing failures (we'd need to manually provide the origin if it was implied, + which may exceed the encoding limit). Instead, we save the InInstruction as we scan this output. Then, when the output is successfully forwarded, we simply read it from the local database. This also saves the @@ -395,37 +390,22 @@ impl MultisigManager { TODO: Add a fourth address, forwarded_address, to prevent this. 
*/ - let mut plans = vec![]; - let mut forwarding = HashMap::new(); - existing_outputs.retain(|output| { - let plans_at_start = plans.len(); - if output.kind() == OutputType::External { - let (refund_to, instruction) = instruction_from_output::(output); - if let Some(instruction) = instruction { - // Build a dedicated Plan forwarding this - plans.push(Plan { - key: self.existing.as_ref().unwrap().key, - inputs: vec![output.clone()], - payments: vec![], - change: Some(N::address(self.new.as_ref().unwrap().key)), - }); - - // Set the instruction for this output to be returned - forwarding.insert(output.id().as_ref().to_vec(), instruction); - } else if let Some(refund_to) = refund_to { - if let Ok(refund_to) = refund_to.consume().try_into() { - // Build a dedicated Plan refunding this - plans.push(Self::refund_plan(output, refund_to)); - } - } - } - // Only keep if we didn't make a Plan consuming it - plans_at_start == plans.len() - }); - (plans, forwarding) + Plan { + key: self.existing.as_ref().unwrap().key, + payments: vec![Payment { + address: N::address(self.new.as_ref().unwrap().key), + data: None, + balance: output.balance(), + }], + inputs: vec![output], + change: None, + } } // Filter newly received outputs due to the step being RotationStep::ClosingExisting. + // + // Returns the Plans for the `Branch`s which should be created off outputs which passed the + // filter. 
fn filter_outputs_due_to_closing( &mut self, txn: &mut D::Transaction<'_>, @@ -617,16 +597,21 @@ impl MultisigManager { block_id: >::Id, step: &mut RotationStep, burns: Vec, - ) -> (bool, Vec>, HashMap, InInstructionWithBalance>) { + ) -> (bool, Vec>) { let (mut existing_payments, mut new_payments) = self.burns_to_payments(txn, *step, burns); + let mut plans = vec![]; + // We now have to acknowledge the acknowledged block, if it's new // It won't be if this block's `InInstruction`s were split into multiple `Batch`s - let (acquired_lock, (mut existing_outputs, mut new_outputs)) = { - let (acquired_lock, outputs) = if ScannerHandle::::db_scanned(txn) + let (acquired_lock, (mut existing_outputs, new_outputs)) = { + let (acquired_lock, mut outputs) = if ScannerHandle::::db_scanned(txn) .expect("published a Batch despite never scanning a block") < block_number { + // Load plans created when we scanned the block + plans = MultisigsDb::::take_plans_from_scanning(txn, block_number).unwrap(); + let (is_retirement_block, outputs) = self.scanner.ack_block(txn, block_id.clone()).await; if is_retirement_block { let existing = self.existing.take().unwrap(); @@ -641,38 +626,26 @@ impl MultisigManager { } else { (false, vec![]) }; + + // Remove all outputs already present in plans + let mut output_set = HashSet::new(); + for plan in &plans { + for input in &plan.inputs { + output_set.insert(input.id().as_ref().to_vec()); + } + } + outputs.retain(|output| !output_set.remove(output.id().as_ref())); + assert_eq!(output_set.len(), 0); + (acquired_lock, self.split_outputs_by_key(outputs)) }; - let (mut plans, forwarded_external_outputs) = match *step { - RotationStep::UseExisting | RotationStep::NewAsChange => (vec![], HashMap::new()), - RotationStep::ForwardFromExisting => { - self.filter_outputs_due_to_forwarding(&mut existing_outputs) - } - RotationStep::ClosingExisting => { - (self.filter_outputs_due_to_closing(txn, &mut existing_outputs), HashMap::new()) - } - }; - - let 
handle_refund_outputs = |txn: &mut _, plans: &mut Vec<_>, outputs: &mut Vec| { - outputs.retain(|output| { - if let Some(refund_to) = MultisigsDb::::take_refund(txn, output.id().as_ref()) { - // If this isn't a valid refund address, accumulate this output - let Ok(refund_to) = refund_to.consume().try_into() else { - log::info!( - "set refund for {} didn't have a valid address to refund to", - hex::encode(output.id()) - ); - return true; - }; - plans.push(Self::refund_plan(output, refund_to)); - return false; - } - true - }); - }; - handle_refund_outputs(txn, &mut plans, &mut existing_outputs); + // If we're closing the existing multisig, filter its outputs down + if *step == RotationStep::ClosingExisting { + plans.extend(self.filter_outputs_due_to_closing(txn, &mut existing_outputs)); + } + // Now that we've done all our filtering, schedule the existing multisig's outputs plans.extend({ let existing = self.existing.as_mut().unwrap(); let existing_key = existing.key; @@ -694,7 +667,6 @@ impl MultisigManager { }); for plan in &plans { - assert_eq!(plan.key, self.existing.as_ref().unwrap().key); if plan.change == Some(N::change_address(plan.key)) { // Assert these are only created during the expected step match *step { @@ -706,12 +678,12 @@ impl MultisigManager { } } - handle_refund_outputs(txn, &mut plans, &mut new_outputs); + // Schedule the new multisig's outputs too if let Some(new) = self.new.as_mut() { plans.extend(new.scheduler.schedule::(txn, new_outputs, new_payments, new.key, false)); } - (acquired_lock, plans, forwarded_external_outputs) + (acquired_lock, plans) } /// Handle a SubstrateBlock event, building the relevant Plans. 
@@ -732,7 +704,7 @@ impl MultisigManager { let mut step = self.current_rotation_step(block_number); // Get the Plans from this block - let (acquired_lock, plans, mut forwarded_external_outputs) = + let (acquired_lock, plans) = self.plans_from_block(txn, block_number, block_id, &mut step, burns).await; let res = { @@ -745,36 +717,50 @@ impl MultisigManager { let key = plan.key; let key_bytes = key.to_bytes(); - let running_operating_costs = MultisigsDb::::take_operating_costs(txn); + let (tx, post_fee_branches) = { + let running_operating_costs = MultisigsDb::::take_operating_costs(txn); - MultisigsDb::::save_active_plan( - txn, - key_bytes.as_ref(), - block_number.try_into().unwrap(), - &plan, - running_operating_costs, - ); + MultisigsDb::::save_active_plan( + txn, + key_bytes.as_ref(), + block_number.try_into().unwrap(), + &plan, + running_operating_costs, + ); - let to_be_forwarded = forwarded_external_outputs.remove(plan.inputs[0].id().as_ref()); - if to_be_forwarded.is_some() { - assert_eq!(plan.inputs.len(), 1); - } - let PreparedSend { tx, post_fee_branches, operating_costs } = - prepare_send(network, block_number, plan, running_operating_costs).await; - // 'Drop' running_operating_costs to ensure only operating_costs is used from here on out - #[allow(unused, clippy::let_unit_value)] - let running_operating_costs: () = (); - MultisigsDb::::set_operating_costs(txn, operating_costs); - - // If this is a Plan for an output we're forwarding, we need to save the InInstruction for - // its output under the amount successfully forwarded - if let Some(mut instruction) = to_be_forwarded { - // If we can't successfully create a forwarding TX, simply drop this - if let Some(tx) = &tx { - instruction.balance.amount.0 -= tx.0.fee(); - MultisigsDb::::save_forwarded_output(txn, instruction); + let to_be_forwarded = { + let output = &plan.inputs[0]; + (step == RotationStep::ForwardFromExisting) && + (output.kind() == OutputType::External) && + (output.key() == 
self.existing.as_ref().unwrap().key) + }; + if to_be_forwarded { + assert_eq!(plan.inputs.len(), 1); } - } + + // If we're forwarding this output, don't take the opportunity to amortize operating costs + // The scanner handler below, in order to properly save forwarded outputs' instructions, + // needs to know the actual value the forwarded output will be created with + // Including operating costs prevents that + let to_use_operating_costs = if to_be_forwarded { 0 } else { running_operating_costs }; + + let PreparedSend { tx, post_fee_branches, mut operating_costs } = + prepare_send(network, block_number, plan, to_use_operating_costs).await; + + // Restore running_operating_costs to operating_costs + if to_be_forwarded { + // If we're forwarding this output, operating_costs should still be 0 + // Either this TX wasn't created, causing no operating costs, or it was yet it'd be + // amortized + assert_eq!(operating_costs, 0); + + operating_costs += running_operating_costs; + } + + MultisigsDb::::set_operating_costs(txn, operating_costs); + + (tx, post_fee_branches) + }; for branch in post_fee_branches { let existing = self.existing.as_mut().unwrap(); @@ -818,13 +804,14 @@ impl MultisigManager { self.scanner.release_lock().await; } - fn scanner_event_to_multisig_event( + pub async fn scanner_event_to_multisig_event( &self, txn: &mut D::Transaction<'_>, + network: &N, msg: ScannerEvent, ) -> MultisigEvent { let (block_number, event) = match msg { - ScannerEvent::Block { is_retirement_block, block, outputs } => { + ScannerEvent::Block { is_retirement_block, block, mut outputs } => { // Since the Scanner is asynchronous, the following is a concern for race conditions // We safely know the step of a block since keys are declared, and the Scanner is safe // with respect to the declaration of keys @@ -833,13 +820,70 @@ impl MultisigManager { .expect("didn't have the block number for a block we just scanned"); let step = self.current_rotation_step(block_number); + // If 
these aren't externally received funds, don't handle it as an instruction + outputs.retain(|output| output.kind() == OutputType::External); + + let mut single_input_plans = vec![]; + // If the old multisig is explicitly only supposed to forward, create all such plans now + if step == RotationStep::ForwardFromExisting { + let mut i = 0; + while i < outputs.len() { + let output = &outputs[i]; + let single_input_plans = &mut single_input_plans; + let txn = &mut *txn; + + #[allow(clippy::redundant_closure_call)] + let should_retain = (|| async move { + // If this output doesn't belong to the existing multisig, it shouldn't be forwarded + if output.key() != self.existing.as_ref().unwrap().key { + return true; + } + + let plans_at_start = single_input_plans.len(); + let (refund_to, instruction) = instruction_from_output::(output); + if let Some(mut instruction) = instruction { + // Build a dedicated Plan forwarding this + let forward_plan = self.forward_plan(output.clone()); + single_input_plans.push(forward_plan.clone()); + + // Set the instruction for this output to be returned + // We need to set it under the amount it's forwarded with, so prepare its forwarding + // TX to determine the fees involved + let PreparedSend { tx, post_fee_branches: _, operating_costs } = + prepare_send(network, block_number, forward_plan, 0).await; + // operating_costs should not increase in a forwarding TX + assert_eq!(operating_costs, 0); + + // If this actually forwarded any coins, save the output as forwarded + // If this didn't create a TX, we don't bother saving the output as forwarded + // The fact we already created and pushed a plan still using this output will cause + // it to not be retained here, and later the plan will be dropped as this did here, + // letting it die out + if let Some(tx) = &tx { + instruction.balance.amount.0 -= tx.0.fee(); + MultisigsDb::::save_forwarded_output(txn, instruction); + } + } else if let Some(refund_to) = refund_to { + if let Ok(refund_to) = 
refund_to.consume().try_into() { + // Build a dedicated Plan refunding this + single_input_plans.push(Self::refund_plan(output.clone(), refund_to)); + } + } + + // Only keep if we didn't make a Plan consuming it + plans_at_start == single_input_plans.len() + })() + .await; + if should_retain { + i += 1; + continue; + } + outputs.remove(i); + } + } + let mut instructions = vec![]; for output in outputs { - // If these aren't externally received funds, don't handle it as an instruction - if output.kind() != OutputType::External { - continue; - } - // If this is an External transaction to the existing multisig, and we're either solely // forwarding or closing the existing multisig, drop it // In the case of the forwarding case, we'll report it once it hits the new multisig @@ -852,10 +896,11 @@ impl MultisigManager { } let (refund_to, instruction) = instruction_from_output::(&output); - let refund = |txn| { + let refund = || { if let Some(refund_to) = refund_to { - log::info!("setting refund for output {}", hex::encode(output.id())); - MultisigsDb::::set_refund(txn, output.id().as_ref(), refund_to); + if let Ok(refund_to) = refund_to.consume().try_into() { + single_input_plans.push(Self::refund_plan(output.clone(), refund_to)) + } } }; let instruction = if let Some(instruction) = instruction { @@ -865,21 +910,17 @@ impl MultisigManager { // multisig // If it's not empty, it's corrupt in some way and should be refunded if !output.data().is_empty() { - refund(txn); + refund(); continue; } - // TODO: Both save_forwarded_output and take_forwarded_output have to happen on the - // same clock. Right now, one occurs on Substrate block ack, one occurs on scan. - // TODO: To resolve this, this function has to create the plans for - // forwarding/refunding outputs. 
if let Some(instruction) = MultisigsDb::::take_forwarded_output(txn, output.balance()) { instruction } else { // If it's not a forwarded output, refund - refund(txn); + refund(); continue; } }; @@ -900,6 +941,10 @@ impl MultisigManager { instructions.push(instruction); } + // Save the plans created while scanning + // TODO: Should we combine all of these plans? + MultisigsDb::::set_plans_from_scanning(txn, block_number, single_input_plans); + // If any outputs were delayed, append them into this block match step { RotationStep::UseExisting => {} @@ -1000,13 +1045,7 @@ impl MultisigManager { event } - // async fn where dropping the Future causes no state changes - // This property is derived from recv having this property, and recv being the only async call - pub async fn next_event(&mut self, txn: &RwLock>) -> MultisigEvent { - let event = self.scanner.events.recv().await.unwrap(); - - // No further code is async - - self.scanner_event_to_multisig_event(&mut *txn.write().unwrap(), event) + pub async fn next_scanner_event(&mut self) -> ScannerEvent { + self.scanner.events.recv().await.unwrap() } } diff --git a/processor/src/signer.rs b/processor/src/signer.rs index adb59062..b86c3118 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -404,6 +404,7 @@ impl Signer { // branch again for something we've already attempted // // Only run if this hasn't already been attempted + // TODO: This isn't complete as this txn may not be committed with the expected timing if SignerDb::::has_attempt(txn, &id) { warn!( "already attempted {} #{}. 
this is an error if we didn't reboot", diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs index baa154c9..49ee51bb 100644 --- a/processor/src/substrate_signer.rs +++ b/processor/src/substrate_signer.rs @@ -191,6 +191,7 @@ impl SubstrateSigner { // branch again for something we've already attempted // // Only run if this hasn't already been attempted + // TODO: This isn't complete as this txn may not be committed with the expected timing if SubstrateSignerDb::::has_attempt(txn, &id) { warn!( "already attempted batch {}, attempt #{}. this is an error if we didn't reboot",