Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e87ec0cd4c
|
||
|
|
b52d69c3f9
|
||
|
|
1d2656aaed
|
@@ -1,3 +1,5 @@
|
||||
- 0.1.2
|
||||
* add settings object
|
||||
- 0.1.0
|
||||
* consume non chain-specific code
|
||||
- 0.0.1
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import os
|
||||
|
||||
# external imports
|
||||
from chainlib.error import RPCException
|
||||
@@ -10,6 +11,7 @@ from chainqueue.store.fs import (
|
||||
CounterStore,
|
||||
)
|
||||
from shep.store.file import SimpleFileStoreFactory
|
||||
from shep.error import StateInvalid
|
||||
|
||||
# local imports
|
||||
from .base import ChaindAdapter
|
||||
@@ -22,7 +24,8 @@ class ChaindFsAdapter(ChaindAdapter):
|
||||
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32):
|
||||
factory = SimpleFileStoreFactory(path).add
|
||||
state_store = Status(factory)
|
||||
index_store = IndexStore(path, digest_bytes=digest_bytes)
|
||||
index_path = os.path.join(path, 'tx')
|
||||
index_store = IndexStore(index_path, digest_bytes=digest_bytes)
|
||||
counter_store = CounterStore(path)
|
||||
super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold)
|
||||
|
||||
@@ -33,7 +36,12 @@ class ChaindFsAdapter(ChaindAdapter):
|
||||
|
||||
|
||||
def get(self, tx_hash):
|
||||
v = self.store.get(tx_hash)
|
||||
v = None
|
||||
try:
|
||||
v = self.store.get(tx_hash)
|
||||
except StateInvalid as e:
|
||||
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
|
||||
return None
|
||||
return v[1]
|
||||
|
||||
|
||||
@@ -50,6 +58,9 @@ class ChaindFsAdapter(ChaindAdapter):
|
||||
|
||||
|
||||
def succeed(self, block, tx):
|
||||
if self.store.is_reserved(tx.hash):
|
||||
raise QueueLockError(tx.hash)
|
||||
|
||||
return self.store.final(tx.hash, block, tx, error=False)
|
||||
|
||||
|
||||
|
||||
@@ -3,4 +3,3 @@ socket_path =
|
||||
runtime_dir =
|
||||
id =
|
||||
data_dir =
|
||||
|
||||
|
||||
@@ -16,3 +16,7 @@ class ClientBlockError(BlockingIOError):
|
||||
|
||||
class ClientInputError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class QueueLockError(Exception):
|
||||
pass
|
||||
|
||||
@@ -1,15 +1,21 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import time
|
||||
|
||||
# external imports
|
||||
from chainlib.status import Status as TxStatus
|
||||
from chainsyncer.filter import SyncFilter
|
||||
from chainqueue.error import NotLocalTxError
|
||||
|
||||
# local imports
|
||||
from .error import QueueLockError
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
class StateFilter(SyncFilter):
|
||||
|
||||
delay_limit = 3.0
|
||||
|
||||
def __init__(self, adapter, throttler=None):
|
||||
self.adapter = adapter
|
||||
self.throttler = throttler
|
||||
@@ -22,10 +28,21 @@ class StateFilter(SyncFilter):
|
||||
logg.debug('skipping not local transaction {}'.format(tx.hash))
|
||||
return False
|
||||
|
||||
if tx.status == TxStatus.SUCCESS:
|
||||
self.adapter.succeed(block, tx)
|
||||
else:
|
||||
self.adapter.fail(block, tx)
|
||||
delay = 0.01
|
||||
while True:
|
||||
if delay > self.delay_limit:
|
||||
raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash))
|
||||
try:
|
||||
if tx.status == TxStatus.SUCCESS:
|
||||
self.adapter.succeed(block, tx)
|
||||
else:
|
||||
self.adapter.fail(block, tx)
|
||||
break
|
||||
except QueueLockError as e:
|
||||
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
|
||||
time.sleep(delay)
|
||||
delay *= 2
|
||||
|
||||
if self.throttler != None:
|
||||
self.throttler.dec(tx.hash)
|
||||
|
||||
|
||||
@@ -4,32 +4,27 @@ import os
|
||||
import uuid
|
||||
|
||||
# external imports
|
||||
from chainlib.chain import ChainSpec
|
||||
from chainsyncer.settings import ChainsyncerSettings
|
||||
from chainqueue.settings import ChainqueueSettings
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChaindSettings(ChainsyncerSettings):
|
||||
class ChaindSettings(ChainsyncerSettings, ChainqueueSettings):
|
||||
|
||||
def __init__(self, include_sync=False, include_queue=False):
|
||||
self.o = {}
|
||||
self.get = self.o.get
|
||||
super(ChaindSettings, self).__init__()
|
||||
self.include_sync = include_sync
|
||||
self.include_queue = include_queue
|
||||
|
||||
|
||||
def process_common(self, config):
|
||||
self.o['CHAIN_SPEC'] = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
|
||||
|
||||
|
||||
def process_session(self, config):
|
||||
session_id = config.get('SESSION_ID')
|
||||
|
||||
base_dir = os.getcwd()
|
||||
data_dir = config.get('SESSION_DATA_DIR')
|
||||
if data_dir == None:
|
||||
data_dir = os.path.join(base_dir, '.chaind', 'chaind')
|
||||
data_dir = os.path.join(base_dir, '.chaind', 'chaind', self.o.get('CHAIND_BACKEND'))
|
||||
data_engine_dir = os.path.join(data_dir, config.get('CHAIND_ENGINE'))
|
||||
os.makedirs(data_engine_dir, exist_ok=True)
|
||||
|
||||
@@ -55,15 +50,17 @@ class ChaindSettings(ChainsyncerSettings):
|
||||
fp = os.path.join(data_engine_dir, 'default')
|
||||
os.symlink(session_dir, fp)
|
||||
|
||||
data_dir = os.path.join(session_dir, config.get('CHAIND_COMPONENT'))
|
||||
#data_dir = os.path.join(session_dir, config.get('CHAIND_COMPONENT'))
|
||||
data_dir = session_dir
|
||||
os.makedirs(data_dir, exist_ok=True)
|
||||
|
||||
# create volatile dir
|
||||
uid = os.getuid()
|
||||
runtime_dir = config.get('SESSION_RUNTIME_DIR')
|
||||
if runtime_dir == None:
|
||||
runtime_dir = os.path.join('/run', 'user', str(uid), 'chaind')
|
||||
runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id, config.get('CHAIND_COMPONENT'))
|
||||
runtime_dir = os.path.join('/run', 'user', str(uid), 'chaind', self.o.get('CHAIND_BACKEND'))
|
||||
#runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id, config.get('CHAIND_COMPONENT'))
|
||||
runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id)
|
||||
os.makedirs(runtime_dir, exist_ok=True)
|
||||
|
||||
self.o['SESSION_RUNTIME_DIR'] = runtime_dir
|
||||
@@ -91,25 +88,38 @@ class ChaindSettings(ChainsyncerSettings):
|
||||
def process_dispatch(self, config):
|
||||
self.o['SESSION_DISPATCH_DELAY'] = 0.01
|
||||
|
||||
|
||||
|
||||
def process_token(self, config):
|
||||
self.o['TOKEN_MODULE'] = config.get('TOKEN_MODULE')
|
||||
|
||||
|
||||
def process_backend(self, config):
|
||||
if self.include_sync and self.include_queue:
|
||||
if self.o['QUEUE_BACKEND'] != self.o['SYNCER_BACKEND']:
|
||||
raise ValueError('queue and syncer backends must match. queue "{}" != syncer "{}"'.format(self.o['QUEUE_BACKEND'], self.o['SYNCER_BACKEND']))
|
||||
self.o['CHAIND_BACKEND'] = self.o['SYNCER_BACKEND']
|
||||
elif self.include_sync:
|
||||
self.o['CHAIND_BACKEND'] = self.o['SYNCER_BACKEND']
|
||||
elif self.include_queue:
|
||||
self.o['CHAIND_BACKEND'] = self.o['QUEUE_BACKEND']
|
||||
else:
|
||||
raise ValueError('at least one backend must be set')
|
||||
|
||||
|
||||
def process(self, config):
|
||||
self.process_common(config)
|
||||
self.process_session(config)
|
||||
self.process_socket(config)
|
||||
super(ChaindSettings, self).process(config)
|
||||
if self.include_sync:
|
||||
self.process_sync(config)
|
||||
self.process_sync_backend(config)
|
||||
if self.include_queue:
|
||||
self.process_queue_backend(config)
|
||||
self.process_dispatch(config)
|
||||
self.process_token(config)
|
||||
|
||||
self.process_backend(config)
|
||||
self.process_session(config)
|
||||
self.process_socket(config)
|
||||
|
||||
|
||||
def dir_for(self, k):
|
||||
return os.path.join(self.o['SESSION_DIR'], k)
|
||||
|
||||
|
||||
def __str__(self):
|
||||
ks = list(self.o.keys())
|
||||
ks.sort()
|
||||
s = ''
|
||||
for k in ks:
|
||||
s += '{}: {}\n'.format(k, self.o.get(k))
|
||||
return s
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
chainlib~=0.1.0
|
||||
chainqueue~=0.1.2
|
||||
chainsyncer~=0.3.6
|
||||
chainlib~=0.1.1
|
||||
chainqueue~=0.1.5
|
||||
chainsyncer~=0.4.2
|
||||
confini~=0.6.0
|
||||
funga~=0.5.2
|
||||
pyxdg~=0.26
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[metadata]
|
||||
name = chaind
|
||||
version = 0.1.0
|
||||
description = Base package for chain queue servicek
|
||||
version = 0.1.3
|
||||
description = Base package for chain queue service
|
||||
author = Louis Holbrook
|
||||
author_email = dev@holbrook.no
|
||||
url = https://gitlab.com/chaintool/chaind
|
||||
@@ -23,7 +23,7 @@ licence_files =
|
||||
LICENSE
|
||||
|
||||
[options]
|
||||
python_requires = >= 3.6
|
||||
python_requires = >= 3.7
|
||||
include_package_data = True
|
||||
packages =
|
||||
chaind
|
||||
|
||||
Reference in New Issue
Block a user