diff --git a/eth_monitor/callback.py b/eth_monitor/callback.py index 036f9c1..244fc23 100644 --- a/eth_monitor/callback.py +++ b/eth_monitor/callback.py @@ -1,3 +1,9 @@ +# standard imports +import logging + +logg = logging.getLogger(__name__) + + class BlockCallbackFilter: def __init__(self): @@ -11,3 +17,23 @@ class BlockCallbackFilter: def filter(self, block, tx=None): for fltr in self.filters: fltr.filter(block, tx=tx) + + +def state_change_callback(k, old_state, new_state): + logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state)) + + +def filter_change_callback(k, old_state, new_state): + logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state)) + + +def pre_callback(): + logg.debug('starting sync loop iteration') + + +def post_callback(): + logg.debug('ending sync loop iteration') + + + + diff --git a/eth_monitor/chain.py b/eth_monitor/chain.py index d9171cd..f28d382 100644 --- a/eth_monitor/chain.py +++ b/eth_monitor/chain.py @@ -1,6 +1,7 @@ # external imports from chainlib.interface import ChainInterface from chainlib.eth.block import ( + block_latest, block_by_number, Block, ) @@ -12,6 +13,7 @@ from chainlib.eth.tx import ( class EthChainInterface(ChainInterface): def __init__(self): + self._block_latest = block_latest self._block_by_number = block_by_number self._block_from_src = Block.from_src self._tx_receipt = receipt diff --git a/eth_monitor/cli/arg.py b/eth_monitor/cli/arg.py index fc4cab5..8341ad0 100644 --- a/eth_monitor/cli/arg.py +++ b/eth_monitor/cli/arg.py @@ -10,13 +10,19 @@ def process_flags(argparser, flags): argparser.add_argument('--exec', action='append', type=str, help='Add exec (contract) addresses to includes list') argparser.add_argument('--data', action='append', type=str, help='Add data prefix strings to include list') argparser.add_argument('--data-in', action='append', dest='data_in', type=str, help='Add data contain strings to include list') - argparser.add_argument('--x-data', action='append', dest='xdata', type=str, help='Add data prefix string to exclude list') - argparser.add_argument('--x-data-in', action='append', dest='xdata_in', type=str, help='Add data contain string to exclude list') + argparser.add_argument('--x-data', action='append', dest='x_data', type=str, help='Add data prefix string to exclude list') + argparser.add_argument('--x-data-in', action='append', dest='x_data_in', type=str, help='Add data contain string to exclude list') argparser.add_argument('--address', action='append', type=str, help='Add addresses as input, output and exec to includes list') - argparser.add_argument('--x-input', action='append', type=str, dest='xinput', help='Add input (recipient) addresses to excludes list') - argparser.add_argument('--x-output', action='append', type=str, dest='xoutput', help='Add output (sender) addresses to excludes list') - argparser.add_argument('--x-exec', action='append', type=str, dest='xexec', help='Add exec (contract) addresses to excludes list') - argparser.add_argument('--x-address', action='append', type=str, dest='xaddress', help='Add addresses as input, output and exec to excludes list') + argparser.add_argument('--x-input', action='append', type=str, dest='x_input', help='Add input (recipient) addresses to excludes list') + argparser.add_argument('--x-output', action='append', type=str, dest='x_output', help='Add output (sender) addresses to excludes list') + argparser.add_argument('--x-exec', action='append', type=str, dest='x_exec', help='Add exec (contract) addresses to excludes list') + argparser.add_argument('--x-address', action='append', type=str, dest='x_address', help='Add addresses as input, output and exec to excludes list') argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file') argparser.add_argument('--excludes-file', type=str, dest='excludes_file', help='Load exclude rules from file') argparser.add_argument('--include-default', dest='include_default', action='store_true', help='Include all transactions by default') + + # filter flags + argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output') + argparser.add_argument('--filter', type=str, action='append', help='Add python module to tx filter path') + argparser.add_argument('--block-filter', type=str, dest='block_filter', action='append', help='Add python module to block filter path') + diff --git a/eth_monitor/cli/config.py b/eth_monitor/cli/config.py index 7983eb6..ec92a93 100644 --- a/eth_monitor/cli/config.py +++ b/eth_monitor/cli/config.py @@ -1,17 +1,52 @@ +# local imports +from .rules import ( + rules_address_args, + rules_data_args, + to_config_names, + ) + def process_config(config, args, flags): arg_override = {} - arg_override['ETHMONITOR_INPUTS'] = getattr(args, 'input') - arg_override['ETHMONITOR_OUTPUTS'] = getattr(args, 'output') - arg_override['ETHMONITOR_EXEC'] = getattr(args, 'exec') - arg_override['ETHMONITOR_ADDRESS'] = getattr(args, 'address') - arg_override['ETHMONITOR_X_INPUTS'] = getattr(args, 'xinput') - arg_override['ETHMONITOR_X_OUTPUTS'] = getattr(args, 'xoutput') - arg_override['ETHMONITOR_X_EXEC'] = getattr(args, 'xexec') - arg_override['ETHMONITOR_X_ADDRESS'] = getattr(args, 'xaddress') + + rules_args = rules_address_args + rules_data_args + + for rules_arg in rules_args: + (vy, vn) = to_config_names(rules_arg) + arg = getattr(args, rules_arg) + if arg == None: + v = config.get(vy) + if bool(v): + arg_override[vy] = v.split(',') + else: + arg_override[vy] = arg + + arg = getattr(args, 'x_' + rules_arg) + if arg == None: + v = config.get(vn) + if bool(v): + arg_override[vn] = v.split(',') + else: + arg_override[vn] = arg + + arg_override['ETHMONITOR_INCLUDES_FILE'] = getattr(args, 'includes_file') + arg_override['ETHMONITOR_EXCLUDES_FILE'] = getattr(args, 'excludes_file') arg_override['ETHMONITOR_INCLUDE_DEFAULT'] = getattr(args, 'include_default') + arg_override['ETHMONITOR_RENDERER'] = getattr(args, 'renderer') + arg_override['ETHMONITOR_FILTER'] = getattr(args, 'filter') + arg_override['ETHMONITOR_BLOCK_FILTER'] = getattr(args, 'block_filter') + + arg_override['ETHMONITOR_STATE_DIR'] = getattr(args, 'state_dir') + config.dict_override(arg_override, 'local cli args') + for rules_arg in rules_args: + (vy, vn) = to_config_names(rules_arg) + if config.get(vy) == None: + config.add([], vy, True) + if config.get(vn) == None: + config.add([], vn, True) + config.add(getattr(args, 'session_id'), '_SESSION_ID', False) config.add(getattr(args, 'cache_dir'), '_CACHE_DIR', False) diff --git a/eth_monitor/cli/rules.py b/eth_monitor/cli/rules.py new file mode 100644 index 0000000..563f2d8 --- /dev/null +++ b/eth_monitor/cli/rules.py @@ -0,0 +1,18 @@ +rules_address_args = [ + 'input', + 'output', + 'exec', + 'address', + ] + +rules_data_args = [ + 'data', + 'data_in', + ] + + +def to_config_names(v): + v = v.upper() + return ('ETHMONITOR_' + v, 'ETHMONITOR_X_' + v) + + diff --git a/eth_monitor/data/config/monitor.ini b/eth_monitor/data/config/monitor.ini index 22de6d9..fb41d51 100644 --- a/eth_monitor/data/config/monitor.ini +++ b/eth_monitor/data/config/monitor.ini @@ -1,13 +1,20 @@ [ethmonitor] -inputs = -outputs = +input = +output = exec = -x_inputs = -x_outputs = +x_input = +x_output = x_exec = address = x_address = +data = +x_data = +data_in = +x_data_in = +includes_file = +excludes_file = renderer = filter = +block_filter = include_default = 0 -state_dir = +state_dir = ./.eth-monitor diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index 48a052b..36e4465 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -27,7 +27,6 @@ from eth_cache.rpc import CacheRPC from eth_cache.store.file import FileStore # local imports -from eth_monitor.chain import EthChainInterface from eth_monitor.filters.cache import Filter as CacheFilter from eth_monitor.rules import ( AddressRules, @@ -38,7 +37,11 @@ from eth_monitor.rules import ( from eth_monitor.filters import RuledFilter from eth_monitor.filters.out import OutFilter from eth_monitor.config import override, list_from_prefix -from eth_monitor.callback import BlockCallbackFilter +from eth_monitor.callback import ( + BlockCallbackFilter, + pre_callback, + post_callback, + ) from eth_monitor.settings import EthMonitorSettings import eth_monitor.cli @@ -61,9 +64,6 @@ eth_monitor.cli.process_flags(argparser, 0) argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default') argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default') -argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output') -argparser.add_argument('--filter', type=str, action='append', help='Add python module to tx filter path') -argparser.add_argument('--block-filter', type=str, dest='block_filter', action='append', help='Add python module to block filter path') argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available') argparser.add_argument('--list-backends', dest='list_backends', action='store_true', help='List built-in store backends') argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose') @@ -124,100 +124,6 @@ logg.debug('loaded settings:\n{}'.format(settings)) -def setup_data_arg_rules(rules, args): - include_data = [] - for v in args.data: - include_data.append(v.lower()) - exclude_data = [] - for v in args.xdata: - exclude_data.append(v.lower()) - - includes = RuleMethod(include_data, description='INCLUDE') - rules.include(includes) - - excludes = RuleMethod(exclude_data, description='EXCLUDE') - rules.exclude(excludes) - - include_data = [] - for v in args.data_in: - include_data.append(v.lower()) - exclude_data = [] - for v in args.xdata_in: - exclude_data.append(v.lower()) - - includes = RuleData(include_data, description='INCLUDE') - rules.include(includes) - - excludes = RuleData(exclude_data, description='EXCLUDE') - rules.exclude(excludes) - - return rules - - -def setup_address_file_rules(rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False): - - if includes_file != None: - f = open(includes_file, 'r') - logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file))) - while True: - r = f.readline() - if r == '': - break - r = r.rstrip() - v = r.split("\t") - - sender = [] - recipient = [] - executable = [] - - try: - if v[0] != '': - sender = v[0].split(',') - except IndexError: - pass - - try: - if v[1] != '': - recipient = v[1].split(',') - except IndexError: - pass - - try: - if v[2] != '': - executable = v[2].split(',') - except IndexError: - pass - - rule = RuleSimple(sender, recipient, executable) - rules.include(rule) - - if excludes_file != None: - f = open(includes_file, 'r') - logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file))) - while True: - r = f.readline() - if r == '': - break - r = r.rstrip() - v = r.split("\t") - - sender = None - recipient = None - executable = None - - if v[0] != '': - sender = v[0].strip(',') - if v[1] != '': - recipient = v[1].strip(',') - if v[2] != '': - executable = v[2].strip(',') - - rule = RuleSimple(sender, recipient, executable) - rules.exclude(rule) - - return rules - - def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data): store = None if cache_dir == None: @@ -240,26 +146,10 @@ def setup_cache_filter(rules_filter=None): return CacheFilter(rules_filter=rules_filter) -def pre_callback(): - logg.debug('starting sync loop iteration') - - -def post_callback(): - logg.debug('ending sync loop iteration') - - def block_callback(block, tx): logg.info('processing {} {}'.format(block, datetime.datetime.fromtimestamp(block.timestamp))) -def state_change_callback(k, old_state, new_state): - logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state)) - - -def filter_change_callback(k, old_state, new_state): - logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state)) - - def main(): rpc = settings.get('RPC') o = block_latest() @@ -267,47 +157,6 @@ def main(): block_offset = int(strip_0x(r), 16) + 1 logg.info('network block height is {}'.format(block_offset)) - keep_alive = False - session_block_offset = 0 - block_limit = 0 - if args.head: - session_block_offset = block_offset - block_limit = -1 - keep_alive = True - else: - session_block_offset = args.offset - - if args.until > 0: - if not args.head and args.until <= session_block_offset: - raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, args.until)) - block_limit = args.until - elif config.true('_KEEP_ALIVE'): - keep_alive=True - block_limit = -1 - - if session_block_offset == -1: - session_block_offset = block_offset - elif not config.true('_KEEP_ALIVE'): - if block_limit == 0: - block_limit = block_offset - - sys.exit(0) - - #address_rules = AddressRules(include_by_default=args.include_default) - address_rules = setup_data_arg_rules( - address_rules, - args, - ) - address_rules = setup_address_file_rules( - address_rules, - includes_file=args.includes_file, - excludes_file=args.excludes_file, - ) - address_rules = setup_address_arg_rules( - address_rules, - args, - ) - store = setup_filter( settings.get('CHAIN_SPEC'), config.get('_CACHE_DIR'), @@ -316,7 +165,7 @@ def main(): ) cache_filter = setup_cache_filter( - rules_filter=address_rules, + rules_filter=settings.get('RULES'), #address_rules, ) filters = [ @@ -325,7 +174,7 @@ def main(): for fltr in list_from_prefix(config, 'filter'): m = importlib.import_module(fltr) - fltr_object = m.Filter(rules_filter=address_rules) + fltr_object = m.Filter(rules_filter=settings.get('RULES')) filters.append(fltr_object) logg.info('using filter module {}'.format(fltr)) @@ -341,19 +190,26 @@ def main(): block_filter_handler.register(m) logg.info('using block filter module {}'.format(block_filter)) - chain_interface = EthChainInterface() - out_filter = OutFilter( settings.get('CHAIN_SPEC'), - rules_filter=address_rules,renderers=renderers_mods, + rules_filter=settings.get('RULES'), + renderers=renderers_mods, ) filters.append(out_filter) logg.info('session is {}'.format(settings.get('SESSION_ID'))) for fltr in filters: - sync_store.register(fltr) - drv = ChainInterfaceDriver(sync_store, chain_interface, offset=session_block_offset, target=block_limit, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_filter_handler.filter) + settings.get('SYNC_STORE').register(fltr) + drv = ChainInterfaceDriver( + settings.get('SYNC_STORE'), + settings.get('SYNCER_INTERFACE'), + offset=settings.get('SYNCER_OFFSET'), + target=settings.get('SYNCER_LIMIT'), + pre_callback=pre_callback, + post_callback=post_callback, + block_callback=block_filter_handler.filter, + ) use_rpc = rpc if not args.fresh: diff --git a/eth_monitor/settings.py b/eth_monitor/settings.py index 7fbb24b..47b091d 100644 --- a/eth_monitor/settings.py +++ b/eth_monitor/settings.py @@ -8,6 +8,8 @@ import importlib from chainlib.settings import ChainSettings from chainsyncer.settings import ChainsyncerSettings from chainlib.eth.connection import EthHTTPConnection +from eth_monitor.chain import EthChainInterface +from chainlib.eth.address import is_address # local imports from eth_monitor.rules import ( @@ -16,6 +18,11 @@ from eth_monitor.rules import ( RuleMethod, RuleData, ) +from eth_monitor.cli.rules import to_config_names +from eth_monitor.callback import ( + state_change_callback, + filter_change_callback, + ) logg = logging.getLogger(__name__) @@ -50,82 +57,162 @@ class EthMonitorSettings(ChainsyncerSettings): else: syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND')) syncer_store_class = getattr(syncer_store_module, 'SyncStore') - state_dir = os.path.join(config.get('_STATE_DIR'), config.get('SYNCER_BACKEND')) - + state_dir = os.path.join(config.get('ETHMONITOR_STATE_DIR'), config.get('SYNCER_BACKEND')) + os.makedirs(state_dir, exist_ok=True) logg.info('using engine {} moduleĀ {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__)) - #state_dir = os.path.join(state_dir, config.get('_SESSION_ID')) - sync_store = syncer_store_class(state_dir, session_id=session.get('SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback) + sync_store = syncer_store_class(state_dir, session_id=self.o['SESSION_ID'], state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback) self.o['STATE_DIR'] = state_dir + self.o['SESSION_DIR'] = os.path.join(state_dir, self.o['SESSION_ID']) self.o['SYNC_STORE'] = sync_store - #def process_address_arg_rules(rules, args): def process_address_arg_rules(self, config): - include_inputs = config.get('ETHMONITOR_INPUTS') - if include_inputs == None: - include_inputs = [] - else: - include_inputs = include_inputs.split(',') + category = { + 'input': { + 'i': [], + 'x': [], + }, + 'output': { + 'i': [], + 'x': [], + }, + 'exec': { + 'i': [], + 'x': [], + }, + } + for rules_arg in [ + 'input', + 'output', + 'exec', + ]: + (vy, vn) = to_config_names(rules_arg) + for address in config.get(vy): + if not is_address(address): + raise ValueError('invalid address in config {}: {}'.format(vy, address)) + category[rules_arg]['i'].append(address) + for address in config.get(vn): + if not is_address(address): + raise ValueError('invalid address in config {}: {}'.format(vn, address)) + category[rules_arg]['x'].append(address) - include_outputs = config.get('ETHMONITOR_OUTPUTS') - if include_outputs == None: - include_outputs = [] - else: - include_outputs = include_outputs.split(',') - - include_exec = config.get('ETHMONITOR_EXEC') - if include_exec == None: - include_exec = [] - else: - include_exec = include_exec.split(',') - - exclude_inputs = config.get('ETHMONITOR_X_INPUTS') - if exclude_inputs == None: - exclude_inputs = [] - else: - exclude_inputs = exclude_inputs.split(',') - - exclude_outputs = config.get('ETHMONITOR_X_OUTPUTS') - if exclude_outputs == None: - exclude_outputs = [] - else: - exclude_outputs = exclude_outputs.split(',') - - exclude_exec = config.get('ETHMONITOR_X_EXEC') - if exclude_exec == None: - exclude_exec = [] - else: - exclude_exec = exclude_exec.split(',') - - - address = config.get('ETHMONITOR_ADDRESS') - if address != None: - for address in address.split(','): - include_inputs.append(address) - include_outputs.append(address) - include_exec.append(address) - - address = config.get('ETHMONITOR_X_ADDRESS') - if address != None: - for address in address.split(','): - exclude_inputs.append(address) - exclude_outputs.append(address) - exclude_exec.append(address) - - includes = RuleSimple(include_outputs, include_inputs, include_exec, description='INCLUDE') + includes = RuleSimple( + category['output']['i'], + category['input']['i'], + category['exec']['i'], + description='INCLUDE', + ) self.o['RULES'].include(includes) - excludes = RuleSimple(exclude_outputs, exclude_inputs, exclude_exec, description='EXCLUDE') + excludes = RuleSimple( + category['output']['x'], + category['input']['x'], + category['exec']['x'], + description='EXCLUDE', + ) self.o['RULES'].exclude(excludes) + + def process_data_arg_rules(self, config): #rules, args): + include_data = [] + for v in config.get('ETHMONITOR_DATA'): + include_data.append(v.lower()) + exclude_data = [] + for v in config.get('ETHMONITOR_X_DATA'): + exclude_data.append(v.lower()) + + includes = RuleMethod(include_data, description='INCLUDE') + self.o['RULES'].include(includes) + + excludes = RuleMethod(exclude_data, description='EXCLUDE') + self.o['RULES'].exclude(excludes) + + include_data = [] + for v in config.get('ETHMONITOR_DATA_IN'): + include_data.append(v.lower()) + exclude_data = [] + for v in config.get('ETHMONITOR_X_DATA_IN'): + exclude_data.append(v.lower()) + + includes = RuleData(include_data, description='INCLUDE') + self.o['RULES'].include(includes) + + excludes = RuleData(exclude_data, description='EXCLUDE') + self.o['RULES'].exclude(excludes) + + + def process_address_file_rules(self, config): #rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False): + includes_file = config.get('ETHMONITOR_INCLUDES_FILE') + if includes_file != None: + f = open(includes_file, 'r') + logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file))) + while True: + r = f.readline() + if r == '': + break + r = r.rstrip() + v = r.split("\t") + + sender = [] + recipient = [] + executable = [] + + try: + if v[0] != '': + sender = v[0].split(',') + except IndexError: + pass + + try: + if v[1] != '': + recipient = v[1].split(',') + except IndexError: + pass + + try: + if v[2] != '': + executable = v[2].split(',') + except IndexError: + pass + + rule = RuleSimple(sender, recipient, executable) + rules.include(rule) + + excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE') + if excludes_file != None: + f = open(includes_file, 'r') + logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file))) + while True: + r = f.readline() + if r == '': + break + r = r.rstrip() + v = r.split("\t") + + sender = None + recipient = None + executable = None + + if v[0] != '': + sender = v[0].strip(',') + if v[1] != '': + recipient = v[1].strip(',') + if v[2] != '': + executable = v[2].strip(',') + + rule = RuleSimple(sender, recipient, executable) + rules.exclude(rule) + + def process_arg_rules(self, config): address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT')) self.o['RULES'] = address_rules - self.process_address_arg_rules(config) + self.process_data_arg_rules(config) + self.process_address_file_rules(config) def process_common(self, config): @@ -137,7 +224,19 @@ class EthMonitorSettings(ChainsyncerSettings): self.o['RPC'] = EthHTTPConnection(url=rpc_provider, chain_spec=self.o['CHAIN_SPEC']) + def process_sync_interface(self, config): + self.o['SYNCER_INTERFACE'] = EthChainInterface() + + + def process_sync(self, config): + self.process_sync_interface(config) + self.process_sync_backend(config) + self.process_sync_range(config) + + def process(self, config): self.process_common(config) self.process_monitor_session(config) + self.process_monitor_session_dir(config) self.process_arg_rules(config) + self.process_sync(config)