chaind/tests/test_fs.py

118 lines
2.7 KiB
Python
Raw Normal View History

2022-03-13 15:54:11 +01:00
# standard imports
import os
import unittest
import shutil
import logging
# external imports
2022-03-14 22:17:31 +01:00
from chainlib.status import Status as TxStatus
2022-03-13 15:54:11 +01:00
# local imports
2022-03-14 20:53:29 +01:00
from chaind.driver import QueueDriver
2022-03-14 22:17:31 +01:00
from chaind.filter import StateFilter
2022-03-13 15:54:11 +01:00
# test imports
from chaind.unittest.common import (
MockTx,
2022-05-05 17:02:21 +02:00
MockBlock,
MockCacheAdapter,
2022-04-30 07:45:02 +02:00
MockDispatcher,
)
2022-04-30 07:45:02 +02:00
from chaind.unittest.fs import TestChaindFsBase
2022-03-14 20:53:29 +01:00
# Keep a handle on the root logger and configure root logging at DEBUG level
# for the duration of the test run.
logg = logging.getLogger()
logging.basicConfig(level=logging.DEBUG)
2022-03-14 22:17:31 +01:00
class TestChaindFs(TestChaindFsBase):
    """Test cases driving ``self.adapter`` through put/get, dispatch and queue calls.

    ``self.adapter``, ``self.path`` and ``self.chain_spec`` are not assigned in
    this class; presumably they are created by ``TestChaindFsBase.setUp`` from
    the ``cache_adapter`` and ``dispatcher`` attributes set here -- confirm
    against the base class.
    """

    def setUp(self):
        """Install the mock cache adapter class and a mock dispatcher, then run the base setup."""
        # Note: the class itself is stored, not an instance.
        self.cache_adapter = MockCacheAdapter
        self.dispatcher = MockDispatcher()
        super().setUp()

    def tearDown(self):
        """Recursively delete the directory at ``self.path``."""
        shutil.rmtree(self.path)

    def test_fs_setup(self):
        """A value stored with ``put`` is returned unchanged by ``get``."""
        payload = os.urandom(128).hex()
        tx_hash = self.adapter.put(payload)
        stored = self.adapter.get(tx_hash)
        self.assertEqual(payload, stored)

    def test_fs_fail(self):
        """Dispatching a payload registered with ``add_fail`` returns a falsy result.

        Afterwards ``failed()`` is expected to list exactly one entry.
        """
        payload = os.urandom(128).hex()
        tx_hash = self.adapter.put(payload)
        # Register the payload with the mock dispatcher before dispatching it.
        self.dispatcher.add_fail(payload)
        dispatched = self.adapter.dispatch(tx_hash)
        self.assertFalse(dispatched)
        failed_txs = self.adapter.failed()
        self.assertEqual(len(failed_txs), 1)

    def test_fs_process(self):
        """``upcoming()`` is empty after ``put`` and has one entry after ``process()``."""
        driver = QueueDriver(self.adapter)
        self.adapter.put(os.urandom(128).hex())
        pending = self.adapter.upcoming()
        self.assertEqual(len(pending), 0)
        driver.process()
        pending = self.adapter.upcoming()
        self.assertEqual(len(pending), 1)

    def test_fs_filter(self):
        """Run ``StateFilter.filter`` on a mock tx with default status.

        No assertions are made; the test only checks that the call completes
        without raising.
        """
        # The driver is constructed but not used further, as in the original test.
        _driver = QueueDriver(self.adapter)
        tx_hash = self.adapter.put(os.urandom(128).hex())
        state_filter = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
        mock_tx = MockTx(tx_hash)
        mock_block = MockBlock(42)
        state_filter.filter(None, mock_block, mock_tx)

    def test_fs_filter_fail(self):
        """Run ``StateFilter.filter`` on a mock tx created with ``TxStatus.ERROR``.

        No assertions are made; the test only checks that the call completes
        without raising.
        """
        # The driver is constructed but not used further, as in the original test.
        _driver = QueueDriver(self.adapter)
        tx_hash = self.adapter.put(os.urandom(128).hex())
        state_filter = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
        mock_tx = MockTx(tx_hash, TxStatus.ERROR)
        mock_block = MockBlock(42)
        state_filter.filter(None, mock_block, mock_tx)

    def test_upcoming(self):
        """Check the sizes returned by ``upcoming(limit=5)`` before and after one dispatch.

        Ten entries are put and enqueued; ``upcoming(limit=5)`` must return 5.
        After the first entry is dispatched successfully, the same call must
        return 4.
        """
        # The driver is constructed but not used further, as in the original test.
        _driver = QueueDriver(self.adapter)

        queued = []
        for _ in range(10):
            tx_hash = self.adapter.put(os.urandom(128).hex())
            queued.append(tx_hash)
            self.adapter.enqueue(tx_hash)

        result = self.adapter.upcoming(limit=5)
        self.assertEqual(len(result), 5)

        result = self.adapter.dispatch(queued[0])
        self.assertTrue(result)

        result = self.adapter.upcoming(limit=5)
        self.assertEqual(len(result), 4)
2022-03-13 15:54:11 +01:00
# Run the test cases in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()