2022-04-21 00:41:19 +02:00
|
|
|
# standard imports
|
|
|
|
import logging
|
|
|
|
|
|
|
|
# external imports
|
|
|
|
import celery
|
|
|
|
from hexathon import (
|
|
|
|
add_0x,
|
|
|
|
)
|
|
|
|
from chainlib.status import Status
|
2022-04-23 06:43:06 +02:00
|
|
|
from cic_eth.queue.query import get_tx_local
|
|
|
|
from chainqueue.error import NotLocalTxError
|
2022-04-21 00:41:19 +02:00
|
|
|
|
|
|
|
# local imports
|
|
|
|
from .base import SyncFilter
|
|
|
|
|
|
|
|
logg = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class TxFilter(SyncFilter):
|
|
|
|
|
2022-04-23 11:58:17 +02:00
|
|
|
def filter(self, conn, block, tx):
|
2022-04-23 12:28:22 +02:00
|
|
|
super(TxFilter, self).filter(conn, block, tx)
|
2022-04-23 06:43:06 +02:00
|
|
|
|
|
|
|
try:
|
|
|
|
get_tx_local(self.chain_spec, tx.hash)
|
|
|
|
except NotLocalTxError:
|
|
|
|
logg.debug('tx {} not found locally'.format(tx.hash))
|
2022-04-21 00:41:19 +02:00
|
|
|
return None
|
|
|
|
|
|
|
|
self.register_match()
|
|
|
|
|
|
|
|
s_final_state = celery.signature(
|
|
|
|
'cic_eth.queue.state.set_final',
|
|
|
|
[
|
|
|
|
self.chain_spec.asdict(),
|
2022-04-23 06:43:06 +02:00
|
|
|
add_0x(tx.hash),
|
2022-04-21 00:41:19 +02:00
|
|
|
tx.block.number,
|
|
|
|
tx.index,
|
|
|
|
tx.status == Status.ERROR,
|
|
|
|
],
|
|
|
|
queue=self.queue,
|
|
|
|
)
|
|
|
|
s_obsolete_state = celery.signature(
|
|
|
|
'cic_eth.queue.state.obsolete',
|
|
|
|
[
|
|
|
|
self.chain_spec.asdict(),
|
2022-04-23 06:43:06 +02:00
|
|
|
add_0x(tx.hash),
|
2022-04-21 00:41:19 +02:00
|
|
|
True,
|
|
|
|
],
|
|
|
|
queue=self.queue,
|
|
|
|
)
|
|
|
|
t = celery.group(s_obsolete_state, s_final_state)()
|
|
|
|
|
2022-04-23 06:43:06 +02:00
|
|
|
logline = 'otx filter match on {}'.format(tx.hash)
|
2022-04-21 00:41:19 +02:00
|
|
|
logline = self.to_logline(block, tx, logline)
|
|
|
|
logg.info(logline)
|
|
|
|
|
|
|
|
return t
|
|
|
|
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
return 'otx filter'
|