Account register test passes

This commit is contained in:
nolash 2021-03-18 19:36:52 +01:00
parent 2f07ea1395
commit 871cbdcaeb
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
14 changed files with 277 additions and 318 deletions

View File

@ -22,8 +22,7 @@ from cic_eth.queue.tx import (
) )
from cic_eth.queue.tx import create as queue_create from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import ( from cic_eth.eth.gas import (
#sign_tx,
create_check_gas_task, create_check_gas_task,
) )

View File

@ -153,7 +153,7 @@ class NonceReservation(SessionBase):
session = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
nonce = NonceReservation.peek(key, session=session) nonce = NonceReservation.peek(address, key, session=session)
q = session.query(NonceReservation) q = session.query(NonceReservation)
q = q.filter(NonceReservation.address_hex==address) q = q.filter(NonceReservation.address_hex==address)

View File

@ -21,11 +21,10 @@ from eth_accounts_index import AccountRegistry
#from cic_eth.eth import RpcClient #from cic_eth.eth import RpcClient
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from cic_eth.eth import registry_extra_identifiers from cic_eth.eth import registry_extra_identifiers
from cic_eth.eth.task import ( from cic_eth.eth.gas import (
register_tx,
create_check_gas_task, create_check_gas_task,
) )
from cic_eth.eth.factory import TxFactory #from cic_eth.eth.factory import TxFactory
from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
@ -40,6 +39,12 @@ from cic_eth.task import (
CriticalSQLAlchemyAndSignerTask, CriticalSQLAlchemyAndSignerTask,
BaseTask, BaseTask,
) )
from cic_eth.eth.nonce import (
CustodialTaskNonceOracle,
)
from cic_eth.queue.tx import (
register_tx,
)
#logg = logging.getLogger(__name__) #logg = logging.getLogger(__name__)
logg = logging.getLogger() logg = logging.getLogger()
@ -48,73 +53,73 @@ celery_app = celery.current_app
#celery_app.log.redirect_stdouts_to_logger(logg, loglevel=logging.DEBUG) #celery_app.log.redirect_stdouts_to_logger(logg, loglevel=logging.DEBUG)
class AccountTxFactory(TxFactory): #class AccountTxFactory(TxFactory):
"""Factory for creating account index contract transactions # """Factory for creating account index contract transactions
""" # """
def add( # def add(
self, # self,
address, # address,
chain_spec, # chain_spec,
uuid, # uuid,
session=None, # session=None,
): # ):
"""Register an Ethereum account address with the on-chain account registry # """Register an Ethereum account address with the on-chain account registry
#
:param address: Ethereum account address to add # :param address: Ethereum account address to add
:type address: str, 0x-hex # :type address: str, 0x-hex
:param chain_spec: Chain to build transaction for # :param chain_spec: Chain to build transaction for
:type chain_spec: cic_registry.chain.ChainSpec # :type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "AccountRegistry.add" transaction in standard Ethereum format # :returns: Unsigned "AccountRegistry.add" transaction in standard Ethereum format
:rtype: dict # :rtype: dict
""" # """
#
c = self.registry.get_contract(chain_spec, 'AccountRegistry') # c = self.registry.get_contract(chain_spec, 'AccountRegistry')
f = c.function('add') # f = c.function('add')
tx_add_buildable = f( # tx_add_buildable = f(
address, # address,
) # )
gas = c.gas('add') # gas = c.gas('add')
tx_add = tx_add_buildable.buildTransaction({ # tx_add = tx_add_buildable.buildTransaction({
'from': self.address, # 'from': self.address,
'gas': gas, # 'gas': gas,
'gasPrice': self.gas_price, # 'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(), # 'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(uuid, session=session), # 'nonce': self.next_nonce(uuid, session=session),
'value': 0, # 'value': 0,
}) # })
return tx_add # return tx_add
#
#
def gift( # def gift(
self, # self,
address, # address,
chain_spec, # chain_spec,
uuid, # uuid,
session=None, # session=None,
): # ):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account # """Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
#
:param address: Ethereum account address to gift to # :param address: Ethereum account address to gift to
:type address: str, 0x-hex # :type address: str, 0x-hex
:param chain_spec: Chain to build transaction for # :param chain_spec: Chain to build transaction for
:type chain_spec: cic_registry.chain.ChainSpec # :type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "Faucet.giveTo" transaction in standard Ethereum format # :returns: Unsigned "Faucet.giveTo" transaction in standard Ethereum format
:rtype: dict # :rtype: dict
""" # """
#
c = self.registry.get_contract(chain_spec, 'Faucet') # c = self.registry.get_contract(chain_spec, 'Faucet')
f = c.function('giveTo') # f = c.function('giveTo')
tx_add_buildable = f(address) # tx_add_buildable = f(address)
gas = c.gas('add') # gas = c.gas('add')
tx_add = tx_add_buildable.buildTransaction({ # tx_add = tx_add_buildable.buildTransaction({
'from': self.address, # 'from': self.address,
'gas': gas, # 'gas': gas,
'gasPrice': self.gas_price, # 'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(), # 'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(uuid, session=session), # 'nonce': self.next_nonce(uuid, session=session),
'value': 0, # 'value': 0,
}) # })
return tx_add # return tx_add
def unpack_register(data): def unpack_register(data):
@ -231,8 +236,9 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
# Generate and sign transaction # Generate and sign transaction
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = self.create_nonce_oracle(writer_address, rpc) #nonce_oracle = self.create_nonce_oracle(writer_address, rpc)
gas_oracle = self.create_gas_oracle(rpc) nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas)
account_registry = AccountRegistry(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) account_registry = AccountRegistry(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
(tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED)
# TODO: if cache task fails, task chain will not return # TODO: if cache task fails, task chain will not return
@ -246,7 +252,8 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
#gas_budget = tx_add['gas'] * tx_add['gasPrice'] #gas_budget = tx_add['gas'] * tx_add['gasPrice']
gas_budget = account_registry.gas(tx_signed_raw_hex) gas_pair = gas_oracle.get_gas(tx_signed_raw_hex)
gas_budget = gas_pair[0] * gas_pair[1]
logg.debug('register user tx {} {} {}'.format(tx_hash_hex, queue, gas_budget)) logg.debug('register user tx {} {} {}'.format(tx_hash_hex, queue, gas_budget))
s = create_check_gas_task( s = create_check_gas_task(
@ -348,7 +355,7 @@ def cache_gift_data(
self, self,
tx_hash_hex, tx_hash_hex,
tx_signed_raw_hex, tx_signed_raw_hex,
chain_str, chain_spec,
): ):
"""Generates and commits transaction cache metadata for a Faucet.giveTo transaction """Generates and commits transaction cache metadata for a Faucet.giveTo transaction
@ -394,7 +401,7 @@ def cache_account_data(
self, self,
tx_hash_hex, tx_hash_hex,
tx_signed_raw_hex, tx_signed_raw_hex,
chain_str, chain_spec,
): ):
"""Generates and commits transaction cache metadata for an AccountsIndex.add transaction """Generates and commits transaction cache metadata for an AccountsIndex.add transaction
@ -408,11 +415,12 @@ def cache_account_data(
:rtype: tuple :rtype: tuple
""" """
chain_spec = ChainSpec.from_chain_str(chain_str) #c = RpcClient(chain_spec)
c = RpcClient(chain_spec) return
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) #tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx = unpack(tx_signed_raw_bytes, chain_id=chain_spec.chain_id())
tx_data = unpack_register(tx['data']) tx_data = unpack_register(tx['data'])
session = SessionBase.create_session() session = SessionBase.create_session()

View File

@ -2,6 +2,7 @@
import logging import logging
# external imports # external imports
import celery
from chainlib.eth.gas import price from chainlib.eth.gas import price
from hexathon import strip_0x from hexathon import strip_0x
@ -82,3 +83,50 @@ class GasOracle():
#g = 100 #g = 100
#return g #return g
return self.gas_price_current return self.gas_price_current
def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
"""Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher.
If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value.
:param tx_signed_raws_hex: Raw signed transaction data
:type tx_signed_raws_hex: list of str, 0x-hex
:param chain_spec: Chain spec of address to add check gas for
:type chain_spec: chainlib.chain.ChainSpec
:param holder_address: Address sending the transactions
:type holder_address: str, 0x-hex
:param gas: Gas budget hint for transactions
:type gas: int
:param tx_hashes_hex: Transaction hashes
:type tx_hashes_hex: list of str, 0x-hex
:param queue: Task queue
:type queue: str
:returns: Signature of task chain
:rtype: celery.Signature
"""
s_check_gas = None
if tx_hashes_hex != None:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
tx_hashes_hex,
chain_spec.asdict(),
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
else:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
chain_spec.asdict(),
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
return s_check_gas

View File

@ -4,7 +4,7 @@ from cic_eth.db.models.nonce import (
NonceReservation, NonceReservation,
) )
class NonceOracle(): class CustodialTaskNonceOracle():
"""Ensures atomic nonce increments for all transactions across all tasks and threads. """Ensures atomic nonce increments for all transactions across all tasks and threads.
:param address: Address to generate nonces for :param address: Address to generate nonces for
@ -12,20 +12,20 @@ class NonceOracle():
:param default_nonce: Initial nonce value to use if no nonce cache entry already exists :param default_nonce: Initial nonce value to use if no nonce cache entry already exists
:type default_nonce: number :type default_nonce: number
""" """
def __init__(self, address, default_nonce): def __init__(self, address, uuid, session=None):
self.address = address self.address = address
self.default_nonce = default_nonce self.uuid = uuid
self.session = session
def next(self): def get_nonce(self):
return self.next_nonce()
def next_nonce(self):
"""Get next unique nonce. """Get next unique nonce.
:returns: Nonce :returns: Nonce
:rtype: number :rtype: number
""" """
raise AttributeError('this should not be called') return NonceReservation.release(self.address, self.uuid, session=self.session)
return Nonce.next(self.address, self.default_nonce)
def next_by_task_uuid(self, uuid, session=None):
return NonceReservation.release(uuid, session=session)

View File

@ -1,148 +0,0 @@
# standard imports
import logging
# external imports
import sha3
import celery
from chainlib.chain import ChainSpec
from chainlib.eth.sign import sign_transaction
from chainlib.connection import RPCConnection
from chainlib.eth.tx import unpack
from hexathon import (
strip_0x,
add_0x,
)
# local imports
from cic_eth.eth import RpcClient
from cic_eth.queue.tx import create as queue_create
from cic_eth.error import SignerError
celery_app = celery.current_app
logg = celery_app.log.get_default_logger()
#
#@celery_app.task()
#def sign_tx(tx, chain_str):
# """Sign a single transaction against the given chain specification.
#
# :param tx: Transaction in standard Ethereum format
# :type tx: dict
# :param chain_str: Chain spec string representation
# :type chain_str: str
# :returns: Transaction hash and raw signed transaction, respectively
# :rtype: tuple
# """
# chain_spec = ChainSpec.from_chain_str(chain_str)
# #c = RpcClient(chain_spec)
# tx_transfer_signed = None
# conn = RPCConnection.connect('signer')
# try:
# o = sign_transaction(tx)
# tx_transfer_signed = conn.do(o)
# #try:
# # tx_transfer_signed = c.w3.eth.sign_transaction(tx)
# except Exception as e:
# raise SignerError('sign tx {}: {}'.format(tx, e))
# logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
# h = sha3.keccak_256()
# h.update(bytes.fromhex(strip_0x(tx_transfer_signed['raw'])))
# tx_hash = h.digest()
# #tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
# tx_hash_hex = add_0x(tx_hash.hex())
# return (tx_hash_hex, tx_transfer_signed['raw'],)
#def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
:param tx: Standard ethereum transaction data
:type tx: dict
:param chain_spec: Chain spec of transaction to add to queue
:type chain_spec: chainlib.chain.ChainSpec
:param queue: Task queue
:type queue: str
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
:type cache_task: str
:raises: sqlalchemy.exc.DatabaseError
:returns: Tuple; Transaction hash, signed raw transaction data
:rtype: tuple
"""
#(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
logg.debug('adding queue tx {}'.format(tx_hash_hex))
tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw, chain_id=chain_spec.chain_id())
queue_create(
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
chain_spec,
session=session,
)
if cache_task != None:
logg.debug('adding cache task {} tx {}'.format(cache_task, tx_hash_hex))
s_cache = celery.signature(
cache_task,
[
tx_hash_hex,
tx_signed_raw_hex,
chain_spec.asdict(),
],
queue=queue,
)
s_cache.apply_async()
return (tx_hash_hex, tx_signed_raw_hex,)
# TODO: rename as we will not be sending task in the chain, this is the responsibility of the dispatcher
def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=None, tx_hashes_hex=None, queue=None):
"""Creates a celery task signature for a check_gas task that adds the task to the outgoing queue to be processed by the dispatcher.
If tx_hashes_hex is not spefified, a preceding task chained to check_gas must supply the transaction hashes as its return value.
:param tx_signed_raws_hex: Raw signed transaction data
:type tx_signed_raws_hex: list of str, 0x-hex
:param chain_spec: Chain spec of address to add check gas for
:type chain_spec: chainlib.chain.ChainSpec
:param holder_address: Address sending the transactions
:type holder_address: str, 0x-hex
:param gas: Gas budget hint for transactions
:type gas: int
:param tx_hashes_hex: Transaction hashes
:type tx_hashes_hex: list of str, 0x-hex
:param queue: Task queue
:type queue: str
:returns: Signature of task chain
:rtype: celery.Signature
"""
s_check_gas = None
if tx_hashes_hex != None:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
tx_hashes_hex,
chain_spec.asdict(),
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
else:
s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas',
[
chain_spec.asdict(),
tx_signed_raws_hex,
holder_address,
gas,
],
queue=queue,
)
return s_check_gas

View File

@ -16,15 +16,27 @@ from chainlib.eth.tx import (
transaction, transaction,
receipt, receipt,
raw, raw,
TxFormat,
unpack,
) )
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.hash import keccak256_hex_to_hex from chainlib.hash import keccak256_hex_to_hex
from hexathon import add_0x from chainlib.eth.gas import Gas
from chainlib.eth.contract import (
abi_decode_single,
ABIContractType,
)
from hexathon import (
add_0x,
strip_0x,
)
# local imports # local imports
from .rpc import RpcClient from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase from cic_eth.db import (
Otx,
SessionBase,
)
from cic_eth.db.models.tx import TxCache from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation from cic_eth.db.models.nonce import NonceReservation
from cic_eth.db.models.lock import Lock from cic_eth.db.models.lock import Lock
@ -36,18 +48,18 @@ from cic_eth.db.enum import (
from cic_eth.error import PermanentTxError from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError from cic_eth.error import TemporaryTxError
from cic_eth.error import NotLocalTxError from cic_eth.error import NotLocalTxError
from cic_eth.queue.tx import create as queue_create #from cic_eth.queue.tx import create as queue_create
from cic_eth.queue.tx import get_tx from cic_eth.queue.tx import (
from cic_eth.queue.tx import get_nonce_tx get_tx,
register_tx,
get_nonce_tx,
)
from cic_eth.error import OutOfGasError from cic_eth.error import OutOfGasError
from cic_eth.error import LockedError from cic_eth.error import LockedError
from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.eth.gas import (
from cic_eth.eth.task import (
register_tx,
create_check_gas_task, create_check_gas_task,
#sign_tx,
) )
from cic_eth.eth.nonce import NonceOracle from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.error import ( from cic_eth.error import (
AlreadyFillingGasError, AlreadyFillingGasError,
EthError, EthError,
@ -112,6 +124,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
try: try:
o = balance(address) o = balance(address)
r = conn.do(o) r = conn.do(o)
gas_balance = abi_decode_single(ABIContractType.UINT256, r)
except EthException as e: except EthException as e:
raise EthError('gas_balance call for {}: {}'.format(address, e)) raise EthError('gas_balance call for {}: {}'.format(address, e))
@ -290,7 +303,7 @@ def send(self, txs, chain_spec_dict):
) )
s.apply_async() s.apply_async()
return r.hex() return tx_hash_hex
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks. # TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
@ -319,70 +332,60 @@ def refill_gas(self, recipient_address, chain_spec_dict):
q = q.filter(TxCache.recipient==recipient_address) q = q.filter(TxCache.recipient==recipient_address)
c = q.count() c = q.count()
if c > 0: if c > 0:
#session.close()
#raise AlreadyFillingGasError(recipient_address)
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address)))) logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True zero_amount = True
session.flush() session.flush()
queue = self.request.delivery_info['routing_key'] queue = self.request.delivery_info.get('routing_key')
c = RpcClient(chain_spec) #c = RpcClient(chain_spec)
conn = RPCConnection.connect() rpc = RPCConnection.connect(chain_spec, 'default')
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.flush()
# Get default nonce to use from network if no nonce has been set # Get default nonce to use from network if no nonce has been set
# TODO: This step may be redundant as nonce entry is set at account creation time # TODO: This step may be redundant as nonce entry is set at account creation time
#default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending') #default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
o = count_pending(c.gas_provider()) #o = count_pending(gas_provider)
default_nonce = conn.do(o) #default_nonce = conn.do(o)
nonce_generator = NonceOracle(c.gas_provider(), default_nonce) nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session) #, default_nonce)
#nonce = nonce_generator.next(session=session) #nonce = nonce_generator.next(session=session)
nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session) #nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session)
gas_price = c.gas_price() rpc_signer = RPCConnection.connect(chain_spec, 'signer')
gas_limit = c.default_gas_limit gas_oracle = self.create_gas_oracle(rpc)
c = Gas(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
#gas_price = c.gas_price()
#gas_limit = c.default_gas_limit
refill_amount = 0 refill_amount = 0
if not zero_amount: if not zero_amount:
refill_amount = c.refill_amount() refill_amount = self.safe_gas_refill_amount
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
# create and sign transaction logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address))
tx_send_gas = { # # create and sign transaction
'from': c.gas_provider(), # tx_send_gas = {
'to': recipient_address, # 'from': c.gas_provider(),
'gas': gas_limit, # 'to': recipient_address,
'gasPrice': gas_price, # 'gas': gas_limit,
'chainId': chain_spec.chain_id(), # 'gasPrice': gas_price,
'nonce': nonce, # 'chainId': chain_spec.chain_id(),
'value': refill_amount, # 'nonce': nonce,
'data': '', # 'value': refill_amount,
} # 'data': '',
#tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas) # }
#tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw']) # #tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas)
#tx_hash_hex = tx_hash.hex() # #tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw'])
(tx_hash_hex, tx_send_gas_signed) = sign_tx(tx_send_gas) # #tx_hash_hex = tx_hash.hex()
# (tx_hash_hex, tx_send_gas_signed) = sign_tx(tx_send_gas)
(tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED)
# TODO: route this through sign_and_register_tx instead # TODO: route this through sign_and_register_tx instead
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex)) logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
queue_create( #cache_task = 'cic_eth.eth.tx.cache_gas_refill_data'
nonce, cache_task = 'cic_eth.eth.tx.otx_cache_parse_tx'
c.gas_provider(), register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
tx_hash_hex,
tx_send_gas_signed['raw'],
chain_str,
session=session,
)
session.close()
s_tx_cache = celery.signature(
'cic_eth.eth.tx.cache_gas_refill_data',
[
tx_hash_hex,
tx_send_gas,
],
queue=queue,
)
s_status = celery.signature( s_status = celery.signature(
'cic_eth.queue.tx.set_ready', 'cic_eth.queue.tx.set_ready',
[ [
@ -390,9 +393,9 @@ def refill_gas(self, recipient_address, chain_spec_dict):
], ],
queue=queue, queue=queue,
) )
celery.group(s_tx_cache, s_status)() t = s_status.apply_async()
return tx_send_gas_signed['raw'] return tx_signed_raw_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
@ -413,7 +416,6 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
""" """
session = SessionBase.create_session() session = SessionBase.create_session()
q = session.query(Otx) q = session.query(Otx)
q = q.filter(Otx.tx_hash==txold_hash_hex) q = q.filter(Otx.tx_hash==txold_hash_hex)
otx = q.first() otx = q.first()
@ -603,7 +605,7 @@ def resume_tx(self, txpending_hash_hex, chain_str):
def otx_cache_parse_tx( def otx_cache_parse_tx(
tx_hash_hex, tx_hash_hex,
tx_signed_raw_hex, tx_signed_raw_hex,
chain_str, chain_spec_dict,
): ):
"""Generates and commits transaction cache metadata for a gas refill transaction """Generates and commits transaction cache metadata for a gas refill transaction
@ -618,18 +620,20 @@ def otx_cache_parse_tx(
""" """
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_dict(chain_spec_dict)
c = RpcClient(chain_spec) #c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) #tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
(txc, cache_id) = cache_gas_refill_data(tx_hash_hex, tx) tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_gas_refill_data(tx_hash_hex, tx, chain_spec)
return txc return txc
@celery_app.task(base=CriticalSQLAlchemyTask) #@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gas_refill_data( def cache_gas_refill_data(
tx_hash_hex, tx_hash_hex,
tx, tx,
chain_spec,
): ):
"""Helper function for otx_cache_parse_tx """Helper function for otx_cache_parse_tx

View File

@ -3,13 +3,14 @@ import logging
import time import time
import datetime import datetime
# third-party imports # external imports
import celery import celery
from hexathon import strip_0x from hexathon import strip_0x
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy import not_ from sqlalchemy import not_
from sqlalchemy import tuple_ from sqlalchemy import tuple_
from sqlalchemy import func from sqlalchemy import func
from chainlib.eth.tx import unpack
# local imports # local imports
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
@ -95,6 +96,50 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_spec, obsolete_prede
return tx_hash return tx_hash
def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
:param tx: Standard ethereum transaction data
:type tx: dict
:param chain_spec: Chain spec of transaction to add to queue
:type chain_spec: chainlib.chain.ChainSpec
:param queue: Task queue
:type queue: str
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
:type cache_task: str
:raises: sqlalchemy.exc.DatabaseError
:returns: Tuple; Transaction hash, signed raw transaction data
:rtype: tuple
"""
logg.debug('adding queue tx {}:{} -> {}'.format(chain_spec, tx_hash_hex, tx_signed_raw_hex))
tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw, chain_id=chain_spec.chain_id())
create(
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
chain_spec,
session=session,
)
if cache_task != None:
logg.debug('adding cache task {} tx {}'.format(cache_task, tx_hash_hex))
s_cache = celery.signature(
cache_task,
[
tx_hash_hex,
tx_signed_raw_hex,
chain_spec.asdict(),
],
queue=queue,
)
s_cache.apply_async()
return (tx_hash_hex, tx_signed_raw_hex,)
# TODO: Replace set_* with single task for set status # TODO: Replace set_* with single task for set status
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def set_sent_status(tx_hash, fail=False): def set_sent_status(tx_hash, fail=False):

View File

@ -53,6 +53,7 @@ class CriticalWeb3Task(CriticalTask):
requests.exceptions.ConnectionError, requests.exceptions.ConnectionError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5
class CriticalSQLAlchemyAndWeb3Task(CriticalTask): class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
@ -64,6 +65,8 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
EthError, EthError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5
class CriticalSQLAlchemyAndSignerTask(CriticalTask): class CriticalSQLAlchemyAndSignerTask(CriticalTask):
autoretry_for = ( autoretry_for = (
@ -78,6 +81,8 @@ class CriticalWeb3AndSignerTask(CriticalTask):
requests.exceptions.ConnectionError, requests.exceptions.ConnectionError,
SignerError, SignerError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5
@celery_app.task(bind=True, base=BaseTask) @celery_app.task(bind=True, base=BaseTask)

View File

@ -8,7 +8,6 @@ root_dir = os.path.dirname(script_dir)
sys.path.insert(0, root_dir) sys.path.insert(0, root_dir)
from tests.fixtures_config import * from tests.fixtures_config import *
from tests.fixtures_celery import *
from tests.fixtures_database import * from tests.fixtures_database import *
from tests.fixtures_role import * from tests.fixtures_role import *
from chainlib.eth.pytest import * from chainlib.eth.pytest import *

View File

@ -13,9 +13,9 @@ logg = logging.getLogger()
def celery_includes(): def celery_includes():
return [ return [
# 'cic_eth.eth.bancor', # 'cic_eth.eth.bancor',
'cic_eth.eth.token', # 'cic_eth.eth.token',
'cic_eth.eth.tx', 'cic_eth.eth.tx',
'cic_eth.ext.tx', # 'cic_eth.ext.tx',
'cic_eth.queue.tx', 'cic_eth.queue.tx',
'cic_eth.queue.balance', 'cic_eth.queue.balance',
'cic_eth.admin.ctrl', 'cic_eth.admin.ctrl',

View File

@ -11,6 +11,7 @@ from cic_eth.db.models.role import AccountRole
#logg = logging.getLogger(__name__) #logg = logging.getLogger(__name__)
# what the actual fuck, debug is not being shown even though explicitly set # what the actual fuck, debug is not being shown even though explicitly set
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger() logg = logging.getLogger()
@ -29,7 +30,7 @@ def custodial_roles(
for k in r.keys(): for k in r.keys():
role = AccountRole.set(k, r[k]) role = AccountRole.set(k, r[k])
init_database.add(role) init_database.add(role)
logg.info('adding role {} -> {}'.format(k, r[k])) logg.error('adding role {} -> {}'.format(k, r[k]))
init_database.commit() init_database.commit()
return r return r

View File

@ -0,0 +1 @@
from tests.fixtures_celery import *

View File

@ -18,7 +18,6 @@ from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import StatusEnum
from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
from cic_eth.eth.account import AccountTxFactory
logg = logging.getLogger() logg = logging.getLogger()
@ -35,7 +34,7 @@ def test_create_account(
'cic_eth.eth.account.create', 'cic_eth.eth.account.create',
[ [
'foo', 'foo',
str(default_chain_spec), default_chain_spec.asdict(),
], ],
) )
t = s.apply_async() t = s.apply_async()
@ -52,7 +51,7 @@ def test_create_account(
'cic_eth.eth.account.have', 'cic_eth.eth.account.have',
[ [
r, r,
str(default_chain_spec), default_chain_spec.asdict(),
], ],
) )
t = s.apply_async() t = s.apply_async()
@ -73,8 +72,6 @@ def test_register_account(
celery_worker, celery_worker,
): ):
logg.debug('chainspec {}'.format(str(default_chain_spec)))
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.tx.reserve_nonce',
[ [
@ -95,7 +92,7 @@ def test_register_account(
t = s_nonce.apply_async() t = s_nonce.apply_async()
address = t.get() address = t.get()
for r in t.collect(): for r in t.collect():
pass logg.debug('r {}'.format(r))
assert t.successful() assert t.successful()
session = SessionBase.create_session() session = SessionBase.create_session()