Implement driver processing

This commit is contained in:
lash 2022-03-18 01:11:30 +00:00
parent 78bd6ca538
commit 75a6d2975e
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
7 changed files with 108 additions and 43 deletions

View File

@ -1,6 +1,7 @@
# standard imports
import logging
import time
import signal
# local imports
from chainsyncer.error import (
@ -17,8 +18,18 @@ NS_DIV = 1000000000
class SyncDriver:
running_global = True
"""If set to false syncer will terminate polling loop."""
yield_delay=0.005
"""Delay between each processed block."""
signal_request = [signal.SIGINT, signal.SIGTERM]
"""Signals to catch to request shutdown."""
signal_set = False
"""Whether shutdown signal has been received."""
name = 'base'
"""Syncer name, to be overriden for each extended implementation."""
def __init__(self, conn, store, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
def __init__(self, store, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
self.store = store
self.running = True
self.pre_callback = pre_callback
@ -27,7 +38,7 @@ class SyncDriver:
self.idle_callback = idle_callback
self.last_start = 0
self.clock_id = time.CLOCK_MONOTONIC_RAW
self.session = SyncSession(self.store)
self.store.start(offset=offset, target=target)
def __sig_terminate(self, sig, frame):
@ -43,15 +54,16 @@ class SyncDriver:
self.running = False
def run(self):
def run(self, conn):
while self.running_global:
item = self.store.next_item()
self.session = SyncSession(self.store)
item = self.session.start()
logg.debug('item {}'.format(item))
if item == None:
self.running = False
self.running_global = False
break
self.loop(item)
self.loop(conn, item)
def idle(self, interval):
@ -80,20 +92,18 @@ class SyncDriver:
time.sleep(interval)
def loop(self, item):
def loop(self, conn, item):
tx_start = item.tx_cursor
while self.running and SyncDriver.running_global:
self.last_start = time.clock_gettime_ns(self.clock_id)
if self.pre_callback != None:
self.pre_callback()
while True and self.running:
if start_tx > 0:
start_tx -= 1
continue
try:
block = self.get(conn)
block = self.get(conn, item)
except SyncDone as e:
logg.info('all blocks sumitted for processing: {}'.format(e))
return self.backend.get()
return
except NoBlockForYou as e:
break
if self.block_callback != None:
@ -101,21 +111,22 @@ class SyncDriver:
last_block = block
try:
self.process(conn, block)
self.process(conn, item, block, tx_start)
except IndexError:
self.backend.set(block.number + 1, 0)
start_tx = 0
item.next(advance_block=True)
tx_start = 0
time.sleep(self.yield_delay)
if self.post_callback != None:
self.post_callback()
self.idle(interval)
def process_single(self, conn, block, tx):
self.session.filter(conn, block, tx)
def process(self, conn, block):
def process(self, conn, block, tx_start):
raise NotImplementedError()

View File

@ -17,16 +17,10 @@ class SyncSession:
self.filters = self.session_store.filters
# def register(self, fltr):
# if self.started:
# raise RuntimeError('filters cannot be changed after syncer start')
# self.session_store.register(fltr)
# self.filters.append(fltr)
def start(self, offset=0, target=-1):
self.session_store.start(offset=offset, target=target)
self.item = self.session_store.next_item()
return self.item
def filter(self, conn, block, tx):
@ -43,4 +37,5 @@ class SyncSession:
raise BackendError('filter state inconsitent with filter list')
except FilterDone:
self.item.reset()
self.next()
self.session_store.disconnect()

View File

@ -47,8 +47,7 @@ class SyncState:
s = fltr.common_name()
self.state_store.add(s)
n = self.state_store.from_name(s)
logg.debug('add {} {} {}'.format(s, n, self))
logg.debug('add filter {} {} {}'.format(s, n, self))
def sum(self):

View File

@ -33,7 +33,8 @@ class SyncFsItem:
if started:
match_state = self.sync_state.SYNC
v = self.sync_state.get(self.state_key)
self.cursor = int.from_bytes(v, 'big')
self.cursor = int.from_bytes(v[:4], 'big')
self.tx_cursor = int.from_bytes(v[4:], 'big')
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid:
raise LockError(s)
@ -60,11 +61,22 @@ class SyncFsItem:
raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
def next(self, advance_block=False):
v = self.sync_state.get(self.state_key)
block_number = int.from_bytes(v, 'big')
block_number = int.from_bytes(v[:4], 'big')
tx_index = int.from_bytes(v[4:], 'big')
if advance_block:
block_number += 1
tx_index = 0
if self.target >= 0 and block_number > self.target:
raise SyncDone(self.target)
else:
tx_index += 1
self.cursor = block_number
self.tx_cursor = tx_index
v = block_number.to_bytes(4, 'big')
self.sync_state.replace(self.state_key, v)
@ -175,7 +187,6 @@ class SyncFsStore:
def __load(self, target):
self.state.sync(self.state.NEW)
self.state.sync(self.state.SYNC)
@ -220,13 +231,18 @@ class SyncFsStore:
def start(self, offset=0, target=-1):
if self.started:
return
self.__load(target)
if self.first:
block_number = offset
block_number_bytes = block_number.to_bytes(4, 'big')
state_bytes = block_number.to_bytes(4, 'big')
tx_index = 0
state_bytes += tx_index.to_bytes(4, 'big')
block_number_str = str(block_number)
self.state.put(block_number_str, block_number_bytes)
self.state.put(block_number_str, state_bytes)
self.filter_state.put(block_number_str)
o = SyncFsItem(block_number, target, self.state, self.filter_state)
self.items[block_number] = o
@ -239,10 +255,12 @@ class SyncFsStore:
def stop(self):
if self.target == 0:
block_number = self.height + 1
block_number_bytes = block_number.to_bytes(4, 'big')
self.state.put(str(block_number), block_number_bytes)
# if self.target == 0:
# block_number = self.height + 1
# block_number_bytes = block_number.to_bytes(4, 'big')
# block_number_bytes = block_number.to_bytes(4, 'big')
# self.state.put(str(block_number), block_number_bytes)
pass
def get(self, k):

View File

@ -10,6 +10,7 @@ from shep.state import State
# local imports
#from chainsyncer.driver.history import HistorySyncer
from chainsyncer.error import NoBlockForYou
from chainsyncer.driver import SyncDriver
logg = logging.getLogger().getChild(__name__)
@ -105,6 +106,29 @@ class MockFilter:
return self.brk
class MockDriver(SyncDriver):
def __init__(self, store, offset=0, target=-1):
super(MockDriver, self).__init__(store, offset=offset, target=target)
self.blocks = {}
def add_block(self, block):
self.blocks[block.number] = block
def get(self, conn, item):
return self.blocks[item.cursor]
def process(self, conn, item, block, tx_start):
i = tx_start
while True:
tx = block.tx(i)
self.process_single(conn, block, tx)
item.next()
i += 1
#class TestSyncer(HistorySyncer):
# """Unittest extension of history syncer driver.

View File

@ -183,8 +183,9 @@ class TestFs(unittest.TestCase):
o.release()
with self.assertRaises(FilterDone):
o.advance()
with self.assertRaises(SyncDone):
o.reset()
with self.assertRaises(SyncDone):
o.next(advance_block=True)
if __name__ == '__main__':

View File

@ -20,6 +20,7 @@ from chainsyncer.unittest import (
MockConn,
MockTx,
MockBlock,
MockDriver,
)
from chainsyncer.driver import SyncDriver
@ -53,15 +54,31 @@ class TestFilter(unittest.TestCase):
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(1, [tx_hash])
with self.assertRaises(SyncDone):
session.filter(self.conn, block, tx)
self.assertEqual(len(fltr_one.contents), 2)
def test_driver(self):
drv = SyncDriver(self.conn, self.store)
drv.run()
drv = MockDriver(self.store, target=1)
tx_hash = os.urandom(32).hex()
tx = MockTx(0, tx_hash)
block = MockBlock(0, [tx_hash])
drv.add_block(block)
tx_hash_one = os.urandom(32).hex()
tx = MockTx(0, tx_hash_one)
tx_hash_two = os.urandom(32).hex()
tx = MockTx(1, tx_hash_two)
block = MockBlock(1, [tx_hash_one, tx_hash_two])
drv.add_block(block)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
with self.assertRaises(SyncDone):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 3)
if __name__ == '__main__':