Add idle callback to poller

This commit is contained in:
lash 2022-01-30 09:51:23 +00:00
parent c37ad876f1
commit b8cd6e256a
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 39 additions and 8 deletions

View File

@ -12,6 +12,7 @@ from chainsyncer.error import (
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
NS_DIV = 1000000000
class BlockPollSyncer(Syncer): class BlockPollSyncer(Syncer):
"""Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling. """Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling.
@ -19,6 +20,40 @@ class BlockPollSyncer(Syncer):
name = 'blockpoll' name = 'blockpoll'
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, idle_callback=None):
super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback=pre_callback, block_callback=block_callback, post_callback=post_callback)
self.idle_callback = idle_callback
self.last_start = 0
self.clock_id = time.CLOCK_MONOTONIC_RAW
def idle(self, interval):
interval *= NS_DIV
idle_start = time.clock_gettime_ns(self.clock_id)
delta = idle_start - self.last_start
if delta > interval:
interval /= NS_DIV
time.sleep(interval)
return
if self.idle_callback != None:
r = True
while r:
before = time.clock_gettime_ns(self.clock_id)
r = self.idle_callback(interval)
after = time.clock_gettime_ns(self.clock_id)
delta = after - before
if delta < 0:
return
interval -= delta
if interval < 0:
return
interval /= NS_DIV
time.sleep(interval)
def loop(self, interval, conn): def loop(self, interval, conn):
"""Indefinite loop polling the given RPC connection for new blocks in the given interval. """Indefinite loop polling the given RPC connection for new blocks in the given interval.
@ -34,9 +69,9 @@ class BlockPollSyncer(Syncer):
while self.running and Syncer.running_global: while self.running and Syncer.running_global:
self.last_start = time.clock_gettime_ns(self.clock_id)
if self.pre_callback != None: if self.pre_callback != None:
self.pre_callback() self.pre_callback()
#while True and Syncer.running_global:
while True and self.running: while True and self.running:
if start_tx > 0: if start_tx > 0:
start_tx -= 1 start_tx -= 1
@ -48,11 +83,6 @@ class BlockPollSyncer(Syncer):
return self.backend.get() return self.backend.get()
except NoBlockForYou as e: except NoBlockForYou as e:
break break
# TODO: To properly handle this, ensure that previous request is rolled back
# except sqlalchemy.exc.OperationalError as e:
# logg.error('database error: {}'.format(e))
# break
if self.block_callback != None: if self.block_callback != None:
self.block_callback(block, None) self.block_callback(block, None)
@ -65,4 +95,5 @@ class BlockPollSyncer(Syncer):
time.sleep(self.yield_delay) time.sleep(self.yield_delay)
if self.post_callback != None: if self.post_callback != None:
self.post_callback() self.post_callback()
time.sleep(interval)
self.idle(interval)

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chainsyncer name = chainsyncer
version = 0.0.7 version = 0.1.0
description = Generic blockchain syncer driver description = Generic blockchain syncer driver
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no