diff --git a/chainsyncer/backend/base.py b/chainsyncer/backend/base.py index ccf36f1..f5048bc 100644 --- a/chainsyncer/backend/base.py +++ b/chainsyncer/backend/base.py @@ -1,12 +1,18 @@ # standard imports import logging -logg = logging.getLogger().getChild(__name__) +logg = logging.getLogger(__name__) class Backend: + """Base class for syncer state backend. - def __init__(self, flags_reversed=False): + :param flags_reversed: If set, filter flags are interpreted from left to right + :type flags_reversed: bool + """ + + def __init__(self, object_id, flags_reversed=False): + self.object_id = object_id self.filter_count = 0 self.flags_reversed = flags_reversed @@ -19,9 +25,17 @@ class Backend: self.block_height_target = 0 self.tx_index_target = 0 - def check_filter(self, n, flags): + """Check whether an individual filter flag is set. + + :param n: Bit index + :type n: int + :param flags: Bit field to check against + :type flags: int + :rtype: bool + :returns: True if set + """ if self.flags_reversed: try: v = 1 << flags.bit_length() - 1 @@ -34,12 +48,13 @@ class Backend: def chain(self): - """Returns chain spec for syncer + """Returns chain spec for syncer. :returns: Chain spec :rtype chain_spec: cic_registry.chain.ChainSpec """ return self.chain_spec + def __str__(self): - return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target()) + return "syncerbackend {} chain {} start {} target {}".format(self.object_id, self.chain(), self.start(), self.target()) diff --git a/chainsyncer/backend/file.py b/chainsyncer/backend/file.py index 9df52f5..316d571 100644 --- a/chainsyncer/backend/file.py +++ b/chainsyncer/backend/file.py @@ -9,23 +9,54 @@ from .base import Backend logg = logging.getLogger().getChild(__name__) -base_dir = '/var/lib' +BACKEND_BASE_DIR = '/var/lib' -def chain_dir_for(chain_spec, base_dir=base_dir): +def chain_dir_for(chain_spec, base_dir=BACKEND_BASE_DIR): + """Retrieve file backend directory for the given chain spec. + + :param chain_spec: Chain spec context of backend + :type chain_spec: chainlib.chain.ChainSpec + :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR + :type base_dir: str + :rtype: str + :returns: Absolute path of chain backend directory + """ base_data_dir = os.path.join(base_dir, 'chainsyncer') return os.path.join(base_data_dir, str(chain_spec).replace(':', '/')) -def data_dir_for(chain_spec, object_id, base_dir=base_dir): +def data_dir_for(chain_spec, object_id, base_dir=BACKEND_BASE_DIR): + """Retrieve file backend directory for the given syncer. + + :param chain_spec: Chain spec context of backend + :type chain_spec: chainlib.chain.ChainSpec + :param object_id: Syncer id + :type object_id: str + :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR + :type base_dir: str + :rtype: str + :returns: Absolute path of chain backend directory + """ chain_dir = chain_dir_for(chain_spec, base_dir=base_dir) return os.path.join(chain_dir, object_id) class FileBackend(Backend): + """Filesystem backend implementation for syncer state. - def __init__(self, chain_spec, object_id=None, base_dir=base_dir): - super(FileBackend, self).__init__(flags_reversed=True) + FileBackend uses reverse order of filter flags. + + :param chain_spec: Chain spec for the chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec + :param object_id: Unique id for the syncer session. + :type object_id: str + :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR + :type base_dir: str + """ + + def __init__(self, chain_spec, object_id, base_dir=BACKEND_BASE_DIR): + super(FileBackend, self).__init__(object_id, flags_reversed=True) self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir) self.object_id = object_id @@ -42,7 +73,16 @@ class FileBackend(Backend): @staticmethod - def create_object(chain_spec, object_id=None, base_dir=base_dir): + def create_object(chain_spec, object_id=None, base_dir=BACKEND_BASE_DIR): + """Creates a new syncer session at the given backend destination. + + :param chain_spec: Chain spec for the chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec + :param object_id: Unique id for the syncer session. + :type object_id: str + :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR + :type base_dir: str + """ if object_id == None: object_id = str(uuid.uuid4()) @@ -89,6 +129,11 @@ class FileBackend(Backend): def load(self): + """Loads the state of the syncer at the given location of the instance. + + :raises FileNotFoundError: Invalid data directory + :raises IsADirectoryError: Invalid data directory + """ offset_path = os.path.join(self.object_data_dir, 'offset') f = open(offset_path, 'rb') b = f.read(16) @@ -130,6 +175,10 @@ class FileBackend(Backend): def connect(self): + """Proxy for chainsyncer.backend.file.FileBackend.load that performs a basic sanity check for instance's backend location. + + :raises ValueError: Sanity check failed + """ object_path = os.path.join(self.object_data_dir, 'object_id') f = open(object_path, 'r') object_id = f.read() @@ -141,23 +190,46 @@ class FileBackend(Backend): def disconnect(self): + """FileBackend applies no actual connection, so this is noop + """ pass def purge(self): + """Remove syncer state from backend. + """ shutil.rmtree(self.object_data_dir) def get(self): + """Get the current state of the syncer cursor. + + :rtype: tuple + :returns: Block height / tx index tuple, and filter flags value + """ logg.debug('filter {}'.format(self.filter.hex())) return ((self.block_height_cursor, self.tx_index_cursor), self.get_flags()) def get_flags(self): + """Get canonical representation format of flags. + + :rtype: int + :returns: Filter flag bitfield value + """ return int.from_bytes(self.filter, 'little') def set(self, block_height, tx_index): + """Update the state of the syncer cursor. + + :param block_height: New block height + :type block_height: int + :param tx_height: New transaction height in block + :type tx_height: int + :returns: Block height / tx index tuple, and filter flags value + :rtype: tuple + """ self.__set(block_height, tx_index, 'cursor') # cursor_path = os.path.join(self.object_data_dir, 'filter') @@ -188,7 +260,21 @@ class FileBackend(Backend): @staticmethod - def initial(chain_spec, target_block_height, start_block_height=0, base_dir=base_dir): + def initial(chain_spec, target_block_height, start_block_height=0, base_dir=BACKEND_BASE_DIR): + """Creates a new syncer session and commit its initial state to backend. + + :param chain_spec: Chain spec of chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec + :param target_block_height: Target block height + :type target_block_height: int + :param start_block_height: Start block height + :type start_block_height: int + :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR + :type base_dir: str + :raises ValueError: Invalid start/target specification + :returns: New syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ if start_block_height >= target_block_height: raise ValueError('start block height must be lower than target block height') @@ -203,7 +289,18 @@ class FileBackend(Backend): @staticmethod - def live(chain_spec, block_height, base_dir=base_dir): + def live(chain_spec, block_height, base_dir=BACKEND_BASE_DIR): + """Creates a new open-ended syncer session starting at the given block height. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Start block height + :type block_height: int + :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR + :type base_dir: str + :returns: "Live" syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ uu = FileBackend.create_object(chain_spec, base_dir=base_dir) o = FileBackend(chain_spec, uu, base_dir=base_dir) o.__set(block_height, 0, 'offset') @@ -213,15 +310,26 @@ class FileBackend(Backend): def target(self): + """Get the target state (upper bound of sync) of the syncer cursor. + + :returns: Block height and filter flags value + :rtype: tuple + """ + return (self.block_height_target, 0,) def start(self): + """Get the initial state of the syncer cursor. + + :returns: Block height / tx index tuple, and filter flags value + :rtype: tuple + """ return ((self.block_height_offset, self.tx_index_offset), 0,) @staticmethod - def __sorted_entries(chain_spec, base_dir=base_dir): + def __sorted_entries(chain_spec, base_dir=BACKEND_BASE_DIR): chain_dir = chain_dir_for(chain_spec, base_dir=base_dir) entries = {} @@ -246,7 +354,21 @@ class FileBackend(Backend): @staticmethod - def resume(chain_spec, block_height, base_dir=base_dir): + def resume(chain_spec, block_height, base_dir=BACKEND_BASE_DIR): + """Retrieves and returns all previously unfinished syncer sessions. + + If a previous open-ended syncer is found, a new syncer will be generated to sync from where that syncer left off until the block_height given as argument. + + :param chain_spec: Chain spec of chain that syncer is running for + :type chain_spec: cic_registry.chain.ChainSpec + :param block_height: Target block height for previous live syncer + :type block_height: int + :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR + :type base_dir: str + :raises FileNotFoundError: Invalid backend location + :returns: Syncer objects of unfinished syncs + :rtype: list of cic_eth.db.models.BlockchainSync + """ try: return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir) except FileNotFoundError: @@ -254,7 +376,16 @@ class FileBackend(Backend): @staticmethod - def first(chain_spec, base_dir=base_dir): + def first(chain_spec, base_dir=BACKEND_BASE_DIR): + """Returns the model object of the most recent syncer in backend. + + :param chain_spec: Chain spec of chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec + :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR + :type base_dir: str + :returns: Last syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ entries = [] try: entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir) @@ -264,8 +395,13 @@ class FileBackend(Backend): # n is zero-index of bit field - def complete_filter(self, n, base_dir=base_dir): + def complete_filter(self, n, base_dir=BACKEND_BASE_DIR): + """Sets the filter at the given index as completed. + :param n: Filter index, starting at zero + :type n: int + :raises IndexError: Index is outside filter count range + """ if self.filter_count <= n: raise IndexError('index {} out of ranger for filter size {}'.format(n, self.filter_count)) @@ -286,8 +422,14 @@ class FileBackend(Backend): f.close() - # overwrites disk if manual changed members in struct def register_filter(self, name): + """Add filter to backend. + + Overwrites record on disk if manual changed members in struct + + :param name: Name of filter + :type name: str + """ filter_path = os.path.join(self.object_data_dir, 'filter') if (self.filter_count + 1) % 8 == 0: self.filter += b'\x00' @@ -308,6 +450,8 @@ class FileBackend(Backend): def reset_filter(self): + """Reset all filter states. + """ self.filter = b'\x00' * len(self.filter) cursor_path = os.path.join(self.object_data_dir, 'filter') f = open(cursor_path, 'r+b') diff --git a/chainsyncer/backend/memory.py b/chainsyncer/backend/memory.py index a2b451c..652a21a 100644 --- a/chainsyncer/backend/memory.py +++ b/chainsyncer/backend/memory.py @@ -4,66 +4,111 @@ import logging # local imports from .base import Backend -logg = logging.getLogger().getChild(__name__) +logg = logging.getLogger(__name__) class MemBackend(Backend): + """Disposable syncer backend. Keeps syncer state in memory. - def __init__(self, chain_spec, object_id, target_block=None): - super(MemBackend, self).__init__() - self.object_id = object_id + Filter bitfield is interpreted right to left. + + :param chain_spec: Chain spec context of syncer + :type chain_spec: chainlib.chain.ChainSpec + :param object_id: Unique id for the syncer session. + :type object_id: str + :param target_block: Block height to terminate sync at + :type target_block: int + """ + + def __init__(self, chain_spec, object_id, target_block=None, block_height=0, tx_height=0, flags=0): + super(MemBackend, self).__init__(object_id) self.chain_spec = chain_spec - self.block_height = 0 - self.tx_height = 0 - self.flags = 0 - self.target_block = target_block + self.block_height_offset = block_height + self.block_height_cursor = block_height + self.tx_height_offset = tx_height + self.tx_height_cursor = tx_height + self.block_height_target = target_block self.db_session = None + self.flags = flags self.filter_names = [] - self.filter_states = {} def connect(self): + """NOOP as memory backend implements no connection. + """ pass def disconnect(self): + """NOOP as memory backend implements no connection. + """ pass def set(self, block_height, tx_height): - logg.debug('stateless backend received {} {}'.format(block_height, tx_height)) - self.block_height = block_height - self.tx_height = tx_height + """Set the syncer state. + + :param block_height: New block height + :type block_height: int + :param tx_height: New transaction height in block + :type tx_height: int + """ + logg.debug('memory backend received {} {}'.format(block_height, tx_height)) + self.block_height_cursor = block_height + self.tx_height_cursor = tx_height def get(self): - return ((self.block_height, self.tx_height), self.flags) + """Get the current syncer state + + :rtype: tuple + :returns: block height / tx index tuple, and filter flags value + """ + return ((self.block_height_cursor, self.tx_height_cursor), self.flags) def target(self): - return (self.target_block, self.flags) + """Returns the syncer target. + + :rtype: tuple + :returns: block height / tx index tuple + """ + return (self.block_height_target, self.flags) def register_filter(self, name): + """Adds a filter identifier to the syncer. + + :param name: Filter name + :type name: str + """ self.filter_names.append(name) self.filter_count += 1 def complete_filter(self, n): + """Set filter at index as completed for the current block / tx state. + + :param n: Filter index + :type n: int + """ v = 1 << n self.flags |= v logg.debug('set filter {} {}'.format(self.filter_names[n], v)) def reset_filter(self): + """Set all filters to unprocessed for the current block / tx state. + """ logg.debug('reset filters') self.flags = 0 - def get_flags(self): - return flags +# def get_flags(self): +# """Returns flags +# """ +# return self.flags def __str__(self): - return "syncer membackend chain {} cursor".format(self.get()) - + return "syncer membackend {} chain {} cursor {}".format(self.object_id, self.chain(), self.get()) diff --git a/chainsyncer/backend/sql.py b/chainsyncer/backend/sql.py index 53c2f97..a44dee9 100644 --- a/chainsyncer/backend/sql.py +++ b/chainsyncer/backend/sql.py @@ -19,32 +19,44 @@ class SQLBackend(Backend): :param chain_spec: Chain spec for the chain that syncer is running for. :type chain_spec: cic_registry.chain.ChainSpec - :param object_id: Unique id for the syncer session. - :type object_id: number + :param object_id: Unique database record id for the syncer session. + :type object_id: int """ base = None def __init__(self, chain_spec, object_id): - super(SQLBackend, self).__init__() + super(SQLBackend, self).__init__(int(object_id)) self.db_session = None self.db_object = None self.db_object_filter = None self.chain_spec = chain_spec - self.object_id = object_id self.connect() self.disconnect() @classmethod - def setup(cls, dsn, debug=False, *args, **kwargs): + def setup(cls, dsn, debug=False, pool_size=0, *args, **kwargs): + """Set up database connection backend. + + :param dsn: Database connection string + :type dsn: str + :param debug: Activate debug output in sql engine + :type debug: bool + :param pool_size: Size of transaction pool + :type pool_size: int + """ if cls.base == None: cls.base = SessionBase - cls.base.connect(dsn, debug=debug, pool_size=kwargs.get('pool_size', 0)) + cls.base.connect(dsn, debug=debug, pool_size=pool_size) def connect(self): - """Loads the state of the syncer session with the given id. + """Loads the state of the syncer session by the given database record id. + + :raises ValueError: Database syncer object with given id does not exist + :rtype: sqlalchemy.orm.session.Session + :returns: Database session object """ if self.db_session == None: self.db_session = SessionBase.create_session() @@ -66,7 +78,7 @@ class SQLBackend(Backend): def disconnect(self): - """Commits state of sync to backend. + """Commits state of sync to backend and frees connection resources. """ if self.db_session == None: return @@ -83,8 +95,8 @@ class SQLBackend(Backend): def get(self): """Get the current state of the syncer cursor. - :returns: Block and block transaction height, respectively :rtype: tuple + :returns: Block height / tx index tuple, and filter flags value """ self.connect() pair = self.db_object.cursor() @@ -94,12 +106,13 @@ class SQLBackend(Backend): def set(self, block_height, tx_height): - """Update the state of the syncer cursor - :param block_height: Block height of cursor - :type block_height: number - :param tx_height: Block transaction height of cursor - :type tx_height: number - :returns: Block and block transaction height, respectively + """Update the state of the syncer cursor. + + :param block_height: New block height + :type block_height: int + :param tx_height: New transaction height in block + :type tx_height: int + :returns: Block height / tx index tuple, and filter flags value :rtype: tuple """ self.connect() @@ -112,7 +125,7 @@ class SQLBackend(Backend): def start(self): """Get the initial state of the syncer cursor. - :returns: Initial block and block transaction height, respectively + :returns: Block height / tx index tuple, and filter flags value :rtype: tuple """ self.connect() @@ -125,8 +138,8 @@ class SQLBackend(Backend): def target(self): """Get the target state (upper bound of sync) of the syncer cursor. - :returns: Target block height - :rtype: number + :returns: Block height and filter flags value + :rtype: tuple """ self.connect() target = self.db_object.target() @@ -139,12 +152,11 @@ class SQLBackend(Backend): def first(chain_spec): """Returns the model object of the most recent syncer in backend. - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec + :param chain_spec: Chain spec of chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec :returns: Last syncer object :rtype: cic_eth.db.models.BlockchainSync """ - #return BlockchainSync.first(str(chain_spec)) object_id = BlockchainSync.first(str(chain_spec)) if object_id == None: return None @@ -156,10 +168,13 @@ class SQLBackend(Backend): def initial(chain_spec, target_block_height, start_block_height=0): """Creates a new syncer session and commit its initial state to backend. - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number + :param chain_spec: Chain spec of chain that syncer is running for + :type chain_spec: cic_registry.chain.ChainSpec + :param target_block_height: Target block height + :type target_block_height: int + :param start_block_height: Start block height + :type start_block_height: int + :raises ValueError: Invalid start/target specification :returns: New syncer object :rtype: cic_eth.db.models.BlockchainSync """ @@ -185,11 +200,12 @@ class SQLBackend(Backend): def resume(chain_spec, block_height): """Retrieves and returns all previously unfinished syncer sessions. + If a previous open-ended syncer is found, a new syncer will be generated to sync from where that syncer left off until the block_height given as argument. - :param chain_spec: Chain spec of chain that syncer is running for. + :param chain_spec: Chain spec of chain that syncer is running for :type chain_spec: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number + :param block_height: Target block height for previous live syncer + :type block_height: int :returns: Syncer objects of unfinished syncs :rtype: list of cic_eth.db.models.BlockchainSync """ @@ -261,8 +277,8 @@ class SQLBackend(Backend): :param chain: Chain spec of chain that syncer is running for. :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number + :param block_height: Start block height + :type block_height: int :returns: "Live" syncer object :rtype: cic_eth.db.models.BlockchainSync """ @@ -283,6 +299,13 @@ class SQLBackend(Backend): def register_filter(self, name): + """Add filter to backend. + + No check is currently implemented to enforce that filters are the same for existing syncers. Care must be taken by the caller to avoid inconsistencies. + + :param name: Name of filter + :type name: str + """ self.connect() if self.db_object_filter == None: self.db_object_filter = BlockchainSyncFilter(self.db_object) @@ -292,6 +315,11 @@ class SQLBackend(Backend): def complete_filter(self, n): + """Sets the filter at the given index as completed. + + :param n: Filter index + :type n: int + """ self.connect() self.db_object_filter.set(n) self.db_session.add(self.db_object_filter) @@ -300,8 +328,8 @@ class SQLBackend(Backend): def reset_filter(self): + """Reset all filter states. + """ self.connect() self.db_object_filter.clear() self.disconnect() - -