commit 41d38d4eb8b4ab89f56695d5c1732caadf0c7745 Author: nolash Date: Sat Jun 26 14:04:34 2021 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3d556bb --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +build/ +dist/ +*.pyc +*.egg-info/ diff --git a/config/syncer.ini b/config/syncer.ini new file mode 100644 index 0000000..42e7cd8 --- /dev/null +++ b/config/syncer.ini @@ -0,0 +1,2 @@ +[syncer] +loop_interval = 5 diff --git a/eth_monitor/chain.py b/eth_monitor/chain.py new file mode 100644 index 0000000..d9171cd --- /dev/null +++ b/eth_monitor/chain.py @@ -0,0 +1,18 @@ +# external imports +from chainlib.interface import ChainInterface +from chainlib.eth.block import ( + block_by_number, + Block, + ) +from chainlib.eth.tx import ( + receipt, + Tx, + ) + +class EthChainInterface(ChainInterface): + + def __init__(self): + self._block_by_number = block_by_number + self._block_from_src = Block.from_src + self._tx_receipt = receipt + self._src_normalize = Tx.src_normalize diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py new file mode 100644 index 0000000..9684bfd --- /dev/null +++ b/eth_monitor/runnable/sync.py @@ -0,0 +1,125 @@ +# standard imports +import sys +import signal +import argparse +import confini +import logging +import os + +# external imports +from chainlib.chain import ChainSpec +from chainlib.eth.connection import EthHTTPConnection +from chainlib.eth.block import block_latest +from hexathon import ( + strip_0x, + add_0x, + ) +from chainsyncer.driver.head import HeadSyncer +from chainsyncer.driver.history import HistorySyncer +from chainsyncer.backend.file import FileBackend +from chainsyncer.filter import NoopFilter + +# local imports +from eth_monitor.chain import EthChainInterface + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +default_eth_provider = os.environ.get('RPC_PROVIDER') +if default_eth_provider == None: + default_eth_provider = os.environ.get('ETH_PROVIDER', 'http://localhost:8545') + +script_dir = os.path.realpath(os.path.dirname(__file__)) +exec_dir = os.path.realpath(os.getcwd()) +default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(exec_dir, 'config')) + +argparser = argparse.ArgumentParser('master eth events monitor') +argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)') +argparser.add_argument('-c', type=str, default=default_config_dir, help='config file') +argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string') +argparser.add_argument('--offset', type=int, default=0, help='Use sequential rpc ids') +argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids') +argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync') +argparser.add_argument('-v', action='store_true', help='Be verbose') +argparser.add_argument('-vv', action='store_true', help='Be more verbose') +args = argparser.parse_args(sys.argv[1:]) + +if args.vv: + logg.setLevel(logging.DEBUG) +elif args.v: + logg.setLevel(logging.INFO) + +config_dir = args.c +config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX')) +config.process() +args_override = { + 'CHAIN_SPEC': getattr(args, 'i'), + } +config.dict_override(args_override, 'cli') +config.add(args.offset, '_SYNC_OFFSET', True) +config.add(args.skip_history, '_NO_HISTORY', True) +logg.debug('config loaded:\n{}'.format(config)) + +chain_spec = ChainSpec.from_chain_str(args.i) + +state_dir = os.path.join(exec_dir, 'state') + +rpc_id_generator = None +if args.seq: + rpc_id_generator = IntSequenceGenerator() + +auth = None +if os.environ.get('RPC_AUTHENTICATION') == 'basic': + from chainlib.auth import BasicAuth + auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD']) +rpc = EthHTTPConnection(args.p) + +if __name__ == '__main__': + o = block_latest() + r = rpc.do(o) + block_offset = int(strip_0x(r), 16) + 1 + logg.debug('current block height {}'.format(block_offset)) + syncers = [] + + syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir) + + if len(syncer_backends) == 0: + initial_block_start = block_offset - 1 + if config.get('_SYNC_OFFSET') != None: + initial_block_start = config.get('_SYNC_OFFSET') + initial_block_offset = block_offset + if config.get('_NO_HISTORY'): + initial_block_start = block_offset + initial_block_offset += 1 + syncer_backends.append(FileBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start, base_dir=state_dir)) + logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset)) + else: + for syncer_backend in syncer_backends: + logg.info('resuming sync session {}'.format(syncer_backend)) + + chain_interface = EthChainInterface() + for syncer_backend in syncer_backends: + syncers.append(HistorySyncer(syncer_backend, chain_interface)) + + #syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir) + #syncers.append(HeadSyncer(syncer_backend, chain_interface)) + + filters = [ + NoopFilter(), + ] + i = 0 + for syncer in syncers: + logg.debug('running syncer index {} {}'.format(i, str(syncer))) + for f in filters: + syncer.add_filter(f) + + r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc) + sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) + + i += 1 + + +# if len(sys.argv) > 1: +# block_number = offset +# sys.stderr.write('starting on block {}\n'.format(block_number)) +# backend.set(block_number, 0) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0a5fb8f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +chainlib>=0.0.4a1,<=0.0.4 +chainsyncer>=0.0.3a1, <=0.0.3 +crypto-dev-signer>=0.4.14a6,<0.5 +eth_erc20~=0.0.10a1