Cache the block's events within TemporalSerai

Event retrieval was prior:
- Retrieve all events in the block, which may be hundreds of KB
- Filter to just a few

Since it's frequent to want multiple sets of events, each filtered in their own
way, this caused the retrieval to happen multiple times. Now, it only will
happen once.

Also has the scoped clients take a reference, not an owned TemporalSerai.
This commit is contained in:
Luke Parker 2023-12-08 08:41:14 -05:00
parent 397fca748f
commit 7122e0faf4
No known key found for this signature in database
12 changed files with 357 additions and 51 deletions

1
Cargo.lock generated
View file

@ -7273,6 +7273,7 @@ dependencies = [
name = "serai-client"
version = "0.1.0"
dependencies = [
"async-lock",
"bitcoin",
"blake2",
"ciphersuite",

View file

@ -0,0 +1,264 @@
/*
If:
A) This block has events and it's been at least X blocks since the last cosign or
B) This block doesn't have events but it's been X blocks since a skipped block which did
have events or
C) This block key gens (which changes who the cosigners are)
cosign this block.
This creates both a minimum and maximum delay of X blocks before a block's cosigning begins,
barring key gens which are exceptional. The minimum delay is there to ensure we don't constantly
spawn new protocols every 6 seconds, overwriting the old ones. The maximum delay is there to
ensure any block needing cosigned is consigned within a reasonable amount of time.
*/
use core::{ops::Deref, time::Duration};
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use zeroize::Zeroizing;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use scale::{Encode, Decode};
use serai_client::{
SeraiError, Block, Serai, TemporalSerai,
primitives::{BlockHash, NetworkId},
validator_sets::{
primitives::{Session, ValidatorSet, KeyPair, amortize_excess_key_shares},
ValidatorSetsEvent,
},
in_instructions::InInstructionsEvent,
coins::CoinsEvent,
};
use serai_db::*;
use processor_messages::SubstrateContext;
use tokio::{sync::mpsc, time::sleep};
use crate::{
Db,
processors::Processors,
tributary::{TributarySpec, SeraiBlockNumber},
};
// 5 minutes, expressed in blocks
// TODO: Pull a constant for block time
const COSIGN_DISTANCE: u64 = 5 * 60 / 6;
create_db!(
SubstrateCosignDb {
CosignTriggered: () -> (),
IntendedCosign: () -> (u64, Option<u64>),
BlockHasEvents: (block: u64) -> u8,
LatestCosignedBlock: () -> u64,
}
);
impl IntendedCosign {
pub fn set_intended_cosign(txn: &mut impl DbTxn, intended: u64) {
Self::set(txn, &(intended, None::<u64>));
}
pub fn set_skipped_cosign(txn: &mut impl DbTxn, skipped: u64) {
let (intended, prior_skipped) = Self::get(txn).unwrap();
assert!(prior_skipped.is_none());
Self::set(txn, &(intended, Some(skipped)));
}
}
impl LatestCosignedBlock {
pub fn latest_cosigned_block(getter: &impl Get) -> u64 {
Self::get(getter).unwrap_or_default().max(1)
}
}
db_channel! {
SubstrateDbChannels {
CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]),
}
}
impl CosignTransactions {
// Append a cosign transaction.
pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) {
CosignTransactions::send(txn, set.network, &(set.session, number, hash))
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
enum HasEvents {
KeyGen,
Yes,
No,
}
async fn block_has_events(
txn: &mut impl DbTxn,
serai: &Serai,
block: u64,
) -> Result<HasEvents, SeraiError> {
let cached = BlockHasEvents::get(txn, block);
match cached {
None => {
let serai = serai.as_of(
serai
.finalized_block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized")
.hash(),
);
if !serai.validator_sets().key_gen_events().await?.is_empty() {
return Ok(HasEvents::KeyGen);
}
let has_no_events = serai.coins().burn_with_instruction_events().await?.is_empty() &&
serai.in_instructions().batch_events().await?.is_empty() &&
serai.validator_sets().new_set_events().await?.is_empty() &&
serai.validator_sets().set_retired_events().await?.is_empty();
let has_events = if has_no_events { HasEvents::No } else { HasEvents::Yes };
let has_events = has_events.encode();
assert_eq!(has_events.len(), 1);
BlockHasEvents::set(txn, block, &has_events[0]);
Ok(HasEvents::Yes)
}
Some(code) => Ok(HasEvents::decode(&mut [code].as_slice()).unwrap()),
}
}
/*
Advances the cosign protocol as should be done per the latest block.
A block is considered cosigned if:
A) It was cosigned
B) It's the parent of a cosigned block
C) It immediately follows a cosigned block and has no events requiring cosigning (TODO)
*/
async fn advance_cosign_protocol(db: &mut impl Db, serai: &Serai, latest_number: u64) -> Result<(), ()> {
let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else {
let mut txn = db.txn();
IntendedCosign::set_intended_cosign(&mut txn, 1);
txn.commit();
return Ok(());
};
}
// If we haven't flagged skipped, and a block within the distance had events, flag the first
// such block as skipped
let mut distance_end_exclusive = last_intended_to_cosign_block + COSIGN_DISTANCE;
// If we've never triggered a cosign, don't skip any cosigns
if CosignTriggered::get(&txn).is_none() {
distance_end_exclusive = 0;
}
if skipped_block.is_none() {
for b in (last_intended_to_cosign_block + 1) .. distance_end_exclusive {
if b > latest_number {
break;
}
if block_has_events(&mut txn, serai, b).await? == HasEvents::Yes {
skipped_block = Some(b);
log::debug!("skipping cosigning {b} due to proximity to prior cosign");
IntendedCosign::set_skipped_cosign(&mut txn, b);
break;
}
}
}
let mut has_no_cosigners = None;
let mut cosign = vec![];
// Block we should cosign no matter what if no prior blocks qualified for cosigning
let maximally_latent_cosign_block =
skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE);
for block in (last_intended_to_cosign_block + 1) ..= latest_number {
let actual_block = serai
.finalized_block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized");
SeraiBlockNumber::set(&mut txn, actual_block.hash(), &block);
let mut set = false;
let block_has_events = block_has_events(&mut txn, serai, block).await?;
// If this block is within the distance,
if block < distance_end_exclusive {
// and set a key, cosign it
if block_has_events == HasEvents::KeyGen {
IntendedCosign::set_intended_cosign(&mut txn, block);
set = true;
// Carry skipped if it isn't included by cosigning this block
if let Some(skipped) = skipped_block {
if skipped > block {
IntendedCosign::set_skipped_cosign(&mut txn, block);
}
}
}
} else if (Some(block) == maximally_latent_cosign_block) ||
(block_has_events != HasEvents::No)
{
// Since this block was outside the distance and had events/was maximally latent, cosign it
IntendedCosign::set_intended_cosign(&mut txn, block);
set = true;
}
if set {
// Get the keys as of the prior block
// That means if this block is setting new keys (which won't lock in until we process this
// block), we won't freeze up waiting for the yet-to-be-processed keys to sign this block
let serai = serai.as_of(actual_block.header.parent_hash.into());
has_no_cosigners = Some(actual_block.clone());
for network in serai_client::primitives::NETWORKS {
// Get the latest session to have set keys
let Some(latest_session) = serai.validator_sets().session(network).await? else {
continue;
};
let prior_session = Session(latest_session.0.saturating_sub(1));
let set_with_keys = if serai
.validator_sets()
.keys(ValidatorSet { network, session: prior_session })
.await?
.is_some()
{
ValidatorSet { network, session: prior_session }
} else {
let set = ValidatorSet { network, session: latest_session };
if serai.validator_sets().keys(set).await?.is_none() {
continue;
}
set
};
// Since this is a valid cosigner, don't flag this block as having no cosigners
has_no_cosigners = None;
log::debug!("{:?} will be cosigning {block}", set_with_keys.network);
if in_set(key, &serai, set_with_keys).await?.unwrap() {
cosign.push((set_with_keys, block, actual_block.hash()));
}
}
break;
}
}
// If this block doesn't have cosigners, yet does have events, automatically mark it as
// cosigned
if let Some(has_no_cosigners) = has_no_cosigners {
log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number());
LatestCosignedBlock::set(&mut txn, &has_no_cosigners.number());
} else {
CosignTriggered::set(&mut txn, &());
for (set, block, hash) in cosign {
log::debug!("cosigning {block} with {:?} {:?}", set.network, set.session);
CosignTransactions::append_cosign(&mut txn, set, block, hash);
}
}
txn.commit();

View file

@ -62,7 +62,8 @@ async fn handle_new_set<D: Db>(
log::info!("present in set {:?}", set);
let set_data = {
let serai = serai.as_of(block.hash()).validator_sets();
let serai = serai.as_of(block.hash());
let serai = serai.validator_sets();
let set_participants =
serai.participants(set.network).await?.expect("NewSet for set which doesn't exist");

View file

@ -28,6 +28,8 @@ sp-core = { git = "https://github.com/serai-dex/substrate", optional = true }
sp-runtime = { git = "https://github.com/serai-dex/substrate", optional = true }
frame-system = { git = "https://github.com/serai-dex/substrate", optional = true }
async-lock = "3"
simple-request = { path = "../../common/request", version = "0.1", optional = true }
bitcoin = { version = "0.31", optional = true }

View file

@ -11,18 +11,18 @@ const PALLET: &str = "Coins";
pub type CoinsEvent = serai_abi::coins::Event;
#[derive(Clone, Copy)]
pub struct SeraiCoins<'a>(pub(crate) TemporalSerai<'a>);
pub struct SeraiCoins<'a>(pub(crate) &'a TemporalSerai<'a>);
impl<'a> SeraiCoins<'a> {
pub fn into_inner(self) -> TemporalSerai<'a> {
self.0
}
pub async fn mint_events(&self) -> Result<Vec<CoinsEvent>, SeraiError> {
self
.0
.events(|event| {
if let serai_abi::Event::Coins(event) = event {
Some(event).filter(|event| matches!(event, CoinsEvent::Mint { .. }))
if matches!(event, CoinsEvent::Mint { .. }) {
Some(event.clone())
} else {
None
}
} else {
None
}
@ -35,7 +35,11 @@ impl<'a> SeraiCoins<'a> {
.0
.events(|event| {
if let serai_abi::Event::Coins(event) = event {
Some(event).filter(|event| matches!(event, CoinsEvent::BurnWithInstruction { .. }))
if matches!(event, CoinsEvent::BurnWithInstruction { .. }) {
Some(event.clone())
} else {
None
}
} else {
None
}

View file

@ -6,12 +6,14 @@ use crate::{SeraiError, TemporalSerai};
pub type DexEvent = serai_abi::dex::Event;
#[derive(Clone, Copy)]
pub struct SeraiDex<'a>(pub(crate) TemporalSerai<'a>);
pub struct SeraiDex<'a>(pub(crate) &'a TemporalSerai<'a>);
impl<'a> SeraiDex<'a> {
pub async fn events(&self) -> Result<Vec<DexEvent>, SeraiError> {
self
.0
.events(|event| if let serai_abi::Event::Dex(event) = event { Some(event) } else { None })
.events(
|event| if let serai_abi::Event::Dex(event) = event { Some(event.clone()) } else { None },
)
.await
}

View file

@ -11,12 +11,8 @@ pub type InInstructionsEvent = serai_abi::in_instructions::Event;
const PALLET: &str = "InInstructions";
#[derive(Clone, Copy)]
pub struct SeraiInInstructions<'a>(pub(crate) TemporalSerai<'a>);
pub struct SeraiInInstructions<'a>(pub(crate) &'a TemporalSerai<'a>);
impl<'a> SeraiInInstructions<'a> {
pub fn into_inner(self) -> TemporalSerai<'a> {
self.0
}
pub async fn latest_block_for_network(
&self,
network: NetworkId,
@ -36,7 +32,11 @@ impl<'a> SeraiInInstructions<'a> {
.0
.events(|event| {
if let serai_abi::Event::InInstructions(event) = event {
Some(event).filter(|event| matches!(event, InInstructionsEvent::Batch { .. }))
if matches!(event, InInstructionsEvent::Batch { .. }) {
Some(event.clone())
} else {
None
}
} else {
None
}

View file

@ -1,5 +1,6 @@
use thiserror::Error;
use async_lock::RwLock;
use simple_request::{hyper, Request, Client};
use scale::{Encode, Decode, Compact};
@ -69,8 +70,17 @@ pub struct Serai {
genesis: [u8; 32],
}
#[derive(Clone, Copy)]
pub struct TemporalSerai<'a>(pub(crate) &'a Serai, pub(crate) [u8; 32]);
type EventsInBlock = Vec<frame_system::EventRecord<Event, [u8; 32]>>;
pub struct TemporalSerai<'a> {
serai: &'a Serai,
block: [u8; 32],
events: RwLock<Option<EventsInBlock>>,
}
impl<'a> Clone for TemporalSerai<'a> {
fn clone(&self) -> Self {
Self { serai: self.serai, block: self.block, events: RwLock::new(None) }
}
}
impl Serai {
pub async fn call<Req: Serialize, Res: DeserializeOwned>(
@ -289,27 +299,35 @@ impl Serai {
/// itself.
pub async fn as_of_latest_finalized_block(&self) -> Result<TemporalSerai, SeraiError> {
let latest = self.latest_finalized_block_hash().await?;
Ok(TemporalSerai(self, latest))
Ok(TemporalSerai { serai: self, block: latest, events: RwLock::new(None) })
}
/// Returns a TemporalSerai able to retrieve state as of the specified block.
pub fn as_of(&self, block: [u8; 32]) -> TemporalSerai {
TemporalSerai(self, block)
TemporalSerai { serai: self, block, events: RwLock::new(None) }
}
}
impl<'a> TemporalSerai<'a> {
pub fn into_inner(&self) -> &Serai {
self.0
}
async fn events<E>(
&self,
filter_map: impl Fn(&Event) -> Option<E>,
) -> Result<Vec<E>, SeraiError> {
let mut events = self.events.read().await;
if events.is_none() {
drop(events);
let mut events_write = self.events.write().await;
#[allow(clippy::unwrap_or_default)]
if events_write.is_none() {
*events_write = Some(self.storage("System", "Events", ()).await?.unwrap_or(vec![]));
}
drop(events_write);
events = self.events.read().await;
}
async fn events<E>(&self, filter_map: impl Fn(Event) -> Option<E>) -> Result<Vec<E>, SeraiError> {
let mut res = vec![];
let all_events: Option<Vec<frame_system::EventRecord<Event, [u8; 32]>>> =
self.storage("System", "Events", ()).await?;
#[allow(clippy::unwrap_or_default)]
for event in all_events.unwrap_or(vec![]) {
if let Some(event) = filter_map(event.event) {
for event in events.as_ref().unwrap() {
if let Some(event) = filter_map(&event.event) {
res.push(event);
}
}
@ -328,7 +346,7 @@ impl<'a> TemporalSerai<'a> {
full_key.extend(key.encode());
let res: Option<String> =
self.0.call("state_getStorage", [hex::encode(full_key), hex::encode(self.1)]).await?;
self.serai.call("state_getStorage", [hex::encode(full_key), hex::encode(self.block)]).await?;
let Some(res) = res else { return Ok(None) };
let res = Serai::hex_decode(res)?;
Ok(Some(R::decode(&mut res.as_slice()).map_err(|_| {
@ -336,19 +354,19 @@ impl<'a> TemporalSerai<'a> {
})?))
}
pub fn coins(self) -> SeraiCoins<'a> {
pub fn coins(&'a self) -> SeraiCoins<'a> {
SeraiCoins(self)
}
pub fn dex(self) -> SeraiDex<'a> {
pub fn dex(&'a self) -> SeraiDex<'a> {
SeraiDex(self)
}
pub fn in_instructions(self) -> SeraiInInstructions<'a> {
pub fn in_instructions(&'a self) -> SeraiInInstructions<'a> {
SeraiInInstructions(self)
}
pub fn validator_sets(self) -> SeraiValidatorSets<'a> {
pub fn validator_sets(&'a self) -> SeraiValidatorSets<'a> {
SeraiValidatorSets(self)
}
}

View file

@ -16,18 +16,18 @@ const PALLET: &str = "ValidatorSets";
pub type ValidatorSetsEvent = serai_abi::validator_sets::Event;
#[derive(Clone, Copy)]
pub struct SeraiValidatorSets<'a>(pub(crate) TemporalSerai<'a>);
pub struct SeraiValidatorSets<'a>(pub(crate) &'a TemporalSerai<'a>);
impl<'a> SeraiValidatorSets<'a> {
pub fn into_inner(self) -> TemporalSerai<'a> {
self.0
}
pub async fn new_set_events(&self) -> Result<Vec<ValidatorSetsEvent>, SeraiError> {
self
.0
.events(|event| {
if let serai_abi::Event::ValidatorSets(event) = event {
Some(event).filter(|event| matches!(event, ValidatorSetsEvent::NewSet { .. }))
if matches!(event, ValidatorSetsEvent::NewSet { .. }) {
Some(event.clone())
} else {
None
}
} else {
None
}
@ -40,7 +40,11 @@ impl<'a> SeraiValidatorSets<'a> {
.0
.events(|event| {
if let serai_abi::Event::ValidatorSets(event) = event {
Some(event).filter(|event| matches!(event, ValidatorSetsEvent::KeyGen { .. }))
if matches!(event, ValidatorSetsEvent::KeyGen { .. }) {
Some(event.clone())
} else {
None
}
} else {
None
}
@ -53,7 +57,11 @@ impl<'a> SeraiValidatorSets<'a> {
.0
.events(|event| {
if let serai_abi::Event::ValidatorSets(event) = event {
Some(event).filter(|event| matches!(event, ValidatorSetsEvent::SetRetired { .. }))
if matches!(event, ValidatorSetsEvent::SetRetired { .. }) {
Some(event.clone())
} else {
None
}
} else {
None
}

View file

@ -54,6 +54,7 @@ serai_test!(
let block = provide_batch(&serai, batch.clone()).await;
let instruction = {
let serai = serai.as_of(block);
let batches = serai.in_instructions().batch_events().await.unwrap();
assert_eq!(
@ -82,19 +83,20 @@ serai_test!(
OsRng.fill_bytes(&mut rand_bytes);
let data = Data::new(rand_bytes).unwrap();
let instruction = OutInstructionWithBalance {
OutInstructionWithBalance {
balance,
instruction: OutInstruction { address: external_address, data: Some(data) },
};
}
};
let serai = serai.into_inner();
let block = publish_tx(
serai,
&serai,
&serai.sign(&pair, SeraiCoins::burn_with_instruction(instruction.clone()), 0, 0),
)
.await;
let serai = serai.as_of(block).coins();
let serai = serai.as_of(block);
let serai = serai.coins();
let events = serai.burn_with_instruction_events().await.unwrap();
assert_eq!(events, vec![CoinsEvent::BurnWithInstruction { from: address, instruction }]);
assert_eq!(serai.coin_supply(coin).await.unwrap(), Amount(0));

View file

@ -48,7 +48,8 @@ serai_test!(
);
{
let vs_serai = serai.as_of_latest_finalized_block().await.unwrap().validator_sets();
let vs_serai = serai.as_of_latest_finalized_block().await.unwrap();
let vs_serai = vs_serai.validator_sets();
let participants = vs_serai.participants(set.network).await
.unwrap()
.unwrap()
@ -64,7 +65,8 @@ serai_test!(
// While the set_keys function should handle this, it's beneficial to
// independently test it
let serai = serai.as_of(block).validator_sets();
let serai = serai.as_of(block);
let serai = serai.validator_sets();
assert_eq!(
serai.key_gen_events().await.unwrap(),
vec![ValidatorSetsEvent::KeyGen { set, key_pair: key_pair.clone() }]

View file

@ -243,7 +243,8 @@ async fn sign_test() {
let block_included_in_hash =
serai.finalized_block_by_number(block_included_in).await.unwrap().unwrap().hash();
let serai = serai.as_of(block_included_in_hash).coins();
let serai = serai.as_of(block_included_in_hash);
let serai = serai.coins();
assert_eq!(
serai.coin_balance(Coin::Serai, serai_addr).await.unwrap(),
Amount(1_000_000_000)
@ -310,7 +311,8 @@ async fn sign_test() {
let last_serai_block =
serai.finalized_block_by_number(last_serai_block).await.unwrap().unwrap();
let last_serai_block_hash = last_serai_block.hash();
let serai = serai.as_of(last_serai_block_hash).coins();
let serai = serai.as_of(last_serai_block_hash);
let serai = serai.coins();
assert_eq!(serai.coin_supply(Coin::Bitcoin).await.unwrap(), Amount(0));
assert_eq!(serai.coin_balance(Coin::Bitcoin, serai_addr).await.unwrap(), Amount(0));