diff --git a/net/levin/src/lib.rs b/net/levin/src/lib.rs index 22ffd12..cb0a3ab 100644 --- a/net/levin/src/lib.rs +++ b/net/levin/src/lib.rs @@ -64,7 +64,7 @@ pub enum BucketError { #[error("Levin header had incorrect signature")] InvalidHeaderSignature, /// Error decoding the body - #[error("Error decoding bucket body")] + #[error("Error decoding bucket body: {0}")] BodyDecodingError(Box), /// Unknown command ID #[error("Unknown command ID")] diff --git a/p2p/monero-p2p/src/client/connection.rs b/p2p/monero-p2p/src/client/connection.rs index a05b022..2d1b03b 100644 --- a/p2p/monero-p2p/src/client/connection.rs +++ b/p2p/monero-p2p/src/client/connection.rs @@ -90,6 +90,8 @@ where } async fn handle_client_request(&mut self, req: ConnectionTaskRequest) -> Result<(), PeerError> { + tracing::debug!("handling client request, id: {:?}", req.request.id()); + if req.request.needs_response() { self.state = State::WaitingForResponse { request_id: req.request.id(), @@ -103,6 +105,8 @@ where } async fn handle_peer_request(&mut self, req: PeerRequest) -> Result<(), PeerError> { + tracing::debug!("Received peer request: {:?}", req.id()); + let ready_svc = self.peer_request_handler.ready().await?; let res = ready_svc.call(req).await?; if matches!(res, PeerResponse::NA) { @@ -113,6 +117,8 @@ where } async fn handle_potential_response(&mut self, mes: Message) -> Result<(), PeerError> { + tracing::debug!("Received peer message, command: {:?}", mes.command()); + if mes.is_request() { return self.handle_peer_request(mes.try_into().unwrap()).await; } @@ -145,6 +151,7 @@ where where Str: FusedStream> + Unpin, { + tracing::debug!("waiting for peer/client request."); tokio::select! { biased; bradcast_req = self.broadcast_rx.next() => { @@ -152,15 +159,17 @@ where } client_req = self.client_rx.next() => { if let Some(client_req) = client_req { - self.handle_client_request(client_req).await? + self.handle_client_request(client_req).await + } else { + Err(PeerError::ClientChannelClosed) } - Err(PeerError::ClientChannelClosed) }, peer_message = stream.next() => { if let Some(peer_message) = peer_message { - self.handle_peer_request(peer_message?.try_into().map_err(|_| PeerError::PeerSentInvalidMessage)?).await? + self.handle_peer_request(peer_message?.try_into().map_err(|_| PeerError::PeerSentInvalidMessage)?).await + }else { + Err(PeerError::ClientChannelClosed) } - Err(PeerError::ConnectionClosed) }, } } @@ -169,6 +178,7 @@ where where Str: FusedStream> + Unpin, { + tracing::debug!("waiting for peer response.."); tokio::select! { biased; bradcast_req = self.broadcast_rx.next() => { @@ -176,9 +186,10 @@ where } peer_message = stream.next() => { if let Some(peer_message) = peer_message { - self.handle_peer_request(peer_message?.try_into().map_err(|_| PeerError::PeerSentInvalidMessage)?).await? + self.handle_potential_response(peer_message?).await + }else { + Err(PeerError::ClientChannelClosed) } - Err(PeerError::ConnectionClosed) }, } } @@ -187,6 +198,10 @@ where where Str: FusedStream> + Unpin, { + tracing::debug!( + "Handling eager messages len: {}", + eager_protocol_messages.len() + ); for message in eager_protocol_messages { let message = Message::Protocol(message).try_into(); @@ -202,6 +217,7 @@ where loop { if self.connection_guard.should_shutdown() { + tracing::debug!("connection guard has shutdown, shutting down connection."); return self.shutdown(PeerError::ConnectionClosed); } diff --git a/p2p/monero-p2p/tests/utils.rs b/p2p/monero-p2p/tests/utils.rs index 372b7ea..e6b457f 100644 --- a/p2p/monero-p2p/tests/utils.rs +++ b/p2p/monero-p2p/tests/utils.rs @@ -86,10 +86,10 @@ impl Service for DummyPeerRequestHandlerSvc { Pin> + Send + 'static>>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - todo!() + Poll::Ready(Ok(())) } fn call(&mut self, _: PeerRequest) -> Self::Future { - todo!() + async move { Ok(PeerResponse::NA) }.boxed() } }