From b8cd6e256a2f8684b72dc03dd43d16459c42f7ee Mon Sep 17 00:00:00 2001 From: lash Date: Sun, 30 Jan 2022 09:51:23 +0000 Subject: [PATCH] Add idle callback to poller --- chainsyncer/driver/poll.py | 45 ++++++++++++++++++++++++++++++++------ setup.cfg | 2 +- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/chainsyncer/driver/poll.py b/chainsyncer/driver/poll.py index e11fdcb..cb7c790 100644 --- a/chainsyncer/driver/poll.py +++ b/chainsyncer/driver/poll.py @@ -12,6 +12,7 @@ from chainsyncer.error import ( logg = logging.getLogger(__name__) +NS_DIV = 1000000000 class BlockPollSyncer(Syncer): """Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling. @@ -19,6 +20,40 @@ class BlockPollSyncer(Syncer): name = 'blockpoll' + + def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, idle_callback=None): + super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback=pre_callback, block_callback=block_callback, post_callback=post_callback) + self.idle_callback = idle_callback + self.last_start = 0 + self.clock_id = time.CLOCK_MONOTONIC_RAW + + + def idle(self, interval): + interval *= NS_DIV + idle_start = time.clock_gettime_ns(self.clock_id) + delta = idle_start - self.last_start + if delta > interval: + interval /= NS_DIV + time.sleep(interval) + return + + if self.idle_callback != None: + r = True + while r: + before = time.clock_gettime_ns(self.clock_id) + r = self.idle_callback(interval) + after = time.clock_gettime_ns(self.clock_id) + delta = after - before + if delta < 0: + return + interval -= delta + if interval < 0: + return + + interval /= NS_DIV + time.sleep(interval) + + def loop(self, interval, conn): """Indefinite loop polling the given RPC connection for new blocks in the given interval. @@ -34,9 +69,9 @@ class BlockPollSyncer(Syncer): while self.running and Syncer.running_global: + self.last_start = time.clock_gettime_ns(self.clock_id) if self.pre_callback != None: self.pre_callback() - #while True and Syncer.running_global: while True and self.running: if start_tx > 0: start_tx -= 1 @@ -48,11 +83,6 @@ class BlockPollSyncer(Syncer): return self.backend.get() except NoBlockForYou as e: break -# TODO: To properly handle this, ensure that previous request is rolled back -# except sqlalchemy.exc.OperationalError as e: -# logg.error('database error: {}'.format(e)) -# break - if self.block_callback != None: self.block_callback(block, None) @@ -65,4 +95,5 @@ class BlockPollSyncer(Syncer): time.sleep(self.yield_delay) if self.post_callback != None: self.post_callback() - time.sleep(interval) + + self.idle(interval) diff --git a/setup.cfg b/setup.cfg index 16d8364..376c2a7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.7 +version = 0.1.0 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no