From b7df133175ab03c96e37c7da5bd10efc50486ea2 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Tue, 13 Feb 2024 00:51:11 +0000 Subject: [PATCH] consensus: fix batch handling when we don't have a full list of outputs. (#54) * consensus: fix batch handling when we don't have a full list of outputs. * change `scan_chain` to new API * clippy * add a test for calculating multiple difficulties * fmt * rx_seed -> rx_vms * consensus-rules: ring members younger than current block. * only create rx vms when required. * fix rx initiation when syncing * add single block verification (no batch) * update serai --- Cargo.lock | 214 ++++++--- Cargo.toml | 6 +- consensus/Cargo.toml | 3 +- consensus/rules/src/blocks.rs | 4 +- consensus/rules/src/transactions.rs | 16 +- .../rules/src/transactions/contextual_data.rs | 54 ++- consensus/src/bin/scan_chain.rs | 186 ++------ consensus/src/bin/tx_pool.rs | 251 ----------- consensus/src/block.rs | 420 +++++++++++++----- consensus/src/context.rs | 172 ++++--- consensus/src/context/difficulty.rs | 66 ++- consensus/src/context/rx_seed.rs | 118 ----- consensus/src/context/rx_vms.rs | 202 +++++++++ consensus/src/lib.rs | 30 +- consensus/src/rpc/cache.rs | 20 +- consensus/src/rpc/connection.rs | 7 +- consensus/src/tests/context.rs | 8 +- consensus/src/tests/context/difficulty.rs | 53 ++- consensus/src/tests/context/rx_vms.rs | 72 +++ consensus/src/transactions.rs | 136 +++--- consensus/src/transactions/contextual_data.rs | 46 +- consensus/src/transactions/output_cache.rs | 153 +++++++ net/levin/src/header.rs | 15 - net/monero-wire/src/network_address.rs | 4 +- .../{serde_helper.rs => epee_builder.rs} | 0 25 files changed, 1292 insertions(+), 964 deletions(-) delete mode 100644 consensus/src/bin/tx_pool.rs delete mode 100644 consensus/src/context/rx_seed.rs create mode 100644 consensus/src/context/rx_vms.rs create mode 100644 consensus/src/tests/context/rx_vms.rs create mode 100644 consensus/src/transactions/output_cache.rs rename net/monero-wire/src/network_address/{serde_helper.rs => epee_builder.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index 88cff0ca..67317819 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,9 +30,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff" dependencies = [ "cfg-if", "once_cell", @@ -312,9 +312,9 @@ checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" [[package]] name = "chrono" -version = "0.4.33" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", @@ -420,9 +420,9 @@ dependencies = [ [[package]] name = "crc32fast" -version = "1.3.2" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" dependencies = [ "cfg-if", ] @@ -592,7 +592,7 @@ dependencies = [ [[package]] name = "dalek-ff-group" version = "0.4.1" -source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930" 
+source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629" dependencies = [ "crypto-bigint", "curve25519-dalek", @@ -662,7 +662,7 @@ dependencies = [ [[package]] name = "dleq" version = "0.4.1" -source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930" +source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629" dependencies = [ "digest", "ff", @@ -676,9 +676,9 @@ dependencies = [ [[package]] name = "either" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" [[package]] name = "encoding_rs" @@ -795,7 +795,7 @@ dependencies = [ [[package]] name = "flexible-transcript" version = "0.3.2" -source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930" +source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629" dependencies = [ "blake2", "digest", @@ -967,8 +967,8 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", - "indexmap 2.2.2", + "http 0.2.11", + "indexmap 2.2.3", "slab", "tokio", "tokio-util", @@ -1034,6 +1034,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1041,7 +1052,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1068,8 +1102,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.11", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1082,18 +1116,39 @@ dependencies = [ ] [[package]] -name = "hyper-rustls" -version = "0.24.2" +name = "hyper" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "itoa", + "pin-project-lite", + "tokio", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http", - "hyper", + "http 
1.0.0", + "hyper 1.1.0", + "hyper-util", "rustls", "rustls-native-certs", + "rustls-pki-types", "tokio", "tokio-rustls", + "tower-service", ] [[package]] @@ -1103,12 +1158,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.28", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -1154,9 +1229,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.2" +version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" +checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -1368,7 +1443,7 @@ dependencies = [ [[package]] name = "monero-generators" version = "0.4.0" -source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930" +source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629" dependencies = [ "curve25519-dalek", "dalek-ff-group", @@ -1410,7 +1485,7 @@ dependencies = [ [[package]] name = "monero-serai" version = "0.1.4-alpha" -source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930" +source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629" dependencies = [ "async-lock", "async-trait", @@ -1456,7 +1531,7 @@ dependencies = [ [[package]] name = "multiexp" version = "0.4.0" -source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930" +source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629" dependencies = [ "ff", "group", @@ -1964,9 +2039,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-tls", "ipnet", "js-sys", @@ -1976,7 +2051,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -2036,23 +2111,26 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ "ring", + "rustls-pki-types", "rustls-webpki", - "sct", + "subtle", + "zeroize", ] [[package]] name = "rustls-native-certs" -version = "0.6.3" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" dependencies = [ 
"openssl-probe", - "rustls-pemfile", + "rustls-pemfile 2.0.0", + "rustls-pki-types", "schannel", "security-framework", ] @@ -2067,12 +2145,29 @@ dependencies = [ ] [[package]] -name = "rustls-webpki" -version = "0.101.7" +name = "rustls-pemfile" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf" + +[[package]] +name = "rustls-webpki" +version = "0.102.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ "ring", + "rustls-pki-types", "untrusted", ] @@ -2115,16 +2210,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sealed" version = "0.5.0" @@ -2262,11 +2347,14 @@ dependencies = [ [[package]] name = "simple-request" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930" +source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629" dependencies = [ - "hyper", + "http-body-util", + "hyper 1.1.0", "hyper-rustls", + "hyper-util", "tokio", + "tower-service", ] [[package]] @@ -2303,7 +2391,7 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "std-shims" version = "0.1.1" -source = "git+https://github.com/Cuprate/serai.git?rev=f3429ec1ef386da1458f4ed244402f38e3e12930#f3429ec1ef386da1458f4ed244402f38e3e12930" +source = "git+https://github.com/Cuprate/serai.git?rev=347d4cf#347d4cf4135c92bc5b0a3e3cb66fa3ff51b1c629" dependencies = [ "hashbrown 0.14.3", "spin", @@ -2413,18 +2501,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", @@ -2526,11 +2614,12 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.24.1" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ "rustls", + "rustls-pki-types", "tokio", ] @@ -2572,7 +2661,7 @@ version = "0.21.1" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.2", + "indexmap 2.2.3", "toml_datetime", "winnow", ] @@ -2615,6 +2704,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3021,9 +3111,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.39" +version = "0.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5389a154b01683d28c77f8f68f49dea75f0a4da32557a58f68ee51ebba472d29" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 834e5ba8..ed1ced33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,13 +43,13 @@ chrono = { version = "0.4.31", default-features = false } crypto-bigint = { version = "0.5.5", default-features = false } crossbeam = { version = "0.8.4", default-features = false } curve25519-dalek = { version = "4.1.1", default-features = false } -dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "f3429ec1ef386da1458f4ed244402f38e3e12930", default-features = false } +dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false } dirs = { version = "5.0.1", default-features = false } futures = { version = "0.3.29", default-features = false } hex = { version = "0.4.3", default-features = false } hex-literal = { version = "0.4", default-features = false } -monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "f3429ec1ef386da1458f4ed244402f38e3e12930", default-features = false } -multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "f3429ec1ef386da1458f4ed244402f38e3e12930", default-features = false } +monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false } +multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false } pin-project = { version = "1.1.3", default-features = false } randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false } rand = { version = "0.8.5", default-features = false } diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 393c4dfd..a109144f 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -17,7 +17,6 @@ binaries = [ "tower/buffer", "tower/timeout", "monero-serai/http-rpc", - "dep:curve25519-dalek", "dep:tracing-subscriber", "dep:serde_json", "dep:serde", @@ -41,6 +40,7 @@ randomx-rs = { workspace = true } monero-serai = { workspace = true, features = ["std"] } multiexp = { workspace = true } dalek-ff-group = { workspace = true } +curve25519-dalek = { workspace = true } rayon = { workspace = true } thread_local = { workspace = true } @@ -58,7 +58,6 @@ tracing-subscriber = {version = "0.3", optional = true} borsh = { workspace = true, optional = true} dirs = {version="5.0", optional = true} clap = { version = "4.4.8", optional = true, features = ["derive"] } -curve25519-dalek = { workspace = true, optional = true } # here to help cargo to pick a version - remove me syn = "2.0.37" diff --git a/consensus/rules/src/blocks.rs b/consensus/rules/src/blocks.rs index 19e7d922..be275d2b 100644 --- 
a/consensus/rules/src/blocks.rs +++ b/consensus/rules/src/blocks.rs @@ -19,8 +19,8 @@ pub const PENALTY_FREE_ZONE_1: usize = 20000; pub const PENALTY_FREE_ZONE_2: usize = 60000; pub const PENALTY_FREE_ZONE_5: usize = 300000; -const RX_SEEDHASH_EPOCH_BLOCKS: u64 = 2048; -const RX_SEEDHASH_EPOCH_LAG: u64 = 64; +pub const RX_SEEDHASH_EPOCH_BLOCKS: u64 = 2048; +pub const RX_SEEDHASH_EPOCH_LAG: u64 = 64; #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] pub enum BlockError { diff --git a/consensus/rules/src/transactions.rs b/consensus/rules/src/transactions.rs index 9a7f31d8..281553dc 100644 --- a/consensus/rules/src/transactions.rs +++ b/consensus/rules/src/transactions.rs @@ -62,8 +62,8 @@ pub enum TransactionError { InputsOverflow, #[error("The transaction has no inputs.")] NoInputs, - #[error("Ring member not in database")] - RingMemberNotFound, + #[error("Ring member not in database or is not valid.")] + RingMemberNotFoundOrInvalid, //-------------------------------------------------------- Ring Signatures #[error("Ring signature incorrect.")] RingSignatureIncorrect, @@ -302,6 +302,7 @@ fn check_all_time_locks( current_time_lock_timestamp, hf, ) { + tracing::debug!("Transaction invalid: one or more inputs locked, lock: {time_lock:?}."); Err(TransactionError::OneOrMoreDecoysLocked) } else { Ok(()) @@ -451,6 +452,9 @@ fn check_10_block_lock( ) -> Result<(), TransactionError> { if hf >= &HardFork::V12 { if youngest_used_out_height + 10 > current_chain_height { + tracing::debug!( + "Transaction invalid: One or more ring members younger than 10 blocks." + ); Err(TransactionError::OneOrMoreDecoysLocked) } else { Ok(()) @@ -517,6 +521,14 @@ fn check_inputs_contextual( hf: &HardFork, spent_kis: Arc>>, ) -> Result<(), TransactionError> { + // This rule is not contained in monero-core explicitly, but it is enforced by how Monero picks ring members. + // When picking ring members monerod will only look in the DB at past blocks so an output has to be younger + // than this transaction to be used in this tx. + if tx_ring_members_info.youngest_used_out_height >= current_chain_height { + tracing::debug!("Transaction invalid: One or more ring members too young."); + Err(TransactionError::OneOrMoreDecoysLocked)?; + } + check_10_block_lock( tx_ring_members_info.youngest_used_out_height, current_chain_height, diff --git a/consensus/rules/src/transactions/contextual_data.rs b/consensus/rules/src/transactions/contextual_data.rs index 6b269b9d..dacd1396 100644 --- a/consensus/rules/src/transactions/contextual_data.rs +++ b/consensus/rules/src/transactions/contextual_data.rs @@ -13,8 +13,8 @@ use crate::{transactions::TransactionError, HardFork, TxVersion}; pub struct OutputOnChain { pub height: u64, pub time_lock: Timelock, - pub key: EdwardsPoint, - pub mask: EdwardsPoint, + pub key: Option, + pub commitment: EdwardsPoint, } /// Gets the absolute offsets from the relative offsets. @@ -68,7 +68,7 @@ pub fn insert_ring_member_ids( /// /// Will error if `outputs` does not contain the outputs needed. pub fn get_ring_members_for_inputs<'a>( - outputs: &'a HashMap>, + get_outputs: impl Fn(u64, u64) -> Option<&'a OutputOnChain>, inputs: &[Input], ) -> Result>, TransactionError> { inputs @@ -83,12 +83,8 @@ pub fn get_ring_members_for_inputs<'a>( Ok(offsets .iter() .map(|offset| { - // get the hashmap for this amount. - outputs - .get(&amount.unwrap_or(0)) - // get output at the index from the amount hashmap. 
- .and_then(|amount_map| amount_map.get(offset)) - .ok_or(TransactionError::RingMemberNotFound) + get_outputs(amount.unwrap_or(0), *offset) + .ok_or(TransactionError::RingMemberNotFoundOrInvalid) }) .collect::>()?) } @@ -108,13 +104,21 @@ pub enum Rings { impl Rings { /// Builds the rings for the transaction inputs, from the given outputs. - fn new(outputs: Vec>, tx_version: TxVersion) -> Rings { - match tx_version { + fn new( + outputs: Vec>, + tx_version: TxVersion, + ) -> Result { + Ok(match tx_version { TxVersion::RingSignatures => Rings::Legacy( outputs .into_iter() - .map(|inp_outs| inp_outs.into_iter().map(|out| out.key).collect()) - .collect(), + .map(|inp_outs| { + inp_outs + .into_iter() + .map(|out| out.key.ok_or(TransactionError::RingMemberNotFoundOrInvalid)) + .collect::, TransactionError>>() + }) + .collect::, TransactionError>>()?, ), TxVersion::RingCT => Rings::RingCT( outputs @@ -122,12 +126,18 @@ impl Rings { .map(|inp_outs| { inp_outs .into_iter() - .map(|out| [out.key, out.mask]) - .collect() + .map(|out| { + Ok([ + out.key + .ok_or(TransactionError::RingMemberNotFoundOrInvalid)?, + out.commitment, + ]) + }) + .collect::>() }) - .collect(), + .collect::>()?, ), - } + }) } } @@ -151,8 +161,8 @@ impl TxRingMembersInfo { decoy_info: Option, tx_version: TxVersion, hf: HardFork, - ) -> TxRingMembersInfo { - TxRingMembersInfo { + ) -> Result { + Ok(TxRingMembersInfo { youngest_used_out_height: used_outs .iter() .map(|inp_outs| { @@ -178,9 +188,9 @@ impl TxRingMembersInfo { }) .collect(), hf, - rings: Rings::new(used_outs, tx_version), + rings: Rings::new(used_outs, tx_version)?, decoy_info, - } + }) } } @@ -213,7 +223,7 @@ impl DecoyInfo { /// /// So: /// - /// amount_outs_on_chain(inputs[X]) == outputs_with_amount[X] + /// amount_outs_on_chain(inputs`[X]`) == outputs_with_amount`[X]` /// /// Do not rely on this function to do consensus checks! 
/// diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 7e747060..e0a4c896 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,43 +1,27 @@ -mod tx_pool; - #[cfg(feature = "binaries")] mod bin { - use std::{ - collections::{HashMap, HashSet}, - ops::Range, - path::PathBuf, - sync::Arc, - }; + use std::{ops::Range, path::PathBuf, sync::Arc}; use clap::Parser; - use futures::{ - channel::{mpsc, oneshot}, - SinkExt, StreamExt, - }; + use futures::{channel::mpsc, SinkExt, StreamExt}; use monero_serai::{block::Block, transaction::Transaction}; use tokio::sync::RwLock; use tower::{Service, ServiceExt}; use tracing::level_filters::LevelFilter; - use cuprate_helper::{asynch::rayon_spawn_async, network::Network}; + use cuprate_helper::network::Network; use cuprate_consensus::{ - block::PrePreparedBlockExPOW, context::{ BlockChainContextRequest, BlockChainContextResponse, ContextConfig, UpdateBlockchainCacheData, }, initialize_blockchain_context, initialize_verifier, - randomx::RandomXVM, rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}, - Database, DatabaseRequest, DatabaseResponse, PrePreparedBlock, VerifiedBlockInformation, - VerifyBlockRequest, VerifyBlockResponse, + Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, VerifyBlockRequest, + VerifyBlockResponse, }; - use monero_consensus::{blocks::randomx_seed_height, HardFork}; - - use super::tx_pool; - const MAX_BLOCKS_IN_RANGE: u64 = 500; const BATCHES_IN_REQUEST: u64 = 3; const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000; @@ -92,8 +76,7 @@ mod bin { } async fn call_blocks( - mut new_tx_chan: tx_pool::NewTxChanSen, - mut block_chan: mpsc::Sender>, + mut block_chan: mpsc::Sender)>>, start_height: u64, chain_height: u64, database: D, @@ -133,37 +116,6 @@ mod bin { chain_height ); - let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); - - let hf = |block: &Block| HardFork::from_version(block.header.major_version).unwrap(); - - let txs_hf = if blocks.first().map(hf) == blocks.last().map(hf) { - vec![( - txs.into_iter().flatten().collect::>(), - blocks.first().map(hf).unwrap(), - )] - } else { - let mut txs_hfs: Vec<(Vec, HardFork)> = Vec::new(); - let mut last_hf = blocks.first().map(hf).unwrap(); - - txs_hfs.push((vec![], last_hf)); - - for (mut txs, current_hf) in txs.into_iter().zip(blocks.iter().map(hf)) { - if current_hf == last_hf { - assert_eq!(txs_hfs.last_mut().unwrap().1, current_hf); - txs_hfs.last_mut().unwrap().0.append(&mut txs); - } else { - txs_hfs.push((txs, current_hf)); - last_hf = current_hf; - } - } - txs_hfs - }; - - let (tx, rx) = oneshot::channel(); - new_tx_chan.send((txs_hf, tx)).await?; - rx.await.unwrap().unwrap(); - block_chan.send(blocks).await?; } @@ -196,123 +148,37 @@ mod bin { let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?; - let (tx, rx) = oneshot::channel(); - - let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?; - - let (mut block_verifier, transaction_verifier) = - initialize_verifier(database.clone(), tx_pool_svc, ctx_svc.clone()).await?; - - tx.send(transaction_verifier).map_err(|_| "").unwrap(); + let (mut block_verifier, _) = + initialize_verifier(database.clone(), ctx_svc.clone()).await?; let start_height = cache.read().await.height; let (block_tx, mut incoming_blocks) = mpsc::channel(3); - let (mut prepped_blocks_tx, mut prepped_blocks_rx) = mpsc::channel(3); + tokio::spawn( + async move { call_blocks(block_tx, start_height, 
chain_height, database).await }, + ); - tokio::spawn(async move { - call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await - }); + while let Some(incoming_blocks) = incoming_blocks.next().await { + let VerifyBlockResponse::MainChainBatchPrep(blocks, txs) = block_verifier + .ready() + .await? + .call(VerifyBlockRequest::MainChainBatchPrep(incoming_blocks)) + .await? + else { + panic!() + }; - let BlockChainContextResponse::Context(ctx) = ctx_svc - .ready() - .await? - .call(BlockChainContextRequest::Get) - .await? - else { - panic!("ctx svc sent wrong response!"); - }; - let mut rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone(); - let mut rx_seed_cache_initiated = false; - - let mut randomx_vms: Option> = Some(HashMap::new()); - - let mut cloned_ctx_svc = ctx_svc.clone(); - tokio::spawn(async move { - while let Some(blocks) = incoming_blocks.next().await { - if blocks.last().unwrap().header.major_version >= 12 { - if !rx_seed_cache_initiated { - let BlockChainContextResponse::Context(ctx) = cloned_ctx_svc - .ready() - .await - .unwrap() - .call(BlockChainContextRequest::Get) - .await - .unwrap() - else { - panic!("ctx svc sent wrong response!"); - }; - rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone(); - rx_seed_cache_initiated = true; - } - - let unwrapped_rx_vms = randomx_vms.as_mut().unwrap(); - - let blocks = rayon_spawn_async(move || { - blocks - .into_iter() - .map(move |block| PrePreparedBlockExPOW::new(block).unwrap()) - .collect::>() - }) - .await; - - let seeds_needed = blocks - .iter() - .map(|block| { - rx_seed_cache.new_block(block.block.number() as u64, &block.block_hash); - randomx_seed_height(block.block.number() as u64) - }) - .collect::>(); - - unwrapped_rx_vms.retain(|seed_height, _| seeds_needed.contains(seed_height)); - - for seed_height in seeds_needed { - unwrapped_rx_vms.entry(seed_height).or_insert_with(|| { - RandomXVM::new(rx_seed_cache.get_seeds_hash(seed_height)).unwrap() - }); - } - - let arc_rx_vms = Arc::new(randomx_vms.take().unwrap()); - let cloned_arc_rx_vms = arc_rx_vms.clone(); - let blocks = rayon_spawn_async(move || { - blocks - .into_iter() - .map(move |block| { - let rx_vm = arc_rx_vms - .get(&randomx_seed_height(block.block.number() as u64)) - .unwrap(); - PrePreparedBlock::new_rx(block, rx_vm).unwrap() - }) - .collect::>() - }) - .await; - - randomx_vms = Some(Arc::into_inner(cloned_arc_rx_vms).unwrap()); - - prepped_blocks_tx.send(blocks).await.unwrap(); - } else { - let blocks = rayon_spawn_async(move || { - blocks - .into_iter() - .map(move |block| PrePreparedBlock::new(block).unwrap()) - .collect::>() - }) - .await; - - prepped_blocks_tx.send(blocks).await.unwrap(); - } - } - }); - - while let Some(incoming_blocks) = prepped_blocks_rx.next().await { let mut height; - for block in incoming_blocks { + for (block, txs) in blocks.into_iter().zip(txs) { let VerifyBlockResponse::MainChain(verified_block_info) = block_verifier .ready() .await? - .call(VerifyBlockRequest::MainChainPrepared(block)) - .await?; + .call(VerifyBlockRequest::MainChainPrepared(block, txs)) + .await? 
+ else { + panic!() + }; height = verified_block_info.height; diff --git a/consensus/src/bin/tx_pool.rs b/consensus/src/bin/tx_pool.rs deleted file mode 100644 index dd596232..00000000 --- a/consensus/src/bin/tx_pool.rs +++ /dev/null @@ -1,251 +0,0 @@ -#[cfg(feature = "binaries")] -mod bin { - use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - task::{Context, Poll}, - }; - - use futures::{ - channel::{mpsc, oneshot}, - StreamExt, - }; - use monero_serai::transaction::Transaction; - use tower::{Service, ServiceExt}; - - use cuprate_helper::asynch::InfallibleOneshotReceiver; - - use cuprate_consensus::{ - context::{ - BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, - RawBlockChainContext, - }, - transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, - ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse, - }; - use monero_consensus::HardFork; - - #[derive(Clone)] - pub struct TxPoolHandle { - tx_pool_task: std::sync::Arc>, - tx_pool_chan: mpsc::Sender<( - TxPoolRequest, - oneshot::Sender>, - )>, - } - - impl tower::Service for TxPoolHandle { - type Response = TxPoolResponse; - type Error = TxNotInPool; - type Future = InfallibleOneshotReceiver>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.tx_pool_task.is_finished() { - panic!("Tx pool task finished before it was supposed to!"); - }; - - self.tx_pool_chan - .poll_ready(cx) - .map_err(|_| panic!("Tx pool channel closed before it was supposed to")) - } - - fn call(&mut self, req: TxPoolRequest) -> Self::Future { - let (tx, rx) = oneshot::channel(); - self.tx_pool_chan - .try_send((req, tx)) - .expect("You need to use `poll_ready` to check capacity!"); - - rx.into() - } - } - - pub type NewTxChanRec = mpsc::Receiver<( - Vec<(Vec, HardFork)>, - oneshot::Sender>, - )>; - - pub type NewTxChanSen = mpsc::Sender<( - Vec<(Vec, HardFork)>, - oneshot::Sender>, - )>; - - pub struct TxPool { - txs: Arc>>>, - current_ctx: BlockChainContext, - tx_verifier: Option, - tx_verifier_chan: Option>, - ctx_svc: Ctx, - } - - impl TxPool - where - TxV: Service - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, - Ctx: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - Ctx::Future: Send + 'static, - { - pub async fn spawn( - tx_verifier_chan: oneshot::Receiver, - mut ctx_svc: Ctx, - ) -> Result<(TxPoolHandle, NewTxChanSen), tower::BoxError> { - let BlockChainContextResponse::Context(current_ctx) = ctx_svc - .ready() - .await? - .call(BlockChainContextRequest::Get) - .await? - else { - panic!("Context service service returned wrong response!") - }; - - let tx_pool = TxPool { - txs: Default::default(), - current_ctx, - tx_verifier: None, - tx_verifier_chan: Some(tx_verifier_chan), - ctx_svc, - }; - - let (tx_pool_tx, tx_pool_rx) = mpsc::channel(3); - let (new_tx_tx, new_tx_rx) = mpsc::channel(3); - - let tx_pool_task = tokio::spawn(tx_pool.run(tx_pool_rx, new_tx_rx)); - - Ok(( - TxPoolHandle { - tx_pool_task: tx_pool_task.into(), - tx_pool_chan: tx_pool_tx, - }, - new_tx_tx, - )) - } - - async fn get_or_update_ctx(&mut self) -> Result { - if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() { - Ok(current_ctx) - } else { - let BlockChainContextResponse::Context(current_ctx) = self - .ctx_svc - .ready() - .await? - .call(BlockChainContextRequest::Get) - .await? 
- else { - panic!("Context service service returned wrong response!") - }; - - self.current_ctx = current_ctx; - - Ok(self.current_ctx.unchecked_blockchain_context().clone()) - } - } - - async fn handle_txs_req( - &mut self, - req: TxPoolRequest, - tx: oneshot::Sender>, - ) { - let TxPoolRequest::Transactions(txs_to_get) = req; - - let mut res = Vec::with_capacity(txs_to_get.len()); - - for tx_hash in txs_to_get { - let Some(tx) = self.txs.lock().unwrap().remove(&tx_hash) else { - tracing::debug!("tx not in pool: {}", hex::encode(tx_hash)); - let _ = tx.send(Err(TxNotInPool)); - return; - }; - res.push(tx) - } - - let _ = tx.send(Ok(TxPoolResponse::Transactions(res))); - } - - async fn handle_new_txs( - &mut self, - new_txs: Vec<(Vec, HardFork)>, - res_chan: oneshot::Sender>, - ) -> Result<(), tower::BoxError> { - if self.tx_verifier.is_none() { - self.tx_verifier = Some(self.tx_verifier_chan.take().unwrap().await?); - } - - let current_ctx = self.get_or_update_ctx().await?; - - let mut tx_verifier = self.tx_verifier.clone().unwrap(); - let tx_pool = self.txs.clone(); - - tokio::spawn(async move { - for (txs, hf) in new_txs { - // We only batch the setup a real tx pool would also call `VerifyTxRequest::Block` - let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier - .ready() - .await - .unwrap() - .call(VerifyTxRequest::BatchSetup { - txs, - hf, - re_org_token: current_ctx.re_org_token.clone(), - }) - .await - .unwrap() - else { - panic!("Tx verifier sent incorrect response!"); - }; - - let mut locked_pool = tx_pool.lock().unwrap(); - - for tx in txs { - let tx_hash = tx.tx_hash; - if locked_pool.insert(tx_hash, tx).is_some() { - panic!("added same tx to pool twice: {}", hex::encode(tx_hash)) - } - } - } - res_chan.send(Ok(())).unwrap(); - }); - Ok(()) - } - - pub async fn run( - mut self, - mut tx_pool_handle: mpsc::Receiver<( - TxPoolRequest, - oneshot::Sender>, - )>, - mut new_tx_channel: NewTxChanRec, - ) { - loop { - tokio::select! 
{ - biased; - new_txs = new_tx_channel.next() => { - let Some(new_txs) = new_txs else { - todo!("Shutdown txpool") - }; - - self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap() - } - pool_req = tx_pool_handle.next() => { - let Some((req, tx)) = pool_req else { - todo!("Shutdown txpool") - }; - self.handle_txs_req(req, tx).await; - } - } - } - } - } -} - -#[cfg(feature = "binaries")] -pub use bin::*; - -#[allow(dead_code)] -fn main() {} diff --git a/consensus/src/block.rs b/consensus/src/block.rs index 2fe770ca..57ed8691 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -1,25 +1,39 @@ use std::{ + collections::HashSet, future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, }; +use cuprate_helper::asynch::rayon_spawn_async; use futures::FutureExt; -use monero_serai::block::Block; -use monero_serai::transaction::Input; +use monero_serai::{ + block::Block, + transaction::{Input, Transaction}, +}; +use rayon::prelude::*; use tower::{Service, ServiceExt}; use monero_consensus::{ - blocks::{calculate_pow_hash, check_block, check_block_pow, BlockError, RandomX}, + blocks::{ + calculate_pow_hash, check_block, check_block_pow, is_randomx_seed_height, + randomx_seed_height, BlockError, RandomX, + }, miner_tx::MinerTxError, ConsensusError, HardFork, }; use crate::{ - context::{BlockChainContextRequest, BlockChainContextResponse}, - transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, - ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse, + context::{ + rx_vms::RandomXVM, BlockChainContextRequest, BlockChainContextResponse, + RawBlockChainContext, + }, + transactions::{ + batch_setup_txs, contextual_data, OutputCache, TransactionVerificationData, + VerifyTxRequest, VerifyTxResponse, + }, + Database, ExtendedConsensusError, }; #[derive(Debug)] @@ -31,6 +45,7 @@ pub struct PrePreparedBlockExPOW { pub hf_version: HardFork, pub block_hash: [u8; 32], + pub height: u64, pub miner_tx_weight: usize, } @@ -40,12 +55,19 @@ impl PrePreparedBlockExPOW { let (hf_version, hf_vote) = HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?; + let Some(Input::Gen(height)) = block.miner_tx.prefix.inputs.first() else { + Err(ConsensusError::Block(BlockError::MinerTxError( + MinerTxError::InputNotOfTypeGen, + )))? 
+ }; + Ok(PrePreparedBlockExPOW { block_blob: block.serialize(), hf_vote, hf_version, block_hash: block.hash(), + height: *height, miner_tx_weight: block.miner_tx.weight(), block, @@ -107,7 +129,7 @@ impl PrePreparedBlock { pub fn new_rx( block: PrePreparedBlockExPOW, - randomx_vm: &R, + randomx_vm: Option<&R>, ) -> Result { let Some(Input::Gen(height)) = block.block.miner_tx.prefix.inputs.first() else { Err(ConsensusError::Block(BlockError::MinerTxError( @@ -122,7 +144,7 @@ impl PrePreparedBlock { block_hash: block.block_hash, pow_hash: calculate_pow_hash( - Some(randomx_vm), + randomx_vm, &block.block.serialize_hashable(), *height, &block.hf_version, @@ -149,24 +171,33 @@ pub struct VerifiedBlockInformation { } pub enum VerifyBlockRequest { - MainChain(Block), - MainChainPrepared(PrePreparedBlock), + MainChainBatchPrep(Vec<(Block, Vec)>), + MainChain { + block: Block, + prepared_txs: Vec>, + txs: Vec, + }, + MainChainPrepared(PrePreparedBlock, Vec>), } pub enum VerifyBlockResponse { MainChain(VerifiedBlockInformation), + MainChainBatchPrep( + Vec, + Vec>>, + ), } // TODO: it is probably a bad idea for this to derive clone, if 2 places (RPC, P2P) receive valid but different blocks // then they will both get approved but only one should go to main chain. #[derive(Clone)] -pub struct BlockVerifierService { +pub struct BlockVerifierService { context_svc: C, tx_verifier_svc: TxV, - tx_pool: TxP, + database: D, } -impl BlockVerifierService +impl BlockVerifierService where C: Service + Clone @@ -176,25 +207,23 @@ where + Clone + Send + 'static, - TxP: Service - + Clone - + Send - + 'static, + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, { pub fn new( context_svc: C, tx_verifier_svc: TxV, - tx_pool: TxP, - ) -> BlockVerifierService { + database: D, + ) -> BlockVerifierService { BlockVerifierService { context_svc, tx_verifier_svc, - tx_pool, + database, } } } -impl Service for BlockVerifierService +impl Service for BlockVerifierService where C: Service< BlockChainContextRequest, @@ -211,11 +240,8 @@ where + 'static, TxV::Future: Send + 'static, - TxP: Service - + Clone - + Send - + 'static, - TxP::Future: Send + 'static, + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, { type Response = VerifyBlockResponse; type Error = ExtendedConsensusError; @@ -229,33 +255,219 @@ where fn call(&mut self, req: VerifyBlockRequest) -> Self::Future { let context_svc = self.context_svc.clone(); let tx_verifier_svc = self.tx_verifier_svc.clone(); - let tx_pool = self.tx_pool.clone(); + let database = self.database.clone(); async move { match req { - VerifyBlockRequest::MainChain(block) => { - verify_main_chain_block(block, context_svc, tx_verifier_svc, tx_pool).await + VerifyBlockRequest::MainChain { + block, + prepared_txs, + txs, + } => { + verify_main_chain_block(block, txs, prepared_txs, context_svc, tx_verifier_svc) + .await } - VerifyBlockRequest::MainChainPrepared(prepped_block) => { + VerifyBlockRequest::MainChainPrepared(prepped_block, txs) => { verify_main_chain_block_prepared( prepped_block, + txs, context_svc, tx_verifier_svc, - tx_pool, + None, ) .await } + VerifyBlockRequest::MainChainBatchPrep(blocks) => { + batch_verify_main_chain_block(blocks, context_svc, database).await + } } } .boxed() } } -async fn verify_main_chain_block_prepared( +async fn batch_verify_main_chain_block( + blocks: Vec<(Block, Vec)>, + mut context_svc: C, + mut database: D, +) -> Result +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, 
+ Error = tower::BoxError, + > + Send + + 'static, + C::Future: Send + 'static, + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, +{ + let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); + + tracing::debug!("Calculating block hashes."); + let blocks: Vec = rayon_spawn_async(|| { + blocks + .into_iter() + .map(PrePreparedBlockExPOW::new) + .collect::, _>>() + }) + .await?; + + let mut timestamps_hfs = Vec::with_capacity(blocks.len()); + let mut new_rx_vm = None; + + for window in blocks.windows(2) { + if window[0].block_hash != window[1].block.header.previous + || window[0].height != window[1].height - 1 + { + Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?; + } + + if is_randomx_seed_height(window[0].height) { + new_rx_vm = Some((window[0].height, window[0].block_hash)); + } + + timestamps_hfs.push((window[0].block.header.timestamp, window[0].hf_version)) + } + + tracing::debug!("getting blockchain context"); + let BlockChainContextResponse::Context(checked_context) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::BatchGetDifficulties( + timestamps_hfs, + )) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + let context = checked_context.unchecked_blockchain_context().clone(); + + if context.chain_height != blocks[0].height { + Err(ConsensusError::Block(BlockError::MinerTxError( + MinerTxError::InputsHeightIncorrect, + )))?; + } + + if context.top_hash != blocks[0].block.header.previous { + Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?; + } + + let mut rx_vms = context.rx_vms; + + if let Some((new_vm_height, new_vm_seed)) = new_rx_vm { + let new_vm = rayon_spawn_async(move || { + Arc::new(RandomXVM::new(&new_vm_seed).expect("RandomX VM gave an error on set up!")) + }) + .await; + + context_svc + .ready() + .await? 
+ .call(BlockChainContextRequest::NewRXVM(( + new_vm_seed, + new_vm.clone(), + ))) + .await + .map_err(Into::::into)?; + + rx_vms.insert(new_vm_height, new_vm); + } + + let blocks = rayon_spawn_async(move || { + blocks + .into_par_iter() + .zip(difficulties) + .map(|(block, difficultly)| { + let height = block.height; + let block = PrePreparedBlock::new_rx( + block, + rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref), + )?; + + check_block_pow(&block.pow_hash, difficultly)?; + Ok(block) + }) + .collect::, ConsensusError>>() + }) + .await?; + + let txs = batch_setup_txs( + txs.into_iter() + .zip(blocks.iter().map(|block| block.hf_version)) + .collect(), + ) + .await?; + + let mut complete_block_idx = 0; + + let mut out_cache = OutputCache::new(); + + out_cache + .extend_from_block( + blocks + .iter() + .map(|block| &block.block) + .zip(txs.iter().map(Vec::as_slice)), + &mut database, + ) + .await?; + + for (idx, hf) in blocks + .windows(2) + .enumerate() + .filter(|(_, block)| block[0].hf_version != blocks[1].hf_version) + .map(|(i, block)| (i, &block[0].hf_version)) + { + contextual_data::batch_fill_ring_member_info( + txs.iter() + .take(idx + 1) + .skip(complete_block_idx) + .flat_map(|txs| txs.iter()), + hf, + context.re_org_token.clone(), + database.clone(), + Some(&out_cache), + ) + .await?; + + complete_block_idx = idx + 1; + } + + if complete_block_idx != blocks.len() { + contextual_data::batch_fill_ring_member_info( + txs.iter() + .skip(complete_block_idx) + .flat_map(|txs| txs.iter()), + &blocks.last().unwrap().hf_version, + context.re_org_token.clone(), + database.clone(), + Some(&out_cache), + ) + .await?; + } + + Ok(VerifyBlockResponse::MainChainBatchPrep(blocks, txs)) +} + +async fn verify_main_chain_block_prepared( prepped_block: PrePreparedBlock, + txs: Vec>, context_svc: C, tx_verifier_svc: TxV, - tx_pool: TxP, + context: Option, ) -> Result where C: Service< @@ -266,27 +478,41 @@ where + 'static, C::Future: Send + 'static, TxV: Service, - TxP: Service - + Clone - + Send - + 'static, { - tracing::debug!("getting blockchain context"); - let BlockChainContextResponse::Context(checked_context) = context_svc - .oneshot(BlockChainContextRequest::Get) - .await - .map_err(Into::::into)? - else { - panic!("Context service returned wrong response!"); + let context = match context { + Some(context) => context, + None => { + tracing::debug!("getting blockchain context"); + let BlockChainContextResponse::Context(checked_context) = context_svc + .oneshot(BlockChainContextRequest::GetContext) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + let context = checked_context.unchecked_blockchain_context().clone(); + + tracing::debug!("got blockchain context: {:?}", context); + context + } }; - let context = checked_context.unchecked_blockchain_context().clone(); + check_block_pow(&prepped_block.pow_hash, context.next_difficulty) + .map_err(ConsensusError::Block)?; - tracing::debug!("got blockchain context: {:?}", context); - - let TxPoolResponse::Transactions(txs) = tx_pool - .oneshot(TxPoolRequest::Transactions(prepped_block.block.txs.clone())) - .await?; + // Check that the txs included are what we need and that there are not any extra. + // Collecting into a HashSet could hide duplicates but we check Key Images are unique so someone would have to find + // a hash collision to include duplicate txs here. 
+ let mut tx_hashes = txs.iter().map(|tx| &tx.tx_hash).collect::>(); + for tx_hash in &prepped_block.block.txs { + if !tx_hashes.remove(tx_hash) { + return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); + } + } + if !tx_hashes.is_empty() { + return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); + } tx_verifier_svc .oneshot(VerifyTxRequest::Block { @@ -311,9 +537,6 @@ where ) .map_err(ConsensusError::Block)?; - check_block_pow(&prepped_block.pow_hash, context.next_difficulty) - .map_err(ConsensusError::Block)?; - Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation { block_hash: prepped_block.block_hash, block: prepped_block.block, @@ -328,11 +551,12 @@ where })) } -async fn verify_main_chain_block( - _block: Block, - _context_svc: C, - _tx_verifier_svc: TxV, - _tx_pool: TxP, +async fn verify_main_chain_block( + block: Block, + txs: Vec, + mut prepared_txs: Vec>, + mut context_svc: C, + tx_verifier_svc: TxV, ) -> Result where C: Service< @@ -343,17 +567,12 @@ where + 'static, C::Future: Send + 'static, TxV: Service, - TxP: Service - + Clone - + Send - + 'static, { - todo!("Single main chain block."); - - /* tracing::debug!("getting blockchain context"); let BlockChainContextResponse::Context(checked_context) = context_svc - .oneshot(BlockChainContextRequest::Get) + .ready() + .await? + .call(BlockChainContextRequest::GetContext) .await .map_err(Into::::into)? else { @@ -361,61 +580,28 @@ where }; let context = checked_context.unchecked_blockchain_context().clone(); - tracing::debug!("got blockchain context: {:?}", context); - let TxPoolResponse::Transactions(txs) = tx_pool - .oneshot(TxPoolRequest::Transactions(block.txs.clone())) - .await?; + let rx_vms = context.rx_vms.clone(); + let prepped_block = rayon_spawn_async(move || { + let prepped_block_ex_pow = PrePreparedBlockExPOW::new(block)?; + let height = prepped_block_ex_pow.height; - tx_verifier_svc - .oneshot(VerifyTxRequest::Block { - txs: txs.clone(), - current_chain_height: context.chain_height, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hf, - re_org_token: context.re_org_token.clone(), - }) - .await?; + PrePreparedBlock::new_rx(prepped_block_ex_pow, rx_vms.get(&height).map(AsRef::as_ref)) + }) + .await?; - let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::(); - let total_fees = txs.iter().map(|tx| tx.fee).sum::(); + check_block_pow(&prepped_block.pow_hash, context.cumulative_difficulty) + .map_err(ConsensusError::Block)?; - let (hf_vote, generated_coins) = check_block( - &block, - total_fees, - block_weight, - block.serialize().len(), - &context.context_to_verify_block, + prepared_txs.append(&mut batch_setup_txs(vec![(txs, context.current_hf)]).await?[0]); + + verify_main_chain_block_prepared( + prepped_block, + prepared_txs, + context_svc, + tx_verifier_svc, + Some(context), ) - .map_err(ConsensusError::Block)?; - - let hashing_blob = block.serialize_hashable(); - - // do POW test last - let chain_height = context.chain_height; - let current_hf = context.current_hf; - let pow_hash = todo!(); - /* - rayon_spawn_async(move || calculate_pow_hash(, &hashing_blob, chain_height, ¤t_hf)) - .await - .map_err(ConsensusError::Block)?; - - */ - - check_block_pow(&pow_hash, context.next_difficulty).map_err(ConsensusError::Block)?; - - Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation { - block_hash: block.hash(), - block, - txs, - pow_hash, - generated_coins, - weight: block_weight, - height: context.chain_height, - 
long_term_weight: context.next_block_long_term_weight(block_weight), - hf_vote, - cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, - })) - */ + .await } diff --git a/consensus/src/context.rs b/consensus/src/context.rs index 947cdfdb..68750ad9 100644 --- a/consensus/src/context.rs +++ b/consensus/src/context.rs @@ -7,14 +7,15 @@ use std::{ cmp::min, + collections::HashMap, future::Future, ops::DerefMut, + pin::Pin, sync::Arc, task::{Context, Poll}, }; use futures::{ - future::{ready, Ready}, lock::{Mutex, OwnedMutexGuard, OwnedMutexLockFuture}, FutureExt, }; @@ -26,13 +27,14 @@ use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError} pub(crate) mod difficulty; pub(crate) mod hardforks; -pub(crate) mod rx_seed; +pub(crate) mod rx_vms; pub(crate) mod weight; mod tokens; pub use difficulty::DifficultyCacheConfig; pub use hardforks::HardForkConfig; +use rx_vms::RandomXVM; pub use tokens::*; pub use weight::BlockWeightsCacheConfig; @@ -117,6 +119,11 @@ where panic!("Database sent incorrect response!"); }; + let db = database.clone(); + let hardfork_state_handle = tokio::spawn(async move { + hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await + }); + let db = database.clone(); let difficulty_cache_handle = tokio::spawn(async move { difficulty::DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db).await @@ -127,14 +134,12 @@ where weight::BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db).await }); - let db = database.clone(); - let hardfork_state_handle = tokio::spawn(async move { - hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await - }); + let hardfork_state = hardfork_state_handle.await.unwrap()?; + let current_hf = hardfork_state.current_hardfork(); let db = database.clone(); let rx_seed_handle = tokio::spawn(async move { - rx_seed::RandomXSeed::init_from_chain_height(chain_height, db).await + rx_vms::RandomXVMCache::init_from_chain_height(chain_height, ¤t_hf, db).await }); let context_svc = BlockChainContextService { @@ -145,7 +150,7 @@ where difficulty_cache: difficulty_cache_handle.await.unwrap()?, weight_cache: weight_cache_handle.await.unwrap()?, rx_seed_cache: rx_seed_handle.await.unwrap()?, - hardfork_state: hardfork_state_handle.await.unwrap()?, + hardfork_state, chain_height, already_generated_coins, top_block_hash, @@ -166,7 +171,7 @@ pub struct RawBlockChainContext { pub cumulative_difficulty: u128, /// A token which is used to signal if a reorg has happened since creating the token. pub re_org_token: ReOrgToken, - pub rx_seed_cache: rx_seed::RandomXSeed, + pub rx_vms: HashMap>, pub context_to_verify_block: ContextToVerifyBlock, /// The median long term block weight. median_long_term_weight: usize, @@ -268,12 +273,20 @@ pub struct UpdateBlockchainCacheData { #[derive(Debug, Clone)] pub enum BlockChainContextRequest { - Get, + GetContext, + /// Get the next difficulties for these blocks. + /// + /// Inputs: a list of block timestamps and hfs + /// + /// The number of difficulties returned will be one more than the number of timestamps/ hfs. 
+ BatchGetDifficulties(Vec<(u64, HardFork)>), + NewRXVM(([u8; 32], Arc)), Update(UpdateBlockchainCacheData), } pub enum BlockChainContextResponse { Context(BlockChainContext), + BatchDifficulties(Vec), Ok, } struct InternalBlockChainContext { @@ -285,7 +298,7 @@ struct InternalBlockChainContext { difficulty_cache: difficulty::DifficultyCache, weight_cache: weight::BlockWeightsCache, - rx_seed_cache: rx_seed::RandomXSeed, + rx_seed_cache: rx_vms::RandomXVMCache, hardfork_state: hardforks::HardForkState, chain_height: u64, @@ -315,7 +328,8 @@ impl Clone for BlockChainContextService { impl Service for BlockChainContextService { type Response = BlockChainContextResponse; type Error = tower::BoxError; - type Future = Ready>; + type Future = + Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { @@ -325,8 +339,8 @@ impl Service for BlockChainContextService { Arc::clone(&self.internal_blockchain_context).lock_owned(), ) } - MutexLockState::Acquiring(rpc) => { - self.lock_state = MutexLockState::Acquired(futures::ready!(rpc.poll_unpin(cx))) + MutexLockState::Acquiring(lock) => { + self.lock_state = MutexLockState::Acquired(futures::ready!(lock.poll_unpin(cx))) } MutexLockState::Acquired(_) => return Poll::Ready(Ok(())), } @@ -339,69 +353,91 @@ impl Service for BlockChainContextService { else { panic!("poll_ready() was not called first!") }; + async move { + let InternalBlockChainContext { + current_validity_token, + current_reorg_token, + difficulty_cache, + weight_cache, + rx_seed_cache, + hardfork_state, + chain_height, + top_block_hash, + already_generated_coins, + } = internal_blockchain_context.deref_mut(); - let InternalBlockChainContext { - current_validity_token, - current_reorg_token, - difficulty_cache, - weight_cache, - rx_seed_cache, - hardfork_state, - chain_height, - top_block_hash, - already_generated_coins, - } = internal_blockchain_context.deref_mut(); + let res = match req { + BlockChainContextRequest::GetContext => { + let current_hf = hardfork_state.current_hardfork(); - let res = match req { - BlockChainContextRequest::Get => { - let current_hf = hardfork_state.current_hardfork(); - - BlockChainContextResponse::Context(BlockChainContext { - validity_token: current_validity_token.clone(), - raw: RawBlockChainContext { - context_to_verify_block: ContextToVerifyBlock { - median_weight_for_block_reward: weight_cache - .median_for_block_reward(¤t_hf), - effective_median_weight: weight_cache - .effective_median_block_weight(¤t_hf), - top_hash: *top_block_hash, - median_block_timestamp: difficulty_cache.median_timestamp( - usize::try_from(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW).unwrap(), - ), - chain_height: *chain_height, - current_hf, - next_difficulty: difficulty_cache.next_difficulty(¤t_hf), - already_generated_coins: *already_generated_coins, + BlockChainContextResponse::Context(BlockChainContext { + validity_token: current_validity_token.clone(), + raw: RawBlockChainContext { + context_to_verify_block: ContextToVerifyBlock { + median_weight_for_block_reward: weight_cache + .median_for_block_reward(¤t_hf), + effective_median_weight: weight_cache + .effective_median_block_weight(¤t_hf), + top_hash: *top_block_hash, + median_block_timestamp: difficulty_cache.median_timestamp( + usize::try_from(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW).unwrap(), + ), + chain_height: *chain_height, + current_hf, + next_difficulty: difficulty_cache.next_difficulty(¤t_hf), + already_generated_coins: *already_generated_coins, + }, + rx_vms: rx_seed_cache.get_vms(), + 
+                            cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
+                            median_long_term_weight: weight_cache.median_long_term_weight(),
+                            top_block_timestamp: difficulty_cache.top_block_timestamp(),
+                            re_org_token: current_reorg_token.clone(),
+                        },
-                        rx_seed_cache: rx_seed_cache.clone(),
-                        cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
-                        median_long_term_weight: weight_cache.median_long_term_weight(),
-                        top_block_timestamp: difficulty_cache.top_block_timestamp(),
-                        re_org_token: current_reorg_token.clone(),
-                    },
-                })
-            }
-            BlockChainContextRequest::Update(new) => {
-                // Cancel the validity token and replace it with a new one.
-                std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid();
+                    })
+                }
+                BlockChainContextRequest::BatchGetDifficulties(blocks) => {
+                    let next_diffs = difficulty_cache
+                        .next_difficulties(blocks, &hardfork_state.current_hardfork());
+                    BlockChainContextResponse::BatchDifficulties(next_diffs)
+                }
+                BlockChainContextRequest::NewRXVM(vm) => {
+                    rx_seed_cache.add_vm(vm);
+                    BlockChainContextResponse::Ok
+                }
+                BlockChainContextRequest::Update(new) => {
+                    // Cancel the validity token and replace it with a new one.
+                    std::mem::replace(current_validity_token, ValidityToken::new())
+                        .set_data_invalid();
 
-                difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty);
+                    difficulty_cache.new_block(
+                        new.height,
+                        new.timestamp,
+                        new.cumulative_difficulty,
+                    );
 
-                weight_cache.new_block(new.height, new.weight, new.long_term_weight);
+                    weight_cache.new_block(new.height, new.weight, new.long_term_weight);
 
-                hardfork_state.new_block(new.vote, new.height);
+                    hardfork_state.new_block(new.vote, new.height);
 
-                rx_seed_cache.new_block(new.height, &new.new_top_hash);
+                    rx_seed_cache
+                        .new_block(
+                            new.height,
+                            &new.new_top_hash,
+                            &hardfork_state.current_hardfork(),
+                        )
+                        .await;
 
-                *chain_height = new.height + 1;
-                *top_block_hash = new.new_top_hash;
-                *already_generated_coins =
-                    already_generated_coins.saturating_add(new.generated_coins);
+                    *chain_height = new.height + 1;
+                    *top_block_hash = new.new_top_hash;
+                    *already_generated_coins =
+                        already_generated_coins.saturating_add(new.generated_coins);
 
-                BlockChainContextResponse::Ok
-            }
-        };
+                    BlockChainContextResponse::Ok
+                }
+            };
 
-        ready(Ok(res))
+            Ok(res)
+        }
+        .boxed()
     }
 }
diff --git a/consensus/src/context/difficulty.rs b/consensus/src/context/difficulty.rs
index 055cd77b..9416a164 100644
--- a/consensus/src/context/difficulty.rs
+++ b/consensus/src/context/difficulty.rs
@@ -19,7 +19,7 @@ const DIFFICULTY_LAG: usize = 15;
 
 /// Configuration for the difficulty cache.
 ///
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub struct DifficultyCacheConfig {
     pub(crate) window: usize,
     pub(crate) cut: usize,
@@ -52,7 +52,7 @@ impl DifficultyCacheConfig {
 
 /// This struct is able to calculate difficulties from blockchain information.
 ///
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub(crate) struct DifficultyCache {
     /// The list of timestamps in the window.
     /// len <= [`DIFFICULTY_BLOCKS_COUNT`]
@@ -121,20 +121,24 @@ impl DifficultyCache {
             return 1;
         }
 
-        let mut sorted_timestamps = self.timestamps.clone();
-        if sorted_timestamps.len() > self.config.window {
-            sorted_timestamps.drain(self.config.window..);
+        let mut timestamps = self.timestamps.clone();
+        if timestamps.len() > self.config.window {
+            // remove the lag.
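+            // (`timestamps` holds up to `window + lag` entries, oldest first, so
+            // draining everything past `window` drops the `lag` newest blocks.)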
+            timestamps.drain(self.config.window..);
         };
 
-        sorted_timestamps.make_contiguous().sort_unstable();
+        let timestamps_slice = timestamps.make_contiguous();
 
         let (window_start, window_end) = get_window_start_and_end(
-            sorted_timestamps.len(),
+            timestamps_slice.len(),
             self.config.accounted_window_len(),
             self.config.window,
        );
 
-        let mut time_span =
-            u128::from(sorted_timestamps[window_end - 1] - sorted_timestamps[window_start]);
+        // We don't sort the whole timestamp list; `select_nth_unstable` is enough
+        // to pull out the two order statistics the window needs.
+        let mut time_span = u128::from(
+            *timestamps_slice.select_nth_unstable(window_end - 1).1
+                - *timestamps_slice.select_nth_unstable(window_start).1,
+        );
 
         let windowed_work = self.cumulative_difficulties[window_end - 1]
             - self.cumulative_difficulties[window_start];
@@ -147,6 +151,50 @@ impl DifficultyCache {
         (windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span
     }
 
+    pub fn next_difficulties(
+        &mut self,
+        blocks: Vec<(u64, HardFork)>,
+        current_hf: &HardFork,
+    ) -> Vec<u128> {
+        let new_timestamps_len = blocks.len();
+        let initial_len = self.timestamps.len();
+
+        let mut difficulties = Vec::with_capacity(blocks.len() + 1);
+
+        difficulties.push(self.next_difficulty(current_hf));
+
+        let mut diff_info_popped = Vec::new();
+
+        for (new_timestamp, hf) in blocks {
+            self.timestamps.push_back(new_timestamp);
+            self.cumulative_difficulties
+                .push_back(self.cumulative_difficulty() + *difficulties.last().unwrap());
+            if u64::try_from(self.timestamps.len()).unwrap() > self.config.total_block_count() {
+                diff_info_popped.push((
+                    self.timestamps.pop_front().unwrap(),
+                    self.cumulative_difficulties.pop_front().unwrap(),
+                ));
+            }
+
+            difficulties.push(self.next_difficulty(&hf));
+        }
+
+        self.cumulative_difficulties.drain(
+            self.cumulative_difficulties
+                .len()
+                .saturating_sub(new_timestamps_len)..,
+        );
+        self.timestamps
+            .drain(self.timestamps.len().saturating_sub(new_timestamps_len)..);
+
+        for (timestamp, cum_dif) in diff_info_popped.into_iter().take(initial_len).rev() {
+            self.timestamps.push_front(timestamp);
+            self.cumulative_difficulties.push_front(cum_dif);
+        }
+
+        difficulties
+    }
+
     /// Returns the median timestamp over the last `numb_blocks`, including the genesis block if the block height is low enough.
     ///
     /// Will return [`None`] if there aren't enough blocks.
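A minimal sketch of the contract `next_difficulties` keeps (names assumed, not from the patch), given a populated `diff_cache` and some `blocks: Vec<(u64, HardFork)>`:

    // The cache speculatively appends each block, records the difficulty it
    // would have, then rolls its timestamps/cumulative difficulties back, so
    // repeated calls see identical state.
    let diffs = diff_cache.next_difficulties(blocks.clone(), &current_hf);
    assert_eq!(diffs.len(), blocks.len() + 1);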
diff --git a/consensus/src/context/rx_seed.rs b/consensus/src/context/rx_seed.rs
deleted file mode 100644
index 4bead27c..00000000
--- a/consensus/src/context/rx_seed.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-use std::collections::VecDeque;
-
-use futures::{stream::FuturesOrdered, StreamExt};
-use tower::ServiceExt;
-
-use monero_consensus::blocks::{is_randomx_seed_height, randomx_seed_height};
-
-use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError};
-
-const RX_SEEDS_CACHED: usize = 3;
-
-#[derive(Clone, Debug)]
-pub struct RandomXSeed {
-    seeds: VecDeque<(u64, [u8; 32])>,
-}
-
-impl RandomXSeed {
-    pub async fn init_from_chain_height<D: Database + Clone>(
-        chain_height: u64,
-        database: D,
-    ) -> Result<Self, ExtendedConsensusError> {
-        let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
-        let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
-
-        Ok(RandomXSeed {
-            seeds: seed_heights.into_iter().zip(seed_hashes).collect(),
-        })
-    }
-
-    pub fn get_seeds_hash(&self, seed_height: u64) -> [u8; 32] {
-        for (height, seed) in self.seeds.iter() {
-            if seed_height == *height {
-                return *seed;
-            }
-        }
-
-        tracing::error!(
-            "Current seeds: {:?}, asked for: {}",
-            self.seeds,
-            seed_height
-        );
-        panic!("RX seed cache was not updated or was asked for a block too old.")
-    }
-
-    pub fn get_rx_seed(&self, height: u64) -> [u8; 32] {
-        let seed_height = randomx_seed_height(height);
-        tracing::warn!(
-            "Current seeds: {:?}, asked for: {}",
-            self.seeds,
-            seed_height
-        );
-
-        self.get_seeds_hash(seed_height)
-    }
-
-    pub fn new_block(&mut self, height: u64, hash: &[u8; 32]) {
-        if is_randomx_seed_height(height) {
-            for (got_height, _) in self.seeds.iter() {
-                if *got_height == height {
-                    return;
-                }
-            }
-
-            self.seeds.push_front((height, *hash));
-
-            if self.seeds.len() > RX_SEEDS_CACHED {
-                self.seeds.pop_back();
-            }
-        }
-    }
-}
-
-fn get_last_rx_seed_heights(mut last_height: u64, mut amount: usize) -> Vec<u64> {
-    let mut seeds = Vec::with_capacity(amount);
-    if is_randomx_seed_height(last_height) {
-        seeds.push(last_height);
-        amount -= 1;
-    }
-
-    for _ in 0..amount {
-        if last_height == 0 {
-            return seeds;
-        }
-
-        let seed_height = randomx_seed_height(last_height);
-        seeds.push(seed_height);
-        last_height = seed_height
-    }
-
-    seeds
-}
-
-async fn get_block_hashes<D: Database + Clone>(
-    heights: Vec<u64>,
-    database: D,
-) -> Result<Vec<[u8; 32]>, ExtendedConsensusError> {
-    let mut fut = FuturesOrdered::new();
-
-    for height in heights {
-        let db = database.clone();
-        fut.push_back(async move {
-            let DatabaseResponse::BlockHash(hash) = db
-                .clone()
-                .oneshot(DatabaseRequest::BlockHash(height))
-                .await?
-            else {
-                panic!("Database sent incorrect response!");
-            };
-            Result::<_, ExtendedConsensusError>::Ok(hash)
-        });
-    }
-
-    let mut res = Vec::new();
-    while let Some(hash) = fut.next().await {
-        res.push(hash?);
-    }
-    Ok(res)
-}
diff --git a/consensus/src/context/rx_vms.rs b/consensus/src/context/rx_vms.rs
new file mode 100644
index 00000000..5631206a
--- /dev/null
+++ b/consensus/src/context/rx_vms.rs
@@ -0,0 +1,202 @@
+use std::{
+    collections::{HashMap, VecDeque},
+    sync::Arc,
+};
+
+use futures::{stream::FuturesOrdered, StreamExt};
+use randomx_rs::{RandomXCache, RandomXError, RandomXFlag, RandomXVM as VMInner};
+use rayon::prelude::*;
+use thread_local::ThreadLocal;
+use tower::ServiceExt;
+
+use cuprate_helper::asynch::rayon_spawn_async;
+use monero_consensus::{
+    blocks::{is_randomx_seed_height, RandomX, RX_SEEDHASH_EPOCH_BLOCKS},
+    HardFork,
+};
+
+use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError};
+
+const RX_SEEDS_CACHED: usize = 2;
+
+#[derive(Debug)]
+pub struct RandomXVM {
+    vms: ThreadLocal<VMInner>,
+    cache: RandomXCache,
+    flags: RandomXFlag,
+}
+
+impl RandomXVM {
+    pub fn new(seed: &[u8; 32]) -> Result<Self, RandomXError> {
+        let flags = RandomXFlag::get_recommended_flags();
+
+        let cache = RandomXCache::new(flags, seed.as_slice())?;
+
+        Ok(RandomXVM {
+            vms: ThreadLocal::new(),
+            cache,
+            flags,
+        })
+    }
+}
+
+impl RandomX for RandomXVM {
+    type Error = RandomXError;
+
+    fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error> {
+        self.vms
+            .get_or_try(|| VMInner::new(self.flags, Some(self.cache.clone()), None))?
+            .calculate_hash(buf)
+            .map(|out| out.try_into().unwrap())
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct RandomXVMCache {
+    pub(crate) seeds: VecDeque<(u64, [u8; 32])>,
+    pub(crate) vms: HashMap<u64, Arc<RandomXVM>>,
+
+    pub(crate) cached_vm: Option<([u8; 32], Arc<RandomXVM>)>,
+}
+
+impl RandomXVMCache {
+    pub async fn init_from_chain_height<D: Database + Clone>(
+        chain_height: u64,
+        hf: &HardFork,
+        database: D,
+    ) -> Result<Self, ExtendedConsensusError> {
+        let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
+        let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
+
+        let seeds: VecDeque<(u64, [u8; 32])> = seed_heights.into_iter().zip(seed_hashes).collect();
+
+        let vms = if hf >= &HardFork::V12 {
+            let seeds_clone = seeds.clone();
+            rayon_spawn_async(move || {
+                seeds_clone
+                    .par_iter()
+                    .map(|(height, seed)| {
+                        (
+                            *height,
+                            Arc::new(RandomXVM::new(seed).expect("Failed to create RandomX VM!")),
+                        )
+                    })
+                    .collect()
+            })
+            .await
+        } else {
+            HashMap::new()
+        };
+
+        Ok(RandomXVMCache {
+            seeds,
+            vms,
+            cached_vm: None,
+        })
+    }
+
+    pub fn add_vm(&mut self, vm: ([u8; 32], Arc<RandomXVM>)) {
+        self.cached_vm.replace(vm);
+    }
+
+    pub fn get_vms(&self) -> HashMap<u64, Arc<RandomXVM>> {
+        self.vms.clone()
+    }
+
+    pub async fn new_block(&mut self, height: u64, hash: &[u8; 32], hf: &HardFork) {
+        let should_make_vms = hf >= &HardFork::V12;
+        if should_make_vms && self.vms.len() != self.seeds.len() {
+            // this will only happen when syncing and rx activates.
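+            // (Before V12 the cache tracks seed heights only and skips the costly
+            // VM construction; the first `new_block` call at or after V12 therefore
+            // has to build a VM for every cached seed in one go.)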
+            let seeds_clone = self.seeds.clone();
+            self.vms = rayon_spawn_async(move || {
+                seeds_clone
+                    .par_iter()
+                    .map(|(height, seed)| {
+                        (
+                            *height,
+                            Arc::new(RandomXVM::new(seed).expect("Failed to create RandomX VM!")),
+                        )
+                    })
+                    .collect()
+            })
+            .await
+        }
+
+        if is_randomx_seed_height(height) {
+            self.seeds.push_front((height, *hash));
+
+            if should_make_vms {
+                let new_vm = 'new_vm_block: {
+                    if let Some((cached_hash, cached_vm)) = self.cached_vm.take() {
+                        if &cached_hash == hash {
+                            break 'new_vm_block cached_vm;
+                        }
+                    };
+
+                    let hash_clone = *hash;
+                    rayon_spawn_async(move || Arc::new(RandomXVM::new(&hash_clone).unwrap())).await
+                };
+
+                self.vms.insert(height, new_vm);
+            }
+
+            if self.seeds.len() > RX_SEEDS_CACHED {
+                self.seeds.pop_back();
+                // TODO: This is really not efficient but the amount of VMs cached is not a lot.
+                self.vms.retain(|height, _| {
+                    self.seeds
+                        .iter()
+                        .any(|(cached_height, _)| height == cached_height)
+                })
+            }
+        }
+    }
+}
+
+pub(crate) fn get_last_rx_seed_heights(mut last_height: u64, mut amount: usize) -> Vec<u64> {
+    let mut seeds = Vec::with_capacity(amount);
+    if is_randomx_seed_height(last_height) {
+        seeds.push(last_height);
+        amount -= 1;
+    }
+
+    for _ in 0..amount {
+        if last_height == 0 {
+            return seeds;
+        }
+
+        // We don't include the lag as we only want seeds, not the specific seed for this height.
+        let seed_height = (last_height - 1) & !(RX_SEEDHASH_EPOCH_BLOCKS - 1);
+        seeds.push(seed_height);
+        last_height = seed_height
+    }
+
+    seeds
+}
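+// Worked example of the epoch arithmetic above, with RX_SEEDHASH_EPOCH_BLOCKS = 2048:
+// get_last_rx_seed_heights(5000, 2) computes (5000 - 1) & !2047 = 4096, then
+// (4096 - 1) & !2047 = 2048, returning [4096, 2048].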
+
+async fn get_block_hashes<D: Database + Clone>(
+    heights: Vec<u64>,
+    database: D,
+) -> Result<Vec<[u8; 32]>, ExtendedConsensusError> {
+    let mut fut = FuturesOrdered::new();
+
+    for height in heights {
+        let db = database.clone();
+        fut.push_back(async move {
+            let DatabaseResponse::BlockHash(hash) = db
+                .clone()
+                .oneshot(DatabaseRequest::BlockHash(height))
+                .await?
+            else {
+                panic!("Database sent incorrect response!");
+            };
+            Result::<_, ExtendedConsensusError>::Ok(hash)
+        });
+    }
+
+    let mut res = Vec::new();
+    while let Some(hash) = fut.next().await {
+        res.push(hash?);
+    }
+    Ok(res)
+}
diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs
index dd848c13..c4d9d815 100644
--- a/consensus/src/lib.rs
+++ b/consensus/src/lib.rs
@@ -1,7 +1,6 @@
 use std::{
     collections::{HashMap, HashSet},
     future::Future,
-    sync::Arc,
 };
 
 use monero_consensus::{transactions::OutputOnChain, ConsensusError, HardFork};
@@ -31,14 +30,13 @@ pub enum ExtendedConsensusError {
     ConErr(#[from] monero_consensus::ConsensusError),
     #[error("Database error: {0}")]
     DBErr(#[from] tower::BoxError),
-    #[error("Needed transaction is not in pool")]
-    TxPErr(#[from] TxNotInPool),
+    #[error("The transactions passed in with the block are incorrect.")]
+    TxsIncludedWithBlockIncorrect,
 }
 
 // TODO: instead of (ab)using generic returns return the acc type
-pub async fn initialize_verifier<D, TxP, Ctx>(
+pub async fn initialize_verifier<D, Ctx>(
     database: D,
-    tx_pool: TxP,
     ctx_svc: Ctx,
 ) -> Result<
     (
@@ -68,12 +66,6 @@
 where
     D: Database + Clone + Send + Sync + 'static,
     D::Future: Send + 'static,
-    TxP: tower::Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
-        + Clone
-        + Send
-        + Sync
-        + 'static,
-    TxP::Future: Send + 'static,
     Ctx: tower::Service<
         BlockChainContextRequest,
         Response = BlockChainContextResponse,
@@ -84,8 +76,8 @@ where
         + 'static,
     Ctx::Future: Send + 'static,
 {
-    let tx_svc = transactions::TxVerifierService::new(database);
-    let block_svc = block::BlockVerifierService::new(ctx_svc, tx_svc.clone(), tx_pool);
+    let tx_svc = transactions::TxVerifierService::new(database.clone());
+    let block_svc = block::BlockVerifierService::new(ctx_svc, tx_svc.clone(), database);
 
     Ok((block_svc, tx_svc))
 }
@@ -154,15 +146,3 @@ pub enum DatabaseResponse {
     )>,
     ),
 }
-
-#[derive(Debug, Copy, Clone, thiserror::Error)]
-#[error("The transaction requested was not in the transaction pool")]
-pub struct TxNotInPool;
-
-pub enum TxPoolRequest {
-    Transactions(Vec<[u8; 32]>),
-}
-
-pub enum TxPoolResponse {
-    Transactions(Vec<Arc<TransactionVerificationData>>),
-}
diff --git a/consensus/src/rpc/cache.rs b/consensus/src/rpc/cache.rs
index a5597e94..012d161b 100644
--- a/consensus/src/rpc/cache.rs
+++ b/consensus/src/rpc/cache.rs
@@ -58,19 +58,15 @@ impl ScanningCache {
         txs: &[Arc<TransactionVerificationData>],
     ) {
         self.add_tx_time_lock(miner_tx.hash(), miner_tx.prefix.timelock);
-        miner_tx
-            .prefix
-            .outputs
-            .iter()
-            .for_each(|out| self.add_outs(out.amount.unwrap_or(0), 1));
+        miner_tx.prefix.outputs.iter().for_each(|out| {
+            self.add_outs(miner_tx.prefix.version == 2, out.amount.unwrap_or(0), 1)
+        });
 
         txs.iter().for_each(|tx| {
             self.add_tx_time_lock(tx.tx_hash, tx.tx.prefix.timelock);
-            tx.tx
-                .prefix
-                .outputs
-                .iter()
-                .for_each(|out| self.add_outs(out.amount.unwrap_or(0), 1));
+            tx.tx.prefix.outputs.iter().for_each(|out| {
+                self.add_outs(tx.tx.prefix.version == 2, out.amount.unwrap_or(0), 1)
+            });
 
             tx.tx.prefix.inputs.iter().for_each(|inp| match inp {
                 Input::ToKey { key_image, ..
                } => {
@@ -125,7 +121,9 @@ impl ScanningCache {
             .collect()
     }
 
-    pub fn add_outs(&mut self, amount: u64, count: usize) {
+    pub fn add_outs(&mut self, is_v2: bool, amount: u64, count: usize) {
+        let amount = if is_v2 { 0 } else { amount };
+
         if let Some(numb_outs) = self.numb_outs.get_mut(&amount) {
             *numb_outs += count;
         } else {
diff --git a/consensus/src/rpc/connection.rs b/consensus/src/rpc/connection.rs
index 780d160e..b55c20c6 100644
--- a/consensus/src/rpc/connection.rs
+++ b/consensus/src/rpc/connection.rs
@@ -234,7 +234,7 @@ impl RpcConnection {
             txs.len(),
             "node: {}, height: {}, node is pruned, which is not supported!",
             address,
-            block.number(),
+            block.number().unwrap(),
         );
 
         Ok((block, txs))
@@ -352,9 +352,8 @@ impl RpcConnection {
                     // then a bad proof has been approved.
                     key: CompressedEdwardsY::from_slice(&out.key)
                         .unwrap()
-                        .decompress()
-                        .unwrap(),
-                    mask: CompressedEdwardsY::from_slice(&out.mask)
+                        .decompress(),
+                    commitment: CompressedEdwardsY::from_slice(&out.mask)
                         .unwrap()
                         .decompress()
                         .unwrap(),
diff --git a/consensus/src/tests/context.rs b/consensus/src/tests/context.rs
index 5756ea3f..2521a8fe 100644
--- a/consensus/src/tests/context.rs
+++ b/consensus/src/tests/context.rs
@@ -14,6 +14,7 @@ use crate::{
 pub(crate) mod data;
 mod difficulty;
 mod hardforks;
+mod rx_vms;
 mod weight;
 
 use difficulty::*;
@@ -40,7 +41,7 @@ async fn context_invalidated_on_new_block() -> Result<(), tower::BoxError> {
 
     let BlockChainContextResponse::Context(context) = ctx_svc
         .clone()
-        .oneshot(BlockChainContextRequest::Get)
+        .oneshot(BlockChainContextRequest::GetContext)
         .await?
     else {
         panic!("Context service returned wrong response!");
@@ -82,8 +83,9 @@ async fn context_height_correct() -> Result<(), tower::BoxError> {
 
     let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
 
-    let BlockChainContextResponse::Context(context) =
-        ctx_svc.oneshot(BlockChainContextRequest::Get).await?
+    let BlockChainContextResponse::Context(context) = ctx_svc
+        .oneshot(BlockChainContextRequest::GetContext)
+        .await?
     else {
         panic!("context service returned incorrect response!")
     };
diff --git a/consensus/src/tests/context/difficulty.rs b/consensus/src/tests/context/difficulty.rs
index 25a406b9..0e4af492 100644
--- a/consensus/src/tests/context/difficulty.rs
+++ b/consensus/src/tests/context/difficulty.rs
@@ -1,6 +1,7 @@
 use std::collections::VecDeque;
 
-use proptest::{arbitrary::any, prop_assert_eq, prop_compose, proptest};
+use proptest::collection::size_range;
+use proptest::{prelude::*, prop_assert_eq, prop_compose, proptest};
 
 use cuprate_helper::num::median;
 
@@ -87,9 +88,9 @@ async fn calculate_diff_3000000_3002000() -> Result<(), tower::BoxError> {
 prop_compose! {
     /// Generates an arbitrary full difficulty cache.
-    fn arb_full_difficulty_cache()
+    fn arb_difficulty_cache(blocks: usize)
     (
-        blocks in any::<[(u64, u64); TEST_TOTAL_ACCOUNTED_BLOCKS]>()
+        blocks in any_with::<Vec<(u64, u64)>>(size_range(blocks).lift()),
     ) -> DifficultyCache {
         let (timestamps, mut cumulative_difficulties): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
         cumulative_difficulties.sort_unstable();
@@ -104,10 +105,20 @@ prop_compose! {
     }
 }
 
+prop_compose! {
+    fn random_difficulty_cache()(
+        blocks in 0..TEST_TOTAL_ACCOUNTED_BLOCKS,
+    )(
+        diff_cache in arb_difficulty_cache(blocks)
+    ) -> DifficultyCache {
+        diff_cache
+    }
+}
+
 proptest! {
     #[test]
     fn check_calculations_lag(
-        mut diff_cache in arb_full_difficulty_cache(),
+        mut diff_cache in arb_difficulty_cache(TEST_TOTAL_ACCOUNTED_BLOCKS),
         timestamp in any::<u64>(),
         cumulative_difficulty in any::<u128>(),
         hf in any::<HardFork>()
@@ -133,7 +144,7 @@ proptest! {
     }
 
     #[test]
-    fn next_difficulty_consistant(diff_cache in arb_full_difficulty_cache(), hf in any::<HardFork>()) {
+    fn next_difficulty_consistant(diff_cache in arb_difficulty_cache(TEST_TOTAL_ACCOUNTED_BLOCKS), hf in any::<HardFork>()) {
         let first_call = diff_cache.next_difficulty(&hf);
         prop_assert_eq!(first_call, diff_cache.next_difficulty(&hf));
         prop_assert_eq!(first_call, diff_cache.next_difficulty(&hf));
@@ -160,11 +171,41 @@ proptest! {
     }
 
     #[test]
-    fn window_size_kept_constant(mut diff_cache in arb_full_difficulty_cache(), new_blocks in any::<Vec<(u64, u128)>>()) {
+    fn window_size_kept_constant(mut diff_cache in arb_difficulty_cache(TEST_TOTAL_ACCOUNTED_BLOCKS), new_blocks in any::<Vec<(u64, u128)>>()) {
         for (timestamp, cumulative_difficulty) in new_blocks.into_iter() {
             diff_cache.new_block(diff_cache.last_accounted_height+1, timestamp, cumulative_difficulty);
             prop_assert_eq!(diff_cache.timestamps.len(), TEST_TOTAL_ACCOUNTED_BLOCKS);
             prop_assert_eq!(diff_cache.cumulative_difficulties.len(), TEST_TOTAL_ACCOUNTED_BLOCKS);
         }
     }
+
+    #[test]
+    fn calculating_multiple_diffs_does_not_change_state(
+        mut diff_cache in random_difficulty_cache(),
+        timestamps in any_with::<Vec<u64>>(size_range(0..1000).lift()),
+        hf in any::<HardFork>(),
+    ) {
+        let cache = diff_cache.clone();
+
+        diff_cache.next_difficulties(timestamps.into_iter().zip([hf].into_iter().cycle()).collect(), &hf);
+
+        assert_eq!(diff_cache, cache);
+    }
+
+    #[test]
+    fn calculate_diff_in_advance(
+        mut diff_cache in random_difficulty_cache(),
+        timestamps in any_with::<Vec<u64>>(size_range(0..1000).lift()),
+        hf in any::<HardFork>(),
+    ) {
+        let timestamps: Vec<_> = timestamps.into_iter().zip([hf].into_iter().cycle()).collect();
+
+        let diffs = diff_cache.next_difficulties(timestamps.clone(), &hf);
+
+        for (timestamp, diff) in timestamps.into_iter().zip(diffs.into_iter()) {
+            assert_eq!(diff_cache.next_difficulty(&timestamp.1), diff);
+            diff_cache.new_block(diff_cache.last_accounted_height + 1, timestamp.0, diff + diff_cache.cumulative_difficulty());
+        }
+
+    }
 }
diff --git a/consensus/src/tests/context/rx_vms.rs b/consensus/src/tests/context/rx_vms.rs
new file mode 100644
index 00000000..6d58c5cc
--- /dev/null
+++ b/consensus/src/tests/context/rx_vms.rs
@@ -0,0 +1,72 @@
+use std::collections::VecDeque;
+
+use proptest::prelude::*;
+use tokio::runtime::Builder;
+
+use monero_consensus::{
+    blocks::{is_randomx_seed_height, randomx_seed_height},
+    HardFork,
+};
+
+use crate::{
+    context::rx_vms::{get_last_rx_seed_heights, RandomXVMCache},
+    tests::mock_db::*,
+};
+
+#[test]
+fn rx_heights_consistent() {
+    let mut last_rx_heights = VecDeque::new();
+
+    for height in 0..100_000_000 {
+        if is_randomx_seed_height(height) {
+            last_rx_heights.push_front(height);
+            if last_rx_heights.len() > 3 {
+                last_rx_heights.pop_back();
+            }
+        }
+
+        assert_eq!(
+            get_last_rx_seed_heights(height, 3).as_slice(),
+            last_rx_heights.make_contiguous()
+        );
+        if last_rx_heights.len() >= 3 {
+            assert!(
+                randomx_seed_height(height) == last_rx_heights[0]
+                    || randomx_seed_height(height) == last_rx_heights[1]
+            );
+        }
+    }
+}
+
+#[tokio::test]
+async fn rx_vm_created_on_hf_12() {
+    let db = DummyDatabaseBuilder::default().finish(Some(10));
+
+    let mut cache = RandomXVMCache::init_from_chain_height(10, &HardFork::V11, db)
+        .await
+        .unwrap();
+
+    assert!(cache.vms.is_empty());
+    cache.new_block(11, &[30; 32], &HardFork::V12).await;
+    assert!(!cache.vms.is_empty());
+}
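A short sketch of how verifying code consumes these VMs (names assumed), given `rx_vms` from `RawBlockChainContext::rx_vms`, a block `height`, a `block_hashing_blob`, and the `RandomX` trait in scope:

    // VMs are keyed by the seed height their RandomX cache was built from.
    let vm = &rx_vms[&randomx_seed_height(height)];
    let pow_hash = vm.calculate_hash(&block_hashing_blob)?;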
+
+proptest! {
+    // these tests are expensive, so limit cases.
+    #![proptest_config(ProptestConfig {
+        cases: 3, .. ProptestConfig::default()
+    })]
+    #[test]
+    fn rx_vm_created_only_after_hf_12(
+        hf in any::<HardFork>(),
+    ) {
+        let db = DummyDatabaseBuilder::default().finish(Some(10));
+
+        let rt = Builder::new_multi_thread().enable_all().build().unwrap();
+
+        rt.block_on(async move {
+            let cache = RandomXVMCache::init_from_chain_height(10, &hf, db).await.unwrap();
+            assert!(cache.seeds.len() == cache.vms.len() || hf < HardFork::V12);
+        });
+    }
+}
diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs
index 7396aa03..5b9d5a54 100644
--- a/consensus/src/transactions.rs
+++ b/consensus/src/transactions.rs
@@ -28,7 +28,45 @@ use crate::{
     DatabaseResponse, ExtendedConsensusError,
 };
 
-mod contextual_data;
+pub mod contextual_data;
+mod output_cache;
+
+pub use output_cache::OutputCache;
+
+pub async fn batch_setup_txs(
+    txs: Vec<(Vec<Transaction>, HardFork)>,
+) -> Result<Vec<Vec<Arc<TransactionVerificationData>>>, ExtendedConsensusError> {
+    let batch_verifier = Arc::new(MultiThreadedBatchVerifier::new(rayon::current_num_threads()));
+
+    // Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
+    let txs = rayon_spawn_async(move || {
+        let txs = txs
+            .into_par_iter()
+            .map(|(txs, hf)| {
+                txs.into_par_iter()
+                    .map(|tx| {
+                        Ok(Arc::new(TransactionVerificationData::new(
+                            tx,
+                            &hf,
+                            batch_verifier.clone(),
+                        )?))
+                    })
+                    .collect::<Result<Vec<_>, ConsensusError>>()
+            })
+            .collect::<Result<Vec<_>, ConsensusError>>()?;
+
+        if !Arc::into_inner(batch_verifier).unwrap().verify() {
+            Err(ConsensusError::Transaction(TransactionError::RingCTError(
+                RingCTError::BulletproofsRangeInvalid,
+            )))?
+        }
+
+        Ok::<_, ConsensusError>(txs)
+    })
+    .await?;
+
+    Ok(txs)
+}
 
 /// Data needed to verify a transaction.
 ///
@@ -90,13 +128,6 @@ pub enum VerifyTxRequest {
         hf: HardFork,
         re_org_token: ReOrgToken,
     },
-    /// Batches the setup of [`TransactionVerificationData`], does *some* verification, you need to call [`VerifyTxRequest::Block`]
-    /// with the returned data.
-    BatchSetup {
-        txs: Vec<Transaction>,
-        hf: HardFork,
-        re_org_token: ReOrgToken,
-    },
 }
 
 pub enum VerifyTxResponse {
@@ -136,68 +167,31 @@ where
     fn call(&mut self, req: VerifyTxRequest) -> Self::Future {
         let database = self.database.clone();
 
-        match req {
-            VerifyTxRequest::Block {
-                txs,
-                current_chain_height,
-                time_for_time_lock,
-                hf,
-                re_org_token,
-            } => verify_transactions_for_block(
-                database,
-                txs,
-                current_chain_height,
-                time_for_time_lock,
-                hf,
-                re_org_token,
-            )
-            .boxed(),
-            VerifyTxRequest::BatchSetup {
-                txs,
-                hf,
-                re_org_token,
-            } => batch_setup_transactions(database, txs, hf, re_org_token).boxed(),
+        async move {
+            match req {
+                VerifyTxRequest::Block {
+                    txs,
+                    current_chain_height,
+                    time_for_time_lock,
+                    hf,
+                    re_org_token,
+                } => {
+                    verify_transactions_for_block(
+                        database,
+                        txs,
+                        current_chain_height,
+                        time_for_time_lock,
+                        hf,
+                        re_org_token,
+                    )
+                    .await
+                }
+            }
         }
+        .boxed()
     }
 }
 
-async fn batch_setup_transactions(
-    database: D,
-    txs: Vec<Transaction>,
-    hf: HardFork,
-    re_org_token: ReOrgToken,
-) -> Result<VerifyTxResponse, ExtendedConsensusError>
-where
-    D: Database + Clone + Sync + Send + 'static,
-{
-    let batch_verifier = Arc::new(MultiThreadedBatchVerifier::new(rayon::current_num_threads()));
-
-    let cloned_verifier = batch_verifier.clone();
-    // Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
-    let txs = rayon_spawn_async(move || {
-        txs.into_par_iter()
-            .map(|tx| {
-                Ok(Arc::new(TransactionVerificationData::new(
-                    tx,
-                    &hf,
-                    cloned_verifier.clone(),
-                )?))
-            })
-            .collect::<Result<Vec<_>, ConsensusError>>()
-    })
-    .await?;
-
-    if !Arc::into_inner(batch_verifier).unwrap().verify() {
-        Err(ConsensusError::Transaction(TransactionError::RingCTError(
-            RingCTError::BulletproofsRangeInvalid,
-        )))?
-    }
-
-    contextual_data::batch_fill_ring_member_info(&txs, &hf, re_org_token, database).await?;
-
-    Ok(VerifyTxResponse::BatchSetupOk(txs))
-}
-
 #[instrument(name = "verify_txs", skip_all, level = "info")]
 async fn verify_transactions_for_block(
     database: D,
@@ -212,8 +206,14 @@ where
 {
     tracing::debug!("Verifying transactions for block, amount: {}", txs.len());
 
-    contextual_data::batch_refresh_ring_member_info(&txs, &hf, re_org_token, database.clone())
-        .await?;
+    contextual_data::batch_refresh_ring_member_info(
+        &txs,
+        &hf,
+        re_org_token,
+        database.clone(),
+        None,
+    )
+    .await?;
 
     let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new()));
 
diff --git a/consensus/src/transactions/contextual_data.rs b/consensus/src/transactions/contextual_data.rs
index 05c534cc..aa1b0a61 100644
--- a/consensus/src/transactions/contextual_data.rs
+++ b/consensus/src/transactions/contextual_data.rs
@@ -11,8 +11,11 @@
 //! Because this data is unique for *every* transaction and the context service is just for blockchain state data.
 //!
-use std::collections::HashSet;
-use std::{collections::HashMap, ops::Deref, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    ops::Deref,
+    sync::Arc,
+};
 
 use monero_serai::transaction::Input;
 use tower::ServiceExt;
 
@@ -25,25 +28,28 @@ use monero_consensus::{
 };
 
 use crate::{
-    context::ReOrgToken, transactions::TransactionVerificationData, Database, DatabaseRequest,
-    DatabaseResponse, ExtendedConsensusError,
+    context::ReOrgToken,
+    transactions::{output_cache::OutputCache, TransactionVerificationData},
+    Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError,
 };
 
-pub async fn batch_refresh_ring_member_info(
-    txs_verification_data: &[Arc<TransactionVerificationData>],
+pub async fn batch_refresh_ring_member_info<'a, D: Database + Clone + Send + Sync + 'static>(
+    txs_verification_data: &'a [Arc<TransactionVerificationData>],
     hf: &HardFork,
     re_org_token: ReOrgToken,
     mut database: D,
+    out_cache: Option<&OutputCache<'a>>,
 ) -> Result<(), ExtendedConsensusError> {
     let (txs_needing_full_refresh, txs_needing_partial_refresh) =
         ring_member_info_needing_refresh(txs_verification_data, hf);
 
     if !txs_needing_full_refresh.is_empty() {
         batch_fill_ring_member_info(
-            &txs_needing_full_refresh,
+            txs_needing_full_refresh.iter(),
             hf,
             re_org_token,
             database.clone(),
+            out_cache,
         )
         .await?;
     }
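A minimal sketch of the new `out_cache` parameter in use (assuming `txs`, `hf`, `re_org_token`, `database` and a filled `out_cache` are in scope):

    // With Some(&out_cache), ring members created by transactions in the same
    // batch are resolved from the cache instead of the database.
    contextual_data::batch_refresh_ring_member_info(
        &txs,
        &hf,
        re_org_token.clone(),
        database.clone(),
        Some(&out_cache),
    )
    .await?;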
@@ -157,15 +163,16 @@ fn ring_member_info_needing_refresh(
 ///
 /// This function batch gets all the ring members for the inputted transactions and fills in data about
 /// them.
-pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'static>(
-    txs_verification_data: &[Arc<TransactionVerificationData>],
+pub async fn batch_fill_ring_member_info<'a, D: Database + Clone + Send + Sync + 'static>(
+    txs_verification_data: impl Iterator<Item = &'a Arc<TransactionVerificationData>> + Clone,
     hf: &HardFork,
     re_org_token: ReOrgToken,
     mut database: D,
+    out_cache: Option<&OutputCache<'a>>,
 ) -> Result<(), ExtendedConsensusError> {
     let mut output_ids = HashMap::new();
 
-    for tx_v_data in txs_verification_data.iter() {
+    for tx_v_data in txs_verification_data.clone() {
         insert_ring_member_ids(&tx_v_data.tx.prefix.inputs, &mut output_ids)
             .map_err(ConsensusError::Transaction)?;
     }
@@ -191,9 +198,19 @@ pub async fn batch_fill_ring_member_info
diff --git a/consensus/src/transactions/output_cache.rs b/consensus/src/transactions/output_cache.rs
new file mode 100644
--- /dev/null
+++ b/consensus/src/transactions/output_cache.rs
@@ -0,0 +1,153 @@
+#[derive(Debug)]
+enum CachedAmount<'a> {
+    Clear(u64),
+    Commitment(&'a EdwardsPoint),
+}
+
+impl<'a> CachedAmount<'a> {
+    fn get_commitment(&self) -> EdwardsPoint {
+        match self {
+            CachedAmount::Commitment(commitment) => **commitment,
+            // TODO: Setup a table with common amounts.
+            CachedAmount::Clear(amt) => ED25519_BASEPOINT_POINT + H() * Scalar::from(*amt),
+        }
+    }
+}
+
+#[derive(Debug)]
+struct CachedOutput<'a> {
+    height: u64,
+    time_lock: &'a Timelock,
+    key: &'a CompressedEdwardsY,
+    amount: CachedAmount<'a>,
+
+    cached_created: OnceLock<OutputOnChain>,
+}
+
+#[derive(Debug)]
+pub struct OutputCache<'a>(HashMap<u64, BTreeMap<u64, CachedOutput<'a>>>);
+
+impl<'a> OutputCache<'a> {
+    #[allow(clippy::new_without_default)]
+    pub fn new() -> Self {
+        OutputCache(HashMap::new())
+    }
+
+    pub fn get_out(&self, amt: u64, idx: u64) -> Option<&OutputOnChain> {
+        let cached_out = self.0.get(&amt)?.get(&idx)?;
+
+        Some(cached_out.cached_created.get_or_init(|| OutputOnChain {
+            height: cached_out.height,
+            time_lock: *cached_out.time_lock,
+            key: cached_out.key.decompress(),
+            commitment: cached_out.amount.get_commitment(),
+        }))
+    }
+
+    pub async fn extend_from_block<'b: 'a, D: Database>(
+        &mut self,
+        blocks: impl Iterator<Item = (&'b Block, &'b [Arc<TransactionVerificationData>])> + 'b,
+        database: &mut D,
+    ) -> Result<(), ExtendedConsensusError> {
+        let mut idx_needed = HashMap::new();
+
+        for (block, txs) in blocks {
+            for tx in once(&block.miner_tx).chain(txs.iter().map(|tx| &tx.tx)) {
+                let is_rct = tx.prefix.version == 2;
+                let is_miner = matches!(tx.prefix.inputs.as_slice(), &[Input::Gen(_)]);
+
+                for (i, out) in tx.prefix.outputs.iter().enumerate() {
+                    let amt = out.amount.unwrap_or(0);
+                    // The amt this output will be stored under.
+                    let amt_table_key = if is_rct { 0 } else { amt };
+
+                    let amount_commitment = match (is_rct, is_miner) {
+                        (true, false) => CachedAmount::Commitment(
+                            tx.rct_signatures.base.commitments.get(i).ok_or(
+                                ConsensusError::Transaction(TransactionError::NonZeroOutputForV2),
+                            )?,
+                        ),
+                        _ => CachedAmount::Clear(amt),
+                    };
+                    let output_to_cache = CachedOutput {
+                        height: block.number().ok_or(ConsensusError::Block(
+                            BlockError::MinerTxError(MinerTxError::InputNotOfTypeGen),
+                        ))?,
+                        time_lock: &tx.prefix.timelock,
+                        key: &out.key,
+                        amount: amount_commitment,
+
+                        cached_created: OnceLock::new(),
+                    };
+
+                    let Some(amt_table) = self.0.get_mut(&amt_table_key) else {
+                        idx_needed
+                            .entry(amt_table_key)
+                            .or_insert_with(Vec::new)
+                            .push(output_to_cache);
+                        continue;
+                    };
+
+                    let top_idx = *amt_table.last_key_value().unwrap().0;
+                    amt_table.insert(top_idx + 1, output_to_cache);
+                }
+            }
+        }
+
+        if idx_needed.is_empty() {
+            return Ok(());
+        }
+
+        let DatabaseResponse::NumberOutputsWithAmount(numb_outs) = database
+            .ready()
+            .await?
+            .call(DatabaseRequest::NumberOutputsWithAmount(
+                idx_needed.keys().copied().collect(),
+            ))
+            .await?
+        else {
+            panic!("Database sent incorrect response!");
+        };
+
+        for (amt_table_key, out) in idx_needed {
+            let numb_outs = *numb_outs
+                .get(&amt_table_key)
+                .expect("DB did not return all results!");
+
+            self.0.entry(amt_table_key).or_default().extend(
+                out.into_iter()
+                    .enumerate()
+                    .map(|(i, out)| (u64::try_from(i + numb_outs).unwrap(), out)),
+            )
+        }
+
+        Ok(())
+    }
+}
diff --git a/net/levin/src/header.rs b/net/levin/src/header.rs
index 3435293c..42418ded 100644
--- a/net/levin/src/header.rs
+++ b/net/levin/src/header.rs
@@ -13,21 +13,6 @@
 // copies or substantial portions of the Software.
 //
 
-// Rust Levin Library
-// Written in 2023 by
-//   Cuprate Contributors
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in all
-// copies or substantial portions of the Software.
-//
-
 //! This module provides a struct BucketHead for the header of a levin protocol
 //! message.
 
diff --git a/net/monero-wire/src/network_address.rs b/net/monero-wire/src/network_address.rs
index bd4068a2..900ae61d 100644
--- a/net/monero-wire/src/network_address.rs
+++ b/net/monero-wire/src/network_address.rs
@@ -21,8 +21,8 @@ use bytes::BufMut;
 use epee_encoding::EpeeObject;
 use std::{hash::Hash, net, net::SocketAddr};
 
-mod serde_helper;
-use serde_helper::*;
+mod epee_builder;
+use epee_builder::*;
 
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
 pub enum NetZone {
diff --git a/net/monero-wire/src/network_address/serde_helper.rs b/net/monero-wire/src/network_address/epee_builder.rs
similarity index 100%
rename from net/monero-wire/src/network_address/serde_helper.rs
rename to net/monero-wire/src/network_address/epee_builder.rs
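A closing sketch of the new `OutputCache` in use (names assumed, not from the patch), given `blocks: Vec<(Block, Vec<Arc<TransactionVerificationData>>)>`, a `database` handle, and an `output_index`:

    // Cache the outputs created by a batch of incoming blocks so later blocks
    // in the batch can reference them before they reach the database.
    let mut cache = OutputCache::new();
    cache
        .extend_from_block(
            blocks.iter().map(|(block, txs)| (block, txs.as_slice())),
            &mut database,
        )
        .await?;
    // RCT outputs live under amount 0; pre-RCT outputs under their clear amount.
    let out = cache.get_out(0, output_index);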