Add dispatch tests

This commit is contained in:
nolash 2021-07-18 12:02:14 +02:00
parent d05cd9f202
commit d208ebb473
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
6 changed files with 92 additions and 33 deletions

View File

@ -8,12 +8,14 @@ from chainqueue.enum import StatusBits
from chainqueue.sql.query import count_tx from chainqueue.sql.query import count_tx
from hexathon import strip_0x from hexathon import strip_0x
logg = logging.getLogger(__name__) #logg = logging.getLogger(__name__)
logg = logging.getLogger()
class Dispatcher: class Dispatcher:
status_inflight_mask = StatusBits.IN_NETWORK | StatusBits.FINAL status_inflight_mask = StatusBits.IN_NETWORK | StatusBits.FINAL
status_inflight_mask_match = StatusBits.IN_NETWORK
def __init__(self, chain_spec, adapter, limit=100): def __init__(self, chain_spec, adapter, limit=100):
self.address_counts = {} self.address_counts = {}
@ -22,16 +24,25 @@ class Dispatcher:
self.limit = limit self.limit = limit
def get_count(self, address, session): def __init_count(self, address, session):
c = self.address_counts.get(address) c = self.address_counts.get(address)
if c == None: if c == None:
c = self.limit - count_tx(self.chain_spec, address, self.status_inflight_mask, StatusBits.IN_NETWORK, session=session) c = self.limit - count_tx(self.chain_spec, address, self.status_inflight_mask, self.status_inflight_mask_match, session=session)
if c < 0: if c < 0:
c = 0 c = 0
self.address_counts[address] = c self.address_counts[address] = c
return c return c
def get_count(self, address, session):
return self.__init_count(address, session)
def inc_count(self, address, session):
self.__init_count(address, session)
self.address_counts[address] -= 1
def process(self, rpc, session): def process(self, rpc, session):
c = 0 c = 0
txs = self.adapter.upcoming(self.chain_spec, session=session) txs = self.adapter.upcoming(self.chain_spec, session=session)
@ -43,7 +54,12 @@ class Dispatcher:
if address_count == 0: if address_count == 0:
logg.debug('too many inflight txs for {}, skipping {}'.format(sender, k)) logg.debug('too many inflight txs for {}, skipping {}'.format(sender, k))
continue continue
logg.debug('txs {} {}'.format(k, txs[k])) logg.debug('processing tx {} {}'.format(k, txs[k]))
try:
self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session) self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session)
except JSONRPCException as e:
logg.error('dispatch failed for {}: {}'.format(k, e))
continue
self.inc_count(sender, session)
c += 1 c += 1
return c return c

View File

@ -1,3 +1,6 @@
# standard imports
import logging
# external imports # external imports
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.tx import ( from chainlib.eth.tx import (
@ -8,14 +11,19 @@ from hexathon import (
add_0x, add_0x,
strip_0x, strip_0x,
) )
from chainqueue.enum import StatusBits
# local imports # local imports
from chainqueue.adapters.base import Adapter from chainqueue.adapters.base import Adapter
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
class EthAdapter(Adapter): class EthAdapter(Adapter):
def translate(self, chain_spec, bytecode): def translate(self, bytecode, chain_spec):
logg.debug('bytecode {}'.format(bytecode))
tx = unpack(bytecode, chain_spec) tx = unpack(bytecode, chain_spec)
tx['source_token'] = ZERO_ADDRESS tx['source_token'] = ZERO_ADDRESS
tx['destination_token'] = ZERO_ADDRESS tx['destination_token'] = ZERO_ADDRESS
@ -34,8 +42,8 @@ class EthAdapter(Adapter):
return self.backend.get(chain_spec, StatusBits.QUEUED, self.translate) # possible maldesign, up-stack should use our session? return self.backend.get(chain_spec, StatusBits.QUEUED, self.translate) # possible maldesign, up-stack should use our session?
def add(self, chain_spec, bytecode, session=None): def add(self, bytecode, chain_spec, session=None):
tx = self.translate(chain_spec, bytecode) tx = self.translate(bytecode, chain_spec)
r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
if r: if r:
session.rollback() session.rollback()

View File

@ -1,15 +1,23 @@
# standard imports # standard imports
import os
import unittest import unittest
# external imports # external imports
from chainsyncer.unittest.db import ChainSyncerDb from chainsyncer.unittest.db import ChainSyncerDb
from chainqueue.unittest.db import ChainQueueDb from chainqueue.unittest.db import ChainQueueDb
from chainlib.eth.unittest.ethtester import EthTesterCase from chainlib.eth.unittest.ethtester import EthTesterCase
from chainqueue.adapters.eth import EthAdapter
from chainqueue.unittest.db import (
db_config,
dsn_from_config,
)
from chainqueue.sql.backend import SQLBackend
from chainlib.eth.address import to_checksum_address
from hexathon import add_0x
# local imports # local imports
from chaind_eth.chain import EthChainInterface from chaind_eth.chain import EthChainInterface
class TestBase(EthTesterCase): class TestBase(EthTesterCase):
def setUp(self): def setUp(self):
@ -28,3 +36,15 @@ class TestBase(EthTesterCase):
self.session_chainqueue.commit() self.session_chainqueue.commit()
self.db_chainqueue.release_session(self.session_chainqueue) self.db_chainqueue.release_session(self.session_chainqueue)
super(TestBase, self).tearDown() super(TestBase, self).tearDown()
class TestSQLBase(TestBase):
example_tx = bytes.fromhex('f8650d8405f5e10082520894ee38d3a40e177608d41978778206831f60dd0fa88204008077a040adee2ad0a0e566bced4b76a8899549e86719eb8866b87674b6fdc88479c201a030b3ca061bb330f4d78bc9cb8144c8e570339496f56b7809387de2ffeaa585d5')
example_tx_sender = add_0x(to_checksum_address('eb3907ecad74a0013c259d5874ae7f22dcbcc95c'))
dsn = dsn_from_config(db_config)
def setUp(self):
super(TestSQLBase, self).setUp()
self.backend = SQLBackend(self.dsn, debug=bool(os.environ.get('DATABASE_DEBUG')))
self.adapter = EthAdapter(self.backend)

View File

@ -1,44 +1,25 @@
# stanndard imports # stanndard imports
import logging import logging
import unittest import unittest
import os
# external imports # external imports
from chainqueue.sql.backend import SQLBackend
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainqueue.unittest.db import (
db_config,
dsn_from_config,
)
# local imports
from chainqueue.adapters.eth import EthAdapter
# test imports # test imports
from tests.chaind_eth_base import TestBase from tests.chaind_eth_base import TestSQLBase
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
class TestAdapter(TestBase): class TestAdapter(TestSQLBase):
example_tx = bytes.fromhex('f8640183989680825208948311ad69b3429400ab795d45af85d204f73329ae8204d38026a097a7fd66548e4c116270b547ac7ed8cb531b0b97f80d49b45986144e47dbe44da07cc4345741dc0fabf65a473c0d3a1536cd501961f7e01b07dd8e107ff87d1556')
dsn = dsn_from_config(db_config)
def setUp(self):
super(TestAdapter, self).setUp()
self.chain_spec = ChainSpec.from_chain_str('foo:bar:1:baz')
self.backend = SQLBackend(self.dsn, debug=bool(os.environ.get('DATABASE_DEBUG')))
self.adapter = EthAdapter(self.backend)
def test_eth_adapter_translate(self): def test_eth_adapter_translate(self):
self.adapter.translate(self.chain_spec, self.example_tx) self.adapter.translate(self.example_tx, self.chain_spec)
# succesful decode means translate is working, no further checks needed # succesful decode means translate is working, no further checks needed
def test_eth_adapter_add(self): def test_eth_adapter_add(self):
self.adapter.add(self.chain_spec, self.example_tx, session=self.session_chainqueue) self.adapter.add(self.example_tx, self.chain_spec, session=self.session_chainqueue)
if __name__ == '__main__': if __name__ == '__main__':

34
tests/test_dispatch.py Normal file
View File

@ -0,0 +1,34 @@
# stanndard imports
import logging
import unittest
# external imports
from chainlib.eth.tx import unpack
from chainqueue.sql.query import get_tx
from chainqueue.enum import StatusBits
# local imports
from chaind_eth.dispatch import Dispatcher
# test imports
from tests.chaind_eth_base import TestSQLBase
logging.basicConfig(level=logging.DEBUG)
class TestDispatcher(TestSQLBase):
def test_dispatch_process(self):
dispatcher = Dispatcher(self.chain_spec, self.adapter, 1)
self.adapter.add(self.example_tx, self.chain_spec, session=self.session_chainqueue)
assert dispatcher.get_count(self.example_tx_sender, self.session_chainqueue) == 1
dispatcher.process(self.rpc, self.session_chainqueue)
tx_obj = unpack(self.example_tx, self.chain_spec)
o = get_tx(self.chain_spec, tx_obj['hash'], session=self.session_chainqueue)
assert o['status'] & StatusBits.IN_NETWORK > 0
assert dispatcher.get_count(self.example_tx_sender, self.session_chainqueue) == 0
if __name__ == '__main__':
unittest.main()

View File

@ -76,7 +76,7 @@ class TestFilter(TestBase):
adapter = EthAdapter(backend) adapter = EthAdapter(backend)
tx_raw_rlp_signed_bytes = bytes.fromhex(strip_0x(tx_raw_rlp_signed)) tx_raw_rlp_signed_bytes = bytes.fromhex(strip_0x(tx_raw_rlp_signed))
adapter.add(self.chain_spec, tx_raw_rlp_signed_bytes, session=self.session_chainqueue) adapter.add(tx_raw_rlp_signed_bytes, self.chain_spec, session=self.session_chainqueue)
set_ready(self.chain_spec, tx_hash, session=self.session_chainqueue) set_ready(self.chain_spec, tx_hash, session=self.session_chainqueue)
set_reserved(self.chain_spec, tx_hash, session=self.session_chainqueue) set_reserved(self.chain_spec, tx_hash, session=self.session_chainqueue)