mirror of
https://github.com/serai-dex/serai.git
synced 2024-11-16 17:07:35 +00:00
Simultaenously build Docker images used in tests
This commit is contained in:
parent
571195bfda
commit
292263b21e
23 changed files with 639 additions and 526 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -7646,6 +7646,7 @@ name = "serai-docker-tests"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -7665,6 +7666,7 @@ dependencies = [
|
|||
"rand_core",
|
||||
"serai-client",
|
||||
"serai-coordinator-tests",
|
||||
"serai-docker-tests",
|
||||
"serai-message-queue-tests",
|
||||
"serai-processor",
|
||||
"serai-processor-tests",
|
||||
|
|
|
@ -19,8 +19,8 @@ mod bitcoin {
|
|||
check::<IsTrue<{ Bitcoin::DUST >= bitcoin_serai::wallet::DUST }>>();
|
||||
}
|
||||
|
||||
fn spawn_bitcoin() -> DockerTest {
|
||||
serai_docker_tests::build("bitcoin".to_string());
|
||||
async fn spawn_bitcoin() -> DockerTest {
|
||||
serai_docker_tests::build("bitcoin".to_string()).await;
|
||||
|
||||
let composition = TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
|
||||
|
@ -73,8 +73,8 @@ mod monero {
|
|||
use super::*;
|
||||
use crate::networks::{Network, Monero};
|
||||
|
||||
fn spawn_monero() -> DockerTest {
|
||||
serai_docker_tests::build("monero".to_string());
|
||||
async fn spawn_monero() -> DockerTest {
|
||||
serai_docker_tests::build("monero".to_string()).await;
|
||||
|
||||
let composition = TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never),
|
||||
|
|
|
@ -44,49 +44,59 @@ macro_rules! test_network {
|
|||
test_key_gen::<$N>().await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn $scanner() {
|
||||
#[tokio::test]
|
||||
async fn $scanner() {
|
||||
*INIT_LOGGER;
|
||||
let docker = $docker();
|
||||
docker.run(|ops| async move {
|
||||
test_scanner($network(&ops).await).await;
|
||||
});
|
||||
let docker = $docker().await;
|
||||
docker
|
||||
.run_async(|ops| async move {
|
||||
test_scanner($network(&ops).await).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn $signer() {
|
||||
#[tokio::test]
|
||||
async fn $signer() {
|
||||
*INIT_LOGGER;
|
||||
let docker = $docker();
|
||||
docker.run(|ops| async move {
|
||||
test_signer($network(&ops).await).await;
|
||||
});
|
||||
let docker = $docker().await;
|
||||
docker
|
||||
.run_async(|ops| async move {
|
||||
test_signer($network(&ops).await).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn $wallet() {
|
||||
#[tokio::test]
|
||||
async fn $wallet() {
|
||||
*INIT_LOGGER;
|
||||
let docker = $docker();
|
||||
docker.run(|ops| async move {
|
||||
test_wallet($network(&ops).await).await;
|
||||
});
|
||||
let docker = $docker().await;
|
||||
docker
|
||||
.run_async(|ops| async move {
|
||||
test_wallet($network(&ops).await).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn $addresses() {
|
||||
#[tokio::test]
|
||||
async fn $addresses() {
|
||||
*INIT_LOGGER;
|
||||
let docker = $docker();
|
||||
docker.run(|ops| async move {
|
||||
test_addresses($network(&ops).await).await;
|
||||
});
|
||||
let docker = $docker().await;
|
||||
docker
|
||||
.run_async(|ops| async move {
|
||||
test_addresses($network(&ops).await).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn $no_deadlock_in_multisig_completed() {
|
||||
#[tokio::test]
|
||||
async fn $no_deadlock_in_multisig_completed() {
|
||||
*INIT_LOGGER;
|
||||
let docker = $docker();
|
||||
docker.run(|ops| async move {
|
||||
test_no_deadlock_in_multisig_completed($network(&ops).await).await;
|
||||
});
|
||||
let docker = $docker().await;
|
||||
docker
|
||||
.run_async(|ops| async move {
|
||||
test_no_deadlock_in_multisig_completed($network(&ops).await).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ macro_rules! serai_test {
|
|||
TestBodySpecification, DockerTest,
|
||||
};
|
||||
|
||||
serai_docker_tests::build("serai".to_string());
|
||||
serai_docker_tests::build("serai".to_string()).await;
|
||||
|
||||
let handle = concat!("serai_client-serai_node-", stringify!($name));
|
||||
|
||||
|
|
|
@ -37,14 +37,18 @@ mod tests;
|
|||
|
||||
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
|
||||
|
||||
pub fn coordinator_instance(
|
||||
pub fn coordinator_docker_name() -> String {
|
||||
"serai-dev-coordinator".to_string()
|
||||
}
|
||||
|
||||
pub async fn coordinator_instance(
|
||||
name: &str,
|
||||
message_queue_key: <Ristretto as Ciphersuite>::F,
|
||||
) -> TestBodySpecification {
|
||||
serai_docker_tests::build("coordinator".to_string());
|
||||
serai_docker_tests::build("coordinator".to_string()).await;
|
||||
|
||||
TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-coordinator").pull_policy(PullPolicy::Never),
|
||||
Image::with_repository(coordinator_docker_name()).pull_policy(PullPolicy::Never),
|
||||
)
|
||||
.replace_env(
|
||||
[
|
||||
|
@ -63,11 +67,15 @@ pub fn coordinator_instance(
|
|||
)
|
||||
}
|
||||
|
||||
pub fn serai_composition(name: &str) -> TestBodySpecification {
|
||||
serai_docker_tests::build("serai".to_string());
|
||||
pub fn serai_docker_name() -> String {
|
||||
"serai-dev-serai".to_string()
|
||||
}
|
||||
|
||||
pub async fn serai_composition(name: &str) -> TestBodySpecification {
|
||||
serai_docker_tests::build("serai".to_string()).await;
|
||||
|
||||
TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never),
|
||||
Image::with_repository(serai_docker_name()).pull_policy(PullPolicy::Never),
|
||||
)
|
||||
.replace_cmd(vec![
|
||||
"serai-node".to_string(),
|
||||
|
@ -82,15 +90,22 @@ pub fn serai_composition(name: &str) -> TestBodySpecification {
|
|||
}
|
||||
|
||||
pub type Handles = (String, String, String);
|
||||
pub fn coordinator_stack(
|
||||
pub async fn coordinator_stack(
|
||||
name: &str,
|
||||
) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<TestBodySpecification>) {
|
||||
let serai_composition = serai_composition(name);
|
||||
serai_docker_tests::build_batch(vec![
|
||||
serai_docker_name(),
|
||||
serai_message_queue_tests::docker_name(),
|
||||
coordinator_docker_name(),
|
||||
])
|
||||
.await;
|
||||
|
||||
let serai_composition = serai_composition(name).await;
|
||||
|
||||
let (coord_key, message_queue_keys, message_queue_composition) =
|
||||
serai_message_queue_tests::instance();
|
||||
serai_message_queue_tests::instance().await;
|
||||
|
||||
let coordinator_composition = coordinator_instance(name, coord_key);
|
||||
let coordinator_composition = coordinator_instance(name, coord_key).await;
|
||||
|
||||
// Give every item in this stack a unique ID
|
||||
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits
|
||||
|
|
|
@ -253,7 +253,7 @@ pub async fn batch(
|
|||
#[tokio::test]
|
||||
async fn batch_test() {
|
||||
let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock();
|
||||
let (processors, test) = new_test();
|
||||
let (processors, test) = new_test().await;
|
||||
|
||||
test
|
||||
.run_async(|ops| async move {
|
||||
|
|
|
@ -222,7 +222,7 @@ pub async fn key_gen<C: Ciphersuite>(
|
|||
#[tokio::test]
|
||||
async fn key_gen_test() {
|
||||
let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock();
|
||||
let (processors, test) = new_test();
|
||||
let (processors, test) = new_test().await;
|
||||
|
||||
test
|
||||
.run_async(|ops| async move {
|
||||
|
|
|
@ -21,7 +21,7 @@ pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
|
|||
|
||||
pub(crate) static ONE_AT_A_TIME: OnceLock<Mutex<()>> = OnceLock::new();
|
||||
|
||||
pub(crate) fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
|
||||
pub(crate) async fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
|
||||
let mut coordinators = vec![];
|
||||
let mut test = DockerTest::new().with_network(dockertest::Network::Isolated);
|
||||
for i in 0 .. COORDINATORS {
|
||||
|
@ -33,7 +33,8 @@ pub(crate) fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, Dock
|
|||
4 => "Eve",
|
||||
5 => "Ferdie",
|
||||
_ => panic!("needed a 7th name for a serai node"),
|
||||
});
|
||||
})
|
||||
.await;
|
||||
coordinators.push((handles, coord_key));
|
||||
for composition in compositions {
|
||||
test.provide_container(composition);
|
||||
|
|
|
@ -170,7 +170,7 @@ pub async fn sign(
|
|||
#[tokio::test]
|
||||
async fn sign_test() {
|
||||
let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock();
|
||||
let (processors, test) = new_test();
|
||||
let (processors, test) = new_test().await;
|
||||
|
||||
test
|
||||
.run_async(|ops| async move {
|
||||
|
|
|
@ -15,3 +15,5 @@ rustdoc-args = ["--cfg", "docsrs"]
|
|||
|
||||
[dependencies]
|
||||
chrono = "0.4"
|
||||
|
||||
tokio = { version = "1", default-features = false, features = ["sync", "rt"] }
|
||||
|
|
|
@ -1,19 +1,27 @@
|
|||
use std::{
|
||||
sync::{Mutex, OnceLock},
|
||||
sync::{OnceLock, Arc},
|
||||
collections::{HashSet, HashMap},
|
||||
time::SystemTime,
|
||||
path::PathBuf,
|
||||
fs, env,
|
||||
process::Command,
|
||||
};
|
||||
|
||||
static BUILT: OnceLock<Mutex<HashMap<String, bool>>> = OnceLock::new();
|
||||
pub fn build(name: String) {
|
||||
use tokio::{sync::Mutex, process::Command};
|
||||
|
||||
static BUILT: OnceLock<Mutex<HashMap<String, Arc<Mutex<bool>>>>> = OnceLock::new();
|
||||
pub async fn build(name: String) {
|
||||
let built = BUILT.get_or_init(|| Mutex::new(HashMap::new()));
|
||||
// Only one call to build will acquire this lock
|
||||
let mut built_lock = built.lock().unwrap();
|
||||
if built_lock.contains_key(&name) {
|
||||
// If it was built, return
|
||||
let mut built_lock = built.lock().await;
|
||||
if !built_lock.contains_key(&name) {
|
||||
built_lock.insert(name.clone(), Arc::new(Mutex::new(false)));
|
||||
}
|
||||
let this_lock = built_lock[&name].clone();
|
||||
drop(built_lock);
|
||||
|
||||
let mut built_lock = this_lock.lock().await;
|
||||
// Already built
|
||||
if *built_lock {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -38,6 +46,7 @@ pub fn build(name: String) {
|
|||
.arg("{{ .Metadata.LastTagTime }}")
|
||||
.arg(format!("serai-dev-{name}"))
|
||||
.output()
|
||||
.await
|
||||
{
|
||||
let last_tag_time_buf = String::from_utf8(res.stdout).expect("docker had non-utf8 output");
|
||||
let last_tag_time = last_tag_time_buf.trim();
|
||||
|
@ -133,7 +142,7 @@ pub fn build(name: String) {
|
|||
if let Some(last_modified) = last_modified {
|
||||
if last_modified < created_time {
|
||||
println!("{} was built after the most recent source code edits, assuming built.", name);
|
||||
built_lock.insert(name, true);
|
||||
*built_lock = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -151,6 +160,7 @@ pub fn build(name: String) {
|
|||
.spawn()
|
||||
.unwrap()
|
||||
.wait()
|
||||
.await
|
||||
.unwrap()
|
||||
.success()
|
||||
{
|
||||
|
@ -194,6 +204,7 @@ pub fn build(name: String) {
|
|||
.arg("--all")
|
||||
.arg("--force")
|
||||
.output()
|
||||
.await
|
||||
.unwrap()
|
||||
.status
|
||||
.success()
|
||||
|
@ -203,5 +214,15 @@ pub fn build(name: String) {
|
|||
}
|
||||
|
||||
// Set built
|
||||
built_lock.insert(name, true);
|
||||
*built_lock = true;
|
||||
}
|
||||
|
||||
pub async fn build_batch(names: Vec<String>) {
|
||||
let mut handles = vec![];
|
||||
for name in names.into_iter().collect::<HashSet<_>>() {
|
||||
handles.push(tokio::spawn(build(name)));
|
||||
}
|
||||
for handle in handles {
|
||||
handle.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ serai-client = { path = "../../substrate/client", features = ["serai"] }
|
|||
tokio = { version = "1", features = ["time"] }
|
||||
|
||||
dockertest = "0.4"
|
||||
serai-docker-tests = { path = "../docker" }
|
||||
serai-message-queue-tests = { path = "../message-queue" }
|
||||
serai-processor-tests = { path = "../processor" }
|
||||
serai-coordinator-tests = { path = "../coordinator" }
|
||||
|
|
|
@ -31,19 +31,30 @@ pub struct Handles {
|
|||
serai: String,
|
||||
}
|
||||
|
||||
pub fn full_stack(name: &str) -> (Handles, Vec<TestBodySpecification>) {
|
||||
let (coord_key, message_queue_keys, message_queue_composition) = message_queue_instance();
|
||||
pub async fn full_stack(name: &str) -> (Handles, Vec<TestBodySpecification>) {
|
||||
let mut docker_names = serai_processor_tests::docker_names(NetworkId::Bitcoin);
|
||||
docker_names.append(&mut serai_processor_tests::docker_names(NetworkId::Monero));
|
||||
docker_names.extend([
|
||||
serai_message_queue_tests::docker_name(),
|
||||
serai_coordinator_tests::serai_docker_name(),
|
||||
serai_coordinator_tests::coordinator_docker_name(),
|
||||
]);
|
||||
serai_docker_tests::build_batch(docker_names).await;
|
||||
|
||||
let (bitcoin_composition, bitcoin_port) = network_instance(NetworkId::Bitcoin);
|
||||
let (coord_key, message_queue_keys, message_queue_composition) = message_queue_instance().await;
|
||||
|
||||
let (bitcoin_composition, bitcoin_port) = network_instance(NetworkId::Bitcoin).await;
|
||||
let bitcoin_processor_composition =
|
||||
processor_instance(NetworkId::Bitcoin, bitcoin_port, message_queue_keys[&NetworkId::Bitcoin]);
|
||||
processor_instance(NetworkId::Bitcoin, bitcoin_port, message_queue_keys[&NetworkId::Bitcoin])
|
||||
.await;
|
||||
|
||||
let (monero_composition, monero_port) = network_instance(NetworkId::Monero);
|
||||
let (monero_composition, monero_port) = network_instance(NetworkId::Monero).await;
|
||||
let monero_processor_composition =
|
||||
processor_instance(NetworkId::Monero, monero_port, message_queue_keys[&NetworkId::Monero]);
|
||||
processor_instance(NetworkId::Monero, monero_port, message_queue_keys[&NetworkId::Monero])
|
||||
.await;
|
||||
|
||||
let coordinator_composition = coordinator_instance(name, coord_key);
|
||||
let serai_composition = serai_composition(name);
|
||||
let coordinator_composition = coordinator_instance(name, coord_key).await;
|
||||
let serai_composition = serai_composition(name).await;
|
||||
|
||||
// Give every item in this stack a unique ID
|
||||
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits
|
||||
|
|
|
@ -26,7 +26,7 @@ use crate::tests::*;
|
|||
#[tokio::test]
|
||||
async fn mint_and_burn_test() {
|
||||
let _one_at_a_time = ONE_AT_A_TIME.get_or_init(|| Mutex::new(())).lock();
|
||||
let (handles, test) = new_test();
|
||||
let (handles, test) = new_test().await;
|
||||
|
||||
test
|
||||
.run_async(|ops| async move {
|
||||
|
|
|
@ -11,7 +11,7 @@ pub(crate) const VALIDATORS: usize = 4;
|
|||
|
||||
pub(crate) static ONE_AT_A_TIME: OnceLock<Mutex<()>> = OnceLock::new();
|
||||
|
||||
pub(crate) fn new_test() -> (Vec<Handles>, DockerTest) {
|
||||
pub(crate) async fn new_test() -> (Vec<Handles>, DockerTest) {
|
||||
let mut validators = vec![];
|
||||
let mut test = DockerTest::new().with_network(dockertest::Network::Isolated);
|
||||
for i in 0 .. VALIDATORS {
|
||||
|
@ -23,7 +23,8 @@ pub(crate) fn new_test() -> (Vec<Handles>, DockerTest) {
|
|||
4 => "Eve",
|
||||
5 => "Ferdie",
|
||||
_ => panic!("needed a 7th name for a serai node"),
|
||||
});
|
||||
})
|
||||
.await;
|
||||
validators.push(handles);
|
||||
for composition in compositions {
|
||||
test.provide_container(composition);
|
||||
|
|
|
@ -13,10 +13,14 @@ use dockertest::{
|
|||
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, TestBodySpecification,
|
||||
};
|
||||
|
||||
pub fn docker_name() -> String {
|
||||
"serai-dev-message-queue".to_string()
|
||||
}
|
||||
|
||||
pub type MessageQueuePrivateKey = <Ristretto as Ciphersuite>::F;
|
||||
pub fn instance(
|
||||
pub async fn instance(
|
||||
) -> (MessageQueuePrivateKey, HashMap<NetworkId, MessageQueuePrivateKey>, TestBodySpecification) {
|
||||
serai_docker_tests::build("message-queue".to_string());
|
||||
serai_docker_tests::build("message-queue".to_string()).await;
|
||||
|
||||
let coord_key = <Ristretto as Ciphersuite>::F::random(&mut OsRng);
|
||||
let priv_keys = HashMap::from([
|
||||
|
@ -26,7 +30,7 @@ pub fn instance(
|
|||
]);
|
||||
|
||||
let composition = TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-message-queue").pull_policy(PullPolicy::Never),
|
||||
Image::with_repository(docker_name()).pull_policy(PullPolicy::Never),
|
||||
)
|
||||
.set_log_options(Some(LogOptions {
|
||||
action: LogAction::Forward,
|
||||
|
@ -58,8 +62,8 @@ pub fn instance(
|
|||
(coord_key, priv_keys, composition)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic_functionality() {
|
||||
#[tokio::test]
|
||||
async fn basic_functionality() {
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
use dockertest::DockerTest;
|
||||
|
@ -67,99 +71,107 @@ fn basic_functionality() {
|
|||
use serai_message_queue::{Service, Metadata, client::MessageQueue};
|
||||
|
||||
let mut test = DockerTest::new().with_network(dockertest::Network::Isolated);
|
||||
let (coord_key, priv_keys, composition) = instance();
|
||||
let (coord_key, priv_keys, composition) = instance().await;
|
||||
test.provide_container(composition);
|
||||
test.run(|ops| async move {
|
||||
tokio::time::timeout(core::time::Duration::from_secs(60), async move {
|
||||
// Sleep for a second for the message-queue to boot
|
||||
// It isn't an error to start immediately, it just silences an error
|
||||
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
|
||||
test
|
||||
.run_async(|ops| async move {
|
||||
tokio::time::timeout(core::time::Duration::from_secs(60), async move {
|
||||
// Sleep for a second for the message-queue to boot
|
||||
// It isn't an error to start immediately, it just silences an error
|
||||
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
|
||||
|
||||
let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap();
|
||||
let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
|
||||
let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap();
|
||||
let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string();
|
||||
|
||||
// Queue some messages
|
||||
let coordinator =
|
||||
MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key));
|
||||
coordinator
|
||||
.queue(
|
||||
Metadata {
|
||||
from: Service::Coordinator,
|
||||
to: Service::Processor(NetworkId::Bitcoin),
|
||||
intent: b"intent".to_vec(),
|
||||
},
|
||||
b"Hello, World!".to_vec(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Queue this twice, which message-queue should de-duplicate
|
||||
for _ in 0 .. 2 {
|
||||
// Queue some messages
|
||||
let coordinator =
|
||||
MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key));
|
||||
coordinator
|
||||
.queue(
|
||||
Metadata {
|
||||
from: Service::Coordinator,
|
||||
to: Service::Processor(NetworkId::Bitcoin),
|
||||
intent: b"intent 2".to_vec(),
|
||||
intent: b"intent".to_vec(),
|
||||
},
|
||||
b"Hello, World, again!".to_vec(),
|
||||
b"Hello, World!".to_vec(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Successfully get it
|
||||
let bitcoin = MessageQueue::new(
|
||||
Service::Processor(NetworkId::Bitcoin),
|
||||
rpc.clone(),
|
||||
Zeroizing::new(priv_keys[&NetworkId::Bitcoin]),
|
||||
);
|
||||
let msg = bitcoin.next(Service::Coordinator).await;
|
||||
assert_eq!(msg.from, Service::Coordinator);
|
||||
assert_eq!(msg.id, 0);
|
||||
assert_eq!(&msg.msg, b"Hello, World!");
|
||||
// Queue this twice, which message-queue should de-duplicate
|
||||
for _ in 0 .. 2 {
|
||||
coordinator
|
||||
.queue(
|
||||
Metadata {
|
||||
from: Service::Coordinator,
|
||||
to: Service::Processor(NetworkId::Bitcoin),
|
||||
intent: b"intent 2".to_vec(),
|
||||
},
|
||||
b"Hello, World, again!".to_vec(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// If we don't ack it, it should continue to be returned
|
||||
assert_eq!(msg, bitcoin.next(Service::Coordinator).await);
|
||||
// Successfully get it
|
||||
let bitcoin = MessageQueue::new(
|
||||
Service::Processor(NetworkId::Bitcoin),
|
||||
rpc.clone(),
|
||||
Zeroizing::new(priv_keys[&NetworkId::Bitcoin]),
|
||||
);
|
||||
let msg = bitcoin.next(Service::Coordinator).await;
|
||||
assert_eq!(msg.from, Service::Coordinator);
|
||||
assert_eq!(msg.id, 0);
|
||||
assert_eq!(&msg.msg, b"Hello, World!");
|
||||
|
||||
// Acknowledging it should yield the next message
|
||||
bitcoin.ack(Service::Coordinator, 0).await;
|
||||
// If we don't ack it, it should continue to be returned
|
||||
assert_eq!(msg, bitcoin.next(Service::Coordinator).await);
|
||||
|
||||
let next_msg = bitcoin.next(Service::Coordinator).await;
|
||||
assert!(msg != next_msg);
|
||||
assert_eq!(next_msg.from, Service::Coordinator);
|
||||
assert_eq!(next_msg.id, 1);
|
||||
assert_eq!(&next_msg.msg, b"Hello, World, again!");
|
||||
bitcoin.ack(Service::Coordinator, 1).await;
|
||||
// Acknowledging it should yield the next message
|
||||
bitcoin.ack(Service::Coordinator, 0).await;
|
||||
|
||||
// No further messages should be available
|
||||
tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(Service::Coordinator))
|
||||
.await
|
||||
.unwrap_err();
|
||||
let next_msg = bitcoin.next(Service::Coordinator).await;
|
||||
assert!(msg != next_msg);
|
||||
assert_eq!(next_msg.from, Service::Coordinator);
|
||||
assert_eq!(next_msg.id, 1);
|
||||
assert_eq!(&next_msg.msg, b"Hello, World, again!");
|
||||
bitcoin.ack(Service::Coordinator, 1).await;
|
||||
|
||||
// Queueing to a distinct processor should work, with a unique ID
|
||||
coordinator
|
||||
.queue(
|
||||
Metadata {
|
||||
from: Service::Coordinator,
|
||||
to: Service::Processor(NetworkId::Monero),
|
||||
// Intents should be per-from-to, making this valid
|
||||
intent: b"intent".to_vec(),
|
||||
},
|
||||
b"Hello, World!".to_vec(),
|
||||
// No further messages should be available
|
||||
tokio::time::timeout(
|
||||
core::time::Duration::from_secs(10),
|
||||
bitcoin.next(Service::Coordinator),
|
||||
)
|
||||
.await;
|
||||
|
||||
let monero = MessageQueue::new(
|
||||
Service::Processor(NetworkId::Monero),
|
||||
rpc,
|
||||
Zeroizing::new(priv_keys[&NetworkId::Monero]),
|
||||
);
|
||||
assert_eq!(monero.next(Service::Coordinator).await.id, 0);
|
||||
monero.ack(Service::Coordinator, 0).await;
|
||||
tokio::time::timeout(core::time::Duration::from_secs(10), monero.next(Service::Coordinator))
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
// Queueing to a distinct processor should work, with a unique ID
|
||||
coordinator
|
||||
.queue(
|
||||
Metadata {
|
||||
from: Service::Coordinator,
|
||||
to: Service::Processor(NetworkId::Monero),
|
||||
// Intents should be per-from-to, making this valid
|
||||
intent: b"intent".to_vec(),
|
||||
},
|
||||
b"Hello, World!".to_vec(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let monero = MessageQueue::new(
|
||||
Service::Processor(NetworkId::Monero),
|
||||
rpc,
|
||||
Zeroizing::new(priv_keys[&NetworkId::Monero]),
|
||||
);
|
||||
assert_eq!(monero.next(Service::Coordinator).await.id, 0);
|
||||
monero.ack(Service::Coordinator, 0).await;
|
||||
tokio::time::timeout(
|
||||
core::time::Duration::from_secs(10),
|
||||
monero.next(Service::Coordinator),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
.await;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,20 @@ mod tests;
|
|||
|
||||
static UNIQUE_ID: OnceLock<Mutex<u16>> = OnceLock::new();
|
||||
|
||||
pub fn processor_instance(
|
||||
fn network_str(network: NetworkId) -> &'static str {
|
||||
match network {
|
||||
NetworkId::Serai => panic!("starting a processor for Serai"),
|
||||
NetworkId::Bitcoin => "bitcoin",
|
||||
NetworkId::Ethereum => "ethereum",
|
||||
NetworkId::Monero => "monero",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn processor_docker_name(network: NetworkId) -> String {
|
||||
format!("{}-processor", network_str(network))
|
||||
}
|
||||
|
||||
pub async fn processor_instance(
|
||||
network: NetworkId,
|
||||
port: u32,
|
||||
message_queue_key: <Ristretto as Ciphersuite>::F,
|
||||
|
@ -32,17 +45,12 @@ pub fn processor_instance(
|
|||
let mut entropy = [0; 32];
|
||||
OsRng.fill_bytes(&mut entropy);
|
||||
|
||||
let network_str = match network {
|
||||
NetworkId::Serai => panic!("starting a processor for Serai"),
|
||||
NetworkId::Bitcoin => "bitcoin",
|
||||
NetworkId::Ethereum => "ethereum",
|
||||
NetworkId::Monero => "monero",
|
||||
};
|
||||
let image = format!("{network_str}-processor");
|
||||
serai_docker_tests::build(image.clone());
|
||||
let network_str = network_str(network);
|
||||
serai_docker_tests::build(processor_docker_name(network)).await;
|
||||
|
||||
TestBodySpecification::with_image(
|
||||
Image::with_repository(format!("serai-dev-{image}")).pull_policy(PullPolicy::Never),
|
||||
Image::with_repository(format!("serai-dev-{}", processor_docker_name(network)))
|
||||
.pull_policy(PullPolicy::Never),
|
||||
)
|
||||
.replace_env(
|
||||
[
|
||||
|
@ -58,17 +66,23 @@ pub fn processor_instance(
|
|||
)
|
||||
}
|
||||
|
||||
pub fn docker_names(network: NetworkId) -> Vec<String> {
|
||||
vec![network_docker_name(network), processor_docker_name(network)]
|
||||
}
|
||||
|
||||
pub type Handles = (String, String, String);
|
||||
pub fn processor_stack(
|
||||
pub async fn processor_stack(
|
||||
network: NetworkId,
|
||||
) -> (Handles, <Ristretto as Ciphersuite>::F, Vec<TestBodySpecification>) {
|
||||
let (network_composition, network_rpc_port) = network_instance(network);
|
||||
serai_docker_tests::build_batch(docker_names(network)).await;
|
||||
|
||||
let (network_composition, network_rpc_port) = network_instance(network).await;
|
||||
|
||||
let (coord_key, message_queue_keys, message_queue_composition) =
|
||||
serai_message_queue_tests::instance();
|
||||
serai_message_queue_tests::instance().await;
|
||||
|
||||
let processor_composition =
|
||||
processor_instance(network, network_rpc_port, message_queue_keys[&network]);
|
||||
processor_instance(network, network_rpc_port, message_queue_keys[&network]).await;
|
||||
|
||||
// Give every item in this stack a unique ID
|
||||
// Uses a Mutex as we can't generate a 8-byte random ID without hitting hostname length limits
|
||||
|
|
|
@ -21,8 +21,19 @@ pub const RPC_PASS: &str = "seraidex";
|
|||
pub const BTC_PORT: u32 = 8332;
|
||||
pub const XMR_PORT: u32 = 18081;
|
||||
|
||||
pub fn bitcoin_instance() -> (TestBodySpecification, u32) {
|
||||
serai_docker_tests::build("bitcoin".to_string());
|
||||
pub fn network_docker_name(network: NetworkId) -> String {
|
||||
match network {
|
||||
NetworkId::Serai => {
|
||||
panic!("asking for docker name for external network Serai, which isn't external")
|
||||
}
|
||||
NetworkId::Bitcoin => "bitcoin".to_string(),
|
||||
NetworkId::Ethereum => todo!(),
|
||||
NetworkId::Monero => "monero".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn bitcoin_instance() -> (TestBodySpecification, u32) {
|
||||
serai_docker_tests::build(network_docker_name(NetworkId::Bitcoin)).await;
|
||||
|
||||
let composition = TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-bitcoin").pull_policy(PullPolicy::Never),
|
||||
|
@ -41,8 +52,8 @@ pub fn bitcoin_instance() -> (TestBodySpecification, u32) {
|
|||
(composition, BTC_PORT)
|
||||
}
|
||||
|
||||
pub fn monero_instance() -> (TestBodySpecification, u32) {
|
||||
serai_docker_tests::build("monero".to_string());
|
||||
pub async fn monero_instance() -> (TestBodySpecification, u32) {
|
||||
serai_docker_tests::build(network_docker_name(NetworkId::Monero)).await;
|
||||
|
||||
let composition = TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-monero").pull_policy(PullPolicy::Never),
|
||||
|
@ -63,11 +74,11 @@ pub fn monero_instance() -> (TestBodySpecification, u32) {
|
|||
(composition, XMR_PORT)
|
||||
}
|
||||
|
||||
pub fn network_instance(network: NetworkId) -> (TestBodySpecification, u32) {
|
||||
pub async fn network_instance(network: NetworkId) -> (TestBodySpecification, u32) {
|
||||
match network {
|
||||
NetworkId::Bitcoin => bitcoin_instance(),
|
||||
NetworkId::Bitcoin => bitcoin_instance().await,
|
||||
NetworkId::Ethereum => todo!(),
|
||||
NetworkId::Monero => monero_instance(),
|
||||
NetworkId::Monero => monero_instance().await,
|
||||
NetworkId::Serai => {
|
||||
panic!("Serai is not a valid network to spawn an instance of for a processor")
|
||||
}
|
||||
|
|
|
@ -191,164 +191,167 @@ pub(crate) async fn substrate_block(
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn batch_test() {
|
||||
#[tokio::test]
|
||||
async fn batch_test() {
|
||||
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
|
||||
let (coordinators, test) = new_test(network);
|
||||
let (coordinators, test) = new_test(network).await;
|
||||
|
||||
test.run(|ops| async move {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
let mut coordinators = coordinators
|
||||
.into_iter()
|
||||
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Create a wallet before we start generating keys
|
||||
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Generate keys
|
||||
let key_pair = key_gen(&mut coordinators).await;
|
||||
|
||||
// Now we we have to mine blocks to activate the key
|
||||
// (the first key is activated when the network's time as of a block exceeds the Serai time
|
||||
// it was confirmed at)
|
||||
// Mine multiple sets of medians to ensure the median is sufficiently advanced
|
||||
for _ in 0 .. (10 * confirmations(network)) {
|
||||
coordinators[0].add_block(&ops).await;
|
||||
test
|
||||
.run_async(|ops| async move {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Run twice, once with an instruction and once without
|
||||
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
|
||||
for i in 0 .. 2 {
|
||||
let mut serai_address = [0; 32];
|
||||
OsRng.fill_bytes(&mut serai_address);
|
||||
let instruction =
|
||||
if i == 0 { Some(InInstruction::Transfer(SeraiAddress(serai_address))) } else { None };
|
||||
let mut coordinators = coordinators
|
||||
.into_iter()
|
||||
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Send into the processor's wallet
|
||||
let (tx, balance_sent) =
|
||||
wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await;
|
||||
for coordinator in &mut coordinators {
|
||||
coordinator.publish_transacton(&ops, &tx).await;
|
||||
}
|
||||
// Create a wallet before we start generating keys
|
||||
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Put the TX past the confirmation depth
|
||||
let mut block_with_tx = None;
|
||||
for _ in 0 .. confirmations(network) {
|
||||
let (hash, _) = coordinators[0].add_block(&ops).await;
|
||||
if block_with_tx.is_none() {
|
||||
block_with_tx = Some(hash);
|
||||
}
|
||||
// Generate keys
|
||||
let key_pair = key_gen(&mut coordinators).await;
|
||||
|
||||
// Now we we have to mine blocks to activate the key
|
||||
// (the first key is activated when the network's time as of a block exceeds the Serai time
|
||||
// it was confirmed at)
|
||||
// Mine multiple sets of medians to ensure the median is sufficiently advanced
|
||||
for _ in 0 .. (10 * confirmations(network)) {
|
||||
coordinators[0].add_block(&ops).await;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Sleep for 10s
|
||||
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
// Run twice, once with an instruction and once without
|
||||
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
|
||||
for i in 0 .. 2 {
|
||||
let mut serai_address = [0; 32];
|
||||
OsRng.fill_bytes(&mut serai_address);
|
||||
let instruction =
|
||||
if i == 0 { Some(InInstruction::Transfer(SeraiAddress(serai_address))) } else { None };
|
||||
|
||||
let expected_batch = Batch {
|
||||
network,
|
||||
id: i,
|
||||
block: BlockHash(block_with_tx.unwrap()),
|
||||
instructions: if let Some(instruction) = &instruction {
|
||||
vec![InInstructionWithBalance {
|
||||
instruction: instruction.clone(),
|
||||
balance: Balance {
|
||||
coin: balance_sent.coin,
|
||||
amount: Amount(
|
||||
balance_sent.amount.0 -
|
||||
(2 * if network == NetworkId::Bitcoin {
|
||||
Bitcoin::COST_TO_AGGREGATE
|
||||
} else {
|
||||
Monero::COST_TO_AGGREGATE
|
||||
}),
|
||||
),
|
||||
},
|
||||
}]
|
||||
} else {
|
||||
// This shouldn't have an instruction as we didn't add any data into the TX we sent
|
||||
// Empty batches remain valuable as they let us achieve consensus on the block and spend
|
||||
// contained outputs
|
||||
vec![]
|
||||
},
|
||||
};
|
||||
|
||||
// Make sure the processors picked it up by checking they're trying to sign a batch for it
|
||||
let (mut id, mut preprocesses) =
|
||||
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await;
|
||||
// Trigger a random amount of re-attempts
|
||||
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
|
||||
// TODO: Double check how the processor handles this ID field
|
||||
// It should be able to assert its perfectly sequential
|
||||
id.attempt = attempt;
|
||||
for coordinator in coordinators.iter_mut() {
|
||||
coordinator
|
||||
.send_message(messages::coordinator::CoordinatorMessage::BatchReattempt {
|
||||
id: id.clone(),
|
||||
})
|
||||
.await;
|
||||
// Send into the processor's wallet
|
||||
let (tx, balance_sent) =
|
||||
wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await;
|
||||
for coordinator in &mut coordinators {
|
||||
coordinator.publish_transacton(&ops, &tx).await;
|
||||
}
|
||||
(id, preprocesses) =
|
||||
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, attempt).await;
|
||||
}
|
||||
|
||||
// Continue with signing the batch
|
||||
let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await;
|
||||
|
||||
// Check it
|
||||
assert_eq!(batch.batch, expected_batch);
|
||||
|
||||
// Fire a SubstrateBlock
|
||||
let serai_time =
|
||||
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
for coordinator in &mut coordinators {
|
||||
let plans = substrate_block(
|
||||
coordinator,
|
||||
messages::substrate::CoordinatorMessage::SubstrateBlock {
|
||||
context: SubstrateContext {
|
||||
serai_time,
|
||||
network_latest_finalized_block: batch.batch.block,
|
||||
},
|
||||
block: substrate_block_num + u64::from(i),
|
||||
burns: vec![],
|
||||
batches: vec![batch.batch.id],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
if instruction.is_some() || (instruction.is_none() && (network == NetworkId::Monero)) {
|
||||
assert!(plans.is_empty());
|
||||
} else {
|
||||
// If no instruction was used, and the processor csn presume the origin, it'd have
|
||||
// created a refund Plan
|
||||
assert_eq!(plans.len(), 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// With the latter InInstruction not existing, we should've triggered a refund if the origin
|
||||
// was detectable
|
||||
// Check this is trying to sign a Plan
|
||||
if network != NetworkId::Monero {
|
||||
let mut refund_id = None;
|
||||
for coordinator in &mut coordinators {
|
||||
match coordinator.recv_message().await {
|
||||
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Preprocess {
|
||||
id,
|
||||
..
|
||||
}) => {
|
||||
if refund_id.is_none() {
|
||||
refund_id = Some(id.clone());
|
||||
}
|
||||
assert_eq!(refund_id.as_ref().unwrap(), &id);
|
||||
// Put the TX past the confirmation depth
|
||||
let mut block_with_tx = None;
|
||||
for _ in 0 .. confirmations(network) {
|
||||
let (hash, _) = coordinators[0].add_block(&ops).await;
|
||||
if block_with_tx.is_none() {
|
||||
block_with_tx = Some(hash);
|
||||
}
|
||||
}
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Sleep for 10s
|
||||
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
|
||||
let expected_batch = Batch {
|
||||
network,
|
||||
id: i,
|
||||
block: BlockHash(block_with_tx.unwrap()),
|
||||
instructions: if let Some(instruction) = &instruction {
|
||||
vec![InInstructionWithBalance {
|
||||
instruction: instruction.clone(),
|
||||
balance: Balance {
|
||||
coin: balance_sent.coin,
|
||||
amount: Amount(
|
||||
balance_sent.amount.0 -
|
||||
(2 * if network == NetworkId::Bitcoin {
|
||||
Bitcoin::COST_TO_AGGREGATE
|
||||
} else {
|
||||
Monero::COST_TO_AGGREGATE
|
||||
}),
|
||||
),
|
||||
},
|
||||
}]
|
||||
} else {
|
||||
// This shouldn't have an instruction as we didn't add any data into the TX we sent
|
||||
// Empty batches remain valuable as they let us achieve consensus on the block and
|
||||
// spend contained outputs
|
||||
vec![]
|
||||
},
|
||||
};
|
||||
|
||||
// Make sure the processors picked it up by checking they're trying to sign a batch for it
|
||||
let (mut id, mut preprocesses) =
|
||||
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await;
|
||||
// Trigger a random amount of re-attempts
|
||||
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
|
||||
// TODO: Double check how the processor handles this ID field
|
||||
// It should be able to assert its perfectly sequential
|
||||
id.attempt = attempt;
|
||||
for coordinator in coordinators.iter_mut() {
|
||||
coordinator
|
||||
.send_message(messages::coordinator::CoordinatorMessage::BatchReattempt {
|
||||
id: id.clone(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
(id, preprocesses) =
|
||||
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, attempt)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Continue with signing the batch
|
||||
let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await;
|
||||
|
||||
// Check it
|
||||
assert_eq!(batch.batch, expected_batch);
|
||||
|
||||
// Fire a SubstrateBlock
|
||||
let serai_time =
|
||||
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
for coordinator in &mut coordinators {
|
||||
let plans = substrate_block(
|
||||
coordinator,
|
||||
messages::substrate::CoordinatorMessage::SubstrateBlock {
|
||||
context: SubstrateContext {
|
||||
serai_time,
|
||||
network_latest_finalized_block: batch.batch.block,
|
||||
},
|
||||
block: substrate_block_num + u64::from(i),
|
||||
burns: vec![],
|
||||
batches: vec![batch.batch.id],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
if instruction.is_some() || (instruction.is_none() && (network == NetworkId::Monero)) {
|
||||
assert!(plans.is_empty());
|
||||
} else {
|
||||
// If no instruction was used, and the processor csn presume the origin, it'd have
|
||||
// created a refund Plan
|
||||
assert_eq!(plans.len(), 1);
|
||||
}
|
||||
_ => panic!("processor didn't send preprocess for expected refund transaction"),
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// With the latter InInstruction not existing, we should've triggered a refund if the origin
|
||||
// was detectable
|
||||
// Check this is trying to sign a Plan
|
||||
if network != NetworkId::Monero {
|
||||
let mut refund_id = None;
|
||||
for coordinator in &mut coordinators {
|
||||
match coordinator.recv_message().await {
|
||||
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Preprocess {
|
||||
id,
|
||||
..
|
||||
}) => {
|
||||
if refund_id.is_none() {
|
||||
refund_id = Some(id.clone());
|
||||
}
|
||||
assert_eq!(refund_id.as_ref().unwrap(), &id);
|
||||
}
|
||||
_ => panic!("processor didn't send preprocess for expected refund transaction"),
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -142,23 +142,25 @@ pub(crate) async fn key_gen(coordinators: &mut [Coordinator]) -> KeyPair {
|
|||
key_pair
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn key_gen_test() {
|
||||
#[tokio::test]
|
||||
async fn key_gen_test() {
|
||||
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
|
||||
let (coordinators, test) = new_test(network);
|
||||
let (coordinators, test) = new_test(network).await;
|
||||
|
||||
test.run(|ops| async move {
|
||||
// Sleep for a second for the message-queue to boot
|
||||
// It isn't an error to start immediately, it just silences an error
|
||||
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
|
||||
test
|
||||
.run_async(|ops| async move {
|
||||
// Sleep for a second for the message-queue to boot
|
||||
// It isn't an error to start immediately, it just silences an error
|
||||
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
|
||||
|
||||
// Connect to the Message Queues as the coordinator
|
||||
let mut coordinators = coordinators
|
||||
.into_iter()
|
||||
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
|
||||
.collect::<Vec<_>>();
|
||||
// Connect to the Message Queues as the coordinator
|
||||
let mut coordinators = coordinators
|
||||
.into_iter()
|
||||
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
key_gen(&mut coordinators).await;
|
||||
});
|
||||
key_gen(&mut coordinators).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,11 +17,13 @@ mod send;
|
|||
pub(crate) const COORDINATORS: usize = 4;
|
||||
pub(crate) const THRESHOLD: usize = ((COORDINATORS * 2) / 3) + 1;
|
||||
|
||||
fn new_test(network: NetworkId) -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
|
||||
pub(crate) async fn new_test(
|
||||
network: NetworkId,
|
||||
) -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, DockerTest) {
|
||||
let mut coordinators = vec![];
|
||||
let mut test = DockerTest::new().with_network(dockertest::Network::Isolated);
|
||||
for _ in 0 .. COORDINATORS {
|
||||
let (handles, coord_key, compositions) = processor_stack(network);
|
||||
let (handles, coord_key, compositions) = processor_stack(network).await;
|
||||
coordinators.push((handles, coord_key));
|
||||
for composition in compositions {
|
||||
test.provide_container(composition);
|
||||
|
|
|
@ -142,163 +142,166 @@ pub(crate) async fn sign_tx(
|
|||
tx.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_test() {
|
||||
#[tokio::test]
|
||||
async fn send_test() {
|
||||
for network in [NetworkId::Bitcoin, NetworkId::Monero] {
|
||||
let (coordinators, test) = new_test(network);
|
||||
let (coordinators, test) = new_test(network).await;
|
||||
|
||||
test.run(|ops| async move {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
let mut coordinators = coordinators
|
||||
.into_iter()
|
||||
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Create a wallet before we start generating keys
|
||||
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Generate keys
|
||||
let key_pair = key_gen(&mut coordinators).await;
|
||||
|
||||
// Now we we have to mine blocks to activate the key
|
||||
// (the first key is activated when the network's time as of a block exceeds the Serai time
|
||||
// it was confirmed at)
|
||||
// Mine multiple sets of medians to ensure the median is sufficiently advanced
|
||||
for _ in 0 .. (10 * confirmations(network)) {
|
||||
coordinators[0].add_block(&ops).await;
|
||||
test
|
||||
.run_async(|ops| async move {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Send into the processor's wallet
|
||||
let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, None).await;
|
||||
for coordinator in &mut coordinators {
|
||||
coordinator.publish_transacton(&ops, &tx).await;
|
||||
}
|
||||
let mut coordinators = coordinators
|
||||
.into_iter()
|
||||
.map(|(handles, key)| Coordinator::new(network, &ops, handles, key))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Put the TX past the confirmation depth
|
||||
let mut block_with_tx = None;
|
||||
for _ in 0 .. confirmations(network) {
|
||||
let (hash, _) = coordinators[0].add_block(&ops).await;
|
||||
if block_with_tx.is_none() {
|
||||
block_with_tx = Some(hash);
|
||||
// Create a wallet before we start generating keys
|
||||
let mut wallet = Wallet::new(network, &ops, coordinators[0].network_handle.clone()).await;
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Generate keys
|
||||
let key_pair = key_gen(&mut coordinators).await;
|
||||
|
||||
// Now we we have to mine blocks to activate the key
|
||||
// (the first key is activated when the network's time as of a block exceeds the Serai time
|
||||
// it was confirmed at)
|
||||
// Mine multiple sets of medians to ensure the median is sufficiently advanced
|
||||
for _ in 0 .. (10 * confirmations(network)) {
|
||||
coordinators[0].add_block(&ops).await;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// Sleep for 10s
|
||||
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
|
||||
let expected_batch =
|
||||
Batch { network, id: 0, block: BlockHash(block_with_tx.unwrap()), instructions: vec![] };
|
||||
|
||||
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
|
||||
let (id, preprocesses) =
|
||||
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await;
|
||||
|
||||
// Continue with signing the batch
|
||||
let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await;
|
||||
|
||||
// Check it
|
||||
assert_eq!(batch.batch, expected_batch);
|
||||
|
||||
// Fire a SubstrateBlock with a burn
|
||||
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
|
||||
let serai_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
|
||||
let mut plans = vec![];
|
||||
for coordinator in &mut coordinators {
|
||||
let these_plans = substrate_block(
|
||||
coordinator,
|
||||
messages::substrate::CoordinatorMessage::SubstrateBlock {
|
||||
context: SubstrateContext {
|
||||
serai_time,
|
||||
network_latest_finalized_block: batch.batch.block,
|
||||
},
|
||||
block: substrate_block_num,
|
||||
burns: vec![OutInstructionWithBalance {
|
||||
instruction: OutInstruction { address: wallet.address(), data: None },
|
||||
balance: balance_sent,
|
||||
}],
|
||||
batches: vec![batch.batch.id],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
if plans.is_empty() {
|
||||
plans = these_plans;
|
||||
} else {
|
||||
assert_eq!(plans, these_plans);
|
||||
}
|
||||
}
|
||||
assert_eq!(plans.len(), 1);
|
||||
|
||||
// Start signing the TX
|
||||
let (mut id, mut preprocesses) =
|
||||
recv_sign_preprocesses(&mut coordinators, Session(0), 0).await;
|
||||
assert_eq!(id, SignId { session: Session(0), id: plans[0].id, attempt: 0 });
|
||||
|
||||
// Trigger a random amount of re-attempts
|
||||
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
|
||||
// TODO: Double check how the processor handles this ID field
|
||||
// It should be able to assert its perfectly sequential
|
||||
id.attempt = attempt;
|
||||
for coordinator in coordinators.iter_mut() {
|
||||
coordinator
|
||||
.send_message(messages::sign::CoordinatorMessage::Reattempt { id: id.clone() })
|
||||
.await;
|
||||
}
|
||||
(id, preprocesses) = recv_sign_preprocesses(&mut coordinators, Session(0), attempt).await;
|
||||
}
|
||||
let participating = preprocesses.keys().cloned().collect::<Vec<_>>();
|
||||
|
||||
let tx_id = sign_tx(&mut coordinators, Session(0), id.clone(), preprocesses).await;
|
||||
|
||||
// Make sure all participating nodes published the TX
|
||||
let participating =
|
||||
participating.iter().map(|p| usize::from(u16::from(*p) - 1)).collect::<HashSet<_>>();
|
||||
for participant in &participating {
|
||||
assert!(coordinators[*participant].get_transaction(&ops, &tx_id).await.is_some());
|
||||
}
|
||||
|
||||
// Publish this transaction to the left out nodes
|
||||
let tx = coordinators[*participating.iter().next().unwrap()]
|
||||
.get_transaction(&ops, &tx_id)
|
||||
.await
|
||||
.unwrap();
|
||||
for (i, coordinator) in coordinators.iter_mut().enumerate() {
|
||||
if !participating.contains(&i) {
|
||||
// Send into the processor's wallet
|
||||
let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, None).await;
|
||||
for coordinator in &mut coordinators {
|
||||
coordinator.publish_transacton(&ops, &tx).await;
|
||||
// Tell them of it as a completion of the relevant signing nodess
|
||||
coordinator
|
||||
.send_message(messages::sign::CoordinatorMessage::Completed {
|
||||
session: Session(0),
|
||||
id: id.id,
|
||||
tx: tx_id.clone(),
|
||||
})
|
||||
.await;
|
||||
// Verify they send Completed back
|
||||
match coordinator.recv_message().await {
|
||||
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
|
||||
session,
|
||||
id: this_id,
|
||||
tx: this_tx,
|
||||
}) => {
|
||||
assert_eq!(session, Session(0));
|
||||
assert_eq!(&this_id, &id.id);
|
||||
assert_eq!(this_tx, tx_id);
|
||||
}
|
||||
_ => panic!("processor didn't send Completed"),
|
||||
}
|
||||
|
||||
// Put the TX past the confirmation depth
|
||||
let mut block_with_tx = None;
|
||||
for _ in 0 .. confirmations(network) {
|
||||
let (hash, _) = coordinators[0].add_block(&ops).await;
|
||||
if block_with_tx.is_none() {
|
||||
block_with_tx = Some(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
coordinators[0].sync(&ops, &coordinators[1 ..]).await;
|
||||
|
||||
// TODO: Test the Eventuality from the blockchain, instead of from the coordinator
|
||||
// TODO: Test what happenns when Completed is sent with a non-existent TX ID
|
||||
// TODO: Test what happenns when Completed is sent with a non-completing TX ID
|
||||
});
|
||||
// Sleep for 10s
|
||||
// The scanner works on a 5s interval, so this leaves a few s for any processing/latency
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
|
||||
let expected_batch =
|
||||
Batch { network, id: 0, block: BlockHash(block_with_tx.unwrap()), instructions: vec![] };
|
||||
|
||||
// Make sure the proceessors picked it up by checking they're trying to sign a batch for it
|
||||
let (id, preprocesses) =
|
||||
recv_batch_preprocesses(&mut coordinators, Session(0), &expected_batch, 0).await;
|
||||
|
||||
// Continue with signing the batch
|
||||
let batch = sign_batch(&mut coordinators, key_pair.0 .0, id, preprocesses).await;
|
||||
|
||||
// Check it
|
||||
assert_eq!(batch.batch, expected_batch);
|
||||
|
||||
// Fire a SubstrateBlock with a burn
|
||||
let substrate_block_num = (OsRng.next_u64() % 4_000_000_000u64) + 1;
|
||||
let serai_time =
|
||||
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
|
||||
let mut plans = vec![];
|
||||
for coordinator in &mut coordinators {
|
||||
let these_plans = substrate_block(
|
||||
coordinator,
|
||||
messages::substrate::CoordinatorMessage::SubstrateBlock {
|
||||
context: SubstrateContext {
|
||||
serai_time,
|
||||
network_latest_finalized_block: batch.batch.block,
|
||||
},
|
||||
block: substrate_block_num,
|
||||
burns: vec![OutInstructionWithBalance {
|
||||
instruction: OutInstruction { address: wallet.address(), data: None },
|
||||
balance: balance_sent,
|
||||
}],
|
||||
batches: vec![batch.batch.id],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
if plans.is_empty() {
|
||||
plans = these_plans;
|
||||
} else {
|
||||
assert_eq!(plans, these_plans);
|
||||
}
|
||||
}
|
||||
assert_eq!(plans.len(), 1);
|
||||
|
||||
// Start signing the TX
|
||||
let (mut id, mut preprocesses) =
|
||||
recv_sign_preprocesses(&mut coordinators, Session(0), 0).await;
|
||||
assert_eq!(id, SignId { session: Session(0), id: plans[0].id, attempt: 0 });
|
||||
|
||||
// Trigger a random amount of re-attempts
|
||||
for attempt in 1 ..= u32::try_from(OsRng.next_u64() % 4).unwrap() {
|
||||
// TODO: Double check how the processor handles this ID field
|
||||
// It should be able to assert its perfectly sequential
|
||||
id.attempt = attempt;
|
||||
for coordinator in coordinators.iter_mut() {
|
||||
coordinator
|
||||
.send_message(messages::sign::CoordinatorMessage::Reattempt { id: id.clone() })
|
||||
.await;
|
||||
}
|
||||
(id, preprocesses) = recv_sign_preprocesses(&mut coordinators, Session(0), attempt).await;
|
||||
}
|
||||
let participating = preprocesses.keys().cloned().collect::<Vec<_>>();
|
||||
|
||||
let tx_id = sign_tx(&mut coordinators, Session(0), id.clone(), preprocesses).await;
|
||||
|
||||
// Make sure all participating nodes published the TX
|
||||
let participating =
|
||||
participating.iter().map(|p| usize::from(u16::from(*p) - 1)).collect::<HashSet<_>>();
|
||||
for participant in &participating {
|
||||
assert!(coordinators[*participant].get_transaction(&ops, &tx_id).await.is_some());
|
||||
}
|
||||
|
||||
// Publish this transaction to the left out nodes
|
||||
let tx = coordinators[*participating.iter().next().unwrap()]
|
||||
.get_transaction(&ops, &tx_id)
|
||||
.await
|
||||
.unwrap();
|
||||
for (i, coordinator) in coordinators.iter_mut().enumerate() {
|
||||
if !participating.contains(&i) {
|
||||
coordinator.publish_transacton(&ops, &tx).await;
|
||||
// Tell them of it as a completion of the relevant signing nodess
|
||||
coordinator
|
||||
.send_message(messages::sign::CoordinatorMessage::Completed {
|
||||
session: Session(0),
|
||||
id: id.id,
|
||||
tx: tx_id.clone(),
|
||||
})
|
||||
.await;
|
||||
// Verify they send Completed back
|
||||
match coordinator.recv_message().await {
|
||||
messages::ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
|
||||
session,
|
||||
id: this_id,
|
||||
tx: this_tx,
|
||||
}) => {
|
||||
assert_eq!(session, Session(0));
|
||||
assert_eq!(&this_id, &id.id);
|
||||
assert_eq!(this_tx, tx_id);
|
||||
}
|
||||
_ => panic!("processor didn't send Completed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Test the Eventuality from the blockchain, instead of from the coordinator
|
||||
// TODO: Test what happenns when Completed is sent with a non-existent TX ID
|
||||
// TODO: Test what happenns when Completed is sent with a non-completing TX ID
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
#[test]
|
||||
pub fn reproducibly_builds() {
|
||||
#[tokio::test]
|
||||
pub async fn reproducibly_builds() {
|
||||
use std::{collections::HashSet, process::Command};
|
||||
|
||||
use rand_core::{RngCore, OsRng};
|
||||
|
@ -9,7 +9,7 @@ pub fn reproducibly_builds() {
|
|||
const RUNS: usize = 3;
|
||||
const TIMEOUT: u16 = 180 * 60; // 3 hours
|
||||
|
||||
serai_docker_tests::build("runtime".to_string());
|
||||
serai_docker_tests::build("runtime".to_string()).await;
|
||||
|
||||
let mut ids = vec![[0; 8]; RUNS];
|
||||
for id in &mut ids {
|
||||
|
@ -38,64 +38,66 @@ pub fn reproducibly_builds() {
|
|||
);
|
||||
}
|
||||
|
||||
test.run(|_| async {
|
||||
let ids = ids;
|
||||
let mut containers = vec![];
|
||||
for container in String::from_utf8(
|
||||
Command::new("docker").arg("ps").arg("--format").arg("{{.Names}}").output().unwrap().stdout,
|
||||
)
|
||||
.expect("output wasn't utf-8")
|
||||
.lines()
|
||||
{
|
||||
for id in &ids {
|
||||
if container.contains(&hex::encode(id)) {
|
||||
containers.push(container.trim().to_string());
|
||||
test
|
||||
.run_async(|_| async {
|
||||
let ids = ids;
|
||||
let mut containers = vec![];
|
||||
for container in String::from_utf8(
|
||||
Command::new("docker").arg("ps").arg("--format").arg("{{.Names}}").output().unwrap().stdout,
|
||||
)
|
||||
.expect("output wasn't utf-8")
|
||||
.lines()
|
||||
{
|
||||
for id in &ids {
|
||||
if container.contains(&hex::encode(id)) {
|
||||
containers.push(container.trim().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
assert_eq!(containers.len(), RUNS, "couldn't find all containers");
|
||||
assert_eq!(containers.len(), RUNS, "couldn't find all containers");
|
||||
|
||||
let mut res = vec![None; RUNS];
|
||||
'attempt: for _ in 0 .. (TIMEOUT / 10) {
|
||||
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
|
||||
let mut res = vec![None; RUNS];
|
||||
'attempt: for _ in 0 .. (TIMEOUT / 10) {
|
||||
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
|
||||
|
||||
'runner: for (i, container) in containers.iter().enumerate() {
|
||||
if res[i].is_some() {
|
||||
continue;
|
||||
'runner: for (i, container) in containers.iter().enumerate() {
|
||||
if res[i].is_some() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let logs = Command::new("docker").arg("logs").arg(container).output().unwrap();
|
||||
let Some(last_log) =
|
||||
std::str::from_utf8(&logs.stdout).expect("output wasn't utf-8").lines().last()
|
||||
else {
|
||||
continue 'runner;
|
||||
};
|
||||
|
||||
let split = last_log.split("Runtime hash: ").collect::<Vec<_>>();
|
||||
if split.len() == 2 {
|
||||
res[i] = Some(split[1].to_string());
|
||||
continue 'runner;
|
||||
}
|
||||
}
|
||||
|
||||
let logs = Command::new("docker").arg("logs").arg(container).output().unwrap();
|
||||
let Some(last_log) =
|
||||
std::str::from_utf8(&logs.stdout).expect("output wasn't utf-8").lines().last()
|
||||
else {
|
||||
continue 'runner;
|
||||
};
|
||||
|
||||
let split = last_log.split("Runtime hash: ").collect::<Vec<_>>();
|
||||
if split.len() == 2 {
|
||||
res[i] = Some(split[1].to_string());
|
||||
continue 'runner;
|
||||
for item in &res {
|
||||
if item.is_none() {
|
||||
continue 'attempt;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// If we didn't get results from all runners, panic
|
||||
for item in &res {
|
||||
if item.is_none() {
|
||||
continue 'attempt;
|
||||
panic!("couldn't get runtime hashes within allowed time");
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// If we didn't get results from all runners, panic
|
||||
for item in &res {
|
||||
if item.is_none() {
|
||||
panic!("couldn't get runtime hashes within allowed time");
|
||||
let mut identical = HashSet::new();
|
||||
for res in res.clone() {
|
||||
identical.insert(res.unwrap());
|
||||
}
|
||||
}
|
||||
let mut identical = HashSet::new();
|
||||
for res in res.clone() {
|
||||
identical.insert(res.unwrap());
|
||||
}
|
||||
assert_eq!(identical.len(), 1, "got different runtime hashes {:?}", res);
|
||||
});
|
||||
assert_eq!(identical.len(), 1, "got different runtime hashes {:?}", res);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue