Remove legacy syncer code

This commit is contained in:
nolash 2021-02-22 21:53:36 +01:00
parent 01387993b5
commit 1f126037fe
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
12 changed files with 0 additions and 928 deletions

View File

@ -1 +0,0 @@
from .base import Syncer

View File

@ -1,201 +0,0 @@
# standard imports
import logging
# local imports
from cic_eth.db.models.sync import BlockchainSync
from cic_eth.db.models.base import SessionBase
logg = logging.getLogger()
class SyncerBackend:
"""Interface to block and transaction sync state.
:param chain_spec: Chain spec for the chain that syncer is running for.
:type chain_spec: cic_registry.chain.ChainSpec
:param object_id: Unique id for the syncer session.
:type object_id: number
"""
def __init__(self, chain_spec, object_id):
self.db_session = None
self.db_object = None
self.chain_spec = chain_spec
self.object_id = object_id
self.connect()
self.disconnect()
def connect(self):
"""Loads the state of the syncer session with the given id.
"""
if self.db_session == None:
self.db_session = SessionBase.create_session()
q = self.db_session.query(BlockchainSync)
q = q.filter(BlockchainSync.id==self.object_id)
self.db_object = q.first()
if self.db_object == None:
self.disconnect()
raise ValueError('sync entry with id {} not found'.format(self.object_id))
return self.db_session
def disconnect(self):
"""Commits state of sync to backend.
"""
if self.db_session != None:
self.db_session.add(self.db_object)
self.db_session.commit()
self.db_session.close()
self.db_session = None
def chain(self):
"""Returns chain spec for syncer
:returns: Chain spec
:rtype chain_spec: cic_registry.chain.ChainSpec
"""
return self.chain_spec
def get(self):
"""Get the current state of the syncer cursor.
:returns: Block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.cursor()
self.disconnect()
return pair
def set(self, block_height, tx_height):
"""Update the state of the syncer cursor
:param block_height: Block height of cursor
:type block_height: number
:param tx_height: Block transaction height of cursor
:type tx_height: number
:returns: Block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.set(block_height, tx_height)
self.disconnect()
return pair
def start(self):
"""Get the initial state of the syncer cursor.
:returns: Initial block and block transaction height, respectively
:rtype: tuple
"""
self.connect()
pair = self.db_object.start()
self.disconnect()
return pair
def target(self):
"""Get the target state (upper bound of sync) of the syncer cursor.
:returns: Target block height
:rtype: number
"""
self.connect()
target = self.db_object.target()
self.disconnect()
return target
@staticmethod
def first(chain):
"""Returns the model object of the most recent syncer in backend.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:returns: Last syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
return BlockchainSync.first(chain)
@staticmethod
def initial(chain, block_height):
"""Creates a new syncer session and commit its initial state to backend.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: New syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
object_id = None
session = SessionBase.create_session()
o = BlockchainSync(chain, 0, 0, block_height)
session.add(o)
session.commit()
object_id = o.id
session.close()
return SyncerBackend(chain, object_id)
@staticmethod
def resume(chain, block_height):
"""Retrieves and returns all previously unfinished syncer sessions.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: Syncer objects of unfinished syncs
:rtype: list of cic_eth.db.models.BlockchainSync
"""
syncers = []
session = SessionBase.create_session()
object_id = None
for object_id in BlockchainSync.get_unsynced(session=session):
logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id))
syncers.append(SyncerBackend(chain, object_id))
(block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session)
if block_height != block_resume:
o = BlockchainSync(chain, block_resume, tx_resume, block_height)
session.add(o)
session.commit()
object_id = o.id
syncers.append(SyncerBackend(chain, object_id))
logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height))
session.close()
return syncers
@staticmethod
def live(chain, block_height):
"""Creates a new open-ended syncer session starting at the given block height.
:param chain: Chain spec of chain that syncer is running for.
:type chain: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: "Live" syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
object_id = None
session = SessionBase.create_session()
o = BlockchainSync(chain, block_height, 0, None)
session.add(o)
session.commit()
object_id = o.id
session.close()
return SyncerBackend(chain, object_id)

View File

@ -1,51 +0,0 @@
# TODO: extend blocksync model
class Syncer:
"""Base class and interface for implementing a block sync poller routine.
:param bc_cache: Retrieves block cache cursors for chain head and latest processed block.
:type bc_cache: cic_eth.sync.SyncerBackend
"""
w3 = None
running_global = True
def __init__(self, bc_cache):
self.cursor = None
self.bc_cache = bc_cache
self.filter = []
self.running = True
def chain(self):
"""Returns the string representation of the chain spec for the chain the syncer is running on.
:returns: Chain spec string
:rtype: str
"""
return self.bc_cache.chain()
def get(self):
"""Get latest unprocessed blocks.
:returns: list of block hash strings
:rtype: list
"""
raise NotImplementedError()
def process(self, w3, ref):
"""Process transactions in a single block.
:param ref: Reference of object to process
:type ref: str, 0x-hex
"""
raise NotImplementedError()
def loop(self, interval):
"""Entry point for syncer loop
:param interval: Delay in seconds until next attempt if no new blocks are found.
:type interval: int
"""
raise NotImplementedError()

View File

@ -1,4 +0,0 @@
class LoopDone(Exception):
"""Exception raised when a syncing is complete.
"""
pass

View File

@ -1,51 +0,0 @@
# standard imports
import logging
# third-party imports
import web3
# local imports
from .mined import MinedSyncer
from .base import Syncer
logg = logging.getLogger()
class HeadSyncer(MinedSyncer):
"""Implements the get method in Syncer for retrieving every new mined block.
:param bc_cache: Retrieves block cache cursors for chain head and latest processed block.
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
"""
def __init__(self, bc_cache):
super(HeadSyncer, self).__init__(bc_cache)
# TODO: filter not returning all blocks, at least with ganache. kind of defeats the point, then
#self.w3_filter = rpc.w3.eth.filter({
# 'fromBlock': block_offset,
# }) #'latest')
#self.bc_cache.set(block_offset, 0)
logg.debug('initialized head syncer with offset {}'.format(bc_cache.start()))
"""Implements Syncer.get
:param w3: Web3 object
:type w3: web3.Web3
:returns: Block hash of newly mined blocks. if any
:rtype: list of str, 0x-hex
"""
def get(self, w3):
# Of course, the filter doesn't return the same block dict format as getBlock() so we'll just waste some cycles getting the hashes instead.
#hashes = []
#for block in self.w3_filter.get_new_entries():
# hashes.append(block['blockHash'])
#logg.debug('blocks {}'.format(hashes))
#return hashes
(block_number, tx_number) = self.bc_cache.get()
block_hash = []
try:
block = w3.eth.getBlock(block_number)
block_hash.append(block.hash)
except web3.exceptions.BlockNotFound:
pass
return block_hash

View File

@ -1,74 +0,0 @@
# standard imports
import logging
# third-party imports
from web3.exceptions import BlockNotFound
from .error import LoopDone
# local imports
from .mined import MinedSyncer
from .base import Syncer
from cic_eth.db.models.base import SessionBase
logg = logging.getLogger()
class HistorySyncer(MinedSyncer):
"""Implements the get method in Syncer for retrieving all blocks between last processed block before previous shutdown and block height at time of syncer start.
:param bc_cache: Retrieves block cache cursors for chain head and latest processed block.
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
:param mx: Maximum number of blocks to return in one call
:type mx: int
"""
def __init__(self, bc_cache, mx=500):
super(HistorySyncer, self).__init__(bc_cache)
self.max = mx
self.target = bc_cache.target()
logg.info('History syncer target block number {}'.format(self.target))
session_offset = self.bc_cache.get()
self.block_offset = session_offset[0]
self.tx_offset = session_offset[1]
logg.info('History syncer starting at {}:{}'.format(session_offset[0], session_offset[1]))
self.filter = []
"""Implements Syncer.get
BUG: Should also raise LoopDone when block array is empty after loop.
:param w3: Web3 object
:type w3: web3.Web3
:raises LoopDone: If a block is not found.
:return: Return a batch of blocks to process
:rtype: list of str, 0x-hex
"""
def get(self, w3):
sync_db = self.bc_cache
height = self.bc_cache.get()
logg.debug('height {}'.format(height))
block_last = height[0]
tx_last = height[1]
if not self.running:
raise LoopDone((block_last, tx_last))
b = []
block_target = block_last + self.max
if block_target > self.target:
block_target = self.target
logg.debug('target {} last {} max {}'.format(block_target, block_last, self.max))
for i in range(block_last, block_target):
if i == self.target:
logg.info('reached target {}, exiting'.format(i))
self.running = False
break
bhash = w3.eth.getBlock(i).hash
b.append(bhash)
logg.debug('appending block {} {}'.format(i, bhash.hex()))
if block_last == block_target:
logg.info('aleady reached target {}, exiting'.format(self.target))
self.running = False
return b

View File

@ -1,50 +0,0 @@
class MemPoolSyncer(Syncer):
def __init__(self, bc_cache):
raise NotImplementedError('incomplete, needs web3 tx to raw transaction conversion')
super(MemPoolSyncer, self).__init__(bc_cache)
# self.w3_filter = Syncer.w3.eth.filter('pending')
# for tx in tx_cache.txs:
# self.txs.append(tx)
# logg.debug('add tx {} to mempoolsyncer'.format(tx))
#
#
# def get(self):
# return self.w3_filter.get_new_entries()
#
#
# def process(self, tx_hash):
# tx_hash_hex = tx_hash.hex()
# if tx_hash_hex in self.txs:
# logg.debug('syncer already watching {}, skipping'.format(tx_hash_hex))
# tx = self.w3.eth.getTransaction(tx_hash_hex)
# serialized_tx = rlp.encode({
# 'nonce': tx.nonce,
# 'from': getattr(tx, 'from'),
# })
# logg.info('add {} to syncer: {}'.format(tx, serialized_tx))
# otx = Otx(
# nonce=tx.nonce,
# address=getattr(tx, 'from'),
# tx_hash=tx_hash_hex,
# signed_tx=serialized_tx,
# )
# Otx.session.add(otx)
# Otx.session.commit()
#
#
# def loop(self, interval):
# while Syncer.running:
# logg.debug('loop execute')
# txs = self.get()
# logg.debug('got txs {}'.format(txs))
# for tx in txs:
# #block_number = self.process(block.hex())
# self.process(tx)
# #if block_number > self.bc_cache.head():
# # self.bc_cache.head(block_number)
# time.sleep(interval)
# logg.info("Syncer no longer set to run, gracefully exiting")

View File

@ -1,109 +0,0 @@
# standard imports
import logging
import time
# third-party imports
import celery
# local impotes
from .base import Syncer
from cic_eth.queue.tx import set_final_status
from cic_eth.eth import RpcClient
app = celery.current_app
logg = logging.getLogger()
class MinedSyncer(Syncer):
"""Base implementation of block processor for mined blocks.
Loops through all transactions,
:param bc_cache: Retrieves block cache cursors for chain head and latest processed block.
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
"""
yield_delay = 0.005
def __init__(self, bc_cache):
super(MinedSyncer, self).__init__(bc_cache)
self.block_offset = 0
self.tx_offset = 0
def process(self, w3, ref):
"""Processes transactions in a single block, advancing transaction (and block) cursor accordingly.
:param w3: Web3 object
:type w3: web3.Web3
:param ref: Block reference (hash) to process
:type ref: str, 0x-hex
:returns: Block number of next unprocessed block
:rtype: number
"""
b = w3.eth.getBlock(ref)
c = w3.eth.getBlockTransactionCount(ref)
s = 0
if self.block_offset == b.number:
s = self.tx_offset
logg.debug('processing {} (blocknumber {}, count {}, offset {})'.format(ref, b.number, c, s))
for i in range(s, c):
tx = w3.eth.getTransactionByBlock(ref, i)
tx_hash_hex = tx['hash'].hex()
rcpt = w3.eth.getTransactionReceipt(tx_hash_hex)
logg.debug('{}/{} processing tx {} from block {} {}'.format(i+1, c, tx_hash_hex, b.number, ref))
ours = False
# TODO: ensure filter loop can complete on graceful shutdown
for f in self.filter:
#try:
session = self.bc_cache.connect()
task_uuid = f(w3, tx, rcpt, self.chain(), session)
#except Exception as e:
# logg.error('error in filter {} tx {}: {}'.format(f, tx_hash_hex, e))
# continue
if task_uuid != None:
logg.debug('tx {} passed to celery task {}'.format(tx_hash_hex, task_uuid))
s = celery.signature(
'set_final_status',
[tx_hash_hex, rcpt['blockNumber'], not rcpt['status']],
)
s.apply_async()
break
next_tx = i + 1
if next_tx == c:
self.bc_cache.set(b.number+1, 0)
else:
self.bc_cache.set(b.number, next_tx)
if c == 0:
logg.info('synced block {} has no transactions'.format(b.number))
#self.bc_cache.session(b.number+1, 0)
self.bc_cache.set(b.number+1, 0)
return b['number']
def loop(self, interval):
"""Loop running until the "running" property of Syncer is set to False.
Retrieves latest unprocessed blocks and processes them.
:param interval: Delay in seconds until next attempt if no new blocks are found.
:type interval: int
"""
while self.running and Syncer.running_global:
self.bc_cache.connect()
c = RpcClient(self.chain())
logg.debug('loop execute')
e = self.get(c.w3)
logg.debug('got blocks {}'.format(e))
for block in e:
block_number = self.process(c.w3, block.hex())
logg.info('processed block {} {}'.format(block_number, block.hex()))
self.bc_cache.disconnect()
if len(e) > 0:
time.sleep(self.yield_delay)
else:
time.sleep(interval)
logg.info("Syncer no longer set to run, gracefully exiting")

View File

@ -1,71 +0,0 @@
# standard imports
import logging
import datetime
import time
# third-party imports
import celery
# local imports
from .base import Syncer
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import StatusEnum
from cic_eth.queue.tx import get_status_tx
logg = logging.getLogger()
celery_app = celery.current_app
class noop_cache:
def __init__(self, chain_spec):
self.chain_spec = chain_spec
def chain(self):
return self.chain_spec
class RetrySyncer(Syncer):
def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None):
cache = noop_cache(chain_spec)
super(RetrySyncer, self).__init__(cache)
if failed_grace_seconds == None:
failed_grace_seconds = stalled_grace_seconds
self.stalled_grace_seconds = stalled_grace_seconds
self.failed_grace_seconds = failed_grace_seconds
self.final_func = final_func
def get(self, w3):
# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds)
# failed_txs = get_status_tx(
# StatusEnum.SENDFAIL.value,
# before=before,
# )
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx(
StatusEnum.SENT.value,
before=before,
)
# return list(failed_txs.keys()) + list(stalled_txs.keys())
return stalled_txs
def process(self, w3, ref):
logg.debug('tx {}'.format(ref))
for f in self.filter:
f(w3, ref, None, str(self.chain()))
def loop(self, interval):
chain_str = str(self.chain())
while self.running and Syncer.running_global:
c = RpcClient(self.chain())
for tx in self.get(c.w3):
self.process(c.w3, tx)
if self.final_func != None:
self.final_func(chain_str)
time.sleep(interval)

View File

@ -1,43 +0,0 @@
# standard imports
import logging
# local imports
from cic_eth.sync.head import HeadSyncer
from cic_eth.sync.backend import SyncerBackend
logg = logging.getLogger()
def test_head(
init_rpc,
init_database,
init_eth_tester,
mocker,
eth_empty_accounts,
):
#backend = SyncBackend(eth_empty_accounts[0], 'foo')
block_number = init_rpc.w3.eth.blockNumber
backend = SyncerBackend.live('foo:666', block_number)
syncer = HeadSyncer(backend)
#init_eth_tester.mine_block()
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[0],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_one = init_rpc.w3.eth.sendTransaction(tx)
block_number = init_rpc.w3.eth.blockNumber
backend.set(block_number, 0)
b = syncer.get(init_rpc.w3)
tx = init_rpc.w3.eth.getTransactionByBlock(b[0], 0)
assert tx.hash.hex() == tx_hash_one.hex()

View File

@ -1,194 +0,0 @@
# standard imports
import logging
# third-party imports
import pytest
from web3.exceptions import BlockNotFound
from cic_registry import CICRegistry
# local imports
from cic_eth.sync.history import HistorySyncer
from cic_eth.sync.head import HeadSyncer
#from cic_eth.sync import Syncer
from cic_eth.db.models.otx import OtxSync
from cic_eth.db.models.base import SessionBase
from cic_eth.sync.backend import SyncerBackend
logg = logging.getLogger()
class FinishedError(Exception):
pass
class DebugFilter:
def __init__(self, address):
self.txs = []
self.monitor_to_address = address
def filter(self, w3, tx, rcpt, chain_spec):
logg.debug('sync filter {}'.format(tx['hash'].hex()))
if tx['to'] == self.monitor_to_address:
self.txs.append(tx)
# hack workaround, latest block hash not found in eth_tester for some reason
if len(self.txs) == 2:
raise FinishedError('intentionally finished on tx {}'.format(tx))
def test_history(
init_rpc,
init_database,
init_eth_tester,
#celery_session_worker,
eth_empty_accounts,
):
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[0],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_one = init_rpc.w3.eth.sendTransaction(tx)
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[1],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_two = init_rpc.w3.eth.sendTransaction(tx)
init_eth_tester.mine_block()
block_number = init_rpc.w3.eth.blockNumber
live_syncer = SyncerBackend.live('foo:666', 0)
HeadSyncer(live_syncer)
history_syncers = SyncerBackend.resume('foo:666', block_number)
for history_syncer in history_syncers:
logg.info('history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target()))
backend = history_syncers[0]
syncer = HistorySyncer(backend)
fltr = DebugFilter(eth_empty_accounts[0])
syncer.filter.append(fltr.filter)
logg.debug('have txs {} {}'.format(tx_hash_one.hex(), tx_hash_two.hex()))
try:
syncer.loop(0.1)
except FinishedError:
pass
except BlockNotFound as e:
logg.error('the last block given in loop does not seem to exist :/ {}'.format(e))
check_hashes = []
for h in fltr.txs:
check_hashes.append(h['hash'].hex())
assert tx_hash_one.hex() in check_hashes
assert tx_hash_two.hex() in check_hashes
def test_history_multiple(
init_rpc,
init_database,
init_eth_tester,
#celery_session_worker,
eth_empty_accounts,
):
block_number = init_rpc.w3.eth.blockNumber
live_syncer = SyncerBackend.live('foo:666', block_number)
HeadSyncer(live_syncer)
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[0],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_one = init_rpc.w3.eth.sendTransaction(tx)
init_eth_tester.mine_block()
block_number = init_rpc.w3.eth.blockNumber
history_syncers = SyncerBackend.resume('foo:666', block_number)
for history_syncer in history_syncers:
logg.info('halfway history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target()))
live_syncer = SyncerBackend.live('foo:666', block_number)
HeadSyncer(live_syncer)
nonce = init_rpc.w3.eth.getTransactionCount(init_rpc.w3.eth.accounts[0], 'pending')
logg.debug('nonce {}'.format(nonce))
tx = {
'from': init_rpc.w3.eth.accounts[1],
'to': eth_empty_accounts[0],
'value': 404,
'gas': 21000,
'gasPrice': init_rpc.w3.eth.gasPrice,
'nonce': nonce,
}
tx_hash_two = init_rpc.w3.eth.sendTransaction(tx)
init_eth_tester.mine_block()
block_number = init_rpc.w3.eth.blockNumber
history_syncers = SyncerBackend.resume('foo:666', block_number)
live_syncer = SyncerBackend.live('foo:666', block_number)
HeadSyncer(live_syncer)
for history_syncer in history_syncers:
logg.info('history syncer start {} target {}'.format(history_syncer.start(), history_syncer.target()))
assert len(history_syncers) == 2
backend = history_syncers[0]
syncer = HistorySyncer(backend)
fltr = DebugFilter(eth_empty_accounts[0])
syncer.filter.append(fltr.filter)
try:
syncer.loop(0.1)
except FinishedError:
pass
except BlockNotFound as e:
logg.error('the last block given in loop does not seem to exist :/ {}'.format(e))
check_hashes = []
for h in fltr.txs:
check_hashes.append(h['hash'].hex())
assert tx_hash_one.hex() in check_hashes
backend = history_syncers[1]
syncer = HistorySyncer(backend)
fltr = DebugFilter(eth_empty_accounts[0])
syncer.filter.append(fltr.filter)
try:
syncer.loop(0.1)
except FinishedError:
pass
except BlockNotFound as e:
logg.error('the last block given in loop does not seem to exist :/ {}'.format(e))
check_hashes = []
for h in fltr.txs:
check_hashes.append(h['hash'].hex())
assert tx_hash_two.hex() in check_hashes
history_syncers = SyncerBackend.resume('foo:666', block_number)
assert len(history_syncers) == 0

View File

@ -1,79 +0,0 @@
# third-party imports
import pytest
# local imports
from cic_eth.db.models.sync import BlockchainSync
from cic_eth.sync.backend import SyncerBackend
def test_scratch(
init_database,
):
with pytest.raises(ValueError):
s = SyncerBackend('Testchain:666', 13)
syncer = SyncerBackend.live('Testchain:666', 13)
s = SyncerBackend('Testchain:666', syncer.object_id)
def test_live(
init_database,
):
s = SyncerBackend.live('Testchain:666', 13)
s.connect()
assert s.db_object.target() == None
s.disconnect()
assert s.get() == (13, 0)
s.set(14, 1)
assert s.get() == (14, 1)
def test_resume(
init_database,
):
live = SyncerBackend.live('Testchain:666', 13)
live.set(13, 2)
resumes = SyncerBackend.resume('Testchain:666', 26)
assert len(resumes) == 1
resume = resumes[0]
assert resume.get() == (13, 2)
resume.set(13, 4)
assert resume.get() == (13, 4)
assert resume.start() == (13, 2)
assert resume.target() == 26
def test_unsynced(
init_database,
):
live = SyncerBackend.live('Testchain:666', 13)
live.set(13, 2)
resumes = SyncerBackend.resume('Testchain:666', 26)
live = SyncerBackend.live('Testchain:666', 26)
resumes[0].set(18, 12)
resumes = SyncerBackend.resume('Testchain:666', 42)
assert len(resumes) == 2
assert resumes[0].start() == (13, 2)
assert resumes[0].get() == (18, 12)
assert resumes[0].target() == 26
assert resumes[1].start() == (26, 0)
assert resumes[1].get() == (26, 0)
assert resumes[1].target() == 42