From 578ee69cd3000c3817e4968f9e946605a3e1f3e9 Mon Sep 17 00:00:00 2001 From: nolash Date: Fri, 27 Aug 2021 13:41:03 +0200 Subject: [PATCH] WIP docstrings for drivers --- chainsyncer/driver/base.py | 57 +++++++++++++++++++++++++++++++++-- chainsyncer/driver/head.py | 28 +++++++++++++++-- chainsyncer/driver/history.py | 14 ++++++++- chainsyncer/driver/poll.py | 18 ++++++++--- chainsyncer/filter.py | 2 +- 5 files changed, 108 insertions(+), 11 deletions(-) diff --git a/chainsyncer/driver/base.py b/chainsyncer/driver/base.py index 8e04c0a..2826c64 100644 --- a/chainsyncer/driver/base.py +++ b/chainsyncer/driver/base.py @@ -19,16 +19,41 @@ logg = logging.getLogger(__name__) def noop_callback(block, tx): + """Logger-only callback for pre- and post processing. + + :param block: Block object + :type block: chainlib.block.Block + :param tx: Transaction object + :type tx: chainlib.tx.Tx + """ logg.debug('noop callback ({},{})'.format(block, tx)) class Syncer: + """Base class for syncer implementations. + + :param backend: Syncer state backend + :type backend: chainsyncer.backend.base.Backend implementation + :param chain_interface: Chain interface implementation + :type chain_interface: chainlib.interface.ChainInterface implementation + :param pre_callback: Function to call before polling. Function will receive no arguments. + :type pre_callback: function + :param block_callback: Function to call before processing txs in a retrieved block. Function should have signature as chainsyncer.driver.base.noop_callback + :type block_callback: function + :param post_callback: Function to call after polling. Function will receive no arguments. + :type post_callback: function + """ running_global = True + """If set to false syncer will terminate polling loop.""" yield_delay=0.005 + """Delay between each processed block.""" signal_request = [signal.SIGINT, signal.SIGTERM] + """Signals to catch to request shutdown.""" signal_set = False + """Whether shutdown signal has been received.""" name = 'base' + """Syncer name, to be overriden for each extended implementation.""" def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None): self.chain_interface = chain_interface @@ -51,21 +76,49 @@ class Syncer: def terminate(self): + """Set syncer to terminate as soon as possible. + """ logg.info('termination requested!') Syncer.running_global = False - Syncer.running = False + self.running = False def add_filter(self, f): + """Add filter to be processed for each transaction. + + :param f: Filter + :type f: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter + """ self.filter.add(f) self.backend.register_filter(str(f)) def process_single(self, conn, block, tx): + """Set syncer backend cursor to the given transaction index and block height, and apply all registered filters on transaction. + + :param conn: RPC connection instance + :type conn: chainlib.connection.RPCConnection + :param block: Block object + :type block: chainlib.block.Block + :param block: Transaction object + :type block: chainlib.tx.Tx + """ self.backend.set(block.number, tx.index) self.filter.apply(conn, block, tx) - + + def loop(self, interval, conn): + raise NotImplementedError() + + + def process(self, conn, block): + raise NotImplementedError() + + + def get(self, conn) + raise NotImplementedError() + + def __str__(self): return 'syncer "{}" {}'.format( self.name, diff --git a/chainsyncer/driver/head.py b/chainsyncer/driver/head.py index b044a4e..46e2d21 100644 --- a/chainsyncer/driver/head.py +++ b/chainsyncer/driver/head.py @@ -8,15 +8,29 @@ from .poll import BlockPollSyncer logg = logging.getLogger(__name__) class HeadSyncer(BlockPollSyncer): + """Extends the block poller, implementing an open-ended syncer. + """ name = 'head' def process(self, conn, block): + """Process a single block using the given RPC connection. + + Processing means that all filters are executed on all transactions in the block. + + If the block object does not contain the transaction details, the details will be retrieved from the network (incurring the corresponding performance penalty). + + :param conn: RPC connection + :type conn: chainlib.connection.RPCConnection + :param block: Block object + :type block: chainlib.block.Block + """ (pair, fltr) = self.backend.get() logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr)) i = pair[1] # set tx index from previous tx = None while True: + # handle block objects regardless of whether the tx data is embedded or not try: tx = block.tx(i) except AttributeError: @@ -27,7 +41,6 @@ class HeadSyncer(BlockPollSyncer): # logg.debug('index error syncer tx get {}'.format(e)) # break - # TODO: Move specifics to eth subpackage, receipts are not a global concept rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash)) if rcpt != None: tx.apply_receipt(self.chain_interface.src_normalize(rcpt)) @@ -39,11 +52,22 @@ class HeadSyncer(BlockPollSyncer): def get(self, conn): + """Retrieve the block currently defined by the syncer cursor from the RPC provider. + + :param conn: RPC connection + :type conn: chainlib.connectin.RPCConnection + :raises NoBlockForYou: Block at the given height does not exist + :rtype: chainlib.block.Block + :returns: Block object + """ (height, flags) = self.backend.get() block_number = height[0] block_hash = [] o = self.chain_interface.block_by_number(block_number) - r = conn.do(o) + try: + r = conn.do(o) + except RPCException: + r = None if r == None: raise NoBlockForYou() b = self.chain_interface.block_from_src(r) diff --git a/chainsyncer/driver/history.py b/chainsyncer/driver/history.py index 1e03de9..7cebc74 100644 --- a/chainsyncer/driver/history.py +++ b/chainsyncer/driver/history.py @@ -13,7 +13,10 @@ logg = logging.getLogger(__name__) class HistorySyncer(HeadSyncer): + """Bounded syncer implementation of the block poller. Reuses the head syncer process method implementation. + + """ name = 'history' def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None): @@ -27,6 +30,15 @@ class HistorySyncer(HeadSyncer): def get(self, conn): + """Retrieve the block currently defined by the syncer cursor from the RPC provider. + + :param conn: RPC connection + :type conn: chainlib.connectin.RPCConnection + :raises SyncDone: Block target reached (at which point the syncer should terminate). + :rtype: chainlib.block.Block + :returns: Block object + :todo: DRY against HeadSyncer + """ (height, flags) = self.backend.get() if self.block_target < height[0]: raise SyncDone(self.block_target) @@ -39,7 +51,7 @@ class HistorySyncer(HeadSyncer): except RPCException: r = None if r == None: - raise SyncDone() #NoBlockForYou() + raise SyncDone() b = self.chain_interface.block_from_src(r) return b diff --git a/chainsyncer/driver/poll.py b/chainsyncer/driver/poll.py index c8d72e0..9fb7abf 100644 --- a/chainsyncer/driver/poll.py +++ b/chainsyncer/driver/poll.py @@ -14,21 +14,29 @@ logg = logging.getLogger(__name__) class BlockPollSyncer(Syncer): + """Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling. + """ name = 'blockpoll' - def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None): - super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback) - - def loop(self, interval, conn): + """Indefinite loop polling the given RPC connection for new blocks in the given interval. + + :param interval: Seconds to wait for next poll after processing of previous poll has been completed. + :type interval: int + :param conn: RPC connection + :type conn: chainlib.connection.RPCConnection + :rtype: tuple + :returns: See chainsyncer.backend.base.Backend.get + """ (pair, fltr) = self.backend.get() start_tx = pair[1] while self.running and Syncer.running_global: if self.pre_callback != None: self.pre_callback() - while True and Syncer.running_global: + #while True and Syncer.running_global: + while True and self.running: if start_tx > 0: start_tx -= 1 continue diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py index 140ef45..3dc6860 100644 --- a/chainsyncer/filter.py +++ b/chainsyncer/filter.py @@ -85,7 +85,7 @@ class NoopFilter: :type block: chainlib.block.Block :param tx: Transaction object :type tx: chainlib.tx.Tx - :param db_session: Backend sesssion object + :param db_session: Backend session object :type db_session: varies """ logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session)))