diff --git a/apps/cic-eth/cic_eth/admin/nonce.py b/apps/cic-eth/cic_eth/admin/nonce.py index 5da7e877..d250df09 100644 --- a/apps/cic-eth/cic_eth/admin/nonce.py +++ b/apps/cic-eth/cic_eth/admin/nonce.py @@ -5,11 +5,13 @@ import logging import celery from chainlib.chain import ChainSpec from chainlib.eth.tx import unpack +from chainqueue.query import get_tx +from chainqueue.state import set_cancel +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache # local imports from cic_eth.db.models.base import SessionBase -from cic_eth.db.models.otx import Otx -from cic_eth.db.models.tx import TxCache from cic_eth.db.models.nonce import Nonce from cic_eth.admin.ctrl import ( lock_send, @@ -17,14 +19,8 @@ from cic_eth.admin.ctrl import ( lock_queue, unlock_queue, ) -from cic_eth.queue.tx import ( - get_tx, - set_cancel, - ) -from cic_eth.queue.tx import create as queue_create -from cic_eth.eth.gas import ( - create_check_gas_task, - ) +from cic_eth.queue.tx import queue_create +from cic_eth.eth.gas import create_check_gas_task celery_app = celery.current_app logg = logging.getLogger() diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index 67be4176..f62a10b7 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -22,23 +22,21 @@ from hexathon import ( add_0x, ) from chainlib.eth.gas import balance - -# local imports -from cic_eth.db.models.base import SessionBase -from cic_eth.db.models.role import AccountRole -from cic_eth.db.models.otx import Otx -from cic_eth.db.models.tx import TxCache -from cic_eth.db.models.nonce import Nonce -from cic_eth.db.enum import ( +from chainqueue.db.enum import ( StatusEnum, StatusBits, is_alive, is_error_status, status_str, ) +from chainqueue.error import TxStateChangeError + +# local imports +from cic_eth.db.models.base import SessionBase +from cic_eth.db.models.role import AccountRole +from cic_eth.db.models.nonce import Nonce from cic_eth.error import InitializationError -from cic_eth.db.error import TxStateChangeError -from cic_eth.queue.tx import get_tx +from cic_eth.queue.query import get_tx app = celery.current_app @@ -138,8 +136,9 @@ class AdminApi: logg.debug('resend {}'.format(tx_hash_hex)) s_get_tx_cache = celery.signature( - 'cic_eth.queue.tx.get_tx_cache', + 'cic_eth.queue.query.get_tx_cache', [ + chain_spec.asdict(), tx_hash_hex, ], queue=self.queue, @@ -188,8 +187,9 @@ class AdminApi: def check_nonce(self, address): s = celery.signature( - 'cic_eth.queue.tx.get_account_tx', + 'cic_eth.queue.query.get_account_tx', [ + chain_spec.asdict(), address, True, False, @@ -204,8 +204,9 @@ class AdminApi: last_nonce = -1 for k in txs.keys(): s_get_tx = celery.signature( - 'cic_eth.queue.tx.get_tx', + 'cic_eth.queue.query.get_tx', [ + chain_spec.asdict(), k, ], queue=self.queue, @@ -243,8 +244,9 @@ class AdminApi: def fix_nonce(self, address, nonce, chain_spec): s = celery.signature( - 'cic_eth.queue.tx.get_account_tx', + 'cic_eth.queue.query.get_account_tx', [ + chain_spec.asdict(), address, True, False, @@ -294,8 +296,9 @@ class AdminApi: """ last_nonce = -1 s = celery.signature( - 'cic_eth.queue.tx.get_account_tx', + 'cic_eth.queue.query.get_account_tx', [ + chain_spec.asdict(), address, ], queue=self.queue, @@ -306,8 +309,11 @@ class AdminApi: for tx_hash in txs.keys(): errors = [] s = celery.signature( - 'cic_eth.queue.tx.get_tx_cache', - [tx_hash], + 'cic_eth.queue.query.get_tx_cache', + [ + chain_spec.asdict(), + tx_hash, + ], queue=self.queue, ) tx_dict = s.apply_async().get() @@ -367,12 +373,16 @@ class AdminApi: #tx_hash = self.w3.keccak(hexstr=tx_raw).hex() s = celery.signature( - 'cic_eth.queue.tx.get_tx_cache', - [tx_hash], + 'cic_eth.queue.query.get_tx_cache', + [ + chain_spec.asdict(), + tx_hash, + ], queue=self.queue, ) - tx = s.apply_async().get() + t = s.apply_async() + tx = t.get() source_token = None if tx['source_token'] != ZERO_ADDRESS: @@ -524,8 +534,9 @@ class AdminApi: tx['data'] = tx_unpacked['data'] s = celery.signature( - 'cic_eth.queue.tx.get_state_log', + 'cic_eth.queue.state.get_state_log', [ + chain_spec.asdict(), tx_hash, ], queue=self.queue, diff --git a/apps/cic-eth/cic_eth/api/api_task.py b/apps/cic-eth/cic_eth/api/api_task.py index cbe5c391..e07b24f6 100644 --- a/apps/cic-eth/cic_eth/api/api_task.py +++ b/apps/cic-eth/cic_eth/api/api_task.py @@ -93,7 +93,9 @@ class Api: ) s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', - [], + [ + self.chain_spec.asdict(), + ], queue=self.queue, ) s_tokens = celery.signature( @@ -155,7 +157,9 @@ class Api: ) s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', - [], + [ + self.chain_spec.asdict(), + ], queue=self.queue, ) s_tokens = celery.signature( @@ -215,6 +219,7 @@ class Api: s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', [ + self.chain_spec.asdict(), from_address, ], queue=self.queue, @@ -361,6 +366,7 @@ class Api: s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', [ + self.chain_spec.asdict(), 'ACCOUNT_REGISTRY_WRITER', ], queue=self.queue, @@ -399,6 +405,7 @@ class Api: s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', [ + self.chain_spec.asdict(), 'GAS_GIFTER', ], queue=self.queue, @@ -439,8 +446,9 @@ class Api: """ offset = 0 s_local = celery.signature( - 'cic_eth.queue.tx.get_account_tx', + 'cic_eth.queue.query.get_account_tx', [ + self.chain_spec.asdict(), address, ], queue=self.queue, diff --git a/apps/cic-eth/cic_eth/db/__init__.py b/apps/cic-eth/cic_eth/db/__init__.py index 1a549508..62731220 100644 --- a/apps/cic-eth/cic_eth/db/__init__.py +++ b/apps/cic-eth/cic_eth/db/__init__.py @@ -11,10 +11,6 @@ logg = logging.getLogger() # an Engine, which the Session will use for connection # resources -# TODO: Remove the package exports, all models should be imported using full path -from .models.otx import Otx -from .models.convert import TxConvertTransfer - def dsn_from_config(config): """Generate a dsn string from the provided config dict. diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/75d4767b3031_lock.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/75d4767b3031_lock.py index 82e4384b..656fdd06 100644 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/75d4767b3031_lock.py +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/75d4767b3031_lock.py @@ -24,7 +24,7 @@ def upgrade(): sa.Column('blockchain', sa.String), sa.Column("flags", sa.BIGINT(), nullable=False, default=0), sa.Column("date_created", sa.DateTime, nullable=False), - sa.Column("otx_id", sa.Integer, nullable=True), + sa.Column("otx_id", sa.Integer, sa.ForeignKey('otx.id'), nullable=True), ) op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True) diff --git a/apps/cic-eth/cic_eth/db/models/lock.py b/apps/cic-eth/cic_eth/db/models/lock.py index 3737b893..d8dcb426 100644 --- a/apps/cic-eth/cic_eth/db/models/lock.py +++ b/apps/cic-eth/cic_eth/db/models/lock.py @@ -5,11 +5,11 @@ import logging # third-party imports from sqlalchemy import Column, String, Integer, DateTime, ForeignKey from chainlib.eth.constant import ZERO_ADDRESS +from chainqueue.db.models.tx import TxCache +from chainqueue.db.models.otx import Otx # local imports from cic_eth.db.models.base import SessionBase -from cic_eth.db.models.tx import TxCache -from cic_eth.db.models.otx import Otx logg = logging.getLogger() @@ -22,10 +22,12 @@ class Lock(SessionBase): __tablename__ = "lock" blockchain = Column(String) - address = Column(String, ForeignKey('tx_cache.sender')) + #address = Column(String, ForeignKey('tx_cache.sender')) + address = Column(String, ForeignKey(TxCache.sender)) flags = Column(Integer) date_created = Column(DateTime, default=datetime.datetime.utcnow) - otx_id = Column(Integer, ForeignKey('otx.id')) + otx_id = Column(Integer, ForeignKey(Otx.id)) + #otx_id = Column(Integer) def chain(self): diff --git a/apps/cic-eth/cic_eth/db/models/otx.py b/apps/cic-eth/cic_eth/db/models/otx.py deleted file mode 100644 index 6be5f53d..00000000 --- a/apps/cic-eth/cic_eth/db/models/otx.py +++ /dev/null @@ -1,680 +0,0 @@ -# standard imports -import datetime -import logging - -# external imports -from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey -from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method - -# local imports -from .base import SessionBase -from cic_eth.db.enum import ( - StatusEnum, - StatusBits, - status_str, - is_error_status, - ) -from cic_eth.db.error import TxStateChangeError - -logg = logging.getLogger() - - -class OtxStateLog(SessionBase): - - __tablename__ = 'otx_state_log' - - date = Column(DateTime, default=datetime.datetime.utcnow) - status = Column(Integer) - otx_id = Column(Integer, ForeignKey('otx.id')) - - - def __init__(self, otx): - self.otx_id = otx.id - self.status = otx.status - - -class Otx(SessionBase): - """Outgoing transactions with local origin. - - :param nonce: Transaction nonce - :type nonce: number - :param address: Ethereum address of recipient - NOT IN USE, REMOVE - :type address: str - :param tx_hash: Tranasction hash - :type tx_hash: str, 0x-hex - :param signed_tx: Signed raw transaction data - :type signed_tx: str, 0x-hex - """ - __tablename__ = 'otx' - - tracing = False - """Whether to enable queue state tracing""" - - nonce = Column(Integer) - date_created = Column(DateTime, default=datetime.datetime.utcnow) - tx_hash = Column(String(66)) - signed_tx = Column(Text) - status = Column(Integer) - block = Column(Integer) - - - def __set_status(self, status, session): - self.status |= status - session.add(self) - session.flush() - - - def __reset_status(self, status, session): - status_edit = ~status & self.status - self.status &= status_edit - session.add(self) - session.flush() - - - def __status_already_set(self, status): - r = bool(self.status & status) - if r: - logg.warning('status bit {} already set on {}'.format(status.name, self.tx_hash)) - return r - - - def __status_not_set(self, status): - r = not(self.status & status) - if r: - logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash)) - return r - - - def set_block(self, block, session=None): - """Set block number transaction was mined in. - - Only manipulates object, does not transaction or commit to backend. - - :param block: Block number - :type block: number - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - session = SessionBase.bind_session(session) - - if self.block != None: - SessionBase.release_session(session) - raise TxStateChangeError('Attempted set block {} when block was already {}'.format(block, self.block)) - self.block = block - session.add(self) - session.flush() - - SessionBase.release_session(session) - - - def waitforgas(self, session=None): - """Marks transaction as suspended pending gas funding. - - Only manipulates object, does not transaction or commit to backend. - - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - if self.__status_already_set(StatusBits.GAS_ISSUES): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if self.status & StatusBits.IN_NETWORK: - SessionBase.release_session(session) - raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.GAS_ISSUES, session) - self.__reset_status(StatusBits.QUEUED | StatusBits.DEFERRED, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def fubar(self, session=None): - """Marks transaction as "fubar." Any transaction marked this way is an anomaly and may be a symptom of a serious problem. - - Only manipulates object, does not transaction or commit to backend. - """ - if self.__status_already_set(StatusBits.UNKNOWN_ERROR): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if is_error_status(self.status): - SessionBase.release_session(session) - raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def reject(self, session=None): - """Marks transaction as "rejected," which means the node rejected sending the transaction to the network. The nonce has not been spent, and the transaction should be replaced. - - Only manipulates object, does not transaction or commit to backend. - """ - if self.__status_already_set(StatusBits.NODE_ERROR): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if self.status & StatusBits.IN_NETWORK: - SessionBase.release_session(session) - raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status))) - if is_error_status(self.status): - SessionBase.release_session(session) - raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def override(self, manual=False, session=None): - """Marks transaction as manually overridden. - - Only manipulates object, does not transaction or commit to backend. - """ - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if self.status & StatusBits.IN_NETWORK: - SessionBase.release_session(session) - raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status))) - if self.status & StatusBits.OBSOLETE: - SessionBase.release_session(session) - raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.OBSOLETE, session) - #if manual: - # self.__set_status(StatusBits.MANUAL, session) - self.__reset_status(StatusBits.QUEUED | StatusBits.IN_NETWORK, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def manual(self, session=None): - - session = SessionBase.bind_session(session) - - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.MANUAL, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - def retry(self, session=None): - """Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding. - - Only manipulates object, does not transaction or commit to backend. - - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - if self.__status_already_set(StatusBits.QUEUED): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0: - SessionBase.release_session(session) - raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.QUEUED, session) - self.__reset_status(StatusBits.GAS_ISSUES, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def readysend(self, session=None): - """Marks transaction as ready for initial send attempt. - - Only manipulates object, does not transaction or commit to backend. - - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - if self.__status_already_set(StatusBits.QUEUED): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if is_error_status(self.status): - SessionBase.release_session(session) - raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.QUEUED, session) - self.__reset_status(StatusBits.GAS_ISSUES, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def sent(self, session=None): - """Marks transaction as having been sent to network. - - Only manipulates object, does not transaction or commit to backend. - - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - if self.__status_already_set(StatusBits.IN_NETWORK): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.IN_NETWORK, session) - self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def sendfail(self, session=None): - """Marks that an attempt to send the transaction to the network has failed. - - Only manipulates object, does not transaction or commit to backend. - - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - if self.__status_already_set(StatusBits.NODE_ERROR): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if self.status & StatusBits.IN_NETWORK: - SessionBase.release_session(session) - raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) - - self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session) - self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def dequeue(self, session=None): - """Marks that a process to execute send attempt is underway - - Only manipulates object, does not transaction or commit to backend. - - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - if self.__status_not_set(StatusBits.QUEUED): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('QUEUED cannot be unset on an entry with FINAL state set ({})'.format(status_str(self.status))) - if self.status & StatusBits.IN_NETWORK: - SessionBase.release_session(session) - raise TxStateChangeError('QUEUED cannot be unset on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) - - self.__reset_status(StatusBits.QUEUED, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - - def minefail(self, block, session=None): - """Marks that transaction was mined but code execution did not succeed. - - Only manipulates object, does not transaction or commit to backend. - - :param block: Block number transaction was mined in. - :type block: number - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - if self.__status_already_set(StatusBits.NETWORK_ERROR): - return - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if not self.status & StatusBits.IN_NETWORK: - SessionBase.release_session(session) - raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status))) - - if block != None: - self.block = block - - self.__set_status(StatusBits.NETWORK_ERROR | StatusBits.FINAL, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def cancel(self, confirmed=False, session=None): - """Marks that the transaction has been succeeded by a new transaction with same nonce. - - If set to confirmed, the previous state must be OBSOLETED, and will transition to CANCELLED - a finalized state. Otherwise, the state must follow a non-finalized state, and will be set to OBSOLETED. - - Only manipulates object, does not transaction or commit to backend. - - :param confirmed: Whether transition is to a final state. - :type confirmed: bool - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - - if confirmed: - if self.status > 0 and not self.status & StatusBits.OBSOLETE: - SessionBase.release_session(session) - raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status))) - self.__set_status(StatusEnum.CANCELLED, session) - else: - self.__set_status(StatusEnum.OBSOLETED, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - def success(self, block, session=None): - """Marks that transaction was successfully mined. - - Only manipulates object, does not transaction or commit to backend. - - :param block: Block number transaction was mined in. - :type block: number - :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. - """ - - session = SessionBase.bind_session(session) - - if self.status & StatusBits.FINAL: - SessionBase.release_session(session) - raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) - if not self.status & StatusBits.IN_NETWORK: - SessionBase.release_session(session) - raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status))) - if is_error_status(self.status): - SessionBase.release_session(session) - raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status))) - - if block != None: - self.block = block - self.__set_status(StatusEnum.SUCCESS, session) - - if self.tracing: - self.__state_log(session=session) - - SessionBase.release_session(session) - - - @staticmethod - def get(status=0, limit=4096, status_exact=True, session=None): - """Returns outgoing transaction lists by status. - - Status may either be matched exactly, or be an upper bound of the integer value of the status enum. - - :param status: Status value to use in query - :type status: cic_eth.db.enum.StatusEnum - :param limit: Max results to return - :type limit: number - :param status_exact: Whether or not to perform exact status match - :type bool: - :returns: List of transaction hashes - :rtype: tuple, where first element is transaction hash - """ - e = None - - session = SessionBase.bind_session(session) - - if status_exact: - e = session.query(Otx.tx_hash).filter(Otx.status==status).order_by(Otx.date_created.asc()).limit(limit).all() - else: - e = session.query(Otx.tx_hash).filter(Otx.status<=status).order_by(Otx.date_created.asc()).limit(limit).all() - - SessionBase.release_session(session) - return e - - - @staticmethod - def load(tx_hash, session=None): - """Retrieves the outgoing transaction record by transaction hash. - - :param tx_hash: Transaction hash - :type tx_hash: str, 0x-hex - """ - session = SessionBase.bind_session(session) - - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - - SessionBase.release_session(session) - - return q.first() - - - @staticmethod - def account(account_address): - """Retrieves all transaction hashes for which the given Ethereum address is sender or recipient. - - :param account_address: Ethereum address to use in query. - :type account_address: str, 0x-hex - :returns: Outgoing transactions - :rtype: tuple, where first element is transaction hash - """ - session = Otx.create_session() - q = session.query(Otx.tx_hash) - q = q.join(TxCache) - q = q.filter(or_(TxCache.sender==account_address, TxCache.recipient==account_address)) - txs = q.all() - session.close() - return list(txs) - - - def __state_log(self, session): - l = OtxStateLog(self) - session.add(l) - - - # TODO: it is not safe to return otx here unless session has been passed in - @staticmethod - def add(nonce, address, tx_hash, signed_tx, session=None): - external_session = session != None - - session = SessionBase.bind_session(session) - - otx = Otx(nonce, address, tx_hash, signed_tx) - session.add(otx) - session.flush() - if otx.tracing: - otx.__state_log(session=session) - session.flush() - - SessionBase.release_session(session) - - if not external_session: - return None - - return otx - - - def __init__(self, nonce, address, tx_hash, signed_tx): - self.nonce = nonce - self.tx_hash = tx_hash - self.signed_tx = signed_tx - self.status = StatusEnum.PENDING - signed_tx_bytes = bytes.fromhex(signed_tx[2:]) - - # sender_address = address_hex_from_signed_tx(signed_tx_bytes) - # logg.debug('decoded tx {}'.format(sender_address)) - - - -# TODO: Most of the methods on this object are obsolete, but it contains a static function for retrieving "expired" outgoing transactions that should be moved to Otx instead. -class OtxSync(SessionBase): - """Obsolete - """ - __tablename__ = 'otx_sync' - - blockchain = Column(String) - block_height_backlog = Column(Integer) - tx_height_backlog = Column(Integer) - block_height_session = Column(Integer) - tx_height_session = Column(Integer) - block_height_head = Column(Integer) - tx_height_head = Column(Integer) - date_created = Column(DateTime, default=datetime.datetime.utcnow) - date_updated = Column(DateTime) - - - def backlog(self, block_height=None, tx_height=None): - #session = OtxSync.create_session() - if block_height != None: - if tx_height == None: - raise ValueError('tx height missing') - self.block_height_backlog = block_height - self.tx_height_backlog = tx_height - #session.add(self) - self.date_updated = datetime.datetime.utcnow() - #session.commit() - block_height = self.block_height_backlog - tx_height = self.tx_height_backlog - #session.close() - return (block_height, tx_height) - - - def session(self, block_height=None, tx_height=None): - #session = OtxSync.create_session() - if block_height != None: - if tx_height == None: - raise ValueError('tx height missing') - self.block_height_session = block_height - self.tx_height_session = tx_height - #session.add(self) - self.date_updated = datetime.datetime.utcnow() - #session.commit() - block_height = self.block_height_session - tx_height = self.tx_height_session - #session.close() - return (block_height, tx_height) - - - def head(self, block_height=None, tx_height=None): - #session = OtxSync.create_session() - if block_height != None: - if tx_height == None: - raise ValueError('tx height missing') - self.block_height_head = block_height - self.tx_height_head = tx_height - #session.add(self) - self.date_updated = datetime.datetime.utcnow() - #session.commit() - block_height = self.block_height_head - tx_height = self.tx_height_head - #session.close() - return (block_height, tx_height) - - - @hybrid_property - def synced(self): - #return self.block_height_session == self.block_height_backlog and self.tx_height_session == self.block_height_backlog - return self.block_height_session == self.block_height_backlog and self.tx_height_session == self.tx_height_backlog - - - @staticmethod - def load(blockchain_string, session): - q = session.query(OtxSync) - q = q.filter(OtxSync.blockchain==blockchain_string) - return q.first() - - - @staticmethod - def latest(nonce): - session = SessionBase.create_session() - otx = session.query(Otx).filter(Otx.nonce==nonce).order_by(Otx.created.desc()).first() - session.close() - return otx - - - @staticmethod - def get_expired(datetime_threshold): - session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.date_created 0: + SessionBase.release_session(session) + raise LockedError(lock) + + tx_hash = create(chain_spec, nonce, holder_address, tx_hash, signed_tx, chain_spec, session=session) + + SessionBase.release_session(session) + + return tx_hash + + def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None): """Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING). @@ -55,12 +71,12 @@ def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=No tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex)) tx = unpack(tx_signed_raw, chain_id=chain_spec.chain_id()) - create( + tx_hash = queue_create( + chain_spec, tx['nonce'], tx['from'], tx_hash_hex, tx_signed_raw_hex, - chain_spec, session=session, ) diff --git a/apps/cic-eth/tests/fixtures_celery.py b/apps/cic-eth/tests/fixtures_celery.py index 47de8801..33cadc31 100644 --- a/apps/cic-eth/tests/fixtures_celery.py +++ b/apps/cic-eth/tests/fixtures_celery.py @@ -27,6 +27,9 @@ def celery_includes(): 'cic_eth.eth.tx', 'cic_eth.ext.tx', 'cic_eth.queue.tx', + 'cic_eth.queue.lock', + 'cic_eth.queue.query', + 'cic_eth.queue.state', 'cic_eth.queue.balance', 'cic_eth.admin.ctrl', 'cic_eth.admin.nonce', diff --git a/apps/cic-eth/tests/task/api/test_admin.py b/apps/cic-eth/tests/task/api/test_admin.py index 411eabcb..5e5e431b 100644 --- a/apps/cic-eth/tests/task/api/test_admin.py +++ b/apps/cic-eth/tests/task/api/test_admin.py @@ -16,27 +16,24 @@ from hexathon import ( strip_0x, add_0x, ) +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache +from chainqueue.db.enum import ( + StatusEnum, + StatusBits, + status_str, + ) +from chainqueue.query import get_tx # local imports from cic_eth.api import AdminApi from cic_eth.db.models.role import AccountRole -from cic_eth.db.models.otx import Otx -from cic_eth.db.models.tx import TxCache -from cic_eth.db.enum import ( - StatusEnum, - StatusBits, - status_str, - LockEnum, - ) +from cic_eth.db.enum import LockEnum from cic_eth.error import InitializationError from cic_eth.eth.tx import ( cache_gas_data, ) -#from cic_eth.eth.gas import cache_gas_tx -from cic_eth.queue.tx import ( - create as queue_create, - get_tx, - ) +from cic_eth.queue.tx import queue_create logg = logging.getLogger() @@ -278,7 +275,7 @@ def test_tx( eth_signer, agent_roles, contract_roles, - celery_session_worker, + celery_worker, ): chain_id = default_chain_spec.chain_id() @@ -286,7 +283,7 @@ def test_tx( c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED) tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) - queue_create(tx['nonce'], agent_roles['ALICE'], tx_hash_hex, tx_signed_raw_hex, default_chain_spec, session=init_database) + queue_create(default_chain_spec, tx['nonce'], agent_roles['ALICE'], tx_hash_hex, tx_signed_raw_hex) cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['DEFAULT']) diff --git a/apps/cic-eth/tests/task/api/test_list.py b/apps/cic-eth/tests/task/api/test_list.py index 27f0bdf2..e886d233 100644 --- a/apps/cic-eth/tests/task/api/test_list.py +++ b/apps/cic-eth/tests/task/api/test_list.py @@ -1,23 +1,29 @@ # standard imports import logging -# local imports +# external imports +import pytest from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.erc20 import ERC20 from chainlib.eth.tx import receipt + +# local imports from cic_eth.api.api_task import Api -from tests.mock.filter import ( - block_filter, - tx_filter, - ) from cic_eth.db.models.nonce import ( Nonce, NonceReservation, ) +# test imports +from tests.mock.filter import ( + block_filter, + tx_filter, + ) + logg = logging.getLogger() +@pytest.mark.xfail() def test_list_tx( default_chain_spec, init_database, @@ -29,7 +35,7 @@ def test_list_tx( foo_token, register_tokens, init_eth_tester, - celery_session_worker, + celery_worker, ): chain_id = default_chain_spec.chain_id() @@ -110,7 +116,6 @@ def test_list_tx( r = t.get_leaf() assert t.successful() - assert len(r) == 3 logg.debug('rrrr {}'.format(r)) diff --git a/apps/cic-eth/tests/task/test_task_account.py b/apps/cic-eth/tests/task/test_task_account.py index d4e0ee6f..ce7f54ba 100644 --- a/apps/cic-eth/tests/task/test_task_account.py +++ b/apps/cic-eth/tests/task/test_task_account.py @@ -11,13 +11,12 @@ from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.tx import receipt from eth_accounts_index import AccountRegistry from hexathon import strip_0x +from chainqueue.db.enum import StatusEnum +from chainqueue.db.models.otx import Otx # local imports from cic_eth.error import OutOfGasError -from cic_eth.db.models.otx import Otx from cic_eth.db.models.base import SessionBase -from cic_eth.db.enum import StatusEnum -from cic_eth.db.enum import StatusEnum from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.role import AccountRole @@ -77,6 +76,7 @@ def test_register_account( 'cic_eth.eth.tx.reserve_nonce', [ eth_empty_accounts[0], + default_chain_spec.asdict(), custodial_roles['ACCOUNT_REGISTRY_WRITER'], ], queue=None, @@ -171,6 +171,7 @@ def test_gift( 'cic_eth.eth.tx.reserve_nonce', [ agent_roles['ALICE'], + default_chain_spec.asdict(), ], queue=None, ) diff --git a/apps/cic-eth/tests/task/test_task_erc20.py b/apps/cic-eth/tests/task/test_task_erc20.py index 048ed71b..652c6a55 100644 --- a/apps/cic-eth/tests/task/test_task_erc20.py +++ b/apps/cic-eth/tests/task/test_task_erc20.py @@ -105,6 +105,7 @@ def test_erc20_transfer_task( 'cic_eth.eth.tx.reserve_nonce', [ [token_object], + default_chain_spec.asdict(), custodial_roles['FOO_TOKEN_GIFTER'], ], queue=None, @@ -146,6 +147,7 @@ def test_erc20_approve_task( 'cic_eth.eth.tx.reserve_nonce', [ [token_object], + default_chain_spec.asdict(), custodial_roles['FOO_TOKEN_GIFTER'], ], queue=None, diff --git a/apps/cic-eth/tests/task/test_task_tx.py b/apps/cic-eth/tests/task/test_task_tx.py index 99d1a3d0..81b0a5f6 100644 --- a/apps/cic-eth/tests/task/test_task_tx.py +++ b/apps/cic-eth/tests/task/test_task_tx.py @@ -13,16 +13,15 @@ from chainlib.eth.tx import ( receipt, ) from hexathon import strip_0x +from chainqueue.db.models.otx import Otx # local imports from cic_eth.queue.tx import register_tx from cic_eth.eth.tx import cache_gas_data -from cic_eth.db.models.otx import Otx logg = logging.getLogger() -@pytest.mark.skip() def test_tx_send( init_database, default_chain_spec, @@ -62,12 +61,11 @@ def test_tx_send( assert rcpt['status'] == 1 -@pytest.mark.skip() def test_sync_tx( default_chain_spec, eth_rpc, eth_signer, - celery_worker, + celery_session_worker, ): pass @@ -78,7 +76,7 @@ def test_resend_with_higher_gas( eth_rpc, eth_signer, agent_roles, - celery_worker, + celery_session_worker, ): chain_id = default_chain_spec.chain_id() @@ -102,7 +100,7 @@ def test_resend_with_higher_gas( r = t.get_leaf() q = init_database.query(Otx) - q = q.filter(Otx.tx_hash==r) + q = q.filter(Otx.tx_hash==strip_0x(r)) otx = q.first() if otx == None: raise NotLocalTxError(r) diff --git a/apps/cic-eth/tests/unit/db/test_block_sync.py b/apps/cic-eth/tests/unit/db/test_block_sync.py deleted file mode 100644 index 87fdee4f..00000000 --- a/apps/cic-eth/tests/unit/db/test_block_sync.py +++ /dev/null @@ -1,29 +0,0 @@ -# standard imports -import logging - -# local imports -from cic_eth.db.models.otx import OtxSync - -logg = logging.getLogger() - - -def test_db_block_sync( - init_database, - ): - - s = OtxSync('eip155:8995:bloxberg') - - s.head(666, 12) - assert s.head() == (666, 12) - - s.session(42, 13) - assert s.session() == (42, 13) - - s.backlog(13, 2) - assert s.backlog() == (13, 2) - - assert not s.synced - - s.backlog(42, 13) - assert s.backlog() == (42, 13) - assert s.synced diff --git a/apps/cic-eth/tests/unit/db/test_db_convert_transfer.py b/apps/cic-eth/tests/unit/db/test_db_convert_transfer.py index 15e766db..bc3b6e5a 100644 --- a/apps/cic-eth/tests/unit/db/test_db_convert_transfer.py +++ b/apps/cic-eth/tests/unit/db/test_db_convert_transfer.py @@ -3,7 +3,7 @@ import logging import pytest -from cic_eth.db import TxConvertTransfer +from cic_eth.db.models.convert import TxConvertTransfer from cic_eth.db.error import UnknownConvertError logg = logging.getLogger() diff --git a/apps/cic-eth/tests/unit/db/test_otx.py b/apps/cic-eth/tests/unit/db/test_otx.py deleted file mode 100644 index 00057a5e..00000000 --- a/apps/cic-eth/tests/unit/db/test_otx.py +++ /dev/null @@ -1,107 +0,0 @@ -# standard imports -import os -import logging - -# third-party imports -import pytest - -# local imports -from cic_eth.db.models.base import SessionBase -from cic_eth.db.models.otx import OtxStateLog -from cic_eth.db.models.otx import Otx -from cic_eth.db.enum import ( - StatusEnum, - StatusBits, - is_alive, - ) - -logg = logging.getLogger() - - -#def test_get( -# rpc_eth, -# rpc_signer, -# agent_roles, -# init_database, -# ): -# -# rpc = RPCConnection.connect(default_chain_spec, 'default') -# nonce_oracle = RPCNonceOracle(agent_roles['ALICE']) -# gas_oracle = RPCGasOracle(eth_rpc) -# c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) -# -# for i in range(10): -# -# (tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)), -# -# tx_def = { -# 'from': init_w3.eth.accounts[0], -# 'to': init_w3.eth.accounts[1], -# 'nonce': 0, -# 'value': 101, -# 'gasPrice': 2000000000, -# 'gas': 21000, -# 'data': '', -# 'chainId': 1, -# } -# -# session = init_database -# txs = [] -# for i in range(10): -# nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0], 'pending') -# tx_def['nonce'] = nonce -# tx = init_w3.eth.sign_transaction(tx_def) -# tx_hash = init_w3.eth.send_raw_transaction(tx['raw']) -# logg.debug('tx {}'.format(tx)) -# -# address = init_w3.eth.accounts[i%3] -# otx = Otx(int((i/3)+1), address, '0x'+tx_hash.hex(), tx['raw']) -# txs.append(otx) -# session.add(otx) -# session.flush() -# -# logg.debug(txs) -# session.commit() -# -# txs[0].status = 0 -# session.add(txs[0]) -# session.commit() -# session.close() -# -# get_txs = Otx.get() -# logg.debug(get_txs) - - -def test_state_log( - init_database, - ): - - Otx.tracing = True - - address = '0x' + os.urandom(20).hex() - tx_hash = '0x' + os.urandom(32).hex() - signed_tx = '0x' + os.urandom(128).hex() - otx = Otx.add(0, address, tx_hash, signed_tx, session=init_database) - - otx.waitforgas(session=init_database) - init_database.commit() - - otx.readysend(session=init_database) - init_database.commit() - - otx.sent(session=init_database) - init_database.commit() - - otx.success(1024, session=init_database) - init_database.commit() - - q = init_database.query(OtxStateLog) - q = q.filter(OtxStateLog.otx_id==otx.id) - q = q.order_by(OtxStateLog.date.asc()) - logs = q.all() - - assert logs[0].status == StatusEnum.PENDING - assert logs[1].status == StatusEnum.WAITFORGAS - assert logs[2].status & StatusBits.QUEUED - assert logs[3].status & StatusBits.IN_NETWORK - assert not is_alive(logs[4].status) diff --git a/apps/cic-eth/tests/unit/db/test_status.py b/apps/cic-eth/tests/unit/db/test_status.py deleted file mode 100644 index f40cd6d8..00000000 --- a/apps/cic-eth/tests/unit/db/test_status.py +++ /dev/null @@ -1,97 +0,0 @@ -# standard imports -import os - -# third-party imports -import pytest - -# local imports -from cic_eth.db.models.otx import Otx -from cic_eth.db.enum import ( - StatusEnum, - StatusBits, - is_alive, - ) - - -@pytest.fixture(scope='function') -def otx( - init_database, - ): - - bogus_hash = '0x' + os.urandom(32).hex() - bogus_address = '0x' + os.urandom(20).hex() - bogus_tx_raw = '0x' + os.urandom(128).hex() - return Otx(0, bogus_address, bogus_hash, bogus_tx_raw) - - -def test_status_chain_gas( - init_database, - otx, - ): - - otx.waitforgas(init_database) - otx.readysend(init_database) - otx.sent(init_database) - otx.success(1024, init_database) - assert not is_alive(otx.status) - - -def test_status_chain_straight_success( - init_database, - otx, - ): - - otx.readysend(init_database) - otx.sent(init_database) - otx.success(1024, init_database) - assert not is_alive(otx.status) - - -def test_status_chain_straight_revert( - init_database, - otx, - ): - - otx.readysend(init_database) - otx.sent(init_database) - otx.minefail(1024, init_database) - assert not is_alive(otx.status) - - -def test_status_chain_nodeerror( - init_database, - otx, - ): - - otx.readysend(init_database) - otx.sendfail(init_database) - otx.retry(init_database) - otx.sent(init_database) - otx.success(1024, init_database) - assert not is_alive(otx.status) - - - -def test_status_chain_nodeerror_multiple( - init_database, - otx, - ): - - otx.readysend(init_database) - otx.sendfail(init_database) - otx.retry(init_database) - otx.sendfail(init_database) - otx.retry(init_database) - otx.sent(init_database) - otx.success(1024, init_database) - assert not is_alive(otx.status) - - -def test_status_chain_nodeerror( - init_database, - otx, - ): - - otx.readysend(init_database) - otx.reject(init_database) - assert not is_alive(otx.status) diff --git a/apps/cic-eth/tests/unit/db/test_tx.py b/apps/cic-eth/tests/unit/db/test_tx.py index c616bf77..db13e55c 100644 --- a/apps/cic-eth/tests/unit/db/test_tx.py +++ b/apps/cic-eth/tests/unit/db/test_tx.py @@ -18,10 +18,8 @@ from hexathon import ( add_0x, strip_0x, ) - -# local imports -from cic_eth.db.models.tx import TxCache -from cic_eth.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache +from chainqueue.db.models.otx import Otx # test imports from tests.util.gas import StaticGasOracle @@ -46,7 +44,6 @@ def test_set( otx = Otx( tx['nonce'], - tx['from'], tx_hash_hex, tx_signed_raw_hex, ) @@ -66,6 +63,7 @@ def test_set( to_value, 666, 13, + session=init_database, ) init_database.add(txc) init_database.commit() @@ -112,7 +110,6 @@ def test_clone( tx_dict = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) otx = Otx( tx_dict['nonce'], - tx_dict['from'], tx_hash_hex, tx_signed_raw_hex, ) @@ -130,15 +127,16 @@ def test_clone( ZERO_ADDRESS, txs[0]['value'], txs[0]['value'], + session=init_database, ) init_database.add(txc) init_database.commit() - TxCache.clone(txs[0]['hash'], txs[1]['hash']) + TxCache.clone(txs[0]['hash'], txs[1]['hash'], session=init_database) q = init_database.query(TxCache) q = q.join(Otx) - q = q.filter(Otx.tx_hash==txs[1]['hash']) + q = q.filter(Otx.tx_hash==strip_0x(txs[1]['hash'])) txc_clone = q.first() assert txc_clone != None diff --git a/apps/cic-eth/tests/unit/queue/test_balances.py b/apps/cic-eth/tests/unit/queue/test_balances.py index 8d028b75..2a372314 100644 --- a/apps/cic-eth/tests/unit/queue/test_balances.py +++ b/apps/cic-eth/tests/unit/queue/test_balances.py @@ -59,17 +59,15 @@ def test_assemble(): assert r[1].get('balance_xyzzy') != None -@pytest.mark.skip() def test_outgoing_balance( default_chain_spec, init_database, ): - chain_str = str(default_chain_spec) recipient = '0x' + os.urandom(20).hex() tx_hash = '0x' + os.urandom(32).hex() signed_tx = '0x' + os.urandom(128).hex() - otx = Otx.add(0, recipient, tx_hash, signed_tx, session=init_database) + otx = Otx.add(0, tx_hash, signed_tx, session=init_database) init_database.add(otx) init_database.commit() @@ -83,6 +81,7 @@ def test_outgoing_balance( token_address, 1000, 1000, + session=init_database, ) init_database.add(txc) init_database.commit() @@ -91,33 +90,35 @@ def test_outgoing_balance( 'address': token_address, 'converters': [], } - b = balance_outgoing([token_data], sender, chain_str) + b = balance_outgoing([token_data], sender, default_chain_spec.asdict()) assert b[0]['balance_outgoing'] == 1000 + otx.readysend(session=init_database) + init_database.flush() + otx.reserve(session=init_database) + init_database.flush() otx.sent(session=init_database) init_database.commit() - b = balance_outgoing([token_data], sender, chain_str) + b = balance_outgoing([token_data], sender, default_chain_spec.asdict()) assert b[0]['balance_outgoing'] == 1000 otx.success(block=1024, session=init_database) init_database.commit() - b = balance_outgoing([token_data], sender, chain_str) + b = balance_outgoing([token_data], sender, default_chain_spec.asdict()) assert b[0]['balance_outgoing'] == 0 -@pytest.mark.skip() def test_incoming_balance( default_chain_spec, init_database, ): - chain_str = str(default_chain_spec) recipient = '0x' + os.urandom(20).hex() tx_hash = '0x' + os.urandom(32).hex() signed_tx = '0x' + os.urandom(128).hex() - otx = Otx.add(0, recipient, tx_hash, signed_tx, session=init_database) + otx = Otx.add(0, tx_hash, signed_tx, session=init_database) init_database.add(otx) init_database.commit() @@ -131,6 +132,7 @@ def test_incoming_balance( token_address, 1000, 1000, + session=init_database, ) init_database.add(txc) init_database.commit() @@ -139,19 +141,23 @@ def test_incoming_balance( 'address': token_address, 'converters': [], } - b = balance_incoming([token_data], recipient, chain_str) + b = balance_incoming([token_data], recipient, default_chain_spec.asdict()) assert b[0]['balance_incoming'] == 0 + otx.readysend(session=init_database) + init_database.flush() + otx.reserve(session=init_database) + init_database.flush() otx.sent(session=init_database) init_database.commit() - b = balance_incoming([token_data], recipient, chain_str) + b = balance_incoming([token_data], recipient, default_chain_spec.asdict()) assert b[0]['balance_incoming'] == 1000 otx.success(block=1024, session=init_database) init_database.commit() - b = balance_incoming([token_data], recipient, chain_str) + b = balance_incoming([token_data], recipient, default_chain_spec.asdict()) assert b[0]['balance_incoming'] == 0 diff --git a/apps/cic-eth/tests/unit/queue/test_query.py b/apps/cic-eth/tests/unit/queue/test_query.py new file mode 100644 index 00000000..59205786 --- /dev/null +++ b/apps/cic-eth/tests/unit/queue/test_query.py @@ -0,0 +1,58 @@ +# external imports +from chainqueue.db.enum import ( + StatusEnum, + StatusBits, + ) +from chainlib.connection import RPCConnection +from chainlib.eth.gas import ( + RPCGasOracle, + Gas, + ) +from chainlib.chain import ChainSpec + +# local imports +from cic_eth.db.enum import LockEnum +from cic_eth.db.models.lock import Lock +from cic_eth.queue.query import get_upcoming_tx +from cic_eth.queue.tx import register_tx +from cic_eth.eth.tx import cache_gas_data + +# test imports +from tests.util.nonce import StaticNonceOracle + + +def test_upcoming_with_lock( + default_chain_spec, + init_database, + eth_rpc, + eth_signer, + agent_roles, + ): + + rpc = RPCConnection.connect(default_chain_spec, 'default') + nonce_oracle = StaticNonceOracle(42) + gas_oracle = RPCGasOracle(eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) + + (tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)) + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + txs = get_upcoming_tx(default_chain_spec, StatusEnum.PENDING) + assert len(txs.keys()) == 1 + + Lock.set(str(default_chain_spec), LockEnum.SEND, address=agent_roles['ALICE']) + + txs = get_upcoming_tx(default_chain_spec, StatusEnum.PENDING) + assert len(txs.keys()) == 0 + + (tx_hash_hex, tx_rpc) = c.create(agent_roles['BOB'], agent_roles['ALICE'], 100 * (10 ** 6)) + tx_signed_raw_hex = tx_rpc['params'][0] + + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + + txs = get_upcoming_tx(default_chain_spec, StatusEnum.PENDING) + assert len(txs.keys()) == 1 diff --git a/apps/cic-eth/tests/unit/queue/test_queue_lock.py b/apps/cic-eth/tests/unit/queue/test_queue_lock.py index 4ca65ab2..7b0a6e5b 100644 --- a/apps/cic-eth/tests/unit/queue/test_queue_lock.py +++ b/apps/cic-eth/tests/unit/queue/test_queue_lock.py @@ -3,12 +3,12 @@ import os # third-party imports import pytest -from chainqueue.tx import create as queue_create # local imports from cic_eth.db.models.lock import Lock from cic_eth.db.enum import LockEnum from cic_eth.error import LockedError +from cic_eth.queue.tx import queue_create def test_queue_lock( @@ -25,41 +25,41 @@ def test_queue_lock( Lock.set(chain_str, LockEnum.QUEUE) with pytest.raises(LockedError): queue_create( + default_chain_spec, 0, address, tx_hash, tx_raw, - chain_str ) Lock.set(chain_str, LockEnum.QUEUE, address=address) with pytest.raises(LockedError): queue_create( + default_chain_spec, 0, address, tx_hash, tx_raw, - chain_str ) Lock.reset(chain_str, LockEnum.QUEUE) with pytest.raises(LockedError): queue_create( + default_chain_spec, 0, address, tx_hash, tx_raw, - chain_str ) Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash) with pytest.raises(LockedError): queue_create( + default_chain_spec, 0, address, tx_hash, tx_raw, - chain_str )