Mirror of https://github.com/serai-dex/serai.git (synced 2024-12-22 19:49:22 +00:00)
Stub binaries' code when the `binaries` feature is not set
Allows running `cargo build` in monero-serai and message-queue without erroring, since it would otherwise automatically try to build the binaries, which require additional features. While we could make those features non-optional, that would increase build time and the disk space required, which is why the features exist for monero-serai and message-queue in the first place (both are frequently used as libraries).
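Both crates get the same treatment, sketched minimally below. The `run` function, the dependency named in the feature, and the module contents are illustrative stand-ins, not the commit's code; only the `#[cfg(feature = "binaries")]` gating and the panicking stub `main` mirror the actual change.

// Assumed Cargo.toml wiring: the binary-only optional dependencies grouped
// under one feature, e.g.
//   [features]
//   binaries = ["tokio"]

// Everything the binary needs lives in a module compiled only with the feature.
#[cfg(feature = "binaries")]
mod binaries {
  // Items are re-exported as pub(crate) so main can glob-import them.
  pub(crate) use std::time::Duration;

  pub(crate) async fn run() {
    // Stand-in for the real binary logic.
    tokio::time::sleep(Duration::from_millis(1)).await;
    println!("binary logic ran");
  }
}

// The real entry point, compiled only when the feature is set.
#[cfg(feature = "binaries")]
#[tokio::main]
async fn main() {
  use binaries::*;
  run().await;
}

// The stub entry point, letting a plain `cargo build` succeed without the feature.
#[cfg(not(feature = "binaries"))]
fn main() {
  panic!("To run binaries, please build with `--feature binaries`.");
}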
Parent: 38ad1d4bc4
Commit: 376b36974f

3 changed files with 323 additions and 297 deletions
Cargo.lock (generated):
@@ -2439,6 +2439,7 @@ name = "ff-group-tests"
 version = "0.13.0"
 dependencies = [
  "bls12_381",
+ "ff",
  "group",
  "k256",
  "p256",
monero-serai's binary, @@ -1,245 +1,251 @@: the imports and check_block move, otherwise unchanged, into a binaries module gated on the feature, with every item re-exported as pub(crate) so main can glob-import it. The file now reads:

#[cfg(feature = "binaries")]
mod binaries {
  pub(crate) use std::sync::Arc;

  pub(crate) use curve25519_dalek::{
    scalar::Scalar,
    edwards::{CompressedEdwardsY, EdwardsPoint},
  };

  pub(crate) use multiexp::BatchVerifier;

  pub(crate) use serde::Deserialize;
  pub(crate) use serde_json::json;

  pub(crate) use monero_serai::{
    Commitment,
    ringct::RctPrunable,
    transaction::{Input, Transaction},
    block::Block,
    rpc::{RpcError, Rpc, HttpRpc},
  };

  pub(crate) use tokio::task::JoinHandle;

  pub(crate) async fn check_block(rpc: Arc<Rpc<HttpRpc>>, block_i: usize) {
    let hash = loop {
      match rpc.get_block_hash(block_i).await {
        Ok(hash) => break hash,
        Err(RpcError::ConnectionError) => {
          println!("get_block_hash ConnectionError");
          continue;
        }
        Err(e) => panic!("couldn't get block {block_i}'s hash: {e:?}"),
      }
    };

    // TODO: Grab the JSON to also check it was deserialized correctly
    #[derive(Deserialize, Debug)]
    struct BlockResponse {
      blob: String,
    }
    let res: BlockResponse = loop {
      match rpc.json_rpc_call("get_block", Some(json!({ "hash": hex::encode(hash) }))).await {
        Ok(res) => break res,
        Err(RpcError::ConnectionError) => {
          println!("get_block ConnectionError");
          continue;
        }
        Err(e) => panic!("couldn't get block {block_i} via block.hash(): {e:?}"),
      }
    };

    let blob = hex::decode(res.blob).expect("node returned non-hex block");
    let block = Block::read(&mut blob.as_slice())
      .unwrap_or_else(|_| panic!("couldn't deserialize block {block_i}"));
    assert_eq!(block.hash(), hash, "hash differs");
    assert_eq!(block.serialize(), blob, "serialization differs");

    let txs_len = 1 + block.txs.len();

    if !block.txs.is_empty() {
      #[derive(Deserialize, Debug)]
      struct TransactionResponse {
        tx_hash: String,
        as_hex: String,
      }
      #[derive(Deserialize, Debug)]
      struct TransactionsResponse {
        #[serde(default)]
        missed_tx: Vec<String>,
        txs: Vec<TransactionResponse>,
      }

      let mut hashes_hex = block.txs.iter().map(hex::encode).collect::<Vec<_>>();
      let mut all_txs = vec![];
      while !hashes_hex.is_empty() {
        let txs: TransactionsResponse = loop {
          match rpc
            .rpc_call(
              "get_transactions",
              Some(json!({
                "txs_hashes": hashes_hex.drain(.. hashes_hex.len().min(100)).collect::<Vec<_>>(),
              })),
            )
            .await
          {
            Ok(txs) => break txs,
            Err(RpcError::ConnectionError) => {
              println!("get_transactions ConnectionError");
              continue;
            }
            Err(e) => panic!("couldn't call get_transactions: {e:?}"),
          }
        };
        assert!(txs.missed_tx.is_empty());
        all_txs.extend(txs.txs);
      }

      let mut batch = BatchVerifier::new(block.txs.len());
      for (tx_hash, tx_res) in block.txs.into_iter().zip(all_txs) {
        assert_eq!(
          tx_res.tx_hash,
          hex::encode(tx_hash),
          "node returned a transaction with different hash"
        );

        let tx = Transaction::read(
          &mut hex::decode(&tx_res.as_hex).expect("node returned non-hex transaction").as_slice(),
        )
        .expect("couldn't deserialize transaction");

        assert_eq!(
          hex::encode(tx.serialize()),
          tx_res.as_hex,
          "Transaction serialization was different"
        );
        assert_eq!(tx.hash(), tx_hash, "Transaction hash was different");

        if matches!(tx.rct_signatures.prunable, RctPrunable::Null) {
          assert_eq!(tx.prefix.version, 1);
          assert!(!tx.signatures.is_empty());
          continue;
        }

        let sig_hash = tx.signature_hash();
        // Verify all proofs we support proving for
        // This is due to having debug_asserts calling verify within their proving, and CLSAG
        // multisig explicitly calling verify as part of its signing process
        // Accordingly, making sure our signature_hash algorithm is correct is great, and further
        // making sure the verification functions are valid is appreciated
        match tx.rct_signatures.prunable {
          RctPrunable::Null | RctPrunable::MlsagBorromean { .. } => {}
          RctPrunable::MlsagBulletproofs { bulletproofs, .. } => {
            assert!(bulletproofs.batch_verify(
              &mut rand_core::OsRng,
              &mut batch,
              (),
              &tx.rct_signatures.base.commitments
            ));
          }
          RctPrunable::Clsag { bulletproofs, clsags, pseudo_outs } => {
            assert!(bulletproofs.batch_verify(
              &mut rand_core::OsRng,
              &mut batch,
              (),
              &tx.rct_signatures.base.commitments
            ));

            for (i, clsag) in clsags.into_iter().enumerate() {
              let (amount, key_offsets, image) = match &tx.prefix.inputs[i] {
                Input::Gen(_) => panic!("Input::Gen"),
                Input::ToKey { amount, key_offsets, key_image } => (amount, key_offsets, key_image),
              };

              let mut running_sum = 0;
              let mut actual_indexes = vec![];
              for offset in key_offsets {
                running_sum += offset;
                actual_indexes.push(running_sum);
              }

              async fn get_outs(
                rpc: &Rpc<HttpRpc>,
                amount: u64,
                indexes: &[u64],
              ) -> Vec<[EdwardsPoint; 2]> {
                #[derive(Deserialize, Debug)]
                struct Out {
                  key: String,
                  mask: String,
                }

                #[derive(Deserialize, Debug)]
                struct Outs {
                  outs: Vec<Out>,
                }

                let outs: Outs = loop {
                  match rpc
                    .rpc_call(
                      "get_outs",
                      Some(json!({
                        "get_txid": true,
                        "outputs": indexes.iter().map(|o| json!({
                          "amount": amount,
                          "index": o
                        })).collect::<Vec<_>>()
                      })),
                    )
                    .await
                  {
                    Ok(outs) => break outs,
                    Err(RpcError::ConnectionError) => {
                      println!("get_outs ConnectionError");
                      continue;
                    }
                    Err(e) => panic!("couldn't connect to RPC to get outs: {e:?}"),
                  }
                };

                let rpc_point = |point: &str| {
                  CompressedEdwardsY(
                    hex::decode(point)
                      .expect("invalid hex for ring member")
                      .try_into()
                      .expect("invalid point len for ring member"),
                  )
                  .decompress()
                  .expect("invalid point for ring member")
                };

                outs
                  .outs
                  .iter()
                  .map(|out| {
                    let mask = rpc_point(&out.mask);
                    if amount != 0 {
                      assert_eq!(mask, Commitment::new(Scalar::from(1u8), amount).calculate());
                    }
                    [rpc_point(&out.key), mask]
                  })
                  .collect()
              }

              clsag
                .verify(
                  &get_outs(&rpc, amount.unwrap_or(0), &actual_indexes).await,
                  image,
                  &pseudo_outs[i],
                  &sig_hash,
                )
                .unwrap();
            }
          }
        }
      }
      assert!(batch.verify_vartime());
    }

    println!("Deserialized, hashed, and reserialized {block_i} with {} TXs", txs_len);
  }
}

#[cfg(feature = "binaries")]
#[tokio::main]
async fn main() {
  use binaries::*;

  let args = std::env::args().collect::<Vec<String>>();

  // Read start block as the first arg

The second hunk appends the stub entry point:

@@ -307,3 +313,8 @@ async fn main() {
     }
   }
 }
+
+#[cfg(not(feature = "binaries"))]
+fn main() {
+  panic!("To run binaries, please build with `--feature binaries`.");
+}
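A note on the running_sum loop in check_block above: Monero transactions store each ring member's output index delta-encoded, the first offset absolute and each later one relative to its predecessor, so the offsets must be accumulated into absolute indexes before calling get_outs. A standalone sketch with made-up offsets:

// Decode delta-encoded ring member offsets into the absolute output indexes
// expected by the get_outs RPC, the same accumulation check_block performs.
fn decode_key_offsets(key_offsets: &[u64]) -> Vec<u64> {
  let mut running_sum = 0;
  let mut actual_indexes = Vec::with_capacity(key_offsets.len());
  for offset in key_offsets {
    running_sum += offset;
    actual_indexes.push(running_sum);
  }
  actual_indexes
}

fn main() {
  // Hypothetical offsets: 100 is absolute; 5 and 2 are deltas.
  assert_eq!(decode_key_offsets(&[100, 5, 2]), vec![100, 105, 107]);
}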
message-queue's binary, @@ -1,114 +1,123 @@: the same pattern; mod messages; and mod queue; are additionally feature-gated, and the statics and RPC methods become pub(crate) inside the binaries module. The file now reads:

#[cfg(feature = "binaries")]
mod messages;
#[cfg(feature = "binaries")]
mod queue;

#[cfg(feature = "binaries")]
mod binaries {
  pub(crate) use std::{
    sync::{Arc, RwLock},
    collections::HashMap,
  };

  pub(crate) use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
  pub(crate) use schnorr_signatures::SchnorrSignature;

  pub(crate) use serai_primitives::NetworkId;

  pub(crate) use jsonrpsee::{RpcModule, server::ServerBuilder};

  pub(crate) use crate::messages::*;

  pub(crate) use crate::queue::Queue;

  pub(crate) type Db = serai_db::RocksDB;

  lazy_static::lazy_static! {
    pub(crate) static ref KEYS: Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>> =
      Arc::new(RwLock::new(HashMap::new()));
    pub(crate) static ref QUEUES: Arc<RwLock<HashMap<Service, RwLock<Queue<Db>>>>> =
      Arc::new(RwLock::new(HashMap::new()));
  }

  // queue RPC method
  /*
    Queues a message to be delivered from a processor to a coordinator, or vice versa.

    Messages are authenticated to be coming from the claimed service. Recipient services SHOULD
    independently verify signatures.

    The metadata specifies an intent. Only one message, for a specified intent, will be delivered.
    This allows services to safely send messages multiple times without them being delivered
    multiple times.

    The message will be ordered by this service, with the order having no guarantees other than
    successful ordering by the time this call returns.
  */
  pub(crate) fn queue_message(meta: Metadata, msg: Vec<u8>, sig: SchnorrSignature<Ristretto>) {
    {
      let from = (*KEYS).read().unwrap()[&meta.from];
      assert!(
        sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R))
      );
    }

    // Assert one, and only one of these, is the coordinator
    assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator));

    // TODO: Verify (from, intent) hasn't been prior seen

    // Queue it
    let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage {
      from: meta.from,
      // Temporary value which queue_message will override
      id: u64::MAX,
      msg,
      sig: sig.serialize(),
    });

    log::info!("Queued message from {:?}. It is {:?} {id}", meta.from, meta.to);
  }

  // next RPC method
  /*
    Gets the next message in queue for this service.

    This is not authenticated due to the fact every nonce would have to be saved to prevent replays,
    or a challenge-response protocol implemented. Neither are worth doing when there should be no
    sensitive data on this server.

    The expected index is used to ensure a service didn't fall out of sync with this service. It
    should always be either the next message's ID or *TODO*.
  */
  pub(crate) fn get_next_message(service: Service, _expected: u64) -> Option<QueuedMessage> {
    // TODO: Verify the expected next message ID matches

    let queue_outer = (*QUEUES).read().unwrap();
    let queue = queue_outer[&service].read().unwrap();
    let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0);
    queue.get_message(next)
  }

  // ack RPC method
  /*
    Acknowledges a message as received and handled, meaning it'll no longer be returned as the next
    message.
  */
  pub(crate) fn ack_message(service: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
    {
      let from = (*KEYS).read().unwrap()[&service];
      assert!(sig.verify(from, ack_challenge(service, from, id, sig.R)));
    }

    // Is it:
    // The acknowledged message should be > last acknowledged OR
    // The acknowledged message should be >=
    // It's the first if we save messages as acknowledged before acknowledging them
    // It's the second if we acknowledge messages before saving them as acknowledged
    // TODO: Check only a proper message is being acked

    log::info!("{:?} is acknowledging {}", service, id);

    (*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id)
  }
}

#[cfg(feature = "binaries")]
#[tokio::main]
async fn main() {
  use binaries::*;

  if std::env::var("RUST_LOG").is_err() {
    std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string()));
  }

And the stub entry point:

@@ -192,3 +201,8 @@ async fn main() {
   // Run until stopped, which it never will
   server.start(module).unwrap().stopped().await;
 }
+
+#[cfg(not(feature = "binaries"))]
+fn main() {
+  panic!("To run binaries, please build with `--feature binaries`.");
+}
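For the `// TODO: Verify (from, intent) hasn't been prior seen` left in queue_message, one possible shape of the check, assuming an in-memory set and an illustrative u8 service ID; the real implementation would key on the crate's Service type and persist seen intents to the database so restarts don't allow replays:

use std::collections::HashSet;

// Hypothetical dedup guard for (service, intent) pairs.
struct IntentFilter {
  seen: HashSet<(u8, Vec<u8>)>,
}

impl IntentFilter {
  fn new() -> Self {
    IntentFilter { seen: HashSet::new() }
  }

  // Returns true the first time a (service, intent) pair is observed.
  fn first_seen(&mut self, service: u8, intent: &[u8]) -> bool {
    self.seen.insert((service, intent.to_vec()))
  }
}

fn main() {
  let mut filter = IntentFilter::new();
  assert!(filter.first_seen(0, b"intent-1"));
  // A resend with the same intent is detected and can be dropped.
  assert!(!filter.first_seen(0, b"intent-1"));
}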