Use external chain queue engine
This commit is contained in:
@@ -17,26 +17,25 @@ from chainlib.eth.tx import unpack
|
||||
from chainlib.connection import RPCConnection
|
||||
from chainsyncer.error import SyncDone
|
||||
from hexathon import strip_0x
|
||||
from chainqueue.db.enum import (
|
||||
StatusEnum,
|
||||
StatusBits,
|
||||
)
|
||||
from chainqueue.error import NotLocalTxError
|
||||
from chainqueue.state import set_reserved
|
||||
|
||||
# local imports
|
||||
import cic_eth
|
||||
from cic_eth.db import SessionBase
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db.enum import StatusBits
|
||||
from cic_eth.db.enum import LockEnum
|
||||
from cic_eth.db import dsn_from_config
|
||||
from cic_eth.queue.tx import (
|
||||
get_upcoming_tx,
|
||||
set_dequeue,
|
||||
)
|
||||
from cic_eth.queue.query import get_upcoming_tx
|
||||
from cic_eth.admin.ctrl import lock_send
|
||||
from cic_eth.eth.tx import send as task_tx_send
|
||||
from cic_eth.error import (
|
||||
PermanentTxError,
|
||||
TemporaryTxError,
|
||||
NotLocalTxError,
|
||||
)
|
||||
#from cic_eth.eth.util import unpack_signed_raw_tx_hex
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
@@ -92,7 +91,6 @@ class DispatchSyncer:
|
||||
|
||||
def __init__(self, chain_spec):
|
||||
self.chain_spec = chain_spec
|
||||
self.chain_id = chain_spec.chain_id()
|
||||
|
||||
|
||||
def chain(self):
|
||||
@@ -103,13 +101,14 @@ class DispatchSyncer:
|
||||
c = len(txs.keys())
|
||||
logg.debug('processing {} txs {}'.format(c, list(txs.keys())))
|
||||
chain_str = str(self.chain_spec)
|
||||
session = SessionBase.create_session()
|
||||
for k in txs.keys():
|
||||
tx_raw = txs[k]
|
||||
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
|
||||
tx = unpack(tx_raw_bytes, self.chain_spec.chain_id())
|
||||
tx = unpack(tx_raw_bytes, self.chain_spec)
|
||||
|
||||
try:
|
||||
set_dequeue(tx['hash'])
|
||||
set_reserved(self.chain_spec, tx['hash'], session=session)
|
||||
except NotLocalTxError as e:
|
||||
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
|
||||
continue
|
||||
@@ -140,7 +139,7 @@ class DispatchSyncer:
|
||||
while run:
|
||||
txs = {}
|
||||
typ = StatusBits.QUEUED
|
||||
utxs = get_upcoming_tx(typ, chain_id=self.chain_id)
|
||||
utxs = get_upcoming_tx(self.chain_spec, typ)
|
||||
for k in utxs.keys():
|
||||
txs[k] = utxs[k]
|
||||
self.process(w3, txs)
|
||||
|
||||
@@ -2,14 +2,18 @@
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from hexathon import add_0x
|
||||
from hexathon import (
|
||||
add_0x,
|
||||
strip_0x,
|
||||
)
|
||||
from chainlib.eth.tx import unpack
|
||||
from chainqueue.db.enum import StatusBits
|
||||
from chainqueue.db.models.tx import TxCache
|
||||
from chainqueue.db.models.otx import Otx
|
||||
from chainqueue.query import get_paused_tx_cache as get_paused_tx
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.enum import StatusBits
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.queue.tx import get_paused_txs
|
||||
from cic_eth.eth.gas import create_check_gas_task
|
||||
from .base import SyncFilter
|
||||
|
||||
@@ -24,13 +28,13 @@ class GasFilter(SyncFilter):
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, session):
|
||||
tx_hash_hex = add_0x(tx.hash)
|
||||
if tx.value > 0:
|
||||
tx_hash_hex = add_0x(tx.hash)
|
||||
logg.debug('gas refill tx {}'.format(tx_hash_hex))
|
||||
session = SessionBase.bind_session(session)
|
||||
q = session.query(TxCache.recipient)
|
||||
q = q.join(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash_hex)
|
||||
q = q.filter(Otx.tx_hash==strip_0x(tx_hash_hex))
|
||||
r = q.first()
|
||||
|
||||
if r == None:
|
||||
@@ -38,7 +42,7 @@ class GasFilter(SyncFilter):
|
||||
SessionBase.release_session(session)
|
||||
return
|
||||
|
||||
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session)
|
||||
txs = get_paused_tx(self.chain_spec, status=StatusBits.GAS_ISSUES, sender=r[0], session=session, decoder=unpack)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
@@ -35,9 +35,10 @@ class RegistrationFilter(SyncFilter):
|
||||
address = to_checksum_address(add_0x(address_hex))
|
||||
logg.info('request token gift to {}'.format(address))
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
'cic_eth.eth.nonce.reserve_nonce',
|
||||
[
|
||||
address,
|
||||
self.chain_spec.asdict(),
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
|
||||
@@ -61,7 +61,7 @@ class TransferAuthFilter(SyncFilter):
|
||||
}
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
'cic_eth.eth.nonce.reserve_nonce',
|
||||
[
|
||||
[token_data],
|
||||
sender,
|
||||
@@ -69,7 +69,7 @@ class TransferAuthFilter(SyncFilter):
|
||||
queue=self.queue,
|
||||
)
|
||||
s_approve = celery.signature(
|
||||
'cic_eth.eth.token.approve',
|
||||
'cic_eth.eth.erc20.approve',
|
||||
[
|
||||
sender,
|
||||
recipient,
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
# external imports
|
||||
import celery
|
||||
from hexathon import (
|
||||
add_0x,
|
||||
)
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
from chainqueue.db.models.otx import Otx
|
||||
from chainlib.status import Status
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
from chainlib.status import Status
|
||||
from .base import SyncFilter
|
||||
|
||||
logg = logging.getLogger().getChild(__name__)
|
||||
@@ -34,8 +34,9 @@ class TxFilter(SyncFilter):
|
||||
db_session.flush()
|
||||
SessionBase.release_session(db_session)
|
||||
s = celery.signature(
|
||||
'cic_eth.queue.tx.set_final_status',
|
||||
'cic_eth.queue.state.set_final',
|
||||
[
|
||||
self.chain_spec.asdict(),
|
||||
add_0x(tx_hash_hex),
|
||||
tx.block.number,
|
||||
tx.status == Status.ERROR,
|
||||
|
||||
@@ -92,7 +92,7 @@ straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
|
||||
# TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here
|
||||
def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
|
||||
tx_dict = get_tx(tx_hash)
|
||||
tx = unpack_signed_raw_tx_hex(tx_dict['signed_tx'], chain_spec.chain_id())
|
||||
tx = unpack(tx_dict['signed_tx'], chain_spec)
|
||||
logg.debug('submitting tx {} for retry'.format(tx_hash))
|
||||
s_check = celery.signature(
|
||||
'cic_eth.admin.ctrl.check_lock',
|
||||
@@ -113,12 +113,12 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
|
||||
# )
|
||||
|
||||
# s_retry_status = celery.signature(
|
||||
# 'cic_eth.queue.tx.set_ready',
|
||||
# 'cic_eth.queue.state.set_ready',
|
||||
# [],
|
||||
# queue=queue,
|
||||
# )
|
||||
s_resend = celery.signature(
|
||||
'cic_eth.eth.tx.resend_with_higher_gas',
|
||||
'cic_eth.eth.gas.resend_with_higher_gas',
|
||||
[
|
||||
chain_str,
|
||||
],
|
||||
@@ -143,7 +143,7 @@ def dispatch(conn, chain_spec):
|
||||
for k in txs.keys():
|
||||
#tx_cache = get_tx_cache(k)
|
||||
tx_raw = txs[k]
|
||||
tx = unpack_signed_raw_tx_hex(tx_raw, chain_spec.chain_id())
|
||||
tx = unpack(tx_raw, chain_spec)
|
||||
|
||||
s_check = celery.signature(
|
||||
'cic_eth.admin.ctrl.check_lock',
|
||||
@@ -184,7 +184,7 @@ def dispatch(conn, chain_spec):
|
||||
# txs = list(txs.keys())
|
||||
# logg.debug('straggler txs {} chain {}'.format(signed_txs, chain_str))
|
||||
# s_send = celery.signature(
|
||||
# 'cic_eth.eth.resend_with_higher_gas',
|
||||
# 'cic_eth.eth.gas.resend_with_higher_gas',
|
||||
# [
|
||||
# txs,
|
||||
# chain_str,
|
||||
@@ -204,7 +204,7 @@ class StragglerFilter:
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
logg.debug('tx {}'.format(tx))
|
||||
s_send = celery.signature(
|
||||
'cic_eth.eth.tx.resend_with_higher_gas',
|
||||
'cic_eth.eth.gas.resend_with_higher_gas',
|
||||
[
|
||||
tx,
|
||||
self.chain_spec.asdict(),
|
||||
|
||||
@@ -14,21 +14,35 @@ import confini
|
||||
from chainlib.connection import RPCConnection
|
||||
from chainlib.eth.connection import EthUnixSignerConnection
|
||||
from chainlib.chain import ChainSpec
|
||||
from chainqueue.db.models.otx import Otx
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth import erc20
|
||||
from cic_eth.eth import tx
|
||||
from cic_eth.eth import account
|
||||
from cic_eth.admin import debug
|
||||
from cic_eth.admin import ctrl
|
||||
from cic_eth.queue import tx
|
||||
from cic_eth.queue import balance
|
||||
from cic_eth.callbacks import Callback
|
||||
from cic_eth.callbacks import http
|
||||
from cic_eth.callbacks import tcp
|
||||
from cic_eth.callbacks import redis
|
||||
from cic_eth.eth import (
|
||||
erc20,
|
||||
tx,
|
||||
account,
|
||||
nonce,
|
||||
gas,
|
||||
)
|
||||
from cic_eth.admin import (
|
||||
debug,
|
||||
ctrl,
|
||||
)
|
||||
from cic_eth.queue import (
|
||||
query,
|
||||
balance,
|
||||
state,
|
||||
tx,
|
||||
lock,
|
||||
time,
|
||||
)
|
||||
from cic_eth.callbacks import (
|
||||
Callback,
|
||||
http,
|
||||
#tcp,
|
||||
redis,
|
||||
)
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db import dsn_from_config
|
||||
from cic_eth.ext import tx
|
||||
from cic_eth.registry import (
|
||||
@@ -162,5 +176,10 @@ def main():
|
||||
current_app.worker_main(argv)
|
||||
|
||||
|
||||
@celery.signals.eventlet_pool_postshutdown.connect
|
||||
def shutdown(sender=None, headers=None, body=None, **kwargs):
|
||||
logg.warning('in shudown event hook')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user