diff --git a/eth_monitor/cli/__init__.py b/eth_monitor/cli/__init__.py index 5c9f166..7123d09 100644 --- a/eth_monitor/cli/__init__.py +++ b/eth_monitor/cli/__init__.py @@ -1,2 +1,2 @@ -from .arg import process_flags +from .arg import process_args from .config import process_config diff --git a/eth_monitor/cli/arg.py b/eth_monitor/cli/arg.py index 1ba0856..c10f753 100644 --- a/eth_monitor/cli/arg.py +++ b/eth_monitor/cli/arg.py @@ -1,4 +1,4 @@ -def process_flags(argparser, flags): +def process_args(argparser, args, flags): # session flags argparser.add_argument('--state-dir', dest='state_dir', type=str, help='Directory to store sync state') argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id') diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index afc98ac..ddb7f30 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -22,28 +22,41 @@ from hexathon import ( #from chainsyncer.store.fs import SyncFsStore from chainsyncer.driver.chain_interface import ChainInterfaceDriver from chainsyncer.error import SyncDone +from chainsyncer.data import config_dir as chainsyncer_config_dir +from chainlib.settings import ChainSettings +from chainlib.eth.settings import process_settings +from chainlib.eth.cli.arg import ( + Arg, + ArgFlag, + process_args, + ) +from chainlib.eth.cli.config import ( + Config, + process_config, + ) # local imports from eth_monitor.callback import ( pre_callback, post_callback, ) -from eth_monitor.settings import EthMonitorSettings import eth_monitor.cli +from eth_monitor.cli.log import process_log +from eth_monitor.settings import process_settings as process_settings_local -logging.STATETRACE = 5 -logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() script_dir = os.path.realpath(os.path.dirname(__file__)) config_dir = os.path.join(script_dir, '..', 'data', 'config') -arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC | chainlib.cli.Flag.PROVIDER -argparser = chainlib.cli.ArgumentParser(arg_flags) -eth_monitor.cli.process_flags(argparser, 0) +arg_flags = ArgFlag() +arg = Arg(arg_flags) +flags = arg_flags.STD_BASE_READ | arg_flags.PROVIDER | arg_flags.CHAIN_SPEC | arg_flags.VERYVERBOSE +argparser = chainlib.eth.cli.ArgumentParser() argparser.add_argument('--list-backends', dest='list_backends', action='store_true', help='List built-in store backends') -argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose') +argparser = process_args(argparser, arg, flags) +eth_monitor.cli.process_args(argparser, arg, flags) sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD chainsyncer.cli.process_flags(argparser, sync_flags) @@ -59,29 +72,23 @@ if args.list_backends: print(v) sys.exit(0) -logging.getLogger('chainlib.connection').setLevel(logging.WARNING) -logging.getLogger('chainlib.eth.tx').setLevel(logging.WARNING) -logging.getLogger('chainlib.eth.src').setLevel(logging.WARNING) - -if args.vvv: - logg.setLevel(logging.STATETRACE) -else: - if args.vv: - logg.setLevel(logging.DEBUG) - elif args.v: - logg.setLevel(logging.INFO) +logg = process_log(args, logg) base_config_dir = [ chainsyncer.cli.config_dir, config_dir, ] -config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir) +config = Config() +config.add_schema_dir(config_dir) +config.add_schema_dir(chainsyncer_config_dir) +config = process_config(config, arg, args, flags) config = chainsyncer.cli.process_config(config, args, sync_flags) -config = eth_monitor.cli.process_config(config, args, 0) +config = eth_monitor.cli.process_config(config, args, flags) -settings = EthMonitorSettings() -settings.process(config) +settings = ChainSettings() +settings = process_settings(settings, config) +settings = process_settings_local(settings, config) logg.debug('loaded settings:\n{}'.format(settings)) @@ -99,7 +106,7 @@ def main(): ) try: - r = drv.run(settings.get('RPC')) + r = drv.run(settings.get('CONN')) except SyncDone as e: sys.stderr.write("sync {} done at block {}\n".format(drv, e)) diff --git a/eth_monitor/settings.py b/eth_monitor/settings.py index 4a43bfa..72cfa34 100644 --- a/eth_monitor/settings.py +++ b/eth_monitor/settings.py @@ -7,8 +7,8 @@ import tempfile # external imports from chainlib.settings import ChainSettings -from chainsyncer.settings import ChainsyncerSettings from chainlib.eth.connection import EthHTTPConnection +from chainsyncer.settings import * from eth_monitor.chain import EthChainInterface from chainlib.eth.address import is_address from eth_cache.rpc import CacheRPC @@ -37,301 +37,326 @@ from eth_monitor.filters.block import Filter as BlockFilter logg = logging.getLogger(__name__) -class EthMonitorSettings(ChainsyncerSettings): - - def process_monitor_session(self, config): - session_id = config.get('_SESSION_ID') - if session_id == None: - if config.get('_SINGLE'): - session_id = str(uuid.uuid4()) - else: - session_id = 'default' - - self.o['SESSION_ID'] = session_id - - - def process_monitor_session_dir(self, config): - syncer_store_module = None - syncer_store_class = None - state_dir = None - if config.get('SYNCER_BACKEND') == 'mem': - syncer_store_module = importlib.import_module('chainsyncer.store.mem') - syncer_store_class = getattr(syncer_store_module, 'SyncMemStore') +def process_monitor_session(settings, config): + session_id = config.get('_SESSION_ID') + if session_id == None: + if config.get('_SINGLE'): + session_id = str(uuid.uuid4()) else: - if config.get('SYNCER_BACKEND') == 'fs': - syncer_store_module = importlib.import_module('chainsyncer.store.fs') - syncer_store_class = getattr(syncer_store_module, 'SyncFsStore') - elif config.get('SYNCER_BACKEND') == 'rocksdb': - syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb') - syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore') - else: - syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND')) - syncer_store_class = getattr(syncer_store_module, 'SyncStore') - state_dir = os.path.join(config.get('ETHMONITOR_STATE_DIR'), config.get('SYNCER_BACKEND')) - os.makedirs(state_dir, exist_ok=True) - logg.info('using engine {} moduleĀ {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__)) - - session_dir = os.path.join(state_dir, self.o['SESSION_ID']) - sync_store = syncer_store_class(session_dir, session_id=self.o['SESSION_ID'], state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback) - - self.o['STATE_DIR'] = state_dir - self.o['SESSION_DIR'] = session_dir - self.o['SYNC_STORE'] = sync_store + session_id = 'default' + + settings.set('SESSION_ID', session_id) + return settings - def process_address_arg_rules(self, config): - category = { - 'input': { - 'i': [], - 'x': [], - }, - 'output': { - 'i': [], - 'x': [], - }, - 'exec': { - 'i': [], - 'x': [], - }, - } - for rules_arg in [ - 'input', - 'output', - 'exec', - ]: - (vy, vn) = to_config_names(rules_arg) - for address in config.get(vy): - if not is_address(address): - raise ValueError('invalid address in config {}: {}'.format(vy, address)) - category[rules_arg]['i'].append(address) - for address in config.get(vn): - if not is_address(address): - raise ValueError('invalid address in config {}: {}'.format(vn, address)) - category[rules_arg]['x'].append(address) - - includes = RuleSimple( - category['output']['i'], - category['input']['i'], - category['exec']['i'], - description='INCLUDE', - ) - self.o['RULES'].include(includes) - - excludes = RuleSimple( - category['output']['x'], - category['input']['x'], - category['exec']['x'], - description='EXCLUDE', - ) - self.o['RULES'].exclude(excludes) - - - def process_data_arg_rules(self, config): #rules, args): - include_data = [] - for v in config.get('ETHMONITOR_DATA'): - include_data.append(v.lower()) - exclude_data = [] - for v in config.get('ETHMONITOR_X_DATA'): - exclude_data.append(v.lower()) - - includes = RuleMethod(include_data, description='INCLUDE') - self.o['RULES'].include(includes) - - excludes = RuleMethod(exclude_data, description='EXCLUDE') - self.o['RULES'].exclude(excludes) - - include_data = [] - for v in config.get('ETHMONITOR_DATA_IN'): - include_data.append(v.lower()) - exclude_data = [] - for v in config.get('ETHMONITOR_X_DATA_IN'): - exclude_data.append(v.lower()) - - includes = RuleData(include_data, description='INCLUDE') - self.o['RULES'].include(includes) - - excludes = RuleData(exclude_data, description='EXCLUDE') - self.o['RULES'].exclude(excludes) - - - def process_address_file_rules(self, config): #rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False): - includes_file = config.get('ETHMONITOR_INCLUDES_FILE') - if includes_file != None: - f = open(includes_file, 'r') - logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file))) - while True: - r = f.readline() - if r == '': - break - r = r.rstrip() - v = r.split("\t") - - sender = [] - recipient = [] - executable = [] - - try: - if v[0] != '': - sender = v[0].split(',') - except IndexError: - pass - - try: - if v[1] != '': - recipient = v[1].split(',') - except IndexError: - pass - - try: - if v[2] != '': - executable = v[2].split(',') - except IndexError: - pass - - rule = RuleSimple(sender, recipient, executable) - rules.include(rule) - - excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE') - if excludes_file != None: - f = open(includes_file, 'r') - logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file))) - while True: - r = f.readline() - if r == '': - break - r = r.rstrip() - v = r.split("\t") - - sender = None - recipient = None - executable = None - - if v[0] != '': - sender = v[0].strip(',') - if v[1] != '': - recipient = v[1].strip(',') - if v[2] != '': - executable = v[2].strip(',') - - rule = RuleSimple(sender, recipient, executable) - rules.exclude(rule) - - - def process_arg_rules(self, config): - address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT')) - self.o['RULES'] = address_rules - self.process_address_arg_rules(config) - self.process_data_arg_rules(config) - self.process_address_file_rules(config) - - - def process_cache_store(self, config): - cache_dir = config.get('_CACHE_DIR') - store = None - if cache_dir == None: - logg.warning('no cache dir specified, will discard everything!!') - from eth_cache.store.null import NullStore - store = NullStore() +def process_monitor_session_dir(settings, config): + syncer_store_module = None + syncer_store_class = None + session_id = settings.get('SESSION_ID') + state_dir = None + if config.get('SYNCER_BACKEND') == 'mem': + syncer_store_module = importlib.import_module('chainsyncer.store.mem') + syncer_store_class = getattr(syncer_store_module, 'SyncMemStore') + else: + if config.get('SYNCER_BACKEND') == 'fs': + syncer_store_module = importlib.import_module('chainsyncer.store.fs') + syncer_store_class = getattr(syncer_store_module, 'SyncFsStore') + elif config.get('SYNCER_BACKEND') == 'rocksdb': + syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb') + syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore') else: - store = FileStore(self.o['CHAIN_SPEC'], cache_dir) - cache_dir = os.path.realpath(cache_dir) - if cache_dir == None: - import tempfile - cache_dir = tempfile.mkdtemp() - logg.info('using cache store {}'.format(store)) + syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND')) + syncer_store_class = getattr(syncer_store_module, 'SyncStore') + state_dir = os.path.join(config.get('ETHMONITOR_STATE_DIR'), config.get('SYNCER_BACKEND')) + os.makedirs(state_dir, exist_ok=True) + logg.info('using engine {} moduleĀ {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__)) - self.o['CACHE_STORE'] = store - - - def process_cache_filter(self, config): - fltr = CacheFilter(self.o['CACHE_STORE'], rules_filter=self.o['RULES'], include_tx_data=config.true('ETHCACHE_STORE_TX')) - self.o['SYNC_STORE'].register(fltr) - - fltr = BlockFilter(self.o['CACHE_STORE'], include_block_data=config.true('ETHCACHE_STORE_BLOCK')) - self.o['BLOCK_HANDLER'].register(fltr) - - - def process_tx_filter(self, config): - for fltr in list_from_prefix(config, 'filter'): - m = importlib.import_module(fltr) - fltr_object = m.Filter(rules_filter=self.o['RULES']) - self.o['SYNC_STORE'].register(fltr_object) - logg.info('using filter module {}'.format(fltr)) - - - def process_block_filter(self, config): - block_filter_handler = BlockCallbackFilter() - for block_filter in list_from_prefix(config, 'block_filter'): - m = importlib.import_module(block_filter) - block_filter_handler.register(m) - logg.info('using block filter module {}'.format(block_filter)) - - self.o['BLOCK_HANDLER'] = block_filter_handler - - - def process_out_filter(self, config): - out_filter = OutFilter( - self.o['CHAIN_SPEC'], - rules_filter=self.o['RULES'], - renderers=self.o['RENDERER'], + session_dir = os.path.join(state_dir, session_id) + sync_store = syncer_store_class( + session_dir, + session_id=session_id, + state_event_callback=state_change_callback, + filter_state_event_callback=filter_change_callback, ) - self.o['SYNC_STORE'].register(out_filter) + + settings.set('STATE_DIR', state_dir) + settings.set('SESSION_DIR', session_dir) + settings.set('SYNC_STORE', sync_store) + + return settings - def process_filter(self, config): - self.o['FILTER'] = [] +def process_address_arg_rules(settings, config): + rules = settings.get('RULES') + category = { + 'input': { + 'i': [], + 'x': [], + }, + 'output': { + 'i': [], + 'x': [], + }, + 'exec': { + 'i': [], + 'x': [], + }, + } + for rules_arg in [ + 'input', + 'output', + 'exec', + ]: + (vy, vn) = to_config_names(rules_arg) + for address in config.get(vy): + if not is_address(address): + raise ValueError('invalid address in config {}: {}'.format(vy, address)) + category[rules_arg]['i'].append(address) + for address in config.get(vn): + if not is_address(address): + raise ValueError('invalid address in config {}: {}'.format(vn, address)) + category[rules_arg]['x'].append(address) - self.process_renderer(config) + includes = RuleSimple( + category['output']['i'], + category['input']['i'], + category['exec']['i'], + description='INCLUDE', + ) + rules.include(includes) - self.process_block_filter(config) + excludes = RuleSimple( + category['output']['x'], + category['input']['x'], + category['exec']['x'], + description='EXCLUDE', + ) + rules.exclude(excludes) - self.process_cache_filter(config) - self.process_tx_filter(config) - self.process_out_filter(config) + return settings - def process_renderer(self, config): - renderers_mods = [] - for renderer in list_from_prefix(config, 'renderer'): - m = importlib.import_module(renderer) - renderers_mods.append(m) - logg.info('using renderer module {}'.format(renderer)) - self.o['RENDERER'] = renderers_mods +def process_data_arg_rules(settings, config): + rules = settings.get('RULES') + include_data = [] + for v in config.get('ETHMONITOR_DATA'): + include_data.append(v.lower()) + exclude_data = [] + for v in config.get('ETHMONITOR_X_DATA'): + exclude_data.append(v.lower()) - def process_cache_rpc(self, config): - if not config.true('_FRESH'): - self.o['RPC'] = CacheRPC(self.o['RPC'], cache_store) + includes = RuleMethod(include_data, description='INCLUDE') + rules.include(includes) + excludes = RuleMethod(exclude_data, description='EXCLUDE') + rules.exclude(excludes) - def process_common(self, config): - super(EthMonitorSettings, self).process_common(config) - # TODO: duplicate from chaind, consider move to chainlib-eth - rpc_provider = config.get('RPC_PROVIDER') - if rpc_provider == None: - rpc_provider = 'http://localhost:8545' - self.o['RPC'] = EthHTTPConnection(url=rpc_provider, chain_spec=self.o['CHAIN_SPEC']) + include_data = [] + for v in config.get('ETHMONITOR_DATA_IN'): + include_data.append(v.lower()) + exclude_data = [] + for v in config.get('ETHMONITOR_X_DATA_IN'): + exclude_data.append(v.lower()) + + includes = RuleData(include_data, description='INCLUDE') + rules.include(includes) + + excludes = RuleData(exclude_data, description='EXCLUDE') + rules.exclude(excludes) + + return settings - def process_sync_interface(self, config): - self.o['SYNCER_INTERFACE'] = EthChainInterface() +def process_address_file_rules(settings, config): #rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False): + rules = settings.get('RULES') + includes_file = config.get('ETHMONITOR_INCLUDES_FILE') + if includes_file != None: + f = open(includes_file, 'r') + logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file))) + while True: + r = f.readline() + if r == '': + break + r = r.rstrip() + v = r.split("\t") + + sender = [] + recipient = [] + executable = [] + + try: + if v[0] != '': + sender = v[0].split(',') + except IndexError: + pass + + try: + if v[1] != '': + recipient = v[1].split(',') + except IndexError: + pass + + try: + if v[2] != '': + executable = v[2].split(',') + except IndexError: + pass + + rule = RuleSimple(sender, recipient, executable) + rules.include(rule) + + excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE') + if excludes_file != None: + f = open(includes_file, 'r') + logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file))) + while True: + r = f.readline() + if r == '': + break + r = r.rstrip() + v = r.split("\t") + + sender = None + recipient = None + executable = None + + if v[0] != '': + sender = v[0].strip(',') + if v[1] != '': + recipient = v[1].strip(',') + if v[2] != '': + executable = v[2].strip(',') + + rule = RuleSimple(sender, recipient, executable) + rules.exclude(rule) + return settings - def process_sync(self, config): - self.process_sync_interface(config) - self.process_sync_backend(config) - self.process_sync_range(config) +def process_arg_rules(settings, config): + address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT')) + settings.set('RULES', address_rules) + settings = process_address_arg_rules(settings, config) + settings = process_data_arg_rules(settings, config) + settings = process_address_file_rules(settings, config) + return settings - def process_cache(self, config): - self.process_cache_store(config) +def process_cache_store(settings, config): + cache_dir = config.get('_CACHE_DIR') + store = None + if cache_dir == None: + logg.warning('no cache dir specified, will discard everything!!') + from eth_cache.store.null import NullStore + store = NullStore() + else: + store = FileStore(settings.get('CHAIN_SPEC'), cache_dir) + cache_dir = os.path.realpath(cache_dir) + if cache_dir == None: + import tempfile + cache_dir = tempfile.mkdtemp() + logg.info('using cache store {}'.format(store)) + + settings.set('CACHE_STORE', store) + + return settings - def process(self, config): - self.process_common(config) - self.process_monitor_session(config) - self.process_monitor_session_dir(config) - self.process_arg_rules(config) - self.process_sync(config) - self.process_cache(config) - self.process_filter(config) +def process_cache_filter(settings, config): + cache_store = settings.get('CACHE_STORE') + fltr = CacheFilter(cache_store, rules_filter=settings.o['RULES'], include_tx_data=config.true('ETHCACHE_STORE_TX')) + sync_store = settings.get('SYNC_STORE') + sync_store.register(fltr) + + fltr = BlockFilter(cache_store, include_block_data=config.true('ETHCACHE_STORE_BLOCK')) + hndlr = settings.get('BLOCK_HANDLER') + hndlr.register(fltr) + + return settings + + +def process_tx_filter(settings, config): + for fltr in list_from_prefix(config, 'filter'): + m = importlib.import_module(fltr) + fltr_object = m.Filter(rules_filter=settings.get('RULES')) + store = settings.get('SYNC_STORE') + store.register(fltr_object) + logg.info('using filter module {}'.format(fltr)) + return settings + + +def process_block_filter(settings, config): + block_filter_handler = BlockCallbackFilter() + for block_filter in list_from_prefix(config, 'block_filter'): + m = importlib.import_module(block_filter) + block_filter_handler.register(m) + logg.info('using block filter module {}'.format(block_filter)) + + settings.set('BLOCK_HANDLER', block_filter_handler) + return settings + + +def process_out_filter(settings, config): + out_filter = OutFilter( + settings.o['CHAIN_SPEC'], + rules_filter=settings.o['RULES'], + renderers=settings.o['RENDERER'], + ) + store = settings.get('SYNC_STORE') + store.register(out_filter) + return settings + + +def process_filter(settings, config): + settings.set('FILTER', []) + settings = process_renderer(settings, config) + settings = process_block_filter(settings, config) + settings = process_cache_filter(settings, config) + settings = process_tx_filter(settings, config) + settings = process_out_filter(settings, config) + return settings + + +def process_renderer(settings, config): + renderers_mods = [] + for renderer in list_from_prefix(config, 'renderer'): + m = importlib.import_module(renderer) + renderers_mods.append(m) + logg.info('using renderer module {}'.format(renderer)) + settings.set('RENDERER', renderers_mods) + return settings + + +def process_cache_rpc(settings, config): + if not config.true('_FRESH'): + rpc = CacheRPC(settings.get('RPC'), cache_store) + settings.set('RPC', rpc) + return settings + + +def process_sync_interface(settings, config): + ifc = EthChainInterface() + settings.set('SYNCER_INTERFACE', ifc) + return settings + + +def process_sync(settings, config): + settings = process_sync_interface(settings, config) + settings = process_sync_backend(settings, config) + settings = process_sync_range(settings, config) + return settings + + +def process_cache(settings, config): + settings = process_cache_store(settings, config) + return settings + + +def process_settings(settings, config): + settings = process_monitor_session(settings, config) + settings = process_monitor_session_dir(settings, config) + settings = process_arg_rules(settings, config) + settings = process_sync(settings, config) + settings = process_cache(settings, config) + settings = process_filter(settings, config) + return settings diff --git a/requirements.txt b/requirements.txt index 6aa4926..68af469 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ -chainlib-eth~=0.2.0 -chainlib~=0.2.0 -chainsyncer~=0.4.10 +chainlib-eth~=0.3.0 +chainlib~=0.3.0 +chainsyncer~=0.4.11 leveldir~=0.3.0 -eth-cache~=0.1.4 +eth-cache~=0.1.5 diff --git a/setup.cfg b/setup.cfg index 31c6bae..c8204ff 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = eth-monitor -version = 0.4.8 +version = 0.4.9 description = Monitor and cache transactions using match filters author = Louis Holbrook author_email = dev@holbrook.no @@ -33,6 +33,7 @@ packages = eth_monitor.filters eth_monitor.runnable eth_monitor.mock + eth_monitor.cli [options.entry_points] console_scripts =