19 Commits

Author SHA1 Message Date
lash
465d692956 Upgrade deps 2022-05-05 15:02:21 +00:00
lash
81c1207828 Recreate adapter only per block 2022-05-05 12:09:07 +00:00
lash
f33ba13d74 Upgrade chainsyncer 2022-05-05 08:00:14 +00:00
lash
5459d4c3f8 Upgrade deps 2022-05-04 18:22:29 +00:00
lash
3e05717395 Add error line for rpc fails in dispatch 2022-05-04 06:44:40 +00:00
lash
54d10ee40b Upgrade chainqueue 2022-05-04 05:43:39 +00:00
lash
9cffdc5867 Factor out dispatch processor from chain implementation 2022-05-04 05:38:01 +00:00
lash
4f96be2024 Upgrade chainqueue 2022-05-03 17:22:59 +00:00
lash
32e1bc6aa5 Correct purge call, add missing lock module 2022-05-02 20:10:30 +00:00
lash
387014f77b Purge items from memory state on final 2022-05-02 20:05:41 +00:00
lash
c3a592c0f6 Implement shep store lock 2022-05-02 09:59:13 +00:00
lash
5d61506133 WIP whackamole race condition problems 2022-05-01 07:55:51 +00:00
lash
5102b4ac6e Fix crashes related to race condition hits 2022-05-01 07:31:18 +00:00
lash
9b98703f24 Receive race handling from chainqueue, rehabilitate tests 2022-05-01 06:58:52 +00:00
lash
4be5325df2 Renew backend every filter pass, allow race in backend 2022-04-30 18:32:10 +00:00
lash
96bdca20cc Filter out final states from upcoming in network exceptions 2022-04-30 16:44:37 +00:00
lash
14fce6f63f Upgrade chainsyncer 2022-04-30 07:51:59 +00:00
lash
9ed3bad0c4 Add upcoming throttling, tests 2022-04-30 05:45:02 +00:00
lash
e87ec0cd4c Upgrade chainqueue, chainsyncer 2022-04-29 06:26:43 +00:00
14 changed files with 399 additions and 61 deletions

View File

@@ -1,3 +1,24 @@
- 0.2.10
* Upgrade shep to guarantee state lock atomicity
- 0.2.9
* Minimize instantiations of adapters in filter execution
- 0.2.8
* Upgrade chainsyncer
- 0.2.7
* Upgrade chainlib
- 0.2.6
* Deps upgrade
- 0.2.5
* Deps upgrade
- 0.2.4
* Allow omission of state store sync in queue store backend
- 0.2.2
* Fix missing symbol crashes related to race conditions
- 0.2.1
* Receive removed race checks from chainqueue
- 0.2.0
* primitive race condition handling between fs access of sync and queue
* re-enable throttling based on in-flight transaction count
- 0.1.2
* add settings object
- 0.1.0

View File

@@ -1,10 +1,27 @@
# standard imports
import logging
import time
# external imports # external imports
from chainqueue import Store as QueueStore from chainqueue import Store as QueueStore
# local imports
from chaind.lock import StoreLock
logg = logging.getLogger(__name__)
class ChaindAdapter: class ChaindAdapter:
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0): def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, store_sync=True):
self.cache_adapter = cache_adapter self.cache_adapter = cache_adapter
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache) store_lock = StoreLock()
while True:
try:
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache, sync=store_sync)
break
except FileNotFoundError as e:
logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e))
store_lock.again()
continue

View File

@@ -1,6 +1,7 @@
# standard imports # standard imports
import logging import logging
import os import os
import time
# external imports # external imports
from chainlib.error import RPCException from chainlib.error import RPCException
@@ -11,23 +12,27 @@ from chainqueue.store.fs import (
CounterStore, CounterStore,
) )
from shep.store.file import SimpleFileStoreFactory from shep.store.file import SimpleFileStoreFactory
from shep.error import StateInvalid from shep.error import (
StateInvalid,
StateLockedKey,
)
# local imports # local imports
from .base import ChaindAdapter from .base import ChaindAdapter
from chaind.lock import StoreLock
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class ChaindFsAdapter(ChaindAdapter): class ChaindFsAdapter(ChaindAdapter):
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32): def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None, store_sync=True):
factory = SimpleFileStoreFactory(path).add factory = SimpleFileStoreFactory(path, use_lock=True).add
state_store = Status(factory) state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
index_path = os.path.join(path, 'tx') index_path = os.path.join(path, 'tx')
index_store = IndexStore(index_path, digest_bytes=digest_bytes) index_store = IndexStore(index_path, digest_bytes=digest_bytes)
counter_store = CounterStore(path) counter_store = CounterStore(path)
super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold) super(ChaindFsAdapter, self).__init__(chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=cache, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold, store_sync=store_sync)
def put(self, signed_tx): def put(self, signed_tx):
@@ -37,16 +42,37 @@ class ChaindFsAdapter(ChaindAdapter):
def get(self, tx_hash): def get(self, tx_hash):
v = None v = None
try: store_lock = StoreLock()
v = self.store.get(tx_hash) while True:
except StateInvalid as e: try:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e)) v = self.store.get(tx_hash)
return None break
except StateInvalid as e:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None
except FileNotFoundError as e:
logg.debug('queuestore get (file missing) {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('queuestore get (statelock) {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
return v[1] return v[1]
def upcoming(self): def upcoming(self, limit=0):
return self.store.upcoming() real_limit = 0
in_flight = []
if limit > 0:
in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
real_limit = limit - len(in_flight)
if real_limit <= 0:
return []
r = self.store.upcoming(limit=real_limit)
logg.info('upcoming returning {} upcoming from limit {} less {} active in-flight txs'.format(len(r), limit, len(in_flight)))
return r
def pending(self): def pending(self):
@@ -57,12 +83,30 @@ class ChaindFsAdapter(ChaindAdapter):
return self.store.deferred() return self.store.deferred()
def failed(self):
return self.store.failed()
def succeed(self, block, tx): def succeed(self, block, tx):
return self.store.final(tx.hash, block, tx, error=False) if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash)
r = self.store.final(tx.hash, block, tx, error=False)
(k, v) = self.store.get(tx.hash)
self.store.purge(k)
return r
def fail(self, block, tx): def fail(self, block, tx):
return self.store.final(tx.hash, block, tx, error=True) if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash)
r = self.store.final(tx.hash, block, tx, error=True)
(k, v) = self.store.get(tx.hash)
self.store.purge(k)
return r
def sendfail(self):
return self.store.fail(tx.hash)
def enqueue(self, tx_hash): def enqueue(self, tx_hash):
@@ -70,15 +114,44 @@ class ChaindFsAdapter(ChaindAdapter):
def dispatch(self, tx_hash): def dispatch(self, tx_hash):
entry = self.store.send_start(tx_hash) entry = None
store_lock = StoreLock()
while True:
try:
entry = self.store.send_start(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue
tx_wire = entry.serialize() tx_wire = entry.serialize()
r = None r = None
try: try:
r = self.dispatcher.send(tx_wire) r = self.dispatcher.send(tx_wire)
except RPCException: except RPCException as e:
logg.error('dispatch send failed for {}: {}'.format(tx_hash, e))
self.store.fail(tx_hash) self.store.fail(tx_hash)
return False return False
self.store.send_end(tx_hash) store_lock = StoreLock()
while True:
try:
self.store.send_end(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
return True return True

View File

@@ -0,0 +1,2 @@
[token]
module =

33
chaind/dispatch.py Normal file
View File

@@ -0,0 +1,33 @@
# standard imports
import logging
# local imports
from chaind.adapters.fs import ChaindFsAdapter
from chaind.eth.cache import EthCacheTx
logg = logging.getLogger(__name__)
class DispatchProcessor:
    """Drives dispatch of queued transactions through a filesystem queue adapter.

    A fresh ChaindFsAdapter is created on every process() call so that
    queue state on disk is re-read each round instead of being cached
    across rounds (mirrors the per-block adapter renewal done elsewhere).
    """

    def __init__(self, chain_spec, queue_dir, dispatcher):
        self.dispatcher = dispatcher
        # BUGFIX: the original assignment had a stray trailing comma
        # ("chain_spec,"), which silently stored a one-element tuple and
        # corrupted every downstream use of the chain spec.
        self.chain_spec = chain_spec
        self.queue_dir = queue_dir

    def process(self, rpc, limit=50):
        """Dispatch up to ``limit`` upcoming transactions.

        :param rpc: unused here; kept for caller interface compatibility
        :param limit: cap on in-flight transactions passed to upcoming()
        :return: number of transactions successfully dispatched
        """
        adapter = ChaindFsAdapter(
            self.chain_spec,
            self.queue_dir,
            EthCacheTx,
            self.dispatcher,
        )
        upcoming = adapter.upcoming(limit=limit)
        logg.info('processor has {} candidates for {}, processing with limit {}'.format(len(upcoming), self.chain_spec, limit))
        i = 0
        for tx_hash in upcoming:
            # count only transactions the adapter actually sent
            if adapter.dispatch(tx_hash):
                i += 1
        return i

View File

@@ -16,3 +16,11 @@ class ClientBlockError(BlockingIOError):
class ClientInputError(ValueError): class ClientInputError(ValueError):
pass pass
class QueueLockError(Exception):
pass
class BackendError(Exception):
pass

View File

@@ -1,31 +1,123 @@
# standard imports # standard imports
import logging import logging
import time
# external imports # external imports
from chainlib.status import Status as TxStatus from chainlib.status import Status as TxStatus
from chainsyncer.filter import SyncFilter from chainsyncer.filter import SyncFilter
from chainqueue.error import NotLocalTxError from chainqueue.error import NotLocalTxError
from chaind.adapters.fs import ChaindFsAdapter
from shep.error import StateLockedKey
# local imports
from .error import (
QueueLockError,
BackendError,
)
from chaind.lock import StoreLock
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class StateFilter(SyncFilter): class StateFilter(SyncFilter):
def __init__(self, adapter, throttler=None): def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
self.adapter = adapter self.chain_spec = chain_spec
self.adapter_path = adapter_path
self.tx_adapter = tx_adapter
self.throttler = throttler self.throttler = throttler
self.last_block_height = 0
self.adapter = None
self.store_lock = None
def __get_adapter(self, block, force_reload=False):
if self.store_lock == None:
self.store_lock = StoreLock()
reload = False
if block.number != self.last_block_height:
reload = True
elif self.adapter == None:
reload = True
elif force_reload:
reload = True
self.last_block_height = block.number
if reload:
while True:
logg.info('reloading adapter')
try:
self.adapter = ChaindFsAdapter(
self.chain_spec,
self.adapter_path,
self.tx_adapter,
None,
)
break
except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
self.store_lock.again()
continue
return self.adapter
def filter(self, conn, block, tx, session=None): def filter(self, conn, block, tx, session=None):
try: cache_tx = None
cache_tx = self.adapter.get(tx.hash) queue_adapter = self.__get_adapter(block)
except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash)) self.store_lock.reset()
return False
while True:
try:
cache_tx = queue_adapter.get(tx.hash)
break
except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash))
self.__stop_adapter()
return False
except BackendError as e:
logg.error('adapter get failed: {}, one more try'.format(e))
self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue
if cache_tx == None:
raise NotLocalTxError(tx.hash)
self.store_lock.reset()
queue_lock = StoreLock(error=QueueLockError)
while True:
try:
if tx.status == TxStatus.SUCCESS:
queue_adapter.succeed(block, tx)
else:
queue_adapter.fail(block, tx)
break
except QueueLockError as e:
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
queue_lock.again()
except FileNotFoundError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue
except NotLocalTxError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue
except StateLockedKey as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue
logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block))
if tx.status == TxStatus.SUCCESS:
self.adapter.succeed(block, tx)
else:
self.adapter.fail(block, tx)
if self.throttler != None: if self.throttler != None:
self.throttler.dec(tx.hash) self.throttler.dec(tx.hash)

34
chaind/lock.py Normal file
View File

@@ -0,0 +1,34 @@
# standard imports
import time
# local imports
from .error import BackendError
BASE_DELAY = 0.01
BASE_DELAY_LIMIT = 10.0
class StoreLock:
    """Exponential-backoff retry helper for transient store failures.

    Each call to again() sleeps for the current delay and doubles it.
    Once the delay has grown past delay_limit, again() raises the
    configured error instead of sleeping, terminating the retry loop.
    """

    def __init__(self, delay=BASE_DELAY, delay_limit=BASE_DELAY_LIMIT, error=BackendError, description=None):
        # keep the initial delay so reset() can restore it
        self.base_delay = delay
        self.delay = delay
        self.delay_limit = delay_limit
        self.error = error
        self.description = description

    def again(self, e=None):
        """Back off once more, or give up by raising self.error.

        :param e: optional causing exception; its text is used in the
            raised error, falling back to the static description
        :raises: self.error once the accumulated delay exceeds delay_limit
        """
        if self.delay > self.delay_limit:
            # idiom fix: identity comparison with None ("is not") instead
            # of "!=", folded into a conditional expression
            err = str(e) if e is not None else self.description
            raise self.error(err)
        time.sleep(self.delay)
        self.delay *= 2

    def reset(self):
        """Restore the backoff delay to its initial value."""
        self.delay = self.base_delay

View File

@@ -8,19 +8,21 @@ import stat
from hexathon import strip_0x from hexathon import strip_0x
# local imports # local imports
from chaind.error import ( from .error import (
NothingToDoError, NothingToDoError,
ClientGoneError, ClientGoneError,
ClientBlockError, ClientBlockError,
ClientInputError, ClientInputError,
) )
from .lock import StoreLock
from .error import BackendError
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class SessionController: class SessionController:
def __init__(self, config, adapter, processor): def __init__(self, config, processor):
self.dead = False self.dead = False
os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True) os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True)
try: try:
@@ -35,7 +37,6 @@ class SessionController:
self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY'))) self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY')))
self.processor = processor self.processor = processor
self.chain_spec = config.get('CHAIN_SPEC') self.chain_spec = config.get('CHAIN_SPEC')
self.adapter = adapter
def shutdown(self, signo, frame): def shutdown(self, signo, frame):
@@ -59,7 +60,16 @@ class SessionController:
def process(self, conn): def process(self, conn):
r = self.processor(self.chain_spec, self.adapter, conn) state_lock = StoreLock()
r = None
while True:
try:
r = self.processor(conn)
break
except BackendError as e:
state_lock.again(e)
continue
if r > 0: if r > 0:
self.srv.settimeout(0.1) self.srv.settimeout(0.1)
else: else:
@@ -104,7 +114,6 @@ class SessionController:
logg.error('invalid input "{}"'.format(data_in_str)) logg.error('invalid input "{}"'.format(data_in_str))
raise ClientInputError() raise ClientInputError()
logg.info('recv {} bytes'.format(len(data)))
return (srvs, data,) return (srvs, data,)

View File

@@ -1,5 +1,4 @@
# standard imports # standard imports
import unittest
import hashlib import hashlib
import tempfile import tempfile
@@ -9,9 +8,6 @@ from chainlib.status import Status as TxStatus
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.error import RPCException from chainlib.error import RPCException
# local imports
from chaind.adapters.fs import ChaindFsAdapter
class MockCacheAdapter(CacheTokenTx): class MockCacheAdapter(CacheTokenTx):
@@ -33,7 +29,7 @@ class MockDispatcher:
def send(self, v): def send(self, v):
if v not in self.fails: if v in self.fails:
raise RPCException('{} is in fails'.format(v)) raise RPCException('{} is in fails'.format(v))
pass pass
@@ -45,10 +41,7 @@ class MockTx:
self.status = status self.status = status
class TestChaindFsBase(unittest.TestCase): class MockBlock:
def setUp(self):
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.path = tempfile.mkdtemp()
self.adapter = ChaindFsAdapter(self.chain_spec, self.path, self.cache_adapter, self.dispatcher)
def __init__(self, number):
self.number = number

31
chaind/unittest/fs.py Normal file
View File

@@ -0,0 +1,31 @@
# standard imports
import unittest
import tempfile
import logging
# external imports
from chainlib.chain import ChainSpec
# local imports
from chaind.adapters.fs import ChaindFsAdapter
logging.STATETRACE = 5
logg = logging.getLogger(__name__)
logg.setLevel(logging.STATETRACE)
class TestChaindFsBase(unittest.TestCase):
    """Shared fixture for filesystem queue adapter tests.

    Subclasses are expected to set self.cache_adapter and
    self.dispatcher before this setUp runs.
    """

    def setUp(self):
        self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
        self.path = tempfile.mkdtemp()
        self.adapter = ChaindFsAdapter(
                self.chain_spec,
                self.path,
                self.cache_adapter,
                self.dispatcher,
                event_callback=self.log_state,
                )

    def log_state(self, k, from_state, to_state):
        # emit every queue state transition at the custom STATETRACE level
        msg = 'state change {}: {} -> {}'.format(k, from_state, to_state)
        logg.log(logging.STATETRACE, msg)

View File

@@ -1,6 +1,6 @@
chainlib~=0.1.1 chainlib~=0.1.2
chainqueue~=0.1.2 chainqueue~=0.1.13
chainsyncer~=0.4.0 chainsyncer~=0.4.5
confini~=0.6.0 confini~=0.6.0
funga~=0.5.2 funga~=0.5.2
pyxdg~=0.26 pyxdg~=0.26

View File

@@ -1,6 +1,6 @@
[metadata] [metadata]
name = chaind name = chaind
version = 0.1.2 version = 0.2.10
description = Base package for chain queue service description = Base package for chain queue service
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

View File

@@ -14,16 +14,16 @@ from chaind.filter import StateFilter
# test imports # test imports
from chaind.unittest.common import ( from chaind.unittest.common import (
MockTx, MockTx,
MockBlock,
MockCacheAdapter, MockCacheAdapter,
TestChaindFsBase, MockDispatcher,
) )
from chaind.unittest.fs import TestChaindFsBase
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger() logg = logging.getLogger()
class TestChaindFs(TestChaindFsBase): class TestChaindFs(TestChaindFsBase):
def setUp(self): def setUp(self):
@@ -43,12 +43,15 @@ class TestChaindFs(TestChaindFsBase):
self.assertEqual(data, v) self.assertEqual(data, v)
def test_fs_defer(self): def test_fs_fail(self):
data = os.urandom(128).hex() data = os.urandom(128).hex()
hsh = self.adapter.put(data) hsh = self.adapter.put(data)
self.dispatcher.add_fail(hsh) self.dispatcher.add_fail(data)
self.adapter.dispatch(hsh)
txs = self.adapter.deferred() r = self.adapter.dispatch(hsh)
self.assertFalse(r)
txs = self.adapter.failed()
self.assertEqual(len(txs), 1) self.assertEqual(len(txs), 1)
@@ -72,9 +75,10 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex() data = os.urandom(128).hex()
hsh = self.adapter.put(data) hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh) tx = MockTx(hsh)
fltr.filter(None, None, tx) block = MockBlock(42)
fltr.filter(None, block, tx)
def test_fs_filter_fail(self): def test_fs_filter_fail(self):
@@ -83,9 +87,30 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex() data = os.urandom(128).hex()
hsh = self.adapter.put(data) hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh, TxStatus.ERROR) tx = MockTx(hsh, TxStatus.ERROR)
fltr.filter(None, None, tx) block = MockBlock(42)
fltr.filter(None, block, tx)
def test_upcoming(self):
drv = QueueDriver(self.adapter)
txs = []
for i in range(10):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
txs.append(hsh)
self.adapter.enqueue(hsh)
r = self.adapter.upcoming(limit=5)
self.assertEqual(len(r), 5)
r = self.adapter.dispatch(txs[0])
self.assertTrue(r)
r = self.adapter.upcoming(limit=5)
self.assertEqual(len(r), 4)
if __name__ == '__main__': if __name__ == '__main__':