WIP Move unlock filter code into store base
This commit is contained in:
		
							parent
							
								
									44bdda80bf
								
							
						
					
					
						commit
						6b6b26f1ae
					
				@ -31,16 +31,25 @@ class LockError(Exception):
 | 
			
		||||
class FilterDone(Exception):
 | 
			
		||||
    """Exception raised when all registered filters have been executed
 | 
			
		||||
    """
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InterruptError(FilterDone):
 | 
			
		||||
    """Exception for interrupting or attempting to use an interrupted sync
 | 
			
		||||
    """
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IncompleteFilterError(Exception):
 | 
			
		||||
    """Exception raised if filter reset is executed prematurely
 | 
			
		||||
    """
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FilterInitializationError(BackendError):
 | 
			
		||||
    """Exception raised if filter state does not match the registered filters
 | 
			
		||||
    """
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
#class AbortTx(Exception):
 | 
			
		||||
#    """
 | 
			
		||||
 | 
			
		||||
@ -106,6 +106,8 @@ def main():
 | 
			
		||||
 | 
			
		||||
    store.connect()
 | 
			
		||||
    store.start(ignore_lock=True)
 | 
			
		||||
    store.unlock_filter(not action_is_forward)
 | 
			
		||||
    return
 | 
			
		||||
 | 
			
		||||
    lock_state = store.filter_state.from_name('LOCK')
 | 
			
		||||
    locked_item = store.filter_state.list(lock_state)
 | 
			
		||||
 | 
			
		||||
@ -13,12 +13,15 @@ class SyncSession:
 | 
			
		||||
    def __init__(self, session_store):
 | 
			
		||||
        self.session_store = session_store
 | 
			
		||||
        self.started = self.session_store.started
 | 
			
		||||
        self.get = self.session_store.get
 | 
			
		||||
        self.next = self.session_store.next_item
 | 
			
		||||
        self.item = None
 | 
			
		||||
        self.filters = self.session_store.filters
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    def get(self, k):
 | 
			
		||||
        return self.session_store.get(str(k))
 | 
			
		||||
   
 | 
			
		||||
 | 
			
		||||
    def start(self, offset=0, target=-1):
 | 
			
		||||
        self.session_store.start(offset=offset, target=target)
 | 
			
		||||
        self.item = self.session_store.next_item()
 | 
			
		||||
@ -38,5 +41,5 @@ class SyncSession:
 | 
			
		||||
            if not self.item.release(interrupt=interrupt):
 | 
			
		||||
                break
 | 
			
		||||
        self.item.reset()
 | 
			
		||||
        self.next()
 | 
			
		||||
        #self.next()
 | 
			
		||||
        self.session_store.disconnect()
 | 
			
		||||
 | 
			
		||||
@ -13,6 +13,7 @@ from chainsyncer.error import (
 | 
			
		||||
        InterruptError,
 | 
			
		||||
        IncompleteFilterError,
 | 
			
		||||
        SyncDone,
 | 
			
		||||
        FilterInitializationError,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
logg = logging.getLogger(__name__)
 | 
			
		||||
@ -67,13 +68,16 @@ class SyncItem:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def resume(self):
 | 
			
		||||
        return
 | 
			
		||||
        filter_state = self.filter_state.state(self.state_key)
 | 
			
		||||
        if filter_state > 0x0f:
 | 
			
		||||
            filter_state_part = self.filter_state.mask(filter_state, 0x0f)
 | 
			
		||||
            if len(self.filter_state.elements(filter_state)) == 1:
 | 
			
		||||
                logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part))
 | 
			
		||||
                lock_state = self.filter_state.from_name('LOCK')
 | 
			
		||||
                self.filter_state.set(lock_state)
 | 
			
		||||
            if filter_state_part > 0:
 | 
			
		||||
                filter_state_part_name = self.filter_state.name(filter_state_part)
 | 
			
		||||
                if filter_state_part_name[0] != '_':
 | 
			
		||||
                    logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part))
 | 
			
		||||
                    lock_state = self.filter_state.from_name('LOCK')
 | 
			
		||||
                    self.filter_state.set(self.state_key, lock_state)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def reset(self, check_incomplete=True):
 | 
			
		||||
@ -222,8 +226,9 @@ class SyncStore:
 | 
			
		||||
            self.filter_state.put(block_number_str)
 | 
			
		||||
            o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock)
 | 
			
		||||
            o.resume()
 | 
			
		||||
            self.items[offset] = o
 | 
			
		||||
            self.item_keys.append(offset)
 | 
			
		||||
            k = str(offset)
 | 
			
		||||
            self.items[k] = o
 | 
			
		||||
            self.item_keys.append(k)
 | 
			
		||||
        elif offset > 0:
 | 
			
		||||
            logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id))
 | 
			
		||||
        self.started = True
 | 
			
		||||
@ -272,8 +277,9 @@ class SyncStore:
 | 
			
		||||
                item_target = thresholds[i+1] 
 | 
			
		||||
            o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock)
 | 
			
		||||
            o.resume()
 | 
			
		||||
            self.items[block_number] = o
 | 
			
		||||
            self.item_keys.append(block_number)
 | 
			
		||||
            k = str(block_number)
 | 
			
		||||
            self.items[k] = o
 | 
			
		||||
            self.item_keys.append(k)
 | 
			
		||||
            logg.info('added existing {}'.format(o))
 | 
			
		||||
 | 
			
		||||
        self.get_target()
 | 
			
		||||
@ -315,8 +321,65 @@ class SyncStore:
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def peek_next_filter(self):
 | 
			
		||||
        pass
 | 
			
		||||
    def __get_locked_item(self):
 | 
			
		||||
        locked_item = self.filter_state.list(self.filter_state.state_store.LOCK)
 | 
			
		||||
        
 | 
			
		||||
        if len(locked_item) == 0:
 | 
			
		||||
            logg.error('Sync filter in store {} is not locked\n'.format(self))
 | 
			
		||||
            return None
 | 
			
		||||
        elif len(locked_item) > 1:
 | 
			
		||||
            raise FilterInitializationError('More than one locked filter item encountered in store {}. That should never happen, so I do not know what to do next.\n'.format(self))
 | 
			
		||||
        return locked_item[0]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def __get_filter_index(self, k):
 | 
			
		||||
        i = -1
 | 
			
		||||
        fltrs = self.load_filter_list()
 | 
			
		||||
        for fltr in fltrs:
 | 
			
		||||
            i += 1
 | 
			
		||||
            if k == fltr.upper():
 | 
			
		||||
                logg.debug('lock filter match at filter list index {}'.format(i))
 | 
			
		||||
        return (i, fltrs,)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def unlock_filter(self, revert=False):
 | 
			
		||||
        locked_item_key = self.__get_locked_item()
 | 
			
		||||
        if locked_item_key == None:
 | 
			
		||||
            return False
 | 
			
		||||
        locked_item = self.get(locked_item_key)
 | 
			
		||||
        locked_state = self.filter_state.state(locked_item_key) - self.filter_state.state_store.LOCK
 | 
			
		||||
        locked_state_name = self.filter_state.name(locked_state)
 | 
			
		||||
 | 
			
		||||
        logg.debug('found locked item {} in state {}'.format(locked_item, locked_state))
 | 
			
		||||
 | 
			
		||||
        (i, fltrs) = self.__get_filter_index(locked_state_name)
 | 
			
		||||
 | 
			
		||||
        if i == -1:
 | 
			
		||||
            raise FilterInitializationError('locked state {} ({}) found for item {}, but matching filter has not been registered'.format(locked_state_name, locked_state, locked_item))
 | 
			
		||||
 | 
			
		||||
        if revert:
 | 
			
		||||
            self.__unlock_previous(locked_item, fltrs, i)
 | 
			
		||||
        else:
 | 
			
		||||
            self.__unlock_next(locked_item, fltrs, i)
 | 
			
		||||
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def __unlock_next(self, item, lst, index):
 | 
			
		||||
        #self.filter_state.state_store.unset(item.state_key, self.filter_state.state_store.LOCK)
 | 
			
		||||
        if index == len(lst) - 1:
 | 
			
		||||
            item.reset(check_incomplete=False)
 | 
			
		||||
        else:
 | 
			
		||||
            item.release()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def __unlock_previous(self, item, lst, index):
 | 
			
		||||
        if index == 0:
 | 
			
		||||
            item.reset(check_incomplete=False)
 | 
			
		||||
        else:
 | 
			
		||||
            new_state = lst(index)
 | 
			
		||||
            self.filter_state.state_store.from_name(new_state.upper())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def peek_current_filter(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
@ -13,7 +13,8 @@ from chainsyncer.error import NoBlockForYou
 | 
			
		||||
from chainsyncer.driver import SyncDriver
 | 
			
		||||
 | 
			
		||||
logging.STATETRACE = 5
 | 
			
		||||
logg = logging.getLogger().getChild(__name__)
 | 
			
		||||
logging.addLevelName('STATETRACE', logging.STATETRACE)
 | 
			
		||||
logg = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def state_event_handler(k, v_old, v_new):
 | 
			
		||||
 | 
			
		||||
@ -232,7 +232,7 @@ class TestStoreBase(unittest.TestCase):
 | 
			
		||||
        if self.persist:
 | 
			
		||||
            store = self.store_factory('foo')
 | 
			
		||||
            store.start()
 | 
			
		||||
            o = store.get(2)
 | 
			
		||||
            o = store.get('2')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def t_sync_history_interrupted(self):
 | 
			
		||||
@ -252,7 +252,7 @@ class TestStoreBase(unittest.TestCase):
 | 
			
		||||
        store.stop(bogus_item)
 | 
			
		||||
        store = self.store_factory('foo')
 | 
			
		||||
        store.start()
 | 
			
		||||
        o = store.get(0)
 | 
			
		||||
        o = store.get('0')
 | 
			
		||||
        self.assertEqual(o.cursor, 2)
 | 
			
		||||
        self.assertEqual(o.target, 13) 
 | 
			
		||||
        o.next(advance_block=True)
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,6 @@
 | 
			
		||||
[metadata]
 | 
			
		||||
name = chainsyncer
 | 
			
		||||
version = 0.4.2
 | 
			
		||||
version = 0.4.3
 | 
			
		||||
description = Generic blockchain syncer driver
 | 
			
		||||
author = Louis Holbrook
 | 
			
		||||
author_email = dev@holbrook.no
 | 
			
		||||
 | 
			
		||||
@ -19,17 +19,22 @@ from chainsyncer.unittest import (
 | 
			
		||||
        MockConn,
 | 
			
		||||
        MockTx,
 | 
			
		||||
        MockBlock,
 | 
			
		||||
        MockFilterError,
 | 
			
		||||
        state_event_handler,
 | 
			
		||||
        filter_state_event_handler,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
logging.basicConfig(level=logging.DEBUG)
 | 
			
		||||
 | 
			
		||||
logging.basicConfig(level=logging.STATETRACE)
 | 
			
		||||
logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestFilter(unittest.TestCase):
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        self.path = tempfile.mkdtemp()
 | 
			
		||||
        self.store = SyncFsStore(self.path)
 | 
			
		||||
        self.store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
 | 
			
		||||
        self.session = SyncSession(self.store)
 | 
			
		||||
        self.conn = MockConn()
 | 
			
		||||
 | 
			
		||||
@ -73,6 +78,82 @@ class TestFilter(unittest.TestCase):
 | 
			
		||||
        self.assertEqual(len(fltr_two.contents), 0)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def test_filter_resume_single_revert(self):
 | 
			
		||||
        fltr_one = MockFilter('foo', brk_hard=True)
 | 
			
		||||
        self.store.register(fltr_one)
 | 
			
		||||
 | 
			
		||||
        self.session.start()
 | 
			
		||||
 | 
			
		||||
        item = self.store.get('0')
 | 
			
		||||
        item.next()
 | 
			
		||||
 | 
			
		||||
        tx_hash = os.urandom(32).hex()
 | 
			
		||||
        tx = MockTx(42, tx_hash)
 | 
			
		||||
        block = MockBlock(13, [tx_hash])
 | 
			
		||||
 | 
			
		||||
        with self.assertRaises(MockFilterError):
 | 
			
		||||
            self.session.filter(self.conn, block, tx)
 | 
			
		||||
      
 | 
			
		||||
        # Unlock the state, reverting to previous filter
 | 
			
		||||
        store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
 | 
			
		||||
        self.conn = MockConn()
 | 
			
		||||
        fltr_one = MockFilter('foo')
 | 
			
		||||
        store.register(fltr_one)
 | 
			
		||||
        store.connect()
 | 
			
		||||
        store.start(ignore_lock=True)
 | 
			
		||||
        store.unlock_filter(revert=True)
 | 
			
		||||
 | 
			
		||||
        store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
 | 
			
		||||
        session = SyncSession(store)
 | 
			
		||||
        self.conn = MockConn()
 | 
			
		||||
 | 
			
		||||
        fltr_one = MockFilter('foo')
 | 
			
		||||
        store.register(fltr_one)
 | 
			
		||||
 | 
			
		||||
        session.start()
 | 
			
		||||
 | 
			
		||||
        session.filter(self.conn, block, tx)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def test_filter_resume_single_continue(self):
 | 
			
		||||
        fltr_one = MockFilter('foo', brk_hard=True)
 | 
			
		||||
        self.store.register(fltr_one)
 | 
			
		||||
 | 
			
		||||
        self.session.start()
 | 
			
		||||
 | 
			
		||||
        item = self.store.get('0')
 | 
			
		||||
        item.next()
 | 
			
		||||
 | 
			
		||||
        tx_hash = os.urandom(32).hex()
 | 
			
		||||
        tx = MockTx(42, tx_hash)
 | 
			
		||||
        block = MockBlock(13, [tx_hash])
 | 
			
		||||
 | 
			
		||||
        with self.assertRaises(MockFilterError):
 | 
			
		||||
            self.session.filter(self.conn, block, tx)
 | 
			
		||||
      
 | 
			
		||||
        # Unlock the state, reverting to previous filter
 | 
			
		||||
        store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
 | 
			
		||||
        self.conn = MockConn()
 | 
			
		||||
        fltr_one = MockFilter('foo')
 | 
			
		||||
        store.register(fltr_one)
 | 
			
		||||
        store.connect()
 | 
			
		||||
        store.start(ignore_lock=True)
 | 
			
		||||
        store.unlock_filter(revert=False)
 | 
			
		||||
 | 
			
		||||
        store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
 | 
			
		||||
        session = SyncSession(store)
 | 
			
		||||
        self.conn = MockConn()
 | 
			
		||||
 | 
			
		||||
        fltr_one = MockFilter('foo')
 | 
			
		||||
        store.register(fltr_one)
 | 
			
		||||
        store.connect()
 | 
			
		||||
 | 
			
		||||
        session.start()
 | 
			
		||||
 | 
			
		||||
        session.filter(self.conn, block, tx)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    unittest.main()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user