Compare commits

15 Commits

Author SHA1 Message Date
lash
bc135759e8 Bump minor vesrion 2022-05-14 16:25:35 +00:00
lash
cad695c676 Update config handlnig 2022-05-14 16:14:16 +00:00
lash
1adecdde2d Remove redundant backend setter code 2022-05-13 13:45:41 +00:00
lash
4e7ed45e80 Remove obsolete cli init file 2022-05-13 10:34:36 +00:00
lash
576e62507b Make arg and flag preparations stateless 2022-05-13 10:33:26 +00:00
lash
88870dc12d Implement chainlib 0.3.0 structure 2022-05-13 07:12:27 +00:00
lash
7d5ceb9a28 Bump version 2022-05-10 18:35:13 +00:00
lash
21db658575 Add missing single flag 2022-05-10 18:30:28 +00:00
lash
909b85b2b7 Upgrade deps 2022-05-09 19:43:36 +00:00
lash
2f9663a8f8 Fix faulty new fs store path create 2022-05-07 11:28:13 +00:00
lash
637ead1a38 Improve log output for unlocking tool 2022-05-07 11:27:01 +00:00
lash
412018fc64 Upgrade shep to avoid sync in persist set 2022-05-05 17:05:39 +00:00
lash
95663621bc Update changelog 2022-05-05 15:43:54 +00:00
lash
0726f7a730 Upgrade shep, handle exception in filestore list in shep 2022-05-05 15:39:18 +00:00
lash
2c8ad85307 Upgrade chainlib, shep (state lock integrity) 2022-05-05 14:49:34 +00:00
12 changed files with 78 additions and 82 deletions

View File

@@ -1,3 +1,12 @@
* 0.4.8
- Add unlock action description to info loglevel for unlock tool
* 0.4.7
- Upgrade shep to avoid sync in persist set
* 0.4.6
- Upgrade shep to handle filesystem list exception
* 0.4.5
- Upgrade chainlib
- Upgrade shep to guarantee atomic state locks
* 0.4.4
- Reinstate tx index bump in sync state on filter execution complete
* 0.4.3

1
chainsyncer/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .filter import SyncFilter

View File

@@ -1,12 +0,0 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

View File

@@ -1,14 +1,14 @@
# local imports
from .base import SyncFlag
def apply_flag(flag):
flag.add('range')
flag.add('head')
flag.alias('sync_range_ext', 'range', 'head')
return flag
def process_flags(argparser, flags):
if flags & SyncFlag.RANGE > 0:
argparser.add_argument('--offset', type=int, help='Block to start sync from. Default is start of history (0).')
argparser.add_argument('--until', type=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
if flags & SyncFlag.HEAD > 0:
argparser.add_argument('--head', action='store_true', help='Start from latest block as offset')
argparser.add_argument('--keep-alive', action='store_true', help='Do not stop syncing when caught up')
argparser.add_argument('--backend', type=str, help='Backend to use for state store')
def apply_arg(arg):
arg.add_long('offset', 'range', typ=int, help='Block to start sync from. Default is start of history (0).')
arg.add_long('until', 'range', typ=int, default=-1, help='Block to stop sync on. Default is stop at block height of first run.')
arg.add_long('single', 'range', typ=bool, help='Execute a single sync, regardless of previous states')
arg.add_long('head', 'head', typ=bool, help='Start from latest block as offset')
arg.add_long('keep-alive', 'head', typ=bool, help='Do not stop syncing when caught up')
return arg

View File

@@ -1,20 +1,18 @@
# external imports
from chainsyncer.cli import SyncFlag
def process_config(config, args, flags):
def process_config(config, arg, args, flags):
args_override = {}
args_override['SYNCER_BACKEND'] = getattr(args, 'backend')
if flags & SyncFlag.RANGE:
if arg.match('range', flags):
args_override['SYNCER_OFFSET'] = getattr(args, 'offset')
args_override['SYNCER_LIMIT'] = getattr(args, 'until')
config.dict_override(args_override, 'local cli args')
if flags & SyncFlag.HEAD:
if arg.match('head', flags):
config.add(getattr(args, 'keep_alive'), '_KEEP_ALIVE')
config.add(getattr(args, 'head'), '_HEAD')
config.add(getattr(args, 'single'), '_SINGLE')
return config

View File

@@ -97,7 +97,6 @@ class FilterState:
if self.scan != None:
ks = self.scan()
for v in ks: #os.listdir(self.scan_path):
logg.debug('ks {}'.format(v))
k = None
try:
k = self.state_store.from_elements(v)

View File

@@ -1 +0,0 @@

View File

@@ -11,45 +11,40 @@ from chainlib.settings import ChainSettings
logg = logging.getLogger(__name__)
class ChainsyncerSettings(ChainSettings):
def process_sync_range(settings, config):
o = settings.get('SYNCER_INTERFACE').block_latest()
r = settings.get('CONN').do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
def process_sync_backend(self, config):
self.o['SYNCER_BACKEND'] = config.get('SYNCER_BACKEND')
keep_alive = False
session_block_offset = 0
block_limit = 0
until = 0
if config.true('_HEAD'):
settings.set('SYNCER_OFFSET', block_offset)
settings.set('SYNCER_LIMIT', -1)
return settings
def process_sync_range(self, config):
o = self.o['SYNCER_INTERFACE'].block_latest()
r = self.o['RPC'].do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
session_block_offset = int(config.get('SYNCER_OFFSET'))
until = int(config.get('SYNCER_LIMIT'))
keep_alive = False
session_block_offset = 0
block_limit = 0
until = 0
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
elif until == -1:
keep_alive = True
if config.true('_HEAD'):
self.o['SYNCER_OFFSET'] = block_offset
self.o['SYNCER_LIMIT'] = -1
return
if session_block_offset == -1:
session_block_offset = block_offset
elif config.true('_KEEP_ALIVE'):
block_limit = -1
else:
if block_limit == 0:
block_limit = block_offset
session_block_offset = int(config.get('SYNCER_OFFSET'))
until = int(config.get('SYNCER_LIMIT'))
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
elif until == -1:
keep_alive = True
if session_block_offset == -1:
session_block_offset = block_offset
elif config.true('_KEEP_ALIVE'):
block_limit = -1
else:
if block_limit == 0:
block_limit = block_offset
self.o['SYNCER_OFFSET'] = session_block_offset
self.o['SYNCER_LIMIT'] = block_limit
settings.set('SYNCER_OFFSET', session_block_offset)
settings.set('SYNCER_LIMIT', block_limit)
return settings

View File

@@ -43,7 +43,6 @@ class SyncItem:
self.filter_state = filter_state
self.state_key = str(offset)
logg.debug('get key {}'.format(self.state_key))
v = self.sync_state.get(self.state_key)
(self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
@@ -101,10 +100,6 @@ class SyncItem:
self.sync_state.replace(self.state_key, b)
def __find_advance(self):
v = self.filter_state.state(self.state_key)
def advance(self, ignore_lock=False):
if self.skip_filter:
raise FilterDone()
@@ -267,7 +262,9 @@ class SyncStore:
self.item_keys.append(k)
logg.info('added existing {}'.format(o))
self.get_target()
v = self.get_target()
if v != None:
target = v
if len(thresholds) == 0:
if self.target != None:
@@ -332,7 +329,8 @@ class SyncStore:
if locked_item_key == None:
return False
locked_item = self.get(locked_item_key)
locked_state = self.filter_state.state(locked_item_key) - self.filter_state.state_store.LOCK
state = self.filter_state.state(locked_item_key)
locked_state = state - self.filter_state.state_store.LOCK
locked_state_name = self.filter_state.name(locked_state)
logg.debug('found locked item {} in state {}'.format(locked_item, locked_state))
@@ -342,10 +340,17 @@ class SyncStore:
if i == -1:
raise FilterInitializationError('locked state {} ({}) found for item {}, but matching filter has not been registered'.format(locked_state_name, locked_state, locked_item))
direction = None
if revert:
self.__unlock_previous(locked_item, fltrs, i)
new_state = self.filter_state.state(locked_item_key)
direction = 'previous'
else:
self.__unlock_next(locked_item, fltrs, i)
new_state = self.filter_state.state(locked_item_key)
direction = 'next'
logg.info('chainstate unlock to {} {} ({}) -> {} ({})'.format(direction, self.filter_state.name(state), state, self.filter_state.name(new_state), new_state))
return True

View File

@@ -24,7 +24,8 @@ class SyncFsStore(SyncStore):
create_path = True
if create_path:
self.__create_path(base_path, self.default_path, session_id=session_id)
#self.__create_path(base_path, self.default_path, session_id=session_id)
os.makedirs(self.session_path)
self.session_id = os.path.basename(self.session_path)
logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))

View File

@@ -1,5 +1,5 @@
confini~=0.6.0
confini~=0.6.1
semver==2.13.0
hexathon~=0.1.5
chainlib~=0.1.1
shep~=0.2.3
hexathon~=0.1.7
chainlib~=0.3.0
shep~=0.2.9

View File

@@ -1,6 +1,6 @@
[metadata]
name = chainsyncer
version = 0.4.4
version = 0.5.0
description = Generic blockchain syncer driver
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -30,6 +30,7 @@ packages =
chainsyncer.store
chainsyncer.cli
chainsyncer.runnable
chainsyncer.data
#[options.package_data]
#* =