diff --git a/chainsyncer/session.py b/chainsyncer/session.py index 5335c5a..d299f3a 100644 --- a/chainsyncer/session.py +++ b/chainsyncer/session.py @@ -23,6 +23,10 @@ class SyncSession: return self.item + def stop(self, item): + self.session_store.stop(item) + + def filter(self, conn, block, tx): self.session_store.connect() for fltr in self.filters: diff --git a/chainsyncer/store/fs.py b/chainsyncer/store/fs.py index 4739a42..8c5afcf 100644 --- a/chainsyncer/store/fs.py +++ b/chainsyncer/store/fs.py @@ -29,9 +29,8 @@ class SyncFsItem: self.sync_state = sync_state self.filter_state = filter_state self.state_key = str(offset) - match_state = self.sync_state.NEW - if started: - match_state = self.sync_state.SYNC + + logg.debug('get key {}'.format(self.state_key)) v = self.sync_state.get(self.state_key) self.cursor = int.from_bytes(v[:4], 'big') self.tx_cursor = int.from_bytes(v[4:], 'big') @@ -43,7 +42,7 @@ class SyncFsItem: self.skip_filter = False if self.count == 0: self.skip_filter = True - else: + elif not started: self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) @@ -123,7 +122,7 @@ class SyncFsItem: self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) return False return True - + def __str__(self): return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor) @@ -161,6 +160,7 @@ class SyncFsStore: if create_path: self.__create_path(base_path, default_path, session_id=session_id) + self.session_id = os.path.basename(self.session_path) logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path)) @@ -187,7 +187,6 @@ class SyncFsStore: session_id = str(uuid.uuid4()) self.session_path = os.path.join(base_path, session_id) os.makedirs(self.session_path) - self.session_id = os.path.basename(self.session_path) if self.is_default: try: @@ -222,7 +221,7 @@ class SyncFsStore: o = SyncFsItem(block_number, item_target, self.state, self.filter_state, started=True) self.items[block_number] = o self.item_keys.append(block_number) - logg.info('added {}'.format(o)) + logg.info('added existing {}'.format(o)) fp = os.path.join(self.session_path, str(target)) if len(thresholds) == 0: @@ -237,8 +236,6 @@ class SyncFsStore: f.close() self.target = int(v) - logg.debug('target {}'.format(self.target)) - def start(self, offset=0, target=-1): if self.started: @@ -257,6 +254,7 @@ class SyncFsStore: o = SyncFsItem(block_number, target, self.state, self.filter_state) self.items[block_number] = o self.item_keys.append(block_number) + logg.debug('added first {}'.format(o)) elif offset > 0: logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id)) self.started = True @@ -264,13 +262,21 @@ class SyncFsStore: self.item_keys.sort() - def stop(self): -# if self.target == 0: -# block_number = self.height + 1 -# block_number_bytes = block_number.to_bytes(4, 'big') -# block_number_bytes = block_number.to_bytes(4, 'big') -# self.state.put(str(block_number), block_number_bytes) - pass + def stop(self, item): + if item.target == -1: + state_bytes = sync_state_serialize(item.cursor, 0, item.cursor) + self.state.replace(str(item.offset), state_bytes) + self.filter_state.put(str(item.cursor)) + + SyncFsItem(item.offset, -1, self.state, self.filter_state) + logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor)) + + self.state.move(item.state_key, self.state.DONE) + + state_bytes = sync_state_serialize(item.cursor, 0, -1) + self.state.put(str(item.cursor), state_bytes) + + logg.debug('item {}'.format(self.state.state(item.state_key))) def get(self, k): @@ -291,3 +297,17 @@ class SyncFsStore: def disconnect(self): self.filter_state.disconnect() + + +def sync_state_serialize(block_height, tx_index, block_target): + b = block_height.to_bytes(4, 'big') + b += tx_index.to_bytes(4, 'big') + b += block_target.to_bytes(4, 'big', signed=True) + return b + + +def sync_state_deserialize(b): + block_height = int.from_bytes(b[:4], 'big') + tx_index = int.from_bytes(b[4:8], 'big') + block_target = int.from_bytes(b[8:], 'big', signed=True) + return (block_height, tx_index, block_target,) diff --git a/tests/test_fs.py b/tests/test_fs.py index 75e064c..976b0dd 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -188,5 +188,22 @@ class TestFs(unittest.TestCase): o.next(advance_block=True) + def test_sync_head_future(self): + store = SyncFsStore(self.path, session_id='foo') + session = SyncSession(store) + + session.start() + logg.debug('list {} {} {}'.format(store.state.list(store.state.SYNC), store.state.list(store.state.DONE), store.state.list(store.state.NEW))) + o = session.get(0) + o.next(advance_block=True) + o.next(advance_block=True) + session.stop(o) + + logg.debug('list {} {} {}'.format(store.state.list(store.state.SYNC), store.state.list(store.state.DONE), store.state.list(store.state.NEW))) + store = SyncFsStore(self.path, session_id='foo') + store.start() + o = store.get(2) + + if __name__ == '__main__': unittest.main()