add tx dispatch

This commit is contained in:
nolash 2021-06-02 15:05:48 +02:00
parent 99af484f30
commit 3872d0d9b4
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
3 changed files with 45 additions and 7 deletions

View File

@ -2,6 +2,7 @@
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.tx import ( from chainlib.eth.tx import (
unpack, unpack,
raw,
) )
from hexathon import ( from hexathon import (
add_0x, add_0x,
@ -15,8 +16,7 @@ from chainqueue.enum import StatusBits
class EthAdapter(Adapter): class EthAdapter(Adapter):
def translate(self, bytecode, chain_spec):
def translate(self, chain_spec, bytecode):
tx = unpack(bytecode, chain_spec) tx = unpack(bytecode, chain_spec)
tx['source_token'] = ZERO_ADDRESS tx['source_token'] = ZERO_ADDRESS
tx['destination_token'] = ZERO_ADDRESS tx['destination_token'] = ZERO_ADDRESS
@ -26,7 +26,7 @@ class EthAdapter(Adapter):
def add(self, chain_spec, bytecode): def add(self, chain_spec, bytecode):
tx = self.translate(chain_spec, bytecode) tx = self.translate(bytecode, chain_spec)
session = self.backend.create_session() session = self.backend.create_session()
r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
if r: if r:
@ -45,5 +45,13 @@ class EthAdapter(Adapter):
session.close() session.close()
def process(self, chain_spec): def upcoming(self, chain_spec):
return self.backend.get(chain_spec, StatusBits.QUEUED, unpack) return self.backend.get(chain_spec, StatusBits.QUEUED, unpack)
def dispatch(self, chain_spec, rpc, tx_hash, signed_tx):
o = raw(signed_tx)
session = self.backend.create_session()
r = self.backend.dispatch(chain_spec, rpc, tx_hash, o)
session.close()
return r

View File

@ -17,6 +17,7 @@ from xdg.BaseDirectory import (
) )
from hexathon import strip_0x from hexathon import strip_0x
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection
# local imports # local imports
from chainqueue.sql.backend import SQLBackend from chainqueue.sql.backend import SQLBackend
@ -33,6 +34,8 @@ runtime_dir = os.path.join(get_runtime_dir(), 'chainqueue')
argparser = argparse.ArgumentParser('chainqueue transaction submission and trigger server') argparser = argparse.ArgumentParser('chainqueue transaction submission and trigger server')
argparser.add_argument('-c', '--config', dest='c', type=str, default=config_dir, help='configuration directory') argparser.add_argument('-c', '--config', dest='c', type=str, default=config_dir, help='configuration directory')
argparser.add_argument('-p', type=str, help='rpc endpoint')
argparser.add_argument('-i', type=str, help='chain spec')
argparser.add_argument('--runtime-dir', dest='runtime_dir', type=str, default=runtime_dir, help='runtime directory') argparser.add_argument('--runtime-dir', dest='runtime_dir', type=str, default=runtime_dir, help='runtime directory')
argparser.add_argument('--session-id', dest='session_id', type=str, default=str(uuid.uuid4()), help='session id to use for session') argparser.add_argument('--session-id', dest='session_id', type=str, default=str(uuid.uuid4()), help='session id to use for session')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-v', action='store_true', help='be verbose')
@ -48,6 +51,8 @@ config = confini.Config(args.c)
config.process() config.process()
args_override = { args_override = {
'SESSION_RUNTIME_DIR': getattr(args, 'runtime_dir'), 'SESSION_RUNTIME_DIR': getattr(args, 'runtime_dir'),
'SESSION_CHAIN_SPEC': getattr(args, 'i'),
'RPC_ENDPOINT': getattr(args, 'p'),
} }
config.dict_override(args_override, 'cli args') config.dict_override(args_override, 'cli args')
config.add(getattr(args, 'session_id'), '_SESSION_ID', True) config.add(getattr(args, 'session_id'), '_SESSION_ID', True)
@ -98,10 +103,12 @@ ctrl = SessionController(config)
signal.signal(signal.SIGINT, ctrl.shutdown) signal.signal(signal.SIGINT, ctrl.shutdown)
signal.signal(signal.SIGTERM, ctrl.shutdown) signal.signal(signal.SIGTERM, ctrl.shutdown)
chain_spec = ChainSpec.from_chain_str(config.get('SESSION_CHAIN_SPEC'))
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG')) backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'))
adapter = EthAdapter(backend) adapter = EthAdapter(backend)
chain_spec = ChainSpec.from_chain_str('evm:mainnet:1') rpc = EthHTTPConnection(url=config.get('RPC_ENDPOINT'), chain_spec=chain_spec)
if __name__ == '__main__': if __name__ == '__main__':
while True: while True:
@ -118,9 +125,10 @@ if __name__ == '__main__':
logg.error('entity on socket path is not a socket') logg.error('entity on socket path is not a socket')
break break
if srvs == None: if srvs == None:
txs = adapter.process(chain_spec) txs = adapter.upcoming(chain_spec)
for k in txs.keys(): for k in txs.keys():
logg.debug('txs {} {}'.format(k, txs[k])) logg.debug('txs {} {}'.format(k, txs[k]))
adapter.dispatch(chain_spec, rpc, k, txs[k])
continue continue
srvs.setblocking(False) srvs.setblocking(False)
data_in = srvs.recv(1024) data_in = srvs.recv(1024)

View File

@ -5,13 +5,18 @@ import logging
from sqlalchemy.exc import ( from sqlalchemy.exc import (
IntegrityError, IntegrityError,
) )
from chainlib.error import JSONRPCException
# local imports # local imports
from chainqueue.sql.tx import create as queue_create from chainqueue.sql.tx import create as queue_create
from chainqueue.db.models.base import SessionBase from chainqueue.db.models.base import SessionBase
from chainqueue.db.models.tx import TxCache from chainqueue.db.models.tx import TxCache
from chainqueue.sql.query import get_upcoming_tx from chainqueue.sql.query import get_upcoming_tx
from chainqueue.sql.state import set_ready from chainqueue.sql.state import (
set_ready,
set_reserved,
set_sent,
)
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -44,6 +49,23 @@ class SQLBackend:
txs = get_upcoming_tx(chain_spec, typ, decoder=decoder) txs = get_upcoming_tx(chain_spec, typ, decoder=decoder)
return txs return txs
def dispatch(self, chain_spec, rpc, tx_hash, payload, session=None):
set_reserved(chain_spec, tx_hash, session=session)
fail = False
r = 1
try:
rpc.do(payload)
r = 0
except ConnectionError:
fail = True
except JSONRPCException as e:
logg.error('error! {}'.format(e))
logg.debug('foo')
set_sent(chain_spec, tx_hash, fail=fail, session=session)
return r
def create_session(self): def create_session(self):
return SessionBase.create_session() return SessionBase.create_session()