Move filter, cache store and block handler setup from runnable/sync.py into
EthMonitorSettings, and move the cache cli flags into eth_monitor/cli.

Review fixes folded into eth_monitor/settings.py:

* process_cache_rpc() passed an undefined name (cache_store) to CacheRPC.
  It now passes self.o['CACHE_STORE'], which process_cache_store() sets.
* process_cache() now calls process_cache_rpc(). Without that call the RPC
  handed to drv.run() was never wrapped in CacheRPC and _FRESH was never
  read, whereas sync.py wrapped it unless --fresh was given.

NOTE(review): line breaks and indentation were reconstructed from a
whitespace-collapsed copy of this patch; blank context lines are placed
according to the hunk line counts. The index hashes are those of the original
commit and do not reflect the two fixes above.

diff --git a/eth_monitor/callback.py b/eth_monitor/callback.py
index 244fc23..23d0e24 100644
--- a/eth_monitor/callback.py
+++ b/eth_monitor/callback.py
@@ -35,5 +35,5 @@ def post_callback():
     logg.debug('ending sync loop iteration')
 
 
-
-
+def block_callback(block, tx):
+    logg.info('processing {} {}'.format(block, datetime.datetime.fromtimestamp(block.timestamp)))
diff --git a/eth_monitor/cli/arg.py b/eth_monitor/cli/arg.py
index 8341ad0..1ba0856 100644
--- a/eth_monitor/cli/arg.py
+++ b/eth_monitor/cli/arg.py
@@ -26,3 +26,7 @@ def process_flags(argparser, flags):
     argparser.add_argument('--filter', type=str, action='append', help='Add python module to tx filter path')
     argparser.add_argument('--block-filter', type=str, dest='block_filter', action='append', help='Add python module to block filter path')
 
+    # cache flags
+    argparser.add_argument('--store-tx-data', action='store_true', dest='store_tx_data', help='Store tx data in cache store')
+    argparser.add_argument('--store-block-data', action='store_true', dest='store_block_data', help='Store block data in cache store')
+    argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
diff --git a/eth_monitor/cli/config.py b/eth_monitor/cli/config.py
index ec92a93..131edf2 100644
--- a/eth_monitor/cli/config.py
+++ b/eth_monitor/cli/config.py
@@ -38,6 +38,9 @@ def process_config(config, args, flags):
 
     arg_override['ETHMONITOR_STATE_DIR'] = getattr(args, 'state_dir')
 
+    arg_override['ETHCACHE_STORE_BLOCK'] = getattr(args, 'store_block_data')
+    arg_override['ETHCACHE_STORE_TX'] = getattr(args, 'store_tx_data')
+
     config.dict_override(arg_override, 'local cli args')
 
     for rules_arg in rules_args:
@@ -49,5 +52,6 @@ def process_config(config, args, flags):
 
     config.add(getattr(args, 'session_id'), '_SESSION_ID', False)
     config.add(getattr(args, 'cache_dir'), '_CACHE_DIR', False)
+    config.add(getattr(args, 'fresh'), '_FRESH', False)
 
     return config
diff --git a/eth_monitor/data/config/cache.ini b/eth_monitor/data/config/cache.ini
new file mode 100644
index 0000000..98859fe
--- /dev/null
+++ b/eth_monitor/data/config/cache.ini
@@ -0,0 +1,3 @@
+[ethcache]
+store_tx = 0
+store_block = 0
diff --git a/eth_monitor/filters/base.py b/eth_monitor/filters/base.py
index 08cf208..3d3bfe8 100644
--- a/eth_monitor/filters/base.py
+++ b/eth_monitor/filters/base.py
@@ -12,25 +12,10 @@ logg = logging.getLogger(__name__)
 
 class RuledFilter(SyncFilter):
 
-    def __init__(self, rules_filter=None):
-        if self.store.chain_dir == None:
-            raise RuntimeError('store must be initialized. call RuledFilter.init() first')
+    def __init__(self, rules_filter=None, store=None):
         self.rules_filter = rules_filter
 
 
-    @staticmethod
-    def init(store, include_block_data=False, include_tx_data=False):
-        RuledFilter.store = store
-        RuledFilter.include_block_data = include_block_data
-        RuledFilter.include_tx_data = include_tx_data
-
-
-    @classmethod
-    def block_callback(cls, block, extra=None):
-        logg.info('processing {}'.format(block))
-        cls.store.put_block(block, include_data=cls.include_block_data)
-
-
     def filter(self, conn, block, tx, db_session=None):
         if self.rules_filter != None:
             if not self.rules_filter.apply_rules(tx):
diff --git a/eth_monitor/filters/block.py b/eth_monitor/filters/block.py
new file mode 100644
index 0000000..c5fae7a
--- /dev/null
+++ b/eth_monitor/filters/block.py
@@ -0,0 +1,9 @@
+class Filter:
+
+    def __init__(self, store, include_block_data=False):
+        self.store = store
+        self.include_block_data = include_block_data
+
+
+    def filter(self, block, tx=None):
+        self.store.put_block(block, include_data=self.include_block_data)
diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py
index 1c24896..6583da6 100644
--- a/eth_monitor/filters/cache.py
+++ b/eth_monitor/filters/cache.py
@@ -9,6 +9,12 @@ logg = logging.getLogger(__name__)
 
 class Filter(RuledFilter):
 
+    def __init__(self, store, rules_filter=None, include_tx_data=False):
+        super(Filter, self).__init__(rules_filter=rules_filter)
+        self.store = store
+        self.include_tx_data = include_tx_data
+
+
     def ruled_filter(self, conn, block, tx, db_session=None):
         self.store.put_tx(tx, include_data=self.include_tx_data)
 
diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py
index 36e4465..afc98ac 100644
--- a/eth_monitor/runnable/sync.py
+++ b/eth_monitor/runnable/sync.py
@@ -23,22 +23,8 @@ from hexathon import (
 
 from chainsyncer.driver.chain_interface import ChainInterfaceDriver
 from chainsyncer.error import SyncDone
-from eth_cache.rpc import CacheRPC
-from eth_cache.store.file import FileStore
-
 # local imports
-from eth_monitor.filters.cache import Filter as CacheFilter
-from eth_monitor.rules import (
-        AddressRules,
-        RuleSimple,
-        RuleMethod,
-        RuleData,
-        )
-from eth_monitor.filters import RuledFilter
-from eth_monitor.filters.out import OutFilter
-from eth_monitor.config import override, list_from_prefix
 from eth_monitor.callback import (
-        BlockCallbackFilter,
         pre_callback,
         post_callback,
         )
@@ -49,12 +35,6 @@ logging.STATETRACE = 5
 logging.basicConfig(level=logging.WARNING)
 logg = logging.getLogger()
 
-#default_eth_provider = os.environ.get('RPC_PROVIDER')
-#if default_eth_provider == None:
-#    default_eth_provider = os.environ.get('ETH_PROVIDER', 'http://localhost:8545')
-
-#exec_dir = os.path.realpath(os.getcwd())
-#default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(exec_dir, 'config'))
 script_dir = os.path.realpath(os.path.dirname(__file__))
 config_dir = os.path.join(script_dir, '..', 'data', 'config')
 
@@ -62,9 +42,6 @@ arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC | chain
 argparser = chainlib.cli.ArgumentParser(arg_flags)
 eth_monitor.cli.process_flags(argparser, 0)
 
-argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
-argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
-argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
 argparser.add_argument('--list-backends', dest='list_backends', action='store_true', help='List built-in store backends')
 argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose')
 
@@ -82,17 +59,13 @@ if args.list_backends:
         print(v)
     sys.exit(0)
 
+logging.getLogger('chainlib.connection').setLevel(logging.WARNING)
+logging.getLogger('chainlib.eth.tx').setLevel(logging.WARNING)
+logging.getLogger('chainlib.eth.src').setLevel(logging.WARNING)
+
 if args.vvv:
     logg.setLevel(logging.STATETRACE)
 else:
-    logging.getLogger('chainlib.connection').setLevel(logging.WARNING)
-    logging.getLogger('chainlib.eth.tx').setLevel(logging.WARNING)
-    logging.getLogger('chainsyncer.driver.history').setLevel(logging.WARNING)
-    logging.getLogger('chainsyncer.driver.head').setLevel(logging.WARNING)
-    logging.getLogger('chainsyncer.backend.file').setLevel(logging.WARNING)
-    logging.getLogger('chainsyncer.backend.sql').setLevel(logging.WARNING)
-    logging.getLogger('chainsyncer.filter').setLevel(logging.WARNING)
-
     if args.vv:
         logg.setLevel(logging.DEBUG)
     elif args.v:
@@ -111,96 +84,10 @@ settings = EthMonitorSettings()
 settings.process(config)
 logg.debug('loaded settings:\n{}'.format(settings))
 
-#rpc_id_generator = None
-#if args.seq:
-#    rpc_id_generator = IntSequenceGenerator()
-
-#auth = None
-#if os.environ.get('RPC_AUTHENTICATION') == 'basic':
-#    from chainlib.auth import BasicAuth
-#    auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD'])
-#rpc = EthHTTPConnection(args.p)
-
-
-
-
-def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data):
-    store = None
-    if cache_dir == None:
-        logg.warning('no cache dir specified, will discard everything!!')
-        from eth_cache.store.null import NullStore
-        store = NullStore()
-    else:
-        store = FileStore(chain_spec, cache_dir)
-        cache_dir = os.path.realpath(cache_dir)
-        if cache_dir == None:
-            import tempfile
-            cache_dir = tempfile.mkdtemp()
-    logg.info('using chain spec {} and store {}'.format(chain_spec, store))
-    RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data)
-
-    return store
-
-
-def setup_cache_filter(rules_filter=None):
-    return CacheFilter(rules_filter=rules_filter)
-
-
-def block_callback(block, tx):
-    logg.info('processing {} {}'.format(block, datetime.datetime.fromtimestamp(block.timestamp)))
-
 
 def main():
-    rpc = settings.get('RPC')
-    o = block_latest()
-    r = rpc.do(o)
-    block_offset = int(strip_0x(r), 16) + 1
-    logg.info('network block height is {}'.format(block_offset))
-
-    store = setup_filter(
-            settings.get('CHAIN_SPEC'),
-            config.get('_CACHE_DIR'),
-            bool(args.store_tx_data),
-            bool(args.store_block_data),
-            )
-
-    cache_filter = setup_cache_filter(
-            rules_filter=settings.get('RULES'), #address_rules,
-            )
-
-    filters = [
-            cache_filter,
-            ]
-
-    for fltr in list_from_prefix(config, 'filter'):
-        m = importlib.import_module(fltr)
-        fltr_object = m.Filter(rules_filter=settings.get('RULES'))
-        filters.append(fltr_object)
-        logg.info('using filter module {}'.format(fltr))
-
-    renderers_mods = []
-    for renderer in list_from_prefix(config, 'renderer'):
-        m = importlib.import_module(renderer)
-        renderers_mods.append(m)
-        logg.info('using renderer module {}'.format(renderer))
-
-    block_filter_handler = BlockCallbackFilter()
-    for block_filter in list_from_prefix(config, 'block_filter'):
-        m = importlib.import_module(block_filter)
-        block_filter_handler.register(m)
-        logg.info('using block filter module {}'.format(block_filter))
-
-    out_filter = OutFilter(
-            settings.get('CHAIN_SPEC'),
-            rules_filter=settings.get('RULES'),
-            renderers=renderers_mods,
-            )
-    filters.append(out_filter)
-
     logg.info('session is {}'.format(settings.get('SESSION_ID')))
 
-    for fltr in filters:
-        settings.get('SYNC_STORE').register(fltr)
     drv = ChainInterfaceDriver(
             settings.get('SYNC_STORE'),
             settings.get('SYNCER_INTERFACE'),
@@ -208,21 +95,14 @@ def main():
             target=settings.get('SYNCER_LIMIT'),
             pre_callback=pre_callback,
             post_callback=post_callback,
-            block_callback=block_filter_handler.filter,
+            block_callback=settings.get('BLOCK_HANDLER').filter,
             )
 
-    use_rpc = rpc
-    if not args.fresh:
-        use_rpc = CacheRPC(rpc, store)
-
-    i = 0
     try:
-        r = drv.run(use_rpc)
+        r = drv.run(settings.get('RPC'))
     except SyncDone as e:
         sys.stderr.write("sync {} done at block {}\n".format(drv, e))
 
-    i += 1
-
 
 if __name__ == '__main__':
     main()
diff --git a/eth_monitor/settings.py b/eth_monitor/settings.py
index 47b091d..4a43bfa 100644
--- a/eth_monitor/settings.py
+++ b/eth_monitor/settings.py
@@ -3,6 +3,7 @@ import logging
 import os
 import uuid
 import importlib
+import tempfile
 
 # external imports
 from chainlib.settings import ChainSettings
@@ -10,6 +11,9 @@ from chainsyncer.settings import ChainsyncerSettings
 from chainlib.eth.connection import EthHTTPConnection
 from eth_monitor.chain import EthChainInterface
 from chainlib.eth.address import is_address
+from eth_cache.rpc import CacheRPC
+from eth_cache.store.file import FileStore
+
 
 # local imports
 from eth_monitor.rules import (
@@ -22,7 +26,13 @@ from eth_monitor.cli.rules import to_config_names
 from eth_monitor.callback import (
         state_change_callback,
         filter_change_callback,
+        BlockCallbackFilter,
         )
+from eth_monitor.filters import RuledFilter
+from eth_monitor.filters.cache import Filter as CacheFilter
+from eth_monitor.config import override, list_from_prefix
+from eth_monitor.filters.out import OutFilter
+from eth_monitor.filters.block import Filter as BlockFilter
 
 logg = logging.getLogger(__name__)
 
@@ -61,10 +71,11 @@ class EthMonitorSettings(ChainsyncerSettings):
         os.makedirs(state_dir, exist_ok=True)
         logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
 
-        sync_store = syncer_store_class(state_dir, session_id=self.o['SESSION_ID'], state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
+        session_dir = os.path.join(state_dir, self.o['SESSION_ID'])
+        sync_store = syncer_store_class(session_dir, session_id=self.o['SESSION_ID'], state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
 
         self.o['STATE_DIR'] = state_dir
-        self.o['SESSION_DIR'] = os.path.join(state_dir, self.o['SESSION_ID'])
+        self.o['SESSION_DIR'] = session_dir
         self.o['SYNC_STORE'] = sync_store
 
 
@@ -115,7 +126,6 @@ class EthMonitorSettings(ChainsyncerSettings):
         self.o['RULES'].exclude(excludes)
 
 
-
     def process_data_arg_rules(self, config): #rules, args):
         include_data = []
         for v in config.get('ETHMONITOR_DATA'):
@@ -215,6 +225,86 @@ class EthMonitorSettings(ChainsyncerSettings):
         self.process_address_file_rules(config)
 
 
+    def process_cache_store(self, config):
+        cache_dir = config.get('_CACHE_DIR')
+        store = None
+        if cache_dir == None:
+            logg.warning('no cache dir specified, will discard everything!!')
+            from eth_cache.store.null import NullStore
+            store = NullStore()
+        else:
+            store = FileStore(self.o['CHAIN_SPEC'], cache_dir)
+            cache_dir = os.path.realpath(cache_dir)
+            if cache_dir == None:
+                import tempfile
+                cache_dir = tempfile.mkdtemp()
+        logg.info('using cache store {}'.format(store))
+
+        self.o['CACHE_STORE'] = store
+
+
+    def process_cache_filter(self, config):
+        fltr = CacheFilter(self.o['CACHE_STORE'], rules_filter=self.o['RULES'], include_tx_data=config.true('ETHCACHE_STORE_TX'))
+        self.o['SYNC_STORE'].register(fltr)
+
+        fltr = BlockFilter(self.o['CACHE_STORE'], include_block_data=config.true('ETHCACHE_STORE_BLOCK'))
+        self.o['BLOCK_HANDLER'].register(fltr)
+
+
+    def process_tx_filter(self, config):
+        for fltr in list_from_prefix(config, 'filter'):
+            m = importlib.import_module(fltr)
+            fltr_object = m.Filter(rules_filter=self.o['RULES'])
+            self.o['SYNC_STORE'].register(fltr_object)
+            logg.info('using filter module {}'.format(fltr))
+
+
+    def process_block_filter(self, config):
+        block_filter_handler = BlockCallbackFilter()
+        for block_filter in list_from_prefix(config, 'block_filter'):
+            m = importlib.import_module(block_filter)
+            block_filter_handler.register(m)
+            logg.info('using block filter module {}'.format(block_filter))
+
+        self.o['BLOCK_HANDLER'] = block_filter_handler
+
+
+    def process_out_filter(self, config):
+        out_filter = OutFilter(
+                self.o['CHAIN_SPEC'],
+                rules_filter=self.o['RULES'],
+                renderers=self.o['RENDERER'],
+                )
+        self.o['SYNC_STORE'].register(out_filter)
+
+
+    def process_filter(self, config):
+        self.o['FILTER'] = []
+
+        self.process_renderer(config)
+
+        self.process_block_filter(config)
+
+        self.process_cache_filter(config)
+        self.process_tx_filter(config)
+        self.process_out_filter(config)
+
+
+    def process_renderer(self, config):
+        renderers_mods = []
+        for renderer in list_from_prefix(config, 'renderer'):
+            m = importlib.import_module(renderer)
+            renderers_mods.append(m)
+            logg.info('using renderer module {}'.format(renderer))
+        self.o['RENDERER'] = renderers_mods
+
+
+    def process_cache_rpc(self, config):
+        """Wrap the RPC in CacheRPC over the cache store unless _FRESH is set."""
+        if not config.true('_FRESH'):
+            self.o['RPC'] = CacheRPC(self.o['RPC'], self.o['CACHE_STORE'])
+
+
     def process_common(self, config):
         super(EthMonitorSettings, self).process_common(config)
         # TODO: duplicate from chaind, consider move to chainlib-eth
@@ -234,9 +324,17 @@ class EthMonitorSettings(ChainsyncerSettings):
         self.process_sync_range(config)
 
 
+    def process_cache(self, config):
+        self.process_cache_store(config)
+        # must run after process_cache_store(): wraps RPC with the store set there
+        self.process_cache_rpc(config)
+
+
     def process(self, config):
         self.process_common(config)
         self.process_monitor_session(config)
         self.process_monitor_session_dir(config)
         self.process_arg_rules(config)
         self.process_sync(config)
+        self.process_cache(config)
+        self.process_filter(config)
diff --git a/requirements.txt b/requirements.txt
index 2d909ab..cec62f0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,4 +2,4 @@ chainlib-eth~=0.2.0
 chainlib~=0.2.0
 chainsyncer~=0.4.9
 leveldir~=0.3.0
-eth-cache~=0.1.3
+eth-cache~=0.1.4