Move nonce, gas tasks to respective modules, chainspec instead of chainid for chainlib tx

This commit is contained in:
nolash 2021-04-04 13:27:28 +02:00
parent d3ccac7dac
commit 65d3efd72b
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
35 changed files with 566 additions and 610 deletions

View File

@ -46,8 +46,8 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
tx_brief = get_tx(tx_hash_orig_hex) tx_brief = get_tx(tx_hash_orig_hex)
tx_raw = bytes.fromhex(tx_brief['signed_tx'][2:]) tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx'][2:]))
tx = unpack(tx_raw, chain_spec.chain_id()) tx = unpack(tx_raw, chain_spec)
nonce = tx_brief['nonce'] nonce = tx_brief['nonce']
address = tx['from'] address = tx['from']
@ -67,8 +67,8 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
tx_hashes = [] tx_hashes = []
txs = [] txs = []
for otx in otxs: for otx in otxs:
tx_raw = bytes.fromhex(otx.signed_tx[2:]) tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
tx_new = unpack(tx_raw, chain_spec.chain_id()) tx_new = unpack(tx_raw, chain_spec)
tx_previous_hash_hex = tx_new['hash'] tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce'] tx_previous_nonce = tx_new['nonce']

View File

@ -154,7 +154,7 @@ class AdminApi:
raise NotImplementedError('resend as new not yet implemented') raise NotImplementedError('resend as new not yet implemented')
s = celery.signature( s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas', 'cic_eth.eth.gas.resend_with_higher_gas',
[ [
chain_spec.asdict(), chain_spec.asdict(),
None, None,

View File

@ -92,7 +92,7 @@ class Api:
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
self.chain_spec.asdict(), self.chain_spec.asdict(),
], ],
@ -156,7 +156,7 @@ class Api:
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
self.chain_spec.asdict(), self.chain_spec.asdict(),
], ],
@ -217,7 +217,7 @@ class Api:
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
self.chain_spec.asdict(), self.chain_spec.asdict(),
from_address, from_address,
@ -364,7 +364,7 @@ class Api:
if register: if register:
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
self.chain_spec.asdict(), self.chain_spec.asdict(),
'ACCOUNT_REGISTRY_WRITER', 'ACCOUNT_REGISTRY_WRITER',
@ -403,7 +403,7 @@ class Api:
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
self.chain_spec.asdict(), self.chain_spec.asdict(),
'GAS_GIFTER', 'GAS_GIFTER',
@ -411,7 +411,7 @@ class Api:
queue=self.queue, queue=self.queue,
) )
s_refill = celery.signature( s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas', 'cic_eth.eth.gas.refill_gas',
[ [
self.chain_spec.asdict(), self.chain_spec.asdict(),
], ],

View File

@ -4,12 +4,6 @@ class TokenCountError(Exception):
pass pass
class NotLocalTxError(Exception):
"""Exception raised when trying to access a tx not originated from a local task
"""
pass
class PermanentTxError(Exception): class PermanentTxError(Exception):
"""Exception raised when encountering a permanent error when sending a tx. """Exception raised when encountering a permanent error when sending a tx.

View File

@ -125,7 +125,7 @@ def register(self, account_address, chain_spec_dict, writer_address=None):
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce) nonce_oracle = CustodialTaskNonceOracle(writer_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas) gas_oracle = self.create_gas_oracle(rpc, AccountRegistry.gas)
account_registry = AccountRegistry(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) account_registry = AccountRegistry(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = account_registry.add(account_registry_address, writer_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect() rpc_signer.disconnect()
@ -178,7 +178,7 @@ def gift(self, account_address, chain_spec_dict):
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
nonce_oracle = CustodialTaskNonceOracle(account_address, self.request.root_id, session=session) #, default_nonce) nonce_oracle = CustodialTaskNonceOracle(account_address, self.request.root_id, session=session) #, default_nonce)
gas_oracle = self.create_gas_oracle(rpc, Faucet.gas) gas_oracle = self.create_gas_oracle(rpc, Faucet.gas)
faucet = Faucet(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) faucet = Faucet(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = faucet.give_to(faucet_address, account_address, account_address, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = faucet.give_to(faucet_address, account_address, account_address, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect() rpc_signer.disconnect()
@ -285,7 +285,7 @@ def cache_gift_data(
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex)) tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = Faucet.parse_give_to_request(tx['data']) tx_data = Faucet.parse_give_to_request(tx['data'])
session = self.create_session() session = self.create_session()
@ -328,7 +328,7 @@ def cache_account_data(
""" """
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack(tx_signed_raw_bytes, chain_id=chain_spec.chain_id()) tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = AccountRegistry.parse_add_request(tx['data']) tx_data = AccountRegistry.parse_add_request(tx['data'])
session = SessionBase.create_session() session = SessionBase.create_session()

View File

@ -209,7 +209,7 @@ def convert_with_default_reserve(self, tokens, from_address, source_amount, mini
# s_queue.apply_async() # s_queue.apply_async()
# #
# s_check_gas = celery.signature( # s_check_gas = celery.signature(
# 'cic_eth.eth.tx.check_gas', # 'cic_eth.eth.gas.check_gas',
# [ # [
# c['address'], # c['address'],
# [c['signed_tx']], # [c['signed_tx']],
@ -222,7 +222,7 @@ def convert_with_default_reserve(self, tokens, from_address, source_amount, mini
# ) # )
# #
# s_set_sent = celery.signature( # s_set_sent = celery.signature(
# 'cic_eth.queue.state.set_sent_status', # 'cic_eth.queue.state.set_sent',
# [False], # [False],
# ) # )
# s_send.link(s_set_sent) # s_send.link(s_set_sent)
@ -364,7 +364,7 @@ def otx_cache_convert(
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = unpack_convert(tx['data']) tx_data = unpack_convert(tx['data'])
logg.debug('tx data {}'.format(tx_data)) logg.debug('tx data {}'.format(tx_data))

View File

@ -107,7 +107,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_d
session = self.create_session() session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle, chain_id=chain_spec.chain_id()) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect() rpc_signer.disconnect()
@ -170,7 +170,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_spec_dic
session = self.create_session() session = self.create_session()
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle, chain_id=chain_spec.chain_id()) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED)
rpc_signer.disconnect() rpc_signer.disconnect()
@ -244,7 +244,7 @@ def cache_transfer_data(
""" """
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex)) tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = ERC20.parse_transfer_request(tx['data']) tx_data = ERC20.parse_transfer_request(tx['data'])
recipient_address = tx_data[0] recipient_address = tx_data[0]
@ -285,7 +285,7 @@ def cache_approve_data(
""" """
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex)) tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack(tx_signed_raw_bytes, chain_spec)
tx_data = ERC20.parse_approve_request(tx['data']) tx_data = ERC20.parse_approve_request(tx['data'])
recipient_address = tx_data[0] recipient_address = tx_data[0]

View File

@ -3,87 +3,59 @@ import logging
# external imports # external imports
import celery import celery
from chainlib.eth.gas import price
from hexathon import strip_0x from hexathon import strip_0x
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
from chainlib.connection import RPCConnection
from chainqueue.db.enum import StatusBits
from chainlib.eth.gas import (
balance,
price,
)
from chainlib.eth.error import (
NotFoundEthException,
EthException,
)
from chainlib.eth.tx import (
TxFactory,
TxFormat,
unpack,
)
from chainlib.eth.contract import (
abi_decode_single,
ABIContractType,
)
from chainlib.eth.gas import (
Gas,
OverrideGasOracle,
)
from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx
# local imports # local imports
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.error import (
AlreadyFillingGasError,
OutOfGasError,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.queue.tx import (
queue_create,
register_tx,
)
from cic_eth.queue.query import get_tx
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndWeb3Task,
CriticalSQLAlchemyAndSignerTask,
CriticalWeb3AndSignerTask,
)
celery_app = celery.current_app
logg = logging.getLogger() logg = logging.getLogger()
#
#class GasOracle():
# """Provides gas pricing for transactions.
#
# :param w3: Web3 object
# :type w3: web3.Web3
# """
#
# __safe_threshold_amount_value = 2000000000 * 60000 * 3
# __refill_amount_value = __safe_threshold_amount_value * 5
# default_gas_limit = 21000
#
# def __init__(self, conn):
# o = price()
# r = conn.do(o)
# b = bytes.from_hex(strip_0x(r))
# self.gas_price_current = int.from_bytes(b, 'big')
#
# #self.w3 = w3
# #self.gas_price_current = w3.eth.gas_price()
#
#
# def safe_threshold_amount(self):
# """The gas balance threshold under which a new gas refill transaction should be initiated.
#
# :returns: Gas token amount
# :rtype: number
# """
# g = GasOracle.__safe_threshold_amount_value
# logg.warning('gas safe threshold is currently hardcoded to {}'.format(g))
# return g
#
#
# def refill_amount(self):
# """The amount of gas tokens to send in a gas refill transaction.
#
# :returns: Gas token amount
# :rtype: number
# """
# g = GasOracle.__refill_amount_value
# logg.warning('gas refill amount is currently hardcoded to {}'.format(g))
# return g
#
#
# def gas_provider(self):
# """Gas provider address.
#
# :returns: Etheerum account address
# :rtype: str, 0x-hex
# """
# session = SessionBase.create_session()
# a = AccountRole.get_address('GAS_GIFTER', session)
# logg.debug('gasgifter {}'.format(a))
# session.close()
# return a
#
#
# def gas_price(self, category='safe'):
# """Get projected gas price to use for a transaction at the current moment.
#
# When the category parameter is implemented, it can be used to control the priority of a transaction in the network.
#
# :param category: Bid level category to return price for. Currently has no effect.
# :type category: str
# :returns: Gas price
# :rtype: number
# """
# #logg.warning('gas price hardcoded to category "safe"')
# #g = 100
# #return g
# return self.gas_price_current
class MaxGasOracle: class MaxGasOracle:
@ -114,7 +86,7 @@ def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=No
s_check_gas = None s_check_gas = None
if tx_hashes_hex != None: if tx_hashes_hex != None:
s_check_gas = celery.signature( s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas', 'cic_eth.eth.gas.check_gas',
[ [
tx_hashes_hex, tx_hashes_hex,
chain_spec.asdict(), chain_spec.asdict(),
@ -126,7 +98,7 @@ def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=No
) )
else: else:
s_check_gas = celery.signature( s_check_gas = celery.signature(
'cic_eth.eth.tx.check_gas', 'cic_eth.eth.gas.check_gas',
[ [
chain_spec.asdict(), chain_spec.asdict(),
tx_signed_raws_hex, tx_signed_raws_hex,
@ -136,3 +108,324 @@ def create_check_gas_task(tx_signed_raws_hex, chain_spec, holder_address, gas=No
queue=queue, queue=queue,
) )
return s_check_gas return s_check_gas
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_parse_tx
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec)
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
tx['value'],
tx['value'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
If account balance is sufficient, but level of gas before spend is below "safe" threshold, gas refill is requested, and execution continues normally.
:param tx_hashes: Transaction hashes due to be submitted
:type tx_hashes: list of str, 0x-hex
:param chain_spec_dict: Chain spec dict representation
:type chain_spec_dict: dict
:param txs: Signed raw transaction data, corresponding to tx_hashes
:type txs: list of str, 0x-hex
:param address: Sender address
:type address: str, 0x-hex
:param gas_required: Gas limit * gas price for transaction, (optional, if not set will be retrived from transaction data)
:type gas_required: int
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
if len(txs) == 0:
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
if address == None:
address = o['address']
#if not web3.Web3.isChecksumAddress(address):
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
conn = RPCConnection.connect(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
gas_balance = 0
try:
o = balance(address)
r = conn.do(o)
conn.disconnect()
gas_balance = abi_decode_single(ABIContractType.UINT256, r)
except EthException as e:
conn.disconnect()
raise EthError('gas_balance call for {}: {}'.format(address, e))
logg.debug('address {} has gas {} needs {}'.format(address, gas_balance, gas_required))
session = SessionBase.create_session()
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
if gas_required > gas_balance:
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
address,
chain_spec_dict,
gas_provider,
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.gas.refill_gas',
[
chain_spec_dict,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.state.set_waitforgas',
[
chain_spec_dict,
tx_hash,
],
queue=queue,
)
wait_tasks.append(s)
celery.group(wait_tasks)()
raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, gas_balance))
safe_gas = self.safe_gas_threshold_amount
if gas_balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.nonce.reserve_nonce',
[
address,
chain_spec_dict,
gas_provider,
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.gas.refill_gas',
[
chain_spec_dict,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(gas_provider, address))
ready_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.state.set_ready',
[
chain_spec_dict,
tx_hash,
],
queue=queue,
)
ready_tasks.append(s)
celery.group(ready_tasks)()
return txs
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3AndSignerTask)
def refill_gas(self, recipient_address, chain_spec_dict):
"""Executes a native token transaction to fund the recipient's gas expenditures.
:param recipient_address: Recipient in need of gas
:type recipient_address: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:raises AlreadyFillingGasError: A gas refill transaction for this address is already executing
:returns: Transaction hash.
:rtype: str, 0x-hex
"""
# essentials
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
# Determine value of gas tokens to send
# if an uncompleted gas refill for the same recipient already exists, we still need to spend the nonce
# however, we will perform a 0-value transaction instead
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
if c > 0:
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
# finally determine the value to send
refill_amount = 0
if not zero_amount:
refill_amount = self.safe_gas_refill_amount
# determine sender
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.flush()
# set up evm RPC connection
rpc = RPCConnection.connect(chain_spec, 'default')
# set up transaction builder
nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc)
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
c = Gas(chain_spec, signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
# build and add transaction
logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address))
(tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED)
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
cache_task = 'cic_eth.eth.gas.cache_gas_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
# add transaction to send queue
s_status = celery.signature(
'cic_eth.queue.state.set_ready',
[
chain_spec.asdict(),
tx_hash_hex,
],
queue=queue,
)
t = s_status.apply_async()
return tx_signed_raw_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price.
:param txold_hash_hex: Transaction to re-create
:type txold_hash_hex: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:param gas: Explicitly use the specified gas amount
:type gas: number
:param default_factor: Default factor by which to increment the gas price by
:type default_factor: float
:raises NotLocalTxError: Transaction does not exist in the local queue
:returns: Transaction hash
:rtype: str, 0x-hex
"""
session = SessionBase.create_session()
otx = Otx.load(txold_hash_hex, session)
if otx == None:
session.close()
raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx)
tx = unpack(tx_signed_raw_bytes, chain_spec)
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info.get('routing_key')
logg.debug('before {}'.format(tx))
rpc = RPCConnection.connect(chain_spec, 'default')
new_gas_price = gas
if new_gas_price == None:
o = price()
r = rpc.do(o)
current_gas_price = int(r, 16)
if tx['gasPrice'] > current_gas_price:
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(curent_gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
new_gas_price = tx['gasPrice'] + 1
else:
new_gas_price = int(tx['gasPrice'] * default_factor)
#if gas_price > new_gas_price:
# tx['gasPrice'] = gas_price
#else:
# tx['gasPrice'] = new_gas_price
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc)
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx))
tx['gasPrice'] = new_gas_price
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx)
queue_create(
chain_spec,
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
session=session,
)
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close()
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
tx['from'],
tx['gasPrice'] * tx['gas'],
[tx_hash_hex],
queue=queue,
)
s.apply_async()
return tx_hash_hex

View File

@ -1,9 +1,23 @@
# standard imports
import logging
# external imports
import celery
from chainlib.eth.address import is_checksum_address
# local imports # local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.base import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.db.models.nonce import ( from cic_eth.db.models.nonce import (
Nonce, Nonce,
NonceReservation, NonceReservation,
) )
celery_app = celery.current_app
logg = logging.getLogger()
class CustodialTaskNonceOracle(): class CustodialTaskNonceOracle():
"""Ensures atomic nonce increments for all transactions across all tasks and threads. """Ensures atomic nonce increments for all transactions across all tasks and threads.
@ -30,3 +44,36 @@ class CustodialTaskNonceOracle():
""" """
r = NonceReservation.release(self.address, self.uuid, session=self.session) r = NonceReservation.release(self.address, self.uuid, session=self.session)
return r[1] return r[1]
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, chain_spec_dict, signer_address=None):
self.log_banner()
session = SessionBase.create_session()
address = None
if signer_address == None:
address = chained_input
logg.debug('non-explicit address for reserve nonce, using arg head {}'.format(chained_input))
else:
if is_checksum_address(signer_address):
address = signer_address
logg.debug('explicit address for reserve nonce {}'.format(signer_address))
else:
address = AccountRole.get_address(signer_address, session=session)
logg.debug('role for reserve nonce {} -> {}'.format(signer_address, address))
if not is_checksum_address(address):
raise ValueError('invalid result when resolving address for nonce {}'.format(address))
root_id = self.request.root_id
r = NonceReservation.next(address, root_id, session=session)
logg.debug('nonce {} reserved for address {} task {}'.format(r[1], address, r[0]))
session.commit()
session.close()
return chained_input

View File

@ -1,38 +1,18 @@
# standard imports # standard imports
import logging import logging
# third-party imports # external imports
import celery import celery
import requests
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address from chainlib.eth.address import is_checksum_address
from chainlib.eth.gas import ( from chainlib.eth.error import NotFoundEthException
balance,
price,
)
from chainlib.eth.error import (
EthException,
NotFoundEthException,
)
from chainlib.eth.tx import ( from chainlib.eth.tx import (
transaction, transaction,
receipt, receipt,
raw, raw,
TxFormat,
TxFactory,
unpack,
) )
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.hash import keccak256_hex_to_hex from chainlib.hash import keccak256_hex_to_hex
from chainlib.eth.gas import (
Gas,
OverrideGasOracle,
)
from chainlib.eth.contract import (
abi_decode_single,
ABIContractType,
)
from hexathon import ( from hexathon import (
add_0x, add_0x,
strip_0x, strip_0x,
@ -40,37 +20,15 @@ from hexathon import (
from chainqueue.db.models.tx import Otx from chainqueue.db.models.tx import Otx
from chainqueue.db.models.tx import TxCache from chainqueue.db.models.tx import TxCache
from chainqueue.db.enum import StatusBits from chainqueue.db.enum import StatusBits
from chainqueue.error import NotLocalTxError
# local imports # local imports
from cic_eth.db import SessionBase from cic_eth.db import SessionBase
from cic_eth.db.models.nonce import NonceReservation
from cic_eth.db.models.lock import Lock
from cic_eth.db.models.role import AccountRole
from cic_eth.db.enum import (
LockEnum,
)
from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError
from cic_eth.error import NotLocalTxError
#from cic_eth.queue.tx import create as queue_create
from cic_eth.queue.query import (
get_tx,
get_nonce_tx,
)
from cic_eth.queue.tx import (
queue_create,
register_tx,
)
from cic_eth.error import OutOfGasError
from cic_eth.error import LockedError
from cic_eth.eth.gas import (
create_check_gas_task,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.error import ( from cic_eth.error import (
AlreadyFillingGasError, PermanentTxError,
EthError, TemporaryTxError,
) )
from cic_eth.eth.gas import create_check_gas_task
from cic_eth.admin.ctrl import lock_send from cic_eth.admin.ctrl import lock_send
from cic_eth.task import ( from cic_eth.task import (
CriticalSQLAlchemyTask, CriticalSQLAlchemyTask,
@ -86,131 +44,6 @@ logg = logging.getLogger()
MAX_NONCE_ATTEMPTS = 3 MAX_NONCE_ATTEMPTS = 3
# TODO this function is too long
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
If account balance is sufficient, but level of gas before spend is below "safe" threshold, gas refill is requested, and execution continues normally.
:param tx_hashes: Transaction hashes due to be submitted
:type tx_hashes: list of str, 0x-hex
:param chain_spec_dict: Chain spec dict representation
:type chain_spec_dict: dict
:param txs: Signed raw transaction data, corresponding to tx_hashes
:type txs: list of str, 0x-hex
:param address: Sender address
:type address: str, 0x-hex
:param gas_required: Gas limit * gas price for transaction, (optional, if not set will be retrived from transaction data)
:type gas_required: int
:return: Signed raw transaction data list
:rtype: param txs, unchanged
"""
if len(txs) == 0:
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
if address == None:
address = o['address']
#if not web3.Web3.isChecksumAddress(address):
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
conn = RPCConnection.connect(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
gas_balance = 0
try:
o = balance(address)
r = conn.do(o)
conn.disconnect()
gas_balance = abi_decode_single(ABIContractType.UINT256, r)
except EthException as e:
conn.disconnect()
raise EthError('gas_balance call for {}: {}'.format(address, e))
logg.debug('address {} has gas {} needs {}'.format(address, gas_balance, gas_required))
session = SessionBase.create_session()
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.close()
if gas_required > gas_balance:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
chain_spec_dict,
address,
gas_provider,
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_spec_dict,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.state.set_waitforgas',
[
chain_spec_dict,
tx_hash,
],
queue=queue,
)
wait_tasks.append(s)
celery.group(wait_tasks)()
raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, gas_balance))
safe_gas = self.safe_gas_threshold_amount
if gas_balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
chain_spec_dict,
address,
gas_provider,
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_spec_dict,
],
queue=queue,
)
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(gas_provider, address))
ready_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
'cic_eth.queue.state.set_ready',
[
chain_spec_dict,
tx_hash,
],
queue=queue,
)
ready_tasks.append(s)
celery.group(ready_tasks)()
return txs
# TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead. # TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead.
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def hashes_to_txs(self, tx_hashes): def hashes_to_txs(self, tx_hashes):
@ -272,7 +105,7 @@ def send(self, txs, chain_spec_dict):
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_hex = txs[0] tx_hex = add_0x(txs[0])
tx_hash_hex = add_0x(keccak256_hex_to_hex(tx_hex)) tx_hash_hex = add_0x(keccak256_hex_to_hex(tx_hex))
@ -282,8 +115,9 @@ def send(self, txs, chain_spec_dict):
r = None r = None
s_set_sent = celery.signature( s_set_sent = celery.signature(
'cic_eth.queue.state.set_sent_status', 'cic_eth.queue.state.set_sent',
[ [
chain_spec_dict,
tx_hash_hex, tx_hash_hex,
False False
], ],
@ -300,202 +134,17 @@ def send(self, txs, chain_spec_dict):
if len(tx_tail) > 0: if len(tx_tail) > 0:
s = celery.signature( s = celery.signature(
'cic_eth.eth.tx.send', 'cic_eth.eth.tx.send',
[tx_tail],
queue=queue,
)
s.apply_async()
return tx_hash_hex
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3AndSignerTask)
def refill_gas(self, recipient_address, chain_spec_dict):
"""Executes a native token transaction to fund the recipient's gas expenditures.
:param recipient_address: Recipient in need of gas
:type recipient_address: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:raises AlreadyFillingGasError: A gas refill transaction for this address is already executing
:returns: Transaction hash.
:rtype: str, 0x-hex
"""
# essentials
chain_spec = ChainSpec.from_dict(chain_spec_dict)
queue = self.request.delivery_info.get('routing_key')
# Determine value of gas tokens to send
# if an uncompleted gas refill for the same recipient already exists, we still need to spend the nonce
# however, we will perform a 0-value transaction instead
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
if c > 0:
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
# finally determine the value to send
refill_amount = 0
if not zero_amount:
refill_amount = self.safe_gas_refill_amount
# determine sender
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
session.flush()
# set up evm RPC connection
rpc = RPCConnection.connect(chain_spec, 'default')
# set up transaction builder
nonce_oracle = CustodialTaskNonceOracle(gas_provider, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc)
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
c = Gas(signer=rpc_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
# build and add transaction
logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address))
(tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED)
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
cache_task = 'cic_eth.eth.tx.cache_gas_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
# add transaction to send queue
s_status = celery.signature(
'cic_eth.queue.state.set_ready',
[ [
chain_spec.asdict(), tx_tail,
tx_hash_hex, chain_spec_dict,
], ],
queue=queue, queue=queue,
) )
t = s_status.apply_async()
return tx_signed_raw_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price.
:param txold_hash_hex: Transaction to re-create
:type txold_hash_hex: str, 0x-hex
:param chain_str: Chain spec, string representation
:type chain_str: str
:param gas: Explicitly use the specified gas amount
:type gas: number
:param default_factor: Default factor by which to increment the gas price by
:type default_factor: float
:raises NotLocalTxError: Transaction does not exist in the local queue
:returns: Transaction hash
:rtype: str, 0x-hex
"""
session = SessionBase.create_session()
otx = Otx.load(txold_hash_hex, session)
if otx == None:
session.close()
raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx)
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info.get('routing_key')
logg.debug('before {}'.format(tx))
rpc = RPCConnection.connect(chain_spec, 'default')
new_gas_price = gas
if new_gas_price == None:
o = price()
r = rpc.do(o)
current_gas_price = int(r, 16)
if tx['gasPrice'] > current_gas_price:
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(curent_gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
new_gas_price = tx['gasPrice'] + 1
else:
new_gas_price = int(tx['gasPrice'] * default_factor)
#if gas_price > new_gas_price:
# tx['gasPrice'] = gas_price
#else:
# tx['gasPrice'] = new_gas_price
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc)
c = TxFactory(signer=rpc_signer, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx))
tx['gasPrice'] = new_gas_price
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx)
queue_create(
chain_spec,
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
session=session,
)
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close()
s = create_check_gas_task(
[tx_signed_raw_hex],
chain_spec,
tx['from'],
tx['gasPrice'] * tx['gas'],
[tx_hash_hex],
queue=queue,
)
s.apply_async() s.apply_async()
return tx_hash_hex return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, chain_spec_dict, signer_address=None):
self.log_banner()
session = SessionBase.create_session()
address = None
if signer_address == None:
address = chained_input
logg.debug('non-explicit address for reserve nonce, using arg head {}'.format(chained_input))
else:
if is_checksum_address(signer_address):
address = signer_address
logg.debug('explicit address for reserve nonce {}'.format(signer_address))
else:
address = AccountRole.get_address(signer_address, session=session)
logg.debug('role for reserve nonce {} -> {}'.format(signer_address, address))
if not is_checksum_address(address):
raise ValueError('invalid result when resolving address for nonce {}'.format(address))
root_id = self.request.root_id
r = NonceReservation.next(address, root_id, session=session)
logg.debug('nonce {} reserved for address {} task {}'.format(r[1], address, r[0]))
session.commit()
session.close()
return chained_input
@celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3Task) @celery_app.task(bind=True, throws=(NotFoundEthException,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_spec_dict): def sync_tx(self, tx_hash_hex, chain_spec_dict):
@ -527,7 +176,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success)) logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
s = celery.signature( s = celery.signature(
'cic_eth.queue.state.set_final_status', 'cic_eth.queue.state.set_final',
[ [
tx_hash_hex, tx_hash_hex,
rcpt['blockNumber'], rcpt['blockNumber'],
@ -539,7 +188,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
logg.debug('sync tx {} mempool'.format(tx_hash_hex)) logg.debug('sync tx {} mempool'.format(tx_hash_hex))
s = celery.signature( s = celery.signature(
'cic_eth.queue.state.set_sent_status', 'cic_eth.queue.state.set_sent',
[ [
tx_hash_hex, tx_hash_hex,
], ],
@ -575,7 +224,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
# #
# tx_signed_raw_hex = r[0] # tx_signed_raw_hex = r[0]
# tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:]) # tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
# tx = unpack_signed_raw_tx(tx_signed_bytes, chain_spec.chain_id()) # tx = unpack(tx_signed_bytes, chain_spec)
# #
# queue = self.request.delivery_info['routing_key'] # queue = self.request.delivery_info['routing_key']
# #
@ -591,41 +240,3 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
# return txpending_hash_hex # return txpending_hash_hex
# TODO: Move to cic_eth.eth.gas
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
chain_spec_dict,
):
"""Helper function for otx_cache_parse_tx
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_dict(chain_spec_dict)
tx_signed_raw_bytes = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx['to'],
ZERO_ADDRESS,
ZERO_ADDRESS,
tx['value'],
tx['value'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@ -177,7 +177,7 @@ def tx_collate(tx_batches, chain_spec_dict, offset, limit, newest_first=True):
k = None k = None
try: try:
hx = strip_0x(v) hx = strip_0x(v)
tx = unpack(bytes.fromhex(hx), chain_spec.chain_id()) tx = unpack(bytes.fromhex(hx), chain_spec)
txc = get_tx_cache(chain_spec, tx['hash'], session) txc = get_tx_cache(chain_spec, tx['hash'], session)
txc['timestamp'] = int(txc['date_created'].timestamp()) txc['timestamp'] = int(txc['date_created'].timestamp())
txc['hash'] = txc['tx_hash'] txc['hash'] = txc['tx_hash']

View File

@ -91,7 +91,6 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
:returns: Transactions :returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value :rtype: dict, with transaction hash as key, signed raw transaction as value
""" """
chain_id = chain_spec.chain_id()
session = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
q_outer = session.query( q_outer = session.query(
TxCache.sender, TxCache.sender,
@ -137,7 +136,7 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
continue continue
tx_signed_bytes = bytes.fromhex(o.signed_tx) tx_signed_bytes = bytes.fromhex(o.signed_tx)
tx = unpack(tx_signed_bytes, chain_id) tx = unpack(tx_signed_bytes, chain_spec)
txs[o.tx_hash] = o.signed_tx txs[o.tx_hash] = o.signed_tx
q = session.query(TxCache) q = session.query(TxCache)

View File

@ -11,19 +11,19 @@ celery_app = celery.current_app
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def set_sent_status(chain_spec_dict, tx_hash, fail=False): def set_sent(chain_spec_dict, tx_hash, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session() session = SessionBase.create_session()
r = chainqueue.state.set_sent_status(chain_spec, tx_hash, fail, session=session) r = chainqueue.state.set_sent(chain_spec, tx_hash, fail, session=session)
session.close() session.close()
return r return r
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def set_final_status(chain_spec_dict, tx_hash, block=None, fail=False): def set_final(chain_spec_dict, tx_hash, block=None, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session() session = SessionBase.create_session()
r = chainqueue.state.set_final_status(chain_spec, tx_hash, block, fail, session=session) r = chainqueue.state.set_final(chain_spec, tx_hash, block, fail, session=session)
session.close() session.close()
return r return r

View File

@ -69,7 +69,7 @@ def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=No
""" """
logg.debug('adding queue tx {}:{} -> {}'.format(chain_spec, tx_hash_hex, tx_signed_raw_hex)) logg.debug('adding queue tx {}:{} -> {}'.format(chain_spec, tx_hash_hex, tx_signed_raw_hex))
tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex)) tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex))
tx = unpack(tx_signed_raw, chain_id=chain_spec.chain_id()) tx = unpack(tx_signed_raw, chain_spec)
tx_hash = queue_create( tx_hash = queue_create(
chain_spec, chain_spec,

View File

@ -13,7 +13,7 @@ def connect_token_registry(rpc, chain_spec):
registry = CICRegistry(chain_spec, rpc) registry = CICRegistry(chain_spec, rpc)
token_registry_address = registry.by_name('TokenRegistry') token_registry_address = registry.by_name('TokenRegistry')
logg.debug('using token registry address {}'.format(token_registry_address)) logg.debug('using token registry address {}'.format(token_registry_address))
lookup = TokenIndexLookup(token_registry_address) lookup = TokenIndexLookup(chain_spec, token_registry_address)
CICRegistry.add_lookup(lookup) CICRegistry.add_lookup(lookup)
@ -21,7 +21,7 @@ def connect_declarator(rpc, chain_spec, trusted_addresses):
registry = CICRegistry(chain_spec, rpc) registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator') declarator_address = registry.by_name('AddressDeclarator')
logg.debug('using declarator address {}'.format(declarator_address)) logg.debug('using declarator address {}'.format(declarator_address))
lookup = AddressDeclaratorLookup(declarator_address, trusted_addresses) lookup = AddressDeclaratorLookup(chain_spec, declarator_address, trusted_addresses)
CICRegistry.add_lookup(lookup) CICRegistry.add_lookup(lookup)

View File

@ -22,6 +22,7 @@ from chainqueue.db.enum import (
StatusBits, StatusBits,
) )
from chainqueue.error import NotLocalTxError from chainqueue.error import NotLocalTxError
from chainqueue.state import set_reserved
# local imports # local imports
import cic_eth import cic_eth
@ -29,7 +30,6 @@ from cic_eth.db import SessionBase
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.queue.query import get_upcoming_tx from cic_eth.queue.query import get_upcoming_tx
from cic_eth.queue.state import set_reserved
from cic_eth.admin.ctrl import lock_send from cic_eth.admin.ctrl import lock_send
from cic_eth.eth.tx import send as task_tx_send from cic_eth.eth.tx import send as task_tx_send
from cic_eth.error import ( from cic_eth.error import (
@ -91,7 +91,6 @@ class DispatchSyncer:
def __init__(self, chain_spec): def __init__(self, chain_spec):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.chain_id = chain_spec.chain_id()
def chain(self): def chain(self):
@ -102,13 +101,14 @@ class DispatchSyncer:
c = len(txs.keys()) c = len(txs.keys())
logg.debug('processing {} txs {}'.format(c, list(txs.keys()))) logg.debug('processing {} txs {}'.format(c, list(txs.keys())))
chain_str = str(self.chain_spec) chain_str = str(self.chain_spec)
session = SessionBase.create_session()
for k in txs.keys(): for k in txs.keys():
tx_raw = txs[k] tx_raw = txs[k]
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw)) tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec.chain_id()) tx = unpack(tx_raw_bytes, self.chain_spec)
try: try:
set_dequeue(tx['hash']) set_reserved(self.chain_spec, tx['hash'], session=session)
except NotLocalTxError as e: except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash'])) logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
continue continue

View File

@ -2,7 +2,11 @@
import logging import logging
# external imports # external imports
from hexathon import add_0x from hexathon import (
add_0x,
strip_0x,
)
from chainlib.eth.tx import unpack
from chainqueue.db.enum import StatusBits from chainqueue.db.enum import StatusBits
from chainqueue.db.models.tx import TxCache from chainqueue.db.models.tx import TxCache
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
@ -24,13 +28,13 @@ class GasFilter(SyncFilter):
def filter(self, conn, block, tx, session): def filter(self, conn, block, tx, session):
tx_hash_hex = add_0x(tx.hash)
if tx.value > 0: if tx.value > 0:
tx_hash_hex = add_0x(tx.hash)
logg.debug('gas refill tx {}'.format(tx_hash_hex)) logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
q = session.query(TxCache.recipient) q = session.query(TxCache.recipient)
q = q.join(Otx) q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_hex) q = q.filter(Otx.tx_hash==strip_0x(tx_hash_hex))
r = q.first() r = q.first()
if r == None: if r == None:
@ -38,7 +42,7 @@ class GasFilter(SyncFilter):
SessionBase.release_session(session) SessionBase.release_session(session)
return return
txs = get_paused_tx(self.chain_spec, StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session) txs = get_paused_tx(self.chain_spec, status=StatusBits.GAS_ISSUES, sender=r[0], session=session, decoder=unpack)
SessionBase.release_session(session) SessionBase.release_session(session)

View File

@ -35,9 +35,10 @@ class RegistrationFilter(SyncFilter):
address = to_checksum_address(add_0x(address_hex)) address = to_checksum_address(add_0x(address_hex))
logg.info('request token gift to {}'.format(address)) logg.info('request token gift to {}'.format(address))
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
address, address,
self.chain_spec.asdict(),
], ],
queue=self.queue, queue=self.queue,
) )

View File

@ -61,7 +61,7 @@ class TransferAuthFilter(SyncFilter):
} }
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
[token_data], [token_data],
sender, sender,
@ -69,7 +69,7 @@ class TransferAuthFilter(SyncFilter):
queue=self.queue, queue=self.queue,
) )
s_approve = celery.signature( s_approve = celery.signature(
'cic_eth.eth.token.approve', 'cic_eth.eth.erc20.approve',
[ [
sender, sender,
recipient, recipient,

View File

@ -34,8 +34,9 @@ class TxFilter(SyncFilter):
db_session.flush() db_session.flush()
SessionBase.release_session(db_session) SessionBase.release_session(db_session)
s = celery.signature( s = celery.signature(
'cic_eth.queue.state.set_final_status', 'cic_eth.queue.state.set_final',
[ [
self.chain_spec.asdict(),
add_0x(tx_hash_hex), add_0x(tx_hash_hex),
tx.block.number, tx.block.number,
tx.status == Status.ERROR, tx.status == Status.ERROR,

View File

@ -92,7 +92,7 @@ straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
# TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here # TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here
def sendfail_filter(w3, tx_hash, rcpt, chain_spec): def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
tx_dict = get_tx(tx_hash) tx_dict = get_tx(tx_hash)
tx = unpack_signed_raw_tx_hex(tx_dict['signed_tx'], chain_spec.chain_id()) tx = unpack(tx_dict['signed_tx'], chain_spec)
logg.debug('submitting tx {} for retry'.format(tx_hash)) logg.debug('submitting tx {} for retry'.format(tx_hash))
s_check = celery.signature( s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock', 'cic_eth.admin.ctrl.check_lock',
@ -118,7 +118,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
# queue=queue, # queue=queue,
# ) # )
s_resend = celery.signature( s_resend = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas', 'cic_eth.eth.gas.resend_with_higher_gas',
[ [
chain_str, chain_str,
], ],
@ -143,7 +143,7 @@ def dispatch(conn, chain_spec):
for k in txs.keys(): for k in txs.keys():
#tx_cache = get_tx_cache(k) #tx_cache = get_tx_cache(k)
tx_raw = txs[k] tx_raw = txs[k]
tx = unpack_signed_raw_tx_hex(tx_raw, chain_spec.chain_id()) tx = unpack(tx_raw, chain_spec)
s_check = celery.signature( s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock', 'cic_eth.admin.ctrl.check_lock',
@ -184,7 +184,7 @@ def dispatch(conn, chain_spec):
# txs = list(txs.keys()) # txs = list(txs.keys())
# logg.debug('straggler txs {} chain {}'.format(signed_txs, chain_str)) # logg.debug('straggler txs {} chain {}'.format(signed_txs, chain_str))
# s_send = celery.signature( # s_send = celery.signature(
# 'cic_eth.eth.resend_with_higher_gas', # 'cic_eth.eth.gas.resend_with_higher_gas',
# [ # [
# txs, # txs,
# chain_str, # chain_str,
@ -204,7 +204,7 @@ class StragglerFilter:
def filter(self, conn, block, tx, db_session=None): def filter(self, conn, block, tx, db_session=None):
logg.debug('tx {}'.format(tx)) logg.debug('tx {}'.format(tx))
s_send = celery.signature( s_send = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas', 'cic_eth.eth.gas.resend_with_higher_gas',
[ [
tx, tx,
self.chain_spec.asdict(), self.chain_spec.asdict(),

View File

@ -17,18 +17,31 @@ from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
# local imports # local imports
from cic_eth.eth import erc20 from cic_eth.eth import (
from cic_eth.eth import tx erc20,
from cic_eth.eth import account tx,
from cic_eth.admin import debug account,
from cic_eth.admin import ctrl nonce,
from cic_eth.queue import state gas,
from cic_eth.queue import query )
from cic_eth.queue import balance from cic_eth.admin import (
from cic_eth.callbacks import Callback debug,
from cic_eth.callbacks import http ctrl,
from cic_eth.callbacks import tcp )
from cic_eth.callbacks import redis from cic_eth.queue import (
query,
balance,
state,
tx,
lock,
time,
)
from cic_eth.callbacks import (
Callback,
http,
#tcp,
redis,
)
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.ext import tx from cic_eth.ext import tx
@ -163,5 +176,10 @@ def main():
current_app.worker_main(argv) current_app.worker_main(argv)
@celery.signals.eventlet_pool_postshutdown.connect
def shutdown(sender=None, headers=None, body=None, **kwargs):
logg.warning('in shudown event hook')
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -8,9 +8,9 @@ import semver
version = ( version = (
0, 0,
10, 11,
1, 0,
'beta.3', 'alpha.2',
) )
version_object = semver.VersionInfo( version_object = semver.VersionInfo(

View File

@ -1,25 +1,25 @@
cic-base~=0.1.2a55 cic-base~=0.1.2a56
celery==4.4.7 celery==4.4.7
crypto-dev-signer~=0.4.14a17 crypto-dev-signer~=0.4.14a17
confini~=0.3.6rc3 confini~=0.3.6rc3
cic-eth-registry~=0.5.4a9 cic-eth-registry~=0.5.4a10
#cic-bancor~=0.0.6 #cic-bancor~=0.0.6
redis==3.5.3 redis==3.5.3
alembic==1.4.2 alembic==1.4.2
websockets==8.1 websockets==8.1
requests~=2.24.0 requests~=2.24.0
eth_accounts_index~=0.0.11a6 eth_accounts_index~=0.0.11a7
erc20-transfer-authorization~=0.3.1a2 erc20-transfer-authorization~=0.3.1a3
#simple-rlp==0.1.2 #simple-rlp==0.1.2
uWSGI==2.0.19.1 uWSGI==2.0.19.1
semver==2.13.0 semver==2.13.0
websocket-client==0.57.0 websocket-client==0.57.0
moolb~=0.1.1b2 moolb~=0.1.1b2
eth-address-index~=0.1.1a5 eth-address-index~=0.1.1a6
chainlib~=0.0.1a46 chainlib~=0.0.2a1
hexathon~=0.0.1a7 hexathon~=0.0.1a7
chainsyncer~=0.0.1a20 chainsyncer~=0.0.1a21
chainqueue~=0.0.1a1 chainqueue~=0.0.1a3
pysha3==1.0.2 pysha3==1.0.2
coincurve==15.0.0 coincurve==15.0.0
sarafu-faucet~=0.0.2a15 sarafu-faucet~=0.0.2a16

View File

@ -30,9 +30,7 @@ from cic_eth.api import AdminApi
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
from cic_eth.error import InitializationError from cic_eth.error import InitializationError
from cic_eth.eth.tx import ( from cic_eth.eth.gas import cache_gas_data
cache_gas_data,
)
from cic_eth.queue.tx import queue_create from cic_eth.queue.tx import queue_create
logg = logging.getLogger() logg = logging.getLogger()
@ -53,7 +51,7 @@ logg = logging.getLogger()
# gas_provider = c.gas_provider() # gas_provider = c.gas_provider()
# #
# s_nonce = celery.signature( # s_nonce = celery.signature(
# 'cic_eth.eth.tx.reserve_nonce', # 'cic_eth.eth.nonce.reserve_nonce',
# [ # [
# init_w3.eth.accounts[0], # init_w3.eth.accounts[0],
# gas_provider, # gas_provider,
@ -61,7 +59,7 @@ logg = logging.getLogger()
# queue=None, # queue=None,
# ) # )
# s_refill = celery.signature( # s_refill = celery.signature(
# 'cic_eth.eth.tx.refill_gas', # 'cic_eth.eth.gas.refill_gas',
# [ # [
# chain_str, # chain_str,
# ], # ],
@ -80,7 +78,7 @@ logg = logging.getLogger()
# o = q.first() # o = q.first()
# tx_raw = o.signed_tx # tx_raw = o.signed_tx
# #
# tx_dict = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), default_chain_spec.chain_id()) # tx_dict = unpack(bytes.fromhex(tx_raw), default_chain_spec)
# gas_price_before = tx_dict['gasPrice'] # gas_price_before = tx_dict['gasPrice']
# #
# s = celery.signature( # s = celery.signature(
@ -106,7 +104,7 @@ logg = logging.getLogger()
# #
# tx_raw_new = get_tx(tx_hash_new_hex) # tx_raw_new = get_tx(tx_hash_new_hex)
# logg.debug('get {}'.format(tx_raw_new)) # logg.debug('get {}'.format(tx_raw_new))
# tx_dict_new = unpack_signed_raw_tx(bytes.fromhex(tx_raw_new['signed_tx'][2:]), default_chain_spec.chain_id()) # tx_dict_new = unpack(bytes.fromhex(tx_raw_new['signed_tx']), default_chain_spec)
# assert tx_hash_new_hex != tx_dict['hash'] # assert tx_hash_new_hex != tx_dict['hash']
# assert tx_dict_new['gasPrice'] > gas_price_before # assert tx_dict_new['gasPrice'] > gas_price_before
# #
@ -130,7 +128,7 @@ logg = logging.getLogger()
# sigs = [] # sigs = []
# for i in range(5): # for i in range(5):
# s = celery.signature( # s = celery.signature(
# 'cic_eth.eth.tx.refill_gas', # 'cic_eth.eth.gas.refill_gas',
# [ # [
# eth_empty_accounts[i], # eth_empty_accounts[i],
# chain_str, # chain_str,
@ -278,11 +276,10 @@ def test_tx(
celery_worker, celery_worker,
): ):
chain_id = default_chain_spec.chain_id()
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED)
tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec)
queue_create(default_chain_spec, tx['nonce'], agent_roles['ALICE'], tx_hash_hex, tx_signed_raw_hex) queue_create(default_chain_spec, tx['nonce'], agent_roles['ALICE'], tx_hash_hex, tx_signed_raw_hex)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())

View File

@ -38,8 +38,6 @@ def test_list_tx(
celery_worker, celery_worker,
): ):
chain_id = default_chain_spec.chain_id()
tx_hashes = [] tx_hashes = []
# external tx # external tx
@ -59,7 +57,7 @@ def test_list_tx(
init_database.commit() init_database.commit()
init_eth_tester.mine_blocks(13) init_eth_tester.mine_blocks(13)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1024) (tx_hash_hex, o) = c.transfer(foo_token, custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1024)
eth_rpc.do(o) eth_rpc.do(o)
o = receipt(tx_hash_hex) o = receipt(tx_hash_hex)
@ -79,7 +77,7 @@ def test_list_tx(
init_eth_tester.mine_blocks(13) init_eth_tester.mine_blocks(13)
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 256) (tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 256)
eth_rpc.do(o) eth_rpc.do(o)
o = receipt(tx_hash_hex) o = receipt(tx_hash_hex)

View File

@ -73,7 +73,7 @@ def test_register_account(
): ):
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
eth_empty_accounts[0], eth_empty_accounts[0],
default_chain_spec.asdict(), default_chain_spec.asdict(),
@ -160,7 +160,7 @@ def test_gift(
): ):
nonce_oracle = RPCNonceOracle(contract_roles['ACCOUNT_REGISTRY_WRITER'], eth_rpc) nonce_oracle = RPCNonceOracle(contract_roles['ACCOUNT_REGISTRY_WRITER'], eth_rpc)
c = AccountRegistry(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=default_chain_spec.chain_id()) c = AccountRegistry(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add(account_registry, contract_roles['ACCOUNT_REGISTRY_WRITER'], agent_roles['ALICE']) (tx_hash_hex, o) = c.add(account_registry, contract_roles['ACCOUNT_REGISTRY_WRITER'], agent_roles['ALICE'])
eth_rpc.do(o) eth_rpc.do(o)
o = receipt(tx_hash_hex) o = receipt(tx_hash_hex)
@ -168,7 +168,7 @@ def test_gift(
assert r['status'] == 1 assert r['status'] == 1
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
agent_roles['ALICE'], agent_roles['ALICE'],
default_chain_spec.asdict(), default_chain_spec.asdict(),

View File

@ -28,7 +28,7 @@ def test_otx_cache_transfer(
celery_session_worker, celery_session_worker,
): ):
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc) nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=default_chain_spec.chain_id()) c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
transfer_value = 100 * (10**6) transfer_value = 100 * (10**6)
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value, tx_format=TxFormat.RLP_SIGNED)
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
@ -59,7 +59,7 @@ def test_erc20_balance_task(
): ):
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc) nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], eth_rpc)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=default_chain_spec.chain_id()) c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
transfer_value = 100 * (10**6) transfer_value = 100 * (10**6)
(tx_hash_hex, o) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value) (tx_hash_hex, o) = c.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], transfer_value)
eth_rpc.do(o) eth_rpc.do(o)
@ -102,7 +102,7 @@ def test_erc20_transfer_task(
transfer_value = 100 * (10 ** 6) transfer_value = 100 * (10 ** 6)
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
[token_object], [token_object],
default_chain_spec.asdict(), default_chain_spec.asdict(),
@ -144,7 +144,7 @@ def test_erc20_approve_task(
transfer_value = 100 * (10 ** 6) transfer_value = 100 * (10 ** 6)
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce', 'cic_eth.eth.nonce.reserve_nonce',
[ [
[token_object], [token_object],
default_chain_spec.asdict(), default_chain_spec.asdict(),

View File

@ -17,7 +17,7 @@ from chainqueue.db.models.otx import Otx
# local imports # local imports
from cic_eth.queue.tx import register_tx from cic_eth.queue.tx import register_tx
from cic_eth.eth.tx import cache_gas_data from cic_eth.eth.gas import cache_gas_data
logg = logging.getLogger() logg = logging.getLogger()
@ -32,11 +32,9 @@ def test_tx_send(
celery_session_worker, celery_session_worker,
): ):
chain_id = default_chain_spec.chain_id()
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED)
#unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id)
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
@ -79,17 +77,15 @@ def test_resend_with_higher_gas(
celery_session_worker, celery_session_worker,
): ):
chain_id = default_chain_spec.chain_id()
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED)
#unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id)
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_before = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec.chain_id()) tx_before = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec)
s = celery.signature( s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas', 'cic_eth.eth.gas.resend_with_higher_gas',
[ [
tx_hash_hex, tx_hash_hex,
default_chain_spec.asdict(), default_chain_spec.asdict(),
@ -105,7 +101,7 @@ def test_resend_with_higher_gas(
if otx == None: if otx == None:
raise NotLocalTxError(r) raise NotLocalTxError(r)
tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec.chain_id()) tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec)
logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice'])) logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice']))
assert tx_after['gasPrice'] > tx_before['gasPrice'] assert tx_after['gasPrice'] > tx_before['gasPrice']

View File

@ -33,14 +33,13 @@ def test_set(
agent_roles, agent_roles,
): ):
chain_id = default_chain_spec.chain_id()
rpc = RPCConnection.connect(default_chain_spec, 'default') rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
gas_oracle = RPCGasOracle(eth_rpc) gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_id) c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec)
otx = Otx( otx = Otx(
tx['nonce'], tx['nonce'],
@ -87,18 +86,17 @@ def test_clone(
agent_roles, agent_roles,
): ):
chain_id = default_chain_spec.chain_id()
rpc = RPCConnection.connect(default_chain_spec, 'default') rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
gas_oracle = StaticGasOracle(2 * (10 ** 9), 21000) gas_oracle = StaticGasOracle(2 * (10 ** 9), 21000)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_id) c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
txs_rpc = [ txs_rpc = [
c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED), c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED),
] ]
gas_oracle = StaticGasOracle(4 * (10 ** 9), 21000) gas_oracle = StaticGasOracle(4 * (10 ** 9), 21000)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=chain_id) c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
txs_rpc += [ txs_rpc += [
c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED), c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED),
] ]
@ -107,7 +105,7 @@ def test_clone(
for tx_rpc in txs_rpc: for tx_rpc in txs_rpc:
tx_hash_hex = tx_rpc[0] tx_hash_hex = tx_rpc[0]
tx_signed_raw_hex = tx_rpc[1] tx_signed_raw_hex = tx_rpc[1]
tx_dict = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) tx_dict = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec)
otx = Otx( otx = Otx(
tx_dict['nonce'], tx_dict['nonce'],
tx_hash_hex, tx_hash_hex,

View File

@ -18,13 +18,12 @@ def test_unpack(
agent_roles, agent_roles,
): ):
chain_id = default_chain_spec.chain_id()
rpc = RPCConnection.connect(default_chain_spec, 'default') rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
gas_oracle = RPCGasOracle(eth_rpc) gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id=chain_id) tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec)
assert tx_hash_hex == tx['hash'] assert tx_hash_hex == tx['hash']

View File

@ -23,7 +23,7 @@ def test_translate(
nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc) nonce_oracle = RPCNonceOracle(contract_roles['CONTRACT_DEPLOYER'], eth_rpc)
c = AddressDeclarator(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=default_chain_spec.chain_id()) c = AddressDeclarator(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
description = 'alice'.encode('utf-8').ljust(32, b'\x00').hex() description = 'alice'.encode('utf-8').ljust(32, b'\x00').hex()
(tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['CONTRACT_DEPLOYER'], agent_roles['ALICE'], add_0x(description)) (tx_hash_hex, o) = c.add_declaration(address_declarator, contract_roles['CONTRACT_DEPLOYER'], agent_roles['ALICE'], add_0x(description))

View File

@ -15,7 +15,7 @@ from cic_eth.db.enum import LockEnum
from cic_eth.db.models.lock import Lock from cic_eth.db.models.lock import Lock
from cic_eth.queue.query import get_upcoming_tx from cic_eth.queue.query import get_upcoming_tx
from cic_eth.queue.tx import register_tx from cic_eth.queue.tx import register_tx
from cic_eth.eth.tx import cache_gas_data from cic_eth.eth.gas import cache_gas_data
# test imports # test imports
from tests.util.nonce import StaticNonceOracle from tests.util.nonce import StaticNonceOracle
@ -32,7 +32,7 @@ def test_upcoming_with_lock(
rpc = RPCConnection.connect(default_chain_spec, 'default') rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = StaticNonceOracle(42) nonce_oracle = StaticNonceOracle(42)
gas_oracle = RPCGasOracle(eth_rpc) gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)) (tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6))
tx_signed_raw_hex = tx_rpc['params'][0] tx_signed_raw_hex = tx_rpc['params'][0]

View File

@ -57,9 +57,9 @@ WORKDIR /home/grassroots
USER grassroots USER grassroots
ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433 ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433
ARG cic_base_version=0.1.2a55 ARG cic_base_version=0.1.2a56
ARG cic_eth_version=0.10.1b3 ARG cic_eth_version=0.11.1a2
ARG sarafu_faucet_version=0.0.2a15 ARG sarafu_faucet_version=0.0.2a16
ARG cic_contracts_version=0.0.2a2 ARG cic_contracts_version=0.0.2a2
RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version \ RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version \
cic-eth==$cic_eth_version \ cic-eth==$cic_eth_version \

View File

@ -1,5 +1,5 @@
cic-base[full_graph]==0.1.2a55 cic-base[full_graph]==0.1.2a56
sarafu-faucet==0.0.2a15 sarafu-faucet==0.0.2a16
cic-eth==0.10.1b2+build.9c750e70 cic-eth==0.11.0a2
cic-types==0.1.0a8 cic-types==0.1.0a8
crypto-dev-signer==0.4.14a17 crypto-dev-signer==0.4.14a17