Initial commit

This commit is contained in:
nolash 2021-10-10 18:14:34 +02:00
commit 7753247afb
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
34 changed files with 1859 additions and 0 deletions

1
MANIFEST.in Normal file
View File

@ -0,0 +1 @@
include *requirements*

3
config/config.ini Normal file
View File

@ -0,0 +1,3 @@
[signer]
secret = deadbeef
socket_path = ipc:///tmp/crypto-dev-signer/jsonrpc.ipc

6
config/database.ini Normal file
View File

@ -0,0 +1,6 @@
[database]
NAME=cic-signer
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432

0
funga/eth/__init__.py Normal file
View File

View File

115
funga/eth/cli/handle.py Normal file
View File

@ -0,0 +1,115 @@
# standard imports
import json
import logging
# external imports
from jsonrpc.exceptions import (
JSONRPCServerError,
JSONRPCParseError,
JSONRPCInvalidParams,
)
from hexathon import add_0x
# local imports
from funga.eth.transaction import EIP155Transaction
from funga.error import (
UnknownAccountError,
SignerError,
)
from funga.eth.cli.jsonrpc import jsonrpc_ok
from .jsonrpc import (
jsonrpc_error,
is_valid_json,
)
logg = logging.getLogger(__name__)
class SignRequestHandler:
keystore = None
signer = None
def process_input(self, j):
rpc_id = j['id']
m = j['method']
p = j['params']
return (rpc_id, getattr(self, m)(p))
def handle_jsonrpc(self, d):
j = None
try:
j = json.loads(d)
is_valid_json(j)
logg.debug('{}'.format(d.decode('utf-8')))
except Exception as e:
logg.exception('input error {}'.format(e))
j = json.dumps(jsonrpc_error(None, JSONRPCParseError)).encode('utf-8')
raise SignerError(j)
try:
(rpc_id, r) = self.process_input(j)
r = jsonrpc_ok(rpc_id, r)
j = json.dumps(r).encode('utf-8')
except ValueError as e:
# TODO: handle cases to give better error context to caller
logg.exception('process error {}'.format(e))
j = json.dumps(jsonrpc_error(j['id'], JSONRPCServerError)).encode('utf-8')
raise SignerError(j)
except UnknownAccountError as e:
logg.exception('process unknown account error {}'.format(e))
j = json.dumps(jsonrpc_error(j['id'], JSONRPCServerError)).encode('utf-8')
raise SignerError(j)
return j
def personal_newAccount(self, p):
password = p
if p.__class__.__name__ != 'str':
if p.__class__.__name__ != 'list':
e = JSONRPCInvalidParams()
e.data = 'parameter must be list containing one string'
raise ValueError(e)
logg.error('foo {}'.format(p))
if len(p) != 1:
e = JSONRPCInvalidParams()
e.data = 'parameter must be list containing one string'
raise ValueError(e)
if p[0].__class__.__name__ != 'str':
e = JSONRPCInvalidParams()
e.data = 'parameter must be list containing one string'
raise ValueError(e)
password = p[0]
r = self.keystore.new(password)
return add_0x(r)
# TODO: move to translation module ("personal" rpc namespace is node-specific)
def personal_signTransaction(self, p):
logg.debug('got {} to sign'.format(p[0]))
t = EIP155Transaction(p[0], p[0]['nonce'], p[0]['chainId'])
raw_signed_tx = self.signer.sign_transaction_to_rlp(t, p[1])
o = {
'raw': '0x' + raw_signed_tx.hex(),
'tx': t.serialize(),
}
return o
def eth_signTransaction(self, tx):
o = self.personal_signTransaction([tx[0], ''])
return o['raw']
def eth_sign(self, p):
logg.debug('got message {} to sign'.format(p[1]))
message_type = type(p[1]).__name__
if message_type != 'str':
raise ValueError('invalid message format, must be {}, not {}'.format(message_type))
z = self.signer.sign_ethereum_message(p[0], p[1][2:])
return add_0x(z.hex())

85
funga/eth/cli/http.py Normal file
View File

@ -0,0 +1,85 @@
# standard imports
import logging
# external imports
from http.server import (
HTTPServer,
BaseHTTPRequestHandler,
)
# local imports
from .handle import SignRequestHandler
from crypto_dev_signer.error import SignerError
logg = logging.getLogger(__name__)
def start_server_http(spec):
httpd = HTTPServer(spec, HTTPSignRequestHandler)
logg.debug('starting http server {}'.format(spec))
httpd.serve_forever()
class HTTPSignRequestHandler(SignRequestHandler, BaseHTTPRequestHandler):
def do_POST(self):
if self.headers.get('Content-Type') != 'application/json':
self.send_response(400, 'me read json only')
self.end_headers()
return
try:
if 'application/json' not in self.headers.get('Accept').split(','):
self.send_response(400, 'me json only speak')
self.end_headers()
return
except AttributeError:
pass
l = self.headers.get('Content-Length')
try:
l = int(l)
except ValueError:
self.send_response(400, 'content length must be integer')
self.end_headers()
return
if l > 4096:
self.send_response(400, 'too much information')
self.end_headers()
return
if l < 0:
self.send_response(400, 'you are too negative')
self.end_headers()
return
b = b''
c = 0
while c < l:
d = self.rfile.read(l-c)
if d == None:
break
b += d
c += len(d)
if c > 4096:
self.send_response(413, 'i should slap you around for lying about your size')
self.end_headers()
return
try:
r = self.handle_jsonrpc(d)
except SignerError as e:
r = e.to_jsonrpc()
l = len(r)
self.send_response(200, 'You are the Keymaster')
self.send_header('Content-Length', str(l))
self.send_header('Cache-Control', 'no-cache')
self.send_header('Content-Type', 'application/json')
self.end_headers()
c = 0
while c < l:
n = self.wfile.write(r[c:])
c += n

30
funga/eth/cli/jsonrpc.py Normal file
View File

@ -0,0 +1,30 @@
# local imports
from funga.error import UnknownAccountError
def jsonrpc_error(rpc_id, err):
return {
'jsonrpc': '2.0',
'id': rpc_id,
'error': {
'code': err.CODE,
'message': err.MESSAGE,
},
}
def jsonrpc_ok(rpc_id, response):
return {
'jsonrpc': '2.0',
'id': rpc_id,
'result': response,
}
def is_valid_json(j):
if j.get('id') == 'None':
raise ValueError('id missing')
return True

67
funga/eth/cli/socket.py Normal file
View File

@ -0,0 +1,67 @@
# standard imports
import os
import logging
import socket
import stat
# local imports
from crypto_dev_signer.error import SignerError
from .handle import SignRequestHandler
logg = logging.getLogger(__name__)
class SocketHandler:
def __init__(self):
self.handler = SignRequestHandler()
def process(self, csock):
d = csock.recv(4096)
r = None
try:
r = self.handler.handle_jsonrpc(d)
except SignerError as e:
r = e.to_jsonrpc()
csock.send(r)
def start_server_socket(s):
s.listen(10)
logg.debug('server started')
handler = SocketHandler()
while True:
(csock, caddr) = s.accept()
handler.process(csock)
csock.close()
s.close()
os.unlink(socket_path)
def start_server_tcp(spec):
s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
s.bind(spec)
logg.debug('created tcp socket {}'.format(spec))
start_server_socket(s)
def start_server_unix(socket_path):
socket_dir = os.path.dirname(socket_path)
try:
fi = os.stat(socket_dir)
if not stat.S_ISDIR:
RuntimeError('socket path {} is not a directory'.format(socket_dir))
except FileNotFoundError:
os.mkdir(socket_dir)
try:
os.unlink(socket_path)
except FileNotFoundError:
pass
s = socket.socket(family = socket.AF_UNIX, type = socket.SOCK_STREAM)
s.bind(socket_path)
logg.debug('created unix ipc socket {}'.format(socket_path))
start_server_socket(s)

91
funga/eth/encoding.py Normal file
View File

@ -0,0 +1,91 @@
# standard imports
import logging
# external imports
import coincurve
import sha3
from hexathon import (
strip_0x,
uniform,
)
logg = logging.getLogger(__name__)
def private_key_from_bytes(b):
return coincurve.PrivateKey(secret=b)
def public_key_bytes_to_address(pubk_bytes, result_format='hex'):
h = sha3.keccak_256()
logg.debug('public key bytes {}'.format(pubk_bytes.hex()))
h.update(pubk_bytes[1:])
z = h.digest()[12:]
if result_format == 'hex':
return to_checksum_address(z[:20].hex())
elif result_format == 'bytes':
return z[:20]
raise ValueError('invalid result format "{}"'.format(result_format))
def public_key_to_address(pubk, result_format='hex'):
pubk_bytes = pubk.format(compressed=False)
return public_key_bytes_to_address(pubk_bytes, result_format='hex')
def private_key_to_address(pk, result_format='hex'):
pubk = coincurve.PublicKey.from_secret(pk.secret)
#logg.debug('secret {} '.format(pk.secret.hex()))
return public_key_to_address(pubk, result_format)
def is_address(address_hex):
try:
address_hex = strip_0x(address_hex)
except ValueError:
return False
return len(address_hex) == 40
def is_checksum_address(address_hex):
hx = None
try:
hx = to_checksum(address_hex)
except ValueError:
return False
return hx == strip_0x(address_hex)
def to_checksum_address(address_hex):
address_hex = strip_0x(address_hex)
address_hex = uniform(address_hex)
if len(address_hex) != 40:
raise ValueError('Invalid address length')
h = sha3.keccak_256()
h.update(address_hex.encode('utf-8'))
z = h.digest()
#checksum_address_hex = '0x'
checksum_address_hex = ''
for (i, c) in enumerate(address_hex):
if c in '1234567890':
checksum_address_hex += c
elif c in 'abcdef':
if z[int(i / 2)] & (0x80 >> ((i % 2) * 4)) > 1:
checksum_address_hex += c.upper()
else:
checksum_address_hex += c
return checksum_address_hex
to_checksum = to_checksum_address
ethereum_recid_modifier = 35
def chain_id_to_v(chain_id, signature):
v = signature[64]
return (chain_id * 2) + ethereum_recid_modifier + v
def chainv_to_v(chain_id, v):
return v - ethereum_recid_modifier - (chain_id * 2)

View File

@ -0,0 +1 @@
from .tx import EthTxExecutor

58
funga/eth/helper/tx.py Normal file
View File

@ -0,0 +1,58 @@
# standard imports
import logging
# local imports
from crypto_dev_signer.helper import TxExecutor
from crypto_dev_signer.error import NetworkError
logg = logging.getLogger()
logging.getLogger('web3').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
class EthTxExecutor(TxExecutor):
def __init__(self, w3, sender, signer, chain_id, verifier=None, block=False):
self.w3 = w3
nonce = self.w3.eth.getTransactionCount(sender, 'pending')
super(EthTxExecutor, self).__init__(sender, signer, self.translator, self.dispatcher, self.reporter, nonce, chain_id, self.fee_helper, self.fee_price_helper, verifier, block)
def fee_helper(self, tx):
estimate = self.w3.eth.estimateGas(tx)
if estimate < 21000:
estimate = 21000
logg.debug('estimate {} {}'.format(tx, estimate))
return estimate
def fee_price_helper(self):
return self.w3.eth.gasPrice
def dispatcher(self, tx):
error_object = None
try:
tx_hash = self.w3.eth.sendRawTransaction(tx)
except ValueError as e:
error_object = e.args[0]
logg.error('node could not intepret rlp {}'.format(tx))
if error_object != None:
raise NetworkError(error_object)
return tx_hash
def reporter(self, tx):
return self.w3.eth.getTransactionReceipt(tx)
def translator(self, tx):
if tx.get('feePrice') != None:
tx['gasPrice'] = tx['feePrice']
del tx['feePrice']
if tx.get('feeUnits') != None:
tx['gas'] = tx['feeUnits']
del tx['feeUnits']
return tx

View File

@ -0,0 +1,8 @@
# third-party imports
#from eth_keys import KeyAPI
#from eth_keys.backends import NativeECCBackend
#keyapi = KeyAPI(NativeECCBackend)
#from .postgres import ReferenceKeystore
#from .dict import DictKeystore

View File

@ -0,0 +1,45 @@
# standard imports
import logging
# external imports
from hexathon import (
strip_0x,
add_0x,
)
# local imports
#from . import keyapi
from funga.error import UnknownAccountError
from .interface import EthKeystore
from funga.eth.encoding import private_key_to_address
logg = logging.getLogger(__name__)
class DictKeystore(EthKeystore):
def __init__(self):
super(DictKeystore, self).__init__()
self.keys = {}
def get(self, address, password=None):
address_key = strip_0x(address).lower()
if password != None:
logg.debug('password ignored as dictkeystore doesnt do encryption')
try:
return self.keys[address_key]
except KeyError:
raise UnknownAccountError(address_key)
def list(self):
return list(self.keys.keys())
def import_key(self, pk, password=None):
address_hex = private_key_to_address(pk)
address_hex_clean = strip_0x(address_hex).lower()
self.keys[address_hex_clean] = pk.secret
logg.debug('added key {}'.format(address_hex))
return add_0x(address_hex)

View File

@ -0,0 +1,50 @@
# standard imports
import os
import json
import logging
# local imports
from funga.eth.keystore import keyfile
from funga.eth.encoding import private_key_from_bytes
from funga.keystore import Keystore
logg = logging.getLogger(__name__)
def native_keygen(*args, **kwargs):
return os.urandom(32)
class EthKeystore(Keystore):
def __init__(self, private_key_generator=native_keygen):
super(EthKeystore, self).__init__(private_key_generator, private_key_from_bytes, keyfile.from_some)
def new(self, password=None):
b = self.private_key_generator()
return self.import_raw_key(b, password=password)
def import_raw_key(self, b, password=None):
pk = private_key_from_bytes(b)
return self.import_key(pk, password)
def import_key(self, pk, password=None):
raise NotImplementedError
def import_keystore_data(self, keystore_content, password=''):
if type(keystore_content).__name__ == 'str':
keystore_content = json.loads(keystore_content)
elif type(keystore_content).__name__ == 'bytes':
logg.debug('bytes {}'.format(keystore_content))
keystore_content = json.loads(keystore_content.decode('utf-8'))
private_key = keyfile.from_dict(keystore_content, password.encode('utf-8'))
return self.import_raw_key(private_key, password)
def import_keystore_file(self, keystore_file, password=''):
private_key = keyfile.from_file(keystore_file, password)
return self.import_raw_key(private_key)

View File

@ -0,0 +1,173 @@
# standard imports
import os
import hashlib
import logging
import json
import uuid
# external imports
import coincurve
from Crypto.Cipher import AES
from Crypto.Util import Counter
import sha3
# local imports
from funga.error import (
DecryptError,
KeyfileError,
)
from funga.eth.encoding import private_key_to_address
logg = logging.getLogger(__name__)
algo_keywords = [
'aes-128-ctr',
]
hash_keywords = [
'scrypt'
]
default_kdfparams = {
'dklen': 32,
'n': 1 << 18,
'p': 1,
'r': 8,
'salt': os.urandom(32).hex(),
}
def to_mac(mac_key, ciphertext_bytes):
h = sha3.keccak_256()
h.update(mac_key)
h.update(ciphertext_bytes)
return h.digest()
class Hashes:
@staticmethod
def from_scrypt(kdfparams=default_kdfparams, passphrase=''):
dklen = int(kdfparams['dklen'])
n = int(kdfparams['n'])
p = int(kdfparams['p'])
r = int(kdfparams['r'])
salt = bytes.fromhex(kdfparams['salt'])
return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt,n=n, p=p, r=r, maxmem=1024*1024*1024, dklen=dklen)
class Ciphers:
aes_128_block_size = 1 << 7
aes_iv_len = 16
@staticmethod
def decrypt_aes_128_ctr(ciphertext, decryption_key, iv):
ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
cipher = AES.new(decryption_key, AES.MODE_CTR, counter=ctr)
plaintext = cipher.decrypt(ciphertext)
return plaintext
@staticmethod
def encrypt_aes_128_ctr(plaintext, encryption_key, iv):
ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
cipher = AES.new(encryption_key, AES.MODE_CTR, counter=ctr)
ciphertext = cipher.encrypt(plaintext)
return ciphertext
def to_dict(private_key_bytes, passphrase=''):
private_key = coincurve.PrivateKey(secret=private_key_bytes)
encryption_key = Hashes.from_scrypt(passphrase=passphrase)
address_hex = private_key_to_address(private_key)
iv_bytes = os.urandom(Ciphers.aes_iv_len)
iv = int.from_bytes(iv_bytes, 'big')
ciphertext_bytes = Ciphers.encrypt_aes_128_ctr(private_key.secret, encryption_key[:16], iv)
mac = to_mac(encryption_key[16:], ciphertext_bytes)
crypto_dict = {
'cipher': 'aes-128-ctr',
'ciphertext': ciphertext_bytes.hex(),
'cipherparams': {
'iv': iv_bytes.hex(),
},
'kdf': 'scrypt',
'kdfparams': default_kdfparams,
'mac': mac.hex(),
}
uu = uuid.uuid1()
o = {
'address': address_hex,
'version': 3,
'crypto': crypto_dict,
'id': str(uu),
}
return o
def from_dict(o, passphrase=''):
cipher = o['crypto']['cipher']
if cipher not in algo_keywords:
raise NotImplementedError('cipher "{}" not implemented'.format(cipher))
kdf = o['crypto']['kdf']
if kdf not in hash_keywords:
raise NotImplementedError('kdf "{}" not implemented'.format(kdf))
m = getattr(Hashes, 'from_{}'.format(kdf.replace('-', '_')))
decryption_key = m(o['crypto']['kdfparams'], passphrase)
control_mac = bytes.fromhex(o['crypto']['mac'])
iv_bytes = bytes.fromhex(o['crypto']['cipherparams']['iv'])
iv = int.from_bytes(iv_bytes, "big")
ciphertext_bytes = bytes.fromhex(o['crypto']['ciphertext'])
# check mac
calculated_mac = to_mac(decryption_key[16:], ciphertext_bytes)
if control_mac != calculated_mac:
raise DecryptError('mac mismatch when decrypting passphrase')
m = getattr(Ciphers, 'decrypt_{}'.format(cipher.replace('-', '_')))
try:
pk = m(ciphertext_bytes, decryption_key[:16], iv)
except AssertionError as e:
raise DecryptError('could not decrypt keyfile: {}'.format(e))
logg.debug('bar')
return pk
def from_file(filepath, passphrase=''):
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
raise KeyfileError(e)
f.close()
return from_dict(o, passphrase)
def from_some(v, passphrase=''):
if isinstance(v, bytes):
v = v.decode('utf-8')
if isinstance(v, str):
try:
return from_file(v, passphrase)
except Exception:
logg.debug('keyfile parse as file fail')
pass
v = json.loads(v)
return from_dict(v, passphrase)

108
funga/eth/keystore/sql.py Normal file
View File

@ -0,0 +1,108 @@
# standard imports
import logging
import base64
# external imports
from cryptography.fernet import Fernet
#import psycopg2
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import sha3
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from .interface import EthKeystore
#from . import keyapi
from funga.error import UnknownAccountError
from funga.eth.encoding import private_key_to_address
logg = logging.getLogger(__name__)
def to_bytes(x):
return x.encode('utf-8')
class SQLKeystore(EthKeystore):
schema = [
"""CREATE TABLE IF NOT EXISTS ethereum (
id SERIAL NOT NULL PRIMARY KEY,
key_ciphertext VARCHAR(256) NOT NULL,
wallet_address_hex CHAR(40) NOT NULL
);
""",
"""CREATE UNIQUE INDEX IF NOT EXISTS ethereum_address_idx ON ethereum ( wallet_address_hex );
""",
]
def __init__(self, dsn, **kwargs):
super(SQLKeystore, self).__init__()
logg.debug('starting db session with dsn {}'.format(dsn))
self.db_engine = create_engine(dsn)
self.db_session = sessionmaker(bind=self.db_engine)()
for s in self.schema:
self.db_session.execute(s)
self.db_session.commit()
self.symmetric_key = kwargs.get('symmetric_key')
def __del__(self):
logg.debug('closing db session')
self.db_session.close()
def get(self, address, password=None):
safe_address = strip_0x(address).lower()
s = text('SELECT key_ciphertext FROM ethereum WHERE wallet_address_hex = :a')
r = self.db_session.execute(s, {
'a': safe_address,
},
)
try:
k = r.first()[0]
except TypeError:
self.db_session.rollback()
raise UnknownAccountError(safe_address)
self.db_session.commit()
a = self._decrypt(k, password)
return a
def import_key(self, pk, password=None):
address_hex = private_key_to_address(pk)
address_hex_clean = strip_0x(address_hex).lower()
c = self._encrypt(pk.secret, password)
s = text('INSERT INTO ethereum (wallet_address_hex, key_ciphertext) VALUES (:a, :c)') #%s, %s)')
self.db_session.execute(s, {
'a': address_hex_clean,
'c': c.decode('utf-8'),
},
)
self.db_session.commit()
logg.info('added private key for address {}'.format(address_hex_clean))
return add_0x(address_hex)
def _encrypt(self, private_key, password):
f = self._generate_encryption_engine(password)
return f.encrypt(private_key)
def _generate_encryption_engine(self, password):
h = sha3.keccak_256()
h.update(self.symmetric_key)
if password != None:
password_bytes = to_bytes(password)
h.update(password_bytes)
g = h.digest()
return Fernet(base64.b64encode(g))
def _decrypt(self, c, password):
f = self._generate_encryption_engine(password)
return f.decrypt(c.encode('utf-8'))

View File

@ -0,0 +1,87 @@
# standard imports
import os
import logging
import sys
import json
import argparse
import getpass
# external impors
import coincurve
from hexathon import strip_0x
# local imports
from funga.error import DecryptError
from funga.eth.keystore.keyfile import (
from_file,
to_dict,
)
from funga.eth.encoding import (
private_key_to_address,
private_key_from_bytes,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
argparser = argparse.ArgumentParser()
argparser.add_argument('-d', '--decrypt', dest='d', type=str, help='decrypt file')
argparser.add_argument('--private-key', dest='private_key', action='store_true', help='output private key instead of address')
argparser.add_argument('-z', action='store_true', help='zero-length password')
argparser.add_argument('-k', type=str, help='load key from file')
argparser.add_argument('-v', action='store_true', help='be verbose')
args = argparser.parse_args()
if args.v:
logg.setLevel(logging.DEBUG)
mode = 'create'
secret = False
if args.d:
mode = 'decrypt'
if args.private_key:
secret = True
pk_hex = os.environ.get('PRIVATE_KEY')
if args.k != None:
f = open(args.k, 'r')
pk_hex = f.read(66)
f.close()
def main():
global pk_hex
passphrase = os.environ.get('PASSPHRASE')
if args.z:
passphrase = ''
r = None
if mode == 'decrypt':
if passphrase == None:
passphrase = getpass.getpass('decryption phrase: ')
try:
r = from_file(args.d, passphrase).hex()
except DecryptError:
sys.stderr.write('Invalid passphrase\n')
sys.exit(1)
if not secret:
pk = private_key_from_bytes(bytes.fromhex(r))
r = private_key_to_address(pk)
elif mode == 'create':
if passphrase == None:
passphrase = getpass.getpass('encryption phrase: ')
pk_bytes = None
if pk_hex != None:
pk_hex = strip_0x(pk_hex)
pk_bytes = bytes.fromhex(pk_hex)
else:
pk_bytes = os.urandom(32)
pk = coincurve.PrivateKey(secret=pk_bytes)
o = to_dict(pk_bytes, passphrase)
r = json.dumps(o)
print(r)
if __name__ == '__main__':
main()

122
funga/eth/runnable/signer.py Executable file
View File

@ -0,0 +1,122 @@
# standard imports
import re
import os
import sys
import json
import logging
import argparse
from urllib.parse import urlparse
# external imports
import confini
from jsonrpc.exceptions import *
# local imports
from crypto_dev_signer.eth.signer import ReferenceSigner
from crypto_dev_signer.keystore.reference import ReferenceKeystore
from crypto_dev_signer.cli.handle import SignRequestHandler
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = '.'
db = None
signer = None
session = None
chainId = 8995
socket_path = '/run/crypto-dev-signer/jsonrpc.ipc'
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-i', type=int, help='default chain id for EIP155')
argparser.add_argument('-s', type=str, help='socket path')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.censor('SECRET', 'SIGNER')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
if args.i:
chainId = args.i
if args.s:
socket_url = urlparse(args.s)
elif config.get('SIGNER_SOCKET_PATH'):
socket_url = urlparse(config.get('SIGNER_SOCKET_PATH'))
# connect to database
dsn = 'postgresql://{}:{}@{}:{}/{}'.format(
config.get('DATABASE_USER'),
config.get('DATABASE_PASSWORD'),
config.get('DATABASE_HOST'),
config.get('DATABASE_PORT'),
config.get('DATABASE_NAME'),
)
logg.info('using dsn {}'.format(dsn))
logg.info('using socket {}'.format(config.get('SIGNER_SOCKET_PATH')))
re_http = r'^http'
re_tcp = r'^tcp'
re_unix = r'^ipc'
class MissingSecretError(Exception):
pass
def main():
secret_hex = config.get('SIGNER_SECRET')
if secret_hex == None:
raise MissingSecretError('please provide a valid hex value for the SIGNER_SECRET configuration variable')
secret = bytes.fromhex(secret_hex)
kw = {
'symmetric_key': secret,
}
SignRequestHandler.keystore = ReferenceKeystore(dsn, **kw)
SignRequestHandler.signer = ReferenceSigner(SignRequestHandler.keystore)
arg = None
try:
arg = json.loads(sys.argv[1])
except:
logg.info('no json rpc command detected, starting socket server {}'.format(socket_url))
scheme = 'ipc'
if socket_url.scheme != '':
scheme = socket_url.scheme
if re.match(re_tcp, socket_url.scheme):
from crypto_dev_signer.cli.socket import start_server_tcp
socket_spec = socket_url.netloc.split(':')
host = socket_spec[0]
port = int(socket_spec[1])
start_server_tcp((host, port))
elif re.match(re_http, socket_url.scheme):
from crypto_dev_signer.cli.http import start_server_http
socket_spec = socket_url.netloc.split(':')
host = socket_spec[0]
port = int(socket_spec[1])
start_server_http((host, port))
else:
from crypto_dev_signer.cli.socket import start_server_unix
start_server_unix(socket_url.path)
sys.exit(0)
(rpc_id, response) = process_input(arg)
r = jsonrpc_ok(rpc_id, response)
sys.stdout.write(json.dumps(r))
if __name__ == '__main__':
main()

View File

@ -0,0 +1 @@
from funga.eth.signer.defaultsigner import EIP155Signer

View File

@ -0,0 +1,79 @@
# standard imports
import logging
# external imports
import sha3
import coincurve
from hexathon import int_to_minbytes
# local imports
from funga.signer import Signer
from funga.eth.encoding import chain_id_to_v
logg = logging.getLogger(__name__)
class EIP155Signer(Signer):
def __init__(self, keyGetter):
super(EIP155Signer, self).__init__(keyGetter)
def sign_transaction(self, tx, password=None):
s = tx.rlp_serialize()
h = sha3.keccak_256()
h.update(s)
message_to_sign = h.digest()
z = self.sign_pure(tx.sender, message_to_sign, password)
return z
def sign_transaction_to_rlp(self, tx, password=None):
chain_id = int.from_bytes(tx.v, byteorder='big')
sig = self.sign_transaction(tx, password)
tx.apply_signature(chain_id, sig)
return tx.rlp_serialize()
def sign_transaction_to_wire(self, tx, password=None):
return self.sign_transaction_to_rlp(tx, password=password)
def sign_ethereum_message(self, address, message, password=None):
#k = keys.PrivateKey(self.keyGetter.get(address, password))
#z = keys.ecdsa_sign(message_hash=g, private_key=k)
if type(message).__name__ == 'str':
logg.debug('signing message in "str" format: {}'.format(message))
#z = k.sign_msg(bytes.fromhex(message))
message = bytes.fromhex(message)
elif type(message).__name__ == 'bytes':
logg.debug('signing message in "bytes" format: {}'.format(message.hex()))
#z = k.sign_msg(message)
else:
logg.debug('unhandled format {}'.format(type(message).__name__))
raise ValueError('message must be type str or bytes, received {}'.format(type(message).__name__))
ethereumed_message_header = b'\x19' + 'Ethereum Signed Message:\n{}'.format(len(message)).encode('utf-8')
h = sha3.keccak_256()
h.update(ethereumed_message_header + message)
message_to_sign = h.digest()
z = self.sign_pure(address, message_to_sign, password)
return z
# TODO: generic sign should be moved to non-eth context
def sign_pure(self, address, message, password=None):
pk = coincurve.PrivateKey(secret=self.keyGetter.get(address, password))
z = pk.sign_recoverable(hasher=None, message=message)
return z
def sign_message(self, address, message, password=None, dialect='eth'):
if dialect == None:
return self.sign_pure(address, message, password=password)
elif dialect == 'eth':
return self.sign_ethereum_message(address, message, password=password)
raise ValueError('Unknown message sign dialect "{}"'.format(dialect))

172
funga/eth/transaction.py Normal file
View File

@ -0,0 +1,172 @@
# standard imports
import logging
import binascii
import re
# external imports
#from rlp import encode as rlp_encode
from hexathon import (
strip_0x,
add_0x,
int_to_minbytes,
)
# local imports
from funga.eth.encoding import chain_id_to_v
#from crypto_dev_signer.eth.rlp import rlp_encode
import rlp
logg = logging.getLogger(__name__)
rlp_encode = rlp.encode
class Transaction:
def rlp_serialize(self):
raise NotImplementedError
def serialize(self):
raise NotImplementedError
class EIP155Transaction:
def __init__(self, tx, nonce_in, chainId_in=1):
to = b''
data = b''
if tx.get('to') != None:
to = bytes.fromhex(strip_0x(tx['to'], allow_empty=True))
if tx.get('data') != None:
data = bytes.fromhex(strip_0x(tx['data'], allow_empty=True))
gas_price = None
start_gas = None
value = None
nonce = None
chainId = None
# TODO: go directly from hex to bytes
try:
gas_price = int(tx['gasPrice'])
byts = ((gas_price.bit_length()-1)/8)+1
gas_price = gas_price.to_bytes(int(byts), 'big')
except ValueError:
gas_price = bytes.fromhex(strip_0x(tx['gasPrice'], allow_empty=True))
try:
start_gas = int(tx['gas'])
byts = ((start_gas.bit_length()-1)/8)+1
start_gas = start_gas.to_bytes(int(byts), 'big')
except ValueError:
start_gas = bytes.fromhex(strip_0x(tx['gas'], allow_empty=True))
try:
value = int(tx['value'])
byts = ((value.bit_length()-1)/8)+1
value = value.to_bytes(int(byts), 'big')
except ValueError:
value = bytes.fromhex(strip_0x(tx['value'], allow_empty=True))
try:
nonce = int(nonce_in)
byts = ((nonce.bit_length()-1)/8)+1
nonce = nonce.to_bytes(int(byts), 'big')
except ValueError:
nonce = bytes.fromhex(strip_0x(nonce_in, allow_empty=True))
try:
chainId = int(chainId_in)
byts = ((chainId.bit_length()-1)/8)+1
chainId = chainId.to_bytes(int(byts), 'big')
except ValueError:
chainId = bytes.fromhex(strip_0x(chainId_in, allow_empty=True))
self.nonce = nonce
self.gas_price = gas_price
self.start_gas = start_gas
self.to = to
self.value = value
self.data = data
self.v = chainId
self.r = b''
self.s = b''
self.sender = strip_0x(tx['from'])
def canonical_order(self):
s = [
self.nonce,
self.gas_price,
self.start_gas,
self.to,
self.value,
self.data,
self.v,
self.r,
self.s,
]
return s
def bytes_serialize(self):
s = self.canonical_order()
b = b''
for e in s:
b += e
return b
def rlp_serialize(self):
s = self.canonical_order()
return rlp_encode(s)
def serialize(self):
tx = {
'nonce': add_0x(self.nonce.hex(), allow_empty=True),
'gasPrice': add_0x(self.gas_price.hex()),
'gas': add_0x(self.start_gas.hex()),
'value': add_0x(self.value.hex(), allow_empty=True),
'data': add_0x(self.data.hex(), allow_empty=True),
'v': add_0x(self.v.hex(), allow_empty=True),
'r': add_0x(self.r.hex(), allow_empty=True),
's': add_0x(self.s.hex(), allow_empty=True),
}
if self.to == None or len(self.to) == 0:
tx['to'] = None
else:
tx['to'] = add_0x(self.to.hex())
if tx['data'] == '':
tx['data'] = '0x'
if tx['value'] == '':
tx['value'] = '0x00'
if tx['nonce'] == '':
tx['nonce'] = '0x00'
return tx
def apply_signature(self, chain_id, signature, v=None):
if len(self.r + self.s) > 0:
raise AttributeError('signature already set')
if len(signature) < 65:
raise ValueError('invalid signature length')
if v == None:
v = chain_id_to_v(chain_id, signature)
self.v = int_to_minbytes(v)
self.r = signature[:32]
self.s = signature[32:64]
for i in range(len(self.r)):
if self.r[i] > 0:
self.r = self.r[i:]
break
for i in range(len(self.s)):
if self.s[i] > 0:
self.s = self.s[i:]
break

View File

@ -0,0 +1,29 @@
import logging
import re
from web3 import Web3 as Web3super
from web3 import WebsocketProvider, HTTPProvider
from .middleware import PlatformMiddleware
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
logg = logging.getLogger(__file__)
def create_middleware(ipcpath):
PlatformMiddleware.ipcaddr = ipcpath
return PlatformMiddleware
# overrides the original Web3 constructor
#def Web3(blockchain_provider='ws://localhost:8546', ipcpath=None):
def Web3(provider, ipcpath=None):
w3 = Web3super(provider)
if ipcpath != None:
logg.info('using signer middleware with ipc {}'.format(ipcpath))
w3.middleware_onion.add(create_middleware(ipcpath))
w3.eth.personal = w3.geth.personal
return w3

View File

@ -0,0 +1,116 @@
# standard imports
import logging
import re
import socket
import uuid
import json
logg = logging.getLogger(__file__)
def jsonrpc_request(method, params):
uu = uuid.uuid4()
return {
"jsonrpc": "2.0",
"id": str(uu),
"method": method,
"params": params,
}
class PlatformMiddleware:
# id for the request is not available, meaning we cannot easily short-circuit
# hack workaround
id_seq = -1
re_personal = re.compile('^personal_.*')
ipcaddr = None
def __init__(self, make_request, w3):
self.w3 = w3
self.make_request = make_request
if self.ipcaddr == None:
raise AttributeError('ipcaddr not set')
# TODO: understand what format input params come in
# single entry input gives a tuple on params, wtf...
# dict input comes as [{}] and fails if not passed on as an array
@staticmethod
def _translate_params(params):
#if params.__class__.__name__ == 'tuple':
# r = []
# for p in params:
# r.append(p)
# return r
if params.__class__.__name__ == 'list' and len(params) > 0:
return params[0]
return params
# TODO: DRY
def __call__(self, method, suspect_params):
self.id_seq += 1
logg.debug('in middleware method {} params {} ipcpath {}'.format(method, suspect_params, self.ipcaddr))
if self.re_personal.match(method) != None:
params = PlatformMiddleware._translate_params(suspect_params)
# multiple providers is removed in web3.py 5.12.0
# https://github.com/ethereum/web3.py/issues/1701
# thus we need a workaround to use the same web3 instance
s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0)
ipc_provider_workaround = s.connect(self.ipcaddr)
logg.info('redirecting method {} params {} original params {}'.format(method, params, suspect_params))
o = jsonrpc_request(method, params[0])
j = json.dumps(o)
logg.debug('send {}'.format(j))
s.send(j.encode('utf-8'))
r = s.recv(4096)
s.close()
logg.debug('got recv {}'.format(str(r)))
jr = json.loads(r)
jr['id'] = self.id_seq
#return str(json.dumps(jr))
return jr
elif method == 'eth_signTransaction':
params = PlatformMiddleware._translate_params(suspect_params)
s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0)
ipc_provider_workaround = s.connect(self.ipcaddr)
logg.info('redirecting method {} params {} original params {}'.format(method, params, suspect_params))
o = jsonrpc_request(method, params[0])
j = json.dumps(o)
logg.debug('send {}'.format(j))
s.send(j.encode('utf-8'))
r = s.recv(4096)
s.close()
logg.debug('got recv {}'.format(str(r)))
jr = json.loads(r)
jr['id'] = self.id_seq
#return str(json.dumps(jr))
return jr
elif method == 'eth_sign':
params = PlatformMiddleware._translate_params(suspect_params)
s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0)
ipc_provider_workaround = s.connect(self.ipcaddr)
logg.info('redirecting method {} params {} original params {}'.format(method, params, suspect_params))
o = jsonrpc_request(method, params)
j = json.dumps(o)
logg.debug('send {}'.format(j))
s.send(j.encode('utf-8'))
r = s.recv(4096)
s.close()
logg.debug('got recv {}'.format(str(r)))
jr = json.loads(r)
jr['id'] = self.id_seq
return jr
r = self.make_request(method, suspect_params)
return r

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
cryptography==3.2.1
pysha3==1.0.2
rlp==2.0.1
json-rpc==1.13.0
confini>=0.3.6rc3,<0.5.0
coincurve==15.0.0
hexathon~=0.0.1a7
pycryptodome==3.10.1
funga>=0.5.1a1,<0.6.0

11
setup.cfg Normal file
View File

@ -0,0 +1,11 @@
[metadata]
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Intended Audience :: Developers
Topic :: Software Development :: Libraries
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
license = GPLv3
license_files =
LICENSE.txt

61
setup.py Normal file
View File

@ -0,0 +1,61 @@
from setuptools import setup
f = open('README.md', 'r')
long_description = f.read()
f.close()
requirements = []
f = open('requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
requirements.append(l.rstrip())
f.close()
sql_requirements = []
f = open('sql_requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
sql_requirements.append(l.rstrip())
f.close()
test_requirements = []
f = open('test_requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
test_requirements.append(l.rstrip())
f.close()
setup(
name="funga-eth",
version="0.5.1a1",
description="Ethereum implementation of the funga keystore and signer",
author="Louis Holbrook",
author_email="dev@holbrook.no",
packages=[
'funga.eth.signer',
'funga.eth',
'funga.eth.cli',
'funga.eth.keystore',
'funga.eth.runnable',
],
install_requires=requirements,
extras_require={
'sql': sql_requirements,
},
tests_require=test_requirements,
long_description=long_description,
long_description_content_type='text/markdown',
entry_points = {
'console_scripts': [
'funga-eth=funga.eth.runnable.signer:main',
'eth-keyfile=funga.eth.runnable.keyfile:main',
],
},
url='https://gitlab.com/chaintool/funga-eth',
)

2
sql_requirements.txt Normal file
View File

@ -0,0 +1,2 @@
psycopg2==2.8.6
sqlalchemy==1.3.20

0
test_requirements.txt Normal file
View File

88
tests/test_cli.py Normal file
View File

@ -0,0 +1,88 @@
# standard imports
import unittest
import logging
import os
# external imports
from hexathon import strip_0x
# local imports
from funga.eth.signer import EIP155Signer
from funga.eth.keystore.dict import DictKeystore
from funga.eth.cli.handle import SignRequestHandler
from funga.eth.transaction import EIP155Transaction
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, 'testdata')
class TestCli(unittest.TestCase):
def setUp(self):
#pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6')
#pk_getter = pkGetter(pk)
self.keystore = DictKeystore()
SignRequestHandler.keystore = self.keystore
self.signer = EIP155Signer(self.keystore)
SignRequestHandler.signer = self.signer
self.handler = SignRequestHandler()
def test_new_account(self):
q = {
'id': 0,
'method': 'personal_newAccount',
'params': [''],
}
(rpc_id, result) = self.handler.process_input(q)
self.assertTrue(self.keystore.get(result))
def test_sign_tx(self):
keystore_file = os.path.join(data_dir, 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
sender = self.keystore.import_keystore_file(keystore_file)
tx_hexs = {
'nonce': '0x',
'from': sender,
'gasPrice': "0x04a817c800",
'gas': "0x5208",
'to': '0x3535353535353535353535353535353535353535',
'value': "0x03e8",
'data': "0xdeadbeef",
'chainId': 8995,
}
tx = EIP155Transaction(tx_hexs, 42, 8995)
tx_s = tx.serialize()
# TODO: move to serialization wrapper for tests
tx_s['chainId'] = tx_s['v']
tx_s['from'] = sender
# eth_signTransaction wraps personal_signTransaction, so here we test both already
q = {
'id': 0,
'method': 'eth_signTransaction',
'params': [tx_s],
}
(rpc_id, result) = self.handler.process_input(q)
logg.debug('result {}'.format(result))
self.assertEqual(strip_0x(result), 'f86c2a8504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef82466aa0b7c1bbf52f736ada30fe253c7484176f44d6fd097a9720dc85ae5bbc7f060e54a07afee2563b0cf6d00333df51cc62b0d13c63108b2bce54ce2ad24e26ce7b4f25')
def test_sign_msg(self):
keystore_file = os.path.join(data_dir, 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
sender = self.keystore.import_keystore_file(keystore_file)
q = {
'id': 0,
'method': 'eth_sign',
'params': [sender, '0xdeadbeef'],
}
(rpc_id, result) = self.handler.process_input(q)
logg.debug('result msg {}'.format(result))
self.assertEqual(strip_0x(result), '50320dda75190a121b7b5979de66edadafd02bdfbe4f6d49552e79c01410d2464aae35e385c0e5b61663ff7b44ef65fa0ac7ad8a57472cf405db399b9dba3e1600')
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,63 @@
#!/usr/bin/python
# standard imports
import unittest
import logging
import base64
import os
# external imports
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from funga.error import UnknownAccountError
from funga.eth.keystore.dict import DictKeystore
from funga.eth.signer import EIP155Signer
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
class TestDict(unittest.TestCase):
address_hex = None
db = None
def setUp(self):
self.db = DictKeystore()
keystore_filepath = os.path.join(script_dir, 'testdata', 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
address_hex = self.db.import_keystore_file(keystore_filepath, '')
self.address_hex = add_0x(address_hex)
def tearDown(self):
pass
def test_get_key(self):
logg.debug('getting {}'.format(strip_0x(self.address_hex)))
pk = self.db.get(strip_0x(self.address_hex), '')
self.assertEqual(self.address_hex.lower(), '0x00a329c0648769a73afac7f9381e08fb43dbea72')
bogus_account = os.urandom(20).hex()
with self.assertRaises(UnknownAccountError):
self.db.get(bogus_account, '')
def test_sign_message(self):
s = EIP155Signer(self.db)
z = s.sign_ethereum_message(strip_0x(self.address_hex), b'foo')
logg.debug('zzz {}'.format(str(z)))
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,64 @@
#!/usr/bin/python
# standard imports
import unittest
import logging
import base64
import os
# external imports
import psycopg2
from psycopg2 import sql
from cryptography.fernet import Fernet, InvalidToken
# local imports
from funga.eth.keystore.sql import SQLKeystore
from funga.error import UnknownAccountError
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestDatabase(unittest.TestCase):
conn = None
cur = None
symkey = None
address_hex = None
db = None
def setUp(self):
logg.debug('setup')
# arbitrary value
symkey_hex = 'E92431CAEE69313A7BE9E443C4ABEED9BF8157E9A13553B4D5D6E7D51B5021D9'
self.symkey = bytes.fromhex(symkey_hex)
self.address_hex = '9FA61f0E52A5C51b43f0d32404625BC436bb7041'
kw = {
'symmetric_key': self.symkey,
}
self.db = SQLKeystore('postgres+psycopg2://postgres@localhost:5432/signer_test', **kw)
self.address_hex = self.db.new('foo')
#self.address_hex = add_0x(address_hex)
def tearDown(self):
self.db.db_session.execute('DROP INDEX ethereum_address_idx;')
self.db.db_session.execute('DROP TABLE ethereum;')
self.db.db_session.commit()
def test_get_key(self):
logg.debug('getting {}'.format(self.address_hex))
self.db.get(self.address_hex, 'foo')
with self.assertRaises(InvalidToken):
self.db.get(self.address_hex, 'bar')
bogus_account = '0x' + os.urandom(20).hex()
with self.assertRaises(UnknownAccountError):
self.db.get(bogus_account, 'bar')
if __name__ == '__main__':
unittest.main()

99
tests/test_sign.py Normal file
View File

@ -0,0 +1,99 @@
# standard imports
import unittest
import logging
import json
from rlp import encode as rlp_encode
from funga.eth.signer import EIP155Signer
from funga.eth.transaction import EIP155Transaction
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
tx_ints = {
'nonce': 0,
'from': "0xEB014f8c8B418Db6b45774c326A0E64C78914dC0",
'gasPrice': "20000000000",
'gas': "21000",
'to': '0x3535353535353535353535353535353535353535',
'value': "1000",
'data': "deadbeef",
}
tx_hexs = {
'nonce': '0x0',
'from': "0xEB014f8c8B418Db6b45774c326A0E64C78914dC0",
'gasPrice': "0x4a817c800",
'gas': "0x5208",
'to': '0x3535353535353535353535353535353535353535',
'value': "0x3e8",
'data': "deadbeef",
}
class pkGetter:
def __init__(self, pk):
self.pk = pk
def get(self, address, password=None):
return self.pk
class TestSign(unittest.TestCase):
pk = None
nonce = -1
pk_getter = None
def getNonce(self):
self.nonce += 1
return self.nonce
def setUp(self):
self.pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6')
self.pk_getter = pkGetter(self.pk)
def tearDown(self):
logg.info('teardown empty')
# TODO: verify rlp tx output
def test_serialize_transaction(self):
t = EIP155Transaction(tx_ints, 0)
self.assertRegex(t.__class__.__name__, "Transaction")
s = t.serialize()
self.assertDictEqual(s, {'nonce': '0x', 'gasPrice': '0x04a817c800', 'gas': '0x5208', 'to': '0x3535353535353535353535353535353535353535', 'value': '0x03e8', 'data': '0xdeadbeef', 'v': '0x01', 'r': '0x', 's': '0x'})
r = t.rlp_serialize()
self.assertEqual(r.hex(), 'ea808504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef018080')
t = EIP155Transaction(tx_hexs, 0)
self.assertRegex(t.__class__.__name__, "Transaction")
s = t.serialize()
#o = json.loads(s)
self.assertDictEqual(s, {'nonce': '0x', 'gasPrice': '0x04a817c800', 'gas': '0x5208', 'to': '0x3535353535353535353535353535353535353535', 'value': '0x03e8', 'data': '0xdeadbeef', 'v': '0x01', 'r': '0x', 's': '0x'})
r = t.rlp_serialize()
self.assertEqual(r.hex(), 'ea808504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef018080')
def test_sign_transaction(self):
t = EIP155Transaction(tx_ints, 461, 8995)
s = EIP155Signer(self.pk_getter)
z = s.sign_transaction(t)
def test_sign_message(self):
s = EIP155Signer(self.pk_getter)
z = s.sign_ethereum_message(tx_ints['from'], '666f6f')
z = s.sign_ethereum_message(tx_ints['from'], b'foo')
logg.debug('zzz {}'.format(str(z)))
if __name__ == '__main__':
unittest.main()

15
tests/test_socket.py Normal file
View File

@ -0,0 +1,15 @@
# standard imports
import unittest
import logging
logg = logging.getLogger(__name__)
class SocketTest(unittest.TestCase):
def test_placeholder_warning(self):
logg.warning('socket tests are missing! :/')
if __name__ == '__main__':
unittest.main()