Add keyfile parser, creater

This commit is contained in:
nolash 2021-03-17 21:51:43 +01:00
parent e973664490
commit 75eaf90205
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
3 changed files with 248 additions and 2 deletions

View File

@ -0,0 +1,70 @@
# standard imports
import logging
# external imports
import coincurve
import sha3
from hexathon import (
strip_0x,
uniform,
)
logg = logging.getLogger(__name__)
def private_key_to_address(pk, result_format='hex'):
pubk = coincurve.PublicKey.from_secret(pk.secret)
logg.debug('secret {} '.format(pk.secret.hex()))
pubk_bytes = pubk.format(compressed=False)
h = sha3.keccak_256()
logg.debug('public key bytes {}'.format(pubk_bytes.hex()))
h.update(pubk_bytes[1:])
z = h.digest()[12:]
if result_format == 'hex':
return to_checksum_address(z[:20].hex())
elif result_format == 'bytes':
return z[:20]
raise ValueError('invalid result format "{}"'.format(result_format))
def is_address(address_hex):
try:
address_hex = strip_0x(address_hex)
except ValueError:
return False
return len(address_hex) == 40
def is_checksum_address(address_hex):
hx = None
try:
hx = to_checksum(address_hex)
except ValueError:
return False
print('{} {}'.format(hx, address_hex))
return hx == address_hex
def to_checksum_address(address_hex):
address_hex = strip_0x(address_hex)
address_hex = uniform(address_hex)
if len(address_hex) != 40:
raise ValueError('Invalid address length')
h = sha3.keccak_256()
h.update(address_hex.encode('utf-8'))
z = h.digest()
checksum_address_hex = '0x'
for (i, c) in enumerate(address_hex):
if c in '1234567890':
checksum_address_hex += c
elif c in 'abcdef':
if z[int(i / 2)] & (0x80 >> ((i % 2) * 4)) > 1:
checksum_address_hex += c.upper()
else:
checksum_address_hex += c
return checksum_address_hex
to_checksum = to_checksum_address

View File

@ -0,0 +1,141 @@
# standard imports
import os
import hashlib
import logging
import json
import uuid
# external imports
from Crypto.Cipher import AES
from Crypto.Util import Counter
import sha3
# local imports
from crypto_dev_signer.encoding import private_key_to_address
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
algo_keywords = [
'aes-128-ctr',
]
hash_keywords = [
'scrypt'
]
default_kdfparams = {
'dklen': 32,
'n': 1 << 18,
'p': 1,
'r': 8,
'salt': os.urandom(32).hex(),
}
def to_mac(mac_key, ciphertext_bytes):
h = sha3.keccak_256()
h.update(mac_key)
h.update(ciphertext_bytes)
return h.digest()
class Hashes:
@staticmethod
def from_scrypt(kdfparams=default_kdfparams, passphrase=''):
dklen = int(kdfparams['dklen'])
n = int(kdfparams['n'])
p = int(kdfparams['p'])
r = int(kdfparams['r'])
salt = bytes.fromhex(kdfparams['salt'])
return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt,n=n, p=p, r=r, maxmem=1024*1024*1024, dklen=dklen)
class Ciphers:
aes_128_block_size = 1 << 7
aes_iv_len = 16
@staticmethod
def decrypt_aes_128_ctr(ciphertext, decryption_key, iv):
ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
cipher = AES.new(decryption_key, AES.MODE_CTR, counter=ctr)
plaintext = cipher.decrypt(ciphertext)
return plaintext
@staticmethod
def encrypt_aes_128_ctr(plaintext, encryption_key, iv):
ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
cipher = AES.new(encryption_key, AES.MODE_CTR, counter=ctr)
ciphertext = cipher.encrypt(plaintext)
return ciphertext
def to_dict(private_key, passphrase=''):
encryption_key = Hashes.from_scrypt(passphrase=passphrase)
address_hex = private_key_to_address(private_key)
iv_bytes = os.urandom(Ciphers.aes_iv_len)
iv = int.from_bytes(iv_bytes, 'big')
ciphertext_bytes = Ciphers.encrypt_aes_128_ctr(private_key.secret, encryption_key[:16], iv)
mac = to_mac(encryption_key[16:], ciphertext_bytes)
crypto_dict = {
'cipher': 'aes-128-ctr',
'ciphertext': ciphertext_bytes.hex(),
'cipherparams': {
'iv': iv_bytes.hex(),
},
'kdf': 'scrypt',
'kdfparams': default_kdfparams,
'mac': mac.hex(),
}
uu = uuid.uuid1()
o = {
'address': address_hex,
'version': 3,
'crypto': crypto_dict,
'id': str(uu),
}
return o
def from_dict(o, passphrase=''):
cipher = o['crypto']['cipher']
if cipher not in algo_keywords:
raise NotImplementedError('cipher "{}" not implemented'.format(cipher))
kdf = o['crypto']['kdf']
if kdf not in hash_keywords:
raise NotImplementedError('kdf "{}" not implemented'.format(kdf))
m = getattr(Hashes, 'from_{}'.format(kdf.replace('-', '_')))
decryption_key = m(o['crypto']['kdfparams'], passphrase)
control_mac = bytes.fromhex(o['crypto']['mac'])
iv_bytes = bytes.fromhex(o['crypto']['cipherparams']['iv'])
iv = int.from_bytes(iv_bytes, "big")
ciphertext_bytes = bytes.fromhex(o['crypto']['ciphertext'])
# check mac
calculated_mac = to_mac(decryption_key[16:], ciphertext_bytes)
assert control_mac == calculated_mac
m = getattr(Ciphers, 'decrypt_{}'.format(cipher.replace('-', '_')))
pk = m(ciphertext_bytes, decryption_key[:16], iv)
return pk
def from_file(filepath, passphrase=''):
f = open(filepath, 'r')
o = json.load(f)
f.close()
return from_dict(o, passphrase)

View File

@ -1,8 +1,43 @@
# standard imports # standard imports
import os
import logging import logging
import sys import sys
import json
import argparse
# external impors
import coincurve
# local imports # local imports
from crypto_dev_signer.keystore.keyfile import parse_file from crypto_dev_signer.keystore.keyfile import (
from_file,
to_dict,
)
print(from_file(sys.argv[1]).hex())
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
argparser = argparse.ArgumentParser()
argparser.add_argument('-d', type=str, help='decrypt file')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('arg', type=str, help='decrypt file')
args = argparser.parse_args()
if args.v:
logg.setLevel(logging.DEBUG)
r = None
if args.d:
try:
r = from_file(args.d, args.arg).hex()
except AssertionError:
sys.stderr.write('Invalid passphrase\n')
sys.exit(1)
else:
pk_bytes = os.urandom(32)
pk = coincurve.PrivateKey(secret=pk_bytes)
o = to_dict(pk, args.arg)
r = json.dumps(o)
print(r)