WIP docstrings for drivers

This commit is contained in:
nolash 2021-08-27 13:41:03 +02:00
parent 8a6272dbfe
commit 578ee69cd3
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 108 additions and 11 deletions

View File

@ -19,16 +19,41 @@ logg = logging.getLogger(__name__)
def noop_callback(block, tx): def noop_callback(block, tx):
"""Logger-only callback for pre- and post processing.
:param block: Block object
:type block: chainlib.block.Block
:param tx: Transaction object
:type tx: chainlib.tx.Tx
"""
logg.debug('noop callback ({},{})'.format(block, tx)) logg.debug('noop callback ({},{})'.format(block, tx))
class Syncer: class Syncer:
"""Base class for syncer implementations.
:param backend: Syncer state backend
:type backend: chainsyncer.backend.base.Backend implementation
:param chain_interface: Chain interface implementation
:type chain_interface: chainlib.interface.ChainInterface implementation
:param pre_callback: Function to call before polling. Function will receive no arguments.
:type pre_callback: function
:param block_callback: Function to call before processing txs in a retrieved block. Function should have signature as chainsyncer.driver.base.noop_callback
:type block_callback: function
:param post_callback: Function to call after polling. Function will receive no arguments.
:type post_callback: function
"""
running_global = True running_global = True
"""If set to false syncer will terminate polling loop."""
yield_delay=0.005 yield_delay=0.005
"""Delay between each processed block."""
signal_request = [signal.SIGINT, signal.SIGTERM] signal_request = [signal.SIGINT, signal.SIGTERM]
"""Signals to catch to request shutdown."""
signal_set = False signal_set = False
"""Whether shutdown signal has been received."""
name = 'base' name = 'base'
"""Syncer name, to be overriden for each extended implementation."""
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None): def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
self.chain_interface = chain_interface self.chain_interface = chain_interface
@ -51,21 +76,49 @@ class Syncer:
def terminate(self): def terminate(self):
"""Set syncer to terminate as soon as possible.
"""
logg.info('termination requested!') logg.info('termination requested!')
Syncer.running_global = False Syncer.running_global = False
Syncer.running = False self.running = False
def add_filter(self, f): def add_filter(self, f):
"""Add filter to be processed for each transaction.
:param f: Filter
:type f: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter
"""
self.filter.add(f) self.filter.add(f)
self.backend.register_filter(str(f)) self.backend.register_filter(str(f))
def process_single(self, conn, block, tx): def process_single(self, conn, block, tx):
"""Set syncer backend cursor to the given transaction index and block height, and apply all registered filters on transaction.
:param conn: RPC connection instance
:type conn: chainlib.connection.RPCConnection
:param block: Block object
:type block: chainlib.block.Block
:param block: Transaction object
:type block: chainlib.tx.Tx
"""
self.backend.set(block.number, tx.index) self.backend.set(block.number, tx.index)
self.filter.apply(conn, block, tx) self.filter.apply(conn, block, tx)
def loop(self, interval, conn):
raise NotImplementedError()
def process(self, conn, block):
raise NotImplementedError()
def get(self, conn)
raise NotImplementedError()
def __str__(self): def __str__(self):
return 'syncer "{}" {}'.format( return 'syncer "{}" {}'.format(
self.name, self.name,

View File

@ -8,15 +8,29 @@ from .poll import BlockPollSyncer
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class HeadSyncer(BlockPollSyncer): class HeadSyncer(BlockPollSyncer):
"""Extends the block poller, implementing an open-ended syncer.
"""
name = 'head' name = 'head'
def process(self, conn, block): def process(self, conn, block):
"""Process a single block using the given RPC connection.
Processing means that all filters are executed on all transactions in the block.
If the block object does not contain the transaction details, the details will be retrieved from the network (incurring the corresponding performance penalty).
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:param block: Block object
:type block: chainlib.block.Block
"""
(pair, fltr) = self.backend.get() (pair, fltr) = self.backend.get()
logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr)) logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
i = pair[1] # set tx index from previous i = pair[1] # set tx index from previous
tx = None tx = None
while True: while True:
# handle block objects regardless of whether the tx data is embedded or not
try: try:
tx = block.tx(i) tx = block.tx(i)
except AttributeError: except AttributeError:
@ -27,7 +41,6 @@ class HeadSyncer(BlockPollSyncer):
# logg.debug('index error syncer tx get {}'.format(e)) # logg.debug('index error syncer tx get {}'.format(e))
# break # break
# TODO: Move specifics to eth subpackage, receipts are not a global concept
rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash)) rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash))
if rcpt != None: if rcpt != None:
tx.apply_receipt(self.chain_interface.src_normalize(rcpt)) tx.apply_receipt(self.chain_interface.src_normalize(rcpt))
@ -39,11 +52,22 @@ class HeadSyncer(BlockPollSyncer):
def get(self, conn): def get(self, conn):
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
:param conn: RPC connection
:type conn: chainlib.connectin.RPCConnection
:raises NoBlockForYou: Block at the given height does not exist
:rtype: chainlib.block.Block
:returns: Block object
"""
(height, flags) = self.backend.get() (height, flags) = self.backend.get()
block_number = height[0] block_number = height[0]
block_hash = [] block_hash = []
o = self.chain_interface.block_by_number(block_number) o = self.chain_interface.block_by_number(block_number)
try:
r = conn.do(o) r = conn.do(o)
except RPCException:
r = None
if r == None: if r == None:
raise NoBlockForYou() raise NoBlockForYou()
b = self.chain_interface.block_from_src(r) b = self.chain_interface.block_from_src(r)

View File

@ -13,7 +13,10 @@ logg = logging.getLogger(__name__)
class HistorySyncer(HeadSyncer): class HistorySyncer(HeadSyncer):
"""Bounded syncer implementation of the block poller. Reuses the head syncer process method implementation.
"""
name = 'history' name = 'history'
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None): def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
@ -27,6 +30,15 @@ class HistorySyncer(HeadSyncer):
def get(self, conn): def get(self, conn):
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
:param conn: RPC connection
:type conn: chainlib.connectin.RPCConnection
:raises SyncDone: Block target reached (at which point the syncer should terminate).
:rtype: chainlib.block.Block
:returns: Block object
:todo: DRY against HeadSyncer
"""
(height, flags) = self.backend.get() (height, flags) = self.backend.get()
if self.block_target < height[0]: if self.block_target < height[0]:
raise SyncDone(self.block_target) raise SyncDone(self.block_target)
@ -39,7 +51,7 @@ class HistorySyncer(HeadSyncer):
except RPCException: except RPCException:
r = None r = None
if r == None: if r == None:
raise SyncDone() #NoBlockForYou() raise SyncDone()
b = self.chain_interface.block_from_src(r) b = self.chain_interface.block_from_src(r)
return b return b

View File

@ -14,21 +14,29 @@ logg = logging.getLogger(__name__)
class BlockPollSyncer(Syncer): class BlockPollSyncer(Syncer):
"""Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling.
"""
name = 'blockpoll' name = 'blockpoll'
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
def loop(self, interval, conn): def loop(self, interval, conn):
"""Indefinite loop polling the given RPC connection for new blocks in the given interval.
:param interval: Seconds to wait for next poll after processing of previous poll has been completed.
:type interval: int
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:rtype: tuple
:returns: See chainsyncer.backend.base.Backend.get
"""
(pair, fltr) = self.backend.get() (pair, fltr) = self.backend.get()
start_tx = pair[1] start_tx = pair[1]
while self.running and Syncer.running_global: while self.running and Syncer.running_global:
if self.pre_callback != None: if self.pre_callback != None:
self.pre_callback() self.pre_callback()
while True and Syncer.running_global: #while True and Syncer.running_global:
while True and self.running:
if start_tx > 0: if start_tx > 0:
start_tx -= 1 start_tx -= 1
continue continue

View File

@ -85,7 +85,7 @@ class NoopFilter:
:type block: chainlib.block.Block :type block: chainlib.block.Block
:param tx: Transaction object :param tx: Transaction object
:type tx: chainlib.tx.Tx :type tx: chainlib.tx.Tx
:param db_session: Backend sesssion object :param db_session: Backend session object
:type db_session: varies :type db_session: varies
""" """
logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session))) logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session)))