clicada/clicada/user/file.py

149 lines
4.5 KiB
Python

# standard imports
import stat
import os
import logging
import json
import urllib.parse
import datetime
# external imports
from hexathon import strip_0x
from cic_types.condiments import MetadataPointer
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.models.person import Person
from cic_types.ext.requests import make_request
from cic_types.processor import generate_metadata_pointer
import requests.exceptions
import phonenumbers
# local imports
from clicada.encode import tx_normalize
from clicada.store.mem import MemDictStore
from clicada.error import ExpiredRecordError
logg = logging.getLogger(__name__)


class FileUserStore:
    """File-backed cache of user lookups, one file per key.

    Each record is a plain text file in a directory derived from the chain
    spec and the store label. A record whose modification time is older
    than the time-to-live cutoff is treated as expired.
    """

    def __init__(self, chain_spec, label, store_base_path, ttl):
        """Set up the store directory and the expiry cutoff.

        :param chain_spec: Object providing ``arch()``, ``fork()`` and
            ``network_id()``; their values become path components.
        :param label: Name of this store; used as the last path component
            and in log messages.
        :param store_base_path: Directory under which the store is created.
        :param ttl: Record lifetime in seconds.
        """
        # The cutoff is computed once, here; it does not move forward
        # during the lifetime of the instance.
        invalidate_before = datetime.datetime.now() - datetime.timedelta(seconds=ttl)
        self.invalidate_before = int(invalidate_before.timestamp())
        self.label = label
        self.store_path = os.path.join(
            store_base_path,
            chain_spec.arch(),
            chain_spec.fork(),
            str(chain_spec.network_id()),
            self.label,
        )
        self.memstore_entity = MemDictStore()
        os.makedirs(self.store_path, exist_ok=True)
        self.__validate_dir()

    def __validate_dir(self):
        """Log the store location if the store path is a directory."""
        st = os.stat(self.store_path)
        if stat.S_ISDIR(st.st_mode):
            logg.debug('using existing file store {} for {}'.format(self.store_path, self.label))

    def put(self, k, v, force=False):
        """Write value ``v`` to the record file for key ``k``.

        :param k: Record key; used as the file name inside the store.
        :param v: String to write.
        :param force: Overwrite even if an unexpired record exists.
        :raises FileExistsError: An unexpired record exists and ``force``
            is not set.
        """
        p = os.path.join(self.store_path, k)
        try:
            st = os.stat(p)
        except FileNotFoundError:
            # Nothing to protect; fall through and create the record.
            pass
        else:
            expired = st[stat.ST_MTIME] < self.invalidate_before
            if not expired and not force:
                raise FileExistsError('user resolution already exists for {}'.format(k))

        # Context manager closes the handle even if the write fails.
        with open(p, 'w') as f:
            f.write(v)

        logg.info('added user store {} record {} -> {}'.format(self.label, k, v))

    def check_expiry(self, p):
        """Raise if the file at path ``p`` is older than the cutoff.

        :param p: Path of the record file.
        :raises FileNotFoundError: The file does not exist.
        :raises ExpiredRecordError: The file's mtime is before the cutoff.
        """
        st = os.stat(p)
        if st[stat.ST_MTIME] < self.invalidate_before:
            raise ExpiredRecordError('record in {} is expired'.format(p))

    def get(self, k):
        """Return the stored value for key ``k``, stripped of surrounding whitespace.

        :param k: Record key.
        :raises FileNotFoundError: No record exists for the key.
        :raises ExpiredRecordError: The record exists but is expired.
        """
        p = os.path.join(self.store_path, k)
        self.check_expiry(p)
        with open(p, 'r') as f:
            r = f.read()
        return r.strip()

    def by_phone(self, phone, update=False):
        """Look up the value registered for a phone number.

        The number is normalized to E.164 and the cache is keyed by that
        form without the leading ``+``. On a cache miss, an expired record,
        or when ``update`` is set, the metadata service is queried and the
        result is written back to the cache.

        :param phone: Phone number string, parsed with no default region.
        :param update: Skip the cached record and refresh it from the
            metadata service.
        :return: The cached or fetched value, or ``None`` if the metadata
            request fails with an HTTP error.
        """
        phone = phonenumbers.parse(phone, None)
        phone = phonenumbers.format_number(phone, phonenumbers.PhoneNumberFormat.E164)
        phone_file = phone.replace('+', '')

        if not update:
            try:
                return self.get(phone_file)
            except FileNotFoundError:
                pass
            except ExpiredRecordError as e:
                logg.info(e)

        try:
            ptr = generate_metadata_pointer(phone.encode('utf-8'), MetadataPointer.PHONE)
            url = urllib.parse.urljoin(MetadataRequestsHandler.base_url, ptr)
            r = make_request('GET', url)
            user_address = r.json()
        except requests.exceptions.HTTPError as e:
            logg.debug('no address found for phone {}: {}'.format(phone, e))
            return None

        # An explicit update must be able to replace a record that has not
        # expired yet; without force, put() would raise FileExistsError.
        self.put(phone_file, user_address, force=update)
        return user_address

    def get_label(self, address, update=False):
        """Return a printable label for the person registered to an address.

        The cache is consulted first unless ``update`` is set. On a miss or
        an expired record the person metadata is fetched, cached as JSON,
        and its string form is returned.

        :param address: Hex address, with or without ``0x`` prefix.
        :param update: Skip the cached record and refresh it from the
            metadata service.
        :return: String form of the person record, or the address without
            ``0x`` prefix if the metadata request fails with an HTTP error.
        """
        # NOTE(review): the original computed this value and never used it.
        # The call is kept in case it validates the input - confirm, and
        # consider using its result as the cache key.
        tx_normalize.wallet_address(address)

        # Strip the prefix before the cache lookup so that reads use the
        # same key that put() writes below.
        address = strip_0x(address)

        if not update:
            try:
                v = self.get(address)
                v = json.loads(v)
                person = Person()
                person_data = person.deserialize(person_data=v)
                return str(person_data)
            except FileNotFoundError:
                pass
            except ExpiredRecordError as e:
                logg.info(e)

        getter = MetadataRequestsHandler(MetadataPointer.PERSON, bytes.fromhex(address))
        try:
            r = getter.query()
        except requests.exceptions.HTTPError as e:
            logg.debug('no metadata found for {}: {}'.format(address, e))
            return address

        data = r.json()
        person = Person()
        person_data = person.deserialize(person_data=data)
        # force=update lets an explicit refresh overwrite an unexpired record.
        self.put(address, json.dumps(person_data.serialize()), force=update)
        return str(person_data)