fix merge

This commit is contained in:
Boog900 2024-10-03 01:53:47 +01:00
parent a7553c20de
commit 69f9d84ae1
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
12 changed files with 147 additions and 54 deletions

48
Cargo.lock generated
View file

@ -1862,6 +1862,16 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.19" version = "0.2.19"
@ -1899,6 +1909,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]] [[package]]
name = "page_size" name = "page_size"
version = "0.6.0" version = "0.6.0"
@ -2514,6 +2530,15 @@ dependencies = [
"keccak", "keccak",
] ]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "shlex" name = "shlex"
version = "1.3.0" version = "1.3.0"
@ -2894,6 +2919,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
] ]
[[package]] [[package]]
@ -2902,7 +2939,12 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [ dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core", "tracing-core",
"tracing-log",
] ]
[[package]] [[package]]
@ -2985,6 +3027,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]] [[package]]
name = "version_check" name = "version_check"
version = "0.9.5" version = "0.9.5"

View file

@ -35,7 +35,7 @@ pub use free::{handle_incoming_block, IncomingBlockError};
pub async fn check_add_genesis( pub async fn check_add_genesis(
blockchain_read_handle: &mut BlockchainReadHandle, blockchain_read_handle: &mut BlockchainReadHandle,
blockchain_write_handle: &mut BlockchainWriteHandle, blockchain_write_handle: &mut BlockchainWriteHandle,
network: &Network, network: Network,
) { ) {
// Try to get the chain height, will fail if the genesis block is not in the DB. // Try to get the chain height, will fail if the genesis block is not in the DB.
if blockchain_read_handle if blockchain_read_handle

View file

@ -68,7 +68,12 @@ pub async fn handle_incoming_block(
return Ok(false); return Ok(false);
}; };
if !BLOCKS_BEING_HANDLED.get_or_init(|| Mutex::new(HashSet::new())).lock().unwrap().insert(block_hash) { if !BLOCKS_BEING_HANDLED
.get_or_init(|| Mutex::new(HashSet::new()))
.lock()
.unwrap()
.insert(block_hash)
{
return Ok(false); return Ok(false);
} }
@ -83,12 +88,17 @@ pub async fn handle_incoming_block(
.await .await
.expect("TODO: don't actually panic here"); .expect("TODO: don't actually panic here");
let res =response_rx let res = response_rx
.await .await
.unwrap() .unwrap()
.map_err(IncomingBlockError::InvalidBlock); .map_err(IncomingBlockError::InvalidBlock);
BLOCKS_BEING_HANDLED.get().unwrap().lock().unwrap().remove(&block_hash); BLOCKS_BEING_HANDLED
.get()
.unwrap()
.lock()
.unwrap()
.remove(&block_hash);
res res
} }

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use futures::StreamExt; use futures::StreamExt;
use tokio::time::interval;
use tokio::{ use tokio::{
sync::{mpsc, Notify}, sync::{mpsc, Notify},
time::sleep, time::sleep,
@ -17,6 +18,8 @@ use cuprate_p2p::{
}; };
use cuprate_p2p_core::ClearNet; use cuprate_p2p_core::ClearNet;
const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30);
/// An error returned from the [`syncer`]. /// An error returned from the [`syncer`].
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum SyncerError { pub enum SyncerError {
@ -50,6 +53,8 @@ where
{ {
tracing::info!("Starting blockchain syncer"); tracing::info!("Starting blockchain syncer");
let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY);
let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc
.ready() .ready()
.await? .await?
@ -59,26 +64,21 @@ where
panic!("Blockchain context service returned wrong response!"); panic!("Blockchain context service returned wrong response!");
}; };
let mut peer_sync_watch = clearnet_interface.top_sync_stream(); let client_pool = clearnet_interface.client_pool();
tracing::debug!("Waiting for new sync info in top sync channel"); tracing::debug!("Waiting for new sync info in top sync channel");
while let Some(top_sync_info) = peer_sync_watch.next().await { loop {
tracing::info!( check_sync_interval.tick().await;
"New sync info seen, top height: {}, top block hash: {}",
top_sync_info.chain_height,
hex::encode(top_sync_info.top_hash)
);
// The new info could be from a peer giving us a block, so wait a couple seconds to allow the block to tracing::trace!("Checking connected peers to see if we are behind",);
// be added to our blockchain.
sleep(Duration::from_secs(2)).await;
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
if top_sync_info.cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty { if !client_pool.contains_client_with_more_cumulative_difficulty(
tracing::debug!("New peer sync info is not ahead, nothing to do."); raw_blockchain_context.cumulative_difficulty,
) {
continue; continue;
} }
@ -103,8 +103,6 @@ where
} }
} }
} }
Ok(())
} }
async fn check_update_blockchain_context<C>( async fn check_update_blockchain_context<C>(

View file

@ -16,6 +16,7 @@ mod config;
mod constants; mod constants;
mod p2p; mod p2p;
mod rpc; mod rpc;
mod signals;
mod statics; mod statics;
mod txpool; mod txpool;

View file

@ -1,5 +1,5 @@
use bytes::Bytes; use bytes::Bytes;
use cuprate_p2p_core::{ProtocolRequest, ProtocolResponse}; use cuprate_p2p_core::{NetworkZone, ProtocolRequest, ProtocolResponse};
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::FutureExt; use futures::FutureExt;
use monero_serai::block::Block; use monero_serai::block::Block;
@ -16,7 +16,8 @@ use cuprate_fixed_bytes::ByteArray;
use cuprate_helper::asynch::rayon_spawn_async; use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_helper::cast::usize_to_u64; use cuprate_helper::cast::usize_to_u64;
use cuprate_helper::map::split_u128_into_low_high_bits; use cuprate_helper::map::split_u128_into_low_high_bits;
use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN}; use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
use cuprate_p2p_core::client::PeerInformation;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::BlockCompleteEntry; use cuprate_types::BlockCompleteEntry;
use cuprate_wire::protocol::{ use cuprate_wire::protocol::{
@ -25,11 +26,41 @@ use cuprate_wire::protocol::{
}; };
#[derive(Clone)] #[derive(Clone)]
pub struct P2pProtocolRequestHandler { pub struct P2pProtocolRequestHandlerMaker {
pub(crate) blockchain_read_handle: BlockchainReadHandle, pub blockchain_read_handle: BlockchainReadHandle,
} }
impl Service<ProtocolRequest> for P2pProtocolRequestHandler { impl<N: NetworkZone> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
type Response = P2pProtocolRequestHandler<N>;
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, peer_information: PeerInformation<N>) -> Self::Future {
// TODO: check peer info.
let blockchain_read_handle = self.blockchain_read_handle.clone();
async {
Ok(P2pProtocolRequestHandler {
peer_information,
blockchain_read_handle,
})
}
.boxed()
}
}
#[derive(Clone)]
pub struct P2pProtocolRequestHandler<N: NetworkZone> {
peer_information: PeerInformation<N>,
blockchain_read_handle: BlockchainReadHandle,
}
impl<Z: NetworkZone> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
type Response = ProtocolResponse; type Response = ProtocolResponse;
type Error = tower::BoxError; type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
@ -38,22 +69,8 @@ impl Service<ProtocolRequest> for P2pProtocolRequestHandler {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, req: ProtocolRequest) -> Self::Future { fn call(&mut self, _: ProtocolRequest) -> Self::Future {
match req { async { Ok(ProtocolResponse::NA) }.boxed()
ProtocolRequest::GetObjects(req) => {
get_objects(self.blockchain_read_handle.clone(), req).boxed()
}
ProtocolRequest::GetChain(req) => {
get_chain(self.blockchain_read_handle.clone(), req).boxed()
}
ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::NewFluffyBlock(block) => {
new_fluffy_block(self.blockchain_read_handle.clone(), block).boxed()
}
ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
}
} }
} }
@ -73,6 +90,9 @@ async fn get_objects(
// de-allocate the backing [`Bytes`] // de-allocate the backing [`Bytes`]
drop(req); drop(req);
return Ok(ProtocolResponse::NA);
/*
let res = blockchain_read_handle let res = blockchain_read_handle
.oneshot(BlockchainReadRequest::BlockCompleteEntries(block_ids)) .oneshot(BlockchainReadRequest::BlockCompleteEntries(block_ids))
.await?; .await?;
@ -91,6 +111,8 @@ async fn get_objects(
missed_ids: missed_ids.into(), missed_ids: missed_ids.into(),
current_blockchain_height: usize_to_u64(current_blockchain_height), current_blockchain_height: usize_to_u64(current_blockchain_height),
})) }))
*/
} }
async fn get_chain( async fn get_chain(
@ -100,7 +122,9 @@ async fn get_chain(
if req.block_ids.is_empty() { if req.block_ids.is_empty() {
Err("No block hashes sent in a `ChainRequest`")?; Err("No block hashes sent in a `ChainRequest`")?;
} }
return Ok(ProtocolResponse::NA);
/*
if req.block_ids.len() > MAX_BLOCKCHAIN_SUPPLEMENT_LEN { if req.block_ids.len() > MAX_BLOCKCHAIN_SUPPLEMENT_LEN {
Err("Too many block hashes in a `ChainRequest`")?; Err("Too many block hashes in a `ChainRequest`")?;
} }
@ -136,6 +160,8 @@ async fn get_chain(
m_block_weights: vec![], m_block_weights: vec![],
first_block: first_missing_block.map_or(Bytes::new(), Bytes::from), first_block: first_missing_block.map_or(Bytes::new(), Bytes::from),
})) }))
*/
} }
async fn new_fluffy_block( async fn new_fluffy_block(

View file

@ -37,6 +37,7 @@ pub use context::{
pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse}; pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse};
// re-export. // re-export.
pub use cuprate_consensus_rules::genesis::generate_genesis_block;
pub use cuprate_types::{ pub use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse}, blockchain::{BlockchainReadRequest, BlockchainResponse},
HardFork, HardFork,
@ -68,13 +69,10 @@ pub enum ExtendedConsensusError {
pub fn initialize_verifier<D, Ctx>( pub fn initialize_verifier<D, Ctx>(
database: D, database: D,
ctx_svc: Ctx, ctx_svc: Ctx,
) -> Result< ) -> (
(
BlockVerifierService<Ctx, TxVerifierService<D>, D>, BlockVerifierService<Ctx, TxVerifierService<D>, D>,
TxVerifierService<D>, TxVerifierService<D>,
), )
ConsensusError,
>
where where
D: Database + Clone + Send + Sync + 'static, D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static, D::Future: Send + 'static,
@ -90,7 +88,7 @@ where
{ {
let tx_svc = TxVerifierService::new(database.clone()); let tx_svc = TxVerifierService::new(database.clone());
let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database); let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database);
Ok((block_svc, tx_svc)) (block_svc, tx_svc)
} }
use __private::Database; use __private::Database;

View file

@ -71,6 +71,7 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val), ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val), ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val),
ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val), ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val),
ProtocolResponse::FluffyMissingTxs(val) => Self::FluffyMissingTransactionsRequest(val),
ProtocolResponse::NA => return Err(MessageConversionError), ProtocolResponse::NA => return Err(MessageConversionError),
}) })
} }

View file

@ -153,6 +153,19 @@ impl<N: NetworkZone> ClientPool<N> {
self.borrow_clients(&peers).collect() self.borrow_clients(&peers).collect()
} }
pub fn contains_client_with_more_cumulative_difficulty(
&self,
cumulative_difficulty: u128,
) -> bool {
self.clients
.iter()
.find(|element| {
let sync_data = element.value().info.core_sync_data.lock().unwrap();
sync_data.cumulative_difficulty() > cumulative_difficulty
})
.is_some()
}
} }
mod sealed { mod sealed {

View file

@ -23,7 +23,7 @@ pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24); pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
/// The durations of a long ban. /// The durations of a long ban.
pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); pub const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7);
/// The default amount of time between inbound diffusion flushes. /// The default amount of time between inbound diffusion flushes.
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5); pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);
@ -53,7 +53,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
/// The enforced maximum amount of blocks to request in a batch. /// The enforced maximum amount of blocks to request in a batch.
/// ///
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans. /// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100; pub const MAX_BLOCK_BATCH_LEN: usize = 100;
/// The timeout that the block downloader will use for requests. /// The timeout that the block downloader will use for requests.
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

View file

@ -174,9 +174,8 @@ impl<N: NetworkZone> NetworkInterface<N> {
self.address_book.clone() self.address_book.clone()
} }
/// Pulls a client from the client pool, returning it in a guard that will return it there when it's /// TODO
/// dropped. pub fn client_pool(&self) -> &Arc<client_pool::ClientPool<N>> {
pub fn borrow_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<ClientPoolDropGuard<N>> { &self.pool
self.pool.borrow_client(peer)
} }
} }

View file

@ -35,7 +35,6 @@ serde = { workspace = true, optional = true }
tower = { workspace = true } tower = { workspace = true }
thread_local = { workspace = true, optional = true } thread_local = { workspace = true, optional = true }
rayon = { workspace = true, optional = true } rayon = { workspace = true, optional = true }
bytes = "1.6.0"
[dev-dependencies] [dev-dependencies]
cuprate-helper = { path = "../../helper", features = ["thread", "cast"] } cuprate-helper = { path = "../../helper", features = ["thread", "cast"] }