Connect broadcast

This commit is contained in:
Luke Parker 2022-10-30 05:37:23 -04:00
parent 6c54289fb4
commit aee0bde45d
No known key found for this signature in database
GPG key ID: F9F1386DB1E119B6
2 changed files with 45 additions and 27 deletions

View file

@ -48,6 +48,7 @@ pub(crate) struct TendermintImport<T: TendermintValidator> {
validators: Arc<TendermintValidators<T>>,
number: Arc<RwLock<u64>>,
gossip_queue: Arc<RwLock<Vec<SignedMessage<u16, T::Block, Signature>>>>,
importing_block: Arc<RwLock<Option<<T::Block as Block>::Hash>>>,
pub(crate) machine: Arc<RwLock<Option<TendermintHandle<Self>>>>,
@ -108,33 +109,48 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
let mut last_number = last_number.0 + 1;
let mut recv = gossip
.messages_for(TendermintGossip::<TendermintValidators<T>>::topic::<T::Block>(last_number));
loop {
match recv.try_next() {
Ok(Some(msg)) => handle
.messages
.send(match SignedMessage::decode(&mut msg.message.as_ref()) {
Ok(msg) => msg,
Err(e) => {
warn!("couldn't decode valid message: {}", e);
continue;
'outer: loop {
// Send out any queued messages
let mut queue = self.0.gossip_queue.write().unwrap().drain(..).collect::<Vec<_>>();
for msg in queue.drain(..) {
gossip.gossip_message(
TendermintGossip::<TendermintValidators<T>>::topic::<T::Block>(msg.number().0),
msg.encode(),
false,
);
}
// Handle any received messages
// Makes sure to handle all pending messages before acquiring the out-queue lock again
'inner: loop {
match recv.try_next() {
Ok(Some(msg)) => handle
.messages
.send(match SignedMessage::decode(&mut msg.message.as_ref()) {
Ok(msg) => msg,
Err(e) => {
warn!("couldn't decode valid message: {}", e);
continue;
}
})
.await
.unwrap(),
Ok(None) => break 'outer,
// No messages available
Err(_) => {
// Check if we the block updated and should be listening on a different topic
let curr = *self.0.number.read().unwrap();
if last_number != curr {
last_number = curr;
// TODO: Will this return existing messages on the new height? Or will those have been
// ignored and are now gone?
recv = gossip.messages_for(TendermintGossip::<TendermintValidators<T>>::topic::<
T::Block,
>(last_number));
}
})
.await
.unwrap(),
Ok(None) => break,
// No messages available
Err(_) => {
// Check if we the block updated and should be listening on a different topic
let curr = *self.0.number.read().unwrap();
if last_number != curr {
last_number = curr;
// TODO: Will this return existing messages on the new height? Or will those have been
// ignored and are now gone?
recv = gossip.messages_for(TendermintGossip::<TendermintValidators<T>>::topic::<
T::Block,
>(last_number));
yield_now().await;
break 'inner;
}
yield_now().await;
}
}
}
@ -149,6 +165,7 @@ impl<T: TendermintValidator> Clone for TendermintImport<T> {
validators: self.validators.clone(),
number: self.number.clone(),
gossip_queue: self.gossip_queue.clone(),
importing_block: self.importing_block.clone(),
machine: self.machine.clone(),
@ -175,6 +192,7 @@ impl<T: TendermintValidator> TendermintImport<T> {
validators: Arc::new(TendermintValidators::new(client.clone())),
number: Arc::new(RwLock::new(0)),
gossip_queue: Arc::new(RwLock::new(vec![])),
importing_block: Arc::new(RwLock::new(None)),
machine: Arc::new(RwLock::new(None)),
@ -359,7 +377,7 @@ impl<T: TendermintValidator> Network for TendermintImport<T> {
}
async fn broadcast(&mut self, msg: SignedMessage<u16, Self::Block, Signature>) {
// TODO
self.gossip_queue.write().unwrap().push(msg);
}
async fn slash(&mut self, validator: u16) {

View file

@ -278,7 +278,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
weights: weights.clone(),
proposer,
number: BlockNumber(last.0.0 + 1),
number: BlockNumber(last.0 .0 + 1),
canonical_start_time: last.1,
// The end time of the last block is the start time for this one
// The Commit explicitly contains the end time, so loading the last commit will provide