Add purge queue entry method

This commit is contained in:
nolash 2021-06-01 13:47:55 +02:00
parent bdd678fb3f
commit 3e487d0702
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 33 additions and 4 deletions

View File

@ -72,22 +72,35 @@ class FsQueue:
return int.from_bytes(b, byteorder='big') return int.from_bytes(b, byteorder='big')
def get(self, key): def get(self, key):
idx = self.__get_backend_idx(key) idx = self.__get_backend_idx(key)
return self.backend.get(idx) return self.backend.get(idx)
def move(self, key, queuestate_from, queuestate_to): def move(self, key, to_state, from_state=None):
key_hex = key.hex() key_hex = key.hex()
cur_path = os.path.join(self.path_state[queuestate_from], key_hex) cur_path = os.path.join(self.path_state[from_state], key_hex)
fi = os.lstat(cur_path) fi = os.lstat(cur_path)
if not stat.S_ISLNK(fi.st_mode): if not stat.S_ISLNK(fi.st_mode):
logg.error('no such entry {}'.format(cur_path)) logg.error('no such entry {}'.format(cur_path))
raise FileNotFoundError(key_hex) raise FileNotFoundError(key_hex)
new_path = os.path.join(self.path_state[queuestate_to], key_hex) new_path = os.path.join(self.path_state[to_state], key_hex)
os.rename(cur_path, new_path) os.rename(cur_path, new_path)
def purge(self, key, queuestate):
key_hex = key.hex()
cur_path = os.path.join(self.path_state[queuestate], key_hex)
active_path = os.path.join(self.index_path, key_hex)
try:
fi = os.stat(cur_path)
os.unlink(active_path)
except FileNotFoundError:
os.unlink(cur_path)
logg.debug('purge queue entry {}'.format(key_hex))
def set(self, key, status): def set(self, key, status):
idx = self.__get_backend_idx(key) idx = self.__get_backend_idx(key)

View File

@ -54,7 +54,7 @@ class HexDirTest(unittest.TestCase):
tx_hash = os.urandom(32) tx_hash = os.urandom(32)
tx_content = os.urandom(128) tx_content = os.urandom(128)
self.q.add(tx_hash, tx_content) self.q.add(tx_hash, tx_content)
self.q.move(tx_hash, 'new', 'ready') self.q.move(tx_hash, 'ready', from_state='new')
f = open(os.path.join(self.q.path_state['ready'], tx_hash.hex()), 'rb') f = open(os.path.join(self.q.path_state['ready'], tx_hash.hex()), 'rb')
r = f.read() r = f.read()
@ -62,5 +62,21 @@ class HexDirTest(unittest.TestCase):
self.assertEqual(r, b'\x00' * 8) self.assertEqual(r, b'\x00' * 8)
def test_purge(self):
tx_hash = os.urandom(32)
tx_content = os.urandom(128)
self.q.add(tx_hash, tx_content)
self.q.move(tx_hash, 'ready', from_state='new')
self.q.purge(tx_hash, 'ready')
with self.assertRaises(FileNotFoundError):
entry_path = os.path.join(self.q.path_state['ready'], tx_hash.hex())
os.stat(entry_path)
with self.assertRaises(FileNotFoundError):
entry_path = os.path.join(self.q.index_path, tx_hash.hex())
os.stat(entry_path)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()