diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..da28803 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +*.pyc +*.o diff --git a/cic_syncer/backend.py b/cic_syncer/backend.py new file mode 100644 index 0000000..af58be9 --- /dev/null +++ b/cic_syncer/backend.py @@ -0,0 +1,196 @@ +# standard imports +import logging + +# local imports +from cic_eth.db.models.sync import BlockchainSync +from cic_eth.db.models.base import SessionBase + +logg = logging.getLogger() + + +class SyncerBackend: + """Interface to block and transaction sync state. + + :param chain_spec: Chain spec for the chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec + :param object_id: Unique id for the syncer session. + :type object_id: number + """ + def __init__(self, chain_spec, object_id): + self.db_session = None + self.db_object = None + self.chain_spec = chain_spec + self.object_id = object_id + self.connect() + self.disconnect() + + + def connect(self): + """Loads the state of the syncer session with the given id. + """ + self.db_session = SessionBase.create_session() + q = self.db_session.query(BlockchainSync) + q = q.filter(BlockchainSync.id==self.object_id) + self.db_object = q.first() + if self.db_object == None: + raise ValueError('sync entry with id {} not found'.format(self.object_id)) + + + def disconnect(self): + """Commits state of sync to backend. + """ + self.db_session.add(self.db_object) + self.db_session.commit() + self.db_session.close() + + + def chain(self): + """Returns chain spec for syncer + + :returns: Chain spec + :rtype chain_spec: cic_registry.chain.ChainSpec + """ + return self.chain_spec + + + def get(self): + """Get the current state of the syncer cursor. + + :returns: Block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.cursor() + self.disconnect() + return pair + + + def set(self, block_height, tx_height): + """Update the state of the syncer cursor + :param block_height: Block height of cursor + :type block_height: number + :param tx_height: Block transaction height of cursor + :type tx_height: number + :returns: Block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.set(block_height, tx_height) + self.disconnect() + return pair + + + def start(self): + """Get the initial state of the syncer cursor. + + :returns: Initial block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.start() + self.disconnect() + return pair + + + def target(self): + """Get the target state (upper bound of sync) of the syncer cursor. + + :returns: Target block height + :rtype: number + """ + self.connect() + target = self.db_object.target() + self.disconnect() + return target + + + @staticmethod + def first(chain): + """Returns the model object of the most recent syncer in backend. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :returns: Last syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + return BlockchainSync.first(chain) + + + @staticmethod + def initial(chain, block_height): + """Creates a new syncer session and commit its initial state to backend. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: New syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + object_id = None + session = SessionBase.create_session() + o = BlockchainSync(chain, 0, 0, block_height) + session.add(o) + session.commit() + object_id = o.id + session.close() + + return SyncerBackend(chain, object_id) + + + @staticmethod + def resume(chain, block_height): + """Retrieves and returns all previously unfinished syncer sessions. + + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: Syncer objects of unfinished syncs + :rtype: list of cic_eth.db.models.BlockchainSync + """ + syncers = [] + + session = SessionBase.create_session() + + object_id = None + + for object_id in BlockchainSync.get_unsynced(session=session): + logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) + syncers.append(SyncerBackend(chain, object_id)) + + (block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session) + if block_height != block_resume: + o = BlockchainSync(chain, block_resume, tx_resume, block_height) + session.add(o) + session.commit() + object_id = o.id + syncers.append(SyncerBackend(chain, object_id)) + logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height)) + + session.close() + + return syncers + + + @staticmethod + def live(chain, block_height): + """Creates a new open-ended syncer session starting at the given block height. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: "Live" syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + object_id = None + session = SessionBase.create_session() + o = BlockchainSync(chain, block_height, 0, None) + session.add(o) + session.commit() + object_id = o.id + session.close() + + return SyncerBackend(chain, object_id) diff --git a/cic_syncer/client/__init__.py b/cic_syncer/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cic_syncer/client/evm/__pycache__/response.cpython-38.pyc b/cic_syncer/client/evm/__pycache__/response.cpython-38.pyc new file mode 100644 index 0000000..7a8de2b Binary files /dev/null and b/cic_syncer/client/evm/__pycache__/response.cpython-38.pyc differ diff --git a/cic_syncer/client/evm/__pycache__/websocket.cpython-38.pyc b/cic_syncer/client/evm/__pycache__/websocket.cpython-38.pyc new file mode 100644 index 0000000..faac392 Binary files /dev/null and b/cic_syncer/client/evm/__pycache__/websocket.cpython-38.pyc differ diff --git a/cic_syncer/client/evm/response.py b/cic_syncer/client/evm/response.py new file mode 100644 index 0000000..7e87340 --- /dev/null +++ b/cic_syncer/client/evm/response.py @@ -0,0 +1,21 @@ +from cic_syncer.client import translate + +translations = { + 'block_number': 'hex_to_int', + } + + +class EVMResponse: + + def __init__(self, item, response_object): + self.response_object = response_object + self.item = item + self.fn = getattr(translate, translations[self.item]) + + + def get_error(self): + return self.response_object.get('error') + + + def get_result(self): + return self.fn(self.response_object.get('result')) diff --git a/cic_syncer/client/evm/websocket.py b/cic_syncer/client/evm/websocket.py new file mode 100644 index 0000000..11d389d --- /dev/null +++ b/cic_syncer/client/evm/websocket.py @@ -0,0 +1,37 @@ +import uuid +import json + +import websocket + +from .response import EVMResponse +from cic_syncer.error import RequestError + + + +class EVMWebsocketClient: + + def __init__(self, url): + self.url = url + self.conn = websocket.create_connection(url) + + + def __del__(self): + self.conn.close() + + + def block_number(self): + req_id = str(uuid.uuid4()) + req = { + 'jsonrpc': '2.0', + 'method': 'eth_blockNumber', + 'id': str(req_id), + 'params': [], + } + self.conn.send(json.dumps(req)) + r = self.conn.recv() + res = EVMResponse('block_number', json.loads(r)) + err = res.get_error() + if err != None: + raise RequestError(err) + + return res.get_result() diff --git a/cic_syncer/client/translate.py b/cic_syncer/client/translate.py new file mode 100644 index 0000000..ad35fb4 --- /dev/null +++ b/cic_syncer/client/translate.py @@ -0,0 +1,21 @@ +import re + +re_hex = r'^[0-9a-fA-Z]+$' +def is_hex(hx): + m = re.match(re_hex, hx) + if m == None: + raise ValueError('not valid hex {}'.format(hx)) + + return hx + + +def strip_0x(hx): + if len(hx) >= 2 and hx[:2] == '0x': + hx = hx[2:] + return is_hex(hx) + + +def hex_to_int(hx, endianness='big'): + hx = strip_0x(hx) + b = bytes.fromhex(hx) + return int.from_bytes(b, endianness) diff --git a/cic_syncer/db/__init__.py b/cic_syncer/db/__init__.py new file mode 100644 index 0000000..6a33fef --- /dev/null +++ b/cic_syncer/db/__init__.py @@ -0,0 +1,53 @@ +# standard imports +import os +import logging + +# local imports +from cic_syncer.db.models.base import SessionBase + +logg = logging.getLogger() + + +def dsn_from_config(config): + """Generate a dsn string from the provided config dict. + + The config dict must include all well-known database connection parameters, and must implement the method "get(key)" to retrieve them. Any missing parameters will be be rendered as the literal string "None" + + :param config: Configuration object + :type config: Varies + :returns: dsn string + :rtype: str + """ + scheme = config.get('DATABASE_ENGINE') + if config.get('DATABASE_DRIVER') != None: + scheme += '+{}'.format(config.get('DATABASE_DRIVER')) + + dsn = '' + dsn_out = '' + if config.get('DATABASE_ENGINE') == 'sqlite': + dsn = '{}:///{}'.format( + scheme, + config.get('DATABASE_NAME'), + ) + dsn_out = dsn + + else: + dsn = '{}://{}:{}@{}:{}/{}'.format( + scheme, + config.get('DATABASE_USER'), + config.get('DATABASE_PASSWORD'), + config.get('DATABASE_HOST'), + config.get('DATABASE_PORT'), + config.get('DATABASE_NAME'), + ) + dsn_out = '{}://{}:{}@{}:{}/{}'.format( + scheme, + config.get('DATABASE_USER'), + '***', + config.get('DATABASE_HOST'), + config.get('DATABASE_PORT'), + config.get('DATABASE_NAME'), + ) + logg.debug('parsed dsn from config: {}'.format(dsn_out)) + return dsn + diff --git a/cic_syncer/db/models/base.py b/cic_syncer/db/models/base.py new file mode 100644 index 0000000..153906a --- /dev/null +++ b/cic_syncer/db/models/base.py @@ -0,0 +1,73 @@ +# third-party imports +from sqlalchemy import Column, Integer +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +Model = declarative_base(name='Model') + + +class SessionBase(Model): + """The base object for all SQLAlchemy enabled models. All other models must extend this. + """ + __abstract__ = True + + id = Column(Integer, primary_key=True) + + engine = None + """Database connection engine of the running aplication""" + sessionmaker = None + """Factory object responsible for creating sessions from the connection pool""" + transactional = True + """Whether the database backend supports query transactions. Should be explicitly set by initialization code""" + poolable = True + """Whether the database backend supports query transactions. Should be explicitly set by initialization code""" + + + @staticmethod + def create_session(): + """Creates a new database session. + """ + return SessionBase.sessionmaker() + + + @staticmethod + def _set_engine(engine): + """Sets the database engine static property + """ + SessionBase.engine = engine + SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine) + + + @staticmethod + def connect(dsn, debug=False): + """Create new database connection engine and connect to database backend. + + :param dsn: DSN string defining connection. + :type dsn: str + """ + e = None + if SessionBase.poolable: + e = create_engine( + dsn, + max_overflow=50, + pool_pre_ping=True, + pool_size=20, + pool_recycle=10, + echo=debug, + ) + else: + e = create_engine( + dsn, + echo=debug, + ) + + SessionBase._set_engine(e) + + + @staticmethod + def disconnect(): + """Disconnect from database and free resources. + """ + SessionBase.engine.dispose() + SessionBase.engine = None diff --git a/cic_syncer/db/models/sync.py b/cic_syncer/db/models/sync.py new file mode 100644 index 0000000..c920a89 --- /dev/null +++ b/cic_syncer/db/models/sync.py @@ -0,0 +1,168 @@ +# standard imports +import datetime + +# third-party imports +from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean +from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method + +# local imports +from .base import SessionBase + + +class BlockchainSync(SessionBase): + """Syncer control backend. + + :param chain: Chain spec string representation + :type chain: str + :param block_start: Block number to start sync from + :type block_start: number + :param tx_start: Block transaction number to start sync from + :type tx_start: number + :param block_target: Block number to sync until, inclusive + :type block_target: number + """ + __tablename__ = 'blockchain_sync' + + blockchain = Column(String) + block_start = Column(Integer) + tx_start = Column(Integer) + block_cursor = Column(Integer) + tx_cursor = Column(Integer) + block_target = Column(Integer) + date_created = Column(DateTime, default=datetime.datetime.utcnow) + date_updated = Column(DateTime) + + + @staticmethod + def first(chain, session=None): + """Check if a sync session for the specified chain already exists. + + :param chain: Chain spec string representation + :type chain: str + :param session: Session to use. If not specified, a separate session will be created for this method only. + :type session: SqlAlchemy Session + :returns: True if sync record found + :rtype: bool + """ + local_session = False + if session == None: + session = SessionBase.create_session() + local_session = True + q = session.query(BlockchainSync.id) + q = q.filter(BlockchainSync.blockchain==chain) + o = q.first() + if local_session: + session.close() + return o == None + + + @staticmethod + def get_last_live_height(current, session=None): + """Get the most recent open-ended ("live") syncer record. + + :param current: Current block number + :type current: number + :param session: Session to use. If not specified, a separate session will be created for this method only. + :type session: SqlAlchemy Session + :returns: Block and transaction number, respectively + :rtype: tuple + """ + local_session = False + if session == None: + session = SessionBase.create_session() + local_session = True + q = session.query(BlockchainSync) + q = q.filter(BlockchainSync.block_target==None) + q = q.order_by(BlockchainSync.date_created.desc()) + o = q.first() + if local_session: + session.close() + + if o == None: + return (0, 0) + + return (o.block_cursor, o.tx_cursor) + + + @staticmethod + def get_unsynced(session=None): + """Get previous bounded sync sessions that did not complete. + + :param session: Session to use. If not specified, a separate session will be created for this method only. + :type session: SqlAlchemy Session + :returns: Syncer database ids + :rtype: tuple, where first element is id + """ + unsynced = [] + local_session = False + if session == None: + session = SessionBase.create_session() + local_session = True + q = session.query(BlockchainSync.id) + q = q.filter(BlockchainSync.block_target!=None) + q = q.filter(BlockchainSync.block_cursor