Replace filter execution control with bools instead of exceptions
This commit is contained in:
parent
75a6d2975e
commit
e48b62679d
@ -26,16 +26,10 @@ class SyncSession:
|
||||
def filter(self, conn, block, tx):
|
||||
self.session_store.connect()
|
||||
for fltr in self.filters:
|
||||
try:
|
||||
self.item.advance()
|
||||
except FilterDone:
|
||||
break
|
||||
interrupt = fltr.filter(conn, block, tx)
|
||||
self.item.release(interrupt=interrupt)
|
||||
try:
|
||||
self.item.advance()
|
||||
raise BackendError('filter state inconsitent with filter list')
|
||||
except FilterDone:
|
||||
if not self.item.release(interrupt=interrupt):
|
||||
break
|
||||
self.item.reset()
|
||||
self.next()
|
||||
self.session_store.disconnect()
|
||||
|
@ -19,12 +19,14 @@ class SyncState:
|
||||
self.state_store.add('LOCK')
|
||||
self.state_store.add('INTERRUPT')
|
||||
self.state_store.add('RESET')
|
||||
|
||||
self.state = self.state_store.state
|
||||
self.put = self.state_store.put
|
||||
self.set = self.state_store.set
|
||||
self.next = self.state_store.next
|
||||
self.move = self.state_store.move
|
||||
self.unset = self.state_store.unset
|
||||
self.peek = self.state_store.peek
|
||||
self.from_name = self.state_store.from_name
|
||||
self.state_store.sync()
|
||||
self.all = self.state_store.all
|
||||
|
@ -82,6 +82,10 @@ class SyncFsItem:
|
||||
self.sync_state.replace(self.state_key, v)
|
||||
|
||||
|
||||
def __find_advance(self):
|
||||
v = self.filter_state.state(self.state_key)
|
||||
|
||||
|
||||
def advance(self):
|
||||
if self.skip_filter:
|
||||
raise FilterDone()
|
||||
@ -95,24 +99,30 @@ class SyncFsItem:
|
||||
except StateInvalid:
|
||||
done = True
|
||||
if done:
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
|
||||
raise FilterDone()
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK'))
|
||||
|
||||
|
||||
def release(self, interrupt=False):
|
||||
if self.skip_filter:
|
||||
raise FilterDone()
|
||||
return False
|
||||
if interrupt:
|
||||
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
|
||||
return
|
||||
return False
|
||||
|
||||
state = self.filter_state.state(self.state_key)
|
||||
if state & self.filter_state.from_name('LOCK') == 0:
|
||||
raise LockError('release attempt on {} when state unlocked'.format(self.state_key))
|
||||
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
|
||||
try:
|
||||
c = self.filter_state.peek(self.state_key)
|
||||
logg.debug('peeked {}'.format(c))
|
||||
except StateInvalid:
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def __str__(self):
|
||||
|
@ -31,7 +31,6 @@ class TestFilter(unittest.TestCase):
|
||||
self.path = tempfile.mkdtemp()
|
||||
self.store = SyncFsStore(self.path)
|
||||
self.session = SyncSession(self.store)
|
||||
self.session.start()
|
||||
self.conn = MockConn()
|
||||
|
||||
|
||||
@ -45,6 +44,8 @@ class TestFilter(unittest.TestCase):
|
||||
fltr_two = MockFilter('bar')
|
||||
self.store.register(fltr_two)
|
||||
|
||||
self.session.start()
|
||||
|
||||
tx_hash = os.urandom(32).hex()
|
||||
tx = MockTx(42, tx_hash)
|
||||
block = MockBlock(13, [tx_hash])
|
||||
@ -61,6 +62,8 @@ class TestFilter(unittest.TestCase):
|
||||
fltr_two = MockFilter('bar')
|
||||
self.store.register(fltr_two)
|
||||
|
||||
self.session.start()
|
||||
|
||||
tx_hash = os.urandom(32).hex()
|
||||
tx = MockTx(42, tx_hash)
|
||||
block = MockBlock(13, [tx_hash])
|
||||
|
Loading…
Reference in New Issue
Block a user