Compare commits

..

66 Commits

Author SHA1 Message Date
lash
f039d6c9ad
Correct runnable package name 2022-04-28 15:36:04 +00:00
lash
58787b3884
Add unlock cli tool to setup 2022-04-28 15:30:45 +00:00
lash
b192dd6e95
Add resume of filter in syncitem 2022-04-28 12:35:18 +00:00
lash
ca1441d50d
WIP safe access to unlocking sync with tool 2022-04-28 08:15:04 +00:00
lash
ca82ea247f
Filter list persistencE 2022-04-28 06:45:59 +00:00
lash
384c79bed0
Remove session id path generation 2022-04-28 06:10:43 +00:00
lash
36acf3f09a
WIP more work on lock cli tool 2022-04-27 09:59:40 +00:00
lash
3a2317d253
WIP add lock cli tool 2022-04-27 09:43:57 +00:00
lash
6647e11df5
Add cli args handler and settings processor 2022-04-27 05:04:13 +00:00
lash
4e7c0f0d73
Add settings renderer, cli flag and config handling 2022-04-26 19:07:34 +00:00
lash
b5617fc9fb
Upgrade shep 2022-04-26 08:16:38 +00:00
lash
044e85fb99
Allow memory-only syncing 2022-04-26 07:56:04 +00:00
lash
927913bd02
Check explicit for bool in filter interrupt check 2022-04-25 06:28:42 +00:00
lash
290fa1844d
Add chainsyncer extras 2022-04-24 21:20:41 +00:00
lash
b6ed8d7d8f
Remove loglines 2022-04-24 20:52:11 +00:00
lash
4905fe4fc2
Upgrade shep 2022-04-24 20:47:09 +00:00
lash
6d9d0f0462
Update deps 2022-04-20 19:01:07 +00:00
lash
2c206078ff
Remove deleted module from setup 2022-04-20 16:38:04 +00:00
lash
05898a7e00
Complete rocksdb test 2022-04-20 16:36:06 +00:00
lash
4bda7522ab
Move store tests to separate dir, run last 2022-04-20 15:28:12 +00:00
lash
d27bcaa9f5
Factor out common store tests, implement for fs and rocksdb 2022-04-20 15:15:43 +00:00
lash
197560ae16
Implement rocksdb and default test 2022-04-20 14:27:59 +00:00
lash
f385b26e1e
Remove state module, move filterstate to filter module 2022-04-20 13:17:38 +00:00
lash
b7957b8a0b
Rename syncstate to filterstate 2022-04-20 12:58:50 +00:00
lash
9aded5c561
Factor out target state 2022-04-20 12:55:43 +00:00
lash
97c2d41df3
Factor out sync state scanner 2022-04-20 12:31:32 +00:00
lash
891f90ae5f
WIP refactor to allow other backends 2022-04-20 11:59:12 +00:00
lash
f521162e7b
Move to SYNC state after start 2022-04-10 15:27:14 +00:00
lash
9be67bd78a Revert "Bump shep"
This reverts commit 1c92b97799.
2022-04-09 19:06:06 +00:00
lash
1c92b97799
Bump shep 2022-04-09 19:05:01 +00:00
lash
9fcd85927b
Bump version 2022-04-09 19:03:51 +00:00
lash
5f298cb804
Graceful shutdown of driver 2022-04-02 11:21:58 +00:00
lash
f4c6936517
Detect done sync in store start function 2022-04-02 07:33:12 +00:00
lash
9758ade3d5
Remove unused files 2022-03-30 08:22:05 +00:00
lash
8df2a03d5a
Add belated changelog 2022-03-30 08:14:42 +00:00
lash
af6eedf87e
Set defaults for common name and sum in filter 2022-03-30 08:13:34 +00:00
lash
8527901e6c
Add chain interface driver 2022-03-30 06:55:21 +00:00
lash
e8decb9cb7
Add filter counts in session tests, finish sync interrupt test 2022-03-30 05:25:26 +00:00
lash
18f9b9bd1f
complete test for sync resume 2022-03-29 12:56:15 +00:00
lash
7078adaf7e
WIP implement sync done in resume sync for mock driver 2022-03-29 11:28:37 +00:00
lash
ecb123f495
WIP implement resume sync test 2022-03-29 08:12:43 +00:00
lash
55e30eb13b
Add mock block generator 2022-03-29 07:28:28 +00:00
lash
61d7ee49f3
Soft interrupt tests 2022-03-29 06:56:21 +00:00
lash
ce945ae56e
Complete test for lock on interrupted filter 2022-03-23 23:36:17 +00:00
lash
a3517d0203
WIP session filter interrupt test 2022-03-21 21:03:24 +00:00
lash
045c120439
Remove dead code 2022-03-19 02:47:25 +00:00
lash
8130d28e27
Remove commented code 2022-03-19 01:59:38 +00:00
lash
80f9a8be88
Rehabilitate sync driver on changes target handling 2022-03-19 01:58:13 +00:00
lash
36a8609cb5
Short-circuit syncdone on sync state done in item next 2022-03-19 01:25:24 +00:00
lash
755a030175
Syncitem sync state done on last next 2022-03-19 01:24:08 +00:00
lash
2b5383e9e0
Correct first target state filename 2022-03-19 01:13:37 +00:00
lash
43249a9ec0
Move serialize code block 2022-03-19 01:04:43 +00:00
lash
41e00449f8
correct serialization on next block in sync item 2022-03-19 01:03:49 +00:00
lash
7ff4e8faa0
Add target serialization to first state 2022-03-19 00:59:55 +00:00
lash
5f2809c394
Correct sync states of live sync ends 2022-03-19 00:52:47 +00:00
lash
e48b62679d
Replace filter execution control with bools instead of exceptions 2022-03-18 19:12:07 +00:00
lash
75a6d2975e
Implement driver processing 2022-03-18 01:11:30 +00:00
lash
78bd6ca538
Move filter registration to session store 2022-03-18 00:02:18 +00:00
lash
18f16d878f
Introduce driver object 2022-03-17 23:48:23 +00:00
lash
dcf095cc86
Complete syncitem filter advance 2022-03-17 22:07:19 +00:00
lash
5968a19042
Implement filter state per sync item 2022-03-17 19:36:27 +00:00
lash
9386b9e7f9
Fs syncer 2022-03-17 14:54:34 +00:00
lash
2d14515d34
Repair after merge 2022-03-17 10:16:55 +00:00
lash
58e983efcc Merge branch 'dev-0.3.0' into lash/shep 2022-03-17 10:12:00 +00:00
lash
bf0b2eb5a5 Revert "WIP shep state defs"
This reverts commit 2ba87de195.
2022-03-17 10:10:37 +00:00
lash
af47e31cc8
New filter interface, add state step stubs 2022-03-17 10:09:12 +00:00
44 changed files with 2096 additions and 986 deletions

View File

@ -1,4 +1,19 @@
- 0.3.0
* Re-implement filter state handling on shep
- 0.1.0
* Add deferred idle handling
* 0.3.7
- Remove hard eth dependency in settings rendering
- Add unlock cli tool
* 0.3.6
- Add cli arg processing and settings renderer
* 0.3.5
- Allow memory-only shep if factory set to None in store constructor
* 0.3.4
- Use explicit bool check in filter interrupt check
* 0.3.3
- Include shep persistent state bootstrap sync
- Add chainsyncer extras
* 0.3.2
- Implement rocksdb backend
* 0.3.1
- Upgrade to release shep version
- Move sync state to SYNC after start
* 0.3.0
- Re-implement chainsyncer on shep

View File

@ -1 +1 @@
include *requirements.txt LICENSE.txt chainsyncer/db/migrations/default/* chainsyncer/db/migrations/default/versions/* chainsyncer/db/migrations/default/versions/src/*
include *requirements.txt LICENSE.txt chainsyncer/data/config/*

View File

@ -0,0 +1,12 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

14
chainsyncer/cli/arg.py Normal file
View File

@ -0,0 +1,14 @@
# local imports
from .base import SyncFlag
def process_flags(argparser, flags):
if flags & SyncFlag.RANGE > 0:
argparser.add_argument('--offset', type=int, help='Block to start sync from. Default is start of history (0).')
argparser.add_argument('--until', type=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
if flags & SyncFlag.HEAD > 0:
argparser.add_argument('--head', action='store_true', help='Start from latest block as offset')
argparser.add_argument('--keep-alive', action='store_true', help='Do not stop syncing when caught up')
argparser.add_argument('--backend', type=str, help='Backend to use for state store')

7
chainsyncer/cli/base.py Normal file
View File

@ -0,0 +1,7 @@
# standard imports
import enum
class SyncFlag(enum.IntEnum):
RANGE = 1
HEAD = 2

20
chainsyncer/cli/config.py Normal file
View File

@ -0,0 +1,20 @@
# external imports
from chainsyncer.cli import SyncFlag
def process_config(config, args, flags):
args_override = {}
args_override['SYNCER_BACKEND'] = getattr(args, 'backend')
if flags & SyncFlag.RANGE:
args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
args_override['SYNCER_LIMIT'] = getattr(args, 'until')
config.dict_override(args_override, 'local cli args')
if flags & SyncFlag.HEAD:
config.add(getattr(args, 'keep_alive'), '_KEEP_ALIVE')
config.add(getattr(args, 'head'), '_HEAD')
return config

View File

@ -0,0 +1,4 @@
[syncer]
offset = 0
limit = 0
backend = mem

View File

@ -1 +1 @@
from .base import Syncer
from .base import SyncDriver

View File

@ -1,48 +1,21 @@
# standard imports
import uuid
import logging
import time
import signal
import json
# external imports
from chainlib.error import JSONRPCException
# local imports
from chainsyncer.filter import SyncFilter
from chainsyncer.error import (
SyncDone,
NoBlockForYou,
)
)
from chainsyncer.session import SyncSession
logg = logging.getLogger(__name__)
NS_DIV = 1000000000
def noop_callback(block, tx):
"""Logger-only callback for pre- and post processing.
:param block: Block object
:type block: chainlib.block.Block
:param tx: Transaction object
:type tx: chainlib.tx.Tx
"""
logg.debug('noop callback ({},{})'.format(block, tx))
class Syncer:
"""Base class for syncer implementations.
:param backend: Syncer state backend
:type backend: chainsyncer.backend.base.Backend implementation
:param chain_interface: Chain interface implementation
:type chain_interface: chainlib.interface.ChainInterface implementation
:param pre_callback: Function to call before polling. Function will receive no arguments.
:type pre_callback: function
:param block_callback: Function to call before processing txs in a retrieved block. Function should have signature as chainsyncer.driver.base.noop_callback
:type block_callback: function
:param post_callback: Function to call after polling. Function will receive no arguments.
:type post_callback: function
"""
class SyncDriver:
running_global = True
"""If set to false syncer will terminate polling loop."""
@ -55,19 +28,22 @@ class Syncer:
name = 'base'
"""Syncer name, to be overriden for each extended implementation."""
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
self.chain_interface = chain_interface
self.cursor = None
def __init__(self, store, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
self.store = store
self.running = True
self.backend = backend
self.filter = SyncFilter(backend)
self.block_callback = block_callback
self.pre_callback = pre_callback
self.post_callback = post_callback
if not Syncer.signal_set:
for sig in Syncer.signal_request:
self.block_callback = block_callback
self.idle_callback = idle_callback
self.last_start = 0
self.clock_id = time.CLOCK_MONOTONIC_RAW
self.store.connect()
self.store.start(offset=offset, target=target)
if not SyncDriver.signal_set:
for sig in SyncDriver.signal_request:
signal.signal(sig, self.__sig_terminate)
Syncer.signal_set = True
SyncDriver.signal_set = True
def __sig_terminate(self, sig, frame):
@ -79,48 +55,90 @@ class Syncer:
"""Set syncer to terminate as soon as possible.
"""
logg.info('termination requested!')
Syncer.running_global = False
SyncDriver.running_global = False
self.running = False
def add_filter(self, f):
"""Add filter to be processed for each transaction.
def run(self, conn, interval=1):
while self.running_global:
self.session = SyncSession(self.store)
item = self.session.start()
if item == None:
self.running = False
self.running_global = False
break
self.loop(conn, item, interval=interval)
:param f: Filter
:type f: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter
"""
self.filter.add(f)
self.backend.register_filter(str(f))
def idle(self, interval):
interval *= NS_DIV
idle_start = time.clock_gettime_ns(self.clock_id)
delta = idle_start - self.last_start
if delta > interval:
interval /= NS_DIV
time.sleep(interval)
return
if self.idle_callback != None:
r = True
while r:
before = time.clock_gettime_ns(self.clock_id)
r = self.idle_callback(interval)
after = time.clock_gettime_ns(self.clock_id)
delta = after - before
if delta < 0:
return
interval -= delta
if interval < 0:
return
interval /= NS_DIV
time.sleep(interval)
def loop(self, conn, item, interval=1):
logg.debug('started loop')
while self.running and SyncDriver.running_global:
self.last_start = time.clock_gettime_ns(self.clock_id)
if self.pre_callback != None:
self.pre_callback()
while True and self.running:
try:
block = self.get(conn, item)
except SyncDone as e:
logg.info('all blocks sumitted for processing: {}'.format(e))
return
except NoBlockForYou as e:
break
if self.block_callback != None:
self.block_callback(block, None)
try:
self.process(conn, item, block)
except IndexError:
item.next(advance_block=True)
time.sleep(self.yield_delay)
if self.store.target > -1 and block.number >= self.store.target:
self.running = False
if self.post_callback != None:
self.post_callback()
self.idle(interval)
def process_single(self, conn, block, tx):
"""Set syncer backend cursor to the given transaction index and block height, and apply all registered filters on transaction.
:param conn: RPC connection instance
:type conn: chainlib.connection.RPCConnection
:param block: Block object
:type block: chainlib.block.Block
:param block: Transaction object
:type block: chainlib.tx.Tx
"""
self.backend.set(block.number, tx.index)
self.filter.apply(conn, block, tx)
self.session.filter(conn, block, tx)
def loop(self, interval, conn):
raise NotImplementedError()
def process(self, conn, block):
def process(self, conn, item, block):
raise NotImplementedError()
def get(self, conn):
raise NotImplementedError()
def __str__(self):
return 'syncer "{}" {}'.format(
self.name,
self.backend,
)

View File

@ -0,0 +1,57 @@
# external imports
from chainlib.error import RPCException
# local imports
from chainsyncer.error import NoBlockForYou
from chainsyncer.driver import SyncDriver
class ChainInterfaceDriver(SyncDriver):
def __init__(self, store, chain_interface, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None):
super(ChainInterfaceDriver, self).__init__(store, offset=offset, target=target, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_callback, idle_callback=idle_callback)
self.chain_interface = chain_interface
def get(self, conn, item):
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
:param conn: RPC connection
:type conn: chainlib.connectin.RPCConnection
:raises NoBlockForYou: Block at the given height does not exist
:rtype: chainlib.block.Block
:returns: Block object
"""
o = self.chain_interface.block_by_number(item.cursor)
try:
r = conn.do(o)
except RPCException:
r = None
if r == None:
raise NoBlockForYou()
b = self.chain_interface.block_from_src(r)
b.txs = b.txs[item.tx_cursor:]
return b
def process(self, conn, item, block):
tx_src = None
i = item.tx_cursor
while True:
# handle block objects regardless of whether the tx data is embedded or not
try:
tx = block.tx(i)
except AttributeError:
tx_hash = block.txs[i]
o = self.chain_interface.tx_by_hash(tx_hash, block=block)
r = conn.do(o)
#tx = self.chain_interface.tx_from_src(tx_src, block=block)
rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash))
if rcpt != None:
tx.apply_receipt(self.chain_interface.src_normalize(rcpt))
self.process_single(conn, block, tx)
i += 1

View File

@ -1,86 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.eth.tx import (
transaction,
Tx,
)
from chainlib.error import RPCException
# local imports
from chainsyncer.error import NoBlockForYou
from .poll import BlockPollSyncer
logg = logging.getLogger(__name__)
class HeadSyncer(BlockPollSyncer):
"""Extends the block poller, implementing an open-ended syncer.
"""
name = 'head'
def process(self, conn, block):
"""Process a single block using the given RPC connection.
Processing means that all filters are executed on all transactions in the block.
If the block object does not contain the transaction details, the details will be retrieved from the network (incurring the corresponding performance penalty).
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:param block: Block object
:type block: chainlib.block.Block
"""
(pair, fltr) = self.backend.get()
logg.debug('process block {} (backend {}:{})'.format(block, pair, fltr))
i = pair[1] # set tx index from previous
tx_src = None
while True:
# handle block objects regardless of whether the tx data is embedded or not
try:
tx = block.tx(i)
except AttributeError:
o = transaction(block.txs[i])
r = conn.do(o)
tx_src = Tx.src_normalize(r)
tx = self.chain_interface.tx_from_src(tx_src, block=block)
#except IndexError as e:
# logg.debug('index error syncer tx get {}'.format(e))
# break
rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash))
if rcpt != None:
tx.apply_receipt(self.chain_interface.src_normalize(rcpt))
self.process_single(conn, block, tx)
self.backend.reset_filter()
i += 1
def get(self, conn):
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
:param conn: RPC connection
:type conn: chainlib.connectin.RPCConnection
:raises NoBlockForYou: Block at the given height does not exist
:rtype: chainlib.block.Block
:returns: Block object
"""
(height, flags) = self.backend.get()
block_number = height[0]
block_hash = []
o = self.chain_interface.block_by_number(block_number)
try:
r = conn.do(o)
except RPCException:
r = None
if r == None:
raise NoBlockForYou()
b = self.chain_interface.block_from_src(r)
b.txs = b.txs[height[1]:]
return b

View File

@ -1,56 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.error import RPCException
# local imports
from .head import HeadSyncer
from chainsyncer.error import SyncDone
from chainlib.error import RPCException
logg = logging.getLogger(__name__)
class HistorySyncer(HeadSyncer):
"""Bounded syncer implementation of the block poller. Reuses the head syncer process method implementation.
"""
name = 'history'
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None):
super(HeadSyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
self.block_target = None
(block_number, flags) = self.backend.target()
if block_number == None:
raise AttributeError('backend has no future target. Use HeadSyner instead')
self.block_target = block_number
def get(self, conn):
"""Retrieve the block currently defined by the syncer cursor from the RPC provider.
:param conn: RPC connection
:type conn: chainlib.connectin.RPCConnection
:raises SyncDone: Block target reached (at which point the syncer should terminate).
:rtype: chainlib.block.Block
:returns: Block object
:todo: DRY against HeadSyncer
"""
(height, flags) = self.backend.get()
if self.block_target < height[0]:
raise SyncDone(self.block_target)
block_number = height[0]
block_hash = []
o = self.chain_interface.block_by_number(block_number, include_tx=True)
try:
r = conn.do(o)
# TODO: Disambiguate whether error is temporary or permanent, if permanent, SyncDone should be raised, because a historical sync is attempted into the future
except RPCException:
r = None
if r == None:
raise SyncDone()
b = self.chain_interface.block_from_src(r)
return b

View File

@ -1,99 +0,0 @@
# standard imports
import logging
import time
# local imports
from .base import Syncer
from chainsyncer.error import (
SyncDone,
NoBlockForYou,
)
logg = logging.getLogger(__name__)
NS_DIV = 1000000000
class BlockPollSyncer(Syncer):
"""Syncer driver implementation of chainsyncer.driver.base.Syncer that retrieves new blocks through polling.
"""
name = 'blockpoll'
def __init__(self, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, idle_callback=None):
super(BlockPollSyncer, self).__init__(backend, chain_interface, pre_callback=pre_callback, block_callback=block_callback, post_callback=post_callback)
self.idle_callback = idle_callback
self.last_start = 0
self.clock_id = time.CLOCK_MONOTONIC_RAW
def idle(self, interval):
interval *= NS_DIV
idle_start = time.clock_gettime_ns(self.clock_id)
delta = idle_start - self.last_start
if delta > interval:
interval /= NS_DIV
time.sleep(interval)
return
if self.idle_callback != None:
r = True
while r:
before = time.clock_gettime_ns(self.clock_id)
r = self.idle_callback(interval)
after = time.clock_gettime_ns(self.clock_id)
delta = after - before
if delta < 0:
return
interval -= delta
if interval < 0:
return
interval /= NS_DIV
time.sleep(interval)
def loop(self, interval, conn):
"""Indefinite loop polling the given RPC connection for new blocks in the given interval.
:param interval: Seconds to wait for next poll after processing of previous poll has been completed.
:type interval: int
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:rtype: tuple
:returns: See chainsyncer.backend.base.Backend.get
"""
(pair, fltr) = self.backend.get()
start_tx = pair[1]
while self.running and Syncer.running_global:
self.last_start = time.clock_gettime_ns(self.clock_id)
if self.pre_callback != None:
self.pre_callback()
while True and self.running:
if start_tx > 0:
start_tx -= 1
continue
try:
block = self.get(conn)
except SyncDone as e:
logg.info('all blocks sumitted for processing: {}'.format(e))
return self.backend.get()
except NoBlockForYou as e:
break
if self.block_callback != None:
self.block_callback(block, None)
last_block = block
try:
self.process(conn, block)
except IndexError:
self.backend.set(block.number + 1, 0)
start_tx = 0
time.sleep(self.yield_delay)
if self.post_callback != None:
self.post_callback()
self.idle(interval)

View File

@ -1,133 +0,0 @@
# standard imports
import logging
#import threading
import multiprocessing
import queue
# external imports
from chainlib.error import RPCException
# local imports
from .history import HistorySyncer
from chainsyncer.error import SyncDone
logg = logging.getLogger(__name__)
class ThreadedHistorySyncer(HistorySyncer):
def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0):
super(ThreadedHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
self.workers = []
if conn_limit == 0:
conn_limit = thread_limit
#self.conn_pool = queue.Queue(conn_limit)
#self.queue = queue.Queue(thread_limit)
#self.quit_queue = queue.Queue(1)
self.conn_pool = multiprocessing.Queue(conn_limit)
self.queue = multiprocessing.Queue(thread_limit)
self.quit_queue = multiprocessing.Queue(1)
#self.lock = threading.Lock()
self.lock = multiprocessing.Lock()
for i in range(thread_limit):
#w = threading.Thread(target=self.worker)
w = multiprocessing.Process(target=self.worker)
self.workers.append(w)
for i in range(conn_limit):
self.conn_pool.put(conn_factory())
def terminate(self):
self.quit_queue.put(())
super(ThreadedHistorySyncer, self).terminate()
def worker(self):
while True:
block_number = None
try:
block_number = self.queue.get(timeout=0.01)
except queue.Empty:
if self.quit_queue.qsize() > 0:
#logg.debug('{} received quit'.format(threading.current_thread().getName()))
logg.debug('{} received quit'.format(multiprocessing.current_process().name))
return
continue
conn = self.conn_pool.get()
try:
logg.debug('processing parent {} {}'.format(conn, block_number))
self.process_parent(conn, block_number)
except IndexError:
pass
except RPCException as e:
logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
self.queue.put(block_number)
conn = self.conn_pool.put(conn)
def process_parent(self, conn, block_number):
logg.debug('getting block {}'.format(block_number))
o = self.chain_interface.block_by_number(block_number)
r = conn.do(o)
block = self.chain_interface.block_from_src(r)
logg.debug('got block typ {}'.format(type(block)))
super(ThreadedHistorySyncer, self).process(conn, block)
def process_single(self, conn, block, tx):
self.filter.apply(conn, block, tx)
def process(self, conn, block):
pass
#def process(self, conn, block):
def get(self, conn):
if not self.running:
raise SyncDone()
block_number = None
tx_index = None
flags = None
((block_number, tx_index), flags) = self.backend.get()
try:
#logg.debug('putting {}'.format(block.number))
#self.queue.put((conn, block_number,), timeout=0.1)
self.queue.put(block_number, timeout=0.1)
except queue.Full:
#logg.debug('queue full, try again')
return
target, flags = self.backend.target()
next_block = block_number + 1
if next_block > target:
self.quit_queue.put(())
raise SyncDone()
self.backend.set(self.backend.block_height + 1, 0)
# def get(self, conn):
# try:
# r = super(ThreadedHistorySyncer, self).get(conn)
# return r
# except SyncDone as e:
# self.quit_queue.put(())
# raise e
def loop(self, interval, conn):
for w in self.workers:
w.start()
r = super(ThreadedHistorySyncer, self).loop(interval, conn)
for w in self.workers:
w.join()
while True:
try:
self.quit_queue.get_nowait()
except queue.Empty:
break
logg.info('workers done {}'.format(r))

View File

@ -1,170 +0,0 @@
# standard imports
import logging
#import threading
import multiprocessing
import queue
import time
# external imports
from chainlib.error import RPCException
# local imports
from .history import HistorySyncer
from chainsyncer.error import SyncDone
logg = logging.getLogger(__name__)
def foobarcb(v):
logg.debug('foooz {}'.format(v))
class ThreadPoolTask:
process_func = None
chain_interface = None
def poolworker(self, block_number, conn):
# conn = args[1].get()
try:
logg.debug('processing parent {} {}'.format(conn, block_number))
#self.process_parent(self.conn, block_number)
self.process_parent(conn, block_number)
except IndexError:
pass
except RPCException as e:
logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
raise e
#self.queue.put(block_number)
# conn = self.conn_pool.put(conn)
def process_parent(self, conn, block_number):
logg.debug('getting block {}'.format(block_number))
o = self.chain_interface.block_by_number(block_number)
r = conn.do(o)
block = self.chain_interface.block_from_src(r)
logg.debug('got block typ {}'.format(type(block)))
#super(ThreadedHistorySyncer, self).process(conn, block)
self.process_func(conn, block)
class ThreadPoolHistorySyncer(HistorySyncer):
def __init__(self, conn_factory, thread_limit, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, conn_limit=0):
super(ThreadPoolHistorySyncer, self).__init__(backend, chain_interface, pre_callback, block_callback, post_callback)
self.workers = []
self.thread_limit = thread_limit
if conn_limit == 0:
self.conn_limit = self.thread_limit
#self.conn_pool = queue.Queue(conn_limit)
#self.queue = queue.Queue(thread_limit)
#self.quit_queue = queue.Queue(1)
#self.conn_pool = multiprocessing.Queue(conn_limit)
#self.queue = multiprocessing.Queue(thread_limit)
#self.quit_queue = multiprocessing.Queue(1)
#self.lock = threading.Lock()
#self.lock = multiprocessing.Lock()
ThreadPoolTask.process_func = super(ThreadPoolHistorySyncer, self).process
ThreadPoolTask.chain_interface = chain_interface
#for i in range(thread_limit):
#w = threading.Thread(target=self.worker)
# w = multiprocessing.Process(target=self.worker)
# self.workers.append(w)
#for i in range(conn_limit):
# self.conn_pool.put(conn_factory())
self.conn_factory = conn_factory
self.worker_pool = None
def terminate(self):
#self.quit_queue.put(())
super(ThreadPoolHistorySyncer, self).terminate()
# def worker(self):
# while True:
# block_number = None
# try:
# block_number = self.queue.get(timeout=0.01)
# except queue.Empty:
# if self.quit_queue.qsize() > 0:
# #logg.debug('{} received quit'.format(threading.current_thread().getName()))
# logg.debug('{} received quit'.format(multiprocessing.current_process().name))
# return
# continue
# conn = self.conn_pool.get()
# try:
# logg.debug('processing parent {} {}'.format(conn, block_number))
# self.process_parent(conn, block_number)
# except IndexError:
# pass
# except RPCException as e:
# logg.error('RPC failure for block {}, resubmitting to queue: {}'.format(block, e))
# self.queue.put(block_number)
# conn = self.conn_pool.put(conn)
#
def process_single(self, conn, block, tx):
self.filter.apply(conn, block, tx)
def process(self, conn, block):
pass
def get(self, conn):
if not self.running:
raise SyncDone()
block_number = None
tx_index = None
flags = None
((block_number, tx_index), flags) = self.backend.get()
#try:
#logg.debug('putting {}'.format(block.number))
#self.queue.put((conn, block_number,), timeout=0.1)
#self.queue.put(block_number, timeout=0.1)
#except queue.Full:
#logg.debug('queue full, try again')
# return
task = ThreadPoolTask()
conn = self.conn_factory()
self.worker_pool.apply_async(task.poolworker, (block_number, conn,), {}, foobarcb)
target, flags = self.backend.target()
next_block = block_number + 1
if next_block > target:
#self.quit_queue.put(())
self.worker_pool.close()
raise SyncDone()
self.backend.set(self.backend.block_height + 1, 0)
# def get(self, conn):
# try:
# r = super(ThreadedHistorySyncer, self).get(conn)
# return r
# except SyncDone as e:
# self.quit_queue.put(())
# raise e
def loop(self, interval, conn):
self.worker_pool = multiprocessing.Pool(self.thread_limit)
#for w in self.workers:
# w.start()
r = super(ThreadPoolHistorySyncer, self).loop(interval, conn)
#for w in self.workers:
# w.join()
#while True:
# try:
# self.quit_queue.get_nowait()
# except queue.Empty:
# break
time.sleep(1)
self.worker_pool.join()
logg.info('workers done {}'.format(r))

View File

@ -1,81 +0,0 @@
# standard imports
import copy
import logging
import multiprocessing
import os
# external iports
from chainlib.eth.connection import RPCConnection
# local imports
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver.base import Syncer
from .threadpool import ThreadPoolTask
logg = logging.getLogger(__name__)
def sync_split(block_offset, block_target, count):
block_count = block_target - block_offset
if block_count < count:
logg.warning('block count is less than thread count, adjusting thread count to {}'.format(block_count))
count = block_count
blocks_per_thread = int(block_count / count)
ranges = []
for i in range(count):
block_target = block_offset + blocks_per_thread
offset = block_offset
target = block_target -1
ranges.append((offset, target,))
block_offset = block_target
return ranges
class ThreadPoolRangeTask:
def __init__(self, backend, sync_range, chain_interface, syncer_factory=HistorySyncer, filters=[]):
backend_start = backend.start()
backend_target = backend.target()
backend_class = backend.__class__
tx_offset = 0
flags = 0
if sync_range[0] == backend_start[0][0]:
tx_offset = backend_start[0][1]
flags = backend_start[1]
self.backend = backend_class.custom(backend.chain_spec, sync_range[1], block_offset=sync_range[0], tx_offset=tx_offset, flags=flags, flags_count=0)
self.syncer = syncer_factory(self.backend, chain_interface)
for fltr in filters:
self.syncer.add_filter(fltr)
def start_loop(self, interval):
conn = RPCConnection.connect(self.backend.chain_spec)
return self.syncer.loop(interval, conn)
class ThreadPoolRangeHistorySyncer:
def __init__(self, thread_count, backend, chain_interface, pre_callback=None, block_callback=None, post_callback=None, runlevel_callback=None):
self.src_backend = backend
self.thread_count = thread_count
self.single_sync_offset = 0
self.runlevel_callback = None
backend_start = backend.start()
backend_target = backend.target()
self.ranges = sync_split(backend_start[0][0], backend_target[0], thread_count)
self.chain_interface = chain_interface
self.filters = []
def add_filter(self, f):
self.filters.append(f)
def loop(self, interval, conn):
self.worker_pool = multiprocessing.Pool(processes=self.thread_count)
for sync_range in self.ranges:
task = ThreadPoolRangeTask(self.src_backend, sync_range, self.chain_interface, filters=self.filters)
t = self.worker_pool.apply_async(task.start_loop, (0.1,))
logg.debug('result of worker {}: {}'.format(t, t.get()))
self.worker_pool.close()
self.worker_pool.join()

View File

@ -3,6 +3,7 @@ class SyncDone(Exception):
"""
pass
class NoBlockForYou(Exception):
"""Exception raised when attempt to retrieve a block from network that does not (yet) exist.
"""
@ -27,6 +28,20 @@ class LockError(Exception):
pass
class FilterDone(Exception):
"""Exception raised when all registered filters have been executed
"""
class InterruptError(FilterDone):
"""Exception for interrupting or attempting to use an interrupted sync
"""
class IncompleteFilterError(Exception):
"""Exception raised if filter reset is executed prematurely
"""
#class AbortTx(Exception):
# """
# """

View File

@ -1,96 +1,131 @@
# standard imports
import hashlib
import logging
# local imports
from .error import BackendError
import re
import os
logg = logging.getLogger(__name__)
re_processedname = r'^_?[A-Z,\.]*$'
class SyncFilter:
"""Manages the collection of filters on behalf of a specific backend.
A filter is a pluggable piece of code to execute for every transaction retrieved by the syncer. Filters are executed in the sequence they were added to the instance.
:param backend: Syncer backend to apply filter state changes to
:type backend: chainsyncer.backend.base.Backend implementation
"""
def __init__(self, backend):
self.filters = []
self.backend = backend
def sum(self):
s = self.common_name()
h = hashlib.sha256()
h.update(s.encode('utf-8'))
return h.digest()
def add(self, fltr):
"""Add a filter instance.
:param fltr: Filter instance.
:type fltr: Object instance implementing signature as in chainsyncer.filter.NoopFilter.filter
:raises ValueError: Object instance is incorrect implementation
"""
if getattr(fltr, 'filter') == None:
raise ValueError('filter object must implement have method filter')
logg.debug('added filter "{}"'.format(str(fltr)))
self.filters.append(fltr)
def filter(self, conn, block, tx):
raise NotImplementedError()
def __apply_one(self, fltr, idx, conn, block, tx, session):
self.backend.begin_filter(idx)
fltr.filter(conn, block, tx, session)
self.backend.complete_filter(idx)
def common_name(self):
s = self.__module__ + '.' + self.__class__.__name__
return s.replace('.', '_')
def apply(self, conn, block, tx):
"""Apply all registered filters on the given transaction.
# TODO: properly clarify interface shared with syncfsstore, move to filter module?
class FilterState:
:param conn: RPC Connection, will be passed to the filter method
:type conn: chainlib.connection.RPCConnection
:param block: Block object
:type block: chainlib.block.Block
:param tx: Transaction object
:type tx: chainlib.tx.Tx
:raises BackendError: Backend connection failed
"""
session = None
try:
session = self.backend.connect()
except TimeoutError as e:
self.backend.disconnect()
raise BackendError('database connection fail: {}'.format(e))
i = 0
(pair, flags) = self.backend.get()
for f in self.filters:
if not self.backend.check_filter(i, flags):
logg.debug('applying filter {} {}'.format(str(f), flags))
self.__apply_one(f, i, conn, block, tx, session)
else:
logg.debug('skipping previously applied filter {} {}'.format(str(f), flags))
i += 1
def __init__(self, state_store, scan=None):
self.state_store = state_store
self.digest = b'\x00' * 32
self.summed = False
self.__syncs = {}
self.synced = False
self.connected = False
self.state_store.add('DONE')
self.state_store.add('LOCK')
self.state_store.add('INTERRUPT')
self.state_store.add('RESET')
self.backend.disconnect()
self.state = self.state_store.state
self.elements = self.state_store.elements
self.put = self.state_store.put
self.mask = self.state_store.mask
self.name = self.state_store.name
self.set = self.state_store.set
self.next = self.state_store.next
self.move = self.state_store.move
self.unset = self.state_store.unset
self.peek = self.state_store.peek
self.from_name = self.state_store.from_name
self.list = self.state_store.list
self.state_store.sync()
self.all = self.state_store.all
self.started = False
self.scan = scan
class NoopFilter:
"""A noop implemenation of a sync filter.
Logs the filter inputs at debug log level.
"""
def filter(self, conn, block, tx, db_session=None):
"""Filter method implementation:
:param conn: RPC Connection, will be passed to the filter method
:type conn: chainlib.connection.RPCConnection
:param block: Block object
:type block: chainlib.block.Block
:param tx: Transaction object
:type tx: chainlib.tx.Tx
:param db_session: Backend session object
:type db_session: varies
"""
logg.debug('noop filter :received\n{} {} {}'.format(block, tx, id(db_session)))
def __verify_sum(self, v):
if not isinstance(v, bytes) and not isinstance(v, bytearray):
raise ValueError('argument must be instance of bytes')
if len(v) != 32:
raise ValueError('argument must be 32 bytes')
def __str__(self):
return 'noopfilter'
def register(self, fltr):
if self.summed:
raise RuntimeError('filter already applied')
z = fltr.sum()
self.__verify_sum(z)
self.digest += z
s = fltr.common_name()
self.state_store.add(s)
n = self.state_store.from_name(s)
logg.debug('add filter {} {} {}'.format(s, n, self))
def sum(self):
h = hashlib.sha256()
h.update(self.digest)
self.digest = h.digest()
self.summed = True
return self.digest
def connect(self):
if not self.synced:
for v in self.state_store.all():
k = self.state_store.from_name(v)
self.state_store.sync(k)
self.__syncs[v] = True
if self.scan != None:
ks = self.scan()
for v in ks: #os.listdir(self.scan_path):
logg.debug('ks {}'.format(v))
k = None
try:
k = self.state_store.from_elements(v)
self.state_store.alias(v, k)
except ValueError:
k = self.state_store.from_name(v)
self.state_store.sync(k)
self.__syncs[v] = True
self.synced = True
self.connected = True
def disconnect(self):
self.connected = False
def start(self, offset=0, target=-1):
self.state_store.start(offset=offset, target=target)
self.started = True
def get(self, k):
return None
def next_item(self):
return None
def filters(self):
return []

1
chainsyncer/paths.py Normal file
View File

@ -0,0 +1 @@

View File

@ -0,0 +1,146 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import sys
import importlib
# external imports
import chainlib.cli
from shep.persist import PersistedState
# local imports
import chainsyncer.cli
from chainsyncer.settings import ChainsyncerSettings
from chainsyncer.store import SyncStore
from chainsyncer.filter import (
FilterState,
SyncFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
valid_fwd = [
'fwd',
'forward',
'next',
'continue',
]
valid_rwd = [
'rwd',
'rewind',
'current',
'back',
'repeat',
'replay',
]
action_is_forward = False
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--state-dir', type=str, dest='state_dir', help='State directory')
argparser.add_positional('action', type=str, help='Action to take on lock. Repeat means re-run the locked filter. Continue means resume execution for next filter.')
sync_flags = chainsyncer.cli.SyncFlag.RANGE | chainsyncer.cli.SyncFlag.HEAD
chainsyncer.cli.process_flags(argparser, sync_flags)
args = argparser.parse_args()
if args.action in valid_fwd:
action_is_forward = True
elif args.action not in valid_rwd:
sys.stderr.write('action argument must be one of {} or {}\n'.format(valid_rwd, valid_fwd))
sys.exit(1)
base_config_dir = chainsyncer.cli.config_dir,
config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
config = chainsyncer.cli.process_config(config, args, sync_flags)
config.add(args.state_dir, '_STATE_DIR', False)
logg.debug('config loaded:\n{}'.format(config))
settings = ChainsyncerSettings()
settings.process_sync_backend(config)
logg.debug('settings:\n{}'.format(str(settings)))
class FilterInNameOnly(SyncFilter):
def __init__(self, k):
self.k = k
def common_name(self):
return self.k
def main():
if settings.get('SYNCER_BACKEND') == 'mem':
raise ValueError('cannot unlock volatile state store')
state_dir = config.get('_STATE_DIR')
if config.get('SYNCER_BACKEND') == 'fs':
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
elif config.get('SYNCER_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
else:
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
store = syncer_store_class(state_dir)
filter_list = store.load_filter_list()
for i, k in enumerate(filter_list):
fltr = FilterInNameOnly(k)
store.register(fltr)
filter_list[i] = k.upper()
store.connect()
store.start(ignore_lock=True)
lock_state = store.filter_state.from_name('LOCK')
locked_item = store.filter_state.list(lock_state)
if len(locked_item) == 0:
sys.stderr.write('Sync filter in {} is not locked\n'.format(state_dir))
sys.exit(1)
elif len(locked_item) > 1:
sys.stderr.write('More than one locked item encountered in {}. That should never happen, so I do not know what to do next.\n'.format(state_dir))
sys.exit(1)
locked_item_key = locked_item[0]
locked_item = store.get(int(locked_item_key))
locked_state = store.filter_state.state(locked_item_key) - lock_state
locked_state_name = store.filter_state.name(locked_state)
logg.info('found item "{}" in locked state {}'.format(locked_item, store.filter_state.name(locked_state)))
if action_is_forward:
k = locked_state_name
filter_index = None
filter_index = filter_list.index(k)
filter_pos = filter_index + 1
filter_count = len(filter_list)
logg.debug('Locked filter {} found at position {} of {}'.format(k, filter_pos, filter_count))
if filter_pos == filter_count:
logg.info('Locked filter {} is the last filter in the list. Executing filter reset'.format(k))
locked_item.reset(check_incomplete=False)
else:
locked_item.advance(ignore_lock=True)
store.filter_state.unset(locked_item_key, lock_state)
else:
filter_mask = 0xf
filter_state = store.filter_state.mask(locked_state, filter_mask)
logg.info('Chosen action is "{}": will continue execution at previous filter {}'.format(args.action, store.filter_state.name(filter_state)))
store.filter_state.unset(locked_item_key, lock_state)
if __name__ == '__main__':
main()

View File

@ -1,23 +1,42 @@
# standard imports
import uuid
import logging
# local imports
from chainsyncer.error import FilterDone
logg = logging.getLogger(__name__)
class SyncSession:
def __init__(self, state_store, session_id=None, is_default=False):
if session_id == None:
session_id = str(uuid.uuid4())
is_default = True
self.session_id = session_id
self.is_default = is_default
self.state_store = state_store
self.filters = []
def __init__(self, session_store):
self.session_store = session_store
self.started = self.session_store.started
self.get = self.session_store.get
self.next = self.session_store.next_item
self.item = None
self.filters = self.session_store.filters
def add_filter(self, fltr):
self.state_store.register(fltr)
self.filters.append(fltr)
def start(self, offset=0, target=-1):
self.session_store.start(offset=offset, target=target)
self.item = self.session_store.next_item()
return self.item
def start(self):
self.state_store.start()
def stop(self, item):
self.session_store.stop(item)
def filter(self, conn, block, tx):
self.session_store.connect()
for fltr in self.filters:
logg.debug('executing filter {}'.format(fltr))
self.item.advance()
interrupt = fltr.filter(conn, block, tx)
if not self.item.release(interrupt=interrupt):
break
self.item.reset()
self.next()
self.session_store.disconnect()

55
chainsyncer/settings.py Normal file
View File

@ -0,0 +1,55 @@
# standard imports
import logging
# external imports
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
from chainlib.settings import ChainSettings
logg = logging.getLogger(__name__)
class ChainsyncerSettings(ChainSettings):
def process_sync_backend(self, config):
self.o['SYNCER_BACKEND'] = config.get('SYNCER_BACKEND')
def process_sync_range(self, config):
o = self.o['SYNCER_INTERFACE'].block_latest()
r = self.o['RPC'].do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
keep_alive = False
session_block_offset = 0
block_limit = 0
until = 0
if config.true('_HEAD'):
self.o['SYNCER_OFFSET'] = block_offset
self.o['SYNCER_LIMIT'] = -1
return
session_block_offset = int(config.get('SYNCER_OFFSET'))
until = int(config.get('SYNCER_LIMIT'))
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
elif until == -1:
keep_alive = True
if session_block_offset == -1:
session_block_offset = block_offset
elif config.true('_KEEP_ALIVE'):
block_limit = -1
else:
if block_limit == 0:
block_limit = block_offset
self.o['SYNCER_OFFSET'] = session_block_offset
self.o['SYNCER_LIMIT'] = block_limit

View File

@ -1 +0,0 @@
from .base import SyncState

View File

@ -1,42 +0,0 @@
# standard imports
import hashlib
class SyncState:
def __init__(self, state_store):
self.store = state_store
self.digest = b'\x00' * 32
self.summed = False
self.synced = {}
def __verify_sum(self, v):
if not isinstance(v, bytes) and not isinstance(v, bytearray):
raise ValueError('argument must be instance of bytes')
if len(v) != 32:
raise ValueError('argument must be 32 bytes')
def register(self, fltr):
if self.summed:
raise RuntimeError('filter already applied')
z = fltr.sum()
self.__verify_sum(z)
self.digest += z
s = fltr.common_name()
self.store.add('i_' + s)
self.store.add('o_' + s)
def sum(self):
h = hashlib.sha256()
h.update(self.digest)
self.digest = h.digest()
self.summed = True
return self.digest
def start(self):
for v in self.store.all():
self.store.sync(v)
self.synced[v] = True

View File

@ -0,0 +1 @@
from .base import *

322
chainsyncer/store/base.py Normal file
View File

@ -0,0 +1,322 @@
# standard imports
import os
import logging
# local imports
from shep.persist import PersistedState
from shep import State
from shep.error import StateInvalid
from chainsyncer.filter import FilterState
from chainsyncer.error import (
LockError,
FilterDone,
InterruptError,
IncompleteFilterError,
SyncDone,
)
logg = logging.getLogger(__name__)
def sync_state_serialize(block_height, tx_index, block_target):
b = block_height.to_bytes(4, 'big')
b += tx_index.to_bytes(4, 'big')
b += block_target.to_bytes(4, 'big', signed=True)
return b
def sync_state_deserialize(b):
block_height = int.from_bytes(b[:4], 'big')
tx_index = int.from_bytes(b[4:8], 'big')
block_target = int.from_bytes(b[8:], 'big', signed=True)
return (block_height, tx_index, block_target,)
# NOT thread safe
class SyncItem:
def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_lock=False):
self.offset = offset
self.target = target
self.sync_state = sync_state
self.filter_state = filter_state
self.state_key = str(offset)
logg.debug('get key {}'.format(self.state_key))
v = self.sync_state.get(self.state_key)
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
filter_state = self.filter_state.state(self.state_key)
if filter_state & self.filter_state.from_name('LOCK') > 0 and not ignore_lock:
raise LockError(self.state_key)
self.count = len(self.filter_state.all(pure=True)) - 4
self.skip_filter = False
if self.count == 0:
self.skip_filter = True
elif not started:
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
def __check_done(self):
if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0:
raise InterruptError(self.state_key)
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') > 0:
raise FilterDone(self.state_key)
def resume(self):
filter_state = self.filter_state.state(self.state_key)
if filter_state > 0x0f:
filter_state_part = self.filter_state.mask(filter_state, 0x0f)
if len(self.filter_state.elements(filter_state)) == 1:
logg.info('resume execution on state {} ({})'.format(self.filter_state.name(filter_state_part), filter_state_part))
lock_state = self.filter_state.from_name('LOCK')
self.filter_state.set(lock_state)
def reset(self, check_incomplete=True):
if check_incomplete:
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
raise LockError('reset attempt on {} when state locked'.format(self.state_key))
if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
def next(self, advance_block=False):
v = self.sync_state.state(self.state_key)
if v == self.sync_state.DONE:
raise SyncDone(self.target)
elif v == self.sync_state.NEW:
self.sync_state.next(self.state_key)
v = self.sync_state.get(self.state_key)
(block_number, tx_index, target) = sync_state_deserialize(v)
if advance_block:
block_number += 1
tx_index = 0
if self.target >= 0 and block_number > self.target:
self.sync_state.move(self.state_key, self.sync_state.DONE)
raise SyncDone(self.target)
else:
tx_index += 1
self.cursor = block_number
self.tx_cursor = tx_index
b = sync_state_serialize(block_number, tx_index, target)
self.sync_state.replace(self.state_key, b)
def __find_advance(self):
v = self.filter_state.state(self.state_key)
def advance(self, ignore_lock=False):
if self.skip_filter:
raise FilterDone()
self.__check_done()
if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
if ignore_lock:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
else:
raise LockError('advance attempt on {} when state locked'.format(self.state_key))
done = False
try:
self.filter_state.next(self.state_key)
except StateInvalid:
done = True
if done:
raise FilterDone()
self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK'))
def release(self, interrupt=False):
if self.skip_filter:
return False
if interrupt == True:
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
return False
state = self.filter_state.state(self.state_key)
if state & self.filter_state.from_name('LOCK') == 0:
raise LockError('release attempt on {} when state unlocked'.format(self.state_key))
self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
try:
c = self.filter_state.peek(self.state_key)
logg.debug('peeked {}'.format(c))
except StateInvalid:
self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
return False
return True
def __str__(self):
return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor)
class SyncStore:
def __init__(self, path, session_id=None):
self.session_id = session_id
self.session_path = None
self.is_default = False
self.first = False
self.target = None
self.items = {}
self.item_keys = []
self.started = False
self.thresholds = []
self.session_path = path
def setup_sync_state(self, factory=None, event_callback=None):
if factory == None:
self.state = State(2, event_callback=event_callback)
else:
self.state = PersistedState(factory.add, 2, event_callback=event_callback)
self.state.add('SYNC')
self.state.add('DONE')
def setup_filter_state(self, factory=None, event_callback=None):
if factory == None:
filter_state_backend = State(0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend)
else:
filter_state_backend = PersistedState(factory.add, 0, check_alias=False, event_callback=event_callback)
self.filter_state = FilterState(filter_state_backend, scan=factory.ls)
self.filters = []
def set_target(self, v):
pass
def get_target(self):
return None
def register(self, fltr):
self.filters.append(fltr)
self.filter_state.register(fltr)
def start(self, offset=0, target=-1, ignore_lock=False):
if self.started:
return
self.save_filter_list()
self.load(target, ignore_lock=ignore_lock)
if self.first:
state_bytes = sync_state_serialize(offset, 0, target)
block_number_str = str(offset)
self.state.put(block_number_str, contents=state_bytes)
self.filter_state.put(block_number_str)
o = SyncItem(offset, target, self.state, self.filter_state, ignore_lock=ignore_lock)
o.resume()
self.items[offset] = o
self.item_keys.append(offset)
elif offset > 0:
logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id))
self.started = True
self.item_keys.sort()
def stop(self, item):
if item.target == -1:
state_bytes = sync_state_serialize(item.cursor, 0, item.cursor)
self.state.replace(str(item.offset), state_bytes)
self.filter_state.put(str(item.cursor))
SyncItem(item.offset, -1, self.state, self.filter_state)
logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor))
self.state.move(item.state_key, self.state.DONE)
state_bytes = sync_state_serialize(item.cursor, 0, -1)
self.state.put(str(item.cursor), contents=state_bytes)
def load(self, target, ignore_lock=False):
self.state.sync(self.state.NEW)
self.state.sync(self.state.SYNC)
thresholds_sync = []
for v in self.state.list(self.state.SYNC):
block_number = int(v)
thresholds_sync.append(block_number)
logg.debug('queue resume {}'.format(block_number))
thresholds_new = []
for v in self.state.list(self.state.NEW):
block_number = int(v)
thresholds_new.append(block_number)
logg.debug('queue new range {}'.format(block_number))
thresholds_sync.sort()
thresholds_new.sort()
thresholds = thresholds_sync + thresholds_new
lim = len(thresholds) - 1
for i in range(len(thresholds)):
item_target = target
if i < lim:
item_target = thresholds[i+1]
o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True, ignore_lock=ignore_lock)
o.resume()
self.items[block_number] = o
self.item_keys.append(block_number)
logg.info('added existing {}'.format(o))
self.get_target()
if len(thresholds) == 0:
if self.target != None:
logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id))
else:
logg.info('syncer first run target {}'.format(target))
self.first = True
self.set_target(target)
def get(self, k):
return self.items[k]
def next_item(self):
try:
k = self.item_keys.pop(0)
except IndexError:
return None
return self.items[k]
def connect(self):
self.filter_state.connect()
def disconnect(self):
self.filter_state.disconnect()
def save_filter_list(self):
raise NotImplementedError()
def load_filter_list(self):
raise NotImplementedError()
def peek_next_filter(self):
pass
def peek_current_filter(self):
pass

98
chainsyncer/store/fs.py Normal file
View File

@ -0,0 +1,98 @@
# standard imports
import uuid
import os
import logging
# external imports
from shep.store.file import SimpleFileStoreFactory
# local imports
from chainsyncer.store import SyncStore
logg = logging.getLogger(__name__)
class SyncFsStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncFsStore, self).__init__(base_path, session_id=session_id)
create_path = False
try:
os.stat(self.session_path)
except FileNotFoundError:
create_path = True
if create_path:
self.__create_path(base_path, self.default_path, session_id=session_id)
self.session_id = os.path.basename(self.session_path)
logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
base_sync_path = os.path.join(self.session_path, 'sync')
factory = SimpleFileStoreFactory(base_sync_path, binary=True)
self.setup_sync_state(factory, state_event_callback)
self.setup_filter_state(callback=filter_state_event_callback)
def setup_filter_state(self, callback=None):
base_filter_path = os.path.join(self.session_path, 'filter')
factory = SimpleFileStoreFactory(base_filter_path, binary=True)
super(SyncFsStore, self).setup_filter_state(factory, callback)
def __create_path(self, base_path, default_path, session_id=None):
logg.debug('fs store path {} does not exist, creating'.format(self.session_path))
if session_id == None:
session_id = str(uuid.uuid4())
self.session_path = os.path.join(base_path, session_id)
os.makedirs(self.session_path)
if self.is_default:
try:
os.symlink(self.session_path, default_path)
except FileExistsError:
pass
def get_target(self):
fp = os.path.join(self.session_path, 'target')
try:
f = open(fp, 'r')
v = f.read()
f.close()
self.target = int(v)
except FileNotFoundError as e:
logg.debug('cant find target {} {}'.format(fp, e))
pass
def set_target(self, v):
fp = os.path.join(self.session_path, 'target')
f = open(fp, 'w')
f.write(str(v))
f.close()
self.target = v
def load_filter_list(self):
fltr = []
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'r')
while True:
v = f.readline()
if len(v) == 0:
break
v = v.rstrip()
fltr.append(v)
f.close()
return fltr
def save_filter_list(self):
fp = os.path.join(self.session_path, 'filter_list')
f = open(fp, 'w')
for fltr in self.filters:
f.write(fltr.common_name() + '\n')
f.close()

45
chainsyncer/store/mem.py Normal file
View File

@ -0,0 +1,45 @@
# standard imports
import logging
import os
# external imports
from shep import State
# local imports
from chainsyncer.store import SyncStore
logg = logging.getLogger(__name__)
class SyncMemStore(SyncStore):
def __init__(self, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncMemStore, self).__init__(None, session_id=session_id)
factory = None
self.setup_sync_state(factory, state_event_callback)
factory = None
self.setup_filter_state(factory, filter_state_event_callback)
def set_target(self, v):
self.target = int(v)
def get_target(self):
return self.target
def stop(self, item):
if item != None:
super(SyncMemStore, self).stop(item)
logg.info('I am an in-memory only state store. I am shutting down now, so all state will now be discarded.')
def save_filter_list(self):
pass
def load_filter_list(self):
return []

View File

@ -0,0 +1,79 @@
# standard imports
import uuid
import os
import logging
# external imports
from shep.store.rocksdb import RocksDbStoreFactory
# local imports
from chainsyncer.store import (
SyncItem,
SyncStore,
)
logg = logging.getLogger(__name__)
class RocksDbStoreAdder:
def __init__(self, factory, prefix):
self.factory = factory
self.prefix = prefix
def add(self, k):
path = os.path.join(self.prefix, k)
return self.factory.add(path)
def ls(self):
return self.factory.ls()
class SyncRocksDbStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncRocksDbStore, self).__init__(base_path, session_id=session_id)
self.factory = RocksDbStoreFactory(self.session_path, binary=True)
prefix_factory = RocksDbStoreAdder(self.factory, 'sync')
self.setup_sync_state(prefix_factory, state_event_callback)
prefix_factory = RocksDbStoreAdder(self.factory, 'filter')
self.setup_filter_state(prefix_factory, filter_state_event_callback)
#self.session_id = os.path.basename(self.session_path)
#logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target')
def get_target(self):
v = self.target_db.get('target')
if v != None:
self.target = int(v)
def set_target(self, v):
self.target_db.put('target', str(v))
self.target = v
def stop(self, item):
if item != None:
super(SyncRocksDbStore, self).stop(item)
self.factory.close()
def save_filter_list(self):
fltr = []
for v in self.filters:
fltr.append(v.common_name())
self.target_db.put('filter_list', ','.join(fltr))
def load_filter_list(self):
v = self.target_db.get('filter_list')
v = v.decode('utf-8')
return v.split(',')

View File

@ -0,0 +1 @@
from .base import *

View File

@ -1,17 +1,73 @@
# standard imports
import os
import logging
import hashlib
# external imports
from hexathon import add_0x
from shep.state import State
# local imports
from chainsyncer.driver.history import HistorySyncer
#from chainsyncer.driver.history import HistorySyncer
from chainsyncer.error import NoBlockForYou
from chainsyncer.driver import SyncDriver
logging.STATETRACE = 5
logg = logging.getLogger().getChild(__name__)
def state_event_handler(k, v_old, v_new):
if v_old == None:
logg.log(logging.STATETRACE, 'sync state create key {}: -> {}'.format(k, v_new))
else:
logg.log(logging.STATETRACE, 'sync state change key {}: {} -> {}'.format(k, v_old, v_new))
def filter_state_event_handler(k, v_old, v_new):
if v_old == None:
logg.log(logging.STATETRACE, 'filter state create key {}: -> {}'.format(k, v_new))
else:
logg.log(logging.STATETRACE, 'filter state change key {}: {} -> {}'.format(k, v_old, v_new))
class MockFilterError(Exception):
pass
class MockBlockGenerator:
def __init__(self, offset=0):
self.blocks = {}
self.offset = offset
self.cursor = offset
def generate(self, spec=[], driver=None):
for v in spec:
txs = []
for i in range(v):
tx_hash = os.urandom(32).hex()
tx = MockTx(0, tx_hash)
txs.append(tx)
block = MockBlock(self.cursor, txs)
self.blocks[self.cursor] = block
self.cursor += 1
if driver != None:
self.apply(driver)
def apply(self, driver, offset=0):
block_numbers = list(self.blocks.keys())
for block_number in block_numbers:
if block_number < offset:
continue
block = self.blocks[block_number]
driver.add_block(block)
class MockConn:
"""Noop connection mocker.
@ -56,6 +112,7 @@ class MockBlock:
"""
self.number = number
self.txs = txs
self.hash = os.urandom(32).hex()
def tx(self, i):
@ -64,45 +121,158 @@ class MockBlock:
:param i: Transaction index
:type i: int
"""
return MockTx(i, self.txs[i])
return MockTx(i, self.txs[i].hash)
class TestSyncer(HistorySyncer):
"""Unittest extension of history syncer driver.
class MockStore(State):
:param backend: Syncer backend
:type backend: chainsyncer.backend.base.Backend implementation
:param chain_interface: Chain interface
:type chain_interface: chainlib.interface.ChainInterface implementation
:param tx_counts: List of integer values defining how many mock transactions to generate per block. Mock blocks will be generated for each element in list.
:type tx_counts: list
"""
def __init__(self, backend, chain_interface, tx_counts=[]):
self.tx_counts = tx_counts
super(TestSyncer, self).__init__(backend, chain_interface)
def __init__(self, bits=0):
super(MockStore, self).__init__(bits, check_alias=False)
def get(self, conn):
"""Implements the block getter of chainsyncer.driver.base.Syncer.
def start(self, offset=0, target=-1):
pass
:param conn: RPC connection
:type conn: chainlib.connection.RPCConnection
:raises NoBlockForYou: End of mocked block array reached
:rtype: chainsyncer.unittest.base.MockBlock
:returns: Mock block.
"""
(pair, fltr) = self.backend.get()
(target_block, fltr) = self.backend.target()
block_height = pair[0]
if block_height == target_block:
self.running = False
class MockFilter:
def __init__(self, name, brk=None, brk_hard=None, z=None):
self.name = name
if z == None:
h = hashlib.sha256()
h.update(self.name.encode('utf-8'))
z = h.digest()
self.z = z
self.brk = brk
self.brk_hard = brk_hard
self.contents = []
def sum(self):
return self.z
def common_name(self):
return self.name
def filter(self, conn, block, tx):
r = False
if self.brk_hard != None:
r = True
if self.brk_hard > 0:
r = True
self.brk_hard -= 1
if r:
raise MockFilterError()
if self.brk != None:
if self.brk > 0:
r = True
self.brk -= 1
self.contents.append((block.number, tx.index, tx.hash,))
logg.debug('filter {} result {} block {} tx {} {}'.format(self.common_name(), r, block.number, tx.index, tx.hash))
return r
class MockDriver(SyncDriver):
def __init__(self, store, offset=0, target=-1, interrupt_block=None, interrupt_tx=None, interrupt_global=False):
super(MockDriver, self).__init__(store, offset=offset, target=target)
self.blocks = {}
self.interrupt = None
if interrupt_block != None:
interrupt_block = int(interrupt_block)
if interrupt_tx == None:
interrupt_tx = 0
else:
interrupt_tx = int(interrupt_tx)
self.interrupt = (interrupt_block, interrupt_tx,)
self.interrupt_global = interrupt_global
def add_block(self, block):
logg.debug('add block {} {} with {} txs'.format(block.number, block.hash, len(block.txs)))
self.blocks[block.number] = block
def get(self, conn, item):
try:
return self.blocks[item.cursor]
except KeyError:
raise NoBlockForYou()
block_txs = []
if block_height < len(self.tx_counts):
for i in range(self.tx_counts[block_height]):
block_txs.append(add_0x(os.urandom(32).hex()))
return MockBlock(block_height, block_txs)
def process(self, conn, item, block):
i = item.tx_cursor
while self.running:
if self.interrupt != None:
if self.interrupt[0] == block.number and self.interrupt[1] == i:
logg.info('interrupt triggered at {}'.format(self.interrupt))
if self.interrupt_global:
SyncDriver.running_global = False
self.running = False
break
tx = block.tx(i)
self.process_single(conn, block, tx)
item.next()
i += 1
class MockChainInterface:
def block_by_number(self, number):
return ('block_by_number', number,)
def tx_by_hash(self, hsh):
return ('tx_by_hash', hsh,)
def block_from_src(self, src):
return src
def src_normalize(self, src):
return src
def tx_receipt(self, hsh):
return ('receipt', hsh,)
class MockChainInterfaceConn(MockConn):
def __init__(self, interface):
self.ifc = interface
self.blocks = {}
self.txs = {}
def add_block(self, block):
logg.debug('add block {} {} with {} txs'.format(block.number, block.hash, len(block.txs)))
self.blocks[block.number] = block
for tx in block.txs:
self.txs[tx.hash] = tx
def do(self, o):
m = getattr(self, 'handle_' + o[0])
return m(o[1])
def handle_block_by_number(self, number):
return self.blocks[number]
def handle_receipt(self, hsh):
return {}
class MockItem:
def __init__(self, target, offset, cursor, state_key):
self.target = target
self.offset = offset
self.cursor = cursor
self.state_key = state_key

View File

@ -1,64 +0,0 @@
# standard imports
import logging
import os
# external imports
import alembic
import alembic.config
# local imports
from chainsyncer.db.models.base import SessionBase
from chainsyncer.db import dsn_from_config
from chainsyncer.db.models.base import SessionBase
logg = logging.getLogger(__name__)
class ChainSyncerDb:
"""SQLITE database setup for unit tests
:param debug: Activate sql level debug (outputs sql statements)
:type debug: bool
"""
base = SessionBase
def __init__(self, debug=False):
config = {
'DATABASE_ENGINE': 'sqlite',
'DATABASE_DRIVER': 'pysqlite',
'DATABASE_NAME': 'chainsyncer.sqlite',
}
logg.debug('config {}'.format(config))
self.dsn = dsn_from_config(config)
self.base.poolable = False
self.base.transactional = False
self.base.procedural = False
self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0
rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..')
dbdir = os.path.join(rootdir, 'chainsyncer', 'db')
#migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE'))
migrationsdir = os.path.join(dbdir, 'migrations', 'default')
logg.info('using migrations directory {}'.format(migrationsdir))
ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', self.dsn)
ac.set_main_option('script_location', migrationsdir)
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
def bind_session(self, session=None):
"""Create session using underlying session base
"""
return self.base.bind_session(session)
def release_session(self, session=None):
"""Release session using underlying session base
"""
return self.base.release_session(session)

View File

@ -0,0 +1,301 @@
# standard imports
import os
import stat
import unittest
import shutil
import tempfile
import logging
import uuid
# local imports
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import (
MockFilter,
MockItem,
)
logging.STATETRACE = 5
logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
def state_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'state change: {} {} -> {}'.format(k, old_state, new_state))
def filter_change_callback(k, old_state, new_state):
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
class TestStoreBase(unittest.TestCase):
def setUp(self):
self.base_path = tempfile.mkdtemp()
self.session_id = str(uuid.uuid4())
self.path = os.path.join(self.base_path, self.session_id)
os.makedirs(self.path)
self.store_factory = None
self.persist = True
@classmethod
def link(cls, target):
for v in [
"default",
"store_start",
"store_resume",
"filter_list",
"sync_process_nofilter",
"sync_process_onefilter",
"sync_process_outoforder",
"sync_process_interrupt",
"sync_process_reset",
"sync_process_done",
"sync_head_future",
"sync_history_interrupted",
"sync_history_complete",
]:
setattr(target, 'test_' + v, getattr(cls, 't_' + v))
def tearDown(self):
shutil.rmtree(self.path)
def t_default(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
#fp = os.path.join(self.path, store.session_id)
fp = self.path
session_id = store.session_id
st = None
st = os.stat(fp)
if st != None:
self.assertTrue(stat.S_ISDIR(st.st_mode))
#self.assertTrue(store.is_default)
store.stop(bogus_item)
store = self.store_factory()
fpr = os.path.join(self.path, self.session_id)
self.assertEqual(fp, self.path)
def t_store_start(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
store.start(42)
self.assertTrue(store.first)
store.stop(bogus_item)
if self.persist:
store = self.store_factory()
store.start()
self.assertFalse(store.first)
def t_store_resume(self):
store = self.store_factory()
store.start(13)
self.assertTrue(store.first)
# todo not done
def t_sync_process_nofilter(self):
store = self.store_factory()
session = SyncSession(store)
session.start()
o = session.get(0)
with self.assertRaises(FilterDone):
o.advance()
def t_sync_process_onefilter(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start()
o = session.get(0)
o.advance()
o.release()
def t_sync_process_outoforder(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('two')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.advance()
o.release()
with self.assertRaises(LockError):
o.release()
o.advance()
o.release()
def t_sync_process_interrupt(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
o.release(interrupt=True)
with self.assertRaises(FilterDone):
o.advance()
def t_sync_process_reset(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
session.start()
o = session.get(0)
o.advance()
with self.assertRaises(LockError):
o.reset()
o.release()
with self.assertRaises(IncompleteFilterError):
o.reset()
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
def t_sync_process_done(self):
store = self.store_factory()
session = SyncSession(store)
fltr_one = MockFilter('foo')
store.register(fltr_one)
session.start(target=0)
o = session.get(0)
o.advance()
o.release()
with self.assertRaises(FilterDone):
o.advance()
o.reset()
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def t_sync_head_future(self):
store = self.store_factory('foo')
session = SyncSession(store)
session.start()
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
if self.persist:
store = self.store_factory('foo')
store.start()
o = store.get(2)
def t_sync_history_interrupted(self):
if not self.persist:
return
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory('foo')
session = SyncSession(store)
session.start(target=13)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
session.stop(o)
store.stop(bogus_item)
store = self.store_factory('foo')
store.start()
o = store.get(0)
self.assertEqual(o.cursor, 2)
self.assertEqual(o.target, 13)
o.next(advance_block=True)
o.next(advance_block=True)
store.stop(bogus_item)
store = self.store_factory('foo')
store.start()
self.assertEqual(o.cursor, 4)
self.assertEqual(o.target, 13)
def t_sync_history_complete(self):
store = self.store_factory('foo')
session = SyncSession(store)
session.start(target=3)
o = session.get(0)
o.next(advance_block=True)
o.next(advance_block=True)
o.next(advance_block=True)
with self.assertRaises(SyncDone):
o.next(advance_block=True)
def t_filter_list(self):
bogus_item = MockItem(0, 0, 0, 0)
store = self.store_factory()
if store.session_path == None:
return
fltr_one = MockFilter('foo_bar')
store.register(fltr_one)
fltr_two = MockFilter('bar_baz')
store.register(fltr_two)
store.start()
store.stop(bogus_item)
store = self.store_factory()
r = store.load_filter_list()
self.assertEqual(r[0], 'foo_bar')
self.assertEqual(r[1], 'bar_baz')

View File

@ -1,5 +1,5 @@
confini~=0.6.0
semver==2.13.0
hexathon~=0.1.5
chainlib>=0.1.0b1,<=0.1.0
shep~=0.0.1
chainlib~=0.1.1
shep~=0.2.3

View File

@ -8,5 +8,12 @@ for f in `ls tests/*.py`; do
exit
fi
done
for f in `ls tests/store/*.py`; do
python $f
if [ $? -gt 0 ]; then
exit
fi
done
set +x
set +e

View File

@ -1,10 +1,10 @@
[metadata]
name = chainsyncer
version = 0.3.0
version = 0.4.1
description = Generic blockchain syncer driver
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/chaintools/chainsyncer
url = https://gitlab.com/chaintool/chainsyncer
keywords =
cryptocurrency
classifiers =
@ -25,16 +25,17 @@ include_package_data = True
python_requires = >= 3.6
packages =
chainsyncer
chainsyncer.db
chainsyncer.db.models
chainsyncer.backend
chainsyncer.driver
chainsyncer.unittest
chainsyncer.store
chainsyncer.cli
chainsyncer.runnable
[options.package_data]
* =
sql/*
#[options.package_data]
#* =
# sql/*
#[options.entry_points]
#console_scripts =
# blocksync-celery = chainsyncer.runnable.tracker:main
[options.entry_points]
console_scripts =
#blocksync-celery = chainsyncer.runnable.tracker:main
chainsyncer-unlock = chainsyncer.runnable.unlock:main

View File

@ -26,5 +26,7 @@ setup(
install_requires=requirements,
extras_require={
'sql': sql_requirements,
'rocksdb': ['shep[rocksdb]~=0.2.2'],
'redis': ['shep[redis]~=0.2.2'],
}
)

33
tests/store/test_0_mem.py Normal file
View File

@ -0,0 +1,33 @@
# standard imports
import unittest
import logging
# external imports
from shep import State
# local imports
from chainsyncer.store.mem import SyncMemStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def create(self, session_id=None):
return SyncMemStore(session_id=session_id)
class TestMem(TestStoreBase):
def setUp(self):
super(TestMem, self).setUp()
self.store_factory = StoreFactory().create
self.persist = False
if __name__ == '__main__':
TestStoreBase.link(TestMem)
# Remove tests that test persistence of state
unittest.main()

32
tests/store/test_1_fs.py Normal file
View File

@ -0,0 +1,32 @@
# standard imports
import unittest
import logging
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def __init__(self, path):
self.path = path
def create(self, session_id=None):
return SyncFsStore(self.path, session_id=session_id)
class TestFs(TestStoreBase):
def setUp(self):
super(TestFs, self).setUp()
self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
TestStoreBase.link(TestFs)
unittest.main()

View File

@ -0,0 +1,35 @@
# standard imports
import unittest
import logging
# local imports
from chainsyncer.store.rocksdb import SyncRocksDbStore
from chainsyncer.unittest.store import (
TestStoreBase,
filter_change_callback,
state_change_callback,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class StoreFactory:
def __init__(self, path):
self.path = path
def create(self, session_id=None):
return SyncRocksDbStore(self.path, session_id=session_id, state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
class TestRocksDb(TestStoreBase):
def setUp(self):
super(TestRocksDb, self).setUp()
self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
TestStoreBase.link(TestRocksDb)
unittest.main()

View File

@ -1,72 +1,75 @@
# standard imporst
# standard imports
import unittest
# external imports
from shep import State
import tempfile
import shutil
import logging
# local imports
from chainsyncer.state import SyncState
from chainsyncer.session import SyncSession
from chainsyncer.filter import FilterState
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.unittest import (
MockStore,
MockFilter,
)
class MockStore(State):
def __init__(self, bits):
super(MockStore, self).__init__(bits, check_alias=False)
class MockFilter:
def __init__(self, z, name):
self.z = z
self.name = name
def sum(self):
return self.z
def common_name(self):
return self.name
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestSync(unittest.TestCase):
def setUp(self):
self.store = MockStore(6)
self.state = SyncState(self.store)
self.path = tempfile.mkdtemp()
self.store = SyncFsStore(self.path)
def tearDown(self):
shutil.rmtree(self.path)
def test_basic(self):
session = SyncSession(self.state)
self.assertTrue(session.is_default)
session = SyncSession(self.state, session_id='foo')
self.assertFalse(session.is_default)
store = MockStore(6)
state = FilterState(store)
session = SyncSession(state)
def test_sum(self):
store = MockStore(6)
state = FilterState(store)
b = b'\x2a' * 32
fltr = MockFilter(b, name='foo')
self.state.register(fltr)
fltr = MockFilter('foo', z=b)
state.register(fltr)
b = b'\x0d' * 31
fltr = MockFilter(b, name='bar')
fltr = MockFilter('bar', z=b)
with self.assertRaises(ValueError):
self.state.register(fltr)
state.register(fltr)
b = b'\x0d' * 32
fltr = MockFilter(b, name='bar')
self.state.register(fltr)
fltr = MockFilter('bar', z=b)
state.register(fltr)
v = self.state.sum()
v = state.sum()
self.assertEqual(v.hex(), 'a24abf9fec112b4e0210ae874b4a371f8657b1ee0d923ad6d974aef90bad8550')
def test_session_start(self):
session = SyncSession(self.state)
store = MockStore(6)
state = FilterState(store)
session = SyncSession(state)
session.start()
def test_state_dynamic(self):
store = MockStore()
state = FilterState(store)
b = b'\x0d' * 32
fltr = MockFilter(name='foo', z=b)
state.register(fltr)
if __name__ == '__main__':
unittest.main()

61
tests/test_driver.py Normal file
View File

@ -0,0 +1,61 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import stat
import os
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import (
MockBlockGenerator,
MockFilter,
MockChainInterfaceConn,
MockTx,
MockBlock,
MockChainInterface,
MockFilterError,
)
from chainsyncer.driver.chain_interface import ChainInterfaceDriver
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestFilter(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
self.store = SyncFsStore(self.path)
self.ifc = MockChainInterface()
self.conn = MockChainInterfaceConn(self.ifc)
def tearDown(self):
shutil.rmtree(self.path)
def test_driver(self):
generator = MockBlockGenerator()
generator.generate([1, 2], driver=self.conn)
drv = ChainInterfaceDriver(self.store, self.ifc, target=1)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
with self.assertRaises(SyncDone):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 3)
if __name__ == '__main__':
unittest.main()

78
tests/test_filter.py Normal file
View File

@ -0,0 +1,78 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import stat
import os
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
)
from chainsyncer.unittest import (
MockFilter,
MockConn,
MockTx,
MockBlock,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestFilter(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
self.store = SyncFsStore(self.path)
self.session = SyncSession(self.store)
self.conn = MockConn()
def tearDown(self):
shutil.rmtree(self.path)
def test_filter_basic(self):
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
fltr_two = MockFilter('bar')
self.store.register(fltr_two)
self.session.start()
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(13, [tx_hash])
self.session.filter(self.conn, block, tx)
self.assertEqual(len(fltr_one.contents), 1)
self.assertEqual(len(fltr_two.contents), 1)
def test_filter_interrupt(self):
fltr_one = MockFilter('foo', brk=True)
self.store.register(fltr_one)
fltr_two = MockFilter('bar')
self.store.register(fltr_two)
self.session.start()
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(13, [tx_hash])
self.session.filter(self.conn, block, tx)
self.assertEqual(len(fltr_one.contents), 1)
self.assertEqual(len(fltr_two.contents), 0)
if __name__ == '__main__':
unittest.main()

155
tests/test_session.py Normal file
View File

@ -0,0 +1,155 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import stat
import os
# local imports
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.session import SyncSession
from chainsyncer.error import (
LockError,
FilterDone,
IncompleteFilterError,
SyncDone,
)
from chainsyncer.unittest import (
MockBlockGenerator,
MockFilter,
MockConn,
MockTx,
MockBlock,
MockDriver,
MockFilterError,
state_event_handler,
filter_state_event_handler,
)
from chainsyncer.driver import SyncDriver
logging.basicConfig(level=logging.STATETRACE)
logg = logging.getLogger()
class TestFilter(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
self.store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
self.conn = MockConn()
# def tearDown(self):
# shutil.rmtree(self.path)
def test_filter_basic(self):
session = SyncSession(self.store)
session.start(target=1)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(0, [tx_hash])
session.filter(self.conn, block, tx)
tx_hash = os.urandom(32).hex()
tx = MockTx(42, tx_hash)
block = MockBlock(1, [tx_hash])
session.filter(self.conn, block, tx)
self.assertEqual(len(fltr_one.contents), 2)
def test_driver(self):
drv = MockDriver(self.store, target=1)
generator = MockBlockGenerator()
generator.generate([1, 2], driver=drv)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
with self.assertRaises(SyncDone):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 3)
def test_driver_interrupt_noresume(self):
drv = MockDriver(self.store, target=1)
generator = MockBlockGenerator()
generator.generate([1], driver=drv)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
fltr_two = MockFilter('bar', brk_hard=1)
self.store.register(fltr_two)
with self.assertRaises(MockFilterError):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 1)
self.assertEqual(len(fltr_two.contents), 0)
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
fltr_one = MockFilter('foo') #, brk_hard=1)
store.register(fltr_one)
fltr_two = MockFilter('bar')
store.register(fltr_two)
with self.assertRaises(LockError):
drv = MockDriver(store, target=1)
self.assertEqual(len(fltr_one.contents), 0)
self.assertEqual(len(fltr_two.contents), 0)
def test_driver_interrupt_filter(self):
drv = MockDriver(self.store, target=1)
generator = MockBlockGenerator()
generator.generate([1, 1], driver=drv)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
fltr_two = MockFilter('bar', brk=1)
self.store.register(fltr_two)
fltr_three = MockFilter('baz')
self.store.register(fltr_three)
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
with self.assertRaises(SyncDone):
drv.run(self.conn)
self.assertEqual(len(fltr_one.contents), 2)
self.assertEqual(len(fltr_two.contents), 2)
self.assertEqual(len(fltr_three.contents), 1)
def test_driver_interrupt_sync(self):
drv = MockDriver(self.store, interrupt_block=1, target=2)
generator = MockBlockGenerator()
generator.generate([3, 1, 2], driver=drv)
fltr_one = MockFilter('foo')
self.store.register(fltr_one)
drv.run(self.conn, interval=0.1)
self.assertEqual(len(fltr_one.contents), 3)
store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
store.register(fltr_one)
drv = MockDriver(store)
generator.apply(drv, offset=1)
with self.assertRaises(SyncDone) as e:
drv.run(self.conn, interval=0.1)
self.assertEqual(e, 2)
self.assertEqual(len(fltr_one.contents), 6)
if __name__ == '__main__':
unittest.main()