Bug fixes and log statements

Also shims the next-nonce code with a fine-for-now implementation which is unviable
in production, yet should survive testnet.
This commit is contained in:
Luke Parker 2023-08-13 02:21:56 -04:00
parent 049fefb5fd
commit 7e71450dc4
No known key found for this signature in database
11 changed files with 217 additions and 72 deletions

View file

@@ -374,6 +374,7 @@ pub async fn publish_transaction<D: Db, P: P2p>(
tributary: &Tributary<D, Transaction, P>,
tx: Transaction,
) {
log::debug!("publishing transaction {}", hex::encode(tx.hash()));
if let TransactionKind::Signed(signed) = tx.kind() {
if tributary
.next_nonce(signed.signer)
@@ -405,24 +406,32 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
// TODO2: This is slow, and only works as long as each network has a single Tributary
// (i.e., as long as there's no multisig rotation)
let genesis = {
let (genesis, my_i) = {
let mut genesis = None;
let mut my_i = None;
for tributary in tributaries.read().await.values() {
if tributary.spec.set().network == msg.network {
genesis = Some(tributary.spec.genesis());
// TODO: We probably want to NOP here, not panic?
my_i = Some(
tributary
.spec
.i(pub_key)
.expect("processor message for network we aren't a validator in"),
);
break;
}
}
genesis.unwrap()
(genesis.unwrap(), my_i.unwrap())
};
let tx = match msg.msg {
let tx = match msg.msg.clone() {
ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
key_gen::ProcessorMessage::Commitments { id, commitments } => {
Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed()))
}
key_gen::ProcessorMessage::Shares { id, shares } => {
Some(Transaction::DkgShares(id.attempt, shares, Transaction::empty_signed()))
Some(Transaction::DkgShares(id.attempt, my_i, shares, Transaction::empty_signed()))
}
key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => {
assert_eq!(
@@ -582,10 +591,17 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
}
TransactionKind::Signed(_) => {
// Get the next nonce
// TODO: This should be deterministic, not just DB-backed, to allow rebuilding validators
// without the prior instance's DB
// let mut txn = db.txn();
// let nonce = MainDb::tx_nonce(&mut txn, msg.id, tributary);
let nonce = 0; // TODO
// TODO: This isn't deterministic, or at least DB-backed, and accordingly is unsafe
log::trace!("getting next nonce for Tributary TX in response to processor message");
let nonce = tributary
.next_nonce(Ristretto::generator() * key.deref())
.await
.expect("publishing a TX to a tributary we aren't in");
tx.sign(&mut OsRng, genesis, &key, nonce);
publish_transaction(&tributary, tx).await;
@@ -595,6 +611,8 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
_ => panic!("created an unexpected transaction"),
}
}
processors.ack(msg).await;
}
}
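The TODO2 at the top of this function flags the per-message scan for the network's Tributary. A minimal sketch of the index it implies (hypothetical types, not the crate's actual structures; multisig rotation would make the value one-to-many):

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Hypothetical lookup from network ID to Tributary genesis, replacing the
// per-message scan over every Tributary. Under multisig rotation the value
// would become a Vec, since a network could have multiple live Tributaries.
struct TributaryIndex<N: Eq + Hash> {
    by_network: HashMap<N, [u8; 32]>,
}

impl<N: Eq + Hash> TributaryIndex<N> {
    fn genesis_for(&self, network: &N) -> Option<[u8; 32]> {
        self.by_network.get(network).copied()
    }
}

fn main() {
    let idx = TributaryIndex { by_network: HashMap::from([("bitcoin", [0u8; 32])]) };
    assert!(idx.genesis_for(&"bitcoin").is_some());
}
```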
@@ -664,15 +682,20 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
}),
};
let nonce = 0; // TODO
tx.sign(&mut OsRng, genesis, &key, nonce);
let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else {
panic!("tributary we don't have came to consensus on an ExternalBlock");
};
let tributary = tributary.tributary.read().await;
// TODO: Same note as prior nonce acquisition
log::trace!("getting next nonce for Tributary TX containing Batch signing data");
let nonce = tributary
.next_nonce(Ristretto::generator() * key.deref())
.await
.expect("publishing a TX to a tributary we aren't in");
tx.sign(&mut OsRng, genesis, &key, nonce);
publish_transaction(&tributary, tx).await;
} else {
log::warn!("recognized_id_send was dropped. are we shutting down?");

View file

@@ -22,6 +22,7 @@ use libp2p::{
pub use tributary::P2p as TributaryP2p;
// TODO: Use distinct topics
const LIBP2P_TOPIC: &str = "serai-coordinator";
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
@@ -99,7 +100,14 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize();
actual_msg.extend(msg);
log::trace!("broadcasting p2p message (kind {kind:?})");
log::trace!(
"broadcasting p2p message (kind {})",
match kind {
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
}
);
self.broadcast_raw(actual_msg).await;
}
async fn receive(&self) -> Message<Self> {
@@ -117,7 +125,14 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
};
break (sender, kind, msg_ref.to_vec());
};
log::trace!("received p2p message (kind {kind:?})");
log::trace!(
"received p2p message (kind {})",
match kind {
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
}
);
Message { sender, kind, msg }
}
}
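The kind-formatting match is now duplicated between `broadcast` and `receive`. One way to deduplicate (a sketch assuming `P2pMessageKind` keeps exactly these three genesis-carrying variants) is a `Display` impl, after which both log lines reduce to `log::trace!("... (kind {kind})")`:

```rust
use core::fmt;

// Mirror of the three variants shown above; relies on the hex crate already
// used by these log lines.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum P2pMessageKind {
    Tributary([u8; 32]),
    Heartbeat([u8; 32]),
    Block([u8; 32]),
}

impl fmt::Display for P2pMessageKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Render the genesis hash as hex, matching the log lines above.
        match self {
            P2pMessageKind::Tributary(genesis) => write!(f, "Tributary({})", hex::encode(genesis)),
            P2pMessageKind::Heartbeat(genesis) => write!(f, "Heartbeat({})", hex::encode(genesis)),
            P2pMessageKind::Block(genesis) => write!(f, "Block({})", hex::encode(genesis)),
        }
    }
}

fn main() {
    let kind = P2pMessageKind::Heartbeat([0; 32]);
    println!("kind {kind}");
}
```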

View file

@@ -74,6 +74,11 @@ async fn handle_new_set<
};
// The block time is in milliseconds yet the Tributary is in seconds
let time = time / 1000;
// Since this block is in the past, and Tendermint doesn't play nice with starting chains after
// their start time (though it does eventually work), delay the start time by 120 seconds
// This is meant to cover ~20 blocks' worth of missing finalization for this first block (at a 6-second block time)
let time = time + 120;
let spec = TributarySpec::new(block.hash(), time, set, set_data);
create_new_tributary(db, spec.clone()).await;
@@ -121,7 +126,7 @@ async fn handle_key_gen<Pro: Processors>(
CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext {
serai_time: block.time().unwrap(),
serai_time: block.time().unwrap() / 1000,
network_latest_finalized_block: serai
.get_latest_block_for_network(block.hash(), set.network)
.await?
@@ -213,7 +218,7 @@ async fn handle_batch_and_burns<Pro: Processors>(
CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time: block.time().unwrap(),
serai_time: block.time().unwrap() / 1000,
network_latest_finalized_block,
},
network,

View file

@@ -60,20 +60,17 @@ async fn dkg_test() {
wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, tx.hash()).await;
}
let expected_commitments = CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
id: KeyGenId { set: spec.set(), attempt: 0 },
commitments: txs
.iter()
.enumerate()
.map(|(i, tx)| {
if let Transaction::DkgCommitments(_, commitments, _) = tx {
(Participant::new((i + 1).try_into().unwrap()).unwrap(), commitments.clone())
} else {
panic!("txs had non-commitments");
}
})
.collect(),
});
let expected_commitments: HashMap<_, _> = txs
.iter()
.enumerate()
.map(|(i, tx)| {
if let Transaction::DkgCommitments(_, commitments, _) = tx {
(Participant::new((i + 1).try_into().unwrap()).unwrap(), commitments.clone())
} else {
panic!("txs had non-commitments");
}
})
.collect();
async fn new_processors(
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
@@ -119,7 +116,15 @@ async fn dkg_test() {
let mut msgs = processors.0.write().await;
assert_eq!(msgs.len(), 1);
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
let mut expected_commitments = expected_commitments.clone();
expected_commitments.remove(&Participant::new((1).try_into().unwrap()).unwrap());
assert_eq!(
msgs.pop_front().unwrap(),
CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
id: KeyGenId { set: spec.set(), attempt: 0 },
commitments: expected_commitments
})
);
assert!(msgs.is_empty());
}
@@ -129,23 +134,38 @@ async fn dkg_test() {
let mut msgs = processors.0.write().await;
assert_eq!(msgs.len(), 1);
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
let mut expected_commitments = expected_commitments.clone();
expected_commitments.remove(&Participant::new((i + 1).try_into().unwrap()).unwrap());
assert_eq!(
msgs.pop_front().unwrap(),
CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
id: KeyGenId { set: spec.set(), attempt: 0 },
commitments: expected_commitments
})
);
assert!(msgs.is_empty());
}
// Now do shares
let mut txs = vec![];
for key in &keys {
for (k, key) in keys.iter().enumerate() {
let attempt = 0;
let mut shares = HashMap::new();
for i in 0 .. keys.len() {
let mut share = vec![0; 256];
OsRng.fill_bytes(&mut share);
shares.insert(Participant::new((i + 1).try_into().unwrap()).unwrap(), share);
if i != k {
let mut share = vec![0; 256];
OsRng.fill_bytes(&mut share);
shares.insert(Participant::new((i + 1).try_into().unwrap()).unwrap(), share);
}
}
let mut tx = Transaction::DkgShares(attempt, shares, Transaction::empty_signed());
let mut tx = Transaction::DkgShares(
attempt,
Participant::new((k + 1).try_into().unwrap()).unwrap(),
shares,
Transaction::empty_signed(),
);
tx.sign(&mut OsRng, spec.genesis(), key, 1);
txs.push(tx);
}
@@ -184,12 +204,12 @@ async fn dkg_test() {
shares: txs
.iter()
.enumerate()
.map(|(l, tx)| {
if let Transaction::DkgShares(_, shares, _) = tx {
(
Participant::new((l + 1).try_into().unwrap()).unwrap(),
shares[&Participant::new((i + 1).try_into().unwrap()).unwrap()].clone(),
)
.filter_map(|(l, tx)| {
if let Transaction::DkgShares(_, _, shares, _) = tx {
shares
.get(&Participant::new((i + 1).try_into().unwrap()).unwrap())
.cloned()
.map(|share| (Participant::new((l + 1).try_into().unwrap()).unwrap(), share))
} else {
panic!("txs had non-shares");
}
@@ -222,7 +242,15 @@ async fn dkg_test() {
let mut msgs = processors.0.write().await;
assert_eq!(msgs.len(), 1);
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
let mut expected_commitments = expected_commitments.clone();
expected_commitments.remove(&Participant::new((i + 1).try_into().unwrap()).unwrap());
assert_eq!(
msgs.pop_front().unwrap(),
CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
id: KeyGenId { set: spec.set(), attempt: 0 },
commitments: expected_commitments
})
);
assert_eq!(msgs.pop_front().unwrap(), shares_for(i));
assert!(msgs.is_empty());
}

View file

@@ -20,6 +20,10 @@ mod dkg;
mod handle_p2p;
mod sync;
fn random_u16<R: RngCore>(rng: &mut R) -> u16 {
u16::try_from(rng.next_u64() >> 48).unwrap()
}
fn random_u32<R: RngCore>(rng: &mut R) -> u32 {
u32::try_from(rng.next_u64() >> 32).unwrap()
}
@@ -77,6 +81,7 @@ fn serialize_transaction() {
test_read_write(Transaction::DkgShares(
random_u32(&mut OsRng),
Participant::new(random_u16(&mut OsRng).saturating_add(1)).unwrap(),
shares,
random_signed(&mut OsRng),
));

View file

@@ -204,7 +204,7 @@ impl ReadWrite for SignData {
// It provides 4 commitments per input (128 bytes), a 64-byte proof for them, along with a
// key image and proof (96 bytes)
// Even with all of that, we could support 227 inputs in a single TX
// Monero is limited to 120 inputs per TX
// Monero is limited to ~120 inputs per TX
Err(io::Error::new(io::ErrorKind::Other, "signing data exceeded 65535 bytes"))?;
}
writer.write_all(&u16::try_from(self.data.len()).unwrap().to_le_bytes())?;
@@ -218,7 +218,7 @@ impl ReadWrite for SignData {
pub enum Transaction {
// Once this completes successfully, no more instances should be created.
DkgCommitments(u32, Vec<u8>, Signed),
DkgShares(u32, HashMap<Participant, Vec<u8>>, Signed),
DkgShares(u32, Participant, HashMap<Participant, Vec<u8>>, Signed),
// When an external block is finalized, we can allow the associated batch IDs
// Commits to the full block so eclipsed nodes don't continue on their eclipsed state
@@ -264,6 +264,10 @@ impl ReadWrite for Transaction {
reader.read_exact(&mut attempt)?;
let attempt = u32::from_le_bytes(attempt);
let mut sender_i = [0; 2];
reader.read_exact(&mut sender_i)?;
let sender_i = u16::from_le_bytes(sender_i);
let shares = {
let mut share_quantity = [0; 2];
reader.read_exact(&mut share_quantity)?;
@@ -274,7 +278,10 @@ impl ReadWrite for Transaction {
let mut shares = HashMap::new();
for i in 0 .. u16::from_le_bytes(share_quantity) {
let participant = Participant::new(i + 1).unwrap();
let mut participant = Participant::new(i + 1).unwrap();
if u16::from(participant) >= sender_i {
participant = Participant::new(u16::from(participant) + 1).unwrap();
}
let mut share = vec![0; share_len];
reader.read_exact(&mut share)?;
shares.insert(participant, share);
@@ -284,7 +291,13 @@ impl ReadWrite for Transaction {
let signed = Signed::read(reader)?;
Ok(Transaction::DkgShares(attempt, shares, signed))
Ok(Transaction::DkgShares(
attempt,
Participant::new(sender_i)
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "invalid sender participant"))?,
shares,
signed,
))
}
2 => {
@@ -336,14 +349,30 @@ impl ReadWrite for Transaction {
signed.write(writer)
}
Transaction::DkgShares(attempt, shares, signed) => {
Transaction::DkgShares(attempt, sender_i, shares, signed) => {
writer.write_all(&[1])?;
writer.write_all(&attempt.to_le_bytes())?;
// It's unfortunate to have this so duplicated, yet it avoids needing to pass a Spec to
// read in order to create a valid DkgShares
// TODO: Transform DkgShares to have a Vec of shares, with post-expansion to the proper
// HashMap
writer.write_all(&u16::from(*sender_i).to_le_bytes())?;
// Shares are indexed by non-zero u16s (Participants), so this can't fail
writer.write_all(&u16::try_from(shares.len()).unwrap().to_le_bytes())?;
let mut share_len = None;
for participant in 0 .. shares.len() {
let share = &shares[&Participant::new(u16::try_from(participant + 1).unwrap()).unwrap()];
let mut found_our_share = false;
for participant in 1 ..= (shares.len() + 1) {
let Some(share) =
&shares.get(&Participant::new(u16::try_from(participant).unwrap()).unwrap())
else {
assert!(!found_our_share);
found_our_share = true;
continue;
};
if let Some(share_len) = share_len {
if share.len() != share_len {
panic!("variable length shares");
@@ -405,7 +434,7 @@ impl TransactionTrait for Transaction {
fn kind(&self) -> TransactionKind<'_> {
match self {
Transaction::DkgCommitments(_, _, signed) => TransactionKind::Signed(signed),
Transaction::DkgShares(_, _, signed) => TransactionKind::Signed(signed),
Transaction::DkgShares(_, _, _, signed) => TransactionKind::Signed(signed),
Transaction::ExternalBlock(_) => TransactionKind::Provided("external"),
Transaction::SubstrateBlock(_) => TransactionKind::Provided("serai"),
@@ -463,7 +492,7 @@ impl Transaction {
fn signed(tx: &mut Transaction) -> &mut Signed {
match tx {
Transaction::DkgCommitments(_, _, ref mut signed) => signed,
Transaction::DkgShares(_, _, ref mut signed) => signed,
Transaction::DkgShares(_, _, _, ref mut signed) => signed,
Transaction::ExternalBlock(_) => panic!("signing ExternalBlock"),
Transaction::SubstrateBlock(_) => panic!("signing SubstrateBlock"),
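The new `sender_i` field gives `DkgShares` an asymmetric wire format: the writer emits `shares.len()` entries in participant order with the sender's own slot skipped, and the reader re-derives each participant index by bumping it once it reaches `sender_i`. A standalone sketch of just that index arithmetic (hypothetical helpers, plain `u16` standing in for `Participant`):

```rust
// Writer side: participants 1 ..= n in order, minus the sender's own slot.
fn written_indices(n: u16, sender_i: u16) -> Vec<u16> {
    (1 ..= n).filter(|i| *i != sender_i).collect()
}

// Reader side: the i-th stored share (0-based) belongs to participant i + 1,
// bumped by one once the index reaches or passes sender_i.
fn read_indices(n: u16, sender_i: u16) -> Vec<u16> {
    (0 .. (n - 1))
        .map(|i| {
            let mut participant = i + 1;
            if participant >= sender_i {
                participant += 1;
            }
            participant
        })
        .collect()
}

fn main() {
    // Five participants, sender 3: the wire carries shares for 1, 2, 4, 5.
    assert_eq!(written_indices(5, 3), vec![1, 2, 4, 5]);
    assert_eq!(written_indices(5, 3), read_indices(5, 3));
}
```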

View file

@@ -137,6 +137,15 @@ async fn handle_block<D: Db, Pro: Processors>(
}
assert_eq!(data.len(), usize::from(needed));
// Remove our own piece of data
assert!(data
.remove(
&spec
.i(Ristretto::generator() * key.deref())
.expect("handling a message for a Tributary we aren't part of")
)
.is_some());
return Some(data);
}
None
@@ -147,6 +156,7 @@ async fn handle_block<D: Db, Pro: Processors>(
if let Some(commitments) =
handle(Zone::Dkg, b"dkg_commitments", spec.n(), [0; 32], attempt, bytes, signed)
{
log::info!("got all DkgCommitments for {}", hex::encode(genesis));
processors
.send(
spec.set().network,
@@ -159,23 +169,33 @@ async fn handle_block<D: Db, Pro: Processors>(
}
}
Transaction::DkgShares(attempt, mut shares, signed) => {
if shares.len() != usize::from(spec.n()) {
Transaction::DkgShares(attempt, sender_i, mut shares, signed) => {
if sender_i !=
spec
.i(signed.signer)
.expect("transaction added to tributary by signer who isn't a participant")
{
// TODO: Full slash
todo!();
}
let bytes = shares
.remove(
&spec
.i(Ristretto::generator() * key.deref())
.expect("in a tributary we're not a validator for"),
)
.unwrap();
if shares.len() != (usize::from(spec.n()) - 1) {
// TODO: Full slash
todo!();
}
// Only save our share's bytes
let our_i = spec
.i(Ristretto::generator() * key.deref())
.expect("in a tributary we're not a validator for");
// This unwrap is safe since the length of shares is checked, so the only missing key
// within the valid range will be the sender's i
let bytes = if sender_i == our_i { vec![] } else { shares.remove(&our_i).unwrap() };
if let Some(shares) =
handle(Zone::Dkg, b"dkg_shares", spec.n(), [0; 32], attempt, bytes, signed)
{
log::info!("got all DkgShares for {}", hex::encode(genesis));
processors
.send(
spec.set().network,
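Condensing the checks in this arm: the claimed `sender_i` must match the actual signer, exactly `n - 1` shares must be present (everyone but the sender), and only then is our own share extracted, empty when we are the sender. A sketch of that sequence (hypothetical standalone function, `u16` for `Participant`; `None` marks where the real handler would slash):

```rust
use std::collections::HashMap;

fn validate_shares(
    mut shares: HashMap<u16, Vec<u8>>,
    sender_i: u16,
    signer_i: u16,
    our_i: u16,
    n: u16,
) -> Option<Vec<u8>> {
    // 1) The claimed sender index must be the transaction's actual signer.
    if sender_i != signer_i {
        return None;
    }
    // 2) Everyone but the sender must have a share: exactly n - 1 entries.
    if shares.len() != usize::from(n - 1) {
        return None;
    }
    // 3) Keep only our own share; the sender holds no share for themselves.
    //    As in the original, this unwrap relies on the count check above.
    Some(if sender_i == our_i { vec![] } else { shares.remove(&our_i).unwrap() })
}

fn main() {
    let shares = HashMap::from([(1u16, vec![1u8]), (2, vec![2]), (4, vec![4]), (5, vec![5])]);
    // We're participant 2; the sender signed as participant 3; n = 5.
    assert_eq!(validate_shares(shares, 3, 3, 2, 5), Some(vec![2]));
}
```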

View file

@@ -147,6 +147,13 @@ impl<D: Db, T: Transaction> Blockchain<D, T> {
pub(crate) fn add_block(&mut self, block: &Block<T>, commit: Vec<u8>) -> Result<(), BlockError> {
self.verify_block(block)?;
log::info!(
"adding block {} to tributary {} with {} TXs",
hex::encode(block.hash()),
hex::encode(self.genesis),
block.transactions.len(),
);
// None of the following assertions should be reachable since we verified the block
// Take it from the Option so Rust doesn't consider self as mutably borrowed thanks to the

View file

@@ -210,8 +210,9 @@ pub trait Network: Send + Sync {
/// Type used for ordered blocks of information.
type Block: Block;
/// Maximum block processing time in seconds. This should include both the actual processing time
/// and the time to download the block.
/// Maximum block processing time in seconds.
///
/// This should include both the time to download the block and the actual processing time.
const BLOCK_PROCESSING_TIME: u32;
/// Network latency time in seconds.
const LATENCY_TIME: u32;
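How a machine combines these two constants is implementation-specific, but as a rough illustration (hypothetical numbers and derivation, not the crate's actual formula), a round must at least fit one download-plus-process window and a few message trips:

```rust
// Illustrative values only; real networks tune these to their chain.
const BLOCK_PROCESSING_TIME: u32 = 3; // download + process, in seconds
const LATENCY_TIME: u32 = 1; // message latency bound, in seconds

fn main() {
    // One plausible lower bound on the round length: processing plus three
    // latency windows (propose, prevote, precommit).
    let round_secs = BLOCK_PROCESSING_TIME + 3 * LATENCY_TIME;
    println!("round length: {round_secs}s");
}
```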

View file

@@ -197,7 +197,9 @@ impl<N: Network + 'static> TendermintMachine<N> {
// Sleep until this round ends
let round_end = self.block.end_time[&end_round];
sleep(round_end.instant().saturating_duration_since(Instant::now())).await;
let time_until_round_end = round_end.instant().saturating_duration_since(Instant::now());
log::trace!("sleeping until round ends in {}ms", time_until_round_end.as_millis());
sleep(time_until_round_end).await;
// Clear our outbound message queue
self.queue = VecDeque::new();
@@ -313,6 +315,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
let rounds_to_skip = Instant::now().duration_since(start_time.instant()).as_secs() /
u64::from(N::block_time());
if rounds_to_skip != 0 {
log::trace!("joining mid-block so skipping {rounds_to_skip} rounds");
machine.round(RoundNumber(rounds_to_skip.try_into().unwrap()), None);
}
machine
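The catch-up above is plain integer division over the shown `block_time`; a worked example with illustrative numbers: joining 25 seconds after the block's start with a 5-second block time skips 5 rounds, while 4 seconds in skips none.

```rust
fn rounds_to_skip(secs_since_start: u64, block_time_secs: u64) -> u64 {
    // Integer division: a partially elapsed round isn't skipped.
    secs_since_start / block_time_secs
}

fn main() {
    assert_eq!(rounds_to_skip(25, 5), 5);
    assert_eq!(rounds_to_skip(4, 5), 0); // still in round 0
}
```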
@@ -446,7 +449,9 @@ impl<N: Network + 'static> TendermintMachine<N> {
"TendermintMachine produced block {}",
hex::encode(block.id().as_ref()),
);
let id = block.id();
let proposal = self.network.add_block(block, commit).await;
log::trace!("added block {} (produced by machine)", hex::encode(id.as_ref()));
self.reset(msg.round, proposal).await;
}
Err(TendermintError::Malicious(validator)) => self.slash(validator).await,

View file

@@ -10,14 +10,15 @@ as a verifiable broadcast layer.
`DkgCommitments` is created when a processor sends the coordinator
`key_gen::ProcessorMessage::Commitments`. When all validators participating in
a multisig publish `DkgCommitments`, the coordinator sends the processor
`key_gen::CoordinatorMessage::Commitments`.
`key_gen::CoordinatorMessage::Commitments`, excluding the processor's own
commitments.
### Key Gen Shares
`DkgShares` is created when a processor sends the coordinator
`key_gen::ProcessorMessage::Shares`. When all validators participating in
a multisig publish `DkgShares`, the coordinator sends the processor
`key_gen::CoordinatorMessage::Shares`.
`key_gen::CoordinatorMessage::Shares`, excluding the processor's own shares.
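For both key-gen messages the exclusion is the same operation: drop the local validator's entry before forwarding, since the processor already holds its own data. A sketch (hypothetical types, `u16` standing in for `Participant`):

```rust
use std::collections::HashMap;

// Strip the local validator's entry before forwarding collected commitments
// or shares; the processor already has its own data.
fn for_processor(mut collected: HashMap<u16, Vec<u8>>, our_i: u16) -> HashMap<u16, Vec<u8>> {
    collected.remove(&our_i);
    collected
}

fn main() {
    let collected = HashMap::from([(1u16, vec![0u8]), (2, vec![1])]);
    assert!(!for_processor(collected, 1).contains_key(&1));
}
```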
### External Block
@@ -42,8 +43,10 @@ publish transactions for the signing protocols it causes.
`coordinator::ProcessorMessage::BatchPreprocess` and an `ExternalBlock`
transaction allowing the batch to be signed has already been included on chain.
When `t` validators have published `BatchPreprocess` transactions, a
`coordinator::ProcessorMessage::BatchPreprocesses` is sent to the processor.
When `t` validators have published `BatchPreprocess` transactions, if the
coordinator represents one of the first `t` validators to do so, a
`coordinator::ProcessorMessage::BatchPreprocesses` is sent to the processor,
excluding the processor's own preprocess.
### Batch Share
@@ -54,9 +57,10 @@ transaction having already been included on chain follows from
also has that precondition.
When the `t` validators who first published `BatchPreprocess` transactions have
published `BatchShare` transactions, a
`coordinator::ProcessorMessage::BatchShares` with the relevant shares is sent
to the processor.
published `BatchShare` transactions, if the coordinator represents one of the
first `t` validators to do so, a `coordinator::ProcessorMessage::BatchShares`
with the relevant shares (excluding the processor's own) is sent to the
processor.
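The "first `t` validators" rule is shared by the batch and sign flows: the `t` publishers whose transactions were ordered first on the Tributary form the signing set, and only a coordinator inside that set forwards the message to its processor. A sketch of the membership test (hypothetical, `u16` validator indices listed in on-chain order):

```rust
fn in_signing_set(ordered_publishers: &[u16], our_i: u16, t: usize) -> bool {
    // Only the first t publishers, by on-chain ordering, are in the set.
    ordered_publishers.iter().take(t).any(|&i| i == our_i)
}

fn main() {
    // t = 3: validators 2, 5, 1 published first; 7 published fourth.
    assert!(in_signing_set(&[2, 5, 1, 7], 1, 3));
    assert!(!in_signing_set(&[2, 5, 1, 7], 7, 3));
}
```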
### Sign Preprocess
@@ -64,8 +68,10 @@ to the processor.
`sign::ProcessorMessage::Preprocess` and a `SubstrateBlock` transaction
allowing the transaction to be signed has already been included on chain.
When `t` validators have published `SignPreprocess` transactions, a
`sign::ProcessorMessage::Preprocesses` is sent to the processor.
When `t` validators have published `SignPreprocess` transactions, if the
coordinator represents one of the first `t` validators to do so, a
`sign::ProcessorMessage::Preprocesses` is sent to the processor,
excluding the processor's own preprocess.
### Sign Share
@@ -76,8 +82,9 @@ having already been included on chain follows from
also has that precondition.
When the `t` validators who first published `SignPreprocess` transactions have
published `SignShare` transactions, a `sign::ProcessorMessage::Shares` with the
relevant shares is sent to the processor.
published `SignShare` transactions, if the coordinator represents one of the
first `t` validators to do so, a `sign::ProcessorMessage::Shares` with the
relevant shares (excluding the processor's own) is sent to the processor.
### Sign Completed