basicswap/tests/basicswap/test_ltc_xmr.py
2024-11-15 17:21:33 +02:00

342 lines
10 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2021-2024 tecnovert
# Copyright (c) 2024 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import random
import logging
import unittest
from basicswap.basicswap import (
Coins,
SwapTypes,
BidStates,
)
from basicswap.util import (
COIN,
)
from tests.basicswap.util import (
read_json_api,
)
from tests.basicswap.common import (
wait_for_bid,
wait_for_offer,
wait_for_in_progress,
TEST_HTTP_PORT,
LTC_BASE_RPC_PORT,
)
from .test_btc_xmr import BasicSwapTest, test_delay_event
from .test_xmr import pause_event
# Handle on the root logger: logging.getLogger() called without a name
# returns the root logger.
logger = logging.getLogger()
class TestLTC(BasicSwapTest):
    """Litecoin test case built on ``BasicSwapTest``.

    Defines the LTC-specific helpers (block mining, softfork check) and the
    LTC tests in this file: bech32 addresses, HD wallet seeding, a BTC to LTC
    swap and MWEB wallet handling.
    """

    # NOTE(review): presumably the base class opts out of test collection and
    # this re-enables it for the LTC subclass -- confirm against BasicSwapTest.
    __test__ = True
    # Coin under test; read by every test below through self.test_coin_from.
    test_coin_from = Coins.LTC
    # Not read inside this class; presumably consumed by the inherited
    # setup code -- confirm against BasicSwapTest.
    start_ltc_nodes = True
    base_rpc_port = LTC_BASE_RPC_PORT

    def mineBlock(self, num_blocks=1):
        """Mine ``num_blocks`` blocks with ``generatetoaddress``.

        The block rewards are paid to ``self.ltc_addr``.
        """
        self.callnoderpc("generatetoaddress", [num_blocks, self.ltc_addr])

    def check_softfork_active(self, feature_name):
        """Assert that ``getblockchaininfo`` reports ``feature_name`` as active.

        Raises:
            AssertionError: if the softfork's ``active`` flag is not ``True``.
            KeyError: if ``feature_name`` is missing from ``softforks``.
        """
        chain_info = self.callnoderpc("getblockchaininfo")
        assert chain_info["softforks"][feature_name]["active"] is True

    def test_001_nested_segwit(self):
        # Only logs that the check is skipped for this coin.
        logging.info(
            "---------- Test {} p2sh nested segwit".format(self.test_coin_from.name)
        )
        logging.info("Skipped")

    def test_002_native_segwit(self):
        # Fund a fresh bech32 address, mine the payment, then spend that output
        # back to the same address and check that signing leaves the txid
        # unchanged.
        logging.info(
            "---------- Test {} p2sh native segwit".format(self.test_coin_from.name)
        )
        ci = self.swap_clients[0].ci(self.test_coin_from)

        addr_segwit = ci.rpc_wallet("getnewaddress", ["segwit test", "bech32"])
        addr_info = ci.rpc_wallet("getaddressinfo", [addr_segwit])
        assert addr_info["iswitness"] is True

        txid = ci.rpc_wallet("sendtoaddress", [addr_segwit, 1.0])
        assert len(txid) == 64

        # Decode the funding tx before mining so the output index paying
        # addr_segwit can be located below.
        tx_wallet = ci.rpc_wallet("gettransaction", [txid])["hex"]
        tx = ci.rpc("decoderawtransaction", [tx_wallet])

        self.mineBlock()
        ro = ci.rpc("scantxoutset", ["start", ["addr({})".format(addr_segwit)]])
        assert len(ro["unspents"]) == 1
        assert ro["unspents"][0]["txid"] == txid

        prevout_n = -1
        for txo in tx["vout"]:
            if addr_segwit in txo["scriptPubKey"]["addresses"]:
                prevout_n = txo["n"]
                break
        assert prevout_n > -1

        tx_funded = ci.rpc(
            "createrawtransaction",
            [[{"txid": txid, "vout": prevout_n}], {addr_segwit: 0.99}],
        )
        tx_signed = ci.rpc_wallet("signrawtransactionwithwallet", [tx_funded])["hex"]

        # The unsigned and signed transactions must share a txid.
        tx_funded_decoded = ci.rpc("decoderawtransaction", [tx_funded])
        tx_signed_decoded = ci.rpc("decoderawtransaction", [tx_signed])
        assert tx_funded_decoded["txid"] == tx_signed_decoded["txid"]

    def test_007_hdwallet(self):
        # Seed a throwaway wallet with a fixed HD seed and check that the first
        # address it hands out is the expected one.
        logging.info("---------- Test {} hdwallet".format(self.test_coin_from.name))

        test_seed = "8e54a313e6df8918df6d758fafdbf127a115175fdd2238d0e908dd8093c9ac3b"
        ci = self.swap_clients[0].ci(self.test_coin_from)
        test_wif = ci.encodeKey(bytes.fromhex(test_seed))

        # Random name so repeated runs against the same node do not collide.
        new_wallet_name = random.randbytes(10).hex()
        self.callnoderpc("createwallet", [new_wallet_name])
        try:
            self.callnoderpc("sethdseed", [True, test_wif], wallet=new_wallet_name)
            addr = self.callnoderpc("getnewaddress", wallet=new_wallet_name)
        finally:
            # Always unload the temporary wallet, even when an RPC above
            # fails, so it does not stay loaded for later tests.
            self.callnoderpc("unloadwallet", [new_wallet_name])
        assert addr == "rltc1qps7hnjd866e9ynxadgseprkc2l56m00djr82la"

    def test_20_btc_coin(self):
        # Run a full SELLER_FIRST swap from BTC to the coin under test and
        # check that both nodes end up with nothing in progress.
        logging.info("---------- Test BTC to {}".format(self.test_coin_from.name))
        swap_clients = self.swap_clients

        offer_id = swap_clients[0].postOffer(
            Coins.BTC,
            self.test_coin_from,
            100 * COIN,
            0.1 * COIN,
            100 * COIN,
            SwapTypes.SELLER_FIRST,
        )

        wait_for_offer(test_delay_event, swap_clients[1], offer_id)
        offer = swap_clients[1].getOffer(offer_id)
        bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)

        wait_for_bid(test_delay_event, swap_clients[0], bid_id)
        swap_clients[0].acceptBid(bid_id)

        wait_for_in_progress(test_delay_event, swap_clients[1], bid_id, sent=True)

        wait_for_bid(
            test_delay_event,
            swap_clients[0],
            bid_id,
            BidStates.SWAP_COMPLETED,
            wait_for=60,
        )
        wait_for_bid(
            test_delay_event,
            swap_clients[1],
            bid_id,
            BidStates.SWAP_COMPLETED,
            sent=True,
            wait_for=60,
        )

        # Address the two nodes through TEST_HTTP_PORT offsets, as the other
        # tests in this class do, instead of hard-coded port numbers.
        js_0 = read_json_api(TEST_HTTP_PORT + 0)
        js_1 = read_json_api(TEST_HTTP_PORT + 1)
        assert js_0["num_swapping"] == 0 and js_0["num_watched_outputs"] == 0
        assert js_1["num_swapping"] == 0 and js_1["num_watched_outputs"] == 0

    def test_21_mweb(self):
        # Exercise MWEB addresses: creation, sending to them, unconfirmed
        # balance reporting and exclusion from getUnspentsByAddr.
        logging.info("---------- Test MWEB {}".format(self.test_coin_from.name))
        swap_clients = self.swap_clients

        ci0 = swap_clients[0].ci(self.test_coin_from)
        ci1 = swap_clients[1].ci(self.test_coin_from)

        mweb_addr_0 = ci0.rpc_wallet("getnewaddress", ["mweb addr test 0", "mweb"])
        mweb_addr_1 = ci1.rpc_wallet("getnewaddress", ["mweb addr test 1", "mweb"])

        addr_info0 = ci0.rpc_wallet("getaddressinfo", [mweb_addr_0])
        assert addr_info0["ismweb"] is True

        addr_info1 = ci1.rpc_wallet("getaddressinfo", [mweb_addr_1])
        assert addr_info1["ismweb"] is True

        # Sending to an own MWEB address should leave the trusted balance
        # almost unchanged (difference below 0.1).
        trusted_before = ci0.rpc_wallet("getbalances")["mine"]["trusted"]
        ci0.rpc_wallet("sendtoaddress", [mweb_addr_0, 10.0])
        assert (
            trusted_before - float(ci0.rpc_wallet("getbalances")["mine"]["trusted"])
            < 0.1
        )

        # Bound before the try block so the name exists on every path.
        found_unconfirmed: bool = False
        try:
            pause_event.clear()  # Stop mining
            ci0.rpc_wallet("sendtoaddress", [mweb_addr_1, 10.0])

            # Poll node 1 for up to 20 rounds until it reports the payment as
            # unconfirmed.
            for _ in range(20):
                test_delay_event.wait(1)
                ltc_wallet = read_json_api(TEST_HTTP_PORT + 1, "wallets/ltc")
                if float(ltc_wallet["unconfirmed"]) == 10.0:
                    found_unconfirmed = True
                    break
        finally:
            # Set the event again whatever happened above.
            pause_event.set()
        assert found_unconfirmed

        self.mineBlock()

        ci0.rpc_wallet("sendtoaddress", [mweb_addr_1, 10.0])

        self.mineBlock()

        # The wallet's own listunspent must show an output on the MWEB address.
        utxos = ci1.rpc_wallet("listunspent")
        mweb_tx = None
        for utxo in utxos:
            if utxo.get("address", "") == mweb_addr_1:
                mweb_tx = utxo
        assert mweb_tx is not None

        unspent_addr = ci1.getUnspentsByAddr()
        assert len(unspent_addr) > 0
        # Only the addresses (keys) matter for this check.
        for addr in unspent_addr:
            if "mweb1" in addr:
                raise ValueError("getUnspentsByAddr should exclude mweb UTXOs.")

        # TODO

    def test_22_mweb_balance(self):
        # Check MWEB deposit addresses and balances through the HTTP API,
        # including a withdrawal funded from the MWEB balance.
        logging.info("---------- Test MWEB balance {}".format(self.test_coin_from.name))
        swap_clients = self.swap_clients

        ci_mweb = swap_clients[0].ci(Coins.LTC_MWEB)

        def is_mweb(address):
            # "ismweb" flag reported by the wallet's getaddressinfo RPC.
            return ci_mweb.rpc_wallet("getaddressinfo", [address])["ismweb"]

        mweb_addr_0 = ci_mweb.getNewAddress()
        assert is_mweb(mweb_addr_0) is True

        ltc_addr = read_json_api(TEST_HTTP_PORT + 0, "wallets/ltc/nextdepositaddr")
        ltc_mweb_addr = read_json_api(
            TEST_HTTP_PORT + 0, "wallets/ltc_mweb/nextdepositaddr"
        )
        ltc_mweb_addr2 = read_json_api(TEST_HTTP_PORT + 0, "wallets/ltc/newmwebaddress")

        assert is_mweb(ltc_addr) is False
        assert is_mweb(ltc_mweb_addr) is True
        assert is_mweb(ltc_mweb_addr2) is True

        post_json = {
            "value": 10,
            "address": ltc_mweb_addr,
            "subfee": False,
        }
        json_rv = read_json_api(TEST_HTTP_PORT + 0, "wallets/ltc/withdraw", post_json)
        assert len(json_rv["txid"]) == 64

        self.mineBlock()

        # NOTE(review): post_json is also passed to the wallet info queries
        # below; its fields look unused there -- confirm against the API.
        json_rv = read_json_api(TEST_HTTP_PORT + 0, "wallets/ltc", post_json)
        assert json_rv["mweb_balance"] == 10.0
        mweb_address = json_rv["mweb_address"]

        post_json = {
            "value": 11,
            "address": mweb_address,
            "subfee": False,
        }
        json_rv = read_json_api(TEST_HTTP_PORT + 0, "wallets/ltc/withdraw", post_json)
        assert len(json_rv["txid"]) == 64

        self.mineBlock()

        json_rv = read_json_api(TEST_HTTP_PORT + 0, "wallets/ltc_mweb", post_json)
        assert json_rv["mweb_balance"] == 21.0
        assert json_rv["mweb_address"] == mweb_address
        ltc_address = json_rv["deposit_address"]

        # Check that spending the mweb balance takes from the correct wallet
        post_json = {
            "value": 1,
            "address": ltc_address,
            "subfee": False,
            "type_from": "mweb",
        }
        json_rv = read_json_api(TEST_HTTP_PORT + 0, "wallets/ltc/withdraw", post_json)
        assert len(json_rv["txid"]) == 64

        json_rv = read_json_api(TEST_HTTP_PORT + 0, "wallets/ltc", post_json)
        assert json_rv["mweb_balance"] <= 20.0
# Run this module's tests through unittest when the file is executed as a
# script.
if __name__ == "__main__":
    unittest.main()