diff --git a/Cargo.lock b/Cargo.lock index ed3f73fd..034a27aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2964,6 +2964,17 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-ticker" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9763058047f713632a52e916cc7f6a4b3fc6e9fc1ff8c5b1dc49e5a89041682e" +dependencies = [ + "futures", + "futures-timer", + "instant", +] + [[package]] name = "futures-timer" version = "3.0.2" @@ -3218,6 +3229,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" +[[package]] +name = "hex_fmt" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" + [[package]] name = "hex_lit" version = "0.1.1" @@ -3905,6 +3922,7 @@ dependencies = [ "libp2p-connection-limits", "libp2p-core", "libp2p-dns", + "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-kad", @@ -3989,6 +4007,38 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "libp2p-gossipsub" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e378da62e8c9251f6e885ed173a561663f29b251e745586cf6ae6150b295c37" +dependencies = [ + "asynchronous-codec", + "base64 0.21.2", + "byteorder", + "bytes", + "either", + "fnv", + "futures", + "futures-ticker", + "getrandom 0.2.10", + "hex_fmt", + "instant", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "log", + "prometheus-client", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "regex", + "sha2 0.10.7", + "smallvec", + "unsigned-varint", + "void", +] + [[package]] name = "libp2p-identify" version = "0.43.0" @@ -4085,6 +4135,7 @@ checksum = "3787ea81798dcc5bf1d8b40a8e8245cf894b168d04dd70aa48cb3ff2fff141d2" dependencies = [ "instant", "libp2p-core", + "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-kad", @@ -7953,6 +8004,7 @@ dependencies = [ "futures", "hex", "lazy_static", + "libp2p", "log", "modular-frost", "parity-scale-codec", @@ -9680,6 +9732,7 @@ version = "0.2.0" dependencies = [ "async-trait", "futures", + "hex", "log", "parity-scale-codec", "thiserror", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index e7913d4d..c23da93d 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -46,7 +46,9 @@ serde_json = { version = "1", default-features = false } log = "0.4" env_logger = "0.10" +futures = "0.3" tokio = { version = "1", features = ["rt-multi-thread", "sync", "time", "macros"] } +libp2p = { version = "0.52", features = ["tokio", "tcp", "noise", "yamux", "gossipsub", "mdns", "macros"] } [dev-dependencies] futures = "0.3" diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index fee2eb1b..1cd9a7ac 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -114,6 +114,8 @@ pub async fn scan_substrate( &mut db, &key, |db: &mut D, spec: TributarySpec| { + log::info!("creating new tributary for {:?}", spec.set()); + // Save it to the database MainDb::new(db).add_active_tributary(&spec); @@ -216,7 +218,17 @@ pub async fn heartbeat_tributaries( // Only trigger syncing if the block is more than a minute behind if SystemTime::now() > (block_time + Duration::from_secs(60)) { log::warn!("last known tributary block was over a minute ago"); - P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), tip.to_vec()).await; + let mut msg = tip.to_vec(); + // Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("system clock is wrong") + .as_secs(); + // Divide by the block time so if multiple parties send a Heartbeat, they're more likely to + // overlap + let time_unit = timestamp / u64::from(Tributary::::block_time()); + msg.extend(time_unit.to_le_bytes()); + P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await; } } @@ -240,14 +252,15 @@ pub async fn handle_p2p( continue; }; + log::trace!("handling message for tributary {:?}", tributary.spec.set()); if tributary.tributary.write().await.handle_message(&msg.msg).await { P2p::broadcast(&p2p, msg.kind, msg.msg).await; } } - // TODO2: Rate limit this per validator + // TODO2: Rate limit this per timestamp P2pMessageKind::Heartbeat(genesis) => { - if msg.msg.len() != 32 { + if msg.msg.len() != 40 { log::error!("validator sent invalid heartbeat"); continue; } @@ -273,7 +286,7 @@ pub async fn handle_p2p( // Decide which nodes will respond by using the latest block's hash as a mutually agreed // upon entropy source - // THis isn't a secure source of entropy, yet it's fine for this + // This isn't a secure source of entropy, yet it's fine for this let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 @@ -298,10 +311,12 @@ pub async fn handle_p2p( let reader = tributary_read.reader(); drop(tributary_read); - let mut latest = msg.msg.try_into().unwrap(); + let mut latest = msg.msg[.. 32].try_into().unwrap(); while let Some(next) = reader.block_after(&latest) { let mut res = reader.block(&next).unwrap().serialize(); res.extend(reader.commit(&next).unwrap()); + // Also include the timestamp used within the Heartbeat + res.extend(&msg.msg[32 .. 40]); p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await; latest = next; } @@ -315,6 +330,7 @@ pub async fn handle_p2p( }; // Get just the commit msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); + msg.msg.drain((msg.msg.len() - 8) ..); // Spawn a dedicated task to add this block, as it may take a notable amount of time // While we could use a long-lived task to add each block, that task would only add one @@ -341,6 +357,10 @@ pub async fn handle_p2p( return; }; + // TODO: Add a check which doesn't require write to see if this is the next block in + // line + // If it's in the future, hold it for up to T time + let res = tributary.tributary.write().await.sync_block(block, msg.msg).await; log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); } @@ -699,7 +719,7 @@ async fn main() { key_bytes.zeroize(); key }; - let p2p = LocalP2p::new(1).swap_remove(0); // TODO + let p2p = LibP2p::new(); let processors = Arc::new(MessageQueue::from_env(Service::Coordinator)); diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index c4b430b0..624c827b 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -1,12 +1,29 @@ -use core::fmt::Debug; -use std::{sync::Arc, io::Read, collections::VecDeque}; +use core::{time::Duration, fmt, task::Poll}; +use std::{sync::Arc, collections::VecDeque, io::Read}; use async_trait::async_trait; -use tokio::sync::RwLock; +use tokio::{sync::Mutex, time::sleep}; + +use libp2p::{ + futures::StreamExt, + identity::Keypair, + PeerId, Transport, + core::upgrade, + tcp::{Config, tokio as libp2p_tokio}, + noise, yamux, + gossipsub::{ + IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, + IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError, + Behaviour as GsBehavior, + }, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, Swarm}, +}; pub use tributary::P2p as TributaryP2p; +const LIBP2P_TOPIC: &str = "serai-coordinator"; + #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum P2pMessageKind { Tributary([u8; 32]), @@ -67,8 +84,8 @@ pub struct Message { } #[async_trait] -pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { - type Id: Send + Sync + Clone + Copy + Debug; +pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { + type Id: Send + Sync + Clone + Copy + fmt::Debug; async fn send_raw(&self, to: Self::Id, msg: Vec); async fn broadcast_raw(&self, msg: Vec); @@ -82,6 +99,7 @@ pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { async fn broadcast(&self, kind: P2pMessageKind, msg: Vec) { let mut actual_msg = kind.serialize(); actual_msg.extend(msg); + log::trace!("broadcasting p2p message (kind {kind:?})"); self.broadcast_raw(actual_msg).await; } async fn receive(&self) -> Message { @@ -99,56 +117,187 @@ pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { }; break (sender, kind, msg_ref.to_vec()); }; + log::trace!("received p2p message (kind {kind:?})"); Message { sender, kind, msg } } } -// TODO: Move this to tests -#[allow(clippy::type_complexity)] -#[derive(Clone, Debug)] -pub struct LocalP2p(usize, pub Arc)>>>>); +#[derive(NetworkBehaviour)] +struct Behavior { + gossipsub: GsBehavior, + //#[cfg(debug_assertions)] + mdns: libp2p::mdns::tokio::Behaviour, +} -impl LocalP2p { - pub fn new(validators: usize) -> Vec { - let shared = Arc::new(RwLock::new(vec![VecDeque::new(); validators])); - let mut res = vec![]; - for i in 0 .. validators { - res.push(LocalP2p(i, shared.clone())); - } +#[allow(clippy::type_complexity)] +#[derive(Clone)] +pub struct LibP2p(Arc>>, Arc)>>>); +impl fmt::Debug for LibP2p { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("LibP2p").finish_non_exhaustive() + } +} + +impl LibP2p { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + log::info!("creating a libp2p instance"); + + let throwaway_key_pair = Keypair::generate_ed25519(); + let throwaway_peer_id = PeerId::from(throwaway_key_pair.public()); + + // Uses noise for authentication, yamux for multiplexing + // TODO: Do we want to add a custom authentication protocol to only accept connections from + // fellow validators? Doing so would reduce the potential for spam + let transport = libp2p_tokio::Transport::new(Config::default()) + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&throwaway_key_pair).unwrap()) + .multiplex(yamux::Config::default()) + .boxed(); + + let behavior = Behavior { + gossipsub: { + // Block size limit + 1 KB of space for signatures/metadata + const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024; + + use blake2::{Digest, Blake2s256}; + let config = ConfigBuilder::default() + .max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE) + .validation_mode(ValidationMode::Strict) + // Uses a content based message ID to avoid duplicates as much as possible + .message_id_fn(|msg| { + MessageId::new(&Blake2s256::digest([msg.topic.as_str().as_bytes(), &msg.data].concat())) + }) + // Re-defines for fast ID to prevent needing to convert into a Message to run + // message_id_fn + // This function is valid for both + .fast_message_id_fn(|msg| { + FastMessageId::new(&Blake2s256::digest( + [msg.topic.as_str().as_bytes(), &msg.data].concat(), + )) + }) + .build(); + let mut gossipsub = GsBehavior::::new( + MessageAuthenticity::Signed(throwaway_key_pair), + config.unwrap(), + ) + .unwrap(); + + // Uses a single topic to prevent being a BTC validator only connected to ETH validators, + // unable to communicate with other BTC validators + let topic = IdentTopic::new(LIBP2P_TOPIC); + gossipsub.subscribe(&topic).unwrap(); + + gossipsub + }, + + // Only use MDNS in debug environments, as it should have no value in a release build + // TODO: We do tests on release binaries as of right now... + //#[cfg(debug_assertions)] + mdns: { + log::info!("spawning mdns"); + libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), throwaway_peer_id) + .unwrap() + }, + }; + + let mut swarm = + SwarmBuilder::with_tokio_executor(transport, behavior, throwaway_peer_id).build(); + const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o') + swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap(); + + let res = LibP2p(Arc::new(Mutex::new(swarm)), Arc::new(Mutex::new(VecDeque::new()))); + tokio::spawn({ + let p2p = res.clone(); + async move { + // Run this task ad-infinitum + loop { + // Maintain this lock until it's out of events + let mut p2p_lock = p2p.0.lock().await; + loop { + match futures::poll!(p2p_lock.next()) { + //#[cfg(debug_assertions)] + Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns( + libp2p::mdns::Event::Discovered(list), + )))) => { + for (peer, mut addr) in list { + if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) { + log::info!("found peer via mdns"); + p2p_lock.behaviour_mut().gossipsub.add_explicit_peer(&peer); + } + } + } + //#[cfg(debug_assertions)] + Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns( + libp2p::mdns::Event::Expired(list), + )))) => { + for (peer, _) in list { + log::info!("disconnecting peer due to mdns"); + p2p_lock.behaviour_mut().gossipsub.remove_explicit_peer(&peer); + } + } + + Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( + GsEvent::Message { propagation_source, message, .. }, + )))) => { + p2p.1.lock().await.push_back((propagation_source, message.data)); + } + Poll::Ready(Some(_)) => {} + _ => { + drop(p2p_lock); + sleep(Duration::from_millis(100)).await; + break; + } + } + } + } + } + }); res } } #[async_trait] -impl P2p for LocalP2p { - type Id = usize; +impl P2p for LibP2p { + type Id = PeerId; - async fn send_raw(&self, to: Self::Id, msg: Vec) { - self.1.write().await[to].push_back((self.0, msg)); + async fn send_raw(&self, _: Self::Id, msg: Vec) { + self.broadcast_raw(msg).await; } async fn broadcast_raw(&self, msg: Vec) { - for (i, msg_queue) in self.1.write().await.iter_mut().enumerate() { - if i == self.0 { - continue; + match self + .0 + .lock() + .await + .behaviour_mut() + .gossipsub + .publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone()) + { + Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"), + Err(PublishError::InsufficientPeers) => { + log::warn!("failed to send p2p message due to insufficient peers") } - msg_queue.push_back((self.0, msg.clone())); - } + Err(PublishError::MessageTooLarge) => { + panic!("tried to send a too large message: {}", hex::encode(msg)) + } + Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"), + Err(PublishError::Duplicate) | Ok(_) => {} + }; } async fn receive_raw(&self) -> (Self::Id, Vec) { - // This is a cursed way to implement an async read from a Vec loop { - if let Some(res) = self.1.write().await[self.0].pop_front() { + if let Some(res) = self.1.lock().await.pop_front() { return res; } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; } } } #[async_trait] -impl TributaryP2p for LocalP2p { +impl TributaryP2p for LibP2p { async fn broadcast(&self, genesis: [u8; 32], msg: Vec) { ::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await } diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 32689fdf..f9394f67 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -72,8 +72,10 @@ async fn handle_new_set< break res.time().unwrap(); } }; + // The block time is in milliseconds yet the Tributary is in seconds + let time = time / 1000; let spec = TributarySpec::new(block.hash(), time, set, set_data); - create_new_tributary(db, spec.clone()); + create_new_tributary(db, spec.clone()).await; // Trigger a DKG // TODO: Check how the processor handles this being fired multiple times diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index d715b7cd..748ac4a6 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -1,3 +1,4 @@ +use core::fmt::Debug; use std::{ sync::Arc, collections::{VecDeque, HashMap}, @@ -7,9 +8,14 @@ use serai_client::primitives::NetworkId; use processor_messages::CoordinatorMessage; +use async_trait::async_trait; + use tokio::sync::RwLock; -use crate::processors::{Message, Processors}; +use crate::{ + processors::{Message, Processors}, + TributaryP2p, P2pMessageKind, P2p, +}; pub mod tributary; @@ -36,3 +42,53 @@ impl Processors for MemProcessors { todo!() } } + +#[allow(clippy::type_complexity)] +#[derive(Clone, Debug)] +pub struct LocalP2p(usize, pub Arc)>>>>); + +impl LocalP2p { + pub fn new(validators: usize) -> Vec { + let shared = Arc::new(RwLock::new(vec![VecDeque::new(); validators])); + let mut res = vec![]; + for i in 0 .. validators { + res.push(LocalP2p(i, shared.clone())); + } + res + } +} + +#[async_trait] +impl P2p for LocalP2p { + type Id = usize; + + async fn send_raw(&self, to: Self::Id, msg: Vec) { + self.1.write().await[to].push_back((self.0, msg)); + } + + async fn broadcast_raw(&self, msg: Vec) { + for (i, msg_queue) in self.1.write().await.iter_mut().enumerate() { + if i == self.0 { + continue; + } + msg_queue.push_back((self.0, msg.clone())); + } + } + + async fn receive_raw(&self) -> (Self::Id, Vec) { + // This is a cursed way to implement an async read from a Vec + loop { + if let Some(res) = self.1.write().await[self.0].pop_front() { + return res; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } +} + +#[async_trait] +impl TributaryP2p for LocalP2p { + async fn broadcast(&self, genesis: [u8; 32], msg: Vec) { + ::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await + } +} diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs index 1a52d059..4711fb01 100644 --- a/coordinator/src/tests/tributary/chain.rs +++ b/coordinator/src/tests/tributary/chain.rs @@ -23,8 +23,9 @@ use serai_db::MemDb; use tributary::{Transaction as TransactionTrait, Tributary}; use crate::{ - P2pMessageKind, P2p, LocalP2p, + P2pMessageKind, P2p, tributary::{Transaction, TributarySpec}, + tests::LocalP2p, }; pub fn new_keys( diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs index f824dced..3ba3b249 100644 --- a/coordinator/src/tests/tributary/dkg.rs +++ b/coordinator/src/tests/tributary/dkg.rs @@ -19,10 +19,9 @@ use processor_messages::{ use tributary::{Transaction as TransactionTrait, Tributary}; use crate::{ - LocalP2p, tributary::{TributaryDb, Transaction, TributarySpec, scanner::handle_new_blocks}, tests::{ - MemProcessors, + MemProcessors, LocalP2p, tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion}, }, }; diff --git a/coordinator/src/tests/tributary/handle_p2p.rs b/coordinator/src/tests/tributary/handle_p2p.rs index a4c1f300..becf5059 100644 --- a/coordinator/src/tests/tributary/handle_p2p.rs +++ b/coordinator/src/tests/tributary/handle_p2p.rs @@ -13,8 +13,11 @@ use tributary::Tributary; use crate::{ tributary::Transaction, - LocalP2p, ActiveTributary, handle_p2p, - tests::tributary::{new_keys, new_spec, new_tributaries}, + ActiveTributary, handle_p2p, + tests::{ + LocalP2p, + tributary::{new_keys, new_spec, new_tributaries}, + }, }; #[tokio::test] diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs index 89be690d..ced97bd6 100644 --- a/coordinator/src/tests/tributary/sync.rs +++ b/coordinator/src/tests/tributary/sync.rs @@ -16,8 +16,11 @@ use tributary::Tributary; use crate::{ tributary::Transaction, - LocalP2p, ActiveTributary, handle_p2p, heartbeat_tributaries, - tests::tributary::{new_keys, new_spec, new_tributaries}, + ActiveTributary, handle_p2p, heartbeat_tributaries, + tests::{ + LocalP2p, + tributary::{new_keys, new_spec, new_tributaries}, + }, }; #[tokio::test] diff --git a/coordinator/src/tests/tributary/tx.rs b/coordinator/src/tests/tributary/tx.rs index dffce499..98e8f327 100644 --- a/coordinator/src/tests/tributary/tx.rs +++ b/coordinator/src/tests/tributary/tx.rs @@ -9,9 +9,11 @@ use serai_db::MemDb; use tributary::{Transaction as TransactionTrait, Tributary}; use crate::{ - LocalP2p, tributary::Transaction, - tests::tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion}, + tests::{ + LocalP2p, + tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion}, + }, }; #[tokio::test] diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index fe562354..e69a4e15 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -39,6 +39,8 @@ async fn handle_block( spec: &TributarySpec, block: Block, ) { + log::info!("found block for Tributary {:?}", spec.set()); + let genesis = spec.genesis(); let hash = block.hash(); diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index bf5b3f79..cf554f97 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -103,6 +103,8 @@ impl Tributary { validators: Vec<(::G, u64)>, p2p: P, ) -> Option { + log::info!("new Tributary with genesis {}", hex::encode(genesis)); + let validators_vec = validators.iter().map(|validator| validator.0).collect::>(); let signer = Arc::new(Signer::new(genesis, key)); diff --git a/coordinator/tributary/tendermint/Cargo.toml b/coordinator/tributary/tendermint/Cargo.toml index 865034de..14a79746 100644 --- a/coordinator/tributary/tendermint/Cargo.toml +++ b/coordinator/tributary/tendermint/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" async-trait = "0.1" thiserror = "1" +hex = "0.4" log = "0.4" parity-scale-codec = { version = "3", features = ["derive"] } diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index 753c6526..ad06b8ca 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -239,7 +239,7 @@ impl TendermintMachine { async fn slash(&mut self, validator: N::ValidatorId) { if !self.block.slashes.contains(&validator) { - log::info!(target: "tendermint", "Slashing validator {:?}", validator); + log::info!(target: "tendermint", "Slashing validator {}", hex::encode(validator.encode())); self.block.slashes.insert(validator); self.network.slash(validator).await; } @@ -264,8 +264,16 @@ impl TendermintMachine { messages: msg_send, machine: { let sys_time = sys_time(last_time); + let time_until = sys_time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO); + log::info!( + target: "tendermint", + "new TendermintMachine building off block {} is scheduled to start in {}s", + last_block.0, + time_until.as_secs() + ); + // If the last block hasn't ended yet, sleep until it has - sleep(sys_time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO)).await; + sleep(time_until).await; let signer = network.signer(); let validators = network.signature_scheme(); @@ -298,13 +306,21 @@ impl TendermintMachine { // after it, without the standard amount of separation (so their times will be // equivalent or minimally offset) // For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time()) - machine.round(RoundNumber(0), Some(CanonicalInstant::new(last_time))); + let start_time = CanonicalInstant::new(last_time); + machine.round(RoundNumber(0), Some(start_time)); + + // If we're past the start time, skip to and only join the next round + let rounds_to_skip = Instant::now().duration_since(start_time.instant()).as_secs() / + u64::from(N::block_time()); + machine.round(RoundNumber(rounds_to_skip.try_into().unwrap()), None); machine }, } } pub async fn run(mut self) { + log::debug!(target: "tendermint", "running TendermintMachine"); + loop { // Also create a future for if the queue has a message // Does not pop_front as if another message has higher priority, its future will be handled @@ -330,6 +346,10 @@ impl TendermintMachine { continue; } + log::debug!( + target: "tendermint", + "TendermintMachine received a block from the external sync loop", + ); let proposal = self.network.add_block(block, commit.clone()).await; self.reset_by_commit(commit, proposal).await; self.synced_block_result_send.send(true).await.unwrap(); @@ -419,6 +439,11 @@ impl TendermintMachine { }; debug_assert!(self.network.verify_commit(block.id(), &commit)); + log::info!( + target: "tendermint", + "TendermintMachine produced block {}", + hex::encode(block.id().as_ref()), + ); let proposal = self.network.add_block(block, commit).await; self.reset(msg.round, proposal).await; } diff --git a/substrate/client/src/serai/mod.rs b/substrate/client/src/serai/mod.rs index 942f1bda..cbbd572d 100644 --- a/substrate/client/src/serai/mod.rs +++ b/substrate/client/src/serai/mod.rs @@ -77,7 +77,7 @@ impl Block { self.0.header.number } - /// Returns the time of this block, set by its producer, as a unix timestamp. + /// Returns the time of this block, set by its producer, in milliseconds since the epoch. pub fn time(&self) -> Result { for extrinsic in &self.0.extrinsics { // Inherent/unsigned diff --git a/tests/coordinator/src/lib.rs b/tests/coordinator/src/lib.rs index f6ec1cb3..d0c97f0f 100644 --- a/tests/coordinator/src/lib.rs +++ b/tests/coordinator/src/lib.rs @@ -38,6 +38,10 @@ pub fn coordinator_instance( use serai_client::primitives::insecure_pair_from_name; hex::encode(&insecure_pair_from_name(name).as_ref().secret.to_bytes()[.. 32]) }), + ( + "RUST_LOG".to_string(), + "serai_coordinator=trace,".to_string() + "tributary_chain=trace," + "tendermint=trace", + ), ] .into(), ) diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs index 25fa9a52..94e9515e 100644 --- a/tests/coordinator/src/tests/mod.rs +++ b/tests/coordinator/src/tests/mod.rs @@ -70,6 +70,8 @@ async fn stack_test() { }) ); } + + tokio::time::sleep(Duration::from_secs(30)).await; }) .await; }