cic-cli/cic/meta.py
2021-11-29 08:02:12 +01:00

142 lines
4.0 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import os
import json
import logging
import base64
# external imports
from cic_types import MetadataPointer
from cic_types.processor import generate_metadata_pointer
from cic_types.ext.metadata import MetadataRequestsHandler
from hexathon import strip_0x
# local imports
from .base import (
Data,
data_dir,
)
from cic.output import OutputWriter
logg = logging.getLogger(__name__)
class Meta(Data):
    """Serialize and publish metadata for token.

    The token metadata is any mutable data that is not part of the initial
    token proof, but is nonetheless published simultaneously with the token.

    :param path: Path to settings directory
    :type path: str
    :param writer: Writer interface receiving the output of the processor
    :type writer: cic.output.OutputWriter
    """

    def __init__(self, path='.', writer=None):
        super(Meta, self).__init__()
        self.name = None
        self.contact = {}
        self.path = path
        self.writer = writer
        # All settings are read from and written to this single file.
        self.meta_path = os.path.join(self.path, 'meta.json')

    def load(self):
        """Load metadata from settings.

        Reads ``meta.json`` in the settings directory and populates ``name``
        and ``contact`` from the keys of the same names.

        :raises KeyError: If ``name`` or ``contact`` is missing from the file
        """
        super(Meta, self).load()

        # Context manager guarantees the handle is closed even if the
        # file does not contain valid JSON.
        with open(self.meta_path, 'r') as f:
            o = json.load(f)

        self.name = o['name']
        self.contact = o['contact']

        self.inited = True

    def start(self):
        """Initialize metadata settings from template.

        Reads the template matching ``self.version()`` from ``data_dir`` and
        writes it (keys sorted, tab-indented) to ``meta.json`` in the
        settings directory, overwriting any existing file.
        """
        super(Meta, self).start()

        meta_template_file_path = os.path.join(
            data_dir,
            'meta_template_v{}.json'.format(self.version()),
        )

        with open(meta_template_file_path) as f:
            o = json.load(f)

        with open(self.meta_path, 'w') as f:
            json.dump(o, f, sort_keys=True, indent="\t")

    def reference(self, token_address):
        """Calculate the mutable reference for the token metadata.

        :param token_address: Token address, in hex
        :type token_address: str
        :return: Pointer generated for the address with the TOKEN_META type
        """
        token_address_bytes = bytes.fromhex(strip_0x(token_address))
        return generate_metadata_pointer(token_address_bytes, MetadataPointer.TOKEN_META)

    def asdict(self):
        """Output metadata state to dict.

        :rtype: dict
        :return: Dict with the ``name`` and ``contact`` values
        """
        return {
            'name': self.name,
            'contact': self.contact,
        }

    def process(self, token_address=None, token_symbol=None, writer=None):
        """Serialize and publish metadata.

        The same JSON document is written twice: once under the pointer
        derived from the token address, and once under the pointer derived
        from the token symbol.

        See cic.processor.Processor.process

        :param token_address: Token address, in hex. Used unconditionally,
            so a value must be supplied despite the ``None`` default.
        :type token_address: str
        :param token_symbol: Token symbol. Encoded unconditionally, so a
            value must be supplied despite the ``None`` default.
        :type token_symbol: str
        :param writer: Writer to use instead of the one set on the instance
        :type writer: cic.output.OutputWriter
        :rtype: tuple
        :return: The token symbol pointer and the serialized JSON string
        """
        if writer is None:
            writer = self.writer

        v = json.dumps(self.asdict())
        v_bytes = v.encode('utf-8')

        # Publish under the token address pointer.
        k = self.reference(token_address)
        writer.write(k, v_bytes)

        # Publish the same content under the token symbol pointer.
        token_symbol_bytes = token_symbol.encode('utf-8')
        k = generate_metadata_pointer(token_symbol_bytes, MetadataPointer.TOKEN_META_SYMBOL)
        writer.write(k, v_bytes)

        return (k, v)

    def __str__(self):
        lines = ["contact.name = {}\n".format(self.name)]
        for k, v in self.contact.items():
            # Empty contact fields are omitted from the listing.
            if v == '':
                continue
            lines.append("contact.{} = {}\n".format(k.lower(), v))
        return ''.join(lines)
class MetadataWriter(OutputWriter):
    """Custom writer for publishing data under immutable content-addressed pointers in the cic-meta storage backend.

    Payloads that decode as utf-8 are parsed as JSON before publishing.
    Payloads that are not valid utf-8 are published as a base64 string.

    Implements cic.output.OutputWriter
    """

    def write(self, k, v):
        """Publish a value under the given pointer.

        :param k: Pointer to publish under, in hex
        :type k: str
        :param v: Content to publish
        :type v: bytes
        :raises json.JSONDecodeError: If the content is valid utf-8 but not valid JSON
        :return: Result of the metadata handler's ``create`` call
        """
        handler = MetadataRequestsHandler(MetadataPointer.NONE, bytes.fromhex(k))

        try:
            text = v.decode('utf-8')
        except UnicodeDecodeError:
            # Binary content cannot be embedded as text; ship it as base64.
            payload = base64.b64encode(v).decode('utf-8')
            logg.debug('metadatawriter b64encode {} {}'.format(k, payload))
        else:
            payload = json.loads(text)
            logg.debug('metadatawriter bindecode {} {}'.format(k, payload))

        result = handler.create(payload)
        logg.info('metadata submitted at {}'.format(k))
        return result