2021-06-26 14:04:34 +02:00
# standard imports
import sys
import signal
import argparse
import confini
import logging
import os
2022-01-30 20:44:03 +01:00
import importlib
2021-06-26 14:04:34 +02:00
# external imports
from chainlib . chain import ChainSpec
from chainlib . eth . connection import EthHTTPConnection
from chainlib . eth . block import block_latest
from hexathon import (
strip_0x ,
add_0x ,
)
from chainsyncer . driver . head import HeadSyncer
from chainsyncer . driver . history import HistorySyncer
from chainsyncer . backend . file import FileBackend
from chainsyncer . filter import NoopFilter
2022-03-01 13:10:35 +01:00
from eth_cache . rpc import CacheRPC
from eth_cache . store . file import FileStore
2021-06-26 14:04:34 +02:00
# local imports
from eth_monitor . chain import EthChainInterface
2022-01-23 23:07:59 +01:00
from eth_monitor . filters . cache import Filter as CacheFilter
2022-02-27 12:48:15 +01:00
from eth_monitor . rules import (
AddressRules ,
RuleSimple ,
)
2022-01-23 18:49:04 +01:00
from eth_monitor . filters import RuledFilter
2022-01-30 15:43:53 +01:00
from eth_monitor . filters . out import OutFilter
2021-06-26 14:04:34 +02:00
# Module-level bootstrap: logging, command line parsing, configuration and
# RPC connection. The functions defined further down read `args`, `config`,
# `rpc`, `chain_spec` and `state_dir` as module globals, so all of this runs
# at import time.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# RPC_PROVIDER takes precedence over ETH_PROVIDER; fall back to a local node.
default_eth_provider = os.environ.get('RPC_PROVIDER')
if default_eth_provider is None:
    default_eth_provider = os.environ.get('ETH_PROVIDER', 'http://localhost:8545')

script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())
base_config_dir = os.path.join(script_dir, '..', 'data', 'config')

argparser = argparse.ArgumentParser('master eth events monitor')
argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)')
argparser.add_argument('-c', type=str, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--until', type=int, default=0, help='Terminate sync on this block')
argparser.add_argument('--head', action='store_true', help='Start at current block height (overrides --offset, assumes --keep-alive)')
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
argparser.add_argument('--keep-alive', action='store_true', dest='keep_alive', help='Continue to sync head after history sync complete')
argparser.add_argument('--input', default=[], action='append', type=str, help='Add input (recipient) addresses to includes list')
argparser.add_argument('--output', default=[], action='append', type=str, help='Add output (sender) addresses to includes list')
argparser.add_argument('--exec', default=[], action='append', type=str, help='Add exec (contract) addresses to includes list')
argparser.add_argument('--address', default=[], action='append', type=str, help='Add addresses as input, output and exec to includes list')
argparser.add_argument('--x-input', default=[], action='append', type=str, dest='xinput', help='Add input (recipient) addresses to excludes list')
argparser.add_argument('--x-output', default=[], action='append', type=str, dest='xoutput', help='Add output (sender) addresses to excludes list')
argparser.add_argument('--x-exec', default=[], action='append', type=str, dest='xexec', help='Add exec (contract) addresses to excludes list')
argparser.add_argument('--x-address', default=[], action='append', type=str, dest='xaddress', help='Add addresses as input, output and exec to excludes list')
argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file')
argparser.add_argument('--include-default', dest='include_default', action='store_true', help='Include all transactions by default')
argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
# `--excludes-file` mirrors `--includes-file`; the historical `--address-file`
# spelling is kept as an alias so existing invocations keep working.
argparser.add_argument('--excludes-file', '--address-file', type=str, dest='excludes_file', help='Load exclude rules from file')
argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output')
argparser.add_argument('--filter', type=str, action='append', help='Add python module filter path')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose')
args = argparser.parse_args(sys.argv[1:])

# Verbosity: -vvv enables DEBUG everywhere; otherwise the chatty library
# loggers are pinned to WARNING and -vv / -v only raise the root logger level.
if args.vvv:
    logg.setLevel(logging.DEBUG)
else:
    logging.getLogger('chainlib.connection').setLevel(logging.WARNING)
    logging.getLogger('chainlib.eth.tx').setLevel(logging.WARNING)
    logging.getLogger('chainsyncer.driver.history').setLevel(logging.WARNING)
    logging.getLogger('chainsyncer.driver.head').setLevel(logging.WARNING)
    logging.getLogger('chainsyncer.backend.file').setLevel(logging.WARNING)
    logging.getLogger('chainsyncer.backend.sql').setLevel(logging.WARNING)
    logging.getLogger('chainsyncer.filter').setLevel(logging.WARNING)
    if args.vv:
        logg.setLevel(logging.DEBUG)
    elif args.v:
        logg.setLevel(logging.INFO)

config_dir = args.c
config = confini.Config(base_config_dir, os.environ.get('CONFINI_ENV_PREFIX'), override_dirs=args.c)
config.process()
args_override = {
    'CHAIN_SPEC': getattr(args, 'i'),
}
config.dict_override(args_override, 'cli')
config.add(args.offset, '_SYNC_OFFSET', True)
config.add(args.skip_history, '_NO_HISTORY', True)
config.add(args.single, '_SINGLE', True)
config.add(args.head, '_HEAD', True)
# main() reads '_KEEP_ALIVE' from the config, so the CLI flag has to be
# registered here for --keep-alive to have any effect.
config.add(args.keep_alive, '_KEEP_ALIVE', True)
logg.debug('config loaded:\n{}'.format(config))

# Prefer the command line value; fall back to the configured chain spec when
# -i was not given instead of handing None to the parser.
chain_spec_str = args.i
if chain_spec_str is None:
    chain_spec_str = config.get('CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_spec_str)

state_dir = os.path.join(exec_dir, 'state')

rpc_id_generator = None
if args.seq:
    # Imported here because the name is only needed for --seq.
    from chainlib.jsonrpc import IntSequenceGenerator
    rpc_id_generator = IntSequenceGenerator()

auth = None
if os.environ.get('RPC_AUTHENTICATION') == 'basic':
    from chainlib.auth import BasicAuth
    auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD'])

# NOTE(review): `rpc_id_generator` and `auth` are prepared above but are not
# passed to the connection - confirm whether EthHTTPConnection should get them.
rpc = EthHTTPConnection(args.p)
2022-02-27 12:48:15 +01:00
def setup_address_arg_rules(rules, args):
    """Register include and exclude rules built from the address arguments.

    Values of ``args.address`` are added to the input, output and exec
    include lists; values of ``args.xaddress`` are added to the
    corresponding exclude lists.

    :param rules: Rule collection exposing ``include()`` and ``exclude()``.
    :param args: Parsed arguments providing the list attributes ``input``,
        ``output``, ``exec``, ``address``, ``xinput``, ``xoutput``,
        ``xexec`` and ``xaddress``.
    :returns: The ``rules`` object that was passed in.
    """
    # Build new lists rather than appending to the ones on ``args``: the
    # argument lists (and, when an option was not given, the parser's default
    # list object) would otherwise be modified as a side effect.
    common_includes = list(args.address)
    common_excludes = list(args.xaddress)

    include_inputs = list(args.input) + common_includes
    include_outputs = list(args.output) + common_includes
    include_exec = list(args.exec) + common_includes

    exclude_inputs = list(args.xinput) + common_excludes
    exclude_outputs = list(args.xoutput) + common_excludes
    exclude_exec = list(args.xexec) + common_excludes

    # RuleSimple takes its lists in the order outputs, inputs, executables.
    rules.include(RuleSimple(include_outputs, include_inputs, include_exec))
    rules.exclude(RuleSimple(exclude_outputs, exclude_inputs, exclude_exec))

    return rules
def _split_rule_column(columns, index):
    """Return the comma separated values of column ``index``, or ``[]`` when
    the column is missing or empty."""
    try:
        column = columns[index]
    except IndexError:
        return []
    if column == '':
        return []
    return column.split(',')


def _read_rules_file(rules_file, label):
    """Yield one ``RuleSimple`` per line of ``rules_file``.

    Each line holds up to three tab separated columns (sender, recipient,
    executable), each being a comma separated list of values. ``label`` is
    only used in the debug log message.
    """
    with open(rules_file, 'r') as f:
        logg.debug('reading {} rules from {}'.format(label, os.path.realpath(rules_file)))
        for line in f:
            columns = line.rstrip().split("\t")
            sender = _split_rule_column(columns, 0)
            recipient = _split_rule_column(columns, 1)
            executable = _split_rule_column(columns, 2)
            yield RuleSimple(sender, recipient, executable)


def setup_address_file_rules(rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
    """Register include and exclude rules read from rule files.

    Both files share one format: one rule per line, with tab separated
    sender, recipient and executable columns, each a comma separated list.
    Missing or empty columns become empty lists.

    :param rules: Rule collection exposing ``include()`` and ``exclude()``.
    :param includes_file: Path of the include rules file, or None to skip.
    :param excludes_file: Path of the exclude rules file, or None to skip.
    :param include_default: Not used by this function.
    :param include_block_default: Not used by this function.
    :returns: The ``rules`` object that was passed in.
    """
    if includes_file is not None:
        for rule in _read_rules_file(includes_file, 'includes'):
            rules.include(rule)

    if excludes_file is not None:
        # Read the excludes file itself (not the includes file) and parse it
        # exactly like the includes file.
        for rule in _read_rules_file(excludes_file, 'excludes'):
            rules.exclude(rule)

    return rules
2022-01-24 01:17:27 +01:00
def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data):
    """Create the data store and initialise ``RuledFilter`` with it.

    :param chain_spec: Chain spec passed on to the file store.
    :param cache_dir: Directory for the file store. When None, a
        ``NullStore`` is used instead and a warning is logged.
    :param include_tx_data: Forwarded to ``RuledFilter.init``.
    :param include_block_data: Forwarded to ``RuledFilter.init``.
    :returns: The store object that was created.
    """
    if cache_dir is None:
        logg.warning('no cache dir specified, will discard everything!!')
        from eth_monitor.store.null import NullStore
        store = NullStore()
    else:
        # The store receives the path exactly as given by the caller.
        store = FileStore(chain_spec, cache_dir)

    logg.info('using chain spec {} and store {}'.format(chain_spec, store))
    RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data)

    return store
2022-01-23 18:49:04 +01:00
def setup_cache_filter(rules_filter=None):
    """Instantiate the cache filter.

    :param rules_filter: Passed to ``CacheFilter`` as ``rules_filter``.
    :returns: The new ``CacheFilter`` instance.
    """
    cache_filter = CacheFilter(rules_filter=rules_filter)
    return cache_filter
2021-06-26 14:04:34 +02:00
2021-06-27 11:01:31 +02:00
2022-01-30 20:44:03 +01:00
def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
    """Build syncers that resume stored sync sessions, followed by a head syncer.

    One ``HistorySyncer`` is created per backend returned by
    ``FileBackend.resume``. When there is none, a single initial history
    backend is created instead. A ``HeadSyncer`` on a live backend starting
    at ``block_offset + 1`` is always appended last.

    :param chain_spec: Chain spec passed to the file backends.
    :param block_offset: Block height used for resuming and for the live backend.
    :param block_limit: Not used by this function.
    :param state_dir: Base directory for backend state.
    :param callback: Block callback handed to every syncer.
    :param chain_interface: Chain interface handed to every syncer.
    :param sync_offset: Not used by this function.
    :param skip_history: If True, the initial history backend starts at
        ``block_offset`` and targets ``block_offset + 1``.
    :returns: List of syncers, history syncers first, head syncer last.
    """
    backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)

    if len(backends) > 0:
        for backend in backends:
            logg.info('resuming sync session {}'.format(backend))
    else:
        history_start = block_offset - 1
        # NOTE(review): the start height is read from the module-level config
        # rather than from the `sync_offset` parameter - confirm this is intended.
        configured_start = config.get('_SYNC_OFFSET')
        if configured_start is not None:
            history_start = configured_start

        history_target = block_offset
        if skip_history:
            history_start = block_offset
            history_target += 1

        initial_backend = FileBackend.initial(chain_spec, history_target, start_block_height=history_start, base_dir=state_dir)
        backends.append(initial_backend)
        logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(history_start, history_target))

    syncers = [
        HistorySyncer(backend, chain_interface, block_callback=callback)
        for backend in backends
    ]

    live_backend = FileBackend.live(chain_spec, block_offset + 1, base_dir=state_dir)
    syncers.append(HeadSyncer(live_backend, chain_interface, block_callback=callback))

    return syncers
2022-01-30 15:43:53 +01:00
def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
    """Build a single history syncer from ``sync_offset`` towards ``block_limit``.

    :param chain_spec: Chain spec passed to the file backend.
    :param block_offset: Not used by this function.
    :param block_limit: Target block passed to ``FileBackend.initial``.
    :param state_dir: Base directory for backend state.
    :param callback: Block callback handed to the syncer.
    :param chain_interface: Chain interface handed to the syncer.
    :param sync_offset: Start block height of the backend.
    :param skip_history: Not used by this function.
    :returns: List containing the one ``HistorySyncer``.
    """
    logg.debug('block limit {}'.format(block_limit))
    backend = FileBackend.initial(
        chain_spec,
        block_limit,
        start_block_height=sync_offset,
        base_dir=state_dir,
    )
    return [HistorySyncer(backend, chain_interface, block_callback=callback)]
2022-01-30 15:43:53 +01:00
def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
    """Build a single head syncer on a live backend at ``block_offset``.

    :param chain_spec: Chain spec passed to the file backend.
    :param block_offset: Block height passed to ``FileBackend.live``.
    :param block_limit: Not used by this function.
    :param state_dir: Base directory for backend state.
    :param callback: Block callback handed to the syncer.
    :param chain_interface: Chain interface handed to the syncer.
    :param sync_offset: Not used by this function.
    :param skip_history: Not used by this function.
    :returns: List containing the one ``HeadSyncer``.
    """
    live_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir)
    head_syncer = HeadSyncer(live_backend, chain_interface, block_callback=callback)
    return [head_syncer]
2022-01-30 20:44:03 +01:00
def main():
    """Set up rules, store, filters and syncers, then run each syncer in turn.

    Reads the module-level ``args``, ``config``, ``rpc``, ``chain_spec`` and
    ``state_dir``.

    :raises ValueError: If ``--until`` is set, ``--head`` is not, and the
        termination block is not later than the requested start offset.
    """
    # -1 marks "start at head"; it is resolved once the network height is known.
    if args.head:
        session_block_offset = -1
    else:
        session_block_offset = args.offset

    block_limit = 0
    if args.until > 0:
        # Validate against the requested start offset. The network height
        # (`block_offset`) has not been fetched yet at this point.
        if not args.head and args.until <= session_block_offset:
            raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, args.until))
        block_limit = args.until

    o = block_latest()
    r = rpc.do(o)
    block_offset = int(strip_0x(r), 16) + 1
    logg.info('network block height is {}'.format(block_offset))

    if session_block_offset == -1:
        session_block_offset = block_offset
    elif not config.true('_KEEP_ALIVE'):
        # Without keep-alive and without an explicit limit, stop at the
        # height the network had when the sync started.
        if block_limit == 0:
            block_limit = block_offset

    address_rules = AddressRules(include_by_default=args.include_default)
    address_rules = setup_address_file_rules(
        address_rules,
        includes_file=args.includes_file,
        excludes_file=args.excludes_file,
    )
    address_rules = setup_address_arg_rules(
        address_rules,
        args,
    )

    store = setup_filter(
        chain_spec,
        args.cache_dir,
        bool(args.store_tx_data),
        bool(args.store_block_data),
    )

    cache_filter = setup_cache_filter(
        rules_filter=address_rules,
    )

    filters = [
        cache_filter,
    ]

    # Each --filter value is a module path exposing a `Filter` class.
    if args.filter is not None:
        for fltr in args.filter:
            m = importlib.import_module(fltr)
            fltr_object = m.Filter(rules_filter=address_rules)
            filters.append(fltr_object)

    if config.true('_HEAD'):
        syncer_setup_func = setup_backend_head
    elif config.true('_SINGLE'):
        syncer_setup_func = setup_backend_single
    else:
        syncer_setup_func = setup_backend_resume

    chain_interface = EthChainInterface()
    syncers = syncer_setup_func(
        chain_spec,
        block_offset,
        block_limit,
        state_dir,
        cache_filter.block_callback,
        chain_interface,
        sync_offset=config.get('_SYNC_OFFSET'),
        skip_history=config.true('_NO_HISTORY'),
    )

    renderers_mods = [importlib.import_module(renderer) for renderer in args.renderer]

    # The output filter is appended last, after the cache and custom filters.
    out_filter = OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods)
    filters.append(out_filter)

    # Unless --fresh is given, wrap the connection so the store is used as a cache.
    use_rpc = rpc
    if not args.fresh:
        use_rpc = CacheRPC(rpc, store)

    for i, syncer in enumerate(syncers):
        logg.info('running syncer index {} {}'.format(i, str(syncer)))
        for f in filters:
            syncer.add_filter(f)
        r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), use_rpc)
        sys.stderr.write("sync {} done at block {}\n".format(syncer, r))


if __name__ == '__main__':
    main()