Harden query tests

This commit is contained in:
lash 2022-03-13 16:04:14 +00:00
parent 04dfb185ce
commit 485b33866b
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 52 additions and 12 deletions

View File

@ -99,4 +99,5 @@ class Store:
def fail(self, k):
entry = QueueEntry(self, k)
entry.reject()
entry.load()
entry.sendfail()

View File

@ -1,4 +1,5 @@
# standard imports
import os
import tempfile
import unittest
import logging
@ -40,24 +41,62 @@ class TestIntegrateBase(TestShepBase):
def test_integration_valid(self):
self.store.put(b'foo'.hex(), b'bar'.hex(), cache_adapter=MockCacheTokenTx)
self.store.put(os.urandom(4).hex(), os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
def test_state_enqueu(self):
hx = b'foo'.hex()
self.store.put(hx, b'bar'.hex(), cache_adapter=MockCacheTokenTx)
self.store.get(hx)
self.store.enqueue(hx)
v = self.store.upcoming()
def test_state_default(self):
hx = os.urandom(4).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
v = self.store.pending()
self.assertEqual(len(v), 1)
self.assertEqual(v[0], hx)
def test_state_defer(self):
hx = b'foo'.hex()
self.store.put(hx, b'bar'.hex(), cache_adapter=MockCacheTokenTx)
def test_state_enqueue(self):
hx = os.urandom(4).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.get(hx)
self.store.enqueue(hx)
v = self.store.upcoming()
self.assertEqual(len(v), 1)
v = self.store.pending()
self.assertEqual(len(v), 0)
def test_state_defer(self):
hx = os.urandom(4).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.fail(hx)
v = self.store.deferred()
self.assertEqual(len(v), 1)
self.assertEqual(v[0], hx)
def test_state_multiple(self):
hx = os.urandom(4).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.fail(hx)
hx = os.urandom(8).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.fail(hx)
v = self.store.deferred()
self.assertEqual(len(v), 2)
def test_state_multiple_sort(self):
hx = os.urandom(4).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.fail(hx)
hx = os.urandom(4).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.enqueue(hx)
hx = os.urandom(4).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
self.store.fail(hx)
hx = os.urandom(4).hex()
self.store.put(hx, os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
v = self.store.deferred()
self.assertEqual(len(v), 2)
if __name__ == '__main__':