Compare commits

..

8 Commits

Author SHA1 Message Date
lash
da9fb5925d Upgrade shep 2022-05-05 15:11:01 +00:00
lash
cbf00281c6 Remove sync for each get 2022-05-05 14:22:21 +00:00
lash
01ad409077 Raise correct error in index store exists check 2022-05-04 18:37:02 +00:00
lash
3a8ec01588 Allow for sync skip in queue store instantiation 2022-05-04 05:44:47 +00:00
lash
b63793fd9b Add purge to chainqueue store object 2022-05-02 20:21:51 +00:00
lash
84b8eb10e6 Remove spam logline 2022-05-01 07:40:32 +00:00
lash
532ff230b4 Remove race waits (defer to client layer) 2022-05-01 06:44:33 +00:00
lash
f7c09acfe2 Add race delay 2022-05-01 06:27:52 +00:00
6 changed files with 62 additions and 34 deletions

View File

@@ -1,3 +1,27 @@
- 0.1.13
* Remove sync on each get
* Upgrade shep to guarantee atomic state lock state
- 0.1.12
* Raise correct exception from index store exists check
- 0.1.11
* Allow for sync skip in store instantiation
- 0.1.10
* Improve logging
- 0.1.9
* Upgrade deps
- 0.1.8
* Upgrade deps
- 0.1.7
* Improve logging
- 0.1.6
* Sort upcoming queue item chronologically
* Add unit testing for upcoming query method
- 0.1.5
* Add reserved state check method
- 0.1.4
* Dependency cleanups
- 0.1.3
* Add CLI args and config handling, settings object
- 0.1.2
* Add CLI inspection tools
- 0.1.1

View File

@@ -24,12 +24,6 @@ class CacheIntegrityError(ChainQueueException):
pass
class BackendIntegrityError(ChainQueueException):
"""Raised when queue backend has invalid state
"""
pass
class DuplicateTxError(ChainQueueException):
"""Backend already knows transaction
"""

View File

@@ -2,14 +2,12 @@
import re
import datetime
import logging
import time
# local imports
from chainqueue.cache import CacheTx
from chainqueue.entry import QueueEntry
from chainqueue.error import (
NotLocalTxError,
BackendIntegrityError,
)
from chainqueue.error import NotLocalTxError
from chainqueue.enum import (
StatusBits,
all_errors,
@@ -31,7 +29,7 @@ all_local_errors = all_errors() - StatusBits.NETWORK_ERROR
re_u = r'^[^_][_A-Z]+$'
class Store:
def __init__(self, chain_spec, state_store, index_store, counter, cache=None):
def __init__(self, chain_spec, state_store, index_store, counter, cache=None, sync=True):
self.chain_spec = chain_spec
self.cache = cache
self.state_store = state_store
@@ -49,19 +47,21 @@ class Store:
'unset',
'name',
'modified',
'purge',
]:
setattr(self, v, getattr(self.state_store, v))
if not sync:
return
sync_err = None
for i in range(2):
try:
self.state_store.sync()
except Exception as e:
sync_err = e
continue
try:
self.state_store.sync()
except Exception as e:
sync_err = e
if sync_err != None:
raise BackendIntegrityError(sync_err)
raise FileNotFoundError(sync_err)
def put(self, v, cache_adapter=CacheTx):
@@ -80,16 +80,14 @@ class Store:
def get(self, k):
v = None
for i in range(2):
s = self.index_store.get(k)
try:
self.state_store.sync()
v = self.state_store.get(s)
except FileNotFoundError:
continue
break
s = self.index_store.get(k)
err = None
try:
v = self.state_store.get(s)
except FileNotFoundError as e:
err = e
if v == None:
raise NotLocalTxError(k)
raise NotLocalTxError('could not find tx {}: {}'.format(k, err))
return (s, v,)
@@ -110,10 +108,12 @@ class Store:
if item_state & state != item_state:
continue
logg.info('state {} {}'.format(ref, item_state))
if item_state & not_state > 0:
continue
item_state_str = self.state_store.name(item_state)
logg.info('state {} {} ({})'.format(ref, item_state_str, item_state))
if threshold != None:
v = self.state_store.modified(ref)
if v > threshold:

View File

@@ -6,7 +6,10 @@ import logging
from leveldir.hex import HexDir
# local imports
from chainqueue.error import DuplicateTxError
from chainqueue.error import (
DuplicateTxError,
NotLocalTxError,
)
logg = logging.getLogger(__name__)
@@ -22,7 +25,7 @@ class IndexStore(HexDir):
existing = None
try:
existing = self.get(k)
except FileNotFoundError:
except NotLocalTxError:
pass
return existing != None
@@ -37,7 +40,14 @@ class IndexStore(HexDir):
def get(self, k):
fp = self.store.to_filepath(k)
f = open(fp, 'rb')
f = None
err = None
try:
f = open(fp, 'rb')
except FileNotFoundError as e:
err = e
if err != None:
raise NotLocalTxError(err)
v = f.read()
f.close()
return v.decode('utf-8')
@@ -64,7 +74,7 @@ class CounterStore:
v = f.read(8)
self.count = int.from_bytes(v, byteorder='big')
logg.info('counter starts at {}'.format(self.count))
logg.debug('counter starts at {}'.format(self.count))
f.seek(0)

View File

@@ -6,4 +6,4 @@ leveldir~=0.3.0
confini~=0.6.0
#pyxdg~=0.27
chainlib~=0.1.1
shep~=0.2.3
shep~=0.2.6

View File

@@ -1,6 +1,6 @@
[metadata]
name = chainqueue
version = 0.1.6
version = 0.1.13
description = Generic blockchain transaction queue control
author = Louis Holbrook
author_email = dev@holbrook.no