2021-02-21 16:41:37 +01:00
# standard imports
import os
import sys
import json
import logging
import argparse
import uuid
import datetime
import time
2021-04-09 15:00:15 +02:00
import phonenumbers
2021-02-21 16:41:37 +01:00
from glob import glob
2021-10-20 17:02:36 +02:00
# external imports
2021-02-21 16:41:37 +01:00
import redis
import confini
import celery
from hexathon import (
add_0x ,
strip_0x ,
)
2021-04-09 15:00:15 +02:00
from chainlib . eth . address import to_checksum_address
2021-02-21 16:41:37 +01:00
from cic_types . models . person import Person
from cic_eth . api . api_task import Api
2021-02-22 21:00:18 +01:00
from chainlib . chain import ChainSpec
2021-02-21 16:41:37 +01:00
from cic_types . processor import generate_metadata_pointer
2021-10-20 17:02:36 +02:00
from cic_types import MetadataPointer
# local imports
from common . dirs import initialize_dirs
2021-02-21 16:41:37 +01:00
# Default to WARNING; the -v / -vv flags parsed below raise the verbosity.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# The default configuration directory is "<root>/config", where <root> is the
# parent of the directory containing this script.
script_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(script_dir)
base_config_dir = os.path.join(root_dir, 'config')

# Command line interface.
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, help='config override directory')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
argparser.add_argument('-f', action='store_true', help='force clear previous state')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
# batch size should be slightly below cumulative gas limit worth,
# eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node')
argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = argparser.parse_args()

# Test the more verbose flag first so that passing both -v and -vv yields
# DEBUG rather than being capped at INFO.
if args.vv:
    logg.setLevel(logging.DEBUG)
elif args.v:
    logg.setLevel(logging.INFO)
2021-10-20 17:02:36 +02:00
# Load the base configuration, layering the override directory on top only
# when one was given with -c.
if args.c is not None:
    config = confini.Config(base_config_dir, os.environ.get('CONFINI_ENV_PREFIX'), override_config_dir=args.c)
else:
    config = confini.Config(base_config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()

# Values supplied on the command line are applied on top of the loaded config.
args_override = {
    'CHAIN_SPEC': args.i,
    'REDIS_HOST': args.redis_host,
    'REDIS_PORT': args.redis_port,
    'REDIS_DB': args.redis_db,
}
config.dict_override(args_override, 'cli')
config.add(args.user_dir, '_USERDIR', True)

celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))

# Redis connection and the shared pubsub handle used to receive the
# account-creation callbacks in register_eth().
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
ps = r.pubsub()

# Chain the users are exported from (old) and the chain they are imported to.
old_chain_spec = ChainSpec.from_chain_str(args.old_chain_spec)
old_chain_str = str(old_chain_spec)
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
chain_str = str(chain_spec)

batch_size = args.batch_size
batch_delay = args.batch_delay

# Working directories under the user export dir; -f is passed on as force_reset.
dirs = initialize_dirs(config.get('_USERDIR'), force_reset=args.f)
2021-02-21 16:41:37 +01:00
def register_eth(i, u):
    """Request a new account from cic-eth and wait for its address.

    Subscribes the shared pubsub handle ``ps`` to a freshly generated redis
    channel, submits a ``create_account`` task whose callback publishes to
    that channel, and then waits for the callback payload.

    :param i: Running index of the user being imported; used for logging only.
    :param u: The user being imported; used for logging only.
    :returns: The ``result`` field of the callback payload, or ``None`` if no
        message arrived within ``args.timeout`` seconds.

    The process exits with status 1 if a received payload cannot be decoded.
    """
    redis_channel = str(uuid.uuid4())
    ps.subscribe(redis_channel)

    api = Api(
        config.get('CHAIN_SPEC'),
        queue=args.q,
        callback_param='{}:{}:{}:{}'.format(args.redis_host_callback, args.redis_port_callback, redis_db, redis_channel),
        callback_task='cic_eth.callbacks.redis.redis',
        callback_queue=args.q,
    )
    t = api.create_account(register=True)
    logg.debug('register {} -> {}'.format(u, t))

    address = None
    while True:
        # Every message is inspected. An extra non-blocking poll here would
        # throw away whatever is queued, which can be the callback result.
        m = ps.get_message(timeout=args.timeout)
        if m is None:
            logg.debug('message timeout')
            return None
        if m['type'] == 'subscribe':
            # Subscription confirmation, not the callback payload.
            logg.debug('skipping subscribe message')
            continue
        try:
            response = json.loads(m['data'])
            address = response['result']
        except (ValueError, KeyError, TypeError) as e:
            logg.critical('unexpected response from redis callback: {} {}'.format(m, e))
            sys.exit(1)
        break

    logg.debug('[{}] register eth {} {}'.format(i, u, address))
    return address
def _load_user_tags(tags_path):
    """Read the ``<old address>:<tag>,<tag>,...`` lines of the tags file.

    Reading stops at end of file or at the first line that is empty after
    stripping trailing whitespace.

    :param tags_path: Path of the tags file.
    :returns: Dict mapping each old address (0x prefix stripped) to its list
        of tags.
    """
    tags_by_address = {}
    with open(tags_path, 'r') as tags_file:
        for line in tags_file:
            line = line.rstrip()
            if len(line) == 0:
                break
            (old_address, tags_csv) = line.split(':')
            old_address = strip_0x(old_address)
            tags_by_address[old_address] = tags_csv.split(',')
            logg.debug('read tags {} for old address {}'.format(tags_by_address[old_address], old_address))
    return tags_by_address


def _sharded_path(key, suffix, *base):
    """Return ``<base...>/<KEY[0:2]>/<KEY[2:4]>/<KEY><suffix>`` with the key upper-cased."""
    return os.path.join(
        *base,
        key[:2].upper(),
        key[2:4].upper(),
        key.upper() + suffix,
    )


def _write_and_link(filepath, content, link_path):
    """Write ``content`` to ``filepath`` (creating parent dirs) and symlink ``link_path`` to it."""
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    with open(filepath, 'w') as out_file:
        out_file.write(content)
    os.symlink(os.path.realpath(filepath), link_path)


if __name__ == '__main__':
    user_tags = _load_user_tags(os.path.join(config.get('_USERDIR'), 'tags.csv'))

    # Keys under identities['evm'] for the new and the old chain; these do not
    # change between users, so they are computed once.
    sub_chain_str = '{}:{}'.format(chain_spec.common_name(), chain_spec.network_id())
    sub_old_chain_str = '{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id())

    i = 0  # total users imported
    j = 0  # users imported in the current batch
    for (dirpath, _dirnames, filenames) in os.walk(dirs['old']):
        for filename in filenames:
            if not filename.endswith('.json'):
                continue

            filepath = os.path.join(dirpath, filename)
            with open(filepath, 'r') as person_file:
                try:
                    o = json.load(person_file)
                except json.decoder.JSONDecodeError as e:
                    logg.error('load error for {}: {}'.format(filename, e))
                    continue
            logg.debug('deserializing {} {}'.format(filepath, o))
            u = Person.deserialize(o)

            new_address = register_eth(i, u)
            if new_address is None:
                # register_eth() returns None when the callback timed out;
                # abort with a clear message instead of failing further down.
                logg.critical('no address received for {} (callback timeout)'.format(u))
                sys.exit(1)

            if u.identities.get('evm') is None:
                u.identities['evm'] = {}
            u.identities['evm'][sub_chain_str] = [new_address]

            # person data, stored under the new address
            new_address_clean = strip_0x(new_address)
            new_address_bytes = bytes.fromhex(new_address_clean)
            # NOTE(review): meta_key is computed but not used; the link below
            # is named after the address instead. Confirm this is intended.
            meta_key = generate_metadata_pointer(new_address_bytes, MetadataPointer.PERSON)
            meta_filepath = os.path.join(dirs['meta'], '{}.json'.format(new_address_clean.upper()))
            _write_and_link(
                _sharded_path(new_address_clean, '.json', dirs['new']),
                json.dumps(u.serialize()),
                meta_filepath,
            )

            # phone pointer: a file named after the phone metadata key that
            # holds the checksummed new address
            phone_object = phonenumbers.parse(u.tel)
            phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
            meta_phone_key = generate_metadata_pointer(phone.encode('utf-8'), MetadataPointer.PHONE)
            meta_phone_filepath = os.path.join(dirs['phone'], 'meta', meta_phone_key)
            _write_and_link(
                _sharded_path(meta_phone_key, '', dirs['phone'], 'new'),
                to_checksum_address(new_address_clean),
                meta_phone_filepath,
            )

            # custom data: the tags recorded for the user's old address
            custom_key = generate_metadata_pointer(new_address_bytes, MetadataPointer.CUSTOM)
            custom_filepath = os.path.join(dirs['custom'], 'meta', custom_key)
            old_address = u.identities['evm'][sub_old_chain_str][0]
            tag_data = {'tags': user_tags[strip_0x(old_address)]}
            _write_and_link(
                _sharded_path(custom_key, '.json', dirs['custom'], 'new'),
                json.dumps(tag_data),
                custom_filepath,
            )

            i += 1
            sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")

            # Pause after every full batch.
            j += 1
            if j == batch_size:
                time.sleep(batch_delay)
                j = 0