Use a single long-lived RPC connection when authenticated

The prior system spawned a new connection per request to enable parallelism,
yet kept hitting hyper::IncompleteMessages I couldn't track down. This
attempts to resolve those by a long-lived socket.

Halves the amount of requests per-authenticated RPC call, and accordingly is
likely still better overall.

I don't believe this is resolved yet but this is still worth pushing.
This commit is contained in:
Luke Parker 2023-11-06 23:45:39 -05:00
parent c03fb6c71b
commit 56fd11ab8d
No known key found for this signature in database
14 changed files with 169 additions and 76 deletions

12
Cargo.lock generated
View file

@ -309,6 +309,17 @@ dependencies = [
"event-listener", "event-listener",
] ]
[[package]]
name = "async-recursion"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.74" version = "0.1.74"
@ -4840,6 +4851,7 @@ dependencies = [
name = "monero-serai" name = "monero-serai"
version = "0.1.4-alpha" version = "0.1.4-alpha"
dependencies = [ dependencies = [
"async-recursion",
"async-trait", "async-trait",
"base58-monero", "base58-monero",
"curve25519-dalek", "curve25519-dalek",

View file

@ -54,6 +54,7 @@ serde_json = { version = "1", default-features = false, features = ["alloc"] }
base58-monero = { version = "2", default-features = false, features = ["check"] } base58-monero = { version = "2", default-features = false, features = ["check"] }
# Used for the provided HTTP RPC # Used for the provided HTTP RPC
async-recursion = { version = "1", optional = true }
digest_auth = { version = "0.3", default-features = false, optional = true } digest_auth = { version = "0.3", default-features = false, optional = true }
simple-request = { path = "../../common/request", version = "0.1", default-features = false, optional = true } simple-request = { path = "../../common/request", version = "0.1", default-features = false, optional = true }
tokio = { version = "1", default-features = false, optional = true } tokio = { version = "1", default-features = false, optional = true }
@ -100,7 +101,7 @@ std = [
"base58-monero/std", "base58-monero/std",
] ]
http-rpc = ["digest_auth", "simple-request", "tokio"] http-rpc = ["async-recursion", "digest_auth", "simple-request", "tokio"]
multisig = ["transcript", "frost", "dleq", "std"] multisig = ["transcript", "frost", "dleq", "std"]
binaries = ["tokio/rt-multi-thread", "tokio/macros", "http-rpc"] binaries = ["tokio/rt-multi-thread", "tokio/macros", "http-rpc"]
experimental = [] experimental = []

View file

@ -271,14 +271,15 @@ async fn main() {
} }
let nodes = if specified_nodes.is_empty() { default_nodes } else { specified_nodes }; let nodes = if specified_nodes.is_empty() { default_nodes } else { specified_nodes };
let rpc = |url: String| { let rpc = |url: String| async move {
HttpRpc::new(url.clone()) HttpRpc::new(url.clone())
.await
.unwrap_or_else(|_| panic!("couldn't create HttpRpc connected to {url}")) .unwrap_or_else(|_| panic!("couldn't create HttpRpc connected to {url}"))
}; };
let main_rpc = rpc(nodes[0].clone()); let main_rpc = rpc(nodes[0].clone()).await;
let mut rpcs = vec![]; let mut rpcs = vec![];
for i in 0 .. async_parallelism { for i in 0 .. async_parallelism {
rpcs.push(Arc::new(rpc(nodes[i % nodes.len()].clone()))); rpcs.push(Arc::new(rpc(nodes[i % nodes.len()].clone()).await));
} }
let mut rpc_i = 0; let mut rpc_i = 0;

View file

@ -1,25 +1,30 @@
use std::io::Read; use std::{sync::Arc, io::Read};
use async_trait::async_trait; use async_trait::async_trait;
use digest_auth::AuthContext; use tokio::sync::Mutex;
use digest_auth::{WwwAuthenticateHeader, AuthContext};
use simple_request::{ use simple_request::{
hyper::{header::HeaderValue, Request}, hyper::{StatusCode, header::HeaderValue, Request},
Client, Response, Client,
}; };
use crate::rpc::{RpcError, RpcConnection, Rpc}; use crate::rpc::{RpcError, RpcConnection, Rpc};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
enum Authentication { enum Authentication {
// If unauthenticated, reuse a single client // If unauthenticated, use a single client
Unauthenticated(Client), Unauthenticated(Client),
// If authenticated, don't reuse clients so that each connection makes its own connection // If authenticated, use a single client which supports being locked and tracks its nonce
// This ensures that if a nonce is requested, another caller doesn't make a request invalidating // This ensures that if a nonce is requested, another caller doesn't make a request invalidating
// it // it
// We could acquire a mutex over the client, yet creating a new client is preferred for the Authenticated {
// possibility of parallelism username: String,
Authenticated { username: String, password: String }, password: String,
#[allow(clippy::type_complexity)]
connection: Arc<Mutex<(Option<(WwwAuthenticateHeader, u64)>, Client)>>,
},
} }
/// An HTTP(S) transport for the RPC. /// An HTTP(S) transport for the RPC.
@ -32,11 +37,29 @@ pub struct HttpRpc {
} }
impl HttpRpc { impl HttpRpc {
fn digest_auth_challenge(
response: &Response,
) -> Result<Option<(WwwAuthenticateHeader, u64)>, RpcError> {
Ok(if let Some(header) = response.headers().get("www-authenticate") {
Some((
digest_auth::parse(
header
.to_str()
.map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?,
)
.map_err(|_| RpcError::InvalidNode("invalid digest-auth response"))?,
0,
))
} else {
None
})
}
/// Create a new HTTP(S) RPC connection. /// Create a new HTTP(S) RPC connection.
/// ///
/// A daemon requiring authentication can be used via including the username and password in the /// A daemon requiring authentication can be used via including the username and password in the
/// URL. /// URL.
pub fn new(mut url: String) -> Result<Rpc<HttpRpc>, RpcError> { pub async fn new(mut url: String) -> Result<Rpc<HttpRpc>, RpcError> {
let authentication = if url.contains('@') { let authentication = if url.contains('@') {
// Parse out the username and password // Parse out the username and password
let url_clone = url; let url_clone = url;
@ -61,9 +84,24 @@ impl HttpRpc {
if split_userpass.len() > 2 { if split_userpass.len() > 2 {
Err(RpcError::ConnectionError("invalid amount of passwords".to_string()))?; Err(RpcError::ConnectionError("invalid amount of passwords".to_string()))?;
} }
let client = Client::without_connection_pool(url.clone())
.map_err(|_| RpcError::ConnectionError("invalid URL".to_string()))?;
// Obtain the initial challenge, which also somewhat validates this connection
let challenge = Self::digest_auth_challenge(
&client
.request(
Request::post(url.clone())
.body(vec![].into())
.map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))?,
)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
)?;
Authentication::Authenticated { Authentication::Authenticated {
username: split_userpass[0].to_string(), username: split_userpass[0].to_string(),
password: split_userpass.get(1).unwrap_or(&"").to_string(), password: split_userpass.get(1).unwrap_or(&"").to_string(),
connection: Arc::new(Mutex::new((challenge, client))),
} }
} else { } else {
Authentication::Unauthenticated(Client::with_connection_pool()) Authentication::Unauthenticated(Client::with_connection_pool())
@ -74,60 +112,96 @@ impl HttpRpc {
} }
impl HttpRpc { impl HttpRpc {
async fn inner_post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> { #[async_recursion::async_recursion]
let request = |uri| Request::post(uri).body(body.clone().into()).unwrap(); async fn inner_post(
&self,
route: &str,
body: Vec<u8>,
recursing: bool,
) -> Result<Vec<u8>, RpcError> {
let request_fn = |uri| {
Request::post(uri)
.body(body.clone().into())
.map_err(|e| RpcError::ConnectionError(format!("couldn't make request: {e:?}")))
};
let mut connection = None;
let response = match &self.authentication { let response = match &self.authentication {
Authentication::Unauthenticated(client) => client Authentication::Unauthenticated(client) => client
.request(request(self.url.clone() + "/" + route)) .request(request_fn(self.url.clone() + "/" + route)?)
.await .await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?, .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
Authentication::Authenticated { username, password } => { Authentication::Authenticated { username, password, connection } => {
// This Client will drop and replace its connection on error, when monero-serai requires let mut connection_lock = connection.lock().await;
// a single socket for the lifetime of this function
// Since dropping the connection will raise an error, and this function aborts on any
// error, this is fine
let client = Client::without_connection_pool(self.url.clone())
.map_err(|_| RpcError::ConnectionError("invalid URL".to_string()))?;
let mut response = client
.request(request("/".to_string() + route))
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?;
// Only provide authentication if this daemon actually expects it let mut request = request_fn("/".to_string() + route)?;
if let Some(header) = response.headers().get("www-authenticate") {
let mut request = request("/".to_string() + route); // If we don't have an auth challenge, obtain one
request.headers_mut().insert( if connection_lock.0.is_none() {
"Authorization", connection_lock.0 = Self::digest_auth_challenge(
HeaderValue::from_str( &connection_lock
&digest_auth::parse( .1
header .request(request)
.to_str() .await
.map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?, .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?,
) )?;
.map_err(|_| RpcError::InvalidNode("invalid digest-auth response"))? request = request_fn("/".to_string() + route)?;
.respond(&AuthContext::new_post::<_, _, _, &[u8]>( }
// Insert the challenge response, if we have a challenge
if let Some((challenge, cnonce)) = connection_lock.0.as_mut() {
// Update the cnonce
// Overflow isn't a concern as this is a u64
*cnonce += 1;
let mut context = AuthContext::new_post::<_, _, _, &[u8]>(
username, username,
password, password,
"/".to_string() + route, "/".to_string() + route,
None, None,
)) );
context.set_custom_cnonce(hex::encode(cnonce.to_le_bytes()));
request.headers_mut().insert(
"Authorization",
HeaderValue::from_str(
&challenge
.respond(&context)
.map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))? .map_err(|_| RpcError::InvalidNode("couldn't respond to digest-auth challenge"))?
.to_header_string(), .to_header_string(),
) )
.unwrap(), .unwrap(),
); );
// Make the request with the response challenge
response = client
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?;
} }
// Store the client so it's not dropped yet let response_result = connection_lock
connection = Some(client); .1
.request(request)
.await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")));
// If the connection entered an error state, drop the cached challenge as challenges are
// per-connection
// We don't need to create a new connection as simple-request will for us
if response_result.is_err() {
connection_lock.0 = None;
}
let response = response_result?;
// If we need to re-auth due to this token being stale, recursively re-call this function,
// unless we're already recursing
if (!recursing) && (response.status() == StatusCode::UNAUTHORIZED) {
if let Some(header) = response.headers().get("www-authenticate") {
if header
.to_str()
.map_err(|_| RpcError::InvalidNode("www-authenticate header wasn't a string"))?
.contains("stale")
{
connection_lock.0 = None;
drop(connection_lock);
return self.inner_post(route, body, true).await;
}
}
}
response response
} }
@ -163,8 +237,6 @@ impl HttpRpc {
.read_to_end(&mut res) .read_to_end(&mut res)
.unwrap(); .unwrap();
drop(connection);
Ok(res) Ok(res)
} }
} }
@ -173,7 +245,7 @@ impl HttpRpc {
impl RpcConnection for HttpRpc { impl RpcConnection for HttpRpc {
async fn post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> { async fn post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
// TODO: Make this timeout configurable // TODO: Make this timeout configurable
tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body)) tokio::time::timeout(core::time::Duration::from_secs(30), self.inner_post(route, body, false))
.await .await
.map_err(|e| RpcError::ConnectionError(format!("{e:?}")))? .map_err(|e| RpcError::ConnectionError(format!("{e:?}")))?
} }

View file

@ -86,7 +86,7 @@ pub fn check_weight_and_fee(tx: &Transaction, fee_rate: Fee) {
} }
pub async fn rpc() -> Rpc<HttpRpc> { pub async fn rpc() -> Rpc<HttpRpc> {
let rpc = HttpRpc::new("http://127.0.0.1:18081".to_string()).unwrap(); let rpc = HttpRpc::new("http://127.0.0.1:18081".to_string()).await.unwrap();
// Only run once // Only run once
if rpc.get_height().await.unwrap() != 1 { if rpc.get_height().await.unwrap() != 1 {

View file

@ -35,7 +35,7 @@ async fn make_integrated_address(rpc: &Rpc<HttpRpc>, payment_id: [u8; 8]) -> Str
} }
async fn initialize_rpcs() -> (Rpc<HttpRpc>, Rpc<HttpRpc>, String) { async fn initialize_rpcs() -> (Rpc<HttpRpc>, Rpc<HttpRpc>, String) {
let wallet_rpc = HttpRpc::new("http://127.0.0.1:6061".to_string()).unwrap(); let wallet_rpc = HttpRpc::new("http://127.0.0.1:6061".to_string()).await.unwrap();
let daemon_rpc = runner::rpc().await; let daemon_rpc = runner::rpc().await;
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]

View file

@ -26,7 +26,7 @@ pub enum Error {
InvalidUri, InvalidUri,
MissingHost, MissingHost,
InconsistentHost, InconsistentHost,
SslError, SslError(Box<dyn Send + Sync + std::error::Error>),
Hyper(hyper::Error), Hyper(hyper::Error),
} }
@ -116,7 +116,7 @@ impl Client {
// If there's not a connection... // If there's not a connection...
if connection_lock.is_none() { if connection_lock.is_none() {
let (requester, connection) = hyper::client::conn::http1::handshake( let (requester, connection) = hyper::client::conn::http1::handshake(
https_builder.clone().call(host.clone()).await.map_err(|_| Error::SslError)?, https_builder.clone().call(host.clone()).await.map_err(Error::SslError)?,
) )
.await .await
.map_err(Error::Hyper)?; .map_err(Error::Hyper)?;

View file

@ -650,7 +650,7 @@ async fn main() {
#[cfg(feature = "bitcoin")] #[cfg(feature = "bitcoin")]
NetworkId::Bitcoin => run(db, Bitcoin::new(url).await, coordinator).await, NetworkId::Bitcoin => run(db, Bitcoin::new(url).await, coordinator).await,
#[cfg(feature = "monero")] #[cfg(feature = "monero")]
NetworkId::Monero => run(db, Monero::new(url), coordinator).await, NetworkId::Monero => run(db, Monero::new(url).await, coordinator).await,
_ => panic!("spawning a processor for an unsupported network"), _ => panic!("spawning a processor for an unsupported network"),
} }
} }

View file

@ -191,8 +191,8 @@ fn map_rpc_err(err: RpcError) -> NetworkError {
} }
impl Monero { impl Monero {
pub fn new(url: String) -> Monero { pub async fn new(url: String) -> Monero {
Monero { rpc: HttpRpc::new(url).unwrap() } Monero { rpc: HttpRpc::new(url).await.unwrap() }
} }
fn view_pair(spend: EdwardsPoint) -> ViewPair { fn view_pair(spend: EdwardsPoint) -> ViewPair {

View file

@ -114,13 +114,15 @@ mod monero {
async fn monero(ops: &DockerOperations) -> Monero { async fn monero(ops: &DockerOperations) -> Monero {
let handle = ops.handle("serai-dev-monero").host_port(18081).unwrap(); let handle = ops.handle("serai-dev-monero").host_port(18081).unwrap();
let monero = Monero::new(format!("http://serai:seraidex@{}:{}", handle.0, handle.1)); let url = format!("http://serai:seraidex@{}:{}", handle.0, handle.1);
for _ in 0 .. 60 { for _ in 0 .. 60 {
if monero.get_latest_block_number().await.is_ok() { if monero_serai::rpc::HttpRpc::new(url.clone()).await.is_ok() {
break; break;
} }
tokio::time::sleep(core::time::Duration::from_secs(1)).await; tokio::time::sleep(core::time::Duration::from_secs(1)).await;
} }
let monero = Monero::new(url).await;
while monero.get_latest_block_number().await.unwrap() < 150 { while monero.get_latest_block_number().await.unwrap() < 150 {
monero.mine_block().await; monero.mine_block().await;
} }

View file

@ -171,7 +171,7 @@ impl Handles {
// If the RPC server has yet to start, sleep for up to 60s until it does // If the RPC server has yet to start, sleep for up to 60s until it does
for _ in 0 .. 60 { for _ in 0 .. 60 {
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
let Ok(client) = HttpRpc::new(rpc.clone()) else { continue }; let Ok(client) = HttpRpc::new(rpc.clone()).await else { continue };
if client.get_height().await.is_err() { if client.get_height().await.is_err() {
continue; continue;
} }

View file

@ -523,7 +523,7 @@ async fn mint_and_burn_test() {
Coin::Monero, Coin::Monero,
1_000_000_000_000, 1_000_000_000_000,
ExternalAddress::new( ExternalAddress::new(
serai_client::networks::monero::Address::new(monero_addr).unwrap().try_into().unwrap(), serai_client::networks::monero::Address::new(monero_addr).unwrap().into(),
) )
.unwrap(), .unwrap(),
) )

View file

@ -188,7 +188,8 @@ impl Coordinator {
use monero_serai::rpc::HttpRpc; use monero_serai::rpc::HttpRpc;
// Monero's won't, so call get_height // Monero's won't, so call get_height
if HttpRpc::new(rpc_url.clone()) if handle
.block_on(HttpRpc::new(rpc_url.clone()))
.ok() .ok()
.and_then(|rpc| handle.block_on(rpc.get_height()).ok()) .and_then(|rpc| handle.block_on(rpc.get_height()).ok())
.is_some() .is_some()
@ -283,7 +284,7 @@ impl Coordinator {
rpc::HttpRpc, rpc::HttpRpc,
}; };
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); let rpc = HttpRpc::new(rpc_url).await.expect("couldn't connect to the Monero RPC");
let _: EmptyResponse = rpc let _: EmptyResponse = rpc
.json_rpc_call( .json_rpc_call(
"generateblocks", "generateblocks",
@ -322,7 +323,8 @@ impl Coordinator {
NetworkId::Monero => { NetworkId::Monero => {
use monero_serai::rpc::HttpRpc; use monero_serai::rpc::HttpRpc;
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); let rpc =
HttpRpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Monero RPC");
let res: serde_json::Value = rpc let res: serde_json::Value = rpc
.json_rpc_call("submit_block", Some(serde_json::json!([hex::encode(block)]))) .json_rpc_call("submit_block", Some(serde_json::json!([hex::encode(block)])))
.await .await
@ -368,10 +370,11 @@ impl Coordinator {
NetworkId::Monero => { NetworkId::Monero => {
use monero_serai::rpc::HttpRpc; use monero_serai::rpc::HttpRpc;
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); let rpc = HttpRpc::new(rpc_url).await.expect("couldn't connect to the Monero RPC");
let to = rpc.get_height().await.unwrap(); let to = rpc.get_height().await.unwrap();
for coordinator in others { for coordinator in others {
let from = HttpRpc::new(network_rpc(self.network, ops, &coordinator.network_handle)) let from = HttpRpc::new(network_rpc(self.network, ops, &coordinator.network_handle))
.await
.expect("couldn't connect to the Monero RPC") .expect("couldn't connect to the Monero RPC")
.get_height() .get_height()
.await .await
@ -407,7 +410,8 @@ impl Coordinator {
NetworkId::Monero => { NetworkId::Monero => {
use monero_serai::{transaction::Transaction, rpc::HttpRpc}; use monero_serai::{transaction::Transaction, rpc::HttpRpc};
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); let rpc =
HttpRpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Monero RPC");
rpc.publish_transaction(&Transaction::read(&mut &*tx).unwrap()).await.unwrap(); rpc.publish_transaction(&Transaction::read(&mut &*tx).unwrap()).await.unwrap();
} }
NetworkId::Serai => panic!("processor tests broadcasting block to Serai"), NetworkId::Serai => panic!("processor tests broadcasting block to Serai"),
@ -436,7 +440,8 @@ impl Coordinator {
NetworkId::Monero => { NetworkId::Monero => {
use monero_serai::rpc::HttpRpc; use monero_serai::rpc::HttpRpc;
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the coordinator's Monero RPC"); let rpc =
HttpRpc::new(rpc_url).await.expect("couldn't connect to the coordinator's Monero RPC");
let mut hash = [0; 32]; let mut hash = [0; 32];
hash.copy_from_slice(tx); hash.copy_from_slice(tx);
if let Ok(tx) = rpc.get_transaction(hash).await { if let Ok(tx) = rpc.get_transaction(hash).await {

View file

@ -180,7 +180,7 @@ impl Wallet {
let view_pair = let view_pair =
ViewPair::new(ED25519_BASEPOINT_POINT * spend_key, Zeroizing::new(view_key)); ViewPair::new(ED25519_BASEPOINT_POINT * spend_key, Zeroizing::new(view_key));
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); let rpc = HttpRpc::new(rpc_url).await.expect("couldn't connect to the Monero RPC");
let height = rpc.get_height().await.unwrap(); let height = rpc.get_height().await.unwrap();
// Mines 200 blocks so sufficient decoys exist, as only 60 is needed for maturity // Mines 200 blocks so sufficient decoys exist, as only 60 is needed for maturity
@ -316,7 +316,7 @@ impl Wallet {
use processor::{additional_key, networks::Monero}; use processor::{additional_key, networks::Monero};
let rpc_url = network_rpc(NetworkId::Monero, ops, handle); let rpc_url = network_rpc(NetworkId::Monero, ops, handle);
let rpc = HttpRpc::new(rpc_url).expect("couldn't connect to the Monero RPC"); let rpc = HttpRpc::new(rpc_url).await.expect("couldn't connect to the Monero RPC");
// Prepare inputs // Prepare inputs
let outputs = std::mem::take(inputs); let outputs = std::mem::take(inputs);