Start handling P2P messages

This defines the start of a very complex series of locks I'm really unhappy
with. At the same time, there's not immediately a better solution. This also
should work without issue.
This commit is contained in:
Luke Parker 2023-04-23 16:56:23 -04:00
parent f2d9d70068
commit ad5522d854
No known key found for this signature in database
12 changed files with 199 additions and 95 deletions

1
Cargo.lock generated
View file

@ -1309,6 +1309,7 @@ dependencies = [
"blake2",
"ciphersuite",
"flexible-transcript",
"futures",
"lazy_static",
"log",
"modular-frost",

View file

@ -42,4 +42,5 @@ log = "0.4"
tokio = { version = "1", features = ["full"] }
[dev-dependencies]
futures = "0.3"
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }

View file

@ -52,16 +52,19 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
serai: Serai,
) {
let add_new_tributary = |db, spec: TributarySpec| async {
// Save it to the database
MainDb(db).add_active_tributary(&spec);
// Add it to the queue
// If we reboot before this is read from the queue, the fact it was saved to the database
// means it'll be handled on reboot
NEW_TRIBUTARIES.write().await.push_back(spec);
};
// Handle new Substrate blocks
{
let mut substrate_db = substrate::SubstrateDb::new(raw_db.clone());
let mut last_substrate_block = substrate_db.last_block();
let p2p = p2p.clone();
let key = key.clone();
let mut processor = processor.clone();
tokio::spawn(async move {
@ -70,7 +73,6 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
&mut substrate_db,
&key,
add_new_tributary,
&p2p,
&mut processor,
&serai,
&mut last_substrate_block,
@ -87,15 +89,14 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
});
}
// Handle the Tributaries
{
struct ActiveTributary<D: Db, P: P2p> {
spec: TributarySpec,
tributary: Tributary<D, Transaction, P>,
}
let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary<D, P>>::new()));
let mut tributaries = HashMap::<[u8; 32], ActiveTributary<D, P>>::new();
// TODO: Use a db on a distinct volume
async fn add_tributary<D: Db, P: P2p>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@ -104,6 +105,7 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
spec: TributarySpec,
) {
let tributary = Tributary::<_, Transaction, _>::new(
// TODO: Use a db on a distinct volume
db,
spec.genesis(),
spec.start_time(),
@ -117,40 +119,85 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
tributaries.insert(tributary.genesis(), ActiveTributary { spec, tributary });
}
// Reload active tributaries from the database
// TODO: Can MainDb take a borrow?
for spec in MainDb(raw_db.clone()).active_tributaries().1 {
add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut tributaries, spec).await;
add_tributary(
raw_db.clone(),
key.clone(),
p2p.clone(),
&mut *tributaries.write().await,
spec,
)
.await;
}
// Handle new Tributary blocks
let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
tokio::spawn(async move {
loop {
// The following handle_new_blocks function may take an arbitrary amount of time
// If registering a new tributary waited for a lock on the tributaries table, the substrate
// scanner may wait on a lock for an arbitrary amount of time
// By instead using the distinct NEW_TRIBUTARIES, there should be minimal
// competition/blocking
{
let mut new_tributaries = NEW_TRIBUTARIES.write().await;
while let Some(spec) = new_tributaries.pop_front() {
add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut tributaries, spec).await;
{
let tributaries = tributaries.clone();
let p2p = p2p.clone();
tokio::spawn(async move {
loop {
// The following handle_new_blocks function may take an arbitrary amount of time
// If registering a new tributary waited for a lock on the tributaries table, the
// substrate scanner may wait on a lock for an arbitrary amount of time
// By instead using the distinct NEW_TRIBUTARIES, there should be minimal
// competition/blocking
{
let mut new_tributaries = NEW_TRIBUTARIES.write().await;
while let Some(spec) = new_tributaries.pop_front() {
add_tributary(
raw_db.clone(),
key.clone(),
p2p.clone(),
// This is a short-lived write acquisition, which is why it should be fine
&mut *tributaries.write().await,
spec,
)
.await;
}
}
// Unknown-length read acquisition. This would risk screwing over the P2P process EXCEPT
// they both use read locks. Accordingly, they can co-exist
for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
tributary::scanner::handle_new_blocks::<_, _, P>(
&mut tributary_db,
&key,
&mut processor,
spec,
tributary,
)
.await;
}
sleep(Duration::from_secs(3)).await;
}
});
}
// Handle P2P messages
{
tokio::spawn(async move {
loop {
let msg = p2p.receive().await;
match msg.kind {
P2pMessageKind::Tributary(genesis) => {
let tributaries_read = tributaries.read().await;
let Some(tributary) = tributaries_read.get(&genesis) else {
log::debug!("received p2p message for unknown network");
continue;
};
if tributary.tributary.handle_message(&msg.msg).await {
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
}
}
}
}
for ActiveTributary { spec, tributary } in tributaries.values() {
tributary::scanner::handle_new_blocks::<_, _, P>(
&mut tributary_db,
&key,
&mut processor,
spec,
tributary,
)
.await;
}
sleep(Duration::from_secs(3)).await;
}
});
});
}
}
loop {

View file

@ -1,6 +1,7 @@
use core::fmt::Debug;
use std::{
sync::{Arc, RwLock},
io::Read,
collections::VecDeque,
};
@ -10,33 +11,81 @@ pub use tributary::P2p as TributaryP2p;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind {
Tributary,
Tributary([u8; 32]),
}
impl P2pMessageKind {
fn to_byte(self) -> u8 {
fn serialize(&self) -> Vec<u8> {
match self {
P2pMessageKind::Tributary => 0,
P2pMessageKind::Tributary(genesis) => {
let mut res = vec![0];
res.extend(genesis);
res
}
}
}
fn from_byte(byte: u8) -> Option<P2pMessageKind> {
match byte {
0 => Some(P2pMessageKind::Tributary),
fn read<R: Read>(reader: &mut R) -> Option<P2pMessageKind> {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Tributary(genesis)
}),
_ => None,
}
}
}
// TODO
#[async_trait]
pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p {
async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>);
async fn receive(&self) -> Option<(P2pMessageKind, Vec<u8>)>;
/// A received P2P message, split into its sender, its parsed kind, and the remaining payload.
#[derive(Clone, Debug)]
pub struct Message<P: P2p> {
/// The peer ID the raw message was received from.
pub sender: P::Id,
/// The kind parsed from the front of the raw message.
pub kind: P2pMessageKind,
/// The bytes which followed the serialized kind.
pub msg: Vec<u8>,
}
/// P2P interface built from three raw transport methods the implementor supplies, plus provided
/// methods which prefix outgoing messages with a serialized `P2pMessageKind` and parse that
/// prefix off of incoming messages.
#[async_trait]
pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p {
/// Identifier for a peer, as accepted by `send_raw` and returned by `receive_raw`.
type Id: Send + Sync + Clone + Debug;
/// Send raw bytes to the specified peer.
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>);
/// Broadcast raw bytes.
// NOTE(review): which peers receive a broadcast is defined by the implementor — confirm there
async fn broadcast_raw(&self, msg: Vec<u8>);
/// Receive the next raw message, returned alongside the ID of its sender.
async fn receive_raw(&self) -> (Self::Id, Vec<u8>);
/// Send `msg` to `to`, prefixed with the serialization of `kind`.
async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize();
actual_msg.extend(msg);
self.send_raw(to, actual_msg).await;
}
/// Broadcast `msg`, prefixed with the serialization of `kind`.
async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize();
actual_msg.extend(msg);
self.broadcast_raw(actual_msg).await;
}
/// Receive the next message whose kind successfully parses.
///
/// Raw messages which are empty, or whose kind cannot be read, are logged as errors and
/// skipped. This keeps calling `receive_raw` until a valid message is obtained.
async fn receive(&self) -> Message<Self> {
let (sender, kind, msg) = loop {
let (sender, msg) = self.receive_raw().await;
if msg.is_empty() {
log::error!("empty p2p message from {sender:?}");
continue;
}
// Reading from a `&[u8]` advances the slice, so after `read` consumes the kind, `msg_ref`
// only contains the bytes which followed it
let mut msg_ref = msg.as_ref();
let Some(kind) = P2pMessageKind::read::<&[u8]>(&mut msg_ref) else {
log::error!("invalid p2p message kind from {sender:?}");
continue;
};
break (sender, kind, msg_ref.to_vec());
};
Message { sender, kind, msg }
}
}
#[allow(clippy::type_complexity)]
#[derive(Clone, Debug)]
pub struct LocalP2p(usize, Arc<RwLock<Vec<VecDeque<Vec<u8>>>>>);
pub struct LocalP2p(usize, Arc<RwLock<Vec<VecDeque<(usize, Vec<u8>)>>>>);
impl LocalP2p {
pub fn new(validators: usize) -> Vec<LocalP2p> {
@ -51,29 +100,35 @@ impl LocalP2p {
#[async_trait]
impl P2p for LocalP2p {
async fn broadcast(&self, kind: P2pMessageKind, mut msg: Vec<u8>) {
msg.insert(0, kind.to_byte());
type Id = usize;
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
self.1.write().unwrap()[to].push_back((self.0, msg));
}
async fn broadcast_raw(&self, msg: Vec<u8>) {
for (i, msg_queue) in self.1.write().unwrap().iter_mut().enumerate() {
if i == self.0 {
continue;
}
msg_queue.push_back(msg.clone());
msg_queue.push_back((self.0, msg.clone()));
}
}
async fn receive(&self) -> Option<(P2pMessageKind, Vec<u8>)> {
let mut msg = self.1.write().unwrap()[self.0].pop_front()?;
if msg.is_empty() {
log::error!("empty p2p message");
return None;
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
// This is a cursed way to implement an async read from a Vec
loop {
if let Some(res) = self.1.write().unwrap()[self.0].pop_front() {
return res;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Some((P2pMessageKind::from_byte(msg.remove(0))?, msg))
}
}
#[async_trait]
impl TributaryP2p for LocalP2p {
async fn broadcast(&self, msg: Vec<u8>) {
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary, msg).await
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
}
}

View file

@ -21,7 +21,7 @@ use serai_db::DbTxn;
use processor_messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage};
use crate::{Db, P2p, processor::Processor, tributary::TributarySpec};
use crate::{Db, processor::Processor, tributary::TributarySpec};
mod db;
pub use db::*;
@ -63,7 +63,6 @@ async fn handle_new_set<
// We already have a unique event ID based on block, event index (where event index is
// the one generated in this handle_block function)
// We could use that on this end and the processor end?
// TODO: Should this be handled in the Tributary code?
processor
.send(CoordinatorMessage::KeyGen(
processor_messages::key_gen::CoordinatorMessage::GenerateKey {
@ -212,12 +211,10 @@ async fn handle_block<
Fut: Future<Output = ()>,
ANT: Clone + Fn(D, TributarySpec) -> Fut,
Pro: Processor,
P: P2p,
>(
db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
add_new_tributary: ANT,
p2p: &P,
processor: &mut Pro,
serai: &Serai,
block: Block,
@ -235,7 +232,6 @@ async fn handle_block<
// stable)
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if let ValidatorSetsEvent::NewSet { set } = new_set {
// TODO2: Use a DB on a dedicated volume
handle_new_set(&db.0, key, add_new_tributary.clone(), processor, serai, &block, set)
.await?;
} else {
@ -283,12 +279,10 @@ pub async fn handle_new_blocks<
Fut: Future<Output = ()>,
ANT: Clone + Fn(D, TributarySpec) -> Fut,
Pro: Processor,
P: P2p,
>(
db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
add_new_tributary: ANT,
p2p: &P,
processor: &mut Pro,
serai: &Serai,
last_block: &mut u64,
@ -306,7 +300,6 @@ pub async fn handle_new_blocks<
db,
key,
add_new_tributary.clone(),
p2p,
processor,
serai,
if b == latest_number {

View file

@ -1,8 +1,8 @@
use std::time::{Duration, SystemTime};
use zeroize::Zeroizing;
use rand_core::{RngCore, CryptoRng, OsRng};
use futures::{task::Poll, poll};
use ciphersuite::{
group::{ff::Field, GroupEncoding},
@ -95,11 +95,12 @@ pub async fn run_tributaries(
) {
loop {
for (p2p, tributary) in tributaries.iter_mut() {
while let Some(msg) = p2p.receive().await {
match msg.0 {
P2pMessageKind::Tributary => {
if tributary.handle_message(&msg.1).await {
p2p.broadcast(msg.0, msg.1).await;
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Tributary(genesis) => {
assert_eq!(genesis, tributary.genesis());
if tributary.handle_message(&msg.msg).await {
p2p.broadcast(msg.kind, msg.msg).await;
}
}
}
@ -163,10 +164,11 @@ async fn tributary_test() {
let timeout = SystemTime::now() + Duration::from_secs(65);
while (blocks < 10) && (SystemTime::now().duration_since(timeout).is_err()) {
for (p2p, tributary) in tributaries.iter_mut() {
while let Some(msg) = p2p.receive().await {
match msg.0 {
P2pMessageKind::Tributary => {
tributary.handle_message(&msg.1).await;
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Tributary(genesis) => {
assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await;
}
}
}
@ -187,10 +189,11 @@ async fn tributary_test() {
// Handle all existing messages
for (p2p, tributary) in tributaries.iter_mut() {
while let Some(msg) = p2p.receive().await {
match msg.0 {
P2pMessageKind::Tributary => {
tributary.handle_message(&msg.1).await;
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Tributary(genesis) => {
assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await;
}
}
}

View file

@ -30,7 +30,7 @@ async fn dkg_test() {
let keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
let mut tributaries = new_tributaries(&keys, &spec).await;
let tributaries = new_tributaries(&keys, &spec).await;
// Run the tributaries in the background
tokio::spawn(run_tributaries(tributaries.clone()));

View file

@ -19,7 +19,7 @@ async fn tx_test() {
let keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
let mut tributaries = new_tributaries(&keys, &spec).await;
let tributaries = new_tributaries(&keys, &spec).await;
// Run the tributaries in the background
tokio::spawn(run_tributaries(tributaries.clone()));

View file

@ -84,10 +84,6 @@ impl<D: Db, T: Transaction> Blockchain<D, T> {
res
}
// Returns a copy of this blockchain's `genesis` field.
pub(crate) fn genesis(&self) -> [u8; 32] {
self.genesis
}
// Returns a copy of this blockchain's `tip` field.
pub(crate) fn tip(&self) -> [u8; 32] {
self.tip
}

View file

@ -20,6 +20,8 @@ use ::tendermint::{
use serai_db::Db;
use tokio::sync::RwLock as AsyncRwLock;
mod merkle;
pub(crate) use merkle::*;
@ -72,22 +74,23 @@ pub trait ReadWrite: Sized {
#[async_trait]
pub trait P2p: 'static + Send + Sync + Clone + Debug {
async fn broadcast(&self, msg: Vec<u8>);
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>);
}
#[async_trait]
impl<P: P2p> P2p for Arc<P> {
async fn broadcast(&self, msg: Vec<u8>) {
(*self).broadcast(msg).await
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
(*self).broadcast(genesis, msg).await
}
}
#[derive(Clone)]
pub struct Tributary<D: Db, T: Transaction, P: P2p> {
genesis: [u8; 32],
network: TendermintNetwork<D, T, P>,
synced_block: SyncedBlockSender<TendermintNetwork<D, T, P>>,
messages: MessageSender<TendermintNetwork<D, T, P>>,
messages: Arc<AsyncRwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
}
impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
@ -121,7 +124,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
tokio::task::spawn(machine.run());
Some(Self { network, synced_block, messages })
Some(Self { genesis, network, synced_block, messages: Arc::new(AsyncRwLock::new(messages)) })
}
pub fn block_time() -> u32 {
@ -129,7 +132,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
}
pub fn genesis(&self) -> [u8; 32] {
self.network.blockchain.read().unwrap().genesis()
self.genesis
}
pub fn block_number(&self) -> u32 {
self.network.blockchain.read().unwrap().block_number()
@ -153,12 +156,14 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
}
// Returns if the transaction was valid.
pub async fn add_transaction(&mut self, tx: T) -> bool {
// Safe to be &self since the only meaningful usage of self is self.network.blockchain which
// successfully acquires its own write lock.
pub async fn add_transaction(&self, tx: T) -> bool {
let mut to_broadcast = vec![TRANSACTION_MESSAGE];
tx.write(&mut to_broadcast).unwrap();
let res = self.network.blockchain.write().unwrap().add_transaction(true, tx);
if res {
self.network.p2p.broadcast(to_broadcast).await;
self.network.p2p.broadcast(self.genesis, to_broadcast).await;
}
res
}
@ -189,7 +194,9 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
}
// Return true if the message should be rebroadcasted.
pub async fn handle_message(&mut self, msg: &[u8]) -> bool {
// Safe to be &self since the only usage of self is on self.network.blockchain and self.messages,
// both of which successfully acquire their own write locks and don't rely on each other
pub async fn handle_message(&self, msg: &[u8]) -> bool {
match msg.first() {
Some(&TRANSACTION_MESSAGE) => {
let Ok(tx) = T::read::<&[u8]>(&mut &msg[1 ..]) else {
@ -212,7 +219,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
return false;
};
self.messages.send(msg).await.unwrap();
self.messages.write().await.send(msg).await.unwrap();
false
}

View file

@ -256,7 +256,7 @@ impl<D: Db, T: Transaction, P: P2p> Network for TendermintNetwork<D, T, P> {
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
to_broadcast.extend(msg.encode());
self.p2p.broadcast(to_broadcast).await
self.p2p.broadcast(self.genesis, to_broadcast).await
}
async fn slash(&mut self, validator: Self::ValidatorId) {
// TODO: Handle this slash

View file

@ -263,6 +263,7 @@ pub trait Network: Send + Sync {
/// Trigger a slash for the validator in question who was definitively malicious.
///
/// The exact process of triggering a slash is undefined and left to the network as a whole.
// TODO: We need to provide some evidence for this.
async fn slash(&mut self, validator: Self::ValidatorId);
/// Validate a block.