multiprocessing in synceR

This commit is contained in:
nolash 2021-06-13 13:07:58 +02:00
parent 819b07f173
commit 80a5b344f8
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 20 additions and 7 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
__pycache__
*.pyc
dist/
build/
*.egg-info/

View File

@ -31,6 +31,7 @@ logging.basicConfig(level=logging.INFO)
logg = logging.getLogger() logg = logging.getLogger()
logging.getLogger('eth_cache.store').setLevel(logging.DEBUG) logging.getLogger('eth_cache.store').setLevel(logging.DEBUG)
logging.getLogger('chainsyncer.driver.thread').setLevel(logging.DEBUG) logging.getLogger('chainsyncer.driver.thread').setLevel(logging.DEBUG)
logging.getLogger('chainsyncer.driver.head').setLevel(logging.DEBUG)
#logging.getLogger('chainsyncer.backend.memory').setLevel(logging.DEBUG) #logging.getLogger('chainsyncer.backend.memory').setLevel(logging.DEBUG)
root_dir = tempfile.mkdtemp(dir=os.path.join('/tmp/ethsync')) root_dir = tempfile.mkdtemp(dir=os.path.join('/tmp/ethsync'))
@ -52,7 +53,9 @@ backend = PointerHexDir(data_dir, 32)
backend.register_pointer('address') backend.register_pointer('address')
store = TxFileStore(chain_spec, backend) store = TxFileStore(chain_spec, backend)
rpc = EthHTTPConnection('http://localhost:8545') def conn_factory():
return EthHTTPConnection('http://localhost:8545')
rpc = conn_factory()
#start = 8534365 #start = 8534365
start = 12423900 start = 12423900
@ -60,7 +63,7 @@ start = 12423900
o = block_latest() o = block_latest()
r = rpc.do(o) r = rpc.do(o)
stop = int(r, 16) stop = int(r, 16)
stop = start + 200 stop = start + 50
syncer_backend = MemBackend(chain_spec, None, target_block=stop) syncer_backend = MemBackend(chain_spec, None, target_block=stop)
syncer_backend.set(start, 0) syncer_backend.set(start, 0)
@ -105,13 +108,18 @@ class MonitorFilter:
def filter(self, rpc, block, tx, session=None): def filter(self, rpc, block, tx, session=None):
s = '{} sync block {} tx {}/{}'.format(self.name, block.number, tx.index, len(block.txs)) if tx == None:
s = '{} sync block error in tx lookup ({})'.format(self.name, block.number, len(block.txs))
else:
s = '{} sync block {} tx {}/{}'.format(self.name, block.number, tx.index, len(block.txs))
sys.stdout.write('{:<100s}\r'.format(s)) sys.stdout.write('{:<100s}\r'.format(s))
fltr = StoreFilter(store, account_registry) fltr = StoreFilter(store, account_registry)
syncer = ThreadedHistorySyncer(10, syncer_backend, chain_interface) if __name__ == '__main__':
syncer.add_filter(MonitorFilter()) ThreadedHistorySyncer.yield_delay = 0
syncer.add_filter(fltr) syncer = ThreadedHistorySyncer(conn_factory, 50, syncer_backend, chain_interface)
syncer.loop(0, rpc) syncer.add_filter(MonitorFilter())
syncer.add_filter(fltr)
syncer.loop(0, rpc)