diff --git a/chainqueue/db/models/otx.py b/chainqueue/db/models/otx.py index 2729b63..7abfaf6 100644 --- a/chainqueue/db/models/otx.py +++ b/chainqueue/db/models/otx.py @@ -140,6 +140,10 @@ class Otx(SessionBase): if is_error_status(self.status): SessionBase.release_session(session) raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status))) + if not self.status & StatusBits.RESERVED: + SessionBase.release_session(session) + raise TxStateChangeError('FUBAR on tx that has not been RESEREVED ({})'.format(status_str(self.status))) + self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session) @@ -168,6 +172,11 @@ class Otx(SessionBase): if is_error_status(self.status): SessionBase.release_session(session) raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status))) + if not self.status & StatusBits.RESERVED: + SessionBase.release_session(session) + raise TxStateChangeError('REJECTED on tx that has not been RESEREVED ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session) @@ -294,6 +303,10 @@ class Otx(SessionBase): SessionBase.release_session(session) raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if not self.status & StatusBits.RESERVED: + SessionBase.release_session(session) + raise TxStateChangeError('SENT on tx that has not been RESEREVED ({})'.format(status_str(self.status))) + self.__set_status(StatusBits.IN_NETWORK, session) self.__reset_status(StatusBits.RESERVED | StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session) @@ -321,6 +334,9 @@ class Otx(SessionBase): if self.status & StatusBits.IN_NETWORK: SessionBase.release_session(session) raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) + if not self.status & StatusBits.RESERVED: + SessionBase.release_session(session) + raise TxStateChangeError('SENDFAIL on tx that has not been RESEREVED ({})'.format(status_str(self.status))) self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session) self.__reset_status(StatusBits.RESERVED | StatusBits.QUEUED | StatusBits.GAS_ISSUES, session) @@ -338,17 +354,20 @@ class Otx(SessionBase): :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. """ - if self.__status_not_set(StatusBits.QUEUED): + if self.__status_already_set(StatusBits.RESERVED): return session = SessionBase.bind_session(session) + if self.status & StatusBits.QUEUED == 0: + SessionBase.release_session(session) + raise TxStateChangeError('RESERVED cannot be set on an entry without QUEUED state set ({})'.format(status_str(self.status))) if self.status & StatusBits.FINAL: SessionBase.release_session(session) - raise TxStateChangeError('QUEUED cannot be unset on an entry with FINAL state set ({})'.format(status_str(self.status))) + raise TxStateChangeError('RESERVED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) if self.status & StatusBits.IN_NETWORK: SessionBase.release_session(session) - raise TxStateChangeError('QUEUED cannot be unset on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) + raise TxStateChangeError('RESERVED cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) self.__reset_status(StatusBits.QUEUED, session) self.__set_status(StatusBits.RESERVED, session) diff --git a/chainqueue/state.py b/chainqueue/state.py index 856a1ba..d0b1fc7 100644 --- a/chainqueue/state.py +++ b/chainqueue/state.py @@ -60,7 +60,7 @@ def set_sent(tx_hash, fail=False): return tx_hash -def set_final(tx_hash, block=None, fail=False, cancel_obsoletes=True): +def set_final(tx_hash, block=None, fail=False): """Used to set the status of an incoming transaction result. :param tx_hash: Transaction hash of record to modify @@ -113,7 +113,7 @@ def set_cancel(tx_hash, manual=False): """ session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + o = Otx.load(tx_hash, session=session) if o == None: session.close() raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) @@ -146,7 +146,7 @@ def set_rejected(tx_hash): """ session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + o = Otx.load(tx_hash, session=session) if o == None: session.close() raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) @@ -171,7 +171,7 @@ def set_fubar(tx_hash): """ session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + o = Otx.load(tx_hash, session=session) if o == None: session.close() raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) diff --git a/tests/test_otx.py b/tests/test_otx.py index 1ce9d68..9aa6301 100644 --- a/tests/test_otx.py +++ b/tests/test_otx.py @@ -59,5 +59,105 @@ class TestOtx(TestBase): self.assertFalse(is_error_status(otx.status)) + def test_send_fail_and_retry(self): + set_ready(self.tx_hash) + otx = Otx.load(self.tx_hash, session=self.session) + self.assertEqual(otx.status, StatusBits.QUEUED) + + set_reserved(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status, StatusBits.RESERVED) + + set_sent(self.tx_hash, fail=True) + self.session.refresh(otx) + self.assertTrue(is_error_status(otx.status)) + + set_ready(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status & StatusBits.QUEUED, StatusBits.QUEUED) + self.assertTrue(is_error_status(otx.status)) + + set_reserved(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status & StatusBits.RESERVED, StatusBits.RESERVED) + self.assertTrue(is_error_status(otx.status)) + + set_sent(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status, StatusBits.IN_NETWORK) + self.assertFalse(is_error_status(otx.status)) + + set_final(self.tx_hash, block=1024) + self.session.refresh(otx) + self.assertFalse(is_alive(otx.status)) + self.assertFalse(is_error_status(otx.status)) + + + def test_fubar(self): + set_ready(self.tx_hash) + otx = Otx.load(self.tx_hash, session=self.session) + self.assertEqual(otx.status, StatusBits.QUEUED) + + set_reserved(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status & StatusBits.RESERVED, StatusBits.RESERVED) + + set_fubar(self.tx_hash) + self.session.refresh(otx) + self.assertTrue(is_error_status(otx.status)) + self.assertEqual(otx.status & StatusBits.UNKNOWN_ERROR, StatusBits.UNKNOWN_ERROR) + + + def test_reject(self): + set_ready(self.tx_hash) + otx = Otx.load(self.tx_hash, session=self.session) + self.assertEqual(otx.status, StatusBits.QUEUED) + + set_reserved(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status & StatusBits.RESERVED, StatusBits.RESERVED) + + set_rejected(self.tx_hash) + self.session.refresh(otx) + self.assertTrue(is_error_status(otx.status)) + self.assertEqual(otx.status & StatusBits.NODE_ERROR, StatusBits.NODE_ERROR) + + + def test_final_fail(self): + set_ready(self.tx_hash) + set_reserved(self.tx_hash) + set_sent(self.tx_hash) + set_final(self.tx_hash, block=1042, fail=True) + otx = Otx.load(self.tx_hash, session=self.session) + self.assertFalse(is_alive(otx.status)) + self.assertTrue(is_error_status(otx.status)) + + + def test_final_protected(self): + set_ready(self.tx_hash) + set_reserved(self.tx_hash) + set_sent(self.tx_hash) + set_final(self.tx_hash, block=1042) + + otx = Otx.load(self.tx_hash, session=self.session) + self.assertEqual(otx.status & StatusBits.FINAL, StatusBits.FINAL) + + with self.assertRaises(TxStateChangeError): + set_ready(self.tx_hash) + + with self.assertRaises(TxStateChangeError): + set_fubar(self.tx_hash) + + with self.assertRaises(TxStateChangeError): + set_rejected(self.tx_hash) + + set_cancel(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status & StatusBits.OBSOLETE, 0) + + with self.assertRaises(TxStateChangeError): + set_reserved(self.tx_hash) + + if __name__ == '__main__': unittest.main()