Start work on a send_test

Stops work where it does to the processor panickinng for Monero, yet not
Bitcoin, under what's present.

Cleans up processor tests to consolidate shared code.
This commit is contained in:
Luke Parker 2023-07-29 04:26:24 -04:00
parent 22da7aedde
commit f78332453b
No known key found for this signature in database
8 changed files with 381 additions and 60 deletions

4
Cargo.lock generated
View file

@ -8191,14 +8191,12 @@ dependencies = [
"monero-serai",
"parity-scale-codec",
"rand_core 0.6.4",
"serai-client",
"serai-docker-tests",
"serai-in-instructions-primitives",
"serai-message-queue",
"serai-message-queue-tests",
"serai-primitives",
"serai-processor",
"serai-processor-messages",
"serai-validator-sets-primitives",
"serde",
"serde_json",
"tokio",

View file

@ -29,9 +29,7 @@ monero-serai = { path = "../../coins/monero" }
messages = { package = "serai-processor-messages", path = "../../processor/messages" }
scale = { package = "parity-scale-codec", version = "3" }
serai-primitives = { path = "../../substrate/primitives" }
serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives" }
serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives" }
serai-client = { path = "../../substrate/client" }
serai-message-queue = { path = "../../message-queue" }
serde = { version = "1", default-features = false }

View file

@ -5,7 +5,7 @@ use rand_core::{RngCore, OsRng};
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
use serai_primitives::NetworkId;
use serai_client::primitives::NetworkId;
use messages::{ProcessorMessage, CoordinatorMessage};
use serai_message_queue::{Service, Metadata, client::MessageQueue};
@ -57,9 +57,10 @@ pub fn processor_instance(
)
}
pub type Handles = (String, String, String);
pub fn processor_stack(
network: NetworkId,
) -> ((String, String, String), <Ristretto as Ciphersuite>::F, Vec<Composition>) {
) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<Composition>) {
let (network_composition, network_rpc_port) = network_instance(network);
let (coord_key, message_queue_keys, message_queue_composition) =

View file

@ -5,9 +5,11 @@ use rand_core::{RngCore, OsRng};
use scale::Encode;
use serai_primitives::{NetworkId, Amount};
use serai_validator_sets_primitives::ExternalKey;
use serai_in_instructions_primitives::{InInstruction, RefundableInInstruction, Shorthand};
use serai_client::{
primitives::{Amount, NetworkId, Coin, Balance, ExternalAddress},
validator_sets::primitives::ExternalKey,
in_instructions::primitives::{InInstruction, RefundableInInstruction, Shorthand},
};
use dockertest::{PullPolicy, Image, StartPolicy, Composition, DockerOperations};
@ -221,7 +223,7 @@ impl Wallet {
ops: &DockerOperations,
to: &ExternalKey,
instruction: Option<InInstruction>,
) -> (Vec<u8>, Amount) {
) -> (Vec<u8>, Balance) {
match self {
Wallet::Bitcoin { private_key, public_key, ref mut input_tx } => {
use bitcoin_serai::bitcoin::{
@ -298,7 +300,7 @@ impl Wallet {
let mut buf = vec![];
tx.consensus_encode(&mut buf).unwrap();
*input_tx = tx;
(buf, Amount(AMOUNT))
(buf, Balance { coin: Coin::Bitcoin, amount: Amount(AMOUNT) })
}
Wallet::Monero { handle, ref spend_key, ref view_pair, ref mut inputs } => {
@ -376,7 +378,31 @@ impl Wallet {
.remove(0),
);
(tx.serialize(), Amount(AMOUNT))
(tx.serialize(), Balance { coin: Coin::Monero, amount: Amount(AMOUNT) })
}
}
}
pub fn address(&self) -> ExternalAddress {
use serai_client::coins;
match self {
Wallet::Bitcoin { public_key, .. } => {
use bitcoin_serai::bitcoin::{Network, Address};
ExternalAddress::new(
coins::bitcoin::Address(Address::p2pkh(public_key, Network::Regtest)).try_into().unwrap(),
)
.unwrap()
}
Wallet::Monero { view_pair, .. } => {
use monero_serai::wallet::address::{Network, AddressSpec};
ExternalAddress::new(
coins::monero::Address::new(view_pair.address(Network::Mainnet, AddressSpec::Standard))
.unwrap()
.try_into()
.unwrap(),
)
.unwrap()
}
}
}

View file

@ -1,17 +1,18 @@
use std::collections::HashMap;
use std::{
collections::HashMap,
time::{SystemTime, Duration},
};
use dkg::{Participant, tests::clone_without};
use messages::sign::SignId;
use messages::{sign::SignId, SubstrateContext};
use serai_primitives::{
BlockHash, crypto::RuntimePublic, PublicKey, SeraiAddress, NetworkId, Coin, Balance,
use serai_client::{
primitives::{BlockHash, crypto::RuntimePublic, PublicKey, SeraiAddress, NetworkId},
in_instructions::primitives::{
InInstruction, InInstructionWithBalance, SignedBatch, batch_message,
},
};
use serai_in_instructions_primitives::{
InInstruction, InInstructionWithBalance, SignedBatch, batch_message,
};
use dockertest::DockerTest;
use crate::{*, tests::*};
@ -135,21 +136,46 @@ pub(crate) async fn sign_batch(
batch.unwrap()
}
pub(crate) async fn substrate_block(
coordinator: &mut Coordinator,
block: messages::substrate::CoordinatorMessage,
) {
match block.clone() {
messages::substrate::CoordinatorMessage::SubstrateBlock {
context: _,
network: sent_network,
block: sent_block,
key: _,
burns,
} => {
coordinator.send_message(block).await;
match coordinator.recv_message().await {
messages::ProcessorMessage::Coordinator(
messages::coordinator::ProcessorMessage::SubstrateBlockAck {
network: recvd_network,
block: recvd_block,
plans,
},
) => {
assert_eq!(recvd_network, sent_network);
assert_eq!(recvd_block, sent_block);
// TODO: This isn't the correct formula at all
assert_eq!(plans.len(), if burns.is_empty() { 0 } else { 1 });
}
_ => panic!("coordinator didn't respond to SubstrateBlock with SubstrateBlockAck"),
}
}
_ => panic!("substrate_block message wasn't a SubstrateBlock"),
}
}
#[test]
fn batch_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let mut coordinators = vec![];
let mut test = DockerTest::new();
for _ in 0 .. COORDINATORS {
let (handles, coord_key, compositions) = processor_stack(network);
coordinators.push((handles, coord_key));
for composition in compositions {
test.add_composition(composition);
}
}
let (coordinators, test) = new_test(network);
test.run(|ops| async move {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let mut coordinators = coordinators
.into_iter()
@ -173,6 +199,7 @@ fn batch_test() {
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Run twice, once with an instruction and once without
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
for i in 0 .. 2 {
let mut serai_address = [0; 32];
OsRng.fill_bytes(&mut serai_address);
@ -180,7 +207,7 @@ fn batch_test() {
if i == 1 { Some(InInstruction::Transfer(SeraiAddress(serai_address))) } else { None };
// Send into the processor's wallet
let (tx, amount_sent) =
let (tx, balance_sent) =
wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await;
for coordinator in &mut coordinators {
coordinator.publish_transacton(&ops, &tx).await;
@ -198,7 +225,7 @@ fn batch_test() {
// Sleep for 10s
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
let (mut id, mut preprocesses) =
@ -229,18 +256,7 @@ fn batch_test() {
if let Some(instruction) = instruction {
assert_eq!(
batch.batch.instructions,
vec![InInstructionWithBalance {
instruction,
balance: Balance {
coin: match network {
NetworkId::Bitcoin => Coin::Bitcoin,
NetworkId::Ethereum => todo!(),
NetworkId::Monero => Coin::Monero,
NetworkId::Serai => panic!("running processor tests on Serai"),
},
amount: amount_sent,
}
}]
vec![InInstructionWithBalance { instruction, balance: balance_sent }]
);
} else {
// This shouldn't have an instruction as we didn't add any data into the TX we sent
@ -248,6 +264,27 @@ fn batch_test() {
// contained outputs
assert!(batch.batch.instructions.is_empty());
}
// Fire a SubstrateBlock
let serai_time =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
for coordinator in &mut coordinators {
substrate_block(
coordinator,
messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time,
coin_latest_finalized_block: batch.batch.block,
},
network,
block: substrate_block_num + u64::from(i),
// TODO: Should we use the network key here? Or should we only use the Ristretto key?
key: key_pair.1.to_vec(),
burns: vec![],
},
)
.await;
}
}
});
}

View file

@ -2,13 +2,13 @@ use std::{collections::HashMap, time::SystemTime};
use dkg::{Participant, ThresholdParams, tests::clone_without};
use serai_primitives::{NetworkId, BlockHash, PublicKey};
use serai_validator_sets_primitives::{Session, KeyPair, ValidatorSet};
use serai_client::{
primitives::{NetworkId, BlockHash, PublicKey},
validator_sets::primitives::{Session, KeyPair, ValidatorSet},
};
use messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage, ProcessorMessage};
use dockertest::DockerTest;
use crate::{*, tests::*};
pub(crate) async fn key_gen(coordinators: &mut [Coordinator], network: NetworkId) -> KeyPair {
@ -140,15 +140,7 @@ pub(crate) async fn key_gen(coordinators: &mut [Coordinator], network: NetworkId
#[test]
fn key_gen_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let mut coordinators = vec![];
let mut test = DockerTest::new();
for _ in 0 .. COORDINATORS {
let (handles, coord_key, compositions) = processor_stack(network);
coordinators.push((handles, coord_key));
for composition in compositions {
test.add_composition(composition);
}
}
let (coordinators, test) = new_test(network);
test.run(|ops| async move {
// Sleep for a second for the message-queue to boot

View file

@ -1,7 +1,31 @@
use ciphersuite::{Ciphersuite, Ristretto};
use serai_client::primitives::NetworkId;
use dockertest::DockerTest;
use crate::*;
mod key_gen;
pub(crate) use key_gen::key_gen;
mod batch;
pub(crate) use batch::{recv_batch_preprocesses, sign_batch, substrate_block};
mod send;
pub(crate) const COORDINATORS: usize = 4;
pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
fn new_test(network: NetworkId) -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
let mut coordinators = vec![];
let mut test = DockerTest::new();
for _ in 0 .. COORDINATORS {
let (handles, coord_key, compositions) = processor_stack(network);
coordinators.push((handles, coord_key));
for composition in compositions {
test.add_composition(composition);
}
}
(coordinators, test)
}

View file

@ -0,0 +1,245 @@
use std::{
collections::HashMap,
time::{SystemTime, Duration},
};
use dkg::{Participant, tests::clone_without};
use messages::{sign::SignId, SubstrateContext};
use serai_client::{
primitives::{BlockHash, NetworkId},
tokens::primitives::{OutInstruction, OutInstructionWithBalance},
};
use crate::{*, tests::*};
#[allow(unused)]
pub(crate) async fn recv_sign_preprocesses(
coordinators: &mut [Coordinator],
key: [u8; 32],
attempt: u32,
) -> (SignId, HashMap<Participant, Vec<u8>>) {
let mut id = None;
let mut preprocesses = HashMap::new();
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
let msg = coordinator.recv_message().await;
match msg {
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Preprocess {
id: this_id,
preprocess,
}) => {
if id.is_none() {
assert_eq!(&this_id.key, &key);
assert_eq!(this_id.attempt, attempt);
id = Some(this_id.clone());
}
assert_eq!(&this_id, id.as_ref().unwrap());
preprocesses.insert(i, preprocess);
}
_ => panic!("processor didn't send sign preprocess"),
}
}
// Reduce the preprocesses down to the threshold
while preprocesses.len() > THRESHOLD {
preprocesses.remove(
&Participant::new(
u16::try_from(OsRng.next_u64() % u64::try_from(COORDINATORS).unwrap()).unwrap() + 1,
)
.unwrap(),
);
}
(id.unwrap(), preprocesses)
}
#[allow(unused)]
pub(crate) async fn sign_tx(
coordinators: &mut [Coordinator],
id: SignId,
preprocesses: HashMap<Participant, Vec<u8>>,
) -> Vec<u8> {
assert_eq!(preprocesses.len(), THRESHOLD);
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
if preprocesses.contains_key(&i) {
coordinator
.send_message(messages::sign::CoordinatorMessage::Preprocesses {
id: id.clone(),
preprocesses: clone_without(&preprocesses, &i),
})
.await;
}
}
let mut shares = HashMap::new();
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
if preprocesses.contains_key(&i) {
match coordinator.recv_message().await {
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Share {
id: this_id,
share,
}) => {
assert_eq!(&this_id, &id);
shares.insert(i, share);
}
_ => panic!("processor didn't send TX shares"),
}
}
}
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
if preprocesses.contains_key(&i) {
coordinator
.send_message(messages::sign::CoordinatorMessage::Shares {
id: id.clone(),
shares: clone_without(&shares, &i),
})
.await;
}
}
// The selected processors should yield Completed
let mut tx = None;
for (i, coordinator) in coordinators.iter_mut().enumerate() {
let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap();
if preprocesses.contains_key(&i) {
match coordinator.recv_message().await {
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
key,
id: this_id,
tx: this_tx,
}) => {
assert_eq!(&key, &id.key);
assert_eq!(&this_id, &id.id);
if tx.is_none() {
tx = Some(this_tx.clone());
}
assert_eq!(tx.as_ref().unwrap(), &this_tx);
}
_ => panic!("processor didn't send Completed"),
}
}
}
tx.unwrap()
}
#[test]
fn send_test() {
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
let (coordinators, test) = new_test(network);
test.run(|ops| async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let mut coordinators = coordinators
.into_iter()
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
.collect::<Vec<_>>();
// Create a wallet before we start generating keys
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Generate keys
let key_pair = key_gen(&mut coordinators, network).await;
// Now we we have to mine blocks to activate the key
// (the first key is activated when the coin's block time exceeds the Serai time it was
// confirmed at)
for _ in 0 .. confirmations(network) {
coordinators[0].add_block(&ops).await;
}
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Send into the processor's wallet
let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, None).await;
for coordinator in &mut coordinators {
coordinator.publish_transacton(&ops, &tx).await;
}
// Put the TX past the confirmation depth
let mut block_with_tx = None;
for _ in 0 .. confirmations(network) {
let (hash, _) = coordinators[0].add_block(&ops).await;
if block_with_tx.is_none() {
block_with_tx = Some(hash);
}
}
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
// Sleep for 10s
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
tokio::time::sleep(Duration::from_secs(10)).await;
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
let (id, preprocesses) = recv_batch_preprocesses(&mut coordinators, key_pair.0 .0, 0).await;
// Continue with signing the batch
let batch = sign_batch(&mut coordinators, id, preprocesses).await;
// Check it
assert_eq!(batch.batch.network, network);
assert_eq!(batch.batch.id, 0);
assert_eq!(batch.batch.block, BlockHash(block_with_tx.unwrap()));
assert!(batch.batch.instructions.is_empty());
// Fire a SubstrateBlock with a burn
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
let serai_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
for coordinator in &mut coordinators {
substrate_block(
coordinator,
messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time,
coin_latest_finalized_block: batch.batch.block,
},
network,
block: substrate_block_num,
// TODO: Should we use the network key here? Or should we only use the Ristretto key?
key: key_pair.1.to_vec(),
burns: vec![OutInstructionWithBalance {
instruction: OutInstruction { address: wallet.address(), data: None },
balance: balance_sent,
}],
},
)
.await;
}
/*
// Trigger a random amount of re-attempts
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
// TODO: Double check how the processor handles this ID field
// It should be able to assert its perfectly sequential
id.attempt = attempt;
for coordinator in coordinators.iter_mut() {
coordinator
.send_message(messages::sign::CoordinatorMessage::Reattempt {
id: id.clone(),
})
.await;
}
(id, preprocesses) =
recv_batch_preprocesses(&mut coordinators, key_pair.0 .0, attempt).await;
}
*/
});
}
}