82 lines
2.2 KiB
Python
82 lines
2.2 KiB
Python
# standard imports
|
|
import stat
|
|
import os
|
|
import logging
|
|
|
|
# external imports
|
|
from hexathon import strip_0x
|
|
from cic_types.condiments import MetadataPointer
|
|
from cic_types.ext.metadata import MetadataRequestsHandler
|
|
from cic_types.models.person import Person
|
|
import requests.exceptions
|
|
|
|
# local imports
|
|
from clicada.encode import tx_normalize
|
|
from clicada.store.mem import MemDictStore
|
|
|
|
# Module-level logger, named after this module's import path.
logg = logging.getLogger(__name__)
|
|
|
|
|
|
class FileUserStore:
    """Key/value store that keeps one plain-text file per key on disk.

    Records live in a directory derived from the chain spec and a label:
    ``<store_base_path>/<arch>/<fork>/<network_id>/<label>/``.
    """

    def __init__(self, chain_spec, label, store_base_path):
        """Resolve the store directory and create it if it does not exist.

        :param chain_spec: Object providing ``arch()``, ``fork()`` and
            ``network_id()``; their values become path components.
        :param label: Name of this store; used as the innermost directory
            name and in log messages.
        :param store_base_path: Directory under which the store is created.
        """
        self.label = label
        self.store_path = os.path.join(
            store_base_path,
            chain_spec.arch(),
            chain_spec.fork(),
            str(chain_spec.network_id()),
            self.label,
        )
        # In-memory store instance; not used by the methods of this class.
        self.memstore_entity = MemDictStore()
        os.makedirs(self.store_path, exist_ok=True)
        self.__validate_dir()

    def __validate_dir(self):
        """Log the store location when the store path is a directory."""
        st = os.stat(self.store_path)
        if stat.S_ISDIR(st.st_mode):
            logg.debug('using existing file store {} for {}'.format(self.store_path, self.label))

    def put(self, k, v, force=False):
        """Write value ``v`` to the file named ``k`` inside the store.

        :param k: Record key; used verbatim as the file name.
        :param v: String content to write.
        :param force: When true, overwrite an already existing record.
        :raises FileExistsError: If the record exists and ``force`` is false.
        """
        p = os.path.join(self.store_path, k)
        try:
            os.stat(p)
        except FileNotFoundError:
            have_file = False
        else:
            have_file = True

        if have_file and not force:
            raise FileExistsError('user resolution already exists for {}'.format(k))

        # Context manager closes the handle even if the write fails.
        with open(p, 'w') as f:
            f.write(v)

        logg.info('added user store {} record {} -> {}'.format(self.label, k, v))

    def get(self, k):
        """Return the content stored under key ``k``.

        Leading and trailing whitespace is stripped from the result.

        :param k: Record key; used verbatim as the file name.
        :raises FileNotFoundError: If no file exists for ``k``.
        """
        p = os.path.join(self.store_path, k)
        with open(p, 'r') as f:
            r = f.read()
        return r.strip()

    def get_metadata(self, address):
        """Query person metadata for ``address`` and return it as a string.

        The address has any ``0x`` prefix stripped before use. When the
        query raises an HTTP error, the stripped address is returned instead
        of metadata.

        :param address: Hex-encoded address, with or without ``0x`` prefix.
        :return: ``str()`` of the deserialized person data, or the stripped
            address if the query failed with an HTTP error.
        """
        address = strip_0x(address)
        getter = MetadataRequestsHandler(MetadataPointer.PERSON, bytes.fromhex(address))
        try:
            r = getter.query()
        except requests.exceptions.HTTPError as e:
            logg.debug('no metadata found for {}: {}'.format(address, e))
            return address
        data = r.json()
        person = Person()
        person_data = person.deserialize(person_data=data)
        return str(person_data)
|