From c599a80a6c9dabf8a8d8c6dbbbd1f03776dff8c5 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 23 Feb 2021 22:56:12 +0100 Subject: [PATCH] Remove unique chain row constraint --- chainsyncer/backend.py | 14 ++++-- chainsyncer/db/migrations/sqlalchemy.py | 2 +- chainsyncer/db/models/sync.py | 1 + chainsyncer/driver.py | 58 +++++++++++++++++++++---- chainsyncer/error.py | 5 ++- setup.cfg | 2 +- 6 files changed, 66 insertions(+), 16 deletions(-) diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py index 0dddce1..7f4ddc1 100644 --- a/chainsyncer/backend.py +++ b/chainsyncer/backend.py @@ -10,7 +10,7 @@ from chainsyncer.db.models.sync import BlockchainSync from chainsyncer.db.models.filter import BlockchainSyncFilter from chainsyncer.db.models.base import SessionBase -logg = logging.getLogger() +logg = logging.getLogger(__name__) class SyncerBackend: @@ -203,8 +203,8 @@ class SyncerBackend: session.close() for object_id in object_ids: - logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) s = SyncerBackend(chain_spec, object_id) + logg.debug('resume unfinished {}'.format(s)) syncers.append(s) session = SessionBase.create_session() @@ -237,9 +237,10 @@ class SyncerBackend: session.add(of) session.commit() - syncers.append(SyncerBackend(chain_spec, object_id)) + backend = SyncerBackend(chain_spec, object_id) + syncers.append(backend) - logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height)) + logg.debug('last live session resume {}'.format(backend)) session.close() @@ -287,6 +288,11 @@ class SyncerBackend: self.connect() self.db_object_filter.set(n) self.disconnect() + + + + def __str__(self): + return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target()) diff --git a/chainsyncer/db/migrations/sqlalchemy.py b/chainsyncer/db/migrations/sqlalchemy.py index 2d50f99..0b78c1e 100644 --- a/chainsyncer/db/migrations/sqlalchemy.py +++ b/chainsyncer/db/migrations/sqlalchemy.py @@ -11,7 +11,7 @@ def r0_0_1_u(): op.create_table( 'chain_sync', sa.Column('id', sa.Integer, primary_key=True), - sa.Column('blockchain', sa.String, nullable=False, unique=True), + sa.Column('blockchain', sa.String, nullable=False), sa.Column('block_start', sa.Integer, nullable=False, default=0), sa.Column('tx_start', sa.Integer, nullable=False, default=0), sa.Column('block_cursor', sa.Integer, nullable=False, default=0), diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py index d1d8ff5..036c821 100644 --- a/chainsyncer/db/models/sync.py +++ b/chainsyncer/db/models/sync.py @@ -126,6 +126,7 @@ class BlockchainSync(SessionBase): """ self.block_cursor = block_height self.tx_cursor = tx_height + self.date_updated = datetime.datetime.utcnow() return (self.block_cursor, self.tx_cursor,) diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py index 6f8d9b5..0a6ca3c 100644 --- a/chainsyncer/driver.py +++ b/chainsyncer/driver.py @@ -13,12 +13,16 @@ from chainlib.eth.tx import receipt # local imports from chainsyncer.filter import SyncFilter +from chainsyncer.error import ( + SyncDone, + NoBlockForYou, + ) -logg = logging.getLogger() +logg = logging.getLogger(__name__) def noop_callback(block_number, tx_index, s=None): - logg.debug('({},{}) {}'.format(block_number, tx_index, s)) + logg.debug('noop callback ({},{}) {}'.format(block_number, tx_index, s)) class Syncer: @@ -83,8 +87,10 @@ class BlockPollSyncer(Syncer): while True and Syncer.running_global: try: block = self.get(conn) - except Exception as e: - logg.debug('erro {}'.format(e)) + except SyncDone as e: + logg.info('sync done: {}'.format(e)) + return self.backend.get() + except NoBlockForYou as e: break last_block = block.number self.process(conn, block) @@ -97,10 +103,6 @@ class BlockPollSyncer(Syncer): class HeadSyncer(BlockPollSyncer): - def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback): - super(HeadSyncer, self).__init__(backend, loop_callback, progress_callback) - - def process(self, conn, block): logg.debug('process block {}'.format(block)) i = 0 @@ -120,13 +122,51 @@ class HeadSyncer(BlockPollSyncer): def get(self, conn): - #(block_number, tx_number) = self.backend.get() (height, flags) = self.backend.get() block_number = height[0] block_hash = [] o = block_by_number(block_number) r = conn.do(o) + if r == None: + raise NoBlockForYou() b = Block(r) logg.debug('get {}'.format(b)) return b + + + def __str__(self): + return '[headsyncer] {}'.format(str(self.backend)) + + +class HistorySyncer(HeadSyncer): + + def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback): + super(HeadSyncer, self).__init__(backend, loop_callback, progress_callback) + self.block_target = None + (block_number, flags) = self.backend.target() + if block_number == None: + raise AttributeError('backend has no future target. Use HeadSyner instead') + self.block_target = block_number + + + def get(self, conn): + (height, flags) = self.backend.get() + if self.block_target < height[0]: + raise SyncDone(self.block_target) + block_number = height[0] + block_hash = [] + o = block_by_number(block_number) + r = conn.do(o) + if r == None: + raise NoBlockForYou() + b = Block(r) + logg.debug('get {}'.format(b)) + + return b + + + def __str__(self): + return '[historysyncer] {}'.format(str(self.backend)) + + diff --git a/chainsyncer/error.py b/chainsyncer/error.py index 9d14d45..33b865a 100644 --- a/chainsyncer/error.py +++ b/chainsyncer/error.py @@ -1,8 +1,11 @@ -class LoopDone(Exception): +class SyncDone(Exception): """Exception raised when a syncing is complete. """ pass +class NoBlockForYou(Exception): + pass + class RequestError(Exception): pass diff --git a/setup.cfg b/setup.cfg index 1fb77ad..e6414b9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a15 +version = 0.0.1a16 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no