Make tx filter pass
This commit is contained in:
parent
3a83acafe8
commit
d12097a8d8
|
@ -6,9 +6,9 @@ import celery
|
||||||
from hexathon import (
|
from hexathon import (
|
||||||
add_0x,
|
add_0x,
|
||||||
)
|
)
|
||||||
from chainsyncer.db.models.base import SessionBase
|
|
||||||
from chainqueue.db.models.otx import Otx
|
|
||||||
from chainlib.status import Status
|
from chainlib.status import Status
|
||||||
|
from cic_eth.queue.query import get_tx_local
|
||||||
|
from chainqueue.error import NotLocalTxError
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from .base import SyncFilter
|
from .base import SyncFilter
|
||||||
|
@ -26,22 +26,20 @@ class TxFilter(SyncFilter):
|
||||||
|
|
||||||
def filter(self, conn, block, tx, db_session=None):
|
def filter(self, conn, block, tx, db_session=None):
|
||||||
super(TxFilter, self).filter(conn, block, tx, db_session)
|
super(TxFilter, self).filter(conn, block, tx, db_session)
|
||||||
db_session = SessionBase.bind_session(db_session)
|
|
||||||
tx_hash_hex = tx.hash
|
try:
|
||||||
otx = Otx.load(add_0x(tx_hash_hex), session=db_session)
|
get_tx_local(self.chain_spec, tx.hash)
|
||||||
if otx == None:
|
except NotLocalTxError:
|
||||||
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
|
logg.debug('tx {} not found locally'.format(tx.hash))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
self.register_match()
|
self.register_match()
|
||||||
|
|
||||||
db_session.flush()
|
|
||||||
SessionBase.release_session(db_session)
|
|
||||||
s_final_state = celery.signature(
|
s_final_state = celery.signature(
|
||||||
'cic_eth.queue.state.set_final',
|
'cic_eth.queue.state.set_final',
|
||||||
[
|
[
|
||||||
self.chain_spec.asdict(),
|
self.chain_spec.asdict(),
|
||||||
add_0x(tx_hash_hex),
|
add_0x(tx.hash),
|
||||||
tx.block.number,
|
tx.block.number,
|
||||||
tx.index,
|
tx.index,
|
||||||
tx.status == Status.ERROR,
|
tx.status == Status.ERROR,
|
||||||
|
@ -52,14 +50,14 @@ class TxFilter(SyncFilter):
|
||||||
'cic_eth.queue.state.obsolete',
|
'cic_eth.queue.state.obsolete',
|
||||||
[
|
[
|
||||||
self.chain_spec.asdict(),
|
self.chain_spec.asdict(),
|
||||||
add_0x(tx_hash_hex),
|
add_0x(tx.hash),
|
||||||
True,
|
True,
|
||||||
],
|
],
|
||||||
queue=self.queue,
|
queue=self.queue,
|
||||||
)
|
)
|
||||||
t = celery.group(s_obsolete_state, s_final_state)()
|
t = celery.group(s_obsolete_state, s_final_state)()
|
||||||
|
|
||||||
logline = 'otx filter match on {}'.format(otx.tx_hash)
|
logline = 'otx filter match on {}'.format(tx.hash)
|
||||||
logline = self.to_logline(block, tx, logline)
|
logline = self.to_logline(block, tx, logline)
|
||||||
logg.info(logline)
|
logg.info(logline)
|
||||||
|
|
||||||
|
|
|
@ -24,10 +24,10 @@ from chainqueue.sql.state import (
|
||||||
set_sent,
|
set_sent,
|
||||||
)
|
)
|
||||||
from hexathon import strip_0x
|
from hexathon import strip_0x
|
||||||
|
from cic_eth.eth.gas import cache_gas_data
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_eth.runnable.daemons.filters.tx import TxFilter
|
from cic_sync_filter.tx import TxFilter
|
||||||
from cic_eth.eth.gas import cache_gas_data
|
|
||||||
|
|
||||||
|
|
||||||
def test_filter_tx(
|
def test_filter_tx(
|
||||||
|
|
Loading…
Reference in New Issue