mirror of
git://holbrook.no/eth-monitor.git
synced 2024-12-18 05:37:32 +01:00
Limit cache of tx by address to addresses within rules
This commit is contained in:
parent
8454b0eaaf
commit
2ced0e3442
@ -24,10 +24,10 @@ class EtherscanImporter:
|
|||||||
|
|
||||||
|
|
||||||
def get(self, address):
|
def get(self, address):
|
||||||
f = open('sample_import.json', 'r')
|
#f = open('sample_import.json', 'r')
|
||||||
o = json.load(f)
|
#o = json.load(f)
|
||||||
f.close()
|
#f.close()
|
||||||
#o = self.get_api(address)
|
o = self.get_api(address)
|
||||||
|
|
||||||
for v in o['result']:
|
for v in o['result']:
|
||||||
o = block_by_hash(v['blockHash'])
|
o = block_by_hash(v['blockHash'])
|
||||||
|
@ -1,3 +1,12 @@
|
|||||||
|
# standard imports
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# external imports
|
||||||
|
from chainlib.eth.address import is_same_address
|
||||||
|
|
||||||
|
logg = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
class AddressRules:
|
class AddressRules:
|
||||||
|
|
||||||
def __init__(self, include_by_default=False):
|
def __init__(self, include_by_default=False):
|
||||||
@ -17,26 +26,30 @@ class AddressRules:
|
|||||||
|
|
||||||
|
|
||||||
def apply_rules(self, tx):
|
def apply_rules(self, tx):
|
||||||
v = False
|
return self.apply_rules_addresses(tx.outputs[0], tx.inputs[0], tx.hash)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_rules_addresses(self, sender, recipient, tx_hash):
|
||||||
|
v = self.include_by_default
|
||||||
|
|
||||||
for rule in self.includes:
|
for rule in self.includes:
|
||||||
if rule[0] != None and is_same_address(tx.outputs[0], rule[0]):
|
if rule[0] != None and is_same_address(sender, rule[0]):
|
||||||
logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx.hash, tx.outputs[0]))
|
logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx_hash, sender))
|
||||||
v = True
|
v = True
|
||||||
elif rule[1] != None and is_same_address(tx.inputs[0], rule[1]):
|
elif rule[1] != None and is_same_address(recipient, rule[1]):
|
||||||
logg.debug('tx {} rule INCLUDE match in RECIPIENT {}'.format(tx.hash, tx.inputs[0]))
|
logg.debug('tx {} rule INCLUDE match in RECIPIENT {}'.format(tx_hash, recipient))
|
||||||
v = True
|
v = True
|
||||||
elif rule[2] != None and is_same_address(tx.inputs[0], rule[2]):
|
elif rule[2] != None and is_same_address(recipient, rule[2]):
|
||||||
logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0]))
|
logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx_hash, recipient))
|
||||||
v = True
|
v = True
|
||||||
for rule in self.excludes:
|
for rule in self.excludes:
|
||||||
if rule[0] != None and is_same_address(tx.outputs[0], rule[0]):
|
if rule[0] != None and is_same_address(sender, rule[0]):
|
||||||
logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx.hash, tx.outputs[0]))
|
logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx_hash, sender))
|
||||||
v = False
|
v = False
|
||||||
elif rule[1] != None and is_same_address(tx.inputs[0], rule[1]):
|
elif rule[1] != None and is_same_address(recipient, rule[1]):
|
||||||
logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0]))
|
logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx_hash, recipient))
|
||||||
v = False
|
v = False
|
||||||
elif rule[2] != None and is_same_address(tx.inputs[0], rule[2]):
|
elif rule[2] != None and is_same_address(recipient, rule[2]):
|
||||||
logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0]))
|
logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx_hash, recipient))
|
||||||
v = False
|
v = False
|
||||||
return v
|
return v
|
||||||
|
@ -3,6 +3,7 @@ import argparse
|
|||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
# external imports
|
# external imports
|
||||||
from chainlib.encode import TxHexNormalizer
|
from chainlib.encode import TxHexNormalizer
|
||||||
@ -13,19 +14,23 @@ from chainlib.chain import ChainSpec
|
|||||||
from eth_monitor.filters.cache import Filter as CacheFilter
|
from eth_monitor.filters.cache import Filter as CacheFilter
|
||||||
from eth_monitor.filters import RuledFilter
|
from eth_monitor.filters import RuledFilter
|
||||||
from eth_monitor.store.file import FileStore
|
from eth_monitor.store.file import FileStore
|
||||||
|
from eth_monitor.rules import AddressRules
|
||||||
|
|
||||||
logging.basicConfig(level=logging.WARNING)
|
logging.basicConfig(level=logging.WARNING)
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
|
|
||||||
normalize_address = TxHexNormalizer().wallet_address
|
normalize_address = TxHexNormalizer().wallet_address
|
||||||
|
|
||||||
|
|
||||||
argparser = argparse.ArgumentParser('master eth events monitor')
|
argparser = argparse.ArgumentParser('master eth events monitor')
|
||||||
argparser.add_argument('--api-key-file', dest='api_key_file', type=str, help='File to read API key from')
|
argparser.add_argument('--api-key-file', dest='api_key_file', type=str, help='File to read API key from')
|
||||||
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
|
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
|
||||||
argparser.add_argument('--include-data', dest='include_data', action='store_true', help='Include data objects')
|
argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
|
||||||
|
argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
|
||||||
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string')
|
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string')
|
||||||
argparser.add_argument('-f', '--address-file', dest='address_file', default=[], type=str, action='append', help='Add addresses from file')
|
argparser.add_argument('-f', '--address-file', dest='address_file', default=[], type=str, action='append', help='Add addresses from file')
|
||||||
argparser.add_argument('-a', '--address', default=[], type=str, action='append', help='Add address')
|
argparser.add_argument('-a', '--address', default=[], type=str, action='append', help='Add address')
|
||||||
|
argparser.add_argument('--delay', type=float, default=0.2, help='Seconds to wait between each retrieval from importer')
|
||||||
argparser.add_argument('-v', action='store_true', help='Be verbose')
|
argparser.add_argument('-v', action='store_true', help='Be verbose')
|
||||||
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
|
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
|
||||||
argparser.add_argument('-p', type=str, help='RPC provider')
|
argparser.add_argument('-p', type=str, help='RPC provider')
|
||||||
@ -81,15 +86,21 @@ def collect_addresses(addresses=[], address_files=[]):
|
|||||||
return address_collection
|
return address_collection
|
||||||
|
|
||||||
|
|
||||||
def setup_filter(chain_spec, cache_dir):
|
def setup_address_rules(addresses):
|
||||||
store = FileStore(chain_spec, cache_dir)
|
rules = AddressRules()
|
||||||
|
for address in addresses:
|
||||||
|
rules.include(sender=address, recipient=address)
|
||||||
|
return rules
|
||||||
|
|
||||||
|
|
||||||
|
def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data, address_rules):
|
||||||
|
store = FileStore(chain_spec, cache_dir, address_rules=address_rules)
|
||||||
cache_dir = os.path.realpath(cache_dir)
|
cache_dir = os.path.realpath(cache_dir)
|
||||||
if cache_dir == None:
|
if cache_dir == None:
|
||||||
import tempfile
|
import tempfile
|
||||||
cache_dir = tempfile.mkdtemp()
|
cache_dir = tempfile.mkdtemp()
|
||||||
logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir))
|
logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir))
|
||||||
include_data = bool(args.include_data)
|
RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data)
|
||||||
RuledFilter.init(store, include_tx_data=include_data, include_block_data=include_data)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -98,11 +109,28 @@ def main():
|
|||||||
|
|
||||||
from eth_monitor.importers.etherscan import EtherscanImporter
|
from eth_monitor.importers.etherscan import EtherscanImporter
|
||||||
|
|
||||||
setup_filter(chain_spec, args.cache_dir)
|
address_rules = setup_address_rules(args.address)
|
||||||
filters = [CacheFilter()]
|
|
||||||
|
setup_filter(
|
||||||
|
chain_spec,
|
||||||
|
args.cache_dir,
|
||||||
|
bool(args.store_tx_data),
|
||||||
|
bool(args.store_block_data),
|
||||||
|
address_rules,
|
||||||
|
)
|
||||||
|
|
||||||
|
cache_filter = CacheFilter(
|
||||||
|
rules_filter=address_rules,
|
||||||
|
)
|
||||||
|
|
||||||
|
filters = [
|
||||||
|
cache_filter,
|
||||||
|
]
|
||||||
|
|
||||||
importer = EtherscanImporter(rpc, api_key, filters=filters, block_callback=RuledFilter.block_callback)
|
importer = EtherscanImporter(rpc, api_key, filters=filters, block_callback=RuledFilter.block_callback)
|
||||||
for a in addresses:
|
for a in addresses:
|
||||||
importer.get(a)
|
importer.get(a)
|
||||||
|
time.sleep(args.delay)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -24,6 +24,7 @@ from eth_monitor.chain import EthChainInterface
|
|||||||
from eth_monitor.filters.cache import Filter as CacheFilter
|
from eth_monitor.filters.cache import Filter as CacheFilter
|
||||||
from eth_monitor.rules import AddressRules
|
from eth_monitor.rules import AddressRules
|
||||||
from eth_monitor.filters import RuledFilter
|
from eth_monitor.filters import RuledFilter
|
||||||
|
from eth_monitor.store.file import FileStore
|
||||||
|
|
||||||
logging.basicConfig(level=logging.WARNING)
|
logging.basicConfig(level=logging.WARNING)
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
@ -46,7 +47,9 @@ argparser.add_argument('--offset', type=int, default=0, help='Start sync on this
|
|||||||
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
|
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
|
||||||
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
|
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
|
||||||
argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file')
|
argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file')
|
||||||
argparser.add_argument('--include-default', action='store_true', help='Include all transactions by default')
|
argparser.add_argument('--include-default', dest='include_default', action='store_true', help='Include all transactions by default')
|
||||||
|
argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
|
||||||
|
argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
|
||||||
argparser.add_argument('--excludes-file', type=str, dest='excludes_file', help='Load exclude rules from file')
|
argparser.add_argument('--excludes-file', type=str, dest='excludes_file', help='Load exclude rules from file')
|
||||||
argparser.add_argument('-f', '--filter', type=str, action='append', help='Add python module filter path')
|
argparser.add_argument('-f', '--filter', type=str, action='append', help='Add python module filter path')
|
||||||
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
|
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
|
||||||
@ -88,7 +91,7 @@ if os.environ.get('RPC_AUTHENTICATION') == 'basic':
|
|||||||
rpc = EthHTTPConnection(args.p)
|
rpc = EthHTTPConnection(args.p)
|
||||||
|
|
||||||
|
|
||||||
def setup_address_rules(includes_file=None, excludes_file=None, include_default=False):
|
def setup_address_rules(includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
|
||||||
|
|
||||||
rules = AddressRules(include_by_default=include_default)
|
rules = AddressRules(include_by_default=include_default)
|
||||||
|
|
||||||
@ -141,13 +144,14 @@ def setup_address_rules(includes_file=None, excludes_file=None, include_default=
|
|||||||
return rules
|
return rules
|
||||||
|
|
||||||
|
|
||||||
def setup_filter(chain_spec, cache_dir):
|
def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data):
|
||||||
|
store = FileStore(chain_spec, cache_dir)
|
||||||
cache_dir = os.path.realpath(cache_dir)
|
cache_dir = os.path.realpath(cache_dir)
|
||||||
if cache_dir == None:
|
if cache_dir == None:
|
||||||
import tempfile
|
import tempfile
|
||||||
cache_dir = tempfile.mkdtemp()
|
cache_dir = tempfile.mkdtemp()
|
||||||
logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir))
|
logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir))
|
||||||
RuledFilter.init(chain_spec, cache_dir)
|
RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data)
|
||||||
|
|
||||||
|
|
||||||
def setup_cache_filter(rules_filter=None):
|
def setup_cache_filter(rules_filter=None):
|
||||||
@ -201,6 +205,8 @@ if __name__ == '__main__':
|
|||||||
setup_filter(
|
setup_filter(
|
||||||
chain_spec,
|
chain_spec,
|
||||||
args.cache_dir,
|
args.cache_dir,
|
||||||
|
bool(args.store_tx_data),
|
||||||
|
bool(args.store_block_data),
|
||||||
)
|
)
|
||||||
|
|
||||||
cache_filter = setup_cache_filter(
|
cache_filter = setup_cache_filter(
|
||||||
|
@ -25,10 +25,12 @@ class FileStore:
|
|||||||
tx_hash_dirnormal = strip_0x(tx.hash).upper()
|
tx_hash_dirnormal = strip_0x(tx.hash).upper()
|
||||||
tx_hash_bytes = bytes.fromhex(tx_hash_dirnormal)
|
tx_hash_bytes = bytes.fromhex(tx_hash_dirnormal)
|
||||||
self.tx_raw_dir.add(tx_hash_bytes, raw)
|
self.tx_raw_dir.add(tx_hash_bytes, raw)
|
||||||
address = bytes.fromhex(strip_0x(tx.inputs[0]))
|
if self.address_rules != None:
|
||||||
self.address_dir.add_dir(tx_hash_dirnormal, address, b'')
|
for a in tx.outputs + tx.inputs:
|
||||||
address = bytes.fromhex(strip_0x(tx.outputs[0]))
|
if self.address_rules.apply_rules_addresses(a, a, tx.hash):
|
||||||
self.address_dir.add_dir(tx_hash_dirnormal, address, b'')
|
a = bytes.fromhex(strip_0x(a))
|
||||||
|
self.address_dir.add_dir(tx_hash_dirnormal, a, b'')
|
||||||
|
|
||||||
if include_data:
|
if include_data:
|
||||||
src = json.dumps(tx.src()).encode('utf-8')
|
src = json.dumps(tx.src()).encode('utf-8')
|
||||||
self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src)
|
self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src)
|
||||||
@ -44,7 +46,7 @@ class FileStore:
|
|||||||
self.block_src_dir.add(hash_bytes, src)
|
self.block_src_dir.add(hash_bytes, src)
|
||||||
|
|
||||||
|
|
||||||
def __init__(self, chain_spec, cache_root=base_dir):
|
def __init__(self, chain_spec, cache_root=base_dir, address_rules=None):
|
||||||
self.cache_root = os.path.join(
|
self.cache_root = os.path.join(
|
||||||
cache_root,
|
cache_root,
|
||||||
'eth_monitor',
|
'eth_monitor',
|
||||||
@ -68,3 +70,4 @@ class FileStore:
|
|||||||
self.address_path = os.path.join(self.cache_dir, 'address')
|
self.address_path = os.path.join(self.cache_dir, 'address')
|
||||||
self.address_dir = HexDir(self.address_path, 20, levels=2)
|
self.address_dir = HexDir(self.address_path, 20, levels=2)
|
||||||
self.chain_spec = chain_spec
|
self.chain_spec = chain_spec
|
||||||
|
self.address_rules = address_rules
|
||||||
|
Loading…
Reference in New Issue
Block a user