basicswap/basicswap/js_server.py
2024-11-15 09:59:28 +02:00

834 lines
32 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2020-2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import json
import random
import urllib.parse
from .util import (
ensure,
toBool,
)
from .basicswap_util import (
strBidState,
SwapTypes,
NotificationTypes as NT,
)
from .chainparams import (
Coins,
chainparams,
)
from .ui.util import (
PAGE_LIMIT,
getCoinName,
getCoinType,
inputAmount,
describeBid,
setCoinFilter,
get_data_entry,
get_data_entry_or,
have_data_entry,
tickerToCoinId,
listOldBidStates,
checkAddressesOwned,
)
from .ui.page_offers import postNewOffer
from .protocols.xmr_swap_1 import recoverNoScriptTxnWithKey, getChainBSplitKey
def getFormData(post_string: str, is_json: bool):
if post_string == '':
raise ValueError('No post data')
if is_json:
form_data = json.loads(post_string)
form_data['is_json'] = True
else:
form_data = urllib.parse.parse_qs(post_string)
return form_data
def withdraw_coin(swap_client, coin_type, post_string, is_json):
post_data = getFormData(post_string, is_json)
address = get_data_entry(post_data, 'address')
if coin_type in (Coins.XMR, Coins.WOW):
value = None
sweepall = get_data_entry(post_data, 'sweepall')
if not isinstance(sweepall, bool):
sweepall = toBool(sweepall)
if not sweepall:
value = get_data_entry(post_data, 'value')
else:
value = get_data_entry(post_data, 'value')
subfee = get_data_entry(post_data, 'subfee')
if not isinstance(subfee, bool):
subfee = toBool(subfee)
if coin_type == Coins.PART:
type_from = get_data_entry_or(post_data, 'type_from', 'plain')
type_to = get_data_entry_or(post_data, 'type_to', 'plain')
txid_hex = swap_client.withdrawParticl(type_from, type_to, value, address, subfee)
elif coin_type == Coins.LTC:
type_from = get_data_entry_or(post_data, 'type_from', 'plain')
txid_hex = swap_client.withdrawLTC(type_from, value, address, subfee)
elif coin_type in (Coins.XMR, Coins.WOW):
txid_hex = swap_client.withdrawCoin(coin_type, value, address, sweepall)
else:
txid_hex = swap_client.withdrawCoin(coin_type, value, address, subfee)
return {'txid': txid_hex}
def js_error(self, error_str) -> bytes:
error_str_json = json.dumps({'error': error_str})
return bytes(error_str_json, 'UTF-8')
def js_coins(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
coins = []
for coin in Coins:
cc = swap_client.coin_clients[coin]
coin_chainparams = chainparams[cc['coin']]
coin_active: bool = False if cc['connection_type'] == 'none' else True
if coin == Coins.LTC_MWEB:
coin_active = False
entry = {
'id': int(coin),
'ticker': coin_chainparams['ticker'],
'name': getCoinName(coin),
'active': coin_active,
'decimal_places': coin_chainparams['decimal_places'],
}
if coin == Coins.PART_ANON:
entry['variant'] = 'Anon'
elif coin == Coins.PART_BLIND:
entry['variant'] = 'Blind'
elif coin == Coins.LTC_MWEB:
entry['variant'] = 'MWEB'
coins.append(entry)
return bytes(json.dumps(coins), 'UTF-8')
def js_wallets(self, url_split, post_string, is_json):
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
if len(url_split) > 3:
ticker_str = url_split[3]
coin_type = tickerToCoinId(ticker_str)
if len(url_split) > 4:
cmd = url_split[4]
if cmd == 'withdraw':
return bytes(json.dumps(withdraw_coin(swap_client, coin_type, post_string, is_json)), 'UTF-8')
elif cmd == 'nextdepositaddr':
return bytes(json.dumps(swap_client.cacheNewAddressForCoin(coin_type)), 'UTF-8')
elif cmd == 'createutxo':
post_data = getFormData(post_string, is_json)
ci = swap_client.ci(coin_type)
value = ci.make_int(get_data_entry(post_data, 'value'))
txid_hex, new_addr = ci.createUTXO(value)
return bytes(json.dumps({'txid': txid_hex, 'address': new_addr}), 'UTF-8')
elif cmd == 'reseed':
swap_client.reseedWallet(coin_type)
return bytes(json.dumps({'reseeded': True}), 'UTF-8')
elif cmd == 'newstealthaddress':
if coin_type != Coins.PART:
raise ValueError('Invalid coin for command')
return bytes(json.dumps(swap_client.ci(coin_type).getNewStealthAddress()), 'UTF-8')
elif cmd == 'newmwebaddress':
if coin_type not in (Coins.LTC, Coins.LTC_MWEB):
raise ValueError('Invalid coin for command')
return bytes(json.dumps(swap_client.ci(coin_type).getNewMwebAddress()), 'UTF-8')
raise ValueError('Unknown command')
if coin_type == Coins.LTC_MWEB:
coin_type = Coins.LTC
rv = swap_client.getWalletInfo(coin_type)
rv.update(swap_client.getBlockchainInfo(coin_type))
ci = swap_client.ci(coin_type)
checkAddressesOwned(swap_client, ci, rv)
return bytes(json.dumps(rv), 'UTF-8')
return bytes(json.dumps(swap_client.getWalletsInfo({'ticker_key': True})), 'UTF-8')
def js_offers(self, url_split, post_string, is_json, sent=False) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
offer_id = None
if len(url_split) > 3:
if url_split[3] == 'new':
form_data = getFormData(post_string, is_json)
offer_id = postNewOffer(swap_client, form_data)
rv = {'offer_id': offer_id.hex()}
return bytes(json.dumps(rv), 'UTF-8')
offer_id = bytes.fromhex(url_split[3])
with_extra_info = False
filters = {
'coin_from': -1,
'coin_to': -1,
'page_no': 1,
'limit': PAGE_LIMIT,
'sort_by': 'created_at',
'sort_dir': 'desc',
}
if offer_id:
filters['offer_id'] = offer_id
if post_string != '':
post_data = getFormData(post_string, is_json)
filters['coin_from'] = setCoinFilter(post_data, 'coin_from')
filters['coin_to'] = setCoinFilter(post_data, 'coin_to')
if have_data_entry(post_data, 'sort_by'):
sort_by = get_data_entry(post_data, 'sort_by')
ensure(sort_by in ['created_at', 'rate'], 'Invalid sort by')
filters['sort_by'] = sort_by
if have_data_entry(post_data, 'sort_dir'):
sort_dir = get_data_entry(post_data, 'sort_dir')
ensure(sort_dir in ['asc', 'desc'], 'Invalid sort dir')
filters['sort_dir'] = sort_dir
if have_data_entry(post_data, 'offset'):
filters['offset'] = int(get_data_entry(post_data, 'offset'))
if have_data_entry(post_data, 'limit'):
filters['limit'] = int(get_data_entry(post_data, 'limit'))
ensure(filters['limit'] > 0, 'Invalid limit')
if have_data_entry(post_data, 'active'):
filters['active'] = get_data_entry(post_data, 'active')
if have_data_entry(post_data, 'include_sent'):
filters['include_sent'] = toBool(get_data_entry(post_data, 'include_sent'))
if have_data_entry(post_data, 'with_extra_info'):
with_extra_info = toBool(get_data_entry(post_data, 'with_extra_info'))
offers = swap_client.listOffers(sent, filters)
rv = []
for o in offers:
ci_from = swap_client.ci(o.coin_from)
ci_to = swap_client.ci(o.coin_to)
offer_data = {
'swap_type': o.swap_type,
'addr_from': o.addr_from,
'addr_to': o.addr_to,
'offer_id': o.offer_id.hex(),
'created_at': o.created_at,
'expire_at': o.expire_at,
'coin_from': ci_from.coin_name(),
'coin_to': ci_to.coin_name(),
'amount_from': ci_from.format_amount(o.amount_from),
'amount_to': ci_to.format_amount((o.amount_from * o.rate) // ci_from.COIN()),
'rate': ci_to.format_amount(o.rate),
'min_bid_amount': ci_from.format_amount(o.min_bid_amount),
'is_expired': o.expire_at <= swap_client.getTime(),
'is_own_offer': o.was_sent,
'is_revoked': True if o.active_ind == 2 else False,
}
if with_extra_info:
offer_data['amount_negotiable'] = o.amount_negotiable
offer_data['rate_negotiable'] = o.rate_negotiable
if o.swap_type == SwapTypes.XMR_SWAP:
_, xmr_offer = swap_client.getXmrOffer(o.offer_id)
offer_data['lock_time_1'] = xmr_offer.lock_time_1
offer_data['lock_time_2'] = xmr_offer.lock_time_2
offer_data['feerate_from'] = xmr_offer.a_fee_rate
offer_data['feerate_to'] = xmr_offer.b_fee_rate
else:
offer_data['feerate_from'] = o.from_feerate
offer_data['feerate_to'] = o.to_feerate
rv.append(offer_data)
return bytes(json.dumps(rv), 'UTF-8')
def js_sentoffers(self, url_split, post_string, is_json) -> bytes:
return js_offers(self, url_split, post_string, is_json, True)
def parseBidFilters(post_data):
offer_id = None
filters = {}
if have_data_entry(post_data, 'offer_id'):
offer_id = bytes.fromhex(get_data_entry(post_data, 'offer_id'))
assert (len(offer_id) == 28)
if have_data_entry(post_data, 'sort_by'):
sort_by = get_data_entry(post_data, 'sort_by')
assert (sort_by in ['created_at', ]), 'Invalid sort by'
filters['sort_by'] = sort_by
if have_data_entry(post_data, 'sort_dir'):
sort_dir = get_data_entry(post_data, 'sort_dir')
assert (sort_dir in ['asc', 'desc']), 'Invalid sort dir'
filters['sort_dir'] = sort_dir
if have_data_entry(post_data, 'offset'):
filters['offset'] = int(get_data_entry(post_data, 'offset'))
if have_data_entry(post_data, 'limit'):
filters['limit'] = int(get_data_entry(post_data, 'limit'))
assert (filters['limit'] > 0 and filters['limit'] <= PAGE_LIMIT), 'Invalid limit'
if have_data_entry(post_data, 'with_available_or_active'):
filters['with_available_or_active'] = toBool(get_data_entry(post_data, 'with_available_or_active'))
elif have_data_entry(post_data, 'with_expired'):
filters['with_expired'] = toBool(get_data_entry(post_data, 'with_expired'))
if have_data_entry(post_data, 'with_extra_info'):
filters['with_extra_info'] = toBool(get_data_entry(post_data, 'with_extra_info'))
return offer_id, filters
def formatBids(swap_client, bids, filters) -> bytes:
with_extra_info = filters.get('with_extra_info', False)
rv = []
for b in bids:
bid_data = {
'bid_id': b[2].hex(),
'offer_id': b[3].hex(),
'created_at': b[0],
'expire_at': b[1],
'coin_from': b[9],
'amount_from': swap_client.ci(b[9]).format_amount(b[4]),
'bid_rate': swap_client.ci(b[14]).format_amount(b[10]),
'bid_state': strBidState(b[5])
}
if with_extra_info:
bid_data['addr_from'] = b[11]
rv.append(bid_data)
return bytes(json.dumps(rv), 'UTF-8')
def js_bids(self, url_split, post_string: str, is_json: bool) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
if len(url_split) > 3:
if url_split[3] == 'new':
post_data = getFormData(post_string, is_json)
offer_id = bytes.fromhex(get_data_entry(post_data, 'offer_id'))
assert (len(offer_id) == 28)
offer = swap_client.getOffer(offer_id)
assert (offer), 'Offer not found.'
ci_from = swap_client.ci(offer.coin_from)
ci_to = swap_client.ci(offer.coin_to)
amount_from = inputAmount(get_data_entry(post_data, 'amount_from'), ci_from)
addr_from = None
if have_data_entry(post_data, 'addr_from'):
addr_from = get_data_entry(post_data, 'addr_from')
if addr_from == '-1':
addr_from = None
if have_data_entry(post_data, 'validmins'):
valid_for_seconds = int(get_data_entry(post_data, 'validmins')) * 60
elif have_data_entry(post_data, 'valid_for_seconds'):
valid_for_seconds = int(get_data_entry(post_data, 'valid_for_seconds'))
else:
valid_for_seconds = 10 * 60
extra_options = {
'valid_for_seconds': valid_for_seconds,
}
if have_data_entry(post_data, 'amount_to'):
extra_options['amount_to'] = inputAmount(get_data_entry(post_data, 'amount_to'), ci_to)
elif have_data_entry(post_data, 'bid_rate'):
extra_options['bid_rate'] = ci_to.make_int(get_data_entry(post_data, 'bid_rate'), r=1)
if have_data_entry(post_data, 'bid_amount'):
amount_from = inputAmount(get_data_entry(post_data, 'bid_amount'), ci_from)
if offer.swap_type == SwapTypes.XMR_SWAP:
bid_id = swap_client.postXmrBid(offer_id, amount_from, addr_send_from=addr_from, extra_options=extra_options)
else:
bid_id = swap_client.postBid(offer_id, amount_from, addr_send_from=addr_from, extra_options=extra_options)
if have_data_entry(post_data, 'debugind'):
swap_client.setBidDebugInd(bid_id, int(get_data_entry(post_data, 'debugind')))
rv = {'bid_id': bid_id.hex()}
return bytes(json.dumps(rv), 'UTF-8')
bid_id = bytes.fromhex(url_split[3])
assert (len(bid_id) == 28)
show_txns: bool = False
with_events: bool = False
if post_string != '':
post_data = getFormData(post_string, is_json)
if have_data_entry(post_data, 'accept'):
swap_client.acceptBid(bid_id)
elif have_data_entry(post_data, 'abandon'):
swap_client.abandonBid(bid_id)
elif have_data_entry(post_data, 'debugind'):
swap_client.setBidDebugInd(bid_id, int(get_data_entry(post_data, 'debugind')))
if have_data_entry(post_data, 'show_extra'):
show_txns = True
if have_data_entry(post_data, 'with_events'):
with_events = True
bid, xmr_swap, offer, xmr_offer, events = swap_client.getXmrBidAndOffer(bid_id)
assert (bid), 'Unknown bid ID'
if post_string != '':
if have_data_entry(post_data, 'chainbkeysplit'):
return bytes(json.dumps({'splitkey': getChainBSplitKey(swap_client, bid, xmr_swap, offer)}), 'UTF-8')
elif have_data_entry(post_data, 'spendchainblocktx'):
remote_key = get_data_entry(post_data, 'remote_key')
return bytes(json.dumps({'txid': recoverNoScriptTxnWithKey(swap_client, bid_id, remote_key).hex()}), 'UTF-8')
if len(url_split) > 4 and url_split[4] == 'states':
old_states = listOldBidStates(bid)
if with_events:
new_list = []
for entry in old_states:
entry_list = list(entry)
entry_list.insert(1, 'state')
new_list.append(entry_list)
for event in events:
new_list.append([event['at'], 'event', event['desc']])
old_states = sorted(new_list, key=lambda x: x[0])
return bytes(json.dumps(old_states), 'UTF-8')
edit_bid = False
data = describeBid(swap_client, bid, xmr_swap, offer, xmr_offer, events, edit_bid, show_txns, for_api=True)
return bytes(json.dumps(data), 'UTF-8')
post_data = {} if post_string == '' else getFormData(post_string, is_json)
offer_id, filters = parseBidFilters(post_data)
bids = swap_client.listBids(offer_id=offer_id, filters=filters)
return formatBids(swap_client, bids, filters)
def js_sentbids(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
post_data = getFormData(post_string, is_json)
offer_id, filters = parseBidFilters(post_data)
bids = swap_client.listBids(sent=True, offer_id=offer_id, filters=filters)
return formatBids(swap_client, bids, filters)
def js_network(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
return bytes(json.dumps(swap_client.get_network_info()), 'UTF-8')
def js_revokeoffer(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
offer_id = bytes.fromhex(url_split[3])
assert (len(offer_id) == 28)
swap_client.revokeOffer(offer_id)
return bytes(json.dumps({'revoked_offer': offer_id.hex()}), 'UTF-8')
def js_smsgaddresses(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
post_data = {} if post_string == '' else getFormData(post_string, is_json)
if len(url_split) > 3:
mode: str = url_split[3]
if mode == 'new':
addressnote = get_data_entry_or(post_data, 'addressnote', '')
new_addr, pubkey = swap_client.newSMSGAddress(addressnote=addressnote)
return bytes(json.dumps({'new_address': new_addr, 'pubkey': pubkey}), 'UTF-8')
if mode == 'add':
addressnote = get_data_entry_or(post_data, 'addressnote', '')
pubkey_hex = get_data_entry(post_data, 'addresspubkey')
added_address = swap_client.addSMSGAddress(pubkey_hex, addressnote)
return bytes(json.dumps({'added_address': added_address, 'pubkey': pubkey_hex}), 'UTF-8')
elif mode == 'edit':
address = get_data_entry(post_data, 'address')
activeind = int(get_data_entry(post_data, 'active_ind'))
addressnote = get_data_entry_or(post_data, 'addressnote', '')
new_addr = swap_client.editSMSGAddress(address, activeind, addressnote)
return bytes(json.dumps({'edited_address': address}), 'UTF-8')
elif mode == 'disableall':
rv = swap_client.disableAllSMSGAddresses()
return bytes(json.dumps(rv), 'UTF-8')
filters = {
'exclude_inactive': post_data.get('exclude_inactive', True),
}
return bytes(json.dumps(swap_client.listAllSMSGAddresses(filters)), 'UTF-8')
def js_rates(self, url_split, post_string, is_json) -> bytes:
post_data = getFormData(post_string, is_json)
sc = self.server.swap_client
coin_from = get_data_entry(post_data, 'coin_from')
coin_to = get_data_entry(post_data, 'coin_to')
return bytes(json.dumps(sc.lookupRates(coin_from, coin_to)), 'UTF-8')
def js_rates_list(self, url_split, query_string, is_json) -> bytes:
get_data = urllib.parse.parse_qs(query_string)
sc = self.server.swap_client
coin_from = getCoinType(get_data['from'][0])
coin_to = getCoinType(get_data['to'][0])
return bytes(json.dumps(sc.lookupRates(coin_from, coin_to, True)), 'UTF-8')
def js_rate(self, url_split, post_string, is_json) -> bytes:
post_data = getFormData(post_string, is_json)
sc = self.server.swap_client
coin_from = getCoinType(get_data_entry(post_data, 'coin_from'))
ci_from = sc.ci(coin_from)
coin_to = getCoinType(get_data_entry(post_data, 'coin_to'))
ci_to = sc.ci(coin_to)
# Set amount to if rate is provided
rate = get_data_entry_or(post_data, 'rate', None)
if rate is not None:
amt_from_str = get_data_entry_or(post_data, 'amt_from', None)
amt_to_str = get_data_entry_or(post_data, 'amt_to', None)
if amt_from_str is not None:
rate = ci_to.make_int(rate, r=1)
amt_from = inputAmount(amt_from_str, ci_from)
amount_to = ci_to.format_amount(int((amt_from * rate) // ci_from.COIN()), r=1)
return bytes(json.dumps({'amount_to': amount_to}), 'UTF-8')
if amt_to_str is not None:
rate = ci_from.make_int(1.0 / float(rate), r=1)
amt_to = inputAmount(amt_to_str, ci_to)
amount_from = ci_from.format_amount(int((amt_to * rate) // ci_to.COIN()), r=1)
return bytes(json.dumps({'amount_from': amount_from}), 'UTF-8')
amt_from: int = inputAmount(get_data_entry(post_data, 'amt_from'), ci_from)
amt_to: int = inputAmount(get_data_entry(post_data, 'amt_to'), ci_to)
rate: int = ci_to.format_amount(ci_from.make_int(amt_to / amt_from, r=1))
return bytes(json.dumps({'rate': rate}), 'UTF-8')
def js_index(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
return bytes(json.dumps(swap_client.getSummary()), 'UTF-8')
def js_generatenotification(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
if not swap_client.debug:
raise ValueError('Debug mode not active.')
r = random.randint(0, 3)
if r == 0:
swap_client.notify(NT.OFFER_RECEIVED, {'offer_id': random.randbytes(28).hex()})
elif r == 1:
swap_client.notify(NT.BID_RECEIVED, {'type': 'atomic', 'bid_id': random.randbytes(28).hex(), 'offer_id': random.randbytes(28).hex()})
elif r == 2:
swap_client.notify(NT.BID_ACCEPTED, {'bid_id': random.randbytes(28).hex()})
elif r == 3:
swap_client.notify(NT.BID_RECEIVED, {'type': 'ads', 'bid_id': random.randbytes(28).hex(), 'offer_id': random.randbytes(28).hex()})
return bytes(json.dumps({'type': r}), 'UTF-8')
def js_notifications(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
return bytes(json.dumps(swap_client.getNotifications()), 'UTF-8')
def js_identities(self, url_split, post_string: str, is_json: bool) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
filters = {
'page_no': 1,
'limit': PAGE_LIMIT,
'sort_by': 'created_at',
'sort_dir': 'desc',
}
if len(url_split) > 3:
address = url_split[3]
filters['address'] = address
if post_string != '':
post_data = getFormData(post_string, is_json)
if have_data_entry(post_data, 'sort_by'):
sort_by = get_data_entry(post_data, 'sort_by')
assert (sort_by in ['created_at', 'rate']), 'Invalid sort by'
filters['sort_by'] = sort_by
if have_data_entry(post_data, 'sort_dir'):
sort_dir = get_data_entry(post_data, 'sort_dir')
assert (sort_dir in ['asc', 'desc']), 'Invalid sort dir'
filters['sort_dir'] = sort_dir
if have_data_entry(post_data, 'offset'):
filters['offset'] = int(get_data_entry(post_data, 'offset'))
if have_data_entry(post_data, 'limit'):
filters['limit'] = int(get_data_entry(post_data, 'limit'))
assert (filters['limit'] > 0 and filters['limit'] <= PAGE_LIMIT), 'Invalid limit'
set_data = {}
if have_data_entry(post_data, 'set_label'):
set_data['label'] = get_data_entry(post_data, 'set_label')
if have_data_entry(post_data, 'set_automation_override'):
set_data['automation_override'] = get_data_entry(post_data, 'set_automation_override')
if have_data_entry(post_data, 'set_visibility_override'):
set_data['visibility_override'] = get_data_entry(post_data, 'set_visibility_override')
if have_data_entry(post_data, 'set_note'):
set_data['note'] = get_data_entry(post_data, 'set_note')
if set_data:
ensure('address' in filters, 'Must provide an address to modify data')
swap_client.setIdentityData(filters, set_data)
return bytes(json.dumps(swap_client.listIdentities(filters)), 'UTF-8')
def js_automationstrategies(self, url_split, post_string: str, is_json: bool) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
filters = {
'page_no': 1,
'limit': PAGE_LIMIT,
'sort_by': 'created_at',
'sort_dir': 'desc',
}
if post_string != '':
post_data = getFormData(post_string, is_json)
if have_data_entry(post_data, 'sort_by'):
sort_by = get_data_entry(post_data, 'sort_by')
assert (sort_by in ['created_at', 'rate']), 'Invalid sort by'
filters['sort_by'] = sort_by
if have_data_entry(post_data, 'sort_dir'):
sort_dir = get_data_entry(post_data, 'sort_dir')
assert (sort_dir in ['asc', 'desc']), 'Invalid sort dir'
filters['sort_dir'] = sort_dir
if have_data_entry(post_data, 'offset'):
filters['offset'] = int(get_data_entry(post_data, 'offset'))
if have_data_entry(post_data, 'limit'):
filters['limit'] = int(get_data_entry(post_data, 'limit'))
assert (filters['limit'] > 0 and filters['limit'] <= PAGE_LIMIT), 'Invalid limit'
if len(url_split) > 3:
strat_id = int(url_split[3])
strat_data = swap_client.getAutomationStrategy(strat_id)
rv = {
'record_id': strat_data.record_id,
'label': strat_data.label,
'type_ind': strat_data.type_ind,
'only_known_identities': strat_data.only_known_identities,
'num_concurrent': strat_data.num_concurrent,
'data': json.loads(strat_data.data.decode('utf-8')),
'note': '' if strat_data.note is None else strat_data.note,
}
return bytes(json.dumps(rv), 'UTF-8')
rv = []
strats = swap_client.listAutomationStrategies(filters)
for row in strats:
rv.append((row[0], row[1], row[2]))
return bytes(json.dumps(rv), 'UTF-8')
def js_validateamount(self, url_split, post_string: str, is_json: bool) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
post_data = getFormData(post_string, is_json)
ticker_str = post_data['coin']
amount = post_data['amount']
round_method = post_data.get('method', 'none')
valid_round_methods = ('roundoff', 'rounddown', 'none')
if round_method not in valid_round_methods:
raise ValueError(f'Unknown rounding method, must be one of {valid_round_methods}')
coin_type = tickerToCoinId(ticker_str)
ci = swap_client.ci(coin_type)
r = 0
if round_method == 'roundoff':
r = 1
elif round_method == 'rounddown':
r = -1
output_amount = ci.format_amount(amount, conv_int=True, r=r)
return bytes(json.dumps(output_amount), 'UTF-8')
def js_vacuumdb(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
swap_client.vacuumDB()
return bytes(json.dumps({'completed': True}), 'UTF-8')
def js_getcoinseed(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
swap_client.checkSystemStatus()
post_data = getFormData(post_string, is_json)
coin = getCoinType(get_data_entry(post_data, 'coin'))
if coin in (Coins.PART, Coins.PART_ANON, Coins.PART_BLIND):
raise ValueError('Particl wallet seed is set from the Basicswap mnemonic.')
ci = swap_client.ci(coin)
if coin in (Coins.XMR, Coins.WOW):
key_view = swap_client.getWalletKey(coin, 1, for_ed25519=True)
key_spend = swap_client.getWalletKey(coin, 2, for_ed25519=True)
address = ci.getAddressFromKeys(key_view, key_spend)
return bytes(json.dumps({'coin': ci.ticker(), 'key_view': ci.encodeKey(key_view), 'key_spend': ci.encodeKey(key_spend), 'address': address}), 'UTF-8')
seed_key = swap_client.getWalletKey(coin, 1)
seed_id = ci.getSeedHash(seed_key)
return bytes(json.dumps({'coin': ci.ticker(), 'seed': seed_key.hex(), 'seed_id': seed_id.hex()}), 'UTF-8')
def js_setpassword(self, url_split, post_string, is_json) -> bytes:
# Set or change wallet passwords
# Only works with currently enabled coins
# Will fail if any coin does not unlock on the old password
swap_client = self.server.swap_client
post_data = getFormData(post_string, is_json)
old_password = get_data_entry(post_data, 'oldpassword')
new_password = get_data_entry(post_data, 'newpassword')
if have_data_entry(post_data, 'coin'):
# Set password for one coin
coin = getCoinType(get_data_entry(post_data, 'coin'))
if coin in (Coins.PART_ANON, Coins.PART_BLIND, Coins.LTC_MWEB):
raise ValueError('Invalid coin.')
swap_client.changeWalletPasswords(old_password, new_password, coin)
return bytes(json.dumps({'success': True}), 'UTF-8')
# Set password for all coins
swap_client.changeWalletPasswords(old_password, new_password)
return bytes(json.dumps({'success': True}), 'UTF-8')
def js_unlock(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
post_data = getFormData(post_string, is_json)
password = get_data_entry(post_data, 'password')
if have_data_entry(post_data, 'coin'):
coin = getCoinType(str(get_data_entry(post_data, 'coin')))
if coin in (Coins.PART_ANON, Coins.PART_BLIND):
raise ValueError('Invalid coin.')
swap_client.unlockWallets(password, coin)
return bytes(json.dumps({'success': True}), 'UTF-8')
swap_client.unlockWallets(password)
return bytes(json.dumps({'success': True}), 'UTF-8')
def js_lock(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
post_data = {} if post_string == '' else getFormData(post_string, is_json)
if have_data_entry(post_data, 'coin'):
coin = getCoinType(get_data_entry(post_data, 'coin'))
if coin in (Coins.PART_ANON, Coins.PART_BLIND):
raise ValueError('Invalid coin.')
swap_client.lockWallets(coin)
return bytes(json.dumps({'success': True}), 'UTF-8')
swap_client.lockWallets()
return bytes(json.dumps({'success': True}), 'UTF-8')
def js_404(self, url_split, post_string, is_json) -> bytes:
return bytes(json.dumps({'Error': 'path unknown'}), 'UTF-8')
def js_help(self, url_split, post_string, is_json) -> bytes:
# TODO: Add details and examples
commands = []
for k in pages:
commands.append(k)
return bytes(json.dumps({'commands': commands}), 'UTF-8')
def js_readurl(self, url_split, post_string, is_json) -> bytes:
swap_client = self.server.swap_client
post_data = {} if post_string == '' else getFormData(post_string, is_json)
if have_data_entry(post_data, 'url'):
url = get_data_entry(post_data, 'url')
default_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
}
response = swap_client.readURL(url, headers=default_headers)
try:
error = json.loads(response.decode())
if "Error" in error:
return json.dumps({"Error": error['Error']}).encode()
except json.JSONDecodeError:
pass
return response
raise ValueError('Requires URL.')
pages = {
'coins': js_coins,
'wallets': js_wallets,
'offers': js_offers,
'sentoffers': js_sentoffers,
'bids': js_bids,
'sentbids': js_sentbids,
'network': js_network,
'revokeoffer': js_revokeoffer,
'smsgaddresses': js_smsgaddresses,
'rate': js_rate,
'rates': js_rates,
'rateslist': js_rates_list,
'generatenotification': js_generatenotification,
'notifications': js_notifications,
'identities': js_identities,
'automationstrategies': js_automationstrategies,
'validateamount': js_validateamount,
'vacuumdb': js_vacuumdb,
'getcoinseed': js_getcoinseed,
'setpassword': js_setpassword,
'unlock': js_unlock,
'lock': js_lock,
'help': js_help,
'readurl': js_readurl,
}
def js_url_to_function(url_split):
if len(url_split) > 2:
return pages.get(url_split[2], js_404)
return js_index