diff --git a/apps/cic-eth/cic_eth/sync/__init__.py b/apps/cic-eth/cic_eth/sync/__init__.py deleted file mode 100644 index 325f58dc..00000000 --- a/apps/cic-eth/cic_eth/sync/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .base import Syncer diff --git a/apps/cic-eth/cic_eth/sync/backend.py b/apps/cic-eth/cic_eth/sync/backend.py deleted file mode 100644 index 56f08f30..00000000 --- a/apps/cic-eth/cic_eth/sync/backend.py +++ /dev/null @@ -1,201 +0,0 @@ -# standard imports -import logging - -# local imports -from cic_eth.db.models.sync import BlockchainSync -from cic_eth.db.models.base import SessionBase - -logg = logging.getLogger() - - -class SyncerBackend: - """Interface to block and transaction sync state. - - :param chain_spec: Chain spec for the chain that syncer is running for. - :type chain_spec: cic_registry.chain.ChainSpec - :param object_id: Unique id for the syncer session. - :type object_id: number - """ - def __init__(self, chain_spec, object_id): - self.db_session = None - self.db_object = None - self.chain_spec = chain_spec - self.object_id = object_id - self.connect() - self.disconnect() - - - def connect(self): - """Loads the state of the syncer session with the given id. - """ - if self.db_session == None: - self.db_session = SessionBase.create_session() - q = self.db_session.query(BlockchainSync) - q = q.filter(BlockchainSync.id==self.object_id) - self.db_object = q.first() - if self.db_object == None: - self.disconnect() - raise ValueError('sync entry with id {} not found'.format(self.object_id)) - return self.db_session - - - def disconnect(self): - """Commits state of sync to backend. - """ - if self.db_session != None: - self.db_session.add(self.db_object) - self.db_session.commit() - self.db_session.close() - self.db_session = None - - - def chain(self): - """Returns chain spec for syncer - - :returns: Chain spec - :rtype chain_spec: cic_registry.chain.ChainSpec - """ - return self.chain_spec - - - def get(self): - """Get the current state of the syncer cursor. - - :returns: Block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.cursor() - self.disconnect() - return pair - - - def set(self, block_height, tx_height): - """Update the state of the syncer cursor - :param block_height: Block height of cursor - :type block_height: number - :param tx_height: Block transaction height of cursor - :type tx_height: number - :returns: Block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.set(block_height, tx_height) - self.disconnect() - return pair - - - def start(self): - """Get the initial state of the syncer cursor. - - :returns: Initial block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.start() - self.disconnect() - return pair - - - def target(self): - """Get the target state (upper bound of sync) of the syncer cursor. - - :returns: Target block height - :rtype: number - """ - self.connect() - target = self.db_object.target() - self.disconnect() - return target - - - @staticmethod - def first(chain): - """Returns the model object of the most recent syncer in backend. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :returns: Last syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - return BlockchainSync.first(chain) - - - @staticmethod - def initial(chain, block_height): - """Creates a new syncer session and commit its initial state to backend. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: New syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - object_id = None - session = SessionBase.create_session() - o = BlockchainSync(chain, 0, 0, block_height) - session.add(o) - session.commit() - object_id = o.id - session.close() - - return SyncerBackend(chain, object_id) - - - @staticmethod - def resume(chain, block_height): - """Retrieves and returns all previously unfinished syncer sessions. - - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: Syncer objects of unfinished syncs - :rtype: list of cic_eth.db.models.BlockchainSync - """ - syncers = [] - - session = SessionBase.create_session() - - object_id = None - - for object_id in BlockchainSync.get_unsynced(session=session): - logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) - syncers.append(SyncerBackend(chain, object_id)) - - (block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session) - if block_height != block_resume: - o = BlockchainSync(chain, block_resume, tx_resume, block_height) - session.add(o) - session.commit() - object_id = o.id - syncers.append(SyncerBackend(chain, object_id)) - logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height)) - - session.close() - - return syncers - - - @staticmethod - def live(chain, block_height): - """Creates a new open-ended syncer session starting at the given block height. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: "Live" syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - object_id = None - session = SessionBase.create_session() - o = BlockchainSync(chain, block_height, 0, None) - session.add(o) - session.commit() - object_id = o.id - session.close() - - return SyncerBackend(chain, object_id) diff --git a/apps/cic-eth/cic_eth/sync/base.py b/apps/cic-eth/cic_eth/sync/base.py deleted file mode 100644 index b678ccb7..00000000 --- a/apps/cic-eth/cic_eth/sync/base.py +++ /dev/null @@ -1,51 +0,0 @@ -# TODO: extend blocksync model -class Syncer: - """Base class and interface for implementing a block sync poller routine. - - :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. - :type bc_cache: cic_eth.sync.SyncerBackend - """ - w3 = None - running_global = True - - def __init__(self, bc_cache): - self.cursor = None - self.bc_cache = bc_cache - self.filter = [] - self.running = True - - - def chain(self): - """Returns the string representation of the chain spec for the chain the syncer is running on. - - :returns: Chain spec string - :rtype: str - """ - return self.bc_cache.chain() - - - def get(self): - """Get latest unprocessed blocks. - - :returns: list of block hash strings - :rtype: list - """ - raise NotImplementedError() - - - def process(self, w3, ref): - """Process transactions in a single block. - - :param ref: Reference of object to process - :type ref: str, 0x-hex - """ - raise NotImplementedError() - - - def loop(self, interval): - """Entry point for syncer loop - - :param interval: Delay in seconds until next attempt if no new blocks are found. - :type interval: int - """ - raise NotImplementedError() diff --git a/apps/cic-eth/cic_eth/sync/error.py b/apps/cic-eth/cic_eth/sync/error.py deleted file mode 100644 index 1d2ff27b..00000000 --- a/apps/cic-eth/cic_eth/sync/error.py +++ /dev/null @@ -1,4 +0,0 @@ -class LoopDone(Exception): - """Exception raised when a syncing is complete. - """ - pass diff --git a/apps/cic-eth/cic_eth/sync/head.py b/apps/cic-eth/cic_eth/sync/head.py deleted file mode 100644 index f96a75b7..00000000 --- a/apps/cic-eth/cic_eth/sync/head.py +++ /dev/null @@ -1,51 +0,0 @@ -# standard imports -import logging - -# third-party imports -import web3 - -# local imports -from .mined import MinedSyncer -from .base import Syncer - -logg = logging.getLogger() - - -class HeadSyncer(MinedSyncer): - """Implements the get method in Syncer for retrieving every new mined block. - - :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. - :type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend - """ - def __init__(self, bc_cache): - super(HeadSyncer, self).__init__(bc_cache) - # TODO: filter not returning all blocks, at least with ganache. kind of defeats the point, then - #self.w3_filter = rpc.w3.eth.filter({ - # 'fromBlock': block_offset, - # }) #'latest') - #self.bc_cache.set(block_offset, 0) - logg.debug('initialized head syncer with offset {}'.format(bc_cache.start())) - - """Implements Syncer.get - - :param w3: Web3 object - :type w3: web3.Web3 - :returns: Block hash of newly mined blocks. if any - :rtype: list of str, 0x-hex - """ - def get(self, w3): - # Of course, the filter doesn't return the same block dict format as getBlock() so we'll just waste some cycles getting the hashes instead. - #hashes = [] - #for block in self.w3_filter.get_new_entries(): - # hashes.append(block['blockHash']) - #logg.debug('blocks {}'.format(hashes)) - #return hashes - (block_number, tx_number) = self.bc_cache.get() - block_hash = [] - try: - block = w3.eth.getBlock(block_number) - block_hash.append(block.hash) - except web3.exceptions.BlockNotFound: - pass - - return block_hash diff --git a/apps/cic-eth/cic_eth/sync/history.py b/apps/cic-eth/cic_eth/sync/history.py deleted file mode 100644 index 6a500fd0..00000000 --- a/apps/cic-eth/cic_eth/sync/history.py +++ /dev/null @@ -1,74 +0,0 @@ -# standard imports -import logging - -# third-party imports -from web3.exceptions import BlockNotFound -from .error import LoopDone - -# local imports -from .mined import MinedSyncer -from .base import Syncer -from cic_eth.db.models.base import SessionBase - -logg = logging.getLogger() - - -class HistorySyncer(MinedSyncer): - """Implements the get method in Syncer for retrieving all blocks between last processed block before previous shutdown and block height at time of syncer start. - - :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. - :type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend - :param mx: Maximum number of blocks to return in one call - :type mx: int - """ - def __init__(self, bc_cache, mx=500): - super(HistorySyncer, self).__init__(bc_cache) - self.max = mx - - self.target = bc_cache.target() - logg.info('History syncer target block number {}'.format(self.target)) - - session_offset = self.bc_cache.get() - - self.block_offset = session_offset[0] - self.tx_offset = session_offset[1] - logg.info('History syncer starting at {}:{}'.format(session_offset[0], session_offset[1])) - - self.filter = [] - - - """Implements Syncer.get - - BUG: Should also raise LoopDone when block array is empty after loop. - - :param w3: Web3 object - :type w3: web3.Web3 - :raises LoopDone: If a block is not found. - :return: Return a batch of blocks to process - :rtype: list of str, 0x-hex - """ - def get(self, w3): - sync_db = self.bc_cache - height = self.bc_cache.get() - logg.debug('height {}'.format(height)) - block_last = height[0] - tx_last = height[1] - if not self.running: - raise LoopDone((block_last, tx_last)) - b = [] - block_target = block_last + self.max - if block_target > self.target: - block_target = self.target - logg.debug('target {} last {} max {}'.format(block_target, block_last, self.max)) - for i in range(block_last, block_target): - if i == self.target: - logg.info('reached target {}, exiting'.format(i)) - self.running = False - break - bhash = w3.eth.getBlock(i).hash - b.append(bhash) - logg.debug('appending block {} {}'.format(i, bhash.hex())) - if block_last == block_target: - logg.info('aleady reached target {}, exiting'.format(self.target)) - self.running = False - return b diff --git a/apps/cic-eth/cic_eth/sync/mempool.py b/apps/cic-eth/cic_eth/sync/mempool.py deleted file mode 100644 index a3a62c6d..00000000 --- a/apps/cic-eth/cic_eth/sync/mempool.py +++ /dev/null @@ -1,50 +0,0 @@ -class MemPoolSyncer(Syncer): - - - def __init__(self, bc_cache): - raise NotImplementedError('incomplete, needs web3 tx to raw transaction conversion') - super(MemPoolSyncer, self).__init__(bc_cache) -# self.w3_filter = Syncer.w3.eth.filter('pending') -# for tx in tx_cache.txs: -# self.txs.append(tx) -# logg.debug('add tx {} to mempoolsyncer'.format(tx)) -# -# -# def get(self): -# return self.w3_filter.get_new_entries() -# -# -# def process(self, tx_hash): -# tx_hash_hex = tx_hash.hex() -# if tx_hash_hex in self.txs: -# logg.debug('syncer already watching {}, skipping'.format(tx_hash_hex)) -# tx = self.w3.eth.getTransaction(tx_hash_hex) -# serialized_tx = rlp.encode({ -# 'nonce': tx.nonce, -# 'from': getattr(tx, 'from'), -# }) -# logg.info('add {} to syncer: {}'.format(tx, serialized_tx)) -# otx = Otx( -# nonce=tx.nonce, -# address=getattr(tx, 'from'), -# tx_hash=tx_hash_hex, -# signed_tx=serialized_tx, -# ) -# Otx.session.add(otx) -# Otx.session.commit() -# -# -# def loop(self, interval): -# while Syncer.running: -# logg.debug('loop execute') -# txs = self.get() -# logg.debug('got txs {}'.format(txs)) -# for tx in txs: -# #block_number = self.process(block.hex()) -# self.process(tx) -# #if block_number > self.bc_cache.head(): -# # self.bc_cache.head(block_number) -# time.sleep(interval) -# logg.info("Syncer no longer set to run, gracefully exiting") - - diff --git a/apps/cic-eth/cic_eth/sync/mined.py b/apps/cic-eth/cic_eth/sync/mined.py deleted file mode 100644 index 62b55674..00000000 --- a/apps/cic-eth/cic_eth/sync/mined.py +++ /dev/null @@ -1,109 +0,0 @@ -# standard imports -import logging -import time - -# third-party imports -import celery - -# local impotes -from .base import Syncer -from cic_eth.queue.tx import set_final_status -from cic_eth.eth import RpcClient - -app = celery.current_app -logg = logging.getLogger() - - -class MinedSyncer(Syncer): - """Base implementation of block processor for mined blocks. - - Loops through all transactions, - - :param bc_cache: Retrieves block cache cursors for chain head and latest processed block. - :type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend - """ - - yield_delay = 0.005 - - def __init__(self, bc_cache): - super(MinedSyncer, self).__init__(bc_cache) - self.block_offset = 0 - self.tx_offset = 0 - - - def process(self, w3, ref): - """Processes transactions in a single block, advancing transaction (and block) cursor accordingly. - - :param w3: Web3 object - :type w3: web3.Web3 - :param ref: Block reference (hash) to process - :type ref: str, 0x-hex - :returns: Block number of next unprocessed block - :rtype: number - """ - b = w3.eth.getBlock(ref) - c = w3.eth.getBlockTransactionCount(ref) - s = 0 - if self.block_offset == b.number: - s = self.tx_offset - - logg.debug('processing {} (blocknumber {}, count {}, offset {})'.format(ref, b.number, c, s)) - - for i in range(s, c): - tx = w3.eth.getTransactionByBlock(ref, i) - tx_hash_hex = tx['hash'].hex() - rcpt = w3.eth.getTransactionReceipt(tx_hash_hex) - logg.debug('{}/{} processing tx {} from block {} {}'.format(i+1, c, tx_hash_hex, b.number, ref)) - ours = False - # TODO: ensure filter loop can complete on graceful shutdown - for f in self.filter: - #try: - session = self.bc_cache.connect() - task_uuid = f(w3, tx, rcpt, self.chain(), session) - #except Exception as e: - # logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e)) - # continue - if task_uuid != None: - logg.debug('tx {} passed to celery task {}'.format(tx_hash_hex, task_uuid)) - s = celery.signature( - 'set_final_status', - [tx_hash_hex, rcpt['blockNumber'], not rcpt['status']], - ) - s.apply_async() - break - next_tx = i + 1 - if next_tx == c: - self.bc_cache.set(b.number+1, 0) - else: - self.bc_cache.set(b.number, next_tx) - if c == 0: - logg.info('synced block {} has no transactions'.format(b.number)) - #self.bc_cache.session(b.number+1, 0) - self.bc_cache.set(b.number+1, 0) - return b['number'] - - - - def loop(self, interval): - """Loop running until the "running" property of Syncer is set to False. - - Retrieves latest unprocessed blocks and processes them. - - :param interval: Delay in seconds until next attempt if no new blocks are found. - :type interval: int - """ - while self.running and Syncer.running_global: - self.bc_cache.connect() - c = RpcClient(self.chain()) - logg.debug('loop execute') - e = self.get(c.w3) - logg.debug('got blocks {}'.format(e)) - for block in e: - block_number = self.process(c.w3, block.hex()) - logg.info('processed block {} {}'.format(block_number, block.hex())) - self.bc_cache.disconnect() - if len(e) > 0: - time.sleep(self.yield_delay) - else: - time.sleep(interval) - logg.info("Syncer no longer set to run, gracefully exiting") diff --git a/apps/cic-eth/cic_eth/sync/retry.py b/apps/cic-eth/cic_eth/sync/retry.py deleted file mode 100644 index 302e4c75..00000000 --- a/apps/cic-eth/cic_eth/sync/retry.py +++ /dev/null @@ -1,71 +0,0 @@ -# standard imports -import logging -import datetime -import time - -# third-party imports -import celery - -# local imports -from .base import Syncer -from cic_eth.eth.rpc import RpcClient -from cic_eth.db.enum import StatusEnum -from cic_eth.queue.tx import get_status_tx - -logg = logging.getLogger() - -celery_app = celery.current_app - - -class noop_cache: - - def __init__(self, chain_spec): - self.chain_spec = chain_spec - - - def chain(self): - return self.chain_spec - - -class RetrySyncer(Syncer): - - def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None): - cache = noop_cache(chain_spec) - super(RetrySyncer, self).__init__(cache) - if failed_grace_seconds == None: - failed_grace_seconds = stalled_grace_seconds - self.stalled_grace_seconds = stalled_grace_seconds - self.failed_grace_seconds = failed_grace_seconds - self.final_func = final_func - - - def get(self, w3): -# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds) -# failed_txs = get_status_tx( -# StatusEnum.SENDFAIL.value, -# before=before, -# ) - before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) - stalled_txs = get_status_tx( - StatusEnum.SENT.value, - before=before, - ) - # return list(failed_txs.keys()) + list(stalled_txs.keys()) - return stalled_txs - - - def process(self, w3, ref): - logg.debug('tx {}'.format(ref)) - for f in self.filter: - f(w3, ref, None, str(self.chain())) - - - def loop(self, interval): - chain_str = str(self.chain()) - while self.running and Syncer.running_global: - c = RpcClient(self.chain()) - for tx in self.get(c.w3): - self.process(c.w3, tx) - if self.final_func != None: - self.final_func(chain_str) - time.sleep(interval) diff --git a/apps/cic-eth/tests/unit/sync/test_head.py b/apps/cic-eth/tests/unit/sync/test_head.py deleted file mode 100644 index bf5aaf7d..00000000 --- a/apps/cic-eth/tests/unit/sync/test_head.py +++ /dev/null @@ -1,43 +0,0 @@ -# standard imports -import logging - -# local imports -from cic_eth.sync.head import HeadSyncer -from cic_eth.sync.backend import SyncerBackend - -logg = logging.getLogger() - - -def test_head( - init_rpc, - init_database, - init_eth_tester, - mocker, - eth_empty_accounts, - ): - - #backend = SyncBackend(eth_empty_accounts[0], 'foo') - block_number = init_rpc.w3.eth.blockNumber - backend = SyncerBackend.live('foo:666', block_number) - syncer = HeadSyncer(backend) - - #init_eth_tester.mine_block() - nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending') - logg.debug('nonce {}'.format(nonce)) - tx = { - 'from': init_rpc.w3.eth.accounts[0], - 'to': eth_empty_accounts[0], - 'value': 404, - 'gas': 21000, - 'gasPrice': init_rpc.w3.eth.gasPrice, - 'nonce': nonce, - } - tx_hash_one = init_rpc.w3.eth.sendTransaction(tx) - - block_number = init_rpc.w3.eth.blockNumber - backend.set(block_number, 0) - b = syncer.get(init_rpc.w3) - - tx = init_rpc.w3.eth.getTransactionByBlock(b[0], 0) - - assert tx.hash.hex() == tx_hash_one.hex() diff --git a/apps/cic-eth/tests/unit/sync/test_history.py b/apps/cic-eth/tests/unit/sync/test_history.py deleted file mode 100644 index 764a7308..00000000 --- a/apps/cic-eth/tests/unit/sync/test_history.py +++ /dev/null @@ -1,194 +0,0 @@ -# standard imports -import logging - -# third-party imports -import pytest -from web3.exceptions import BlockNotFound -from cic_registry import CICRegistry - -# local imports -from cic_eth.sync.history import HistorySyncer -from cic_eth.sync.head import HeadSyncer -#from cic_eth.sync import Syncer -from cic_eth.db.models.otx import OtxSync -from cic_eth.db.models.base import SessionBase -from cic_eth.sync.backend import SyncerBackend - -logg = logging.getLogger() - -class FinishedError(Exception): - pass - - -class DebugFilter: - - def __init__(self, address): - self.txs = [] - self.monitor_to_address = address - - def filter(self, w3, tx, rcpt, chain_spec): - logg.debug('sync filter {}'.format(tx['hash'].hex())) - if tx['to'] == self.monitor_to_address: - self.txs.append(tx) - # hack workaround, latest block hash not found in eth_tester for some reason - if len(self.txs) == 2: - raise FinishedError('intentionally finished on tx {}'.format(tx)) - - -def test_history( - init_rpc, - init_database, - init_eth_tester, - #celery_session_worker, - eth_empty_accounts, - ): - - nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending') - logg.debug('nonce {}'.format(nonce)) - tx = { - 'from': init_rpc.w3.eth.accounts[0], - 'to': eth_empty_accounts[0], - 'value': 404, - 'gas': 21000, - 'gasPrice': init_rpc.w3.eth.gasPrice, - 'nonce': nonce, - } - tx_hash_one = init_rpc.w3.eth.sendTransaction(tx) - - nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending') - logg.debug('nonce {}'.format(nonce)) - tx = { - 'from': init_rpc.w3.eth.accounts[1], - 'to': eth_empty_accounts[0], - 'value': 404, - 'gas': 21000, - 'gasPrice': init_rpc.w3.eth.gasPrice, - 'nonce': nonce, - } - tx_hash_two = init_rpc.w3.eth.sendTransaction(tx) - init_eth_tester.mine_block() - - block_number = init_rpc.w3.eth.blockNumber - - live_syncer = SyncerBackend.live('foo:666', 0) - HeadSyncer(live_syncer) - - history_syncers = SyncerBackend.resume('foo:666', block_number) - - for history_syncer in history_syncers: - logg.info('history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target())) - - backend = history_syncers[0] - - syncer = HistorySyncer(backend) - fltr = DebugFilter(eth_empty_accounts[0]) - syncer.filter.append(fltr.filter) - - logg.debug('have txs {} {}'.format(tx_hash_one.hex(), tx_hash_two.hex())) - - try: - syncer.loop(0.1) - except FinishedError: - pass - except BlockNotFound as e: - logg.error('the last block given in loop does not seem to exist :/ {}'.format(e)) - - check_hashes = [] - for h in fltr.txs: - check_hashes.append(h['hash'].hex()) - assert tx_hash_one.hex() in check_hashes - assert tx_hash_two.hex() in check_hashes - - -def test_history_multiple( - init_rpc, - init_database, - init_eth_tester, - #celery_session_worker, - eth_empty_accounts, - ): - - block_number = init_rpc.w3.eth.blockNumber - live_syncer = SyncerBackend.live('foo:666', block_number) - HeadSyncer(live_syncer) - - nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending') - logg.debug('nonce {}'.format(nonce)) - tx = { - 'from': init_rpc.w3.eth.accounts[0], - 'to': eth_empty_accounts[0], - 'value': 404, - 'gas': 21000, - 'gasPrice': init_rpc.w3.eth.gasPrice, - 'nonce': nonce, - } - tx_hash_one = init_rpc.w3.eth.sendTransaction(tx) - - - init_eth_tester.mine_block() - block_number = init_rpc.w3.eth.blockNumber - history_syncers = SyncerBackend.resume('foo:666', block_number) - for history_syncer in history_syncers: - logg.info('halfway history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target())) - live_syncer = SyncerBackend.live('foo:666', block_number) - HeadSyncer(live_syncer) - - nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending') - logg.debug('nonce {}'.format(nonce)) - tx = { - 'from': init_rpc.w3.eth.accounts[1], - 'to': eth_empty_accounts[0], - 'value': 404, - 'gas': 21000, - 'gasPrice': init_rpc.w3.eth.gasPrice, - 'nonce': nonce, - } - tx_hash_two = init_rpc.w3.eth.sendTransaction(tx) - - init_eth_tester.mine_block() - block_number = init_rpc.w3.eth.blockNumber - history_syncers = SyncerBackend.resume('foo:666', block_number) - live_syncer = SyncerBackend.live('foo:666', block_number) - HeadSyncer(live_syncer) - - for history_syncer in history_syncers: - logg.info('history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target())) - - assert len(history_syncers) == 2 - - backend = history_syncers[0] - syncer = HistorySyncer(backend) - fltr = DebugFilter(eth_empty_accounts[0]) - syncer.filter.append(fltr.filter) - try: - syncer.loop(0.1) - except FinishedError: - pass - except BlockNotFound as e: - logg.error('the last block given in loop does not seem to exist :/ {}'.format(e)) - - check_hashes = [] - for h in fltr.txs: - check_hashes.append(h['hash'].hex()) - assert tx_hash_one.hex() in check_hashes - - - backend = history_syncers[1] - syncer = HistorySyncer(backend) - fltr = DebugFilter(eth_empty_accounts[0]) - syncer.filter.append(fltr.filter) - try: - syncer.loop(0.1) - except FinishedError: - pass - except BlockNotFound as e: - logg.error('the last block given in loop does not seem to exist :/ {}'.format(e)) - - check_hashes = [] - for h in fltr.txs: - check_hashes.append(h['hash'].hex()) - assert tx_hash_two.hex() in check_hashes - - history_syncers = SyncerBackend.resume('foo:666', block_number) - - assert len(history_syncers) == 0 diff --git a/apps/cic-eth/tests/unit/sync/test_syncer_scratch.py b/apps/cic-eth/tests/unit/sync/test_syncer_scratch.py deleted file mode 100644 index 2188fa83..00000000 --- a/apps/cic-eth/tests/unit/sync/test_syncer_scratch.py +++ /dev/null @@ -1,79 +0,0 @@ -# third-party imports -import pytest - -# local imports -from cic_eth.db.models.sync import BlockchainSync -from cic_eth.sync.backend import SyncerBackend - - -def test_scratch( - init_database, - ): - - with pytest.raises(ValueError): - s = SyncerBackend('Testchain:666', 13) - - syncer = SyncerBackend.live('Testchain:666', 13) - - s = SyncerBackend('Testchain:666', syncer.object_id) - - - -def test_live( - init_database, - ): - - s = SyncerBackend.live('Testchain:666', 13) - - s.connect() - assert s.db_object.target() == None - s.disconnect() - - assert s.get() == (13, 0) - - s.set(14, 1) - assert s.get() == (14, 1) - - -def test_resume( - init_database, - ): - - live = SyncerBackend.live('Testchain:666', 13) - live.set(13, 2) - - resumes = SyncerBackend.resume('Testchain:666', 26) - - assert len(resumes) == 1 - resume = resumes[0] - - assert resume.get() == (13, 2) - - resume.set(13, 4) - assert resume.get() == (13, 4) - assert resume.start() == (13, 2) - assert resume.target() == 26 - - -def test_unsynced( - init_database, - ): - - live = SyncerBackend.live('Testchain:666', 13) - live.set(13, 2) - - resumes = SyncerBackend.resume('Testchain:666', 26) - live = SyncerBackend.live('Testchain:666', 26) - resumes[0].set(18, 12) - - resumes = SyncerBackend.resume('Testchain:666', 42) - - assert len(resumes) == 2 - - assert resumes[0].start() == (13, 2) - assert resumes[0].get() == (18, 12) - assert resumes[0].target() == 26 - - assert resumes[1].start() == (26, 0) - assert resumes[1].get() == (26, 0) - assert resumes[1].target() == 42