mirror of
https://github.com/hinto-janai/cuprate.git
synced 2025-03-12 09:31:30 +00:00
start re-working p2p to work with change monero-wire
This commit is contained in:
parent
477d9e42f3
commit
d2a94f1909
22 changed files with 47 additions and 2471 deletions
12
Cargo.toml
12
Cargo.toml
|
@ -7,16 +7,6 @@ members = [
|
|||
# "database",
|
||||
"net/levin",
|
||||
"net/monero-wire",
|
||||
# "p2p",
|
||||
"p2p",
|
||||
# "p2p/sync-states"
|
||||
]
|
||||
|
||||
[workspace.dependencies]
|
||||
monero = { version = "*" }
|
||||
bincode = { version = "2.0.0-rc.3" }
|
||||
serde = { version = "*", features =["derive"]}
|
||||
tracing = "*"
|
||||
tracing-subscriber = "*"
|
||||
|
||||
# As suggested by /u/danda :
|
||||
thiserror = "*"
|
||||
|
|
|
@ -9,7 +9,7 @@ repository = "https://github.com/SyntheticBird45/cuprate/tree/main/net/monero-wi
|
|||
|
||||
[dependencies]
|
||||
levin-cuprate = {path="../levin"}
|
||||
epee-encoding = { git = "https://github.com/boog900/epee-encoding"}
|
||||
epee-encoding = { git = "https://github.com/boog900/epee-encoding", rev = "b774bf7"}
|
||||
|
||||
[dev-dependencies]
|
||||
hex = "0.4.3"
|
||||
|
|
|
@ -32,8 +32,7 @@
|
|||
pub mod messages;
|
||||
pub mod network_address;
|
||||
|
||||
pub use network_address::NetworkAddress;
|
||||
|
||||
pub use messages::*;
|
||||
pub use network_address::*;
|
||||
|
||||
pub type MoneroWireCodec = levin_cuprate::codec::LevinMessageCodec<Message>;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
[package]
|
||||
name = "cuprate-peer"
|
||||
name = "cuprate-p2p"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "AGPL-3.0-only"
|
||||
|
@ -7,13 +7,18 @@ authors = ["Boog900"]
|
|||
|
||||
|
||||
[dependencies]
|
||||
chrono = "0.4.24"
|
||||
thiserror = "1.0.39"
|
||||
cuprate-common = {path = "../common"}
|
||||
monero-wire = {path= "../net/monero-wire"}
|
||||
futures = "0.3.26"
|
||||
tower = {version = "0.4.13", features = ["util", "steer"]}
|
||||
tokio = {version= "1.27", features=["rt", "time"]}
|
||||
async-trait = "0.1.68"
|
||||
tracing = "0.1.37"
|
||||
rand = "0.8.5"
|
||||
|
||||
|
||||
#chrono = "0.4.24"
|
||||
#thiserror = "1.0.39"
|
||||
#cuprate-common = {path = "../common"}
|
||||
#futures = "0.3.26"
|
||||
#tower = {version = "0.4.13", features = ["util", "steer", "load", "discover", "load-shed", "buffer", "timeout"]}
|
||||
#tokio = {version= "1.27", features=["rt", "time", "net"]}
|
||||
#tokio-util = {version = "0.7.8", features=["compat"]}
|
||||
#tokio-stream = {version="0.1.14", features=["time"]}
|
||||
#async-trait = "0.1.68"
|
||||
#tracing = "0.1.37"
|
||||
#rand = "0.8.5"
|
||||
#pin-project = "1.0.12"
|
||||
|
|
|
@ -1,120 +0,0 @@
|
|||
mod addr_book_client;
|
||||
pub(crate) mod address_book;
|
||||
|
||||
pub use addr_book_client::start_address_book;
|
||||
|
||||
use monero_wire::{messages::PeerListEntryBase, network_address::NetZone, NetworkAddress};
|
||||
|
||||
const MAX_WHITE_LIST_PEERS: usize = 1000;
|
||||
const MAX_GRAY_LIST_PEERS: usize = 5000;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum AddressBookError {
|
||||
#[error("Peer was not found in book")]
|
||||
PeerNotFound,
|
||||
#[error("The peer list is empty")]
|
||||
PeerListEmpty,
|
||||
#[error("Peer sent an address out of it's net-zone")]
|
||||
PeerSentAnAddressOutOfZone,
|
||||
#[error("The address books channel has closed.")]
|
||||
AddressBooksChannelClosed,
|
||||
#[error("Peer Store Error: {0}")]
|
||||
PeerStoreError(&'static str),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AddressBookRequest {
|
||||
HandleNewPeerList(Vec<PeerListEntryBase>, NetZone),
|
||||
SetPeerSeen(NetworkAddress, i64),
|
||||
BanPeer(NetworkAddress, chrono::NaiveDateTime),
|
||||
AddPeerToAnchor(NetworkAddress),
|
||||
RemovePeerFromAnchor(NetworkAddress),
|
||||
UpdatePeerInfo(PeerListEntryBase),
|
||||
|
||||
GetRandomGrayPeer(NetZone),
|
||||
GetRandomWhitePeer(NetZone),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for AddressBookRequest {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::HandleNewPeerList(_, _) => f.write_str("HandleNewPeerList"),
|
||||
Self::SetPeerSeen(_, _) => f.write_str("SetPeerSeen"),
|
||||
Self::BanPeer(_, _) => f.write_str("BanPeer"),
|
||||
Self::AddPeerToAnchor(_) => f.write_str("AddPeerToAnchor"),
|
||||
Self::RemovePeerFromAnchor(_) => f.write_str("RemovePeerFromAnchor"),
|
||||
Self::UpdatePeerInfo(_) => f.write_str("UpdatePeerInfo"),
|
||||
Self::GetRandomGrayPeer(_) => f.write_str("GetRandomGrayPeer"),
|
||||
Self::GetRandomWhitePeer(_) => f.write_str("GetRandomWhitePeer"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AddressBookRequest {
|
||||
pub fn get_zone(&self) -> NetZone {
|
||||
match self {
|
||||
Self::HandleNewPeerList(_, zone) => *zone,
|
||||
Self::SetPeerSeen(peer, _) => peer.get_zone(),
|
||||
Self::BanPeer(peer, _) => peer.get_zone(),
|
||||
Self::AddPeerToAnchor(peer) => peer.get_zone(),
|
||||
Self::RemovePeerFromAnchor(peer) => peer.get_zone(),
|
||||
Self::UpdatePeerInfo(peer) => peer.adr.get_zone(),
|
||||
|
||||
Self::GetRandomGrayPeer(zone) => *zone,
|
||||
Self::GetRandomWhitePeer(zone) => *zone,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AddressBookResponse {
|
||||
Ok,
|
||||
Peer(PeerListEntryBase),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AddressBookConfig {
|
||||
max_white_peers: usize,
|
||||
max_gray_peers: usize,
|
||||
}
|
||||
|
||||
impl Default for AddressBookConfig {
|
||||
fn default() -> Self {
|
||||
AddressBookConfig {
|
||||
max_white_peers: MAX_WHITE_LIST_PEERS,
|
||||
max_gray_peers: MAX_GRAY_LIST_PEERS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait AddressBookStore: Clone {
|
||||
type Error: Into<AddressBookError>;
|
||||
/// Loads the peers from the peer store.
|
||||
/// returns (in order):
|
||||
/// the white list,
|
||||
/// the gray list,
|
||||
/// the anchor list,
|
||||
/// the ban list
|
||||
async fn load_peers(
|
||||
&mut self,
|
||||
zone: NetZone,
|
||||
) -> Result<
|
||||
(
|
||||
Vec<PeerListEntryBase>, // white list
|
||||
Vec<PeerListEntryBase>, // gray list
|
||||
Vec<NetworkAddress>, // anchor list
|
||||
Vec<(NetworkAddress, chrono::NaiveDateTime)>, // ban list
|
||||
),
|
||||
Self::Error,
|
||||
>;
|
||||
|
||||
async fn save_peers(
|
||||
&mut self,
|
||||
zone: NetZone,
|
||||
white: Vec<PeerListEntryBase>,
|
||||
gray: Vec<PeerListEntryBase>,
|
||||
anchor: Vec<NetworkAddress>,
|
||||
bans: Vec<(NetworkAddress, chrono::NaiveDateTime)>, // ban lists
|
||||
) -> Result<(), Self::Error>;
|
||||
}
|
|
@ -1,121 +0,0 @@
|
|||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::FutureExt;
|
||||
use tokio::task::spawn;
|
||||
use tower::steer::Steer;
|
||||
|
||||
use monero_wire::network_address::NetZone;
|
||||
|
||||
use super::address_book::{AddressBook, AddressBookClientRequest};
|
||||
use super::{
|
||||
AddressBookConfig, AddressBookError, AddressBookRequest, AddressBookResponse, AddressBookStore,
|
||||
};
|
||||
|
||||
pub async fn start_address_book<S>(
|
||||
peer_store: S,
|
||||
config: AddressBookConfig,
|
||||
) -> Result<
|
||||
impl tower::Service<
|
||||
AddressBookRequest,
|
||||
Response = AddressBookResponse,
|
||||
Error = AddressBookError,
|
||||
Future = Pin<
|
||||
Box<
|
||||
dyn Future<Output = Result<AddressBookResponse, AddressBookError>>
|
||||
+ Send
|
||||
+ 'static,
|
||||
>,
|
||||
>,
|
||||
> + Clone,
|
||||
AddressBookError,
|
||||
>
|
||||
where
|
||||
S: AddressBookStore,
|
||||
{
|
||||
let mut builder = AddressBookBuilder::new(peer_store, config);
|
||||
|
||||
let public = builder.build(NetZone::Public).await?;
|
||||
let tor = builder.build(NetZone::Tor).await?;
|
||||
let i2p = builder.build(NetZone::I2p).await?;
|
||||
|
||||
let books = vec![public, tor, i2p];
|
||||
|
||||
Ok(Steer::new(
|
||||
books,
|
||||
|req: &AddressBookRequest, _: &[_]| match req.get_zone() {
|
||||
NetZone::Public => 0,
|
||||
NetZone::Tor => 1,
|
||||
NetZone::I2p => 2,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
pub struct AddressBookBuilder<S> {
|
||||
peer_store: S,
|
||||
config: AddressBookConfig,
|
||||
}
|
||||
|
||||
impl<S> AddressBookBuilder<S>
|
||||
where
|
||||
S: AddressBookStore,
|
||||
{
|
||||
fn new(peer_store: S, config: AddressBookConfig) -> Self {
|
||||
AddressBookBuilder { peer_store, config }
|
||||
}
|
||||
|
||||
async fn build(&mut self, zone: NetZone) -> Result<AddressBookClient, AddressBookError> {
|
||||
let (white, gray, anchor, bans) =
|
||||
self.peer_store.load_peers(zone).await.map_err(Into::into)?;
|
||||
|
||||
let book = AddressBook::new(self.config.clone(), zone, white, gray, anchor, bans);
|
||||
|
||||
let (tx, rx) = mpsc::channel(5);
|
||||
|
||||
spawn(book.run(rx));
|
||||
|
||||
Ok(AddressBookClient { book: tx })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct AddressBookClient {
|
||||
book: mpsc::Sender<AddressBookClientRequest>,
|
||||
}
|
||||
|
||||
impl tower::Service<AddressBookRequest> for AddressBookClient {
|
||||
type Error = AddressBookError;
|
||||
type Response = AddressBookResponse;
|
||||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.book
|
||||
.poll_ready(cx)
|
||||
.map_err(|_| AddressBookError::AddressBooksChannelClosed)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: AddressBookRequest) -> Self::Future {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
// get the callers span
|
||||
let span = tracing::span::Span::current();
|
||||
|
||||
let req = AddressBookClientRequest { req, tx, span };
|
||||
|
||||
match self.book.try_send(req) {
|
||||
Err(_e) => {
|
||||
// I'm assuming all callers will call `poll_ready` first (which they are supposed to)
|
||||
futures::future::ready(Err(AddressBookError::AddressBooksChannelClosed)).boxed()
|
||||
}
|
||||
Ok(()) => async move {
|
||||
rx.await
|
||||
.expect("Address Book will not drop requests until completed")
|
||||
}
|
||||
.boxed(),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,274 +0,0 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
StreamExt,
|
||||
};
|
||||
use rand::{Rng, SeedableRng};
|
||||
use std::time::Duration;
|
||||
|
||||
use cuprate_common::PruningSeed;
|
||||
use monero_wire::{messages::PeerListEntryBase, network_address::NetZone, NetworkAddress};
|
||||
|
||||
use super::{AddressBookConfig, AddressBookError, AddressBookRequest, AddressBookResponse};
|
||||
|
||||
mod peer_list;
|
||||
use peer_list::PeerList;
|
||||
|
||||
pub(crate) struct AddressBookClientRequest {
|
||||
pub req: AddressBookRequest,
|
||||
pub tx: oneshot::Sender<Result<AddressBookResponse, AddressBookError>>,
|
||||
|
||||
pub span: tracing::Span,
|
||||
}
|
||||
|
||||
pub struct AddressBook {
|
||||
zone: NetZone,
|
||||
config: AddressBookConfig,
|
||||
white_list: PeerList,
|
||||
gray_list: PeerList,
|
||||
anchor_list: HashSet<NetworkAddress>,
|
||||
|
||||
baned_peers: HashMap<NetworkAddress, chrono::NaiveDateTime>,
|
||||
|
||||
rng: rand::rngs::StdRng,
|
||||
//banned_subnets:,
|
||||
}
|
||||
|
||||
impl AddressBook {
|
||||
pub fn new(
|
||||
config: AddressBookConfig,
|
||||
zone: NetZone,
|
||||
white_peers: Vec<PeerListEntryBase>,
|
||||
gray_peers: Vec<PeerListEntryBase>,
|
||||
anchor_peers: Vec<NetworkAddress>,
|
||||
baned_peers: Vec<(NetworkAddress, chrono::NaiveDateTime)>,
|
||||
) -> AddressBook {
|
||||
let rng = rand::prelude::StdRng::from_entropy();
|
||||
let white_list = PeerList::new(white_peers);
|
||||
let gray_list = PeerList::new(gray_peers);
|
||||
let anchor_list = HashSet::from_iter(anchor_peers);
|
||||
let baned_peers = HashMap::from_iter(baned_peers);
|
||||
|
||||
let mut book = AddressBook {
|
||||
zone,
|
||||
config,
|
||||
white_list,
|
||||
gray_list,
|
||||
anchor_list,
|
||||
baned_peers,
|
||||
rng,
|
||||
};
|
||||
|
||||
book.check_unban_peers();
|
||||
|
||||
book
|
||||
}
|
||||
|
||||
pub const fn book_name(&self) -> &'static str {
|
||||
match self.zone {
|
||||
NetZone::Public => "PublicAddressBook",
|
||||
NetZone::Tor => "TorAddressBook",
|
||||
NetZone::I2p => "I2pAddressBook",
|
||||
}
|
||||
}
|
||||
|
||||
fn len_white_list(&self) -> usize {
|
||||
self.white_list.len()
|
||||
}
|
||||
|
||||
fn len_gray_list(&self) -> usize {
|
||||
self.gray_list.len()
|
||||
}
|
||||
|
||||
fn max_white_peers(&self) -> usize {
|
||||
self.config.max_white_peers
|
||||
}
|
||||
|
||||
fn max_gray_peers(&self) -> usize {
|
||||
self.config.max_gray_peers
|
||||
}
|
||||
|
||||
fn is_peer_banned(&self, peer: &NetworkAddress) -> bool {
|
||||
self.baned_peers.contains_key(peer)
|
||||
}
|
||||
|
||||
fn check_unban_peers(&mut self) {
|
||||
let mut now = chrono::Utc::now().naive_utc();
|
||||
self.baned_peers.retain(|_, time| time > &mut now)
|
||||
}
|
||||
|
||||
fn ban_peer(&mut self, peer: NetworkAddress, till: chrono::NaiveDateTime) {
|
||||
let now = chrono::Utc::now().naive_utc();
|
||||
if now > till {
|
||||
return;
|
||||
}
|
||||
|
||||
tracing::debug!("Banning peer: {peer:?} until: {till}");
|
||||
|
||||
self.baned_peers.insert(peer, till);
|
||||
}
|
||||
|
||||
fn add_peer_to_anchor(&mut self, peer: NetworkAddress) -> Result<(), AddressBookError> {
|
||||
tracing::debug!("Adding peer: {peer:?} to anchor list");
|
||||
// is peer in gray list
|
||||
if let Some(peer_eb) = self.gray_list.remove_peer(&peer) {
|
||||
self.white_list.add_new_peer(peer_eb);
|
||||
self.anchor_list.insert(peer);
|
||||
Ok(())
|
||||
} else {
|
||||
if !self.white_list.contains_peer(&peer) {
|
||||
return Err(AddressBookError::PeerNotFound);
|
||||
}
|
||||
self.anchor_list.insert(peer);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_peer_from_anchor(&mut self, peer: NetworkAddress) {
|
||||
let _ = self.anchor_list.remove(&peer);
|
||||
}
|
||||
|
||||
fn set_peer_seen(
|
||||
&mut self,
|
||||
peer: NetworkAddress,
|
||||
last_seen: i64,
|
||||
) -> Result<(), AddressBookError> {
|
||||
if let Some(mut peer) = self.gray_list.remove_peer(&peer) {
|
||||
peer.last_seen = last_seen;
|
||||
self.white_list.add_new_peer(peer);
|
||||
} else {
|
||||
let peer = self
|
||||
.white_list
|
||||
.get_peer_mut(&peer)
|
||||
.ok_or(AddressBookError::PeerNotFound)?;
|
||||
peer.last_seen = last_seen;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_peer_to_gray_list(&mut self, mut peer: PeerListEntryBase) {
|
||||
if self.white_list.contains_peer(&peer.adr) {
|
||||
return;
|
||||
};
|
||||
if !self.gray_list.contains_peer(&peer.adr) {
|
||||
peer.last_seen = 0;
|
||||
self.gray_list.add_new_peer(peer);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_new_peerlist(
|
||||
&mut self,
|
||||
mut peers: Vec<PeerListEntryBase>,
|
||||
) -> Result<(), AddressBookError> {
|
||||
let length = peers.len();
|
||||
|
||||
tracing::debug!("Received new peer list, length: {length}");
|
||||
|
||||
let mut err = None;
|
||||
peers.retain(|peer| {
|
||||
if err.is_some() {
|
||||
false
|
||||
} else if peer.adr.is_local() || peer.adr.is_loopback() {
|
||||
false
|
||||
} else if peer.adr.port() == peer.rpc_port {
|
||||
false
|
||||
} else if PruningSeed::try_from(peer.pruning_seed).is_err() {
|
||||
false
|
||||
} else if peer.adr.get_zone() != self.zone {
|
||||
tracing::info!("Received an address from a different network zone, ignoring list.");
|
||||
err = Some(AddressBookError::PeerSentAnAddressOutOfZone);
|
||||
false
|
||||
} else if self.is_peer_banned(&peer.adr) {
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(e) = err {
|
||||
return Err(e);
|
||||
} else {
|
||||
for peer in peers {
|
||||
self.add_peer_to_gray_list(peer);
|
||||
}
|
||||
self.gray_list
|
||||
.reduce_list(&HashSet::new(), self.max_gray_peers());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn get_random_gray_peer(&mut self) -> Option<PeerListEntryBase> {
|
||||
self.gray_list.get_random_peer(&mut self.rng).map(|p| *p)
|
||||
}
|
||||
|
||||
fn get_random_white_peer(&mut self) -> Option<PeerListEntryBase> {
|
||||
self.white_list.get_random_peer(&mut self.rng).map(|p| *p)
|
||||
}
|
||||
|
||||
fn update_peer_info(&mut self, peer: PeerListEntryBase) -> Result<(), AddressBookError> {
|
||||
if let Some(peer_stored) = self.gray_list.get_peer_mut(&peer.adr) {
|
||||
*peer_stored = peer;
|
||||
Ok(())
|
||||
} else if let Some(peer_stored) = self.white_list.get_peer_mut(&peer.adr) {
|
||||
*peer_stored = peer;
|
||||
Ok(())
|
||||
} else {
|
||||
return Err(AddressBookError::PeerNotFound);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run(mut self, mut rx: mpsc::Receiver<AddressBookClientRequest>) {
|
||||
loop {
|
||||
let Some(req) = rx.next().await else {
|
||||
// the client has been dropped the node has *possibly* shut down
|
||||
return;
|
||||
};
|
||||
|
||||
self.check_unban_peers();
|
||||
|
||||
let span = tracing::debug_span!(parent: &req.span, "AddressBook");
|
||||
let _guard = span.enter();
|
||||
|
||||
tracing::debug!("{} received request: {}", self.book_name(), req.req);
|
||||
|
||||
let res = match req.req {
|
||||
AddressBookRequest::HandleNewPeerList(new_peers, _) => self
|
||||
.handle_new_peerlist(new_peers)
|
||||
.map(|_| AddressBookResponse::Ok),
|
||||
AddressBookRequest::SetPeerSeen(peer, last_seen) => self
|
||||
.set_peer_seen(peer, last_seen)
|
||||
.map(|_| AddressBookResponse::Ok),
|
||||
AddressBookRequest::BanPeer(peer, till) => {
|
||||
self.ban_peer(peer, till);
|
||||
Ok(AddressBookResponse::Ok)
|
||||
}
|
||||
AddressBookRequest::AddPeerToAnchor(peer) => self
|
||||
.add_peer_to_anchor(peer)
|
||||
.map(|_| AddressBookResponse::Ok),
|
||||
AddressBookRequest::RemovePeerFromAnchor(peer) => {
|
||||
self.remove_peer_from_anchor(peer);
|
||||
Ok(AddressBookResponse::Ok)
|
||||
}
|
||||
AddressBookRequest::UpdatePeerInfo(peer) => {
|
||||
self.update_peer_info(peer).map(|_| AddressBookResponse::Ok)
|
||||
}
|
||||
|
||||
AddressBookRequest::GetRandomGrayPeer(_) => match self.get_random_gray_peer() {
|
||||
Some(peer) => Ok(AddressBookResponse::Peer(peer)),
|
||||
None => Err(AddressBookError::PeerListEmpty),
|
||||
},
|
||||
AddressBookRequest::GetRandomWhitePeer(_) => match self.get_random_white_peer() {
|
||||
Some(peer) => Ok(AddressBookResponse::Peer(peer)),
|
||||
None => Err(AddressBookError::PeerListEmpty),
|
||||
},
|
||||
};
|
||||
|
||||
if let Err(e) = &res {
|
||||
tracing::debug!("Error when handling request, err: {e}")
|
||||
}
|
||||
|
||||
let _ = req.tx.send(res);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,282 +0,0 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use monero_wire::{messages::PeerListEntryBase, NetworkAddress};
|
||||
use rand::Rng;
|
||||
|
||||
pub struct PeerList {
|
||||
peers: HashMap<NetworkAddress, PeerListEntryBase>,
|
||||
pruning_idxs: HashMap<u32, Vec<NetworkAddress>>,
|
||||
}
|
||||
|
||||
impl PeerList {
|
||||
pub fn new(list: Vec<PeerListEntryBase>) -> PeerList {
|
||||
let mut peers = HashMap::with_capacity(list.len());
|
||||
let mut pruning_idxs = HashMap::with_capacity(8);
|
||||
|
||||
for peer in list {
|
||||
peers.insert(peer.adr, peer);
|
||||
|
||||
pruning_idxs
|
||||
.entry(peer.pruning_seed)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(peer.adr);
|
||||
}
|
||||
PeerList {
|
||||
peers,
|
||||
pruning_idxs,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.peers.len()
|
||||
}
|
||||
|
||||
pub fn add_new_peer(&mut self, peer: PeerListEntryBase) {
|
||||
if self.peers.insert(peer.adr, peer.clone()).is_none() {
|
||||
self.pruning_idxs
|
||||
.entry(peer.pruning_seed)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(peer.adr);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_peer(&self, peer: &NetworkAddress) -> Option<&PeerListEntryBase> {
|
||||
self.peers.get(peer)
|
||||
}
|
||||
|
||||
pub fn get_peer_by_idx(&self, n: usize) -> Option<&PeerListEntryBase> {
|
||||
self.peers.iter().nth(n).map(|(_, ret)| ret)
|
||||
}
|
||||
|
||||
pub fn get_random_peer<R: Rng>(&self, r: &mut R) -> Option<&PeerListEntryBase> {
|
||||
let len = self.len();
|
||||
if len == 0 {
|
||||
None
|
||||
} else {
|
||||
let n = r.gen_range(0..len);
|
||||
|
||||
self.get_peer_by_idx(n)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_peer_mut(&mut self, peer: &NetworkAddress) -> Option<&mut PeerListEntryBase> {
|
||||
self.peers.get_mut(peer)
|
||||
}
|
||||
|
||||
pub fn contains_peer(&self, peer: &NetworkAddress) -> bool {
|
||||
self.peers.contains_key(peer)
|
||||
}
|
||||
|
||||
pub fn get_peers_by_pruning_seed(
|
||||
&self,
|
||||
seed: &u32,
|
||||
) -> Option<impl Iterator<Item = &PeerListEntryBase>> {
|
||||
let addrs = self.pruning_idxs.get(seed)?;
|
||||
Some(addrs.iter().filter_map(move |addr| self.peers.get(addr)))
|
||||
}
|
||||
|
||||
fn remove_peer_pruning_idx(&mut self, peer: &PeerListEntryBase) {
|
||||
if let Some(peer_list) = self.pruning_idxs.get_mut(&peer.pruning_seed) {
|
||||
if let Some(idx) = peer_list.iter().position(|peer_adr| peer_adr == &peer.adr) {
|
||||
peer_list.remove(idx);
|
||||
} else {
|
||||
unreachable!("This function will only be called when the peer exists.");
|
||||
}
|
||||
} else {
|
||||
unreachable!("Pruning seed must exist if a peer has that seed.");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_peer(&mut self, peer: &NetworkAddress) -> Option<PeerListEntryBase> {
|
||||
let peer_eb = self.peers.remove(peer)?;
|
||||
self.remove_peer_pruning_idx(&peer_eb);
|
||||
Some(peer_eb)
|
||||
}
|
||||
|
||||
pub fn reduce_list(&mut self, must_keep_peers: &HashSet<NetworkAddress>, new_len: usize) {
|
||||
if new_len >= self.len() {
|
||||
return;
|
||||
}
|
||||
|
||||
let target_removed = self.len() - new_len;
|
||||
let mut removed_count = 0;
|
||||
let mut peers_to_remove: Vec<NetworkAddress> = Vec::with_capacity(target_removed);
|
||||
|
||||
for (peer_adr, _) in &self.peers {
|
||||
if removed_count >= target_removed {
|
||||
break;
|
||||
}
|
||||
if !must_keep_peers.contains(peer_adr) {
|
||||
peers_to_remove.push(*peer_adr);
|
||||
removed_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
for peer_adr in peers_to_remove {
|
||||
let _ = self.remove_peer(&peer_adr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashSet, vec};
|
||||
|
||||
use monero_wire::{messages::PeerListEntryBase, NetworkAddress};
|
||||
use rand::Rng;
|
||||
|
||||
use super::PeerList;
|
||||
|
||||
fn make_fake_peer_list(numb_o_peers: usize) -> PeerList {
|
||||
let mut peer_list = vec![PeerListEntryBase::default(); numb_o_peers];
|
||||
for (idx, peer) in peer_list.iter_mut().enumerate() {
|
||||
let NetworkAddress::IPv4(ip) = &mut peer.adr else {panic!("this test requires default to be ipv4")};
|
||||
ip.m_ip += idx as u32;
|
||||
}
|
||||
|
||||
PeerList::new(peer_list)
|
||||
}
|
||||
|
||||
fn make_fake_peer_list_with_random_pruning_seeds(numb_o_peers: usize) -> PeerList {
|
||||
let mut r = rand::thread_rng();
|
||||
|
||||
let mut peer_list = vec![PeerListEntryBase::default(); numb_o_peers];
|
||||
for (idx, peer) in peer_list.iter_mut().enumerate() {
|
||||
let NetworkAddress::IPv4(ip) = &mut peer.adr else {panic!("this test requires default to be ipv4")};
|
||||
ip.m_ip += idx as u32;
|
||||
|
||||
peer.pruning_seed = if r.gen_bool(0.4) {
|
||||
0
|
||||
} else {
|
||||
r.gen_range(384..=391)
|
||||
};
|
||||
}
|
||||
|
||||
PeerList::new(peer_list)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_list_reduce_length() {
|
||||
let mut peer_list = make_fake_peer_list(2090);
|
||||
let must_keep_peers = HashSet::new();
|
||||
|
||||
let target_len = 2000;
|
||||
|
||||
peer_list.reduce_list(&must_keep_peers, target_len);
|
||||
|
||||
assert_eq!(peer_list.len(), target_len);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_list_reduce_length_with_peers_we_need() {
|
||||
let mut peer_list = make_fake_peer_list(500);
|
||||
let must_keep_peers = HashSet::from_iter(peer_list.peers.iter().map(|(adr, _)| *adr));
|
||||
|
||||
let target_len = 49;
|
||||
|
||||
peer_list.reduce_list(&must_keep_peers, target_len);
|
||||
|
||||
// we can't remove any of the peers we said we need them all
|
||||
assert_eq!(peer_list.len(), 500);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_list_get_peers_by_pruning_seed() {
|
||||
let mut r = rand::thread_rng();
|
||||
|
||||
let peer_list = make_fake_peer_list_with_random_pruning_seeds(1000);
|
||||
let seed = if r.gen_bool(0.4) {
|
||||
0
|
||||
} else {
|
||||
r.gen_range(384..=391)
|
||||
};
|
||||
|
||||
let peers_with_seed = peer_list
|
||||
.get_peers_by_pruning_seed(&seed)
|
||||
.expect("If you hit this buy a lottery ticket");
|
||||
|
||||
for peer in peers_with_seed {
|
||||
assert_eq!(peer.pruning_seed, seed);
|
||||
}
|
||||
|
||||
assert_eq!(peer_list.len(), 1000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_list_remove_specific_peer() {
|
||||
let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
|
||||
|
||||
// generate peer at a random point in the list
|
||||
let mut peer = NetworkAddress::default();
|
||||
let NetworkAddress::IPv4(ip) = &mut peer else {panic!("this test requires default to be ipv4")};
|
||||
ip.m_ip += 50;
|
||||
|
||||
assert!(peer_list.remove_peer(&peer).is_some());
|
||||
|
||||
let pruning_idxs = peer_list.pruning_idxs;
|
||||
let peers = peer_list.peers;
|
||||
|
||||
for (_, addrs) in pruning_idxs {
|
||||
addrs.iter().for_each(|adr| assert!(adr != &peer))
|
||||
}
|
||||
|
||||
assert!(!peers.contains_key(&peer));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_list_pruning_idxs_are_correct() {
|
||||
let peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
|
||||
let mut total_len = 0;
|
||||
|
||||
for (seed, list) in peer_list.pruning_idxs {
|
||||
for peer in list.iter() {
|
||||
assert_eq!(peer_list.peers.get(peer).unwrap().pruning_seed, seed);
|
||||
total_len += 1;
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(total_len, peer_list.peers.len())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_list_add_new_peer() {
|
||||
let mut peer_list = make_fake_peer_list(10);
|
||||
let mut new_peer = PeerListEntryBase::default();
|
||||
let NetworkAddress::IPv4(ip) = &mut new_peer.adr else {panic!("this test requires default to be ipv4")};
|
||||
ip.m_ip += 50;
|
||||
|
||||
peer_list.add_new_peer(new_peer.clone());
|
||||
|
||||
assert_eq!(peer_list.len(), 11);
|
||||
assert_eq!(peer_list.get_peer(&new_peer.adr), Some(&new_peer));
|
||||
assert!(peer_list
|
||||
.pruning_idxs
|
||||
.get(&new_peer.pruning_seed)
|
||||
.unwrap()
|
||||
.contains(&new_peer.adr));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_list_add_existing_peer() {
|
||||
let mut peer_list = make_fake_peer_list(10);
|
||||
let existing_peer = peer_list
|
||||
.get_peer(&NetworkAddress::default())
|
||||
.unwrap()
|
||||
.clone();
|
||||
|
||||
peer_list.add_new_peer(existing_peer.clone());
|
||||
|
||||
assert_eq!(peer_list.len(), 10);
|
||||
assert_eq!(peer_list.get_peer(&existing_peer.adr), Some(&existing_peer));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_list_get_non_existent_peer() {
|
||||
let peer_list = make_fake_peer_list(10);
|
||||
let mut non_existent_peer = NetworkAddress::default();
|
||||
let NetworkAddress::IPv4(ip) = &mut non_existent_peer else {panic!("this test requires default to be ipv4")};
|
||||
ip.m_ip += 50;
|
||||
|
||||
assert_eq!(peer_list.get_peer(&non_existent_peer), None);
|
||||
}
|
||||
}
|
|
@ -1,3 +1 @@
|
|||
pub mod address_book;
|
||||
pub mod peer;
|
||||
mod protocol;
|
||||
|
|
|
@ -1,42 +0,0 @@
|
|||
pub mod client;
|
||||
pub mod connection;
|
||||
pub mod handshaker;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use monero_wire::levin::BucketError;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error, Clone, Copy)]
|
||||
pub enum RequestServiceError {}
|
||||
|
||||
#[derive(Debug, Error, Clone, Copy)]
|
||||
pub enum PeerError {
|
||||
#[error("Peer is on a different network")]
|
||||
PeerIsOnAnotherNetwork,
|
||||
#[error("Peer sent an unexpected response")]
|
||||
PeerSentUnSolicitedResponse,
|
||||
#[error("Internal service did not respond when required")]
|
||||
InternalServiceDidNotRespond,
|
||||
#[error("Connection to peer has been terminated")]
|
||||
PeerConnectionClosed,
|
||||
#[error("The Client `internal` channel was closed")]
|
||||
ClientChannelClosed,
|
||||
#[error("The Peer sent an unexpected response")]
|
||||
PeerSentUnexpectedResponse,
|
||||
#[error("The peer sent a bad response: {0}")]
|
||||
ResponseError(&'static str),
|
||||
#[error("Internal service error: {0}")]
|
||||
InternalService(#[from] RequestServiceError),
|
||||
#[error("Internal peer sync channel closed")]
|
||||
InternalPeerSyncChannelClosed,
|
||||
#[error("Levin Error")]
|
||||
LevinError, // remove me, this is just temporary
|
||||
}
|
||||
|
||||
impl From<BucketError> for PeerError {
|
||||
fn from(_: BucketError) -> Self {
|
||||
PeerError::LevinError
|
||||
}
|
||||
}
|
|
@ -1,70 +0,0 @@
|
|||
use std::pin::Pin;
|
||||
use std::{future::Future, sync::Arc};
|
||||
|
||||
use crate::protocol::{InternalMessageRequest, InternalMessageResponse};
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
FutureExt,
|
||||
};
|
||||
use monero_wire::messages::PeerID;
|
||||
use monero_wire::{messages::common::PeerSupportFlags, NetworkAddress};
|
||||
|
||||
use super::{connection::ClientRequest, PeerError};
|
||||
|
||||
pub struct ConnectionInfo {
|
||||
pub addr: NetworkAddress,
|
||||
pub support_flags: PeerSupportFlags,
|
||||
/// Peer ID
|
||||
pub peer_id: PeerID,
|
||||
pub rpc_port: u16,
|
||||
pub rpc_credits_per_hash: u32,
|
||||
}
|
||||
|
||||
pub struct Client {
|
||||
pub connection_info: Arc<ConnectionInfo>,
|
||||
server_tx: mpsc::Sender<ClientRequest>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(
|
||||
connection_info: Arc<ConnectionInfo>,
|
||||
server_tx: mpsc::Sender<ClientRequest>,
|
||||
) -> Self {
|
||||
Client {
|
||||
connection_info,
|
||||
server_tx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl tower::Service<InternalMessageRequest> for Client {
|
||||
type Error = PeerError;
|
||||
type Response = InternalMessageResponse;
|
||||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.server_tx
|
||||
.poll_ready(cx)
|
||||
.map_err(|e| PeerError::ClientChannelClosed)
|
||||
}
|
||||
fn call(&mut self, req: InternalMessageRequest) -> Self::Future {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
match self.server_tx.try_send(ClientRequest { req, tx }) {
|
||||
Ok(()) => rx
|
||||
.map(|recv_result| {
|
||||
recv_result
|
||||
.expect("ClientRequest oneshot sender must not be dropped before send")
|
||||
})
|
||||
.boxed(),
|
||||
Err(_e) => {
|
||||
// TODO: better error handling
|
||||
futures::future::ready(Err(PeerError::ClientChannelClosed)).boxed()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,196 +0,0 @@
|
|||
use std::collections::HashSet;
|
||||
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::stream::Fuse;
|
||||
use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt};
|
||||
|
||||
use levin::{MessageSink, MessageStream};
|
||||
use monero_wire::messages::CoreSyncData;
|
||||
use monero_wire::{levin, Message, NetworkAddress};
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use crate::protocol::{
|
||||
InternalMessageRequest, InternalMessageResponse, BLOCKS_IDS_SYNCHRONIZING_MAX_COUNT,
|
||||
P2P_MAX_PEERS_IN_HANDSHAKE,
|
||||
};
|
||||
|
||||
use super::PeerError;
|
||||
|
||||
/// Sync-state updates emitted by a connection task so a central sync service
/// can track what each peer knows about the chain.
pub enum PeerSyncChange {
    /// The peer advertised new core sync data (height, top block, difficulty).
    CoreSyncData(NetworkAddress, CoreSyncData),
    /// The peer answered an objects request.
    // NOTE(review): fields appear to be block hashes plus a height — confirm
    // against the producer of this variant.
    ObjectsResponse(NetworkAddress, Vec<[u8; 32]>, u64),
    /// The peer disconnected and should be dropped from the sync table.
    PeerDisconnected(NetworkAddress),
}
|
||||
|
||||
/// A request forwarded from a `Client` handle to its connection task,
/// bundling the message with a oneshot channel for the eventual response.
pub struct ClientRequest {
    /// The request to send to the peer.
    pub req: InternalMessageRequest,
    /// Channel the connection task uses to deliver the peer's response
    /// (or the error that occurred) back to the requesting client.
    pub tx: oneshot::Sender<Result<InternalMessageResponse, PeerError>>,
}
|
||||
|
||||
/// The connection task's protocol state.
pub enum State {
    /// Idle: ready to accept a request from either the peer or our client.
    WaitingForRequest,
    /// A client request has been sent to the peer; we are waiting for the
    /// matching response and hold the channel to hand it back on.
    WaitingForResponse {
        request: InternalMessageRequest,
        tx: oneshot::Sender<Result<InternalMessageResponse, PeerError>>,
    },
}
|
||||
|
||||
impl State {
|
||||
pub fn expected_response_id(&self) -> Option<u32> {
|
||||
match self {
|
||||
Self::WaitingForRequest => None,
|
||||
Self::WaitingForResponse { request, tx: _ } => request.expected_id(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The task that owns a peer's socket: it multiplexes requests coming from
/// the local `Client` handle with messages arriving from the remote peer.
pub struct Connection<Svc, Aw, Ar> {
    /// Network address of the remote peer.
    address: NetworkAddress,
    /// Current protocol state (idle or awaiting a response).
    state: State,
    /// Outgoing message sink to the peer.
    sink: MessageSink<Aw, Message>,
    /// Incoming message stream from the peer; fused so it is safe to poll
    /// repeatedly inside `futures::select!`.
    stream: Fuse<MessageStream<Ar, Message>>,
    /// Requests submitted by the local `Client` handle.
    client_rx: mpsc::Receiver<ClientRequest>,
    /// Channel used to report peer sync-state changes.
    sync_state_tx: mpsc::Sender<PeerSyncChange>,
    /// Service that handles requests the peer sends to us.
    svc: Svc,
}
|
||||
|
||||
impl<Svc, Aw, Ar> Connection<Svc, Aw, Ar>
|
||||
where
|
||||
Svc: Service<InternalMessageRequest, Response = InternalMessageResponse, Error = PeerError>,
|
||||
Aw: AsyncWrite + std::marker::Unpin,
|
||||
Ar: AsyncRead + std::marker::Unpin,
|
||||
{
|
||||
pub fn new(
|
||||
address: NetworkAddress,
|
||||
sink: MessageSink<Aw, Message>,
|
||||
stream: MessageStream<Ar, Message>,
|
||||
client_rx: mpsc::Receiver<ClientRequest>,
|
||||
sync_state_tx: mpsc::Sender<PeerSyncChange>,
|
||||
svc: Svc,
|
||||
) -> Connection<Svc, Aw, Ar> {
|
||||
Connection {
|
||||
address,
|
||||
state: State::WaitingForRequest,
|
||||
sink,
|
||||
stream: stream.fuse(),
|
||||
client_rx,
|
||||
sync_state_tx,
|
||||
svc,
|
||||
}
|
||||
}
|
||||
async fn handle_response(&mut self, res: InternalMessageResponse) -> Result<(), PeerError> {
|
||||
let state = std::mem::replace(&mut self.state, State::WaitingForRequest);
|
||||
if let State::WaitingForResponse { request, tx } = state {
|
||||
match (request, &res) {
|
||||
(InternalMessageRequest::Handshake(_), InternalMessageResponse::Handshake(_)) => {}
|
||||
(
|
||||
InternalMessageRequest::SupportFlags(_),
|
||||
InternalMessageResponse::SupportFlags(_),
|
||||
) => {}
|
||||
(InternalMessageRequest::TimedSync(_), InternalMessageResponse::TimedSync(res)) => {
|
||||
}
|
||||
(
|
||||
InternalMessageRequest::GetObjectsRequest(req),
|
||||
InternalMessageResponse::GetObjectsResponse(res),
|
||||
) => {}
|
||||
(
|
||||
InternalMessageRequest::ChainRequest(_),
|
||||
InternalMessageResponse::ChainResponse(res),
|
||||
) => {}
|
||||
(
|
||||
InternalMessageRequest::FluffyMissingTransactionsRequest(req),
|
||||
InternalMessageResponse::NewFluffyBlock(blk),
|
||||
) => {}
|
||||
(
|
||||
InternalMessageRequest::GetTxPoolCompliment(_),
|
||||
InternalMessageResponse::NewTransactions(_),
|
||||
) => {
|
||||
// we could check we received no transactions that we said we knew about but thats going to happen later anyway when they get added to our
|
||||
// mempool
|
||||
}
|
||||
_ => return Err(PeerError::ResponseError("Peer sent incorrect response")),
|
||||
}
|
||||
// response passed our tests we can send it to the requestor
|
||||
let _ = tx.send(Ok(res));
|
||||
Ok(())
|
||||
} else {
|
||||
unreachable!("This will only be called when in state WaitingForResponse");
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_message_to_peer(&mut self, mes: impl Into<Message>) -> Result<(), PeerError> {
|
||||
Ok(self.sink.send(mes.into()).await?)
|
||||
}
|
||||
|
||||
async fn handle_peer_request(&mut self, req: InternalMessageRequest) -> Result<(), PeerError> {
|
||||
// we should check contents of peer requests for obvious errors like we do with responses
|
||||
let ready_svc = self.svc.ready().await?;
|
||||
let res = ready_svc.call(req).await?;
|
||||
self.send_message_to_peer(res).await
|
||||
}
|
||||
|
||||
async fn handle_client_request(&mut self, req: ClientRequest) -> Result<(), PeerError> {
|
||||
// check we need a response
|
||||
if let Some(_) = req.req.expected_id() {
|
||||
self.state = State::WaitingForResponse {
|
||||
request: req.req.clone(),
|
||||
tx: req.tx,
|
||||
};
|
||||
}
|
||||
self.send_message_to_peer(req.req).await
|
||||
}
|
||||
|
||||
async fn state_waiting_for_request(&mut self) -> Result<(), PeerError> {
|
||||
futures::select! {
|
||||
peer_message = self.stream.next() => {
|
||||
match peer_message.expect("MessageStream will never return None") {
|
||||
Ok(message) => {
|
||||
self.handle_peer_request(message.try_into().map_err(|_| PeerError::PeerSentUnexpectedResponse)?).await
|
||||
},
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
},
|
||||
client_req = self.client_rx.next() => {
|
||||
self.handle_client_request(client_req.ok_or(PeerError::ClientChannelClosed)?).await
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn state_waiting_for_response(&mut self) -> Result<(), PeerError> {
|
||||
// put a timeout on this
|
||||
let peer_message = self
|
||||
.stream
|
||||
.next()
|
||||
.await
|
||||
.expect("MessageStream will never return None")?;
|
||||
|
||||
if !peer_message.is_request()
|
||||
&& self.state.expected_response_id() == Some(peer_message.id())
|
||||
{
|
||||
if let Ok(res) = peer_message.try_into() {
|
||||
Ok(self.handle_response(res).await?)
|
||||
} else {
|
||||
// im almost certain this is impossible to hit, but im not certain enough to use unreachable!()
|
||||
Err(PeerError::ResponseError("Peer sent incorrect response"))
|
||||
}
|
||||
} else {
|
||||
if let Ok(req) = peer_message.try_into() {
|
||||
self.handle_peer_request(req).await
|
||||
} else {
|
||||
// this can be hit if the peer sends a protocol response with the wrong id
|
||||
Err(PeerError::ResponseError("Peer sent incorrect response"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
loop {
|
||||
let _res = match self.state {
|
||||
State::WaitingForRequest => self.state_waiting_for_request().await,
|
||||
State::WaitingForResponse { request: _, tx: _ } => {
|
||||
self.state_waiting_for_response().await
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,454 +0,0 @@
|
|||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::FutureExt;
|
||||
use futures::{channel::mpsc, AsyncRead, AsyncWrite, SinkExt, StreamExt};
|
||||
use monero_wire::messages::admin::{SupportFlagsRequest, SupportFlagsResponse};
|
||||
use monero_wire::messages::MessageRequest;
|
||||
use thiserror::Error;
|
||||
use tokio::time;
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use crate::address_book::{AddressBookError, AddressBookRequest, AddressBookResponse};
|
||||
use crate::protocol::temp_database::{DataBaseRequest, DataBaseResponse, DatabaseError};
|
||||
use crate::protocol::{
|
||||
Direction, InternalMessageRequest, InternalMessageResponse, P2P_MAX_PEERS_IN_HANDSHAKE,
|
||||
};
|
||||
use cuprate_common::{HardForks, Network, PruningSeed};
|
||||
use monero_wire::{
|
||||
levin::{BucketError, MessageSink, MessageStream},
|
||||
messages::{
|
||||
admin::{HandshakeRequest, HandshakeResponse},
|
||||
common::PeerSupportFlags,
|
||||
BasicNodeData, CoreSyncData, MessageResponse, PeerID, PeerListEntryBase,
|
||||
},
|
||||
Message, NetworkAddress,
|
||||
};
|
||||
use tracing::Instrument;
|
||||
|
||||
use super::client::Client;
|
||||
use super::{
|
||||
client::ConnectionInfo,
|
||||
connection::{ClientRequest, Connection, PeerSyncChange},
|
||||
PeerError,
|
||||
};
|
||||
|
||||
/// Everything that can go wrong while performing a p2p handshake with a peer.
#[derive(Debug, Error)]
pub enum HandShakeError {
    #[error("The peer did not complete the handshake fast enough")]
    PeerTimedOut,
    #[error("The peer has a weird pruning scheme")]
    PeerClaimedWeirdPruning,
    #[error("The peer has an unexpected top version")]
    PeerHasUnexpectedTopVersion,
    #[error("The peer does not have the minimum support flags")]
    PeerDoesNotHaveTheMinimumSupportFlags,
    #[error("The peer is on a different network")]
    PeerIsOnADifferentNetwork,
    #[error("Address book err: {0}")]
    AddressBookError(#[from] AddressBookError),
    #[error("The peer sent too many peers, considered spamming")]
    PeerSentTooManyPeers,
    #[error("The peer sent a wrong response to our handshake")]
    PeerSentWrongResponse,
    #[error("The syncer returned an error")]
    DataBaseError(#[from] DatabaseError),
    #[error("Bucket error while communicating with peer: {0}")]
    BucketError(#[from] BucketError),
}
|
||||
|
||||
/// Static configuration for our node's p2p networking.
pub struct NetworkConfig {
    /// Port
    my_port: u32,
    /// The Network
    network: Network,
    /// Peer ID
    peer_id: PeerID,
    /// RPC Port
    rpc_port: u16,
    /// RPC Credits Per Hash
    rpc_credits_per_hash: u32,
    /// Support flags we advertise to peers during the handshake.
    our_support_flags: PeerSupportFlags,
    /// Minimum support flags a peer must advertise, or the handshake fails.
    minimum_peer_support_flags: PeerSupportFlags,
    /// How long a peer has to complete the handshake before we give up.
    handshake_timeout: time::Duration,
    // NOTE(review): the two fields below are not read anywhere visible in
    // this file — presumably peer-count targets for the connection manager.
    max_in_peers: u32,
    target_out_peers: u32,
}
|
||||
|
||||
impl Default for NetworkConfig {
    /// Defaults for a mainnet node.
    fn default() -> Self {
        NetworkConfig {
            my_port: 18080,
            network: Network::MainNet,
            // NOTE(review): hard-coded placeholder peer id — confirm this is
            // randomised before release.
            peer_id: PeerID(21),
            rpc_port: 0,
            rpc_credits_per_hash: 0,
            our_support_flags: PeerSupportFlags::get_support_flag_fluffy_blocks(),
            // Accept any peer by default (no required flags).
            minimum_peer_support_flags: PeerSupportFlags::from(0_u32),
            handshake_timeout: time::Duration::from_secs(5),
            max_in_peers: 13,
            target_out_peers: 21,
        }
    }
}
|
||||
|
||||
impl NetworkConfig {
    /// Builds the `BasicNodeData` we advertise in handshake requests from
    /// this configuration.
    pub fn basic_node_data(&self) -> BasicNodeData {
        BasicNodeData {
            my_port: self.my_port,
            network_id: self.network.network_id(),
            peer_id: self.peer_id,
            support_flags: self.our_support_flags,
            rpc_port: self.rpc_port,
            rpc_credits_per_hash: self.rpc_credits_per_hash,
        }
    }
}
|
||||
|
||||
/// The inputs needed to perform a handshake with a single peer.
pub struct Handshake<W, R> {
    /// Outgoing message sink to the peer.
    sink: MessageSink<W, Message>,
    /// Incoming message stream from the peer.
    stream: MessageStream<R, Message>,
    /// Whether we dialed the peer or it dialed us.
    direction: Direction,
    /// The peer's network address.
    addr: NetworkAddress,
}
|
||||
|
||||
/// A `tower::Service` that turns raw peer connections into `Client` handles
/// by performing the p2p handshake.
pub struct Handshaker<Bc, Svc, AdrBook> {
    /// Our node's network configuration.
    config: NetworkConfig,
    /// Parent span under which each spawned connection task is instrumented.
    parent_span: tracing::Span,
    /// Service managing known peer addresses.
    address_book: AdrBook,
    /// Service answering blockchain/database queries.
    blockchain: Bc,
    /// Channel for reporting peers' sync-state changes.
    peer_sync_states: mpsc::Sender<PeerSyncChange>,
    /// Service that handles requests incoming from peers.
    peer_request_service: Svc,
}
|
||||
|
||||
impl<Bc, Svc, AdrBook, W, R> tower::Service<Handshake<W, R>> for Handshaker<Bc, Svc, AdrBook>
|
||||
where
|
||||
Bc: Service<DataBaseRequest, Response = DataBaseResponse, Error = DatabaseError>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
Bc::Future: Send,
|
||||
|
||||
Svc: Service<InternalMessageRequest, Response = InternalMessageResponse, Error = PeerError>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
Svc::Future: Send,
|
||||
|
||||
AdrBook: Service<AddressBookRequest, Response = AddressBookResponse, Error = AddressBookError>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
AdrBook::Future: Send,
|
||||
|
||||
W: AsyncWrite + std::marker::Unpin + Send + 'static,
|
||||
R: AsyncRead + std::marker::Unpin + Send + 'static,
|
||||
{
|
||||
type Error = HandShakeError;
|
||||
type Response = Client;
|
||||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
std::task::Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Handshake<W, R>) -> Self::Future {
|
||||
let Handshake {
|
||||
sink: mut peer_sink,
|
||||
stream: mut peer_stream,
|
||||
direction,
|
||||
addr,
|
||||
} = req;
|
||||
|
||||
let span = tracing::debug_span!("Handshaker");
|
||||
|
||||
let connection_span = tracing::debug_span!(parent: &self.parent_span, "Connection");
|
||||
|
||||
let blockchain = self.blockchain.clone();
|
||||
let address_book = self.address_book.clone();
|
||||
let syncer_tx = self.peer_sync_states.clone();
|
||||
let peer_request_service = self.peer_request_service.clone();
|
||||
|
||||
let state_machine = HandshakeSM {
|
||||
peer_sink,
|
||||
peer_stream,
|
||||
direction,
|
||||
addr,
|
||||
network: self.config.network,
|
||||
basic_node_data: self.config.basic_node_data(),
|
||||
minimum_support_flags: self.config.minimum_peer_support_flags,
|
||||
address_book,
|
||||
blockchain,
|
||||
peer_request_service,
|
||||
connection_span,
|
||||
state: HandshakeState::Start,
|
||||
};
|
||||
|
||||
let ret = time::timeout(self.config.handshake_timeout, state_machine.do_handshake());
|
||||
|
||||
async move {
|
||||
match ret.await {
|
||||
Ok(handshake) => handshake,
|
||||
Err(_) => Err(HandShakeError::PeerTimedOut),
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
/// Progress of an in-flight handshake.
enum HandshakeState {
    /// Nothing sent yet.
    Start,
    /// Our handshake request has been sent; awaiting the peer's response.
    WaitingForHandshakeResponse,
    /// The peer's handshake carried empty support flags; we asked explicitly
    /// and are waiting, carrying the node data received so far.
    WaitingForSupportFlagResponse(BasicNodeData),
    /// Handshake finished; holds the peer's final `BasicNodeData`.
    Complete(BasicNodeData),
}
|
||||
|
||||
impl HandshakeState {
|
||||
pub fn is_complete(&self) -> bool {
|
||||
matches!(self, HandshakeState::Complete(_))
|
||||
}
|
||||
|
||||
pub fn peer_basic_node_data(self) -> Option<BasicNodeData> {
|
||||
match self {
|
||||
HandshakeState::Complete(sup) => Some(sup),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// State machine that drives one handshake from start to a spawned
/// `Connection` task.
struct HandshakeSM<Bc, Svc, AdrBook, W, R> {
    /// Outgoing message sink to the peer.
    peer_sink: MessageSink<W, Message>,
    /// Incoming message stream from the peer.
    peer_stream: MessageStream<R, Message>,
    /// Who initiated the connection.
    direction: Direction,
    /// The peer's address.
    addr: NetworkAddress,
    /// The network (mainnet/testnet/…) we expect the peer to be on.
    network: Network,

    /// The node data we advertise in our handshake request.
    basic_node_data: BasicNodeData,
    /// Flags the peer must support, or the handshake is rejected.
    minimum_support_flags: PeerSupportFlags,
    /// Address book service, told about peers the remote advertises.
    address_book: AdrBook,
    /// Blockchain service used to fetch our core sync data.
    blockchain: Bc,
    /// Service handed to the spawned `Connection` for peer requests.
    peer_request_service: Svc,
    /// Span the spawned connection task runs under.
    connection_span: tracing::Span,

    /// Current progress of the handshake.
    state: HandshakeState,
}
|
||||
|
||||
impl<Bc, Svc, AdrBook, W, R> HandshakeSM<Bc, Svc, AdrBook, W, R>
where
    Bc: Service<DataBaseRequest, Response = DataBaseResponse, Error = DatabaseError>
        + Clone
        + Send
        + 'static,
    Bc::Future: Send,

    Svc: Service<InternalMessageRequest, Response = InternalMessageResponse, Error = PeerError>
        + Clone
        + Send
        + 'static,
    Svc::Future: Send,

    AdrBook: Service<AddressBookRequest, Response = AddressBookResponse, Error = AddressBookError>
        + Clone
        + Send
        + 'static,
    AdrBook::Future: Send,

    W: AsyncWrite + std::marker::Unpin + Send + 'static,
    R: AsyncRead + std::marker::Unpin + Send + 'static,
{
    /// Fetches our own `CoreSyncData` from the blockchain service, for
    /// inclusion in the handshake request.
    async fn get_our_core_sync(&mut self) -> Result<CoreSyncData, DatabaseError> {
        let blockchain = self.blockchain.ready().await?;
        let DataBaseResponse::CoreSyncData(core_sync) = blockchain.call(DataBaseRequest::CoreSyncData).await? else {
            unreachable!("Database will always return the requested item")
        };
        Ok(core_sync)
    }

    /// Sends our handshake request (node data + core sync data) to the peer.
    async fn send_handshake_req(
        &mut self,
        node_data: BasicNodeData,
        payload_data: CoreSyncData,
    ) -> Result<(), HandShakeError> {
        let handshake_req = HandshakeRequest {
            node_data,
            payload_data,
        };

        tracing::trace!("Sending handshake request: {handshake_req:?}");

        let message: Message = Message::Request(handshake_req.into());
        self.peer_sink.send(message).await?;
        Ok(())
    }

    /// Waits for the peer's handshake response; any other message is
    /// treated as a protocol violation.
    async fn get_handshake_res(&mut self) -> Result<HandshakeResponse, HandShakeError> {
        // put a timeout on this
        let Message::Response(MessageResponse::Handshake(handshake_res)) = self.peer_stream.next().await.expect("MessageSink will not return None")? else {
            return Err(HandShakeError::PeerSentWrongResponse);
        };

        tracing::trace!("Received handshake response: {handshake_res:?}");

        Ok(handshake_res)
    }

    /// Explicitly asks the peer for its support flags (used when its
    /// handshake response advertised none).
    async fn send_support_flag_req(&mut self) -> Result<(), HandShakeError> {
        tracing::trace!("Peer sent no support flags, sending request");

        let message: Message = Message::Request(SupportFlagsRequest.into());
        self.peer_sink.send(message).await?;

        Ok(())
    }

    /// Validates the peer's handshake response and advances the state
    /// machine: either `Complete`, or waiting for a support-flags response.
    async fn handle_handshake_response(
        &mut self,
        res: HandshakeResponse,
    ) -> Result<(), HandShakeError> {
        let HandshakeResponse {
            node_data: peer_node_data,
            payload_data: peer_core_sync,
            local_peerlist_new,
        } = res;

        // Reject peers missing the support flags we require.
        if !peer_node_data
            .support_flags
            .contains(&self.minimum_support_flags)
        {
            tracing::debug!("Handshake failed: peer does not have minimum support flags");
            return Err(HandShakeError::PeerDoesNotHaveTheMinimumSupportFlags);
        }

        // Reject peers on a different network.
        if peer_node_data.network_id != self.network.network_id() {
            tracing::debug!("Handshake failed: peer is on a different network");
            return Err(HandShakeError::PeerIsOnADifferentNetwork);
        }

        // An oversized peer list is treated as spam.
        if local_peerlist_new.len() > P2P_MAX_PEERS_IN_HANDSHAKE {
            tracing::debug!("Handshake failed: peer sent too many peers in response");
            return Err(HandShakeError::PeerSentTooManyPeers);
        }

        // Tell the address book about the new peers
        self.address_book
            .ready()
            .await?
            .call(AddressBookRequest::HandleNewPeerList(
                local_peerlist_new,
                self.addr.get_zone(),
            ))
            .await?;

        // coresync, pruning seed
        // NOTE(review): `peer_core_sync` is currently unused — core-sync /
        // pruning-seed validation still needs to be wired in here.

        if peer_node_data.support_flags.is_empty() {
            // Some peers omit support flags in the handshake; ask explicitly.
            self.send_support_flag_req().await?;
            self.state = HandshakeState::WaitingForSupportFlagResponse(peer_node_data);
        } else {
            self.state = HandshakeState::Complete(peer_node_data);
        }

        Ok(())
    }

    /// Dispatches a response from the peer according to the current state.
    async fn handle_message_response(
        &mut self,
        response: MessageResponse,
    ) -> Result<(), HandShakeError> {
        match (&mut self.state, response) {
            (
                HandshakeState::WaitingForHandshakeResponse,
                MessageResponse::Handshake(handshake),
            ) => self.handle_handshake_response(handshake).await,
            (
                HandshakeState::WaitingForSupportFlagResponse(bnd),
                MessageResponse::SupportFlags(support_flags),
            ) => {
                // Fill in the flags missing from the original handshake.
                bnd.support_flags = support_flags.support_flags;
                self.state = HandshakeState::Complete(bnd.clone());
                Ok(())
            }
            _ => Err(HandShakeError::PeerSentWrongResponse),
        }
    }

    /// Answers the peer's support-flags request with our own flags.
    async fn send_support_flags(
        &mut self,
        support_flags: PeerSupportFlags,
    ) -> Result<(), HandShakeError> {
        let message = Message::Response(SupportFlagsResponse { support_flags }.into());
        self.peer_sink.send(message).await?;
        Ok(())
    }

    /// Runs the outbound side of the handshake: send our request, then
    /// process peer messages until the state machine reaches `Complete`.
    async fn do_outbound_handshake(&mut self) -> Result<(), HandShakeError> {
        let core_sync = self.get_our_core_sync().await?;
        self.send_handshake_req(self.basic_node_data.clone(), core_sync)
            .await?;
        self.state = HandshakeState::WaitingForHandshakeResponse;

        while !self.state.is_complete() {
            match self.peer_stream.next().await {
                Some(mes) => {
                    let mes = mes?;
                    match mes {
                        Message::Request(MessageRequest::SupportFlags(_)) => {
                            self.send_support_flags(self.basic_node_data.support_flags)
                                .await?
                        }
                        Message::Response(response) => {
                            self.handle_message_response(response).await?
                        }
                        // Anything else mid-handshake is a protocol violation.
                        _ => return Err(HandShakeError::PeerSentWrongResponse),
                    }
                }
                None => unreachable!("peer_stream wont return None"),
            }
        }

        Ok(())
    }

    /// Completes the handshake and, on success, spawns the connection task
    /// and returns a `Client` handle for it.
    async fn do_handshake(mut self) -> Result<Client, HandShakeError> {
        match self.direction {
            Direction::Outbound => self.do_outbound_handshake().await?,
            // NOTE(review): inbound handshakes are not implemented yet.
            Direction::Inbound => todo!(),
        }

        let (server_tx, server_rx) = mpsc::channel(3);

        // TODO: placeholder sync-state channel; its receiver is dropped here.
        let (replace_me, replace_me_rx) = mpsc::channel(3);

        let peer_node_data = self
            .state
            .peer_basic_node_data()
            .expect("We must be in state complete to be here");
        let connection_info = ConnectionInfo {
            addr: self.addr,
            support_flags: peer_node_data.support_flags,
            peer_id: peer_node_data.peer_id,
            rpc_port: peer_node_data.rpc_port,
            rpc_credits_per_hash: peer_node_data.rpc_credits_per_hash,
        };

        let connection = Connection::new(
            self.addr,
            self.peer_sink,
            self.peer_stream,
            server_rx,
            replace_me,
            self.peer_request_service,
        );

        let client = Client::new(connection_info.into(), server_tx);

        // Run the connection on its own task, instrumented with its span.
        tokio::task::spawn(connection.run().instrument(self.connection_span));

        Ok(client)
    }
}
|
|
@ -1 +0,0 @@
|
|||
mod handshake;
|
|
@ -1 +0,0 @@
|
|||
pub use crate::peer::handshaker::{Handshake, Handshaker};
|
|
@ -1,13 +0,0 @@
|
|||
pub mod internal_network;
|
||||
pub mod temp_database;
|
||||
|
||||
pub use internal_network::{InternalMessageRequest, InternalMessageResponse};
|
||||
|
||||
// NOTE(review): the two BLOCKS_IDS_* limits are not referenced in the code
// visible here — presumably chain-sync request bounds; confirm at use sites.
pub const BLOCKS_IDS_SYNCHRONIZING_DEFAULT_COUNT: usize = 10000;
pub const BLOCKS_IDS_SYNCHRONIZING_MAX_COUNT: usize = 25000;
/// Maximum number of peer-list entries accepted in a handshake response
/// before the peer is considered to be spamming.
pub const P2P_MAX_PEERS_IN_HANDSHAKE: usize = 250;

/// Whether a connection was initiated by us (outbound) or the peer (inbound).
pub enum Direction {
    Inbound,
    Outbound,
}
|
|
@ -22,162 +22,37 @@
|
|||
/// Request: NewFluffyBlock, Response: None,
|
||||
/// Request: NewTransactions, Response: None
|
||||
///
|
||||
use monero_wire::messages::{
|
||||
AdminMessage, ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
|
||||
GetObjectsResponse, GetTxPoolCompliment, Handshake, Message, MessageNotification,
|
||||
MessageRequest, MessageResponse, NewBlock, NewFluffyBlock, NewTransactions, Ping,
|
||||
ProtocolMessage, SupportFlags, TimedSync,
|
||||
use monero_wire::{
|
||||
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
|
||||
GetObjectsResponse, GetTxPoolCompliment, HandshakeRequest, HandshakeResponse, NewBlock,
|
||||
NewFluffyBlock, NewTransactions, PingResponse, SupportFlagsResponse, TimedSyncRequest,
|
||||
TimedSyncResponse,
|
||||
};
|
||||
|
||||
macro_rules! client_request_peer_response {
|
||||
(
|
||||
Admin:
|
||||
$($admin_mes:ident),+
|
||||
Protocol:
|
||||
$(Request: $protocol_req:ident, Response: $(SOME: $protocol_res:ident)? $(NULL: $none:expr)? ),+
|
||||
) => {
|
||||
pub enum Request {
|
||||
Handshake(HandshakeRequest),
|
||||
TimedSync(TimedSyncRequest),
|
||||
Ping,
|
||||
SupportFlags,
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum InternalMessageRequest {
|
||||
$($admin_mes(<$admin_mes as AdminMessage>::Request),)+
|
||||
$($protocol_req(<$protocol_req as ProtocolMessage>::Notification),)+
|
||||
}
|
||||
|
||||
impl InternalMessageRequest {
|
||||
pub fn get_str_name(&self) -> &'static str {
|
||||
match self {
|
||||
$(InternalMessageRequest::$admin_mes(_) => $admin_mes::NAME,)+
|
||||
$(InternalMessageRequest::$protocol_req(_) => $protocol_req::NAME,)+
|
||||
}
|
||||
}
|
||||
pub fn id(&self) -> u32 {
|
||||
match self {
|
||||
$(InternalMessageRequest::$admin_mes(_) => $admin_mes::ID,)+
|
||||
$(InternalMessageRequest::$protocol_req(_) => $protocol_req::ID,)+
|
||||
}
|
||||
}
|
||||
pub fn expected_id(&self) -> Option<u32> {
|
||||
match self {
|
||||
$(InternalMessageRequest::$admin_mes(_) => Some($admin_mes::ID),)+
|
||||
$(InternalMessageRequest::$protocol_req(_) => $(Some($protocol_res::ID))? $($none)?,)+
|
||||
}
|
||||
}
|
||||
pub fn is_levin_request(&self) -> bool {
|
||||
match self {
|
||||
$(InternalMessageRequest::$admin_mes(_) => true,)+
|
||||
$(InternalMessageRequest::$protocol_req(_) => false,)+
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MessageRequest> for InternalMessageRequest {
|
||||
fn from(value: MessageRequest) -> Self {
|
||||
match value {
|
||||
$(MessageRequest::$admin_mes(mes) => InternalMessageRequest::$admin_mes(mes),)+
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<Message> for InternalMessageRequest {
|
||||
fn into(self) -> Message {
|
||||
match self {
|
||||
$(InternalMessageRequest::$admin_mes(mes) => Message::Request(MessageRequest::$admin_mes(mes)),)+
|
||||
$(InternalMessageRequest::$protocol_req(mes) => Message::Notification(MessageNotification::$protocol_req(mes)),)+
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NotAnInternalRequest;
|
||||
|
||||
impl TryFrom<Message> for InternalMessageRequest {
|
||||
type Error = NotAnInternalRequest;
|
||||
fn try_from(value: Message) -> Result<Self, Self::Error> {
|
||||
match value {
|
||||
Message::Response(_) => Err(NotAnInternalRequest),
|
||||
Message::Request(req) => Ok(req.into()),
|
||||
Message::Notification(noti) => {
|
||||
match noti {
|
||||
$(MessageNotification::$protocol_req(noti) => Ok(InternalMessageRequest::$protocol_req(noti)),)+
|
||||
_ => Err(NotAnInternalRequest),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum InternalMessageResponse {
|
||||
$($admin_mes(<$admin_mes as AdminMessage>::Response),)+
|
||||
$($($protocol_res(<$protocol_res as ProtocolMessage>::Notification),)?)+
|
||||
}
|
||||
|
||||
impl InternalMessageResponse {
|
||||
pub fn get_str_name(&self) -> &'static str {
|
||||
match self {
|
||||
$(InternalMessageResponse::$admin_mes(_) => $admin_mes::NAME,)+
|
||||
$($(InternalMessageResponse::$protocol_res(_) => $protocol_res::NAME,)?)+
|
||||
}
|
||||
}
|
||||
pub fn id(&self) -> u32 {
|
||||
match self{
|
||||
$(InternalMessageResponse::$admin_mes(_) => $admin_mes::ID,)+
|
||||
$($(InternalMessageResponse::$protocol_res(_) => $protocol_res::ID,)?)+
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MessageResponse> for InternalMessageResponse {
|
||||
fn from(value: MessageResponse) -> Self {
|
||||
match value {
|
||||
$(MessageResponse::$admin_mes(mes) => InternalMessageResponse::$admin_mes(mes),)+
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<Message> for InternalMessageResponse {
|
||||
fn into(self) -> Message {
|
||||
match self {
|
||||
$(InternalMessageResponse::$admin_mes(mes) => Message::Response(MessageResponse::$admin_mes(mes)),)+
|
||||
$($(InternalMessageResponse::$protocol_res(mes) => Message::Notification(MessageNotification::$protocol_res(mes)),)?)+
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NotAnInternalResponse;
|
||||
|
||||
impl TryFrom<Message> for InternalMessageResponse {
|
||||
type Error = NotAnInternalResponse;
|
||||
fn try_from(value: Message) -> Result<Self, Self::Error> {
|
||||
match value {
|
||||
Message::Response(res) => Ok(res.into()),
|
||||
Message::Request(_) => Err(NotAnInternalResponse),
|
||||
Message::Notification(noti) => {
|
||||
match noti {
|
||||
$($(MessageNotification::$protocol_res(noti) => Ok(InternalMessageResponse::$protocol_res(noti)),)?)+
|
||||
_ => Err(NotAnInternalResponse),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
GetObjects(GetObjectsRequest),
|
||||
GetChain(ChainRequest),
|
||||
FluffyMissingTxs(FluffyMissingTransactionsRequest),
|
||||
GetTxPollCompliment(GetTxPoolCompliment),
|
||||
NewBlock(NewBlock),
|
||||
NewFluffyBlock(NewFluffyBlock),
|
||||
NewTransactions(NewTransactions),
|
||||
}
|
||||
|
||||
client_request_peer_response!(
|
||||
Admin:
|
||||
Handshake,
|
||||
TimedSync,
|
||||
Ping,
|
||||
SupportFlags
|
||||
Protocol:
|
||||
Request: GetObjectsRequest, Response: SOME: GetObjectsResponse,
|
||||
Request: ChainRequest, Response: SOME: ChainResponse,
|
||||
Request: FluffyMissingTransactionsRequest, Response: SOME: NewFluffyBlock, // these 2 could be requests or responses
|
||||
Request: GetTxPoolCompliment, Response: SOME: NewTransactions, //
|
||||
// these don't need to be responded to
|
||||
Request: NewBlock, Response: NULL: None,
|
||||
Request: NewFluffyBlock, Response: NULL: None,
|
||||
Request: NewTransactions, Response: NULL: None
|
||||
);
|
||||
pub enum Response {
|
||||
Handshake(HandshakeResponse),
|
||||
TimedSync(TimedSyncResponse),
|
||||
Ping(PingResponse),
|
||||
SupportFlags(SupportFlagsResponse),
|
||||
|
||||
GetObjects(GetObjectsResponse),
|
||||
GetChain(ChainResponse),
|
||||
NewFluffyBlock(NewFluffyBlock),
|
||||
NewTransactions(NewTransactions),
|
||||
NA,
|
||||
}
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
pub mod internal_network;
|
||||
pub mod temp_database;
|
||||
|
||||
pub use internal_network::{InternalMessageRequest, InternalMessageResponse};
|
||||
|
||||
// NOTE(review): the two BLOCKS_IDS_* limits are not referenced in the code
// visible here — presumably chain-sync request bounds; confirm at use sites.
pub const BLOCKS_IDS_SYNCHRONIZING_DEFAULT_COUNT: usize = 10000;
pub const BLOCKS_IDS_SYNCHRONIZING_MAX_COUNT: usize = 25000;
/// Maximum number of peer-list entries accepted in a handshake response
/// before the peer is considered to be spamming.
pub const P2P_MAX_PEERS_IN_HANDSHAKE: usize = 250;

/// Whether a connection was initiated by us (outbound) or the peer (inbound).
pub enum Direction {
    Inbound,
    Outbound,
}
|
|
@ -1,36 +0,0 @@
|
|||
use monero_wire::messages::CoreSyncData;
|
||||
use thiserror::Error;
|
||||
|
||||
/// How (if at all) a block hash is already known to the database.
pub enum BlockKnown {
    No,
    OnMainChain,
    OnSideChain,
    KnownBad,
}

impl BlockKnown {
    /// Returns `true` for every variant except `No`.
    pub fn is_known(&self) -> bool {
        match self {
            BlockKnown::No => false,
            _ => true,
        }
    }
}
|
||||
|
||||
/// Queries the p2p code can make of the (temporary) database service.
pub enum DataBaseRequest {
    CurrentHeight,
    CumulativeDifficulty,
    CoreSyncData,
    Chain,
    /// Height of the block with this hash, if we have it.
    BlockHeight([u8; 32]),
    /// Whether (and where) this block hash is known.
    BlockKnown([u8; 32]),
}
|
||||
|
||||
/// Answers to `DataBaseRequest`; each variant mirrors the request of the
/// same name.
pub enum DataBaseResponse {
    CurrentHeight(u64),
    CumulativeDifficulty(u128),
    CoreSyncData(CoreSyncData),
    Chain(Vec<[u8; 32]>),
    /// `None` when the requested block hash is unknown.
    BlockHeight(Option<u64>),
    BlockKnown(BlockKnown),
}
|
||||
|
||||
/// Errors the temporary database service can return (none defined yet).
#[derive(Debug, Error, PartialEq, Eq)]
pub enum DatabaseError {}
|
|
@ -1,21 +0,0 @@
|
|||
[package]
|
||||
name = "cuprate-sync-states"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
cuprate-common = {path = "../../common"}
|
||||
cuprate-peer = {path = "../peer"}
|
||||
cuprate-protocol = {path = "../protocol"}
|
||||
monero = {git="https://github.com/Boog900/monero-rs.git", branch="db", features=["database"]}
|
||||
monero-wire = {path= "../../net/monero-wire"}
|
||||
futures = "0.3.26"
|
||||
tower = {version = "0.4.13", features = ["util"]}
|
||||
thiserror = "1.0.39"
|
||||
|
||||
|
||||
tokio = {version="1.1", features=["full"]}
|
||||
tokio-util = {version ="0.7", features=["compat"]}
|
||||
|
|
@ -1,538 +0,0 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use monero::Hash;
|
||||
use thiserror::Error;
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_common::{hardforks, HardForks};
|
||||
use cuprate_peer::connection::PeerSyncChange;
|
||||
use cuprate_protocol::temp_database::{
|
||||
BlockKnown, DataBaseRequest, DataBaseResponse, DatabaseError,
|
||||
};
|
||||
use cuprate_protocol::{InternalMessageRequest, InternalMessageResponse};
|
||||
use monero_wire::messages::protocol::ChainResponse;
|
||||
use monero_wire::messages::{ChainRequest, CoreSyncData};
|
||||
use monero_wire::{Message, NetworkAddress};
|
||||
|
||||
// TODO: Move this!!!!!!!
|
||||
// ********************************
|
||||
|
||||
/// Commands that can be issued against the set of connected peers.
pub enum PeerSetRequest {
    DisconnectPeer(NetworkAddress),
    BanPeer(NetworkAddress),
    /// Send a request, optionally targeted at a specific peer.
    // NOTE(review): `None` presumably means "any suitable peer" — confirm
    // against the peer-set implementation.
    SendRequest(InternalMessageRequest, Option<NetworkAddress>),
}
|
||||
|
||||
/// Result of a `PeerSetRequest` that was routed to a peer.
pub struct PeerSetResponse {
    /// The peer that handled the request.
    peer: NetworkAddress,
    /// The peer's response; `None` for request types with no response.
    response: Option<InternalMessageResponse>,
}
|
||||
|
||||
// *******************************
|
||||
#[derive(Debug, Default)]
|
||||
pub struct IndividualPeerSync {
|
||||
height: u64,
|
||||
// no grantee this is the same block as height
|
||||
top_id: Hash,
|
||||
top_version: u8,
|
||||
cumulative_difficulty: u128,
|
||||
/// the height the list of needed blocks starts at
|
||||
start_height: u64,
|
||||
/// list of block hashes our node does not have.
|
||||
needed_blocks: Vec<(Hash, Option<u64>)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PeersSyncData {
|
||||
peers: HashMap<NetworkAddress, IndividualPeerSync>,
|
||||
}
|
||||
|
||||
impl PeersSyncData {
|
||||
pub fn new_core_sync_data(
|
||||
&mut self,
|
||||
id: &NetworkAddress,
|
||||
core_sync: CoreSyncData,
|
||||
) -> Result<(), SyncStatesError> {
|
||||
let peer_data = self.peers.get_mut(&id);
|
||||
if peer_data.is_none() {
|
||||
let ips = IndividualPeerSync {
|
||||
height: core_sync.current_height,
|
||||
top_id: core_sync.top_id,
|
||||
top_version: core_sync.top_version,
|
||||
cumulative_difficulty: core_sync.cumulative_difficulty(),
|
||||
start_height: 0,
|
||||
needed_blocks: vec![],
|
||||
};
|
||||
self.peers.insert(*id, ips);
|
||||
} else {
|
||||
let peer_data = peer_data.unwrap();
|
||||
if peer_data.height > core_sync.current_height {
|
||||
return Err(SyncStatesError::PeersHeightHasDropped);
|
||||
}
|
||||
if peer_data.cumulative_difficulty > core_sync.cumulative_difficulty() {
|
||||
return Err(SyncStatesError::PeersCumulativeDifficultyDropped);
|
||||
}
|
||||
peer_data.height = core_sync.current_height;
|
||||
peer_data.cumulative_difficulty = core_sync.cumulative_difficulty();
|
||||
peer_data.top_id = core_sync.top_id;
|
||||
peer_data.top_version = core_sync.top_version;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn new_chain_response(
|
||||
&mut self,
|
||||
id: &NetworkAddress,
|
||||
chain_response: ChainResponse,
|
||||
needed_blocks: Vec<(Hash, Option<u64>)>,
|
||||
) -> Result<(), SyncStatesError> {
|
||||
let peer_data = self
|
||||
.peers
|
||||
.get_mut(&id)
|
||||
.expect("Peers must give use their core sync before chain response");
|
||||
|
||||
// it's sad we have to do this so late in the response validation process
|
||||
if peer_data.height > chain_response.total_height {
|
||||
return Err(SyncStatesError::PeersHeightHasDropped);
|
||||
}
|
||||
if peer_data.cumulative_difficulty > chain_response.cumulative_difficulty() {
|
||||
return Err(SyncStatesError::PeersCumulativeDifficultyDropped);
|
||||
}
|
||||
|
||||
peer_data.cumulative_difficulty = chain_response.cumulative_difficulty();
|
||||
peer_data.height = chain_response.total_height;
|
||||
peer_data.start_height = chain_response.start_height
|
||||
+ chain_response.m_block_ids.len() as u64
|
||||
- needed_blocks.len() as u64;
|
||||
peer_data.needed_blocks = needed_blocks;
|
||||
Ok(())
|
||||
}
|
||||
// returns true if we have ran out of known blocks for that peer
|
||||
pub fn new_objects_response(
|
||||
&mut self,
|
||||
id: &NetworkAddress,
|
||||
mut block_ids: HashSet<Hash>,
|
||||
) -> Result<bool, SyncStatesError> {
|
||||
let peer_data = self
|
||||
.peers
|
||||
.get_mut(id)
|
||||
.expect("Peers must give use their core sync before objects response");
|
||||
let mut i = 0;
|
||||
if peer_data.needed_blocks.is_empty() {
|
||||
return Ok(true);
|
||||
}
|
||||
while !block_ids.contains(&peer_data.needed_blocks[i].0) {
|
||||
i += 1;
|
||||
if i == peer_data.needed_blocks.len() {
|
||||
peer_data.needed_blocks = vec![];
|
||||
peer_data.start_height = 0;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
for _ in 0..block_ids.len() {
|
||||
if !block_ids.remove(&peer_data.needed_blocks[i].0) {
|
||||
return Err(SyncStatesError::PeerSentAnUnexpectedBlockId);
|
||||
}
|
||||
i += 1;
|
||||
if i == peer_data.needed_blocks.len() {
|
||||
peer_data.needed_blocks = vec![];
|
||||
peer_data.start_height = 0;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
peer_data.needed_blocks = peer_data.needed_blocks[i..].to_vec();
|
||||
peer_data.start_height = peer_data.start_height + i as u64;
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
pub fn peer_disconnected(&mut self, id: &NetworkAddress) {
|
||||
let _ = self.peers.remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error, PartialEq, Eq)]
|
||||
pub enum SyncStatesError {
|
||||
#[error("Peer sent a block id we know is bad")]
|
||||
PeerSentKnownBadBlock,
|
||||
#[error("Peer sent a block id we weren't expecting")]
|
||||
PeerSentAnUnexpectedBlockId,
|
||||
#[error("Peer sent a chain entry where we don't know the start")]
|
||||
PeerSentNoneOverlappingFirstBlock,
|
||||
#[error("We have the peers block just at a different height")]
|
||||
WeHaveBlockAtDifferentHeight,
|
||||
#[error("The peer sent a top version we weren't expecting")]
|
||||
PeerSentBadTopVersion,
|
||||
#[error("The peer sent a weird pruning seed")]
|
||||
PeerSentBadPruningSeed,
|
||||
#[error("The peer height has dropped")]
|
||||
PeersHeightHasDropped,
|
||||
#[error("The peers cumulative difficulty has dropped")]
|
||||
PeersCumulativeDifficultyDropped,
|
||||
#[error("Our database returned an error: {0}")]
|
||||
DataBaseError(#[from] DatabaseError),
|
||||
}
|
||||
|
||||
pub struct SyncStates<Db> {
|
||||
peer_sync_rx: mpsc::Receiver<PeerSyncChange>,
|
||||
hardforks: HardForks,
|
||||
peer_sync_states: Arc<Mutex<PeersSyncData>>,
|
||||
blockchain: Db,
|
||||
}
|
||||
|
||||
impl<Db> SyncStates<Db>
|
||||
where
|
||||
Db: Service<DataBaseRequest, Response = DataBaseResponse, Error = DatabaseError>,
|
||||
{
|
||||
pub fn new(
|
||||
peer_sync_rx: mpsc::Receiver<PeerSyncChange>,
|
||||
hardforks: HardForks,
|
||||
peer_sync_states: Arc<Mutex<PeersSyncData>>,
|
||||
blockchain: Db,
|
||||
) -> Self {
|
||||
SyncStates {
|
||||
peer_sync_rx,
|
||||
hardforks,
|
||||
peer_sync_states,
|
||||
blockchain,
|
||||
}
|
||||
}
|
||||
async fn send_database_request(
|
||||
&mut self,
|
||||
req: DataBaseRequest,
|
||||
) -> Result<DataBaseResponse, DatabaseError> {
|
||||
let ready_blockchain = self.blockchain.ready().await?;
|
||||
ready_blockchain.call(req).await
|
||||
}
|
||||
|
||||
async fn handle_core_sync_change(
|
||||
&mut self,
|
||||
id: &NetworkAddress,
|
||||
core_sync: CoreSyncData,
|
||||
) -> Result<bool, SyncStatesError> {
|
||||
if core_sync.current_height > 0 {
|
||||
let version = self
|
||||
.hardforks
|
||||
.get_ideal_version_from_height(core_sync.current_height - 1);
|
||||
if version >= 6 && version != core_sync.top_version {
|
||||
return Err(SyncStatesError::PeerSentBadTopVersion);
|
||||
}
|
||||
}
|
||||
if core_sync.pruning_seed != 0 {
|
||||
let log_stripes =
|
||||
monero::database::pruning::get_pruning_log_stripes(core_sync.pruning_seed);
|
||||
let stripe =
|
||||
monero::database::pruning::get_pruning_stripe_for_seed(core_sync.pruning_seed);
|
||||
if stripe != monero::database::pruning::CRYPTONOTE_PRUNING_LOG_STRIPES
|
||||
|| stripe > (1 << log_stripes)
|
||||
{
|
||||
return Err(SyncStatesError::PeerSentBadPruningSeed);
|
||||
}
|
||||
}
|
||||
//if core_sync.current_height > max block numb
|
||||
let DataBaseResponse::BlockHeight(height) = self.send_database_request(DataBaseRequest::BlockHeight(core_sync.top_id)).await? else {
|
||||
unreachable!("the blockchain won't send the wrong response");
|
||||
};
|
||||
|
||||
let behind: bool;
|
||||
|
||||
if let Some(height) = height {
|
||||
if height != core_sync.current_height {
|
||||
return Err(SyncStatesError::WeHaveBlockAtDifferentHeight);
|
||||
}
|
||||
behind = false;
|
||||
} else {
|
||||
let DataBaseResponse::CumulativeDifficulty(cumulative_diff) = self.send_database_request(DataBaseRequest::CumulativeDifficulty).await? else {
|
||||
unreachable!("the blockchain won't send the wrong response");
|
||||
};
|
||||
// if their chain has more POW we want it
|
||||
if cumulative_diff < core_sync.cumulative_difficulty() {
|
||||
behind = true;
|
||||
} else {
|
||||
behind = false;
|
||||
}
|
||||
}
|
||||
|
||||
let mut sync_states = self.peer_sync_states.lock().unwrap();
|
||||
sync_states.new_core_sync_data(id, core_sync)?;
|
||||
|
||||
Ok(behind)
|
||||
}
|
||||
|
||||
async fn handle_chain_entry_response(
|
||||
&mut self,
|
||||
id: &NetworkAddress,
|
||||
chain_response: ChainResponse,
|
||||
) -> Result<(), SyncStatesError> {
|
||||
let mut expect_unknown = false;
|
||||
let mut needed_blocks = Vec::with_capacity(chain_response.m_block_ids.len());
|
||||
|
||||
for (index, block_id) in chain_response.m_block_ids.iter().enumerate() {
|
||||
let DataBaseResponse::BlockKnown(known) = self.send_database_request(DataBaseRequest::BlockKnown(*block_id)).await? else {
|
||||
unreachable!("the blockchain won't send the wrong response");
|
||||
};
|
||||
if index == 0 {
|
||||
if !known.is_known() {
|
||||
return Err(SyncStatesError::PeerSentNoneOverlappingFirstBlock);
|
||||
}
|
||||
} else {
|
||||
match known {
|
||||
BlockKnown::No => expect_unknown = true,
|
||||
BlockKnown::OnMainChain => {
|
||||
if expect_unknown {
|
||||
return Err(SyncStatesError::PeerSentAnUnexpectedBlockId);
|
||||
} else {
|
||||
let DataBaseResponse::BlockHeight(height) = self.send_database_request(DataBaseRequest::BlockHeight(*block_id)).await? else {
|
||||
unreachable!("the blockchain won't send the wrong response");
|
||||
};
|
||||
if chain_response.start_height + index as u64
|
||||
!= height.expect("We already know this block is in our main chain.")
|
||||
{
|
||||
return Err(SyncStatesError::WeHaveBlockAtDifferentHeight);
|
||||
}
|
||||
}
|
||||
}
|
||||
BlockKnown::OnSideChain => {
|
||||
if expect_unknown {
|
||||
return Err(SyncStatesError::PeerSentAnUnexpectedBlockId);
|
||||
}
|
||||
}
|
||||
BlockKnown::KnownBad => return Err(SyncStatesError::PeerSentKnownBadBlock),
|
||||
}
|
||||
}
|
||||
let block_weight = chain_response.m_block_weights.get(index).map(|f| f.clone());
|
||||
needed_blocks.push((*block_id, block_weight));
|
||||
}
|
||||
let mut sync_states = self.peer_sync_states.lock().unwrap();
|
||||
sync_states.new_chain_response(id, chain_response, needed_blocks)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build_chain_request(&mut self) -> Result<ChainRequest, DatabaseError> {
|
||||
let DataBaseResponse::Chain(ids) = self.send_database_request(DataBaseRequest::Chain).await? else {
|
||||
unreachable!("the blockchain won't send the wrong response");
|
||||
};
|
||||
|
||||
Ok(ChainRequest {
|
||||
block_ids: ids,
|
||||
prune: false,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_peers_chain_entry<Svc>(
|
||||
&mut self,
|
||||
peer_set: &mut Svc,
|
||||
id: &NetworkAddress,
|
||||
) -> Result<ChainResponse, DatabaseError>
|
||||
where
|
||||
Svc: Service<PeerSetRequest, Response = PeerSetResponse, Error = DatabaseError>,
|
||||
{
|
||||
let chain_req = self.build_chain_request().await?;
|
||||
let ready_set = peer_set.ready().await.unwrap();
|
||||
let response: PeerSetResponse = ready_set
|
||||
.call(PeerSetRequest::SendRequest(
|
||||
Message::Notification(chain_req.into())
|
||||
.try_into()
|
||||
.expect("Chain request can always be converted to IMR"),
|
||||
Some(*id),
|
||||
))
|
||||
.await?;
|
||||
let InternalMessageResponse::ChainResponse(response) = response.response.expect("peer set will return a result for a chain request") else {
|
||||
unreachable!("peer set will return correct response");
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn get_and_handle_chain_entry<Svc>(
|
||||
&mut self,
|
||||
peer_set: &mut Svc,
|
||||
id: NetworkAddress,
|
||||
) -> Result<(), SyncStatesError>
|
||||
where
|
||||
Svc: Service<PeerSetRequest, Response = PeerSetResponse, Error = DatabaseError>,
|
||||
{
|
||||
let chain_response = self.get_peers_chain_entry(peer_set, &id).await?;
|
||||
self.handle_chain_entry_response(&id, chain_response).await
|
||||
}
|
||||
|
||||
async fn handle_objects_response(
|
||||
&mut self,
|
||||
id: NetworkAddress,
|
||||
block_ids: Vec<Hash>,
|
||||
peers_height: u64,
|
||||
) -> Result<bool, SyncStatesError> {
|
||||
let mut sync_states = self.peer_sync_states.lock().unwrap();
|
||||
let ran_out_of_blocks =
|
||||
sync_states.new_objects_response(&id, HashSet::from_iter(block_ids))?;
|
||||
drop(sync_states);
|
||||
if ran_out_of_blocks {
|
||||
let DataBaseResponse::CurrentHeight(our_height) = self.send_database_request(DataBaseRequest::CurrentHeight).await? else {
|
||||
unreachable!("the blockchain won't send the wrong response");
|
||||
};
|
||||
if our_height < peers_height {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn handle_peer_disconnect(&mut self, id: NetworkAddress) {
|
||||
let mut sync_states = self.peer_sync_states.lock().unwrap();
|
||||
sync_states.peer_disconnected(&id);
|
||||
}
|
||||
|
||||
pub async fn run<Svc>(mut self, mut peer_set: Svc)
|
||||
where
|
||||
Svc: Service<PeerSetRequest, Response = PeerSetResponse, Error = DatabaseError>,
|
||||
{
|
||||
loop {
|
||||
let Some(change) = self.peer_sync_rx.next().await else {
|
||||
// is this best?
|
||||
return;
|
||||
};
|
||||
|
||||
match change {
|
||||
PeerSyncChange::CoreSyncData(id, csd) => {
|
||||
match self.handle_core_sync_change(&id, csd).await {
|
||||
Err(_) => {
|
||||
// TODO: check if error needs ban or forget
|
||||
let ready_set = peer_set.ready().await.unwrap();
|
||||
let res = ready_set.call(PeerSetRequest::BanPeer(id)).await;
|
||||
}
|
||||
Ok(request_chain) => {
|
||||
if request_chain {
|
||||
self.get_and_handle_chain_entry(&mut peer_set, id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
PeerSyncChange::ObjectsResponse(id, block_ids, height) => {
|
||||
match self.handle_objects_response(id, block_ids, height).await {
|
||||
Err(_) => {
|
||||
// TODO: check if error needs ban or forget
|
||||
let ready_set = peer_set.ready().await.unwrap();
|
||||
let res = ready_set.call(PeerSetRequest::BanPeer(id)).await;
|
||||
}
|
||||
Ok(res) => {
|
||||
if res {
|
||||
self.get_and_handle_chain_entry(&mut peer_set, id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
PeerSyncChange::PeerDisconnected(id) => {
|
||||
self.handle_peer_disconnect(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use monero::Hash;
|
||||
use monero_wire::messages::{ChainResponse, CoreSyncData};
|
||||
|
||||
use crate::{PeersSyncData, SyncStatesError};
|
||||
|
||||
#[test]
|
||||
fn peer_sync_data_good_core_sync() {
|
||||
let mut peer_sync_states = PeersSyncData::default();
|
||||
let core_sync = CoreSyncData::new(65346753, 1232, 389, Hash::null(), 1);
|
||||
|
||||
peer_sync_states
|
||||
.new_core_sync_data(&monero_wire::NetworkAddress::default(), core_sync)
|
||||
.unwrap();
|
||||
|
||||
let new_core_sync = CoreSyncData::new(65346754, 1233, 389, Hash::null(), 1);
|
||||
|
||||
peer_sync_states
|
||||
.new_core_sync_data(&monero_wire::NetworkAddress::default(), new_core_sync)
|
||||
.unwrap();
|
||||
|
||||
let peer = peer_sync_states
|
||||
.peers
|
||||
.get(&monero_wire::NetworkAddress::default())
|
||||
.unwrap();
|
||||
assert_eq!(peer.height, 1233);
|
||||
assert_eq!(peer.cumulative_difficulty, 65346754);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_sync_data_peer_height_dropped() {
|
||||
let mut peer_sync_states = PeersSyncData::default();
|
||||
let core_sync = CoreSyncData::new(65346753, 1232, 389, Hash::null(), 1);
|
||||
|
||||
peer_sync_states
|
||||
.new_core_sync_data(&monero_wire::NetworkAddress::default(), core_sync)
|
||||
.unwrap();
|
||||
|
||||
let new_core_sync = CoreSyncData::new(65346754, 1231, 389, Hash::null(), 1);
|
||||
|
||||
let res = peer_sync_states
|
||||
.new_core_sync_data(&monero_wire::NetworkAddress::default(), new_core_sync)
|
||||
.unwrap_err();
|
||||
|
||||
assert_eq!(res, SyncStatesError::PeersHeightHasDropped);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_sync_data_peer_cumulative_difficulty_dropped() {
|
||||
let mut peer_sync_states = PeersSyncData::default();
|
||||
let core_sync = CoreSyncData::new(65346753, 1232, 389, Hash::null(), 1);
|
||||
|
||||
peer_sync_states
|
||||
.new_core_sync_data(&monero_wire::NetworkAddress::default(), core_sync)
|
||||
.unwrap();
|
||||
|
||||
let new_core_sync = CoreSyncData::new(65346752, 1233, 389, Hash::null(), 1);
|
||||
|
||||
let res = peer_sync_states
|
||||
.new_core_sync_data(&monero_wire::NetworkAddress::default(), new_core_sync)
|
||||
.unwrap_err();
|
||||
|
||||
assert_eq!(res, SyncStatesError::PeersCumulativeDifficultyDropped);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_sync_new_chain_response() {
|
||||
let mut peer_sync_states = PeersSyncData::default();
|
||||
let core_sync = CoreSyncData::new(65346753, 1232, 389, Hash::null(), 1);
|
||||
|
||||
peer_sync_states
|
||||
.new_core_sync_data(&monero_wire::NetworkAddress::default(), core_sync)
|
||||
.unwrap();
|
||||
|
||||
let chain_response = ChainResponse::new(
|
||||
10,
|
||||
1233,
|
||||
65346754,
|
||||
vec![Hash::new(&[1]), Hash::new(&[2])],
|
||||
vec![],
|
||||
vec![],
|
||||
);
|
||||
|
||||
let needed_blocks = vec![(Hash::new(&[2]), None)];
|
||||
|
||||
peer_sync_states
|
||||
.new_chain_response(
|
||||
&monero_wire::NetworkAddress::default(),
|
||||
chain_response,
|
||||
needed_blocks,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let peer = peer_sync_states
|
||||
.peers
|
||||
.get(&monero_wire::NetworkAddress::default())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(peer.start_height, 11);
|
||||
assert_eq!(peer.height, 1233);
|
||||
assert_eq!(peer.cumulative_difficulty, 65346754);
|
||||
assert_eq!(peer.needed_blocks, vec![(Hash::new(&[2]), None)]);
|
||||
}
|
||||
}
|
|
@ -1,109 +0,0 @@
|
|||
use std::{
|
||||
pin::Pin,
|
||||
str::FromStr,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use cuprate_common::{HardForks, Network};
|
||||
use cuprate_peer::PeerError;
|
||||
use cuprate_protocol::{
|
||||
temp_database::{BlockKnown, DataBaseRequest, DataBaseResponse, DatabaseError},
|
||||
Direction, InternalMessageRequest, InternalMessageResponse,
|
||||
};
|
||||
use cuprate_sync_states::SyncStates;
|
||||
use futures::{channel::mpsc, Future, FutureExt};
|
||||
use monero::Hash;
|
||||
use monero_wire::messages::{admin::HandshakeResponse, CoreSyncData};
|
||||
use tower::ServiceExt;
|
||||
|
||||
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
|
||||
|
||||
struct TestBlockchain;
|
||||
|
||||
impl tower::Service<DataBaseRequest> for TestBlockchain {
|
||||
type Error = DatabaseError;
|
||||
type Response = DataBaseResponse;
|
||||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
std::task::Poll::Ready(Ok(()))
|
||||
}
|
||||
fn call(&mut self, req: DataBaseRequest) -> Self::Future {
|
||||
let res = match req {
|
||||
DataBaseRequest::BlockHeight(h) => DataBaseResponse::BlockHeight(Some(221)),
|
||||
DataBaseRequest::BlockKnown(_) => DataBaseResponse::BlockKnown(BlockKnown::OnMainChain),
|
||||
DataBaseRequest::Chain => todo!(),
|
||||
DataBaseRequest::CoreSyncData => {
|
||||
DataBaseResponse::CoreSyncData(CoreSyncData::new(0, 0, 0, Hash::null(), 0))
|
||||
}
|
||||
DataBaseRequest::CumulativeDifficulty => DataBaseResponse::CumulativeDifficulty(0),
|
||||
DataBaseRequest::CurrentHeight => DataBaseResponse::CurrentHeight(0),
|
||||
};
|
||||
|
||||
async { Ok(res) }.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct TestPeerRequest;
|
||||
|
||||
impl tower::Service<InternalMessageRequest> for TestPeerRequest {
|
||||
type Error = PeerError;
|
||||
type Response = InternalMessageResponse;
|
||||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
todo!()
|
||||
}
|
||||
fn call(&mut self, req: InternalMessageRequest) -> Self::Future {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_p2p_conn() {
|
||||
let conf = cuprate_peer::handshaker::NetworkConfig::default();
|
||||
let (addr_tx, addr_rx) = mpsc::channel(21);
|
||||
let (sync_tx, sync_rx) = mpsc::channel(21);
|
||||
let peer_sync_states = Arc::new(Mutex::default());
|
||||
|
||||
let peer_sync_states = SyncStates::new(
|
||||
sync_rx,
|
||||
HardForks::new(Network::MainNet),
|
||||
peer_sync_states,
|
||||
TestBlockchain,
|
||||
);
|
||||
|
||||
let mut handshaker = cuprate_peer::handshaker::Handshaker::new(
|
||||
conf,
|
||||
addr_tx,
|
||||
TestBlockchain,
|
||||
sync_tx,
|
||||
TestPeerRequest.boxed_clone(),
|
||||
);
|
||||
|
||||
let soc = tokio::net::TcpSocket::new_v4().unwrap();
|
||||
let addr = std::net::SocketAddr::from_str("127.0.0.1:18080").unwrap();
|
||||
|
||||
let mut con = soc.connect(addr).await.unwrap();
|
||||
|
||||
let (r_h, w_h) = con.split();
|
||||
|
||||
let (client, conn) = handshaker
|
||||
.complete_handshake(
|
||||
r_h.compat(),
|
||||
w_h.compat_write(),
|
||||
Direction::Outbound,
|
||||
monero_wire::NetworkAddress::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
//conn.run().await;
|
||||
}
|
Loading…
Reference in a new issue