From 75eaf90205f30469344c1a1e46e7582579e77e87 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 17 Mar 2021 21:51:43 +0100 Subject: [PATCH] Add keyfile parser, creater --- crypto_dev_signer/encoding.py | 70 +++++++++++++ crypto_dev_signer/keystore/keyfile.py | 141 ++++++++++++++++++++++++++ crypto_dev_signer/runnable/keyfile.py | 39 ++++++- 3 files changed, 248 insertions(+), 2 deletions(-) create mode 100644 crypto_dev_signer/encoding.py create mode 100644 crypto_dev_signer/keystore/keyfile.py diff --git a/crypto_dev_signer/encoding.py b/crypto_dev_signer/encoding.py new file mode 100644 index 0000000..79d6532 --- /dev/null +++ b/crypto_dev_signer/encoding.py @@ -0,0 +1,70 @@ +# standard imports +import logging + +# external imports +import coincurve +import sha3 +from hexathon import ( + strip_0x, + uniform, + ) + +logg = logging.getLogger(__name__) + + +def private_key_to_address(pk, result_format='hex'): + pubk = coincurve.PublicKey.from_secret(pk.secret) + logg.debug('secret {} '.format(pk.secret.hex())) + pubk_bytes = pubk.format(compressed=False) + h = sha3.keccak_256() + logg.debug('public key bytes {}'.format(pubk_bytes.hex())) + h.update(pubk_bytes[1:]) + z = h.digest()[12:] + if result_format == 'hex': + return to_checksum_address(z[:20].hex()) + elif result_format == 'bytes': + return z[:20] + raise ValueError('invalid result format "{}"'.format(result_format)) + + +def is_address(address_hex): + try: + address_hex = strip_0x(address_hex) + except ValueError: + return False + return len(address_hex) == 40 + + +def is_checksum_address(address_hex): + hx = None + try: + hx = to_checksum(address_hex) + except ValueError: + return False + print('{} {}'.format(hx, address_hex)) + return hx == address_hex + + +def to_checksum_address(address_hex): + address_hex = strip_0x(address_hex) + address_hex = uniform(address_hex) + if len(address_hex) != 40: + raise ValueError('Invalid address length') + h = sha3.keccak_256() + h.update(address_hex.encode('utf-8')) + z = h.digest() + + checksum_address_hex = '0x' + + for (i, c) in enumerate(address_hex): + if c in '1234567890': + checksum_address_hex += c + elif c in 'abcdef': + if z[int(i / 2)] & (0x80 >> ((i % 2) * 4)) > 1: + checksum_address_hex += c.upper() + else: + checksum_address_hex += c + + return checksum_address_hex + +to_checksum = to_checksum_address diff --git a/crypto_dev_signer/keystore/keyfile.py b/crypto_dev_signer/keystore/keyfile.py new file mode 100644 index 0000000..d73ae98 --- /dev/null +++ b/crypto_dev_signer/keystore/keyfile.py @@ -0,0 +1,141 @@ +# standard imports +import os +import hashlib +import logging +import json +import uuid + +# external imports +from Crypto.Cipher import AES +from Crypto.Util import Counter +import sha3 + +# local imports +from crypto_dev_signer.encoding import private_key_to_address + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + +algo_keywords = [ + 'aes-128-ctr', + ] +hash_keywords = [ + 'scrypt' + ] + +default_kdfparams = { + 'dklen': 32, + 'n': 1 << 18, + 'p': 1, + 'r': 8, + 'salt': os.urandom(32).hex(), + } + + +def to_mac(mac_key, ciphertext_bytes): + h = sha3.keccak_256() + h.update(mac_key) + h.update(ciphertext_bytes) + return h.digest() + + +class Hashes: + + @staticmethod + def from_scrypt(kdfparams=default_kdfparams, passphrase=''): + dklen = int(kdfparams['dklen']) + n = int(kdfparams['n']) + p = int(kdfparams['p']) + r = int(kdfparams['r']) + salt = bytes.fromhex(kdfparams['salt']) + + return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt,n=n, p=p, r=r, maxmem=1024*1024*1024, dklen=dklen) + + +class Ciphers: + + aes_128_block_size = 1 << 7 + aes_iv_len = 16 + + @staticmethod + def decrypt_aes_128_ctr(ciphertext, decryption_key, iv): + ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv) + cipher = AES.new(decryption_key, AES.MODE_CTR, counter=ctr) + plaintext = cipher.decrypt(ciphertext) + return plaintext + + + @staticmethod + def encrypt_aes_128_ctr(plaintext, encryption_key, iv): + ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv) + cipher = AES.new(encryption_key, AES.MODE_CTR, counter=ctr) + ciphertext = cipher.encrypt(plaintext) + return ciphertext + + +def to_dict(private_key, passphrase=''): + + encryption_key = Hashes.from_scrypt(passphrase=passphrase) + + address_hex = private_key_to_address(private_key) + iv_bytes = os.urandom(Ciphers.aes_iv_len) + iv = int.from_bytes(iv_bytes, 'big') + ciphertext_bytes = Ciphers.encrypt_aes_128_ctr(private_key.secret, encryption_key[:16], iv) + + mac = to_mac(encryption_key[16:], ciphertext_bytes) + + crypto_dict = { + 'cipher': 'aes-128-ctr', + 'ciphertext': ciphertext_bytes.hex(), + 'cipherparams': { + 'iv': iv_bytes.hex(), + }, + 'kdf': 'scrypt', + 'kdfparams': default_kdfparams, + 'mac': mac.hex(), + } + + uu = uuid.uuid1() + o = { + 'address': address_hex, + 'version': 3, + 'crypto': crypto_dict, + 'id': str(uu), + } + return o + + +def from_dict(o, passphrase=''): + + cipher = o['crypto']['cipher'] + if cipher not in algo_keywords: + raise NotImplementedError('cipher "{}" not implemented'.format(cipher)) + + kdf = o['crypto']['kdf'] + if kdf not in hash_keywords: + raise NotImplementedError('kdf "{}" not implemented'.format(kdf)) + + m = getattr(Hashes, 'from_{}'.format(kdf.replace('-', '_'))) + decryption_key = m(o['crypto']['kdfparams'], passphrase) + + control_mac = bytes.fromhex(o['crypto']['mac']) + iv_bytes = bytes.fromhex(o['crypto']['cipherparams']['iv']) + iv = int.from_bytes(iv_bytes, "big") + ciphertext_bytes = bytes.fromhex(o['crypto']['ciphertext']) + + # check mac + calculated_mac = to_mac(decryption_key[16:], ciphertext_bytes) + assert control_mac == calculated_mac + + m = getattr(Ciphers, 'decrypt_{}'.format(cipher.replace('-', '_'))) + pk = m(ciphertext_bytes, decryption_key[:16], iv) + return pk + + +def from_file(filepath, passphrase=''): + + f = open(filepath, 'r') + o = json.load(f) + f.close() + + return from_dict(o, passphrase) diff --git a/crypto_dev_signer/runnable/keyfile.py b/crypto_dev_signer/runnable/keyfile.py index dedc62a..ad9937b 100644 --- a/crypto_dev_signer/runnable/keyfile.py +++ b/crypto_dev_signer/runnable/keyfile.py @@ -1,8 +1,43 @@ # standard imports +import os import logging import sys +import json +import argparse + +# external impors +import coincurve # local imports -from crypto_dev_signer.keystore.keyfile import parse_file +from crypto_dev_signer.keystore.keyfile import ( + from_file, + to_dict, + ) -print(from_file(sys.argv[1]).hex()) + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +argparser = argparse.ArgumentParser() +argparser.add_argument('-d', type=str, help='decrypt file') +argparser.add_argument('-v', action='store_true', help='be verbose') +argparser.add_argument('arg', type=str, help='decrypt file') +args = argparser.parse_args() + +if args.v: + logg.setLevel(logging.DEBUG) + +r = None +if args.d: + try: + r = from_file(args.d, args.arg).hex() + except AssertionError: + sys.stderr.write('Invalid passphrase\n') + sys.exit(1) +else: + pk_bytes = os.urandom(32) + pk = coincurve.PrivateKey(secret=pk_bytes) + o = to_dict(pk, args.arg) + r = json.dumps(o) + +print(r)