chainsyncer/chainsyncer/db/models/sync.py

203 lines
6.2 KiB
Python
Raw Normal View History

2021-02-03 20:55:39 +01:00
# standard imports
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
class BlockchainSync(SessionBase):
"""Syncer control backend.
2021-08-27 14:04:34 +02:00
:param chain_str: Chain spec string representation
:type chain_str: str
2021-02-03 20:55:39 +01:00
:param block_start: Block number to start sync from
:type block_start: number
:param tx_start: Block transaction number to start sync from
:type tx_start: number
:param block_target: Block number to sync until, inclusive
:type block_target: number
"""
2021-02-03 21:10:08 +01:00
__tablename__ = 'chain_sync'
2021-02-03 20:55:39 +01:00
blockchain = Column(String)
2021-08-27 14:04:34 +02:00
"""Chainspec string specifying the blockchain the syncer is running against."""
2021-02-03 20:55:39 +01:00
block_start = Column(Integer)
2021-08-27 14:04:34 +02:00
"""The block height at the start of syncer."""
2021-02-03 20:55:39 +01:00
tx_start = Column(Integer)
2021-08-27 14:04:34 +02:00
"""The transaction index at the start of syncer."""
2021-02-03 20:55:39 +01:00
block_cursor = Column(Integer)
2021-08-27 14:04:34 +02:00
"""The block height for the current state of the syncer."""
2021-02-03 20:55:39 +01:00
tx_cursor = Column(Integer)
2021-08-27 14:04:34 +02:00
"""The transaction index for the current state of the syncer."""
2021-02-03 20:55:39 +01:00
block_target = Column(Integer)
2021-08-27 14:04:34 +02:00
"""The block height at which the syncer should terminate. Will be None for an open-ended syncer."""
2021-02-03 20:55:39 +01:00
date_created = Column(DateTime, default=datetime.datetime.utcnow)
2021-08-27 14:04:34 +02:00
"""Datetime when syncer was first created."""
2021-02-03 20:55:39 +01:00
date_updated = Column(DateTime)
2021-08-27 14:04:34 +02:00
"""Datetime of the latest update of the syncer state."""
def __init__(self, chain_str, block_start, tx_start, block_target=None):
self.blockchain = chain_str
self.block_start = block_start
self.tx_start = tx_start
self.block_cursor = block_start
self.tx_cursor = tx_start
self.block_target = block_target
self.date_created = datetime.datetime.utcnow()
self.date_updated = datetime.datetime.utcnow()
2021-02-03 20:55:39 +01:00
@staticmethod
2021-08-27 14:04:34 +02:00
def first(chain_str, session=None):
2021-02-03 20:55:39 +01:00
"""Check if a sync session for the specified chain already exists.
2021-08-27 14:04:34 +02:00
:param chain_str: Chain spec string representation
:type chain_str: str
2021-02-03 20:55:39 +01:00
:param session: Session to use. If not specified, a separate session will be created for this method only.
2021-08-27 14:04:34 +02:00
:type session: sqlalchemy.orm.session.Sessoin
:returns: Database primary key id of sync record, or None if insert failed
:rtype: number
2021-02-03 20:55:39 +01:00
"""
2021-04-04 15:03:58 +02:00
session = SessionBase.bind_session(session)
2021-02-03 20:55:39 +01:00
q = session.query(BlockchainSync.id)
2021-08-27 14:04:34 +02:00
q = q.filter(BlockchainSync.blockchain==chain_str)
2021-02-03 20:55:39 +01:00
o = q.first()
2021-04-04 15:03:58 +02:00
if o == None:
SessionBase.release_session(session)
return None
sync_id = o.id
SessionBase.release_session(session)
return sync_id
2021-02-03 20:55:39 +01:00
@staticmethod
2021-04-04 15:03:58 +02:00
def get_last(session=None, live=True):
2021-08-27 14:04:34 +02:00
"""Get the most recent syncer record.
If live is set, only the latest open-ended syncer will be returned.
2021-02-03 20:55:39 +01:00
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
2021-08-27 14:04:34 +02:00
:param live: Match only open-ended syncers
:type live: bool
:returns: Syncer database id
:rtype: int
2021-02-03 20:55:39 +01:00
"""
2021-04-04 15:03:58 +02:00
session = SessionBase.bind_session(session)
q = session.query(BlockchainSync.id)
if live:
q = q.filter(BlockchainSync.block_target==None)
else:
q = q.filter(BlockchainSync.block_target!=None)
2021-02-03 20:55:39 +01:00
q = q.order_by(BlockchainSync.date_created.desc())
2021-04-04 15:03:58 +02:00
object_id = q.first()
2021-02-03 20:55:39 +01:00
2021-04-04 15:03:58 +02:00
SessionBase.release_session(session)
if object_id == None:
return None
2021-02-03 20:55:39 +01:00
2021-04-04 15:03:58 +02:00
return object_id[0]
2021-02-03 20:55:39 +01:00
@staticmethod
def get_unsynced(session=None):
"""Get previous bounded sync sessions that did not complete.
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:returns: Syncer database ids
2021-08-27 14:04:34 +02:00
:rtype: list
2021-02-03 20:55:39 +01:00
"""
unsynced = []
local_session = False
if session == None:
session = SessionBase.create_session()
local_session = True
q = session.query(BlockchainSync.id)
q = q.filter(BlockchainSync.block_target!=None)
q = q.filter(BlockchainSync.block_cursor<BlockchainSync.block_target)
q = q.order_by(BlockchainSync.date_created.asc())
for u in q.all():
unsynced.append(u[0])
if local_session:
session.close()
return unsynced
def set(self, block_height, tx_height):
2021-08-27 14:04:34 +02:00
"""Set the cursor height of the syncer instance.
2021-02-03 20:55:39 +01:00
Only manipulates object, does not transaction or commit to backend.
:param block_height: Block number
:type block_height: number
:param tx_height: Block transaction number
:type tx_height: number
2021-08-27 14:04:34 +02:00
:rtype: tuple
:returns: Stored block height, transaction index
2021-02-03 20:55:39 +01:00
"""
self.block_cursor = block_height
self.tx_cursor = tx_height
2021-04-04 15:03:58 +02:00
self.date_updated = datetime.datetime.utcnow()
return (self.block_cursor, self.tx_cursor,)
2021-02-03 20:55:39 +01:00
def cursor(self):
"""Get current state of cursor from cached instance.
2021-08-27 14:04:34 +02:00
:returns: Block height, transaction index
2021-02-03 20:55:39 +01:00
:rtype: tuple
"""
return (self.block_cursor, self.tx_cursor)
def start(self):
"""Get sync block start position from cached instance.
2021-08-27 14:04:34 +02:00
:returns: Block height, transaction index
2021-02-03 20:55:39 +01:00
:rtype: tuple
"""
return (self.block_start, self.tx_start)
def target(self):
"""Get sync block upper bound from cached instance.
2021-08-27 14:04:34 +02:00
:returns: Block number. Returns None if syncer is open-ended.
:rtype: int
2021-02-03 20:55:39 +01:00
"""
return self.block_target
def chain(self):
2021-08-27 14:04:34 +02:00
"""Get chain string representation for which the cached instance represents.
2021-02-03 20:55:39 +01:00
"""
return self.blockchain
2021-04-04 15:03:58 +02:00
def __str__(self):
return """object_id: {}
start: {}:{}
cursor: {}:{}
target: {}
""".format(
self.id,
self.block_start,
self.tx_start,
self.block_cursor,
self.tx_cursor,
self.block_target,
)