# standard imports import os import logging import urllib.parse import urllib.error import urllib.request import json # external imports import celery import psycopg2 from psycopg2 import extras from hexathon import ( strip_0x, add_0x, ) from chainlib.eth.address import to_checksum_address from chainlib.eth.tx import ( unpack, raw, ) from cic_types.processor import generate_metadata_pointer from cic_types.models.person import Person #logg = logging.getLogger().getChild(__name__) logg = logging.getLogger() celery_app = celery.current_app class ImportTask(celery.Task): balances = None import_dir = 'out' count = 0 chain_spec = None balance_processor = None max_retries = None class MetadataTask(ImportTask): meta_host = None meta_port = None meta_path = '' meta_ssl = False autoretry_for = ( urllib.error.HTTPError, OSError, ) retry_jitter = True retry_backoff = True retry_backoff_max = 60 @classmethod def meta_url(self): scheme = 'http' if self.meta_ssl: scheme += s url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path)) return urllib.parse.urlunparse(url) def old_address_from_phone(base_path, phone): pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone') phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format( base_path, pidx[:2], pidx[2:4], pidx, ) ) f = open(phone_idx_path, 'r') old_address = f.read() f.close() return old_address @celery_app.task(bind=True, base=MetadataTask) def resolve_phone(self, phone): identifier = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone') url = urllib.parse.urljoin(self.meta_url(), identifier) logg.debug('attempt getting phone pointer at {} for phone {}'.format(url, phone)) r = urllib.request.urlopen(url) address = json.load(r) address = address.replace('"', '') logg.debug('address {} for phone {}'.format(address, phone)) return address @celery_app.task(bind=True, base=MetadataTask) def generate_metadata(self, address, phone): old_address = old_address_from_phone(self.import_dir, phone) logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> foo') logg.debug('address {}'.format(address)) old_address_upper = strip_0x(old_address).upper() metadata_path = '{}/old/{}/{}/{}.json'.format( self.import_dir, old_address_upper[:2], old_address_upper[2:4], old_address_upper, ) f = open(metadata_path, 'r') o = json.load(f) f.close() u = Person.deserialize(o) if u.identities.get('evm') == None: u.identities['evm'] = {} sub_chain_str = '{}:{}'.format(self.chain_spec.common_name(), self.chain_spec.network_id()) u.identities['evm'][sub_chain_str] = [add_0x(address)] new_address_clean = strip_0x(address) filepath = os.path.join( self.import_dir, 'new', new_address_clean[:2].upper(), new_address_clean[2:4].upper(), new_address_clean.upper() + '.json', ) os.makedirs(os.path.dirname(filepath), exist_ok=True) o = u.serialize() f = open(filepath, 'w') f.write(json.dumps(o)) f.close() meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person') meta_filepath = os.path.join( self.import_dir, 'meta', '{}.json'.format(new_address_clean.upper()), ) os.symlink(os.path.realpath(filepath), meta_filepath) logg.debug('found metadata {} for phone {}'.format(o, phone)) return address @celery_app.task(bind=True, base=MetadataTask) def opening_balance_tx(self, address, phone, serial): old_address = old_address_from_phone(self.import_dir, phone) k = to_checksum_address(strip_0x(old_address)) balance = self.balances[k] logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone)) decimal_balance = self.balance_processor.get_decimal_amount(int(balance)) (tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial) tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec) logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex)) tx_path = os.path.join( self.import_dir, 'txs', strip_0x(tx_hash_hex), ) f = open(tx_path, 'w') f.write(strip_0x(o)) f.close() tx_nonce_path = os.path.join( self.import_dir, 'txs', '.' + str(tx['nonce']), ) os.symlink(os.path.realpath(tx_path), tx_nonce_path) return tx['hash'] @celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None, default_retry_delay=0.1) def send_txs(self, nonce): if nonce == self.count + self.balance_processor.nonce_offset: logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset, self.count)) return logg.debug('attempt to open symlink for nonce {}'.format(nonce)) tx_nonce_path = os.path.join( self.import_dir, 'txs', '.' + str(nonce), ) f = open(tx_nonce_path, 'r') tx_signed_raw_hex = f.read() f.close() os.unlink(tx_nonce_path) o = raw(add_0x(tx_signed_raw_hex)) tx_hash_hex = self.balance_processor.conn.do(o) logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) #tx_signed_raw_hex)) nonce += 1 queue = self.request.delivery_info.get('routing_key') s = celery.signature( 'import_task.send_txs', [ nonce, ], queue=queue, ) s.apply_async() return nonce @celery_app.task def set_pins(config: dict, phone_to_pins: list): # define db connection db_conn = psycopg2.connect( database=config.get('database'), host=config.get('host'), port=config.get('port'), user=config.get('user'), password=config.get('password') ) db_cursor = db_conn.cursor() # batch updates extras.execute_values(db_cursor, 'UPDATE account \ SET password_hash = update_data.password_hash \ FROM (VALUES %s) AS update_data (phone, password_hash) \ WHERE phone_number = update_data.phone', phone_to_pins) db_conn.commit() db_cursor.close() db_conn.close()