mirror of
git://holbrook.no/eth-monitor.git
synced 2024-10-31 21:56:46 +01:00
Add match-all criteria and flag
This commit is contained in:
parent
fa694c957b
commit
c99259b2ed
@ -1,3 +1,6 @@
|
|||||||
|
- 0.8.8
|
||||||
|
* Add match-all flag to rule processing
|
||||||
|
* Add match-all flag to CLI to toggle setting match_all flag to rule processing for include criteria
|
||||||
- 0.8.7
|
- 0.8.7
|
||||||
* Upgrade chainsyncer (and shep) to avoid state deletion on partial filter list interrupts
|
* Upgrade chainsyncer (and shep) to avoid state deletion on partial filter list interrupts
|
||||||
- 0.8.6
|
- 0.8.6
|
||||||
|
@ -30,6 +30,7 @@ def process_args(argparser, args, flags):
|
|||||||
argparser.add_argument('--store-tx-data', action='store_true', dest='store_tx_data', help='Store tx data in cache store')
|
argparser.add_argument('--store-tx-data', action='store_true', dest='store_tx_data', help='Store tx data in cache store')
|
||||||
argparser.add_argument('--store-block-data', action='store_true', dest='store_block_data', help='Store block data in cache store')
|
argparser.add_argument('--store-block-data', action='store_true', dest='store_block_data', help='Store block data in cache store')
|
||||||
argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
|
argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
|
||||||
|
argparser.add_argument('--match-all', action='store_true', dest='match_all', help='Match all include filter criteria')
|
||||||
|
|
||||||
# misc flags
|
# misc flags
|
||||||
argparser.add_argument('-k', '--context-key', dest='context_key', action='append', type=str, help='Add a key-value pair to be added to the context')
|
argparser.add_argument('-k', '--context-key', dest='context_key', action='append', type=str, help='Add a key-value pair to be added to the context')
|
||||||
|
@ -40,6 +40,8 @@ def process_config(config, arg, args, flags):
|
|||||||
|
|
||||||
arg_override['ETHMONITOR_CONTEXT_KEY'] = getattr(args, 'context_key')
|
arg_override['ETHMONITOR_CONTEXT_KEY'] = getattr(args, 'context_key')
|
||||||
|
|
||||||
|
arg_override['ETHMONITOR_MATCH_ALL'] = getattr(args, 'match_all')
|
||||||
|
|
||||||
arg_override['ETHCACHE_STORE_BLOCK'] = getattr(args, 'store_block_data')
|
arg_override['ETHCACHE_STORE_BLOCK'] = getattr(args, 'store_block_data')
|
||||||
arg_override['ETHCACHE_STORE_TX'] = getattr(args, 'store_tx_data')
|
arg_override['ETHCACHE_STORE_TX'] = getattr(args, 'store_tx_data')
|
||||||
|
|
||||||
|
@ -19,3 +19,4 @@ block_filter =
|
|||||||
include_default = 0
|
include_default = 0
|
||||||
state_dir = ./.eth-monitor
|
state_dir = ./.eth-monitor
|
||||||
context_key =
|
context_key =
|
||||||
|
match_all = 0
|
||||||
|
2
eth_monitor/error.py
Normal file
2
eth_monitor/error.py
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
class RuleFail(Exception):
|
||||||
|
pass
|
@ -4,6 +4,7 @@ import uuid
|
|||||||
|
|
||||||
# external imports
|
# external imports
|
||||||
from chainlib.eth.address import is_same_address
|
from chainlib.eth.address import is_same_address
|
||||||
|
from .error import RuleFail
|
||||||
|
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
|
|
||||||
@ -11,14 +12,17 @@ logg = logging.getLogger()
|
|||||||
|
|
||||||
class RuleData:
|
class RuleData:
|
||||||
|
|
||||||
def __init__(self, fragments, description=None):
|
def __init__(self, fragments, description=None, match_all=False):
|
||||||
self.fragments = fragments
|
self.fragments = fragments
|
||||||
self.description = description
|
self.description = description
|
||||||
if self.description == None:
|
if self.description == None:
|
||||||
self.description = str(uuid.uuid4())
|
self.description = str(uuid.uuid4())
|
||||||
|
self.match_all = match_all
|
||||||
|
|
||||||
|
|
||||||
def check(self, sender, recipient, data, tx_hash):
|
def check(self, sender, recipient, data, tx_hash):
|
||||||
|
have_fail = False
|
||||||
|
have_match = False
|
||||||
if len(self.fragments) == 0:
|
if len(self.fragments) == 0:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -28,9 +32,16 @@ class RuleData:
|
|||||||
continue
|
continue
|
||||||
if fragment in data:
|
if fragment in data:
|
||||||
logg.debug('tx {} rule {} match in DATA FRAGMENT {}'.format(tx_hash, self.description, fragment))
|
logg.debug('tx {} rule {} match in DATA FRAGMENT {}'.format(tx_hash, self.description, fragment))
|
||||||
return True
|
if not self.match_all:
|
||||||
|
return True
|
||||||
|
have_match = True
|
||||||
|
else:
|
||||||
|
logg.debug('data match all {}'.format(self.match_all))
|
||||||
|
if self.match_all:
|
||||||
|
return False
|
||||||
|
have_fail = True
|
||||||
|
|
||||||
return False
|
return have_match
|
||||||
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
@ -41,11 +52,13 @@ class RuleData:
|
|||||||
|
|
||||||
class RuleMethod:
|
class RuleMethod:
|
||||||
|
|
||||||
def __init__(self, methods, description=None):
|
def __init__(self, methods, description=None, match_all=False):
|
||||||
self.methods = methods
|
self.methods = methods
|
||||||
self.description = description
|
self.description = description
|
||||||
if self.description == None:
|
if self.description == None:
|
||||||
self.description = str(uuid.uuid4())
|
self.description = str(uuid.uuid4())
|
||||||
|
if match_all:
|
||||||
|
logg.warning('match_all ignord for RuleMethod rule')
|
||||||
|
|
||||||
|
|
||||||
def check(self, sender, recipient, data, tx_hash):
|
def check(self, sender, recipient, data, tx_hash):
|
||||||
@ -82,22 +95,51 @@ class RuleSimple:
|
|||||||
|
|
||||||
|
|
||||||
def check(self, sender, recipient, data, tx_hash):
|
def check(self, sender, recipient, data, tx_hash):
|
||||||
|
r = None
|
||||||
|
try:
|
||||||
|
r = self.__check(sender, recipient, data, tx_hash)
|
||||||
|
except RuleFail:
|
||||||
|
return False
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def __check(self, sender, recipient, data, tx_hash):
|
||||||
have_fail = False
|
have_fail = False
|
||||||
have_match = False
|
have_match = False
|
||||||
for rule in self.outputs:
|
for rule in self.outputs:
|
||||||
if rule != None and is_same_address(sender, rule):
|
if rule != None and is_same_address(sender, rule):
|
||||||
logg.debug('tx {} rule {} match in SENDER {}'.format(tx_hash, self.description, sender))
|
logg.debug('tx {} rule {} match in SENDER {}'.format(tx_hash, self.description, sender))
|
||||||
return True
|
if not self.match_all:
|
||||||
|
return True
|
||||||
|
have_match = True
|
||||||
|
else:
|
||||||
|
if self.match_all:
|
||||||
|
raise RuleFail(rule)
|
||||||
|
have_fail = True
|
||||||
if recipient == None:
|
if recipient == None:
|
||||||
return False
|
return False
|
||||||
for rule in self.inputs:
|
for rule in self.inputs:
|
||||||
if rule != None and is_same_address(recipient, rule):
|
if rule != None and is_same_address(recipient, rule):
|
||||||
logg.debug('tx {} rule {} match in RECIPIENT {}'.format(tx_hash, self.description, recipient))
|
logg.debug('tx {} rule {} match in RECIPIENT {}'.format(tx_hash, self.description, recipient))
|
||||||
return True
|
if not self.match_all:
|
||||||
|
return True
|
||||||
|
have_match = True
|
||||||
|
else:
|
||||||
|
if self.match_all:
|
||||||
|
raise RuleFail(rule)
|
||||||
|
have_fail = True
|
||||||
for rule in self.executables:
|
for rule in self.executables:
|
||||||
if rule != None and is_same_address(recipient, rule):
|
if rule != None and is_same_address(recipient, rule):
|
||||||
logg.debug('tx {} rule {} match in EXECUTABLE {}'.format(tx_hash, self.description, recipient))
|
logg.debug('tx {} rule {} match in EXECUTABLE {}'.format(tx_hash, self.description, recipient))
|
||||||
return True
|
if not self.match_all:
|
||||||
|
return True
|
||||||
|
have_match = True
|
||||||
|
else:
|
||||||
|
if self.match_all:
|
||||||
|
raise RuleFail(rule)
|
||||||
|
have_fail = True
|
||||||
|
|
||||||
|
return have_match
|
||||||
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
@ -131,7 +173,6 @@ class AddressRules:
|
|||||||
return self.apply_rules_addresses(tx.outputs[0], tx.inputs[0], tx.payload, tx.hash)
|
return self.apply_rules_addresses(tx.outputs[0], tx.inputs[0], tx.payload, tx.hash)
|
||||||
|
|
||||||
|
|
||||||
# TODO: rename
|
|
||||||
def apply_rules_addresses(self, sender, recipient, data, tx_hash):
|
def apply_rules_addresses(self, sender, recipient, data, tx_hash):
|
||||||
v = self.include_by_default
|
v = self.include_by_default
|
||||||
have_fail = False
|
have_fail = False
|
||||||
|
@ -130,6 +130,7 @@ def process_address_arg_rules(settings, config):
|
|||||||
category['input']['i'],
|
category['input']['i'],
|
||||||
category['exec']['i'],
|
category['exec']['i'],
|
||||||
description='INCLUDE',
|
description='INCLUDE',
|
||||||
|
match_all=settings.get('MATCH_ALL'),
|
||||||
)
|
)
|
||||||
rules.include(includes)
|
rules.include(includes)
|
||||||
|
|
||||||
@ -167,7 +168,7 @@ def process_data_arg_rules(settings, config):
|
|||||||
for v in config.get('ETHMONITOR_X_DATA_IN'):
|
for v in config.get('ETHMONITOR_X_DATA_IN'):
|
||||||
exclude_data.append(v.lower())
|
exclude_data.append(v.lower())
|
||||||
|
|
||||||
includes = RuleData(include_data, description='INCLUDE')
|
includes = RuleData(include_data, description='INCLUDE', match_all=settings.get('MATCH_ALL'))
|
||||||
rules.include(includes)
|
rules.include(includes)
|
||||||
|
|
||||||
excludes = RuleData(exclude_data, description='EXCLUDE')
|
excludes = RuleData(exclude_data, description='EXCLUDE')
|
||||||
@ -211,7 +212,7 @@ def process_address_file_rules(settings, config): #rules, includes_file=None, ex
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
rule = RuleSimple(sender, recipient, executable)
|
rule = RuleSimple(sender, recipient, executable, match_all=settings.get('MATCH_ALL'))
|
||||||
rules.include(rule)
|
rules.include(rule)
|
||||||
|
|
||||||
excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE')
|
excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE')
|
||||||
@ -243,6 +244,7 @@ def process_address_file_rules(settings, config): #rules, includes_file=None, ex
|
|||||||
|
|
||||||
def process_arg_rules(settings, config):
|
def process_arg_rules(settings, config):
|
||||||
address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT'))
|
address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT'))
|
||||||
|
settings.set('MATCH_ALL', config.true('ETHMONITOR_MATCH_ALL'))
|
||||||
settings.set('RULES', address_rules)
|
settings.set('RULES', address_rules)
|
||||||
settings = process_address_arg_rules(settings, config)
|
settings = process_address_arg_rules(settings, config)
|
||||||
settings = process_data_arg_rules(settings, config)
|
settings = process_data_arg_rules(settings, config)
|
||||||
|
17
run_tests.sh
Normal file
17
run_tests.sh
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
set -a
|
||||||
|
set -e
|
||||||
|
set -x
|
||||||
|
default_pythonpath=$PYTHONPATH:.
|
||||||
|
export PYTHONPATH=${default_pythonpath:-.}
|
||||||
|
>&2 echo using pythonpath $PYTHONPATH
|
||||||
|
for f in `ls tests/*.py`; do
|
||||||
|
python $f
|
||||||
|
done
|
||||||
|
for f in `ls tests/rules/*.py`; do
|
||||||
|
python $f
|
||||||
|
done
|
||||||
|
set +x
|
||||||
|
set +e
|
||||||
|
set +a
|
160
tests/rules/test_base.py
Normal file
160
tests/rules/test_base.py
Normal file
@ -0,0 +1,160 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from eth_monitor.rules import *
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
class TestRule(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.alice = os.urandom(20).hex()
|
||||||
|
self.bob = os.urandom(20).hex()
|
||||||
|
self.carol = os.urandom(20).hex()
|
||||||
|
self.dave = os.urandom(20).hex()
|
||||||
|
self.x = os.urandom(20).hex()
|
||||||
|
self.y = os.urandom(20).hex()
|
||||||
|
self.hsh = os.urandom(32).hex()
|
||||||
|
|
||||||
|
|
||||||
|
def test_address_include(self):
|
||||||
|
data = b''
|
||||||
|
outs = [self.alice]
|
||||||
|
ins = []
|
||||||
|
execs = []
|
||||||
|
rule = RuleSimple(outs, ins, execs)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
|
||||||
|
outs = []
|
||||||
|
ins = [self.alice]
|
||||||
|
execs = []
|
||||||
|
rule = RuleSimple(outs, ins, execs)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
outs = []
|
||||||
|
ins = []
|
||||||
|
execs = [self.x]
|
||||||
|
rule = RuleSimple(outs, ins, execs)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
|
||||||
|
data = b'deadbeef0123456789'
|
||||||
|
data_match = [data[:8]]
|
||||||
|
rule = RuleMethod(data_match)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd' + data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
|
||||||
|
rule = RuleData(data_match)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd' + data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
|
||||||
|
def test_address_exclude(self):
|
||||||
|
data = b''
|
||||||
|
outs = [self.alice]
|
||||||
|
ins = []
|
||||||
|
execs = []
|
||||||
|
rule = RuleSimple(outs, ins, execs)
|
||||||
|
|
||||||
|
c = AddressRules()
|
||||||
|
c.exclude(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
|
||||||
|
c = AddressRules(include_by_default=True)
|
||||||
|
c.exclude(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
outs = []
|
||||||
|
ins = [self.alice]
|
||||||
|
execs = []
|
||||||
|
rule = RuleSimple(outs, ins, execs)
|
||||||
|
c = AddressRules(include_by_default=True)
|
||||||
|
c.exclude(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
|
||||||
|
outs = []
|
||||||
|
ins = []
|
||||||
|
execs = [self.x]
|
||||||
|
rule = RuleSimple(outs, ins, execs)
|
||||||
|
c = AddressRules(include_by_default=True)
|
||||||
|
c.exclude(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
data = b'deadbeef0123456789'
|
||||||
|
data_match = [data[:8]]
|
||||||
|
rule = RuleMethod(data_match)
|
||||||
|
c = AddressRules(include_by_default=True)
|
||||||
|
c.exclude(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd' + data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
rule = RuleData(data_match)
|
||||||
|
c = AddressRules(include_by_default=True)
|
||||||
|
c.exclude(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.x, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd' + data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, b'abcd', self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
|
||||||
|
def test_address_include_exclude(self):
|
||||||
|
data = b''
|
||||||
|
outs = [self.alice]
|
||||||
|
ins = []
|
||||||
|
execs = []
|
||||||
|
rule = RuleSimple(outs, ins, execs)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
c.exclude(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
90
tests/rules/test_greedy.py
Normal file
90
tests/rules/test_greedy.py
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
import logging
|
||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from eth_monitor.rules import *
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
class TestRule(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.alice = os.urandom(20).hex()
|
||||||
|
self.bob = os.urandom(20).hex()
|
||||||
|
self.carol = os.urandom(20).hex()
|
||||||
|
self.dave = os.urandom(20).hex()
|
||||||
|
self.x = os.urandom(20).hex()
|
||||||
|
self.y = os.urandom(20).hex()
|
||||||
|
self.hsh = os.urandom(32).hex()
|
||||||
|
|
||||||
|
|
||||||
|
def test_greedy_includes(self):
|
||||||
|
data = b''
|
||||||
|
outs = [self.alice]
|
||||||
|
ins = [self.carol]
|
||||||
|
execs = []
|
||||||
|
rule = RuleSimple(outs, ins, execs, match_all=True)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.carol, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.carol, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
rule = RuleSimple(outs, ins, execs)
|
||||||
|
c = AddressRules(match_all=True)
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
r = c.apply_rules_addresses(self.bob, self.carol, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.carol, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
|
||||||
|
def test_greedy_data(self):
|
||||||
|
data = os.urandom(128).hex()
|
||||||
|
data_match_one = data[4:8]
|
||||||
|
data_match_two = data[32:42]
|
||||||
|
data_match_fail = os.urandom(64).hex()
|
||||||
|
data_match = [data_match_one]
|
||||||
|
|
||||||
|
rule = RuleData(data_match, match_all=True)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
data_match = [data_match_two]
|
||||||
|
rule = RuleData(data_match, match_all=True)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
data_match = [data_match_two, data_match_one]
|
||||||
|
rule = RuleData(data_match, match_all=True)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertTrue(r)
|
||||||
|
|
||||||
|
data_match = [data_match_two, data_match_fail, data_match_one]
|
||||||
|
rule = RuleData(data_match, match_all=True)
|
||||||
|
c = AddressRules()
|
||||||
|
c.include(rule)
|
||||||
|
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
||||||
|
self.assertFalse(r)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
@ -1,40 +0,0 @@
|
|||||||
# standard imports
|
|
||||||
import logging
|
|
||||||
import unittest
|
|
||||||
import os
|
|
||||||
|
|
||||||
# local imports
|
|
||||||
from eth_monitor.rules import *
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
logg = logging.getLogger()
|
|
||||||
|
|
||||||
|
|
||||||
class TestRule(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.alice = os.urandom(20).hex()
|
|
||||||
self.bob = os.urandom(20).hex()
|
|
||||||
self.carol = os.urandom(20).hex()
|
|
||||||
self.dave = os.urandom(20).hex()
|
|
||||||
self.x = os.urandom(20).hex()
|
|
||||||
self.y = os.urandom(20).hex()
|
|
||||||
self.hsh = os.urandom(32).hex()
|
|
||||||
|
|
||||||
|
|
||||||
def test_address_include(self):
|
|
||||||
outs = [self.alice]
|
|
||||||
ins = []
|
|
||||||
execs = []
|
|
||||||
rule = RuleSimple(outs, ins, execs)
|
|
||||||
c = AddressRules()
|
|
||||||
c.include(rule)
|
|
||||||
data = b''
|
|
||||||
r = c.apply_rules_addresses(self.alice, self.bob, data, self.hsh)
|
|
||||||
self.assertTrue(r)
|
|
||||||
r = c.apply_rules_addresses(self.bob, self.alice, data, self.hsh)
|
|
||||||
self.assertFalse(r)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
Loading…
Reference in New Issue
Block a user