From 4b372481d45adaa94d10eec2590f299f2a90cfc8 Mon Sep 17 00:00:00 2001 From: lash Date: Sun, 23 Jan 2022 22:07:59 +0000 Subject: [PATCH] Add etherscan importer --- eth_monitor/filters/base.py | 39 ++++++++--- eth_monitor/filters/cache.py | 20 +++++- eth_monitor/importers/etherscan.py | 57 ++++++++++++++++ eth_monitor/runnable/import.py | 105 +++++++++++++++++++++++++++++ eth_monitor/runnable/sync.py | 2 +- requirements.txt | 3 +- 6 files changed, 212 insertions(+), 14 deletions(-) create mode 100644 eth_monitor/importers/etherscan.py create mode 100644 eth_monitor/runnable/import.py diff --git a/eth_monitor/filters/base.py b/eth_monitor/filters/base.py index 6821219..98cf41a 100644 --- a/eth_monitor/filters/base.py +++ b/eth_monitor/filters/base.py @@ -1,6 +1,7 @@ # standard imports import os import logging +import json # external imports from chainsyncer.backend.file import chain_dir_for @@ -15,15 +16,20 @@ base_dir = '/var/lib' class RuledFilter: - cache_root = os.path.join(base_dir, 'eth_monitor') + cache_root = None chain_dir = None cache_dir = None block_num_path = None - block_num_dir = None + block_src_path = None block_hash_path = None + block_num_dir = None + block_src_dir = None block_hash_dir = None + address_path = None + address_dir = None tx_path = None tx_dir = None + tx_raw_dir = None def __init__(self, rules_filter=None): @@ -33,25 +39,42 @@ class RuledFilter: @staticmethod - def init(chain_spec, cache_root=None, rules_filter=None): - if cache_root != None: - RuledFilter.cache_root = os.path.join(cache_root, 'eth_monitor') + def init(chain_spec, cache_root=base_dir, rules_filter=None, include_block_data=False, include_tx_data=False): + RuledFilter.cache_root = os.path.join( + cache_root, + 'eth_monitor', + chain_spec.engine(), + chain_spec.fork(), + str(chain_spec.chain_id()), + ) RuledFilter.chain_dir = chain_dir_for(RuledFilter.cache_root) RuledFilter.cache_dir = os.path.join(RuledFilter.chain_dir, 'cache') + RuledFilter.block_src_path = os.path.join(RuledFilter.cache_dir, 'block', 'src') + RuledFilter.block_src_dir = NumDir(RuledFilter.block_src_path, [100000, 1000]) RuledFilter.block_num_path = os.path.join(RuledFilter.cache_dir, 'block', 'num') RuledFilter.block_num_dir = NumDir(RuledFilter.block_num_path, [100000, 1000]) RuledFilter.block_hash_path = os.path.join(RuledFilter.cache_dir, 'block', 'hash') RuledFilter.block_hash_dir = HexDir(RuledFilter.block_hash_path, 32, levels=2) - RuledFilter.tx_path = os.path.join(RuledFilter.cache_dir, 'tx') + RuledFilter.tx_path = os.path.join(RuledFilter.cache_dir, 'tx', 'src') + RuledFilter.tx_raw_path = os.path.join(RuledFilter.cache_dir, 'tx', 'raw') RuledFilter.tx_dir = HexDir(RuledFilter.tx_path, 32, levels=2) + RuledFilter.tx_raw_dir = HexDir(RuledFilter.tx_raw_path, 32, levels=2) + RuledFilter.address_path = os.path.join(RuledFilter.cache_dir, 'address') + RuledFilter.address_dir = HexDir(RuledFilter.address_path, 20, levels=2) + RuledFilter.chain_spec = chain_spec + RuledFilter.include_block_data = include_block_data + RuledFilter.include_tx_data = include_tx_data @classmethod def block_callback(cls, block, extra=None): - src = str(block.src()).encode('utf-8') hash_bytes = bytes.fromhex(strip_0x(block.hash)) - cls.block_hash_dir.add(hash_bytes, src) cls.block_num_dir.add(block.number, hash_bytes) + num_bytes = block.number.to_bytes(8, 'big') + cls.block_hash_dir.add(hash_bytes, num_bytes) + if cls.include_block_data: + src = json.dumps(block.src()).encode('utf-8') + cls.block_src_dir.add(hash_bytes, src) def filter(self, conn, block, tx, db_session=None): diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py index 214851b..a0e4603 100644 --- a/eth_monitor/filters/cache.py +++ b/eth_monitor/filters/cache.py @@ -1,9 +1,14 @@ # standard imports import os import logging +import json # external imports from hexathon import strip_0x +from chainlib.eth.tx import ( + Tx, + pack, + ) # local imports from eth_monitor.filters import RuledFilter @@ -11,8 +16,17 @@ from eth_monitor.filters import RuledFilter logg = logging.getLogger(__name__) -class CacheFilter(RuledFilter): +class Filter(RuledFilter): def ruled_filter(self, conn, block, tx, db_session=None): - src = str(tx.src()).encode('utf-8') - self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src) + raw = pack(tx.src(), self.chain_spec) + tx_hash_dirnormal = strip_0x(tx.hash).upper() + tx_hash_bytes = bytes.fromhex(tx_hash_dirnormal) + self.tx_raw_dir.add(tx_hash_bytes, raw) + address = bytes.fromhex(strip_0x(tx.inputs[0])) + self.address_dir.add_dir(tx_hash_dirnormal, address, b'') + address = bytes.fromhex(strip_0x(tx.outputs[0])) + self.address_dir.add_dir(tx_hash_dirnormal, address, b'') + if self.include_tx_data: + src = json.dumps(tx.src()).encode('utf-8') + self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src) diff --git a/eth_monitor/importers/etherscan.py b/eth_monitor/importers/etherscan.py new file mode 100644 index 0000000..21edf92 --- /dev/null +++ b/eth_monitor/importers/etherscan.py @@ -0,0 +1,57 @@ +# standard imports +import urllib.request +import json + +# external imports +from hexathon import add_0x +from chainlib.eth.block import ( + block_by_hash, + Block, + ) +from chainlib.eth.tx import ( + Tx, + receipt, + ) + + +class EtherscanImporter: + + def __init__(self, rpc, api_key, filters=[], block_callback=None): + self.api_key = api_key + self.filters = filters + self.rpc = rpc + self.block_callback = block_callback + + + def get(self, address): + #f = open('sample_import.json', 'r') + #o = json.load(f) + #f.close() + o = self.get_api(address) + + for v in o['result']: + o = block_by_hash(v['blockHash']) + r = self.rpc.do(o) + block = Block(r) + + if self.block_callback != None: + self.block_callback(block) + + tx_src = block.txs[int(v['transactionIndex'])] + + o = receipt(tx_src['hash']) + r = self.rpc.do(o) + + tx = Tx.from_src(tx_src, block=block, rcpt=r) + + for fltr in self.filters: + fltr.filter(self.rpc, block, tx) + + + def get_api(self, address): + a = add_0x(address) + req = urllib.request.Request(url='https://api.etherscan.io/api?module=account&action=txlist&address={}&tag=latest&api_key={}'.format(a, self.api_key)) + req.add_header('Content-Length', 0) + req.add_header('Accept', 'application/json') + r = urllib.request.urlopen(req) + return json.load(r) diff --git a/eth_monitor/runnable/import.py b/eth_monitor/runnable/import.py new file mode 100644 index 0000000..be75a2f --- /dev/null +++ b/eth_monitor/runnable/import.py @@ -0,0 +1,105 @@ +# standard imports +import argparse +import logging +import sys +import os + +# external imports +from chainlib.encode import TxHexNormalizer +from chainlib.eth.connection import EthHTTPConnection +from chainlib.chain import ChainSpec + +# local imports +from eth_monitor.filters.cache import Filter as CacheFilter +from eth_monitor.filters import RuledFilter + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +normalize_address = TxHexNormalizer().wallet_address + +argparser = argparse.ArgumentParser('master eth events monitor') +argparser.add_argument('--api-key-file', dest='api_key_file', type=str, help='File to read API key from') +argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data') +argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string') +argparser.add_argument('-f', '--address-file', dest='address_file', default=[], type=str, action='append', help='Add addresses from file') +argparser.add_argument('-a', '--address', default=[], type=str, action='append', help='Add address') +argparser.add_argument('-v', action='store_true', help='Be verbose') +argparser.add_argument('-vv', action='store_true', help='Be more verbose') +argparser.add_argument('-p', type=str, help='RPC provider') +args = argparser.parse_args(sys.argv[1:]) + +if args.vv: + logg.setLevel(logging.DEBUG) +elif args.v: + logg.setLevel(logging.INFO) + +api_key = os.environ.get('API_KEY') +if args.api_key_file != None: + f = open(args.api_key_file, 'r') + api_key = f.read() + f.close() + +rpc = EthHTTPConnection(args.p) + +chain_spec = ChainSpec.from_chain_str(args.i) + +def conn_socks_tor(host='127.0.0.1', port=9050): + import socks + import socket + + socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS4, host, port, True) + socket.socket = socks.socksocket + + +def collect_addresses(addresses=[], address_files=[]): + address_collection = [] + for a in addresses: + a = normalize_address(a) + if a in address_collection: + logg.debug('skipping duplicate address {}'.format(a)) + address_collection.append(a) + logg.info('added address {}'.format(a)) + + for fp in address_files: + logg.debug('processing file ' + fp) + f = open(fp, 'r') + while True: + a = f.readline() + if a == '': + break + a = a.rstrip() + a = normalize_address(a) + if a in address_collection: + logg.debug('skipping duplicate address {}'.format(a)) + address_collection.append(a) + logg.info('added address {}'.format(a)) + f.close() + + return address_collection + + +def setup_filter(chain_spec, cache_dir): + cache_dir = os.path.realpath(cache_dir) + if cache_dir == None: + import tempfile + cache_dir = tempfile.mkdtemp() + logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir)) + RuledFilter.init(chain_spec, cache_dir) + + +def main(): + conn_socks_tor() + addresses = collect_addresses(args.address, args.address_file) + + from eth_monitor.importers.etherscan import EtherscanImporter + + setup_filter(chain_spec, args.cache_dir) + filters = [CacheFilter()] + importer = EtherscanImporter(rpc, api_key, filters=filters, block_callback=RuledFilter.block_callback) + for a in addresses: + importer.get(a) + + +if __name__ == '__main__': + main() diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py index 433bf3f..8734c48 100644 --- a/eth_monitor/runnable/sync.py +++ b/eth_monitor/runnable/sync.py @@ -21,7 +21,7 @@ from chainsyncer.filter import NoopFilter # local imports from eth_monitor.chain import EthChainInterface -from eth_monitor.filters.cache import CacheFilter +from eth_monitor.filters.cache import Filter as CacheFilter from eth_monitor.rules import AddressRules from eth_monitor.filters import RuledFilter diff --git a/requirements.txt b/requirements.txt index cdf06b9..5f02bcc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ -chainlib-eth~=0.0.19 +chainlib-eth~=0.0.22 chainsyncer>=0.0.7a1, <=0.0.8 chainsyncer~=0.0.7 -#funga-eth>=0.5.1a1,<0.6.0 eth-erc20~=0.1.6 leveldir~=0.1.0