Compare commits

..

No commits in common. "master" and "v0.3.1" have entirely different histories.

28 changed files with 363 additions and 936 deletions

View File

@ -1,17 +1,3 @@
* 0.3.7
- Remove hard eth dependency in settings rendering
- Add unlock cli tool
* 0.3.6
- Add cli arg processing and settings renderer
* 0.3.5
- Allow memory-only shep if factory set to None in store constructor
* 0.3.4
- Use explicit bool check in filter interrupt check
* 0.3.3
- Include shep persistent state bootstrap sync
- Add chainsyncer extras
* 0.3.2
- Implement rocksdb backend
* 0.3.1 * 0.3.1
- Upgrade to release shep version - Upgrade to release shep version
- Move sync state to SYNC after start - Move sync state to SYNC after start

View File

@ -1 +1 @@
include *requirements.txt LICENSE.txt chainsyncer/data/config/* include *requirements.txt LICENSE.txt

View File

@ -1,12 +0,0 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

View File

@ -1,14 +0,0 @@
# local imports
from .base import SyncFlag
def process_flags(argparser, flags):
if flags & SyncFlag.RANGE > 0:
argparser.add_argument('--offset', type=int, help='Block to start sync from. Default is start of history (0).')
argparser.add_argument('--until', type=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
if flags & SyncFlag.HEAD > 0:
argparser.add_argument('--head', action='store_true', help='Start from latest block as offset')
argparser.add_argument('--keep-alive', action='store_true', help='Do not stop syncing when caught up')
argparser.add_argument('--backend', type=str, help='Backend to use for state store')

View File

@ -1,7 +0,0 @@
# standard imports
import enum
class SyncFlag(enum.IntEnum):
RANGE = 1
HEAD = 2

View File

@ -1,20 +0,0 @@
# external imports
from chainsyncer.cli import SyncFlag
def process_config(config, args, flags):
args_override = {}
args_override['SYNCER_BACKEND'] = getattr(args, 'backend')
if flags & SyncFlag.RANGE:
args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
args_override['SYNCER_LIMIT'] = getattr(args, 'until')
config.dict_override(args_override, 'local cli args')
if flags & SyncFlag.HEAD:
config.add(getattr(args, 'keep_alive'), '_KEEP_ALIVE')
config.add(getattr(args, 'head'), '_HEAD')
return config

View File

@ -1,4 +0,0 @@
[syncer]
offset = 0
limit = 0
backend = mem

View File

@ -141,4 +141,3 @@ class SyncDriver:
def get(self, conn): def get(self, conn):
raise NotImplementedError() raise NotImplementedError()

View File

@ -43,17 +43,13 @@ class FilterState:
self.state_store.add('RESET') self.state_store.add('RESET')
self.state = self.state_store.state self.state = self.state_store.state
self.elements = self.state_store.elements
self.put = self.state_store.put self.put = self.state_store.put
self.mask = self.state_store.mask
self.name = self.state_store.name
self.set = self.state_store.set self.set = self.state_store.set
self.next = self.state_store.next self.next = self.state_store.next
self.move = self.state_store.move self.move = self.state_store.move
self.unset = self.state_store.unset self.unset = self.state_store.unset
self.peek = self.state_store.peek self.peek = self.state_store.peek
self.from_name = self.state_store.from_name self.from_name = self.state_store.from_name
self.list = self.state_store.list
self.state_store.sync() self.state_store.sync()
self.all = self.state_store.all self.all = self.state_store.all
self.started = False self.started = False
@ -97,7 +93,6 @@ class FilterState:
if self.scan != None: if self.scan != None:
ks = self.scan() ks = self.scan()
for v in ks: #os.listdir(self.scan_path): for v in ks: #os.listdir(self.scan_path):
logg.debug('ks {}'.format(v))
k = None k = None
try: try:
k = self.state_store.from_elements(v) k = self.state_store.from_elements(v)

View File

@ -1 +0,0 @@

View File

@ -1,146 +0,0 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import sys
import importlib
# external imports
import chainlib.cli
from shep.persist import PersistedState
# local imports
import chainsyncer.cli
from chainsyncer.settings import ChainsyncerSettings
from chainsyncer.store import SyncStore
from chainsyncer.filter import (
FilterState,
SyncFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
valid_fwd = [
'fwd',
'forward',
'next',
'continue',
]
valid_rwd = [
'rwd',
'rewind',
'current',
'back',
'repeat',
'replay',
]
action_is_forward = False
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--state-dir', type=str, dest='state_dir', help='State directory')
argparser.add_positional('action', type=str, help='Action to take on lock. Repeat means re-run the locked filter. Continue means resume execution for next filter.')
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
chainsyncer.cli.process_flags(argparser, sync_flags)
args = argparser.parse_args()
if args.action in valid_fwd:
action_is_forward = True
elif args.action not in valid_rwd:
sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd))
sys.exit(1)
base_config_dir = chainsyncer.cli.config_dir,
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
config = chainsyncer.cli.process_config(config, args, sync_flags)
config.add(args.state_dir, '_STATE_DIR', False)
logg.debug('config loaded:\n{}'.format(config))
settings = ChainsyncerSettings()
settings.process_sync_backend(config)
logg.debug('settings:\n{}'.format(str(settings)))
class FilterInNameOnly(SyncFilter):
def __init__(self, k):
self.k = k
def common_name(self):
return self.k
def main():
if settings.get('SYNCER_BACKEND') == 'mem':
raise ValueError('cannot unlock volatile state store')
state_dir = config.get('_STATE_DIR')
if config.get('SYNCER_BACKEND') == 'fs':
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
elif config.get('SYNCER_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
else:
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
store = syncer_store_class(state_dir)
filter_list = store.load_filter_list()
for i, k in enumerate(filter_list):
fltr = FilterInNameOnly(k)
store.register(fltr)
filter_list[i] = k.upper()
store.connect()
store.start(ignore_lock=True)
lock_state = store.filter_state.from_name('LOCK')
locked_item = store.filter_state.list(lock_state)
if len(locked_item) == 0:
sys.stderr.write('Sync filter in {} is not locked\n'.format(state_dir))
sys.exit(1)
elif len(locked_item) > 1:
sys.stderr.write('More than one locked item encountered in {}. That should never happen, so I do not know what to do next.\n'.format(state_dir))
sys.exit(1)
locked_item_key = locked_item[0]
locked_item = store.get(int(locked_item_key))
locked_state = store.filter_state.state(locked_item_key) - lock_state
locked_state_name = store.filter_state.name(locked_state)
logg.info('found item "{}" in locked state {}'.format(locked_item, store.filter_state.name(locked_state)))
if action_is_forward:
k = locked_state_name
filter_index = None
filter_index = filter_list.index(k)
filter_pos = filter_index + 1
filter_count = len(filter_list)
logg.debug('Locked filter {} found at position {} of {}'.format(k, filter_pos, filter_count))
if filter_pos == filter_count:
logg.info('Locked filter {} is the last filter in the list. Executing filter reset'.format(k))
locked_item.reset(check_incomplete=False)
else:
locked_item.advance(ignore_lock=True)
store.filter_state.unset(locked_item_key, lock_state)
else:
filter_mask = 0xf
filter_state = store.filter_state.mask(locked_state, filter_mask)
logg.info('Chosen action is "{}": will continue execution at previous filter {}'.format(args.action, store.filter_state.name(filter_state)))
store.filter_state.unset(locked_item_key, lock_state)
if __name__ == '__main__':
main()

View File

@ -1,12 +1,9 @@
# standard imports # standard imports
import uuid import uuid
import logging
# local imports # local imports
from chainsyncer.error import FilterDone from chainsyncer.error import FilterDone
logg = logging.getLogger(__name__)
class SyncSession: class SyncSession:
@ -32,7 +29,6 @@ class SyncSession:
def filter(self, conn, block, tx): def filter(self, conn, block, tx):
self.session_store.connect() self.session_store.connect()
for fltr in self.filters: for fltr in self.filters:
logg.debug('executing filter {}'.format(fltr))
self.item.advance() self.item.advance()
interrupt = fltr.filter(conn, block, tx) interrupt = fltr.filter(conn, block, tx)
if not self.item.release(interrupt=interrupt): if not self.item.release(interrupt=interrupt):

View File

@ -1,55 +0,0 @@
# standard imports
import logging
# external imports
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
from chainlib.settings import ChainSettings
logg = logging.getLogger(__name__)
class ChainsyncerSettings(ChainSettings):
def process_sync_backend(self, config):
self.o['SYNCER_BACKEND'] = config.get('SYNCER_BACKEND')
def process_sync_range(self, config):
o = self.o['SYNCER_INTERFACE'].block_latest()
r = self.o['RPC'].do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
keep_alive = False
session_block_offset = 0
block_limit = 0
until = 0
if config.true('_HEAD'):
self.o['SYNCER_OFFSET'] = block_offset
self.o['SYNCER_LIMIT'] = -1
return
session_block_offset = int(config.get('SYNCER_OFFSET'))
until = int(config.get('SYNCER_LIMIT'))
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
elif until == -1:
keep_alive = True
if session_block_offset == -1:
session_block_offset = block_offset
elif config.true('_KEEP_ALIVE'):
block_limit = -1
else:
if block_limit == 0:
block_limit = block_offset
self.o['SYNCER_OFFSET'] = session_block_offset
self.o['SYNCER_LIMIT'] = block_limit

View File

@ -1,10 +1,8 @@
# standard imports # standard imports
import os
import logging import logging
# local imports # local imports
from shep.persist import PersistedState from shep.persist import PersistedState
from shep import State
from shep.error import StateInvalid from shep.error import StateInvalid
from chainsyncer.filter import FilterState from chainsyncer.filter import FilterState
from chainsyncer.error import ( from chainsyncer.error import (
@ -35,7 +33,7 @@ def sync_state_deserialize(b):
# NOT thread safe # NOT thread safe
class SyncItem: class SyncItem:
def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False): def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False):
self.offset = offset self.offset = offset
self.target = target self.target = target
self.sync_state = sync_state self.sync_state = sync_state
@ -47,8 +45,7 @@ class SyncItem:
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v) (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
filter_state = self.filter_state.state(self.state_key) if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid:
if filter_state & self.filter_state.from_name('LOCK') > 0 and not ignore_lock:
raise LockError(self.state_key) raise LockError(self.state_key)
self.count = len(self.filter_state.all(pure=True)) - 4 self.count = len(self.filter_state.all(pure=True)) - 4
@ -66,22 +63,11 @@ class SyncItem:
raise FilterDone(self.state_key) raise FilterDone(self.state_key)
def resume(self): def reset(self):
filter_state = self.filter_state.state(self.state_key) if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
if filter_state > 0x0f: raise LockError('reset attempt on {} when state locked'.format(self.state_key))
filter_state_part = self.filter_state.mask(filter_state, 0x0f) if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
if len(self.filter_state.elements(filter_state)) == 1: raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part))
lock_state = self.filter_state.from_name('LOCK')
self.filter_state.set(lock_state)
def reset(self, check_incomplete=True):
if check_incomplete:
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
raise LockError('reset attempt on {} when state locked'.format(self.state_key))
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
@ -114,16 +100,13 @@ class SyncItem:
v = self.filter_state.state(self.state_key) v = self.filter_state.state(self.state_key)
def advance(self, ignore_lock=False): def advance(self):
if self.skip_filter: if self.skip_filter:
raise FilterDone() raise FilterDone()
self.__check_done() self.__check_done()
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
if ignore_lock: raise LockError('advance attempt on {} when state locked'.format(self.state_key))
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
else:
raise LockError('advance attempt on {} when state locked'.format(self.state_key))
done = False done = False
try: try:
self.filter_state.next(self.state_key) self.filter_state.next(self.state_key)
@ -137,7 +120,7 @@ class SyncItem:
def release(self, interrupt=False): def release(self, interrupt=False):
if self.skip_filter: if self.skip_filter:
return False return False
if interrupt == True: if interrupt:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT')) self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
@ -162,8 +145,8 @@ class SyncItem:
class SyncStore: class SyncStore:
def __init__(self, path, session_id=None): def __init__(self, session_id=None):
self.session_id = session_id self.session_id = None
self.session_path = None self.session_path = None
self.is_default = False self.is_default = False
self.first = False self.first = False
@ -172,25 +155,17 @@ class SyncStore:
self.item_keys = [] self.item_keys = []
self.started = False self.started = False
self.thresholds = [] self.thresholds = []
self.session_path = path
def setup_sync_state(self, factory=None, event_callback=None): def setup_sync_state(self, factory, event_callback):
if factory == None: self.state = PersistedState(factory.add, 2, event_callback=event_callback)
self.state = State(2, event_callback=event_callback)
else:
self.state = PersistedState(factory.add, 2, event_callback=event_callback)
self.state.add('SYNC') self.state.add('SYNC')
self.state.add('DONE') self.state.add('DONE')
def setup_filter_state(self, factory=None, event_callback=None): def setup_filter_state(self, factory, event_callback):
if factory == None: filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
filter_state_backend = State(0, check_alias=False, event_callback=event_callback) self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
self.filter_state = FilterState(filter_state_backend)
else:
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
self.filters = [] self.filters = []
@ -207,21 +182,18 @@ class SyncStore:
self.filter_state.register(fltr) self.filter_state.register(fltr)
def start(self, offset=0, target=-1, ignore_lock=False): def start(self, offset=0, target=-1):
if self.started: if self.started:
return return
self.save_filter_list() self.load(target)
self.load(target, ignore_lock=ignore_lock)
if self.first: if self.first:
state_bytes = sync_state_serialize(offset, 0, target) state_bytes = sync_state_serialize(offset, 0, target)
block_number_str = str(offset) block_number_str = str(offset)
self.state.put(block_number_str, contents=state_bytes) self.state.put(block_number_str, state_bytes)
self.filter_state.put(block_number_str) self.filter_state.put(block_number_str)
o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock) o = SyncItem(offset, target, self.state, self.filter_state)
o.resume()
self.items[offset] = o self.items[offset] = o
self.item_keys.append(offset) self.item_keys.append(offset)
elif offset > 0: elif offset > 0:
@ -243,10 +215,12 @@ class SyncStore:
self.state.move(item.state_key, self.state.DONE) self.state.move(item.state_key, self.state.DONE)
state_bytes = sync_state_serialize(item.cursor, 0, -1) state_bytes = sync_state_serialize(item.cursor, 0, -1)
self.state.put(str(item.cursor), contents=state_bytes) self.state.put(str(item.cursor), state_bytes)
logg.debug('item {}'.format(self.state.state(item.state_key)))
def load(self, target, ignore_lock=False): def load(self, target):
self.state.sync(self.state.NEW) self.state.sync(self.state.NEW)
self.state.sync(self.state.SYNC) self.state.sync(self.state.SYNC)
@ -270,8 +244,7 @@ class SyncStore:
item_target = target item_target = target
if i < lim: if i < lim:
item_target = thresholds[i+1] item_target = thresholds[i+1]
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock) o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True)
o.resume()
self.items[block_number] = o self.items[block_number] = o
self.item_keys.append(block_number) self.item_keys.append(block_number)
logg.info('added existing {}'.format(o)) logg.info('added existing {}'.format(o))
@ -305,18 +278,3 @@ class SyncStore:
def disconnect(self): def disconnect(self):
self.filter_state.disconnect() self.filter_state.disconnect()
def save_filter_list(self):
raise NotImplementedError()
def load_filter_list(self):
raise NotImplementedError()
def peek_next_filter(self):
pass
def peek_current_filter(self):
pass

View File

@ -7,7 +7,10 @@ import logging
from shep.store.file import SimpleFileStoreFactory from shep.store.file import SimpleFileStoreFactory
# local imports # local imports
from chainsyncer.store import SyncStore from chainsyncer.store import (
SyncItem,
SyncStore,
)
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -15,7 +18,18 @@ logg = logging.getLogger(__name__)
class SyncFsStore(SyncStore): class SyncFsStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None): def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncFsStore, self).__init__(base_path, session_id=session_id) super(SyncFsStore, self).__init__(session_id=session_id)
default_path = os.path.join(base_path, 'default')
if session_id == None:
self.session_path = os.path.realpath(default_path)
self.is_default = True
else:
if session_id == 'default':
self.is_default = True
given_path = os.path.join(base_path, session_id)
self.session_path = os.path.realpath(given_path)
create_path = False create_path = False
try: try:
@ -24,22 +38,18 @@ class SyncFsStore(SyncStore):
create_path = True create_path = True
if create_path: if create_path:
self.__create_path(base_path, self.default_path, session_id=session_id) self.__create_path(base_path, default_path, session_id=session_id)
self.session_id = os.path.basename(self.session_path) self.session_id = os.path.basename(self.session_path)
logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path)) logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
base_sync_path = os.path.join(self.session_path, 'sync') base_sync_path = os.path.join(self.session_path, 'sync')
factory = SimpleFileStoreFactory(base_sync_path, binary=True) factory = SimpleFileStoreFactory(base_sync_path, binary=True)
self.setup_sync_state(factory, state_event_callback) self.setup_sync_state(factory, state_event_callback)
self.setup_filter_state(callback=filter_state_event_callback)
def setup_filter_state(self, callback=None):
base_filter_path = os.path.join(self.session_path, 'filter') base_filter_path = os.path.join(self.session_path, 'filter')
factory = SimpleFileStoreFactory(base_filter_path, binary=True) factory = SimpleFileStoreFactory(base_filter_path, binary=True)
super(SyncFsStore, self).setup_filter_state(factory, callback) self.setup_filter_state(factory, filter_state_event_callback)
def __create_path(self, base_path, default_path, session_id=None): def __create_path(self, base_path, default_path, session_id=None):
@ -74,25 +84,3 @@ class SyncFsStore(SyncStore):
f.write(str(v)) f.write(str(v))
f.close() f.close()
self.target = v self.target = v
def load_filter_list(self):
fltr = []
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'r')
while True:
v = f.readline()
if len(v) == 0:
break
v = v.rstrip()
fltr.append(v)
f.close()
return fltr
def save_filter_list(self):
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'w')
for fltr in self.filters:
f.write(fltr.common_name() + '\n')
f.close()

View File

@ -1,45 +0,0 @@
# standard imports
import logging
import os
# external imports
from shep import State
# local imports
from chainsyncer.store import SyncStore
logg = logging.getLogger(__name__)
class SyncMemStore(SyncStore):
def __init__(self, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncMemStore, self).__init__(None, session_id=session_id)
factory = None
self.setup_sync_state(factory, state_event_callback)
factory = None
self.setup_filter_state(factory, filter_state_event_callback)
def set_target(self, v):
self.target = int(v)
def get_target(self):
return self.target
def stop(self, item):
if item != None:
super(SyncMemStore, self).stop(item)
logg.info('I am an in-memory only state store. I am shutting down now, so all state will now be discarded.')
def save_filter_list(self):
pass
def load_filter_list(self):
return []

View File

@ -1,79 +0,0 @@
# standard imports
import uuid
import os
import logging
# external imports
from shep.store.rocksdb import RocksDbStoreFactory
# local imports
from chainsyncer.store import (
SyncItem,
SyncStore,
)
logg = logging.getLogger(__name__)
class RocksDbStoreAdder:
def __init__(self, factory, prefix):
self.factory = factory
self.prefix = prefix
def add(self, k):
path = os.path.join(self.prefix, k)
return self.factory.add(path)
def ls(self):
return self.factory.ls()
class SyncRocksDbStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncRocksDbStore, self).__init__(base_path, session_id=session_id)
self.factory = RocksDbStoreFactory(self.session_path, binary=True)
prefix_factory = RocksDbStoreAdder(self.factory, 'sync')
self.setup_sync_state(prefix_factory, state_event_callback)
prefix_factory = RocksDbStoreAdder(self.factory, 'filter')
self.setup_filter_state(prefix_factory, filter_state_event_callback)
#self.session_id = os.path.basename(self.session_path)
#logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target')
def get_target(self):
v = self.target_db.get('target')
if v != None:
self.target = int(v)
def set_target(self, v):
self.target_db.put('target', str(v))
self.target = v
def stop(self, item):
if item != None:
super(SyncRocksDbStore, self).stop(item)
self.factory.close()
def save_filter_list(self):
fltr = []
for v in self.filters:
fltr.append(v.common_name())
self.target_db.put('filter_list', ','.join(fltr))
def load_filter_list(self):
v = self.target_db.get('filter_list')
v = v.decode('utf-8')
return v.split(',')

View File

@ -267,12 +267,3 @@ class MockChainInterfaceConn(MockConn):
def handle_receipt(self, hsh): def handle_receipt(self, hsh):
return {} return {}
class MockItem:
def __init__(self, target, offset, cursor, state_key):
self.target = target
self.offset = offset
self.cursor = cursor
self.state_key = state_key

View File

@ -0,0 +1,64 @@
# standard imports
import logging
import os
# external imports
import alembic
import alembic.config
# local imports
from chainsyncer.db.models.base import SessionBase
from chainsyncer.db import dsn_from_config
from chainsyncer.db.models.base import SessionBase
logg = logging.getLogger(__name__)
class ChainSyncerDb:
"""SQLITE database setup for unit tests
:param debug: Activate sql level debug (outputs sql statements)
:type debug: bool
"""
base = SessionBase
def __init__(self, debug=False):
config = {
'DATABASE_ENGINE': 'sqlite',
'DATABASE_DRIVER': 'pysqlite',
'DATABASE_NAME': 'chainsyncer.sqlite',
}
logg.debug('config {}'.format(config))
self.dsn = dsn_from_config(config)
self.base.poolable = False
self.base.transactional = False
self.base.procedural = False
self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0
rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..')
dbdir = os.path.join(rootdir, 'chainsyncer', 'db')
#migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE'))
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', self.dsn)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
def bind_session(self, session=None):
"""Create session using underlying session base
"""
return self.base.bind_session(session)
def release_session(self, session=None):
"""Release session using underlying session base
"""
return self.base.release_session(session)

View File

@ -1,301 +0,0 @@
# standard imports
import os
import stat
import unittest
import shutil
import tempfile
import logging
import uuid
# local imports
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import (
MockFilter,
MockItem,
)
logging.STATETRACE = 5
logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
def state_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
class TestStoreBase(unittest.TestCase):
def setUp(self):
self.base_path = tempfile.mkdtemp()
self.session_id = str(uuid.uuid4())
self.path = os.path.join(self.base_path, self.session_id)
os.makedirs(self.path)
self.store_factory = None
self.persist = True
@classmethod
def link(cls, target):
for v in [
"default",
"store_start",
"store_resume",
"filter_list",
"sync_process_nofilter",
"sync_process_onefilter",
"sync_process_outoforder",
"sync_process_interrupt",
"sync_process_reset",
"sync_process_done",
"sync_head_future",
"sync_history_interrupted",
"sync_history_complete",
]:
setattr(target, 'test_' + v, getattr(cls, 't_' + v))
def tearDown(self):
shutil.rmtree(self.path)
def t_default(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
#fp = os.path.join(self.path, store.session_id)
fp = self.path
session_id = store.session_id
st = None
st = os.stat(fp)
if st != None:
self.assertTrue(stat.S_ISDIR(st.st_mode))
#self.assertTrue(store.is_default)
store.stop(bogus_item)
store = self.store_factory()
fpr = os.path.join(self.path, self.session_id)
self.assertEqual(fp, self.path)
def t_store_start(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
store.start(42)
self.assertTrue(store.first)
store.stop(bogus_item)
if self.persist:
store = self.store_factory()
store.start()
self.assertFalse(store.first)
def t_store_resume(self):
store = self.store_factory()
store.start(13)
self.assertTrue(store.first)
# todo not done
def t_sync_process_nofilter(self):
store = self.store_factory()
session = SyncSession(store)
session.start()
o = session.get(0)
with self.assertRaises(FilterDone):
o.advance()
def t_sync_process_onefilter(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start()
o = session.get(0)
o.advance()
o.release()
def t_sync_process_outoforder(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('two')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.advance()
o.release()
with self.assertRaises(LockError):
o.release()
o.advance()
o.release()
def t_sync_process_interrupt(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
o.release(interrupt=True)
with self.assertRaises(FilterDone):
o.advance()
def t_sync_process_reset(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.reset()
o.release()
with self.assertRaises(IncompleteFilterError):
o.reset()
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
def t_sync_process_done(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start(target=0)
o = session.get(0)
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def t_sync_head_future(self):
store = self.store_factory('foo')
session = SyncSession(store)
session.start()
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
if self.persist:
store = self.store_factory('foo')
store.start()
o = store.get(2)
def t_sync_history_interrupted(self):
if not self.persist:
return
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory('foo')
session = SyncSession(store)
session.start(target=13)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store.stop(bogus_item)
store = self.store_factory('foo')
store.start()
o = store.get(0)
self.assertEqual(o.cursor, 2)
self.assertEqual(o.target, 13)
o.next(advance_block=True)
o.next(advance_block=True)
store.stop(bogus_item)
store = self.store_factory('foo')
store.start()
self.assertEqual(o.cursor, 4)
self.assertEqual(o.target, 13)
def t_sync_history_complete(self):
store = self.store_factory('foo')
session = SyncSession(store)
session.start(target=3)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
o.next(advance_block=True)
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def t_filter_list(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
fltr_one = MockFilter('foo_bar')
store.register(fltr_one)
fltr_two = MockFilter('bar_baz')
store.register(fltr_two)
store.start()
store.stop(bogus_item)
store = self.store_factory()
r = store.load_filter_list()
self.assertEqual(r[0], 'foo_bar')
self.assertEqual(r[1], 'bar_baz')

View File

@ -1,5 +1,5 @@
confini~=0.6.0 confini~=0.6.0
semver==2.13.0 semver==2.13.0
hexathon~=0.1.5 hexathon~=0.1.5
chainlib~=0.1.1 chainlib>=0.1.0b1,<=0.1.0
shep~=0.2.3 shep>=0.2.0rc1,<0.3.0

View File

@ -8,12 +8,5 @@ for f in `ls tests/*.py`; do
exit exit
fi fi
done done
for f in `ls tests/store/*.py`; do
python $f
if [ $? -gt 0 ]; then
exit
fi
done
set +x set +x
set +e set +e

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chainsyncer name = chainsyncer
version = 0.4.1 version = 0.3.1
description = Generic blockchain syncer driver description = Generic blockchain syncer driver
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
@ -28,14 +28,12 @@ packages =
chainsyncer.driver chainsyncer.driver
chainsyncer.unittest chainsyncer.unittest
chainsyncer.store chainsyncer.store
chainsyncer.cli chainsyncer.state
chainsyncer.runnable
#[options.package_data] #[options.package_data]
#* = #* =
# sql/* # sql/*
[options.entry_points] #[options.entry_points]
console_scripts = #console_scripts =
#blocksync-celery = chainsyncer.runnable.tracker:main # blocksync-celery = chainsyncer.runnable.tracker:main
chainsyncer-unlock = chainsyncer.runnable.unlock:main

View File

@ -26,7 +26,5 @@ setup(
install_requires=requirements, install_requires=requirements,
extras_require={ extras_require={
'sql': sql_requirements, 'sql': sql_requirements,
'rocksdb': ['shep[rocksdb]~=0.2.2'],
'redis': ['shep[redis]~=0.2.2'],
} }
) )

View File

@ -1,33 +0,0 @@
# standard imports
import unittest
import logging
# external imports
from shep import State
# local imports
from chainsyncer.store.mem import SyncMemStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def create(self, session_id=None):
return SyncMemStore(session_id=session_id)
class TestMem(TestStoreBase):
def setUp(self):
super(TestMem, self).setUp()
self.store_factory = StoreFactory().create
self.persist = False
if __name__ == '__main__':
TestStoreBase.link(TestMem)
# Remove tests that test persistence of state
unittest.main()

View File

@ -1,32 +0,0 @@
# standard imports
import unittest
import logging
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def __init__(self, path):
self.path = path
def create(self, session_id=None):
return SyncFsStore(self.path, session_id=session_id)
class TestFs(TestStoreBase):
def setUp(self):
super(TestFs, self).setUp()
self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
TestStoreBase.link(TestFs)
unittest.main()

View File

@ -1,35 +0,0 @@
# standard imports
import unittest
import logging
# local imports
from chainsyncer.store.rocksdb import SyncRocksDbStore
from chainsyncer.unittest.store import (
TestStoreBase,
filter_change_callback,
state_change_callback,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def __init__(self, path):
self.path = path
def create(self, session_id=None):
return SyncRocksDbStore(self.path, session_id=session_id, state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
class TestRocksDb(TestStoreBase):
def setUp(self):
super(TestRocksDb, self).setUp()
self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
TestStoreBase.link(TestRocksDb)
unittest.main()

245
tests/test_fs.py Normal file
View File

@ -0,0 +1,245 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import stat
import os
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import MockFilter
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestFs(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.path)
def test_default(self):
store = SyncFsStore(self.path)
fp = os.path.join(self.path, store.session_id)
session_id = store.session_id
st = os.stat(fp)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertTrue(store.is_default)
fpd = os.path.join(self.path, 'default')
st = os.stat(fpd)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertTrue(store.is_default)
fpd = os.path.realpath(fpd)
self.assertEqual(fpd, fp)
store = SyncFsStore(self.path)
fpr = os.path.join(self.path, session_id)
self.assertEqual(fp, fpr)
self.assertTrue(store.is_default)
store = SyncFsStore(self.path, 'default')
fpr = os.path.join(self.path, session_id)
self.assertEqual(fp, fpr)
self.assertTrue(store.is_default)
store = SyncFsStore(self.path, 'foo')
fpf = os.path.join(self.path, 'foo')
st = os.stat(fpf)
self.assertTrue(stat.S_ISDIR(st.st_mode))
self.assertFalse(store.is_default)
def test_store_start(self):
store = SyncFsStore(self.path)
store.start(42)
self.assertTrue(store.first)
store = SyncFsStore(self.path)
store.start()
self.assertFalse(store.first)
def test_store_resume(self):
store = SyncFsStore(self.path)
store.start(13)
self.assertTrue(store.first)
# todo not done
def test_sync_process_nofilter(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
session.start()
o = session.get(0)
with self.assertRaises(FilterDone):
o.advance()
def test_sync_process_onefilter(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start()
o = session.get(0)
o.advance()
o.release()
def test_sync_process_outoforder(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('two')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.advance()
o.release()
with self.assertRaises(LockError):
o.release()
o.advance()
o.release()
def test_sync_process_interrupt(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
o.release(interrupt=True)
with self.assertRaises(FilterDone):
o.advance()
def test_sync_process_reset(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.reset()
o.release()
with self.assertRaises(IncompleteFilterError):
o.reset()
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
def test_sync_process_done(self):
store = SyncFsStore(self.path)
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start(target=0)
o = session.get(0)
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def test_sync_head_future(self):
store = SyncFsStore(self.path, session_id='foo')
session = SyncSession(store)
session.start()
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store = SyncFsStore(self.path, session_id='foo')
store.start()
o = store.get(2)
def test_sync_history_interrupted(self):
store = SyncFsStore(self.path, session_id='foo')
session = SyncSession(store)
session.start(target=13)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store = SyncFsStore(self.path, session_id='foo')
store.start()
o = store.get(0)
self.assertEqual(o.cursor, 2)
self.assertEqual(o.target, 13)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store = SyncFsStore(self.path, session_id='foo')
store.start()
self.assertEqual(o.cursor, 4)
self.assertEqual(o.target, 13)
def test_sync_history_complete(self):
store = SyncFsStore(self.path, session_id='foo')
session = SyncSession(store)
session.start(target=3)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
o.next(advance_block=True)
with self.assertRaises(SyncDone):
o.next(advance_block=True)
if __name__ == '__main__':
unittest.main()