Allow backend objects to move between sync and get
This commit is contained in:
parent
b8c2b1b86a
commit
04d9901f0d
@ -8,6 +8,7 @@ from chainqueue.cache import CacheTx
|
|||||||
from chainqueue.entry import QueueEntry
|
from chainqueue.entry import QueueEntry
|
||||||
from chainqueue.error import (
|
from chainqueue.error import (
|
||||||
NotLocalTxError,
|
NotLocalTxError,
|
||||||
|
BackendIntegrityError,
|
||||||
)
|
)
|
||||||
from chainqueue.enum import (
|
from chainqueue.enum import (
|
||||||
StatusBits,
|
StatusBits,
|
||||||
@ -50,7 +51,17 @@ class Store:
|
|||||||
'modified',
|
'modified',
|
||||||
]:
|
]:
|
||||||
setattr(self, v, getattr(self.state_store, v))
|
setattr(self, v, getattr(self.state_store, v))
|
||||||
|
|
||||||
|
sync_err = None
|
||||||
|
for i in range(2):
|
||||||
|
try:
|
||||||
self.state_store.sync()
|
self.state_store.sync()
|
||||||
|
except Exception as e:
|
||||||
|
sync_err = e
|
||||||
|
continue
|
||||||
|
|
||||||
|
if sync_err != None:
|
||||||
|
raise BackendIntegrityError(sync_err)
|
||||||
|
|
||||||
|
|
||||||
def put(self, v, cache_adapter=CacheTx):
|
def put(self, v, cache_adapter=CacheTx):
|
||||||
@ -68,12 +79,17 @@ class Store:
|
|||||||
|
|
||||||
|
|
||||||
def get(self, k):
|
def get(self, k):
|
||||||
try:
|
v = None
|
||||||
|
for i in range(2):
|
||||||
s = self.index_store.get(k)
|
s = self.index_store.get(k)
|
||||||
except FileNotFoundError:
|
try:
|
||||||
raise NotLocalTxError(k)
|
|
||||||
self.state_store.sync()
|
self.state_store.sync()
|
||||||
v = self.state_store.get(s)
|
v = self.state_store.get(s)
|
||||||
|
except FileNotFoundError:
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
if v == None:
|
||||||
|
raise NotLocalTxError(k)
|
||||||
return (s, v,)
|
return (s, v,)
|
||||||
|
|
||||||
|
|
||||||
@ -184,3 +200,7 @@ class Store:
|
|||||||
entry = QueueEntry(self, k)
|
entry = QueueEntry(self, k)
|
||||||
entry.load()
|
entry.load()
|
||||||
return entry.test(self.RESERVED)
|
return entry.test(self.RESERVED)
|
||||||
|
|
||||||
|
|
||||||
|
def sync(self):
|
||||||
|
self.state_store.sync()
|
||||||
|
Loading…
Reference in New Issue
Block a user