Compare commits
No commits in common. "master" and "2ba87de195a8e5facb6cd32be246075e4b55fcd9" have entirely different histories.
master
...
2ba87de195
23
CHANGELOG
23
CHANGELOG
@ -1,19 +1,4 @@
|
||||
* 0.3.7
|
||||
- Remove hard eth dependency in settings rendering
|
||||
- Add unlock cli tool
|
||||
* 0.3.6
|
||||
- Add cli arg processing and settings renderer
|
||||
* 0.3.5
|
||||
- Allow memory-only shep if factory set to None in store constructor
|
||||
* 0.3.4
|
||||
- Use explicit bool check in filter interrupt check
|
||||
* 0.3.3
|
||||
- Include shep persistent state bootstrap sync
|
||||
- Add chainsyncer extras
|
||||
* 0.3.2
|
||||
- Implement rocksdb backend
|
||||
* 0.3.1
|
||||
- Upgrade to release shep version
|
||||
- Move sync state to SYNC after start
|
||||
* 0.3.0
|
||||
- Re-implement chainsyncer on shep
|
||||
- 0.3.0
|
||||
* Re-implement filter state handling on shep
|
||||
- 0.1.0
|
||||
* Add deferred idle handling
|
||||
|
@ -1 +1 @@
|
||||
include *requirements.txt LICENSE.txt chainsyncer/data/config/*
|
||||
include *requirements.txt LICENSE.txt chainsyncer/db/migrations/default/* chainsyncer/db/migrations/default/versions/* chainsyncer/db/migrations/default/versions/src/*
|
||||
|
@ -1,12 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
|
||||
# local imports
|
||||
from .base import *
|
||||
from .arg import process_flags
|
||||
from .config import process_config
|
||||
|
||||
|
||||
__script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
|
||||
config_dir = os.path.join(data_dir, 'config')
|
@ -1,14 +0,0 @@
|
||||
# local imports
|
||||
from .base import SyncFlag
|
||||
|
||||
|
||||
def process_flags(argparser, flags):
|
||||
|
||||
if flags & SyncFlag.RANGE > 0:
|
||||
argparser.add_argument('--offset', type=int, help='Block to start sync from. Default is start of history (0).')
|
||||
argparser.add_argument('--until', type=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
|
||||
if flags & SyncFlag.HEAD > 0:
|
||||
argparser.add_argument('--head', action='store_true', help='Start from latest block as offset')
|
||||
argparser.add_argument('--keep-alive', action='store_true', help='Do not stop syncing when caught up')
|
||||
|
||||
argparser.add_argument('--backend', type=str, help='Backend to use for state store')
|
@ -1,7 +0,0 @@
|
||||
# standard imports
|
||||
import enum
|
||||
|
||||
|
||||
class SyncFlag(enum.IntEnum):
|
||||
RANGE = 1
|
||||
HEAD = 2
|
@ -1,20 +0,0 @@
|
||||
# external imports
|
||||
from chainsyncer.cli import SyncFlag
|
||||
|
||||
|
||||
def process_config(config, args, flags):
|
||||
args_override = {}
|
||||
|
||||
args_override['SYNCER_BACKEND'] = getattr(args, 'backend')
|
||||
|
||||
if flags & SyncFlag.RANGE:
|
||||
args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
|
||||
args_override['SYNCER_LIMIT'] = getattr(args, 'until')
|
||||
|
||||
config.dict_override(args_override, 'local cli args')
|
||||
|
||||
if flags & SyncFlag.HEAD:
|
||||
config.add(getattr(args, 'keep_alive'), '_KEEP_ALIVE')
|
||||
config.add(getattr(args, 'head'), '_HEAD')
|
||||
|
||||
return config
|
@ -1,4 +0,0 @@
|
||||
[syncer]
|
||||
offset = 0
|
||||
limit = 0
|
||||
backend = mem
|
@ -1 +1 @@
|
||||
from .base import SyncDriver
|
||||
from .base import Syncer
|
||||
|
@ -1,21 +1,48 @@
|
||||
# standard imports
|
||||
import uuid
|
||||
import logging
|
||||
import time
|
||||
import signal
|
||||
import json
|
||||
|
||||
# external imports
|
||||
from chainlib.error import JSONRPCException
|
||||
|
||||
# local imports
|
||||
from chainsyncer.filter import SyncFilter
|
||||
from chainsyncer.error import (
|
||||
SyncDone,
|
||||
NoBlockForYou,
|
||||
)
|
||||
from chainsyncer.session import SyncSession
|
||||
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
NS_DIV = 1000000000
|
||||
|
||||
class SyncDriver:
|
||||
def noop_callback(block, tx):
|
||||
"""Logger-only callback for pre- and post processing.
|
||||
|
||||
:param block: Block object
|
||||
:type block: chainlib.block.Block
|
||||
:param tx: Transaction object
|
||||
:type tx: chainlib.tx.Tx
|
||||
"""
|
||||
logg.debug('noop callback ({},{})'.format(block, tx))
|
||||
|
||||
|
||||
class Syncer:
|
||||
"""Base class for syncer implementations.
|
||||
|
||||
:param backend: Syncer state backend
|
||||
:type backend: chainsyncer.backend.base.Backend implementation
|
||||
:param chain_interface: Chain interface implementation
|
||||
:type chain_interface: chainlib.interface.ChainInterface implementation
|
||||
:param pre_callback: Function to call before polling. Function will receive no arguments.
|
||||
:type pre_callback: function
|
||||
:param block_callback: Function to call before processing txs in a retrieved block. Function should have signature as chainsyncer.driver.base.noop_callback
|
||||
:type block_callback: function
|
||||
:param post_callback: Function to call after polling. Function will receive no arguments.
|
||||
:type post_callback: function
|
||||
"""
|
||||
|
||||
running_global = True
|
||||
"""If set to false syncer will terminate polling loop."""
|
||||
@ -28,22 +55,19 @@ class SyncDriver:
|
||||
name = 'base'
|
||||
"""Syncer name, to be overriden for each extended implementation."""
|
||||
|
||||
|
||||
def __init__(self, store, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
|
||||
self.store = store
|
||||
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
|
||||
self.chain_interface = chain_interface
|
||||
self.cursor = None
|
||||
self.running = True
|
||||
self.backend = backend
|
||||
self.filter = SyncFilter(backend)
|
||||
self.block_callback = block_callback
|
||||
self.pre_callback = pre_callback
|
||||
self.post_callback = post_callback
|
||||
self.block_callback = block_callback
|
||||
self.idle_callback = idle_callback
|
||||
self.last_start = 0
|
||||
self.clock_id = time.CLOCK_MONOTONIC_RAW
|
||||
self.store.connect()
|
||||
self.store.start(offset=offset, target=target)
|
||||
if not SyncDriver.signal_set:
|
||||
for sig in SyncDriver.signal_request:
|
||||
if not Syncer.signal_set:
|
||||
for sig in Syncer.signal_request:
|
||||
signal.signal(sig, self.__sig_terminate)
|
||||
SyncDriver.signal_set = True
|
||||
Syncer.signal_set = True
|
||||
|
||||
|
||||
def __sig_terminate(self, sig, frame):
|
||||
@ -55,90 +79,48 @@ class SyncDriver:
|
||||
"""Set syncer to terminate as soon as possible.
|
||||
"""
|
||||
logg.info('termination requested!')
|
||||
SyncDriver.running_global = False
|
||||
Syncer.running_global = False
|
||||
self.running = False
|
||||
|
||||
|
||||
def run(self, conn, interval=1):
|
||||
while self.running_global:
|
||||
self.session = SyncSession(self.store)
|
||||
item = self.session.start()
|
||||
if item == None:
|
||||
self.running = False
|
||||
self.running_global = False
|
||||
break
|
||||
self.loop(conn, item, interval=interval)
|
||||
def add_filter(self, f):
|
||||
"""Add filter to be processed for each transaction.
|
||||
|
||||
|
||||
def idle(self, interval):
|
||||
interval *= NS_DIV
|
||||
idle_start = time.clock_gettime_ns(self.clock_id)
|
||||
delta = idle_start - self.last_start
|
||||
if delta > interval:
|
||||
interval /= NS_DIV
|
||||
time.sleep(interval)
|
||||
return
|
||||
|
||||
if self.idle_callback != None:
|
||||
r = True
|
||||
while r:
|
||||
before = time.clock_gettime_ns(self.clock_id)
|
||||
r = self.idle_callback(interval)
|
||||
after = time.clock_gettime_ns(self.clock_id)
|
||||
delta = after - before
|
||||
if delta < 0:
|
||||
return
|
||||
interval -= delta
|
||||
if interval < 0:
|
||||
return
|
||||
|
||||
interval /= NS_DIV
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def loop(self, conn, item, interval=1):
|
||||
logg.debug('started loop')
|
||||
while self.running and SyncDriver.running_global:
|
||||
self.last_start = time.clock_gettime_ns(self.clock_id)
|
||||
|
||||
if self.pre_callback != None:
|
||||
self.pre_callback()
|
||||
|
||||
while True and self.running:
|
||||
try:
|
||||
block = self.get(conn, item)
|
||||
except SyncDone as e:
|
||||
logg.info('all blocks sumitted for processing: {}'.format(e))
|
||||
return
|
||||
except NoBlockForYou as e:
|
||||
break
|
||||
if self.block_callback != None:
|
||||
self.block_callback(block, None)
|
||||
|
||||
try:
|
||||
self.process(conn, item, block)
|
||||
except IndexError:
|
||||
item.next(advance_block=True)
|
||||
time.sleep(self.yield_delay)
|
||||
|
||||
if self.store.target > -1 and block.number >= self.store.target:
|
||||
self.running = False
|
||||
|
||||
if self.post_callback != None:
|
||||
self.post_callback()
|
||||
|
||||
|
||||
self.idle(interval)
|
||||
:param f: Filter
|
||||
:type f: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter
|
||||
"""
|
||||
self.filter.add(f)
|
||||
self.backend.register_filter(str(f))
|
||||
|
||||
|
||||
def process_single(self, conn, block, tx):
|
||||
self.session.filter(conn, block, tx)
|
||||
"""Set syncer backend cursor to the given transaction index and block height, and apply all registered filters on transaction.
|
||||
|
||||
:param conn: RPC connection instance
|
||||
:type conn: chainlib.connection.RPCConnection
|
||||
:param block: Block object
|
||||
:type block: chainlib.block.Block
|
||||
:param block: Transaction object
|
||||
:type block: chainlib.tx.Tx
|
||||
"""
|
||||
self.backend.set(block.number, tx.index)
|
||||
self.filter.apply(conn, block, tx)
|
||||
|
||||
|
||||
def process(self, conn, item, block):
|
||||
def loop(self, interval, conn):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def process(self, conn, block):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return 'syncer "{}" {}'.format(
|
||||
self.name,
|
||||
self.backend,
|
||||
)
|
||||
|
@ -1,57 +0,0 @@
|
||||
# external imports
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from chainsyncer.error import NoBlockForYou
|
||||
from chainsyncer.driver import SyncDriver
|
||||
|
||||
|
||||
class ChainInterfaceDriver(SyncDriver):
|
||||
|
||||
def __init__(self, store, chain_interface, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
|
||||
super(ChainInterfaceDriver, self).__init__(store, offset=offset, target=target, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_callback, idle_callback=idle_callback)
|
||||
self.chain_interface = chain_interface
|
||||
|
||||
|
||||
def get(self, conn, item):
|
||||
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
|
||||
|
||||
:param conn: RPC connection
|
||||
:type conn: chainlib.connectin.RPCConnection
|
||||
:raises NoBlockForYou: Block at the given height does not exist
|
||||
:rtype: chainlib.block.Block
|
||||
:returns: Block object
|
||||
"""
|
||||
o = self.chain_interface.block_by_number(item.cursor)
|
||||
try:
|
||||
r = conn.do(o)
|
||||
except RPCException:
|
||||
r = None
|
||||
if r == None:
|
||||
raise NoBlockForYou()
|
||||
b = self.chain_interface.block_from_src(r)
|
||||
b.txs = b.txs[item.tx_cursor:]
|
||||
|
||||
return b
|
||||
|
||||
|
||||
def process(self, conn, item, block):
|
||||
tx_src = None
|
||||
i = item.tx_cursor
|
||||
while True:
|
||||
# handle block objects regardless of whether the tx data is embedded or not
|
||||
try:
|
||||
tx = block.tx(i)
|
||||
except AttributeError:
|
||||
tx_hash = block.txs[i]
|
||||
o = self.chain_interface.tx_by_hash(tx_hash, block=block)
|
||||
r = conn.do(o)
|
||||
#tx = self.chain_interface.tx_from_src(tx_src, block=block)
|
||||
|
||||
rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash))
|
||||
if rcpt != None:
|
||||
tx.apply_receipt(self.chain_interface.src_normalize(rcpt))
|
||||
|
||||
self.process_single(conn, block, tx)
|
||||
|
||||
i += 1
|
86
chainsyncer/driver/head.py
Normal file
86
chainsyncer/driver/head.py
Normal file
@ -0,0 +1,86 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from chainlib.eth.tx import (
|
||||
transaction,
|
||||
Tx,
|
||||
)
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from chainsyncer.error import NoBlockForYou
|
||||
from .poll import BlockPollSyncer
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
class HeadSyncer(BlockPollSyncer):
|
||||
"""Extends the block poller, implementing an open-ended syncer.
|
||||
"""
|
||||
|
||||
name = 'head'
|
||||
|
||||
def process(self, conn, block):
|
||||
"""Process a single block using the given RPC connection.
|
||||
|
||||
Processing means that all filters are executed on all transactions in the block.
|
||||
|
||||
If the block object does not contain the transaction details, the details will be retrieved from the network (incurring the corresponding performance penalty).
|
||||
|
||||
:param conn: RPC connection
|
||||
:type conn: chainlib.connection.RPCConnection
|
||||
:param block: Block object
|
||||
:type block: chainlib.block.Block
|
||||
"""
|
||||
(pair, fltr) = self.backend.get()
|
||||
logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
|
||||
i = pair[1] # set tx index from previous
|
||||
tx_src = None
|
||||
while True:
|
||||
# handle block objects regardless of whether the tx data is embedded or not
|
||||
try:
|
||||
tx = block.tx(i)
|
||||
except AttributeError:
|
||||
o = transaction(block.txs[i])
|
||||
r = conn.do(o)
|
||||
tx_src = Tx.src_normalize(r)
|
||||
tx = self.chain_interface.tx_from_src(tx_src, block=block)
|
||||
|
||||
|
||||
#except IndexError as e:
|
||||
# logg.debug('index error syncer tx get {}'.format(e))
|
||||
# break
|
||||
|
||||
rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash))
|
||||
if rcpt != None:
|
||||
tx.apply_receipt(self.chain_interface.src_normalize(rcpt))
|
||||
|
||||
self.process_single(conn, block, tx)
|
||||
self.backend.reset_filter()
|
||||
|
||||
i += 1
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
|
||||
|
||||
:param conn: RPC connection
|
||||
:type conn: chainlib.connectin.RPCConnection
|
||||
:raises NoBlockForYou: Block at the given height does not exist
|
||||
:rtype: chainlib.block.Block
|
||||
:returns: Block object
|
||||
"""
|
||||
(height, flags) = self.backend.get()
|
||||
block_number = height[0]
|
||||
block_hash = []
|
||||
o = self.chain_interface.block_by_number(block_number)
|
||||
try:
|
||||
r = conn.do(o)
|
||||
except RPCException:
|
||||
r = None
|
||||
if r == None:
|
||||
raise NoBlockForYou()
|
||||
b = self.chain_interface.block_from_src(r)
|
||||
b.txs = b.txs[height[1]:]
|
||||
|
||||
return b
|
56
chainsyncer/driver/history.py
Normal file
56
chainsyncer/driver/history.py
Normal file
@ -0,0 +1,56 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from .head import HeadSyncer
|
||||
from chainsyncer.error import SyncDone
|
||||
from chainlib.error import RPCException
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HistorySyncer(HeadSyncer):
|
||||
"""Bounded syncer implementation of the block poller. Reuses the head syncer process method implementation.
|
||||
|
||||
|
||||
"""
|
||||
name = 'history'
|
||||
|
||||
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
|
||||
super(HeadSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
|
||||
self.block_target = None
|
||||
(block_number, flags) = self.backend.target()
|
||||
if block_number == None:
|
||||
raise AttributeError('backend has no future target. Use HeadSyner instead')
|
||||
self.block_target = block_number
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
|
||||
|
||||
:param conn: RPC connection
|
||||
:type conn: chainlib.connectin.RPCConnection
|
||||
:raises SyncDone: Block target reached (at which point the syncer should terminate).
|
||||
:rtype: chainlib.block.Block
|
||||
:returns: Block object
|
||||
:todo: DRY against HeadSyncer
|
||||
"""
|
||||
(height, flags) = self.backend.get()
|
||||
if self.block_target < height[0]:
|
||||
raise SyncDone(self.block_target)
|
||||
block_number = height[0]
|
||||
block_hash = []
|
||||
o = self.chain_interface.block_by_number(block_number, include_tx=True)
|
||||
try:
|
||||
r = conn.do(o)
|
||||
# TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future
|
||||
except RPCException:
|
||||
r = None
|
||||
if r == None:
|
||||
raise SyncDone()
|
||||
b = self.chain_interface.block_from_src(r)
|
||||
|
||||
return b
|
99
chainsyncer/driver/poll.py
Normal file
99
chainsyncer/driver/poll.py
Normal file
@ -0,0 +1,99 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import time
|
||||
|
||||
# local imports
|
||||
from .base import Syncer
|
||||
from chainsyncer.error import (
|
||||
SyncDone,
|
||||
NoBlockForYou,
|
||||
)
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
NS_DIV = 1000000000
|
||||
|
||||
class BlockPollSyncer(Syncer):
|
||||
"""Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling.
|
||||
"""
|
||||
|
||||
name = 'blockpoll'
|
||||
|
||||
|
||||
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, idle_callback=None):
|
||||
super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback=pre_callback, block_callback=block_callback, post_callback=post_callback)
|
||||
self.idle_callback = idle_callback
|
||||
self.last_start = 0
|
||||
self.clock_id = time.CLOCK_MONOTONIC_RAW
|
||||
|
||||
|
||||
def idle(self, interval):
|
||||
interval *= NS_DIV
|
||||
idle_start = time.clock_gettime_ns(self.clock_id)
|
||||
delta = idle_start - self.last_start
|
||||
if delta > interval:
|
||||
interval /= NS_DIV
|
||||
time.sleep(interval)
|
||||
return
|
||||
|
||||
if self.idle_callback != None:
|
||||
r = True
|
||||
while r:
|
||||
before = time.clock_gettime_ns(self.clock_id)
|
||||
r = self.idle_callback(interval)
|
||||
after = time.clock_gettime_ns(self.clock_id)
|
||||
delta = after - before
|
||||
if delta < 0:
|
||||
return
|
||||
interval -= delta
|
||||
if interval < 0:
|
||||
return
|
||||
|
||||
interval /= NS_DIV
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
"""Indefinite loop polling the given RPC connection for new blocks in the given interval.
|
||||
|
||||
:param interval: Seconds to wait for next poll after processing of previous poll has been completed.
|
||||
:type interval: int
|
||||
:param conn: RPC connection
|
||||
:type conn: chainlib.connection.RPCConnection
|
||||
:rtype: tuple
|
||||
:returns: See chainsyncer.backend.base.Backend.get
|
||||
"""
|
||||
(pair, fltr) = self.backend.get()
|
||||
start_tx = pair[1]
|
||||
|
||||
|
||||
while self.running and Syncer.running_global:
|
||||
self.last_start = time.clock_gettime_ns(self.clock_id)
|
||||
if self.pre_callback != None:
|
||||
self.pre_callback()
|
||||
while True and self.running:
|
||||
if start_tx > 0:
|
||||
start_tx -= 1
|
||||
continue
|
||||
try:
|
||||
block = self.get(conn)
|
||||
except SyncDone as e:
|
||||
logg.info('all blocks sumitted for processing: {}'.format(e))
|
||||
return self.backend.get()
|
||||
except NoBlockForYou as e:
|
||||
break
|
||||
if self.block_callback != None:
|
||||
self.block_callback(block, None)
|
||||
|
||||
last_block = block
|
||||
try:
|
||||
self.process(conn, block)
|
||||
except IndexError:
|
||||
self.backend.set(block.number + 1, 0)
|
||||
start_tx = 0
|
||||
time.sleep(self.yield_delay)
|
||||
if self.post_callback != None:
|
||||
self.post_callback()
|
||||
|
||||
self.idle(interval)
|
133
chainsyncer/driver/thread.py
Normal file
133
chainsyncer/driver/thread.py
Normal file
@ -0,0 +1,133 @@
|
||||
# standard imports
|
||||
import logging
|
||||
#import threading
|
||||
import multiprocessing
|
||||
import queue
|
||||
|
||||
# external imports
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from .history import HistorySyncer
|
||||
from chainsyncer.error import SyncDone
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
class ThreadedHistorySyncer(HistorySyncer):
|
||||
|
||||
def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0):
|
||||
super(ThreadedHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
|
||||
self.workers = []
|
||||
if conn_limit == 0:
|
||||
conn_limit = thread_limit
|
||||
#self.conn_pool = queue.Queue(conn_limit)
|
||||
#self.queue = queue.Queue(thread_limit)
|
||||
#self.quit_queue = queue.Queue(1)
|
||||
self.conn_pool = multiprocessing.Queue(conn_limit)
|
||||
self.queue = multiprocessing.Queue(thread_limit)
|
||||
self.quit_queue = multiprocessing.Queue(1)
|
||||
#self.lock = threading.Lock()
|
||||
self.lock = multiprocessing.Lock()
|
||||
for i in range(thread_limit):
|
||||
#w = threading.Thread(target=self.worker)
|
||||
w = multiprocessing.Process(target=self.worker)
|
||||
self.workers.append(w)
|
||||
|
||||
for i in range(conn_limit):
|
||||
self.conn_pool.put(conn_factory())
|
||||
|
||||
|
||||
def terminate(self):
|
||||
self.quit_queue.put(())
|
||||
super(ThreadedHistorySyncer, self).terminate()
|
||||
|
||||
|
||||
def worker(self):
|
||||
while True:
|
||||
block_number = None
|
||||
try:
|
||||
block_number = self.queue.get(timeout=0.01)
|
||||
except queue.Empty:
|
||||
if self.quit_queue.qsize() > 0:
|
||||
#logg.debug('{} received quit'.format(threading.current_thread().getName()))
|
||||
logg.debug('{} received quit'.format(multiprocessing.current_process().name))
|
||||
return
|
||||
continue
|
||||
conn = self.conn_pool.get()
|
||||
try:
|
||||
logg.debug('processing parent {} {}'.format(conn, block_number))
|
||||
self.process_parent(conn, block_number)
|
||||
except IndexError:
|
||||
pass
|
||||
except RPCException as e:
|
||||
logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
|
||||
self.queue.put(block_number)
|
||||
conn = self.conn_pool.put(conn)
|
||||
|
||||
|
||||
def process_parent(self, conn, block_number):
|
||||
logg.debug('getting block {}'.format(block_number))
|
||||
o = self.chain_interface.block_by_number(block_number)
|
||||
r = conn.do(o)
|
||||
block = self.chain_interface.block_from_src(r)
|
||||
logg.debug('got block typ {}'.format(type(block)))
|
||||
super(ThreadedHistorySyncer, self).process(conn, block)
|
||||
|
||||
|
||||
def process_single(self, conn, block, tx):
|
||||
self.filter.apply(conn, block, tx)
|
||||
|
||||
|
||||
def process(self, conn, block):
|
||||
pass
|
||||
|
||||
|
||||
#def process(self, conn, block):
|
||||
def get(self, conn):
|
||||
if not self.running:
|
||||
raise SyncDone()
|
||||
|
||||
block_number = None
|
||||
tx_index = None
|
||||
flags = None
|
||||
((block_number, tx_index), flags) = self.backend.get()
|
||||
try:
|
||||
#logg.debug('putting {}'.format(block.number))
|
||||
#self.queue.put((conn, block_number,), timeout=0.1)
|
||||
self.queue.put(block_number, timeout=0.1)
|
||||
except queue.Full:
|
||||
#logg.debug('queue full, try again')
|
||||
return
|
||||
|
||||
target, flags = self.backend.target()
|
||||
next_block = block_number + 1
|
||||
if next_block > target:
|
||||
self.quit_queue.put(())
|
||||
raise SyncDone()
|
||||
self.backend.set(self.backend.block_height + 1, 0)
|
||||
|
||||
|
||||
# def get(self, conn):
|
||||
# try:
|
||||
# r = super(ThreadedHistorySyncer, self).get(conn)
|
||||
# return r
|
||||
# except SyncDone as e:
|
||||
# self.quit_queue.put(())
|
||||
# raise e
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
for w in self.workers:
|
||||
w.start()
|
||||
r = super(ThreadedHistorySyncer, self).loop(interval, conn)
|
||||
for w in self.workers:
|
||||
w.join()
|
||||
while True:
|
||||
try:
|
||||
self.quit_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
logg.info('workers done {}'.format(r))
|
170
chainsyncer/driver/threadpool.py
Normal file
170
chainsyncer/driver/threadpool.py
Normal file
@ -0,0 +1,170 @@
|
||||
# standard imports
|
||||
import logging
|
||||
#import threading
|
||||
import multiprocessing
|
||||
import queue
|
||||
import time
|
||||
|
||||
# external imports
|
||||
from chainlib.error import RPCException
|
||||
|
||||
# local imports
|
||||
from .history import HistorySyncer
|
||||
from chainsyncer.error import SyncDone
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def foobarcb(v):
|
||||
logg.debug('foooz {}'.format(v))
|
||||
|
||||
|
||||
class ThreadPoolTask:
|
||||
|
||||
process_func = None
|
||||
chain_interface = None
|
||||
|
||||
def poolworker(self, block_number, conn):
|
||||
# conn = args[1].get()
|
||||
try:
|
||||
logg.debug('processing parent {} {}'.format(conn, block_number))
|
||||
#self.process_parent(self.conn, block_number)
|
||||
self.process_parent(conn, block_number)
|
||||
except IndexError:
|
||||
pass
|
||||
except RPCException as e:
|
||||
logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
|
||||
raise e
|
||||
#self.queue.put(block_number)
|
||||
# conn = self.conn_pool.put(conn)
|
||||
|
||||
def process_parent(self, conn, block_number):
|
||||
logg.debug('getting block {}'.format(block_number))
|
||||
o = self.chain_interface.block_by_number(block_number)
|
||||
r = conn.do(o)
|
||||
block = self.chain_interface.block_from_src(r)
|
||||
logg.debug('got block typ {}'.format(type(block)))
|
||||
#super(ThreadedHistorySyncer, self).process(conn, block)
|
||||
self.process_func(conn, block)
|
||||
|
||||
|
||||
|
||||
class ThreadPoolHistorySyncer(HistorySyncer):
|
||||
|
||||
def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0):
|
||||
super(ThreadPoolHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
|
||||
self.workers = []
|
||||
self.thread_limit = thread_limit
|
||||
if conn_limit == 0:
|
||||
self.conn_limit = self.thread_limit
|
||||
#self.conn_pool = queue.Queue(conn_limit)
|
||||
#self.queue = queue.Queue(thread_limit)
|
||||
#self.quit_queue = queue.Queue(1)
|
||||
#self.conn_pool = multiprocessing.Queue(conn_limit)
|
||||
#self.queue = multiprocessing.Queue(thread_limit)
|
||||
#self.quit_queue = multiprocessing.Queue(1)
|
||||
#self.lock = threading.Lock()
|
||||
#self.lock = multiprocessing.Lock()
|
||||
ThreadPoolTask.process_func = super(ThreadPoolHistorySyncer, self).process
|
||||
ThreadPoolTask.chain_interface = chain_interface
|
||||
#for i in range(thread_limit):
|
||||
#w = threading.Thread(target=self.worker)
|
||||
# w = multiprocessing.Process(target=self.worker)
|
||||
# self.workers.append(w)
|
||||
|
||||
#for i in range(conn_limit):
|
||||
# self.conn_pool.put(conn_factory())
|
||||
self.conn_factory = conn_factory
|
||||
self.worker_pool = None
|
||||
|
||||
|
||||
def terminate(self):
|
||||
#self.quit_queue.put(())
|
||||
super(ThreadPoolHistorySyncer, self).terminate()
|
||||
|
||||
|
||||
# def worker(self):
|
||||
# while True:
|
||||
# block_number = None
|
||||
# try:
|
||||
# block_number = self.queue.get(timeout=0.01)
|
||||
# except queue.Empty:
|
||||
# if self.quit_queue.qsize() > 0:
|
||||
# #logg.debug('{} received quit'.format(threading.current_thread().getName()))
|
||||
# logg.debug('{} received quit'.format(multiprocessing.current_process().name))
|
||||
# return
|
||||
# continue
|
||||
# conn = self.conn_pool.get()
|
||||
# try:
|
||||
# logg.debug('processing parent {} {}'.format(conn, block_number))
|
||||
# self.process_parent(conn, block_number)
|
||||
# except IndexError:
|
||||
# pass
|
||||
# except RPCException as e:
|
||||
# logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
|
||||
# self.queue.put(block_number)
|
||||
# conn = self.conn_pool.put(conn)
|
||||
#
|
||||
|
||||
|
||||
def process_single(self, conn, block, tx):
|
||||
self.filter.apply(conn, block, tx)
|
||||
|
||||
|
||||
def process(self, conn, block):
|
||||
pass
|
||||
|
||||
|
||||
def get(self, conn):
|
||||
if not self.running:
|
||||
raise SyncDone()
|
||||
|
||||
block_number = None
|
||||
tx_index = None
|
||||
flags = None
|
||||
((block_number, tx_index), flags) = self.backend.get()
|
||||
#try:
|
||||
#logg.debug('putting {}'.format(block.number))
|
||||
#self.queue.put((conn, block_number,), timeout=0.1)
|
||||
#self.queue.put(block_number, timeout=0.1)
|
||||
#except queue.Full:
|
||||
#logg.debug('queue full, try again')
|
||||
# return
|
||||
task = ThreadPoolTask()
|
||||
conn = self.conn_factory()
|
||||
self.worker_pool.apply_async(task.poolworker, (block_number, conn,), {}, foobarcb)
|
||||
|
||||
target, flags = self.backend.target()
|
||||
next_block = block_number + 1
|
||||
if next_block > target:
|
||||
#self.quit_queue.put(())
|
||||
self.worker_pool.close()
|
||||
raise SyncDone()
|
||||
self.backend.set(self.backend.block_height + 1, 0)
|
||||
|
||||
|
||||
# def get(self, conn):
|
||||
# try:
|
||||
# r = super(ThreadedHistorySyncer, self).get(conn)
|
||||
# return r
|
||||
# except SyncDone as e:
|
||||
# self.quit_queue.put(())
|
||||
# raise e
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
self.worker_pool = multiprocessing.Pool(self.thread_limit)
|
||||
#for w in self.workers:
|
||||
# w.start()
|
||||
r = super(ThreadPoolHistorySyncer, self).loop(interval, conn)
|
||||
#for w in self.workers:
|
||||
# w.join()
|
||||
#while True:
|
||||
# try:
|
||||
# self.quit_queue.get_nowait()
|
||||
# except queue.Empty:
|
||||
# break
|
||||
time.sleep(1)
|
||||
self.worker_pool.join()
|
||||
|
||||
logg.info('workers done {}'.format(r))
|
81
chainsyncer/driver/threadrange.py
Normal file
81
chainsyncer/driver/threadrange.py
Normal file
@ -0,0 +1,81 @@
|
||||
# standard imports
|
||||
import copy
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
|
||||
# external iports
|
||||
from chainlib.eth.connection import RPCConnection
|
||||
# local imports
|
||||
from chainsyncer.driver.history import HistorySyncer
|
||||
from chainsyncer.driver.base import Syncer
|
||||
from .threadpool import ThreadPoolTask
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def sync_split(block_offset, block_target, count):
|
||||
block_count = block_target - block_offset
|
||||
if block_count < count:
|
||||
logg.warning('block count is less than thread count, adjusting thread count to {}'.format(block_count))
|
||||
count = block_count
|
||||
blocks_per_thread = int(block_count / count)
|
||||
|
||||
ranges = []
|
||||
for i in range(count):
|
||||
block_target = block_offset + blocks_per_thread
|
||||
offset = block_offset
|
||||
target = block_target -1
|
||||
ranges.append((offset, target,))
|
||||
block_offset = block_target
|
||||
return ranges
|
||||
|
||||
|
||||
class ThreadPoolRangeTask:
|
||||
|
||||
def __init__(self, backend, sync_range, chain_interface, syncer_factory=HistorySyncer, filters=[]):
|
||||
backend_start = backend.start()
|
||||
backend_target = backend.target()
|
||||
backend_class = backend.__class__
|
||||
tx_offset = 0
|
||||
flags = 0
|
||||
if sync_range[0] == backend_start[0][0]:
|
||||
tx_offset = backend_start[0][1]
|
||||
flags = backend_start[1]
|
||||
self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0)
|
||||
self.syncer = syncer_factory(self.backend, chain_interface)
|
||||
for fltr in filters:
|
||||
self.syncer.add_filter(fltr)
|
||||
|
||||
def start_loop(self, interval):
|
||||
conn = RPCConnection.connect(self.backend.chain_spec)
|
||||
return self.syncer.loop(interval, conn)
|
||||
|
||||
|
||||
class ThreadPoolRangeHistorySyncer:
|
||||
|
||||
def __init__(self, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
|
||||
self.src_backend = backend
|
||||
self.thread_count = thread_count
|
||||
self.single_sync_offset = 0
|
||||
self.runlevel_callback = None
|
||||
backend_start = backend.start()
|
||||
backend_target = backend.target()
|
||||
self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count)
|
||||
self.chain_interface = chain_interface
|
||||
self.filters = []
|
||||
|
||||
|
||||
def add_filter(self, f):
|
||||
self.filters.append(f)
|
||||
|
||||
|
||||
def loop(self, interval, conn):
|
||||
self.worker_pool = multiprocessing.Pool(processes=self.thread_count)
|
||||
|
||||
for sync_range in self.ranges:
|
||||
task = ThreadPoolRangeTask(self.src_backend, sync_range, self.chain_interface, filters=self.filters)
|
||||
t = self.worker_pool.apply_async(task.start_loop, (0.1,))
|
||||
logg.debug('result of worker {}: {}'.format(t, t.get()))
|
||||
self.worker_pool.close()
|
||||
self.worker_pool.join()
|
@ -3,7 +3,6 @@ class SyncDone(Exception):
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class NoBlockForYou(Exception):
|
||||
"""Exception raised when attempt to retrieve a block from network that does not (yet) exist.
|
||||
"""
|
||||
@ -28,20 +27,6 @@ class LockError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FilterDone(Exception):
|
||||
"""Exception raised when all registered filters have been executed
|
||||
"""
|
||||
|
||||
|
||||
class InterruptError(FilterDone):
|
||||
"""Exception for interrupting or attempting to use an interrupted sync
|
||||
"""
|
||||
|
||||
|
||||
class IncompleteFilterError(Exception):
|
||||
"""Exception raised if filter reset is executed prematurely
|
||||
"""
|
||||
|
||||
#class AbortTx(Exception):
|
||||
# """
|
||||
# """
|
||||
|
@ -1,131 +1,96 @@
|
||||
# standard imports
|
||||
import hashlib
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
|
||||
# local imports
|
||||
from .error import BackendError
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
re_processedname = r'^_?[A-Z,\.]*$'
|
||||
|
||||
|
||||
class SyncFilter:
|
||||
"""Manages the collection of filters on behalf of a specific backend.
|
||||
|
||||
def sum(self):
|
||||
s = self.common_name()
|
||||
h = hashlib.sha256()
|
||||
h.update(s.encode('utf-8'))
|
||||
return h.digest()
|
||||
A filter is a pluggable piece of code to execute for every transaction retrieved by the syncer. Filters are executed in the sequence they were added to the instance.
|
||||
|
||||
:param backend: Syncer backend to apply filter state changes to
|
||||
:type backend: chainsyncer.backend.base.Backend implementation
|
||||
"""
|
||||
|
||||
def __init__(self, backend):
|
||||
self.filters = []
|
||||
self.backend = backend
|
||||
|
||||
|
||||
def filter(self, conn, block, tx):
|
||||
raise NotImplementedError()
|
||||
def add(self, fltr):
|
||||
"""Add a filter instance.
|
||||
|
||||
:param fltr: Filter instance.
|
||||
:type fltr: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter
|
||||
:raises ValueError: Object instance is incorrect implementation
|
||||
"""
|
||||
if getattr(fltr, 'filter') == None:
|
||||
raise ValueError('filter object must implement have method filter')
|
||||
logg.debug('added filter "{}"'.format(str(fltr)))
|
||||
|
||||
self.filters.append(fltr)
|
||||
|
||||
|
||||
def common_name(self):
|
||||
s = self.__module__ + '.' + self.__class__.__name__
|
||||
return s.replace('.', '_')
|
||||
def __apply_one(self, fltr, idx, conn, block, tx, session):
|
||||
self.backend.begin_filter(idx)
|
||||
fltr.filter(conn, block, tx, session)
|
||||
self.backend.complete_filter(idx)
|
||||
|
||||
|
||||
# TODO: properly clarify interface shared with syncfsstore, move to filter module?
|
||||
class FilterState:
|
||||
def apply(self, conn, block, tx):
|
||||
"""Apply all registered filters on the given transaction.
|
||||
|
||||
def __init__(self, state_store, scan=None):
|
||||
self.state_store = state_store
|
||||
self.digest = b'\x00' * 32
|
||||
self.summed = False
|
||||
self.__syncs = {}
|
||||
self.synced = False
|
||||
self.connected = False
|
||||
self.state_store.add('DONE')
|
||||
self.state_store.add('LOCK')
|
||||
self.state_store.add('INTERRUPT')
|
||||
self.state_store.add('RESET')
|
||||
|
||||
self.state = self.state_store.state
|
||||
self.elements = self.state_store.elements
|
||||
self.put = self.state_store.put
|
||||
self.mask = self.state_store.mask
|
||||
self.name = self.state_store.name
|
||||
self.set = self.state_store.set
|
||||
self.next = self.state_store.next
|
||||
self.move = self.state_store.move
|
||||
self.unset = self.state_store.unset
|
||||
self.peek = self.state_store.peek
|
||||
self.from_name = self.state_store.from_name
|
||||
self.list = self.state_store.list
|
||||
self.state_store.sync()
|
||||
self.all = self.state_store.all
|
||||
self.started = False
|
||||
|
||||
self.scan = scan
|
||||
|
||||
|
||||
def __verify_sum(self, v):
|
||||
if not isinstance(v, bytes) and not isinstance(v, bytearray):
|
||||
raise ValueError('argument must be instance of bytes')
|
||||
if len(v) != 32:
|
||||
raise ValueError('argument must be 32 bytes')
|
||||
|
||||
|
||||
def register(self, fltr):
|
||||
if self.summed:
|
||||
raise RuntimeError('filter already applied')
|
||||
z = fltr.sum()
|
||||
self.__verify_sum(z)
|
||||
self.digest += z
|
||||
s = fltr.common_name()
|
||||
self.state_store.add(s)
|
||||
n = self.state_store.from_name(s)
|
||||
logg.debug('add filter {} {} {}'.format(s, n, self))
|
||||
|
||||
|
||||
def sum(self):
|
||||
h = hashlib.sha256()
|
||||
h.update(self.digest)
|
||||
self.digest = h.digest()
|
||||
self.summed = True
|
||||
return self.digest
|
||||
|
||||
|
||||
def connect(self):
|
||||
if not self.synced:
|
||||
for v in self.state_store.all():
|
||||
k = self.state_store.from_name(v)
|
||||
self.state_store.sync(k)
|
||||
self.__syncs[v] = True
|
||||
if self.scan != None:
|
||||
ks = self.scan()
|
||||
for v in ks: #os.listdir(self.scan_path):
|
||||
logg.debug('ks {}'.format(v))
|
||||
k = None
|
||||
:param conn: RPC Connection, will be passed to the filter method
|
||||
:type conn: chainlib.connection.RPCConnection
|
||||
:param block: Block object
|
||||
:type block: chainlib.block.Block
|
||||
:param tx: Transaction object
|
||||
:type tx: chainlib.tx.Tx
|
||||
:raises BackendError: Backend connection failed
|
||||
"""
|
||||
session = None
|
||||
try:
|
||||
k = self.state_store.from_elements(v)
|
||||
self.state_store.alias(v, k)
|
||||
except ValueError:
|
||||
k = self.state_store.from_name(v)
|
||||
self.state_store.sync(k)
|
||||
self.__syncs[v] = True
|
||||
self.synced = True
|
||||
self.connected = True
|
||||
session = self.backend.connect()
|
||||
except TimeoutError as e:
|
||||
self.backend.disconnect()
|
||||
raise BackendError('database connection fail: {}'.format(e))
|
||||
i = 0
|
||||
(pair, flags) = self.backend.get()
|
||||
for f in self.filters:
|
||||
if not self.backend.check_filter(i, flags):
|
||||
logg.debug('applying filter {} {}'.format(str(f), flags))
|
||||
self.__apply_one(f, i, conn, block, tx, session)
|
||||
else:
|
||||
logg.debug('skipping previously applied filter {} {}'.format(str(f), flags))
|
||||
i += 1
|
||||
|
||||
self.backend.disconnect()
|
||||
|
||||
|
||||
def disconnect(self):
|
||||
self.connected = False
|
||||
class NoopFilter:
|
||||
"""A noop implemenation of a sync filter.
|
||||
|
||||
Logs the filter inputs at debug log level.
|
||||
"""
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
"""Filter method implementation:
|
||||
|
||||
:param conn: RPC Connection, will be passed to the filter method
|
||||
:type conn: chainlib.connection.RPCConnection
|
||||
:param block: Block object
|
||||
:type block: chainlib.block.Block
|
||||
:param tx: Transaction object
|
||||
:type tx: chainlib.tx.Tx
|
||||
:param db_session: Backend session object
|
||||
:type db_session: varies
|
||||
"""
|
||||
logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session)))
|
||||
|
||||
|
||||
def start(self, offset=0, target=-1):
|
||||
self.state_store.start(offset=offset, target=target)
|
||||
self.started = True
|
||||
|
||||
|
||||
def get(self, k):
|
||||
return None
|
||||
|
||||
|
||||
def next_item(self):
|
||||
return None
|
||||
|
||||
|
||||
def filters(self):
|
||||
return []
|
||||
def __str__(self):
|
||||
return 'noopfilter'
|
||||
|
@ -1 +0,0 @@
|
||||
|
@ -1,146 +0,0 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
# standard imports
|
||||
import os
|
||||
import logging
|
||||
import sys
|
||||
import importlib
|
||||
|
||||
# external imports
|
||||
import chainlib.cli
|
||||
from shep.persist import PersistedState
|
||||
|
||||
# local imports
|
||||
import chainsyncer.cli
|
||||
from chainsyncer.settings import ChainsyncerSettings
|
||||
from chainsyncer.store import SyncStore
|
||||
from chainsyncer.filter import (
|
||||
FilterState,
|
||||
SyncFilter,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
valid_fwd = [
|
||||
'fwd',
|
||||
'forward',
|
||||
'next',
|
||||
'continue',
|
||||
]
|
||||
|
||||
valid_rwd = [
|
||||
'rwd',
|
||||
'rewind',
|
||||
'current',
|
||||
'back',
|
||||
'repeat',
|
||||
'replay',
|
||||
]
|
||||
|
||||
action_is_forward = False
|
||||
|
||||
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
|
||||
argparser = chainlib.cli.ArgumentParser(arg_flags)
|
||||
argparser.add_argument('--state-dir', type=str, dest='state_dir', help='State directory')
|
||||
argparser.add_positional('action', type=str, help='Action to take on lock. Repeat means re-run the locked filter. Continue means resume execution for next filter.')
|
||||
|
||||
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
|
||||
chainsyncer.cli.process_flags(argparser, sync_flags)
|
||||
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.action in valid_fwd:
|
||||
action_is_forward = True
|
||||
elif args.action not in valid_rwd:
|
||||
sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
base_config_dir = chainsyncer.cli.config_dir,
|
||||
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
|
||||
config = chainsyncer.cli.process_config(config, args, sync_flags)
|
||||
config.add(args.state_dir, '_STATE_DIR', False)
|
||||
logg.debug('config loaded:\n{}'.format(config))
|
||||
|
||||
settings = ChainsyncerSettings()
|
||||
settings.process_sync_backend(config)
|
||||
logg.debug('settings:\n{}'.format(str(settings)))
|
||||
|
||||
|
||||
class FilterInNameOnly(SyncFilter):
|
||||
|
||||
def __init__(self, k):
|
||||
self.k = k
|
||||
|
||||
|
||||
def common_name(self):
|
||||
return self.k
|
||||
|
||||
|
||||
def main():
|
||||
if settings.get('SYNCER_BACKEND') == 'mem':
|
||||
raise ValueError('cannot unlock volatile state store')
|
||||
|
||||
state_dir = config.get('_STATE_DIR')
|
||||
|
||||
if config.get('SYNCER_BACKEND') == 'fs':
|
||||
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
|
||||
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
|
||||
elif config.get('SYNCER_BACKEND') == 'rocksdb':
|
||||
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
|
||||
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
|
||||
else:
|
||||
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
|
||||
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
|
||||
|
||||
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
|
||||
|
||||
store = syncer_store_class(state_dir)
|
||||
|
||||
filter_list = store.load_filter_list()
|
||||
for i, k in enumerate(filter_list):
|
||||
fltr = FilterInNameOnly(k)
|
||||
store.register(fltr)
|
||||
filter_list[i] = k.upper()
|
||||
|
||||
store.connect()
|
||||
store.start(ignore_lock=True)
|
||||
|
||||
lock_state = store.filter_state.from_name('LOCK')
|
||||
locked_item = store.filter_state.list(lock_state)
|
||||
if len(locked_item) == 0:
|
||||
sys.stderr.write('Sync filter in {} is not locked\n'.format(state_dir))
|
||||
sys.exit(1)
|
||||
elif len(locked_item) > 1:
|
||||
sys.stderr.write('More than one locked item encountered in {}. That should never happen, so I do not know what to do next.\n'.format(state_dir))
|
||||
sys.exit(1)
|
||||
|
||||
locked_item_key = locked_item[0]
|
||||
locked_item = store.get(int(locked_item_key))
|
||||
locked_state = store.filter_state.state(locked_item_key) - lock_state
|
||||
locked_state_name = store.filter_state.name(locked_state)
|
||||
logg.info('found item "{}" in locked state {}'.format(locked_item, store.filter_state.name(locked_state)))
|
||||
|
||||
if action_is_forward:
|
||||
k = locked_state_name
|
||||
filter_index = None
|
||||
filter_index = filter_list.index(k)
|
||||
filter_pos = filter_index + 1
|
||||
filter_count = len(filter_list)
|
||||
logg.debug('Locked filter {} found at position {} of {}'.format(k, filter_pos, filter_count))
|
||||
if filter_pos == filter_count:
|
||||
logg.info('Locked filter {} is the last filter in the list. Executing filter reset'.format(k))
|
||||
locked_item.reset(check_incomplete=False)
|
||||
else:
|
||||
locked_item.advance(ignore_lock=True)
|
||||
store.filter_state.unset(locked_item_key, lock_state)
|
||||
else:
|
||||
filter_mask = 0xf
|
||||
filter_state = store.filter_state.mask(locked_state, filter_mask)
|
||||
logg.info('Chosen action is "{}": will continue execution at previous filter {}'.format(args.action, store.filter_state.name(filter_state)))
|
||||
store.filter_state.unset(locked_item_key, lock_state)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,42 +1,23 @@
|
||||
# standard imports
|
||||
import uuid
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from chainsyncer.error import FilterDone
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SyncSession:
|
||||
|
||||
def __init__(self, session_store):
|
||||
self.session_store = session_store
|
||||
self.started = self.session_store.started
|
||||
self.get = self.session_store.get
|
||||
self.next = self.session_store.next_item
|
||||
self.item = None
|
||||
self.filters = self.session_store.filters
|
||||
def __init__(self, state_store, session_id=None, is_default=False):
|
||||
if session_id == None:
|
||||
session_id = str(uuid.uuid4())
|
||||
is_default = True
|
||||
self.session_id = session_id
|
||||
self.is_default = is_default
|
||||
self.state_store = state_store
|
||||
self.filters = []
|
||||
|
||||
|
||||
def start(self, offset=0, target=-1):
|
||||
self.session_store.start(offset=offset, target=target)
|
||||
self.item = self.session_store.next_item()
|
||||
return self.item
|
||||
def add_filter(self, fltr):
|
||||
self.state_store.register(fltr)
|
||||
self.filters.append(fltr)
|
||||
|
||||
|
||||
def stop(self, item):
|
||||
self.session_store.stop(item)
|
||||
|
||||
|
||||
def filter(self, conn, block, tx):
|
||||
self.session_store.connect()
|
||||
for fltr in self.filters:
|
||||
logg.debug('executing filter {}'.format(fltr))
|
||||
self.item.advance()
|
||||
interrupt = fltr.filter(conn, block, tx)
|
||||
if not self.item.release(interrupt=interrupt):
|
||||
break
|
||||
self.item.reset()
|
||||
self.next()
|
||||
self.session_store.disconnect()
|
||||
def start(self):
|
||||
self.state_store.start()
|
||||
|
@ -1,55 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from hexathon import (
|
||||
to_int as hex_to_int,
|
||||
strip_0x,
|
||||
)
|
||||
from chainlib.settings import ChainSettings
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChainsyncerSettings(ChainSettings):
|
||||
|
||||
def process_sync_backend(self, config):
|
||||
self.o['SYNCER_BACKEND'] = config.get('SYNCER_BACKEND')
|
||||
|
||||
|
||||
def process_sync_range(self, config):
|
||||
o = self.o['SYNCER_INTERFACE'].block_latest()
|
||||
r = self.o['RPC'].do(o)
|
||||
block_offset = int(strip_0x(r), 16) + 1
|
||||
logg.info('network block height at startup is {}'.format(block_offset))
|
||||
|
||||
keep_alive = False
|
||||
session_block_offset = 0
|
||||
block_limit = 0
|
||||
until = 0
|
||||
|
||||
if config.true('_HEAD'):
|
||||
self.o['SYNCER_OFFSET'] = block_offset
|
||||
self.o['SYNCER_LIMIT'] = -1
|
||||
return
|
||||
|
||||
session_block_offset = int(config.get('SYNCER_OFFSET'))
|
||||
until = int(config.get('SYNCER_LIMIT'))
|
||||
|
||||
if until > 0:
|
||||
if until <= session_block_offset:
|
||||
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
|
||||
block_limit = until
|
||||
elif until == -1:
|
||||
keep_alive = True
|
||||
|
||||
if session_block_offset == -1:
|
||||
session_block_offset = block_offset
|
||||
elif config.true('_KEEP_ALIVE'):
|
||||
block_limit = -1
|
||||
else:
|
||||
if block_limit == 0:
|
||||
block_limit = block_offset
|
||||
|
||||
self.o['SYNCER_OFFSET'] = session_block_offset
|
||||
self.o['SYNCER_LIMIT'] = block_limit
|
1
chainsyncer/state/__init__.py
Normal file
1
chainsyncer/state/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .base import SyncState
|
42
chainsyncer/state/base.py
Normal file
42
chainsyncer/state/base.py
Normal file
@ -0,0 +1,42 @@
|
||||
# standard imports
|
||||
import hashlib
|
||||
|
||||
class SyncState:
|
||||
|
||||
def __init__(self, state_store):
|
||||
self.store = state_store
|
||||
self.digest = b'\x00' * 32
|
||||
self.summed = False
|
||||
self.synced = {}
|
||||
|
||||
|
||||
def __verify_sum(self, v):
|
||||
if not isinstance(v, bytes) and not isinstance(v, bytearray):
|
||||
raise ValueError('argument must be instance of bytes')
|
||||
if len(v) != 32:
|
||||
raise ValueError('argument must be 32 bytes')
|
||||
|
||||
|
||||
def register(self, fltr):
|
||||
if self.summed:
|
||||
raise RuntimeError('filter already applied')
|
||||
z = fltr.sum()
|
||||
self.__verify_sum(z)
|
||||
self.digest += z
|
||||
s = fltr.common_name()
|
||||
self.store.add('i_' + s)
|
||||
self.store.add('o_' + s)
|
||||
|
||||
|
||||
def sum(self):
|
||||
h = hashlib.sha256()
|
||||
h.update(self.digest)
|
||||
self.digest = h.digest()
|
||||
self.summed = True
|
||||
return self.digest
|
||||
|
||||
|
||||
def start(self):
|
||||
for v in self.store.all():
|
||||
self.store.sync(v)
|
||||
self.synced[v] = True
|
@ -1 +0,0 @@
|
||||
from .base import *
|
@ -1,322 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from shep.persist import PersistedState
|
||||
from shep import State
|
||||
from shep.error import StateInvalid
|
||||
from chainsyncer.filter import FilterState
|
||||
from chainsyncer.error import (
|
||||
LockError,
|
||||
FilterDone,
|
||||
InterruptError,
|
||||
IncompleteFilterError,
|
||||
SyncDone,
|
||||
)
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def sync_state_serialize(block_height, tx_index, block_target):
|
||||
b = block_height.to_bytes(4, 'big')
|
||||
b += tx_index.to_bytes(4, 'big')
|
||||
b += block_target.to_bytes(4, 'big', signed=True)
|
||||
return b
|
||||
|
||||
|
||||
def sync_state_deserialize(b):
|
||||
block_height = int.from_bytes(b[:4], 'big')
|
||||
tx_index = int.from_bytes(b[4:8], 'big')
|
||||
block_target = int.from_bytes(b[8:], 'big', signed=True)
|
||||
return (block_height, tx_index, block_target,)
|
||||
|
||||
|
||||
# NOT thread safe
|
||||
class SyncItem:
|
||||
|
||||
def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False):
|
||||
self.offset = offset
|
||||
self.target = target
|
||||
self.sync_state = sync_state
|
||||
self.filter_state = filter_state
|
||||
self.state_key = str(offset)
|
||||
|
||||
logg.debug('get key {}'.format(self.state_key))
|
||||
v = self.sync_state.get(self.state_key)
|
||||
|
||||
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
|
||||
|
||||
filter_state = self.filter_state.state(self.state_key)
|
||||
if filter_state & self.filter_state.from_name('LOCK') > 0 and not ignore_lock:
|
||||
raise LockError(self.state_key)
|
||||
|
||||
self.count = len(self.filter_state.all(pure=True)) - 4
|
||||
self.skip_filter = False
|
||||
if self.count == 0:
|
||||
self.skip_filter = True
|
||||
elif not started:
|
||||
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
|
||||
|
||||
|
||||
def __check_done(self):
|
||||
if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0:
|
||||
raise InterruptError(self.state_key)
|
||||
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') > 0:
|
||||
raise FilterDone(self.state_key)
|
||||
|
||||
|
||||
def resume(self):
|
||||
filter_state = self.filter_state.state(self.state_key)
|
||||
if filter_state > 0x0f:
|
||||
filter_state_part = self.filter_state.mask(filter_state, 0x0f)
|
||||
if len(self.filter_state.elements(filter_state)) == 1:
|
||||
logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part))
|
||||
lock_state = self.filter_state.from_name('LOCK')
|
||||
self.filter_state.set(lock_state)
|
||||
|
||||
|
||||
def reset(self, check_incomplete=True):
|
||||
if check_incomplete:
|
||||
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
|
||||
raise LockError('reset attempt on {} when state locked'.format(self.state_key))
|
||||
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
|
||||
raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
|
||||
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
|
||||
|
||||
|
||||
def next(self, advance_block=False):
|
||||
v = self.sync_state.state(self.state_key)
|
||||
if v == self.sync_state.DONE:
|
||||
raise SyncDone(self.target)
|
||||
elif v == self.sync_state.NEW:
|
||||
self.sync_state.next(self.state_key)
|
||||
|
||||
v = self.sync_state.get(self.state_key)
|
||||
(block_number, tx_index, target) = sync_state_deserialize(v)
|
||||
if advance_block:
|
||||
block_number += 1
|
||||
tx_index = 0
|
||||
if self.target >= 0 and block_number > self.target:
|
||||
self.sync_state.move(self.state_key, self.sync_state.DONE)
|
||||
raise SyncDone(self.target)
|
||||
else:
|
||||
tx_index += 1
|
||||
|
||||
self.cursor = block_number
|
||||
self.tx_cursor = tx_index
|
||||
|
||||
b = sync_state_serialize(block_number, tx_index, target)
|
||||
self.sync_state.replace(self.state_key, b)
|
||||
|
||||
|
||||
def __find_advance(self):
|
||||
v = self.filter_state.state(self.state_key)
|
||||
|
||||
|
||||
def advance(self, ignore_lock=False):
|
||||
if self.skip_filter:
|
||||
raise FilterDone()
|
||||
self.__check_done()
|
||||
|
||||
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
|
||||
if ignore_lock:
|
||||
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
|
||||
else:
|
||||
raise LockError('advance attempt on {} when state locked'.format(self.state_key))
|
||||
done = False
|
||||
try:
|
||||
self.filter_state.next(self.state_key)
|
||||
except StateInvalid:
|
||||
done = True
|
||||
if done:
|
||||
raise FilterDone()
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK'))
|
||||
|
||||
|
||||
def release(self, interrupt=False):
|
||||
if self.skip_filter:
|
||||
return False
|
||||
if interrupt == True:
|
||||
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
|
||||
return False
|
||||
|
||||
state = self.filter_state.state(self.state_key)
|
||||
if state & self.filter_state.from_name('LOCK') == 0:
|
||||
raise LockError('release attempt on {} when state unlocked'.format(self.state_key))
|
||||
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
|
||||
try:
|
||||
c = self.filter_state.peek(self.state_key)
|
||||
logg.debug('peeked {}'.format(c))
|
||||
except StateInvalid:
|
||||
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor)
|
||||
|
||||
|
||||
class SyncStore:
|
||||
|
||||
def __init__(self, path, session_id=None):
|
||||
self.session_id = session_id
|
||||
self.session_path = None
|
||||
self.is_default = False
|
||||
self.first = False
|
||||
self.target = None
|
||||
self.items = {}
|
||||
self.item_keys = []
|
||||
self.started = False
|
||||
self.thresholds = []
|
||||
self.session_path = path
|
||||
|
||||
|
||||
def setup_sync_state(self, factory=None, event_callback=None):
|
||||
if factory == None:
|
||||
self.state = State(2, event_callback=event_callback)
|
||||
else:
|
||||
self.state = PersistedState(factory.add, 2, event_callback=event_callback)
|
||||
self.state.add('SYNC')
|
||||
self.state.add('DONE')
|
||||
|
||||
|
||||
def setup_filter_state(self, factory=None, event_callback=None):
|
||||
if factory == None:
|
||||
filter_state_backend = State(0, check_alias=False, event_callback=event_callback)
|
||||
self.filter_state = FilterState(filter_state_backend)
|
||||
else:
|
||||
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
|
||||
self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
|
||||
self.filters = []
|
||||
|
||||
|
||||
def set_target(self, v):
|
||||
pass
|
||||
|
||||
|
||||
def get_target(self):
|
||||
return None
|
||||
|
||||
|
||||
def register(self, fltr):
|
||||
self.filters.append(fltr)
|
||||
self.filter_state.register(fltr)
|
||||
|
||||
|
||||
def start(self, offset=0, target=-1, ignore_lock=False):
|
||||
if self.started:
|
||||
return
|
||||
|
||||
self.save_filter_list()
|
||||
|
||||
self.load(target, ignore_lock=ignore_lock)
|
||||
|
||||
if self.first:
|
||||
state_bytes = sync_state_serialize(offset, 0, target)
|
||||
block_number_str = str(offset)
|
||||
self.state.put(block_number_str, contents=state_bytes)
|
||||
self.filter_state.put(block_number_str)
|
||||
o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock)
|
||||
o.resume()
|
||||
self.items[offset] = o
|
||||
self.item_keys.append(offset)
|
||||
elif offset > 0:
|
||||
logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id))
|
||||
self.started = True
|
||||
|
||||
self.item_keys.sort()
|
||||
|
||||
|
||||
def stop(self, item):
|
||||
if item.target == -1:
|
||||
state_bytes = sync_state_serialize(item.cursor, 0, item.cursor)
|
||||
self.state.replace(str(item.offset), state_bytes)
|
||||
self.filter_state.put(str(item.cursor))
|
||||
|
||||
SyncItem(item.offset, -1, self.state, self.filter_state)
|
||||
logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor))
|
||||
|
||||
self.state.move(item.state_key, self.state.DONE)
|
||||
|
||||
state_bytes = sync_state_serialize(item.cursor, 0, -1)
|
||||
self.state.put(str(item.cursor), contents=state_bytes)
|
||||
|
||||
|
||||
def load(self, target, ignore_lock=False):
|
||||
self.state.sync(self.state.NEW)
|
||||
self.state.sync(self.state.SYNC)
|
||||
|
||||
thresholds_sync = []
|
||||
for v in self.state.list(self.state.SYNC):
|
||||
block_number = int(v)
|
||||
thresholds_sync.append(block_number)
|
||||
logg.debug('queue resume {}'.format(block_number))
|
||||
thresholds_new = []
|
||||
for v in self.state.list(self.state.NEW):
|
||||
block_number = int(v)
|
||||
thresholds_new.append(block_number)
|
||||
logg.debug('queue new range {}'.format(block_number))
|
||||
|
||||
thresholds_sync.sort()
|
||||
thresholds_new.sort()
|
||||
thresholds = thresholds_sync + thresholds_new
|
||||
lim = len(thresholds) - 1
|
||||
|
||||
for i in range(len(thresholds)):
|
||||
item_target = target
|
||||
if i < lim:
|
||||
item_target = thresholds[i+1]
|
||||
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock)
|
||||
o.resume()
|
||||
self.items[block_number] = o
|
||||
self.item_keys.append(block_number)
|
||||
logg.info('added existing {}'.format(o))
|
||||
|
||||
self.get_target()
|
||||
|
||||
if len(thresholds) == 0:
|
||||
if self.target != None:
|
||||
logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id))
|
||||
else:
|
||||
logg.info('syncer first run target {}'.format(target))
|
||||
self.first = True
|
||||
self.set_target(target)
|
||||
|
||||
|
||||
def get(self, k):
|
||||
return self.items[k]
|
||||
|
||||
|
||||
def next_item(self):
|
||||
try:
|
||||
k = self.item_keys.pop(0)
|
||||
except IndexError:
|
||||
return None
|
||||
return self.items[k]
|
||||
|
||||
|
||||
def connect(self):
|
||||
self.filter_state.connect()
|
||||
|
||||
|
||||
def disconnect(self):
|
||||
self.filter_state.disconnect()
|
||||
|
||||
|
||||
def save_filter_list(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def load_filter_list(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def peek_next_filter(self):
|
||||
pass
|
||||
|
||||
def peek_current_filter(self):
|
||||
pass
|
@ -1,98 +0,0 @@
|
||||
# standard imports
|
||||
import uuid
|
||||
import os
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from shep.store.file import SimpleFileStoreFactory
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store import SyncStore
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SyncFsStore(SyncStore):
|
||||
|
||||
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
|
||||
super(SyncFsStore, self).__init__(base_path, session_id=session_id)
|
||||
|
||||
create_path = False
|
||||
try:
|
||||
os.stat(self.session_path)
|
||||
except FileNotFoundError:
|
||||
create_path = True
|
||||
|
||||
if create_path:
|
||||
self.__create_path(base_path, self.default_path, session_id=session_id)
|
||||
|
||||
self.session_id = os.path.basename(self.session_path)
|
||||
logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
|
||||
|
||||
base_sync_path = os.path.join(self.session_path, 'sync')
|
||||
factory = SimpleFileStoreFactory(base_sync_path, binary=True)
|
||||
self.setup_sync_state(factory, state_event_callback)
|
||||
|
||||
self.setup_filter_state(callback=filter_state_event_callback)
|
||||
|
||||
|
||||
def setup_filter_state(self, callback=None):
|
||||
base_filter_path = os.path.join(self.session_path, 'filter')
|
||||
factory = SimpleFileStoreFactory(base_filter_path, binary=True)
|
||||
super(SyncFsStore, self).setup_filter_state(factory, callback)
|
||||
|
||||
|
||||
def __create_path(self, base_path, default_path, session_id=None):
|
||||
logg.debug('fs store path {} does not exist, creating'.format(self.session_path))
|
||||
if session_id == None:
|
||||
session_id = str(uuid.uuid4())
|
||||
self.session_path = os.path.join(base_path, session_id)
|
||||
os.makedirs(self.session_path)
|
||||
|
||||
if self.is_default:
|
||||
try:
|
||||
os.symlink(self.session_path, default_path)
|
||||
except FileExistsError:
|
||||
pass
|
||||
|
||||
|
||||
def get_target(self):
|
||||
fp = os.path.join(self.session_path, 'target')
|
||||
try:
|
||||
f = open(fp, 'r')
|
||||
v = f.read()
|
||||
f.close()
|
||||
self.target = int(v)
|
||||
except FileNotFoundError as e:
|
||||
logg.debug('cant find target {} {}'.format(fp, e))
|
||||
pass
|
||||
|
||||
|
||||
def set_target(self, v):
|
||||
fp = os.path.join(self.session_path, 'target')
|
||||
f = open(fp, 'w')
|
||||
f.write(str(v))
|
||||
f.close()
|
||||
self.target = v
|
||||
|
||||
|
||||
def load_filter_list(self):
|
||||
fltr = []
|
||||
fp = os.path.join(self.session_path, 'filter_list')
|
||||
f = open(fp, 'r')
|
||||
while True:
|
||||
v = f.readline()
|
||||
if len(v) == 0:
|
||||
break
|
||||
v = v.rstrip()
|
||||
fltr.append(v)
|
||||
f.close()
|
||||
return fltr
|
||||
|
||||
|
||||
def save_filter_list(self):
|
||||
fp = os.path.join(self.session_path, 'filter_list')
|
||||
f = open(fp, 'w')
|
||||
for fltr in self.filters:
|
||||
f.write(fltr.common_name() + '\n')
|
||||
f.close()
|
@ -1,45 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import os
|
||||
|
||||
# external imports
|
||||
from shep import State
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store import SyncStore
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SyncMemStore(SyncStore):
|
||||
|
||||
def __init__(self, session_id=None, state_event_callback=None, filter_state_event_callback=None):
|
||||
super(SyncMemStore, self).__init__(None, session_id=session_id)
|
||||
|
||||
factory = None
|
||||
self.setup_sync_state(factory, state_event_callback)
|
||||
|
||||
factory = None
|
||||
self.setup_filter_state(factory, filter_state_event_callback)
|
||||
|
||||
|
||||
def set_target(self, v):
|
||||
self.target = int(v)
|
||||
|
||||
|
||||
def get_target(self):
|
||||
return self.target
|
||||
|
||||
|
||||
def stop(self, item):
|
||||
if item != None:
|
||||
super(SyncMemStore, self).stop(item)
|
||||
logg.info('I am an in-memory only state store. I am shutting down now, so all state will now be discarded.')
|
||||
|
||||
|
||||
def save_filter_list(self):
|
||||
pass
|
||||
|
||||
|
||||
def load_filter_list(self):
|
||||
return []
|
@ -1,79 +0,0 @@
|
||||
# standard imports
|
||||
import uuid
|
||||
import os
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from shep.store.rocksdb import RocksDbStoreFactory
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store import (
|
||||
SyncItem,
|
||||
SyncStore,
|
||||
)
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RocksDbStoreAdder:
|
||||
|
||||
def __init__(self, factory, prefix):
|
||||
self.factory = factory
|
||||
self.prefix = prefix
|
||||
|
||||
|
||||
def add(self, k):
|
||||
path = os.path.join(self.prefix, k)
|
||||
return self.factory.add(path)
|
||||
|
||||
|
||||
def ls(self):
|
||||
return self.factory.ls()
|
||||
|
||||
|
||||
class SyncRocksDbStore(SyncStore):
|
||||
|
||||
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
|
||||
super(SyncRocksDbStore, self).__init__(base_path, session_id=session_id)
|
||||
|
||||
self.factory = RocksDbStoreFactory(self.session_path, binary=True)
|
||||
prefix_factory = RocksDbStoreAdder(self.factory, 'sync')
|
||||
self.setup_sync_state(prefix_factory, state_event_callback)
|
||||
|
||||
prefix_factory = RocksDbStoreAdder(self.factory, 'filter')
|
||||
self.setup_filter_state(prefix_factory, filter_state_event_callback)
|
||||
|
||||
#self.session_id = os.path.basename(self.session_path)
|
||||
#logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
|
||||
|
||||
self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target')
|
||||
|
||||
|
||||
def get_target(self):
|
||||
v = self.target_db.get('target')
|
||||
if v != None:
|
||||
self.target = int(v)
|
||||
|
||||
|
||||
def set_target(self, v):
|
||||
self.target_db.put('target', str(v))
|
||||
self.target = v
|
||||
|
||||
|
||||
def stop(self, item):
|
||||
if item != None:
|
||||
super(SyncRocksDbStore, self).stop(item)
|
||||
self.factory.close()
|
||||
|
||||
|
||||
def save_filter_list(self):
|
||||
fltr = []
|
||||
for v in self.filters:
|
||||
fltr.append(v.common_name())
|
||||
self.target_db.put('filter_list', ','.join(fltr))
|
||||
|
||||
|
||||
def load_filter_list(self):
|
||||
v = self.target_db.get('filter_list')
|
||||
v = v.decode('utf-8')
|
||||
return v.split(',')
|
@ -1 +0,0 @@
|
||||
from .base import *
|
@ -1,73 +1,17 @@
|
||||
# standard imports
|
||||
import os
|
||||
import logging
|
||||
import hashlib
|
||||
|
||||
# external imports
|
||||
from hexathon import add_0x
|
||||
from shep.state import State
|
||||
|
||||
# local imports
|
||||
#from chainsyncer.driver.history import HistorySyncer
|
||||
from chainsyncer.driver.history import HistorySyncer
|
||||
from chainsyncer.error import NoBlockForYou
|
||||
from chainsyncer.driver import SyncDriver
|
||||
|
||||
logging.STATETRACE = 5
|
||||
logg = logging.getLogger().getChild(__name__)
|
||||
|
||||
|
||||
def state_event_handler(k, v_old, v_new):
|
||||
if v_old == None:
|
||||
logg.log(logging.STATETRACE, 'sync state create key {}: -> {}'.format(k, v_new))
|
||||
else:
|
||||
logg.log(logging.STATETRACE, 'sync state change key {}: {} -> {}'.format(k, v_old, v_new))
|
||||
|
||||
|
||||
def filter_state_event_handler(k, v_old, v_new):
|
||||
if v_old == None:
|
||||
logg.log(logging.STATETRACE, 'filter state create key {}: -> {}'.format(k, v_new))
|
||||
else:
|
||||
logg.log(logging.STATETRACE, 'filter state change key {}: {} -> {}'.format(k, v_old, v_new))
|
||||
|
||||
|
||||
class MockFilterError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class MockBlockGenerator:
|
||||
|
||||
def __init__(self, offset=0):
|
||||
self.blocks = {}
|
||||
self.offset = offset
|
||||
self.cursor = offset
|
||||
|
||||
|
||||
def generate(self, spec=[], driver=None):
|
||||
for v in spec:
|
||||
txs = []
|
||||
for i in range(v):
|
||||
tx_hash = os.urandom(32).hex()
|
||||
tx = MockTx(0, tx_hash)
|
||||
txs.append(tx)
|
||||
|
||||
block = MockBlock(self.cursor, txs)
|
||||
self.blocks[self.cursor] = block
|
||||
self.cursor += 1
|
||||
|
||||
if driver != None:
|
||||
self.apply(driver)
|
||||
|
||||
|
||||
def apply(self, driver, offset=0):
|
||||
block_numbers = list(self.blocks.keys())
|
||||
for block_number in block_numbers:
|
||||
if block_number < offset:
|
||||
continue
|
||||
block = self.blocks[block_number]
|
||||
driver.add_block(block)
|
||||
|
||||
|
||||
|
||||
class MockConn:
|
||||
"""Noop connection mocker.
|
||||
|
||||
@ -112,7 +56,6 @@ class MockBlock:
|
||||
"""
|
||||
self.number = number
|
||||
self.txs = txs
|
||||
self.hash = os.urandom(32).hex()
|
||||
|
||||
|
||||
def tx(self, i):
|
||||
@ -121,158 +64,45 @@ class MockBlock:
|
||||
:param i: Transaction index
|
||||
:type i: int
|
||||
"""
|
||||
return MockTx(i, self.txs[i].hash)
|
||||
return MockTx(i, self.txs[i])
|
||||
|
||||
|
||||
class MockStore(State):
|
||||
class TestSyncer(HistorySyncer):
|
||||
"""Unittest extension of history syncer driver.
|
||||
|
||||
def __init__(self, bits=0):
|
||||
super(MockStore, self).__init__(bits, check_alias=False)
|
||||
:param backend: Syncer backend
|
||||
:type backend: chainsyncer.backend.base.Backend implementation
|
||||
:param chain_interface: Chain interface
|
||||
:type chain_interface: chainlib.interface.ChainInterface implementation
|
||||
:param tx_counts: List of integer values defining how many mock transactions to generate per block. Mock blocks will be generated for each element in list.
|
||||
:type tx_counts: list
|
||||
"""
|
||||
|
||||
def __init__(self, backend, chain_interface, tx_counts=[]):
|
||||
self.tx_counts = tx_counts
|
||||
super(TestSyncer, self).__init__(backend, chain_interface)
|
||||
|
||||
|
||||
def start(self, offset=0, target=-1):
|
||||
pass
|
||||
def get(self, conn):
|
||||
"""Implements the block getter of chainsyncer.driver.base.Syncer.
|
||||
|
||||
:param conn: RPC connection
|
||||
:type conn: chainlib.connection.RPCConnection
|
||||
:raises NoBlockForYou: End of mocked block array reached
|
||||
:rtype: chainsyncer.unittest.base.MockBlock
|
||||
:returns: Mock block.
|
||||
"""
|
||||
(pair, fltr) = self.backend.get()
|
||||
(target_block, fltr) = self.backend.target()
|
||||
block_height = pair[0]
|
||||
|
||||
class MockFilter:
|
||||
|
||||
def __init__(self, name, brk=None, brk_hard=None, z=None):
|
||||
self.name = name
|
||||
if z == None:
|
||||
h = hashlib.sha256()
|
||||
h.update(self.name.encode('utf-8'))
|
||||
z = h.digest()
|
||||
self.z = z
|
||||
self.brk = brk
|
||||
self.brk_hard = brk_hard
|
||||
self.contents = []
|
||||
|
||||
|
||||
def sum(self):
|
||||
return self.z
|
||||
|
||||
|
||||
def common_name(self):
|
||||
return self.name
|
||||
|
||||
|
||||
def filter(self, conn, block, tx):
|
||||
r = False
|
||||
if self.brk_hard != None:
|
||||
r = True
|
||||
if self.brk_hard > 0:
|
||||
r = True
|
||||
self.brk_hard -= 1
|
||||
if r:
|
||||
raise MockFilterError()
|
||||
if self.brk != None:
|
||||
if self.brk > 0:
|
||||
r = True
|
||||
self.brk -= 1
|
||||
self.contents.append((block.number, tx.index, tx.hash,))
|
||||
logg.debug('filter {} result {} block {} tx {} {}'.format(self.common_name(), r, block.number, tx.index, tx.hash))
|
||||
return r
|
||||
|
||||
|
||||
class MockDriver(SyncDriver):
|
||||
|
||||
def __init__(self, store, offset=0, target=-1, interrupt_block=None, interrupt_tx=None, interrupt_global=False):
|
||||
super(MockDriver, self).__init__(store, offset=offset, target=target)
|
||||
self.blocks = {}
|
||||
self.interrupt = None
|
||||
if interrupt_block != None:
|
||||
interrupt_block = int(interrupt_block)
|
||||
if interrupt_tx == None:
|
||||
interrupt_tx = 0
|
||||
else:
|
||||
interrupt_tx = int(interrupt_tx)
|
||||
self.interrupt = (interrupt_block, interrupt_tx,)
|
||||
self.interrupt_global = interrupt_global
|
||||
|
||||
|
||||
def add_block(self, block):
|
||||
logg.debug('add block {} {} with {} txs'.format(block.number, block.hash, len(block.txs)))
|
||||
self.blocks[block.number] = block
|
||||
|
||||
|
||||
def get(self, conn, item):
|
||||
try:
|
||||
return self.blocks[item.cursor]
|
||||
except KeyError:
|
||||
if block_height == target_block:
|
||||
self.running = False
|
||||
raise NoBlockForYou()
|
||||
|
||||
block_txs = []
|
||||
if block_height < len(self.tx_counts):
|
||||
for i in range(self.tx_counts[block_height]):
|
||||
block_txs.append(add_0x(os.urandom(32).hex()))
|
||||
|
||||
def process(self, conn, item, block):
|
||||
i = item.tx_cursor
|
||||
while self.running:
|
||||
if self.interrupt != None:
|
||||
if self.interrupt[0] == block.number and self.interrupt[1] == i:
|
||||
logg.info('interrupt triggered at {}'.format(self.interrupt))
|
||||
if self.interrupt_global:
|
||||
SyncDriver.running_global = False
|
||||
self.running = False
|
||||
break
|
||||
tx = block.tx(i)
|
||||
self.process_single(conn, block, tx)
|
||||
item.next()
|
||||
i += 1
|
||||
|
||||
|
||||
class MockChainInterface:
|
||||
|
||||
def block_by_number(self, number):
|
||||
return ('block_by_number', number,)
|
||||
|
||||
|
||||
def tx_by_hash(self, hsh):
|
||||
return ('tx_by_hash', hsh,)
|
||||
|
||||
|
||||
def block_from_src(self, src):
|
||||
return src
|
||||
|
||||
|
||||
def src_normalize(self, src):
|
||||
return src
|
||||
|
||||
|
||||
def tx_receipt(self, hsh):
|
||||
return ('receipt', hsh,)
|
||||
|
||||
|
||||
class MockChainInterfaceConn(MockConn):
|
||||
|
||||
def __init__(self, interface):
|
||||
self.ifc = interface
|
||||
self.blocks = {}
|
||||
self.txs = {}
|
||||
|
||||
|
||||
def add_block(self, block):
|
||||
logg.debug('add block {} {} with {} txs'.format(block.number, block.hash, len(block.txs)))
|
||||
self.blocks[block.number] = block
|
||||
for tx in block.txs:
|
||||
self.txs[tx.hash] = tx
|
||||
|
||||
|
||||
def do(self, o):
|
||||
m = getattr(self, 'handle_' + o[0])
|
||||
return m(o[1])
|
||||
|
||||
|
||||
def handle_block_by_number(self, number):
|
||||
return self.blocks[number]
|
||||
|
||||
|
||||
|
||||
def handle_receipt(self, hsh):
|
||||
return {}
|
||||
|
||||
|
||||
class MockItem:
|
||||
|
||||
def __init__(self, target, offset, cursor, state_key):
|
||||
self.target = target
|
||||
self.offset = offset
|
||||
self.cursor = cursor
|
||||
self.state_key = state_key
|
||||
return MockBlock(block_height, block_txs)
|
||||
|
64
chainsyncer/unittest/db.py
Normal file
64
chainsyncer/unittest/db.py
Normal file
@ -0,0 +1,64 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import os
|
||||
|
||||
# external imports
|
||||
import alembic
|
||||
import alembic.config
|
||||
|
||||
# local imports
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
from chainsyncer.db import dsn_from_config
|
||||
from chainsyncer.db.models.base import SessionBase
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChainSyncerDb:
|
||||
"""SQLITE database setup for unit tests
|
||||
|
||||
:param debug: Activate sql level debug (outputs sql statements)
|
||||
:type debug: bool
|
||||
"""
|
||||
|
||||
base = SessionBase
|
||||
|
||||
def __init__(self, debug=False):
|
||||
config = {
|
||||
'DATABASE_ENGINE': 'sqlite',
|
||||
'DATABASE_DRIVER': 'pysqlite',
|
||||
'DATABASE_NAME': 'chainsyncer.sqlite',
|
||||
}
|
||||
logg.debug('config {}'.format(config))
|
||||
|
||||
self.dsn = dsn_from_config(config)
|
||||
|
||||
self.base.poolable = False
|
||||
self.base.transactional = False
|
||||
self.base.procedural = False
|
||||
self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0
|
||||
|
||||
rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..')
|
||||
dbdir = os.path.join(rootdir, 'chainsyncer', 'db')
|
||||
#migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE'))
|
||||
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
|
||||
logg.info('using migrations directory {}'.format(migrationsdir))
|
||||
|
||||
ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini'))
|
||||
ac.set_main_option('sqlalchemy.url', self.dsn)
|
||||
ac.set_main_option('script_location', migrationsdir)
|
||||
|
||||
alembic.command.downgrade(ac, 'base')
|
||||
alembic.command.upgrade(ac, 'head')
|
||||
|
||||
|
||||
def bind_session(self, session=None):
|
||||
"""Create session using underlying session base
|
||||
"""
|
||||
return self.base.bind_session(session)
|
||||
|
||||
|
||||
def release_session(self, session=None):
|
||||
"""Release session using underlying session base
|
||||
"""
|
||||
return self.base.release_session(session)
|
@ -1,301 +0,0 @@
|
||||
# standard imports
|
||||
import os
|
||||
import stat
|
||||
import unittest
|
||||
import shutil
|
||||
import tempfile
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
# local imports
|
||||
from chainsyncer.session import SyncSession
|
||||
from chainsyncer.error import (
|
||||
LockError,
|
||||
FilterDone,
|
||||
IncompleteFilterError,
|
||||
SyncDone,
|
||||
)
|
||||
from chainsyncer.unittest import (
|
||||
MockFilter,
|
||||
MockItem,
|
||||
)
|
||||
|
||||
logging.STATETRACE = 5
|
||||
logg = logging.getLogger(__name__)
|
||||
logg.setLevel(logging.STATETRACE)
|
||||
|
||||
|
||||
def state_change_callback(k, old_state, new_state):
|
||||
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
|
||||
|
||||
|
||||
def filter_change_callback(k, old_state, new_state):
|
||||
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
|
||||
|
||||
|
||||
class TestStoreBase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.base_path = tempfile.mkdtemp()
|
||||
self.session_id = str(uuid.uuid4())
|
||||
self.path = os.path.join(self.base_path, self.session_id)
|
||||
os.makedirs(self.path)
|
||||
self.store_factory = None
|
||||
self.persist = True
|
||||
|
||||
|
||||
@classmethod
|
||||
def link(cls, target):
|
||||
for v in [
|
||||
"default",
|
||||
"store_start",
|
||||
"store_resume",
|
||||
"filter_list",
|
||||
"sync_process_nofilter",
|
||||
"sync_process_onefilter",
|
||||
"sync_process_outoforder",
|
||||
"sync_process_interrupt",
|
||||
"sync_process_reset",
|
||||
"sync_process_done",
|
||||
"sync_head_future",
|
||||
"sync_history_interrupted",
|
||||
"sync_history_complete",
|
||||
]:
|
||||
setattr(target, 'test_' + v, getattr(cls, 't_' + v))
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.path)
|
||||
|
||||
|
||||
def t_default(self):
|
||||
bogus_item = MockItem(0, 0, 0, 0)
|
||||
store = self.store_factory()
|
||||
|
||||
if store.session_path == None:
|
||||
return
|
||||
|
||||
#fp = os.path.join(self.path, store.session_id)
|
||||
fp = self.path
|
||||
session_id = store.session_id
|
||||
st = None
|
||||
st = os.stat(fp)
|
||||
|
||||
if st != None:
|
||||
self.assertTrue(stat.S_ISDIR(st.st_mode))
|
||||
#self.assertTrue(store.is_default)
|
||||
|
||||
store.stop(bogus_item)
|
||||
store = self.store_factory()
|
||||
fpr = os.path.join(self.path, self.session_id)
|
||||
self.assertEqual(fp, self.path)
|
||||
|
||||
|
||||
def t_store_start(self):
|
||||
bogus_item = MockItem(0, 0, 0, 0)
|
||||
store = self.store_factory()
|
||||
store.start(42)
|
||||
self.assertTrue(store.first)
|
||||
|
||||
store.stop(bogus_item)
|
||||
|
||||
if self.persist:
|
||||
store = self.store_factory()
|
||||
store.start()
|
||||
self.assertFalse(store.first)
|
||||
|
||||
|
||||
def t_store_resume(self):
|
||||
store = self.store_factory()
|
||||
store.start(13)
|
||||
self.assertTrue(store.first)
|
||||
# todo not done
|
||||
|
||||
|
||||
def t_sync_process_nofilter(self):
|
||||
store = self.store_factory()
|
||||
session = SyncSession(store)
|
||||
session.start()
|
||||
o = session.get(0)
|
||||
with self.assertRaises(FilterDone):
|
||||
o.advance()
|
||||
|
||||
|
||||
def t_sync_process_onefilter(self):
|
||||
store = self.store_factory()
|
||||
session = SyncSession(store)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
store.register(fltr_one)
|
||||
|
||||
session.start()
|
||||
o = session.get(0)
|
||||
o.advance()
|
||||
o.release()
|
||||
|
||||
|
||||
def t_sync_process_outoforder(self):
|
||||
store = self.store_factory()
|
||||
session = SyncSession(store)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
store.register(fltr_one)
|
||||
fltr_two = MockFilter('two')
|
||||
store.register(fltr_two)
|
||||
|
||||
session.start()
|
||||
o = session.get(0)
|
||||
o.advance()
|
||||
with self.assertRaises(LockError):
|
||||
o.advance()
|
||||
|
||||
o.release()
|
||||
with self.assertRaises(LockError):
|
||||
o.release()
|
||||
|
||||
o.advance()
|
||||
o.release()
|
||||
|
||||
|
||||
def t_sync_process_interrupt(self):
|
||||
store = self.store_factory()
|
||||
session = SyncSession(store)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
store.register(fltr_one)
|
||||
fltr_two = MockFilter('bar')
|
||||
store.register(fltr_two)
|
||||
|
||||
session.start()
|
||||
o = session.get(0)
|
||||
o.advance()
|
||||
o.release(interrupt=True)
|
||||
with self.assertRaises(FilterDone):
|
||||
o.advance()
|
||||
|
||||
|
||||
def t_sync_process_reset(self):
|
||||
store = self.store_factory()
|
||||
session = SyncSession(store)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
store.register(fltr_one)
|
||||
fltr_two = MockFilter('bar')
|
||||
store.register(fltr_two)
|
||||
|
||||
session.start()
|
||||
o = session.get(0)
|
||||
o.advance()
|
||||
with self.assertRaises(LockError):
|
||||
o.reset()
|
||||
o.release()
|
||||
with self.assertRaises(IncompleteFilterError):
|
||||
o.reset()
|
||||
|
||||
o.advance()
|
||||
o.release()
|
||||
|
||||
with self.assertRaises(FilterDone):
|
||||
o.advance()
|
||||
|
||||
o.reset()
|
||||
|
||||
|
||||
def t_sync_process_done(self):
|
||||
store = self.store_factory()
|
||||
session = SyncSession(store)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
store.register(fltr_one)
|
||||
|
||||
session.start(target=0)
|
||||
o = session.get(0)
|
||||
o.advance()
|
||||
o.release()
|
||||
with self.assertRaises(FilterDone):
|
||||
o.advance()
|
||||
o.reset()
|
||||
with self.assertRaises(SyncDone):
|
||||
o.next(advance_block=True)
|
||||
|
||||
|
||||
def t_sync_head_future(self):
|
||||
store = self.store_factory('foo')
|
||||
session = SyncSession(store)
|
||||
|
||||
session.start()
|
||||
o = session.get(0)
|
||||
o.next(advance_block=True)
|
||||
o.next(advance_block=True)
|
||||
session.stop(o)
|
||||
|
||||
if self.persist:
|
||||
store = self.store_factory('foo')
|
||||
store.start()
|
||||
o = store.get(2)
|
||||
|
||||
|
||||
def t_sync_history_interrupted(self):
|
||||
if not self.persist:
|
||||
return
|
||||
|
||||
bogus_item = MockItem(0, 0, 0, 0)
|
||||
store = self.store_factory('foo')
|
||||
session = SyncSession(store)
|
||||
|
||||
session.start(target=13)
|
||||
o = session.get(0)
|
||||
o.next(advance_block=True)
|
||||
o.next(advance_block=True)
|
||||
session.stop(o)
|
||||
|
||||
store.stop(bogus_item)
|
||||
store = self.store_factory('foo')
|
||||
store.start()
|
||||
o = store.get(0)
|
||||
self.assertEqual(o.cursor, 2)
|
||||
self.assertEqual(o.target, 13)
|
||||
o.next(advance_block=True)
|
||||
o.next(advance_block=True)
|
||||
|
||||
store.stop(bogus_item)
|
||||
store = self.store_factory('foo')
|
||||
store.start()
|
||||
self.assertEqual(o.cursor, 4)
|
||||
self.assertEqual(o.target, 13)
|
||||
|
||||
|
||||
def t_sync_history_complete(self):
|
||||
store = self.store_factory('foo')
|
||||
session = SyncSession(store)
|
||||
|
||||
session.start(target=3)
|
||||
o = session.get(0)
|
||||
o.next(advance_block=True)
|
||||
o.next(advance_block=True)
|
||||
o.next(advance_block=True)
|
||||
with self.assertRaises(SyncDone):
|
||||
o.next(advance_block=True)
|
||||
|
||||
|
||||
def t_filter_list(self):
|
||||
bogus_item = MockItem(0, 0, 0, 0)
|
||||
store = self.store_factory()
|
||||
|
||||
if store.session_path == None:
|
||||
return
|
||||
|
||||
fltr_one = MockFilter('foo_bar')
|
||||
store.register(fltr_one)
|
||||
|
||||
fltr_two = MockFilter('bar_baz')
|
||||
store.register(fltr_two)
|
||||
|
||||
store.start()
|
||||
store.stop(bogus_item)
|
||||
|
||||
store = self.store_factory()
|
||||
r = store.load_filter_list()
|
||||
|
||||
self.assertEqual(r[0], 'foo_bar')
|
||||
self.assertEqual(r[1], 'bar_baz')
|
@ -1,5 +1,5 @@
|
||||
confini~=0.6.0
|
||||
semver==2.13.0
|
||||
hexathon~=0.1.5
|
||||
chainlib~=0.1.1
|
||||
shep~=0.2.3
|
||||
chainlib>=0.1.0b1,<=0.1.0
|
||||
shep~=0.0.1
|
||||
|
@ -8,12 +8,5 @@ for f in `ls tests/*.py`; do
|
||||
exit
|
||||
fi
|
||||
done
|
||||
|
||||
for f in `ls tests/store/*.py`; do
|
||||
python $f
|
||||
if [ $? -gt 0 ]; then
|
||||
exit
|
||||
fi
|
||||
done
|
||||
set +x
|
||||
set +e
|
||||
|
21
setup.cfg
21
setup.cfg
@ -1,10 +1,10 @@
|
||||
[metadata]
|
||||
name = chainsyncer
|
||||
version = 0.4.1
|
||||
version = 0.3.0
|
||||
description = Generic blockchain syncer driver
|
||||
author = Louis Holbrook
|
||||
author_email = dev@holbrook.no
|
||||
url = https://gitlab.com/chaintool/chainsyncer
|
||||
url = https://gitlab.com/chaintools/chainsyncer
|
||||
keywords =
|
||||
cryptocurrency
|
||||
classifiers =
|
||||
@ -25,17 +25,16 @@ include_package_data = True
|
||||
python_requires = >= 3.6
|
||||
packages =
|
||||
chainsyncer
|
||||
chainsyncer.db
|
||||
chainsyncer.db.models
|
||||
chainsyncer.backend
|
||||
chainsyncer.driver
|
||||
chainsyncer.unittest
|
||||
chainsyncer.store
|
||||
chainsyncer.cli
|
||||
chainsyncer.runnable
|
||||
|
||||
#[options.package_data]
|
||||
#* =
|
||||
# sql/*
|
||||
[options.package_data]
|
||||
* =
|
||||
sql/*
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
#[options.entry_points]
|
||||
#console_scripts =
|
||||
# blocksync-celery = chainsyncer.runnable.tracker:main
|
||||
chainsyncer-unlock = chainsyncer.runnable.unlock:main
|
||||
|
2
setup.py
2
setup.py
@ -26,7 +26,5 @@ setup(
|
||||
install_requires=requirements,
|
||||
extras_require={
|
||||
'sql': sql_requirements,
|
||||
'rocksdb': ['shep[rocksdb]~=0.2.2'],
|
||||
'redis': ['shep[redis]~=0.2.2'],
|
||||
}
|
||||
)
|
||||
|
@ -1,33 +0,0 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from shep import State
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store.mem import SyncMemStore
|
||||
from chainsyncer.unittest.store import TestStoreBase
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class StoreFactory:
|
||||
|
||||
def create(self, session_id=None):
|
||||
return SyncMemStore(session_id=session_id)
|
||||
|
||||
|
||||
class TestMem(TestStoreBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestMem, self).setUp()
|
||||
self.store_factory = StoreFactory().create
|
||||
self.persist = False
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TestStoreBase.link(TestMem)
|
||||
# Remove tests that test persistence of state
|
||||
unittest.main()
|
@ -1,32 +0,0 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store.fs import SyncFsStore
|
||||
from chainsyncer.unittest.store import TestStoreBase
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class StoreFactory:
|
||||
|
||||
def __init__(self, path):
|
||||
self.path = path
|
||||
|
||||
|
||||
def create(self, session_id=None):
|
||||
return SyncFsStore(self.path, session_id=session_id)
|
||||
|
||||
|
||||
class TestFs(TestStoreBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestFs, self).setUp()
|
||||
self.store_factory = StoreFactory(self.path).create
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TestStoreBase.link(TestFs)
|
||||
unittest.main()
|
@ -1,35 +0,0 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store.rocksdb import SyncRocksDbStore
|
||||
from chainsyncer.unittest.store import (
|
||||
TestStoreBase,
|
||||
filter_change_callback,
|
||||
state_change_callback,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logg = logging.getLogger()
|
||||
|
||||
class StoreFactory:
|
||||
|
||||
def __init__(self, path):
|
||||
self.path = path
|
||||
|
||||
|
||||
def create(self, session_id=None):
|
||||
return SyncRocksDbStore(self.path, session_id=session_id, state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
|
||||
|
||||
|
||||
class TestRocksDb(TestStoreBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRocksDb, self).setUp()
|
||||
self.store_factory = StoreFactory(self.path).create
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TestStoreBase.link(TestRocksDb)
|
||||
unittest.main()
|
@ -1,75 +1,72 @@
|
||||
# standard imports
|
||||
# standard imporst
|
||||
import unittest
|
||||
import tempfile
|
||||
import shutil
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from shep import State
|
||||
|
||||
# local imports
|
||||
from chainsyncer.state import SyncState
|
||||
from chainsyncer.session import SyncSession
|
||||
from chainsyncer.filter import FilterState
|
||||
from chainsyncer.store.fs import SyncFsStore
|
||||
from chainsyncer.unittest import (
|
||||
MockStore,
|
||||
MockFilter,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logg = logging.getLogger()
|
||||
|
||||
class MockStore(State):
|
||||
|
||||
def __init__(self, bits):
|
||||
super(MockStore, self).__init__(bits, check_alias=False)
|
||||
|
||||
|
||||
class MockFilter:
|
||||
|
||||
def __init__(self, z, name):
|
||||
self.z = z
|
||||
self.name = name
|
||||
|
||||
|
||||
def sum(self):
|
||||
return self.z
|
||||
|
||||
|
||||
def common_name(self):
|
||||
return self.name
|
||||
|
||||
|
||||
class TestSync(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.path = tempfile.mkdtemp()
|
||||
self.store = SyncFsStore(self.path)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.path)
|
||||
self.store = MockStore(6)
|
||||
self.state = SyncState(self.store)
|
||||
|
||||
|
||||
def test_basic(self):
|
||||
store = MockStore(6)
|
||||
state = FilterState(store)
|
||||
session = SyncSession(state)
|
||||
session = SyncSession(self.state)
|
||||
self.assertTrue(session.is_default)
|
||||
|
||||
session = SyncSession(self.state, session_id='foo')
|
||||
self.assertFalse(session.is_default)
|
||||
|
||||
|
||||
def test_sum(self):
|
||||
store = MockStore(6)
|
||||
state = FilterState(store)
|
||||
|
||||
b = b'\x2a' * 32
|
||||
fltr = MockFilter('foo', z=b)
|
||||
state.register(fltr)
|
||||
fltr = MockFilter(b, name='foo')
|
||||
self.state.register(fltr)
|
||||
|
||||
b = b'\x0d' * 31
|
||||
fltr = MockFilter('bar', z=b)
|
||||
fltr = MockFilter(b, name='bar')
|
||||
with self.assertRaises(ValueError):
|
||||
state.register(fltr)
|
||||
self.state.register(fltr)
|
||||
|
||||
b = b'\x0d' * 32
|
||||
fltr = MockFilter('bar', z=b)
|
||||
state.register(fltr)
|
||||
fltr = MockFilter(b, name='bar')
|
||||
self.state.register(fltr)
|
||||
|
||||
v = state.sum()
|
||||
v = self.state.sum()
|
||||
self.assertEqual(v.hex(), 'a24abf9fec112b4e0210ae874b4a371f8657b1ee0d923ad6d974aef90bad8550')
|
||||
|
||||
|
||||
def test_session_start(self):
|
||||
store = MockStore(6)
|
||||
state = FilterState(store)
|
||||
session = SyncSession(state)
|
||||
session = SyncSession(self.state)
|
||||
session.start()
|
||||
|
||||
|
||||
def test_state_dynamic(self):
|
||||
store = MockStore()
|
||||
state = FilterState(store)
|
||||
|
||||
b = b'\x0d' * 32
|
||||
fltr = MockFilter(name='foo', z=b)
|
||||
state.register(fltr)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -1,61 +0,0 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
import tempfile
|
||||
import shutil
|
||||
import logging
|
||||
import stat
|
||||
import os
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store.fs import SyncFsStore
|
||||
from chainsyncer.session import SyncSession
|
||||
from chainsyncer.error import (
|
||||
LockError,
|
||||
FilterDone,
|
||||
IncompleteFilterError,
|
||||
SyncDone,
|
||||
)
|
||||
from chainsyncer.unittest import (
|
||||
MockBlockGenerator,
|
||||
MockFilter,
|
||||
MockChainInterfaceConn,
|
||||
MockTx,
|
||||
MockBlock,
|
||||
MockChainInterface,
|
||||
MockFilterError,
|
||||
)
|
||||
from chainsyncer.driver.chain_interface import ChainInterfaceDriver
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class TestFilter(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.path = tempfile.mkdtemp()
|
||||
self.store = SyncFsStore(self.path)
|
||||
self.ifc = MockChainInterface()
|
||||
self.conn = MockChainInterfaceConn(self.ifc)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.path)
|
||||
|
||||
|
||||
def test_driver(self):
|
||||
generator = MockBlockGenerator()
|
||||
generator.generate([1, 2], driver=self.conn)
|
||||
|
||||
drv = ChainInterfaceDriver(self.store, self.ifc, target=1)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
self.store.register(fltr_one)
|
||||
with self.assertRaises(SyncDone):
|
||||
drv.run(self.conn)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 3)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
@ -1,78 +0,0 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
import tempfile
|
||||
import shutil
|
||||
import logging
|
||||
import stat
|
||||
import os
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store.fs import SyncFsStore
|
||||
from chainsyncer.session import SyncSession
|
||||
from chainsyncer.error import (
|
||||
LockError,
|
||||
FilterDone,
|
||||
IncompleteFilterError,
|
||||
)
|
||||
from chainsyncer.unittest import (
|
||||
MockFilter,
|
||||
MockConn,
|
||||
MockTx,
|
||||
MockBlock,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class TestFilter(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.path = tempfile.mkdtemp()
|
||||
self.store = SyncFsStore(self.path)
|
||||
self.session = SyncSession(self.store)
|
||||
self.conn = MockConn()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.path)
|
||||
|
||||
|
||||
def test_filter_basic(self):
|
||||
fltr_one = MockFilter('foo')
|
||||
self.store.register(fltr_one)
|
||||
fltr_two = MockFilter('bar')
|
||||
self.store.register(fltr_two)
|
||||
|
||||
self.session.start()
|
||||
|
||||
tx_hash = os.urandom(32).hex()
|
||||
tx = MockTx(42, tx_hash)
|
||||
block = MockBlock(13, [tx_hash])
|
||||
self.session.filter(self.conn, block, tx)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 1)
|
||||
self.assertEqual(len(fltr_two.contents), 1)
|
||||
|
||||
|
||||
|
||||
def test_filter_interrupt(self):
|
||||
fltr_one = MockFilter('foo', brk=True)
|
||||
self.store.register(fltr_one)
|
||||
fltr_two = MockFilter('bar')
|
||||
self.store.register(fltr_two)
|
||||
|
||||
self.session.start()
|
||||
|
||||
tx_hash = os.urandom(32).hex()
|
||||
tx = MockTx(42, tx_hash)
|
||||
block = MockBlock(13, [tx_hash])
|
||||
self.session.filter(self.conn, block, tx)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 1)
|
||||
self.assertEqual(len(fltr_two.contents), 0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -1,155 +0,0 @@
|
||||
# standard imports
|
||||
import unittest
|
||||
import tempfile
|
||||
import shutil
|
||||
import logging
|
||||
import stat
|
||||
import os
|
||||
|
||||
# local imports
|
||||
from chainsyncer.store.fs import SyncFsStore
|
||||
from chainsyncer.session import SyncSession
|
||||
from chainsyncer.error import (
|
||||
LockError,
|
||||
FilterDone,
|
||||
IncompleteFilterError,
|
||||
SyncDone,
|
||||
)
|
||||
from chainsyncer.unittest import (
|
||||
MockBlockGenerator,
|
||||
MockFilter,
|
||||
MockConn,
|
||||
MockTx,
|
||||
MockBlock,
|
||||
MockDriver,
|
||||
MockFilterError,
|
||||
state_event_handler,
|
||||
filter_state_event_handler,
|
||||
)
|
||||
from chainsyncer.driver import SyncDriver
|
||||
|
||||
logging.basicConfig(level=logging.STATETRACE)
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class TestFilter(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.path = tempfile.mkdtemp()
|
||||
self.store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
|
||||
self.conn = MockConn()
|
||||
|
||||
|
||||
# def tearDown(self):
|
||||
# shutil.rmtree(self.path)
|
||||
|
||||
|
||||
def test_filter_basic(self):
|
||||
session = SyncSession(self.store)
|
||||
session.start(target=1)
|
||||
fltr_one = MockFilter('foo')
|
||||
self.store.register(fltr_one)
|
||||
|
||||
tx_hash = os.urandom(32).hex()
|
||||
tx = MockTx(42, tx_hash)
|
||||
block = MockBlock(0, [tx_hash])
|
||||
session.filter(self.conn, block, tx)
|
||||
|
||||
tx_hash = os.urandom(32).hex()
|
||||
tx = MockTx(42, tx_hash)
|
||||
block = MockBlock(1, [tx_hash])
|
||||
session.filter(self.conn, block, tx)
|
||||
self.assertEqual(len(fltr_one.contents), 2)
|
||||
|
||||
|
||||
def test_driver(self):
|
||||
drv = MockDriver(self.store, target=1)
|
||||
generator = MockBlockGenerator()
|
||||
generator.generate([1, 2], driver=drv)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
self.store.register(fltr_one)
|
||||
with self.assertRaises(SyncDone):
|
||||
drv.run(self.conn)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 3)
|
||||
|
||||
|
||||
def test_driver_interrupt_noresume(self):
|
||||
drv = MockDriver(self.store, target=1)
|
||||
generator = MockBlockGenerator()
|
||||
generator.generate([1], driver=drv)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
self.store.register(fltr_one)
|
||||
fltr_two = MockFilter('bar', brk_hard=1)
|
||||
self.store.register(fltr_two)
|
||||
|
||||
with self.assertRaises(MockFilterError):
|
||||
drv.run(self.conn)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 1)
|
||||
self.assertEqual(len(fltr_two.contents), 0)
|
||||
|
||||
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
|
||||
|
||||
fltr_one = MockFilter('foo') #, brk_hard=1)
|
||||
store.register(fltr_one)
|
||||
fltr_two = MockFilter('bar')
|
||||
store.register(fltr_two)
|
||||
|
||||
with self.assertRaises(LockError):
|
||||
drv = MockDriver(store, target=1)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 0)
|
||||
self.assertEqual(len(fltr_two.contents), 0)
|
||||
|
||||
|
||||
def test_driver_interrupt_filter(self):
|
||||
drv = MockDriver(self.store, target=1)
|
||||
generator = MockBlockGenerator()
|
||||
generator.generate([1, 1], driver=drv)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
self.store.register(fltr_one)
|
||||
fltr_two = MockFilter('bar', brk=1)
|
||||
self.store.register(fltr_two)
|
||||
fltr_three = MockFilter('baz')
|
||||
self.store.register(fltr_three)
|
||||
|
||||
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
|
||||
|
||||
with self.assertRaises(SyncDone):
|
||||
drv.run(self.conn)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 2)
|
||||
self.assertEqual(len(fltr_two.contents), 2)
|
||||
self.assertEqual(len(fltr_three.contents), 1)
|
||||
|
||||
|
||||
def test_driver_interrupt_sync(self):
|
||||
drv = MockDriver(self.store, interrupt_block=1, target=2)
|
||||
generator = MockBlockGenerator()
|
||||
generator.generate([3, 1, 2], driver=drv)
|
||||
|
||||
fltr_one = MockFilter('foo')
|
||||
self.store.register(fltr_one)
|
||||
|
||||
drv.run(self.conn, interval=0.1)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 3)
|
||||
|
||||
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
|
||||
store.register(fltr_one)
|
||||
drv = MockDriver(store)
|
||||
generator.apply(drv, offset=1)
|
||||
|
||||
with self.assertRaises(SyncDone) as e:
|
||||
drv.run(self.conn, interval=0.1)
|
||||
self.assertEqual(e, 2)
|
||||
|
||||
self.assertEqual(len(fltr_one.contents), 6)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user