WIP safe access to unlocking sync with tool

This commit is contained in:
lash 2022-04-28 08:15:04 +00:00
parent ca82ea247f
commit ca1441d50d
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 130 additions and 40 deletions

View File

@ -142,11 +142,3 @@ class SyncDriver:
def get(self, conn): def get(self, conn):
raise NotImplementedError() raise NotImplementedError()
def save_filter_list(self):
raise NotImplementedError()
def load_filter_list(self):
raise NotImplementedError()

View File

@ -44,6 +44,8 @@ class FilterState:
self.state = self.state_store.state self.state = self.state_store.state
self.put = self.state_store.put self.put = self.state_store.put
self.mask = self.state_store.mask
self.name = self.state_store.name
self.set = self.state_store.set self.set = self.state_store.set
self.next = self.state_store.next self.next = self.state_store.next
self.move = self.state_store.move self.move = self.state_store.move

View File

@ -14,11 +14,31 @@ from shep.persist import PersistedState
import chainsyncer.cli import chainsyncer.cli
from chainsyncer.settings import ChainsyncerSettings from chainsyncer.settings import ChainsyncerSettings
from chainsyncer.store import SyncStore from chainsyncer.store import SyncStore
from chainsyncer.filter import FilterState from chainsyncer.filter import (
FilterState,
SyncFilter,
)
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
valid_fwd = [
'fwd',
'forward',
'next',
'continue',
]
valid_rwd = [
'rwd',
'rewind',
'current',
'back',
'repeat',
'replay',
]
action_is_forward = False
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(arg_flags) argparser = chainlib.cli.ArgumentParser(arg_flags)
@ -30,6 +50,13 @@ chainsyncer.cli.process_flags(argparser, sync_flags)
args = argparser.parse_args() args = argparser.parse_args()
if args.action in valid_fwd:
action_is_forward = True
elif args.action not in valid_rwd:
sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd))
sys.exit(1)
base_config_dir = chainsyncer.cli.config_dir, base_config_dir = chainsyncer.cli.config_dir,
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir) config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
config = chainsyncer.cli.process_config(config, args, sync_flags) config = chainsyncer.cli.process_config(config, args, sync_flags)
@ -41,29 +68,81 @@ settings.process_sync_backend(config)
logg.debug('settings:\n{}'.format(str(settings))) logg.debug('settings:\n{}'.format(str(settings)))
class FilterInNameOnly(SyncFilter):
def __init__(self, k):
self.k = k
def common_name(self):
return self.k
def main(): def main():
if settings.get('SYNCER_BACKEND') == 'mem': if settings.get('SYNCER_BACKEND') == 'mem':
raise ValueError('cannot unlock volatile state store') raise ValueError('cannot unlock volatile state store')
if settings.get('SYNCER_BACKEND') == 'fs':
syncer_store_module = importlib.import_module('shep.store.file')
syncer_store_class = getattr(syncer_store_module, 'SimpleFileStoreFactory')
elif settings.get('SYNCER_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('shep.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'RocksdbStoreFactory')
else:
raise NotImplementedError('cannot use backend: {}'.format(config.get('SYNCER_BACKEND')))
state_dir = config.get('_STATE_DIR') state_dir = config.get('_STATE_DIR')
factory = syncer_store_class(state_dir) if config.get('SYNCER_BACKEND') == 'fs':
store = SyncStore(state_dir) syncer_store_module = importlib.import_module('chainsyncer.store.fs')
store.setup_filter_state(factory=factory) syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
elif config.get('SYNCER_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
else:
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
store = syncer_store_class(state_dir)
filter_list = store.load_filter_list()
for i, k in enumerate(filter_list):
fltr = FilterInNameOnly(k)
store.register(fltr)
filter_list[i] = k.upper()
store.connect() store.connect()
store.filter_state.scan() store.start(ignore_lock=True)
locked_state = store.filter_state.list(store.filter_state.from_name('RESET'))
print(locked_state) lock_state = store.filter_state.from_name('LOCK')
locked_item = store.filter_state.list(lock_state)
if len(locked_item) == 0:
sys.stderr.write('Sync filter in {} is not locked\n'.format(state_dir))
sys.exit(1)
elif len(locked_item) > 1:
sys.stderr.write('More than one locked item encountered in {}. That should never happen, so I do not know what to do next.\n'.format(state_dir))
sys.exit(1)
locked_item_key = locked_item[0]
locked_item = store.get(int(locked_item_key))
locked_state = store.filter_state.state(locked_item_key) - lock_state
locked_state_name = store.filter_state.name(locked_state)
logg.info('found item "{}" in locked state {}'.format(locked_item, store.filter_state.name(locked_state)))
if action_is_forward:
k = locked_state_name
filter_index = None
filter_index = filter_list.index(k)
filter_pos = filter_index + 1
filter_count = len(filter_list)
logg.debug('Locked filter {} found at position {} of {}'.format(k, filter_pos, filter_count))
if filter_pos == filter_count:
logg.info('Locked filter {} is the last filter in the list. Executing filter reset'.format(k))
locked_item.reset(check_incomplete=False)
else:
locked_item.advance(ignore_lock=True)
store.filter_state.unset(locked_item_key, lock_state)
#next_filter = filter_list[filter_pos]
#next_state = store.filter_state.from_name(next_filter)
#store.filter_state.move(next_state)
else:
filter_mask = 0xf
filter_state = store.filter_state.mask(locked_state, filter_mask)
logg.info('Chosen action is "{}": will continue execution at previous filter {}'.format(args.action, store.filter_state.name(filter_state)))
store.filter_state.unset(locked_item_key, lock_state)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -35,7 +35,7 @@ def sync_state_deserialize(b):
# NOT thread safe # NOT thread safe
class SyncItem: class SyncItem:
def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False): def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False):
self.offset = offset self.offset = offset
self.target = target self.target = target
self.sync_state = sync_state self.sync_state = sync_state
@ -47,7 +47,7 @@ class SyncItem:
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v) (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid: if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0 and not ignore_lock:
raise LockError(self.state_key) raise LockError(self.state_key)
self.count = len(self.filter_state.all(pure=True)) - 4 self.count = len(self.filter_state.all(pure=True)) - 4
@ -65,7 +65,8 @@ class SyncItem:
raise FilterDone(self.state_key) raise FilterDone(self.state_key)
def reset(self): def reset(self, check_incomplete=True):
if check_incomplete:
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
raise LockError('reset attempt on {} when state locked'.format(self.state_key)) raise LockError('reset attempt on {} when state locked'.format(self.state_key))
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0: if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
@ -102,12 +103,15 @@ class SyncItem:
v = self.filter_state.state(self.state_key) v = self.filter_state.state(self.state_key)
def advance(self): def advance(self, ignore_lock=False):
if self.skip_filter: if self.skip_filter:
raise FilterDone() raise FilterDone()
self.__check_done() self.__check_done()
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
if ignore_lock:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
else:
raise LockError('advance attempt on {} when state locked'.format(self.state_key)) raise LockError('advance attempt on {} when state locked'.format(self.state_key))
done = False done = False
try: try:
@ -192,20 +196,20 @@ class SyncStore:
self.filter_state.register(fltr) self.filter_state.register(fltr)
def start(self, offset=0, target=-1): def start(self, offset=0, target=-1, ignore_lock=False):
if self.started: if self.started:
return return
self.save_filter_list() self.save_filter_list()
self.load(target) self.load(target, ignore_lock=ignore_lock)
if self.first: if self.first:
state_bytes = sync_state_serialize(offset, 0, target) state_bytes = sync_state_serialize(offset, 0, target)
block_number_str = str(offset) block_number_str = str(offset)
self.state.put(block_number_str, contents=state_bytes) self.state.put(block_number_str, contents=state_bytes)
self.filter_state.put(block_number_str) self.filter_state.put(block_number_str)
o = SyncItem(offset, target, self.state, self.filter_state) o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock)
self.items[offset] = o self.items[offset] = o
self.item_keys.append(offset) self.item_keys.append(offset)
elif offset > 0: elif offset > 0:
@ -230,7 +234,7 @@ class SyncStore:
self.state.put(str(item.cursor), contents=state_bytes) self.state.put(str(item.cursor), contents=state_bytes)
def load(self, target): def load(self, target, ignore_lock=False):
self.state.sync(self.state.NEW) self.state.sync(self.state.NEW)
self.state.sync(self.state.SYNC) self.state.sync(self.state.SYNC)
@ -254,7 +258,7 @@ class SyncStore:
item_target = target item_target = target
if i < lim: if i < lim:
item_target = thresholds[i+1] item_target = thresholds[i+1]
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True) o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock)
self.items[block_number] = o self.items[block_number] = o
self.item_keys.append(block_number) self.item_keys.append(block_number)
logg.info('added existing {}'.format(o)) logg.info('added existing {}'.format(o))
@ -271,7 +275,6 @@ class SyncStore:
def get(self, k): def get(self, k):
logg.debug('items {}'.format(self.items.keys()))
return self.items[k] return self.items[k]
@ -289,3 +292,18 @@ class SyncStore:
def disconnect(self): def disconnect(self):
self.filter_state.disconnect() self.filter_state.disconnect()
def save_filter_list(self):
raise NotImplementedError()
def load_filter_list(self):
raise NotImplementedError()
def peek_next_filter(self):
pass
def peek_current_filter(self):
pass

View File

@ -77,4 +77,3 @@ class SyncRocksDbStore(SyncStore):
v = self.target_db.get('filter_list') v = self.target_db.get('filter_list')
v = v.decode('utf-8') v = v.decode('utf-8')
return v.split(',') return v.split(',')