From 0e379149914a4d65842547337d63cb353189a004 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 27 Sep 2021 21:37:48 +0200 Subject: [PATCH] Separate ranges calculation from backend creation --- chainsyncer/driver/poll.py | 1 - chainsyncer/driver/threadrange.py | 70 ++++++++++++++++++------------- tests/chainsyncer_base.py | 2 + tests/test_thread_range.py | 63 +++++++++++++++++----------- 4 files changed, 83 insertions(+), 53 deletions(-) diff --git a/chainsyncer/driver/poll.py b/chainsyncer/driver/poll.py index 1cbbf11..43bf4b1 100644 --- a/chainsyncer/driver/poll.py +++ b/chainsyncer/driver/poll.py @@ -30,7 +30,6 @@ class BlockPollSyncer(Syncer): :rtype: tuple :returns: See chainsyncer.backend.base.Backend.get """ - raise ValueError() (pair, fltr) = self.backend.get() start_tx = pair[1] diff --git a/chainsyncer/driver/threadrange.py b/chainsyncer/driver/threadrange.py index d69adc4..89a6b86 100644 --- a/chainsyncer/driver/threadrange.py +++ b/chainsyncer/driver/threadrange.py @@ -11,58 +11,72 @@ from .threadpool import ThreadPoolTask logg = logging.getLogger() -def range_to_backends(chain_spec, block_offset, tx_offset, block_target, flags, flags_count, backend_class, backend_count): +#def range_to_backends(chain_spec, block_offset, tx_offset, block_target, flags, flags_count, backend_class, backend_count): +def sync_split(block_offset, block_target, count): block_count = block_target - block_offset - if block_count < backend_count: + if block_count < count: logg.warning('block count is less than thread count, adjusting thread count to {}'.format(block_count)) - backend_count = block_count - blocks_per_thread = int(block_count / backend_count) + count = block_count + blocks_per_thread = int(block_count / count) - backends = [] - for i in range(backend_count): + #backends = [] + #for i in range(backend_count): + ranges = [] + for i in range(count): block_target = block_offset + blocks_per_thread - backend = backend_class.custom(chain_spec, block_target - 1, block_offset=block_offset, tx_offset=tx_offset, flags=flags, flags_count=flags_count) - backends.append(backend) + #backend = backend_class.custom(chain_spec, block_target - 1, block_offset=block_offset, tx_offset=tx_offset, flags=flags, flags_count=flags_count) + offset = block_offset + target = block_target -1 + ranges.append((offset, target,)) block_offset = block_target - tx_offset = 0 - flags = 0 +# tx_offset = 0 +# flags = 0 - return backends +# return backends + return ranges class ThreadPoolRangeTask: - - loop_func = None - def __init__(self, backend, conn): - pass + def __init__(self, backend, sync_range, conn_factory, chain_interface, syncer_factory=HistorySyncer): + backend_start = backend.start() + backend_target = backend.target() + backend_class = backend.__class__ + tx_offset = 0 + flags = 0 + if sync_range[0] == backend_start[0][0]: + tx_offset = backend_start[0][1] + flags = backend_start[1] + self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0) + self.syncer = syncer_factory(self.backend, chain_interface) + self.conn_factory = conn_factory - def foo(self, a, b): - return self.loop_func() + def start_loop(self, interval): + conn = self.conn_factory() + return self.syncer.loop(interval, conn) -class ThreadPoolRangeHistorySyncer(HistorySyncer): +class ThreadPoolRangeHistorySyncer: - def __init__(self, conn_factory, thread_count, backends, chain_interface, loop_func=HistorySyncer.loop, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None): - if thread_count > len(backends): - raise ValueError('thread count {} is greater than than backend count {}'.format(thread_count, len(backends))) - self.backends = backends + def __init__(self, conn_factory, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None): + self.src_backend = backend self.thread_count = thread_count self.conn_factory = conn_factory self.single_sync_offset = 0 self.runlevel_callback = None - - ThreadPoolRangeTask.loop_func = loop_func + backend_start = backend.start() + backend_target = backend.target() + self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count) + self.chain_interface = chain_interface def loop(self, interval, conn): - super_loop = super(ThreadPoolRangeHistorySyncer, self).loop self.worker_pool = multiprocessing.Pool(processes=self.thread_count) - for backend in self.backends: + for sync_range in self.ranges: conn = self.conn_factory() - task = ThreadPoolRangeTask(backend, conn) - t = self.worker_pool.apply_async(task.foo, (backend, conn,)) + task = ThreadPoolRangeTask(self.src_backend, sync_range, self.conn_factory, self.chain_interface) + t = self.worker_pool.apply_async(task.start_loop, (0.1,)) print(t.get()) self.worker_pool.close() self.worker_pool.join() diff --git a/tests/chainsyncer_base.py b/tests/chainsyncer_base.py index 34ceab7..c34cfe0 100644 --- a/tests/chainsyncer_base.py +++ b/tests/chainsyncer_base.py @@ -9,6 +9,7 @@ import os from chainlib.chain import ChainSpec from chainlib.interface import ChainInterface from chainlib.eth.tx import receipt +from chainlib.eth.block import block_by_number # local imports from chainsyncer.db import dsn_from_config @@ -26,6 +27,7 @@ class EthChainInterface(ChainInterface): def __init__(self): self._tx_receipt = receipt + self._block_by_number = block_by_number class TestBase(unittest.TestCase): diff --git a/tests/test_thread_range.py b/tests/test_thread_range.py index 5a57646..404e779 100644 --- a/tests/test_thread_range.py +++ b/tests/test_thread_range.py @@ -8,7 +8,8 @@ from chainlib.chain import ChainSpec # local imports from chainsyncer.backend.memory import MemBackend from chainsyncer.driver.threadrange import ( - range_to_backends, +# range_to_backends, + sync_split, ThreadPoolRangeHistorySyncer, ) from chainsyncer.unittest.base import MockConn @@ -22,34 +23,48 @@ logg = logging.getLogger() class TestThreadRange(TestBase): + def test_range_split_even(self): - chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') - backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3) - self.assertEqual(len(backends), 3) - self.assertEqual(((5, 3), 5), backends[0].start()) - self.assertEqual((9, 1023), backends[0].target()) - self.assertEqual(((10, 0), 0), backends[1].start()) - self.assertEqual((14, 1023), backends[1].target()) - self.assertEqual(((15, 0), 0), backends[2].start()) - self.assertEqual((19, 1023), backends[2].target()) - - - def test_range_split_underflow(self): - chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') - backends = range_to_backends(chain_spec, 5, 3, 7, 5, 10, MemBackend, 3) - self.assertEqual(len(backends), 2) - self.assertEqual(((5, 3), 5), backends[0].start()) - self.assertEqual((5, 1023), backends[0].target()) - self.assertEqual(((6, 0), 0), backends[1].start()) - self.assertEqual((6, 1023), backends[1].target()) + ranges = sync_split(5, 20, 3) + self.assertEqual(len(ranges), 3) + self.assertEqual(ranges[0], (5, 9)) + self.assertEqual(ranges[1], (10, 14)) + self.assertEqual(ranges[2], (15, 19)) + +# def test_range_split_even(self): +# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') +# backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3) +# self.assertEqual(len(backends), 3) +# self.assertEqual(((5, 3), 5), backends[0].start()) +# self.assertEqual((9, 1023), backends[0].target()) +# self.assertEqual(((10, 0), 0), backends[1].start()) +# self.assertEqual((14, 1023), backends[1].target()) +# self.assertEqual(((15, 0), 0), backends[2].start()) +# self.assertEqual((19, 1023), backends[2].target()) +# +# +# def test_range_split_underflow(self): +# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') +# backends = range_to_backends(chain_spec, 5, 3, 7, 5, 10, MemBackend, 3) +# self.assertEqual(len(backends), 2) +# self.assertEqual(((5, 3), 5), backends[0].start()) +# self.assertEqual((5, 1023), backends[0].target()) +# self.assertEqual(((6, 0), 0), backends[1].start()) +# self.assertEqual((6, 1023), backends[1].target()) +# def test_range_syncer(self): +# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') +# backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3) +# +# syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backends, self.interface) +# syncer.loop(1, None) +# def test_range_syncer(self): chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') - backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3) - - syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backends, self.interface) - syncer.loop(1, None) + backend = MemBackend.custom(chain_spec, 20, 5, 3, 5, 10) + syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backend, self.interface) + syncer.loop(0.1, None) if __name__ == '__main__':