Add parallization test with eth_tester backend

This commit is contained in:
nolash 2021-09-29 19:01:57 +02:00
parent 0e37914991
commit acbfcedc2b
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
7 changed files with 123 additions and 59 deletions

View File

@ -1,6 +1,12 @@
# standard imports
import logging
# external imports
from chainlib.eth.tx import (
transaction,
Tx,
)
# local imports
from chainsyncer.error import NoBlockForYou
from .poll import BlockPollSyncer
@ -28,15 +34,18 @@ class HeadSyncer(BlockPollSyncer):
(pair, fltr) = self.backend.get()
logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
i = pair[1] # set tx index from previous
tx = None
tx_src = None
while True:
# handle block objects regardless of whether the tx data is embedded or not
try:
tx = block.tx(i)
except AttributeError:
o = tx(block.txs[i])
o = transaction(block.txs[i])
r = conn.do(o)
tx = self.interface.tx_from_src(Tx.src_normalize(r), block=block)
tx_src = Tx.src_normalize(r)
tx = self.chain_interface.tx_from_src(tx_src, block=block)
#except IndexError as e:
# logg.debug('index error syncer tx get {}'.format(e))
# break

View File

@ -26,7 +26,6 @@ class HistorySyncer(HeadSyncer):
if block_number == None:
raise AttributeError('backend has no future target. Use HeadSyner instead')
self.block_target = block_number
logg.debug('block target {}'.format(self.block_target))
def get(self, conn):
@ -44,7 +43,7 @@ class HistorySyncer(HeadSyncer):
raise SyncDone(self.block_target)
block_number = height[0]
block_hash = []
o = self.chain_interface.block_by_number(block_number)
o = self.chain_interface.block_by_number(block_number, include_tx=True)
try:
r = conn.do(o)
# TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future

View File

@ -9,8 +9,7 @@ from chainsyncer.error import (
NoBlockForYou,
)
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
logg = logging.getLogger(__name__)
@ -33,6 +32,7 @@ class BlockPollSyncer(Syncer):
(pair, fltr) = self.backend.get()
start_tx = pair[1]
while self.running and Syncer.running_global:
if self.pre_callback != None:
self.pre_callback()

View File

@ -2,16 +2,18 @@
import copy
import logging
import multiprocessing
import os
# external iports
from chainlib.eth.connection import RPCConnection
# local imports
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver.base import Syncer
from .threadpool import ThreadPoolTask
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
logg = logging.getLogger(__name__)
#def range_to_backends(chain_spec, block_offset, tx_offset, block_target, flags, flags_count, backend_class, backend_count):
def sync_split(block_offset, block_target, count):
block_count = block_target - block_offset
if block_count < count:
@ -19,26 +21,19 @@ def sync_split(block_offset, block_target, count):
count = block_count
blocks_per_thread = int(block_count / count)
#backends = []
#for i in range(backend_count):
ranges = []
for i in range(count):
block_target = block_offset + blocks_per_thread
#backend = backend_class.custom(chain_spec, block_target - 1, block_offset=block_offset, tx_offset=tx_offset, flags=flags, flags_count=flags_count)
offset = block_offset
target = block_target -1
ranges.append((offset, target,))
block_offset = block_target
# tx_offset = 0
# flags = 0
# return backends
return ranges
class ThreadPoolRangeTask:
def __init__(self, backend, sync_range, conn_factory, chain_interface, syncer_factory=HistorySyncer):
def __init__(self, backend, sync_range, chain_interface, syncer_factory=HistorySyncer, filters=[]):
backend_start = backend.start()
backend_target = backend.target()
backend_class = backend.__class__
@ -49,34 +44,40 @@ class ThreadPoolRangeTask:
flags = backend_start[1]
self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0)
self.syncer = syncer_factory(self.backend, chain_interface)
self.conn_factory = conn_factory
for fltr in filters:
self.syncer.add_filter(fltr)
def start_loop(self, interval):
conn = self.conn_factory()
conn = RPCConnection.connect(self.backend.chain_spec)
return self.syncer.loop(interval, conn)
class ThreadPoolRangeHistorySyncer:
def __init__(self, conn_factory, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
def __init__(self, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
#super(ThreadPoolRangeHistorySyncer, self).__init__(backend, chain_interface)
self.src_backend = backend
self.thread_count = thread_count
self.conn_factory = conn_factory
self.single_sync_offset = 0
self.runlevel_callback = None
backend_start = backend.start()
backend_target = backend.target()
self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count)
self.chain_interface = chain_interface
self.filters = []
def add_filter(self, f):
self.filters.append(f)
def loop(self, interval, conn):
self.worker_pool = multiprocessing.Pool(processes=self.thread_count)
for sync_range in self.ranges:
conn = self.conn_factory()
task = ThreadPoolRangeTask(self.src_backend, sync_range, self.conn_factory, self.chain_interface)
task = ThreadPoolRangeTask(self.src_backend, sync_range, self.chain_interface, filters=self.filters)
t = self.worker_pool.apply_async(task.start_loop, (0.1,))
print(t.get())
print('result {}'.format(t.get()))
self.worker_pool.close()
self.worker_pool.join()

View File

@ -2,3 +2,7 @@ chainlib-eth~=0.0.9a14
psycopg2==2.8.6
SQLAlchemy==1.3.20
alembic==1.4.2
eth_tester==0.5.0b3
py-evm==0.3.0a20
rlp==2.0.1
pytest==6.0.1

View File

@ -8,8 +8,15 @@ import os
# external imports
from chainlib.chain import ChainSpec
from chainlib.interface import ChainInterface
from chainlib.eth.tx import receipt
from chainlib.eth.block import block_by_number
from chainlib.eth.tx import (
receipt,
Tx,
)
from chainlib.eth.block import (
block_by_number,
Block,
)
from potaahto.symbols import snake_and_camel
# local imports
from chainsyncer.db import dsn_from_config
@ -28,6 +35,9 @@ class EthChainInterface(ChainInterface):
def __init__(self):
self._tx_receipt = receipt
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._tx_from_src = Tx.from_src
self._src_normalize = snake_and_camel
class TestBase(unittest.TestCase):

View File

@ -4,25 +4,61 @@ import logging
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.unittest.ethtester import EthTesterCase
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import (
RPCGasOracle,
Gas,
)
from chainlib.eth.unittest.base import TestRPCConnection
# local imports
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver.threadrange import (
# range_to_backends,
sync_split,
ThreadPoolRangeHistorySyncer,
)
from chainsyncer.unittest.base import MockConn
from chainsyncer.unittest.db import ChainSyncerDb
# testutil imports
from tests.chainsyncer_base import TestBase
from tests.chainsyncer_base import (
EthChainInterface,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestThreadRange(TestBase):
class SyncerCounter:
def __init__(self):
self.hits = []
def filter(self, conn, block, tx, db_session=None):
logg.debug('fltr {} {}'.format(block, tx))
self.hits.append((block, tx))
class TestBaseEth(EthTesterCase):
interface = EthChainInterface()
def setUp(self):
super(TestBaseEth, self).setUp()
self.db = ChainSyncerDb()
self.session = self.db.bind_session()
def tearDown(self):
self.session.commit()
self.db.release_session(self.session)
#os.unlink(self.db_path)
class TestThreadRange(TestBaseEth):
interface = EthChainInterface()
def test_range_split_even(self):
ranges = sync_split(5, 20, 3)
@ -31,41 +67,46 @@ class TestThreadRange(TestBase):
self.assertEqual(ranges[1], (10, 14))
self.assertEqual(ranges[2], (15, 19))
# def test_range_split_even(self):
# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
# backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3)
# self.assertEqual(len(backends), 3)
# self.assertEqual(((5, 3), 5), backends[0].start())
# self.assertEqual((9, 1023), backends[0].target())
# self.assertEqual(((10, 0), 0), backends[1].start())
# self.assertEqual((14, 1023), backends[1].target())
# self.assertEqual(((15, 0), 0), backends[2].start())
# self.assertEqual((19, 1023), backends[2].target())
#
#
# def test_range_split_underflow(self):
# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
# backends = range_to_backends(chain_spec, 5, 3, 7, 5, 10, MemBackend, 3)
# self.assertEqual(len(backends), 2)
# self.assertEqual(((5, 3), 5), backends[0].start())
# self.assertEqual((5, 1023), backends[0].target())
# self.assertEqual(((6, 0), 0), backends[1].start())
# self.assertEqual((6, 1023), backends[1].target())
def test_range_split_underflow(self):
ranges = sync_split(5, 8, 4)
self.assertEqual(len(ranges), 3)
self.assertEqual(ranges[0], (5, 5))
self.assertEqual(ranges[1], (6, 6))
self.assertEqual(ranges[2], (7, 7))
# def test_range_syncer(self):
# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
# backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3)
#
# syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backends, self.interface)
# syncer.loop(1, None)
#
def test_range_syncer(self):
def test_range_syncer_hello(self):
chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
backend = MemBackend.custom(chain_spec, 20, 5, 3, 5, 10)
syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backend, self.interface)
syncer.loop(0.1, None)
def test_range_syncer_content(self):
nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc)
gas_oracle = RPCGasOracle(self.rpc)
self.backend.mine_blocks(10)
c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec)
(tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 1024)
r = self.rpc.do(o)
self.backend.mine_blocks(3)
c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec)
(tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 2048)
r = self.rpc.do(o)
self.backend.mine_blocks(10)
backend = MemBackend.custom(self.chain_spec, 20, 5, 3, 5, 10)
syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface)
fltr = SyncerCounter()
syncer.add_filter(fltr)
syncer.loop(0.1, None)
if __name__ == '__main__':
unittest.main()