Compare commits


No commits in common. "master" and "2ba87de195a8e5facb6cd32be246075e4b55fcd9" have entirely different histories.

44 changed files with 986 additions and 2096 deletions

View File

@@ -1,19 +1,4 @@
-* 0.3.7
-	- Remove hard eth dependency in settings rendering
-	- Add unlock cli tool
-* 0.3.6
-	- Add cli arg processing and settings renderer
-* 0.3.5
-	- Allow memory-only shep if factory set to None in store constructor
-* 0.3.4
-	- Use explicit bool check in filter interrupt check
-* 0.3.3
-	- Include shep persistent state bootstrap sync
-	- Add chainsyncer extras
-* 0.3.2
-	- Implement rocksdb backend
-* 0.3.1
-	- Upgrade to release shep version
-	- Move sync state to SYNC after start
-* 0.3.0
-	- Re-implement chainsyncer on shep
+- 0.3.0
+	* Re-implement filter state handling on shep
+- 0.1.0
+	* Add deferred idle handling

View File

@@ -1 +1 @@
-include *requirements.txt LICENSE.txt chainsyncer/data/config/*
+include *requirements.txt LICENSE.txt chainsyncer/db/migrations/default/* chainsyncer/db/migrations/default/versions/* chainsyncer/db/migrations/default/versions/src/*

View File

@@ -1,12 +0,0 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

View File

@@ -1,14 +0,0 @@
# local imports
from .base import SyncFlag
def process_flags(argparser, flags):
if flags & SyncFlag.RANGE > 0:
argparser.add_argument('--offset', type=int, help='Block to start sync from. Default is start of history (0).')
argparser.add_argument('--until', type=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
if flags & SyncFlag.HEAD > 0:
argparser.add_argument('--head', action='store_true', help='Start from latest block as offset')
argparser.add_argument('--keep-alive', action='store_true', help='Do not stop syncing when caught up')
argparser.add_argument('--backend', type=str, help='Backend to use for state store')
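For orientation, a minimal sketch of how this deleted helper was driven. A plain argparse parser stands in for chainlib.cli.ArgumentParser (which exposes the same add_argument method), and the argument values are invented:

```python
import argparse

# Master-side imports as laid out in this changeset
from chainsyncer.cli import SyncFlag, process_flags

argparser = argparse.ArgumentParser()
process_flags(argparser, SyncFlag.RANGE | SyncFlag.HEAD)

# --offset/--until come from the RANGE flag, --head/--keep-alive from HEAD
args = argparser.parse_args(['--offset', '42', '--head', '--backend', 'fs'])
assert args.offset == 42
assert args.head
```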

View File

@@ -1,7 +0,0 @@
# standard imports
import enum
class SyncFlag(enum.IntEnum):
RANGE = 1
HEAD = 2

View File

@@ -1,20 +0,0 @@
# external imports
from chainsyncer.cli import SyncFlag
def process_config(config, args, flags):
args_override = {}
args_override['SYNCER_BACKEND'] = getattr(args, 'backend')
if flags & SyncFlag.RANGE:
args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
args_override['SYNCER_LIMIT'] = getattr(args, 'until')
config.dict_override(args_override, 'local cli args')
if flags & SyncFlag.HEAD:
config.add(getattr(args, 'keep_alive'), '_KEEP_ALIVE')
config.add(getattr(args, 'head'), '_HEAD')
return config

View File

@@ -1,4 +0,0 @@
[syncer]
offset = 0
limit = 0
backend = mem

View File

@@ -1 +1 @@
-from .base import SyncDriver
+from .base import Syncer

View File

@@ -1,21 +1,48 @@
 # standard imports
+import uuid
 import logging
 import time
 import signal
+import json
+
+# external imports
+from chainlib.error import JSONRPCException
 
 # local imports
+from chainsyncer.filter import SyncFilter
 from chainsyncer.error import (
         SyncDone,
         NoBlockForYou,
         )
-from chainsyncer.session import SyncSession
 
 logg = logging.getLogger(__name__)
 
-NS_DIV = 1000000000
-
-class SyncDriver:
+
+def noop_callback(block, tx):
+    """Logger-only callback for pre- and post processing.
+
+    :param block: Block object
+    :type block: chainlib.block.Block
+    :param tx: Transaction object
+    :type tx: chainlib.tx.Tx
+    """
+    logg.debug('noop callback ({},{})'.format(block, tx))
+
+
+class Syncer:
+    """Base class for syncer implementations.
+
+    :param backend: Syncer state backend
+    :type backend: chainsyncer.backend.base.Backend implementation
+    :param chain_interface: Chain interface implementation
+    :type chain_interface: chainlib.interface.ChainInterface implementation
+    :param pre_callback: Function to call before polling. Function will receive no arguments.
+    :type pre_callback: function
+    :param block_callback: Function to call before processing txs in a retrieved block. Function should have the same signature as chainsyncer.driver.base.noop_callback.
+    :type block_callback: function
+    :param post_callback: Function to call after polling. Function will receive no arguments.
+    :type post_callback: function
+    """
 
     running_global = True
     """If set to false syncer will terminate polling loop."""
@@ -28,22 +55,19 @@ class SyncDriver:
     name = 'base'
     """Syncer name, to be overridden for each extended implementation."""
 
-    def __init__(self, store, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
-        self.store = store
+    def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
+        self.chain_interface = chain_interface
+        self.cursor = None
         self.running = True
+        self.backend = backend
+        self.filter = SyncFilter(backend)
+        self.block_callback = block_callback
        self.pre_callback = pre_callback
        self.post_callback = post_callback
-        self.block_callback = block_callback
-        self.idle_callback = idle_callback
-        self.last_start = 0
-        self.clock_id = time.CLOCK_MONOTONIC_RAW
-        self.store.connect()
-        self.store.start(offset=offset, target=target)
-        if not SyncDriver.signal_set:
-            for sig in SyncDriver.signal_request:
+        if not Syncer.signal_set:
+            for sig in Syncer.signal_request:
                 signal.signal(sig, self.__sig_terminate)
-            SyncDriver.signal_set = True
+            Syncer.signal_set = True
 
     def __sig_terminate(self, sig, frame):
@@ -55,90 +79,48 @@ class SyncDriver:
         """Set syncer to terminate as soon as possible.
         """
         logg.info('termination requested!')
-        SyncDriver.running_global = False
+        Syncer.running_global = False
         self.running = False
 
-    def run(self, conn, interval=1):
-        while self.running_global:
-            self.session = SyncSession(self.store)
-            item = self.session.start()
-            if item == None:
-                self.running = False
-                self.running_global = False
-                break
-            self.loop(conn, item, interval=interval)
-
-    def idle(self, interval):
-        interval *= NS_DIV
-        idle_start = time.clock_gettime_ns(self.clock_id)
-        delta = idle_start - self.last_start
-        if delta > interval:
-            interval /= NS_DIV
-            time.sleep(interval)
-            return
-        if self.idle_callback != None:
-            r = True
-            while r:
-                before = time.clock_gettime_ns(self.clock_id)
-                r = self.idle_callback(interval)
-                after = time.clock_gettime_ns(self.clock_id)
-                delta = after - before
-                if delta < 0:
-                    return
-                interval -= delta
-                if interval < 0:
-                    return
-                interval /= NS_DIV
-                time.sleep(interval)
-
-    def loop(self, conn, item, interval=1):
-        logg.debug('started loop')
-        while self.running and SyncDriver.running_global:
-            self.last_start = time.clock_gettime_ns(self.clock_id)
-            if self.pre_callback != None:
-                self.pre_callback()
-            while True and self.running:
-                try:
-                    block = self.get(conn, item)
-                except SyncDone as e:
-                    logg.info('all blocks submitted for processing: {}'.format(e))
-                    return
-                except NoBlockForYou as e:
-                    break
-                if self.block_callback != None:
-                    self.block_callback(block, None)
-                try:
-                    self.process(conn, item, block)
-                except IndexError:
-                    item.next(advance_block=True)
-                time.sleep(self.yield_delay)
-                if self.store.target > -1 and block.number >= self.store.target:
-                    self.running = False
-            if self.post_callback != None:
-                self.post_callback()
-            self.idle(interval)
+    def add_filter(self, f):
+        """Add filter to be processed for each transaction.
+
+        :param f: Filter
+        :type f: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter
+        """
+        self.filter.add(f)
+        self.backend.register_filter(str(f))
 
     def process_single(self, conn, block, tx):
-        self.session.filter(conn, block, tx)
+        """Set syncer backend cursor to the given transaction index and block height, and apply all registered filters on the transaction.
+
+        :param conn: RPC connection instance
+        :type conn: chainlib.connection.RPCConnection
+        :param block: Block object
+        :type block: chainlib.block.Block
+        :param tx: Transaction object
+        :type tx: chainlib.tx.Tx
+        """
+        self.backend.set(block.number, tx.index)
+        self.filter.apply(conn, block, tx)
 
-    def process(self, conn, item, block):
+    def loop(self, interval, conn):
+        raise NotImplementedError()
+
+    def process(self, conn, block):
         raise NotImplementedError()
 
     def get(self, conn):
         raise NotImplementedError()
+
+    def __str__(self):
+        return 'syncer "{}" {}'.format(
+                self.name,
+                self.backend,
+            )
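A hedged usage sketch of the callback contract documented above, wiring the head syncer defined later in this changeset. The backend, chain_interface and conn objects are assumed constructed elsewhere; import paths follow the module layout shown here:

```python
from chainsyncer.driver.head import HeadSyncer
from chainsyncer.filter import NoopFilter

def pre_callback():
    print('about to poll')

def block_callback(block, tx):
    # same signature as noop_callback; tx is None when invoked per block
    print('fetched {}'.format(block))

def post_callback():
    print('poll done')

# backend, chain_interface and conn are assumed constructed elsewhere
syncer = HeadSyncer(backend, chain_interface,
        pre_callback=pre_callback,
        block_callback=block_callback,
        post_callback=post_callback)
syncer.add_filter(NoopFilter())
syncer.loop(1, conn)  # poll every second until terminated
```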

View File

@@ -1,57 +0,0 @@
# external imports
from chainlib.error import RPCException
# local imports
from chainsyncer.error import NoBlockForYou
from chainsyncer.driver import SyncDriver
class ChainInterfaceDriver(SyncDriver):
def __init__(self, store, chain_interface, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
super(ChainInterfaceDriver, self).__init__(store, offset=offset, target=target, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_callback, idle_callback=idle_callback)
self.chain_interface = chain_interface
def get(self, conn, item):
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:raises NoBlockForYou: Block at the given height does not exist
:rtype: chainlib.block.Block
:returns: Block object
"""
o = self.chain_interface.block_by_number(item.cursor)
try:
r = conn.do(o)
except RPCException:
r = None
if r == None:
raise NoBlockForYou()
b = self.chain_interface.block_from_src(r)
b.txs = b.txs[item.tx_cursor:]
return b
def process(self, conn, item, block):
tx_src = None
i = item.tx_cursor
while True:
# handle block objects regardless of whether the tx data is embedded or not
try:
tx = block.tx(i)
except AttributeError:
tx_hash = block.txs[i]
o = self.chain_interface.tx_by_hash(tx_hash, block=block)
r = conn.do(o)
#tx = self.chain_interface.tx_from_src(tx_src, block=block)
rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash))
if rcpt != None:
tx.apply_receipt(self.chain_interface.src_normalize(rcpt))
self.process_single(conn, block, tx)
i += 1
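As a usage sketch of this deleted driver (its import path, and the store, chain_interface and conn objects, are assumptions not shown in the changeset):

```python
# store, chain_interface and conn are assumed constructed elsewhere;
# the module path of ChainInterfaceDriver is likewise an assumption.
drv = ChainInterfaceDriver(store, chain_interface, offset=0, target=-1)
drv.run(conn, interval=1)  # run() and the polling loop come from SyncDriver
```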

View File

@@ -0,0 +1,86 @@
# standard imports
import logging
# external imports
from chainlib.eth.tx import (
transaction,
Tx,
)
from chainlib.error import RPCException
# local imports
from chainsyncer.error import NoBlockForYou
from .poll import BlockPollSyncer
logg = logging.getLogger(__name__)
class HeadSyncer(BlockPollSyncer):
"""Extends the block poller, implementing an open-ended syncer.
"""
name = 'head'
def process(self, conn, block):
"""Process a single block using the given RPC connection.
Processing means that all filters are executed on all transactions in the block.
If the block object does not contain the transaction details, the details will be retrieved from the network (incurring the corresponding performance penalty).
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:param block: Block object
:type block: chainlib.block.Block
"""
(pair, fltr) = self.backend.get()
logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
i = pair[1] # set tx index from previous
tx_src = None
while True:
# handle block objects regardless of whether the tx data is embedded or not
try:
tx = block.tx(i)
except AttributeError:
o = transaction(block.txs[i])
r = conn.do(o)
tx_src = Tx.src_normalize(r)
tx = self.chain_interface.tx_from_src(tx_src, block=block)
#except IndexError as e:
# logg.debug('index error syncer tx get {}'.format(e))
# break
rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash))
if rcpt != None:
tx.apply_receipt(self.chain_interface.src_normalize(rcpt))
self.process_single(conn, block, tx)
self.backend.reset_filter()
i += 1
def get(self, conn):
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:raises NoBlockForYou: Block at the given height does not exist
:rtype: chainlib.block.Block
:returns: Block object
"""
(height, flags) = self.backend.get()
block_number = height[0]
block_hash = []
o = self.chain_interface.block_by_number(block_number)
try:
r = conn.do(o)
except RPCException:
r = None
if r == None:
raise NoBlockForYou()
b = self.chain_interface.block_from_src(r)
b.txs = b.txs[height[1]:]
return b

View File

@@ -0,0 +1,56 @@
# standard imports
import logging
# external imports
from chainlib.error import RPCException
# local imports
from .head import HeadSyncer
from chainsyncer.error import SyncDone
logg = logging.getLogger(__name__)
class HistorySyncer(HeadSyncer):
"""Bounded syncer implementation of the block poller. Reuses the head syncer process method implementation.
"""
name = 'history'
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
super(HeadSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
self.block_target = None
(block_number, flags) = self.backend.target()
if block_number == None:
raise AttributeError('backend has no future target. Use HeadSyncer instead')
self.block_target = block_number
def get(self, conn):
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:raises SyncDone: Block target reached (at which point the syncer should terminate).
:rtype: chainlib.block.Block
:returns: Block object
:todo: DRY against HeadSyncer
"""
(height, flags) = self.backend.get()
if self.block_target < height[0]:
raise SyncDone(self.block_target)
block_number = height[0]
block_hash = []
o = self.chain_interface.block_by_number(block_number, include_tx=True)
try:
r = conn.do(o)
# TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future
except RPCException:
r = None
if r == None:
raise SyncDone()
b = self.chain_interface.block_from_src(r)
return b
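A hedged sketch of the bounded flow this class implements, using only methods shown in this changeset; backend, chain_interface and conn are assumed given:

```python
from chainsyncer.error import SyncDone

syncer = HistorySyncer(backend, chain_interface)
try:
    while True:
        block = syncer.get(conn)     # raises SyncDone once past the target
        syncer.process(conn, block)  # process() is inherited from HeadSyncer
except SyncDone as e:
    print('history sync reached target {}'.format(e))
```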

View File

@@ -0,0 +1,99 @@
# standard imports
import logging
import time
# local imports
from .base import Syncer
from chainsyncer.error import (
SyncDone,
NoBlockForYou,
)
logg = logging.getLogger(__name__)
NS_DIV = 1000000000
class BlockPollSyncer(Syncer):
"""Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling.
"""
name = 'blockpoll'
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, idle_callback=None):
super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback=pre_callback, block_callback=block_callback, post_callback=post_callback)
self.idle_callback = idle_callback
self.last_start = 0
self.clock_id = time.CLOCK_MONOTONIC_RAW
def idle(self, interval):
interval *= NS_DIV
idle_start = time.clock_gettime_ns(self.clock_id)
delta = idle_start - self.last_start
if delta > interval:
interval /= NS_DIV
time.sleep(interval)
return
if self.idle_callback != None:
r = True
while r:
before = time.clock_gettime_ns(self.clock_id)
r = self.idle_callback(interval)
after = time.clock_gettime_ns(self.clock_id)
delta = after - before
if delta < 0:
return
interval -= delta
if interval < 0:
return
interval /= NS_DIV
time.sleep(interval)
def loop(self, interval, conn):
"""Indefinite loop polling the given RPC connection for new blocks in the given interval.
:param interval: Seconds to wait for next poll after processing of previous poll has been completed.
:type interval: int
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:rtype: tuple
:returns: See chainsyncer.backend.base.Backend.get
"""
(pair, fltr) = self.backend.get()
start_tx = pair[1]
while self.running and Syncer.running_global:
self.last_start = time.clock_gettime_ns(self.clock_id)
if self.pre_callback != None:
self.pre_callback()
while True and self.running:
if start_tx > 0:
start_tx -= 1
continue
try:
block = self.get(conn)
except SyncDone as e:
logg.info('all blocks submitted for processing: {}'.format(e))
return self.backend.get()
except NoBlockForYou as e:
break
if self.block_callback != None:
self.block_callback(block, None)
last_block = block
try:
self.process(conn, block)
except IndexError:
self.backend.set(block.number + 1, 0)
start_tx = 0
time.sleep(self.yield_delay)
if self.post_callback != None:
self.post_callback()
self.idle(interval)
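The idle accounting above works in nanoseconds: the interval is scaled by NS_DIV, the time spent on the previous poll is subtracted, and a registered idle_callback receives the remaining budget, returning False to end the idle round early. A sketch of a conforming callback, with backend and chain_interface assumed given:

```python
NS_DIV = 1000000000

def idle_callback(remaining_ns):
    # remaining_ns is the unspent part of the poll interval, in nanoseconds
    print('idling, {:.2f}s of budget left'.format(remaining_ns / NS_DIV))
    return False  # False stops further idle callbacks in this round

syncer = BlockPollSyncer(backend, chain_interface, idle_callback=idle_callback)
```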

View File

@@ -0,0 +1,133 @@
# standard imports
import logging
#import threading
import multiprocessing
import queue
# external imports
from chainlib.error import RPCException
# local imports
from .history import HistorySyncer
from chainsyncer.error import SyncDone
logg = logging.getLogger(__name__)
class ThreadedHistorySyncer(HistorySyncer):
def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0):
super(ThreadedHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
self.workers = []
if conn_limit == 0:
conn_limit = thread_limit
#self.conn_pool = queue.Queue(conn_limit)
#self.queue = queue.Queue(thread_limit)
#self.quit_queue = queue.Queue(1)
self.conn_pool = multiprocessing.Queue(conn_limit)
self.queue = multiprocessing.Queue(thread_limit)
self.quit_queue = multiprocessing.Queue(1)
#self.lock = threading.Lock()
self.lock = multiprocessing.Lock()
for i in range(thread_limit):
#w = threading.Thread(target=self.worker)
w = multiprocessing.Process(target=self.worker)
self.workers.append(w)
for i in range(conn_limit):
self.conn_pool.put(conn_factory())
def terminate(self):
self.quit_queue.put(())
super(ThreadedHistorySyncer, self).terminate()
def worker(self):
while True:
block_number = None
try:
block_number = self.queue.get(timeout=0.01)
except queue.Empty:
if self.quit_queue.qsize() > 0:
#logg.debug('{} received quit'.format(threading.current_thread().getName()))
logg.debug('{} received quit'.format(multiprocessing.current_process().name))
return
continue
conn = self.conn_pool.get()
try:
logg.debug('processing parent {} {}'.format(conn, block_number))
self.process_parent(conn, block_number)
except IndexError:
pass
except RPCException as e:
logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block_number, e))
self.queue.put(block_number)
conn = self.conn_pool.put(conn)
def process_parent(self, conn, block_number):
logg.debug('getting block {}'.format(block_number))
o = self.chain_interface.block_by_number(block_number)
r = conn.do(o)
block = self.chain_interface.block_from_src(r)
logg.debug('got block type {}'.format(type(block)))
super(ThreadedHistorySyncer, self).process(conn, block)
def process_single(self, conn, block, tx):
self.filter.apply(conn, block, tx)
def process(self, conn, block):
pass
#def process(self, conn, block):
def get(self, conn):
if not self.running:
raise SyncDone()
block_number = None
tx_index = None
flags = None
((block_number, tx_index), flags) = self.backend.get()
try:
#logg.debug('putting {}'.format(block.number))
#self.queue.put((conn, block_number,), timeout=0.1)
self.queue.put(block_number, timeout=0.1)
except queue.Full:
#logg.debug('queue full, try again')
return
target, flags = self.backend.target()
next_block = block_number + 1
if next_block > target:
self.quit_queue.put(())
raise SyncDone()
self.backend.set(self.backend.block_height + 1, 0)
# def get(self, conn):
# try:
# r = super(ThreadedHistorySyncer, self).get(conn)
# return r
# except SyncDone as e:
# self.quit_queue.put(())
# raise e
def loop(self, interval, conn):
for w in self.workers:
w.start()
r = super(ThreadedHistorySyncer, self).loop(interval, conn)
for w in self.workers:
w.join()
while True:
try:
self.quit_queue.get_nowait()
except queue.Empty:
break
logg.info('workers done {}'.format(r))

View File

@@ -0,0 +1,170 @@
# standard imports
import logging
#import threading
import multiprocessing
import queue
import time
# external imports
from chainlib.error import RPCException
# local imports
from .history import HistorySyncer
from chainsyncer.error import SyncDone
logg = logging.getLogger(__name__)
def foobarcb(v):
logg.debug('foooz {}'.format(v))
class ThreadPoolTask:
process_func = None
chain_interface = None
def poolworker(self, block_number, conn):
# conn = args[1].get()
try:
logg.debug('processing parent {} {}'.format(conn, block_number))
#self.process_parent(self.conn, block_number)
self.process_parent(conn, block_number)
except IndexError:
pass
except RPCException as e:
logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block_number, e))
raise e
#self.queue.put(block_number)
# conn = self.conn_pool.put(conn)
def process_parent(self, conn, block_number):
logg.debug('getting block {}'.format(block_number))
o = self.chain_interface.block_by_number(block_number)
r = conn.do(o)
block = self.chain_interface.block_from_src(r)
logg.debug('got block type {}'.format(type(block)))
#super(ThreadedHistorySyncer, self).process(conn, block)
self.process_func(conn, block)
class ThreadPoolHistorySyncer(HistorySyncer):
def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0):
super(ThreadPoolHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
self.workers = []
self.thread_limit = thread_limit
if conn_limit == 0:
self.conn_limit = self.thread_limit
#self.conn_pool = queue.Queue(conn_limit)
#self.queue = queue.Queue(thread_limit)
#self.quit_queue = queue.Queue(1)
#self.conn_pool = multiprocessing.Queue(conn_limit)
#self.queue = multiprocessing.Queue(thread_limit)
#self.quit_queue = multiprocessing.Queue(1)
#self.lock = threading.Lock()
#self.lock = multiprocessing.Lock()
ThreadPoolTask.process_func = super(ThreadPoolHistorySyncer, self).process
ThreadPoolTask.chain_interface = chain_interface
#for i in range(thread_limit):
#w = threading.Thread(target=self.worker)
# w = multiprocessing.Process(target=self.worker)
# self.workers.append(w)
#for i in range(conn_limit):
# self.conn_pool.put(conn_factory())
self.conn_factory = conn_factory
self.worker_pool = None
def terminate(self):
#self.quit_queue.put(())
super(ThreadPoolHistorySyncer, self).terminate()
# def worker(self):
# while True:
# block_number = None
# try:
# block_number = self.queue.get(timeout=0.01)
# except queue.Empty:
# if self.quit_queue.qsize() > 0:
# #logg.debug('{} received quit'.format(threading.current_thread().getName()))
# logg.debug('{} received quit'.format(multiprocessing.current_process().name))
# return
# continue
# conn = self.conn_pool.get()
# try:
# logg.debug('processing parent {} {}'.format(conn, block_number))
# self.process_parent(conn, block_number)
# except IndexError:
# pass
# except RPCException as e:
# logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
# self.queue.put(block_number)
# conn = self.conn_pool.put(conn)
#
def process_single(self, conn, block, tx):
self.filter.apply(conn, block, tx)
def process(self, conn, block):
pass
def get(self, conn):
if not self.running:
raise SyncDone()
block_number = None
tx_index = None
flags = None
((block_number, tx_index), flags) = self.backend.get()
#try:
#logg.debug('putting {}'.format(block.number))
#self.queue.put((conn, block_number,), timeout=0.1)
#self.queue.put(block_number, timeout=0.1)
#except queue.Full:
#logg.debug('queue full, try again')
# return
task = ThreadPoolTask()
conn = self.conn_factory()
self.worker_pool.apply_async(task.poolworker, (block_number, conn,), {}, foobarcb)
target, flags = self.backend.target()
next_block = block_number + 1
if next_block > target:
#self.quit_queue.put(())
self.worker_pool.close()
raise SyncDone()
self.backend.set(self.backend.block_height + 1, 0)
# def get(self, conn):
# try:
# r = super(ThreadedHistorySyncer, self).get(conn)
# return r
# except SyncDone as e:
# self.quit_queue.put(())
# raise e
def loop(self, interval, conn):
self.worker_pool = multiprocessing.Pool(self.thread_limit)
#for w in self.workers:
# w.start()
r = super(ThreadPoolHistorySyncer, self).loop(interval, conn)
#for w in self.workers:
# w.join()
#while True:
# try:
# self.quit_queue.get_nowait()
# except queue.Empty:
# break
time.sleep(1)
self.worker_pool.join()
logg.info('workers done {}'.format(r))

View File

@@ -0,0 +1,81 @@
# standard imports
import copy
import logging
import multiprocessing
import os
# external imports
from chainlib.eth.connection import RPCConnection
# local imports
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver.base import Syncer
from .threadpool import ThreadPoolTask
logg = logging.getLogger(__name__)
def sync_split(block_offset, block_target, count):
block_count = block_target - block_offset
if block_count < count:
logg.warning('block count is less than thread count, adjusting thread count to {}'.format(block_count))
count = block_count
blocks_per_thread = int(block_count / count)
ranges = []
for i in range(count):
block_target = block_offset + blocks_per_thread
offset = block_offset
target = block_target -1
ranges.append((offset, target,))
block_offset = block_target
return ranges
class ThreadPoolRangeTask:
def __init__(self, backend, sync_range, chain_interface, syncer_factory=HistorySyncer, filters=[]):
backend_start = backend.start()
backend_target = backend.target()
backend_class = backend.__class__
tx_offset = 0
flags = 0
if sync_range[0] == backend_start[0][0]:
tx_offset = backend_start[0][1]
flags = backend_start[1]
self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0)
self.syncer = syncer_factory(self.backend, chain_interface)
for fltr in filters:
self.syncer.add_filter(fltr)
def start_loop(self, interval):
conn = RPCConnection.connect(self.backend.chain_spec)
return self.syncer.loop(interval, conn)
class ThreadPoolRangeHistorySyncer:
def __init__(self, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
self.src_backend = backend
self.thread_count = thread_count
self.single_sync_offset = 0
self.runlevel_callback = None
backend_start = backend.start()
backend_target = backend.target()
self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count)
self.chain_interface = chain_interface
self.filters = []
def add_filter(self, f):
self.filters.append(f)
def loop(self, interval, conn):
self.worker_pool = multiprocessing.Pool(processes=self.thread_count)
for sync_range in self.ranges:
task = ThreadPoolRangeTask(self.src_backend, sync_range, self.chain_interface, filters=self.filters)
t = self.worker_pool.apply_async(task.start_loop, (0.1,))
logg.debug('result of worker {}: {}'.format(t, t.get()))
self.worker_pool.close()
self.worker_pool.join()
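A worked example of sync_split as defined above; each tuple is an inclusive (offset, target) pair:

```python
ranges = sync_split(0, 100, 3)
# block_count = 100, blocks_per_thread = 100 // 3 = 33
assert ranges == [(0, 32), (33, 65), (66, 98)]
# note that the integer division leaves the tail of the range (block 99
# here) outside every subrange
```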

View File

@@ -3,7 +3,6 @@ class SyncDone(Exception):
     """
     pass
 
 
 class NoBlockForYou(Exception):
     """Exception raised on an attempt to retrieve a block from the network that does not (yet) exist.
     """
@@ -28,20 +27,6 @@ class LockError(Exception):
     pass
 
-
-class FilterDone(Exception):
-    """Exception raised when all registered filters have been executed
-    """
-
-
-class InterruptError(FilterDone):
-    """Exception for interrupting or attempting to use an interrupted sync
-    """
-
-
-class IncompleteFilterError(Exception):
-    """Exception raised if filter reset is executed prematurely
-    """
-
 #class AbortTx(Exception):
 #    """
 #    """

View File

@@ -1,131 +1,96 @@
 # standard imports
-import hashlib
 import logging
-import re
-import os
+
+# local imports
+from .error import BackendError
 
 logg = logging.getLogger(__name__)
 
-re_processedname = r'^_?[A-Z,\.]*$'
-
 
 class SyncFilter:
+    """Manages the collection of filters on behalf of a specific backend.
+
+    A filter is a pluggable piece of code to execute for every transaction retrieved by the syncer. Filters are executed in the sequence they were added to the instance.
+
+    :param backend: Syncer backend to apply filter state changes to
+    :type backend: chainsyncer.backend.base.Backend implementation
+    """
 
-    def sum(self):
-        s = self.common_name()
-        h = hashlib.sha256()
-        h.update(s.encode('utf-8'))
-        return h.digest()
-
-    def filter(self, conn, block, tx):
-        raise NotImplementedError()
-
-    def common_name(self):
-        s = self.__module__ + '.' + self.__class__.__name__
-        return s.replace('.', '_')
-
-
-# TODO: properly clarify interface shared with syncfsstore, move to filter module?
-class FilterState:
-
-    def __init__(self, state_store, scan=None):
-        self.state_store = state_store
-        self.digest = b'\x00' * 32
-        self.summed = False
-        self.__syncs = {}
-        self.synced = False
-        self.connected = False
-        self.state_store.add('DONE')
-        self.state_store.add('LOCK')
-        self.state_store.add('INTERRUPT')
-        self.state_store.add('RESET')
-        self.state = self.state_store.state
-        self.elements = self.state_store.elements
-        self.put = self.state_store.put
-        self.mask = self.state_store.mask
-        self.name = self.state_store.name
-        self.set = self.state_store.set
-        self.next = self.state_store.next
-        self.move = self.state_store.move
-        self.unset = self.state_store.unset
-        self.peek = self.state_store.peek
-        self.from_name = self.state_store.from_name
-        self.list = self.state_store.list
-        self.state_store.sync()
-        self.all = self.state_store.all
-        self.started = False
-        self.scan = scan
-
-    def __verify_sum(self, v):
-        if not isinstance(v, bytes) and not isinstance(v, bytearray):
-            raise ValueError('argument must be instance of bytes')
-        if len(v) != 32:
-            raise ValueError('argument must be 32 bytes')
-
-    def register(self, fltr):
-        if self.summed:
-            raise RuntimeError('filter already applied')
-        z = fltr.sum()
-        self.__verify_sum(z)
-        self.digest += z
-        s = fltr.common_name()
-        self.state_store.add(s)
-        n = self.state_store.from_name(s)
-        logg.debug('add filter {} {} {}'.format(s, n, self))
-
-    def sum(self):
-        h = hashlib.sha256()
-        h.update(self.digest)
-        self.digest = h.digest()
-        self.summed = True
-        return self.digest
-
-    def connect(self):
-        if not self.synced:
-            for v in self.state_store.all():
-                k = self.state_store.from_name(v)
-                self.state_store.sync(k)
-                self.__syncs[v] = True
-            if self.scan != None:
-                ks = self.scan()
-                for v in ks: #os.listdir(self.scan_path):
-                    logg.debug('ks {}'.format(v))
-                    k = None
-                    try:
-                        k = self.state_store.from_elements(v)
-                        self.state_store.alias(v, k)
-                    except ValueError:
-                        k = self.state_store.from_name(v)
-                    self.state_store.sync(k)
-                    self.__syncs[v] = True
-            self.synced = True
-        self.connected = True
-
-    def disconnect(self):
-        self.connected = False
-
-    def start(self, offset=0, target=-1):
-        self.state_store.start(offset=offset, target=target)
-        self.started = True
-
-    def get(self, k):
-        return None
-
-    def next_item(self):
-        return None
-
-    def filters(self):
-        return []
+    def __init__(self, backend):
+        self.filters = []
+        self.backend = backend
+
+    def add(self, fltr):
+        """Add a filter instance.
+
+        :param fltr: Filter instance.
+        :type fltr: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter
+        :raises ValueError: Object instance is incorrect implementation
+        """
+        if getattr(fltr, 'filter') == None:
+            raise ValueError('filter object must implement method filter')
+        logg.debug('added filter "{}"'.format(str(fltr)))
+        self.filters.append(fltr)
+
+    def __apply_one(self, fltr, idx, conn, block, tx, session):
+        self.backend.begin_filter(idx)
+        fltr.filter(conn, block, tx, session)
+        self.backend.complete_filter(idx)
+
+    def apply(self, conn, block, tx):
+        """Apply all registered filters on the given transaction.
+
+        :param conn: RPC Connection, will be passed to the filter method
+        :type conn: chainlib.connection.RPCConnection
+        :param block: Block object
+        :type block: chainlib.block.Block
+        :param tx: Transaction object
+        :type tx: chainlib.tx.Tx
+        :raises BackendError: Backend connection failed
+        """
+        session = None
+        try:
+            session = self.backend.connect()
+        except TimeoutError as e:
+            self.backend.disconnect()
+            raise BackendError('database connection fail: {}'.format(e))
+        i = 0
+        (pair, flags) = self.backend.get()
+        for f in self.filters:
+            if not self.backend.check_filter(i, flags):
+                logg.debug('applying filter {} {}'.format(str(f), flags))
+                self.__apply_one(f, i, conn, block, tx, session)
+            else:
+                logg.debug('skipping previously applied filter {} {}'.format(str(f), flags))
+            i += 1
+        self.backend.disconnect()
+
+
+class NoopFilter:
+    """A noop implementation of a sync filter.
+
+    Logs the filter inputs at debug log level.
+    """
+
+    def filter(self, conn, block, tx, db_session=None):
+        """Filter method implementation.
+
+        :param conn: RPC Connection, will be passed to the filter method
+        :type conn: chainlib.connection.RPCConnection
+        :param block: Block object
+        :type block: chainlib.block.Block
+        :param tx: Transaction object
+        :type tx: chainlib.tx.Tx
+        :param db_session: Backend session object
+        :type db_session: varies
+        """
+        logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session)))
+
+    def __str__(self):
+        return 'noopfilter'
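A minimal custom filter against the interface documented above; the class name and the matching condition are invented for illustration:

```python
import logging

logg = logging.getLogger(__name__)

class AddressWatchFilter:

    def __init__(self, address):
        self.address = address

    def filter(self, conn, block, tx, db_session=None):
        # tx follows chainlib.tx.Tx; outputs is assumed to hold recipient
        # addresses
        if self.address in tx.outputs:
            logg.info('block {} tx {} touches {}'.format(
                    block.number, tx.index, self.address))

    def __str__(self):
        return 'addresswatchfilter'
```

An instance would then be registered with SyncFilter.add, or with Syncer.add_filter, which forwards to it.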

View File

@@ -1 +0,0 @@

View File

@@ -1,146 +0,0 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import sys
import importlib
# external imports
import chainlib.cli
from shep.persist import PersistedState
# local imports
import chainsyncer.cli
from chainsyncer.settings import ChainsyncerSettings
from chainsyncer.store import SyncStore
from chainsyncer.filter import (
FilterState,
SyncFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
valid_fwd = [
'fwd',
'forward',
'next',
'continue',
]
valid_rwd = [
'rwd',
'rewind',
'current',
'back',
'repeat',
'replay',
]
action_is_forward = False
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--state-dir', type=str, dest='state_dir', help='State directory')
argparser.add_positional('action', type=str, help='Action to take on lock. Repeat means re-run the locked filter. Continue means resume execution for next filter.')
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
chainsyncer.cli.process_flags(argparser, sync_flags)
args = argparser.parse_args()
if args.action in valid_fwd:
action_is_forward = True
elif args.action not in valid_rwd:
sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd))
sys.exit(1)
base_config_dir = chainsyncer.cli.config_dir,
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
config = chainsyncer.cli.process_config(config, args, sync_flags)
config.add(args.state_dir, '_STATE_DIR', False)
logg.debug('config loaded:\n{}'.format(config))
settings = ChainsyncerSettings()
settings.process_sync_backend(config)
logg.debug('settings:\n{}'.format(str(settings)))
class FilterInNameOnly(SyncFilter):
def __init__(self, k):
self.k = k
def common_name(self):
return self.k
def main():
if settings.get('SYNCER_BACKEND') == 'mem':
raise ValueError('cannot unlock volatile state store')
state_dir = config.get('_STATE_DIR')
if config.get('SYNCER_BACKEND') == 'fs':
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
elif config.get('SYNCER_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
else:
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
store = syncer_store_class(state_dir)
filter_list = store.load_filter_list()
for i, k in enumerate(filter_list):
fltr = FilterInNameOnly(k)
store.register(fltr)
filter_list[i] = k.upper()
store.connect()
store.start(ignore_lock=True)
lock_state = store.filter_state.from_name('LOCK')
locked_item = store.filter_state.list(lock_state)
if len(locked_item) == 0:
sys.stderr.write('Sync filter in {} is not locked\n'.format(state_dir))
sys.exit(1)
elif len(locked_item) > 1:
sys.stderr.write('More than one locked item encountered in {}. That should never happen, so I do not know what to do next.\n'.format(state_dir))
sys.exit(1)
locked_item_key = locked_item[0]
locked_item = store.get(int(locked_item_key))
locked_state = store.filter_state.state(locked_item_key) - lock_state
locked_state_name = store.filter_state.name(locked_state)
logg.info('found item "{}" in locked state {}'.format(locked_item, store.filter_state.name(locked_state)))
if action_is_forward:
k = locked_state_name
filter_index = None
filter_index = filter_list.index(k)
filter_pos = filter_index + 1
filter_count = len(filter_list)
logg.debug('Locked filter {} found at position {} of {}'.format(k, filter_pos, filter_count))
if filter_pos == filter_count:
logg.info('Locked filter {} is the last filter in the list. Executing filter reset'.format(k))
locked_item.reset(check_incomplete=False)
else:
locked_item.advance(ignore_lock=True)
store.filter_state.unset(locked_item_key, lock_state)
else:
filter_mask = 0xf
filter_state = store.filter_state.mask(locked_state, filter_mask)
logg.info('Chosen action is "{}": will continue execution at previous filter {}'.format(args.action, store.filter_state.name(filter_state)))
store.filter_state.unset(locked_item_key, lock_state)
if __name__ == '__main__':
main()

View File

@@ -1,42 +1,23 @@
 # standard imports
 import uuid
-import logging
-
-# local imports
-from chainsyncer.error import FilterDone
-
-logg = logging.getLogger(__name__)
 
 
 class SyncSession:
 
-    def __init__(self, session_store):
-        self.session_store = session_store
-        self.started = self.session_store.started
-        self.get = self.session_store.get
-        self.next = self.session_store.next_item
-        self.item = None
-        self.filters = self.session_store.filters
-
-    def start(self, offset=0, target=-1):
-        self.session_store.start(offset=offset, target=target)
-        self.item = self.session_store.next_item()
-        return self.item
-
-    def stop(self, item):
-        self.session_store.stop(item)
-
-    def filter(self, conn, block, tx):
-        self.session_store.connect()
-        for fltr in self.filters:
-            logg.debug('executing filter {}'.format(fltr))
-            self.item.advance()
-            interrupt = fltr.filter(conn, block, tx)
-            if not self.item.release(interrupt=interrupt):
-                break
-        self.item.reset()
-        self.next()
-        self.session_store.disconnect()
+    def __init__(self, state_store, session_id=None, is_default=False):
+        if session_id == None:
+            session_id = str(uuid.uuid4())
+            is_default = True
+        self.session_id = session_id
+        self.is_default = is_default
+        self.state_store = state_store
+        self.filters = []
+
+    def add_filter(self, fltr):
+        self.state_store.register(fltr)
+        self.filters.append(fltr)
+
+    def start(self):
+        self.state_store.start()
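A sketch of the master-side session flow removed here: start() yields the first sync item and filter() advances and releases filter state around each registered filter. The store, conn, block and tx objects are assumed given:

```python
# store, conn, block and tx are assumed constructed elsewhere
session = SyncSession(store)
item = session.start(offset=0, target=100)
if item != None:
    session.filter(conn, block, tx)  # runs every store-registered filter once
```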

View File

@@ -1,55 +0,0 @@
# standard imports
import logging
# external imports
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
from chainlib.settings import ChainSettings
logg = logging.getLogger(__name__)
class ChainsyncerSettings(ChainSettings):
def process_sync_backend(self, config):
self.o['SYNCER_BACKEND'] = config.get('SYNCER_BACKEND')
def process_sync_range(self, config):
o = self.o['SYNCER_INTERFACE'].block_latest()
r = self.o['RPC'].do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
keep_alive = False
session_block_offset = 0
block_limit = 0
until = 0
if config.true('_HEAD'):
self.o['SYNCER_OFFSET'] = block_offset
self.o['SYNCER_LIMIT'] = -1
return
session_block_offset = int(config.get('SYNCER_OFFSET'))
until = int(config.get('SYNCER_LIMIT'))
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
elif until == -1:
keep_alive = True
if session_block_offset == -1:
session_block_offset = block_offset
elif config.true('_KEEP_ALIVE'):
block_limit = -1
else:
if block_limit == 0:
block_limit = block_offset
self.o['SYNCER_OFFSET'] = session_block_offset
self.o['SYNCER_LIMIT'] = block_limit
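Worked examples of the resolution above, assuming the network head is at block 1000 when the syncer starts:

```python
# --head                   -> SYNCER_OFFSET=1000, SYNCER_LIMIT=-1
# --offset 10 --until 500  -> SYNCER_OFFSET=10,   SYNCER_LIMIT=500
# --offset 600 --until 500 -> ValueError: termination must be later than offset
```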

View File

@@ -0,0 +1 @@
from .base import SyncState

chainsyncer/state/base.py Normal file
View File

@@ -0,0 +1,42 @@
# standard imports
import hashlib
class SyncState:
def __init__(self, state_store):
self.store = state_store
self.digest = b'\x00' * 32
self.summed = False
self.synced = {}
def __verify_sum(self, v):
if not isinstance(v, bytes) and not isinstance(v, bytearray):
raise ValueError('argument must be instance of bytes')
if len(v) != 32:
raise ValueError('argument must be 32 bytes')
def register(self, fltr):
if self.summed:
raise RuntimeError('filter already applied')
z = fltr.sum()
self.__verify_sum(z)
self.digest += z
s = fltr.common_name()
self.store.add('i_' + s)
self.store.add('o_' + s)
def sum(self):
h = hashlib.sha256()
h.update(self.digest)
self.digest = h.digest()
self.summed = True
return self.digest
def start(self):
for v in self.store.all():
self.store.sync(v)
self.synced[v] = True
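A small sketch of the registration flow with a toy stand-in for the shep state store, reduced to the three calls the class makes (add, all, sync):

```python
import hashlib

class ToyStore:
    def __init__(self):
        self.keys = []
    def add(self, k):
        self.keys.append(k)
    def all(self):
        return list(self.keys)
    def sync(self, k):
        pass

class ToyFilter:
    def sum(self):
        return hashlib.sha256(b'toy').digest()
    def common_name(self):
        return 'toy_filter'

state = SyncState(ToyStore())
state.register(ToyFilter())  # adds 'i_toy_filter' and 'o_toy_filter' keys
digest = state.sum()         # one sha256 over the concatenated filter sums
assert len(digest) == 32
```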

View File

@@ -1 +0,0 @@
from .base import *

View File

@@ -1,322 +0,0 @@
# standard imports
import os
import logging
# local imports
from shep.persist import PersistedState
from shep import State
from shep.error import StateInvalid
from chainsyncer.filter import FilterState
from chainsyncer.error import (
LockError,
FilterDone,
InterruptError,
IncompleteFilterError,
SyncDone,
)
logg = logging.getLogger(__name__)
def sync_state_serialize(block_height, tx_index, block_target):
b = block_height.to_bytes(4, 'big')
b += tx_index.to_bytes(4, 'big')
b += block_target.to_bytes(4, 'big', signed=True)
return b
def sync_state_deserialize(b):
block_height = int.from_bytes(b[:4], 'big')
tx_index = int.from_bytes(b[4:8], 'big')
block_target = int.from_bytes(b[8:], 'big', signed=True)
return (block_height, tx_index, block_target,)
# NOT thread safe
class SyncItem:
def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False):
self.offset = offset
self.target = target
self.sync_state = sync_state
self.filter_state = filter_state
self.state_key = str(offset)
logg.debug('get key {}'.format(self.state_key))
v = self.sync_state.get(self.state_key)
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
filter_state = self.filter_state.state(self.state_key)
if filter_state & self.filter_state.from_name('LOCK') > 0 and not ignore_lock:
raise LockError(self.state_key)
self.count = len(self.filter_state.all(pure=True)) - 4
self.skip_filter = False
if self.count == 0:
self.skip_filter = True
elif not started:
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
def __check_done(self):
if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0:
raise InterruptError(self.state_key)
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') > 0:
raise FilterDone(self.state_key)
def resume(self):
filter_state = self.filter_state.state(self.state_key)
if filter_state > 0x0f:
filter_state_part = self.filter_state.mask(filter_state, 0x0f)
if len(self.filter_state.elements(filter_state)) == 1:
logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part))
lock_state = self.filter_state.from_name('LOCK')
self.filter_state.set(lock_state)
def reset(self, check_incomplete=True):
if check_incomplete:
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
raise LockError('reset attempt on {} when state locked'.format(self.state_key))
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
def next(self, advance_block=False):
v = self.sync_state.state(self.state_key)
if v == self.sync_state.DONE:
raise SyncDone(self.target)
elif v == self.sync_state.NEW:
self.sync_state.next(self.state_key)
v = self.sync_state.get(self.state_key)
(block_number, tx_index, target) = sync_state_deserialize(v)
if advance_block:
block_number += 1
tx_index = 0
if self.target >= 0 and block_number > self.target:
self.sync_state.move(self.state_key, self.sync_state.DONE)
raise SyncDone(self.target)
else:
tx_index += 1
self.cursor = block_number
self.tx_cursor = tx_index
b = sync_state_serialize(block_number, tx_index, target)
self.sync_state.replace(self.state_key, b)
def __find_advance(self):
v = self.filter_state.state(self.state_key)
def advance(self, ignore_lock=False):
if self.skip_filter:
raise FilterDone()
self.__check_done()
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
if ignore_lock:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
else:
raise LockError('advance attempt on {} when state locked'.format(self.state_key))
done = False
try:
self.filter_state.next(self.state_key)
except StateInvalid:
done = True
if done:
raise FilterDone()
self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK'))
def release(self, interrupt=False):
if self.skip_filter:
return False
if interrupt == True:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
return False
state = self.filter_state.state(self.state_key)
if state & self.filter_state.from_name('LOCK') == 0:
raise LockError('release attempt on {} when state unlocked'.format(self.state_key))
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
try:
c = self.filter_state.peek(self.state_key)
logg.debug('peeked {}'.format(c))
except StateInvalid:
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
return False
return True
def __str__(self):
return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor)
class SyncStore:
def __init__(self, path, session_id=None):
self.session_id = session_id
self.session_path = None
self.is_default = False
self.first = False
self.target = None
self.items = {}
self.item_keys = []
self.started = False
self.thresholds = []
self.session_path = path
def setup_sync_state(self, factory=None, event_callback=None):
if factory == None:
self.state = State(2, event_callback=event_callback)
else:
self.state = PersistedState(factory.add, 2, event_callback=event_callback)
self.state.add('SYNC')
self.state.add('DONE')
def setup_filter_state(self, factory=None, event_callback=None):
if factory == None:
filter_state_backend = State(0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend)
else:
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
self.filters = []
def set_target(self, v):
pass
def get_target(self):
return None
def register(self, fltr):
self.filters.append(fltr)
self.filter_state.register(fltr)
def start(self, offset=0, target=-1, ignore_lock=False):
if self.started:
return
self.save_filter_list()
self.load(target, ignore_lock=ignore_lock)
if self.first:
state_bytes = sync_state_serialize(offset, 0, target)
block_number_str = str(offset)
self.state.put(block_number_str, contents=state_bytes)
self.filter_state.put(block_number_str)
o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock)
o.resume()
self.items[offset] = o
self.item_keys.append(offset)
elif offset > 0:
logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id))
self.started = True
self.item_keys.sort()
def stop(self, item):
if item.target == -1:
state_bytes = sync_state_serialize(item.cursor, 0, item.cursor)
self.state.replace(str(item.offset), state_bytes)
self.filter_state.put(str(item.cursor))
SyncItem(item.offset, -1, self.state, self.filter_state)
logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor))
self.state.move(item.state_key, self.state.DONE)
state_bytes = sync_state_serialize(item.cursor, 0, -1)
self.state.put(str(item.cursor), contents=state_bytes)
def load(self, target, ignore_lock=False):
self.state.sync(self.state.NEW)
self.state.sync(self.state.SYNC)
thresholds_sync = []
for v in self.state.list(self.state.SYNC):
block_number = int(v)
thresholds_sync.append(block_number)
logg.debug('queue resume {}'.format(block_number))
thresholds_new = []
for v in self.state.list(self.state.NEW):
block_number = int(v)
thresholds_new.append(block_number)
logg.debug('queue new range {}'.format(block_number))
thresholds_sync.sort()
thresholds_new.sort()
thresholds = thresholds_sync + thresholds_new
lim = len(thresholds) - 1
for i in range(len(thresholds)):
item_target = target
if i < lim:
item_target = thresholds[i+1]
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock)
o.resume()
self.items[block_number] = o
self.item_keys.append(block_number)
logg.info('added existing {}'.format(o))
self.get_target()
if len(thresholds) == 0:
if self.target != None:
logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id))
else:
logg.info('syncer first run target {}'.format(target))
self.first = True
self.set_target(target)
def get(self, k):
return self.items[k]
def next_item(self):
try:
k = self.item_keys.pop(0)
except IndexError:
return None
return self.items[k]
def connect(self):
self.filter_state.connect()
def disconnect(self):
self.filter_state.disconnect()
def save_filter_list(self):
raise NotImplementedError()
def load_filter_list(self):
raise NotImplementedError()
def peek_next_filter(self):
pass
def peek_current_filter(self):
pass
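A round trip of the 12-byte cursor record defined at the top of this file; the import path is assumed from the module layout in this changeset:

```python
from chainsyncer.store.base import sync_state_serialize, sync_state_deserialize

b = sync_state_serialize(42, 3, -1)
assert len(b) == 12  # 4 bytes height, 4 bytes tx index, 4 bytes signed target
assert sync_state_deserialize(b) == (42, 3, -1)
```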

View File

@@ -1,98 +0,0 @@
# standard imports
import uuid
import os
import logging
# external imports
from shep.store.file import SimpleFileStoreFactory
# local imports
from chainsyncer.store import SyncStore
logg = logging.getLogger(__name__)
class SyncFsStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncFsStore, self).__init__(base_path, session_id=session_id)
create_path = False
try:
os.stat(self.session_path)
except FileNotFoundError:
create_path = True
if create_path:
self.__create_path(base_path, self.default_path, session_id=session_id)
self.session_id = os.path.basename(self.session_path)
logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
base_sync_path = os.path.join(self.session_path, 'sync')
factory = SimpleFileStoreFactory(base_sync_path, binary=True)
self.setup_sync_state(factory, state_event_callback)
self.setup_filter_state(callback=filter_state_event_callback)
def setup_filter_state(self, callback=None):
base_filter_path = os.path.join(self.session_path, 'filter')
factory = SimpleFileStoreFactory(base_filter_path, binary=True)
super(SyncFsStore, self).setup_filter_state(factory, callback)
def __create_path(self, base_path, default_path, session_id=None):
logg.debug('fs store path {} does not exist, creating'.format(self.session_path))
if session_id == None:
session_id = str(uuid.uuid4())
self.session_path = os.path.join(base_path, session_id)
os.makedirs(self.session_path)
if self.is_default:
try:
os.symlink(self.session_path, default_path)
except FileExistsError:
pass
def get_target(self):
fp = os.path.join(self.session_path, 'target')
try:
f = open(fp, 'r')
v = f.read()
f.close()
self.target = int(v)
except FileNotFoundError as e:
logg.debug('cannot find target {} {}'.format(fp, e))
pass
def set_target(self, v):
fp = os.path.join(self.session_path, 'target')
f = open(fp, 'w')
f.write(str(v))
f.close()
self.target = v
def load_filter_list(self):
fltr = []
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'r')
while True:
v = f.readline()
if len(v) == 0:
break
v = v.rstrip()
fltr.append(v)
f.close()
return fltr
def save_filter_list(self):
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'w')
for fltr in self.filters:
f.write(fltr.common_name() + '\n')
f.close()
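Read off the code above, the session directory maintained by the fs store looks like this; the 'default' symlink name is an assumption based on the default_path reference:

```python
# <base_path>/<session_id>/
#     sync/          shep file store holding serialized sync cursor records
#     filter/        shep file store holding per-item filter state
#     target         plain-text target block number
#     filter_list    newline-separated filter common names
# <base_path>/default   symlink to the default session directory (assumed)
```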

View File

@@ -1,45 +0,0 @@
# standard imports
import logging
import os
# external imports
from shep import State
# local imports
from chainsyncer.store import SyncStore
logg = logging.getLogger(__name__)
class SyncMemStore(SyncStore):
def __init__(self, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncMemStore, self).__init__(None, session_id=session_id)
factory = None
self.setup_sync_state(factory, state_event_callback)
factory = None
self.setup_filter_state(factory, filter_state_event_callback)
def set_target(self, v):
self.target = int(v)
def get_target(self):
return self.target
def stop(self, item):
if item != None:
super(SyncMemStore, self).stop(item)
logg.info('I am an in-memory only state store. I am shutting down now, so all state will now be discarded.')
def save_filter_list(self):
pass
def load_filter_list(self):
return []

View File

@@ -1,79 +0,0 @@
# standard imports
import uuid
import os
import logging
# external imports
from shep.store.rocksdb import RocksDbStoreFactory
# local imports
from chainsyncer.store import (
SyncItem,
SyncStore,
)
logg = logging.getLogger(__name__)
class RocksDbStoreAdder:
def __init__(self, factory, prefix):
self.factory = factory
self.prefix = prefix
def add(self, k):
path = os.path.join(self.prefix, k)
return self.factory.add(path)
def ls(self):
return self.factory.ls()
class SyncRocksDbStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncRocksDbStore, self).__init__(base_path, session_id=session_id)
self.factory = RocksDbStoreFactory(self.session_path, binary=True)
prefix_factory = RocksDbStoreAdder(self.factory, 'sync')
self.setup_sync_state(prefix_factory, state_event_callback)
prefix_factory = RocksDbStoreAdder(self.factory, 'filter')
self.setup_filter_state(prefix_factory, filter_state_event_callback)
#self.session_id = os.path.basename(self.session_path)
#logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target')
def get_target(self):
v = self.target_db.get('target')
if v != None:
self.target = int(v)
def set_target(self, v):
self.target_db.put('target', str(v))
self.target = v
def stop(self, item):
if item != None:
super(SyncRocksDbStore, self).stop(item)
self.factory.close()
def save_filter_list(self):
fltr = []
for v in self.filters:
fltr.append(v.common_name())
self.target_db.put('filter_list', ','.join(fltr))
def load_filter_list(self):
v = self.target_db.get('filter_list')
v = v.decode('utf-8')
return v.split(',')

View File

@@ -1 +0,0 @@
from .base import *

View File

@ -1,73 +1,17 @@
# standard imports
import os
import logging
-import hashlib
# external imports
from hexathon import add_0x
-from shep.state import State
# local imports
-#from chainsyncer.driver.history import HistorySyncer
+from chainsyncer.driver.history import HistorySyncer
from chainsyncer.error import NoBlockForYou
-from chainsyncer.driver import SyncDriver
-logging.STATETRACE = 5
logg = logging.getLogger().getChild(__name__)
-def state_event_handler(k, v_old, v_new):
-if v_old == None:
-logg.log(logging.STATETRACE, 'sync state create key {}: -> {}'.format(k, v_new))
-else:
-logg.log(logging.STATETRACE, 'sync state change key {}: {} -> {}'.format(k, v_old, v_new))
-def filter_state_event_handler(k, v_old, v_new):
-if v_old == None:
-logg.log(logging.STATETRACE, 'filter state create key {}: -> {}'.format(k, v_new))
-else:
-logg.log(logging.STATETRACE, 'filter state change key {}: {} -> {}'.format(k, v_old, v_new))
-class MockFilterError(Exception):
-pass
-class MockBlockGenerator:
-def __init__(self, offset=0):
-self.blocks = {}
-self.offset = offset
-self.cursor = offset
-def generate(self, spec=[], driver=None):
-for v in spec:
-txs = []
-for i in range(v):
-tx_hash = os.urandom(32).hex()
-tx = MockTx(0, tx_hash)
-txs.append(tx)
-block = MockBlock(self.cursor, txs)
-self.blocks[self.cursor] = block
-self.cursor += 1
-if driver != None:
-self.apply(driver)
-def apply(self, driver, offset=0):
-block_numbers = list(self.blocks.keys())
-for block_number in block_numbers:
-if block_number < offset:
-continue
-block = self.blocks[block_number]
-driver.add_block(block)
class MockConn:
"""Noop connection mocker.
@ -112,7 +56,6 @@ class MockBlock:
"""
self.number = number
self.txs = txs
-self.hash = os.urandom(32).hex()
def tx(self, i):
@ -121,158 +64,45 @@ class MockBlock:
:param i: Transaction index
:type i: int
"""
-return MockTx(i, self.txs[i].hash)
+return MockTx(i, self.txs[i])
-class MockStore(State):
-def __init__(self, bits=0):
-super(MockStore, self).__init__(bits, check_alias=False)
-def start(self, offset=0, target=-1):
-pass
-class MockFilter:
-def __init__(self, name, brk=None, brk_hard=None, z=None):
-self.name = name
-if z == None:
-h = hashlib.sha256()
-h.update(self.name.encode('utf-8'))
-z = h.digest()
-self.z = z
-self.brk = brk
-self.brk_hard = brk_hard
-self.contents = []
-def sum(self):
-return self.z
-def common_name(self):
-return self.name
-def filter(self, conn, block, tx):
-r = False
-if self.brk_hard != None:
-r = True
-if self.brk_hard > 0:
-r = True
-self.brk_hard -= 1
-if r:
-raise MockFilterError()
-if self.brk != None:
-if self.brk > 0:
-r = True
-self.brk -= 1
-self.contents.append((block.number, tx.index, tx.hash,))
-logg.debug('filter {} result {} block {} tx {} {}'.format(self.common_name(), r, block.number, tx.index, tx.hash))
-return r
-class MockDriver(SyncDriver):
-def __init__(self, store, offset=0, target=-1, interrupt_block=None, interrupt_tx=None, interrupt_global=False):
-super(MockDriver, self).__init__(store, offset=offset, target=target)
-self.blocks = {}
-self.interrupt = None
-if interrupt_block != None:
-interrupt_block = int(interrupt_block)
-if interrupt_tx == None:
-interrupt_tx = 0
-else:
-interrupt_tx = int(interrupt_tx)
-self.interrupt = (interrupt_block, interrupt_tx,)
-self.interrupt_global = interrupt_global
-def add_block(self, block):
-logg.debug('add block {} {} with {} txs'.format(block.number, block.hash, len(block.txs)))
-self.blocks[block.number] = block
-def get(self, conn, item):
-try:
-return self.blocks[item.cursor]
-except KeyError:
-raise NoBlockForYou()
-def process(self, conn, item, block):
-i = item.tx_cursor
-while self.running:
-if self.interrupt != None:
-if self.interrupt[0] == block.number and self.interrupt[1] == i:
-logg.info('interrupt triggered at {}'.format(self.interrupt))
-if self.interrupt_global:
-SyncDriver.running_global = False
-self.running = False
-break
-tx = block.tx(i)
-self.process_single(conn, block, tx)
-item.next()
-i += 1
-class MockChainInterface:
-def block_by_number(self, number):
-return ('block_by_number', number,)
-def tx_by_hash(self, hsh):
-return ('tx_by_hash', hsh,)
-def block_from_src(self, src):
-return src
-def src_normalize(self, src):
-return src
-def tx_receipt(self, hsh):
-return ('receipt', hsh,)
-class MockChainInterfaceConn(MockConn):
-def __init__(self, interface):
-self.ifc = interface
-self.blocks = {}
-self.txs = {}
-def add_block(self, block):
-logg.debug('add block {} {} with {} txs'.format(block.number, block.hash, len(block.txs)))
-self.blocks[block.number] = block
-for tx in block.txs:
-self.txs[tx.hash] = tx
-def do(self, o):
-m = getattr(self, 'handle_' + o[0])
-return m(o[1])
-def handle_block_by_number(self, number):
-return self.blocks[number]
-def handle_receipt(self, hsh):
-return {}
-class MockItem:
-def __init__(self, target, offset, cursor, state_key):
-self.target = target
-self.offset = offset
-self.cursor = cursor
-self.state_key = state_key
+class TestSyncer(HistorySyncer):
+"""Unittest extension of history syncer driver.
+:param backend: Syncer backend
+:type backend: chainsyncer.backend.base.Backend implementation
+:param chain_interface: Chain interface
+:type chain_interface: chainlib.interface.ChainInterface implementation
+:param tx_counts: List of integer values defining how many mock transactions to generate per block. Mock blocks will be generated for each element in list.
+:type tx_counts: list
+"""
+def __init__(self, backend, chain_interface, tx_counts=[]):
+self.tx_counts = tx_counts
+super(TestSyncer, self).__init__(backend, chain_interface)
+def get(self, conn):
+"""Implements the block getter of chainsyncer.driver.base.Syncer.
+:param conn: RPC connection
+:type conn: chainlib.connection.RPCConnection
+:raises NoBlockForYou: End of mocked block array reached
+:rtype: chainsyncer.unittest.base.MockBlock
+:returns: Mock block.
+"""
+(pair, fltr) = self.backend.get()
+(target_block, fltr) = self.backend.target()
+block_height = pair[0]
+if block_height == target_block:
+self.running = False
+raise NoBlockForYou()
+block_txs = []
+if block_height < len(self.tx_counts):
+for i in range(self.tx_counts[block_height]):
+block_txs.append(add_0x(os.urandom(32).hex()))
+return MockBlock(block_height, block_txs)
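
The removed MockFilter distinguishes a soft interrupt (brk: filter() records the tx and returns True so the session can stop cleanly) from a hard failure (brk_hard: filter() raises MockFilterError before recording anything). A compact re-creation of that contract with simplified integer counters; TinyFilter is illustrative, not part of the package:

    class MockFilterError(Exception):
        pass

    class TinyFilter:
        def __init__(self, name, brk=0, brk_hard=0):
            self.name = name
            self.brk = brk            # soft interrupts left to signal
            self.brk_hard = brk_hard  # hard errors left to raise
            self.contents = []

        def filter(self, conn, block, tx):
            if self.brk_hard > 0:
                self.brk_hard -= 1
                raise MockFilterError()   # nothing recorded, like the mock
            r = self.brk > 0
            if r:
                self.brk -= 1
            self.contents.append((block, tx))
            return r                      # True asks the session to interrupt

    f = TinyFilter('foo', brk=1)
    assert f.filter(None, 13, 'aa') is True    # first call interrupts softly
    assert f.filter(None, 13, 'bb') is False   # counter exhausted, passes through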

View File

@ -0,0 +1,64 @@
# standard imports
import logging
import os
# external imports
import alembic
import alembic.config
# local imports
from chainsyncer.db.models.base import SessionBase
from chainsyncer.db import dsn_from_config
logg = logging.getLogger(__name__)
class ChainSyncerDb:
"""SQLite database setup for unit tests.
:param debug: Activate sql level debug (outputs sql statements)
:type debug: bool
"""
base = SessionBase
def __init__(self, debug=False):
config = {
'DATABASE_ENGINE': 'sqlite',
'DATABASE_DRIVER': 'pysqlite',
'DATABASE_NAME': 'chainsyncer.sqlite',
}
logg.debug('config {}'.format(config))
self.dsn = dsn_from_config(config)
self.base.poolable = False
self.base.transactional = False
self.base.procedural = False
self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0
rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..')
dbdir = os.path.join(rootdir, 'chainsyncer', 'db')
#migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE'))
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', self.dsn)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
def bind_session(self, session=None):
"""Create session using underlying session base
"""
return self.base.bind_session(session)
def release_session(self, session=None):
"""Release session using underlying session base
"""
return self.base.release_session(session)
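
In a test case the helper is constructed per test, which re-runs the downgrade/upgrade cycle and hands out sessions. A hedged usage sketch, assuming the module lands at chainsyncer.unittest.db per this changeset's layout:

    import unittest

    from chainsyncer.unittest.db import ChainSyncerDb   # module path assumed

    class TestWithDb(unittest.TestCase):
        def setUp(self):
            self.db = ChainSyncerDb()          # fresh schema: downgrade to base, upgrade to head
            self.session = self.db.bind_session()

        def tearDown(self):
            self.db.release_session(self.session)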

View File

@ -1,301 +0,0 @@
# standard imports
import os
import stat
import unittest
import shutil
import tempfile
import logging
import uuid
# local imports
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import (
MockFilter,
MockItem,
)
logging.STATETRACE = 5
logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
def state_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
class TestStoreBase(unittest.TestCase):
def setUp(self):
self.base_path = tempfile.mkdtemp()
self.session_id = str(uuid.uuid4())
self.path = os.path.join(self.base_path, self.session_id)
os.makedirs(self.path)
self.store_factory = None
self.persist = True
@classmethod
def link(cls, target):
for v in [
"default",
"store_start",
"store_resume",
"filter_list",
"sync_process_nofilter",
"sync_process_onefilter",
"sync_process_outoforder",
"sync_process_interrupt",
"sync_process_reset",
"sync_process_done",
"sync_head_future",
"sync_history_interrupted",
"sync_history_complete",
]:
setattr(target, 'test_' + v, getattr(cls, 't_' + v))
def tearDown(self):
shutil.rmtree(self.path)
def t_default(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
#fp = os.path.join(self.path, store.session_id)
fp = self.path
session_id = store.session_id
st = None
st = os.stat(fp)
if st != None:
self.assertTrue(stat.S_ISDIR(st.st_mode))
#self.assertTrue(store.is_default)
store.stop(bogus_item)
store = self.store_factory()
fpr = os.path.join(self.path, self.session_id)
self.assertEqual(fp, self.path)
def t_store_start(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
store.start(42)
self.assertTrue(store.first)
store.stop(bogus_item)
if self.persist:
store = self.store_factory()
store.start()
self.assertFalse(store.first)
def t_store_resume(self):
store = self.store_factory()
store.start(13)
self.assertTrue(store.first)
# todo not done
def t_sync_process_nofilter(self):
store = self.store_factory()
session = SyncSession(store)
session.start()
o = session.get(0)
with self.assertRaises(FilterDone):
o.advance()
def t_sync_process_onefilter(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start()
o = session.get(0)
o.advance()
o.release()
def t_sync_process_outoforder(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('two')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.advance()
o.release()
with self.assertRaises(LockError):
o.release()
o.advance()
o.release()
def t_sync_process_interrupt(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
o.release(interrupt=True)
with self.assertRaises(FilterDone):
o.advance()
def t_sync_process_reset(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.reset()
o.release()
with self.assertRaises(IncompleteFilterError):
o.reset()
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
def t_sync_process_done(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start(target=0)
o = session.get(0)
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def t_sync_head_future(self):
store = self.store_factory('foo')
session = SyncSession(store)
session.start()
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
if self.persist:
store = self.store_factory('foo')
store.start()
o = store.get(2)
def t_sync_history_interrupted(self):
if not self.persist:
return
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory('foo')
session = SyncSession(store)
session.start(target=13)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store.stop(bogus_item)
store = self.store_factory('foo')
store.start()
o = store.get(0)
self.assertEqual(o.cursor, 2)
self.assertEqual(o.target, 13)
o.next(advance_block=True)
o.next(advance_block=True)
store.stop(bogus_item)
store = self.store_factory('foo')
store.start()
self.assertEqual(o.cursor, 4)
self.assertEqual(o.target, 13)
def t_sync_history_complete(self):
store = self.store_factory('foo')
session = SyncSession(store)
session.start(target=3)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
o.next(advance_block=True)
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def t_filter_list(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
fltr_one = MockFilter('foo_bar')
store.register(fltr_one)
fltr_two = MockFilter('bar_baz')
store.register(fltr_two)
store.start()
store.stop(bogus_item)
store = self.store_factory()
r = store.load_filter_list()
self.assertEqual(r[0], 'foo_bar')
self.assertEqual(r[1], 'bar_baz')
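
A backend plugs into this shared suite by assigning store_factory in setUp and linking the t_* methods onto test_* names, mirroring the fs-backed test module later in this changeset. A sketch using SyncFsStore; any store honoring the same constructor contract fits:

    import unittest

    from chainsyncer.store.fs import SyncFsStore
    from chainsyncer.unittest.store import TestStoreBase

    class TestMyBackend(TestStoreBase):
        def setUp(self):
            super(TestMyBackend, self).setUp()
            # swap SyncFsStore for the backend under test
            self.store_factory = lambda session_id=None: SyncFsStore(self.path, session_id=session_id)

    if __name__ == '__main__':
        TestStoreBase.link(TestMyBackend)   # copies each t_* onto a test_* name
        unittest.main()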

View File

@ -1,5 +1,5 @@
confini~=0.6.0
semver==2.13.0
hexathon~=0.1.5
-chainlib~=0.1.1
+chainlib>=0.1.0b1,<=0.1.0
-shep~=0.2.3
+shep~=0.0.1

View File

@ -8,12 +8,5 @@ for f in `ls tests/*.py`; do
exit
fi
done
-for f in `ls tests/store/*.py`; do
-python $f
-if [ $? -gt 0 ]; then
-exit
-fi
-done
set +x
set +e

View File

@ -1,10 +1,10 @@
[metadata]
name = chainsyncer
-version = 0.4.1
+version = 0.3.0
description = Generic blockchain syncer driver
author = Louis Holbrook
author_email = dev@holbrook.no
-url = https://gitlab.com/chaintool/chainsyncer
+url = https://gitlab.com/chaintools/chainsyncer
keywords =
cryptocurrency
classifiers =
@ -25,17 +25,16 @@ include_package_data = True
python_requires = >= 3.6
packages =
chainsyncer
+chainsyncer.db
+chainsyncer.db.models
+chainsyncer.backend
chainsyncer.driver
chainsyncer.unittest
-chainsyncer.store
-chainsyncer.cli
-chainsyncer.runnable
-#[options.package_data]
-#* =
-# sql/*
-[options.entry_points]
-console_scripts =
-#blocksync-celery = chainsyncer.runnable.tracker:main
-chainsyncer-unlock = chainsyncer.runnable.unlock:main
+[options.package_data]
+* =
+ sql/*
+#[options.entry_points]
+#console_scripts =
+# blocksync-celery = chainsyncer.runnable.tracker:main

View File

@ -26,7 +26,5 @@ setup(
install_requires=requirements,
extras_require={
'sql': sql_requirements,
-'rocksdb': ['shep[rocksdb]~=0.2.2'],
-'redis': ['shep[redis]~=0.2.2'],
}
)

View File

@ -1,33 +0,0 @@
# standard imports
import unittest
import logging
# external imports
from shep import State
# local imports
from chainsyncer.store.mem import SyncMemStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def create(self, session_id=None):
return SyncMemStore(session_id=session_id)
class TestMem(TestStoreBase):
def setUp(self):
super(TestMem, self).setUp()
self.store_factory = StoreFactory().create
self.persist = False
if __name__ == '__main__':
TestStoreBase.link(TestMem)
# Remove tests that test persistence of state
unittest.main()

View File

@ -1,32 +0,0 @@
# standard imports
import unittest
import logging
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def __init__(self, path):
self.path = path
def create(self, session_id=None):
return SyncFsStore(self.path, session_id=session_id)
class TestFs(TestStoreBase):
def setUp(self):
super(TestFs, self).setUp()
self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
TestStoreBase.link(TestFs)
unittest.main()

View File

@ -1,35 +0,0 @@
# standard imports
import unittest
import logging
# local imports
from chainsyncer.store.rocksdb import SyncRocksDbStore
from chainsyncer.unittest.store import (
TestStoreBase,
filter_change_callback,
state_change_callback,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def __init__(self, path):
self.path = path
def create(self, session_id=None):
return SyncRocksDbStore(self.path, session_id=session_id, state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
class TestRocksDb(TestStoreBase):
def setUp(self):
super(TestRocksDb, self).setUp()
self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
TestStoreBase.link(TestRocksDb)
unittest.main()
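
The state_change_callback and filter_change_callback wired in here log every key transition at the numeric STATETRACE level (5) defined by these test modules. A small sketch of additionally registering that level with a name so log records render readably; logging.addLevelName is stdlib, the level value comes from this changeset:

    import logging

    logging.STATETRACE = 5
    logging.addLevelName(logging.STATETRACE, 'STATETRACE')

    logg = logging.getLogger()
    logg.setLevel(logging.STATETRACE)

    def state_change_callback(k, old_state, new_state):
        logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))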

View File

@ -1,75 +1,72 @@
# standard imports
import unittest
-import tempfile
-import shutil
-import logging
+# external imports
+from shep import State
# local imports
+from chainsyncer.state import SyncState
from chainsyncer.session import SyncSession
-from chainsyncer.filter import FilterState
-from chainsyncer.store.fs import SyncFsStore
-from chainsyncer.unittest import (
-MockStore,
-MockFilter,
-)
-logging.basicConfig(level=logging.DEBUG)
-logg = logging.getLogger()
+class MockStore(State):
+def __init__(self, bits):
+super(MockStore, self).__init__(bits, check_alias=False)
+class MockFilter:
+def __init__(self, z, name):
+self.z = z
+self.name = name
+def sum(self):
+return self.z
+def common_name(self):
+return self.name
class TestSync(unittest.TestCase):
def setUp(self):
-self.path = tempfile.mkdtemp()
-self.store = SyncFsStore(self.path)
-def tearDown(self):
-shutil.rmtree(self.path)
+self.store = MockStore(6)
+self.state = SyncState(self.store)
def test_basic(self):
-store = MockStore(6)
-state = FilterState(store)
-session = SyncSession(state)
+session = SyncSession(self.state)
+self.assertTrue(session.is_default)
+session = SyncSession(self.state, session_id='foo')
+self.assertFalse(session.is_default)
def test_sum(self):
-store = MockStore(6)
-state = FilterState(store)
b = b'\x2a' * 32
-fltr = MockFilter('foo', z=b)
-state.register(fltr)
+fltr = MockFilter(b, name='foo')
+self.state.register(fltr)
b = b'\x0d' * 31
-fltr = MockFilter('bar', z=b)
+fltr = MockFilter(b, name='bar')
with self.assertRaises(ValueError):
-state.register(fltr)
+self.state.register(fltr)
b = b'\x0d' * 32
-fltr = MockFilter('bar', z=b)
-state.register(fltr)
-v = state.sum()
+fltr = MockFilter(b, name='bar')
+self.state.register(fltr)
+v = self.state.sum()
self.assertEqual(v.hex(), 'a24abf9fec112b4e0210ae874b4a371f8657b1ee0d923ad6d974aef90bad8550')
def test_session_start(self):
-store = MockStore(6)
-state = FilterState(store)
-session = SyncSession(state)
+session = SyncSession(self.state)
session.start()
-def test_state_dynamic(self):
-store = MockStore()
-state = FilterState(store)
-b = b'\x0d' * 32
-fltr = MockFilter(name='foo', z=b)
-state.register(fltr)
if __name__ == '__main__':
unittest.main()
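
The ValueError asserted in test_sum is triggered by the 31-byte digest; registration evidently requires exactly 32 bytes. A standalone re-creation of that validation rule, a sketch assuming length is the only check performed:

    def check_digest(z):
        if len(z) != 32:
            raise ValueError('filter digest must be 32 bytes, got {}'.format(len(z)))
        return z

    check_digest(b'\x2a' * 32)      # accepted
    try:
        check_digest(b'\x0d' * 31)  # rejected, as in test_sum
    except ValueError as e:
        print(e)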

View File

@ -1,61 +0,0 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import stat
import os
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import (
MockBlockGenerator,
MockFilter,
MockChainInterfaceConn,
MockTx,
MockBlock,
MockChainInterface,
MockFilterError,
)
from chainsyncer.driver.chain_interface import ChainInterfaceDriver
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestFilter(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
self.store = SyncFsStore(self.path)
self.ifc = MockChainInterface()
self.conn = MockChainInterfaceConn(self.ifc)
def tearDown(self):
shutil.rmtree(self.path)
def test_driver(self):
generator = MockBlockGenerator()
generator.generate([1, 2], driver=self.conn)
drv = ChainInterfaceDriver(self.store, self.ifc, target=1)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
with self.assertRaises(SyncDone):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 3)
if __name__ == '__main__':
unittest.main()
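
The expected count of 3 implies the sync target is inclusive: generate([1, 2]) builds block 0 with one tx and block 1 with two, and a run with target=1 filters all of them. The arithmetic:

    spec = [1, 2]
    target = 1
    processed = sum(spec[:target + 1])   # target block is included
    assert processed == 3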

View File

@ -1,78 +0,0 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import stat
import os
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
)
from chainsyncer.unittest import (
MockFilter,
MockConn,
MockTx,
MockBlock,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestFilter(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
self.store = SyncFsStore(self.path)
self.session = SyncSession(self.store)
self.conn = MockConn()
def tearDown(self):
shutil.rmtree(self.path)
def test_filter_basic(self):
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
fltr_two = MockFilter('bar')
self.store.register(fltr_two)
self.session.start()
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(13, [tx_hash])
self.session.filter(self.conn, block, tx)
self.assertEqual(len(fltr_one.contents), 1)
self.assertEqual(len(fltr_two.contents), 1)
def test_filter_interrupt(self):
fltr_one = MockFilter('foo', brk=True)
self.store.register(fltr_one)
fltr_two = MockFilter('bar')
self.store.register(fltr_two)
self.session.start()
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(13, [tx_hash])
self.session.filter(self.conn, block, tx)
self.assertEqual(len(fltr_one.contents), 1)
self.assertEqual(len(fltr_two.contents), 0)
if __name__ == '__main__':
unittest.main()
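
Judging from the mocks used here, a production filter needs the same three members the session touches: common_name() for registration, a 32-byte sum() digest, and filter(conn, block, tx) returning True only to interrupt the chain. A hedged sketch; LoggingFilter is illustrative, not part of the package:

    import hashlib

    class LoggingFilter:
        def common_name(self):
            return 'logging_filter'

        def sum(self):
            # 32-byte digest; register() rejects other lengths
            return hashlib.sha256(self.common_name().encode('utf-8')).digest()

        def filter(self, conn, block, tx):
            print('saw block {} tx {}'.format(block.number, tx.hash))
            return False    # False lets the next registered filter run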

View File

@ -1,155 +0,0 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import stat
import os
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import (
MockBlockGenerator,
MockFilter,
MockConn,
MockTx,
MockBlock,
MockDriver,
MockFilterError,
state_event_handler,
filter_state_event_handler,
)
from chainsyncer.driver import SyncDriver
logging.basicConfig(level=logging.STATETRACE)
logg = logging.getLogger()
class TestFilter(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
self.store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
self.conn = MockConn()
# def tearDown(self):
# shutil.rmtree(self.path)
def test_filter_basic(self):
session = SyncSession(self.store)
session.start(target=1)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(0, [tx_hash])
session.filter(self.conn, block, tx)
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(1, [tx_hash])
session.filter(self.conn, block, tx)
self.assertEqual(len(fltr_one.contents), 2)
def test_driver(self):
drv = MockDriver(self.store, target=1)
generator = MockBlockGenerator()
generator.generate([1, 2], driver=drv)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
with self.assertRaises(SyncDone):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 3)
def test_driver_interrupt_noresume(self):
drv = MockDriver(self.store, target=1)
generator = MockBlockGenerator()
generator.generate([1], driver=drv)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
fltr_two = MockFilter('bar', brk_hard=1)
self.store.register(fltr_two)
with self.assertRaises(MockFilterError):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 1)
self.assertEqual(len(fltr_two.contents), 0)
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
fltr_one = MockFilter('foo') #, brk_hard=1)
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
with self.assertRaises(LockError):
drv = MockDriver(store, target=1)
self.assertEqual(len(fltr_one.contents), 0)
self.assertEqual(len(fltr_two.contents), 0)
def test_driver_interrupt_filter(self):
drv = MockDriver(self.store, target=1)
generator = MockBlockGenerator()
generator.generate([1, 1], driver=drv)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
fltr_two = MockFilter('bar', brk=1)
self.store.register(fltr_two)
fltr_three = MockFilter('baz')
self.store.register(fltr_three)
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
with self.assertRaises(SyncDone):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 2)
self.assertEqual(len(fltr_two.contents), 2)
self.assertEqual(len(fltr_three.contents), 1)
def test_driver_interrupt_sync(self):
drv = MockDriver(self.store, interrupt_block=1, target=2)
generator = MockBlockGenerator()
generator.generate([3, 1, 2], driver=drv)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
drv.run(self.conn, interval=0.1)
self.assertEqual(len(fltr_one.contents), 3)
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
store.register(fltr_one)
drv = MockDriver(store)
generator.apply(drv, offset=1)
with self.assertRaises(SyncDone) as e:
drv.run(self.conn, interval=0.1)
self.assertEqual(e.exception.args[0], 2)
self.assertEqual(len(fltr_one.contents), 6)
if __name__ == '__main__':
unittest.main()
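
The two assertions in test_driver_interrupt_sync follow from the generator spec: generate([3, 1, 2]) yields blocks 0..2 with 3, 1 and 2 txs respectively. The first run interrupts on reaching block 1, having filtered only block 0; the resumed run replays from offset 1. The counts:

    spec = [3, 1, 2]
    first_run = sum(spec[:1])       # block 0 only: 3
    resumed = sum(spec[1:])         # blocks 1 and 2 after resume: 3
    assert first_run == 3
    assert first_run + resumed == 6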