114 lines
3.2 KiB
Python
114 lines
3.2 KiB
Python
# SPDX-License-Identifier: GPL-3.0-or-later
|
||
|
||
# standard imports
|
||
import os
|
||
import logging
|
||
import sys
|
||
import importlib
|
||
|
||
# external imports
|
||
import chainlib.cli
|
||
from shep.persist import PersistedState
|
||
|
||
# local imports
|
||
import chainsyncer.cli
|
||
from chainsyncer.settings import ChainsyncerSettings
|
||
from chainsyncer.store import SyncStore
|
||
from chainsyncer.filter import (
|
||
FilterState,
|
||
SyncFilter,
|
||
)
|
||
|
||
logging.basicConfig(level=logging.WARNING)
|
||
logg = logging.getLogger()
|
||
|
||
valid_fwd = [
|
||
'fwd',
|
||
'forward',
|
||
'next',
|
||
'continue',
|
||
]
|
||
|
||
valid_rwd = [
|
||
'rwd',
|
||
'rewind',
|
||
'current',
|
||
'back',
|
||
'repeat',
|
||
'replay',
|
||
]
|
||
|
||
action_is_forward = False
|
||
|
||
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
|
||
argparser = chainlib.cli.ArgumentParser(arg_flags)
|
||
argparser.add_argument('--state-dir', type=str, dest='state_dir', help='State directory')
|
||
argparser.add_positional('action', type=str, help='Action to take on lock. Repeat means re-run the locked filter. Continue means resume execution for next filter.')
|
||
|
||
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
|
||
chainsyncer.cli.process_flags(argparser, sync_flags)
|
||
|
||
args = argparser.parse_args()
|
||
|
||
if args.action in valid_fwd:
|
||
action_is_forward = True
|
||
elif args.action not in valid_rwd:
|
||
sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd))
|
||
sys.exit(1)
|
||
|
||
|
||
base_config_dir = chainsyncer.cli.config_dir,
|
||
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
|
||
config = chainsyncer.cli.process_config(config, args, sync_flags)
|
||
config.add(args.state_dir, '_STATE_DIR', False)
|
||
logg.debug('config loaded:\n{}'.format(config))
|
||
|
||
settings = ChainsyncerSettings()
|
||
settings.process_sync_backend(config)
|
||
logg.debug('settings:\n{}'.format(str(settings)))
|
||
|
||
|
||
class FilterInNameOnly(SyncFilter):
|
||
|
||
def __init__(self, k):
|
||
self.k = k
|
||
|
||
|
||
def common_name(self):
|
||
return self.k
|
||
|
||
|
||
def main():
|
||
if settings.get('SYNCER_BACKEND') == 'mem':
|
||
raise ValueError('cannot unlock volatile state store')
|
||
|
||
state_dir = config.get('_STATE_DIR')
|
||
|
||
if config.get('SYNCER_BACKEND') == 'fs':
|
||
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
|
||
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
|
||
elif config.get('SYNCER_BACKEND') == 'rocksdb':
|
||
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
|
||
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
|
||
else:
|
||
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
|
||
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
|
||
|
||
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
|
||
|
||
store = syncer_store_class(state_dir)
|
||
|
||
filter_list = store.load_filter_list()
|
||
for i, k in enumerate(filter_list):
|
||
fltr = FilterInNameOnly(k)
|
||
store.register(fltr)
|
||
filter_list[i] = k.upper()
|
||
|
||
store.connect()
|
||
store.start(ignore_lock=True)
|
||
store.unlock_filter(not action_is_forward)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|