mirror of
https://github.com/basicswap/basicswap.git
synced 2025-01-10 04:34:44 +00:00
1025 lines
37 KiB
Python
1025 lines
37 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (c) 2020-2024 tecnovert
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
import json
|
|
import random
|
|
import urllib.parse
|
|
|
|
from .util import (
|
|
ensure,
|
|
toBool,
|
|
)
|
|
from .basicswap_util import (
|
|
strBidState,
|
|
SwapTypes,
|
|
NotificationTypes as NT,
|
|
)
|
|
from .chainparams import (
|
|
Coins,
|
|
chainparams,
|
|
)
|
|
from .ui.util import (
|
|
PAGE_LIMIT,
|
|
getCoinName,
|
|
getCoinType,
|
|
inputAmount,
|
|
describeBid,
|
|
setCoinFilter,
|
|
get_data_entry,
|
|
get_data_entry_or,
|
|
have_data_entry,
|
|
tickerToCoinId,
|
|
listOldBidStates,
|
|
checkAddressesOwned,
|
|
)
|
|
from .ui.page_offers import postNewOffer
|
|
from .protocols.xmr_swap_1 import recoverNoScriptTxnWithKey, getChainBSplitKey
|
|
|
|
|
|
def getFormData(post_string: str, is_json: bool):
|
|
if post_string == "":
|
|
raise ValueError("No post data")
|
|
if is_json:
|
|
form_data = json.loads(post_string)
|
|
form_data["is_json"] = True
|
|
else:
|
|
form_data = urllib.parse.parse_qs(post_string)
|
|
return form_data
|
|
|
|
|
|
def withdraw_coin(swap_client, coin_type, post_string, is_json):
|
|
post_data = getFormData(post_string, is_json)
|
|
address = get_data_entry(post_data, "address")
|
|
|
|
if coin_type in (Coins.XMR, Coins.WOW):
|
|
value = None
|
|
sweepall = get_data_entry(post_data, "sweepall")
|
|
if not isinstance(sweepall, bool):
|
|
sweepall = toBool(sweepall)
|
|
if not sweepall:
|
|
value = get_data_entry(post_data, "value")
|
|
else:
|
|
value = get_data_entry(post_data, "value")
|
|
subfee = get_data_entry(post_data, "subfee")
|
|
if not isinstance(subfee, bool):
|
|
subfee = toBool(subfee)
|
|
|
|
if coin_type == Coins.PART:
|
|
type_from = get_data_entry_or(post_data, "type_from", "plain")
|
|
type_to = get_data_entry_or(post_data, "type_to", "plain")
|
|
txid_hex = swap_client.withdrawParticl(
|
|
type_from, type_to, value, address, subfee
|
|
)
|
|
elif coin_type == Coins.LTC:
|
|
type_from = get_data_entry_or(post_data, "type_from", "plain")
|
|
txid_hex = swap_client.withdrawLTC(type_from, value, address, subfee)
|
|
elif coin_type in (Coins.XMR, Coins.WOW):
|
|
txid_hex = swap_client.withdrawCoin(coin_type, value, address, sweepall)
|
|
else:
|
|
txid_hex = swap_client.withdrawCoin(coin_type, value, address, subfee)
|
|
|
|
return {"txid": txid_hex}
|
|
|
|
|
|
def js_error(self, error_str) -> bytes:
|
|
error_str_json = json.dumps({"error": error_str})
|
|
return bytes(error_str_json, "UTF-8")
|
|
|
|
|
|
def js_coins(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
|
|
coins = []
|
|
for coin in Coins:
|
|
cc = swap_client.coin_clients[coin]
|
|
coin_chainparams = chainparams[cc["coin"]]
|
|
coin_active: bool = False if cc["connection_type"] == "none" else True
|
|
if coin == Coins.LTC_MWEB:
|
|
coin_active = False
|
|
entry = {
|
|
"id": int(coin),
|
|
"ticker": coin_chainparams["ticker"],
|
|
"name": getCoinName(coin),
|
|
"active": coin_active,
|
|
"decimal_places": coin_chainparams["decimal_places"],
|
|
}
|
|
if coin == Coins.PART_ANON:
|
|
entry["variant"] = "Anon"
|
|
elif coin == Coins.PART_BLIND:
|
|
entry["variant"] = "Blind"
|
|
elif coin == Coins.LTC_MWEB:
|
|
entry["variant"] = "MWEB"
|
|
coins.append(entry)
|
|
|
|
return bytes(json.dumps(coins), "UTF-8")
|
|
|
|
|
|
def js_wallets(self, url_split, post_string, is_json):
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
if len(url_split) > 3:
|
|
ticker_str = url_split[3]
|
|
coin_type = tickerToCoinId(ticker_str)
|
|
|
|
if len(url_split) > 4:
|
|
cmd = url_split[4]
|
|
if cmd == "withdraw":
|
|
return bytes(
|
|
json.dumps(
|
|
withdraw_coin(swap_client, coin_type, post_string, is_json)
|
|
),
|
|
"UTF-8",
|
|
)
|
|
elif cmd == "nextdepositaddr":
|
|
return bytes(
|
|
json.dumps(swap_client.cacheNewAddressForCoin(coin_type)), "UTF-8"
|
|
)
|
|
elif cmd == "createutxo":
|
|
post_data = getFormData(post_string, is_json)
|
|
ci = swap_client.ci(coin_type)
|
|
value = ci.make_int(get_data_entry(post_data, "value"))
|
|
txid_hex, new_addr = ci.createUTXO(value)
|
|
return bytes(
|
|
json.dumps({"txid": txid_hex, "address": new_addr}), "UTF-8"
|
|
)
|
|
elif cmd == "reseed":
|
|
swap_client.reseedWallet(coin_type)
|
|
return bytes(json.dumps({"reseeded": True}), "UTF-8")
|
|
elif cmd == "newstealthaddress":
|
|
if coin_type != Coins.PART:
|
|
raise ValueError("Invalid coin for command")
|
|
return bytes(
|
|
json.dumps(swap_client.ci(coin_type).getNewStealthAddress()),
|
|
"UTF-8",
|
|
)
|
|
elif cmd == "newmwebaddress":
|
|
if coin_type not in (Coins.LTC, Coins.LTC_MWEB):
|
|
raise ValueError("Invalid coin for command")
|
|
return bytes(
|
|
json.dumps(swap_client.ci(coin_type).getNewMwebAddress()), "UTF-8"
|
|
)
|
|
|
|
raise ValueError("Unknown command")
|
|
|
|
if coin_type == Coins.LTC_MWEB:
|
|
coin_type = Coins.LTC
|
|
rv = swap_client.getWalletInfo(coin_type)
|
|
rv.update(swap_client.getBlockchainInfo(coin_type))
|
|
ci = swap_client.ci(coin_type)
|
|
checkAddressesOwned(swap_client, ci, rv)
|
|
return bytes(json.dumps(rv), "UTF-8")
|
|
|
|
return bytes(json.dumps(swap_client.getWalletsInfo({"ticker_key": True})), "UTF-8")
|
|
|
|
|
|
def js_offers(self, url_split, post_string, is_json, sent=False) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
offer_id = None
|
|
if len(url_split) > 3:
|
|
if url_split[3] == "new":
|
|
form_data = getFormData(post_string, is_json)
|
|
offer_id = postNewOffer(swap_client, form_data)
|
|
rv = {"offer_id": offer_id.hex()}
|
|
return bytes(json.dumps(rv), "UTF-8")
|
|
offer_id = bytes.fromhex(url_split[3])
|
|
|
|
with_extra_info = False
|
|
filters = {
|
|
"coin_from": -1,
|
|
"coin_to": -1,
|
|
"page_no": 1,
|
|
"limit": PAGE_LIMIT,
|
|
"sort_by": "created_at",
|
|
"sort_dir": "desc",
|
|
}
|
|
|
|
if offer_id:
|
|
filters["offer_id"] = offer_id
|
|
|
|
if post_string != "":
|
|
post_data = getFormData(post_string, is_json)
|
|
filters["coin_from"] = setCoinFilter(post_data, "coin_from")
|
|
filters["coin_to"] = setCoinFilter(post_data, "coin_to")
|
|
|
|
if have_data_entry(post_data, "sort_by"):
|
|
sort_by = get_data_entry(post_data, "sort_by")
|
|
ensure(sort_by in ["created_at", "rate"], "Invalid sort by")
|
|
filters["sort_by"] = sort_by
|
|
if have_data_entry(post_data, "sort_dir"):
|
|
sort_dir = get_data_entry(post_data, "sort_dir")
|
|
ensure(sort_dir in ["asc", "desc"], "Invalid sort dir")
|
|
filters["sort_dir"] = sort_dir
|
|
|
|
if have_data_entry(post_data, "offset"):
|
|
filters["offset"] = int(get_data_entry(post_data, "offset"))
|
|
if have_data_entry(post_data, "limit"):
|
|
filters["limit"] = int(get_data_entry(post_data, "limit"))
|
|
ensure(filters["limit"] > 0, "Invalid limit")
|
|
if have_data_entry(post_data, "active"):
|
|
filters["active"] = get_data_entry(post_data, "active")
|
|
if have_data_entry(post_data, "include_sent"):
|
|
filters["include_sent"] = toBool(get_data_entry(post_data, "include_sent"))
|
|
|
|
if have_data_entry(post_data, "with_extra_info"):
|
|
with_extra_info = toBool(get_data_entry(post_data, "with_extra_info"))
|
|
|
|
offers = swap_client.listOffers(sent, filters)
|
|
rv = []
|
|
for o in offers:
|
|
ci_from = swap_client.ci(o.coin_from)
|
|
ci_to = swap_client.ci(o.coin_to)
|
|
offer_data = {
|
|
"swap_type": o.swap_type,
|
|
"addr_from": o.addr_from,
|
|
"addr_to": o.addr_to,
|
|
"offer_id": o.offer_id.hex(),
|
|
"created_at": o.created_at,
|
|
"expire_at": o.expire_at,
|
|
"coin_from": ci_from.coin_name(),
|
|
"coin_to": ci_to.coin_name(),
|
|
"amount_from": ci_from.format_amount(o.amount_from),
|
|
"amount_to": ci_to.format_amount(
|
|
(o.amount_from * o.rate) // ci_from.COIN()
|
|
),
|
|
"rate": ci_to.format_amount(o.rate),
|
|
"min_bid_amount": ci_from.format_amount(o.min_bid_amount),
|
|
"is_expired": o.expire_at <= swap_client.getTime(),
|
|
"is_own_offer": o.was_sent,
|
|
"is_revoked": True if o.active_ind == 2 else False,
|
|
"is_public": o.addr_to == swap_client.network_addr
|
|
or o.addr_to.strip() == "",
|
|
}
|
|
if with_extra_info:
|
|
offer_data["amount_negotiable"] = o.amount_negotiable
|
|
offer_data["rate_negotiable"] = o.rate_negotiable
|
|
if o.swap_type == SwapTypes.XMR_SWAP:
|
|
_, xmr_offer = swap_client.getXmrOffer(o.offer_id)
|
|
offer_data["lock_time_1"] = xmr_offer.lock_time_1
|
|
offer_data["lock_time_2"] = xmr_offer.lock_time_2
|
|
|
|
offer_data["feerate_from"] = xmr_offer.a_fee_rate
|
|
offer_data["feerate_to"] = xmr_offer.b_fee_rate
|
|
else:
|
|
offer_data["feerate_from"] = o.from_feerate
|
|
offer_data["feerate_to"] = o.to_feerate
|
|
|
|
rv.append(offer_data)
|
|
return bytes(json.dumps(rv), "UTF-8")
|
|
|
|
|
|
def js_sentoffers(self, url_split, post_string, is_json) -> bytes:
|
|
return js_offers(self, url_split, post_string, is_json, True)
|
|
|
|
|
|
def parseBidFilters(post_data):
|
|
offer_id = None
|
|
filters = {}
|
|
|
|
if have_data_entry(post_data, "offer_id"):
|
|
offer_id = bytes.fromhex(get_data_entry(post_data, "offer_id"))
|
|
assert len(offer_id) == 28
|
|
|
|
if have_data_entry(post_data, "sort_by"):
|
|
sort_by = get_data_entry(post_data, "sort_by")
|
|
assert sort_by in [
|
|
"created_at",
|
|
], "Invalid sort by"
|
|
filters["sort_by"] = sort_by
|
|
if have_data_entry(post_data, "sort_dir"):
|
|
sort_dir = get_data_entry(post_data, "sort_dir")
|
|
assert sort_dir in ["asc", "desc"], "Invalid sort dir"
|
|
filters["sort_dir"] = sort_dir
|
|
|
|
if have_data_entry(post_data, "offset"):
|
|
filters["offset"] = int(get_data_entry(post_data, "offset"))
|
|
if have_data_entry(post_data, "limit"):
|
|
filters["limit"] = int(get_data_entry(post_data, "limit"))
|
|
assert filters["limit"] > 0 and filters["limit"] <= PAGE_LIMIT, "Invalid limit"
|
|
|
|
if have_data_entry(post_data, "with_available_or_active"):
|
|
filters["with_available_or_active"] = toBool(
|
|
get_data_entry(post_data, "with_available_or_active")
|
|
)
|
|
elif have_data_entry(post_data, "with_expired"):
|
|
filters["with_expired"] = toBool(get_data_entry(post_data, "with_expired"))
|
|
|
|
if have_data_entry(post_data, "with_extra_info"):
|
|
filters["with_extra_info"] = toBool(
|
|
get_data_entry(post_data, "with_extra_info")
|
|
)
|
|
|
|
return offer_id, filters
|
|
|
|
|
|
def formatBids(swap_client, bids, filters) -> bytes:
|
|
with_extra_info = filters.get("with_extra_info", False)
|
|
rv = []
|
|
for b in bids:
|
|
bid_data = {
|
|
"bid_id": b[2].hex(),
|
|
"offer_id": b[3].hex(),
|
|
"created_at": b[0],
|
|
"expire_at": b[1],
|
|
"coin_from": b[9],
|
|
"amount_from": swap_client.ci(b[9]).format_amount(b[4]),
|
|
"bid_rate": swap_client.ci(b[14]).format_amount(b[10]),
|
|
"bid_state": strBidState(b[5]),
|
|
}
|
|
if with_extra_info:
|
|
bid_data["addr_from"] = b[11]
|
|
rv.append(bid_data)
|
|
return bytes(json.dumps(rv), "UTF-8")
|
|
|
|
|
|
def js_bids(self, url_split, post_string: str, is_json: bool) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
if len(url_split) > 3:
|
|
if url_split[3] == "new":
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
offer_id = bytes.fromhex(get_data_entry(post_data, "offer_id"))
|
|
assert len(offer_id) == 28
|
|
|
|
offer = swap_client.getOffer(offer_id)
|
|
assert offer, "Offer not found."
|
|
|
|
ci_from = swap_client.ci(offer.coin_from)
|
|
ci_to = swap_client.ci(offer.coin_to)
|
|
amount_from = inputAmount(get_data_entry(post_data, "amount_from"), ci_from)
|
|
|
|
addr_from = None
|
|
if have_data_entry(post_data, "addr_from"):
|
|
addr_from = get_data_entry(post_data, "addr_from")
|
|
if addr_from == "-1":
|
|
addr_from = None
|
|
|
|
if have_data_entry(post_data, "validmins"):
|
|
valid_for_seconds = int(get_data_entry(post_data, "validmins")) * 60
|
|
elif have_data_entry(post_data, "valid_for_seconds"):
|
|
valid_for_seconds = int(get_data_entry(post_data, "valid_for_seconds"))
|
|
else:
|
|
valid_for_seconds = 10 * 60
|
|
|
|
extra_options = {
|
|
"valid_for_seconds": valid_for_seconds,
|
|
}
|
|
|
|
if have_data_entry(post_data, "amount_to"):
|
|
extra_options["amount_to"] = inputAmount(
|
|
get_data_entry(post_data, "amount_to"), ci_to
|
|
)
|
|
elif have_data_entry(post_data, "bid_rate"):
|
|
extra_options["bid_rate"] = ci_to.make_int(
|
|
get_data_entry(post_data, "bid_rate"), r=1
|
|
)
|
|
if have_data_entry(post_data, "bid_amount"):
|
|
amount_from = inputAmount(
|
|
get_data_entry(post_data, "bid_amount"), ci_from
|
|
)
|
|
|
|
if offer.swap_type == SwapTypes.XMR_SWAP:
|
|
bid_id = swap_client.postXmrBid(
|
|
offer_id,
|
|
amount_from,
|
|
addr_send_from=addr_from,
|
|
extra_options=extra_options,
|
|
)
|
|
else:
|
|
bid_id = swap_client.postBid(
|
|
offer_id,
|
|
amount_from,
|
|
addr_send_from=addr_from,
|
|
extra_options=extra_options,
|
|
)
|
|
|
|
if have_data_entry(post_data, "debugind"):
|
|
swap_client.setBidDebugInd(
|
|
bid_id, int(get_data_entry(post_data, "debugind"))
|
|
)
|
|
|
|
rv = {"bid_id": bid_id.hex()}
|
|
return bytes(json.dumps(rv), "UTF-8")
|
|
|
|
bid_id = bytes.fromhex(url_split[3])
|
|
assert len(bid_id) == 28
|
|
|
|
show_txns: bool = False
|
|
with_events: bool = False
|
|
if post_string != "":
|
|
post_data = getFormData(post_string, is_json)
|
|
if have_data_entry(post_data, "accept"):
|
|
swap_client.acceptBid(bid_id)
|
|
elif have_data_entry(post_data, "abandon"):
|
|
swap_client.abandonBid(bid_id)
|
|
elif have_data_entry(post_data, "debugind"):
|
|
swap_client.setBidDebugInd(
|
|
bid_id, int(get_data_entry(post_data, "debugind"))
|
|
)
|
|
|
|
if have_data_entry(post_data, "show_extra"):
|
|
show_txns = True
|
|
if have_data_entry(post_data, "with_events"):
|
|
with_events = True
|
|
|
|
bid, xmr_swap, offer, xmr_offer, events = swap_client.getXmrBidAndOffer(bid_id)
|
|
assert bid, "Unknown bid ID"
|
|
|
|
if post_string != "":
|
|
if have_data_entry(post_data, "chainbkeysplit"):
|
|
return bytes(
|
|
json.dumps(
|
|
{
|
|
"splitkey": getChainBSplitKey(
|
|
swap_client, bid, xmr_swap, offer
|
|
)
|
|
}
|
|
),
|
|
"UTF-8",
|
|
)
|
|
elif have_data_entry(post_data, "spendchainblocktx"):
|
|
remote_key = get_data_entry(post_data, "remote_key")
|
|
return bytes(
|
|
json.dumps(
|
|
{
|
|
"txid": recoverNoScriptTxnWithKey(
|
|
swap_client, bid_id, remote_key
|
|
).hex()
|
|
}
|
|
),
|
|
"UTF-8",
|
|
)
|
|
|
|
if len(url_split) > 4 and url_split[4] == "states":
|
|
old_states = listOldBidStates(bid)
|
|
|
|
if with_events:
|
|
new_list = []
|
|
for entry in old_states:
|
|
entry_list = list(entry)
|
|
entry_list.insert(1, "state")
|
|
new_list.append(entry_list)
|
|
for event in events:
|
|
new_list.append([event["at"], "event", event["desc"]])
|
|
old_states = sorted(new_list, key=lambda x: x[0])
|
|
|
|
return bytes(json.dumps(old_states), "UTF-8")
|
|
|
|
edit_bid = False
|
|
data = describeBid(
|
|
swap_client,
|
|
bid,
|
|
xmr_swap,
|
|
offer,
|
|
xmr_offer,
|
|
events,
|
|
edit_bid,
|
|
show_txns,
|
|
for_api=True,
|
|
)
|
|
return bytes(json.dumps(data), "UTF-8")
|
|
|
|
post_data = {} if post_string == "" else getFormData(post_string, is_json)
|
|
offer_id, filters = parseBidFilters(post_data)
|
|
|
|
bids = swap_client.listBids(offer_id=offer_id, filters=filters)
|
|
return formatBids(swap_client, bids, filters)
|
|
|
|
|
|
def js_sentbids(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
post_data = getFormData(post_string, is_json)
|
|
offer_id, filters = parseBidFilters(post_data)
|
|
|
|
bids = swap_client.listBids(sent=True, offer_id=offer_id, filters=filters)
|
|
return formatBids(swap_client, bids, filters)
|
|
|
|
|
|
def js_network(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
return bytes(json.dumps(swap_client.get_network_info()), "UTF-8")
|
|
|
|
|
|
def js_revokeoffer(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
offer_id = bytes.fromhex(url_split[3])
|
|
assert len(offer_id) == 28
|
|
swap_client.revokeOffer(offer_id)
|
|
return bytes(json.dumps({"revoked_offer": offer_id.hex()}), "UTF-8")
|
|
|
|
|
|
def js_smsgaddresses(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
post_data = {} if post_string == "" else getFormData(post_string, is_json)
|
|
|
|
if len(url_split) > 3:
|
|
mode: str = url_split[3]
|
|
if mode == "new":
|
|
addressnote = get_data_entry_or(post_data, "addressnote", "")
|
|
new_addr, pubkey = swap_client.newSMSGAddress(addressnote=addressnote)
|
|
return bytes(
|
|
json.dumps({"new_address": new_addr, "pubkey": pubkey}), "UTF-8"
|
|
)
|
|
if mode == "add":
|
|
addressnote = get_data_entry_or(post_data, "addressnote", "")
|
|
pubkey_hex = get_data_entry(post_data, "addresspubkey")
|
|
added_address = swap_client.addSMSGAddress(pubkey_hex, addressnote)
|
|
return bytes(
|
|
json.dumps({"added_address": added_address, "pubkey": pubkey_hex}),
|
|
"UTF-8",
|
|
)
|
|
elif mode == "edit":
|
|
address = get_data_entry(post_data, "address")
|
|
activeind = int(get_data_entry(post_data, "active_ind"))
|
|
addressnote = get_data_entry_or(post_data, "addressnote", "")
|
|
new_addr = swap_client.editSMSGAddress(address, activeind, addressnote)
|
|
return bytes(json.dumps({"edited_address": address}), "UTF-8")
|
|
elif mode == "disableall":
|
|
rv = swap_client.disableAllSMSGAddresses()
|
|
return bytes(json.dumps(rv), "UTF-8")
|
|
|
|
filters = {
|
|
"exclude_inactive": post_data.get("exclude_inactive", True),
|
|
}
|
|
return bytes(json.dumps(swap_client.listAllSMSGAddresses(filters)), "UTF-8")
|
|
|
|
|
|
def js_rates(self, url_split, post_string, is_json) -> bytes:
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
sc = self.server.swap_client
|
|
coin_from = get_data_entry(post_data, "coin_from")
|
|
coin_to = get_data_entry(post_data, "coin_to")
|
|
return bytes(json.dumps(sc.lookupRates(coin_from, coin_to)), "UTF-8")
|
|
|
|
|
|
def js_rates_list(self, url_split, query_string, is_json) -> bytes:
|
|
get_data = urllib.parse.parse_qs(query_string)
|
|
|
|
sc = self.server.swap_client
|
|
coin_from = getCoinType(get_data["from"][0])
|
|
coin_to = getCoinType(get_data["to"][0])
|
|
return bytes(json.dumps(sc.lookupRates(coin_from, coin_to, True)), "UTF-8")
|
|
|
|
|
|
def js_rate(self, url_split, post_string, is_json) -> bytes:
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
sc = self.server.swap_client
|
|
coin_from = getCoinType(get_data_entry(post_data, "coin_from"))
|
|
ci_from = sc.ci(coin_from)
|
|
coin_to = getCoinType(get_data_entry(post_data, "coin_to"))
|
|
ci_to = sc.ci(coin_to)
|
|
|
|
# Set amount to if rate is provided
|
|
rate = get_data_entry_or(post_data, "rate", None)
|
|
if rate is not None:
|
|
amt_from_str = get_data_entry_or(post_data, "amt_from", None)
|
|
amt_to_str = get_data_entry_or(post_data, "amt_to", None)
|
|
|
|
if amt_from_str is not None:
|
|
rate = ci_to.make_int(rate, r=1)
|
|
amt_from = inputAmount(amt_from_str, ci_from)
|
|
amount_to = ci_to.format_amount(
|
|
int((amt_from * rate) // ci_from.COIN()), r=1
|
|
)
|
|
return bytes(json.dumps({"amount_to": amount_to}), "UTF-8")
|
|
if amt_to_str is not None:
|
|
rate = ci_from.make_int(1.0 / float(rate), r=1)
|
|
amt_to = inputAmount(amt_to_str, ci_to)
|
|
amount_from = ci_from.format_amount(
|
|
int((amt_to * rate) // ci_to.COIN()), r=1
|
|
)
|
|
return bytes(json.dumps({"amount_from": amount_from}), "UTF-8")
|
|
|
|
amt_from: int = inputAmount(get_data_entry(post_data, "amt_from"), ci_from)
|
|
amt_to: int = inputAmount(get_data_entry(post_data, "amt_to"), ci_to)
|
|
|
|
rate: int = ci_to.format_amount(ci_from.make_int(amt_to / amt_from, r=1))
|
|
return bytes(json.dumps({"rate": rate}), "UTF-8")
|
|
|
|
|
|
def js_index(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
return bytes(json.dumps(swap_client.getSummary()), "UTF-8")
|
|
|
|
|
|
def js_generatenotification(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
|
|
if not swap_client.debug:
|
|
raise ValueError("Debug mode not active.")
|
|
|
|
r = random.randint(0, 3)
|
|
if r == 0:
|
|
swap_client.notify(NT.OFFER_RECEIVED, {"offer_id": random.randbytes(28).hex()})
|
|
elif r == 1:
|
|
swap_client.notify(
|
|
NT.BID_RECEIVED,
|
|
{
|
|
"type": "atomic",
|
|
"bid_id": random.randbytes(28).hex(),
|
|
"offer_id": random.randbytes(28).hex(),
|
|
},
|
|
)
|
|
elif r == 2:
|
|
swap_client.notify(NT.BID_ACCEPTED, {"bid_id": random.randbytes(28).hex()})
|
|
elif r == 3:
|
|
swap_client.notify(
|
|
NT.BID_RECEIVED,
|
|
{
|
|
"type": "ads",
|
|
"bid_id": random.randbytes(28).hex(),
|
|
"offer_id": random.randbytes(28).hex(),
|
|
},
|
|
)
|
|
|
|
return bytes(json.dumps({"type": r}), "UTF-8")
|
|
|
|
|
|
def js_notifications(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
|
|
return bytes(json.dumps(swap_client.getNotifications()), "UTF-8")
|
|
|
|
|
|
def js_identities(self, url_split, post_string: str, is_json: bool) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
|
|
if len(url_split) > 3:
|
|
address = url_split[3]
|
|
identity = swap_client.getIdentity(address)
|
|
if identity:
|
|
return bytes(
|
|
json.dumps(
|
|
{
|
|
"label": identity.label if identity.label is not None else "",
|
|
"note": identity.note if identity.note is not None else "",
|
|
"automation_override": (
|
|
identity.automation_override
|
|
if identity.automation_override is not None
|
|
else 0
|
|
),
|
|
"num_sent_bids_successful": (
|
|
identity.num_sent_bids_successful
|
|
if identity.num_sent_bids_successful is not None
|
|
else 0
|
|
),
|
|
"num_recv_bids_successful": (
|
|
identity.num_recv_bids_successful
|
|
if identity.num_recv_bids_successful is not None
|
|
else 0
|
|
),
|
|
"num_sent_bids_rejected": (
|
|
identity.num_sent_bids_rejected
|
|
if identity.num_sent_bids_rejected is not None
|
|
else 0
|
|
),
|
|
"num_recv_bids_rejected": (
|
|
identity.num_recv_bids_rejected
|
|
if identity.num_recv_bids_rejected is not None
|
|
else 0
|
|
),
|
|
"num_sent_bids_failed": (
|
|
identity.num_sent_bids_failed
|
|
if identity.num_sent_bids_failed is not None
|
|
else 0
|
|
),
|
|
"num_recv_bids_failed": (
|
|
identity.num_recv_bids_failed
|
|
if identity.num_recv_bids_failed is not None
|
|
else 0
|
|
),
|
|
}
|
|
),
|
|
"UTF-8",
|
|
)
|
|
return bytes(json.dumps({}), "UTF-8")
|
|
|
|
filters = {
|
|
"page_no": 1,
|
|
"limit": PAGE_LIMIT,
|
|
"sort_by": "created_at",
|
|
"sort_dir": "desc",
|
|
}
|
|
|
|
if post_string != "":
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
if have_data_entry(post_data, "sort_by"):
|
|
sort_by = get_data_entry(post_data, "sort_by")
|
|
assert sort_by in ["created_at", "rate"], "Invalid sort by"
|
|
filters["sort_by"] = sort_by
|
|
if have_data_entry(post_data, "sort_dir"):
|
|
sort_dir = get_data_entry(post_data, "sort_dir")
|
|
assert sort_dir in ["asc", "desc"], "Invalid sort dir"
|
|
filters["sort_dir"] = sort_dir
|
|
|
|
if have_data_entry(post_data, "offset"):
|
|
filters["offset"] = int(get_data_entry(post_data, "offset"))
|
|
if have_data_entry(post_data, "limit"):
|
|
filters["limit"] = int(get_data_entry(post_data, "limit"))
|
|
assert (
|
|
filters["limit"] > 0 and filters["limit"] <= PAGE_LIMIT
|
|
), "Invalid limit"
|
|
|
|
set_data = {}
|
|
if have_data_entry(post_data, "set_label"):
|
|
set_data["label"] = get_data_entry(post_data, "set_label")
|
|
if have_data_entry(post_data, "set_automation_override"):
|
|
set_data["automation_override"] = get_data_entry(
|
|
post_data, "set_automation_override"
|
|
)
|
|
if have_data_entry(post_data, "set_visibility_override"):
|
|
set_data["visibility_override"] = get_data_entry(
|
|
post_data, "set_visibility_override"
|
|
)
|
|
if have_data_entry(post_data, "set_note"):
|
|
set_data["note"] = get_data_entry(post_data, "set_note")
|
|
|
|
if set_data:
|
|
ensure("address" in filters, "Must provide an address to modify data")
|
|
swap_client.setIdentityData(filters, set_data)
|
|
|
|
return bytes(json.dumps(swap_client.listIdentities(filters)), "UTF-8")
|
|
|
|
|
|
def js_automationstrategies(self, url_split, post_string: str, is_json: bool) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
|
|
filters = {
|
|
"page_no": 1,
|
|
"limit": PAGE_LIMIT,
|
|
"sort_by": "created_at",
|
|
"sort_dir": "desc",
|
|
}
|
|
|
|
strat_id = int(url_split[3]) if len(url_split) > 3 else None
|
|
|
|
if post_string != "":
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
if have_data_entry(post_data, "sort_by"):
|
|
sort_by = get_data_entry(post_data, "sort_by")
|
|
assert sort_by in ["created_at", "rate"], "Invalid sort by"
|
|
filters["sort_by"] = sort_by
|
|
if have_data_entry(post_data, "sort_dir"):
|
|
sort_dir = get_data_entry(post_data, "sort_dir")
|
|
assert sort_dir in ["asc", "desc"], "Invalid sort dir"
|
|
filters["sort_dir"] = sort_dir
|
|
|
|
if have_data_entry(post_data, "offset"):
|
|
filters["offset"] = int(get_data_entry(post_data, "offset"))
|
|
if have_data_entry(post_data, "limit"):
|
|
filters["limit"] = int(get_data_entry(post_data, "limit"))
|
|
assert (
|
|
filters["limit"] > 0 and filters["limit"] <= PAGE_LIMIT
|
|
), "Invalid limit"
|
|
|
|
set_data = {}
|
|
if have_data_entry(post_data, "set_label"):
|
|
set_data["label"] = get_data_entry(post_data, "set_label")
|
|
if have_data_entry(post_data, "set_data"):
|
|
set_data["data"] = json.loads(get_data_entry(post_data, "set_data"))
|
|
if have_data_entry(post_data, "set_note"):
|
|
set_data["note"] = get_data_entry(post_data, "set_note")
|
|
if have_data_entry(post_data, "set_only_known_identities"):
|
|
set_data["only_known_identities"] = get_data_entry(
|
|
post_data, "set_only_known_identities"
|
|
)
|
|
|
|
if have_data_entry(post_data, "set_max_concurrent_bids"):
|
|
if "data" in set_data:
|
|
raise ValueError("set_max_concurrent_bids can't be used with set_data")
|
|
set_data["set_max_concurrent_bids"] = int(
|
|
get_data_entry(post_data, "set_max_concurrent_bids")
|
|
)
|
|
|
|
if set_data:
|
|
ensure(strat_id is not None, "Must specify a strategy to modify")
|
|
swap_client.updateAutomationStrategy(strat_id, set_data)
|
|
|
|
if strat_id is not None:
|
|
strat_data = swap_client.getAutomationStrategy(strat_id)
|
|
rv = {
|
|
"record_id": strat_data.record_id,
|
|
"label": strat_data.label,
|
|
"type_ind": strat_data.type_ind,
|
|
"only_known_identities": strat_data.only_known_identities,
|
|
"data": json.loads(strat_data.data.decode("utf-8")),
|
|
"note": "" if strat_data.note is None else strat_data.note,
|
|
}
|
|
return bytes(json.dumps(rv), "UTF-8")
|
|
|
|
rv = []
|
|
strats = swap_client.listAutomationStrategies(filters)
|
|
for row in strats:
|
|
rv.append((row[0], row[1], row[2]))
|
|
return bytes(json.dumps(rv), "UTF-8")
|
|
|
|
|
|
def js_validateamount(self, url_split, post_string: str, is_json: bool) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
ticker_str = post_data["coin"]
|
|
amount = post_data["amount"]
|
|
round_method = post_data.get("method", "none")
|
|
|
|
valid_round_methods = ("roundoff", "rounddown", "none")
|
|
if round_method not in valid_round_methods:
|
|
raise ValueError(
|
|
f"Unknown rounding method, must be one of {valid_round_methods}"
|
|
)
|
|
|
|
coin_type = tickerToCoinId(ticker_str)
|
|
ci = swap_client.ci(coin_type)
|
|
|
|
r = 0
|
|
if round_method == "roundoff":
|
|
r = 1
|
|
elif round_method == "rounddown":
|
|
r = -1
|
|
|
|
output_amount = ci.format_amount(amount, conv_int=True, r=r)
|
|
return bytes(json.dumps(output_amount), "UTF-8")
|
|
|
|
|
|
def js_vacuumdb(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
swap_client.vacuumDB()
|
|
|
|
return bytes(json.dumps({"completed": True}), "UTF-8")
|
|
|
|
|
|
def js_getcoinseed(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
swap_client.checkSystemStatus()
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
coin = getCoinType(get_data_entry(post_data, "coin"))
|
|
if coin in (Coins.PART, Coins.PART_ANON, Coins.PART_BLIND):
|
|
raise ValueError("Particl wallet seed is set from the Basicswap mnemonic.")
|
|
|
|
ci = swap_client.ci(coin)
|
|
if coin in (Coins.XMR, Coins.WOW):
|
|
key_view = swap_client.getWalletKey(coin, 1, for_ed25519=True)
|
|
key_spend = swap_client.getWalletKey(coin, 2, for_ed25519=True)
|
|
address = ci.getAddressFromKeys(key_view, key_spend)
|
|
return bytes(
|
|
json.dumps(
|
|
{
|
|
"coin": ci.ticker(),
|
|
"key_view": ci.encodeKey(key_view),
|
|
"key_spend": ci.encodeKey(key_spend),
|
|
"address": address,
|
|
}
|
|
),
|
|
"UTF-8",
|
|
)
|
|
|
|
seed_key = swap_client.getWalletKey(coin, 1)
|
|
seed_id = ci.getSeedHash(seed_key)
|
|
return bytes(
|
|
json.dumps(
|
|
{"coin": ci.ticker(), "seed": seed_key.hex(), "seed_id": seed_id.hex()}
|
|
),
|
|
"UTF-8",
|
|
)
|
|
|
|
|
|
def js_setpassword(self, url_split, post_string, is_json) -> bytes:
|
|
# Set or change wallet passwords
|
|
# Only works with currently enabled coins
|
|
# Will fail if any coin does not unlock on the old password
|
|
swap_client = self.server.swap_client
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
old_password = get_data_entry(post_data, "oldpassword")
|
|
new_password = get_data_entry(post_data, "newpassword")
|
|
|
|
if have_data_entry(post_data, "coin"):
|
|
# Set password for one coin
|
|
coin = getCoinType(get_data_entry(post_data, "coin"))
|
|
if coin in (Coins.PART_ANON, Coins.PART_BLIND, Coins.LTC_MWEB):
|
|
raise ValueError("Invalid coin.")
|
|
swap_client.changeWalletPasswords(old_password, new_password, coin)
|
|
return bytes(json.dumps({"success": True}), "UTF-8")
|
|
|
|
# Set password for all coins
|
|
swap_client.changeWalletPasswords(old_password, new_password)
|
|
return bytes(json.dumps({"success": True}), "UTF-8")
|
|
|
|
|
|
def js_unlock(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
post_data = getFormData(post_string, is_json)
|
|
|
|
password = get_data_entry(post_data, "password")
|
|
|
|
if have_data_entry(post_data, "coin"):
|
|
coin = getCoinType(str(get_data_entry(post_data, "coin")))
|
|
if coin in (Coins.PART_ANON, Coins.PART_BLIND):
|
|
raise ValueError("Invalid coin.")
|
|
swap_client.unlockWallets(password, coin)
|
|
return bytes(json.dumps({"success": True}), "UTF-8")
|
|
|
|
swap_client.unlockWallets(password)
|
|
return bytes(json.dumps({"success": True}), "UTF-8")
|
|
|
|
|
|
def js_lock(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
post_data = {} if post_string == "" else getFormData(post_string, is_json)
|
|
|
|
if have_data_entry(post_data, "coin"):
|
|
coin = getCoinType(get_data_entry(post_data, "coin"))
|
|
if coin in (Coins.PART_ANON, Coins.PART_BLIND):
|
|
raise ValueError("Invalid coin.")
|
|
swap_client.lockWallets(coin)
|
|
return bytes(json.dumps({"success": True}), "UTF-8")
|
|
|
|
swap_client.lockWallets()
|
|
return bytes(json.dumps({"success": True}), "UTF-8")
|
|
|
|
|
|
def js_404(self, url_split, post_string, is_json) -> bytes:
|
|
return bytes(json.dumps({"Error": "path unknown"}), "UTF-8")
|
|
|
|
|
|
def js_help(self, url_split, post_string, is_json) -> bytes:
|
|
# TODO: Add details and examples
|
|
commands = []
|
|
for k in pages:
|
|
commands.append(k)
|
|
return bytes(json.dumps({"commands": commands}), "UTF-8")
|
|
|
|
|
|
def js_readurl(self, url_split, post_string, is_json) -> bytes:
|
|
swap_client = self.server.swap_client
|
|
post_data = {} if post_string == "" else getFormData(post_string, is_json)
|
|
if have_data_entry(post_data, "url"):
|
|
url = get_data_entry(post_data, "url")
|
|
default_headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
|
|
"Accept-Language": "en-US,en;q=0.5",
|
|
}
|
|
response = swap_client.readURL(url, headers=default_headers)
|
|
try:
|
|
error = json.loads(response.decode())
|
|
if "Error" in error:
|
|
return json.dumps({"Error": error["Error"]}).encode()
|
|
except json.JSONDecodeError:
|
|
pass
|
|
return response
|
|
raise ValueError("Requires URL.")
|
|
|
|
|
|
pages = {
|
|
"coins": js_coins,
|
|
"wallets": js_wallets,
|
|
"offers": js_offers,
|
|
"sentoffers": js_sentoffers,
|
|
"bids": js_bids,
|
|
"sentbids": js_sentbids,
|
|
"network": js_network,
|
|
"revokeoffer": js_revokeoffer,
|
|
"smsgaddresses": js_smsgaddresses,
|
|
"rate": js_rate,
|
|
"rates": js_rates,
|
|
"rateslist": js_rates_list,
|
|
"generatenotification": js_generatenotification,
|
|
"notifications": js_notifications,
|
|
"identities": js_identities,
|
|
"automationstrategies": js_automationstrategies,
|
|
"validateamount": js_validateamount,
|
|
"vacuumdb": js_vacuumdb,
|
|
"getcoinseed": js_getcoinseed,
|
|
"setpassword": js_setpassword,
|
|
"unlock": js_unlock,
|
|
"lock": js_lock,
|
|
"help": js_help,
|
|
"readurl": js_readurl,
|
|
}
|
|
|
|
|
|
def js_url_to_function(url_split):
|
|
if len(url_split) > 2:
|
|
return pages.get(url_split[2], js_404)
|
|
return js_index
|