From 26c622f42fd3dd5759ee731728a4eeb5834c4a45 Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 21 Feb 2021 20:38:09 +0100 Subject: [PATCH] WIP implement chainsyncer and cic_base for manager --- .../cic_eth/runnable/daemons/manager.py | 219 +++++++++++------- apps/cic-eth/requirements.txt | 1 + 2 files changed, 132 insertions(+), 88 deletions(-) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/manager.py b/apps/cic-eth/cic_eth/runnable/daemons/manager.py index 068f0a45..6aa11932 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/manager.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/manager.py @@ -14,16 +14,26 @@ import rlp import web3 from web3 import HTTPProvider, WebsocketProvider from cic_registry import CICRegistry -from cic_registry.chain import ChainSpec +from chainlib.chain import ChainSpec from cic_registry import zero_address from cic_registry.chain import ChainRegistry from cic_registry.error import UnknownContractError -from cic_bancor.bancor import BancorRegistryClient +#from cic_bancor.bancor import BancorRegistryClient +from chainlib.eth.connection import HTTPConnection +from chainlib.eth.block import ( + block_latest, + ) +from hexathon import ( + strip_0x, + ) +from chainsyncer.backend import SyncerBackend +from chainsyncer.db.models.base import SessionBase +import cic_base # local imports import cic_eth from cic_eth.eth import RpcClient -from cic_eth.db import SessionBase +#from cic_eth.db import SessionBase from cic_eth.db import Otx from cic_eth.db import TxConvertTransfer from cic_eth.db.models.tx import TxCache @@ -35,7 +45,6 @@ from cic_eth.sync.error import LoopDone from cic_eth.db.error import UnknownConvertError from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.eth.task import create_check_gas_and_send_task -from cic_eth.sync.backend import SyncerBackend from cic_eth.eth.token import unpack_transfer from cic_eth.eth.token import unpack_transferfrom from cic_eth.eth.account import unpack_gift @@ -46,75 +55,89 @@ from cic_eth.runnable.daemons.filters import ( RegistrationFilter, ) -logging.basicConfig(level=logging.WARNING) -logg = logging.getLogger() -logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL) -logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) -config_dir = os.path.join('/usr/local/etc/cic-eth') +script_dir = os.path.realpath(os.path.dirname(__file__)) -argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') -argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') -argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') -argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') -argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') -argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') -argparser.add_argument('-v', help='be verbose', action='store_true') -argparser.add_argument('-vv', help='be more verbose', action='store_true') -argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head') -args = argparser.parse_args(sys.argv[1:]) +logg = cic_base.log.create() +argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template) +#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic') +args = cic_base.argparse.parse(argparser, logg) +config = cic_base.config.create(args.c, args, args.env_prefix) -if args.v == True: - logging.getLogger().setLevel(logging.INFO) -elif args.vv == True: - logging.getLogger().setLevel(logging.DEBUG) +#logging.basicConfig(level=logging.WARNING) +#logg = logging.getLogger() +#logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL) +#logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL) +#logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL) +#logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) -config_dir = os.path.join(args.c) -os.makedirs(config_dir, 0o777, True) -config = confini.Config(config_dir, args.env_prefix) -config.process() -# override args -args_override = { - 'ETH_ABI_DIR': getattr(args, 'abi_dir'), - 'CIC_CHAIN_SPEC': getattr(args, 'i'), - } -config.dict_override(args_override, 'cli flag') -config.censor('PASSWORD', 'DATABASE') -config.censor('PASSWORD', 'SSL') -logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) +#config_dir = os.path.join('/usr/local/etc/cic-eth') +# +#argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') +#argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') +#argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') +#argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') +#argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') +#argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') +#argparser.add_argument('-v', help='be verbose', action='store_true') +#argparser.add_argument('-vv', help='be more verbose', action='store_true') +#argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head') +#args = argparser.parse_args(sys.argv[1:]) +# +#if args.v == True: +# logging.getLogger().setLevel(logging.INFO) +#elif args.vv == True: +# logging.getLogger().setLevel(logging.DEBUG) +# +#config_dir = os.path.join(args.c) +#os.makedirs(config_dir, 0o777, True) +#config = confini.Config(config_dir, args.env_prefix) +#config.process() +## override args +#args_override = { +# 'ETH_ABI_DIR': getattr(args, 'abi_dir'), +# 'CIC_CHAIN_SPEC': getattr(args, 'i'), +# } +#config.dict_override(args_override, 'cli flag') +#config.censor('PASSWORD', 'DATABASE') +#config.censor('PASSWORD', 'SSL') +#logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) -app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) +config.add(args.y, '_KEYSTORE_FILE', True) -queue = args.q +config.add(args.q, '_CELERY_QUEUE', True) -chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) +cic_base.config.log(config) + +#app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) + +#queue = args.q +# +#chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) +#re_websocket = re.compile('^wss?://') +#re_http = re.compile('^https?://') +#blockchain_provider = config.get('ETH_PROVIDER') +#if re.match(re_websocket, blockchain_provider) != None: +# blockchain_provider = WebsocketProvider(blockchain_provider) +#elif re.match(re_http, blockchain_provider) != None: +# blockchain_provider = HTTPProvider(blockchain_provider) +#else: +# raise ValueError('unknown provider url {}'.format(blockchain_provider)) +# +#def web3_constructor(): +# w3 = web3.Web3(blockchain_provider) +# return (blockchain_provider, w3) +#RpcClient.set_constructor(web3_constructor) +# +#c = RpcClient(chain_spec) +#CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) +#CICRegistry.add_path(config.get('ETH_ABI_DIR')) +#chain_registry = ChainRegistry(chain_spec) +#CICRegistry.add_chain_registry(chain_registry, True) -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) -RpcClient.set_constructor(web3_constructor) - -c = RpcClient(chain_spec) -CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) -CICRegistry.add_path(config.get('ETH_ABI_DIR')) -chain_registry = ChainRegistry(chain_spec) -CICRegistry.add_chain_registry(chain_registry, True) - -declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator') dsn = dsn_from_config(config) @@ -122,36 +145,56 @@ SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG')) def main(): - global chain_spec, c, queue + #global chain_spec, c, queue - if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None: - CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True) + # parse chain spec object + chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) + + # connect to celery + celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) - syncers = [] - block_offset = c.w3.eth.blockNumber - chain = str(chain_spec) + # set up registry + w3 = cic_base.rpc.create(config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored + registry = cic_base.registry.init_legacy(config, w3) - if SyncerBackend.first(chain): - from cic_eth.sync.history import HistorySyncer - backend = SyncerBackend.initial(chain, block_offset) - syncer = HistorySyncer(backend) - syncers.append(syncer) + #if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None: + # CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True) - if args.mode == 'head': - from cic_eth.sync.head import HeadSyncer - block_sync = SyncerBackend.live(chain, block_offset+1) - syncers.append(HeadSyncer(block_sync)) - elif args.mode == 'history': - from cic_eth.sync.history import HistorySyncer - backends = SyncerBackend.resume(chain, block_offset+1) - for backend in backends: - syncers.append(HistorySyncer(backend)) - if len(syncers) == 0: - logg.info('found no unsynced history. terminating') - sys.exit(0) - else: - sys.stderr.write("unknown mode '{}'\n".format(args.mode)) - sys.exit(1) + # Connect to blockchain with chainlib + conn = HTTPConnection(config.get('ETH_PROVIDER')) + + o = block_latest() + r = conn.do(o) + block_offset = int(strip_0x(r), 16) + 1 + + logg.debug('starting at block {}'.format(block_offset)) + + #syncers = [] + #block_offset = c.w3.eth.blockNumber + #chain = str(chain_spec) + + if SyncerBackend.first(config.get('CIC_CHAIN_SPEC')): + #from cic_eth.sync.history import HistorySyncer + backend = SyncerBackend.initial(config.get('CIC_CHAIN_SPEC'), block_offset) + #syncer = HistorySyncer(backend) + #syncers.append(syncer) + + return + + #from cic_eth.sync.head import HeadSyncer + block_sync = SyncerBackend.live(config.get('CIC_CHAIN_SPEC'), block_offset+1) + syncers.append(HeadSyncer(block_sync)) +# elif args.mode == 'history': +# from cic_eth.sync.history import HistorySyncer +# backends = SyncerBackend.resume(chain, block_offset+1) +# for backend in backends: +# syncers.append(HistorySyncer(backend)) +# if len(syncers) == 0: +# logg.info('found no unsynced history. terminating') +# sys.exit(0) +# else: +# sys.stderr.write("unknown mode '{}'\n".format(args.mode)) +# sys.exit(1) # bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry') # bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec) diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index 4d9dc2e6..59423863 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -20,3 +20,4 @@ moolb~=0.1.1b2 eth-address-index~=0.1.0a8 chainlib~=0.0.1a16 hexathon~=0.0.1a3 +chainsyncer~=0.0.1a8