Use dedicated Queues for each from-to pair

Prevents one Processor's message from halting the entire pipeline.
This commit is contained in:
Luke Parker 2023-09-27 12:20:57 -04:00
parent 269db1c4be
commit 40b7bc59d0
No known key found for this signature in database
12 changed files with 142 additions and 125 deletions

View file

@ -15,14 +15,14 @@ impl<D: Db> MainDb<D> {
D::key(b"coordinator_main", dst, key)
}
fn handled_message_key(id: u64) -> Vec<u8> {
Self::main_key(b"handled_message", id.to_le_bytes())
fn handled_message_key(network: NetworkId, id: u64) -> Vec<u8> {
Self::main_key(b"handled_message", (network, id).encode())
}
pub fn save_handled_message(txn: &mut D::Transaction<'_>, id: u64) {
txn.put(Self::handled_message_key(id), []);
pub fn save_handled_message(txn: &mut D::Transaction<'_>, network: NetworkId, id: u64) {
txn.put(Self::handled_message_key(network, id), []);
}
pub fn handled_message<G: Get>(getter: &G, id: u64) -> bool {
getter.get(Self::handled_message_key(id)).is_some()
pub fn handled_message<G: Get>(getter: &G, network: NetworkId, id: u64) -> bool {
getter.get(Self::handled_message_key(network, id)).is_some()
}
fn acive_tributaries_key() -> Vec<u8> {

View file

@ -533,19 +533,20 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>,
mut processors: Pro,
tributary: ActiveTributary<D, P>,
mut recv: mpsc::UnboundedReceiver<processors::Message>,
network: NetworkId,
mut recv: mpsc::UnboundedReceiver<ActiveTributary<D, P>>,
) {
let mut db_clone = db.clone(); // Enables cloning the DB while we have a txn
let pub_key = Ristretto::generator() * key.deref();
let ActiveTributary { spec, tributary } = tributary;
let ActiveTributary { spec, tributary } = recv.recv().await.unwrap();
let genesis = spec.genesis();
loop {
let msg: processors::Message = recv.recv().await.unwrap();
// TODO: Check this ID is sane (last handled ID or expected next ID)
let msg = processors.recv(network).await;
if !MainDb::<D>::handled_message(&db, msg.id) {
if !MainDb::<D>::handled_message(&db, msg.network, msg.id) {
let mut txn = db.txn();
// TODO: We probably want to NOP here, not panic?
@ -817,7 +818,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
}
}
MainDb::<D>::save_handled_message(&mut txn, msg.id);
MainDb::<D>::save_handled_message(&mut txn, msg.network, msg.id);
txn.commit();
}
@ -829,61 +830,27 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>,
mut processors: Pro,
processors: Pro,
mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
) {
let channels = Arc::new(RwLock::new(HashMap::new()));
// Listen to new tributary events
tokio::spawn({
let db = db.clone();
let processors = processors.clone();
let channels = channels.clone();
async move {
loop {
let channels = channels.clone();
let tributary = new_tributary.recv().await.unwrap();
let mut channels = HashMap::new();
for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] {
let (send, recv) = mpsc::unbounded_channel();
// TODO: Support multisig rotation (not per-Tributary yet per-network?)
channels.write().await.insert(tributary.spec.set().network, send);
// For each new tributary, spawn a dedicated task to handle its messages from the processor
// TODO: Redo per network, not per tributary
tokio::spawn(handle_processor_messages(
db.clone(),
key.clone(),
serai.clone(),
processors.clone(),
tributary,
network,
recv,
));
channels.insert(network, send);
}
}
});
// Dispatch task
let mut last_msg = None;
// Listen to new tributary events
loop {
// TODO: We dispatch this to an async task per-processor, yet we don't move to the next message
// yet as all processor messages are shoved into a global queue.
// Modify message-queue to offer per-sender queues, not per-receiver.
// Alternatively, a peek method with local delineation of handled messages would work.
let msg = processors.recv().await;
// TODO: Check this ID is sane (last handled ID or expected next ID)
if last_msg == Some(msg.id) {
sleep(Duration::from_secs(1)).await;
continue;
}
last_msg = Some(msg.id);
// TODO: Race conditions with above tributary availability?
// TODO: How does this hold up to multisig rotation?
if let Some(channel) = channels.read().await.get(&msg.network) {
channel.send(msg).unwrap();
} else {
log::warn!("received processor message for network we don't have a channel for");
}
let tributary = new_tributary.recv().await.unwrap();
channels[&tributary.spec.set().network].send(tributary).unwrap();
}
}

View file

@ -15,7 +15,7 @@ pub struct Message {
#[async_trait::async_trait]
pub trait Processors: 'static + Send + Sync + Clone {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage);
async fn recv(&mut self) -> Message;
async fn recv(&mut self, network: NetworkId) -> Message;
async fn ack(&mut self, msg: Message);
}
@ -27,13 +27,10 @@ impl Processors for Arc<MessageQueue> {
let msg = serde_json::to_string(&msg).unwrap();
self.queue(metadata, msg.into_bytes()).await;
}
async fn recv(&mut self) -> Message {
let msg = self.next().await;
async fn recv(&mut self, network: NetworkId) -> Message {
let msg = self.next(Service::Processor(network)).await;
assert_eq!(msg.from, Service::Processor(network));
let network = match msg.from {
Service::Processor(network) => network,
Service::Coordinator => panic!("coordinator received coordinator message"),
};
let id = msg.id;
// Deserialize it into a ProcessorMessage
@ -43,6 +40,6 @@ impl Processors for Arc<MessageQueue> {
return Message { id, network, msg };
}
async fn ack(&mut self, msg: Message) {
MessageQueue::ack(self, msg.id).await
MessageQueue::ack(self, Service::Processor(msg.network), msg.id).await
}
}

View file

@ -35,7 +35,7 @@ impl Processors for MemProcessors {
let processor = processors.entry(network).or_insert_with(VecDeque::new);
processor.push_back(msg);
}
async fn recv(&mut self) -> Message {
async fn recv(&mut self, _: NetworkId) -> Message {
todo!()
}
async fn ack(&mut self, _: Message) {

View file

@ -140,9 +140,9 @@ impl MessageQueue {
}
}
pub async fn next(&self) -> QueuedMessage {
pub async fn next(&self, from: Service) -> QueuedMessage {
loop {
let json = self.json_call("next", serde_json::json!([self.service])).await;
let json = self.json_call("next", serde_json::json!([from, self.service])).await;
// Convert from a Value to a type via reserialization
let msg: Option<QueuedMessage> = serde_json::from_str(
@ -179,18 +179,18 @@ impl MessageQueue {
}
}
pub async fn ack(&self, id: u64) {
pub async fn ack(&self, from: Service, id: u64) {
// TODO: Should this use OsRng? Deterministic or deterministic + random may be better.
let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let nonce_pub = Ristretto::generator() * nonce.deref();
let sig = SchnorrSignature::<Ristretto>::sign(
&self.priv_key,
nonce,
ack_challenge(self.service, self.pub_key, id, nonce_pub),
ack_challenge(self.service, self.pub_key, from, id, nonce_pub),
)
.serialize();
let json = self.json_call("ack", serde_json::json!([self.service, id, sig])).await;
let json = self.json_call("ack", serde_json::json!([from, self.service, id, sig])).await;
if json.get("result") != Some(&serde_json::Value::Bool(true)) {
panic!("failed to ack message {id}: {json}");
}

View file

@ -25,12 +25,17 @@ mod binaries {
pub(crate) type Db = serai_db::RocksDB;
#[allow(clippy::type_complexity)]
mod clippy {
use super::*;
lazy_static::lazy_static! {
pub(crate) static ref KEYS: Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>> =
Arc::new(RwLock::new(HashMap::new()));
pub(crate) static ref QUEUES: Arc<RwLock<HashMap<Service, RwLock<Queue<Db>>>>> =
pub(crate) static ref QUEUES: Arc<RwLock<HashMap<(Service, Service), RwLock<Queue<Db>>>>> =
Arc::new(RwLock::new(HashMap::new()));
}
}
pub(crate) use self::clippy::*;
// queue RPC method
/*
@ -71,16 +76,17 @@ mod binaries {
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
}
fn intent_key(from: Service, intent: &[u8]) -> Vec<u8> {
key(b"intent_seen", bincode::serialize(&(from, intent)).unwrap())
fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec<u8> {
key(b"intent_seen", bincode::serialize(&(from, to, intent)).unwrap())
}
let mut db = db.write().unwrap();
let mut txn = db.txn();
let intent_key = intent_key(meta.from, &meta.intent);
let intent_key = intent_key(meta.from, meta.to, &meta.intent);
if Get::get(&txn, &intent_key).is_some() {
log::warn!(
"Prior queued message attempted to be queued again. From: {:?} Intent: {}",
"Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}",
meta.from,
meta.to,
hex::encode(&meta.intent)
);
return;
@ -88,7 +94,7 @@ mod binaries {
DbTxn::put(&mut txn, intent_key, []);
// Queue it
let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(
let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message(
&mut txn,
QueuedMessage {
from: meta.from,
@ -105,15 +111,15 @@ mod binaries {
// next RPC method
/*
Gets the next message in queue for this service.
Gets the next message in queue for the named services.
This is not authenticated due to the fact every nonce would have to be saved to prevent
replays, or a challenge-response protocol implemented. Neither are worth doing when there
should be no sensitive data on this server.
*/
pub(crate) fn get_next_message(service: Service) -> Option<QueuedMessage> {
pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> {
let queue_outer = (*QUEUES).read().unwrap();
let queue = queue_outer[&service].read().unwrap();
let queue = queue_outer[&(from, to)].read().unwrap();
let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0);
queue.get_message(next)
}
@ -123,10 +129,10 @@ mod binaries {
Acknowledges a message as received and handled, meaning it'll no longer be returned as the next
message.
*/
pub(crate) fn ack_message(service: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
{
let from = (*KEYS).read().unwrap()[&service];
assert!(sig.verify(from, ack_challenge(service, from, id, sig.R)));
let to_key = (*KEYS).read().unwrap()[&to];
assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R)));
}
// Is it:
@ -136,9 +142,9 @@ mod binaries {
// It's the second if we acknowledge messages before saving them as acknowledged
// TODO: Check only a proper message is being acked
log::info!("{:?} is acknowledging {}", service, id);
log::info!("{:?} is acknowledging {:?} {}", from, to, id);
(*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id)
(*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id)
}
}
@ -177,13 +183,29 @@ async fn main() {
Some(<Ristretto as Ciphersuite>::G::from_bytes(&repr).unwrap())
};
const ALL_EXT_NETWORKS: [NetworkId; 3] =
[NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero];
let register_service = |service, key| {
(*KEYS).write().unwrap().insert(service, key);
(*QUEUES).write().unwrap().insert(service, RwLock::new(Queue(db.clone(), service)));
let mut queues = (*QUEUES).write().unwrap();
if service == Service::Coordinator {
for network in ALL_EXT_NETWORKS {
queues.insert(
(service, Service::Processor(network)),
RwLock::new(Queue(db.clone(), service, Service::Processor(network))),
);
}
} else {
queues.insert(
(service, Service::Coordinator),
RwLock::new(Queue(db.clone(), service, Service::Coordinator)),
);
}
};
// Make queues for each NetworkId, other than Serai
for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] {
for network in ALL_EXT_NETWORKS {
// Use a match so we error if the list of NetworkIds changes
let Some(key) = read_key(match network {
NetworkId::Serai => unreachable!(),
@ -224,17 +246,18 @@ async fn main() {
.unwrap();
module
.register_method("next", |args, _| {
let args = args.parse::<Service>().unwrap();
Ok(get_next_message(args))
let (from, to) = args.parse::<(Service, Service)>().unwrap();
Ok(get_next_message(from, to))
})
.unwrap();
module
.register_method("ack", |args, _| {
let args = args.parse::<(Service, u64, Vec<u8>)>().unwrap();
let args = args.parse::<(Service, Service, u64, Vec<u8>)>().unwrap();
ack_message(
args.0,
args.1,
SchnorrSignature::<Ristretto>::read(&mut args.2.as_slice()).unwrap(),
args.2,
SchnorrSignature::<Ristretto>::read(&mut args.3.as_slice()).unwrap(),
);
Ok(true)
})

View file

@ -48,15 +48,17 @@ pub fn message_challenge(
}
pub fn ack_challenge(
to: Service,
to_key: <Ristretto as Ciphersuite>::G,
from: Service,
from_key: <Ristretto as Ciphersuite>::G,
id: u64,
nonce: <Ristretto as Ciphersuite>::G,
) -> <Ristretto as Ciphersuite>::F {
let mut transcript = RecommendedTranscript::new(b"Serai Message Queue v0.1 Ackowledgement");
transcript.domain_separate(b"metadata");
transcript.append_message(b"to", bincode::serialize(&to).unwrap());
transcript.append_message(b"to_key", to_key.to_bytes());
transcript.append_message(b"from", bincode::serialize(&from).unwrap());
transcript.append_message(b"from_key", from_key.to_bytes());
transcript.domain_separate(b"message");
transcript.append_message(b"id", id.to_le_bytes());
transcript.domain_separate(b"signature");

View file

@ -3,14 +3,14 @@ use serai_db::{DbTxn, Db};
use crate::messages::*;
#[derive(Clone, Debug)]
pub(crate) struct Queue<D: Db>(pub(crate) D, pub(crate) Service);
pub(crate) struct Queue<D: Db>(pub(crate) D, pub(crate) Service, pub(crate) Service);
impl<D: Db> Queue<D> {
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
}
fn message_count_key(&self) -> Vec<u8> {
Self::key(b"message_count", serde_json::to_vec(&self.1).unwrap())
Self::key(b"message_count", bincode::serialize(&(self.1, self.2)).unwrap())
}
pub(crate) fn message_count(&self) -> u64 {
self
@ -21,7 +21,7 @@ impl<D: Db> Queue<D> {
}
fn last_acknowledged_key(&self) -> Vec<u8> {
Self::key(b"last_acknowledged", serde_json::to_vec(&self.1).unwrap())
Self::key(b"last_acknowledged", bincode::serialize(&(self.1, self.2)).unwrap())
}
pub(crate) fn last_acknowledged(&self) -> Option<u64> {
self
@ -31,7 +31,7 @@ impl<D: Db> Queue<D> {
}
fn message_key(&self, id: u64) -> Vec<u8> {
Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap())
Self::key(b"message", bincode::serialize(&(self.1, self.2, id)).unwrap())
}
// TODO: This is fine as-used, yet gets from the DB while having a txn. It should get from the
// txn

View file

@ -25,7 +25,7 @@ impl Coordinator for MessageQueue {
}
async fn recv(&mut self) -> Message {
let msg = self.next().await;
let msg = self.next(Service::Coordinator).await;
let id = msg.id;
@ -37,6 +37,6 @@ impl Coordinator for MessageQueue {
}
async fn ack(&mut self, msg: Message) {
MessageQueue::ack(self, msg.id).await
MessageQueue::ack(self, Service::Coordinator, msg.id).await
}
}

View file

@ -200,7 +200,7 @@ impl Processor {
Serai::new(&self.serai_rpc).await.unwrap()
}
/// Send a message to a processor as its coordinator.
/// Send a message to the coordinator as a processor.
pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) {
let msg: ProcessorMessage = msg.into();
self
@ -217,14 +217,14 @@ impl Processor {
self.next_send_id += 1;
}
/// Receive a message from a processor as its coordinator.
/// Receive a message from the coordinator as a processor.
pub async fn recv_message(&mut self) -> CoordinatorMessage {
let msg = tokio::time::timeout(Duration::from_secs(10), self.queue.next(self.next_recv_id))
let msg = tokio::time::timeout(Duration::from_secs(10), self.queue.next(Service::Coordinator))
.await
.unwrap();
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, self.next_recv_id);
self.queue.ack(self.next_recv_id).await;
self.queue.ack(Service::Coordinator, msg.id).await;
self.next_recv_id += 1;
serde_json::from_slice(&msg.msg).unwrap()
}

View file

@ -106,28 +106,54 @@ fn basic_functionality() {
// Successfully get it
let bitcoin = MessageQueue::new(
Service::Processor(NetworkId::Bitcoin),
rpc,
rpc.clone(),
Zeroizing::new(priv_keys[&NetworkId::Bitcoin]),
);
let msg = bitcoin.next(0).await;
let msg = bitcoin.next(Service::Coordinator).await;
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, 0);
assert_eq!(&msg.msg, b"Hello, World!");
// If we don't ack it, it should continue to be returned
assert_eq!(msg, bitcoin.next(0).await);
assert_eq!(msg, bitcoin.next(Service::Coordinator).await);
// Acknowledging it should yield the next message
bitcoin.ack(0).await;
bitcoin.ack(Service::Coordinator, 0).await;
let next_msg = bitcoin.next(1).await;
let next_msg = bitcoin.next(Service::Coordinator).await;
assert!(msg != next_msg);
assert_eq!(next_msg.from, Service::Coordinator);
assert_eq!(next_msg.id, 1);
assert_eq!(&next_msg.msg, b"Hello, World, again!");
bitcoin.ack(1).await;
bitcoin.ack(Service::Coordinator, 1).await;
// No further messages should be available
tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(2)).await.unwrap_err();
tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(Service::Coordinator))
.await
.unwrap_err();
// Queueing to a distinct processor should work, with a unique ID
coordinator
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(NetworkId::Monero),
// Intents should be per-from-to, making this valid
intent: b"intent".to_vec(),
},
b"Hello, World!".to_vec(),
)
.await;
let monero = MessageQueue::new(
Service::Processor(NetworkId::Monero),
rpc,
Zeroizing::new(priv_keys[&NetworkId::Monero]),
);
assert_eq!(monero.next(Service::Coordinator).await.id, 0);
monero.ack(Service::Coordinator, 0).await;
tokio::time::timeout(core::time::Duration::from_secs(10), monero.next(Service::Coordinator))
.await
.unwrap_err();
});
}

View file

@ -222,13 +222,15 @@ impl Coordinator {
/// Receive a message from a processor as its coordinator.
pub async fn recv_message(&mut self) -> ProcessorMessage {
let msg =
tokio::time::timeout(core::time::Duration::from_secs(10), self.queue.next(self.next_recv_id))
let msg = tokio::time::timeout(
core::time::Duration::from_secs(10),
self.queue.next(Service::Processor(self.network)),
)
.await
.unwrap();
assert_eq!(msg.from, Service::Processor(self.network));
assert_eq!(msg.id, self.next_recv_id);
self.queue.ack(self.next_recv_id).await;
self.queue.ack(Service::Processor(self.network), msg.id).await;
self.next_recv_id += 1;
serde_json::from_slice(&msg.msg).unwrap()
}