From 891f90ae5f1a2cd38998fdbd6a7e7b5e3791fa35 Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 20 Apr 2022 11:59:12 +0000 Subject: [PATCH] WIP refactor to allow other backends --- chainsyncer/state/base.py | 3 +- chainsyncer/store/__init__.py | 1 + chainsyncer/store/base.py | 217 +++++++++++++++++++++++++++++++++ chainsyncer/store/fs.py | 220 ++-------------------------------- requirements.txt | 2 +- tests/test_session.py | 4 +- 6 files changed, 232 insertions(+), 215 deletions(-) create mode 100644 chainsyncer/store/__init__.py create mode 100644 chainsyncer/store/base.py diff --git a/chainsyncer/state/base.py b/chainsyncer/state/base.py index 63ce30f..3f179ad 100644 --- a/chainsyncer/state/base.py +++ b/chainsyncer/state/base.py @@ -7,7 +7,7 @@ import os logg = logging.getLogger(__name__) -re_processedname = r'^_?[A-Z,_]*$' +re_processedname = r'^_?[A-Z,\.]*$' # TODO: properly clarify interface shared with syncfsstore, move to filter module? class SyncState: @@ -74,7 +74,6 @@ class SyncState: self.__syncs[v] = True if self.scan_path != None: for v in os.listdir(self.scan_path): - logg.debug('sync {} try {}'.format(self.scan_path, v)) if re.match(re_processedname, v): k = None try: diff --git a/chainsyncer/store/__init__.py b/chainsyncer/store/__init__.py new file mode 100644 index 0000000..9b5ed21 --- /dev/null +++ b/chainsyncer/store/__init__.py @@ -0,0 +1 @@ +from .base import * diff --git a/chainsyncer/store/base.py b/chainsyncer/store/base.py new file mode 100644 index 0000000..afa8a88 --- /dev/null +++ b/chainsyncer/store/base.py @@ -0,0 +1,217 @@ +# standard imports +import logging + +# local imports +from shep.error import StateInvalid +from chainsyncer.error import ( + LockError, + FilterDone, + InterruptError, + IncompleteFilterError, + SyncDone, + ) + +logg = logging.getLogger(__name__) + + +def sync_state_serialize(block_height, tx_index, block_target): + b = block_height.to_bytes(4, 'big') + b += tx_index.to_bytes(4, 'big') + b += block_target.to_bytes(4, 'big', signed=True) + return b + + +def sync_state_deserialize(b): + block_height = int.from_bytes(b[:4], 'big') + tx_index = int.from_bytes(b[4:8], 'big') + block_target = int.from_bytes(b[8:], 'big', signed=True) + return (block_height, tx_index, block_target,) + + +# NOT thread safe +class SyncItem: + + def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False): + self.offset = offset + self.target = target + self.sync_state = sync_state + self.filter_state = filter_state + self.state_key = str(offset) + + logg.debug('get key {}'.format(self.state_key)) + v = self.sync_state.get(self.state_key) + + (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v) + + if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid: + raise LockError(self.state_key) + + self.count = len(self.filter_state.all(pure=True)) - 4 + self.skip_filter = False + if self.count == 0: + self.skip_filter = True + elif not started: + self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) + + + def __check_done(self): + if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0: + raise InterruptError(self.state_key) + if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') > 0: + raise FilterDone(self.state_key) + + + def reset(self): + if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: + raise LockError('reset attempt on {} when state locked'.format(self.state_key)) + if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0: + raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key)) + self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) + + + def next(self, advance_block=False): + v = self.sync_state.state(self.state_key) + if v == self.sync_state.DONE: + raise SyncDone(self.target) + elif v == self.sync_state.NEW: + self.sync_state.next(self.state_key) + + v = self.sync_state.get(self.state_key) + (block_number, tx_index, target) = sync_state_deserialize(v) + if advance_block: + block_number += 1 + tx_index = 0 + if self.target >= 0 and block_number > self.target: + self.sync_state.move(self.state_key, self.sync_state.DONE) + raise SyncDone(self.target) + else: + tx_index += 1 + + self.cursor = block_number + self.tx_cursor = tx_index + + b = sync_state_serialize(block_number, tx_index, target) + self.sync_state.replace(self.state_key, b) + + + def __find_advance(self): + v = self.filter_state.state(self.state_key) + + + def advance(self): + if self.skip_filter: + raise FilterDone() + self.__check_done() + + if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: + raise LockError('advance attempt on {} when state locked'.format(self.state_key)) + done = False + try: + self.filter_state.next(self.state_key) + except StateInvalid: + done = True + if done: + raise FilterDone() + self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK')) + + + def release(self, interrupt=False): + if self.skip_filter: + return False + if interrupt: + self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) + self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT')) + self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) + return False + + state = self.filter_state.state(self.state_key) + if state & self.filter_state.from_name('LOCK') == 0: + raise LockError('release attempt on {} when state unlocked'.format(self.state_key)) + self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) + try: + c = self.filter_state.peek(self.state_key) + logg.debug('peeked {}'.format(c)) + except StateInvalid: + self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) + return False + return True + + + def __str__(self): + return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor) + + +class SyncStore: + + def __init__(self, session_id=None): + self.session_id = None + self.session_path = None + self.is_default = False + self.first = False + self.target = None + self.items = {} + self.item_keys = [] + self.started = False + + + def register(self, fltr): + self.filters.append(fltr) + self.filter_state.register(fltr) + + + def start(self, offset=0, target=-1): + if self.started: + return + + self.load(target) + + if self.first: + state_bytes = sync_state_serialize(offset, 0, target) + block_number_str = str(offset) + self.state.put(block_number_str, state_bytes) + self.filter_state.put(block_number_str) + o = SyncItem(offset, target, self.state, self.filter_state) + self.items[offset] = o + self.item_keys.append(offset) + elif offset > 0: + logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id)) + self.started = True + + self.item_keys.sort() + + + def stop(self, item): + if item.target == -1: + state_bytes = sync_state_serialize(item.cursor, 0, item.cursor) + self.state.replace(str(item.offset), state_bytes) + self.filter_state.put(str(item.cursor)) + + SyncItem(item.offset, -1, self.state, self.filter_state) + logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor)) + + self.state.move(item.state_key, self.state.DONE) + + state_bytes = sync_state_serialize(item.cursor, 0, -1) + self.state.put(str(item.cursor), state_bytes) + + logg.debug('item {}'.format(self.state.state(item.state_key))) + + + def get(self, k): + return self.items[k] + + + def next_item(self): + try: + k = self.item_keys.pop(0) + except IndexError: + return None + return self.items[k] + + + def connect(self): + self.filter_state.connect() + + + def disconnect(self): + self.filter_state.disconnect() diff --git a/chainsyncer/store/fs.py b/chainsyncer/store/fs.py index a958fca..e1ef731 100644 --- a/chainsyncer/store/fs.py +++ b/chainsyncer/store/fs.py @@ -6,159 +6,21 @@ import logging # external imports from shep.store.file import SimpleFileStoreFactory from shep.persist import PersistedState -from shep.error import StateInvalid # local imports from chainsyncer.state import SyncState -from chainsyncer.error import ( - LockError, - FilterDone, - InterruptError, - IncompleteFilterError, - SyncDone, +from chainsyncer.store import ( + SyncItem, + SyncStore, ) + logg = logging.getLogger(__name__) -def sync_state_serialize(block_height, tx_index, block_target): - b = block_height.to_bytes(4, 'big') - b += tx_index.to_bytes(4, 'big') - b += block_target.to_bytes(4, 'big', signed=True) - return b - - -def sync_state_deserialize(b): - block_height = int.from_bytes(b[:4], 'big') - tx_index = int.from_bytes(b[4:8], 'big') - block_target = int.from_bytes(b[8:], 'big', signed=True) - return (block_height, tx_index, block_target,) - - -# NOT thread safe -class SyncFsItem: - - def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False): - self.offset = offset - self.target = target - self.sync_state = sync_state - self.filter_state = filter_state - self.state_key = str(offset) - - logg.debug('get key {}'.format(self.state_key)) - v = self.sync_state.get(self.state_key) - - (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v) - - if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid: - raise LockError(self.state_key) - - self.count = len(self.filter_state.all(pure=True)) - 4 - self.skip_filter = False - if self.count == 0: - self.skip_filter = True - elif not started: - self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) - - - def __check_done(self): - if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0: - raise InterruptError(self.state_key) - if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') > 0: - raise FilterDone(self.state_key) - - - def reset(self): - if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: - raise LockError('reset attempt on {} when state locked'.format(self.state_key)) - if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0: - raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key)) - self.filter_state.move(self.state_key, self.filter_state.from_name('RESET')) - - - def next(self, advance_block=False): - v = self.sync_state.state(self.state_key) - if v == self.sync_state.DONE: - raise SyncDone(self.target) - elif v == self.sync_state.NEW: - self.sync_state.next(self.state_key) - - v = self.sync_state.get(self.state_key) - (block_number, tx_index, target) = sync_state_deserialize(v) - if advance_block: - block_number += 1 - tx_index = 0 - if self.target >= 0 and block_number > self.target: - self.sync_state.move(self.state_key, self.sync_state.DONE) - raise SyncDone(self.target) - else: - tx_index += 1 - - self.cursor = block_number - self.tx_cursor = tx_index - - b = sync_state_serialize(block_number, tx_index, target) - self.sync_state.replace(self.state_key, b) - - - def __find_advance(self): - v = self.filter_state.state(self.state_key) - - - def advance(self): - if self.skip_filter: - raise FilterDone() - self.__check_done() - - if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0: - raise LockError('advance attempt on {} when state locked'.format(self.state_key)) - done = False - try: - self.filter_state.next(self.state_key) - except StateInvalid: - done = True - if done: - raise FilterDone() - self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK')) - - - def release(self, interrupt=False): - if self.skip_filter: - return False - if interrupt: - self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) - self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT')) - self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) - return False - - state = self.filter_state.state(self.state_key) - if state & self.filter_state.from_name('LOCK') == 0: - raise LockError('release attempt on {} when state unlocked'.format(self.state_key)) - self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) - try: - c = self.filter_state.peek(self.state_key) - logg.debug('peeked {}'.format(c)) - except StateInvalid: - self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) - return False - return True - - - def __str__(self): - return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor) - - - -class SyncFsStore: +class SyncFsStore(SyncStore): def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None): - self.session_id = None - self.session_path = None - self.is_default = False - self.first = False - self.target = None - self.items = {} - self.item_keys = [] - self.started = False + super(SyncFsStore, self).__init__(session_id=session_id) default_path = os.path.join(base_path, 'default') @@ -195,11 +57,6 @@ class SyncFsStore: self.filters = [] # used by SyncSession - def register(self, fltr): - self.filters.append(fltr) - self.filter_state.register(fltr) - - def __create_path(self, base_path, default_path, session_id=None): logg.debug('fs store path {} does not exist, creating'.format(self.session_path)) if session_id == None: @@ -214,7 +71,7 @@ class SyncFsStore: pass - def __load(self, target): + def load(self, target): self.state.sync(self.state.NEW) self.state.sync(self.state.SYNC) @@ -233,11 +90,13 @@ class SyncFsStore: thresholds_new.sort() thresholds = thresholds_sync + thresholds_new lim = len(thresholds) - 1 + + logg.debug('thresholds {}'.format(thresholds)) for i in range(len(thresholds)): item_target = target if i < lim: item_target = thresholds[i+1] - o = SyncFsItem(block_number, item_target, self.state, self.filter_state, started=True) + o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True) self.items[block_number] = o self.item_keys.append(block_number) logg.info('added existing {}'.format(o)) @@ -263,62 +122,3 @@ class SyncFsStore: f.write(str(target)) f.close() self.target = target - - - def start(self, offset=0, target=-1): - if self.started: - return - - self.__load(target) - - if self.first: - state_bytes = sync_state_serialize(offset, 0, target) - block_number_str = str(offset) - self.state.put(block_number_str, state_bytes) - self.filter_state.put(block_number_str) - o = SyncFsItem(offset, target, self.state, self.filter_state) - self.items[offset] = o - self.item_keys.append(offset) - elif offset > 0: - logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id)) - self.started = True - - self.item_keys.sort() - - - def stop(self, item): - if item.target == -1: - state_bytes = sync_state_serialize(item.cursor, 0, item.cursor) - self.state.replace(str(item.offset), state_bytes) - self.filter_state.put(str(item.cursor)) - - SyncFsItem(item.offset, -1, self.state, self.filter_state) - logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor)) - - self.state.move(item.state_key, self.state.DONE) - - state_bytes = sync_state_serialize(item.cursor, 0, -1) - self.state.put(str(item.cursor), state_bytes) - - logg.debug('item {}'.format(self.state.state(item.state_key))) - - - def get(self, k): - return self.items[k] - - - def next_item(self): - try: - k = self.item_keys.pop(0) - except IndexError: - return None - return self.items[k] - - - def connect(self): - self.filter_state.connect() - - - def disconnect(self): - self.filter_state.disconnect() - diff --git a/requirements.txt b/requirements.txt index ab01bc9..999cdbb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,4 @@ confini~=0.6.0 semver==2.13.0 hexathon~=0.1.5 chainlib>=0.1.0b1,<=0.1.0 -shep~=0.1.1 +shep>=0.2.0rc1,<0.3.0 diff --git a/tests/test_session.py b/tests/test_session.py index b02e956..7586786 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -40,8 +40,8 @@ class TestFilter(unittest.TestCase): self.conn = MockConn() - def tearDown(self): - shutil.rmtree(self.path) +# def tearDown(self): +# shutil.rmtree(self.path) def test_filter_basic(self):