Compare commits

..

No commits in common. "master" and "dev-0.3.3" have entirely different histories.

22 changed files with 85 additions and 547 deletions

View File

@ -1,12 +1,3 @@
* 0.3.7
- Remove hard eth dependency in settings rendering
- Add unlock cli tool
* 0.3.6
- Add cli arg processing and settings renderer
* 0.3.5
- Allow memory-only shep if factory set to None in store constructor
* 0.3.4
- Use explicit bool check in filter interrupt check
* 0.3.3 * 0.3.3
- Include shep persistent state bootstrap sync - Include shep persistent state bootstrap sync
- Add chainsyncer extras - Add chainsyncer extras

View File

@ -1 +1 @@
include *requirements.txt LICENSE.txt chainsyncer/data/config/* include *requirements.txt LICENSE.txt

View File

@ -1,12 +0,0 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
# Directory of this package; used to locate the bundled data files.
__script_dir = os.path.dirname(os.path.realpath(__file__))
# <package root>/data holds static assets shipped with the package.
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
# <package root>/data/config holds the default configuration files.
config_dir = os.path.join(data_dir, 'config')

View File

@ -1,14 +0,0 @@
# local imports
from .base import SyncFlag
def process_flags(argparser, flags):
    """Register syncer command line arguments on the parser.

    Which argument groups are added is selected by the SyncFlag bits in
    ``flags``; the ``--backend`` argument is always added.

    :param argparser: argument parser to extend
    :param flags: bitwise OR of SyncFlag values
    """
    if flags & SyncFlag.RANGE:
        # Block range selection.
        argparser.add_argument('--offset', type=int, help='Block to start sync from. Default is start of history (0).')
        argparser.add_argument('--until', type=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
    if flags & SyncFlag.HEAD:
        # Head tracking behavior.
        argparser.add_argument('--head', action='store_true', help='Start from latest block as offset')
        argparser.add_argument('--keep-alive', action='store_true', help='Do not stop syncing when caught up')
    argparser.add_argument('--backend', type=str, help='Backend to use for state store')

View File

@ -1,7 +0,0 @@
# standard imports
import enum
class SyncFlag(enum.IntEnum):
    """Bit flags selecting which groups of syncer CLI arguments apply."""

    RANGE = enum.auto()  # 1: block range arguments (--offset/--until)
    HEAD = enum.auto()   # 2: head-tracking arguments (--head/--keep-alive)

View File

@ -1,20 +0,0 @@
# external imports
from chainsyncer.cli import SyncFlag
def process_config(config, args, flags):
    """Fold parsed syncer CLI arguments into the configuration object.

    :param config: configuration object supporting dict_override/add
    :param args: parsed argparse namespace
    :param flags: bitwise OR of SyncFlag values used when parsing
    :return: the same config object, updated
    """
    overrides = {'SYNCER_BACKEND': args.backend}
    if flags & SyncFlag.RANGE:
        overrides['SYNCER_OFFSET'] = args.offset
        overrides['SYNCER_LIMIT'] = args.until
    config.dict_override(overrides, 'local cli args')
    if flags & SyncFlag.HEAD:
        # Head-mode switches are stored as local (underscore) config entries.
        config.add(args.keep_alive, '_KEEP_ALIVE')
        config.add(args.head, '_HEAD')
    return config

View File

@ -1,4 +0,0 @@
[syncer]
offset = 0
limit = 0
backend = mem

View File

@ -141,4 +141,3 @@ class SyncDriver:
def get(self, conn): def get(self, conn):
raise NotImplementedError() raise NotImplementedError()

View File

@ -43,17 +43,13 @@ class FilterState:
self.state_store.add('RESET') self.state_store.add('RESET')
self.state = self.state_store.state self.state = self.state_store.state
self.elements = self.state_store.elements
self.put = self.state_store.put self.put = self.state_store.put
self.mask = self.state_store.mask
self.name = self.state_store.name
self.set = self.state_store.set self.set = self.state_store.set
self.next = self.state_store.next self.next = self.state_store.next
self.move = self.state_store.move self.move = self.state_store.move
self.unset = self.state_store.unset self.unset = self.state_store.unset
self.peek = self.state_store.peek self.peek = self.state_store.peek
self.from_name = self.state_store.from_name self.from_name = self.state_store.from_name
self.list = self.state_store.list
self.state_store.sync() self.state_store.sync()
self.all = self.state_store.all self.all = self.state_store.all
self.started = False self.started = False
@ -97,7 +93,6 @@ class FilterState:
if self.scan != None: if self.scan != None:
ks = self.scan() ks = self.scan()
for v in ks: #os.listdir(self.scan_path): for v in ks: #os.listdir(self.scan_path):
logg.debug('ks {}'.format(v))
k = None k = None
try: try:
k = self.state_store.from_elements(v) k = self.state_store.from_elements(v)

View File

@ -1 +0,0 @@

View File

@ -1,146 +0,0 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import sys
import importlib
# external imports
import chainlib.cli
from shep.persist import PersistedState
# local imports
import chainsyncer.cli
from chainsyncer.settings import ChainsyncerSettings
from chainsyncer.store import SyncStore
from chainsyncer.filter import (
FilterState,
SyncFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# CLI aliases meaning "advance to the next filter".
valid_fwd = [
    'fwd',
    'forward',
    'next',
    'continue',
]

# CLI aliases meaning "re-run the currently locked filter".
valid_rwd = [
    'rwd',
    'rewind',
    'current',
    'back',
    'repeat',
    'replay',
]

# Set to True below once the action argument validates as a forward action.
action_is_forward = False

arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--state-dir', type=str, dest='state_dir', help='State directory')
argparser.add_positional('action', type=str, help='Action to take on lock. Repeat means re-run the locked filter. Continue means resume execution for next filter.')

# This tool needs both the range and head argument groups.
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
chainsyncer.cli.process_flags(argparser, sync_flags)

args = argparser.parse_args()

if args.action in valid_fwd:
    action_is_forward = True
elif args.action not in valid_rwd:
    sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd))
    sys.exit(1)

# NOTE(review): the trailing comma makes this a 1-tuple; presumably
# Config.from_args accepts a sequence of base config dirs -- confirm.
base_config_dir = chainsyncer.cli.config_dir,
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
config = chainsyncer.cli.process_config(config, args, sync_flags)
config.add(args.state_dir, '_STATE_DIR', False)
logg.debug('config loaded:\n{}'.format(config))

settings = ChainsyncerSettings()
settings.process_sync_backend(config)
logg.debug('settings:\n{}'.format(str(settings)))
class FilterInNameOnly(SyncFilter):
    """Stand-in filter that only carries a name.

    Used to re-register the persisted filter list without needing the
    actual filter implementations.
    """

    def __init__(self, k):
        # Keep the given name verbatim; it is echoed back by common_name.
        self.k = k

    def common_name(self):
        return self.k
def main():
    """Release a stale filter lock in a persisted sync state store.

    Loads the store selected by SYNCER_BACKEND from _STATE_DIR, locates the
    single locked item, and either advances past the locked filter (forward
    action) or leaves it to be re-run (rewind action).
    """
    # A memory-only store has no persisted lock to release.
    if settings.get('SYNCER_BACKEND') == 'mem':
        raise ValueError('cannot unlock volatile state store')
    state_dir = config.get('_STATE_DIR')
    # Resolve the store class: built-in fs/rocksdb backends, otherwise the
    # backend string is treated as a module path exposing SyncStore.
    if config.get('SYNCER_BACKEND') == 'fs':
        syncer_store_module = importlib.import_module('chainsyncer.store.fs')
        syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
    elif config.get('SYNCER_BACKEND') == 'rocksdb':
        syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
        syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
    else:
        syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
        syncer_store_class = getattr(syncer_store_module, 'SyncStore')
    logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))

    store = syncer_store_class(state_dir)

    # Re-register persisted filters by name only; keep an uppercased copy of
    # the names for matching against filter state names below.
    filter_list = store.load_filter_list()
    for i, k in enumerate(filter_list):
        fltr = FilterInNameOnly(k)
        store.register(fltr)
        filter_list[i] = k.upper()

    store.connect()
    # ignore_lock: we are here precisely because the state is locked.
    store.start(ignore_lock=True)

    lock_state = store.filter_state.from_name('LOCK')
    locked_item = store.filter_state.list(lock_state)
    if len(locked_item) == 0:
        sys.stderr.write('Sync filter in {} is not locked\n'.format(state_dir))
        sys.exit(1)
    elif len(locked_item) > 1:
        sys.stderr.write('More than one locked item encountered in {}. That should never happen, so I do not know what to do next.\n'.format(state_dir))
        sys.exit(1)

    locked_item_key = locked_item[0]
    locked_item = store.get(int(locked_item_key))
    # Strip the LOCK bit to recover the underlying filter state.
    locked_state = store.filter_state.state(locked_item_key) - lock_state
    locked_state_name = store.filter_state.name(locked_state)
    logg.info('found item "{}" in locked state {}'.format(locked_item, store.filter_state.name(locked_state)))

    if action_is_forward:
        k = locked_state_name
        filter_index = None  # NOTE(review): dead assignment, immediately overwritten
        filter_index = filter_list.index(k)
        filter_pos = filter_index + 1
        filter_count = len(filter_list)
        logg.debug('Locked filter {} found at position {} of {}'.format(k, filter_pos, filter_count))
        if filter_pos == filter_count:
            # Last filter: reset the item for the next round instead of advancing.
            logg.info('Locked filter {} is the last filter in the list. Executing filter reset'.format(k))
            locked_item.reset(check_incomplete=False)
        else:
            locked_item.advance(ignore_lock=True)
            store.filter_state.unset(locked_item_key, lock_state)
    else:
        # Rewind: just drop the lock so the locked filter runs again.
        filter_mask = 0xf
        filter_state = store.filter_state.mask(locked_state, filter_mask)
        logg.info('Chosen action is "{}": will continue execution at previous filter {}'.format(args.action, store.filter_state.name(filter_state)))
        store.filter_state.unset(locked_item_key, lock_state)


if __name__ == '__main__':
    main()

View File

@ -1,12 +1,9 @@
# standard imports # standard imports
import uuid import uuid
import logging
# local imports # local imports
from chainsyncer.error import FilterDone from chainsyncer.error import FilterDone
logg = logging.getLogger(__name__)
class SyncSession: class SyncSession:
@ -32,7 +29,6 @@ class SyncSession:
def filter(self, conn, block, tx): def filter(self, conn, block, tx):
self.session_store.connect() self.session_store.connect()
for fltr in self.filters: for fltr in self.filters:
logg.debug('executing filter {}'.format(fltr))
self.item.advance() self.item.advance()
interrupt = fltr.filter(conn, block, tx) interrupt = fltr.filter(conn, block, tx)
if not self.item.release(interrupt=interrupt): if not self.item.release(interrupt=interrupt):

View File

@ -1,55 +0,0 @@
# standard imports
import logging
# external imports
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
from chainlib.settings import ChainSettings
logg = logging.getLogger(__name__)
class ChainsyncerSettings(ChainSettings):
    """Settings renderer for syncer backend and block range selection."""

    def process_sync_backend(self, config):
        # Copy the backend choice verbatim from configuration.
        self.o['SYNCER_BACKEND'] = config.get('SYNCER_BACKEND')

    def process_sync_range(self, config):
        """Resolve SYNCER_OFFSET and SYNCER_LIMIT from config and chain head.

        Requires SYNCER_INTERFACE and RPC to already be present in settings.
        """
        # Query the current network block height.
        o = self.o['SYNCER_INTERFACE'].block_latest()
        r = self.o['RPC'].do(o)
        block_offset = int(strip_0x(r), 16) + 1
        logg.info('network block height at startup is {}'.format(block_offset))

        keep_alive = False  # NOTE(review): set below but never read afterwards
        session_block_offset = 0
        block_limit = 0
        until = 0
        if config.true('_HEAD'):
            # Head mode: start at current height, no upper limit.
            self.o['SYNCER_OFFSET'] = block_offset
            self.o['SYNCER_LIMIT'] = -1
            return

        session_block_offset = int(config.get('SYNCER_OFFSET'))
        until = int(config.get('SYNCER_LIMIT'))
        if until > 0:
            # Explicit terminal block; must lie after the start offset.
            if until <= session_block_offset:
                raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
            block_limit = until
        elif until == -1:
            keep_alive = True

        if session_block_offset == -1:
            # -1 offset means "start at current head".
            session_block_offset = block_offset
        elif config.true('_KEEP_ALIVE'):
            block_limit = -1
        else:
            if block_limit == 0:
                # Default limit: stop at the height observed at startup.
                block_limit = block_offset

        self.o['SYNCER_OFFSET'] = session_block_offset
        self.o['SYNCER_LIMIT'] = block_limit

View File

@ -4,7 +4,6 @@ import logging
# local imports # local imports
from shep.persist import PersistedState from shep.persist import PersistedState
from shep import State
from shep.error import StateInvalid from shep.error import StateInvalid
from chainsyncer.filter import FilterState from chainsyncer.filter import FilterState
from chainsyncer.error import ( from chainsyncer.error import (
@ -35,7 +34,7 @@ def sync_state_deserialize(b):
# NOT thread safe # NOT thread safe
class SyncItem: class SyncItem:
def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False): def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False):
self.offset = offset self.offset = offset
self.target = target self.target = target
self.sync_state = sync_state self.sync_state = sync_state
@ -47,8 +46,7 @@ class SyncItem:
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v) (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
filter_state = self.filter_state.state(self.state_key) if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid:
if filter_state & self.filter_state.from_name('LOCK') > 0 and not ignore_lock:
raise LockError(self.state_key) raise LockError(self.state_key)
self.count = len(self.filter_state.all(pure=True)) - 4 self.count = len(self.filter_state.all(pure=True)) - 4
@ -57,7 +55,7 @@ class SyncItem:
self.skip_filter = True self.skip_filter = True
elif not started: elif not started:
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
def __check_done(self): def __check_done(self):
if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0: if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0:
@ -66,22 +64,11 @@ class SyncItem:
raise FilterDone(self.state_key) raise FilterDone(self.state_key)
def resume(self): def reset(self):
filter_state = self.filter_state.state(self.state_key) if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
if filter_state > 0x0f: raise LockError('reset attempt on {} when state locked'.format(self.state_key))
filter_state_part = self.filter_state.mask(filter_state, 0x0f) if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
if len(self.filter_state.elements(filter_state)) == 1: raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part))
lock_state = self.filter_state.from_name('LOCK')
self.filter_state.set(lock_state)
def reset(self, check_incomplete=True):
if check_incomplete:
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
raise LockError('reset attempt on {} when state locked'.format(self.state_key))
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
@ -114,16 +101,13 @@ class SyncItem:
v = self.filter_state.state(self.state_key) v = self.filter_state.state(self.state_key)
def advance(self, ignore_lock=False): def advance(self):
if self.skip_filter: if self.skip_filter:
raise FilterDone() raise FilterDone()
self.__check_done() self.__check_done()
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
if ignore_lock: raise LockError('advance attempt on {} when state locked'.format(self.state_key))
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
else:
raise LockError('advance attempt on {} when state locked'.format(self.state_key))
done = False done = False
try: try:
self.filter_state.next(self.state_key) self.filter_state.next(self.state_key)
@ -137,7 +121,7 @@ class SyncItem:
def release(self, interrupt=False): def release(self, interrupt=False):
if self.skip_filter: if self.skip_filter:
return False return False
if interrupt == True: if interrupt:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT')) self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
@ -163,7 +147,7 @@ class SyncItem:
class SyncStore: class SyncStore:
def __init__(self, path, session_id=None): def __init__(self, path, session_id=None):
self.session_id = session_id self.session_id = None
self.session_path = None self.session_path = None
self.is_default = False self.is_default = False
self.first = False self.first = False
@ -172,25 +156,27 @@ class SyncStore:
self.item_keys = [] self.item_keys = []
self.started = False self.started = False
self.thresholds = [] self.thresholds = []
self.session_path = path self.default_path = os.path.join(path, 'default')
if session_id == None:
def setup_sync_state(self, factory=None, event_callback=None): self.session_path = os.path.realpath(self.default_path)
if factory == None: self.is_default = True
self.state = State(2, event_callback=event_callback)
else: else:
self.state = PersistedState(factory.add, 2, event_callback=event_callback) if session_id == 'default':
self.is_default = True
given_path = os.path.join(path, session_id)
self.session_path = os.path.realpath(given_path)
def setup_sync_state(self, factory, event_callback):
self.state = PersistedState(factory.add, 2, event_callback=event_callback)
self.state.add('SYNC') self.state.add('SYNC')
self.state.add('DONE') self.state.add('DONE')
def setup_filter_state(self, factory=None, event_callback=None): def setup_filter_state(self, factory, event_callback):
if factory == None: filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
filter_state_backend = State(0, check_alias=False, event_callback=event_callback) self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
self.filter_state = FilterState(filter_state_backend)
else:
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
self.filters = [] self.filters = []
@ -207,21 +193,18 @@ class SyncStore:
self.filter_state.register(fltr) self.filter_state.register(fltr)
def start(self, offset=0, target=-1, ignore_lock=False): def start(self, offset=0, target=-1):
if self.started: if self.started:
return return
self.save_filter_list() self.load(target)
self.load(target, ignore_lock=ignore_lock)
if self.first: if self.first:
state_bytes = sync_state_serialize(offset, 0, target) state_bytes = sync_state_serialize(offset, 0, target)
block_number_str = str(offset) block_number_str = str(offset)
self.state.put(block_number_str, contents=state_bytes) self.state.put(block_number_str, state_bytes)
self.filter_state.put(block_number_str) self.filter_state.put(block_number_str)
o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock) o = SyncItem(offset, target, self.state, self.filter_state)
o.resume()
self.items[offset] = o self.items[offset] = o
self.item_keys.append(offset) self.item_keys.append(offset)
elif offset > 0: elif offset > 0:
@ -243,10 +226,10 @@ class SyncStore:
self.state.move(item.state_key, self.state.DONE) self.state.move(item.state_key, self.state.DONE)
state_bytes = sync_state_serialize(item.cursor, 0, -1) state_bytes = sync_state_serialize(item.cursor, 0, -1)
self.state.put(str(item.cursor), contents=state_bytes) self.state.put(str(item.cursor), state_bytes)
def load(self, target, ignore_lock=False): def load(self, target):
self.state.sync(self.state.NEW) self.state.sync(self.state.NEW)
self.state.sync(self.state.SYNC) self.state.sync(self.state.SYNC)
@ -270,8 +253,7 @@ class SyncStore:
item_target = target item_target = target
if i < lim: if i < lim:
item_target = thresholds[i+1] item_target = thresholds[i+1]
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock) o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True)
o.resume()
self.items[block_number] = o self.items[block_number] = o
self.item_keys.append(block_number) self.item_keys.append(block_number)
logg.info('added existing {}'.format(o)) logg.info('added existing {}'.format(o))
@ -305,18 +287,3 @@ class SyncStore:
def disconnect(self): def disconnect(self):
self.filter_state.disconnect() self.filter_state.disconnect()
def save_filter_list(self):
raise NotImplementedError()
def load_filter_list(self):
raise NotImplementedError()
def peek_next_filter(self):
pass
def peek_current_filter(self):
pass

View File

@ -7,7 +7,10 @@ import logging
from shep.store.file import SimpleFileStoreFactory from shep.store.file import SimpleFileStoreFactory
# local imports # local imports
from chainsyncer.store import SyncStore from chainsyncer.store import (
SyncItem,
SyncStore,
)
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -33,13 +36,9 @@ class SyncFsStore(SyncStore):
factory = SimpleFileStoreFactory(base_sync_path, binary=True) factory = SimpleFileStoreFactory(base_sync_path, binary=True)
self.setup_sync_state(factory, state_event_callback) self.setup_sync_state(factory, state_event_callback)
self.setup_filter_state(callback=filter_state_event_callback)
def setup_filter_state(self, callback=None):
base_filter_path = os.path.join(self.session_path, 'filter') base_filter_path = os.path.join(self.session_path, 'filter')
factory = SimpleFileStoreFactory(base_filter_path, binary=True) factory = SimpleFileStoreFactory(base_filter_path, binary=True)
super(SyncFsStore, self).setup_filter_state(factory, callback) self.setup_filter_state(factory, filter_state_event_callback)
def __create_path(self, base_path, default_path, session_id=None): def __create_path(self, base_path, default_path, session_id=None):
@ -74,25 +73,3 @@ class SyncFsStore(SyncStore):
f.write(str(v)) f.write(str(v))
f.close() f.close()
self.target = v self.target = v
def load_filter_list(self):
fltr = []
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'r')
while True:
v = f.readline()
if len(v) == 0:
break
v = v.rstrip()
fltr.append(v)
f.close()
return fltr
def save_filter_list(self):
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'w')
for fltr in self.filters:
f.write(fltr.common_name() + '\n')
f.close()

View File

@ -1,45 +0,0 @@
# standard imports
import logging
import os
# external imports
from shep import State
# local imports
from chainsyncer.store import SyncStore
logg = logging.getLogger(__name__)
class SyncMemStore(SyncStore):
    """Volatile sync state store; all state lives in memory only.

    Passing a None factory to the state setup selects the memory-only shep
    backend, so nothing survives shutdown.
    """

    def __init__(self, session_id=None, state_event_callback=None, filter_state_event_callback=None):
        super(SyncMemStore, self).__init__(None, session_id=session_id)
        # None factory -> memory-only state backends.
        self.setup_sync_state(None, state_event_callback)
        self.setup_filter_state(None, filter_state_event_callback)

    def set_target(self, v):
        # Target block height, coerced to int.
        self.target = int(v)

    def get_target(self):
        return self.target

    def stop(self, item):
        if item != None:
            super(SyncMemStore, self).stop(item)
        logg.info('I am an in-memory only state store. I am shutting down now, so all state will now be discarded.')

    def save_filter_list(self):
        # Nothing to persist in a memory-only store.
        pass

    def load_filter_list(self):
        # No persisted filters can exist; always start empty.
        return []

View File

@ -43,8 +43,8 @@ class SyncRocksDbStore(SyncStore):
prefix_factory = RocksDbStoreAdder(self.factory, 'filter') prefix_factory = RocksDbStoreAdder(self.factory, 'filter')
self.setup_filter_state(prefix_factory, filter_state_event_callback) self.setup_filter_state(prefix_factory, filter_state_event_callback)
#self.session_id = os.path.basename(self.session_path) self.session_id = os.path.basename(self.session_path)
#logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path)) logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target') self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target')
@ -64,16 +64,3 @@ class SyncRocksDbStore(SyncStore):
if item != None: if item != None:
super(SyncRocksDbStore, self).stop(item) super(SyncRocksDbStore, self).stop(item)
self.factory.close() self.factory.close()
def save_filter_list(self):
fltr = []
for v in self.filters:
fltr.append(v.common_name())
self.target_db.put('filter_list', ','.join(fltr))
def load_filter_list(self):
v = self.target_db.get('filter_list')
v = v.decode('utf-8')
return v.split(',')

View File

@ -5,7 +5,6 @@ import unittest
import shutil import shutil
import tempfile import tempfile
import logging import logging
import uuid
# local imports # local imports
from chainsyncer.session import SyncSession from chainsyncer.session import SyncSession
@ -20,28 +19,14 @@ from chainsyncer.unittest import (
MockItem, MockItem,
) )
logging.STATETRACE = 5
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
def state_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
class TestStoreBase(unittest.TestCase): class TestStoreBase(unittest.TestCase):
def setUp(self): def setUp(self):
self.base_path = tempfile.mkdtemp() self.path = tempfile.mkdtemp()
self.session_id = str(uuid.uuid4())
self.path = os.path.join(self.base_path, self.session_id)
os.makedirs(self.path)
self.store_factory = None self.store_factory = None
self.persist = True
@classmethod @classmethod
@ -50,7 +35,6 @@ class TestStoreBase(unittest.TestCase):
"default", "default",
"store_start", "store_start",
"store_resume", "store_resume",
"filter_list",
"sync_process_nofilter", "sync_process_nofilter",
"sync_process_onefilter", "sync_process_onefilter",
"sync_process_outoforder", "sync_process_outoforder",
@ -71,25 +55,40 @@ class TestStoreBase(unittest.TestCase):
def t_default(self): def t_default(self):
bogus_item = MockItem(0, 0, 0, 0) bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory() store = self.store_factory()
if store.session_path == None: fp = os.path.join(self.path, store.session_id)
return
#fp = os.path.join(self.path, store.session_id)
fp = self.path
session_id = store.session_id session_id = store.session_id
st = None
st = os.stat(fp) st = os.stat(fp)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertTrue(store.is_default)
fpd = os.path.join(self.path, 'default')
st = os.stat(fpd)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertTrue(store.is_default)
if st != None: fpd = os.path.realpath(fpd)
self.assertTrue(stat.S_ISDIR(st.st_mode)) self.assertEqual(fpd, fp)
#self.assertTrue(store.is_default)
store.stop(bogus_item) store.stop(bogus_item)
store = self.store_factory() store = self.store_factory()
fpr = os.path.join(self.path, self.session_id) fpr = os.path.join(self.path, session_id)
self.assertEqual(fp, self.path) self.assertEqual(fp, fpr)
self.assertTrue(store.is_default)
store.stop(bogus_item)
store = self.store_factory('default')
fpr = os.path.join(self.path, session_id)
self.assertEqual(fp, fpr)
self.assertTrue(store.is_default)
store.stop(bogus_item)
store = self.store_factory('foo')
fpf = os.path.join(self.path, 'foo')
st = os.stat(fpf)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertFalse(store.is_default)
def t_store_start(self): def t_store_start(self):
bogus_item = MockItem(0, 0, 0, 0) bogus_item = MockItem(0, 0, 0, 0)
@ -98,11 +97,9 @@ class TestStoreBase(unittest.TestCase):
self.assertTrue(store.first) self.assertTrue(store.first)
store.stop(bogus_item) store.stop(bogus_item)
store = self.store_factory()
if self.persist: store.start()
store = self.store_factory() self.assertFalse(store.first)
store.start()
self.assertFalse(store.first)
def t_store_resume(self): def t_store_resume(self):
@ -229,16 +226,12 @@ class TestStoreBase(unittest.TestCase):
o.next(advance_block=True) o.next(advance_block=True)
session.stop(o) session.stop(o)
if self.persist: store = self.store_factory('foo')
store = self.store_factory('foo') store.start()
store.start() o = store.get(2)
o = store.get(2)
def t_sync_history_interrupted(self): def t_sync_history_interrupted(self):
if not self.persist:
return
bogus_item = MockItem(0, 0, 0, 0) bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory('foo') store = self.store_factory('foo')
session = SyncSession(store) session = SyncSession(store)
@ -276,26 +269,3 @@ class TestStoreBase(unittest.TestCase):
o.next(advance_block=True) o.next(advance_block=True)
with self.assertRaises(SyncDone): with self.assertRaises(SyncDone):
o.next(advance_block=True) o.next(advance_block=True)
def t_filter_list(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
fltr_one = MockFilter('foo_bar')
store.register(fltr_one)
fltr_two = MockFilter('bar_baz')
store.register(fltr_two)
store.start()
store.stop(bogus_item)
store = self.store_factory()
r = store.load_filter_list()
self.assertEqual(r[0], 'foo_bar')
self.assertEqual(r[1], 'bar_baz')

View File

@ -1,5 +1,5 @@
confini~=0.6.0 confini~=0.6.0
semver==2.13.0 semver==2.13.0
hexathon~=0.1.5 hexathon~=0.1.5
chainlib~=0.1.1 chainlib>=0.1.0b1,<0.2.0
shep~=0.2.3 shep~=0.2.2

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chainsyncer name = chainsyncer
version = 0.4.1 version = 0.3.3
description = Generic blockchain syncer driver description = Generic blockchain syncer driver
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
@ -28,14 +28,11 @@ packages =
chainsyncer.driver chainsyncer.driver
chainsyncer.unittest chainsyncer.unittest
chainsyncer.store chainsyncer.store
chainsyncer.cli
chainsyncer.runnable
#[options.package_data] #[options.package_data]
#* = #* =
# sql/* # sql/*
[options.entry_points] #[options.entry_points]
console_scripts = #console_scripts =
#blocksync-celery = chainsyncer.runnable.tracker:main # blocksync-celery = chainsyncer.runnable.tracker:main
chainsyncer-unlock = chainsyncer.runnable.unlock:main

View File

@ -1,33 +0,0 @@
# standard imports
import unittest
import logging
# external imports
from shep import State
# local imports
from chainsyncer.store.mem import SyncMemStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
    """Produces in-memory sync stores for the shared store test harness."""

    def create(self, session_id=None):
        store = SyncMemStore(session_id=session_id)
        return store
class TestMem(TestStoreBase):
    """Run the shared store test suite against the in-memory backend."""

    def setUp(self):
        super(TestMem, self).setUp()
        self.store_factory = StoreFactory().create
        # Memory store does not persist, so persistence tests are skipped.
        self.persist = False
if __name__ == '__main__':
    # Bind the shared t_* test methods from the base harness onto TestMem.
    TestStoreBase.link(TestMem)
    # Remove tests that test persistence of state
    unittest.main()

View File

@ -4,11 +4,7 @@ import logging
# local imports # local imports
from chainsyncer.store.rocksdb import SyncRocksDbStore from chainsyncer.store.rocksdb import SyncRocksDbStore
from chainsyncer.unittest.store import ( from chainsyncer.unittest.store import TestStoreBase
TestStoreBase,
filter_change_callback,
state_change_callback,
)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger() logg = logging.getLogger()
@ -20,7 +16,7 @@ class StoreFactory:
def create(self, session_id=None): def create(self, session_id=None):
return SyncRocksDbStore(self.path, session_id=session_id, state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback) return SyncRocksDbStore(self.path, session_id=session_id)
class TestRocksDb(TestStoreBase): class TestRocksDb(TestStoreBase):