Use a local DB channel for sending to the message-queue

The message-queue's provided queue function runs until it succeeds, so calling it directly could block for an arbitrary amount of time. Sending to the message-queue is now just a write to a local DB channel, which a dedicated task flushes to the message-queue, so sends no longer block.
Luke Parker 2024-09-11 19:29:56 -04:00
parent b6811f9015
commit 0d4c8cf032
4 changed files with 96 additions and 84 deletions
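
For context, the change adopts an outbox pattern: `send` becomes a local, non-blocking write into a DB channel, and a background task drains that channel into the message-queue, retrying as needed. Below is a minimal, self-contained sketch of that pattern. It uses an in-memory `VecDeque` in place of serai_db's `db_channel`, and the names (`Outbox`, `flush_task`) are illustrative only, not part of this codebase.

```rust
use std::{
  collections::VecDeque,
  sync::{Arc, Mutex},
  time::Duration,
};

use tokio::sync::mpsc;

// Stands in for the SentCoordinatorMessages db_channel; the real thing is a
// durable, transactional DB write.
type Queue = Arc<Mutex<VecDeque<Vec<u8>>>>;

#[derive(Clone)]
struct Outbox {
  queue: Queue,
  // Wakes the flush task immediately, rather than waiting out its timeout.
  notify: mpsc::UnboundedSender<()>,
}

impl Outbox {
  // Sending never touches the network: it persists the message locally and
  // fires a notification, so callers can't block for arbitrary lengths of time.
  fn send(&self, msg: Vec<u8>) {
    self.queue.lock().unwrap().push_back(msg);
    let _ = self.notify.send(());
  }
}

// The flush task is the only code which talks to the remote message-queue.
// Here it just prints; the actual task calls the message-queue's queue
// function (which runs until it succeeds) and only then commits its txn.
async fn flush_task(queue: Queue, mut notify: mpsc::UnboundedReceiver<()>) {
  loop {
    let next = queue.lock().unwrap().pop_front();
    match next {
      Some(msg) => println!("flushing {} bytes to the message-queue", msg.len()),
      // No pending messages: sleep until notified, with a 60-second fallback
      // mirroring the timeout used in this commit.
      None => {
        let _ = tokio::time::timeout(Duration::from_secs(60), notify.recv()).await;
      }
    }
  }
}

#[tokio::main]
async fn main() {
  let (notify_send, notify_recv) = mpsc::unbounded_channel();
  let queue = Queue::default();
  tokio::spawn(flush_task(queue.clone(), notify_recv));

  let outbox = Outbox { queue, notify: notify_send };
  outbox.send(b"example message".to_vec());
  // Give the flush task a moment to drain the outbox before exiting.
  tokio::time::sleep(Duration::from_millis(50)).await;
}
```

Because the commit only commits the flushing task's txn after the message-queue accepts the message, a crash between the local send and the flush shouldn't lose messages: the task re-reads the still-unconsumed channel entry on restart.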

View file

@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::sync::{LazyLock, Arc, Mutex};

 use tokio::sync::mpsc;
@@ -9,8 +9,8 @@ use serai_client::{
   in_instructions::primitives::{Batch, SignedBatch},
 };

-use serai_env as env;
 use serai_db::{Get, DbTxn, Db, create_db, db_channel};
+use serai_env as env;

 use message_queue::{Service, Metadata, client::MessageQueue};

 create_db! {
@@ -21,27 +21,47 @@ create_db! {
 db_channel! {
   ProcessorBinCoordinator {
-    CoordinatorMessages: () -> Vec<u8>
+    ReceivedCoordinatorMessages: () -> Vec<u8>,
   }
 }

-async fn send(service: Service, queue: &MessageQueue, msg: messages::ProcessorMessage) {
-  let metadata = Metadata { from: service, to: Service::Coordinator, intent: msg.intent() };
-  let msg = borsh::to_vec(&msg).unwrap();
-  queue.queue(metadata, msg).await;
-}
+// A lock to access SentCoordinatorMessages::send
+static SEND_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
+
+db_channel! {
+  ProcessorBinCoordinator {
+    SentCoordinatorMessages: () -> Vec<u8>,
+  }
+}
+
+#[derive(Clone)]
+pub(crate) struct CoordinatorSend {
+  db: crate::Db,
+  sent_message: mpsc::UnboundedSender<()>,
+}
+
+impl CoordinatorSend {
+  fn send(&mut self, msg: &messages::ProcessorMessage) {
+    let _lock = SEND_LOCK.lock().unwrap();
+    let mut txn = self.db.txn();
+    SentCoordinatorMessages::send(&mut txn, &borsh::to_vec(msg).unwrap());
+    txn.commit();
+    self
+      .sent_message
+      .send(())
+      .expect("failed to tell the Coordinator tasks there's a new message to send");
+  }
+}

 pub(crate) struct Coordinator {
-  new_message: mpsc::UnboundedReceiver<()>,
-  service: Service,
-  message_queue: Arc<MessageQueue>,
+  received_message: mpsc::UnboundedReceiver<()>,
+  send: CoordinatorSend,
 }

-pub(crate) struct CoordinatorSend(Service, Arc<MessageQueue>);
-
 impl Coordinator {
-  pub(crate) fn new(mut db: crate::Db) -> Self {
-    let (new_message_send, new_message_recv) = mpsc::unbounded_channel();
+  pub(crate) fn new(db: crate::Db) -> Self {
+    let (received_message_send, received_message_recv) = mpsc::unbounded_channel();
+    let (sent_message_send, mut sent_message_recv) = mpsc::unbounded_channel();

     let network_id = match env::var("NETWORK").expect("network wasn't specified").as_str() {
       "bitcoin" => NetworkId::Bitcoin,
@@ -55,6 +75,7 @@ impl Coordinator {
     // Spawn a task to move messages from the message-queue to our database so we can achieve
     // atomicity. This is the only place we read/ack messages from
     tokio::spawn({
+      let mut db = db.clone();
       let message_queue = message_queue.clone();
       async move {
         loop {
@@ -70,7 +91,7 @@ impl Coordinator {
           assert!((saved_messages == prior_msg) || (saved_messages == Some(msg.id)));
           if saved_messages < Some(msg.id) {
             let mut txn = db.txn();
-            CoordinatorMessages::send(&mut txn, &msg.msg);
+            ReceivedCoordinatorMessages::send(&mut txn, &msg.msg);
             SavedMessages::set(&mut txn, &msg.id);
             txn.commit();
           }
@@ -78,16 +99,45 @@ impl Coordinator {
           message_queue.ack(Service::Coordinator, msg.id).await;

           // Fire that there's a new message
-          new_message_send.send(()).expect("failed to tell the Coordinator there's a new message");
+          received_message_send
+            .send(())
+            .expect("failed to tell the Coordinator there's a new message");
         }
       }
     });

-    Coordinator { new_message: new_message_recv, service, message_queue }
+    // Spawn a task to send messages to the message-queue
+    tokio::spawn({
+      let mut db = db.clone();
+      async move {
+        loop {
+          let mut txn = db.txn();
+          match SentCoordinatorMessages::try_recv(&mut txn) {
+            Some(msg) => {
+              let metadata = Metadata {
+                from: service,
+                to: Service::Coordinator,
+                intent: borsh::from_slice::<messages::ProcessorMessage>(&msg).unwrap().intent(),
+              };
+              message_queue.queue(metadata, msg).await;
+              txn.commit();
+            }
+            None => {
+              let _ =
+                tokio::time::timeout(core::time::Duration::from_secs(60), sent_message_recv.recv())
+                  .await;
+            }
+          }
+        }
+      }
+    });
+
+    let send = CoordinatorSend { db, sent_message: sent_message_send };
+    Coordinator { received_message: received_message_recv, send }
   }

   pub(crate) fn coordinator_send(&self) -> CoordinatorSend {
-    CoordinatorSend(self.service, self.message_queue.clone())
+    self.send.clone()
   }

   /// Fetch the next message from the Coordinator.
@@ -99,23 +149,22 @@ impl Coordinator {
     txn: &mut impl DbTxn,
   ) -> messages::CoordinatorMessage {
     loop {
-      match CoordinatorMessages::try_recv(txn) {
+      match ReceivedCoordinatorMessages::try_recv(txn) {
         Some(msg) => {
           return borsh::from_slice(&msg)
             .expect("message wasn't a borsh-encoded CoordinatorMessage")
         }
         None => {
           let _ =
-            tokio::time::timeout(core::time::Duration::from_secs(60), self.new_message.recv())
+            tokio::time::timeout(core::time::Duration::from_secs(60), self.received_message.recv())
               .await;
         }
       }
     }
   }

-  #[allow(clippy::unused_async)]
-  pub(crate) async fn send_message(&mut self, msg: messages::ProcessorMessage) {
-    send(self.service, &self.message_queue, msg).await
+  pub(crate) fn send_message(&mut self, msg: &messages::ProcessorMessage) {
+    self.send.send(msg);
   }
 }
@@ -127,8 +176,7 @@ impl signers::Coordinator for CoordinatorSend {
     &mut self,
     msg: messages::sign::ProcessorMessage,
   ) -> Result<(), Self::EphemeralError> {
-    // TODO: Use a fallible send for these methods
-    send(self.0, &self.1, messages::ProcessorMessage::Sign(msg)).await;
+    self.send(&messages::ProcessorMessage::Sign(msg));
     Ok(())
   }
@@ -138,40 +186,27 @@ impl signers::Coordinator for CoordinatorSend {
     block: [u8; 32],
     signature: Signature,
   ) -> Result<(), Self::EphemeralError> {
-    send(
-      self.0,
-      &self.1,
-      messages::ProcessorMessage::Coordinator(
-        messages::coordinator::ProcessorMessage::CosignedBlock {
-          block_number,
-          block,
-          signature: signature.encode(),
-        },
-      ),
-    )
-    .await;
+    self.send(&messages::ProcessorMessage::Coordinator(
+      messages::coordinator::ProcessorMessage::CosignedBlock {
+        block_number,
+        block,
+        signature: signature.encode(),
+      },
+    ));
     Ok(())
   }

   async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError> {
-    send(
-      self.0,
-      &self.1,
-      messages::ProcessorMessage::Substrate(messages::substrate::ProcessorMessage::Batch { batch }),
-    )
-    .await;
+    self.send(&messages::ProcessorMessage::Substrate(
+      messages::substrate::ProcessorMessage::Batch { batch },
+    ));
     Ok(())
   }

   async fn publish_signed_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError> {
-    send(
-      self.0,
-      &self.1,
-      messages::ProcessorMessage::Coordinator(
-        messages::coordinator::ProcessorMessage::SignedBatch { batch },
-      ),
-    )
-    .await;
+    self.send(&messages::ProcessorMessage::Coordinator(
+      messages::coordinator::ProcessorMessage::SignedBatch { batch },
+    ));
     Ok(())
   }
@@ -180,17 +215,12 @@ impl signers::Coordinator for CoordinatorSend {
     session: Session,
     signature: Signature,
   ) -> Result<(), Self::EphemeralError> {
-    send(
-      self.0,
-      &self.1,
-      messages::ProcessorMessage::Coordinator(
-        messages::coordinator::ProcessorMessage::SignedSlashReport {
-          session,
-          signature: signature.encode(),
-        },
-      ),
-    )
-    .await;
+    self.send(&messages::ProcessorMessage::Coordinator(
+      messages::coordinator::ProcessorMessage::SignedSlashReport {
+        session,
+        signature: signature.encode(),
+      },
+    ));
     Ok(())
   }
 }

View file

@@ -158,7 +158,7 @@ async fn first_block_after_time<S: ScannerFeed>(feed: &S, serai_time: u64) -> u64
 }

 /// The main loop of a Processor, interacting with the Coordinator.
-pub async fn coordinator_loop<
+pub async fn main_loop<
   S: ScannerFeed,
   K: KeyGenParams<ExternalNetworkCiphersuite: Ciphersuite<G = KeyFor<S>>>,
   Sch: Scheduler<
@@ -192,7 +192,7 @@ pub async fn coordinator_loop<
     if let messages::key_gen::ProcessorMessage::GeneratedKeyPair { session, .. } = &msg {
       new_key = Some(*session)
     }
-    coordinator.send_message(messages::ProcessorMessage::KeyGen(msg)).await;
+    coordinator.send_message(&messages::ProcessorMessage::KeyGen(msg));
   }

   // If we were yielded a key, register it in the signers

View file

@@ -1,21 +1,4 @@
-use ciphersuite::group::GroupEncoding;
-
-use serai_client::validator_sets::primitives::Session;
-
-use serai_db::{Get, DbTxn, create_db, db_channel};
-
-use primitives::EncodableG;
-
-create_db! {
-  Processor {
-    ExternalKeyForSessionForSigners: <K: GroupEncoding>(session: Session) -> EncodableG<K>,
-  }
-}
-
-db_channel! {
-  Processor {
-    KeyToActivate: <K: GroupEncoding>() -> EncodableG<K>
-  }
-}
+use serai_db::{Get, DbTxn, create_db};

 create_db! {
   BitcoinProcessor {

View file

@@ -57,8 +57,7 @@ async fn main() {
   tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![]));
   core::mem::forget(index_handle);

-  bin::coordinator_loop::<_, KeyGenParams, Scheduler<_>, Rpc<bin::Db>>(db, feed.clone(), feed)
-    .await;
+  bin::main_loop::<_, KeyGenParams, Scheduler<_>, Rpc<bin::Db>>(db, feed.clone(), feed).await;
 }

 /*