diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py deleted file mode 100644 index c5d872f..0000000 --- a/chainqueue/adapters/eth.py +++ /dev/null @@ -1,57 +0,0 @@ -# external imports -from chainlib.eth.constant import ZERO_ADDRESS -from chainlib.eth.tx import ( - unpack, - raw, - ) -from hexathon import ( - add_0x, - strip_0x, - ) - -# local imports -from chainqueue.adapters.base import Adapter -from chainqueue.enum import StatusBits - - -class EthAdapter(Adapter): - - def translate(self, bytecode, chain_spec): - tx = unpack(bytecode, chain_spec) - tx['source_token'] = ZERO_ADDRESS - tx['destination_token'] = ZERO_ADDRESS - tx['from_value'] = tx['value'] - tx['to_value'] = tx['value'] - return tx - - - def add(self, chain_spec, bytecode): - tx = self.translate(bytecode, chain_spec) - session = self.backend.create_session() - r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) - if r: - session.rollback() - session.close() - return r - r = self.backend.cache(tx, session=session) - session.commit() - session.close() - return r - - - def cache(self, chain_spec): - session = self.backend.create_session() - r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) - session.close() - - - def upcoming(self, chain_spec): - return self.backend.get(chain_spec, StatusBits.QUEUED, unpack) - - - def dispatch(self, chain_spec, rpc, tx_hash, signed_tx): - o = raw(signed_tx) - session = self.backend.create_session() - r = self.backend.dispatch(chain_spec, rpc, tx_hash, o) - session.close() - return r diff --git a/chainqueue/runnable/server.py b/chainqueue/runnable/server.py deleted file mode 100644 index 4c8bd52..0000000 --- a/chainqueue/runnable/server.py +++ /dev/null @@ -1,183 +0,0 @@ -import syslog -import sys -import time -import socket -import signal -import os -import logging -import stat -import argparse -import uuid - -# external imports -import confini -from xdg.BaseDirectory import ( - xdg_data_dirs, - get_runtime_dir, - load_first_config, - ) -from hexathon import strip_0x -from chainlib.chain import ChainSpec -from chainlib.eth.connection import EthHTTPConnection - -# local imports -from chainqueue.sql.backend import SQLBackend -from chainqueue.db import dsn_from_config -from chainqueue.adapters.eth import EthAdapter - -logging.basicConfig(level=logging.WARNING) -logg = logging.getLogger() - -default_config_dir = load_first_config('chainqueue') -config_dir = os.environ.get('CONFINI_DIR', default_config_dir) - -default_runtime_dir = os.path.join(get_runtime_dir(), 'chainqueue') -default_data_dir = os.path.join(xdg_data_dirs[0], 'chainqueue') - -argparser = argparse.ArgumentParser('chainqueue transaction submission and trigger server') -argparser.add_argument('-c', '--config', dest='c', type=str, default=config_dir, help='configuration directory') -argparser.add_argument('-p', type=str, help='rpc endpoint') -argparser.add_argument('-i', type=str, help='chain spec') -argparser.add_argument('--runtime-dir', dest='runtime_dir', type=str, default=default_runtime_dir, help='runtime directory') -argparser.add_argument('--data-dir', dest='data_dir', type=str, default=default_data_dir, help='data directory') -argparser.add_argument('--session-id', dest='session_id', type=str, default=str(uuid.uuid4()), help='session id to use for session') -argparser.add_argument('-v', action='store_true', help='be verbose') -argparser.add_argument('-vv', action='store_true', help='be very verbose') -args = argparser.parse_args(sys.argv[1:]) - -if args.vv: - logg.setLevel(logging.DEBUG) -elif args.v: - logg.setLevel(logging.INFO) - -# process config -config = confini.Config(args.c) -config.process() -args_override = { - 'SESSION_RUNTIME_DIR': getattr(args, 'runtime_dir'), - 'SESSION_CHAIN_SPEC': getattr(args, 'i'), - 'RPC_ENDPOINT': getattr(args, 'p'), - 'PATH_DATA': getattr(args, 'data_dir'), - } -config.dict_override(args_override, 'cli args') -config.add(getattr(args, 'session_id'), '_SESSION_ID', True) - -if not config.get('SESSION_SOCKET_PATH'): - socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('_SESSION_ID'), 'chainqueue.sock') - config.add(socket_path, 'SESSION_SOCKET_PATH', True) - -if config.get('DATABASE_ENGINE') == 'sqlite': - config.add(os.path.join(config.get('PATH_DATA'), config.get('DATABASE_NAME')), 'DATABASE_NAME', True) - - -logg.debug('config loaded:\n{}'.format(config)) - - -# verify setup -try: - os.stat(config.get('DATABASE_NAME')) -except FileNotFoundError: - sys.stderr.write('database file {} not found. please run database migration script first'.format(config.get('DATABASE_NAME'))) - sys.exit(1) - - -class SessionController: - - def __init__(self, config): - self.dead = False - os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True) - try: - os.unlink(config.get('SESSION_SOCKET_PATH')) - except FileNotFoundError: - pass - - self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) - self.srv.bind(config.get('SESSION_SOCKET_PATH')) - self.srv.listen(2) - self.srv.settimeout(4.0) - - def shutdown(self, signo, frame): - if self.dead: - return - self.dead = True - if signo != None: - syslog.syslog('closing on {}'.format(signo)) - else: - syslog.syslog('explicit shutdown') - sockname = self.srv.getsockname() - self.srv.close() - try: - os.unlink(sockname) - except FileNotFoundError: - logg.warning('socket file {} already gone'.format(sockname)) - - - def get_connection(self): - return self.srv.accept() - - -ctrl = SessionController(config) - -signal.signal(signal.SIGINT, ctrl.shutdown) -signal.signal(signal.SIGTERM, ctrl.shutdown) - -chain_spec = ChainSpec.from_chain_str(config.get('SESSION_CHAIN_SPEC')) - -dsn = dsn_from_config(config) -backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG')) -adapter = EthAdapter(backend) -rpc = EthHTTPConnection(url=config.get('RPC_ENDPOINT'), chain_spec=chain_spec) - -if __name__ == '__main__': - havesends = 0 - while True: - srvs = None - try: - logg.debug('getting connection') - (srvs, srvs_addr) = ctrl.get_connection() - except OSError as e: - havesends = 0 - try: - fi = os.stat(config.get('SESSION_SOCKET_PATH')) - except FileNotFoundError: - logg.error('socket is gone') - break - if not stat.S_ISSOCK(fi.st_mode): - logg.error('entity on socket path is not a socket') - break - if srvs == None: - logg.debug('timeout (remote socket is none)') - txs = adapter.upcoming(chain_spec) - for k in txs.keys(): - havesends += 1 - logg.debug('txs {} {}'.format(k, txs[k])) - adapter.dispatch(chain_spec, rpc, k, txs[k]) - if havesends > 0: - ctrl.srv.settimeout(0.1) - else: - ctrl.srv.settimeout(4.0) - continue - ctrl.srv.settimeout(0.1) - srvs.settimeout(0.1) - data_in = None - try: - data_in = srvs.recv(1024) - except BlockingIOError as e: - logg.debug('block io error: {}'.format(e)) - continue - - data = None - try: - data_in_str = data_in.decode('utf-8') - data = bytes.fromhex(strip_0x(data_in_str)) - except ValueError: - logg.error('invalid input "{}"'.format(data_in_str)) - continue - - logg.debug('recv {} bytes'.format(len(data))) - r = adapter.add(chain_spec, data) - r = srvs.send(r.to_bytes(4, byteorder='big')) - logg.debug('{} bytes sent'.format(r)) - srvs.close() - -ctrl.shutdown(None, None) diff --git a/setup.cfg b/setup.cfg index 2485138..ac165f2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,8 @@ packages = chainqueue.db.models chainqueue.db.migrations chainqueue.sql + chainqueue.adapters #[options.entry_points] #console_scripts = +# chainqueue-eth-server = chainqueue.runnable.server_eth:main