Resolve race condition regarding when forwarded output is set

The higher-level scanner code in multisigs/mod.rs now creates a series of plans
with limited context. These include forwarding and refunding plans, moving all
handling of forwarding flags on the scanner's clock and therefore safe.

Also simplifies the refunding a decent bit.
This commit is contained in:
Luke Parker 2023-11-09 12:29:09 -05:00
parent bf41009c5a
commit 24919cfc54
No known key found for this signature in database
5 changed files with 209 additions and 154 deletions

View file

@ -1,4 +1,4 @@
use std::{sync::RwLock, time::Duration, collections::HashMap};
use std::{time::Duration, collections::HashMap};
use zeroize::{Zeroize, Zeroizing};
@ -499,9 +499,7 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
let mut last_coordinator_msg = None;
loop {
// The following select uses this txn in both branches, hence why needing a RwLock to pass it
// around is needed
let txn = RwLock::new(raw_db.txn());
let mut txn = raw_db.txn();
let mut outer_msg = None;
@ -512,15 +510,12 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
// the other messages in the queue, it may be beneficial to parallelize these
// They could likely be parallelized by type (KeyGen, Sign, Substrate) without issue
msg = coordinator.recv() => {
let mut txn = txn.write().unwrap();
let txn = &mut txn;
assert_eq!(msg.id, (last_coordinator_msg.unwrap_or(msg.id - 1) + 1));
last_coordinator_msg = Some(msg.id);
// Only handle this if we haven't already
if !main_db.handled_message(msg.id) {
MainDb::<N, D>::handle_message(txn, msg.id);
MainDb::<N, D>::handle_message(&mut txn, msg.id);
// This is isolated to better think about how its ordered, or rather, about how the other
// cases aren't ordered
@ -533,7 +528,7 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
// This is safe so long as Tributary and Substrate messages don't both expect mutable
// references over the same data
handle_coordinator_msg(
&mut **txn,
&mut txn,
&network,
&mut coordinator,
&mut tributary_mutable,
@ -545,9 +540,13 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
outer_msg = Some(msg);
},
msg = substrate_mutable.next_event(&txn) => {
let mut txn = txn.write().unwrap();
let txn = &mut txn;
scanner_event = substrate_mutable.next_scanner_event() => {
let msg = substrate_mutable.scanner_event_to_multisig_event(
&mut txn,
&network,
scanner_event
).await;
match msg {
MultisigEvent::Batches(retired_key_new_key, batches) => {
// Start signing this batch
@ -559,7 +558,7 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
).await;
if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() {
if let Some(msg) = substrate_signer.sign(txn, batch).await {
if let Some(msg) = substrate_signer.sign(&mut txn, batch).await {
coordinator.send(msg).await;
}
}
@ -577,7 +576,7 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
},
MultisigEvent::Completed(key, id, tx) => {
if let Some(signer) = tributary_mutable.signers.get_mut(&key) {
if let Some(msg) = signer.completed(txn, id, tx) {
if let Some(msg) = signer.completed(&mut txn, id, tx) {
coordinator.send(msg).await;
}
}
@ -586,7 +585,7 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
},
}
txn.into_inner().unwrap().commit();
txn.commit();
if let Some(msg) = outer_msg {
coordinator.ack(msg).await;
}

View file

@ -5,10 +5,7 @@ use ciphersuite::Ciphersuite;
pub use serai_db::*;
use scale::{Encode, Decode};
use serai_client::{
primitives::{Balance, ExternalAddress},
in_instructions::primitives::InInstructionWithBalance,
};
use serai_client::{primitives::Balance, in_instructions::primitives::InInstructionWithBalance};
use crate::{
Get, Db, Plan,
@ -156,15 +153,33 @@ impl<N: Network, D: Db> MultisigsDb<N, D> {
txn.put(Self::resolved_key(resolution.as_ref()), plan);
}
fn refund_key(id: &[u8]) -> Vec<u8> {
Self::multisigs_key(b"refund", id)
fn plans_from_scanning_key(block_number: usize) -> Vec<u8> {
Self::multisigs_key(b"plans_from_scanning", u32::try_from(block_number).unwrap().to_le_bytes())
}
pub fn set_refund(txn: &mut D::Transaction<'_>, id: &[u8], address: ExternalAddress) {
txn.put(Self::refund_key(id), address.encode());
pub fn set_plans_from_scanning(
txn: &mut D::Transaction<'_>,
block_number: usize,
plans: Vec<Plan<N>>,
) {
let mut buf = vec![];
for plan in plans {
plan.write(&mut buf).unwrap();
}
txn.put(Self::plans_from_scanning_key(block_number), buf);
}
pub fn take_refund(txn: &mut D::Transaction<'_>, id: &[u8]) -> Option<ExternalAddress> {
let key = Self::refund_key(id);
let res = txn.get(&key).map(|address| ExternalAddress::decode(&mut address.as_ref()).unwrap());
pub fn take_plans_from_scanning(
txn: &mut D::Transaction<'_>,
block_number: usize,
) -> Option<Vec<Plan<N>>> {
let key = Self::plans_from_scanning_key(block_number);
let res = txn.get(&key).map(|plans| {
let mut plans_ref = plans.as_slice();
let mut res = vec![];
while !plans_ref.is_empty() {
res.push(Plan::<N>::read(&mut plans_ref).unwrap());
}
res
});
if res.is_some() {
txn.del(key);
}

View file

@ -1,5 +1,5 @@
use core::time::Duration;
use std::{sync::RwLock, collections::HashMap};
use std::collections::HashSet;
use ciphersuite::{group::GroupEncoding, Ciphersuite};
@ -270,8 +270,8 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
fn current_rotation_step(&self, block_number: usize) -> RotationStep {
let Some(new) = self.new.as_ref() else { return RotationStep::UseExisting };
// Period numbering here has no meaning other than these the time values useful here, and the
// order they're built in. They have no reference/shared marker with anything else
// Period numbering here has no meaning other than these are the time values useful here, and
// the order they're calculated in. They have no reference/shared marker with anything else
// ESTIMATED_BLOCK_TIME_IN_SECONDS is fine to use here. While inaccurate, it shouldn't be
// drastically off, and even if it is, it's a hiccup to latency handling only possible when
@ -354,32 +354,27 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
(existing_outputs, new_outputs)
}
fn refund_plan(output: &N::Output, refund_to: N::Address) -> Plan<N> {
fn refund_plan(output: N::Output, refund_to: N::Address) -> Plan<N> {
log::info!("creating refund plan for {}", hex::encode(output.id()));
assert_eq!(output.kind(), OutputType::External);
Plan {
key: output.key(),
inputs: vec![output.clone()],
// Uses a payment as this will still be successfully sent due to fee amortization,
// and because change is currently always a Serai key
payments: vec![Payment { address: refund_to, data: None, balance: output.balance() }],
inputs: vec![output],
change: None,
}
}
// Manually creates Plans for all External outputs needing forwarding/refunding.
//
// Returns created Plans and a map of forwarded output IDs to their associated InInstructions.
fn filter_outputs_due_to_forwarding(
&self,
existing_outputs: &mut Vec<N::Output>,
) -> (Vec<Plan<N>>, HashMap<Vec<u8>, InInstructionWithBalance>) {
// Manually create a Plan for all External outputs needing forwarding/refunding
fn forward_plan(&self, output: N::Output) -> Plan<N> {
log::info!("creating forwarding plan for {}", hex::encode(output.id()));
/*
Sending a Plan, with arbitrary data proxying the InInstruction, would require adding
a flow for networks which drop their data to still embed arbitrary data. It'd also have
edge cases causing failures.
edge cases causing failures (we'd need to manually provide the origin if it was implied,
which may exceed the encoding limit).
Instead, we save the InInstruction as we scan this output. Then, when the output is
successfully forwarded, we simply read it from the local database. This also saves the
@ -395,37 +390,22 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
TODO: Add a fourth address, forwarded_address, to prevent this.
*/
let mut plans = vec![];
let mut forwarding = HashMap::new();
existing_outputs.retain(|output| {
let plans_at_start = plans.len();
if output.kind() == OutputType::External {
let (refund_to, instruction) = instruction_from_output::<N>(output);
if let Some(instruction) = instruction {
// Build a dedicated Plan forwarding this
plans.push(Plan {
key: self.existing.as_ref().unwrap().key,
inputs: vec![output.clone()],
payments: vec![],
change: Some(N::address(self.new.as_ref().unwrap().key)),
});
// Set the instruction for this output to be returned
forwarding.insert(output.id().as_ref().to_vec(), instruction);
} else if let Some(refund_to) = refund_to {
if let Ok(refund_to) = refund_to.consume().try_into() {
// Build a dedicated Plan refunding this
plans.push(Self::refund_plan(output, refund_to));
}
}
}
// Only keep if we didn't make a Plan consuming it
plans_at_start == plans.len()
});
(plans, forwarding)
Plan {
key: self.existing.as_ref().unwrap().key,
payments: vec![Payment {
address: N::address(self.new.as_ref().unwrap().key),
data: None,
balance: output.balance(),
}],
inputs: vec![output],
change: None,
}
}
// Filter newly received outputs due to the step being RotationStep::ClosingExisting.
//
// Returns the Plans for the `Branch`s which should be created off outputs which passed the
// filter.
fn filter_outputs_due_to_closing(
&mut self,
txn: &mut D::Transaction<'_>,
@ -617,16 +597,21 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
block_id: <N::Block as Block<N>>::Id,
step: &mut RotationStep,
burns: Vec<OutInstructionWithBalance>,
) -> (bool, Vec<Plan<N>>, HashMap<Vec<u8>, InInstructionWithBalance>) {
) -> (bool, Vec<Plan<N>>) {
let (mut existing_payments, mut new_payments) = self.burns_to_payments(txn, *step, burns);
let mut plans = vec![];
// We now have to acknowledge the acknowledged block, if it's new
// It won't be if this block's `InInstruction`s were split into multiple `Batch`s
let (acquired_lock, (mut existing_outputs, mut new_outputs)) = {
let (acquired_lock, outputs) = if ScannerHandle::<N, D>::db_scanned(txn)
let (acquired_lock, (mut existing_outputs, new_outputs)) = {
let (acquired_lock, mut outputs) = if ScannerHandle::<N, D>::db_scanned(txn)
.expect("published a Batch despite never scanning a block") <
block_number
{
// Load plans crated when we scanned the block
plans = MultisigsDb::<N, D>::take_plans_from_scanning(txn, block_number).unwrap();
let (is_retirement_block, outputs) = self.scanner.ack_block(txn, block_id.clone()).await;
if is_retirement_block {
let existing = self.existing.take().unwrap();
@ -641,38 +626,26 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
} else {
(false, vec![])
};
// Remove all outputs already present in plans
let mut output_set = HashSet::new();
for plan in &plans {
for input in &plan.inputs {
output_set.insert(input.id().as_ref().to_vec());
}
}
outputs.retain(|output| !output_set.remove(output.id().as_ref()));
assert_eq!(output_set.len(), 0);
(acquired_lock, self.split_outputs_by_key(outputs))
};
let (mut plans, forwarded_external_outputs) = match *step {
RotationStep::UseExisting | RotationStep::NewAsChange => (vec![], HashMap::new()),
RotationStep::ForwardFromExisting => {
self.filter_outputs_due_to_forwarding(&mut existing_outputs)
}
RotationStep::ClosingExisting => {
(self.filter_outputs_due_to_closing(txn, &mut existing_outputs), HashMap::new())
}
};
let handle_refund_outputs = |txn: &mut _, plans: &mut Vec<_>, outputs: &mut Vec<N::Output>| {
outputs.retain(|output| {
if let Some(refund_to) = MultisigsDb::<N, D>::take_refund(txn, output.id().as_ref()) {
// If this isn't a valid refund address, accumulate this output
let Ok(refund_to) = refund_to.consume().try_into() else {
log::info!(
"set refund for {} didn't have a valid address to refund to",
hex::encode(output.id())
);
return true;
};
plans.push(Self::refund_plan(output, refund_to));
return false;
}
true
});
};
handle_refund_outputs(txn, &mut plans, &mut existing_outputs);
// If we're closing the existing multisig, filter its outputs down
if *step == RotationStep::ClosingExisting {
plans.extend(self.filter_outputs_due_to_closing(txn, &mut existing_outputs));
}
// Now that we've done all our filtering, schedule the existing multisig's outputs
plans.extend({
let existing = self.existing.as_mut().unwrap();
let existing_key = existing.key;
@ -694,7 +667,6 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
});
for plan in &plans {
assert_eq!(plan.key, self.existing.as_ref().unwrap().key);
if plan.change == Some(N::change_address(plan.key)) {
// Assert these are only created during the expected step
match *step {
@ -706,12 +678,12 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
}
}
handle_refund_outputs(txn, &mut plans, &mut new_outputs);
// Schedule the new multisig's outputs too
if let Some(new) = self.new.as_mut() {
plans.extend(new.scheduler.schedule::<D>(txn, new_outputs, new_payments, new.key, false));
}
(acquired_lock, plans, forwarded_external_outputs)
(acquired_lock, plans)
}
/// Handle a SubstrateBlock event, building the relevant Plans.
@ -732,7 +704,7 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
let mut step = self.current_rotation_step(block_number);
// Get the Plans from this block
let (acquired_lock, plans, mut forwarded_external_outputs) =
let (acquired_lock, plans) =
self.plans_from_block(txn, block_number, block_id, &mut step, burns).await;
let res = {
@ -745,36 +717,50 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
let key = plan.key;
let key_bytes = key.to_bytes();
let running_operating_costs = MultisigsDb::<N, D>::take_operating_costs(txn);
let (tx, post_fee_branches) = {
let running_operating_costs = MultisigsDb::<N, D>::take_operating_costs(txn);
MultisigsDb::<N, D>::save_active_plan(
txn,
key_bytes.as_ref(),
block_number.try_into().unwrap(),
&plan,
running_operating_costs,
);
MultisigsDb::<N, D>::save_active_plan(
txn,
key_bytes.as_ref(),
block_number.try_into().unwrap(),
&plan,
running_operating_costs,
);
let to_be_forwarded = forwarded_external_outputs.remove(plan.inputs[0].id().as_ref());
if to_be_forwarded.is_some() {
assert_eq!(plan.inputs.len(), 1);
}
let PreparedSend { tx, post_fee_branches, operating_costs } =
prepare_send(network, block_number, plan, running_operating_costs).await;
// 'Drop' running_operating_costs to ensure only operating_costs is used from here on out
#[allow(unused, clippy::let_unit_value)]
let running_operating_costs: () = ();
MultisigsDb::<N, D>::set_operating_costs(txn, operating_costs);
// If this is a Plan for an output we're forwarding, we need to save the InInstruction for
// its output under the amount successfully forwarded
if let Some(mut instruction) = to_be_forwarded {
// If we can't successfully create a forwarding TX, simply drop this
if let Some(tx) = &tx {
instruction.balance.amount.0 -= tx.0.fee();
MultisigsDb::<N, D>::save_forwarded_output(txn, instruction);
let to_be_forwarded = {
let output = &plan.inputs[0];
(step == RotationStep::ForwardFromExisting) &&
(output.kind() == OutputType::External) &&
(output.key() == self.existing.as_ref().unwrap().key)
};
if to_be_forwarded {
assert_eq!(plan.inputs.len(), 1);
}
}
// If we're forwarding this output, don't take the opportunity to amortze operating costs
// The scanner handler below, in order to properly save forwarded outputs' instructions,
// needs to know the actual value the forwarded output will be created with
// Including operating costs prevents that
let to_use_operating_costs = if to_be_forwarded { 0 } else { running_operating_costs };
let PreparedSend { tx, post_fee_branches, mut operating_costs } =
prepare_send(network, block_number, plan, to_use_operating_costs).await;
// Restore running_operating_costs to operating_costs
if to_be_forwarded {
// If we're forwarding this output, operating_costs should still be 0
// Either this TX wasn't created, causing no operating costs, or it was yet it'd be
// amortized
assert_eq!(operating_costs, 0);
operating_costs += running_operating_costs;
}
MultisigsDb::<N, D>::set_operating_costs(txn, operating_costs);
(tx, post_fee_branches)
};
for branch in post_fee_branches {
let existing = self.existing.as_mut().unwrap();
@ -818,13 +804,14 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
self.scanner.release_lock().await;
}
fn scanner_event_to_multisig_event(
pub async fn scanner_event_to_multisig_event(
&self,
txn: &mut D::Transaction<'_>,
network: &N,
msg: ScannerEvent<N>,
) -> MultisigEvent<N> {
let (block_number, event) = match msg {
ScannerEvent::Block { is_retirement_block, block, outputs } => {
ScannerEvent::Block { is_retirement_block, block, mut outputs } => {
// Since the Scanner is asynchronous, the following is a concern for race conditions
// We safely know the step of a block since keys are declared, and the Scanner is safe
// with respect to the declaration of keys
@ -833,13 +820,70 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
.expect("didn't have the block number for a block we just scanned");
let step = self.current_rotation_step(block_number);
// If these aren't externally received funds, don't handle it as an instruction
outputs.retain(|output| output.kind() == OutputType::External);
let mut single_input_plans = vec![];
// If the old multisig is explicitly only supposed to forward, create all such plans now
if step == RotationStep::ForwardFromExisting {
let mut i = 0;
while i < outputs.len() {
let output = &outputs[i];
let single_input_plans = &mut single_input_plans;
let txn = &mut *txn;
#[allow(clippy::redundant_closure_call)]
let should_retain = (|| async move {
// If this output doesn't belong to the existing multisig, it shouldn't be forwarded
if output.key() != self.existing.as_ref().unwrap().key {
return true;
}
let plans_at_start = single_input_plans.len();
let (refund_to, instruction) = instruction_from_output::<N>(output);
if let Some(mut instruction) = instruction {
// Build a dedicated Plan forwarding this
let forward_plan = self.forward_plan(output.clone());
single_input_plans.push(forward_plan.clone());
// Set the instruction for this output to be returned
// We need to set it under the amount it's forwarded with, so prepare its forwarding
// TX to determine the fees involved
let PreparedSend { tx, post_fee_branches: _, operating_costs } =
prepare_send(network, block_number, forward_plan, 0).await;
// operating_costs should not increase in a forwarding TX
assert_eq!(operating_costs, 0);
// If this actually forwarded any coins, save the output as forwarded
// If this didn't create a TX, we don't bother saving the output as forwarded
// The fact we already created and pushed a plan still using this output will cause
// it to not be retained here, and later the plan will be dropped as this did here,
// letting it die out
if let Some(tx) = &tx {
instruction.balance.amount.0 -= tx.0.fee();
MultisigsDb::<N, D>::save_forwarded_output(txn, instruction);
}
} else if let Some(refund_to) = refund_to {
if let Ok(refund_to) = refund_to.consume().try_into() {
// Build a dedicated Plan refunding this
single_input_plans.push(Self::refund_plan(output.clone(), refund_to));
}
}
// Only keep if we didn't make a Plan consuming it
plans_at_start == single_input_plans.len()
})()
.await;
if should_retain {
i += 1;
continue;
}
outputs.remove(i);
}
}
let mut instructions = vec![];
for output in outputs {
// If these aren't externally received funds, don't handle it as an instruction
if output.kind() != OutputType::External {
continue;
}
// If this is an External transaction to the existing multisig, and we're either solely
// forwarding or closing the existing multisig, drop it
// In the case of the forwarding case, we'll report it once it hits the new multisig
@ -852,10 +896,11 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
}
let (refund_to, instruction) = instruction_from_output::<N>(&output);
let refund = |txn| {
let refund = || {
if let Some(refund_to) = refund_to {
log::info!("setting refund for output {}", hex::encode(output.id()));
MultisigsDb::<N, D>::set_refund(txn, output.id().as_ref(), refund_to);
if let Ok(refund_to) = refund_to.consume().try_into() {
single_input_plans.push(Self::refund_plan(output.clone(), refund_to))
}
}
};
let instruction = if let Some(instruction) = instruction {
@ -865,21 +910,17 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
// multisig
// If it's not empty, it's corrupt in some way and should be refunded
if !output.data().is_empty() {
refund(txn);
refund();
continue;
}
// TODO: Both save_forwarded_output and take_forwarded_output have to happen on the
// same clock. Right now, one occurs on Substrate block ack, one occurs on scan.
// TODO: To resolve this, this function has to create the plans for
// forwarding/refunding outputs.
if let Some(instruction) =
MultisigsDb::<N, D>::take_forwarded_output(txn, output.balance())
{
instruction
} else {
// If it's not a forwarded output, refund
refund(txn);
refund();
continue;
}
};
@ -900,6 +941,10 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
instructions.push(instruction);
}
// Save the plans created while scanning
// TODO: Should we combine all of these plans?
MultisigsDb::<N, D>::set_plans_from_scanning(txn, block_number, single_input_plans);
// If any outputs were delayed, append them into this block
match step {
RotationStep::UseExisting => {}
@ -1000,13 +1045,7 @@ impl<D: Db, N: Network> MultisigManager<D, N> {
event
}
// async fn where dropping the Future causes no state changes
// This property is derived from recv having this property, and recv being the only async call
pub async fn next_event(&mut self, txn: &RwLock<D::Transaction<'_>>) -> MultisigEvent<N> {
let event = self.scanner.events.recv().await.unwrap();
// No further code is async
self.scanner_event_to_multisig_event(&mut *txn.write().unwrap(), event)
pub async fn next_scanner_event(&mut self) -> ScannerEvent<N> {
self.scanner.events.recv().await.unwrap()
}
}

View file

@ -404,6 +404,7 @@ impl<N: Network, D: Db> Signer<N, D> {
// branch again for something we've already attempted
//
// Only run if this hasn't already been attempted
// TODO: This isn't complete as this txn may not be committed with the expected timing
if SignerDb::<N, D>::has_attempt(txn, &id) {
warn!(
"already attempted {} #{}. this is an error if we didn't reboot",

View file

@ -191,6 +191,7 @@ impl<D: Db> SubstrateSigner<D> {
// branch again for something we've already attempted
//
// Only run if this hasn't already been attempted
// TODO: This isn't complete as this txn may not be committed with the expected timing
if SubstrateSignerDb::<D>::has_attempt(txn, &id) {
warn!(
"already attempted batch {}, attempt #{}. this is an error if we didn't reboot",