Add change tx state, move queue state

This commit is contained in:
nolash 2021-06-01 13:26:09 +02:00
parent 565a58252a
commit bdd678fb3f
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 83 additions and 3 deletions

View File

@ -2,6 +2,7 @@
import stat
import logging
import os
import stat
# local imports
from chainqueue.enum import (
@ -19,7 +20,7 @@ class FsQueueBackend:
raise NotImplementedError()
def get_index(self, idx):
def get(self, idx):
raise NotImplementedError()
@ -58,11 +59,44 @@ class FsQueue:
f.close()
ptr_path = os.path.join(self.path_state['new'], key_hex)
os.link(entry_path, ptr_path)
os.symlink(entry_path, ptr_path)
logg.debug('added new queue entry {} -> {} index {}'.format(ptr_path, entry_path, c))
def __get_backend_idx(self, key):
entry_path = os.path.join(self.index_path, key.hex())
f = open(entry_path, 'rb')
b = f.read()
f.close()
return int.from_bytes(b, byteorder='big')
def get(self, key):
idx = self.__get_backend_idx(key)
return self.backend.get(idx)
def move(self, key, queuestate_from, queuestate_to):
key_hex = key.hex()
cur_path = os.path.join(self.path_state[queuestate_from], key_hex)
fi = os.lstat(cur_path)
if not stat.S_ISLNK(fi.st_mode):
logg.error('no such entry {}'.format(cur_path))
raise FileNotFoundError(key_hex)
new_path = os.path.join(self.path_state[queuestate_to], key_hex)
os.rename(cur_path, new_path)
def set(self, key, status):
idx = self.__get_backend_idx(key)
prefix = status_bytes(status)
self.backend.set_prefix(idx, prefix)
logg.debug('set queue state {} to {}'.format(key.hex(), status))
@staticmethod
def __state_dirs(path):
r = []

View File

@ -72,19 +72,32 @@ class HexDir:
return r
def __cursor(self, idx):
return idx * (self.prefix_length + self.key_length)
def set_prefix(self, idx, prefix):
l = len(prefix)
if l != self.prefix_length:
raise ValueError('expected prefix length {}, got {}'.format(self.prefix_length, l))
if not isinstance(prefix, bytes):
raise ValueError('prefix must be bytes, got {}'.format(type(content).__name__))
cursor = idx * (self.prefix_length + self.key_length)
cursor = self.__cursor(idx)
f = open(self.master_file, 'rb+')
f.seek(cursor)
f.write(prefix)
f.close()
def get(self, idx):
cursor = self.__cursor(idx)
f = open(self.master_file, 'rb')
f.seek(cursor)
prefix = f.read(self.prefix_length)
key = f.read(self.key_length)
f.close()
return (prefix, key)
def to_subpath(self, hx):
lead = ''
for i in range(0, self.__levels, 2):

View File

@ -8,6 +8,7 @@ import os
# local imports
from chainqueue.fs.cache import FsQueue
from chainqueue.fs.dir import HexDir
from chainqueue.enum import StatusBits
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
@ -37,6 +38,29 @@ class HexDirTest(unittest.TestCase):
f.close()
self.assertEqual(r, b'\x00' * 8)
def test_change(self):
tx_hash = os.urandom(32)
tx_content = os.urandom(128)
self.q.add(tx_hash, tx_content)
self.q.set(tx_hash, StatusBits.QUEUED)
(tx_status, tx_content_retrieved) = self.q.get(tx_hash)
status = int.from_bytes(tx_status, byteorder='big')
self.assertEqual(status & StatusBits.QUEUED, StatusBits.QUEUED)
def test_move(self):
tx_hash = os.urandom(32)
tx_content = os.urandom(128)
self.q.add(tx_hash, tx_content)
self.q.move(tx_hash, 'new', 'ready')
f = open(os.path.join(self.q.path_state['ready'], tx_hash.hex()), 'rb')
r = f.read()
f.close()
self.assertEqual(r, b'\x00' * 8)
if __name__ == '__main__':
unittest.main()

View File

@ -71,5 +71,14 @@ class HexDirTest(unittest.TestCase):
self.assertEqual(b'ff', r)
def test_get(self):
self.hexdir.add(b'\xde\xad\xbe\xef', b'foo', b'ab')
self.hexdir.add(b'\xbe\xef\xfe\xed', b'bar', b'cd')
self.hexdir.add(b'\x01\x02\x03\x04', b'baz', b'ef')
(prefix, key) = self.hexdir.get(1)
self.assertEqual(b'\xbe\xef\xfe\xed', key)
self.assertEqual(b'cd', prefix)
if __name__ == '__main__':
unittest.main()