25 Commits

Author SHA1 Message Date
lash
443e64f3e9 Add chain spec parts to session paths 2022-05-15 11:32:42 +00:00
lash
bb5e0b359f Bump deps 2022-05-14 16:24:03 +00:00
lash
8bc49e7408 Bump minor version 2022-05-14 16:22:40 +00:00
lash
c83aade144 Add data config path module 2022-05-14 12:40:21 +00:00
lash
63a1d07a28 Update dir arg destinations in config handle 2022-05-14 12:39:53 +00:00
lash
6da2a1ced9 Update config processing 2022-05-14 12:24:24 +00:00
lash
9f2a791b1f Update state config names, remove dead code 2022-05-13 13:47:11 +00:00
lash
ba894044d8 Remove redundant chaind init file 2022-05-13 10:35:21 +00:00
lash
e5a18d1abf Make flag and arg preparations stateless 2022-05-13 10:32:56 +00:00
lash
139e2772af Implement settings processing on chainlib 0.3.0 structure 2022-05-13 09:40:18 +00:00
lash
0a5818ebf1 Update settings for queue and syncer 2022-05-10 06:13:56 +00:00
lash
5d2d73fa64 Upgrade chainqueue, chainsyncer 2022-05-09 19:44:28 +00:00
lash
dd1879bb91 Implement mocks on generic tx, block 2022-05-09 19:25:31 +00:00
lash
e264ed5c37 Upgrade deps, to avoid shep sync on persist set 2022-05-05 17:08:09 +00:00
lash
d472bd4f7c Upgrade deps, handle filestore list exception in shep 2022-05-05 15:41:30 +00:00
lash
465d692956 Upgrade deps 2022-05-05 15:02:21 +00:00
lash
81c1207828 Recreate adapter only per block 2022-05-05 12:09:07 +00:00
lash
f33ba13d74 Upgrade chainsyncer 2022-05-05 08:00:14 +00:00
lash
5459d4c3f8 Upgrade deps 2022-05-04 18:22:29 +00:00
lash
3e05717395 Add error line for rpc fails in dispatch 2022-05-04 06:44:40 +00:00
lash
54d10ee40b Upgrade chainqueue 2022-05-04 05:43:39 +00:00
lash
9cffdc5867 Factor out dispatch processor from chain implementation 2022-05-04 05:38:01 +00:00
lash
4f96be2024 Upgrade chainqueue 2022-05-03 17:22:59 +00:00
lash
32e1bc6aa5 Correct purge call, add missing lock module 2022-05-02 20:10:30 +00:00
lash
387014f77b Purge items from memory state on final 2022-05-02 20:05:41 +00:00
17 changed files with 331 additions and 199 deletions

View File

@@ -1,3 +1,24 @@
- 0.3.0
* Implement on chainlib 0.3.0
- 0.2.12
* Breaking upgrade of chainlib.
* Implement generic block and tx.
- 0.2.11
* Upgrade shep to handle exception in filestore list
- 0.2.10
* Upgrade shep to guarantee state lock atomicity
- 0.2.9
* Minimize instantiations of adapters in filter execution
- 0.2.8
* Upgrade chainsyncer
- 0.2.7
* Upgrade chainlib
- 0.2.6
* Deps upgrade
- 0.2.5
* Deps upgrade
- 0.2.4
* Allow omission of state store sync in queue store backend
- 0.2.2 - 0.2.2
* Fix missing symbol crashes related to race conditions * Fix missing symbol crashes related to race conditions
- 0.2.1 - 0.2.1

View File

@@ -13,13 +13,13 @@ logg = logging.getLogger(__name__)
class ChaindAdapter: class ChaindAdapter:
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0): def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, store_sync=True):
self.cache_adapter = cache_adapter self.cache_adapter = cache_adapter
self.dispatcher = dispatcher self.dispatcher = dispatcher
store_lock = StoreLock() store_lock = StoreLock()
while True: while True:
try: try:
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache) self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache, sync=store_sync)
break break
except FileNotFoundError as e: except FileNotFoundError as e:
logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e)) logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e))

View File

@@ -26,13 +26,13 @@ logg = logging.getLogger(__name__)
class ChaindFsAdapter(ChaindAdapter): class ChaindFsAdapter(ChaindAdapter):
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None): def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None, store_sync=True):
factory = SimpleFileStoreFactory(path, use_lock=True).add factory = SimpleFileStoreFactory(path, use_lock=True).add
state_store = Status(factory, allow_invalid=True, event_callback=event_callback) state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
index_path = os.path.join(path, 'tx') index_path = os.path.join(path, 'tx')
index_store = IndexStore(index_path, digest_bytes=digest_bytes) index_store = IndexStore(index_path, digest_bytes=digest_bytes)
counter_store = CounterStore(path) counter_store = CounterStore(path)
super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold) super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold, store_sync=store_sync)
def put(self, signed_tx): def put(self, signed_tx):
@@ -51,11 +51,11 @@ class ChaindFsAdapter(ChaindAdapter):
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e)) logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None return None
except FileNotFoundError as e: except FileNotFoundError as e:
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e)) logg.debug('queuestore get (file missing) {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again() store_lock.again()
continue continue
except StateLockedKey as e: except StateLockedKey as e:
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e)) logg.debug('queuestore get (statelock) {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again() store_lock.again()
continue continue
@@ -90,12 +90,19 @@ class ChaindFsAdapter(ChaindAdapter):
def succeed(self, block, tx): def succeed(self, block, tx):
if self.store.is_reserved(tx.hash): if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash) raise QueueLockError(tx.hash)
r = self.store.final(tx.hash, block, tx, error=False)
return self.store.final(tx.hash, block, tx, error=False) (k, v) = self.store.get(tx.hash)
self.store.purge(k)
return r
def fail(self, block, tx): def fail(self, block, tx):
return self.store.final(tx.hash, block, tx, error=True) if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash)
r = self.store.final(tx.hash, block, tx, error=True)
(k, v) = self.store.get(tx.hash)
self.store.purge(k)
return r
def sendfail(self): def sendfail(self):
@@ -128,7 +135,8 @@ class ChaindFsAdapter(ChaindAdapter):
r = None r = None
try: try:
r = self.dispatcher.send(tx_wire) r = self.dispatcher.send(tx_wire)
except RPCException: except RPCException as e:
logg.error('dispatch send failed for {}: {}'.format(tx_hash, e))
self.store.fail(tx_hash) self.store.fail(tx_hash)
return False return False

View File

@@ -1,12 +0,0 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

View File

@@ -1,17 +1,19 @@
# local imports def apply_flag(flag):
from .base import ChaindFlag flag.add('session')
flag.add('dispatch')
flag.add('socket')
flag.add('socket_client')
flag.add('token')
def process_flags(argparser, flags): flag.alias('chaind_base', 'session')
if flags & ChaindFlag.SESSION > 0: flag.alias('chaind_socket_client', 'session', 'socket', 'socket_client')
argparser.add_argument('--session-id', dest='session_id', type=str, help='Session to store state and data under')
argparser.add_argument('--runtime-dir', dest='runtime_dir', type=str, help='Directory to store volatile data')
argparser.add_argument('--data-dir', dest='data_dir', type=str, help='Directory to store persistent data')
if flags & ChaindFlag.SOCKET > 0: return flag
argparser.add_argument('--socket-path', dest='socket', type=str, help='UNIX socket path')
if flags & ChaindFlag.SOCKET_CLIENT > 0:
argparser.add_argument('--send-socket', dest='socket_send', action='store_true', help='Send to UNIX socket')
if flags & ChaindFlag.TOKEN > 0: def apply_arg(arg):
argparser.add_argument('--token-module', dest='token_module', type=str, help='Python module path to resolve tokens from identifiers') arg.add_long('session-id', 'session', help='Session to store state and data under')
arg.add_long('socket-path', 'socket', help='UNIX socket path')
arg.add_long('send-socket', 'socket_client', typ=bool, help='Send to UNIX socket')
arg.add_long('token-module', 'token', help='Python module path to resolve tokens from identifiers')
return arg

View File

@@ -1,23 +1,19 @@
# external imports def process_config(config, arg, args, flags):
from chaind.cli import ChaindFlag
def process_config(config, args, flags):
args_override = {} args_override = {}
if flags & ChaindFlag.SESSION: if arg.match('session', flags):
args_override['SESSION_ID'] = getattr(args, 'session_id') args_override['SESSION_ID'] = getattr(args, 'session_id')
args_override['SESSION_RUNTIME_DIR'] = getattr(args, 'runtime_dir') args_override['SESSION_RUNTIME_DIR'] = getattr(args, 'runtime_path')
args_override['SESSION_DATA_DIR'] = getattr(args, 'data_dir') args_override['SESSION_DATA_DIR'] = getattr(args, 'state_path')
if flags & ChaindFlag.SOCKET: if arg.match('socket', flags):
args_override['SESSION_SOCKET_PATH'] = getattr(args, 'socket') args_override['SESSION_SOCKET_PATH'] = getattr(args, 'socket')
if flags & ChaindFlag.TOKEN: if arg.match('token', flags):
args_override['TOKEN_MODULE'] = getattr(args, 'token_module') args_override['TOKEN_MODULE'] = getattr(args, 'token_module')
config.dict_override(args_override, 'local cli args') config.dict_override(args_override, 'local cli args')
if flags & ChaindFlag.SOCKET_CLIENT: if arg.match('socket_client', flags):
config.add(getattr(args, 'socket_send'), '_SOCKET_SEND', False) config.add(getattr(args, 'send_socket'), '_SOCKET_SEND', False)
return config return config

6
chaind/data/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
# standard imports
import os
data_dir = os.path.realpath(os.path.dirname(__file__))
config_dir = os.path.join(data_dir, 'config')

View File

@@ -1,5 +1,5 @@
[session] [session]
socket_path = socket_path =
runtime_dir = runtime_path =
id = id =
data_dir = data_path =

33
chaind/dispatch.py Normal file
View File

@@ -0,0 +1,33 @@
# standard imports
import logging
# local ipmorts
from chaind.adapters.fs import ChaindFsAdapter
from chaind.eth.cache import EthCacheTx
logg = logging.getLogger(__name__)
class DispatchProcessor:
def __init__(self, chain_spec, queue_dir, dispatcher):
self.dispatcher = dispatcher
self.chain_spec = chain_spec,
self.queue_dir = queue_dir
def process(self, rpc, limit=50):
adapter = ChaindFsAdapter(
self.chain_spec,
self.queue_dir,
EthCacheTx,
self.dispatcher,
)
upcoming = adapter.upcoming(limit=limit)
logg.info('processor has {} candidates for {}, processing with limit {}'.format(len(upcoming), self.chain_spec, limit))
i = 0
for tx_hash in upcoming:
if adapter.dispatch(tx_hash):
i += 1
return i

View File

@@ -26,27 +26,51 @@ class StateFilter(SyncFilter):
self.adapter_path = adapter_path self.adapter_path = adapter_path
self.tx_adapter = tx_adapter self.tx_adapter = tx_adapter
self.throttler = throttler self.throttler = throttler
self.last_block_height = 0
self.adapter = None
self.store_lock = None
def __get_adapter(self, block, force_reload=False):
if self.store_lock == None:
self.store_lock = StoreLock()
reload = False
if block.number != self.last_block_height:
reload = True
elif self.adapter == None:
reload = True
elif force_reload:
reload = True
self.last_block_height = block.number
if reload:
while True:
logg.info('reloading adapter')
try:
self.adapter = ChaindFsAdapter(
self.chain_spec,
self.adapter_path,
self.tx_adapter,
None,
)
break
except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
self.store_lock.again()
continue
return self.adapter
def filter(self, conn, block, tx, session=None): def filter(self, conn, block, tx, session=None):
cache_tx = None cache_tx = None
store_lock = StoreLock() queue_adapter = self.__get_adapter(block)
queue_adapter = None
self.store_lock.reset()
while True: while True:
try:
queue_adapter = ChaindFsAdapter(
self.chain_spec,
self.adapter_path,
self.tx_adapter,
None,
)
except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
store_lock.again()
continue
store_lock.reset()
try: try:
cache_tx = queue_adapter.get(tx.hash) cache_tx = queue_adapter.get(tx.hash)
break break
@@ -54,15 +78,16 @@ class StateFilter(SyncFilter):
logg.debug('skipping not local transaction {}'.format(tx.hash)) logg.debug('skipping not local transaction {}'.format(tx.hash))
return False return False
except BackendError as e: except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e)) logg.error('adapter get failed: {}, one more try'.format(e))
queue_adapter = None self.store_lock.again()
store_lock.again() queue_adapter = self.__get_adapter(block, force_reload=True)
continue continue
if cache_tx == None: if cache_tx == None:
raise NotLocalTxError(tx.hash) raise NotLocalTxError(tx.hash)
store_lock = StoreLock() self.store_lock.reset()
queue_lock = StoreLock(error=QueueLockError) queue_lock = StoreLock(error=QueueLockError)
while True: while True:
try: try:
@@ -76,18 +101,21 @@ class StateFilter(SyncFilter):
queue_lock.again() queue_lock.again()
except FileNotFoundError as e: except FileNotFoundError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again() self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue continue
except NotLocalTxError as e: except NotLocalTxError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again() self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue continue
except StateLockedKey as e: except StateLockedKey as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again() self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue continue
logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block)) logg.info('filter registered {} for {} in {}'.format(tx.status_name, tx.hash, block))
if self.throttler != None: if self.throttler != None:
self.throttler.dec(tx.hash) self.throttler.dec(tx.hash)

34
chaind/lock.py Normal file
View File

@@ -0,0 +1,34 @@
# standard imports
import time
# local imports
from .error import BackendError
BASE_DELAY = 0.01
BASE_DELAY_LIMIT = 10.0
class StoreLock:
def __init__(self, delay=BASE_DELAY, delay_limit=BASE_DELAY_LIMIT, error=BackendError, description=None):
self.base_delay = delay
self.delay = delay
self.delay_limit = delay_limit
self.error = error
self.description = description
def again(self, e=None):
if self.delay > self.delay_limit:
err = None
if e != None:
err = str(e)
else:
err = self.description
raise self.error(err)
time.sleep(self.delay)
self.delay *= 2
def reset(self):
self.delay = self.base_delay

View File

@@ -22,7 +22,7 @@ logg = logging.getLogger(__name__)
class SessionController: class SessionController:
def __init__(self, config, adapter, processor): def __init__(self, config, processor):
self.dead = False self.dead = False
os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True) os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True)
try: try:
@@ -37,7 +37,6 @@ class SessionController:
self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY'))) self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY')))
self.processor = processor self.processor = processor
self.chain_spec = config.get('CHAIN_SPEC') self.chain_spec = config.get('CHAIN_SPEC')
self.adapter = adapter
def shutdown(self, signo, frame): def shutdown(self, signo, frame):
@@ -65,7 +64,7 @@ class SessionController:
r = None r = None
while True: while True:
try: try:
r = self.processor(self.chain_spec, self.adapter, conn) r = self.processor(conn)
break break
except BackendError as e: except BackendError as e:
state_lock.again(e) state_lock.again(e)

View File

@@ -4,122 +4,123 @@ import os
import uuid import uuid
# external imports # external imports
from chainsyncer.settings import ChainsyncerSettings from chainlib.settings import ChainSettings
from chainqueue.settings import ChainqueueSettings from chainqueue.settings import *
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class ChaindSettings(ChainsyncerSettings, ChainqueueSettings): class ChaindSettings(ChainSettings):
def __init__(self, include_sync=False, include_queue=False): def __init__(settings, include_sync=False, include_queue=False):
super(ChaindSettings, self).__init__() super(ChaindSettings, settings).__init__()
self.include_sync = include_sync settings.include_sync = include_sync
self.include_queue = include_queue settings.include_queue = include_queue
def process_session(self, config):
session_id = config.get('SESSION_ID')
base_dir = os.getcwd()
data_dir = config.get('SESSION_DATA_DIR')
if data_dir == None:
data_dir = os.path.join(base_dir, '.chaind', 'chaind', self.o.get('CHAIND_BACKEND'))
data_engine_dir = os.path.join(data_dir, config.get('CHAIND_ENGINE'))
os.makedirs(data_engine_dir, exist_ok=True)
# check if existing session
if session_id == None:
fp = os.path.join(data_engine_dir, 'default')
try:
os.stat(fp)
fp = os.path.realpath(fp)
except FileNotFoundError:
fp = None
if fp != None:
session_id = os.path.basename(fp)
make_default = False
if session_id == None:
session_id = str(uuid.uuid4())
make_default = True
# create the session persistent dir
session_dir = os.path.join(data_engine_dir, session_id)
if make_default:
fp = os.path.join(data_engine_dir, 'default')
os.symlink(session_dir, fp)
#data_dir = os.path.join(session_dir, config.get('CHAIND_COMPONENT'))
data_dir = session_dir
os.makedirs(data_dir, exist_ok=True)
# create volatile dir
uid = os.getuid()
runtime_dir = config.get('SESSION_RUNTIME_DIR')
if runtime_dir == None:
runtime_dir = os.path.join('/run', 'user', str(uid), 'chaind', self.o.get('CHAIND_BACKEND'))
#runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id, config.get('CHAIND_COMPONENT'))
runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id)
os.makedirs(runtime_dir, exist_ok=True)
self.o['SESSION_RUNTIME_DIR'] = runtime_dir
self.o['SESSION_DIR'] = session_dir
self.o['SESSION_DATA_DIR'] = data_dir
self.o['SESSION_ID'] = session_id
def process_sync_interface(self, config):
raise NotImplementedError('no sync interface implementation defined')
def process_sync(self, config):
self.process_sync_interface(config)
self.process_sync_range(config)
def process_socket(self, config):
socket_path = config.get('SESSION_SOCKET_PATH')
if socket_path == None:
socket_path = os.path.join(self.o['SESSION_RUNTIME_DIR'], 'chaind.sock')
self.o['SESSION_SOCKET_PATH'] = socket_path
def process_dispatch(self, config):
self.o['SESSION_DISPATCH_DELAY'] = 0.01
def process_token(self, config):
self.o['TOKEN_MODULE'] = config.get('TOKEN_MODULE')
def process_backend(self, config):
if self.include_sync and self.include_queue:
if self.o['QUEUE_BACKEND'] != self.o['SYNCER_BACKEND']:
raise ValueError('queue and syncer backends must match. queue "{}" != syncer "{}"'.format(self.o['QUEUE_BACKEND'], self.o['SYNCER_BACKEND']))
self.o['CHAIND_BACKEND'] = self.o['SYNCER_BACKEND']
elif self.include_sync:
self.o['CHAIND_BACKEND'] = self.o['SYNCER_BACKEND']
elif self.include_queue:
self.o['CHAIND_BACKEND'] = self.o['QUEUE_BACKEND']
else:
raise ValueError('at least one backend must be set')
def process(self, config):
super(ChaindSettings, self).process(config)
if self.include_sync:
self.process_sync(config)
self.process_sync_backend(config)
if self.include_queue:
self.process_queue_backend(config)
self.process_dispatch(config)
self.process_token(config)
self.process_backend(config)
self.process_session(config)
self.process_socket(config)
def dir_for(self, k): def dir_for(self, k):
return os.path.join(self.o['SESSION_DIR'], k) return os.path.join(self.o['SESSION_PATH'], k)
def process_session(settings, config):
session_id = config.get('SESSION_ID')
base_dir = os.getcwd()
data_dir = config.get('SESSION_DATA_PATH')
if data_dir == None:
data_dir = os.path.join(base_dir, '.chaind', 'chaind', settings.get('CHAIND_BACKEND'))
data_engine_dir = os.path.join(data_dir, config.get('CHAIND_ENGINE'))
os.makedirs(data_engine_dir, exist_ok=True)
# check if existing session
if session_id == None:
fp = os.path.join(data_engine_dir, 'default')
try:
os.stat(fp)
fp = os.path.realpath(fp)
except FileNotFoundError:
fp = None
if fp != None:
session_id = os.path.basename(fp)
make_default = False
if session_id == None:
session_id = str(uuid.uuid4())
make_default = True
chain_spec = settings.get('CHAIN_SPEC')
network_id_str = str(chain_spec.network_id())
# create the session persistent dir
session_path = os.path.join(
data_engine_dir,
chain_spec.arch(),
chain_spec.fork(),
network_id_str,
session_id,
)
if make_default:
fp = os.path.join(data_engine_dir, 'default')
os.symlink(session_path, fp)
data_path = session_path
os.makedirs(data_path, exist_ok=True)
# create volatile dir
uid = os.getuid()
runtime_path = config.get('SESSION_RUNTIME_PATH')
if runtime_path == None:
runtime_path = os.path.join('/run', 'user', str(uid), 'chaind', settings.get('CHAIND_BACKEND'))
runtime_path = os.path.join(
runtime_path,
config.get('CHAIND_ENGINE'),
chain_spec.arch(),
chain_spec.fork(),
str(chain_spec.network_id()),
session_id,
)
os.makedirs(runtime_path, exist_ok=True)
settings.set('SESSION_RUNTIME_PATH', runtime_path)
settings.set('SESSION_PATH', session_path)
settings.set('SESSION_DATA_PATH', data_path)
settings.set('SESSION_ID', session_id)
return settings
def process_socket(settings, config):
socket_path = config.get('SESSION_SOCKET_PATH')
if socket_path == None:
socket_path = os.path.join(settings.get('SESSION_RUNTIME_PATH'), 'chaind.sock')
settings.set('SESSION_SOCKET_PATH', socket_path)
return settings
def process_dispatch(settings, config):
settings.set('SESSION_DISPATCH_DELAY', 0.01)
return settings
def process_token(settings, config):
settings.set('TOKEN_MODULE', config.get('TOKEN_MODULE'))
return settings
def process_backend(settings, config):
settings.set('CHAIND_BACKEND', config.get('STATE_BACKEND')) #backend)
return settings
def process_queue(settings, config):
if config.get('STATE_PATH') == None:
queue_state_dir = settings.dir_for('queue')
config.add(queue_state_dir, 'STATE_PATH', False)
logg.debug('setting queue state path {}'.format(queue_state_dir))
settings = process_queue_tx(settings, config)
settings = process_queue_paths(settings, config)
if config.get('STATE_BACKEND') == 'fs':
settings = process_queue_backend_fs(settings, config)
settings = process_queue_store(settings, config)
return settings

View File

@@ -7,6 +7,11 @@ from chainqueue.cache import CacheTokenTx
from chainlib.status import Status as TxStatus from chainlib.status import Status as TxStatus
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.error import RPCException from chainlib.error import RPCException
from chainlib.tx import (
Tx,
TxResult,
)
from chainlib.block import Block
class MockCacheAdapter(CacheTokenTx): class MockCacheAdapter(CacheTokenTx):
@@ -29,15 +34,22 @@ class MockDispatcher:
def send(self, v): def send(self, v):
import sys
sys.stderr.write('susu v {} {}\n'.format(v, self.fails))
if v in self.fails: if v in self.fails:
raise RPCException('{} is in fails'.format(v)) raise RPCException('{} is in fails'.format(v))
pass pass
class MockTx: class MockTx(Tx):
def __init__(self, tx_hash, status=TxStatus.SUCCESS): def __init__(self, tx_hash, status=TxStatus.SUCCESS):
self.hash = tx_hash result = TxResult()
self.status = status result.status = status
super(MockTx, self).__init__(result=result)
self.set_hash(tx_hash)
class MockBlock(Block):
def __init__(self, number):
super(MockBlock, self).__init__()
self.number = number

View File

@@ -1,6 +1,6 @@
chainlib~=0.1.1 chainlib~=0.3.0
chainqueue~=0.1.8 chainqueue~=0.2.0
chainsyncer~=0.4.3 chainsyncer~=0.5.0
confini~=0.6.0 confini~=0.6.1
funga~=0.5.2 funga~=0.5.2
pyxdg~=0.26 pyxdg~=0.26

View File

@@ -1,6 +1,6 @@
[metadata] [metadata]
name = chaind name = chaind
version = 0.2.2 version = 0.3.0
description = Base package for chain queue service description = Base package for chain queue service
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
@@ -30,6 +30,7 @@ packages =
# chaind.runnable # chaind.runnable
chaind.adapters chaind.adapters
chaind.unittest chaind.unittest
chaind.data
chaind.cli chaind.cli
#[options.entry_points] #[options.entry_points]

View File

@@ -14,6 +14,7 @@ from chaind.filter import StateFilter
# test imports # test imports
from chaind.unittest.common import ( from chaind.unittest.common import (
MockTx, MockTx,
MockBlock,
MockCacheAdapter, MockCacheAdapter,
MockDispatcher, MockDispatcher,
) )
@@ -76,7 +77,8 @@ class TestChaindFs(TestChaindFsBase):
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh) tx = MockTx(hsh)
fltr.filter(None, None, tx) block = MockBlock(42)
fltr.filter(None, block, tx)
def test_fs_filter_fail(self): def test_fs_filter_fail(self):
@@ -87,7 +89,8 @@ class TestChaindFs(TestChaindFsBase):
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh, TxStatus.ERROR) tx = MockTx(hsh, TxStatus.ERROR)
fltr.filter(None, None, tx) block = MockBlock(42)
fltr.filter(None, block, tx)
def test_upcoming(self): def test_upcoming(self):