From af47e31cc8f26270db19982159ae5004d7e2eeaa Mon Sep 17 00:00:00 2001 From: lash Date: Thu, 17 Mar 2022 10:09:12 +0000 Subject: [PATCH] New filter interface, add state step stubs --- chainsyncer/filter.py | 96 +++------------------------------------ chainsyncer/session.py | 21 +++++++-- chainsyncer/state/base.py | 35 ++++++++++---- tests/test_basic.py | 59 ++++++++++++++++-------- 4 files changed, 90 insertions(+), 121 deletions(-) diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 7c1f6c9..4149014 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -1,96 +1,12 @@ -# standard imports -import logging - -# local imports -from .error import BackendError - -logg = logging.getLogger(__name__) - - class SyncFilter: - """Manages the collection of filters on behalf of a specific backend. - A filter is a pluggable piece of code to execute for every transaction retrieved by the syncer. Filters are executed in the sequence they were added to the instance. - - :param backend: Syncer backend to apply filter state changes to - :type backend: chainsyncer.backend.base.Backend implementation - """ - - def __init__(self, backend): - self.filters = [] - self.backend = backend + def common_name(self): + raise NotImplementedError() - def add(self, fltr): - """Add a filter instance. - - :param fltr: Filter instance. - :type fltr: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter - :raises ValueError: Object instance is incorrect implementation - """ - if getattr(fltr, 'filter') == None: - raise ValueError('filter object must implement have method filter') - logg.debug('added filter "{}"'.format(str(fltr))) - - self.filters.append(fltr) + def sum(self): + raise NotImplementedError() - def __apply_one(self, fltr, idx, conn, block, tx, session): - self.backend.begin_filter(idx) - fltr.filter(conn, block, tx, session) - self.backend.complete_filter(idx) - - - def apply(self, conn, block, tx): - """Apply all registered filters on the given transaction. - - :param conn: RPC Connection, will be passed to the filter method - :type conn: chainlib.connection.RPCConnection - :param block: Block object - :type block: chainlib.block.Block - :param tx: Transaction object - :type tx: chainlib.tx.Tx - :raises BackendError: Backend connection failed - """ - session = None - try: - session = self.backend.connect() - except TimeoutError as e: - self.backend.disconnect() - raise BackendError('database connection fail: {}'.format(e)) - i = 0 - (pair, flags) = self.backend.get() - for f in self.filters: - if not self.backend.check_filter(i, flags): - logg.debug('applying filter {} {}'.format(str(f), flags)) - self.__apply_one(f, i, conn, block, tx, session) - else: - logg.debug('skipping previously applied filter {} {}'.format(str(f), flags)) - i += 1 - - self.backend.disconnect() - - -class NoopFilter: - """A noop implemenation of a sync filter. - - Logs the filter inputs at debug log level. - """ - - def filter(self, conn, block, tx, db_session=None): - """Filter method implementation: - - :param conn: RPC Connection, will be passed to the filter method - :type conn: chainlib.connection.RPCConnection - :param block: Block object - :type block: chainlib.block.Block - :param tx: Transaction object - :type tx: chainlib.tx.Tx - :param db_session: Backend session object - :type db_session: varies - """ - logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session))) - - - def __str__(self): - return 'noopfilter' + def filter(self, conn, block, tx): + raise NotImplementedError() diff --git a/chainsyncer/session.py b/chainsyncer/session.py index c5d4be8..e23d8c0 100644 --- a/chainsyncer/session.py +++ b/chainsyncer/session.py @@ -4,20 +4,33 @@ import uuid class SyncSession: - def __init__(self, state_store, session_id=None, is_default=False): + def __init__(self, session_store, sync_state, session_id=None, is_default=False): + self.session_store = session_store if session_id == None: session_id = str(uuid.uuid4()) is_default = True self.session_id = session_id self.is_default = is_default - self.state_store = state_store + self.sync_state = sync_state self.filters = [] + self.started = False def add_filter(self, fltr): - self.state_store.register(fltr) + if self.started: + raise RuntimeError('filters cannot be changed after syncer start') + self.sync_state.register(fltr) self.filters.append(fltr) def start(self): - self.state_store.start() + self.started = True + + + def filter(self, conn, block, tx): + self.sync_state.connect() + for fltr in filters: + self.sync_start.lock() + self.sync_start.unlock() + self.sync_start.disconnect() + diff --git a/chainsyncer/state/base.py b/chainsyncer/state/base.py index fdb2566..d0b370c 100644 --- a/chainsyncer/state/base.py +++ b/chainsyncer/state/base.py @@ -1,13 +1,18 @@ # standard imports import hashlib + class SyncState: def __init__(self, state_store): - self.store = state_store + self.state_store = state_store self.digest = b'\x00' * 32 self.summed = False - self.synced = {} + self.__syncs = {} + self.synced = False + self.connected = False + self.state_store.add('INTERRUPT') + self.state_store.add('LOCK') def __verify_sum(self, v): @@ -24,8 +29,7 @@ class SyncState: self.__verify_sum(z) self.digest += z s = fltr.common_name() - self.store.add('i_' + s) - self.store.add('o_' + s) + self.state_store.add(s) def sum(self): @@ -36,7 +40,22 @@ class SyncState: return self.digest - def start(self): - for v in self.store.all(): - self.store.sync(v) - self.synced[v] = True + def connect(self): + if not self.synced: + for v in self.state_store.all(): + self.state_store.sync(v) + self.__syncs[v] = True + self.synced = True + self.connected = True + + + def disconnect(self): + self.connected = False + + + def lock(self): + pass + + + def unlock(self): + pass diff --git a/tests/test_basic.py b/tests/test_basic.py index dff6480..79a61fe 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -11,15 +11,20 @@ from chainsyncer.session import SyncSession class MockStore(State): - def __init__(self, bits): + def __init__(self, bits=0): super(MockStore, self).__init__(bits, check_alias=False) class MockFilter: - def __init__(self, z, name): - self.z = z + def __init__(self, name, brk=False, z=None): self.name = name + if z == None: + h = hashlib.sha256() + h.update(self.name.encode('utf-8')) + z = h.digest() + self.z = z + self.brk = brk def sum(self): @@ -30,43 +35,59 @@ class MockFilter: return self.name + def filter(self, conn, block, tx): + return self.brk + + class TestSync(unittest.TestCase): - def setUp(self): - self.store = MockStore(6) - self.state = SyncState(self.store) - - def test_basic(self): - session = SyncSession(self.state) + store = MockStore(6) + state = SyncState(store) + session = SyncSession(None, state) self.assertTrue(session.is_default) - session = SyncSession(self.state, session_id='foo') + session = SyncSession(None, state, session_id='foo') self.assertFalse(session.is_default) def test_sum(self): + store = MockStore(4) + state = SyncState(store) + b = b'\x2a' * 32 - fltr = MockFilter(b, name='foo') - self.state.register(fltr) + fltr = MockFilter('foo', z=b) + state.register(fltr) b = b'\x0d' * 31 - fltr = MockFilter(b, name='bar') + fltr = MockFilter('bar', z=b) with self.assertRaises(ValueError): - self.state.register(fltr) + state.register(fltr) b = b'\x0d' * 32 - fltr = MockFilter(b, name='bar') - self.state.register(fltr) + fltr = MockFilter('bar', z=b) + state.register(fltr) - v = self.state.sum() + v = state.sum() self.assertEqual(v.hex(), 'a24abf9fec112b4e0210ae874b4a371f8657b1ee0d923ad6d974aef90bad8550') def test_session_start(self): - session = SyncSession(self.state) + store = MockStore(6) + state = SyncState(store) + session = SyncSession(None, state) session.start() - + + + def test_state_dynamic(self): + store = MockStore() + state = SyncState(store) + + b = b'\x0d' * 32 + fltr = MockFilter(name='foo', z=b) + state.register(fltr) + + if __name__ == '__main__': unittest.main()