From 1b82cb31187069e0ea0930de28ab5858a55f1cd2 Mon Sep 17 00:00:00 2001 From: nolash Date: Fri, 4 Jun 2021 20:48:19 +0200 Subject: [PATCH] Factor out dispatch to separate object, implement batch cap per address --- chaind_eth/dispatch.py | 49 +++++++++++++++++++++++++++++++++++ chaind_eth/runnable/server.py | 17 ++++++------ chainqueue/adapters/eth.py | 20 ++++++-------- requirements.txt | 2 +- setup.cfg | 2 +- 5 files changed, 68 insertions(+), 22 deletions(-) create mode 100644 chaind_eth/dispatch.py diff --git a/chaind_eth/dispatch.py b/chaind_eth/dispatch.py new file mode 100644 index 0000000..71e68cc --- /dev/null +++ b/chaind_eth/dispatch.py @@ -0,0 +1,49 @@ +# standard imports +import logging + +# external imports +from chainlib.eth.address import to_checksum_address +from chainlib.eth.tx import unpack +from chainqueue.enum import StatusBits +from chainqueue.sql.query import count_tx +from hexathon import strip_0x + +logg = logging.getLogger(__name__) + + +class Dispatcher: + + status_inflight_mask = StatusBits.IN_NETWORK | StatusBits.FINAL + + def __init__(self, chain_spec, adapter, limit=100): + self.address_counts = {} + self.chain_spec = chain_spec + self.adapter = adapter + self.limit = limit + + + def get_count(self, address, session): + c = self.address_counts.get(address) + if c == None: + c = self.limit - count_tx(self.chain_spec, address, self.status_inflight_mask, StatusBits.IN_NETWORK, session=session) + if c < 0: + c = 0 + self.address_counts[address] = c + return c + + + def process(self, rpc, session): + c = 0 + txs = self.adapter.upcoming(self.chain_spec, session=session) + for k in txs.keys(): + signed_tx_bytes = bytes.fromhex(strip_0x(txs[k])) + tx_obj = unpack(signed_tx_bytes, self.chain_spec) + sender = to_checksum_address(tx_obj['from']) + address_count = self.get_count(sender, session) + if address_count == 0: + logg.debug('too many inflight txs for {}, skipping {}'.format(sender, k)) + continue + logg.debug('txs {} {}'.format(k, txs[k])) + self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session) + c += 1 + return c diff --git a/chaind_eth/runnable/server.py b/chaind_eth/runnable/server.py index f947b26..473ae56 100644 --- a/chaind_eth/runnable/server.py +++ b/chaind_eth/runnable/server.py @@ -19,6 +19,7 @@ from chainqueue.sql.backend import SQLBackend from chainqueue.db import dsn_from_config # local imports +from chaind_eth.dispatch import Dispatcher from chainqueue.adapters.eth import EthAdapter logging.basicConfig(level=logging.WARNING) @@ -128,7 +129,6 @@ def main(): logg.debug('getting connection') (srvs, srvs_addr) = ctrl.get_connection() except OSError as e: - havesends = 0 try: fi = os.stat(config.get('SESSION_SOCKET_PATH')) except FileNotFoundError: @@ -139,12 +139,11 @@ def main(): break if srvs == None: logg.debug('timeout (remote socket is none)') - txs = adapter.upcoming(chain_spec) - for k in txs.keys(): - havesends += 1 - logg.debug('txs {} {}'.format(k, txs[k])) - adapter.dispatch(chain_spec, rpc, k, txs[k]) - if havesends > 0: + dispatcher = Dispatcher(chain_spec, adapter) + session = backend.create_session() + r = dispatcher.process(rpc, session) + session.close() + if r > 0: ctrl.srv.settimeout(0.1) else: ctrl.srv.settimeout(4.0) @@ -167,12 +166,14 @@ def main(): continue logg.debug('recv {} bytes'.format(len(data))) - r = adapter.add(chain_spec, data) + session = backend.create_session() + r = adapter.add(chain_spec, data, session) try: r = srvs.send(r.to_bytes(4, byteorder='big')) logg.debug('{} bytes sent'.format(r)) except BrokenPipeError: logg.debug('they just hung up. how rude.') + session.close() srvs.close() ctrl.shutdown(None, None) diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py index c5d872f..ec8b4d8 100644 --- a/chainqueue/adapters/eth.py +++ b/chainqueue/adapters/eth.py @@ -25,9 +25,8 @@ class EthAdapter(Adapter): return tx - def add(self, chain_spec, bytecode): + def add(self, chain_spec, bytecode, session): tx = self.translate(bytecode, chain_spec) - session = self.backend.create_session() r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) if r: session.rollback() @@ -35,23 +34,20 @@ class EthAdapter(Adapter): return r r = self.backend.cache(tx, session=session) session.commit() - session.close() return r - def cache(self, chain_spec): - session = self.backend.create_session() - r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) - session.close() +# def cache(self, chain_spec): +# session = self.backend.create_session() +# r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) +# session.close() - def upcoming(self, chain_spec): - return self.backend.get(chain_spec, StatusBits.QUEUED, unpack) + def upcoming(self, chain_spec, session): + return self.backend.get(chain_spec, StatusBits.QUEUED, unpack) # possible maldesign, up-stack should use our session? - def dispatch(self, chain_spec, rpc, tx_hash, signed_tx): + def dispatch(self, chain_spec, rpc, tx_hash, signed_tx, session): o = raw(signed_tx) - session = self.backend.create_session() r = self.backend.dispatch(chain_spec, rpc, tx_hash, o) - session.close() return r diff --git a/requirements.txt b/requirements.txt index 85e0fae..0614599 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -chaind<=0.0.1,>=0.0.1a2 +chaind<=0.0.1,>=0.0.1a3 hexathon~=0.0.1a7 diff --git a/setup.cfg b/setup.cfg index 313f52b..02b14da 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chaind-eth -version = 0.0.1a1 +version = 0.0.1a2 description = Queue server for ethereum author = Louis Holbrook author_email = dev@holbrook.no