mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-10 04:44:40 +00:00
Resolve #335
This commit is contained in:
parent
fbf51e53ec
commit
0d23964762
5 changed files with 114 additions and 26 deletions
|
@ -459,6 +459,10 @@ async fn boot<N: Network, D: Db>(
|
||||||
signers.insert(key.as_ref().to_vec(), signer);
|
signers.insert(key.as_ref().to_vec(), signer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Spawn a task to rebroadcast signed TXs yet to be mined into a finalized block
|
||||||
|
// This hedges against being dropped due to full mempools, temporarily too low of a fee...
|
||||||
|
tokio::spawn(Signer::<N, D>::rebroadcast_task(raw_db.clone(), network.clone()));
|
||||||
|
|
||||||
(main_db, TributaryMutable { key_gen, substrate_signer, signers }, multisig_manager)
|
(main_db, TributaryMutable { key_gen, substrate_signer, signers }, multisig_manager)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -163,6 +163,11 @@ impl TransactionTrait<Bitcoin> for Transaction {
|
||||||
self.consensus_encode(&mut buf).unwrap();
|
self.consensus_encode(&mut buf).unwrap();
|
||||||
buf
|
buf
|
||||||
}
|
}
|
||||||
|
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
|
||||||
|
Transaction::consensus_decode(reader)
|
||||||
|
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{e}")))
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
async fn fee(&self, network: &Bitcoin) -> u64 {
|
async fn fee(&self, network: &Bitcoin) -> u64 {
|
||||||
let mut value = 0;
|
let mut value = 0;
|
||||||
|
|
|
@ -119,6 +119,7 @@ pub trait Transaction<N: Network>: Send + Sync + Sized + Clone + Debug {
|
||||||
type Id: 'static + Id;
|
type Id: 'static + Id;
|
||||||
fn id(&self) -> Self::Id;
|
fn id(&self) -> Self::Id;
|
||||||
fn serialize(&self) -> Vec<u8>;
|
fn serialize(&self) -> Vec<u8>;
|
||||||
|
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self>;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
async fn fee(&self, network: &N) -> u64;
|
async fn fee(&self, network: &N) -> u64;
|
||||||
|
|
|
@ -114,6 +114,10 @@ impl TransactionTrait<Monero> for Transaction {
|
||||||
fn serialize(&self) -> Vec<u8> {
|
fn serialize(&self) -> Vec<u8> {
|
||||||
self.serialize()
|
self.serialize()
|
||||||
}
|
}
|
||||||
|
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
|
||||||
|
Transaction::read(reader)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
async fn fee(&self, _: &Monero) -> u64 {
|
async fn fee(&self, _: &Monero) -> u64 {
|
||||||
self.rct_signatures.base.fee
|
self.rct_signatures.base.fee
|
||||||
|
|
|
@ -31,35 +31,92 @@ impl<N: Network, D: Db> SignerDb<N, D> {
|
||||||
D::key(b"SIGNER", dst, key)
|
D::key(b"SIGNER", dst, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn completed_key(id: [u8; 32]) -> Vec<u8> {
|
fn active_signs_key() -> Vec<u8> {
|
||||||
|
Self::sign_key(b"active_signs", [])
|
||||||
|
}
|
||||||
|
fn completed_on_chain_key(id: &[u8; 32]) -> Vec<u8> {
|
||||||
|
Self::sign_key(b"completed_on_chain", id)
|
||||||
|
}
|
||||||
|
fn active_signs<G: Get>(getter: &G) -> Vec<[u8; 32]> {
|
||||||
|
let active = getter.get(Self::active_signs_key()).unwrap_or(vec![]);
|
||||||
|
let mut active_ref = active.as_slice();
|
||||||
|
let mut res = vec![];
|
||||||
|
while !active_ref.is_empty() {
|
||||||
|
res.push(active_ref[.. 32].try_into().unwrap());
|
||||||
|
active_ref = &active_ref[32 ..];
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
fn add_active_sign(txn: &mut D::Transaction<'_>, id: &[u8; 32]) {
|
||||||
|
if txn.get(Self::completed_on_chain_key(id)).is_some() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let key = Self::active_signs_key();
|
||||||
|
let mut active = txn.get(&key).unwrap_or(vec![]);
|
||||||
|
active.extend(id);
|
||||||
|
txn.put(key, active);
|
||||||
|
}
|
||||||
|
fn complete_on_chain(txn: &mut D::Transaction<'_>, id: &[u8; 32]) {
|
||||||
|
txn.put(Self::completed_on_chain_key(id), []);
|
||||||
|
txn.put(
|
||||||
|
Self::active_signs_key(),
|
||||||
|
Self::active_signs(txn)
|
||||||
|
.into_iter()
|
||||||
|
.filter(|active| active != id)
|
||||||
|
.flatten()
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn transaction_key(id: &<N::Transaction as Transaction<N>>::Id) -> Vec<u8> {
|
||||||
|
Self::sign_key(b"tx", id)
|
||||||
|
}
|
||||||
|
fn completions_key(id: [u8; 32]) -> Vec<u8> {
|
||||||
Self::sign_key(b"completed", id)
|
Self::sign_key(b"completed", id)
|
||||||
}
|
}
|
||||||
fn complete(
|
fn complete(txn: &mut D::Transaction<'_>, id: [u8; 32], tx: &N::Transaction) {
|
||||||
txn: &mut D::Transaction<'_>,
|
|
||||||
id: [u8; 32],
|
|
||||||
tx: &<N::Transaction as Transaction<N>>::Id,
|
|
||||||
) {
|
|
||||||
// Transactions can be completed by multiple signatures
|
// Transactions can be completed by multiple signatures
|
||||||
// Save every solution in order to be robust
|
// Save every solution in order to be robust
|
||||||
let mut existing = txn.get(Self::completed_key(id)).unwrap_or(vec![]);
|
let tx_id = tx.id();
|
||||||
|
txn.put(Self::transaction_key(&tx_id), tx.serialize());
|
||||||
|
|
||||||
|
let mut existing = txn.get(Self::completions_key(id)).unwrap_or(vec![]);
|
||||||
|
|
||||||
// Don't add this TX if it's already present
|
// Don't add this TX if it's already present
|
||||||
let tx_len = tx.as_ref().len();
|
let tx_len = tx_id.as_ref().len();
|
||||||
assert_eq!(existing.len() % tx_len, 0);
|
assert_eq!(existing.len() % tx_len, 0);
|
||||||
|
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
while i < existing.len() {
|
while i < existing.len() {
|
||||||
if &existing[i .. (i + tx_len)] == tx.as_ref() {
|
if &existing[i .. (i + tx_len)] == tx_id.as_ref() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
i += tx_len;
|
i += tx_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
existing.extend(tx.as_ref());
|
existing.extend(tx_id.as_ref());
|
||||||
txn.put(Self::completed_key(id), existing);
|
txn.put(Self::completions_key(id), existing);
|
||||||
}
|
}
|
||||||
fn completed<G: Get>(getter: &G, id: [u8; 32]) -> Option<Vec<u8>> {
|
fn completions<G: Get>(getter: &G, id: [u8; 32]) -> Vec<<N::Transaction as Transaction<N>>::Id> {
|
||||||
getter.get(Self::completed_key(id))
|
let completions = getter.get(Self::completions_key(id)).unwrap_or(vec![]);
|
||||||
|
let mut completions_ref = completions.as_slice();
|
||||||
|
let mut res = vec![];
|
||||||
|
while !completions_ref.is_empty() {
|
||||||
|
let mut id = <N::Transaction as Transaction<N>>::Id::default();
|
||||||
|
let id_len = id.as_ref().len();
|
||||||
|
id.as_mut().copy_from_slice(&completions_ref[.. id_len]);
|
||||||
|
completions_ref = &completions_ref[id_len ..];
|
||||||
|
res.push(id);
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
fn transaction<G: Get>(
|
||||||
|
getter: &G,
|
||||||
|
id: <N::Transaction as Transaction<N>>::Id,
|
||||||
|
) -> Option<N::Transaction> {
|
||||||
|
getter
|
||||||
|
.get(Self::transaction_key(&id))
|
||||||
|
.map(|tx| N::Transaction::read(&mut tx.as_slice()).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn eventuality_key(id: [u8; 32]) -> Vec<u8> {
|
fn eventuality_key(id: [u8; 32]) -> Vec<u8> {
|
||||||
|
@ -83,10 +140,6 @@ impl<N: Network, D: Db> SignerDb<N, D> {
|
||||||
fn has_attempt<G: Get>(getter: &G, id: &SignId) -> bool {
|
fn has_attempt<G: Get>(getter: &G, id: &SignId) -> bool {
|
||||||
getter.get(Self::attempt_key(id)).is_some()
|
getter.get(Self::attempt_key(id)).is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_transaction(txn: &mut D::Transaction<'_>, tx: &N::Transaction) {
|
|
||||||
txn.put(Self::sign_key(b"tx", tx.id()), tx.serialize());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Signer<N: Network, D: Db> {
|
pub struct Signer<N: Network, D: Db> {
|
||||||
|
@ -122,6 +175,25 @@ impl<N: Network, D: Db> fmt::Debug for Signer<N, D> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<N: Network, D: Db> Signer<N, D> {
|
impl<N: Network, D: Db> Signer<N, D> {
|
||||||
|
/// Rebroadcast already signed TXs which haven't had their completions mined into a sufficiently
|
||||||
|
/// confirmed block.
|
||||||
|
pub async fn rebroadcast_task(db: D, network: N) {
|
||||||
|
log::info!("rebroadcasting transactions for plans whose completions yet to be confirmed...");
|
||||||
|
loop {
|
||||||
|
for active in SignerDb::<N, D>::active_signs(&db) {
|
||||||
|
for completion in SignerDb::<N, D>::completions(&db, active) {
|
||||||
|
log::info!("rebroadcasting {}", hex::encode(&completion));
|
||||||
|
// TODO: Don't drop the error entirely. Check for invariants
|
||||||
|
let _ = network
|
||||||
|
.publish_transaction(&SignerDb::<N, D>::transaction(&db, completion).unwrap())
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Only run every five minutes so we aren't frequently loading tens to hundreds of KB from
|
||||||
|
// the DB
|
||||||
|
tokio::time::sleep(core::time::Duration::from_secs(5 * 60)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn new(network: N, keys: ThresholdKeys<N::Curve>) -> Signer<N, D> {
|
pub fn new(network: N, keys: ThresholdKeys<N::Curve>) -> Signer<N, D> {
|
||||||
Signer {
|
Signer {
|
||||||
db: PhantomData,
|
db: PhantomData,
|
||||||
|
@ -170,7 +242,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn already_completed(&self, txn: &mut D::Transaction<'_>, id: [u8; 32]) -> bool {
|
fn already_completed(&self, txn: &mut D::Transaction<'_>, id: [u8; 32]) -> bool {
|
||||||
if SignerDb::<N, D>::completed(txn, id).is_some() {
|
if !SignerDb::<N, D>::completions(txn, id).is_empty() {
|
||||||
debug!(
|
debug!(
|
||||||
"SignTransaction/Reattempt order for {}, which we've already completed signing",
|
"SignTransaction/Reattempt order for {}, which we've already completed signing",
|
||||||
hex::encode(id)
|
hex::encode(id)
|
||||||
|
@ -202,8 +274,8 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||||
let first_completion = !self.already_completed(txn, id);
|
let first_completion = !self.already_completed(txn, id);
|
||||||
|
|
||||||
// Save this completion to the DB
|
// Save this completion to the DB
|
||||||
SignerDb::<N, D>::save_transaction(txn, &tx);
|
SignerDb::<N, D>::complete_on_chain(txn, &id);
|
||||||
SignerDb::<N, D>::complete(txn, id, &tx.id());
|
SignerDb::<N, D>::complete(txn, id, &tx);
|
||||||
|
|
||||||
if first_completion {
|
if first_completion {
|
||||||
self.complete(id, tx.id());
|
self.complete(id, tx.id());
|
||||||
|
@ -238,8 +310,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||||
let first_completion = !self.already_completed(txn, id);
|
let first_completion = !self.already_completed(txn, id);
|
||||||
|
|
||||||
// Save this completion to the DB
|
// Save this completion to the DB
|
||||||
SignerDb::<N, D>::save_transaction(txn, &tx);
|
SignerDb::<N, D>::complete(txn, id, &tx);
|
||||||
SignerDb::<N, D>::complete(txn, id, tx_id);
|
|
||||||
|
|
||||||
if first_completion {
|
if first_completion {
|
||||||
self.complete(id, tx.id());
|
self.complete(id, tx.id());
|
||||||
|
@ -255,7 +326,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||||
} else {
|
} else {
|
||||||
// If we don't have this in RAM, it should be because we already finished signing it
|
// If we don't have this in RAM, it should be because we already finished signing it
|
||||||
// TODO: Will the coordinator ever send us Completed for an unknown ID?
|
// TODO: Will the coordinator ever send us Completed for an unknown ID?
|
||||||
assert!(SignerDb::<N, D>::completed(txn, id).is_some());
|
assert!(!SignerDb::<N, D>::completions(txn, id).is_empty());
|
||||||
info!(
|
info!(
|
||||||
"signer {} informed of the eventuality completion for plan {}, {}",
|
"signer {} informed of the eventuality completion for plan {}, {}",
|
||||||
hex::encode(self.keys.group_key().to_bytes()),
|
hex::encode(self.keys.group_key().to_bytes()),
|
||||||
|
@ -356,6 +427,10 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||||
tx: N::SignableTransaction,
|
tx: N::SignableTransaction,
|
||||||
eventuality: N::Eventuality,
|
eventuality: N::Eventuality,
|
||||||
) {
|
) {
|
||||||
|
// The caller is expected to re-issue sign orders on reboot
|
||||||
|
// This is solely used by the rebroadcast task
|
||||||
|
SignerDb::<N, D>::add_active_sign(txn, &id);
|
||||||
|
|
||||||
if self.already_completed(txn, id) {
|
if self.already_completed(txn, id) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -462,11 +537,10 @@ impl<N: Network, D: Db> Signer<N, D> {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Save the transaction in case it's needed for recovery
|
// Save the transaction in case it's needed for recovery
|
||||||
SignerDb::<N, D>::save_transaction(txn, &tx);
|
SignerDb::<N, D>::complete(txn, id.id, &tx);
|
||||||
let tx_id = tx.id();
|
|
||||||
SignerDb::<N, D>::complete(txn, id.id, &tx_id);
|
|
||||||
|
|
||||||
// Publish it
|
// Publish it
|
||||||
|
let tx_id = tx.id();
|
||||||
if let Err(e) = self.network.publish_transaction(&tx).await {
|
if let Err(e) = self.network.publish_transaction(&tx).await {
|
||||||
error!("couldn't publish {:?}: {:?}", tx, e);
|
error!("couldn't publish {:?}: {:?}", tx, e);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in a new issue