Move the coordinator to a n-processor design

This commit is contained in:
Luke Parker 2023-05-09 23:44:41 -04:00
parent 9175383e89
commit 7b7ddbdd97
No known key found for this signature in database
6 changed files with 228 additions and 174 deletions

View file

@ -42,8 +42,8 @@ pub use p2p::*;
use processor_messages::{key_gen, sign, coordinator, ProcessorMessage};
pub mod processor;
use processor::Processor;
pub mod processors;
use processors::Processors;
mod substrate;
@ -90,10 +90,10 @@ async fn add_tributary<D: Db, P: P2p>(
reader
}
pub async fn scan_substrate<D: Db, Pro: Processor>(
pub async fn scan_substrate<D: Db, Pro: Processors>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processor: Pro,
processors: Pro,
serai: Serai,
) {
let mut db = substrate::SubstrateDb::new(db);
@ -114,7 +114,7 @@ pub async fn scan_substrate<D: Db, Pro: Processor>(
NEW_TRIBUTARIES.write().await.push_back(spec);
}
},
&processor,
&processors,
&serai,
&mut last_substrate_block,
)
@ -132,12 +132,12 @@ pub async fn scan_substrate<D: Db, Pro: Processor>(
}
#[allow(clippy::type_complexity)]
pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
pub async fn scan_tributaries<D: Db, Pro: Processors, P: P2p>(
raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id_send: UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
p2p: P,
processor: Pro,
processors: Pro,
tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
) {
let mut tributary_readers = vec![];
@ -174,7 +174,7 @@ pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
&mut tributary_db,
&key,
&recognized_id_send,
&processor,
&processors,
spec,
reader,
)
@ -359,18 +359,18 @@ pub async fn publish_transaction<D: Db, P: P2p>(
}
#[allow(clippy::type_complexity)]
pub async fn handle_processors<D: Db, Pro: Processor, P: P2p>(
pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
mut db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
mut processor: Pro,
mut processors: Pro,
tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
) {
let pub_key = Ristretto::generator() * key.deref();
loop {
let msg = processor.recv().await;
let msg = processors.recv().await;
// TODO: We need (ValidatorSet or key) to genesis hash
// TODO: Get genesis hash by msg.network
let genesis = [0; 32];
let tx = match msg.msg {
@ -410,9 +410,12 @@ pub async fn handle_processors<D: Db, Pro: Processor, P: P2p>(
// TODO
sign::ProcessorMessage::Completed { .. } => todo!(),
},
ProcessorMessage::Coordinator(msg) => match msg {
coordinator::ProcessorMessage::SubstrateBlockAck { network: _, block, plans } => {
// TODO2: Check this network aligns with this processor
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
assert_eq!(
msg.network, network,
"processor claimed to be a different network than it was",
);
// Safe to use its own txn since this is static and just needs to be written before we
// provide SubstrateBlock
@ -494,15 +497,15 @@ pub async fn handle_processors<D: Db, Pro: Processor, P: P2p>(
}
}
pub async fn run<D: Db, Pro: Processor, P: P2p>(
pub async fn run<D: Db, Pro: Processors, P: P2p>(
mut raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
p2p: P,
processor: Pro,
processors: Pro,
serai: Serai,
) {
// Handle new Substrate blocks
tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processor.clone(), serai.clone()));
tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processors.clone(), serai.clone()));
// Handle the Tributaries
@ -531,7 +534,7 @@ pub async fn run<D: Db, Pro: Processor, P: P2p>(
key.clone(),
recognized_id_send,
p2p.clone(),
processor.clone(),
processors.clone(),
tributaries.clone(),
));
}
@ -587,7 +590,7 @@ pub async fn run<D: Db, Pro: Processor, P: P2p>(
tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries.clone()));
// Handle all messages from processors
handle_processors(raw_db, key, processor, tributaries).await;
handle_processors(raw_db, key, processors, tributaries).await;
}
#[tokio::main]
@ -597,7 +600,7 @@ async fn main() {
let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::ZERO); // TODO
let p2p = LocalP2p::new(1).swap_remove(0); // TODO
let processor = processor::MemProcessor::new(); // TODO
let processors = processors::MemProcessors::new(); // TODO
let serai = || async {
loop {
@ -609,5 +612,5 @@ async fn main() {
return serai;
}
};
run(db, key, p2p, processor, serai().await).await
run(db, key, p2p, processors, serai().await).await
}

View file

@ -1,41 +0,0 @@
use std::{sync::Arc, collections::VecDeque};
use tokio::sync::RwLock;
use processor_messages::{ProcessorMessage, CoordinatorMessage};
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Message {
pub id: u64,
pub msg: ProcessorMessage,
}
#[async_trait::async_trait]
pub trait Processor: 'static + Send + Sync + Clone {
async fn send(&self, msg: CoordinatorMessage);
async fn recv(&mut self) -> Message;
async fn ack(&mut self, msg: Message);
}
// TODO: Move this to tests
#[derive(Clone)]
pub struct MemProcessor(pub Arc<RwLock<VecDeque<CoordinatorMessage>>>);
impl MemProcessor {
#[allow(clippy::new_without_default)]
pub fn new() -> MemProcessor {
MemProcessor(Arc::new(RwLock::new(VecDeque::new())))
}
}
#[async_trait::async_trait]
impl Processor for MemProcessor {
async fn send(&self, msg: CoordinatorMessage) {
self.0.write().await.push_back(msg)
}
async fn recv(&mut self) -> Message {
todo!()
}
async fn ack(&mut self, _: Message) {
todo!()
}
}

View file

@ -0,0 +1,49 @@
use std::{
sync::Arc,
collections::{VecDeque, HashMap},
};
use tokio::sync::RwLock;
use serai_client::primitives::NetworkId;
use processor_messages::{ProcessorMessage, CoordinatorMessage};
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Message {
pub id: u64,
pub network: NetworkId,
pub msg: ProcessorMessage,
}
#[async_trait::async_trait]
pub trait Processors: 'static + Send + Sync + Clone {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage);
async fn recv(&mut self) -> Message;
async fn ack(&mut self, msg: Message);
}
// TODO: Move this to tests
#[derive(Clone)]
pub struct MemProcessors(pub Arc<RwLock<HashMap<NetworkId, VecDeque<CoordinatorMessage>>>>);
impl MemProcessors {
#[allow(clippy::new_without_default)]
pub fn new() -> MemProcessors {
MemProcessors(Arc::new(RwLock::new(HashMap::new())))
}
}
#[async_trait::async_trait]
impl Processors for MemProcessors {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage) {
let mut processors = self.0.write().await;
let processor = processors.entry(network).or_insert_with(VecDeque::new);
processor.push_back(msg);
}
async fn recv(&mut self) -> Message {
todo!()
}
async fn ack(&mut self, _: Message) {
todo!()
}
}

View file

@ -21,7 +21,7 @@ use serai_db::DbTxn;
use processor_messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage};
use crate::{Db, processor::Processor, tributary::TributarySpec};
use crate::{Db, processors::Processors, tributary::TributarySpec};
mod db;
pub use db::*;
@ -42,12 +42,12 @@ async fn handle_new_set<
D: Db,
Fut: Future<Output = ()>,
CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processor,
Pro: Processors,
>(
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT,
processor: &Pro,
processors: &Pro,
serai: &Serai,
block: &Block,
set: ValidatorSet,
@ -63,9 +63,10 @@ async fn handle_new_set<
// We already have a unique event ID based on block, event index (where event index is
// the one generated in this handle_block function)
// We could use that on this end and the processor end?
processor
.send(CoordinatorMessage::KeyGen(
processor_messages::key_gen::CoordinatorMessage::GenerateKey {
processors
.send(
set.network,
CoordinatorMessage::KeyGen(processor_messages::key_gen::CoordinatorMessage::GenerateKey {
id: KeyGenId { set, attempt: 0 },
params: ThresholdParams::new(
spec.t(),
@ -75,17 +76,18 @@ async fn handle_new_set<
.expect("In set for a set we aren't in set for"),
)
.unwrap(),
},
))
}),
)
.await;
}
Ok(())
}
async fn handle_key_gen<Pro: Processor>(
async fn handle_key_gen<D: Db, Pro: Processors>(
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
processor: &Pro,
processors: &Pro,
serai: &Serai,
block: &Block,
set: ValidatorSet,
@ -93,30 +95,33 @@ async fn handle_key_gen<Pro: Processor>(
) -> Result<(), SeraiError> {
if in_set(key, serai, set).await?.expect("KeyGen occurred for a set which doesn't exist") {
// TODO: Check how the processor handles this being fired multiple times
processor
.send(CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext {
serai_time: block.time().unwrap(),
coin_latest_finalized_block: serai
.get_latest_block_for_network(block.hash(), set.network)
.await?
// The processor treats this as a magic value which will cause it to find a network
// block which has a time greater than or equal to the Serai time
.unwrap_or(BlockHash([0; 32])),
processors
.send(
set.network,
CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext {
serai_time: block.time().unwrap(),
coin_latest_finalized_block: serai
.get_latest_block_for_network(block.hash(), set.network)
.await?
// The processor treats this as a magic value which will cause it to find a network
// block which has a time greater than or equal to the Serai time
.unwrap_or(BlockHash([0; 32])),
},
set,
key_pair,
},
set,
key_pair,
},
))
),
)
.await;
}
Ok(())
}
async fn handle_batch_and_burns<Pro: Processor>(
processor: &Pro,
async fn handle_batch_and_burns<Pro: Processors>(
processors: &Pro,
serai: &Serai,
block: &Block,
) -> Result<(), SeraiError> {
@ -182,23 +187,26 @@ async fn handle_batch_and_burns<Pro: Processor>(
};
// TODO: Check how the processor handles this being fired multiple times
processor
.send(CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time: block.time().unwrap(),
coin_latest_finalized_block,
processors
.send(
network,
CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time: block.time().unwrap(),
coin_latest_finalized_block,
},
network,
block: block.number(),
key: serai
.get_keys(ValidatorSet { network, session: Session(0) }) // TODO2
.await?
.map(|keys| keys.1.into_inner())
.expect("batch/burn for network which never set keys"),
burns: burns.remove(&network).unwrap(),
},
network,
block: block.number(),
key: serai
.get_keys(ValidatorSet { network, session: Session(0) }) // TODO2
.await?
.map(|keys| keys.1.into_inner())
.expect("batch/burn for network which never set keys"),
burns: burns.remove(&network).unwrap(),
},
))
),
)
.await;
}
@ -211,12 +219,12 @@ async fn handle_block<
D: Db,
Fut: Future<Output = ()>,
CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processor,
Pro: Processors,
>(
db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT,
processor: &Pro,
processors: &Pro,
serai: &Serai,
block: Block,
) -> Result<(), SeraiError> {
@ -233,8 +241,16 @@ async fn handle_block<
// stable)
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if let ValidatorSetsEvent::NewSet { set } = new_set {
handle_new_set(&mut db.0, key, create_new_tributary.clone(), processor, serai, &block, set)
.await?;
handle_new_set(
&mut db.0,
key,
create_new_tributary.clone(),
processors,
serai,
&block,
set,
)
.await?;
} else {
panic!("NewSet event wasn't NewSet: {new_set:?}");
}
@ -249,7 +265,7 @@ async fn handle_block<
for key_gen in serai.get_key_gen_events(hash).await? {
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
handle_key_gen(key, processor, serai, &block, set, key_pair).await?;
handle_key_gen(&mut db.0, key, processors, serai, &block, set, key_pair).await?;
} else {
panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
}
@ -266,7 +282,7 @@ async fn handle_block<
// This does break the uniqueness of (hash, event_id) -> one event, yet
// (network, (hash, event_id)) remains valid as a unique ID for an event
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
handle_batch_and_burns(processor, serai, &block).await?;
handle_batch_and_burns(processors, serai, &block).await?;
}
let mut txn = db.0.txn();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
@ -279,12 +295,12 @@ pub async fn handle_new_blocks<
D: Db,
Fut: Future<Output = ()>,
CNT: Clone + Fn(&mut D, TributarySpec) -> Fut,
Pro: Processor,
Pro: Processors,
>(
db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
create_new_tributary: CNT,
processor: &Pro,
processors: &Pro,
serai: &Serai,
last_block: &mut u64,
) -> Result<(), SeraiError> {
@ -301,7 +317,7 @@ pub async fn handle_new_blocks<
db,
key,
create_new_tributary.clone(),
processor,
processors,
serai,
if b == latest_number {
latest.take().unwrap()

View file

@ -19,7 +19,7 @@ use processor_messages::{
use tributary::{Transaction as TransactionTrait, Tributary};
use crate::{
processor::MemProcessor,
processors::MemProcessors,
LocalP2p,
tributary::{TributaryDb, Transaction, TributarySpec, scanner::handle_new_blocks},
tests::tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion},
@ -74,29 +74,29 @@ async fn dkg_test() {
.collect(),
});
async fn new_processor(
async fn new_processors(
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
spec: &TributarySpec,
tributary: &Tributary<MemDb, Transaction, LocalP2p>,
) -> (TributaryDb<MemDb>, MemProcessor) {
) -> (TributaryDb<MemDb>, MemProcessors) {
let mut scanner_db = TributaryDb(MemDb::new());
let processor = MemProcessor::new();
let processors = MemProcessors::new();
// Uses a brand new channel since this channel won't be used within this test
handle_new_blocks(
&mut scanner_db,
key,
&mpsc::unbounded_channel().0,
&processor,
&processors,
spec,
&tributary.reader(),
)
.await;
(scanner_db, processor)
(scanner_db, processors)
}
// Instantiate a scanner and verify it has nothing to report
let (mut scanner_db, processor) = new_processor(&keys[0], &spec, &tributaries[0].1).await;
assert!(processor.0.read().await.is_empty());
let (mut scanner_db, processors) = new_processors(&keys[0], &spec, &tributaries[0].1).await;
assert!(processors.0.read().await.is_empty());
// Publish the last commitment
let block_before_tx = tributaries[0].1.tip().await;
@ -109,21 +109,25 @@ async fn dkg_test() {
&mut scanner_db,
&keys[0],
&mpsc::unbounded_channel().0,
&processor,
&processors,
&spec,
&tributaries[0].1.reader(),
)
.await;
{
let mut msgs = processor.0.write().await;
let mut msgs = processors.0.write().await;
assert_eq!(msgs.len(), 1);
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
assert!(msgs.is_empty());
}
// Verify all keys exhibit this scanner behavior
for (i, key) in keys.iter().enumerate() {
let (_, processor) = new_processor(key, &spec, &tributaries[i].1).await;
let mut msgs = processor.0.write().await;
let (_, processors) = new_processors(key, &spec, &tributaries[i].1).await;
let mut msgs = processors.0.write().await;
assert_eq!(msgs.len(), 1);
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
assert!(msgs.is_empty());
}
@ -158,12 +162,13 @@ async fn dkg_test() {
&mut scanner_db,
&keys[0],
&mpsc::unbounded_channel().0,
&processor,
&processors,
&spec,
&tributaries[0].1.reader(),
)
.await;
assert!(processor.0.write().await.is_empty());
assert_eq!(processors.0.read().await.len(), 1);
assert!(processors.0.read().await[&spec.set().network].is_empty());
// Publish the final set of shares
let block_before_tx = tributaries[0].1.tip().await;
@ -197,21 +202,25 @@ async fn dkg_test() {
&mut scanner_db,
&keys[0],
&mpsc::unbounded_channel().0,
&processor,
&processors,
&spec,
&tributaries[0].1.reader(),
)
.await;
{
let mut msgs = processor.0.write().await;
let mut msgs = processors.0.write().await;
assert_eq!(msgs.len(), 1);
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.pop_front().unwrap(), shares_for(0));
assert!(msgs.is_empty());
}
// Yet new scanners should emit all events
for (i, key) in keys.iter().enumerate() {
let (_, processor) = new_processor(key, &spec, &tributaries[i].1).await;
let mut msgs = processor.0.write().await;
let (_, processors) = new_processors(key, &spec, &tributaries[i].1).await;
let mut msgs = processors.0.write().await;
assert_eq!(msgs.len(), 1);
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
assert_eq!(msgs.pop_front().unwrap(), shares_for(i));
assert!(msgs.is_empty());

View file

@ -19,7 +19,7 @@ use serai_db::DbTxn;
use crate::{
Db,
processor::Processor,
processors::Processors,
tributary::{TributaryDb, TributarySpec, Transaction},
};
@ -30,11 +30,11 @@ pub enum RecognizedIdType {
}
// Handle a specific Tributary block
async fn handle_block<D: Db, Pro: Processor>(
async fn handle_block<D: Db, Pro: Processors>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
processor: &Pro,
processors: &Pro,
spec: &TributarySpec,
block: Block<Transaction>,
) {
@ -144,11 +144,14 @@ async fn handle_block<D: Db, Pro: Processor>(
if let Some(commitments) =
handle(Zone::Dkg, b"dkg_commitments", spec.n(), [0; 32], attempt, bytes, signed)
{
processor
.send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
id: KeyGenId { set: spec.set(), attempt },
commitments,
}))
processors
.send(
spec.set().network,
CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
id: KeyGenId { set: spec.set(), attempt },
commitments,
}),
)
.await;
}
}
@ -170,11 +173,14 @@ async fn handle_block<D: Db, Pro: Processor>(
if let Some(shares) =
handle(Zone::Dkg, b"dkg_shares", spec.n(), [0; 32], attempt, bytes, signed)
{
processor
.send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares {
id: KeyGenId { set: spec.set(), attempt },
shares,
}))
processors
.send(
spec.set().network,
CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares {
id: KeyGenId { set: spec.set(), attempt },
shares,
}),
)
.await;
}
}
@ -211,13 +217,16 @@ async fn handle_block<D: Db, Pro: Processor>(
data.data,
data.signed,
) {
processor
.send(CoordinatorMessage::Coordinator(
coordinator::CoordinatorMessage::BatchPreprocesses {
id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
preprocesses,
},
))
processors
.send(
spec.set().network,
CoordinatorMessage::Coordinator(
coordinator::CoordinatorMessage::BatchPreprocesses {
id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
preprocesses,
},
),
)
.await;
}
}
@ -231,14 +240,17 @@ async fn handle_block<D: Db, Pro: Processor>(
data.data,
data.signed,
) {
processor
.send(CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares {
id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
shares: shares
.drain()
.map(|(validator, share)| (validator, share.try_into().unwrap()))
.collect(),
}))
processors
.send(
spec.set().network,
CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares {
id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
shares: shares
.drain()
.map(|(validator, share)| (validator, share.try_into().unwrap()))
.collect(),
}),
)
.await;
}
}
@ -253,11 +265,14 @@ async fn handle_block<D: Db, Pro: Processor>(
data.data,
data.signed,
) {
processor
.send(CoordinatorMessage::Sign(sign::CoordinatorMessage::Preprocesses {
id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
preprocesses,
}))
processors
.send(
spec.set().network,
CoordinatorMessage::Sign(sign::CoordinatorMessage::Preprocesses {
id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
preprocesses,
}),
)
.await;
}
}
@ -271,11 +286,14 @@ async fn handle_block<D: Db, Pro: Processor>(
data.data,
data.signed,
) {
processor
.send(CoordinatorMessage::Sign(sign::CoordinatorMessage::Shares {
id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
shares,
}))
processors
.send(
spec.set().network,
CoordinatorMessage::Sign(sign::CoordinatorMessage::Shares {
id: SignId { key: todo!(), id: data.plan, attempt: data.attempt },
shares,
}),
)
.await;
}
}
@ -290,11 +308,11 @@ async fn handle_block<D: Db, Pro: Processor>(
// TODO: Trigger any necessary re-attempts
}
pub async fn handle_new_blocks<D: Db, Pro: Processor>(
pub async fn handle_new_blocks<D: Db, Pro: Processors>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
processor: &Pro,
processors: &Pro,
spec: &TributarySpec,
tributary: &TributaryReader<D, Transaction>,
) {
@ -302,7 +320,7 @@ pub async fn handle_new_blocks<D: Db, Pro: Processor>(
let mut last_block = db.last_block(genesis);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();
handle_block(db, key, recognized_id, processor, spec, block).await;
handle_block(db, key, recognized_id, processors, spec, block).await;
last_block = next;
db.set_last_block(genesis, next);
}