#![allow(unused_variables)]
#![allow(unreachable_code)]
#![allow(clippy::diverging_sub_expression)]

use core::{ops::Deref, future::Future};
use std::{
  sync::Arc,
  time::{SystemTime, Duration},
  collections::{VecDeque, HashMap},
};

use zeroize::{Zeroize, Zeroizing};
use rand_core::OsRng;

use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};

use serai_db::{DbTxn, Db};
use serai_env as env;

use serai_client::{primitives::NetworkId, Public, Serai};

use message_queue::{Service, client::MessageQueue};

use tokio::{sync::RwLock, time::sleep};

use ::tributary::{
  ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary, TributaryReader,
};

mod tributary;
#[rustfmt::skip]
use crate::tributary::{TributarySpec, SignData, Transaction, TributaryDb, scanner::RecognizedIdType};

mod db;
use db::MainDb;

mod p2p;
pub use p2p::*;

use processor_messages::{key_gen, sign, coordinator, ProcessorMessage};

pub mod processors;
use processors::Processors;

mod substrate;

#[cfg(test)]
pub mod tests;

lazy_static::lazy_static! {
  // This is a static to satisfy lifetime expectations
  static ref NEW_TRIBUTARIES: RwLock<VecDeque<TributarySpec>> = RwLock::new(VecDeque::new());
}
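
/// An active Tributary: its specification paired with a shared handle to the Tributary itself.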
pub struct ActiveTributary<D: Db, P: P2p> {
  pub spec: TributarySpec,
  pub tributary: Arc<RwLock<Tributary<D, Transaction, P>>>,
}

type Tributaries<D, P> = HashMap<[u8; 32], ActiveTributary<D, P>>;

// Adds a tributary into the specified HashMap
async fn add_tributary<D: Db, P: P2p>(
  db: D,
  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
  p2p: P,
  tributaries: &mut Tributaries<D, P>,
  spec: TributarySpec,
) -> TributaryReader<D, Transaction> {
  log::info!("adding tributary {:?}", spec.set());

  let tributary = Tributary::<_, Transaction, _>::new(
    // TODO2: Use a db on a distinct volume
    db,
    spec.genesis(),
    spec.start_time(),
    key,
    spec.validators(),
    p2p,
  )
  .await
  .unwrap();

  let reader = tributary.reader();

  tributaries.insert(
    tributary.genesis(),
    ActiveTributary { spec, tributary: Arc::new(RwLock::new(tributary)) },
  );

  reader
}
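
/// Scan the Substrate chain for new blocks, saving and queueing a new Tributary whenever one is
/// declared for this validator.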
pub async fn scan_substrate<D: Db, Pro: Processors>(
  db: D,
  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
  processors: Pro,
  serai: Arc<Serai>,
) {
  log::info!("scanning substrate");

  let mut db = substrate::SubstrateDb::new(db);
  let mut next_substrate_block = db.next_block();

  loop {
    match substrate::handle_new_blocks(
      &mut db,
      &key,
      |db: &mut D, spec: TributarySpec| {
        log::info!("creating new tributary for {:?}", spec.set());

        // Save it to the database
        MainDb::new(db).add_active_tributary(&spec);

        // Add it to the queue
        // If we reboot before this is read from the queue, the fact it was saved to the database
        // means it'll be handled on reboot
        async {
          NEW_TRIBUTARIES.write().await.push_back(spec);
        }
      },
      &processors,
      &serai,
      &mut next_substrate_block,
    )
    .await
    {
      // TODO2: Should this use a notification system for new blocks?
      // Right now it's sleeping for half the block time.
      Ok(()) => sleep(Duration::from_secs(3)).await,
      Err(e) => {
        log::error!("couldn't communicate with serai node: {e}");
        sleep(Duration::from_secs(5)).await;
      }
    }
  }
}
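
/// Scan every active Tributary for new blocks, handling them with the Tributary scanner and
/// picking up any Tributaries queued in NEW_TRIBUTARIES along the way.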
#[allow(clippy::type_complexity)]
pub async fn scan_tributaries<
  D: Db,
  Pro: Processors,
  P: P2p,
  FRid: Future<Output = Vec<[u8; 32]>>,
  RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid,
>(
  raw_db: D,
  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
  recognized_id: RID,
  p2p: P,
  processors: Pro,
  serai: Arc<Serai>,
  tributaries: Arc<RwLock<Tributaries<D, P>>>,
) {
  log::info!("scanning tributaries");

  let mut tributary_readers = vec![];
  for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
    tributary_readers.push((spec.clone(), tributary.read().await.reader()));
  }

  // Handle new Tributary blocks
  let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
  loop {
    // The following handle_new_blocks function may take an arbitrary amount of time
    // Accordingly, it may take a long time to acquire a write lock on the tributaries table
    // By definition of NEW_TRIBUTARIES, we allow tributaries to be added almost immediately,
    // meaning the Substrate scanner won't become blocked on this
    {
      let mut new_tributaries = NEW_TRIBUTARIES.write().await;
      while let Some(spec) = new_tributaries.pop_front() {
        let reader = add_tributary(
          raw_db.clone(),
          key.clone(),
          p2p.clone(),
          // This is a short-lived write acquisition, which is why it should be fine
          &mut *tributaries.write().await,
          spec.clone(),
        )
        .await;

        tributary_readers.push((spec, reader));
      }
    }

    for (spec, reader) in &tributary_readers {
      tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>(
        &mut tributary_db,
        &key,
        recognized_id.clone(),
        &processors,
        |set, tx| {
          let serai = serai.clone();
          async move {
            loop {
              match serai.publish(&tx).await {
                Ok(_) => {
                  log::info!("set key pair for {set:?}");
                  break;
                }
                // This is assumed to be some ephemeral error due to the assumed fault-free
                // creation
                // TODO: Differentiate connection errors from invariants
                Err(e) => {
                  // Check if this failed because the keys were already set by someone else
                  if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) {
                    log::info!("other party set key pair for {:?}", set);
                    break;
                  }

                  log::error!("couldn't connect to Serai node to publish set_keys TX: {:?}", e);
                  tokio::time::sleep(Duration::from_secs(10)).await;
                }
              }
            }
          }
        },
        spec,
        reader,
      )
      .await;
    }

    // Sleep for half the block time
    // TODO2: Should we define a notification system for when a new block occurs?
    sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into())).await;
  }
}
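
/// Broadcast a heartbeat for any Tributary whose latest known block is stale, prompting peers to
/// send us the blocks we're missing.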
pub async fn heartbeat_tributaries<D: Db, P: P2p>(
  p2p: P,
  tributaries: Arc<RwLock<Tributaries<D, P>>>,
) {
  let ten_blocks_of_time =
    Duration::from_secs((10 * Tributary::<D, Transaction, P>::block_time()).into());

  loop {
    for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
      let tributary = tributary.read().await;
      let tip = tributary.tip().await;
      let block_time = SystemTime::UNIX_EPOCH +
        Duration::from_secs(tributary.reader().time_of_block(&tip).unwrap_or(0));

      // Only trigger syncing if the block is more than a minute behind
      if SystemTime::now() > (block_time + Duration::from_secs(60)) {
        log::warn!("last known tributary block was over a minute ago");
        let mut msg = tip.to_vec();
        // Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating
        let timestamp = SystemTime::now()
          .duration_since(SystemTime::UNIX_EPOCH)
          .expect("system clock is wrong")
          .as_secs();
        // Divide by the block time so if multiple parties send a Heartbeat, they're more likely to
        // overlap
        let time_unit = timestamp / u64::from(Tributary::<D, Transaction, P>::block_time());
        msg.extend(time_unit.to_le_bytes());
        P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await;
      }
    }

    // Only check once every 10 blocks of time
    sleep(ten_blocks_of_time).await;
  }
}
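
/// Handle messages from the P2P network: gossiped Tributary messages, Heartbeats (a 32-byte tip
/// plus an 8-byte time unit) requesting the blocks after that tip, and Blocks sent in response.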
pub async fn handle_p2p<D: Db, P: P2p>(
  our_key: <Ristretto as Ciphersuite>::G,
  p2p: P,
  tributaries: Arc<RwLock<Tributaries<D, P>>>,
) {
  loop {
    let mut msg = p2p.receive().await;
    match msg.kind {
      P2pMessageKind::KeepAlive => {}

      P2pMessageKind::Tributary(genesis) => {
        let tributaries = tributaries.read().await;
        let Some(tributary) = tributaries.get(&genesis) else {
          log::debug!("received p2p message for unknown network");
          continue;
        };

        log::trace!("handling message for tributary {:?}", tributary.spec.set());
        if tributary.tributary.write().await.handle_message(&msg.msg).await {
          P2p::broadcast(&p2p, msg.kind, msg.msg).await;
        }
      }

      // TODO2: Rate limit this per timestamp
      P2pMessageKind::Heartbeat(genesis) => {
        if msg.msg.len() != 40 {
          log::error!("validator sent invalid heartbeat");
          continue;
        }

        let tributaries = tributaries.read().await;
        let Some(tributary) = tributaries.get(&genesis) else {
          log::debug!("received heartbeat message for unknown network");
          continue;
        };
        let tributary_read = tributary.tributary.read().await;

        /*
        // Have sqrt(n) nodes reply with the blocks
        let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64;
        // Try to have at least 3 responders
        if responders < 3 {
          responders = tributary.spec.n().min(3).into();
        }
        */

        // Have up to three nodes respond
        let responders = u64::from(tributary.spec.n().min(3));

        // Decide which nodes will respond by using the latest block's hash as a mutually agreed
        // upon entropy source
        // This isn't a secure source of entropy, yet it's fine for this
        let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap());
        // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9)
        // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7
        let start =
          usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)).unwrap();
        let mut selected = false;
        for validator in
          &tributary.spec.validators()[start .. (start + usize::try_from(responders).unwrap())]
        {
          if our_key == validator.0 {
            selected = true;
            break;
          }
        }
        if !selected {
          log::debug!("received heartbeat and not selected to respond");
          continue;
        }

        log::debug!("received heartbeat and selected to respond");

        let reader = tributary_read.reader();
        drop(tributary_read);

        let mut latest = msg.msg[.. 32].try_into().unwrap();
        while let Some(next) = reader.block_after(&latest) {
          let mut res = reader.block(&next).unwrap().serialize();
          res.extend(reader.commit(&next).unwrap());
          // Also include the timestamp used within the Heartbeat
          res.extend(&msg.msg[32 .. 40]);
          p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await;
          latest = next;
        }
      }

      P2pMessageKind::Block(genesis) => {
        let mut msg_ref: &[u8] = msg.msg.as_ref();
        let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
          log::error!("received block message with an invalidly serialized block");
          continue;
        };
        // Get just the commit
        msg.msg.drain(.. (msg.msg.len() - msg_ref.len()));
        msg.msg.drain((msg.msg.len() - 8) ..);

        // Spawn a dedicated task to add this block, as it may take a notable amount of time
        // While we could use a long-lived task to add each block, that task would only add one
        // block at a time *across all tributaries*
        // We either need:
        // 1) One task per tributary
        // 2) Background tasks
        // 3) For sync_block to return instead of waiting for provided transactions which are
        // missing
        // sync_block waiting is preferable since we know the block is valid by its commit, meaning
        // we are the node behind
        // As for 1/2, 1 may be preferable since this message may frequently occur
        // This is suitably performant, as tokio HTTP servers will even spawn a new task per
        // connection
        // In order to reduce congestion though, we should at least check if we take value from
        // this message before running spawn
        // TODO2
        tokio::spawn({
          let tributaries = tributaries.clone();
          async move {
            let tributaries = tributaries.read().await;
            let Some(tributary) = tributaries.get(&genesis) else {
              log::debug!("received block message for unknown network");
              return;
            };

            // TODO: Add a check which doesn't require write to see if this is the next block in
            // line
            // If it's in the future, hold it for up to T time

            let res = tributary.tributary.write().await.sync_block(block, msg.msg).await;
            log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res);
          }
        });
      }
    }
  }
}
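
/// Publish a signed transaction to the specified Tributary, skipping it if our next nonce shows
/// it was already published.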
pub async fn publish_transaction<D: Db, P: P2p>(
  tributary: &Tributary<D, Transaction, P>,
  tx: Transaction,
) {
  log::debug!("publishing transaction {}", hex::encode(tx.hash()));
  if let TransactionKind::Signed(signed) = tx.kind() {
    if tributary
      .next_nonce(signed.signer)
      .await
      .expect("we don't have a nonce, meaning we aren't a participant on this tributary") >
      signed.nonce
    {
      log::warn!("we've already published this transaction. this should only appear on reboot");
    } else {
      // We should've created a valid transaction
      assert!(tributary.add_transaction(tx).await, "created an invalid transaction");
    }
  } else {
    panic!("non-signed transaction passed to publish_transaction");
  }
}
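
/// Handle messages from the processors, converting them into Tributary transactions (or, for
/// completed batches, transactions published to Serai) as appropriate.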
pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
  mut db: D,
  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
  serai: Arc<Serai>,
  mut processors: Pro,
  tributaries: Arc<RwLock<Tributaries<D, P>>>,
) {
  let pub_key = Ristretto::generator() * key.deref();

  loop {
    let msg = processors.recv().await;

    // TODO2: This is slow, and only works as long as a network only has a single Tributary
    // (which means there's a lack of multisig rotation)
    let spec = {
      let mut spec = None;
      for tributary in tributaries.read().await.values() {
        if tributary.spec.set().network == msg.network {
          spec = Some(tributary.spec.clone());
          break;
        }
      }
      spec.unwrap()
    };

    let genesis = spec.genesis();
    // TODO: We probably want to NOP here, not panic?
    let my_i = spec.i(pub_key).expect("processor message for network we aren't a validator in");

    let tx = match msg.msg.clone() {
      ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
        key_gen::ProcessorMessage::Commitments { id, commitments } => {
          Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed()))
        }
        key_gen::ProcessorMessage::Shares { id, shares } => {
          // Create a MuSig-based machine to inform Substrate of this key generation
          // DkgConfirmer has a TODO noting it's only secure for a single usage, yet this ensures
          // the TODO is resolved before unsafe usage
          if id.attempt != 0 {
            panic!("attempt wasn't 0");
          }
          let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec);
          Some(Transaction::DkgShares {
            attempt: id.attempt,
            sender_i: my_i,
            shares,
            confirmation_nonces: nonces,
            signed: Transaction::empty_signed(),
          })
        }
        key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => {
          assert_eq!(
            id.set.network, msg.network,
            "processor claimed to be a different network than it was for GeneratedKeyPair",
          );
          // TODO: Also check the other KeyGenId fields

          // Tell the Tributary the key pair, get back the share for the MuSig signature
          let mut txn = db.txn();
          let share = crate::tributary::generated_key_pair::<D>(
            &mut txn,
            &key,
            &spec,
            &(Public(substrate_key), network_key.try_into().unwrap()),
          );
          txn.commit();

          match share {
            Ok(share) => {
              Some(Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed()))
            }
            Err(p) => todo!("participant {p:?} sent invalid DKG confirmation preprocesses"),
          }
        }
      },
      ProcessorMessage::Sign(msg) => match msg {
        sign::ProcessorMessage::Preprocess { id, preprocess } => {
          if id.attempt == 0 {
            let mut txn = db.txn();
            MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
            txn.commit();

            None
          } else {
            Some(Transaction::SignPreprocess(SignData {
              plan: id.id,
              attempt: id.attempt,
              data: preprocess,
              signed: Transaction::empty_signed(),
            }))
          }
        }
        sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData {
          plan: id.id,
          attempt: id.attempt,
          data: share,
          signed: Transaction::empty_signed(),
        })),
        // TODO
        sign::ProcessorMessage::Completed { .. } => todo!(),
      },
      ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
        coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
          assert_eq!(
            network, msg.network,
            "processor claimed to be a different network than it was for SubstrateBlockAck",
          );

          // Safe to use its own txn since this is static and just needs to be written before we
          // provide SubstrateBlock
          let mut txn = db.txn();
          TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans);
          txn.commit();

          Some(Transaction::SubstrateBlock(block))
        }
        coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => {
          log::info!(
            "informed of batch (sign ID {}, attempt {}) for block {}",
            hex::encode(id.id),
            id.attempt,
            hex::encode(block),
          );
          // If this is the first attempt instance, synchronize around the block first
          if id.attempt == 0 {
            // Save the preprocess to disk so we can publish it later
            // This is fine to use its own TX since it's static and just needs to be written
            // before this message finishes its handling (or with this message's finished handling)
            let mut txn = db.txn();
            MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
            MainDb::<D>::add_batch_to_block(&mut txn, msg.network, block, id.id);
            txn.commit();

            // TODO: This will publish one ExternalBlock per Batch. We should only publish one per
            // all batches within a block
            Some(Transaction::ExternalBlock(block.0))
          } else {
            Some(Transaction::BatchPreprocess(SignData {
              plan: id.id,
              attempt: id.attempt,
              data: preprocess,
              signed: Transaction::empty_signed(),
            }))
          }
        }
        coordinator::ProcessorMessage::BatchShare { id, share } => {
          Some(Transaction::BatchShare(SignData {
            plan: id.id,
            attempt: id.attempt,
            data: share.to_vec(),
            signed: Transaction::empty_signed(),
          }))
        }
      },
      ProcessorMessage::Substrate(inner_msg) => match inner_msg {
        processor_messages::substrate::ProcessorMessage::Update { batch } => {
          assert_eq!(
            batch.batch.network, msg.network,
            "processor sent us a batch for a different network than it was for",
          );
          // TODO: Check this key's key pair's substrate key is authorized to publish batches
          // TODO: Check the batch ID is an atomic increment

          let tx = Serai::execute_batch(batch.clone());
          loop {
            match serai.publish(&tx).await {
              Ok(_) => {
                log::info!(
                  "executed batch {:?} {} (block {})",
                  batch.batch.network,
                  batch.batch.id,
                  hex::encode(batch.batch.block),
                );
                break;
              }
              Err(e) => {
                // TODO: Check if this failed because the batch was already published by someone
                // else
                log::error!("couldn't connect to Serai node to publish batch TX: {:?}", e);
                tokio::time::sleep(Duration::from_secs(10)).await;
              }
            }
          }

          None
        }
      },
    };

    // If this created a transaction, publish it
    if let Some(mut tx) = tx {
      let tributaries = tributaries.read().await;
      let Some(tributary) = tributaries.get(&genesis) else {
        // TODO: This can happen since Substrate tells the Processor to generate commitments
        // at the same time it tells the Tributary to be created
        // There's no guarantee the Tributary will have been created though
        panic!("processor is operating on tributary we don't have");
      };
      let tributary = tributary.tributary.read().await;

      match tx.kind() {
        TransactionKind::Provided(_) => {
          let res = tributary.provide_transaction(tx).await;
          if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
            panic!("provided an invalid transaction: {res:?}");
          }
        }
        TransactionKind::Signed(_) => {
          // Get the next nonce
          // TODO: This should be deterministic, not just DB-backed, to allow rebuilding validators
          // without the prior instance's DB
          // let mut txn = db.txn();
          // let nonce = MainDb::tx_nonce(&mut txn, msg.id, tributary);

          // TODO: This isn't deterministic, or at least DB-backed, and accordingly is unsafe
          log::trace!("getting next nonce for Tributary TX in response to processor message");
          let nonce = tributary
            .next_nonce(Ristretto::generator() * key.deref())
            .await
            .expect("publishing a TX to a tributary we aren't in");
          tx.sign(&mut OsRng, genesis, &key, nonce);

          publish_transaction(&tributary, tx).await;

          // txn.commit();
        }
        _ => panic!("created an unexpected transaction"),
      }
    }

    processors.ack(msg).await;
  }
}
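
/// Run the coordinator: scan Substrate, scan and heartbeat the Tributaries, handle P2P traffic,
/// and handle messages from the processors.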
pub async fn run<D: Db, Pro: Processors, P: P2p>(
  mut raw_db: D,
  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
  p2p: P,
  processors: Pro,
  serai: Serai,
) {
  let serai = Arc::new(serai);

  // Handle new Substrate blocks
  tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processors.clone(), serai.clone()));

  // Handle the Tributaries

  // Arc so this can be shared between the Tributary scanner task and the P2P task
  // Write locks on this may take a while to acquire
  let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary<D, P>>::new()));

  // Reload active tributaries from the database
  for spec in MainDb::new(&mut raw_db).active_tributaries().1 {
    let _ = add_tributary(
      raw_db.clone(),
      key.clone(),
      p2p.clone(),
      &mut *tributaries.write().await,
      spec,
    )
    .await;
  }

  // When we reach synchrony on an event requiring signing, send our preprocess for it
  let recognized_id = {
    let raw_db = raw_db.clone();
    let key = key.clone();
    let tributaries = tributaries.clone();
    move |network, genesis, id_type, id| {
      let raw_db = raw_db.clone();
      let key = key.clone();
      let tributaries = tributaries.clone();
      async move {
        // SubstrateBlockAck is fired before Preprocess, creating a race between Tributary ack
        // of the SubstrateBlock and the sending of all Preprocesses
        // A similar race condition exists when multiple Batches are present in a block
        // This waits until the necessary preprocess is available
        let get_preprocess = |raw_db, id| async move {
          loop {
            let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, id) else {
              sleep(Duration::from_millis(100)).await;
              continue;
            };
            return preprocess;
          }
        };

        let (ids, txs) = match id_type {
          RecognizedIdType::Block => {
            let block = id;

            let ids = MainDb::<D>::batches_in_block(&raw_db, network, block);
            let mut txs = vec![];
            for id in &ids {
              txs.push(Transaction::BatchPreprocess(SignData {
                plan: *id,
                attempt: 0,
                data: get_preprocess(&raw_db, *id).await,
                signed: Transaction::empty_signed(),
              }));
            }
            (ids, txs)
          }

          RecognizedIdType::Plan => (
            vec![id],
            vec![Transaction::SignPreprocess(SignData {
              plan: id,
              attempt: 0,
              data: get_preprocess(&raw_db, id).await,
              signed: Transaction::empty_signed(),
            })],
          ),
        };

        let tributaries = tributaries.read().await;
        let Some(tributary) = tributaries.get(&genesis) else {
          panic!("tributary we don't have came to consensus on an ExternalBlock");
        };
        let tributary = tributary.tributary.read().await;

        for mut tx in txs {
          // TODO: Same note as prior nonce acquisition
          log::trace!("getting next nonce for Tributary TX containing Batch signing data");
          let nonce = tributary
            .next_nonce(Ristretto::generator() * key.deref())
            .await
            .expect("publishing a TX to a tributary we aren't in");
          tx.sign(&mut OsRng, genesis, &key, nonce);

          publish_transaction(&tributary, tx).await;
        }

        ids
      }
    }
  };

  // Handle new blocks for each Tributary
  {
    let raw_db = raw_db.clone();
    tokio::spawn(scan_tributaries(
      raw_db,
      key.clone(),
      recognized_id,
      p2p.clone(),
      processors.clone(),
      serai.clone(),
      tributaries.clone(),
    ));
  }

  // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block
  // in a while (presumably because we're behind)
  tokio::spawn(heartbeat_tributaries(p2p.clone(), tributaries.clone()));

  // Handle P2P messages
  tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries.clone()));

  // Handle all messages from processors
  handle_processors(raw_db, key, serai, processors, tributaries).await;
}
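
// Entry point: install a panic hook, initialize logging, open the DB, load the Serai key, start
// the P2P and message-queue clients, connect to the Serai node, then call run.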
#[tokio::main]
async fn main() {
  // Override the panic handler with one which will exit the process if any tokio task panics
  {
    let existing = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic| {
      existing(panic);
      const MSG: &str = "exiting the process due to a task panicking";
      println!("{MSG}");
      log::error!("{MSG}");
      std::process::exit(1);
    }));
  }

  if std::env::var("RUST_LOG").is_err() {
    std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string()));
  }
  env_logger::init();

  log::info!("starting coordinator service...");

  let db = serai_db::new_rocksdb(&env::var("DB_PATH").expect("path to DB wasn't specified"));

  let key = {
    let mut key_hex = serai_env::var("SERAI_KEY").expect("Serai key wasn't provided");
    let mut key_vec = hex::decode(&key_hex).map_err(|_| ()).expect("Serai key wasn't hex-encoded");
    key_hex.zeroize();
    if key_vec.len() != 32 {
      key_vec.zeroize();
      panic!("Serai key had an invalid length");
    }
    let mut key_bytes = [0; 32];
    key_bytes.copy_from_slice(&key_vec);
    key_vec.zeroize();
    let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::from_repr(key_bytes).unwrap());
    key_bytes.zeroize();
    key
  };
  let p2p = LibP2p::new();

  let processors = Arc::new(MessageQueue::from_env(Service::Coordinator));

  let serai = || async {
    loop {
      let Ok(serai) = Serai::new(&format!(
        "ws://{}:9944",
        serai_env::var("SERAI_HOSTNAME").expect("Serai hostname wasn't provided")
      ))
      .await
      else {
        log::error!("couldn't connect to the Serai node");
        sleep(Duration::from_secs(5)).await;
        continue;
      };
      log::info!("made initial connection to Serai node");
      return serai;
    }
  };
  run(db, key, p2p, processors, serai().await).await
}