From d27bcaa9f5142eea9ef1685f8e3a25dca878fced Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 20 Apr 2022 15:15:43 +0000 Subject: [PATCH] Factor out common store tests, implement for fs and rocksdb --- chainsyncer/store/rocksdb.py | 13 +- chainsyncer/unittest/db.py | 64 -------- chainsyncer/unittest/store.py | 267 ++++++++++++++++++++++++++++++++++ tests/test_fs.py | 243 ++----------------------------- tests/test_rocksdb.py | 34 ++--- 5 files changed, 305 insertions(+), 316 deletions(-) delete mode 100644 chainsyncer/unittest/db.py create mode 100644 chainsyncer/unittest/store.py diff --git a/chainsyncer/store/rocksdb.py b/chainsyncer/store/rocksdb.py index cd3a5d9..d0cce28 100644 --- a/chainsyncer/store/rocksdb.py +++ b/chainsyncer/store/rocksdb.py @@ -36,17 +36,17 @@ class SyncRocksDbStore(SyncStore): def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None): super(SyncRocksDbStore, self).__init__(base_path, session_id=session_id) - factory = RocksDbStoreFactory(self.session_path, binary=True) - prefix_factory = RocksDbStoreAdder(factory, 'sync') + self.factory = RocksDbStoreFactory(self.session_path, binary=True) + prefix_factory = RocksDbStoreAdder(self.factory, 'sync') self.setup_sync_state(prefix_factory, state_event_callback) - prefix_factory = RocksDbStoreAdder(factory, 'filter') + prefix_factory = RocksDbStoreAdder(self.factory, 'filter') self.setup_filter_state(prefix_factory, filter_state_event_callback) self.session_id = os.path.basename(self.session_path) logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path)) - self.target_db = RocksDbStoreAdder(factory, '.stat').add('target') + self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target') def get_target(self): @@ -55,3 +55,8 @@ class SyncRocksDbStore(SyncStore): def set_target(self, v): self.target_db.put('target') + + + def disconnect(self): + super(SyncRocksDbStore, self).disconnect() + self.factory.close() diff --git a/chainsyncer/unittest/db.py b/chainsyncer/unittest/db.py deleted file mode 100644 index 094acfe..0000000 --- a/chainsyncer/unittest/db.py +++ /dev/null @@ -1,64 +0,0 @@ -# standard imports -import logging -import os - -# external imports -import alembic -import alembic.config - -# local imports -from chainsyncer.db.models.base import SessionBase -from chainsyncer.db import dsn_from_config -from chainsyncer.db.models.base import SessionBase - -logg = logging.getLogger(__name__) - - -class ChainSyncerDb: - """SQLITE database setup for unit tests - - :param debug: Activate sql level debug (outputs sql statements) - :type debug: bool - """ - - base = SessionBase - - def __init__(self, debug=False): - config = { - 'DATABASE_ENGINE': 'sqlite', - 'DATABASE_DRIVER': 'pysqlite', - 'DATABASE_NAME': 'chainsyncer.sqlite', - } - logg.debug('config {}'.format(config)) - - self.dsn = dsn_from_config(config) - - self.base.poolable = False - self.base.transactional = False - self.base.procedural = False - self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0 - - rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..') - dbdir = os.path.join(rootdir, 'chainsyncer', 'db') - #migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE')) - migrationsdir = os.path.join(dbdir, 'migrations', 'default') - logg.info('using migrations directory {}'.format(migrationsdir)) - - ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini')) - ac.set_main_option('sqlalchemy.url', self.dsn) - ac.set_main_option('script_location', migrationsdir) - - alembic.command.downgrade(ac, 'base') - alembic.command.upgrade(ac, 'head') - - - def bind_session(self, session=None): - """Create session using underlying session base - """ - return self.base.bind_session(session) - - - def release_session(self, session=None): - """Release session using underlying session base - """ - return self.base.release_session(session) diff --git a/chainsyncer/unittest/store.py b/chainsyncer/unittest/store.py new file mode 100644 index 0000000..3dba999 --- /dev/null +++ b/chainsyncer/unittest/store.py @@ -0,0 +1,267 @@ +# standard imports +import os +import stat +import unittest +import shutil +import tempfile +import logging + +# local imports +from chainsyncer.session import SyncSession +from chainsyncer.error import ( + LockError, + FilterDone, + IncompleteFilterError, + SyncDone, + ) +from chainsyncer.unittest import MockFilter + +logg = logging.getLogger(__name__) + + +class TestStoreBase(unittest.TestCase): + + def setUp(self): + self.path = tempfile.mkdtemp() + self.store_factory = None + + + @classmethod + def link(cls, target): + for v in [ + "default", + "store_start", + "store_resume", + "sync_process_nofilter", + "sync_process_onefilter", + "sync_process_outoforder", + "sync_process_interrupt", + "sync_process_reset", + "sync_process_done", + "sync_head_future", + "sync_history_interrupted", + "sync_history_complete", + ]: + setattr(target, 'test_' + v, getattr(cls, 't_' + v)) + + + def tearDown(self): + shutil.rmtree(self.path) + + + def t_default(self): + store = self.store_factory() + + fp = os.path.join(self.path, store.session_id) + session_id = store.session_id + st = os.stat(fp) + self.assertTrue(stat.S_ISDIR(st.st_mode)) + self.assertTrue(store.is_default) + + fpd = os.path.join(self.path, 'default') + st = os.stat(fpd) + self.assertTrue(stat.S_ISDIR(st.st_mode)) + self.assertTrue(store.is_default) + + fpd = os.path.realpath(fpd) + self.assertEqual(fpd, fp) + + store.disconnect() + store = self.store_factory() + fpr = os.path.join(self.path, session_id) + self.assertEqual(fp, fpr) + self.assertTrue(store.is_default) + + store.disconnect() + store = self.store_factory('default') + fpr = os.path.join(self.path, session_id) + self.assertEqual(fp, fpr) + self.assertTrue(store.is_default) + + store.disconnect() + store = self.store_factory('foo') + fpf = os.path.join(self.path, 'foo') + st = os.stat(fpf) + self.assertTrue(stat.S_ISDIR(st.st_mode)) + self.assertFalse(store.is_default) + + + def t_store_start(self): + store = self.store_factory() + store.start(42) + self.assertTrue(store.first) + + store.disconnect() + store = self.store_factory() + store.start() + self.assertFalse(store.first) + + + def t_store_resume(self): + store = self.store_factory() + store.start(13) + self.assertTrue(store.first) + # todo not done + + + def t_sync_process_nofilter(self): + store = self.store_factory() + session = SyncSession(store) + session.start() + o = session.get(0) + with self.assertRaises(FilterDone): + o.advance() + + + def t_sync_process_onefilter(self): + store = self.store_factory() + session = SyncSession(store) + + fltr_one = MockFilter('foo') + store.register(fltr_one) + + session.start() + o = session.get(0) + o.advance() + o.release() + + + def t_sync_process_outoforder(self): + store = self.store_factory() + session = SyncSession(store) + + fltr_one = MockFilter('foo') + store.register(fltr_one) + fltr_two = MockFilter('two') + store.register(fltr_two) + + session.start() + o = session.get(0) + o.advance() + with self.assertRaises(LockError): + o.advance() + + o.release() + with self.assertRaises(LockError): + o.release() + + o.advance() + o.release() + + + def t_sync_process_interrupt(self): + store = self.store_factory() + session = SyncSession(store) + + fltr_one = MockFilter('foo') + store.register(fltr_one) + fltr_two = MockFilter('bar') + store.register(fltr_two) + + session.start() + o = session.get(0) + o.advance() + o.release(interrupt=True) + with self.assertRaises(FilterDone): + o.advance() + + + def t_sync_process_reset(self): + store = self.store_factory() + session = SyncSession(store) + + fltr_one = MockFilter('foo') + store.register(fltr_one) + fltr_two = MockFilter('bar') + store.register(fltr_two) + + session.start() + o = session.get(0) + o.advance() + with self.assertRaises(LockError): + o.reset() + o.release() + with self.assertRaises(IncompleteFilterError): + o.reset() + + o.advance() + o.release() + + with self.assertRaises(FilterDone): + o.advance() + + o.reset() + + + def t_sync_process_done(self): + store = self.store_factory() + session = SyncSession(store) + + fltr_one = MockFilter('foo') + store.register(fltr_one) + + session.start(target=0) + o = session.get(0) + o.advance() + o.release() + with self.assertRaises(FilterDone): + o.advance() + o.reset() + with self.assertRaises(SyncDone): + o.next(advance_block=True) + + + def t_sync_head_future(self): + store = self.store_factory('foo') + session = SyncSession(store) + + session.start() + o = session.get(0) + o.next(advance_block=True) + o.next(advance_block=True) + session.stop(o) + + store.disconnect() + store = self.store_factory('foo') + store.start() + o = store.get(2) + + + def t_sync_history_interrupted(self): + store = self.store_factory('foo') + session = SyncSession(store) + + session.start(target=13) + o = session.get(0) + o.next(advance_block=True) + o.next(advance_block=True) + session.stop(o) + + store.disconnect() + store = self.store_factory('foo') + store.start() + o = store.get(0) + self.assertEqual(o.cursor, 2) + self.assertEqual(o.target, 13) + o.next(advance_block=True) + o.next(advance_block=True) + + session.stop(o) + store.disconnect() + store = self.store_factory('foo') + store.start() + self.assertEqual(o.cursor, 4) + self.assertEqual(o.target, 13) + + + def t_sync_history_complete(self): + store = self.store_factory('foo') + session = SyncSession(store) + + session.start(target=3) + o = session.get(0) + o.next(advance_block=True) + o.next(advance_block=True) + o.next(advance_block=True) + with self.assertRaises(SyncDone): + o.next(advance_block=True) diff --git a/tests/test_fs.py b/tests/test_fs.py index 7415913..3845c74 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -1,245 +1,32 @@ # standard imports import unittest -import tempfile -import shutil import logging -import stat -import os # local imports from chainsyncer.store.fs import SyncFsStore -from chainsyncer.session import SyncSession -from chainsyncer.error import ( - LockError, - FilterDone, - IncompleteFilterError, - SyncDone, - ) -from chainsyncer.unittest import MockFilter +from chainsyncer.unittest.store import TestStoreBase logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() -class TestFs(unittest.TestCase): +class StoreFactory: + + def __init__(self, path): + self.path = path + + + def create(self, session_id=None): + return SyncFsStore(self.path, session_id=session_id) + + +class TestFs(TestStoreBase): def setUp(self): - self.path = tempfile.mkdtemp() - - - def tearDown(self): - shutil.rmtree(self.path) - - - def test_default(self): - store = SyncFsStore(self.path) - fp = os.path.join(self.path, store.session_id) - session_id = store.session_id - st = os.stat(fp) - self.assertTrue(stat.S_ISDIR(st.st_mode)) - self.assertTrue(store.is_default) - - fpd = os.path.join(self.path, 'default') - st = os.stat(fpd) - self.assertTrue(stat.S_ISDIR(st.st_mode)) - self.assertTrue(store.is_default) - - fpd = os.path.realpath(fpd) - self.assertEqual(fpd, fp) - - store = SyncFsStore(self.path) - fpr = os.path.join(self.path, session_id) - self.assertEqual(fp, fpr) - self.assertTrue(store.is_default) - - store = SyncFsStore(self.path, 'default') - fpr = os.path.join(self.path, session_id) - self.assertEqual(fp, fpr) - self.assertTrue(store.is_default) - - store = SyncFsStore(self.path, 'foo') - fpf = os.path.join(self.path, 'foo') - st = os.stat(fpf) - self.assertTrue(stat.S_ISDIR(st.st_mode)) - self.assertFalse(store.is_default) - - - def test_store_start(self): - store = SyncFsStore(self.path) - store.start(42) - self.assertTrue(store.first) - - store = SyncFsStore(self.path) - store.start() - self.assertFalse(store.first) - - - def test_store_resume(self): - store = SyncFsStore(self.path) - store.start(13) - self.assertTrue(store.first) - # todo not done - - - def test_sync_process_nofilter(self): - store = SyncFsStore(self.path) - session = SyncSession(store) - session.start() - o = session.get(0) - with self.assertRaises(FilterDone): - o.advance() - - - def test_sync_process_onefilter(self): - store = SyncFsStore(self.path) - session = SyncSession(store) - - fltr_one = MockFilter('foo') - store.register(fltr_one) - - session.start() - o = session.get(0) - o.advance() - o.release() - - - def test_sync_process_outoforder(self): - store = SyncFsStore(self.path) - session = SyncSession(store) - - fltr_one = MockFilter('foo') - store.register(fltr_one) - fltr_two = MockFilter('two') - store.register(fltr_two) - - session.start() - o = session.get(0) - o.advance() - with self.assertRaises(LockError): - o.advance() - - o.release() - with self.assertRaises(LockError): - o.release() - - o.advance() - o.release() - - - def test_sync_process_interrupt(self): - store = SyncFsStore(self.path) - session = SyncSession(store) - - fltr_one = MockFilter('foo') - store.register(fltr_one) - fltr_two = MockFilter('bar') - store.register(fltr_two) - - session.start() - o = session.get(0) - o.advance() - o.release(interrupt=True) - with self.assertRaises(FilterDone): - o.advance() - - - def test_sync_process_reset(self): - store = SyncFsStore(self.path) - session = SyncSession(store) - - fltr_one = MockFilter('foo') - store.register(fltr_one) - fltr_two = MockFilter('bar') - store.register(fltr_two) - - session.start() - o = session.get(0) - o.advance() - with self.assertRaises(LockError): - o.reset() - o.release() - with self.assertRaises(IncompleteFilterError): - o.reset() - - o.advance() - o.release() - - with self.assertRaises(FilterDone): - o.advance() - - o.reset() - - - def test_sync_process_done(self): - store = SyncFsStore(self.path) - session = SyncSession(store) - - fltr_one = MockFilter('foo') - store.register(fltr_one) - - session.start(target=0) - o = session.get(0) - o.advance() - o.release() - with self.assertRaises(FilterDone): - o.advance() - o.reset() - with self.assertRaises(SyncDone): - o.next(advance_block=True) - - - def test_sync_head_future(self): - store = SyncFsStore(self.path, session_id='foo') - session = SyncSession(store) - - session.start() - o = session.get(0) - o.next(advance_block=True) - o.next(advance_block=True) - session.stop(o) - - store = SyncFsStore(self.path, session_id='foo') - store.start() - o = store.get(2) - - - def test_sync_history_interrupted(self): - store = SyncFsStore(self.path, session_id='foo') - session = SyncSession(store) - - session.start(target=13) - o = session.get(0) - o.next(advance_block=True) - o.next(advance_block=True) - session.stop(o) - - store = SyncFsStore(self.path, session_id='foo') - store.start() - o = store.get(0) - self.assertEqual(o.cursor, 2) - self.assertEqual(o.target, 13) - o.next(advance_block=True) - o.next(advance_block=True) - - session.stop(o) - store = SyncFsStore(self.path, session_id='foo') - store.start() - self.assertEqual(o.cursor, 4) - self.assertEqual(o.target, 13) - - - def test_sync_history_complete(self): - store = SyncFsStore(self.path, session_id='foo') - session = SyncSession(store) - - session.start(target=3) - o = session.get(0) - o.next(advance_block=True) - o.next(advance_block=True) - o.next(advance_block=True) - with self.assertRaises(SyncDone): - o.next(advance_block=True) + super(TestFs, self).setUp() + self.store_factory = StoreFactory(self.path).create if __name__ == '__main__': + TestStoreBase.link(TestFs) unittest.main() diff --git a/tests/test_rocksdb.py b/tests/test_rocksdb.py index 86f851e..0b7bd38 100644 --- a/tests/test_rocksdb.py +++ b/tests/test_rocksdb.py @@ -1,37 +1,31 @@ # standard imports import unittest -import tempfile -import shutil import logging -import stat -import os # local imports from chainsyncer.store.rocksdb import SyncRocksDbStore -from chainsyncer.session import SyncSession -from chainsyncer.error import ( - LockError, - FilterDone, - IncompleteFilterError, - SyncDone, - ) -from chainsyncer.unittest import MockFilter +from chainsyncer.unittest.store import TestStoreBase logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() +class StoreFactory: -class TestFs(unittest.TestCase): + def __init__(self, path): + self.path = path + + + def create(self, session_id=None): + return SyncRocksDbStore(self.path, session_id=session_id) + + +class TestRocksDb(TestStoreBase): def setUp(self): - self.path = tempfile.mkdtemp() - - - def test_default(self): - store = SyncRocksDbStore(self.path) - store.start(42) - self.assertTrue(store.first) + super(TestRocksDb, self).setUp() + self.store_factory = StoreFactory(self.path).create if __name__ == '__main__': + TestStoreBase.link(TestRocksDb) unittest.main()