Reduce coordinator tests contention re: cosign messages

This commit is contained in:
Luke Parker 2024-03-20 08:23:23 -04:00
parent 4a6496a90b
commit 13b147cbf6
No known key found for this signature in database
2 changed files with 158 additions and 162 deletions

View file

@ -5,7 +5,10 @@ use std::{
time::Duration, time::Duration,
}; };
use tokio::{task::AbortHandle, sync::Mutex as AsyncMutex}; use tokio::{
task::AbortHandle,
sync::{Mutex as AsyncMutex, mpsc},
};
use rand_core::{RngCore, OsRng}; use rand_core::{RngCore, OsRng};
@ -96,7 +99,6 @@ pub struct Handles {
pub(crate) message_queue: String, pub(crate) message_queue: String,
} }
#[derive(Clone)]
pub struct Processor { pub struct Processor {
network: NetworkId, network: NetworkId,
@ -104,7 +106,8 @@ pub struct Processor {
#[allow(unused)] #[allow(unused)]
handles: Handles, handles: Handles,
queue: Arc<AsyncMutex<(u64, u64, MessageQueue)>>, msgs: mpsc::UnboundedReceiver<messages::CoordinatorMessage>,
queue_for_sending: MessageQueue,
abort_handle: Option<Arc<AbortHandle>>, abort_handle: Option<Arc<AbortHandle>>,
substrate_key: Arc<AsyncMutex<Option<Zeroizing<<Ristretto as Ciphersuite>::F>>>>, substrate_key: Arc<AsyncMutex<Option<Zeroizing<<Ristretto as Ciphersuite>::F>>>>,
@ -145,47 +148,50 @@ impl Processor {
// The Serai RPC may or may not be started // The Serai RPC may or may not be started
// Assume it is and continue, so if it's a few seconds late, it's still within tolerance // Assume it is and continue, so if it's a few seconds late, it's still within tolerance
// Create the queue
let mut queue = (
0,
Arc::new(MessageQueue::new(
Service::Processor(network),
message_queue_rpc.clone(),
Zeroizing::new(processor_key),
)),
);
let (msg_send, msg_recv) = mpsc::unbounded_channel();
let substrate_key = Arc::new(AsyncMutex::new(None));
let mut res = Processor { let mut res = Processor {
network, network,
serai_rpc, serai_rpc,
handles, handles,
queue: Arc::new(AsyncMutex::new(( queue_for_sending: MessageQueue::new(
0,
0,
MessageQueue::new(
Service::Processor(network), Service::Processor(network),
message_queue_rpc, message_queue_rpc,
Zeroizing::new(processor_key), Zeroizing::new(processor_key),
), ),
))), msgs: msg_recv,
abort_handle: None, abort_handle: None,
substrate_key: Arc::new(AsyncMutex::new(None)), substrate_key: substrate_key.clone(),
}; };
// Handle any cosigns which come up // Spawn a task to handle cosigns and forward messages as appropriate
res.abort_handle = Some(Arc::new( let abort_handle = tokio::spawn({
tokio::spawn({
let mut res = res.clone();
async move { async move {
loop { loop {
tokio::task::yield_now().await; // Get new messages
let (next_recv_id, queue) = &mut queue;
let msg = { let msg = queue.next(Service::Coordinator).await;
let mut queue_lock = res.queue.lock().await;
let (_, next_recv_id, queue) = &mut *queue_lock;
let Ok(msg) =
tokio::time::timeout(Duration::from_secs(1), queue.next(Service::Coordinator))
.await
else {
continue;
};
assert_eq!(msg.from, Service::Coordinator); assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, *next_recv_id); assert_eq!(msg.id, *next_recv_id);
queue.ack(Service::Coordinator, msg.id).await;
*next_recv_id += 1;
let msg_msg = borsh::from_slice(&msg.msg).unwrap(); let msg_msg = borsh::from_slice(&msg.msg).unwrap();
// Remove any BatchReattempts clogging the pipe // Remove any BatchReattempts clogging the pipe
// TODO: Set up a wrapper around serai-client so we aren't throwing this away yet // TODO: Set up a wrapper around serai-client so we aren't throwing this away yet
// leave it for the tests // leave it for the tests
@ -195,16 +201,26 @@ impl Processor {
messages::coordinator::CoordinatorMessage::BatchReattempt { .. } messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
) )
) { ) {
queue.ack(Service::Coordinator, msg.id).await;
*next_recv_id += 1;
continue; continue;
} }
if !is_cosign_message(&msg_msg) { if !is_cosign_message(&msg_msg) {
msg_send.send(msg_msg).unwrap();
continue; continue;
}; }
queue.ack(Service::Coordinator, msg.id).await; let msg = msg_msg;
*next_recv_id += 1;
msg_msg let send_message = |msg: ProcessorMessage| async move {
queue
.queue(
Metadata {
from: Service::Processor(network),
to: Service::Coordinator,
intent: msg.intent(),
},
borsh::to_vec(&msg).unwrap(),
)
.await;
}; };
struct CurrentCosign { struct CurrentCosign {
@ -219,10 +235,7 @@ impl Processor {
// While technically, each processor should individually track the current cosign, // While technically, each processor should individually track the current cosign,
// this is fine for current testing purposes // this is fine for current testing purposes
CoordinatorMessage::Coordinator( CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { id, block_number },
id,
block_number,
},
) => { ) => {
let SubstrateSignId { let SubstrateSignId {
id: SubstrateSignableId::CosigningSubstrateBlock(block), .. id: SubstrateSignableId::CosigningSubstrateBlock(block), ..
@ -235,11 +248,13 @@ impl Processor {
if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) { if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) {
*current_cosign = Some(new_cosign); *current_cosign = Some(new_cosign);
} }
res send_message(
.send_message(messages::coordinator::ProcessorMessage::CosignPreprocess { messages::coordinator::ProcessorMessage::CosignPreprocess {
id: id.clone(), id: id.clone(),
preprocesses: vec![[raw_i; 64]], preprocesses: vec![[raw_i; 64]],
}) }
.into(),
)
.await; .await;
} }
CoordinatorMessage::Coordinator( CoordinatorMessage::Coordinator(
@ -247,11 +262,13 @@ impl Processor {
) => { ) => {
// TODO: Assert the ID matches CURRENT_COSIGN // TODO: Assert the ID matches CURRENT_COSIGN
// TODO: Verify the received preprocesses // TODO: Verify the received preprocesses
res send_message(
.send_message(messages::coordinator::ProcessorMessage::SubstrateShare { messages::coordinator::ProcessorMessage::SubstrateShare {
id, id,
shares: vec![[raw_i; 32]], shares: vec![[raw_i; 32]],
}) }
.into(),
)
.await; .await;
} }
CoordinatorMessage::Coordinator( CoordinatorMessage::Coordinator(
@ -263,7 +280,7 @@ impl Processor {
let block_number = current_cosign.as_ref().unwrap().block_number; let block_number = current_cosign.as_ref().unwrap().block_number;
let block = current_cosign.as_ref().unwrap().block; let block = current_cosign.as_ref().unwrap().block;
let substrate_key = res.substrate_key.lock().await.clone().unwrap(); let substrate_key = substrate_key.lock().await.clone().unwrap();
// Expand to a key pair as Schnorrkel expects // Expand to a key pair as Schnorrkel expects
// It's the private key + 32-bytes of entropy for nonces + the public key // It's the private key + 32-bytes of entropy for nonces + the public key
@ -280,12 +297,14 @@ impl Processor {
.to_bytes(), .to_bytes(),
); );
res send_message(
.send_message(messages::coordinator::ProcessorMessage::CosignedBlock { messages::coordinator::ProcessorMessage::CosignedBlock {
block_number, block_number,
block, block,
signature: signature.0.to_vec(), signature: signature.0.to_vec(),
}) }
.into(),
)
.await; .await;
} }
_ => panic!("unexpected message passed is_cosign_message"), _ => panic!("unexpected message passed is_cosign_message"),
@ -293,8 +312,9 @@ impl Processor {
} }
} }
}) })
.abort_handle(), .abort_handle();
));
res.abort_handle = Some(Arc::new(abort_handle));
res res
} }
@ -307,9 +327,8 @@ impl Processor {
pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) { pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) {
let msg: ProcessorMessage = msg.into(); let msg: ProcessorMessage = msg.into();
let mut queue_lock = self.queue.lock().await; self
let (next_send_id, _, queue) = &mut *queue_lock; .queue_for_sending
queue
.queue( .queue(
Metadata { Metadata {
from: Service::Processor(self.network), from: Service::Processor(self.network),
@ -319,36 +338,13 @@ impl Processor {
borsh::to_vec(&msg).unwrap(), borsh::to_vec(&msg).unwrap(),
) )
.await; .await;
*next_send_id += 1;
}
async fn recv_message_inner(&mut self) -> CoordinatorMessage {
loop {
tokio::task::yield_now().await;
let mut queue_lock = self.queue.lock().await;
let (_, next_recv_id, queue) = &mut *queue_lock;
let msg = queue.next(Service::Coordinator).await;
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, *next_recv_id);
// If this is a cosign message, let the cosign task handle it
let msg_msg = borsh::from_slice(&msg.msg).unwrap();
if is_cosign_message(&msg_msg) {
continue;
}
queue.ack(Service::Coordinator, msg.id).await;
*next_recv_id += 1;
return msg_msg;
}
} }
/// Receive a message from the coordinator as a processor. /// Receive a message from the coordinator as a processor.
pub async fn recv_message(&mut self) -> CoordinatorMessage { pub async fn recv_message(&mut self) -> CoordinatorMessage {
// Set a timeout of 20 minutes to allow effectively any protocol to occur without a fear of // Set a timeout of 20 minutes to allow effectively any protocol to occur without a fear of
// an arbitrary timeout cutting it short // an arbitrary timeout cutting it short
tokio::time::timeout(Duration::from_secs(20 * 60), self.recv_message_inner()).await.unwrap() tokio::time::timeout(Duration::from_secs(20 * 60), self.msgs.recv()).await.unwrap().unwrap()
} }
pub async fn set_substrate_key( pub async fn set_substrate_key(

View file

@ -245,7 +245,7 @@ pub async fn batch(
) )
); );
// Send the ack as expected, though it shouldn't trigger any observable behavior // Send the ack as expected
processor processor
.send_message(messages::ProcessorMessage::Coordinator( .send_message(messages::ProcessorMessage::Coordinator(
messages::coordinator::ProcessorMessage::SubstrateBlockAck { messages::coordinator::ProcessorMessage::SubstrateBlockAck {