doge: Switch to custom binary.

This commit is contained in:
tecnovert 2024-12-12 14:29:41 +02:00
parent 3be72b3c71
commit 0e2be676db
11 changed files with 694 additions and 107 deletions

View file

@ -355,7 +355,13 @@ class BasicSwap(BaseApp):
# TODO: Set dynamically
self.balance_only_coins = (Coins.LTC_MWEB,)
self.scriptless_coins = (Coins.XMR, Coins.WOW, Coins.PART_ANON, Coins.FIRO, Coins.DOGE)
self.scriptless_coins = (
Coins.XMR,
Coins.WOW,
Coins.PART_ANON,
Coins.FIRO,
Coins.DOGE,
)
self.adaptor_swap_only_coins = self.scriptless_coins + (
Coins.PART_BLIND,
Coins.BCH,

View file

@ -86,7 +86,7 @@ DCR_VERSION_TAG = os.getenv("DCR_VERSION_TAG", "")
BITCOINCASH_VERSION = os.getenv("BITCOINCASH_VERSION", "27.1.0")
BITCOINCASH_VERSION_TAG = os.getenv("BITCOINCASH_VERSION_TAG", "")
DOGECOIN_VERSION = os.getenv("DOGECOIN_VERSION", "1.14.7")
DOGECOIN_VERSION = os.getenv("DOGECOIN_VERSION", "23.2.1")
DOGECOIN_VERSION_TAG = os.getenv("DOGECOIN_VERSION_TAG", "")
GUIX_SSL_CERT_DIR = None
@ -111,7 +111,7 @@ known_coins = {
"firo": (FIRO_VERSION, FIRO_VERSION_TAG, ("reuben",)),
"navcoin": (NAV_VERSION, NAV_VERSION_TAG, ("nav_builder",)),
"bitcoincash": (BITCOINCASH_VERSION, BITCOINCASH_VERSION_TAG, ("Calin_Culianu",)),
"dogecoin": (DOGECOIN_VERSION, DOGECOIN_VERSION_TAG, ("patricklodder",)),
"dogecoin": (DOGECOIN_VERSION, DOGECOIN_VERSION_TAG, ("tecnovert",)),
}
disabled_coins = [
@ -815,16 +815,14 @@ def prepareCore(coin, version_data, settings, data_dir, extra_opts={}):
% (version, os_dir_name, signing_key_name, assert_filename)
)
elif coin == "dogecoin":
release_url = "https://github.com/dogecoin/dogecoin/releases/download/v{}/{}".format(
version + version_tag, release_filename
)
assert_filename = "{}-{}-{}-build.assert".format(
coin, os_name, ".".join(version.split(".")[:2])
)
assert_url = (
"https://raw.githubusercontent.com/dogecoin/gitian.sigs/master/%s-%s/%s/%s"
% (version, os_dir_name, signing_key_name, assert_filename)
release_url = (
"https://github.com/tecnovert/dogecoin/releases/download/v{}/{}".format(
version + version_tag, release_filename
)
)
assert_filename = "{}-{}-{}-build.assert".format(coin, os_name, version)
assert_url = f"https://raw.githubusercontent.com/tecnovert/guix.sigs/dogecoin/{version}/{signing_key_name}/noncodesigned.SHA256SUMS"
elif coin == "bitcoin":
release_url = "https://bitcoincore.org/bin/bitcoin-core-{}/{}".format(
version, release_filename
@ -1250,7 +1248,7 @@ def prepareDataDir(coin, settings, chain, particl_mnemonic, extra_opts={}):
fp.write(chainname + "=1\n")
else:
fp.write(chain + "=1\n")
if coin not in ("firo", "navcoin", "dogecoin"):
if coin not in ("firo", "navcoin"):
if chain == "testnet":
fp.write("[test]\n\n")
elif chain == "regtest":
@ -1730,6 +1728,7 @@ def initialise_wallets(
Coins.PART,
Coins.BTC,
Coins.LTC,
Coins.DOGE,
Coins.DCR,
Coins.DASH,
)
@ -2301,8 +2300,8 @@ def main():
"use_csv": False,
"blocks_confirmed": 2,
"conf_target": 2,
"core_version_group": 14,
"min_relay_fee": 0.00001,
"core_version_group": 23,
"min_relay_fee": 0.01, # RECOMMENDED_MIN_TX_FEE
},
"decred": {
"connection_type": "rpc",

View file

@ -175,20 +175,20 @@ chainparams = {
"rpcport": 44555,
"pubkey_address": 113,
"script_address": 196,
"key_prefix": 239,
"key_prefix": 241,
"hrp": "tdge",
"bip44": 3,
"bip44": 1,
"min_amount": 100000,
"max_amount": 10000000 * COIN,
"name": "testnet4",
},
"regtest": {
"rpcport": 18332,
"pubkey_address": 113,
"pubkey_address": 111,
"script_address": 196,
"key_prefix": 239,
"hrp": "rdge",
"bip44": 3,
"bip44": 1,
"min_amount": 100000,
"max_amount": 10000000 * COIN,
},

View file

@ -1296,7 +1296,7 @@ class BTCInterface(Secp256k1Interface):
def getWalletTransaction(self, txid: bytes):
try:
return bytes.fromhex(self.rpc_wallet("gettransaction", [txid.hex()]))
return bytes.fromhex(self.rpc_wallet("gettransaction", [txid.hex()])["hex"])
except Exception as e: # noqa: F841
# TODO: filter errors
return None
@ -1466,7 +1466,6 @@ class BTCInterface(Secp256k1Interface):
vout: int = -1,
):
# Add watchonly address and rescan if required
if not self.isAddressMine(dest_address, or_watch_only=True):
self.importWatchOnlyAddress(dest_address, "bid")
self._log.info("Imported watch-only addr: {}".format(dest_address))

View file

@ -7,7 +7,16 @@
from .btc import BTCInterface
from basicswap.chainparams import Coins
from basicswap.rpc import make_rpc_func
from basicswap.util.crypto import hash160
from basicswap.contrib.test_framework.script import (
CScript,
OP_DUP,
OP_CHECKSIG,
OP_HASH160,
OP_EQUAL,
OP_EQUALVERIFY,
)
class DOGEInterface(BTCInterface):
@ -15,29 +24,35 @@ class DOGEInterface(BTCInterface):
def coin_type():
return Coins.DOGE
@staticmethod
def xmr_swap_b_lock_spend_tx_vsize() -> int:
return 192
def __init__(self, coin_settings, network, swap_client=None):
super(DOGEInterface, self).__init__(coin_settings, network, swap_client)
# No multiwallet support
self.rpc_wallet = make_rpc_func(
self._rpcport, self._rpcauth, host=self._rpc_host
def getScriptDest(self, script: bytearray) -> bytearray:
# P2SH
script_hash = hash160(script)
assert len(script_hash) == 20
return CScript([OP_HASH160, script_hash, OP_EQUAL])
def getScriptForPubkeyHash(self, pkh: bytes) -> bytearray:
# Return P2PKH
return CScript([OP_DUP, OP_HASH160, pkh, OP_EQUALVERIFY, OP_CHECKSIG])
def encodeScriptDest(self, script_dest: bytes) -> str:
# Extract hash from script
script_hash = script_dest[2:-1]
return self.sh_to_address(script_hash)
def getBLockSpendTxFee(self, tx, fee_rate: int) -> int:
add_bytes = 107
size = len(tx.serialize_with_witness()) + add_bytes
pay_fee = round(fee_rate * size / 1000)
self._log.info(
f"BLockSpendTx fee_rate, size, fee: {fee_rate}, {size}, {pay_fee}."
)
def initialiseWallet(self, key):
# load with -hdseed= parameter
pass
def checkWallets(self) -> int:
return 1
def getNewAddress(self, use_segwit, label="swap_receive"):
return self.rpc("getnewaddress", [label])
def isWatchOnlyAddress(self, address):
addr_info = self.rpc("validateaddress", [address])
return addr_info["iswatchonly"]
def isAddressMine(self, address: str, or_watch_only: bool = False) -> bool:
addr_info = self.rpc("validateaddress", [address])
if not or_watch_only:
return addr_info["ismine"]
return addr_info["ismine"] or addr_info["iswatchonly"]
return pay_fee

View file

@ -44,6 +44,10 @@ from tests.basicswap.test_bch_xmr import (
BCH_BASE_PORT,
BCH_BASE_RPC_PORT,
)
from tests.basicswap.extended.test_doge import (
DOGE_BASE_PORT,
DOGE_BASE_RPC_PORT,
)
from basicswap.contrib.rpcauth import generate_salt, password_to_hmac
@ -58,9 +62,6 @@ FIRO_BASE_PORT = 34832
FIRO_BASE_RPC_PORT = 35832
FIRO_RPC_PORT_BASE = int(os.getenv("FIRO_RPC_PORT_BASE", FIRO_BASE_RPC_PORT))
DOGE_BASE_PORT = 22556
DOGE_BASE_RPC_PORT = 18442
TEST_PATH = os.path.expanduser(os.getenv("TEST_PATH", "~/test_basicswap1"))
PARTICL_PORT_BASE = int(os.getenv("PARTICL_PORT_BASE", BASE_PORT))
@ -75,9 +76,7 @@ DECRED_RPC_PORT_BASE = int(os.getenv("DECRED_RPC_PORT_BASE", DCR_BASE_RPC_PORT))
BITCOINCASH_RPC_PORT_BASE = int(
os.getenv("BITCOINCASH_RPC_PORT_BASE", BCH_BASE_RPC_PORT)
)
DOGECOIN_RPC_PORT_BASE = int(
os.getenv("DOGECOIN_RPC_PORT_BASE", DOGE_BASE_RPC_PORT)
)
DOGECOIN_RPC_PORT_BASE = int(os.getenv("DOGECOIN_RPC_PORT_BASE", DOGE_BASE_RPC_PORT))
EXTRA_CONFIG_JSON = json.loads(os.getenv("EXTRA_CONFIG_JSON", "{}"))
@ -453,6 +452,7 @@ def run_prepare(
fp.write("discover=0\n")
fp.write("listenonion=0\n")
fp.write("upnp=0\n")
fp.write("debug=1\n")
if use_rpcauth:
salt = generate_salt(16)
rpc_user = "test_doge_" + str(node_id)

View file

@ -5,64 +5,495 @@
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
"""
export RESET_TEST=true
export TEST_PATH=/tmp/test_doge
mkdir -p ${TEST_PATH}/bin
cp -r ~/tmp/basicswap_bin/* ${TEST_PATH}/bin
export PYTHONPATH=$(pwd)
export TEST_COINS_LIST='bitcoin,dogecoin'
python tests/basicswap/extended/test_doge.py
"""
import os
import sys
import json
import time
import random
import signal
import logging
import unittest
import threading
import multiprocessing
from unittest.mock import patch
from tests.basicswap.extended.test_xmr_persistent import (
BaseTestWithPrepare,
UI_PORT,
import basicswap.config as cfg
from basicswap.basicswap import (
Coins,
)
from tests.basicswap.extended.test_scripts import (
wait_for_offers,
from basicswap.util.address import (
toWIF,
)
from tests.basicswap.util import (
read_json_api,
from tests.basicswap.common import (
stopDaemons,
make_rpc_func,
waitForRPC,
)
from basicswap.bin.run import startDaemon
from basicswap.contrib.rpcauth import generate_salt, password_to_hmac
from tests.basicswap.test_xmr import test_delay_event, callnoderpc
from basicswap.contrib.test_framework.messages import (
CTransaction,
CTxIn,
COutPoint,
ToHex,
)
from basicswap.contrib.test_framework.script import (
CScript,
OP_CHECKLOCKTIMEVERIFY,
)
from tests.basicswap.test_btc_xmr import TestFunctions
logger = logging.getLogger()
logger.level = logging.DEBUG
if not len(logger.handlers):
logger.addHandler(logging.StreamHandler(sys.stdout))
DOGE_BINDIR = os.path.expanduser(
os.getenv("DOGE_BINDIR", os.path.join(cfg.DEFAULT_TEST_BINDIR, "dogecoin"))
)
DOGED = os.getenv("DOGED", "dogecoind" + cfg.bin_suffix)
DOGE_CLI = os.getenv("DOGE_CLI", "dogecoin-cli" + cfg.bin_suffix)
DOGE_TX = os.getenv("DOGE_TX", "dogecoin-tx" + cfg.bin_suffix)
class DOGETest(BaseTestWithPrepare):
def test_a(self):
read_json_api(UI_PORT + 0, "wallets/doge/reseed")
read_json_api(UI_PORT + 1, "wallets/doge/reseed")
DOGE_BASE_PORT = 22556
DOGE_BASE_RPC_PORT = 18442
offer_json = {
"coin_from": "btc",
"coin_to": "doge",
"amt_from": 10.0,
"amt_to": 100.0,
"amt_var": True,
"lockseconds": 3600,
def prepareDataDir(
datadir, node_id, conf_file, dir_prefix, base_p2p_port, base_rpc_port, num_nodes=3
):
node_dir = os.path.join(datadir, dir_prefix + str(node_id))
if not os.path.exists(node_dir):
os.makedirs(node_dir)
cfg_file_path = os.path.join(node_dir, conf_file)
if os.path.exists(cfg_file_path):
return
with open(cfg_file_path, "w+") as fp:
fp.write("regtest=1\n")
fp.write("[regtest]\n")
fp.write("port=" + str(base_p2p_port + node_id) + "\n")
fp.write("rpcport=" + str(base_rpc_port + node_id) + "\n")
salt = generate_salt(16)
fp.write(
"rpcauth={}:{}${}\n".format(
"test" + str(node_id),
salt,
password_to_hmac(salt, "test_pass" + str(node_id)),
)
)
fp.write("daemon=0\n")
fp.write("printtoconsole=0\n")
fp.write("server=1\n")
fp.write("discover=0\n")
fp.write("listenonion=0\n")
fp.write("bind=127.0.0.1\n")
fp.write("findpeers=0\n")
fp.write("debug=1\n")
fp.write("debugexclude=libevent\n")
fp.write("acceptnonstdtxn=0\n")
for i in range(0, num_nodes):
if node_id == i:
continue
fp.write("addnode=127.0.0.1:{}\n".format(base_p2p_port + i))
return node_dir
class Test(TestFunctions):
__test__ = True
test_coin = Coins.DOGE
test_coin_from = Coins.BTC
test_coin_to = Coins.DOGE
doge_daemons = []
doge_addr = None
start_ltc_nodes = False
start_xmr_nodes = False
test_atomic = False
test_xmr = True
pause_chain = False
doge_seeds = [
"516b471da2a67bcfd42a1da7f7ae8f9a1b02c34f6a2d6a943ceec5dca68e7fa1",
"a8c0911fba070d5cc2784703afeb0f7c3b9b524b8a53466c04e01933d9fede78",
"7b3b533ac3a27114ae17c8cca0d2cd9f736e7519ae52b8ec8f1f452e8223d082",
]
@classmethod
def prepareExtraDataDir(cls, i):
if not cls.restore_instance:
prepareDataDir(
cfg.TEST_DATADIRS,
i,
"dogecoin.conf",
"doge_",
base_p2p_port=DOGE_BASE_PORT,
base_rpc_port=DOGE_BASE_RPC_PORT,
)
cls.doge_daemons.append(
startDaemon(
os.path.join(cfg.TEST_DATADIRS, "doge_" + str(i)),
DOGE_BINDIR,
DOGED,
)
)
logging.info("Started %s %d", DOGED, cls.doge_daemons[-1].handle.pid)
dogeRpc = make_rpc_func(i, base_rpc_port=DOGE_BASE_RPC_PORT)
waitForRPC(dogeRpc, test_delay_event, rpc_command="getblockchaininfo")
if len(dogeRpc("listwallets")) < 1:
dogeRpc("createwallet", ["wallet.dat", False, True, "", False, False])
wif_prefix: int = 239
wif = toWIF(wif_prefix, bytes.fromhex(cls.doge_seeds[i]), False)
dogeRpc("sethdseed", [True, wif])
waitForRPC(
dogeRpc,
test_delay_event,
max_tries=12,
)
@classmethod
def addPIDInfo(cls, sc, i):
sc.setDaemonPID(Coins.DOGE, cls.doge_daemons[i].handle.pid)
@classmethod
def sync_blocks(cls, wait_for: int = 20, num_nodes: int = 3) -> None:
logging.info("Syncing blocks")
for i in range(wait_for):
if test_delay_event.is_set():
raise ValueError("Test stopped.")
block_hash0 = callnoderpc(
0, "getbestblockhash", base_rpc_port=DOGE_BASE_RPC_PORT
)
matches: int = 0
for i in range(1, num_nodes):
block_hash = callnoderpc(
i, "getbestblockhash", base_rpc_port=DOGE_BASE_RPC_PORT
)
if block_hash == block_hash0:
matches += 1
if matches == num_nodes - 1:
return
test_delay_event.wait(1)
raise ValueError("sync_blocks timed out.")
@classmethod
def prepareExtraCoins(cls):
if cls.restore_instance:
void_block_rewards_pubkey = cls.getRandomPubkey()
cls.doge_addr = (
cls.swap_clients[0]
.ci(Coins.DOGE)
.pubkey_to_address(void_block_rewards_pubkey)
)
else:
num_blocks = 400
cls.doge_addr = callnoderpc(
0, "getnewaddress", ["mining_addr"], base_rpc_port=DOGE_BASE_RPC_PORT
)
logging.info("Mining %d DOGE blocks to %s", num_blocks, cls.doge_addr)
callnoderpc(
0,
"generatetoaddress",
[num_blocks, cls.doge_addr],
base_rpc_port=DOGE_BASE_RPC_PORT,
)
doge_addr1 = callnoderpc(
1, "getnewaddress", ["initial addr"], base_rpc_port=DOGE_BASE_RPC_PORT
)
for i in range(5):
callnoderpc(
0,
"sendtoaddress",
[doge_addr1, 1000],
base_rpc_port=DOGE_BASE_RPC_PORT,
)
# Set future block rewards to nowhere (a random address), so wallet amounts stay constant
void_block_rewards_pubkey = cls.getRandomPubkey()
cls.doge_addr = (
cls.swap_clients[0]
.ci(Coins.DOGE)
.pubkey_to_address(void_block_rewards_pubkey)
)
num_blocks = 100
logging.info("Mining %d DOGE blocks to %s", num_blocks, cls.doge_addr)
callnoderpc(
0,
"generatetoaddress",
[num_blocks, cls.doge_addr],
base_rpc_port=DOGE_BASE_RPC_PORT,
)
cls.sync_blocks()
@classmethod
def tearDownClass(cls):
logging.info("Finalising DOGE Test")
super(Test, cls).tearDownClass()
stopDaemons(cls.doge_daemons)
cls.doge_daemons.clear()
@classmethod
def addCoinSettings(cls, settings, datadir, node_id):
settings["chainclients"]["dogecoin"] = {
"connection_type": "rpc",
"manage_daemon": False,
"rpcport": DOGE_BASE_RPC_PORT + node_id,
"rpcuser": "test" + str(node_id),
"rpcpassword": "test_pass" + str(node_id),
"datadir": os.path.join(datadir, "doge_" + str(node_id)),
"bindir": DOGE_BINDIR,
"use_csv": False,
"use_segwit": False,
"blocks_confirmed": 1,
"min_relay_fee": 0.01, # RECOMMENDED_MIN_TX_FEE
}
offer_id = read_json_api(UI_PORT + 0, "offers/new", offer_json)["offer_id"]
logging.debug(f"offer_id {offer_id}")
wait_for_offers(self.delay_event, 1, 1, offer_id)
@classmethod
def coins_loop(cls):
super(Test, cls).coins_loop()
if cls.pause_chain:
return
ci0 = cls.swap_clients[0].ci(cls.test_coin)
try:
if cls.doge_addr is not None:
ci0.rpc_wallet("generatetoaddress", [1, cls.doge_addr])
except Exception as e:
logging.warning("coins_loop generate {}".format(e))
def callnoderpc(self, method, params=[], wallet=None, node_id=0):
return callnoderpc(
node_id, method, params, wallet, base_rpc_port=DOGE_BASE_RPC_PORT
)
def mineBlock(self, num_blocks: int = 1):
self.callnoderpc("generatetoaddress", [num_blocks, self.doge_addr])
def test_003_cltv(self):
logging.info("---------- Test {} cltv".format(self.test_coin.name))
ci = self.swap_clients[0].ci(self.test_coin)
self.pause_chain = True
try:
start_height: int = self.callnoderpc("getblockcount")
num_blocks: int = 1351 # consensus.BIP65Height = 1351;
if start_height < num_blocks:
to_mine = num_blocks - start_height
logging.info("Mining %d DOGE blocks to %s", to_mine, self.doge_addr)
ci.rpc("generatetoaddress", [to_mine, self.doge_addr])
# self.check_softfork_active("bip65") # TODO: Re-enable next version
chain_height: int = self.callnoderpc("getblockcount")
script = CScript(
[
chain_height + 3,
OP_CHECKLOCKTIMEVERIFY,
]
)
script_dest = ci.getScriptDest(script)
script_info = ci.rpc_wallet(
"decodescript",
[
script_dest.hex(),
],
)
script_addr = ci.encodeScriptDest(script_dest)
assert script_info["address"] == script_addr
prevout_amount: int = ci.make_int(1.1)
tx = CTransaction()
tx.nVersion = ci.txVersion()
tx.vout.append(ci.txoType()(prevout_amount, script_dest))
tx_hex = tx.serialize().hex()
tx = CTransaction()
tx.nVersion = ci.txVersion()
tx.vout.append(ci.txoType()(ci.make_int(1.1), script_dest))
tx_hex = ToHex(tx)
tx_funded = ci.rpc_wallet("fundrawtransaction", [tx_hex])
utxo_pos = 0 if tx_funded["changepos"] == 1 else 1
tx_signed = ci.rpc_wallet(
"signrawtransactionwithwallet",
[
tx_funded["hex"],
],
)["hex"]
txid = ci.rpc(
"sendrawtransaction",
[
tx_signed,
],
)
addr_out = ci.rpc_wallet(
"getnewaddress",
[
"cltv test",
],
)
pkh = ci.decodeAddress(addr_out)
script_out = ci.getScriptForPubkeyHash(pkh)
tx_spend = CTransaction()
tx_spend.nVersion = ci.txVersion()
tx_spend.nLockTime = chain_height + 3
tx_spend.vin.append(
CTxIn(
COutPoint(int(txid, 16), utxo_pos),
scriptSig=CScript(
[
script,
]
),
)
)
tx_spend.vout.append(ci.txoType()(ci.make_int(1.099), script_out))
tx_spend_hex = ToHex(tx_spend)
tx_spend.nLockTime = chain_height + 2
tx_spend_invalid_hex = ToHex(tx_spend)
for tx_hex in [tx_spend_invalid_hex, tx_spend_hex]:
try:
txid = self.callnoderpc(
"sendrawtransaction",
[
tx_hex,
],
)
except Exception as e:
assert "non-final" in str(
e
) or "Locktime requirement not satisfied" in str(e)
else:
assert False, "Should fail"
self.mineBlock(5)
txid = ci.rpc(
"sendrawtransaction",
[
tx_spend_hex,
],
)
self.mineBlock()
ci.rpc("syncwithvalidationinterfacequeue")
# Ensure tx was mined
tx_wallet = ci.rpc_wallet(
"gettransaction",
[
txid,
],
)
assert len(tx_wallet["blockhash"]) == 64
finally:
self.pause_chain = False
def test_010_txn_size(self):
logging.info("---------- Test {} txn size".format(self.test_coin.name))
swap_clients = self.swap_clients
ci = swap_clients[0].ci(self.test_coin)
amount: int = ci.make_int(random.uniform(0.1, 2.0), r=1)
# fee_rate is in sats/kvB
fee_rate: int = 1000000
# Test chain b (no-script) lock tx size
v = ci.getNewSecretKey()
s = ci.getNewSecretKey()
S = ci.getPubkey(s)
lock_tx_b_txid = ci.publishBLockTx(v, S, amount, fee_rate)
test_delay_event.wait(1)
addr_out = ci.getNewAddress(False)
lock_tx_b_spend_txid = ci.spendBLockTx(
lock_tx_b_txid, addr_out, v, s, amount, fee_rate, 0
)
test_delay_event.wait(1)
lock_tx_b_spend = ci.getWalletTransaction(lock_tx_b_spend_txid)
if lock_tx_b_spend is None:
lock_tx_b_spend = ci.getTransaction(lock_tx_b_spend_txid)
assert lock_tx_b_spend is not None
tx_obj = ci.loadTx(lock_tx_b_spend)
tx_out_value: int = tx_obj.vout[0].nValue
fee_paid = amount - tx_out_value
actual_size = len(lock_tx_b_spend)
expect_size: int = ci.xmr_swap_b_lock_spend_tx_vsize()
fee_expect = round(fee_rate * expect_size / 1000)
assert fee_expect == fee_paid
assert expect_size >= actual_size
assert expect_size - actual_size < 10
def test_01_a_full_swap(self):
self.do_test_01_full_swap(self.test_coin_from, self.test_coin_to)
def test_01_b_full_swap_reverse(self):
self.prepare_balance(self.test_coin_to, 100.0, 1800, 1801)
self.do_test_01_full_swap(self.test_coin_to, self.test_coin_from)
def test_01_c_full_swap_to_part(self):
self.do_test_01_full_swap(self.test_coin, Coins.PART)
def test_01_d_full_swap_from_part(self):
self.do_test_01_full_swap(Coins.PART, self.test_coin)
def test_02_a_leader_recover_a_lock_tx(self):
self.do_test_02_leader_recover_a_lock_tx(self.test_coin_from, self.test_coin_to)
def test_02_b_leader_recover_a_lock_tx_reverse(self):
self.prepare_balance(self.test_coin_to, 100.0, 1800, 1801)
self.do_test_02_leader_recover_a_lock_tx(self.test_coin_to, self.test_coin_from)
def test_03_a_follower_recover_a_lock_tx(self):
self.do_test_03_follower_recover_a_lock_tx(
self.test_coin_from, self.test_coin_to
)
def test_03_b_follower_recover_a_lock_tx_reverse(self):
self.prepare_balance(self.test_coin_to, 100.0, 1800, 1801)
self.do_test_03_follower_recover_a_lock_tx(
self.test_coin_to, self.test_coin_from
)
def test_03_e_follower_recover_a_lock_tx_mercy_release(self):
self.do_test_03_follower_recover_a_lock_tx(
self.test_coin_from, self.test_coin_to, with_mercy=True
)
def test_03_f_follower_recover_a_lock_tx_mercy_release_reverse(self):
self.prepare_balance(self.test_coin_to, 100.0, 1800, 1801)
self.prepare_balance(self.test_coin_from, 100.0, 1801, 1800)
self.do_test_03_follower_recover_a_lock_tx(
self.test_coin_to, self.test_coin_from, with_mercy=True
)
def test_04_a_follower_recover_b_lock_tx(self):
self.do_test_04_follower_recover_b_lock_tx(
self.test_coin_from, self.test_coin_to
)
def test_04_b_follower_recover_b_lock_tx_reverse(self):
self.prepare_balance(self.test_coin_to, 100.0, 1800, 1801)
self.do_test_04_follower_recover_b_lock_tx(
self.test_coin_to, self.test_coin_from
)
def test_05_self_bid(self):
self.do_test_05_self_bid(self.test_coin_from, self.test_coin_to)
if __name__ == "__main__":

View file

@ -0,0 +1,132 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2024 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
"""
export RESET_TEST=true
export TEST_PATH=/tmp/test_doge
mkdir -p ${TEST_PATH}/bin
cp -r ~/tmp/basicswap_bin/* ${TEST_PATH}/bin
export PYTHONPATH=$(pwd)
export TEST_COINS_LIST='bitcoin,dogecoin'
python tests/basicswap/extended/test_doge.py
"""
import sys
import logging
import unittest
from tests.basicswap.common import (
wait_for_balance,
)
from tests.basicswap.extended.test_xmr_persistent import (
BaseTestWithPrepare,
UI_PORT,
)
from tests.basicswap.extended.test_scripts import (
wait_for_offers,
)
from tests.basicswap.util import (
read_json_api,
)
logger = logging.getLogger()
logger.level = logging.DEBUG
if not len(logger.handlers):
logger.addHandler(logging.StreamHandler(sys.stdout))
def wait_for_bid(
delay_event, node_id, bid_id, state=None, sent: bool = False, num_tries: int = 40
) -> None:
for i in range(num_tries):
delay_event.wait(3)
if delay_event.is_set():
raise ValueError("Test stopped.")
bid = read_json_api(UI_PORT + node_id, f"bids/{bid_id}")
if "state" not in bid:
continue
if state is None:
return
if bid["state"].lower() == state.lower():
return
raise ValueError("wait_for_bid failed")
def prepare_balance(
delay_event,
node_id,
node_id_take_from,
coin_ticker,
amount,
wait_for: int = 20,
test_balance: bool = True,
) -> None:
print(f"prepare_balance on node {node_id}, {coin_ticker}: {amount}")
balance_type: str = "balance"
address_type: str = "deposit_address"
js_w = read_json_api(UI_PORT + node_id, "wallets")
current_balance: float = float(js_w[coin_ticker][balance_type])
if test_balance and current_balance >= amount:
return
post_json = {
"value": amount,
"address": js_w[coin_ticker][address_type],
"subfee": False,
}
json_rv = read_json_api(
UI_PORT + node_id_take_from,
"wallets/{}/withdraw".format(coin_ticker.lower()),
post_json,
)
assert len(json_rv["txid"]) == 64
wait_for_amount: float = amount
if not test_balance:
wait_for_amount += current_balance
wait_for_balance(
delay_event,
f"http://127.0.0.1:{UI_PORT + node_id}/json/wallets/{coin_ticker.lower()}",
balance_type,
wait_for_amount,
iterations=wait_for,
)
class DOGETest(BaseTestWithPrepare):
def test_a(self):
amount_from = 10.0
offer_json = {
"coin_from": "btc",
"coin_to": "doge",
"amt_from": amount_from,
"amt_to": 100.0,
"amt_var": True,
"lockseconds": 3600,
"automation_strat_id": 1,
}
offer_id = read_json_api(UI_PORT + 0, "offers/new", offer_json)["offer_id"]
logging.debug(f"offer_id {offer_id}")
prepare_balance(self.delay_event, 1, 0, "DOGE", 1000.0)
wait_for_offers(self.delay_event, 1, 1, offer_id)
post_json = {"offer_id": offer_id, "amount_from": amount_from}
bid_id = read_json_api(UI_PORT + 1, "bids/new", post_json)["bid_id"]
wait_for_bid(self.delay_event, 0, bid_id, "completed", num_tries=240)
wait_for_bid(self.delay_event, 1, bid_id, "completed")
if __name__ == "__main__":
unittest.main()

View file

@ -147,6 +147,8 @@ def wait_for_bids(delay_event, node_id, num_bids, offer_id=None) -> None:
logging.info(f"Waiting for {num_bids} bids on node {node_id}")
for i in range(20):
delay_event.wait(1)
if delay_event.is_set():
raise ValueError("Test stopped.")
if offer_id is not None:
bids = read_json_api(UI_PORT + node_id, "bids", {"offer_id": offer_id})
else:

View file

@ -235,7 +235,13 @@ def start_processes(self):
for i in range(NUM_NODES):
self.processes.append(
multiprocessing.Process(target=run_thread, args=(self, i,))
multiprocessing.Process(
target=run_thread,
args=(
self,
i,
),
)
)
self.processes[-1].start()
@ -287,9 +293,7 @@ def start_processes(self):
num_blocks: int = 431
have_blocks: int = callltcrpc(0, "getblockcount")
if have_blocks < 500:
logging.info(
"Mining %d Litecoin blocks to %s", num_blocks, self.ltc_addr
)
logging.info("Mining %d Litecoin blocks to %s", num_blocks, self.ltc_addr)
callltcrpc(
0,
"generatetoaddress",
@ -340,9 +344,7 @@ def start_processes(self):
],
)
self.update_thread_dcr = threading.Thread(
target=updateThreadDCR, args=(self,)
)
self.update_thread_dcr = threading.Thread(target=updateThreadDCR, args=(self,))
self.update_thread_dcr.start()
if "bitcoincash" in TEST_COINS_LIST:
@ -365,20 +367,18 @@ def start_processes(self):
)
if "dogecoin" in TEST_COINS_LIST:
self.doge_addr = calldogerpc(
0, "getnewaddress", ["mining_addr"])
self.doge_addr = calldogerpc(0, "getnewaddress", ["mining_addr"])
num_blocks: int = 200
have_blocks: int = calldogerpc(0, "getblockcount")
if have_blocks < num_blocks:
logging.info(
"Mining %d Dogecoin blocks to %s",
num_blocks - have_blocks,
self.bch_addr,
self.doge_addr,
)
calldogerpc(
0,
"generatetoaddress",
[num_blocks - have_blocks, self.doge_addr])
0, "generatetoaddress", [num_blocks - have_blocks, self.doge_addr]
)
if RESET_TEST:
# Lower output split threshold for more stakeable outputs
@ -411,6 +411,7 @@ def start_processes(self):
logging.info("PART blocks: %d", callpartrpc(0, "getblockcount"))
assert particl_blocks >= num_blocks
class BaseTestWithPrepare(unittest.TestCase):
__test__ = False
@ -492,7 +493,6 @@ class Test(BaseTestWithPrepare):
def test_persistent(self):
if self.run_test_persistent is False:
return
logging.info("[rm] Test::test_persistent")
while not self.delay_event.is_set():
logging.info("Looping indefinitely, ctrl+c to exit.")

View file

@ -377,6 +377,9 @@ class TestFunctions(BaseTest):
id_offerer: int = self.node_a_id
id_bidder: int = self.node_b_id
abandon_all_swaps(test_delay_event, self.swap_clients[id_offerer])
abandon_all_swaps(test_delay_event, self.swap_clients[id_bidder])
swap_clients = self.swap_clients
reverse_bid: bool = swap_clients[0].is_reverse_ads_bid(coin_from, coin_to)
ci_from = swap_clients[id_offerer].ci(coin_from)