PR to track down CI failures (#501)

* Use an extended timeout for DKGs specifically

* Add a log statement when message-queue connection fails

* Add a 60 second keep-alive to connections

* Use zalloc for processor/message-queue/coordinator

An additional layer which protects us against edge cases with Zeroizing
(objects which don't support it or don't miss it).

* Add further logs to message-queue

* Further increase re-attempt timeouts in CI

* Remove misplaced continue inmessage-queue client

Fixes observed CI failures.

* Revert "Further increase re-attempt timeouts in CI"

This reverts commit 3723530cf6.
This commit is contained in:
Luke Parker 2024-01-04 01:08:13 -05:00 committed by GitHub
parent 6c8040f723
commit 7eb388e546
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 63 additions and 15 deletions

3
Cargo.lock generated
View file

@ -7344,6 +7344,7 @@ dependencies = [
"sp-runtime", "sp-runtime",
"tokio", "tokio",
"tributary-chain", "tributary-chain",
"zalloc",
"zeroize", "zeroize",
] ]
@ -7486,6 +7487,7 @@ dependencies = [
"serai-env", "serai-env",
"serai-primitives", "serai-primitives",
"tokio", "tokio",
"zalloc",
"zeroize", "zeroize",
] ]
@ -7607,6 +7609,7 @@ dependencies = [
"sp-application-crypto", "sp-application-crypto",
"thiserror", "thiserror",
"tokio", "tokio",
"zalloc",
"zeroize", "zeroize",
] ]

View file

@ -49,11 +49,14 @@ pub struct Client {
impl Client { impl Client {
fn connector() -> Connector { fn connector() -> Connector {
let mut res = HttpConnector::new();
res.set_keepalive(Some(core::time::Duration::from_secs(60)));
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
let res = let res = HttpsConnectorBuilder::new()
HttpsConnectorBuilder::new().with_native_roots().https_or_http().enable_http1().build(); .with_native_roots()
#[cfg(not(feature = "tls"))] .https_or_http()
let res = HttpConnector::new(); .enable_http1()
.wrap_connector(res);
res res
} }

View file

@ -32,6 +32,7 @@ frost-schnorrkel = { path = "../crypto/schnorrkel" }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }
zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db" } serai-db = { path = "../common/db" }
serai-env = { path = "../common/env" } serai-env = { path = "../common/env" }

View file

@ -63,6 +63,10 @@ use cosign_evaluator::CosignEvaluator;
#[cfg(test)] #[cfg(test)]
pub mod tests; pub mod tests;
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
#[derive(Clone)] #[derive(Clone)]
pub struct ActiveTributary<D: Db, P: P2p> { pub struct ActiveTributary<D: Db, P: P2p> {
pub spec: TributarySpec, pub spec: TributarySpec,

View file

@ -141,12 +141,16 @@ impl ReattemptDb {
// 5 minutes for attempts 0 ..= 2, 10 minutes for attempts 3 ..= 5, 15 minutes for attempts > 5 // 5 minutes for attempts 0 ..= 2, 10 minutes for attempts 3 ..= 5, 15 minutes for attempts > 5
// Assumes no event will take longer than 15 minutes, yet grows the time in case there are // Assumes no event will take longer than 15 minutes, yet grows the time in case there are
// network bandwidth issues // network bandwidth issues
let reattempt_delay = BASE_REATTEMPT_DELAY * let mut reattempt_delay = BASE_REATTEMPT_DELAY *
((AttemptDb::attempt(txn, genesis, topic) ((AttemptDb::attempt(txn, genesis, topic)
.expect("scheduling re-attempt for unknown topic") / .expect("scheduling re-attempt for unknown topic") /
3) + 3) +
1) 1)
.min(3); .min(3);
// Allow more time for DKGs since they have an extra round and much more data
if matches!(topic, Topic::Dkg) {
reattempt_delay *= 4;
}
let upon_block = current_block_number + reattempt_delay; let upon_block = current_block_number + reattempt_delay;
let mut reattempts = Self::get(txn, genesis, upon_block).unwrap_or(vec![]); let mut reattempts = Self::get(txn, genesis, upon_block).unwrap_or(vec![]);

View file

@ -40,6 +40,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim
# Uses a single threaded runtime since this shouldn't ever be CPU-bound # Uses a single threaded runtime since this shouldn't ever be CPU-bound
tokio = { version = "1", default-features = false, features = ["rt", "time", "io-util", "net", "macros"] } tokio = { version = "1", default-features = false, features = ["rt", "time", "io-util", "net", "macros"] }
zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db", optional = true } serai-db = { path = "../common/db", optional = true }
serai-env = { path = "../common/env" } serai-env = { path = "../common/env" }

View file

@ -68,9 +68,13 @@ impl MessageQueue {
async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool { async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool {
let msg = borsh::to_vec(&msg).unwrap(); let msg = borsh::to_vec(&msg).unwrap();
let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else {
log::warn!("couldn't send the message len");
return false;
};
let Ok(()) = socket.write_all(&msg).await else {
log::warn!("couldn't write the message");
return false; return false;
}; };
let Ok(()) = socket.write_all(&msg).await else { return false };
true true
} }
@ -118,20 +122,32 @@ impl MessageQueue {
'outer: loop { 'outer: loop {
if !first { if !first {
tokio::time::sleep(core::time::Duration::from_secs(5)).await; tokio::time::sleep(core::time::Duration::from_secs(5)).await;
continue;
} }
first = false; first = false;
let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; log::trace!("opening socket to message-queue for next");
let mut socket = match TcpStream::connect(&self.url).await {
Ok(socket) => socket,
Err(e) => {
log::warn!("couldn't connect to message-queue server: {e:?}");
continue;
}
};
log::trace!("opened socket for next");
loop { loop {
if !Self::send(&mut socket, msg.clone()).await { if !Self::send(&mut socket, msg.clone()).await {
continue 'outer; continue 'outer;
} }
let Ok(status) = socket.read_u8().await else { let status = match socket.read_u8().await {
continue 'outer; Ok(status) => status,
Err(e) => {
log::warn!("couldn't read status u8: {e:?}");
continue 'outer;
}
}; };
// If there wasn't a message, check again in 1s // If there wasn't a message, check again in 1s
// TODO: Use a notification system here
if status == 0 { if status == 0 {
tokio::time::sleep(core::time::Duration::from_secs(1)).await; tokio::time::sleep(core::time::Duration::from_secs(1)).await;
continue; continue;
@ -143,12 +159,17 @@ impl MessageQueue {
// Timeout after 5 seconds in case there's an issue with the length handling // Timeout after 5 seconds in case there's an issue with the length handling
let Ok(msg) = tokio::time::timeout(core::time::Duration::from_secs(5), async { let Ok(msg) = tokio::time::timeout(core::time::Duration::from_secs(5), async {
// Read the message length // Read the message length
let Ok(len) = socket.read_u32_le().await else { let len = match socket.read_u32_le().await {
return vec![]; Ok(len) => len,
Err(e) => {
log::warn!("couldn't read len: {e:?}");
return vec![];
}
}; };
let mut buf = vec![0; usize::try_from(len).unwrap()]; let mut buf = vec![0; usize::try_from(len).unwrap()];
// Read the message // Read the message
let Ok(_) = socket.read_exact(&mut buf).await else { let Ok(_) = socket.read_exact(&mut buf).await else {
log::warn!("couldn't read the message");
return vec![]; return vec![];
}; };
buf buf

View file

@ -1,6 +1,3 @@
mod messages;
mod queue;
pub(crate) use std::{ pub(crate) use std::{
sync::{Arc, RwLock}, sync::{Arc, RwLock},
collections::HashMap, collections::HashMap,
@ -38,6 +35,13 @@ mod clippy {
} }
pub(crate) use self::clippy::*; pub(crate) use self::clippy::*;
mod messages;
mod queue;
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
// queue RPC method // queue RPC method
/* /*
Queues a message to be delivered from a processor to a coordinator, or vice versa. Queues a message to be delivered from a processor to a coordinator, or vice versa.

View file

@ -54,6 +54,7 @@ log = { version = "0.4", default-features = false, features = ["std"] }
env_logger = { version = "0.10", default-features = false, features = ["humantime"], optional = true } env_logger = { version = "0.10", default-features = false, features = ["humantime"], optional = true }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db", optional = true } serai-db = { path = "../common/db", optional = true }
serai-env = { path = "../common/env", optional = true } serai-env = { path = "../common/env", optional = true }
# TODO: Replace with direct usage of primitives # TODO: Replace with direct usage of primitives

View file

@ -61,6 +61,10 @@ use multisigs::{MultisigEvent, MultisigManager};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
// Items which are mutably borrowed by Tributary. // Items which are mutably borrowed by Tributary.
// Any exceptions to this have to be carefully monitored in order to ensure consistency isn't // Any exceptions to this have to be carefully monitored in order to ensure consistency isn't
// violated. // violated.
@ -559,6 +563,8 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
loop { loop {
let mut txn = raw_db.txn(); let mut txn = raw_db.txn();
log::trace!("new db txn in run");
let mut outer_msg = None; let mut outer_msg = None;
tokio::select! { tokio::select! {