mirror of
https://github.com/serai-dex/serai.git
synced 2024-11-16 17:07:35 +00:00
Remove async_recursion for a for loop
This commit is contained in:
parent
e1c07d89e0
commit
bc07e14b1e
3 changed files with 119 additions and 134 deletions
12
Cargo.lock
generated
12
Cargo.lock
generated
|
@ -309,17 +309,6 @@ dependencies = [
|
|||
"event-listener",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-recursion"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.39",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.74"
|
||||
|
@ -4851,7 +4840,6 @@ dependencies = [
|
|||
name = "monero-serai"
|
||||
version = "0.1.4-alpha"
|
||||
dependencies = [
|
||||
"async-recursion",
|
||||
"async-trait",
|
||||
"base58-monero",
|
||||
"curve25519-dalek",
|
||||
|
|
|
@ -54,7 +54,6 @@ serde_json = { version = "1", default-features = false, features = ["alloc"] }
|
|||
base58-monero = { version = "2", default-features = false, features = ["check"] }
|
||||
|
||||
# Used for the provided HTTP RPC
|
||||
async-recursion = { version = "1", optional = true }
|
||||
digest_auth = { version = "0.3", default-features = false, optional = true }
|
||||
simple-request = { path = "../../common/request", version = "0.1", default-features = false, optional = true }
|
||||
tokio = { version = "1", default-features = false, optional = true }
|
||||
|
@ -101,7 +100,7 @@ std = [
|
|||
"base58-monero/std",
|
||||
]
|
||||
|
||||
http-rpc = ["async-recursion", "digest_auth", "simple-request", "tokio"]
|
||||
http-rpc = ["digest_auth", "simple-request", "tokio"]
|
||||
multisig = ["transcript", "frost", "dleq", "std"]
|
||||
binaries = ["tokio/rt-multi-thread", "tokio/macros", "http-rpc"]
|
||||
experimental = []
|
||||
|
|
|
@ -112,141 +112,139 @@ impl HttpRpc {
|
|||
}
|
||||
|
||||
impl HttpRpc {
|
||||
#[async_recursion::async_recursion]
|
||||
async fn inner_post(
|
||||
&self,
|
||||
route: &str,
|
||||
body: Vec<u8>,
|
||||
recursing: bool,
|
||||
) -> Result<Vec<u8>, RpcError> {
|
||||
async fn inner_post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
|
||||
let request_fn = |uri| {
|
||||
Request::post(uri)
|
||||
.body(body.clone().into())
|
||||
.map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))
|
||||
};
|
||||
|
||||
let response = match &self.authentication {
|
||||
Authentication::Unauthenticated(client) => client
|
||||
.request(request_fn(self.url.clone() + "/" + route)?)
|
||||
.await
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
|
||||
Authentication::Authenticated { username, password, connection } => {
|
||||
let mut connection_lock = connection.lock().await;
|
||||
|
||||
let mut request = request_fn("/".to_string() + route)?;
|
||||
|
||||
// If we don't have an auth challenge, obtain one
|
||||
if connection_lock.0.is_none() {
|
||||
connection_lock.0 = Self::digest_auth_challenge(
|
||||
&connection_lock
|
||||
.1
|
||||
.request(request)
|
||||
.await
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
|
||||
)?;
|
||||
request = request_fn("/".to_string() + route)?;
|
||||
}
|
||||
|
||||
// Insert the challenge response, if we have a challenge
|
||||
if let Some((challenge, cnonce)) = connection_lock.0.as_mut() {
|
||||
// Update the cnonce
|
||||
// Overflow isn't a concern as this is a u64
|
||||
*cnonce += 1;
|
||||
|
||||
let mut context = AuthContext::new_post::<_, _, _, &[u8]>(
|
||||
username,
|
||||
password,
|
||||
"/".to_string() + route,
|
||||
None,
|
||||
);
|
||||
context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes()));
|
||||
|
||||
request.headers_mut().insert(
|
||||
"Authorization",
|
||||
HeaderValue::from_str(
|
||||
&challenge
|
||||
.respond(&context)
|
||||
.map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))?
|
||||
.to_header_string(),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
let response_result = connection_lock
|
||||
.1
|
||||
.request(request)
|
||||
for attempt in 0 .. 2 {
|
||||
let response = match &self.authentication {
|
||||
Authentication::Unauthenticated(client) => client
|
||||
.request(request_fn(self.url.clone() + "/" + route)?)
|
||||
.await
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")));
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
|
||||
Authentication::Authenticated { username, password, connection } => {
|
||||
let mut connection_lock = connection.lock().await;
|
||||
|
||||
// If the connection entered an error state, drop the cached challenge as challenges are
|
||||
// per-connection
|
||||
// We don't need to create a new connection as simple-request will for us
|
||||
if response_result.is_err() {
|
||||
connection_lock.0 = None;
|
||||
}
|
||||
let mut request = request_fn("/".to_string() + route)?;
|
||||
|
||||
// If we're not already recursing and:
|
||||
// 1) We had a connection error
|
||||
// 2) We need to re-auth due to this token being stale
|
||||
// recursively re-call this function
|
||||
if (!recursing) &&
|
||||
(response_result.is_err() || {
|
||||
let response = response_result.as_ref().unwrap();
|
||||
if response.status() == StatusCode::UNAUTHORIZED {
|
||||
if let Some(header) = response.headers().get("www-authenticate") {
|
||||
header
|
||||
.to_str()
|
||||
.map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?
|
||||
.contains("stale")
|
||||
// If we don't have an auth challenge, obtain one
|
||||
if connection_lock.0.is_none() {
|
||||
connection_lock.0 = Self::digest_auth_challenge(
|
||||
&connection_lock
|
||||
.1
|
||||
.request(request)
|
||||
.await
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
|
||||
)?;
|
||||
request = request_fn("/".to_string() + route)?;
|
||||
}
|
||||
|
||||
// Insert the challenge response, if we have a challenge
|
||||
if let Some((challenge, cnonce)) = connection_lock.0.as_mut() {
|
||||
// Update the cnonce
|
||||
// Overflow isn't a concern as this is a u64
|
||||
*cnonce += 1;
|
||||
|
||||
let mut context = AuthContext::new_post::<_, _, _, &[u8]>(
|
||||
username,
|
||||
password,
|
||||
"/".to_string() + route,
|
||||
None,
|
||||
);
|
||||
context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes()));
|
||||
|
||||
request.headers_mut().insert(
|
||||
"Authorization",
|
||||
HeaderValue::from_str(
|
||||
&challenge
|
||||
.respond(&context)
|
||||
.map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))?
|
||||
.to_header_string(),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
let response_result = connection_lock
|
||||
.1
|
||||
.request(request)
|
||||
.await
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")));
|
||||
|
||||
// If the connection entered an error state, drop the cached challenge as challenges are
|
||||
// per-connection
|
||||
// We don't need to create a new connection as simple-request will for us
|
||||
if response_result.is_err() {
|
||||
connection_lock.0 = None;
|
||||
}
|
||||
|
||||
// If we're not already on our second attempt and:
|
||||
// A) We had a connection error
|
||||
// B) We need to re-auth due to this token being stale
|
||||
// Move to the next loop iteration (retrying all of this)
|
||||
if (attempt == 0) &&
|
||||
(response_result.is_err() || {
|
||||
let response = response_result.as_ref().unwrap();
|
||||
if response.status() == StatusCode::UNAUTHORIZED {
|
||||
if let Some(header) = response.headers().get("www-authenticate") {
|
||||
header
|
||||
.to_str()
|
||||
.map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?
|
||||
.contains("stale")
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
{
|
||||
connection_lock.0 = None;
|
||||
drop(connection_lock);
|
||||
return self.inner_post(route, body, true).await;
|
||||
})
|
||||
{
|
||||
// Drop the cached authentication before we do
|
||||
connection_lock.0 = None;
|
||||
continue;
|
||||
}
|
||||
|
||||
response_result?
|
||||
}
|
||||
};
|
||||
|
||||
response_result?
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
let length = usize::try_from(
|
||||
response
|
||||
.headers()
|
||||
.get("content-length")
|
||||
.ok_or(RpcError::InvalidNode("no content-length header"))?
|
||||
.to_str()
|
||||
.map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))?
|
||||
.parse::<u32>()
|
||||
.map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?,
|
||||
)
|
||||
.unwrap();
|
||||
// Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually
|
||||
// has to send 1 GB of data to cause a 1 GB allocation
|
||||
let mut res = Vec::with_capacity(length.max(1024 * 1024));
|
||||
let mut body = response.into_body();
|
||||
while res.len() < length {
|
||||
let Some(data) = body.data().await else { break };
|
||||
res.extend(data.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?.as_ref());
|
||||
}
|
||||
*/
|
||||
|
||||
let mut res = Vec::with_capacity(128);
|
||||
response
|
||||
.body()
|
||||
.await
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
|
||||
.read_to_end(&mut res)
|
||||
/*
|
||||
let length = usize::try_from(
|
||||
response
|
||||
.headers()
|
||||
.get("content-length")
|
||||
.ok_or(RpcError::InvalidNode("no content-length header"))?
|
||||
.to_str()
|
||||
.map_err(|_| RpcError::InvalidNode("non-ascii content-length value"))?
|
||||
.parse::<u32>()
|
||||
.map_err(|_| RpcError::InvalidNode("non-u32 content-length value"))?,
|
||||
)
|
||||
.unwrap();
|
||||
// Only pre-allocate 1 MB so a malicious node which claims a content-length of 1 GB actually
|
||||
// has to send 1 GB of data to cause a 1 GB allocation
|
||||
let mut res = Vec::with_capacity(length.max(1024 * 1024));
|
||||
let mut body = response.into_body();
|
||||
while res.len() < length {
|
||||
let Some(data) = body.data().await else { break };
|
||||
res.extend(data.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?.as_ref());
|
||||
}
|
||||
*/
|
||||
|
||||
Ok(res)
|
||||
let mut res = Vec::with_capacity(128);
|
||||
response
|
||||
.body()
|
||||
.await
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
|
||||
.read_to_end(&mut res)
|
||||
.unwrap();
|
||||
|
||||
return Ok(res);
|
||||
}
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -254,7 +252,7 @@ impl HttpRpc {
|
|||
impl RpcConnection for HttpRpc {
|
||||
async fn post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
|
||||
// TODO: Make this timeout configurable
|
||||
tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body, false))
|
||||
tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body))
|
||||
.await
|
||||
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue