Merge branch 'dev-0.3.0' into lash/shep

This commit is contained in:
lash 2022-03-17 10:12:00 +00:00
commit 58e983efcc
33 changed files with 2678 additions and 13 deletions

View File

@ -1,4 +0,0 @@
- 0.3.0
* Re-implement filter state handling on shep
- 0.1.0
* Add deferred idle handling

View File

@ -0,0 +1,60 @@
# standard imports
import logging
logg = logging.getLogger(__name__)
class Backend:
"""Base class for syncer state backend.
:param flags_reversed: If set, filter flags are interpreted from left to right
:type flags_reversed: bool
"""
def __init__(self, object_id, flags_reversed=False):
self.object_id = object_id
self.filter_count = 0
self.flags_reversed = flags_reversed
self.block_height_offset = 0
self.tx_index_offset = 0
self.block_height_cursor = 0
self.tx_index_cursor = 0
self.block_height_target = 0
self.tx_index_target = 0
def check_filter(self, n, flags):
"""Check whether an individual filter flag is set.
:param n: Bit index
:type n: int
:param flags: Bit field to check against
:type flags: int
:rtype: bool
:returns: True if set
"""
if self.flags_reversed:
try:
v = 1 << flags.bit_length() - 1
return (v >> n) & flags > 0
except ValueError:
pass
return False
return flags & (1 << n) > 0
def chain(self):
"""Returns chain spec for syncer.
:returns: Chain spec
:rtype chain_spec: cic_registry.chain.ChainSpec
"""
return self.chain_spec
def __str__(self):
return "syncerbackend {} chain {} start {} target {}".format(self.object_id, self.chain(), self.start(), self.target())

472
chainsyncer/backend/file.py Normal file
View File

@ -0,0 +1,472 @@
# standard imports
import os
import uuid
import shutil
import logging
# local imports
from .base import Backend
logg = logging.getLogger().getChild(__name__)
BACKEND_BASE_DIR = '/var/lib'
def chain_dir_for(chain_spec, base_dir=BACKEND_BASE_DIR):
"""Retrieve file backend directory for the given chain spec.
:param chain_spec: Chain spec context of backend
:type chain_spec: chainlib.chain.ChainSpec
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
:type base_dir: str
:rtype: str
:returns: Absolute path of chain backend directory
"""
base_data_dir = os.path.join(base_dir, 'chainsyncer')
return os.path.join(base_data_dir, str(chain_spec).replace(':', '/'))
def data_dir_for(chain_spec, object_id, base_dir=BACKEND_BASE_DIR):
"""Retrieve file backend directory for the given syncer.
:param chain_spec: Chain spec context of backend
:type chain_spec: chainlib.chain.ChainSpec
:param object_id: Syncer id
:type object_id: str
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
:type base_dir: str
:rtype: str
:returns: Absolute path of chain backend directory
"""
chain_dir = chain_dir_for(chain_spec, base_dir=base_dir)
return os.path.join(chain_dir, object_id)
class FileBackend(Backend):
"""Filesystem backend implementation for syncer state.
FileBackend uses reverse order of filter flags.
:param chain_spec: Chain spec for the chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:param object_id: Unique id for the syncer session.
:type object_id: str
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
:type base_dir: str
"""
__warned = False
def __init__(self, chain_spec, object_id, base_dir=BACKEND_BASE_DIR):
if not FileBackend.__warned:
logg.warning('file backend for chainsyncer is experimental and not yet guaranteed to handle interrupted filter execution.')
FileBackend.__warned = True
super(FileBackend, self).__init__(object_id, flags_reversed=True)
self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)
self.object_id = object_id
self.db_object = None
self.db_object_filter = None
self.chain_spec = chain_spec
self.filter = b'\x00'
self.filter_names = []
if self.object_id != None:
self.connect()
self.disconnect()
@staticmethod
def create_object(chain_spec, object_id=None, base_dir=BACKEND_BASE_DIR):
"""Creates a new syncer session at the given backend destination.
:param chain_spec: Chain spec for the chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:param object_id: Unique id for the syncer session.
:type object_id: str
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
:type base_dir: str
"""
if object_id == None:
object_id = str(uuid.uuid4())
object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)
if os.path.isdir(object_data_dir):
raise FileExistsError(object_data_dir)
os.makedirs(object_data_dir)
object_id_path = os.path.join(object_data_dir, 'object_id')
f = open(object_id_path, 'wb')
f.write(object_id.encode('utf-8'))
f.close()
init_value = 0
b = init_value.to_bytes(16, byteorder='big')
offset_path = os.path.join(object_data_dir, 'offset')
f = open(offset_path, 'wb')
f.write(b)
f.close()
target_path = os.path.join(object_data_dir, 'target')
f = open(target_path, 'wb')
f.write(b'\x00' * 16)
f.close()
cursor_path = os.path.join(object_data_dir, 'cursor')
f = open(cursor_path, 'wb')
f.write(b'\x00' * 16)
f.close()
cursor_path = os.path.join(object_data_dir, 'filter')
f = open(cursor_path, 'wb')
f.write(b'\x00' * 9)
f.close()
filter_name_path = os.path.join(object_data_dir, 'filter_name')
f = open(filter_name_path, 'wb')
f.write(b'')
f.close()
return object_id
def load(self):
"""Loads the state of the syncer at the given location of the instance.
:raises FileNotFoundError: Invalid data directory
:raises IsADirectoryError: Invalid data directory
"""
offset_path = os.path.join(self.object_data_dir, 'offset')
f = open(offset_path, 'rb')
b = f.read(16)
f.close()
self.block_height_offset = int.from_bytes(b[:8], byteorder='big')
self.tx_index_offset = int.from_bytes(b[8:], byteorder='big')
target_path = os.path.join(self.object_data_dir, 'target')
f = open(target_path, 'rb')
b = f.read(16)
f.close()
self.block_height_target = int.from_bytes(b[:8], byteorder='big')
self.tx_index_target = int.from_bytes(b[8:], byteorder='big')
cursor_path = os.path.join(self.object_data_dir, 'cursor')
f = open(cursor_path, 'rb')
b = f.read(16)
f.close()
self.block_height_cursor = int.from_bytes(b[:8], byteorder='big')
self.tx_index_cursor = int.from_bytes(b[8:], byteorder='big')
filter_path = os.path.join(self.object_data_dir, 'filter')
f = open(filter_path, 'rb')
b = f.read(8)
self.filter_count = int.from_bytes(b, byteorder='big')
filter_count_bytes = int((self.filter_count - 1) / 8 + 1)
if filter_count_bytes > 0:
self.filter = f.read(filter_count_bytes)
f.close()
filter_name_path = filter_path + '_name'
f = open(filter_name_path, 'r')
while True:
s = f.readline().rstrip()
if len(s) == 0:
break
self.filter_names.append(s)
f.close()
def connect(self):
"""Proxy for chainsyncer.backend.file.FileBackend.load that performs a basic sanity check for instance's backend location.
:raises ValueError: Sanity check failed
"""
object_path = os.path.join(self.object_data_dir, 'object_id')
f = open(object_path, 'r')
object_id = f.read()
f.close()
if object_id != self.object_id:
raise ValueError('data corruption in store for id {}'.format(object_id))
self.load()
def disconnect(self):
"""FileBackend applies no actual connection, so this is noop
"""
pass
def purge(self):
"""Remove syncer state from backend.
"""
shutil.rmtree(self.object_data_dir)
def get(self):
"""Get the current state of the syncer cursor.
:rtype: tuple
:returns: Block height / tx index tuple, and filter flags value
"""
logg.debug('filter {}'.format(self.filter.hex()))
return ((self.block_height_cursor, self.tx_index_cursor), self.get_flags())
def get_flags(self):
"""Get canonical representation format of flags.
:rtype: int
:returns: Filter flag bitfield value
"""
return int.from_bytes(self.filter, 'little')
def set(self, block_height, tx_index):
"""Update the state of the syncer cursor.
:param block_height: New block height
:type block_height: int
:param tx_height: New transaction height in block
:type tx_height: int
:returns: Block height / tx index tuple, and filter flags value
:rtype: tuple
"""
self.__set(block_height, tx_index, 'cursor')
# cursor_path = os.path.join(self.object_data_dir, 'filter')
# f = open(cursor_path, 'r+b')
# f.seek(8)
# l = len(self.filter)
# c = 0
# while c < l:
# c += f.write(self.filter[c:])
# f.close()
return ((self.block_height_cursor, self.tx_index_cursor), self.get_flags())
def __set(self, block_height, tx_index, category):
cursor_path = os.path.join(self.object_data_dir, category)
block_height_bytes = block_height.to_bytes(8, byteorder='big')
tx_index_bytes = tx_index.to_bytes(8, byteorder='big')
f = open(cursor_path, 'wb')
b = f.write(block_height_bytes)
b = f.write(tx_index_bytes)
f.close()
setattr(self, 'block_height_' + category, block_height)
setattr(self, 'tx_index_' + category, tx_index)
@staticmethod
def initial(chain_spec, target_block_height, start_block_height=0, base_dir=BACKEND_BASE_DIR):
"""Creates a new syncer session and commit its initial state to backend.
:param chain_spec: Chain spec of chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:param target_block_height: Target block height
:type target_block_height: int
:param start_block_height: Start block height
:type start_block_height: int
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
:type base_dir: str
:raises ValueError: Invalid start/target specification
:returns: New syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
if start_block_height >= target_block_height:
raise ValueError('start block height must be lower than target block height')
uu = FileBackend.create_object(chain_spec, base_dir=base_dir)
o = FileBackend(chain_spec, uu, base_dir=base_dir)
o.__set(target_block_height, 0, 'target')
o.__set(start_block_height, 0, 'offset')
o.__set(start_block_height, 0, 'cursor')
return o
@staticmethod
def live(chain_spec, block_height, base_dir=BACKEND_BASE_DIR):
"""Creates a new open-ended syncer session starting at the given block height.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Start block height
:type block_height: int
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
:type base_dir: str
:returns: "Live" syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
uu = FileBackend.create_object(chain_spec, base_dir=base_dir)
o = FileBackend(chain_spec, uu, base_dir=base_dir)
o.__set(block_height, 0, 'offset')
o.__set(block_height, 0, 'cursor')
return o
def target(self):
"""Get the target state (upper bound of sync) of the syncer cursor.
:returns: Block height and filter flags value
:rtype: tuple
"""
return (self.block_height_target, 0,)
def start(self):
"""Get the initial state of the syncer cursor.
:returns: Block height / tx index tuple, and filter flags value
:rtype: tuple
"""
return ((self.block_height_offset, self.tx_index_offset), 0,)
@staticmethod
def __sorted_entries(chain_spec, base_dir=BACKEND_BASE_DIR):
chain_dir = chain_dir_for(chain_spec, base_dir=base_dir)
entries = {}
for v in os.listdir(chain_dir):
d = os.path.realpath(os.path.join(chain_dir, v))
f = open(os.path.join(d, 'object_id'))
object_id = f.read()
f.close()
logg.debug('found syncer entry {} in {}'.format(object_id, d))
o = FileBackend(chain_spec, object_id, base_dir=base_dir)
entries[o.block_height_offset] = o
sorted_entries = []
for k in sorted(entries):
sorted_entries.append(entries[k])
return sorted_entries
@staticmethod
def resume(chain_spec, block_height, base_dir=BACKEND_BASE_DIR):
"""Retrieves and returns all previously unfinished syncer sessions.
If a previous open-ended syncer is found, a new syncer will be generated to sync from where that syncer left off until the block_height given as argument.
:param chain_spec: Chain spec of chain that syncer is running for
:type chain_spec: cic_registry.chain.ChainSpec
:param block_height: Target block height for previous live syncer
:type block_height: int
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
:type base_dir: str
:raises FileNotFoundError: Invalid backend location
:returns: Syncer objects of unfinished syncs
:rtype: list of cic_eth.db.models.BlockchainSync
"""
try:
return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
except FileNotFoundError:
return []
@staticmethod
def first(chain_spec, base_dir=BACKEND_BASE_DIR):
"""Returns the model object of the most recent syncer in backend.
:param chain_spec: Chain spec of chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
:type base_dir: str
:returns: Last syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
entries = []
try:
entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
except FileNotFoundError:
return entries
return entries[len(entries)-1]
# n is zero-index of bit field
def begin_filter(self, n, base_dir=BACKEND_BASE_DIR):
pass
# n is zero-index of bit field
def complete_filter(self, n, base_dir=BACKEND_BASE_DIR):
"""Sets the filter at the given index as completed.
:param n: Filter index, starting at zero
:type n: int
:raises IndexError: Index is outside filter count range
"""
if self.filter_count <= n:
raise IndexError('index {} out of ranger for filter size {}'.format(n, self.filter_count))
byte_pos = int(n / 8)
bit_pos = n % 8
byts = bytearray(self.filter)
b = (0x80 >> bit_pos)
b |= self.filter[byte_pos]
logg.debug('bbb {}'.format(type(b)))
byts[byte_pos] = b #b.to_bytes(1, byteorder='big')
self.filter = byts
filter_path = os.path.join(self.object_data_dir, 'filter')
f = open(filter_path, 'r+b')
f.seek(8 + byte_pos)
f.write(self.filter)
f.close()
def register_filter(self, name):
"""Add filter to backend.
Overwrites record on disk if manual changed members in struct
:param name: Name of filter
:type name: str
"""
filter_path = os.path.join(self.object_data_dir, 'filter')
if (self.filter_count + 1) % 8 == 0:
self.filter += b'\x00'
f = open(filter_path, 'a+b')
f.write(b'\x00')
f.close()
filter_name_path = filter_path + '_name'
f = open(filter_name_path, 'a')
f.write(name + '\n')
f.close()
self.filter_count += 1
f = open(filter_path, 'r+b')
b = self.filter_count.to_bytes(8, byteorder='big')
f.write(b)
f.close()
def reset_filter(self):
"""Reset all filter states.
"""
self.filter = b'\x00' * len(self.filter)
cursor_path = os.path.join(self.object_data_dir, 'filter')
f = open(cursor_path, 'r+b')
f.seek(8)
l = len(self.filter)
c = 0
while c < l:
c += f.write(self.filter[c:])
f.close()

View File

@ -0,0 +1,141 @@
# standard imports
import logging
import uuid
# local imports
from .base import Backend
logg = logging.getLogger(__name__)
class MemBackend(Backend):
"""Disposable syncer backend. Keeps syncer state in memory.
Filter bitfield is interpreted right to left.
:param chain_spec: Chain spec context of syncer
:type chain_spec: chainlib.chain.ChainSpec
:param object_id: Unique id for the syncer session.
:type object_id: str
:param target_block: Block height to terminate sync at
:type target_block: int
"""
def __init__(self, chain_spec, object_id):
super(MemBackend, self).__init__(object_id)
self.chain_spec = chain_spec
self.db_session = None
self.block_height_offset = 0
self.block_height_cursor = 0
self.tx_height_offset = 0
self.tx_height_cursor = 0
self.block_height_target = None
self.flags = 0
self.flags_start = 0
self.flags_target = 0
self.filter_names = []
@staticmethod
def custom(chain_spec, target_block, block_offset=0, tx_offset=0, flags=0, flags_count=0, *args, **kwargs):
object_id = kwargs.get('object_id', str(uuid.uuid4()))
backend = MemBackend(chain_spec, object_id)
backend.block_height_offset = block_offset
backend.block_height_cursor = block_offset
backend.tx_height_offset = tx_offset
backend.tx_height_cursor = tx_offset
backend.block_height_target = target_block
backend.flags = flags
backend.flags_count = flags_count
backend.flags_start = flags
flags_target = (2 ** flags_count) - 1
backend.flags_target = flags_target
return backend
def connect(self):
"""NOOP as memory backend implements no connection.
"""
pass
def disconnect(self):
"""NOOP as memory backend implements no connection.
"""
pass
def set(self, block_height, tx_height):
"""Set the syncer state.
:param block_height: New block height
:type block_height: int
:param tx_height: New transaction height in block
:type tx_height: int
"""
logg.debug('memory backend received {} {}'.format(block_height, tx_height))
self.block_height_cursor = block_height
self.tx_height_cursor = tx_height
def get(self):
"""Get the current syncer state
:rtype: tuple
:returns: block height / tx index tuple, and filter flags value
"""
return ((self.block_height_cursor, self.tx_height_cursor), self.flags)
def start(self):
"""Get the initial syncer state
:rtype: tuple
:returns: block height / tx index tuple, and filter flags value
"""
return ((self.block_height_offset, self.tx_height_offset), self.flags_start)
def target(self):
"""Returns the syncer target.
:rtype: tuple
:returns: block height / tx index tuple
"""
return (self.block_height_target, self.flags_target)
def register_filter(self, name):
"""Adds a filter identifier to the syncer.
:param name: Filter name
:type name: str
"""
self.filter_names.append(name)
self.filter_count += 1
def begin_filter(self, n):
"""Set filter at index as completed for the current block / tx state.
:param n: Filter index
:type n: int
"""
v = 1 << n
self.flags |= v
logg.debug('set filter {} {}'.format(self.filter_names[n], v))
def complete_filter(self, n):
pass
def reset_filter(self):
"""Set all filters to unprocessed for the current block / tx state.
"""
logg.debug('reset filters')
self.flags = 0
def __str__(self):
return "syncer membackend {} chain {} cursor {}".format(self.object_id, self.chain(), self.get())

367
chainsyncer/backend/sql.py Normal file
View File

@ -0,0 +1,367 @@
# standard imports
import logging
import uuid
# imports
from chainlib.chain import ChainSpec
# local imports
from chainsyncer.db.models.sync import BlockchainSync
from chainsyncer.db.models.filter import BlockchainSyncFilter
from chainsyncer.db.models.base import SessionBase
from .base import Backend
logg = logging.getLogger().getChild(__name__)
class SQLBackend(Backend):
"""Interface to block and transaction sync state.
:param chain_spec: Chain spec for the chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:param object_id: Unique database record id for the syncer session.
:type object_id: int
"""
base = None
def __init__(self, chain_spec, object_id):
super(SQLBackend, self).__init__(int(object_id))
self.db_session = None
self.db_object = None
self.db_object_filter = None
self.chain_spec = chain_spec
self.connect()
self.disconnect()
@classmethod
def setup(cls, dsn, debug=False, pool_size=0, *args, **kwargs):
"""Set up database connection backend.
:param dsn: Database connection string
:type dsn: str
:param debug: Activate debug output in sql engine
:type debug: bool
:param pool_size: Size of transaction pool
:type pool_size: int
"""
if cls.base == None:
cls.base = SessionBase
cls.base.connect(dsn, debug=debug, pool_size=pool_size)
def connect(self):
"""Loads the state of the syncer session by the given database record id.
:raises ValueError: Database syncer object with given id does not exist
:rtype: sqlalchemy.orm.session.Session
:returns: Database session object
"""
if self.db_session == None:
self.db_session = SessionBase.create_session()
q = self.db_session.query(BlockchainSync)
q = q.filter(BlockchainSync.id==self.object_id)
self.db_object = q.first()
if self.db_object != None:
qtwo = self.db_session.query(BlockchainSyncFilter)
qtwo = qtwo.join(BlockchainSync)
qtwo = qtwo.filter(BlockchainSync.id==self.db_object.id)
self.db_object_filter = qtwo.first()
if self.db_object == None:
raise ValueError('sync entry with id {} not found'.format(self.object_id))
return self.db_session
def disconnect(self):
"""Commits state of sync to backend and frees connection resources.
"""
if self.db_session == None:
return
if self.db_object_filter != None:
self.db_session.add(self.db_object_filter)
self.db_session.add(self.db_object)
self.db_session.commit()
self.db_session.close()
self.db_session = None
def get(self):
"""Get the current state of the syncer cursor.
:rtype: tuple
:returns: Block height / tx index tuple, and filter flags value
"""
self.connect()
pair = self.db_object.cursor()
(filter_state, count, digest) = self.db_object_filter.cursor()
self.disconnect()
return (pair, filter_state,)
def set(self, block_height, tx_height):
"""Update the state of the syncer cursor.
:param block_height: New block height
:type block_height: int
:param tx_height: New transaction height in block
:type tx_height: int
:returns: Block height / tx index tuple, and filter flags value
:rtype: tuple
"""
self.connect()
pair = self.db_object.set(block_height, tx_height)
(filter_state, count, digest)= self.db_object_filter.cursor()
self.disconnect()
return (pair, filter_state,)
def start(self):
"""Get the initial state of the syncer cursor.
:returns: Block height / tx index tuple, and filter flags value
:rtype: tuple
"""
self.connect()
pair = self.db_object.start()
(filter_state, count, digest) = self.db_object_filter.start()
self.disconnect()
return (pair, filter_state,)
def target(self):
"""Get the target state (upper bound of sync) of the syncer cursor.
:returns: Block height and filter flags value
:rtype: tuple
"""
self.connect()
target = self.db_object.target()
(filter_target, count, digest) = self.db_object_filter.target()
self.disconnect()
return (target, filter_target,)
@staticmethod
def custom(chain_spec, target_block, block_offset=0, tx_offset=0, flags=0, flag_count=0, *args, **kwargs):
"""
:param flags: flags bit field
:type flags: bytes
:param flag_count: number of flags in bit field
:type flag_count:
"""
session = SessionBase.create_session()
o = BlockchainSync(str(chain_spec), block_offset, tx_offset, target_block)
session.add(o)
session.commit()
object_id = o.id
of = BlockchainSyncFilter(o, flag_count, flags, kwargs.get('flags_digest'))
session.add(of)
session.commit()
session.close()
return SQLBackend(chain_spec, object_id)
@staticmethod
def first(chain_spec):
"""Returns the model object of the most recent syncer in backend.
:param chain_spec: Chain spec of chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Last syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
object_id = BlockchainSync.first(str(chain_spec))
if object_id == None:
return None
return SQLBackend(chain_spec, object_id)
@staticmethod
def initial(chain_spec, target_block_height, start_block_height=0):
"""Creates a new syncer session and commit its initial state to backend.
:param chain_spec: Chain spec of chain that syncer is running for
:type chain_spec: cic_registry.chain.ChainSpec
:param target_block_height: Target block height
:type target_block_height: int
:param start_block_height: Start block height
:type start_block_height: int
:raises ValueError: Invalid start/target specification
:returns: New syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
if start_block_height >= target_block_height:
raise ValueError('start block height must be lower than target block height')
object_id = None
session = SessionBase.create_session()
o = BlockchainSync(str(chain_spec), start_block_height, 0, target_block_height)
session.add(o)
session.commit()
object_id = o.id
of = BlockchainSyncFilter(o)
session.add(of)
session.commit()
session.close()
return SQLBackend(chain_spec, object_id)
@staticmethod
def resume(chain_spec, block_height):
"""Retrieves and returns all previously unfinished syncer sessions.
If a previous open-ended syncer is found, a new syncer will be generated to sync from where that syncer left off until the block_height given as argument.
:param chain_spec: Chain spec of chain that syncer is running for
:type chain_spec: cic_registry.chain.ChainSpec
:param block_height: Target block height for previous live syncer
:type block_height: int
:returns: Syncer objects of unfinished syncs
:rtype: list of cic_eth.db.models.BlockchainSync
"""
syncers = []
session = SessionBase.create_session()
object_id = None
highest_unsynced_block = 0
highest_unsynced_tx = 0
object_id = BlockchainSync.get_last(session=session, live=False)
if object_id != None:
q = session.query(BlockchainSync)
o = q.get(object_id)
(highest_unsynced_block, highest_unsynced_index) = o.cursor()
object_ids = BlockchainSync.get_unsynced(session=session)
session.close()
for object_id in object_ids:
s = SQLBackend(chain_spec, object_id)
logg.debug('resume unfinished {}'.format(s))
syncers.append(s)
session = SessionBase.create_session()
last_live_id = BlockchainSync.get_last(session=session)
if last_live_id != None:
q = session.query(BlockchainSync)
o = q.get(last_live_id)
(block_resume, tx_resume) = o.cursor()
session.flush()
#if block_height != block_resume:
if highest_unsynced_block < block_resume:
q = session.query(BlockchainSyncFilter)
q = q.filter(BlockchainSyncFilter.chain_sync_id==last_live_id)
of = q.first()
(flags, count, digest) = of.cursor()
session.flush()
o = BlockchainSync(str(chain_spec), block_resume, tx_resume, block_height)
session.add(o)
session.flush()
object_id = o.id
of = BlockchainSyncFilter(o, count, flags, digest)
session.add(of)
session.commit()
backend = SQLBackend(chain_spec, object_id)
syncers.append(backend)
logg.debug('last live session resume {}'.format(backend))
session.close()
return syncers
@staticmethod
def live(chain_spec, block_height):
"""Creates a new open-ended syncer session starting at the given block height.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Start block height
:type block_height: int
:returns: "Live" syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
session = SessionBase.create_session()
o = BlockchainSync(str(chain_spec), block_height, 0, None)
session.add(o)
session.flush()
object_id = o.id
of = BlockchainSyncFilter(o)
session.add(of)
session.commit()
session.close()
return SQLBackend(chain_spec, object_id)
def register_filter(self, name):
"""Add filter to backend.
No check is currently implemented to enforce that filters are the same for existing syncers. Care must be taken by the caller to avoid inconsistencies.
:param name: Name of filter
:type name: str
"""
self.connect()
if self.db_object_filter == None:
self.db_object_filter = BlockchainSyncFilter(self.db_object)
self.db_object_filter.add(name)
self.db_session.add(self.db_object_filter)
self.disconnect()
def begin_filter(self, n):
"""Marks start of execution of the filter indexed by the corresponding bit.
:param n: Filter index
:type n: int
"""
self.connect()
self.db_object_filter.set(n)
self.db_session.add(self.db_object_filter)
self.db_session.commit()
self.disconnect()
def complete_filter(self, n):
self.connect()
self.db_object_filter.release(check_bit=n)
self.db_session.add(self.db_object_filter)
self.db_session.commit()
self.disconnect()
def reset_filter(self):
"""Reset all filter states.
"""
self.connect()
self.db_object_filter.clear()
self.disconnect()

View File

@ -0,0 +1,53 @@
# standard imports
import os
import logging
# local imports
from chainsyncer.db.models.base import SessionBase
logg = logging.getLogger()
def dsn_from_config(config):
"""Generate a dsn string from the provided config dict.
The config dict must include all well-known database connection parameters, and must implement the method "get(key)" to retrieve them. Any missing parameters will be be rendered as the literal string "None"
:param config: Configuration object
:type config: Varies
:returns: dsn string
:rtype: str
"""
scheme = config.get('DATABASE_ENGINE')
if config.get('DATABASE_DRIVER') != None:
scheme += '+{}'.format(config.get('DATABASE_DRIVER'))
dsn = ''
dsn_out = ''
if config.get('DATABASE_ENGINE') == 'sqlite':
dsn = '{}:///{}'.format(
scheme,
config.get('DATABASE_NAME'),
)
dsn_out = dsn
else:
dsn = '{}://{}:{}@{}:{}/{}'.format(
scheme,
config.get('DATABASE_USER'),
config.get('DATABASE_PASSWORD'),
config.get('DATABASE_HOST'),
config.get('DATABASE_PORT'),
config.get('DATABASE_NAME'),
)
dsn_out = '{}://{}:{}@{}:{}/{}'.format(
scheme,
config.get('DATABASE_USER'),
'***',
config.get('DATABASE_HOST'),
config.get('DATABASE_PORT'),
config.get('DATABASE_NAME'),
)
logg.debug('parsed dsn from config: {}'.format(dsn_out))
return dsn

View File

View File

@ -0,0 +1 @@
Generic single-database configuration.

View File

@ -0,0 +1,85 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = .
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to ./versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat ./versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
#level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -0,0 +1,77 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -0,0 +1,37 @@
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.default.versions.src.sync import (
upgrade as upgrade_sync,
downgrade as downgrade_sync,
)
from chainsyncer.db.migrations.default.versions.src.sync_tx import (
upgrade as upgrade_sync_tx,
downgrade as downgrade_sync_tx,
)
def chainsyncer_upgrade(major=0, minor=0, patch=3):
r0_0_1_u()
if patch >= 3:
r0_0_3_u()
def chainsyncer_downgrade(major=0, minor=0, patch=3):
if patch >= 3:
r0_0_3_d()
r0_0_1_d()
def r0_0_1_u():
upgrade_sync()
def r0_0_1_d():
downgrade_sync()
# 0.0.3
def r0_0_3_u():
upgrade_sync_tx()
def r0_0_3_d():
downgrade_sync_tx()

View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,14 @@
"""base setup
Revision ID: 452ecfa81de3
Revises:
Create Date: 2021-07-16 16:29:32.460027
"""
# revision identifiers, used by Alembic.
revision = '452ecfa81de3'
down_revision = None
branch_labels = None
depends_on = None
from chainsyncer.db.migrations.default.versions.src.sync import upgrade, downgrade

View File

@ -0,0 +1,14 @@
"""sync-tx
Revision ID: a2ce6826c5eb
Revises: 452ecfa81de3
Create Date: 2021-07-16 18:17:53.439721
"""
# revision identifiers, used by Alembic.
revision = 'a2ce6826c5eb'
down_revision = '452ecfa81de3'
branch_labels = None
depends_on = None
from chainsyncer.db.migrations.default.versions.src.sync_tx import upgrade, downgrade

View File

@ -0,0 +1,33 @@
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'chain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.create_table(
'chain_sync_filter',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True),
sa.Column('flags', sa.LargeBinary, nullable=True),
sa.Column('flags_lock', sa.Integer, nullable=False, default=0),
sa.Column('flags_start', sa.LargeBinary, nullable=True),
sa.Column('count', sa.Integer, nullable=False, default=0),
sa.Column('digest', sa.String(64), nullable=False),
)
def downgrade():
op.drop_table('chain_sync_filter')
op.drop_table('chain_sync')

View File

@ -0,0 +1,17 @@
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'chain_sync_tx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=False),
sa.Column('flags', sa.LargeBinary, nullable=True),
sa.Column('block', sa.Integer, nullable=False),
sa.Column('tx', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('chain_sync_tx')

View File

@ -0,0 +1,37 @@
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.default.versions.tags.sync import
upgrade as upgrade_sync,
downgrade as downgrade_sync,
)
from chainsyncer.db.migrations.default.versions.tags.sync_tx import
upgrade as upgrade_sync_tx,
downgrade as downgrade_sync_tx,
)
def chainsyncer_upgrade(major=0, minor=0, patch=3):
r0_0_1_u()
if patch >= 3:
r0_0_3_u()
def chainsyncer_downgrade(major=0, minor=0, patch=3):
if patch >= 3:
r0_0_3_d()
r0_0_1_d()
def r0_0_1_u():
upgrade_sync()
def r0_0_1_d():
downgrade_sync()
# 0.0.3
def r0_0_3_u():
upgrade_sync_tx()
def r0_0_3_d():
downgrade_sync_tx()

View File

View File

@ -0,0 +1,161 @@
# stanard imports
import logging
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger()
Model = declarative_base(name='Model')
CONNECTION_OVERFLOW_FACTOR = 3
CONNECTION_RECYCLE_AFTER = 60
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
"""
__abstract__ = True
id = Column(Integer, primary_key=True)
engine = None
"""Database connection engine of the running aplication"""
sessionmaker = None
"""Factory object responsible for creating sessions from the connection pool"""
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
def create_session():
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
:param engine: The sqlalchemy engine
:type engine: sqlalchemy.engine.Engine
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
The pool_size argument controls the behavior of the connection pool.
If the pool_size is greater than 1, and the engine has connection pool settings, The connection pool will be set up with the given number of connections. By default, it allows for 3x connection overflow (CONNECTION_OVERFLOW_FACTOR), and connection recycling after 60 seconds of inactivity (CONNECTION_RECYCLE_AFTER).
If the pool_size is 1 and debug mode is off, the StaticPool class (single connection pool) will be used. If debug is on, AssertionPool will be used (which raises assertionerror if more than a single connection is attempted at any one time by the process).
If the underlying engine does not have pooling capabilities, the pool_size parameter toggles the connection class used. If pool_size is set to 0, the NullPool will be used (build a new connection for every session). If pool_size is set to a positive number, the StaticPool will be used, keeping a single connection for all sessions.
:param dsn: DSN string defining connection
:type dsn: str
:param pool_size: Size of connection pool
:type pool_size: int
:param debug: Activate sql debug mode (outputs sql statements)
:type debug: bool
"""
e = None
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
e = create_engine(
dsn,
max_overflow=pool_size * CONNECTION_OVERFLOW_FACTOR,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=CONNECTION_RECYCLE_AFTER,
poolclass=poolclass,
echo=debug,
)
else:
if debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
pool_class = StaticPool
if pool_size < 1:
pool_class = NullPool
e = create_engine(
dsn,
poolclass=pool_class,
echo=debug,
)
SessionBase._set_engine(e)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
"""Convenience function to enforce database session responsilibity in call stacks where it is unclear which layer will create a database session.
If the session argument is None, the method will create and return a new database session. A reference to the database session will be statically stored in the SessionBase class, and must be explicitly released with release_session.
When an existing session in passed as the argument, this method simply returns back the same session.
:param session: An sqlalchemy session
:type session: session.orm.Session
:rtype: session.orm.Session
:returns: An sqlalchemy session
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session):
"""Checks if a reference to the given session exists in the SessionBase session store, and if it does commits the transaction and closes the session.
:param session: An sqlalchemy session
:type session: session.orm.Session
"""
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()
del SessionBase.localsessions[session_key]

View File

@ -0,0 +1,166 @@
# standard imports
import logging
import hashlib
# external imports
from sqlalchemy import Column, String, Integer, LargeBinary, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
from .sync import BlockchainSync
from chainsyncer.error import LockError
zero_digest = bytes(32).hex()
logg = logging.getLogger(__name__)
class BlockchainSyncFilter(SessionBase):
"""Sync filter sql backend database interface.
:param chain_sync: BlockchainSync object to use as context for filter
:type chain_sync: chainsyncer.db.models.sync.BlockchainSync
:param count: Number of filters to track
:type count: int
:param flags: Filter flag value to instantiate record with
:type flags: int
:param digest: Filter digest as integrity protection when resuming session, 256 bits, in hex
:type digest: str
"""
__tablename__ = 'chain_sync_filter'
chain_sync_id = Column(Integer, ForeignKey('chain_sync.id'))
flags_start = Column(LargeBinary)
flags = Column(LargeBinary)
flags_lock = Column(Integer)
digest = Column(String(64))
count = Column(Integer)
def __init__(self, chain_sync, count=0, flags=None, digest=None):
if digest == None:
digest = zero_digest
self.digest = digest
self.count = count
if flags == None:
flags = bytearray(0)
else:
bytecount = int((count - 1) / 8 + 1)
flags = flags.to_bytes(bytecount, 'big')
self.flags_start = flags
self.flags = flags
self.flags_lock = 0
self.chain_sync_id = chain_sync.id
@staticmethod
def load(sync_id, session=None):
q = session.query(BlockchainSyncFilter)
q = q.filter(BlockchainSyncFilter.chain_sync_id==sync_id)
o = q.first()
if o.is_locked():
raise LockError('locked state for flag {} of sync id {} must be manually resolved'.format(o.flags_lock))
def add(self, name):
"""Add a new filter to the syncer record.
The name of the filter is hashed with the current aggregated hash sum of previously added filters.
:param name: Filter informal name
:type name: str
"""
h = hashlib.new('sha256')
h.update(bytes.fromhex(self.digest))
h.update(name.encode('utf-8'))
z = h.digest()
old_byte_count = int((self.count - 1) / 8 + 1)
new_byte_count = int((self.count) / 8 + 1)
if old_byte_count != new_byte_count:
self.flags = bytearray(1) + self.flags
self.count += 1
self.digest = z.hex()
def start(self):
"""Retrieve the initial filter state of the syncer.
:rtype: tuple
:returns: Filter flag value, filter count, filter digest
"""
return (int.from_bytes(self.flags_start, 'big'), self.count, self.digest)
def cursor(self):
"""Retrieve the current filter state of the syncer.
:rtype: tuple
:returns: Filter flag value, filter count, filter digest
"""
return (int.from_bytes(self.flags, 'big'), self.count, self.digest)
def target(self):
"""Retrieve the target filter state of the syncer.
The target filter value will be the integer value when all bits are set for the filter count.
:rtype: tuple
:returns: Filter flag value, filter count, filter digest
"""
n = 0
for i in range(self.count):
n |= (1 << self.count) - 1
return (n, self.count, self.digest)
def is_locked(self):
return self.flags_lock > 0
def clear(self):
"""Set current filter flag value to zero.
"""
if self.is_locked():
raise LockError('flag clear attempted when lock set at {}'.format(self.flags_lock))
self.flags = bytearray(len(self.flags))
def set(self, n):
"""Set the filter flag at given index.
:param n: Filter flag index
:type n: int
:raises IndexError: Invalid flag index
:raises AttributeError: Flag at index already set
"""
if self.is_locked():
raise LockError('flag set attempted when lock set at {}'.format(self.flags_lock))
if n > self.count:
raise IndexError('bit flag out of range')
self.flags_lock = n
b = 1 << (n % 8)
i = int(n / 8)
byte_idx = len(self.flags)-1-i
if (self.flags[byte_idx] & b) > 0:
raise AttributeError('Filter bit already set')
flags = bytearray(self.flags)
flags[byte_idx] |= b
self.flags = flags
def release(self, check_bit=0):
if check_bit > 0:
if self.flags_lock > 0 and self.flags_lock != check_bit:
raise LockError('release attemped on explicit bit {}, but bit {} was locked'.format(check_bit, self.flags_lock))
self.flags_lock = 0

View File

@ -0,0 +1,202 @@
# standard imports
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
class BlockchainSync(SessionBase):
"""Syncer control backend.
:param chain_str: Chain spec string representation
:type chain_str: str
:param block_start: Block number to start sync from
:type block_start: number
:param tx_start: Block transaction number to start sync from
:type tx_start: number
:param block_target: Block number to sync until, inclusive
:type block_target: number
"""
__tablename__ = 'chain_sync'
blockchain = Column(String)
"""Chainspec string specifying the blockchain the syncer is running against."""
block_start = Column(Integer)
"""The block height at the start of syncer."""
tx_start = Column(Integer)
"""The transaction index at the start of syncer."""
block_cursor = Column(Integer)
"""The block height for the current state of the syncer."""
tx_cursor = Column(Integer)
"""The transaction index for the current state of the syncer."""
block_target = Column(Integer)
"""The block height at which the syncer should terminate. Will be None for an open-ended syncer."""
date_created = Column(DateTime, default=datetime.datetime.utcnow)
"""Datetime when syncer was first created."""
date_updated = Column(DateTime)
"""Datetime of the latest update of the syncer state."""
def __init__(self, chain_str, block_start, tx_start, block_target=None):
self.blockchain = chain_str
self.block_start = block_start
self.tx_start = tx_start
self.block_cursor = block_start
self.tx_cursor = tx_start
self.block_target = block_target
self.date_created = datetime.datetime.utcnow()
self.date_updated = datetime.datetime.utcnow()
@staticmethod
def first(chain_str, session=None):
"""Check if a sync session for the specified chain already exists.
:param chain_str: Chain spec string representation
:type chain_str: str
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: sqlalchemy.orm.session.Sessoin
:returns: Database primary key id of sync record, or None if insert failed
:rtype: number
"""
session = SessionBase.bind_session(session)
q = session.query(BlockchainSync.id)
q = q.filter(BlockchainSync.blockchain==chain_str)
o = q.first()
if o == None:
SessionBase.release_session(session)
return None
sync_id = o.id
SessionBase.release_session(session)
return sync_id
@staticmethod
def get_last(session=None, live=True):
"""Get the most recent syncer record.
If live is set, only the latest open-ended syncer will be returned.
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:param live: Match only open-ended syncers
:type live: bool
:returns: Syncer database id
:rtype: int
"""
session = SessionBase.bind_session(session)
q = session.query(BlockchainSync.id)
if live:
q = q.filter(BlockchainSync.block_target==None)
else:
q = q.filter(BlockchainSync.block_target!=None)
q = q.order_by(BlockchainSync.date_created.desc())
object_id = q.first()
SessionBase.release_session(session)
if object_id == None:
return None
return object_id[0]
@staticmethod
def get_unsynced(session=None):
"""Get previous bounded sync sessions that did not complete.
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:returns: Syncer database ids
:rtype: list
"""
unsynced = []
local_session = False
if session == None:
session = SessionBase.create_session()
local_session = True
q = session.query(BlockchainSync.id)
q = q.filter(BlockchainSync.block_target!=None)
q = q.filter(BlockchainSync.block_cursor<BlockchainSync.block_target)
q = q.order_by(BlockchainSync.date_created.asc())
for u in q.all():
unsynced.append(u[0])
if local_session:
session.close()
return unsynced
def set(self, block_height, tx_height):
"""Set the cursor height of the syncer instance.
Only manipulates object, does not transaction or commit to backend.
:param block_height: Block number
:type block_height: number
:param tx_height: Block transaction number
:type tx_height: number
:rtype: tuple
:returns: Stored block height, transaction index
"""
self.block_cursor = block_height
self.tx_cursor = tx_height
self.date_updated = datetime.datetime.utcnow()
return (self.block_cursor, self.tx_cursor,)
def cursor(self):
"""Get current state of cursor from cached instance.
:returns: Block height, transaction index
:rtype: tuple
"""
return (self.block_cursor, self.tx_cursor)
def start(self):
"""Get sync block start position from cached instance.
:returns: Block height, transaction index
:rtype: tuple
"""
return (self.block_start, self.tx_start)
def target(self):
"""Get sync block upper bound from cached instance.
:returns: Block number. Returns None if syncer is open-ended.
:rtype: int
"""
return self.block_target
def chain(self):
"""Get chain string representation for which the cached instance represents.
"""
return self.blockchain
def __str__(self):
return """object_id: {}
start: {}:{}
cursor: {}:{}
target: {}
""".format(
self.id,
self.block_start,
self.tx_start,
self.block_cursor,
self.tx_cursor,
self.block_target,
)

View File

@ -1 +0,0 @@
from .base import SyncState

View File

@ -2,4 +2,3 @@ confini~=0.6.0
semver==2.13.0
hexathon~=0.1.5
chainlib>=0.1.0b1,<=0.1.0
shep~=0.0.1

View File

@ -1,6 +1,6 @@
[metadata]
name = chainsyncer
version = 0.3.0
version = 0.2.0
description = Generic blockchain syncer driver
author = Louis Holbrook
author_email = dev@holbrook.no

69
tests/chainsyncer_base.py Normal file
View File

@ -0,0 +1,69 @@
# standard imports
import logging
import unittest
import tempfile
import os
#import pysqlite
# external imports
from chainlib.chain import ChainSpec
from chainlib.interface import ChainInterface
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.eth.block import (
block_by_number,
Block,
)
from potaahto.symbols import snake_and_camel
# local imports
from chainsyncer.db import dsn_from_config
from chainsyncer.db.models.base import SessionBase
# test imports
from chainsyncer.unittest.db import ChainSyncerDb
script_dir = os.path.realpath(os.path.dirname(__file__))
logging.basicConfig(level=logging.DEBUG)
class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._tx_from_src = Tx.from_src
self._src_normalize = snake_and_camel
class TestBase(unittest.TestCase):
interface = EthChainInterface()
def setUp(self):
self.db = ChainSyncerDb()
#f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r')
#sql = f.read()
#f.close()
#conn = SessionBase.engine.connect()
#conn.execute(sql)
#f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '2.sql'), 'r')
#sql = f.read()
#f.close()
#conn = SessionBase.engine.connect()
#conn.execute(sql)
self.session = self.db.bind_session()
self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar')
def tearDown(self):
self.session.commit()
self.db.release_session(self.session)
#os.unlink(self.db_path)

View File

@ -1,15 +1,17 @@
# standard imporst
# standard imports
import unittest
# external imports
from shep import State
from chainlib.chain import ChainSpec
# local imports
from chainsyncer.state import SyncState
from chainsyncer.session import SyncSession
from chainsyncer.backend.memory import MemBackend
# testutil imports
from tests.chainsyncer_base import TestBase
class MockStore(State):
class TestBasic(TestBase):
def __init__(self, bits=0):
super(MockStore, self).__init__(bits, check_alias=False)
@ -88,6 +90,5 @@ class TestSync(unittest.TestCase):
state.register(fltr)
if __name__ == '__main__':
unittest.main()

199
tests/test_database.py Normal file
View File

@ -0,0 +1,199 @@
# standard imports
import unittest
import logging
# external imports
from chainlib.chain import ChainSpec
# local imports
from chainsyncer.db.models.base import SessionBase
from chainsyncer.db.models.filter import BlockchainSyncFilter
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.error import LockError
# testutil imports
from tests.chainsyncer_base import TestBase
logg = logging.getLogger()
class TestDatabase(TestBase):
def test_backend_live(self):
s = SQLBackend.live(self.chain_spec, 42)
self.assertEqual(s.object_id, 1)
backend = SQLBackend.first(self.chain_spec)
#SQLBackend(self.chain_spec, sync_id)
self.assertEqual(backend.object_id, 1)
bogus_chain_spec = ChainSpec('bogus', 'foo', 13, 'baz')
sync_id = SQLBackend.first(bogus_chain_spec)
self.assertIsNone(sync_id)
def test_backend_filter_lock(self):
s = SQLBackend.live(self.chain_spec, 42)
s.connect()
filter_id = s.db_object_filter.id
s.disconnect()
session = SessionBase.create_session()
o = session.query(BlockchainSyncFilter).get(filter_id)
self.assertEqual(len(o.flags), 0)
session.close()
s.register_filter(str(0))
s.register_filter(str(1))
s.connect()
filter_id = s.db_object_filter.id
s.disconnect()
session = SessionBase.create_session()
o = session.query(BlockchainSyncFilter).get(filter_id)
o.set(1)
with self.assertRaises(LockError):
o.set(2)
o.release()
o.set(2)
def test_backend_filter(self):
s = SQLBackend.live(self.chain_spec, 42)
s.connect()
filter_id = s.db_object_filter.id
s.disconnect()
session = SessionBase.create_session()
o = session.query(BlockchainSyncFilter).get(filter_id)
self.assertEqual(len(o.flags), 0)
session.close()
for i in range(9):
s.register_filter(str(i))
s.connect()
filter_id = s.db_object_filter.id
s.disconnect()
session = SessionBase.create_session()
o = session.query(BlockchainSyncFilter).get(filter_id)
self.assertEqual(len(o.flags), 2)
(t, c, d) = o.target()
self.assertEqual(t, (1 << 9) - 1)
for i in range(9):
o.set(i)
o.release()
(f, c, d) = o.cursor()
self.assertEqual(f, t)
self.assertEqual(c, 9)
self.assertEqual(d, o.digest)
session.close()
def test_backend_retrieve(self):
s = SQLBackend.live(self.chain_spec, 42)
s.register_filter('foo')
s.register_filter('bar')
s.register_filter('baz')
s.set(42, 13)
s = SQLBackend.first(self.chain_spec)
self.assertEqual(s.get(), ((42,13), 0))
def test_backend_initial(self):
with self.assertRaises(ValueError):
s = SQLBackend.initial(self.chain_spec, 42, 42)
with self.assertRaises(ValueError):
s = SQLBackend.initial(self.chain_spec, 42, 43)
s = SQLBackend.initial(self.chain_spec, 42, 13)
s.set(43, 13)
s = SQLBackend.first(self.chain_spec)
self.assertEqual(s.get(), ((43,13), 0))
self.assertEqual(s.start(), ((13,0), 0))
def test_backend_resume(self):
s = SQLBackend.resume(self.chain_spec, 666)
self.assertEqual(len(s), 0)
s = SQLBackend.live(self.chain_spec, 42)
original_id = s.object_id
s = SQLBackend.resume(self.chain_spec, 666)
self.assertEqual(len(s), 1)
resumed_id = s[0].object_id
self.assertEqual(resumed_id, original_id + 1)
self.assertEqual(s[0].get(), ((42, 0), 0))
def test_backend_resume_when_completed(self):
s = SQLBackend.live(self.chain_spec, 42)
s = SQLBackend.resume(self.chain_spec, 666)
s[0].set(666, 0)
s = SQLBackend.resume(self.chain_spec, 666)
self.assertEqual(len(s), 0)
def test_backend_resume_several(self):
s = SQLBackend.live(self.chain_spec, 42)
s.set(43, 13)
s = SQLBackend.resume(self.chain_spec, 666)
SQLBackend.live(self.chain_spec, 666)
s[0].set(123, 2)
s = SQLBackend.resume(self.chain_spec, 1024)
SQLBackend.live(self.chain_spec, 1024)
self.assertEqual(len(s), 2)
self.assertEqual(s[0].target(), (666, 0))
self.assertEqual(s[0].get(), ((123, 2), 0))
self.assertEqual(s[1].target(), (1024, 0))
self.assertEqual(s[1].get(), ((666, 0), 0))
def test_backend_resume_filter(self):
s = SQLBackend.live(self.chain_spec, 42)
s.register_filter('foo')
s.register_filter('bar')
s.register_filter('baz')
s.set(43, 13)
s.begin_filter(0)
s.begin_filter(2)
s = SQLBackend.resume(self.chain_spec, 666)
(pair, flags) = s[0].get()
self.assertEqual(flags, 5)
def test_backend_sql_custom(self):
chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
flags = 5
flags_target = 1023
flag_count = 10
backend = SQLBackend.custom(chain_spec, 666, 42, 2, flags, flag_count)
self.assertEqual(((42, 2), flags), backend.start())
self.assertEqual(((42, 2), flags), backend.get())
self.assertEqual((666, flags_target), backend.target())
if __name__ == '__main__':
unittest.main()

121
tests/test_file.py Normal file
View File

@ -0,0 +1,121 @@
# standard imports
import logging
import uuid
import os
import unittest
import shutil
# external imports
from chainlib.chain import ChainSpec
# local imports
from chainsyncer.backend.file import FileBackend
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger().getChild(__name__)
script_dir = os.path.dirname(__file__)
tmp_test_dir = os.path.join(script_dir, 'testdata', 'tmp')
chainsyncer_test_dir = os.path.join(tmp_test_dir, 'chainsyncer')
os.makedirs(tmp_test_dir, exist_ok=True)
class TestFile(unittest.TestCase):
def setUp(self):
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.uu = FileBackend.create_object(self.chain_spec, None, base_dir=tmp_test_dir)
logg.debug('made uu {} for {}'.format(self.uu, self.chain_spec))
self.o = FileBackend(self.chain_spec, self.uu, base_dir=tmp_test_dir)
def tearDown(self):
self.o.purge()
shutil.rmtree(chainsyncer_test_dir)
@unittest.skip('foo')
def test_set(self):
self.o.set(42, 13)
o = FileBackend(self.chain_spec, self.o.object_id, base_dir=tmp_test_dir)
state = o.get()
self.assertEqual(state[0], 42)
self.assertEqual(state[1], 13)
@unittest.skip('foo')
def test_initial(self):
local_uu = FileBackend.initial(self.chain_spec, 1337, start_block_height=666, base_dir=tmp_test_dir)
o = FileBackend(self.chain_spec, local_uu, base_dir=tmp_test_dir)
(pair, filter_stats) = o.target()
self.assertEqual(pair[0], 1337)
self.assertEqual(pair[1], 0)
(pair, filter_stats) = o.start()
self.assertEqual(pair[0], 666)
self.assertEqual(pair[1], 0)
@unittest.skip('foo')
def test_resume(self):
for i in range(1, 10):
local_uu = FileBackend.initial(self.chain_spec, 666, start_block_height=i, base_dir=tmp_test_dir)
entries = FileBackend.resume(self.chain_spec, base_dir=tmp_test_dir)
self.assertEqual(len(entries), 10)
last = -1
for o in entries:
self.assertLess(last, o.block_height_offset)
last = o.block_height_offset
@unittest.skip('foo')
def test_first(self):
for i in range(1, 10):
local_uu = FileBackend.initial(self.chain_spec, 666, start_block_height=i, base_dir=tmp_test_dir)
first_entry = FileBackend.first(self.chain_spec, base_dir=tmp_test_dir)
self.assertEqual(first_entry.block_height_offset, 9)
def test_filter(self):
self.assertEqual(len(self.o.filter), 1)
self.o.register_filter('foo')
self.o.register_filter('bar')
o = FileBackend(self.chain_spec, self.uu, base_dir=tmp_test_dir)
self.assertEqual(o.filter_count, 2)
self.assertEqual(o.filter_names, ['foo', 'bar'])
self.assertEqual(len(o.filter), 1)
self.o.complete_filter(1)
self.assertEqual(self.o.filter, b'\x40')
self.o.complete_filter(0)
self.assertEqual(self.o.filter, b'\xc0')
o = FileBackend(self.chain_spec, self.uu, base_dir=tmp_test_dir)
self.assertEqual(o.filter, b'\xc0')
with self.assertRaises(IndexError):
self.o.complete_filter(2)
self.o.register_filter('baz')
self.o.complete_filter(2)
if __name__ == '__main__':
unittest.main()

15
tests/test_helo.py Normal file
View File

@ -0,0 +1,15 @@
# standard imports
import unittest
# local imports
from tests.chainsyncer_base import TestBase
class TestHelo(TestBase):
def test_helo(self):
pass
if __name__ == '__main__':
unittest.main()

147
tests/test_interrupt.py Normal file
View File

@ -0,0 +1,147 @@
# standard imports
import logging
import unittest
import os
import tempfile
# external imports
from chainlib.chain import ChainSpec
# local imports
from chainsyncer.backend.memory import MemBackend
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.backend.file import (
FileBackend,
data_dir_for,
)
from chainsyncer.error import LockError
# test imports
from tests.chainsyncer_base import TestBase
from chainsyncer.unittest.base import (
MockBlock,
MockConn,
TestSyncer,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class NaughtyCountExceptionFilter:
def __init__(self, name, croak_on):
self.c = 0
self.croak = croak_on
self.name = name
def filter(self, conn, block, tx, db_session=None):
self.c += 1
if self.c == self.croak:
self.croak = -1
raise RuntimeError('foo')
def __str__(self):
return '{} {}'.format(self.__class__.__name__, self.name)
class CountFilter:
def __init__(self, name):
self.c = 0
self.name = name
def filter(self, conn, block, tx, db_session=None):
self.c += 1
def __str__(self):
return '{} {}'.format(self.__class__.__name__, self.name)
class TestInterrupt(TestBase):
def setUp(self):
super(TestInterrupt, self).setUp()
self.backend = None
self.conn = MockConn()
self.vectors = [
[4, 3, 2],
[6, 4, 2],
[6, 5, 2],
[6, 4, 3],
]
self.track_complete = True
def assert_filter_interrupt(self, vector, chain_interface):
logg.debug('running vector {} {}'.format(str(self.backend), vector))
z = 0
for v in vector:
z += v
syncer = TestSyncer(self.backend, chain_interface, vector)
filters = [
CountFilter('foo'),
CountFilter('bar'),
NaughtyCountExceptionFilter('xyzzy', croak_on=3),
CountFilter('baz'),
]
for fltr in filters:
syncer.add_filter(fltr)
try:
syncer.loop(0.1, self.conn)
except RuntimeError:
self.croaked = 2
logg.info('caught croak')
pass
(pair, fltr) = self.backend.get()
self.assertGreater(fltr, 0)
try:
syncer.loop(0.1, self.conn)
except LockError:
self.backend.complete_filter(2)
syncer.loop(0.1, self.conn)
for fltr in filters:
logg.debug('{} {}'.format(str(fltr), fltr.c))
self.assertEqual(fltr.c, z)
def test_filter_interrupt_memory(self):
self.track_complete = True
for vector in self.vectors:
self.backend = MemBackend.custom(self.chain_spec, target_block=len(vector))
self.assert_filter_interrupt(vector, self.interface)
#TODO: implement flag lock in file backend
@unittest.expectedFailure
def test_filter_interrupt_file(self):
#for vector in self.vectors:
vector = self.vectors.pop()
d = tempfile.mkdtemp()
#os.makedirs(data_dir_for(self.chain_spec, 'foo', d))
self.backend = FileBackend.initial(self.chain_spec, len(vector), base_dir=d) #'foo', base_dir=d)
self.assert_filter_interrupt(vector, self.interface)
def test_filter_interrupt_sql(self):
self.track_complete = True
for vector in self.vectors:
self.backend = SQLBackend.initial(self.chain_spec, len(vector))
self.assert_filter_interrupt(vector, self.interface)
if __name__ == '__main__':
unittest.main()

32
tests/test_mem.py Normal file
View File

@ -0,0 +1,32 @@
# standard imports
import unittest
import logging
# external imports
from chainlib.chain import ChainSpec
# local imports
from chainsyncer.backend.memory import MemBackend
# testutil imports
from tests.chainsyncer_base import TestBase
logging.basicConfig(level=logging.DEBUG)
class TestMem(TestBase):
def test_backend_mem_custom(self):
chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
flags = int(5).to_bytes(2, 'big')
flag_count = 10
flags_target = (2 ** 10) - 1
backend = MemBackend.custom(chain_spec, 666, 42, 2, flags, flag_count, object_id='xyzzy')
self.assertEqual(((42, 2), flags), backend.start())
self.assertEqual(((42, 2), flags), backend.get())
self.assertEqual((666, flags_target), backend.target())
self.assertEqual(backend.object_id, 'xyzzy')
if __name__ == '__main__':
unittest.main()

12
tests/test_thread.py Normal file
View File

@ -0,0 +1,12 @@
# standard imports
import logging
import unittest
# test imports
from tests.chainsyncer_base import TestBase
class TestThreadRange(TestBase):
def test_hello(self):
ThreadPoolRangeHistorySyncer(None, 3)

114
tests/test_thread_range.py Normal file
View File

@ -0,0 +1,114 @@
# standard imports
import unittest
import logging
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.unittest.ethtester import EthTesterCase
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import (
RPCGasOracle,
Gas,
)
from chainlib.eth.unittest.base import TestRPCConnection
# local imports
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver.threadrange import (
sync_split,
ThreadPoolRangeHistorySyncer,
)
from chainsyncer.unittest.base import MockConn
from chainsyncer.unittest.db import ChainSyncerDb
# testutil imports
from tests.chainsyncer_base import (
EthChainInterface,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class SyncerCounter:
def __init__(self):
self.hits = []
def filter(self, conn, block, tx, db_session=None):
logg.debug('fltr {} {}'.format(block, tx))
self.hits.append((block, tx))
class TestBaseEth(EthTesterCase):
interface = EthChainInterface()
def setUp(self):
super(TestBaseEth, self).setUp()
self.db = ChainSyncerDb()
self.session = self.db.bind_session()
def tearDown(self):
self.session.commit()
self.db.release_session(self.session)
#os.unlink(self.db_path)
class TestThreadRange(TestBaseEth):
interface = EthChainInterface()
def test_range_split_even(self):
ranges = sync_split(5, 20, 3)
self.assertEqual(len(ranges), 3)
self.assertEqual(ranges[0], (5, 9))
self.assertEqual(ranges[1], (10, 14))
self.assertEqual(ranges[2], (15, 19))
def test_range_split_underflow(self):
ranges = sync_split(5, 8, 4)
self.assertEqual(len(ranges), 3)
self.assertEqual(ranges[0], (5, 5))
self.assertEqual(ranges[1], (6, 6))
self.assertEqual(ranges[2], (7, 7))
def test_range_syncer_hello(self):
#chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
chain_spec = ChainSpec('evm', 'foochain', 42)
backend = MemBackend.custom(chain_spec, 20, 5, 3, 5, 10)
#syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backend, self.interface)
syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface)
syncer.loop(0.1, None)
def test_range_syncer_content(self):
nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc)
gas_oracle = RPCGasOracle(self.rpc)
self.backend.mine_blocks(10)
c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec)
(tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 1024)
r = self.rpc.do(o)
self.backend.mine_blocks(3)
c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec)
(tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 2048)
r = self.rpc.do(o)
self.backend.mine_blocks(10)
backend = MemBackend.custom(self.chain_spec, 20, 5, 3, 5, 10)
syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface)
fltr = SyncerCounter()
syncer.add_filter(fltr)
syncer.loop(0.1, None)
if __name__ == '__main__':
unittest.main()