diff --git a/Cargo.lock b/Cargo.lock index 8306b74a..bb159071 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,6 +309,17 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-recursion" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -4840,6 +4851,7 @@ dependencies = [ name = "monero-serai" version = "0.1.4-alpha" dependencies = [ + "async-recursion", "async-trait", "base58-monero", "curve25519-dalek", diff --git a/coins/monero/Cargo.toml b/coins/monero/Cargo.toml index cf0549fe..101cf80b 100644 --- a/coins/monero/Cargo.toml +++ b/coins/monero/Cargo.toml @@ -54,6 +54,7 @@ serde_json = { version = "1", default-features = false, features = ["alloc"] } base58-monero = { version = "2", default-features = false, features = ["check"] } # Used for the provided HTTP RPC +async-recursion = { version = "1", optional = true } digest_auth = { version = "0.3", default-features = false, optional = true } simple-request = { path = "../../common/request", version = "0.1", default-features = false, optional = true } tokio = { version = "1", default-features = false, optional = true } @@ -100,7 +101,7 @@ std = [ "base58-monero/std", ] -http-rpc = ["digest_auth", "simple-request", "tokio"] +http-rpc = ["async-recursion", "digest_auth", "simple-request", "tokio"] multisig = ["transcript", "frost", "dleq", "std"] binaries = ["tokio/rt-multi-thread", "tokio/macros", "http-rpc"] experimental = [] diff --git a/coins/monero/src/bin/reserialize_chain.rs b/coins/monero/src/bin/reserialize_chain.rs index b23f73c1..c850b4ed 100644 --- a/coins/monero/src/bin/reserialize_chain.rs +++ b/coins/monero/src/bin/reserialize_chain.rs @@ -271,14 +271,15 @@ async fn main() { } let nodes = if specified_nodes.is_empty() { default_nodes } else { specified_nodes }; - let rpc = |url: String| { + let rpc = |url: String| async move { HttpRpc::new(url.clone()) + .await .unwrap_or_else(|_| panic!("couldn't create HttpRpc connected to {url}")) }; - let main_rpc = rpc(nodes[0].clone()); + let main_rpc = rpc(nodes[0].clone()).await; let mut rpcs = vec![]; for i in 0 .. async_parallelism { - rpcs.push(Arc::new(rpc(nodes[i % nodes.len()].clone()))); + rpcs.push(Arc::new(rpc(nodes[i % nodes.len()].clone()).await)); } let mut rpc_i = 0; diff --git a/coins/monero/src/rpc/http.rs b/coins/monero/src/rpc/http.rs index b7462cd0..402d17a5 100644 --- a/coins/monero/src/rpc/http.rs +++ b/coins/monero/src/rpc/http.rs @@ -1,25 +1,30 @@ -use std::io::Read; +use std::{sync::Arc, io::Read}; use async_trait::async_trait; -use digest_auth::AuthContext; +use tokio::sync::Mutex; + +use digest_auth::{WwwAuthenticateHeader, AuthContext}; use simple_request::{ - hyper::{header::HeaderValue, Request}, - Client, + hyper::{StatusCode, header::HeaderValue, Request}, + Response, Client, }; use crate::rpc::{RpcError, RpcConnection, Rpc}; #[derive(Clone, Debug)] enum Authentication { - // If unauthenticated, reuse a single client + // If unauthenticated, use a single client Unauthenticated(Client), - // If authenticated, don't reuse clients so that each connection makes its own connection + // If authenticated, use a single client which supports being locked and tracks its nonce // This ensures that if a nonce is requested, another caller doesn't make a request invalidating // it - // We could acquire a mutex over the client, yet creating a new client is preferred for the - // possibility of parallelism - Authenticated { username: String, password: String }, + Authenticated { + username: String, + password: String, + #[allow(clippy::type_complexity)] + connection: Arc, Client)>>, + }, } /// An HTTP(S) transport for the RPC. @@ -32,11 +37,29 @@ pub struct HttpRpc { } impl HttpRpc { + fn digest_auth_challenge( + response: &Response, + ) -> Result, RpcError> { + Ok(if let Some(header) = response.headers().get("www-authenticate") { + Some(( + digest_auth::parse( + header + .to_str() + .map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?, + ) + .map_err(|_| RpcError::InvalidNode("invalid digest-auth response"))?, + 0, + )) + } else { + None + }) + } + /// Create a new HTTP(S) RPC connection. /// /// A daemon requiring authentication can be used via including the username and password in the /// URL. - pub fn new(mut url: String) -> Result, RpcError> { + pub async fn new(mut url: String) -> Result, RpcError> { let authentication = if url.contains('@') { // Parse out the username and password let url_clone = url; @@ -61,9 +84,24 @@ impl HttpRpc { if split_userpass.len() > 2 { Err(RpcError::ConnectionError("invalid amount of passwords".to_string()))?; } + + let client = Client::without_connection_pool(url.clone()) + .map_err(|_| RpcError::ConnectionError("invalid URL".to_string()))?; + // Obtain the initial challenge, which also somewhat validates this connection + let challenge = Self::digest_auth_challenge( + &client + .request( + Request::post(url.clone()) + .body(vec![].into()) + .map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))?, + ) + .await + .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?, + )?; Authentication::Authenticated { username: split_userpass[0].to_string(), password: split_userpass.get(1).unwrap_or(&"").to_string(), + connection: Arc::new(Mutex::new((challenge, client))), } } else { Authentication::Unauthenticated(Client::with_connection_pool()) @@ -74,60 +112,96 @@ impl HttpRpc { } impl HttpRpc { - async fn inner_post(&self, route: &str, body: Vec) -> Result, RpcError> { - let request = |uri| Request::post(uri).body(body.clone().into()).unwrap(); + #[async_recursion::async_recursion] + async fn inner_post( + &self, + route: &str, + body: Vec, + recursing: bool, + ) -> Result, RpcError> { + let request_fn = |uri| { + Request::post(uri) + .body(body.clone().into()) + .map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}"))) + }; - let mut connection = None; let response = match &self.authentication { Authentication::Unauthenticated(client) => client - .request(request(self.url.clone() + "/" + route)) + .request(request_fn(self.url.clone() + "/" + route)?) .await .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?, - Authentication::Authenticated { username, password } => { - // This Client will drop and replace its connection on error, when monero-serai requires - // a single socket for the lifetime of this function - // Since dropping the connection will raise an error, and this function aborts on any - // error, this is fine - let client = Client::without_connection_pool(self.url.clone()) - .map_err(|_| RpcError::ConnectionError("invalid URL".to_string()))?; - let mut response = client - .request(request("/".to_string() + route)) - .await - .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?; + Authentication::Authenticated { username, password, connection } => { + let mut connection_lock = connection.lock().await; + + let mut request = request_fn("/".to_string() + route)?; + + // If we don't have an auth challenge, obtain one + if connection_lock.0.is_none() { + connection_lock.0 = Self::digest_auth_challenge( + &connection_lock + .1 + .request(request) + .await + .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?, + )?; + request = request_fn("/".to_string() + route)?; + } + + // Insert the challenge response, if we have a challenge + if let Some((challenge, cnonce)) = connection_lock.0.as_mut() { + // Update the cnonce + // Overflow isn't a concern as this is a u64 + *cnonce += 1; + + let mut context = AuthContext::new_post::<_, _, _, &[u8]>( + username, + password, + "/".to_string() + route, + None, + ); + context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes())); - // Only provide authentication if this daemon actually expects it - if let Some(header) = response.headers().get("www-authenticate") { - let mut request = request("/".to_string() + route); request.headers_mut().insert( "Authorization", HeaderValue::from_str( - &digest_auth::parse( - header - .to_str() - .map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?, - ) - .map_err(|_| RpcError::InvalidNode("invalid digest-auth response"))? - .respond(&AuthContext::new_post::<_, _, _, &[u8]>( - username, - password, - "/".to_string() + route, - None, - )) - .map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))? - .to_header_string(), + &challenge + .respond(&context) + .map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))? + .to_header_string(), ) .unwrap(), ); - - // Make the request with the response challenge - response = client - .request(request) - .await - .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?; } - // Store the client so it's not dropped yet - connection = Some(client); + let response_result = connection_lock + .1 + .request(request) + .await + .map_err(|e| RpcError::ConnectionError(format!("{e:?}"))); + + // If the connection entered an error state, drop the cached challenge as challenges are + // per-connection + // We don't need to create a new connection as simple-request will for us + if response_result.is_err() { + connection_lock.0 = None; + } + let response = response_result?; + + // If we need to re-auth due to this token being stale, recursively re-call this function, + // unless we're already recursing + if (!recursing) && (response.status() == StatusCode::UNAUTHORIZED) { + if let Some(header) = response.headers().get("www-authenticate") { + if header + .to_str() + .map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))? + .contains("stale") + { + connection_lock.0 = None; + drop(connection_lock); + return self.inner_post(route, body, true).await; + } + } + } response } @@ -163,8 +237,6 @@ impl HttpRpc { .read_to_end(&mut res) .unwrap(); - drop(connection); - Ok(res) } } @@ -173,7 +245,7 @@ impl HttpRpc { impl RpcConnection for HttpRpc { async fn post(&self, route: &str, body: Vec) -> Result, RpcError> { // TODO: Make this timeout configurable - tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body)) + tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body, false)) .await .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))? } diff --git a/coins/monero/tests/runner.rs b/coins/monero/tests/runner.rs index 9362d860..e85268fd 100644 --- a/coins/monero/tests/runner.rs +++ b/coins/monero/tests/runner.rs @@ -86,7 +86,7 @@ pub fn check_weight_and_fee(tx: &Transaction, fee_rate: Fee) { } pub async fn rpc() -> Rpc { - let rpc = HttpRpc::new("http://127.0.0.1:18081".to_string()).unwrap(); + let rpc = HttpRpc::new("http://127.0.0.1:18081".to_string()).await.unwrap(); // Only run once if rpc.get_height().await.unwrap() != 1 { diff --git a/coins/monero/tests/wallet2_compatibility.rs b/coins/monero/tests/wallet2_compatibility.rs index 592a4d92..4b638c24 100644 --- a/coins/monero/tests/wallet2_compatibility.rs +++ b/coins/monero/tests/wallet2_compatibility.rs @@ -35,7 +35,7 @@ async fn make_integrated_address(rpc: &Rpc, payment_id: [u8; 8]) -> Str } async fn initialize_rpcs() -> (Rpc, Rpc, String) { - let wallet_rpc = HttpRpc::new("http://127.0.0.1:6061".to_string()).unwrap(); + let wallet_rpc = HttpRpc::new("http://127.0.0.1:6061".to_string()).await.unwrap(); let daemon_rpc = runner::rpc().await; #[derive(Debug, Deserialize)] diff --git a/common/request/src/lib.rs b/common/request/src/lib.rs index 86eb1aac..8dbab9c0 100644 --- a/common/request/src/lib.rs +++ b/common/request/src/lib.rs @@ -26,7 +26,7 @@ pub enum Error { InvalidUri, MissingHost, InconsistentHost, - SslError, + SslError(Box), Hyper(hyper::Error), } @@ -116,7 +116,7 @@ impl Client { // If there's not a connection... if connection_lock.is_none() { let (requester, connection) = hyper::client::conn::http1::handshake( - https_builder.clone().call(host.clone()).await.map_err(|_| Error::SslError)?, + https_builder.clone().call(host.clone()).await.map_err(Error::SslError)?, ) .await .map_err(Error::Hyper)?; diff --git a/processor/src/main.rs b/processor/src/main.rs index 07c5b52d..074b92c2 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -650,7 +650,7 @@ async fn main() { #[cfg(feature = "bitcoin")] NetworkId::Bitcoin => run(db, Bitcoin::new(url).await, coordinator).await, #[cfg(feature = "monero")] - NetworkId::Monero => run(db, Monero::new(url), coordinator).await, + NetworkId::Monero => run(db, Monero::new(url).await, coordinator).await, _ => panic!("spawning a processor for an unsupported network"), } } diff --git a/processor/src/networks/monero.rs b/processor/src/networks/monero.rs index f919b195..22f02bcc 100644 --- a/processor/src/networks/monero.rs +++ b/processor/src/networks/monero.rs @@ -191,8 +191,8 @@ fn map_rpc_err(err: RpcError) -> NetworkError { } impl Monero { - pub fn new(url: String) -> Monero { - Monero { rpc: HttpRpc::new(url).unwrap() } + pub async fn new(url: String) -> Monero { + Monero { rpc: HttpRpc::new(url).await.unwrap() } } fn view_pair(spend: EdwardsPoint) -> ViewPair { diff --git a/processor/src/tests/literal/mod.rs b/processor/src/tests/literal/mod.rs index 04d76199..b0bb4c74 100644 --- a/processor/src/tests/literal/mod.rs +++ b/processor/src/tests/literal/mod.rs @@ -114,13 +114,15 @@ mod monero { async fn monero(ops: &DockerOperations) -> Monero { let handle = ops.handle("serai-dev-monero").host_port(18081).unwrap(); - let monero = Monero::new(format!("http://serai:seraidex@{}:{}", handle.0, handle.1)); + let url = format!("http://serai:seraidex@{}:{}", handle.0, handle.1); for _ in 0 .. 60 { - if monero.get_latest_block_number().await.is_ok() { + if monero_serai::rpc::HttpRpc::new(url.clone()).await.is_ok() { break; } tokio::time::sleep(core::time::Duration::from_secs(1)).await; } + + let monero = Monero::new(url).await; while monero.get_latest_block_number().await.unwrap() < 150 { monero.mine_block().await; } diff --git a/tests/full-stack/src/lib.rs b/tests/full-stack/src/lib.rs index 886956a4..7a9bbe8a 100644 --- a/tests/full-stack/src/lib.rs +++ b/tests/full-stack/src/lib.rs @@ -171,7 +171,7 @@ impl Handles { // If the RPC server has yet to start, sleep for up to 60s until it does for _ in 0 .. 60 { tokio::time::sleep(Duration::from_secs(1)).await; - let Ok(client) = HttpRpc::new(rpc.clone()) else { continue }; + let Ok(client) = HttpRpc::new(rpc.clone()).await else { continue }; if client.get_height().await.is_err() { continue; } diff --git a/tests/full-stack/src/tests/mint_and_burn.rs b/tests/full-stack/src/tests/mint_and_burn.rs index f545f81a..1277f6fa 100644 --- a/tests/full-stack/src/tests/mint_and_burn.rs +++ b/tests/full-stack/src/tests/mint_and_burn.rs @@ -523,7 +523,7 @@ async fn mint_and_burn_test() { Coin::Monero, 1_000_000_000_000, ExternalAddress::new( - serai_client::networks::monero::Address::new(monero_addr).unwrap().try_into().unwrap(), + serai_client::networks::monero::Address::new(monero_addr).unwrap().into(), ) .unwrap(), ) diff --git a/tests/processor/src/lib.rs b/tests/processor/src/lib.rs index 169b164b..40c3c3ae 100644 --- a/tests/processor/src/lib.rs +++ b/tests/processor/src/lib.rs @@ -188,7 +188,8 @@ impl Coordinator { use monero_serai::rpc::HttpRpc; // Monero's won't, so call get_height - if HttpRpc::new(rpc_url.clone()) + if handle + .block_on(HttpRpc::new(rpc_url.clone())) .ok() .and_then(|rpc| handle.block_on(rpc.get_height()).ok()) .is_some() @@ -283,7 +284,7 @@ impl Coordinator { rpc::HttpRpc, }; - let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); + let rpc = HttpRpc::new(rpc_url).await.expect("couldn't connect to the Monero RPC"); let _: EmptyResponse = rpc .json_rpc_call( "generateblocks", @@ -322,7 +323,8 @@ impl Coordinator { NetworkId::Monero => { use monero_serai::rpc::HttpRpc; - let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); + let rpc = + HttpRpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Monero RPC"); let res: serde_json::Value = rpc .json_rpc_call("submit_block", Some(serde_json::json!([hex::encode(block)]))) .await @@ -368,10 +370,11 @@ impl Coordinator { NetworkId::Monero => { use monero_serai::rpc::HttpRpc; - let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); + let rpc = HttpRpc::new(rpc_url).await.expect("couldn't connect to the Monero RPC"); let to = rpc.get_height().await.unwrap(); for coordinator in others { let from = HttpRpc::new(network_rpc(self.network, ops, &coordinator.network_handle)) + .await .expect("couldn't connect to the Monero RPC") .get_height() .await @@ -407,7 +410,8 @@ impl Coordinator { NetworkId::Monero => { use monero_serai::{transaction::Transaction, rpc::HttpRpc}; - let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); + let rpc = + HttpRpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Monero RPC"); rpc.publish_transaction(&Transaction::read(&mut &*tx).unwrap()).await.unwrap(); } NetworkId::Serai => panic!("processor tests broadcasting block to Serai"), @@ -436,7 +440,8 @@ impl Coordinator { NetworkId::Monero => { use monero_serai::rpc::HttpRpc; - let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); + let rpc = + HttpRpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Monero RPC"); let mut hash = [0; 32]; hash.copy_from_slice(tx); if let Ok(tx) = rpc.get_transaction(hash).await { diff --git a/tests/processor/src/networks.rs b/tests/processor/src/networks.rs index 9db4fb1d..140f096e 100644 --- a/tests/processor/src/networks.rs +++ b/tests/processor/src/networks.rs @@ -180,7 +180,7 @@ impl Wallet { let view_pair = ViewPair::new(ED25519_BASEPOINT_POINT * spend_key, Zeroizing::new(view_key)); - let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); + let rpc = HttpRpc::new(rpc_url).await.expect("couldn't connect to the Monero RPC"); let height = rpc.get_height().await.unwrap(); // Mines 200 blocks so sufficient decoys exist, as only 60 is needed for maturity @@ -316,7 +316,7 @@ impl Wallet { use processor::{additional_key, networks::Monero}; let rpc_url = network_rpc(NetworkId::Monero, ops, handle); - let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); + let rpc = HttpRpc::new(rpc_url).await.expect("couldn't connect to the Monero RPC"); // Prepare inputs let outputs = std::mem::take(inputs);