14 Commits

Author SHA1 Message Date
lash
9b98703f24 Receive race handling from chainqueue, rehabilitate tests 2022-05-01 06:58:52 +00:00
lash
4be5325df2 Renew backend every filter pass, allow race in backend 2022-04-30 18:32:10 +00:00
lash
96bdca20cc Filter out final states from upcoming in network exceptions 2022-04-30 16:44:37 +00:00
lash
14fce6f63f Upgrade chainsyncer 2022-04-30 07:51:59 +00:00
lash
9ed3bad0c4 Add upcoming throttling, tests 2022-04-30 05:45:02 +00:00
lash
e87ec0cd4c Upgrade chainqueue, chainsyncer 2022-04-29 06:26:43 +00:00
lash
b52d69c3f9 Correct chainlib version 2022-04-28 12:44:59 +00:00
lash
1d2656aaed Add settings module 2022-04-28 12:41:08 +00:00
lash
b198e3f6b1 Upgrade chainsyncer, chainqueue 2022-04-27 06:35:18 +00:00
lash
a55591f243 Include cli module in setup 2022-04-27 06:31:30 +00:00
lash
800b70680b Improve data dir generation, improve socket settings handling 2022-04-27 06:29:46 +00:00
lash
e54f03a8f6 Add chainqueue settings 2022-04-26 21:28:31 +00:00
lash
18d10dc8b7 Allow selective sync and or queue in settings 2022-04-26 19:41:29 +00:00
lash
7e78fd0da2 Implement cli processing, settings renderer 2022-04-26 17:25:37 +00:00
19 changed files with 403 additions and 140 deletions

View File

@@ -1,3 +1,8 @@
- 0.2.0
* primitive race condition handling between fs access of sync and queue
* re-enable throttling based on in-flight transaction count
- 0.1.2
* add settings object
- 0.1.0 - 0.1.0
* consume non chain-specific code * consume non chain-specific code
- 0.0.1 - 0.0.1

View File

@@ -1,10 +1,28 @@
# external imports # external imports
from chainqueue import Store as QueueStore from chainqueue import Store as QueueStore
# local imports
from chaind.error import BackendIntegrityError
class ChaindAdapter: class ChaindAdapter:
race_delay = 0.1
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0): def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0):
self.cache_adapter = cache_adapter self.cache_adapter = cache_adapter
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache) err = None
for i in range(3):
try:
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
err = None
break
except FileNotFoundError as e:
logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(tx_hash, e))
err = e
time.sleep(self.race_delay)
continue
if err != None:
raise BackendIntegrityError(err)

View File

@@ -1,5 +1,6 @@
# standard imports # standard imports
import logging import logging
import os
# external imports # external imports
from chainlib.error import RPCException from chainlib.error import RPCException
@@ -10,36 +11,62 @@ from chainqueue.store.fs import (
CounterStore, CounterStore,
) )
from shep.store.file import SimpleFileStoreFactory from shep.store.file import SimpleFileStoreFactory
from shep.error import StateInvalid
# local imports # local imports
from .base import ChaindAdapter from .base import ChaindAdapter
from chaind.error import BackendIntegrityError
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class ChaindFsAdapter(ChaindAdapter): class ChaindFsAdapter(ChaindAdapter):
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32): def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None):
factory = SimpleFileStoreFactory(path).add factory = SimpleFileStoreFactory(path).add
state_store = Status(factory) state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
index_store = IndexStore(path, digest_bytes=digest_bytes) index_path = os.path.join(path, 'tx')
index_store = IndexStore(index_path, digest_bytes=digest_bytes)
counter_store = CounterStore(path) counter_store = CounterStore(path)
super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold) super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold)
def put(self, signed_tx): def put(self, signed_tx):
#cache_tx = self.deserialize(signed_tx)
(s, tx_hash,) = self.store.put(signed_tx, cache_adapter=self.cache_adapter) (s, tx_hash,) = self.store.put(signed_tx, cache_adapter=self.cache_adapter)
return tx_hash return tx_hash
def get(self, tx_hash): def get(self, tx_hash):
v = self.store.get(tx_hash) v = None
err = None
for i in range(3):
try:
v = self.store.get(tx_hash)
err = None
except StateInvalid as e:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None
except FileNotFoundError as e:
err = e
time.sleep(self.race_delay)
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
continue
if v ==None:
raise BackendIntegrityError(tx_hash)
return v[1] return v[1]
def upcoming(self): def upcoming(self, limit=0):
return self.store.upcoming() real_limit = 0
in_flight = []
if limit > 0:
in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
real_limit = limit - len(in_flight)
if real_limit <= 0:
return []
r = self.store.upcoming(limit=real_limit)
logg.info('upcoming returning {} upcoming from limit {} less {} active in-flight txs'.format(len(r), limit, len(in_flight)))
return r
def pending(self): def pending(self):
@@ -50,7 +77,14 @@ class ChaindFsAdapter(ChaindAdapter):
return self.store.deferred() return self.store.deferred()
def failed(self):
return self.store.failed()
def succeed(self, block, tx): def succeed(self, block, tx):
if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash)
return self.store.final(tx.hash, block, tx, error=False) return self.store.final(tx.hash, block, tx, error=False)
@@ -58,6 +92,10 @@ class ChaindFsAdapter(ChaindAdapter):
return self.store.final(tx.hash, block, tx, error=True) return self.store.final(tx.hash, block, tx, error=True)
def sendfail(self):
return self.store.fail(tx.hash)
def enqueue(self, tx_hash): def enqueue(self, tx_hash):
return self.store.enqueue(tx_hash) return self.store.enqueue(tx_hash)

12
chaind/cli/__init__.py Normal file
View File

@@ -0,0 +1,12 @@
# standard imports
import os
# local imports
from .base import *
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

17
chaind/cli/arg.py Normal file
View File

@@ -0,0 +1,17 @@
# local imports
from .base import ChaindFlag
def process_flags(argparser, flags):
if flags & ChaindFlag.SESSION > 0:
argparser.add_argument('--session-id', dest='session_id', type=str, help='Session to store state and data under')
argparser.add_argument('--runtime-dir', dest='runtime_dir', type=str, help='Directory to store volatile data')
argparser.add_argument('--data-dir', dest='data_dir', type=str, help='Directory to store persistent data')
if flags & ChaindFlag.SOCKET > 0:
argparser.add_argument('--socket-path', dest='socket', type=str, help='UNIX socket path')
if flags & ChaindFlag.SOCKET_CLIENT > 0:
argparser.add_argument('--send-socket', dest='socket_send', action='store_true', help='Send to UNIX socket')
if flags & ChaindFlag.TOKEN > 0:
argparser.add_argument('--token-module', dest='token_module', type=str, help='Python module path to resolve tokens from identifiers')

13
chaind/cli/base.py Normal file
View File

@@ -0,0 +1,13 @@
# standard imports
import enum
class ChaindFlag(enum.IntEnum):
SESSION = 1
DISPATCH = 2
SOCKET = 16
SOCKET_CLIENT = 32
TOKEN = 256
argflag_local_base = ChaindFlag.SESSION
argflag_local_socket_client = ChaindFlag.SESSION | ChaindFlag.SOCKET | ChaindFlag.SOCKET_CLIENT

23
chaind/cli/config.py Normal file
View File

@@ -0,0 +1,23 @@
# external imports
from chaind.cli import ChaindFlag
def process_config(config, args, flags):
args_override = {}
if flags & ChaindFlag.SESSION:
args_override['SESSION_ID'] = getattr(args, 'session_id')
args_override['SESSION_RUNTIME_DIR'] = getattr(args, 'runtime_dir')
args_override['SESSION_DATA_DIR'] = getattr(args, 'data_dir')
if flags & ChaindFlag.SOCKET:
args_override['SESSION_SOCKET_PATH'] = getattr(args, 'socket')
if flags & ChaindFlag.TOKEN:
args_override['TOKEN_MODULE'] = getattr(args, 'token_module')
config.dict_override(args_override, 'local cli args')
if flags & ChaindFlag.SOCKET_CLIENT:
config.add(getattr(args, 'socket_send'), '_SOCKET_SEND', False)
return config

View File

@@ -3,13 +3,3 @@ socket_path =
runtime_dir = runtime_dir =
id = id =
data_dir = data_dir =
[database]
engine =
name = chaind
driver =
user =
password =
host =
port =
debug = 0

View File

@@ -0,0 +1,2 @@
[token]
module =

View File

@@ -16,3 +16,11 @@ class ClientBlockError(BlockingIOError):
class ClientInputError(ValueError): class ClientInputError(ValueError):
pass pass
class QueueLockError(Exception):
pass
class BackendIntegrityError(Exception):
pass

View File

@@ -1,30 +1,80 @@
# standard imports # standard imports
import logging import logging
import time
# external imports # external imports
from chainlib.status import Status as TxStatus from chainlib.status import Status as TxStatus
from chainsyncer.filter import SyncFilter from chainsyncer.filter import SyncFilter
from chainqueue.error import NotLocalTxError from chainqueue.error import NotLocalTxError
from chaind.adapters.fs import ChaindFsAdapter
# local imports
from .error import (
QueueLockError,
BackendIntegrityError,
)
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class StateFilter(SyncFilter): class StateFilter(SyncFilter):
def __init__(self, adapter, throttler=None): delay_limit = 3.0
self.adapter = adapter race_delay = 0.1
def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
self.chain_spec = chain_spec
self.adapter_path = adapter_path
self.tx_adapter = tx_adapter
self.throttler = throttler self.throttler = throttler
def filter(self, conn, block, tx, session=None): def filter(self, conn, block, tx, session=None):
try: cache_tx = None
cache_tx = self.adapter.get(tx.hash) for i in range(3):
except NotLocalTxError: queue_adapter = None
logg.debug('skipping not local transaction {}'.format(tx.hash)) try:
return False queue_adapter = ChaindFsAdapter(
if tx.status == TxStatus.SUCCESS: self.chain_spec,
self.adapter.succeed(block, tx) self.adapter_path,
else: self.tx_adapter,
self.adapter.fail(block, tx) None,
)
except BackendIntegrityError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
time.sleep(self.race_delay)
continue
try:
cache_tx = queue_adapter.get(tx.hash)
except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash))
return False
except BackendIntegrityError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
time.sleep(self.race_delay)
continue
break
if cache_tx == None:
raise NotLocalTxError(tx.hash)
delay = 0.01
while True:
if delay > self.delay_limit:
raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash))
try:
if tx.status == TxStatus.SUCCESS:
queue_adapter.succeed(block, tx)
else:
queue_adapter.fail(block, tx)
break
except QueueLockError as e:
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
time.sleep(delay)
delay *= 2
if self.throttler != None: if self.throttler != None:
self.throttler.dec(tx.hash) self.throttler.dec(tx.hash)

View File

@@ -104,7 +104,6 @@ class SessionController:
logg.error('invalid input "{}"'.format(data_in_str)) logg.error('invalid input "{}"'.format(data_in_str))
raise ClientInputError() raise ClientInputError()
logg.info('recv {} bytes'.format(len(data)))
return (srvs, data,) return (srvs, data,)
@@ -117,3 +116,4 @@ class SessionController:
logg.debug('{} bytes sent'.format(len(v))) logg.debug('{} bytes sent'.format(len(v)))
except BrokenPipeError: except BrokenPipeError:
logg.debug('they just hung up. how rude.') logg.debug('they just hung up. how rude.')
srvs.close()

125
chaind/settings.py Normal file
View File

@@ -0,0 +1,125 @@
# standard imports
import logging
import os
import uuid
# external imports
from chainsyncer.settings import ChainsyncerSettings
from chainqueue.settings import ChainqueueSettings
logg = logging.getLogger(__name__)
class ChaindSettings(ChainsyncerSettings, ChainqueueSettings):
def __init__(self, include_sync=False, include_queue=False):
super(ChaindSettings, self).__init__()
self.include_sync = include_sync
self.include_queue = include_queue
def process_session(self, config):
session_id = config.get('SESSION_ID')
base_dir = os.getcwd()
data_dir = config.get('SESSION_DATA_DIR')
if data_dir == None:
data_dir = os.path.join(base_dir, '.chaind', 'chaind', self.o.get('CHAIND_BACKEND'))
data_engine_dir = os.path.join(data_dir, config.get('CHAIND_ENGINE'))
os.makedirs(data_engine_dir, exist_ok=True)
# check if existing session
if session_id == None:
fp = os.path.join(data_engine_dir, 'default')
try:
os.stat(fp)
fp = os.path.realpath(fp)
except FileNotFoundError:
fp = None
if fp != None:
session_id = os.path.basename(fp)
make_default = False
if session_id == None:
session_id = str(uuid.uuid4())
make_default = True
# create the session persistent dir
session_dir = os.path.join(data_engine_dir, session_id)
if make_default:
fp = os.path.join(data_engine_dir, 'default')
os.symlink(session_dir, fp)
#data_dir = os.path.join(session_dir, config.get('CHAIND_COMPONENT'))
data_dir = session_dir
os.makedirs(data_dir, exist_ok=True)
# create volatile dir
uid = os.getuid()
runtime_dir = config.get('SESSION_RUNTIME_DIR')
if runtime_dir == None:
runtime_dir = os.path.join('/run', 'user', str(uid), 'chaind', self.o.get('CHAIND_BACKEND'))
#runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id, config.get('CHAIND_COMPONENT'))
runtime_dir = os.path.join(runtime_dir, config.get('CHAIND_ENGINE'), session_id)
os.makedirs(runtime_dir, exist_ok=True)
self.o['SESSION_RUNTIME_DIR'] = runtime_dir
self.o['SESSION_DIR'] = session_dir
self.o['SESSION_DATA_DIR'] = data_dir
self.o['SESSION_ID'] = session_id
def process_sync_interface(self, config):
raise NotImplementedError('no sync interface implementation defined')
def process_sync(self, config):
self.process_sync_interface(config)
self.process_sync_range(config)
def process_socket(self, config):
socket_path = config.get('SESSION_SOCKET_PATH')
if socket_path == None:
socket_path = os.path.join(self.o['SESSION_RUNTIME_DIR'], 'chaind.sock')
self.o['SESSION_SOCKET_PATH'] = socket_path
def process_dispatch(self, config):
self.o['SESSION_DISPATCH_DELAY'] = 0.01
def process_token(self, config):
self.o['TOKEN_MODULE'] = config.get('TOKEN_MODULE')
def process_backend(self, config):
if self.include_sync and self.include_queue:
if self.o['QUEUE_BACKEND'] != self.o['SYNCER_BACKEND']:
raise ValueError('queue and syncer backends must match. queue "{}" != syncer "{}"'.format(self.o['QUEUE_BACKEND'], self.o['SYNCER_BACKEND']))
self.o['CHAIND_BACKEND'] = self.o['SYNCER_BACKEND']
elif self.include_sync:
self.o['CHAIND_BACKEND'] = self.o['SYNCER_BACKEND']
elif self.include_queue:
self.o['CHAIND_BACKEND'] = self.o['QUEUE_BACKEND']
else:
raise ValueError('at least one backend must be set')
def process(self, config):
super(ChaindSettings, self).process(config)
if self.include_sync:
self.process_sync(config)
self.process_sync_backend(config)
if self.include_queue:
self.process_queue_backend(config)
self.process_dispatch(config)
self.process_token(config)
self.process_backend(config)
self.process_session(config)
self.process_socket(config)
def dir_for(self, k):
return os.path.join(self.o['SESSION_DIR'], k)

View File

@@ -1,5 +1,4 @@
# standard imports # standard imports
import unittest
import hashlib import hashlib
import tempfile import tempfile
@@ -9,9 +8,6 @@ from chainlib.status import Status as TxStatus
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.error import RPCException from chainlib.error import RPCException
# local imports
from chaind.adapters.fs import ChaindFsAdapter
class MockCacheAdapter(CacheTokenTx): class MockCacheAdapter(CacheTokenTx):
@@ -33,7 +29,9 @@ class MockDispatcher:
def send(self, v): def send(self, v):
if v not in self.fails: import sys
sys.stderr.write('susu v {} {}\n'.format(v, self.fails))
if v in self.fails:
raise RPCException('{} is in fails'.format(v)) raise RPCException('{} is in fails'.format(v))
pass pass
@@ -43,12 +41,3 @@ class MockTx:
def __init__(self, tx_hash, status=TxStatus.SUCCESS): def __init__(self, tx_hash, status=TxStatus.SUCCESS):
self.hash = tx_hash self.hash = tx_hash
self.status = status self.status = status
class TestChaindFsBase(unittest.TestCase):
def setUp(self):
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.path = tempfile.mkdtemp()
self.adapter = ChaindFsAdapter(self.chain_spec, self.path, self.cache_adapter, self.dispatcher)

31
chaind/unittest/fs.py Normal file
View File

@@ -0,0 +1,31 @@
# standard imports
import unittest
import tempfile
import logging
# external imports
from chainlib.chain import ChainSpec
# local imports
from chaind.adapters.fs import ChaindFsAdapter
logging.STATETRACE = 5
logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
class TestChaindFsBase(unittest.TestCase):
def setUp(self):
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.path = tempfile.mkdtemp()
self.adapter = ChaindFsAdapter(self.chain_spec, self.path, self.cache_adapter, self.dispatcher, event_callback=self.log_state)
def log_state(self, k, from_state, to_state):
logg.log(logging.STATETRACE, 'state change {}: {} -> {}'.format(
k,
from_state,
to_state,
)
)

View File

@@ -1,6 +1,6 @@
chainlib~=0.1.0 chainlib~=0.1.1
chainqueue~=0.1.1 chainqueue~=0.1.6
chainsyncer~=0.3.5 chainsyncer~=0.4.3
confini~=0.6.0 confini~=0.6.0
funga~=0.5.2 funga~=0.5.2
pyxdg~=0.26 pyxdg~=0.26

View File

@@ -1,80 +0,0 @@
#!/usr/bin/python
import os
import argparse
import logging
# external imports
import alembic
from alembic.config import Config as AlembicConfig
import confini
import chainqueue.db
import chainsyncer.db
# local imports
from chaind import Environment
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
dbdir = os.path.join(rootdir, 'chaind', 'db')
configdir = os.path.join(rootdir, 'chaind', 'data', 'config')
default_migrations_dir = os.path.join(dbdir, 'migrations')
env = Environment(env=os.environ)
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=env.config_dir, help='config directory')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--data-dir', dest='data_dir', type=str, help='data directory')
argparser.add_argument('--migrations-dir', dest='migrations_dir', default=default_migrations_dir, type=str, help='path to alembic migrations directory')
argparser.add_argument('--reset', action='store_true', help='reset exsting database')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
# process config
logg.debug('loading config from {}'.format(args.c))
config = confini.Config(configdir, args.env_prefix, override_dirs=[args.c])
config.process()
args_override = {
'SESSION_DATA_DIR': getattr(args, 'data_dir'),
}
config.dict_override(args_override, 'cli args')
if config.get('DATABASE_ENGINE') == 'sqlite':
config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', True)
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded:\n{}'.format(config))
config.add(os.path.join(args.migrations_dir, config.get('DATABASE_ENGINE')), '_MIGRATIONS_DIR', True)
if not os.path.isdir(config.get('_MIGRATIONS_DIR')):
logg.debug('migrations dir for engine {} not found, reverting to default'.format(config.get('DATABASE_ENGINE')))
config.add(os.path.join(args.migrations_dir, 'default'), '_MIGRATIONS_DIR', True)
os.makedirs(config.get('SESSION_DATA_DIR'), exist_ok=True)
dsn = chainqueue.db.dsn_from_config(config)
def main():
logg.info('using migrations dir {}'.format(config.get('_MIGRATIONS_DIR')))
logg.info('using db {}'.format(dsn))
ac = AlembicConfig(os.path.join(config.get('_MIGRATIONS_DIR'), 'alembic.ini'))
ac.set_main_option('sqlalchemy.url', dsn)
ac.set_main_option('script_location', config.get('_MIGRATIONS_DIR'))
if args.reset:
logg.debug('reset is set, purging existing content')
alembic.command.downgrade(ac, 'base')
alembic.command.upgrade(ac, 'head')
if __name__ == '__main__':
main()

View File

@@ -1,7 +1,7 @@
[metadata] [metadata]
name = chaind name = chaind
version = 0.1.0 version = 0.2.1
description = Base package for chain queue servicek description = Base package for chain queue service
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no
url = https://gitlab.com/chaintool/chaind url = https://gitlab.com/chaintool/chaind
@@ -23,14 +23,14 @@ licence_files =
LICENSE LICENSE
[options] [options]
python_requires = >= 3.6 python_requires = >= 3.7
include_package_data = True include_package_data = True
packages = packages =
chaind chaind
# chaind.sql
# chaind.runnable # chaind.runnable
chaind.adapters chaind.adapters
chaind.unittest chaind.unittest
chaind.cli
#[options.entry_points] #[options.entry_points]
#console_scripts = #console_scripts =

View File

@@ -15,15 +15,14 @@ from chaind.filter import StateFilter
from chaind.unittest.common import ( from chaind.unittest.common import (
MockTx, MockTx,
MockCacheAdapter, MockCacheAdapter,
TestChaindFsBase, MockDispatcher,
) )
from chaind.unittest.fs import TestChaindFsBase
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger() logg = logging.getLogger()
class TestChaindFs(TestChaindFsBase): class TestChaindFs(TestChaindFsBase):
def setUp(self): def setUp(self):
@@ -43,12 +42,15 @@ class TestChaindFs(TestChaindFsBase):
self.assertEqual(data, v) self.assertEqual(data, v)
def test_fs_defer(self): def test_fs_fail(self):
data = os.urandom(128).hex() data = os.urandom(128).hex()
hsh = self.adapter.put(data) hsh = self.adapter.put(data)
self.dispatcher.add_fail(hsh) self.dispatcher.add_fail(data)
self.adapter.dispatch(hsh)
txs = self.adapter.deferred() r = self.adapter.dispatch(hsh)
self.assertFalse(r)
txs = self.adapter.failed()
self.assertEqual(len(txs), 1) self.assertEqual(len(txs), 1)
@@ -72,7 +74,7 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex() data = os.urandom(128).hex()
hsh = self.adapter.put(data) hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh) tx = MockTx(hsh)
fltr.filter(None, None, tx) fltr.filter(None, None, tx)
@@ -83,10 +85,30 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex() data = os.urandom(128).hex()
hsh = self.adapter.put(data) hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh, TxStatus.ERROR) tx = MockTx(hsh, TxStatus.ERROR)
fltr.filter(None, None, tx) fltr.filter(None, None, tx)
def test_upcoming(self):
drv = QueueDriver(self.adapter)
txs = []
for i in range(10):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
txs.append(hsh)
self.adapter.enqueue(hsh)
r = self.adapter.upcoming(limit=5)
self.assertEqual(len(r), 5)
r = self.adapter.dispatch(txs[0])
self.assertTrue(r)
r = self.adapter.upcoming(limit=5)
self.assertEqual(len(r), 4)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()