Implement presumed_origin

Before we yield a block for scanning, we save all of the contained script
public keys. Then, when we want the address credited for creating an output,
we read the script public key of the spent output from the database.

Fixes #559.
This commit is contained in:
Luke Parker 2024-09-11 02:46:18 -04:00
parent 4cb838e248
commit 93c7d06684
8 changed files with 200 additions and 63 deletions

View file

@ -1,3 +1,4 @@
use core::fmt;
use std::collections::HashMap; use std::collections::HashMap;
use ciphersuite::{Ciphersuite, Secp256k1}; use ciphersuite::{Ciphersuite, Secp256k1};
@ -6,6 +7,7 @@ use bitcoin_serai::bitcoin::block::{Header, Block as BBlock};
use serai_client::networks::bitcoin::Address; use serai_client::networks::bitcoin::Address;
use serai_db::Db;
use primitives::{ReceivedOutput, EventualityTracker}; use primitives::{ReceivedOutput, EventualityTracker};
use crate::{hash_bytes, scan::scanner, output::Output, transaction::Eventuality}; use crate::{hash_bytes, scan::scanner, output::Output, transaction::Eventuality};
@ -21,11 +23,16 @@ impl primitives::BlockHeader for BlockHeader {
} }
} }
#[derive(Clone, Debug)] #[derive(Clone)]
pub(crate) struct Block(pub(crate) BBlock); pub(crate) struct Block<D: Db>(pub(crate) D, pub(crate) BBlock);
impl<D: Db> fmt::Debug for Block<D> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Block").field("1", &self.1).finish_non_exhaustive()
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl primitives::Block for Block { impl<D: Db> primitives::Block for Block<D> {
type Header = BlockHeader; type Header = BlockHeader;
type Key = <Secp256k1 as Ciphersuite>::G; type Key = <Secp256k1 as Ciphersuite>::G;
@ -34,7 +41,7 @@ impl primitives::Block for Block {
type Eventuality = Eventuality; type Eventuality = Eventuality;
fn id(&self) -> [u8; 32] { fn id(&self) -> [u8; 32] {
primitives::BlockHeader::id(&BlockHeader(self.0.header)) primitives::BlockHeader::id(&BlockHeader(self.1.header))
} }
fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec<Self::Output> { fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec<Self::Output> {
@ -42,9 +49,9 @@ impl primitives::Block for Block {
let mut res = vec![]; let mut res = vec![];
// We skip the coinbase transaction as its burdened by maturity // We skip the coinbase transaction as its burdened by maturity
for tx in &self.0.txdata[1 ..] { for tx in &self.1.txdata[1 ..] {
for output in scanner.scan_transaction(tx) { for output in scanner.scan_transaction(tx) {
res.push(Output::new(key, tx, output)); res.push(Output::new(&self.0, key, tx, output));
} }
} }
res res
@ -59,7 +66,7 @@ impl primitives::Block for Block {
Self::Eventuality, Self::Eventuality,
> { > {
let mut res = HashMap::new(); let mut res = HashMap::new();
for tx in &self.0.txdata[1 ..] { for tx in &self.1.txdata[1 ..] {
let id = hash_bytes(tx.compute_txid().to_raw_hash()); let id = hash_bytes(tx.compute_txid().to_raw_hash());
if let Some(eventuality) = eventualities.active_eventualities.remove(id.as_slice()) { if let Some(eventuality) = eventualities.active_eventualities.remove(id.as_slice()) {
res.insert(id, eventuality); res.insert(id, eventuality);

View file

@ -0,0 +1,8 @@
use serai_db::{Get, DbTxn, create_db};
create_db! {
BitcoinProcessor {
LatestBlockToYieldAsFinalized: () -> u64,
ScriptPubKey: (tx: [u8; 32], vout: u32) -> Vec<u8>,
}
}

View file

@ -18,6 +18,10 @@ mod block;
mod rpc; mod rpc;
mod scheduler; mod scheduler;
// Our custom code for Bitcoin
mod db;
mod txindex;
pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) -> [u8; 32] { pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) -> [u8; 32] {
use bitcoin_serai::bitcoin::hashes::Hash; use bitcoin_serai::bitcoin::hashes::Hash;

View file

@ -15,6 +15,7 @@ use bitcoin_serai::{
use scale::{Encode, Decode, IoReader}; use scale::{Encode, Decode, IoReader};
use borsh::{BorshSerialize, BorshDeserialize}; use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::Get;
use serai_client::{ use serai_client::{
primitives::{Coin, Amount, Balance, ExternalAddress}, primitives::{Coin, Amount, Balance, ExternalAddress},
@ -52,13 +53,35 @@ pub(crate) struct Output {
} }
impl Output { impl Output {
pub fn new(key: <Secp256k1 as Ciphersuite>::G, tx: &Transaction, output: WalletOutput) -> Self { pub fn new(
getter: &impl Get,
key: <Secp256k1 as Ciphersuite>::G,
tx: &Transaction,
output: WalletOutput,
) -> Self {
Self { Self {
kind: offsets_for_key(key) kind: offsets_for_key(key)
.into_iter() .into_iter()
.find_map(|(kind, offset)| (offset == output.offset()).then_some(kind)) .find_map(|(kind, offset)| (offset == output.offset()).then_some(kind))
.expect("scanned output for unknown offset"), .expect("scanned output for unknown offset"),
presumed_origin: presumed_origin(tx), presumed_origin: presumed_origin(getter, tx),
output,
data: extract_serai_data(tx),
}
}
pub fn new_with_presumed_origin(
key: <Secp256k1 as Ciphersuite>::G,
tx: &Transaction,
presumed_origin: Option<Address>,
output: WalletOutput,
) -> Self {
Self {
kind: offsets_for_key(key)
.into_iter()
.find_map(|(kind, offset)| (offset == output.offset()).then_some(kind))
.expect("scanned output for unknown offset"),
presumed_origin,
output, output,
data: extract_serai_data(tx), data: extract_serai_data(tx),
} }

View file

@ -2,34 +2,36 @@ use bitcoin_serai::rpc::{RpcError, Rpc as BRpc};
use serai_client::primitives::{NetworkId, Coin, Amount}; use serai_client::primitives::{NetworkId, Coin, Amount};
use serai_db::Db;
use scanner::ScannerFeed; use scanner::ScannerFeed;
use signers::TransactionPublisher; use signers::TransactionPublisher;
use crate::{ use crate::{
db,
transaction::Transaction, transaction::Transaction,
block::{BlockHeader, Block}, block::{BlockHeader, Block},
}; };
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Rpc(BRpc); pub(crate) struct Rpc<D: Db> {
pub(crate) db: D,
pub(crate) rpc: BRpc,
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl ScannerFeed for Rpc { impl<D: Db> ScannerFeed for Rpc<D> {
const NETWORK: NetworkId = NetworkId::Bitcoin; const NETWORK: NetworkId = NetworkId::Bitcoin;
const CONFIRMATIONS: u64 = 6; const CONFIRMATIONS: u64 = 6;
const WINDOW_LENGTH: u64 = 6; const WINDOW_LENGTH: u64 = 6;
const TEN_MINUTES: u64 = 1; const TEN_MINUTES: u64 = 1;
type Block = Block; type Block = Block<D>;
type EphemeralError = RpcError; type EphemeralError = RpcError;
async fn latest_finalized_block_number(&self) -> Result<u64, Self::EphemeralError> { async fn latest_finalized_block_number(&self) -> Result<u64, Self::EphemeralError> {
u64::try_from(self.0.get_latest_block_number().await?) db::LatestBlockToYieldAsFinalized::get(&self.db).ok_or(RpcError::ConnectionError)
.unwrap()
.checked_sub(Self::CONFIRMATIONS)
.ok_or(RpcError::ConnectionError)
} }
async fn unchecked_block_header_by_number( async fn unchecked_block_header_by_number(
@ -37,7 +39,7 @@ impl ScannerFeed for Rpc {
number: u64, number: u64,
) -> Result<<Self::Block as primitives::Block>::Header, Self::EphemeralError> { ) -> Result<<Self::Block as primitives::Block>::Header, Self::EphemeralError> {
Ok(BlockHeader( Ok(BlockHeader(
self.0.get_block(&self.0.get_block_hash(number.try_into().unwrap()).await?).await?.header, self.rpc.get_block(&self.rpc.get_block_hash(number.try_into().unwrap()).await?).await?.header,
)) ))
} }
@ -45,7 +47,10 @@ impl ScannerFeed for Rpc {
&self, &self,
number: u64, number: u64,
) -> Result<Self::Block, Self::EphemeralError> { ) -> Result<Self::Block, Self::EphemeralError> {
Ok(Block(self.0.get_block(&self.0.get_block_hash(number.try_into().unwrap()).await?).await?)) Ok(Block(
self.db.clone(),
self.rpc.get_block(&self.rpc.get_block_hash(number.try_into().unwrap()).await?).await?,
))
} }
fn dust(coin: Coin) -> Amount { fn dust(coin: Coin) -> Amount {
@ -98,10 +103,10 @@ impl ScannerFeed for Rpc {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl TransactionPublisher<Transaction> for Rpc { impl<D: Db> TransactionPublisher<Transaction> for Rpc<D> {
type EphemeralError = RpcError; type EphemeralError = RpcError;
async fn publish(&self, tx: Transaction) -> Result<(), Self::EphemeralError> { async fn publish(&self, tx: Transaction) -> Result<(), Self::EphemeralError> {
self.0.send_raw_transaction(&tx.0).await.map(|_| ()) self.rpc.send_raw_transaction(&tx.0).await.map(|_| ())
} }
} }

View file

@ -13,8 +13,11 @@ use bitcoin_serai::{
use serai_client::networks::bitcoin::Address; use serai_client::networks::bitcoin::Address;
use serai_db::Get;
use primitives::OutputType; use primitives::OutputType;
use crate::{db, hash_bytes};
const KEY_DST: &[u8] = b"Serai Bitcoin Processor Key Offset"; const KEY_DST: &[u8] = b"Serai Bitcoin Processor Key Offset";
static BRANCH_BASE_OFFSET: LazyLock<<Secp256k1 as Ciphersuite>::F> = static BRANCH_BASE_OFFSET: LazyLock<<Secp256k1 as Ciphersuite>::F> =
LazyLock::new(|| Secp256k1::hash_to_F(KEY_DST, b"branch")); LazyLock::new(|| Secp256k1::hash_to_F(KEY_DST, b"branch"));
@ -55,26 +58,17 @@ pub(crate) fn scanner(key: <Secp256k1 as Ciphersuite>::G) -> Scanner {
scanner scanner
} }
pub(crate) fn presumed_origin(tx: &Transaction) -> Option<Address> { pub(crate) fn presumed_origin(getter: &impl Get, tx: &Transaction) -> Option<Address> {
todo!("TODO") for input in &tx.input {
let txid = hash_bytes(input.previous_output.txid.to_raw_hash());
/* let vout = input.previous_output.vout;
let spent_output = { if let Some(address) = Address::new(ScriptBuf::from_bytes(
let input = &tx.input[0]; db::ScriptPubKey::get(getter, txid, vout).expect("unknown output being spent by input"),
let mut spent_tx = input.previous_output.txid.as_raw_hash().to_byte_array(); )) {
spent_tx.reverse(); return Some(address);
let mut tx;
while {
tx = self.rpc.get_transaction(&spent_tx).await;
tx.is_err()
} {
log::error!("couldn't get transaction from bitcoin node: {tx:?}");
sleep(Duration::from_secs(5)).await;
} }
tx.unwrap().output.swap_remove(usize::try_from(input.previous_output.vout).unwrap()) }
}; None?
Address::new(spent_output.script_pubkey)
*/
} }
// Checks if this script matches SHA256 PUSH MSG_HASH OP_EQUALVERIFY .. // Checks if this script matches SHA256 PUSH MSG_HASH OP_EQUALVERIFY ..

View file

@ -10,6 +10,7 @@ use serai_client::{
networks::bitcoin::Address, networks::bitcoin::Address,
}; };
use serai_db::Db;
use primitives::{OutputType, ReceivedOutput, Payment}; use primitives::{OutputType, ReceivedOutput, Payment};
use scanner::{KeyFor, AddressFor, OutputFor, BlockFor}; use scanner::{KeyFor, AddressFor, OutputFor, BlockFor};
use utxo_scheduler::{PlannedTransaction, TransactionPlanner}; use utxo_scheduler::{PlannedTransaction, TransactionPlanner};
@ -31,17 +32,24 @@ fn address_from_serai_key(key: <Secp256k1 as Ciphersuite>::G, kind: OutputType)
.expect("couldn't create Serai-representable address for P2TR script") .expect("couldn't create Serai-representable address for P2TR script")
} }
fn signable_transaction( fn signable_transaction<D: Db>(
fee_per_vbyte: u64, fee_per_vbyte: u64,
inputs: Vec<OutputFor<Rpc>>, inputs: Vec<OutputFor<Rpc<D>>>,
payments: Vec<Payment<AddressFor<Rpc>>>, payments: Vec<Payment<AddressFor<Rpc<D>>>>,
change: Option<KeyFor<Rpc>>, change: Option<KeyFor<Rpc<D>>>,
) -> Result<(SignableTransaction, BSignableTransaction), TransactionError> { ) -> Result<(SignableTransaction, BSignableTransaction), TransactionError> {
assert!(inputs.len() < Planner::MAX_INPUTS); assert!(
assert!((payments.len() + usize::from(u8::from(change.is_some()))) < Planner::MAX_OUTPUTS); inputs.len() <
<Planner as TransactionPlanner<Rpc<D>, EffectedReceivedOutputs<Rpc<D>>>>::MAX_INPUTS
);
assert!(
(payments.len() + usize::from(u8::from(change.is_some()))) <
<Planner as TransactionPlanner<Rpc<D>, EffectedReceivedOutputs<Rpc<D>>>>::MAX_OUTPUTS
);
let inputs = inputs.into_iter().map(|input| input.output).collect::<Vec<_>>(); let inputs = inputs.into_iter().map(|input| input.output).collect::<Vec<_>>();
let payments = payments
let mut payments = payments
.into_iter() .into_iter()
.map(|payment| { .map(|payment| {
(payment.address().clone(), { (payment.address().clone(), {
@ -51,7 +59,8 @@ fn signable_transaction(
}) })
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let change = change.map(Planner::change_address); let change = change
.map(<Planner as TransactionPlanner<Rpc<D>, EffectedReceivedOutputs<Rpc<D>>>>::change_address);
// TODO: ACP output // TODO: ACP output
BSignableTransaction::new( BSignableTransaction::new(
@ -69,7 +78,7 @@ fn signable_transaction(
} }
pub(crate) struct Planner; pub(crate) struct Planner;
impl TransactionPlanner<Rpc, EffectedReceivedOutputs<Rpc>> for Planner { impl<D: Db> TransactionPlanner<Rpc<D>, EffectedReceivedOutputs<Rpc<D>>> for Planner {
type FeeRate = u64; type FeeRate = u64;
type SignableTransaction = SignableTransaction; type SignableTransaction = SignableTransaction;
@ -94,29 +103,29 @@ impl TransactionPlanner<Rpc, EffectedReceivedOutputs<Rpc>> for Planner {
// to unstick any transactions which had too low of a fee. // to unstick any transactions which had too low of a fee.
const MAX_OUTPUTS: usize = 519; const MAX_OUTPUTS: usize = 519;
fn fee_rate(block: &BlockFor<Rpc>, coin: Coin) -> Self::FeeRate { fn fee_rate(block: &BlockFor<Rpc<D>>, coin: Coin) -> Self::FeeRate {
assert_eq!(coin, Coin::Bitcoin); assert_eq!(coin, Coin::Bitcoin);
// TODO // TODO
1 1
} }
fn branch_address(key: KeyFor<Rpc>) -> AddressFor<Rpc> { fn branch_address(key: KeyFor<Rpc<D>>) -> AddressFor<Rpc<D>> {
address_from_serai_key(key, OutputType::Branch) address_from_serai_key(key, OutputType::Branch)
} }
fn change_address(key: KeyFor<Rpc>) -> AddressFor<Rpc> { fn change_address(key: KeyFor<Rpc<D>>) -> AddressFor<Rpc<D>> {
address_from_serai_key(key, OutputType::Change) address_from_serai_key(key, OutputType::Change)
} }
fn forwarding_address(key: KeyFor<Rpc>) -> AddressFor<Rpc> { fn forwarding_address(key: KeyFor<Rpc<D>>) -> AddressFor<Rpc<D>> {
address_from_serai_key(key, OutputType::Forwarded) address_from_serai_key(key, OutputType::Forwarded)
} }
fn calculate_fee( fn calculate_fee(
fee_rate: Self::FeeRate, fee_rate: Self::FeeRate,
inputs: Vec<OutputFor<Rpc>>, inputs: Vec<OutputFor<Rpc<D>>>,
payments: Vec<Payment<AddressFor<Rpc>>>, payments: Vec<Payment<AddressFor<Rpc<D>>>>,
change: Option<KeyFor<Rpc>>, change: Option<KeyFor<Rpc<D>>>,
) -> Amount { ) -> Amount {
match signable_transaction(fee_rate, inputs, payments, change) { match signable_transaction::<D>(fee_rate, inputs, payments, change) {
Ok(tx) => Amount(tx.1.needed_fee()), Ok(tx) => Amount(tx.1.needed_fee()),
Err( Err(
TransactionError::NoInputs | TransactionError::NoOutputs | TransactionError::DustPayment, TransactionError::NoInputs | TransactionError::NoOutputs | TransactionError::DustPayment,
@ -133,17 +142,17 @@ impl TransactionPlanner<Rpc, EffectedReceivedOutputs<Rpc>> for Planner {
fn plan( fn plan(
fee_rate: Self::FeeRate, fee_rate: Self::FeeRate,
inputs: Vec<OutputFor<Rpc>>, inputs: Vec<OutputFor<Rpc<D>>>,
payments: Vec<Payment<AddressFor<Rpc>>>, payments: Vec<Payment<AddressFor<Rpc<D>>>>,
change: Option<KeyFor<Rpc>>, change: Option<KeyFor<Rpc<D>>>,
) -> PlannedTransaction<Rpc, Self::SignableTransaction, EffectedReceivedOutputs<Rpc>> { ) -> PlannedTransaction<Rpc<D>, Self::SignableTransaction, EffectedReceivedOutputs<Rpc<D>>> {
let key = inputs.first().unwrap().key(); let key = inputs.first().unwrap().key();
for input in &inputs { for input in &inputs {
assert_eq!(key, input.key()); assert_eq!(key, input.key());
} }
let singular_spent_output = (inputs.len() == 1).then(|| inputs[0].id()); let singular_spent_output = (inputs.len() == 1).then(|| inputs[0].id());
match signable_transaction(fee_rate, inputs, payments, change) { match signable_transaction::<D>(fee_rate, inputs.clone(), payments, change) {
Ok(tx) => PlannedTransaction { Ok(tx) => PlannedTransaction {
signable: tx.0, signable: tx.0,
eventuality: Eventuality { txid: tx.1.txid(), singular_spent_output }, eventuality: Eventuality { txid: tx.1.txid(), singular_spent_output },
@ -153,7 +162,14 @@ impl TransactionPlanner<Rpc, EffectedReceivedOutputs<Rpc>> for Planner {
let mut res = vec![]; let mut res = vec![];
for output in scanner.scan_transaction(tx) { for output in scanner.scan_transaction(tx) {
res.push(Output::new(key, tx, output)); res.push(Output::new_with_presumed_origin(
key,
tx,
// It shouldn't matter if this is wrong as we should never try to return these
// We still provide an accurate value to ensure a lack of discrepancies
Some(Address::new(inputs[0].output.output().script_pubkey.clone()).unwrap()),
output,
));
} }
res res
}), }),
@ -174,4 +190,4 @@ impl TransactionPlanner<Rpc, EffectedReceivedOutputs<Rpc>> for Planner {
} }
} }
pub(crate) type Scheduler = GenericScheduler<Rpc, Planner>; pub(crate) type Scheduler<D> = GenericScheduler<Rpc<D>, Planner>;

View file

@ -0,0 +1,80 @@
/*
We want to be able to return received outputs. We do that by iterating over the inputs to find an
address format we recognize, then setting that address as the address to return to.
Since inputs only contain the script signatures, yet addresses are for script public keys, we
need to pull up the output spent by an input and read the script public key from that. While we
could use `txindex=1`, and an asynchronous call to the Bitcoin node, we:
1) Can maintain a much smaller index ourselves
2) Don't want the asynchronous call (which would require the flow be async, allowed to
potentially error, and more latent)
3) Don't want to risk Bitcoin's `txindex` corruptions (frequently observed on testnet)
This task builds that index.
*/
use serai_db::{DbTxn, Db};
use primitives::task::ContinuallyRan;
use scanner::ScannerFeed;
use crate::{db, rpc::Rpc, hash_bytes};
pub(crate) struct TxIndexTask<D: Db>(Rpc<D>);
#[async_trait::async_trait]
impl<D: Db> ContinuallyRan for TxIndexTask<D> {
async fn run_iteration(&mut self) -> Result<bool, String> {
let latest_block_number = self
.0
.rpc
.get_latest_block_number()
.await
.map_err(|e| format!("couldn't fetch latest block number: {e:?}"))?;
let latest_block_number = u64::try_from(latest_block_number).unwrap();
// `CONFIRMATIONS - 1` as any on-chain block inherently has one confirmation (itself)
let finalized_block_number =
latest_block_number.checked_sub(Rpc::<D>::CONFIRMATIONS - 1).ok_or(format!(
"blockchain only just started and doesn't have {} blocks yet",
Rpc::<D>::CONFIRMATIONS
))?;
let finalized_block_number_in_db = db::LatestBlockToYieldAsFinalized::get(&self.0.db);
let next_block = finalized_block_number_in_db.map_or(0, |block| block + 1);
let mut iterated = false;
for b in next_block ..= finalized_block_number {
iterated = true;
// Fetch the block
let block_hash = self
.0
.rpc
.get_block_hash(b.try_into().unwrap())
.await
.map_err(|e| format!("couldn't fetch block hash for block {b}: {e:?}"))?;
let block = self
.0
.rpc
.get_block(&block_hash)
.await
.map_err(|e| format!("couldn't fetch block {b}: {e:?}"))?;
let mut txn = self.0.db.txn();
for tx in &block.txdata[1 ..] {
let txid = hash_bytes(tx.compute_txid().to_raw_hash());
for (o, output) in tx.output.iter().enumerate() {
let o = u32::try_from(o).unwrap();
// Set the script pub key for this transaction
db::ScriptPubKey::set(&mut txn, txid, o, &output.script_pubkey.clone().into_bytes());
}
}
db::LatestBlockToYieldAsFinalized::set(&mut txn, &b);
txn.commit();
}
Ok(iterated)
}
}