Move rules handling to settings module

This commit is contained in:
lash 2022-05-10 14:21:49 +00:00
parent 80eee2b779
commit 39dc90fe9e
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
8 changed files with 289 additions and 240 deletions

View File

@ -1,3 +1,9 @@
# standard imports
import logging
logg = logging.getLogger(__name__)
class BlockCallbackFilter:
def __init__(self):
@ -11,3 +17,23 @@ class BlockCallbackFilter:
def filter(self, block, tx=None):
for fltr in self.filters:
fltr.filter(block, tx=tx)
def state_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
def pre_callback():
logg.debug('starting sync loop iteration')
def post_callback():
logg.debug('ending sync loop iteration')

View File

@ -1,6 +1,7 @@
# external imports
from chainlib.interface import ChainInterface
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
@ -12,6 +13,7 @@ from chainlib.eth.tx import (
class EthChainInterface(ChainInterface):
def __init__(self):
self._block_latest = block_latest
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._tx_receipt = receipt

View File

@ -10,13 +10,19 @@ def process_flags(argparser, flags):
argparser.add_argument('--exec', action='append', type=str, help='Add exec (contract) addresses to includes list')
argparser.add_argument('--data', action='append', type=str, help='Add data prefix strings to include list')
argparser.add_argument('--data-in', action='append', dest='data_in', type=str, help='Add data contain strings to include list')
argparser.add_argument('--x-data', action='append', dest='xdata', type=str, help='Add data prefix string to exclude list')
argparser.add_argument('--x-data-in', action='append', dest='xdata_in', type=str, help='Add data contain string to exclude list')
argparser.add_argument('--x-data', action='append', dest='x_data', type=str, help='Add data prefix string to exclude list')
argparser.add_argument('--x-data-in', action='append', dest='x_data_in', type=str, help='Add data contain string to exclude list')
argparser.add_argument('--address', action='append', type=str, help='Add addresses as input, output and exec to includes list')
argparser.add_argument('--x-input', action='append', type=str, dest='xinput', help='Add input (recipient) addresses to excludes list')
argparser.add_argument('--x-output', action='append', type=str, dest='xoutput', help='Add output (sender) addresses to excludes list')
argparser.add_argument('--x-exec', action='append', type=str, dest='xexec', help='Add exec (contract) addresses to excludes list')
argparser.add_argument('--x-address', action='append', type=str, dest='xaddress', help='Add addresses as input, output and exec to excludes list')
argparser.add_argument('--x-input', action='append', type=str, dest='x_input', help='Add input (recipient) addresses to excludes list')
argparser.add_argument('--x-output', action='append', type=str, dest='x_output', help='Add output (sender) addresses to excludes list')
argparser.add_argument('--x-exec', action='append', type=str, dest='x_exec', help='Add exec (contract) addresses to excludes list')
argparser.add_argument('--x-address', action='append', type=str, dest='x_address', help='Add addresses as input, output and exec to excludes list')
argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file')
argparser.add_argument('--excludes-file', type=str, dest='excludes_file', help='Load exclude rules from file')
argparser.add_argument('--include-default', dest='include_default', action='store_true', help='Include all transactions by default')
# filter flags
argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output')
argparser.add_argument('--filter', type=str, action='append', help='Add python module to tx filter path')
argparser.add_argument('--block-filter', type=str, dest='block_filter', action='append', help='Add python module to block filter path')

View File

@ -1,17 +1,52 @@
# local imports
from .rules import (
rules_address_args,
rules_data_args,
to_config_names,
)
def process_config(config, args, flags):
arg_override = {}
arg_override['ETHMONITOR_INPUTS'] = getattr(args, 'input')
arg_override['ETHMONITOR_OUTPUTS'] = getattr(args, 'output')
arg_override['ETHMONITOR_EXEC'] = getattr(args, 'exec')
arg_override['ETHMONITOR_ADDRESS'] = getattr(args, 'address')
arg_override['ETHMONITOR_X_INPUTS'] = getattr(args, 'xinput')
arg_override['ETHMONITOR_X_OUTPUTS'] = getattr(args, 'xoutput')
arg_override['ETHMONITOR_X_EXEC'] = getattr(args, 'xexec')
arg_override['ETHMONITOR_X_ADDRESS'] = getattr(args, 'xaddress')
rules_args = rules_address_args + rules_data_args
for rules_arg in rules_args:
(vy, vn) = to_config_names(rules_arg)
arg = getattr(args, rules_arg)
if arg == None:
v = config.get(vy)
if bool(v):
arg_override[vy] = v.split(',')
else:
arg_override[vy] = arg
arg = getattr(args, 'x_' + rules_arg)
if arg == None:
v = config.get(vn)
if bool(v):
arg_override[vn] = v.split(',')
else:
arg_override[vn] = arg
arg_override['ETHMONITOR_INCLUDES_FILE'] = getattr(args, 'includes_file')
arg_override['ETHMONITOR_EXCLUDES_FILE'] = getattr(args, 'excludes_file')
arg_override['ETHMONITOR_INCLUDE_DEFAULT'] = getattr(args, 'include_default')
arg_override['ETHMONITOR_RENDERER'] = getattr(args, 'renderer')
arg_override['ETHMONITOR_FILTER'] = getattr(args, 'filter')
arg_override['ETHMONITOR_BLOCK_FILTER'] = getattr(args, 'block_filter')
arg_override['ETHMONITOR_STATE_DIR'] = getattr(args, 'state_dir')
config.dict_override(arg_override, 'local cli args')
for rules_arg in rules_args:
(vy, vn) = to_config_names(rules_arg)
if config.get(vy) == None:
config.add([], vy, True)
if config.get(vn) == None:
config.add([], vn, True)
config.add(getattr(args, 'session_id'), '_SESSION_ID', False)
config.add(getattr(args, 'cache_dir'), '_CACHE_DIR', False)

18
eth_monitor/cli/rules.py Normal file
View File

@ -0,0 +1,18 @@
rules_address_args = [
'input',
'output',
'exec',
'address',
]
rules_data_args = [
'data',
'data_in',
]
def to_config_names(v):
v = v.upper()
return ('ETHMONITOR_' + v, 'ETHMONITOR_X_' + v)

View File

@ -1,13 +1,20 @@
[ethmonitor]
inputs =
outputs =
input =
output =
exec =
x_inputs =
x_outputs =
x_input =
x_output =
x_exec =
address =
x_address =
data =
x_data =
data_in =
x_data_in =
includes_file =
excludes_file =
renderer =
filter =
block_filter =
include_default = 0
state_dir =
state_dir = ./.eth-monitor

View File

@ -27,7 +27,6 @@ from eth_cache.rpc import CacheRPC
from eth_cache.store.file import FileStore
# local imports
from eth_monitor.chain import EthChainInterface
from eth_monitor.filters.cache import Filter as CacheFilter
from eth_monitor.rules import (
AddressRules,
@ -38,7 +37,11 @@ from eth_monitor.rules import (
from eth_monitor.filters import RuledFilter
from eth_monitor.filters.out import OutFilter
from eth_monitor.config import override, list_from_prefix
from eth_monitor.callback import BlockCallbackFilter
from eth_monitor.callback import (
BlockCallbackFilter,
pre_callback,
post_callback,
)
from eth_monitor.settings import EthMonitorSettings
import eth_monitor.cli
@ -61,9 +64,6 @@ eth_monitor.cli.process_flags(argparser, 0)
argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output')
argparser.add_argument('--filter', type=str, action='append', help='Add python module to tx filter path')
argparser.add_argument('--block-filter', type=str, dest='block_filter', action='append', help='Add python module to block filter path')
argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
argparser.add_argument('--list-backends', dest='list_backends', action='store_true', help='List built-in store backends')
argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose')
@ -124,100 +124,6 @@ logg.debug('loaded settings:\n{}'.format(settings))
def setup_data_arg_rules(rules, args):
include_data = []
for v in args.data:
include_data.append(v.lower())
exclude_data = []
for v in args.xdata:
exclude_data.append(v.lower())
includes = RuleMethod(include_data, description='INCLUDE')
rules.include(includes)
excludes = RuleMethod(exclude_data, description='EXCLUDE')
rules.exclude(excludes)
include_data = []
for v in args.data_in:
include_data.append(v.lower())
exclude_data = []
for v in args.xdata_in:
exclude_data.append(v.lower())
includes = RuleData(include_data, description='INCLUDE')
rules.include(includes)
excludes = RuleData(exclude_data, description='EXCLUDE')
rules.exclude(excludes)
return rules
def setup_address_file_rules(rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
if includes_file != None:
f = open(includes_file, 'r')
logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file)))
while True:
r = f.readline()
if r == '':
break
r = r.rstrip()
v = r.split("\t")
sender = []
recipient = []
executable = []
try:
if v[0] != '':
sender = v[0].split(',')
except IndexError:
pass
try:
if v[1] != '':
recipient = v[1].split(',')
except IndexError:
pass
try:
if v[2] != '':
executable = v[2].split(',')
except IndexError:
pass
rule = RuleSimple(sender, recipient, executable)
rules.include(rule)
if excludes_file != None:
f = open(includes_file, 'r')
logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file)))
while True:
r = f.readline()
if r == '':
break
r = r.rstrip()
v = r.split("\t")
sender = None
recipient = None
executable = None
if v[0] != '':
sender = v[0].strip(',')
if v[1] != '':
recipient = v[1].strip(',')
if v[2] != '':
executable = v[2].strip(',')
rule = RuleSimple(sender, recipient, executable)
rules.exclude(rule)
return rules
def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data):
store = None
if cache_dir == None:
@ -240,26 +146,10 @@ def setup_cache_filter(rules_filter=None):
return CacheFilter(rules_filter=rules_filter)
def pre_callback():
logg.debug('starting sync loop iteration')
def post_callback():
logg.debug('ending sync loop iteration')
def block_callback(block, tx):
logg.info('processing {} {}'.format(block, datetime.datetime.fromtimestamp(block.timestamp)))
def state_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
def main():
rpc = settings.get('RPC')
o = block_latest()
@ -267,47 +157,6 @@ def main():
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height is {}'.format(block_offset))
keep_alive = False
session_block_offset = 0
block_limit = 0
if args.head:
session_block_offset = block_offset
block_limit = -1
keep_alive = True
else:
session_block_offset = args.offset
if args.until > 0:
if not args.head and args.until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, args.until))
block_limit = args.until
elif config.true('_KEEP_ALIVE'):
keep_alive=True
block_limit = -1
if session_block_offset == -1:
session_block_offset = block_offset
elif not config.true('_KEEP_ALIVE'):
if block_limit == 0:
block_limit = block_offset
sys.exit(0)
#address_rules = AddressRules(include_by_default=args.include_default)
address_rules = setup_data_arg_rules(
address_rules,
args,
)
address_rules = setup_address_file_rules(
address_rules,
includes_file=args.includes_file,
excludes_file=args.excludes_file,
)
address_rules = setup_address_arg_rules(
address_rules,
args,
)
store = setup_filter(
settings.get('CHAIN_SPEC'),
config.get('_CACHE_DIR'),
@ -316,7 +165,7 @@ def main():
)
cache_filter = setup_cache_filter(
rules_filter=address_rules,
rules_filter=settings.get('RULES'), #address_rules,
)
filters = [
@ -325,7 +174,7 @@ def main():
for fltr in list_from_prefix(config, 'filter'):
m = importlib.import_module(fltr)
fltr_object = m.Filter(rules_filter=address_rules)
fltr_object = m.Filter(rules_filter=settings.get('RULES'))
filters.append(fltr_object)
logg.info('using filter module {}'.format(fltr))
@ -341,19 +190,26 @@ def main():
block_filter_handler.register(m)
logg.info('using block filter module {}'.format(block_filter))
chain_interface = EthChainInterface()
out_filter = OutFilter(
settings.get('CHAIN_SPEC'),
rules_filter=address_rules,renderers=renderers_mods,
rules_filter=settings.get('RULES'),
renderers=renderers_mods,
)
filters.append(out_filter)
logg.info('session is {}'.format(settings.get('SESSION_ID')))
for fltr in filters:
sync_store.register(fltr)
drv = ChainInterfaceDriver(sync_store, chain_interface, offset=session_block_offset, target=block_limit, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_filter_handler.filter)
settings.get('SYNC_STORE').register(fltr)
drv = ChainInterfaceDriver(
settings.get('SYNC_STORE'),
settings.get('SYNCER_INTERFACE'),
offset=settings.get('SYNCER_OFFSET'),
target=settings.get('SYNCER_LIMIT'),
pre_callback=pre_callback,
post_callback=post_callback,
block_callback=block_filter_handler.filter,
)
use_rpc = rpc
if not args.fresh:

View File

@ -8,6 +8,8 @@ import importlib
from chainlib.settings import ChainSettings
from chainsyncer.settings import ChainsyncerSettings
from chainlib.eth.connection import EthHTTPConnection
from eth_monitor.chain import EthChainInterface
from chainlib.eth.address import is_address
# local imports
from eth_monitor.rules import (
@ -16,6 +18,11 @@ from eth_monitor.rules import (
RuleMethod,
RuleData,
)
from eth_monitor.cli.rules import to_config_names
from eth_monitor.callback import (
state_change_callback,
filter_change_callback,
)
logg = logging.getLogger(__name__)
@ -50,82 +57,162 @@ class EthMonitorSettings(ChainsyncerSettings):
else:
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
state_dir = os.path.join(config.get('_STATE_DIR'), config.get('SYNCER_BACKEND'))
state_dir = os.path.join(config.get('ETHMONITOR_STATE_DIR'), config.get('SYNCER_BACKEND'))
os.makedirs(state_dir, exist_ok=True)
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
#state_dir = os.path.join(state_dir, config.get('_SESSION_ID'))
sync_store = syncer_store_class(state_dir, session_id=session.get('SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
sync_store = syncer_store_class(state_dir, session_id=self.o['SESSION_ID'], state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
self.o['STATE_DIR'] = state_dir
self.o['SESSION_DIR'] = os.path.join(state_dir, self.o['SESSION_ID'])
self.o['SYNC_STORE'] = sync_store
#def process_address_arg_rules(rules, args):
def process_address_arg_rules(self, config):
include_inputs = config.get('ETHMONITOR_INPUTS')
if include_inputs == None:
include_inputs = []
else:
include_inputs = include_inputs.split(',')
category = {
'input': {
'i': [],
'x': [],
},
'output': {
'i': [],
'x': [],
},
'exec': {
'i': [],
'x': [],
},
}
for rules_arg in [
'input',
'output',
'exec',
]:
(vy, vn) = to_config_names(rules_arg)
for address in config.get(vy):
if not is_address(address):
raise ValueError('invalid address in config {}: {}'.format(vy, address))
category[rules_arg]['i'].append(address)
for address in config.get(vn):
if not is_address(address):
raise ValueError('invalid address in config {}: {}'.format(vn, address))
category[rules_arg]['x'].append(address)
include_outputs = config.get('ETHMONITOR_OUTPUTS')
if include_outputs == None:
include_outputs = []
else:
include_outputs = include_outputs.split(',')
include_exec = config.get('ETHMONITOR_EXEC')
if include_exec == None:
include_exec = []
else:
include_exec = include_exec.split(',')
exclude_inputs = config.get('ETHMONITOR_X_INPUTS')
if exclude_inputs == None:
exclude_inputs = []
else:
exclude_inputs = exclude_inputs.split(',')
exclude_outputs = config.get('ETHMONITOR_X_OUTPUTS')
if exclude_outputs == None:
exclude_outputs = []
else:
exclude_outputs = exclude_outputs.split(',')
exclude_exec = config.get('ETHMONITOR_X_EXEC')
if exclude_exec == None:
exclude_exec = []
else:
exclude_exec = exclude_exec.split(',')
address = config.get('ETHMONITOR_ADDRESS')
if address != None:
for address in address.split(','):
include_inputs.append(address)
include_outputs.append(address)
include_exec.append(address)
address = config.get('ETHMONITOR_X_ADDRESS')
if address != None:
for address in address.split(','):
exclude_inputs.append(address)
exclude_outputs.append(address)
exclude_exec.append(address)
includes = RuleSimple(include_outputs, include_inputs, include_exec, description='INCLUDE')
includes = RuleSimple(
category['output']['i'],
category['input']['i'],
category['exec']['i'],
description='INCLUDE',
)
self.o['RULES'].include(includes)
excludes = RuleSimple(exclude_outputs, exclude_inputs, exclude_exec, description='EXCLUDE')
excludes = RuleSimple(
category['output']['x'],
category['input']['x'],
category['exec']['x'],
description='EXCLUDE',
)
self.o['RULES'].exclude(excludes)
def process_data_arg_rules(self, config): #rules, args):
include_data = []
for v in config.get('ETHMONITOR_DATA'):
include_data.append(v.lower())
exclude_data = []
for v in config.get('ETHMONITOR_X_DATA'):
exclude_data.append(v.lower())
includes = RuleMethod(include_data, description='INCLUDE')
self.o['RULES'].include(includes)
excludes = RuleMethod(exclude_data, description='EXCLUDE')
self.o['RULES'].exclude(excludes)
include_data = []
for v in config.get('ETHMONITOR_DATA_IN'):
include_data.append(v.lower())
exclude_data = []
for v in config.get('ETHMONITOR_X_DATA_IN'):
exclude_data.append(v.lower())
includes = RuleData(include_data, description='INCLUDE')
self.o['RULES'].include(includes)
excludes = RuleData(exclude_data, description='EXCLUDE')
self.o['RULES'].exclude(excludes)
def process_address_file_rules(self, config): #rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
includes_file = config.get('ETHMONITOR_INCLUDES_FILE')
if includes_file != None:
f = open(includes_file, 'r')
logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file)))
while True:
r = f.readline()
if r == '':
break
r = r.rstrip()
v = r.split("\t")
sender = []
recipient = []
executable = []
try:
if v[0] != '':
sender = v[0].split(',')
except IndexError:
pass
try:
if v[1] != '':
recipient = v[1].split(',')
except IndexError:
pass
try:
if v[2] != '':
executable = v[2].split(',')
except IndexError:
pass
rule = RuleSimple(sender, recipient, executable)
rules.include(rule)
excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE')
if excludes_file != None:
f = open(includes_file, 'r')
logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file)))
while True:
r = f.readline()
if r == '':
break
r = r.rstrip()
v = r.split("\t")
sender = None
recipient = None
executable = None
if v[0] != '':
sender = v[0].strip(',')
if v[1] != '':
recipient = v[1].strip(',')
if v[2] != '':
executable = v[2].strip(',')
rule = RuleSimple(sender, recipient, executable)
rules.exclude(rule)
def process_arg_rules(self, config):
address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT'))
self.o['RULES'] = address_rules
self.process_address_arg_rules(config)
self.process_data_arg_rules(config)
self.process_address_file_rules(config)
def process_common(self, config):
@ -137,7 +224,19 @@ class EthMonitorSettings(ChainsyncerSettings):
self.o['RPC'] = EthHTTPConnection(url=rpc_provider, chain_spec=self.o['CHAIN_SPEC'])
def process_sync_interface(self, config):
self.o['SYNCER_INTERFACE'] = EthChainInterface()
def process_sync(self, config):
self.process_sync_interface(config)
self.process_sync_backend(config)
self.process_sync_range(config)
def process(self, config):
self.process_common(config)
self.process_monitor_session(config)
self.process_monitor_session_dir(config)
self.process_arg_rules(config)
self.process_sync(config)