2021-06-04 20:48:19 +02:00
|
|
|
|
# standard imports
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
# external imports
|
|
|
|
|
from chainlib.eth.address import to_checksum_address
|
|
|
|
|
from chainlib.eth.tx import unpack
|
2021-07-19 14:02:31 +02:00
|
|
|
|
from chainlib.error import JSONRPCException
|
2021-06-04 20:48:19 +02:00
|
|
|
|
from chainqueue.enum import StatusBits
|
|
|
|
|
from chainqueue.sql.query import count_tx
|
|
|
|
|
from hexathon import strip_0x
|
|
|
|
|
|
2021-07-18 12:02:14 +02:00
|
|
|
|
#logg = logging.getLogger(__name__)
|
|
|
|
|
# Module-wide logger. Called without a name, getLogger() returns the root
# logger, so records from this module are emitted under the root logger
# rather than under this module's dotted name.
logg = logging.getLogger()
|
2021-06-04 20:48:19 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Dispatcher:
    """Send queued transactions to the network, throttled per sender.

    For every sender address the dispatcher keeps a cached "remaining
    allowance": ``limit`` minus the number of transactions that
    ``count_tx`` reports for the in-flight status mask. A transaction is
    only dispatched while its sender's allowance is positive.

    The allowance is computed once per address and then only decremented;
    it is never refreshed during the lifetime of the instance.

    :param chain_spec: Chain spec passed through to the adapter, ``unpack``
        and ``count_tx``.
    :param adapter: Object providing ``upcoming(chain_spec, session=...)``
        and ``dispatch(chain_spec, rpc, tx_hash, signed_tx, session)``.
    :param limit: Maximum number of in-flight transactions per sender.
    """

    # Status bit mask and expected match value handed to count_tx.
    # NOTE(review): presumably selects txs that are IN_NETWORK but not yet
    # FINAL -- confirm against the chainqueue count_tx documentation.
    status_inflight_mask = StatusBits.IN_NETWORK | StatusBits.FINAL
    status_inflight_mask_match = StatusBits.IN_NETWORK

    def __init__(self, chain_spec, adapter, limit=100):
        # sender address -> remaining number of txs that may be dispatched
        self.address_counts = {}
        self.chain_spec = chain_spec
        self.adapter = adapter
        self.limit = limit

    def __init_count(self, address, session):
        """Return the remaining allowance for address, computing it on first use.

        :param address: Sender address to look up.
        :param session: Database session forwarded to ``count_tx``.
        :returns: The cached remaining allowance for the address.
        """
        remaining = self.address_counts.get(address)
        if remaining is None:
            inflight = count_tx(
                self.chain_spec,
                address,
                self.status_inflight_mask,
                self.status_inflight_mask_match,
                session=session,
            )
            # More in-flight txs than the limit still means "nothing left".
            remaining = max(0, self.limit - inflight)
            self.address_counts[address] = remaining
        return remaining

    def get_count(self, address, session):
        """Return how many more transactions may be dispatched for address.

        :param address: Sender address to look up.
        :param session: Database session forwarded to ``count_tx``.
        :returns: Remaining allowance for the address.
        """
        return self.__init_count(address, session)

    def inc_count(self, address, session):
        """Register one more in-flight transaction for address.

        Despite the name, this decrements the cached remaining allowance
        by one (one more tx in flight means one less left to send).

        :param address: Sender address to update.
        :param session: Database session forwarded to ``count_tx``.
        """
        # Make sure the cache entry exists before decrementing it.
        self.__init_count(address, session)
        self.address_counts[address] -= 1

    def process(self, rpc, session):
        """Dispatch all upcoming transactions whose sender has allowance left.

        Transactions whose sender has no allowance left are skipped, and
        transactions whose dispatch raises ``JSONRPCException`` are logged
        and skipped; neither counts towards the return value.

        :param rpc: RPC connection handed to the adapter's ``dispatch``.
        :param session: Database session used for queue access.
        :returns: Number of transactions dispatched without error.
        """
        dispatched = 0
        txs = self.adapter.upcoming(self.chain_spec, session=session)
        for tx_hash in txs.keys():
            signed_tx = txs[tx_hash]
            signed_tx_bytes = bytes.fromhex(strip_0x(signed_tx))
            tx_obj = unpack(signed_tx_bytes, self.chain_spec)
            sender = to_checksum_address(tx_obj['from'])

            # "<= 0" rather than "== 0": inc_count is public and decrements
            # unconditionally, so the cached value may drop below zero.
            if self.get_count(sender, session) <= 0:
                logg.debug('too many inflight txs for {}, skipping {}'.format(sender, tx_hash))
                continue

            logg.debug('processing tx {} {}'.format(tx_hash, signed_tx))
            try:
                self.adapter.dispatch(self.chain_spec, rpc, tx_hash, signed_tx, session)
            except JSONRPCException as e:
                # Best effort: a failed dispatch must not stop the batch.
                logg.error('dispatch failed for {}: {}'.format(tx_hash, e))
                continue

            self.inc_count(sender, session)
            dispatched += 1

        return dispatched
|