diff --git a/chainsyncer/driver/head.py b/chainsyncer/driver/head.py index 46e2d21..87ca6f8 100644 --- a/chainsyncer/driver/head.py +++ b/chainsyncer/driver/head.py @@ -1,6 +1,12 @@ # standard imports import logging +# external imports +from chainlib.eth.tx import ( + transaction, + Tx, + ) + # local imports from chainsyncer.error import NoBlockForYou from .poll import BlockPollSyncer @@ -28,15 +34,18 @@ class HeadSyncer(BlockPollSyncer): (pair, fltr) = self.backend.get() logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr)) i = pair[1] # set tx index from previous - tx = None + tx_src = None while True: # handle block objects regardless of whether the tx data is embedded or not try: tx = block.tx(i) except AttributeError: - o = tx(block.txs[i]) + o = transaction(block.txs[i]) r = conn.do(o) - tx = self.interface.tx_from_src(Tx.src_normalize(r), block=block) + tx_src = Tx.src_normalize(r) + tx = self.chain_interface.tx_from_src(tx_src, block=block) + + #except IndexError as e: # logg.debug('index error syncer tx get {}'.format(e)) # break diff --git a/chainsyncer/driver/history.py b/chainsyncer/driver/history.py index 7cebc74..bc55994 100644 --- a/chainsyncer/driver/history.py +++ b/chainsyncer/driver/history.py @@ -26,7 +26,6 @@ class HistorySyncer(HeadSyncer): if block_number == None: raise AttributeError('backend has no future target. Use HeadSyner instead') self.block_target = block_number - logg.debug('block target {}'.format(self.block_target)) def get(self, conn): @@ -44,7 +43,7 @@ class HistorySyncer(HeadSyncer): raise SyncDone(self.block_target) block_number = height[0] block_hash = [] - o = self.chain_interface.block_by_number(block_number) + o = self.chain_interface.block_by_number(block_number, include_tx=True) try: r = conn.do(o) # TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future diff --git a/chainsyncer/driver/poll.py b/chainsyncer/driver/poll.py index 43bf4b1..e11fdcb 100644 --- a/chainsyncer/driver/poll.py +++ b/chainsyncer/driver/poll.py @@ -9,8 +9,7 @@ from chainsyncer.error import ( NoBlockForYou, ) -#logg = logging.getLogger(__name__) -logg = logging.getLogger() +logg = logging.getLogger(__name__) @@ -33,6 +32,7 @@ class BlockPollSyncer(Syncer): (pair, fltr) = self.backend.get() start_tx = pair[1] + while self.running and Syncer.running_global: if self.pre_callback != None: self.pre_callback() diff --git a/chainsyncer/driver/threadrange.py b/chainsyncer/driver/threadrange.py index 89a6b86..f35f14f 100644 --- a/chainsyncer/driver/threadrange.py +++ b/chainsyncer/driver/threadrange.py @@ -2,16 +2,18 @@ import copy import logging import multiprocessing +import os +# external iports +from chainlib.eth.connection import RPCConnection # local imports from chainsyncer.driver.history import HistorySyncer +from chainsyncer.driver.base import Syncer from .threadpool import ThreadPoolTask -#logg = logging.getLogger(__name__) -logg = logging.getLogger() +logg = logging.getLogger(__name__) -#def range_to_backends(chain_spec, block_offset, tx_offset, block_target, flags, flags_count, backend_class, backend_count): def sync_split(block_offset, block_target, count): block_count = block_target - block_offset if block_count < count: @@ -19,26 +21,19 @@ def sync_split(block_offset, block_target, count): count = block_count blocks_per_thread = int(block_count / count) - #backends = [] - #for i in range(backend_count): ranges = [] for i in range(count): block_target = block_offset + blocks_per_thread - #backend = backend_class.custom(chain_spec, block_target - 1, block_offset=block_offset, tx_offset=tx_offset, flags=flags, flags_count=flags_count) offset = block_offset target = block_target -1 ranges.append((offset, target,)) block_offset = block_target -# tx_offset = 0 -# flags = 0 - -# return backends return ranges class ThreadPoolRangeTask: - def __init__(self, backend, sync_range, conn_factory, chain_interface, syncer_factory=HistorySyncer): + def __init__(self, backend, sync_range, chain_interface, syncer_factory=HistorySyncer, filters=[]): backend_start = backend.start() backend_target = backend.target() backend_class = backend.__class__ @@ -49,34 +44,40 @@ class ThreadPoolRangeTask: flags = backend_start[1] self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0) self.syncer = syncer_factory(self.backend, chain_interface) - self.conn_factory = conn_factory - + for fltr in filters: + self.syncer.add_filter(fltr) def start_loop(self, interval): - conn = self.conn_factory() + conn = RPCConnection.connect(self.backend.chain_spec) return self.syncer.loop(interval, conn) class ThreadPoolRangeHistorySyncer: - def __init__(self, conn_factory, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None): + def __init__(self, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None): + #super(ThreadPoolRangeHistorySyncer, self).__init__(backend, chain_interface) self.src_backend = backend self.thread_count = thread_count - self.conn_factory = conn_factory self.single_sync_offset = 0 self.runlevel_callback = None backend_start = backend.start() backend_target = backend.target() self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count) self.chain_interface = chain_interface + self.filters = [] + + + def add_filter(self, f): + self.filters.append(f) def loop(self, interval, conn): self.worker_pool = multiprocessing.Pool(processes=self.thread_count) + for sync_range in self.ranges: - conn = self.conn_factory() - task = ThreadPoolRangeTask(self.src_backend, sync_range, self.conn_factory, self.chain_interface) + task = ThreadPoolRangeTask(self.src_backend, sync_range, self.chain_interface, filters=self.filters) t = self.worker_pool.apply_async(task.start_loop, (0.1,)) - print(t.get()) + print('result {}'.format(t.get())) + self.worker_pool.close() self.worker_pool.join() diff --git a/test_requirements.txt b/test_requirements.txt index 502eaac..8f9d5f2 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -2,3 +2,7 @@ chainlib-eth~=0.0.9a14 psycopg2==2.8.6 SQLAlchemy==1.3.20 alembic==1.4.2 +eth_tester==0.5.0b3 +py-evm==0.3.0a20 +rlp==2.0.1 +pytest==6.0.1 diff --git a/tests/chainsyncer_base.py b/tests/chainsyncer_base.py index c34cfe0..d1e9d40 100644 --- a/tests/chainsyncer_base.py +++ b/tests/chainsyncer_base.py @@ -8,8 +8,15 @@ import os # external imports from chainlib.chain import ChainSpec from chainlib.interface import ChainInterface -from chainlib.eth.tx import receipt -from chainlib.eth.block import block_by_number +from chainlib.eth.tx import ( + receipt, + Tx, + ) +from chainlib.eth.block import ( + block_by_number, + Block, + ) +from potaahto.symbols import snake_and_camel # local imports from chainsyncer.db import dsn_from_config @@ -28,6 +35,9 @@ class EthChainInterface(ChainInterface): def __init__(self): self._tx_receipt = receipt self._block_by_number = block_by_number + self._block_from_src = Block.from_src + self._tx_from_src = Tx.from_src + self._src_normalize = snake_and_camel class TestBase(unittest.TestCase): diff --git a/tests/test_thread_range.py b/tests/test_thread_range.py index 404e779..f59a6ed 100644 --- a/tests/test_thread_range.py +++ b/tests/test_thread_range.py @@ -4,25 +4,61 @@ import logging # external imports from chainlib.chain import ChainSpec +from chainlib.eth.unittest.ethtester import EthTesterCase +from chainlib.eth.nonce import RPCNonceOracle +from chainlib.eth.gas import ( + RPCGasOracle, + Gas, + ) +from chainlib.eth.unittest.base import TestRPCConnection # local imports from chainsyncer.backend.memory import MemBackend from chainsyncer.driver.threadrange import ( -# range_to_backends, sync_split, ThreadPoolRangeHistorySyncer, ) from chainsyncer.unittest.base import MockConn +from chainsyncer.unittest.db import ChainSyncerDb # testutil imports -from tests.chainsyncer_base import TestBase +from tests.chainsyncer_base import ( + EthChainInterface, + ) logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() -class TestThreadRange(TestBase): +class SyncerCounter: + def __init__(self): + self.hits = [] + + + def filter(self, conn, block, tx, db_session=None): + logg.debug('fltr {} {}'.format(block, tx)) + self.hits.append((block, tx)) + + +class TestBaseEth(EthTesterCase): + + interface = EthChainInterface() + + def setUp(self): + super(TestBaseEth, self).setUp() + self.db = ChainSyncerDb() + self.session = self.db.bind_session() + + def tearDown(self): + self.session.commit() + self.db.release_session(self.session) + #os.unlink(self.db_path) + + +class TestThreadRange(TestBaseEth): + + interface = EthChainInterface() def test_range_split_even(self): ranges = sync_split(5, 20, 3) @@ -31,41 +67,46 @@ class TestThreadRange(TestBase): self.assertEqual(ranges[1], (10, 14)) self.assertEqual(ranges[2], (15, 19)) -# def test_range_split_even(self): -# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') -# backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3) -# self.assertEqual(len(backends), 3) -# self.assertEqual(((5, 3), 5), backends[0].start()) -# self.assertEqual((9, 1023), backends[0].target()) -# self.assertEqual(((10, 0), 0), backends[1].start()) -# self.assertEqual((14, 1023), backends[1].target()) -# self.assertEqual(((15, 0), 0), backends[2].start()) -# self.assertEqual((19, 1023), backends[2].target()) -# -# -# def test_range_split_underflow(self): -# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') -# backends = range_to_backends(chain_spec, 5, 3, 7, 5, 10, MemBackend, 3) -# self.assertEqual(len(backends), 2) -# self.assertEqual(((5, 3), 5), backends[0].start()) -# self.assertEqual((5, 1023), backends[0].target()) -# self.assertEqual(((6, 0), 0), backends[1].start()) -# self.assertEqual((6, 1023), backends[1].target()) + + def test_range_split_underflow(self): + ranges = sync_split(5, 8, 4) + self.assertEqual(len(ranges), 3) + self.assertEqual(ranges[0], (5, 5)) + self.assertEqual(ranges[1], (6, 6)) + self.assertEqual(ranges[2], (7, 7)) -# def test_range_syncer(self): -# chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') -# backends = range_to_backends(chain_spec, 5, 3, 20, 5, 10, MemBackend, 3) -# -# syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backends, self.interface) -# syncer.loop(1, None) -# - def test_range_syncer(self): + def test_range_syncer_hello(self): chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') backend = MemBackend.custom(chain_spec, 20, 5, 3, 5, 10) syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backend, self.interface) syncer.loop(0.1, None) + def test_range_syncer_content(self): + nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) + gas_oracle = RPCGasOracle(self.rpc) + + self.backend.mine_blocks(10) + + c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec) + (tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 1024) + r = self.rpc.do(o) + + self.backend.mine_blocks(3) + + c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec) + (tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 2048) + r = self.rpc.do(o) + + self.backend.mine_blocks(10) + + backend = MemBackend.custom(self.chain_spec, 20, 5, 3, 5, 10) + syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface) + fltr = SyncerCounter() + syncer.add_filter(fltr) + syncer.loop(0.1, None) + + if __name__ == '__main__': unittest.main()