From a0d405f582b6713389692f1c89fb5b6926510978 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 24 Feb 2021 12:53:08 +0100 Subject: [PATCH] Update chain syncer and chain lib --- apps/cic-eth/cic_eth/db/models/otx.py | 45 ++++++++--- apps/cic-eth/cic_eth/eth/account.py | 13 +-- apps/cic-eth/cic_eth/eth/token.py | 16 ++-- apps/cic-eth/cic_eth/queue/tx.py | 18 +++++ .../cic_eth/runnable/daemons/dispatcher.py | 17 +++- .../runnable/daemons/filters/callback.py | 81 +++++++++++-------- .../cic_eth/runnable/daemons/filters/gas.py | 13 +-- .../runnable/daemons/filters/register.py | 11 ++- .../cic_eth/runnable/daemons/filters/tx.py | 3 +- .../cic_eth/runnable/daemons/tracker.py | 4 +- apps/cic-eth/cic_eth/version.py | 2 +- apps/cic-eth/requirements.txt | 6 +- .../scripts/requirements.txt | 4 +- apps/contract-migration/seed_cic_eth.sh | 2 +- apps/requirements.txt | 4 +- 15 files changed, 160 insertions(+), 79 deletions(-) diff --git a/apps/cic-eth/cic_eth/db/models/otx.py b/apps/cic-eth/cic_eth/db/models/otx.py index 0816d846..523ae2e0 100644 --- a/apps/cic-eth/cic_eth/db/models/otx.py +++ b/apps/cic-eth/cic_eth/db/models/otx.py @@ -79,6 +79,13 @@ class Otx(SessionBase): return r + def __status_not_set(self, status): + r = not(self.status & status) + if r: + logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash)) + return r + + def set_block(self, block, session=None): """Set block number transaction was mined in. @@ -320,6 +327,32 @@ class Otx(SessionBase): SessionBase.release_session(session) + def dequeue(self, session=None): + """Marks that a process to execute send attempt is underway + + Only manipulates object, does not transaction or commit to backend. + + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + if self.__status_not_set(StatusBits.QUEUED): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if self.status & StatusBits.IN_NETWORK: + raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) + + self.__reset_status(StatusBits.QUEUED, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def minefail(self, block, session=None): """Marks that transaction was mined but code execution did not succeed. @@ -373,18 +406,6 @@ class Otx(SessionBase): else: self.__set_status(StatusEnum.OBSOLETED, session) - -# if confirmed: -# if self.status != StatusEnum.OBSOLETED: -# logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name)) -# #raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name)) -# self.__set_status(StatusEnum.CANCELLED, session) -# elif self.status != StatusEnum.OBSOLETED: -# if self.status > StatusEnum.SENT: -# logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name)) -# #raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name)) - # self.__set_status(StatusEnum.OBSOLETED, session) - if self.tracing: self.__state_log(session=session) diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index 37ac0c26..61facebe 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -8,6 +8,7 @@ from cic_registry import CICRegistry from cic_registry.chain import ChainSpec from erc20_single_shot_faucet import Faucet from cic_registry import zero_address +from hexathon import strip_0x # local import from cic_eth.eth import RpcClient @@ -102,11 +103,12 @@ def unpack_register(data): :returns: Parsed parameters :rtype: dict """ - f = data[2:10] + data = strip_0x(data) + f = data[:8] if f != '0a3b0a4f': raise ValueError('Invalid account index register data ({})'.format(f)) - d = data[10:] + d = data[8:] return { 'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]), } @@ -121,11 +123,12 @@ def unpack_gift(data): :returns: Parsed parameters :rtype: dict """ - f = data[2:10] + data = strip_0x(data) + f = data[:8] if f != '63e4bff4': - raise ValueError('Invalid account index register data ({})'.format(f)) + raise ValueError('Invalid gift data ({})'.format(f)) - d = data[10:] + d = data[8:] return { 'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]), } diff --git a/apps/cic-eth/cic_eth/eth/token.py b/apps/cic-eth/cic_eth/eth/token.py index 13eef37e..8f3e8c6e 100644 --- a/apps/cic-eth/cic_eth/eth/token.py +++ b/apps/cic-eth/cic_eth/eth/token.py @@ -8,6 +8,7 @@ import web3 from cic_registry import CICRegistry from cic_registry import zero_address from cic_registry.chain import ChainSpec +from hexathon import strip_0x # platform imports from cic_eth.db.models.tx import TxCache @@ -124,11 +125,12 @@ def unpack_transfer(data): :returns: Parsed parameters :rtype: dict """ - f = data[2:10] + data = strip_0x(data) + f = data[:8] if f != contract_function_signatures['transfer']: raise ValueError('Invalid transfer data ({})'.format(f)) - d = data[10:] + d = data[8:] return { 'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]), 'amount': int(d[64:], 16) @@ -144,11 +146,12 @@ def unpack_transferfrom(data): :returns: Parsed parameters :rtype: dict """ - f = data[2:10] + data = strip_0x(data) + f = data[:8] if f != contract_function_signatures['transferfrom']: raise ValueError('Invalid transferFrom data ({})'.format(f)) - d = data[10:] + d = data[8:] return { 'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]), 'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]), @@ -165,11 +168,12 @@ def unpack_approve(data): :returns: Parsed parameters :rtype: dict """ - f = data[2:10] + data = strip_0x(data) + f = data[:8] if f != contract_function_signatures['approve']: raise ValueError('Invalid approval data ({})'.format(f)) - d = data[10:] + d = data[8:] return { 'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]), 'amount': int(d[64:], 16) diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 00e1484d..29136348 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -311,6 +311,24 @@ def set_ready(tx_hash): return tx_hash +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_dequeue(tx_hash): + session = SessionBase.create_session() + o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + session.flush() + + o.dequeue(session=session) + session.commit() + session.close() + + return tx_hash + + + @celery_app.task(base=CriticalSQLAlchemyTask) def set_waitforgas(tx_hash): """Used to set the status when a transaction must be deferred due to gas refill diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index c5954977..98decc5c 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -24,12 +24,18 @@ from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import StatusBits from cic_eth.db.enum import LockEnum from cic_eth.db import dsn_from_config -from cic_eth.queue.tx import get_upcoming_tx +from cic_eth.queue.tx import ( + get_upcoming_tx, + set_dequeue, + ) from cic_eth.admin.ctrl import lock_send from cic_eth.sync.error import LoopDone from cic_eth.eth.tx import send as task_tx_send -from cic_eth.error import PermanentTxError -from cic_eth.error import TemporaryTxError +from cic_eth.error import ( + PermanentTxError, + TemporaryTxError, + NotLocalTxError, + ) from cic_eth.eth.util import unpack_signed_raw_tx_hex logging.basicConfig(level=logging.WARNING) @@ -110,6 +116,11 @@ class DispatchSyncer: for k in txs.keys(): tx_raw = txs[k] tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id()) + + try: + set_dequeue(tx['hash']) + except NotLocalTxError as e: + logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash'])) s_check = celery.signature( 'cic_eth.admin.ctrl.check_lock', diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py index 71d013fe..f0426b5d 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py @@ -5,19 +5,23 @@ import logging import web3 import celery from cic_registry.error import UnknownContractError +from chainlib.status import Status as TxStatus # local imports from .base import SyncFilter -from cic_eth.eth.token import unpack_transfer -from cic_eth.eth.token import unpack_transferfrom +from cic_eth.eth.token import ( + unpack_transfer, + unpack_transferfrom, + ) +from cic_eth.eth.account import unpack_gift from cic_eth.eth.token import ExtendedTx from .base import SyncFilter logg = logging.getLogger(__name__) -transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256)) -transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256)) -giveto_method_signature = '0x63e4bff4' # keccak256(giveTo(address)) +transfer_method_signature = 'a9059cbb' # keccak256(transfer(address,uint256)) +transferfrom_method_signature = '23b872dd' # keccak256(transferFrom(address,address,uint256)) +giveto_method_signature = '63e4bff4' # keccak256(giveTo(address)) class CallbackFilter(SyncFilter): @@ -51,35 +55,44 @@ class CallbackFilter(SyncFilter): # ) # s_translate.link(s) # s_translate.apply_async() - s.apply_async() + t = s.apply_async() + return s def parse_data(self, tx): - transfer_type = 'transfer' + #transfer_type = 'transfer' + transfer_type = None transfer_data = None - method_signature = tx.payload[:10] + logg.debug('have payload {}'.format(tx.payload)) + method_signature = tx.payload[:8] + if tx.status == TxStatus.ERROR: + logg.error('tx {} has failed, no callbacks will be called'.format(tx.hash)) - if method_signature == transfer_method_signature: - transfer_data = unpack_transfer(tx.payload) - transfer_data['from'] = tx['from'] - transfer_data['token_address'] = tx['to'] + else: + logg.debug('tx status {}'.format(tx.status)) + if method_signature == transfer_method_signature: + transfer_data = unpack_transfer(tx.payload) + transfer_data['from'] = tx['from'] + transfer_data['token_address'] = tx['to'] - elif method_signature == transferfrom_method_signature: - transfer_type = 'transferfrom' - transfer_data = unpack_transferfrom(tx.payload) - transfer_data['token_address'] = tx['to'] + elif method_signature == transferfrom_method_signature: + transfer_type = 'transferfrom' + transfer_data = unpack_transferfrom(tx.payload) + transfer_data['token_address'] = tx['to'] - # TODO: do not rely on logs here - elif method_signature == giveto_method_signature: - transfer_type = 'tokengift' - transfer_data = unpack_gift(tx.payload) - for l in tx.logs: - if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': - transfer_data['value'] = web3.Web3.toInt(hexstr=l.data) - token_address_bytes = l.topics[2][32-20:] - transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex()) - transfer_data['from'] = tx.to + # TODO: do not rely on logs here + elif method_signature == giveto_method_signature: + transfer_type = 'tokengift' + transfer_data = unpack_gift(tx.payload) + for l in tx.logs: + if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': + transfer_data['value'] = web3.Web3.toInt(hexstr=l.data) + token_address_bytes = l.topics[2][32-20:] + transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex()) + transfer_data['from'] = tx.to + + logg.debug('resolved method {}'.format(transfer_type)) return (transfer_type, transfer_data) @@ -88,29 +101,31 @@ class CallbackFilter(SyncFilter): chain_str = str(self.chain_spec) transfer_data = None + transfer_type = None try: - transfer_data = self.parse_data(tx) + (transfer_type, transfer_data) = self.parse_data(tx) except TypeError: logg.debug('invalid method data length for tx {}'.format(tx.hash)) return - transfer_data = None - if len(tx.payload) < 10: + if len(tx.payload) < 8: logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx.hash)) return - logg.debug('checking callbacks filter input {}'.format(tx.payload[:10])) + logg.debug('checking callbacks filter input {}'.format(tx.payload[:8])) if transfer_data != None: + logg.debug('wtfoo {}'.format(transfer_data)) token_symbol = None result = None try: - tokentx = ExtendedTx(self.chain_spec) + tokentx = ExtendedTx(tx.hash, self.chain_spec) tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses) tokentx.set_tokens(transfer_data['token_address'], transfer_data['value']) - self.call_back(tokentx.to_dict()) + t = self.call_back(tokentx.to_dict()) + logg.info('callback success task id {} tx {}'.format(t, tx.hash)) except UnknownContractError: - logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash.hex())) + logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash)) def __str__(self): diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py index 8c41bba6..697b30f0 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py @@ -3,6 +3,7 @@ import logging # third-party imports from cic_registry.chain import ChainSpec +from hexathon import add_0x # local imports from cic_eth.db.enum import StatusBits @@ -18,12 +19,13 @@ logg = logging.getLogger(__name__) class GasFilter(SyncFilter): - def __init__(self, queue=None): + def __init__(self, chain_spec, queue=None): self.queue = queue + self.chain_spec = chain_spec def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None): - tx_hash_hex = tx.hash + tx_hash_hex = add_0x(tx.hash) if tx.value > 0: logg.debug('gas refill tx {}'.format(tx_hash_hex)) session = SessionBase.bind_session(session) @@ -37,16 +39,15 @@ class GasFilter(SyncFilter): SessionBase.release_session(session) return - chain_spec = ChainSpec.from_chain_str(chain_str) - txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id(), session=session) + txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session) SessionBase.release_session(session) + logg.info('resuming gas-in-waiting txs for {}'.format(r[0])) if len(txs) > 0: - logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys())) s = create_check_gas_and_send_task( list(txs.values()), - str(chain_str), + str(self.chain_spec), r[0], 0, tx_hashes_hex=list(txs.keys()), diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py index 1751187c..2d0a8349 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py @@ -4,6 +4,10 @@ import logging # third-party imports import celery from chainlib.eth.address import to_checksum +from hexathon import ( + add_0x, + strip_0x, + ) # local imports from .base import SyncFilter @@ -22,11 +26,14 @@ class RegistrationFilter(SyncFilter): def filter(self, conn, block, tx, db_session=None): registered_address = None + logg.debug('register filter checking log {}'.format(tx.logs)) for l in tx.logs: event_topic_hex = l['topics'][0] if event_topic_hex == account_registry_add_log_hash: - address_hex = l['topics'][1][32-20:] - address = to_checksum(address_hex) + # TODO: use abi conversion method instead + + address_hex = strip_0x(l['topics'][1])[64-40:] + address = to_checksum(add_0x(address_hex)) logg.debug('request token gift to {}'.format(address)) s = celery.signature( 'cic_eth.eth.account.gift', diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index dfd6758f..dbee5025 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -18,8 +18,9 @@ logg = logging.getLogger(__name__) class TxFilter(SyncFilter): - def __init__(self, queue): + def __init__(self, chain_spec, queue): self.queue = queue + self.chain_spec = chain_spec def filter(self, conn, block, tx, db_session=None): diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tracker.py b/apps/cic-eth/cic_eth/runnable/daemons/tracker.py index 6ecd76f3..47bbc1f7 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tracker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tracker.py @@ -137,11 +137,11 @@ def main(): callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue) callback_filters.append(callback_filter) - tx_filter = TxFilter(config.get('_CELERY_QUEUE')) + tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE')) registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE')) - gas_filter = GasFilter(config.get('_CELERY_QUEUE')) + gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE')) i = 0 for syncer in syncers: diff --git a/apps/cic-eth/cic_eth/version.py b/apps/cic-eth/cic_eth/version.py index ac8c25e4..98ee59d8 100644 --- a/apps/cic-eth/cic_eth/version.py +++ b/apps/cic-eth/cic_eth/version.py @@ -10,7 +10,7 @@ version = ( 0, 10, 0, - 'alpha.32', + 'alpha.33', ) version_object = semver.VersionInfo( diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index 0fe45736..7b52306e 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -18,7 +18,7 @@ eth-gas-proxy==0.0.1a4 websocket-client==0.57.0 moolb~=0.1.1b2 eth-address-index~=0.1.0a8 -chainlib~=0.0.1a17 +chainlib~=0.0.1a18 hexathon~=0.0.1a3 -chainsyncer~=0.0.1a17 -cic-base==0.1.1a4 +chainsyncer~=0.0.1a18 +cic-base==0.1.1a5 diff --git a/apps/contract-migration/scripts/requirements.txt b/apps/contract-migration/scripts/requirements.txt index b6b520f5..561c877c 100644 --- a/apps/contract-migration/scripts/requirements.txt +++ b/apps/contract-migration/scripts/requirements.txt @@ -1,3 +1,3 @@ -cic-base[full]==0.1.1a4 -cic-eth==0.10.0a32 +cic-base[full]==0.1.1a5 +cic-eth==0.10.0a33 cic-types==0.1.0a8 diff --git a/apps/contract-migration/seed_cic_eth.sh b/apps/contract-migration/seed_cic_eth.sh index 6eb0d26a..488088e5 100644 --- a/apps/contract-migration/seed_cic_eth.sh +++ b/apps/contract-migration/seed_cic_eth.sh @@ -31,7 +31,7 @@ set -e set -a # We need to not install these here... -pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a32 chainlib==0.0.1a17 cic-contracts==0.0.2a2 +pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a33 chainlib==0.0.1a18 cic-contracts==0.0.2a2 >&2 echo "create account for gas gifter" old_gas_provider=$DEV_ETH_ACCOUNT_GAS_PROVIDER diff --git a/apps/requirements.txt b/apps/requirements.txt index 4208c093..ee777f19 100644 --- a/apps/requirements.txt +++ b/apps/requirements.txt @@ -42,6 +42,6 @@ rlp==2.0.1 cryptocurrency-cli-tools==0.0.4 giftable-erc20-token==0.0.7b12 hexathon==0.0.1a3 -chainlib==0.0.1a17 -chainsyncer==0.0.1a17 +chainlib==0.0.1a18 +chainsyncer==0.0.1a18 cic-registry==0.5.3.a21