# shep/tests/test_file.py
#
# 310 lines
# 8.8 KiB
# Python

# standard imports
import unittest
import tempfile
import os
import shutil
# local imports
from shep.persist import PersistedState
from shep.store.file import SimpleFileStoreFactory
from shep.error import (
StateExists,
StateInvalid,
StateItemExists,
StateLockedKey,
)
class TestFileStore(unittest.TestCase):
    """Tests for PersistedState backed by a SimpleFileStoreFactory.

    Every test runs against a fresh temporary directory (``self.d``) and
    inspects the files created below it, one sub-directory per state name.
    """

    def setUp(self):
        """Create the temporary store and register states foo, bar and baz."""
        self.d = tempfile.mkdtemp()
        self.factory = SimpleFileStoreFactory(self.d)
        # NOTE(review): 3 is presumably the number of state bits, matching
        # the three states added below -- confirm against PersistedState.
        self.states = PersistedState(self.factory.add, 3)
        self.states.add('foo')
        self.states.add('bar')
        self.states.add('baz')

    def tearDown(self):
        """Remove the temporary store directory and everything in it."""
        shutil.rmtree(self.d)

    def _path(self, *parts):
        """Return the path made of ``parts`` joined below the temp directory."""
        return os.path.join(self.d, *parts)

    def _read(self, *parts):
        """Return the text contents of the file at ``parts``.

        Raises FileNotFoundError when the file does not exist, which makes
        the calling test fail.
        """
        with open(self._path(*parts), 'r') as f:
            return f.read()

    def _write(self, *parts, contents=''):
        """Create or truncate the file at ``parts`` and write ``contents``."""
        with open(self._path(*parts), 'w') as f:
            f.write(contents)

    def _assert_missing(self, *parts):
        """Assert that nothing exists at ``parts`` below the temp directory."""
        with self.assertRaises(FileNotFoundError):
            os.stat(self._path(*parts))

    def test_add(self):
        """put() with contents writes them to <state name>/<key>."""
        self.states.put('abcd', state=self.states.FOO, contents='baz')
        self.assertEqual(self._read('FOO', 'abcd'), 'baz')

    def test_dup(self):
        """Putting an existing key again raises StateItemExists."""
        self.states.put('abcd', state=self.states.FOO)
        with self.assertRaises(StateItemExists):
            self.states.put('abcd', state=self.states.FOO)
        # A further attempt after the first rejection is rejected as well.
        with self.assertRaises(StateItemExists):
            self.states.put('abcd', state=self.states.FOO)

    def test_list(self):
        """list() returns only the keys held in the requested state."""
        self.states.put('abcd', state=self.states.FOO)
        self.states.put('xx!', state=self.states.FOO)
        self.states.put('1234', state=self.states.BAR)

        keys = self.states.list(self.states.FOO)
        self.assertIn('abcd', keys)
        self.assertIn('xx!', keys)
        self.assertNotIn('1234', keys)

        # Keys in an aliased combination are listed separately from keys
        # in the individual states the alias is composed of.
        self.states.alias('xyzzy', self.states.BAR | self.states.FOO)
        self.states.put('yyy', state=self.states.XYZZY)

        keys = self.states.list(self.states.XYZZY)
        self.assertIn('yyy', keys)
        self.assertNotIn('1234', keys)
        self.assertNotIn('xx!', keys)

    def test_move(self):
        """move() relocates the item file to the new state's directory."""
        self.states.put('abcd', state=self.states.FOO, contents='foo')
        self.states.move('abcd', self.states.BAR)

        # Reading fails the test if the file is absent at the new location.
        self._read('BAR', 'abcd')
        self._assert_missing('FOO', 'abcd')

    def test_change(self):
        """change() moves the item file as bits are set and unset."""
        self.states.alias('inky', self.states.FOO | self.states.BAR)
        self.states.put('abcd', state=self.states.FOO, contents='foo')

        # With BAR added to FOO the item is expected under the alias name.
        self.states.change('abcd', self.states.BAR, 0)
        self._read('INKY', 'abcd')
        self._assert_missing('FOO', 'abcd')
        self._assert_missing('BAR', 'abcd')

        # With BAR removed again the item is expected back under FOO.
        self.states.change('abcd', 0, self.states.BAR)
        self._read('FOO', 'abcd')
        self._assert_missing('INKY', 'abcd')
        self._assert_missing('BAR', 'abcd')

    def test_set(self):
        """set() and unset() move the item file between state directories."""
        self.states.alias('xyzzy', self.states.FOO | self.states.BAR)
        self.states.put('abcd', state=self.states.FOO, contents='foo')

        # FOO plus BAR is expected to be stored under the alias name.
        self.states.set('abcd', self.states.BAR)
        self._read('XYZZY', 'abcd')
        self._assert_missing('FOO', 'abcd')
        self._assert_missing('BAR', 'abcd')

        # Dropping FOO is expected to leave the item under BAR only.
        self.states.unset('abcd', self.states.FOO)
        self._read('BAR', 'abcd')
        self._assert_missing('FOO', 'abcd')
        self._assert_missing('XYZZY', 'abcd')

    def test_sync_one(self):
        """sync(state) picks up files written directly into that state."""
        self.states.put('abcd', state=self.states.FOO, contents='foo')
        self.states.put('xxx', state=self.states.FOO)
        self.states.put('yyy', state=self.states.FOO)

        # Modify the directory behind the store's back: rewrite a known
        # key as an empty file and add a key the store has never seen.
        self._write('FOO', 'yyy', contents='')
        self._write('FOO', 'zzzz', contents='xyzzy')

        self.states.sync(self.states.FOO)
        self.assertIsNone(self.states.get('yyy'))
        self.assertEqual(self.states.get('zzzz'), 'xyzzy')

    def test_sync_all(self):
        """sync() without arguments picks up files from every state."""
        self.states.put('abcd', state=self.states.FOO)
        self.states.put('xxx', state=self.states.BAR)

        # Rewrite a known key on disk and add two unknown keys, one of
        # them as an empty file.
        self._write('FOO', 'abcd', contents='foofoo')
        self._write('BAR', 'zzzz', contents='barbar')
        self._write('BAR', 'yyyy')

        self.states.sync()

        # The test expects the already-known key to keep its None contents
        # even though its file was rewritten on disk.
        self.assertIsNone(self.states.get('abcd'))
        self.assertEqual(self.states.state('abcd'), self.states.FOO)
        self.assertEqual(self.states.get('zzzz'), 'barbar')
        self.assertEqual(self.states.state('zzzz'), self.states.BAR)
        self.assertIsNone(self.states.get('yyyy'))
        self.assertEqual(self.states.state('yyyy'), self.states.BAR)

    def test_path(self):
        """path() returns the state directory, or the key's path within it."""
        self.states.put('yyy', state=self.states.FOO)

        self.assertEqual(self.states.path(self.states.FOO), self._path('FOO'))
        self.assertEqual(
            self.states.path(self.states.FOO, key='BAR'),
            self._path('FOO', 'BAR'),
        )

    def test_next(self):
        """next() steps through FOO, BAR and BAZ, then raises StateInvalid."""
        self.states.put('abcd')

        self.states.next('abcd')
        self.assertEqual(self.states.state('abcd'), self.states.FOO)

        self.states.next('abcd')
        self.assertEqual(self.states.state('abcd'), self.states.BAR)

        self.states.next('abcd')
        self.assertEqual(self.states.state('abcd'), self.states.BAZ)

        with self.assertRaises(StateInvalid):
            self.states.next('abcd')

        # The failed step must leave the item in its last valid state.
        self.assertEqual(self.states.state('abcd'), self.states.BAZ)

        self._assert_missing('FOO', 'abcd')
        # os.stat raises, failing the test, if the file is not under BAZ.
        os.stat(self._path('BAZ', 'abcd'))

    def test_replace(self):
        """replace() updates the contents both in memory and on disk."""
        self.states.put('abcd')
        self.states.replace('abcd', 'foo')

        self.assertEqual(self.states.get('abcd'), 'foo')
        # An item put without an explicit state is expected under NEW.
        self.assertEqual(self._read('NEW', 'abcd'), 'foo')

    def test_factory_ls(self):
        """factory.ls() returns one entry per state that holds items."""
        self.states.put('abcd')
        self.states.put('xxxx', state=self.states.BAZ)
        self.assertEqual(len(self.factory.ls()), 2)

        # A second item in an already used state adds no new entry.
        self.states.put('yyyy', state=self.states.BAZ)
        self.assertEqual(len(self.factory.ls()), 2)

        self.states.put('zzzz', state=self.states.BAR)
        self.assertEqual(len(self.factory.ls()), 3)

    def test_lock(self):
        """A file named after a key in .lock blocks changes to that key."""
        factory = SimpleFileStoreFactory(self.d, use_lock=True)
        states = PersistedState(factory.add, 3)
        states.add('foo')
        states.add('bar')
        states.add('baz')
        states.alias('xyzzy', states.FOO | states.BAR)
        states.alias('plugh', states.FOO | states.BAR | states.BAZ)
        states.put('abcd')

        # The lock directory must exist once the store has been used;
        # os.stat raises, failing the test, if it does not.
        os.stat(self._path('.lock'))

        # While a lock file for the key exists the key cannot be added.
        self._write('.lock', 'xxxx')
        with self.assertRaises(StateLockedKey):
            states.put('xxxx')

        # With the lock file removed the same operations go through.
        os.unlink(self._path('.lock', 'xxxx'))
        states.put('xxxx')
        states.set('xxxx', states.FOO)
        states.set('xxxx', states.BAR)
        states.replace('xxxx', contents='zzzz')

        # Lock the key again: state and contents must stay as they were.
        self._write('.lock', 'xxxx')
        with self.assertRaises(StateLockedKey):
            states.set('xxxx', states.BAZ)
        self.assertEqual(states.state('xxxx'), states.XYZZY)

        with self.assertRaises(StateLockedKey):
            states.unset('xxxx', states.FOO)

        with self.assertRaises(StateLockedKey):
            states.replace('xxxx', contents='yyyy')
        self.assertEqual(states.get('xxxx'), 'zzzz')
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()