182 lines
5.1 KiB
Python
182 lines
5.1 KiB
Python
# standard imports
|
|
import os
|
|
import json
|
|
import logging
|
|
import tempfile
|
|
import cbor2
|
|
|
|
# external imports
|
|
from hexathon import strip_0x
|
|
from cic_types import MetadataPointer
|
|
from cic_types.processor import generate_metadata_pointer
|
|
from cic_types.ext.metadata import MetadataRequestsHandler
|
|
|
|
# local imports
|
|
from .base import *
|
|
from cic.output import OutputWriter
|
|
|
|
# Module-level logger, named after this module.
logg = logging.getLogger(__name__)
|
|
|
|
|
|
class Proof(Data):
    """Proof handles the immutable token proof data mapped to the initial token deployment.

    It processes inputs from the proof.json file in the session directory.

    Optionally, attachments can be added to the proof. If added, the resulting proof digest will consist of the attachment digests added to the root digest. These are then deterministically ordered, regardless of which order attachments were given to the constructor.

    :param path: Path to settings directory
    :type path: str
    :param attachments: Attachment object to include in the proof; its asdict() output is read by load()
    :type attachments: cic.attachment.Attachment
    :param writer: Writer interface receiving the output of the processor
    :type writer: cic.output.OutputWriter
    """

    def __init__(self, path='.', attachments=None, writer=None):
        super().__init__()
        # Values below are placeholders until load() reads them from proof.json.
        self.proofs = []
        self.namespace = 'ge'
        self.description = None
        self.issuer = None
        self.path = path
        self.writer = writer
        # Attachment object as passed by the caller; merged into
        # self.attachments when load() is called.
        self.extra_attachments = attachments
        self.attachments = {}
        self.proof_path = os.path.join(self.path, 'proof.json')
        # mkstemp() returns an already-open OS-level descriptor together with
        # the path. Only the path is used (root() reopens it by name), so the
        # descriptor is closed right away instead of being leaked.
        fd, self.temp_proof_path = tempfile.mkstemp()
        os.close(fd)

    def load(self):
        """Load proof data from settings.

        Reads version, description, namespace, issuer and proofs from
        proof.json. If an attachment object was given to the constructor, the
        entries of its asdict() output are merged into self.attachments.

        The proofs list read from the file is then replaced by the sorted keys
        of self.attachments, each passed through strip_0x.

        :raises KeyError: If proof.json lacks one of the fields read here
        """
        super().load()

        with open(self.proof_path, 'r', encoding='utf-8') as f:
            o = json.load(f)

        self.set_version(o['version'])
        self.description = o['description']
        self.namespace = o['namespace']
        self.issuer = o['issuer']
        self.proofs = o['proofs']

        if self.extra_attachments is not None:
            self.attachments.update(self.extra_attachments.asdict())

        hshs = self.__get_ordered_hashes()
        self.proofs = list(map(strip_0x, hshs))

        self.inited = True

    def start(self):
        """Initialize proof settings from template.

        Reads the proof template matching the current version from the data
        directory and writes it to proof.json in the settings directory, with
        sorted keys and tab indentation.
        """
        super().start()

        proof_template_file_path = os.path.join(data_dir, 'proof_template_v{}.json'.format(self.version()))

        with open(proof_template_file_path, encoding='utf-8') as f:
            o = json.load(f)

        self.__write_proof_file(o)

    def asdict(self):
        """Output proof state to dict.

        :rtype: dict
        :returns: Version, namespace, description, issuer and proofs of this proof
        """
        return {
            'version': self.version(),
            'namespace': self.namespace,
            'description': self.description,
            'issuer': self.issuer,
            'proofs': self.proofs,
        }

    def __get_ordered_hashes(self):
        """Return the keys of self.attachments as a sorted list."""
        return sorted(self.attachments)

    def __write_proof_file(self, o):
        """Write the given object to proof.json with sorted keys and tab indentation."""
        with open(self.proof_path, 'w', encoding='utf-8') as f:
            json.dump(o, f, sort_keys=True, indent="\t")

    def root(self):
        """Calculate the root digest from the serialized proof object.

        The proof state is serialized as JSON, written to the temporary proof
        file, and its UTF-8 encoding is hashed with self.hash.

        :rtype: tuple
        :returns: Hex representation of the digest, and the UTF-8 encoded serialized proof
        """
        # NOTE: JSON is the serialization in use; a CBOR variant existed here
        # but was disabled.
        serialized = json.dumps(self.asdict())

        with open(self.temp_proof_path, 'w', encoding='utf-8') as f:
            f.write(serialized)

        b = serialized.encode('utf-8')
        k = self.hash(b)

        return (k.hex(), b)

    def process(self, token_address=None, token_symbol=None, writer=None):
        """Serialize and publish proof.

        See cic.processor.Processor.process

        The serialized proof is written to the writer three times: under its
        root digest, under the metadata pointer generated from the token
        symbol, and under the metadata pointer generated from the token
        address. Afterwards the current proof state is written back to
        proof.json.

        Despite their None defaults, token_address and token_symbol are both
        dereferenced unconditionally and must be provided.

        :param token_address: Token address in hex (passed through strip_0x before decoding)
        :type token_address: str
        :param token_symbol: Token symbol
        :type token_symbol: str
        :param writer: Writer to use instead of the one given to the constructor
        :type writer: cic.output.OutputWriter
        :rtype: str
        :returns: Root digest of the proof, in hex
        """
        if writer is None:
            writer = self.writer

        (root_key, v) = self.root()
        writer.write(root_key, v)

        token_symbol_bytes = token_symbol.encode('utf-8')
        k = generate_metadata_pointer(token_symbol_bytes, MetadataPointer.TOKEN_PROOF_SYMBOL)
        writer.write(k, v)

        token_address_bytes = bytes.fromhex(strip_0x(token_address))
        k = generate_metadata_pointer(token_address_bytes, MetadataPointer.TOKEN_PROOF)
        writer.write(k, v)

        # NOTE: a further step that hashed the concatenated attachment digests
        # and published the result was previously sketched here and is
        # disabled; only the serialized proof itself is published.

        self.__write_proof_file(self.asdict())

        return root_key

    def __str__(self):
        return "description = {}\n".format(self.description)
|