diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py index a65a1fa..c5d872f 100644 --- a/chainqueue/adapters/eth.py +++ b/chainqueue/adapters/eth.py @@ -2,6 +2,7 @@ from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.tx import ( unpack, + raw, ) from hexathon import ( add_0x, @@ -15,8 +16,7 @@ from chainqueue.enum import StatusBits class EthAdapter(Adapter): - - def translate(self, chain_spec, bytecode): + def translate(self, bytecode, chain_spec): tx = unpack(bytecode, chain_spec) tx['source_token'] = ZERO_ADDRESS tx['destination_token'] = ZERO_ADDRESS @@ -26,7 +26,7 @@ class EthAdapter(Adapter): def add(self, chain_spec, bytecode): - tx = self.translate(chain_spec, bytecode) + tx = self.translate(bytecode, chain_spec) session = self.backend.create_session() r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) if r: @@ -45,5 +45,13 @@ class EthAdapter(Adapter): session.close() - def process(self, chain_spec): + def upcoming(self, chain_spec): return self.backend.get(chain_spec, StatusBits.QUEUED, unpack) + + + def dispatch(self, chain_spec, rpc, tx_hash, signed_tx): + o = raw(signed_tx) + session = self.backend.create_session() + r = self.backend.dispatch(chain_spec, rpc, tx_hash, o) + session.close() + return r diff --git a/chainqueue/runnable/server.py b/chainqueue/runnable/server.py index 044b808..e92a2c6 100644 --- a/chainqueue/runnable/server.py +++ b/chainqueue/runnable/server.py @@ -17,6 +17,7 @@ from xdg.BaseDirectory import ( ) from hexathon import strip_0x from chainlib.chain import ChainSpec +from chainlib.eth.connection import EthHTTPConnection # local imports from chainqueue.sql.backend import SQLBackend @@ -33,6 +34,8 @@ runtime_dir = os.path.join(get_runtime_dir(), 'chainqueue') argparser = argparse.ArgumentParser('chainqueue transaction submission and trigger server') argparser.add_argument('-c', '--config', dest='c', type=str, default=config_dir, help='configuration directory') +argparser.add_argument('-p', type=str, help='rpc endpoint') +argparser.add_argument('-i', type=str, help='chain spec') argparser.add_argument('--runtime-dir', dest='runtime_dir', type=str, default=runtime_dir, help='runtime directory') argparser.add_argument('--session-id', dest='session_id', type=str, default=str(uuid.uuid4()), help='session id to use for session') argparser.add_argument('-v', action='store_true', help='be verbose') @@ -48,6 +51,8 @@ config = confini.Config(args.c) config.process() args_override = { 'SESSION_RUNTIME_DIR': getattr(args, 'runtime_dir'), + 'SESSION_CHAIN_SPEC': getattr(args, 'i'), + 'RPC_ENDPOINT': getattr(args, 'p'), } config.dict_override(args_override, 'cli args') config.add(getattr(args, 'session_id'), '_SESSION_ID', True) @@ -98,10 +103,12 @@ ctrl = SessionController(config) signal.signal(signal.SIGINT, ctrl.shutdown) signal.signal(signal.SIGTERM, ctrl.shutdown) +chain_spec = ChainSpec.from_chain_str(config.get('SESSION_CHAIN_SPEC')) + dsn = dsn_from_config(config) backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG')) adapter = EthAdapter(backend) -chain_spec = ChainSpec.from_chain_str('evm:mainnet:1') +rpc = EthHTTPConnection(url=config.get('RPC_ENDPOINT'), chain_spec=chain_spec) if __name__ == '__main__': while True: @@ -118,9 +125,10 @@ if __name__ == '__main__': logg.error('entity on socket path is not a socket') break if srvs == None: - txs = adapter.process(chain_spec) + txs = adapter.upcoming(chain_spec) for k in txs.keys(): logg.debug('txs {} {}'.format(k, txs[k])) + adapter.dispatch(chain_spec, rpc, k, txs[k]) continue srvs.setblocking(False) data_in = srvs.recv(1024) diff --git a/chainqueue/sql/backend.py b/chainqueue/sql/backend.py index 4f12d40..581aa4a 100644 --- a/chainqueue/sql/backend.py +++ b/chainqueue/sql/backend.py @@ -5,13 +5,18 @@ import logging from sqlalchemy.exc import ( IntegrityError, ) +from chainlib.error import JSONRPCException # local imports from chainqueue.sql.tx import create as queue_create from chainqueue.db.models.base import SessionBase from chainqueue.db.models.tx import TxCache from chainqueue.sql.query import get_upcoming_tx -from chainqueue.sql.state import set_ready +from chainqueue.sql.state import ( + set_ready, + set_reserved, + set_sent, + ) logg = logging.getLogger(__name__) @@ -44,6 +49,23 @@ class SQLBackend: txs = get_upcoming_tx(chain_spec, typ, decoder=decoder) return txs + + def dispatch(self, chain_spec, rpc, tx_hash, payload, session=None): + set_reserved(chain_spec, tx_hash, session=session) + fail = False + r = 1 + try: + rpc.do(payload) + r = 0 + except ConnectionError: + fail = True + except JSONRPCException as e: + logg.error('error! {}'.format(e)) + + logg.debug('foo') + set_sent(chain_spec, tx_hash, fail=fail, session=session) + return r + def create_session(self): return SessionBase.create_session()