tests: Manual recovery of xmrswap chain B lock tx

This commit is contained in:
tecnovert 2021-12-19 10:55:29 +02:00
parent f289bcf2e8
commit a802788cfd
No known key found for this signature in database
GPG key ID: 8ED6D8750C4E3F93
5 changed files with 81 additions and 38 deletions

View file

@ -122,6 +122,7 @@ from .basicswap_util import (
getOfferProofOfFundsHash,
getLastBidState,
isActiveBidState)
from .protocols.xmr_swap_1 import recoverNoScriptTxnWithKey
non_script_type_coins = (Coins.XMR, Coins.PART_ANON)
@ -4927,40 +4928,6 @@ class BasicSwap(BaseApp):
finally:
self.mxDB.release()
def recoverNoScriptTxnWithKey(self, bid_id, encoded_key):
# Manually recover txn if other key is known
session = scoped_session(self.session_factory)
try:
bid, xmr_swap = self.getXmrBidFromSession(session, bid_id)
ensure(bid, 'Bid not found: {}.'.format(bid_id.hex()))
ensure(xmr_swap, 'XMR swap not found: {}.'.format(bid_id.hex()))
offer, xmr_offer = self.getXmrOfferFromSession(session, bid.offer_id, sent=False)
ensure(offer, 'Offer not found: {}.'.format(bid.offer_id.hex()))
ensure(xmr_offer, 'XMR offer not found: {}.'.format(bid.offer_id.hex()))
ci_to = self.ci(offer.coin_to)
for_ed25519 = True if Coins(offer.coin_to) == Coins.XMR else False
if bid.was_sent:
kbsl = ci_to.decodeKey(encoded_key)
kbsf = self.getPathKey(offer.coin_from, offer.coin_to, bid.created_at, xmr_swap.contract_count, KeyTypes.KBSF, for_ed25519)
else:
kbsl = self.getPathKey(offer.coin_from, offer.coin_to, bid.created_at, xmr_swap.contract_count, KeyTypes.KBSL, for_ed25519)
kbsf = ci_to.decodeKey(encoded_key)
ensure(ci_to.verifyKey(kbsl), 'Invalid kbsl')
ensure(ci_to.verifyKey(kbsf), 'Invalid kbsf')
vkbs = ci_to.sumKeys(kbsl, kbsf)
address_to = self.getCachedMainWalletAddress(ci_to)
txid = ci_to.spendBLockTx(xmr_swap.b_lock_tx_id, address_to, xmr_swap.vkbv, vkbs, bid.amount_to, xmr_offer.b_fee_rate, bid.chain_b_height_start)
self.log.debug('Submitted lock B spend txn %s to %s chain for bid %s', txid.hex(), ci_to.coin_name(), bid_id.hex())
self.logBidEvent(bid.bid_id, EventLogTypes.LOCK_TX_B_SPEND_TX_PUBLISHED, txid.hex(), session)
session.commit()
return txid.hex()
finally:
session.close()
session.remove()
def manualBidUpdate(self, bid_id, data):
self.log.info('Manually updating bid %s', bid_id.hex())
self.mxDB.acquire()
@ -4984,7 +4951,7 @@ class BasicSwap(BaseApp):
has_changed = True
if data['kbs_other'] is not None:
return self.recoverNoScriptTxnWithKey(bid_id, data['kbs_other'])
return recoverNoScriptTxnWithKey(self, bid_id, data['kbs_other'])
if has_changed:
session = scoped_session(self.session_factory)

View file

@ -28,6 +28,7 @@ from .ui import (
have_data_entry,
tickerToCoinId,
)
from .protocols.xmr_swap_1 import recoverNoScriptTxnWithKey, getChainBSplitKey
def js_error(self, error_str):
@ -220,6 +221,13 @@ def js_bids(self, url_split, post_string, is_json):
bid, xmr_swap, offer, xmr_offer, events = swap_client.getXmrBidAndOffer(bid_id)
assert(bid), 'Unknown bid ID'
if post_string != '':
if have_data_entry(post_data, 'chainbkeysplit'):
return bytes(json.dumps({'splitkey': getChainBSplitKey(swap_client, bid, xmr_swap, offer)}), 'UTF-8')
elif have_data_entry(post_data, 'spendchainblocktx'):
remote_key = get_data_entry(post_data, 'remote_key')
return bytes(json.dumps({'txid': recoverNoScriptTxnWithKey(swap_client, bid_id, remote_key).hex()}), 'UTF-8')
edit_bid = False
show_txns = False
data = describeBid(swap_client, bid, xmr_swap, offer, xmr_offer, events, edit_bid, show_txns, for_api=True)

View file

@ -3,3 +3,58 @@
# Copyright (c) 2020 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
from sqlalchemy.orm import scoped_session
from basicswap.util import (
ensure,
)
from basicswap.chainparams import (
Coins,
)
from basicswap.basicswap_util import (
KeyTypes,
EventLogTypes,
)
def recoverNoScriptTxnWithKey(self, bid_id, encoded_key):
# Manually recover txn if other key is known
session = scoped_session(self.session_factory)
try:
bid, xmr_swap = self.getXmrBidFromSession(session, bid_id)
ensure(bid, 'Bid not found: {}.'.format(bid_id.hex()))
ensure(xmr_swap, 'XMR swap not found: {}.'.format(bid_id.hex()))
offer, xmr_offer = self.getXmrOfferFromSession(session, bid.offer_id, sent=False)
ensure(offer, 'Offer not found: {}.'.format(bid.offer_id.hex()))
ensure(xmr_offer, 'XMR offer not found: {}.'.format(bid.offer_id.hex()))
ci_to = self.ci(offer.coin_to)
for_ed25519 = True if Coins(offer.coin_to) == Coins.XMR else False
if bid.was_sent:
kbsl = ci_to.decodeKey(encoded_key)
kbsf = self.getPathKey(offer.coin_from, offer.coin_to, bid.created_at, xmr_swap.contract_count, KeyTypes.KBSF, for_ed25519)
else:
kbsl = self.getPathKey(offer.coin_from, offer.coin_to, bid.created_at, xmr_swap.contract_count, KeyTypes.KBSL, for_ed25519)
kbsf = ci_to.decodeKey(encoded_key)
ensure(ci_to.verifyKey(kbsl), 'Invalid kbsl')
ensure(ci_to.verifyKey(kbsf), 'Invalid kbsf')
vkbs = ci_to.sumKeys(kbsl, kbsf)
address_to = self.getCachedMainWalletAddress(ci_to)
txid = ci_to.spendBLockTx(xmr_swap.b_lock_tx_id, address_to, xmr_swap.vkbv, vkbs, bid.amount_to, xmr_offer.b_fee_rate, bid.chain_b_height_start)
self.log.debug('Submitted lock B spend txn %s to %s chain for bid %s', txid.hex(), ci_to.coin_name(), bid_id.hex())
self.logBidEvent(bid.bid_id, EventLogTypes.LOCK_TX_B_SPEND_TX_PUBLISHED, txid.hex(), session)
session.commit()
return txid
finally:
session.close()
session.remove()
def getChainBSplitKey(swap_client, bid, xmr_swap, offer):
ci_to = swap_client.ci(offer.coin_to)
key_type = KeyTypes.KBSF if bid.was_sent else KeyTypes.KBSL
return ci_to.encodeKey(swap_client.getPathKey(offer.coin_from, offer.coin_to, bid.created_at, xmr_swap.contract_count, key_type, True if offer.coin_to == Coins.XMR else False))

View file

@ -14,7 +14,6 @@ from .chainparams import (
)
from .basicswap_util import (
TxTypes,
KeyTypes,
TxStates,
BidStates,
SwapTypes,
@ -26,6 +25,8 @@ from .basicswap_util import (
getLastBidState,
)
from .protocols.xmr_swap_1 import getChainBSplitKey
PAGE_LIMIT = 50
@ -283,8 +284,7 @@ def describeBid(swap_client, bid, xmr_swap, offer, xmr_offer, bid_events, edit_b
data['xmr_b_shared_address'] = ci_to.encodeSharedAddress(xmr_swap.pkbv, xmr_swap.pkbs) if xmr_swap.pkbs else None
if swap_client.debug_ui:
key_type = KeyTypes.KBSF if bid.was_sent else KeyTypes.KBSL
data['xmr_b_half_privatekey'] = ci_to.encodeKey(swap_client.getPathKey(offer.coin_from, offer.coin_to, bid.created_at, xmr_swap.contract_count, key_type, True if offer.coin_to == Coins.XMR else False))
data['xmr_b_half_privatekey'] = getChainBSplitKey(swap_client, bid, xmr_swap, offer)
if show_lock_transfers:
if xmr_swap.pkbs:

View file

@ -9,6 +9,7 @@ import json
import random
import logging
import unittest
from urllib import parse
from urllib.request import urlopen
from basicswap.basicswap import (
@ -189,6 +190,18 @@ class Test(BaseTest):
wait_for_none_active(test_delay_event, 1800)
wait_for_none_active(test_delay_event, 1801)
data = parse.urlencode({
'chainbkeysplit': True
}).encode()
offerer_key = json.loads(urlopen('http://127.0.0.1:1800/json/bids/{}'.format(bid_id.hex()), data=data).read())['splitkey']
data = parse.urlencode({
'spendchainblocktx': True,
'remote_key': offerer_key
}).encode()
redeemed_txid = json.loads(urlopen('http://127.0.0.1:1801/json/bids/{}'.format(bid_id.hex()), data=data).read())['txid']
assert(len(redeemed_txid) == 64)
def test_04_follower_recover_b_lock_tx(self):
logging.info('---------- Test PARTct to XMR follower recovers coin b lock tx')