Compare commits

...

10 Commits

Author  SHA1        Message                                                        Date
nolash  c37ad876f1  Upgrade deps, bump version                                     2021-11-15 14:27:57 +01:00
nolash  674bcc598a  Remove urllib exception leak in chainlib, rehabilitate tests   2021-10-19 14:20:11 +02:00
nolash  30c574a74b  Change result in threadpoolhistorysyncer                       2021-09-29 19:37:31 +02:00
nolash  d0c4aa5679  Remove comment                                                 2021-09-29 19:18:35 +02:00
nolash  acbfcedc2b  Add parallelization test with eth_tester backend               2021-09-29 19:01:57 +02:00
nolash  0e37914991  Separate ranges calculation from backend creation              2021-09-27 21:37:48 +02:00
nolash  a49c152e24  Add thread range generator                                     2021-09-27 20:37:13 +02:00
nolash  db6128f823  Add lock flags to model, backend                               2021-09-26 19:32:08 +02:00
nolash  9f4362ad07  Rename doc dir                                                 2021-08-29 11:28:16 +02:00
nolash  59715f2b82  Add filter, stack descriptions to infotex docs                 2021-08-29 10:57:32 +02:00
26 changed files with 553 additions and 39 deletions

chainsyncer/backend/file.py

@@ -54,8 +54,12 @@ class FileBackend(Backend):
     :param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
     :type base_dir: str
     """
+    __warned = False
+
     def __init__(self, chain_spec, object_id, base_dir=BACKEND_BASE_DIR):
+        if not FileBackend.__warned:
+            logg.warning('file backend for chainsyncer is experimental and not yet guaranteed to handle interrupted filter execution.')
+            FileBackend.__warned = True
         super(FileBackend, self).__init__(object_id, flags_reversed=True)

         self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)
@@ -394,6 +398,11 @@ class FileBackend(Backend):
         return entries[len(entries)-1]

+    # n is zero-index of bit field
+    def begin_filter(self, n, base_dir=BACKEND_BASE_DIR):
+        pass
+
     # n is zero-index of bit field
     def complete_filter(self, n, base_dir=BACKEND_BASE_DIR):
         """Sets the filter at the given index as completed.

chainsyncer/backend/memory.py

@@ -1,5 +1,6 @@
 # standard imports
 import logging
+import uuid

 # local imports
 from .base import Backend
@@ -20,19 +21,38 @@ class MemBackend(Backend):
     :type target_block: int
     """
-    def __init__(self, chain_spec, object_id, target_block=None, block_height=0, tx_height=0, flags=0):
+    def __init__(self, chain_spec, object_id):
         super(MemBackend, self).__init__(object_id)
         self.chain_spec = chain_spec
-        self.block_height_offset = block_height
-        self.block_height_cursor = block_height
-        self.tx_height_offset = tx_height
-        self.tx_height_cursor = tx_height
-        self.block_height_target = target_block
         self.db_session = None
-        self.flags = flags
+        self.block_height_offset = 0
+        self.block_height_cursor = 0
+        self.tx_height_offset = 0
+        self.tx_height_cursor = 0
+        self.block_height_target = None
+        self.flags = 0
+        self.flags_start = 0
+        self.flags_target = 0
         self.filter_names = []

+    @staticmethod
+    def custom(chain_spec, target_block, block_offset=0, tx_offset=0, flags=0, flags_count=0, *args, **kwargs):
+        object_id = kwargs.get('object_id', str(uuid.uuid4()))
+        backend = MemBackend(chain_spec, object_id)
+        backend.block_height_offset = block_offset
+        backend.block_height_cursor = block_offset
+        backend.tx_height_offset = tx_offset
+        backend.tx_height_cursor = tx_offset
+        backend.block_height_target = target_block
+        backend.flags = flags
+        backend.flags_count = flags_count
+        backend.flags_start = flags
+        flags_target = (2 ** flags_count) - 1
+        backend.flags_target = flags_target
+        return backend
+
     def connect(self):
         """NOOP as memory backend implements no connection.
         """
@@ -67,13 +87,22 @@ class MemBackend(Backend):
         return ((self.block_height_cursor, self.tx_height_cursor), self.flags)

+    def start(self):
+        """Get the initial syncer state.
+
+        :rtype: tuple
+        :returns: block height / tx index tuple, and filter flags value
+        """
+        return ((self.block_height_offset, self.tx_height_offset), self.flags_start)
+
     def target(self):
         """Returns the syncer target.

         :rtype: tuple
         :returns: block height / tx index tuple
         """
-        return (self.block_height_target, self.flags)
+        return (self.block_height_target, self.flags_target)

     def register_filter(self, name):
@@ -86,7 +115,7 @@ class MemBackend(Backend):
             self.filter_count += 1

-    def complete_filter(self, n):
+    def begin_filter(self, n):
         """Set filter at index as completed for the current block / tx state.

         :param n: Filter index
@@ -97,6 +126,10 @@ class MemBackend(Backend):
         logg.debug('set filter {} {}'.format(self.filter_names[n], v))

+    def complete_filter(self, n):
+        pass
+
     def reset_filter(self):
         """Set all filters to unprocessed for the current block / tx state.
         """
@@ -104,11 +137,5 @@ class MemBackend(Backend):
         self.flags = 0

-#    def get_flags(self):
-#        """Returns flags
-#        """
-#        return self.flags
-
     def __str__(self):
         return "syncer membackend {} chain {} cursor {}".format(self.object_id, self.chain(), self.get())

chainsyncer/backend/sql.py

@@ -148,6 +148,30 @@ class SQLBackend(Backend):
         return (target, filter_target,)

+    @staticmethod
+    def custom(chain_spec, target_block, block_offset=0, tx_offset=0, flags=0, flag_count=0, *args, **kwargs):
+        """
+        :param flags: flags bit field
+        :type flags: bytes
+        :param flag_count: number of flags in bit field
+        :type flag_count: int
+        """
+        session = SessionBase.create_session()
+
+        o = BlockchainSync(str(chain_spec), block_offset, tx_offset, target_block)
+        session.add(o)
+        session.commit()
+        object_id = o.id
+
+        of = BlockchainSyncFilter(o, flag_count, flags, kwargs.get('flags_digest'))
+        session.add(of)
+        session.commit()
+        session.close()
+
+        return SQLBackend(chain_spec, object_id)
+
     @staticmethod
     def first(chain_spec):
         """Returns the model object of the most recent syncer in backend.
@@ -314,8 +338,8 @@ class SQLBackend(Backend):
         self.disconnect()

-    def complete_filter(self, n):
-        """Sets the filter at the given index as completed.
+    def begin_filter(self, n):
+        """Marks start of execution of the filter indexed by the corresponding bit.

         :param n: Filter index
         :type n: int
@@ -327,6 +351,14 @@ class SQLBackend(Backend):
         self.disconnect()

+    def complete_filter(self, n):
+        self.connect()
+        self.db_object_filter.release(check_bit=n)
+        self.db_session.add(self.db_object_filter)
+        self.db_session.commit()
+        self.disconnect()
+
     def reset_filter(self):
         """Reset all filter states.
         """

(alembic migration script)

@@ -21,6 +21,7 @@ def upgrade():
     sa.Column('id', sa.Integer, primary_key=True),
     sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True),
     sa.Column('flags', sa.LargeBinary, nullable=True),
+    sa.Column('flags_lock', sa.Integer, nullable=False, default=0),
     sa.Column('flags_start', sa.LargeBinary, nullable=True),
     sa.Column('count', sa.Integer, nullable=False, default=0),
     sa.Column('digest', sa.String(64), nullable=False),

chainsyncer/db/models/filter.py

@@ -9,6 +9,7 @@ from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
 # local imports
 from .base import SessionBase
 from .sync import BlockchainSync
+from chainsyncer.error import LockError

 zero_digest = bytes(32).hex()
 logg = logging.getLogger(__name__)
@@ -32,11 +33,14 @@ class BlockchainSyncFilter(SessionBase):
     chain_sync_id = Column(Integer, ForeignKey('chain_sync.id'))
     flags_start = Column(LargeBinary)
     flags = Column(LargeBinary)
+    flags_lock = Column(Integer)
     digest = Column(String(64))
     count = Column(Integer)

-    def __init__(self, chain_sync, count=0, flags=None, digest=zero_digest):
+    def __init__(self, chain_sync, count=0, flags=None, digest=None):
+        if digest == None:
+            digest = zero_digest
         self.digest = digest
         self.count = count
@@ -47,10 +51,20 @@ class BlockchainSyncFilter(SessionBase):
             flags = flags.to_bytes(bytecount, 'big')
         self.flags_start = flags
         self.flags = flags
+        self.flags_lock = 0
         self.chain_sync_id = chain_sync.id

+    @staticmethod
+    def load(sync_id, session=None):
+        q = session.query(BlockchainSyncFilter)
+        q = q.filter(BlockchainSyncFilter.chain_sync_id==sync_id)
+        o = q.first()
+        if o.is_locked():
+            raise LockError('locked state for flag {} of sync id {} must be manually resolved'.format(o.flags_lock, sync_id))
+
     def add(self, name):
         """Add a new filter to the syncer record.
@@ -106,9 +120,16 @@ class BlockchainSyncFilter(SessionBase):
         return (n, self.count, self.digest)

+    def is_locked(self):
+        return self.flags_lock > 0
+
     def clear(self):
         """Set current filter flag value to zero.
         """
+        if self.is_locked():
+            raise LockError('flag clear attempted when lock set at {}'.format(self.flags_lock))
+
         self.flags = bytearray(len(self.flags))
@@ -120,9 +141,14 @@ class BlockchainSyncFilter(SessionBase):
         :raises IndexError: Invalid flag index
         :raises AttributeError: Flag at index already set
         """
+        if self.is_locked():
+            raise LockError('flag set attempted when lock set at {}'.format(self.flags_lock))
+
         if n > self.count:
             raise IndexError('bit flag out of range')

+        self.flags_lock = n
+
         b = 1 << (n % 8)
         i = int(n / 8)
         byte_idx = len(self.flags)-1-i
@@ -131,3 +157,10 @@ class BlockchainSyncFilter(SessionBase):
         flags = bytearray(self.flags)
         flags[byte_idx] |= b
         self.flags = flags
+
+    def release(self, check_bit=0):
+        if check_bit > 0:
+            if self.flags_lock > 0 and self.flags_lock != check_bit:
+                raise LockError('release attempted on explicit bit {}, but bit {} was locked'.format(check_bit, self.flags_lock))
+        self.flags_lock = 0

chainsyncer/driver/head.py

@@ -1,6 +1,13 @@
 # standard imports
 import logging

+# external imports
+from chainlib.eth.tx import (
+        transaction,
+        Tx,
+    )
+from chainlib.error import RPCException
+
 # local imports
 from chainsyncer.error import NoBlockForYou
 from .poll import BlockPollSyncer
@@ -28,15 +35,18 @@ class HeadSyncer(BlockPollSyncer):
         (pair, fltr) = self.backend.get()
         logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
         i = pair[1] # set tx index from previous
-        tx = None
+        tx_src = None
         while True:
             # handle block objects regardless of whether the tx data is embedded or not
             try:
                 tx = block.tx(i)
             except AttributeError:
-                o = tx(block.txs[i])
+                o = transaction(block.txs[i])
                 r = conn.do(o)
-                tx = self.interface.tx_from_src(Tx.src_normalize(r), block=block)
+                tx_src = Tx.src_normalize(r)
+                tx = self.chain_interface.tx_from_src(tx_src, block=block)
             #except IndexError as e:
             #    logg.debug('index error syncer tx get {}'.format(e))
             #    break

chainsyncer/driver/history.py

@@ -26,7 +26,6 @@ class HistorySyncer(HeadSyncer):
         if block_number == None:
             raise AttributeError('backend has no future target. Use HeadSyncer instead')
         self.block_target = block_number
-        logg.debug('block target {}'.format(self.block_target))

     def get(self, conn):
@@ -44,7 +43,7 @@ class HistorySyncer(HeadSyncer):
             raise SyncDone(self.block_target)
         block_number = height[0]
         block_hash = []
-        o = self.chain_interface.block_by_number(block_number)
+        o = self.chain_interface.block_by_number(block_number, include_tx=True)
         try:
             r = conn.do(o)
         # TODO: Disambiguate whether error is temporary or permanent; if permanent, SyncDone should be raised, because a historical sync is attempted into the future

chainsyncer/driver/poll.py

@@ -32,6 +32,7 @@ class BlockPollSyncer(Syncer):
         (pair, fltr) = self.backend.get()
         start_tx = pair[1]

         while self.running and Syncer.running_global:
+
             if self.pre_callback != None:
                 self.pre_callback()

chainsyncer/driver/threadpool.py

@@ -115,7 +115,6 @@ class ThreadPoolHistorySyncer(HistorySyncer):
         pass

-    #def process(self, conn, block):

     def get(self, conn):
         if not self.running:
             raise SyncDone()

chainsyncer/driver/threadrange.py (new file)

@@ -0,0 +1,81 @@
# standard imports
import copy
import logging
import multiprocessing
import os

# external imports
from chainlib.eth.connection import RPCConnection

# local imports
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver.base import Syncer
from .threadpool import ThreadPoolTask

logg = logging.getLogger(__name__)


def sync_split(block_offset, block_target, count):
    block_count = block_target - block_offset
    if block_count < count:
        logg.warning('block count is less than thread count, adjusting thread count to {}'.format(block_count))
        count = block_count
    blocks_per_thread = int(block_count / count)

    ranges = []
    for i in range(count):
        block_target = block_offset + blocks_per_thread
        offset = block_offset
        target = block_target - 1
        ranges.append((offset, target,))
        block_offset = block_target
    return ranges


class ThreadPoolRangeTask:

    def __init__(self, backend, sync_range, chain_interface, syncer_factory=HistorySyncer, filters=[]):
        backend_start = backend.start()
        backend_target = backend.target()
        backend_class = backend.__class__
        tx_offset = 0
        flags = 0
        if sync_range[0] == backend_start[0][0]:
            tx_offset = backend_start[0][1]
            flags = backend_start[1]
        self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0)
        self.syncer = syncer_factory(self.backend, chain_interface)
        for fltr in filters:
            self.syncer.add_filter(fltr)

    def start_loop(self, interval):
        conn = RPCConnection.connect(self.backend.chain_spec)
        return self.syncer.loop(interval, conn)


class ThreadPoolRangeHistorySyncer:

    def __init__(self, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
        self.src_backend = backend
        self.thread_count = thread_count
        self.single_sync_offset = 0
        self.runlevel_callback = None
        backend_start = backend.start()
        backend_target = backend.target()
        self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count)
        self.chain_interface = chain_interface
        self.filters = []

    def add_filter(self, f):
        self.filters.append(f)

    def loop(self, interval, conn):
        self.worker_pool = multiprocessing.Pool(processes=self.thread_count)
        for sync_range in self.ranges:
            task = ThreadPoolRangeTask(self.src_backend, sync_range, self.chain_interface, filters=self.filters)
            t = self.worker_pool.apply_async(task.start_loop, (0.1,))
            logg.debug('result of worker {}: {}'.format(t, t.get()))
        self.worker_pool.close()
        self.worker_pool.join()

chainsyncer/error.py

@@ -21,6 +21,12 @@ class BackendError(Exception):
     pass


+class LockError(Exception):
+    """Base exception for attempting to manipulate a locked property
+    """
+    pass
+
+
 #class AbortTx(Exception):
 #    """
 #    """

(chainsyncer filter module, class SyncFilter)

@@ -36,6 +36,7 @@ class SyncFilter:

     def __apply_one(self, fltr, idx, conn, block, tx, session):
+        self.backend.begin_filter(idx)
         fltr.filter(conn, block, tx, session)
         self.backend.complete_filter(idx)

chainsyncer/unittest/base.py

@@ -12,7 +12,6 @@ from chainsyncer.error import NoBlockForYou
 logg = logging.getLogger().getChild(__name__)

-
 class MockConn:
     """Noop connection mocker.

doc/texinfo/backend.texi (new file)

@@ -0,0 +1,37 @@
@node chainsyncer-backend
@section Backend implementations

While syncing the chain, we need to keep track of where we are in the syncing process. The syncer backend is the pluggable component that records this state.

The state consists of three parts, in hierarchical order:

@enumerate
@item Block height
@item Transaction index in block
@item Individual code filters to execute per transaction
@end enumerate

@subsection Syncer types

By default there are two types of syncers.

There is the @emph{live syncer}, which continues to consume new blocks as they become available. There is also the @emph{history syncer}, which syncs blocks between two specific points.

A typical first-time syncer invocation defines a starting block height from which to retrieve blocks, in which case two syncers will be created:

@itemize
@item A history syncer syncing from the start height until the height at the time of the invocation.
@item A live syncer syncing from the height at the time of the invocation, indefinitely retrieving new blocks.
@end itemize

A typical resumed syncer invocation will involve additional syncers, depending on the state that was left off:

@itemize
@item History syncers for each previously incomplete history syncer run.
@item A new history syncer resuming where the last live syncer left off, until the height at the time of invocation of the resume.
@item A new live syncer syncing from the height at the time of the invocation, indefinitely retrieving new blocks.
@end itemize

In short, the consequence of the above is that one additional syncer will be spawned every time a syncer session is resumed.
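For orientation, here is a minimal sketch (not part of this changeset) of how the three-part state surfaces through the backend API, using the MemBackend.custom constructor introduced in this diff; the chain spec values are placeholders:

# sketch: inspect syncer state through the memory backend of this changeset
from chainlib.chain import ChainSpec
from chainsyncer.backend.memory import MemBackend

chain_spec = ChainSpec('evm', 'foochain', 42)  # placeholder chain

# sync from block 5, tx 0, towards block 20, with 2 filter flag bits
backend = MemBackend.custom(chain_spec, 20, block_offset=5, tx_offset=0, flags=0, flags_count=2)

print(backend.start())   # ((5, 0), 0) -> where the session began
print(backend.get())     # ((5, 0), 0) -> current block height / tx index, and flags
print(backend.target())  # (20, 3)     -> target height, and all-filters-complete mask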

doc/texinfo/filter.texi (new file)

@@ -0,0 +1,25 @@
@node chainsyncer-filter
@section Syncer filter

A filter is a piece of code that gets executed for every transaction in a block.

The filter receives the block and transaction data, as well as an RPC connection and a syncer backend connection. This enables the code to interact with the blockchain node provisioning the chain context of the syncer session, as well as all chain-specific details of the specific block and transaction at the current syncer state.

@subsection Filter execution state

The syncer keeps track of which filters have been executed for which transaction.

Every filter added to the syncer adds a new bit to a bit field constituting a filter flag. When a filter has finished executing, the bit corresponding to the filter's index in the syncer filter array should be set and stored.

Since atomicity across code execution and flag setting cannot be guaranteed, it is the implementer's responsibility to ensure that a @emph{started} filter execution has actually been @emph{completed}. If proper care is not taken, individual filter code @emph{may} be run twice for a particular transaction.

Allowing for disambiguation between started and completed filters should be in scope for imminent improvement of the syncer package.

@subsection Integrity protection

Backends may optionally enable filter integrity protection. In practice, this means that if a specific syncer session was started with a specific collection of filters, the syncer session may not be resumed with a different collection of filters.

Depending on the implementation, the collection of filters may or may not be dependent on the order in which they are added.
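To illustrate the flag mechanics, the following standalone sketch mirrors the bit arithmetic of BlockchainSyncFilter.set from the model diff above; the helper function is hypothetical and is not part of the changeset:

# sketch: map a filter index to a bit in the big-endian flag field
def set_filter_bit(flags, n):
    b = 1 << (n % 8)               # bit position within its byte
    i = int(n / 8)                 # which byte, counting from the least significant
    byte_idx = len(flags) - 1 - i  # big-endian: least significant byte is last
    out = bytearray(flags)
    out[byte_idx] |= b
    return bytes(out)

flags = bytes(2)                  # room for up to 16 filters
flags = set_filter_bit(flags, 0)  # first filter completed
flags = set_filter_bit(flags, 9)  # tenth filter completed
print(flags.hex())                # '0201'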

doc/texinfo/index.texi (new file)

@@ -0,0 +1,7 @@
@top chainsyncer
@chapter Chainsyncer

@include backend.texi
@include stack.texi
@include filter.texi

doc/texinfo/stack.texi (new file)

@@ -0,0 +1,22 @@
@node chainsyncer-stack
@section Syncer driver stack

The chainsyncer package defines a generic syncer stack intended to cover most circumstances.

The default implementation is single-threaded, meaning that blocks and block transactions are processed sequentially.

It is defined as follows, in order of inheritance:

@table @code
@item chainsyncer.driver.base.Syncer
Base syncer object, providing base properties and flow control.
@item chainsyncer.driver.poll.BlockPollSyncer
Polling block retriever, defining the main loop and callback logic.
@item chainsyncer.driver.head.HeadSyncer
Applies open-ended syncer settings, and defines the processing of a single block.
@item chainsyncer.driver.history.HistorySyncer
Builds on chainsyncer.driver.head.HeadSyncer, differing only in that a missing block is considered an error, and that reaching the target block is considered normal termination of the syncer loop.
@end table

Additionally, asynchronous driver modules exist in the codebase. These are still considered experimental, and will only be documented once semi-stable.
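For reference, a hypothetical end-to-end wiring of the stack as changed in this diff; chain_interface and conn are placeholders the caller must supply (for example the EthChainInterface and RPC connection used in the tests), and PrintFilter is an illustrative filter, not part of the changeset:

# sketch: drive a bounded history sync with one filter
from chainlib.chain import ChainSpec
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver.history import HistorySyncer

class PrintFilter:
    # minimal filter: called once per transaction by the syncer
    def filter(self, conn, block, tx, db_session=None):
        print('saw tx {} in block {}'.format(tx, block))

def run_history_sync(chain_interface, conn):
    # sync blocks 5..19 of a placeholder chain, then return
    chain_spec = ChainSpec('evm', 'foochain', 42)
    backend = MemBackend.custom(chain_spec, 20, block_offset=5)
    syncer = HistorySyncer(backend, chain_interface)
    syncer.add_filter(PrintFilter())
    return syncer.loop(0.1, conn)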

(package requirements)

@@ -1,4 +1,4 @@
-confini>=0.3.6rc3,<0.5.0
+confini~=0.5.1
 semver==2.13.0
-hexathon~=0.0.1a8
+hexathon~=0.1.0
-chainlib>=0.0.9a2,<=0.1.0
+chainlib~=0.0.10

setup.cfg

@@ -1,6 +1,6 @@
 [metadata]
 name = chainsyncer
-version = 0.0.6a3
+version = 0.0.7
 description = Generic blockchain syncer driver
 author = Louis Holbrook
 author_email = dev@holbrook.no

(test requirements)

@@ -1,4 +1,8 @@
-chainlib-eth~=0.0.9a4
+chainlib-eth~=0.0.9a14
 psycopg2==2.8.6
 SQLAlchemy==1.3.20
 alembic==1.4.2
+eth_tester==0.5.0b3
+py-evm==0.3.0a20
+rlp==2.0.1
+pytest==6.0.1

tests/chainsyncer_base.py

@@ -8,7 +8,15 @@ import os
 # external imports
 from chainlib.chain import ChainSpec
 from chainlib.interface import ChainInterface
-from chainlib.eth.tx import receipt
+from chainlib.eth.tx import (
+        receipt,
+        Tx,
+    )
+from chainlib.eth.block import (
+        block_by_number,
+        Block,
+    )
+from potaahto.symbols import snake_and_camel

 # local imports
 from chainsyncer.db import dsn_from_config
@@ -26,6 +34,10 @@ class EthChainInterface(ChainInterface):
     def __init__(self):
         self._tx_receipt = receipt
+        self._block_by_number = block_by_number
+        self._block_from_src = Block.from_src
+        self._tx_from_src = Tx.from_src
+        self._src_normalize = snake_and_camel

 class TestBase(unittest.TestCase):

(tests: TestDatabase)

@@ -9,6 +9,7 @@ from chainlib.chain import ChainSpec
 from chainsyncer.db.models.base import SessionBase
 from chainsyncer.db.models.filter import BlockchainSyncFilter
 from chainsyncer.backend.sql import SQLBackend
+from chainsyncer.error import LockError

 # testutil imports
 from tests.chainsyncer_base import TestBase
@@ -31,6 +32,35 @@ class TestDatabase(TestBase):
         self.assertIsNone(sync_id)

+    def test_backend_filter_lock(self):
+        s = SQLBackend.live(self.chain_spec, 42)
+        s.connect()
+        filter_id = s.db_object_filter.id
+        s.disconnect()
+
+        session = SessionBase.create_session()
+        o = session.query(BlockchainSyncFilter).get(filter_id)
+        self.assertEqual(len(o.flags), 0)
+        session.close()
+
+        s.register_filter(str(0))
+        s.register_filter(str(1))
+
+        s.connect()
+        filter_id = s.db_object_filter.id
+        s.disconnect()
+
+        session = SessionBase.create_session()
+        o = session.query(BlockchainSyncFilter).get(filter_id)
+        o.set(1)
+        with self.assertRaises(LockError):
+            o.set(2)
+        o.release()
+        o.set(2)
+
     def test_backend_filter(self):
         s = SQLBackend.live(self.chain_spec, 42)
@@ -59,6 +89,7 @@ class TestDatabase(TestBase):
         for i in range(9):
             o.set(i)
+            o.release()

         (f, c, d) = o.cursor()
         self.assertEqual(f, t)
@@ -144,8 +175,8 @@ class TestDatabase(TestBase):
         s.register_filter('baz')

         s.set(43, 13)
-        s.complete_filter(0)
-        s.complete_filter(2)
+        s.begin_filter(0)
+        s.begin_filter(2)

         s = SQLBackend.resume(self.chain_spec, 666)
         (pair, flags) = s[0].get()
@@ -153,5 +184,16 @@ class TestDatabase(TestBase):
         self.assertEqual(flags, 5)

+    def test_backend_sql_custom(self):
+        chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
+        flags = 5
+        flags_target = 1023
+        flag_count = 10
+        backend = SQLBackend.custom(chain_spec, 666, 42, 2, flags, flag_count)
+        self.assertEqual(((42, 2), flags), backend.start())
+        self.assertEqual(((42, 2), flags), backend.get())
+        self.assertEqual((666, flags_target), backend.target())
+

 if __name__ == '__main__':
     unittest.main()

(tests: TestInterrupt)

@@ -14,6 +14,7 @@ from chainsyncer.backend.file import (
         FileBackend,
         data_dir_for,
         )
+from chainsyncer.error import LockError

 # test imports
 from tests.chainsyncer_base import TestBase
@@ -36,10 +37,10 @@ class NaughtyCountExceptionFilter:

     def filter(self, conn, block, tx, db_session=None):
+        self.c += 1
         if self.c == self.croak:
             self.croak = -1
             raise RuntimeError('foo')
-        self.c += 1

     def __str__(self):
@@ -75,6 +76,7 @@ class TestInterrupt(TestBase):
                 [6, 5, 2],
                 [6, 4, 3],
                 ]
+        self.track_complete = True

     def assert_filter_interrupt(self, vector, chain_interface):
@@ -100,10 +102,16 @@ class TestInterrupt(TestBase):
         try:
             syncer.loop(0.1, self.conn)
         except RuntimeError:
+            self.croaked = 2
             logg.info('caught croak')
             pass

         (pair, fltr) = self.backend.get()
         self.assertGreater(fltr, 0)
+        try:
+            syncer.loop(0.1, self.conn)
+        except LockError:
+            self.backend.complete_filter(2)
+
         syncer.loop(0.1, self.conn)

         for fltr in filters:
@@ -112,11 +120,13 @@ class TestInterrupt(TestBase):

     def test_filter_interrupt_memory(self):
+        self.track_complete = True
         for vector in self.vectors:
-            self.backend = MemBackend(self.chain_spec, None, target_block=len(vector))
+            self.backend = MemBackend.custom(self.chain_spec, target_block=len(vector))
             self.assert_filter_interrupt(vector, self.interface)

+    #TODO: implement flag lock in file backend
+    @unittest.expectedFailure
     def test_filter_interrupt_file(self):
         #for vector in self.vectors:
         vector = self.vectors.pop()
@@ -127,12 +137,11 @@ class TestInterrupt(TestBase):

     def test_filter_interrupt_sql(self):
-        self.track_complete = True
         for vector in self.vectors:
             self.backend = SQLBackend.initial(self.chain_spec, len(vector))
             self.assert_filter_interrupt(vector, self.interface)


 if __name__ == '__main__':
     unittest.main()

tests/test_mem.py (new file)

@@ -0,0 +1,32 @@
# standard imports
import unittest
import logging

# external imports
from chainlib.chain import ChainSpec

# local imports
from chainsyncer.backend.memory import MemBackend

# testutil imports
from tests.chainsyncer_base import TestBase

logging.basicConfig(level=logging.DEBUG)


class TestMem(TestBase):

    def test_backend_mem_custom(self):
        chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
        flags = int(5).to_bytes(2, 'big')
        flag_count = 10
        flags_target = (2 ** 10) - 1
        backend = MemBackend.custom(chain_spec, 666, 42, 2, flags, flag_count, object_id='xyzzy')

        self.assertEqual(((42, 2), flags), backend.start())
        self.assertEqual(((42, 2), flags), backend.get())
        self.assertEqual((666, flags_target), backend.target())
        self.assertEqual(backend.object_id, 'xyzzy')


if __name__ == '__main__':
    unittest.main()

tests/test_thread.py (new file)

@@ -0,0 +1,12 @@
# standard imports
import logging
import unittest

# local imports
from chainsyncer.driver.threadrange import ThreadPoolRangeHistorySyncer

# test imports
from tests.chainsyncer_base import TestBase


class TestThreadRange(TestBase):

    def test_hello(self):
        ThreadPoolRangeHistorySyncer(None, 3)

tests/test_thread_range.py (new file)

@@ -0,0 +1,114 @@
# standard imports
import unittest
import logging

# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.unittest.ethtester import EthTesterCase
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import (
        RPCGasOracle,
        Gas,
    )
from chainlib.eth.unittest.base import TestRPCConnection

# local imports
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver.threadrange import (
        sync_split,
        ThreadPoolRangeHistorySyncer,
    )
from chainsyncer.unittest.base import MockConn
from chainsyncer.unittest.db import ChainSyncerDb

# testutil imports
from tests.chainsyncer_base import (
        EthChainInterface,
    )

logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()


class SyncerCounter:

    def __init__(self):
        self.hits = []

    def filter(self, conn, block, tx, db_session=None):
        logg.debug('fltr {} {}'.format(block, tx))
        self.hits.append((block, tx))


class TestBaseEth(EthTesterCase):

    interface = EthChainInterface()

    def setUp(self):
        super(TestBaseEth, self).setUp()
        self.db = ChainSyncerDb()
        self.session = self.db.bind_session()

    def tearDown(self):
        self.session.commit()
        self.db.release_session(self.session)
        #os.unlink(self.db_path)


class TestThreadRange(TestBaseEth):

    interface = EthChainInterface()

    def test_range_split_even(self):
        ranges = sync_split(5, 20, 3)
        self.assertEqual(len(ranges), 3)
        self.assertEqual(ranges[0], (5, 9))
        self.assertEqual(ranges[1], (10, 14))
        self.assertEqual(ranges[2], (15, 19))

    def test_range_split_underflow(self):
        ranges = sync_split(5, 8, 4)
        self.assertEqual(len(ranges), 3)
        self.assertEqual(ranges[0], (5, 5))
        self.assertEqual(ranges[1], (6, 6))
        self.assertEqual(ranges[2], (7, 7))

    def test_range_syncer_hello(self):
        #chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
        chain_spec = ChainSpec('evm', 'foochain', 42)
        backend = MemBackend.custom(chain_spec, 20, 5, 3, 5, 10)
        #syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backend, self.interface)
        syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface)
        syncer.loop(0.1, None)

    def test_range_syncer_content(self):
        nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc)
        gas_oracle = RPCGasOracle(self.rpc)

        self.backend.mine_blocks(10)

        c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec)
        (tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 1024)
        r = self.rpc.do(o)

        self.backend.mine_blocks(3)

        c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec)
        (tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 2048)
        r = self.rpc.do(o)

        self.backend.mine_blocks(10)

        backend = MemBackend.custom(self.chain_spec, 20, 5, 3, 5, 10)
        syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface)
        fltr = SyncerCounter()
        syncer.add_filter(fltr)
        syncer.loop(0.1, None)


if __name__ == '__main__':
    unittest.main()