Lash/add filter

This commit is contained in:
Louis Holbrook
2021-04-04 13:03:58 +00:00
parent 82e2674555
commit 673d80774a
17 changed files with 641 additions and 130 deletions

View File

@@ -0,0 +1,36 @@
from alembic import op
import sqlalchemy as sa
def chainsyncer_upgrade(major, minor, patch):
r0_0_1_u()
def chainsyncer_downgrade(major, minor, patch):
r0_0_1_d()
def r0_0_1_u():
op.create_table(
'chain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.create_table(
'chain_sync_filter',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True),
sa.Column('flags', sa.LargeBinary, nullable=True),
sa.Column('flags_start', sa.LargeBinary, nullable=True),
sa.Column('count', sa.Integer, nullable=False, default=0),
sa.Column('digest', sa.String(64), nullable=False),
)
def r0_0_1_d():
op.drop_table('chain_sync_filter')
op.drop_table('chain_sync')

View File

@@ -1,8 +1,18 @@
# stanard imports
import logging
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
)
logg = logging.getLogger()
Model = declarative_base(name='Model')
@@ -21,7 +31,11 @@ class SessionBase(Model):
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
@@ -40,7 +54,7 @@ class SessionBase(Model):
@staticmethod
def connect(dsn, debug=False):
def connect(dsn, pool_size=8, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
@@ -48,14 +62,28 @@ class SessionBase(Model):
"""
e = None
if SessionBase.poolable:
e = create_engine(
dsn,
max_overflow=50,
pool_pre_ping=True,
pool_size=20,
pool_recycle=10,
echo=debug,
)
poolclass = QueuePool
if pool_size > 1:
e = create_engine(
dsn,
max_overflow=pool_size*3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
e = create_engine(
dsn,
@@ -71,3 +99,24 @@ class SessionBase(Model):
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session.flush()
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()

View File

@@ -1,40 +1,88 @@
# standard imports
import logging
import hashlib
# third-party imports
from sqlalchemy import Column, String, Integer, BLOB
# external imports
from sqlalchemy import Column, String, Integer, LargeBinary, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
from .sync import BlockchainSync
zero_digest = '{:<064s'.format('0')
zero_digest = bytes(32).hex()
logg = logging.getLogger(__name__)
class BlockchainSyncFilter(SessionBase):
__tablename__ = 'chain_sync_filter'
chain_sync_id = Column(Integer, ForeignKey='chain_sync.id')
flags = Column(BLOB)
digest = Column(String)
chain_sync_id = Column(Integer, ForeignKey('chain_sync.id'))
flags_start = Column(LargeBinary)
flags = Column(LargeBinary)
digest = Column(String(64))
count = Column(Integer)
@staticmethod
def set(self, names):
def __init__(self, names, chain_sync, digest=None):
if len(names) == 0:
digest = zero_digest
elif digest == None:
h = hashlib.new('sha256')
for n in names:
h.update(n.encode('utf-8') + b'\x00')
z = h.digest()
digest = z.hex()
def __init__(self, chain_sync, count=0, flags=None, digest=zero_digest):
self.digest = digest
self.count = len(names)
self.flags = bytearray((len(names) -1 ) / 8 + 1)
self.count = count
if flags == None:
flags = bytearray(0)
else: # TODO: handle bytes too
bytecount = int((count - 1) / 8 + 1)
flags = flags.to_bytes(bytecount, 'big')
self.flags_start = flags
self.flags = flags
self.chain_sync_id = chain_sync.id
def add(self, name):
h = hashlib.new('sha256')
h.update(bytes.fromhex(self.digest))
h.update(name.encode('utf-8'))
z = h.digest()
old_byte_count = int((self.count - 1) / 8 + 1)
new_byte_count = int((self.count) / 8 + 1)
if old_byte_count != new_byte_count:
self.flags = bytearray(1) + self.flags
self.count += 1
self.digest = z.hex()
def start(self):
return (int.from_bytes(self.flags_start, 'big'), self.count, self.digest)
def cursor(self):
return (int.from_bytes(self.flags, 'big'), self.count, self.digest)
def target(self):
n = 0
for i in range(self.count):
n |= (1 << self.count) - 1
return (n, self.count, self.digest)
def clear(self):
self.flags = bytearray(len(self.flags))
def set(self, n):
if n > self.count:
raise IndexError('bit flag out of range')
b = 1 << (n % 8)
i = int(n / 8)
byte_idx = len(self.flags)-1-i
if (self.flags[byte_idx] & b) > 0:
raise AttributeError('Filter bit already set')
flags = bytearray(self.flags)
flags[byte_idx] |= b
self.flags = flags

View File

@@ -41,47 +41,51 @@ class BlockchainSync(SessionBase):
:type chain: str
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:returns: True if sync record found
:rtype: bool
:returns: Database primary key id of sync record
:rtype: number|None
"""
local_session = False
if session == None:
session = SessionBase.create_session()
local_session = True
session = SessionBase.bind_session(session)
q = session.query(BlockchainSync.id)
q = q.filter(BlockchainSync.blockchain==chain)
o = q.first()
if local_session:
session.close()
return o == None
if o == None:
SessionBase.release_session(session)
return None
sync_id = o.id
SessionBase.release_session(session)
return sync_id
@staticmethod
def get_last_live_height(current, session=None):
def get_last(session=None, live=True):
"""Get the most recent open-ended ("live") syncer record.
:param current: Current block number
:type current: number
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:returns: Block and transaction number, respectively
:rtype: tuple
"""
local_session = False
if session == None:
session = SessionBase.create_session()
local_session = True
q = session.query(BlockchainSync)
q = q.filter(BlockchainSync.block_target==None)
session = SessionBase.bind_session(session)
q = session.query(BlockchainSync.id)
if live:
q = q.filter(BlockchainSync.block_target==None)
else:
q = q.filter(BlockchainSync.block_target!=None)
q = q.order_by(BlockchainSync.date_created.desc())
o = q.first()
if local_session:
session.close()
object_id = q.first()
if o == None:
return (0, 0)
SessionBase.release_session(session)
return (o.block_cursor, o.tx_cursor)
if object_id == None:
return None
return object_id[0]
@staticmethod
@@ -122,6 +126,8 @@ class BlockchainSync(SessionBase):
"""
self.block_cursor = block_height
self.tx_cursor = tx_height
self.date_updated = datetime.datetime.utcnow()
return (self.block_cursor, self.tx_cursor,)
def cursor(self):
@@ -165,4 +171,21 @@ class BlockchainSync(SessionBase):
self.tx_cursor = tx_start
self.block_target = block_target
self.date_created = datetime.datetime.utcnow()
self.date_modified = datetime.datetime.utcnow()
self.date_updated = datetime.datetime.utcnow()
def __str__(self):
return """object_id: {}
start: {}:{}
cursor: {}:{}
target: {}
""".format(
self.id,
self.block_start,
self.tx_start,
self.block_cursor,
self.tx_cursor,
self.block_target,
)