chainsyncer/chainsyncer/driver/base.py

145 lines
4.2 KiB
Python
Raw Normal View History

2021-08-26 10:09:47 +02:00
# standard imports
import logging
import time
2022-03-18 02:11:30 +01:00
import signal
2021-08-26 10:09:47 +02:00
# local imports
from chainsyncer.error import (
SyncDone,
NoBlockForYou,
)
2022-03-18 00:48:23 +01:00
from chainsyncer.session import SyncSession
2021-08-26 10:09:47 +02:00
2022-03-18 00:48:23 +01:00
# Module-level logger, named after this module.
logg = logging.getLogger(__name__)
2021-08-26 10:09:47 +02:00
2022-01-30 10:51:23 +01:00
# Nanoseconds per second; converts the polling interval (seconds) to the
# nanosecond resolution of time.clock_gettime_ns() and back.
NS_DIV = 1_000_000_000
2021-08-26 10:09:47 +02:00
2022-03-18 00:48:23 +01:00
class SyncDriver:
    """Base class for syncer drivers.

    On construction the driver connects and starts the given sync state
    store. ``run`` then repeatedly opens a ``SyncSession`` on that store and
    polls for blocks until the session yields no more sync items, or until
    termination is requested.

    The first instance created in the process installs handlers for the
    signals listed in ``signal_request``; receiving one of them calls
    ``terminate``.

    Extending classes must implement ``get`` and ``process``.

    :param store: Sync state store. Must provide ``connect()``, ``start(offset=..., target=...)`` and a ``target`` attribute.
    :param offset: Passed on to ``store.start``. Presumably the block height to start syncing from; confirm against the store implementation.
    :param target: Passed on to ``store.start``. The polling loop only checks the store's target when it is greater than -1.
    :param pre_callback: Called without arguments at the start of every polling round.
    :param post_callback: Called without arguments at the end of every polling round, before idling.
    :param block_callback: Called with ``(block, None)`` for every block retrieved, before it is processed.
    :param idle_callback: Called with the remaining idle time in nanoseconds while the driver idles between polling rounds. It is called again for as long as it returns a true value and idle time remains.
    """

    running_global = True
    """If set to false syncer will terminate polling loop."""
    yield_delay = 0.005
    """Delay between each processed block."""
    signal_request = [signal.SIGINT, signal.SIGTERM]
    """Signals to catch to request shutdown."""
    signal_set = False
    """Whether the shutdown signal handlers have been installed."""
    name = 'base'
    """Syncer name, to be overridden for each extended implementation."""

    def __init__(self, store, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
        self.store = store
        self.running = True
        self.pre_callback = pre_callback
        self.post_callback = post_callback
        self.block_callback = block_callback
        self.idle_callback = idle_callback
        # Timestamp (ns) of the start of the most recent polling round.
        self.last_start = 0
        self.clock_id = time.CLOCK_MONOTONIC_RAW
        self.store.connect()
        self.store.start(offset=offset, target=target)

        # Install the signal handlers only once per process; they are bound
        # to the first instance created.
        if not SyncDriver.signal_set:
            for sig in SyncDriver.signal_request:
                signal.signal(sig, self.__sig_terminate)
            SyncDriver.signal_set = True

    def __sig_terminate(self, sig, frame):
        """Signal handler; logs the signal and requests termination."""
        logg.warning('got signal {}'.format(sig))
        self.terminate()

    def terminate(self):
        """Set syncer to terminate as soon as possible.

        Clears both the class-wide ``running_global`` flag and this
        instance's ``running`` flag.
        """
        logg.info('termination requested!')
        SyncDriver.running_global = False
        self.running = False

    def run(self, conn, interval=1):
        """Open sync sessions and run the polling loop until done.

        A new ``SyncSession`` is started on the store for every iteration.
        When the session yields no sync item the driver stops.

        :param conn: Connection object, passed through unchanged to ``get`` and ``process``.
        :param interval: Polling interval in seconds, passed to ``loop``.
        """
        while self.running_global:
            self.session = SyncSession(self.store)
            item = self.session.start()
            if item is None:
                self.running = False
                # Instance-level assignment: shadows the class attribute for
                # this driver only, the class-wide flag is left untouched.
                self.running_global = False
                break
            self.loop(conn, item, interval=interval)

    def idle(self, interval):
        """Idle between two polling rounds.

        If more than ``interval`` has already passed since the current
        polling round started, sleeps for ``interval`` seconds and returns
        without invoking the idle callback.

        Otherwise, if an idle callback is set, it is called with the
        remaining idle time in nanoseconds, repeatedly for as long as it
        returns a true value. The time spent in each call is deducted from
        the remaining idle time, and the method returns without sleeping once
        that time is used up. Whatever idle time remains afterwards is slept.

        :param interval: Idle time in seconds.
        """
        interval *= NS_DIV
        idle_start = time.clock_gettime_ns(self.clock_id)
        delta = idle_start - self.last_start
        if delta > interval:
            interval /= NS_DIV
            time.sleep(interval)
            return

        if self.idle_callback is not None:
            r = True
            while r:
                before = time.clock_gettime_ns(self.clock_id)
                r = self.idle_callback(interval)
                after = time.clock_gettime_ns(self.clock_id)
                delta = after - before
                # Guard against a clock that appears to have gone backwards.
                if delta < 0:
                    return
                interval -= delta
                if interval < 0:
                    return

        interval /= NS_DIV
        time.sleep(interval)

    def loop(self, conn, item, interval=1):
        """Poll for and process blocks for a single sync item.

        Each polling round calls the pre callback, retrieves and processes
        blocks until ``get`` raises ``NoBlockForYou``, calls the post
        callback and then idles. The method returns when ``get`` raises
        ``SyncDone`` or when the driver is no longer running.

        :param conn: Connection object, passed through unchanged to ``get`` and ``process``.
        :param item: Sync item as returned by ``SyncSession.start``.
        :param interval: Idle time in seconds between polling rounds.
        """
        logg.debug('started loop')
        while self.running and SyncDriver.running_global:
            self.last_start = time.clock_gettime_ns(self.clock_id)

            if self.pre_callback is not None:
                self.pre_callback()

            while self.running:
                try:
                    block = self.get(conn, item)
                except SyncDone as e:
                    logg.info('all blocks sumitted for processing: {}'.format(e))
                    return
                except NoBlockForYou:
                    # Nothing more to fetch in this round.
                    break

                if self.block_callback is not None:
                    self.block_callback(block, None)

                try:
                    self.process(conn, item, block)
                except IndexError:
                    # Presumably raised when the transactions of the block
                    # are exhausted; confirm against implementations.
                    item.next(advance_block=True)
                # NOTE(review): placed after the try/except to match the
                # "delay between each processed block" description of
                # yield_delay; confirm against the original indentation.
                time.sleep(self.yield_delay)

                if self.store.target > -1 and block.number >= self.store.target:
                    self.running = False

            if self.post_callback is not None:
                self.post_callback()

            self.idle(interval)

    def process_single(self, conn, block, tx):
        """Pass a single transaction to the filter of the current session.

        Requires that ``run`` has set up ``self.session``.

        :param conn: Connection object, passed through to the session filter.
        :param block: Block the transaction belongs to.
        :param tx: Transaction to filter.
        """
        self.session.filter(conn, block, tx)

    def process(self, conn, item, block):
        """Process a retrieved block. Must be implemented by the extending class.

        :param conn: Connection object.
        :param item: Sync item currently being processed.
        :param block: Block as returned by ``get``.
        :raises NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError()

    def get(self, conn, item=None):
        """Retrieve the next block. Must be implemented by the extending class.

        The polling loop calls this method as ``get(conn, item)``. It handles
        ``SyncDone`` (stop the loop) and ``NoBlockForYou`` (end the current
        polling round) raised from it.

        :param conn: Connection object.
        :param item: Sync item currently being processed.
        :raises NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError()
2022-04-28 08:45:59 +02:00