Add cli tx dumper
This commit is contained in:
@@ -21,6 +21,7 @@ from chainlib.eth.tx import (
|
||||
from chainlib.interface import ChainInterface
|
||||
from chainsyncer.backend.memory import MemBackend
|
||||
from chainsyncer.driver.threadpool import ThreadPoolHistorySyncer
|
||||
from chainsyncer.filter import SyncFilter
|
||||
|
||||
# local imports
|
||||
from eth_cache.account import AccountRegistry
|
||||
@@ -30,8 +31,9 @@ from eth_cache.store import PointerHexDir
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logg = logging.getLogger()
|
||||
logging.getLogger('eth_cache.store').setLevel(logging.DEBUG)
|
||||
#logging.getLogger('chainsyncer.filter').setLevel(logging.DEBUG)
|
||||
logging.getLogger('chainsyncer.driver.threadpool').setLevel(logging.DEBUG)
|
||||
logging.getLogger('chainsyncer.driver.head').setLevel(logging.DEBUG)
|
||||
#logging.getLogger('chainsyncer.driver.head').setLevel(logging.DEBUG)
|
||||
#logging.getLogger('chainsyncer.backend.memory').setLevel(logging.DEBUG)
|
||||
|
||||
root_dir = tempfile.mkdtemp(dir=os.path.join('/tmp/ethsync'))
|
||||
@@ -62,10 +64,10 @@ rpc = conn_factory()
|
||||
start = 12423900
|
||||
#start = 0
|
||||
|
||||
o = block_latest()
|
||||
r = rpc.do(o)
|
||||
stop = int(r, 16)
|
||||
stop = start + 3
|
||||
#o = block_latest()
|
||||
#r = rpc.do(o)
|
||||
#stop = int(r, 16)
|
||||
stop = start + 10
|
||||
|
||||
syncer_backend = MemBackend(chain_spec, None, target_block=stop)
|
||||
syncer_backend.set(start, 0)
|
||||
@@ -87,6 +89,29 @@ syncer_backend.set(start, 0)
|
||||
account_registry = AccountRegistry()
|
||||
account_registry.add('0x6bd8cb96bbc58a73d5360301b7791457bc93da24', 'money')
|
||||
|
||||
|
||||
def apply_one_async(fltr, idx, conn, block, tx, session):
|
||||
logg.error('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> apply async')
|
||||
fltr.filter(conn, block, tx, session)
|
||||
return (block.number, tx.index,)
|
||||
|
||||
|
||||
|
||||
|
||||
class ThreadedSyncFilter(SyncFilter):
|
||||
|
||||
def __init__(self, pool, backend):
|
||||
super(ThreadedSyncFilter, self).__init__(backend)
|
||||
self.pool = pool
|
||||
|
||||
|
||||
def apply_one(self, fltr, idx, conn, block, tx, session):
|
||||
logg.error('applyone {} {}'.format(fltr, self.pool))
|
||||
#self.pool.apply_async(apply_one_async, (fltr, idx, conn, block, tx, session, self.backend.complete_filter,))
|
||||
self.pool.apply(fltr, (idx, conn, block, tx, session,), {}, self.backend.complete_filter)
|
||||
logg.debug('applyone end')
|
||||
|
||||
|
||||
class StoreFilter:
|
||||
|
||||
def __init__(self, store, registry):
|
||||
@@ -110,6 +135,8 @@ class MonitorFilter:
|
||||
|
||||
|
||||
def filter(self, rpc, block, tx, session=None):
|
||||
logg.error('foozzzzzzzzzz')
|
||||
logg.debug('foo')
|
||||
if tx == None:
|
||||
s = '{} sync block error in tx lookup ({})'.format(self.name, block.number, len(block.txs))
|
||||
else:
|
||||
@@ -124,4 +151,5 @@ if __name__ == '__main__':
|
||||
syncer = ThreadPoolHistorySyncer(conn_factory, 2, syncer_backend, chain_interface)
|
||||
syncer.add_filter(MonitorFilter())
|
||||
syncer.add_filter(fltr)
|
||||
syncer.filter = ThreadedSyncFilter(syncer.worker_pool, syncer_backend)
|
||||
syncer.loop(0, rpc)
|
||||
|
||||
Reference in New Issue
Block a user