WIP Bind nonce to root task uuid

This commit is contained in:
nolash 2021-03-05 14:05:40 +01:00
parent 195b1e3c9b
commit 77f6eb193c
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
29 changed files with 874 additions and 275 deletions

View File

@ -92,6 +92,11 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature( s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol', 'cic_eth.eth.token.resolve_tokens_by_symbol',
[ [
@ -110,7 +115,8 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_check.link(s_tokens) s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None: if self.callback_param != None:
s_convert.link(self.callback_success) s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error) s_tokens.link(s_convert).on_error(self.callback_error)
@ -147,6 +153,11 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature( s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol', 'cic_eth.eth.token.resolve_tokens_by_symbol',
[ [
@ -165,7 +176,8 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_check.link(s_tokens) s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None: if self.callback_param != None:
s_convert.link(self.callback_success) s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error) s_tokens.link(s_convert).on_error(self.callback_error)
@ -200,6 +212,13 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
from_address,
],
queue=self.queue,
)
s_tokens = celery.signature( s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol', 'cic_eth.eth.token.resolve_tokens_by_symbol',
[ [
@ -217,7 +236,8 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_check.link(s_tokens) s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None: if self.callback_param != None:
s_transfer.link(self.callback_success) s_transfer.link(self.callback_success)
s_tokens.link(s_transfer).on_error(self.callback_error) s_tokens.link(s_transfer).on_error(self.callback_error)
@ -228,82 +248,6 @@ class Api:
return t return t
def transfer_request(self, from_address, to_address, spender_address, value, token_symbol):
"""Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:type to_address: str, 0x-hex
:param spender_address: Ethereum address that is executing transfer (typically an escrow contract)
:type spender_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_tokens_transfer_approval = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_tokens_approve = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
from_address,
spender_address,
value,
self.chain_str,
],
queue=self.queue,
)
s_transfer_approval = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
from_address,
to_address,
value,
self.chain_str,
],
queue=self.queue,
)
# TODO: make approve and transfer_approval chainable so callback can be part of the full chain
if self.callback_param != None:
s_transfer_approval.link(self.callback_success)
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error)
else:
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval)
g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue)
s_check.link(g)
t = s_check.apply_async()
#t = s_tokens.apply_async(queue=self.queue)
return t
def balance(self, address, token_symbol, include_pending=True): def balance(self, address, token_symbol, include_pending=True):
"""Calls the provided callback with the current token balance of the given address. """Calls the provided callback with the current token balance of the given address.
@ -396,6 +340,11 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_account = celery.signature( s_account = celery.signature(
'cic_eth.eth.account.create', 'cic_eth.eth.account.create',
[ [
@ -403,7 +352,8 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_check.link(s_account) s_nonce.link(s_account)
s_check.link(s_nonce)
if self.callback_param != None: if self.callback_param != None:
s_account.link(self.callback_success) s_account.link(self.callback_success)
@ -438,6 +388,11 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_refill = celery.signature( s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas', 'cic_eth.eth.tx.refill_gas',
[ [
@ -445,7 +400,8 @@ class Api:
], ],
queue=self.queue, queue=self.queue,
) )
s_check.link(s_refill) s_nonce.link(s_refill)
s_check.link(s_nonce)
if self.callback_param != None: if self.callback_param != None:
s_refill.link(self.callback_success) s_refill.link(self.callback_success)

View File

@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@ -54,7 +54,7 @@ class SessionBase(Model):
@staticmethod @staticmethod
def connect(dsn, pool_size=8, debug=False): def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend. """Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection. :param dsn: DSN string defining connection.

View File

@ -1,11 +1,16 @@
# standard imports # standard imports
import logging import logging
import datetime
# third-party imports # third-party imports
from sqlalchemy import Column, String, Integer from sqlalchemy import Column, String, Integer, DateTime
# local imports # local imports
from .base import SessionBase from .base import SessionBase
from cic_eth.error import (
InitializationError,
IntegrityError,
)
logg = logging.getLogger() logg = logging.getLogger()
@ -37,23 +42,43 @@ class Nonce(SessionBase):
@staticmethod @staticmethod
def __get(session, address): def __get(conn, address):
r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address)) r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
nonce = r.fetchone() nonce = r.fetchone()
session.flush()
if nonce == None: if nonce == None:
return None return None
return nonce[0] return nonce[0]
@staticmethod @staticmethod
def __set(session, address, nonce): def __set(conn, address, nonce):
session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address)) conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
session.flush()
@staticmethod @staticmethod
def next(address, initial_if_not_exists=0, session=None): def __init(conn, address, nonce):
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
@staticmethod
def init(address, nonce=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==address)
o = q.first()
if o != None:
session.flush()
raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce))
session.flush()
Nonce.__init(session, address, nonce)
SessionBase.release_session(session)
# TODO: Incrementing nonce MUST be done by separate tasks.
@staticmethod
def next(address, initial_if_not_exists=0):
"""Generate next nonce for the given address. """Generate next nonce for the given address.
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given. If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
@ -65,32 +90,97 @@ class Nonce(SessionBase):
:returns: Nonce :returns: Nonce
:rtype: number :rtype: number
""" """
session = SessionBase.bind_session(session) #session = SessionBase.bind_session(session)
SessionBase.release_session(session) #session.begin_nested()
conn = Nonce.engine.connect()
session.begin_nested()
#conn = Nonce.engine.connect()
if Nonce.transactional: if Nonce.transactional:
#session.execute('BEGIN') conn.execute('BEGIN')
session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE') conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
session.flush() logg.debug('locking nonce table for address {}'.format(address))
nonce = Nonce.__get(session, address) nonce = Nonce.__get(conn, address)
logg.debug('get nonce {} for address {}'.format(nonce, address)) logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None: if nonce == None:
nonce = initial_if_not_exists nonce = initial_if_not_exists
session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
session.flush()
logg.debug('setting default nonce to {} for address {}'.format(nonce, address)) logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__set(session, address, nonce+1) Nonce.__init(conn, address, nonce)
#if Nonce.transactional: Nonce.__set(conn, address, nonce+1)
#session.execute('COMMIT') if Nonce.transactional:
#session.execute('UNLOCK TABLE nonce') conn.execute('COMMIT')
#conn.close() logg.debug('unlocking nonce table for address {}'.format(address))
session.commit() conn.close()
#session.commit() #session.commit()
SessionBase.release_session(session) #SessionBase.release_session(session)
return nonce return nonce
class NonceReservation(SessionBase):
__tablename__ = 'nonce_task_reservation'
nonce = Column(Integer)
key = Column(String)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
@staticmethod
def peek(key, session=None):
session = SessionBase.bind_session(session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
nonce = None
if o != None:
nonce = o.nonce
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def release(key, session=None):
session = SessionBase.bind_session(session)
nonce = NonceReservation.peek(key, session=session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
if o == None:
raise IntegrityError('nonce for key {}'.format(nonce))
SessionBase.release_session(session)
session.delete(o)
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def next(address, key, session=None):
session = SessionBase.bind_session(session)
if NonceReservation.peek(key, session) != None:
raise IntegrityError('nonce for key {}'.format(key))
nonce = Nonce.next(address)
o = NonceReservation()
o.nonce = nonce
o.key = key
session.add(o)
SessionBase.release_session(session)
return nonce

View File

@ -143,7 +143,7 @@ class TxCache(SessionBase):
self.block_number = block_number self.block_number = block_number
self.tx_index = tx_index self.tx_index = tx_index
# not automatically set in sqlite, it seems: # not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.now() self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created self.date_updated = self.date_created
self.date_checked = self.date_created self.date_checked = self.date_created

View File

@ -54,6 +54,13 @@ class RoleMissingError(Exception):
pass pass
class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks
"""
pass
class LockedError(Exception): class LockedError(Exception):
"""Exception raised when attempt is made to execute action that is deactivated by lock """Exception raised when attempt is made to execute action that is deactivated by lock

View File

@ -36,6 +36,7 @@ class AccountTxFactory(TxFactory):
self, self,
address, address,
chain_spec, chain_spec,
uuid,
session=None, session=None,
): ):
"""Register an Ethereum account address with the on-chain account registry """Register an Ethereum account address with the on-chain account registry
@ -59,7 +60,7 @@ class AccountTxFactory(TxFactory):
'gas': gas, 'gas': gas,
'gasPrice': self.gas_price, 'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(), 'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session), 'nonce': self.next_nonce(uuid, session=session),
'value': 0, 'value': 0,
}) })
return tx_add return tx_add
@ -69,6 +70,7 @@ class AccountTxFactory(TxFactory):
self, self,
address, address,
chain_spec, chain_spec,
uuid,
session=None, session=None,
): ):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account """Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
@ -90,7 +92,7 @@ class AccountTxFactory(TxFactory):
'gas': gas, 'gas': gas,
'gasPrice': self.gas_price, 'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(), 'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session), 'nonce': self.next_nonce(uuid, session=session),
'value': 0, 'value': 0,
}) })
return tx_add return tx_add
@ -156,18 +158,8 @@ def create(password, chain_str):
logg.debug('created account {}'.format(a)) logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account # Initialize nonce provider record for account
# TODO: this can safely be set to zero, since we are randomly creating account
n = c.w3.eth.getTransactionCount(a, 'pending')
session = SessionBase.create_session() session = SessionBase.create_session()
q = session.query(Nonce) Nonce.init(a, session=session)
q = q.filter(Nonce.address_hex==a)
o = q.first()
session.flush()
if o == None:
o = Nonce()
o.address_hex = a
o.nonce = n
session.add(o)
session.commit() session.commit()
session.close() session.close()
return a return a
@ -203,7 +195,7 @@ def register(self, account_address, chain_str, writer_address=None):
c = RpcClient(chain_spec, holder_address=writer_address) c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c) txf = AccountTxFactory(writer_address, c)
tx_add = txf.add(account_address, chain_spec, session=session) tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
session.close() session.close()
@ -243,7 +235,7 @@ def gift(self, account_address, chain_str):
txf = AccountTxFactory(account_address, c) txf = AccountTxFactory(account_address, c)
session = SessionBase.create_session() session = SessionBase.create_session()
tx_add = txf.gift(account_address, chain_spec, session=session) tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
session.close() session.close()

View File

@ -32,10 +32,10 @@ class TxFactory:
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price)) logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self, session=None): def next_nonce(self, uuid, session=None):
"""Returns the current cached nonce value, and increments it for next transaction. """Returns the current reserved nonce value, and increments it for next transaction.
:returns: Nonce :returns: Nonce
:rtype: number :rtype: number
""" """
return self.nonce_oracle.next(session=session) return self.nonce_oracle.next_by_task_uuid(uuid, session=session)

View File

@ -1,5 +1,8 @@
# local imports # local imports
from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
class NonceOracle(): class NonceOracle():
"""Ensures atomic nonce increments for all transactions across all tasks and threads. """Ensures atomic nonce increments for all transactions across all tasks and threads.
@ -14,10 +17,15 @@ class NonceOracle():
self.default_nonce = default_nonce self.default_nonce = default_nonce
def next(self, session=None): def next(self):
"""Get next unique nonce. """Get next unique nonce.
:returns: Nonce :returns: Nonce
:rtype: number :rtype: number
""" """
return Nonce.next(self.address, self.default_nonce, session=session) raise AttributeError('this should not be called')
return Nonce.next(self.address, self.default_nonce)
def next_by_task_uuid(self, uuid, session=None):
return NonceReservation.release(uuid, session=session)

View File

@ -46,6 +46,7 @@ class TokenTxFactory(TxFactory):
spender_address, spender_address,
amount, amount,
chain_spec, chain_spec,
uuid,
session=None, session=None,
): ):
"""Create an ERC20 "approve" transaction """Create an ERC20 "approve" transaction
@ -74,7 +75,7 @@ class TokenTxFactory(TxFactory):
'gas': source_token_gas, 'gas': source_token_gas,
'gasPrice': self.gas_price, 'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(), 'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session), 'nonce': self.next_nonce(uuid, session=session),
}) })
return tx_approve return tx_approve
@ -85,6 +86,7 @@ class TokenTxFactory(TxFactory):
receiver_address, receiver_address,
value, value,
chain_spec, chain_spec,
uuid,
session=None, session=None,
): ):
"""Create an ERC20 "transfer" transaction """Create an ERC20 "transfer" transaction
@ -114,7 +116,7 @@ class TokenTxFactory(TxFactory):
'gas': source_token_gas, 'gas': source_token_gas,
'gasPrice': self.gas_price, 'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(), 'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session), 'nonce': self.next_nonce(uuid, session=session),
}) })
return tx_transfer return tx_transfer
@ -248,7 +250,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
txf = TokenTxFactory(holder_address, c) txf = TokenTxFactory(holder_address, c)
session = SessionBase.create_session() session = SessionBase.create_session()
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, session=session) tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session)
session.close() session.close()
@ -304,7 +306,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
txf = TokenTxFactory(holder_address, c) txf = TokenTxFactory(holder_address, c)
session = SessionBase.create_session() session = SessionBase.create_session()
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, session=session) tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session) (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session)
session.close() session.close()

View File

@ -12,6 +12,7 @@ from cic_registry.chain import ChainSpec
from .rpc import RpcClient from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase from cic_eth.db import Otx, SessionBase
from cic_eth.db.models.tx import TxCache from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation
from cic_eth.db.models.lock import Lock from cic_eth.db.models.lock import Lock
from cic_eth.db.enum import ( from cic_eth.db.enum import (
LockEnum, LockEnum,
@ -84,15 +85,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required)) logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
if gas_required > balance: if gas_required > balance:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature( s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas', 'cic_eth.eth.tx.refill_gas',
[ [
address,
chain_str, chain_str,
], ],
queue=queue, queue=queue,
) )
s_refill_gas.apply_async() s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = [] wait_tasks = []
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
s = celery.signature( s = celery.signature(
@ -108,15 +117,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
safe_gas = c.safe_threshold_amount() safe_gas = c.safe_threshold_amount()
if balance < safe_gas: if balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce'
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature( s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas', 'cic_eth.eth.tx.refill_gas',
[ [
address,
chain_str, chain_str,
], ],
queue=queue, queue=queue,
) )
s_refill_gas.apply_async() s_nonce.link(s_refill)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address)) logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
ready_tasks = [] ready_tasks = []
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
@ -291,7 +308,7 @@ class ParityNodeHandler:
tx_hash_hex, tx_hash_hex,
debugstr, debugstr,
], ],
queue=queue, queue=self.queue,
) )
s_set_reject.link(s_debug) s_set_reject.link(s_debug)
s_lock.link(s_set_reject) s_lock.link(s_set_reject)
@ -299,7 +316,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()))) return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_default(self, tx_hash_hex, tx_hex): def handle_default(self, tx_hash_hex, tx_hex, debugstr):
tx_bytes = bytes.fromhex(tx_hex[2:]) tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id()) tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature( s_lock = celery.signature(
@ -317,9 +334,19 @@ class ParityNodeHandler:
[], [],
queue=self.queue, queue=self.queue,
) )
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
tx_hash_hex,
debugstr,
],
queue=self.queue,
)
s_set_fubar.link(s_debug)
s_lock.link(s_set_fubar) s_lock.link(s_set_fubar)
t = s_lock.apply_async() t = s_lock.apply_async()
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()))) return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr))
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic. # TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
@ -407,6 +434,7 @@ def refill_gas(self, recipient_address, chain_str):
""" """
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
zero_amount = False
session = SessionBase.create_session() session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash) q = session.query(Otx.tx_hash)
@ -416,8 +444,11 @@ def refill_gas(self, recipient_address, chain_str):
q = q.filter(TxCache.recipient==recipient_address) q = q.filter(TxCache.recipient==recipient_address)
c = q.count() c = q.count()
if c > 0: if c > 0:
session.close() #session.close()
raise AlreadyFillingGasError(recipient_address) #raise AlreadyFillingGasError(recipient_address)
logg.warning(str(AlreadyFillingGasError(recipient_address)))
zero_amount = True
session.flush()
queue = self.request.delivery_info['routing_key'] queue = self.request.delivery_info['routing_key']
@ -426,9 +457,12 @@ def refill_gas(self, recipient_address, chain_str):
logg.debug('refill gas from provider address {}'.format(c.gas_provider())) logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending') default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
nonce_generator = NonceOracle(c.gas_provider(), default_nonce) nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
nonce = nonce_generator.next(session=session) #nonce = nonce_generator.next(session=session)
nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session)
gas_price = c.gas_price() gas_price = c.gas_price()
gas_limit = c.default_gas_limit gas_limit = c.default_gas_limit
refill_amount = 0
if not zero_amount:
refill_amount = c.refill_amount() refill_amount = c.refill_amount()
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce)) logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
@ -475,6 +509,7 @@ def refill_gas(self, recipient_address, chain_str):
queue=queue, queue=queue,
) )
celery.group(s_tx_cache, s_status)() celery.group(s_tx_cache, s_status)()
return tx_send_gas_signed['raw'] return tx_send_gas_signed['raw']
@ -554,6 +589,21 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
return tx_hash_hex return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, address=None):
session = SessionBase.create_session()
if address == None:
address = chained_input
root_id = self.request.root_id
nonce = NonceReservation.next(address, root_id)
session.close()
return chained_input
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task) @celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_str): def sync_tx(self, tx_hash_hex, chain_str):
"""Force update of network status of a simgle transaction """Force update of network status of a simgle transaction

View File

@ -315,7 +315,9 @@ def set_ready(tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def set_dequeue(tx_hash): def set_dequeue(tx_hash):
session = SessionBase.create_session() session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None: if o == None:
session.close() session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
@ -566,7 +568,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
return txs return txs
def get_status_tx(status, before=None, exact=False, limit=0, session=None): def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None):
"""Retrieve transaction with a specific queue status. """Retrieve transaction with a specific queue status.
:param status: Status to match transactions with :param status: Status to match transactions with
@ -582,11 +584,15 @@ def get_status_tx(status, before=None, exact=False, limit=0, session=None):
session = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
# before = datetime.datetime.utcnow()
if before != None:
q = q.filter(TxCache.date_updated<before) q = q.filter(TxCache.date_updated<before)
if exact: if exact:
q = q.filter(Otx.status==status.value) q = q.filter(Otx.status==status)
else: else:
q = q.filter(Otx.status.op('&')(status.value)==status.value) q = q.filter(Otx.status.op('&')(status)>0)
if not_status != None:
q = q.filter(Otx.status.op('&')(not_status)==0)
i = 0 i = 0
for o in q.all(): for o in q.all():
if limit > 0 and i == limit: if limit > 0 and i == limit:

View File

@ -121,6 +121,7 @@ class DispatchSyncer:
set_dequeue(tx['hash']) set_dequeue(tx['hash'])
except NotLocalTxError as e: except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash'])) logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
continue
s_check = celery.signature( s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock', 'cic_eth.admin.ctrl.check_lock',

View File

@ -138,7 +138,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str):
# TODO: can we merely use the dispatcher instead? # TODO: can we merely use the dispatcher instead?
def dispatch(chain_str): def dispatch(chain_str):
txs = get_status_tx(StatusEnum.RETRY, datetime.datetime.utcnow()) txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow())
if len(txs) == 0: if len(txs) == 0:
logg.debug('no retry state txs found') logg.debug('no retry state txs found')
return return

View File

@ -9,7 +9,10 @@ import celery
# local imports # local imports
from .base import Syncer from .base import Syncer
from cic_eth.eth.rpc import RpcClient from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import get_status_tx from cic_eth.queue.tx import get_status_tx
logg = logging.getLogger() logg = logging.getLogger()
@ -47,7 +50,8 @@ class RetrySyncer(Syncer):
# ) # )
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx( stalled_txs = get_status_tx(
StatusEnum.SENT.value, StatusBits.IN_NETWORK.value,
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
before=before, before=before,
) )
# return list(failed_txs.keys()) + list(stalled_txs.keys()) # return list(failed_txs.keys()) + list(stalled_txs.keys())

View File

@ -27,7 +27,7 @@ def database_engine(
SessionBase.poolable = False SessionBase.poolable = False
dsn = dsn_from_config(load_config) dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True) #SessionBase.connect(dsn, True)
SessionBase.connect(dsn, load_config.get('DATABASE_DEBUG') != None) SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
return dsn return dsn

View File

@ -15,6 +15,7 @@ from eth_keys import KeyAPI
from cic_eth.eth import RpcClient from cic_eth.eth import RpcClient
from cic_eth.eth.rpc import GasOracle from cic_eth.eth.rpc import GasOracle
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
#logg = logging.getLogger(__name__) #logg = logging.getLogger(__name__)
logg = logging.getLogger() logg = logging.getLogger()
@ -128,8 +129,10 @@ def init_eth_account_roles(
w3_account_roles, w3_account_roles,
): ):
role = AccountRole.set('GAS_GIFTER', w3_account_roles.get('eth_account_gas_provider')) address = w3_account_roles.get('eth_account_gas_provider')
role = AccountRole.set('GAS_GIFTER', address)
init_database.add(role) init_database.add(role)
Nonce.init(address, session=init_database)
init_database.commit() init_database.commit()
return w3_account_roles return w3_account_roles
@ -187,6 +190,7 @@ def w3_account_roles(
return roles return roles
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def w3_account_token_owners( def w3_account_token_owners(
tokens_to_deploy, tokens_to_deploy,

View File

@ -64,6 +64,7 @@ def test_register_account(
init_database, init_database,
init_eth_tester, init_eth_tester,
init_w3, init_w3,
init_rpc,
cic_registry, cic_registry,
celery_session_worker, celery_session_worker,
eth_empty_accounts, eth_empty_accounts,
@ -71,18 +72,31 @@ def test_register_account(
logg.debug('chainspec {}'.format(str(default_chain_spec))) logg.debug('chainspec {}'.format(str(default_chain_spec)))
s = celery.signature( nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
'cic_eth.eth.account.register', Nonce.init(init_w3.eth.accounts[0], nonce, session=init_database)
init_database.commit()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[ [
eth_empty_accounts[0], eth_empty_accounts[0],
init_w3.eth.accounts[0],
],
queue=None,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
str(default_chain_spec), str(default_chain_spec),
init_w3.eth.accounts[0], init_w3.eth.accounts[0],
], ],
) )
t = s.apply_async() s_nonce.link(s_register)
t = s_nonce.apply_async()
address = t.get() address = t.get()
r = t.collect() for r in t.collect():
t.successful() pass
assert t.successful()
session = SessionBase.create_session() session = SessionBase.create_session()
o = session.query(Otx).first() o = session.query(Otx).first()

View File

@ -8,6 +8,7 @@ import celery
# local imports # local imports
from cic_eth.eth.rpc import RpcClient from cic_eth.eth.rpc import RpcClient
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
from cic_eth.db.models.nonce import Nonce
from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.eth.util import unpack_signed_raw_tx
#logg = logging.getLogger(__name__) #logg = logging.getLogger(__name__)
@ -31,18 +32,32 @@ def test_balance_complex(
} }
tx_hashes = [] tx_hashes = []
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
Nonce.init(init_w3.eth.accounts[0], nonce, session=init_database)
init_database.commit()
for i in range(3): for i in range(3):
s = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.token.transfer', 'cic_eth.eth.tx.reserve_nonce',
[ [
[token_data], [token_data],
init_w3.eth.accounts[0],
],
queue=None,
)
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
[
init_w3.eth.accounts[0], init_w3.eth.accounts[0],
init_w3.eth.accounts[1], init_w3.eth.accounts[1],
1000*(i+1), 1000*(i+1),
chain_str, chain_str,
], ],
queue=None,
) )
t = s.apply_async() s_nonce.link(s_transfer)
t = s_nonce.apply_async()
t.get() t.get()
r = None r = None
for c in t.collect(): for c in t.collect():

View File

@ -9,6 +9,7 @@ from cic_eth.eth.bancor import BancorTxFactory
logg = logging.getLogger() logg = logging.getLogger()
@pytest.mark.skip()
def test_transfer_after_convert( def test_transfer_after_convert(
init_w3, init_w3,
init_database, init_database,

View File

@ -10,6 +10,9 @@ import celery
from cic_eth.eth.account import unpack_gift from cic_eth.eth.account import unpack_gift
from cic_eth.eth.factory import TxFactory from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
logg = logging.getLogger() logg = logging.getLogger()
@ -32,10 +35,21 @@ def test_faucet(
init_database, init_database,
): ):
s = celery.signature( nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[7])
'cic_eth.eth.account.gift', Nonce.init(init_w3.eth.accounts[7], nonce, session=init_database)
init_database.commit()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[ [
init_w3.eth.accounts[7], init_w3.eth.accounts[7],
init_w3.eth.accounts[0],
],
queue=None,
)
s_gift = celery.signature(
'cic_eth.eth.account.gift',
[
str(default_chain_spec), str(default_chain_spec),
], ],
) )
@ -45,15 +59,21 @@ def test_faucet(
str(default_chain_spec), str(default_chain_spec),
], ],
) )
s.link(s_send) s_gift.link(s_send)
t = s.apply_async() s_nonce.link(s_gift)
signed_tx = t.get() t = s_nonce.apply_async()
t.get()
for r in t.collect(): for r in t.collect():
logg.debug('result {}'.format(r)) logg.debug('result {}'.format(r))
assert t.successful() assert t.successful()
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[0][2:]), default_chain_spec.chain_id()) q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==init_w3.eth.accounts[7])
o = q.first()
signed_tx = o.signed_tx
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[2:]), default_chain_spec.chain_id())
giveto = unpack_gift(tx['data']) giveto = unpack_gift(tx['data'])
assert giveto['to'] == init_w3.eth.accounts[7] assert giveto['to'] == init_w3.eth.accounts[7]

View File

@ -44,23 +44,39 @@ def test_refill_gas(
refill_amount = c.refill_amount() refill_amount = c.refill_amount()
balance = init_rpc.w3.eth.getBalance(receiver_address) balance = init_rpc.w3.eth.getBalance(receiver_address)
s = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.refill_gas', 'cic_eth.eth.tx.reserve_nonce',
[ [
receiver_address, receiver_address,
provider_address,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec), str(default_chain_spec),
], ],
queue=None,
) )
t = s.apply_async() s_nonce.link(s_refill)
t = s_nonce.apply_async()
r = t.get() r = t.get()
t.collect() for c in t.collect():
pass
assert t.successful() assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==receiver_address)
o = q.first()
signed_tx = o.signed_tx
s = celery.signature( s = celery.signature(
'cic_eth.eth.tx.send', 'cic_eth.eth.tx.send',
[ [
[r], [signed_tx],
str(default_chain_spec), str(default_chain_spec),
], ],
) )
@ -99,83 +115,131 @@ def test_refill_deduplication(
c = init_rpc c = init_rpc
refill_amount = c.refill_amount() refill_amount = c.refill_amount()
s = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.refill_gas', 'cic_eth.eth.tx.reserve_nonce',
[ [
receiver_address, receiver_address,
provider_address,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec), str(default_chain_spec),
], ],
queue=None,
) )
t = s.apply_async() s_nonce.link(s_refill)
t = s_nonce.apply_async()
r = t.get() r = t.get()
for e in t.collect(): for e in t.collect():
pass pass
assert t.successful() assert t.successful()
s = celery.signature( s_nonce = celery.signature(
'cic_eth.eth.tx.refill_gas', 'cic_eth.eth.tx.reserve_nonce',
[ [
receiver_address, receiver_address,
str(default_chain_spec), provider_address,
], ],
queue=None,
) )
s_refill = celery.signature(
t = s.apply_async() 'cic_eth.eth.tx.refill_gas',
with pytest.raises(AlreadyFillingGasError):
t.get()
def test_check_gas(
default_chain_spec,
init_eth_tester,
init_w3,
init_rpc,
eth_empty_accounts,
init_database,
cic_registry,
celery_session_worker,
bancor_registry,
bancor_tokens,
):
provider_address = init_w3.eth.accounts[0]
gas_receiver_address = eth_empty_accounts[0]
token_receiver_address = init_w3.eth.accounts[1]
c = init_rpc
txf = TokenTxFactory(gas_receiver_address, c)
tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
gas_price = c.gas_price()
gas_limit = tx_transfer['gas']
s = celery.signature(
'cic_eth.eth.tx.check_gas',
[ [
[tx_hash_hex],
str(default_chain_spec), str(default_chain_spec),
[],
gas_receiver_address,
gas_limit * gas_price,
], ],
) )
t = s.apply_async()
with pytest.raises(OutOfGasError):
r = t.get()
#assert len(r) == 0
time.sleep(1) s_nonce.link(s_refill)
t.collect() t = s_nonce.apply_async()
#with pytest.raises(AlreadyFillingGasError):
t.get()
for e in t.collect():
pass
assert t.successful()
logg.warning('TODO: complete test by checking that second tx had zero value')
session = SessionBase.create_session()
q = session.query(Otx) # TODO: check gas is part of the transfer chain, and we cannot create the transfer nonce by uuid before the task. Test is subsumed by transfer task test, but should be tested in isolation
q = q.filter(Otx.tx_hash==tx_hash_hex) #def test_check_gas(
r = q.first() # default_chain_spec,
session.close() # init_eth_tester,
assert r.status == StatusEnum.WAITFORGAS # init_w3,
# init_rpc,
# eth_empty_accounts,
# init_database,
# cic_registry,
# celery_session_worker,
# bancor_registry,
# bancor_tokens,
# ):
#
# provider_address = init_w3.eth.accounts[0]
# gas_receiver_address = eth_empty_accounts[0]
# token_receiver_address = init_w3.eth.accounts[1]
#
## c = init_rpc
## txf = TokenTxFactory(gas_receiver_address, c)
## tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec, 'foo')
##
## (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
#
# token_data = [
# {
# 'address': bancor_tokens[0],
# },
# ]
#
# s_nonce = celery.signature(
# 'cic_eth.eth.tx.reserve_nonce',
# [
# token_data,
# init_w3.eth.accounts[0],
# ],
# queue=None,
# )
# s_transfer = celery.signature(
# 'cic_eth.eth.token.transfer',
# [
# init_w3.eth.accounts[0],
# init_w3.eth.accounts[1],
# 1024,
# str(default_chain_spec),
# ],
# queue=None,
# )
#
# gas_price = c.gas_price()
# gas_limit = tx_transfer['gas']
#
# s = celery.signature(
# 'cic_eth.eth.tx.check_gas',
# [
# [tx_hash_hex],
# str(default_chain_spec),
# [],
# gas_receiver_address,
# gas_limit * gas_price,
# ],
# )
# s_nonce.link(s_transfer)
# t = s_nonce.apply_async()
# with pytest.raises(OutOfGasError):
# r = t.get()
# #assert len(r) == 0
#
# time.sleep(1)
# t.collect()
#
# session = SessionBase.create_session()
# q = session.query(Otx)
# q = q.filter(Otx.tx_hash==tx_hash_hex)
# r = q.first()
# session.close()
# assert r.status == StatusEnum.WAITFORGAS
def test_resend_with_higher_gas( def test_resend_with_higher_gas(
@ -191,24 +255,62 @@ def test_resend_with_higher_gas(
): ):
c = init_rpc c = init_rpc
txf = TokenTxFactory(init_w3.eth.accounts[0], c)
tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec) token_data = [
logg.debug('txtransfer {}'.format(tx_transfer)) {
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec)) 'address': bancor_tokens[0],
logg.debug('signed raw {}'.format(tx_signed_raw_hex)) },
queue_create( ]
tx_transfer['nonce'],
tx_transfer['from'], s_nonce = celery.signature(
tx_hash_hex, 'cic_eth.eth.tx.reserve_nonce',
tx_signed_raw_hex, [
token_data,
init_w3.eth.accounts[0],
],
queue=None,
)
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
[
init_w3.eth.accounts[0],
init_w3.eth.accounts[1],
1024,
str(default_chain_spec), str(default_chain_spec),
],
queue=None,
) )
logg.debug('create {}'.format(tx_transfer['from']))
cache_transfer_data( # txf = TokenTxFactory(init_w3.eth.accounts[0], c)
tx_hash_hex,
tx_transfer, #_signed_raw_hex, # tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec, 'foo')
) # logg.debug('txtransfer {}'.format(tx_transfer))
# (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
# logg.debug('signed raw {}'.format(tx_signed_raw_hex))
# queue_create(
# tx_transfer['nonce'],
# tx_transfer['from'],
# tx_hash_hex,
# tx_signed_raw_hex,
# str(default_chain_spec),
# )
# logg.debug('create {}'.format(tx_transfer['from']))
# cache_transfer_data(
# tx_hash_hex,
# tx_transfer, #_signed_raw_hex,
# )
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[1])
o = q.first()
tx_hash_hex = o.tx_hash
s_resend = celery.signature( s_resend = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas', 'cic_eth.eth.tx.resend_with_higher_gas',
@ -217,13 +319,10 @@ def test_resend_with_higher_gas(
str(default_chain_spec), str(default_chain_spec),
], ],
) )
t = s_resend.apply_async() t = s_resend.apply_async()
i = 0
for r in t.collect(): for r in t.collect():
logg.debug('{} {}'.format(i, r[0].get())) pass
i += 1
assert t.successful() assert t.successful()
# #

View File

@ -1,4 +1,5 @@
# third-party imports # third-party imports
import pytest
import celery import celery
# local imports # local imports
@ -6,7 +7,88 @@ from cic_eth.admin.nonce import shift_nonce
from cic_eth.queue.tx import create as queue_create from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import otx_cache_parse_tx from cic_eth.eth.tx import otx_cache_parse_tx
from cic_eth.eth.task import sign_tx from cic_eth.eth.task import sign_tx
from cic_eth.db.models.nonce import (
NonceReservation,
Nonce
)
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
@pytest.mark.skip()
def test_reserve_nonce_task(
init_database,
celery_session_worker,
eth_empty_accounts,
):
s = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'foo',
eth_empty_accounts[0],
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
o = q.first()
assert o != None
q = init_database.query(NonceReservation)
q = q.filter(NonceReservation.key==str(t))
o = q.first()
assert o != None
def test_reserve_nonce_chain(
default_chain_spec,
init_database,
celery_session_worker,
init_w3,
init_rpc,
):
provider_address = init_rpc.gas_provider()
Nonce.init(provider_address, 42, session=init_database)
init_database.commit()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[0],
provider_address,
],
queue=None,
)
s_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec),
],
queue=None,
)
s_nonce.link(s_gas)
t = s_nonce.apply_async()
r = t.get()
for c in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
Q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
o = q.first()
assert o.nonce == 42
@pytest.mark.skip()
def test_shift_nonce( def test_shift_nonce(
default_chain_spec, default_chain_spec,
init_database, init_database,
@ -47,3 +129,4 @@ def test_shift_nonce(
for _ in t.collect(): for _ in t.collect():
pass pass
assert t.successful() assert t.successful()

View File

@ -1,8 +1,29 @@
# third-party imports # third-party imports
import pytest import pytest
import uuid
# local imports # local imports
from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
from cic_eth.error import (
InitializationError,
IntegrityError,
)
def test_nonce_init(
init_database,
eth_empty_accounts,
):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
init_database.commit()
with pytest.raises(InitializationError):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
def test_nonce_increment( def test_nonce_increment(
init_database, init_database,
@ -10,11 +31,46 @@ def test_nonce_increment(
database_engine, database_engine,
): ):
# if database_engine[:6] == 'sqlite':
# pytest.skip('sqlite cannot lock tables which is required for this test, skipping')
nonce = Nonce.next(eth_empty_accounts[0], 3) nonce = Nonce.next(eth_empty_accounts[0], 3)
assert nonce == 3 assert nonce == 3
nonce = Nonce.next(eth_empty_accounts[0], 3) nonce = Nonce.next(eth_empty_accounts[0], 3)
assert nonce == 4 assert nonce == 4
def test_nonce_reserve(
init_database,
eth_empty_accounts,
):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
init_database.commit()
uu = uuid.uuid4()
nonce = NonceReservation.next(eth_empty_accounts[0], str(uu), session=init_database)
init_database.commit()
assert nonce == 42
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
o = q.first()
assert o.nonce == 43
nonce = NonceReservation.release(str(uu))
init_database.commit()
assert nonce == 42
q = init_database.query(NonceReservation)
q = q.filter(NonceReservation.key==str(uu))
o = q.first()
assert o == None
def test_nonce_reserve_integrity(
init_database,
eth_empty_accounts,
):
uu = uuid.uuid4()
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
with pytest.raises(IntegrityError):
NonceReservation.release(str(uu))

View File

@ -0,0 +1,71 @@
# standard imports
import logging
# local imports
from cic_eth.queue.tx import get_status_tx
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import cache_gas_refill_data
from cic_eth.db.models.otx import Otx
logg = logging.getLogger()
def test_status_tx_list(
default_chain_spec,
init_database,
init_w3,
):
tx = {
'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1],
'nonce': 42,
'gas': 21000,
'gasPrice': 1000000,
'value': 128,
'chainId': 666,
'data': '',
}
logg.debug('nonce {}'.format(tx['nonce']))
tx_signed = init_w3.eth.sign_transaction(tx)
#tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw'])
tx_hash = init_w3.keccak(hexstr=tx_signed['raw'])
queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec))
cache_gas_refill_data(tx_hash.hex(), tx)
tx_hash_hex = tx_hash.hex()
q = init_database.query(Otx)
otx = q.get(1)
otx.sendfail(session=init_database)
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 1
otx.sendfail(session=init_database)
otx.retry(session=init_database)
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.QUEUED, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 0
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.IN_NETWORK, session=init_database)
assert len(txs) == 0

View File

@ -102,6 +102,9 @@ def register_eth(i, u):
ps.get_message() ps.get_message()
m = ps.get_message(timeout=args.timeout) m = ps.get_message(timeout=args.timeout)
address = None address = None
if m == None:
logg.debug('message timeout')
return
if m['type'] == 'subscribe': if m['type'] == 'subscribe':
logg.debug('skipping subscribe message') logg.debug('skipping subscribe message')
continue continue

View File

@ -58,6 +58,7 @@ argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spe
argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url') argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address') argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-x', '--exit-on-error', dest='x', action='store_true', help='Halt exection on error')
argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true') argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', type=str, help='user export directory') argparser.add_argument('user_dir', type=str, help='user export directory')
@ -91,6 +92,27 @@ old_chain_spec = ChainSpec.from_chain_str(args.old_chain_spec)
old_chain_str = str(old_chain_spec) old_chain_str = str(old_chain_spec)
user_dir = args.user_dir # user_out_dir from import_users.py user_dir = args.user_dir # user_out_dir from import_users.py
meta_url = args.meta_provider meta_url = args.meta_provider
exit_on_error = args.x
class VerifierState:
def __init__(self, item_keys):
self.items = {}
for k in item_keys:
logg.info('k {}'.format(k))
self.items[k] = 0
def poke(self, item_key):
self.items[item_key] += 1
def __str__(self):
r = ''
for k in self.items.keys():
r += '{}: {}\n'.format(k, self.items[k])
return r
class VerifierError(Exception): class VerifierError(Exception):
@ -107,7 +129,8 @@ class VerifierError(Exception):
class Verifier: class Verifier:
def __init__(self, conn, cic_eth_api, gas_oracle, chain_spec, index_address, token_address, data_dir): # TODO: what an awful function signature
def __init__(self, conn, cic_eth_api, gas_oracle, chain_spec, index_address, token_address, data_dir, exit_on_error=False):
self.conn = conn self.conn = conn
self.gas_oracle = gas_oracle self.gas_oracle = gas_oracle
self.chain_spec = chain_spec self.chain_spec = chain_spec
@ -117,9 +140,18 @@ class Verifier:
self.tx_factory = TxFactory(chain_id=chain_spec.chain_id(), gas_oracle=gas_oracle) self.tx_factory = TxFactory(chain_id=chain_spec.chain_id(), gas_oracle=gas_oracle)
self.api = cic_eth_api self.api = cic_eth_api
self.data_dir = data_dir self.data_dir = data_dir
self.exit_on_error = exit_on_error
verifymethods = []
for k in dir(self):
if len(k) > 7 and k[:7] == 'verify_':
logg.info('adding verify method {}'.format(k))
verifymethods.append(k[7:])
self.state = VerifierState(verifymethods)
def verify_accounts_index(self, address): def verify_accounts_index(self, address, balance=None):
tx = self.tx_factory.template(ZERO_ADDRESS, self.index_address) tx = self.tx_factory.template(ZERO_ADDRESS, self.index_address)
data = keccak256_string_to_hex('have(address)')[:8] data = keccak256_string_to_hex('have(address)')[:8]
data += eth_abi.encode_single('address', address).hex() data += eth_abi.encode_single('address', address).hex()
@ -145,14 +177,14 @@ class Verifier:
raise VerifierError((actual_balance, balance), 'balance') raise VerifierError((actual_balance, balance), 'balance')
def verify_local_key(self, address): def verify_local_key(self, address, balance=None):
r = self.api.have_account(address, str(self.chain_spec)) r = self.api.have_account(address, str(self.chain_spec))
logg.debug('verify local key result {}'.format(r)) logg.debug('verify local key result {}'.format(r))
if r != address: if r != address:
raise VerifierError((address, r), 'local key') raise VerifierError((address, r), 'local key')
def verify_metadata(self, address): def verify_metadata(self, address, balance=None):
k = generate_metadata_pointer(bytes.fromhex(strip_0x(address)), ':cic.person') k = generate_metadata_pointer(bytes.fromhex(strip_0x(address)), ':cic.person')
url = os.path.join(meta_url, k) url = os.path.join(meta_url, k)
logg.debug('verify metadata url {}'.format(url)) logg.debug('verify metadata url {}'.format(url))
@ -185,14 +217,32 @@ class Verifier:
def verify(self, address, balance): def verify(self, address, balance):
logg.debug('verify {} {}'.format(address, balance)) logg.debug('verify {} {}'.format(address, balance))
methods = [
'local_key',
'accounts_index',
'balance',
'metadata',
]
for k in methods:
try: try:
self.verify_local_key(address) m = getattr(self, 'verify_{}'.format(k))
self.verify_accounts_index(address) m(address, balance)
self.verify_balance(address, balance) # self.verify_local_key(address)
self.verify_metadata(address) # self.verify_accounts_index(address)
# self.verify_balance(address, balance)
# self.verify_metadata(address)
except VerifierError as e: except VerifierError as e:
logg.critical('verification failed: {}'.format(e)) logline = 'verification {} failed for {}: {}'.format(k, address, str(e))
if self.exit_on_error:
logg.critical(logline)
sys.exit(1) sys.exit(1)
logg.error(logline)
self.state.poke(k)
def __str__(self):
return str(self.state)
class MockClient: class MockClient:
@ -263,7 +313,8 @@ def main():
r = l.split(',') r = l.split(',')
try: try:
address = to_checksum(r[0]) address = to_checksum(r[0])
sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r") #sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r")
logg.debug('loading balance {} {}'.format(i, address).ljust(200))
except ValueError: except ValueError:
break break
balance = int(r[1].rstrip()) balance = int(r[1].rstrip())
@ -274,7 +325,7 @@ def main():
api = AdminApi(MockClient()) api = AdminApi(MockClient())
verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, user_dir) verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, user_dir, exit_on_error)
user_new_dir = os.path.join(user_dir, 'new') user_new_dir = os.path.join(user_dir, 'new')
for x in os.walk(user_new_dir): for x in os.walk(user_new_dir):
@ -298,11 +349,17 @@ def main():
new_address = u.identities['evm'][subchain_str][0] new_address = u.identities['evm'][subchain_str][0]
subchain_str = '{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id()) subchain_str = '{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id())
old_address = u.identities['evm'][subchain_str][0] old_address = u.identities['evm'][subchain_str][0]
balance = 0
try:
balance = balances[old_address] balance = balances[old_address]
except KeyError:
logg.info('no old balance found for {}, assuming 0'.format(old_address))
logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance)) logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance))
verifier.verify(new_address, balance) verifier.verify(new_address, balance)
print(verifier)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -313,7 +313,7 @@ services:
- -c - -c
- | - |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_dispatcher.sh -q cic-eth -v ./start_dispatcher.sh -q cic-eth -vv
# command: "/root/start_dispatcher.sh -q cic-eth -vv" # command: "/root/start_dispatcher.sh -q cic-eth -vv"