From 9aded5c56154e1c0caa1988cadec6e623f19ed9d Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 20 Apr 2022 12:55:43 +0000 Subject: [PATCH] Factor out target state --- chainsyncer/store/base.py | 49 +++++++++++++++++++++++++++++++++++++ chainsyncer/store/fs.py | 51 +++++++-------------------------------- 2 files changed, 58 insertions(+), 42 deletions(-) diff --git a/chainsyncer/store/base.py b/chainsyncer/store/base.py index 5c3379b..536e0ac 100644 --- a/chainsyncer/store/base.py +++ b/chainsyncer/store/base.py @@ -154,6 +154,7 @@ class SyncStore: self.items = {} self.item_keys = [] self.started = False + self.thresholds = [] def setup_sync_state(self, factory, event_callback): @@ -168,6 +169,14 @@ class SyncStore: self.filters = [] + def set_target(self, v): + pass + + + def get_target(self): + return None + + def register(self, fltr): self.filters.append(fltr) self.filter_state.register(fltr) @@ -211,6 +220,46 @@ class SyncStore: logg.debug('item {}'.format(self.state.state(item.state_key))) + def load(self, target): + self.state.sync(self.state.NEW) + self.state.sync(self.state.SYNC) + + thresholds_sync = [] + for v in self.state.list(self.state.SYNC): + block_number = int(v) + thresholds_sync.append(block_number) + logg.debug('queue resume {}'.format(block_number)) + thresholds_new = [] + for v in self.state.list(self.state.NEW): + block_number = int(v) + thresholds_new.append(block_number) + logg.debug('queue new range {}'.format(block_number)) + + thresholds_sync.sort() + thresholds_new.sort() + thresholds = thresholds_sync + thresholds_new + lim = len(thresholds) - 1 + + for i in range(len(thresholds)): + item_target = target + if i < lim: + item_target = thresholds[i+1] + o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True) + self.items[block_number] = o + self.item_keys.append(block_number) + logg.info('added existing {}'.format(o)) + + self.get_target() + + if len(thresholds) == 0: + if self.target != None: + logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id)) + else: + logg.info('syncer first run target {}'.format(target)) + self.first = True + self.set_target(target) + + def get(self, k): return self.items[k] diff --git a/chainsyncer/store/fs.py b/chainsyncer/store/fs.py index 6c06668..dee8a75 100644 --- a/chainsyncer/store/fs.py +++ b/chainsyncer/store/fs.py @@ -66,54 +66,21 @@ class SyncFsStore(SyncStore): pass - def load(self, target): - self.state.sync(self.state.NEW) - self.state.sync(self.state.SYNC) - - thresholds_sync = [] - for v in self.state.list(self.state.SYNC): - block_number = int(v) - thresholds_sync.append(block_number) - logg.debug('queue resume {}'.format(block_number)) - thresholds_new = [] - for v in self.state.list(self.state.NEW): - block_number = int(v) - thresholds_new.append(block_number) - logg.debug('queue new range {}'.format(block_number)) - - thresholds_sync.sort() - thresholds_new.sort() - thresholds = thresholds_sync + thresholds_new - lim = len(thresholds) - 1 - - logg.debug('thresholds {}'.format(thresholds)) - for i in range(len(thresholds)): - item_target = target - if i < lim: - item_target = thresholds[i+1] - o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True) - self.items[block_number] = o - self.item_keys.append(block_number) - logg.info('added existing {}'.format(o)) - + def get_target(self): fp = os.path.join(self.session_path, 'target') - have_target = False try: f = open(fp, 'r') v = f.read() f.close() self.target = int(v) - have_target = True except FileNotFoundError as e: + logg.debug('cant find target {} {}'.format(fp, e)) pass - if len(thresholds) == 0: - if have_target: - logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id)) - else: - logg.info('syncer first run target {}'.format(target)) - self.first = True - f = open(fp, 'w') - f.write(str(target)) - f.close() - self.target = target + + def set_target(self, v): + fp = os.path.join(self.session_path, 'target') + f = open(fp, 'w') + f.write(str(v)) + f.close() + self.target = v