Compare commits
3 Commits
lash/flags
...
v0.1.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
537b5c2bbc
|
||
|
|
fd62ad7075
|
||
|
|
b8cd6e256a
|
@@ -11,7 +11,7 @@ from chainsyncer.db.models.filter import BlockchainSyncFilter
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
from .base import Backend
|
||||
|
||||
logg = logging.getLogger().getChild(__name__)
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SQLBackend(Backend):
|
||||
@@ -267,7 +267,8 @@ class SQLBackend(Backend):
|
||||
session.flush()
|
||||
|
||||
#if block_height != block_resume:
|
||||
if highest_unsynced_block < block_resume:
|
||||
logg.info('last live id {} {} {}'.format(last_live_id, highest_unsynced_block, block_resume))
|
||||
if highest_unsynced_block <= block_resume:
|
||||
|
||||
q = session.query(BlockchainSyncFilter)
|
||||
q = q.filter(BlockchainSyncFilter.chain_sync_id==last_live_id)
|
||||
|
||||
@@ -12,6 +12,7 @@ from chainsyncer.error import (
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
NS_DIV = 1000000000
|
||||
|
||||
class BlockPollSyncer(Syncer):
|
||||
"""Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling.
|
||||
@@ -19,6 +20,40 @@ class BlockPollSyncer(Syncer):
|
||||
|
||||
name = 'blockpoll'
|
||||
|
||||
|
||||
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, idle_callback=None):
|
||||
super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback=pre_callback, block_callback=block_callback, post_callback=post_callback)
|
||||
self.idle_callback = idle_callback
|
||||
self.last_start = 0
|
||||
self.clock_id = time.CLOCK_MONOTONIC_RAW
|
||||
|
||||
|
||||
def idle(self, interval):
|
||||
interval *= NS_DIV
|
||||
idle_start = time.clock_gettime_ns(self.clock_id)
|
||||
delta = idle_start - self.last_start
|
||||
if delta > interval:
|
||||
interval /= NS_DIV
|
||||
time.sleep(interval)
|
||||
return
|
||||
|
||||
if self.idle_callback != None:
|
||||
r = True
|
||||
while r:
|
||||
before = time.clock_gettime_ns(self.clock_id)
|
||||
r = self.idle_callback(interval)
|
||||
after = time.clock_gettime_ns(self.clock_id)
|
||||
delta = after - before
|
||||
if delta < 0:
|
||||
return
|
||||
interval -= delta
|
||||
if interval < 0:
|
||||
return
|
||||
|
||||
interval /= NS_DIV
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
"""Indefinite loop polling the given RPC connection for new blocks in the given interval.
|
||||
|
||||
@@ -30,13 +65,19 @@ class BlockPollSyncer(Syncer):
|
||||
:returns: See chainsyncer.backend.base.Backend.get
|
||||
"""
|
||||
(pair, fltr) = self.backend.get()
|
||||
(target, fltr_target) = self.backend.target()
|
||||
if target == pair[0]:
|
||||
logg.info('syncer was done before it started: {}'.format(self))
|
||||
raise SyncDone(target)
|
||||
|
||||
|
||||
start_tx = pair[1]
|
||||
|
||||
|
||||
while self.running and Syncer.running_global:
|
||||
self.last_start = time.clock_gettime_ns(self.clock_id)
|
||||
if self.pre_callback != None:
|
||||
self.pre_callback()
|
||||
#while True and Syncer.running_global:
|
||||
while True and self.running:
|
||||
if start_tx > 0:
|
||||
start_tx -= 1
|
||||
@@ -48,11 +89,6 @@ class BlockPollSyncer(Syncer):
|
||||
return self.backend.get()
|
||||
except NoBlockForYou as e:
|
||||
break
|
||||
# TODO: To properly handle this, ensure that previous request is rolled back
|
||||
# except sqlalchemy.exc.OperationalError as e:
|
||||
# logg.error('database error: {}'.format(e))
|
||||
# break
|
||||
|
||||
if self.block_callback != None:
|
||||
self.block_callback(block, None)
|
||||
|
||||
@@ -65,4 +101,5 @@ class BlockPollSyncer(Syncer):
|
||||
time.sleep(self.yield_delay)
|
||||
if self.post_callback != None:
|
||||
self.post_callback()
|
||||
time.sleep(interval)
|
||||
|
||||
self.idle(interval)
|
||||
|
||||
Reference in New Issue
Block a user