Add db to enable stale cache record metdata #105

Open
lash wants to merge 8 commits from lash/stalecache into master
3 changed files with 105 additions and 10 deletions
Showing only changes of commit 9b3ed0d6ae - Show all commits

View File

@ -26,6 +26,10 @@ func NewSubPrefixDb(store db.Db, pfx []byte) *SubPrefixDb {
} }
} }
func(s *SubPrefixDb) SetSession(sessionId string) {
s.store.SetSession(sessionId)
}
func(s *SubPrefixDb) toKey(k []byte) []byte { func(s *SubPrefixDb) toKey(k []byte) []byte {
return append(s.pfx, k...) return append(s.pfx, k...)
} }

View File

@ -1,6 +1,7 @@
package storage package storage
import ( import (
"bytes"
"context" "context"
"time" "time"
"encoding/binary" "encoding/binary"
@ -13,9 +14,10 @@ type TimedDb struct {
tdb *SubPrefixDb tdb *SubPrefixDb
ttl time.Duration ttl time.Duration
parentPfx uint8 parentPfx uint8
parentSession []byte
matchPfx map[uint8][][]byte
} }
func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb { func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb {
var b [2]byte var b [2]byte
binary.BigEndian.PutUint16(b[:], SUBPREFIX_TIME) binary.BigEndian.PutUint16(b[:], SUBPREFIX_TIME)
@ -27,11 +29,43 @@ func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb {
} }
} }
func(tib *TimedDb) WithMatch(pfx uint8, keyPart []byte) *TimedDb {
if tib.matchPfx == nil {
tib.matchPfx = make(map[uint8][][]byte)
}
tib.matchPfx[pfx] = append(tib.matchPfx[pfx], keyPart)
return tib
}
func(tib *TimedDb) checkPrefix(pfx uint8, key []byte) bool {
var v []byte
if tib.matchPfx == nil {
return true
}
for _, v = range(tib.matchPfx[pfx]) {
l := len(v)
k := append(tib.parentSession, key...)
if l > len(k) {
continue
}
logg.Debugf("check the prefix", "v", v, "k", k, "l", l )
if bytes.Equal(v, k[:l]) {
return true
}
}
return false
}
func(tib *TimedDb) SetPrefix(pfx uint8) { func(tib *TimedDb) SetPrefix(pfx uint8) {
tib.Db.SetPrefix(pfx) tib.Db.SetPrefix(pfx)
tib.parentPfx = pfx tib.parentPfx = pfx
} }
func(tib *TimedDb) SetSession(session string) {
tib.Db.SetSession(session)
tib.parentSession = []byte(session)
}
func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error { func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error {
t := time.Now() t := time.Now()
b, err := t.MarshalBinary() b, err := t.MarshalBinary()
@ -42,14 +76,19 @@ func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error {
if err != nil { if err != nil {
return err return err
} }
k := append([]byte{tib.parentPfx}, key...)
defer func() { defer func() {
tib.parentPfx = 0 tib.parentPfx = 0
tib.parentSession = nil
}() }()
if tib.checkPrefix(tib.parentPfx, key) {
tib.tdb.SetSession("")
k := db.ToSessionKey(tib.parentPfx, []byte(tib.parentSession), key)
k = append([]byte{tib.parentPfx}, k...)
err = tib.tdb.Put(ctx, k, b) err = tib.tdb.Put(ctx, k, b)
if err != nil { if err != nil {
logg.ErrorCtxf(ctx, "failed to update timestamp of record", err) logg.ErrorCtxf(ctx, "failed to update timestamp of record", err)
} }
}
return nil return nil
} }
@ -58,11 +97,13 @@ func(tib *TimedDb) Get(ctx context.Context, key []byte) ([]byte, error) {
return v, err return v, err
} }
func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, key []byte) bool { func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, sessionId string, key []byte) bool {
b := append([]byte{pfx}, key...) tib.tdb.SetSession("")
b := db.ToSessionKey(pfx, []byte(sessionId), key)
b = append([]byte{pfx}, b...)
v, err := tib.tdb.Get(ctx, b) v, err := tib.tdb.Get(ctx, b)
if err != nil { if err != nil {
logg.ErrorCtxf(ctx, "no time entry", "key", key) logg.ErrorCtxf(ctx, "no time entry", "key", key, "b", b)
return false return false
} }
t_now := time.Now() t_now := time.Now()

View File

@ -25,11 +25,61 @@ func TestStaleDb(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if tdb.Stale(ctx, db.DATATYPE_USERDATA, k) { if tdb.Stale(ctx, db.DATATYPE_USERDATA, "", k) {
t.Fatal("expected not stale") t.Fatal("expected not stale")
} }
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
if !tdb.Stale(ctx, db.DATATYPE_USERDATA, k) { if !tdb.Stale(ctx, db.DATATYPE_USERDATA, "", k) {
t.Fatal("expected stale") t.Fatal("expected stale")
} }
} }
func TestFilteredStaleDb(t *testing.T) {
ctx := context.Background()
mdb := memdb.NewMemDb()
err := mdb.Connect(ctx, "")
if err != nil {
t.Fatal(err)
}
k := []byte("foo")
tdb := NewTimedDb(mdb, time.Duration(time.Millisecond))
tdb = tdb.WithMatch(db.DATATYPE_STATE, []byte("in"))
tdb.SetPrefix(db.DATATYPE_USERDATA)
tdb.SetSession("inky")
err = tdb.Put(ctx, k, []byte("bar"))
if err != nil {
t.Fatal(err)
}
tdb.SetPrefix(db.DATATYPE_STATE)
tdb.SetSession("inky")
err = tdb.Put(ctx, k, []byte("pinky"))
if err != nil {
t.Fatal(err)
}
tdb.SetSession("blinky")
err = tdb.Put(ctx, k, []byte("clyde"))
if err != nil {
t.Fatal(err)
}
if tdb.Stale(ctx, db.DATATYPE_USERDATA, "inky", k) {
t.Fatal("expected not stale")
}
if tdb.Stale(ctx, db.DATATYPE_STATE, "inky", k) {
t.Fatal("expected not stale")
}
if tdb.Stale(ctx, db.DATATYPE_STATE, "blinky", k) {
t.Fatal("expected not stale")
}
time.Sleep(time.Millisecond)
if tdb.Stale(ctx, db.DATATYPE_USERDATA, "inky", k) {
t.Fatal("expected not stale")
}
if !tdb.Stale(ctx, db.DATATYPE_STATE, "inky", k) {
t.Fatal("expected stale")
}
if tdb.Stale(ctx, db.DATATYPE_STATE, "blinky", k) {
t.Fatal("expected not stale")
}
}