Complete test for lock on interrupted filter

This commit is contained in:
lash 2022-03-23 23:36:17 +00:00
parent a3517d0203
commit ce945ae56e
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 58 additions and 21 deletions

View File

@ -38,6 +38,7 @@ class SyncDriver:
self.idle_callback = idle_callback self.idle_callback = idle_callback
self.last_start = 0 self.last_start = 0
self.clock_id = time.CLOCK_MONOTONIC_RAW self.clock_id = time.CLOCK_MONOTONIC_RAW
self.store.connect()
self.store.start(offset=offset, target=target) self.store.start(offset=offset, target=target)

View File

@ -1,14 +1,18 @@
# standard imports # standard imports
import hashlib import hashlib
import logging import logging
import re
import os
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
re_processedname = r'^_?[A-Z,_]*$'
# TODO: properly clarify interface shared with syncfsstore, move to filter module? # TODO: properly clarify interface shared with syncfsstore, move to filter module?
class SyncState: class SyncState:
def __init__(self, state_store): def __init__(self, state_store, scan_path=None):
self.state_store = state_store self.state_store = state_store
self.digest = b'\x00' * 32 self.digest = b'\x00' * 32
self.summed = False self.summed = False
@ -32,6 +36,8 @@ class SyncState:
self.all = self.state_store.all self.all = self.state_store.all
self.started = False self.started = False
self.scan_path = scan_path
def __verify_sum(self, v): def __verify_sum(self, v):
if not isinstance(v, bytes) and not isinstance(v, bytearray): if not isinstance(v, bytes) and not isinstance(v, bytearray):
@ -66,6 +72,18 @@ class SyncState:
k = self.state_store.from_name(v) k = self.state_store.from_name(v)
self.state_store.sync(k) self.state_store.sync(k)
self.__syncs[v] = True self.__syncs[v] = True
if self.scan_path != None:
for v in os.listdir(self.scan_path):
logg.debug('sync {} try {}'.format(self.scan_path, v))
if re.match(re_processedname, v):
k = None
try:
k = self.state_store.from_elements(v)
self.state_store.alias(v, k)
except ValueError:
k = self.state_store.from_name(v)
self.state_store.sync(k)
self.__syncs[v] = True
self.synced = True self.synced = True
self.connected = True self.connected = True

View File

@ -50,9 +50,9 @@ class SyncFsItem:
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v) (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid: if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid:
raise LockError(s) raise LockError(self.state_key)
self.count = len(self.filter_state.all(pure=True)) - 3 self.count = len(self.filter_state.all(pure=True)) - 4
self.skip_filter = False self.skip_filter = False
if self.count == 0: if self.count == 0:
self.skip_filter = True self.skip_filter = True
@ -148,7 +148,7 @@ class SyncFsItem:
class SyncFsStore: class SyncFsStore:
def __init__(self, base_path, session_id=None): def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
self.session_id = None self.session_id = None
self.session_path = None self.session_path = None
self.is_default = False self.is_default = False
@ -182,14 +182,14 @@ class SyncFsStore:
logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path)) logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
factory = SimpleFileStoreFactory(self.session_path, binary=True) factory = SimpleFileStoreFactory(self.session_path, binary=True)
self.state = PersistedState(factory.add, 2) self.state = PersistedState(factory.add, 2, event_callback=state_event_callback)
self.state.add('SYNC') self.state.add('SYNC')
self.state.add('DONE') self.state.add('DONE')
base_filter_path = os.path.join(self.session_path, 'filter') base_filter_path = os.path.join(self.session_path, 'filter')
factory = SimpleFileStoreFactory(base_filter_path, binary=True) factory = SimpleFileStoreFactory(base_filter_path, binary=True)
filter_state_backend = PersistedState(factory.add, 0, check_alias=False) filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=filter_state_event_callback)
self.filter_state = SyncState(filter_state_backend) self.filter_state = SyncState(filter_state_backend, scan_path=base_filter_path)
self.filters = [] # used by SyncSession self.filters = [] # used by SyncSession

View File

@ -12,9 +12,18 @@ from shep.state import State
from chainsyncer.error import NoBlockForYou from chainsyncer.error import NoBlockForYou
from chainsyncer.driver import SyncDriver from chainsyncer.driver import SyncDriver
logging.STATETRACE = 5
logg = logging.getLogger().getChild(__name__) logg = logging.getLogger().getChild(__name__)
def state_event_handler(k, v_old, v_new):
logg.log(logging.STATETRACE, 'sync state change key {}: {} -> {}'.format(k, v_old, v_new))
def filter_state_event_handler(k, v_old, v_new):
logg.log(logging.STATETRACE, 'filter state change key {}: {} -> {}'.format(k, v_old, v_new))
class MockFilterError(Exception): class MockFilterError(Exception):
pass pass
@ -120,7 +129,7 @@ class MockFilter:
if self.brk > 0: if self.brk > 0:
r = True r = True
self.brk -= 1 self.brk -= 1
logg.debug('filter {} r {}'.format(self.common_name(), r)) logg.debug('filter {} r {} block {}'.format(self.common_name(), r, block.number))
return r return r

View File

@ -22,10 +22,12 @@ from chainsyncer.unittest import (
MockBlock, MockBlock,
MockDriver, MockDriver,
MockFilterError, MockFilterError,
state_event_handler,
filter_state_event_handler,
) )
from chainsyncer.driver import SyncDriver from chainsyncer.driver import SyncDriver
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.STATETRACE)
logg = logging.getLogger() logg = logging.getLogger()
@ -33,7 +35,7 @@ class TestFilter(unittest.TestCase):
def setUp(self): def setUp(self):
self.path = tempfile.mkdtemp() self.path = tempfile.mkdtemp()
self.store = SyncFsStore(self.path) self.store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
self.conn = MockConn() self.conn = MockConn()
@ -98,19 +100,26 @@ class TestFilter(unittest.TestCase):
with self.assertRaises(MockFilterError): with self.assertRaises(MockFilterError):
drv.run(self.conn) drv.run(self.conn)
store = SyncFsStore(self.path) store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
drv = MockDriver(store, target=1)
drv.add_block(block) fltr_one = MockFilter('foo', brk_hard=1)
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
tx_hash_one = os.urandom(32).hex() with self.assertRaises(LockError):
tx = MockTx(0, tx_hash_one) drv = MockDriver(store, target=1)
tx_hash_two = os.urandom(32).hex()
tx = MockTx(1, tx_hash_two)
block = MockBlock(1, [tx_hash_one, tx_hash_two])
drv.add_block(block)
with self.assertRaises(SyncDone): # drv.add_block(block)
drv.run(self.conn) #
# tx_hash_one = os.urandom(32).hex()
# tx = MockTx(0, tx_hash_one)
# tx_hash_two = os.urandom(32).hex()
# tx = MockTx(1, tx_hash_two)
# block = MockBlock(1, [tx_hash_one, tx_hash_two])
# drv.add_block(block)
# drv.run(self.conn)
if __name__ == '__main__': if __name__ == '__main__':