commit 819b07f1730a4c1e631635887811248aeffea02b Author: nolash Date: Sun Jun 13 11:09:48 2021 +0200 Initial commit diff --git a/eth_cache/account.py b/eth_cache/account.py new file mode 100644 index 0000000..d6a96f7 --- /dev/null +++ b/eth_cache/account.py @@ -0,0 +1,45 @@ +# external imports +from hexathon import strip_0x + + +class AccountRegistry: + + def __init__(self): + self.senders = {} + self.recipients = {} + + + def __normalize_address(self, address): + return bytes.fromhex(strip_0x(address)) + + def add(self, address, label): + self.add_sender(address, label) + self.add_recipient(address, label) + + + def add_sender(self, address, label): + a = self.__normalize_address(address) + self.senders[a] = label + + + def add_recipient(self, address, label): + a = self.__normalize_address(address) + self.recipients[a] = label + + + def have(self, address): + if self.get_sender(address) != None: + return True + if self.get_recipient(address) != None: + return True + return False + + + def get_sender(self, address): + a = self.__normalize_address(address) + return self.senders.get(a) + + + def get_recipient(self, address): + a = self.__normalize_address(address) + return self.recipients.get(a) diff --git a/eth_cache/filter.py b/eth_cache/filter.py new file mode 100644 index 0000000..e889a8d --- /dev/null +++ b/eth_cache/filter.py @@ -0,0 +1,7 @@ +class GasFilter: + + def __init__(self, store): + + + def filter(self, conn, block, tx, session): + self.store.put() diff --git a/eth_cache/store.py b/eth_cache/store.py new file mode 100644 index 0000000..5a14760 --- /dev/null +++ b/eth_cache/store.py @@ -0,0 +1,65 @@ +# standard imports +import logging +import os + +# external imports +from hexdir import HexDir +from chainlib.eth.tx import pack +from hexathon import strip_0x + +logg = logging.getLogger(__name__) + + +class PointerHexDir(HexDir): + + def __init__(self, root_path, key_length, levels=2, prefix_length=0): + super(PointerHexDir, self).__init__(root_path, key_length, levels, prefix_length) + self.pointers = {} + + + def register_pointer(self, label, dir_name=None): + if dir_name == None: + dir_name = label + pointer_dir = os.path.join(self.path, dir_name) + os.makedirs(pointer_dir, exist_ok=True) + + label_file = os.path.join(pointer_dir, '.label') + try: + os.stat(label_file) + except FileNotFoundError: + f = open(label_file, 'w') + f.write(label) + f.close() + + self.pointers[label] = pointer_dir + + + def add_pointer(self, pointer, pointer_relpath, destination_path): + if isinstance(pointer_relpath, list): + link_path = os.path.join(self.pointers[pointer], *pointer_relpath) + else: + link_path = os.path.join(self.pointers[pointer], pointer_relpath) + os.makedirs(os.path.dirname(link_path), exist_ok=True) + os.symlink(destination_path, link_path) + logg.debug('added link {} -> {}'.format(link_path, destination_path)) + + + def add(self, key, content, prefix=b'', pointers={}): + (c, entry_path) = super(PointerHexDir, self).add(key, content, prefix=prefix) + for k in pointers.keys(): + self.add_pointer(k, pointers[k], entry_path) + + +class TxFileStore: + + def __init__(self, chain_spec, backend): + self.backend = backend + self.chain_spec = chain_spec + + + def put(self, block, tx, addresses, attrs={}): + for address in addresses: + tx_src = tx.as_dict() + tx_raw = pack(tx_src, self.chain_spec) + filename = '{}_{}_{}'.format(block.number, tx.index, strip_0x(tx.hash)) + self.backend.add(bytes.fromhex(tx.hash), tx_raw, pointers={'address': [strip_0x(address),filename]}) diff --git a/examples/dump.py b/examples/dump.py new file mode 100644 index 0000000..4d7d9fd --- /dev/null +++ b/examples/dump.py @@ -0,0 +1,117 @@ +# standard imports +import tempfile +import os +import logging +import sys + +# external imports +from chainlib.chain import ChainSpec +from chainlib.eth.connection import EthHTTPConnection +from chainlib.eth.block import ( + block_by_number, + block_by_hash, + block_latest, + Block, + ) +from chainlib.eth.tx import ( + Tx, + transaction, + receipt, + ) +from chainlib.interface import ChainInterface +from chainsyncer.backend.memory import MemBackend +from chainsyncer.driver.thread import ThreadedHistorySyncer + +# local imports +from eth_cache.account import AccountRegistry +from eth_cache.store import TxFileStore +from eth_cache.store import PointerHexDir + +logging.basicConfig(level=logging.INFO) +logg = logging.getLogger() +logging.getLogger('eth_cache.store').setLevel(logging.DEBUG) +logging.getLogger('chainsyncer.driver.thread').setLevel(logging.DEBUG) +#logging.getLogger('chainsyncer.backend.memory').setLevel(logging.DEBUG) + +root_dir = tempfile.mkdtemp(dir=os.path.join('/tmp/ethsync')) +data_dir = os.path.join(root_dir, 'store') +logg.info('using data dir {}'.format(data_dir)) + +chain_interface = ChainInterface() +chain_interface.set('block_by_number', block_by_number) +chain_interface.set('block_by_hash', block_by_hash) +chain_interface.set('block_latest', block_latest) +chain_interface.set('block_from_src', Block.from_src) +chain_interface.set('tx_from_src', Tx.from_src) +chain_interface.set('tx_by_hash', transaction) +chain_interface.set('tx_receipt', receipt) +chain_interface.set('src_normalize', Tx.src_normalize) + +chain_spec = ChainSpec('evm', 'ethereum', 1) +backend = PointerHexDir(data_dir, 32) +backend.register_pointer('address') +store = TxFileStore(chain_spec, backend) + +rpc = EthHTTPConnection('http://localhost:8545') + +#start = 8534365 +start = 12423900 + +o = block_latest() +r = rpc.do(o) +stop = int(r, 16) +stop = start + 200 + +syncer_backend = MemBackend(chain_spec, None, target_block=stop) +syncer_backend.set(start, 0) + +#o = block_by_number(12423955, include_tx=False) +#r = rpc.do(o) +##o = block_by_hash(r) +##r = rpc.do(o) +#block = Block(r) +# +#tx_hash = block.txs[308] +#logg.debug('tx_ahsh {}'.format(tx_hash)) +#o = transaction(tx_hash) +#tx_src = rpc.do(o) +#o = receipt(tx_hash) +#rcpt = rpc.do(o) +#tx = Tx(tx_src, block=block) + +account_registry = AccountRegistry() +account_registry.add('0x6bd8cb96bbc58a73d5360301b7791457bc93da24', 'money') + +class StoreFilter: + + def __init__(self, store, registry): + self.registry = registry + self.store = store + + + def filter(self, conn, block, tx, session=None): + addresses = [] + if account_registry.have(tx.inputs[0]): + addresses.append(tx.inputs[0]) + if account_registry.have(tx.outputs[0]): + addresses.append(tx.outputs[0]) + store.put(block, tx, addresses) + + +class MonitorFilter: + + def __init__(self, name='sync'): + self.name = name + + + def filter(self, rpc, block, tx, session=None): + s = '{} sync block {} tx {}/{}'.format(self.name, block.number, tx.index, len(block.txs)) + sys.stdout.write('{:<100s}\r'.format(s)) + + +fltr = StoreFilter(store, account_registry) + +syncer = ThreadedHistorySyncer(10, syncer_backend, chain_interface) +syncer.add_filter(MonitorFilter()) +syncer.add_filter(fltr) +syncer.loop(0, rpc)