From 8454b0eaaf3f09c1630da104c210889e2dd941f4 Mon Sep 17 00:00:00 2001 From: lash Date: Sun, 23 Jan 2022 22:44:31 +0000 Subject: [PATCH] Factor out tx, block store --- eth_monitor/filters/base.py | 58 +++---------------------- eth_monitor/filters/cache.py | 21 +-------- eth_monitor/importers/etherscan.py | 8 ++-- eth_monitor/runnable/import.py | 8 +++- eth_monitor/store/file.py | 70 ++++++++++++++++++++++++++++++ 5 files changed, 86 insertions(+), 79 deletions(-) create mode 100644 eth_monitor/store/file.py diff --git a/eth_monitor/filters/base.py b/eth_monitor/filters/base.py index 98cf41a..0747552 100644 --- a/eth_monitor/filters/base.py +++ b/eth_monitor/filters/base.py @@ -4,77 +4,29 @@ import logging import json # external imports -from chainsyncer.backend.file import chain_dir_for -from leveldir.numeric import NumDir -from leveldir.hex import HexDir from hexathon import strip_0x logg = logging.getLogger(__name__) -base_dir = '/var/lib' - class RuledFilter: - cache_root = None - chain_dir = None - cache_dir = None - block_num_path = None - block_src_path = None - block_hash_path = None - block_num_dir = None - block_src_dir = None - block_hash_dir = None - address_path = None - address_dir = None - tx_path = None - tx_dir = None - tx_raw_dir = None - - def __init__(self, rules_filter=None): - if self.chain_dir == None: - raise RuntimeError('filter must be initialized. call RuledFilter.init() first') + if self.store.chain_dir == None: + raise RuntimeError('store must be initialized. call RuledFilter.init() first') self.rules_filter = rules_filter @staticmethod - def init(chain_spec, cache_root=base_dir, rules_filter=None, include_block_data=False, include_tx_data=False): - RuledFilter.cache_root = os.path.join( - cache_root, - 'eth_monitor', - chain_spec.engine(), - chain_spec.fork(), - str(chain_spec.chain_id()), - ) - RuledFilter.chain_dir = chain_dir_for(RuledFilter.cache_root) - RuledFilter.cache_dir = os.path.join(RuledFilter.chain_dir, 'cache') - RuledFilter.block_src_path = os.path.join(RuledFilter.cache_dir, 'block', 'src') - RuledFilter.block_src_dir = NumDir(RuledFilter.block_src_path, [100000, 1000]) - RuledFilter.block_num_path = os.path.join(RuledFilter.cache_dir, 'block', 'num') - RuledFilter.block_num_dir = NumDir(RuledFilter.block_num_path, [100000, 1000]) - RuledFilter.block_hash_path = os.path.join(RuledFilter.cache_dir, 'block', 'hash') - RuledFilter.block_hash_dir = HexDir(RuledFilter.block_hash_path, 32, levels=2) - RuledFilter.tx_path = os.path.join(RuledFilter.cache_dir, 'tx', 'src') - RuledFilter.tx_raw_path = os.path.join(RuledFilter.cache_dir, 'tx', 'raw') - RuledFilter.tx_dir = HexDir(RuledFilter.tx_path, 32, levels=2) - RuledFilter.tx_raw_dir = HexDir(RuledFilter.tx_raw_path, 32, levels=2) - RuledFilter.address_path = os.path.join(RuledFilter.cache_dir, 'address') - RuledFilter.address_dir = HexDir(RuledFilter.address_path, 20, levels=2) - RuledFilter.chain_spec = chain_spec + def init(store, include_block_data=False, include_tx_data=False): + RuledFilter.store = store RuledFilter.include_block_data = include_block_data RuledFilter.include_tx_data = include_tx_data @classmethod def block_callback(cls, block, extra=None): - hash_bytes = bytes.fromhex(strip_0x(block.hash)) - cls.block_num_dir.add(block.number, hash_bytes) - num_bytes = block.number.to_bytes(8, 'big') - cls.block_hash_dir.add(hash_bytes, num_bytes) - if cls.include_block_data: - src = json.dumps(block.src()).encode('utf-8') - cls.block_src_dir.add(hash_bytes, src) + cls.store.put_block(block, include_data=cls.include_block_data) def filter(self, conn, block, tx, db_session=None): diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py index a0e4603..b626662 100644 --- a/eth_monitor/filters/cache.py +++ b/eth_monitor/filters/cache.py @@ -1,14 +1,5 @@ # standard imports -import os import logging -import json - -# external imports -from hexathon import strip_0x -from chainlib.eth.tx import ( - Tx, - pack, - ) # local imports from eth_monitor.filters import RuledFilter @@ -19,14 +10,4 @@ logg = logging.getLogger(__name__) class Filter(RuledFilter): def ruled_filter(self, conn, block, tx, db_session=None): - raw = pack(tx.src(), self.chain_spec) - tx_hash_dirnormal = strip_0x(tx.hash).upper() - tx_hash_bytes = bytes.fromhex(tx_hash_dirnormal) - self.tx_raw_dir.add(tx_hash_bytes, raw) - address = bytes.fromhex(strip_0x(tx.inputs[0])) - self.address_dir.add_dir(tx_hash_dirnormal, address, b'') - address = bytes.fromhex(strip_0x(tx.outputs[0])) - self.address_dir.add_dir(tx_hash_dirnormal, address, b'') - if self.include_tx_data: - src = json.dumps(tx.src()).encode('utf-8') - self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src) + self.store.put_tx(tx, include_data=self.include_tx_data) diff --git a/eth_monitor/importers/etherscan.py b/eth_monitor/importers/etherscan.py index 21edf92..8dd05c9 100644 --- a/eth_monitor/importers/etherscan.py +++ b/eth_monitor/importers/etherscan.py @@ -24,10 +24,10 @@ class EtherscanImporter: def get(self, address): - #f = open('sample_import.json', 'r') - #o = json.load(f) - #f.close() - o = self.get_api(address) + f = open('sample_import.json', 'r') + o = json.load(f) + f.close() + #o = self.get_api(address) for v in o['result']: o = block_by_hash(v['blockHash']) diff --git a/eth_monitor/runnable/import.py b/eth_monitor/runnable/import.py index be75a2f..a4c48e5 100644 --- a/eth_monitor/runnable/import.py +++ b/eth_monitor/runnable/import.py @@ -12,6 +12,7 @@ from chainlib.chain import ChainSpec # local imports from eth_monitor.filters.cache import Filter as CacheFilter from eth_monitor.filters import RuledFilter +from eth_monitor.store.file import FileStore logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -21,6 +22,7 @@ normalize_address = TxHexNormalizer().wallet_address argparser = argparse.ArgumentParser('master eth events monitor') argparser.add_argument('--api-key-file', dest='api_key_file', type=str, help='File to read API key from') argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data') +argparser.add_argument('--include-data', dest='include_data', action='store_true', help='Include data objects') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string') argparser.add_argument('-f', '--address-file', dest='address_file', default=[], type=str, action='append', help='Add addresses from file') argparser.add_argument('-a', '--address', default=[], type=str, action='append', help='Add address') @@ -80,12 +82,14 @@ def collect_addresses(addresses=[], address_files=[]): def setup_filter(chain_spec, cache_dir): + store = FileStore(chain_spec, cache_dir) cache_dir = os.path.realpath(cache_dir) if cache_dir == None: import tempfile cache_dir = tempfile.mkdtemp() logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir)) - RuledFilter.init(chain_spec, cache_dir) + include_data = bool(args.include_data) + RuledFilter.init(store, include_tx_data=include_data, include_block_data=include_data) def main(): @@ -93,7 +97,7 @@ def main(): addresses = collect_addresses(args.address, args.address_file) from eth_monitor.importers.etherscan import EtherscanImporter - + setup_filter(chain_spec, args.cache_dir) filters = [CacheFilter()] importer = EtherscanImporter(rpc, api_key, filters=filters, block_callback=RuledFilter.block_callback) diff --git a/eth_monitor/store/file.py b/eth_monitor/store/file.py new file mode 100644 index 0000000..84c2f95 --- /dev/null +++ b/eth_monitor/store/file.py @@ -0,0 +1,70 @@ +# standard imports +import os +import json +import logging + +# external imports +from hexathon import strip_0x +from chainlib.eth.tx import ( + Tx, + pack, + ) +from chainsyncer.backend.file import chain_dir_for +from leveldir.numeric import NumDir +from leveldir.hex import HexDir + +logg = logging.getLogger(__name__) + +base_dir = '/var/lib' + + +class FileStore: + + def put_tx(self, tx, include_data=False): + raw = pack(tx.src(), self.chain_spec) + tx_hash_dirnormal = strip_0x(tx.hash).upper() + tx_hash_bytes = bytes.fromhex(tx_hash_dirnormal) + self.tx_raw_dir.add(tx_hash_bytes, raw) + address = bytes.fromhex(strip_0x(tx.inputs[0])) + self.address_dir.add_dir(tx_hash_dirnormal, address, b'') + address = bytes.fromhex(strip_0x(tx.outputs[0])) + self.address_dir.add_dir(tx_hash_dirnormal, address, b'') + if include_data: + src = json.dumps(tx.src()).encode('utf-8') + self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src) + + + def put_block(self, block, include_data=False): + hash_bytes = bytes.fromhex(strip_0x(block.hash)) + self.block_num_dir.add(block.number, hash_bytes) + num_bytes = block.number.to_bytes(8, 'big') + self.block_hash_dir.add(hash_bytes, num_bytes) + if include_data: + src = json.dumps(block.src()).encode('utf-8') + self.block_src_dir.add(hash_bytes, src) + + + def __init__(self, chain_spec, cache_root=base_dir): + self.cache_root = os.path.join( + cache_root, + 'eth_monitor', + chain_spec.engine(), + chain_spec.fork(), + str(chain_spec.chain_id()), + ) + self.cache_root = os.path.realpath(self.cache_root) + self.chain_dir = chain_dir_for(self.cache_root) + self.cache_dir = os.path.join(self.chain_dir, 'cache') + self.block_src_path = os.path.join(self.cache_dir, 'block', 'src') + self.block_src_dir = HexDir(self.block_src_path, 32, levels=2) + self.block_num_path = os.path.join(self.cache_dir, 'block', 'num') + self.block_num_dir = NumDir(self.block_num_path, [100000, 1000]) + self.block_hash_path = os.path.join(self.cache_dir, 'block', 'hash') + self.block_hash_dir = HexDir(self.block_hash_path, 32, levels=2) + self.tx_path = os.path.join(self.cache_dir, 'tx', 'src') + self.tx_raw_path = os.path.join(self.cache_dir, 'tx', 'raw') + self.tx_dir = HexDir(self.tx_path, 32, levels=2) + self.tx_raw_dir = HexDir(self.tx_raw_path, 32, levels=2) + self.address_path = os.path.join(self.cache_dir, 'address') + self.address_dir = HexDir(self.address_path, 20, levels=2) + self.chain_spec = chain_spec