diff --git a/chainsyncer/session.py b/chainsyncer/session.py index 44d8b0b..5335c5a 100644 --- a/chainsyncer/session.py +++ b/chainsyncer/session.py @@ -26,16 +26,10 @@ class SyncSession: def filter(self, conn, block, tx): self.session_store.connect() for fltr in self.filters: - try: - self.item.advance() - except FilterDone: - break - interrupt = fltr.filter(conn, block, tx) - self.item.release(interrupt=interrupt) - try: self.item.advance() - raise BackendError('filter state inconsitent with filter list') - except FilterDone: - self.item.reset() + interrupt = fltr.filter(conn, block, tx) + if not self.item.release(interrupt=interrupt): + break + self.item.reset() self.next() self.session_store.disconnect() diff --git a/chainsyncer/state/base.py b/chainsyncer/state/base.py index 58fd4d5..f50c27e 100644 --- a/chainsyncer/state/base.py +++ b/chainsyncer/state/base.py @@ -19,12 +19,14 @@ class SyncState: self.state_store.add('LOCK') self.state_store.add('INTERRUPT') self.state_store.add('RESET') + self.state = self.state_store.state self.put = self.state_store.put self.set = self.state_store.set self.next = self.state_store.next self.move = self.state_store.move self.unset = self.state_store.unset + self.peek = self.state_store.peek self.from_name = self.state_store.from_name self.state_store.sync() self.all = self.state_store.all diff --git a/chainsyncer/store/fs.py b/chainsyncer/store/fs.py index e64315d..4739a42 100644 --- a/chainsyncer/store/fs.py +++ b/chainsyncer/store/fs.py @@ -82,6 +82,10 @@ class SyncFsItem: self.sync_state.replace(self.state_key, v) + def __find_advance(self): + v = self.filter_state.state(self.state_key) + + def advance(self): if self.skip_filter: raise FilterDone() @@ -95,24 +99,30 @@ class SyncFsItem: except StateInvalid: done = True if done: - self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) raise FilterDone() self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK')) def release(self, interrupt=False): if self.skip_filter: - raise FilterDone() + return False if interrupt: self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT')) self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) - return + return False state = self.filter_state.state(self.state_key) if state & self.filter_state.from_name('LOCK') == 0: raise LockError('release attempt on {} when state unlocked'.format(self.state_key)) self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK')) + try: + c = self.filter_state.peek(self.state_key) + logg.debug('peeked {}'.format(c)) + except StateInvalid: + self.filter_state.set(self.state_key, self.filter_state.from_name('DONE')) + return False + return True def __str__(self): diff --git a/tests/test_filter.py b/tests/test_filter.py index 6954786..1444512 100644 --- a/tests/test_filter.py +++ b/tests/test_filter.py @@ -31,7 +31,6 @@ class TestFilter(unittest.TestCase): self.path = tempfile.mkdtemp() self.store = SyncFsStore(self.path) self.session = SyncSession(self.store) - self.session.start() self.conn = MockConn() @@ -45,6 +44,8 @@ class TestFilter(unittest.TestCase): fltr_two = MockFilter('bar') self.store.register(fltr_two) + self.session.start() + tx_hash = os.urandom(32).hex() tx = MockTx(42, tx_hash) block = MockBlock(13, [tx_hash]) @@ -61,6 +62,8 @@ class TestFilter(unittest.TestCase): fltr_two = MockFilter('bar') self.store.register(fltr_two) + self.session.start() + tx_hash = os.urandom(32).hex() tx = MockTx(42, tx_hash) block = MockBlock(13, [tx_hash])