From 5cd8286aa3b7e522152dc3ef453643ef788726dc Mon Sep 17 00:00:00 2001 From: tecnovert Date: Tue, 15 Dec 2020 20:00:44 +0200 Subject: [PATCH] Start on network. --- basicswap/basicswap.py | 26 +- basicswap/network.py | 552 ++++++++++++++++++++++++++++++-- basicswap/rfc6979.py | 60 ++++ requirements.txt | 1 + tests/basicswap/test_network.py | 8 +- tests/basicswap/test_nmc.py | 4 +- tests/basicswap/test_run.py | 4 +- tests/basicswap/test_xmr.py | 2 +- 8 files changed, 626 insertions(+), 31 deletions(-) create mode 100644 basicswap/rfc6979.py diff --git a/basicswap/basicswap.py b/basicswap/basicswap.py index cfd5249..afec760 100644 --- a/basicswap/basicswap.py +++ b/basicswap/basicswap.py @@ -10,11 +10,12 @@ import zmq import json import time import base64 -import shutil import random +import shutil +import struct +import hashlib import logging import secrets -import hashlib import datetime as dt import traceback import sqlalchemy as sa @@ -387,8 +388,9 @@ def replaceAddrPrefix(addr, coin_type, chain_name, addr_type='pubkey_address'): return encodeAddress(bytes((chainparams[coin_type][chain_name][addr_type],)) + decodeAddress(addr)[1:]) -class WatchedOutput(): - # Watch for spends +class WatchedOutput(): # Watch for spends + __slots__ = ('bid_id', 'txid_hex', 'vout', 'tx_type', 'swap_type') + def __init__(self, bid_id, txid_hex, vout, tx_type, swap_type): self.bid_id = bid_id self.txid_hex = txid_hex @@ -398,6 +400,7 @@ class WatchedOutput(): class WatchedTransaction(): + # TODO # Watch for presence in mempool (getrawtransaction) def __init__(self, bid_id, txid_hex, tx_type, swap_type): self.bid_id = bid_id @@ -410,6 +413,9 @@ class BasicSwap(BaseApp): def __init__(self, fp, data_dir, settings, chain, log_name='BasicSwap'): super().__init__(fp, data_dir, settings, chain, log_name) + v = __version__.split('.') + self._version = struct.pack('>HHH', int(v[0]), int(v[1]), int(v[2])) + self.check_progress_seconds = self.settings.get('check_progress_seconds', 60) self.check_watched_seconds = self.settings.get('check_watched_seconds', 60) self.check_expired_seconds = self.settings.get('check_expired_seconds', 60 * 5) @@ -509,10 +515,14 @@ class BasicSwap(BaseApp): random.seed(secrets.randbits(128)) def finalise(self): + self.log.info('Finalise') + + with self.mxDB: + self.is_running = False + if self._network: self._network.stopNetwork() self._network = None - self.log.info('Finalise') def setCoinConnectParams(self, coin): # Set anything that does not require the daemon to be running @@ -662,7 +672,7 @@ class BasicSwap(BaseApp): if 'p2p_host' in self.settings: network_key = self.getNetworkKey(1) - self._network = bsn.Network(self.settings['p2p_host'], self.settings['p2p_port'], network_key) + self._network = bsn.Network(self.settings['p2p_host'], self.settings['p2p_port'], network_key, self) self._network.startNetwork() self.initialise() @@ -4740,3 +4750,7 @@ class BasicSwap(BaseApp): passed = self.callcoinrpc(Coins.PART, 'verifymessage', [offer_addr_from, signature_enc, offer_id.hex() + '_revoke']) return True if passed is True else False # _possibly_revoked_offers should not contain duplicates return False + + def add_connection(self, host, port, peer_pubkey): + self.log.info('add_connection %s %d %s', host, port, peer_pubkey.hex()) + self._network.add_connection(host, port, peer_pubkey) diff --git a/basicswap/network.py b/basicswap/network.py index 00f5fce..bd2dcdb 100644 --- a/basicswap/network.py +++ b/basicswap/network.py @@ -6,53 +6,569 @@ # file LICENSE or http://www.opensource.org/licenses/mit-license.php. ''' -TODO: + Message 2 bytes msg_class, 4 bytes length, [ 2 bytes msg_type, payload ] + + Handshake procedure: + node0 connecting to node1 + node0 send_handshake + node1 process_handshake + node1 send_ping - With a version field + node0 recv_ping + Both nodes are initialised ''' +import time +import queue +import random import select import socket +import struct +import hashlib import logging +import secrets import threading +import traceback + +from enum import IntEnum, auto +from collections import OrderedDict +from Crypto.Cipher import ChaCha20_Poly1305 # TODO: Add to libsecp256k1/coincurve fork +from coincurve.keys import PrivateKey, PublicKey +from basicswap.rfc6979 import ( + rfc6979_hmac_sha256_initialize, + rfc6979_hmac_sha256_generate) +START_TOKEN = 0xabcd +MSG_START_TOKEN = struct.pack('>H', START_TOKEN) + +MSG_MAX_SIZE = 0x200000 # 2MB + +MSG_HEADER_LEN = 8 + + +MAX_SEEN_EPHEM_KEYS = 1000 +TIMESTAMP_LEEWAY = 8 + + +class NetMessageTypes(IntEnum): + HANDSHAKE = auto() + PING = auto() + PONG = auto() + DATA = auto() + + @classmethod + def has_value(cls, value): + return value in cls._value2member_map_ + + +''' class NetMessage: def __init__(self): - self._msg_type + self._msg_class = None # 2 bytes + self._len = None # 4 bytes + self._msg_type = None # 2 bytes +''' + + +# Ensure handshake keys are not reused by including the time in the msg, mac and key hash +# Verify timestamp is not too old +# Add keys to db to catch concurrent attempts, records can be cleared periodically, the timestamp should catch older replay attempts +class MsgHandshake: + __slots__ = ('_timestamp', '_ephem_pk', '_ct', '_mac') + + def __init__(self): + pass + + def encode_aad(self): # Additional Authenticated Data + return struct.pack('>H', NetMessageTypes.HANDSHAKE) + \ + struct.pack('>Q', self._timestamp) + \ + self._ephem_pk + + def encode(self): + return self.encode_aad() + self._ct + self._mac + + def decode(self, msg_mv): + o = 2 + self._timestamp = struct.unpack('>Q', msg_mv[o: o + 8])[0] + o += 8 + self._ephem_pk = bytes(msg_mv[o: o + 33]) + o += 33 + self._ct = bytes(msg_mv[o: -16]) + self._mac = bytes(msg_mv[-16:]) class Peer: - def __init__(self, address): + __slots__ = ( + '_mx', '_pubkey', '_address', '_socket', '_version', '_ready', + '_connected_at', '_last_received_at', '_bytes_sent', '_bytes_received', + '_receiving_length', '_receiving_buffer', '_recv_messages', '_misbehaving_score', + '_ke', '_km', '_dir', '_sent_nonce', '_recv_nonce', '_last_handshake_at', + '_ping_nonce', '_last_ping_at', '_last_ping_rtt') + + def __init__(self, address, socket, pubkey): + self._mx = threading.Lock() + self._pubkey = pubkey self._address = address + self._socket = socket + self._version = None + self._ready = False # True When handshake is complete + self._connected_at = time.time() + self._last_received_at = 0 + self._last_handshake_at = 0 + + self._bytes_sent = 0 + self._bytes_received = 0 + + self._receiving_length = 0 + self._receiving_buffer = None + self._recv_messages = queue.Queue() # Built in mutex + self._misbehaving_score = 0 + + self._ping_nonce = 0 + self._last_ping_at = 0 # ms + self._last_ping_rtt = 0 # ms + + def close(self): + self._socket.close() + + +def listen_thread(cls): + timeout = 1.0 + + max_bytes = 0x10000 + while cls._running: + # logging.info('[rm] network loop %d', cls._running) + readable, writable, errored = select.select(cls._read_sockets, cls._write_sockets, cls._error_sockets, timeout) + cls._mx.acquire() + try: + disconnected_peers = [] + for s in readable: + if s == cls._socket: + peer_socket, address = cls._socket.accept() + logging.info('Connection from %s', address) + cls._peers.append(Peer(address, peer_socket, None)) + cls._error_sockets.append(peer_socket) + cls._read_sockets.append(peer_socket) + else: + for peer in cls._peers: + if peer._socket == s: + try: + bytes_recv = s.recv(max_bytes, socket.MSG_DONTWAIT) + except socket.error as se: + if se.args[0] not in (socket.EWOULDBLOCK, ): + logging.error('Receive error %s', str(se)) + disconnected_peers.append(peer) + continue + except Exception as e: + logging.error('Receive error %s', str(e)) + disconnected_peers.append(peer) + continue + + if len(bytes_recv) < 1: + disconnected_peers.append(peer) + continue + cls.receive_bytes(peer, bytes_recv) + + for s in errored: + logging.warning('Socket error') + + for peer in disconnected_peers: + cls.disconnect(peer) + finally: + cls._mx.release() + + +def msg_thread(cls): + timeout = 0.1 + while cls._running: + processed = False + + for peer in cls._peers: + try: + now_us = time.time_ns() // 1000 + if peer._ready is True: + if now_us - peer._last_ping_at >= 5000000: # 5 seconds TODO: Make variable + cls.send_ping(peer) + msg = peer._recv_messages.get(False) + cls.process_message(peer, msg) + processed = True + except queue.Empty: + pass + except Exception as e: + logging.warning('process message error %s', str(e)) + if cls._sc.debug: + traceback.print_exc() + + if processed is False: + time.sleep(timeout) class Network: - def __init__(self, p2p_host, p2p_port, network_key): + __slots__ = ( + '_p2p_host', '_p2p_port', '_network_key', '_network_pubkey', + '_sc', '_peers', '_max_connections', '_running', '_network_thread', '_msg_thread', + '_mx', '_socket', '_read_sockets', '_write_sockets', '_error_sockets', '_csprng', '_seen_ephem_keys') + + def __init__(self, p2p_host, p2p_port, network_key, swap_client): self._p2p_host = p2p_host self._p2p_port = p2p_port self._network_key = network_key + self._network_pubkey = PublicKey.from_secret(network_key).format() + self._sc = swap_client self._peers = [] self._max_connections = 10 - self._running = True + self._running = False self._network_thread = None + self._msg_thread = None self._mx = threading.Lock() + self._socket = None + self._read_sockets = [] + self._write_sockets = [] + self._error_sockets = [] # Check for error events + self._seen_ephem_keys = OrderedDict() def startNetwork(self): - pass + self._mx.acquire() + try: + self._csprng = rfc6979_hmac_sha256_initialize(secrets.token_bytes(32)) + + self._running = True + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._socket.bind((self._p2p_host, self._p2p_port)) + self._socket.listen(self._max_connections) + self._read_sockets.append(self._socket) + + self._network_thread = threading.Thread(target=listen_thread, args=(self,)) + self._network_thread.start() + + self._msg_thread = threading.Thread(target=msg_thread, args=(self,)) + self._msg_thread.start() + finally: + self._mx.release() def stopNetwork(self): - pass + self._mx.acquire() + try: + self._running = False + finally: + self._mx.release() - def listen(self): - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self._socket.bind((self._p2p_host, self._p2p_port)) - self._socket.listen(self._max_connections) + if self._network_thread: + self._network_thread.join() + if self._msg_thread: + self._msg_thread.join() - timeout = 1.0 - while self._running: - readable, writable, errored = select.select([self._socket], [], [], timeout) - for s in readable: - client_socket, address = self._socket.accept() - logging.info('Connection from %s', address) + self._mx.acquire() + try: + if self._socket: + self._socket.close() + + for peer in self._peers: + peer.close() + finally: + self._mx.release() + + def add_connection(self, host, port, peer_pubkey): + self._sc.log.info('Connecting from %s to %s at %s %d', self._network_pubkey.hex(), peer_pubkey.hex(), host, port) + self._mx.acquire() + try: + address = (host, port) + peer_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + peer_socket.connect(address) + peer = Peer(address, peer_socket, peer_pubkey) + self._peers.append(peer) + self._error_sockets.append(peer_socket) + self._read_sockets.append(peer_socket) + finally: + self._mx.release() + + self.send_handshake(peer) + + def disconnect(self, peer): + self._sc.log.info('Closing peer socket %s', peer._address) + self._read_sockets.pop(self._read_sockets.index(peer._socket)) + self._error_sockets.pop(self._error_sockets.index(peer._socket)) + peer.close() + self._peers.pop(self._peers.index(peer)) + + def check_handshake_ephem_key(self, peer, timestamp, ephem_pk, direction=1): + # assert ._mx.acquire() ? + + used = self._seen_ephem_keys.get(ephem_pk) + if used: + raise ValueError('Handshake ephem_pk reused %s peer %s', 'for' if direction == 1 else 'by', used[0]) + + self._seen_ephem_keys[ephem_pk] = (peer._address, timestamp) + + while len(self._seen_ephem_keys) > MAX_SEEN_EPHEM_KEYS: + self._seen_ephem_keys.popitem(last=False) + + def send_handshake(self, peer): + self._sc.log.debug('send_handshake %s', peer._address) + peer._mx.acquire() + try: + # TODO: Drain peer._recv_messages + if not peer._recv_messages.empty(): + self._sc.log.warning('send_handshake %s - Receive queue dumped.', peer._address) + while not peer._recv_messages.empty(): + peer._recv_messages.get(False) + + msg = MsgHandshake() + + msg._timestamp = int(time.time()) + key_r = rfc6979_hmac_sha256_generate(self._csprng, 32) + k = PrivateKey(key_r) + msg._ephem_pk = PublicKey.from_secret(key_r).format() + self.check_handshake_ephem_key(peer, msg._timestamp, msg._ephem_pk) + + ss = k.ecdh(peer._pubkey) + + hashed = hashlib.sha512(ss + struct.pack('>Q', msg._timestamp)).digest() + peer._ke = hashed[:32] + peer._km = hashed[32:] + + nonce = peer._km[24:] + + payload = self._sc._version + + nk = PrivateKey(self._network_key) + sig = nk.sign_recoverable(peer._km) + payload += sig + + aad = msg.encode_aad() + aad += nonce + cipher = ChaCha20_Poly1305.new(key=peer._ke, nonce=nonce) + cipher.update(aad) + msg._ct, msg._mac = cipher.encrypt_and_digest(payload) + + peer._sent_nonce = hashlib.sha256(nonce + msg._mac).digest() + peer._recv_nonce = hashlib.sha256(peer._km).digest() # Init nonce + + peer._last_handshake_at = msg._timestamp + peer._ready = False # Wait for peer to complete handshake + + self.send_msg(peer, msg) + finally: + peer._mx.release() + + def process_handshake(self, peer, msg_mv): + self._sc.log.debug('process_handshake %s', peer._address) + + # TODO: Drain peer._recv_messages + if not peer._recv_messages.empty(): + self._sc.log.warning('process_handshake %s - Receive queue dumped.', peer._address) + while not peer._recv_messages.empty(): + peer._recv_messages.get(False) + + msg = MsgHandshake() + msg.decode(msg_mv) + + try: + now = int(time.time()) + if now - peer._last_handshake_at < 30: + raise ValueError('Too many handshakes from peer %s', peer._address) + + if abs(msg._timestamp - now) > TIMESTAMP_LEEWAY: + raise ValueError('Bad handshake timestamp from peer %s', peer._address) + + self.check_handshake_ephem_key(peer, msg._timestamp, msg._ephem_pk, direction=2) + + nk = PrivateKey(self._network_key) + ss = nk.ecdh(msg._ephem_pk) + + hashed = hashlib.sha512(ss + struct.pack('>Q', msg._timestamp)).digest() + peer._ke = hashed[:32] + peer._km = hashed[32:] + + nonce = peer._km[24:] + + aad = msg.encode_aad() + aad += nonce + cipher = ChaCha20_Poly1305.new(key=peer._ke, nonce=nonce) + cipher.update(aad) + plaintext = cipher.decrypt_and_verify(msg._ct, msg._mac) # Will raise error if mac doesn't match + + peer._version = plaintext[:6] + sig = plaintext[6:] + + pk_peer = PublicKey.from_signature_and_message(sig, peer._km) + # TODO: Should pk_peer be linked to public data? + + peer._pubkey = pk_peer.format() + peer._recv_nonce = hashlib.sha256(nonce + msg._mac).digest() + peer._sent_nonce = hashlib.sha256(peer._km).digest() # Init nonce + + peer._last_handshake_at = msg._timestamp + peer._ready = True + # Schedule a ping to complete the handshake, TODO: Send here? + peer._last_ping_at = 0 + + except Exception as e: + # TODO: misbehaving + self._sc.log.debug('[rm] process_handshake %s', str(e)) + + def process_ping(self, peer, msg_mv): + nonce = peer._recv_nonce[:24] + + cipher = ChaCha20_Poly1305.new(key=peer._ke, nonce=nonce) + cipher.update(msg_mv[0: 2]) + cipher.update(nonce) + + mac = msg_mv[-16:] + plaintext = cipher.decrypt_and_verify(msg_mv[2: -16], mac) + + ping_nonce = struct.unpack('>I', plaintext[:4])[0] + # Version is added to a ping following a handshake message + if len(plaintext) >= 10: + peer._ready = True + version = plaintext[4: 10] + if peer._version is None: + peer._version = version + self._sc.log.debug('Set version from ping %s, %s', peer._pubkey.hex(), peer._version.hex()) + + peer._recv_nonce = hashlib.sha256(nonce + mac).digest() + + self.send_pong(peer, ping_nonce) + + def process_pong(self, peer, msg_mv): + nonce = peer._recv_nonce[:24] + + cipher = ChaCha20_Poly1305.new(key=peer._ke, nonce=nonce) + cipher.update(msg_mv[0: 2]) + cipher.update(nonce) + + mac = msg_mv[-16:] + plaintext = cipher.decrypt_and_verify(msg_mv[2: -16], mac) + + pong_nonce = struct.unpack('>I', plaintext[:4])[0] + + if pong_nonce == peer._ping_nonce: + peer._last_ping_rtt = (time.time_ns() // 1000) - peer._last_ping_at + else: + self._sc.log.debug('Pong received out of order %s', peer._address) + + peer._recv_nonce = hashlib.sha256(nonce + mac).digest() + + def send_ping(self, peer): + ping_nonce = random.getrandbits(32) + + msg_bytes = struct.pack('>H', NetMessageTypes.PING) + nonce = peer._sent_nonce[:24] + + cipher = ChaCha20_Poly1305.new(key=peer._ke, nonce=nonce) + cipher.update(msg_bytes) + cipher.update(nonce) + + payload = struct.pack('>I', ping_nonce) + if peer._last_ping_at == 0: + payload += self._sc._version + ct, mac = cipher.encrypt_and_digest(payload) + + msg_bytes += ct + mac + + peer._sent_nonce = hashlib.sha256(nonce + mac).digest() + + peer._last_ping_at = time.time_ns() // 1000 + peer._ping_nonce = ping_nonce + + self.send_msg(peer, msg_bytes) + + def send_pong(self, peer, ping_nonce): + msg_bytes = struct.pack('>H', NetMessageTypes.PONG) + nonce = peer._sent_nonce[:24] + + cipher = ChaCha20_Poly1305.new(key=peer._ke, nonce=nonce) + cipher.update(msg_bytes) + cipher.update(nonce) + + payload = struct.pack('>I', ping_nonce) + ct, mac = cipher.encrypt_and_digest(payload) + msg_bytes += ct + mac + + peer._sent_nonce = hashlib.sha256(nonce + mac).digest() + + self.send_msg(peer, msg_bytes) + + def send_msg(self, peer, msg): + msg_encoded = msg if isinstance(msg, bytes) else msg.encode() + len_encoded = len(msg_encoded) + + msg_packed = bytearray(MSG_START_TOKEN) + struct.pack('>I', len_encoded) + msg_encoded + peer._socket.sendall(msg_packed) + + peer._bytes_sent += len_encoded + + def process_message(self, peer, msg_bytes): + logging.info('[rm] process_message %s len %d', peer._address, len(msg_bytes)) + + peer._mx.acquire() + try: + mv = memoryview(msg_bytes) + o = 0 + msg_type = struct.unpack('>H', mv[o: o + 2])[0] + if msg_type == NetMessageTypes.HANDSHAKE: + self.process_handshake(peer, mv) + elif msg_type == NetMessageTypes.PING: + self.process_ping(peer, mv) + elif msg_type == NetMessageTypes.PONG: + self.process_pong(peer, mv) + else: + self._sc.log.debug('Unknown message type %d', msg_type) + finally: + peer._mx.release() + + def receive_bytes(self, peer, bytes_recv): + # logging.info('[rm] receive_bytes %s %s', peer._address, bytes_recv) + + len_received = len(bytes_recv) + peer._last_received_at = time.time() + peer._bytes_received += len_received + + invalid_msg = False + mv = memoryview(bytes_recv) + + o = 0 + try: + while o < len_received: + if peer._receiving_length == 0: + if len(bytes_recv) < MSG_HEADER_LEN: + raise ValueError('Msg too short') + + if mv[o: o + 2] != MSG_START_TOKEN: + raise ValueError('Invalid start token') + o += 2 + + msg_len = struct.unpack('>I', mv[o: o + 4])[0] + o += 4 + if msg_len < 2 or msg_len > MSG_MAX_SIZE: + raise ValueError('Invalid data length') + + # Precheck msg_type + msg_type = struct.unpack('>H', mv[o: o + 2])[0] + # o += 2 # Don't inc offset, msg includes type + if not NetMessageTypes.has_value(msg_type): + raise ValueError('Invalid msg type') + + peer._receiving_length = msg_len + len_pkt = (len_received - o) + nc = msg_len if len_pkt > msg_len else len_pkt + + peer._receiving_buffer = mv[o: o + nc] + o += nc + else: + len_to_go = peer._receiving_length - len(peer._receiving_buffer) + len_pkt = (len_received - o) + nc = len_to_go if len_pkt > len_to_go else len_pkt + peer._receiving_buffer = mv[o: o + nc] + o += nc + if len(peer._receiving_buffer) == peer._receiving_length: + peer._recv_messages.put(peer._receiving_buffer) + peer._receiving_length = 0 + + except Exception as e: + if self._sc.debug: + self._sc.log.error('Invalid message received from %s %s', peer._address, str(e)) + # TODO: misbehaving diff --git a/basicswap/rfc6979.py b/basicswap/rfc6979.py new file mode 100644 index 0000000..c9a2316 --- /dev/null +++ b/basicswap/rfc6979.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import hmac +import hashlib + + +zero = bytes((0,)) +one = bytes((1,)) + + +def rfc6979_hmac_sha256_initialize(key): + rng_v = one * 32 # RFC6979 3.2.b. + rng_k = zero * 32 # RFC6979 3.2.c. + + # RFC6979 3.2.d. + h = hmac.new(rng_k, digestmod=hashlib.sha256) + h.update(rng_v) + h.update(zero) + h.update(key) + rng_k = h.digest() + + h = hmac.new(rng_k, digestmod=hashlib.sha256) + h.update(rng_v) + rng_v = h.digest() + + # RFC6979 3.2.f. + h = hmac.new(rng_k, digestmod=hashlib.sha256) + h.update(rng_v) + h.update(one) + h.update(key) + rng_k = h.digest() + h = hmac.new(rng_k, digestmod=hashlib.sha256) + h.update(rng_v) + rng_v = h.digest() + + return [rng_k, rng_v, False] + + +def rfc6979_hmac_sha256_generate(rng, n): + if rng[2]: # Retry + h = hmac.new(rng[0], digestmod=hashlib.sha256) + h.update(rng[1]) + h.update(zero) + rng[0] = h.digest() + h = hmac.new(rng[0], digestmod=hashlib.sha256) + h.update(rng[1]) + rng[1] = h.digest() + + out = bytes() + while n > 0: + i = n if n < 32 else 32 + h = hmac.new(rng[0], digestmod=hashlib.sha256) + h.update(rng[1]) + rng[1] = h.digest() + out += rng[1][:i] + n -= i + + rng[2] = True + return out diff --git a/requirements.txt b/requirements.txt index 5834b9a..fef3f16 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ sqlalchemy python-gnupg Jinja2 requests +pycryptodome diff --git a/tests/basicswap/test_network.py b/tests/basicswap/test_network.py index 27c6518..5c40dad 100644 --- a/tests/basicswap/test_network.py +++ b/tests/basicswap/test_network.py @@ -43,6 +43,7 @@ from tests.basicswap.common import ( checkForks, stopDaemons, wait_for_offer, + delay_for, TEST_HTTP_HOST, TEST_HTTP_PORT, BASE_P2P_PORT, @@ -142,13 +143,12 @@ def callnoderpc(node_id, method, params=[], wallet=None, base_rpc_port=BASE_RPC_ def run_coins_loop(cls): while not stop_test: - time.sleep(1.0) try: if cls.btc_addr is not None: btcRpc('generatetoaddress 1 {}'.format(cls.btc_addr)) except Exception as e: logging.warning('run_coins_loop ' + str(e)) - + time.sleep(1.0) def run_loop(cls): @@ -303,6 +303,10 @@ class Test(unittest.TestCase): offer_id = swap_clients[0].postOffer(Coins.PART, Coins.BTC, 100 * COIN, 0.1 * COIN, 100 * COIN, SwapTypes.SELLER_FIRST) wait_for_offer(delay_event, swap_clients[1], offer_id) + swap_clients[1].add_connection('127.0.0.1', BASE_P2P_PORT + 0, swap_clients[0]._network._network_pubkey) + + delay_for(delay_event, 1000) + if __name__ == '__main__': unittest.main() diff --git a/tests/basicswap/test_nmc.py b/tests/basicswap/test_nmc.py index a5589a1..e5c3394 100644 --- a/tests/basicswap/test_nmc.py +++ b/tests/basicswap/test_nmc.py @@ -206,19 +206,19 @@ def signal_handler(sig, frame): def run_coins_loop(cls): while not stop_test: - time.sleep(1.0) try: nmcRpc('generatetoaddress 1 {}'.format(cls.nmc_addr)) btcRpc('generatetoaddress 1 {}'.format(cls.btc_addr)) except Exception as e: logging.warning('run_coins_loop ' + str(e)) + time.sleep(1.0) def run_loop(self): while not stop_test: - time.sleep(1) for c in self.swap_clients: c.update() + time.sleep(1) def make_part_cli_rpc_func(node_id): diff --git a/tests/basicswap/test_run.py b/tests/basicswap/test_run.py index 8e9048d..f202e9f 100644 --- a/tests/basicswap/test_run.py +++ b/tests/basicswap/test_run.py @@ -215,18 +215,18 @@ def signal_handler(sig, frame): def run_coins_loop(cls): while not stop_test: try: - time.sleep(1.0) ltcRpc('generatetoaddress 1 {}'.format(cls.ltc_addr)) btcRpc('generatetoaddress 1 {}'.format(cls.btc_addr)) except Exception as e: logging.warning('run_coins_loop ' + str(e)) + time.sleep(1.0) def run_loop(cls): while not stop_test: - time.sleep(1) for c in cls.swap_clients: c.update() + time.sleep(1) def make_part_cli_rpc_func(node_id): diff --git a/tests/basicswap/test_xmr.py b/tests/basicswap/test_xmr.py index f6464e6..5a2797c 100644 --- a/tests/basicswap/test_xmr.py +++ b/tests/basicswap/test_xmr.py @@ -238,7 +238,6 @@ def callnoderpc(node_id, method, params=[], wallet=None, base_rpc_port=BASE_RPC_ def run_coins_loop(cls): while not stop_test: - time.sleep(1.0) try: if cls.btc_addr is not None: btcRpc('generatetoaddress 1 {}'.format(cls.btc_addr)) @@ -246,6 +245,7 @@ def run_coins_loop(cls): callrpc_xmr_na(XMR_BASE_RPC_PORT + 1, 'generateblocks', {'wallet_address': cls.xmr_addr, 'amount_of_blocks': 1}) except Exception as e: logging.warning('run_coins_loop ' + str(e)) + time.sleep(1.0) def run_loop(cls):