Rename module

This commit is contained in:
nolash
2021-02-11 09:02:17 +01:00
parent 88df1418b0
commit 00725ebc7c
14 changed files with 52 additions and 74 deletions

216
chainsyncer/backend.py Normal file
View File

@@ -0,0 +1,216 @@
# standard imports
import logging
import uuid
# local imports
from chainsyncer.db.models.sync import BlockchainSync
from chainsyncer.db.models.base import SessionBase
logg = logging.getLogger()
class SyncerBackend:
"""Interface to block and transaction sync state.
:param chain_spec: Chain spec for the chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:param object_id: Unique id for the syncer session.
:type object_id: number
"""
def __init__(self, chain_spec, object_id):
self.db_session = None
self.db_object = None
self.chain_spec = chain_spec
self.object_id = object_id
self.connect()
self.disconnect()
def connect(self):
"""Loads the state of the syncer session with the given id.
"""
self.db_session = SessionBase.create_session()
q = self.db_session.query(BlockchainSync)
q = q.filter(BlockchainSync.id==self.object_id)
self.db_object = q.first()
if self.db_object == None:
raise ValueError('sync entry with id {} not found'.format(self.object_id))
def disconnect(self):
"""Commits state of sync to backend.
"""
self.db_session.add(self.db_object)
self.db_session.commit()
self.db_session.close()
def chain(self):
"""Returns chain spec for syncer
:returns: Chain spec
:rtype chain_spec: cic_registry.chain.ChainSpec
"""
return self.chain_spec
def get(self):
"""Get the current state of the syncer cursor.
:returns: Block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.cursor()
self.disconnect()
return pair
def set(self, block_height, tx_height):
"""Update the state of the syncer cursor
:param block_height: Block height of cursor
:type block_height: number
:param tx_height: Block transaction height of cursor
:type tx_height: number
:returns: Block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.set(block_height, tx_height)
self.disconnect()
return pair
def start(self):
"""Get the initial state of the syncer cursor.
:returns: Initial block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.start()
self.disconnect()
return pair
def target(self):
"""Get the target state (upper bound of sync) of the syncer cursor.
:returns: Target block height
:rtype: number
"""
self.connect()
target = self.db_object.target()
self.disconnect()
return target
@staticmethod
def first(chain):
"""Returns the model object of the most recent syncer in backend.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:returns: Last syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
return BlockchainSync.first(chain)
@staticmethod
def initial(chain, block_height):
"""Creates a new syncer session and commit its initial state to backend.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: New syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
object_id = None
session = SessionBase.create_session()
o = BlockchainSync(chain, 0, 0, block_height)
session.add(o)
session.commit()
object_id = o.id
session.close()
return SyncerBackend(chain, object_id)
@staticmethod
def resume(chain, block_height):
"""Retrieves and returns all previously unfinished syncer sessions.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: Syncer objects of unfinished syncs
:rtype: list of cic_eth.db.models.BlockchainSync
"""
syncers = []
session = SessionBase.create_session()
object_id = None
for object_id in BlockchainSync.get_unsynced(session=session):
logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id))
syncers.append(SyncerBackend(chain, object_id))
(block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session)
if block_height != block_resume:
o = BlockchainSync(chain, block_resume, tx_resume, block_height)
session.add(o)
session.commit()
object_id = o.id
syncers.append(SyncerBackend(chain, object_id))
logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height))
session.close()
return syncers
@staticmethod
def live(chain, block_height):
"""Creates a new open-ended syncer session starting at the given block height.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: "Live" syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
object_id = None
session = SessionBase.create_session()
o = BlockchainSync(chain, block_height, 0, None)
session.add(o)
session.commit()
object_id = o.id
session.close()
return SyncerBackend(chain, object_id)
class MemBackend:
def __init__(self, chain_spec, object_id):
self.object_id = object_id
self.chain_spec = chain_spec
self.block_height = 0
self.tx_height = 0
def set(self, block_height, tx_height):
logg.debug('stateless backend received {} {}'.format(block_height, tx_height))
self.block_height = block_height
self.tx_height = tx_height
def get(self):
return (self.block_height, self.tx_height)

View File

View File

@@ -0,0 +1,16 @@
class Block:
def __init__(self, hsh, obj):
self.hash = hsh
self.obj = obj
def tx(self, idx):
return NotImplementedError
def number(self):
return NotImplementedError

View File

@@ -0,0 +1,52 @@
import json
from chainsyncer.client import translate
from chainsyncer.client.block import Block
from chainsyncer.client.tx import Tx
translations = {
'block_number': translate.hex_to_int,
'get_block': json.dumps,
'number': translate.hex_to_int,
}
class EVMResponse:
def __init__(self, item, response_object):
self.response_object = response_object
self.item = item
self.fn = translations[self.item]
def get_error(self):
return self.response_object.get('error')
def get_result(self):
r = self.fn(self.response_object.get('result'))
if r == 'null':
return None
return r
class EVMTx(Tx):
def __init__(self, block, tx_number, obj):
super(EVMTx, self).__init__(block, tx_number, obj)
class EVMBlock(Block):
def tx(self, idx):
o = self.obj['transactions'][idx]
return Tx(self, idx, o)
def number(self):
return translate.hex_to_int(self.obj['number'])
def __str__(self):
return str('block {} {}'.format(self.number(), self.hash))

View File

@@ -0,0 +1,90 @@
# standard imports
import logging
import uuid
import json
# third-party imports
import websocket
from hexathon import add_0x
# local imports
from .response import EVMResponse
from chainsyncer.error import RequestError
from chainsyncer.client.evm.response import EVMBlock
logg = logging.getLogger()
class EVMWebsocketClient:
def __init__(self, url):
self.url = url
self.conn = websocket.create_connection(url)
def __del__(self):
self.conn.close()
def block_number(self):
req_id = str(uuid.uuid4())
req = {
'jsonrpc': '2.0',
'method': 'eth_blockNumber',
'id': str(req_id),
'params': [],
}
self.conn.send(json.dumps(req))
r = self.conn.recv()
res = EVMResponse('block_number', json.loads(r))
err = res.get_error()
if err != None:
raise RequestError(err)
return res.get_result()
def block_by_integer(self, n):
req_id = str(uuid.uuid4())
nhx = '0x' + n.to_bytes(8, 'big').hex()
req = {
'jsonrpc': '2.0',
'method': 'eth_getBlockByNumber',
'id': str(req_id),
'params': [nhx, False],
}
self.conn.send(json.dumps(req))
r = self.conn.recv()
res = EVMResponse('get_block', json.loads(r))
err = res.get_error()
if err != None:
raise RequestError(err)
j = res.get_result()
if j == None:
return None
o = json.loads(j)
return EVMBlock(o['hash'], o)
def block_by_hash(self, hx_in):
req_id = str(uuid.uuid4())
hx = add_0x(hx_in)
req ={
'jsonrpc': '2.0',
'method': 'eth_getBlockByHash',
'id': str(req_id),
'params': [hx, False],
}
self.conn.send(json.dumps(req))
r = self.conn.recv()
res = EVMResponse('get_block', json.loads(r))
err = res.get_error()
if err != None:
raise RequestError(err)
j = res.get_result()
if j == None:
return None
o = json.loads(j)
return EVMBlock(o['hash'], o)

View File

@@ -0,0 +1,10 @@
# third-party imports
from hexathon import strip_0x
def hex_to_int(hx, endianness='big'):
hx = strip_0x(hx)
if len(hx) % 2 == 1:
hx = '0' + hx
b = bytes.fromhex(hx)
return int.from_bytes(b, endianness)

6
chainsyncer/client/tx.py Normal file
View File

@@ -0,0 +1,6 @@
class Tx:
def __init__(self, block, tx_number, obj):
self.block = block
self.tx_number = tx_number
self.obj = obj

View File

@@ -0,0 +1,53 @@
# standard imports
import os
import logging
# local imports
from chainsyncer.db.models.base import SessionBase
logg = logging.getLogger()
def dsn_from_config(config):
"""Generate a dsn string from the provided config dict.
The config dict must include all well-known database connection parameters, and must implement the method "get(key)" to retrieve them. Any missing parameters will be be rendered as the literal string "None"
:param config: Configuration object
:type config: Varies
:returns: dsn string
:rtype: str
"""
scheme = config.get('DATABASE_ENGINE')
if config.get('DATABASE_DRIVER') != None:
scheme += '+{}'.format(config.get('DATABASE_DRIVER'))
dsn = ''
dsn_out = ''
if config.get('DATABASE_ENGINE') == 'sqlite':
dsn = '{}:///{}'.format(
scheme,
config.get('DATABASE_NAME'),
)
dsn_out = dsn
else:
dsn = '{}://{}:{}@{}:{}/{}'.format(
scheme,
config.get('DATABASE_USER'),
config.get('DATABASE_PASSWORD'),
config.get('DATABASE_HOST'),
config.get('DATABASE_PORT'),
config.get('DATABASE_NAME'),
)
dsn_out = '{}://{}:{}@{}:{}/{}'.format(
scheme,
config.get('DATABASE_USER'),
'***',
config.get('DATABASE_HOST'),
config.get('DATABASE_PORT'),
config.get('DATABASE_NAME'),
)
logg.debug('parsed dsn from config: {}'.format(dsn_out))
return dsn

View File

@@ -0,0 +1,73 @@
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
Model = declarative_base(name='Model')
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
"""
__abstract__ = True
id = Column(Integer, primary_key=True)
engine = None
"""Database connection engine of the running aplication"""
sessionmaker = None
"""Factory object responsible for creating sessions from the connection pool"""
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
@staticmethod
def create_session():
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def connect(dsn, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
:type dsn: str
"""
e = None
if SessionBase.poolable:
e = create_engine(
dsn,
max_overflow=50,
pool_pre_ping=True,
pool_size=20,
pool_recycle=10,
echo=debug,
)
else:
e = create_engine(
dsn,
echo=debug,
)
SessionBase._set_engine(e)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
"""
SessionBase.engine.dispose()
SessionBase.engine = None

View File

@@ -0,0 +1,168 @@
# standard imports
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
class BlockchainSync(SessionBase):
"""Syncer control backend.
:param chain: Chain spec string representation
:type chain: str
:param block_start: Block number to start sync from
:type block_start: number
:param tx_start: Block transaction number to start sync from
:type tx_start: number
:param block_target: Block number to sync until, inclusive
:type block_target: number
"""
__tablename__ = 'chain_sync'
blockchain = Column(String)
block_start = Column(Integer)
tx_start = Column(Integer)
block_cursor = Column(Integer)
tx_cursor = Column(Integer)
block_target = Column(Integer)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
date_updated = Column(DateTime)
@staticmethod
def first(chain, session=None):
"""Check if a sync session for the specified chain already exists.
:param chain: Chain spec string representation
:type chain: str
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:returns: True if sync record found
:rtype: bool
"""
local_session = False
if session == None:
session = SessionBase.create_session()
local_session = True
q = session.query(BlockchainSync.id)
q = q.filter(BlockchainSync.blockchain==chain)
o = q.first()
if local_session:
session.close()
return o == None
@staticmethod
def get_last_live_height(current, session=None):
"""Get the most recent open-ended ("live") syncer record.
:param current: Current block number
:type current: number
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:returns: Block and transaction number, respectively
:rtype: tuple
"""
local_session = False
if session == None:
session = SessionBase.create_session()
local_session = True
q = session.query(BlockchainSync)
q = q.filter(BlockchainSync.block_target==None)
q = q.order_by(BlockchainSync.date_created.desc())
o = q.first()
if local_session:
session.close()
if o == None:
return (0, 0)
return (o.block_cursor, o.tx_cursor)
@staticmethod
def get_unsynced(session=None):
"""Get previous bounded sync sessions that did not complete.
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:returns: Syncer database ids
:rtype: tuple, where first element is id
"""
unsynced = []
local_session = False
if session == None:
session = SessionBase.create_session()
local_session = True
q = session.query(BlockchainSync.id)
q = q.filter(BlockchainSync.block_target!=None)
q = q.filter(BlockchainSync.block_cursor<BlockchainSync.block_target)
q = q.order_by(BlockchainSync.date_created.asc())
for u in q.all():
unsynced.append(u[0])
if local_session:
session.close()
return unsynced
def set(self, block_height, tx_height):
"""Set the height of the syncer instance.
Only manipulates object, does not transaction or commit to backend.
:param block_height: Block number
:type block_height: number
:param tx_height: Block transaction number
:type tx_height: number
"""
self.block_cursor = block_height
self.tx_cursor = tx_height
def cursor(self):
"""Get current state of cursor from cached instance.
:returns: Block and transaction height, respectively
:rtype: tuple
"""
return (self.block_cursor, self.tx_cursor)
def start(self):
"""Get sync block start position from cached instance.
:returns: Block and transaction height, respectively
:rtype: tuple
"""
return (self.block_start, self.tx_start)
def target(self):
"""Get sync block upper bound from cached instance.
:returns: Block number
:rtype: number, or None if sync is open-ended
"""
return self.block_target
def chain(self):
"""Get chain the cached instance represents.
"""
return self.blockchain
def __init__(self, chain, block_start, tx_start, block_target=None):
self.blockchain = chain
self.block_start = block_start
self.tx_start = tx_start
self.block_cursor = block_start
self.tx_cursor = tx_start
self.block_target = block_target
self.date_created = datetime.datetime.utcnow()
self.date_modified = datetime.datetime.utcnow()

81
chainsyncer/driver.py Normal file
View File

@@ -0,0 +1,81 @@
# standard imports
import uuid
import logging
import time
logg = logging.getLogger()
class Syncer:
running_global = True
def __init__(self, backend):
self.cursor = None
self.running = True
self.backend = backend
self.filter = []
def chain(self):
"""Returns the string representation of the chain spec for the chain the syncer is running on.
:returns: Chain spec string
:rtype: str
"""
return self.bc_cache.chain()
def add_filter(self, f):
self.filter.append(f)
class MinedSyncer(Syncer):
def __init__(self, backend):
super(MinedSyncer, self).__init__(backend)
def loop(self, interval, getter):
while self.running and Syncer.running_global:
while True:
block_hash = self.get(getter)
if block_hash == None:
break
self.process(getter, block_hash)
time.sleep(interval)
class HeadSyncer(MinedSyncer):
def __init__(self, backend):
super(HeadSyncer, self).__init__(backend)
def process(self, getter, block):
logg.debug('process {}'.format(block))
block = getter.block_by_hash(block.hash)
i = 0
tx = None
while True:
try:
#self.filter[0].handle(getter, block, None)
tx = block.tx(i)
logg.debug('tx {}'.format(tx))
self.backend.set(block.number(), i)
for f in self.filter:
f(getter, block, tx)
except IndexError as e:
self.backend.set(block.number() + 1, 0)
break
i += 1
def get(self, getter):
(block_number, tx_number) = self.backend.get()
block_hash = []
uu = uuid.uuid4()
res = getter.block_by_integer(block_number)
logg.debug('get {}'.format(res))
return res

8
chainsyncer/error.py Normal file
View File

@@ -0,0 +1,8 @@
class LoopDone(Exception):
"""Exception raised when a syncing is complete.
"""
pass
class RequestError(Exception):
pass

View File

@@ -0,0 +1,93 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# third-party imports
import confini
from chainlib.eth.connection import HTTPConnection
from chainsyncer.driver import HeadSyncer
from chainsyncer.db import dsn_from_config
from chainsyncer.db.models.base import SessionBase
from chainsyncer.backend import SyncerBackend
from chainsyncer.error import LoopDone
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = '/usr/local/etc/cic-syncer'
class Handler:
def __init__(self, method, domain):
self.method = method
self.domain = domain
def handle(self, getter, tx, chain):
logg.debug('noop tx {} chain {} method {} domain {}'.format(tx, chain, self.method, self.domain))
handler = getattr(Handler, 'handle')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
#app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
dsn = dsn_from_config(config)
SessionBase.connect(dsn)
c = HTTPConnection(config.get('ETH_PROVIDER'))
chain = config.get('CHAIN_SPEC')
def main():
block_offset = c.block_number()
syncer_backend = SyncerBackend.live(chain, 0)
syncer = HeadSyncer(syncer_backend)
try:
logg.debug('block offset {} {}'.format(block_offset, c))
syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), c)
except LoopDone as e:
sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e))
sys.exit(0)
if __name__ == '__main__':
main()