diff --git a/common/db/src/create_db.rs b/common/db/src/create_db.rs index 04157b13..0fbe3404 100644 --- a/common/db/src/create_db.rs +++ b/common/db/src/create_db.rs @@ -29,7 +29,7 @@ pub fn serai_db_key( /// /// ```ignore /// create_db!( -/// TrubutariesDb { +/// TributariesDb { /// AttemptsDb: (key_bytes: &[u8], attempt_id: u32) -> u64, /// ExpiredDb: (genesis: [u8; 32]) -> Vec /// } @@ -70,3 +70,51 @@ macro_rules! create_db { )* }; } + +#[macro_export] +macro_rules! db_channel { + ($db_name: ident { + $($field_name: ident: ($($arg: ident: $arg_type: ty),*) -> $field_type: ty$(,)?)* + }) => { + $( + create_db! { + $db_name { + $field_name: ($($arg: $arg_type,)* index: u32) -> $field_type, + } + } + + impl $field_name { + pub fn send(txn: &mut impl DbTxn $(, $arg: $arg_type)*, value: &$field_type) { + // Use index 0 to store the amount of messages + let messages_sent_key = $field_name::key($($arg),*, 0); + let messages_sent = txn.get(&messages_sent_key).map(|counter| { + u32::from_le_bytes(counter.try_into().unwrap()) + }).unwrap_or(0); + txn.put(&messages_sent_key, (messages_sent + 1).to_le_bytes()); + + // + 2 as index 1 is used for the amount of messages read + // Using distinct counters enables send to be called without mutating anything recv may + // at the same time + let index_to_use = messages_sent + 2; + + $field_name::set(txn, $($arg),*, index_to_use, value); + } + pub fn try_recv(txn: &mut impl DbTxn $(, $arg: $arg_type)*) -> Option<$field_type> { + let messages_recvd_key = $field_name::key($($arg),*, 1); + let messages_recvd = txn.get(&messages_recvd_key).map(|counter| { + u32::from_le_bytes(counter.try_into().unwrap()) + }).unwrap_or(0); + + let index_to_read = messages_recvd + 2; + + let res = $field_name::get(txn, $($arg),*, index_to_read); + if res.is_some() { + $field_name::del(txn, $($arg),*, index_to_read); + txn.put(&messages_recvd_key, (messages_recvd + 1).to_le_bytes()); + } + res + } + } + )* + }; +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index a1f3a6f6..6b5b8fcc 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -847,33 +847,36 @@ async fn handle_cosigns_and_batch_publication( } // Handle pending cosigns - while let Some((session, block, hash)) = CosignTransactions::peek_cosign(&db, network) { - let Some(ActiveTributary { spec, tributary }) = tributaries.get(&session) else { - log::warn!("didn't yet have tributary we're supposed to cosign with"); - break; - }; - log::info!( - "{network:?} {session:?} cosigning block #{block} (hash {}...)", - hex::encode(&hash[.. 8]) - ); - let tx = Transaction::CosignSubstrateBlock(hash); - let res = tributary.provide_transaction(tx.clone()).await; - if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { - if res == Err(ProvidedError::LocalMismatchesOnChain) { - // Spin, since this is a crit for this Tributary - loop { - log::error!( - "{}. tributary: {}, provided: {:?}", - "tributary added distinct CosignSubstrateBlock", - hex::encode(spec.genesis()), - &tx, - ); - sleep(Duration::from_secs(60)).await; + { + let mut txn = db.txn(); + while let Some((session, block, hash)) = CosignTransactions::try_recv(&mut txn, network) { + let Some(ActiveTributary { spec, tributary }) = tributaries.get(&session) else { + log::warn!("didn't yet have tributary we're supposed to cosign with"); + break; + }; + log::info!( + "{network:?} {session:?} cosigning block #{block} (hash {}...)", + hex::encode(&hash[.. 8]) + ); + let tx = Transaction::CosignSubstrateBlock(hash); + let res = tributary.provide_transaction(tx.clone()).await; + if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { + if res == Err(ProvidedError::LocalMismatchesOnChain) { + // Spin, since this is a crit for this Tributary + loop { + log::error!( + "{}. tributary: {}, provided: {:?}", + "tributary added distinct CosignSubstrateBlock", + hex::encode(spec.genesis()), + &tx, + ); + sleep(Duration::from_secs(60)).await; + } } + panic!("provided an invalid CosignSubstrateBlock: {res:?}"); } - panic!("provided an invalid CosignSubstrateBlock: {res:?}"); } - CosignTransactions::take_cosign(db.txn(), network); + txn.commit(); } // Verify any publifshed `Batch`s diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 72909e0a..e0b25e4e 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -37,7 +37,6 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p}; use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent}; -// TODO: Use distinct topics const LIBP2P_TOPIC: &str = "serai-coordinator"; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)] diff --git a/coordinator/src/substrate/db.rs b/coordinator/src/substrate/db.rs index f5cde131..b1c4c1c2 100644 --- a/coordinator/src/substrate/db.rs +++ b/coordinator/src/substrate/db.rs @@ -1,5 +1,3 @@ -use std::sync::{OnceLock, MutexGuard, Mutex}; - use scale::{Encode, Decode}; pub use serai_db::*; @@ -10,11 +8,10 @@ use serai_client::{ }; create_db! { - NewSubstrateDb { + SubstrateDb { CosignTriggered: () -> (), IntendedCosign: () -> (u64, Option), BlockHasEvents: (block: u64) -> u8, - CosignTransactions: (network: NetworkId) -> Vec<(Session, u64, [u8; 32])>, } } @@ -29,48 +26,15 @@ impl IntendedCosign { } } -// This guarantees: -// 1) Appended transactions are appended -// 2) Taking cosigns does not clear any TXs which weren't taken -// 3) Taking does actually clear the set -static COSIGN_LOCK: OnceLock> = OnceLock::new(); -pub struct CosignTxn(T, MutexGuard<'static, ()>); -impl CosignTxn { - pub fn new(txn: T) -> Self { - Self(txn, COSIGN_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap()) - } - pub fn commit(self) { - self.0.commit(); +db_channel! { + SubstrateDb { + CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]), } } impl CosignTransactions { // Append a cosign transaction. - pub fn append_cosign( - txn: &mut CosignTxn, - set: ValidatorSet, - number: u64, - hash: [u8; 32], - ) { - #[allow(clippy::unwrap_or_default)] - let mut txs = CosignTransactions::get(&txn.0, set.network).unwrap_or(vec![]); - txs.push((set.session, number, hash)); - CosignTransactions::set(&mut txn.0, set.network, &txs); - } - // Peek at the next cosign transaction. - pub fn peek_cosign(getter: &impl Get, network: NetworkId) -> Option<(Session, u64, [u8; 32])> { - let mut to_cosign = CosignTransactions::get(getter, network)?; - if to_cosign.is_empty() { - None? - } - Some(to_cosign.swap_remove(0)) - } - // Take the next transaction, panicking if it doesn't exist. - pub fn take_cosign(mut txn: impl DbTxn, network: NetworkId) { - let _lock = COSIGN_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap(); - let mut txs = CosignTransactions::get(&txn, network).unwrap(); - txs.remove(0); - CosignTransactions::set(&mut txn, network, &txs); - txn.commit(); + pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) { + CosignTransactions::send(txn, set.network, &(set.session, number, hash)) } } diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 8d00ee43..e34ca199 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -368,7 +368,6 @@ async fn handle_new_blocks( // TODO: If this block directly builds off a cosigned block *and* doesn't contain events, mark // cosigned, - // TODO: Can we remove any of these events while maintaining security? { // If: // A) This block has events and it's been at least X blocks since the last cosign or @@ -533,16 +532,14 @@ async fn handle_new_blocks( if let Some(has_no_cosigners) = has_no_cosigners { log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number()); SubstrateDb::::set_latest_cosigned_block(&mut txn, has_no_cosigners.number()); - txn.commit(); } else { CosignTriggered::set(&mut txn, &()); - let mut txn = CosignTxn::new(txn); for (set, block, hash) in cosign { log::debug!("cosigning {block} with {:?} {:?}", set.network, set.session); CosignTransactions::append_cosign(&mut txn, set, block, hash); } - txn.commit(); } + txn.commit(); } // Reduce to the latest cosigned block