Compare commits

6 Commits

Author SHA1 Message Date
lash
3a2317d253 WIP add lock cli tool 2022-04-27 09:43:57 +00:00
lash
6647e11df5 Add cli args handler and settings processor 2022-04-27 05:04:13 +00:00
lash
4e7c0f0d73 Add settings renderer, cli flag and config handling 2022-04-26 19:07:34 +00:00
lash
b5617fc9fb Upgrade shep 2022-04-26 08:16:38 +00:00
lash
044e85fb99 Allow memory-only syncing 2022-04-26 07:56:04 +00:00
lash
927913bd02 Check explicit for bool in filter interrupt check 2022-04-25 06:28:42 +00:00
19 changed files with 354 additions and 34 deletions

View File

@@ -1,3 +1,12 @@
* 0.3.7
- Remove hard eth dependency in settings rendering
- Add unlock cli tool
* 0.3.6
- Add cli arg processing and settings renderer
* 0.3.5
- Allow memory-only shep if factory set to None in store constructor
* 0.3.4
- Use explicit bool check in filter interrupt check
* 0.3.3
- Include shep persistent state bootstrap sync
- Add chainsyncer extras

View File

@@ -1 +1 @@
include *requirements.txt LICENSE.txt chainsyncer/data/config/*

View File

@@ -0,0 +1,12 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
# Package-relative data locations. __script_dir is the directory holding
# this file (chainsyncer/cli); stepping up one level and into 'data'
# yields chainsyncer/data, with bundled config files in data/config.
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

14
chainsyncer/cli/arg.py Normal file
View File

@@ -0,0 +1,14 @@
# local imports
from .base import SyncFlag
def process_flags(argparser, flags):
    """Register syncer cli arguments on the given argument parser.

    The SyncFlag bits set in flags select which optional argument groups
    are added; the --backend argument is registered unconditionally.
    """
    if bool(flags & SyncFlag.RANGE):
        # Block range selection arguments.
        argparser.add_argument('--offset', type=int, help='Block to start sync from. Default is start of history (0).')
        argparser.add_argument('--until', type=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
    if bool(flags & SyncFlag.HEAD):
        # Head-tracking arguments.
        argparser.add_argument('--head', action='store_true', help='Start from latest block as offset')
        argparser.add_argument('--keep-alive', action='store_true', help='Do not stop syncing when caught up')
    argparser.add_argument('--backend', type=str, help='Backend to use for state store')

7
chainsyncer/cli/base.py Normal file
View File

@@ -0,0 +1,7 @@
# standard imports
import enum
class SyncFlag(enum.IntEnum):
    """Bit flags selecting which groups of syncer cli arguments apply."""

    RANGE = enum.auto()  # 1: block range selection (--offset / --until)
    HEAD = enum.auto()   # 2: head tracking (--head / --keep-alive)

20
chainsyncer/cli/config.py Normal file
View File

@@ -0,0 +1,20 @@
# local imports
# Import directly from the sibling module instead of from the chainsyncer.cli
# package: chainsyncer/cli/__init__.py imports this module, so importing the
# package here would depend on partially-initialized package state.
from .base import SyncFlag


def process_config(config, args, flags):
    """Apply parsed cli argument values onto the loaded config.

    :param config: confini-style config object (dict_override/add/get)
    :param args: argparse namespace produced by the parser set up with
        process_flags
    :param flags: SyncFlag bitmask; must match the one given to
        process_flags so that only registered arguments are read
    :return: the same config object, mutated in place
    """
    args_override = {}
    # --backend is always registered, so it always overrides.
    args_override['SYNCER_BACKEND'] = args.backend
    if flags & SyncFlag.RANGE:
        args_override['SYNCER_OFFSET'] = args.offset
        args_override['SYNCER_LIMIT'] = args.until
    config.dict_override(args_override, 'local cli args')
    if flags & SyncFlag.HEAD:
        # Leading-underscore keys are runtime-only values, never read
        # from config files.
        config.add(args.keep_alive, '_KEEP_ALIVE')
        config.add(args.head, '_HEAD')
    return config

View File

@@ -0,0 +1,4 @@
[syncer]
offset = 0
limit = 0
backend = mem

1
chainsyncer/paths.py Normal file
View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,66 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import sys
import importlib
# external imports
import chainlib.cli
# local imports
import chainsyncer.cli
from chainsyncer.settings import ChainsyncerSettings
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
# Cli parser: chainlib base flags plus chain spec, extended with
# state/session selection and the lock action for this unlock tool.
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--state-dir', type=str, dest='state_dir', help='State directory')
argparser.add_argument('--session-id', type=str, dest='session_id', help='Session id for state')
argparser.add_argument('--action', type=str, choices=['repeat', 'continue'], help='Action to take on lock. Repeat means re-run the locked filter. Continue means resume execution for next filter.')
argparser.add_positional('tx', type=str, help='Transaction hash to unlock')
# Register the shared syncer range/head arguments.
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
chainsyncer.cli.process_flags(argparser, sync_flags)
args = argparser.parse_args()
# NOTE(review): the trailing comma makes this a one-element tuple;
# confirm chainlib's Config.from_args accepts a sequence of config
# directories here — if it expects a single path, drop the comma.
base_config_dir = chainsyncer.cli.config_dir,
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
config = chainsyncer.cli.process_config(config, args, sync_flags)
# Runtime-only values (leading '_' keys are never read from config files).
# NOTE(review): third argument presumably controls overwrite/exists
# behavior — confirm against the confini Config.add signature.
config.add(args.state_dir, '_STATE_DIR', False)
config.add(args.session_id, '_SESSION_ID', False)
logg.debug('config loaded:\n{}'.format(config))
settings = ChainsyncerSettings()
settings.process_sync_backend(config)
def main():
    """Resolve the configured state store backend and open the session's
    sync filter state.

    Raises ValueError for the 'mem' backend, whose state is volatile and
    therefore cannot hold a lock to release.
    """
    backend = settings.get('SYNCER_BACKEND')
    if backend == 'mem':
        raise ValueError('cannot unlock volatile state store')
    # Built-in persistent backends; any other value is treated as a
    # module path expected to expose a SyncStore class.
    known_backends = {
        'fs': ('chainsyncer.store.fs', 'SyncFsStore'),
        'rocksdb': ('chainsyncer.store.rocksdb', 'SyncRocksDbStore'),
    }
    (module_path, class_name) = known_backends.get(backend, (backend, 'SyncStore'))
    syncer_store_module = importlib.import_module(module_path)
    syncer_store_class = getattr(syncer_store_module, class_name)
    state_dir = os.path.join(config.get('_STATE_DIR'), backend)
    sync_path = os.path.join(config.get('_SESSION_ID'), 'sync', 'filter')
    sync_store = syncer_store_class(state_dir, session_id=sync_path)
    logg.info('session is {}'.format(sync_store.session_id))


if __name__ == '__main__':
    main()

View File

@@ -1,9 +1,12 @@
# standard imports # standard imports
import uuid import uuid
import logging
# local imports # local imports
from chainsyncer.error import FilterDone from chainsyncer.error import FilterDone
logg = logging.getLogger(__name__)
class SyncSession: class SyncSession:
@@ -29,6 +32,7 @@ class SyncSession:
def filter(self, conn, block, tx): def filter(self, conn, block, tx):
self.session_store.connect() self.session_store.connect()
for fltr in self.filters: for fltr in self.filters:
logg.debug('executing filter {}'.format(fltr))
self.item.advance() self.item.advance()
interrupt = fltr.filter(conn, block, tx) interrupt = fltr.filter(conn, block, tx)
if not self.item.release(interrupt=interrupt): if not self.item.release(interrupt=interrupt):

59
chainsyncer/settings.py Normal file
View File

@@ -0,0 +1,59 @@
# standard imports
import logging
# external imports
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
logg = logging.getLogger(__name__)
class ChainsyncerSettings:
    """Settings container resolving syncer runtime values from config.

    Resolved values live in the dict self.o; reads go through self.get,
    which is bound directly to self.o.get.
    """

    def __init__(self):
        self.o = {}
        # Bound method of the settings dict; remains valid as long as
        # self.o itself is never rebound to a new dict.
        self.get = self.o.get

    def process_sync_backend(self, config):
        # Copy the backend selection (e.g. 'mem', 'fs', 'rocksdb', or a
        # module path) verbatim from config.
        self.o['SYNCER_BACKEND'] = config.get('SYNCER_BACKEND')

    def process_sync_range(self, config):
        """Resolve SYNCER_OFFSET and SYNCER_LIMIT from config and the
        current network block height.

        Requires self.o['SYNCER_INTERFACE'] and self.o['RPC'] to already
        be populated; queries the network for its latest block first.
        """
        o = self.o['SYNCER_INTERFACE'].block_latest()
        r = self.o['RPC'].do(o)
        # RPC returns a hex block number; +1 points at the next block to
        # be produced.
        block_offset = int(strip_0x(r), 16) + 1
        logg.info('network block height at startup is {}'.format(block_offset))
        keep_alive = False
        session_block_offset = 0
        block_limit = 0
        until = 0
        if config.true('_HEAD'):
            # Head mode: start at current network height, no upper limit.
            self.o['SYNCER_OFFSET'] = block_offset
            self.o['SYNCER_LIMIT'] = -1
            return
        session_block_offset = int(config.get('SYNCER_OFFSET'))
        until = int(config.get('SYNCER_LIMIT'))
        if until > 0:
            # An explicit stop block must lie beyond the start offset.
            if until <= session_block_offset:
                raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
            block_limit = until
        elif until == -1:
            # NOTE(review): keep_alive is assigned here but never read
            # again; the branch below consults config.true('_KEEP_ALIVE')
            # instead. Confirm whether this local was meant to feed that
            # check.
            keep_alive = True
        if session_block_offset == -1:
            # Offset -1 means "start from current network height".
            session_block_offset = block_offset
        elif config.true('_KEEP_ALIVE'):
            # Keep-alive: no upper limit, keep syncing past the head.
            block_limit = -1
        else:
            if block_limit == 0:
                # Default limit: network height observed at startup.
                block_limit = block_offset
        self.o['SYNCER_OFFSET'] = session_block_offset
        self.o['SYNCER_LIMIT'] = block_limit

View File

@@ -4,6 +4,7 @@ import logging
# local imports # local imports
from shep.persist import PersistedState from shep.persist import PersistedState
from shep import State
from shep.error import StateInvalid from shep.error import StateInvalid
from chainsyncer.filter import FilterState from chainsyncer.filter import FilterState
from chainsyncer.error import ( from chainsyncer.error import (
@@ -121,7 +122,7 @@ class SyncItem:
def release(self, interrupt=False): def release(self, interrupt=False):
if self.skip_filter: if self.skip_filter:
return False return False
if interrupt: if interrupt == True:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT')) self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
@@ -168,15 +169,22 @@ class SyncStore:
self.session_path = os.path.realpath(given_path) self.session_path = os.path.realpath(given_path)
def setup_sync_state(self, factory, event_callback): def setup_sync_state(self, factory=None, event_callback=None):
self.state = PersistedState(factory.add, 2, event_callback=event_callback) if factory == None:
self.state = State(2, event_callback=event_callback)
else:
self.state = PersistedState(factory.add, 2, event_callback=event_callback)
self.state.add('SYNC') self.state.add('SYNC')
self.state.add('DONE') self.state.add('DONE')
def setup_filter_state(self, factory, event_callback): def setup_filter_state(self, factory=None, event_callback=None):
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback) if factory == None:
self.filter_state = FilterState(filter_state_backend, scan=factory.ls) filter_state_backend = State(0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend)
else:
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
self.filters = [] self.filters = []
@@ -202,7 +210,7 @@ class SyncStore:
if self.first: if self.first:
state_bytes = sync_state_serialize(offset, 0, target) state_bytes = sync_state_serialize(offset, 0, target)
block_number_str = str(offset) block_number_str = str(offset)
self.state.put(block_number_str, state_bytes) self.state.put(block_number_str, contents=state_bytes)
self.filter_state.put(block_number_str) self.filter_state.put(block_number_str)
o = SyncItem(offset, target, self.state, self.filter_state) o = SyncItem(offset, target, self.state, self.filter_state)
self.items[offset] = o self.items[offset] = o
@@ -226,7 +234,7 @@ class SyncStore:
self.state.move(item.state_key, self.state.DONE) self.state.move(item.state_key, self.state.DONE)
state_bytes = sync_state_serialize(item.cursor, 0, -1) state_bytes = sync_state_serialize(item.cursor, 0, -1)
self.state.put(str(item.cursor), state_bytes) self.state.put(str(item.cursor), contents=state_bytes)
def load(self, target): def load(self, target):

View File

@@ -7,10 +7,7 @@ import logging
from shep.store.file import SimpleFileStoreFactory from shep.store.file import SimpleFileStoreFactory
# local imports # local imports
from chainsyncer.store import ( from chainsyncer.store import SyncStore
SyncItem,
SyncStore,
)
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@@ -36,9 +33,13 @@ class SyncFsStore(SyncStore):
factory = SimpleFileStoreFactory(base_sync_path, binary=True) factory = SimpleFileStoreFactory(base_sync_path, binary=True)
self.setup_sync_state(factory, state_event_callback) self.setup_sync_state(factory, state_event_callback)
self.setup_filter_state(callback=filter_state_event_callback)
def setup_filter_state(self, callback=None):
base_filter_path = os.path.join(self.session_path, 'filter') base_filter_path = os.path.join(self.session_path, 'filter')
factory = SimpleFileStoreFactory(base_filter_path, binary=True) factory = SimpleFileStoreFactory(base_filter_path, binary=True)
self.setup_filter_state(factory, filter_state_event_callback) super(SyncFsStore, self).setup_filter_state(factory, callback)
def __create_path(self, base_path, default_path, session_id=None): def __create_path(self, base_path, default_path, session_id=None):

40
chainsyncer/store/mem.py Normal file
View File

@@ -0,0 +1,40 @@
# standard imports
import logging
import os
# external imports
from shep import State
# local imports
from chainsyncer.store import SyncStore
logg = logging.getLogger(__name__)
class SyncMemStore(SyncStore):
    """Volatile, memory-only sync state store.

    Passes factory=None to the SyncStore state setup methods, selecting
    non-persisted shep State backends; all state is lost on shutdown.
    """

    def __init__(self, session_id=None, state_event_callback=None, filter_state_event_callback=None):
        # '/dev/null' as base path: nothing is ever written to disk.
        super(SyncMemStore, self).__init__('/dev/null', session_id=session_id)
        self.session_id = os.path.basename(self.session_path)
        logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
        # factory=None selects the volatile State backend in SyncStore.
        factory = None
        self.setup_sync_state(factory, state_event_callback)
        factory = None
        self.setup_filter_state(factory, filter_state_event_callback)

    def set_target(self, v):
        """Set the sync target block height (kept in memory only)."""
        self.target = int(v)

    def get_target(self):
        """Return the sync target block height set by set_target."""
        return self.target

    def stop(self, item):
        """Finalize the given sync item, then warn that state is discarded.

        :param item: sync item to finalize; None skips finalization
        """
        if item != None:
            # BUG FIX: previously called super(SyncRocksDbStore, self),
            # but SyncRocksDbStore is not defined in this module, which
            # raised NameError at runtime.
            super(SyncMemStore, self).stop(item)
        logg.info('I am an in-memory only state store. I am shutting down now, so all state will now be discarded.')

View File

@@ -19,7 +19,17 @@ from chainsyncer.unittest import (
MockItem, MockItem,
) )
logging.STATETRACE = 5
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
def state_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
class TestStoreBase(unittest.TestCase): class TestStoreBase(unittest.TestCase):
@@ -27,6 +37,7 @@ class TestStoreBase(unittest.TestCase):
def setUp(self): def setUp(self):
self.path = tempfile.mkdtemp() self.path = tempfile.mkdtemp()
self.store_factory = None self.store_factory = None
self.persist = True
@classmethod @classmethod
@@ -58,14 +69,28 @@ class TestStoreBase(unittest.TestCase):
fp = os.path.join(self.path, store.session_id) fp = os.path.join(self.path, store.session_id)
session_id = store.session_id session_id = store.session_id
st = os.stat(fp) st = None
self.assertTrue(stat.S_ISDIR(st.st_mode)) try:
self.assertTrue(store.is_default) st = os.stat(fp)
except FileNotFoundError as e:
logg.warning('error {} persist {}'.format(e, self.persist))
if self.persist:
raise e
if st != None:
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertTrue(store.is_default)
fpd = os.path.join(self.path, 'default') fpd = os.path.join(self.path, 'default')
st = os.stat(fpd) try:
self.assertTrue(stat.S_ISDIR(st.st_mode)) st = os.stat(fpd)
self.assertTrue(store.is_default) except FileNotFoundError as e:
logg.warning('error {} persist {}'.format(e, self.persist))
if self.persist:
raise e
if st != None:
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertTrue(store.is_default)
fpd = os.path.realpath(fpd) fpd = os.path.realpath(fpd)
self.assertEqual(fpd, fp) self.assertEqual(fpd, fp)
@@ -85,9 +110,15 @@ class TestStoreBase(unittest.TestCase):
store.stop(bogus_item) store.stop(bogus_item)
store = self.store_factory('foo') store = self.store_factory('foo')
fpf = os.path.join(self.path, 'foo') fpf = os.path.join(self.path, 'foo')
st = os.stat(fpf) try:
self.assertTrue(stat.S_ISDIR(st.st_mode)) st = os.stat(fpf)
self.assertFalse(store.is_default) except FileNotFoundError as e:
logg.warning('error {} persist {}'.format(e, self.persist))
if self.persist:
raise e
if st != None:
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertFalse(store.is_default)
def t_store_start(self): def t_store_start(self):
@@ -97,9 +128,11 @@ class TestStoreBase(unittest.TestCase):
self.assertTrue(store.first) self.assertTrue(store.first)
store.stop(bogus_item) store.stop(bogus_item)
store = self.store_factory()
store.start() if self.persist:
self.assertFalse(store.first) store = self.store_factory()
store.start()
self.assertFalse(store.first)
def t_store_resume(self): def t_store_resume(self):
@@ -226,12 +259,16 @@ class TestStoreBase(unittest.TestCase):
o.next(advance_block=True) o.next(advance_block=True)
session.stop(o) session.stop(o)
store = self.store_factory('foo') if self.persist:
store.start() store = self.store_factory('foo')
o = store.get(2) store.start()
o = store.get(2)
def t_sync_history_interrupted(self): def t_sync_history_interrupted(self):
if not self.persist:
return
bogus_item = MockItem(0, 0, 0, 0) bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory('foo') store = self.store_factory('foo')
session = SyncSession(store) session = SyncSession(store)

View File

@@ -2,4 +2,4 @@ confini~=0.6.0
semver==2.13.0 semver==2.13.0
hexathon~=0.1.5 hexathon~=0.1.5
chainlib>=0.1.0b1,<0.2.0 chainlib>=0.1.0b1,<0.2.0
shep~=0.2.2 shep~=0.2.3

View File

@@ -1,6 +1,6 @@
[metadata] [metadata]
name = chainsyncer name = chainsyncer
version = 0.3.3 version = 0.3.7
description = Generic blockchain syncer driver description = Generic blockchain syncer driver
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
@@ -28,6 +28,7 @@ packages =
chainsyncer.driver chainsyncer.driver
chainsyncer.unittest chainsyncer.unittest
chainsyncer.store chainsyncer.store
chainsyncer.cli
#[options.package_data] #[options.package_data]
#* = #* =

33
tests/store/test_0_mem.py Normal file
View File

@@ -0,0 +1,33 @@
# standard imports
import unittest
import logging
# external imports
from shep import State
# local imports
from chainsyncer.store.mem import SyncMemStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
    """Factory handing fresh in-memory sync stores to the shared test base."""

    def create(self, session_id=None):
        # One new volatile store per call; no state shared between tests.
        return SyncMemStore(session_id=session_id)
class TestMem(TestStoreBase):
    """Run the shared store test suite against the memory-only backend."""

    def setUp(self):
        super(TestMem, self).setUp()
        # Swap in the memory-backed store factory and disable
        # persistence assertions, since this backend discards all state
        # on shutdown.
        self.store_factory = StoreFactory().create
        self.persist = False
if __name__ == '__main__':
    # link() attaches the shared t_* store tests to TestMem as test_*
    # methods before unittest discovers them.
    TestStoreBase.link(TestMem)
    # Remove tests that test persistence of state
    unittest.main()

View File

@@ -4,7 +4,11 @@ import logging
# local imports # local imports
from chainsyncer.store.rocksdb import SyncRocksDbStore from chainsyncer.store.rocksdb import SyncRocksDbStore
from chainsyncer.unittest.store import TestStoreBase from chainsyncer.unittest.store import (
TestStoreBase,
filter_change_callback,
state_change_callback,
)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger() logg = logging.getLogger()
@@ -16,7 +20,7 @@ class StoreFactory:
def create(self, session_id=None): def create(self, session_id=None):
return SyncRocksDbStore(self.path, session_id=session_id) return SyncRocksDbStore(self.path, session_id=session_id, state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
class TestRocksDb(TestStoreBase): class TestRocksDb(TestStoreBase):