WIP implement resume sync test
This commit is contained in:
parent
55e30eb13b
commit
ecb123f495
@ -55,7 +55,7 @@ class SyncDriver:
|
||||
self.running = False
|
||||
|
||||
|
||||
def run(self, conn):
|
||||
def run(self, conn, interval=1):
|
||||
while self.running_global:
|
||||
self.session = SyncSession(self.store)
|
||||
item = self.session.start()
|
||||
@ -63,7 +63,7 @@ class SyncDriver:
|
||||
self.running = False
|
||||
self.running_global = False
|
||||
break
|
||||
self.loop(conn, item)
|
||||
self.loop(conn, item, interval=interval)
|
||||
|
||||
|
||||
def idle(self, interval):
|
||||
@ -92,7 +92,7 @@ class SyncDriver:
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def loop(self, conn, item):
|
||||
def loop(self, conn, item, interval=1):
|
||||
logg.debug('started loop')
|
||||
tx_start = item.tx_cursor
|
||||
while self.running and SyncDriver.running_global:
|
||||
@ -115,7 +115,6 @@ class SyncDriver:
|
||||
self.process(conn, item, block, tx_start)
|
||||
except IndexError:
|
||||
item.next(advance_block=True)
|
||||
logg.debug('foo')
|
||||
tx_start = 0
|
||||
time.sleep(self.yield_delay)
|
||||
if self.post_callback != None:
|
||||
|
@ -17,11 +17,17 @@ logg = logging.getLogger().getChild(__name__)
|
||||
|
||||
|
||||
def state_event_handler(k, v_old, v_new):
|
||||
logg.log(logging.STATETRACE, 'sync state change key {}: {} -> {}'.format(k, v_old, v_new))
|
||||
if v_old == None:
|
||||
logg.log(logging.STATETRACE, 'sync state create key {}: -> {}'.format(k, v_new))
|
||||
else:
|
||||
logg.log(logging.STATETRACE, 'sync state change key {}: {} -> {}'.format(k, v_old, v_new))
|
||||
|
||||
|
||||
def filter_state_event_handler(k, v_old, v_new):
|
||||
logg.log(logging.STATETRACE, 'filter state change key {}: {} -> {}'.format(k, v_old, v_new))
|
||||
if v_old == None:
|
||||
logg.log(logging.STATETRACE, 'filter state create key {}: -> {}'.format(k, v_new))
|
||||
else:
|
||||
logg.log(logging.STATETRACE, 'filter state change key {}: {} -> {}'.format(k, v_old, v_new))
|
||||
|
||||
|
||||
class MockFilterError(Exception):
|
||||
@ -45,9 +51,21 @@ class MockBlockGenerator:
|
||||
txs.append(tx)
|
||||
|
||||
block = MockBlock(self.cursor, txs)
|
||||
driver.add_block(block)
|
||||
self.blocks[self.cursor] = block
|
||||
self.cursor += 1
|
||||
|
||||
if driver != None:
|
||||
self.apply(driver)
|
||||
|
||||
|
||||
def apply(self, driver, offset=0):
|
||||
block_numbers = list(self.blocks.keys())
|
||||
for block_number in block_numbers:
|
||||
if block_number < offset:
|
||||
continue
|
||||
block = self.blocks[block_number]
|
||||
driver.add_block(block)
|
||||
|
||||
|
||||
class MockConn:
|
||||
"""Noop connection mocker.
|
||||
@ -151,15 +169,24 @@ class MockFilter:
|
||||
if self.brk > 0:
|
||||
r = True
|
||||
self.brk -= 1
|
||||
logg.debug('filter {} r {} block {}'.format(self.common_name(), r, block.number))
|
||||
logg.debug('filter {} result {} block {}'.format(self.common_name(), r, block.number))
|
||||
return r
|
||||
|
||||
|
||||
class MockDriver(SyncDriver):
|
||||
|
||||
def __init__(self, store, offset=0, target=-1):
|
||||
def __init__(self, store, offset=0, target=-1, interrupt_block=None, interrupt_tx=None, interrupt_global=False):
|
||||
super(MockDriver, self).__init__(store, offset=offset, target=target)
|
||||
self.blocks = {}
|
||||
self.interrupt = None
|
||||
if interrupt_block != None:
|
||||
interrupt_block = int(interrupt_block)
|
||||
if interrupt_tx == None:
|
||||
interrupt_tx = 0
|
||||
else:
|
||||
interrupt_tx = int(interrupt_tx)
|
||||
self.interrupt = (interrupt_block, interrupt_tx,)
|
||||
self.interrupt_global = interrupt_global
|
||||
|
||||
|
||||
def add_block(self, block):
|
||||
@ -173,7 +200,14 @@ class MockDriver(SyncDriver):
|
||||
|
||||
def process(self, conn, item, block, tx_start):
|
||||
i = tx_start
|
||||
while True:
|
||||
while self.running:
|
||||
if self.interrupt != None:
|
||||
if self.interrupt[0] == block.number and self.interrupt[1] == i:
|
||||
logg.info('interrupt triggered at {}'.format(self.interrupt))
|
||||
if self.interrupt_global:
|
||||
SyncDriver.running_global = False
|
||||
self.running = False
|
||||
break
|
||||
tx = block.tx(i)
|
||||
self.process_single(conn, block, tx)
|
||||
item.next()
|
||||
|
@ -117,9 +117,20 @@ class TestFilter(unittest.TestCase):
|
||||
|
||||
|
||||
def test_driver_interrupt_sync(self):
|
||||
drv = MockDriver(self.store, interrupt_block=1)
|
||||
generator = MockBlockGenerator()
|
||||
drv = MockDriver(self.store, target=1)
|
||||
generator.generate([1, 2], driver=drv)
|
||||
generator.generate([3, 1, 2], driver=drv)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
self.store.register(fltr_one)
|
||||
|
||||
drv.run(self.conn, interval=0.1)
|
||||
|
||||
|
||||
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
|
||||
drv = MockDriver(store)
|
||||
generator.apply(drv, offset=1)
|
||||
drv.run(self.conn, interval=0.1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
Loading…
Reference in New Issue
Block a user