diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index bd1da3c..7bbb136 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -1,6 +1,7 @@ # standard imports import logging import time +import signal # local imports from chainsyncer.error import ( @@ -17,8 +18,18 @@ NS_DIV = 1000000000 class SyncDriver: running_global = True + """If set to false syncer will terminate polling loop.""" + yield_delay=0.005 + """Delay between each processed block.""" + signal_request = [signal.SIGINT, signal.SIGTERM] + """Signals to catch to request shutdown.""" + signal_set = False + """Whether shutdown signal has been received.""" + name = 'base' + """Syncer name, to be overriden for each extended implementation.""" - def __init__(self, conn, store, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None): + + def __init__(self, store, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None): self.store = store self.running = True self.pre_callback = pre_callback @@ -27,7 +38,7 @@ class SyncDriver: self.idle_callback = idle_callback self.last_start = 0 self.clock_id = time.CLOCK_MONOTONIC_RAW - self.session = SyncSession(self.store) + self.store.start(offset=offset, target=target) def __sig_terminate(self, sig, frame): @@ -43,15 +54,16 @@ class SyncDriver: self.running = False - def run(self): + def run(self, conn): while self.running_global: - item = self.store.next_item() + self.session = SyncSession(self.store) + item = self.session.start() logg.debug('item {}'.format(item)) if item == None: self.running = False self.running_global = False break - self.loop(item) + self.loop(conn, item) def idle(self, interval): @@ -80,20 +92,18 @@ class SyncDriver: time.sleep(interval) - def loop(self, item): + def loop(self, conn, item): + tx_start = item.tx_cursor while self.running and SyncDriver.running_global: self.last_start = time.clock_gettime_ns(self.clock_id) if self.pre_callback != None: self.pre_callback() while True and self.running: - if start_tx > 0: - start_tx -= 1 - continue try: - block = self.get(conn) + block = self.get(conn, item) except SyncDone as e: logg.info('all blocks sumitted for processing: {}'.format(e)) - return self.backend.get() + return except NoBlockForYou as e: break if self.block_callback != None: @@ -101,21 +111,22 @@ class SyncDriver: last_block = block try: - self.process(conn, block) + self.process(conn, item, block, tx_start) except IndexError: - self.backend.set(block.number + 1, 0) - start_tx = 0 + item.next(advance_block=True) + tx_start = 0 time.sleep(self.yield_delay) if self.post_callback != None: self.post_callback() self.idle(interval) + def process_single(self, conn, block, tx): self.session.filter(conn, block, tx) - def process(self, conn, block): + def process(self, conn, block, tx_start): raise NotImplementedError() diff --git a/chainsyncer/session.py b/chainsyncer/session.py index bc5bc7f..44d8b0b 100644 --- a/chainsyncer/session.py +++ b/chainsyncer/session.py @@ -16,17 +16,11 @@ class SyncSession: self.item = None self.filters = self.session_store.filters - -# def register(self, fltr): -# if self.started: -# raise RuntimeError('filters cannot be changed after syncer start') -# self.session_store.register(fltr) -# self.filters.append(fltr) - def start(self, offset=0, target=-1): self.session_store.start(offset=offset, target=target) self.item = self.session_store.next_item() + return self.item def filter(self, conn, block, tx): @@ -43,4 +37,5 @@ class SyncSession: raise BackendError('filter state inconsitent with filter list') except FilterDone: self.item.reset() + self.next() self.session_store.disconnect() diff --git a/chainsyncer/state/base.py b/chainsyncer/state/base.py index 708e3a4..58fd4d5 100644 --- a/chainsyncer/state/base.py +++ b/chainsyncer/state/base.py @@ -47,8 +47,7 @@ class SyncState: s = fltr.common_name() self.state_store.add(s) n = self.state_store.from_name(s) - logg.debug('add {} {} {}'.format(s, n, self)) - + logg.debug('add filter {} {} {}'.format(s, n, self)) def sum(self): diff --git a/chainsyncer/store/fs.py b/chainsyncer/store/fs.py index 96fab2b..e64315d 100644 --- a/chainsyncer/store/fs.py +++ b/chainsyncer/store/fs.py @@ -33,7 +33,8 @@ class SyncFsItem: if started: match_state = self.sync_state.SYNC v = self.sync_state.get(self.state_key) - self.cursor = int.from_bytes(v, 'big') + self.cursor = int.from_bytes(v[:4], 'big') + self.tx_cursor = int.from_bytes(v[4:], 'big') if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid: raise LockError(s) @@ -60,15 +61,26 @@ class SyncFsItem: raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key)) self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) + + def next(self, advance_block=False): v = self.sync_state.get(self.state_key) - block_number = int.from_bytes(v, 'big') - block_number += 1 - if self.target >= 0 and block_number > self.target: - raise SyncDone(self.target) + + block_number = int.from_bytes(v[:4], 'big') + tx_index = int.from_bytes(v[4:], 'big') + if advance_block: + block_number += 1 + tx_index = 0 + if self.target >= 0 and block_number > self.target: + raise SyncDone(self.target) + else: + tx_index += 1 + + self.cursor = block_number + self.tx_cursor = tx_index v = block_number.to_bytes(4, 'big') self.sync_state.replace(self.state_key, v) - + def advance(self): if self.skip_filter: @@ -175,7 +187,6 @@ class SyncFsStore: def __load(self, target): - self.state.sync(self.state.NEW) self.state.sync(self.state.SYNC) @@ -220,13 +231,18 @@ class SyncFsStore: def start(self, offset=0, target=-1): + if self.started: + return + self.__load(target) if self.first: block_number = offset - block_number_bytes = block_number.to_bytes(4, 'big') + state_bytes = block_number.to_bytes(4, 'big') + tx_index = 0 + state_bytes += tx_index.to_bytes(4, 'big') block_number_str = str(block_number) - self.state.put(block_number_str, block_number_bytes) + self.state.put(block_number_str, state_bytes) self.filter_state.put(block_number_str) o = SyncFsItem(block_number, target, self.state, self.filter_state) self.items[block_number] = o @@ -239,10 +255,12 @@ class SyncFsStore: def stop(self): - if self.target == 0: - block_number = self.height + 1 - block_number_bytes = block_number.to_bytes(4, 'big') - self.state.put(str(block_number), block_number_bytes) +# if self.target == 0: +# block_number = self.height + 1 +# block_number_bytes = block_number.to_bytes(4, 'big') +# block_number_bytes = block_number.to_bytes(4, 'big') +# self.state.put(str(block_number), block_number_bytes) + pass def get(self, k): diff --git a/chainsyncer/unittest/base.py b/chainsyncer/unittest/base.py index d665192..d924834 100644 --- a/chainsyncer/unittest/base.py +++ b/chainsyncer/unittest/base.py @@ -10,6 +10,7 @@ from shep.state import State # local imports #from chainsyncer.driver.history import HistorySyncer from chainsyncer.error import NoBlockForYou +from chainsyncer.driver import SyncDriver logg = logging.getLogger().getChild(__name__) @@ -105,6 +106,29 @@ class MockFilter: return self.brk +class MockDriver(SyncDriver): + + def __init__(self, store, offset=0, target=-1): + super(MockDriver, self).__init__(store, offset=offset, target=target) + self.blocks = {} + + + def add_block(self, block): + self.blocks[block.number] = block + + + def get(self, conn, item): + return self.blocks[item.cursor] + + + def process(self, conn, item, block, tx_start): + i = tx_start + while True: + tx = block.tx(i) + self.process_single(conn, block, tx) + item.next() + i += 1 + #class TestSyncer(HistorySyncer): # """Unittest extension of history syncer driver. diff --git a/tests/test_fs.py b/tests/test_fs.py index e3de05a..75e064c 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -183,8 +183,9 @@ class TestFs(unittest.TestCase): o.release() with self.assertRaises(FilterDone): o.advance() + o.reset() with self.assertRaises(SyncDone): - o.reset() + o.next(advance_block=True) if __name__ == '__main__': diff --git a/tests/test_session.py b/tests/test_session.py index 2e48cfd..cfb91dd 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -20,6 +20,7 @@ from chainsyncer.unittest import ( MockConn, MockTx, MockBlock, + MockDriver, ) from chainsyncer.driver import SyncDriver @@ -53,15 +54,31 @@ class TestFilter(unittest.TestCase): tx_hash = os.urandom(32).hex() tx = MockTx(42, tx_hash) block = MockBlock(1, [tx_hash]) - with self.assertRaises(SyncDone): - session.filter(self.conn, block, tx) + session.filter(self.conn, block, tx) self.assertEqual(len(fltr_one.contents), 2) - def test_driver(self): - drv = SyncDriver(self.conn, self.store) - drv.run() + drv = MockDriver(self.store, target=1) + + tx_hash = os.urandom(32).hex() + tx = MockTx(0, tx_hash) + block = MockBlock(0, [tx_hash]) + drv.add_block(block) + + tx_hash_one = os.urandom(32).hex() + tx = MockTx(0, tx_hash_one) + tx_hash_two = os.urandom(32).hex() + tx = MockTx(1, tx_hash_two) + block = MockBlock(1, [tx_hash_one, tx_hash_two]) + drv.add_block(block) + + fltr_one = MockFilter('foo') + self.store.register(fltr_one) + with self.assertRaises(SyncDone): + drv.run(self.conn) + + self.assertEqual(len(fltr_one.contents), 3) if __name__ == '__main__':