Support arbitrary RPC providers in monero-serai

Sets a clean path for no-std premised RPCs (buffers to an external RPC impl)/
Tor-based RPCs/client-side load balancing/...
This commit is contained in:
Luke Parker 2023-05-02 02:39:08 -04:00
parent 5765d1d278
commit adfbde6e24
No known key found for this signature in database
10 changed files with 123 additions and 74 deletions

1
Cargo.lock generated
View file

@ -5270,6 +5270,7 @@ dependencies = [
name = "monero-serai"
version = "0.1.4-alpha"
dependencies = [
"async-trait",
"base58-monero",
"crc",
"curve25519-dalek 3.2.0",

View file

@ -15,6 +15,7 @@ rustdoc-args = ["--cfg", "docsrs"]
futures = "0.3"
lazy_static = "1"
async-trait = "0.1"
thiserror = "1"
rand_core = "0.6"

View file

@ -1,5 +1,6 @@
use std::fmt::Debug;
use async_trait::async_trait;
use thiserror::Error;
use curve25519_dalek::edwards::{EdwardsPoint, CompressedEdwardsY};
@ -8,7 +9,7 @@ use serde::{Serialize, Deserialize, de::DeserializeOwned};
use serde_json::{Value, json};
use digest_auth::AuthContext;
use reqwest::{Client, RequestBuilder};
use reqwest::Client;
use crate::{
Protocol,
@ -73,18 +74,27 @@ fn rpc_point(point: &str) -> Result<EdwardsPoint, RpcError> {
.ok_or_else(|| RpcError::InvalidPoint(point.to_string()))
}
#[async_trait]
pub trait RpcConnection: Clone + Debug {
/// Perform a POST request to the specified route with the specified body.
///
/// The implementor is left to handle anything such as authentication.
async fn post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError>;
}
#[derive(Clone, Debug)]
pub struct Rpc {
pub struct HttpRpc {
client: Client,
userpass: Option<(String, String)>,
url: String,
}
impl Rpc {
/// Create a new RPC connection.
impl HttpRpc {
/// Create a new HTTP(S) RPC connection.
///
/// A daemon requiring authentication can be used via including the username and password in the
/// URL.
pub fn new(mut url: String) -> Result<Rpc, RpcError> {
pub fn new(mut url: String) -> Result<Rpc<HttpRpc>, RpcError> {
// Parse out the username and password
let userpass = if url.contains('@') {
let url_clone = url;
@ -114,26 +124,80 @@ impl Rpc {
None
};
Ok(Rpc { client: Client::new(), userpass, url })
Ok(Rpc(HttpRpc { client: Client::new(), userpass, url }))
}
}
/// Perform a RPC call to the specified method with the provided parameters.
/// This is NOT a JSON-RPC call, which use a method of "json_rpc" and are available via
#[async_trait]
impl RpcConnection for HttpRpc {
async fn post(&self, route: &str, body: Vec<u8>) -> Result<Vec<u8>, RpcError> {
let mut builder = self.client.post(self.url.clone() + "/" + route).body(body);
if let Some((user, pass)) = &self.userpass {
let req = self.client.post(&self.url).send().await.map_err(|_| RpcError::InvalidNode)?;
// Only provide authentication if this daemon actually expects it
if let Some(header) = req.headers().get("www-authenticate") {
builder = builder.header(
"Authorization",
digest_auth::parse(header.to_str().map_err(|_| RpcError::InvalidNode)?)
.map_err(|_| RpcError::InvalidNode)?
.respond(&AuthContext::new_post::<_, _, _, &[u8]>(
user,
pass,
"/".to_string() + route,
None,
))
.map_err(|_| RpcError::InvalidNode)?
.to_header_string(),
);
}
}
Ok(
builder
.send()
.await
.map_err(|_| RpcError::ConnectionError)?
.bytes()
.await
.map_err(|_| RpcError::ConnectionError)?
.slice(..)
.to_vec(),
)
}
}
#[derive(Clone, Debug)]
pub struct Rpc<R: RpcConnection>(R);
impl<R: RpcConnection> Rpc<R> {
/// Perform a RPC call to the specified route with the provided parameters.
///
/// This is NOT a JSON-RPC call. They use a route of "json_rpc" and are available via
/// `json_rpc_call`.
pub async fn rpc_call<Params: Serialize + Debug, Response: DeserializeOwned + Debug>(
&self,
method: &str,
route: &str,
params: Option<Params>,
) -> Result<Response, RpcError> {
let mut builder = self.client.post(self.url.clone() + "/" + method);
if let Some(params) = params.as_ref() {
builder = builder.json(params);
}
self.call_tail(method, builder).await
self
.call_tail(
route,
self
.0
.post(
route,
if let Some(params) = params {
serde_json::to_string(&params).unwrap().into_bytes()
} else {
vec![]
},
)
.await?,
)
.await
}
/// Perform a JSON-RPC call to the specified method with the provided parameters
/// Perform a JSON-RPC call with the specified method with the provided parameters
pub async fn json_rpc_call<Response: DeserializeOwned + Debug>(
&self,
method: &str,
@ -146,48 +210,25 @@ impl Rpc {
Ok(self.rpc_call::<_, JsonRpcResponse<Response>>("json_rpc", Some(req)).await?.result)
}
/// Perform a binary call to the specified method with the provided parameters.
/// Perform a binary call to the specified route with the provided parameters.
pub async fn bin_call<Response: DeserializeOwned + Debug>(
&self,
method: &str,
route: &str,
params: Vec<u8>,
) -> Result<Response, RpcError> {
let builder = self.client.post(self.url.clone() + "/" + method).body(params.clone());
self.call_tail(method, builder.header("Content-Type", "application/octet-stream")).await
self.call_tail(route, self.0.post(route, params).await?).await
}
async fn call_tail<Response: DeserializeOwned + Debug>(
&self,
method: &str,
mut builder: RequestBuilder,
route: &str,
res: Vec<u8>,
) -> Result<Response, RpcError> {
if let Some((user, pass)) = &self.userpass {
let req = self.client.post(&self.url).send().await.map_err(|_| RpcError::InvalidNode)?;
// Only provide authentication if this daemon actually expects it
if let Some(header) = req.headers().get("www-authenticate") {
builder = builder.header(
"Authorization",
digest_auth::parse(header.to_str().map_err(|_| RpcError::InvalidNode)?)
.map_err(|_| RpcError::InvalidNode)?
.respond(&AuthContext::new_post::<_, _, _, &[u8]>(
user,
pass,
"/".to_string() + method,
None,
))
.map_err(|_| RpcError::InvalidNode)?
.to_header_string(),
);
}
}
let res = builder.send().await.map_err(|_| RpcError::ConnectionError)?;
Ok(if !method.ends_with(".bin") {
serde_json::from_str(&res.text().await.map_err(|_| RpcError::ConnectionError)?)
Ok(if !route.ends_with(".bin") {
serde_json::from_str(std::str::from_utf8(&res).map_err(|_| RpcError::InvalidNode)?)
.map_err(|_| RpcError::InternalError("Failed to parse JSON response"))?
} else {
monero_epee_bin_serde::from_bytes(&res.bytes().await.map_err(|_| RpcError::ConnectionError)?)
monero_epee_bin_serde::from_bytes(&res)
.map_err(|_| RpcError::InternalError("Failed to parse binary response"))?
})
}

View file

@ -13,7 +13,7 @@ use curve25519_dalek::edwards::EdwardsPoint;
use crate::{
wallet::SpendableOutput,
rpc::{RpcError, Rpc},
rpc::{RpcError, RpcConnection, Rpc},
};
const LOCK_WINDOW: usize = 10;
@ -31,9 +31,9 @@ lazy_static! {
}
#[allow(clippy::too_many_arguments)]
async fn select_n<'a, R: RngCore + CryptoRng>(
async fn select_n<'a, R: RngCore + CryptoRng, RPC: RpcConnection>(
rng: &mut R,
rpc: &Rpc,
rpc: &Rpc<RPC>,
distribution: &MutexGuard<'a, Vec<u64>>,
height: usize,
high: u64,
@ -137,9 +137,9 @@ impl Decoys {
}
/// Select decoys using the same distribution as Monero.
pub async fn select<R: RngCore + CryptoRng>(
pub async fn select<R: RngCore + CryptoRng, RPC: RpcConnection>(
rng: &mut R,
rpc: &Rpc,
rpc: &Rpc<RPC>,
ring_len: usize,
height: usize,
inputs: &[SpendableOutput],

View file

@ -10,7 +10,7 @@ use crate::{
serialize::{read_byte, read_u32, read_u64, read_bytes, read_scalar, read_point, read_raw_vec},
transaction::{Input, Timelock, Transaction},
block::Block,
rpc::{Rpc, RpcError},
rpc::{RpcError, RpcConnection, Rpc},
wallet::{
PaymentId, Extra, address::SubaddressIndex, Scanner, uniqueness, shared_key, amount_decryption,
commitment_mask,
@ -195,13 +195,19 @@ pub struct SpendableOutput {
impl SpendableOutput {
/// Update the spendable output's global index. This is intended to be called if a
/// re-organization occurred.
pub async fn refresh_global_index(&mut self, rpc: &Rpc) -> Result<(), RpcError> {
pub async fn refresh_global_index<RPC: RpcConnection>(
&mut self,
rpc: &Rpc<RPC>,
) -> Result<(), RpcError> {
self.global_index =
rpc.get_o_indexes(self.output.absolute.tx).await?[usize::from(self.output.absolute.o)];
Ok(())
}
pub async fn from(rpc: &Rpc, output: ReceivedOutput) -> Result<SpendableOutput, RpcError> {
pub async fn from<RPC: RpcConnection>(
rpc: &Rpc<RPC>,
output: ReceivedOutput,
) -> Result<SpendableOutput, RpcError> {
let mut output = SpendableOutput { output, global_index: 0 };
output.refresh_global_index(rpc).await?;
Ok(output)
@ -408,9 +414,9 @@ impl Scanner {
/// transactions is a dead giveaway for which transactions you successfully scanned. This
/// function obtains the output indexes for the miner transaction, incrementing from there
/// instead.
pub async fn scan(
pub async fn scan<RPC: RpcConnection>(
&mut self,
rpc: &Rpc,
rpc: &Rpc<RPC>,
block: &Block,
) -> Result<Vec<Timelocked<SpendableOutput>>, RpcError> {
let mut index = rpc.get_o_indexes(block.miner_tx.hash()).await?[0];

View file

@ -33,7 +33,7 @@ use crate::{
RctBase, RctPrunable, RctSignatures,
},
transaction::{Input, Output, Timelock, TransactionPrefix, Transaction},
rpc::{Rpc, RpcError},
rpc::{RpcError, RpcConnection, Rpc},
wallet::{
address::{Network, AddressSpec, MoneroAddress},
ViewPair, SpendableOutput, Decoys, PaymentId, ExtraField, Extra, key_image_sort, uniqueness,
@ -147,9 +147,9 @@ pub enum TransactionError {
FrostError(FrostError),
}
async fn prepare_inputs<R: RngCore + CryptoRng>(
async fn prepare_inputs<R: RngCore + CryptoRng, RPC: RpcConnection>(
rng: &mut R,
rpc: &Rpc,
rpc: &Rpc<RPC>,
ring_len: usize,
inputs: &[SpendableOutput],
spend: &Zeroizing<Scalar>,
@ -663,10 +663,10 @@ impl SignableTransaction {
}
/// Sign this transaction.
pub async fn sign<R: RngCore + CryptoRng>(
pub async fn sign<R: RngCore + CryptoRng, RPC: RpcConnection>(
mut self,
rng: &mut R,
rpc: &Rpc,
rpc: &Rpc<RPC>,
spend: &Zeroizing<Scalar>,
) -> Result<Transaction, TransactionError> {
let mut images = Vec::with_capacity(self.inputs.len());

View file

@ -30,7 +30,7 @@ use crate::{
RctPrunable,
},
transaction::{Input, Transaction},
rpc::Rpc,
rpc::{RpcConnection, Rpc},
wallet::{
TransactionError, InternalPayment, SignableTransaction, Decoys, key_image_sort, uniqueness,
},
@ -74,9 +74,9 @@ pub struct TransactionSignatureMachine {
impl SignableTransaction {
/// Create a FROST signing machine out of this signable transaction.
/// The height is the Monero blockchain height to synchronize around.
pub async fn multisig(
pub async fn multisig<RPC: RpcConnection>(
self,
rpc: &Rpc,
rpc: &Rpc<RPC>,
keys: ThresholdKeys<Ed25519>,
mut transcript: RecommendedTranscript,
height: usize,

View file

@ -12,7 +12,7 @@ use tokio::sync::Mutex;
use monero_serai::{
random_scalar,
rpc::Rpc,
rpc::{HttpRpc, Rpc},
wallet::{
ViewPair, Scanner,
address::{Network, AddressType, AddressSpec, AddressMeta, MoneroAddress},
@ -38,7 +38,7 @@ pub fn random_address() -> (Scalar, ViewPair, MoneroAddress) {
// TODO: Support transactions already on-chain
// TODO: Don't have a side effect of mining blocks more blocks than needed under race conditions
// TODO: mine as much as needed instead of default 10 blocks
pub async fn mine_until_unlocked(rpc: &Rpc, addr: &str, tx_hash: [u8; 32]) {
pub async fn mine_until_unlocked(rpc: &Rpc<HttpRpc>, addr: &str, tx_hash: [u8; 32]) {
// mine until tx is in a block
let mut height = rpc.get_height().await.unwrap();
let mut found = false;
@ -60,7 +60,7 @@ pub async fn mine_until_unlocked(rpc: &Rpc, addr: &str, tx_hash: [u8; 32]) {
// Mines 60 blocks and returns an unlocked miner TX output.
#[allow(dead_code)]
pub async fn get_miner_tx_output(rpc: &Rpc, view: &ViewPair) -> SpendableOutput {
pub async fn get_miner_tx_output(rpc: &Rpc<HttpRpc>, view: &ViewPair) -> SpendableOutput {
let mut scanner = Scanner::from_view(view.clone(), Some(HashSet::new()));
// Mine 60 blocks to unlock a miner TX
@ -74,8 +74,8 @@ pub async fn get_miner_tx_output(rpc: &Rpc, view: &ViewPair) -> SpendableOutput
scanner.scan(rpc, &block).await.unwrap().swap_remove(0).ignore_timelock().swap_remove(0)
}
pub async fn rpc() -> Rpc {
let rpc = Rpc::new("http://127.0.0.1:18081".to_string()).unwrap();
pub async fn rpc() -> Rpc<HttpRpc> {
let rpc = HttpRpc::new("http://127.0.0.1:18081".to_string()).unwrap();
// Only run once
if rpc.get_height().await.unwrap() != 1 {

View file

@ -69,7 +69,7 @@ test!(
},
),
(
|rpc: Rpc, _, _, mut outputs: Vec<ReceivedOutput>| async move {
|rpc: Rpc<_>, _, _, mut outputs: Vec<ReceivedOutput>| async move {
let change_view = ViewPair::new(
&random_scalar(&mut OsRng) * &ED25519_BASEPOINT_TABLE,
Zeroizing::new(random_scalar(&mut OsRng)),

View file

@ -19,7 +19,7 @@ use monero_rpc::{
use monero_serai::{
transaction::Transaction,
rpc::Rpc,
rpc::{HttpRpc, Rpc},
wallet::{
address::{Network, AddressSpec, SubaddressIndex, MoneroAddress},
extra::{MAX_TX_EXTRA_NONCE_SIZE, Extra},
@ -35,7 +35,7 @@ async fn make_integrated_address(payment_id: [u8; 8]) -> String {
integrated_address: String,
}
let rpc = Rpc::new("http://127.0.0.1:6061".to_string()).unwrap();
let rpc = HttpRpc::new("http://127.0.0.1:6061".to_string()).unwrap();
let res = rpc
.json_rpc_call::<IntegratedAddressResponse>(
"make_integrated_address",
@ -47,7 +47,7 @@ async fn make_integrated_address(payment_id: [u8; 8]) -> String {
res.integrated_address
}
async fn initialize_rpcs() -> (WalletClient, Rpc, monero_rpc::monero::Address) {
async fn initialize_rpcs() -> (WalletClient, Rpc<HttpRpc>, monero_rpc::monero::Address) {
let wallet_rpc =
monero_rpc::RpcClientBuilder::new().build("http://127.0.0.1:6061").unwrap().wallet();
let daemon_rpc = runner::rpc().await;