Add hexdir handler, test

This commit is contained in:
nolash 2021-06-01 11:37:32 +02:00
parent 19d4e38dcf
commit 245832235d
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 172 additions and 0 deletions

104
chainqueue/fs/dir.py Normal file
View File

@ -0,0 +1,104 @@
# standard imports
import os
import stat
import logging
# external imports
from hexathon import valid as valid_hex
logg = logging.getLogger(__name__)
class HexDir:
def __init__(self, root_path, key_length, levels=2, prefix_length=0):
self.path = root_path
self.key_length = key_length
self.prefix_length = prefix_length
self.__levels = levels + 2
fi = None
try:
fi = os.stat(self.path)
self.__verify_directory()
except FileNotFoundError:
HexDir.__setup_directory(self.path)
if not stat.S_ISDIR(fi.st_mode):
raise ValueError('{} is not a directory'.format(self.path))
self.master_file = os.path.join(self.path, 'master')
@property
def levels(self):
return self.__levels - 2
def add(self, key, content, prefix=b''):
l = len(key)
if l != self.key_length:
raise ValueError('expected key length {}, got {}'.format(self.key_length, l))
l = len(prefix)
if l != self.prefix_length:
raise ValueError('expected prefix length {}, got {}'.format(self.prefix_length, l))
if not isinstance(content, bytes):
raise ValueError('content must be bytes, got {}'.format(type(content).__name__))
if prefix != None and not isinstance(prefix, bytes):
raise ValueError('prefix must be bytes, got {}'.format(type(content).__name__))
key_hex = key.hex()
entry_path = self.to_filepath(key_hex)
os.makedirs(os.path.dirname(entry_path), exist_ok=True)
f = open(entry_path, 'wb')
f.write(content)
f.close()
f = open(self.master_file, 'ab')
if prefix != None:
f.write(prefix)
f.write(key)
f.close()
logg.info('created new entry {} in {}'.format(key_hex, entry_path))
def set_prefix(self, idx, prefix):
l = len(prefix)
if l != self.prefix_length:
raise ValueError('expected prefix length {}, got {}'.format(self.prefix_length, l))
if not isinstance(prefix, bytes):
raise ValueError('prefix must be bytes, got {}'.format(type(content).__name__))
cursor = idx * (self.prefix_length + self.key_length)
f = open(self.master_file, 'rb+')
f.seek(cursor)
f.write(prefix)
f.close()
def to_subpath(self, hx):
lead = ''
for i in range(0, self.__levels, 2):
lead += hx[i:i+2] + '/'
return lead.upper()
def to_dirpath(self, hx):
sub_path = self.to_subpath(hx)
return os.path.join(self.path, sub_path)
def to_filepath(self, hx):
dir_path = self.to_dirpath(hx)
file_path = os.path.join(dir_path, hx.upper())
return file_path
def __verify_directory(self):
return True
@staticmethod
def __prepare_directory(path):
os.makedirs(path, exist_ok=True)
state_file = os.path.join(path, 'master')
f = open(state_file, 'w')
f.close()

68
tests/test_hexdir.py Normal file
View File

@ -0,0 +1,68 @@
# standard imports
import unittest
import tempfile
import shutil
import logging
import os
# local imports
from chainqueue.fs.dir import HexDir
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class HexDirTest(unittest.TestCase):
def setUp(self):
self.dir = tempfile.mkdtemp()
self.hexdir = HexDir(self.dir, 4, 3, 2)
logg.debug('setup hexdir root {}'.format(self.dir))
def tearDown(self):
shutil.rmtree(self.dir)
logg.debug('cleaned hexdir root {}'.format(self.dir))
def test_path(self):
content = b'cdef'
prefix = b'ab'
label = b'\xde\xad\xbe\xef'
self.hexdir.add(label, content, prefix=prefix)
file_path = os.path.join(self.dir, 'DE', 'AD', 'BE', label.hex().upper())
f = open(file_path, 'rb')
r = f.read()
f.close()
self.assertEqual(content, r)
f = open(self.hexdir.master_file, 'rb')
r = f.read()
f.close()
self.assertEqual(prefix + label, r)
def test_size(self):
content = b'cdef'
prefix = b'ab'
label = b'\xde\xad\xbe\xef'
with self.assertRaises(ValueError):
self.hexdir.add(label, content, prefix=b'a')
def test_edit(self):
self.hexdir.add(b'\xde\xad\xbe\xef', b'foo', b'ab')
self.hexdir.add(b'\xbe\xef\xfe\xed', b'bar', b'cd')
self.hexdir.add(b'\x01\x02\x03\x04', b'baz', b'ef')
self.hexdir.set_prefix(1, b'ff')
f = open(self.hexdir.master_file, 'rb')
f.seek(6)
r = f.read(2)
f.close()
self.assertEqual(b'ff', r)
if __name__ == '__main__':
unittest.main()