mirror of
				git://holbrook.no/eth-monitor.git
				synced 2025-11-04 03:21:18 +01:00 
			
		
		
		
	Implement chainlib 0.3.0 structure
This commit is contained in:
		
							parent
							
								
									d5b4a8d362
								
							
						
					
					
						commit
						02d27ef167
					
				@ -1,2 +1,2 @@
 | 
			
		||||
from .arg import process_flags
 | 
			
		||||
from .arg import process_args
 | 
			
		||||
from .config import process_config
 | 
			
		||||
 | 
			
		||||
@ -1,4 +1,4 @@
 | 
			
		||||
def process_flags(argparser, flags):
 | 
			
		||||
def process_args(argparser, args, flags):
 | 
			
		||||
    # session flags
 | 
			
		||||
    argparser.add_argument('--state-dir', dest='state_dir', type=str, help='Directory to store sync state')
 | 
			
		||||
    argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
 | 
			
		||||
 | 
			
		||||
@ -22,28 +22,41 @@ from hexathon import (
 | 
			
		||||
#from chainsyncer.store.fs import SyncFsStore
 | 
			
		||||
from chainsyncer.driver.chain_interface import ChainInterfaceDriver
 | 
			
		||||
from chainsyncer.error import SyncDone
 | 
			
		||||
from chainsyncer.data import config_dir as chainsyncer_config_dir
 | 
			
		||||
from chainlib.settings import ChainSettings
 | 
			
		||||
from chainlib.eth.settings import process_settings
 | 
			
		||||
from chainlib.eth.cli.arg import (
 | 
			
		||||
        Arg,
 | 
			
		||||
        ArgFlag,
 | 
			
		||||
        process_args,
 | 
			
		||||
        )
 | 
			
		||||
from chainlib.eth.cli.config import (
 | 
			
		||||
        Config,
 | 
			
		||||
        process_config,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
from eth_monitor.callback import (
 | 
			
		||||
        pre_callback,
 | 
			
		||||
        post_callback,
 | 
			
		||||
        )
 | 
			
		||||
from eth_monitor.settings import EthMonitorSettings
 | 
			
		||||
import eth_monitor.cli
 | 
			
		||||
from eth_monitor.cli.log import process_log
 | 
			
		||||
from eth_monitor.settings import process_settings as process_settings_local
 | 
			
		||||
 | 
			
		||||
logging.STATETRACE = 5
 | 
			
		||||
logging.basicConfig(level=logging.WARNING)
 | 
			
		||||
logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
script_dir = os.path.realpath(os.path.dirname(__file__))
 | 
			
		||||
config_dir = os.path.join(script_dir, '..', 'data', 'config')
 | 
			
		||||
 | 
			
		||||
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC | chainlib.cli.Flag.PROVIDER
 | 
			
		||||
argparser = chainlib.cli.ArgumentParser(arg_flags)
 | 
			
		||||
eth_monitor.cli.process_flags(argparser, 0)
 | 
			
		||||
arg_flags = ArgFlag()
 | 
			
		||||
arg = Arg(arg_flags)
 | 
			
		||||
flags = arg_flags.STD_BASE_READ | arg_flags.PROVIDER | arg_flags.CHAIN_SPEC | arg_flags.VERYVERBOSE
 | 
			
		||||
 | 
			
		||||
argparser = chainlib.eth.cli.ArgumentParser()
 | 
			
		||||
argparser.add_argument('--list-backends', dest='list_backends', action='store_true', help='List built-in store backends')
 | 
			
		||||
argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose')
 | 
			
		||||
argparser = process_args(argparser, arg, flags)
 | 
			
		||||
eth_monitor.cli.process_args(argparser, arg, flags)
 | 
			
		||||
 | 
			
		||||
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
 | 
			
		||||
chainsyncer.cli.process_flags(argparser, sync_flags)
 | 
			
		||||
@ -59,29 +72,23 @@ if args.list_backends:
 | 
			
		||||
        print(v)
 | 
			
		||||
    sys.exit(0)
 | 
			
		||||
 | 
			
		||||
logging.getLogger('chainlib.connection').setLevel(logging.WARNING)
 | 
			
		||||
logging.getLogger('chainlib.eth.tx').setLevel(logging.WARNING)
 | 
			
		||||
logging.getLogger('chainlib.eth.src').setLevel(logging.WARNING)
 | 
			
		||||
 | 
			
		||||
if args.vvv:
 | 
			
		||||
    logg.setLevel(logging.STATETRACE)
 | 
			
		||||
else:
 | 
			
		||||
    if args.vv:
 | 
			
		||||
        logg.setLevel(logging.DEBUG)
 | 
			
		||||
    elif args.v:
 | 
			
		||||
        logg.setLevel(logging.INFO)
 | 
			
		||||
logg = process_log(args, logg)
 | 
			
		||||
 | 
			
		||||
base_config_dir = [
 | 
			
		||||
    chainsyncer.cli.config_dir,
 | 
			
		||||
    config_dir,
 | 
			
		||||
        ]
 | 
			
		||||
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
 | 
			
		||||
config = Config()
 | 
			
		||||
config.add_schema_dir(config_dir)
 | 
			
		||||
config.add_schema_dir(chainsyncer_config_dir)
 | 
			
		||||
config = process_config(config, arg, args, flags)
 | 
			
		||||
config = chainsyncer.cli.process_config(config, args, sync_flags)
 | 
			
		||||
config = eth_monitor.cli.process_config(config, args, 0)
 | 
			
		||||
config = eth_monitor.cli.process_config(config, args, flags)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
settings = EthMonitorSettings()
 | 
			
		||||
settings.process(config)
 | 
			
		||||
settings = ChainSettings()
 | 
			
		||||
settings = process_settings(settings, config)
 | 
			
		||||
settings = process_settings_local(settings, config)
 | 
			
		||||
logg.debug('loaded settings:\n{}'.format(settings))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -99,7 +106,7 @@ def main():
 | 
			
		||||
            )
 | 
			
		||||
    
 | 
			
		||||
    try:
 | 
			
		||||
        r = drv.run(settings.get('RPC'))
 | 
			
		||||
        r = drv.run(settings.get('CONN'))
 | 
			
		||||
    except SyncDone as e:
 | 
			
		||||
        sys.stderr.write("sync {} done at block {}\n".format(drv, e))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -7,8 +7,8 @@ import tempfile
 | 
			
		||||
 | 
			
		||||
# external imports
 | 
			
		||||
from chainlib.settings import ChainSettings
 | 
			
		||||
from chainsyncer.settings import ChainsyncerSettings
 | 
			
		||||
from chainlib.eth.connection import EthHTTPConnection
 | 
			
		||||
from chainsyncer.settings import *
 | 
			
		||||
from eth_monitor.chain import EthChainInterface
 | 
			
		||||
from chainlib.eth.address import is_address
 | 
			
		||||
from eth_cache.rpc import CacheRPC
 | 
			
		||||
@ -37,301 +37,326 @@ from eth_monitor.filters.block import Filter as BlockFilter
 | 
			
		||||
logg = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class EthMonitorSettings(ChainsyncerSettings):
 | 
			
		||||
 | 
			
		||||
    def process_monitor_session(self, config):
 | 
			
		||||
        session_id = config.get('_SESSION_ID')
 | 
			
		||||
        if session_id == None:
 | 
			
		||||
            if config.get('_SINGLE'):
 | 
			
		||||
                session_id = str(uuid.uuid4())
 | 
			
		||||
            else:
 | 
			
		||||
                session_id = 'default'
 | 
			
		||||
        
 | 
			
		||||
        self.o['SESSION_ID'] = session_id
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_monitor_session_dir(self, config):
 | 
			
		||||
        syncer_store_module = None
 | 
			
		||||
        syncer_store_class = None
 | 
			
		||||
        state_dir = None
 | 
			
		||||
        if config.get('SYNCER_BACKEND') == 'mem':
 | 
			
		||||
            syncer_store_module = importlib.import_module('chainsyncer.store.mem')
 | 
			
		||||
            syncer_store_class = getattr(syncer_store_module, 'SyncMemStore')
 | 
			
		||||
def process_monitor_session(settings, config):
 | 
			
		||||
    session_id = config.get('_SESSION_ID')
 | 
			
		||||
    if session_id == None:
 | 
			
		||||
        if config.get('_SINGLE'):
 | 
			
		||||
            session_id = str(uuid.uuid4())
 | 
			
		||||
        else:
 | 
			
		||||
            if config.get('SYNCER_BACKEND') == 'fs': 
 | 
			
		||||
                syncer_store_module = importlib.import_module('chainsyncer.store.fs')
 | 
			
		||||
                syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
 | 
			
		||||
            elif config.get('SYNCER_BACKEND') == 'rocksdb':
 | 
			
		||||
                syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
 | 
			
		||||
                syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
 | 
			
		||||
            else:
 | 
			
		||||
                syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
 | 
			
		||||
                syncer_store_class = getattr(syncer_store_module, 'SyncStore')
 | 
			
		||||
            state_dir = os.path.join(config.get('ETHMONITOR_STATE_DIR'), config.get('SYNCER_BACKEND'))
 | 
			
		||||
            os.makedirs(state_dir, exist_ok=True)
 | 
			
		||||
        logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
 | 
			
		||||
 | 
			
		||||
        session_dir = os.path.join(state_dir, self.o['SESSION_ID'])
 | 
			
		||||
        sync_store = syncer_store_class(session_dir, session_id=self.o['SESSION_ID'], state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
 | 
			
		||||
 | 
			
		||||
        self.o['STATE_DIR'] = state_dir
 | 
			
		||||
        self.o['SESSION_DIR'] = session_dir
 | 
			
		||||
        self.o['SYNC_STORE'] = sync_store
 | 
			
		||||
            session_id = 'default'
 | 
			
		||||
    
 | 
			
		||||
    settings.set('SESSION_ID', session_id)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_address_arg_rules(self, config):
 | 
			
		||||
        category = {
 | 
			
		||||
            'input': {
 | 
			
		||||
                'i': [],
 | 
			
		||||
                'x': [],
 | 
			
		||||
                },
 | 
			
		||||
            'output': {
 | 
			
		||||
                'i': [],
 | 
			
		||||
                'x': [],
 | 
			
		||||
                },
 | 
			
		||||
            'exec':  {
 | 
			
		||||
                'i': [],
 | 
			
		||||
                'x': [],
 | 
			
		||||
                },
 | 
			
		||||
            }
 | 
			
		||||
        for rules_arg in [
 | 
			
		||||
                'input',
 | 
			
		||||
                'output',
 | 
			
		||||
                'exec',
 | 
			
		||||
                ]:
 | 
			
		||||
            (vy, vn) = to_config_names(rules_arg)
 | 
			
		||||
            for address in config.get(vy):
 | 
			
		||||
                if not is_address(address):
 | 
			
		||||
                    raise ValueError('invalid address in config {}: {}'.format(vy, address))
 | 
			
		||||
                category[rules_arg]['i'].append(address)
 | 
			
		||||
            for address in config.get(vn):
 | 
			
		||||
                if not is_address(address):
 | 
			
		||||
                    raise ValueError('invalid address in config {}: {}'.format(vn, address))
 | 
			
		||||
                category[rules_arg]['x'].append(address)
 | 
			
		||||
 | 
			
		||||
        includes = RuleSimple(
 | 
			
		||||
                category['output']['i'],
 | 
			
		||||
                category['input']['i'],
 | 
			
		||||
                category['exec']['i'],
 | 
			
		||||
                description='INCLUDE',
 | 
			
		||||
                )
 | 
			
		||||
        self.o['RULES'].include(includes)
 | 
			
		||||
 | 
			
		||||
        excludes = RuleSimple(
 | 
			
		||||
                category['output']['x'],
 | 
			
		||||
                category['input']['x'],
 | 
			
		||||
                category['exec']['x'],
 | 
			
		||||
                description='EXCLUDE',
 | 
			
		||||
                )
 | 
			
		||||
        self.o['RULES'].exclude(excludes)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_data_arg_rules(self, config): #rules, args):
 | 
			
		||||
        include_data = []
 | 
			
		||||
        for v in config.get('ETHMONITOR_DATA'):
 | 
			
		||||
            include_data.append(v.lower())
 | 
			
		||||
        exclude_data = []
 | 
			
		||||
        for v in config.get('ETHMONITOR_X_DATA'):
 | 
			
		||||
            exclude_data.append(v.lower())
 | 
			
		||||
 | 
			
		||||
        includes = RuleMethod(include_data, description='INCLUDE')
 | 
			
		||||
        self.o['RULES'].include(includes)
 | 
			
		||||
       
 | 
			
		||||
        excludes = RuleMethod(exclude_data, description='EXCLUDE')
 | 
			
		||||
        self.o['RULES'].exclude(excludes)
 | 
			
		||||
 | 
			
		||||
        include_data = []
 | 
			
		||||
        for v in config.get('ETHMONITOR_DATA_IN'):
 | 
			
		||||
            include_data.append(v.lower())
 | 
			
		||||
        exclude_data = []
 | 
			
		||||
        for v in config.get('ETHMONITOR_X_DATA_IN'):
 | 
			
		||||
            exclude_data.append(v.lower())
 | 
			
		||||
 | 
			
		||||
        includes = RuleData(include_data, description='INCLUDE')
 | 
			
		||||
        self.o['RULES'].include(includes)
 | 
			
		||||
       
 | 
			
		||||
        excludes = RuleData(exclude_data, description='EXCLUDE')
 | 
			
		||||
        self.o['RULES'].exclude(excludes)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_address_file_rules(self, config): #rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
 | 
			
		||||
        includes_file = config.get('ETHMONITOR_INCLUDES_FILE')
 | 
			
		||||
        if includes_file != None:
 | 
			
		||||
            f = open(includes_file, 'r')
 | 
			
		||||
            logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file)))
 | 
			
		||||
            while True:
 | 
			
		||||
                r = f.readline()
 | 
			
		||||
                if r == '':
 | 
			
		||||
                    break
 | 
			
		||||
                r = r.rstrip()
 | 
			
		||||
                v = r.split("\t")
 | 
			
		||||
 | 
			
		||||
                sender = []
 | 
			
		||||
                recipient = []
 | 
			
		||||
                executable = []
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    if v[0] != '':
 | 
			
		||||
                        sender = v[0].split(',')
 | 
			
		||||
                except IndexError:
 | 
			
		||||
                    pass
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    if v[1] != '':
 | 
			
		||||
                        recipient = v[1].split(',')
 | 
			
		||||
                except IndexError:
 | 
			
		||||
                    pass
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    if v[2] != '':
 | 
			
		||||
                        executable = v[2].split(',')
 | 
			
		||||
                except IndexError:
 | 
			
		||||
                    pass
 | 
			
		||||
 | 
			
		||||
                rule = RuleSimple(sender, recipient, executable)
 | 
			
		||||
                rules.include(rule)
 | 
			
		||||
 | 
			
		||||
        excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE')
 | 
			
		||||
        if excludes_file != None:
 | 
			
		||||
            f = open(includes_file, 'r')
 | 
			
		||||
            logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file)))
 | 
			
		||||
            while True:
 | 
			
		||||
                r = f.readline()
 | 
			
		||||
                if r == '':
 | 
			
		||||
                    break
 | 
			
		||||
                r = r.rstrip()
 | 
			
		||||
                v = r.split("\t")
 | 
			
		||||
 | 
			
		||||
                sender = None
 | 
			
		||||
                recipient = None
 | 
			
		||||
                executable = None
 | 
			
		||||
 | 
			
		||||
                if v[0] != '':
 | 
			
		||||
                    sender = v[0].strip(',')
 | 
			
		||||
                if v[1] != '':
 | 
			
		||||
                    recipient = v[1].strip(',')
 | 
			
		||||
                if v[2] != '':
 | 
			
		||||
                    executable = v[2].strip(',')
 | 
			
		||||
 | 
			
		||||
                rule = RuleSimple(sender, recipient, executable)
 | 
			
		||||
                rules.exclude(rule)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_arg_rules(self, config):
 | 
			
		||||
        address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT'))
 | 
			
		||||
        self.o['RULES'] = address_rules
 | 
			
		||||
        self.process_address_arg_rules(config)
 | 
			
		||||
        self.process_data_arg_rules(config)
 | 
			
		||||
        self.process_address_file_rules(config)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_cache_store(self, config):
 | 
			
		||||
        cache_dir = config.get('_CACHE_DIR')
 | 
			
		||||
        store = None
 | 
			
		||||
        if cache_dir == None:
 | 
			
		||||
            logg.warning('no cache dir specified, will discard everything!!')
 | 
			
		||||
            from eth_cache.store.null import NullStore
 | 
			
		||||
            store = NullStore()
 | 
			
		||||
def process_monitor_session_dir(settings, config):
 | 
			
		||||
    syncer_store_module = None
 | 
			
		||||
    syncer_store_class = None
 | 
			
		||||
    session_id = settings.get('SESSION_ID')
 | 
			
		||||
    state_dir = None
 | 
			
		||||
    if config.get('SYNCER_BACKEND') == 'mem':
 | 
			
		||||
        syncer_store_module = importlib.import_module('chainsyncer.store.mem')
 | 
			
		||||
        syncer_store_class = getattr(syncer_store_module, 'SyncMemStore')
 | 
			
		||||
    else:
 | 
			
		||||
        if config.get('SYNCER_BACKEND') == 'fs': 
 | 
			
		||||
            syncer_store_module = importlib.import_module('chainsyncer.store.fs')
 | 
			
		||||
            syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
 | 
			
		||||
        elif config.get('SYNCER_BACKEND') == 'rocksdb':
 | 
			
		||||
            syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
 | 
			
		||||
            syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
 | 
			
		||||
        else:
 | 
			
		||||
            store = FileStore(self.o['CHAIN_SPEC'], cache_dir)
 | 
			
		||||
            cache_dir = os.path.realpath(cache_dir)
 | 
			
		||||
            if cache_dir == None:
 | 
			
		||||
                import tempfile
 | 
			
		||||
                cache_dir = tempfile.mkdtemp()
 | 
			
		||||
        logg.info('using cache store {}'.format(store))
 | 
			
		||||
            syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
 | 
			
		||||
            syncer_store_class = getattr(syncer_store_module, 'SyncStore')
 | 
			
		||||
        state_dir = os.path.join(config.get('ETHMONITOR_STATE_DIR'), config.get('SYNCER_BACKEND'))
 | 
			
		||||
        os.makedirs(state_dir, exist_ok=True)
 | 
			
		||||
    logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
 | 
			
		||||
 | 
			
		||||
        self.o['CACHE_STORE'] = store
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_cache_filter(self, config):
 | 
			
		||||
        fltr = CacheFilter(self.o['CACHE_STORE'], rules_filter=self.o['RULES'], include_tx_data=config.true('ETHCACHE_STORE_TX'))
 | 
			
		||||
        self.o['SYNC_STORE'].register(fltr)
 | 
			
		||||
 | 
			
		||||
        fltr = BlockFilter(self.o['CACHE_STORE'], include_block_data=config.true('ETHCACHE_STORE_BLOCK'))
 | 
			
		||||
        self.o['BLOCK_HANDLER'].register(fltr)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_tx_filter(self, config):
 | 
			
		||||
        for fltr in list_from_prefix(config, 'filter'):
 | 
			
		||||
            m = importlib.import_module(fltr)
 | 
			
		||||
            fltr_object = m.Filter(rules_filter=self.o['RULES'])
 | 
			
		||||
            self.o['SYNC_STORE'].register(fltr_object)
 | 
			
		||||
            logg.info('using filter module {}'.format(fltr))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_block_filter(self, config):
 | 
			
		||||
        block_filter_handler = BlockCallbackFilter()
 | 
			
		||||
        for block_filter in list_from_prefix(config, 'block_filter'):
 | 
			
		||||
            m = importlib.import_module(block_filter)
 | 
			
		||||
            block_filter_handler.register(m)
 | 
			
		||||
            logg.info('using block filter module {}'.format(block_filter))
 | 
			
		||||
 | 
			
		||||
        self.o['BLOCK_HANDLER'] = block_filter_handler
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_out_filter(self, config):
 | 
			
		||||
        out_filter = OutFilter(
 | 
			
		||||
            self.o['CHAIN_SPEC'],
 | 
			
		||||
            rules_filter=self.o['RULES'],
 | 
			
		||||
            renderers=self.o['RENDERER'],
 | 
			
		||||
    session_dir = os.path.join(state_dir, session_id)
 | 
			
		||||
    sync_store = syncer_store_class(
 | 
			
		||||
            session_dir,
 | 
			
		||||
            session_id=session_id,
 | 
			
		||||
            state_event_callback=state_change_callback,
 | 
			
		||||
            filter_state_event_callback=filter_change_callback,
 | 
			
		||||
            )
 | 
			
		||||
        self.o['SYNC_STORE'].register(out_filter)
 | 
			
		||||
 | 
			
		||||
    settings.set('STATE_DIR', state_dir)
 | 
			
		||||
    settings.set('SESSION_DIR', session_dir)
 | 
			
		||||
    settings.set('SYNC_STORE', sync_store)
 | 
			
		||||
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_filter(self, config):
 | 
			
		||||
        self.o['FILTER'] = []
 | 
			
		||||
def process_address_arg_rules(settings, config):
 | 
			
		||||
    rules = settings.get('RULES')
 | 
			
		||||
    category = {
 | 
			
		||||
        'input': {
 | 
			
		||||
            'i': [],
 | 
			
		||||
            'x': [],
 | 
			
		||||
            },
 | 
			
		||||
        'output': {
 | 
			
		||||
            'i': [],
 | 
			
		||||
            'x': [],
 | 
			
		||||
            },
 | 
			
		||||
        'exec':  {
 | 
			
		||||
            'i': [],
 | 
			
		||||
            'x': [],
 | 
			
		||||
            },
 | 
			
		||||
        }
 | 
			
		||||
    for rules_arg in [
 | 
			
		||||
            'input',
 | 
			
		||||
            'output',
 | 
			
		||||
            'exec',
 | 
			
		||||
            ]:
 | 
			
		||||
        (vy, vn) = to_config_names(rules_arg)
 | 
			
		||||
        for address in config.get(vy):
 | 
			
		||||
            if not is_address(address):
 | 
			
		||||
                raise ValueError('invalid address in config {}: {}'.format(vy, address))
 | 
			
		||||
            category[rules_arg]['i'].append(address)
 | 
			
		||||
        for address in config.get(vn):
 | 
			
		||||
            if not is_address(address):
 | 
			
		||||
                raise ValueError('invalid address in config {}: {}'.format(vn, address))
 | 
			
		||||
            category[rules_arg]['x'].append(address)
 | 
			
		||||
 | 
			
		||||
        self.process_renderer(config)
 | 
			
		||||
    includes = RuleSimple(
 | 
			
		||||
            category['output']['i'],
 | 
			
		||||
            category['input']['i'],
 | 
			
		||||
            category['exec']['i'],
 | 
			
		||||
            description='INCLUDE',
 | 
			
		||||
            )
 | 
			
		||||
    rules.include(includes)
 | 
			
		||||
 | 
			
		||||
        self.process_block_filter(config)
 | 
			
		||||
    excludes = RuleSimple(
 | 
			
		||||
            category['output']['x'],
 | 
			
		||||
            category['input']['x'],
 | 
			
		||||
            category['exec']['x'],
 | 
			
		||||
            description='EXCLUDE',
 | 
			
		||||
            )
 | 
			
		||||
    rules.exclude(excludes)
 | 
			
		||||
 | 
			
		||||
        self.process_cache_filter(config)
 | 
			
		||||
        self.process_tx_filter(config)
 | 
			
		||||
        self.process_out_filter(config)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_renderer(self, config):
 | 
			
		||||
        renderers_mods = []
 | 
			
		||||
        for renderer in list_from_prefix(config, 'renderer'):
 | 
			
		||||
            m = importlib.import_module(renderer)
 | 
			
		||||
            renderers_mods.append(m)
 | 
			
		||||
            logg.info('using renderer module {}'.format(renderer))
 | 
			
		||||
        self.o['RENDERER'] = renderers_mods
 | 
			
		||||
def process_data_arg_rules(settings, config):
 | 
			
		||||
    rules = settings.get('RULES')
 | 
			
		||||
 | 
			
		||||
    include_data = []
 | 
			
		||||
    for v in config.get('ETHMONITOR_DATA'):
 | 
			
		||||
        include_data.append(v.lower())
 | 
			
		||||
    exclude_data = []
 | 
			
		||||
    for v in config.get('ETHMONITOR_X_DATA'):
 | 
			
		||||
        exclude_data.append(v.lower())
 | 
			
		||||
 | 
			
		||||
    def process_cache_rpc(self, config):
 | 
			
		||||
        if not config.true('_FRESH'):
 | 
			
		||||
            self.o['RPC'] = CacheRPC(self.o['RPC'], cache_store)
 | 
			
		||||
    includes = RuleMethod(include_data, description='INCLUDE')
 | 
			
		||||
    rules.include(includes)
 | 
			
		||||
   
 | 
			
		||||
    excludes = RuleMethod(exclude_data, description='EXCLUDE')
 | 
			
		||||
    rules.exclude(excludes)
 | 
			
		||||
 | 
			
		||||
    def process_common(self, config):
 | 
			
		||||
        super(EthMonitorSettings, self).process_common(config)
 | 
			
		||||
        # TODO: duplicate from chaind, consider move to chainlib-eth
 | 
			
		||||
        rpc_provider = config.get('RPC_PROVIDER')
 | 
			
		||||
        if rpc_provider == None:
 | 
			
		||||
            rpc_provider = 'http://localhost:8545'
 | 
			
		||||
        self.o['RPC'] = EthHTTPConnection(url=rpc_provider, chain_spec=self.o['CHAIN_SPEC'])
 | 
			
		||||
    include_data = []
 | 
			
		||||
    for v in config.get('ETHMONITOR_DATA_IN'):
 | 
			
		||||
        include_data.append(v.lower())
 | 
			
		||||
    exclude_data = []
 | 
			
		||||
    for v in config.get('ETHMONITOR_X_DATA_IN'):
 | 
			
		||||
        exclude_data.append(v.lower())
 | 
			
		||||
 | 
			
		||||
    includes = RuleData(include_data, description='INCLUDE')
 | 
			
		||||
    rules.include(includes)
 | 
			
		||||
   
 | 
			
		||||
    excludes = RuleData(exclude_data, description='EXCLUDE')
 | 
			
		||||
    rules.exclude(excludes)
 | 
			
		||||
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_sync_interface(self, config):
 | 
			
		||||
        self.o['SYNCER_INTERFACE'] = EthChainInterface()
 | 
			
		||||
def process_address_file_rules(settings, config): #rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
 | 
			
		||||
    rules = settings.get('RULES')
 | 
			
		||||
    includes_file = config.get('ETHMONITOR_INCLUDES_FILE')
 | 
			
		||||
    if includes_file != None:
 | 
			
		||||
        f = open(includes_file, 'r')
 | 
			
		||||
        logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file)))
 | 
			
		||||
        while True:
 | 
			
		||||
            r = f.readline()
 | 
			
		||||
            if r == '':
 | 
			
		||||
                break
 | 
			
		||||
            r = r.rstrip()
 | 
			
		||||
            v = r.split("\t")
 | 
			
		||||
 | 
			
		||||
            sender = []
 | 
			
		||||
            recipient = []
 | 
			
		||||
            executable = []
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                if v[0] != '':
 | 
			
		||||
                    sender = v[0].split(',')
 | 
			
		||||
            except IndexError:
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                if v[1] != '':
 | 
			
		||||
                    recipient = v[1].split(',')
 | 
			
		||||
            except IndexError:
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                if v[2] != '':
 | 
			
		||||
                    executable = v[2].split(',')
 | 
			
		||||
            except IndexError:
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
            rule = RuleSimple(sender, recipient, executable)
 | 
			
		||||
            rules.include(rule)
 | 
			
		||||
 | 
			
		||||
    excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE')
 | 
			
		||||
    if excludes_file != None:
 | 
			
		||||
        f = open(includes_file, 'r')
 | 
			
		||||
        logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file)))
 | 
			
		||||
        while True:
 | 
			
		||||
            r = f.readline()
 | 
			
		||||
            if r == '':
 | 
			
		||||
                break
 | 
			
		||||
            r = r.rstrip()
 | 
			
		||||
            v = r.split("\t")
 | 
			
		||||
 | 
			
		||||
            sender = None
 | 
			
		||||
            recipient = None
 | 
			
		||||
            executable = None
 | 
			
		||||
 | 
			
		||||
            if v[0] != '':
 | 
			
		||||
                sender = v[0].strip(',')
 | 
			
		||||
            if v[1] != '':
 | 
			
		||||
                recipient = v[1].strip(',')
 | 
			
		||||
            if v[2] != '':
 | 
			
		||||
                executable = v[2].strip(',')
 | 
			
		||||
 | 
			
		||||
            rule = RuleSimple(sender, recipient, executable)
 | 
			
		||||
            rules.exclude(rule)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_sync(self, config):
 | 
			
		||||
        self.process_sync_interface(config)
 | 
			
		||||
        self.process_sync_backend(config)
 | 
			
		||||
        self.process_sync_range(config)
 | 
			
		||||
def process_arg_rules(settings, config):
 | 
			
		||||
    address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT'))
 | 
			
		||||
    settings.set('RULES', address_rules)
 | 
			
		||||
    settings = process_address_arg_rules(settings, config)
 | 
			
		||||
    settings = process_data_arg_rules(settings, config)
 | 
			
		||||
    settings = process_address_file_rules(settings, config)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process_cache(self, config):
 | 
			
		||||
        self.process_cache_store(config)
 | 
			
		||||
def process_cache_store(settings, config):
 | 
			
		||||
    cache_dir = config.get('_CACHE_DIR')
 | 
			
		||||
    store = None
 | 
			
		||||
    if cache_dir == None:
 | 
			
		||||
        logg.warning('no cache dir specified, will discard everything!!')
 | 
			
		||||
        from eth_cache.store.null import NullStore
 | 
			
		||||
        store = NullStore()
 | 
			
		||||
    else:
 | 
			
		||||
        store = FileStore(settings.get('CHAIN_SPEC'), cache_dir)
 | 
			
		||||
        cache_dir = os.path.realpath(cache_dir)
 | 
			
		||||
        if cache_dir == None:
 | 
			
		||||
            import tempfile
 | 
			
		||||
            cache_dir = tempfile.mkdtemp()
 | 
			
		||||
    logg.info('using cache store {}'.format(store))
 | 
			
		||||
 | 
			
		||||
    settings.set('CACHE_STORE', store)
 | 
			
		||||
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def process(self, config):
 | 
			
		||||
        self.process_common(config)
 | 
			
		||||
        self.process_monitor_session(config)
 | 
			
		||||
        self.process_monitor_session_dir(config)
 | 
			
		||||
        self.process_arg_rules(config)
 | 
			
		||||
        self.process_sync(config)
 | 
			
		||||
        self.process_cache(config)
 | 
			
		||||
        self.process_filter(config)
 | 
			
		||||
def process_cache_filter(settings, config):
 | 
			
		||||
    cache_store = settings.get('CACHE_STORE')
 | 
			
		||||
    fltr = CacheFilter(cache_store, rules_filter=settings.o['RULES'], include_tx_data=config.true('ETHCACHE_STORE_TX'))
 | 
			
		||||
    sync_store = settings.get('SYNC_STORE')
 | 
			
		||||
    sync_store.register(fltr)
 | 
			
		||||
    
 | 
			
		||||
    fltr = BlockFilter(cache_store, include_block_data=config.true('ETHCACHE_STORE_BLOCK'))
 | 
			
		||||
    hndlr = settings.get('BLOCK_HANDLER')
 | 
			
		||||
    hndlr.register(fltr)
 | 
			
		||||
    
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_tx_filter(settings, config):
 | 
			
		||||
    for fltr in list_from_prefix(config, 'filter'):
 | 
			
		||||
        m = importlib.import_module(fltr)
 | 
			
		||||
        fltr_object = m.Filter(rules_filter=settings.get('RULES'))
 | 
			
		||||
        store = settings.get('SYNC_STORE')
 | 
			
		||||
        store.register(fltr_object)
 | 
			
		||||
        logg.info('using filter module {}'.format(fltr))
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_block_filter(settings, config):
 | 
			
		||||
    block_filter_handler = BlockCallbackFilter()
 | 
			
		||||
    for block_filter in list_from_prefix(config, 'block_filter'):
 | 
			
		||||
        m = importlib.import_module(block_filter)
 | 
			
		||||
        block_filter_handler.register(m)
 | 
			
		||||
        logg.info('using block filter module {}'.format(block_filter))
 | 
			
		||||
 | 
			
		||||
    settings.set('BLOCK_HANDLER', block_filter_handler)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_out_filter(settings, config):
 | 
			
		||||
    out_filter = OutFilter(
 | 
			
		||||
        settings.o['CHAIN_SPEC'],
 | 
			
		||||
        rules_filter=settings.o['RULES'],
 | 
			
		||||
        renderers=settings.o['RENDERER'],
 | 
			
		||||
        )
 | 
			
		||||
    store = settings.get('SYNC_STORE')
 | 
			
		||||
    store.register(out_filter)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_filter(settings, config):
 | 
			
		||||
    settings.set('FILTER', [])
 | 
			
		||||
    settings = process_renderer(settings, config)
 | 
			
		||||
    settings = process_block_filter(settings, config)
 | 
			
		||||
    settings = process_cache_filter(settings, config)
 | 
			
		||||
    settings = process_tx_filter(settings, config)
 | 
			
		||||
    settings = process_out_filter(settings, config)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_renderer(settings, config):
 | 
			
		||||
    renderers_mods = []
 | 
			
		||||
    for renderer in list_from_prefix(config, 'renderer'):
 | 
			
		||||
        m = importlib.import_module(renderer)
 | 
			
		||||
        renderers_mods.append(m)
 | 
			
		||||
        logg.info('using renderer module {}'.format(renderer))
 | 
			
		||||
    settings.set('RENDERER', renderers_mods)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_cache_rpc(settings, config):
 | 
			
		||||
    if not config.true('_FRESH'):
 | 
			
		||||
        rpc = CacheRPC(settings.get('RPC'), cache_store)
 | 
			
		||||
        settings.set('RPC', rpc)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_sync_interface(settings, config):
 | 
			
		||||
    ifc = EthChainInterface()
 | 
			
		||||
    settings.set('SYNCER_INTERFACE', ifc)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_sync(settings, config):
 | 
			
		||||
    settings = process_sync_interface(settings, config)
 | 
			
		||||
    settings = process_sync_backend(settings, config)
 | 
			
		||||
    settings = process_sync_range(settings, config)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_cache(settings, config):
 | 
			
		||||
    settings = process_cache_store(settings, config)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def process_settings(settings, config):
 | 
			
		||||
    settings = process_monitor_session(settings, config)
 | 
			
		||||
    settings = process_monitor_session_dir(settings, config)
 | 
			
		||||
    settings = process_arg_rules(settings, config)
 | 
			
		||||
    settings = process_sync(settings, config)
 | 
			
		||||
    settings = process_cache(settings, config)
 | 
			
		||||
    settings = process_filter(settings, config)
 | 
			
		||||
    return settings
 | 
			
		||||
 | 
			
		||||
@ -1,5 +1,5 @@
 | 
			
		||||
chainlib-eth~=0.2.0
 | 
			
		||||
chainlib~=0.2.0
 | 
			
		||||
chainsyncer~=0.4.10
 | 
			
		||||
chainlib-eth~=0.3.0
 | 
			
		||||
chainlib~=0.3.0
 | 
			
		||||
chainsyncer~=0.4.11
 | 
			
		||||
leveldir~=0.3.0
 | 
			
		||||
eth-cache~=0.1.4
 | 
			
		||||
eth-cache~=0.1.5
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,6 @@
 | 
			
		||||
[metadata]
 | 
			
		||||
name = eth-monitor
 | 
			
		||||
version = 0.4.8
 | 
			
		||||
version = 0.4.9
 | 
			
		||||
description = Monitor and cache transactions using match filters
 | 
			
		||||
author = Louis Holbrook
 | 
			
		||||
author_email = dev@holbrook.no
 | 
			
		||||
@ -33,6 +33,7 @@ packages =
 | 
			
		||||
	eth_monitor.filters
 | 
			
		||||
	eth_monitor.runnable
 | 
			
		||||
	eth_monitor.mock
 | 
			
		||||
	eth_monitor.cli
 | 
			
		||||
 | 
			
		||||
[options.entry_points]
 | 
			
		||||
console_scripts =
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user