Decred sighash and signing.

This commit is contained in:
tecnovert 2024-04-27 16:27:14 +02:00
parent 3db723056f
commit 154c6d6832
8 changed files with 256 additions and 31 deletions

View file

@ -18,7 +18,6 @@ from basicswap.interface import (
Curves)
from basicswap.util import (
ensure,
make_int,
b2h, i2b, b2i, i2h)
from basicswap.util.ecc import (
ep,
@ -763,7 +762,7 @@ class BTCInterface(Secp256k1Interface):
for pi in tx.vin:
ptx = self.rpc('getrawtransaction', [i2h(pi.prevout.hash), True])
prevout = ptx['vout'][pi.prevout.n]
inputs_value += make_int(prevout['value'])
inputs_value += self.make_int(prevout['value'])
prevout_type = prevout['scriptPubKey']['type']
if prevout_type == 'witness_v0_keyhash':
@ -930,25 +929,25 @@ class BTCInterface(Secp256k1Interface):
return True
def signTx(self, key_bytes, tx_bytes, input_n, prevout_script, prevout_value):
def signTx(self, key_bytes: bytes, tx_bytes: bytes, input_n: int, prevout_script: bytes, prevout_value: int) -> bytes:
tx = self.loadTx(tx_bytes)
sig_hash = SegwitV0SignatureHash(prevout_script, tx, input_n, SIGHASH_ALL, prevout_value)
eck = PrivateKey(key_bytes)
return eck.sign(sig_hash, hasher=None) + bytes((SIGHASH_ALL,))
def signTxOtVES(self, key_sign, pubkey_encrypt, tx_bytes, input_n, prevout_script, prevout_value):
def signTxOtVES(self, key_sign: bytes, pubkey_encrypt: bytes, tx_bytes: bytes, input_n: int, prevout_script: bytes, prevout_value: int) -> bytes:
tx = self.loadTx(tx_bytes)
sig_hash = SegwitV0SignatureHash(prevout_script, tx, input_n, SIGHASH_ALL, prevout_value)
return ecdsaotves_enc_sign(key_sign, pubkey_encrypt, sig_hash)
def verifyTxOtVES(self, tx_bytes, ct, Ks, Ke, input_n, prevout_script, prevout_value):
def verifyTxOtVES(self, tx_bytes: bytes, ct: bytes, Ks: bytes, Ke: bytes, input_n: int, prevout_script: bytes, prevout_value):
tx = self.loadTx(tx_bytes)
sig_hash = SegwitV0SignatureHash(prevout_script, tx, input_n, SIGHASH_ALL, prevout_value)
return ecdsaotves_enc_verify(Ks, Ke, sig_hash, ct)
def decryptOtVES(self, k, esig):
def decryptOtVES(self, k: bytes, esig: bytes) -> bytes:
return ecdsaotves_dec_sig(k, esig) + bytes((SIGHASH_ALL,))
def verifyTxSig(self, tx_bytes: bytes, sig: bytes, K: bytes, input_n: int, prevout_script: bytes, prevout_value: int) -> bool:

View file

@ -19,8 +19,91 @@ from basicswap.util.crypto import (
ripemd160,
)
from basicswap.util.extkey import ExtKeyPair
from basicswap.util.integer import encode_varint
from basicswap.interface.dcr.rpc import make_rpc_func
from .messages import CTransaction
from .messages import CTransaction, SigHashType, TxSerializeType
from .script import push_script_data
from coincurve.keys import (
PrivateKey
)
SigHashSerializePrefix: int = 1
SigHashSerializeWitness: int = 3
def DCRSignatureHash(sign_script: bytes, hash_type: SigHashType, tx: CTransaction, idx: int) -> bytes:
masked_hash_type = hash_type & SigHashType.SigHashMask
if masked_hash_type != SigHashType.SigHashAll:
raise ValueError('todo')
# Prefix hash
sign_tx_in_idx: int = idx
sign_vins = tx.vin
if hash_type & SigHashType.SigHashAnyOneCanPay != 0:
sign_vins = [tx.vin[idx],]
sign_tx_in_idx = 0
hash_buffer = bytearray()
version: int = tx.version | (SigHashSerializePrefix << 16)
hash_buffer += version.to_bytes(4, 'little')
hash_buffer += encode_varint(len(sign_vins))
for txi_n, txi in enumerate(sign_vins):
hash_buffer += txi.prevout.hash.to_bytes(32, 'little')
hash_buffer += txi.prevout.n.to_bytes(4, 'little')
hash_buffer += txi.prevout.tree.to_bytes(1)
# In the case of SigHashNone and SigHashSingle, commit to 0 for everything that is not the input being signed instead.
if (masked_hash_type == SigHashType.SigHashNone
or masked_hash_type == SigHashType.SigHashSingle) and \
sign_tx_in_idx != txi_n:
hash_buffer += (0).to_bytes(4, 'little')
else:
hash_buffer += txi.sequence.to_bytes(4, 'little')
hash_buffer += encode_varint(len(tx.vout))
for txo_n, txo in enumerate(tx.vout):
if masked_hash_type == SigHashType.SigHashSingle and \
idx != txo_n:
hash_buffer += (-1).to_bytes(8, 'little')
hash_buffer += txo.version.to_bytes(2, 'little')
hash_buffer += encode_varint(0)
continue
hash_buffer += txo.value.to_bytes(8, 'little')
hash_buffer += txo.version.to_bytes(2, 'little')
hash_buffer += encode_varint(len(txo.script_pubkey))
hash_buffer += txo.script_pubkey
hash_buffer += tx.locktime.to_bytes(4, 'little')
hash_buffer += tx.expiry.to_bytes(4, 'little')
prefix_hash = blake256(hash_buffer)
# Witness hash
hash_buffer.clear()
version: int = tx.version | (SigHashSerializeWitness << 16)
hash_buffer += version.to_bytes(4, 'little')
hash_buffer += encode_varint(len(sign_vins))
for txi_n, txi in enumerate(sign_vins):
if sign_tx_in_idx != txi_n:
hash_buffer += encode_varint(0)
continue
hash_buffer += encode_varint(len(sign_script))
hash_buffer += sign_script
witness_hash = blake256(hash_buffer)
hash_buffer.clear()
hash_buffer += hash_type.to_bytes(4, 'little')
hash_buffer += prefix_hash
hash_buffer += witness_hash
return blake256(hash_buffer)
class DCRInterface(Secp256k1Interface):
@ -121,7 +204,38 @@ class DCRInterface(Secp256k1Interface):
return hash160(ek_account.encode_p())
def decodeKey(self, encoded_key: str) -> (int, bytes):
key = b58decode(encoded_key)
checksum = key[-4:]
key = key[:-4]
if blake256(key)[:4] != checksum:
raise ValueError('Checksum mismatch')
return key[2], key[3:]
def loadTx(self, tx_bytes: bytes) -> CTransaction:
tx = CTransaction()
tx.deserialize(tx_bytes)
return tx
def signTx(self, key_bytes: bytes, tx_bytes: bytes, input_n: int, prevout_script: bytes, prevout_value: int) -> bytes:
tx = self.loadTx(tx_bytes)
sig_hash = DCRSignatureHash(prevout_script, SigHashType.SigHashAll, tx, input_n)
eck = PrivateKey(key_bytes)
return eck.sign(sig_hash, hasher=None) + bytes((SigHashType.SigHashAll,))
def setTxSignature(self, tx_bytes: bytes, stack, txi: int = 0) -> bytes:
tx = self.loadTx(tx_bytes)
script_data = bytearray()
for data in stack:
push_script_data(script_data, data)
tx.vin[txi].signature_script = script_data
return tx.serialize()
def stripTxSignature(self, tx_bytes) -> bytes:
tx = self.loadTx(tx_bytes)
return tx.serialize(TxSerializeType.NoWitness)

View file

@ -17,14 +17,37 @@ class TxSerializeType(IntEnum):
OnlyWitness = 2
class SigHashType(IntEnum):
SigHashAll = 0x1
SigHashNone = 0x2
SigHashSingle = 0x3
SigHashAnyOneCanPay = 0x80
SigHashMask = 0x1f
class SignatureType(IntEnum):
STEcdsaSecp256k1 = 0
STEd25519 = 1
STSchnorrSecp256k1 = 2
class COutpoint:
__slots__ = ('hash', 'n', 'tree')
def get_hash(self) -> bytes:
return self.hash.to_bytes(32, 'big')
class CTxIn:
__slots__ = ('prevout', 'sequence',
'value_in', 'block_height', 'block_index', 'signature_script') # Witness
def __init__(self, tx=None):
self.value_in = -1
self.block_height = 0
self.block_index = 0xffffffff
class CTxOut:
__slots__ = ('value', 'version', 'script_pubkey')
@ -47,7 +70,6 @@ class CTransaction:
self.locktime = tx.locktime
self.expiry = tx.expiry
def deserialize(self, data: bytes) -> None:
version = int.from_bytes(data[:4], 'little')
@ -92,6 +114,9 @@ class CTransaction:
self.expiry = int.from_bytes(data[o:o + 4], 'little')
o += 4
if ser_type == TxSerializeType.NoWitness:
return
num_wit_scripts, nb = decode_varint(data, o)
o += nb
@ -140,7 +165,8 @@ class CTransaction:
if ser_type == TxSerializeType.Full or ser_type == TxSerializeType.OnlyWitness:
data += encode_varint(len(self.vin))
for txi in self.vin:
data += txi.value_in.to_bytes(8, 'little')
tc_value_in = txi.value_in & 0xffffffffffffffff # Convert negative values
data += tc_value_in.to_bytes(8, 'little')
data += txi.block_height.to_bytes(4, 'little')
data += txi.block_index.to_bytes(4, 'little')
data += encode_varint(len(txi.signature_script))

View file

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
OP_0 = 0x00
OP_DATA_1 = 0x01
OP_1NEGATE = 0x4f
OP_1 = 0x51
OP_PUSHDATA1 = 0x4c
OP_PUSHDATA2 = 0x4d
OP_PUSHDATA4 = 0x4e
def push_script_data(data_array: bytearray, data: bytes) -> None:
len_data: int = len(data)
if len_data == 0 or (len_data == 1 and data[0] == 0):
data_array += bytes((OP_0,))
return
if len_data == 1 and data[0] <= 16:
data_array += bytes((OP_1 - 1 + data[0],))
return
if len_data == 1 and data[0] == 0x81:
data_array += bytes((OP_1NEGATE,))
return
if len_data < OP_PUSHDATA1:
data_array += bytes(((OP_DATA_1 - 1) + len_data,))
elif len_data <= 0xff:
data_array += bytes((OP_PUSHDATA1, len_data))
elif len_data <= 0xffff:
data_array += bytes((OP_PUSHDATA2,)) + len_data.to_bytes(2, 'little')
else:
data_array += bytes((OP_PUSHDATA4,)) + len_data.to_bytes(4, 'little')
data_array += data

View file

@ -7,9 +7,6 @@
from .btc import BTCInterface
from basicswap.chainparams import Coins
from basicswap.util import (
make_int,
)
class NMCInterface(BTCInterface):
@ -26,7 +23,7 @@ class NMCInterface(BTCInterface):
if txid and o['txid'] != txid.hex():
continue
# Verify amount
if make_int(o['amount']) != int(bid_amount):
if self.make_int(o['amount']) != int(bid_amount):
self._log.warning('Found output to lock tx address of incorrect value: %s, %s', str(o['amount']), o['txid'])
continue

View file

@ -18,7 +18,6 @@ from basicswap.contrib.test_framework.script import (
)
from basicswap.util import (
ensure,
make_int,
TemporaryError,
)
from basicswap.util.script import (
@ -345,7 +344,7 @@ class PARTInterfaceBlind(PARTInterface):
ensure(lock_output_n is not None, 'Output not found in tx')
# Check value
locked_txo_value = make_int(blinded_info['amount'])
locked_txo_value = self.make_int(blinded_info['amount'])
ensure(locked_txo_value == swap_value, 'Bad locked value')
# Check script
@ -359,7 +358,7 @@ class PARTInterfaceBlind(PARTInterface):
# TODO: Check that inputs are unspent, rangeproofs and commitments sum
# Verify fee rate
vsize = lock_tx_obj['vsize']
fee_paid = make_int(lock_tx_obj['vout'][0]['ct_fee'])
fee_paid = self.make_int(lock_tx_obj['vout'][0]['ct_fee'])
fee_rate_paid = fee_paid * 1000 // vsize
@ -394,7 +393,7 @@ class PARTInterfaceBlind(PARTInterface):
lock_refund_output_n, blinded_info = self.findOutputByNonce(lock_refund_tx_obj, nonce)
ensure(lock_refund_output_n is not None, 'Output not found in tx')
lock_refund_txo_value = make_int(blinded_info['amount'])
lock_refund_txo_value = self.make_int(blinded_info['amount'])
# Check script
lock_refund_txo_scriptpk = bytes.fromhex(lock_refund_tx_obj['vout'][lock_refund_output_n]['scriptPubKey']['hex'])
@ -415,7 +414,7 @@ class PARTInterfaceBlind(PARTInterface):
ensure(rv['inputs_valid'] is True, 'Invalid inputs')
# Check value
fee_paid = make_int(lock_refund_tx_obj['vout'][0]['ct_fee'])
fee_paid = self.make_int(lock_refund_tx_obj['vout'][0]['ct_fee'])
ensure(swap_value - lock_refund_txo_value == fee_paid, 'Bad output value')
# Check fee rate
@ -463,7 +462,7 @@ class PARTInterfaceBlind(PARTInterface):
dummy_witness_stack = self.getScriptLockRefundSpendTxDummyWitness(prevout_script)
witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack)
vsize = self.getTxVSize(self.loadTx(tx_bytes), add_witness_bytes=witness_bytes)
fee_paid = make_int(lock_refund_spend_tx_obj['vout'][0]['ct_fee'])
fee_paid = self.make_int(lock_refund_spend_tx_obj['vout'][0]['ct_fee'])
fee_rate_paid = fee_paid * 1000 // vsize
ensure(self.compareFeeRates(fee_rate_paid, feerate), 'Bad fee rate, expected: {}'.format(feerate))
@ -527,7 +526,7 @@ class PARTInterfaceBlind(PARTInterface):
rv = self.rpc_wallet('fundrawtransactionfrom', ['blind', lock_spend_tx_hex, inputs_info, outputs_info, options])
lock_spend_tx_hex = rv['hex']
lock_spend_tx_obj = self.rpc('decoderawtransaction', [lock_spend_tx_hex])
pay_fee = make_int(lock_spend_tx_obj['vout'][0]['ct_fee'])
pay_fee = self.make_int(lock_spend_tx_obj['vout'][0]['ct_fee'])
# lock_spend_tx_hex does not include the dummy witness stack
witness_bytes = self.getWitnessStackSerialisedLength(dummy_witness_stack)
@ -599,8 +598,8 @@ class PARTInterfaceBlind(PARTInterface):
ensure(rv['inputs_valid'] is True, 'Invalid inputs')
# Check amount
fee_paid = make_int(lock_spend_tx_obj['vout'][0]['ct_fee'])
amount_difference = make_int(input_blinded_info['amount']) - make_int(output_blinded_info['amount'])
fee_paid = self.make_int(lock_spend_tx_obj['vout'][0]['ct_fee'])
amount_difference = self.make_int(input_blinded_info['amount']) - self.make_int(output_blinded_info['amount'])
ensure(fee_paid == amount_difference, 'Invalid output amount')
# Check fee
@ -703,7 +702,7 @@ class PARTInterfaceBlind(PARTInterface):
assert (tx['outputs'][0]['stealth_address'] == sx_addr) # Should not be possible
ensure(tx['outputs'][0]['type'] == 'blind', 'Output is not anon')
if make_int(tx['outputs'][0]['amount']) == cb_swap_value:
if self.make_int(tx['outputs'][0]['amount']) == cb_swap_value:
height = 0
if tx['confirmations'] > 0:
chain_height = self.rpc('getblockcount')
@ -741,7 +740,7 @@ class PARTInterfaceBlind(PARTInterface):
raise ValueError('Too many spendable outputs')
utxo = utxos[0]
utxo_sats = make_int(utxo['amount'])
utxo_sats = self.make_int(utxo['amount'])
if spend_actual_balance and utxo_sats != cb_swap_value:
self._log.warning('Spending actual balance {}, not swap value {}.'.format(utxo_sats, cb_swap_value))
@ -841,7 +840,7 @@ class PARTInterfaceAnon(PARTInterface):
assert (tx['outputs'][0]['stealth_address'] == sx_addr) # Should not be possible
ensure(tx['outputs'][0]['type'] == 'anon', 'Output is not anon')
if make_int(tx['outputs'][0]['amount']) == cb_swap_value:
if self.make_int(tx['outputs'][0]['amount']) == cb_swap_value:
height = 0
if tx['confirmations'] > 0:
chain_height = self.rpc('getblockcount')
@ -874,7 +873,7 @@ class PARTInterfaceAnon(PARTInterface):
raise ValueError('Too many spendable outputs')
utxo = autxos[0]
utxo_sats = make_int(utxo['amount'])
utxo_sats = self.make_int(utxo['amount'])
if spend_actual_balance and utxo_sats != cb_swap_value:
self._log.warning('Spending actual balance {}, not swap value {}.'.format(utxo_sats, cb_swap_value))

View file

@ -30,7 +30,6 @@ from basicswap.util import (
i2b, b2i, b2h,
dumpj,
ensure,
make_int,
TemporaryError)
from basicswap.util.network import (
is_private_ip_address)
@ -490,7 +489,7 @@ class XMRInterface(CoinInterface):
return {'num_txns': len(rv['fee_list']), 'sum_amount': sum(rv['amount_list']), 'sum_fee': sum(rv['fee_list']), 'sum_weight': sum(rv['weight_list'])}
return rv['tx_hash_list'][0]
value_sats: int = make_int(value, self.exp())
value_sats: int = self.make_int(value)
params = {'destinations': [{'amount': value_sats, 'address': addr_to}], 'do_not_relay': estimate_fee}
if self._fee_priority > 0:
params['priority'] = self._fee_priority

View file

@ -16,10 +16,16 @@ import basicswap.config as cfg
from basicswap.basicswap import (
Coins,
)
from basicswap.util.crypto import hash160
from basicswap.util.crypto import (
hash160
)
from basicswap.interface.dcr.rpc import (
callrpc,
)
from basicswap.interface.dcr.messages import (
SigHashType,
TxSerializeType,
)
from tests.basicswap.common import (
stopDaemons,
waitForRPC,
@ -146,7 +152,7 @@ class Test(BaseTest):
if num_passed >= 5:
ci0.rpc('generate', [1,])
except Exception as e:
logging.warning('coins_loop generate {}'.format(e))
logging.warning('coins_loop generate {}'.format(e))
@classmethod
def prepareExtraDataDir(cls, i):
@ -256,7 +262,6 @@ class Test(BaseTest):
logging.info('---------- Test {} segwit'.format(self.test_coin_from.name))
swap_clients = self.swap_clients
ci0 = swap_clients[0].ci(self.test_coin_from)
assert (ci0.using_segwit() is True)
@ -285,6 +290,53 @@ class Test(BaseTest):
assert (ser_out.hex() == sfrtx['hex'])
assert (f_decoded['txid'] == ctx.TxHash().hex())
def test_003_signature_hash(self):
logging.info('---------- Test {} signature_hash'.format(self.test_coin_from.name))
# Test that signing a transaction manually produces the same result when signed with the wallet
swap_clients = self.swap_clients
ci0 = swap_clients[0].ci(self.test_coin_from)
utxos = ci0.rpc_wallet('listunspent')
addr_out = ci0.rpc_wallet('getnewaddress')
rtx = ci0.rpc_wallet('createrawtransaction', [[], {addr_out: 2.0}])
account_from = ci0.rpc_wallet('getaccount', [self.dcr_mining_addr, ])
frtx = ci0.rpc_wallet('fundrawtransaction', [rtx, account_from])
sfrtx = ci0.rpc_wallet('signrawtransaction', [frtx['hex']])
ctx = ci0.loadTx(bytes.fromhex(frtx['hex']))
prevout = None
prevout_txid = ctx.vin[0].prevout.get_hash().hex()
prevout_n = ctx.vin[0].prevout.n
for utxo in utxos:
if prevout_txid == utxo['txid'] and prevout_n == utxo['vout']:
prevout = utxo
break
assert (prevout is not None)
tx_bytes_no_witness: bytes = ctx.serialize(TxSerializeType.NoWitness)
sig0 = ci0.rpc_wallet('createsignature', [prevout['address'], 0, SigHashType.SigHashAll, prevout['scriptPubKey'], tx_bytes_no_witness.hex()])
priv_key_wif = ci0.rpc_wallet('dumpprivkey', [prevout['address'], ])
sig_type, key_bytes = ci0.decodeKey(priv_key_wif)
addr_info = ci0.rpc_wallet('validateaddress', [prevout['address'],])
pk_hex: str = addr_info['pubkey']
sig0_py = ci0.signTx(key_bytes, tx_bytes_no_witness, 0, bytes.fromhex(prevout['scriptPubKey']), ci0.make_int(prevout['amount']))
tx_bytes_signed = ci0.setTxSignature(tx_bytes_no_witness, [sig0_py, bytes.fromhex(pk_hex)])
# Set prevout value
ctx = ci0.loadTx(tx_bytes_signed)
ctx.vin[0].value_in = ci0.make_int(prevout['amount'])
tx_bytes_signed = ctx.serialize()
assert (tx_bytes_signed.hex() == sfrtx['hex'])
sent_txid = ci0.rpc_wallet('sendrawtransaction', [tx_bytes_signed.hex(), ])
assert (len(sent_txid) == 64)
if __name__ == '__main__':
unittest.main()