cic-eth: Make nonce separate task
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import sys
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
@@ -317,6 +318,8 @@ class AdminApi:
|
||||
:return: Transaction details
|
||||
:rtype: dict
|
||||
"""
|
||||
problems = []
|
||||
|
||||
if tx_hash != None and tx_raw != None:
|
||||
ValueError('Specify only one of hash or raw tx')
|
||||
|
||||
@@ -444,10 +447,12 @@ class AdminApi:
|
||||
r = c.w3.eth.getTransactionReceipt(tx_hash)
|
||||
if r.status == 1:
|
||||
tx['network_status'] = 'Confirmed'
|
||||
tx['block'] = r.blockNumber
|
||||
tx['tx_index'] = r.transactionIndex
|
||||
else:
|
||||
tx['network_status'] = 'Reverted'
|
||||
tx['network_block_number'] = r.blockNumber
|
||||
tx['network_tx_index'] = r.transactionIndex
|
||||
if tx['block_number'] == None:
|
||||
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
|
||||
except web3.exceptions.TransactionNotFound:
|
||||
pass
|
||||
|
||||
@@ -469,4 +474,9 @@ class AdminApi:
|
||||
t = s.apply_async()
|
||||
tx['status_log'] = t.get()
|
||||
|
||||
if len(problems) > 0:
|
||||
sys.stderr.write('\n')
|
||||
for p in problems:
|
||||
sys.stderr.write('!!!{}\n'.format(p))
|
||||
|
||||
return tx
|
||||
|
||||
@@ -92,6 +92,11 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
@@ -110,7 +115,8 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_check.link(s_tokens)
|
||||
s_nonce.link(s_tokens)
|
||||
s_check.link(s_nonce)
|
||||
if self.callback_param != None:
|
||||
s_convert.link(self.callback_success)
|
||||
s_tokens.link(s_convert).on_error(self.callback_error)
|
||||
@@ -147,6 +153,11 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
@@ -165,7 +176,8 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_check.link(s_tokens)
|
||||
s_nonce.link(s_tokens)
|
||||
s_check.link(s_nonce)
|
||||
if self.callback_param != None:
|
||||
s_convert.link(self.callback_success)
|
||||
s_tokens.link(s_convert).on_error(self.callback_error)
|
||||
@@ -200,6 +212,13 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
from_address,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
@@ -217,7 +236,8 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_check.link(s_tokens)
|
||||
s_nonce.link(s_tokens)
|
||||
s_check.link(s_nonce)
|
||||
if self.callback_param != None:
|
||||
s_transfer.link(self.callback_success)
|
||||
s_tokens.link(s_transfer).on_error(self.callback_error)
|
||||
@@ -228,82 +248,6 @@ class Api:
|
||||
return t
|
||||
|
||||
|
||||
def transfer_request(self, from_address, to_address, spender_address, value, token_symbol):
|
||||
"""Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another.
|
||||
|
||||
:param from_address: Ethereum address of sender
|
||||
:type from_address: str, 0x-hex
|
||||
:param to_address: Ethereum address of recipient
|
||||
:type to_address: str, 0x-hex
|
||||
:param spender_address: Ethereum address that is executing transfer (typically an escrow contract)
|
||||
:type spender_address: str, 0x-hex
|
||||
:param value: Estimated return from conversion
|
||||
:type value: int
|
||||
:param token_symbol: ERC20 token symbol of token to send
|
||||
:type token_symbol: str
|
||||
:returns: uuid of root task
|
||||
:rtype: celery.Task
|
||||
"""
|
||||
s_check = celery.signature(
|
||||
'cic_eth.admin.ctrl.check_lock',
|
||||
[
|
||||
[token_symbol],
|
||||
self.chain_str,
|
||||
LockEnum.QUEUE,
|
||||
from_address,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens_transfer_approval = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens_approve = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_approve = celery.signature(
|
||||
'cic_eth.eth.token.approve',
|
||||
[
|
||||
from_address,
|
||||
spender_address,
|
||||
value,
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_transfer_approval = celery.signature(
|
||||
'cic_eth.eth.request.transfer_approval_request',
|
||||
[
|
||||
from_address,
|
||||
to_address,
|
||||
value,
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
# TODO: make approve and transfer_approval chainable so callback can be part of the full chain
|
||||
if self.callback_param != None:
|
||||
s_transfer_approval.link(self.callback_success)
|
||||
s_tokens_approve.link(s_approve)
|
||||
s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error)
|
||||
else:
|
||||
s_tokens_approve.link(s_approve)
|
||||
s_tokens_transfer_approval.link(s_transfer_approval)
|
||||
|
||||
g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue)
|
||||
s_check.link(g)
|
||||
t = s_check.apply_async()
|
||||
#t = s_tokens.apply_async(queue=self.queue)
|
||||
return t
|
||||
|
||||
|
||||
def balance(self, address, token_symbol, include_pending=True):
|
||||
"""Calls the provided callback with the current token balance of the given address.
|
||||
|
||||
@@ -396,6 +340,11 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_account = celery.signature(
|
||||
'cic_eth.eth.account.create',
|
||||
[
|
||||
@@ -403,7 +352,8 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_check.link(s_account)
|
||||
s_nonce.link(s_account)
|
||||
s_check.link(s_nonce)
|
||||
if self.callback_param != None:
|
||||
s_account.link(self.callback_success)
|
||||
|
||||
@@ -438,6 +388,11 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_refill = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
@@ -445,7 +400,8 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_check.link(s_refill)
|
||||
s_nonce.link(s_refill)
|
||||
s_check.link(s_nonce)
|
||||
if self.callback_param != None:
|
||||
s_refill.link(self.callback_success)
|
||||
|
||||
|
||||
@@ -103,6 +103,9 @@ def status_str(v, bits_only=False):
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if v == 0:
|
||||
return 'NONE'
|
||||
|
||||
for i in range(16):
|
||||
b = (1 << i)
|
||||
if (b & 0xffff) & v:
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
"""Nonce reservation
|
||||
|
||||
Revision ID: 3b693afd526a
|
||||
Revises: f738d9962fdf
|
||||
Create Date: 2021-03-05 07:09:50.898728
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '3b693afd526a'
|
||||
down_revision = 'f738d9962fdf'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'nonce_task_reservation',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('nonce', sa.Integer, nullable=False),
|
||||
sa.Column('key', sa.String, nullable=False),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('nonce_task_reservation')
|
||||
@@ -0,0 +1,30 @@
|
||||
"""Nonce reservation
|
||||
|
||||
Revision ID: 3b693afd526a
|
||||
Revises: f738d9962fdf
|
||||
Create Date: 2021-03-05 07:09:50.898728
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '3b693afd526a'
|
||||
down_revision = 'f738d9962fdf'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'nonce_task_reservation',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('nonce', sa.Integer, nullable=False),
|
||||
sa.Column('key', sa.String, nullable=False),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('nonce_task_reservation')
|
||||
@@ -54,7 +54,7 @@ class SessionBase(Model):
|
||||
|
||||
|
||||
@staticmethod
|
||||
def connect(dsn, pool_size=8, debug=False):
|
||||
def connect(dsn, pool_size=16, debug=False):
|
||||
"""Create new database connection engine and connect to database backend.
|
||||
|
||||
:param dsn: DSN string defining connection.
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, String, Integer
|
||||
from sqlalchemy import Column, String, Integer, DateTime
|
||||
|
||||
# local imports
|
||||
from .base import SessionBase
|
||||
from cic_eth.error import (
|
||||
InitializationError,
|
||||
IntegrityError,
|
||||
)
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
@@ -37,23 +42,43 @@ class Nonce(SessionBase):
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get(session, address):
|
||||
r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
|
||||
def __get(conn, address):
|
||||
r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
|
||||
nonce = r.fetchone()
|
||||
session.flush()
|
||||
if nonce == None:
|
||||
return None
|
||||
return nonce[0]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __set(session, address, nonce):
|
||||
session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
|
||||
session.flush()
|
||||
def __set(conn, address, nonce):
|
||||
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
|
||||
|
||||
|
||||
@staticmethod
|
||||
def next(address, initial_if_not_exists=0, session=None):
|
||||
def __init(conn, address, nonce):
|
||||
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
|
||||
|
||||
|
||||
@staticmethod
|
||||
def init(address, nonce=0, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==address)
|
||||
o = q.first()
|
||||
if o != None:
|
||||
session.flush()
|
||||
raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce))
|
||||
session.flush()
|
||||
Nonce.__init(session, address, nonce)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
# TODO: Incrementing nonce MUST be done by separate tasks.
|
||||
@staticmethod
|
||||
def next(address, initial_if_not_exists=0):
|
||||
"""Generate next nonce for the given address.
|
||||
|
||||
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
|
||||
@@ -65,32 +90,96 @@ class Nonce(SessionBase):
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
#session = SessionBase.bind_session(session)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
session.begin_nested()
|
||||
#conn = Nonce.engine.connect()
|
||||
#session.begin_nested()
|
||||
conn = Nonce.engine.connect()
|
||||
if Nonce.transactional:
|
||||
#session.execute('BEGIN')
|
||||
session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
|
||||
session.flush()
|
||||
nonce = Nonce.__get(session, address)
|
||||
conn.execute('BEGIN')
|
||||
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
|
||||
logg.debug('locking nonce table for address {}'.format(address))
|
||||
nonce = Nonce.__get(conn, address)
|
||||
logg.debug('get nonce {} for address {}'.format(nonce, address))
|
||||
if nonce == None:
|
||||
nonce = initial_if_not_exists
|
||||
session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
|
||||
session.flush()
|
||||
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
|
||||
Nonce.__set(session, address, nonce+1)
|
||||
#if Nonce.transactional:
|
||||
#session.execute('COMMIT')
|
||||
#session.execute('UNLOCK TABLE nonce')
|
||||
#conn.close()
|
||||
session.commit()
|
||||
# session.commit()
|
||||
Nonce.__init(conn, address, nonce)
|
||||
Nonce.__set(conn, address, nonce+1)
|
||||
if Nonce.transactional:
|
||||
conn.execute('COMMIT')
|
||||
logg.debug('unlocking nonce table for address {}'.format(address))
|
||||
conn.close()
|
||||
#session.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
#SessionBase.release_session(session)
|
||||
return nonce
|
||||
|
||||
|
||||
class NonceReservation(SessionBase):
|
||||
|
||||
__tablename__ = 'nonce_task_reservation'
|
||||
|
||||
nonce = Column(Integer)
|
||||
key = Column(String)
|
||||
date_created = Column(DateTime, default=datetime.datetime.utcnow)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def peek(key, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(NonceReservation)
|
||||
q = q.filter(NonceReservation.key==key)
|
||||
o = q.first()
|
||||
|
||||
nonce = None
|
||||
if o != None:
|
||||
nonce = o.nonce
|
||||
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return nonce
|
||||
|
||||
|
||||
@staticmethod
|
||||
def release(key, session=None):
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
nonce = NonceReservation.peek(key, session=session)
|
||||
|
||||
q = session.query(NonceReservation)
|
||||
q = q.filter(NonceReservation.key==key)
|
||||
o = q.first()
|
||||
|
||||
if o == None:
|
||||
raise IntegrityError('nonce for key {}'.format(nonce))
|
||||
SessionBase.release_session(session)
|
||||
|
||||
session.delete(o)
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return nonce
|
||||
|
||||
|
||||
@staticmethod
|
||||
def next(address, key, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if NonceReservation.peek(key, session) != None:
|
||||
raise IntegrityError('nonce for key {}'.format(key))
|
||||
|
||||
nonce = Nonce.next(address)
|
||||
|
||||
o = NonceReservation()
|
||||
o.nonce = nonce
|
||||
o.key = key
|
||||
session.add(o)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return nonce
|
||||
|
||||
@@ -400,7 +400,7 @@ class Otx(SessionBase):
|
||||
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
|
||||
if confirmed:
|
||||
if not self.status & StatusBits.OBSOLETE:
|
||||
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
|
||||
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
|
||||
self.__set_status(StatusEnum.CANCELLED, session)
|
||||
else:
|
||||
|
||||
@@ -143,7 +143,7 @@ class TxCache(SessionBase):
|
||||
self.block_number = block_number
|
||||
self.tx_index = tx_index
|
||||
# not automatically set in sqlite, it seems:
|
||||
self.date_created = datetime.datetime.now()
|
||||
self.date_created = datetime.datetime.utcnow()
|
||||
self.date_updated = self.date_created
|
||||
self.date_checked = self.date_created
|
||||
|
||||
|
||||
@@ -54,6 +54,13 @@ class RoleMissingError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class IntegrityError(Exception):
|
||||
"""Exception raised to signal irregularities with deduplication and ordering of tasks
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class LockedError(Exception):
|
||||
"""Exception raised when attempt is made to execute action that is deactivated by lock
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ class AccountTxFactory(TxFactory):
|
||||
self,
|
||||
address,
|
||||
chain_spec,
|
||||
uuid,
|
||||
session=None,
|
||||
):
|
||||
"""Register an Ethereum account address with the on-chain account registry
|
||||
@@ -59,7 +60,7 @@ class AccountTxFactory(TxFactory):
|
||||
'gas': gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'nonce': self.next_nonce(uuid, session=session),
|
||||
'value': 0,
|
||||
})
|
||||
return tx_add
|
||||
@@ -69,6 +70,7 @@ class AccountTxFactory(TxFactory):
|
||||
self,
|
||||
address,
|
||||
chain_spec,
|
||||
uuid,
|
||||
session=None,
|
||||
):
|
||||
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
|
||||
@@ -90,7 +92,7 @@ class AccountTxFactory(TxFactory):
|
||||
'gas': gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'nonce': self.next_nonce(uuid, session=session),
|
||||
'value': 0,
|
||||
})
|
||||
return tx_add
|
||||
@@ -156,19 +158,9 @@ def create(password, chain_str):
|
||||
logg.debug('created account {}'.format(a))
|
||||
|
||||
# Initialize nonce provider record for account
|
||||
# TODO: this can safely be set to zero, since we are randomly creating account
|
||||
n = c.w3.eth.getTransactionCount(a, 'pending')
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==a)
|
||||
o = q.first()
|
||||
session.flush()
|
||||
if o == None:
|
||||
o = Nonce()
|
||||
o.address_hex = a
|
||||
o.nonce = n
|
||||
session.add(o)
|
||||
session.commit()
|
||||
Nonce.init(a, session=session)
|
||||
session.commit()
|
||||
session.close()
|
||||
return a
|
||||
|
||||
@@ -203,7 +195,7 @@ def register(self, account_address, chain_str, writer_address=None):
|
||||
c = RpcClient(chain_spec, holder_address=writer_address)
|
||||
txf = AccountTxFactory(writer_address, c)
|
||||
|
||||
tx_add = txf.add(account_address, chain_spec, session=session)
|
||||
tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
|
||||
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
|
||||
session.close()
|
||||
@@ -243,7 +235,7 @@ def gift(self, account_address, chain_str):
|
||||
txf = AccountTxFactory(account_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_add = txf.gift(account_address, chain_spec, session=session)
|
||||
tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
|
||||
session.close()
|
||||
|
||||
|
||||
@@ -32,10 +32,10 @@ class TxFactory:
|
||||
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
|
||||
|
||||
|
||||
def next_nonce(self, session=None):
|
||||
"""Returns the current cached nonce value, and increments it for next transaction.
|
||||
def next_nonce(self, uuid, session=None):
|
||||
"""Returns the current reserved nonce value, and increments it for next transaction.
|
||||
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
return self.nonce_oracle.next(session=session)
|
||||
return self.nonce_oracle.next_by_task_uuid(uuid, session=session)
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
# local imports
|
||||
from cic_eth.db.models.nonce import Nonce
|
||||
from cic_eth.db.models.nonce import (
|
||||
Nonce,
|
||||
NonceReservation,
|
||||
)
|
||||
|
||||
class NonceOracle():
|
||||
"""Ensures atomic nonce increments for all transactions across all tasks and threads.
|
||||
@@ -14,10 +17,15 @@ class NonceOracle():
|
||||
self.default_nonce = default_nonce
|
||||
|
||||
|
||||
def next(self, session=None):
|
||||
def next(self):
|
||||
"""Get next unique nonce.
|
||||
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
return Nonce.next(self.address, self.default_nonce, session=session)
|
||||
raise AttributeError('this should not be called')
|
||||
return Nonce.next(self.address, self.default_nonce)
|
||||
|
||||
|
||||
def next_by_task_uuid(self, uuid, session=None):
|
||||
return NonceReservation.release(uuid, session=session)
|
||||
|
||||
@@ -46,6 +46,7 @@ class TokenTxFactory(TxFactory):
|
||||
spender_address,
|
||||
amount,
|
||||
chain_spec,
|
||||
uuid,
|
||||
session=None,
|
||||
):
|
||||
"""Create an ERC20 "approve" transaction
|
||||
@@ -74,7 +75,7 @@ class TokenTxFactory(TxFactory):
|
||||
'gas': source_token_gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'nonce': self.next_nonce(uuid, session=session),
|
||||
})
|
||||
return tx_approve
|
||||
|
||||
@@ -85,6 +86,7 @@ class TokenTxFactory(TxFactory):
|
||||
receiver_address,
|
||||
value,
|
||||
chain_spec,
|
||||
uuid,
|
||||
session=None,
|
||||
):
|
||||
"""Create an ERC20 "transfer" transaction
|
||||
@@ -114,7 +116,7 @@ class TokenTxFactory(TxFactory):
|
||||
'gas': source_token_gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'nonce': self.next_nonce(uuid, session=session),
|
||||
})
|
||||
return tx_transfer
|
||||
|
||||
@@ -248,7 +250,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||
txf = TokenTxFactory(holder_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, session=session)
|
||||
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, self.request.root_id, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session)
|
||||
session.close()
|
||||
|
||||
@@ -304,7 +306,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
||||
txf = TokenTxFactory(holder_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, session=session)
|
||||
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, self.request.root_id, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session)
|
||||
session.close()
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ from cic_registry.chain import ChainSpec
|
||||
from .rpc import RpcClient
|
||||
from cic_eth.db import Otx, SessionBase
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.nonce import NonceReservation
|
||||
from cic_eth.db.models.lock import Lock
|
||||
from cic_eth.db.enum import (
|
||||
LockEnum,
|
||||
@@ -84,15 +85,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
|
||||
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
|
||||
|
||||
if gas_required > balance:
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
address,
|
||||
c.gas_provider(),
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
s_refill_gas = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
address,
|
||||
chain_str,
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
s_refill_gas.apply_async()
|
||||
s_nonce.link(s_refill_gas)
|
||||
s_nonce.apply_async()
|
||||
wait_tasks = []
|
||||
for tx_hash in tx_hashes:
|
||||
s = celery.signature(
|
||||
@@ -108,15 +117,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
|
||||
|
||||
safe_gas = c.safe_threshold_amount()
|
||||
if balance < safe_gas:
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
address,
|
||||
c.gas_provider(),
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
s_refill_gas = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
address,
|
||||
chain_str,
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
s_refill_gas.apply_async()
|
||||
s_nonce.link(s_refill)
|
||||
s_nonce.apply_async()
|
||||
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
|
||||
ready_tasks = []
|
||||
for tx_hash in tx_hashes:
|
||||
@@ -287,11 +304,10 @@ class ParityNodeHandler:
|
||||
s_debug = celery.signature(
|
||||
'cic_eth.admin.debug.alert',
|
||||
[
|
||||
tx_hash_hex,
|
||||
tx_hash_hex,
|
||||
debugstr,
|
||||
],
|
||||
queue=queue,
|
||||
queue=self.queue,
|
||||
)
|
||||
s_set_reject.link(s_debug)
|
||||
s_lock.link(s_set_reject)
|
||||
@@ -299,7 +315,7 @@ class ParityNodeHandler:
|
||||
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
|
||||
|
||||
def handle_default(self, tx_hash_hex, tx_hex):
|
||||
def handle_default(self, tx_hash_hex, tx_hex, debugstr):
|
||||
tx_bytes = bytes.fromhex(tx_hex[2:])
|
||||
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
|
||||
s_lock = celery.signature(
|
||||
@@ -317,9 +333,18 @@ class ParityNodeHandler:
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_debug = celery.signature(
|
||||
'cic_eth.admin.debug.alert',
|
||||
[
|
||||
tx_hash_hex,
|
||||
debugstr,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_set_fubar.link(s_debug)
|
||||
s_lock.link(s_set_fubar)
|
||||
t = s_lock.apply_async()
|
||||
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr))
|
||||
|
||||
|
||||
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
|
||||
@@ -407,6 +432,7 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
zero_amount = False
|
||||
session = SessionBase.create_session()
|
||||
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
|
||||
q = session.query(Otx.tx_hash)
|
||||
@@ -416,8 +442,11 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
q = q.filter(TxCache.recipient==recipient_address)
|
||||
c = q.count()
|
||||
if c > 0:
|
||||
session.close()
|
||||
raise AlreadyFillingGasError(recipient_address)
|
||||
#session.close()
|
||||
#raise AlreadyFillingGasError(recipient_address)
|
||||
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
|
||||
zero_amount = True
|
||||
session.flush()
|
||||
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
|
||||
@@ -426,10 +455,13 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
|
||||
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
|
||||
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
|
||||
nonce = nonce_generator.next(session=session)
|
||||
#nonce = nonce_generator.next(session=session)
|
||||
nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session)
|
||||
gas_price = c.gas_price()
|
||||
gas_limit = c.default_gas_limit
|
||||
refill_amount = c.refill_amount()
|
||||
refill_amount = 0
|
||||
if not zero_amount:
|
||||
refill_amount = c.refill_amount()
|
||||
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
|
||||
|
||||
# create and sign transaction
|
||||
@@ -475,6 +507,7 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
queue=queue,
|
||||
)
|
||||
celery.group(s_tx_cache, s_status)()
|
||||
|
||||
return tx_send_gas_signed['raw']
|
||||
|
||||
|
||||
@@ -554,6 +587,21 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
return tx_hash_hex
|
||||
|
||||
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
def reserve_nonce(self, chained_input, address=None):
|
||||
session = SessionBase.create_session()
|
||||
|
||||
if address == None:
|
||||
address = chained_input
|
||||
|
||||
root_id = self.request.root_id
|
||||
nonce = NonceReservation.next(address, root_id)
|
||||
|
||||
session.close()
|
||||
|
||||
return chained_input
|
||||
|
||||
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
|
||||
def sync_tx(self, tx_hash_hex, chain_str):
|
||||
"""Force update of network status of a simgle transaction
|
||||
|
||||
@@ -3,10 +3,11 @@ import logging
|
||||
import sha3
|
||||
import web3
|
||||
|
||||
# third-party imports
|
||||
# external imports
|
||||
from rlp import decode as rlp_decode
|
||||
from rlp import encode as rlp_encode
|
||||
from eth_keys import KeyAPI
|
||||
from chainlib.eth.tx import unpack
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
@@ -22,64 +23,65 @@ field_debugs = [
|
||||
's',
|
||||
]
|
||||
|
||||
unpack_signed_raw_tx = unpack
|
||||
|
||||
def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
|
||||
d = rlp_decode(tx_raw_bytes)
|
||||
|
||||
logg.debug('decoding using chain id {}'.format(chain_id))
|
||||
j = 0
|
||||
for i in d:
|
||||
logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
|
||||
j += 1
|
||||
vb = chain_id
|
||||
if chain_id != 0:
|
||||
v = int.from_bytes(d[6], 'big')
|
||||
vb = v - (chain_id * 2) - 35
|
||||
while len(d[7]) < 32:
|
||||
d[7] = b'\x00' + d[7]
|
||||
while len(d[8]) < 32:
|
||||
d[8] = b'\x00' + d[8]
|
||||
s = b''.join([d[7], d[8], bytes([vb])])
|
||||
so = KeyAPI.Signature(signature_bytes=s)
|
||||
|
||||
h = sha3.keccak_256()
|
||||
h.update(rlp_encode(d))
|
||||
signed_hash = h.digest()
|
||||
|
||||
d[6] = chain_id
|
||||
d[7] = b''
|
||||
d[8] = b''
|
||||
|
||||
h = sha3.keccak_256()
|
||||
h.update(rlp_encode(d))
|
||||
unsigned_hash = h.digest()
|
||||
|
||||
p = so.recover_public_key_from_msg_hash(unsigned_hash)
|
||||
a = p.to_checksum_address()
|
||||
logg.debug('decoded recovery byte {}'.format(vb))
|
||||
logg.debug('decoded address {}'.format(a))
|
||||
logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
|
||||
logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
|
||||
|
||||
to = d[3].hex() or None
|
||||
if to != None:
|
||||
to = web3.Web3.toChecksumAddress('0x' + to)
|
||||
|
||||
return {
|
||||
'from': a,
|
||||
'nonce': int.from_bytes(d[0], 'big'),
|
||||
'gasPrice': int.from_bytes(d[1], 'big'),
|
||||
'gas': int.from_bytes(d[2], 'big'),
|
||||
'to': to,
|
||||
'value': int.from_bytes(d[4], 'big'),
|
||||
'data': '0x' + d[5].hex(),
|
||||
'v': chain_id,
|
||||
'r': '0x' + s[:32].hex(),
|
||||
's': '0x' + s[32:64].hex(),
|
||||
'chainId': chain_id,
|
||||
'hash': '0x' + signed_hash.hex(),
|
||||
'hash_unsigned': '0x' + unsigned_hash.hex(),
|
||||
}
|
||||
#def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
|
||||
# d = rlp_decode(tx_raw_bytes)
|
||||
#
|
||||
# logg.debug('decoding {} using chain id {}'.format(tx_raw_bytes.hex(), chain_id))
|
||||
# j = 0
|
||||
# for i in d:
|
||||
# logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
|
||||
# j += 1
|
||||
# vb = chain_id
|
||||
# if chain_id != 0:
|
||||
# v = int.from_bytes(d[6], 'big')
|
||||
# vb = v - (chain_id * 2) - 35
|
||||
# while len(d[7]) < 32:
|
||||
# d[7] = b'\x00' + d[7]
|
||||
# while len(d[8]) < 32:
|
||||
# d[8] = b'\x00' + d[8]
|
||||
# s = b''.join([d[7], d[8], bytes([vb])])
|
||||
# so = KeyAPI.Signature(signature_bytes=s)
|
||||
#
|
||||
# h = sha3.keccak_256()
|
||||
# h.update(rlp_encode(d))
|
||||
# signed_hash = h.digest()
|
||||
#
|
||||
# d[6] = chain_id
|
||||
# d[7] = b''
|
||||
# d[8] = b''
|
||||
#
|
||||
# h = sha3.keccak_256()
|
||||
# h.update(rlp_encode(d))
|
||||
# unsigned_hash = h.digest()
|
||||
#
|
||||
# p = so.recover_public_key_from_msg_hash(unsigned_hash)
|
||||
# a = p.to_checksum_address()
|
||||
# logg.debug('decoded recovery byte {}'.format(vb))
|
||||
# logg.debug('decoded address {}'.format(a))
|
||||
# logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
|
||||
# logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
|
||||
#
|
||||
# to = d[3].hex() or None
|
||||
# if to != None:
|
||||
# to = web3.Web3.toChecksumAddress('0x' + to)
|
||||
#
|
||||
# return {
|
||||
# 'from': a,
|
||||
# 'nonce': int.from_bytes(d[0], 'big'),
|
||||
# 'gasPrice': int.from_bytes(d[1], 'big'),
|
||||
# 'gas': int.from_bytes(d[2], 'big'),
|
||||
# 'to': to,
|
||||
# 'value': int.from_bytes(d[4], 'big'),
|
||||
# 'data': '0x' + d[5].hex(),
|
||||
# 'v': chain_id,
|
||||
# 'r': '0x' + s[:32].hex(),
|
||||
# 's': '0x' + s[32:64].hex(),
|
||||
# 'chainId': chain_id,
|
||||
# 'hash': '0x' + signed_hash.hex(),
|
||||
# 'hash_unsigned': '0x' + unsigned_hash.hex(),
|
||||
# }
|
||||
|
||||
|
||||
def unpack_signed_raw_tx_hex(tx_raw_hex, chain_id):
|
||||
|
||||
@@ -315,7 +315,9 @@ def set_ready(tx_hash):
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_dequeue(tx_hash):
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
o = q.first()
|
||||
if o == None:
|
||||
session.close()
|
||||
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||
@@ -566,7 +568,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
|
||||
return txs
|
||||
|
||||
|
||||
def get_status_tx(status, before=None, exact=False, limit=0, session=None):
|
||||
def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None):
|
||||
"""Retrieve transaction with a specific queue status.
|
||||
|
||||
:param status: Status to match transactions with
|
||||
@@ -582,11 +584,15 @@ def get_status_tx(status, before=None, exact=False, limit=0, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
q = session.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(TxCache.date_updated<before)
|
||||
# before = datetime.datetime.utcnow()
|
||||
if before != None:
|
||||
q = q.filter(TxCache.date_updated<before)
|
||||
if exact:
|
||||
q = q.filter(Otx.status==status.value)
|
||||
q = q.filter(Otx.status==status)
|
||||
else:
|
||||
q = q.filter(Otx.status.op('&')(status.value)==status.value)
|
||||
q = q.filter(Otx.status.op('&')(status)>0)
|
||||
if not_status != None:
|
||||
q = q.filter(Otx.status.op('&')(not_status)==0)
|
||||
i = 0
|
||||
for o in q.all():
|
||||
if limit > 0 and i == limit:
|
||||
|
||||
@@ -15,6 +15,8 @@ import web3
|
||||
from web3 import HTTPProvider, WebsocketProvider
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry.chain import ChainSpec
|
||||
from chainlib.eth.tx import unpack
|
||||
from hexathon import strip_0x
|
||||
|
||||
# local imports
|
||||
import cic_eth
|
||||
@@ -36,7 +38,7 @@ from cic_eth.error import (
|
||||
TemporaryTxError,
|
||||
NotLocalTxError,
|
||||
)
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx_hex
|
||||
#from cic_eth.eth.util import unpack_signed_raw_tx_hex
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
@@ -115,12 +117,15 @@ class DispatchSyncer:
|
||||
chain_str = str(self.chain_spec)
|
||||
for k in txs.keys():
|
||||
tx_raw = txs[k]
|
||||
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
|
||||
#tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
|
||||
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
|
||||
tx = unpack(tx_raw_bytes, self.chain_spec.chain_id())
|
||||
|
||||
try:
|
||||
set_dequeue(tx['hash'])
|
||||
except NotLocalTxError as e:
|
||||
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
|
||||
continue
|
||||
|
||||
s_check = celery.signature(
|
||||
'cic_eth.admin.ctrl.check_lock',
|
||||
|
||||
@@ -26,7 +26,6 @@ class RegistrationFilter(SyncFilter):
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
registered_address = None
|
||||
logg.debug('register filter checking log {}'.format(tx.logs))
|
||||
for l in tx.logs:
|
||||
event_topic_hex = l['topics'][0]
|
||||
if event_topic_hex == account_registry_add_log_hash:
|
||||
@@ -34,16 +33,23 @@ class RegistrationFilter(SyncFilter):
|
||||
|
||||
address_hex = strip_0x(l['topics'][1])[64-40:]
|
||||
address = to_checksum(add_0x(address_hex))
|
||||
logg.debug('request token gift to {}'.format(address))
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.account.gift',
|
||||
logg.info('request token gift to {}'.format(address))
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
address,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_gift = celery.signature(
|
||||
'cic_eth.eth.account.gift',
|
||||
[
|
||||
str(self.chain_spec),
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s.apply_async()
|
||||
s_nonce.link(s_gift)
|
||||
s_nonce.apply_async()
|
||||
|
||||
|
||||
def __str__(self):
|
||||
|
||||
@@ -66,14 +66,21 @@ class TransferAuthFilter(SyncFilter):
|
||||
sender = add_0x(to_checksum(o['sender']))
|
||||
recipient = add_0x(to_checksum(recipient))
|
||||
token = add_0x(to_checksum(o['token']))
|
||||
s = celery.signature(
|
||||
token_data = {
|
||||
'address': token,
|
||||
}
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
[token_data],
|
||||
sender,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_approve = celery.signature(
|
||||
'cic_eth.eth.token.approve',
|
||||
[
|
||||
[
|
||||
{
|
||||
'address': token,
|
||||
},
|
||||
],
|
||||
sender,
|
||||
recipient,
|
||||
o['value'],
|
||||
@@ -81,7 +88,8 @@ class TransferAuthFilter(SyncFilter):
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
t = s.apply_async()
|
||||
s_nonce.link(s_approve)
|
||||
t = s_nonce.apply_async()
|
||||
return True
|
||||
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ class TxFilter(SyncFilter):
|
||||
if otx == None:
|
||||
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
|
||||
return None
|
||||
logg.info('local tx match {}'.format(otx.tx_hash))
|
||||
logg.info('tx filter match on {}'.format(otx.tx_hash))
|
||||
SessionBase.release_session(db_session)
|
||||
s = celery.signature(
|
||||
'cic_eth.queue.tx.set_final_status',
|
||||
|
||||
@@ -138,7 +138,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str):
|
||||
|
||||
# TODO: can we merely use the dispatcher instead?
|
||||
def dispatch(chain_str):
|
||||
txs = get_status_tx(StatusEnum.RETRY, datetime.datetime.utcnow())
|
||||
txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow())
|
||||
if len(txs) == 0:
|
||||
logg.debug('no retry state txs found')
|
||||
return
|
||||
|
||||
@@ -43,6 +43,7 @@ argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
|
||||
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
|
||||
argparser.add_argument('-f', '--format', dest='f', default='terminal', type=str, help='Output format')
|
||||
argparser.add_argument('--status-raw', dest='status_raw', action='store_true', help='Output statis bit enum names only')
|
||||
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
|
||||
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
|
||||
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
|
||||
@@ -122,7 +123,7 @@ def render_tx(o, **kwargs):
|
||||
|
||||
for v in o.get('status_log', []):
|
||||
d = datetime.datetime.fromisoformat(v[0])
|
||||
e = status_str(v[1])
|
||||
e = status_str(v[1], args.status_raw)
|
||||
content += '{}: {}\n'.format(d, e)
|
||||
|
||||
return content
|
||||
|
||||
@@ -9,7 +9,10 @@ import celery
|
||||
# local imports
|
||||
from .base import Syncer
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db.enum import (
|
||||
StatusEnum,
|
||||
StatusBits,
|
||||
)
|
||||
from cic_eth.queue.tx import get_status_tx
|
||||
|
||||
logg = logging.getLogger()
|
||||
@@ -47,7 +50,8 @@ class RetrySyncer(Syncer):
|
||||
# )
|
||||
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
|
||||
stalled_txs = get_status_tx(
|
||||
StatusEnum.SENT.value,
|
||||
StatusBits.IN_NETWORK.value,
|
||||
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
|
||||
before=before,
|
||||
)
|
||||
# return list(failed_txs.keys()) + list(stalled_txs.keys())
|
||||
|
||||
Reference in New Issue
Block a user