Mirror of https://github.com/Cuprate/cuprate.git, synced 2025-01-08 20:09:44 +00:00
Commit edccf83481 (parent 6bfc4da4e4): add saving cache to disk.

11 changed files with 213 additions and 169 deletions
@@ -20,7 +20,9 @@ binaries = [
     "dep:serde_json",
     "dep:serde",
     "dep:monero-epee-bin-serde",
-    "dep:monero-wire"
+    "dep:monero-wire",
+    "dep:bincode",
+    "dep:dirs"
 ]

 [dependencies]
@@ -50,6 +52,8 @@ monero-epee-bin-serde = {git = "https://github.com/monero-rs/monero-epee-bin-ser
 serde_json = {version = "1", optional = true}
 serde = {version = "1", optional = true, features = ["derive"]}
 tracing-subscriber = {version = "0.3", optional = true}
+bincode = {version = "2.0.0-rc.3", optional = true}
+dirs = {version="5.0", optional = true}
 # here to help cargo to pick a version - remove me
 syn = "2.0.37"
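The two new optional dependencies carry the new persistence path: bincode handles the encoding and dirs resolves a per-user cache directory. A minimal sketch of how they combine; the helper name is hypothetical, main() below does the same thing inline:

    use std::path::PathBuf;

    // Resolve the on-disk location of the scanning cache. dirs::cache_dir()
    // returns e.g. ~/.cache on Linux and ~/Library/Caches on macOS.
    fn cache_file_path() -> PathBuf {
        let mut path = dirs::cache_dir().expect("platform has no cache directory");
        path.push("cuprate_rpc_scanning_cache.bin");
        path
    }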
@@ -5,6 +5,7 @@ use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
 use std::io::Read;
 use std::ops::Range;
+use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
 use std::time::Duration;

@@ -42,7 +43,7 @@ async fn call_batch<D: Database>(

 async fn scan_chain<D>(
     cache: Arc<RwLock<ScanningCache>>,
-    network: Network,
+    save_file: PathBuf,
     rpc_config: Arc<RwLock<RpcConfig>>,
     mut database: D,
 ) -> Result<(), tower::BoxError>
@@ -65,7 +66,7 @@ where
     let start_height = cache.read().unwrap().height;

     tracing::info!(
-        "Initialised verifier, begging scan from {} to {}",
+        "Initialised verifier, beginning scan from {} to {}",
         start_height,
         chain_height
     );
@@ -175,6 +176,7 @@ where
                 long_term_weight: verified_block_info.long_term_weight,
                 vote: verified_block_info.hf_vote,
                 generated_coins: verified_block_info.generated_coins,
+                cumulative_difficulty: verified_block_info.cumulative_difficulty,
             })
             .await?;

@@ -182,6 +184,11 @@ where
         current_height += 1;
         next_batch_start_height += 1;

+        if current_height % 500 == 0 {
+            tracing::info!("Saving cache to: {}", save_file.display());
+            cache.write().unwrap().save(&save_file)?;
+        }
+
     }

     time_to_verify_last_batch = time_to_verify_batch.elapsed().as_millis();
@@ -193,11 +200,14 @@ where
 #[tokio::main]
 async fn main() {
     tracing_subscriber::fmt()
-        .with_max_level(LevelFilter::INFO)
+        .with_max_level(LevelFilter::DEBUG)
         .init();

     let network = Network::Mainnet;

+    let mut file_for_cache = dirs::cache_dir().unwrap();
+    file_for_cache.push("cuprate_rpc_scanning_cache.bin");
+
     let urls = vec![
         "http://xmr-node.cakewallet.com:18081".to_string(),
         "http://node.sethforprivacy.com".to_string(),
@@ -230,26 +240,32 @@ async fn main() {
     );
     let rpc_config = Arc::new(RwLock::new(rpc_config));

-    let cache = Arc::new(RwLock::new(ScanningCache::default()));
-
-    let mut cache_write = cache.write().unwrap();
-
-    if cache_write.height == 0 {
-        let genesis = monero_consensus::genesis::generate_genesis_block(&network);
-
-        let total_outs = genesis
-            .miner_tx
-            .prefix
-            .outputs
-            .iter()
-            .map(|out| out.amount.unwrap_or(0))
-            .sum::<u64>();
-
-        cache_write.add_new_block_data(total_outs, &genesis.miner_tx, &[]);
-    }
-    drop(cache_write);
+    let cache = match ScanningCache::load(&file_for_cache) {
+        Ok(cache) => {
+            tracing::info!("Reloaded from cache, chain height: {}", cache.height);
+            Arc::new(RwLock::new(cache))
+        }
+        Err(_) => {
+            tracing::info!("Couldn't load from cache starting from scratch");
+            let mut cache = ScanningCache::default();
+            let genesis = monero_consensus::genesis::generate_genesis_block(&network);
+
+            let total_outs = genesis
+                .miner_tx
+                .prefix
+                .outputs
+                .iter()
+                .map(|out| out.amount.unwrap_or(0))
+                .sum::<u64>();
+
+            cache.add_new_block_data(total_outs, &genesis.miner_tx, &[]);
+            Arc::new(RwLock::new(cache))
+        }
+    };

     let rpc = init_rpc_load_balancer(urls, cache.clone(), rpc_config.clone());

-    scan_chain(cache, network, rpc_config, rpc).await.unwrap();
+    scan_chain(cache, file_for_cache, rpc_config, rpc)
+        .await
+        .unwrap();
 }
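scan_chain now checkpoints the cache on a fixed cadence instead of holding it only in memory. A stripped-down sketch of that pattern; Cache here is a hypothetical stand-in for ScanningCache:

    use std::error::Error;
    use std::path::Path;

    // Hypothetical stand-in for ScanningCache.
    struct Cache {
        height: u64,
    }

    impl Cache {
        fn save(&self, _file: &Path) -> Result<(), Box<dyn Error>> {
            Ok(()) // the real save() bincode-encodes self into the file
        }
    }

    // Mirrors the `current_height % 500 == 0` check in scan_chain: saving on a
    // fixed cadence bounds what a crash can lose to at most 500 blocks of work.
    fn maybe_checkpoint(cache: &Cache, save_file: &Path) -> Result<(), Box<dyn Error>> {
        if cache.height % 500 == 0 {
            cache.save(save_file)?;
        }
        Ok(())
    }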
@@ -15,6 +15,7 @@ use crate::{
     ConsensusError, HardFork,
 };

+//mod checks;
 mod hash_worker;
 mod miner_tx;

@@ -29,6 +30,7 @@ pub struct VerifiedBlockInformation {
     pub generated_coins: u64,
     pub weight: usize,
     pub long_term_weight: usize,
+    pub cumulative_difficulty: u128,
 }

 pub enum VerifyBlockRequest {
@@ -117,13 +119,13 @@ where
     C::Future: Send + 'static,
     Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
 {
-    tracing::info!("getting blockchain context");
+    tracing::debug!("getting blockchain context");
     let context = context_svc
         .oneshot(BlockChainContextRequest)
         .await
         .map_err(Into::<ConsensusError>::into)?;

-    tracing::info!("got blockchain context: {:?}", context);
+    tracing::debug!("got blockchain context: {:?}", context);

     let txs = if !txs.is_empty() {
         let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier_svc
@@ -174,7 +176,8 @@ where
         generated_coins,
         weight: block_weight,
         height: context.chain_height,
-        long_term_weight: 0,
+        long_term_weight: context.next_block_long_term_weight(block_weight),
         hf_vote: HardFork::V1,
+        cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
     })
 }
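The new cumulative_difficulty field is a plain running sum: the verifier extends the context's total with the difficulty the new block had to meet. In isolation:

    // Cumulative difficulty at height h is the sum of every block difficulty up
    // to and including h, so the verifier can extend it with one addition.
    // u128 (the field's type) comfortably holds mainnet's running total.
    fn extend_cumulative_difficulty(parent_cumulative: u128, next_difficulty: u128) -> u128 {
        parent_cumulative + next_difficulty
    }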
@@ -1,20 +1,13 @@
 use std::sync::{Arc, OnceLock};

-use crypto_bigint::U256;
+use crypto_bigint::{CheckedMul, U256};
 use futures::stream::{FuturesOrdered, StreamExt};
 use monero_serai::{
     block::Block,
     transaction::{Timelock, Transaction},
 };

-use crate::{
-    helper::current_time, transactions::TransactionVerificationData, ConsensusError, Database,
-    HardFork,
-};
-
-mod checks;
-mod hash_worker;
-pub mod reward;
+use crate::{helper::current_time, ConsensusError, Database, HardFork};

 const BLOCK_SIZE_SANITY_LEEWAY: usize = 100;
 const BLOCK_FUTURE_TIME_LIMIT: u64 = 60 * 60 * 2;
@@ -27,8 +27,6 @@ pub fn calculate_block_reward(
     already_generated_coins: u64,
     hf: &HardFork,
 ) -> u64 {
-    tracing::info!("bw: {} median: {}", block_weight, median_bw);
-
     let base_reward: u128 = calculate_base_reward(already_generated_coins, hf).into();

     if block_weight <= median_bw {
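For context (not introduced by this diff): when block_weight exceeds median_bw, Monero reduces the base reward quadratically in the excess, and a block heavier than twice the median earns nothing. A floating-point sketch of that rule; the real implementation works in integer arithmetic on u128:

    // Hedged illustration of the penalty curve behind the early return above:
    //     reward = base_reward * (1 - ((weight - median) / median)^2)
    fn penalised_reward(base_reward: u64, block_weight: usize, median_bw: usize) -> u64 {
        if block_weight <= median_bw {
            return base_reward; // inside the penalty free zone
        }
        if block_weight > 2 * median_bw {
            return 0; // such a block is over the weight limit anyway
        }
        let excess = (block_weight - median_bw) as f64 / median_bw as f64;
        (base_reward as f64 * (1.0 - excess * excess)) as u64
    }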
@@ -7,6 +7,7 @@

 use std::{
     future::Future,
+    ops::{Deref, DerefMut},
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
@@ -14,8 +15,7 @@ use std::{

 use futures::FutureExt;
 use tokio::sync::RwLock;
-use tower::buffer::future::ResponseFuture;
-use tower::{buffer::Buffer, Service, ServiceExt};
+use tower::{Service, ServiceExt};

 use crate::{ConsensusError, Database, DatabaseRequest, DatabaseResponse};

@@ -27,7 +27,7 @@ pub use difficulty::DifficultyCacheConfig;
 pub use hardforks::{HardFork, HardForkConfig};
 pub use weight::BlockWeightsCacheConfig;

-const BUFFER_CONTEXT_CHANNEL_SIZE: usize = 5;
+const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: usize = 60;

 pub struct ContextConfig {
     hard_fork_cfg: HardForkConfig,
@@ -111,19 +111,22 @@ where
     });

     let context_svc = BlockChainContextService {
-        difficulty_cache: Arc::new(difficulty_cache_handle.await.unwrap()?.into()),
-        weight_cache: Arc::new(weight_cache_handle.await.unwrap()?.into()),
-        hardfork_state: Arc::new(hardfork_state_handle.await.unwrap()?.into()),
-        chain_height: Arc::new(chain_height.into()),
-        already_generated_coins: Arc::new(already_generated_coins.into()),
-        top_block_hash: Arc::new(top_block_hash.into()),
+        internal_blockchain_context: Arc::new(
+            InternalBlockChainContext {
+                difficulty_cache: difficulty_cache_handle.await.unwrap()?,
+                weight_cache: weight_cache_handle.await.unwrap()?,
+                hardfork_state: hardfork_state_handle.await.unwrap()?,
+                chain_height,
+                already_generated_coins,
+                top_block_hash,
+            }
+            .into(),
+        ),
     };

     let context_svc_update = context_svc.clone();

-    let buffered_svc = Buffer::new(context_svc.boxed(), BUFFER_CONTEXT_CHANNEL_SIZE);
-
-    Ok((buffered_svc.clone(), context_svc_update))
+    Ok((context_svc_update.clone(), context_svc_update))
 }

 #[derive(Debug, Clone, Copy)]
@@ -141,32 +144,58 @@ pub struct BlockChainContext {
     /// The amount of coins minted already.
     pub already_generated_coins: u64,
     /// Timestamp to use to check time locked outputs.
-    time_lock_timestamp: u64,
+    pub time_lock_timestamp: u64,
+    /// The median timestamp over the last [`BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW`] blocks, will be None if there aren't
+    /// [`BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW`] blocks.
+    pub median_block_timestamp: Option<u64>,
     /// The height of the chain.
     pub chain_height: u64,
     /// The top blocks hash
-    top_hash: [u8; 32],
+    pub top_hash: [u8; 32],
     /// The current hard fork.
     pub current_hard_fork: HardFork,
 }

+impl BlockChainContext {
+    pub fn block_blob_size_limit(&self) -> usize {
+        self.effective_median_weight * 2 - 600
+    }
+
+    pub fn block_weight_limit(&self) -> usize {
+        self.median_weight_for_block_reward * 2
+    }
+
+    pub fn next_block_long_term_weight(&self, block_weight: usize) -> usize {
+        weight::calculate_block_long_term_weight(
+            &self.current_hard_fork,
+            block_weight,
+            self.median_long_term_weight,
+        )
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct BlockChainContextRequest;

-#[derive(Clone)]
-pub struct BlockChainContextService {
-    difficulty_cache: Arc<RwLock<difficulty::DifficultyCache>>,
-    weight_cache: Arc<RwLock<weight::BlockWeightsCache>>,
-    hardfork_state: Arc<RwLock<hardforks::HardForkState>>,
+struct InternalBlockChainContext {
+    difficulty_cache: difficulty::DifficultyCache,
+    weight_cache: weight::BlockWeightsCache,
+    hardfork_state: hardforks::HardForkState,

-    chain_height: Arc<RwLock<u64>>,
-    top_block_hash: Arc<RwLock<[u8; 32]>>,
-    already_generated_coins: Arc<RwLock<u64>>,
+    chain_height: u64,
+    top_block_hash: [u8; 32],
+    already_generated_coins: u64,
 }

+#[derive(Clone)]
+pub struct BlockChainContextService {
+    internal_blockchain_context: Arc<RwLock<InternalBlockChainContext>>,
+}
+
 impl Service<BlockChainContextRequest> for BlockChainContextService {
     type Response = BlockChainContext;
-    type Error = ConsensusError;
+    type Error = tower::BoxError;
     type Future =
         Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

@@ -175,18 +204,19 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
     }

     fn call(&mut self, _: BlockChainContextRequest) -> Self::Future {
-        let hardfork_state = self.hardfork_state.clone();
-        let difficulty_cache = self.difficulty_cache.clone();
-        let weight_cache = self.weight_cache.clone();
-
-        let chain_height = self.chain_height.clone();
-        let top_hash = self.top_block_hash.clone();
-        let already_generated_coins = self.already_generated_coins.clone();
+        let internal_blockchain_context = self.internal_blockchain_context.clone();

         async move {
-            let hardfork_state = hardfork_state.read().await;
-            let difficulty_cache = difficulty_cache.read().await;
-            let weight_cache = weight_cache.read().await;
+            let internal_blockchain_context_lock = internal_blockchain_context.read().await;
+
+            let InternalBlockChainContext {
+                difficulty_cache,
+                weight_cache,
+                hardfork_state,
+                chain_height,
+                top_block_hash,
+                already_generated_coins,
+            } = internal_blockchain_context_lock.deref();

             let current_hf = hardfork_state.current_hardfork();

@@ -196,10 +226,12 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
                 effective_median_weight: weight_cache.effective_median_block_weight(&current_hf),
                 median_long_term_weight: weight_cache.median_long_term_weight(),
                 median_weight_for_block_reward: weight_cache.median_for_block_reward(&current_hf),
-                already_generated_coins: *already_generated_coins.read().await,
+                already_generated_coins: *already_generated_coins,
                 time_lock_timestamp: 0, //TODO:
-                chain_height: *chain_height.read().await,
-                top_hash: *top_hash.read().await,
+                median_block_timestamp: difficulty_cache
+                    .median_timestamp(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW),
+                chain_height: *chain_height,
+                top_hash: *top_block_hash,
                 current_hard_fork: current_hf,
             })
         }
@@ -207,6 +239,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
     }
 }

+// TODO: join these services, there is no need for 2.
 pub struct UpdateBlockchainCacheRequest {
     pub new_top_hash: [u8; 32],
     pub height: u64,
@@ -229,36 +262,32 @@ impl tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService {
     }

     fn call(&mut self, new: UpdateBlockchainCacheRequest) -> Self::Future {
-        let hardfork_state = self.hardfork_state.clone();
-        let difficulty_cache = self.difficulty_cache.clone();
-        let weight_cache = self.weight_cache.clone();
-
-        let chain_height = self.chain_height.clone();
-        let top_hash = self.top_block_hash.clone();
-        let already_generated_coins = self.already_generated_coins.clone();
+        let internal_blockchain_context = self.internal_blockchain_context.clone();

         async move {
+            let mut internal_blockchain_context_lock = internal_blockchain_context.write().await;
+
+            let InternalBlockChainContext {
+                difficulty_cache,
+                weight_cache,
+                hardfork_state,
+                chain_height,
+                top_block_hash,
+                already_generated_coins,
+            } = internal_blockchain_context_lock.deref_mut();
+
             difficulty_cache
-                .write()
-                .await
                 .new_block(new.height, new.timestamp, new.cumulative_difficulty)
                 .await?;

             weight_cache
-                .write()
-                .await
                 .new_block(new.height, new.weight, new.long_term_weight)
                 .await?;

-            hardfork_state
-                .write()
-                .await
-                .new_block(new.vote, new.height)
-                .await?;
+            hardfork_state.new_block(new.vote, new.height).await?;

-            *chain_height.write().await = new.height + 1;
-            *top_hash.write().await = new.new_top_hash;
-            let mut already_generated_coins = already_generated_coins.write().await;
+            *chain_height = new.height + 1;
+            *top_block_hash = new.new_top_hash;
             *already_generated_coins = already_generated_coins.saturating_add(new.generated_coins);

             Ok(())
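The refactor replaces six separately locked fields with one InternalBlockChainContext behind a single RwLock, so building a response takes one lock acquisition and always sees a consistent snapshot. A stripped-down sketch of the pattern, using an illustrative two-field subset:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Illustrative subset of InternalBlockChainContext.
    struct Internal {
        chain_height: u64,
        already_generated_coins: u64,
    }

    #[derive(Clone)]
    struct ContextService {
        internal: Arc<RwLock<Internal>>,
    }

    impl ContextService {
        // One .read().await instead of one per field; both values are
        // guaranteed to describe the same point in the chain.
        async fn snapshot(&self) -> (u64, u64) {
            let guard = self.internal.read().await;
            (guard.chain_height, guard.already_generated_coins)
        }
    }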
@@ -168,15 +168,15 @@ impl DifficultyCache {

     /// Returns the median timestamp over the last `numb_blocks`.
     ///
-    /// Will panic if `numb_blocks` is larger than amount of blocks in the cache.
-    pub fn median_timestamp(&self, numb_blocks: usize) -> u64 {
-        median(
+    /// Will return [`None`] if there aren't enough blocks.
+    pub fn median_timestamp(&self, numb_blocks: usize) -> Option<u64> {
+        Some(median(
             &self
                 .timestamps
-                .range(self.timestamps.len().checked_sub(numb_blocks).unwrap()..)
+                .range(self.timestamps.len().checked_sub(numb_blocks)?..)
                 .copied()
                 .collect::<Vec<_>>(),
-        )
+        ))
     }

     /// Returns the cumulative difficulty of the chain.
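The panic-to-Option change hinges on one operator: checked_sub yields None when the requested window is larger than the cache, and `?` propagates that instead of unwrap's panic. The same idea as a self-contained function; the real method works over the cache's timestamp VecDeque and the crate's own median helper:

    // Median of the last `numb_blocks` timestamps, or None if fewer exist.
    fn median_timestamp(timestamps: &[u64], numb_blocks: usize) -> Option<u64> {
        // None when the cache holds fewer than `numb_blocks` entries.
        let start = timestamps.len().checked_sub(numb_blocks)?;
        let mut window: Vec<u64> = timestamps[start..].to_vec();
        window.sort_unstable();
        window.get(window.len() / 2).copied()
    }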
@@ -13,6 +13,7 @@ use std::{
 };

 use monero_serai::{block::Block, transaction::Transaction};
+use rayon::prelude::*;
 use tower::ServiceExt;
 use tracing::instrument;

@@ -87,10 +88,8 @@ impl BlockWeightsCacheConfig {
 /// this data it reduces the load on the database.
 #[derive(Clone)]
 pub struct BlockWeightsCache {
-    /// This list is not sorted.
     short_term_block_weights: VecDeque<usize>,
-    /// This list is sorted.
-    long_term_weights: Vec<usize>,
+    long_term_weights: VecDeque<usize>,
     /// The height of the top block.
     tip_height: u64,

@@ -124,17 +123,12 @@ impl BlockWeightsCache {
     ) -> Result<Self, ConsensusError> {
         tracing::info!("Initializing weight cache this may take a while.");

-        let mut long_term_weights = get_long_term_weight_in_range(
+        let long_term_weights: VecDeque<usize> = get_long_term_weight_in_range(
             chain_height.saturating_sub(config.long_term_window)..chain_height,
             database.clone(),
         )
-        .await?;
-
-        long_term_weights.sort_unstable();
-        tracing::debug!(
-            "Sorted long term weights with length: {}",
-            long_term_weights.len()
-        );
+        .await?
+        .into();

         let short_term_block_weights: VecDeque<usize> = get_blocks_weight_in_range(
             chain_height.saturating_sub(config.short_term_window)..chain_height,
@@ -157,66 +151,41 @@ impl BlockWeightsCache {
     ///
     /// The block_height **MUST** be one more than the last height the cache has
     /// seen.
-    pub async fn new_block<D: Database>(
+    pub async fn new_block(
         &mut self,
         block_height: u64,
         block_weight: usize,
         long_term_weight: usize,
-        database: D,
     ) -> Result<(), ConsensusError> {
+        assert_eq!(self.tip_height + 1, block_height);
+        self.tip_height += 1;
         tracing::debug!(
             "Adding new block's {} weights to block cache, weight: {}, long term weight: {}",
-            block_weight,
+            self.tip_height,
+            block_weight,
             long_term_weight
         );
-        assert_eq!(self.tip_height + 1, block_height);
-        self.tip_height += 1;

-        match self.long_term_weights.binary_search(&long_term_weight) {
-            Ok(idx) | Err(idx) => self.long_term_weights.insert(idx, long_term_weight),
-        };
-
-        if let Some(height_to_remove) = block_height.checked_sub(self.config.long_term_window) {
-            tracing::debug!(
-                "Block {} is out of the long term weight window, removing it",
-                height_to_remove
-            );
-            let DatabaseResponse::BlockExtendedHeader(ext_header) = database
-                .oneshot(DatabaseRequest::BlockExtendedHeader(
-                    height_to_remove.into(),
-                ))
-                .await?
-            else {
-                panic!("Database sent incorrect response!");
-            };
-            let idx = self
-                .long_term_weights
-                .binary_search(&ext_header.long_term_weight)
-                .expect("Weight must be in list if in the window");
-            self.long_term_weights.remove(idx);
+        self.long_term_weights.push_back(long_term_weight);
+        if u64::try_from(self.long_term_weights.len()).unwrap() > self.config.long_term_window {
+            self.long_term_weights.pop_front();
         }

         self.short_term_block_weights.push_back(block_weight);
-        if self.short_term_block_weights.len() > self.config.short_term_window.try_into().unwrap() {
+        if u64::try_from(self.short_term_block_weights.len()).unwrap()
+            > self.config.short_term_window
+        {
             self.short_term_block_weights.pop_front();
         }

         Ok(())
     }

-    /// Returns the next blocks long term weight.
-    ///
-    /// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#calculating-a-blocks-long-term-weight
-    pub fn next_block_long_term_weight(&self, hf: &HardFork, block_weight: usize) -> usize {
-        calculate_block_long_term_weight(hf, block_weight, &self.long_term_weights)
-    }
-
     /// Returns the median long term weight over the last [`LONG_TERM_WINDOW`] blocks, or custom amount of blocks in the config.
     pub fn median_long_term_weight(&self) -> usize {
-        median(&self.long_term_weights)
+        let mut sorted_long_term_weights: Vec<usize> = self.long_term_weights.clone().into();
+        sorted_long_term_weights.par_sort_unstable();
+        median(&sorted_long_term_weights)
     }

     /// Returns the effective median weight, used for block reward calculations and to calculate
@@ -226,11 +195,16 @@ impl BlockWeightsCache {
     pub fn effective_median_block_weight(&self, hf: &HardFork) -> usize {
         let mut sorted_short_term_weights: Vec<usize> =
             self.short_term_block_weights.clone().into();
-        sorted_short_term_weights.sort_unstable();
+        sorted_short_term_weights.par_sort_unstable();
+
+        /// TODO: this sometimes takes a while (>5s)
+        let mut sorted_long_term_weights: Vec<usize> = self.long_term_weights.clone().into();
+        sorted_long_term_weights.par_sort_unstable();

         calculate_effective_median_block_weight(
             hf,
             &sorted_short_term_weights,
-            &self.long_term_weights,
+            &sorted_long_term_weights,
         )
     }
@@ -276,16 +250,16 @@ fn calculate_effective_median_block_weight(
     effective_median.max(penalty_free_zone(hf))
 }

-fn calculate_block_long_term_weight(
+pub fn calculate_block_long_term_weight(
     hf: &HardFork,
     block_weight: usize,
-    sorted_long_term_window: &[usize],
+    long_term_median: usize,
 ) -> usize {
     if hf.in_range(&HardFork::V1, &HardFork::V10) {
         return block_weight;
     }

-    let long_term_median = max(penalty_free_zone(hf), median(sorted_long_term_window));
+    let long_term_median = max(penalty_free_zone(hf), long_term_median);

     let (short_term_constraint, adjusted_block_weight) =
         if hf.in_range(&HardFork::V10, &HardFork::V15) {
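The long term weight list moves from a sorted Vec, maintained by binary-search insertion plus a database lookup to find each evicted value, to a plain VecDeque kept in insertion order: eviction becomes pop_front and the database round trip disappears, at the cost of sorting a copy whenever a median is requested. The window mechanics in isolation:

    use std::collections::VecDeque;

    // Fixed-size sliding window kept in insertion order, as the cache now does.
    struct Window {
        weights: VecDeque<usize>,
        max_len: usize,
    }

    impl Window {
        // O(1) per block: the evicted value is simply the oldest entry, so no
        // database lookup is needed to find what to remove.
        fn push(&mut self, weight: usize) {
            self.weights.push_back(weight);
            if self.weights.len() > self.max_len {
                self.weights.pop_front();
            }
        }

        // Pay the sort only when a median is requested; the real code uses
        // rayon's par_sort_unstable for this.
        fn median(&self) -> Option<usize> {
            let mut sorted: Vec<usize> = self.weights.iter().copied().collect();
            sorted.sort_unstable();
            sorted.get(sorted.len() / 2).copied()
        }
    }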
@@ -6,6 +6,7 @@ use std::ops::Range;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex, RwLock};
 use std::task::{Context, Poll};
+use std::time::Duration;

 use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
 use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
@@ -84,11 +85,12 @@ pub fn init_rpc_load_balancer(
     let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30);

     let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok));
-    let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 30);
-    //let rpcs = tower::retry::Retry::new(Attempts(2), rpc_buffer);
+    let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(120));
+    let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(timeout), 30);
+    let rpcs = tower::retry::Retry::new(Attempts(3), rpc_buffer);

     let discover = discover::RPCDiscover {
-        rpc: rpc_buffer.clone(),
+        rpc: rpcs.clone(),
         initial_list: addresses,
         ok_channel: rpc_discoverer_tx,
         already_connected: Default::default(),
@@ -97,10 +99,7 @@ pub fn init_rpc_load_balancer(

     tokio::spawn(discover.run());

-    RpcBalancer {
-        rpcs: rpc_buffer,
-        config,
-    }
+    RpcBalancer { rpcs, config }
 }

 #[derive(Clone)]
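The RPC stack is now Retry over Buffer over Timeout over the load balancer: a request to a stuck node is cut off after 120 seconds and re-dispatched, through the balancer, up to three times (Attempts is the crate's own retry policy). A conceptual equivalent written without tower, to show the semantics only:

    use std::future::Future;
    use std::time::Duration;
    use tokio::time::timeout;

    // Each attempt gets its own 120s budget; only after three failed attempts
    // does the caller see an error. `attempt` stands in for one balanced call.
    async fn call_with_retry<F, Fut, T>(mut attempt: F) -> Result<T, String>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, String>>,
    {
        let mut last_err = String::from("no attempts made");
        for _ in 0..3 {
            match timeout(Duration::from_secs(120), attempt()).await {
                Ok(Ok(value)) => return Ok(value),
                Ok(Err(e)) => last_err = e,
                Err(_) => last_err = String::from("attempt timed out"),
            }
        }
        Err(last_err)
    }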
@@ -1,33 +1,52 @@
+use std::io::Read;
+use std::path::Path;
 use std::{
     collections::HashMap,
     fmt::{Display, Formatter},
     sync::Arc,
 };

-use monero_serai::{
-    block::Block,
-    transaction::{Timelock, Transaction},
-};
+use bincode::{Decode, Encode};
+use monero_serai::transaction::{Timelock, Transaction};
+use tracing_subscriber::fmt::MakeWriter;

-use cuprate_common::Network;
 use crate::transactions::TransactionVerificationData;
+use cuprate_common::Network;

 /// A cache which can keep chain state while scanning.
 ///
 /// Because we are using a RPC interface with a node we need to keep track
 /// of certain data that the node doesn't hold or give us like the number
 /// of outputs at a certain time.
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Default, Clone, Encode, Decode)]
 pub struct ScanningCache {
-    network: Network,
+    // network: u8,
     numb_outs: HashMap<u64, u64>,
-    time_locked_out: HashMap<[u8; 32], Timelock>,
+    time_locked_out: HashMap<[u8; 32], u64>,
     pub already_generated_coins: u64,
     /// The height of the *next* block to scan.
     pub height: u64,
 }

 impl ScanningCache {
+    pub fn save(&self, file: &Path) -> Result<(), tower::BoxError> {
+        let file = std::fs::OpenOptions::new()
+            .write(true)
+            .truncate(true)
+            .create(true)
+            .open(file)?;
+        let mut writer = file.make_writer();
+        bincode::encode_into_std_write(self, &mut writer, bincode::config::standard())?;
+        Ok(())
+    }
+
+    pub fn load(file: &Path) -> Result<ScanningCache, tower::BoxError> {
+        let mut file = std::fs::OpenOptions::new().read(true).open(file)?;
+
+        bincode::decode_from_std_read(&mut file, bincode::config::standard()).map_err(Into::into)
+    }
+
     pub fn add_new_block_data(
         &mut self,
         generated_coins: u64,
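save and load are a straight bincode 2 round trip: derive Encode/Decode (hence the field changes above, since Network and Timelock come from other crates and presumably lack those impls), then stream through encode_into_std_write and decode_from_std_read with the standard configuration. A self-contained round trip using the same API; Demo is a hypothetical stand-in for ScanningCache:

    use bincode::{Decode, Encode};

    #[derive(Debug, PartialEq, Encode, Decode)]
    struct Demo {
        height: u64,
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let path = std::env::temp_dir().join("demo_cache.bin");

        // Encode straight into the file, as ScanningCache::save does.
        let original = Demo { height: 3_000_000 };
        let mut file = std::fs::File::create(&path)?;
        bincode::encode_into_std_write(&original, &mut file, bincode::config::standard())?;

        // Decode it back, as ScanningCache::load does.
        let mut file = std::fs::File::open(&path)?;
        let loaded: Demo = bincode::decode_from_std_read(&mut file, bincode::config::standard())?;
        assert_eq!(original, loaded);
        Ok(())
    }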
@@ -55,17 +74,26 @@ impl ScanningCache {
     }

     pub fn outputs_time_lock(&self, tx: &[u8; 32]) -> Timelock {
-        self.time_locked_out
-            .get(tx)
-            .copied()
-            .unwrap_or(Timelock::None)
+        let time_lock = self.time_locked_out.get(tx).copied().unwrap_or(0);
+        match time_lock {
+            0 => Timelock::None,
+            block if block < 500_000_000 => Timelock::Block(block as usize),
+            time => Timelock::Time(time),
+        }
     }

     pub fn add_tx_time_lock(&mut self, tx: [u8; 32], time_lock: Timelock) {
         match time_lock {
             Timelock::None => (),
             lock => {
-                self.time_locked_out.insert(tx, lock);
+                self.time_locked_out.insert(
+                    tx,
+                    match lock {
+                        Timelock::None => unreachable!(),
+                        Timelock::Block(x) => x as u64,
+                        Timelock::Time(x) => x,
+                    },
+                );
             }
         }
     }
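The u64 encoding reuses Monero's own unlock_time convention: 0 means unlocked, values below 500_000_000 are block heights, and anything else is a Unix timestamp. The diff's two conversions, extracted as standalone helpers:

    use monero_serai::transaction::Timelock;

    // Timelock -> u64, as add_tx_time_lock stores it.
    fn encode_lock(lock: Timelock) -> u64 {
        match lock {
            Timelock::None => 0,
            Timelock::Block(height) => height as u64, // < 500_000_000 by consensus
            Timelock::Time(time) => time,             // a Unix timestamp
        }
    }

    // u64 -> Timelock, as outputs_time_lock reads it back.
    fn decode_lock(raw: u64) -> Timelock {
        match raw {
            0 => Timelock::None,
            block if block < 500_000_000 => Timelock::Block(block as usize),
            time => Timelock::Time(time),
        }
    }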
@@ -19,7 +19,7 @@ mod inputs;
 pub(crate) mod outputs;
 mod ring;
 mod sigs;
-mod time_lock;
+//mod time_lock;

 #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
 pub enum TxVersion {
@@ -194,7 +194,7 @@ async fn verify_transactions_for_block<D>(
 where
     D: Database + Clone + Sync + Send + 'static,
 {
-    tracing::info!("Verifying transactions for block, amount: {}", txs.len());
+    tracing::debug!("Verifying transactions for block, amount: {}", txs.len());

     set_missing_ring_members(database, &txs, &hf).await?;
