Compare commits

..

No commits in common. "master" and "39435b67ba7441f86f4703e24a962f6ab1fd607d" have entirely different histories.

10 changed files with 60 additions and 244 deletions

View File

@ -1,16 +0,0 @@
* 0.6.2
- Enable signing of binary message
* 0.6.1
- Avoid padding of addresses missing one nibble
* 0.6.0
- Upgrade confini
* 0.5.4
- Add message signer cli
- Add pbkdf2 support
- Add -0 flag for omitting newline on output
- Revert RLP to 2.0.1, to not break eth-tester in dependents
* 0.5.3
- Upgrade RLP to 3.0.0 (eliminates really annoying cytoolz warning on stdout)
---
changelog before 0.5.3 will be written later, sorry

View File

@ -41,7 +41,7 @@ def private_key_to_address(pk, result_format='hex'):
def is_address(address_hex): def is_address(address_hex):
try: try:
address_hex = strip_0x(address_hex, pad=False) address_hex = strip_0x(address_hex)
except ValueError: except ValueError:
return False return False
return len(address_hex) == 40 return len(address_hex) == 40
@ -57,10 +57,10 @@ def is_checksum_address(address_hex):
def to_checksum_address(address_hex): def to_checksum_address(address_hex):
address_hex = strip_0x(address_hex, pad=False) address_hex = strip_0x(address_hex)
address_hex = uniform(address_hex)
if len(address_hex) != 40: if len(address_hex) != 40:
raise ValueError('Invalid address length') raise ValueError('Invalid address length')
address_hex = uniform(address_hex)
h = sha3.keccak_256() h = sha3.keccak_256()
h.update(address_hex.encode('utf-8')) h.update(address_hex.encode('utf-8'))
z = h.digest() z = h.digest()

View File

@ -13,35 +13,28 @@ import sha3
# local imports # local imports
from funga.error import ( from funga.error import (
DecryptError, DecryptError,
KeyfileError, KeyfileError,
) )
from funga.eth.encoding import private_key_to_address from funga.eth.encoding import private_key_to_address
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
algo_keywords = [ algo_keywords = [
'aes-128-ctr', 'aes-128-ctr',
] ]
hash_keywords = [ hash_keywords = [
'scrypt', 'scrypt'
'pbkdf2' ]
]
default_scrypt_kdfparams = { default_kdfparams = {
'dklen': 32, 'dklen': 32,
'n': 1 << 18, 'n': 1 << 18,
'p': 1, 'p': 1,
'r': 8, 'r': 8,
'salt': os.urandom(32).hex(), 'salt': os.urandom(32).hex(),
} }
default_pbkdf2_kdfparams = {
'c': 100000,
'dklen': 32,
'prf': 'sha256',
'salt': os.urandom(32).hex(),
}
def to_mac(mac_key, ciphertext_bytes): def to_mac(mac_key, ciphertext_bytes):
h = sha3.keccak_256() h = sha3.keccak_256()
@ -53,32 +46,18 @@ def to_mac(mac_key, ciphertext_bytes):
class Hashes: class Hashes:
@staticmethod @staticmethod
def from_scrypt(kdfparams=default_scrypt_kdfparams, passphrase=''): def from_scrypt(kdfparams=default_kdfparams, passphrase=''):
dklen = int(kdfparams['dklen']) dklen = int(kdfparams['dklen'])
n = int(kdfparams['n']) n = int(kdfparams['n'])
p = int(kdfparams['p']) p = int(kdfparams['p'])
r = int(kdfparams['r']) r = int(kdfparams['r'])
salt = bytes.fromhex(kdfparams['salt']) salt = bytes.fromhex(kdfparams['salt'])
return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt, n=n, p=p, r=r, maxmem=1024 * 1024 * 1024, return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt,n=n, p=p, r=r, maxmem=1024*1024*1024, dklen=dklen)
dklen=dklen)
@staticmethod
def from_pbkdf2(kdfparams=default_pbkdf2_kdfparams, passphrase=''):
if kdfparams['prf'] == 'hmac-sha256':
kdfparams['prf'].replace('hmac-sha256','sha256')
derived_key = hashlib.pbkdf2_hmac(
hash_name='sha256',
password=passphrase.encode('utf-8'),
salt=bytes.fromhex(kdfparams['salt']),
iterations=int(kdfparams['c']),
dklen=int(kdfparams['dklen'])
)
return derived_key
class Ciphers: class Ciphers:
aes_128_block_size = 1 << 7 aes_128_block_size = 1 << 7
aes_iv_len = 16 aes_iv_len = 16
@ -89,6 +68,7 @@ class Ciphers:
plaintext = cipher.decrypt(ciphertext) plaintext = cipher.decrypt(ciphertext)
return plaintext return plaintext
@staticmethod @staticmethod
def encrypt_aes_128_ctr(plaintext, encryption_key, iv): def encrypt_aes_128_ctr(plaintext, encryption_key, iv):
ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv) ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
@ -97,19 +77,11 @@ class Ciphers:
return ciphertext return ciphertext
def to_dict(private_key_bytes, kdf='scrypt', passphrase=''): def to_dict(private_key_bytes, passphrase=''):
private_key = coincurve.PrivateKey(secret=private_key_bytes) private_key = coincurve.PrivateKey(secret=private_key_bytes)
if kdf == 'scrypt': encryption_key = Hashes.from_scrypt(passphrase=passphrase)
encryption_key = Hashes.from_scrypt(passphrase=passphrase)
kdfparams = default_scrypt_kdfparams
elif kdf == 'pbkdf2':
encryption_key = Hashes.from_pbkdf2(passphrase=passphrase)
kdfparams = pbkdf2_kdfparams
else:
raise NotImplementedError("KDF not implemented: {0}".format(kdf))
address_hex = private_key_to_address(private_key) address_hex = private_key_to_address(private_key)
iv_bytes = os.urandom(Ciphers.aes_iv_len) iv_bytes = os.urandom(Ciphers.aes_iv_len)
@ -123,11 +95,11 @@ def to_dict(private_key_bytes, kdf='scrypt', passphrase=''):
'ciphertext': ciphertext_bytes.hex(), 'ciphertext': ciphertext_bytes.hex(),
'cipherparams': { 'cipherparams': {
'iv': iv_bytes.hex(), 'iv': iv_bytes.hex(),
}, },
'kdf': kdf, 'kdf': 'scrypt',
'kdfparams': kdfparams, 'kdfparams': default_kdfparams,
'mac': mac.hex(), 'mac': mac.hex(),
} }
uu = uuid.uuid1() uu = uuid.uuid1()
o = { o = {
@ -135,11 +107,12 @@ def to_dict(private_key_bytes, kdf='scrypt', passphrase=''):
'version': 3, 'version': 3,
'crypto': crypto_dict, 'crypto': crypto_dict,
'id': str(uu), 'id': str(uu),
} }
return o return o
def from_dict(o, passphrase=''): def from_dict(o, passphrase=''):
cipher = o['crypto']['cipher'] cipher = o['crypto']['cipher']
if cipher not in algo_keywords: if cipher not in algo_keywords:
raise NotImplementedError('cipher "{}" not implemented'.format(cipher)) raise NotImplementedError('cipher "{}" not implemented'.format(cipher))
@ -172,6 +145,7 @@ def from_dict(o, passphrase=''):
def from_file(filepath, passphrase=''): def from_file(filepath, passphrase=''):
f = open(filepath, 'r') f = open(filepath, 'r')
try: try:
o = json.load(f) o = json.load(f)

View File

@ -16,8 +16,6 @@ from funga.eth.keystore.keyfile import (
from_file, from_file,
to_dict, to_dict,
) )
from funga.eth.encoding import ( from funga.eth.encoding import (
private_key_to_address, private_key_to_address,
private_key_from_bytes, private_key_from_bytes,
@ -30,7 +28,6 @@ logg = logging.getLogger()
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument('-d', '--decrypt', dest='d', type=str, help='decrypt file') argparser.add_argument('-d', '--decrypt', dest='d', type=str, help='decrypt file')
argparser.add_argument('--private-key', dest='private_key', action='store_true', help='output private key instead of address') argparser.add_argument('--private-key', dest='private_key', action='store_true', help='output private key instead of address')
argparser.add_argument('-0', dest='nonl', action='store_true', help='no newline at end of output')
argparser.add_argument('-z', action='store_true', help='zero-length password') argparser.add_argument('-z', action='store_true', help='zero-length password')
argparser.add_argument('-k', type=str, help='load key from file') argparser.add_argument('-k', type=str, help='load key from file')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-v', action='store_true', help='be verbose')
@ -80,12 +77,10 @@ def main():
else: else:
pk_bytes = os.urandom(32) pk_bytes = os.urandom(32)
pk = coincurve.PrivateKey(secret=pk_bytes) pk = coincurve.PrivateKey(secret=pk_bytes)
o = to_dict(pk_bytes, passphrase=passphrase) o = to_dict(pk_bytes, passphrase)
r = json.dumps(o) r = json.dumps(o)
if not args.nonl: print(r)
r += "\n"
sys.stdout.write(r)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,62 +0,0 @@
# standard imports
import os
import logging
import sys
import json
import argparse
import getpass
# external impors
import coincurve
from hexathon import strip_0x
# local imports
from funga.error import DecryptError
from funga.eth.keystore.dict import DictKeystore
from funga.eth.signer import EIP155Signer
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
argparser = argparse.ArgumentParser()
argparser.add_argument('-f', type=str, help='Keyfile to use for signing')
argparser.add_argument('-z', action='store_true', help='zero-length password')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-0', dest='nonl', action='store_true', help='no newline at end of output')
argparser.add_argument('-b', '--binary', dest='binary', action='store_true', help='parse input as binary hex')
argparser.add_argument('msg', type=str, help='Message to sign')
args = argparser.parse_args()
if args.v:
logg.setLevel(logging.DEBUG)
def main():
passphrase = os.environ.get('WALLET_PASSPHRASE', os.environ.get('PASSPHRASE'))
if args.z:
passphrase = ''
if passphrase == None:
passphrase = getpass.getpass('decryption phrase: ')
keystore = DictKeystore()
address = keystore.import_keystore_file(args.f, password=passphrase)
signer = EIP155Signer(keystore)
msg = None
if args.binary:
hx = strip_0x(args.msg, pad=True)
msg = bytes.fromhex(hx)
else:
msg = args.msg.encode('utf-8').hex()
sig = signer.sign_ethereum_message(address, msg, password=passphrase)
r = sig.hex()
if not args.nonl:
r += "\n"
sys.stdout.write(r)
if __name__ == '__main__':
main()

View File

@ -1,10 +1,9 @@
cryptography==3.2.1 cryptography==3.2.1
pysha3==1.0.2 pysha3==1.0.2
rlp==2.0.1 rlp==2.0.1
#rlp==3.0.0
json-rpc==1.13.0 json-rpc==1.13.0
confini~=0.6.0 confini~=0.5.1
coincurve==15.0.0 coincurve==15.0.0
hexathon~=0.1.6 hexathon~=0.1.0
pycryptodome==3.10.1 pycryptodome==3.10.1
funga==0.5.2 funga==0.5.1

View File

@ -33,7 +33,7 @@ f.close()
setup( setup(
name="funga-eth", name="funga-eth",
version="0.6.2", version="0.5.2",
description="Ethereum implementation of the funga keystore and signer", description="Ethereum implementation of the funga keystore and signer",
author="Louis Holbrook", author="Louis Holbrook",
author_email="dev@holbrook.no", author_email="dev@holbrook.no",
@ -55,9 +55,8 @@ setup(
'console_scripts': [ 'console_scripts': [
'funga-ethd=funga.eth.runnable.signer:main', 'funga-ethd=funga.eth.runnable.signer:main',
'eth-keyfile=funga.eth.runnable.keyfile:main', 'eth-keyfile=funga.eth.runnable.keyfile:main',
'eth-sign-msg=funga.eth.runnable.msg:main',
], ],
}, },
url='https://git.grassecon.net/chaintool/funga-eth', url='https://gitlab.com/chaintool/funga-eth',
include_package_data=True, include_package_data=True,
) )

View File

@ -5,18 +5,10 @@ import os
# external imports # external imports
from hexathon import strip_0x from hexathon import strip_0x
from pathlib import Path
import sys
path_root = Path('/home/vincent/ida/grassroots/funga-eth/funga/eth/keystore')
sys.path.append(str(path_root))
print(sys.path)
# local imports # local imports
from funga.eth.signer import EIP155Signer from funga.eth.signer import EIP155Signer
from funga.eth.keystore.dict import DictKeystore from funga.eth.keystore.dict import DictKeystore
from funga.eth.cli.handle import SignRequestHandler from funga.eth.cli.handle import SignRequestHandler
from funga.eth.transaction import EIP155Transaction from funga.eth.transaction import EIP155Transaction
@ -26,30 +18,30 @@ logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, 'testdata') data_dir = os.path.join(script_dir, 'testdata')
class TestCli(unittest.TestCase): class TestCli(unittest.TestCase):
def setUp(self): def setUp(self):
# pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6') #pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6')
# pk_getter = pkGetter(pk) #pk_getter = pkGetter(pk)
self.keystore = DictKeystore() self.keystore = DictKeystore()
SignRequestHandler.keystore = self.keystore SignRequestHandler.keystore = self.keystore
self.signer = EIP155Signer(self.keystore) self.signer = EIP155Signer(self.keystore)
SignRequestHandler.signer = self.signer SignRequestHandler.signer = self.signer
self.handler = SignRequestHandler() self.handler = SignRequestHandler()
def test_new_account(self): def test_new_account(self):
q = { q = {
'id': 0, 'id': 0,
'method': 'personal_newAccount', 'method': 'personal_newAccount',
'params': [''], 'params': [''],
} }
(rpc_id, result) = self.handler.process_input(q) (rpc_id, result) = self.handler.process_input(q)
self.assertTrue(self.keystore.get(result)) self.assertTrue(self.keystore.get(result))
def test_sign_tx(self): def test_sign_tx(self):
keystore_file = os.path.join(data_dir, keystore_file = os.path.join(data_dir, 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
sender = self.keystore.import_keystore_file(keystore_file) sender = self.keystore.import_keystore_file(keystore_file)
tx_hexs = { tx_hexs = {
'nonce': '0x', 'nonce': '0x',
@ -70,31 +62,26 @@ class TestCli(unittest.TestCase):
# eth_signTransaction wraps personal_signTransaction, so here we test both already # eth_signTransaction wraps personal_signTransaction, so here we test both already
q = { q = {
'id': 0, 'id': 0,
'method': 'eth_signTransaction', 'method': 'eth_signTransaction',
'params': [tx_s], 'params': [tx_s],
} }
(rpc_id, result) = self.handler.process_input(q) (rpc_id, result) = self.handler.process_input(q)
logg.debug('result {}'.format(result)) logg.debug('result {}'.format(result))
self.assertEqual(strip_0x(result), self.assertEqual(strip_0x(result), 'f86c2a8504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef82466aa0b7c1bbf52f736ada30fe253c7484176f44d6fd097a9720dc85ae5bbc7f060e54a07afee2563b0cf6d00333df51cc62b0d13c63108b2bce54ce2ad24e26ce7b4f25')
'f86c2a8504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef82466aa0b7c1bbf52f736ada30fe253c7484176f44d6fd097a9720dc85ae5bbc7f060e54a07afee2563b0cf6d00333df51cc62b0d13c63108b2bce54ce2ad24e26ce7b4f25')
def test_sign_msg(self): def test_sign_msg(self):
keystore_file = os.path.join(data_dir, keystore_file = os.path.join(data_dir, 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
sender = self.keystore.import_keystore_file(keystore_file) sender = self.keystore.import_keystore_file(keystore_file)
q = { q = {
'id': 0, 'id': 0,
'method': 'eth_sign', 'method': 'eth_sign',
'params': [sender, '0xdeadbeef'], 'params': [sender, '0xdeadbeef'],
} }
(rpc_id, result) = self.handler.process_input(q) (rpc_id, result) = self.handler.process_input(q)
logg.debug('result msg {}'.format(result)) logg.debug('result msg {}'.format(result))
self.assertEqual(strip_0x(result), self.assertEqual(strip_0x(result), '50320dda75190a121b7b5979de66edadafd02bdfbe4f6d49552e79c01410d2464aae35e385c0e5b61663ff7b44ef65fa0ac7ad8a57472cf405db399b9dba3e1600')
'50320dda75190a121b7b5979de66edadafd02bdfbe4f6d49552e79c01410d2464aae35e385c0e5b61663ff7b44ef65fa0ac7ad8a57472cf405db399b9dba3e1600')
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,59 +0,0 @@
#!/usr/bin/python
# standard imports
import unittest
import logging
import base64
import os
# external imports
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from funga.error import UnknownAccountError
from funga.eth.keystore.dict import DictKeystore
from funga.eth.signer import EIP155Signer
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
class TestDict(unittest.TestCase):
address_hex = None
db = None
def setUp(self):
self.db = DictKeystore()
keystore_filepath = os.path.join(script_dir, 'testdata',
'UTC--2022-01-24T10-34-04Z--cc47ad90-71a0-7fbe-0224-63326e27263a')
address_hex = self.db.import_keystore_file(keystore_filepath, 'test')
self.address_hex = add_0x(address_hex)
def tearDown(self):
pass
def test_get_key(self):
logg.debug('getting {}'.format(strip_0x(self.address_hex)))
pk = self.db.get(strip_0x(self.address_hex), '')
self.assertEqual(self.address_hex.lower(), '0xb8df77e1b4fa142e83bf9706f66fd76ad2a564f8')
bogus_account = os.urandom(20).hex()
with self.assertRaises(UnknownAccountError):
self.db.get(bogus_account, '')
def test_sign_message(self):
s = EIP155Signer(self.db)
z = s.sign_ethereum_message(strip_0x(self.address_hex), b'foo')
logg.debug('zzz {}'.format(str(z)))
if __name__ == '__main__':
unittest.main()

View File

@ -1 +0,0 @@
{"id":"cc47ad90-71a0-7fbe-0224-63326e27263a","version":3,"crypto":{"cipher":"aes-128-ctr","cipherparams":{"iv":"7bff67c888a9878a88e8548a4598322d"},"ciphertext":"0cb0e3c69d224d0a645f2784b64f507e5aecdc7bb8a7ea31963d25e6b8020ccf","kdf":"pbkdf2","kdfparams":{"c":10240,"dklen":32,"prf":"hmac-sha256","salt":"02f8b51b07a66a357c2d812952e6bee70fccc2e6a55e7cbd5c22d97d32fa8873"},"mac":"bb45aaabdb9fbbbde89631444ac39f8d76107381f16591799664274fd5d8c5bb"},"address":"b8df77e1b4fa142e83bf9706f66fd76ad2a564f8","name":"","meta":"{}"}