2021-06-26 14:04:34 +02:00
# standard imports
import sys
import signal
import argparse
import confini
import logging
import os
2022-01-30 20:44:03 +01:00
import importlib
2022-03-30 10:11:46 +02:00
import uuid
2022-03-31 19:32:25 +02:00
import datetime
2021-06-26 14:04:34 +02:00
# external imports
from chainlib . chain import ChainSpec
from chainlib . eth . connection import EthHTTPConnection
from chainlib . eth . block import block_latest
from hexathon import (
strip_0x ,
add_0x ,
)
2022-03-30 10:11:46 +02:00
from chainsyncer . store . fs import SyncFsStore
from chainsyncer . driver . chain_interface import ChainInterfaceDriver
from chainsyncer . error import SyncDone
2022-03-01 13:10:35 +01:00
from eth_cache . rpc import CacheRPC
from eth_cache . store . file import FileStore
2021-06-26 14:04:34 +02:00
# local imports
from eth_monitor . chain import EthChainInterface
2022-01-23 23:07:59 +01:00
from eth_monitor . filters . cache import Filter as CacheFilter
2022-02-27 12:48:15 +01:00
from eth_monitor . rules import (
AddressRules ,
RuleSimple ,
2022-04-05 13:44:15 +02:00
RuleMethod ,
2022-02-27 12:48:15 +01:00
)
2022-01-23 18:49:04 +01:00
from eth_monitor . filters import RuledFilter
2022-01-30 15:43:53 +01:00
from eth_monitor . filters . out import OutFilter
2022-03-06 20:42:01 +01:00
from eth_monitor . config import override , list_from_prefix
2021-06-26 14:04:34 +02:00
2022-04-02 13:29:12 +02:00
# Custom log level below DEBUG, used for sync state transition tracing (-vvv).
logging.STATETRACE = 5
# Register a name for the level so records print as "STATETRACE" rather than "Level 5".
logging.addLevelName(logging.STATETRACE, 'STATETRACE')

logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# RPC_PROVIDER takes precedence; ETH_PROVIDER is consulted only when it is unset.
default_eth_provider = os.environ.get('RPC_PROVIDER')
if default_eth_provider is None:
    default_eth_provider = os.environ.get('ETH_PROVIDER', 'http://localhost:8545')

script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())

# Base configuration directory located relative to this script.
base_config_dir = os.path.join(script_dir, '..', 'data', 'config')
2021-06-26 14:04:34 +02:00
argparser = argparse.ArgumentParser('master eth events monitor')

# Connection and chain selection.
argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)')
argparser.add_argument('-c', type=str, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')

# Sync range and mode.
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--until', type=int, default=0, help='Terminate sync on this block')
argparser.add_argument('--head', action='store_true', help='Start at current block height (overrides --offset, assumes --keep-alive)')
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
argparser.add_argument('--keep-alive', action='store_true', dest='keep_alive', help='Continue to sync head after history sync complete')

# Repeatable rule options. Each one collects its values into a list, which is
# empty when the option is not given. Tuple order is the order shown in --help.
for _flag, _dest, _help in (
    ('--input', 'input', 'Add input (recipient) addresses to includes list'),
    ('--output', 'output', 'Add output (sender) addresses to includes list'),
    ('--exec', 'exec', 'Add exec (contract) addresses to includes list'),
    ('--data', 'data', 'Add data strings to include list'),
    ('--x-data', 'xdata', 'Add data strings to exclude list'),
    ('--address', 'address', 'Add addresses as input, output and exec to includes list'),
    ('--x-input', 'xinput', 'Add input (recipient) addresses to excludes list'),
    ('--x-output', 'xoutput', 'Add output (sender) addresses to excludes list'),
    ('--x-exec', 'xexec', 'Add exec (contract) addresses to excludes list'),
    ('--x-address', 'xaddress', 'Add addresses as input, output and exec to excludes list'),
):
    argparser.add_argument(_flag, default=[], action='append', type=str, dest=_dest, help=_help)

# Rule files and data storage policy.
argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file')
argparser.add_argument('--include-default', dest='include_default', action='store_true', help='Include all transactions by default')
argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
# NOTE(review): despite its name, --address-file fills ``excludes_file``.
argparser.add_argument('--address-file', type=str, dest='excludes_file', help='Load exclude rules from file')

# Pluggable output renderers and filters.
argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output')
argparser.add_argument('--filter', type=str, action='append', help='Add python module filter path')

# Cache and sync state locations / sessions.
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states')
argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')

# Verbosity switches.
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose')

args = argparser.parse_args(sys.argv[1:])
2022-02-27 12:48:15 +01:00
if args.vvv:
    # -vvv: put the root logger at the custom STATETRACE level.
    logg.setLevel(logging.STATETRACE)
else:
    # Hold these library loggers at WARNING unless -vvv is given.
    for _quiet_logger in (
        'chainlib.connection',
        'chainlib.eth.tx',
        'chainsyncer.driver.history',
        'chainsyncer.driver.head',
        'chainsyncer.backend.file',
        'chainsyncer.backend.sql',
        'chainsyncer.filter',
    ):
        logging.getLogger(_quiet_logger).setLevel(logging.WARNING)

    if args.vv:
        logg.setLevel(logging.DEBUG)
    elif args.v:
        logg.setLevel(logging.INFO)
2021-06-26 14:04:34 +02:00
# NOTE(review): config_dir appears unused in this script.
config_dir = args.c
config = confini.Config(base_config_dir, os.environ.get('CONFINI_ENV_PREFIX'), override_dirs=args.c)
config.process()

# The -i option overrides the configured chain spec.
args_override = {
    'CHAIN_SPEC': args.i,
}
config.dict_override(args_override, 'cli')

# Runtime settings taken directly from the command line.
for _value, _key in (
    (args.offset, '_SYNC_OFFSET'),
    (args.skip_history, '_NO_HISTORY'),
    (args.single, '_SINGLE'),
    (args.head, '_HEAD'),
    (args.keep_alive, '_KEEP_ALIVE'),
    (os.path.realpath(args.state_dir), '_STATE_DIR'),
    (args.cache_dir, '_CACHE_DIR'),
    (args.session_id, '_SESSION_ID'),
):
    config.add(_value, _key, True)

override(config, 'renderer', env=os.environ, args=args)
override(config, 'filter', env=os.environ, args=args)

# Without an explicit session id, a --single run gets a fresh random id,
# while every other run uses the fixed id 'default'.
if config.get('_SESSION_ID') is None:
    _session_id = str(uuid.uuid4()) if config.get('_SINGLE') else 'default'
    config.add(_session_id, '_SESSION_ID', True)

logg.debug('loaded config:\n{}'.format(config))
2021-06-26 14:04:34 +02:00
# Prefer the -i option; fall back to the configured CHAIN_SPEC when it is absent.
chain_spec_str = args.i
if chain_spec_str is None:
    chain_spec_str = config.get('CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_spec_str)

rpc_id_generator = None
if args.seq:
    # Imported here because --seq is the only path that needs it; the name was
    # previously used without ever being imported (NameError on --seq).
    from chainlib.jsonrpc import IntSequenceGenerator
    rpc_id_generator = IntSequenceGenerator()

auth = None
if os.environ.get('RPC_AUTHENTICATION') == 'basic':
    from chainlib.auth import BasicAuth
    auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD'])

# Hand the credentials to the connection; otherwise basic auth is built and discarded.
rpc = EthHTTPConnection(args.p, auth=auth)
2022-02-27 12:48:15 +01:00
def setup_address_arg_rules(rules, args):
    """Add one include rule and one exclude rule built from address options.

    Every ``--address`` value is added as an input, output and exec address of
    the include rule. Every ``--x-address`` value is added the same way to the
    exclude rule.

    :param rules: Rules collection to extend (``AddressRules`` in this script).
    :param args: Parsed command line namespace.
    :returns: ``rules``, with the two rules added.
    """
    # Build fresh lists instead of appending to args.input etc. Appending would
    # mutate the parsed namespace (and, for options not given, the ``default=[]``
    # list itself), so repeated calls would keep accumulating --address values.
    include_inputs = list(args.input) + list(args.address)
    include_outputs = list(args.output) + list(args.address)
    include_exec = list(args.exec) + list(args.address)
    exclude_inputs = list(args.xinput) + list(args.xaddress)
    exclude_outputs = list(args.xoutput) + list(args.xaddress)
    exclude_exec = list(args.xexec) + list(args.xaddress)

    # RuleSimple takes (outputs/senders, inputs/recipients, exec) in that order.
    includes = RuleSimple(include_outputs, include_inputs, include_exec, description='INCLUDE')
    rules.include(includes)

    excludes = RuleSimple(exclude_outputs, exclude_inputs, exclude_exec, description='EXCLUDE')
    rules.exclude(excludes)
    return rules
def setup_data_arg_rules(rules, args):
    """Register method-data rules from ``--data`` (include) and ``--x-data`` (exclude).

    :param rules: Rules collection to extend.
    :param args: Parsed command line namespace providing ``data`` and ``xdata``.
    :returns: ``rules``, with one include and one exclude ``RuleMethod`` added.
    """
    rules.include(RuleMethod(args.data, description='INCLUDE'))
    rules.exclude(RuleMethod(args.xdata, description='EXCLUDE'))
    return rules
def _split_rule_field(fields, index):
    """Return the comma-separated addresses in column ``index``, or [] if the column is empty or missing."""
    try:
        field = fields[index]
    except IndexError:
        return []
    if field == '':
        return []
    return field.split(',')


def _read_rule_file(path):
    """Yield (sender, recipient, executable) address lists for each non-blank line of a tab-separated rule file."""
    with open(path, 'r') as f:
        for line in f:
            line = line.rstrip()
            # A blank line would otherwise become a rule with no addresses at all.
            if line == '':
                continue
            fields = line.split('\t')
            yield (
                _split_rule_field(fields, 0),
                _split_rule_field(fields, 1),
                _split_rule_field(fields, 2),
            )


def setup_address_file_rules(rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
    """Add address rules read from include and/or exclude rule files.

    Each line of a file holds up to three tab-separated columns (sender,
    recipient, executable). Each column is a comma-separated list of addresses.
    Each line becomes one ``RuleSimple``.

    :param rules: Rules collection to extend.
    :param includes_file: Path of the include rules file, or None to skip.
    :param excludes_file: Path of the exclude rules file, or None to skip.
    :param include_default: Unused; kept for interface compatibility.
    :param include_block_default: Unused; kept for interface compatibility.
    :returns: ``rules``, with the file rules added.
    """
    if includes_file is not None:
        logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file)))
        for sender, recipient, executable in _read_rule_file(includes_file):
            rules.include(RuleSimple(sender, recipient, executable))

    if excludes_file is not None:
        # Read the excludes file itself and split (not strip) the columns,
        # exactly as for includes.
        logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file)))
        for sender, recipient, executable in _read_rule_file(excludes_file):
            rules.exclude(RuleSimple(sender, recipient, executable))

    return rules
2022-01-24 01:17:27 +01:00
def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data):
    """Create the block/tx data store and initialise ``RuledFilter`` with it.

    :param chain_spec: Chain spec passed to the file store.
    :param cache_dir: Directory for cached data. If None, a ``NullStore`` is
        used and a warning is logged that everything will be discarded.
    :param include_tx_data: Forwarded to ``RuledFilter.init``.
    :param include_block_data: Forwarded to ``RuledFilter.init``.
    :returns: The store instance.
    """
    if cache_dir is None:
        logg.warning('no cache dir specified, will discard everything!!')
        from eth_cache.store.null import NullStore
        store = NullStore()
    else:
        store = FileStore(chain_spec, cache_dir)
    # The former realpath()/tempfile.mkdtemp() fallback was removed: its result
    # was never used, and realpath(None) raises TypeError.
    logg.info('using chain spec {} and store {}'.format(chain_spec, store))
    RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data)

    return store
2022-01-23 18:49:04 +01:00
def setup_cache_filter(rules_filter=None):
    """Build a ``CacheFilter`` that applies ``rules_filter``.

    :param rules_filter: Rules passed to the filter (may be None).
    :returns: The new ``CacheFilter``.
    """
    cache_filter = CacheFilter(rules_filter=rules_filter)
    return cache_filter
2021-06-26 14:04:34 +02:00
2021-06-27 11:01:31 +02:00
2022-03-30 10:11:46 +02:00
def pre_callback():
    """Driver hook: log at DEBUG level that a sync loop iteration is starting."""
    logg.log(logging.DEBUG, 'starting sync loop iteration')
2022-03-08 18:11:15 +01:00
2022-01-23 18:28:25 +01:00
2022-03-30 10:11:46 +02:00
def post_callback():
    """Driver hook: log at DEBUG level that a sync loop iteration has ended."""
    logg.log(logging.DEBUG, 'ending sync loop iteration')
2022-01-23 18:28:25 +01:00
2022-03-30 10:11:46 +02:00
def block_callback(block, tx):
    """Driver hook: log each processed block together with its timestamp.

    The timestamp is converted with ``datetime.fromtimestamp`` (local time,
    naive). ``tx`` is accepted for the callback signature but not used.
    """
    block_time = datetime.datetime.fromtimestamp(block.timestamp)
    logg.info('processing {} {}'.format(block, block_time))
2022-01-30 15:43:53 +01:00
2022-04-02 13:29:12 +02:00
def state_change_callback(k, old_state, new_state):
    """Sync store hook: trace a sync state transition at the STATETRACE level.

    The message is only formatted when STATETRACE is enabled, so the frequent
    state changes cost nothing at normal verbosity.
    """
    if logg.isEnabledFor(logging.STATETRACE):
        logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
    """Sync store hook: trace a filter state transition at the STATETRACE level.

    The message is only formatted when STATETRACE is enabled, so the frequent
    filter state changes cost nothing at normal verbosity.
    """
    if logg.isEnabledFor(logging.STATETRACE):
        logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
2022-01-30 20:44:03 +01:00
def _resolve_sync_range(block_offset):
    """Work out the start block and target block for the sync.

    Reads the module-level ``args`` and ``config``.

    :param block_offset: The block number after the latest one reported by the node.
    :returns: Tuple ``(session_block_offset, block_limit)``.
    :raises ValueError: If ``--until`` is not later than the resolved start block.
    """
    block_limit = 0
    if args.head:
        session_block_offset = block_offset
        block_limit = -1
    else:
        session_block_offset = args.offset

    # An offset of -1 stands for ``block_offset``. Resolve it before validating
    # --until, so the check compares against a real block number rather than -1.
    offset_is_head = session_block_offset == -1
    if offset_is_head:
        session_block_offset = block_offset

    if args.until > 0:
        if not args.head and args.until <= session_block_offset:
            raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, args.until))
        block_limit = args.until
    elif config.true('_KEEP_ALIVE'):
        # -1 is used together with keep-alive, presumably meaning "no fixed end block".
        block_limit = -1

    # Without keep-alive and without an explicit end, stop at the current head.
    if not offset_is_head and not config.true('_KEEP_ALIVE') and block_limit == 0:
        block_limit = block_offset

    return session_block_offset, block_limit


def main():
    """Run the monitor sync using the module-level ``args``, ``config`` and ``rpc``.

    Builds the address rules, the data store, the filter chain (cache filter,
    then user filter modules, then the output filter) and the sync driver, then
    runs the driver. A ``SyncDone`` from the driver is reported on stderr.
    """
    o = block_latest(id_generator=rpc_id_generator)
    r = rpc.do(o)
    block_height = int(strip_0x(r), 16)
    block_offset = block_height + 1
    logg.info('network block height is {}'.format(block_height))

    session_block_offset, block_limit = _resolve_sync_range(block_offset)

    address_rules = AddressRules(include_by_default=args.include_default)
    address_rules = setup_data_arg_rules(
        address_rules,
        args,
    )
    address_rules = setup_address_file_rules(
        address_rules,
        includes_file=args.includes_file,
        excludes_file=args.excludes_file,
    )
    address_rules = setup_address_arg_rules(
        address_rules,
        args,
    )

    store = setup_filter(
        chain_spec,
        config.get('_CACHE_DIR'),
        bool(args.store_tx_data),
        bool(args.store_block_data),
    )

    # Filter order: cache filter first, then user filter modules, output filter last.
    filters = [
        setup_cache_filter(rules_filter=address_rules),
    ]
    for fltr in list_from_prefix(config, 'filter'):
        m = importlib.import_module(fltr)
        filters.append(m.Filter(rules_filter=address_rules))
        logg.info('using filter module {}'.format(fltr))

    renderers_mods = []
    for renderer in list_from_prefix(config, 'renderer'):
        renderers_mods.append(importlib.import_module(renderer))
        logg.info('using renderer module {}'.format(renderer))

    chain_interface = EthChainInterface()
    filters.append(OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods))

    sync_store = SyncFsStore(
        config.get('_STATE_DIR'),
        session_id=config.get('_SESSION_ID'),
        state_event_callback=state_change_callback,
        filter_state_event_callback=filter_change_callback,
    )
    logg.info('session is {}'.format(sync_store.session_id))
    for fltr in filters:
        sync_store.register(fltr)

    drv = ChainInterfaceDriver(
        sync_store,
        chain_interface,
        offset=session_block_offset,
        target=block_limit,
        pre_callback=pre_callback,
        post_callback=post_callback,
        block_callback=block_callback,
    )

    # Unless --fresh is given, wrap the connection so cached block/tx data is used when available.
    use_rpc = rpc
    if not args.fresh:
        use_rpc = CacheRPC(rpc, store)

    try:
        drv.run(use_rpc)
    except SyncDone as e:
        sys.stderr.write("sync {} done at block {}\n".format(drv, e))


if __name__ == '__main__':
    main()