From ca1441d50d9fe5cd9f01981db66c80c45d84d229 Mon Sep 17 00:00:00 2001 From: lash Date: Thu, 28 Apr 2022 08:15:04 +0000 Subject: [PATCH] WIP safe access to unlocking sync with tool --- chainsyncer/driver/base.py | 8 --- chainsyncer/filter.py | 2 + chainsyncer/runnable/lock.py | 111 ++++++++++++++++++++++++++++++----- chainsyncer/store/base.py | 48 ++++++++++----- chainsyncer/store/rocksdb.py | 1 - 5 files changed, 130 insertions(+), 40 deletions(-) diff --git a/chainsyncer/driver/base.py b/chainsyncer/driver/base.py index bcb7c51..855e074 100644 --- a/chainsyncer/driver/base.py +++ b/chainsyncer/driver/base.py @@ -142,11 +142,3 @@ class SyncDriver: def get(self, conn): raise NotImplementedError() - - def save_filter_list(self): - raise NotImplementedError() - - - def load_filter_list(self): - raise NotImplementedError() - diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 8e5476f..6194f88 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -44,6 +44,8 @@ class FilterState: self.state = self.state_store.state self.put = self.state_store.put + self.mask = self.state_store.mask + self.name = self.state_store.name self.set = self.state_store.set self.next = self.state_store.next self.move = self.state_store.move diff --git a/chainsyncer/runnable/lock.py b/chainsyncer/runnable/lock.py index bea5777..10e0386 100644 --- a/chainsyncer/runnable/lock.py +++ b/chainsyncer/runnable/lock.py @@ -14,11 +14,31 @@ from shep.persist import PersistedState import chainsyncer.cli from chainsyncer.settings import ChainsyncerSettings from chainsyncer.store import SyncStore -from chainsyncer.filter import FilterState +from chainsyncer.filter import ( + FilterState, + SyncFilter, + ) logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() +valid_fwd = [ + 'fwd', + 'forward', + 'next', + 'continue', + ] + +valid_rwd = [ + 'rwd', + 'rewind', + 'current', + 'back', + 'repeat', + 'replay', + ] + +action_is_forward = False arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC argparser = chainlib.cli.ArgumentParser(arg_flags) @@ -30,6 +50,13 @@ chainsyncer.cli.process_flags(argparser, sync_flags) args = argparser.parse_args() +if args.action in valid_fwd: + action_is_forward = True +elif args.action not in valid_rwd: + sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd)) + sys.exit(1) + + base_config_dir = chainsyncer.cli.config_dir, config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir) config = chainsyncer.cli.process_config(config, args, sync_flags) @@ -41,29 +68,81 @@ settings.process_sync_backend(config) logg.debug('settings:\n{}'.format(str(settings))) +class FilterInNameOnly(SyncFilter): + + def __init__(self, k): + self.k = k + + + def common_name(self): + return self.k + def main(): if settings.get('SYNCER_BACKEND') == 'mem': raise ValueError('cannot unlock volatile state store') - if settings.get('SYNCER_BACKEND') == 'fs': - syncer_store_module = importlib.import_module('shep.store.file') - syncer_store_class = getattr(syncer_store_module, 'SimpleFileStoreFactory') - elif settings.get('SYNCER_BACKEND') == 'rocksdb': - syncer_store_module = importlib.import_module('shep.store.rocksdb') - syncer_store_class = getattr(syncer_store_module, 'RocksdbStoreFactory') - else: - raise NotImplementedError('cannot use backend: {}'.format(config.get('SYNCER_BACKEND'))) - state_dir = config.get('_STATE_DIR') - factory = syncer_store_class(state_dir) - store = SyncStore(state_dir) - store.setup_filter_state(factory=factory) + if config.get('SYNCER_BACKEND') == 'fs': + syncer_store_module = importlib.import_module('chainsyncer.store.fs') + syncer_store_class = getattr(syncer_store_module, 'SyncFsStore') + elif config.get('SYNCER_BACKEND') == 'rocksdb': + syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb') + syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore') + else: + syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND')) + syncer_store_class = getattr(syncer_store_module, 'SyncStore') + + logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__)) + + store = syncer_store_class(state_dir) + + filter_list = store.load_filter_list() + for i, k in enumerate(filter_list): + fltr = FilterInNameOnly(k) + store.register(fltr) + filter_list[i] = k.upper() + store.connect() - store.filter_state.scan() - locked_state = store.filter_state.list(store.filter_state.from_name('RESET')) - print(locked_state) + store.start(ignore_lock=True) + + lock_state = store.filter_state.from_name('LOCK') + locked_item = store.filter_state.list(lock_state) + if len(locked_item) == 0: + sys.stderr.write('Sync filter in {} is not locked\n'.format(state_dir)) + sys.exit(1) + elif len(locked_item) > 1: + sys.stderr.write('More than one locked item encountered in {}. That should never happen, so I do not know what to do next.\n'.format(state_dir)) + sys.exit(1) + + locked_item_key = locked_item[0] + locked_item = store.get(int(locked_item_key)) + locked_state = store.filter_state.state(locked_item_key) - lock_state + locked_state_name = store.filter_state.name(locked_state) + logg.info('found item "{}" in locked state {}'.format(locked_item, store.filter_state.name(locked_state))) + + if action_is_forward: + k = locked_state_name + filter_index = None + filter_index = filter_list.index(k) + filter_pos = filter_index + 1 + filter_count = len(filter_list) + logg.debug('Locked filter {} found at position {} of {}'.format(k, filter_pos, filter_count)) + if filter_pos == filter_count: + logg.info('Locked filter {} is the last filter in the list. Executing filter reset'.format(k)) + locked_item.reset(check_incomplete=False) + else: + locked_item.advance(ignore_lock=True) + store.filter_state.unset(locked_item_key, lock_state) + #next_filter = filter_list[filter_pos] + #next_state = store.filter_state.from_name(next_filter) + #store.filter_state.move(next_state) + else: + filter_mask = 0xf + filter_state = store.filter_state.mask(locked_state, filter_mask) + logg.info('Chosen action is "{}": will continue execution at previous filter {}'.format(args.action, store.filter_state.name(filter_state))) + store.filter_state.unset(locked_item_key, lock_state) if __name__ == '__main__': diff --git a/chainsyncer/store/base.py b/chainsyncer/store/base.py index e3f01bc..a36ed37 100644 --- a/chainsyncer/store/base.py +++ b/chainsyncer/store/base.py @@ -35,7 +35,7 @@ def sync_state_deserialize(b): # NOT thread safe class SyncItem: - def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False): + def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False): self.offset = offset self.target = target self.sync_state = sync_state @@ -47,7 +47,7 @@ class SyncItem: (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v) - if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid: + if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0 and not ignore_lock: raise LockError(self.state_key) self.count = len(self.filter_state.all(pure=True)) - 4 @@ -65,11 +65,12 @@ class SyncItem: raise FilterDone(self.state_key) - def reset(self): - if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: - raise LockError('reset attempt on {} when state locked'.format(self.state_key)) - if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0: - raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key)) + def reset(self, check_incomplete=True): + if check_incomplete: + if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: + raise LockError('reset attempt on {} when state locked'.format(self.state_key)) + if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0: + raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key)) self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) @@ -102,13 +103,16 @@ class SyncItem: v = self.filter_state.state(self.state_key) - def advance(self): + def advance(self, ignore_lock=False): if self.skip_filter: raise FilterDone() self.__check_done() if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: - raise LockError('advance attempt on {} when state locked'.format(self.state_key)) + if ignore_lock: + self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) + else: + raise LockError('advance attempt on {} when state locked'.format(self.state_key)) done = False try: self.filter_state.next(self.state_key) @@ -192,20 +196,20 @@ class SyncStore: self.filter_state.register(fltr) - def start(self, offset=0, target=-1): + def start(self, offset=0, target=-1, ignore_lock=False): if self.started: return self.save_filter_list() - self.load(target) + self.load(target, ignore_lock=ignore_lock) if self.first: state_bytes = sync_state_serialize(offset, 0, target) block_number_str = str(offset) self.state.put(block_number_str, contents=state_bytes) self.filter_state.put(block_number_str) - o = SyncItem(offset, target, self.state, self.filter_state) + o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock) self.items[offset] = o self.item_keys.append(offset) elif offset > 0: @@ -230,7 +234,7 @@ class SyncStore: self.state.put(str(item.cursor), contents=state_bytes) - def load(self, target): + def load(self, target, ignore_lock=False): self.state.sync(self.state.NEW) self.state.sync(self.state.SYNC) @@ -254,7 +258,7 @@ class SyncStore: item_target = target if i < lim: item_target = thresholds[i+1] - o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True) + o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock) self.items[block_number] = o self.item_keys.append(block_number) logg.info('added existing {}'.format(o)) @@ -271,7 +275,6 @@ class SyncStore: def get(self, k): - logg.debug('items {}'.format(self.items.keys())) return self.items[k] @@ -289,3 +292,18 @@ class SyncStore: def disconnect(self): self.filter_state.disconnect() + + + def save_filter_list(self): + raise NotImplementedError() + + + def load_filter_list(self): + raise NotImplementedError() + + + def peek_next_filter(self): + pass + + def peek_current_filter(self): + pass diff --git a/chainsyncer/store/rocksdb.py b/chainsyncer/store/rocksdb.py index 35d37ac..ba9fa22 100644 --- a/chainsyncer/store/rocksdb.py +++ b/chainsyncer/store/rocksdb.py @@ -77,4 +77,3 @@ class SyncRocksDbStore(SyncStore): v = self.target_db.get('filter_list') v = v.decode('utf-8') return v.split(',') -