# standard imports import logging import time # third-party imports import celery # local impotes from .base import Syncer from cic_eth.queue.tx import set_final_status from cic_eth.eth import RpcClient app = celery.current_app logg = logging.getLogger() class MinedSyncer(Syncer): """Base implementation of block processor for mined blocks. Loops through all transactions, :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. :type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend """ yield_delay = 0.005 def __init__(self, bc_cache): super(MinedSyncer, self).__init__(bc_cache) self.block_offset = 0 self.tx_offset = 0 def process(self, w3, ref): """Processes transactions in a single block, advancing transaction (and block) cursor accordingly. :param w3: Web3 object :type w3: web3.Web3 :param ref: Block reference (hash) to process :type ref: str, 0x-hex :returns: Block number of next unprocessed block :rtype: number """ b = w3.eth.getBlock(ref) c = w3.eth.getBlockTransactionCount(ref) s = 0 if self.block_offset == b.number: s = self.tx_offset logg.debug('processing {} (blocknumber {}, count {}, offset {})'.format(ref, b.number, c, s)) for i in range(s, c): tx = w3.eth.getTransactionByBlock(ref, i) tx_hash_hex = tx['hash'].hex() rcpt = w3.eth.getTransactionReceipt(tx_hash_hex) logg.debug('{}/{} processing tx {} from block {} {}'.format(i+1, c, tx_hash_hex, b.number, ref)) ours = False # TODO: ensure filter loop can complete on graceful shutdown for f in self.filter: #try: session = self.bc_cache.connect() task_uuid = f(w3, tx, rcpt, self.chain(), session) #except Exception as e: # logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e)) # continue if task_uuid != None: logg.debug('tx {} passed to celery task {}'.format(tx_hash_hex, task_uuid)) s = celery.signature( 'set_final_status', [tx_hash_hex, rcpt['blockNumber'], not rcpt['status']], ) s.apply_async() break next_tx = i + 1 if next_tx == c: self.bc_cache.set(b.number+1, 0) else: self.bc_cache.set(b.number, next_tx) if c == 0: logg.info('synced blockĀ {} has no transactions'.format(b.number)) #self.bc_cache.session(b.number+1, 0) self.bc_cache.set(b.number+1, 0) return b['number'] def loop(self, interval): """Loop running until the "running" property of Syncer is set to False. Retrieves latest unprocessed blocks and processes them. :param interval: Delay in seconds until next attempt if no new blocks are found. :type interval: int """ while self.running and Syncer.running_global: self.bc_cache.connect() c = RpcClient(self.chain()) logg.debug('loop execute') e = self.get(c.w3) logg.debug('got blocks {}'.format(e)) for block in e: block_number = self.process(c.w3, block.hex()) logg.info('processed block {} {}'.format(block_number, block.hex())) self.bc_cache.disconnect() if len(e) > 0: time.sleep(self.yield_delay) else: time.sleep(interval) logg.info("Syncer no longer set to run, gracefully exiting")