Abort the P2P meta task when dropped

This should cause full cleanup of all Tributary async tasks, since the machine
already cleans itself up on drop.
This commit is contained in:
Luke Parker 2023-10-14 20:07:12 -04:00
parent e4adaa8947
commit 62e1d63f47
No known key found for this signature in database
3 changed files with 26 additions and 15 deletions

View file

@ -67,8 +67,6 @@ pub enum TributaryEvent<D: Db, P: P2p> {
TributaryRetired(ValidatorSet),
}
// TODO: Clean up the actual underlying Tributary/Tendermint tasks
// Creates a new tributary and sends it to all listeners.
async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
db: D,
@ -86,6 +84,7 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
let tributary = Tributary::<_, Transaction, _>::new(
// TODO2: Use a db on a distinct volume to protect against DoS attacks
// TODO2: Delete said db once the Tributary is dropped
db,
spec.genesis(),
spec.start_time(),

View file

@ -152,6 +152,14 @@ pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
p2p_meta_task_handle: Arc<tokio::task::AbortHandle>,
}
impl<D: Db, T: TransactionTrait, P: P2p> Drop for Tributary<D, T, P> {
fn drop(&mut self) {
self.p2p_meta_task_handle.abort();
}
}
impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
@ -186,26 +194,29 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
let to_rebroadcast = Arc::new(RwLock::new(vec![]));
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
// P2P layer
tokio::spawn({
let to_rebroadcast = to_rebroadcast.clone();
let p2p = p2p.clone();
async move {
loop {
let to_rebroadcast = to_rebroadcast.read().await.clone();
for msg in to_rebroadcast {
p2p.broadcast(genesis, msg).await;
let p2p_meta_task_handle = Arc::new(
tokio::spawn({
let to_rebroadcast = to_rebroadcast.clone();
let p2p = p2p.clone();
async move {
loop {
let to_rebroadcast = to_rebroadcast.read().await.clone();
for msg in to_rebroadcast {
p2p.broadcast(genesis, msg).await;
}
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
}
});
})
.abort_handle(),
);
let network =
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };
let TendermintHandle { synced_block, synced_block_result, messages, machine } =
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
tokio::task::spawn(machine.run());
tokio::spawn(machine.run());
Some(Self {
db,
@ -214,6 +225,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
synced_block: Arc::new(RwLock::new(synced_block)),
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
messages: Arc::new(RwLock::new(messages)),
p2p_meta_task_handle,
})
}

View file

@ -176,7 +176,7 @@ impl TestNetwork {
TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },
)
.await;
tokio::task::spawn(machine.run());
tokio::spawn(machine.run());
write.push((messages, synced_block, synced_block_result));
}
}