Update chain syncer and chain lib

This commit is contained in:
nolash 2021-02-24 12:53:08 +01:00
parent 435bfa88f1
commit a0d405f582
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
15 changed files with 160 additions and 79 deletions

View File

@ -79,6 +79,13 @@ class Otx(SessionBase):
return r
def __status_not_set(self, status):
r = not(self.status & status)
if r:
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None):
"""Set block number transaction was mined in.
@ -320,6 +327,32 @@ class Otx(SessionBase):
SessionBase.release_session(session)
def dequeue(self, session=None):
"""Marks that a process to execute send attempt is underway
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_not_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__reset_status(StatusBits.QUEUED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None):
"""Marks that transaction was mined but code execution did not succeed.
@ -373,18 +406,6 @@ class Otx(SessionBase):
else:
self.__set_status(StatusEnum.OBSOLETED, session)
# if confirmed:
# if self.status != StatusEnum.OBSOLETED:
# logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.CANCELLED, session)
# elif self.status != StatusEnum.OBSOLETED:
# if self.status > StatusEnum.SENT:
# logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)

View File

@ -8,6 +8,7 @@ from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from erc20_single_shot_faucet import Faucet
from cic_registry import zero_address
from hexathon import strip_0x
# local import
from cic_eth.eth import RpcClient
@ -102,11 +103,12 @@ def unpack_register(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != '0a3b0a4f':
raise ValueError('Invalid account index register data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
@ -121,11 +123,12 @@ def unpack_gift(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != '63e4bff4':
raise ValueError('Invalid account index register data ({})'.format(f))
raise ValueError('Invalid gift data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}

View File

@ -8,6 +8,7 @@ import web3
from cic_registry import CICRegistry
from cic_registry import zero_address
from cic_registry.chain import ChainSpec
from hexathon import strip_0x
# platform imports
from cic_eth.db.models.tx import TxCache
@ -124,11 +125,12 @@ def unpack_transfer(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['transfer']:
raise ValueError('Invalid transfer data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
@ -144,11 +146,12 @@ def unpack_transferfrom(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['transferfrom']:
raise ValueError('Invalid transferFrom data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
@ -165,11 +168,12 @@ def unpack_approve(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['approve']:
raise ValueError('Invalid approval data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)

View File

@ -311,6 +311,24 @@ def set_ready(tx_hash):
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_dequeue(tx_hash):
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.dequeue(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_waitforgas(tx_hash):
"""Used to set the status when a transaction must be deferred due to gas refill

View File

@ -24,12 +24,18 @@ from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import StatusBits
from cic_eth.db.enum import LockEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_upcoming_tx
from cic_eth.queue.tx import (
get_upcoming_tx,
set_dequeue,
)
from cic_eth.admin.ctrl import lock_send
from cic_eth.sync.error import LoopDone
from cic_eth.eth.tx import send as task_tx_send
from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError
from cic_eth.error import (
PermanentTxError,
TemporaryTxError,
NotLocalTxError,
)
from cic_eth.eth.util import unpack_signed_raw_tx_hex
logging.basicConfig(level=logging.WARNING)
@ -111,6 +117,11 @@ class DispatchSyncer:
tx_raw = txs[k]
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
try:
set_dequeue(tx['hash'])
except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[

View File

@ -5,19 +5,23 @@ import logging
import web3
import celery
from cic_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus
# local imports
from .base import SyncFilter
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.token import (
unpack_transfer,
unpack_transferfrom,
)
from cic_eth.eth.account import unpack_gift
from cic_eth.eth.token import ExtendedTx
from .base import SyncFilter
logg = logging.getLogger(__name__)
transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '0x63e4bff4' # keccak256(giveTo(address))
transfer_method_signature = 'a9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '63e4bff4' # keccak256(giveTo(address))
class CallbackFilter(SyncFilter):
@ -51,35 +55,44 @@ class CallbackFilter(SyncFilter):
# )
# s_translate.link(s)
# s_translate.apply_async()
s.apply_async()
t = s.apply_async()
return s
def parse_data(self, tx):
transfer_type = 'transfer'
#transfer_type = 'transfer'
transfer_type = None
transfer_data = None
method_signature = tx.payload[:10]
logg.debug('have payload {}'.format(tx.payload))
method_signature = tx.payload[:8]
if tx.status == TxStatus.ERROR:
logg.error('tx {} has failed, no callbacks will be called'.format(tx.hash))
if method_signature == transfer_method_signature:
transfer_data = unpack_transfer(tx.payload)
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
else:
logg.debug('tx status {}'.format(tx.status))
if method_signature == transfer_method_signature:
transfer_data = unpack_transfer(tx.payload)
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
elif method_signature == transferfrom_method_signature:
transfer_type = 'transferfrom'
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['token_address'] = tx['to']
elif method_signature == transferfrom_method_signature:
transfer_type = 'transferfrom'
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['token_address'] = tx['to']
# TODO: do not rely on logs here
elif method_signature == giveto_method_signature:
transfer_type = 'tokengift'
transfer_data = unpack_gift(tx.payload)
for l in tx.logs:
if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=l.data)
token_address_bytes = l.topics[2][32-20:]
transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex())
transfer_data['from'] = tx.to
# TODO: do not rely on logs here
elif method_signature == giveto_method_signature:
transfer_type = 'tokengift'
transfer_data = unpack_gift(tx.payload)
for l in tx.logs:
if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=l.data)
token_address_bytes = l.topics[2][32-20:]
transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex())
transfer_data['from'] = tx.to
logg.debug('resolved method {}'.format(transfer_type))
return (transfer_type, transfer_data)
@ -88,29 +101,31 @@ class CallbackFilter(SyncFilter):
chain_str = str(self.chain_spec)
transfer_data = None
transfer_type = None
try:
transfer_data = self.parse_data(tx)
(transfer_type, transfer_data) = self.parse_data(tx)
except TypeError:
logg.debug('invalid method data length for tx {}'.format(tx.hash))
return
transfer_data = None
if len(tx.payload) < 10:
if len(tx.payload) < 8:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx.hash))
return
logg.debug('checking callbacks filter input {}'.format(tx.payload[:10]))
logg.debug('checking callbacks filter input {}'.format(tx.payload[:8]))
if transfer_data != None:
logg.debug('wtfoo {}'.format(transfer_data))
token_symbol = None
result = None
try:
tokentx = ExtendedTx(self.chain_spec)
tokentx = ExtendedTx(tx.hash, self.chain_spec)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses)
tokentx.set_tokens(transfer_data['token_address'], transfer_data['value'])
self.call_back(tokentx.to_dict())
t = self.call_back(tokentx.to_dict())
logg.info('callback success task id {} tx {}'.format(t, tx.hash))
except UnknownContractError:
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash.hex()))
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash))
def __str__(self):

View File

@ -3,6 +3,7 @@ import logging
# third-party imports
from cic_registry.chain import ChainSpec
from hexathon import add_0x
# local imports
from cic_eth.db.enum import StatusBits
@ -18,12 +19,13 @@ logg = logging.getLogger(__name__)
class GasFilter(SyncFilter):
def __init__(self, queue=None):
def __init__(self, chain_spec, queue=None):
self.queue = queue
self.chain_spec = chain_spec
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
tx_hash_hex = tx.hash
tx_hash_hex = add_0x(tx.hash)
if tx.value > 0:
logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.bind_session(session)
@ -37,16 +39,15 @@ class GasFilter(SyncFilter):
SessionBase.release_session(session)
return
chain_spec = ChainSpec.from_chain_str(chain_str)
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], chain_spec.chain_id(), session=session)
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session)
SessionBase.release_session(session)
logg.info('resuming gas-in-waiting txs for {}'.format(r[0]))
if len(txs) > 0:
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
s = create_check_gas_and_send_task(
list(txs.values()),
str(chain_str),
str(self.chain_spec),
r[0],
0,
tx_hashes_hex=list(txs.keys()),

View File

@ -4,6 +4,10 @@ import logging
# third-party imports
import celery
from chainlib.eth.address import to_checksum
from hexathon import (
add_0x,
strip_0x,
)
# local imports
from .base import SyncFilter
@ -22,11 +26,14 @@ class RegistrationFilter(SyncFilter):
def filter(self, conn, block, tx, db_session=None):
registered_address = None
logg.debug('register filter checking log {}'.format(tx.logs))
for l in tx.logs:
event_topic_hex = l['topics'][0]
if event_topic_hex == account_registry_add_log_hash:
address_hex = l['topics'][1][32-20:]
address = to_checksum(address_hex)
# TODO: use abi conversion method instead
address_hex = strip_0x(l['topics'][1])[64-40:]
address = to_checksum(add_0x(address_hex))
logg.debug('request token gift to {}'.format(address))
s = celery.signature(
'cic_eth.eth.account.gift',

View File

@ -18,8 +18,9 @@ logg = logging.getLogger(__name__)
class TxFilter(SyncFilter):
def __init__(self, queue):
def __init__(self, chain_spec, queue):
self.queue = queue
self.chain_spec = chain_spec
def filter(self, conn, block, tx, db_session=None):

View File

@ -137,11 +137,11 @@ def main():
callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
callback_filters.append(callback_filter)
tx_filter = TxFilter(config.get('_CELERY_QUEUE'))
tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE'))
registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE'))
gas_filter = GasFilter(config.get('_CELERY_QUEUE'))
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
i = 0
for syncer in syncers:

View File

@ -10,7 +10,7 @@ version = (
0,
10,
0,
'alpha.32',
'alpha.33',
)
version_object = semver.VersionInfo(

View File

@ -18,7 +18,7 @@ eth-gas-proxy==0.0.1a4
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.0a8
chainlib~=0.0.1a17
chainlib~=0.0.1a18
hexathon~=0.0.1a3
chainsyncer~=0.0.1a17
cic-base==0.1.1a4
chainsyncer~=0.0.1a18
cic-base==0.1.1a5

View File

@ -1,3 +1,3 @@
cic-base[full]==0.1.1a4
cic-eth==0.10.0a32
cic-base[full]==0.1.1a5
cic-eth==0.10.0a33
cic-types==0.1.0a8

View File

@ -31,7 +31,7 @@ set -e
set -a
# We need to not install these here...
pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a32 chainlib==0.0.1a17 cic-contracts==0.0.2a2
pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a33 chainlib==0.0.1a18 cic-contracts==0.0.2a2
>&2 echo "create account for gas gifter"
old_gas_provider=$DEV_ETH_ACCOUNT_GAS_PROVIDER

View File

@ -42,6 +42,6 @@ rlp==2.0.1
cryptocurrency-cli-tools==0.0.4
giftable-erc20-token==0.0.7b12
hexathon==0.0.1a3
chainlib==0.0.1a17
chainsyncer==0.0.1a17
chainlib==0.0.1a18
chainsyncer==0.0.1a18
cic-registry==0.5.3.a21