diff --git a/apps/cic-eth/cic_eth/admin/nonce.py b/apps/cic-eth/cic_eth/admin/nonce.py index af4a28b6..a0d95cc9 100644 --- a/apps/cic-eth/cic_eth/admin/nonce.py +++ b/apps/cic-eth/cic_eth/admin/nonce.py @@ -23,7 +23,7 @@ from cic_eth.queue.tx import ( from cic_eth.queue.tx import create as queue_create from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.eth.task import ( - sign_tx, + #sign_tx, create_check_gas_task, ) diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py index f8ebee83..580d0345 100644 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py @@ -20,6 +20,7 @@ def upgrade(): op.create_table( 'nonce_task_reservation', sa.Column('id', sa.Integer, primary_key=True), + sa.Column('address_hex', sa.String(42), nullable=False), sa.Column('nonce', sa.Integer, nullable=False), sa.Column('key', sa.String, nullable=False), sa.Column('date_created', sa.DateTime, nullable=False), diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py index f8ebee83..580d0345 100644 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py +++ b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py @@ -20,6 +20,7 @@ def upgrade(): op.create_table( 'nonce_task_reservation', sa.Column('id', sa.Integer, primary_key=True), + sa.Column('address_hex', sa.String(42), nullable=False), sa.Column('nonce', sa.Integer, nullable=False), sa.Column('key', sa.String, nullable=False), sa.Column('date_created', sa.DateTime, nullable=False), diff --git a/apps/cic-eth/cic_eth/db/models/nonce.py b/apps/cic-eth/cic_eth/db/models/nonce.py index 1df15792..6cc1db91 100644 --- a/apps/cic-eth/cic_eth/db/models/nonce.py +++ b/apps/cic-eth/cic_eth/db/models/nonce.py @@ -122,17 +122,19 @@ class NonceReservation(SessionBase): __tablename__ = 'nonce_task_reservation' + address_hex = Column(String(42)) nonce = Column(Integer) key = Column(String) date_created = Column(DateTime, default=datetime.datetime.utcnow) @staticmethod - def peek(key, session=None): + def peek(address, key, session=None): session = SessionBase.bind_session(session) q = session.query(NonceReservation) q = q.filter(NonceReservation.key==key) + q = q.filter(NonceReservation.address_hex==address) o = q.first() nonce = None @@ -147,18 +149,19 @@ class NonceReservation(SessionBase): @staticmethod - def release(key, session=None): + def release(address, key, session=None): session = SessionBase.bind_session(session) nonce = NonceReservation.peek(key, session=session) q = session.query(NonceReservation) + q = q.filter(NonceReservation.address_hex==address) q = q.filter(NonceReservation.key==key) o = q.first() if o == None: - raise IntegrityError('nonce for key {}'.format(nonce)) + raise IntegrityError('nonce {} for key {} address {}: {}'.format(nonce, key, address)) SessionBase.release_session(session) session.delete(o) @@ -173,14 +176,15 @@ class NonceReservation(SessionBase): def next(address, key, session=None): session = SessionBase.bind_session(session) - if NonceReservation.peek(key, session) != None: - raise IntegrityError('nonce for key {}'.format(key)) + if NonceReservation.peek(address, key, session) != None: + raise IntegrityError('nonce for key {} address {}'.format(key, address)) nonce = Nonce.next(address, session=session) o = NonceReservation() o.nonce = nonce o.key = key + o.address_hex = address session.add(o) SessionBase.release_session(session) diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index ddb66bd8..58ff67b0 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -1,9 +1,8 @@ # standard imports import logging -# third-party imports +# external imports import celery -from cic_registry.chain import ChainSpec from erc20_single_shot_faucet import Faucet from chainlib.eth.constant import ZERO_ADDRESS from hexathon import strip_0x @@ -14,6 +13,7 @@ from chainlib.eth.sign import ( ) from chainlib.eth.address import to_checksum_address from chainlib.eth.tx import TxFormat +from chainlib.chain import ChainSpec from eth_accounts_index import AccountRegistry # local import @@ -196,7 +196,7 @@ def create(self, password, chain_str): @celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyAndSignerTask) -def register(self, account_address, chain_str, writer_address=None): +def register(self, account_address, chain_spec_dict, writer_address=None): """Creates a transaction to add the given address to the accounts index. :param account_address: Ethereum address to add @@ -209,7 +209,7 @@ def register(self, account_address, chain_str, writer_address=None): :returns: The account_address input param :rtype: str, 0x-hex """ - chain_spec = ChainSpec.from_chain_str(chain_str) + chain_spec = ChainSpec.from_dict(chain_spec_dict) session = self.create_session() #session = SessionBase.create_session() @@ -223,12 +223,6 @@ def register(self, account_address, chain_str, writer_address=None): logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address)) queue = self.request.delivery_info.get('routing_key') -# c = RpcClient(chain_spec, holder_address=writer_address) -# registry = safe_registry(c.w3) -# txf = AccountTxFactory(writer_address, c, registry=registry) -# tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session) -# (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session) - # Retrieve account index address rpc = RPCConnection.connect(chain_spec, 'default') reg = CICRegistry(chain_spec, rpc) @@ -239,26 +233,27 @@ def register(self, account_address, chain_str, writer_address=None): rpc_signer = RPCConnection.connect(chain_spec, 'signer') nonce_oracle = self.create_nonce_oracle(writer_address, rpc) gas_oracle = self.create_gas_oracle(rpc) - account_registry = AccountRegistry(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle) + account_registry = AccountRegistry(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) (tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED) - #cache_task = 'cic_eth.eth.account.cache_account_data' + # TODO: if cache task fails, task chain will not return + cache_task = 'cic_eth.eth.account.cache_account_data' cache_task = None # add transaction to queue - register_tx(tx_hash_hex, tx_signed_raw_hex, chain_str, queue, cache_task=cache_task, session=session) + register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session) session.commit() session.close() - return tx_hash_hex - #gas_budget = tx_add['gas'] * tx_add['gasPrice'] - logg.debug('register user tx {}'.format(tx_hash_hex)) - s = create_check_gas_and_send_task( + gas_budget = account_registry.gas(tx_signed_raw_hex) + logg.debug('register user tx {} {} {}'.format(tx_hash_hex, queue, gas_budget)) + + s = create_check_gas_task( [tx_signed_raw_hex], - chain_str, + chain_spec, writer_address, - gas_budget, + gas=gas_budget, tx_hashes_hex=[tx_hash_hex], queue=queue, ) @@ -305,6 +300,7 @@ def gift(self, account_address, chain_str): queue=queue, ) s.apply_async() + return [tx_signed_raw_hex] diff --git a/apps/cic-eth/cic_eth/eth/task.py b/apps/cic-eth/cic_eth/eth/task.py index 87e7385d..2dfeb300 100644 --- a/apps/cic-eth/cic_eth/eth/task.py +++ b/apps/cic-eth/cic_eth/eth/task.py @@ -21,46 +21,46 @@ from cic_eth.error import SignerError celery_app = celery.current_app logg = celery_app.log.get_default_logger() - -@celery_app.task() -def sign_tx(tx, chain_str): - """Sign a single transaction against the given chain specification. - - :param tx: Transaction in standard Ethereum format - :type tx: dict - :param chain_str: Chain spec string representation - :type chain_str: str - :returns: Transaction hash and raw signed transaction, respectively - :rtype: tuple - """ - chain_spec = ChainSpec.from_chain_str(chain_str) - #c = RpcClient(chain_spec) - tx_transfer_signed = None - conn = RPCConnection.connect('signer') - try: - o = sign_transaction(tx) - tx_transfer_signed = conn.do(o) - #try: - # tx_transfer_signed = c.w3.eth.sign_transaction(tx) - except Exception as e: - raise SignerError('sign tx {}: {}'.format(tx, e)) - logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed)) - h = sha3.keccak_256() - h.update(bytes.fromhex(strip_0x(tx_transfer_signed['raw']))) - tx_hash = h.digest() - #tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw']) - tx_hash_hex = add_0x(tx_hash.hex()) - return (tx_hash_hex, tx_transfer_signed['raw'],) +# +#@celery_app.task() +#def sign_tx(tx, chain_str): +# """Sign a single transaction against the given chain specification. +# +# :param tx: Transaction in standard Ethereum format +# :type tx: dict +# :param chain_str: Chain spec string representation +# :type chain_str: str +# :returns: Transaction hash and raw signed transaction, respectively +# :rtype: tuple +# """ +# chain_spec = ChainSpec.from_chain_str(chain_str) +# #c = RpcClient(chain_spec) +# tx_transfer_signed = None +# conn = RPCConnection.connect('signer') +# try: +# o = sign_transaction(tx) +# tx_transfer_signed = conn.do(o) +# #try: +# # tx_transfer_signed = c.w3.eth.sign_transaction(tx) +# except Exception as e: +# raise SignerError('sign tx {}: {}'.format(tx, e)) +# logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed)) +# h = sha3.keccak_256() +# h.update(bytes.fromhex(strip_0x(tx_transfer_signed['raw']))) +# tx_hash = h.digest() +# #tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw']) +# tx_hash_hex = add_0x(tx_hash.hex()) +# return (tx_hash_hex, tx_transfer_signed['raw'],) #def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None): -def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_str, queue, cache_task=None, session=None): +def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None): """Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING). :param tx: Standard ethereum transaction data :type tx: dict - :param chain_str: Chain spec, string representation - :type chain_str: str + :param chain_spec: Chain spec of transaction to add to queue + :type chain_spec: chainlib.chain.ChainSpec :param queue: Task queue :type queue: str :param cache_task: Cache task to call with signed transaction. If None, no task will be called. @@ -73,14 +73,14 @@ def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_str, queue, cache_task=Non logg.debug('adding queue tx {}'.format(tx_hash_hex)) tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex)) - tx = unpack(tx_signed_raw) + tx = unpack(tx_signed_raw, chain_id=chain_spec.chain_id()) queue_create( tx['nonce'], tx['from'], tx_hash_hex, tx_signed_raw_hex, - chain_str, + chain_spec, session=session, ) @@ -91,7 +91,7 @@ def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_str, queue, cache_task=Non [ tx_hash_hex, tx_signed_raw_hex, - chain_str, + chain_spec.asdict(), ], queue=queue, ) @@ -101,15 +101,15 @@ def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_str, queue, cache_task=Non # TODO: rename as we will not be sending task in the chain, this is the responsibility of the dispatcher -def create_check_gas_task(tx_signed_raws_hex, chain_str, holder_address, gas, tx_hashes_hex=None, queue=None): +def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None): """Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher. If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value. :param tx_signed_raws_hex: Raw signed transaction data :type tx_signed_raws_hex: list of str, 0x-hex - :param chain_str: Chain spec, string representation - :type chain_str: str + :param chain_spec: Chain spec of address to add check gas for + :type chain_spec: chainlib.chain.ChainSpec :param holder_address: Address sending the transactions :type holder_address: str, 0x-hex :param gas: Gas budget hint for transactions @@ -127,7 +127,7 @@ def create_check_gas_task(tx_signed_raws_hex, chain_str, holder_address, gas, tx 'cic_eth.eth.tx.check_gas', [ tx_hashes_hex, - chain_str, + chain_spec.asdict(), tx_signed_raws_hex, holder_address, gas, @@ -138,7 +138,7 @@ def create_check_gas_task(tx_signed_raws_hex, chain_str, holder_address, gas, tx s_check_gas = celery.signature( 'cic_eth.eth.tx.check_gas', [ - chain_str, + chain_spec.asdict(), tx_signed_raws_hex, holder_address, gas, diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index 8238256a..91732037 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -4,8 +4,8 @@ import logging # third-party imports import celery import requests -from cic_registry import zero_address -from cic_registry.chain import ChainSpec +from chainlib.eth.constant import ZERO_ADDRESS +from chainlib.chain import ChainSpec from chainlib.eth.address import is_checksum_address from chainlib.eth.gas import balance from chainlib.eth.error import ( @@ -15,7 +15,9 @@ from chainlib.eth.error import ( from chainlib.eth.tx import ( transaction, receipt, + raw, ) +from chainlib.connection import RPCConnection from chainlib.hash import keccak256_hex_to_hex from hexathon import add_0x @@ -43,7 +45,7 @@ from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.eth.task import ( register_tx, create_check_gas_task, - sign_tx, + #sign_tx, ) from cic_eth.eth.nonce import NonceOracle from cic_eth.error import ( @@ -68,7 +70,7 @@ MAX_NONCE_ATTEMPTS = 3 # TODO this function is too long @celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task) -def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None): +def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=None): """Check the gas level of the sender address of a transaction. If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser. @@ -77,8 +79,8 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non :param tx_hashes: Transaction hashes due to be submitted :type tx_hashes: list of str, 0x-hex - :param chain_str: Chain spec string representation - :type chain_str: str + :param chain_spec_dict: Chain spec dict representation + :type chain_spec_dict: dict :param txs: Signed raw transaction data, corresponding to tx_hashes :type txs: list of str, 0x-hex :param address: Sender address @@ -99,38 +101,38 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non if not is_checksum_address(address): raise ValueError('invalid address {}'.format(address)) - chain_spec = ChainSpec.from_chain_str(chain_str) + chain_spec = ChainSpec.from_dict(chain_spec_dict) - queue = self.request.delivery_info['routing_key'] + queue = self.request.delivery_info.get('routing_key') - #c = RpcClient(chain_spec, holder_address=address) - #c = RpcClient(chain_spec) - conn = RPCConnection.connect() + conn = RPCConnection.connect(chain_spec) # TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx - balance = 0 + gas_balance = 0 try: - #balance = c.w3.eth.getBalance(address) o = balance(address) r = conn.do(o) except EthException as e: - raise EthError('balance call for {}: {}'.format(address, e)) + raise EthError('gas_balance call for {}: {}'.format(address, e)) - logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required)) + logg.debug('address {} has gas {} needs {}'.format(address, gas_balance, gas_required)) + session = SessionBase.create_session() + gas_provider = AccountRole.get_address('GAS_GIFTER', session=session) + session.close() - if gas_required > balance: + if gas_required > gas_balance: s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', [ address, - c.gas_provider(), + gas_provider, ], queue=queue, ) s_refill_gas = celery.signature( 'cic_eth.eth.tx.refill_gas', [ - chain_str, + chain_spec_dict, ], queue=queue, ) @@ -147,28 +149,28 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non ) wait_tasks.append(s) celery.group(wait_tasks)() - raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, balance)) + raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, gas_balance)) - safe_gas = c.safe_threshold_amount() - if balance < safe_gas: + safe_gas = self.safe_gas_threshold_amount + if gas_balance < safe_gas: s_nonce = celery.signature( 'cic_eth.eth.tx.reserve_nonce', [ address, - c.gas_provider(), + gas_provider, ], queue=queue, ) s_refill_gas = celery.signature( 'cic_eth.eth.tx.refill_gas', [ - chain_str, + chain_spec_dict, ], queue=queue, ) s_nonce.link(s_refill_gas) s_nonce.apply_async() - logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address)) + logg.debug('requested refill from {} to {}'.format(gas_provider, address)) ready_tasks = [] for tx_hash in tx_hashes: s = celery.signature( @@ -218,172 +220,9 @@ def hashes_to_txs(self, tx_hashes): return txs -# TODO: Move this and send to subfolder submodule -class ParityNodeHandler: - def __init__(self, chain_spec, queue): - self.chain_spec = chain_spec - self.chain_str = str(chain_spec) - self.queue = queue - - def handle(self, exception, tx_hash_hex, tx_hex): - meth = self.handle_default - if isinstance(exception, (ValueError)): - - earg = exception.args[0] - if earg['code'] == -32010: - logg.debug('skipping lock for code {}'.format(earg['code'])) - meth = self.handle_invalid_parameters - elif earg['code'] == -32602: - meth = self.handle_invalid_encoding - else: - # TODO: move to status log db comment field - meth = self.handle_invalid - elif isinstance(exception, (requests.exceptions.ConnectionError)): - meth = self.handle_connection - (t, e_fn, message) = meth(tx_hash_hex, tx_hex, str(exception)) - return (t, e_fn, '{} {}'.format(message, exception)) - - - def handle_connection(self, tx_hash_hex, tx_hex, debugstr=None): - s_set_sent = celery.signature( - 'cic_eth.queue.tx.set_sent_status', - [ - tx_hash_hex, - True, - ], - queue=self.queue, - ) - t = s_set_sent.apply_async() - return (t, TemporaryTxError, 'Sendfail {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()))) - - - def handle_invalid_encoding(self, tx_hash_hex, tx_hex, debugstr=None): - tx_bytes = bytes.fromhex(tx_hex[2:]) - tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id()) - s_lock = celery.signature( - 'cic_eth.admin.ctrl.lock_send', - [ - tx_hash_hex, - self.chain_str, - tx['from'], - tx_hash_hex, - ], - queue=self.queue, - ) - s_set_reject = celery.signature( - 'cic_eth.queue.tx.set_rejected', - [], - queue=self.queue, - ) - nonce_txs = get_nonce_tx(tx['nonce'], tx['from'], self.chain_spec.chain_id()) - attempts = len(nonce_txs) - if attempts < MAX_NONCE_ATTEMPTS: - logg.debug('nonce {} address {} retries {} < {}'.format(tx['nonce'], tx['from'], attempts, MAX_NONCE_ATTEMPTS)) - s_resend = celery.signature( - 'cic_eth.eth.tx.resend_with_higher_gas', - [ - self.chain_str, - None, - 1.01, - ], - queue=self.queue, - ) - s_unlock = celery.signature( - 'cic_eth.admin.ctrl.unlock_send', - [ - self.chain_str, - tx['from'], - ], - queue=self.queue, - ) - s_resend.link(s_unlock) - s_set_reject.link(s_resend) - - s_lock.link(s_set_reject) - t = s_lock.apply_async() - return (t, PermanentTxError, 'Reject invalid encoding {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()))) - - - def handle_invalid_parameters(self, tx_hash_hex, tx_hex, debugstr=None): - s_sync = celery.signature( - 'cic_eth.eth.tx.sync_tx', - [ - tx_hash_hex, - self.chain_str, - ], - queue=self.queue, - ) - t = s_sync.apply_async() - return (t, PermanentTxError, 'Reject invalid parameters {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()))) - - - def handle_invalid(self, tx_hash_hex, tx_hex, debugstr=None): - tx_bytes = bytes.fromhex(tx_hex[2:]) - tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id()) - s_lock = celery.signature( - 'cic_eth.admin.ctrl.lock_send', - [ - tx_hash_hex, - self.chain_str, - tx['from'], - tx_hash_hex, - ], - queue=self.queue, - ) - s_set_reject = celery.signature( - 'cic_eth.queue.tx.set_rejected', - [], - queue=self.queue, - ) - s_debug = celery.signature( - 'cic_eth.admin.debug.alert', - [ - tx_hash_hex, - debugstr, - ], - queue=self.queue, - ) - s_set_reject.link(s_debug) - s_lock.link(s_set_reject) - t = s_lock.apply_async() - return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()))) - - - def handle_default(self, tx_hash_hex, tx_hex, debugstr): - tx_bytes = bytes.fromhex(tx_hex[2:]) - tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id()) - s_lock = celery.signature( - 'cic_eth.admin.ctrl.lock_send', - [ - tx_hash_hex, - self.chain_str, - tx['from'], - tx_hash_hex, - ], - queue=self.queue, - ) - s_set_fubar = celery.signature( - 'cic_eth.queue.tx.set_fubar', - [], - queue=self.queue, - ) - s_debug = celery.signature( - 'cic_eth.admin.debug.alert', - [ - tx_hash_hex, - debugstr, - ], - queue=self.queue, - ) - s_set_fubar.link(s_debug) - s_lock.link(s_set_fubar) - t = s_lock.apply_async() - return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr)) - - # TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic. @celery_app.task(bind=True, base=CriticalWeb3Task) -def send(self, txs, chain_str): +def send(self, txs, chain_spec_dict): """Send transactions to the network. If more than one transaction is passed to the task, it will spawn a new send task with the remaining transaction(s) after the first in the list has been processed. @@ -408,7 +247,7 @@ def send(self, txs, chain_str): if len(txs) == 0: raise ValueError('no transaction to send') - chain_spec = ChainSpec.from_chain_str(chain_str) + chain_spec = ChainSpec.from_dict(chain_spec_dict) tx_hex = txs[0] logg.debug('send transaction {}'.format(tx_hex)) @@ -428,17 +267,18 @@ def send(self, txs, chain_str): queue=queue, ) - return txs[1:] - - try: + o = raw(tx_hex) + conn = RPCConnection.connect(chain_spec, 'default') + #try: #r = c.w3.eth.send_raw_transaction(tx_hex) - r = c.w3.eth.sendRawTransaction(tx_hex) - except requests.exceptions.ConnectionError as e: - raise(e) - except Exception as e: - raiser = ParityNodeHandler(chain_spec, queue) - (t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex) - raise e(m) + #r = c.w3.eth.sendRawTransaction(tx_hex) + conn.do(o) + #except requests.exceptions.ConnectionError as e: + # raise(e) +# except Exception as e: +# raiser = ParityNodeHandler(chain_spec, queue) +# (t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex) +# raise e(m) s_set_sent.apply_async() tx_tail = txs[1:] @@ -456,7 +296,7 @@ def send(self, txs, chain_str): # TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks. # TODO: method is too long, factor out code for clarity @celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3AndSignerTask) -def refill_gas(self, recipient_address, chain_str): +def refill_gas(self, recipient_address, chain_spec_dict): """Executes a native token transaction to fund the recipient's gas expenditures. :param recipient_address: Recipient in need of gas @@ -467,7 +307,7 @@ def refill_gas(self, recipient_address, chain_str): :returns: Transaction hash. :rtype: str, 0x-hex """ - chain_spec = ChainSpec.from_chain_str(chain_str) + chain_spec = ChainSpec.from_dict(chain_spec_dict) zero_amount = False session = SessionBase.create_session() @@ -632,21 +472,22 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa @celery_app.task(bind=True, base=CriticalSQLAlchemyTask) -def reserve_nonce(self, chained_input, signer=None): +def reserve_nonce(self, chained_input, signer_address=None): + session = SessionBase.create_session() address = None - if signer == None: + if signer_address == None: address = chained_input logg.debug('non-explicit address for reserve nonce, using arg head {}'.format(chained_input)) else: - #if web3.Web3.isChecksumAddress(signer): - if is_checksum_address(signer): - address = signer - logg.debug('explicit address for reserve nonce {}'.format(signer)) + #if web3.Web3.isChecksumAddress(signer_address): + if is_checksum_address(signer_address): + address = signer_address + logg.debug('explicit address for reserve nonce {}'.format(signer_address)) else: - address = AccountRole.get_address(signer, session=session) - logg.debug('role for reserve nonce {} -> {}'.format(signer, address)) + address = AccountRole.get_address(signer_address, session=session) + logg.debug('role for reserve nonce {} -> {}'.format(signer_address, address)) if not is_checksum_address(address): raise ValueError('invalid result when resolving address for nonce {}'.format(address)) @@ -803,8 +644,8 @@ def cache_gas_refill_data( tx_hash_hex, tx['from'], tx['to'], - zero_address, - zero_address, + ZERO_ADDRESS, + ZERO_ADDRESS, tx['value'], tx['value'], ) diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 520a4529..55bdba3a 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -35,7 +35,7 @@ celery_app = celery.current_app logg = logging.getLogger() -def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predecessors=True, session=None): +def create(nonce, holder_address, tx_hash, signed_tx, chain_spec, obsolete_predecessors=True, session=None): """Create a new transaction queue record. :param nonce: Transaction nonce @@ -46,13 +46,13 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec :type tx_hash: str, 0x-hex :param signed_tx: Signed raw transaction :type signed_tx: str, 0x-hex - :param chain_str: Chain spec string representation to create transaction for - :type chain_str: str + :param chain_spec: Chain spec to create transaction for + :type chain_spec: ChainSpec :returns: transaction hash :rtype: str, 0x-hash """ session = SessionBase.bind_session(session) - lock = Lock.check_aggregate(chain_str, LockEnum.QUEUE, holder_address, session=session) + lock = Lock.check_aggregate(str(chain_spec), LockEnum.QUEUE, holder_address, session=session) if lock > 0: SessionBase.release_session(session) raise LockedError(lock) diff --git a/apps/cic-eth/cic_eth/task.py b/apps/cic-eth/cic_eth/task.py index ceccdf24..5fd0d0e2 100644 --- a/apps/cic-eth/cic_eth/task.py +++ b/apps/cic-eth/cic_eth/task.py @@ -52,6 +52,7 @@ class CriticalWeb3Task(CriticalTask): autoretry_for = ( requests.exceptions.ConnectionError, ) + safe_gas_threshold_amount = 2000000000 * 60000 * 3 class CriticalSQLAlchemyAndWeb3Task(CriticalTask): @@ -62,6 +63,7 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask): sqlalchemy.exc.ResourceClosedError, EthError, ) + safe_gas_threshold_amount = 2000000000 * 60000 * 3 class CriticalSQLAlchemyAndSignerTask(CriticalTask): autoretry_for = ( diff --git a/apps/cic-eth/tests/fixtures_celery.py b/apps/cic-eth/tests/fixtures_celery.py index af003258..45d22971 100644 --- a/apps/cic-eth/tests/fixtures_celery.py +++ b/apps/cic-eth/tests/fixtures_celery.py @@ -4,7 +4,8 @@ import tempfile import logging import shutil -logg = logging.getLogger(__name__) +#logg = logging.getLogger(__name__) +logg = logging.getLogger() # celery fixtures @@ -52,7 +53,7 @@ def celery_config(): @pytest.fixture(scope='session') def celery_worker_parameters(): return { -# 'queues': ('cic-eth'), +# 'queues': ('celery'), } @pytest.fixture(scope='session') diff --git a/apps/cic-eth/tests/fixtures_role.py b/apps/cic-eth/tests/fixtures_role.py index 58a9fe5c..588af53f 100644 --- a/apps/cic-eth/tests/fixtures_role.py +++ b/apps/cic-eth/tests/fixtures_role.py @@ -9,7 +9,9 @@ from chainlib.eth.address import to_checksum_address # local imports from cic_eth.db.models.role import AccountRole -logg = logging.getLogger(__name__) +#logg = logging.getLogger(__name__) +# what the actual fuck, debug is not being shown even though explicitly set +logg = logging.getLogger() @pytest.fixture(scope='function') @@ -22,6 +24,7 @@ def custodial_roles( r.update(contract_roles) r.update({ 'DEFAULT': eth_accounts[0], + 'GAS_GIFTER': eth_accounts[3], }) for k in r.keys(): role = AccountRole.set(k, r[k]) diff --git a/apps/cic-eth/tests/task/test_account.py b/apps/cic-eth/tests/task/test_account.py index 54d6da56..dd55e699 100644 --- a/apps/cic-eth/tests/task/test_account.py +++ b/apps/cic-eth/tests/task/test_account.py @@ -67,10 +67,10 @@ def test_register_account( eth_accounts, eth_rpc, cic_registry, - celery_session_worker, eth_empty_accounts, custodial_roles, call_sender, + celery_worker, ): logg.debug('chainspec {}'.format(str(default_chain_spec))) @@ -79,16 +79,17 @@ def test_register_account( 'cic_eth.eth.tx.reserve_nonce', [ eth_empty_accounts[0], - eth_accounts[0], + custodial_roles['ACCOUNT_REGISTRY_WRITER'], ], queue=None, ) s_register = celery.signature( 'cic_eth.eth.account.register', [ - str(default_chain_spec), - eth_accounts[0], + default_chain_spec.asdict(), + custodial_roles['ACCOUNT_REGISTRY_WRITER'], ], + queue=None, ) s_nonce.link(s_register) t = s_nonce.apply_async() @@ -101,13 +102,14 @@ def test_register_account( o = session.query(Otx).first() tx_signed_hex = o.signed_tx session.close() - + s_send = celery.signature( 'cic_eth.eth.tx.send', [ [tx_signed_hex], - str(default_chain_spec), + default_chain_spec.asdict(), ], + queue=None, ) t = s_send.apply_async() address = t.get()