diff --git a/apps/cic-eth/cic_eth/callbacks/tcp.py b/apps/cic-eth/cic_eth/callbacks/tcp.py index 9cd79938..db8c3983 100644 --- a/apps/cic-eth/cic_eth/callbacks/tcp.py +++ b/apps/cic-eth/cic_eth/callbacks/tcp.py @@ -20,5 +20,10 @@ def tcp(self, result, destination, status_code): (host, port) = destination.split(':') logg.debug('tcp callback to {} {}'.format(host, port)) s.connect((host, int(port))) - s.send(json.dumps(result).encode('utf-8')) + data = { + 'root_id': self.request.root_id, + 'status': status_code, + 'result': result, + } + s.send(json.dumps(data).encode('utf-8')) s.close() diff --git a/apps/cic-eth/cic_eth/eth/token.py b/apps/cic-eth/cic_eth/eth/token.py index 761a1765..13eef37e 100644 --- a/apps/cic-eth/cic_eth/eth/token.py +++ b/apps/cic-eth/cic_eth/eth/token.py @@ -19,7 +19,10 @@ from cic_eth.eth.task import create_check_gas_and_send_task from cic_eth.eth.factory import TxFactory from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.ext.address import translate_address -from cic_eth.task import CriticalSQLAlchemyTask +from cic_eth.task import ( + CriticalSQLAlchemyTask, + CriticalWeb3Task, + ) celery_app = celery.current_app logg = logging.getLogger() @@ -173,7 +176,7 @@ def unpack_approve(data): } -@celery_app.task() +@celery_app.task(base=CriticalWeb3Task) def balance(tokens, holder_address, chain_str): """Return token balances for a list of tokens for given address @@ -308,7 +311,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str): return tx_hash_hex -@celery_app.task() +@celery_app.task(base=CriticalWeb3Task) def resolve_tokens_by_symbol(token_symbols, chain_str): """Returns contract addresses of an array of ERC20 token symbols diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index 91c311ab..f7285ff8 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -32,7 +32,10 @@ from cic_eth.eth.nonce import NonceOracle from cic_eth.error import AlreadyFillingGasError from cic_eth.eth.util import tx_hex_string from cic_eth.admin.ctrl import lock_send -from cic_eth.task import CriticalSQLAlchemyTask +from cic_eth.task import ( + CriticalSQLAlchemyTask, + CriticalWeb3Task, + ) celery_app = celery.current_app logg = logging.getLogger() @@ -315,7 +318,7 @@ class ParityNodeHandler: # TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic. -@celery_app.task(bind=True) +@celery_app.task(bind=True, base=CriticalWeb3Task) def send(self, txs, chain_str): """Send transactions to the network. @@ -363,6 +366,8 @@ def send(self, txs, chain_str): ) try: r = c.w3.eth.send_raw_transaction(tx_hex) + except requests.exceptions.ConnectionError as e: + raise(e) except Exception as e: raiser = ParityNodeHandler(chain_spec, queue) (t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex) @@ -382,7 +387,7 @@ def send(self, txs, chain_str): # TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks. -@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,)) +@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task) def refill_gas(self, recipient_address, chain_str): """Executes a native token transaction to fund the recipient's gas expenditures. @@ -465,7 +470,7 @@ def refill_gas(self, recipient_address, chain_str): return tx_send_gas_signed['raw'] -@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) +@celery_app.task(bind=True) def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1): """Create a new transaction from an existing one with same nonce and higher gas price. @@ -539,7 +544,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa return tx_hash_hex -@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,)) +@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task) def sync_tx(self, tx_hash_hex, chain_str): queue = self.request.delivery_info['routing_key'] diff --git a/apps/cic-eth/cic_eth/queue/balance.py b/apps/cic-eth/cic_eth/queue/balance.py index 5c035167..d9648e95 100644 --- a/apps/cic-eth/cic_eth/queue/balance.py +++ b/apps/cic-eth/cic_eth/queue/balance.py @@ -14,6 +14,7 @@ from cic_eth.db.enum import ( StatusBits, dead, ) +from cic_eth.task import CriticalSQLAlchemyTask celery_app = celery.current_app @@ -35,7 +36,7 @@ def __balance_outgoing_compatible(token_address, holder_address, chain_str): return delta -@celery_app.task() +@celery_app.task(base=CriticalSQLAlchemyTask) def balance_outgoing(tokens, holder_address, chain_str): """Retrieve accumulated value of unprocessed transactions sent from the given address. @@ -73,7 +74,7 @@ def __balance_incoming_compatible(token_address, receiver_address, chain_str): return delta -@celery_app.task() +@celery_app.task(base=CriticalSQLAlchemyTask) def balance_incoming(tokens, receipient_address, chain_str): """Retrieve accumulated value of unprocessed transactions to be received by the given address. diff --git a/apps/cic-eth/cic_eth/queue/time.py b/apps/cic-eth/cic_eth/queue/time.py index aecb6657..c35d03bb 100644 --- a/apps/cic-eth/cic_eth/queue/time.py +++ b/apps/cic-eth/cic_eth/queue/time.py @@ -10,6 +10,7 @@ from cic_registry.chain import ChainSpec from cic_eth.eth.rpc import RpcClient from cic_eth.db.models.otx import Otx from cic_eth.error import NotLocalTxError +from cic_eth.task import CriticalSQLAlchemyAndWeb3Task celery_app = celery.current_app @@ -17,7 +18,7 @@ logg = logging.getLogger() # TODO: This method does not belong in the _queue_ module, it operates across queue and network -@celery_app.task() +@celery_app.task(base=CriticalSQLAlchemyAndWeb3Task) def tx_times(tx_hash, chain_str): chain_spec = ChainSpec.from_chain_str(chain_str) c = RpcClient(chain_spec) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py index cb012366..71d013fe 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py @@ -13,7 +13,7 @@ from cic_eth.eth.token import unpack_transferfrom from cic_eth.eth.token import ExtendedTx from .base import SyncFilter -logg = logging.getLogger() +logg = logging.getLogger(__name__) transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256)) transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256)) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py index db6c9d31..8c41bba6 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py @@ -13,7 +13,7 @@ from cic_eth.queue.tx import get_paused_txs from cic_eth.eth.task import create_check_gas_and_send_task from .base import SyncFilter -logg = logging.getLogger() +logg = logging.getLogger(__name__) class GasFilter(SyncFilter): @@ -33,7 +33,7 @@ class GasFilter(SyncFilter): r = q.first() if r == None: - logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex)) + logg.debug('unsolicited gas refill tx {}'.format(tx_hash_hex)) SessionBase.release_session(session) return diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py index d59cdd75..1751187c 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py @@ -8,7 +8,7 @@ from chainlib.eth.address import to_checksum # local imports from .base import SyncFilter -logg = logging.getLogger() +logg = logging.getLogger(__name__) account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256)) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index 5ed7d109..dfd6758f 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -10,9 +10,10 @@ from hexathon import ( # local imports from cic_eth.db.models.otx import Otx from chainsyncer.db.models.base import SessionBase +from chainlib.status import Status from .base import SyncFilter -logg = logging.getLogger() +logg = logging.getLogger(__name__) class TxFilter(SyncFilter): @@ -25,17 +26,17 @@ class TxFilter(SyncFilter): db_session = SessionBase.bind_session(db_session) tx_hash_hex = tx.hash otx = Otx.load(add_0x(tx_hash_hex), session=db_session) - SessionBase.release_session(db_session) if otx == None: logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) return None - logg.info('otx found {}'.format(otx.tx_hash)) + logg.info('local tx match {}'.format(otx.tx_hash)) + SessionBase.release_session(db_session) s = celery.signature( 'cic_eth.queue.tx.set_final_status', [ - tx_hash_hex, - rcpt.blockNumber, - rcpt.status == 0, + add_0x(tx_hash_hex), + tx.block.number, + tx.status == Status.ERROR, ], queue=self.queue, ) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tracker.py b/apps/cic-eth/cic_eth/runnable/daemons/tracker.py index e09205b5..6ecd76f3 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tracker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tracker.py @@ -27,9 +27,11 @@ from hexathon import ( strip_0x, ) from chainsyncer.backend import SyncerBackend -from chainsyncer.driver import HeadSyncer +from chainsyncer.driver import ( + HeadSyncer, + HistorySyncer, + ) from chainsyncer.db.models.base import SessionBase -from chainsyncer.error import LoopDone # local imports from cic_eth.registry import init_registry @@ -101,12 +103,21 @@ def main(): syncer_backends = SyncerBackend.resume(chain_spec, block_offset) if len(syncer_backends) == 0: + logg.info('found no backends to resume') syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset)) + else: + for syncer_backend in syncer_backends: + logg.info('resuming sync session {}'.format(syncer_backend)) + + syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1)) - #block_sync = SyncerBackend.live(chain_spec, block_offset+1) for syncer_backend in syncer_backends: - syncers.append(HeadSyncer(syncer_backend)) - + try: + syncers.append(HistorySyncer(syncer_backend)) + logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend)) + except AttributeError: + logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend)) + syncers.append(HeadSyncer(syncer_backend)) trusted_addresses_src = config.get('CIC_TRUST_ADDRESS') if trusted_addresses_src == None: @@ -142,10 +153,8 @@ def main(): for cf in callback_filters: syncer.add_filter(cf) - try: - syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn) - except LoopDone as e: - sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e)) + r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn) + sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) i += 1 diff --git a/apps/cic-eth/cic_eth/task.py b/apps/cic-eth/cic_eth/task.py index 233a2c90..7e02fdb6 100644 --- a/apps/cic-eth/cic_eth/task.py +++ b/apps/cic-eth/cic_eth/task.py @@ -1,13 +1,33 @@ +# import +import requests + # external imports import celery import sqlalchemy -class CriticalSQLAlchemyTask(celery.Task): +class CriticalTask(celery.Task): + retry_jitter = True + retry_backoff = True + retry_backoff_max = 8 + + +class CriticalSQLAlchemyTask(CriticalTask): autoretry_for = ( sqlalchemy.exc.DatabaseError, sqlalchemy.exc.TimeoutError, ) - retry_jitter = True - retry_backoff = True - retry_backoff_max = 8 + + +class CriticalWeb3Task(CriticalTask): + autoretry_for = ( + requests.exceptions.ConnectionError, + ) + + +class CriticalSQLAlchemyAndWeb3Task(CriticalTask): + autoretry_for = ( + sqlalchemy.exc.DatabaseError, + sqlalchemy.exc.TimeoutError, + requests.exceptions.ConnectionError, + ) diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index fb55ede8..be5c2da2 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -20,5 +20,5 @@ moolb~=0.1.1b2 eth-address-index~=0.1.0a8 chainlib~=0.0.1a17 hexathon~=0.0.1a3 -chainsyncer~=0.0.1a15 +chainsyncer~=0.0.1a16 cic-base==0.1.1a3 diff --git a/apps/cic-eth/tests/tasks/test_gas_tasks.py b/apps/cic-eth/tests/tasks/test_gas_tasks.py index c3fa07c7..157e4d2d 100644 --- a/apps/cic-eth/tests/tasks/test_gas_tasks.py +++ b/apps/cic-eth/tests/tasks/test_gas_tasks.py @@ -37,7 +37,7 @@ def test_refill_gas( eth_empty_accounts, ): - provider_address = AccountRole.get_address('GAS_GIFTER') + provider_address = AccountRole.get_address('GAS_GIFTER', init_database) receiver_address = eth_empty_accounts[0] c = init_rpc @@ -93,7 +93,7 @@ def test_refill_deduplication( eth_empty_accounts, ): - provider_address = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS') + provider_address = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS', init_database) receiver_address = eth_empty_accounts[0] c = init_rpc diff --git a/apps/cic-eth/tests/tasks/test_states.py b/apps/cic-eth/tests/tasks/test_states.py index 94dbaba5..6ae88495 100644 --- a/apps/cic-eth/tests/tasks/test_states.py +++ b/apps/cic-eth/tests/tasks/test_states.py @@ -27,14 +27,14 @@ def test_states_initial( tx = { 'from': init_w3.eth.accounts[0], 'to': init_w3.eth.accounts[1], - 'nonce': 42, + 'nonce': 13, 'gas': 21000, 'gasPrice': 1000000, 'value': 128, - 'chainId': 666, + 'chainId': 42, 'data': '', } - (tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'Foo:666', None) + (tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'foo:bar:42', None) otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first() assert otx.status == StatusEnum.PENDING.value @@ -43,7 +43,7 @@ def test_states_initial( 'cic_eth.eth.tx.check_gas', [ [tx_hash_hex], - 'Foo:666', + 'foo:bar:42', [tx_raw_signed_hex], init_w3.eth.accounts[0], 8000000, @@ -67,7 +67,7 @@ def test_states_initial( 'cic_eth.eth.tx.check_gas', [ [tx_hash_hex], - 'Foo:666', + 'foo:bar:42', [tx_raw_signed_hex], init_w3.eth.accounts[0], 8000000, @@ -94,14 +94,14 @@ def test_states_failed( tx = { 'from': init_w3.eth.accounts[0], 'to': init_w3.eth.accounts[1], - 'nonce': 42, + 'nonce': 13, 'gas': 21000, 'gasPrice': 1000000, 'value': 128, - 'chainId': 666, + 'chainId': 42, 'data': '', } - (tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'Foo:666', None) + (tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'foo:bar:42', None) otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first() otx.sendfail(session=init_database) @@ -112,7 +112,7 @@ def test_states_failed( 'cic_eth.eth.tx.check_gas', [ [tx_hash_hex], - 'Foo:666', + 'foo:bar:42', [tx_raw_signed_hex], init_w3.eth.accounts[0], 8000000, diff --git a/apps/cic-eth/tests/unit/api/test_callback.py b/apps/cic-eth/tests/unit/api/test_callback.py index 1d6440fd..7b71405f 100644 --- a/apps/cic-eth/tests/unit/api/test_callback.py +++ b/apps/cic-eth/tests/unit/api/test_callback.py @@ -67,7 +67,7 @@ def test_callback_tcp( logg.debug('recived {} '.format(data)) o = json.loads(echo) try: - assert o == data + assert o['result'] == data except Exception as e: self.exception = e @@ -130,7 +130,7 @@ def test_callback_redis( o = json.loads(echo['data']) logg.debug('recived {} '.format(o)) try: - assert o == data + assert o['result'] == data except Exception as e: self.exception = e diff --git a/apps/cic-eth/tests/unit/db/test_role.py b/apps/cic-eth/tests/unit/db/test_role.py index 67db1041..ff6fadce 100644 --- a/apps/cic-eth/tests/unit/db/test_role.py +++ b/apps/cic-eth/tests/unit/db/test_role.py @@ -9,18 +9,18 @@ def test_db_role( foo = AccountRole.set('foo', eth_empty_accounts[0]) init_database.add(foo) init_database.commit() - assert AccountRole.get_address('foo') == eth_empty_accounts[0] + assert AccountRole.get_address('foo', init_database) == eth_empty_accounts[0] bar = AccountRole.set('bar', eth_empty_accounts[1]) init_database.add(bar) init_database.commit() - assert AccountRole.get_address('bar') == eth_empty_accounts[1] + assert AccountRole.get_address('bar', init_database) == eth_empty_accounts[1] foo = AccountRole.set('foo', eth_empty_accounts[2]) init_database.add(foo) init_database.commit() - assert AccountRole.get_address('foo') == eth_empty_accounts[2] - assert AccountRole.get_address('bar') == eth_empty_accounts[1] + assert AccountRole.get_address('foo', init_database) == eth_empty_accounts[2] + assert AccountRole.get_address('bar', init_database) == eth_empty_accounts[1] tag = AccountRole.role_for(eth_empty_accounts[2]) assert tag == 'foo' diff --git a/apps/cic-eth/tests/unit/db/test_tx.py b/apps/cic-eth/tests/unit/db/test_tx.py index 775f7440..28f0e51a 100644 --- a/apps/cic-eth/tests/unit/db/test_tx.py +++ b/apps/cic-eth/tests/unit/db/test_tx.py @@ -26,7 +26,7 @@ def test_set( 'data': '', 'chainId': 1, } - (tx_hash, tx_signed) = sign_tx(tx_def, 'Foo:1') + (tx_hash, tx_signed) = sign_tx(tx_def, 'foo:bar:1') otx = Otx( tx_def['nonce'], tx_def['from'], @@ -82,7 +82,7 @@ def test_clone( 'data': '', 'chainId': 1, } - (tx_hash, tx_signed) = sign_tx(tx_def, 'Foo:1') + (tx_hash, tx_signed) = sign_tx(tx_def, 'foo:bar:1') otx = Otx( tx_def['nonce'], tx_def['from'], diff --git a/apps/cic-eth/tests/unit/eth/test_raw.py b/apps/cic-eth/tests/unit/eth/test_raw.py index dc2f24bf..acb2858e 100644 --- a/apps/cic-eth/tests/unit/eth/test_raw.py +++ b/apps/cic-eth/tests/unit/eth/test_raw.py @@ -14,11 +14,11 @@ def test_unpack( 'gas': 21000, 'gasPrice': 200000000, 'data': '0x', - 'chainId': 8995, + 'chainId': 42, } - (tx_hash, tx_raw) = sign_tx(tx, 'Foo:8995') + (tx_hash, tx_raw) = sign_tx(tx, 'foo:bar:42') - tx_recovered = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), 8995) + tx_recovered = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), 42) assert tx_hash == tx_recovered['hash'] diff --git a/apps/cic-eth/tests/unit/util/test_unpack.py b/apps/cic-eth/tests/unit/util/test_unpack.py index 9ad9adaa..edca0d5d 100644 --- a/apps/cic-eth/tests/unit/util/test_unpack.py +++ b/apps/cic-eth/tests/unit/util/test_unpack.py @@ -7,24 +7,22 @@ def test_unpack( ): tx = { - 'nonce': 42, + 'nonce': 13, 'from': init_w3_conn.eth.accounts[0], 'to': init_w3_conn.eth.accounts[1], 'data': '0xdeadbeef', 'value': 1024, 'gas': 23000, 'gasPrice': 1422521, - 'chainId': 1337, + 'chainId': 42, } - (tx_hash, tx_signed) = sign_tx(tx, 'Foo:1337') + (tx_hash, tx_signed) = sign_tx(tx, 'foo:bar:42') - tx_unpacked = unpack_signed_raw_tx_hex(tx_signed, 1337) + tx_unpacked = unpack_signed_raw_tx_hex(tx_signed, 42) for k in tx.keys(): assert tx[k] == tx_unpacked[k] - tx_str = tx_hex_string(tx_signed, 1337) - assert tx_str == 'tx nonce 42 from 0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf to 0x2B5AD5c4795c026514f8317c7a215E218DcCD6cF hash 0xe5aba32b1a7255d035faccb70cd8bb92c8c4a2f6bbea3f655bc5a8b802bbaa91' - - + tx_str = tx_hex_string(tx_signed, 42) + assert tx_str == 'tx nonce 13 from 0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf to 0x2B5AD5c4795c026514f8317c7a215E218DcCD6cF hash 0x23ba3c2b400fbddcacc77d99644bfb17ac4653a69bfa46e544801fbd841b8f1e'