Make import balance script run independently of import users commence

This commit is contained in:
nolash 2021-02-18 00:20:26 +01:00
parent b19e8f2133
commit 00643f4cea
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
11 changed files with 149 additions and 100 deletions

View File

@ -405,7 +405,7 @@ def refill_gas(self, recipient_address, chain_str):
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!='0x00')
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
session.close()

View File

@ -91,6 +91,8 @@ run = True
class DispatchSyncer:
yield_delay = 0.005
def __init__(self, chain_spec):
self.chain_spec = chain_spec
self.chain_id = chain_spec.chain_id()
@ -138,7 +140,10 @@ class DispatchSyncer:
txs[k] = utxs[k]
self.process(w3, txs)
time.sleep(interval)
if len(utxs) > 0:
time.sleep(self.yield_delay)
else:
time.sleep(interval)
def main():

View File

@ -5,9 +5,10 @@ import logging
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db.enum import StatusBits
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db import Otx
from cic_eth.db.models.otx import Otx
from cic_eth.queue.tx import get_paused_txs
from cic_eth.eth.task import create_check_gas_and_send_task
from .base import SyncFilter
@ -17,8 +18,9 @@ logg = logging.getLogger()
class GasFilter(SyncFilter):
def __init__(self, gas_provider):
def __init__(self, queue, gas_provider):
self.gas_provider = gas_provider
self.queue = queue
def filter(self, w3, tx, rcpt, chain_str):
@ -39,7 +41,7 @@ class GasFilter(SyncFilter):
return
chain_spec = ChainSpec.from_chain_str(chain_str)
txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id())
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id())
if len(txs) > 0:
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
@ -49,6 +51,6 @@ class GasFilter(SyncFilter):
r[0],
0,
tx_hashes_hex=list(txs.keys()),
queue=queue,
queue=self.queue,
)
s.apply_async()

View File

@ -15,6 +15,10 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f
class RegistrationFilter(SyncFilter):
def __init__(self, queue):
self.queue = queue
def filter(self, w3, tx, rcpt, chain_spec):
logg.debug('applying registration filter')
registered_address = None
@ -30,6 +34,6 @@ class RegistrationFilter(SyncFilter):
address,
str(chain_spec),
],
queue=queue,
queue=self.queue,
)
s.apply_async()

View File

@ -25,7 +25,7 @@ class TxFilter(SyncFilter):
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None
logg.info('otx found {}'.format(otx.tx_hash))
s = celery.siignature(
s = celery.signature(
'cic_eth.queue.tx.set_final_status',
[
tx_hash_hex,

View File

@ -178,9 +178,9 @@ def main():
tx_filter = TxFilter(queue)
registration_filter = RegistrationFilter()
registration_filter = RegistrationFilter(queue)
gas_filter = GasFilter(c.gas_provider())
gas_filter = GasFilter(queue, c.gas_provider())
i = 0
for syncer in syncers:

View File

@ -23,6 +23,8 @@ class MinedSyncer(Syncer):
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
"""
yield_delay = 0.005
def __init__(self, bc_cache):
super(MinedSyncer, self).__init__(bc_cache)
self.block_offset = 0
@ -99,5 +101,8 @@ class MinedSyncer(Syncer):
block_number = self.process(c.w3, block.hex())
logg.info('processed block {} {}'.format(block_number, block.hex()))
self.bc_cache.disconnect()
time.sleep(interval)
if len(e) > 0:
time.sleep(self.yield_delay)
else:
time.sleep(interval)
logg.info("Syncer no longer set to run, gracefully exiting")

View File

@ -17,7 +17,6 @@ from hexathon import (
strip_0x,
add_0x,
)
from cic_registry.chain import ChainSpec
from chainsyncer.backend import MemBackend
from chainsyncer.driver import HeadSyncer
from chainlib.eth.connection import HTTPConnection
@ -34,6 +33,7 @@ from chainlib.eth.nonce import DefaultNonceOracle
from chainlib.eth.tx import TxFactory
from chainlib.eth.rpc import jsonrpc_template
from chainlib.eth.error import EthException
from chainlib.chain import ChainSpec
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore import DictKeystore
from cic_types.models.person import Person
@ -49,7 +49,6 @@ argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc p
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-a', '--index-address', type=str, dest='a', help='account index contract address')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
@ -97,9 +96,8 @@ if args.head:
block_offset = -1
else:
block_offset = args.offset
account_index_address = args.a
chain_spec = ChainSpec.from_chain_str(chain_str)
chain_spec = ChainSpec.from_chain_str('evm:' + chain_str)
user_dir = args.user_dir # user_out_dir from import_users.py
@ -108,45 +106,51 @@ class Handler:
account_index_add_signature = keccak256_string_to_hex('add(address)')[:8]
def __init__(self, conn, user_dir, addresses, balances, token_address):
self.conn = conn
def __init__(self, conn, chain_spec, user_dir, balances, token_address, signer, gas_oracle, nonce_oracle):
self.token_address = token_address
self.user_dir = user_dir
self.addresses = addresses
self.balances = balances
self.chain_spec = chain_spec
self.tx_factory = ERC20TxFactory(signer, gas_oracle, nonce_oracle, chain_spec.network_id())
def name(self):
return 'balance_handler'
def handle(self, getter, block, tx):
try:
if tx.payload[:8] == self.account_index_add_signature:
recipient = eth_abi.decode_single('address', bytes.fromhex(tx.payload[-64:]))
original_address = to_checksum(self.addresses[to_checksum(recipient)])
user_file = '{}/{}/{}.json'.format(
recipient[2:4].upper(),
recipient[4:6].upper(),
recipient[2:].upper(),
)
filepath = os.path.join(self.user_dir, user_file)
f = open(filepath, 'r')
o = json.load(f)
f.close()
u = Person(o)
balance = self.balances[original_address]
logg.info('registered {} originally {} ({}) tx hash {} balance {}'.format(recipient, original_address, u, tx.hash, balance))
def filter(self, conn, block, tx):
if tx.payload == None or len(tx.payload) == 0:
logg.debug('no payload, skipping {}'.format(tx))
return
(tx_hash_hex, o) = getter.tx_factory.erc20_transfer(self.token_address, signer_address, recipient, balance)
logg.info('submitting erc20 transfer tx {} for recipient {}'.format(tx_hash_hex, recipient))
r = self.conn.do(o)
except TypeError:
pass
except IndexError:
pass
except EthException as e:
logg.error('send error {}'.format(e).ljust(200))
if tx.payload[:8] == self.account_index_add_signature:
recipient = eth_abi.decode_single('address', bytes.fromhex(tx.payload[-64:]))
#original_address = to_checksum(self.addresses[to_checksum(recipient)])
user_file = 'new/{}/{}/{}.json'.format(
recipient[2:4].upper(),
recipient[4:6].upper(),
recipient[2:].upper(),
)
filepath = os.path.join(self.user_dir, user_file)
f = open(filepath, 'r')
o = json.load(f)
f.close()
u = Person(o)
original_address = u.identities['evm']['xdai:1'][0]
balance = self.balances[original_address]
logg.info('registered {} originally {} ({}) tx hash {} balance {}'.format(recipient, original_address, u, tx.hash, balance))
(tx_hash_hex, o) = self.tx_factory.erc20_transfer(self.token_address, signer_address, recipient, balance)
logg.info('submitting erc20 transfer tx {} for recipient {}'.format(tx_hash_hex, recipient))
r = conn.do(o)
# except TypeError as e:
# logg.warning('typerror {}'.format(e))
# pass
# except IndexError as e:
# logg.warning('indexerror {}'.format(e))
# pass
# except EthException as e:
# logg.error('send error {}'.format(e).ljust(200))
#except KeyError as e:
# logg.error('key error {}'.format(e).ljust(200))
@ -173,7 +177,7 @@ class BlockGetter:
def progress_callback(s, block_number, tx_index):
sys.stdout.write(s.ljust(200) + "\r")
sys.stdout.write(s.ljust(200) + "\n")
@ -185,7 +189,7 @@ def main():
nonce_oracle = DefaultNonceOracle(signer_address, conn)
# Get Token registry address
txf = TxFactory(signer=signer, gas_oracle=gas_oracle, nonce_oracle=None, chain_id=chain_spec.chain_id())
txf = TxFactory(signer=signer, gas_oracle=gas_oracle, nonce_oracle=None, chain_id=chain_spec.network_id())
tx = txf.template(signer_address, config.get('CIC_REGISTRY_ADDRESS'))
registry_addressof_method = keccak256_string_to_hex('addressOf(bytes32)')[:8]
@ -220,29 +224,28 @@ def main():
logg.info('found token address {}'.format(sarafu_token_address))
getter = BlockGetter(conn, gas_oracle, nonce_oracle, chain_spec.chain_id())
syncer_backend = MemBackend(chain_str, 0)
if block_offset == -1:
o = block_latest()
r = conn.do(o)
block_offset = int(strip_0x(r), 16) + 1
addresses = {}
f = open('{}/addresses.csv'.format(user_dir, 'r'))
while True:
l = f.readline()
if l == None:
break
r = l.split(',')
try:
k = r[0]
v = r[1].rstrip()
addresses[k] = v
sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r")
except IndexError as e:
break
f.close()
#
# addresses = {}
# f = open('{}/addresses.csv'.format(user_dir, 'r'))
# while True:
# l = f.readline()
# if l == None:
# break
# r = l.split(',')
# try:
# k = r[0]
# v = r[1].rstrip()
# addresses[k] = v
# sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r")
# except IndexError as e:
# break
# f.close()
balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r'))
@ -266,9 +269,9 @@ def main():
syncer_backend.set(block_offset, 0)
syncer = HeadSyncer(syncer_backend, progress_callback=progress_callback)
handler = Handler(conn, user_dir, addresses, balances, sarafu_token_address)
handler = Handler(conn, chain_spec, user_dir, balances, sarafu_token_address, signer, gas_oracle, nonce_oracle)
syncer.add_filter(handler)
syncer.loop(1, getter)
syncer.loop(1, conn)
if __name__ == '__main__':

View File

@ -6,7 +6,7 @@ import logging
import argparse
import uuid
import datetime
import shutil
import time
from glob import glob
# third-party imports
@ -35,6 +35,8 @@ argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
argparser.add_argument('--batch-size', dest='batch_size', default=50, type=int, help='burst size of sending transactions to node')
argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=20.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
@ -67,18 +69,16 @@ r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub()
user_dir = args.user_dir
user_out_dir = '{}_cic_eth'.format(user_dir)
os.makedirs(user_out_dir)
shutil.copy(
os.path.join(user_dir, 'balances.csv'),
os.path.join(user_out_dir, 'balances.csv'),
)
os.makedirs(os.path.join(user_dir, 'new'))
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
def register_eth(u):
def register_eth(i, u):
redis_channel = str(uuid.uuid4())
ps.subscribe(redis_channel)
ps.get_message()
@ -94,7 +94,7 @@ def register_eth(u):
ps.get_message()
m = ps.get_message(timeout=args.timeout)
address = json.loads(m['data'])
logg.debug('register eth {} {}'.format(u, address))
logg.debug('[{}] register eth {} {}'.format(i, u, address))
return address
@ -105,10 +105,13 @@ def register_ussd(u):
if __name__ == '__main__':
fi = open(os.path.join(user_out_dir, 'addresses.csv'), 'a')
#fi = open(os.path.join(user_out_dir, 'addresses.csv'), 'a')
i = 0
for x in os.walk(user_dir):
j = 0
user_new_dir = os.path.join(user_dir, 'new')
user_old_dir = os.path.join(user_dir, 'old')
for x in os.walk(user_old_dir):
for y in x[2]:
if y[len(y)-5:] != '.json':
continue
@ -123,19 +126,18 @@ if __name__ == '__main__':
f.close()
u = Person(o)
new_address = register_eth(u)
new_address = register_eth(i, u)
u.identities['evm'][chain_str] = [new_address]
register_ussd(u)
new_address_clean = strip_0x(new_address)
filepath = os.path.join(
user_out_dir,
user_new_dir,
new_address_clean[:2].upper(),
new_address_clean[2:4].upper(),
new_address_clean.upper() + '.json',
)
logg.debug('outpath {}'.format(filepath))
os.makedirs(os.path.dirname(filepath), exist_ok=True)
o = u.serialize()
@ -143,10 +145,15 @@ if __name__ == '__main__':
f.write(json.dumps(o))
f.close()
old_address = to_checksum(add_0x(y[:len(y)-5]))
fi.write('{},{}\n'.format(new_address, old_address))
#old_address = to_checksum(add_0x(y[:len(y)-5]))
#fi.write('{},{}\n'.format(new_address, old_address))
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
fi.close()
#fi.close()

View File

@ -1,6 +1,6 @@
psycopg2==2.8.6
cic-types==0.1.0a4
chainlib~=0.0.1a13
chainlib~=0.0.1a14
chainsyncer==0.0.1a7
cic-eth==0.10.0a27
confini==0.3.6b2

View File

@ -39,6 +39,7 @@ from chainlib.eth.error import EthException
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore import DictKeystore
from cic_eth.api.api_admin import AdminApi
from cic_types.models.person import Person
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@ -160,6 +161,7 @@ def main():
o['params'].append(txf.normalize(tx))
o['params'].append('latest')
r = conn.do(o)
print('r {}'.format(r))
token_index_address = to_checksum(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r))))
logg.info('found token index address {}'.format(token_index_address))
@ -193,21 +195,21 @@ def main():
sarafu_token_address = to_checksum(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r))))
logg.info('found token address {}'.format(sarafu_token_address))
addresses = {}
f = open('{}/addresses.csv'.format(user_dir, 'r'))
while True:
l = f.readline()
if l == None:
break
r = l.split(',')
try:
k = r[0]
v = r[1].rstrip()
addresses[k] = v
sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r")
except IndexError as e:
break
f.close()
# addresses = {}
# f = open('{}/addresses.csv'.format(user_dir, 'r'))
# while True:
# l = f.readline()
# if l == None:
# break
# r = l.split(',')
# try:
# k = r[0]
# v = r[1].rstrip()
# addresses[k] = v
# sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r")
# except IndexError as e:
# break
# f.close()
balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r'))
@ -232,8 +234,29 @@ def main():
api = AdminApi(MockClient())
verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address)
for k in addresses.keys():
verifier.verify(k, balances[addresses[k]])
user_new_dir = os.path.join(user_dir, 'new')
for x in os.walk(user_new_dir):
for y in x[2]:
if y[len(y)-5:] != '.json':
continue
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person(o)
new_address = u.identities['evm'][chain_str][0]
old_address = u.identities['evm']['xdai:1'][0]
balance = balances[old_address]
logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance))
verifier.verify(new_address, balance)
if __name__ == '__main__':