Add change method

This commit is contained in:
lash 2022-03-11 12:01:56 +00:00
parent 9ad005ae42
commit 10fdb77c94
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 112 additions and 5 deletions

View File

@ -1,5 +1,6 @@
- 0.1.1
* Add optional, pluggable verifier to protect state transition
* Add change method for atomic simultaneous set and unset
- 0.1.0
* Release version bump
- 0.0.19:

View File

@ -14,8 +14,8 @@ class PersistedState(State):
:type logger: object
"""
def __init__(self, factory, bits, logger=None):
super(PersistedState, self).__init__(bits, logger=logger)
def __init__(self, factory, bits, logger=None, verifier=None):
super(PersistedState, self).__init__(bits, logger=logger, verifier=verifier)
self.__store_factory = factory
self.__stores = {}
@ -78,6 +78,26 @@ class PersistedState(State):
return to_state
def change(self, key, bits_set, bits_unset):
"""Persist a new state for a key or key/content.
See shep.state.State.unset
"""
from_state = self.state(key)
k_from = self.name(from_state)
to_state = super(PersistedState, self).change(key, bits_set, bits_unset)
k_to = self.name(to_state)
self.__ensure_store(k_to)
contents = self.__stores[k_from].get(key)
self.__stores[k_to].add(key, contents)
self.__stores[k_from].remove(key)
return to_state
def move(self, key, to_state):
"""Persist a new state for a key or key/content.

View File

@ -332,7 +332,7 @@ class State:
if self.verifier != None:
r = self.verifier(self, from_state, to_state)
if r != None:
raise StateTransitionInvalid('{} -> {}: {}'.format(from_state, to_state, r))
raise StateTransitionInvalid(r)
self.__add_state_list(to_state, key)
current_state_list.pop(idx)
@ -367,7 +367,7 @@ class State:
return self.__move(key, current_state, to_state)
def unset(self, key, not_state):
"""Unset a single bit, moving to a pure or alias state.
@ -404,6 +404,28 @@ class State:
return self.__move(key, current_state, to_state)
def change(self, key, sets, unsets):
current_state = self.__keys_reverse.get(key)
if current_state == None:
raise StateItemNotFound(key)
to_state = current_state | sets
to_state &= ~unsets & self.__limit
if sets == 0:
to_state = current_state & (~unsets)
if to_state == current_state:
raise ValueError('invalid change by unsets for state {}: {}'.format(key, unsets))
if to_state == getattr(self, self.base_state_name):
raise ValueError('State {} for {} cannot be reverted to {}'.format(current_state, key, self.base_state_name))
new_state = self.__reverse.get(to_state)
if new_state == None:
raise StateInvalid('resulting to state is unknown: {}'.format(to_state))
return self.__move(key, current_state, to_state)
def state(self, key):
"""Return the current numeric state for the given content key.

View File

@ -73,7 +73,41 @@ class TestStateReport(unittest.TestCase):
with self.assertRaises(FileNotFoundError):
os.stat(fp)
def test_change(self):
self.states.alias('inky', self.states.FOO | self.states.BAR)
self.states.put('abcd', state=self.states.FOO, contents='foo')
self.states.change('abcd', self.states.BAR, 0)
fp = os.path.join(self.d, 'INKY', 'abcd')
f = open(fp, 'r')
v = f.read()
f.close()
fp = os.path.join(self.d, 'FOO', 'abcd')
with self.assertRaises(FileNotFoundError):
os.stat(fp)
fp = os.path.join(self.d, 'BAR', 'abcd')
with self.assertRaises(FileNotFoundError):
os.stat(fp)
self.states.change('abcd', 0, self.states.BAR)
fp = os.path.join(self.d, 'FOO', 'abcd')
f = open(fp, 'r')
v = f.read()
f.close()
fp = os.path.join(self.d, 'INKY', 'abcd')
with self.assertRaises(FileNotFoundError):
os.stat(fp)
fp = os.path.join(self.d, 'BAR', 'abcd')
with self.assertRaises(FileNotFoundError):
os.stat(fp)
def test_set(self):
self.states.alias('xyzzy', self.states.FOO | self.states.BAR)
self.states.put('abcd', state=self.states.FOO, contents='foo')

View File

@ -106,5 +106,35 @@ class TestState(unittest.TestCase):
self.assertEqual(states.from_name('foo'), states.FOO)
def test_change(self):
states = State(3)
states.add('foo')
states.add('bar')
states.add('baz')
states.alias('inky', states.FOO | states.BAR)
states.alias('pinky', states.FOO | states.BAZ)
states.put('abcd')
states.next('abcd')
states.set('abcd', states.BAR)
states.change('abcd', states.BAZ, states.BAR)
self.assertEqual(states.state('abcd'), states.PINKY)
def test_change_onezero(self):
states = State(3)
states.add('foo')
states.add('bar')
states.add('baz')
states.alias('inky', states.FOO | states.BAR)
states.alias('pinky', states.FOO | states.BAZ)
states.put('abcd')
states.next('abcd')
states.change('abcd', states.BAR, 0)
self.assertEqual(states.state('abcd'), states.INKY)
states.change('abcd', 0, states.BAR)
self.assertEqual(states.state('abcd'), states.FOO)
if __name__ == '__main__':
unittest.main()