From 08fba819e4630a217ad098d2e72da7e965f39ce1 Mon Sep 17 00:00:00 2001 From: nolash Date: Sat, 3 Apr 2021 00:16:31 +0200 Subject: [PATCH] WIP implement chainqueue in daemons --- .../cic_eth/runnable/daemons/dispatcher.py | 17 ++++++++--------- .../cic_eth/runnable/daemons/filters/gas.py | 10 +++++----- .../cic_eth/runnable/daemons/filters/tx.py | 8 ++++---- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index 9f4ef332..5372c5d9 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -17,26 +17,25 @@ from chainlib.eth.tx import unpack from chainlib.connection import RPCConnection from chainsyncer.error import SyncDone from hexathon import strip_0x +from chainqueue.db.enum import ( + StatusEnum, + StatusBits, + ) +from chainqueue.error import NotLocalTxError # local imports import cic_eth from cic_eth.db import SessionBase -from cic_eth.db.enum import StatusEnum -from cic_eth.db.enum import StatusBits from cic_eth.db.enum import LockEnum from cic_eth.db import dsn_from_config -from cic_eth.queue.tx import ( - get_upcoming_tx, - set_dequeue, - ) +from cic_eth.queue.query import get_upcoming_tx +from cic_eth.queue.state import set_reserved from cic_eth.admin.ctrl import lock_send from cic_eth.eth.tx import send as task_tx_send from cic_eth.error import ( PermanentTxError, TemporaryTxError, - NotLocalTxError, ) -#from cic_eth.eth.util import unpack_signed_raw_tx_hex logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -140,7 +139,7 @@ class DispatchSyncer: while run: txs = {} typ = StatusBits.QUEUED - utxs = get_upcoming_tx(typ, chain_id=self.chain_id) + utxs = get_upcoming_tx(self.chain_spec, typ) for k in utxs.keys(): txs[k] = utxs[k] self.process(w3, txs) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py index 458e2851..baab1293 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/gas.py @@ -3,13 +3,13 @@ import logging # external imports from hexathon import add_0x +from chainqueue.db.enum import StatusBits +from chainqueue.db.models.tx import TxCache +from chainqueue.db.models.otx import Otx +from chainqueue.query import get_paused_tx_cache as get_paused_tx # local imports -from cic_eth.db.enum import StatusBits from cic_eth.db.models.base import SessionBase -from cic_eth.db.models.tx import TxCache -from cic_eth.db.models.otx import Otx -from cic_eth.queue.tx import get_paused_txs from cic_eth.eth.gas import create_check_gas_task from .base import SyncFilter @@ -38,7 +38,7 @@ class GasFilter(SyncFilter): SessionBase.release_session(session) return - txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session) + txs = get_paused_tx(self.chain_spec, StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session) SessionBase.release_session(session) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index 7e783c3e..d23bac9f 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -1,16 +1,16 @@ # standard imports import logging -# third-party imports +# external imports import celery from hexathon import ( add_0x, ) +from chainsyncer.db.models.base import SessionBase +from chainqueue.db.models.otx import Otx +from chainlib.status import Status # local imports -from cic_eth.db.models.otx import Otx -from chainsyncer.db.models.base import SessionBase -from chainlib.status import Status from .base import SyncFilter logg = logging.getLogger().getChild(__name__)