# standard imports
import os
import uuid
import shutil
import logging

# local imports
from .base import Backend

logg = logging.getLogger().getChild(__name__)

BACKEND_BASE_DIR = '/var/lib'


def chain_dir_for(chain_spec, base_dir=BACKEND_BASE_DIR):
    """Retrieve file backend directory for the given chain spec.

    :param chain_spec: Chain spec context of backend
    :type chain_spec: chainlib.chain.ChainSpec
    :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
    :type base_dir: str
    :rtype: str
    :returns: Absolute path of chain backend directory
    """
    base_data_dir = os.path.join(base_dir, 'chainsyncer')
    return os.path.join(base_data_dir, str(chain_spec).replace(':', '/'))

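# A minimal usage sketch for chain_dir_for above (the chain spec value is an
# assumption for illustration; any chainlib.chain.ChainSpec whose str() form is
# 'evm:foo:1' would resolve the same way):
#
#   chain_dir_for(chain_spec)
#   # -> '/var/lib/chainsyncer/evm/foo/1'
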
def data_dir_for(chain_spec, object_id, base_dir=BACKEND_BASE_DIR):
    """Retrieve file backend directory for the given syncer.

    :param chain_spec: Chain spec context of backend
    :type chain_spec: chainlib.chain.ChainSpec
    :param object_id: Syncer id
    :type object_id: str
    :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
    :type base_dir: str
    :rtype: str
    :returns: Absolute path of the syncer data directory
    """
    chain_dir = chain_dir_for(chain_spec, base_dir=base_dir)
    return os.path.join(chain_dir, object_id)


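# Continuing the sketch above for data_dir_for (the object id is whatever
# session id was generated or supplied):
#
#   data_dir_for(chain_spec, object_id)
#   # -> '/var/lib/chainsyncer/evm/foo/1/<object_id>'

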
class FileBackend(Backend):
    """Filesystem backend implementation for syncer state.

    FileBackend uses reverse order of filter flags.

    :param chain_spec: Chain spec for the chain that syncer is running for.
    :type chain_spec: chainlib.chain.ChainSpec
    :param object_id: Unique id for the syncer session.
    :type object_id: str
    :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
    :type base_dir: str
    """

    __warned = False

    def __init__(self, chain_spec, object_id, base_dir=BACKEND_BASE_DIR):
        if not FileBackend.__warned:
            logg.warning('file backend for chainsyncer is experimental and not yet guaranteed to handle interrupted filter execution.')
            FileBackend.__warned = True
        super(FileBackend, self).__init__(object_id, flags_reversed=True)
        self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)

        self.object_id = object_id
        self.db_object = None
        self.db_object_filter = None
        self.chain_spec = chain_spec

        self.filter = b'\x00'
        self.filter_names = []

        if self.object_id is not None:
            self.connect()
            self.disconnect()

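    # Typical entry points (sketch; the chain spec construction and block
    # heights are assumptions for illustration, the static methods below are
    # part of this class):
    #
    #   backend = FileBackend.initial(chain_spec, 1000, start_block_height=0)
    #   backend = FileBackend.live(chain_spec, 42)
    #   backends = FileBackend.resume(chain_spec, 42)
    #
    # Direct construction with an existing session id reloads the persisted
    # state from that session's data directory.
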
    @staticmethod
    def create_object(chain_spec, object_id=None, base_dir=BACKEND_BASE_DIR):
        """Creates a new syncer session at the given backend destination.

        :param chain_spec: Chain spec for the chain that syncer is running for.
        :type chain_spec: chainlib.chain.ChainSpec
        :param object_id: Unique id for the syncer session.
        :type object_id: str
        :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
        :type base_dir: str
        :rtype: str
        :returns: Id of the new syncer session
        """
        if object_id is None:
            object_id = str(uuid.uuid4())

        object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)

        if os.path.isdir(object_data_dir):
            raise FileExistsError(object_data_dir)

        os.makedirs(object_data_dir)

        # session id, stored as utf-8 text
        object_id_path = os.path.join(object_data_dir, 'object_id')
        f = open(object_id_path, 'wb')
        f.write(object_id.encode('utf-8'))
        f.close()

        # offset record: 8-byte block height + 8-byte tx index, big-endian
        init_value = 0
        b = init_value.to_bytes(16, byteorder='big')
        offset_path = os.path.join(object_data_dir, 'offset')
        f = open(offset_path, 'wb')
        f.write(b)
        f.close()

        # target record: same 16-byte layout as offset
        target_path = os.path.join(object_data_dir, 'target')
        f = open(target_path, 'wb')
        f.write(b'\x00' * 16)
        f.close()

        # cursor record: same 16-byte layout as offset
        cursor_path = os.path.join(object_data_dir, 'cursor')
        f = open(cursor_path, 'wb')
        f.write(b'\x00' * 16)
        f.close()

        # filter record: 8-byte filter count followed by the filter flag bitfield
        filter_path = os.path.join(object_data_dir, 'filter')
        f = open(filter_path, 'wb')
        f.write(b'\x00' * 9)
        f.close()

        # newline-separated list of registered filter names
        filter_name_path = os.path.join(object_data_dir, 'filter_name')
        f = open(filter_name_path, 'wb')
        f.write(b'')
        f.close()

        return object_id

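    # Resulting on-disk layout (file names are the ones written above; the
    # chain path segment 'evm/foo/1' is an assumption for illustration):
    #
    #   <base_dir>/chainsyncer/evm/foo/1/<object_id>/object_id    session id, utf-8
    #   <base_dir>/chainsyncer/evm/foo/1/<object_id>/offset       start block/tx, 2 x 8 bytes big-endian
    #   <base_dir>/chainsyncer/evm/foo/1/<object_id>/target       target block/tx, 2 x 8 bytes big-endian
    #   <base_dir>/chainsyncer/evm/foo/1/<object_id>/cursor       current block/tx, 2 x 8 bytes big-endian
    #   <base_dir>/chainsyncer/evm/foo/1/<object_id>/filter       8-byte filter count + flag bitfield
    #   <base_dir>/chainsyncer/evm/foo/1/<object_id>/filter_name  newline-separated filter names
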
    def load(self):
        """Loads the state of the syncer at the given location of the instance.

        :raises FileNotFoundError: Invalid data directory
        :raises IsADirectoryError: Invalid data directory
        """
        # offset: start block height and tx index
        offset_path = os.path.join(self.object_data_dir, 'offset')
        f = open(offset_path, 'rb')
        b = f.read(16)
        f.close()
        self.block_height_offset = int.from_bytes(b[:8], byteorder='big')
        self.tx_index_offset = int.from_bytes(b[8:], byteorder='big')

        # target: upper bound of the sync, zero for open-ended syncers
        target_path = os.path.join(self.object_data_dir, 'target')
        f = open(target_path, 'rb')
        b = f.read(16)
        f.close()
        self.block_height_target = int.from_bytes(b[:8], byteorder='big')
        self.tx_index_target = int.from_bytes(b[8:], byteorder='big')

        # cursor: current sync position
        cursor_path = os.path.join(self.object_data_dir, 'cursor')
        f = open(cursor_path, 'rb')
        b = f.read(16)
        f.close()
        self.block_height_cursor = int.from_bytes(b[:8], byteorder='big')
        self.tx_index_cursor = int.from_bytes(b[8:], byteorder='big')

        # filter: 8-byte filter count, then one bitfield byte per 8 filters
        filter_path = os.path.join(self.object_data_dir, 'filter')
        f = open(filter_path, 'rb')
        b = f.read(8)
        self.filter_count = int.from_bytes(b, byteorder='big')
        filter_count_bytes = (self.filter_count - 1) // 8 + 1
        if filter_count_bytes > 0:
            self.filter = f.read(filter_count_bytes)
        f.close()

        # filter_name: newline-separated names of registered filters
        filter_name_path = filter_path + '_name'
        f = open(filter_name_path, 'r')
        while True:
            s = f.readline().rstrip()
            if len(s) == 0:
                break
            self.filter_names.append(s)
        f.close()

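    # Record decoding sketch for the 16-byte offset/target/cursor files
    # (values are made up for illustration):
    #
    #   b = (42).to_bytes(8, 'big') + (3).to_bytes(8, 'big')
    #   int.from_bytes(b[:8], 'big')   # -> 42 (block height)
    #   int.from_bytes(b[8:], 'big')   # -> 3  (tx index)
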
    def connect(self):
        """Proxy for chainsyncer.backend.file.FileBackend.load that performs a basic sanity check of the instance's backend location.

        :raises ValueError: Sanity check failed
        """
        object_path = os.path.join(self.object_data_dir, 'object_id')
        f = open(object_path, 'r')
        object_id = f.read()
        f.close()
        if object_id != self.object_id:
            raise ValueError('data corruption in store for id {}'.format(object_id))

        self.load()

    def disconnect(self):
        """FileBackend maintains no actual connection, so this is a noop.
        """
        pass

    def purge(self):
        """Remove syncer state from backend.
        """
        shutil.rmtree(self.object_data_dir)

    def get(self):
        """Get the current state of the syncer cursor.

        :rtype: tuple
        :returns: Block height / tx index tuple, and filter flags value
        """
        logg.debug('filter {}'.format(self.filter.hex()))
        return ((self.block_height_cursor, self.tx_index_cursor), self.get_flags())

    def get_flags(self):
        """Get canonical representation format of flags.

        :rtype: int
        :returns: Filter flag bitfield value
        """
        return int.from_bytes(self.filter, 'little')

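    # Flag value sketch: flags are read little-endian from the stored bitfield,
    # so with filters 0 and 1 completed the single bitfield byte is 0xc0 and
    # get_flags() returns 192.
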
    def set(self, block_height, tx_index):
        """Update the state of the syncer cursor.

        :param block_height: New block height
        :type block_height: int
        :param tx_index: New transaction index in block
        :type tx_index: int
        :returns: Block height / tx index tuple, and filter flags value
        :rtype: tuple
        """
        self.__set(block_height, tx_index, 'cursor')

        # cursor_path = os.path.join(self.object_data_dir, 'filter')
        # f = open(cursor_path, 'r+b')
        # f.seek(8)
        # l = len(self.filter)
        # c = 0
        # while c < l:
        #     c += f.write(self.filter[c:])
        # f.close()

        return ((self.block_height_cursor, self.tx_index_cursor), self.get_flags())

    def __set(self, block_height, tx_index, category):
        # write the 16-byte record (8-byte block height + 8-byte tx index,
        # big-endian) for the given category file, and mirror it on the instance
        cursor_path = os.path.join(self.object_data_dir, category)

        block_height_bytes = block_height.to_bytes(8, byteorder='big')
        tx_index_bytes = tx_index.to_bytes(8, byteorder='big')

        f = open(cursor_path, 'wb')
        f.write(block_height_bytes)
        f.write(tx_index_bytes)
        f.close()

        setattr(self, 'block_height_' + category, block_height)
        setattr(self, 'tx_index_' + category, tx_index)

    @staticmethod
    def initial(chain_spec, target_block_height, start_block_height=0, base_dir=BACKEND_BASE_DIR):
        """Creates a new syncer session and commits its initial state to the backend.

        :param chain_spec: Chain spec of chain that syncer is running for.
        :type chain_spec: chainlib.chain.ChainSpec
        :param target_block_height: Target block height
        :type target_block_height: int
        :param start_block_height: Start block height
        :type start_block_height: int
        :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
        :type base_dir: str
        :raises ValueError: Invalid start/target specification
        :returns: New syncer object
        :rtype: chainsyncer.backend.file.FileBackend
        """
        if start_block_height >= target_block_height:
            raise ValueError('start block height must be lower than target block height')

        uu = FileBackend.create_object(chain_spec, base_dir=base_dir)

        o = FileBackend(chain_spec, uu, base_dir=base_dir)
        o.__set(target_block_height, 0, 'target')
        o.__set(start_block_height, 0, 'offset')
        o.__set(start_block_height, 0, 'cursor')

        return o

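    # Bounded session sketch (block heights are made up for illustration):
    #
    #   backend = FileBackend.initial(chain_spec, 1000, start_block_height=100)
    #   backend.start()   # -> ((100, 0), 0)
    #   backend.target()  # -> (1000, 0)
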
    @staticmethod
    def live(chain_spec, block_height, base_dir=BACKEND_BASE_DIR):
        """Creates a new open-ended syncer session starting at the given block height.

        :param chain_spec: Chain spec of chain that syncer is running for.
        :type chain_spec: chainlib.chain.ChainSpec
        :param block_height: Start block height
        :type block_height: int
        :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
        :type base_dir: str
        :returns: "Live" syncer object
        :rtype: chainsyncer.backend.file.FileBackend
        """
        uu = FileBackend.create_object(chain_spec, base_dir=base_dir)
        o = FileBackend(chain_spec, uu, base_dir=base_dir)
        o.__set(block_height, 0, 'offset')
        o.__set(block_height, 0, 'cursor')

        return o

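    # Open-ended session sketch (block height is made up for illustration); the
    # target record is left at zero, marking the syncer as unbounded:
    #
    #   backend = FileBackend.live(chain_spec, 42)
    #   backend.get()     # -> ((42, 0), 0)
    #   backend.target()  # -> (0, 0)
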
    def target(self):
        """Get the target state (upper bound of sync) of the syncer cursor.

        :returns: Block height and filter flags value
        :rtype: tuple
        """
        return (self.block_height_target, 0,)

    def start(self):
        """Get the initial state of the syncer cursor.

        :returns: Block height / tx index tuple, and filter flags value
        :rtype: tuple
        """
        return ((self.block_height_offset, self.tx_index_offset), 0,)

    @staticmethod
    def __sorted_entries(chain_spec, base_dir=BACKEND_BASE_DIR):
        # collect one FileBackend per session directory found under the chain
        # directory, ordered by block height offset
        chain_dir = chain_dir_for(chain_spec, base_dir=base_dir)

        entries = {}

        for v in os.listdir(chain_dir):
            d = os.path.realpath(os.path.join(chain_dir, v))
            f = open(os.path.join(d, 'object_id'))
            object_id = f.read()
            f.close()

            logg.debug('found syncer entry {} in {}'.format(object_id, d))

            o = FileBackend(chain_spec, object_id, base_dir=base_dir)

            entries[o.block_height_offset] = o

        sorted_entries = []
        for k in sorted(entries):
            sorted_entries.append(entries[k])

        return sorted_entries

    @staticmethod
    def resume(chain_spec, block_height, base_dir=BACKEND_BASE_DIR):
        """Retrieves and returns all previously unfinished syncer sessions.

        If a previous open-ended syncer is found, a new syncer will be generated to sync from where that syncer left off until the block_height given as argument.

        :param chain_spec: Chain spec of chain that syncer is running for
        :type chain_spec: chainlib.chain.ChainSpec
        :param block_height: Target block height for previous live syncer
        :type block_height: int
        :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
        :type base_dir: str
        :raises FileNotFoundError: Invalid backend location
        :returns: Syncer objects of unfinished syncs
        :rtype: list of chainsyncer.backend.file.FileBackend
        """
        try:
            return FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
        except FileNotFoundError:
            return []

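    # Resumption sketch: after a restart, pick up every stored session for the
    # chain (the block height argument shown is a made-up current head):
    #
    #   for backend in FileBackend.resume(chain_spec, 1234):
    #       (pair, flags) = backend.get()
    #       # continue syncing from pair = (block height, tx index)
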
    @staticmethod
    def first(chain_spec, base_dir=BACKEND_BASE_DIR):
        """Returns the model object of the most recent syncer in backend.

        :param chain_spec: Chain spec of chain that syncer is running for.
        :type chain_spec: chainlib.chain.ChainSpec
        :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
        :type base_dir: str
        :returns: Last syncer object
        :rtype: chainsyncer.backend.file.FileBackend
        """
        entries = []
        try:
            entries = FileBackend.__sorted_entries(chain_spec, base_dir=base_dir)
        except FileNotFoundError:
            return entries
        return entries[-1]

    # n is zero-index of bit field
    def begin_filter(self, n, base_dir=BACKEND_BASE_DIR):
        """Noop in FileBackend; only filter completion is recorded, by complete_filter.

        :param n: Filter index, starting at zero
        :type n: int
        """
        pass

    # n is zero-index of bit field
    def complete_filter(self, n, base_dir=BACKEND_BASE_DIR):
        """Sets the filter at the given index as completed.

        :param n: Filter index, starting at zero
        :type n: int
        :raises IndexError: Index is outside filter count range
        """
        if self.filter_count <= n:
            raise IndexError('index {} out of range for filter size {}'.format(n, self.filter_count))

        byte_pos = n // 8
        bit_pos = n % 8

        # set the bit for filter n; the most significant bit of the first byte
        # is filter 0 (filter flags are stored in reverse order)
        byts = bytearray(self.filter)
        b = (0x80 >> bit_pos)
        b |= self.filter[byte_pos]
        logg.debug('set filter flag {} byte {} value {:02x}'.format(n, byte_pos, b))
        byts[byte_pos] = b
        self.filter = byts

        # persist the bitfield after the 8-byte filter count header
        filter_path = os.path.join(self.object_data_dir, 'filter')
        f = open(filter_path, 'r+b')
        f.seek(8)
        f.write(self.filter)
        f.close()

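    # Bit position sketch for complete_filter (the index is made up): filter 10
    # lands in byte_pos 1, bit_pos 2, so the mask 0x80 >> 2 == 0x20 is OR-ed
    # into the second bitfield byte.
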
    def register_filter(self, name):
        """Add filter to backend.

        Overwrites the record on disk if members of the struct have been changed manually.

        :param name: Name of filter
        :type name: str
        """
        filter_path = os.path.join(self.object_data_dir, 'filter')
        # extend the flag bitfield by one byte as more filters are added
        if (self.filter_count + 1) % 8 == 0:
            self.filter += b'\x00'
            f = open(filter_path, 'a+b')
            f.write(b'\x00')
            f.close()

        # append the filter name to the name registry
        filter_name_path = filter_path + '_name'
        f = open(filter_name_path, 'a')
        f.write(name + '\n')
        f.close()

        # update the 8-byte filter count header
        self.filter_count += 1
        f = open(filter_path, 'r+b')
        b = self.filter_count.to_bytes(8, byteorder='big')
        f.write(b)
        f.close()

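    # Registration sketch (filter names here are made up): each call appends
    # one line to the filter_name file and bumps the 8-byte count in the
    # filter file,
    #
    #   backend.register_filter('token_transfer')
    #   backend.register_filter('custodial_account')
    #
    # leaving filter_name with two lines and filter_count == 2.
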
    def reset_filter(self):
        """Reset all filter states.
        """
        self.filter = b'\x00' * len(self.filter)
        cursor_path = os.path.join(self.object_data_dir, 'filter')
        f = open(cursor_path, 'r+b')
        f.seek(8)
        l = len(self.filter)
        c = 0
        while c < l:
            c += f.write(self.filter[c:])
        f.close()