From c7c49ae2625718919add26ff018830691f00c799 Mon Sep 17 00:00:00 2001 From: tecnovert Date: Sat, 9 Nov 2019 23:09:22 +0200 Subject: [PATCH] Delayed events. --- .travis.yml | 2 +- basicswap/basicswap.py | 94 ++++++++++++++++++++++++++++------ basicswap/db.py | 9 +++- bin/basicswap_prepare.py | 2 +- tests/basicswap/test_reload.py | 2 +- tests/basicswap/test_run.py | 52 ++++++++++++++++++- 6 files changed, 139 insertions(+), 22 deletions(-) diff --git a/.travis.yml b/.travis.yml index 933390a..930b182 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ stages: - test env: global: - - PART_VERSION=0.18.1.5 + - PART_VERSION=0.18.1.6 - BTC_VERSION=0.18.1 - LTC_VERSION=0.17.1 - TEST_DIR=~/test_basicswap2/ diff --git a/basicswap/basicswap.py b/basicswap/basicswap.py index 8afd611..cc33bc4 100644 --- a/basicswap/basicswap.py +++ b/basicswap/basicswap.py @@ -17,6 +17,8 @@ import logging import sqlalchemy as sa import shutil import json +import random +import secrets from sqlalchemy.orm import sessionmaker, scoped_session from enum import IntEnum, auto @@ -55,6 +57,7 @@ from .db import ( PooledAddress, SentOffer, SmsgAddress, + EventQueue, ) from .explorers import ExplorerInsight, ExplorerBitAps, ExplorerChainz @@ -80,6 +83,8 @@ class MessageTypes(IntEnum): class SwapTypes(IntEnum): SELLER_FIRST = auto() BUYER_FIRST = auto() + SELLER_FIRST_2MSG = auto() + BUYER_FIRST_2MSG = auto() class OfferStates(IntEnum): @@ -136,6 +141,10 @@ class TxTypes(IntEnum): PTX_REFUND = auto() +class EventTypes(IntEnum): + ACCEPT_BID = auto() + + SEQUENCE_LOCK_BLOCKS = 1 SEQUENCE_LOCK_TIME = 2 ABS_LOCK_BLOCKS = 3 @@ -306,14 +315,21 @@ class BasicSwap(): self.settings = settings self.coin_clients = {} self.mxDB = threading.RLock() - self.last_expired = 0 - self.last_checked_progress = 0 - self.last_checked_watched = 0 - self.last_checked_expired = 0 self.debug = self.settings.get('debug', DEBUG) + self.check_progress_seconds = self.settings.get('check_progress_seconds', 60) self.check_watched_seconds = self.settings.get('check_watched_seconds', 60) self.check_expired_seconds = self.settings.get('check_expired_seconds', 60 * 5) + self.check_events_seconds = self.settings.get('check_events_seconds', 10) + self.last_checked_progress = 0 + self.last_checked_watched = 0 + self.last_checked_expired = 0 + self.last_checked_events = 0 + + # TODO: Adjust ranges + self.min_delay_auto_accept = self.settings.get('min_delay_auto_accept', 10) + self.max_delay_auto_accept = self.settings.get('max_delay_auto_accept', 60) + self.swaps_in_progress = dict() if self.chain == 'regtest': @@ -389,6 +405,8 @@ class BasicSwap(): # non-segwit # https://testnet.litecore.io/insight-api + random.seed(secrets.randbits(128)) + def prepareLogging(self): self.log = logging.getLogger(self.log_name) self.log.propagate = False @@ -1037,6 +1055,24 @@ class BasicSwap(): finally: self.mxDB.release() + def createEvent(self, delay, event_type, linked_id): + self.log.debug('createEvent %d %s', event_type, linked_id.hex()) + self.mxDB.acquire() + try: + session = scoped_session(self.session_factory) + event = EventQueue() + event.active_ind = 1 + event.created_at = int(time.time()) + event.trigger_at = event.created_at + delay + event.event_type = event_type + event.linked_id = linked_id + session.add(event) + session.commit() + session.close() + session.remove() + finally: + self.mxDB.release() + def postBid(self, offer_id, amount, addr_send_from=None): # Bid to send bid.amount * offer.rate of coin_to in exchange for bid.amount of coin_from self.log.debug('postBid %s %s', offer_id.hex(), format8(amount)) @@ -1173,7 +1209,7 @@ class BasicSwap(): pkhash_refund = getKeyID(pubkey_refund) if bid.initiate_tx is not None: - self.log.warning('initiate txn %s already exists for bid %s', bid.initiate_tx.txid, bid_id.hex()) + self.log.warning('Initiate txn %s already exists for bid %s', bid.initiate_tx.txid, bid_id.hex()) txid = bid.initiate_tx.txid script = bid.initiate_tx.script else: @@ -1185,7 +1221,7 @@ class BasicSwap(): lock_value = self.callcoinrpc(coin_from, 'getblockchaininfo')['blocks'] + offer.lock_value else: lock_value = int(time.time()) + offer.lock_value - self.log.debug('initiate %s lock_value %d %d', coin_from, offer.lock_value, lock_value) + self.log.debug('Initiate %s lock_value %d %d', coin_from, offer.lock_value, lock_value) script = buildContractScript(lock_value, secret_hash, bid.pkhash_buyer, pkhash_refund, OpCodes.OP_CHECKLOCKTIMEVERIFY) p2sh = self.callcoinrpc(Coins.PART, 'decodescript', [script.hex()])['p2sh'] @@ -2079,7 +2115,29 @@ class BasicSwap(): # TODO: remove offers from db - self.last_checked_expired = now + finally: + self.mxDB.release() + + def checkEvents(self): + self.mxDB.acquire() + try: + now = int(time.time()) + + session = scoped_session(self.session_factory) + + q = session.query(EventQueue).filter(EventQueue.trigger_at >= now) + for row in q: + + if row.event_type == EventTypes.ACCEPT_BID: + self.acceptBid(row.linked_id) + else: + self.log.warning('Unknown event type: %d', row.event_type) + + session.query.filter(EventQueue.event_id == row.event_id).delete() + + session.commit() + session.close() + session.remove() finally: self.mxDB.release() @@ -2228,8 +2286,10 @@ class BasicSwap(): if self.countAcceptedBids(offer_id) > 0: self.log.info('Not auto accepting bid %s, already have', bid_id.hex()) else: - self.log.info('Auto accepting bid %s', bid_id.hex()) - self.acceptBid(bid_id) + delay = random.randrange(self.min_delay_auto_accept, self.max_delay_auto_accept) + self.log.info('Auto accepting bid %s in %d seconds', bid_id.hex(), delay) + + self.createEvent(delay, EventTypes.ACCEPT_BID, bid_id) def processBidAccept(self, msg): self.log.debug('Processing bid accepted msg %s', msg['msgid']) @@ -2327,7 +2387,7 @@ class BasicSwap(): clear = self.zmqSubscriber.recv() if message[0] == 3: # Paid smsg - return # TODO: switch to paid? + return # TODO: Switch to paid? msg_id = message[2:] options = {'encoding': 'hex', 'setread': True} @@ -2350,7 +2410,7 @@ class BasicSwap(): try: # TODO: Wait for blocks / txns, would need to check multiple coins now = int(time.time()) - if now - self.last_checked_progress > self.check_progress_seconds: + if now - self.last_checked_progress >= self.check_progress_seconds: to_remove = [] for bid_id, v in self.swaps_in_progress.items(): try: @@ -2366,16 +2426,20 @@ class BasicSwap(): del self.swaps_in_progress[bid_id] self.last_checked_progress = now - now = int(time.time()) - if now - self.last_checked_watched > self.check_watched_seconds: + if now - self.last_checked_watched >= self.check_watched_seconds: for k, c in self.coin_clients.items(): if len(c['watched_outputs']) > 0: self.checkForSpends(k, c) self.last_checked_watched = now - # Expire messages - if int(time.time()) - self.last_checked_expired > self.check_expired_seconds: + if now - self.last_checked_expired >= self.check_expired_seconds: self.expireMessages() + self.last_checked_expired = now + + if now - self.last_checked_events >= self.check_events_seconds: + self.checkEvents() + self.last_checked_events = now + except Exception as ex: self.log.error('update %s', str(ex)) traceback.print_exc() diff --git a/basicswap/db.py b/basicswap/db.py index f493220..c04c727 100644 --- a/basicswap/db.py +++ b/basicswap/db.py @@ -16,12 +16,14 @@ Base = declarative_base() class DBKVInt(Base): __tablename__ = 'kv_int' + key = sa.Column(sa.String, primary_key=True) value = sa.Column(sa.Integer) class DBKVString(Base): __tablename__ = 'kv_string' + key = sa.Column(sa.String, primary_key=True) value = sa.Column(sa.String) @@ -177,16 +179,19 @@ class SentOffer(Base): class SmsgAddress(Base): __tablename__ = 'smsgaddresses' + addr_id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) addr = sa.Column(sa.String) use_type = sa.Column(sa.Integer) -# TODO: Delay responding to automated events class EventQueue(Base): __tablename__ = 'eventqueue' - addr_id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) + + event_id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) + active_ind = sa.Column(sa.Integer) created_at = sa.Column(sa.BigInteger) trigger_at = sa.Column(sa.BigInteger) linked_id = sa.Column(sa.LargeBinary) event_type = sa.Column(sa.Integer) + event_data = sa.Column(sa.LargeBinary) diff --git a/bin/basicswap_prepare.py b/bin/basicswap_prepare.py index 820fe31..9dd2c7f 100644 --- a/bin/basicswap_prepare.py +++ b/bin/basicswap_prepare.py @@ -40,7 +40,7 @@ else: BIN_ARCH = 'x86_64-linux-gnu.tar.gz' known_coins = { - 'particl': '0.18.1.5', + 'particl': '0.18.1.6', 'litecoin': '0.17.1', 'bitcoin': '0.18.1', 'namecoin': '0.18.0', diff --git a/tests/basicswap/test_reload.py b/tests/basicswap/test_reload.py index 02a655c..ec8543d 100644 --- a/tests/basicswap/test_reload.py +++ b/tests/basicswap/test_reload.py @@ -9,7 +9,7 @@ export TEST_RELOAD_PATH=/tmp/test_basicswap mkdir -p ${TEST_RELOAD_PATH}/bin/{particl,bitcoin} -cp ~/tmp/particl-0.18.1.5-x86_64-linux-gnu.tar.gz ${TEST_RELOAD_PATH}/bin/particl +cp ~/tmp/particl-0.18.1.6-x86_64-linux-gnu.tar.gz ${TEST_RELOAD_PATH}/bin/particl cp ~/tmp/bitcoin-0.18.1-x86_64-linux-gnu.tar.gz ${TEST_RELOAD_PATH}/bin/bitcoin export PYTHONPATH=$(pwd) python tests/basicswap/test_reload.py diff --git a/tests/basicswap/test_run.py b/tests/basicswap/test_run.py index 3c719fc..15d2922 100644 --- a/tests/basicswap/test_run.py +++ b/tests/basicswap/test_run.py @@ -166,7 +166,10 @@ def prepareDir(datadir, nodeId, network_key, network_pubkey): }, 'check_progress_seconds': 2, 'check_watched_seconds': 4, - 'check_expired_seconds': 60 + 'check_expired_seconds': 60, + 'check_events_seconds': 1, + 'min_delay_auto_accept': 1, + 'max_delay_auto_accept': 5 } with open(settings_path, 'w') as fp: json.dump(settings, fp, indent=4) @@ -360,7 +363,7 @@ class Test(unittest.TestCase): for i in range(seconds_for): time.sleep(1) bid = swap_client.getBid(bid_id) - if bid.state >= state: + if bid and bid.state >= state: return raise ValueError('wait_for_bid_state timed out.') @@ -558,6 +561,51 @@ class Test(unittest.TestCase): self.wait_for_bid_state(swap_clients[0], bid_id, BidStates.BID_ERROR, seconds_for=60) + def test_08_part_ltc_buyer_first(self): + logging.info('---------- Test PART to LTC, buyer first') + swap_clients = self.swap_clients + + offer_id = swap_clients[0].postOffer(Coins.PART, Coins.LTC, 100 * COIN, 0.1 * COIN, 100 * COIN, SwapTypes.BUYER_FIRST) + + return # TODO + + self.wait_for_offer(swap_clients[1], offer_id) + offers = swap_clients[1].listOffers() + assert(len(offers) == 1) + for offer in offers: + if offer.offer_id == offer_id: + bid_id = swap_clients[1].postBid(offer_id, offer.amount_from) + + self.wait_for_bid(swap_clients[0], bid_id) + + swap_clients[0].acceptBid(bid_id) + + self.wait_for_in_progress(swap_clients[1], bid_id, sent=True) + + self.wait_for_bid_state(swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, seconds_for=60) + self.wait_for_bid_state(swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, sent=True, seconds_for=60) + + js_0 = json.loads(urlopen('http://localhost:1800/json').read()) + js_1 = json.loads(urlopen('http://localhost:1801/json').read()) + assert(js_0['num_swapping'] == 0 and js_0['num_watched_outputs'] == 0) + assert(js_1['num_swapping'] == 0 and js_1['num_watched_outputs'] == 0) + + def test_09_part_ltc_auto_accept(self): + logging.info('---------- Test PART to LTC, auto aceept bid') + swap_clients = self.swap_clients + + offer_id = swap_clients[0].postOffer(Coins.PART, Coins.LTC, 100 * COIN, 0.1 * COIN, 100 * COIN, SwapTypes.SELLER_FIRST, auto_accept_bids=True) + + self.wait_for_offer(swap_clients[1], offer_id) + offers = swap_clients[1].listOffers() + assert(len(offers) == 1) + for offer in offers: + if offer.offer_id == offer_id: + bid_id = swap_clients[1].postBid(offer_id, offer.amount_from) + + self.wait_for_bid_state(swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, seconds_for=60) + self.wait_for_bid_state(swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, sent=True, seconds_for=60) + def pass_99_delay(self): global stop_test logging.info('Delay')