Add graceful shutdown

This commit is contained in:
nolash 2021-02-22 23:38:00 +01:00
parent 5a6fe7327d
commit 7752ab5c5d
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 38 additions and 32 deletions

View File

@ -24,9 +24,10 @@ class CallbackFilter(SyncFilter):
trusted_addresses = [] trusted_addresses = []
def __init__(self, method, queue): def __init__(self, chain_spec, method, queue):
self.queue = queue self.queue = queue
self.method = method self.method = method
self.chain_spec = chain_spec
def call_back(self, transfer_type, result): def call_back(self, transfer_type, result):
@ -53,47 +54,53 @@ class CallbackFilter(SyncFilter):
s.apply_async() s.apply_async()
def parse_data(self, tx, rcpt): def parse_data(self, tx):
transfer_type = 'transfer' transfer_type = 'transfer'
transfer_data = None transfer_data = None
method_signature = tx.input[:10] method_signature = tx.payload[:10]
if method_signature == transfer_method_signature: if method_signature == transfer_method_signature:
transfer_data = unpack_transfer(tx.input) transfer_data = unpack_transfer(tx.payload)
transfer_data['from'] = tx['from'] transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to'] transfer_data['token_address'] = tx['to']
elif method_signature == transferfrom_method_signature: elif method_signature == transferfrom_method_signature:
transfer_type = 'transferfrom' transfer_type = 'transferfrom'
transfer_data = unpack_transferfrom(tx.input) transfer_data = unpack_transferfrom(tx.payload)
transfer_data['token_address'] = tx['to'] transfer_data['token_address'] = tx['to']
# TODO: do not rely on logs here # TODO: do not rely on logs here
elif method_signature == giveto_method_signature: elif method_signature == giveto_method_signature:
transfer_type = 'tokengift' transfer_type = 'tokengift'
transfer_data = unpack_gift(tx.input) transfer_data = unpack_gift(tx.payload)
for l in rcpt.logs: for l in tx.logs:
if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=l.data) transfer_data['value'] = web3.Web3.toInt(hexstr=l.data)
token_address_bytes = l.topics[2][32-20:] token_address_bytes = l.topics[2][32-20:]
transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex()) transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex())
transfer_data['from'] = rcpt.to transfer_data['from'] = tx.to
return (transfer_type, transfer_data) return (transfer_type, transfer_data)
def filter(self, w3, tx, rcpt, chain_spec, session=None): def filter(self, conn, block, tx, db_session=None):
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method)) logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
chain_str = str(chain_spec) chain_str = str(self.chain_spec)
transfer_data = self.parse_data(tx, rcpt)
transfer_data = None transfer_data = None
if len(tx.input) < 10: try:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx['hash'])) transfer_data = self.parse_data(tx)
except TypeError:
logg.debug('invalid method data length for tx {}'.format(tx.hash))
return return
logg.debug('checking callbacks filter input {}'.format(tx.input[:10])) transfer_data = None
if len(tx.payload) < 10:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx.hash))
return
logg.debug('checking callbacks filter input {}'.format(tx.payload[:10]))
if transfer_data != None: if transfer_data != None:
token_symbol = None token_symbol = None

View File

@ -15,24 +15,24 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f
class RegistrationFilter(SyncFilter): class RegistrationFilter(SyncFilter):
def __init__(self, queue): def __init__(self, chain_spec, queue):
self.chain_spec = chain_spec
self.queue = queue self.queue = queue
def filter(self, w3, tx, rcpt, chain_spec, session=None): def filter(self, conn, block, tx, db_session=None):
logg.debug('applying registration filter')
registered_address = None registered_address = None
for l in rcpt['logs']: for l in tx.logs:
event_topic_hex = l['topics'][0].hex() event_topic_hex = l['topics'][0]
if event_topic_hex == account_registry_add_log_hash: if event_topic_hex == account_registry_add_log_hash:
address_bytes = l.topics[1][32-20:] address_hex = l['topics'][1][32-20:]
address = to_checksum(address_bytes.hex()) address = to_checksum(address_hex)
logg.debug('request token gift to {}'.format(address)) logg.debug('request token gift to {}'.format(address))
s = celery.signature( s = celery.signature(
'cic_eth.eth.account.gift', 'cic_eth.eth.account.gift',
[ [
address, address,
str(chain_spec), str(self.chain_spec),
], ],
queue=self.queue, queue=self.queue,
) )

View File

@ -6,7 +6,7 @@ import celery
# local imports # local imports
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
from cic_eth.db.models.base import SessionBase from chainsyncer.db.models.base import SessionBase
from .base import SyncFilter from .base import SyncFilter
logg = logging.getLogger() logg = logging.getLogger()
@ -18,12 +18,11 @@ class TxFilter(SyncFilter):
self.queue = queue self.queue = queue
def filter(self, w3, tx, rcpt, chain_spec, session=None): def filter(self, conn, block, tx, db_session=None):
session = SessionBase.bind_session(session) db_session = SessionBase.bind_session(db_session)
logg.debug('applying tx filter') tx_hash_hex = tx.hash
tx_hash_hex = tx.hash.hex() otx = Otx.load(tx_hash_hex, session=db_session)
otx = Otx.load(tx_hash_hex, session=session) SessionBase.release_session(db_session)
SessionBase.release_session(session)
if otx == None: if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None return None

View File

@ -125,12 +125,12 @@ def main():
task_queue = config.get('_CELERY_QUEUE') task_queue = config.get('_CELERY_QUEUE')
if len(task_split) > 1: if len(task_split) > 1:
task_queue = task_split[0] task_queue = task_split[0]
callback_filter = CallbackFilter(task_split[1], task_queue) callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
callback_filters.append(callback_filter) callback_filters.append(callback_filter)
tx_filter = TxFilter(config.get('_CELERY_QUEUE')) tx_filter = TxFilter(config.get('_CELERY_QUEUE'))
registration_filter = RegistrationFilter(config.get('_CELERY_QUEUE')) registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE'))
gas_filter = GasFilter(config.get('_CELERY_QUEUE')) gas_filter = GasFilter(config.get('_CELERY_QUEUE'))