Spawn PublishSlashReportTask

Updates it so that it'll try for every network instead of returning after any
network fails.

Uses the SlashReport type throughout the codebase.
This commit is contained in:
Luke Parker 2025-01-15 12:08:28 -05:00
parent 92a4cceeeb
commit 7312fa8d3c
No known key found for this signature in database
10 changed files with 132 additions and 101 deletions
coordinator
src
substrate/src
tributary/src
processor/messages/src
substrate
abi/src
client/src/serai
runtime/src
validator-sets
pallet/src
primitives/src

View file

@ -14,7 +14,7 @@ use borsh::BorshDeserialize;
use tokio::sync::mpsc;
use serai_client::{
primitives::{NetworkId, PublicKey},
primitives::{NetworkId, PublicKey, Signature},
validator_sets::primitives::ValidatorSet,
Serai,
};
@ -25,6 +25,7 @@ use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{Faulted, SignedCosign, Cosigning};
use serai_coordinator_substrate::{
CanonicalEventStream, EphemeralEventStream, SignSlashReport, SignedBatches, PublishBatchTask,
SlashReports, PublishSlashReportTask,
};
use serai_coordinator_tributary::{SigningProtocolRound, Signed, Transaction, SubstrateBlockPlans};
@ -161,6 +162,7 @@ async fn handle_network(
.unwrap()
.continually_run(publish_batch_task_def, vec![]),
);
// Forget its handle so it always runs in the background
core::mem::forget(publish_batch_task);
}
@ -274,8 +276,17 @@ async fn handle_network(
messages::coordinator::ProcessorMessage::SignedBatch { batch } => {
SignedBatches::send(&mut txn, &batch);
}
messages::coordinator::ProcessorMessage::SignedSlashReport { session, signature } => {
todo!("TODO PublishSlashReportTask")
messages::coordinator::ProcessorMessage::SignedSlashReport {
session,
slash_report,
signature,
} => {
SlashReports::set(
&mut txn,
ValidatorSet { network, session },
slash_report,
Signature(signature),
);
}
},
messages::ProcessorMessage::Substrate(msg) => match msg {
@ -472,6 +483,16 @@ async fn main() {
tokio::spawn(handle_network(db.clone(), message_queue.clone(), serai.clone(), network));
}
// Spawn the task to publish slash reports
{
let (publish_slash_report_task_def, publish_slash_report_task) = Task::new();
tokio::spawn(
PublishSlashReportTask::new(db, serai).continually_run(publish_slash_report_task_def, vec![]),
);
// Always have this run in the background
core::mem::forget(publish_slash_report_task);
}
// Run the spawned tasks ad-infinitum
core::future::pending().await
}

View file

@ -6,8 +6,8 @@ use scale::{Encode, Decode};
use borsh::{io, BorshSerialize, BorshDeserialize};
use serai_client::{
primitives::{NetworkId, PublicKey, Signature, SeraiAddress},
validator_sets::primitives::{Session, ValidatorSet, KeyPair},
primitives::{NetworkId, PublicKey, Signature},
validator_sets::primitives::{Session, ValidatorSet, KeyPair, SlashReport},
in_instructions::primitives::SignedBatch,
Transaction,
};
@ -183,10 +183,6 @@ impl SignedBatches {
}
}
/// The slash report was invalid.
#[derive(Debug)]
pub struct InvalidSlashReport;
/// The slash reports to publish onto Serai.
pub struct SlashReports;
impl SlashReports {
@ -194,30 +190,25 @@ impl SlashReports {
///
/// This only saves the most recent slashes as only a single session is eligible to have its
/// slashes reported at once.
///
/// Returns Err if the slashes are invalid. Returns Ok if the slashes weren't detected as
/// invalid. Slashes may be considered invalid by the Serai blockchain later even if not detected
/// as invalid here.
pub fn set(
txn: &mut impl DbTxn,
set: ValidatorSet,
slashes: Vec<(SeraiAddress, u32)>,
slash_report: SlashReport,
signature: Signature,
) -> Result<(), InvalidSlashReport> {
) {
// If we have a more recent slash report, don't write this historic one
if let Some((existing_session, _)) = _public_db::SlashReports::get(txn, set.network) {
if existing_session.0 >= set.session.0 {
return Ok(());
return;
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::report_slashes(
set.network,
slashes.try_into().map_err(|_| InvalidSlashReport)?,
slash_report,
signature,
);
_public_db::SlashReports::set(txn, set.network, &(set.session, tx.encode()));
Ok(())
}
pub(crate) fn take(txn: &mut impl DbTxn, network: NetworkId) -> Option<(Session, Transaction)> {
let (session, tx) = _public_db::SlashReports::take(txn, network)?;

View file

@ -22,66 +22,80 @@ impl<D: Db> PublishSlashReportTask<D> {
}
}
impl<D: Db> PublishSlashReportTask<D> {
// Returns if a slash report was successfully published
async fn publish(&mut self, network: NetworkId) -> Result<bool, String> {
let mut txn = self.db.txn();
let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
// No slash report to publish
return Ok(false);
};
let serai = self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let session_after_slash_report = Session(session.0 + 1);
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to publish the slash report for session #n while session #n+1 is still
// active
let session_after_slash_report_retired = current_session > Some(session_after_slash_report.0);
if session_after_slash_report_retired {
// Commit the txn to drain this slash report from the database and not try it again later
txn.commit();
return Ok(false);
}
if Some(session_after_slash_report.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session_after_slash_report.0));
// This would mean the Serai node is resyncing and is behind where it prior was
Err("have a slash report for a session Serai has yet to retire".to_string())?;
}
// If this session which should publish a slash report already has, move on
let key_pending_slash_report =
serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
if key_pending_slash_report.is_none() {
txn.commit();
return Ok(false);
};
match self.serai.publish(&slash_report).await {
Ok(()) => {
txn.commit();
Ok(true)
}
// This could be specific to this TX (such as an already in mempool error) and it may be
// worthwhile to continue iteration with the other pending slash reports. We assume this
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
// miniscule compared to the window available to publish the slash report. That makes
// this a non-issue.
Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}")),
}
}
}
impl<D: Db> ContinuallyRan for PublishSlashReportTask<D> {
type Error = String;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let mut made_progress = false;
let mut error = None;
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
};
let mut txn = self.db.txn();
let Some((session, slash_report)) = SlashReports::take(&mut txn, network) else {
// No slash report to publish
continue;
};
let serai =
self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let session_after_slash_report = Session(session.0 + 1);
let current_session = serai.session(network).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to publish the slash report for session #n while session #n+1 is still
// active
let session_after_slash_report_retired =
current_session > Some(session_after_slash_report.0);
if session_after_slash_report_retired {
// Commit the txn to drain this slash report from the database and not try it again later
txn.commit();
continue;
}
if Some(session_after_slash_report.0) != current_session {
// We already checked the current session wasn't greater, and they're not equal
assert!(current_session < Some(session_after_slash_report.0));
// This would mean the Serai node is resyncing and is behind where it prior was
Err("have a slash report for a session Serai has yet to retire".to_string())?;
}
// If this session which should publish a slash report already has, move on
let key_pending_slash_report =
serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
if key_pending_slash_report.is_none() {
txn.commit();
continue;
};
match self.serai.publish(&slash_report).await {
Ok(()) => {
txn.commit();
made_progress = true;
}
// This could be specific to this TX (such as an already in mempool error) and it may be
// worthwhile to continue iteration with the other pending slash reports. We assume this
// error ephemeral and that the latency incurred for this ephemeral error to resolve is
// miniscule compared to the window available to publish the slash report. That makes
// this a non-issue.
Err(e) => Err(format!("couldn't publish slash report transaction: {e:?}"))?,
}
let network_res = self.publish(network).await;
// We made progress if any network successfully published their slash report
made_progress |= network_res == Ok(true);
// We want to yield the first error *after* attempting for every network
error = error.or(network_res.err());
}
// Yield the error
if let Some(error) = error {
Err(error)?
}
Ok(made_progress)
}

View file

@ -371,7 +371,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// Create the resulting slash report
let mut slash_report = vec![];
for (_, points) in self.validators.iter().copied().zip(amortized_slash_report) {
for points in amortized_slash_report {
// TODO: Natively store this as a `Slash`
if points == u32::MAX {
slash_report.push(Slash::Fatal);
@ -397,7 +397,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
self.set,
messages::coordinator::CoordinatorMessage::SignSlashReport {
session: self.set.session,
report: slash_report,
slash_report: slash_report.try_into().unwrap(),
},
);
}

View file

@ -7,7 +7,7 @@ use borsh::{BorshSerialize, BorshDeserialize};
use dkg::Participant;
use serai_primitives::BlockHash;
use validator_sets_primitives::{Session, KeyPair, Slash};
use validator_sets_primitives::{Session, KeyPair, SlashReport};
use coins_primitives::OutInstructionWithBalance;
use in_instructions_primitives::SignedBatch;
@ -100,7 +100,9 @@ pub mod sign {
Self::Cosign(cosign) => {
f.debug_struct("VariantSignId::Cosign").field("0", &cosign).finish()
}
Self::Batch(batch) => f.debug_struct("VariantSignId::Batch").field("0", &batch).finish(),
Self::Batch(batch) => {
f.debug_struct("VariantSignId::Batch").field("0", &hex::encode(batch)).finish()
}
Self::SlashReport => f.debug_struct("VariantSignId::SlashReport").finish(),
Self::Transaction(tx) => {
f.debug_struct("VariantSignId::Transaction").field("0", &hex::encode(tx)).finish()
@ -168,7 +170,7 @@ pub mod coordinator {
/// Sign the slash report for this session.
///
/// This is sent by the Coordinator's Tributary scanner.
SignSlashReport { session: Session, report: Vec<Slash> },
SignSlashReport { session: Session, slash_report: SlashReport },
}
// This set of messages is sent entirely and solely by serai-processor-bin's implementation of
@ -178,7 +180,7 @@ pub mod coordinator {
pub enum ProcessorMessage {
CosignedBlock { cosign: SignedCosign },
SignedBatch { batch: SignedBatch },
SignedSlashReport { session: Session, signature: Vec<u8> },
SignedSlashReport { session: Session, slash_report: SlashReport, signature: [u8; 64] },
}
}

View file

@ -21,7 +21,7 @@ pub enum Call {
},
report_slashes {
network: NetworkId,
slashes: BoundedVec<(SeraiAddress, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>,
slashes: SlashReport,
signature: Signature,
},
allocate {

View file

@ -5,10 +5,10 @@ use sp_runtime::BoundedVec;
use serai_abi::primitives::Amount;
pub use serai_abi::validator_sets::primitives;
use primitives::{MAX_KEY_LEN, Session, ValidatorSet, KeyPair};
use primitives::{MAX_KEY_LEN, Session, ValidatorSet, KeyPair, SlashReport};
use crate::{
primitives::{EmbeddedEllipticCurve, NetworkId, SeraiAddress},
primitives::{EmbeddedEllipticCurve, NetworkId},
Transaction, Serai, TemporalSerai, SeraiError,
};
@ -238,12 +238,7 @@ impl<'a> SeraiValidatorSets<'a> {
pub fn report_slashes(
network: NetworkId,
// TODO: This bounds a maximum length but takes more space than just publishing all the u32s
// (50 * (32 + 4)) > (150 * 4)
slashes: sp_runtime::BoundedVec<
(SeraiAddress, u32),
sp_core::ConstU32<{ primitives::MAX_KEY_SHARES_PER_SET_U32 / 3 }>,
>,
slashes: SlashReport,
signature: Signature,
) -> Transaction {
Serai::unsigned(serai_abi::Call::ValidatorSets(

View file

@ -111,13 +111,7 @@ impl From<Call> for RuntimeCall {
serai_abi::validator_sets::Call::report_slashes { network, slashes, signature } => {
RuntimeCall::ValidatorSets(validator_sets::Call::report_slashes {
network,
slashes: <_>::try_from(
slashes
.into_iter()
.map(|(addr, slash)| (PublicKey::from(addr), slash))
.collect::<Vec<_>>(),
)
.unwrap(),
slashes,
signature,
})
}
@ -301,17 +295,7 @@ impl TryInto<Call> for RuntimeCall {
}
}
validator_sets::Call::report_slashes { network, slashes, signature } => {
serai_abi::validator_sets::Call::report_slashes {
network,
slashes: <_>::try_from(
slashes
.into_iter()
.map(|(addr, slash)| (SeraiAddress::from(addr), slash))
.collect::<Vec<_>>(),
)
.unwrap(),
signature,
}
serai_abi::validator_sets::Call::report_slashes { network, slashes, signature }
}
validator_sets::Call::allocate { network, amount } => {
serai_abi::validator_sets::Call::allocate { network, amount }

View file

@ -1010,7 +1010,7 @@ pub mod pallet {
pub fn report_slashes(
origin: OriginFor<T>,
network: NetworkId,
slashes: BoundedVec<(Public, u32), ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 / 3 }>>,
slashes: SlashReport,
signature: Signature,
) -> DispatchResult {
ensure_none(origin)?;

View file

@ -210,6 +210,30 @@ impl Slash {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SlashReport(pub BoundedVec<Slash, ConstU32<{ MAX_KEY_SHARES_PER_SET_U32 }>>);
#[cfg(feature = "borsh")]
impl BorshSerialize for SlashReport {
fn serialize<W: borsh::io::Write>(&self, writer: &mut W) -> borsh::io::Result<()> {
BorshSerialize::serialize(self.0.as_slice(), writer)
}
}
#[cfg(feature = "borsh")]
impl BorshDeserialize for SlashReport {
fn deserialize_reader<R: borsh::io::Read>(reader: &mut R) -> borsh::io::Result<Self> {
let slashes = Vec::<Slash>::deserialize_reader(reader)?;
slashes
.try_into()
.map(Self)
.map_err(|_| borsh::io::Error::other("length of slash report exceeds max validators"))
}
}
impl TryFrom<Vec<Slash>> for SlashReport {
type Error = &'static str;
fn try_from(slashes: Vec<Slash>) -> Result<SlashReport, &'static str> {
slashes.try_into().map(Self).map_err(|_| "length of slash report exceeds max validators")
}
}
// This is assumed binding to the ValidatorSet via the key signed with
pub fn report_slashes_message(slashes: &SlashReport) -> Vec<u8> {
(b"ValidatorSets-report_slashes", slashes).encode()