diff --git a/eth_monitor/filters/base.py b/eth_monitor/filters/base.py index 0747552..a0c6a4b 100644 --- a/eth_monitor/filters/base.py +++ b/eth_monitor/filters/base.py @@ -26,6 +26,7 @@ class RuledFilter: @classmethod def block_callback(cls, block, extra=None): + logg.info('processing {}'.format(block)) cls.store.put_block(block, include_data=cls.include_block_data) @@ -34,5 +35,5 @@ class RuledFilter: if not self.rules_filter.apply_rules(tx): logg.debug('rule match failed for tx {}'.format(tx.hash)) return - logg.info('applying filter {}'.format(self)) + logg.debug('applying filter {}'.format(self)) self.ruled_filter(conn, block, tx, db_session=db_session) diff --git a/eth_monitor/filters/out.py b/eth_monitor/filters/out.py new file mode 100644 index 0000000..0c6b7ae --- /dev/null +++ b/eth_monitor/filters/out.py @@ -0,0 +1,24 @@ +# standard imports +import sys + +# local imports +from .base import RuledFilter + + +class OutFilter(RuledFilter): + + def __init__(self, writer=sys.stdout, renderers=[], rules_filter=None): + super(OutFilter, self).__init__(rules_filter=rules_filter) + self.w = writer + self.renderers = renderers + self.c = 0 + + + def filter(self, con, block, tx, db_session=None): + data = tx.payload + if len(data) > 8: + data = data[:8] + '...' + if len(data) > 0: + data = 'data {}'.format(data) + self.w.write('{} {} {} {}\n'.format(self.c, block, tx, data)) + self.c += 1 diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index 3d15cb4..7a5086a 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -24,6 +24,7 @@ from eth_monitor.chain import EthChainInterface from eth_monitor.filters.cache import Filter as CacheFilter from eth_monitor.rules import AddressRules from eth_monitor.filters import RuledFilter +from eth_monitor.filters.out import OutFilter from eth_monitor.store.file import FileStore logging.basicConfig(level=logging.WARNING) @@ -44,6 +45,8 @@ argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provide argparser.add_argument('-c', type=str, default=default_config_dir, help='config file') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string') argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block') +#argparser.add_argument('--until', type=int, default=0, help='Start sync on this block') +argparser.add_argument('--head', action='store_true', help='Start at current block height (overrides --offset, assumes --keep-alive)') argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids') argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync') argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file') @@ -73,6 +76,20 @@ config.dict_override(args_override, 'cli') config.add(args.offset, '_SYNC_OFFSET', True) config.add(args.skip_history, '_NO_HISTORY', True) config.add(args.single, '_SINGLE', True) +config.add(args.head, '_HEAD', True) +logg.debug('loaded config:\{}'.format(config)) + +block_offset = 0 +if args.head: + block_offset = -1 +else: + block_offset = args.offset + +block_limit = 0 +#if args.until > 0: +# if not args.head and args.until <= block_offset: +# raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(block_offset, args.until)) +# block_limit = args.until logg.debug('config loaded:\n{}'.format(config)) @@ -164,7 +181,7 @@ def setup_cache_filter(rules_filter=None): return CacheFilter(rules_filter=rules_filter) -def setup_backend_resume(chain_spec, block_offset, state_dir, callback, sync_offset=0, skip_history=False): +def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, sync_offset=0, skip_history=False): syncers = [] syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir) if len(syncer_backends) == 0: @@ -189,18 +206,29 @@ def setup_backend_resume(chain_spec, block_offset, state_dir, callback, sync_off return syncers -def setup_backend_single(chain_spec, block_offset, state_dir, callback, chain_interface, sync_offset=0, skip_history=False): +def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False): syncer_backend = FileBackend.initial(chain_spec, block_offset, start_block_height=sync_offset, base_dir=state_dir) syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback) return [syncer] +def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False): + syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir) + syncer = (HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback)) + return [syncer] + + if __name__ == '__main__': o = block_latest() r = rpc.do(o) block_offset = int(strip_0x(r), 16) + 1 - logg.debug('current block height {}'.format(block_offset)) + logg.info('network block height is {}'.format(block_offset)) + if block_offset == -1: + block_offset = block_latest + elif not config.true('_KEEP_ALIVE'): + if block_limit == 0: + block_limit = block_latest address_rules = setup_address_rules( includes_file=args.includes_file, @@ -231,7 +259,9 @@ if __name__ == '__main__': filters.append(fltr_object) syncer_setup_func = None - if config.true('_SINGLE'): + if config.true('_HEAD'): + syncer_setup_func = setup_backend_head + elif config.true('_SINGLE'): syncer_setup_func = setup_backend_single else: syncer_setup_func = setup_backend_resume @@ -240,6 +270,7 @@ if __name__ == '__main__': syncers = syncer_setup_func( chain_spec, block_offset, + block_limit, state_dir, cache_filter.block_callback, chain_interface, @@ -247,6 +278,9 @@ if __name__ == '__main__': skip_history=config.true('_NO_HISTORY'), ) + out_filter = OutFilter(rules_filter=address_rules) + filters.append(out_filter) + i = 0 for syncer in syncers: logg.info('running syncer index {} {}'.format(i, str(syncer)))