\#339 addendum

This commit is contained in:
Luke Parker 2023-11-15 20:23:19 -05:00
parent d25e3d86a2
commit 369af0fab5
No known key found for this signature in database
10 changed files with 225 additions and 71 deletions

View file

@ -1,20 +1,23 @@
use core::time::Duration; use core::time::Duration;
use std::{ use std::{
sync::{Arc, Mutex, RwLock}, sync::Arc,
collections::{HashSet, HashMap}, collections::{HashSet, HashMap},
}; };
use tokio::{sync::mpsc, time::sleep}; use tokio::{
sync::{mpsc, Mutex, RwLock},
time::sleep,
};
use scale::Encode; use scale::Encode;
use sp_application_crypto::RuntimePublic; use sp_application_crypto::RuntimePublic;
use serai_client::{ use serai_client::{
primitives::{NETWORKS, NetworkId, Signature}, primitives::{NETWORKS, NetworkId, Signature},
validator_sets::primitives::{Session, ValidatorSet}, validator_sets::primitives::{Session, ValidatorSet},
SeraiError, Serai, SeraiError, TemporalSerai, Serai,
}; };
use serai_db::{DbTxn, Db}; use serai_db::{Get, DbTxn, Db, create_db};
use processor_messages::coordinator::cosign_block_msg; use processor_messages::coordinator::cosign_block_msg;
@ -23,6 +26,13 @@ use crate::{
substrate::SubstrateDb, substrate::SubstrateDb,
}; };
create_db! {
CosignDb {
ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> Vec<u8>,
DistinctChain: (set: ValidatorSet) -> ()
}
}
pub struct CosignEvaluator<D: Db> { pub struct CosignEvaluator<D: Db> {
db: Mutex<D>, db: Mutex<D>,
serai: Arc<Serai>, serai: Arc<Serai>,
@ -31,14 +41,14 @@ pub struct CosignEvaluator<D: Db> {
} }
impl<D: Db> CosignEvaluator<D> { impl<D: Db> CosignEvaluator<D> {
fn update_latest_cosign(&self) { async fn update_latest_cosign(&self) {
let stakes_lock = self.stakes.read().unwrap(); let stakes_lock = self.stakes.read().await;
// If we haven't gotten the stake data yet, return // If we haven't gotten the stake data yet, return
let Some(stakes) = stakes_lock.as_ref() else { return }; let Some(stakes) = stakes_lock.as_ref() else { return };
let total_stake = stakes.values().cloned().sum::<u64>(); let total_stake = stakes.values().cloned().sum::<u64>();
let latest_cosigns = self.latest_cosigns.read().unwrap(); let latest_cosigns = self.latest_cosigns.read().await;
let mut highest_block = 0; let mut highest_block = 0;
for (block_num, _) in latest_cosigns.values() { for (block_num, _) in latest_cosigns.values() {
let mut networks = HashSet::new(); let mut networks = HashSet::new();
@ -55,7 +65,7 @@ impl<D: Db> CosignEvaluator<D> {
} }
} }
let mut db_lock = self.db.lock().unwrap(); let mut db_lock = self.db.lock().await;
let mut txn = db_lock.txn(); let mut txn = db_lock.txn();
if highest_block > SubstrateDb::<D>::latest_cosigned_block(&txn) { if highest_block > SubstrateDb::<D>::latest_cosigned_block(&txn) {
log::info!("setting latest cosigned block to {}", highest_block); log::info!("setting latest cosigned block to {}", highest_block);
@ -85,61 +95,172 @@ impl<D: Db> CosignEvaluator<D> {
} }
// Since we've successfully built stakes, set it // Since we've successfully built stakes, set it
*self.stakes.write().unwrap() = Some(stakes); *self.stakes.write().await = Some(stakes);
self.update_latest_cosign(); self.update_latest_cosign().await;
Ok(()) Ok(())
} }
// Uses Err to signify a message should be retried // Uses Err to signify a message should be retried
async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> { async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
let Some(block) = self.serai.block(cosign.block).await? else { // If we already have this cosign or a newer cosign, return
log::warn!("received cosign for an unknown block"); if let Some(latest) = self.latest_cosigns.read().await.get(&cosign.network) {
if latest.0 >= cosign.block_number {
return Ok(()); return Ok(());
}; }
}
// If this an old cosign, don't bother handling it // If this an old cosign (older than a day), drop it
if block.number() < let latest_block = self.serai.latest_block().await?;
self.latest_cosigns.read().unwrap().get(&cosign.network).map(|cosign| cosign.0).unwrap_or(0) if (cosign.block_number + (24 * 60 * 60 / 6)) < latest_block.number() {
{ log::debug!("received old cosign supposedly signed by {:?}", cosign.network);
log::debug!("received old cosign from {:?}", cosign.network);
return Ok(()); return Ok(());
} }
// Get the key for this network as of the prior block let Some(block) = self.serai.block_by_number(cosign.block_number).await? else {
let serai = self.serai.as_of(block.header().parent_hash.into()); log::warn!("received cosign with a block number which doesn't map to a block");
let Some(latest_session) = serai.validator_sets().session(cosign.network).await? else {
log::warn!("received cosign from {:?}, which doesn't yet have a session", cosign.network);
return Ok(()); return Ok(());
}; };
async fn set_with_keys_fn(
serai: &TemporalSerai<'_>,
network: NetworkId,
) -> Result<Option<ValidatorSet>, SeraiError> {
let Some(latest_session) = serai.validator_sets().session(network).await? else {
log::warn!("received cosign from {:?}, which doesn't yet have a session", network);
return Ok(None);
};
let prior_session = Session(latest_session.0.saturating_sub(1)); let prior_session = Session(latest_session.0.saturating_sub(1));
let set_with_keys = if serai Ok(Some(
if serai
.validator_sets() .validator_sets()
.keys(ValidatorSet { network: cosign.network, session: prior_session }) .keys(ValidatorSet { network, session: prior_session })
.await? .await?
.is_some() .is_some()
{ {
ValidatorSet { network: cosign.network, session: prior_session } ValidatorSet { network, session: prior_session }
} else { } else {
ValidatorSet { network: cosign.network, session: latest_session } ValidatorSet { network, session: latest_session }
}; },
))
}
// Get the key for this network as of the prior block
// If we have two chains, this value may be different across chains depending on if one chain
// included the set_keys and one didn't
// Because set_keys will force a cosign, it will force detection of distinct blocks
// re: set_keys using keys prior to set_keys (assumed amenable to all)
let serai = self.serai.as_of(block.header().parent_hash.into());
let Some(set_with_keys) = set_with_keys_fn(&serai, cosign.network).await? else {
return Ok(());
};
let Some(keys) = serai.validator_sets().keys(set_with_keys).await? else { let Some(keys) = serai.validator_sets().keys(set_with_keys).await? else {
log::warn!("received cosign for a block we didn't have keys for"); log::warn!("received cosign for a block we didn't have keys for");
return Ok(()); return Ok(());
}; };
if !keys.0.verify(&cosign_block_msg(cosign.block), &Signature(cosign.signature)) { if !keys
.0
.verify(&cosign_block_msg(cosign.block_number, cosign.block), &Signature(cosign.signature))
{
log::warn!("received cosigned block with an invalid signature"); log::warn!("received cosigned block with an invalid signature");
return Ok(()); return Ok(());
} }
log::info!("received cosign for block {} by {:?}", block.number(), cosign.network); log::info!(
self.latest_cosigns.write().unwrap().insert(cosign.network, (block.number(), cosign)); "received cosign for block {} ({}) by {:?}",
block.number(),
hex::encode(cosign.block),
cosign.network
);
self.update_latest_cosign(); // Save this cosign to the DB
{
let mut db = self.db.lock().await;
let mut txn = db.txn();
ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign.encode());
txn.commit();
}
if cosign.block != block.hash() {
log::error!(
"received cosign for a distinct block at {}. we have {}. cosign had {}",
cosign.block_number,
hex::encode(block.hash()),
hex::encode(cosign.block)
);
let serai = self.serai.as_of(latest_block.hash());
let mut db = self.db.lock().await;
// Save this set as being on a different chain
let mut txn = db.txn();
DistinctChain::set(&mut txn, set_with_keys, &());
txn.commit();
let mut total_stake = 0;
let mut total_on_distinct_chain = 0;
for network in NETWORKS {
if network == NetworkId::Serai {
continue;
}
// Get the current set for this network
let set_with_keys = {
let mut res;
while {
res = set_with_keys_fn(&serai, cosign.network).await;
res.is_err()
} {
log::error!(
"couldn't get the set with keys when checking for a distinct chain: {:?}",
res
);
tokio::time::sleep(core::time::Duration::from_secs(3)).await;
}
res.unwrap()
};
// Get its stake
// Doesn't use the stakes inside self to prevent deadlocks re: multi-lock acquisition
if let Some(set_with_keys) = set_with_keys {
let stake = {
let mut res;
while {
res = serai.validator_sets().total_allocated_stake(set_with_keys.network).await;
res.is_err()
} {
log::error!(
"couldn't get total allocated stake when checking for a distinct chain: {:?}",
res
);
tokio::time::sleep(core::time::Duration::from_secs(3)).await;
}
res.unwrap()
};
if let Some(stake) = stake {
total_stake += stake.0;
if DistinctChain::get(&*db, set_with_keys).is_some() {
total_on_distinct_chain += stake.0;
}
}
}
}
// See https://github.com/serai-dex/serai/issues/339 for the reasoning on 17%
if (total_stake * 17 / 100) <= total_on_distinct_chain {
panic!("17% of validator sets (by stake) have co-signed a distinct chain");
}
} else {
let mut latest_cosigns = self.latest_cosigns.write().await;
latest_cosigns.insert(cosign.network, (block.number(), cosign));
self.update_latest_cosign().await;
}
Ok(()) Ok(())
} }
@ -191,7 +312,7 @@ impl<D: Db> CosignEvaluator<D> {
let cosigns = evaluator let cosigns = evaluator
.latest_cosigns .latest_cosigns
.read() .read()
.unwrap() .await
.values() .values()
.map(|cosign| cosign.1) .map(|cosign| cosign.1)
.collect::<Vec<_>>(); .collect::<Vec<_>>();

View file

@ -294,9 +294,10 @@ async fn handle_processor_message<D: Db, P: P2p>(
coordinator::ProcessorMessage::SubstrateShare { id, .. } => { coordinator::ProcessorMessage::SubstrateShare { id, .. } => {
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap()) Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
} }
coordinator::ProcessorMessage::CosignedBlock { block, signature } => { coordinator::ProcessorMessage::CosignedBlock { block_number, block, signature } => {
let cosigned_block = CosignedBlock { let cosigned_block = CosignedBlock {
network, network,
block_number: *block_number,
block: *block, block: *block,
signature: { signature: {
let mut arr = [0; 64]; let mut arr = [0; 64];

View file

@ -43,6 +43,7 @@ const LIBP2P_TOPIC: &str = "serai-coordinator";
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
pub struct CosignedBlock { pub struct CosignedBlock {
pub network: NetworkId, pub network: NetworkId,
pub block_number: u64,
pub block: [u8; 32], pub block: [u8; 32],
pub signature: [u8; 64], pub signature: [u8; 64],
} }

View file

@ -30,7 +30,7 @@ use tokio::{sync::mpsc, time::sleep};
use crate::{ use crate::{
Db, Db,
processors::Processors, processors::Processors,
tributary::{TributarySpec, TributaryDb}, tributary::{TributarySpec, SeraiBlockNumber, TributaryDb},
}; };
mod db; mod db;
@ -456,6 +456,16 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
let maximally_latent_cosign_block = let maximally_latent_cosign_block =
skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE); skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE);
for block in (last_intended_to_cosign_block + 1) ..= latest_number { for block in (last_intended_to_cosign_block + 1) ..= latest_number {
SeraiBlockNumber::set(
&mut txn,
serai
.block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized")
.hash(),
&block,
);
let mut set = false; let mut set = false;
let block_has_events = block_has_events(&mut txn, serai, block).await?; let block_has_events = block_has_events(&mut txn, serai, block).await?;

View file

@ -15,6 +15,12 @@ pub use serai_db::*;
use crate::tributary::TributarySpec; use crate::tributary::TributarySpec;
create_db! {
NewTributaryDb {
SeraiBlockNumber: (hash: [u8; 32]) -> u64
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Topic { pub enum Topic {
Dkg, Dkg,

View file

@ -27,8 +27,8 @@ use serai_db::{Get, Db};
use crate::{ use crate::{
processors::Processors, processors::Processors,
tributary::{ tributary::{
Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, DataSet, Accumulation, Transaction, TributarySpec, SeraiBlockNumber, Topic, DataSpecification, TributaryDb, DataSet,
TributaryState, Accumulation, TributaryState,
nonce_decider::NonceDecider, nonce_decider::NonceDecider,
dkg_confirmer::DkgConfirmer, dkg_confirmer::DkgConfirmer,
scanner::{RecognizedIdType, RIDTrait}, scanner::{RecognizedIdType, RIDTrait},
@ -528,6 +528,8 @@ pub(crate) async fn handle_application_tx<
id: SubstrateSignableId::CosigningSubstrateBlock(hash), id: SubstrateSignableId::CosigningSubstrateBlock(hash),
attempt: 0, attempt: 0,
}, },
block_number: SeraiBlockNumber::get(txn, hash)
.expect("CosignSubstrateBlock yet didn't save Serai block number"),
}, },
) )
.await; .await;

View file

@ -156,10 +156,11 @@ pub mod sign {
pub mod coordinator { pub mod coordinator {
use super::*; use super::*;
pub fn cosign_block_msg(block: [u8; 32]) -> Vec<u8> { pub fn cosign_block_msg(block_number: u64, block: [u8; 32]) -> Vec<u8> {
const DST: &[u8] = b"Cosign"; const DST: &[u8] = b"Cosign";
let mut res = vec![u8::try_from(DST.len()).unwrap()]; let mut res = vec![u8::try_from(DST.len()).unwrap()];
res.extend(DST); res.extend(DST);
res.extend(block_number.to_le_bytes());
res.extend(block); res.extend(block);
res res
} }
@ -181,7 +182,7 @@ pub mod coordinator {
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub enum CoordinatorMessage { pub enum CoordinatorMessage {
CosignSubstrateBlock { id: SubstrateSignId }, CosignSubstrateBlock { id: SubstrateSignId, block_number: u64 },
// Uses Vec<u8> instead of [u8; 64] since serde Deserialize isn't implemented for [u8; 64] // Uses Vec<u8> instead of [u8; 64] since serde Deserialize isn't implemented for [u8; 64]
SubstratePreprocesses { id: SubstrateSignId, preprocesses: HashMap<Participant, Vec<u8>> }, SubstratePreprocesses { id: SubstrateSignId, preprocesses: HashMap<Participant, Vec<u8>> },
SubstrateShares { id: SubstrateSignId, shares: HashMap<Participant, [u8; 32]> }, SubstrateShares { id: SubstrateSignId, shares: HashMap<Participant, [u8; 32]> },
@ -205,7 +206,7 @@ pub mod coordinator {
pub fn key(&self) -> &[u8] { pub fn key(&self) -> &[u8] {
match self { match self {
CoordinatorMessage::CosignSubstrateBlock { id } => &id.key, CoordinatorMessage::CosignSubstrateBlock { id, .. } => &id.key,
CoordinatorMessage::SubstratePreprocesses { id, .. } => &id.key, CoordinatorMessage::SubstratePreprocesses { id, .. } => &id.key,
CoordinatorMessage::SubstrateShares { id, .. } => &id.key, CoordinatorMessage::SubstrateShares { id, .. } => &id.key,
CoordinatorMessage::BatchReattempt { id } => &id.key, CoordinatorMessage::BatchReattempt { id } => &id.key,
@ -226,7 +227,7 @@ pub mod coordinator {
CosignPreprocess { id: SubstrateSignId, preprocesses: Vec<Vec<u8>> }, CosignPreprocess { id: SubstrateSignId, preprocesses: Vec<Vec<u8>> },
BatchPreprocess { id: SubstrateSignId, block: BlockHash, preprocesses: Vec<Vec<u8>> }, BatchPreprocess { id: SubstrateSignId, block: BlockHash, preprocesses: Vec<Vec<u8>> },
SubstrateShare { id: SubstrateSignId, shares: Vec<[u8; 32]> }, SubstrateShare { id: SubstrateSignId, shares: Vec<[u8; 32]> },
CosignedBlock { block: [u8; 32], signature: Vec<u8> }, CosignedBlock { block_number: u64, block: [u8; 32], signature: Vec<u8> },
} }
} }
@ -372,7 +373,7 @@ impl CoordinatorMessage {
CoordinatorMessage::Coordinator(msg) => { CoordinatorMessage::Coordinator(msg) => {
let (sub, id) = match msg { let (sub, id) = match msg {
// Unique since this is the entire message // Unique since this is the entire message
coordinator::CoordinatorMessage::CosignSubstrateBlock { id } => (0, id.encode()), coordinator::CoordinatorMessage::CosignSubstrateBlock { id, .. } => (0, id.encode()),
// Unique since this embeds the batch ID (including its network) and attempt // Unique since this embeds the batch ID (including its network) and attempt
coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. } => (1, id.encode()), coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. } => (1, id.encode()),
coordinator::CoordinatorMessage::SubstrateShares { id, .. } => (2, id.encode()), coordinator::CoordinatorMessage::SubstrateShares { id, .. } => (2, id.encode()),

View file

@ -38,6 +38,7 @@ pub struct Cosigner {
#[allow(dead_code)] // False positive #[allow(dead_code)] // False positive
keys: Vec<ThresholdKeys<Ristretto>>, keys: Vec<ThresholdKeys<Ristretto>>,
block_number: u64,
id: [u8; 32], id: [u8; 32],
attempt: u32, attempt: u32,
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@ -50,6 +51,7 @@ impl fmt::Debug for Cosigner {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt fmt
.debug_struct("Cosigner") .debug_struct("Cosigner")
.field("block_number", &self.block_number)
.field("id", &self.id) .field("id", &self.id)
.field("attempt", &self.attempt) .field("attempt", &self.attempt)
.field("preprocessing", &self.preprocessing.is_some()) .field("preprocessing", &self.preprocessing.is_some())
@ -62,6 +64,7 @@ impl Cosigner {
pub fn new( pub fn new(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
keys: Vec<ThresholdKeys<Ristretto>>, keys: Vec<ThresholdKeys<Ristretto>>,
block_number: u64,
id: [u8; 32], id: [u8; 32],
attempt: u32, attempt: u32,
) -> Option<(Cosigner, ProcessorMessage)> { ) -> Option<(Cosigner, ProcessorMessage)> {
@ -104,7 +107,7 @@ impl Cosigner {
}; };
Some(( Some((
Cosigner { keys, id, attempt, preprocessing, signing: None }, Cosigner { keys, block_number, id, attempt, preprocessing, signing: None },
ProcessorMessage::CosignPreprocess { ProcessorMessage::CosignPreprocess {
id: substrate_sign_id, id: substrate_sign_id,
preprocesses: serialized_preprocesses, preprocesses: serialized_preprocesses,
@ -176,7 +179,8 @@ impl Cosigner {
} }
} }
let (machine, share) = match machine.sign(preprocesses, &cosign_block_msg(self.id)) { let (machine, share) =
match machine.sign(preprocesses, &cosign_block_msg(self.block_number, self.id)) {
Ok(res) => res, Ok(res) => res,
Err(e) => match e { Err(e) => match e {
FrostError::InternalError(_) | FrostError::InternalError(_) |
@ -278,7 +282,11 @@ impl Cosigner {
Completed::set(txn, block, &()); Completed::set(txn, block, &());
Some(ProcessorMessage::CosignedBlock { block, signature: sig.to_bytes().to_vec() }) Some(ProcessorMessage::CosignedBlock {
block_number: self.block_number,
block,
signature: sig.to_bytes().to_vec(),
})
} }
CoordinatorMessage::BatchReattempt { .. } => panic!("BatchReattempt passed to Cosigner"), CoordinatorMessage::BatchReattempt { .. } => panic!("BatchReattempt passed to Cosigner"),
} }

View file

@ -253,7 +253,7 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
} }
} else { } else {
match msg { match msg {
CoordinatorCoordinatorMessage::CosignSubstrateBlock { id } => { CoordinatorCoordinatorMessage::CosignSubstrateBlock { id, block_number } => {
let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else { let SubstrateSignableId::CosigningSubstrateBlock(block) = id.id else {
panic!("CosignSubstrateBlock id didn't have a CosigningSubstrateBlock") panic!("CosignSubstrateBlock id didn't have a CosigningSubstrateBlock")
}; };
@ -261,7 +261,8 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
else { else {
panic!("didn't have key shares for the key we were told to cosign with"); panic!("didn't have key shares for the key we were told to cosign with");
}; };
if let Some((cosigner, msg)) = Cosigner::new(txn, keys, block, id.attempt) { if let Some((cosigner, msg)) = Cosigner::new(txn, keys, block_number, block, id.attempt)
{
tributary_mutable.cosigner = Some(cosigner); tributary_mutable.cosigner = Some(cosigner);
coordinator.send(msg).await; coordinator.send(msg).await;
} else { } else {

View file

@ -24,6 +24,7 @@ async fn test_cosigner() {
let participant_one = Participant::new(1).unwrap(); let participant_one = Participant::new(1).unwrap();
let block_number = OsRng.next_u64();
let block = [0xaa; 32]; let block = [0xaa; 32];
let actual_id = SubstrateSignId { let actual_id = SubstrateSignId {
@ -54,7 +55,7 @@ async fn test_cosigner() {
let mut db = MemDb::new(); let mut db = MemDb::new();
let mut txn = db.txn(); let mut txn = db.txn();
let (signer, preprocess) = let (signer, preprocess) =
Cosigner::new(&mut txn, vec![keys], block, actual_id.attempt).unwrap(); Cosigner::new(&mut txn, vec![keys], block_number, block, actual_id.attempt).unwrap();
match preprocess { match preprocess {
// All participants should emit a preprocess // All participants should emit a preprocess
@ -114,10 +115,12 @@ async fn test_cosigner() {
.await .await
.unwrap() .unwrap()
{ {
ProcessorMessage::CosignedBlock { block: signed_block, signature } => { ProcessorMessage::CosignedBlock { block_number, block: signed_block, signature } => {
assert_eq!(signed_block, block); assert_eq!(signed_block, block);
assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes()) assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes()).verify(
.verify(&cosign_block_msg(block), &Signature(signature.try_into().unwrap()))); &cosign_block_msg(block_number, block),
&Signature(signature.try_into().unwrap())
));
} }
_ => panic!("didn't get cosigned block back"), _ => panic!("didn't get cosigned block back"),
} }