chaind/tests/test_fs.py

126 lines
2.8 KiB
Python
Raw Normal View History

2022-03-13 15:54:11 +01:00
# standard imports
import os
import tempfile
import unittest
import shutil
import logging
import hashlib
# external imports
from chainlib.chain import ChainSpec
from chainqueue.cache import CacheTokenTx
2022-03-14 20:53:29 +01:00
from chainlib.error import RPCException
2022-03-14 22:17:31 +01:00
from chainlib.status import Status as TxStatus
2022-03-13 15:54:11 +01:00
# local imports
from chaind.adapters.new import ChaindFsAdapter
2022-03-14 20:53:29 +01:00
from chaind.driver import QueueDriver
2022-03-14 22:17:31 +01:00
from chaind.filter import StateFilter
2022-03-13 15:54:11 +01:00
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class MockCacheAdapter(CacheTokenTx):
def deserialize(self, v):
tx = CacheTokenTx()
h = hashlib.sha256()
h.update(v.encode('utf-8'))
z = h.digest()
2022-03-14 22:17:31 +01:00
tx.hash = z.hex()
2022-03-13 15:54:11 +01:00
return tx
2022-03-14 20:53:29 +01:00
class MockDispatcher:
def __init__(self):
self.fails = []
def add_fail(self, v):
self.fails.append(v)
def send(self, v):
if v not in self.fails:
raise RPCException('{} is in fails'.format(v))
pass
2022-03-14 22:17:31 +01:00
class MockTx:
def __init__(self, tx_hash, status=TxStatus.SUCCESS):
self.hash = tx_hash
self.status = status
2022-03-13 15:54:11 +01:00
class TestChaindFs(unittest.TestCase):
def setUp(self):
self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
self.path = tempfile.mkdtemp()
2022-03-14 20:53:29 +01:00
self.dispatcher = MockDispatcher()
self.adapter = ChaindFsAdapter(self.chain_spec, self.path, MockCacheAdapter().deserialize, self.dispatcher)
2022-03-13 15:54:11 +01:00
def tearDown(self):
shutil.rmtree(self.path)
def test_fs_setup(self):
data = os.urandom(128).hex()
2022-03-14 20:53:29 +01:00
hsh = self.adapter.put(data)
2022-03-13 15:54:11 +01:00
v = self.adapter.get(hsh)
self.assertEqual(data, v)
2022-03-14 20:53:29 +01:00
def test_fs_defer(self):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
self.dispatcher.add_fail(hsh)
self.adapter.dispatch(hsh)
txs = self.adapter.deferred()
self.assertEqual(len(txs), 1)
def test_fs_process(self):
drv = QueueDriver(self.adapter)
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
txs = self.adapter.upcoming()
self.assertEqual(len(txs), 0)
drv.process()
txs = self.adapter.upcoming()
self.assertEqual(len(txs), 1)
2022-03-14 22:17:31 +01:00
def test_fs_filter(self):
drv = QueueDriver(self.adapter)
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter)
tx = MockTx(hsh)
fltr.filter(None, None, tx)
def test_fs_filter_fail(self):
drv = QueueDriver(self.adapter)
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter)
tx = MockTx(hsh, TxStatus.ERROR)
fltr.filter(None, None, tx)
2022-03-13 15:54:11 +01:00
if __name__ == '__main__':
unittest.main()