From 6b6b26f1aefe08f55a3cc3b30bf0444e9dfc9239 Mon Sep 17 00:00:00 2001 From: lash Date: Sat, 30 Apr 2022 07:35:08 +0000 Subject: [PATCH] WIP Move unlock filter code into store base --- chainsyncer/error.py | 9 ++++ chainsyncer/runnable/unlock.py | 2 + chainsyncer/session.py | 9 ++-- chainsyncer/store/base.py | 83 +++++++++++++++++++++++++++++---- chainsyncer/unittest/base.py | 3 +- chainsyncer/unittest/store.py | 4 +- setup.cfg | 2 +- tests/test_filter.py | 85 +++++++++++++++++++++++++++++++++- 8 files changed, 178 insertions(+), 19 deletions(-) diff --git a/chainsyncer/error.py b/chainsyncer/error.py index fab66ce..c34c350 100644 --- a/chainsyncer/error.py +++ b/chainsyncer/error.py @@ -31,16 +31,25 @@ class LockError(Exception): class FilterDone(Exception): """Exception raised when all registered filters have been executed """ + pass class InterruptError(FilterDone): """Exception for interrupting or attempting to use an interrupted sync """ + pass class IncompleteFilterError(Exception): """Exception raised if filter reset is executed prematurely """ + pass + + +class FilterInitializationError(BackendError): + """Exception raised if filter state does not match the registered filters + """ + pass #class AbortTx(Exception): # """ diff --git a/chainsyncer/runnable/unlock.py b/chainsyncer/runnable/unlock.py index 20e8a72..0d79961 100644 --- a/chainsyncer/runnable/unlock.py +++ b/chainsyncer/runnable/unlock.py @@ -106,6 +106,8 @@ def main(): store.connect() store.start(ignore_lock=True) + store.unlock_filter(not action_is_forward) + return lock_state = store.filter_state.from_name('LOCK') locked_item = store.filter_state.list(lock_state) diff --git a/chainsyncer/session.py b/chainsyncer/session.py index 3274675..0ebf98f 100644 --- a/chainsyncer/session.py +++ b/chainsyncer/session.py @@ -13,12 +13,15 @@ class SyncSession: def __init__(self, session_store): self.session_store = session_store self.started = self.session_store.started - self.get = self.session_store.get self.next = self.session_store.next_item self.item = None self.filters = self.session_store.filters - + + def get(self, k): + return self.session_store.get(str(k)) + + def start(self, offset=0, target=-1): self.session_store.start(offset=offset, target=target) self.item = self.session_store.next_item() @@ -38,5 +41,5 @@ class SyncSession: if not self.item.release(interrupt=interrupt): break self.item.reset() - self.next() + #self.next() self.session_store.disconnect() diff --git a/chainsyncer/store/base.py b/chainsyncer/store/base.py index 5b28933..417a9a3 100644 --- a/chainsyncer/store/base.py +++ b/chainsyncer/store/base.py @@ -13,6 +13,7 @@ from chainsyncer.error import ( InterruptError, IncompleteFilterError, SyncDone, + FilterInitializationError, ) logg = logging.getLogger(__name__) @@ -67,13 +68,16 @@ class SyncItem: def resume(self): + return filter_state = self.filter_state.state(self.state_key) if filter_state > 0x0f: filter_state_part = self.filter_state.mask(filter_state, 0x0f) - if len(self.filter_state.elements(filter_state)) == 1: - logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part)) - lock_state = self.filter_state.from_name('LOCK') - self.filter_state.set(lock_state) + if filter_state_part > 0: + filter_state_part_name = self.filter_state.name(filter_state_part) + if filter_state_part_name[0] != '_': + logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part)) + lock_state = self.filter_state.from_name('LOCK') + self.filter_state.set(self.state_key, lock_state) def reset(self, check_incomplete=True): @@ -222,8 +226,9 @@ class SyncStore: self.filter_state.put(block_number_str) o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock) o.resume() - self.items[offset] = o - self.item_keys.append(offset) + k = str(offset) + self.items[k] = o + self.item_keys.append(k) elif offset > 0: logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id)) self.started = True @@ -272,8 +277,9 @@ class SyncStore: item_target = thresholds[i+1] o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock) o.resume() - self.items[block_number] = o - self.item_keys.append(block_number) + k = str(block_number) + self.items[k] = o + self.item_keys.append(k) logg.info('added existing {}'.format(o)) self.get_target() @@ -315,8 +321,65 @@ class SyncStore: raise NotImplementedError() - def peek_next_filter(self): - pass + def __get_locked_item(self): + locked_item = self.filter_state.list(self.filter_state.state_store.LOCK) + + if len(locked_item) == 0: + logg.error('Sync filter in store {} is not locked\n'.format(self)) + return None + elif len(locked_item) > 1: + raise FilterInitializationError('More than one locked filter item encountered in store {}. That should never happen, so I do not know what to do next.\n'.format(self)) + return locked_item[0] + + + def __get_filter_index(self, k): + i = -1 + fltrs = self.load_filter_list() + for fltr in fltrs: + i += 1 + if k == fltr.upper(): + logg.debug('lock filter match at filter list index {}'.format(i)) + return (i, fltrs,) + + + def unlock_filter(self, revert=False): + locked_item_key = self.__get_locked_item() + if locked_item_key == None: + return False + locked_item = self.get(locked_item_key) + locked_state = self.filter_state.state(locked_item_key) - self.filter_state.state_store.LOCK + locked_state_name = self.filter_state.name(locked_state) + + logg.debug('found locked item {} in state {}'.format(locked_item, locked_state)) + + (i, fltrs) = self.__get_filter_index(locked_state_name) + + if i == -1: + raise FilterInitializationError('locked state {} ({}) found for item {}, but matching filter has not been registered'.format(locked_state_name, locked_state, locked_item)) + + if revert: + self.__unlock_previous(locked_item, fltrs, i) + else: + self.__unlock_next(locked_item, fltrs, i) + + return True + + + def __unlock_next(self, item, lst, index): + #self.filter_state.state_store.unset(item.state_key, self.filter_state.state_store.LOCK) + if index == len(lst) - 1: + item.reset(check_incomplete=False) + else: + item.release() + + + def __unlock_previous(self, item, lst, index): + if index == 0: + item.reset(check_incomplete=False) + else: + new_state = lst(index) + self.filter_state.state_store.from_name(new_state.upper()) + def peek_current_filter(self): pass diff --git a/chainsyncer/unittest/base.py b/chainsyncer/unittest/base.py index 3e4abda..cfca7c9 100644 --- a/chainsyncer/unittest/base.py +++ b/chainsyncer/unittest/base.py @@ -13,7 +13,8 @@ from chainsyncer.error import NoBlockForYou from chainsyncer.driver import SyncDriver logging.STATETRACE = 5 -logg = logging.getLogger().getChild(__name__) +logging.addLevelName('STATETRACE', logging.STATETRACE) +logg = logging.getLogger(__name__) def state_event_handler(k, v_old, v_new): diff --git a/chainsyncer/unittest/store.py b/chainsyncer/unittest/store.py index 6fd2212..57b7c8b 100644 --- a/chainsyncer/unittest/store.py +++ b/chainsyncer/unittest/store.py @@ -232,7 +232,7 @@ class TestStoreBase(unittest.TestCase): if self.persist: store = self.store_factory('foo') store.start() - o = store.get(2) + o = store.get('2') def t_sync_history_interrupted(self): @@ -252,7 +252,7 @@ class TestStoreBase(unittest.TestCase): store.stop(bogus_item) store = self.store_factory('foo') store.start() - o = store.get(0) + o = store.get('0') self.assertEqual(o.cursor, 2) self.assertEqual(o.target, 13) o.next(advance_block=True) diff --git a/setup.cfg b/setup.cfg index ee9a325..bd9eda5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.4.2 +version = 0.4.3 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/tests/test_filter.py b/tests/test_filter.py index 1444512..78d70c2 100644 --- a/tests/test_filter.py +++ b/tests/test_filter.py @@ -19,17 +19,22 @@ from chainsyncer.unittest import ( MockConn, MockTx, MockBlock, + MockFilterError, + state_event_handler, + filter_state_event_handler, ) -logging.basicConfig(level=logging.DEBUG) + +logging.basicConfig(level=logging.STATETRACE) logg = logging.getLogger() + class TestFilter(unittest.TestCase): def setUp(self): self.path = tempfile.mkdtemp() - self.store = SyncFsStore(self.path) + self.store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler) self.session = SyncSession(self.store) self.conn = MockConn() @@ -73,6 +78,82 @@ class TestFilter(unittest.TestCase): self.assertEqual(len(fltr_two.contents), 0) + def test_filter_resume_single_revert(self): + fltr_one = MockFilter('foo', brk_hard=True) + self.store.register(fltr_one) + + self.session.start() + + item = self.store.get('0') + item.next() + + tx_hash = os.urandom(32).hex() + tx = MockTx(42, tx_hash) + block = MockBlock(13, [tx_hash]) + + with self.assertRaises(MockFilterError): + self.session.filter(self.conn, block, tx) + + # Unlock the state, reverting to previous filter + store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler) + self.conn = MockConn() + fltr_one = MockFilter('foo') + store.register(fltr_one) + store.connect() + store.start(ignore_lock=True) + store.unlock_filter(revert=True) + + store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler) + session = SyncSession(store) + self.conn = MockConn() + + fltr_one = MockFilter('foo') + store.register(fltr_one) + + session.start() + + session.filter(self.conn, block, tx) + + + + def test_filter_resume_single_continue(self): + fltr_one = MockFilter('foo', brk_hard=True) + self.store.register(fltr_one) + + self.session.start() + + item = self.store.get('0') + item.next() + + tx_hash = os.urandom(32).hex() + tx = MockTx(42, tx_hash) + block = MockBlock(13, [tx_hash]) + + with self.assertRaises(MockFilterError): + self.session.filter(self.conn, block, tx) + + # Unlock the state, reverting to previous filter + store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler) + self.conn = MockConn() + fltr_one = MockFilter('foo') + store.register(fltr_one) + store.connect() + store.start(ignore_lock=True) + store.unlock_filter(revert=False) + + store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler) + session = SyncSession(store) + self.conn = MockConn() + + fltr_one = MockFilter('foo') + store.register(fltr_one) + store.connect() + + session.start() + + session.filter(self.conn, block, tx) + + if __name__ == '__main__': unittest.main()