diff --git a/apps/cic-eth/cic_eth/api/api_task.py b/apps/cic-eth/cic_eth/api/api_task.py index 8522ffbd..6e67b700 100644 --- a/apps/cic-eth/cic_eth/api/api_task.py +++ b/apps/cic-eth/cic_eth/api/api_task.py @@ -92,6 +92,11 @@ class Api: ], queue=self.queue, ) + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [], + queue=self.queue, + ) s_tokens = celery.signature( 'cic_eth.eth.token.resolve_tokens_by_symbol', [ @@ -110,7 +115,8 @@ class Api: ], queue=self.queue, ) - s_check.link(s_tokens) + s_nonce.link(s_tokens) + s_check.link(s_nonce) if self.callback_param != None: s_convert.link(self.callback_success) s_tokens.link(s_convert).on_error(self.callback_error) @@ -147,6 +153,11 @@ class Api: ], queue=self.queue, ) + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [], + queue=self.queue, + ) s_tokens = celery.signature( 'cic_eth.eth.token.resolve_tokens_by_symbol', [ @@ -165,7 +176,8 @@ class Api: ], queue=self.queue, ) - s_check.link(s_tokens) + s_nonce.link(s_tokens) + s_check.link(s_nonce) if self.callback_param != None: s_convert.link(self.callback_success) s_tokens.link(s_convert).on_error(self.callback_error) @@ -200,6 +212,13 @@ class Api: ], queue=self.queue, ) + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [ + from_address, + ], + queue=self.queue, + ) s_tokens = celery.signature( 'cic_eth.eth.token.resolve_tokens_by_symbol', [ @@ -217,7 +236,8 @@ class Api: ], queue=self.queue, ) - s_check.link(s_tokens) + s_nonce.link(s_tokens) + s_check.link(s_nonce) if self.callback_param != None: s_transfer.link(self.callback_success) s_tokens.link(s_transfer).on_error(self.callback_error) @@ -228,82 +248,6 @@ class Api: return t - def transfer_request(self, from_address, to_address, spender_address, value, token_symbol): - """Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another. - - :param from_address: Ethereum address of sender - :type from_address: str, 0x-hex - :param to_address: Ethereum address of recipient - :type to_address: str, 0x-hex - :param spender_address: Ethereum address that is executing transfer (typically an escrow contract) - :type spender_address: str, 0x-hex - :param value: Estimated return from conversion - :type value: int - :param token_symbol: ERC20 token symbol of token to send - :type token_symbol: str - :returns: uuid of root task - :rtype: celery.Task - """ - s_check = celery.signature( - 'cic_eth.admin.ctrl.check_lock', - [ - [token_symbol], - self.chain_str, - LockEnum.QUEUE, - from_address, - ], - queue=self.queue, - ) - s_tokens_transfer_approval = celery.signature( - 'cic_eth.eth.token.resolve_tokens_by_symbol', - [ - self.chain_str, - ], - queue=self.queue, - ) - s_tokens_approve = celery.signature( - 'cic_eth.eth.token.resolve_tokens_by_symbol', - [ - self.chain_str, - ], - queue=self.queue, - ) - s_approve = celery.signature( - 'cic_eth.eth.token.approve', - [ - from_address, - spender_address, - value, - self.chain_str, - ], - queue=self.queue, - ) - s_transfer_approval = celery.signature( - 'cic_eth.eth.request.transfer_approval_request', - [ - from_address, - to_address, - value, - self.chain_str, - ], - queue=self.queue, - ) - # TODO: make approve and transfer_approval chainable so callback can be part of the full chain - if self.callback_param != None: - s_transfer_approval.link(self.callback_success) - s_tokens_approve.link(s_approve) - s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error) - else: - s_tokens_approve.link(s_approve) - s_tokens_transfer_approval.link(s_transfer_approval) - - g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue) - s_check.link(g) - t = s_check.apply_async() - #t = s_tokens.apply_async(queue=self.queue) - return t - - def balance(self, address, token_symbol, include_pending=True): """Calls the provided callback with the current token balance of the given address. @@ -396,6 +340,11 @@ class Api: ], queue=self.queue, ) + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [], + queue=self.queue, + ) s_account = celery.signature( 'cic_eth.eth.account.create', [ @@ -403,7 +352,8 @@ class Api: ], queue=self.queue, ) - s_check.link(s_account) + s_nonce.link(s_account) + s_check.link(s_nonce) if self.callback_param != None: s_account.link(self.callback_success) @@ -438,6 +388,11 @@ class Api: ], queue=self.queue, ) + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [], + queue=self.queue, + ) s_refill = celery.signature( 'cic_eth.eth.tx.refill_gas', [ @@ -445,7 +400,8 @@ class Api: ], queue=self.queue, ) - s_check.link(s_refill) + s_nonce.link(s_refill) + s_check.link(s_nonce) if self.callback_param != None: s_refill.link(self.callback_success) diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py new file mode 100644 index 00000000..f8ebee83 --- /dev/null +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py @@ -0,0 +1,30 @@ +"""Nonce reservation + +Revision ID: 3b693afd526a +Revises: f738d9962fdf +Create Date: 2021-03-05 07:09:50.898728 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '3b693afd526a' +down_revision = 'f738d9962fdf' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + 'nonce_task_reservation', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('nonce', sa.Integer, nullable=False), + sa.Column('key', sa.String, nullable=False), + sa.Column('date_created', sa.DateTime, nullable=False), + ) + + +def downgrade(): + op.drop_table('nonce_task_reservation') diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py new file mode 100644 index 00000000..f8ebee83 --- /dev/null +++ b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py @@ -0,0 +1,30 @@ +"""Nonce reservation + +Revision ID: 3b693afd526a +Revises: f738d9962fdf +Create Date: 2021-03-05 07:09:50.898728 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '3b693afd526a' +down_revision = 'f738d9962fdf' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + 'nonce_task_reservation', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('nonce', sa.Integer, nullable=False), + sa.Column('key', sa.String, nullable=False), + sa.Column('date_created', sa.DateTime, nullable=False), + ) + + +def downgrade(): + op.drop_table('nonce_task_reservation') diff --git a/apps/cic-eth/cic_eth/db/models/base.py b/apps/cic-eth/cic_eth/db/models/base.py index 353c67cc..da5d019a 100644 --- a/apps/cic-eth/cic_eth/db/models/base.py +++ b/apps/cic-eth/cic_eth/db/models/base.py @@ -54,7 +54,7 @@ class SessionBase(Model): @staticmethod - def connect(dsn, pool_size=8, debug=False): + def connect(dsn, pool_size=16, debug=False): """Create new database connection engine and connect to database backend. :param dsn: DSN string defining connection. diff --git a/apps/cic-eth/cic_eth/db/models/nonce.py b/apps/cic-eth/cic_eth/db/models/nonce.py index 3b474a98..42bb8fd3 100644 --- a/apps/cic-eth/cic_eth/db/models/nonce.py +++ b/apps/cic-eth/cic_eth/db/models/nonce.py @@ -1,11 +1,16 @@ # standard imports import logging +import datetime # third-party imports -from sqlalchemy import Column, String, Integer +from sqlalchemy import Column, String, Integer, DateTime # local imports from .base import SessionBase +from cic_eth.error import ( + InitializationError, + IntegrityError, + ) logg = logging.getLogger() @@ -37,23 +42,43 @@ class Nonce(SessionBase): @staticmethod - def __get(session, address): - r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address)) + def __get(conn, address): + r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address)) nonce = r.fetchone() - session.flush() if nonce == None: return None return nonce[0] @staticmethod - def __set(session, address, nonce): - session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address)) - session.flush() + def __set(conn, address, nonce): + conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address)) @staticmethod - def next(address, initial_if_not_exists=0, session=None): + def __init(conn, address, nonce): + conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address)) + + + @staticmethod + def init(address, nonce=0, session=None): + session = SessionBase.bind_session(session) + + q = session.query(Nonce) + q = q.filter(Nonce.address_hex==address) + o = q.first() + if o != None: + session.flush() + raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce)) + session.flush() + Nonce.__init(session, address, nonce) + + SessionBase.release_session(session) + + + # TODO: Incrementing nonce MUST be done by separate tasks. + @staticmethod + def next(address, initial_if_not_exists=0): """Generate next nonce for the given address. If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given. @@ -65,32 +90,97 @@ class Nonce(SessionBase): :returns: Nonce :rtype: number """ - session = SessionBase.bind_session(session) + #session = SessionBase.bind_session(session) - SessionBase.release_session(session) - - session.begin_nested() - #conn = Nonce.engine.connect() + #session.begin_nested() + conn = Nonce.engine.connect() if Nonce.transactional: - #session.execute('BEGIN') - session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE') - session.flush() - nonce = Nonce.__get(session, address) + conn.execute('BEGIN') + conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE') + logg.debug('locking nonce table for address {}'.format(address)) + nonce = Nonce.__get(conn, address) logg.debug('get nonce {} for address {}'.format(nonce, address)) if nonce == None: nonce = initial_if_not_exists - session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address)) - session.flush() logg.debug('setting default nonce to {} for address {}'.format(nonce, address)) - Nonce.__set(session, address, nonce+1) - #if Nonce.transactional: - #session.execute('COMMIT') - #session.execute('UNLOCK TABLE nonce') - #conn.close() - session.commit() -# session.commit() + Nonce.__init(conn, address, nonce) + Nonce.__set(conn, address, nonce+1) + if Nonce.transactional: + conn.execute('COMMIT') + logg.debug('unlocking nonce table for address {}'.format(address)) + conn.close() + #session.commit() - SessionBase.release_session(session) + #SessionBase.release_session(session) return nonce + +class NonceReservation(SessionBase): + + __tablename__ = 'nonce_task_reservation' + + nonce = Column(Integer) + key = Column(String) + date_created = Column(DateTime, default=datetime.datetime.utcnow) + + + @staticmethod + def peek(key, session=None): + session = SessionBase.bind_session(session) + + q = session.query(NonceReservation) + q = q.filter(NonceReservation.key==key) + o = q.first() + + nonce = None + if o != None: + nonce = o.nonce + + session.flush() + + SessionBase.release_session(session) + + return nonce + + + @staticmethod + def release(key, session=None): + + session = SessionBase.bind_session(session) + + nonce = NonceReservation.peek(key, session=session) + + q = session.query(NonceReservation) + q = q.filter(NonceReservation.key==key) + o = q.first() + + if o == None: + raise IntegrityError('nonce for key {}'.format(nonce)) + SessionBase.release_session(session) + + session.delete(o) + session.flush() + + SessionBase.release_session(session) + + return nonce + + + @staticmethod + def next(address, key, session=None): + session = SessionBase.bind_session(session) + + if NonceReservation.peek(key, session) != None: + raise IntegrityError('nonce for key {}'.format(key)) + + nonce = Nonce.next(address) + + o = NonceReservation() + o.nonce = nonce + o.key = key + session.add(o) + + SessionBase.release_session(session) + + return nonce diff --git a/apps/cic-eth/cic_eth/db/models/tx.py b/apps/cic-eth/cic_eth/db/models/tx.py index 34abbd69..c71e2e95 100644 --- a/apps/cic-eth/cic_eth/db/models/tx.py +++ b/apps/cic-eth/cic_eth/db/models/tx.py @@ -143,7 +143,7 @@ class TxCache(SessionBase): self.block_number = block_number self.tx_index = tx_index # not automatically set in sqlite, it seems: - self.date_created = datetime.datetime.now() + self.date_created = datetime.datetime.utcnow() self.date_updated = self.date_created self.date_checked = self.date_created diff --git a/apps/cic-eth/cic_eth/error.py b/apps/cic-eth/cic_eth/error.py index 58afdc87..1096c4c5 100644 --- a/apps/cic-eth/cic_eth/error.py +++ b/apps/cic-eth/cic_eth/error.py @@ -54,6 +54,13 @@ class RoleMissingError(Exception): pass +class IntegrityError(Exception): + """Exception raised to signal irregularities with deduplication and ordering of tasks + + """ + pass + + class LockedError(Exception): """Exception raised when attempt is made to execute action that is deactivated by lock diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index cd6b3d47..8efe4e55 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -36,6 +36,7 @@ class AccountTxFactory(TxFactory): self, address, chain_spec, + uuid, session=None, ): """Register an Ethereum account address with the on-chain account registry @@ -59,7 +60,7 @@ class AccountTxFactory(TxFactory): 'gas': gas, 'gasPrice': self.gas_price, 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(session=session), + 'nonce': self.next_nonce(uuid, session=session), 'value': 0, }) return tx_add @@ -69,6 +70,7 @@ class AccountTxFactory(TxFactory): self, address, chain_spec, + uuid, session=None, ): """Trigger the on-chain faucet to disburse tokens to the provided Ethereum account @@ -90,7 +92,7 @@ class AccountTxFactory(TxFactory): 'gas': gas, 'gasPrice': self.gas_price, 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(session=session), + 'nonce': self.next_nonce(uuid, session=session), 'value': 0, }) return tx_add @@ -156,19 +158,9 @@ def create(password, chain_str): logg.debug('created account {}'.format(a)) # Initialize nonce provider record for account - # TODO: this can safely be set to zero, since we are randomly creating account - n = c.w3.eth.getTransactionCount(a, 'pending') session = SessionBase.create_session() - q = session.query(Nonce) - q = q.filter(Nonce.address_hex==a) - o = q.first() - session.flush() - if o == None: - o = Nonce() - o.address_hex = a - o.nonce = n - session.add(o) - session.commit() + Nonce.init(a, session=session) + session.commit() session.close() return a @@ -203,7 +195,7 @@ def register(self, account_address, chain_str, writer_address=None): c = RpcClient(chain_spec, holder_address=writer_address) txf = AccountTxFactory(writer_address, c) - tx_add = txf.add(account_address, chain_spec, session=session) + tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session) session.close() @@ -243,7 +235,7 @@ def gift(self, account_address, chain_str): txf = AccountTxFactory(account_address, c) session = SessionBase.create_session() - tx_add = txf.gift(account_address, chain_spec, session=session) + tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session) session.close() diff --git a/apps/cic-eth/cic_eth/eth/factory.py b/apps/cic-eth/cic_eth/eth/factory.py index 76a15fbc..eb44eb79 100644 --- a/apps/cic-eth/cic_eth/eth/factory.py +++ b/apps/cic-eth/cic_eth/eth/factory.py @@ -32,10 +32,10 @@ class TxFactory: logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price)) - def next_nonce(self, session=None): - """Returns the current cached nonce value, and increments it for next transaction. + def next_nonce(self, uuid, session=None): + """Returns the current reserved nonce value, and increments it for next transaction. :returns: Nonce :rtype: number """ - return self.nonce_oracle.next(session=session) + return self.nonce_oracle.next_by_task_uuid(uuid, session=session) diff --git a/apps/cic-eth/cic_eth/eth/nonce.py b/apps/cic-eth/cic_eth/eth/nonce.py index e8c96b90..79e00400 100644 --- a/apps/cic-eth/cic_eth/eth/nonce.py +++ b/apps/cic-eth/cic_eth/eth/nonce.py @@ -1,5 +1,8 @@ # local imports -from cic_eth.db.models.nonce import Nonce +from cic_eth.db.models.nonce import ( + Nonce, + NonceReservation, + ) class NonceOracle(): """Ensures atomic nonce increments for all transactions across all tasks and threads. @@ -14,10 +17,15 @@ class NonceOracle(): self.default_nonce = default_nonce - def next(self, session=None): + def next(self): """Get next unique nonce. :returns: Nonce :rtype: number """ - return Nonce.next(self.address, self.default_nonce, session=session) + raise AttributeError('this should not be called') + return Nonce.next(self.address, self.default_nonce) + + + def next_by_task_uuid(self, uuid, session=None): + return NonceReservation.release(uuid, session=session) diff --git a/apps/cic-eth/cic_eth/eth/token.py b/apps/cic-eth/cic_eth/eth/token.py index cb09baf2..10f38bff 100644 --- a/apps/cic-eth/cic_eth/eth/token.py +++ b/apps/cic-eth/cic_eth/eth/token.py @@ -46,6 +46,7 @@ class TokenTxFactory(TxFactory): spender_address, amount, chain_spec, + uuid, session=None, ): """Create an ERC20 "approve" transaction @@ -74,7 +75,7 @@ class TokenTxFactory(TxFactory): 'gas': source_token_gas, 'gasPrice': self.gas_price, 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(session=session), + 'nonce': self.next_nonce(uuid, session=session), }) return tx_approve @@ -85,6 +86,7 @@ class TokenTxFactory(TxFactory): receiver_address, value, chain_spec, + uuid, session=None, ): """Create an ERC20 "transfer" transaction @@ -114,7 +116,7 @@ class TokenTxFactory(TxFactory): 'gas': source_token_gas, 'gasPrice': self.gas_price, 'chainId': chain_spec.chain_id(), - 'nonce': self.next_nonce(session=session), + 'nonce': self.next_nonce(uuid, session=session), }) return tx_transfer @@ -248,7 +250,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str): txf = TokenTxFactory(holder_address, c) session = SessionBase.create_session() - tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, session=session) + tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, self.request.root_id, session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session) session.close() @@ -304,7 +306,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str): txf = TokenTxFactory(holder_address, c) session = SessionBase.create_session() - tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, session=session) + tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, self.request.root_id, session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session) session.close() diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index 4de09734..74c6c165 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -12,6 +12,7 @@ from cic_registry.chain import ChainSpec from .rpc import RpcClient from cic_eth.db import Otx, SessionBase from cic_eth.db.models.tx import TxCache +from cic_eth.db.models.nonce import NonceReservation from cic_eth.db.models.lock import Lock from cic_eth.db.enum import ( LockEnum, @@ -84,15 +85,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required)) if gas_required > balance: + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [ + address, + c.gas_provider(), + ], + queue=queue, + ) s_refill_gas = celery.signature( 'cic_eth.eth.tx.refill_gas', [ - address, chain_str, ], queue=queue, ) - s_refill_gas.apply_async() + s_nonce.link(s_refill_gas) + s_nonce.apply_async() wait_tasks = [] for tx_hash in tx_hashes: s = celery.signature( @@ -108,15 +117,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non safe_gas = c.safe_threshold_amount() if balance < safe_gas: + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce' + [ + address, + c.gas_provider(), + ], + queue=queue, + ) s_refill_gas = celery.signature( 'cic_eth.eth.tx.refill_gas', [ - address, chain_str, ], queue=queue, ) - s_refill_gas.apply_async() + s_nonce.link(s_refill) + s_nonce.apply_async() logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address)) ready_tasks = [] for tx_hash in tx_hashes: @@ -291,7 +308,7 @@ class ParityNodeHandler: tx_hash_hex, debugstr, ], - queue=queue, + queue=self.queue, ) s_set_reject.link(s_debug) s_lock.link(s_set_reject) @@ -299,7 +316,7 @@ class ParityNodeHandler: return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()))) - def handle_default(self, tx_hash_hex, tx_hex): + def handle_default(self, tx_hash_hex, tx_hex, debugstr): tx_bytes = bytes.fromhex(tx_hex[2:]) tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id()) s_lock = celery.signature( @@ -317,9 +334,19 @@ class ParityNodeHandler: [], queue=self.queue, ) + s_debug = celery.signature( + 'cic_eth.admin.debug.alert', + [ + tx_hash_hex, + tx_hash_hex, + debugstr, + ], + queue=self.queue, + ) + s_set_fubar.link(s_debug) s_lock.link(s_set_fubar) t = s_lock.apply_async() - return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()))) + return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr)) # TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic. @@ -407,6 +434,7 @@ def refill_gas(self, recipient_address, chain_str): """ chain_spec = ChainSpec.from_chain_str(chain_str) + zero_amount = False session = SessionBase.create_session() status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR q = session.query(Otx.tx_hash) @@ -416,8 +444,11 @@ def refill_gas(self, recipient_address, chain_str): q = q.filter(TxCache.recipient==recipient_address) c = q.count() if c > 0: - session.close() - raise AlreadyFillingGasError(recipient_address) + #session.close() + #raise AlreadyFillingGasError(recipient_address) + logg.warning(str(AlreadyFillingGasError(recipient_address))) + zero_amount = True + session.flush() queue = self.request.delivery_info['routing_key'] @@ -426,10 +457,13 @@ def refill_gas(self, recipient_address, chain_str): logg.debug('refill gas from provider address {}'.format(c.gas_provider())) default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending') nonce_generator = NonceOracle(c.gas_provider(), default_nonce) - nonce = nonce_generator.next(session=session) + #nonce = nonce_generator.next(session=session) + nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session) gas_price = c.gas_price() gas_limit = c.default_gas_limit - refill_amount = c.refill_amount() + refill_amount = 0 + if not zero_amount: + refill_amount = c.refill_amount() logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce)) # create and sign transaction @@ -475,6 +509,7 @@ def refill_gas(self, recipient_address, chain_str): queue=queue, ) celery.group(s_tx_cache, s_status)() + return tx_send_gas_signed['raw'] @@ -554,6 +589,21 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa return tx_hash_hex +@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) +def reserve_nonce(self, chained_input, address=None): + session = SessionBase.create_session() + + if address == None: + address = chained_input + + root_id = self.request.root_id + nonce = NonceReservation.next(address, root_id) + + session.close() + + return chained_input + + @celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task) def sync_tx(self, tx_hash_hex, chain_str): """Force update of network status of a simgle transaction diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index c1fe0a3f..ec6954fa 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -315,7 +315,9 @@ def set_ready(tx_hash): @celery_app.task(base=CriticalSQLAlchemyTask) def set_dequeue(tx_hash): session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + o = q.first() if o == None: session.close() raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) @@ -566,7 +568,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0, session=None): return txs -def get_status_tx(status, before=None, exact=False, limit=0, session=None): +def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None): """Retrieve transaction with a specific queue status. :param status: Status to match transactions with @@ -582,11 +584,15 @@ def get_status_tx(status, before=None, exact=False, limit=0, session=None): session = SessionBase.bind_session(session) q = session.query(Otx) q = q.join(TxCache) - q = q.filter(TxCache.date_updated0) + if not_status != None: + q = q.filter(Otx.status.op('&')(not_status)==0) i = 0 for o in q.all(): if limit > 0 and i == limit: diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index 98decc5c..a4d695b8 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -121,6 +121,7 @@ class DispatchSyncer: set_dequeue(tx['hash']) except NotLocalTxError as e: logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash'])) + continue s_check = celery.signature( 'cic_eth.admin.ctrl.check_lock', diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index b59f6d05..7abf07ff 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -138,7 +138,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str): # TODO: can we merely use the dispatcher instead? def dispatch(chain_str): - txs = get_status_tx(StatusEnum.RETRY, datetime.datetime.utcnow()) + txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow()) if len(txs) == 0: logg.debug('no retry state txs found') return diff --git a/apps/cic-eth/cic_eth/sync/retry.py b/apps/cic-eth/cic_eth/sync/retry.py index 302e4c75..57bd7e99 100644 --- a/apps/cic-eth/cic_eth/sync/retry.py +++ b/apps/cic-eth/cic_eth/sync/retry.py @@ -9,7 +9,10 @@ import celery # local imports from .base import Syncer from cic_eth.eth.rpc import RpcClient -from cic_eth.db.enum import StatusEnum +from cic_eth.db.enum import ( + StatusEnum, + StatusBits, + ) from cic_eth.queue.tx import get_status_tx logg = logging.getLogger() @@ -47,7 +50,8 @@ class RetrySyncer(Syncer): # ) before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) stalled_txs = get_status_tx( - StatusEnum.SENT.value, + StatusBits.IN_NETWORK.value, + not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, before=before, ) # return list(failed_txs.keys()) + list(stalled_txs.keys()) diff --git a/apps/cic-eth/tests/fixtures_database.py b/apps/cic-eth/tests/fixtures_database.py index b4fddd8c..604a87a4 100644 --- a/apps/cic-eth/tests/fixtures_database.py +++ b/apps/cic-eth/tests/fixtures_database.py @@ -27,7 +27,7 @@ def database_engine( SessionBase.poolable = False dsn = dsn_from_config(load_config) #SessionBase.connect(dsn, True) - SessionBase.connect(dsn, load_config.get('DATABASE_DEBUG') != None) + SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None) return dsn diff --git a/apps/cic-eth/tests/fixtures_web3.py b/apps/cic-eth/tests/fixtures_web3.py index 44999ff9..406655b8 100644 --- a/apps/cic-eth/tests/fixtures_web3.py +++ b/apps/cic-eth/tests/fixtures_web3.py @@ -15,6 +15,7 @@ from eth_keys import KeyAPI from cic_eth.eth import RpcClient from cic_eth.eth.rpc import GasOracle from cic_eth.db.models.role import AccountRole +from cic_eth.db.models.nonce import Nonce #logg = logging.getLogger(__name__) logg = logging.getLogger() @@ -128,8 +129,10 @@ def init_eth_account_roles( w3_account_roles, ): - role = AccountRole.set('GAS_GIFTER', w3_account_roles.get('eth_account_gas_provider')) + address = w3_account_roles.get('eth_account_gas_provider') + role = AccountRole.set('GAS_GIFTER', address) init_database.add(role) + Nonce.init(address, session=init_database) init_database.commit() return w3_account_roles @@ -187,6 +190,7 @@ def w3_account_roles( return roles + @pytest.fixture(scope='session') def w3_account_token_owners( tokens_to_deploy, diff --git a/apps/cic-eth/tests/tasks/test_account.py b/apps/cic-eth/tests/tasks/test_account.py index b98436b0..78cd8314 100644 --- a/apps/cic-eth/tests/tasks/test_account.py +++ b/apps/cic-eth/tests/tasks/test_account.py @@ -64,6 +64,7 @@ def test_register_account( init_database, init_eth_tester, init_w3, + init_rpc, cic_registry, celery_session_worker, eth_empty_accounts, @@ -71,18 +72,31 @@ def test_register_account( logg.debug('chainspec {}'.format(str(default_chain_spec))) - s = celery.signature( - 'cic_eth.eth.account.register', + nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0]) + Nonce.init(init_w3.eth.accounts[0], nonce, session=init_database) + init_database.commit() + + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', [ eth_empty_accounts[0], + init_w3.eth.accounts[0], + ], + queue=None, + ) + s_register = celery.signature( + 'cic_eth.eth.account.register', + [ str(default_chain_spec), init_w3.eth.accounts[0], ], ) - t = s.apply_async() + s_nonce.link(s_register) + t = s_nonce.apply_async() address = t.get() - r = t.collect() - t.successful() + for r in t.collect(): + pass + assert t.successful() session = SessionBase.create_session() o = session.query(Otx).first() diff --git a/apps/cic-eth/tests/tasks/test_balance_complex.py b/apps/cic-eth/tests/tasks/test_balance_complex.py index bf4b2214..8da6f51e 100644 --- a/apps/cic-eth/tests/tasks/test_balance_complex.py +++ b/apps/cic-eth/tests/tasks/test_balance_complex.py @@ -8,6 +8,7 @@ import celery # local imports from cic_eth.eth.rpc import RpcClient from cic_eth.db.models.otx import Otx +from cic_eth.db.models.nonce import Nonce from cic_eth.eth.util import unpack_signed_raw_tx #logg = logging.getLogger(__name__) @@ -31,18 +32,32 @@ def test_balance_complex( } tx_hashes = [] + + nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0]) + Nonce.init(init_w3.eth.accounts[0], nonce, session=init_database) + init_database.commit() + for i in range(3): - s = celery.signature( - 'cic_eth.eth.token.transfer', + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', [ [token_data], + init_w3.eth.accounts[0], + ], + queue=None, + ) + s_transfer = celery.signature( + 'cic_eth.eth.token.transfer', + [ init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000*(i+1), chain_str, ], + queue=None, ) - t = s.apply_async() + s_nonce.link(s_transfer) + t = s_nonce.apply_async() t.get() r = None for c in t.collect(): diff --git a/apps/cic-eth/tests/tasks/test_convert.py b/apps/cic-eth/tests/tasks/test_convert.py index 46857b69..e142b989 100644 --- a/apps/cic-eth/tests/tasks/test_convert.py +++ b/apps/cic-eth/tests/tasks/test_convert.py @@ -9,6 +9,7 @@ from cic_eth.eth.bancor import BancorTxFactory logg = logging.getLogger() +@pytest.mark.skip() def test_transfer_after_convert( init_w3, init_database, diff --git a/apps/cic-eth/tests/tasks/test_faucet.py b/apps/cic-eth/tests/tasks/test_faucet.py index b6575ef4..bcf94e0e 100644 --- a/apps/cic-eth/tests/tasks/test_faucet.py +++ b/apps/cic-eth/tests/tasks/test_faucet.py @@ -10,6 +10,9 @@ import celery from cic_eth.eth.account import unpack_gift from cic_eth.eth.factory import TxFactory from cic_eth.eth.util import unpack_signed_raw_tx +from cic_eth.db.models.nonce import Nonce +from cic_eth.db.models.otx import Otx +from cic_eth.db.models.tx import TxCache logg = logging.getLogger() @@ -32,10 +35,21 @@ def test_faucet( init_database, ): - s = celery.signature( + nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[7]) + Nonce.init(init_w3.eth.accounts[7], nonce, session=init_database) + init_database.commit() + + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [ + init_w3.eth.accounts[7], + init_w3.eth.accounts[0], + ], + queue=None, + ) + s_gift = celery.signature( 'cic_eth.eth.account.gift', [ - init_w3.eth.accounts[7], str(default_chain_spec), ], ) @@ -45,15 +59,21 @@ def test_faucet( str(default_chain_spec), ], ) - s.link(s_send) - t = s.apply_async() - signed_tx = t.get() + s_gift.link(s_send) + s_nonce.link(s_gift) + t = s_nonce.apply_async() + t.get() for r in t.collect(): logg.debug('result {}'.format(r)) - assert t.successful() - tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[0][2:]), default_chain_spec.chain_id()) + q = init_database.query(Otx) + q = q.join(TxCache) + q = q.filter(TxCache.sender==init_w3.eth.accounts[7]) + o = q.first() + signed_tx = o.signed_tx + + tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[2:]), default_chain_spec.chain_id()) giveto = unpack_gift(tx['data']) assert giveto['to'] == init_w3.eth.accounts[7] diff --git a/apps/cic-eth/tests/tasks/test_gas_tasks.py b/apps/cic-eth/tests/tasks/test_gas_tasks.py index 157e4d2d..1a318d84 100644 --- a/apps/cic-eth/tests/tasks/test_gas_tasks.py +++ b/apps/cic-eth/tests/tasks/test_gas_tasks.py @@ -44,23 +44,39 @@ def test_refill_gas( refill_amount = c.refill_amount() balance = init_rpc.w3.eth.getBalance(receiver_address) - s = celery.signature( - 'cic_eth.eth.tx.refill_gas', + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', [ receiver_address, + provider_address, + ], + queue=None, + ) + s_refill = celery.signature( + 'cic_eth.eth.tx.refill_gas', + [ str(default_chain_spec), ], + queue=None, ) - t = s.apply_async() + s_nonce.link(s_refill) + t = s_nonce.apply_async() r = t.get() - t.collect() + for c in t.collect(): + pass assert t.successful() + q = init_database.query(Otx) + q = q.join(TxCache) + q = q.filter(TxCache.recipient==receiver_address) + o = q.first() + signed_tx = o.signed_tx + s = celery.signature( 'cic_eth.eth.tx.send', [ - [r], + [signed_tx], str(default_chain_spec), ], ) @@ -99,83 +115,131 @@ def test_refill_deduplication( c = init_rpc refill_amount = c.refill_amount() - s = celery.signature( - 'cic_eth.eth.tx.refill_gas', + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', [ receiver_address, + provider_address, + ], + queue=None, + ) + s_refill = celery.signature( + 'cic_eth.eth.tx.refill_gas', + [ str(default_chain_spec), ], + queue=None, ) - t = s.apply_async() + s_nonce.link(s_refill) + t = s_nonce.apply_async() r = t.get() for e in t.collect(): pass assert t.successful() - s = celery.signature( - 'cic_eth.eth.tx.refill_gas', + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', [ receiver_address, - str(default_chain_spec), + provider_address, ], + queue=None, ) - - t = s.apply_async() - with pytest.raises(AlreadyFillingGasError): - t.get() - - -def test_check_gas( - default_chain_spec, - init_eth_tester, - init_w3, - init_rpc, - eth_empty_accounts, - init_database, - cic_registry, - celery_session_worker, - bancor_registry, - bancor_tokens, - ): - - provider_address = init_w3.eth.accounts[0] - gas_receiver_address = eth_empty_accounts[0] - token_receiver_address = init_w3.eth.accounts[1] - - c = init_rpc - txf = TokenTxFactory(gas_receiver_address, c) - tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec) - - (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None) - - gas_price = c.gas_price() - gas_limit = tx_transfer['gas'] - - s = celery.signature( - 'cic_eth.eth.tx.check_gas', + s_refill = celery.signature( + 'cic_eth.eth.tx.refill_gas', [ - [tx_hash_hex], str(default_chain_spec), - [], - gas_receiver_address, - gas_limit * gas_price, ], ) - t = s.apply_async() - with pytest.raises(OutOfGasError): - r = t.get() - #assert len(r) == 0 - time.sleep(1) - t.collect() + s_nonce.link(s_refill) + t = s_nonce.apply_async() + #with pytest.raises(AlreadyFillingGasError): + t.get() + for e in t.collect(): + pass + assert t.successful() + logg.warning('TODO: complete test by checking that second tx had zero value') - session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash_hex) - r = q.first() - session.close() - assert r.status == StatusEnum.WAITFORGAS + +# TODO: check gas is part of the transfer chain, and we cannot create the transfer nonce by uuid before the task. Test is subsumed by transfer task test, but should be tested in isolation +#def test_check_gas( +# default_chain_spec, +# init_eth_tester, +# init_w3, +# init_rpc, +# eth_empty_accounts, +# init_database, +# cic_registry, +# celery_session_worker, +# bancor_registry, +# bancor_tokens, +# ): +# +# provider_address = init_w3.eth.accounts[0] +# gas_receiver_address = eth_empty_accounts[0] +# token_receiver_address = init_w3.eth.accounts[1] +# +## c = init_rpc +## txf = TokenTxFactory(gas_receiver_address, c) +## tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec, 'foo') +## +## (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None) +# +# token_data = [ +# { +# 'address': bancor_tokens[0], +# }, +# ] +# +# s_nonce = celery.signature( +# 'cic_eth.eth.tx.reserve_nonce', +# [ +# token_data, +# init_w3.eth.accounts[0], +# ], +# queue=None, +# ) +# s_transfer = celery.signature( +# 'cic_eth.eth.token.transfer', +# [ +# init_w3.eth.accounts[0], +# init_w3.eth.accounts[1], +# 1024, +# str(default_chain_spec), +# ], +# queue=None, +# ) +# +# gas_price = c.gas_price() +# gas_limit = tx_transfer['gas'] +# +# s = celery.signature( +# 'cic_eth.eth.tx.check_gas', +# [ +# [tx_hash_hex], +# str(default_chain_spec), +# [], +# gas_receiver_address, +# gas_limit * gas_price, +# ], +# ) +# s_nonce.link(s_transfer) +# t = s_nonce.apply_async() +# with pytest.raises(OutOfGasError): +# r = t.get() +# #assert len(r) == 0 +# +# time.sleep(1) +# t.collect() +# +# session = SessionBase.create_session() +# q = session.query(Otx) +# q = q.filter(Otx.tx_hash==tx_hash_hex) +# r = q.first() +# session.close() +# assert r.status == StatusEnum.WAITFORGAS def test_resend_with_higher_gas( @@ -191,25 +255,63 @@ def test_resend_with_higher_gas( ): c = init_rpc - txf = TokenTxFactory(init_w3.eth.accounts[0], c) - tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec) - logg.debug('txtransfer {}'.format(tx_transfer)) - (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec)) - logg.debug('signed raw {}'.format(tx_signed_raw_hex)) - queue_create( - tx_transfer['nonce'], - tx_transfer['from'], - tx_hash_hex, - tx_signed_raw_hex, - str(default_chain_spec), + token_data = [ + { + 'address': bancor_tokens[0], + }, + ] + + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [ + token_data, + init_w3.eth.accounts[0], + ], + queue=None, ) - logg.debug('create {}'.format(tx_transfer['from'])) - cache_transfer_data( - tx_hash_hex, - tx_transfer, #_signed_raw_hex, + s_transfer = celery.signature( + 'cic_eth.eth.token.transfer', + [ + init_w3.eth.accounts[0], + init_w3.eth.accounts[1], + 1024, + str(default_chain_spec), + ], + queue=None, ) +# txf = TokenTxFactory(init_w3.eth.accounts[0], c) + +# tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec, 'foo') +# logg.debug('txtransfer {}'.format(tx_transfer)) +# (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec)) +# logg.debug('signed raw {}'.format(tx_signed_raw_hex)) +# queue_create( +# tx_transfer['nonce'], +# tx_transfer['from'], +# tx_hash_hex, +# tx_signed_raw_hex, +# str(default_chain_spec), +# ) +# logg.debug('create {}'.format(tx_transfer['from'])) +# cache_transfer_data( +# tx_hash_hex, +# tx_transfer, #_signed_raw_hex, +# ) + s_nonce.link(s_transfer) + t = s_nonce.apply_async() + t.get() + for r in t.collect(): + pass + assert t.successful() + + q = init_database.query(Otx) + q = q.join(TxCache) + q = q.filter(TxCache.recipient==init_w3.eth.accounts[1]) + o = q.first() + tx_hash_hex = o.tx_hash + s_resend = celery.signature( 'cic_eth.eth.tx.resend_with_higher_gas', [ @@ -217,13 +319,10 @@ def test_resend_with_higher_gas( str(default_chain_spec), ], ) - t = s_resend.apply_async() - - i = 0 - for r in t.collect(): - logg.debug('{} {}'.format(i, r[0].get())) - i += 1 + t = s_resend.apply_async() + for r in t.collect(): + pass assert t.successful() # diff --git a/apps/cic-eth/tests/tasks/test_nonce_tasks.py b/apps/cic-eth/tests/tasks/test_nonce_tasks.py index 0f3e2685..e89292b8 100644 --- a/apps/cic-eth/tests/tasks/test_nonce_tasks.py +++ b/apps/cic-eth/tests/tasks/test_nonce_tasks.py @@ -1,4 +1,5 @@ # third-party imports +import pytest import celery # local imports @@ -6,7 +7,88 @@ from cic_eth.admin.nonce import shift_nonce from cic_eth.queue.tx import create as queue_create from cic_eth.eth.tx import otx_cache_parse_tx from cic_eth.eth.task import sign_tx +from cic_eth.db.models.nonce import ( + NonceReservation, + Nonce + ) +from cic_eth.db.models.otx import Otx +from cic_eth.db.models.tx import TxCache + +@pytest.mark.skip() +def test_reserve_nonce_task( + init_database, + celery_session_worker, + eth_empty_accounts, + ): + + s = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [ + 'foo', + eth_empty_accounts[0], + ], + queue=None, + ) + t = s.apply_async() + r = t.get() + + assert r == 'foo' + + q = init_database.query(Nonce) + q = q.filter(Nonce.address_hex==eth_empty_accounts[0]) + o = q.first() + assert o != None + + q = init_database.query(NonceReservation) + q = q.filter(NonceReservation.key==str(t)) + o = q.first() + assert o != None + + +def test_reserve_nonce_chain( + default_chain_spec, + init_database, + celery_session_worker, + init_w3, + init_rpc, + ): + + provider_address = init_rpc.gas_provider() + Nonce.init(provider_address, 42, session=init_database) + init_database.commit() + + s_nonce = celery.signature( + 'cic_eth.eth.tx.reserve_nonce', + [ + init_w3.eth.accounts[0], + provider_address, + ], + queue=None, + ) + s_gas = celery.signature( + 'cic_eth.eth.tx.refill_gas', + [ + str(default_chain_spec), + ], + queue=None, + ) + s_nonce.link(s_gas) + t = s_nonce.apply_async() + r = t.get() + for c in t.collect(): + pass + assert t.successful() + + q = init_database.query(Otx) + Q = q.join(TxCache) + q = q.filter(TxCache.recipient==init_w3.eth.accounts[0]) + o = q.first() + + assert o.nonce == 42 + + +@pytest.mark.skip() def test_shift_nonce( default_chain_spec, init_database, @@ -47,3 +129,4 @@ def test_shift_nonce( for _ in t.collect(): pass assert t.successful() + diff --git a/apps/cic-eth/tests/unit/db/test_nonce_db.py b/apps/cic-eth/tests/unit/db/test_nonce_db.py index 7b8cdb61..d192c5fa 100644 --- a/apps/cic-eth/tests/unit/db/test_nonce_db.py +++ b/apps/cic-eth/tests/unit/db/test_nonce_db.py @@ -1,8 +1,29 @@ # third-party imports import pytest +import uuid # local imports -from cic_eth.db.models.nonce import Nonce +from cic_eth.db.models.nonce import ( + Nonce, + NonceReservation, + ) +from cic_eth.error import ( + InitializationError, + IntegrityError, + ) + + +def test_nonce_init( + init_database, + eth_empty_accounts, + ): + + nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database) + init_database.commit() + + with pytest.raises(InitializationError): + nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database) + def test_nonce_increment( init_database, @@ -10,11 +31,46 @@ def test_nonce_increment( database_engine, ): -# if database_engine[:6] == 'sqlite': -# pytest.skip('sqlite cannot lock tables which is required for this test, skipping') - nonce = Nonce.next(eth_empty_accounts[0], 3) assert nonce == 3 nonce = Nonce.next(eth_empty_accounts[0], 3) assert nonce == 4 + + +def test_nonce_reserve( + init_database, + eth_empty_accounts, + ): + + nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database) + init_database.commit() + uu = uuid.uuid4() + nonce = NonceReservation.next(eth_empty_accounts[0], str(uu), session=init_database) + init_database.commit() + assert nonce == 42 + + q = init_database.query(Nonce) + q = q.filter(Nonce.address_hex==eth_empty_accounts[0]) + o = q.first() + assert o.nonce == 43 + + nonce = NonceReservation.release(str(uu)) + init_database.commit() + assert nonce == 42 + + q = init_database.query(NonceReservation) + q = q.filter(NonceReservation.key==str(uu)) + o = q.first() + assert o == None + + +def test_nonce_reserve_integrity( + init_database, + eth_empty_accounts, + ): + + uu = uuid.uuid4() + nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database) + with pytest.raises(IntegrityError): + NonceReservation.release(str(uu)) diff --git a/apps/cic-eth/tests/unit/queue/test_list_tx.py b/apps/cic-eth/tests/unit/queue/test_list_tx.py new file mode 100644 index 00000000..74dd1191 --- /dev/null +++ b/apps/cic-eth/tests/unit/queue/test_list_tx.py @@ -0,0 +1,71 @@ +# standard imports +import logging + +# local imports +from cic_eth.queue.tx import get_status_tx +from cic_eth.db.enum import ( + StatusEnum, + StatusBits, + ) +from cic_eth.queue.tx import create as queue_create +from cic_eth.eth.tx import cache_gas_refill_data +from cic_eth.db.models.otx import Otx + +logg = logging.getLogger() + + +def test_status_tx_list( + default_chain_spec, + init_database, + init_w3, + ): + + tx = { + 'from': init_w3.eth.accounts[0], + 'to': init_w3.eth.accounts[1], + 'nonce': 42, + 'gas': 21000, + 'gasPrice': 1000000, + 'value': 128, + 'chainId': 666, + 'data': '', + } + logg.debug('nonce {}'.format(tx['nonce'])) + tx_signed = init_w3.eth.sign_transaction(tx) + #tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw']) + tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) + queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) + cache_gas_refill_data(tx_hash.hex(), tx) + tx_hash_hex = tx_hash.hex() + + q = init_database.query(Otx) + otx = q.get(1) + otx.sendfail(session=init_database) + init_database.add(otx) + init_database.commit() + init_database.refresh(otx) + + txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database) + assert len(txs) == 1 + + otx.sendfail(session=init_database) + otx.retry(session=init_database) + init_database.add(otx) + init_database.commit() + init_database.refresh(otx) + + txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database) + assert len(txs) == 1 + + txs = get_status_tx(StatusBits.QUEUED, session=init_database) + assert len(txs) == 1 + + txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.LOCAL_ERROR, session=init_database) + assert len(txs) == 0 + + txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK, session=init_database) + assert len(txs) == 1 + + txs = get_status_tx(StatusBits.IN_NETWORK, session=init_database) + assert len(txs) == 0 + diff --git a/apps/contract-migration/scripts/import_users.py b/apps/contract-migration/scripts/import_users.py index a0cc3c8f..cb2d1d7d 100644 --- a/apps/contract-migration/scripts/import_users.py +++ b/apps/contract-migration/scripts/import_users.py @@ -102,6 +102,9 @@ def register_eth(i, u): ps.get_message() m = ps.get_message(timeout=args.timeout) address = None + if m == None: + logg.debug('message timeout') + return if m['type'] == 'subscribe': logg.debug('skipping subscribe message') continue diff --git a/apps/contract-migration/scripts/verify.py b/apps/contract-migration/scripts/verify.py index ef3ee2b4..3eb47311 100644 --- a/apps/contract-migration/scripts/verify.py +++ b/apps/contract-migration/scripts/verify.py @@ -58,6 +58,7 @@ argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spe argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url') argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') +argparser.add_argument('-x', '--exit-on-error', dest='x', action='store_true', help='Halt exection on error') argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-vv', help='be more verbose', action='store_true') argparser.add_argument('user_dir', type=str, help='user export directory') @@ -91,6 +92,27 @@ old_chain_spec = ChainSpec.from_chain_str(args.old_chain_spec) old_chain_str = str(old_chain_spec) user_dir = args.user_dir # user_out_dir from import_users.py meta_url = args.meta_provider +exit_on_error = args.x + + +class VerifierState: + + def __init__(self, item_keys): + self.items = {} + for k in item_keys: + logg.info('k {}'.format(k)) + self.items[k] = 0 + + + def poke(self, item_key): + self.items[item_key] += 1 + + + def __str__(self): + r = '' + for k in self.items.keys(): + r += '{}: {}\n'.format(k, self.items[k]) + return r class VerifierError(Exception): @@ -107,7 +129,8 @@ class VerifierError(Exception): class Verifier: - def __init__(self, conn, cic_eth_api, gas_oracle, chain_spec, index_address, token_address, data_dir): + # TODO: what an awful function signature + def __init__(self, conn, cic_eth_api, gas_oracle, chain_spec, index_address, token_address, data_dir, exit_on_error=False): self.conn = conn self.gas_oracle = gas_oracle self.chain_spec = chain_spec @@ -117,9 +140,18 @@ class Verifier: self.tx_factory = TxFactory(chain_id=chain_spec.chain_id(), gas_oracle=gas_oracle) self.api = cic_eth_api self.data_dir = data_dir + self.exit_on_error = exit_on_error + + verifymethods = [] + for k in dir(self): + if len(k) > 7 and k[:7] == 'verify_': + logg.info('adding verify method {}'.format(k)) + verifymethods.append(k[7:]) + + self.state = VerifierState(verifymethods) - def verify_accounts_index(self, address): + def verify_accounts_index(self, address, balance=None): tx = self.tx_factory.template(ZERO_ADDRESS, self.index_address) data = keccak256_string_to_hex('have(address)')[:8] data += eth_abi.encode_single('address', address).hex() @@ -145,14 +177,14 @@ class Verifier: raise VerifierError((actual_balance, balance), 'balance') - def verify_local_key(self, address): + def verify_local_key(self, address, balance=None): r = self.api.have_account(address, str(self.chain_spec)) logg.debug('verify local key result {}'.format(r)) if r != address: raise VerifierError((address, r), 'local key') - def verify_metadata(self, address): + def verify_metadata(self, address, balance=None): k = generate_metadata_pointer(bytes.fromhex(strip_0x(address)), ':cic.person') url = os.path.join(meta_url, k) logg.debug('verify metadata url {}'.format(url)) @@ -184,15 +216,33 @@ class Verifier: def verify(self, address, balance): logg.debug('verify {} {}'.format(address, balance)) - - try: - self.verify_local_key(address) - self.verify_accounts_index(address) - self.verify_balance(address, balance) - self.verify_metadata(address) - except VerifierError as e: - logg.critical('verification failed: {}'.format(e)) - sys.exit(1) + + methods = [ + 'local_key', + 'accounts_index', + 'balance', + 'metadata', + ] + + for k in methods: + try: + m = getattr(self, 'verify_{}'.format(k)) + m(address, balance) +# self.verify_local_key(address) +# self.verify_accounts_index(address) +# self.verify_balance(address, balance) +# self.verify_metadata(address) + except VerifierError as e: + logline = 'verification {} failed for {}: {}'.format(k, address, str(e)) + if self.exit_on_error: + logg.critical(logline) + sys.exit(1) + logg.error(logline) + self.state.poke(k) + + + def __str__(self): + return str(self.state) class MockClient: @@ -263,7 +313,8 @@ def main(): r = l.split(',') try: address = to_checksum(r[0]) - sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r") + #sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r") + logg.debug('loading balance {} {}'.format(i, address).ljust(200)) except ValueError: break balance = int(r[1].rstrip()) @@ -274,7 +325,7 @@ def main(): api = AdminApi(MockClient()) - verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, user_dir) + verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, user_dir, exit_on_error) user_new_dir = os.path.join(user_dir, 'new') for x in os.walk(user_new_dir): @@ -298,11 +349,17 @@ def main(): new_address = u.identities['evm'][subchain_str][0] subchain_str = '{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id()) old_address = u.identities['evm'][subchain_str][0] - balance = balances[old_address] + balance = 0 + try: + balance = balances[old_address] + except KeyError: + logg.info('no old balance found for {}, assuming 0'.format(old_address)) logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance)) verifier.verify(new_address, balance) + print(verifier) + if __name__ == '__main__': main() diff --git a/docker-compose.yml b/docker-compose.yml index 073d8085..3c8b4426 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -313,7 +313,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_dispatcher.sh -q cic-eth -v + ./start_dispatcher.sh -q cic-eth -vv # command: "/root/start_dispatcher.sh -q cic-eth -vv"