Compare commits
10 Commits
6f699dcf86
...
c37ad876f1
Author | SHA1 | Date | |
---|---|---|---|
|
c37ad876f1 | ||
|
674bcc598a | ||
|
30c574a74b | ||
|
d0c4aa5679 | ||
|
acbfcedc2b | ||
|
0e37914991 | ||
|
a49c152e24 | ||
|
db6128f823 | ||
|
9f4362ad07 | ||
|
59715f2b82 |
@ -54,8 +54,12 @@ class FileBackend(Backend):
|
||||
:param base_dir: Base directory to use for generation. Default is value of BACKEND_BASE_DIR
|
||||
:type base_dir: str
|
||||
"""
|
||||
__warned = False
|
||||
|
||||
def __init__(self, chain_spec, object_id, base_dir=BACKEND_BASE_DIR):
|
||||
if not FileBackend.__warned:
|
||||
logg.warning('file backend for chainsyncer is experimental and not yet guaranteed to handle interrupted filter execution.')
|
||||
FileBackend.__warned = True
|
||||
super(FileBackend, self).__init__(object_id, flags_reversed=True)
|
||||
self.object_data_dir = data_dir_for(chain_spec, object_id, base_dir=base_dir)
|
||||
|
||||
@ -394,6 +398,11 @@ class FileBackend(Backend):
|
||||
return entries[len(entries)-1]
|
||||
|
||||
|
||||
# n is zero-index of bit field
|
||||
def begin_filter(self, n, base_dir=BACKEND_BASE_DIR):
|
||||
pass
|
||||
|
||||
|
||||
# n is zero-index of bit field
|
||||
def complete_filter(self, n, base_dir=BACKEND_BASE_DIR):
|
||||
"""Sets the filter at the given index as completed.
|
||||
|
@ -1,5 +1,6 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
# local imports
|
||||
from .base import Backend
|
||||
@ -20,19 +21,38 @@ class MemBackend(Backend):
|
||||
:type target_block: int
|
||||
"""
|
||||
|
||||
def __init__(self, chain_spec, object_id, target_block=None, block_height=0, tx_height=0, flags=0):
|
||||
def __init__(self, chain_spec, object_id):
|
||||
super(MemBackend, self).__init__(object_id)
|
||||
self.chain_spec = chain_spec
|
||||
self.block_height_offset = block_height
|
||||
self.block_height_cursor = block_height
|
||||
self.tx_height_offset = tx_height
|
||||
self.tx_height_cursor = tx_height
|
||||
self.block_height_target = target_block
|
||||
self.db_session = None
|
||||
self.flags = flags
|
||||
self.block_height_offset = 0
|
||||
self.block_height_cursor = 0
|
||||
self.tx_height_offset = 0
|
||||
self.tx_height_cursor = 0
|
||||
self.block_height_target = None
|
||||
self.flags = 0
|
||||
self.flags_start = 0
|
||||
self.flags_target = 0
|
||||
self.filter_names = []
|
||||
|
||||
|
||||
@staticmethod
|
||||
def custom(chain_spec, target_block, block_offset=0, tx_offset=0, flags=0, flags_count=0, *args, **kwargs):
|
||||
object_id = kwargs.get('object_id', str(uuid.uuid4()))
|
||||
backend = MemBackend(chain_spec, object_id)
|
||||
backend.block_height_offset = block_offset
|
||||
backend.block_height_cursor = block_offset
|
||||
backend.tx_height_offset = tx_offset
|
||||
backend.tx_height_cursor = tx_offset
|
||||
backend.block_height_target = target_block
|
||||
backend.flags = flags
|
||||
backend.flags_count = flags_count
|
||||
backend.flags_start = flags
|
||||
flags_target = (2 ** flags_count) - 1
|
||||
backend.flags_target = flags_target
|
||||
return backend
|
||||
|
||||
|
||||
def connect(self):
|
||||
"""NOOP as memory backend implements no connection.
|
||||
"""
|
||||
@ -67,13 +87,22 @@ class MemBackend(Backend):
|
||||
return ((self.block_height_cursor, self.tx_height_cursor), self.flags)
|
||||
|
||||
|
||||
def start(self):
|
||||
"""Get the initial syncer state
|
||||
|
||||
:rtype: tuple
|
||||
:returns: block height / tx index tuple, and filter flags value
|
||||
"""
|
||||
return ((self.block_height_offset, self.tx_height_offset), self.flags_start)
|
||||
|
||||
|
||||
def target(self):
|
||||
"""Returns the syncer target.
|
||||
|
||||
:rtype: tuple
|
||||
:returns: block height / tx index tuple
|
||||
"""
|
||||
return (self.block_height_target, self.flags)
|
||||
return (self.block_height_target, self.flags_target)
|
||||
|
||||
|
||||
def register_filter(self, name):
|
||||
@ -86,7 +115,7 @@ class MemBackend(Backend):
|
||||
self.filter_count += 1
|
||||
|
||||
|
||||
def complete_filter(self, n):
|
||||
def begin_filter(self, n):
|
||||
"""Set filter at index as completed for the current block / tx state.
|
||||
|
||||
:param n: Filter index
|
||||
@ -97,6 +126,10 @@ class MemBackend(Backend):
|
||||
logg.debug('set filter {} {}'.format(self.filter_names[n], v))
|
||||
|
||||
|
||||
def complete_filter(self, n):
|
||||
pass
|
||||
|
||||
|
||||
def reset_filter(self):
|
||||
"""Set all filters to unprocessed for the current block / tx state.
|
||||
"""
|
||||
@ -104,11 +137,5 @@ class MemBackend(Backend):
|
||||
self.flags = 0
|
||||
|
||||
|
||||
# def get_flags(self):
|
||||
# """Returns flags
|
||||
# """
|
||||
# return self.flags
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return "syncer membackend {} chain {} cursor {}".format(self.object_id, self.chain(), self.get())
|
||||
|
@ -148,6 +148,30 @@ class SQLBackend(Backend):
|
||||
return (target, filter_target,)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def custom(chain_spec, target_block, block_offset=0, tx_offset=0, flags=0, flag_count=0, *args, **kwargs):
|
||||
"""
|
||||
|
||||
:param flags: flags bit field
|
||||
:type flags: bytes
|
||||
:param flag_count: number of flags in bit field
|
||||
:type flag_count:
|
||||
"""
|
||||
session = SessionBase.create_session()
|
||||
o = BlockchainSync(str(chain_spec), block_offset, tx_offset, target_block)
|
||||
session.add(o)
|
||||
session.commit()
|
||||
object_id = o.id
|
||||
|
||||
of = BlockchainSyncFilter(o, flag_count, flags, kwargs.get('flags_digest'))
|
||||
session.add(of)
|
||||
session.commit()
|
||||
|
||||
session.close()
|
||||
|
||||
return SQLBackend(chain_spec, object_id)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def first(chain_spec):
|
||||
"""Returns the model object of the most recent syncer in backend.
|
||||
@ -314,8 +338,8 @@ class SQLBackend(Backend):
|
||||
self.disconnect()
|
||||
|
||||
|
||||
def complete_filter(self, n):
|
||||
"""Sets the filter at the given index as completed.
|
||||
def begin_filter(self, n):
|
||||
"""Marks start of execution of the filter indexed by the corresponding bit.
|
||||
|
||||
:param n: Filter index
|
||||
:type n: int
|
||||
@ -327,6 +351,14 @@ class SQLBackend(Backend):
|
||||
self.disconnect()
|
||||
|
||||
|
||||
def complete_filter(self, n):
|
||||
self.connect()
|
||||
self.db_object_filter.release(check_bit=n)
|
||||
self.db_session.add(self.db_object_filter)
|
||||
self.db_session.commit()
|
||||
self.disconnect()
|
||||
|
||||
|
||||
def reset_filter(self):
|
||||
"""Reset all filter states.
|
||||
"""
|
||||
|
@ -21,6 +21,7 @@ def upgrade():
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True),
|
||||
sa.Column('flags', sa.LargeBinary, nullable=True),
|
||||
sa.Column('flags_lock', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('flags_start', sa.LargeBinary, nullable=True),
|
||||
sa.Column('count', sa.Integer, nullable=False, default=0),
|
||||
sa.Column('digest', sa.String(64), nullable=False),
|
||||
|
@ -9,6 +9,7 @@ from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
|
||||
# local imports
|
||||
from .base import SessionBase
|
||||
from .sync import BlockchainSync
|
||||
from chainsyncer.error import LockError
|
||||
|
||||
zero_digest = bytes(32).hex()
|
||||
logg = logging.getLogger(__name__)
|
||||
@ -32,11 +33,14 @@ class BlockchainSyncFilter(SessionBase):
|
||||
chain_sync_id = Column(Integer, ForeignKey('chain_sync.id'))
|
||||
flags_start = Column(LargeBinary)
|
||||
flags = Column(LargeBinary)
|
||||
flags_lock = Column(Integer)
|
||||
digest = Column(String(64))
|
||||
count = Column(Integer)
|
||||
|
||||
|
||||
def __init__(self, chain_sync, count=0, flags=None, digest=zero_digest):
|
||||
def __init__(self, chain_sync, count=0, flags=None, digest=None):
|
||||
if digest == None:
|
||||
digest = zero_digest
|
||||
self.digest = digest
|
||||
self.count = count
|
||||
|
||||
@ -47,10 +51,20 @@ class BlockchainSyncFilter(SessionBase):
|
||||
flags = flags.to_bytes(bytecount, 'big')
|
||||
self.flags_start = flags
|
||||
self.flags = flags
|
||||
self.flags_lock = 0
|
||||
|
||||
self.chain_sync_id = chain_sync.id
|
||||
|
||||
|
||||
@staticmethod
|
||||
def load(sync_id, session=None):
|
||||
q = session.query(BlockchainSyncFilter)
|
||||
q = q.filter(BlockchainSyncFilter.chain_sync_id==sync_id)
|
||||
o = q.first()
|
||||
if o.is_locked():
|
||||
raise LockError('locked state for flag {} of sync id {} must be manually resolved'.format(o.flags_lock))
|
||||
|
||||
|
||||
def add(self, name):
|
||||
"""Add a new filter to the syncer record.
|
||||
|
||||
@ -106,9 +120,16 @@ class BlockchainSyncFilter(SessionBase):
|
||||
return (n, self.count, self.digest)
|
||||
|
||||
|
||||
def is_locked(self):
|
||||
return self.flags_lock > 0
|
||||
|
||||
|
||||
def clear(self):
|
||||
"""Set current filter flag value to zero.
|
||||
"""
|
||||
if self.is_locked():
|
||||
raise LockError('flag clear attempted when lock set at {}'.format(self.flags_lock))
|
||||
|
||||
self.flags = bytearray(len(self.flags))
|
||||
|
||||
|
||||
@ -120,9 +141,14 @@ class BlockchainSyncFilter(SessionBase):
|
||||
:raises IndexError: Invalid flag index
|
||||
:raises AttributeError: Flag at index already set
|
||||
"""
|
||||
if self.is_locked():
|
||||
raise LockError('flag set attempted when lock set at {}'.format(self.flags_lock))
|
||||
|
||||
if n > self.count:
|
||||
raise IndexError('bit flag out of range')
|
||||
|
||||
self.flags_lock = n
|
||||
|
||||
b = 1 << (n % 8)
|
||||
i = int(n / 8)
|
||||
byte_idx = len(self.flags)-1-i
|
||||
@ -131,3 +157,10 @@ class BlockchainSyncFilter(SessionBase):
|
||||
flags = bytearray(self.flags)
|
||||
flags[byte_idx] |= b
|
||||
self.flags = flags
|
||||
|
||||
|
||||
def release(self, check_bit=0):
|
||||
if check_bit > 0:
|
||||
if self.flags_lock > 0 and self.flags_lock != check_bit:
|
||||
raise LockError('release attemped on explicit bit {}, but bit {} was locked'.format(check_bit, self.flags_lock))
|
||||
self.flags_lock = 0
|
||||
|
@ -1,6 +1,13 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from chainlib.eth.tx import (
|
||||
transaction,
|
||||
Tx,
|
||||
)
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from chainsyncer.error import NoBlockForYou
|
||||
from .poll import BlockPollSyncer
|
||||
@ -28,15 +35,18 @@ class HeadSyncer(BlockPollSyncer):
|
||||
(pair, fltr) = self.backend.get()
|
||||
logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
|
||||
i = pair[1] # set tx index from previous
|
||||
tx = None
|
||||
tx_src = None
|
||||
while True:
|
||||
# handle block objects regardless of whether the tx data is embedded or not
|
||||
try:
|
||||
tx = block.tx(i)
|
||||
except AttributeError:
|
||||
o = tx(block.txs[i])
|
||||
o = transaction(block.txs[i])
|
||||
r = conn.do(o)
|
||||
tx = self.interface.tx_from_src(Tx.src_normalize(r), block=block)
|
||||
tx_src = Tx.src_normalize(r)
|
||||
tx = self.chain_interface.tx_from_src(tx_src, block=block)
|
||||
|
||||
|
||||
#except IndexError as e:
|
||||
# logg.debug('index error syncer tx get {}'.format(e))
|
||||
# break
|
||||
|
@ -26,7 +26,6 @@ class HistorySyncer(HeadSyncer):
|
||||
if block_number == None:
|
||||
raise AttributeError('backend has no future target. Use HeadSyner instead')
|
||||
self.block_target = block_number
|
||||
logg.debug('block target {}'.format(self.block_target))
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
@ -44,7 +43,7 @@ class HistorySyncer(HeadSyncer):
|
||||
raise SyncDone(self.block_target)
|
||||
block_number = height[0]
|
||||
block_hash = []
|
||||
o = self.chain_interface.block_by_number(block_number)
|
||||
o = self.chain_interface.block_by_number(block_number, include_tx=True)
|
||||
try:
|
||||
r = conn.do(o)
|
||||
# TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future
|
||||
|
@ -32,6 +32,7 @@ class BlockPollSyncer(Syncer):
|
||||
(pair, fltr) = self.backend.get()
|
||||
start_tx = pair[1]
|
||||
|
||||
|
||||
while self.running and Syncer.running_global:
|
||||
if self.pre_callback != None:
|
||||
self.pre_callback()
|
||||
|
@ -115,7 +115,6 @@ class ThreadPoolHistorySyncer(HistorySyncer):
|
||||
pass
|
||||
|
||||
|
||||
#def process(self, conn, block):
|
||||
def get(self, conn):
|
||||
if not self.running:
|
||||
raise SyncDone()
|
||||
|
81
chainsyncer/driver/threadrange.py
Normal file
81
chainsyncer/driver/threadrange.py
Normal file
@ -0,0 +1,81 @@
|
||||
# standard imports
|
||||
import copy
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
|
||||
# external iports
|
||||
from chainlib.eth.connection import RPCConnection
|
||||
# local imports
|
||||
from chainsyncer.driver.history import HistorySyncer
|
||||
from chainsyncer.driver.base import Syncer
|
||||
from .threadpool import ThreadPoolTask
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def sync_split(block_offset, block_target, count):
|
||||
block_count = block_target - block_offset
|
||||
if block_count < count:
|
||||
logg.warning('block count is less than thread count, adjusting thread count to {}'.format(block_count))
|
||||
count = block_count
|
||||
blocks_per_thread = int(block_count / count)
|
||||
|
||||
ranges = []
|
||||
for i in range(count):
|
||||
block_target = block_offset + blocks_per_thread
|
||||
offset = block_offset
|
||||
target = block_target -1
|
||||
ranges.append((offset, target,))
|
||||
block_offset = block_target
|
||||
return ranges
|
||||
|
||||
|
||||
class ThreadPoolRangeTask:
|
||||
|
||||
def __init__(self, backend, sync_range, chain_interface, syncer_factory=HistorySyncer, filters=[]):
|
||||
backend_start = backend.start()
|
||||
backend_target = backend.target()
|
||||
backend_class = backend.__class__
|
||||
tx_offset = 0
|
||||
flags = 0
|
||||
if sync_range[0] == backend_start[0][0]:
|
||||
tx_offset = backend_start[0][1]
|
||||
flags = backend_start[1]
|
||||
self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0)
|
||||
self.syncer = syncer_factory(self.backend, chain_interface)
|
||||
for fltr in filters:
|
||||
self.syncer.add_filter(fltr)
|
||||
|
||||
def start_loop(self, interval):
|
||||
conn = RPCConnection.connect(self.backend.chain_spec)
|
||||
return self.syncer.loop(interval, conn)
|
||||
|
||||
|
||||
class ThreadPoolRangeHistorySyncer:
|
||||
|
||||
def __init__(self, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
|
||||
self.src_backend = backend
|
||||
self.thread_count = thread_count
|
||||
self.single_sync_offset = 0
|
||||
self.runlevel_callback = None
|
||||
backend_start = backend.start()
|
||||
backend_target = backend.target()
|
||||
self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count)
|
||||
self.chain_interface = chain_interface
|
||||
self.filters = []
|
||||
|
||||
|
||||
def add_filter(self, f):
|
||||
self.filters.append(f)
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
self.worker_pool = multiprocessing.Pool(processes=self.thread_count)
|
||||
|
||||
for sync_range in self.ranges:
|
||||
task = ThreadPoolRangeTask(self.src_backend, sync_range, self.chain_interface, filters=self.filters)
|
||||
t = self.worker_pool.apply_async(task.start_loop, (0.1,))
|
||||
logg.debug('result of worker {}: {}'.format(t, t.get()))
|
||||
self.worker_pool.close()
|
||||
self.worker_pool.join()
|
@ -21,6 +21,12 @@ class BackendError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class LockError(Exception):
|
||||
"""Base exception for attempting to manipulate a locked property
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
#class AbortTx(Exception):
|
||||
# """
|
||||
# """
|
||||
|
@ -36,6 +36,7 @@ class SyncFilter:
|
||||
|
||||
|
||||
def __apply_one(self, fltr, idx, conn, block, tx, session):
|
||||
self.backend.begin_filter(idx)
|
||||
fltr.filter(conn, block, tx, session)
|
||||
self.backend.complete_filter(idx)
|
||||
|
||||
|
@ -12,7 +12,6 @@ from chainsyncer.error import NoBlockForYou
|
||||
logg = logging.getLogger().getChild(__name__)
|
||||
|
||||
|
||||
|
||||
class MockConn:
|
||||
"""Noop connection mocker.
|
||||
|
||||
|
37
doc/texinfo/backend.texi
Normal file
37
doc/texinfo/backend.texi
Normal file
@ -0,0 +1,37 @@
|
||||
@node chainsyncer-backend
|
||||
@section Backend implementations
|
||||
|
||||
While syncing the chain, we need to keep track of where we are in the syncing process. The syncer backend is the pluggable component that records this state.
|
||||
|
||||
The state consists of three parts, in hierarchical order:
|
||||
|
||||
@enumerate
|
||||
@item Block height
|
||||
@item Transaction index in block
|
||||
@item Individual code filters to execute per transaction
|
||||
@end enumerate
|
||||
|
||||
@subsection Syncer types
|
||||
|
||||
By default there are two types of syncers.
|
||||
|
||||
There is the @emph{live syncer} which continues to consume new blocks as they are available.
|
||||
|
||||
There is also the @emph{history syncer} which syncs blocks between two specific points.
|
||||
|
||||
A typical first-time syncer invocation defines a starting block height from which to retrieve blocks, in which case two syncers will be created;
|
||||
|
||||
@itemize
|
||||
@item A history syncer syncing from the start height until the height at the time of the invocation.
|
||||
@item A live syncer syncing from the height at the time of the invocation, indefinitely retrieveing new blocks.
|
||||
@end itemize
|
||||
|
||||
A typical resumed syncer invocation will involve additional syncers, depending on the state that was left off:
|
||||
|
||||
@itemize
|
||||
@item History syncers for each previously incomplete history syncer run.
|
||||
@item A new history syncer resuming where the last live syncer left off, until the height at the time of invocation of the resume.
|
||||
@item A new live syncer syncing from the height at the time of the invocation, indefinitely retrieving new blocks.
|
||||
@end itemize
|
||||
|
||||
In short, the consequence for the above is that one additional syncer will be spawned every time a syncer session is resumed.
|
25
doc/texinfo/filter.texi
Normal file
25
doc/texinfo/filter.texi
Normal file
@ -0,0 +1,25 @@
|
||||
@node chainsyncer-filter
|
||||
@section Syncer filter
|
||||
|
||||
A filter is a piece of code that gets executed for every transaction in a block.
|
||||
|
||||
The filter will receive the block and transaction data, aswell as an RPC and syncer backend connection. This enables the code interact with the blockchain node provisioning the chain context of the syncer session, aswell as all chain-specific details of the specific block and transaction at the current syncer state.
|
||||
|
||||
|
||||
@subsection Filter execution state
|
||||
|
||||
The syncer keeps track of which filters have been executed for which transaction.
|
||||
|
||||
Every filter added to the syncer adds a new bit to a bit field constituting a filter flag. When a filter has finished executing, the bit corresponding to the filter's index in the syncer filter array should be set and stored.
|
||||
|
||||
Since atomicity across code execution and flag setting cannot be guaranteed, it is the implementer's responsibility to ensure that a @emph{started} filter execution actually has been @emph{completed}. If proper care is not taken, individual filter code @emph{may} be run twice for a particular transaction.
|
||||
|
||||
Allowing for disambiguation between started and completed filters should be in scope for imminent improvement of the syncer package.
|
||||
|
||||
|
||||
@subsection Integrity protection
|
||||
|
||||
Backends may optionally enable filter integrity protection. In practice this means that if a specific syncer session was started with a specific collection of filters, the syncer session may not be resumed with a different collection of filters.
|
||||
|
||||
Depending on the implementation, the collection of filters may or may not be dependent on the order in which they are added.
|
||||
|
7
doc/texinfo/index.texi
Normal file
7
doc/texinfo/index.texi
Normal file
@ -0,0 +1,7 @@
|
||||
@top chainsyncer
|
||||
|
||||
@chapter Chainsyncer
|
||||
|
||||
@include backend.texi
|
||||
@include stack.texi
|
||||
@include filter.texi
|
22
doc/texinfo/stack.texi
Normal file
22
doc/texinfo/stack.texi
Normal file
@ -0,0 +1,22 @@
|
||||
@node chainsyncer-stack
|
||||
@section Syncer driver stack
|
||||
|
||||
The chainsyncer package defines a generic syncer stack intended to cover most circumstances.
|
||||
|
||||
The default implementation is single-threaded. This means that block and block transactions will be processed sequentially.
|
||||
|
||||
It is defined as follows, in order of inheritance:
|
||||
|
||||
@table @code
|
||||
@item chainsyncer.driver.base.Syncer
|
||||
Base syncer object, providing base properties and flow control
|
||||
@item chainsyncer.driver.poll.BlockPollSyncer
|
||||
Polling block retriever, defining the main loop and callback logic
|
||||
@item chainsyncer.driver.head.HeadSyncer
|
||||
Applied open-ended syncer settings, and defines the processing of a single block
|
||||
@item chainsyncer.driver.history.HistorySyncer
|
||||
Builds on chainsyncer.driver.head.HeadSyncer, and differs only in the fact that block not found is considered an error, and reaching the target block is considered a normal termination of the syncer loop.
|
||||
@end table
|
||||
|
||||
Additionally, asynchronous driver modules exist in the codebase. These are still considered as experimental, and will only be documented once considered semi-stable.
|
||||
|
@ -1,4 +1,4 @@
|
||||
confini>=0.3.6rc3,<0.5.0
|
||||
confini~=0.5.1
|
||||
semver==2.13.0
|
||||
hexathon~=0.0.1a8
|
||||
chainlib>=0.0.9a2,<=0.1.0
|
||||
hexathon~=0.1.0
|
||||
chainlib~=0.0.10
|
||||
|
@ -1,6 +1,6 @@
|
||||
[metadata]
|
||||
name = chainsyncer
|
||||
version = 0.0.6a3
|
||||
version = 0.0.7
|
||||
description = Generic blockchain syncer driver
|
||||
author = Louis Holbrook
|
||||
author_email = dev@holbrook.no
|
||||
|
@ -1,4 +1,8 @@
|
||||
chainlib-eth~=0.0.9a4
|
||||
chainlib-eth~=0.0.9a14
|
||||
psycopg2==2.8.6
|
||||
SQLAlchemy==1.3.20
|
||||
alembic==1.4.2
|
||||
eth_tester==0.5.0b3
|
||||
py-evm==0.3.0a20
|
||||
rlp==2.0.1
|
||||
pytest==6.0.1
|
||||
|
@ -8,7 +8,15 @@ import os
|
||||
# external imports
|
||||
from chainlib.chain import ChainSpec
|
||||
from chainlib.interface import ChainInterface
|
||||
from chainlib.eth.tx import receipt
|
||||
from chainlib.eth.tx import (
|
||||
receipt,
|
||||
Tx,
|
||||
)
|
||||
from chainlib.eth.block import (
|
||||
block_by_number,
|
||||
Block,
|
||||
)
|
||||
from potaahto.symbols import snake_and_camel
|
||||
|
||||
# local imports
|
||||
from chainsyncer.db import dsn_from_config
|
||||
@ -26,6 +34,10 @@ class EthChainInterface(ChainInterface):
|
||||
|
||||
def __init__(self):
|
||||
self._tx_receipt = receipt
|
||||
self._block_by_number = block_by_number
|
||||
self._block_from_src = Block.from_src
|
||||
self._tx_from_src = Tx.from_src
|
||||
self._src_normalize = snake_and_camel
|
||||
|
||||
|
||||
class TestBase(unittest.TestCase):
|
||||
|
@ -9,6 +9,7 @@ from chainlib.chain import ChainSpec
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
from chainsyncer.db.models.filter import BlockchainSyncFilter
|
||||
from chainsyncer.backend.sql import SQLBackend
|
||||
from chainsyncer.error import LockError
|
||||
|
||||
# testutil imports
|
||||
from tests.chainsyncer_base import TestBase
|
||||
@ -31,6 +32,35 @@ class TestDatabase(TestBase):
|
||||
self.assertIsNone(sync_id)
|
||||
|
||||
|
||||
def test_backend_filter_lock(self):
|
||||
s = SQLBackend.live(self.chain_spec, 42)
|
||||
|
||||
s.connect()
|
||||
filter_id = s.db_object_filter.id
|
||||
s.disconnect()
|
||||
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(BlockchainSyncFilter).get(filter_id)
|
||||
self.assertEqual(len(o.flags), 0)
|
||||
session.close()
|
||||
|
||||
s.register_filter(str(0))
|
||||
s.register_filter(str(1))
|
||||
|
||||
s.connect()
|
||||
filter_id = s.db_object_filter.id
|
||||
s.disconnect()
|
||||
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(BlockchainSyncFilter).get(filter_id)
|
||||
|
||||
o.set(1)
|
||||
with self.assertRaises(LockError):
|
||||
o.set(2)
|
||||
o.release()
|
||||
o.set(2)
|
||||
|
||||
|
||||
def test_backend_filter(self):
|
||||
s = SQLBackend.live(self.chain_spec, 42)
|
||||
|
||||
@ -59,6 +89,7 @@ class TestDatabase(TestBase):
|
||||
|
||||
for i in range(9):
|
||||
o.set(i)
|
||||
o.release()
|
||||
|
||||
(f, c, d) = o.cursor()
|
||||
self.assertEqual(f, t)
|
||||
@ -144,8 +175,8 @@ class TestDatabase(TestBase):
|
||||
s.register_filter('baz')
|
||||
|
||||
s.set(43, 13)
|
||||
s.complete_filter(0)
|
||||
s.complete_filter(2)
|
||||
s.begin_filter(0)
|
||||
s.begin_filter(2)
|
||||
|
||||
s = SQLBackend.resume(self.chain_spec, 666)
|
||||
(pair, flags) = s[0].get()
|
||||
@ -153,5 +184,16 @@ class TestDatabase(TestBase):
|
||||
self.assertEqual(flags, 5)
|
||||
|
||||
|
||||
def test_backend_sql_custom(self):
|
||||
chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
|
||||
flags = 5
|
||||
flags_target = 1023
|
||||
flag_count = 10
|
||||
backend = SQLBackend.custom(chain_spec, 666, 42, 2, flags, flag_count)
|
||||
self.assertEqual(((42, 2), flags), backend.start())
|
||||
self.assertEqual(((42, 2), flags), backend.get())
|
||||
self.assertEqual((666, flags_target), backend.target())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -14,6 +14,7 @@ from chainsyncer.backend.file import (
|
||||
FileBackend,
|
||||
data_dir_for,
|
||||
)
|
||||
from chainsyncer.error import LockError
|
||||
|
||||
# test imports
|
||||
from tests.chainsyncer_base import TestBase
|
||||
@ -36,10 +37,10 @@ class NaughtyCountExceptionFilter:
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
self.c += 1
|
||||
if self.c == self.croak:
|
||||
self.croak = -1
|
||||
raise RuntimeError('foo')
|
||||
self.c += 1
|
||||
|
||||
|
||||
def __str__(self):
|
||||
@ -75,6 +76,7 @@ class TestInterrupt(TestBase):
|
||||
[6, 5, 2],
|
||||
[6, 4, 3],
|
||||
]
|
||||
self.track_complete = True
|
||||
|
||||
|
||||
def assert_filter_interrupt(self, vector, chain_interface):
|
||||
@ -100,11 +102,17 @@ class TestInterrupt(TestBase):
|
||||
try:
|
||||
syncer.loop(0.1, self.conn)
|
||||
except RuntimeError:
|
||||
self.croaked = 2
|
||||
logg.info('caught croak')
|
||||
pass
|
||||
(pair, fltr) = self.backend.get()
|
||||
self.assertGreater(fltr, 0)
|
||||
syncer.loop(0.1, self.conn)
|
||||
|
||||
try:
|
||||
syncer.loop(0.1, self.conn)
|
||||
except LockError:
|
||||
self.backend.complete_filter(2)
|
||||
syncer.loop(0.1, self.conn)
|
||||
|
||||
for fltr in filters:
|
||||
logg.debug('{} {}'.format(str(fltr), fltr.c))
|
||||
@ -112,11 +120,13 @@ class TestInterrupt(TestBase):
|
||||
|
||||
|
||||
def test_filter_interrupt_memory(self):
|
||||
self.track_complete = True
|
||||
for vector in self.vectors:
|
||||
self.backend = MemBackend(self.chain_spec, None, target_block=len(vector))
|
||||
self.backend = MemBackend.custom(self.chain_spec, target_block=len(vector))
|
||||
self.assert_filter_interrupt(vector, self.interface)
|
||||
|
||||
|
||||
#TODO: implement flag lock in file backend
|
||||
@unittest.expectedFailure
|
||||
def test_filter_interrupt_file(self):
|
||||
#for vector in self.vectors:
|
||||
vector = self.vectors.pop()
|
||||
@ -127,12 +137,11 @@ class TestInterrupt(TestBase):
|
||||
|
||||
|
||||
def test_filter_interrupt_sql(self):
|
||||
self.track_complete = True
|
||||
for vector in self.vectors:
|
||||
self.backend = SQLBackend.initial(self.chain_spec, len(vector))
|
||||
self.assert_filter_interrupt(vector, self.interface)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
32
tests/test_mem.py
Normal file
32
tests/test_mem.py
Normal file
@ -0,0 +1,32 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from chainlib.chain import ChainSpec
|
||||
|
||||
# local imports
|
||||
from chainsyncer.backend.memory import MemBackend
|
||||
|
||||
# testutil imports
|
||||
from tests.chainsyncer_base import TestBase
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
|
||||
class TestMem(TestBase):
|
||||
|
||||
def test_backend_mem_custom(self):
|
||||
chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
|
||||
flags = int(5).to_bytes(2, 'big')
|
||||
flag_count = 10
|
||||
flags_target = (2 ** 10) - 1
|
||||
backend = MemBackend.custom(chain_spec, 666, 42, 2, flags, flag_count, object_id='xyzzy')
|
||||
self.assertEqual(((42, 2), flags), backend.start())
|
||||
self.assertEqual(((42, 2), flags), backend.get())
|
||||
self.assertEqual((666, flags_target), backend.target())
|
||||
self.assertEqual(backend.object_id, 'xyzzy')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
12
tests/test_thread.py
Normal file
12
tests/test_thread.py
Normal file
@ -0,0 +1,12 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import unittest
|
||||
|
||||
# test imports
|
||||
from tests.chainsyncer_base import TestBase
|
||||
|
||||
|
||||
class TestThreadRange(TestBase):
|
||||
|
||||
def test_hello(self):
|
||||
ThreadPoolRangeHistorySyncer(None, 3)
|
114
tests/test_thread_range.py
Normal file
114
tests/test_thread_range.py
Normal file
@ -0,0 +1,114 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from chainlib.chain import ChainSpec
|
||||
from chainlib.eth.unittest.ethtester import EthTesterCase
|
||||
from chainlib.eth.nonce import RPCNonceOracle
|
||||
from chainlib.eth.gas import (
|
||||
RPCGasOracle,
|
||||
Gas,
|
||||
)
|
||||
from chainlib.eth.unittest.base import TestRPCConnection
|
||||
|
||||
# local imports
|
||||
from chainsyncer.backend.memory import MemBackend
|
||||
from chainsyncer.driver.threadrange import (
|
||||
sync_split,
|
||||
ThreadPoolRangeHistorySyncer,
|
||||
)
|
||||
from chainsyncer.unittest.base import MockConn
|
||||
from chainsyncer.unittest.db import ChainSyncerDb
|
||||
|
||||
# testutil imports
|
||||
from tests.chainsyncer_base import (
|
||||
EthChainInterface,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class SyncerCounter:
|
||||
|
||||
def __init__(self):
|
||||
self.hits = []
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
logg.debug('fltr {} {}'.format(block, tx))
|
||||
self.hits.append((block, tx))
|
||||
|
||||
|
||||
class TestBaseEth(EthTesterCase):
|
||||
|
||||
interface = EthChainInterface()
|
||||
|
||||
def setUp(self):
|
||||
super(TestBaseEth, self).setUp()
|
||||
self.db = ChainSyncerDb()
|
||||
self.session = self.db.bind_session()
|
||||
|
||||
def tearDown(self):
|
||||
self.session.commit()
|
||||
self.db.release_session(self.session)
|
||||
#os.unlink(self.db_path)
|
||||
|
||||
|
||||
class TestThreadRange(TestBaseEth):
|
||||
|
||||
interface = EthChainInterface()
|
||||
|
||||
def test_range_split_even(self):
|
||||
ranges = sync_split(5, 20, 3)
|
||||
self.assertEqual(len(ranges), 3)
|
||||
self.assertEqual(ranges[0], (5, 9))
|
||||
self.assertEqual(ranges[1], (10, 14))
|
||||
self.assertEqual(ranges[2], (15, 19))
|
||||
|
||||
|
||||
def test_range_split_underflow(self):
|
||||
ranges = sync_split(5, 8, 4)
|
||||
self.assertEqual(len(ranges), 3)
|
||||
self.assertEqual(ranges[0], (5, 5))
|
||||
self.assertEqual(ranges[1], (6, 6))
|
||||
self.assertEqual(ranges[2], (7, 7))
|
||||
|
||||
|
||||
def test_range_syncer_hello(self):
|
||||
#chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo')
|
||||
chain_spec = ChainSpec('evm', 'foochain', 42)
|
||||
backend = MemBackend.custom(chain_spec, 20, 5, 3, 5, 10)
|
||||
#syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backend, self.interface)
|
||||
syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface)
|
||||
syncer.loop(0.1, None)
|
||||
|
||||
|
||||
def test_range_syncer_content(self):
|
||||
nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc)
|
||||
gas_oracle = RPCGasOracle(self.rpc)
|
||||
|
||||
self.backend.mine_blocks(10)
|
||||
|
||||
c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec)
|
||||
(tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 1024)
|
||||
r = self.rpc.do(o)
|
||||
|
||||
self.backend.mine_blocks(3)
|
||||
|
||||
c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec)
|
||||
(tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 2048)
|
||||
r = self.rpc.do(o)
|
||||
|
||||
self.backend.mine_blocks(10)
|
||||
|
||||
backend = MemBackend.custom(self.chain_spec, 20, 5, 3, 5, 10)
|
||||
syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface)
|
||||
fltr = SyncerCounter()
|
||||
syncer.add_filter(fltr)
|
||||
syncer.loop(0.1, None)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user