diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index fdafda9..4cfde9d 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -121,7 +121,7 @@ class SyncerBackend: """ self.connect() target = self.db_object.target() - (filter_state, count, digest) = self.db_object_filter.target() + (filter_target, count, digest) = self.db_object_filter.target() self.disconnect() return (target, filter_target,) @@ -190,12 +190,22 @@ class SyncerBackend: object_id = None + highest_unsynced_block = 0 + highest_unsynced_tx = 0 + object_id = BlockchainSync.get_last(session=session, live=False) + if object_id != None: + q = session.query(BlockchainSync) + o = q.get(object_id) + (highest_unsynced_block, highest_unsynced_index) = o.cursor() + for object_id in BlockchainSync.get_unsynced(session=session): logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) - syncers.append(SyncerBackend(chain_spec, object_id)) + s = SyncerBackend(chain_spec, object_id) + syncers.append(s) - last_live_id = BlockchainSync.get_last_live(block_height, session=session) + last_live_id = BlockchainSync.get_last(session=session) logg.debug('last_live_id {}'.format(last_live_id)) + logg.debug('higesst {} {}'.format(highest_unsynced_block, highest_unsynced_tx)) if last_live_id != None: q = session.query(BlockchainSync) @@ -204,7 +214,8 @@ class SyncerBackend: (block_resume, tx_resume) = o.cursor() session.flush() - if block_height != block_resume: + #if block_height != block_resume: + if highest_unsynced_block < block_resume: q = session.query(BlockchainSyncFilter) q = q.filter(BlockchainSyncFilter.chain_sync_id==last_live_id) diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py index 01c28c1..ab6ab88 100644 --- a/chainsyncer/db/models/sync.py +++ b/chainsyncer/db/models/sync.py @@ -61,11 +61,9 @@ class BlockchainSync(SessionBase): @staticmethod - def get_last_live(current, session=None): + def get_last(session=None, live=True): """Get the most recent open-ended ("live") syncer record. - :param current: Current block number - :type current: number :param session: Session to use. If not specified, a separate session will be created for this method only. :type session: SqlAlchemy Session :returns: Block and transaction number, respectively @@ -74,7 +72,10 @@ class BlockchainSync(SessionBase): session = SessionBase.bind_session(session) q = session.query(BlockchainSync.id) - q = q.filter(BlockchainSync.block_target==None) + if live: + q = q.filter(BlockchainSync.block_target==None) + else: + q = q.filter(BlockchainSync.block_target!=None) q = q.order_by(BlockchainSync.date_created.desc()) object_id = q.first() @@ -169,3 +170,20 @@ class BlockchainSync(SessionBase): self.block_target = block_target self.date_created = datetime.datetime.utcnow() self.date_updated = datetime.datetime.utcnow() + + + def __str__(self): + return """object_id: {} +start: {}:{} +cursor: {}:{} +target: {} +""".format( + self.id, + self.block_start, + self.tx_start, + self.block_cursor, + self.tx_cursor, + self.block_target, + ) + + diff --git a/tests/test_database.py b/tests/test_database.py index 5066f69..9797b18 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -67,7 +67,6 @@ class TestDatabase(TestBase): session.close() - def test_backend_retrieve(self): s = SyncerBackend.live(self.chain_spec, 42) s.register_filter('foo') @@ -107,15 +106,46 @@ class TestDatabase(TestBase): self.assertEqual(len(s), 1) resumed_id = s[0].object_id self.assertEqual(resumed_id, original_id + 1) + self.assertEqual(s[0].get(), ((42, 0), 0)) + def test_backend_resume_when_completed(self): + s = SyncerBackend.live(self.chain_spec, 42) + + s = SyncerBackend.resume(self.chain_spec, 666) + s[0].set(666, 0) + + s = SyncerBackend.resume(self.chain_spec, 666) + self.assertEqual(len(s), 0) + + def test_backend_resume_several(self): s = SyncerBackend.live(self.chain_spec, 42) s.set(43, 13) + s = SyncerBackend.resume(self.chain_spec, 666) + SyncerBackend.live(self.chain_spec, 666) s[0].set(123, 2) + + logg.debug('>>>>>') s = SyncerBackend.resume(self.chain_spec, 1024) + SyncerBackend.live(self.chain_spec, 1024) + s[0].connect() + logg.debug('syncer 1 {}'.format(s[0].db_object)) + s[0].disconnect() + s[1].connect() + logg.debug('syncer 2 {}'.format(s[1].db_object)) + s[1].disconnect() + + + self.assertEqual(len(s), 2) + self.assertEqual(s[0].target(), (666, 0)) + self.assertEqual(s[0].get(), ((123, 2), 0)) + self.assertEqual(s[1].target(), (1024, 0)) + self.assertEqual(s[1].get(), ((666, 0), 0)) + + if __name__ == '__main__': unittest.main()