197 lines
5.9 KiB
Python
197 lines
5.9 KiB
Python
# standard imports
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
|
|
# external imports
|
|
from hexathon import strip_0x
|
|
from cic_types import MetadataPointer
|
|
from cic_types.processor import generate_metadata_pointer
|
|
|
|
# local imports
|
|
from cic.contract.base import Data, data_dir
|
|
from cic.utils import object_to_str
|
|
|
|
|
|
|
|
logg = logging.getLogger(__name__)
|
|
|
|
|
|
class Proof(Data):
|
|
"""Proof handles the immutable token proof data mapped to the initial token deployment.
|
|
|
|
It processes inputs from the proof.json file in the session directory.
|
|
|
|
Optionally, attachment objects can be added to the proof. If added, the resulting proof
|
|
digest will consists of the attachment digests added to the root digest. These are then
|
|
are deterministically ordered, regardless of which order attachments were given to the constructor.
|
|
|
|
:param path: Path to settings directory
|
|
:type path: str
|
|
:param attachments: List of attachment objects to include in the proof
|
|
:type attachments: cic.attachment.Attachment
|
|
:param writer: Writer interface receiving the output of the processor
|
|
:type writer: cic.writers.OutputWriter
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
path=".",
|
|
description="",
|
|
namespace="ge",
|
|
issuer="",
|
|
attachments=None,
|
|
writer=None,
|
|
interactive=False,
|
|
):
|
|
super(Proof, self).__init__()
|
|
self.proofs = []
|
|
self.namespace = namespace
|
|
self.description = description
|
|
self.issuer = issuer
|
|
self.path = path
|
|
self.writer = writer
|
|
self.extra_attachments = attachments
|
|
self.attachments = {}
|
|
self.proof_path = os.path.join(self.path, "proof.json")
|
|
self.temp_proof_path = tempfile.mkstemp()[1]
|
|
|
|
if interactive:
|
|
self.description = (
|
|
input(f"Enter Proof Description ({self.description}): ")
|
|
or self.description
|
|
)
|
|
self.namespace = (
|
|
input(f"Enter Proof Namespace ({self.namespace}): ") or self.namespace
|
|
)
|
|
self.issuer = input(f"Enter Proof Issuer ({self.issuer}): ") or self.issuer
|
|
|
|
def load(self):
|
|
"""Load proof data from settings."""
|
|
super(Proof, self).load()
|
|
|
|
f = open(self.proof_path, "r", encoding="utf-8")
|
|
o = json.load(f)
|
|
f.close()
|
|
|
|
self.set_version(o["version"])
|
|
self.description = o["description"]
|
|
self.namespace = o["namespace"]
|
|
self.issuer = o["issuer"]
|
|
self.proofs = o["proofs"]
|
|
|
|
if self.extra_attachments is not None:
|
|
a = self.extra_attachments.asdict()
|
|
for k in a.keys():
|
|
self.attachments[k] = a[k]
|
|
|
|
hshs = self.__get_ordered_hashes()
|
|
self.proofs = list(map(strip_0x, hshs))
|
|
|
|
self.inited = True
|
|
|
|
def start(self):
|
|
"""Initialize proof settings from template."""
|
|
super(Proof, self).start()
|
|
|
|
proof_template_file_path = os.path.join(
|
|
data_dir, f"proof_template_v{self.version()}.json"
|
|
)
|
|
|
|
with open(proof_template_file_path, "r", encoding="utf-8") as f:
|
|
o = json.load(f)
|
|
|
|
o["issuer"] = self.issuer
|
|
o["description"] = self.description
|
|
o["namespace"] = self.namespace
|
|
|
|
with open(self.proof_path, "w", encoding="utf-8") as f:
|
|
json.dump(o, f, sort_keys=True, indent="\t")
|
|
|
|
def asdict(self):
|
|
"""Output proof state to dict."""
|
|
return {
|
|
"version": self.version(),
|
|
"namespace": self.namespace,
|
|
"description": self.description,
|
|
"issuer": self.issuer,
|
|
"proofs": self.proofs,
|
|
}
|
|
|
|
# TODO: the efficiency of this method could probably be improved.
|
|
def __get_ordered_hashes(self):
|
|
ks = list(self.attachments.keys())
|
|
ks.sort()
|
|
|
|
return ks
|
|
|
|
# def get(self):
|
|
# hsh = self.hash(b).hex()
|
|
# self.attachments[hsh] = self.temp_proof_path
|
|
# logg.debug('cbor of {} is {} hashes to {}'.format(v, b.hex(), hsh))
|
|
|
|
def root(self):
|
|
"""Calculate the root digest from the serialized proof object."""
|
|
v = self.asdict()
|
|
# b = cbor2.dumps(v)
|
|
b = json.dumps(v, separators=(",", ":"))
|
|
|
|
with open(self.temp_proof_path, "w", encoding="utf-8") as f:
|
|
f.write(b)
|
|
|
|
b = b.encode("utf-8")
|
|
k = self.hash(b)
|
|
|
|
return (k.hex(), b)
|
|
|
|
def process(self, token_address=None, token_symbol=None, writer=None):
|
|
"""Serialize and publish proof.
|
|
|
|
See cic.processor.Processor.process
|
|
"""
|
|
if writer is None:
|
|
writer = self.writer
|
|
|
|
(k, v) = self.root()
|
|
writer.write(k, v)
|
|
root_key = k
|
|
|
|
token_symbol_bytes = token_symbol.encode("utf-8")
|
|
k = generate_metadata_pointer(
|
|
token_symbol_bytes, MetadataPointer.TOKEN_PROOF_SYMBOL
|
|
)
|
|
writer.write(k, v)
|
|
|
|
token_address_bytes = bytes.fromhex(strip_0x(token_address))
|
|
k = generate_metadata_pointer(token_address_bytes, MetadataPointer.TOKEN_PROOF)
|
|
writer.write(k, v)
|
|
|
|
# (hsh, hshs) = self.get()
|
|
# hshs = list(map(strip_0x, hshs))
|
|
# hshs_bin = list(map(bytes.fromhex, hshs))
|
|
# hshs_cat = b''.join(hshs_bin)
|
|
|
|
# f = open(self.temp_proof_path, 'rb')
|
|
# v = f.read()
|
|
# f.close()
|
|
# writer.write(hsh, v)
|
|
|
|
# r = self.hash(hshs_cat)
|
|
# r_hex = r.hex()
|
|
|
|
# logg.debug('generated proof {} for hashes {}'.format(r_hex, hshs))
|
|
|
|
# writer.write(r_hex, hshs_cat)
|
|
|
|
o = self.asdict()
|
|
with open(self.proof_path, "w", encoding="utf-8") as f:
|
|
json.dump(o, f, sort_keys=True, indent="\t")
|
|
|
|
return root_key
|
|
|
|
def __str__(self):
|
|
return object_to_str(
|
|
self, ["description", "issuer", "namespace", "version()", "proofs"]
|
|
)
|