consensus: fix batch handling when we don't have a full list of outputs. (#54)

* consensus: fix batch handling when we don't have a full list of outputs.

* change `scan_chain` to new API

* clippy

* add a test for calculating multiple difficulties

* fmt

* rx_seed -> rx_vms

* consensus-rules: ring members younger than current block.

* only create rx vms when required.

* fix rx initiation when syncing

* add single block verification (no batch)

* update serai
This commit is contained in:
Boog900 2024-02-13 00:51:11 +00:00 committed by GitHub
parent 630faed263
commit b7df133175
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1292 additions and 964 deletions

214
Cargo.lock generated
View file

@ -30,9 +30,9 @@ dependencies = [
[[package]]
name = "ahash"
version = "0.8.7"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01"
checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff"
dependencies = [
"cfg-if",
"once_cell",
@ -312,9 +312,9 @@ checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e"
[[package]]
name = "chrono"
version = "0.4.33"
version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b"
dependencies = [
"android-tzdata",
"iana-time-zone",
@ -420,9 +420,9 @@ dependencies = [
[[package]]
name = "crc32fast"
version = "1.3.2"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa"
dependencies = [
"cfg-if",
]
@ -592,7 +592,7 @@ dependencies = [
[[package]]
name = "dalek-ff-group"
version = "0.4.1"
source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930"
source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629"
dependencies = [
"crypto-bigint",
"curve25519-dalek",
@ -662,7 +662,7 @@ dependencies = [
[[package]]
name = "dleq"
version = "0.4.1"
source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930"
source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629"
dependencies = [
"digest",
"ff",
@ -676,9 +676,9 @@ dependencies = [
[[package]]
name = "either"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a"
[[package]]
name = "encoding_rs"
@ -795,7 +795,7 @@ dependencies = [
[[package]]
name = "flexible-transcript"
version = "0.3.2"
source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930"
source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629"
dependencies = [
"blake2",
"digest",
@ -967,8 +967,8 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap 2.2.2",
"http 0.2.11",
"indexmap 2.2.3",
"slab",
"tokio",
"tokio-util",
@ -1034,6 +1034,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
@ -1041,7 +1052,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http",
"http 0.2.11",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
dependencies = [
"bytes",
"http 1.0.0",
]
[[package]]
name = "http-body-util"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840"
dependencies = [
"bytes",
"futures-util",
"http 1.0.0",
"http-body 1.0.0",
"pin-project-lite",
]
@ -1068,8 +1102,8 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"http 0.2.11",
"http-body 0.4.6",
"httparse",
"httpdate",
"itoa",
@ -1082,18 +1116,39 @@ dependencies = [
]
[[package]]
name = "hyper-rustls"
version = "0.24.2"
name = "hyper"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.0.0",
"http-body 1.0.0",
"httparse",
"itoa",
"pin-project-lite",
"tokio",
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
dependencies = [
"futures-util",
"http",
"hyper",
"http 1.0.0",
"hyper 1.1.0",
"hyper-util",
"rustls",
"rustls-native-certs",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
]
[[package]]
@ -1103,12 +1158,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"hyper 0.14.28",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.0.0",
"http-body 1.0.0",
"hyper 1.1.0",
"pin-project-lite",
"socket2",
"tokio",
"tower",
"tower-service",
"tracing",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
@ -1154,9 +1229,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.2.2"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520"
checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177"
dependencies = [
"equivalent",
"hashbrown 0.14.3",
@ -1368,7 +1443,7 @@ dependencies = [
[[package]]
name = "monero-generators"
version = "0.4.0"
source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930"
source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629"
dependencies = [
"curve25519-dalek",
"dalek-ff-group",
@ -1410,7 +1485,7 @@ dependencies = [
[[package]]
name = "monero-serai"
version = "0.1.4-alpha"
source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930"
source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629"
dependencies = [
"async-lock",
"async-trait",
@ -1456,7 +1531,7 @@ dependencies = [
[[package]]
name = "multiexp"
version = "0.4.0"
source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930"
source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629"
dependencies = [
"ff",
"group",
@ -1964,9 +2039,9 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"http 0.2.11",
"http-body 0.4.6",
"hyper 0.14.28",
"hyper-tls",
"ipnet",
"js-sys",
@ -1976,7 +2051,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls-pemfile",
"rustls-pemfile 1.0.4",
"serde",
"serde_json",
"serde_urlencoded",
@ -2036,23 +2111,26 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.10"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
dependencies = [
"ring",
"rustls-pki-types",
"rustls-webpki",
"sct",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.3"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"rustls-pemfile 2.0.0",
"rustls-pki-types",
"schannel",
"security-framework",
]
@ -2067,12 +2145,29 @@ dependencies = [
]
[[package]]
name = "rustls-webpki"
version = "0.101.7"
name = "rustls-pemfile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"
dependencies = [
"base64",
"rustls-pki-types",
]
[[package]]
name = "rustls-pki-types"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf"
[[package]]
name = "rustls-webpki"
version = "0.102.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
@ -2115,16 +2210,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sct"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "sealed"
version = "0.5.0"
@ -2262,11 +2347,14 @@ dependencies = [
[[package]]
name = "simple-request"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930"
source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629"
dependencies = [
"hyper",
"http-body-util",
"hyper 1.1.0",
"hyper-rustls",
"hyper-util",
"tokio",
"tower-service",
]
[[package]]
@ -2303,7 +2391,7 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "std-shims"
version = "0.1.1"
source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930"
source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629"
dependencies = [
"hashbrown 0.14.3",
"spin",
@ -2413,18 +2501,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.56"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad"
checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.56"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471"
checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81"
dependencies = [
"proc-macro2",
"quote",
@ -2526,11 +2614,12 @@ dependencies = [
[[package]]
name = "tokio-rustls"
version = "0.24.1"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
dependencies = [
"rustls",
"rustls-pki-types",
"tokio",
]
@ -2572,7 +2661,7 @@ version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
dependencies = [
"indexmap 2.2.2",
"indexmap 2.2.3",
"toml_datetime",
"winnow",
]
@ -2615,6 +2704,7 @@ version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@ -3021,9 +3111,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
[[package]]
name = "winnow"
version = "0.5.39"
version = "0.5.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5389a154b01683d28c77f8f68f49dea75f0a4da32557a58f68ee51ebba472d29"
checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876"
dependencies = [
"memchr",
]

View file

@ -43,13 +43,13 @@ chrono = { version = "0.4.31", default-features = false }
crypto-bigint = { version = "0.5.5", default-features = false }
crossbeam = { version = "0.8.4", default-features = false }
curve25519-dalek = { version = "4.1.1", default-features = false }
dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "f3429ec1ef386da1458f4ed244402f38e3e12930", default-features = false }
dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false }
dirs = { version = "5.0.1", default-features = false }
futures = { version = "0.3.29", default-features = false }
hex = { version = "0.4.3", default-features = false }
hex-literal = { version = "0.4", default-features = false }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "f3429ec1ef386da1458f4ed244402f38e3e12930", default-features = false }
multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "f3429ec1ef386da1458f4ed244402f38e3e12930", default-features = false }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false }
multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false }
pin-project = { version = "1.1.3", default-features = false }
randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false }
rand = { version = "0.8.5", default-features = false }

View file

@ -17,7 +17,6 @@ binaries = [
"tower/buffer",
"tower/timeout",
"monero-serai/http-rpc",
"dep:curve25519-dalek",
"dep:tracing-subscriber",
"dep:serde_json",
"dep:serde",
@ -41,6 +40,7 @@ randomx-rs = { workspace = true }
monero-serai = { workspace = true, features = ["std"] }
multiexp = { workspace = true }
dalek-ff-group = { workspace = true }
curve25519-dalek = { workspace = true }
rayon = { workspace = true }
thread_local = { workspace = true }
@ -58,7 +58,6 @@ tracing-subscriber = {version = "0.3", optional = true}
borsh = { workspace = true, optional = true}
dirs = {version="5.0", optional = true}
clap = { version = "4.4.8", optional = true, features = ["derive"] }
curve25519-dalek = { workspace = true, optional = true }
# here to help cargo to pick a version - remove me
syn = "2.0.37"

View file

@ -19,8 +19,8 @@ pub const PENALTY_FREE_ZONE_1: usize = 20000;
pub const PENALTY_FREE_ZONE_2: usize = 60000;
pub const PENALTY_FREE_ZONE_5: usize = 300000;
const RX_SEEDHASH_EPOCH_BLOCKS: u64 = 2048;
const RX_SEEDHASH_EPOCH_LAG: u64 = 64;
pub const RX_SEEDHASH_EPOCH_BLOCKS: u64 = 2048;
pub const RX_SEEDHASH_EPOCH_LAG: u64 = 64;
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum BlockError {

View file

@ -62,8 +62,8 @@ pub enum TransactionError {
InputsOverflow,
#[error("The transaction has no inputs.")]
NoInputs,
#[error("Ring member not in database")]
RingMemberNotFound,
#[error("Ring member not in database or is not valid.")]
RingMemberNotFoundOrInvalid,
//-------------------------------------------------------- Ring Signatures
#[error("Ring signature incorrect.")]
RingSignatureIncorrect,
@ -302,6 +302,7 @@ fn check_all_time_locks(
current_time_lock_timestamp,
hf,
) {
tracing::debug!("Transaction invalid: one or more inputs locked, lock: {time_lock:?}.");
Err(TransactionError::OneOrMoreDecoysLocked)
} else {
Ok(())
@ -451,6 +452,9 @@ fn check_10_block_lock(
) -> Result<(), TransactionError> {
if hf >= &HardFork::V12 {
if youngest_used_out_height + 10 > current_chain_height {
tracing::debug!(
"Transaction invalid: One or more ring members younger than 10 blocks."
);
Err(TransactionError::OneOrMoreDecoysLocked)
} else {
Ok(())
@ -517,6 +521,14 @@ fn check_inputs_contextual(
hf: &HardFork,
spent_kis: Arc<std::sync::Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), TransactionError> {
// This rule is not contained in monero-core explicitly, but it is enforced by how Monero picks ring members.
// When picking ring members monerod will only look in the DB at past blocks so an output has to be younger
// than this transaction to be used in this tx.
if tx_ring_members_info.youngest_used_out_height >= current_chain_height {
tracing::debug!("Transaction invalid: One or more ring members too young.");
Err(TransactionError::OneOrMoreDecoysLocked)?;
}
check_10_block_lock(
tx_ring_members_info.youngest_used_out_height,
current_chain_height,

View file

@ -13,8 +13,8 @@ use crate::{transactions::TransactionError, HardFork, TxVersion};
pub struct OutputOnChain {
pub height: u64,
pub time_lock: Timelock,
pub key: EdwardsPoint,
pub mask: EdwardsPoint,
pub key: Option<EdwardsPoint>,
pub commitment: EdwardsPoint,
}
/// Gets the absolute offsets from the relative offsets.
@ -68,7 +68,7 @@ pub fn insert_ring_member_ids(
///
/// Will error if `outputs` does not contain the outputs needed.
pub fn get_ring_members_for_inputs<'a>(
outputs: &'a HashMap<u64, HashMap<u64, OutputOnChain>>,
get_outputs: impl Fn(u64, u64) -> Option<&'a OutputOnChain>,
inputs: &[Input],
) -> Result<Vec<Vec<&'a OutputOnChain>>, TransactionError> {
inputs
@ -83,12 +83,8 @@ pub fn get_ring_members_for_inputs<'a>(
Ok(offsets
.iter()
.map(|offset| {
// get the hashmap for this amount.
outputs
.get(&amount.unwrap_or(0))
// get output at the index from the amount hashmap.
.and_then(|amount_map| amount_map.get(offset))
.ok_or(TransactionError::RingMemberNotFound)
get_outputs(amount.unwrap_or(0), *offset)
.ok_or(TransactionError::RingMemberNotFoundOrInvalid)
})
.collect::<Result<_, TransactionError>>()?)
}
@ -108,13 +104,21 @@ pub enum Rings {
impl Rings {
/// Builds the rings for the transaction inputs, from the given outputs.
fn new(outputs: Vec<Vec<&OutputOnChain>>, tx_version: TxVersion) -> Rings {
match tx_version {
fn new(
outputs: Vec<Vec<&OutputOnChain>>,
tx_version: TxVersion,
) -> Result<Rings, TransactionError> {
Ok(match tx_version {
TxVersion::RingSignatures => Rings::Legacy(
outputs
.into_iter()
.map(|inp_outs| inp_outs.into_iter().map(|out| out.key).collect())
.collect(),
.map(|inp_outs| {
inp_outs
.into_iter()
.map(|out| out.key.ok_or(TransactionError::RingMemberNotFoundOrInvalid))
.collect::<Result<Vec<_>, TransactionError>>()
})
.collect::<Result<Vec<_>, TransactionError>>()?,
),
TxVersion::RingCT => Rings::RingCT(
outputs
@ -122,12 +126,18 @@ impl Rings {
.map(|inp_outs| {
inp_outs
.into_iter()
.map(|out| [out.key, out.mask])
.collect()
.map(|out| {
Ok([
out.key
.ok_or(TransactionError::RingMemberNotFoundOrInvalid)?,
out.commitment,
])
})
.collect(),
.collect::<Result<_, TransactionError>>()
})
.collect::<Result<_, _>>()?,
),
}
})
}
}
@ -151,8 +161,8 @@ impl TxRingMembersInfo {
decoy_info: Option<DecoyInfo>,
tx_version: TxVersion,
hf: HardFork,
) -> TxRingMembersInfo {
TxRingMembersInfo {
) -> Result<TxRingMembersInfo, TransactionError> {
Ok(TxRingMembersInfo {
youngest_used_out_height: used_outs
.iter()
.map(|inp_outs| {
@ -178,9 +188,9 @@ impl TxRingMembersInfo {
})
.collect(),
hf,
rings: Rings::new(used_outs, tx_version),
rings: Rings::new(used_outs, tx_version)?,
decoy_info,
}
})
}
}
@ -213,7 +223,7 @@ impl DecoyInfo {
///
/// So:
///
/// amount_outs_on_chain(inputs[X]) == outputs_with_amount[X]
/// amount_outs_on_chain(inputs`[X]`) == outputs_with_amount`[X]`
///
/// Do not rely on this function to do consensus checks!
///

View file

@ -1,43 +1,27 @@
mod tx_pool;
#[cfg(feature = "binaries")]
mod bin {
use std::{
collections::{HashMap, HashSet},
ops::Range,
path::PathBuf,
sync::Arc,
};
use std::{ops::Range, path::PathBuf, sync::Arc};
use clap::Parser;
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
use futures::{channel::mpsc, SinkExt, StreamExt};
use monero_serai::{block::Block, transaction::Transaction};
use tokio::sync::RwLock;
use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
use cuprate_helper::{asynch::rayon_spawn_async, network::Network};
use cuprate_helper::network::Network;
use cuprate_consensus::{
block::PrePreparedBlockExPOW,
context::{
BlockChainContextRequest, BlockChainContextResponse, ContextConfig,
UpdateBlockchainCacheData,
},
initialize_blockchain_context, initialize_verifier,
randomx::RandomXVM,
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
Database, DatabaseRequest, DatabaseResponse, PrePreparedBlock, VerifiedBlockInformation,
VerifyBlockRequest, VerifyBlockResponse,
Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, VerifyBlockRequest,
VerifyBlockResponse,
};
use monero_consensus::{blocks::randomx_seed_height, HardFork};
use super::tx_pool;
const MAX_BLOCKS_IN_RANGE: u64 = 500;
const BATCHES_IN_REQUEST: u64 = 3;
const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000;
@ -92,8 +76,7 @@ mod bin {
}
async fn call_blocks<D>(
mut new_tx_chan: tx_pool::NewTxChanSen,
mut block_chan: mpsc::Sender<Vec<Block>>,
mut block_chan: mpsc::Sender<Vec<(Block, Vec<Transaction>)>>,
start_height: u64,
chain_height: u64,
database: D,
@ -133,37 +116,6 @@ mod bin {
chain_height
);
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
let hf = |block: &Block| HardFork::from_version(block.header.major_version).unwrap();
let txs_hf = if blocks.first().map(hf) == blocks.last().map(hf) {
vec![(
txs.into_iter().flatten().collect::<Vec<_>>(),
blocks.first().map(hf).unwrap(),
)]
} else {
let mut txs_hfs: Vec<(Vec<Transaction>, HardFork)> = Vec::new();
let mut last_hf = blocks.first().map(hf).unwrap();
txs_hfs.push((vec![], last_hf));
for (mut txs, current_hf) in txs.into_iter().zip(blocks.iter().map(hf)) {
if current_hf == last_hf {
assert_eq!(txs_hfs.last_mut().unwrap().1, current_hf);
txs_hfs.last_mut().unwrap().0.append(&mut txs);
} else {
txs_hfs.push((txs, current_hf));
last_hf = current_hf;
}
}
txs_hfs
};
let (tx, rx) = oneshot::channel();
new_tx_chan.send((txs_hf, tx)).await?;
rx.await.unwrap().unwrap();
block_chan.send(blocks).await?;
}
@ -196,123 +148,37 @@ mod bin {
let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?;
let (tx, rx) = oneshot::channel();
let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?;
let (mut block_verifier, transaction_verifier) =
initialize_verifier(database.clone(), tx_pool_svc, ctx_svc.clone()).await?;
tx.send(transaction_verifier).map_err(|_| "").unwrap();
let (mut block_verifier, _) =
initialize_verifier(database.clone(), ctx_svc.clone()).await?;
let start_height = cache.read().await.height;
let (block_tx, mut incoming_blocks) = mpsc::channel(3);
let (mut prepped_blocks_tx, mut prepped_blocks_rx) = mpsc::channel(3);
tokio::spawn(
async move { call_blocks(block_tx, start_height, chain_height, database).await },
);
tokio::spawn(async move {
call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await
});
let BlockChainContextResponse::Context(ctx) = ctx_svc
while let Some(incoming_blocks) = incoming_blocks.next().await {
let VerifyBlockResponse::MainChainBatchPrep(blocks, txs) = block_verifier
.ready()
.await?
.call(BlockChainContextRequest::Get)
.call(VerifyBlockRequest::MainChainBatchPrep(incoming_blocks))
.await?
else {
panic!("ctx svc sent wrong response!");
panic!()
};
let mut rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone();
let mut rx_seed_cache_initiated = false;
let mut randomx_vms: Option<HashMap<u64, RandomXVM>> = Some(HashMap::new());
let mut cloned_ctx_svc = ctx_svc.clone();
tokio::spawn(async move {
while let Some(blocks) = incoming_blocks.next().await {
if blocks.last().unwrap().header.major_version >= 12 {
if !rx_seed_cache_initiated {
let BlockChainContextResponse::Context(ctx) = cloned_ctx_svc
.ready()
.await
.unwrap()
.call(BlockChainContextRequest::Get)
.await
.unwrap()
else {
panic!("ctx svc sent wrong response!");
};
rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone();
rx_seed_cache_initiated = true;
}
let unwrapped_rx_vms = randomx_vms.as_mut().unwrap();
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| PrePreparedBlockExPOW::new(block).unwrap())
.collect::<Vec<_>>()
})
.await;
let seeds_needed = blocks
.iter()
.map(|block| {
rx_seed_cache.new_block(block.block.number() as u64, &block.block_hash);
randomx_seed_height(block.block.number() as u64)
})
.collect::<HashSet<_>>();
unwrapped_rx_vms.retain(|seed_height, _| seeds_needed.contains(seed_height));
for seed_height in seeds_needed {
unwrapped_rx_vms.entry(seed_height).or_insert_with(|| {
RandomXVM::new(rx_seed_cache.get_seeds_hash(seed_height)).unwrap()
});
}
let arc_rx_vms = Arc::new(randomx_vms.take().unwrap());
let cloned_arc_rx_vms = arc_rx_vms.clone();
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| {
let rx_vm = arc_rx_vms
.get(&randomx_seed_height(block.block.number() as u64))
.unwrap();
PrePreparedBlock::new_rx(block, rx_vm).unwrap()
})
.collect::<Vec<_>>()
})
.await;
randomx_vms = Some(Arc::into_inner(cloned_arc_rx_vms).unwrap());
prepped_blocks_tx.send(blocks).await.unwrap();
} else {
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| PrePreparedBlock::new(block).unwrap())
.collect::<Vec<_>>()
})
.await;
prepped_blocks_tx.send(blocks).await.unwrap();
}
}
});
while let Some(incoming_blocks) = prepped_blocks_rx.next().await {
let mut height;
for block in incoming_blocks {
for (block, txs) in blocks.into_iter().zip(txs) {
let VerifyBlockResponse::MainChain(verified_block_info) = block_verifier
.ready()
.await?
.call(VerifyBlockRequest::MainChainPrepared(block))
.await?;
.call(VerifyBlockRequest::MainChainPrepared(block, txs))
.await?
else {
panic!()
};
height = verified_block_info.height;

View file

@ -1,251 +0,0 @@
#[cfg(feature = "binaries")]
mod bin {
use std::{
collections::HashMap,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use monero_serai::transaction::Transaction;
use tower::{Service, ServiceExt};
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use cuprate_consensus::{
context::{
BlockChainContext, BlockChainContextRequest, BlockChainContextResponse,
RawBlockChainContext,
},
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse,
};
use monero_consensus::HardFork;
#[derive(Clone)]
pub struct TxPoolHandle {
tx_pool_task: std::sync::Arc<tokio::task::JoinHandle<()>>,
tx_pool_chan: mpsc::Sender<(
TxPoolRequest,
oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
)>,
}
impl tower::Service<TxPoolRequest> for TxPoolHandle {
type Response = TxPoolResponse;
type Error = TxNotInPool;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.tx_pool_task.is_finished() {
panic!("Tx pool task finished before it was supposed to!");
};
self.tx_pool_chan
.poll_ready(cx)
.map_err(|_| panic!("Tx pool channel closed before it was supposed to"))
}
fn call(&mut self, req: TxPoolRequest) -> Self::Future {
let (tx, rx) = oneshot::channel();
self.tx_pool_chan
.try_send((req, tx))
.expect("You need to use `poll_ready` to check capacity!");
rx.into()
}
}
pub type NewTxChanRec = mpsc::Receiver<(
Vec<(Vec<Transaction>, HardFork)>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>;
pub type NewTxChanSen = mpsc::Sender<(
Vec<(Vec<Transaction>, HardFork)>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>;
pub struct TxPool<TxV, Ctx> {
txs: Arc<Mutex<HashMap<[u8; 32], Arc<TransactionVerificationData>>>>,
current_ctx: BlockChainContext,
tx_verifier: Option<TxV>,
tx_verifier_chan: Option<oneshot::Receiver<TxV>>,
ctx_svc: Ctx,
}
impl<TxV, Ctx> TxPool<TxV, Ctx>
where
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
Ctx: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
Ctx::Future: Send + 'static,
{
pub async fn spawn(
tx_verifier_chan: oneshot::Receiver<TxV>,
mut ctx_svc: Ctx,
) -> Result<(TxPoolHandle, NewTxChanSen), tower::BoxError> {
let BlockChainContextResponse::Context(current_ctx) = ctx_svc
.ready()
.await?
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("Context service service returned wrong response!")
};
let tx_pool = TxPool {
txs: Default::default(),
current_ctx,
tx_verifier: None,
tx_verifier_chan: Some(tx_verifier_chan),
ctx_svc,
};
let (tx_pool_tx, tx_pool_rx) = mpsc::channel(3);
let (new_tx_tx, new_tx_rx) = mpsc::channel(3);
let tx_pool_task = tokio::spawn(tx_pool.run(tx_pool_rx, new_tx_rx));
Ok((
TxPoolHandle {
tx_pool_task: tx_pool_task.into(),
tx_pool_chan: tx_pool_tx,
},
new_tx_tx,
))
}
async fn get_or_update_ctx(&mut self) -> Result<RawBlockChainContext, tower::BoxError> {
if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() {
Ok(current_ctx)
} else {
let BlockChainContextResponse::Context(current_ctx) = self
.ctx_svc
.ready()
.await?
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("Context service service returned wrong response!")
};
self.current_ctx = current_ctx;
Ok(self.current_ctx.unchecked_blockchain_context().clone())
}
}
async fn handle_txs_req(
&mut self,
req: TxPoolRequest,
tx: oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
) {
let TxPoolRequest::Transactions(txs_to_get) = req;
let mut res = Vec::with_capacity(txs_to_get.len());
for tx_hash in txs_to_get {
let Some(tx) = self.txs.lock().unwrap().remove(&tx_hash) else {
tracing::debug!("tx not in pool: {}", hex::encode(tx_hash));
let _ = tx.send(Err(TxNotInPool));
return;
};
res.push(tx)
}
let _ = tx.send(Ok(TxPoolResponse::Transactions(res)));
}
async fn handle_new_txs(
&mut self,
new_txs: Vec<(Vec<Transaction>, HardFork)>,
res_chan: oneshot::Sender<Result<(), tower::BoxError>>,
) -> Result<(), tower::BoxError> {
if self.tx_verifier.is_none() {
self.tx_verifier = Some(self.tx_verifier_chan.take().unwrap().await?);
}
let current_ctx = self.get_or_update_ctx().await?;
let mut tx_verifier = self.tx_verifier.clone().unwrap();
let tx_pool = self.txs.clone();
tokio::spawn(async move {
for (txs, hf) in new_txs {
// We only batch the setup a real tx pool would also call `VerifyTxRequest::Block`
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier
.ready()
.await
.unwrap()
.call(VerifyTxRequest::BatchSetup {
txs,
hf,
re_org_token: current_ctx.re_org_token.clone(),
})
.await
.unwrap()
else {
panic!("Tx verifier sent incorrect response!");
};
let mut locked_pool = tx_pool.lock().unwrap();
for tx in txs {
let tx_hash = tx.tx_hash;
if locked_pool.insert(tx_hash, tx).is_some() {
panic!("added same tx to pool twice: {}", hex::encode(tx_hash))
}
}
}
res_chan.send(Ok(())).unwrap();
});
Ok(())
}
pub async fn run(
mut self,
mut tx_pool_handle: mpsc::Receiver<(
TxPoolRequest,
oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
)>,
mut new_tx_channel: NewTxChanRec,
) {
loop {
tokio::select! {
biased;
new_txs = new_tx_channel.next() => {
let Some(new_txs) = new_txs else {
todo!("Shutdown txpool")
};
self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap()
}
pool_req = tx_pool_handle.next() => {
let Some((req, tx)) = pool_req else {
todo!("Shutdown txpool")
};
self.handle_txs_req(req, tx).await;
}
}
}
}
}
}
#[cfg(feature = "binaries")]
pub use bin::*;
#[allow(dead_code)]
fn main() {}

View file

@ -1,25 +1,39 @@
use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use cuprate_helper::asynch::rayon_spawn_async;
use futures::FutureExt;
use monero_serai::block::Block;
use monero_serai::transaction::Input;
use monero_serai::{
block::Block,
transaction::{Input, Transaction},
};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use monero_consensus::{
blocks::{calculate_pow_hash, check_block, check_block_pow, BlockError, RandomX},
blocks::{
calculate_pow_hash, check_block, check_block_pow, is_randomx_seed_height,
randomx_seed_height, BlockError, RandomX,
},
miner_tx::MinerTxError,
ConsensusError, HardFork,
};
use crate::{
context::{BlockChainContextRequest, BlockChainContextResponse},
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse,
context::{
rx_vms::RandomXVM, BlockChainContextRequest, BlockChainContextResponse,
RawBlockChainContext,
},
transactions::{
batch_setup_txs, contextual_data, OutputCache, TransactionVerificationData,
VerifyTxRequest, VerifyTxResponse,
},
Database, ExtendedConsensusError,
};
#[derive(Debug)]
@ -31,6 +45,7 @@ pub struct PrePreparedBlockExPOW {
pub hf_version: HardFork,
pub block_hash: [u8; 32],
pub height: u64,
pub miner_tx_weight: usize,
}
@ -40,12 +55,19 @@ impl PrePreparedBlockExPOW {
let (hf_version, hf_vote) =
HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?;
let Some(Input::Gen(height)) = block.miner_tx.prefix.inputs.first() else {
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputNotOfTypeGen,
)))?
};
Ok(PrePreparedBlockExPOW {
block_blob: block.serialize(),
hf_vote,
hf_version,
block_hash: block.hash(),
height: *height,
miner_tx_weight: block.miner_tx.weight(),
block,
@ -107,7 +129,7 @@ impl PrePreparedBlock {
pub fn new_rx<R: RandomX>(
block: PrePreparedBlockExPOW,
randomx_vm: &R,
randomx_vm: Option<&R>,
) -> Result<PrePreparedBlock, ConsensusError> {
let Some(Input::Gen(height)) = block.block.miner_tx.prefix.inputs.first() else {
Err(ConsensusError::Block(BlockError::MinerTxError(
@ -122,7 +144,7 @@ impl PrePreparedBlock {
block_hash: block.block_hash,
pow_hash: calculate_pow_hash(
Some(randomx_vm),
randomx_vm,
&block.block.serialize_hashable(),
*height,
&block.hf_version,
@ -149,24 +171,33 @@ pub struct VerifiedBlockInformation {
}
pub enum VerifyBlockRequest {
MainChain(Block),
MainChainPrepared(PrePreparedBlock),
MainChainBatchPrep(Vec<(Block, Vec<Transaction>)>),
MainChain {
block: Block,
prepared_txs: Vec<Arc<TransactionVerificationData>>,
txs: Vec<Transaction>,
},
MainChainPrepared(PrePreparedBlock, Vec<Arc<TransactionVerificationData>>),
}
pub enum VerifyBlockResponse {
MainChain(VerifiedBlockInformation),
MainChainBatchPrep(
Vec<PrePreparedBlock>,
Vec<Vec<Arc<TransactionVerificationData>>>,
),
}
// TODO: it is probably a bad idea for this to derive clone, if 2 places (RPC, P2P) receive valid but different blocks
// then they will both get approved but only one should go to main chain.
#[derive(Clone)]
pub struct BlockVerifierService<C: Clone, TxV: Clone, TxP: Clone> {
pub struct BlockVerifierService<C: Clone, TxV: Clone, D> {
context_svc: C,
tx_verifier_svc: TxV,
tx_pool: TxP,
database: D,
}
impl<C, TxV, TxP> BlockVerifierService<C, TxV, TxP>
impl<C, TxV, D> BlockVerifierService<C, TxV, D>
where
C: Service<BlockChainContextRequest, Response = BlockChainContextResponse>
+ Clone
@ -176,25 +207,23 @@ where
+ Clone
+ Send
+ 'static,
TxP: Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
+ Clone
+ Send
+ 'static,
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
pub fn new(
context_svc: C,
tx_verifier_svc: TxV,
tx_pool: TxP,
) -> BlockVerifierService<C, TxV, TxP> {
database: D,
) -> BlockVerifierService<C, TxV, D> {
BlockVerifierService {
context_svc,
tx_verifier_svc,
tx_pool,
database,
}
}
}
impl<C, TxV, TxP> Service<VerifyBlockRequest> for BlockVerifierService<C, TxV, TxP>
impl<C, TxV, D> Service<VerifyBlockRequest> for BlockVerifierService<C, TxV, D>
where
C: Service<
BlockChainContextRequest,
@ -211,11 +240,8 @@ where
+ 'static,
TxV::Future: Send + 'static,
TxP: Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
+ Clone
+ Send
+ 'static,
TxP::Future: Send + 'static,
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
type Response = VerifyBlockResponse;
type Error = ExtendedConsensusError;
@ -229,33 +255,219 @@ where
fn call(&mut self, req: VerifyBlockRequest) -> Self::Future {
let context_svc = self.context_svc.clone();
let tx_verifier_svc = self.tx_verifier_svc.clone();
let tx_pool = self.tx_pool.clone();
let database = self.database.clone();
async move {
match req {
VerifyBlockRequest::MainChain(block) => {
verify_main_chain_block(block, context_svc, tx_verifier_svc, tx_pool).await
VerifyBlockRequest::MainChain {
block,
prepared_txs,
txs,
} => {
verify_main_chain_block(block, txs, prepared_txs, context_svc, tx_verifier_svc)
.await
}
VerifyBlockRequest::MainChainPrepared(prepped_block) => {
VerifyBlockRequest::MainChainPrepared(prepped_block, txs) => {
verify_main_chain_block_prepared(
prepped_block,
txs,
context_svc,
tx_verifier_svc,
tx_pool,
None,
)
.await
}
VerifyBlockRequest::MainChainBatchPrep(blocks) => {
batch_verify_main_chain_block(blocks, context_svc, database).await
}
}
}
.boxed()
}
}
async fn verify_main_chain_block_prepared<C, TxV, TxP>(
async fn batch_verify_main_chain_block<C, D>(
blocks: Vec<(Block, Vec<Transaction>)>,
mut context_svc: C,
mut database: D,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
tracing::debug!("Calculating block hashes.");
let blocks: Vec<PrePreparedBlockExPOW> = rayon_spawn_async(|| {
blocks
.into_iter()
.map(PrePreparedBlockExPOW::new)
.collect::<Result<Vec<_>, _>>()
})
.await?;
let mut timestamps_hfs = Vec::with_capacity(blocks.len());
let mut new_rx_vm = None;
for window in blocks.windows(2) {
if window[0].block_hash != window[1].block.header.previous
|| window[0].height != window[1].height - 1
{
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
if is_randomx_seed_height(window[0].height) {
new_rx_vm = Some((window[0].height, window[0].block_hash));
}
timestamps_hfs.push((window[0].block.header.timestamp, window[0].hf_version))
}
tracing::debug!("getting blockchain context");
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetContext)
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::BatchGetDifficulties(
timestamps_hfs,
))
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
if context.chain_height != blocks[0].height {
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputsHeightIncorrect,
)))?;
}
if context.top_hash != blocks[0].block.header.previous {
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
let mut rx_vms = context.rx_vms;
if let Some((new_vm_height, new_vm_seed)) = new_rx_vm {
let new_vm = rayon_spawn_async(move || {
Arc::new(RandomXVM::new(&new_vm_seed).expect("RandomX VM gave an error on set up!"))
})
.await;
context_svc
.ready()
.await?
.call(BlockChainContextRequest::NewRXVM((
new_vm_seed,
new_vm.clone(),
)))
.await
.map_err(Into::<ExtendedConsensusError>::into)?;
rx_vms.insert(new_vm_height, new_vm);
}
let blocks = rayon_spawn_async(move || {
blocks
.into_par_iter()
.zip(difficulties)
.map(|(block, difficultly)| {
let height = block.height;
let block = PrePreparedBlock::new_rx(
block,
rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref),
)?;
check_block_pow(&block.pow_hash, difficultly)?;
Ok(block)
})
.collect::<Result<Vec<_>, ConsensusError>>()
})
.await?;
let txs = batch_setup_txs(
txs.into_iter()
.zip(blocks.iter().map(|block| block.hf_version))
.collect(),
)
.await?;
let mut complete_block_idx = 0;
let mut out_cache = OutputCache::new();
out_cache
.extend_from_block(
blocks
.iter()
.map(|block| &block.block)
.zip(txs.iter().map(Vec::as_slice)),
&mut database,
)
.await?;
for (idx, hf) in blocks
.windows(2)
.enumerate()
.filter(|(_, block)| block[0].hf_version != blocks[1].hf_version)
.map(|(i, block)| (i, &block[0].hf_version))
{
contextual_data::batch_fill_ring_member_info(
txs.iter()
.take(idx + 1)
.skip(complete_block_idx)
.flat_map(|txs| txs.iter()),
hf,
context.re_org_token.clone(),
database.clone(),
Some(&out_cache),
)
.await?;
complete_block_idx = idx + 1;
}
if complete_block_idx != blocks.len() {
contextual_data::batch_fill_ring_member_info(
txs.iter()
.skip(complete_block_idx)
.flat_map(|txs| txs.iter()),
&blocks.last().unwrap().hf_version,
context.re_org_token.clone(),
database.clone(),
Some(&out_cache),
)
.await?;
}
Ok(VerifyBlockResponse::MainChainBatchPrep(blocks, txs))
}
async fn verify_main_chain_block_prepared<C, TxV>(
prepped_block: PrePreparedBlock,
txs: Vec<Arc<TransactionVerificationData>>,
context_svc: C,
tx_verifier_svc: TxV,
tx_pool: TxP,
context: Option<RawBlockChainContext>,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
@ -266,14 +478,13 @@ where
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>,
TxP: Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
+ Clone
+ Send
+ 'static,
{
let context = match context {
Some(context) => context,
None => {
tracing::debug!("getting blockchain context");
let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::Get)
.oneshot(BlockChainContextRequest::GetContext)
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
@ -283,10 +494,25 @@ where
let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {:?}", context);
context
}
};
let TxPoolResponse::Transactions(txs) = tx_pool
.oneshot(TxPoolRequest::Transactions(prepped_block.block.txs.clone()))
.await?;
check_block_pow(&prepped_block.pow_hash, context.next_difficulty)
.map_err(ConsensusError::Block)?;
// Check that the txs included are what we need and that there are not any extra.
// Collecting into a HashSet could hide duplicates but we check Key Images are unique so someone would have to find
// a hash collision to include duplicate txs here.
let mut tx_hashes = txs.iter().map(|tx| &tx.tx_hash).collect::<HashSet<_>>();
for tx_hash in &prepped_block.block.txs {
if !tx_hashes.remove(tx_hash) {
return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect);
}
}
if !tx_hashes.is_empty() {
return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect);
}
tx_verifier_svc
.oneshot(VerifyTxRequest::Block {
@ -311,9 +537,6 @@ where
)
.map_err(ConsensusError::Block)?;
check_block_pow(&prepped_block.pow_hash, context.next_difficulty)
.map_err(ConsensusError::Block)?;
Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation {
block_hash: prepped_block.block_hash,
block: prepped_block.block,
@ -328,11 +551,12 @@ where
}))
}
async fn verify_main_chain_block<C, TxV, TxP>(
_block: Block,
_context_svc: C,
_tx_verifier_svc: TxV,
_tx_pool: TxP,
async fn verify_main_chain_block<C, TxV>(
block: Block,
txs: Vec<Transaction>,
mut prepared_txs: Vec<Arc<TransactionVerificationData>>,
mut context_svc: C,
tx_verifier_svc: TxV,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
@ -343,17 +567,12 @@ where
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>,
TxP: Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
+ Clone
+ Send
+ 'static,
{
todo!("Single main chain block.");
/*
tracing::debug!("getting blockchain context");
let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::Get)
.ready()
.await?
.call(BlockChainContextRequest::GetContext)
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
@ -361,61 +580,28 @@ where
};
let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {:?}", context);
let TxPoolResponse::Transactions(txs) = tx_pool
.oneshot(TxPoolRequest::Transactions(block.txs.clone()))
.await?;
let rx_vms = context.rx_vms.clone();
let prepped_block = rayon_spawn_async(move || {
let prepped_block_ex_pow = PrePreparedBlockExPOW::new(block)?;
let height = prepped_block_ex_pow.height;
tx_verifier_svc
.oneshot(VerifyTxRequest::Block {
txs: txs.clone(),
current_chain_height: context.chain_height,
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
hf: context.current_hf,
re_org_token: context.re_org_token.clone(),
PrePreparedBlock::new_rx(prepped_block_ex_pow, rx_vms.get(&height).map(AsRef::as_ref))
})
.await?;
let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
let total_fees = txs.iter().map(|tx| tx.fee).sum::<u64>();
check_block_pow(&prepped_block.pow_hash, context.cumulative_difficulty)
.map_err(ConsensusError::Block)?;
let (hf_vote, generated_coins) = check_block(
&block,
total_fees,
block_weight,
block.serialize().len(),
&context.context_to_verify_block,
prepared_txs.append(&mut batch_setup_txs(vec![(txs, context.current_hf)]).await?[0]);
verify_main_chain_block_prepared(
prepped_block,
prepared_txs,
context_svc,
tx_verifier_svc,
Some(context),
)
.map_err(ConsensusError::Block)?;
let hashing_blob = block.serialize_hashable();
// do POW test last
let chain_height = context.chain_height;
let current_hf = context.current_hf;
let pow_hash = todo!();
/*
rayon_spawn_async(move || calculate_pow_hash(, &hashing_blob, chain_height, &current_hf))
.await
.map_err(ConsensusError::Block)?;
*/
check_block_pow(&pow_hash, context.next_difficulty).map_err(ConsensusError::Block)?;
Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation {
block_hash: block.hash(),
block,
txs,
pow_hash,
generated_coins,
weight: block_weight,
height: context.chain_height,
long_term_weight: context.next_block_long_term_weight(block_weight),
hf_vote,
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
}))
*/
}

View file

@ -7,14 +7,15 @@
use std::{
cmp::min,
collections::HashMap,
future::Future,
ops::DerefMut,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::{
future::{ready, Ready},
lock::{Mutex, OwnedMutexGuard, OwnedMutexLockFuture},
FutureExt,
};
@ -26,13 +27,14 @@ use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError}
pub(crate) mod difficulty;
pub(crate) mod hardforks;
pub(crate) mod rx_seed;
pub(crate) mod rx_vms;
pub(crate) mod weight;
mod tokens;
pub use difficulty::DifficultyCacheConfig;
pub use hardforks::HardForkConfig;
use rx_vms::RandomXVM;
pub use tokens::*;
pub use weight::BlockWeightsCacheConfig;
@ -117,6 +119,11 @@ where
panic!("Database sent incorrect response!");
};
let db = database.clone();
let hardfork_state_handle = tokio::spawn(async move {
hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await
});
let db = database.clone();
let difficulty_cache_handle = tokio::spawn(async move {
difficulty::DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db).await
@ -127,14 +134,12 @@ where
weight::BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db).await
});
let db = database.clone();
let hardfork_state_handle = tokio::spawn(async move {
hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await
});
let hardfork_state = hardfork_state_handle.await.unwrap()?;
let current_hf = hardfork_state.current_hardfork();
let db = database.clone();
let rx_seed_handle = tokio::spawn(async move {
rx_seed::RandomXSeed::init_from_chain_height(chain_height, db).await
rx_vms::RandomXVMCache::init_from_chain_height(chain_height, &current_hf, db).await
});
let context_svc = BlockChainContextService {
@ -145,7 +150,7 @@ where
difficulty_cache: difficulty_cache_handle.await.unwrap()?,
weight_cache: weight_cache_handle.await.unwrap()?,
rx_seed_cache: rx_seed_handle.await.unwrap()?,
hardfork_state: hardfork_state_handle.await.unwrap()?,
hardfork_state,
chain_height,
already_generated_coins,
top_block_hash,
@ -166,7 +171,7 @@ pub struct RawBlockChainContext {
pub cumulative_difficulty: u128,
/// A token which is used to signal if a reorg has happened since creating the token.
pub re_org_token: ReOrgToken,
pub rx_seed_cache: rx_seed::RandomXSeed,
pub rx_vms: HashMap<u64, Arc<RandomXVM>>,
pub context_to_verify_block: ContextToVerifyBlock,
/// The median long term block weight.
median_long_term_weight: usize,
@ -268,12 +273,20 @@ pub struct UpdateBlockchainCacheData {
#[derive(Debug, Clone)]
pub enum BlockChainContextRequest {
Get,
GetContext,
/// Get the next difficulties for these blocks.
///
/// Inputs: a list of block timestamps and hfs
///
/// The number of difficulties returned will be one more than the number of timestamps/ hfs.
BatchGetDifficulties(Vec<(u64, HardFork)>),
NewRXVM(([u8; 32], Arc<RandomXVM>)),
Update(UpdateBlockchainCacheData),
}
pub enum BlockChainContextResponse {
Context(BlockChainContext),
BatchDifficulties(Vec<u128>),
Ok,
}
struct InternalBlockChainContext {
@ -285,7 +298,7 @@ struct InternalBlockChainContext {
difficulty_cache: difficulty::DifficultyCache,
weight_cache: weight::BlockWeightsCache,
rx_seed_cache: rx_seed::RandomXSeed,
rx_seed_cache: rx_vms::RandomXVMCache,
hardfork_state: hardforks::HardForkState,
chain_height: u64,
@ -315,7 +328,8 @@ impl Clone for BlockChainContextService {
impl Service<BlockChainContextRequest> for BlockChainContextService {
type Response = BlockChainContextResponse;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
@ -325,8 +339,8 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
Arc::clone(&self.internal_blockchain_context).lock_owned(),
)
}
MutexLockState::Acquiring(rpc) => {
self.lock_state = MutexLockState::Acquired(futures::ready!(rpc.poll_unpin(cx)))
MutexLockState::Acquiring(lock) => {
self.lock_state = MutexLockState::Acquired(futures::ready!(lock.poll_unpin(cx)))
}
MutexLockState::Acquired(_) => return Poll::Ready(Ok(())),
}
@ -339,7 +353,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
else {
panic!("poll_ready() was not called first!")
};
async move {
let InternalBlockChainContext {
current_validity_token,
current_reorg_token,
@ -353,7 +367,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
} = internal_blockchain_context.deref_mut();
let res = match req {
BlockChainContextRequest::Get => {
BlockChainContextRequest::GetContext => {
let current_hf = hardfork_state.current_hardfork();
BlockChainContextResponse::Context(BlockChainContext {
@ -373,7 +387,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
next_difficulty: difficulty_cache.next_difficulty(&current_hf),
already_generated_coins: *already_generated_coins,
},
rx_seed_cache: rx_seed_cache.clone(),
rx_vms: rx_seed_cache.get_vms(),
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
median_long_term_weight: weight_cache.median_long_term_weight(),
top_block_timestamp: difficulty_cache.top_block_timestamp(),
@ -381,17 +395,37 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
},
})
}
BlockChainContextRequest::BatchGetDifficulties(blocks) => {
let next_diffs = difficulty_cache
.next_difficulties(blocks, &hardfork_state.current_hardfork());
BlockChainContextResponse::BatchDifficulties(next_diffs)
}
BlockChainContextRequest::NewRXVM(vm) => {
rx_seed_cache.add_vm(vm);
BlockChainContextResponse::Ok
}
BlockChainContextRequest::Update(new) => {
// Cancel the validity token and replace it with a new one.
std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid();
std::mem::replace(current_validity_token, ValidityToken::new())
.set_data_invalid();
difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty);
difficulty_cache.new_block(
new.height,
new.timestamp,
new.cumulative_difficulty,
);
weight_cache.new_block(new.height, new.weight, new.long_term_weight);
hardfork_state.new_block(new.vote, new.height);
rx_seed_cache.new_block(new.height, &new.new_top_hash);
rx_seed_cache
.new_block(
new.height,
&new.new_top_hash,
&hardfork_state.current_hardfork(),
)
.await;
*chain_height = new.height + 1;
*top_block_hash = new.new_top_hash;
@ -402,6 +436,8 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
}
};
ready(Ok(res))
Ok(res)
}
.boxed()
}
}

View file

@ -19,7 +19,7 @@ const DIFFICULTY_LAG: usize = 15;
/// Configuration for the difficulty cache.
///
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DifficultyCacheConfig {
pub(crate) window: usize,
pub(crate) cut: usize,
@ -52,7 +52,7 @@ impl DifficultyCacheConfig {
/// This struct is able to calculate difficulties from blockchain information.
///
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct DifficultyCache {
/// The list of timestamps in the window.
/// len <= [`DIFFICULTY_BLOCKS_COUNT`]
@ -121,20 +121,24 @@ impl DifficultyCache {
return 1;
}
let mut sorted_timestamps = self.timestamps.clone();
if sorted_timestamps.len() > self.config.window {
sorted_timestamps.drain(self.config.window..);
let mut timestamps = self.timestamps.clone();
if timestamps.len() > self.config.window {
// remove the lag.
timestamps.drain(self.config.window..);
};
sorted_timestamps.make_contiguous().sort_unstable();
let timestamps_slice = timestamps.make_contiguous();
let (window_start, window_end) = get_window_start_and_end(
sorted_timestamps.len(),
timestamps_slice.len(),
self.config.accounted_window_len(),
self.config.window,
);
let mut time_span =
u128::from(sorted_timestamps[window_end - 1] - sorted_timestamps[window_start]);
// We don't sort the whole timestamp list
let mut time_span = u128::from(
*timestamps_slice.select_nth_unstable(window_end - 1).1
- *timestamps_slice.select_nth_unstable(window_start).1,
);
let windowed_work = self.cumulative_difficulties[window_end - 1]
- self.cumulative_difficulties[window_start];
@ -147,6 +151,50 @@ impl DifficultyCache {
(windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span
}
pub fn next_difficulties(
&mut self,
blocks: Vec<(u64, HardFork)>,
current_hf: &HardFork,
) -> Vec<u128> {
let new_timestamps_len = blocks.len();
let initial_len = self.timestamps.len();
let mut difficulties = Vec::with_capacity(blocks.len() + 1);
difficulties.push(self.next_difficulty(current_hf));
let mut diff_info_popped = Vec::new();
for (new_timestamp, hf) in blocks {
self.timestamps.push_back(new_timestamp);
self.cumulative_difficulties
.push_back(self.cumulative_difficulty() + *difficulties.last().unwrap());
if u64::try_from(self.timestamps.len()).unwrap() > self.config.total_block_count() {
diff_info_popped.push((
self.timestamps.pop_front().unwrap(),
self.cumulative_difficulties.pop_front().unwrap(),
));
}
difficulties.push(self.next_difficulty(&hf));
}
self.cumulative_difficulties.drain(
self.cumulative_difficulties
.len()
.saturating_sub(new_timestamps_len)..,
);
self.timestamps
.drain(self.timestamps.len().saturating_sub(new_timestamps_len)..);
for (timestamp, cum_dif) in diff_info_popped.into_iter().take(initial_len).rev() {
self.timestamps.push_front(timestamp);
self.cumulative_difficulties.push_front(cum_dif);
}
difficulties
}
/// Returns the median timestamp over the last `numb_blocks`, including the genesis block if the block height is low enough.
///
/// Will return [`None`] if there aren't enough blocks.

View file

@ -1,118 +0,0 @@
use std::collections::VecDeque;
use futures::{stream::FuturesOrdered, StreamExt};
use tower::ServiceExt;
use monero_consensus::blocks::{is_randomx_seed_height, randomx_seed_height};
use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError};
const RX_SEEDS_CACHED: usize = 3;
#[derive(Clone, Debug)]
pub struct RandomXSeed {
seeds: VecDeque<(u64, [u8; 32])>,
}
impl RandomXSeed {
pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64,
database: D,
) -> Result<Self, ExtendedConsensusError> {
let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
Ok(RandomXSeed {
seeds: seed_heights.into_iter().zip(seed_hashes).collect(),
})
}
pub fn get_seeds_hash(&self, seed_height: u64) -> [u8; 32] {
for (height, seed) in self.seeds.iter() {
if seed_height == *height {
return *seed;
}
}
tracing::error!(
"Current seeds: {:?}, asked for: {}",
self.seeds,
seed_height
);
panic!("RX seed cache was not updated or was asked for a block too old.")
}
pub fn get_rx_seed(&self, height: u64) -> [u8; 32] {
let seed_height = randomx_seed_height(height);
tracing::warn!(
"Current seeds: {:?}, asked for: {}",
self.seeds,
seed_height
);
self.get_seeds_hash(seed_height)
}
pub fn new_block(&mut self, height: u64, hash: &[u8; 32]) {
if is_randomx_seed_height(height) {
for (got_height, _) in self.seeds.iter() {
if *got_height == height {
return;
}
}
self.seeds.push_front((height, *hash));
if self.seeds.len() > RX_SEEDS_CACHED {
self.seeds.pop_back();
}
}
}
}
fn get_last_rx_seed_heights(mut last_height: u64, mut amount: usize) -> Vec<u64> {
let mut seeds = Vec::with_capacity(amount);
if is_randomx_seed_height(last_height) {
seeds.push(last_height);
amount -= 1;
}
for _ in 0..amount {
if last_height == 0 {
return seeds;
}
let seed_height = randomx_seed_height(last_height);
seeds.push(seed_height);
last_height = seed_height
}
seeds
}
async fn get_block_hashes<D: Database + Clone>(
heights: Vec<u64>,
database: D,
) -> Result<Vec<[u8; 32]>, ExtendedConsensusError> {
let mut fut = FuturesOrdered::new();
for height in heights {
let db = database.clone();
fut.push_back(async move {
let DatabaseResponse::BlockHash(hash) = db
.clone()
.oneshot(DatabaseRequest::BlockHash(height))
.await?
else {
panic!("Database sent incorrect response!");
};
Result::<_, ExtendedConsensusError>::Ok(hash)
});
}
let mut res = Vec::new();
while let Some(hash) = fut.next().await {
res.push(hash?);
}
Ok(res)
}

View file

@ -0,0 +1,202 @@
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use futures::{stream::FuturesOrdered, StreamExt};
use randomx_rs::{RandomXCache, RandomXError, RandomXFlag, RandomXVM as VMInner};
use rayon::prelude::*;
use thread_local::ThreadLocal;
use tower::ServiceExt;
use cuprate_helper::asynch::rayon_spawn_async;
use monero_consensus::{
blocks::{is_randomx_seed_height, RandomX, RX_SEEDHASH_EPOCH_BLOCKS},
HardFork,
};
use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError};
const RX_SEEDS_CACHED: usize = 2;
#[derive(Debug)]
pub struct RandomXVM {
vms: ThreadLocal<VMInner>,
cache: RandomXCache,
flags: RandomXFlag,
}
impl RandomXVM {
pub fn new(seed: &[u8; 32]) -> Result<Self, RandomXError> {
let flags = RandomXFlag::get_recommended_flags();
let cache = RandomXCache::new(flags, seed.as_slice())?;
Ok(RandomXVM {
vms: ThreadLocal::new(),
cache,
flags,
})
}
}
impl RandomX for RandomXVM {
type Error = RandomXError;
fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error> {
self.vms
.get_or_try(|| VMInner::new(self.flags, Some(self.cache.clone()), None))?
.calculate_hash(buf)
.map(|out| out.try_into().unwrap())
}
}
#[derive(Clone, Debug)]
pub struct RandomXVMCache {
pub(crate) seeds: VecDeque<(u64, [u8; 32])>,
pub(crate) vms: HashMap<u64, Arc<RandomXVM>>,
pub(crate) cached_vm: Option<([u8; 32], Arc<RandomXVM>)>,
}
impl RandomXVMCache {
pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64,
hf: &HardFork,
database: D,
) -> Result<Self, ExtendedConsensusError> {
let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
let seeds: VecDeque<(u64, [u8; 32])> = seed_heights.into_iter().zip(seed_hashes).collect();
let vms = if hf >= &HardFork::V12 {
let seeds_clone = seeds.clone();
rayon_spawn_async(move || {
seeds_clone
.par_iter()
.map(|(height, seed)| {
(
*height,
Arc::new(RandomXVM::new(seed).expect("Failed to create RandomX VM!")),
)
})
.collect()
})
.await
} else {
HashMap::new()
};
Ok(RandomXVMCache {
seeds,
vms,
cached_vm: None,
})
}
pub fn add_vm(&mut self, vm: ([u8; 32], Arc<RandomXVM>)) {
self.cached_vm.replace(vm);
}
pub fn get_vms(&self) -> HashMap<u64, Arc<RandomXVM>> {
self.vms.clone()
}
pub async fn new_block(&mut self, height: u64, hash: &[u8; 32], hf: &HardFork) {
let should_make_vms = hf >= &HardFork::V12;
if should_make_vms && self.vms.len() != self.seeds.len() {
// this will only happen when syncing and rx activates.
let seeds_clone = self.seeds.clone();
self.vms = rayon_spawn_async(move || {
seeds_clone
.par_iter()
.map(|(height, seed)| {
(
*height,
Arc::new(RandomXVM::new(seed).expect("Failed to create RandomX VM!")),
)
})
.collect()
})
.await
}
if is_randomx_seed_height(height) {
self.seeds.push_front((height, *hash));
if should_make_vms {
let new_vm = 'new_vm_block: {
if let Some((cached_hash, cached_vm)) = self.cached_vm.take() {
if &cached_hash == hash {
break 'new_vm_block cached_vm;
}
};
let hash_clone = *hash;
rayon_spawn_async(move || Arc::new(RandomXVM::new(&hash_clone).unwrap())).await
};
self.vms.insert(height, new_vm);
}
if self.seeds.len() > RX_SEEDS_CACHED {
self.seeds.pop_back();
// TODO: This is really not efficient but the amount of VMs cached is not a lot.
self.vms.retain(|height, _| {
self.seeds
.iter()
.any(|(cached_height, _)| height == cached_height)
})
}
}
}
}
pub(crate) fn get_last_rx_seed_heights(mut last_height: u64, mut amount: usize) -> Vec<u64> {
let mut seeds = Vec::with_capacity(amount);
if is_randomx_seed_height(last_height) {
seeds.push(last_height);
amount -= 1;
}
for _ in 0..amount {
if last_height == 0 {
return seeds;
}
// We don't include the lag as we only want seeds not the specific seed fo this height.
let seed_height = (last_height - 1) & !(RX_SEEDHASH_EPOCH_BLOCKS - 1);
seeds.push(seed_height);
last_height = seed_height
}
seeds
}
async fn get_block_hashes<D: Database + Clone>(
heights: Vec<u64>,
database: D,
) -> Result<Vec<[u8; 32]>, ExtendedConsensusError> {
let mut fut = FuturesOrdered::new();
for height in heights {
let db = database.clone();
fut.push_back(async move {
let DatabaseResponse::BlockHash(hash) = db
.clone()
.oneshot(DatabaseRequest::BlockHash(height))
.await?
else {
panic!("Database sent incorrect response!");
};
Result::<_, ExtendedConsensusError>::Ok(hash)
});
}
let mut res = Vec::new();
while let Some(hash) = fut.next().await {
res.push(hash?);
}
Ok(res)
}

View file

@ -1,7 +1,6 @@
use std::{
collections::{HashMap, HashSet},
future::Future,
sync::Arc,
};
use monero_consensus::{transactions::OutputOnChain, ConsensusError, HardFork};
@ -31,14 +30,13 @@ pub enum ExtendedConsensusError {
ConErr(#[from] monero_consensus::ConsensusError),
#[error("Database error: {0}")]
DBErr(#[from] tower::BoxError),
#[error("Needed transaction is not in pool")]
TxPErr(#[from] TxNotInPool),
#[error("The transactions passed in with the block are incorrect.")]
TxsIncludedWithBlockIncorrect,
}
// TODO: instead of (ab)using generic returns return the acc type
pub async fn initialize_verifier<D, TxP, Ctx>(
pub async fn initialize_verifier<D, Ctx>(
database: D,
tx_pool: TxP,
ctx_svc: Ctx,
) -> Result<
(
@ -68,12 +66,6 @@ pub async fn initialize_verifier<D, TxP, Ctx>(
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
TxP: tower::Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
+ Clone
+ Send
+ Sync
+ 'static,
TxP::Future: Send + 'static,
Ctx: tower::Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
@ -84,8 +76,8 @@ where
+ 'static,
Ctx::Future: Send + 'static,
{
let tx_svc = transactions::TxVerifierService::new(database);
let block_svc = block::BlockVerifierService::new(ctx_svc, tx_svc.clone(), tx_pool);
let tx_svc = transactions::TxVerifierService::new(database.clone());
let block_svc = block::BlockVerifierService::new(ctx_svc, tx_svc.clone(), database);
Ok((block_svc, tx_svc))
}
@ -154,15 +146,3 @@ pub enum DatabaseResponse {
)>,
),
}
#[derive(Debug, Copy, Clone, thiserror::Error)]
#[error("The transaction requested was not in the transaction pool")]
pub struct TxNotInPool;
pub enum TxPoolRequest {
Transactions(Vec<[u8; 32]>),
}
pub enum TxPoolResponse {
Transactions(Vec<Arc<transactions::TransactionVerificationData>>),
}

View file

@ -58,19 +58,15 @@ impl ScanningCache {
txs: &[Arc<TransactionVerificationData>],
) {
self.add_tx_time_lock(miner_tx.hash(), miner_tx.prefix.timelock);
miner_tx
.prefix
.outputs
.iter()
.for_each(|out| self.add_outs(out.amount.unwrap_or(0), 1));
miner_tx.prefix.outputs.iter().for_each(|out| {
self.add_outs(miner_tx.prefix.version == 2, out.amount.unwrap_or(0), 1)
});
txs.iter().for_each(|tx| {
self.add_tx_time_lock(tx.tx_hash, tx.tx.prefix.timelock);
tx.tx
.prefix
.outputs
.iter()
.for_each(|out| self.add_outs(out.amount.unwrap_or(0), 1));
tx.tx.prefix.outputs.iter().for_each(|out| {
self.add_outs(tx.tx.prefix.version == 2, out.amount.unwrap_or(0), 1)
});
tx.tx.prefix.inputs.iter().for_each(|inp| match inp {
Input::ToKey { key_image, .. } => {
@ -125,7 +121,9 @@ impl ScanningCache {
.collect()
}
pub fn add_outs(&mut self, amount: u64, count: usize) {
pub fn add_outs(&mut self, is_v2: bool, amount: u64, count: usize) {
let amount = if is_v2 { 0 } else { amount };
if let Some(numb_outs) = self.numb_outs.get_mut(&amount) {
*numb_outs += count;
} else {

View file

@ -234,7 +234,7 @@ impl RpcConnection {
txs.len(),
"node: {}, height: {}, node is pruned, which is not supported!",
address,
block.number(),
block.number().unwrap(),
);
Ok((block, txs))
@ -352,9 +352,8 @@ impl RpcConnection {
// then a bad proof has been approved.
key: CompressedEdwardsY::from_slice(&out.key)
.unwrap()
.decompress()
.unwrap(),
mask: CompressedEdwardsY::from_slice(&out.mask)
.decompress(),
commitment: CompressedEdwardsY::from_slice(&out.mask)
.unwrap()
.decompress()
.unwrap(),

View file

@ -14,6 +14,7 @@ use crate::{
pub(crate) mod data;
mod difficulty;
mod hardforks;
mod rx_vms;
mod weight;
use difficulty::*;
@ -40,7 +41,7 @@ async fn context_invalidated_on_new_block() -> Result<(), tower::BoxError> {
let BlockChainContextResponse::Context(context) = ctx_svc
.clone()
.oneshot(BlockChainContextRequest::Get)
.oneshot(BlockChainContextRequest::GetContext)
.await?
else {
panic!("Context service returned wrong response!");
@ -82,8 +83,9 @@ async fn context_height_correct() -> Result<(), tower::BoxError> {
let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let BlockChainContextResponse::Context(context) =
ctx_svc.oneshot(BlockChainContextRequest::Get).await?
let BlockChainContextResponse::Context(context) = ctx_svc
.oneshot(BlockChainContextRequest::GetContext)
.await?
else {
panic!("context service returned incorrect response!")
};

View file

@ -1,6 +1,7 @@
use std::collections::VecDeque;
use proptest::{arbitrary::any, prop_assert_eq, prop_compose, proptest};
use proptest::collection::size_range;
use proptest::{prelude::*, prop_assert_eq, prop_compose, proptest};
use cuprate_helper::num::median;
@ -87,9 +88,9 @@ async fn calculate_diff_3000000_3002000() -> Result<(), tower::BoxError> {
prop_compose! {
/// Generates an arbitrary full difficulty cache.
fn arb_full_difficulty_cache()
fn arb_difficulty_cache(blocks: usize)
(
blocks in any::<[(u64, u64); TEST_TOTAL_ACCOUNTED_BLOCKS]>()
blocks in any_with::<Vec<(u64, u64)>>(size_range(blocks).lift()),
) -> DifficultyCache {
let (timestamps, mut cumulative_difficulties): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
cumulative_difficulties.sort_unstable();
@ -104,10 +105,20 @@ prop_compose! {
}
}
prop_compose! {
fn random_difficulty_cache()(
blocks in 0..TEST_TOTAL_ACCOUNTED_BLOCKS,
)(
diff_cache in arb_difficulty_cache(blocks)
) -> DifficultyCache {
diff_cache
}
}
proptest! {
#[test]
fn check_calculations_lag(
mut diff_cache in arb_full_difficulty_cache(),
mut diff_cache in arb_difficulty_cache(TEST_TOTAL_ACCOUNTED_BLOCKS),
timestamp in any::<u64>(),
cumulative_difficulty in any::<u128>(),
hf in any::<HardFork>()
@ -133,7 +144,7 @@ proptest! {
}
#[test]
fn next_difficulty_consistant(diff_cache in arb_full_difficulty_cache(), hf in any::<HardFork>()) {
fn next_difficulty_consistant(diff_cache in arb_difficulty_cache(TEST_TOTAL_ACCOUNTED_BLOCKS), hf in any::<HardFork>()) {
let first_call = diff_cache.next_difficulty(&hf);
prop_assert_eq!(first_call, diff_cache.next_difficulty(&hf));
prop_assert_eq!(first_call, diff_cache.next_difficulty(&hf));
@ -160,11 +171,41 @@ proptest! {
}
#[test]
fn window_size_kept_constant(mut diff_cache in arb_full_difficulty_cache(), new_blocks in any::<Vec<(u64, u128)>>()) {
fn window_size_kept_constant(mut diff_cache in arb_difficulty_cache(TEST_TOTAL_ACCOUNTED_BLOCKS), new_blocks in any::<Vec<(u64, u128)>>()) {
for (timestamp, cumulative_difficulty) in new_blocks.into_iter() {
diff_cache.new_block(diff_cache.last_accounted_height+1, timestamp, cumulative_difficulty);
prop_assert_eq!(diff_cache.timestamps.len(), TEST_TOTAL_ACCOUNTED_BLOCKS);
prop_assert_eq!(diff_cache.cumulative_difficulties.len(), TEST_TOTAL_ACCOUNTED_BLOCKS);
}
}
#[test]
fn claculating_multiple_diffs_does_not_change_state(
mut diff_cache in random_difficulty_cache(),
timestamps in any_with::<Vec<u64>>(size_range(0..1000).lift()),
hf in any::<HardFork>(),
) {
let cache = diff_cache.clone();
diff_cache.next_difficulties(timestamps.into_iter().zip([hf].into_iter().cycle()).collect(), &hf);
assert_eq!(diff_cache, cache);
}
#[test]
fn calculate_diff_in_advance(
mut diff_cache in random_difficulty_cache(),
timestamps in any_with::<Vec<u64>>(size_range(0..1000).lift()),
hf in any::<HardFork>(),
) {
let timestamps: Vec<_> = timestamps.into_iter().zip([hf].into_iter().cycle()).collect();
let diffs = diff_cache.next_difficulties(timestamps.clone(), &hf);
for (timestamp, diff) in timestamps.into_iter().zip(diffs.into_iter()) {
assert_eq!(diff_cache.next_difficulty(&timestamp.1), diff);
diff_cache.new_block(diff_cache.last_accounted_height +1, timestamp.0, diff + diff_cache.cumulative_difficulty());
}
}
}

View file

@ -0,0 +1,72 @@
use std::collections::VecDeque;
use proptest::prelude::*;
use tokio::runtime::Builder;
use monero_consensus::{
blocks::{is_randomx_seed_height, randomx_seed_height},
HardFork,
};
use crate::{
context::rx_vms::{get_last_rx_seed_heights, RandomXVMCache},
tests::mock_db::*,
};
#[test]
fn rx_heights_consistent() {
let mut last_rx_heights = VecDeque::new();
for height in 0..100_000_000 {
if is_randomx_seed_height(height) {
last_rx_heights.push_front(height);
if last_rx_heights.len() > 3 {
last_rx_heights.pop_back();
}
}
assert_eq!(
get_last_rx_seed_heights(height, 3).as_slice(),
last_rx_heights.make_contiguous()
);
if last_rx_heights.len() >= 3 {
assert!(
randomx_seed_height(height) == last_rx_heights[0]
|| randomx_seed_height(height) == last_rx_heights[1]
);
}
}
}
#[tokio::test]
async fn rx_vm_created_on_hf_12() {
let db = DummyDatabaseBuilder::default().finish(Some(10));
let mut cache = RandomXVMCache::init_from_chain_height(10, &HardFork::V11, db)
.await
.unwrap();
assert!(cache.vms.is_empty());
cache.new_block(11, &[30; 32], &HardFork::V12).await;
assert!(!cache.vms.is_empty());
}
proptest! {
// these tests are expensive, so limit cases.
#![proptest_config(ProptestConfig {
cases: 3, .. ProptestConfig::default()
})]
#[test]
fn rx_vm_created_only_after_hf_12(
hf in any::<HardFork>(),
) {
let db = DummyDatabaseBuilder::default().finish(Some(10));
let rt = Builder::new_multi_thread().enable_all().build().unwrap();
rt.block_on(async move {
let cache = RandomXVMCache::init_from_chain_height(10, &hf, db).await.unwrap();
assert!(cache.seeds.len() == cache.vms.len() || hf < HardFork::V12);
});
}
}

View file

@ -28,7 +28,45 @@ use crate::{
DatabaseResponse, ExtendedConsensusError,
};
mod contextual_data;
pub mod contextual_data;
mod output_cache;
pub use output_cache::OutputCache;
pub async fn batch_setup_txs(
txs: Vec<(Vec<Transaction>, HardFork)>,
) -> Result<Vec<Vec<Arc<TransactionVerificationData>>>, ExtendedConsensusError> {
let batch_verifier = Arc::new(MultiThreadedBatchVerifier::new(rayon::current_num_threads()));
// Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
let txs = rayon_spawn_async(move || {
let txs = txs
.into_par_iter()
.map(|(txs, hf)| {
txs.into_par_iter()
.map(|tx| {
Ok(Arc::new(TransactionVerificationData::new(
tx,
&hf,
batch_verifier.clone(),
)?))
})
.collect::<Result<Vec<_>, ConsensusError>>()
})
.collect::<Result<Vec<_>, ConsensusError>>()?;
if !Arc::into_inner(batch_verifier).unwrap().verify() {
Err(ConsensusError::Transaction(TransactionError::RingCTError(
RingCTError::BulletproofsRangeInvalid,
)))?
}
Ok::<_, ConsensusError>(txs)
})
.await?;
Ok(txs)
}
/// Data needed to verify a transaction.
///
@ -90,13 +128,6 @@ pub enum VerifyTxRequest {
hf: HardFork,
re_org_token: ReOrgToken,
},
/// Batches the setup of [`TransactionVerificationData`], does *some* verification, you need to call [`VerifyTxRequest::Block`]
/// with the returned data.
BatchSetup {
txs: Vec<Transaction>,
hf: HardFork,
re_org_token: ReOrgToken,
},
}
pub enum VerifyTxResponse {
@ -136,6 +167,7 @@ where
fn call(&mut self, req: VerifyTxRequest) -> Self::Future {
let database = self.database.clone();
async move {
match req {
VerifyTxRequest::Block {
txs,
@ -143,7 +175,8 @@ where
time_for_time_lock,
hf,
re_org_token,
} => verify_transactions_for_block(
} => {
verify_transactions_for_block(
database,
txs,
current_chain_height,
@ -151,51 +184,12 @@ where
hf,
re_org_token,
)
.boxed(),
VerifyTxRequest::BatchSetup {
txs,
hf,
re_org_token,
} => batch_setup_transactions(database, txs, hf, re_org_token).boxed(),
.await
}
}
}
async fn batch_setup_transactions<D>(
database: D,
txs: Vec<Transaction>,
hf: HardFork,
re_org_token: ReOrgToken,
) -> Result<VerifyTxResponse, ExtendedConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
{
let batch_verifier = Arc::new(MultiThreadedBatchVerifier::new(rayon::current_num_threads()));
let cloned_verifier = batch_verifier.clone();
// Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
let txs = rayon_spawn_async(move || {
txs.into_par_iter()
.map(|tx| {
Ok(Arc::new(TransactionVerificationData::new(
tx,
&hf,
cloned_verifier.clone(),
)?))
})
.collect::<Result<Vec<_>, ConsensusError>>()
})
.await?;
if !Arc::into_inner(batch_verifier).unwrap().verify() {
Err(ConsensusError::Transaction(TransactionError::RingCTError(
RingCTError::BulletproofsRangeInvalid,
)))?
.boxed()
}
contextual_data::batch_fill_ring_member_info(&txs, &hf, re_org_token, database).await?;
Ok(VerifyTxResponse::BatchSetupOk(txs))
}
#[instrument(name = "verify_txs", skip_all, level = "info")]
@ -212,7 +206,13 @@ where
{
tracing::debug!("Verifying transactions for block, amount: {}", txs.len());
contextual_data::batch_refresh_ring_member_info(&txs, &hf, re_org_token, database.clone())
contextual_data::batch_refresh_ring_member_info(
&txs,
&hf,
re_org_token,
database.clone(),
None,
)
.await?;
let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new()));

View file

@ -11,8 +11,11 @@
//! Because this data is unique for *every* transaction and the context service is just for blockchain state data.
//!
use std::collections::HashSet;
use std::{collections::HashMap, ops::Deref, sync::Arc};
use std::{
collections::{HashMap, HashSet},
ops::Deref,
sync::Arc,
};
use monero_serai::transaction::Input;
use tower::ServiceExt;
@ -25,25 +28,28 @@ use monero_consensus::{
};
use crate::{
context::ReOrgToken, transactions::TransactionVerificationData, Database, DatabaseRequest,
DatabaseResponse, ExtendedConsensusError,
context::ReOrgToken,
transactions::{output_cache::OutputCache, TransactionVerificationData},
Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError,
};
pub async fn batch_refresh_ring_member_info<D: Database + Clone + Send + Sync + 'static>(
txs_verification_data: &[Arc<TransactionVerificationData>],
pub async fn batch_refresh_ring_member_info<'a, D: Database + Clone + Send + Sync + 'static>(
txs_verification_data: &'a [Arc<TransactionVerificationData>],
hf: &HardFork,
re_org_token: ReOrgToken,
mut database: D,
out_cache: Option<&OutputCache<'a>>,
) -> Result<(), ExtendedConsensusError> {
let (txs_needing_full_refresh, txs_needing_partial_refresh) =
ring_member_info_needing_refresh(txs_verification_data, hf);
if !txs_needing_full_refresh.is_empty() {
batch_fill_ring_member_info(
&txs_needing_full_refresh,
txs_needing_full_refresh.iter(),
hf,
re_org_token,
database.clone(),
out_cache,
)
.await?;
}
@ -157,15 +163,16 @@ fn ring_member_info_needing_refresh(
///
/// This function batch gets all the ring members for the inputted transactions and fills in data about
/// them.
pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'static>(
txs_verification_data: &[Arc<TransactionVerificationData>],
pub async fn batch_fill_ring_member_info<'a, D: Database + Clone + Send + Sync + 'static>(
txs_verification_data: impl Iterator<Item = &Arc<TransactionVerificationData>> + Clone,
hf: &HardFork,
re_org_token: ReOrgToken,
mut database: D,
out_cache: Option<&OutputCache<'a>>,
) -> Result<(), ExtendedConsensusError> {
let mut output_ids = HashMap::new();
for tx_v_data in txs_verification_data.iter() {
for tx_v_data in txs_verification_data.clone() {
insert_ring_member_ids(&tx_v_data.tx.prefix.inputs, &mut output_ids)
.map_err(ConsensusError::Transaction)?;
}
@ -191,8 +198,18 @@ pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'st
};
for tx_v_data in txs_verification_data {
let ring_members_for_tx =
get_ring_members_for_inputs(&outputs, &tx_v_data.tx.prefix.inputs)
let ring_members_for_tx = get_ring_members_for_inputs(
|amt, idx| {
if let Some(cached_outs) = out_cache {
if let Some(out) = cached_outs.get_out(amt, idx) {
return Some(out);
}
}
outputs.get(&amt)?.get(&idx)
},
&tx_v_data.tx.prefix.inputs,
)
.map_err(ConsensusError::Transaction)?;
let decoy_info = if hf != &HardFork::V1 {
@ -207,7 +224,8 @@ pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'st
// Temporarily acquirer the mutex lock to add the ring member info.
let _ = tx_v_data.rings_member_info.lock().unwrap().insert((
TxRingMembersInfo::new(ring_members_for_tx, decoy_info, tx_v_data.version, *hf),
TxRingMembersInfo::new(ring_members_for_tx, decoy_info, tx_v_data.version, *hf)
.map_err(ConsensusError::Transaction)?,
re_org_token.clone(),
));
}

View file

@ -0,0 +1,153 @@
use std::{
collections::{BTreeMap, HashMap},
iter::once,
sync::{Arc, OnceLock},
};
use curve25519_dalek::{
constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY, EdwardsPoint, Scalar,
};
use monero_consensus::{
blocks::BlockError,
miner_tx::MinerTxError,
transactions::{OutputOnChain, TransactionError},
ConsensusError,
};
use monero_serai::{
block::Block,
transaction::{Input, Timelock},
H,
};
use tower::ServiceExt;
use crate::{
transactions::TransactionVerificationData, Database, DatabaseRequest, DatabaseResponse,
ExtendedConsensusError,
};
#[derive(Debug)]
enum CachedAmount<'a> {
Clear(u64),
Commitment(&'a EdwardsPoint),
}
impl<'a> CachedAmount<'a> {
fn get_commitment(&self) -> EdwardsPoint {
match self {
CachedAmount::Commitment(commitment) => **commitment,
// TODO: Setup a table with common amounts.
CachedAmount::Clear(amt) => ED25519_BASEPOINT_POINT + H() * Scalar::from(*amt),
}
}
}
#[derive(Debug)]
struct CachedOutput<'a> {
height: u64,
time_lock: &'a Timelock,
key: &'a CompressedEdwardsY,
amount: CachedAmount<'a>,
cached_created: OnceLock<OutputOnChain>,
}
#[derive(Debug)]
pub struct OutputCache<'a>(HashMap<u64, BTreeMap<u64, CachedOutput<'a>>>);
impl<'a> OutputCache<'a> {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
OutputCache(HashMap::new())
}
pub fn get_out(&self, amt: u64, idx: u64) -> Option<&OutputOnChain> {
let cached_out = self.0.get(&amt)?.get(&idx)?;
Some(cached_out.cached_created.get_or_init(|| OutputOnChain {
height: cached_out.height,
time_lock: *cached_out.time_lock,
key: cached_out.key.decompress(),
commitment: cached_out.amount.get_commitment(),
}))
}
pub async fn extend_from_block<'b: 'a, D: Database>(
&mut self,
blocks: impl Iterator<Item = (&'b Block, &'b [Arc<TransactionVerificationData>])> + 'b,
database: &mut D,
) -> Result<(), ExtendedConsensusError> {
let mut idx_needed = HashMap::new();
for (block, txs) in blocks {
for tx in once(&block.miner_tx).chain(txs.iter().map(|tx| &tx.tx)) {
let is_rct = tx.prefix.version == 2;
let is_miner = matches!(tx.prefix.inputs.as_slice(), &[Input::Gen(_)]);
for (i, out) in tx.prefix.outputs.iter().enumerate() {
let amt = out.amount.unwrap_or(0);
// The amt this output will be stored under.
let amt_table_key = if is_rct { 0 } else { amt };
let amount_commitment = match (is_rct, is_miner) {
(true, false) => CachedAmount::Commitment(
tx.rct_signatures.base.commitments.get(i).ok_or(
ConsensusError::Transaction(TransactionError::NonZeroOutputForV2),
)?,
),
_ => CachedAmount::Clear(amt),
};
let output_to_cache = CachedOutput {
height: block.number().ok_or(ConsensusError::Block(
BlockError::MinerTxError(MinerTxError::InputNotOfTypeGen),
))?,
time_lock: &tx.prefix.timelock,
key: &out.key,
amount: amount_commitment,
cached_created: OnceLock::new(),
};
let Some(amt_table) = self.0.get_mut(&amt_table_key) else {
idx_needed
.entry(amt_table_key)
.or_insert_with(Vec::new)
.push(output_to_cache);
continue;
};
let top_idx = *amt_table.last_key_value().unwrap().0;
amt_table.insert(top_idx + 1, output_to_cache);
}
}
}
if idx_needed.is_empty() {
return Ok(());
}
let DatabaseResponse::NumberOutputsWithAmount(numb_outs) = database
.ready()
.await?
.call(DatabaseRequest::NumberOutputsWithAmount(
idx_needed.keys().copied().collect(),
))
.await?
else {
panic!("Database sent incorrect response!");
};
for (amt_table_key, out) in idx_needed {
let numb_outs = *numb_outs
.get(&amt_table_key)
.expect("DB did not return all results!");
self.0.entry(amt_table_key).or_default().extend(
out.into_iter()
.enumerate()
.map(|(i, out)| (u64::try_from(i + numb_outs).unwrap(), out)),
)
}
Ok(())
}
}

View file

@ -13,21 +13,6 @@
// copies or substantial portions of the Software.
//
// Rust Levin Library
// Written in 2023 by
// Cuprate Contributors
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
//! This module provides a struct BucketHead for the header of a levin protocol
//! message.

View file

@ -21,8 +21,8 @@ use bytes::BufMut;
use epee_encoding::EpeeObject;
use std::{hash::Hash, net, net::SocketAddr};
mod serde_helper;
use serde_helper::*;
mod epee_builder;
use epee_builder::*;
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum NetZone {