Add binary search to find the block to start scanning from

This commit is contained in:
Luke Parker 2024-09-11 11:59:15 -04:00
parent 3ac0265f07
commit fcd5fb85df
4 changed files with 113 additions and 193 deletions

View file

@ -6,11 +6,16 @@
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> = static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System); zalloc::ZeroizingAlloc(std::alloc::System);
use core::cmp::Ordering;
use ciphersuite::Ciphersuite; use ciphersuite::Ciphersuite;
use serai_client::validator_sets::primitives::Session;
use serai_db::{DbTxn, Db}; use serai_db::{DbTxn, Db};
use ::primitives::EncodableG; use ::primitives::EncodableG;
use ::key_gen::KeyGenParams as KeyGenParamsTrait; use ::key_gen::KeyGenParams as KeyGenParamsTrait;
use scanner::{ScannerFeed, Scanner};
mod primitives; mod primitives;
pub(crate) use crate::primitives::*; pub(crate) use crate::primitives::*;
@ -38,6 +43,56 @@ pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) ->
res res
} }
async fn first_block_after_time<S: ScannerFeed>(feed: &S, serai_time: u64) -> u64 {
async fn first_block_after_time_iteration<S: ScannerFeed>(
feed: &S,
serai_time: u64,
) -> Result<Option<u64>, S::EphemeralError> {
let latest = feed.latest_finalized_block_number().await?;
let latest_time = feed.time_of_block(latest).await?;
if latest_time < serai_time {
tokio::time::sleep(core::time::Duration::from_secs(serai_time - latest_time)).await;
return Ok(None);
}
// A finalized block has a time greater than or equal to the time we want to start at
// Find the first such block with a binary search
// start_search and end_search are inclusive
let mut start_search = 0;
let mut end_search = latest;
while start_search != end_search {
// This on purposely chooses the earlier block in the case two blocks are both in the middle
let to_check = start_search + ((end_search - start_search) / 2);
let block_time = feed.time_of_block(to_check).await?;
match block_time.cmp(&serai_time) {
Ordering::Less => {
start_search = to_check + 1;
assert!(start_search <= end_search);
}
Ordering::Equal | Ordering::Greater => {
// This holds true since we pick the earlier block upon an even search distance
// If it didn't, this would cause an infinite loop
assert!(to_check < end_search);
end_search = to_check;
}
}
}
Ok(Some(start_search))
}
loop {
match first_block_after_time_iteration(feed, serai_time).await {
Ok(Some(block)) => return block,
Ok(None) => {
log::info!("waiting for block to activate at (a block with timestamp >= {serai_time})");
}
Err(e) => {
log::error!("couldn't find the first block Serai should scan due to an RPC error: {e:?}");
}
}
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
}
}
/// Fetch the next message from the Coordinator. /// Fetch the next message from the Coordinator.
/// ///
/// This message is guaranteed to have never been handled before, where handling is defined as /// This message is guaranteed to have never been handled before, where handling is defined as
@ -52,11 +107,13 @@ async fn send_message(_msg: messages::ProcessorMessage) {
async fn coordinator_loop<D: Db>( async fn coordinator_loop<D: Db>(
mut db: D, mut db: D,
feed: Rpc<D>,
mut key_gen: ::key_gen::KeyGen<KeyGenParams>, mut key_gen: ::key_gen::KeyGen<KeyGenParams>,
mut signers: signers::Signers<D, Rpc<D>, Scheduler<D>, Rpc<D>>, mut signers: signers::Signers<D, Rpc<D>, Scheduler<D>, Rpc<D>>,
mut scanner: Option<scanner::Scanner<Rpc<D>>>, mut scanner: Option<scanner::Scanner<Rpc<D>>>,
) { ) {
loop { loop {
let db_clone = db.clone();
let mut txn = db.txn(); let mut txn = db.txn();
let msg = next_message(&mut txn).await; let msg = next_message(&mut txn).await;
let mut txn = Some(txn); let mut txn = Some(txn);
@ -120,9 +177,13 @@ async fn coordinator_loop<D: Db>(
<<KeyGenParams as ::key_gen::KeyGenParams>::ExternalNetworkCurve as Ciphersuite>::G, <<KeyGenParams as ::key_gen::KeyGenParams>::ExternalNetworkCurve as Ciphersuite>::G,
>::set(txn, session, &key); >::set(txn, session, &key);
// This isn't cheap yet only happens for the very first set of keys // This is presumed extremely expensive, potentially blocking for several minutes, yet
if scanner.is_none() { // only happens for the very first set of keys
todo!("TODO") if session == Session(0) {
assert!(scanner.is_none());
let start_block = first_block_after_time(&feed, serai_time).await;
scanner =
Some(Scanner::new::<Scheduler<D>>(db_clone, feed.clone(), start_block, key.0).await);
} }
} }
messages::substrate::CoordinatorMessage::SlashesReported { session } => { messages::substrate::CoordinatorMessage::SlashesReported { session } => {
@ -241,36 +302,6 @@ impl TransactionTrait<Bitcoin> for Transaction {
} }
} }
#[async_trait]
impl BlockTrait<Bitcoin> for Block {
async fn time(&self, rpc: &Bitcoin) -> u64 {
// Use the network median time defined in BIP-0113 since the in-block time isn't guaranteed to
// be monotonic
let mut timestamps = vec![u64::from(self.header.time)];
let mut parent = self.parent();
// BIP-0113 uses a median of the prior 11 blocks
while timestamps.len() < 11 {
let mut parent_block;
while {
parent_block = rpc.rpc.get_block(&parent).await;
parent_block.is_err()
} {
log::error!("couldn't get parent block when trying to get block time: {parent_block:?}");
sleep(Duration::from_secs(5)).await;
}
let parent_block = parent_block.unwrap();
timestamps.push(u64::from(parent_block.header.time));
parent = parent_block.parent();
if parent == [0; 32] {
break;
}
}
timestamps.sort();
timestamps[timestamps.len() / 2]
}
}
impl Bitcoin { impl Bitcoin {
pub(crate) async fn new(url: String) -> Bitcoin { pub(crate) async fn new(url: String) -> Bitcoin {
let mut res = Rpc::new(url.clone()).await; let mut res = Rpc::new(url.clone()).await;

View file

@ -34,6 +34,50 @@ impl<D: Db> ScannerFeed for Rpc<D> {
db::LatestBlockToYieldAsFinalized::get(&self.db).ok_or(RpcError::ConnectionError) db::LatestBlockToYieldAsFinalized::get(&self.db).ok_or(RpcError::ConnectionError)
} }
async fn time_of_block(&self, number: u64) -> Result<u64, Self::EphemeralError> {
let number = usize::try_from(number).unwrap();
/*
The block time isn't guaranteed to be monotonic. It is guaranteed to be greater than the
median time of prior blocks, as detailed in BIP-0113 (a BIP which used that fact to improve
CLTV). This creates a monotonic median time which we use as the block time.
*/
// This implements `GetMedianTimePast`
let median = {
const MEDIAN_TIMESPAN: usize = 11;
let mut timestamps = Vec::with_capacity(MEDIAN_TIMESPAN);
for i in number.saturating_sub(MEDIAN_TIMESPAN) .. number {
timestamps.push(self.rpc.get_block(&self.rpc.get_block_hash(i).await?).await?.header.time);
}
timestamps.sort();
timestamps[timestamps.len() / 2]
};
/*
This block's timestamp is guaranteed to be greater than this median:
https://github.com/bitcoin/bitcoin/blob/0725a374941355349bb4bc8a79dad1affb27d3b9
/src/validation.cpp#L4182-L4184
This does not guarantee the median always increases however. Take the following trivial
example, as the window is initially built:
0 block has time 0 // Prior blocks: []
1 block has time 1 // Prior blocks: [0]
2 block has time 2 // Prior blocks: [0, 1]
3 block has time 2 // Prior blocks: [0, 1, 2]
These two blocks have the same time (both greater than the median of their prior blocks) and
the same median.
The median will never decrease however. The values pushed onto the window will always be
greater than the median. If a value greater than the median is popped, the median will remain
the same (due to the counterbalance of the pushed value). If a value less than the median is
popped, the median will increase (either to another instance of the same value, yet one
closer to the end of the repeating sequence, or to a higher value).
*/
Ok(median.into())
}
async fn unchecked_block_header_by_number( async fn unchecked_block_header_by_number(
&self, &self,
number: u64, number: u64,

View file

@ -106,6 +106,11 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone {
/// consensus. The genesis block accordingly has block number 0. /// consensus. The genesis block accordingly has block number 0.
async fn latest_finalized_block_number(&self) -> Result<u64, Self::EphemeralError>; async fn latest_finalized_block_number(&self) -> Result<u64, Self::EphemeralError>;
/// Fetch the timestamp of a block (represented in seconds since the epoch).
///
/// This must be monotonically incrementing. Two blocks may share a timestamp.
async fn time_of_block(&self, number: u64) -> Result<u64, Self::EphemeralError>;
/// Fetch a block header by its number. /// Fetch a block header by its number.
/// ///
/// This does not check the returned BlockHeader is the header for the block we indexed. /// This does not check the returned BlockHeader is the header for the block we indexed.

View file

@ -29,158 +29,15 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
substrate_mutable: &mut SubstrateMutable<N, D>, substrate_mutable: &mut SubstrateMutable<N, D>,
msg: &Message, msg: &Message,
) { ) {
async fn activate_key<N: Network, D: Db>(
network: &N,
substrate_mutable: &mut SubstrateMutable<N, D>,
tributary_mutable: &mut TributaryMutable<N, D>,
txn: &mut D::Transaction<'_>,
session: Session,
key_pair: KeyPair,
activation_number: usize,
) {
info!("activating {session:?}'s keys at {activation_number}");
let network_key = <N as Network>::Curve::read_G::<&[u8]>(&mut key_pair.1.as_ref())
.expect("Substrate finalized invalid point as a network's key");
if tributary_mutable.key_gen.in_set(&session) {
// See TributaryMutable's struct definition for why this block is safe
let KeyConfirmed { substrate_keys, network_keys } =
tributary_mutable.key_gen.confirm(txn, session, &key_pair);
if session.0 == 0 {
tributary_mutable.batch_signer =
Some(BatchSigner::new(N::NETWORK, session, substrate_keys));
}
tributary_mutable
.signers
.insert(session, Signer::new(network.clone(), session, network_keys));
}
substrate_mutable.add_key(txn, activation_number, network_key).await;
}
match msg.msg.clone() { match msg.msg.clone() {
CoordinatorMessage::Substrate(msg) => { CoordinatorMessage::Substrate(msg) => {
match msg { match msg {
messages::substrate::CoordinatorMessage::ConfirmKeyPair { context, session, key_pair } => {
// This is the first key pair for this network so no block has been finalized yet
// TODO: Write documentation for this in docs/
// TODO: Use an Option instead of a magic?
if context.network_latest_finalized_block.0 == [0; 32] {
assert!(tributary_mutable.signers.is_empty());
assert!(tributary_mutable.batch_signer.is_none());
assert!(tributary_mutable.cosigner.is_none());
// We can't check this as existing is no longer pub
// assert!(substrate_mutable.existing.as_ref().is_none());
// Wait until a network's block's time exceeds Serai's time
// These time calls are extremely expensive for what they do, yet they only run when
// confirming the first key pair, before any network activity has occurred, so they
// should be fine
// If the latest block number is 10, then the block indexed by 1 has 10 confirms
// 10 + 1 - 10 = 1
let mut block_i;
while {
block_i = (network.get_latest_block_number_with_retries().await + 1)
.saturating_sub(N::CONFIRMATIONS);
network.get_block_with_retries(block_i).await.time(network).await < context.serai_time
} {
info!(
"serai confirmed the first key pair for a set. {} {}",
"we're waiting for a network's finalized block's time to exceed unix time ",
context.serai_time,
);
sleep(Duration::from_secs(5)).await;
}
// Find the first block to do so
let mut earliest = block_i;
// earliest > 0 prevents a panic if Serai creates keys before the genesis block
// which... should be impossible
// Yet a prevented panic is a prevented panic
while (earliest > 0) &&
(network.get_block_with_retries(earliest - 1).await.time(network).await >=
context.serai_time)
{
earliest -= 1;
}
// Use this as the activation block
let activation_number = earliest;
activate_key(
network,
substrate_mutable,
tributary_mutable,
txn,
session,
key_pair,
activation_number,
)
.await;
} else {
let mut block_before_queue_block = <N::Block as Block<N>>::Id::default();
block_before_queue_block
.as_mut()
.copy_from_slice(&context.network_latest_finalized_block.0);
// We can't set these keys for activation until we know their queue block, which we
// won't until the next Batch is confirmed
// Set this variable so when we get the next Batch event, we can handle it
PendingActivationsDb::set_pending_activation::<N>(
txn,
&block_before_queue_block,
session,
key_pair,
);
}
}
messages::substrate::CoordinatorMessage::SubstrateBlock { messages::substrate::CoordinatorMessage::SubstrateBlock {
context, context,
block: substrate_block, block: substrate_block,
burns, burns,
batches, batches,
} => { } => {
if let Some((block, session, key_pair)) =
PendingActivationsDb::pending_activation::<N>(txn)
{
// Only run if this is a Batch belonging to a distinct block
if context.network_latest_finalized_block.as_ref() != block.as_ref() {
let mut queue_block = <N::Block as Block<N>>::Id::default();
queue_block.as_mut().copy_from_slice(context.network_latest_finalized_block.as_ref());
let activation_number = substrate_mutable
.block_number(txn, &queue_block)
.await
.expect("KeyConfirmed from context we haven't synced") +
N::CONFIRMATIONS;
activate_key(
network,
substrate_mutable,
tributary_mutable,
txn,
session,
key_pair,
activation_number,
)
.await;
//clear pending activation
txn.del(PendingActivationsDb::key());
}
}
// Since this block was acknowledged, we no longer have to sign the batches within it
if let Some(batch_signer) = tributary_mutable.batch_signer.as_mut() {
for batch_id in batches {
batch_signer.batch_signed(txn, batch_id);
}
}
let (acquired_lock, to_sign) =
substrate_mutable.substrate_block(txn, network, context, burns).await;
// Send SubstrateBlockAck, with relevant plan IDs, before we trigger the signing of these // Send SubstrateBlockAck, with relevant plan IDs, before we trigger the signing of these
// plans // plans
if !tributary_mutable.signers.is_empty() { if !tributary_mutable.signers.is_empty() {
@ -197,23 +54,6 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
}) })
.await; .await;
} }
// See commentary in TributaryMutable for why this is safe
let signers = &mut tributary_mutable.signers;
for (key, id, tx, eventuality) in to_sign {
if let Some(session) = SessionDb::get(txn, key.to_bytes().as_ref()) {
let signer = signers.get_mut(&session).unwrap();
if let Some(msg) = signer.sign_transaction(txn, id, tx, &eventuality).await {
coordinator.send(msg).await;
}
}
}
// This is not premature, even if this block had multiple `Batch`s created, as the first
// `Batch` alone will trigger all Plans/Eventualities/Signs
if acquired_lock {
substrate_mutable.release_scanner_lock().await;
}
} }
} }
} }