Implement PBKDF2 support in keyfile

---

Squashed commit of the following:

commit 4a4f76b19c
Author: idaapayo <idaapayo@gmail.com>
Date:   Mon Jan 24 20:12:34 2022 +0300

    remove unused import and renaming pbkdf2 default params

commit 23a94c3ba1
Author: idaapayo <idaapayo@gmail.com>
Date:   Mon Jan 24 20:07:22 2022 +0300

    defaulting to scrypt in to_dict fun

commit 1c0047398a
Author: idaapayo <idaapayo@gmail.com>
Date:   Mon Jan 24 15:12:38 2022 +0300

    making final review  changes

commit 0a4f3eaa98
Merge: b208533 903f659
Author: Mohamed Sohail <kamikazechaser@noreply.localhost>
Date:   Mon Jan 24 11:10:35 2022 +0000

    Merge branch 'master' into Ida/pbkdf2

commit b20853312d
Author: idaapayo <idaapayo@gmail.com>
Date:   Mon Jan 24 13:23:12 2022 +0300

    review changes with tests

commit b9c6db414b
Author: idaapayo <idaapayo@gmail.com>
Date:   Fri Jan 21 11:13:35 2022 +0300

    making review changes

commit 1f5d057a9a
Author: idaapayo <idaapayo@gmail.com>
Date:   Wed Jan 19 14:37:22 2022 +0300

    second pbkdf2 implementation

commit 01598a8c59
Author: idaapayo <idaapayo@gmail.com>
Date:   Wed Jan 19 13:49:29 2022 +0300

    pkdf2 implementation
This commit is contained in:
lash 2022-01-24 17:54:59 +00:00
parent 903f65936e
commit e5cd1cad58
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 151 additions and 50 deletions

View File

@ -13,28 +13,35 @@ import sha3
# local imports # local imports
from funga.error import ( from funga.error import (
DecryptError, DecryptError,
KeyfileError, KeyfileError,
) )
from funga.eth.encoding import private_key_to_address from funga.eth.encoding import private_key_to_address
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
algo_keywords = [ algo_keywords = [
'aes-128-ctr', 'aes-128-ctr',
] ]
hash_keywords = [ hash_keywords = [
'scrypt' 'scrypt',
] 'pbkdf2'
]
default_kdfparams = { default_scrypt_kdfparams = {
'dklen': 32, 'dklen': 32,
'n': 1 << 18, 'n': 1 << 18,
'p': 1, 'p': 1,
'r': 8, 'r': 8,
'salt': os.urandom(32).hex(), 'salt': os.urandom(32).hex(),
} }
default_pbkdf2_kdfparams = {
'c': 100000,
'dklen': 32,
'prf': 'sha256',
'salt': os.urandom(32).hex(),
}
def to_mac(mac_key, ciphertext_bytes): def to_mac(mac_key, ciphertext_bytes):
h = sha3.keccak_256() h = sha3.keccak_256()
@ -46,18 +53,32 @@ def to_mac(mac_key, ciphertext_bytes):
class Hashes: class Hashes:
@staticmethod @staticmethod
def from_scrypt(kdfparams=default_kdfparams, passphrase=''): def from_scrypt(kdfparams=default_scrypt_kdfparams, passphrase=''):
dklen = int(kdfparams['dklen']) dklen = int(kdfparams['dklen'])
n = int(kdfparams['n']) n = int(kdfparams['n'])
p = int(kdfparams['p']) p = int(kdfparams['p'])
r = int(kdfparams['r']) r = int(kdfparams['r'])
salt = bytes.fromhex(kdfparams['salt']) salt = bytes.fromhex(kdfparams['salt'])
return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt,n=n, p=p, r=r, maxmem=1024*1024*1024, dklen=dklen) return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt, n=n, p=p, r=r, maxmem=1024 * 1024 * 1024,
dklen=dklen)
@staticmethod
def from_pbkdf2(kdfparams=default_pbkdf2_kdfparams, passphrase=''):
if kdfparams['prf'] == 'hmac-sha256':
kdfparams['prf'].replace('hmac-sha256','sha256')
derived_key = hashlib.pbkdf2_hmac(
hash_name='sha256',
password=passphrase.encode('utf-8'),
salt=bytes.fromhex(kdfparams['salt']),
iterations=int(kdfparams['c']),
dklen=int(kdfparams['dklen'])
)
return derived_key
class Ciphers: class Ciphers:
aes_128_block_size = 1 << 7 aes_128_block_size = 1 << 7
aes_iv_len = 16 aes_iv_len = 16
@ -68,7 +89,6 @@ class Ciphers:
plaintext = cipher.decrypt(ciphertext) plaintext = cipher.decrypt(ciphertext)
return plaintext return plaintext
@staticmethod @staticmethod
def encrypt_aes_128_ctr(plaintext, encryption_key, iv): def encrypt_aes_128_ctr(plaintext, encryption_key, iv):
ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv) ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
@ -77,11 +97,19 @@ class Ciphers:
return ciphertext return ciphertext
def to_dict(private_key_bytes, passphrase=''): def to_dict(private_key_bytes, kdf='scrypt', passphrase=''):
private_key = coincurve.PrivateKey(secret=private_key_bytes) private_key = coincurve.PrivateKey(secret=private_key_bytes)
encryption_key = Hashes.from_scrypt(passphrase=passphrase) if kdf == 'scrypt':
encryption_key = Hashes.from_scrypt(passphrase=passphrase)
kdfparams = default_scrypt_kdfparams
elif kdf == 'pbkdf2':
encryption_key = Hashes.from_pbkdf2(passphrase=passphrase)
kdfparams = pbkdf2_kdfparams
else:
raise NotImplementedError("KDF not implemented: {0}".format(kdf))
address_hex = private_key_to_address(private_key) address_hex = private_key_to_address(private_key)
iv_bytes = os.urandom(Ciphers.aes_iv_len) iv_bytes = os.urandom(Ciphers.aes_iv_len)
@ -95,11 +123,11 @@ def to_dict(private_key_bytes, passphrase=''):
'ciphertext': ciphertext_bytes.hex(), 'ciphertext': ciphertext_bytes.hex(),
'cipherparams': { 'cipherparams': {
'iv': iv_bytes.hex(), 'iv': iv_bytes.hex(),
}, },
'kdf': 'scrypt', 'kdf': kdf,
'kdfparams': default_kdfparams, 'kdfparams': kdfparams,
'mac': mac.hex(), 'mac': mac.hex(),
} }
uu = uuid.uuid1() uu = uuid.uuid1()
o = { o = {
@ -107,12 +135,11 @@ def to_dict(private_key_bytes, passphrase=''):
'version': 3, 'version': 3,
'crypto': crypto_dict, 'crypto': crypto_dict,
'id': str(uu), 'id': str(uu),
} }
return o return o
def from_dict(o, passphrase=''): def from_dict(o, passphrase=''):
cipher = o['crypto']['cipher'] cipher = o['crypto']['cipher']
if cipher not in algo_keywords: if cipher not in algo_keywords:
raise NotImplementedError('cipher "{}" not implemented'.format(cipher)) raise NotImplementedError('cipher "{}" not implemented'.format(cipher))
@ -121,19 +148,19 @@ def from_dict(o, passphrase=''):
if kdf not in hash_keywords: if kdf not in hash_keywords:
raise NotImplementedError('kdf "{}" not implemented'.format(kdf)) raise NotImplementedError('kdf "{}" not implemented'.format(kdf))
m = getattr(Hashes, 'from_{}'.format(kdf.replace('-', '_'))) m = getattr(Hashes, 'from_{}'.format(kdf.replace('-', '_')))
decryption_key = m(o['crypto']['kdfparams'], passphrase) decryption_key = m(o['crypto']['kdfparams'], passphrase)
control_mac = bytes.fromhex(o['crypto']['mac']) control_mac = bytes.fromhex(o['crypto']['mac'])
iv_bytes = bytes.fromhex(o['crypto']['cipherparams']['iv']) iv_bytes = bytes.fromhex(o['crypto']['cipherparams']['iv'])
iv = int.from_bytes(iv_bytes, "big") iv = int.from_bytes(iv_bytes, "big")
ciphertext_bytes = bytes.fromhex(o['crypto']['ciphertext']) ciphertext_bytes = bytes.fromhex(o['crypto']['ciphertext'])
# check mac # check mac
calculated_mac = to_mac(decryption_key[16:], ciphertext_bytes) calculated_mac = to_mac(decryption_key[16:], ciphertext_bytes)
if control_mac != calculated_mac: if control_mac != calculated_mac:
raise DecryptError('mac mismatch when decrypting passphrase') raise DecryptError('mac mismatch when decrypting passphrase')
m = getattr(Ciphers, 'decrypt_{}'.format(cipher.replace('-', '_'))) m = getattr(Ciphers, 'decrypt_{}'.format(cipher.replace('-', '_')))
try: try:
@ -145,7 +172,6 @@ def from_dict(o, passphrase=''):
def from_file(filepath, passphrase=''): def from_file(filepath, passphrase=''):
f = open(filepath, 'r') f = open(filepath, 'r')
try: try:
o = json.load(f) o = json.load(f)

View File

@ -16,6 +16,8 @@ from funga.eth.keystore.keyfile import (
from_file, from_file,
to_dict, to_dict,
) )
from funga.eth.encoding import ( from funga.eth.encoding import (
private_key_to_address, private_key_to_address,
private_key_from_bytes, private_key_from_bytes,

View File

@ -5,10 +5,18 @@ import os
# external imports # external imports
from hexathon import strip_0x from hexathon import strip_0x
from pathlib import Path
import sys
path_root = Path('/home/vincent/ida/grassroots/funga-eth/funga/eth/keystore')
sys.path.append(str(path_root))
print(sys.path)
# local imports # local imports
from funga.eth.signer import EIP155Signer from funga.eth.signer import EIP155Signer
from funga.eth.keystore.dict import DictKeystore from funga.eth.keystore.dict import DictKeystore
from funga.eth.cli.handle import SignRequestHandler from funga.eth.cli.handle import SignRequestHandler
from funga.eth.transaction import EIP155Transaction from funga.eth.transaction import EIP155Transaction
@ -18,30 +26,30 @@ logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, 'testdata') data_dir = os.path.join(script_dir, 'testdata')
class TestCli(unittest.TestCase): class TestCli(unittest.TestCase):
def setUp(self): def setUp(self):
#pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6') # pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6')
#pk_getter = pkGetter(pk) # pk_getter = pkGetter(pk)
self.keystore = DictKeystore() self.keystore = DictKeystore()
SignRequestHandler.keystore = self.keystore SignRequestHandler.keystore = self.keystore
self.signer = EIP155Signer(self.keystore) self.signer = EIP155Signer(self.keystore)
SignRequestHandler.signer = self.signer SignRequestHandler.signer = self.signer
self.handler = SignRequestHandler() self.handler = SignRequestHandler()
def test_new_account(self): def test_new_account(self):
q = { q = {
'id': 0, 'id': 0,
'method': 'personal_newAccount', 'method': 'personal_newAccount',
'params': [''], 'params': [''],
} }
(rpc_id, result) = self.handler.process_input(q) (rpc_id, result) = self.handler.process_input(q)
self.assertTrue(self.keystore.get(result)) self.assertTrue(self.keystore.get(result))
def test_sign_tx(self): def test_sign_tx(self):
keystore_file = os.path.join(data_dir, 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72') keystore_file = os.path.join(data_dir,
'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
sender = self.keystore.import_keystore_file(keystore_file) sender = self.keystore.import_keystore_file(keystore_file)
tx_hexs = { tx_hexs = {
'nonce': '0x', 'nonce': '0x',
@ -62,26 +70,31 @@ class TestCli(unittest.TestCase):
# eth_signTransaction wraps personal_signTransaction, so here we test both already # eth_signTransaction wraps personal_signTransaction, so here we test both already
q = { q = {
'id': 0, 'id': 0,
'method': 'eth_signTransaction', 'method': 'eth_signTransaction',
'params': [tx_s], 'params': [tx_s],
} }
(rpc_id, result) = self.handler.process_input(q) (rpc_id, result) = self.handler.process_input(q)
logg.debug('result {}'.format(result)) logg.debug('result {}'.format(result))
self.assertEqual(strip_0x(result), 'f86c2a8504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef82466aa0b7c1bbf52f736ada30fe253c7484176f44d6fd097a9720dc85ae5bbc7f060e54a07afee2563b0cf6d00333df51cc62b0d13c63108b2bce54ce2ad24e26ce7b4f25') self.assertEqual(strip_0x(result),
'f86c2a8504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef82466aa0b7c1bbf52f736ada30fe253c7484176f44d6fd097a9720dc85ae5bbc7f060e54a07afee2563b0cf6d00333df51cc62b0d13c63108b2bce54ce2ad24e26ce7b4f25')
def test_sign_msg(self): def test_sign_msg(self):
keystore_file = os.path.join(data_dir, 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72') keystore_file = os.path.join(data_dir,
'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
sender = self.keystore.import_keystore_file(keystore_file) sender = self.keystore.import_keystore_file(keystore_file)
q = { q = {
'id': 0, 'id': 0,
'method': 'eth_sign', 'method': 'eth_sign',
'params': [sender, '0xdeadbeef'], 'params': [sender, '0xdeadbeef'],
} }
(rpc_id, result) = self.handler.process_input(q) (rpc_id, result) = self.handler.process_input(q)
logg.debug('result msg {}'.format(result)) logg.debug('result msg {}'.format(result))
self.assertEqual(strip_0x(result), '50320dda75190a121b7b5979de66edadafd02bdfbe4f6d49552e79c01410d2464aae35e385c0e5b61663ff7b44ef65fa0ac7ad8a57472cf405db399b9dba3e1600') self.assertEqual(strip_0x(result),
'50320dda75190a121b7b5979de66edadafd02bdfbe4f6d49552e79c01410d2464aae35e385c0e5b61663ff7b44ef65fa0ac7ad8a57472cf405db399b9dba3e1600')
if __name__ == '__main__': if __name__ == '__main__':

59
tests/test_pbkdf2.py Normal file
View File

@ -0,0 +1,59 @@
#!/usr/bin/python
# standard imports
import unittest
import logging
import base64
import os
# external imports
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from funga.error import UnknownAccountError
from funga.eth.keystore.dict import DictKeystore
from funga.eth.signer import EIP155Signer
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
class TestDict(unittest.TestCase):
address_hex = None
db = None
def setUp(self):
self.db = DictKeystore()
keystore_filepath = os.path.join(script_dir, 'testdata',
'UTC--2022-01-24T10-34-04Z--cc47ad90-71a0-7fbe-0224-63326e27263a')
address_hex = self.db.import_keystore_file(keystore_filepath, 'test')
self.address_hex = add_0x(address_hex)
def tearDown(self):
pass
def test_get_key(self):
logg.debug('getting {}'.format(strip_0x(self.address_hex)))
pk = self.db.get(strip_0x(self.address_hex), '')
self.assertEqual(self.address_hex.lower(), '0xb8df77e1b4fa142e83bf9706f66fd76ad2a564f8')
bogus_account = os.urandom(20).hex()
with self.assertRaises(UnknownAccountError):
self.db.get(bogus_account, '')
def test_sign_message(self):
s = EIP155Signer(self.db)
z = s.sign_ethereum_message(strip_0x(self.address_hex), b'foo')
logg.debug('zzz {}'.format(str(z)))
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1 @@
{"id":"cc47ad90-71a0-7fbe-0224-63326e27263a","version":3,"crypto":{"cipher":"aes-128-ctr","cipherparams":{"iv":"7bff67c888a9878a88e8548a4598322d"},"ciphertext":"0cb0e3c69d224d0a645f2784b64f507e5aecdc7bb8a7ea31963d25e6b8020ccf","kdf":"pbkdf2","kdfparams":{"c":10240,"dklen":32,"prf":"hmac-sha256","salt":"02f8b51b07a66a357c2d812952e6bee70fccc2e6a55e7cbd5c22d97d32fa8873"},"mac":"bb45aaabdb9fbbbde89631444ac39f8d76107381f16591799664274fd5d8c5bb"},"address":"b8df77e1b4fa142e83bf9706f66fd76ad2a564f8","name":"","meta":"{}"}