From 9b3ed0d6ae885ac0386c8393bb5b0d9bf609c014 Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 2 Oct 2024 23:57:55 +0100 Subject: [PATCH] Add session to timed db --- internal/storage/db.go | 4 +++ internal/storage/timed.go | 57 +++++++++++++++++++++++++++++----- internal/storage/timed_test.go | 54 ++++++++++++++++++++++++++++++-- 3 files changed, 105 insertions(+), 10 deletions(-) diff --git a/internal/storage/db.go b/internal/storage/db.go index d53a765..74bc571 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -26,6 +26,10 @@ func NewSubPrefixDb(store db.Db, pfx []byte) *SubPrefixDb { } } +func(s *SubPrefixDb) SetSession(sessionId string) { + s.store.SetSession(sessionId) +} + func(s *SubPrefixDb) toKey(k []byte) []byte { return append(s.pfx, k...) } diff --git a/internal/storage/timed.go b/internal/storage/timed.go index 2dce6b2..eedf00c 100644 --- a/internal/storage/timed.go +++ b/internal/storage/timed.go @@ -1,6 +1,7 @@ package storage import ( + "bytes" "context" "time" "encoding/binary" @@ -13,9 +14,10 @@ type TimedDb struct { tdb *SubPrefixDb ttl time.Duration parentPfx uint8 + parentSession []byte + matchPfx map[uint8][][]byte } - func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb { var b [2]byte binary.BigEndian.PutUint16(b[:], SUBPREFIX_TIME) @@ -27,11 +29,43 @@ func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb { } } +func(tib *TimedDb) WithMatch(pfx uint8, keyPart []byte) *TimedDb { + if tib.matchPfx == nil { + tib.matchPfx = make(map[uint8][][]byte) + } + tib.matchPfx[pfx] = append(tib.matchPfx[pfx], keyPart) + return tib +} + +func(tib *TimedDb) checkPrefix(pfx uint8, key []byte) bool { + var v []byte + if tib.matchPfx == nil { + return true + } + for _, v = range(tib.matchPfx[pfx]) { + l := len(v) + k := append(tib.parentSession, key...) + if l > len(k) { + continue + } + logg.Debugf("check the prefix", "v", v, "k", k, "l", l ) + if bytes.Equal(v, k[:l]) { + return true + } + } + return false +} + func(tib *TimedDb) SetPrefix(pfx uint8) { tib.Db.SetPrefix(pfx) tib.parentPfx = pfx } +func(tib *TimedDb) SetSession(session string) { + tib.Db.SetSession(session) + tib.parentSession = []byte(session) +} + func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error { t := time.Now() b, err := t.MarshalBinary() @@ -42,13 +76,18 @@ func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error { if err != nil { return err } - k := append([]byte{tib.parentPfx}, key...) defer func() { tib.parentPfx = 0 + tib.parentSession = nil }() - err = tib.tdb.Put(ctx, k, b) - if err != nil { - logg.ErrorCtxf(ctx, "failed to update timestamp of record", err) + if tib.checkPrefix(tib.parentPfx, key) { + tib.tdb.SetSession("") + k := db.ToSessionKey(tib.parentPfx, []byte(tib.parentSession), key) + k = append([]byte{tib.parentPfx}, k...) + err = tib.tdb.Put(ctx, k, b) + if err != nil { + logg.ErrorCtxf(ctx, "failed to update timestamp of record", err) + } } return nil } @@ -58,11 +97,13 @@ func(tib *TimedDb) Get(ctx context.Context, key []byte) ([]byte, error) { return v, err } -func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, key []byte) bool { - b := append([]byte{pfx}, key...) +func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, sessionId string, key []byte) bool { + tib.tdb.SetSession("") + b := db.ToSessionKey(pfx, []byte(sessionId), key) + b = append([]byte{pfx}, b...) v, err := tib.tdb.Get(ctx, b) if err != nil { - logg.ErrorCtxf(ctx, "no time entry", "key", key) + logg.ErrorCtxf(ctx, "no time entry", "key", key, "b", b) return false } t_now := time.Now() diff --git a/internal/storage/timed_test.go b/internal/storage/timed_test.go index 391d351..38fb63f 100644 --- a/internal/storage/timed_test.go +++ b/internal/storage/timed_test.go @@ -25,11 +25,61 @@ func TestStaleDb(t *testing.T) { t.Fatal(err) } - if tdb.Stale(ctx, db.DATATYPE_USERDATA, k) { + if tdb.Stale(ctx, db.DATATYPE_USERDATA, "", k) { t.Fatal("expected not stale") } time.Sleep(time.Millisecond) - if !tdb.Stale(ctx, db.DATATYPE_USERDATA, k) { + if !tdb.Stale(ctx, db.DATATYPE_USERDATA, "", k) { t.Fatal("expected stale") } } + +func TestFilteredStaleDb(t *testing.T) { + ctx := context.Background() + mdb := memdb.NewMemDb() + err := mdb.Connect(ctx, "") + if err != nil { + t.Fatal(err) + } + + k := []byte("foo") + tdb := NewTimedDb(mdb, time.Duration(time.Millisecond)) + tdb = tdb.WithMatch(db.DATATYPE_STATE, []byte("in")) + tdb.SetPrefix(db.DATATYPE_USERDATA) + tdb.SetSession("inky") + err = tdb.Put(ctx, k, []byte("bar")) + if err != nil { + t.Fatal(err) + } + tdb.SetPrefix(db.DATATYPE_STATE) + tdb.SetSession("inky") + err = tdb.Put(ctx, k, []byte("pinky")) + if err != nil { + t.Fatal(err) + } + tdb.SetSession("blinky") + err = tdb.Put(ctx, k, []byte("clyde")) + if err != nil { + t.Fatal(err) + } + + if tdb.Stale(ctx, db.DATATYPE_USERDATA, "inky", k) { + t.Fatal("expected not stale") + } + if tdb.Stale(ctx, db.DATATYPE_STATE, "inky", k) { + t.Fatal("expected not stale") + } + if tdb.Stale(ctx, db.DATATYPE_STATE, "blinky", k) { + t.Fatal("expected not stale") + } + time.Sleep(time.Millisecond) + if tdb.Stale(ctx, db.DATATYPE_USERDATA, "inky", k) { + t.Fatal("expected not stale") + } + if !tdb.Stale(ctx, db.DATATYPE_STATE, "inky", k) { + t.Fatal("expected stale") + } + if tdb.Stale(ctx, db.DATATYPE_STATE, "blinky", k) { + t.Fatal("expected not stale") + } +}