basicswap/basicswap/interface/firo.py
tecnovert ff2fc35f72
Add wallet_name option to basicswap.json.
Removed "walletfile" setting for XMR and WOW, replaced with "wallet_name".
Set wallet_name in prepare script with eg: BTC_WALLET_NAME env var.
2025-01-28 09:40:29 +02:00

452 lines
15 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2023 tecnovert
# Copyright (c) 2024 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import hashlib
import random
from .btc import BTCInterface, find_vout_for_address_from_txobj
from basicswap.util import (
i2b,
ensure,
)
from basicswap.rpc import make_rpc_func
from basicswap.util.crypto import hash160
from basicswap.util.address import decodeAddress
from basicswap.chainparams import Coins
from basicswap.interface.contrib.firo_test_framework.script import (
CScript,
OP_DUP,
OP_EQUAL,
OP_HASH160,
OP_CHECKSIG,
OP_EQUALVERIFY,
)
from basicswap.interface.contrib.firo_test_framework.mininode import (
CBlock,
FromHex,
CTransaction,
)
class FIROInterface(BTCInterface):
    """Firo chain interface, specialising the Bitcoin interface it inherits."""

    @staticmethod
    def coin_type():
        """Return the ``Coins`` member this interface is registered for."""
        return Coins.FIRO
def __init__(self, coin_settings, network, swap_client=None):
    """Initialise the base interface, then set up the wallet RPC handle.

    Args:
        coin_settings: Mapping of settings for this coin; it must not
            contain a ``wallet_name`` key.
        network: Passed through to the base interface.
        swap_client: Passed through to the base interface.

    Raises:
        ValueError: If ``coin_settings`` has a ``wallet_name`` entry.
    """
    super(FIROInterface, self).__init__(coin_settings, network, swap_client)

    # No multiwallet support: rpc_wallet is built without any wallet argument.
    self.rpc_wallet = make_rpc_func(
        self._rpcport,
        self._rpcauth,
        host=self._rpc_host,
    )

    wallet_name_given = "wallet_name" in coin_settings
    if wallet_name_given:
        raise ValueError(f"Invalid setting for {self.coin_name()}: wallet_name")
def getExchangeName(self, exchange_name: str) -> str:
    """Return "zcoin", whichever exchange is named.

    ``exchange_name`` is accepted but not consulted.
    """
    return "zcoin"
def initialiseWallet(self, key):
    """Do nothing; ``key`` is ignored.

    The wallet seed is loaded with the -hdseed= parameter instead.
    """
    return None
def checkWallets(self) -> int:
    """Return 1 unconditionally; no RPC query is made."""
    fixed_result = 1
    return fixed_result
def getNewAddress(self, use_segwit, label="swap_receive"):
    """Ask the node for a new address filed under ``label``.

    ``use_segwit`` is accepted but not used: the address from
    ``getnewaddress`` is returned unchanged (an ``addwitnessaddress``
    conversion step existed here previously and is disabled).
    """
    rpc_params = [label]
    return self.rpc("getnewaddress", rpc_params)
def decodeAddress(self, address):
    """Decode ``address`` with the module-level helper and drop its first byte."""
    decoded = decodeAddress(address)
    return decoded[1:]
def encodeSegwitAddress(self, script):
    """Not implemented for this interface.

    Raises:
        ValueError: Always, with the message "TODO".
    """
    raise ValueError("TODO")
def decodeSegwitAddress(self, addr):
    """Not implemented for this interface.

    Raises:
        ValueError: Always, with the message "TODO".
    """
    raise ValueError("TODO")
def isWatchOnlyAddress(self, address):
    """Return the ``iswatchonly`` field that ``validateaddress`` reports."""
    return self.rpc("validateaddress", [address])["iswatchonly"]
def isAddressMine(self, address: str, or_watch_only: bool = False) -> bool:
    """Report whether the wallet owns ``address``.

    Args:
        address: Address to look up with ``validateaddress``.
        or_watch_only: When true, a watch-only address also counts.

    Returns:
        The ``ismine`` field, or ``ismine or iswatchonly`` when
        ``or_watch_only`` is set.
    """
    info = self.rpc("validateaddress", [address])
    if or_watch_only:
        return info["ismine"] or info["iswatchonly"]
    return info["ismine"]
def getSCLockScriptAddress(self, lock_script: bytes) -> str:
    """Return the address for ``lock_script``, importing it if unknown.

    The destination script is derived with ``getScriptDest`` and encoded with
    ``encodeScriptDest``. If the wallet neither owns nor watches the resulting
    address, the destination script is imported (label "bid lock", no rescan).
    """
    dest_script = self.getScriptDest(lock_script)
    address = self.encodeScriptDest(dest_script)

    already_tracked = self.isAddressMine(address, or_watch_only=True)
    if not already_tracked:
        # Original note: expects P2WSH nested in BIP16_P2SH.
        # NOTE(review): confirm that note still applies to this coin.
        import_params = [dest_script.hex(), "bid lock", False, True]
        self.rpc("importaddress", import_params)
    return address
def getLockTxHeight(
    self,
    txid,
    dest_address,
    bid_amount,
    rescan_from,
    find_index: bool = False,
    vout: int = -1,
):
    """Look up confirmation depth and block height of a lock transaction.

    Args:
        txid: Transaction id as bytes, or None to search the unspent outputs
            of ``dest_address`` for one whose amount equals ``bid_amount``.
        dest_address: Address the lock output pays to.
        bid_amount: Integer amount compared against ``make_int`` of each
            candidate output's amount.
        rescan_from: Height handed to ``rescanBlockchainForAddress`` when the
            address has to be imported first.
        find_index: When true, add the output index under "index".
        vout: Not used by this implementation.

    Returns:
        None when no transaction is found or the wallet lookup fails;
        otherwise a dict with "depth" and "height", plus "index" when
        ``find_index`` is set and "txid" (hex) when ``txid`` was None.
    """
    # Add watchonly address and rescan if required
    if not self.isAddressMine(dest_address, or_watch_only=True):
        self.importWatchOnlyAddress(dest_address, "bid")
        self._log.info("Imported watch-only addr: {}".format(dest_address))
        self._log.info(
            "Rescanning {} chain from height: {}".format(
                self.coin_name(), rescan_from
            )
        )
        self.rescanBlockchainForAddress(rescan_from, dest_address)

    searched_for_txid = txid is None
    if searched_for_txid:
        unspents = self.rpc("listunspent", [0, 9999999, [dest_address]])
        # First unspent output whose amount matches the bid exactly.
        txid = next(
            (
                bytes.fromhex(unspent["txid"])
                for unspent in unspents
                if self.make_int(unspent["amount"]) == bid_amount
            ),
            None,
        )
        if txid is None:
            return None

    try:
        wallet_tx = self.rpc("gettransaction", [txid.hex()])

        if "blockhash" in wallet_tx:
            header = self.rpc("getblockheader", [wallet_tx["blockhash"]])
            height = header["height"]
        else:
            # Not in a block yet.
            height = 0

        rv = {
            "depth": wallet_tx.get("confirmations", 0),
            "height": height,
        }
    except Exception as e:
        self._log.debug(
            "getLockTxHeight gettransaction failed: %s, %s", txid.hex(), str(e)
        )
        return None

    if find_index:
        decoded_tx = self.rpc("decoderawtransaction", [wallet_tx["hex"]])
        rv["index"] = find_vout_for_address_from_txobj(decoded_tx, dest_address)

    if searched_for_txid:
        rv["txid"] = txid.hex()

    return rv
def createSCLockTx(
    self, value: int, script: bytearray, vkbv: bytes = None
) -> bytes:
    """Build an unfunded transaction with a single output locked to ``script``.

    Args:
        value: Amount for the output.
        script: Lock script; the output pays to ``getScriptDest(script)``.
        vkbv: Not used by this implementation.

    Returns:
        The serialised transaction (no inputs are added here).
    """
    lock_tx = CTransaction()
    lock_tx.nVersion = self.txVersion()

    make_txo = self.txoType()
    lock_tx.vout.append(make_txo(value, self.getScriptDest(script)))
    return lock_tx.serialize()
def fundSCLockTx(self, tx_bytes, feerate, vkbv=None):
    """Fund the lock transaction by delegating to ``fundTx``.

    ``vkbv`` is accepted but not forwarded.
    """
    funded = self.fundTx(tx_bytes, feerate)
    return funded
def signTxWithWallet(self, tx):
    """Sign raw transaction bytes via ``signrawtransaction``.

    Returns:
        The bytes decoded from the "hex" field of the RPC result.
    """
    signed = self.rpc("signrawtransaction", [tx.hex()])
    return bytes.fromhex(signed["hex"])
def createRawFundedTransaction(
    self,
    addr_to: str,
    amount: int,
    sub_fee: bool = False,
    lock_unspents: bool = True,
) -> str:
    """Create and fund a raw transaction paying ``amount`` to ``addr_to``.

    Args:
        addr_to: Destination address.
        amount: Amount, rendered with ``format_amount`` for the RPC.
        sub_fee: When true, the fee is subtracted from output 0.
        lock_unspents: Forwarded as the ``lockUnspents`` funding option.

    Returns:
        The "hex" field of the ``fundrawtransaction`` result.
    """
    outputs = {addr_to: self.format_amount(amount)}
    txn = self.rpc("createrawtransaction", [[], outputs])

    fee_rate, fee_src = self.get_fee_rate(self._conf_target)
    self._log.debug(
        f"Fee rate: {fee_rate}, source: {fee_src}, block target: {self._conf_target}"
    )

    options = {"lockUnspents": lock_unspents, "feeRate": fee_rate}
    if sub_fee:
        options["subtractFeeFromOutputs"] = [0]

    funded = self.rpc("fundrawtransaction", [txn, options])
    return funded["hex"]
def createRawSignedTransaction(self, addr_to, amount) -> str:
    """Create, fund and sign a raw transaction paying ``amount`` to ``addr_to``.

    Returns:
        The "hex" field of the ``signrawtransaction`` result.
    """
    funded_hex = self.createRawFundedTransaction(addr_to, amount)
    signed = self.rpc("signrawtransaction", [funded_hex])
    return signed["hex"]
def getScriptForPubkeyHash(self, pkh: bytes) -> bytearray:
    """Return the P2PKH script for the public key hash ``pkh``."""
    p2pkh_elements = [OP_DUP, OP_HASH160, pkh, OP_EQUALVERIFY, OP_CHECKSIG]
    return CScript(p2pkh_elements)
def getScriptDest(self, script: bytearray) -> bytearray:
    """Return the P2SH destination script that commits to ``script``."""
    digest = hash160(script)
    assert len(digest) == 20
    p2sh_elements = [OP_HASH160, digest, OP_EQUAL]
    return CScript(p2sh_elements)
def getSeedHash(self, seed: bytes) -> bytes:
    """Return ``hash160(seed)`` with its byte order reversed."""
    digest = hash160(seed)
    return digest[::-1]
def encodeScriptDest(self, script_dest: bytes) -> str:
    """Encode a destination script as an address via ``sh_to_address``.

    The hash is taken as everything between the first two bytes and the
    final byte of ``script_dest``.
    """
    # Extract hash from script
    return self.sh_to_address(script_dest[2:-1])
def getDestForScriptHash(self, script_hash):
    """Wrap a 20-byte script hash in an OP_HASH160 ... OP_EQUAL script."""
    assert len(script_hash) == 20
    p2sh_elements = [OP_HASH160, script_hash, OP_EQUAL]
    return CScript(p2sh_elements)
def withdrawCoin(self, value, addr_to, subfee):
    """Send ``value`` to ``addr_to`` with ``sendtoaddress``.

    The two comment fields are sent empty; ``subfee`` is passed as the fifth
    RPC parameter. Returns whatever the RPC returns.
    """
    return self.rpc("sendtoaddress", [addr_to, value, "", "", subfee])
def getWalletSeedID(self):
    """Return the ``hdmasterkeyid`` field of ``getwalletinfo``."""
    wallet_info = self.rpc("getwalletinfo")
    return wallet_info["hdmasterkeyid"]
def getSpendableBalance(self) -> int:
    """Return the ``balance`` field of ``getwalletinfo`` passed through ``make_int``."""
    wallet_info = self.rpc("getwalletinfo")
    return self.make_int(wallet_info["balance"])
def getBLockSpendTxFee(self, tx, fee_rate: int) -> int:
    """Estimate the fee for a lock-spend transaction.

    The size used is the witness-serialised length of ``tx`` plus a fixed
    107 bytes; the fee is ``fee_rate`` per 1000 bytes of that size, rounded.
    The inputs and result are logged at info level.
    """
    extra_bytes = 107
    tx_size = len(tx.serialize_with_witness()) + extra_bytes
    fee = round(fee_rate * tx_size / 1000)
    self._log.info(
        f"BLockSpendTx fee_rate, size, fee: {fee_rate}, {tx_size}, {fee}."
    )
    return fee
def signTxWithKey(self, tx: bytes, key: bytes) -> bytes:
    """Sign ``tx`` with a single private key via ``signrawtransaction``.

    ``key`` is converted with ``encodeKey`` and passed as the only entry of
    the RPC's key list; the previous-outputs list is sent empty.

    Returns:
        The bytes decoded from the "hex" field of the RPC result.
    """
    signing_keys = [self.encodeKey(key)]
    signed = self.rpc("signrawtransaction", [tx.hex(), [], signing_keys])
    return bytes.fromhex(signed["hex"])
def findTxnByHash(self, txid_hex: str):
    """Look up a wallet transaction and report it once it is confirmed.

    Only works for wallet txns, because the lookup uses ``gettransaction``.

    Args:
        txid_hex: Transaction id as a hex string.

    Returns:
        ``{"txid": txid_hex, "amount": 0, "height": <block height>}`` when
        the transaction has at least ``self.blocks_confirmed`` confirmations;
        None when the lookup fails or it is not yet confirmed enough.
    """
    try:
        rv = self.rpc("gettransaction", [txid_hex])
    except Exception as e:
        # Name the RPC actually called and keep the failure reason in the log.
        self._log.debug(
            "findTxnByHash gettransaction failed: %s, %s", txid_hex, str(e)
        )
        return None

    if "confirmations" in rv and rv["confirmations"] >= self.blocks_confirmed:
        block_height = self.getBlockHeader(rv["blockhash"])["height"]
        return {"txid": txid_hex, "amount": 0, "height": block_height}

    return None
def getProofOfFunds(self, amount_for, extra_commit_bytes):
    """Sign a message committing to unspent outputs worth ``amount_for``.

    Spendable outputs from ``listunspent`` are grouped by address. An address
    is viable when its total covers ``amount_for`` and, taking its outputs
    from smallest to largest, no more than four are needed to reach it. One
    viable address is picked at random and signs
    ``<address>_swap_proof_<sha256 of outpoints><extra_commit_bytes hex>``.

    Args:
        amount_for: Integer amount the proof must cover.
        extra_commit_bytes: Extra bytes appended (as hex) to the signed message.

    Returns:
        Tuple ``(address, signature, outpoints)`` where ``outpoints`` is a
        list of ``(txid bytes, vout)`` pairs in the order they were hashed.

    Raises:
        Whatever ``ensure`` raises when no address has enough funds.
    """
    # TODO: Lock unspent and use same output/s to fund bid
    unspents_by_addr = dict()
    for u in self.rpc("listunspent"):
        if u["spendable"] is not True:
            continue
        addr = u.get("address")
        if addr is None:
            # "address" can be absent from a listunspent entry; such an
            # output cannot back a signed message, so leave it out.
            continue
        entry = unspents_by_addr.setdefault(addr, {"total": 0, "utxos": []})
        utxo_amount: int = self.make_int(u["amount"], r=1)
        entry["total"] += utxo_amount
        entry["utxos"].append((utxo_amount, u["txid"], u["vout"]))

    max_utxos: int = 4

    viable_addrs = []
    for addr, data in unspents_by_addr.items():
        if data["total"] < amount_for:
            continue

        # Sort from smallest to largest amount, so the count below is the
        # max outputs required to reach amount_for
        sorted_utxos = sorted(data["utxos"], key=lambda x: x[0])

        utxos_req: int = 0
        sum_value: int = 0
        for utxo in sorted_utxos:
            sum_value += utxo[0]
            utxos_req += 1
            if sum_value >= amount_for:
                break

        if utxos_req <= max_utxos:
            viable_addrs.append(addr)

    ensure(
        len(viable_addrs) > 0, "Could not find address with enough funds for proof"
    )

    sign_for_addr: str = random.choice(viable_addrs)
    self._log.debug("sign_for_addr %s", sign_for_addr)

    # Commit to the same smallest-first prefix of outputs that was counted above.
    prove_utxos = []
    sorted_utxos = sorted(
        unspents_by_addr[sign_for_addr]["utxos"], key=lambda x: x[0]
    )

    hasher = hashlib.sha256()
    sum_value: int = 0
    for utxo in sorted_utxos:
        sum_value += utxo[0]
        outpoint = (bytes.fromhex(utxo[1]), utxo[2])
        prove_utxos.append(outpoint)
        hasher.update(outpoint[0])
        # The output index is hashed as 2 big-endian bytes; the verifier
        # must hash it the same way.
        hasher.update(outpoint[1].to_bytes(2, "big"))
        if sum_value >= amount_for:
            break
    utxos_hash = hasher.digest()

    if (
        self.using_segwit()
    ):  # TODO: Use isSegwitAddress when scantxoutset can use combo
        # 'Address does not refer to key' for non p2pkh
        pkh = self.decodeAddress(sign_for_addr)
        sign_for_addr = self.pkh_to_address(pkh)
        self._log.debug("sign_for_addr converted %s", sign_for_addr)

    signature = self.rpc(
        "signmessage",
        [
            sign_for_addr,
            sign_for_addr
            + "_swap_proof_"
            + utxos_hash.hex()
            + extra_commit_bytes.hex(),
        ],
    )

    return (sign_for_addr, signature, prove_utxos)
def verifyProofOfFunds(self, address, signature, utxos, extra_commit_bytes):
    """Check a proof-of-funds signature and total the outputs it names.

    The message ``<address>_swap_proof_<sha256 of outpoints><extra hex>`` is
    rebuilt from ``utxos`` and checked with ``verifyMessage``. Each distinct
    outpoint that ``gettxout`` still reports is then added to the total.

    Args:
        address: Address that signed the proof.
        signature: Signature over the proof message.
        utxos: Sequence of ``(txid bytes, vout)`` pairs, in signing order.
        extra_commit_bytes: Extra bytes appended (as hex) to the message.

    Returns:
        Integer sum of the values of the distinct outputs that were found.

    Raises:
        Whatever ``ensure`` raises when the signature does not verify.
    """
    hasher = hashlib.sha256()
    for outpoint in utxos:
        hasher.update(outpoint[0])
        hasher.update(outpoint[1].to_bytes(2, "big"))
    utxos_hash = hasher.digest()

    passed = self.verifyMessage(
        address,
        address + "_swap_proof_" + utxos_hash.hex() + extra_commit_bytes.hex(),
        signature,
    )
    ensure(passed is True, "Proof of funds signature invalid")

    if self.using_segwit():
        # NOTE(review): the converted address is not used below.
        address = self.encodeSegwitAddress(decodeAddress(address)[1:])

    # NOTE(review): nothing here checks that the outputs pay to `address`;
    # confirm whether that check is needed.
    sum_value: int = 0
    seen_outpoints = set()
    for outpoint in utxos:
        outpoint_key = (bytes(outpoint[0]), outpoint[1])
        if outpoint_key in seen_outpoints:
            # Count each output once, or a repeated entry inflates the total.
            continue
        seen_outpoints.add(outpoint_key)

        txout = self.rpc("gettxout", [outpoint[0].hex(), outpoint[1]])
        if txout is None:
            # No result for this outpoint; it adds nothing to the total.
            self._log.debug(
                "verifyProofOfFunds output not found: %s, %d",
                outpoint[0].hex(),
                outpoint[1],
            )
            continue
        sum_value += self.make_int(txout["value"])

    return sum_value
def rescanBlockchainForAddress(self, height_start: int, addr_find: str):
    """Search recent blocks for an output to ``addr_find`` and re-process it.

    Very ugly workaround for missing `rescanblockchain` rpc command.

    Blocks are examined from the chain tip downwards, stopping above
    ``height_start`` (that height itself is not examined). At the first
    output whose script equals the script-hash destination built from
    ``addr_find``, the containing block is invalidated and immediately
    reconsidered, and the search ends.

    Args:
        height_start: Exclusive lower bound of the heights to examine.
        addr_find: Address whose decoded hash is matched against outputs.
    """
    current_height: int = self.getChainHeight()

    script_hash: bytes = self.decodeAddress(addr_find)
    find_scriptPubKey = self.getDestForScriptHash(script_hash)

    while current_height > height_start:
        block_hash = self.rpc("getblockhash", [current_height])

        block = self.rpc("getblock", [block_hash, False])
        decoded_block = FromHex(CBlock(), block)
        for tx in decoded_block.vtx:
            for txo in tx.vout:
                if txo.scriptPubKey == find_scriptPubKey:
                    tx.rehash()
                    txid = i2b(tx.sha256)
                    self._log.info(
                        "Found output to addr: {} in tx {} in block {}".format(
                            addr_find, txid.hex(), block_hash
                        )
                    )
                    self._log.info(
                        "rescanblockchain hack invalidateblock {}".format(
                            block_hash
                        )
                    )
                    # presumably this makes the node re-process the block for
                    # the newly imported address - TODO confirm
                    self.rpc("invalidateblock", [block_hash])
                    self.rpc("reconsiderblock", [block_hash])
                    return
        current_height -= 1
def getBlockWithTxns(self, block_hash: str):
    """Return a block summary with every transaction decoded.

    The raw block is fetched and parsed locally; each transaction is
    re-serialised and run through ``decoderawtransaction``, with its "hex"
    filled in when the RPC result lacks one. The remaining fields are copied
    from ``getblockheader``.

    Returns:
        Dict with keys "hash", "previousblockhash", "tx", "confirmations",
        "height", "time", "version" and "merkleroot".
    """
    # TODO: Bypass decoderawtransaction and getblockheader
    raw_block = self.rpc("getblock", [block_hash, False])
    header = self.rpc("getblockheader", [block_hash])
    parsed_block = FromHex(CBlock(), raw_block)

    decoded_txns = []
    for tx in parsed_block.vtx:
        tx_hex = tx.serialize_with_witness().hex()
        decoded = self.rpc("decoderawtransaction", [tx_hex])
        decoded.setdefault("hex", tx_hex)
        decoded_txns.append(decoded)

    header_fields = ("confirmations", "height", "time", "version", "merkleroot")
    return {
        "hash": block_hash,
        "previousblockhash": header["previousblockhash"],
        "tx": decoded_txns,
        **{field: header[field] for field in header_fields},
    }