diff --git a/processor/bin/src/coordinator.rs b/processor/bin/src/coordinator.rs index d9d8d112..12442c3d 100644 --- a/processor/bin/src/coordinator.rs +++ b/processor/bin/src/coordinator.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{LazyLock, Arc, Mutex}; use tokio::sync::mpsc; @@ -9,8 +9,8 @@ use serai_client::{ in_instructions::primitives::{Batch, SignedBatch}, }; -use serai_env as env; use serai_db::{Get, DbTxn, Db, create_db, db_channel}; +use serai_env as env; use message_queue::{Service, Metadata, client::MessageQueue}; create_db! { @@ -21,27 +21,47 @@ create_db! { db_channel! { ProcessorBinCoordinator { - CoordinatorMessages: () -> Vec + ReceivedCoordinatorMessages: () -> Vec, } } -async fn send(service: Service, queue: &MessageQueue, msg: messages::ProcessorMessage) { - let metadata = Metadata { from: service, to: Service::Coordinator, intent: msg.intent() }; - let msg = borsh::to_vec(&msg).unwrap(); - queue.queue(metadata, msg).await; +// A lock to access SentCoordinatorMessages::send +static SEND_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + +db_channel! { + ProcessorBinCoordinator { + SentCoordinatorMessages: () -> Vec, + } +} + +#[derive(Clone)] +pub(crate) struct CoordinatorSend { + db: crate::Db, + sent_message: mpsc::UnboundedSender<()>, +} + +impl CoordinatorSend { + fn send(&mut self, msg: &messages::ProcessorMessage) { + let _lock = SEND_LOCK.lock().unwrap(); + let mut txn = self.db.txn(); + SentCoordinatorMessages::send(&mut txn, &borsh::to_vec(msg).unwrap()); + txn.commit(); + self + .sent_message + .send(()) + .expect("failed to tell the Coordinator tasks there's a new message to send"); + } } pub(crate) struct Coordinator { - new_message: mpsc::UnboundedReceiver<()>, - service: Service, - message_queue: Arc, + received_message: mpsc::UnboundedReceiver<()>, + send: CoordinatorSend, } -pub(crate) struct CoordinatorSend(Service, Arc); - impl Coordinator { - pub(crate) fn new(mut db: crate::Db) -> Self { - let (new_message_send, new_message_recv) = mpsc::unbounded_channel(); + pub(crate) fn new(db: crate::Db) -> Self { + let (received_message_send, received_message_recv) = mpsc::unbounded_channel(); + let (sent_message_send, mut sent_message_recv) = mpsc::unbounded_channel(); let network_id = match env::var("NETWORK").expect("network wasn't specified").as_str() { "bitcoin" => NetworkId::Bitcoin, @@ -55,6 +75,7 @@ impl Coordinator { // Spawn a task to move messages from the message-queue to our database so we can achieve // atomicity. This is the only place we read/ack messages from tokio::spawn({ + let mut db = db.clone(); let message_queue = message_queue.clone(); async move { loop { @@ -70,7 +91,7 @@ impl Coordinator { assert!((saved_messages == prior_msg) || (saved_messages == Some(msg.id))); if saved_messages < Some(msg.id) { let mut txn = db.txn(); - CoordinatorMessages::send(&mut txn, &msg.msg); + ReceivedCoordinatorMessages::send(&mut txn, &msg.msg); SavedMessages::set(&mut txn, &msg.id); txn.commit(); } @@ -78,16 +99,45 @@ impl Coordinator { message_queue.ack(Service::Coordinator, msg.id).await; // Fire that there's a new message - new_message_send.send(()).expect("failed to tell the Coordinator there's a new message"); + received_message_send + .send(()) + .expect("failed to tell the Coordinator there's a new message"); } } }); - Coordinator { new_message: new_message_recv, service, message_queue } + // Spawn a task to send messages to the message-queue + tokio::spawn({ + let mut db = db.clone(); + async move { + loop { + let mut txn = db.txn(); + match SentCoordinatorMessages::try_recv(&mut txn) { + Some(msg) => { + let metadata = Metadata { + from: service, + to: Service::Coordinator, + intent: borsh::from_slice::(&msg).unwrap().intent(), + }; + message_queue.queue(metadata, msg).await; + txn.commit(); + } + None => { + let _ = + tokio::time::timeout(core::time::Duration::from_secs(60), sent_message_recv.recv()) + .await; + } + } + } + } + }); + + let send = CoordinatorSend { db, sent_message: sent_message_send }; + Coordinator { received_message: received_message_recv, send } } pub(crate) fn coordinator_send(&self) -> CoordinatorSend { - CoordinatorSend(self.service, self.message_queue.clone()) + self.send.clone() } /// Fetch the next message from the Coordinator. @@ -99,23 +149,22 @@ impl Coordinator { txn: &mut impl DbTxn, ) -> messages::CoordinatorMessage { loop { - match CoordinatorMessages::try_recv(txn) { + match ReceivedCoordinatorMessages::try_recv(txn) { Some(msg) => { return borsh::from_slice(&msg) .expect("message wasn't a borsh-encoded CoordinatorMessage") } None => { let _ = - tokio::time::timeout(core::time::Duration::from_secs(60), self.new_message.recv()) + tokio::time::timeout(core::time::Duration::from_secs(60), self.received_message.recv()) .await; } } } } - #[allow(clippy::unused_async)] - pub(crate) async fn send_message(&mut self, msg: messages::ProcessorMessage) { - send(self.service, &self.message_queue, msg).await + pub(crate) fn send_message(&mut self, msg: &messages::ProcessorMessage) { + self.send.send(msg); } } @@ -127,8 +176,7 @@ impl signers::Coordinator for CoordinatorSend { &mut self, msg: messages::sign::ProcessorMessage, ) -> Result<(), Self::EphemeralError> { - // TODO: Use a fallible send for these methods - send(self.0, &self.1, messages::ProcessorMessage::Sign(msg)).await; + self.send(&messages::ProcessorMessage::Sign(msg)); Ok(()) } @@ -138,40 +186,27 @@ impl signers::Coordinator for CoordinatorSend { block: [u8; 32], signature: Signature, ) -> Result<(), Self::EphemeralError> { - send( - self.0, - &self.1, - messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::CosignedBlock { - block_number, - block, - signature: signature.encode(), - }, - ), - ) - .await; + self.send(&messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::CosignedBlock { + block_number, + block, + signature: signature.encode(), + }, + )); Ok(()) } async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError> { - send( - self.0, - &self.1, - messages::ProcessorMessage::Substrate(messages::substrate::ProcessorMessage::Batch { batch }), - ) - .await; + self.send(&messages::ProcessorMessage::Substrate( + messages::substrate::ProcessorMessage::Batch { batch }, + )); Ok(()) } async fn publish_signed_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError> { - send( - self.0, - &self.1, - messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::SignedBatch { batch }, - ), - ) - .await; + self.send(&messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::SignedBatch { batch }, + )); Ok(()) } @@ -180,17 +215,12 @@ impl signers::Coordinator for CoordinatorSend { session: Session, signature: Signature, ) -> Result<(), Self::EphemeralError> { - send( - self.0, - &self.1, - messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::SignedSlashReport { - session, - signature: signature.encode(), - }, - ), - ) - .await; + self.send(&messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::SignedSlashReport { + session, + signature: signature.encode(), + }, + )); Ok(()) } } diff --git a/processor/bin/src/lib.rs b/processor/bin/src/lib.rs index 15873873..67ea6150 100644 --- a/processor/bin/src/lib.rs +++ b/processor/bin/src/lib.rs @@ -158,7 +158,7 @@ async fn first_block_after_time(feed: &S, serai_time: u64) -> u6 } /// The main loop of a Processor, interacting with the Coordinator. -pub async fn coordinator_loop< +pub async fn main_loop< S: ScannerFeed, K: KeyGenParams>>, Sch: Scheduler< @@ -192,7 +192,7 @@ pub async fn coordinator_loop< if let messages::key_gen::ProcessorMessage::GeneratedKeyPair { session, .. } = &msg { new_key = Some(*session) } - coordinator.send_message(messages::ProcessorMessage::KeyGen(msg)).await; + coordinator.send_message(&messages::ProcessorMessage::KeyGen(msg)); } // If we were yielded a key, register it in the signers diff --git a/processor/bitcoin/src/db.rs b/processor/bitcoin/src/db.rs index b0acc427..1d73ebfe 100644 --- a/processor/bitcoin/src/db.rs +++ b/processor/bitcoin/src/db.rs @@ -1,21 +1,4 @@ -use ciphersuite::group::GroupEncoding; - -use serai_client::validator_sets::primitives::Session; - -use serai_db::{Get, DbTxn, create_db, db_channel}; -use primitives::EncodableG; - -create_db! { - Processor { - ExternalKeyForSessionForSigners: (session: Session) -> EncodableG, - } -} - -db_channel! { - Processor { - KeyToActivate: () -> EncodableG - } -} +use serai_db::{Get, DbTxn, create_db}; create_db! { BitcoinProcessor { diff --git a/processor/bitcoin/src/main.rs b/processor/bitcoin/src/main.rs index 09228d44..74e174ee 100644 --- a/processor/bitcoin/src/main.rs +++ b/processor/bitcoin/src/main.rs @@ -57,8 +57,7 @@ async fn main() { tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![])); core::mem::forget(index_handle); - bin::coordinator_loop::<_, KeyGenParams, Scheduler<_>, Rpc>(db, feed.clone(), feed) - .await; + bin::main_loop::<_, KeyGenParams, Scheduler<_>, Rpc>(db, feed.clone(), feed).await; } /*