Compare commits

...

22 Commits

Author SHA1 Message Date
lash f039d6c9ad
Correct runnable package name 2022-04-28 15:36:04 +00:00
lash 58787b3884
Add unlock cli tool to setup 2022-04-28 15:30:45 +00:00
lash b192dd6e95
Add resume of filter in syncitem 2022-04-28 12:35:18 +00:00
lash ca1441d50d
WIP safe access to unlocking sync with tool 2022-04-28 08:15:04 +00:00
lash ca82ea247f
Filter list persistencE 2022-04-28 06:45:59 +00:00
lash 384c79bed0
Remove session id path generation 2022-04-28 06:10:43 +00:00
lash 36acf3f09a
WIP more work on lock cli tool 2022-04-27 09:59:40 +00:00
lash 3a2317d253
WIP add lock cli tool 2022-04-27 09:43:57 +00:00
lash 6647e11df5
Add cli args handler and settings processor 2022-04-27 05:04:13 +00:00
lash 4e7c0f0d73
Add settings renderer, cli flag and config handling 2022-04-26 19:07:34 +00:00
lash b5617fc9fb
Upgrade shep 2022-04-26 08:16:38 +00:00
lash 044e85fb99
Allow memory-only syncing 2022-04-26 07:56:04 +00:00
lash 927913bd02
Check explicit for bool in filter interrupt check 2022-04-25 06:28:42 +00:00
lash 290fa1844d
Add chainsyncer extras 2022-04-24 21:20:41 +00:00
lash b6ed8d7d8f
Remove loglines 2022-04-24 20:52:11 +00:00
lash 4905fe4fc2
Upgrade shep 2022-04-24 20:47:09 +00:00
lash 6d9d0f0462
Update deps 2022-04-20 19:01:07 +00:00
lash 2c206078ff
Remove deleted module from setup 2022-04-20 16:38:04 +00:00
lash 05898a7e00
Complete rocksdb test 2022-04-20 16:36:06 +00:00
lash 4bda7522ab
Move store tests to separate dir, run last 2022-04-20 15:28:12 +00:00
lash d27bcaa9f5
Factor out common store tests, implement for fs and rocksdb 2022-04-20 15:15:43 +00:00
lash 197560ae16
Implement rocksdb and default test 2022-04-20 14:27:59 +00:00
28 changed files with 936 additions and 363 deletions

View File

@ -1,3 +1,17 @@
* 0.3.7
- Remove hard eth dependency in settings rendering
- Add unlock cli tool
* 0.3.6
- Add cli arg processing and settings renderer
* 0.3.5
- Allow memory-only shep if factory set to None in store constructor
* 0.3.4
- Use explicit bool check in filter interrupt check
* 0.3.3
- Include shep persistent state bootstrap sync
- Add chainsyncer extras
* 0.3.2
- Implement rocksdb backend
* 0.3.1 * 0.3.1
- Upgrade to release shep version - Upgrade to release shep version
- Move sync state to SYNC after start - Move sync state to SYNC after start

View File

@ -1 +1 @@
include *requirements.txt LICENSE.txt include *requirements.txt LICENSE.txt chainsyncer/data/config/*

View File

@ -0,0 +1,12 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

14
chainsyncer/cli/arg.py Normal file
View File

@ -0,0 +1,14 @@
# local imports
from .base import SyncFlag
def process_flags(argparser, flags):
if flags & SyncFlag.RANGE > 0:
argparser.add_argument('--offset', type=int, help='Block to start sync from. Default is start of history (0).')
argparser.add_argument('--until', type=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
if flags & SyncFlag.HEAD > 0:
argparser.add_argument('--head', action='store_true', help='Start from latest block as offset')
argparser.add_argument('--keep-alive', action='store_true', help='Do not stop syncing when caught up')
argparser.add_argument('--backend', type=str, help='Backend to use for state store')

7
chainsyncer/cli/base.py Normal file
View File

@ -0,0 +1,7 @@
# standard imports
import enum
class SyncFlag(enum.IntEnum):
RANGE = 1
HEAD = 2

20
chainsyncer/cli/config.py Normal file
View File

@ -0,0 +1,20 @@
# external imports
from chainsyncer.cli import SyncFlag
def process_config(config, args, flags):
args_override = {}
args_override['SYNCER_BACKEND'] = getattr(args, 'backend')
if flags & SyncFlag.RANGE:
args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
args_override['SYNCER_LIMIT'] = getattr(args, 'until')
config.dict_override(args_override, 'local cli args')
if flags & SyncFlag.HEAD:
config.add(getattr(args, 'keep_alive'), '_KEEP_ALIVE')
config.add(getattr(args, 'head'), '_HEAD')
return config

View File

@ -0,0 +1,4 @@
[syncer]
offset = 0
limit = 0
backend = mem

View File

@ -141,3 +141,4 @@ class SyncDriver:
def get(self, conn): def get(self, conn):
raise NotImplementedError() raise NotImplementedError()

View File

@ -43,13 +43,17 @@ class FilterState:
self.state_store.add('RESET') self.state_store.add('RESET')
self.state = self.state_store.state self.state = self.state_store.state
self.elements = self.state_store.elements
self.put = self.state_store.put self.put = self.state_store.put
self.mask = self.state_store.mask
self.name = self.state_store.name
self.set = self.state_store.set self.set = self.state_store.set
self.next = self.state_store.next self.next = self.state_store.next
self.move = self.state_store.move self.move = self.state_store.move
self.unset = self.state_store.unset self.unset = self.state_store.unset
self.peek = self.state_store.peek self.peek = self.state_store.peek
self.from_name = self.state_store.from_name self.from_name = self.state_store.from_name
self.list = self.state_store.list
self.state_store.sync() self.state_store.sync()
self.all = self.state_store.all self.all = self.state_store.all
self.started = False self.started = False
@ -93,6 +97,7 @@ class FilterState:
if self.scan != None: if self.scan != None:
ks = self.scan() ks = self.scan()
for v in ks: #os.listdir(self.scan_path): for v in ks: #os.listdir(self.scan_path):
logg.debug('ks {}'.format(v))
k = None k = None
try: try:
k = self.state_store.from_elements(v) k = self.state_store.from_elements(v)

1
chainsyncer/paths.py Normal file
View File

@ -0,0 +1 @@

View File

@ -0,0 +1,146 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import sys
import importlib
# external imports
import chainlib.cli
from shep.persist import PersistedState
# local imports
import chainsyncer.cli
from chainsyncer.settings import ChainsyncerSettings
from chainsyncer.store import SyncStore
from chainsyncer.filter import (
FilterState,
SyncFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
valid_fwd = [
'fwd',
'forward',
'next',
'continue',
]
valid_rwd = [
'rwd',
'rewind',
'current',
'back',
'repeat',
'replay',
]
action_is_forward = False
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--state-dir', type=str, dest='state_dir', help='State directory')
argparser.add_positional('action', type=str, help='Action to take on lock. Repeat means re-run the locked filter. Continue means resume execution for next filter.')
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
chainsyncer.cli.process_flags(argparser, sync_flags)
args = argparser.parse_args()
if args.action in valid_fwd:
action_is_forward = True
elif args.action not in valid_rwd:
sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd))
sys.exit(1)
base_config_dir = chainsyncer.cli.config_dir,
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
config = chainsyncer.cli.process_config(config, args, sync_flags)
config.add(args.state_dir, '_STATE_DIR', False)
logg.debug('config loaded:\n{}'.format(config))
settings = ChainsyncerSettings()
settings.process_sync_backend(config)
logg.debug('settings:\n{}'.format(str(settings)))
class FilterInNameOnly(SyncFilter):
def __init__(self, k):
self.k = k
def common_name(self):
return self.k
def main():
if settings.get('SYNCER_BACKEND') == 'mem':
raise ValueError('cannot unlock volatile state store')
state_dir = config.get('_STATE_DIR')
if config.get('SYNCER_BACKEND') == 'fs':
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
elif config.get('SYNCER_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
else:
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
store = syncer_store_class(state_dir)
filter_list = store.load_filter_list()
for i, k in enumerate(filter_list):
fltr = FilterInNameOnly(k)
store.register(fltr)
filter_list[i] = k.upper()
store.connect()
store.start(ignore_lock=True)
lock_state = store.filter_state.from_name('LOCK')
locked_item = store.filter_state.list(lock_state)
if len(locked_item) == 0:
sys.stderr.write('Sync filter in {} is not locked\n'.format(state_dir))
sys.exit(1)
elif len(locked_item) > 1:
sys.stderr.write('More than one locked item encountered in {}. That should never happen, so I do not know what to do next.\n'.format(state_dir))
sys.exit(1)
locked_item_key = locked_item[0]
locked_item = store.get(int(locked_item_key))
locked_state = store.filter_state.state(locked_item_key) - lock_state
locked_state_name = store.filter_state.name(locked_state)
logg.info('found item "{}" in locked state {}'.format(locked_item, store.filter_state.name(locked_state)))
if action_is_forward:
k = locked_state_name
filter_index = None
filter_index = filter_list.index(k)
filter_pos = filter_index + 1
filter_count = len(filter_list)
logg.debug('Locked filter {} found at position {} of {}'.format(k, filter_pos, filter_count))
if filter_pos == filter_count:
logg.info('Locked filter {} is the last filter in the list. Executing filter reset'.format(k))
locked_item.reset(check_incomplete=False)
else:
locked_item.advance(ignore_lock=True)
store.filter_state.unset(locked_item_key, lock_state)
else:
filter_mask = 0xf
filter_state = store.filter_state.mask(locked_state, filter_mask)
logg.info('Chosen action is "{}": will continue execution at previous filter {}'.format(args.action, store.filter_state.name(filter_state)))
store.filter_state.unset(locked_item_key, lock_state)
if __name__ == '__main__':
main()

View File

@ -1,9 +1,12 @@
# standard imports # standard imports
import uuid import uuid
import logging
# local imports # local imports
from chainsyncer.error import FilterDone from chainsyncer.error import FilterDone
logg = logging.getLogger(__name__)
class SyncSession: class SyncSession:
@ -29,6 +32,7 @@ class SyncSession:
def filter(self, conn, block, tx): def filter(self, conn, block, tx):
self.session_store.connect() self.session_store.connect()
for fltr in self.filters: for fltr in self.filters:
logg.debug('executing filter {}'.format(fltr))
self.item.advance() self.item.advance()
interrupt = fltr.filter(conn, block, tx) interrupt = fltr.filter(conn, block, tx)
if not self.item.release(interrupt=interrupt): if not self.item.release(interrupt=interrupt):

55
chainsyncer/settings.py Normal file
View File

@ -0,0 +1,55 @@
# standard imports
import logging
# external imports
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
from chainlib.settings import ChainSettings
logg = logging.getLogger(__name__)
class ChainsyncerSettings(ChainSettings):
def process_sync_backend(self, config):
self.o['SYNCER_BACKEND'] = config.get('SYNCER_BACKEND')
def process_sync_range(self, config):
o = self.o['SYNCER_INTERFACE'].block_latest()
r = self.o['RPC'].do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
keep_alive = False
session_block_offset = 0
block_limit = 0
until = 0
if config.true('_HEAD'):
self.o['SYNCER_OFFSET'] = block_offset
self.o['SYNCER_LIMIT'] = -1
return
session_block_offset = int(config.get('SYNCER_OFFSET'))
until = int(config.get('SYNCER_LIMIT'))
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
elif until == -1:
keep_alive = True
if session_block_offset == -1:
session_block_offset = block_offset
elif config.true('_KEEP_ALIVE'):
block_limit = -1
else:
if block_limit == 0:
block_limit = block_offset
self.o['SYNCER_OFFSET'] = session_block_offset
self.o['SYNCER_LIMIT'] = block_limit

View File

@ -1,8 +1,10 @@
# standard imports # standard imports
import os
import logging import logging
# local imports # local imports
from shep.persist import PersistedState from shep.persist import PersistedState
from shep import State
from shep.error import StateInvalid from shep.error import StateInvalid
from chainsyncer.filter import FilterState from chainsyncer.filter import FilterState
from chainsyncer.error import ( from chainsyncer.error import (
@ -33,7 +35,7 @@ def sync_state_deserialize(b):
# NOT thread safe # NOT thread safe
class SyncItem: class SyncItem:
def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False): def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False):
self.offset = offset self.offset = offset
self.target = target self.target = target
self.sync_state = sync_state self.sync_state = sync_state
@ -45,7 +47,8 @@ class SyncItem:
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v) (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid: filter_state = self.filter_state.state(self.state_key)
if filter_state & self.filter_state.from_name('LOCK') > 0 and not ignore_lock:
raise LockError(self.state_key) raise LockError(self.state_key)
self.count = len(self.filter_state.all(pure=True)) - 4 self.count = len(self.filter_state.all(pure=True)) - 4
@ -54,7 +57,7 @@ class SyncItem:
self.skip_filter = True self.skip_filter = True
elif not started: elif not started:
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
def __check_done(self): def __check_done(self):
if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0: if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0:
@ -63,11 +66,22 @@ class SyncItem:
raise FilterDone(self.state_key) raise FilterDone(self.state_key)
def reset(self): def resume(self):
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: filter_state = self.filter_state.state(self.state_key)
raise LockError('reset attempt on {} when state locked'.format(self.state_key)) if filter_state > 0x0f:
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0: filter_state_part = self.filter_state.mask(filter_state, 0x0f)
raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key)) if len(self.filter_state.elements(filter_state)) == 1:
logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part))
lock_state = self.filter_state.from_name('LOCK')
self.filter_state.set(lock_state)
def reset(self, check_incomplete=True):
if check_incomplete:
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
raise LockError('reset attempt on {} when state locked'.format(self.state_key))
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
@ -100,13 +114,16 @@ class SyncItem:
v = self.filter_state.state(self.state_key) v = self.filter_state.state(self.state_key)
def advance(self): def advance(self, ignore_lock=False):
if self.skip_filter: if self.skip_filter:
raise FilterDone() raise FilterDone()
self.__check_done() self.__check_done()
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
raise LockError('advance attempt on {} when state locked'.format(self.state_key)) if ignore_lock:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
else:
raise LockError('advance attempt on {} when state locked'.format(self.state_key))
done = False done = False
try: try:
self.filter_state.next(self.state_key) self.filter_state.next(self.state_key)
@ -120,7 +137,7 @@ class SyncItem:
def release(self, interrupt=False): def release(self, interrupt=False):
if self.skip_filter: if self.skip_filter:
return False return False
if interrupt: if interrupt == True:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT')) self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
@ -145,8 +162,8 @@ class SyncItem:
class SyncStore: class SyncStore:
def __init__(self, session_id=None): def __init__(self, path, session_id=None):
self.session_id = None self.session_id = session_id
self.session_path = None self.session_path = None
self.is_default = False self.is_default = False
self.first = False self.first = False
@ -155,17 +172,25 @@ class SyncStore:
self.item_keys = [] self.item_keys = []
self.started = False self.started = False
self.thresholds = [] self.thresholds = []
self.session_path = path
def setup_sync_state(self, factory, event_callback): def setup_sync_state(self, factory=None, event_callback=None):
self.state = PersistedState(factory.add, 2, event_callback=event_callback) if factory == None:
self.state = State(2, event_callback=event_callback)
else:
self.state = PersistedState(factory.add, 2, event_callback=event_callback)
self.state.add('SYNC') self.state.add('SYNC')
self.state.add('DONE') self.state.add('DONE')
def setup_filter_state(self, factory, event_callback): def setup_filter_state(self, factory=None, event_callback=None):
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback) if factory == None:
self.filter_state = FilterState(filter_state_backend, scan=factory.ls) filter_state_backend = State(0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend)
else:
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
self.filters = [] self.filters = []
@ -182,18 +207,21 @@ class SyncStore:
self.filter_state.register(fltr) self.filter_state.register(fltr)
def start(self, offset=0, target=-1): def start(self, offset=0, target=-1, ignore_lock=False):
if self.started: if self.started:
return return
self.load(target) self.save_filter_list()
self.load(target, ignore_lock=ignore_lock)
if self.first: if self.first:
state_bytes = sync_state_serialize(offset, 0, target) state_bytes = sync_state_serialize(offset, 0, target)
block_number_str = str(offset) block_number_str = str(offset)
self.state.put(block_number_str, state_bytes) self.state.put(block_number_str, contents=state_bytes)
self.filter_state.put(block_number_str) self.filter_state.put(block_number_str)
o = SyncItem(offset, target, self.state, self.filter_state) o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock)
o.resume()
self.items[offset] = o self.items[offset] = o
self.item_keys.append(offset) self.item_keys.append(offset)
elif offset > 0: elif offset > 0:
@ -215,12 +243,10 @@ class SyncStore:
self.state.move(item.state_key, self.state.DONE) self.state.move(item.state_key, self.state.DONE)
state_bytes = sync_state_serialize(item.cursor, 0, -1) state_bytes = sync_state_serialize(item.cursor, 0, -1)
self.state.put(str(item.cursor), state_bytes) self.state.put(str(item.cursor), contents=state_bytes)
logg.debug('item {}'.format(self.state.state(item.state_key)))
def load(self, target): def load(self, target, ignore_lock=False):
self.state.sync(self.state.NEW) self.state.sync(self.state.NEW)
self.state.sync(self.state.SYNC) self.state.sync(self.state.SYNC)
@ -244,7 +270,8 @@ class SyncStore:
item_target = target item_target = target
if i < lim: if i < lim:
item_target = thresholds[i+1] item_target = thresholds[i+1]
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True) o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock)
o.resume()
self.items[block_number] = o self.items[block_number] = o
self.item_keys.append(block_number) self.item_keys.append(block_number)
logg.info('added existing {}'.format(o)) logg.info('added existing {}'.format(o))
@ -278,3 +305,18 @@ class SyncStore:
def disconnect(self): def disconnect(self):
self.filter_state.disconnect() self.filter_state.disconnect()
def save_filter_list(self):
raise NotImplementedError()
def load_filter_list(self):
raise NotImplementedError()
def peek_next_filter(self):
pass
def peek_current_filter(self):
pass

View File

@ -7,10 +7,7 @@ import logging
from shep.store.file import SimpleFileStoreFactory from shep.store.file import SimpleFileStoreFactory
# local imports # local imports
from chainsyncer.store import ( from chainsyncer.store import SyncStore
SyncItem,
SyncStore,
)
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -18,18 +15,7 @@ logg = logging.getLogger(__name__)
class SyncFsStore(SyncStore): class SyncFsStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None): def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncFsStore, self).__init__(session_id=session_id) super(SyncFsStore, self).__init__(base_path, session_id=session_id)
default_path = os.path.join(base_path, 'default')
if session_id == None:
self.session_path = os.path.realpath(default_path)
self.is_default = True
else:
if session_id == 'default':
self.is_default = True
given_path = os.path.join(base_path, session_id)
self.session_path = os.path.realpath(given_path)
create_path = False create_path = False
try: try:
@ -38,18 +24,22 @@ class SyncFsStore(SyncStore):
create_path = True create_path = True
if create_path: if create_path:
self.__create_path(base_path, default_path, session_id=session_id) self.__create_path(base_path, self.default_path, session_id=session_id)
self.session_id = os.path.basename(self.session_path)
self.session_id = os.path.basename(self.session_path)
logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path)) logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
base_sync_path = os.path.join(self.session_path, 'sync') base_sync_path = os.path.join(self.session_path, 'sync')
factory = SimpleFileStoreFactory(base_sync_path, binary=True) factory = SimpleFileStoreFactory(base_sync_path, binary=True)
self.setup_sync_state(factory, state_event_callback) self.setup_sync_state(factory, state_event_callback)
self.setup_filter_state(callback=filter_state_event_callback)
def setup_filter_state(self, callback=None):
base_filter_path = os.path.join(self.session_path, 'filter') base_filter_path = os.path.join(self.session_path, 'filter')
factory = SimpleFileStoreFactory(base_filter_path, binary=True) factory = SimpleFileStoreFactory(base_filter_path, binary=True)
self.setup_filter_state(factory, filter_state_event_callback) super(SyncFsStore, self).setup_filter_state(factory, callback)
def __create_path(self, base_path, default_path, session_id=None): def __create_path(self, base_path, default_path, session_id=None):
@ -84,3 +74,25 @@ class SyncFsStore(SyncStore):
f.write(str(v)) f.write(str(v))
f.close() f.close()
self.target = v self.target = v
def load_filter_list(self):
fltr = []
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'r')
while True:
v = f.readline()
if len(v) == 0:
break
v = v.rstrip()
fltr.append(v)
f.close()
return fltr
def save_filter_list(self):
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'w')
for fltr in self.filters:
f.write(fltr.common_name() + '\n')
f.close()

45
chainsyncer/store/mem.py Normal file
View File

@ -0,0 +1,45 @@
# standard imports
import logging
import os
# external imports
from shep import State
# local imports
from chainsyncer.store import SyncStore
logg = logging.getLogger(__name__)
class SyncMemStore(SyncStore):
def __init__(self, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncMemStore, self).__init__(None, session_id=session_id)
factory = None
self.setup_sync_state(factory, state_event_callback)
factory = None
self.setup_filter_state(factory, filter_state_event_callback)
def set_target(self, v):
self.target = int(v)
def get_target(self):
return self.target
def stop(self, item):
if item != None:
super(SyncMemStore, self).stop(item)
logg.info('I am an in-memory only state store. I am shutting down now, so all state will now be discarded.')
def save_filter_list(self):
pass
def load_filter_list(self):
return []

View File

@ -0,0 +1,79 @@
# standard imports
import uuid
import os
import logging
# external imports
from shep.store.rocksdb import RocksDbStoreFactory
# local imports
from chainsyncer.store import (
SyncItem,
SyncStore,
)
logg = logging.getLogger(__name__)
class RocksDbStoreAdder:
def __init__(self, factory, prefix):
self.factory = factory
self.prefix = prefix
def add(self, k):
path = os.path.join(self.prefix, k)
return self.factory.add(path)
def ls(self):
return self.factory.ls()
class SyncRocksDbStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncRocksDbStore, self).__init__(base_path, session_id=session_id)
self.factory = RocksDbStoreFactory(self.session_path, binary=True)
prefix_factory = RocksDbStoreAdder(self.factory, 'sync')
self.setup_sync_state(prefix_factory, state_event_callback)
prefix_factory = RocksDbStoreAdder(self.factory, 'filter')
self.setup_filter_state(prefix_factory, filter_state_event_callback)
#self.session_id = os.path.basename(self.session_path)
#logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target')
def get_target(self):
v = self.target_db.get('target')
if v != None:
self.target = int(v)
def set_target(self, v):
self.target_db.put('target', str(v))
self.target = v
def stop(self, item):
if item != None:
super(SyncRocksDbStore, self).stop(item)
self.factory.close()
def save_filter_list(self):
fltr = []
for v in self.filters:
fltr.append(v.common_name())
self.target_db.put('filter_list', ','.join(fltr))
def load_filter_list(self):
v = self.target_db.get('filter_list')
v = v.decode('utf-8')
return v.split(',')

View File

@ -267,3 +267,12 @@ class MockChainInterfaceConn(MockConn):
def handle_receipt(self, hsh): def handle_receipt(self, hsh):
return {} return {}
class MockItem:
def __init__(self, target, offset, cursor, state_key):
self.target = target
self.offset = offset
self.cursor = cursor
self.state_key = state_key

View File

@ -1,64 +0,0 @@
# standard imports
import logging
import os
# external imports
import alembic
import alembic.config
# local imports
from chainsyncer.db.models.base import SessionBase
from chainsyncer.db import dsn_from_config
from chainsyncer.db.models.base import SessionBase
logg = logging.getLogger(__name__)
class ChainSyncerDb:
"""SQLITE database setup for unit tests
:param debug: Activate sql level debug (outputs sql statements)
:type debug: bool
"""
base = SessionBase
def __init__(self, debug=False):
config = {
'DATABASE_ENGINE': 'sqlite',
'DATABASE_DRIVER': 'pysqlite',
'DATABASE_NAME': 'chainsyncer.sqlite',
}
logg.debug('config {}'.format(config))
self.dsn = dsn_from_config(config)
self.base.poolable = False
self.base.transactional = False
self.base.procedural = False
self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0
rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..')
dbdir = os.path.join(rootdir, 'chainsyncer', 'db')
#migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE'))
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', self.dsn)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
def bind_session(self, session=None):
"""Create session using underlying session base
"""
return self.base.bind_session(session)
def release_session(self, session=None):
"""Release session using underlying session base
"""
return self.base.release_session(session)

View File

@ -0,0 +1,301 @@
# standard imports
import os
import stat
import unittest
import shutil
import tempfile
import logging
import uuid
# local imports
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import (
MockFilter,
MockItem,
)
logging.STATETRACE = 5
logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
def state_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
class TestStoreBase(unittest.TestCase):
def setUp(self):
self.base_path = tempfile.mkdtemp()
self.session_id = str(uuid.uuid4())
self.path = os.path.join(self.base_path, self.session_id)
os.makedirs(self.path)
self.store_factory = None
self.persist = True
@classmethod
def link(cls, target):
for v in [
"default",
"store_start",
"store_resume",
"filter_list",
"sync_process_nofilter",
"sync_process_onefilter",
"sync_process_outoforder",
"sync_process_interrupt",
"sync_process_reset",
"sync_process_done",
"sync_head_future",
"sync_history_interrupted",
"sync_history_complete",
]:
setattr(target, 'test_' + v, getattr(cls, 't_' + v))
def tearDown(self):
shutil.rmtree(self.path)
def t_default(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
#fp = os.path.join(self.path, store.session_id)
fp = self.path
session_id = store.session_id
st = None
st = os.stat(fp)
if st != None:
self.assertTrue(stat.S_ISDIR(st.st_mode))
#self.assertTrue(store.is_default)
store.stop(bogus_item)
store = self.store_factory()
fpr = os.path.join(self.path, self.session_id)
self.assertEqual(fp, self.path)
def t_store_start(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
store.start(42)
self.assertTrue(store.first)
store.stop(bogus_item)
if self.persist:
store = self.store_factory()
store.start()
self.assertFalse(store.first)
def t_store_resume(self):
store = self.store_factory()
store.start(13)
self.assertTrue(store.first)
# todo not done
def t_sync_process_nofilter(self):
store = self.store_factory()
session = SyncSession(store)
session.start()
o = session.get(0)
with self.assertRaises(FilterDone):
o.advance()
def t_sync_process_onefilter(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start()
o = session.get(0)
o.advance()
o.release()
def t_sync_process_outoforder(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('two')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.advance()
o.release()
with self.assertRaises(LockError):
o.release()
o.advance()
o.release()
def t_sync_process_interrupt(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
o.release(interrupt=True)
with self.assertRaises(FilterDone):
o.advance()
def t_sync_process_reset(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.reset()
o.release()
with self.assertRaises(IncompleteFilterError):
o.reset()
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
def t_sync_process_done(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start(target=0)
o = session.get(0)
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def t_sync_head_future(self):
store = self.store_factory('foo')
session = SyncSession(store)
session.start()
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
if self.persist:
store = self.store_factory('foo')
store.start()
o = store.get(2)
def t_sync_history_interrupted(self):
if not self.persist:
return
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory('foo')
session = SyncSession(store)
session.start(target=13)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store.stop(bogus_item)
store = self.store_factory('foo')
store.start()
o = store.get(0)
self.assertEqual(o.cursor, 2)
self.assertEqual(o.target, 13)
o.next(advance_block=True)
o.next(advance_block=True)
store.stop(bogus_item)
store = self.store_factory('foo')
store.start()
self.assertEqual(o.cursor, 4)
self.assertEqual(o.target, 13)
def t_sync_history_complete(self):
store = self.store_factory('foo')
session = SyncSession(store)
session.start(target=3)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
o.next(advance_block=True)
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def t_filter_list(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
fltr_one = MockFilter('foo_bar')
store.register(fltr_one)
fltr_two = MockFilter('bar_baz')
store.register(fltr_two)
store.start()
store.stop(bogus_item)
store = self.store_factory()
r = store.load_filter_list()
self.assertEqual(r[0], 'foo_bar')
self.assertEqual(r[1], 'bar_baz')

View File

@ -1,5 +1,5 @@
confini~=0.6.0 confini~=0.6.0
semver==2.13.0 semver==2.13.0
hexathon~=0.1.5 hexathon~=0.1.5
chainlib>=0.1.0b1,<=0.1.0 chainlib~=0.1.1
shep>=0.2.0rc1,<0.3.0 shep~=0.2.3

View File

@ -8,5 +8,12 @@ for f in `ls tests/*.py`; do
exit exit
fi fi
done done
for f in `ls tests/store/*.py`; do
python $f
if [ $? -gt 0 ]; then
exit
fi
done
set +x set +x
set +e set +e

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chainsyncer name = chainsyncer
version = 0.3.1 version = 0.4.1
description = Generic blockchain syncer driver description = Generic blockchain syncer driver
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
@ -28,12 +28,14 @@ packages =
chainsyncer.driver chainsyncer.driver
chainsyncer.unittest chainsyncer.unittest
chainsyncer.store chainsyncer.store
chainsyncer.state chainsyncer.cli
chainsyncer.runnable
#[options.package_data] #[options.package_data]
#* = #* =
# sql/* # sql/*
#[options.entry_points] [options.entry_points]
#console_scripts = console_scripts =
# blocksync-celery = chainsyncer.runnable.tracker:main #blocksync-celery = chainsyncer.runnable.tracker:main
chainsyncer-unlock = chainsyncer.runnable.unlock:main

View File

@ -26,5 +26,7 @@ setup(
install_requires=requirements, install_requires=requirements,
extras_require={ extras_require={
'sql': sql_requirements, 'sql': sql_requirements,
'rocksdb': ['shep[rocksdb]~=0.2.2'],
'redis': ['shep[redis]~=0.2.2'],
} }
) )

33
tests/store/test_0_mem.py Normal file
View File

@ -0,0 +1,33 @@
# standard imports
import unittest
import logging
# external imports
from shep import State
# local imports
from chainsyncer.store.mem import SyncMemStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def create(self, session_id=None):
return SyncMemStore(session_id=session_id)
class TestMem(TestStoreBase):
def setUp(self):
super(TestMem, self).setUp()
self.store_factory = StoreFactory().create
self.persist = False
if __name__ == '__main__':
TestStoreBase.link(TestMem)
# Remove tests that test persistence of state
unittest.main()

32
tests/store/test_1_fs.py Normal file
View File

@ -0,0 +1,32 @@
# standard imports
import unittest
import logging
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def __init__(self, path):
self.path = path
def create(self, session_id=None):
return SyncFsStore(self.path, session_id=session_id)
class TestFs(TestStoreBase):
def setUp(self):
super(TestFs, self).setUp()
self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
TestStoreBase.link(TestFs)
unittest.main()

View File

@ -0,0 +1,35 @@
# standard imports
import unittest
import logging
# local imports
from chainsyncer.store.rocksdb import SyncRocksDbStore
from chainsyncer.unittest.store import (
TestStoreBase,
filter_change_callback,
state_change_callback,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def __init__(self, path):
self.path = path
def create(self, session_id=None):
return SyncRocksDbStore(self.path, session_id=session_id, state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
class TestRocksDb(TestStoreBase):
def setUp(self):
super(TestRocksDb, self).setUp()
self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
TestStoreBase.link(TestRocksDb)
unittest.main()

View File

@ -1,245 +0,0 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import stat
import os
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import MockFilter
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestFs(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.path)
def test_default(self):
store = SyncFsStore(self.path)
fp = os.path.join(self.path, store.session_id)
session_id = store.session_id
st = os.stat(fp)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertTrue(store.is_default)
fpd = os.path.join(self.path, 'default')
st = os.stat(fpd)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertTrue(store.is_default)
fpd = os.path.realpath(fpd)
self.assertEqual(fpd, fp)
store = SyncFsStore(self.path)
fpr = os.path.join(self.path, session_id)
self.assertEqual(fp, fpr)
self.assertTrue(store.is_default)
store = SyncFsStore(self.path, 'default')
fpr = os.path.join(self.path, session_id)
self.assertEqual(fp, fpr)
self.assertTrue(store.is_default)
store = SyncFsStore(self.path, 'foo')
fpf = os.path.join(self.path, 'foo')
st = os.stat(fpf)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertFalse(store.is_default)
def test_store_start(self):
store = SyncFsStore(self.path)
store.start(42)
self.assertTrue(store.first)
store = SyncFsStore(self.path)
store.start()
self.assertFalse(store.first)
def test_store_resume(self):
store = SyncFsStore(self.path)
store.start(13)
self.assertTrue(store.first)
# todo not done
def test_sync_process_nofilter(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
session.start()
o = session.get(0)
with self.assertRaises(FilterDone):
o.advance()
def test_sync_process_onefilter(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start()
o = session.get(0)
o.advance()
o.release()
def test_sync_process_outoforder(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('two')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.advance()
o.release()
with self.assertRaises(LockError):
o.release()
o.advance()
o.release()
def test_sync_process_interrupt(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
o.release(interrupt=True)
with self.assertRaises(FilterDone):
o.advance()
def test_sync_process_reset(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.reset()
o.release()
with self.assertRaises(IncompleteFilterError):
o.reset()
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
def test_sync_process_done(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start(target=0)
o = session.get(0)
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def test_sync_head_future(self):
store = SyncFsStore(self.path, session_id='foo')
session = SyncSession(store)
session.start()
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store = SyncFsStore(self.path, session_id='foo')
store.start()
o = store.get(2)
def test_sync_history_interrupted(self):
store = SyncFsStore(self.path, session_id='foo')
session = SyncSession(store)
session.start(target=13)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store = SyncFsStore(self.path, session_id='foo')
store.start()
o = store.get(0)
self.assertEqual(o.cursor, 2)
self.assertEqual(o.target, 13)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store = SyncFsStore(self.path, session_id='foo')
store.start()
self.assertEqual(o.cursor, 4)
self.assertEqual(o.target, 13)
def test_sync_history_complete(self):
store = SyncFsStore(self.path, session_id='foo')
session = SyncSession(store)
session.start(target=3)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
o.next(advance_block=True)
with self.assertRaises(SyncDone):
o.next(advance_block=True)
if __name__ == '__main__':
unittest.main()