diff --git a/chaind/adapters/new.py b/chaind/adapters/new.py index 21e587c..1ebbbbb 100644 --- a/chaind/adapters/new.py +++ b/chaind/adapters/new.py @@ -1,3 +1,6 @@ +# standard imports +import logging + # external imports from chainlib.error import RPCException from chainqueue import ( @@ -11,6 +14,8 @@ from chainqueue.store.fs import ( ) from shep.store.file import SimpleFileStoreFactory +logg = logging.getLogger(__name__) + class ChaindFsAdapter: @@ -26,8 +31,8 @@ class ChaindFsAdapter: def put(self, signed_tx): cache_tx = self.deserialize(signed_tx) - self.store.put(cache_tx.tx_hash, signed_tx) - return cache_tx.tx_hash + self.store.put(cache_tx.hash, signed_tx) + return cache_tx.hash def get(self, tx_hash): @@ -47,6 +52,14 @@ class ChaindFsAdapter: return self.store.deferred() + def succeed(self, block, tx): + return self.store.final(tx.hash, block, tx, error=False) + + + def fail(self, block, tx): + return self.store.final(tx.hash, block, tx, error=True) + + def enqueue(self, tx_hash): return self.store.enqueue(tx_hash) diff --git a/chaind/driver.py b/chaind/driver.py index 6d7d7f1..cc73ac2 100644 --- a/chaind/driver.py +++ b/chaind/driver.py @@ -20,7 +20,7 @@ class QueueDriver: for i in range(c): self.adapter.enqueue(txs[i]) if self.throttler != None: - self.throttler.inc() + self.throttler.inc(txs[i].hash) return c diff --git a/chaind/filter.py b/chaind/filter.py new file mode 100644 index 0000000..2b374e3 --- /dev/null +++ b/chaind/filter.py @@ -0,0 +1,19 @@ +# external imports +from chainlib.status import Status as TxStatus + + +class StateFilter: + + def __init__(self, adapter, throttler=None): + self.adapter = adapter + self.throttler = throttler + + + def filter(self, conn, block, tx, session=None): + cache_tx = self.adapter.get(tx.hash) + if tx.status == TxStatus.SUCCESS: + self.adapter.succeed(block, tx) + else: + self.adapter.fail(block, tx) + if self.throttler != None: + self.throttler.dec(tx.hash) diff --git a/tests/test_fs.py b/tests/test_fs.py index 7340bc2..86ca251 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -10,10 +10,12 @@ import hashlib from chainlib.chain import ChainSpec from chainqueue.cache import CacheTokenTx from chainlib.error import RPCException +from chainlib.status import Status as TxStatus # local imports from chaind.adapters.new import ChaindFsAdapter from chaind.driver import QueueDriver +from chaind.filter import StateFilter logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() @@ -26,7 +28,7 @@ class MockCacheAdapter(CacheTokenTx): h = hashlib.sha256() h.update(v.encode('utf-8')) z = h.digest() - tx.tx_hash = z.hex() + tx.hash = z.hex() return tx @@ -46,6 +48,14 @@ class MockDispatcher: pass +class MockTx: + + def __init__(self, tx_hash, status=TxStatus.SUCCESS): + self.hash = tx_hash + self.status = status + + + class TestChaindFs(unittest.TestCase): def setUp(self): @@ -89,5 +99,27 @@ class TestChaindFs(unittest.TestCase): self.assertEqual(len(txs), 1) + def test_fs_filter(self): + drv = QueueDriver(self.adapter) + + data = os.urandom(128).hex() + hsh = self.adapter.put(data) + + fltr = StateFilter(self.adapter) + tx = MockTx(hsh) + fltr.filter(None, None, tx) + + + def test_fs_filter_fail(self): + drv = QueueDriver(self.adapter) + + data = os.urandom(128).hex() + hsh = self.adapter.put(data) + + fltr = StateFilter(self.adapter) + tx = MockTx(hsh, TxStatus.ERROR) + fltr.filter(None, None, tx) + + if __name__ == '__main__': unittest.main()