Complete docstrings and cleanup of chainsyncer backends
This commit is contained in:
parent
07685134c1
commit
059f585efe
@ -1,12 +1,18 @@
|
|||||||
# standard imports
|
# standard imports
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
logg = logging.getLogger().getChild(__name__)
|
logg = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Backend:
|
class Backend:
|
||||||
|
"""Base class for syncer state backend.
|
||||||
|
|
||||||
def __init__(self, flags_reversed=False):
|
:param flags_reversed: If set, filter flags are interpreted from left to right
|
||||||
|
:type flags_reversed: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, object_id, flags_reversed=False):
|
||||||
|
self.object_id = object_id
|
||||||
self.filter_count = 0
|
self.filter_count = 0
|
||||||
self.flags_reversed = flags_reversed
|
self.flags_reversed = flags_reversed
|
||||||
|
|
||||||
@ -19,9 +25,17 @@ class Backend:
|
|||||||
self.block_height_target = 0
|
self.block_height_target = 0
|
||||||
self.tx_index_target = 0
|
self.tx_index_target = 0
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def check_filter(self, n, flags):
|
def check_filter(self, n, flags):
|
||||||
|
"""Check whether an individual filter flag is set.
|
||||||
|
|
||||||
|
:param n: Bit index
|
||||||
|
:type n: int
|
||||||
|
:param flags: Bit field to check against
|
||||||
|
:type flags: int
|
||||||
|
:rtype: bool
|
||||||
|
:returns: True if set
|
||||||
|
"""
|
||||||
if self.flags_reversed:
|
if self.flags_reversed:
|
||||||
try:
|
try:
|
||||||
v = 1 << flags.bit_length() - 1
|
v = 1 << flags.bit_length() - 1
|
||||||
@ -34,12 +48,13 @@ class Backend:
|
|||||||
|
|
||||||
|
|
||||||
def chain(self):
|
def chain(self):
|
||||||
"""Returns chain spec for syncer
|
"""Returns chain spec for syncer.
|
||||||
|
|
||||||
:returns: Chain spec
|
:returns: Chain spec
|
||||||
:rtype chain_spec: cic_registry.chain.ChainSpec
|
:rtype chain_spec: cic_registry.chain.ChainSpec
|
||||||
"""
|
"""
|
||||||
return self.chain_spec
|
return self.chain_spec
|
||||||
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target())
|
return "syncerbackend {} chain {} start {} target {}".format(self.object_id, self.chain(), self.start(), self.target())
|
||||||
|
@ -9,23 +9,54 @@ from .base import Backend
|
|||||||
|
|
||||||
logg = logging.getLogger().getChild(__name__)
|
logg = logging.getLogger().getChild(__name__)
|
||||||
|
|
||||||
base_dir = '/var/lib'
|
BACKEND_BASE_DIR = '/var/lib'
|
||||||
|
|
||||||
|
|
||||||
def chain_dir_for(chain_spec, base_dir=base_dir):
|
def chain_dir_for(chain_spec, base_dir=BACKEND_BASE_DIR):
|
||||||
|
"""Retrieve file backend directory for the given chain spec.
|
||||||
|
|
||||||
|
:param chain_spec: Chain spec context of backend
|
||||||
|
:type chain_spec: chainlib.chain.ChainSpec
|
||||||
|
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||||
|
:type base_dir: str
|
||||||
|
:rtype: str
|
||||||
|
:returns: Absolute path of chain backend directory
|
||||||
|
"""
|
||||||
base_data_dir = os.path.join(base_dir, 'chainsyncer')
|
base_data_dir = os.path.join(base_dir, 'chainsyncer')
|
||||||
return os.path.join(base_data_dir, str(chain_spec).replace(':', '/'))
|
return os.path.join(base_data_dir, str(chain_spec).replace(':', '/'))
|
||||||
|
|
||||||
|
|
||||||
def data_dir_for(chain_spec, object_id, base_dir=base_dir):
|
def data_dir_for(chain_spec, object_id, base_dir=BACKEND_BASE_DIR):
|
||||||
|
"""Retrieve file backend directory for the given syncer.
|
||||||
|
|
||||||
|
:param chain_spec: Chain spec context of backend
|
||||||
|
:type chain_spec: chainlib.chain.ChainSpec
|
||||||
|
:param object_id: Syncer id
|
||||||
|
:type object_id: str
|
||||||
|
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||||
|
:type base_dir: str
|
||||||
|
:rtype: str
|
||||||
|
:returns: Absolute path of chain backend directory
|
||||||
|
"""
|
||||||
chain_dir = chain_dir_for(chain_spec, base_dir=base_dir)
|
chain_dir = chain_dir_for(chain_spec, base_dir=base_dir)
|
||||||
return os.path.join(chain_dir, object_id)
|
return os.path.join(chain_dir, object_id)
|
||||||
|
|
||||||
|
|
||||||
class FileBackend(Backend):
|
class FileBackend(Backend):
|
||||||
|
"""Filesystem backend implementation for syncer state.
|
||||||
|
|
||||||
def __init__(self, chain_spec, object_id=None, base_dir=base_dir):
|
FileBackend uses reverse order of filter flags.
|
||||||
super(FileBackend, self).__init__(flags_reversed=True)
|
|
||||||
|
:param chain_spec: Chain spec for the chain that syncer is running for.
|
||||||
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
|
:param object_id: Unique id for the syncer session.
|
||||||
|
:type object_id: str
|
||||||
|
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||||
|
:type base_dir: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, chain_spec, object_id, base_dir=BACKEND_BASE_DIR):
|
||||||
|
super(FileBackend, self).__init__(object_id, flags_reversed=True)
|
||||||
self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)
|
self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)
|
||||||
|
|
||||||
self.object_id = object_id
|
self.object_id = object_id
|
||||||
@ -42,7 +73,16 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_object(chain_spec, object_id=None, base_dir=base_dir):
|
def create_object(chain_spec, object_id=None, base_dir=BACKEND_BASE_DIR):
|
||||||
|
"""Creates a new syncer session at the given backend destination.
|
||||||
|
|
||||||
|
:param chain_spec: Chain spec for the chain that syncer is running for.
|
||||||
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
|
:param object_id: Unique id for the syncer session.
|
||||||
|
:type object_id: str
|
||||||
|
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||||
|
:type base_dir: str
|
||||||
|
"""
|
||||||
if object_id == None:
|
if object_id == None:
|
||||||
object_id = str(uuid.uuid4())
|
object_id = str(uuid.uuid4())
|
||||||
|
|
||||||
@ -89,6 +129,11 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def load(self):
|
def load(self):
|
||||||
|
"""Loads the state of the syncer at the given location of the instance.
|
||||||
|
|
||||||
|
:raises FileNotFoundError: Invalid data directory
|
||||||
|
:raises IsADirectoryError: Invalid data directory
|
||||||
|
"""
|
||||||
offset_path = os.path.join(self.object_data_dir, 'offset')
|
offset_path = os.path.join(self.object_data_dir, 'offset')
|
||||||
f = open(offset_path, 'rb')
|
f = open(offset_path, 'rb')
|
||||||
b = f.read(16)
|
b = f.read(16)
|
||||||
@ -130,6 +175,10 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
|
"""Proxy for chainsyncer.backend.file.FileBackend.load that performs a basic sanity check for instance's backend location.
|
||||||
|
|
||||||
|
:raises ValueError: Sanity check failed
|
||||||
|
"""
|
||||||
object_path = os.path.join(self.object_data_dir, 'object_id')
|
object_path = os.path.join(self.object_data_dir, 'object_id')
|
||||||
f = open(object_path, 'r')
|
f = open(object_path, 'r')
|
||||||
object_id = f.read()
|
object_id = f.read()
|
||||||
@ -141,23 +190,46 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
|
"""FileBackend applies no actual connection, so this is noop
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def purge(self):
|
def purge(self):
|
||||||
|
"""Remove syncer state from backend.
|
||||||
|
"""
|
||||||
shutil.rmtree(self.object_data_dir)
|
shutil.rmtree(self.object_data_dir)
|
||||||
|
|
||||||
|
|
||||||
def get(self):
|
def get(self):
|
||||||
|
"""Get the current state of the syncer cursor.
|
||||||
|
|
||||||
|
:rtype: tuple
|
||||||
|
:returns: Block height / tx index tuple, and filter flags value
|
||||||
|
"""
|
||||||
logg.debug('filter {}'.format(self.filter.hex()))
|
logg.debug('filter {}'.format(self.filter.hex()))
|
||||||
return ((self.block_height_cursor, self.tx_index_cursor), self.get_flags())
|
return ((self.block_height_cursor, self.tx_index_cursor), self.get_flags())
|
||||||
|
|
||||||
|
|
||||||
def get_flags(self):
|
def get_flags(self):
|
||||||
|
"""Get canonical representation format of flags.
|
||||||
|
|
||||||
|
:rtype: int
|
||||||
|
:returns: Filter flag bitfield value
|
||||||
|
"""
|
||||||
return int.from_bytes(self.filter, 'little')
|
return int.from_bytes(self.filter, 'little')
|
||||||
|
|
||||||
|
|
||||||
def set(self, block_height, tx_index):
|
def set(self, block_height, tx_index):
|
||||||
|
"""Update the state of the syncer cursor.
|
||||||
|
|
||||||
|
:param block_height: New block height
|
||||||
|
:type block_height: int
|
||||||
|
:param tx_height: New transaction height in block
|
||||||
|
:type tx_height: int
|
||||||
|
:returns: Block height / tx index tuple, and filter flags value
|
||||||
|
:rtype: tuple
|
||||||
|
"""
|
||||||
self.__set(block_height, tx_index, 'cursor')
|
self.__set(block_height, tx_index, 'cursor')
|
||||||
|
|
||||||
# cursor_path = os.path.join(self.object_data_dir, 'filter')
|
# cursor_path = os.path.join(self.object_data_dir, 'filter')
|
||||||
@ -188,7 +260,21 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def initial(chain_spec, target_block_height, start_block_height=0, base_dir=base_dir):
|
def initial(chain_spec, target_block_height, start_block_height=0, base_dir=BACKEND_BASE_DIR):
|
||||||
|
"""Creates a new syncer session and commit its initial state to backend.
|
||||||
|
|
||||||
|
:param chain_spec: Chain spec of chain that syncer is running for.
|
||||||
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
|
:param target_block_height: Target block height
|
||||||
|
:type target_block_height: int
|
||||||
|
:param start_block_height: Start block height
|
||||||
|
:type start_block_height: int
|
||||||
|
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||||
|
:type base_dir: str
|
||||||
|
:raises ValueError: Invalid start/target specification
|
||||||
|
:returns: New syncer object
|
||||||
|
:rtype: cic_eth.db.models.BlockchainSync
|
||||||
|
"""
|
||||||
if start_block_height >= target_block_height:
|
if start_block_height >= target_block_height:
|
||||||
raise ValueError('start block height must be lower than target block height')
|
raise ValueError('start block height must be lower than target block height')
|
||||||
|
|
||||||
@ -203,7 +289,18 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def live(chain_spec, block_height, base_dir=base_dir):
|
def live(chain_spec, block_height, base_dir=BACKEND_BASE_DIR):
|
||||||
|
"""Creates a new open-ended syncer session starting at the given block height.
|
||||||
|
|
||||||
|
:param chain: Chain spec of chain that syncer is running for.
|
||||||
|
:type chain: cic_registry.chain.ChainSpec
|
||||||
|
:param block_height: Start block height
|
||||||
|
:type block_height: int
|
||||||
|
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||||
|
:type base_dir: str
|
||||||
|
:returns: "Live" syncer object
|
||||||
|
:rtype: cic_eth.db.models.BlockchainSync
|
||||||
|
"""
|
||||||
uu = FileBackend.create_object(chain_spec, base_dir=base_dir)
|
uu = FileBackend.create_object(chain_spec, base_dir=base_dir)
|
||||||
o = FileBackend(chain_spec, uu, base_dir=base_dir)
|
o = FileBackend(chain_spec, uu, base_dir=base_dir)
|
||||||
o.__set(block_height, 0, 'offset')
|
o.__set(block_height, 0, 'offset')
|
||||||
@ -213,15 +310,26 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def target(self):
|
def target(self):
|
||||||
|
"""Get the target state (upper bound of sync) of the syncer cursor.
|
||||||
|
|
||||||
|
:returns: Block height and filter flags value
|
||||||
|
:rtype: tuple
|
||||||
|
"""
|
||||||
|
|
||||||
return (self.block_height_target, 0,)
|
return (self.block_height_target, 0,)
|
||||||
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
|
"""Get the initial state of the syncer cursor.
|
||||||
|
|
||||||
|
:returns: Block height / tx index tuple, and filter flags value
|
||||||
|
:rtype: tuple
|
||||||
|
"""
|
||||||
return ((self.block_height_offset, self.tx_index_offset), 0,)
|
return ((self.block_height_offset, self.tx_index_offset), 0,)
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __sorted_entries(chain_spec, base_dir=base_dir):
|
def __sorted_entries(chain_spec, base_dir=BACKEND_BASE_DIR):
|
||||||
chain_dir = chain_dir_for(chain_spec, base_dir=base_dir)
|
chain_dir = chain_dir_for(chain_spec, base_dir=base_dir)
|
||||||
|
|
||||||
entries = {}
|
entries = {}
|
||||||
@ -246,7 +354,21 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def resume(chain_spec, block_height, base_dir=base_dir):
|
def resume(chain_spec, block_height, base_dir=BACKEND_BASE_DIR):
|
||||||
|
"""Retrieves and returns all previously unfinished syncer sessions.
|
||||||
|
|
||||||
|
If a previous open-ended syncer is found, a new syncer will be generated to sync from where that syncer left off until the block_height given as argument.
|
||||||
|
|
||||||
|
:param chain_spec: Chain spec of chain that syncer is running for
|
||||||
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
|
:param block_height: Target block height for previous live syncer
|
||||||
|
:type block_height: int
|
||||||
|
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||||
|
:type base_dir: str
|
||||||
|
:raises FileNotFoundError: Invalid backend location
|
||||||
|
:returns: Syncer objects of unfinished syncs
|
||||||
|
:rtype: list of cic_eth.db.models.BlockchainSync
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
|
return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
@ -254,7 +376,16 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def first(chain_spec, base_dir=base_dir):
|
def first(chain_spec, base_dir=BACKEND_BASE_DIR):
|
||||||
|
"""Returns the model object of the most recent syncer in backend.
|
||||||
|
|
||||||
|
:param chain_spec: Chain spec of chain that syncer is running for.
|
||||||
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
|
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||||
|
:type base_dir: str
|
||||||
|
:returns: Last syncer object
|
||||||
|
:rtype: cic_eth.db.models.BlockchainSync
|
||||||
|
"""
|
||||||
entries = []
|
entries = []
|
||||||
try:
|
try:
|
||||||
entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
|
entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
|
||||||
@ -264,8 +395,13 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
# n is zero-index of bit field
|
# n is zero-index of bit field
|
||||||
def complete_filter(self, n, base_dir=base_dir):
|
def complete_filter(self, n, base_dir=BACKEND_BASE_DIR):
|
||||||
|
"""Sets the filter at the given index as completed.
|
||||||
|
|
||||||
|
:param n: Filter index, starting at zero
|
||||||
|
:type n: int
|
||||||
|
:raises IndexError: Index is outside filter count range
|
||||||
|
"""
|
||||||
if self.filter_count <= n:
|
if self.filter_count <= n:
|
||||||
raise IndexError('index {} out of ranger for filter size {}'.format(n, self.filter_count))
|
raise IndexError('index {} out of ranger for filter size {}'.format(n, self.filter_count))
|
||||||
|
|
||||||
@ -286,8 +422,14 @@ class FileBackend(Backend):
|
|||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
|
|
||||||
# overwrites disk if manual changed members in struct
|
|
||||||
def register_filter(self, name):
|
def register_filter(self, name):
|
||||||
|
"""Add filter to backend.
|
||||||
|
|
||||||
|
Overwrites record on disk if manual changed members in struct
|
||||||
|
|
||||||
|
:param name: Name of filter
|
||||||
|
:type name: str
|
||||||
|
"""
|
||||||
filter_path = os.path.join(self.object_data_dir, 'filter')
|
filter_path = os.path.join(self.object_data_dir, 'filter')
|
||||||
if (self.filter_count + 1) % 8 == 0:
|
if (self.filter_count + 1) % 8 == 0:
|
||||||
self.filter += b'\x00'
|
self.filter += b'\x00'
|
||||||
@ -308,6 +450,8 @@ class FileBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def reset_filter(self):
|
def reset_filter(self):
|
||||||
|
"""Reset all filter states.
|
||||||
|
"""
|
||||||
self.filter = b'\x00' * len(self.filter)
|
self.filter = b'\x00' * len(self.filter)
|
||||||
cursor_path = os.path.join(self.object_data_dir, 'filter')
|
cursor_path = os.path.join(self.object_data_dir, 'filter')
|
||||||
f = open(cursor_path, 'r+b')
|
f = open(cursor_path, 'r+b')
|
||||||
|
@ -4,66 +4,111 @@ import logging
|
|||||||
# local imports
|
# local imports
|
||||||
from .base import Backend
|
from .base import Backend
|
||||||
|
|
||||||
logg = logging.getLogger().getChild(__name__)
|
logg = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class MemBackend(Backend):
|
class MemBackend(Backend):
|
||||||
|
"""Disposable syncer backend. Keeps syncer state in memory.
|
||||||
|
|
||||||
def __init__(self, chain_spec, object_id, target_block=None):
|
Filter bitfield is interpreted right to left.
|
||||||
super(MemBackend, self).__init__()
|
|
||||||
self.object_id = object_id
|
:param chain_spec: Chain spec context of syncer
|
||||||
|
:type chain_spec: chainlib.chain.ChainSpec
|
||||||
|
:param object_id: Unique id for the syncer session.
|
||||||
|
:type object_id: str
|
||||||
|
:param target_block: Block height to terminate sync at
|
||||||
|
:type target_block: int
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, chain_spec, object_id, target_block=None, block_height=0, tx_height=0, flags=0):
|
||||||
|
super(MemBackend, self).__init__(object_id)
|
||||||
self.chain_spec = chain_spec
|
self.chain_spec = chain_spec
|
||||||
self.block_height = 0
|
self.block_height_offset = block_height
|
||||||
self.tx_height = 0
|
self.block_height_cursor = block_height
|
||||||
self.flags = 0
|
self.tx_height_offset = tx_height
|
||||||
self.target_block = target_block
|
self.tx_height_cursor = tx_height
|
||||||
|
self.block_height_target = target_block
|
||||||
self.db_session = None
|
self.db_session = None
|
||||||
|
self.flags = flags
|
||||||
self.filter_names = []
|
self.filter_names = []
|
||||||
self.filter_states = {}
|
|
||||||
|
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
|
"""NOOP as memory backend implements no connection.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
|
"""NOOP as memory backend implements no connection.
|
||||||
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def set(self, block_height, tx_height):
|
def set(self, block_height, tx_height):
|
||||||
logg.debug('stateless backend received {} {}'.format(block_height, tx_height))
|
"""Set the syncer state.
|
||||||
self.block_height = block_height
|
|
||||||
self.tx_height = tx_height
|
:param block_height: New block height
|
||||||
|
:type block_height: int
|
||||||
|
:param tx_height: New transaction height in block
|
||||||
|
:type tx_height: int
|
||||||
|
"""
|
||||||
|
logg.debug('memory backend received {} {}'.format(block_height, tx_height))
|
||||||
|
self.block_height_cursor = block_height
|
||||||
|
self.tx_height_cursor = tx_height
|
||||||
|
|
||||||
|
|
||||||
def get(self):
|
def get(self):
|
||||||
return ((self.block_height, self.tx_height), self.flags)
|
"""Get the current syncer state
|
||||||
|
|
||||||
|
:rtype: tuple
|
||||||
|
:returns: block height / tx index tuple, and filter flags value
|
||||||
|
"""
|
||||||
|
return ((self.block_height_cursor, self.tx_height_cursor), self.flags)
|
||||||
|
|
||||||
|
|
||||||
def target(self):
|
def target(self):
|
||||||
return (self.target_block, self.flags)
|
"""Returns the syncer target.
|
||||||
|
|
||||||
|
:rtype: tuple
|
||||||
|
:returns: block height / tx index tuple
|
||||||
|
"""
|
||||||
|
return (self.block_height_target, self.flags)
|
||||||
|
|
||||||
|
|
||||||
def register_filter(self, name):
|
def register_filter(self, name):
|
||||||
|
"""Adds a filter identifier to the syncer.
|
||||||
|
|
||||||
|
:param name: Filter name
|
||||||
|
:type name: str
|
||||||
|
"""
|
||||||
self.filter_names.append(name)
|
self.filter_names.append(name)
|
||||||
self.filter_count += 1
|
self.filter_count += 1
|
||||||
|
|
||||||
|
|
||||||
def complete_filter(self, n):
|
def complete_filter(self, n):
|
||||||
|
"""Set filter at index as completed for the current block / tx state.
|
||||||
|
|
||||||
|
:param n: Filter index
|
||||||
|
:type n: int
|
||||||
|
"""
|
||||||
v = 1 << n
|
v = 1 << n
|
||||||
self.flags |= v
|
self.flags |= v
|
||||||
logg.debug('set filter {} {}'.format(self.filter_names[n], v))
|
logg.debug('set filter {} {}'.format(self.filter_names[n], v))
|
||||||
|
|
||||||
|
|
||||||
def reset_filter(self):
|
def reset_filter(self):
|
||||||
|
"""Set all filters to unprocessed for the current block / tx state.
|
||||||
|
"""
|
||||||
logg.debug('reset filters')
|
logg.debug('reset filters')
|
||||||
self.flags = 0
|
self.flags = 0
|
||||||
|
|
||||||
|
|
||||||
def get_flags(self):
|
# def get_flags(self):
|
||||||
return flags
|
# """Returns flags
|
||||||
|
# """
|
||||||
|
# return self.flags
|
||||||
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "syncer membackend chain {} cursor".format(self.get())
|
return "syncer membackend {} chain {} cursor {}".format(self.object_id, self.chain(), self.get())
|
||||||
|
|
||||||
|
@ -19,32 +19,44 @@ class SQLBackend(Backend):
|
|||||||
|
|
||||||
:param chain_spec: Chain spec for the chain that syncer is running for.
|
:param chain_spec: Chain spec for the chain that syncer is running for.
|
||||||
:type chain_spec: cic_registry.chain.ChainSpec
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
:param object_id: Unique id for the syncer session.
|
:param object_id: Unique database record id for the syncer session.
|
||||||
:type object_id: number
|
:type object_id: int
|
||||||
"""
|
"""
|
||||||
|
|
||||||
base = None
|
base = None
|
||||||
|
|
||||||
def __init__(self, chain_spec, object_id):
|
def __init__(self, chain_spec, object_id):
|
||||||
super(SQLBackend, self).__init__()
|
super(SQLBackend, self).__init__(int(object_id))
|
||||||
self.db_session = None
|
self.db_session = None
|
||||||
self.db_object = None
|
self.db_object = None
|
||||||
self.db_object_filter = None
|
self.db_object_filter = None
|
||||||
self.chain_spec = chain_spec
|
self.chain_spec = chain_spec
|
||||||
self.object_id = object_id
|
|
||||||
self.connect()
|
self.connect()
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setup(cls, dsn, debug=False, *args, **kwargs):
|
def setup(cls, dsn, debug=False, pool_size=0, *args, **kwargs):
|
||||||
|
"""Set up database connection backend.
|
||||||
|
|
||||||
|
:param dsn: Database connection string
|
||||||
|
:type dsn: str
|
||||||
|
:param debug: Activate debug output in sql engine
|
||||||
|
:type debug: bool
|
||||||
|
:param pool_size: Size of transaction pool
|
||||||
|
:type pool_size: int
|
||||||
|
"""
|
||||||
if cls.base == None:
|
if cls.base == None:
|
||||||
cls.base = SessionBase
|
cls.base = SessionBase
|
||||||
cls.base.connect(dsn, debug=debug, pool_size=kwargs.get('pool_size', 0))
|
cls.base.connect(dsn, debug=debug, pool_size=pool_size)
|
||||||
|
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
"""Loads the state of the syncer session with the given id.
|
"""Loads the state of the syncer session by the given database record id.
|
||||||
|
|
||||||
|
:raises ValueError: Database syncer object with given id does not exist
|
||||||
|
:rtype: sqlalchemy.orm.session.Session
|
||||||
|
:returns: Database session object
|
||||||
"""
|
"""
|
||||||
if self.db_session == None:
|
if self.db_session == None:
|
||||||
self.db_session = SessionBase.create_session()
|
self.db_session = SessionBase.create_session()
|
||||||
@ -66,7 +78,7 @@ class SQLBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
"""Commits state of sync to backend.
|
"""Commits state of sync to backend and frees connection resources.
|
||||||
"""
|
"""
|
||||||
if self.db_session == None:
|
if self.db_session == None:
|
||||||
return
|
return
|
||||||
@ -83,8 +95,8 @@ class SQLBackend(Backend):
|
|||||||
def get(self):
|
def get(self):
|
||||||
"""Get the current state of the syncer cursor.
|
"""Get the current state of the syncer cursor.
|
||||||
|
|
||||||
:returns: Block and block transaction height, respectively
|
|
||||||
:rtype: tuple
|
:rtype: tuple
|
||||||
|
:returns: Block height / tx index tuple, and filter flags value
|
||||||
"""
|
"""
|
||||||
self.connect()
|
self.connect()
|
||||||
pair = self.db_object.cursor()
|
pair = self.db_object.cursor()
|
||||||
@ -94,12 +106,13 @@ class SQLBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def set(self, block_height, tx_height):
|
def set(self, block_height, tx_height):
|
||||||
"""Update the state of the syncer cursor
|
"""Update the state of the syncer cursor.
|
||||||
:param block_height: Block height of cursor
|
|
||||||
:type block_height: number
|
:param block_height: New block height
|
||||||
:param tx_height: Block transaction height of cursor
|
:type block_height: int
|
||||||
:type tx_height: number
|
:param tx_height: New transaction height in block
|
||||||
:returns: Block and block transaction height, respectively
|
:type tx_height: int
|
||||||
|
:returns: Block height / tx index tuple, and filter flags value
|
||||||
:rtype: tuple
|
:rtype: tuple
|
||||||
"""
|
"""
|
||||||
self.connect()
|
self.connect()
|
||||||
@ -112,7 +125,7 @@ class SQLBackend(Backend):
|
|||||||
def start(self):
|
def start(self):
|
||||||
"""Get the initial state of the syncer cursor.
|
"""Get the initial state of the syncer cursor.
|
||||||
|
|
||||||
:returns: Initial block and block transaction height, respectively
|
:returns: Block height / tx index tuple, and filter flags value
|
||||||
:rtype: tuple
|
:rtype: tuple
|
||||||
"""
|
"""
|
||||||
self.connect()
|
self.connect()
|
||||||
@ -125,8 +138,8 @@ class SQLBackend(Backend):
|
|||||||
def target(self):
|
def target(self):
|
||||||
"""Get the target state (upper bound of sync) of the syncer cursor.
|
"""Get the target state (upper bound of sync) of the syncer cursor.
|
||||||
|
|
||||||
:returns: Target block height
|
:returns: Block height and filter flags value
|
||||||
:rtype: number
|
:rtype: tuple
|
||||||
"""
|
"""
|
||||||
self.connect()
|
self.connect()
|
||||||
target = self.db_object.target()
|
target = self.db_object.target()
|
||||||
@ -139,12 +152,11 @@ class SQLBackend(Backend):
|
|||||||
def first(chain_spec):
|
def first(chain_spec):
|
||||||
"""Returns the model object of the most recent syncer in backend.
|
"""Returns the model object of the most recent syncer in backend.
|
||||||
|
|
||||||
:param chain: Chain spec of chain that syncer is running for.
|
:param chain_spec: Chain spec of chain that syncer is running for.
|
||||||
:type chain: cic_registry.chain.ChainSpec
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
:returns: Last syncer object
|
:returns: Last syncer object
|
||||||
:rtype: cic_eth.db.models.BlockchainSync
|
:rtype: cic_eth.db.models.BlockchainSync
|
||||||
"""
|
"""
|
||||||
#return BlockchainSync.first(str(chain_spec))
|
|
||||||
object_id = BlockchainSync.first(str(chain_spec))
|
object_id = BlockchainSync.first(str(chain_spec))
|
||||||
if object_id == None:
|
if object_id == None:
|
||||||
return None
|
return None
|
||||||
@ -156,10 +168,13 @@ class SQLBackend(Backend):
|
|||||||
def initial(chain_spec, target_block_height, start_block_height=0):
|
def initial(chain_spec, target_block_height, start_block_height=0):
|
||||||
"""Creates a new syncer session and commit its initial state to backend.
|
"""Creates a new syncer session and commit its initial state to backend.
|
||||||
|
|
||||||
:param chain: Chain spec of chain that syncer is running for.
|
:param chain_spec: Chain spec of chain that syncer is running for
|
||||||
:type chain: cic_registry.chain.ChainSpec
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
:param block_height: Target block height
|
:param target_block_height: Target block height
|
||||||
:type block_height: number
|
:type target_block_height: int
|
||||||
|
:param start_block_height: Start block height
|
||||||
|
:type start_block_height: int
|
||||||
|
:raises ValueError: Invalid start/target specification
|
||||||
:returns: New syncer object
|
:returns: New syncer object
|
||||||
:rtype: cic_eth.db.models.BlockchainSync
|
:rtype: cic_eth.db.models.BlockchainSync
|
||||||
"""
|
"""
|
||||||
@ -185,11 +200,12 @@ class SQLBackend(Backend):
|
|||||||
def resume(chain_spec, block_height):
|
def resume(chain_spec, block_height):
|
||||||
"""Retrieves and returns all previously unfinished syncer sessions.
|
"""Retrieves and returns all previously unfinished syncer sessions.
|
||||||
|
|
||||||
|
If a previous open-ended syncer is found, a new syncer will be generated to sync from where that syncer left off until the block_height given as argument.
|
||||||
|
|
||||||
:param chain_spec: Chain spec of chain that syncer is running for.
|
:param chain_spec: Chain spec of chain that syncer is running for
|
||||||
:type chain_spec: cic_registry.chain.ChainSpec
|
:type chain_spec: cic_registry.chain.ChainSpec
|
||||||
:param block_height: Target block height
|
:param block_height: Target block height for previous live syncer
|
||||||
:type block_height: number
|
:type block_height: int
|
||||||
:returns: Syncer objects of unfinished syncs
|
:returns: Syncer objects of unfinished syncs
|
||||||
:rtype: list of cic_eth.db.models.BlockchainSync
|
:rtype: list of cic_eth.db.models.BlockchainSync
|
||||||
"""
|
"""
|
||||||
@ -261,8 +277,8 @@ class SQLBackend(Backend):
|
|||||||
|
|
||||||
:param chain: Chain spec of chain that syncer is running for.
|
:param chain: Chain spec of chain that syncer is running for.
|
||||||
:type chain: cic_registry.chain.ChainSpec
|
:type chain: cic_registry.chain.ChainSpec
|
||||||
:param block_height: Target block height
|
:param block_height: Start block height
|
||||||
:type block_height: number
|
:type block_height: int
|
||||||
:returns: "Live" syncer object
|
:returns: "Live" syncer object
|
||||||
:rtype: cic_eth.db.models.BlockchainSync
|
:rtype: cic_eth.db.models.BlockchainSync
|
||||||
"""
|
"""
|
||||||
@ -283,6 +299,13 @@ class SQLBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def register_filter(self, name):
|
def register_filter(self, name):
|
||||||
|
"""Add filter to backend.
|
||||||
|
|
||||||
|
No check is currently implemented to enforce that filters are the same for existing syncers. Care must be taken by the caller to avoid inconsistencies.
|
||||||
|
|
||||||
|
:param name: Name of filter
|
||||||
|
:type name: str
|
||||||
|
"""
|
||||||
self.connect()
|
self.connect()
|
||||||
if self.db_object_filter == None:
|
if self.db_object_filter == None:
|
||||||
self.db_object_filter = BlockchainSyncFilter(self.db_object)
|
self.db_object_filter = BlockchainSyncFilter(self.db_object)
|
||||||
@ -292,6 +315,11 @@ class SQLBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def complete_filter(self, n):
|
def complete_filter(self, n):
|
||||||
|
"""Sets the filter at the given index as completed.
|
||||||
|
|
||||||
|
:param n: Filter index
|
||||||
|
:type n: int
|
||||||
|
"""
|
||||||
self.connect()
|
self.connect()
|
||||||
self.db_object_filter.set(n)
|
self.db_object_filter.set(n)
|
||||||
self.db_session.add(self.db_object_filter)
|
self.db_session.add(self.db_object_filter)
|
||||||
@ -300,8 +328,8 @@ class SQLBackend(Backend):
|
|||||||
|
|
||||||
|
|
||||||
def reset_filter(self):
|
def reset_filter(self):
|
||||||
|
"""Reset all filter states.
|
||||||
|
"""
|
||||||
self.connect()
|
self.connect()
|
||||||
self.db_object_filter.clear()
|
self.db_object_filter.clear()
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user