Full import by ussd

This commit is contained in:
nolash 2021-04-07 21:23:50 +02:00
parent 49e92874c3
commit b204a3e5d9
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 208 additions and 89 deletions

View File

@ -0,0 +1,148 @@
# standard imports
import os
import logging
import urllib.parse
import urllib.error
import urllib.request
import json
# external imports
import celery
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum_address
from cic_types.processor import generate_metadata_pointer
from cic_types.models.person import Person
logg = logging.getLogger().getChild(__name__)
celery_app = celery.current_app
class MetadataTask(celery.Task):
balances = None
chain_spec = None
import_path = 'out'
meta_host = None
meta_port = None
meta_path = ''
meta_ssl = False
balance_processor = None
autoretry_for = (
urllib.error.HTTPError,
)
retry_kwargs = {
'countdown': 3,
'max_retries': 100,
}
@classmethod
def meta_url(self):
scheme = 'http'
if self.meta_ssl:
scheme += s
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
def old_address_from_phone(base_path, phone):
pidx = generate_metadata_pointer(phone.encode('utf-8'), 'cic.phone')
phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format(
base_path,
pidx[:2],
pidx[2:4],
pidx,
)
)
f = open(phone_idx_path, 'r')
old_address = f.read()
f.close()
return old_address
@celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone):
identifier = generate_metadata_pointer(phone.encode('utf-8'), 'cic.phone')
url = urllib.parse.urljoin(self.meta_url(), identifier)
logg.debug('attempt getting phone pointer at {} for phone {}'.format(url, phone))
r = urllib.request.urlopen(url)
address = json.load(r)
address = address.replace('"', '')
logg.debug('address {} for phone {}'.format(address, phone))
return address
@celery_app.task(bind=True, base=MetadataTask)
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_path, phone)
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
self.import_path,
old_address_upper[:2],
old_address_upper[2:4],
old_address_upper,
)
f = open(metadata_path, 'r')
o = json.load(f)
f.close()
u = Person.deserialize(o)
if u.identities.get('evm') == None:
u.identities['evm'] = {}
sub_chain_str = '{}:{}'.format(self.chain_spec.common_name(), self.chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [add_0x(address)]
new_address_clean = strip_0x(address)
filepath = os.path.join(
self.import_path,
'new',
new_address_clean[:2].upper(),
new_address_clean[2:4].upper(),
new_address_clean.upper() + '.json',
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
o = u.serialize()
f = open(filepath, 'w')
f.write(json.dumps(o))
f.close()
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), 'cic.person')
meta_filepath = os.path.join(
self.import_path,
'meta',
'{}.json'.format(new_address_clean.upper()),
)
os.symlink(os.path.realpath(filepath), meta_filepath)
logg.debug('found metadata {} for phone {}'.format(o, phone))
return address
@celery_app.task(bind=True, base=MetadataTask)
def transfer_opening_balance(self, address, phone, serial):
old_address = old_address_from_phone(self.import_path, phone)
k = to_checksum_address(strip_0x(old_address))
balance = self.balances[k]
logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
decimal_balance = self.balance_processor.get_decimal_amount(balance)
tx_hash_hex = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
logg.debug('sending {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
return tx_hash_hex

View File

@ -21,8 +21,8 @@ from crypto_dev_signer.keystore.dict import DictKeystore
from cic_types.models.person import Person from cic_types.models.person import Person
# local imports # local imports
from balance import BalanceProcessor from import_util import BalanceProcessor
from task import * from import_task import *
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -108,38 +108,15 @@ token_symbol = args.token_symbol
MetadataTask.meta_host = config.get('META_HOST') MetadataTask.meta_host = config.get('META_HOST')
MetadataTask.meta_port = config.get('META_PORT') MetadataTask.meta_port = config.get('META_PORT')
MetadataTask.chain_spec = chain_spec
def main(): def main():
conn = EthHTTPConnection(config.get('ETH_PROVIDER')) conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), signer_address) MetadataTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), signer_address, signer)
processor.init() MetadataTask.balance_processor.init()
# TODO get decimals from token
# syncer_backend = MemBackend(chain_str, 0)
#
# if block_offset == -1:
# o = block_latest()
# r = conn.do(o)
# block_offset = int(strip_0x(r), 16) + 1
##
## addresses = {}
## f = open('{}/addresses.csv'.format(user_dir, 'r'))
## while True:
## l = f.readline()
## if l == None:
## break
## r = l.split(',')
## try:
## k = r[0]
## v = r[1].rstrip()
## addresses[k] = v
## sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r")
## except IndexError as e:
## break
## f.close()
#
# # TODO get decimals from token
balances = {} balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r')) f = open('{}/balances.csv'.format(user_dir, 'r'))
remove_zeros = 10**6 remove_zeros = 10**6
@ -160,6 +137,8 @@ def main():
f.close() f.close()
MetadataTask.balances = balances
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG'] argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG']
celery_app.worker_main(argv) celery_app.worker_main(argv)

View File

@ -148,15 +148,34 @@ if __name__ == '__main__':
phone_object = phonenumbers.parse(u.tel) phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s = celery.signature( s_phone = celery.signature(
'task.resolve_phone', 'import_task.resolve_phone',
[ [
phone, phone,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s.apply_async()
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.transfer_opening_balance',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_meta.link(s_balance)
s_phone.link(s_meta)
s_phone.apply_async()
# if u.identities.get('evm') == None: # if u.identities.get('evm') == None:
# u.identities['evm'] = {} # u.identities['evm'] = {}

View File

@ -5,26 +5,34 @@ import logging
from eth_contract_registry import Registry from eth_contract_registry import Registry
from eth_token_index import TokenUniqueSymbolIndex from eth_token_index import TokenUniqueSymbolIndex
from chainlib.eth.gas import OverrideGasOracle from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.nonce import OverrideNonceOracle
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import count
logg = logging.getLogger().getChild(__name__) logg = logging.getLogger().getChild(__name__)
class BalanceProcessor: class BalanceProcessor:
def __init__(self, conn, chain_spec, registry_address, signer_address): def __init__(self, conn, chain_spec, registry_address, signer_address, signer):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.conn = conn self.conn = conn
self.signer_address = signer_address #self.signer_address = signer_address
self.registry_address = registry_address self.registry_address = registry_address
self.token_index_address = None self.token_index_address = None
self.token_address = None self.token_address = None
self.signer_address = signer_address
self.signer = signer
gas_oracle = OverrideGasOracle(conn=conn, limit=8000000) o = count(signer_address)
nonce_oracle = RPCNonceOracle(signer_address, conn) c = self.conn.do(o)
self.nonce_offset = int(c, 16)
self.gas_oracle = OverrideGasOracle(conn=conn, limit=8000000)
self.value_multiplier = 1
def init(self): def init(self):
# Get Token registry address # Get Token registry address
@ -40,3 +48,20 @@ class BalanceProcessor:
self.token_address = token_registry.parse_address_of(r) self.token_address = token_registry.parse_address_of(r)
logg.info('found SRF token address {}'.format(self.token_address)) logg.info('found SRF token address {}'.format(self.token_address))
tx_factory = ERC20(self.chain_spec)
o = tx_factory.decimals(self.token_address)
r = self.conn.do(o)
self.value_multiplier = int(r, 16) ** 10
def get_rpc_tx(self, recipient, value, i):
logg.debug('initiating nonce offset {} for recipient {}'.format(self.nonce_offset + i, recipient))
nonce_oracle = OverrideNonceOracle(self.signer_address, self.nonce_offset + i)
tx_factory = ERC20(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.gas_oracle)
(tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value)
self.conn.do(o)
return tx_hash_hex
def get_decimal_amount(self, value):
return value * self.value_multiplier

View File

@ -1,52 +0,0 @@
# standard imports
import logging
import urllib.parse
import urllib.error
import urllib.request
import json
# external imports
import celery
from cic_types.processor import generate_metadata_pointer
logg = logging.getLogger().getChild(__name__)
celery_app = celery.current_app
class MetadataTask(celery.Task):
meta_host = None
meta_port = None
meta_path = ''
meta_ssl = False
autoretry_for = (
urllib.error.HTTPError,
)
retry_kwargs = {
'countdown': 3,
'max_retries': 100,
}
@classmethod
def meta_url(self):
scheme = 'http'
if self.meta_ssl:
scheme += s
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
@celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone):
identifier = generate_metadata_pointer(phone.encode('utf-8'), 'cic.phone')
url = urllib.parse.urljoin(self.meta_url(), identifier)
logg.debug('attempt getting phone pointer at {}'.format(url))
r = urllib.request.urlopen(url)
address = json.load(r)
logg.debug('address {} for phone {}'.format(address, phone))
@celery_app.task(bind=True, base=MetadataTask)
def transfer_opening_balance(self, phone, address):
pass