cic-internal-integration/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py

63 lines
1.7 KiB
Python
Raw Normal View History

2021-02-17 09:19:42 +01:00
# standard imports
import logging
2021-04-04 14:40:59 +02:00
# external imports
2021-02-17 09:19:42 +01:00
import celery
2021-03-01 21:15:17 +01:00
from hexathon import (
add_0x,
)
from chainsyncer.db.models.base import SessionBase
2021-04-04 14:40:59 +02:00
from chainqueue.db.models.otx import Otx
2021-03-01 21:15:17 +01:00
from chainlib.status import Status
2021-04-04 14:40:59 +02:00
# local imports
2021-02-17 09:19:42 +01:00
from .base import SyncFilter
# Module-level logger: a child of the root logger named after this module.
logg = logging.getLogger().getChild(__name__)
2021-02-17 09:19:42 +01:00
class TxFilter(SyncFilter):
    """Sync filter that reacts to transactions found in the local queue.

    For each transaction handed to :meth:`filter`, the local ``Otx`` table is
    consulted by transaction hash. On a match, the celery tasks
    ``cic_eth.queue.state.obsolete`` and ``cic_eth.queue.state.set_final``
    are dispatched as a group on the configured queue.
    """

    def __init__(self, chain_spec, queue):
        """Store the chain spec and the celery queue used for dispatch.

        :param chain_spec: Chain spec object; its ``asdict()`` result is
            passed as the first argument of every dispatched task.
        :param queue: Celery queue the task signatures are bound to.
        """
        self.queue = queue
        self.chain_spec = chain_spec

    def filter(self, conn, block, tx, db_session=None):
        """Dispatch state-update tasks if ``tx`` is known to the local queue.

        :param conn: Unused by this filter.
        :param block: Unused by this filter; the block number is read from
            ``tx.block`` instead.
        :param tx: Transaction object; ``hash``, ``block.number``, ``index``
            and ``status`` are read from it.
        :param db_session: Optional database session to reuse. It is passed
            through ``SessionBase.bind_session`` and always handed back to
            ``SessionBase.release_session`` before this method returns.
        :returns: The result of invoking the celery group, or ``None`` when
            the transaction is not found locally.
        """
        tx_hash_hex = tx.hash
        db_session = SessionBase.bind_session(db_session)
        try:
            otx = Otx.load(add_0x(tx_hash_hex), session=db_session)
            if otx is None:
                logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
                return None
            # Read otx attributes while the session is still bound.
            logg.info('tx filter match on {}'.format(otx.tx_hash))
            db_session.flush()
        finally:
            # Release on every exit path (match, no match, or exception) so
            # the bound session is never left dangling.
            SessionBase.release_session(db_session)

        s_final_state = celery.signature(
            'cic_eth.queue.state.set_final',
            [
                self.chain_spec.asdict(),
                add_0x(tx_hash_hex),
                tx.block.number,
                tx.index,
                tx.status == Status.ERROR,
            ],
            queue=self.queue,
        )
        s_obsolete_state = celery.signature(
            'cic_eth.queue.state.obsolete',
            [
                self.chain_spec.asdict(),
                add_0x(tx_hash_hex),
                True,
            ],
            queue=self.queue,
        )
        # Both signatures are submitted together as one celery group.
        t = celery.group(s_obsolete_state, s_final_state)()
        return t

    def __str__(self):
        """Return the human-readable name of this filter."""
        # NOTE(review): the label mentions "erc20 transfer" although this class
        # is the generic tx filter -- possibly copied from another filter;
        # confirm before changing, since the string may appear in logs.
        return 'cic-eth erc20 transfer filter'