# eth-monitor/eth_monitor/runnable/sync.py
# (mirror of git://holbrook.no/eth-monitor.git, synced 2025-01-11 13:57:32 +01:00)

# standard imports
import sys
import signal
import argparse
import logging
import os
import importlib
import uuid
import datetime

# external imports
import confini
from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import block_latest
from chainlib.jsonrpc import IntSequenceGenerator
from hexathon import (
        strip_0x,
        add_0x,
        )
#from chainsyncer.store.fs import SyncFsStore
from chainsyncer.driver.chain_interface import ChainInterfaceDriver
from chainsyncer.error import SyncDone
from eth_cache.rpc import CacheRPC
from eth_cache.store.file import FileStore

# local imports
from eth_monitor.chain import EthChainInterface
from eth_monitor.filters.cache import Filter as CacheFilter
from eth_monitor.rules import (
        AddressRules,
        RuleSimple,
        RuleMethod,
        RuleData,
        )
from eth_monitor.filters import RuledFilter
from eth_monitor.filters.out import OutFilter
from eth_monitor.config import override, list_from_prefix

# Extra log level numerically below DEBUG (10); the state and filter change
# callbacks in this module log at this level.
logging.STATETRACE = 5
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# Provider url resolution order: RPC_PROVIDER, then ETH_PROVIDER, then the
# hard-coded localhost endpoint.
default_eth_provider = os.environ.get(
        'RPC_PROVIDER',
        os.environ.get('ETH_PROVIDER', 'http://localhost:8545'),
        )

# Directory holding this script, and the directory the process was started in.
script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())

# Configuration directory located relative to this script.
base_config_dir = os.path.join(script_dir, '..', 'data', 'config')

# Command line interface. The arguments are parsed at module level, so the
# resulting `args` namespace is visible to every function in this module.
argparser = argparse.ArgumentParser('master eth events monitor')

# connection and configuration
argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)')
argparser.add_argument('-c', type=str, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')

# sync range and mode
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--until', type=int, default=0, help='Terminate sync on this block')
argparser.add_argument('--head', action='store_true', help='Start at current block height (overrides --offset, assumes --keep-alive)')
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
argparser.add_argument('--keep-alive', action='store_true', dest='keep_alive', help='Continue to sync head after history sync complete')

# include / exclude rules given directly on the command line
argparser.add_argument('--input', default=[], action='append', type=str, help='Add input (recipient) addresses to includes list')
argparser.add_argument('--output', default=[], action='append', type=str, help='Add output (sender) addresses to includes list')
argparser.add_argument('--exec', default=[], action='append', type=str, help='Add exec (contract) addresses to includes list')
argparser.add_argument('--data', default=[], action='append', type=str, help='Add data prefix strings to include list')
argparser.add_argument('--data-in', default=[], action='append', dest='data_in', type=str, help='Add data contain strings to include list')
argparser.add_argument('--x-data', default=[], action='append', dest='xdata', type=str, help='Add data prefix string to exclude list')
argparser.add_argument('--x-data-in', default=[], action='append', dest='xdata_in', type=str, help='Add data contain string to exclude list')
argparser.add_argument('--address', default=[], action='append', type=str, help='Add addresses as input, output and exec to includes list')
argparser.add_argument('--x-input', default=[], action='append', type=str, dest='xinput', help='Add input (recipient) addresses to excludes list')
argparser.add_argument('--x-output', default=[], action='append', type=str, dest='xoutput', help='Add output (sender) addresses to excludes list')
argparser.add_argument('--x-exec', default=[], action='append', type=str, dest='xexec', help='Add exec (contract) addresses to excludes list')
argparser.add_argument('--x-address', default=[], action='append', type=str, dest='xaddress', help='Add addresses as input, output and exec to excludes list')

# include / exclude rules loaded from files, and storage defaults
argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file')
argparser.add_argument('--include-default', dest='include_default', action='store_true', help='Include all transactions by default')
argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
# The option was previously only reachable as --address-file, which does not
# match its dest or help text. --excludes-file mirrors --includes-file; the
# old spelling is kept as an alias so existing invocations keep working.
argparser.add_argument('--excludes-file', '--address-file', type=str, dest='excludes_file', help='Load exclude rules from file')

# dynamically loaded modules
argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output')
argparser.add_argument('--filter', type=str, action='append', help='Add python module filter path')

# cache, state and session handling
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states')
argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
argparser.add_argument('--backend', type=str, help='State store backend')
argparser.add_argument('--list-backends', dest='list_backends', action='store_true', help='List built-in store backends')

# verbosity
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose')

args = argparser.parse_args(sys.argv[1:])

# --list-backends: print the names of the built-in state store backends, one
# per line, and exit successfully without doing anything else.
if args.list_backends:
    print('fs')
    print('rocksdb')
    sys.exit(0)

# Verbosity. -vvv puts the root logger at STATETRACE and leaves the library
# loggers untouched; otherwise the library loggers listed below are capped at
# WARNING and -vv / -v only raise the root logger level.
if args.vvv:
    logg.setLevel(logging.STATETRACE)
else:
    for _quiet_logger_name in (
            'chainlib.connection',
            'chainlib.eth.tx',
            'chainsyncer.driver.history',
            'chainsyncer.driver.head',
            'chainsyncer.backend.file',
            'chainsyncer.backend.sql',
            'chainsyncer.filter',
            ):
        logging.getLogger(_quiet_logger_name).setLevel(logging.WARNING)

    if args.vv:
        logg.setLevel(logging.DEBUG)
    elif args.v:
        logg.setLevel(logging.INFO)

# Load the base configuration shipped with the package; the -c argument is
# handed to confini as override_dirs.
config_dir = args.c
config = confini.Config(
        base_config_dir,
        os.environ.get('CONFINI_ENV_PREFIX'),
        override_dirs=args.c,
        )
config.process()

# Values given on the command line are applied on top of the loaded config.
args_override = {
        'CHAIN_SPEC': args.i,
        'SYNCER_BACKEND': args.backend,
        }
config.dict_override(args_override, 'cli')

# Runtime-only settings (underscore-prefixed keys) taken from the arguments.
for _value, _key in (
        (args.offset, '_SYNC_OFFSET'),
        (args.skip_history, '_NO_HISTORY'),
        (args.single, '_SINGLE'),
        (args.head, '_HEAD'),
        (args.keep_alive, '_KEEP_ALIVE'),
        (os.path.realpath(args.state_dir), '_STATE_DIR'),
        (args.cache_dir, '_CACHE_DIR'),
        (args.session_id, '_SESSION_ID'),
        ):
    config.add(_value, _key, True)

override(config, 'renderer', env=os.environ, args=args)
override(config, 'filter', env=os.environ, args=args)

# Without an explicit session id, a --single run gets a fresh random id and
# every other run uses the id 'default'.
if config.get('_SESSION_ID') is None:
    _session_id = str(uuid.uuid4()) if config.get('_SINGLE') else 'default'
    config.add(_session_id, '_SESSION_ID', True)

logg.debug('loaded config:\n{}'.format(config))

# Prefer the chain spec given on the command line; when -i is absent fall back
# to the processed configuration instead of handing None to the parser.
chain_spec_str = args.i
if chain_spec_str is None:
    chain_spec_str = config.get('CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_spec_str)

# --seq selects sequential integer json-rpc ids.
# NOTE(review): nothing visible in this file consumes rpc_id_generator yet;
# request builders such as block_latest() would need to receive it.
rpc_id_generator = None
if args.seq:
    rpc_id_generator = IntSequenceGenerator()

# Optional HTTP basic authentication, configured through the environment.
auth = None
if os.environ.get('RPC_AUTHENTICATION') == 'basic':
    from chainlib.auth import BasicAuth
    auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD'])

# Pass the credentials on to the connection; previously they were built and
# then silently dropped. With auth=None this matches the previous call.
rpc = EthHTTPConnection(args.p, auth=auth)

def setup_address_arg_rules(rules, args):
    """Register address rules taken from the command line arguments.

    One include rule is built from --input / --output / --exec and one
    exclude rule from --x-input / --x-output / --x-exec. Addresses given
    with --address (or --x-address) are added to all three lists of the
    respective rule.

    The argument lists are copied rather than appended to, so the parsed
    argument namespace is left unmodified and calling this function more
    than once does not duplicate addresses.

    :param rules: Rule collection exposing include() and exclude()
    :param args: Parsed command line arguments
    :returns: The rules object passed in
    """
    include_all = list(args.address)
    include_inputs = list(args.input) + include_all
    include_outputs = list(args.output) + include_all
    include_exec = list(args.exec) + include_all

    exclude_all = list(args.xaddress)
    exclude_inputs = list(args.xinput) + exclude_all
    exclude_outputs = list(args.xoutput) + exclude_all
    exclude_exec = list(args.xexec) + exclude_all

    # RuleSimple takes outputs (senders) first, then inputs (recipients).
    includes = RuleSimple(include_outputs, include_inputs, include_exec, description='INCLUDE')
    rules.include(includes)

    excludes = RuleSimple(exclude_outputs, exclude_inputs, exclude_exec, description='EXCLUDE')
    rules.exclude(excludes)

    return rules

def setup_data_arg_rules(rules, args):
    """Register data matching rules taken from the command line arguments.

    --data / --x-data values become RuleMethod include / exclude rules and
    --data-in / --x-data-in values become RuleData include / exclude rules.
    All values are lowercased before being handed to the rule objects.

    :param rules: Rule collection exposing include() and exclude()
    :param args: Parsed command line arguments
    :returns: The rules object passed in
    """
    # rules from --data and --x-data
    method_includes = [item.lower() for item in args.data]
    method_excludes = [item.lower() for item in args.xdata]
    rules.include(RuleMethod(method_includes, description='INCLUDE'))
    rules.exclude(RuleMethod(method_excludes, description='EXCLUDE'))

    # rules from --data-in and --x-data-in
    data_includes = [item.lower() for item in args.data_in]
    data_excludes = [item.lower() for item in args.xdata_in]
    rules.include(RuleData(data_includes, description='INCLUDE'))
    rules.exclude(RuleData(data_excludes, description='EXCLUDE'))

    return rules

def _parse_address_rule_line(line):
    """Split one rule file line into (sender, recipient, executable) lists.

    A line holds up to three tab-separated columns, each a comma-separated
    list of addresses. Missing or empty columns yield an empty list.
    """
    columns = line.rstrip().split("\t")
    parsed = []
    for i in range(3):
        field = columns[i] if i < len(columns) else ''
        parsed.append(field.split(',') if field != '' else [])
    return tuple(parsed)


def _load_address_rule_file(path):
    """Read a rule file and return one parsed tuple per line."""
    with open(path, 'r') as f:
        return [_parse_address_rule_line(line) for line in f]


def setup_address_file_rules(rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
    """Register address rules read from include and exclude files.

    Every line of a file produces one RuleSimple. Lines are tab-separated
    into sender, recipient and executable columns, each of which is a
    comma-separated address list.

    The exclude branch now reads excludes_file (it used to re-open
    includes_file), splits columns on commas (it used to strip them), and
    tolerates short lines in the same way as the include branch. Files are
    closed after reading.

    :param rules: Rule collection exposing include() and exclude()
    :param includes_file: Path of file with include rules, or None
    :param excludes_file: Path of file with exclude rules, or None
    :param include_default: Accepted for interface compatibility; not used here
    :param include_block_default: Accepted for interface compatibility; not used here
    :returns: The rules object passed in
    """
    if includes_file is not None:
        logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file)))
        for (sender, recipient, executable) in _load_address_rule_file(includes_file):
            rules.include(RuleSimple(sender, recipient, executable))

    if excludes_file is not None:
        logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file)))
        for (sender, recipient, executable) in _load_address_rule_file(excludes_file):
            rules.exclude(RuleSimple(sender, recipient, executable))

    return rules

def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data):
    """Create the cache store and initialize RuledFilter with it.

    Without a cache directory a NullStore is used and a warning is logged;
    otherwise a FileStore is created for the chain spec and directory.

    The previous version also resolved cache_dir to a real path and had a
    temporary-directory fallback after the store was already created;
    neither result was ever used, so both have been removed.

    :param chain_spec: Chain spec passed to the file store
    :param cache_dir: Directory for the file store, or None for no caching
    :param include_tx_data: Forwarded to RuledFilter.init
    :param include_block_data: Forwarded to RuledFilter.init
    :returns: The store instance that was created
    """
    if cache_dir is None:
        logg.warning('no cache dir specified, will discard everything!!')
        from eth_cache.store.null import NullStore
        store = NullStore()
    else:
        store = FileStore(chain_spec, cache_dir)

    logg.info('using chain spec {} and store {}'.format(chain_spec, store))
    RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data)

    return store

def setup_cache_filter(rules_filter=None):
    """Instantiate the cache filter with the given rules filter.

    :param rules_filter: Passed to CacheFilter as rules_filter
    :returns: The new CacheFilter instance
    """
    cache_filter = CacheFilter(rules_filter=rules_filter)
    return cache_filter

def pre_callback():
    """Log, at debug level, that a sync loop iteration is starting."""
    message = 'starting sync loop iteration'
    logg.debug(message)

def post_callback():
    """Log, at debug level, that a sync loop iteration has ended."""
    message = 'ending sync loop iteration'
    logg.debug(message)

def block_callback(block, tx):
    """Log the block being processed together with its timestamp.

    :param block: Block object; its timestamp attribute is rendered as a datetime
    :param tx: Part of the callback signature; not used here
    """
    block_time = datetime.datetime.fromtimestamp(block.timestamp)
    logg.info('processing {} {}'.format(block, block_time))

def state_change_callback(k, old_state, new_state):
    """Log a sync state transition at the STATETRACE level.

    :param k: Identifier of the item whose state changed
    :param old_state: State before the change
    :param new_state: State after the change
    """
    level = logging.STATETRACE
    message = 'state change: {} {} -> {}'.format(k, old_state, new_state)
    logg.log(level, message)

def filter_change_callback(k, old_state, new_state):
    """Log a filter state transition at the STATETRACE level.

    :param k: Identifier of the item whose filter state changed
    :param old_state: State before the change
    :param new_state: State after the change
    """
    level = logging.STATETRACE
    message = 'filter change: {} {} -> {}'.format(k, old_state, new_state)
    logg.log(level, message)

def main():
    """Resolve the sync range, assemble rules and filters, and run the syncer.

    Uses the module-level args, config, rpc and chain_spec objects.

    Changes from the previous version: unused locals (keep_alive, i and the
    return value of drv.run) are gone, the built-in backends are resolved
    through a lookup table, and a missing backend setting is reported with
    an explicit error.

    :raises ValueError: If --until is not later than the start offset, or
        if no syncer backend is configured
    """
    o = block_latest()
    r = rpc.do(o)
    # One past the latest block number reported by the node.
    block_offset = int(strip_0x(r), 16) + 1
    logg.info('network block height is {}'.format(block_offset))

    # Resolve where the sync starts (session_block_offset) and where it
    # stops (block_limit).
    # NOTE(review): a block_limit of -1 presumably means "no target" to
    # ChainInterfaceDriver - confirm against chainsyncer.
    block_limit = 0
    if args.head:
        session_block_offset = block_offset
        block_limit = -1
    else:
        session_block_offset = args.offset

    if args.until > 0:
        if not args.head and args.until <= session_block_offset:
            raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, args.until))
        block_limit = args.until
    elif config.true('_KEEP_ALIVE'):
        block_limit = -1

    if session_block_offset == -1:
        session_block_offset = block_offset
    elif not config.true('_KEEP_ALIVE'):
        # No explicit limit and no keep-alive: stop at the current height.
        if block_limit == 0:
            block_limit = block_offset

    # Rules are accumulated from data arguments, rule files and address
    # arguments, in that order.
    address_rules = AddressRules(include_by_default=args.include_default)
    address_rules = setup_data_arg_rules(
            address_rules,
            args,
            )
    address_rules = setup_address_file_rules(
            address_rules,
            includes_file=args.includes_file,
            excludes_file=args.excludes_file,
            )
    address_rules = setup_address_arg_rules(
            address_rules,
            args,
            )

    store = setup_filter(
            chain_spec,
            config.get('_CACHE_DIR'),
            bool(args.store_tx_data),
            bool(args.store_block_data),
            )

    # Filter order: cache filter first, then dynamically loaded filter
    # modules, then the output filter last.
    cache_filter = setup_cache_filter(
            rules_filter=address_rules,
            )
    filters = [
            cache_filter,
            ]

    for fltr in list_from_prefix(config, 'filter'):
        m = importlib.import_module(fltr)
        fltr_object = m.Filter(rules_filter=address_rules)
        filters.append(fltr_object)
        logg.info('using filter module {}'.format(fltr))

    renderers_mods = []
    for renderer in list_from_prefix(config, 'renderer'):
        m = importlib.import_module(renderer)
        renderers_mods.append(m)
        logg.info('using renderer module {}'.format(renderer))

    chain_interface = EthChainInterface()

    out_filter = OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods)
    filters.append(out_filter)

    # Built-in backends map to a known module and class; any other value is
    # treated as a module path that must provide a class named SyncStore.
    builtin_backends = {
            'fs': ('chainsyncer.store.fs', 'SyncFsStore'),
            'rocksdb': ('chainsyncer.store.rocksdb', 'SyncRocksDbStore'),
            }
    backend = config.get('SYNCER_BACKEND')
    if backend is None:
        raise ValueError('no syncer backend configured; set SYNCER_BACKEND or use --backend')
    (store_module_name, store_class_name) = builtin_backends.get(backend, (backend, 'SyncStore'))
    syncer_store_module = importlib.import_module(store_module_name)
    syncer_store_class = getattr(syncer_store_module, store_class_name)
    logg.info('using engine {} module {}.{}'.format(backend, syncer_store_module.__file__, syncer_store_class.__name__))

    # Sync state is kept in a per-backend subdirectory of the state dir.
    state_dir = os.path.join(config.get('_STATE_DIR'), backend)
    sync_store = syncer_store_class(state_dir, session_id=config.get('_SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
    logg.info('session is {}'.format(sync_store.session_id))

    for fltr in filters:
        sync_store.register(fltr)
    drv = ChainInterfaceDriver(sync_store, chain_interface, offset=session_block_offset, target=block_limit, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_callback)

    # Unless --fresh is given, wrap the connection in CacheRPC with the
    # cache store.
    use_rpc = rpc
    if not args.fresh:
        use_rpc = CacheRPC(rpc, store)

    try:
        drv.run(use_rpc)
    except SyncDone as e:
        sys.stderr.write("sync {} done at block {}\n".format(drv, e))

# Run the sync only when executed as a script, not when imported.
if __name__ == '__main__':
    main()