From 0c0d1fac8fc381d20282bce69800954e74e39b4f Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 26 Aug 2021 17:14:11 +0200 Subject: [PATCH] Implement error parser to rpc instantation --- chaind_eth/runnable/retry.py | 113 ++++++++++++++++++++++++++++++++++ chaind_eth/runnable/server.py | 33 ++++++---- chainqueue/adapters/eth.py | 31 +--------- requirements.txt | 2 +- setup.cfg | 2 +- 5 files changed, 140 insertions(+), 41 deletions(-) create mode 100644 chaind_eth/runnable/retry.py diff --git a/chaind_eth/runnable/retry.py b/chaind_eth/runnable/retry.py new file mode 100644 index 0000000..8d91b2c --- /dev/null +++ b/chaind_eth/runnable/retry.py @@ -0,0 +1,113 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# standard imports +import os +import logging +import sys +import datetime + +# external imports +from hexathon import ( + add_0x, + strip_0x, + ) +from chaind import Environment +import chainlib.eth.cli +from chainlib.chain import ChainSpec +from chainqueue.db import dsn_from_config +from chainqueue.sql.backend import SQLBackend +from chainqueue.enum import StatusBits +from chaind.sql.session import SessionIndex +from chainqueue.adapters.eth import EthAdapter +from chainlib.eth.gas import price +from chainlib.eth.connection import EthHTTPConnection +from crypto_dev_signer.eth.transaction import EIP155Transaction + +DEFAULT_GAS_FACTOR = 1.1 + + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + +arg_flags = chainlib.eth.cli.argflag_std_write +argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")') +argparser.add_positional('session_id', required=False, type=str, help='Ethereum address of recipient') +args = argparser.parse_args() +extra_args = { + 'backend': None, + 'session_id': 'SESSION_ID', + } + +env = Environment(domain='eth', env=os.environ) + +config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) + +if config.get('SESSION_DATA_DIR') == None: + config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True) + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + +tx_getter = None +session_method = None +if config.get('_BACKEND') == 'sql': + from chainqueue.sql.query import get_tx_cache as tx_getter + from chainqueue.runnable.sql import setup_backend + from chainqueue.db.models.base import SessionBase + setup_backend(config, debug=config.true('DATABASE_DEBUG')) + session_method = SessionBase.create_session +else: + raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND'))) + +if config.get('DATABASE_ENGINE') == 'sqlite': + config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + +wallet = chainlib.eth.cli.Wallet() +wallet.from_config(config) + +rpc = chainlib.eth.cli.Rpc(wallet=wallet) +conn = rpc.connect_by_config(config) + +dsn = dsn_from_config(config) +backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'), error_parser=rpc.error_parser) +session_index_backend = SessionIndex(config.get('SESSION_ID')) +adapter = EthAdapter(backend, session_index_backend=session_index_backend) + + +def main(): + before = datetime.datetime.utcnow() - adapter.pending_retry_threshold + txs = session_index_backend.get(chain_spec, adapter, status=StatusBits.IN_NETWORK, not_status=StatusBits.FINAL | StatusBits.OBSOLETE, before=before) + + o = price() + r = conn.do(o, error_parser=rpc.error_parser) + gas_price = strip_0x(r) + try: + gas_price = int(gas_price, 16) + except ValueError: + gas_price = int(gas_price) + logg.info('got current gas price {}'.format(gas_price)) + + signer = rpc.get_signer() + + db_session = adapter.create_session() + for tx_hash in txs: + tx_bytes = bytes.fromhex(strip_0x(txs[tx_hash])) + tx = adapter.translate(tx_bytes, chain_spec) + tx_gas_price = int(tx['gasPrice']) + if tx_gas_price < gas_price: + tx['gasPrice'] = gas_price + else: + tx['gasPrice'] = int(tx['gasPrice'] * DEFAULT_GAS_FACTOR) + tx_obj = EIP155Transaction(tx, tx['nonce'], chain_spec.chain_id()) + new_tx_bytes = signer.sign_transaction_to_wire(tx_obj) + logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx_hash, tx_gas_price, tx['gasPrice'], new_tx_bytes.hex())) + adapter.add(new_tx_bytes, chain_spec, session=db_session) + + db_session.close() + + +if __name__ == '__main__': + main() diff --git a/chaind_eth/runnable/server.py b/chaind_eth/runnable/server.py index 08f43be..3c904b2 100644 --- a/chaind_eth/runnable/server.py +++ b/chaind_eth/runnable/server.py @@ -17,6 +17,7 @@ from chainlib.eth.connection import EthHTTPConnection from chainqueue.sql.backend import SQLBackend from chainlib.error import JSONRPCException from chainqueue.db import dsn_from_config +from chaind.sql.session import SessionIndex # local imports from chaind_eth.dispatch import Dispatcher @@ -59,7 +60,7 @@ if not config.get('SESSION_SOCKET_PATH'): if config.get('DATABASE_ENGINE') == 'sqlite': #config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) - config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME')), 'DATABASE_NAME', exists_ok=True) + config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) config.censor('PASSWORD', 'DATABASE') logg.debug('config loaded:\n{}'.format(config)) @@ -116,10 +117,14 @@ signal.signal(signal.SIGTERM, ctrl.shutdown) chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) +rpc = chainlib.eth.cli.Rpc() +conn = rpc.connect_by_config(config) + +logg.debug('error {}'.format(rpc.error_parser)) dsn = dsn_from_config(config) -backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG')) -adapter = EthAdapter(backend) -rpc = EthHTTPConnection(url=config.get('RPC_HTTP_PROVIDER'), chain_spec=chain_spec) +backend = SQLBackend(dsn, error_parser=rpc.error_parser, debug=config.true('DATABASE_DEBUG')) +session_index_backend = SessionIndex(config.get('SESSION_ID')) +adapter = EthAdapter(backend, session_index_backend=session_index_backend) def process_outgoing(chain_spec, adapter, rpc): @@ -148,7 +153,7 @@ def main(): break if srvs == None: logg.debug('timeout (remote socket is none)') - r = process_outgoing(chain_spec, adapter, rpc) + r = process_outgoing(chain_spec, adapter, conn) if r > 0: ctrl.srv.settimeout(0.1) else: @@ -174,15 +179,21 @@ def main(): logg.debug('recv {} bytes'.format(len(data))) session = backend.create_session() + tx_hash = None try: - r = adapter.add(data, chain_spec, session=session) - try: - r = srvs.send(r.to_bytes(4, byteorder='big')) - logg.debug('{} bytes sent'.format(r)) - except BrokenPipeError: - logg.debug('they just hung up. how rude.') + tx_hash = adapter.add(data, chain_spec, session=session) except ValueError as e: logg.error('invalid input: {}'.format(e)) + + r = 1 + if tx_hash != None: + session.commit() + r = 0 + try: + r = srvs.send(r.to_bytes(4, byteorder='big')) + logg.debug('{} bytes sent'.format(r)) + except BrokenPipeError: + logg.debug('they just hung up. how rude.') session.close() srvs.close() diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py index 118cf44..fb28719 100644 --- a/chainqueue/adapters/eth.py +++ b/chainqueue/adapters/eth.py @@ -1,6 +1,5 @@ # standard imports import logging -import datetime # external imports from chainlib.eth.constant import ZERO_ADDRESS @@ -12,18 +11,15 @@ from hexathon import ( add_0x, strip_0x, ) -from chainqueue.enum import ( - StatusBits, - errors as queue_errors, - ) # local imports -from chainqueue.adapters.base import Adapter +from chainqueue.adapters.sessionindex import SessionIndexAdapter logg = logging.getLogger(__name__) -class EthAdapter(Adapter): +class EthAdapter(SessionIndexAdapter): + def translate(self, bytecode, chain_spec): logg.debug('bytecode {}'.format(bytecode)) @@ -41,27 +37,6 @@ class EthAdapter(Adapter): return r - def upcoming(self, chain_spec, session=None): - txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK) - before = datetime.datetime.utcnow() - self.error_retry_threshold - errored_txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.LOCAL_ERROR, not_status=StatusBits.FINAL, before=before, requeue=True) - for tx_hash in errored_txs.keys(): - txs[tx_hash] = errored_txs[tx_hash] - return txs - - - def add(self, bytecode, chain_spec, session=None): - tx = self.translate(bytecode, chain_spec) - r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) - if r: - session.rollback() - session.close() - return r - r = self.backend.cache(tx, session=session) - session.commit() - return r - - # def cache(self, chain_spec): # session = self.backend.create_session() # r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) diff --git a/requirements.txt b/requirements.txt index 01fcbb1..aa54925 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -chaind<=0.0.2,>=0.0.2a3 +chaind<=0.0.3,>=0.0.3a1 hexathon~=0.0.1a8 chainlib-eth<=0.1.0,>=0.0.9a4 diff --git a/setup.cfg b/setup.cfg index dc3bc00..d20fde3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chaind-eth -version = 0.0.2a1 +version = 0.0.3a1 description = Queue server for ethereum author = Louis Holbrook author_email = dev@holbrook.no