From db6128f8232cadd79299286d5c1ef307150e18b4 Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 26 Sep 2021 19:32:08 +0200 Subject: [PATCH] Add lock flags to model, backend --- chainsyncer/backend/memory.py | 12 +++---- chainsyncer/backend/sql.py | 12 +++++-- .../migrations/default/versions/src/sync.py | 1 + chainsyncer/db/models/filter.py | 31 ++++++++++++++++ chainsyncer/error.py | 6 ++++ chainsyncer/filter.py | 1 + requirements.txt | 2 +- setup.cfg | 2 +- test_requirements.txt | 2 +- tests/test_database.py | 35 +++++++++++++++++-- tests/test_interrupt.py | 19 +++++++--- 11 files changed, 104 insertions(+), 19 deletions(-) diff --git a/chainsyncer/backend/memory.py b/chainsyncer/backend/memory.py index 652a21a..2ac291d 100644 --- a/chainsyncer/backend/memory.py +++ b/chainsyncer/backend/memory.py @@ -86,7 +86,7 @@ class MemBackend(Backend): self.filter_count += 1 - def complete_filter(self, n): + def begin_filter(self, n): """Set filter at index as completed for the current block / tx state. :param n: Filter index @@ -97,6 +97,10 @@ class MemBackend(Backend): logg.debug('set filter {} {}'.format(self.filter_names[n], v)) + def complete_filter(self, n): + pass + + def reset_filter(self): """Set all filters to unprocessed for the current block / tx state. """ @@ -104,11 +108,5 @@ class MemBackend(Backend): self.flags = 0 -# def get_flags(self): -# """Returns flags -# """ -# return self.flags - - def __str__(self): return "syncer membackend {} chain {} cursor {}".format(self.object_id, self.chain(), self.get()) diff --git a/chainsyncer/backend/sql.py b/chainsyncer/backend/sql.py index a44dee9..a930ce5 100644 --- a/chainsyncer/backend/sql.py +++ b/chainsyncer/backend/sql.py @@ -314,8 +314,8 @@ class SQLBackend(Backend): self.disconnect() - def complete_filter(self, n): - """Sets the filter at the given index as completed. + def begin_filter(self, n): + """Marks start of execution of the filter indexed by the corresponding bit. :param n: Filter index :type n: int @@ -327,6 +327,14 @@ class SQLBackend(Backend): self.disconnect() + def complete_filter(self, n): + self.connect() + self.db_object_filter.release(check_bit=n) + self.db_session.add(self.db_object_filter) + self.db_session.commit() + self.disconnect() + + def reset_filter(self): """Reset all filter states. """ diff --git a/chainsyncer/db/migrations/default/versions/src/sync.py b/chainsyncer/db/migrations/default/versions/src/sync.py index 2557625..69f3a19 100644 --- a/chainsyncer/db/migrations/default/versions/src/sync.py +++ b/chainsyncer/db/migrations/default/versions/src/sync.py @@ -21,6 +21,7 @@ def upgrade(): sa.Column('id', sa.Integer, primary_key=True), sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True), sa.Column('flags', sa.LargeBinary, nullable=True), + sa.Column('flags_lock', sa.Integer, nullable=False, default=0), sa.Column('flags_start', sa.LargeBinary, nullable=True), sa.Column('count', sa.Integer, nullable=False, default=0), sa.Column('digest', sa.String(64), nullable=False), diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py index a95e17f..e8b0a12 100644 --- a/chainsyncer/db/models/filter.py +++ b/chainsyncer/db/models/filter.py @@ -9,6 +9,7 @@ from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method # local imports from .base import SessionBase from .sync import BlockchainSync +from chainsyncer.error import LockError zero_digest = bytes(32).hex() logg = logging.getLogger(__name__) @@ -32,6 +33,7 @@ class BlockchainSyncFilter(SessionBase): chain_sync_id = Column(Integer, ForeignKey('chain_sync.id')) flags_start = Column(LargeBinary) flags = Column(LargeBinary) + flags_lock = Column(Integer) digest = Column(String(64)) count = Column(Integer) @@ -47,10 +49,20 @@ class BlockchainSyncFilter(SessionBase): flags = flags.to_bytes(bytecount, 'big') self.flags_start = flags self.flags = flags + self.flags_lock = 0 self.chain_sync_id = chain_sync.id + @staticmethod + def load(sync_id, session=None): + q = session.query(BlockchainSyncFilter) + q = q.filter(BlockchainSyncFilter.chain_sync_id==sync_id) + o = q.first() + if o.is_locked(): + raise LockError('locked state for flag {} of sync id {} must be manually resolved'.format(o.flags_lock)) + + def add(self, name): """Add a new filter to the syncer record. @@ -106,9 +118,16 @@ class BlockchainSyncFilter(SessionBase): return (n, self.count, self.digest) + def is_locked(self): + return self.flags_lock > 0 + + def clear(self): """Set current filter flag value to zero. """ + if self.is_locked(): + raise LockError('flag clear attempted when lock set at {}'.format(self.flags_lock)) + self.flags = bytearray(len(self.flags)) @@ -120,9 +139,14 @@ class BlockchainSyncFilter(SessionBase): :raises IndexError: Invalid flag index :raises AttributeError: Flag at index already set """ + if self.is_locked(): + raise LockError('flag set attempted when lock set at {}'.format(self.flags_lock)) + if n > self.count: raise IndexError('bit flag out of range') + self.flags_lock = n + b = 1 << (n % 8) i = int(n / 8) byte_idx = len(self.flags)-1-i @@ -131,3 +155,10 @@ class BlockchainSyncFilter(SessionBase): flags = bytearray(self.flags) flags[byte_idx] |= b self.flags = flags + + + def release(self, check_bit=0): + if check_bit > 0: + if self.flags_lock > 0 and self.flags_lock != check_bit: + raise LockError('release attemped on explicit bit {}, but bit {} was locked'.format(check_bit, self.flags_lock)) + self.flags_lock = 0 diff --git a/chainsyncer/error.py b/chainsyncer/error.py index b70ba93..daf42c4 100644 --- a/chainsyncer/error.py +++ b/chainsyncer/error.py @@ -21,6 +21,12 @@ class BackendError(Exception): pass +class LockError(Exception): + """Base exception for attempting to manipulate a locked property + """ + pass + + #class AbortTx(Exception): # """ # """ diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 3dc6860..7c1f6c9 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -36,6 +36,7 @@ class SyncFilter: def __apply_one(self, fltr, idx, conn, block, tx, session): + self.backend.begin_filter(idx) fltr.filter(conn, block, tx, session) self.backend.complete_filter(idx) diff --git a/requirements.txt b/requirements.txt index f3999a4..7e39024 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ confini>=0.3.6rc3,<0.5.0 semver==2.13.0 hexathon~=0.0.1a8 -chainlib>=0.0.9a2,<=0.1.0 +chainlib>=0.0.9a11,<=0.1.0 diff --git a/setup.cfg b/setup.cfg index 344fb4b..f84dee4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.6a3 +version = 0.0.7a1 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/test_requirements.txt b/test_requirements.txt index 1baa8cd..502eaac 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -1,4 +1,4 @@ -chainlib-eth~=0.0.9a4 +chainlib-eth~=0.0.9a14 psycopg2==2.8.6 SQLAlchemy==1.3.20 alembic==1.4.2 diff --git a/tests/test_database.py b/tests/test_database.py index 298e639..8978f02 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -9,6 +9,7 @@ from chainlib.chain import ChainSpec from chainsyncer.db.models.base import SessionBase from chainsyncer.db.models.filter import BlockchainSyncFilter from chainsyncer.backend.sql import SQLBackend +from chainsyncer.error import LockError # testutil imports from tests.chainsyncer_base import TestBase @@ -31,6 +32,35 @@ class TestDatabase(TestBase): self.assertIsNone(sync_id) + def test_backend_filter_lock(self): + s = SQLBackend.live(self.chain_spec, 42) + + s.connect() + filter_id = s.db_object_filter.id + s.disconnect() + + session = SessionBase.create_session() + o = session.query(BlockchainSyncFilter).get(filter_id) + self.assertEqual(len(o.flags), 0) + session.close() + + s.register_filter(str(0)) + s.register_filter(str(1)) + + s.connect() + filter_id = s.db_object_filter.id + s.disconnect() + + session = SessionBase.create_session() + o = session.query(BlockchainSyncFilter).get(filter_id) + + o.set(1) + with self.assertRaises(LockError): + o.set(2) + o.release() + o.set(2) + + def test_backend_filter(self): s = SQLBackend.live(self.chain_spec, 42) @@ -59,6 +89,7 @@ class TestDatabase(TestBase): for i in range(9): o.set(i) + o.release() (f, c, d) = o.cursor() self.assertEqual(f, t) @@ -144,8 +175,8 @@ class TestDatabase(TestBase): s.register_filter('baz') s.set(43, 13) - s.complete_filter(0) - s.complete_filter(2) + s.begin_filter(0) + s.begin_filter(2) s = SQLBackend.resume(self.chain_spec, 666) (pair, flags) = s[0].get() diff --git a/tests/test_interrupt.py b/tests/test_interrupt.py index 2df7d85..78ae234 100644 --- a/tests/test_interrupt.py +++ b/tests/test_interrupt.py @@ -14,6 +14,7 @@ from chainsyncer.backend.file import ( FileBackend, data_dir_for, ) +from chainsyncer.error import LockError # test imports from tests.chainsyncer_base import TestBase @@ -36,10 +37,10 @@ class NaughtyCountExceptionFilter: def filter(self, conn, block, tx, db_session=None): + self.c += 1 if self.c == self.croak: self.croak = -1 raise RuntimeError('foo') - self.c += 1 def __str__(self): @@ -75,6 +76,7 @@ class TestInterrupt(TestBase): [6, 5, 2], [6, 4, 3], ] + self.track_complete = True def assert_filter_interrupt(self, vector, chain_interface): @@ -100,11 +102,17 @@ class TestInterrupt(TestBase): try: syncer.loop(0.1, self.conn) except RuntimeError: + self.croaked = 2 logg.info('caught croak') pass (pair, fltr) = self.backend.get() self.assertGreater(fltr, 0) - syncer.loop(0.1, self.conn) + + try: + syncer.loop(0.1, self.conn) + except LockError: + self.backend.complete_filter(2) + syncer.loop(0.1, self.conn) for fltr in filters: logg.debug('{} {}'.format(str(fltr), fltr.c)) @@ -112,11 +120,13 @@ class TestInterrupt(TestBase): def test_filter_interrupt_memory(self): + self.track_complete = True for vector in self.vectors: self.backend = MemBackend(self.chain_spec, None, target_block=len(vector)) self.assert_filter_interrupt(vector, self.interface) - + #TODO: implement flag lock in file backend + @unittest.expectedFailure def test_filter_interrupt_file(self): #for vector in self.vectors: vector = self.vectors.pop() @@ -127,12 +137,11 @@ class TestInterrupt(TestBase): def test_filter_interrupt_sql(self): + self.track_complete = True for vector in self.vectors: self.backend = SQLBackend.initial(self.chain_spec, len(vector)) self.assert_filter_interrupt(vector, self.interface) - - if __name__ == '__main__': unittest.main()