Repurpose loop callbacks

This commit is contained in:
nolash 2021-04-09 16:07:10 +02:00
parent 1eceae6353
commit c4f6bc807b
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 31 additions and 22 deletions

View File

@ -298,12 +298,13 @@ class SyncerBackend:
class MemBackend: class MemBackend:
def __init__(self, chain_spec, object_id): def __init__(self, chain_spec, object_id, target_block=None):
self.object_id = object_id self.object_id = object_id
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.block_height = 0 self.block_height = 0
self.tx_height = 0 self.tx_height = 0
self.flags = 0 self.flags = 0
self.target_block = target_block
self.db_session = None self.db_session = None
@ -325,6 +326,10 @@ class MemBackend:
return ((self.block_height, self.tx_height), self.flags) return ((self.block_height, self.tx_height), self.flags)
def target(self):
return (self.target_block, self.flags)
def register_filter(self, name): def register_filter(self, name):
pass pass

View File

@ -22,8 +22,8 @@ from chainsyncer.error import (
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
def noop_callback(block_number, tx_index, s=None): def noop_callback(block, tx):
logg.debug('noop callback ({},{}) {}'.format(block_number, tx_index, s)) logg.debug('noop callback ({},{})'.format(block, tx))
class Syncer: class Syncer:
@ -32,13 +32,14 @@ class Syncer:
yield_delay=0.005 yield_delay=0.005
signal_set = False signal_set = False
def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback): def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None):
self.cursor = None self.cursor = None
self.running = True self.running = True
self.backend = backend self.backend = backend
self.filter = SyncFilter(backend) self.filter = SyncFilter(backend)
self.progress_callback = progress_callback self.block_callback = block_callback
self.loop_callback = loop_callback self.pre_callback = pre_callback
self.post_callback = post_callback
if not Syncer.signal_set: if not Syncer.signal_set:
signal.signal(signal.SIGINT, Syncer.__sig_terminate) signal.signal(signal.SIGINT, Syncer.__sig_terminate)
signal.signal(signal.SIGTERM, Syncer.__sig_terminate) signal.signal(signal.SIGTERM, Syncer.__sig_terminate)
@ -73,18 +74,18 @@ class Syncer:
class BlockPollSyncer(Syncer): class BlockPollSyncer(Syncer):
def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback): def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None):
super(BlockPollSyncer, self).__init__(backend, loop_callback, progress_callback) super(BlockPollSyncer, self).__init__(backend, pre_callback, block_callback, post_callback)
def loop(self, interval, conn): def loop(self, interval, conn):
(g, flags) = self.backend.get() #(g, flags) = self.backend.get()
last_tx = g[1] #last_tx = g[1]
last_block = g[0] #last_block = g[0]
self.progress_callback(last_block, last_tx, 'loop started') #self.progress_callback(last_block, last_tx, 'loop started')
while self.running and Syncer.running_global: while self.running and Syncer.running_global:
if self.loop_callback != None: if self.pre_callback != None:
self.loop_callback(last_block, last_tx) self.pre_callback()
while True and Syncer.running_global: while True and Syncer.running_global:
try: try:
block = self.get(conn) block = self.get(conn)
@ -97,12 +98,16 @@ class BlockPollSyncer(Syncer):
# except sqlalchemy.exc.OperationalError as e: # except sqlalchemy.exc.OperationalError as e:
# logg.error('database error: {}'.format(e)) # logg.error('database error: {}'.format(e))
# break # break
last_block = block.number
if self.block_callback != None:
self.block_callback(block, None)
last_block = block
self.process(conn, block) self.process(conn, block)
start_tx = 0 start_tx = 0
self.progress_callback(last_block, last_tx, 'processed block {}'.format(self.backend.get()))
time.sleep(self.yield_delay) time.sleep(self.yield_delay)
self.progress_callback(last_block + 1, last_tx, 'loop ended') if self.post_callback != None:
self.post_callback()
time.sleep(interval) time.sleep(interval)
@ -117,7 +122,6 @@ class HeadSyncer(BlockPollSyncer):
tx = block.tx(i) tx = block.tx(i)
rcpt = conn.do(receipt(tx.hash)) rcpt = conn.do(receipt(tx.hash))
tx.apply_receipt(rcpt) tx.apply_receipt(rcpt)
self.progress_callback(block.number, i, 'processing {}'.format(repr(tx)))
self.backend.set(block.number, i) self.backend.set(block.number, i)
self.filter.apply(conn, block, tx) self.filter.apply(conn, block, tx)
except IndexError as e: except IndexError as e:
@ -146,8 +150,8 @@ class HeadSyncer(BlockPollSyncer):
class HistorySyncer(HeadSyncer): class HistorySyncer(HeadSyncer):
def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback): def __init__(self, backend, pre_callback=None, block_callback=None, post_callback=None):
super(HeadSyncer, self).__init__(backend, loop_callback, progress_callback) super(HeadSyncer, self).__init__(backend, pre_callback, block_callback, post_callback)
self.block_target = None self.block_target = None
(block_number, flags) = self.backend.target() (block_number, flags) = self.backend.target()
if block_number == None: if block_number == None:

View File

@ -3,4 +3,4 @@ SQLAlchemy==1.3.20
confini~=0.3.6rc3 confini~=0.3.6rc3
semver==2.13.0 semver==2.13.0
hexathon~=0.0.1a7 hexathon~=0.0.1a7
chainlib~=0.0.2a1 chainlib~=0.0.2a6

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chainsyncer name = chainsyncer
version = 0.0.1a21 version = 0.0.1a22
description = Generic blockchain syncer driver description = Generic blockchain syncer driver
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no