diff --git a/processor/bitcoin/src/block.rs b/processor/bitcoin/src/block.rs index 24cccec9..8221c8b5 100644 --- a/processor/bitcoin/src/block.rs +++ b/processor/bitcoin/src/block.rs @@ -1,3 +1,4 @@ +use core::fmt; use std::collections::HashMap; use ciphersuite::{Ciphersuite, Secp256k1}; @@ -6,6 +7,7 @@ use bitcoin_serai::bitcoin::block::{Header, Block as BBlock}; use serai_client::networks::bitcoin::Address; +use serai_db::Db; use primitives::{ReceivedOutput, EventualityTracker}; use crate::{hash_bytes, scan::scanner, output::Output, transaction::Eventuality}; @@ -21,11 +23,16 @@ impl primitives::BlockHeader for BlockHeader { } } -#[derive(Clone, Debug)] -pub(crate) struct Block(pub(crate) BBlock); +#[derive(Clone)] +pub(crate) struct Block(pub(crate) D, pub(crate) BBlock); +impl fmt::Debug for Block { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Block").field("1", &self.1).finish_non_exhaustive() + } +} #[async_trait::async_trait] -impl primitives::Block for Block { +impl primitives::Block for Block { type Header = BlockHeader; type Key = ::G; @@ -34,7 +41,7 @@ impl primitives::Block for Block { type Eventuality = Eventuality; fn id(&self) -> [u8; 32] { - primitives::BlockHeader::id(&BlockHeader(self.0.header)) + primitives::BlockHeader::id(&BlockHeader(self.1.header)) } fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec { @@ -42,9 +49,9 @@ impl primitives::Block for Block { let mut res = vec![]; // We skip the coinbase transaction as its burdened by maturity - for tx in &self.0.txdata[1 ..] { + for tx in &self.1.txdata[1 ..] { for output in scanner.scan_transaction(tx) { - res.push(Output::new(key, tx, output)); + res.push(Output::new(&self.0, key, tx, output)); } } res @@ -59,7 +66,7 @@ impl primitives::Block for Block { Self::Eventuality, > { let mut res = HashMap::new(); - for tx in &self.0.txdata[1 ..] { + for tx in &self.1.txdata[1 ..] { let id = hash_bytes(tx.compute_txid().to_raw_hash()); if let Some(eventuality) = eventualities.active_eventualities.remove(id.as_slice()) { res.insert(id, eventuality); diff --git a/processor/bitcoin/src/db.rs b/processor/bitcoin/src/db.rs new file mode 100644 index 00000000..1d73ebfe --- /dev/null +++ b/processor/bitcoin/src/db.rs @@ -0,0 +1,8 @@ +use serai_db::{Get, DbTxn, create_db}; + +create_db! { + BitcoinProcessor { + LatestBlockToYieldAsFinalized: () -> u64, + ScriptPubKey: (tx: [u8; 32], vout: u32) -> Vec, + } +} diff --git a/processor/bitcoin/src/main.rs b/processor/bitcoin/src/main.rs index 653e8b5a..941cc0dc 100644 --- a/processor/bitcoin/src/main.rs +++ b/processor/bitcoin/src/main.rs @@ -18,6 +18,10 @@ mod block; mod rpc; mod scheduler; +// Our custom code for Bitcoin +mod db; +mod txindex; + pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) -> [u8; 32] { use bitcoin_serai::bitcoin::hashes::Hash; diff --git a/processor/bitcoin/src/output.rs b/processor/bitcoin/src/output.rs index dc541350..2ed03705 100644 --- a/processor/bitcoin/src/output.rs +++ b/processor/bitcoin/src/output.rs @@ -15,6 +15,7 @@ use bitcoin_serai::{ use scale::{Encode, Decode, IoReader}; use borsh::{BorshSerialize, BorshDeserialize}; +use serai_db::Get; use serai_client::{ primitives::{Coin, Amount, Balance, ExternalAddress}, @@ -52,13 +53,35 @@ pub(crate) struct Output { } impl Output { - pub fn new(key: ::G, tx: &Transaction, output: WalletOutput) -> Self { + pub fn new( + getter: &impl Get, + key: ::G, + tx: &Transaction, + output: WalletOutput, + ) -> Self { Self { kind: offsets_for_key(key) .into_iter() .find_map(|(kind, offset)| (offset == output.offset()).then_some(kind)) .expect("scanned output for unknown offset"), - presumed_origin: presumed_origin(tx), + presumed_origin: presumed_origin(getter, tx), + output, + data: extract_serai_data(tx), + } + } + + pub fn new_with_presumed_origin( + key: ::G, + tx: &Transaction, + presumed_origin: Option
, + output: WalletOutput, + ) -> Self { + Self { + kind: offsets_for_key(key) + .into_iter() + .find_map(|(kind, offset)| (offset == output.offset()).then_some(kind)) + .expect("scanned output for unknown offset"), + presumed_origin, output, data: extract_serai_data(tx), } diff --git a/processor/bitcoin/src/rpc.rs b/processor/bitcoin/src/rpc.rs index 8af82121..cafb0ef3 100644 --- a/processor/bitcoin/src/rpc.rs +++ b/processor/bitcoin/src/rpc.rs @@ -2,34 +2,36 @@ use bitcoin_serai::rpc::{RpcError, Rpc as BRpc}; use serai_client::primitives::{NetworkId, Coin, Amount}; +use serai_db::Db; use scanner::ScannerFeed; use signers::TransactionPublisher; use crate::{ + db, transaction::Transaction, block::{BlockHeader, Block}, }; #[derive(Clone)] -pub(crate) struct Rpc(BRpc); +pub(crate) struct Rpc { + pub(crate) db: D, + pub(crate) rpc: BRpc, +} #[async_trait::async_trait] -impl ScannerFeed for Rpc { +impl ScannerFeed for Rpc { const NETWORK: NetworkId = NetworkId::Bitcoin; const CONFIRMATIONS: u64 = 6; const WINDOW_LENGTH: u64 = 6; const TEN_MINUTES: u64 = 1; - type Block = Block; + type Block = Block; type EphemeralError = RpcError; async fn latest_finalized_block_number(&self) -> Result { - u64::try_from(self.0.get_latest_block_number().await?) - .unwrap() - .checked_sub(Self::CONFIRMATIONS) - .ok_or(RpcError::ConnectionError) + db::LatestBlockToYieldAsFinalized::get(&self.db).ok_or(RpcError::ConnectionError) } async fn unchecked_block_header_by_number( @@ -37,7 +39,7 @@ impl ScannerFeed for Rpc { number: u64, ) -> Result<::Header, Self::EphemeralError> { Ok(BlockHeader( - self.0.get_block(&self.0.get_block_hash(number.try_into().unwrap()).await?).await?.header, + self.rpc.get_block(&self.rpc.get_block_hash(number.try_into().unwrap()).await?).await?.header, )) } @@ -45,7 +47,10 @@ impl ScannerFeed for Rpc { &self, number: u64, ) -> Result { - Ok(Block(self.0.get_block(&self.0.get_block_hash(number.try_into().unwrap()).await?).await?)) + Ok(Block( + self.db.clone(), + self.rpc.get_block(&self.rpc.get_block_hash(number.try_into().unwrap()).await?).await?, + )) } fn dust(coin: Coin) -> Amount { @@ -98,10 +103,10 @@ impl ScannerFeed for Rpc { } #[async_trait::async_trait] -impl TransactionPublisher for Rpc { +impl TransactionPublisher for Rpc { type EphemeralError = RpcError; async fn publish(&self, tx: Transaction) -> Result<(), Self::EphemeralError> { - self.0.send_raw_transaction(&tx.0).await.map(|_| ()) + self.rpc.send_raw_transaction(&tx.0).await.map(|_| ()) } } diff --git a/processor/bitcoin/src/scan.rs b/processor/bitcoin/src/scan.rs index 43518b57..b3d3a6dc 100644 --- a/processor/bitcoin/src/scan.rs +++ b/processor/bitcoin/src/scan.rs @@ -13,8 +13,11 @@ use bitcoin_serai::{ use serai_client::networks::bitcoin::Address; +use serai_db::Get; use primitives::OutputType; +use crate::{db, hash_bytes}; + const KEY_DST: &[u8] = b"Serai Bitcoin Processor Key Offset"; static BRANCH_BASE_OFFSET: LazyLock<::F> = LazyLock::new(|| Secp256k1::hash_to_F(KEY_DST, b"branch")); @@ -55,26 +58,17 @@ pub(crate) fn scanner(key: ::G) -> Scanner { scanner } -pub(crate) fn presumed_origin(tx: &Transaction) -> Option
{ - todo!("TODO") - - /* - let spent_output = { - let input = &tx.input[0]; - let mut spent_tx = input.previous_output.txid.as_raw_hash().to_byte_array(); - spent_tx.reverse(); - let mut tx; - while { - tx = self.rpc.get_transaction(&spent_tx).await; - tx.is_err() - } { - log::error!("couldn't get transaction from bitcoin node: {tx:?}"); - sleep(Duration::from_secs(5)).await; +pub(crate) fn presumed_origin(getter: &impl Get, tx: &Transaction) -> Option
{ + for input in &tx.input { + let txid = hash_bytes(input.previous_output.txid.to_raw_hash()); + let vout = input.previous_output.vout; + if let Some(address) = Address::new(ScriptBuf::from_bytes( + db::ScriptPubKey::get(getter, txid, vout).expect("unknown output being spent by input"), + )) { + return Some(address); } - tx.unwrap().output.swap_remove(usize::try_from(input.previous_output.vout).unwrap()) - }; - Address::new(spent_output.script_pubkey) - */ + } + None? } // Checks if this script matches SHA256 PUSH MSG_HASH OP_EQUALVERIFY .. diff --git a/processor/bitcoin/src/scheduler.rs b/processor/bitcoin/src/scheduler.rs index c48f9a69..e225613c 100644 --- a/processor/bitcoin/src/scheduler.rs +++ b/processor/bitcoin/src/scheduler.rs @@ -10,6 +10,7 @@ use serai_client::{ networks::bitcoin::Address, }; +use serai_db::Db; use primitives::{OutputType, ReceivedOutput, Payment}; use scanner::{KeyFor, AddressFor, OutputFor, BlockFor}; use utxo_scheduler::{PlannedTransaction, TransactionPlanner}; @@ -31,17 +32,24 @@ fn address_from_serai_key(key: ::G, kind: OutputType) .expect("couldn't create Serai-representable address for P2TR script") } -fn signable_transaction( +fn signable_transaction( fee_per_vbyte: u64, - inputs: Vec>, - payments: Vec>>, - change: Option>, + inputs: Vec>>, + payments: Vec>>>, + change: Option>>, ) -> Result<(SignableTransaction, BSignableTransaction), TransactionError> { - assert!(inputs.len() < Planner::MAX_INPUTS); - assert!((payments.len() + usize::from(u8::from(change.is_some()))) < Planner::MAX_OUTPUTS); + assert!( + inputs.len() < + , EffectedReceivedOutputs>>>::MAX_INPUTS + ); + assert!( + (payments.len() + usize::from(u8::from(change.is_some()))) < + , EffectedReceivedOutputs>>>::MAX_OUTPUTS + ); let inputs = inputs.into_iter().map(|input| input.output).collect::>(); - let payments = payments + + let mut payments = payments .into_iter() .map(|payment| { (payment.address().clone(), { @@ -51,7 +59,8 @@ fn signable_transaction( }) }) .collect::>(); - let change = change.map(Planner::change_address); + let change = change + .map(, EffectedReceivedOutputs>>>::change_address); // TODO: ACP output BSignableTransaction::new( @@ -69,7 +78,7 @@ fn signable_transaction( } pub(crate) struct Planner; -impl TransactionPlanner> for Planner { +impl TransactionPlanner, EffectedReceivedOutputs>> for Planner { type FeeRate = u64; type SignableTransaction = SignableTransaction; @@ -94,29 +103,29 @@ impl TransactionPlanner> for Planner { // to unstick any transactions which had too low of a fee. const MAX_OUTPUTS: usize = 519; - fn fee_rate(block: &BlockFor, coin: Coin) -> Self::FeeRate { + fn fee_rate(block: &BlockFor>, coin: Coin) -> Self::FeeRate { assert_eq!(coin, Coin::Bitcoin); // TODO 1 } - fn branch_address(key: KeyFor) -> AddressFor { + fn branch_address(key: KeyFor>) -> AddressFor> { address_from_serai_key(key, OutputType::Branch) } - fn change_address(key: KeyFor) -> AddressFor { + fn change_address(key: KeyFor>) -> AddressFor> { address_from_serai_key(key, OutputType::Change) } - fn forwarding_address(key: KeyFor) -> AddressFor { + fn forwarding_address(key: KeyFor>) -> AddressFor> { address_from_serai_key(key, OutputType::Forwarded) } fn calculate_fee( fee_rate: Self::FeeRate, - inputs: Vec>, - payments: Vec>>, - change: Option>, + inputs: Vec>>, + payments: Vec>>>, + change: Option>>, ) -> Amount { - match signable_transaction(fee_rate, inputs, payments, change) { + match signable_transaction::(fee_rate, inputs, payments, change) { Ok(tx) => Amount(tx.1.needed_fee()), Err( TransactionError::NoInputs | TransactionError::NoOutputs | TransactionError::DustPayment, @@ -133,17 +142,17 @@ impl TransactionPlanner> for Planner { fn plan( fee_rate: Self::FeeRate, - inputs: Vec>, - payments: Vec>>, - change: Option>, - ) -> PlannedTransaction> { + inputs: Vec>>, + payments: Vec>>>, + change: Option>>, + ) -> PlannedTransaction, Self::SignableTransaction, EffectedReceivedOutputs>> { let key = inputs.first().unwrap().key(); for input in &inputs { assert_eq!(key, input.key()); } let singular_spent_output = (inputs.len() == 1).then(|| inputs[0].id()); - match signable_transaction(fee_rate, inputs, payments, change) { + match signable_transaction::(fee_rate, inputs.clone(), payments, change) { Ok(tx) => PlannedTransaction { signable: tx.0, eventuality: Eventuality { txid: tx.1.txid(), singular_spent_output }, @@ -153,7 +162,14 @@ impl TransactionPlanner> for Planner { let mut res = vec![]; for output in scanner.scan_transaction(tx) { - res.push(Output::new(key, tx, output)); + res.push(Output::new_with_presumed_origin( + key, + tx, + // It shouldn't matter if this is wrong as we should never try to return these + // We still provide an accurate value to ensure a lack of discrepancies + Some(Address::new(inputs[0].output.output().script_pubkey.clone()).unwrap()), + output, + )); } res }), @@ -174,4 +190,4 @@ impl TransactionPlanner> for Planner { } } -pub(crate) type Scheduler = GenericScheduler; +pub(crate) type Scheduler = GenericScheduler, Planner>; diff --git a/processor/bitcoin/src/txindex.rs b/processor/bitcoin/src/txindex.rs new file mode 100644 index 00000000..d9d52526 --- /dev/null +++ b/processor/bitcoin/src/txindex.rs @@ -0,0 +1,80 @@ +/* + We want to be able to return received outputs. We do that by iterating over the inputs to find an + address format we recognize, then setting that address as the address to return to. + + Since inputs only contain the script signatures, yet addresses are for script public keys, we + need to pull up the output spent by an input and read the script public key from that. While we + could use `txindex=1`, and an asynchronous call to the Bitcoin node, we: + + 1) Can maintain a much smaller index ourselves + 2) Don't want the asynchronous call (which would require the flow be async, allowed to + potentially error, and more latent) + 3) Don't want to risk Bitcoin's `txindex` corruptions (frequently observed on testnet) + + This task builds that index. +*/ + +use serai_db::{DbTxn, Db}; + +use primitives::task::ContinuallyRan; +use scanner::ScannerFeed; + +use crate::{db, rpc::Rpc, hash_bytes}; + +pub(crate) struct TxIndexTask(Rpc); + +#[async_trait::async_trait] +impl ContinuallyRan for TxIndexTask { + async fn run_iteration(&mut self) -> Result { + let latest_block_number = self + .0 + .rpc + .get_latest_block_number() + .await + .map_err(|e| format!("couldn't fetch latest block number: {e:?}"))?; + let latest_block_number = u64::try_from(latest_block_number).unwrap(); + // `CONFIRMATIONS - 1` as any on-chain block inherently has one confirmation (itself) + let finalized_block_number = + latest_block_number.checked_sub(Rpc::::CONFIRMATIONS - 1).ok_or(format!( + "blockchain only just started and doesn't have {} blocks yet", + Rpc::::CONFIRMATIONS + ))?; + + let finalized_block_number_in_db = db::LatestBlockToYieldAsFinalized::get(&self.0.db); + let next_block = finalized_block_number_in_db.map_or(0, |block| block + 1); + + let mut iterated = false; + for b in next_block ..= finalized_block_number { + iterated = true; + + // Fetch the block + let block_hash = self + .0 + .rpc + .get_block_hash(b.try_into().unwrap()) + .await + .map_err(|e| format!("couldn't fetch block hash for block {b}: {e:?}"))?; + let block = self + .0 + .rpc + .get_block(&block_hash) + .await + .map_err(|e| format!("couldn't fetch block {b}: {e:?}"))?; + + let mut txn = self.0.db.txn(); + + for tx in &block.txdata[1 ..] { + let txid = hash_bytes(tx.compute_txid().to_raw_hash()); + for (o, output) in tx.output.iter().enumerate() { + let o = u32::try_from(o).unwrap(); + // Set the script pub key for this transaction + db::ScriptPubKey::set(&mut txn, txid, o, &output.script_pubkey.clone().into_bytes()); + } + } + + db::LatestBlockToYieldAsFinalized::set(&mut txn, &b); + txn.commit(); + } + Ok(iterated) + } +}