diff --git a/cic_sync_filter/tx.py b/cic_sync_filter/tx.py index 8a9a4f9..72b7b61 100644 --- a/cic_sync_filter/tx.py +++ b/cic_sync_filter/tx.py @@ -6,9 +6,9 @@ import celery from hexathon import ( add_0x, ) -from chainsyncer.db.models.base import SessionBase -from chainqueue.db.models.otx import Otx from chainlib.status import Status +from cic_eth.queue.query import get_tx_local +from chainqueue.error import NotLocalTxError # local imports from .base import SyncFilter @@ -26,22 +26,20 @@ class TxFilter(SyncFilter): def filter(self, conn, block, tx, db_session=None): super(TxFilter, self).filter(conn, block, tx, db_session) - db_session = SessionBase.bind_session(db_session) - tx_hash_hex = tx.hash - otx = Otx.load(add_0x(tx_hash_hex), session=db_session) - if otx == None: - logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) + + try: + get_tx_local(self.chain_spec, tx.hash) + except NotLocalTxError: + logg.debug('tx {} not found locally'.format(tx.hash)) return None self.register_match() - db_session.flush() - SessionBase.release_session(db_session) s_final_state = celery.signature( 'cic_eth.queue.state.set_final', [ self.chain_spec.asdict(), - add_0x(tx_hash_hex), + add_0x(tx.hash), tx.block.number, tx.index, tx.status == Status.ERROR, @@ -52,14 +50,14 @@ class TxFilter(SyncFilter): 'cic_eth.queue.state.obsolete', [ self.chain_spec.asdict(), - add_0x(tx_hash_hex), + add_0x(tx.hash), True, ], queue=self.queue, ) t = celery.group(s_obsolete_state, s_final_state)() - logline = 'otx filter match on {}'.format(otx.tx_hash) + logline = 'otx filter match on {}'.format(tx.hash) logline = self.to_logline(block, tx, logline) logg.info(logline) diff --git a/tests/test_tx_filter.py b/tests/test_tx_filter.py index 82d56f6..1a4a5d9 100644 --- a/tests/test_tx_filter.py +++ b/tests/test_tx_filter.py @@ -24,10 +24,10 @@ from chainqueue.sql.state import ( set_sent, ) from hexathon import strip_0x +from cic_eth.eth.gas import cache_gas_data # local imports -from cic_eth.runnable.daemons.filters.tx import TxFilter -from cic_eth.eth.gas import cache_gas_data +from cic_sync_filter.tx import TxFilter def test_filter_tx(