diff --git a/Cargo.lock b/Cargo.lock index 7d84f940..81e7820c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2439,6 +2439,7 @@ name = "ff-group-tests" version = "0.13.0" dependencies = [ "bls12_381", + "ff", "group", "k256", "p256", diff --git a/coins/monero/src/bin/reserialize_chain.rs b/coins/monero/src/bin/reserialize_chain.rs index 75e9c391..5122f4ec 100644 --- a/coins/monero/src/bin/reserialize_chain.rs +++ b/coins/monero/src/bin/reserialize_chain.rs @@ -1,245 +1,251 @@ -use std::sync::Arc; +#[cfg(feature = "binaries")] +mod binaries { + pub(crate) use std::sync::Arc; -use curve25519_dalek::{ - scalar::Scalar, - edwards::{CompressedEdwardsY, EdwardsPoint}, -}; - -use multiexp::BatchVerifier; - -use serde::Deserialize; -use serde_json::json; - -use monero_serai::{ - Commitment, - ringct::RctPrunable, - transaction::{Input, Transaction}, - block::Block, - rpc::{RpcError, Rpc, HttpRpc}, -}; - -use tokio::task::JoinHandle; - -async fn check_block(rpc: Arc>, block_i: usize) { - let hash = loop { - match rpc.get_block_hash(block_i).await { - Ok(hash) => break hash, - Err(RpcError::ConnectionError) => { - println!("get_block_hash ConnectionError"); - continue; - } - Err(e) => panic!("couldn't get block {block_i}'s hash: {e:?}"), - } + pub(crate) use curve25519_dalek::{ + scalar::Scalar, + edwards::{CompressedEdwardsY, EdwardsPoint}, }; - // TODO: Grab the JSON to also check it was deserialized correctly - #[derive(Deserialize, Debug)] - struct BlockResponse { - blob: String, - } - let res: BlockResponse = loop { - match rpc.json_rpc_call("get_block", Some(json!({ "hash": hex::encode(hash) }))).await { - Ok(res) => break res, - Err(RpcError::ConnectionError) => { - println!("get_block ConnectionError"); - continue; - } - Err(e) => panic!("couldn't get block {block_i} via block.hash(): {e:?}"), - } + pub(crate) use multiexp::BatchVerifier; + + pub(crate) use serde::Deserialize; + pub(crate) use serde_json::json; + + pub(crate) use monero_serai::{ + Commitment, + ringct::RctPrunable, + transaction::{Input, Transaction}, + block::Block, + rpc::{RpcError, Rpc, HttpRpc}, }; - let blob = hex::decode(res.blob).expect("node returned non-hex block"); - let block = Block::read(&mut blob.as_slice()) - .unwrap_or_else(|_| panic!("couldn't deserialize block {block_i}")); - assert_eq!(block.hash(), hash, "hash differs"); - assert_eq!(block.serialize(), blob, "serialization differs"); + pub(crate) use tokio::task::JoinHandle; - let txs_len = 1 + block.txs.len(); - - if !block.txs.is_empty() { - #[derive(Deserialize, Debug)] - struct TransactionResponse { - tx_hash: String, - as_hex: String, - } - #[derive(Deserialize, Debug)] - struct TransactionsResponse { - #[serde(default)] - missed_tx: Vec, - txs: Vec, - } - - let mut hashes_hex = block.txs.iter().map(hex::encode).collect::>(); - let mut all_txs = vec![]; - while !hashes_hex.is_empty() { - let txs: TransactionsResponse = loop { - match rpc - .rpc_call( - "get_transactions", - Some(json!({ - "txs_hashes": hashes_hex.drain(.. hashes_hex.len().min(100)).collect::>(), - })), - ) - .await - { - Ok(txs) => break txs, - Err(RpcError::ConnectionError) => { - println!("get_transactions ConnectionError"); - continue; - } - Err(e) => panic!("couldn't call get_transactions: {e:?}"), + pub(crate) async fn check_block(rpc: Arc>, block_i: usize) { + let hash = loop { + match rpc.get_block_hash(block_i).await { + Ok(hash) => break hash, + Err(RpcError::ConnectionError) => { + println!("get_block_hash ConnectionError"); + continue; } - }; - assert!(txs.missed_tx.is_empty()); - all_txs.extend(txs.txs); + Err(e) => panic!("couldn't get block {block_i}'s hash: {e:?}"), + } + }; + + // TODO: Grab the JSON to also check it was deserialized correctly + #[derive(Deserialize, Debug)] + struct BlockResponse { + blob: String, } + let res: BlockResponse = loop { + match rpc.json_rpc_call("get_block", Some(json!({ "hash": hex::encode(hash) }))).await { + Ok(res) => break res, + Err(RpcError::ConnectionError) => { + println!("get_block ConnectionError"); + continue; + } + Err(e) => panic!("couldn't get block {block_i} via block.hash(): {e:?}"), + } + }; - let mut batch = BatchVerifier::new(block.txs.len()); - for (tx_hash, tx_res) in block.txs.into_iter().zip(all_txs) { - assert_eq!( - tx_res.tx_hash, - hex::encode(tx_hash), - "node returned a transaction with different hash" - ); + let blob = hex::decode(res.blob).expect("node returned non-hex block"); + let block = Block::read(&mut blob.as_slice()) + .unwrap_or_else(|_| panic!("couldn't deserialize block {block_i}")); + assert_eq!(block.hash(), hash, "hash differs"); + assert_eq!(block.serialize(), blob, "serialization differs"); - let tx = Transaction::read( - &mut hex::decode(&tx_res.as_hex).expect("node returned non-hex transaction").as_slice(), - ) - .expect("couldn't deserialize transaction"); + let txs_len = 1 + block.txs.len(); - assert_eq!( - hex::encode(tx.serialize()), - tx_res.as_hex, - "Transaction serialization was different" - ); - assert_eq!(tx.hash(), tx_hash, "Transaction hash was different"); - - if matches!(tx.rct_signatures.prunable, RctPrunable::Null) { - assert_eq!(tx.prefix.version, 1); - assert!(!tx.signatures.is_empty()); - continue; + if !block.txs.is_empty() { + #[derive(Deserialize, Debug)] + struct TransactionResponse { + tx_hash: String, + as_hex: String, + } + #[derive(Deserialize, Debug)] + struct TransactionsResponse { + #[serde(default)] + missed_tx: Vec, + txs: Vec, } - let sig_hash = tx.signature_hash(); - // Verify all proofs we support proving for - // This is due to having debug_asserts calling verify within their proving, and CLSAG - // multisig explicitly calling verify as part of its signing process - // Accordingly, making sure our signature_hash algorithm is correct is great, and further - // making sure the verification functions are valid is appreciated - match tx.rct_signatures.prunable { - RctPrunable::Null | RctPrunable::MlsagBorromean { .. } => {} - RctPrunable::MlsagBulletproofs { bulletproofs, .. } => { - assert!(bulletproofs.batch_verify( - &mut rand_core::OsRng, - &mut batch, - (), - &tx.rct_signatures.base.commitments - )); - } - RctPrunable::Clsag { bulletproofs, clsags, pseudo_outs } => { - assert!(bulletproofs.batch_verify( - &mut rand_core::OsRng, - &mut batch, - (), - &tx.rct_signatures.base.commitments - )); - - for (i, clsag) in clsags.into_iter().enumerate() { - let (amount, key_offsets, image) = match &tx.prefix.inputs[i] { - Input::Gen(_) => panic!("Input::Gen"), - Input::ToKey { amount, key_offsets, key_image } => (amount, key_offsets, key_image), - }; - - let mut running_sum = 0; - let mut actual_indexes = vec![]; - for offset in key_offsets { - running_sum += offset; - actual_indexes.push(running_sum); + let mut hashes_hex = block.txs.iter().map(hex::encode).collect::>(); + let mut all_txs = vec![]; + while !hashes_hex.is_empty() { + let txs: TransactionsResponse = loop { + match rpc + .rpc_call( + "get_transactions", + Some(json!({ + "txs_hashes": hashes_hex.drain(.. hashes_hex.len().min(100)).collect::>(), + })), + ) + .await + { + Ok(txs) => break txs, + Err(RpcError::ConnectionError) => { + println!("get_transactions ConnectionError"); + continue; } + Err(e) => panic!("couldn't call get_transactions: {e:?}"), + } + }; + assert!(txs.missed_tx.is_empty()); + all_txs.extend(txs.txs); + } - async fn get_outs( - rpc: &Rpc, - amount: u64, - indexes: &[u64], - ) -> Vec<[EdwardsPoint; 2]> { - #[derive(Deserialize, Debug)] - struct Out { - key: String, - mask: String, + let mut batch = BatchVerifier::new(block.txs.len()); + for (tx_hash, tx_res) in block.txs.into_iter().zip(all_txs) { + assert_eq!( + tx_res.tx_hash, + hex::encode(tx_hash), + "node returned a transaction with different hash" + ); + + let tx = Transaction::read( + &mut hex::decode(&tx_res.as_hex).expect("node returned non-hex transaction").as_slice(), + ) + .expect("couldn't deserialize transaction"); + + assert_eq!( + hex::encode(tx.serialize()), + tx_res.as_hex, + "Transaction serialization was different" + ); + assert_eq!(tx.hash(), tx_hash, "Transaction hash was different"); + + if matches!(tx.rct_signatures.prunable, RctPrunable::Null) { + assert_eq!(tx.prefix.version, 1); + assert!(!tx.signatures.is_empty()); + continue; + } + + let sig_hash = tx.signature_hash(); + // Verify all proofs we support proving for + // This is due to having debug_asserts calling verify within their proving, and CLSAG + // multisig explicitly calling verify as part of its signing process + // Accordingly, making sure our signature_hash algorithm is correct is great, and further + // making sure the verification functions are valid is appreciated + match tx.rct_signatures.prunable { + RctPrunable::Null | RctPrunable::MlsagBorromean { .. } => {} + RctPrunable::MlsagBulletproofs { bulletproofs, .. } => { + assert!(bulletproofs.batch_verify( + &mut rand_core::OsRng, + &mut batch, + (), + &tx.rct_signatures.base.commitments + )); + } + RctPrunable::Clsag { bulletproofs, clsags, pseudo_outs } => { + assert!(bulletproofs.batch_verify( + &mut rand_core::OsRng, + &mut batch, + (), + &tx.rct_signatures.base.commitments + )); + + for (i, clsag) in clsags.into_iter().enumerate() { + let (amount, key_offsets, image) = match &tx.prefix.inputs[i] { + Input::Gen(_) => panic!("Input::Gen"), + Input::ToKey { amount, key_offsets, key_image } => (amount, key_offsets, key_image), + }; + + let mut running_sum = 0; + let mut actual_indexes = vec![]; + for offset in key_offsets { + running_sum += offset; + actual_indexes.push(running_sum); } - #[derive(Deserialize, Debug)] - struct Outs { - outs: Vec, - } - - let outs: Outs = loop { - match rpc - .rpc_call( - "get_outs", - Some(json!({ - "get_txid": true, - "outputs": indexes.iter().map(|o| json!({ - "amount": amount, - "index": o - })).collect::>() - })), - ) - .await - { - Ok(outs) => break outs, - Err(RpcError::ConnectionError) => { - println!("get_outs ConnectionError"); - continue; - } - Err(e) => panic!("couldn't connect to RPC to get outs: {e:?}"), + async fn get_outs( + rpc: &Rpc, + amount: u64, + indexes: &[u64], + ) -> Vec<[EdwardsPoint; 2]> { + #[derive(Deserialize, Debug)] + struct Out { + key: String, + mask: String, } - }; - let rpc_point = |point: &str| { - CompressedEdwardsY( - hex::decode(point) - .expect("invalid hex for ring member") - .try_into() - .expect("invalid point len for ring member"), - ) - .decompress() - .expect("invalid point for ring member") - }; + #[derive(Deserialize, Debug)] + struct Outs { + outs: Vec, + } - outs - .outs - .iter() - .map(|out| { - let mask = rpc_point(&out.mask); - if amount != 0 { - assert_eq!(mask, Commitment::new(Scalar::from(1u8), amount).calculate()); + let outs: Outs = loop { + match rpc + .rpc_call( + "get_outs", + Some(json!({ + "get_txid": true, + "outputs": indexes.iter().map(|o| json!({ + "amount": amount, + "index": o + })).collect::>() + })), + ) + .await + { + Ok(outs) => break outs, + Err(RpcError::ConnectionError) => { + println!("get_outs ConnectionError"); + continue; + } + Err(e) => panic!("couldn't connect to RPC to get outs: {e:?}"), } - [rpc_point(&out.key), mask] - }) - .collect() - } + }; - clsag - .verify( - &get_outs(&rpc, amount.unwrap_or(0), &actual_indexes).await, - image, - &pseudo_outs[i], - &sig_hash, - ) - .unwrap(); + let rpc_point = |point: &str| { + CompressedEdwardsY( + hex::decode(point) + .expect("invalid hex for ring member") + .try_into() + .expect("invalid point len for ring member"), + ) + .decompress() + .expect("invalid point for ring member") + }; + + outs + .outs + .iter() + .map(|out| { + let mask = rpc_point(&out.mask); + if amount != 0 { + assert_eq!(mask, Commitment::new(Scalar::from(1u8), amount).calculate()); + } + [rpc_point(&out.key), mask] + }) + .collect() + } + + clsag + .verify( + &get_outs(&rpc, amount.unwrap_or(0), &actual_indexes).await, + image, + &pseudo_outs[i], + &sig_hash, + ) + .unwrap(); + } } } } + assert!(batch.verify_vartime()); } - assert!(batch.verify_vartime()); - } - println!("Deserialized, hashed, and reserialized {block_i} with {} TXs", txs_len); + println!("Deserialized, hashed, and reserialized {block_i} with {} TXs", txs_len); + } } +#[cfg(feature = "binaries")] #[tokio::main] async fn main() { + use binaries::*; + let args = std::env::args().collect::>(); // Read start block as the first arg @@ -307,3 +313,8 @@ async fn main() { } } } + +#[cfg(not(feature = "binaries"))] +fn main() { + panic!("To run binaries, please build with `--feature binaries`."); +} diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index d1544572..e5c24e62 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -1,114 +1,123 @@ -use std::{ - sync::{Arc, RwLock}, - collections::HashMap, -}; - -use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; -use schnorr_signatures::SchnorrSignature; - -use serai_primitives::NetworkId; - -use jsonrpsee::{RpcModule, server::ServerBuilder}; - +#[cfg(feature = "binaries")] mod messages; -use messages::*; - +#[cfg(feature = "binaries")] mod queue; -use queue::Queue; -type Db = serai_db::RocksDB; +#[cfg(feature = "binaries")] +mod binaries { + pub(crate) use std::{ + sync::{Arc, RwLock}, + collections::HashMap, + }; -lazy_static::lazy_static! { - static ref KEYS: Arc::G>>> = - Arc::new(RwLock::new(HashMap::new())); - static ref QUEUES: Arc>>>> = - Arc::new(RwLock::new(HashMap::new())); -} + pub(crate) use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; + pub(crate) use schnorr_signatures::SchnorrSignature; -// queue RPC method -/* - Queues a message to be delivered from a processor to a coordinator, or vice versa. + pub(crate) use serai_primitives::NetworkId; - Messages are authenticated to be coming from the claimed service. Recipient services SHOULD - independently verify signatures. + pub(crate) use jsonrpsee::{RpcModule, server::ServerBuilder}; - The metadata specifies an intent. Only one message, for a specified intent, will be delivered. - This allows services to safely send messages multiple times without them being delivered multiple - times. + pub(crate) use crate::messages::*; - The message will be ordered by this service, with the order having no guarantees other than - successful ordering by the time this call returns. -*/ -fn queue_message(meta: Metadata, msg: Vec, sig: SchnorrSignature) { - { - let from = (*KEYS).read().unwrap()[&meta.from]; - assert!( - sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R)) - ); + pub(crate) use crate::queue::Queue; + + pub(crate) type Db = serai_db::RocksDB; + + lazy_static::lazy_static! { + pub(crate) static ref KEYS: Arc::G>>> = + Arc::new(RwLock::new(HashMap::new())); + pub(crate) static ref QUEUES: Arc>>>> = + Arc::new(RwLock::new(HashMap::new())); } - // Assert one, and only one of these, is the coordinator - assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator)); + // queue RPC method + /* + Queues a message to be delivered from a processor to a coordinator, or vice versa. - // TODO: Verify (from, intent) hasn't been prior seen + Messages are authenticated to be coming from the claimed service. Recipient services SHOULD + independently verify signatures. - // Queue it - let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { - from: meta.from, - // Temporary value which queue_message will override - id: u64::MAX, - msg, - sig: sig.serialize(), - }); + The metadata specifies an intent. Only one message, for a specified intent, will be delivered. + This allows services to safely send messages multiple times without them being delivered + multiple times. - log::info!("Queued message from {:?}. It is {:?} {id}", meta.from, meta.to); -} + The message will be ordered by this service, with the order having no guarantees other than + successful ordering by the time this call returns. + */ + pub(crate) fn queue_message(meta: Metadata, msg: Vec, sig: SchnorrSignature) { + { + let from = (*KEYS).read().unwrap()[&meta.from]; + assert!( + sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R)) + ); + } -// next RPC method -/* - Gets the next message in queue for this service. + // Assert one, and only one of these, is the coordinator + assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator)); - This is not authenticated due to the fact every nonce would have to be saved to prevent replays, - or a challenge-response protocol implemented. Neither are worth doing when there should be no - sensitive data on this server. + // TODO: Verify (from, intent) hasn't been prior seen - The expected index is used to ensure a service didn't fall out of sync with this service. It - should always be either the next message's ID or *TODO*. -*/ -fn get_next_message(service: Service, _expected: u64) -> Option { - // TODO: Verify the expected next message ID matches + // Queue it + let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { + from: meta.from, + // Temporary value which queue_message will override + id: u64::MAX, + msg, + sig: sig.serialize(), + }); - let queue_outer = (*QUEUES).read().unwrap(); - let queue = queue_outer[&service].read().unwrap(); - let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0); - queue.get_message(next) -} - -// ack RPC method -/* - Acknowledges a message as received and handled, meaning it'll no longer be returned as the next - message. -*/ -fn ack_message(service: Service, id: u64, sig: SchnorrSignature) { - { - let from = (*KEYS).read().unwrap()[&service]; - assert!(sig.verify(from, ack_challenge(service, from, id, sig.R))); + log::info!("Queued message from {:?}. It is {:?} {id}", meta.from, meta.to); } - // Is it: - // The acknowledged message should be > last acknowledged OR - // The acknowledged message should be >= - // It's the first if we save messages as acknowledged before acknowledging them - // It's the second if we acknowledge messages before saving them as acknowledged - // TODO: Check only a proper message is being acked + // next RPC method + /* + Gets the next message in queue for this service. - log::info!("{:?} is acknowledging {}", service, id); + This is not authenticated due to the fact every nonce would have to be saved to prevent replays, + or a challenge-response protocol implemented. Neither are worth doing when there should be no + sensitive data on this server. - (*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id) + The expected index is used to ensure a service didn't fall out of sync with this service. It + should always be either the next message's ID or *TODO*. + */ + pub(crate) fn get_next_message(service: Service, _expected: u64) -> Option { + // TODO: Verify the expected next message ID matches + + let queue_outer = (*QUEUES).read().unwrap(); + let queue = queue_outer[&service].read().unwrap(); + let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0); + queue.get_message(next) + } + + // ack RPC method + /* + Acknowledges a message as received and handled, meaning it'll no longer be returned as the next + message. + */ + pub(crate) fn ack_message(service: Service, id: u64, sig: SchnorrSignature) { + { + let from = (*KEYS).read().unwrap()[&service]; + assert!(sig.verify(from, ack_challenge(service, from, id, sig.R))); + } + + // Is it: + // The acknowledged message should be > last acknowledged OR + // The acknowledged message should be >= + // It's the first if we save messages as acknowledged before acknowledging them + // It's the second if we acknowledge messages before saving them as acknowledged + // TODO: Check only a proper message is being acked + + log::info!("{:?} is acknowledging {}", service, id); + + (*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id) + } } +#[cfg(feature = "binaries")] #[tokio::main] async fn main() { + use binaries::*; + if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string())); } @@ -192,3 +201,8 @@ async fn main() { // Run until stopped, which it never will server.start(module).unwrap().stopped().await; } + +#[cfg(not(feature = "binaries"))] +fn main() { + panic!("To run binaries, please build with `--feature binaries`."); +}