basicswap/tests/basicswap/test_other.py
2024-11-15 17:21:33 +02:00

561 lines
20 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019-2024 tecnovert
# Copyright (c) 2024 The Basicswap developers
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import hashlib
import random
import secrets
import unittest
import basicswap.contrib.ed25519_fast as edf
import basicswap.ed25519_fast_util as edu
from coincurve.ed25519 import ed25519_get_pubkey
from coincurve.ecdsaotves import (
ecdsaotves_enc_sign,
ecdsaotves_enc_verify,
ecdsaotves_dec_sig,
ecdsaotves_rec_enc_key,
)
from coincurve.keys import PrivateKey
from basicswap.contrib.mnemonic import Mnemonic
from basicswap.util import i2b, h2b
from basicswap.util.address import decodeAddress
from basicswap.util.crypto import ripemd160, hash160, blake256
from basicswap.util.extkey import ExtKeyPair
from basicswap.util.integer import encode_varint, decode_varint
from basicswap.util.network import is_private_ip_address
from basicswap.util.rfc2440 import rfc2440_hash_password
from basicswap.util_xmr import encode_address as xmr_encode_address
from basicswap.interface.btc import BTCInterface
from basicswap.interface.xmr import XMRInterface
from tests.basicswap.mnemonics import mnemonics
from tests.basicswap.util import REQUIRED_SETTINGS
from basicswap.basicswap_util import TxLockTypes
from basicswap.util import (
make_int,
SerialiseNum,
format_amount,
DeserialiseNum,
validate_amount,
)
from basicswap.messages_npb import (
BidMessage,
)
from basicswap.contrib.test_framework.script import hash160 as hash160_btc
class Test(unittest.TestCase):
def test_serialise_num(self):
    """Round-trip integers through SerialiseNum and DeserialiseNum.

    For vectors that list an expected size, the length of the serialised
    form is checked as well.
    """
    # (value, expected serialised length or None when the length is unchecked)
    vectors = (
        (0, 1),
        (1, 1),
        (16, 1),
        (-1, 2),
        (17, 2),
        (500, None),
        (-500, None),
        (4194642, None),
    )
    for value, expected_len in vectors:
        encoded = SerialiseNum(value)
        if expected_len is not None:
            assert len(encoded) == expected_len
        assert value == DeserialiseNum(encoded)
def test_sequence(self):
    """Encode lock values as sequences and check what decodes back out."""
    settings = {"rpcport": 0, "rpcauth": "none"}
    settings.update(REQUIRED_SETTINGS)
    ci = BTCInterface(settings, "regtest")

    # Time locks: the decoded value must be at least the requested value
    # and no more than 512 above it.
    for requested in (48 * 60 * 60, 24 * 60):
        sequence = ci.getExpectedSequence(TxLockTypes.SEQUENCE_LOCK_TIME, requested)
        lock_value = ci.decodeSequence(sequence)
        assert lock_value >= requested
        assert lock_value <= requested + 512

    # Block locks must decode to exactly the requested block count.
    requested_blocks = 123
    sequence = ci.getExpectedSequence(
        TxLockTypes.SEQUENCE_LOCK_BLOCKS, requested_blocks
    )
    assert ci.decodeSequence(sequence) == requested_blocks
def test_make_int(self):
    """Check make_int, format_amount and validate_amount with 8 decimals.

    Each vector is parsed from both its string and numeric form, formatted
    back with 8 decimal places and compared with the original string.
    Over-long mantissas must be rejected unless a rounding mode is given.
    """

    def check(vs, vf, expect_int):
        # The string and the numeric form must yield the same integer.
        parsed = make_int(vs)
        assert parsed == expect_int and isinstance(parsed, int)
        parsed = make_int(vf)
        assert parsed == expect_int and isinstance(parsed, int)

        vs_out = format_amount(parsed, 8)
        # Strip up to 7 trailing zeros so one digit stays after the point.
        for _ in range(7):
            if vs_out[-1] == "0":
                vs_out = vs_out[:-1]
        if "." in vs:
            assert vs_out == vs
        else:
            # Integer inputs come back as "<n>.0"; drop the ".0" to compare.
            assert vs_out[:-2] == vs

    vectors = (
        ("0", 0, 0),
        ("1", 1, 100000000),
        ("10", 10, 1000000000),
        ("0.00899999", 0.00899999, 899999),
        ("899999.0", 899999.0, 89999900000000),
        ("899999.00899999", 899999.00899999, 89999900899999),
        ("0.0", 0.0, 0),
        ("1.0", 1.0, 100000000),
        ("1.1", 1.1, 110000000),
        ("1.2", 1.2, 120000000),
        ("0.00899991", 0.00899991, 899991),
        ("0.0089999", 0.0089999, 899990),
        ("0.0089991", 0.0089991, 899910),
        ("0.123", 0.123, 12300000),
        ("123000.000123", 123000.000123, 12300000012300),
    )
    for vs, vf, expect_int in vectors:
        check(vs, vf, expect_int)

    # assertRaises reports a clear failure when nothing is raised; the old
    # "assert False" inside try/except Exception was caught by its own
    # handler and produced a misleading message comparison instead.
    with self.assertRaises(Exception) as ctx:
        make_int("0.123456789")
    assert str(ctx.exception) == "Mantissa too long"

    validate_amount("0.12345678")

    # floor
    assert make_int("0.123456789", r=-1) == 12345678
    # Round up
    assert make_int("0.123456789", r=1) == 12345679
def test_make_int12(self):
    """Check make_int, format_amount and validate_amount with 12 decimals.

    Mirrors the 8-decimal test: values are parsed from string and float
    form, formatted back with 12 decimal places and compared. Inputs with
    more than 12 decimal places must be rejected.
    """

    def check(vs, vf, expect_int):
        # The string and the numeric form must yield the same integer.
        parsed = make_int(vs, 12)
        assert parsed == expect_int and isinstance(parsed, int)
        parsed = make_int(vf, 12)
        assert parsed == expect_int and isinstance(parsed, int)

        vs_out = format_amount(parsed, 12)
        # Strip up to 11 trailing zeros so one digit stays after the point.
        # (The count was previously 7, copied from the 8-decimal test,
        # which would leave zeros behind for short 12-decimal values.)
        for _ in range(11):
            if vs_out[-1] == "0":
                vs_out = vs_out[:-1]
        if "." in vs:
            assert vs_out == vs
        else:
            # Integer inputs come back as "<n>.0"; drop the ".0" to compare.
            assert vs_out[:-2] == vs

    check("0.123456789", 0.123456789, 123456789000)
    check("0.123456789123", 0.123456789123, 123456789123)

    # assertRaises fails clearly if no exception is raised, unlike the old
    # "assert False" that was swallowed by the surrounding except clause.
    with self.assertRaises(Exception) as ctx:
        make_int("0.1234567891234", 12)
    assert str(ctx.exception) == "Mantissa too long"

    validate_amount("0.123456789123", 12)

    # Both the string and the float form with 13 decimals must be rejected.
    for too_precise in ("0.1234567891234", 0.1234567891234):
        with self.assertRaises(Exception) as ctx:
            validate_amount(too_precise, 12)
        assert "Too many decimal places" in str(ctx.exception)
def test_ed25519(self):
    """Compare the ed25519_fast public key with coincurve's for one secret."""
    secret = edu.get_secret()
    # Public key from the bundled ed25519_fast helpers.
    local_pubkey = edu.encodepoint(edf.scalarmult_B(secret))
    # Public key from coincurve for the same secret, passed as bytes.
    coincurve_pubkey = ed25519_get_pubkey(i2b(secret))
    assert local_pubkey == coincurve_pubkey
def test_ecdsa_otves(self):
    """Run the ecdsaotves encrypted-signature flow end to end.

    Steps: encrypt-sign, verify the ciphertext, decrypt it to a signature,
    verify that signature, then recover the encryption key from it.
    """
    settings = {"rpcport": 0, "rpcauth": "none"}
    settings.update(REQUIRED_SETTINGS)
    ci = BTCInterface(settings, "regtest")

    signing_key = ci.getNewSecretKey()
    encryption_key = ci.getNewSecretKey()
    signing_pubkey = ci.getPubkey(signing_key)
    encryption_pubkey = ci.getPubkey(encryption_key)
    digest = secrets.token_bytes(32)

    ciphertext = ecdsaotves_enc_sign(signing_key, encryption_pubkey, digest)
    assert ecdsaotves_enc_verify(
        signing_pubkey, encryption_pubkey, digest, ciphertext
    )

    signature = ecdsaotves_dec_sig(encryption_key, ciphertext)
    assert ci.verifySig(signing_pubkey, digest, signature)

    # The decrypted signature plus the ciphertext must reveal the secret key.
    recovered = ecdsaotves_rec_enc_key(encryption_pubkey, ciphertext, signature)
    assert encryption_key == recovered
def test_sign(self):
    """Sign a message with coincurve and verify it through the interface."""
    coin_settings = {"rpcport": 0, "rpcauth": "none"}
    coin_settings.update(REQUIRED_SETTINGS)
    ci = BTCInterface(coin_settings, "regtest")

    vk = ci.getNewSecretKey()
    pk = ci.getPubkey(vk)

    message = "test signing message"
    message_hash = hashlib.sha256(bytes(message, "utf-8")).digest()

    # sign() is given the raw message while verifySig() gets its SHA-256
    # digest; NOTE(review): relies on coincurve hashing with SHA-256 by
    # default when signing - confirm against the coincurve docs.
    eck = PrivateKey(vk)
    sig = eck.sign(message.encode("utf-8"))

    # The verification result was previously discarded, so this test could
    # never fail on a bad signature; assert it as test_ecdsa_otves does.
    assert ci.verifySig(pk, message_hash, sig)
def test_sign_compact(self):
    """Check compact signatures: 64 bytes long, verifiable and repeatable."""
    settings = {"rpcport": 0, "rpcauth": "none"}
    settings.update(REQUIRED_SETTINGS)
    ci = BTCInterface(settings, "regtest")

    secret_key = ci.getNewSecretKey()
    public_key = ci.getPubkey(secret_key)

    first_sig = ci.signCompact(secret_key, "test signing message")
    assert len(first_sig) == 64

    # Return value is not inspected here; presumably verifyCompactSig
    # raises on an invalid signature - TODO confirm.
    ci.verifyCompactSig(public_key, "test signing message", first_sig)

    # Nonce is set deterministically (using default libsecp256k1 method rfc6979)
    second_sig = ci.signCompact(secret_key, "test signing message")
    assert first_sig == second_sig
def test_sign_recoverable(self):
    """Check recoverable signatures: 65 bytes, recover the signer, repeatable."""
    settings = {"rpcport": 0, "rpcauth": "none"}
    settings.update(REQUIRED_SETTINGS)
    ci = BTCInterface(settings, "regtest")

    secret_key = ci.getNewSecretKey()
    public_key = ci.getPubkey(secret_key)

    first_sig = ci.signRecoverable(secret_key, "test signing message")
    assert len(first_sig) == 65

    # The public key recovered from the signature must be the signer's key.
    recovered_key = ci.verifySigAndRecover(first_sig, "test signing message")
    assert public_key == recovered_key

    # Nonce is set deterministically (using default libsecp256k1 method rfc6979)
    second_sig = ci.signRecoverable(secret_key, "test signing message")
    assert first_sig == second_sig
def test_pubkey_to_address(self):
    """Convert a fixed public key to its expected regtest address."""
    settings = {"rpcport": 0, "rpcauth": "none"}
    settings.update(REQUIRED_SETTINGS)
    ci = BTCInterface(settings, "regtest")

    pubkey = h2b("02c26a344e7d21bcc6f291532679559f2fd234c881271ff98714855edc753763a6")
    assert ci.pubkey_to_address(pubkey) == "mj6SdSxmWRmdDqR5R3FfZmRiLmQfQAsLE8"
def test_dleag(self):
    """Create a DLEAG proof for a new secret key and verify it."""
    settings = {"rpcport": 0, "walletrpcport": 0, "walletrpcauth": "none"}
    settings.update(REQUIRED_SETTINGS)
    ci = XMRInterface(settings, "regtest")

    secret_key = ci.getNewSecretKey()
    dleag_proof = ci.proveDLEAG(secret_key)
    assert ci.verifyDLEAG(dleag_proof)
def test_rate(self):
    """Check that exchange rates reproduce the intended swap amounts.

    The first part uses fixed amounts converted between 8 and 12 decimal
    place scales. The second part draws 10000 random amount pairs over the
    BTC and XMR interfaces and checks that an integer rate with 24 decimal
    places, bumped by one when the result is off by exactly one unit,
    reproduces each amount exactly in both directions.

    Raises:
        ValueError: if a random pair cannot be reproduced from its rate.
    """
    scale_from = 8
    scale_to = 12
    amount_from = make_int(100, scale_from)
    rate = make_int(0.1, scale_to)
    amount_to = int((amount_from * rate) // (10**scale_from))
    assert "100.00000000" == format_amount(amount_from, scale_from)
    assert "10.000000000000" == format_amount(amount_to, scale_to)
    rate_check = make_int((amount_to / amount_from), scale_from)
    assert rate == rate_check

    scale_from = 12
    scale_to = 8
    amount_from = make_int(1, scale_from)
    rate = make_int(12, scale_to)
    amount_to = int((amount_from * rate) // (10**scale_from))
    assert "1.000000000000" == format_amount(amount_from, scale_from)
    assert "12.00000000" == format_amount(amount_to, scale_to)
    rate_check = make_int((amount_to / amount_from), scale_from)
    assert rate == rate_check

    # Rates rounded up (r=1) must recreate the target amount exactly.
    scale_from = 8
    scale_to = 8
    amount_from = make_int(0.073, scale_from)
    amount_to = make_int(10, scale_to)
    rate = make_int(amount_to / amount_from, scale_to, r=1)
    amount_to_recreate = int((amount_from * rate) // (10**scale_from))
    assert "10.00000000" == format_amount(amount_to_recreate, scale_to)

    scale_from = 8
    scale_to = 12
    amount_from = make_int(10.0, scale_from)
    amount_to = make_int(0.06935, scale_to)
    rate = make_int(amount_to / amount_from, scale_from, r=1)
    amount_to_recreate = int((amount_from * rate) // (10**scale_from))
    assert "0.069350000000" == format_amount(amount_to_recreate, scale_to)

    scale_from = 12
    scale_to = 8
    amount_from = make_int(0.06935, scale_from)
    amount_to = make_int(10.0, scale_to)
    rate = make_int(amount_to / amount_from, scale_from, r=1)
    amount_to_recreate = int((amount_from * rate) // (10**scale_from))
    assert "10.00000000" == format_amount(amount_to_recreate, scale_to)

    coin_settings = {
        "rpcport": 0,
        "rpcauth": "none",
        "walletrpcport": 0,
        "walletrpcauth": "none",
    }
    coin_settings.update(REQUIRED_SETTINGS)
    ci_xmr = XMRInterface(coin_settings, "regtest")
    ci_btc = BTCInterface(coin_settings, "regtest")

    # (from, to) interface combinations, indexed by random.randint(0, 3).
    interface_pairs = (
        (ci_btc, ci_xmr),
        (ci_xmr, ci_btc),
        (ci_xmr, ci_xmr),
        (ci_btc, ci_btc),
    )
    # Upper bounds in whole coins, indexed by random.randint(0, 5).
    max_coins = (1, 1000, 2100, 210000, 21000000, 2100000000)
    # Number of decimal places carried by the integer rates below.
    rate_scale = 24

    def random_amount(ci):
        # Pick a magnitude first, then an amount of at least 10000 units.
        upper_coins = max_coins[random.randint(0, 5)]
        return random.randint(10000, upper_coins * ci.COIN())

    for _ in range(10000):
        ci_from, ci_to = interface_pairs[random.randint(0, 3)]
        amount_from = random_amount(ci_from)
        amount_to = random_amount(ci_to)

        # The result of this conversion was always overwritten before use;
        # the call is kept so an exception raised by it still fails the test.
        ci_from.make_int(amount_to / amount_from, r=1)

        # Forward direction: amount_from * offer_rate must give amount_to.
        offer_rate = make_int(amount_to, rate_scale) // amount_from
        amount_to_from_rate: int = int(
            (int(amount_from) * offer_rate) // (10**rate_scale)
        )
        if abs(amount_to - amount_to_from_rate) == 1:
            # Off by exactly one unit: bump the rate and recompute.
            offer_rate += 1
        amount_to_from_rate = int(
            (int(amount_from) * offer_rate) // (10**rate_scale)
        )
        if amount_to != amount_to_from_rate:
            # Only needed for the diagnostic output.
            offer_rate_human_read: int = int(
                offer_rate // (10 ** (rate_scale - ci_from.exp()))
            )
            print("from exp, amount", ci_from.exp(), amount_from)
            print("to exp, amount", ci_to.exp(), amount_to)
            print("offer_rate_human_read", offer_rate_human_read)
            print("amount_to_from_rate", amount_to_from_rate)
            raise ValueError("Bad amount_to")

        # Reverse direction: amount_to * reversed_rate must give amount_from.
        reversed_rate = make_int(amount_from, rate_scale) // amount_to
        amount_from_from_rate: int = int(
            (int(amount_to) * reversed_rate) // (10**rate_scale)
        )
        if abs(amount_from - amount_from_from_rate) == 1:
            reversed_rate += 1
        amount_from_from_rate = int(
            (int(amount_to) * reversed_rate) // (10**rate_scale)
        )
        if amount_from != amount_from_from_rate:
            print("from exp, amount", ci_from.exp(), amount_from)
            print("to exp, amount", ci_to.exp(), amount_to)
            print("amount_from_from_rate", amount_from_from_rate)
            raise ValueError("Bad amount_from")
def test_rfc2440(self):
    """Hash a password with a fixed salt and compare with the known result."""
    fixed_salt = bytes.fromhex("B7A94A7E4988630E")
    expected = "16:B7A94A7E4988630E6095334BA67F06FBA509B2A7136A04C9C1B430F539"
    hashed = rfc2440_hash_password("test", salt=fixed_salt)
    assert hashed == expected
def test_ripemd160(self):
    """Compare ripemd160 of a fixed input with its known digest."""
    digest = ripemd160(b"hash this")
    assert digest.hex() == "d5443a154f167e2c1332f6de72cfb4c6ab9c8c17"
def test_hash160(self):
    """Check both hash160 implementations against the same known digest."""
    # hash160 is RIPEMD(SHA256(data))
    payload = b"hash this"
    expected_hex = "072985b3583a4a71f548494a5e1d5f6b00d0fe13"
    # The project's implementation first, then the test framework's.
    for hasher in (hash160, hash160_btc):
        assert hasher(payload).hex() == expected_hex
def test_protobuf(self):
    """Serialise a BidMessage and decode it fully, partially and invalidly."""
    original = BidMessage()
    original.protocol_version = 2
    original.time_valid = 1024
    wire_bytes = original.to_bytes()

    # Full decode: set fields round-trip, unset fields read as empty values.
    decoded = BidMessage()
    decoded.from_bytes(wire_bytes)
    assert decoded.protocol_version == 2
    assert decoded.time_valid == 1024
    assert decoded.amount == 0
    assert decoded.pkhash_buyer is not None
    assert len(decoded.pkhash_buyer) == 0

    # Decode only the first field
    partial = BidMessage()
    partial.from_bytes(wire_bytes[:2])
    assert partial.protocol_version == 2
    assert partial.time_valid == 0

    # Unknown constructor keywords must be rejected.
    caught = None
    try:
        BidMessage(doesnotexist=1)
    except Exception as e:
        caught = e
    if caught is None:
        raise ValueError("Should have errored.")
    assert "unexpected keyword argument" in str(caught)
def test_is_private_ip_address(self):
    """Check is_private_ip_address for private and public hosts."""
    # Maps each host to the exact boolean the function must return.
    expectations = {
        "localhost": True,
        "127.0.0.1": True,
        "10.0.0.0": True,
        "172.16.0.0": True,
        "192.168.0.0": True,
        "20.87.245.0": False,
        "particl.io": False,
    }
    for host, expected in expectations.items():
        assert is_private_ip_address(host) is expected
def test_varint(self):
    """Round-trip values through encode_varint and decode_varint."""
    # value -> expected number of encoded bytes
    expected_sizes = {
        0: 1,
        1: 1,
        127: 1,
        128: 2,
        253: 2,
        8321: 2,
        16383: 2,
        16384: 3,
        2097151: 3,
        2097152: 4,
    }
    for value, size in expected_sizes.items():
        encoded = encode_varint(value)
        assert len(encoded) == size
        # decode_varint yields the value together with the bytes consumed.
        assert decode_varint(encoded) == (value, size)
def test_base58(self):
    """Encode XMR-style addresses and check their leading characters."""
    # Two independent random keypairs; only the public halves are encoded.
    pub_v = edu.encodepoint(edf.scalarmult_B(edu.get_secret()))
    pub_s = edu.encodepoint(edf.scalarmult_B(edu.get_secret()))

    # Default prefix.
    default_addr = xmr_encode_address(pub_v, pub_s)
    assert default_addr.startswith("4")

    # Explicit prefix value 4146.
    prefixed_addr = xmr_encode_address(pub_v, pub_s, 4146)
    assert prefixed_addr.startswith("Wo")
def test_blake256(self):
    """Compare blake256 digests of fixed inputs with known values."""
    # (expected hex digest, input bytes)
    known_digests = (
        ("716f6e863f744b9ac22c97ec7b76ea5f5908bc5b2f67c61510bfc4751384ea7a", b""),
        (
            "7576698ee9cad30173080678e5965916adbb11cb5245d386bf1ffda1cb26c9d7",
            b"The quick brown fox jumps over the lazy dog",
        ),
    )
    for expected_hex, message in known_digests:
        digest = blake256(message)
        assert digest.hex() == expected_hex
def test_extkey(self):
    """Decode an extended key and compare derived children with known keys."""
    parent_b58 = "XPARHAr37YxmFP8wyjkaHAQWmp84GiyLikL7EL8j9BCx4LkB8Q1Bw5Kr8sA1GA3Ym53zNLcaxxFHr6u81JVTeCaD61c6fKS1YRAuti8Zu5SzJCjh"
    child0_b58 = "XPARHAt1XMcNYAwP5wEnQXknBAkGSzaetdZt2eoJZehdB4WXfV1xbSjpgHe44AivmumcSejW5KaYx6L5M6MyR1WyXrsWTwaiUEfHq2RrqCfXj3ZW"
    child0_pub_b58 = "PPARTKPL4rp5WLnrYP6jZfuRjx6jrmvbsz5QdHofPfFqJdm918mQwdPLq6Dd9TkdbQeKUqjbHWkyzWe7Pftd7itzm7ETEoUMq4cbG4fY9FKH1YSU"
    child0h_b58 = "XPARHAt1XMcNgWbv48LwoQbjs1bC8kCXKomzvJLRT5xmbQ2GKf9e8Vfr1MMcfiWJC34RyDp5HvAfjeiNyLDfkFm1UrRCrPkVC9GGaAWa3nXMWew8"

    # The first 4 decoded bytes are dropped before the key data is used.
    parent_data = decodeAddress(parent_b58)[4:]
    parent = ExtKeyPair()
    parent.decode(parent_data)
    assert parent.encode_v() == parent_data

    # Child 0 derived from the full key.
    child0 = parent.derive(0)
    child0_data = decodeAddress(child0_b58)[4:]
    assert child0.encode_v() == child0_data

    # Child 0 with bit 31 of the index set (the "h" variant of the key).
    index_0h: int = 1 << 31
    child0h = parent.derive(index_0h)
    child0h_data = decodeAddress(child0h_b58)[4:]
    assert child0h.encode_v() == child0h_data

    # After neuter() the pair reports no key, yet child 0 can still be derived.
    parent.neuter()
    assert parent.has_key() is False
    child0_pub = parent.derive(0)
    child0_pub_data = decodeAddress(child0_pub_b58)[4:]
    assert child0_pub.encode_p() == child0_pub_data
def test_mnemonic(self):
    """Convert the first test mnemonic to entropy and back again."""
    phrase = mnemonics[0]

    entropy: bytes = Mnemonic("english").to_entropy(phrase)
    assert entropy.hex() == "0002207e9b744ea2d7ab41702f31f000"

    # A fresh Mnemonic instance must rebuild the same phrase from the entropy.
    rebuilt: str = Mnemonic("english").to_mnemonic(entropy)
    assert rebuilt == phrase
if __name__ == "__main__":
    # Run every test in this module when the file is executed as a script.
    unittest.main()