diff --git a/processor/primitives/src/task.rs b/processor/primitives/src/task.rs
index 94a576a0..a40fb9ff 100644
--- a/processor/primitives/src/task.rs
+++ b/processor/primitives/src/task.rs
@@ -1,28 +1,54 @@
 use core::time::Duration;
+use std::sync::Arc;
 
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, oneshot, Mutex};
 
-/// A handle to immediately run an iteration of a task.
+enum Closed {
+  NotClosed(Option<oneshot::Receiver<()>>),
+  Closed,
+}
+
+/// A handle for a task.
 #[derive(Clone)]
-pub struct RunNowHandle(mpsc::Sender<()>);
-/// An instruction recipient to immediately run an iteration of a task.
-pub struct RunNowRecipient(mpsc::Receiver<()>);
+pub struct TaskHandle {
+  run_now: mpsc::Sender<()>,
+  close: mpsc::Sender<()>,
+  closed: Arc<Mutex<Closed>>,
+}
+/// A task's internal structures.
+pub struct Task {
+  run_now: mpsc::Receiver<()>,
+  close: mpsc::Receiver<()>,
+  closed: oneshot::Sender<()>,
+}
 
-impl RunNowHandle {
-  /// Create a new run-now handle to be assigned to a task.
-  pub fn new() -> (Self, RunNowRecipient) {
+impl Task {
+  /// Create a new task definition.
+  pub fn new() -> (Self, TaskHandle) {
     // Uses a capacity of 1 as any call to run as soon as possible satisfies all calls to run as
     // soon as possible
-    let (send, recv) = mpsc::channel(1);
-    (Self(send), RunNowRecipient(recv))
+    let (run_now_send, run_now_recv) = mpsc::channel(1);
+    // And any call to close satisfies all calls to close
+    let (close_send, close_recv) = mpsc::channel(1);
+    let (closed_send, closed_recv) = oneshot::channel();
+    (
+      Self { run_now: run_now_recv, close: close_recv, closed: closed_send },
+      TaskHandle {
+        run_now: run_now_send,
+        close: close_send,
+        closed: Arc::new(Mutex::new(Closed::NotClosed(Some(closed_recv)))),
+      },
+    )
   }
+}
 
+impl TaskHandle {
   /// Tell the task to run now (and not whenever its next iteration on a timer is).
   ///
   /// Panics if the task has been dropped.
   pub fn run_now(&self) {
     #[allow(clippy::match_same_arms)]
-    match self.0.try_send(()) {
+    match self.run_now.try_send(()) {
       Ok(()) => {}
       // NOP on full, as this task will already be ran as soon as possible
       Err(mpsc::error::TrySendError::Full(())) => {}
@@ -31,6 +57,24 @@ impl RunNowHandle {
       }
     }
   }
+
+  /// Close the task.
+  ///
+  /// Returns once the task shuts down after it finishes its current iteration (which may be of
+  /// unbounded time).
+  pub async fn close(self) {
+    // If another instance of the handle called this, don't error
+    let _ = self.close.send(()).await;
+    // Wait until we receive the closed message
+    let mut closed = self.closed.lock().await;
+    match &mut *closed {
+      Closed::NotClosed(ref mut recv) => {
+        assert_eq!(recv.take().unwrap().await, Ok(()), "continually ran task dropped itself?");
+        *closed = Closed::Closed;
+      }
+      Closed::Closed => {}
+    }
+  }
 }
 
 /// A task to be continually ran.
@@ -50,10 +94,7 @@ pub trait ContinuallyRan: Sized {
   async fn run_iteration(&mut self) -> Result<bool, String>;
 
   /// Continually run the task.
-  ///
-  /// This returns a channel which can have a message set to immediately trigger a new run of an
-  /// iteration.
-  async fn continually_run(mut self, mut run_now: RunNowRecipient, dependents: Vec<RunNowHandle>) {
+  async fn continually_run(mut self, mut task: Task, dependents: Vec<TaskHandle>) {
     // The default number of seconds to sleep before running the task again
     let default_sleep_before_next_task = Self::DELAY_BETWEEN_ITERATIONS;
     // The current number of seconds to sleep before running the task again
@@ -66,6 +107,15 @@ pub trait ContinuallyRan: Sized {
     };
 
     loop {
+      // If we were told to close, or if all handles were dropped, stop running
+      {
+        let should_close = task.close.try_recv();
+        match should_close {
+          Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => break,
+          Err(mpsc::error::TryRecvError::Empty) => {}
+        }
+      }
+
       match self.run_iteration().await {
         Ok(run_dependents) => {
           // Upon a successful (error-free) loop iteration, reset the amount of time we sleep
@@ -86,8 +136,15 @@ pub trait ContinuallyRan: Sized {
       // Don't run the task again for another few seconds UNLESS told to run now
       tokio::select! {
        () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
-        msg = run_now.0.recv() => assert_eq!(msg, Some(()), "run now handle was dropped"),
+        msg = task.run_now.recv() => {
+          // Check if this is firing because the handle was dropped
+          if msg.is_none() {
+            break;
+          }
+        },
       }
     }
+
+    task.closed.send(()).unwrap();
  }
 }
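Taken together, the task.rs changes give every task a uniform lifecycle. A minimal sketch of the intended usage, assuming some value `my_task` implementing `ContinuallyRan` (the names here are illustrative, not code from this diff):

    let (task, handle) = Task::new();
    tokio::spawn(my_task.continually_run(task, vec![]));

    // Any clone of the handle can request an immediate iteration
    handle.run_now();

    // `close` consumes the handle and returns once the task observes the close
    // signal at the top of its loop, breaks, and fires its `closed` oneshot.
    // The Arc<Mutex<Closed>> lets one handle consume the oneshot receiver while
    // later calls from other clones return immediately once it's marked Closed.
    handle.close().await;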
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs
index 7c699e9c..6403605d 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -343,7 +343,7 @@ pub trait Scheduler<S: ScannerFeed>: 'static + Send {
 /// A representation of a scanner.
 #[allow(non_snake_case)]
 pub struct Scanner<S: ScannerFeed> {
-  substrate_handle: RunNowHandle,
+  substrate_handle: TaskHandle,
   _S: PhantomData<S>,
 }
 impl<S: ScannerFeed> Scanner<S> {
@@ -362,11 +362,11 @@ impl<S: ScannerFeed> Scanner<S> {
     let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone());
     let eventuality_task = eventuality::EventualityTask::<_, _, Sch>::new(db, feed, start_block);
 
-    let (_index_handle, index_run) = RunNowHandle::new();
-    let (scan_handle, scan_run) = RunNowHandle::new();
-    let (report_handle, report_run) = RunNowHandle::new();
-    let (substrate_handle, substrate_run) = RunNowHandle::new();
-    let (eventuality_handle, eventuality_run) = RunNowHandle::new();
+    let (index_run, _index_handle) = Task::new();
+    let (scan_run, scan_handle) = Task::new();
+    let (report_run, report_handle) = Task::new();
+    let (substrate_run, substrate_handle) = Task::new();
+    let (eventuality_run, eventuality_handle) = Task::new();
 
     // Upon indexing a new block, scan it
     tokio::spawn(index_task.continually_run(index_run, vec![scan_handle.clone()]));
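Note the constructor's tuple order: `Task::new()` yields the task's internals first and the cloneable handle second, the reverse of `RunNowHandle::new()`, which is why each binding above is flipped. The dependents mechanism carries over unchanged; a sketch of the wiring, assuming (as the `Ok(run_dependents)` arm elided above suggests) that returning `Ok(true)` prompts `run_now()` on each dependent handle (`IndexTask` is an illustrative stand-in, not the scanner's real index task):

    struct IndexTask;
    impl ContinuallyRan for IndexTask {
      async fn run_iteration(&mut self) -> Result<bool, String> {
        // ... index a block ...
        // Ok(true) asks continually_run to wake every dependent handle
        Ok(true)
      }
    }

    let (index_run, _index_handle) = Task::new();
    let (scan_run, scan_handle) = Task::new();
    // (scan_run would be passed to the scan task's own continually_run)
    // The scan task is woken whenever the index task makes progress
    tokio::spawn(IndexTask.continually_run(index_run, vec![scan_handle.clone()]));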
diff --git a/processor/signers/src/db.rs b/processor/signers/src/db.rs
index 9975cbda..ec9b879c 100644
--- a/processor/signers/src/db.rs
+++ b/processor/signers/src/db.rs
@@ -9,6 +9,7 @@ create_db! {
     RegisteredKeys: () -> Vec<Session>,
     SerializedKeys: (session: Session) -> Vec<u8>,
     LatestRetiredSession: () -> Session,
+    ToCleanup: () -> Vec<(Session, Vec<u8>)>,
   }
 }
diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs
index 9bc2459d..72fe2d17 100644
--- a/processor/signers/src/lib.rs
+++ b/processor/signers/src/lib.rs
@@ -3,6 +3,7 @@
 #![deny(missing_docs)]
 
 use core::{fmt::Debug, marker::PhantomData};
+use std::collections::HashMap;
 
 use zeroize::Zeroizing;
 
@@ -13,6 +14,7 @@
 use frost::dkg::{ThresholdCore, ThresholdKeys};
 
 use serai_db::{DbTxn, Db};
+use primitives::task::TaskHandle;
 use scheduler::{Transaction, SignableTransaction, TransactionsToSign};
 
 pub(crate) mod db;
@@ -39,7 +41,10 @@ pub trait TransactionPublisher<T: Transaction>: 'static + Send + Sync {
 }
 
 /// The signers used by a processor.
-pub struct Signers<ST: SignableTransaction>(PhantomData<ST>);
+pub struct Signers<ST: SignableTransaction> {
+  tasks: HashMap<Session, Vec<TaskHandle>>,
+  _ST: PhantomData<ST>,
+}
 
 /*
   This is completely outside of consensus, so the worst that can happen is:
@@ -58,6 +63,8 @@ impl<ST: SignableTransaction> Signers<ST> {
   ///
   /// This will spawn tasks for any historically registered keys.
   pub fn new(db: impl Db) -> Self {
+    let mut tasks = HashMap::new();
+
     for session in db::RegisteredKeys::get(&db).unwrap_or(vec![]) {
       let buf = db::SerializedKeys::get(&db, session).unwrap();
       let mut buf = buf.as_slice();
@@ -74,7 +81,7 @@ impl<ST: SignableTransaction> Signers<ST> {
       todo!("TODO")
     }
 
-    todo!("TODO")
+    Self { tasks, _ST: PhantomData }
   }
 
   /// Register a set of keys to sign with.
@@ -87,6 +94,7 @@ impl<ST: SignableTransaction> Signers<ST> {
     substrate_keys: Vec<ThresholdKeys<Ristretto>>,
     network_keys: Vec<ThresholdKeys<ST::Ciphersuite>>,
   ) {
+    // Don't register already retired keys
     if Some(session.0) <= db::LatestRetiredSession::get(txn).map(|session| session.0) {
       return;
     }
@@ -125,9 +133,6 @@ impl<ST: SignableTransaction> Signers<ST> {
       db::LatestRetiredSession::set(txn, &session);
     }
 
-    // Kill the tasks
-    todo!("TODO");
-
     // Update RegisteredKeys/SerializedKeys
     if let Some(registered) = db::RegisteredKeys::get(txn) {
       db::RegisteredKeys::set(
@@ -137,6 +142,20 @@ impl<ST: SignableTransaction> Signers<ST> {
     }
     db::SerializedKeys::del(txn, session);
 
+    // Queue the session for cleanup
+    let mut to_cleanup = db::ToCleanup::get(txn).unwrap_or(vec![]);
+    to_cleanup.push((session, external_key.to_bytes().as_ref().to_vec()));
+    db::ToCleanup::set(txn, &to_cleanup);
+
+    // TODO: Handle all of the following cleanup on a task
+    /*
+    // Kill the tasks
+    if let Some(tasks) = self.tasks.remove(&session) {
+      for task in tasks {
+        task.close().await;
+      }
+    }
+
     // Drain the transactions to sign
     // Presumably, TransactionsToSign will be fully populated before retiry occurs, making this
     // perfect in not leaving any pending blobs behind
@@ -152,6 +171,7 @@ impl<ST: SignableTransaction> Signers<ST> {
     while db::SlashReportSignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
     while db::CoordinatorToCosignerMessages::try_recv(txn, session).is_some() {}
     while db::CosignerToCoordinatorMessages::try_recv(txn, session).is_some() {}
+    */
   }
 }
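The cleanup is commented out, and queued into db::ToCleanup, seemingly because this code path holds a `DbTxn` while `TaskHandle::close` is async and of unbounded duration. One possible shape for the cleanup task the TODO refers to, sketched against the APIs above (`CleanupTask`, its fields, and the drain step are hypothetical, not part of this diff):

    // Hypothetical sketch: a ContinuallyRan implementor which performs the
    // cleanup queued in db::ToCleanup
    struct CleanupTask<D: Db> {
      db: D,
      tasks: HashMap<Session, Vec<TaskHandle>>,
    }

    impl<D: Db> ContinuallyRan for CleanupTask<D> {
      async fn run_iteration(&mut self) -> Result<bool, String> {
        let mut txn = self.db.txn();
        for (session, _external_key) in db::ToCleanup::get(&txn).unwrap_or(vec![]) {
          // Close the session's tasks, waiting out their current iterations
          if let Some(handles) = self.tasks.remove(&session) {
            for handle in handles {
              handle.close().await;
            }
          }
          // ... then drain the transactions to sign and the per-session
          // message channels, as in the commented-out block above ...
        }
        db::ToCleanup::del(&mut txn);
        txn.commit();
        Ok(false)
      }
    }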