From eb9eb49bd9f07841dbbd43927a5f1621504f78fc Mon Sep 17 00:00:00 2001 From: tecnovert Date: Thu, 13 Oct 2022 22:21:43 +0200 Subject: [PATCH] Persistent notifications. --- basicswap/__init__.py | 2 +- basicswap/basicswap.py | 135 +++++++++++++++++++------------- basicswap/db.py | 12 ++- basicswap/db_upgrades.py | 10 +++ basicswap/http_server.py | 3 + basicswap/js_server.py | 50 ++++++++---- basicswap/templates/header.html | 3 + basicswap/util/__init__.py | 1 + 8 files changed, 145 insertions(+), 71 deletions(-) diff --git a/basicswap/__init__.py b/basicswap/__init__.py index 92b8429..94d2ad8 100644 --- a/basicswap/__init__.py +++ b/basicswap/__init__.py @@ -1,3 +1,3 @@ name = "basicswap" -__version__ = "0.11.38" +__version__ = "0.11.39" diff --git a/basicswap/basicswap.py b/basicswap/basicswap.py index 779975b..c847f3d 100644 --- a/basicswap/basicswap.py +++ b/basicswap/basicswap.py @@ -96,6 +96,7 @@ from .db import ( XmrSwap, XmrSplitData, Wallets, + Notification, KnownIdentity, AutomationLink, AutomationStrategy, @@ -235,6 +236,12 @@ class BasicSwap(BaseApp): self._updating_wallets_info = {} self._last_updated_wallets_info = 0 + self._notifications_enabled = self.settings.get('notifications_enabled', True) + self._disabled_notification_types = self.settings.get('disabled_notification_types', []) + self._keep_notifications = self.settings.get('keep_notifications', 50) + self._show_notifications = self.settings.get('show_notifications', 10) + self._notifications_cache = {} + # TODO: Adjust ranges self.min_delay_event = self.settings.get('min_delay_event', 10) self.max_delay_event = self.settings.get('max_delay_event', 60) @@ -304,6 +311,7 @@ class BasicSwap(BaseApp): value=self._contract_count )) session.commit() + session.close() session.remove() @@ -364,6 +372,19 @@ class BasicSwap(BaseApp): close_all_sessions() self.engine.dispose() + def openSession(self, session=None): + if session: + return session + self.mxDB.acquire() + return scoped_session(self.session_factory) + + def closeSession(self, use_session, commit=True): + if commit: + use_session.commit() + use_session.close() + use_session.remove() + self.mxDB.release() + def setCoinConnectParams(self, coin): # Set anything that does not require the daemon to be running chain_client_settings = self.getChainClientSettings(coin) @@ -831,13 +852,8 @@ class BasicSwap(BaseApp): if ptx_state is not None and ptx_state != TxStates.TX_REFUNDED: self.returnAddressToPool(bid.bid_id, TxTypes.PTX_REFUND) - use_session = None try: - if session: - use_session = session - else: - self.mxDB.acquire() - use_session = scoped_session(self.session_factory) + use_session = self.openSession(session) # Remove any delayed events if self.debug: @@ -864,10 +880,7 @@ class BasicSwap(BaseApp): finally: if session is None: - use_session.commit() - use_session.close() - use_session.remove() - self.mxDB.release() + self.closeSession(use_session) def loadFromDB(self): self.log.info('Loading data from db') @@ -890,6 +903,7 @@ class BasicSwap(BaseApp): self.log.error('Further error deactivating: %s', str(ex)) if self.debug: self.log.error(traceback.format_exc()) + self.buildNotificationsCache(session) finally: session.close() session.remove() @@ -957,25 +971,67 @@ class BasicSwap(BaseApp): if coin_from == Coins.PIVX and swap_type == SwapTypes.XMR_SWAP: raise ValueError('TODO: PIVX -> XMR') - def notify(self, event_type, event_data): + def notify(self, event_type, event_data, session=None): + + show_event = event_type not in self._disabled_notification_types if event_type == NT.OFFER_RECEIVED: self.log.debug('Received new offer %s', event_data['offer_id']) - if self.ws_server: + if self.ws_server and show_event: event_data['event'] = 'new_offer' self.ws_server.send_message_to_all(json.dumps(event_data)) elif event_type == NT.BID_RECEIVED: self.log.info('Received valid bid %s for %s offer %s', event_data['bid_id'], event_data['type'], event_data['offer_id']) - if self.ws_server: + if self.ws_server and show_event: event_data['event'] = 'new_bid' self.ws_server.send_message_to_all(json.dumps(event_data)) elif event_type == NT.BID_ACCEPTED: self.log.info('Received valid bid accept for %s', event_data['bid_id']) - if self.ws_server: + if self.ws_server and show_event: event_data['event'] = 'bid_accepted' self.ws_server.send_message_to_all(json.dumps(event_data)) else: self.log.warning(f'Unknown notification {event_type}') + try: + now = int(time.time()) + use_session = self.openSession(session) + use_session.add(Notification( + active_ind=1, + created_at=now, + event_type=int(event_type), + event_data=bytes(json.dumps(event_data), 'UTF-8'), + )) + + use_session.execute(f'DELETE FROM notifications WHERE record_id NOT IN (SELECT record_id FROM notifications WHERE active_ind=1 ORDER BY created_at ASC LIMIT {self._keep_notifications})') + + if show_event: + self._notifications_cache[now] = (event_type, event_data) + while len(self._notifications_cache) > self._show_notifications: + # dicts preserve insertion order in Python 3.7+ + self._notifications_cache.pop(next(iter(self._notifications_cache))) + + finally: + if session is None: + self.closeSession(use_session) + + def buildNotificationsCache(self, session): + q = session.execute(f'SELECT created_at, event_type, event_data FROM notifications WHERE active_ind=1 ORDER BY created_at ASC LIMIT {self._show_notifications}') + for entry in q: + self._notifications_cache[entry[0]] = (entry[1], json.loads(entry[2].decode('UTF-8'))) + + def getNotifications(self): + rv = [] + for k, v in self._notifications_cache.items(): + rv.append((k, int(v[0]), json.dumps(v[1]))) + return rv + + def vacuumDB(self): + try: + session = self.openSession() + return session.execute('VACUUM') + finally: + self.closeSession(session) + def validateOfferAmounts(self, coin_from, coin_to, amount, rate, min_bid_amount): ci_from = self.ci(coin_from) ci_to = self.ci(coin_to) @@ -1846,30 +1902,19 @@ class BasicSwap(BaseApp): self.mxDB.release() def getBid(self, bid_id, session=None): - use_session = None try: - if session: - use_session = session - else: - self.mxDB.acquire() - use_session = scoped_session(self.session_factory) + use_session = self.openSession(session) bid = use_session.query(Bid).filter_by(bid_id=bid_id).first() if bid: self.loadBidTxns(bid, use_session) return bid finally: if session is None: - use_session.close() - use_session.remove() - self.mxDB.release() + self.closeSession(use_session, commit=False) def getBidAndOffer(self, bid_id, session=None): try: - if session: - use_session = session - else: - self.mxDB.acquire() - use_session = scoped_session(self.session_factory) + use_session = self.openSession(session) bid = use_session.query(Bid).filter_by(bid_id=bid_id).first() offer = None if bid: @@ -1878,9 +1923,7 @@ class BasicSwap(BaseApp): return bid, offer finally: if session is None: - use_session.close() - use_session.remove() - self.mxDB.release() + self.closeSession(use_session, commit=False) def getXmrBidAndOffer(self, bid_id, list_events=True): self.mxDB.acquire() @@ -3798,7 +3841,7 @@ class BasicSwap(BaseApp): session.add(xmr_offer) - self.notify(NT.OFFER_RECEIVED, {'offer_id': offer_id.hex()}) + self.notify(NT.OFFER_RECEIVED, {'offer_id': offer_id.hex()}, session) else: existing_offer.setState(OfferStates.OFFER_RECEIVED) session.add(existing_offer) @@ -3868,13 +3911,8 @@ class BasicSwap(BaseApp): return bids, total_value def shouldAutoAcceptBid(self, offer, bid, session=None): - use_session = None try: - if session: - use_session = session - else: - self.mxDB.acquire() - use_session = scoped_session(self.session_factory) + use_session = self.openSession(session) link = use_session.query(AutomationLink).filter_by(active_ind=1, linked_type=Concepts.OFFER, linked_id=offer.offer_id).first() if not link: @@ -3943,10 +3981,7 @@ class BasicSwap(BaseApp): return False finally: if session is None: - use_session.commit() - use_session.close() - use_session.remove() - self.mxDB.release() + self.closeSession(use_session) def processBid(self, msg): self.log.debug('Processing bid msg %s', msg['msgid']) @@ -4155,7 +4190,7 @@ class BasicSwap(BaseApp): ensure(ci_to.verifyKey(xmr_swap.vkbvf), 'Invalid key, vkbvf') ensure(ci_from.verifyPubkey(xmr_swap.pkaf), 'Invalid pubkey, pkaf') - self.notify(NT.BID_RECEIVED, {'type': 'xmr', 'bid_id': bid.bid_id.hex(), 'offer_id': bid.offer_id.hex()}) + self.notify(NT.BID_RECEIVED, {'type': 'xmr', 'bid_id': bid.bid_id.hex(), 'offer_id': bid.offer_id.hex()}, session) bid.setState(BidStates.BID_RECEIVED) @@ -4217,7 +4252,7 @@ class BasicSwap(BaseApp): bid.setState(BidStates.BID_ACCEPTED) # XMR self.saveBidInSession(bid.bid_id, bid, session, xmr_swap) - self.notify(NT.BID_ACCEPTED, {'bid_id': bid.bid_id.hex()}) + self.notify(NT.BID_ACCEPTED, {'bid_id': bid.bid_id.hex()}, session) delay = random.randrange(self.min_delay_event, self.max_delay_event) self.log.info('Responding to xmr bid accept %s in %d seconds', bid.bid_id.hex(), delay) @@ -5734,13 +5769,8 @@ class BasicSwap(BaseApp): def newSMSGAddress(self, use_type=AddressTypes.RECV_OFFER, addressnote=None, session=None): now = int(time.time()) - use_session = None try: - if session: - use_session = session - else: - self.mxDB.acquire() - use_session = scoped_session(self.session_factory) + use_session = self.openSession(session) v = use_session.query(DBKVString).filter_by(key='smsg_chain_id').first() if not v: @@ -5780,10 +5810,7 @@ class BasicSwap(BaseApp): return new_addr, addr_info['pubkey'] finally: if session is None: - use_session.commit() - use_session.close() - use_session.remove() - self.mxDB.release() + self.closeSession(use_session) def addSMSGAddress(self, pubkey_hex, addressnote=None): self.mxDB.acquire() diff --git a/basicswap/db.py b/basicswap/db.py index f0ce860..6cd880f 100644 --- a/basicswap/db.py +++ b/basicswap/db.py @@ -12,7 +12,7 @@ from enum import IntEnum, auto from sqlalchemy.ext.declarative import declarative_base -CURRENT_DB_VERSION = 15 +CURRENT_DB_VERSION = 16 CURRENT_DB_DATA_VERSION = 2 Base = declarative_base() @@ -472,3 +472,13 @@ class BidState(Base): note = sa.Column(sa.String) created_at = sa.Column(sa.BigInteger) + + +class Notification(Base): + __tablename__ = 'notifications' + + record_id = sa.Column(sa.Integer, primary_key=True, autoincrement=True) + active_ind = sa.Column(sa.Integer) + created_at = sa.Column(sa.BigInteger) + event_type = sa.Column(sa.Integer) + event_data = sa.Column(sa.LargeBinary) diff --git a/basicswap/db_upgrades.py b/basicswap/db_upgrades.py index 18e16ba..7ab4ae8 100644 --- a/basicswap/db_upgrades.py +++ b/basicswap/db_upgrades.py @@ -215,6 +215,16 @@ def upgradeDatabase(self, db_version): db_version += 1 session.execute('ALTER TABLE xmr_swaps ADD COLUMN coin_a_lock_release_msg_id BLOB') session.execute('ALTER TABLE xmr_swaps RENAME COLUMN coin_a_lock_refund_spend_tx_msg_id TO coin_a_lock_spend_tx_msg_id') + elif current_version == 15: + db_version += 1 + session.execute(''' + CREATE TABLE notifications ( + record_id INTEGER NOT NULL, + active_ind INTEGER, + event_type INTEGER, + event_data BLOB, + created_at BIGINT, + PRIMARY KEY (record_id))''') if current_version != db_version: self.db_version = db_version diff --git a/basicswap/http_server.py b/basicswap/http_server.py index f56b1dc..800d239 100644 --- a/basicswap/http_server.py +++ b/basicswap/http_server.py @@ -121,6 +121,9 @@ class HttpHandler(BaseHTTPRequestHandler): if swap_client.debug: swap_client.log.error(traceback.format_exc()) + if swap_client._show_notifications: + args_dict['notifications'] = swap_client.getNotifications() + self.putHeaders(200, 'text/html') return bytes(template.render( title=self.server.title, diff --git a/basicswap/js_server.py b/basicswap/js_server.py index ef50e4a..c40d715 100644 --- a/basicswap/js_server.py +++ b/basicswap/js_server.py @@ -393,6 +393,39 @@ def js_index(self, url_split, post_string, is_json): return bytes(json.dumps(self.server.swap_client.getSummary()), 'UTF-8') +def js_generatenotification(self, url_split, post_string, is_json): + swap_client = self.server.swap_client + + if not swap_client.debug: + raise ValueError('Debug mode not active.') + + r = random.randint(0, 3) + if r == 0: + swap_client.notify(NT.OFFER_RECEIVED, {'offer_id': random.randbytes(28).hex()}) + elif r == 1: + swap_client.notify(NT.BID_RECEIVED, {'type': 'atomic', 'bid_id': random.randbytes(28).hex(), 'offer_id': random.randbytes(28).hex()}) + elif r == 2: + swap_client.notify(NT.BID_ACCEPTED, {'bid_id': random.randbytes(28).hex()}) + elif r == 3: + swap_client.notify(NT.BID_RECEIVED, {'type': 'xmr', 'bid_id': random.randbytes(28).hex(), 'offer_id': random.randbytes(28).hex()}) + + return bytes(json.dumps({'type': r}), 'UTF-8') + + +def js_notifications(self, url_split, post_string, is_json): + swap_client = self.server.swap_client + swap_client.getNotifications() + + return bytes(json.dumps(swap_client.getNotifications()), 'UTF-8') + + +def js_vacuumdb(self, url_split, post_string, is_json): + swap_client = self.server.swap_client + swap_client.vacuumDB() + + return bytes(json.dumps({'completed': True}), 'UTF-8') + + def js_url_to_function(url_split): if len(url_split) > 2: return { @@ -409,20 +442,7 @@ def js_url_to_function(url_split): 'rates': js_rates, 'rateslist': js_rates_list, 'generatenotification': js_generatenotification, + 'notifications': js_notifications, + 'vacuumdb': js_vacuumdb, }.get(url_split[2], js_index) return js_index - - -def js_generatenotification(self, url_split, post_string, is_json): - swap_client = self.server.swap_client - r = random.randint(0, 3) - if r == 0: - swap_client.notify(NT.OFFER_RECEIVED, {'offer_id': random.randbytes(28).hex()}) - elif r == 1: - swap_client.notify(NT.BID_RECEIVED, {'type': 'atomic', 'bid_id': random.randbytes(28).hex(), 'offer_id': random.randbytes(28).hex()}) - elif r == 2: - swap_client.notify(NT.BID_ACCEPTED, {'bid_id': random.randbytes(28).hex()}) - elif r == 3: - swap_client.notify(NT.BID_RECEIVED, {'type': 'xmr', 'bid_id': random.randbytes(28).hex(), 'offer_id': random.randbytes(28).hex()}) - - return bytes(json.dumps({'type': r}), 'UTF-8') diff --git a/basicswap/templates/header.html b/basicswap/templates/header.html index be0d24b..01787e1 100644 --- a/basicswap/templates/header.html +++ b/basicswap/templates/header.html @@ -680,5 +680,8 @@ floating_div.appendChild(messages); document.body.appendChild(floating_div); + {% for entry in notifications %} + console.log({{ entry[0] }}, {{ entry[1] }}, {{ entry[2] }}); + {% endfor %} {% endif %} diff --git a/basicswap/util/__init__.py b/basicswap/util/__init__.py index 44eb033..840e420 100644 --- a/basicswap/util/__init__.py +++ b/basicswap/util/__init__.py @@ -28,6 +28,7 @@ class AutomationConstraint(ValueError): class InactiveCoin(Exception): def __init__(self, coinid): self.coinid = coinid + def __str__(self): return str(self.coinid)