From bc07e14b1e56da8157eeecea2eb32fed7a65e9d8 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 7 Nov 2023 23:05:09 -0500 Subject: [PATCH] Remove async_recursion for a for loop --- Cargo.lock | 12 -- coins/monero/Cargo.toml | 3 +- coins/monero/src/rpc/http.rs | 238 +++++++++++++++++------------------ 3 files changed, 119 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb159071..8306b74a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,17 +309,6 @@ dependencies = [ "event-listener", ] -[[package]] -name = "async-recursion" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.39", -] - [[package]] name = "async-trait" version = "0.1.74" @@ -4851,7 +4840,6 @@ dependencies = [ name = "monero-serai" version = "0.1.4-alpha" dependencies = [ - "async-recursion", "async-trait", "base58-monero", "curve25519-dalek", diff --git a/coins/monero/Cargo.toml b/coins/monero/Cargo.toml index 101cf80b..cf0549fe 100644 --- a/coins/monero/Cargo.toml +++ b/coins/monero/Cargo.toml @@ -54,7 +54,6 @@ serde_json = { version = "1", default-features = false, features = ["alloc"] } base58-monero = { version = "2", default-features = false, features = ["check"] } # Used for the provided HTTP RPC -async-recursion = { version = "1", optional = true } digest_auth = { version = "0.3", default-features = false, optional = true } simple-request = { path = "../../common/request", version = "0.1", default-features = false, optional = true } tokio = { version = "1", default-features = false, optional = true } @@ -101,7 +100,7 @@ std = [ "base58-monero/std", ] -http-rpc = ["async-recursion", "digest_auth", "simple-request", "tokio"] +http-rpc = ["digest_auth", "simple-request", "tokio"] multisig = ["transcript", "frost", "dleq", "std"] binaries = ["tokio/rt-multi-thread", "tokio/macros", "http-rpc"] experimental = [] diff --git a/coins/monero/src/rpc/http.rs b/coins/monero/src/rpc/http.rs index bcf201c9..dad2c107 100644 --- a/coins/monero/src/rpc/http.rs +++ b/coins/monero/src/rpc/http.rs @@ -112,141 +112,139 @@ impl HttpRpc { } impl HttpRpc { - #[async_recursion::async_recursion] - async fn inner_post( - &self, - route: &str, - body: Vec, - recursing: bool, - ) -> Result, RpcError> { + async fn inner_post(&self, route: &str, body: Vec) -> Result, RpcError> { let request_fn = |uri| { Request::post(uri) .body(body.clone().into()) .map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}"))) }; - let response = match &self.authentication { - Authentication::Unauthenticated(client) => client - .request(request_fn(self.url.clone() + "/" + route)?) - .await - .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?, - Authentication::Authenticated { username, password, connection } => { - let mut connection_lock = connection.lock().await; - - let mut request = request_fn("/".to_string() + route)?; - - // If we don't have an auth challenge, obtain one - if connection_lock.0.is_none() { - connection_lock.0 = Self::digest_auth_challenge( - &connection_lock - .1 - .request(request) - .await - .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?, - )?; - request = request_fn("/".to_string() + route)?; - } - - // Insert the challenge response, if we have a challenge - if let Some((challenge, cnonce)) = connection_lock.0.as_mut() { - // Update the cnonce - // Overflow isn't a concern as this is a u64 - *cnonce += 1; - - let mut context = AuthContext::new_post::<_, _, _, &[u8]>( - username, - password, - "/".to_string() + route, - None, - ); - context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes())); - - request.headers_mut().insert( - "Authorization", - HeaderValue::from_str( - &challenge - .respond(&context) - .map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))? - .to_header_string(), - ) - .unwrap(), - ); - } - - let response_result = connection_lock - .1 - .request(request) + for attempt in 0 .. 2 { + let response = match &self.authentication { + Authentication::Unauthenticated(client) => client + .request(request_fn(self.url.clone() + "/" + route)?) .await - .map_err(|e| RpcError::ConnectionError(format!("{e:?}"))); + .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?, + Authentication::Authenticated { username, password, connection } => { + let mut connection_lock = connection.lock().await; - // If the connection entered an error state, drop the cached challenge as challenges are - // per-connection - // We don't need to create a new connection as simple-request will for us - if response_result.is_err() { - connection_lock.0 = None; - } + let mut request = request_fn("/".to_string() + route)?; - // If we're not already recursing and: - // 1) We had a connection error - // 2) We need to re-auth due to this token being stale - // recursively re-call this function - if (!recursing) && - (response_result.is_err() || { - let response = response_result.as_ref().unwrap(); - if response.status() == StatusCode::UNAUTHORIZED { - if let Some(header) = response.headers().get("www-authenticate") { - header - .to_str() - .map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))? - .contains("stale") + // If we don't have an auth challenge, obtain one + if connection_lock.0.is_none() { + connection_lock.0 = Self::digest_auth_challenge( + &connection_lock + .1 + .request(request) + .await + .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?, + )?; + request = request_fn("/".to_string() + route)?; + } + + // Insert the challenge response, if we have a challenge + if let Some((challenge, cnonce)) = connection_lock.0.as_mut() { + // Update the cnonce + // Overflow isn't a concern as this is a u64 + *cnonce += 1; + + let mut context = AuthContext::new_post::<_, _, _, &[u8]>( + username, + password, + "/".to_string() + route, + None, + ); + context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes())); + + request.headers_mut().insert( + "Authorization", + HeaderValue::from_str( + &challenge + .respond(&context) + .map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))? + .to_header_string(), + ) + .unwrap(), + ); + } + + let response_result = connection_lock + .1 + .request(request) + .await + .map_err(|e| RpcError::ConnectionError(format!("{e:?}"))); + + // If the connection entered an error state, drop the cached challenge as challenges are + // per-connection + // We don't need to create a new connection as simple-request will for us + if response_result.is_err() { + connection_lock.0 = None; + } + + // If we're not already on our second attempt and: + // A) We had a connection error + // B) We need to re-auth due to this token being stale + // Move to the next loop iteration (retrying all of this) + if (attempt == 0) && + (response_result.is_err() || { + let response = response_result.as_ref().unwrap(); + if response.status() == StatusCode::UNAUTHORIZED { + if let Some(header) = response.headers().get("www-authenticate") { + header + .to_str() + .map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))? + .contains("stale") + } else { + false + } } else { false } - } else { - false - } - }) - { - connection_lock.0 = None; - drop(connection_lock); - return self.inner_post(route, body, true).await; + }) + { + // Drop the cached authentication before we do + connection_lock.0 = None; + continue; + } + + response_result? } + }; - response_result? - } - }; - - /* - let length = usize::try_from( - response - .headers() - .get("content-length") - .ok_or(RpcError::InvalidNode("no content-length header"))? - .to_str() - .map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))? - .parse::() - .map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?, - ) - .unwrap(); - // Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually - // has to send 1 GB of data to cause a 1 GB allocation - let mut res = Vec::with_capacity(length.max(1024 * 1024)); - let mut body = response.into_body(); - while res.len() < length { - let Some(data) = body.data().await else { break }; - res.extend(data.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?.as_ref()); - } - */ - - let mut res = Vec::with_capacity(128); - response - .body() - .await - .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))? - .read_to_end(&mut res) + /* + let length = usize::try_from( + response + .headers() + .get("content-length") + .ok_or(RpcError::InvalidNode("no content-length header"))? + .to_str() + .map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))? + .parse::() + .map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?, + ) .unwrap(); + // Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually + // has to send 1 GB of data to cause a 1 GB allocation + let mut res = Vec::with_capacity(length.max(1024 * 1024)); + let mut body = response.into_body(); + while res.len() < length { + let Some(data) = body.data().await else { break }; + res.extend(data.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?.as_ref()); + } + */ - Ok(res) + let mut res = Vec::with_capacity(128); + response + .body() + .await + .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))? + .read_to_end(&mut res) + .unwrap(); + + return Ok(res); + } + + unreachable!() } } @@ -254,7 +252,7 @@ impl HttpRpc { impl RpcConnection for HttpRpc { async fn post(&self, route: &str, body: Vec) -> Result, RpcError> { // TODO: Make this timeout configurable - tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body, false)) + tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body)) .await .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))? }