Encode and decode Decred addresses.

This commit is contained in:
tecnovert 2024-04-07 21:27:46 +02:00
parent 3ba5532fa0
commit ee239aba0b
9 changed files with 801 additions and 75 deletions

View file

@ -0,0 +1,533 @@
intro = """
blake.py
version 5, 2-Apr-2014
BLAKE is a SHA3 round-3 finalist designed and submitted by
Jean-Philippe Aumasson et al.
At the core of BLAKE is a ChaCha-like mixer, very similar
to that found in the stream cipher, ChaCha8. Besides being
a very good mixer, ChaCha is fast.
References:
http://www.131002.net/blake/
http://csrc.nist.gov/groups/ST/hash/sha-3/index.html
http://en.wikipedia.org/wiki/BLAKE_(hash_function)
This implementation assumes all data is in increments of
whole bytes. (The formal definition of BLAKE allows for
hashing individual bits.) Note too that this implementation
does include the round-3 tweaks where the number of rounds
was increased to 14/16 from 10/14.
This version can be imported into both Python2 (2.6 and 2.7)
and Python3 programs. Python 2.5 requires an older version
of blake.py (version 4).
Here are some comparative times for different versions of
Python:
64-bit:
2.6 6.284s
2.7 6.343s
3.2 7.620s
pypy (2.7) 2.080s
32-bit:
2.5 (32) 15.389s (with psyco)
2.7-32 13.645s
3.2-32 12.574s
One test on a 2.0GHz Core 2 Duo of 10,000 iterations of
BLAKE-256 on a short message produced a time of 5.7 seconds.
Not bad, but if raw speed is what you want, look to the
the C version. It is 40x faster and did the same thing
in 0.13 seconds.
Copyright (c) 2009-2012 by Larry Bugbee, Kent, WA
ALL RIGHTS RESERVED.
blake.py IS EXPERIMENTAL SOFTWARE FOR EDUCATIONAL
PURPOSES ONLY. IT IS MADE AVAILABLE "AS-IS" WITHOUT
WARRANTY OR GUARANTEE OF ANY KIND. USE SIGNIFIES
ACCEPTANCE OF ALL RISK.
To make your learning and experimentation less cumbersome,
blake.py is free for any use.
Enjoy,
Larry Bugbee
March 2011
rev May 2011 - fixed Python version check (tx JP)
rev Apr 2012 - fixed an out-of-order bit set in final()
- moved self-test to a separate test pgm
- this now works with Python2 and Python3
rev Apr 2014 - added test and conversion of string input
to byte string in update() (tx Soham)
- added hexdigest() method.
- now support state 3 so only one call to
final() per instantiation is allowed. all
subsequent calls to final(), digest() or
hexdigest() simply return the stored value.
"""
import struct
from binascii import hexlify, unhexlify
#---------------------------------------------------------------
class BLAKE(object):
# - - - - - - - - - - - - - - - - - - - - - - - - - - -
# initial values, constants and padding
# IVx for BLAKE-x
IV64 = [
0x6A09E667F3BCC908, 0xBB67AE8584CAA73B,
0x3C6EF372FE94F82B, 0xA54FF53A5F1D36F1,
0x510E527FADE682D1, 0x9B05688C2B3E6C1F,
0x1F83D9ABFB41BD6B, 0x5BE0CD19137E2179,
]
IV48 = [
0xCBBB9D5DC1059ED8, 0x629A292A367CD507,
0x9159015A3070DD17, 0x152FECD8F70E5939,
0x67332667FFC00B31, 0x8EB44A8768581511,
0xDB0C2E0D64F98FA7, 0x47B5481DBEFA4FA4,
]
# note: the values here are the same as the high-order
# half-words of IV64
IV32 = [
0x6A09E667, 0xBB67AE85,
0x3C6EF372, 0xA54FF53A,
0x510E527F, 0x9B05688C,
0x1F83D9AB, 0x5BE0CD19,
]
# note: the values here are the same as the low-order
# half-words of IV48
IV28 = [
0xC1059ED8, 0x367CD507,
0x3070DD17, 0xF70E5939,
0xFFC00B31, 0x68581511,
0x64F98FA7, 0xBEFA4FA4,
]
# constants for BLAKE-64 and BLAKE-48
C64 = [
0x243F6A8885A308D3, 0x13198A2E03707344,
0xA4093822299F31D0, 0x082EFA98EC4E6C89,
0x452821E638D01377, 0xBE5466CF34E90C6C,
0xC0AC29B7C97C50DD, 0x3F84D5B5B5470917,
0x9216D5D98979FB1B, 0xD1310BA698DFB5AC,
0x2FFD72DBD01ADFB7, 0xB8E1AFED6A267E96,
0xBA7C9045F12C7F99, 0x24A19947B3916CF7,
0x0801F2E2858EFC16, 0x636920D871574E69,
]
# constants for BLAKE-32 and BLAKE-28
# note: concatenate and the values are the same as the values
# for the 1st half of C64
C32 = [
0x243F6A88, 0x85A308D3,
0x13198A2E, 0x03707344,
0xA4093822, 0x299F31D0,
0x082EFA98, 0xEC4E6C89,
0x452821E6, 0x38D01377,
0xBE5466CF, 0x34E90C6C,
0xC0AC29B7, 0xC97C50DD,
0x3F84D5B5, 0xB5470917,
]
# the 10 permutations of:0,...15}
SIGMA = [
[ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15],
[14,10, 4, 8, 9,15,13, 6, 1,12, 0, 2,11, 7, 5, 3],
[11, 8,12, 0, 5, 2,15,13,10,14, 3, 6, 7, 1, 9, 4],
[ 7, 9, 3, 1,13,12,11,14, 2, 6, 5,10, 4, 0,15, 8],
[ 9, 0, 5, 7, 2, 4,10,15,14, 1,11,12, 6, 8, 3,13],
[ 2,12, 6,10, 0,11, 8, 3, 4,13, 7, 5,15,14, 1, 9],
[12, 5, 1,15,14,13, 4,10, 0, 7, 6, 3, 9, 2, 8,11],
[13,11, 7,14,12, 1, 3, 9, 5, 0,15, 4, 8, 6, 2,10],
[ 6,15,14, 9,11, 3, 0, 8,12, 2,13, 7, 1, 4,10, 5],
[10, 2, 8, 4, 7, 6, 1, 5,15,11, 9,14, 3,12,13, 0],
[ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15],
[14,10, 4, 8, 9,15,13, 6, 1,12, 0, 2,11, 7, 5, 3],
[11, 8,12, 0, 5, 2,15,13,10,14, 3, 6, 7, 1, 9, 4],
[ 7, 9, 3, 1,13,12,11,14, 2, 6, 5,10, 4, 0,15, 8],
[ 9, 0, 5, 7, 2, 4,10,15,14, 1,11,12, 6, 8, 3,13],
[ 2,12, 6,10, 0,11, 8, 3, 4,13, 7, 5,15,14, 1, 9],
[12, 5, 1,15,14,13, 4,10, 0, 7, 6, 3, 9, 2, 8,11],
[13,11, 7,14,12, 1, 3, 9, 5, 0,15, 4, 8, 6, 2,10],
[ 6,15,14, 9,11, 3, 0, 8,12, 2,13, 7, 1, 4,10, 5],
[10, 2, 8, 4, 7, 6, 1, 5,15,11, 9,14, 3,12,13, 0],
]
MASK32BITS = 0xFFFFFFFF
MASK64BITS = 0xFFFFFFFFFFFFFFFF
# - - - - - - - - - - - - - - - - - - - - - - - - - - -
def __init__(self, hashbitlen):
"""
load the hashSate structure (copy hashbitlen...)
hashbitlen: length of the hash output
"""
if hashbitlen not in [224, 256, 384, 512]:
raise Exception('hash length not 224, 256, 384 or 512')
self.hashbitlen = hashbitlen
self.h = [0]*8 # current chain value (initialized to the IV)
self.t = 0 # number of *BITS* hashed so far
self.cache = b'' # cached leftover data not yet compressed
self.salt = [0]*4 # salt (null by default)
self.state = 1 # set to 2 by update and 3 by final
self.nullt = 0 # Boolean value for special case \ell_i=0
# The algorithm is the same for both the 32- and 64- versions
# of BLAKE. The difference is in word size (4 vs 8 bytes),
# blocksize (64 vs 128 bytes), number of rounds (14 vs 16)
# and a few very specific constants.
if (hashbitlen == 224) or (hashbitlen == 256):
# setup for 32-bit words and 64-bit block
self.byte2int = self._fourByte2int
self.int2byte = self._int2fourByte
self.MASK = self.MASK32BITS
self.WORDBYTES = 4
self.WORDBITS = 32
self.BLKBYTES = 64
self.BLKBITS = 512
self.ROUNDS = 14 # was 10 before round 3
self.cxx = self.C32
self.rot1 = 16 # num bits to shift in G
self.rot2 = 12 # num bits to shift in G
self.rot3 = 8 # num bits to shift in G
self.rot4 = 7 # num bits to shift in G
self.mul = 0 # for 32-bit words, 32<<self.mul where self.mul = 0
# 224- and 256-bit versions (32-bit words)
if hashbitlen == 224:
self.h = self.IV28[:]
else:
self.h = self.IV32[:]
elif (hashbitlen == 384) or (hashbitlen == 512):
# setup for 64-bit words and 128-bit block
self.byte2int = self._eightByte2int
self.int2byte = self._int2eightByte
self.MASK = self.MASK64BITS
self.WORDBYTES = 8
self.WORDBITS = 64
self.BLKBYTES = 128
self.BLKBITS = 1024
self.ROUNDS = 16 # was 14 before round 3
self.cxx = self.C64
self.rot1 = 32 # num bits to shift in G
self.rot2 = 25 # num bits to shift in G
self.rot3 = 16 # num bits to shift in G
self.rot4 = 11 # num bits to shift in G
self.mul = 1 # for 64-bit words, 32<<self.mul where self.mul = 1
# 384- and 512-bit versions (64-bit words)
if hashbitlen == 384:
self.h = self.IV48[:]
else:
self.h = self.IV64[:]
# - - - - - - - - - - - - - - - - - - - - - - - - - - -
def _compress(self, block):
byte2int = self.byte2int
mul = self.mul # de-reference these for ...speed? ;-)
cxx = self.cxx
rot1 = self.rot1
rot2 = self.rot2
rot3 = self.rot3
rot4 = self.rot4
MASK = self.MASK
WORDBITS = self.WORDBITS
SIGMA = self.SIGMA
# get message (<<2 is the same as *4 but faster)
m = [byte2int(block[i<<2<<mul:(i<<2<<mul)+(4<<mul)]) for i in range(16)]
# initialization
v = [0]*16
v[ 0: 8] = [self.h[i] for i in range(8)]
v[ 8:16] = [self.cxx[i] for i in range(8)]
v[ 8:12] = [v[8+i] ^ self.salt[i] for i in range(4)]
if self.nullt == 0: # (i>>1 is the same as i/2 but faster)
v[12] = v[12] ^ (self.t & MASK)
v[13] = v[13] ^ (self.t & MASK)
v[14] = v[14] ^ (self.t >> self.WORDBITS)
v[15] = v[15] ^ (self.t >> self.WORDBITS)
# - - - - - - - - - - - - - - - - -
# ready? let's ChaCha!!!
def G(a, b, c, d, i):
va = v[a] # it's faster to deref and reref later
vb = v[b]
vc = v[c]
vd = v[d]
sri = SIGMA[round][i]
sri1 = SIGMA[round][i+1]
va = ((va + vb) + (m[sri] ^ cxx[sri1]) ) & MASK
x = vd ^ va
vd = (x >> rot1) | ((x << (WORDBITS-rot1)) & MASK)
vc = (vc + vd) & MASK
x = vb ^ vc
vb = (x >> rot2) | ((x << (WORDBITS-rot2)) & MASK)
va = ((va + vb) + (m[sri1] ^ cxx[sri]) ) & MASK
x = vd ^ va
vd = (x >> rot3) | ((x << (WORDBITS-rot3)) & MASK)
vc = (vc + vd) & MASK
x = vb ^ vc
vb = (x >> rot4) | ((x << (WORDBITS-rot4)) & MASK)
v[a] = va
v[b] = vb
v[c] = vc
v[d] = vd
for round in range(self.ROUNDS):
# column step
G( 0, 4, 8,12, 0)
G( 1, 5, 9,13, 2)
G( 2, 6,10,14, 4)
G( 3, 7,11,15, 6)
# diagonal step
G( 0, 5,10,15, 8)
G( 1, 6,11,12,10)
G( 2, 7, 8,13,12)
G( 3, 4, 9,14,14)
# - - - - - - - - - - - - - - - - -
# save current hash value (use i&0x3 to get 0,1,2,3,0,1,2,3)
self.h = [self.h[i]^v[i]^v[i+8]^self.salt[i&0x3]
for i in range(8)]
# print 'self.h', [num2hex(h) for h in self.h]
# - - - - - - - - - - - - - - - - - - - - - - - - - - -
def addsalt(self, salt):
""" adds a salt to the hash function (OPTIONAL)
should be called AFTER Init, and BEFORE update
salt: a bytestring, length determined by hashbitlen.
if not of sufficient length, the bytestring
will be assumed to be a big endian number and
prefixed with an appropriate number of null
bytes, and if too large, only the low order
bytes will be used.
if hashbitlen=224 or 256, then salt will be 16 bytes
if hashbitlen=384 or 512, then salt will be 32 bytes
"""
# fail if addsalt() was not called at the right time
if self.state != 1:
raise Exception('addsalt() not called after init() and before update()')
# salt size is to be 4x word size
saltsize = self.WORDBYTES * 4
# if too short, prefix with null bytes. if too long,
# truncate high order bytes
if len(salt) < saltsize:
salt = (chr(0)*(saltsize-len(salt)) + salt)
else:
salt = salt[-saltsize:]
# prep the salt array
self.salt[0] = self.byte2int(salt[ : 4<<self.mul])
self.salt[1] = self.byte2int(salt[ 4<<self.mul: 8<<self.mul])
self.salt[2] = self.byte2int(salt[ 8<<self.mul:12<<self.mul])
self.salt[3] = self.byte2int(salt[12<<self.mul: ])
# - - - - - - - - - - - - - - - - - - - - - - - - - - -
def update(self, data):
""" update the state with new data, storing excess data
as necessary. may be called multiple times and if a
call sends less than a full block in size, the leftover
is cached and will be consumed in the next call
data: data to be hashed (bytestring)
"""
self.state = 2
BLKBYTES = self.BLKBYTES # de-referenced for improved readability
BLKBITS = self.BLKBITS
datalen = len(data)
if not datalen: return
if type(data) == type(u''):
# use either of the next two lines for a proper
# response under both Python2 and Python3
data = data.encode('UTF-8') # converts to byte string
#data = bytearray(data, 'utf-8') # use if want mutable
# This next line works for Py3 but fails under
# Py2 because the Py2 version of bytes() will
# accept only *one* argument. Arrrrgh!!!
#data = bytes(data, 'utf-8') # converts to immutable byte
# string but... under p7
# bytes() wants only 1 arg
# ...a dummy, 2nd argument like encoding=None
# that does nothing would at least allow
# compatibility between Python2 and Python3.
left = len(self.cache)
fill = BLKBYTES - left
# if any cached data and any added new data will fill a
# full block, fill and compress
if left and datalen >= fill:
self.cache = self.cache + data[:fill]
self.t += BLKBITS # update counter
self._compress(self.cache)
self.cache = b''
data = data[fill:]
datalen -= fill
# compress new data until not enough for a full block
while datalen >= BLKBYTES:
self.t += BLKBITS # update counter
self._compress(data[:BLKBYTES])
data = data[BLKBYTES:]
datalen -= BLKBYTES
# cache all leftover bytes until next call to update()
if datalen > 0:
self.cache = self.cache + data[:datalen]
# - - - - - - - - - - - - - - - - - - - - - - - - - - -
def final(self, data=''):
""" finalize the hash -- pad and hash remaining data
returns hashval, the digest
"""
if self.state == 3:
# we have already finalized so simply return the
# previously calculated/stored hash value
return self.hash
if data:
self.update(data)
ZZ = b'\x00'
ZO = b'\x01'
OZ = b'\x80'
OO = b'\x81'
PADDING = OZ + ZZ*128 # pre-formatted padding data
# copy nb. bits hash in total as a 64-bit BE word
# copy nb. bits hash in total as a 128-bit BE word
tt = self.t + (len(self.cache) << 3)
if self.BLKBYTES == 64:
msglen = self._int2eightByte(tt)
else:
low = tt & self.MASK
high = tt >> self.WORDBITS
msglen = self._int2eightByte(high) + self._int2eightByte(low)
# size of block without the words at the end that count
# the number of bits, 55 or 111.
# Note: (((self.WORDBITS/8)*2)+1) equals ((self.WORDBITS>>2)+1)
sizewithout = self.BLKBYTES - ((self.WORDBITS>>2)+1)
if len(self.cache) == sizewithout:
# special case of one padding byte
self.t -= 8
if self.hashbitlen in [224, 384]:
self.update(OZ)
else:
self.update(OO)
else:
if len(self.cache) < sizewithout:
# enough space to fill the block
# use t=0 if no remaining data
if len(self.cache) == 0:
self.nullt=1
self.t -= (sizewithout - len(self.cache)) << 3
self.update(PADDING[:sizewithout - len(self.cache)])
else:
# NOT enough space, need 2 compressions
# ...add marker, pad with nulls and compress
self.t -= (self.BLKBYTES - len(self.cache)) << 3
self.update(PADDING[:self.BLKBYTES - len(self.cache)])
# ...now pad w/nulls leaving space for marker & bit count
self.t -= (sizewithout+1) << 3
self.update(PADDING[1:sizewithout+1]) # pad with zeroes
self.nullt = 1 # raise flag to set t=0 at the next _compress
# append a marker byte
if self.hashbitlen in [224, 384]:
self.update(ZZ)
else:
self.update(ZO)
self.t -= 8
# append the number of bits (long long)
self.t -= self.BLKBYTES
self.update(msglen)
hashval = []
if self.BLKBYTES == 64:
for h in self.h:
hashval.append(self._int2fourByte(h))
else:
for h in self.h:
hashval.append(self._int2eightByte(h))
self.hash = b''.join(hashval)[:self.hashbitlen >> 3]
self.state = 3
return self.hash
digest = final # may use digest() as a synonym for final()
# - - - - - - - - - - - - - - - - - - - - - - - - - - -
def hexdigest(self, data=''):
return hexlify(self.final(data)).decode('UTF-8')
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# utility functions
def _fourByte2int(self, bytestr): # see also long2byt() below
""" convert a 4-byte string to an int (long) """
return struct.unpack('!L', bytestr)[0]
def _eightByte2int(self, bytestr):
""" convert a 8-byte string to an int (long long) """
return struct.unpack('!Q', bytestr)[0]
def _int2fourByte(self, x): # see also long2byt() below
""" convert a number to a 4-byte string, high order
truncation possible (in Python x could be a BIGNUM)
"""
return struct.pack('!L', x)
def _int2eightByte(self, x):
""" convert a number to a 8-byte string, high order
truncation possible (in Python x could be a BIGNUM)
"""
return struct.pack('!Q', x)
#---------------------------------------------------------------
#---------------------------------------------------------------
#---------------------------------------------------------------
def blake_hash(data):
return BLAKE(256).digest(data)

View file

@ -0,0 +1,37 @@
from blake256 import blake_hash
testVectors = [
["716f6e863f744b9ac22c97ec7b76ea5f5908bc5b2f67c61510bfc4751384ea7a", ""],
["43234ff894a9c0590d0246cfc574eb781a80958b01d7a2fa1ac73c673ba5e311", "a"],
["658c6d9019a1deddbcb3640a066dfd23471553a307ab941fd3e677ba887be329", "ab"],
["1833a9fa7cf4086bd5fda73da32e5a1d75b4c3f89d5c436369f9d78bb2da5c28", "abc"],
["35282468f3b93c5aaca6408582fced36e578f67671ed0741c332d68ac72d7aa2", "abcd"],
["9278d633efce801c6aa62987d7483d50e3c918caed7d46679551eed91fba8904", "abcde"],
["7a17ee5e289845adcafaf6ca1b05c4a281b232a71c7083f66c19ba1d1169a8d4", "abcdef"],
["ee8c7f94ff805cb2e644643010ea43b0222056420917ec70c3da764175193f8f", "abcdefg"],
["7b37c0876d29c5add7800a1823795a82b809fc12f799ff6a4b5e58d52c42b17e", "abcdefgh"],
["bdc514bea74ffbb9c3aa6470b08ceb80a88e313ad65e4a01457bbffd0acc86de", "abcdefghi"],
["12e3afb9739df8d727e93d853faeafc374cc55aedc937e5a1e66f5843b1d4c2e", "abcdefghij"],
["22297d373b751f581944bb26315133f6fda2f0bf60f65db773900f61f81b7e79", "Discard medicine more than two years old."],
["4d48d137bc9cf6d21415b805bf33f59320337d85c673998260e03a02a0d760cd", "He who has a shady past knows that nice guys finish last."],
["beba299e10f93e17d45663a6dc4b8c9349e4f5b9bac0d7832389c40a1b401e5c", "I wouldn't marry him with a ten foot pole."],
["42e082ae7f967781c6cd4e0ceeaeeb19fb2955adbdbaf8c7ec4613ac130071b3", "Free! Free!/A trip/to Mars/for 900/empty jars/Burma Shave"],
["207d06b205bfb359df91b48b6fd8aa6e4798b712d1cc5e91a254da9cef8684a3", "The days of the digital watch are numbered. -Tom Stoppard"],
["d56eab6927e371e2148b0788779aaf565d30567af2af822b6be3b90db9767a70", "Nepal premier won't resign."],
["01020709ca7fd10dc7756ce767d508d7206167d300b7a7ed76838a8547a7898c", "For every action there is an equal and opposite government program."],
["5569a6cc6535a66da221d8f6ad25008f28752d0343f3f1d757f1ecc9b1c61536", "His money is twice tainted: 'taint yours and 'taint mine."],
["8ff699b5ac7687c82600e89d0ff6cfa87e7179759184386971feb76fbae9975f", "There is no reason for any individual to have a computer in their home. -Ken Olsen, 1977"],
["f4b3a7c85a418b15ce330fd41ae0254b036ad48dd98aa37f0506a995ba9c6029", "It's a tiny change to the code and not completely disgusting. - Bob Manchek"],
["1ed94bab64fe560ef0983165fcb067e9a8a971c1db8e6fb151ff9a7c7fe877e3", "size: a.out: bad magic"],
["ff15b54992eedf9889f7b4bbb16692881aa01ed10dfc860fdb04785d8185cd3c", "The major problem is with sendmail. -Mark Horton"],
["8a0a7c417a47deec0b6474d8c247da142d2e315113a2817af3de8f45690d8652", "Give me a rock, paper and scissors and I will move the world. CCFestoon"],
["310d263fdab056a930324cdea5f46f9ea70219c1a74b01009994484113222a62", "If the enemy is within range, then so are you."],
["1aaa0903aa4cf872fe494c322a6e535698ea2140e15f26fb6088287aedceb6ba", "It's well we cannot hear the screams/That we create in others' dreams."],
["2eb81bcaa9e9185a7587a1b26299dcfb30f2a58a7f29adb584b969725457ad4f", "You remind me of a TV show, but that's all right: I watch it anyway."],
["c27b1683ef76e274680ab5492e592997b0d9d5ac5a5f4651b6036f64215256af", "C is as portable as Stonehedge!!"],
["3995cce8f32b174c22ffac916124bd095c80205d9d5f1bb08a155ac24b40d6cb", "Even if I could be Shakespeare, I think I should still choose to be Faraday. - A. Huxley"],
["496f7063f8bd479bf54e9d87e9ba53e277839ac7fdaecc5105f2879b58ee562f", "The fugacity of a constituent in a mixture of gases at a given temperature is proportional to its mole fraction. Lewis-Randall Rule"],
["2e0eff918940b01eea9539a02212f33ee84f77fab201f4287aa6167e4a1ed043", "How can you write a big system without C++? -Paul Glick"]]
for vectorSet in testVectors:
assert vectorSet[0] == blake_hash(vectorSet[1]).encode('hex')

View file

@ -37,6 +37,9 @@ from basicswap.util.address import (
decodeAddress,
pubkeyToAddress,
)
from basicswap.util.crypto import (
sha256,
)
from coincurve.keys import (
PrivateKey,
PublicKey)
@ -109,11 +112,27 @@ def find_vout_for_address_from_txobj(tx_obj, addr: str) -> int:
raise RuntimeError("Vout not found for address: txid={}, addr={}".format(tx_obj['txid'], addr))
class BTCInterface(CoinInterface):
class Secp256k1Interface(CoinInterface):
@staticmethod
def curve_type():
return Curves.secp256k1
def getNewSecretKey(self) -> bytes:
return i2b(getSecretInt())
def getPubkey(self, privkey):
return PublicKey.from_secret(privkey).format()
def verifyKey(self, k: bytes) -> bool:
i = b2i(k)
return (i < ep.o and i > 0)
def verifyPubkey(self, pubkey_bytes: bytes) -> bool:
return verify_secp256k1_point(pubkey_bytes)
class BTCInterface(Secp256k1Interface):
@staticmethod
def coin_type():
return Coins.BTC
@ -422,11 +441,11 @@ class BTCInterface(CoinInterface):
return segwit_addr.encode(bech32_prefix, version, pkh)
def pkh_to_address(self, pkh: bytes) -> str:
# pkh is hash160(pk)
# pkh is ripemd160(sha256(pk))
assert (len(pkh) == 20)
prefix = self.chainparams_network()['pubkey_address']
data = bytes((prefix,)) + pkh
checksum = hashlib.sha256(hashlib.sha256(data).digest()).digest()
checksum = sha256(sha256(data))
return b58encode(data + checksum[0:4])
def sh_to_address(self, sh: bytes) -> str:
@ -452,12 +471,6 @@ class BTCInterface(CoinInterface):
assert (len(pk) == 33)
return self.pkh_to_address(hash160(pk))
def getNewSecretKey(self) -> bytes:
return i2b(getSecretInt())
def getPubkey(self, privkey):
return PublicKey.from_secret(privkey).format()
def getAddressHashFromKey(self, key: bytes) -> bytes:
pk = self.getPubkey(key)
return hash160(pk)
@ -465,13 +478,6 @@ class BTCInterface(CoinInterface):
def getSeedHash(self, seed) -> bytes:
return self.getAddressHashFromKey(seed)[::-1]
def verifyKey(self, k: bytes) -> bool:
i = b2i(k)
return (i < ep.o and i > 0)
def verifyPubkey(self, pubkey_bytes: bytes) -> bool:
return verify_secp256k1_point(pubkey_bytes)
def encodeKey(self, key_bytes: bytes) -> str:
wif_prefix = self.chainparams_network()['key_prefix']
return toWIF(wif_prefix, key_bytes)
@ -1018,20 +1024,20 @@ class BTCInterface(CoinInterface):
return hash160(K)
def getScriptDest(self, script):
return CScript([OP_0, hashlib.sha256(script).digest()])
return CScript([OP_0, sha256(script)])
def getScriptScriptSig(self, script: bytes) -> bytes:
return bytes()
def getP2SHP2WSHDest(self, script):
script_hash = hashlib.sha256(script).digest()
script_hash = sha256(script)
assert len(script_hash) == 32
p2wsh_hash = hash160(CScript([OP_0, script_hash]))
assert len(p2wsh_hash) == 20
return CScript([OP_HASH160, p2wsh_hash, OP_EQUAL])
def getP2SHP2WSHScriptSig(self, script):
script_hash = hashlib.sha256(script).digest()
script_hash = sha256(script)
assert len(script_hash) == 32
return CScript([CScript([OP_0, script_hash, ]), ])
@ -1249,25 +1255,25 @@ class BTCInterface(CoinInterface):
return self.rpc_wallet('sendtoaddress', params)
def signCompact(self, k, message):
message_hash = hashlib.sha256(bytes(message, 'utf-8')).digest()
message_hash = sha256(bytes(message, 'utf-8'))
privkey = PrivateKey(k)
return privkey.sign_recoverable(message_hash, hasher=None)[:64]
def signRecoverable(self, k, message):
message_hash = hashlib.sha256(bytes(message, 'utf-8')).digest()
message_hash = sha256(bytes(message, 'utf-8'))
privkey = PrivateKey(k)
return privkey.sign_recoverable(message_hash, hasher=None)
def verifyCompactSig(self, K, message, sig):
message_hash = hashlib.sha256(bytes(message, 'utf-8')).digest()
message_hash = sha256(bytes(message, 'utf-8'))
pubkey = PublicKey(K)
rv = pubkey.verify_compact(sig, message_hash, hasher=None)
assert (rv is True)
def verifySigAndRecover(self, sig, message):
message_hash = hashlib.sha256(bytes(message, 'utf-8')).digest()
message_hash = sha256(bytes(message, 'utf-8'))
pubkey = PublicKey.from_signature_and_message(sig, message_hash, hasher=None)
return pubkey.format()
@ -1276,7 +1282,7 @@ class BTCInterface(CoinInterface):
message_magic = self.chainparams()['message_magic']
message_bytes = SerialiseNumCompact(len(message_magic)) + bytes(message_magic, 'utf-8') + SerialiseNumCompact(len(message)) + bytes(message, 'utf-8')
message_hash = hashlib.sha256(hashlib.sha256(message_bytes).digest()).digest()
message_hash = sha256(sha256(message_bytes))
signature_bytes = base64.b64decode(signature)
rec_id = (signature_bytes[0] - 27) & 3
signature_bytes = signature_bytes[1:] + bytes((rec_id,))
@ -1502,7 +1508,7 @@ class BTCInterface(CoinInterface):
return CScript([OP_HASH160, script_hash, OP_EQUAL])
def get_p2wsh_script_pubkey(self, script: bytearray) -> bytearray:
return CScript([OP_0, hashlib.sha256(script).digest()])
return CScript([OP_0, sha256(script)])
def findTxnByHash(self, txid_hex: str):
# Only works for wallet txns

View file

@ -0,0 +1,63 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
from basicswap.chainparams import Coins
from basicswap.interface.btc import Secp256k1Interface
from basicswap.util.address import (
b58decode,
b58encode,
)
from basicswap.util.crypto import (
blake256,
ripemd160,
)
class DCRInterface(Secp256k1Interface):
@staticmethod
def coin_type():
return Coins.DCR
@staticmethod
def exp() -> int:
return 8
@staticmethod
def COIN() -> int:
return 100000000
@staticmethod
def nbk() -> int:
return 32
@staticmethod
def nbK() -> int: # No. of bytes requires to encode a public key
return 33
def __init__(self, coin_settings, network, swap_client=None):
super().__init__(network)
def pkh(self, pubkey: bytes) -> bytes:
return ripemd160(blake256(pubkey))
def pkh_to_address(self, pkh: bytes) -> str:
prefix = self.chainparams_network()['pubkey_address']
data = prefix.to_bytes(2, 'big') + pkh
checksum = blake256(blake256(data))
return b58encode(data + checksum[0:4])
def decode_address(self, address: str) -> bytes:
addr_data = b58decode(address)
if addr_data is None:
return None
prefixed_data = addr_data[:-4]
checksum = addr_data[-4:]
if blake256(blake256(prefixed_data))[:4] != checksum:
raise ValueError('Checksum mismatch')
return prefixed_data

View file

@ -1,12 +1,11 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2023 tecnovert
# Copyright (c) 2022-2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import hashlib
from basicswap.contrib.segwit_addr import bech32_decode, convertbits, bech32_encode
from basicswap.util.crypto import ripemd160
from basicswap.util.crypto import ripemd160, sha256
__b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
@ -68,7 +67,7 @@ def encodeStealthAddress(prefix_byte: int, scan_pubkey: bytes, spend_pubkey: byt
data += bytes((0x00,)) # num prefix bits
b = bytes((prefix_byte,)) + data
b += hashlib.sha256(hashlib.sha256(b).digest()).digest()[:4]
b += sha256(sha256(b))[:4]
return b58encode(b)
@ -83,13 +82,12 @@ def toWIF(prefix_byte: int, b: bytes, compressed: bool = True) -> str:
b = bytes((prefix_byte,)) + b
if compressed:
b += bytes((0x01,))
b += hashlib.sha256(hashlib.sha256(b).digest()).digest()[:4]
b += sha256(sha256(b))[:4]
return b58encode(b)
def getKeyID(key_data: bytes) -> bytes:
sha256_hash = hashlib.sha256(key_data).digest()
return ripemd160(sha256_hash)
return ripemd160(sha256(key_data))
def bech32Decode(hrp, addr):
@ -109,18 +107,19 @@ def bech32Encode(hrp, data):
return ret
def decodeAddress(address_str: str):
b58_addr = b58decode(address_str)
if b58_addr is not None:
address = b58_addr[:-4]
checksum = b58_addr[-4:]
assert (hashlib.sha256(hashlib.sha256(address).digest()).digest()[:4] == checksum), 'Checksum mismatch'
return b58_addr[:-4]
return None
def decodeAddress(address: str):
addr_data = b58decode(address)
if addr_data is None:
return None
prefixed_data = addr_data[:-4]
checksum = addr_data[-4:]
if sha256(sha256(prefixed_data))[:4] != checksum:
raise ValueError('Checksum mismatch')
return prefixed_data
def encodeAddress(address: bytes) -> str:
checksum = hashlib.sha256(hashlib.sha256(address).digest()).digest()
checksum = sha256(sha256(address))
return b58encode(address + checksum[0:4])

View file

@ -1,23 +1,28 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2023 tecnovert
# Copyright (c) 2022-2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
from Crypto.Hash import RIPEMD160, SHA256 # pycryptodome
from basicswap.contrib.blake256.blake256 import blake_hash
def sha256(data):
def sha256(data: bytes) -> bytes:
h = SHA256.new()
h.update(data)
return h.digest()
def ripemd160(data):
def ripemd160(data: bytes) -> bytes:
h = RIPEMD160.new()
h.update(data)
return h.digest()
def hash160(s):
def blake256(data: bytes) -> bytes:
return blake_hash(data)
def hash160(s: bytes) -> bytes:
return ripemd160(sha256(s))

View file

@ -0,0 +1,68 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE or http://www.opensource.org/licenses/mit-license.php.
import logging
import unittest
from basicswap.basicswap import (
Coins,
)
from tests.basicswap.util import (
REQUIRED_SETTINGS,
)
from tests.basicswap.common import (
stopDaemons,
)
from tests.basicswap.test_xmr import BaseTest
from basicswap.interface.dcr import DCRInterface
logger = logging.getLogger()
class Test(BaseTest):
__test__ = True
test_coin_from = Coins.DCR
decred_daemons = []
start_ltc_nodes = False
start_xmr_nodes = False
@classmethod
def prepareExtraCoins(cls):
pass
@classmethod
def tearDownClass(cls):
logging.info('Finalising Decred Test')
super(Test, cls).tearDownClass()
stopDaemons(cls.decred_daemons)
@classmethod
def coins_loop(cls):
super(Test, cls).coins_loop()
def test_001_decred(self):
logging.info('---------- Test {}'.format(self.test_coin_from.name))
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(REQUIRED_SETTINGS)
ci = DCRInterface(coin_settings, 'mainnet')
k = ci.getNewSecretKey()
K = ci.getPubkey(k)
pkh = ci.pkh(K)
address = ci.pkh_to_address(pkh)
assert (address.startswith('Ds'))
data = ci.decode_address(address)
assert (data[2:] == pkh)
if __name__ == '__main__':
unittest.main()

View file

@ -24,12 +24,13 @@ from coincurve.keys import (
from basicswap.util import i2b, h2b
from basicswap.util.integer import encode_varint, decode_varint
from basicswap.util.crypto import ripemd160, hash160
from basicswap.util.crypto import ripemd160, hash160, blake256
from basicswap.util.network import is_private_ip_address
from basicswap.util.rfc2440 import rfc2440_hash_password
from basicswap.util_xmr import encode_address as xmr_encode_address
from basicswap.interface.btc import BTCInterface
from basicswap.interface.xmr import XMRInterface
from tests.basicswap.util import REQUIRED_SETTINGS
from basicswap.basicswap_util import (
TxLockTypes)
@ -48,7 +49,6 @@ from basicswap.contrib.test_framework.script import hash160 as hash160_btc
class Test(unittest.TestCase):
REQUIRED_SETTINGS = {'blocks_confirmed': 1, 'conf_target': 1, 'use_segwit': True, 'connection_type': 'rpc'}
def test_serialise_num(self):
def test_case(v, nb=None):
@ -69,7 +69,7 @@ class Test(unittest.TestCase):
def test_sequence(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
coin_settings.update(REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
@ -177,7 +177,7 @@ class Test(unittest.TestCase):
def test_ecdsa_otves(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
coin_settings.update(REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
vk_sign = ci.getNewSecretKey()
vk_encrypt = ci.getNewSecretKey()
@ -200,7 +200,7 @@ class Test(unittest.TestCase):
def test_sign(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
coin_settings.update(REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
vk = ci.getNewSecretKey()
@ -215,7 +215,7 @@ class Test(unittest.TestCase):
def test_sign_compact(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
coin_settings.update(REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
vk = ci.getNewSecretKey()
@ -230,7 +230,7 @@ class Test(unittest.TestCase):
def test_sign_recoverable(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
coin_settings.update(REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
vk = ci.getNewSecretKey()
@ -246,7 +246,7 @@ class Test(unittest.TestCase):
def test_pubkey_to_address(self):
coin_settings = {'rpcport': 0, 'rpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
coin_settings.update(REQUIRED_SETTINGS)
ci = BTCInterface(coin_settings, 'regtest')
pk = h2b('02c26a344e7d21bcc6f291532679559f2fd234c881271ff98714855edc753763a6')
addr = ci.pubkey_to_address(pk)
@ -254,7 +254,7 @@ class Test(unittest.TestCase):
def test_dleag(self):
coin_settings = {'rpcport': 0, 'walletrpcport': 0, 'walletrpcauth': 'none'}
coin_settings.update(self.REQUIRED_SETTINGS)
coin_settings.update(REQUIRED_SETTINGS)
ci = XMRInterface(coin_settings, 'regtest')
@ -430,32 +430,36 @@ class Test(unittest.TestCase):
assert (msg_buf_v2.time_valid == 0)
def test_is_private_ip_address(self):
assert (is_private_ip_address('localhost'))
assert (is_private_ip_address('127.0.0.1'))
assert (is_private_ip_address('10.0.0.0'))
assert (is_private_ip_address('172.16.0.0'))
assert (is_private_ip_address('192.168.0.0'))
assert (is_private_ip_address('20.87.245.0') is False)
assert (is_private_ip_address('particl.io') is False)
test_addresses = [
('localhost', True),
('127.0.0.1', True),
('10.0.0.0', True),
('172.16.0.0', True),
('192.168.0.0', True),
('20.87.245.0', False),
('particl.io', False),
]
for addr, is_private in test_addresses:
assert (is_private_ip_address(addr) is is_private)
def test_varint(self):
def test_case(i, expect_length):
test_vectors = [
(0, 1),
(1, 1),
(127, 1),
(128, 2),
(253, 2),
(8321, 2),
(16383, 2),
(16384, 3),
(2097151, 3),
(2097152, 4),
]
for i, expect_length in test_vectors:
b = encode_varint(i)
assert (len(b) == expect_length)
assert (decode_varint(b) == i)
test_case(0, 1)
test_case(1, 1)
test_case(127, 1)
test_case(128, 2)
test_case(253, 2)
test_case(8321, 2)
test_case(16383, 2)
test_case(16384, 3)
test_case(2097151, 3)
test_case(2097152, 4)
def test_base58(self):
kv = edu.get_secret()
Kv = edu.encodepoint(edf.scalarmult_B(kv))
@ -468,6 +472,14 @@ class Test(unittest.TestCase):
addr = xmr_encode_address(Kv, Ks, 4146)
assert (addr.startswith('Wo'))
def test_blake256(self):
test_vectors = [
('716f6e863f744b9ac22c97ec7b76ea5f5908bc5b2f67c61510bfc4751384ea7a', b''),
('7576698ee9cad30173080678e5965916adbb11cb5245d386bf1ffda1cb26c9d7', b'The quick brown fox jumps over the lazy dog'),
]
for expect_hash, data in test_vectors:
assert (blake256(data).hex() == expect_hash)
if __name__ == '__main__':
unittest.main()

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2022 tecnovert
# Copyright (c) 2022-2024 tecnovert
# Distributed under the MIT software license, see the accompanying
# file LICENSE.txt or http://www.opensource.org/licenses/mit-license.php.
@ -10,6 +10,9 @@ import urllib
from urllib.request import urlopen
REQUIRED_SETTINGS = {'blocks_confirmed': 1, 'conf_target': 1, 'use_segwit': True, 'connection_type': 'rpc'}
def make_boolean(s):
return s.lower() in ['1', 'true']