From c33535379780a3e28d20a17973da5848aab98aa8 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 8 Sep 2021 17:17:49 +0200 Subject: [PATCH] Add resend script --- chaind_eth/cli/csv.py | 2 + chaind_eth/cli/output.py | 34 ++++++++++ chaind_eth/cli/process.py | 8 +-- chaind_eth/cli/retry.py | 81 ++++++++++++++++++++++++ chaind_eth/cli/tx.py | 23 +++++++ chaind_eth/runnable/resend.py | 110 +++++++++++++++++++++++++++++++++ chaind_eth/runnable/retry.py | 113 ---------------------------------- setup.cfg | 1 + 8 files changed, 255 insertions(+), 117 deletions(-) create mode 100644 chaind_eth/cli/output.py create mode 100644 chaind_eth/cli/retry.py create mode 100644 chaind_eth/cli/tx.py create mode 100644 chaind_eth/runnable/resend.py delete mode 100644 chaind_eth/runnable/retry.py diff --git a/chaind_eth/cli/csv.py b/chaind_eth/cli/csv.py index 874bafd..f70d2d7 100644 --- a/chaind_eth/cli/csv.py +++ b/chaind_eth/cli/csv.py @@ -16,6 +16,8 @@ class CSVProcessor: import csv # only import if needed fr = csv.reader(f) + f.close() + for r in fr: contents.append(r) l = len(contents) diff --git a/chaind_eth/cli/output.py b/chaind_eth/cli/output.py new file mode 100644 index 0000000..080c023 --- /dev/null +++ b/chaind_eth/cli/output.py @@ -0,0 +1,34 @@ +# standard imports +import logging +import socket +import enum + +logg = logging.getLogger(__name__) + + +class OpMode(enum.Enum): + STDOUT = 'standard_output' + UNIX = 'unix_socket' + +class Outputter: + + def __init__(self, mode): + self.out = getattr(self, 'do_' + mode.value) + + + def do(self, hx, *args, **kwargs): + return self.out(hx, *args, **kwargs) + + + def do_standard_output(self, hx, *args, **kwargs): + return hx + + + def do_unix_socket(self, hx, *args, **kwargs): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(kwargs['socket']) + s.send(hx.encode('utf-8')) + r = s.recv(64+4) + logg.debug('r {}'.format(r)) + s.close() + return r[4:].decode('utf-8') diff --git a/chaind_eth/cli/process.py b/chaind_eth/cli/process.py index 0344bee..44baa59 100644 --- a/chaind_eth/cli/process.py +++ b/chaind_eth/cli/process.py @@ -45,10 +45,10 @@ class Processor: self.content = processor.load(self.source) if self.content != None: if process: - #try: - self.process() - #except Exception as e: - # raise TxSourceError('invalid source contents: {}'.format(str(e))) + try: + self.process() + except Exception as e: + raise TxSourceError('invalid source contents: {}'.format(str(e))) return self.content raise TxSourceError('unparseable source') diff --git a/chaind_eth/cli/retry.py b/chaind_eth/cli/retry.py new file mode 100644 index 0000000..acc43d0 --- /dev/null +++ b/chaind_eth/cli/retry.py @@ -0,0 +1,81 @@ +# standard imports +import logging + +# external imports +from chainlib.eth.gas import price +from chainlib.eth.tx import unpack +from chaind.error import TxSourceError +from crypto_dev_signer.eth.transaction import EIP155Transaction +from chainlib.eth.gas import Gas +from hexathon import ( + add_0x, + strip_0x, + ) + +# local imports +from chaind_eth.cli.tx import TxProcessor + +logg = logging.getLogger(__name__) + +DEFAULT_GAS_FACTOR = 1.1 + + +class Retrier: + + def __init__(self, sender, signer, source, chain_spec, gas_oracle, gas_factor=DEFAULT_GAS_FACTOR): + self.sender = sender + self.signer = signer + self.source = source + self.raw_content = [] + self.content = [] + self.cursor = 0 + self.gas_oracle = gas_oracle + self.gas_factor = gas_factor + self.chain_spec = chain_spec + self.chain_id = chain_spec.chain_id() + self.processor = [TxProcessor()] + + + def load(self, process=True): + for processor in self.processor: + self.raw_content = processor.load(self.source) + if self.raw_content != None: + if process: + #try: + self.process() + #except Exception as e: + # raise TxSourceError('invalid source contents: {}'.format(str(e))) + return self.content + raise TxSourceError('unparseable source') + + + def process(self): + gas_data = self.gas_oracle.get_gas() + gas_price = gas_data[0] + for tx in self.raw_content: + tx_bytes = bytes.fromhex(strip_0x(tx)) + tx = unpack(tx_bytes, self.chain_spec) + tx_gas_price_old = int(tx['gasPrice']) + if tx_gas_price_old < gas_price: + tx['gasPrice'] = gas_price + else: + tx['gasPrice'] = int(tx_gas_price_old * self.gas_factor) + if tx_gas_price_old == tx['gasPrice']: + tx['gasPrice'] += 1 + tx_obj = EIP155Transaction(tx, tx['nonce'], self.chain_id) + new_tx_bytes = self.signer.sign_transaction_to_wire(tx_obj) + logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx['hash'], tx_gas_price_old, tx['gasPrice'], new_tx_bytes.hex())) + self.content.append(new_tx_bytes) + + + def __iter__(self): + self.cursor = 0 + return self + + + def __next__(self): + if self.cursor == len(self.content): + raise StopIteration() + tx = self.content[self.cursor] + self.cursor += 1 + return tx diff --git a/chaind_eth/cli/tx.py b/chaind_eth/cli/tx.py new file mode 100644 index 0000000..0a5d7d8 --- /dev/null +++ b/chaind_eth/cli/tx.py @@ -0,0 +1,23 @@ +# standard imports +import logging + +logg = logging.getLogger(__name__) + +class TxProcessor: + + def load(self, s): + contents = [] + f = None + try: + f = open(s, 'r') + except FileNotFoundError: + return None + + contents = f.readlines() + f.close() + for i in range(len(contents)): + contents[i] = contents[i].rstrip() + return contents + + def __str__(self): + return 'tx processor' diff --git a/chaind_eth/runnable/resend.py b/chaind_eth/runnable/resend.py new file mode 100644 index 0000000..66db19c --- /dev/null +++ b/chaind_eth/runnable/resend.py @@ -0,0 +1,110 @@ +# standard imports +import os +import logging +import sys +import datetime +import enum +import re +import stat + +# external imports +import chainlib.eth.cli +from chaind import Environment +from chainlib.eth.gas import price +from chainlib.chain import ChainSpec +from hexathon import strip_0x +from eth_token_index.index import TokenUniqueSymbolIndex + +# local imports +from chaind_eth.cli.retry import Retrier +from chaind.error import TxSourceError +from chaind_eth.cli.output import ( + Outputter, + OpMode, + ) + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + + +arg_flags = chainlib.eth.cli.argflag_std_write +argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to') +argparser.add_argument('--token-index', dest='token_index', type=str, help='Token resolver index') +argparser.add_positional('source', required=False, type=str, help='Transaction source file') +args = argparser.parse_args() + +extra_args = { + 'socket': None, + 'source': None, + 'token_index': None, + } + +env = Environment(domain='eth', env=os.environ) +config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) + +wallet = chainlib.eth.cli.Wallet() +wallet.from_config(config) + +rpc = chainlib.eth.cli.Rpc(wallet=wallet) +conn = rpc.connect_by_config(config) + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + + +mode = OpMode.STDOUT +re_unix = r'^ipc://(/.+)' +m = re.match(re_unix, config.get('_SOCKET', '')) +if m != None: + config.add(m.group(1), '_SOCKET', exists_ok=True) + r = 0 + try: + stat_info = os.stat(config.get('_SOCKET')) + if not stat.S_ISSOCK(stat_info.st_mode): + r = 1 + except FileNotFoundError: + r = 1 + + if r > 0: + sys.stderr.write('{} is not a socket\n'.format(config.get('_SOCKET'))) + sys.exit(1) + + mode = OpMode.UNIX + +logg.info('using mode {}'.format(mode.value)) + +if config.get('_SOURCE') == None: + sys.stderr.write('source data missing') + sys.exit(1) + + +def main(): + signer = rpc.get_signer() + + # TODO: make resolvers pluggable + processor = Retrier(wallet.get_signer_address(), wallet.get_signer(), config.get('_SOURCE'), chain_spec, rpc.get_gas_oracle()) + + sends = None + try: + sends = processor.load() + except TxSourceError as e: + sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor))) + sys.exit(1) + + tx_iter = iter(processor) + out = Outputter(mode) + while True: + tx = None + try: + tx_bytes = next(tx_iter) + except StopIteration: + break + tx_hex = tx_bytes.hex() + print(out.do(tx_hex, socket=config.get('_SOCKET'))) + + +if __name__ == '__main__': + main() diff --git a/chaind_eth/runnable/retry.py b/chaind_eth/runnable/retry.py deleted file mode 100644 index 5b0ea70..0000000 --- a/chaind_eth/runnable/retry.py +++ /dev/null @@ -1,113 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -# standard imports -import os -import logging -import sys -import datetime - -# external imports -from hexathon import ( - add_0x, - strip_0x, - ) -from chaind import Environment -import chainlib.eth.cli -from chainlib.chain import ChainSpec -from chainqueue.db import dsn_from_config -from chainqueue.sql.backend import SQLBackend -from chainqueue.enum import StatusBits -from chaind.sql.session import SessionIndex -from chainqueue.adapters.eth import EthAdapter -from chainlib.eth.gas import price -from chainlib.eth.connection import EthHTTPConnection -from crypto_dev_signer.eth.transaction import EIP155Transaction - -DEFAULT_GAS_FACTOR = 1.1 - - -logging.basicConfig(level=logging.WARNING) -logg = logging.getLogger() - -script_dir = os.path.dirname(os.path.realpath(__file__)) -config_dir = os.path.join(script_dir, '..', 'data', 'config') - -arg_flags = chainlib.eth.cli.argflag_std_write -argparser = chainlib.eth.cli.ArgumentParser(arg_flags) -argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")') -argparser.add_positional('session_id', required=False, type=str, help='Session id to connect to') -args = argparser.parse_args() -extra_args = { - 'backend': None, - 'session_id': 'SESSION_ID', - } - -env = Environment(domain='eth', env=os.environ) - -config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) - -if config.get('SESSION_DATA_DIR') == None: - config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True) - -chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) - -tx_getter = None -session_method = None -if config.get('_BACKEND') == 'sql': - from chainqueue.sql.query import get_tx_cache as tx_getter - from chainqueue.runnable.sql import setup_backend - from chainqueue.db.models.base import SessionBase - setup_backend(config, debug=config.true('DATABASE_DEBUG')) - session_method = SessionBase.create_session -else: - raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND'))) - -if config.get('DATABASE_ENGINE') == 'sqlite': - config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) - -wallet = chainlib.eth.cli.Wallet() -wallet.from_config(config) - -rpc = chainlib.eth.cli.Rpc(wallet=wallet) -conn = rpc.connect_by_config(config) - -dsn = dsn_from_config(config) -backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'), error_parser=rpc.error_parser) -session_index_backend = SessionIndex(config.get('SESSION_ID')) -adapter = EthAdapter(backend, session_index_backend=session_index_backend) - - -def main(): - before = datetime.datetime.utcnow() - adapter.pending_retry_threshold - txs = session_index_backend.get(chain_spec, adapter, status=StatusBits.IN_NETWORK, not_status=StatusBits.FINAL | StatusBits.OBSOLETE, before=before) - - o = price() - r = conn.do(o, error_parser=rpc.error_parser) - gas_price = strip_0x(r) - try: - gas_price = int(gas_price, 16) - except ValueError: - gas_price = int(gas_price) - logg.info('got current gas price {}'.format(gas_price)) - - signer = rpc.get_signer() - - db_session = adapter.create_session() - for tx_hash in txs: - tx_bytes = bytes.fromhex(strip_0x(txs[tx_hash])) - tx = adapter.translate(tx_bytes, chain_spec) - tx_gas_price = int(tx['gasPrice']) - if tx_gas_price < gas_price: - tx['gasPrice'] = gas_price - else: - tx['gasPrice'] = int(tx['gasPrice'] * DEFAULT_GAS_FACTOR) - tx_obj = EIP155Transaction(tx, tx['nonce'], chain_spec.chain_id()) - new_tx_bytes = signer.sign_transaction_to_wire(tx_obj) - logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx_hash, tx_gas_price, tx['gasPrice'], new_tx_bytes.hex())) - adapter.add(new_tx_bytes, chain_spec, session=db_session) - - db_session.close() - - -if __name__ == '__main__': - main() diff --git a/setup.cfg b/setup.cfg index 6987ffa..4c81784 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,3 +38,4 @@ console_scripts = chaind-eth-server = chaind_eth.runnable.server:main chaind-eth-syncer = chaind_eth.runnable.syncer:main chaind-eth-send = chaind_eth.runnable.send:main + chaind-eth-resend = chaind_eth.runnable.resend:main