diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py index b626662..7db893d 100644 --- a/eth_monitor/filters/cache.py +++ b/eth_monitor/filters/cache.py @@ -11,3 +11,12 @@ class Filter(RuledFilter): def ruled_filter(self, conn, block, tx, db_session=None): self.store.put_tx(tx, include_data=self.include_tx_data) + + + def filter(self, conn, block, tx, db_session=None): + r = super(Filter, self).filter(conn, block, tx, db_session=db_session) + if r == False: + return True + + self.ruled_filter(conn, block, tx, db_session=db_session) + return True diff --git a/eth_monitor/filters/out.py b/eth_monitor/filters/out.py index ece506a..a3844d6 100644 --- a/eth_monitor/filters/out.py +++ b/eth_monitor/filters/out.py @@ -14,6 +14,24 @@ def apply_interface(c, s, chain_str, conn, block, tx, db_session=None): pass +class OutResult: + + def __init__(self): + self.content = '' + + + def set(self, v): + self.content = v + + + def get(self): + return self.content + + + def __str__(self): + return self.content + + class OutFilter(RuledFilter): def __init__(self, chain_spec, writer=sys.stdout, renderers=[], rules_filter=None): @@ -22,6 +40,7 @@ class OutFilter(RuledFilter): self.renderers = renderers self.c = 0 self.chain_spec = chain_spec + self.result = OutResult() def filter(self, conn, block, tx, db_session=None): @@ -29,14 +48,14 @@ class OutFilter(RuledFilter): if r == False: return True - s = None - for renderer in self.renderers: - s = renderer.apply(self.c, s, self.chain_spec, conn, block, tx) - if s != None: + r = renderer.apply(self.c, self.result, self.chain_spec, conn, block, tx) + if not r: break - if s == None: + s = str(self.result) + + if s == '': data = tx.payload if len(data) > 8: data = data[:8] + '...' diff --git a/eth_monitor/importers/etherscan.py b/eth_monitor/importers/etherscan.py index 21edf92..9a7b337 100644 --- a/eth_monitor/importers/etherscan.py +++ b/eth_monitor/importers/etherscan.py @@ -14,7 +14,7 @@ from chainlib.eth.tx import ( ) -class EtherscanImporter: +class Importer: def __init__(self, rpc, api_key, filters=[], block_callback=None): self.api_key = api_key diff --git a/eth_monitor/runnable/import.py b/eth_monitor/runnable/import.py index 6601e0c..5a9887d 100644 --- a/eth_monitor/runnable/import.py +++ b/eth_monitor/runnable/import.py @@ -107,7 +107,7 @@ def main(): conn_socks_tor() addresses = collect_addresses(args.address, args.address_file) - from eth_monitor.importers.etherscan import EtherscanImporter + from eth_monitor.importers.etherscan import Importer as EtherscanImporter address_rules = setup_address_rules(args.address) diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index 637448a..bcfceaa 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -30,6 +30,7 @@ from eth_monitor.rules import ( from eth_monitor.filters import RuledFilter from eth_monitor.filters.out import OutFilter from eth_monitor.store.file import FileStore +from eth_monitor.rpc import CacheRPC logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -69,6 +70,7 @@ argparser.add_argument('--address-file', type=str, dest='excludes_file', help='L argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output') argparser.add_argument('--filter', type=str, action='append', help='Add python module filter path') argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data') +argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available') argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states') argparser.add_argument('-v', action='store_true', help='Be verbose') argparser.add_argument('-vv', action='store_true', help='Be more verbose') @@ -85,8 +87,6 @@ else: logging.getLogger('chainsyncer.backend.file').setLevel(logging.WARNING) logging.getLogger('chainsyncer.backend.sql').setLevel(logging.WARNING) logging.getLogger('chainsyncer.filter').setLevel(logging.WARNING) - logging.getLogger('leveldir.hex').setLevel(level=logging.DEBUG) - logging.getLogger('leveldir.numeric').setLevel(level=logging.DEBUG) if args.vv: logg.setLevel(logging.DEBUG) @@ -241,6 +241,8 @@ def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data): logg.info('using chain spec {} and store {}'.format(chain_spec, store)) RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data) + return store + def setup_cache_filter(rules_filter=None): return CacheFilter(rules_filter=rules_filter) @@ -308,7 +310,7 @@ def main(): args, ) - setup_filter( + store = setup_filter( chain_spec, args.cache_dir, bool(args.store_tx_data), @@ -357,13 +359,14 @@ def main(): out_filter = OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods) filters.append(out_filter) + cache_rpc = CacheRPC(rpc, store) i = 0 for syncer in syncers: logg.info('running syncer index {} {}'.format(i, str(syncer))) for f in filters: syncer.add_filter(f) - r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc) + r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), cache_rpc) sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) i += 1 diff --git a/eth_monitor/store/file.py b/eth_monitor/store/file.py index 78780c0..2e055bf 100644 --- a/eth_monitor/store/file.py +++ b/eth_monitor/store/file.py @@ -54,6 +54,13 @@ class FileStore: src = json.dumps(tx.src()).encode('utf-8') self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src) + rcpt_src = tx.rcpt_src() + logg.debug('rcpt {}'.format(rcpt_src)) + if rcpt_src != None: + rcpt_src = json.dumps(rcpt_src).encode('utf-8') + self.rcpt_dir.add(bytes.fromhex(strip_0x(tx.hash)), rcpt_src) + + def put_block(self, block, include_data=False): hash_bytes = bytes.fromhex(strip_0x(block.hash)) @@ -65,6 +72,37 @@ class FileStore: self.block_src_dir.add(hash_bytes, src) + def get_block_number(self, block_number): + fp = self.block_num_dir.to_filepath(block_number) + f = open(fp, 'rb') + r = f.read() + f.close() + return self.get_block(r.hex()) + + + def get_block(self, block_hash): + fp = self.block_src_dir.to_filepath(block_hash) + f = open(fp, 'rb') + r = f.read() + f.close() + return r + + + def get_tx(self, tx_hash): + fp = self.tx_dir.to_filepath(tx_hash) + f = open(fp, 'rb') + r = f.read() + f.close() + return r + + + def get_rcpt(self, tx_hash): + fp = self.rcpt_dir.to_filepath(tx_hash) + f = open(fp, 'rb') + r = f.read() + f.close() + return r + def __init__(self, chain_spec, cache_root=base_dir, address_rules=None): self.cache_root = os.path.join( cache_root, @@ -86,6 +124,10 @@ class FileStore: self.tx_raw_path = os.path.join(self.cache_dir, 'tx', 'raw') self.tx_dir = HexDir(self.tx_path, 32, levels=2) self.tx_raw_dir = HexDir(self.tx_raw_path, 32, levels=2) + self.rcpt_path = os.path.join(self.cache_dir, 'rcpt', 'src') + self.rcpt_raw_path = os.path.join(self.cache_dir, 'rcpt', 'raw') + self.rcpt_dir = HexDir(self.rcpt_path, 32, levels=2) + self.rcpt_raw_dir = HexDir(self.rcpt_raw_path, 32, levels=2) self.address_path = os.path.join(self.cache_dir, 'address') self.address_dir = HexDir(self.address_path, 20, levels=2) self.chain_spec = chain_spec diff --git a/eth_monitor/store/null.py b/eth_monitor/store/null.py index e331b54..3dda4cf 100644 --- a/eth_monitor/store/null.py +++ b/eth_monitor/store/null.py @@ -16,6 +16,22 @@ class NullStore: pass + def get_block_number(self, v): + raise FileNotFoundError(v) + + + def get_block(self, v): + raise FileNotFoundError(v) + + + def get_tx(self, v): + raise FileNotFoundError(v) + + + def get_rcpt(self, v): + raise FileNotFoundError(v) + + def __init__(self): self.chain_dir = '/dev/null' diff --git a/man/eth-monitor.custom.groff b/man/eth-monitor.custom.groff index 3c1803a..dfffe96 100644 --- a/man/eth-monitor.custom.groff +++ b/man/eth-monitor.custom.groff @@ -2,7 +2,8 @@ By default, addresses to match against transactions need to be explicitly specified. This behavior can be reversed with the \fB--include-default\fP option. Addresses to match are defined using the \fB--input\fP, \fB--output\fP and \fB--exec\fP options. Addresses specified multiple times will be deduplicated. .P Inclusion rules may also be loaded from file by specifying the \fB--includes-file\fP and \fB--excludes-file\fP options. Each file must specify the outputs, inputs and exec addresses as comma separated lists respectively, separated by tabs. - +.P +In the current state of this tool, address matching will affect all parts of the processing; cache, code execution and rendering. .SH SYNCING When a sync is initiated, the state of this sync is persisted. This way, previous syncs that did not complete for some reason will be resumed where they left off. @@ -13,13 +14,17 @@ Syncs can be forced to (re)run for ranges regardless of previous state by using .SH CACHE -.P When syncing, the hash of a block and transaction matching the address criteria will be stored in the cache. The hashes can be used for future data lookups. .P If \fB--store-block-data\fP and/or \fB--store-tx-data\fP is set, a copy of the block and/or transaction data will also be stored, respectively. .SH RENDERING +Rendering in the context of \fBeth-monitor\fP refers to a formatted output stream that occurs independently of caching and code execution. +.P +Filters for rendering may be specified by specifying python modules to the \fB--renderer\fP option. This option may be specified multiple times. +.P +Rendering filters will be executed in order, and the first filter to return \fIFalse\fP .SH DEFINING FILTERS diff --git a/requirements.txt b/requirements.txt index 3131631..afc45c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -chainlib-eth~=0.0.27 +chainlib-eth~=0.0.28 chainlib~=0.0.23 chainsyncer~=0.1.0 eth-erc20~=0.1.10