diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index cca8fbc0..20b2935f 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -3,6 +3,7 @@ #![allow(unreachable_code)] #![allow(clippy::diverging_sub_expression)] +use core::ops::Deref; use std::{ sync::Arc, time::{SystemTime, Duration}, @@ -18,7 +19,7 @@ use serai_client::Serai; use tokio::{sync::RwLock, time::sleep}; -use ::tributary::Tributary; +use ::tributary::{ReadWrite, Block, Tributary}; mod tributary; use crate::tributary::{TributarySpec, Transaction}; @@ -192,18 +193,19 @@ pub async fn heartbeat_tributaries( #[allow(clippy::type_complexity)] pub async fn handle_p2p( + our_key: ::G, p2p: P, tributaries: Arc>>>, ) { loop { - let msg = p2p.receive().await; + let mut msg = p2p.receive().await; match msg.kind { P2pMessageKind::Tributary(genesis) => { let tributaries_read = tributaries.read().await; let Some(tributary) = tributaries_read.get(&genesis) else { - log::debug!("received p2p message for unknown network"); - continue; - }; + log::debug!("received p2p message for unknown network"); + continue; + }; // This is misleading being read, as it will mutate the Tributary, yet there's // greater efficiency when it is read @@ -213,8 +215,74 @@ pub async fn handle_p2p( } } - // TODO: Respond with the missing block, if there are any - P2pMessageKind::Heartbeat(genesis) => todo!(), + P2pMessageKind::Heartbeat(genesis) => { + let tributaries_read = tributaries.read().await; + let Some(tributary) = tributaries_read.get(&genesis) else { + log::debug!("received hearttbeat message for unknown network"); + continue; + }; + + if msg.msg.len() != 32 { + log::error!("validator sent invalid heartbeat"); + continue; + } + + let tributary_read = tributary.tributary.read().await; + + // Have sqrt(n) nodes reply with the blocks + let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; + // Try to have at least 3 responders + if responders < 3 { + responders = tributary.spec.n().min(3).into(); + } + + // Only respond to this if randomly chosen + let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); + // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) + // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 + let start = + usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)).unwrap(); + let mut selected = false; + for validator in + &tributary.spec.validators()[start .. (start + usize::try_from(responders).unwrap())] + { + if our_key == validator.0 { + selected = true; + break; + } + } + if !selected { + continue; + } + + let mut latest = msg.msg.try_into().unwrap(); + // TODO: All of these calls don't *actually* need a read lock, just access to a DB handle + // We can reduce lock contention accordingly + while let Some(next) = tributary_read.block_after(&latest) { + let mut res = tributary_read.block(&next).unwrap().serialize(); + res.extend(tributary_read.commit(&next).unwrap()); + p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await; + latest = next; + } + } + + P2pMessageKind::Block(genesis) => { + let mut msg_ref: &[u8] = msg.msg.as_ref(); + let Ok(block) = Block::::read(&mut msg_ref) else { + log::error!("received block message with an invalidly serialized block"); + continue; + }; + // Get just the commit + msg.msg.drain((msg.msg.len() - msg_ref.len()) ..); + + let tributaries = tributaries.read().await; + let Some(tributary) = tributaries.get(&genesis) else { + log::debug!("received block message for unknown network"); + continue; + }; + + tributary.tributary.write().await.sync_block(block, msg.msg).await; + } } } } @@ -257,7 +325,7 @@ pub async fn run( // Handle P2P messages // TODO: We also have to broadcast blocks once they're added - tokio::spawn(handle_p2p(p2p, tributaries)); + tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries)); loop { // Handle all messages from processors diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 985860d3..710537cf 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -11,6 +11,7 @@ pub use tributary::P2p as TributaryP2p; pub enum P2pMessageKind { Tributary([u8; 32]), Heartbeat([u8; 32]), + Block([u8; 32]), } impl P2pMessageKind { @@ -26,6 +27,11 @@ impl P2pMessageKind { res.extend(genesis); res } + P2pMessageKind::Block(genesis) => { + let mut res = vec![2]; + res.extend(genesis); + res + } } } @@ -43,6 +49,11 @@ impl P2pMessageKind { reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Heartbeat(genesis) }), + 2 => Some({ + let mut genesis = [0; 32]; + reader.read_exact(&mut genesis).ok()?; + P2pMessageKind::Block(genesis) + }), _ => None, } } @@ -57,7 +68,7 @@ pub struct Message { #[async_trait] pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { - type Id: Send + Sync + Clone + Debug; + type Id: Send + Sync + Clone + Copy + Debug; async fn send_raw(&self, to: Self::Id, msg: Vec); async fn broadcast_raw(&self, msg: Vec); diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index ccd63351..3a3787fc 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -1,5 +1,5 @@ use core::ops::Deref; -use std::collections::{VecDeque, HashMap}; +use std::collections::HashMap; use zeroize::Zeroizing; @@ -297,43 +297,11 @@ pub async fn handle_new_blocks( spec: &TributarySpec, tributary: &Tributary, ) { - let last_block = db.last_block(tributary.genesis()); - - // Check if there's been a new Tributary block - let latest = tributary.tip().await; - if latest == last_block { - return; - } - - let mut blocks = VecDeque::new(); - // This is a new block, as per the prior if check - blocks.push_back(tributary.block(&latest).unwrap()); - - let mut block = None; - while { - let parent = blocks.back().unwrap().parent(); - // If the parent is the genesis, we've reached the end - if parent == tributary.genesis() { - false - } else { - // Get this block - block = Some(tributary.block(&parent).unwrap()); - // If it's the last block we've scanned, it's the end. Else, push it - block.as_ref().unwrap().hash() != last_block - } - } { - blocks.push_back(block.take().unwrap()); - - // Prevent this from loading the entire chain into RAM by setting a limit of 1000 blocks at a - // time (roughly 350 MB under the current block size limit) - if blocks.len() > 1000 { - blocks.pop_front(); - } - } - - while let Some(block) = blocks.pop_back() { - let hash = block.hash(); + let mut last_block = db.last_block(tributary.genesis()); + while let Some(next) = tributary.block_after(&last_block) { + let block = tributary.block(&next).unwrap(); handle_block(db, key, processor, spec, tributary, block).await; - db.set_last_block(tributary.genesis(), hash); + last_block = next; + db.set_last_block(tributary.genesis(), next); } } diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index 82874ddb..91cf24b6 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -37,6 +37,9 @@ impl Blockchain { fn commit_key(hash: &[u8; 32]) -> Vec { D::key(b"tributary_blockchain", b"commit", hash) } + fn block_after_key(hash: &[u8; 32]) -> Vec { + D::key(b"tributary_blockchain", b"block_after", hash) + } fn next_nonce_key(&self, signer: &::G) -> Vec { D::key( b"tributary_blockchain", @@ -105,6 +108,10 @@ impl Blockchain { Self::commit_from_db(self.db.as_ref().unwrap(), block) } + pub(crate) fn block_after(db: &D, block: &[u8; 32]) -> Option<[u8; 32]> { + db.get(Self::block_after_key(block)).map(|bytes| bytes.try_into().unwrap()) + } + pub(crate) fn add_transaction(&mut self, internal: bool, tx: T) -> bool { self.mempool.add(&self.next_nonces, internal, tx) } @@ -158,6 +165,8 @@ impl Blockchain { txn.put(Self::block_key(&self.tip), block.serialize()); txn.put(Self::commit_key(&self.tip), commit); + txn.put(Self::block_after_key(&block.parent()), block.hash()); + for tx in &block.transactions { match tx.kind() { TransactionKind::Provided(order) => { diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 0b38d1cf..d0d1ad4f 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -149,6 +149,9 @@ impl Tributary { pub fn commit(&self, hash: &[u8; 32]) -> Option> { Blockchain::::commit_from_db(&self.db, hash) } + pub fn block_after(&self, hash: &[u8; 32]) -> Option<[u8; 32]> { + Blockchain::::block_after(&self.db, hash) + } pub fn time_of_block(&self, hash: &[u8; 32]) -> Option { self .commit(hash) diff --git a/coordinator/tributary/src/tests/blockchain.rs b/coordinator/tributary/src/tests/blockchain.rs index 7a16b114..2be63c9b 100644 --- a/coordinator/tributary/src/tests/blockchain.rs +++ b/coordinator/tributary/src/tests/blockchain.rs @@ -23,17 +23,18 @@ fn new_genesis() -> [u8; 32] { fn new_blockchain( genesis: [u8; 32], participants: &[::G], -) -> Blockchain { - let blockchain = Blockchain::new(MemDb::new(), genesis, participants); +) -> (MemDb, Blockchain) { + let db = MemDb::new(); + let blockchain = Blockchain::new(db.clone(), genesis, participants); assert_eq!(blockchain.tip(), genesis); assert_eq!(blockchain.block_number(), 0); - blockchain + (db, blockchain) } #[test] fn block_addition() { let genesis = new_genesis(); - let mut blockchain = new_blockchain::(genesis, &[]); + let (db, mut blockchain) = new_blockchain::(genesis, &[]); let block = blockchain.build_block(); assert_eq!(block.header.parent, genesis); assert_eq!(block.header.transactions, [0; 32]); @@ -41,12 +42,16 @@ fn block_addition() { assert!(blockchain.add_block(&block, vec![]).is_ok()); assert_eq!(blockchain.tip(), block.hash()); assert_eq!(blockchain.block_number(), 1); + assert_eq!( + Blockchain::::block_after(&db, &block.parent()).unwrap(), + block.hash() + ); } #[test] fn invalid_block() { let genesis = new_genesis(); - let mut blockchain = new_blockchain::(genesis, &[]); + let (_, mut blockchain) = new_blockchain::(genesis, &[]); let block = blockchain.build_block(); @@ -77,7 +82,7 @@ fn invalid_block() { } // Run the rest of the tests with them as a participant - let blockchain = new_blockchain(genesis, &[tx.1.signer]); + let (_, blockchain) = new_blockchain(genesis, &[tx.1.signer]); // Re-run the not a participant block to make sure it now works { @@ -130,7 +135,7 @@ fn signed_transaction() { let tx = crate::tests::signed_transaction(&mut OsRng, genesis, &key, 0); let signer = tx.1.signer; - let mut blockchain = new_blockchain::(genesis, &[signer]); + let (_, mut blockchain) = new_blockchain::(genesis, &[signer]); assert_eq!(blockchain.next_nonce(signer), Some(0)); let test = |blockchain: &mut Blockchain, @@ -176,7 +181,7 @@ fn signed_transaction() { #[test] fn provided_transaction() { let genesis = new_genesis(); - let mut blockchain = new_blockchain::(genesis, &[]); + let (_, mut blockchain) = new_blockchain::(genesis, &[]); let tx = random_provided_transaction(&mut OsRng);