diff --git a/apps/cic-eth/cic_eth/admin/nonce.py b/apps/cic-eth/cic_eth/admin/nonce.py index f8a3f24a..a55c8921 100644 --- a/apps/cic-eth/cic_eth/admin/nonce.py +++ b/apps/cic-eth/cic_eth/admin/nonce.py @@ -10,8 +10,8 @@ from chainlib.eth.tx import ( TxFactory, ) from chainlib.eth.gas import OverrideGasOracle -from chainqueue.query import get_tx -from chainqueue.state import set_cancel +from chainqueue.sql.query import get_tx +from chainqueue.sql.state import set_cancel from chainqueue.db.models.otx import Otx from chainqueue.db.models.tx import TxCache from hexathon import strip_0x @@ -28,13 +28,14 @@ from cic_eth.admin.ctrl import ( ) from cic_eth.queue.tx import queue_create from cic_eth.eth.gas import create_check_gas_task +from cic_eth.task import BaseTask celery_app = celery.current_app logg = logging.getLogger() -@celery_app.task(bind=True) -def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1): +@celery_app.task(bind=True, base=BaseTask) +def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1): """Shift all transactions with nonces higher than the offset by the provided position delta. Transactions who are replaced by transactions that move nonces will be marked as OVERRIDDEN. @@ -45,7 +46,7 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1): :type tx_hash_orig_hex: str, 0x-hex :param delta: Amount """ - chain_spec = ChainSpec.from_chain_str(chain_str) + chain_spec = ChainSpec.from_dict(chainspec_dict) rpc = RPCConnection.connect(chain_spec, 'default') rpc_signer = RPCConnection.connect(chain_spec, 'signer') queue = None @@ -54,18 +55,20 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1): except AttributeError: pass - session = SessionBase.create_session() + session = BaseTask.session_func() tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session) tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx'])) tx = unpack(tx_raw, chain_spec) nonce = tx_brief['nonce'] address = tx['from'] - logg.debug('shifting nonce {} position(s) for address {}, offset {}'.format(delta, address, nonce)) + logg.debug('shifting nonce {} position(s) for address {}, offset {}, hash {}'.format(delta, address, nonce, tx['hash'])) lock_queue(None, chain_spec.asdict(), address=address) lock_send(None, chain_spec.asdict(), address=address) + set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session) + q = session.query(Otx) q = q.join(TxCache) q = q.filter(TxCache.sender==address) @@ -83,13 +86,16 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1): tx_previous_hash_hex = tx_new['hash'] tx_previous_nonce = tx_new['nonce'] - del(tx_new['hash']) - del(tx_new['hash_unsigned']) tx_new['gas_price'] += 1 tx_new['gasPrice'] = tx_new['gas_price'] tx_new['nonce'] -= delta - logg.debug('tx_nex {}'.format(tx_new)) + logg.debug('tx_new {}'.format(tx_new)) + + del(tx_new['hash']) + del(tx_new['hash_unsigned']) + del(tx_new['hashUnsigned']) + gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1) # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx) c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle) (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new) @@ -101,15 +107,15 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1): tx_signed_raw_hex, ) session.add(otx) - session.commit() # TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces. - set_cancel(chain_spec, tx_previous_hash_hex, manual=True, session=session) + set_cancel(chain_spec, strip_0x(tx_previous_hash_hex), manual=True, session=session) TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session) tx_hashes.append(tx_hash_hex) txs.append(tx_signed_raw_hex) + session.commit() session.close() diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index b559b8ce..f3fc78d1 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -30,13 +30,13 @@ from chainqueue.db.enum import ( status_str, ) from chainqueue.error import TxStateChangeError +from chainqueue.query import get_tx # local imports from cic_eth.db.models.base import SessionBase from cic_eth.db.models.role import AccountRole from cic_eth.db.models.nonce import Nonce from cic_eth.error import InitializationError -from cic_eth.queue.query import get_tx app = celery.current_app @@ -264,7 +264,8 @@ class AdminApi: } - def fix_nonce(self, address, nonce, chain_spec): + # TODO: is risky since it does not validate that there is actually a nonce problem? + def fix_nonce(self, chain_spec, address, nonce): s = celery.signature( 'cic_eth.queue.query.get_account_tx', [ @@ -278,15 +279,17 @@ class AdminApi: txs = s.apply_async().get() tx_hash_hex = None + session = SessionBase.create_session() for k in txs.keys(): - tx_dict = get_tx(k) + tx_dict = get_tx(chain_spec, k, session=session) if tx_dict['nonce'] == nonce: tx_hash_hex = k + session.close() s_nonce = celery.signature( 'cic_eth.admin.nonce.shift_nonce', [ - self.rpc.chain_spec.asdict(), + chain_spec.asdict(), tx_hash_hex, ], queue=self.queue diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index ac6142e9..61c99fbe 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -17,7 +17,7 @@ eth-address-index~=0.1.1a11 chainlib~=0.0.3rc2 hexathon~=0.0.1a7 chainsyncer[sql]~=0.0.2a4 -chainqueue~=0.0.2a2 +chainqueue~=0.0.2b1 sarafu-faucet==0.0.3a3 erc20-faucet==0.2.1a4 coincurve==15.0.0 diff --git a/apps/cic-eth/tests/task/api/test_admin.py b/apps/cic-eth/tests/task/api/test_admin.py index 46e41a09..95b1a060 100644 --- a/apps/cic-eth/tests/task/api/test_admin.py +++ b/apps/cic-eth/tests/task/api/test_admin.py @@ -34,7 +34,10 @@ from chainqueue.state import ( set_ready, set_reserved, ) -from chainqueue.query import get_tx +from chainqueue.query import ( + get_tx, + get_nonce_tx_cache, + ) # local imports from cic_eth.api import AdminApi @@ -133,7 +136,6 @@ def test_tx( logg.warning('code missing to verify tx contents {}'.format(tx)) - def test_check_nonce_gap( default_chain_spec, init_database, @@ -236,3 +238,64 @@ def test_check_nonce_localfail( assert r['nonce']['blocking'] == 4 assert r['tx']['blocking'] == tx_hashes[4] + + +def test_fix_nonce( + default_chain_spec, + init_database, + eth_rpc, + eth_signer, + agent_roles, + contract_roles, + celery_session_worker, + init_celery_tasks, + caplog, + ): + + nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 0) + gas_oracle = OverrideGasOracle(limit=21000, conn=eth_rpc) + + tx_hashes = [] + txs = [] + + for i in range(10): + c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle) + (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED) + + queue_create( + default_chain_spec, + i, + agent_roles['ALICE'], + tx_hash_hex, + tx_signed_raw_hex, + session=init_database, + ) + cache_gas_data( + tx_hash_hex, + tx_signed_raw_hex, + default_chain_spec.asdict(), + ) + tx_hashes.append(tx_hash_hex) + txs.append(tx_signed_raw_hex) + + init_database.commit() + + api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['DEFAULT']) + t = api.fix_nonce(default_chain_spec, agent_roles['ALICE'], 3) + r = t.get_leaf() + assert t.successful() + + init_database.commit() + + txs = get_nonce_tx_cache(default_chain_spec, 3, agent_roles['ALICE'], session=init_database) + ks = txs.keys() + assert len(ks) == 2 + for k in ks: + hsh = add_0x(k) + otx = Otx.load(hsh, session=init_database) + init_database.refresh(otx) + logg.debug('checking nonce {} tx {} status {}'.format(3, otx.tx_hash, otx.status)) + if add_0x(k) == tx_hashes[3]: + assert otx.status & StatusBits.OBSOLETE == StatusBits.OBSOLETE + else: + assert otx.status == 1 diff --git a/apps/cic-eth/tests/task/test_task_admin.py b/apps/cic-eth/tests/task/test_task_admin.py index 2df11210..7efa8283 100644 --- a/apps/cic-eth/tests/task/test_task_admin.py +++ b/apps/cic-eth/tests/task/test_task_admin.py @@ -65,7 +65,7 @@ def test_shift_nonce( s = celery.signature( 'cic_eth.admin.nonce.shift_nonce', [ - str(default_chain_spec), + default_chain_spec.asdict(), tx_hashes[3], ], queue=None @@ -76,7 +76,7 @@ def test_shift_nonce( init_database.commit() - for i in range(42+4, 42+10): + for i in range(42+3, 42+10): txs = get_nonce_tx_cache(default_chain_spec, i, agent_roles['ALICE'], session=init_database) for k in txs.keys(): hsh = add_0x(k)