From fcd5fb85df7357b14a4ee0d1d2c10896b6650ee1 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 11 Sep 2024 11:59:15 -0400 Subject: [PATCH] Add binary search to find the block to start scanning from --- processor/bitcoin/src/main.rs | 97 ++++++++++++++------- processor/bitcoin/src/rpc.rs | 44 ++++++++++ processor/scanner/src/lib.rs | 5 ++ processor/src/main.rs | 160 ---------------------------------- 4 files changed, 113 insertions(+), 193 deletions(-) diff --git a/processor/bitcoin/src/main.rs b/processor/bitcoin/src/main.rs index f1f14082..1c07b6cd 100644 --- a/processor/bitcoin/src/main.rs +++ b/processor/bitcoin/src/main.rs @@ -6,11 +6,16 @@ static ALLOCATOR: zalloc::ZeroizingAlloc = zalloc::ZeroizingAlloc(std::alloc::System); +use core::cmp::Ordering; + use ciphersuite::Ciphersuite; +use serai_client::validator_sets::primitives::Session; + use serai_db::{DbTxn, Db}; use ::primitives::EncodableG; use ::key_gen::KeyGenParams as KeyGenParamsTrait; +use scanner::{ScannerFeed, Scanner}; mod primitives; pub(crate) use crate::primitives::*; @@ -38,6 +43,56 @@ pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) -> res } +async fn first_block_after_time(feed: &S, serai_time: u64) -> u64 { + async fn first_block_after_time_iteration( + feed: &S, + serai_time: u64, + ) -> Result, S::EphemeralError> { + let latest = feed.latest_finalized_block_number().await?; + let latest_time = feed.time_of_block(latest).await?; + if latest_time < serai_time { + tokio::time::sleep(core::time::Duration::from_secs(serai_time - latest_time)).await; + return Ok(None); + } + + // A finalized block has a time greater than or equal to the time we want to start at + // Find the first such block with a binary search + // start_search and end_search are inclusive + let mut start_search = 0; + let mut end_search = latest; + while start_search != end_search { + // This purposely chooses the earlier block in the case two blocks are both in the middle + let to_check = 
start_search + ((end_search - start_search) / 2); + let block_time = feed.time_of_block(to_check).await?; + match block_time.cmp(&serai_time) { + Ordering::Less => { + start_search = to_check + 1; + assert!(start_search <= end_search); + } + Ordering::Equal | Ordering::Greater => { + // This holds true since we pick the earlier block upon an even search distance + // If it didn't, this would cause an infinite loop + assert!(to_check < end_search); + end_search = to_check; + } + } + } + Ok(Some(start_search)) + } + loop { + match first_block_after_time_iteration(feed, serai_time).await { + Ok(Some(block)) => return block, + Ok(None) => { + log::info!("waiting for block to activate at (a block with timestamp >= {serai_time})"); + } + Err(e) => { + log::error!("couldn't find the first block Serai should scan due to an RPC error: {e:?}"); + } + } + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + } +} + /// Fetch the next message from the Coordinator. /// /// This message is guaranteed to have never been handled before, where handling is defined as @@ -52,11 +107,13 @@ async fn send_message(_msg: messages::ProcessorMessage) { async fn coordinator_loop( mut db: D, + feed: Rpc, mut key_gen: ::key_gen::KeyGen, mut signers: signers::Signers, Scheduler, Rpc>, mut scanner: Option>>, ) { loop { + let db_clone = db.clone(); let mut txn = db.txn(); let msg = next_message(&mut txn).await; let mut txn = Some(txn); @@ -120,9 +177,13 @@ async fn coordinator_loop( <::ExternalNetworkCurve as Ciphersuite>::G, >::set(txn, session, &key); - // This isn't cheap yet only happens for the very first set of keys - if scanner.is_none() { - todo!("TODO") + // This is presumed extremely expensive, potentially blocking for several minutes, yet + // only happens for the very first set of keys + if session == Session(0) { + assert!(scanner.is_none()); + let start_block = first_block_after_time(&feed, serai_time).await; + scanner = + Some(Scanner::new::>(db_clone, feed.clone(), 
start_block, key.0).await); } } messages::substrate::CoordinatorMessage::SlashesReported { session } => { @@ -241,36 +302,6 @@ impl TransactionTrait for Transaction { } } -#[async_trait] -impl BlockTrait for Block { - async fn time(&self, rpc: &Bitcoin) -> u64 { - // Use the network median time defined in BIP-0113 since the in-block time isn't guaranteed to - // be monotonic - let mut timestamps = vec![u64::from(self.header.time)]; - let mut parent = self.parent(); - // BIP-0113 uses a median of the prior 11 blocks - while timestamps.len() < 11 { - let mut parent_block; - while { - parent_block = rpc.rpc.get_block(&parent).await; - parent_block.is_err() - } { - log::error!("couldn't get parent block when trying to get block time: {parent_block:?}"); - sleep(Duration::from_secs(5)).await; - } - let parent_block = parent_block.unwrap(); - timestamps.push(u64::from(parent_block.header.time)); - parent = parent_block.parent(); - - if parent == [0; 32] { - break; - } - } - timestamps.sort(); - timestamps[timestamps.len() / 2] - } -} - impl Bitcoin { pub(crate) async fn new(url: String) -> Bitcoin { let mut res = Rpc::new(url.clone()).await; diff --git a/processor/bitcoin/src/rpc.rs b/processor/bitcoin/src/rpc.rs index cafb0ef3..a6f6e5fd 100644 --- a/processor/bitcoin/src/rpc.rs +++ b/processor/bitcoin/src/rpc.rs @@ -34,6 +34,50 @@ impl ScannerFeed for Rpc { db::LatestBlockToYieldAsFinalized::get(&self.db).ok_or(RpcError::ConnectionError) } + async fn time_of_block(&self, number: u64) -> Result { + let number = usize::try_from(number).unwrap(); + + /* + The block time isn't guaranteed to be monotonic. It is guaranteed to be greater than the + median time of prior blocks, as detailed in BIP-0113 (a BIP which used that fact to improve + CLTV). This creates a monotonic median time which we use as the block time. 
+ */ + // This implements `GetMedianTimePast` + let median = { + const MEDIAN_TIMESPAN: usize = 11; + let mut timestamps = Vec::with_capacity(MEDIAN_TIMESPAN); + for i in number.saturating_sub(MEDIAN_TIMESPAN) .. number { + timestamps.push(self.rpc.get_block(&self.rpc.get_block_hash(i).await?).await?.header.time); + } + timestamps.sort(); + timestamps[timestamps.len() / 2] + }; + + /* + This block's timestamp is guaranteed to be greater than this median: + https://github.com/bitcoin/bitcoin/blob/0725a374941355349bb4bc8a79dad1affb27d3b9 + /src/validation.cpp#L4182-L4184 + + This does not guarantee the median always increases however. Take the following trivial + example, as the window is initially built: + + 0 block has time 0 // Prior blocks: [] + 1 block has time 1 // Prior blocks: [0] + 2 block has time 2 // Prior blocks: [0, 1] + 3 block has time 2 // Prior blocks: [0, 1, 2] + + These two blocks have the same time (both greater than the median of their prior blocks) and + the same median. + + The median will never decrease however. The values pushed onto the window will always be + greater than the median. If a value greater than the median is popped, the median will remain + the same (due to the counterbalance of the pushed value). If a value less than the median is + popped, the median will increase (either to another instance of the same value, yet one + closer to the end of the repeating sequence, or to a higher value). + */ + Ok(median.into()) + } + async fn unchecked_block_header_by_number( &self, number: u64, diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 4f30f5e7..6ed16d74 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -106,6 +106,11 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone { /// consensus. The genesis block accordingly has block number 0. async fn latest_finalized_block_number(&self) -> Result; + /// Fetch the timestamp of a block (represented in seconds since the epoch). 
+ /// + /// This must be monotonically non-decreasing. Two blocks may share a timestamp. + async fn time_of_block(&self, number: u64) -> Result; + /// Fetch a block header by its number. /// /// This does not check the returned BlockHeader is the header for the block we indexed. diff --git a/processor/src/main.rs b/processor/src/main.rs index 51123b92..65e74f55 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -29,158 +29,15 @@ async fn handle_coordinator_msg( substrate_mutable: &mut SubstrateMutable, msg: &Message, ) { - async fn activate_key( - network: &N, - substrate_mutable: &mut SubstrateMutable, - tributary_mutable: &mut TributaryMutable, - txn: &mut D::Transaction<'_>, - session: Session, - key_pair: KeyPair, - activation_number: usize, - ) { - info!("activating {session:?}'s keys at {activation_number}"); - - let network_key = ::Curve::read_G::<&[u8]>(&mut key_pair.1.as_ref()) - .expect("Substrate finalized invalid point as a network's key"); - - if tributary_mutable.key_gen.in_set(&session) { - // See TributaryMutable's struct definition for why this block is safe - let KeyConfirmed { substrate_keys, network_keys } = - tributary_mutable.key_gen.confirm(txn, session, &key_pair); - if session.0 == 0 { - tributary_mutable.batch_signer = - Some(BatchSigner::new(N::NETWORK, session, substrate_keys)); - } - tributary_mutable - .signers - .insert(session, Signer::new(network.clone(), session, network_keys)); - } - - substrate_mutable.add_key(txn, activation_number, network_key).await; - } - match msg.msg.clone() { CoordinatorMessage::Substrate(msg) => { match msg { - messages::substrate::CoordinatorMessage::ConfirmKeyPair { context, session, key_pair } => { - // This is the first key pair for this network so no block has been finalized yet - // TODO: Write documentation for this in docs/ - // TODO: Use an Option instead of a magic? 
- if context.network_latest_finalized_block.0 == [0; 32] { - assert!(tributary_mutable.signers.is_empty()); - assert!(tributary_mutable.batch_signer.is_none()); - assert!(tributary_mutable.cosigner.is_none()); - // We can't check this as existing is no longer pub - // assert!(substrate_mutable.existing.as_ref().is_none()); - - // Wait until a network's block's time exceeds Serai's time - // These time calls are extremely expensive for what they do, yet they only run when - // confirming the first key pair, before any network activity has occurred, so they - // should be fine - - // If the latest block number is 10, then the block indexed by 1 has 10 confirms - // 10 + 1 - 10 = 1 - let mut block_i; - while { - block_i = (network.get_latest_block_number_with_retries().await + 1) - .saturating_sub(N::CONFIRMATIONS); - network.get_block_with_retries(block_i).await.time(network).await < context.serai_time - } { - info!( - "serai confirmed the first key pair for a set. {} {}", - "we're waiting for a network's finalized block's time to exceed unix time ", - context.serai_time, - ); - sleep(Duration::from_secs(5)).await; - } - - // Find the first block to do so - let mut earliest = block_i; - // earliest > 0 prevents a panic if Serai creates keys before the genesis block - // which... 
should be impossible - // Yet a prevented panic is a prevented panic - while (earliest > 0) && - (network.get_block_with_retries(earliest - 1).await.time(network).await >= - context.serai_time) - { - earliest -= 1; - } - - // Use this as the activation block - let activation_number = earliest; - - activate_key( - network, - substrate_mutable, - tributary_mutable, - txn, - session, - key_pair, - activation_number, - ) - .await; - } else { - let mut block_before_queue_block = >::Id::default(); - block_before_queue_block - .as_mut() - .copy_from_slice(&context.network_latest_finalized_block.0); - // We can't set these keys for activation until we know their queue block, which we - // won't until the next Batch is confirmed - // Set this variable so when we get the next Batch event, we can handle it - PendingActivationsDb::set_pending_activation::( - txn, - &block_before_queue_block, - session, - key_pair, - ); - } - } - messages::substrate::CoordinatorMessage::SubstrateBlock { context, block: substrate_block, burns, batches, } => { - if let Some((block, session, key_pair)) = - PendingActivationsDb::pending_activation::(txn) - { - // Only run if this is a Batch belonging to a distinct block - if context.network_latest_finalized_block.as_ref() != block.as_ref() { - let mut queue_block = >::Id::default(); - queue_block.as_mut().copy_from_slice(context.network_latest_finalized_block.as_ref()); - - let activation_number = substrate_mutable - .block_number(txn, &queue_block) - .await - .expect("KeyConfirmed from context we haven't synced") + - N::CONFIRMATIONS; - - activate_key( - network, - substrate_mutable, - tributary_mutable, - txn, - session, - key_pair, - activation_number, - ) - .await; - //clear pending activation - txn.del(PendingActivationsDb::key()); - } - } - - // Since this block was acknowledged, we no longer have to sign the batches within it - if let Some(batch_signer) = tributary_mutable.batch_signer.as_mut() { - for batch_id in batches { - 
batch_signer.batch_signed(txn, batch_id); - } - } - - let (acquired_lock, to_sign) = - substrate_mutable.substrate_block(txn, network, context, burns).await; - // Send SubstrateBlockAck, with relevant plan IDs, before we trigger the signing of these // plans if !tributary_mutable.signers.is_empty() { @@ -197,23 +54,6 @@ async fn handle_coordinator_msg( }) .await; } - - // See commentary in TributaryMutable for why this is safe - let signers = &mut tributary_mutable.signers; - for (key, id, tx, eventuality) in to_sign { - if let Some(session) = SessionDb::get(txn, key.to_bytes().as_ref()) { - let signer = signers.get_mut(&session).unwrap(); - if let Some(msg) = signer.sign_transaction(txn, id, tx, &eventuality).await { - coordinator.send(msg).await; - } - } - } - - // This is not premature, even if this block had multiple `Batch`s created, as the first - // `Batch` alone will trigger all Plans/Eventualities/Signs - if acquired_lock { - substrate_mutable.release_scanner_lock().await; - } } } }