53
chainsyncer/db/__init__.py
Normal file
53
chainsyncer/db/__init__.py
Normal file
@@ -0,0 +1,53 @@
|
||||
# standard imports
|
||||
import os
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def dsn_from_config(config):
|
||||
"""Generate a dsn string from the provided config dict.
|
||||
|
||||
The config dict must include all well-known database connection parameters, and must implement the method "get(key)" to retrieve them. Any missing parameters will be be rendered as the literal string "None"
|
||||
|
||||
:param config: Configuration object
|
||||
:type config: Varies
|
||||
:returns: dsn string
|
||||
:rtype: str
|
||||
"""
|
||||
scheme = config.get('DATABASE_ENGINE')
|
||||
if config.get('DATABASE_DRIVER') != None:
|
||||
scheme += '+{}'.format(config.get('DATABASE_DRIVER'))
|
||||
|
||||
dsn = ''
|
||||
dsn_out = ''
|
||||
if config.get('DATABASE_ENGINE') == 'sqlite':
|
||||
dsn = '{}:///{}'.format(
|
||||
scheme,
|
||||
config.get('DATABASE_NAME'),
|
||||
)
|
||||
dsn_out = dsn
|
||||
|
||||
else:
|
||||
dsn = '{}://{}:{}@{}:{}/{}'.format(
|
||||
scheme,
|
||||
config.get('DATABASE_USER'),
|
||||
config.get('DATABASE_PASSWORD'),
|
||||
config.get('DATABASE_HOST'),
|
||||
config.get('DATABASE_PORT'),
|
||||
config.get('DATABASE_NAME'),
|
||||
)
|
||||
dsn_out = '{}://{}:{}@{}:{}/{}'.format(
|
||||
scheme,
|
||||
config.get('DATABASE_USER'),
|
||||
'***',
|
||||
config.get('DATABASE_HOST'),
|
||||
config.get('DATABASE_PORT'),
|
||||
config.get('DATABASE_NAME'),
|
||||
)
|
||||
logg.debug('parsed dsn from config: {}'.format(dsn_out))
|
||||
return dsn
|
||||
|
||||
0
chainsyncer/db/migrations/__init__.py
Normal file
0
chainsyncer/db/migrations/__init__.py
Normal file
1
chainsyncer/db/migrations/default/README
Normal file
1
chainsyncer/db/migrations/default/README
Normal file
@@ -0,0 +1 @@
|
||||
Generic single-database configuration.
|
||||
85
chainsyncer/db/migrations/default/alembic.ini
Normal file
85
chainsyncer/db/migrations/default/alembic.ini
Normal file
@@ -0,0 +1,85 @@
|
||||
# A generic, single database configuration.
|
||||
|
||||
[alembic]
|
||||
# path to migration scripts
|
||||
script_location = .
|
||||
|
||||
# template used to generate migration files
|
||||
# file_template = %%(rev)s_%%(slug)s
|
||||
|
||||
# timezone to use when rendering the date
|
||||
# within the migration file as well as the filename.
|
||||
# string value is passed to dateutil.tz.gettz()
|
||||
# leave blank for localtime
|
||||
# timezone =
|
||||
|
||||
# max length of characters to apply to the
|
||||
# "slug" field
|
||||
# truncate_slug_length = 40
|
||||
|
||||
# set to 'true' to run the environment during
|
||||
# the 'revision' command, regardless of autogenerate
|
||||
# revision_environment = false
|
||||
|
||||
# set to 'true' to allow .pyc and .pyo files without
|
||||
# a source .py file to be detected as revisions in the
|
||||
# versions/ directory
|
||||
# sourceless = false
|
||||
|
||||
# version location specification; this defaults
|
||||
# to ./versions. When using multiple version
|
||||
# directories, initial revisions must be specified with --version-path
|
||||
# version_locations = %(here)s/bar %(here)s/bat ./versions
|
||||
|
||||
# the output encoding used when revision files
|
||||
# are written from script.py.mako
|
||||
# output_encoding = utf-8
|
||||
|
||||
sqlalchemy.url = driver://user:pass@localhost/dbname
|
||||
|
||||
|
||||
[post_write_hooks]
|
||||
# post_write_hooks defines scripts or Python functions that are run
|
||||
# on newly generated revision scripts. See the documentation for further
|
||||
# detail and examples
|
||||
|
||||
# format using "black" - use the console_scripts runner, against the "black" entrypoint
|
||||
# hooks=black
|
||||
# black.type=console_scripts
|
||||
# black.entrypoint=black
|
||||
# black.options=-l 79
|
||||
|
||||
# Logging configuration
|
||||
[loggers]
|
||||
keys = root,sqlalchemy,alembic
|
||||
|
||||
[handlers]
|
||||
keys = console
|
||||
|
||||
[formatters]
|
||||
keys = generic
|
||||
|
||||
[logger_root]
|
||||
#level = WARN
|
||||
handlers = console
|
||||
qualname =
|
||||
|
||||
[logger_sqlalchemy]
|
||||
level = WARN
|
||||
handlers =
|
||||
qualname = sqlalchemy.engine
|
||||
|
||||
[logger_alembic]
|
||||
level = INFO
|
||||
handlers =
|
||||
qualname = alembic
|
||||
|
||||
[handler_console]
|
||||
class = StreamHandler
|
||||
args = (sys.stderr,)
|
||||
level = NOTSET
|
||||
formatter = generic
|
||||
|
||||
[formatter_generic]
|
||||
format = %(levelname)-5.5s [%(name)s] %(message)s
|
||||
datefmt = %H:%M:%S
|
||||
77
chainsyncer/db/migrations/default/env.py
Normal file
77
chainsyncer/db/migrations/default/env.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from logging.config import fileConfig
|
||||
|
||||
from sqlalchemy import engine_from_config
|
||||
from sqlalchemy import pool
|
||||
|
||||
from alembic import context
|
||||
|
||||
# this is the Alembic Config object, which provides
|
||||
# access to the values within the .ini file in use.
|
||||
config = context.config
|
||||
|
||||
# Interpret the config file for Python logging.
|
||||
# This line sets up loggers basically.
|
||||
fileConfig(config.config_file_name)
|
||||
|
||||
# add your model's MetaData object here
|
||||
# for 'autogenerate' support
|
||||
# from myapp import mymodel
|
||||
# target_metadata = mymodel.Base.metadata
|
||||
target_metadata = None
|
||||
|
||||
# other values from the config, defined by the needs of env.py,
|
||||
# can be acquired:
|
||||
# my_important_option = config.get_main_option("my_important_option")
|
||||
# ... etc.
|
||||
|
||||
|
||||
def run_migrations_offline():
|
||||
"""Run migrations in 'offline' mode.
|
||||
|
||||
This configures the context with just a URL
|
||||
and not an Engine, though an Engine is acceptable
|
||||
here as well. By skipping the Engine creation
|
||||
we don't even need a DBAPI to be available.
|
||||
|
||||
Calls to context.execute() here emit the given string to the
|
||||
script output.
|
||||
|
||||
"""
|
||||
url = config.get_main_option("sqlalchemy.url")
|
||||
context.configure(
|
||||
url=url,
|
||||
target_metadata=target_metadata,
|
||||
literal_binds=True,
|
||||
dialect_opts={"paramstyle": "named"},
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
def run_migrations_online():
|
||||
"""Run migrations in 'online' mode.
|
||||
|
||||
In this scenario we need to create an Engine
|
||||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
connectable = engine_from_config(
|
||||
config.get_section(config.config_ini_section),
|
||||
prefix="sqlalchemy.",
|
||||
poolclass=pool.NullPool,
|
||||
)
|
||||
|
||||
with connectable.connect() as connection:
|
||||
context.configure(
|
||||
connection=connection, target_metadata=target_metadata
|
||||
)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
if context.is_offline_mode():
|
||||
run_migrations_offline()
|
||||
else:
|
||||
run_migrations_online()
|
||||
37
chainsyncer/db/migrations/default/export.py
Normal file
37
chainsyncer/db/migrations/default/export.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.src.sync import (
|
||||
upgrade as upgrade_sync,
|
||||
downgrade as downgrade_sync,
|
||||
)
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.src.sync_tx import (
|
||||
upgrade as upgrade_sync_tx,
|
||||
downgrade as downgrade_sync_tx,
|
||||
)
|
||||
|
||||
def chainsyncer_upgrade(major=0, minor=0, patch=3):
|
||||
r0_0_1_u()
|
||||
if patch >= 3:
|
||||
r0_0_3_u()
|
||||
|
||||
def chainsyncer_downgrade(major=0, minor=0, patch=3):
|
||||
if patch >= 3:
|
||||
r0_0_3_d()
|
||||
r0_0_1_d()
|
||||
|
||||
def r0_0_1_u():
|
||||
upgrade_sync()
|
||||
|
||||
def r0_0_1_d():
|
||||
downgrade_sync()
|
||||
|
||||
|
||||
# 0.0.3
|
||||
|
||||
def r0_0_3_u():
|
||||
upgrade_sync_tx()
|
||||
|
||||
def r0_0_3_d():
|
||||
downgrade_sync_tx()
|
||||
24
chainsyncer/db/migrations/default/script.py.mako
Normal file
24
chainsyncer/db/migrations/default/script.py.mako
Normal file
@@ -0,0 +1,24 @@
|
||||
"""${message}
|
||||
|
||||
Revision ID: ${up_revision}
|
||||
Revises: ${down_revision | comma,n}
|
||||
Create Date: ${create_date}
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
${imports if imports else ""}
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = ${repr(up_revision)}
|
||||
down_revision = ${repr(down_revision)}
|
||||
branch_labels = ${repr(branch_labels)}
|
||||
depends_on = ${repr(depends_on)}
|
||||
|
||||
|
||||
def upgrade():
|
||||
${upgrades if upgrades else "pass"}
|
||||
|
||||
|
||||
def downgrade():
|
||||
${downgrades if downgrades else "pass"}
|
||||
@@ -0,0 +1,14 @@
|
||||
"""base setup
|
||||
|
||||
Revision ID: 452ecfa81de3
|
||||
Revises:
|
||||
Create Date: 2021-07-16 16:29:32.460027
|
||||
|
||||
"""
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '452ecfa81de3'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.src.sync import upgrade, downgrade
|
||||
@@ -0,0 +1,14 @@
|
||||
"""sync-tx
|
||||
|
||||
Revision ID: a2ce6826c5eb
|
||||
Revises: 452ecfa81de3
|
||||
Create Date: 2021-07-16 18:17:53.439721
|
||||
|
||||
"""
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'a2ce6826c5eb'
|
||||
down_revision = '452ecfa81de3'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.src.sync_tx import upgrade, downgrade
|
||||
33
chainsyncer/db/migrations/default/versions/src/sync.py
Normal file
33
chainsyncer/db/migrations/default/versions/src/sync.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'chain_sync',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('blockchain', sa.String, nullable=False),
|
||||
sa.Column('block_start', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('block_target', sa.Integer, nullable=True),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
sa.Column('date_updated', sa.DateTime),
|
||||
)
|
||||
|
||||
op.create_table(
|
||||
'chain_sync_filter',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True),
|
||||
sa.Column('flags', sa.LargeBinary, nullable=True),
|
||||
sa.Column('flags_lock', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('flags_start', sa.LargeBinary, nullable=True),
|
||||
sa.Column('count', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('digest', sa.String(64), nullable=False),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('chain_sync_filter')
|
||||
op.drop_table('chain_sync')
|
||||
17
chainsyncer/db/migrations/default/versions/src/sync_tx.py
Normal file
17
chainsyncer/db/migrations/default/versions/src/sync_tx.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'chain_sync_tx',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('blockchain', sa.String, nullable=False),
|
||||
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=False),
|
||||
sa.Column('flags', sa.LargeBinary, nullable=True),
|
||||
sa.Column('block', sa.Integer, nullable=False),
|
||||
sa.Column('tx', sa.Integer, nullable=False),
|
||||
)
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('chain_sync_tx')
|
||||
37
chainsyncer/db/migrations/sqlalchemy.py
Normal file
37
chainsyncer/db/migrations/sqlalchemy.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.tags.sync import
|
||||
upgrade as upgrade_sync,
|
||||
downgrade as downgrade_sync,
|
||||
)
|
||||
|
||||
from chainsyncer.db.migrations.default.versions.tags.sync_tx import
|
||||
upgrade as upgrade_sync_tx,
|
||||
downgrade as downgrade_sync_tx,
|
||||
)
|
||||
|
||||
def chainsyncer_upgrade(major=0, minor=0, patch=3):
|
||||
r0_0_1_u()
|
||||
if patch >= 3:
|
||||
r0_0_3_u()
|
||||
|
||||
def chainsyncer_downgrade(major=0, minor=0, patch=3):
|
||||
if patch >= 3:
|
||||
r0_0_3_d()
|
||||
r0_0_1_d()
|
||||
|
||||
def r0_0_1_u():
|
||||
upgrade_sync()
|
||||
|
||||
def r0_0_1_d():
|
||||
downgrade_sync()
|
||||
|
||||
|
||||
# 0.0.3
|
||||
|
||||
def r0_0_3_u():
|
||||
upgrade_sync_tx()
|
||||
|
||||
def r0_0_3_d():
|
||||
downgrade_sync_tx()
|
||||
0
chainsyncer/db/models/__init__.py
Normal file
0
chainsyncer/db/models/__init__.py
Normal file
161
chainsyncer/db/models/base.py
Normal file
161
chainsyncer/db/models/base.py
Normal file
@@ -0,0 +1,161 @@
|
||||
# stanard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, Integer
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.pool import (
|
||||
StaticPool,
|
||||
QueuePool,
|
||||
AssertionPool,
|
||||
NullPool,
|
||||
)
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
Model = declarative_base(name='Model')
|
||||
|
||||
CONNECTION_OVERFLOW_FACTOR = 3
|
||||
CONNECTION_RECYCLE_AFTER = 60
|
||||
|
||||
|
||||
class SessionBase(Model):
|
||||
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
|
||||
"""
|
||||
__abstract__ = True
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
engine = None
|
||||
"""Database connection engine of the running aplication"""
|
||||
sessionmaker = None
|
||||
"""Factory object responsible for creating sessions from the connection pool"""
|
||||
transactional = True
|
||||
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
|
||||
poolable = True
|
||||
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
|
||||
procedural = True
|
||||
"""Whether the database backend supports stored procedures"""
|
||||
localsessions = {}
|
||||
"""Contains dictionary of sessions initiated by db model components"""
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create_session():
|
||||
"""Creates a new database session.
|
||||
"""
|
||||
return SessionBase.sessionmaker()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _set_engine(engine):
|
||||
"""Sets the database engine static property
|
||||
|
||||
:param engine: The sqlalchemy engine
|
||||
:type engine: sqlalchemy.engine.Engine
|
||||
"""
|
||||
SessionBase.engine = engine
|
||||
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def connect(dsn, pool_size=16, debug=False):
|
||||
"""Create new database connection engine and connect to database backend.
|
||||
|
||||
The pool_size argument controls the behavior of the connection pool.
|
||||
|
||||
If the pool_size is greater than 1, and the engine has connection pool settings, The connection pool will be set up with the given number of connections. By default, it allows for 3x connection overflow (CONNECTION_OVERFLOW_FACTOR), and connection recycling after 60 seconds of inactivity (CONNECTION_RECYCLE_AFTER).
|
||||
|
||||
If the pool_size is 1 and debug mode is off, the StaticPool class (single connection pool) will be used. If debug is on, AssertionPool will be used (which raises assertionerror if more than a single connection is attempted at any one time by the process).
|
||||
|
||||
If the underlying engine does not have pooling capabilities, the pool_size parameter toggles the connection class used. If pool_size is set to 0, the NullPool will be used (build a new connection for every session). If pool_size is set to a positive number, the StaticPool will be used, keeping a single connection for all sessions.
|
||||
|
||||
:param dsn: DSN string defining connection
|
||||
:type dsn: str
|
||||
:param pool_size: Size of connection pool
|
||||
:type pool_size: int
|
||||
:param debug: Activate sql debug mode (outputs sql statements)
|
||||
:type debug: bool
|
||||
"""
|
||||
e = None
|
||||
if SessionBase.poolable:
|
||||
poolclass = QueuePool
|
||||
if pool_size > 1:
|
||||
e = create_engine(
|
||||
dsn,
|
||||
max_overflow=pool_size * CONNECTION_OVERFLOW_FACTOR,
|
||||
pool_pre_ping=True,
|
||||
pool_size=pool_size,
|
||||
pool_recycle=CONNECTION_RECYCLE_AFTER,
|
||||
poolclass=poolclass,
|
||||
echo=debug,
|
||||
)
|
||||
else:
|
||||
if debug:
|
||||
poolclass = AssertionPool
|
||||
else:
|
||||
poolclass = StaticPool
|
||||
|
||||
e = create_engine(
|
||||
dsn,
|
||||
poolclass=poolclass,
|
||||
echo=debug,
|
||||
)
|
||||
else:
|
||||
pool_class = StaticPool
|
||||
if pool_size < 1:
|
||||
pool_class = NullPool
|
||||
e = create_engine(
|
||||
dsn,
|
||||
poolclass=pool_class,
|
||||
echo=debug,
|
||||
)
|
||||
|
||||
SessionBase._set_engine(e)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def disconnect():
|
||||
"""Disconnect from database and free resources.
|
||||
"""
|
||||
SessionBase.engine.dispose()
|
||||
SessionBase.engine = None
|
||||
|
||||
|
||||
@staticmethod
|
||||
def bind_session(session=None):
|
||||
"""Convenience function to enforce database session responsilibity in call stacks where it is unclear which layer will create a database session.
|
||||
|
||||
If the session argument is None, the method will create and return a new database session. A reference to the database session will be statically stored in the SessionBase class, and must be explicitly released with release_session.
|
||||
|
||||
When an existing session in passed as the argument, this method simply returns back the same session.
|
||||
|
||||
:param session: An sqlalchemy session
|
||||
:type session: session.orm.Session
|
||||
:rtype: session.orm.Session
|
||||
:returns: An sqlalchemy session
|
||||
"""
|
||||
localsession = session
|
||||
if localsession == None:
|
||||
localsession = SessionBase.create_session()
|
||||
localsession_key = str(id(localsession))
|
||||
logg.debug('creating new session {}'.format(localsession_key))
|
||||
SessionBase.localsessions[localsession_key] = localsession
|
||||
return localsession
|
||||
|
||||
|
||||
@staticmethod
|
||||
def release_session(session):
|
||||
"""Checks if a reference to the given session exists in the SessionBase session store, and if it does commits the transaction and closes the session.
|
||||
|
||||
:param session: An sqlalchemy session
|
||||
:type session: session.orm.Session
|
||||
"""
|
||||
session_key = str(id(session))
|
||||
if SessionBase.localsessions.get(session_key) != None:
|
||||
logg.debug('commit and destroy session {}'.format(session_key))
|
||||
session.commit()
|
||||
session.close()
|
||||
del SessionBase.localsessions[session_key]
|
||||
166
chainsyncer/db/models/filter.py
Normal file
166
chainsyncer/db/models/filter.py
Normal file
@@ -0,0 +1,166 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import hashlib
|
||||
|
||||
# external imports
|
||||
from sqlalchemy import Column, String, Integer, LargeBinary, ForeignKey
|
||||
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
|
||||
|
||||
# local imports
|
||||
from .base import SessionBase
|
||||
from .sync import BlockchainSync
|
||||
from chainsyncer.error import LockError
|
||||
|
||||
zero_digest = bytes(32).hex()
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BlockchainSyncFilter(SessionBase):
|
||||
"""Sync filter sql backend database interface.
|
||||
|
||||
:param chain_sync: BlockchainSync object to use as context for filter
|
||||
:type chain_sync: chainsyncer.db.models.sync.BlockchainSync
|
||||
:param count: Number of filters to track
|
||||
:type count: int
|
||||
:param flags: Filter flag value to instantiate record with
|
||||
:type flags: int
|
||||
:param digest: Filter digest as integrity protection when resuming session, 256 bits, in hex
|
||||
:type digest: str
|
||||
"""
|
||||
|
||||
__tablename__ = 'chain_sync_filter'
|
||||
|
||||
chain_sync_id = Column(Integer, ForeignKey('chain_sync.id'))
|
||||
flags_start = Column(LargeBinary)
|
||||
flags = Column(LargeBinary)
|
||||
flags_lock = Column(Integer)
|
||||
digest = Column(String(64))
|
||||
count = Column(Integer)
|
||||
|
||||
|
||||
def __init__(self, chain_sync, count=0, flags=None, digest=None):
|
||||
if digest == None:
|
||||
digest = zero_digest
|
||||
self.digest = digest
|
||||
self.count = count
|
||||
|
||||
if flags == None:
|
||||
flags = bytearray(0)
|
||||
else:
|
||||
bytecount = int((count - 1) / 8 + 1)
|
||||
flags = flags.to_bytes(bytecount, 'big')
|
||||
self.flags_start = flags
|
||||
self.flags = flags
|
||||
self.flags_lock = 0
|
||||
|
||||
self.chain_sync_id = chain_sync.id
|
||||
|
||||
|
||||
@staticmethod
|
||||
def load(sync_id, session=None):
|
||||
q = session.query(BlockchainSyncFilter)
|
||||
q = q.filter(BlockchainSyncFilter.chain_sync_id==sync_id)
|
||||
o = q.first()
|
||||
if o.is_locked():
|
||||
raise LockError('locked state for flag {} of sync id {} must be manually resolved'.format(o.flags_lock))
|
||||
|
||||
|
||||
def add(self, name):
|
||||
"""Add a new filter to the syncer record.
|
||||
|
||||
The name of the filter is hashed with the current aggregated hash sum of previously added filters.
|
||||
|
||||
:param name: Filter informal name
|
||||
:type name: str
|
||||
"""
|
||||
h = hashlib.new('sha256')
|
||||
h.update(bytes.fromhex(self.digest))
|
||||
h.update(name.encode('utf-8'))
|
||||
z = h.digest()
|
||||
|
||||
old_byte_count = int((self.count - 1) / 8 + 1)
|
||||
new_byte_count = int((self.count) / 8 + 1)
|
||||
|
||||
if old_byte_count != new_byte_count:
|
||||
self.flags = bytearray(1) + self.flags
|
||||
self.count += 1
|
||||
self.digest = z.hex()
|
||||
|
||||
|
||||
def start(self):
|
||||
"""Retrieve the initial filter state of the syncer.
|
||||
|
||||
:rtype: tuple
|
||||
:returns: Filter flag value, filter count, filter digest
|
||||
"""
|
||||
return (int.from_bytes(self.flags_start, 'big'), self.count, self.digest)
|
||||
|
||||
|
||||
def cursor(self):
|
||||
"""Retrieve the current filter state of the syncer.
|
||||
|
||||
:rtype: tuple
|
||||
:returns: Filter flag value, filter count, filter digest
|
||||
"""
|
||||
return (int.from_bytes(self.flags, 'big'), self.count, self.digest)
|
||||
|
||||
|
||||
def target(self):
|
||||
"""Retrieve the target filter state of the syncer.
|
||||
|
||||
The target filter value will be the integer value when all bits are set for the filter count.
|
||||
|
||||
:rtype: tuple
|
||||
:returns: Filter flag value, filter count, filter digest
|
||||
"""
|
||||
|
||||
n = 0
|
||||
for i in range(self.count):
|
||||
n |= (1 << self.count) - 1
|
||||
return (n, self.count, self.digest)
|
||||
|
||||
|
||||
def is_locked(self):
|
||||
return self.flags_lock > 0
|
||||
|
||||
|
||||
def clear(self):
|
||||
"""Set current filter flag value to zero.
|
||||
"""
|
||||
if self.is_locked():
|
||||
raise LockError('flag clear attempted when lock set at {}'.format(self.flags_lock))
|
||||
|
||||
self.flags = bytearray(len(self.flags))
|
||||
|
||||
|
||||
def set(self, n):
|
||||
"""Set the filter flag at given index.
|
||||
|
||||
:param n: Filter flag index
|
||||
:type n: int
|
||||
:raises IndexError: Invalid flag index
|
||||
:raises AttributeError: Flag at index already set
|
||||
"""
|
||||
if self.is_locked():
|
||||
raise LockError('flag set attempted when lock set at {}'.format(self.flags_lock))
|
||||
|
||||
if n > self.count:
|
||||
raise IndexError('bit flag out of range')
|
||||
|
||||
self.flags_lock = n
|
||||
|
||||
b = 1 << (n % 8)
|
||||
i = int(n / 8)
|
||||
byte_idx = len(self.flags)-1-i
|
||||
if (self.flags[byte_idx] & b) > 0:
|
||||
raise AttributeError('Filter bit already set')
|
||||
flags = bytearray(self.flags)
|
||||
flags[byte_idx] |= b
|
||||
self.flags = flags
|
||||
|
||||
|
||||
def release(self, check_bit=0):
|
||||
if check_bit > 0:
|
||||
if self.flags_lock > 0 and self.flags_lock != check_bit:
|
||||
raise LockError('release attemped on explicit bit {}, but bit {} was locked'.format(check_bit, self.flags_lock))
|
||||
self.flags_lock = 0
|
||||
202
chainsyncer/db/models/sync.py
Normal file
202
chainsyncer/db/models/sync.py
Normal file
@@ -0,0 +1,202 @@
|
||||
# standard imports
|
||||
import datetime
|
||||
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean
|
||||
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
|
||||
|
||||
# local imports
|
||||
from .base import SessionBase
|
||||
|
||||
|
||||
class BlockchainSync(SessionBase):
|
||||
"""Syncer control backend.
|
||||
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
:param block_start: Block number to start sync from
|
||||
:type block_start: number
|
||||
:param tx_start: Block transaction number to start sync from
|
||||
:type tx_start: number
|
||||
:param block_target: Block number to sync until, inclusive
|
||||
:type block_target: number
|
||||
"""
|
||||
__tablename__ = 'chain_sync'
|
||||
|
||||
blockchain = Column(String)
|
||||
"""Chainspec string specifying the blockchain the syncer is running against."""
|
||||
block_start = Column(Integer)
|
||||
"""The block height at the start of syncer."""
|
||||
tx_start = Column(Integer)
|
||||
"""The transaction index at the start of syncer."""
|
||||
block_cursor = Column(Integer)
|
||||
"""The block height for the current state of the syncer."""
|
||||
tx_cursor = Column(Integer)
|
||||
"""The transaction index for the current state of the syncer."""
|
||||
block_target = Column(Integer)
|
||||
"""The block height at which the syncer should terminate. Will be None for an open-ended syncer."""
|
||||
date_created = Column(DateTime, default=datetime.datetime.utcnow)
|
||||
"""Datetime when syncer was first created."""
|
||||
date_updated = Column(DateTime)
|
||||
"""Datetime of the latest update of the syncer state."""
|
||||
|
||||
def __init__(self, chain_str, block_start, tx_start, block_target=None):
|
||||
self.blockchain = chain_str
|
||||
self.block_start = block_start
|
||||
self.tx_start = tx_start
|
||||
self.block_cursor = block_start
|
||||
self.tx_cursor = tx_start
|
||||
self.block_target = block_target
|
||||
self.date_created = datetime.datetime.utcnow()
|
||||
self.date_updated = datetime.datetime.utcnow()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def first(chain_str, session=None):
|
||||
"""Check if a sync session for the specified chain already exists.
|
||||
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
:param session: Session to use. If not specified, a separate session will be created for this method only.
|
||||
:type session: sqlalchemy.orm.session.Sessoin
|
||||
:returns: Database primary key id of sync record, or None if insert failed
|
||||
:rtype: number
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(BlockchainSync.id)
|
||||
q = q.filter(BlockchainSync.blockchain==chain_str)
|
||||
o = q.first()
|
||||
|
||||
if o == None:
|
||||
SessionBase.release_session(session)
|
||||
return None
|
||||
|
||||
sync_id = o.id
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return sync_id
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_last(session=None, live=True):
|
||||
"""Get the most recent syncer record.
|
||||
|
||||
If live is set, only the latest open-ended syncer will be returned.
|
||||
|
||||
:param session: Session to use. If not specified, a separate session will be created for this method only.
|
||||
:type session: SqlAlchemy Session
|
||||
:param live: Match only open-ended syncers
|
||||
:type live: bool
|
||||
:returns: Syncer database id
|
||||
:rtype: int
|
||||
"""
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(BlockchainSync.id)
|
||||
if live:
|
||||
q = q.filter(BlockchainSync.block_target==None)
|
||||
else:
|
||||
q = q.filter(BlockchainSync.block_target!=None)
|
||||
q = q.order_by(BlockchainSync.date_created.desc())
|
||||
object_id = q.first()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
if object_id == None:
|
||||
return None
|
||||
|
||||
return object_id[0]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_unsynced(session=None):
|
||||
"""Get previous bounded sync sessions that did not complete.
|
||||
|
||||
:param session: Session to use. If not specified, a separate session will be created for this method only.
|
||||
:type session: SqlAlchemy Session
|
||||
:returns: Syncer database ids
|
||||
:rtype: list
|
||||
"""
|
||||
unsynced = []
|
||||
local_session = False
|
||||
if session == None:
|
||||
session = SessionBase.create_session()
|
||||
local_session = True
|
||||
q = session.query(BlockchainSync.id)
|
||||
q = q.filter(BlockchainSync.block_target!=None)
|
||||
q = q.filter(BlockchainSync.block_cursor<BlockchainSync.block_target)
|
||||
q = q.order_by(BlockchainSync.date_created.asc())
|
||||
for u in q.all():
|
||||
unsynced.append(u[0])
|
||||
if local_session:
|
||||
session.close()
|
||||
|
||||
return unsynced
|
||||
|
||||
|
||||
def set(self, block_height, tx_height):
|
||||
"""Set the cursor height of the syncer instance.
|
||||
|
||||
Only manipulates object, does not transaction or commit to backend.
|
||||
|
||||
:param block_height: Block number
|
||||
:type block_height: number
|
||||
:param tx_height: Block transaction number
|
||||
:type tx_height: number
|
||||
:rtype: tuple
|
||||
:returns: Stored block height, transaction index
|
||||
"""
|
||||
self.block_cursor = block_height
|
||||
self.tx_cursor = tx_height
|
||||
self.date_updated = datetime.datetime.utcnow()
|
||||
return (self.block_cursor, self.tx_cursor,)
|
||||
|
||||
|
||||
def cursor(self):
|
||||
"""Get current state of cursor from cached instance.
|
||||
|
||||
:returns: Block height, transaction index
|
||||
:rtype: tuple
|
||||
"""
|
||||
return (self.block_cursor, self.tx_cursor)
|
||||
|
||||
|
||||
def start(self):
|
||||
"""Get sync block start position from cached instance.
|
||||
|
||||
:returns: Block height, transaction index
|
||||
:rtype: tuple
|
||||
"""
|
||||
return (self.block_start, self.tx_start)
|
||||
|
||||
|
||||
def target(self):
|
||||
"""Get sync block upper bound from cached instance.
|
||||
|
||||
:returns: Block number. Returns None if syncer is open-ended.
|
||||
:rtype: int
|
||||
"""
|
||||
return self.block_target
|
||||
|
||||
|
||||
def chain(self):
|
||||
"""Get chain string representation for which the cached instance represents.
|
||||
"""
|
||||
return self.blockchain
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return """object_id: {}
|
||||
start: {}:{}
|
||||
cursor: {}:{}
|
||||
target: {}
|
||||
""".format(
|
||||
self.id,
|
||||
self.block_start,
|
||||
self.tx_start,
|
||||
self.block_cursor,
|
||||
self.tx_cursor,
|
||||
self.block_target,
|
||||
)
|
||||
Reference in New Issue
Block a user