diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py index ace3be58..27e44661 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/callback.py @@ -24,9 +24,10 @@ class CallbackFilter(SyncFilter): trusted_addresses = [] - def __init__(self, method, queue): + def __init__(self, chain_spec, method, queue): self.queue = queue self.method = method + self.chain_spec = chain_spec def call_back(self, transfer_type, result): @@ -53,47 +54,53 @@ class CallbackFilter(SyncFilter): s.apply_async() - def parse_data(self, tx, rcpt): + def parse_data(self, tx): transfer_type = 'transfer' transfer_data = None - method_signature = tx.input[:10] + method_signature = tx.payload[:10] + if method_signature == transfer_method_signature: - transfer_data = unpack_transfer(tx.input) + transfer_data = unpack_transfer(tx.payload) transfer_data['from'] = tx['from'] transfer_data['token_address'] = tx['to'] elif method_signature == transferfrom_method_signature: transfer_type = 'transferfrom' - transfer_data = unpack_transferfrom(tx.input) + transfer_data = unpack_transferfrom(tx.payload) transfer_data['token_address'] = tx['to'] # TODO: do not rely on logs here elif method_signature == giveto_method_signature: transfer_type = 'tokengift' - transfer_data = unpack_gift(tx.input) - for l in rcpt.logs: + transfer_data = unpack_gift(tx.payload) + for l in tx.logs: if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa': transfer_data['value'] = web3.Web3.toInt(hexstr=l.data) token_address_bytes = l.topics[2][32-20:] transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex()) - transfer_data['from'] = rcpt.to + transfer_data['from'] = tx.to return (transfer_type, transfer_data) - def filter(self, w3, tx, rcpt, chain_spec, session=None): + def filter(self, conn, block, tx, db_session=None): logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method)) - chain_str = str(chain_spec) - - transfer_data = self.parse_data(tx, rcpt) + chain_str = str(self.chain_spec) transfer_data = None - if len(tx.input) < 10: - logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx['hash'])) + try: + transfer_data = self.parse_data(tx) + except TypeError: + logg.debug('invalid method data length for tx {}'.format(tx.hash)) return - logg.debug('checking callbacks filter input {}'.format(tx.input[:10])) + transfer_data = None + if len(tx.payload) < 10: + logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx.hash)) + return + + logg.debug('checking callbacks filter input {}'.format(tx.payload[:10])) if transfer_data != None: token_symbol = None diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py index 03c80938..d59cdd75 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/register.py @@ -15,24 +15,24 @@ account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153f class RegistrationFilter(SyncFilter): - def __init__(self, queue): + def __init__(self, chain_spec, queue): + self.chain_spec = chain_spec self.queue = queue - def filter(self, w3, tx, rcpt, chain_spec, session=None): - logg.debug('applying registration filter') + def filter(self, conn, block, tx, db_session=None): registered_address = None - for l in rcpt['logs']: - event_topic_hex = l['topics'][0].hex() + for l in tx.logs: + event_topic_hex = l['topics'][0] if event_topic_hex == account_registry_add_log_hash: - address_bytes = l.topics[1][32-20:] - address = to_checksum(address_bytes.hex()) + address_hex = l['topics'][1][32-20:] + address = to_checksum(address_hex) logg.debug('request token gift to {}'.format(address)) s = celery.signature( 'cic_eth.eth.account.gift', [ address, - str(chain_spec), + str(self.chain_spec), ], queue=self.queue, ) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index 93d6a5c9..b8c5f298 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -6,7 +6,7 @@ import celery # local imports from cic_eth.db.models.otx import Otx -from cic_eth.db.models.base import SessionBase +from chainsyncer.db.models.base import SessionBase from .base import SyncFilter logg = logging.getLogger() @@ -18,12 +18,11 @@ class TxFilter(SyncFilter): self.queue = queue - def filter(self, w3, tx, rcpt, chain_spec, session=None): - session = SessionBase.bind_session(session) - logg.debug('applying tx filter') - tx_hash_hex = tx.hash.hex() - otx = Otx.load(tx_hash_hex, session=session) - SessionBase.release_session(session) + def filter(self, conn, block, tx, db_session=None): + db_session = SessionBase.bind_session(db_session) + tx_hash_hex = tx.hash + otx = Otx.load(tx_hash_hex, session=db_session) + SessionBase.release_session(db_session) if otx == None: logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex)) return None diff --git a/apps/cic-eth/cic_eth/runnable/daemons/manager.py b/apps/cic-eth/cic_eth/runnable/daemons/manager.py index 6a3974f0..66f4888f 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/manager.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/manager.py @@ -125,12 +125,12 @@ def main(): task_queue = config.get('_CELERY_QUEUE') if len(task_split) > 1: task_queue = task_split[0] - callback_filter = CallbackFilter(task_split[1], task_queue) + callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue) callback_filters.append(callback_filter) tx_filter = TxFilter(config.get('_CELERY_QUEUE')) - registration_filter = RegistrationFilter(config.get('_CELERY_QUEUE')) + registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE')) gas_filter = GasFilter(config.get('_CELERY_QUEUE'))