Separate ranges calculation from backend creation
This commit is contained in:
		
							parent
							
								
									a49c152e24
								
							
						
					
					
						commit
						0e37914991
					
				@ -30,7 +30,6 @@ class BlockPollSyncer(Syncer):
 | 
			
		||||
        :rtype: tuple
 | 
			
		||||
        :returns: See chainsyncer.backend.base.Backend.get
 | 
			
		||||
        """
 | 
			
		||||
        raise ValueError()
 | 
			
		||||
        (pair, fltr) = self.backend.get()
 | 
			
		||||
        start_tx = pair[1]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -11,58 +11,72 @@ from .threadpool import ThreadPoolTask
 | 
			
		||||
logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def range_to_backends(chain_spec, block_offset, tx_offset, block_target, flags, flags_count, backend_class, backend_count):
 | 
			
		||||
#def range_to_backends(chain_spec, block_offset, tx_offset, block_target, flags, flags_count, backend_class, backend_count):
 | 
			
		||||
def sync_split(block_offset, block_target, count):
 | 
			
		||||
    block_count = block_target - block_offset
 | 
			
		||||
    if block_count < backend_count:
 | 
			
		||||
    if block_count < count:
 | 
			
		||||
        logg.warning('block count is less than thread count, adjusting thread count to {}'.format(block_count))
 | 
			
		||||
        backend_count = block_count
 | 
			
		||||
    blocks_per_thread = int(block_count / backend_count)
 | 
			
		||||
        count = block_count
 | 
			
		||||
    blocks_per_thread = int(block_count / count)
 | 
			
		||||
 | 
			
		||||
    backends = []
 | 
			
		||||
    for i in range(backend_count):
 | 
			
		||||
    #backends = []
 | 
			
		||||
    #for i in range(backend_count):
 | 
			
		||||
    ranges = []
 | 
			
		||||
    for i in range(count):
 | 
			
		||||
        block_target = block_offset + blocks_per_thread
 | 
			
		||||
        backend = backend_class.custom(chain_spec, block_target - 1, block_offset=block_offset, tx_offset=tx_offset, flags=flags, flags_count=flags_count)
 | 
			
		||||
        backends.append(backend)
 | 
			
		||||
        #backend = backend_class.custom(chain_spec, block_target - 1, block_offset=block_offset, tx_offset=tx_offset, flags=flags, flags_count=flags_count)
 | 
			
		||||
        offset = block_offset
 | 
			
		||||
        target = block_target -1
 | 
			
		||||
        ranges.append((offset, target,))
 | 
			
		||||
        block_offset = block_target
 | 
			
		||||
        tx_offset = 0
 | 
			
		||||
        flags = 0
 | 
			
		||||
#        tx_offset = 0
 | 
			
		||||
#        flags = 0
 | 
			
		||||
 | 
			
		||||
    return backends
 | 
			
		||||
#    return backends
 | 
			
		||||
    return ranges
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ThreadPoolRangeTask:
 | 
			
		||||
    
 | 
			
		||||
    loop_func = None
 | 
			
		||||
 | 
			
		||||
    def __init__(self, backend, conn):
 | 
			
		||||
        pass
 | 
			
		||||
    def __init__(self, backend, sync_range, conn_factory, chain_interface, syncer_factory=HistorySyncer):
 | 
			
		||||
        backend_start = backend.start()
 | 
			
		||||
        backend_target = backend.target()
 | 
			
		||||
        backend_class = backend.__class__
 | 
			
		||||
        tx_offset = 0
 | 
			
		||||
        flags = 0
 | 
			
		||||
        if sync_range[0] == backend_start[0][0]:
 | 
			
		||||
            tx_offset = backend_start[0][1]
 | 
			
		||||
            flags = backend_start[1]
 | 
			
		||||
        self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0)
 | 
			
		||||
        self.syncer = syncer_factory(self.backend, chain_interface)
 | 
			
		||||
        self.conn_factory = conn_factory
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def foo(self, a, b):
 | 
			
		||||
        return self.loop_func()
 | 
			
		||||
    def start_loop(self, interval):
 | 
			
		||||
        conn = self.conn_factory()
 | 
			
		||||
        return self.syncer.loop(interval, conn)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ThreadPoolRangeHistorySyncer(HistorySyncer):
 | 
			
		||||
class ThreadPoolRangeHistorySyncer:
 | 
			
		||||
 | 
			
		||||
    def __init__(self, conn_factory, thread_count, backends, chain_interface, loop_func=HistorySyncer.loop, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
 | 
			
		||||
        if thread_count > len(backends):
 | 
			
		||||
            raise ValueError('thread count {} is greater than than backend count {}'.format(thread_count, len(backends)))
 | 
			
		||||
        self.backends = backends
 | 
			
		||||
    def __init__(self, conn_factory, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
 | 
			
		||||
        self.src_backend = backend
 | 
			
		||||
        self.thread_count = thread_count
 | 
			
		||||
        self.conn_factory = conn_factory
 | 
			
		||||
        self.single_sync_offset = 0
 | 
			
		||||
        self.runlevel_callback = None
 | 
			
		||||
 | 
			
		||||
        ThreadPoolRangeTask.loop_func = loop_func
 | 
			
		||||
        backend_start = backend.start()
 | 
			
		||||
        backend_target = backend.target()
 | 
			
		||||
        self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count)
 | 
			
		||||
        self.chain_interface = chain_interface
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def loop(self, interval, conn):
 | 
			
		||||
        super_loop = super(ThreadPoolRangeHistorySyncer, self).loop
 | 
			
		||||
        self.worker_pool = multiprocessing.Pool(processes=self.thread_count)
 | 
			
		||||
        for backend in self.backends:
 | 
			
		||||
        for sync_range in self.ranges:
 | 
			
		||||
            conn = self.conn_factory()
 | 
			
		||||
            task = ThreadPoolRangeTask(backend, conn)
 | 
			
		||||
            t = self.worker_pool.apply_async(task.foo, (backend, conn,))
 | 
			
		||||
            task = ThreadPoolRangeTask(self.src_backend, sync_range, self.conn_factory, self.chain_interface)
 | 
			
		||||
            t = self.worker_pool.apply_async(task.start_loop, (0.1,))
 | 
			
		||||
            print(t.get())
 | 
			
		||||
        self.worker_pool.close()
 | 
			
		||||
        self.worker_pool.join()
 | 
			
		||||
 | 
			
		||||
@ -9,6 +9,7 @@ import os
 | 
			
		||||
from chainlib.chain import ChainSpec
 | 
			
		||||
from chainlib.interface import ChainInterface
 | 
			
		||||
from chainlib.eth.tx import receipt
 | 
			
		||||
from chainlib.eth.block import block_by_number
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
from chainsyncer.db import dsn_from_config
 | 
			
		||||
@ -26,6 +27,7 @@ class EthChainInterface(ChainInterface):
 | 
			
		||||
    
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self._tx_receipt = receipt
 | 
			
		||||
        self._block_by_number = block_by_number
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestBase(unittest.TestCase):
 | 
			
		||||
 | 
			
		||||
@ -8,7 +8,8 @@ from chainlib.chain import ChainSpec
 | 
			
		||||
# local imports
 | 
			
		||||
from chainsyncer.backend.memory import MemBackend
 | 
			
		||||
from chainsyncer.driver.threadrange import (
 | 
			
		||||
        range_to_backends,
 | 
			
		||||
#        range_to_backends,
 | 
			
		||||
        sync_split,
 | 
			
		||||
        ThreadPoolRangeHistorySyncer,
 | 
			
		||||
        )
 | 
			
		||||
from chainsyncer.unittest.base import MockConn
 | 
			
		||||
@ -22,34 +23,48 @@ logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
class TestThreadRange(TestBase):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def test_range_split_even(self):
 | 
			
		||||
        chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
 | 
			
		||||
        backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3)
 | 
			
		||||
        self.assertEqual(len(backends), 3)
 | 
			
		||||
        self.assertEqual(((5, 3), 5), backends[0].start())
 | 
			
		||||
        self.assertEqual((9, 1023), backends[0].target())
 | 
			
		||||
        self.assertEqual(((10, 0), 0), backends[1].start())
 | 
			
		||||
        self.assertEqual((14, 1023), backends[1].target())
 | 
			
		||||
        self.assertEqual(((15, 0), 0), backends[2].start())
 | 
			
		||||
        self.assertEqual((19, 1023), backends[2].target())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def test_range_split_underflow(self):
 | 
			
		||||
        chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
 | 
			
		||||
        backends = range_to_backends(chain_spec, 5, 3, 7, 5, 10, MemBackend, 3)
 | 
			
		||||
        self.assertEqual(len(backends), 2)
 | 
			
		||||
        self.assertEqual(((5, 3), 5), backends[0].start())
 | 
			
		||||
        self.assertEqual((5, 1023), backends[0].target())
 | 
			
		||||
        self.assertEqual(((6, 0), 0), backends[1].start())
 | 
			
		||||
        self.assertEqual((6, 1023), backends[1].target())
 | 
			
		||||
        ranges = sync_split(5, 20, 3)
 | 
			
		||||
        self.assertEqual(len(ranges), 3)
 | 
			
		||||
        self.assertEqual(ranges[0], (5, 9))
 | 
			
		||||
        self.assertEqual(ranges[1], (10, 14))
 | 
			
		||||
        self.assertEqual(ranges[2], (15, 19))
 | 
			
		||||
 | 
			
		||||
#    def test_range_split_even(self):
 | 
			
		||||
#        chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
 | 
			
		||||
#        backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3)
 | 
			
		||||
#        self.assertEqual(len(backends), 3)
 | 
			
		||||
#        self.assertEqual(((5, 3), 5), backends[0].start())
 | 
			
		||||
#        self.assertEqual((9, 1023), backends[0].target())
 | 
			
		||||
#        self.assertEqual(((10, 0), 0), backends[1].start())
 | 
			
		||||
#        self.assertEqual((14, 1023), backends[1].target())
 | 
			
		||||
#        self.assertEqual(((15, 0), 0), backends[2].start())
 | 
			
		||||
#        self.assertEqual((19, 1023), backends[2].target())
 | 
			
		||||
#
 | 
			
		||||
#
 | 
			
		||||
#    def test_range_split_underflow(self):
 | 
			
		||||
#        chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
 | 
			
		||||
#        backends = range_to_backends(chain_spec, 5, 3, 7, 5, 10, MemBackend, 3)
 | 
			
		||||
#        self.assertEqual(len(backends), 2)
 | 
			
		||||
#        self.assertEqual(((5, 3), 5), backends[0].start())
 | 
			
		||||
#        self.assertEqual((5, 1023), backends[0].target())
 | 
			
		||||
#        self.assertEqual(((6, 0), 0), backends[1].start())
 | 
			
		||||
#        self.assertEqual((6, 1023), backends[1].target())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#    def test_range_syncer(self):
 | 
			
		||||
#        chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
 | 
			
		||||
#        backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3)
 | 
			
		||||
#
 | 
			
		||||
#        syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backends, self.interface)
 | 
			
		||||
#        syncer.loop(1, None)
 | 
			
		||||
#
 | 
			
		||||
    def test_range_syncer(self):
 | 
			
		||||
        chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
 | 
			
		||||
        backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3)
 | 
			
		||||
 | 
			
		||||
        syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backends, self.interface)
 | 
			
		||||
        syncer.loop(1, None)
 | 
			
		||||
        backend = MemBackend.custom(chain_spec, 20, 5, 3, 5, 10)
 | 
			
		||||
        syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backend, self.interface)
 | 
			
		||||
        syncer.loop(0.1, None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user