Delayed events.

This commit is contained in:
tecnovert 2019-11-09 23:09:22 +02:00
parent dc0b078107
commit c7c49ae262
No known key found for this signature in database
GPG key ID: 8ED6D8750C4E3F93
6 changed files with 139 additions and 22 deletions

View file

@ -7,7 +7,7 @@ stages:
- test
env:
global:
- PART_VERSION=0.18.1.5
- PART_VERSION=0.18.1.6
- BTC_VERSION=0.18.1
- LTC_VERSION=0.17.1
- TEST_DIR=~/test_basicswap2/

View file

@ -17,6 +17,8 @@ import logging
import sqlalchemy as sa
import shutil
import json
import random
import secrets
from sqlalchemy.orm import sessionmaker, scoped_session
from enum import IntEnum, auto
@ -55,6 +57,7 @@ from .db import (
PooledAddress,
SentOffer,
SmsgAddress,
EventQueue,
)
from .explorers import ExplorerInsight, ExplorerBitAps, ExplorerChainz
@ -80,6 +83,8 @@ class MessageTypes(IntEnum):
class SwapTypes(IntEnum):
SELLER_FIRST = auto()
BUYER_FIRST = auto()
SELLER_FIRST_2MSG = auto()
BUYER_FIRST_2MSG = auto()
class OfferStates(IntEnum):
@ -136,6 +141,10 @@ class TxTypes(IntEnum):
PTX_REFUND = auto()
class EventTypes(IntEnum):
ACCEPT_BID = auto()
SEQUENCE_LOCK_BLOCKS = 1
SEQUENCE_LOCK_TIME = 2
ABS_LOCK_BLOCKS = 3
@ -306,14 +315,21 @@ class BasicSwap():
self.settings = settings
self.coin_clients = {}
self.mxDB = threading.RLock()
self.last_expired = 0
self.last_checked_progress = 0
self.last_checked_watched = 0
self.last_checked_expired = 0
self.debug = self.settings.get('debug', DEBUG)
self.check_progress_seconds = self.settings.get('check_progress_seconds', 60)
self.check_watched_seconds = self.settings.get('check_watched_seconds', 60)
self.check_expired_seconds = self.settings.get('check_expired_seconds', 60 * 5)
self.check_events_seconds = self.settings.get('check_events_seconds', 10)
self.last_checked_progress = 0
self.last_checked_watched = 0
self.last_checked_expired = 0
self.last_checked_events = 0
# TODO: Adjust ranges
self.min_delay_auto_accept = self.settings.get('min_delay_auto_accept', 10)
self.max_delay_auto_accept = self.settings.get('max_delay_auto_accept', 60)
self.swaps_in_progress = dict()
if self.chain == 'regtest':
@ -389,6 +405,8 @@ class BasicSwap():
# non-segwit
# https://testnet.litecore.io/insight-api
random.seed(secrets.randbits(128))
def prepareLogging(self):
self.log = logging.getLogger(self.log_name)
self.log.propagate = False
@ -1037,6 +1055,24 @@ class BasicSwap():
finally:
self.mxDB.release()
def createEvent(self, delay, event_type, linked_id):
self.log.debug('createEvent %d %s', event_type, linked_id.hex())
self.mxDB.acquire()
try:
session = scoped_session(self.session_factory)
event = EventQueue()
event.active_ind = 1
event.created_at = int(time.time())
event.trigger_at = event.created_at + delay
event.event_type = event_type
event.linked_id = linked_id
session.add(event)
session.commit()
session.close()
session.remove()
finally:
self.mxDB.release()
def postBid(self, offer_id, amount, addr_send_from=None):
# Bid to send bid.amount * offer.rate of coin_to in exchange for bid.amount of coin_from
self.log.debug('postBid %s %s', offer_id.hex(), format8(amount))
@ -1173,7 +1209,7 @@ class BasicSwap():
pkhash_refund = getKeyID(pubkey_refund)
if bid.initiate_tx is not None:
self.log.warning('initiate txn %s already exists for bid %s', bid.initiate_tx.txid, bid_id.hex())
self.log.warning('Initiate txn %s already exists for bid %s', bid.initiate_tx.txid, bid_id.hex())
txid = bid.initiate_tx.txid
script = bid.initiate_tx.script
else:
@ -1185,7 +1221,7 @@ class BasicSwap():
lock_value = self.callcoinrpc(coin_from, 'getblockchaininfo')['blocks'] + offer.lock_value
else:
lock_value = int(time.time()) + offer.lock_value
self.log.debug('initiate %s lock_value %d %d', coin_from, offer.lock_value, lock_value)
self.log.debug('Initiate %s lock_value %d %d', coin_from, offer.lock_value, lock_value)
script = buildContractScript(lock_value, secret_hash, bid.pkhash_buyer, pkhash_refund, OpCodes.OP_CHECKLOCKTIMEVERIFY)
p2sh = self.callcoinrpc(Coins.PART, 'decodescript', [script.hex()])['p2sh']
@ -2079,7 +2115,29 @@ class BasicSwap():
# TODO: remove offers from db
self.last_checked_expired = now
finally:
self.mxDB.release()
def checkEvents(self):
self.mxDB.acquire()
try:
now = int(time.time())
session = scoped_session(self.session_factory)
q = session.query(EventQueue).filter(EventQueue.trigger_at >= now)
for row in q:
if row.event_type == EventTypes.ACCEPT_BID:
self.acceptBid(row.linked_id)
else:
self.log.warning('Unknown event type: %d', row.event_type)
session.query.filter(EventQueue.event_id == row.event_id).delete()
session.commit()
session.close()
session.remove()
finally:
self.mxDB.release()
@ -2228,8 +2286,10 @@ class BasicSwap():
if self.countAcceptedBids(offer_id) > 0:
self.log.info('Not auto accepting bid %s, already have', bid_id.hex())
else:
self.log.info('Auto accepting bid %s', bid_id.hex())
self.acceptBid(bid_id)
delay = random.randrange(self.min_delay_auto_accept, self.max_delay_auto_accept)
self.log.info('Auto accepting bid %s in %d seconds', bid_id.hex(), delay)
self.createEvent(delay, EventTypes.ACCEPT_BID, bid_id)
def processBidAccept(self, msg):
self.log.debug('Processing bid accepted msg %s', msg['msgid'])
@ -2327,7 +2387,7 @@ class BasicSwap():
clear = self.zmqSubscriber.recv()
if message[0] == 3: # Paid smsg
return # TODO: switch to paid?
return # TODO: Switch to paid?
msg_id = message[2:]
options = {'encoding': 'hex', 'setread': True}
@ -2350,7 +2410,7 @@ class BasicSwap():
try:
# TODO: Wait for blocks / txns, would need to check multiple coins
now = int(time.time())
if now - self.last_checked_progress > self.check_progress_seconds:
if now - self.last_checked_progress >= self.check_progress_seconds:
to_remove = []
for bid_id, v in self.swaps_in_progress.items():
try:
@ -2366,16 +2426,20 @@ class BasicSwap():
del self.swaps_in_progress[bid_id]
self.last_checked_progress = now
now = int(time.time())
if now - self.last_checked_watched > self.check_watched_seconds:
if now - self.last_checked_watched >= self.check_watched_seconds:
for k, c in self.coin_clients.items():
if len(c['watched_outputs']) > 0:
self.checkForSpends(k, c)
self.last_checked_watched = now
# Expire messages
if int(time.time()) - self.last_checked_expired > self.check_expired_seconds:
if now - self.last_checked_expired >= self.check_expired_seconds:
self.expireMessages()
self.last_checked_expired = now
if now - self.last_checked_events >= self.check_events_seconds:
self.checkEvents()
self.last_checked_events = now
except Exception as ex:
self.log.error('update %s', str(ex))
traceback.print_exc()

View file

@ -16,12 +16,14 @@ Base = declarative_base()
class DBKVInt(Base):
__tablename__ = 'kv_int'
key = sa.Column(sa.String, primary_key=True)
value = sa.Column(sa.Integer)
class DBKVString(Base):
__tablename__ = 'kv_string'
key = sa.Column(sa.String, primary_key=True)
value = sa.Column(sa.String)
@ -177,16 +179,19 @@ class SentOffer(Base):
class SmsgAddress(Base):
__tablename__ = 'smsgaddresses'
addr_id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
addr = sa.Column(sa.String)
use_type = sa.Column(sa.Integer)
# TODO: Delay responding to automated events
class EventQueue(Base):
__tablename__ = 'eventqueue'
addr_id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
event_id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
active_ind = sa.Column(sa.Integer)
created_at = sa.Column(sa.BigInteger)
trigger_at = sa.Column(sa.BigInteger)
linked_id = sa.Column(sa.LargeBinary)
event_type = sa.Column(sa.Integer)
event_data = sa.Column(sa.LargeBinary)

View file

@ -40,7 +40,7 @@ else:
BIN_ARCH = 'x86_64-linux-gnu.tar.gz'
known_coins = {
'particl': '0.18.1.5',
'particl': '0.18.1.6',
'litecoin': '0.17.1',
'bitcoin': '0.18.1',
'namecoin': '0.18.0',

View file

@ -9,7 +9,7 @@
export TEST_RELOAD_PATH=/tmp/test_basicswap
mkdir -p ${TEST_RELOAD_PATH}/bin/{particl,bitcoin}
cp ~/tmp/particl-0.18.1.5-x86_64-linux-gnu.tar.gz ${TEST_RELOAD_PATH}/bin/particl
cp ~/tmp/particl-0.18.1.6-x86_64-linux-gnu.tar.gz ${TEST_RELOAD_PATH}/bin/particl
cp ~/tmp/bitcoin-0.18.1-x86_64-linux-gnu.tar.gz ${TEST_RELOAD_PATH}/bin/bitcoin
export PYTHONPATH=$(pwd)
python tests/basicswap/test_reload.py

View file

@ -166,7 +166,10 @@ def prepareDir(datadir, nodeId, network_key, network_pubkey):
},
'check_progress_seconds': 2,
'check_watched_seconds': 4,
'check_expired_seconds': 60
'check_expired_seconds': 60,
'check_events_seconds': 1,
'min_delay_auto_accept': 1,
'max_delay_auto_accept': 5
}
with open(settings_path, 'w') as fp:
json.dump(settings, fp, indent=4)
@ -360,7 +363,7 @@ class Test(unittest.TestCase):
for i in range(seconds_for):
time.sleep(1)
bid = swap_client.getBid(bid_id)
if bid.state >= state:
if bid and bid.state >= state:
return
raise ValueError('wait_for_bid_state timed out.')
@ -558,6 +561,51 @@ class Test(unittest.TestCase):
self.wait_for_bid_state(swap_clients[0], bid_id, BidStates.BID_ERROR, seconds_for=60)
def test_08_part_ltc_buyer_first(self):
logging.info('---------- Test PART to LTC, buyer first')
swap_clients = self.swap_clients
offer_id = swap_clients[0].postOffer(Coins.PART, Coins.LTC, 100 * COIN, 0.1 * COIN, 100 * COIN, SwapTypes.BUYER_FIRST)
return # TODO
self.wait_for_offer(swap_clients[1], offer_id)
offers = swap_clients[1].listOffers()
assert(len(offers) == 1)
for offer in offers:
if offer.offer_id == offer_id:
bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)
self.wait_for_bid(swap_clients[0], bid_id)
swap_clients[0].acceptBid(bid_id)
self.wait_for_in_progress(swap_clients[1], bid_id, sent=True)
self.wait_for_bid_state(swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, seconds_for=60)
self.wait_for_bid_state(swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, sent=True, seconds_for=60)
js_0 = json.loads(urlopen('http://localhost:1800/json').read())
js_1 = json.loads(urlopen('http://localhost:1801/json').read())
assert(js_0['num_swapping'] == 0 and js_0['num_watched_outputs'] == 0)
assert(js_1['num_swapping'] == 0 and js_1['num_watched_outputs'] == 0)
def test_09_part_ltc_auto_accept(self):
logging.info('---------- Test PART to LTC, auto aceept bid')
swap_clients = self.swap_clients
offer_id = swap_clients[0].postOffer(Coins.PART, Coins.LTC, 100 * COIN, 0.1 * COIN, 100 * COIN, SwapTypes.SELLER_FIRST, auto_accept_bids=True)
self.wait_for_offer(swap_clients[1], offer_id)
offers = swap_clients[1].listOffers()
assert(len(offers) == 1)
for offer in offers:
if offer.offer_id == offer_id:
bid_id = swap_clients[1].postBid(offer_id, offer.amount_from)
self.wait_for_bid_state(swap_clients[0], bid_id, BidStates.SWAP_COMPLETED, seconds_for=60)
self.wait_for_bid_state(swap_clients[1], bid_id, BidStates.SWAP_COMPLETED, sent=True, seconds_for=60)
def pass_99_delay(self):
global stop_test
logging.info('Delay')