clicada/clicada/user/file.py

292 lines
8.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import stat
import os
import logging
import json
import urllib.parse
import datetime
# external imports
from hexathon import strip_0x
from cic_types.condiments import MetadataPointer
from cic_types.models.person import Person
from cic_types.ext.requests import make_request
from cic_types.processor import generate_metadata_pointer
from requests.exceptions import HTTPError
import phonenumbers
# local imports
from clicada.encode import tx_normalize
from clicada.store.mem import MemDictStore
from clicada.error import (
ExpiredRecordError,
MetadataNotFoundError,
)
logg = logging.getLogger(__name__)
class Account(Person):
def __init__(self):
super(Account, self).__init__()
self.tags = []
def apply_custom(self, custom_data):
self.tags = custom_data['tags']
logg.debug('tags are now {}'.format(self.tags))
@classmethod
def deserialize(cls, person_data):
o = super(Account, cls).deserialize(person_data)
try:
o.tags = person_data['custom']['tags']
except KeyError as e:
pass
return o
def serialize(self):
o = super(Account, self).serialize()
o['custom'] = {}
o['custom']['tags'] = self.tags
return o
def __str__(self):
return '{} {} ({})'.format(
self.given_name,
self.family_name,
self.tel,
)
class FileUserStore:
def __init__(self, metadata_opener, chain_spec, label, store_base_path, ttl, encrypter=None):
invalidate_before = datetime.datetime.now() - datetime.timedelta(seconds=ttl)
self.invalidate_before = int(invalidate_before.timestamp())
self.have_xattr = False
self.label = label
self.store_path = os.path.join(
store_base_path,
chain_spec.arch(),
chain_spec.fork(),
str(chain_spec.network_id()),
self.label,
)
self.memstore_entity = MemDictStore()
os.makedirs(self.store_path, exist_ok=True)
self.__validate_dir()
self.metadata_opener = metadata_opener
self.failed_entities = {}
self.encrypter = encrypter
def __validate_dir(self):
st = os.stat(self.store_path)
if stat.S_ISDIR(st.st_mode):
logg.debug('using existing file store {} for {}'.format(self.store_path, self.label))
def is_dud(self, address):
return bool(self.failed_entities.get(address))
def put(self, k, v, force=False):
have_file = False
p = os.path.join(self.store_path, k)
its_time = True
try:
st = os.stat(p)
have_file = True
its_time = st[stat.ST_MTIME] < self.invalidate_before
except FileNotFoundError:
pass
if have_file and not its_time and not force:
raise FileExistsError('user resolution already exists for {}'.format(k))
ve = v
f = None
if self.encrypter != None:
ve = self.encrypter.encrypt(k, ve.encode('utf-8'))
f = open(p, 'wb')
else:
f = open(p, 'w')
f.write(ve)
f.close()
logg.info('added user store {} record {} -> {}'.format(self.label, k, v))
def stick(self, k):
p = os.path.join(self.store_path, k)
if self.have_xattr:
os.setxattr(p, 'cic_sticky', b'1')
else:
(s_h, s_t) = os.path.split(p)
sp = os.path.join(s_h, '.stick_' + s_t)
f = open(sp, 'w')
f.close()
def __is_sticky(self, p):
if self.have_xattr:
if not os.getxattr(p, 'cic_sticky'):
return False
else:
(s_h, s_t) = os.path.split(p)
sp = os.path.join(s_h, '.stick_' + s_t)
try:
os.stat(sp)
except FileNotFoundError:
return False
return True
def __unstick(self, p):
(s_h, s_t) = os.path.split(p)
sp = os.path.join(s_h, '.stick_' + s_t)
os.unlink(sp)
def path(self, k):
return os.path.join(self.store_path, k)
def sticky(self, k):
p = self.path(k)
return self.__is_sticky(p)
def check_expiry(self, p):
st = os.stat(p)
if st[stat.ST_MTIME] < self.invalidate_before:
if self.__is_sticky(p):
logg.debug('ignoring sticky record expiry for {}'.format(p))
return
raise ExpiredRecordError('record in {} is expired'.format(p))
def get(self, k, ignore_expired=False):
try:
p = os.path.join(self.store_path, k)
except FileNotFoundError as e:
if self.__is_sticky(p):
logg.warning('removed orphaned sticky record for ' + p)
self.__unstick(p)
self.check_expiry(p)
f = None
if self.encrypter != None:
f = open(p, 'rb')
else:
f = open(p, 'r')
v = f.read()
f.close()
if self.encrypter != None:
v = self.encrypter.decrypt(k, v)
logg.debug('>>>>>>>>>>>>< v decoded {}'.format(v))
v = v.decode('utf-8')
logg.debug('retrieved {} from {}'.format(k, p))
return v.strip()
def by_phone(self, phone, update=False):
phone = phonenumbers.parse(phone, None)
phone = phonenumbers.format_number(phone, phonenumbers.PhoneNumberFormat.E164)
phone_file = phone.replace('+', '')
ignore_expired = self.sticky(phone_file)
if not update:
try:
v = self.get(phone_file, ignore_expired=ignore_expired)
return v
except FileNotFoundError:
pass
except ExpiredRecordError as e:
logg.info(e)
pass
getter = self.metadata_opener
ptr = generate_metadata_pointer(phone.encode('utf-8'), MetadataPointer.PHONE)
r = None
user_address = None
try:
r = getter.open(ptr)
user_address = json.loads(r)
except HTTPError as e:
logg.debug('no address found for phone {}: {}'.format(phone, e))
return None
self.put(phone_file, user_address, force=update)
return user_address
def metadata_to_person(self, v):
person = Account()
try:
person_data = person.deserialize(person_data=v)
except Exception as e:
person_data = v
return person_data
def by_address(self, address, update=False):
address = tx_normalize.wallet_address(address)
address = strip_0x(address)
#if self.failed_entities.get(address):
if self.is_dud(address):
logg.debug('already tried and failed {}, skipping'.format(address))
return address
ignore_expired = self.sticky(address)
if not update:
try:
v = self.get(address, ignore_expired=ignore_expired)
v = json.loads(v)
return self.metadata_to_person(v)
except FileNotFoundError:
pass
except ExpiredRecordError as e:
logg.info(e)
pass
getter = self.metadata_opener
ptr = generate_metadata_pointer(bytes.fromhex(address), MetadataPointer.PERSON)
r = None
try:
r = getter.open(ptr)
except Exception as e:
logg.debug('no metadata found for {}: {}'.format(address, e))
if not r:
self.failed_entities[address] = True
raise MetadataNotFoundError()
data = json.loads(r)
person = Account()
person_data = person.deserialize(person_data=data)
ptr = generate_metadata_pointer(bytes.fromhex(address), MetadataPointer.CUSTOM)
r = None
try:
r = getter.open(ptr)
o = json.loads(r)
person_data.apply_custom(o)
except Exception as e:
pass
self.put(address, json.dumps(person_data.serialize()), force=update)
return person_data