From fbcde2f322963d393b5e88707242168ce59b53eb Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 2 Oct 2024 22:06:43 +0100 Subject: [PATCH 1/5] Implement timed data row db --- internal/storage/db.go | 4 ++ internal/storage/gdbm.go | 1 + internal/storage/timed.go | 75 ++++++++++++++++++++++++++++++++++ internal/storage/timed_test.go | 35 ++++++++++++++++ 4 files changed, 115 insertions(+) create mode 100644 internal/storage/timed.go create mode 100644 internal/storage/timed_test.go diff --git a/internal/storage/db.go b/internal/storage/db.go index b2ac6a9..d53a765 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -10,6 +10,10 @@ const ( DATATYPE_USERSUB = 64 ) +const ( + SUBPREFIX_TIME = uint16(1) +) + type SubPrefixDb struct { store db.Db pfx []byte diff --git a/internal/storage/gdbm.go b/internal/storage/gdbm.go index eb959cf..61d53fc 100644 --- a/internal/storage/gdbm.go +++ b/internal/storage/gdbm.go @@ -12,6 +12,7 @@ var ( dbC map[string]chan db.Db ) + type ThreadGdbmDb struct { db db.Db connStr string diff --git a/internal/storage/timed.go b/internal/storage/timed.go new file mode 100644 index 0000000..2dce6b2 --- /dev/null +++ b/internal/storage/timed.go @@ -0,0 +1,75 @@ +package storage + +import ( + "context" + "time" + "encoding/binary" + + "git.defalsify.org/vise.git/db" +) + +type TimedDb struct { + db.Db + tdb *SubPrefixDb + ttl time.Duration + parentPfx uint8 +} + + +func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb { + var b [2]byte + binary.BigEndian.PutUint16(b[:], SUBPREFIX_TIME) + sdb := NewSubPrefixDb(db, b[:]) + return &TimedDb{ + Db: db, + tdb: sdb, + ttl: ttl, + } +} + +func(tib *TimedDb) SetPrefix(pfx uint8) { + tib.Db.SetPrefix(pfx) + tib.parentPfx = pfx +} + +func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error { + t := time.Now() + b, err := t.MarshalBinary() + if err != nil { + return err + } + err = tib.Db.Put(ctx, key, val) + if err != nil { + return err + } + k := append([]byte{tib.parentPfx}, key...) + defer func() { + tib.parentPfx = 0 + }() + err = tib.tdb.Put(ctx, k, b) + if err != nil { + logg.ErrorCtxf(ctx, "failed to update timestamp of record", err) + } + return nil +} + +func(tib *TimedDb) Get(ctx context.Context, key []byte) ([]byte, error) { + v, err := tib.Db.Get(ctx, key) + return v, err +} + +func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, key []byte) bool { + b := append([]byte{pfx}, key...) + v, err := tib.tdb.Get(ctx, b) + if err != nil { + logg.ErrorCtxf(ctx, "no time entry", "key", key) + return false + } + t_now := time.Now() + t_then := time.Time{} + err = t_then.UnmarshalBinary(v) + if err != nil { + return false + } + return t_now.After(t_then.Add(tib.ttl)) +} diff --git a/internal/storage/timed_test.go b/internal/storage/timed_test.go new file mode 100644 index 0000000..391d351 --- /dev/null +++ b/internal/storage/timed_test.go @@ -0,0 +1,35 @@ +package storage + +import ( + "context" + "testing" + "time" + + "git.defalsify.org/vise.git/db" + memdb "git.defalsify.org/vise.git/db/mem" +) + +func TestStaleDb(t *testing.T) { + ctx := context.Background() + mdb := memdb.NewMemDb() + err := mdb.Connect(ctx, "") + if err != nil { + t.Fatal(err) + } + + tdb := NewTimedDb(mdb, time.Duration(time.Millisecond)) + tdb.SetPrefix(db.DATATYPE_USERDATA) + k := []byte("foo") + err = tdb.Put(ctx, k, []byte("bar")) + if err != nil { + t.Fatal(err) + } + + if tdb.Stale(ctx, db.DATATYPE_USERDATA, k) { + t.Fatal("expected not stale") + } + time.Sleep(time.Millisecond) + if !tdb.Stale(ctx, db.DATATYPE_USERDATA, k) { + t.Fatal("expected stale") + } +} -- 2.45.2 From 9b3ed0d6ae885ac0386c8393bb5b0d9bf609c014 Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 2 Oct 2024 23:57:55 +0100 Subject: [PATCH 2/5] Add session to timed db --- internal/storage/db.go | 4 +++ internal/storage/timed.go | 57 +++++++++++++++++++++++++++++----- internal/storage/timed_test.go | 54 ++++++++++++++++++++++++++++++-- 3 files changed, 105 insertions(+), 10 deletions(-) diff --git a/internal/storage/db.go b/internal/storage/db.go index d53a765..74bc571 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -26,6 +26,10 @@ func NewSubPrefixDb(store db.Db, pfx []byte) *SubPrefixDb { } } +func(s *SubPrefixDb) SetSession(sessionId string) { + s.store.SetSession(sessionId) +} + func(s *SubPrefixDb) toKey(k []byte) []byte { return append(s.pfx, k...) } diff --git a/internal/storage/timed.go b/internal/storage/timed.go index 2dce6b2..eedf00c 100644 --- a/internal/storage/timed.go +++ b/internal/storage/timed.go @@ -1,6 +1,7 @@ package storage import ( + "bytes" "context" "time" "encoding/binary" @@ -13,9 +14,10 @@ type TimedDb struct { tdb *SubPrefixDb ttl time.Duration parentPfx uint8 + parentSession []byte + matchPfx map[uint8][][]byte } - func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb { var b [2]byte binary.BigEndian.PutUint16(b[:], SUBPREFIX_TIME) @@ -27,11 +29,43 @@ func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb { } } +func(tib *TimedDb) WithMatch(pfx uint8, keyPart []byte) *TimedDb { + if tib.matchPfx == nil { + tib.matchPfx = make(map[uint8][][]byte) + } + tib.matchPfx[pfx] = append(tib.matchPfx[pfx], keyPart) + return tib +} + +func(tib *TimedDb) checkPrefix(pfx uint8, key []byte) bool { + var v []byte + if tib.matchPfx == nil { + return true + } + for _, v = range(tib.matchPfx[pfx]) { + l := len(v) + k := append(tib.parentSession, key...) + if l > len(k) { + continue + } + logg.Debugf("check the prefix", "v", v, "k", k, "l", l ) + if bytes.Equal(v, k[:l]) { + return true + } + } + return false +} + func(tib *TimedDb) SetPrefix(pfx uint8) { tib.Db.SetPrefix(pfx) tib.parentPfx = pfx } +func(tib *TimedDb) SetSession(session string) { + tib.Db.SetSession(session) + tib.parentSession = []byte(session) +} + func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error { t := time.Now() b, err := t.MarshalBinary() @@ -42,13 +76,18 @@ func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error { if err != nil { return err } - k := append([]byte{tib.parentPfx}, key...) defer func() { tib.parentPfx = 0 + tib.parentSession = nil }() - err = tib.tdb.Put(ctx, k, b) - if err != nil { - logg.ErrorCtxf(ctx, "failed to update timestamp of record", err) + if tib.checkPrefix(tib.parentPfx, key) { + tib.tdb.SetSession("") + k := db.ToSessionKey(tib.parentPfx, []byte(tib.parentSession), key) + k = append([]byte{tib.parentPfx}, k...) + err = tib.tdb.Put(ctx, k, b) + if err != nil { + logg.ErrorCtxf(ctx, "failed to update timestamp of record", err) + } } return nil } @@ -58,11 +97,13 @@ func(tib *TimedDb) Get(ctx context.Context, key []byte) ([]byte, error) { return v, err } -func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, key []byte) bool { - b := append([]byte{pfx}, key...) +func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, sessionId string, key []byte) bool { + tib.tdb.SetSession("") + b := db.ToSessionKey(pfx, []byte(sessionId), key) + b = append([]byte{pfx}, b...) v, err := tib.tdb.Get(ctx, b) if err != nil { - logg.ErrorCtxf(ctx, "no time entry", "key", key) + logg.ErrorCtxf(ctx, "no time entry", "key", key, "b", b) return false } t_now := time.Now() diff --git a/internal/storage/timed_test.go b/internal/storage/timed_test.go index 391d351..38fb63f 100644 --- a/internal/storage/timed_test.go +++ b/internal/storage/timed_test.go @@ -25,11 +25,61 @@ func TestStaleDb(t *testing.T) { t.Fatal(err) } - if tdb.Stale(ctx, db.DATATYPE_USERDATA, k) { + if tdb.Stale(ctx, db.DATATYPE_USERDATA, "", k) { t.Fatal("expected not stale") } time.Sleep(time.Millisecond) - if !tdb.Stale(ctx, db.DATATYPE_USERDATA, k) { + if !tdb.Stale(ctx, db.DATATYPE_USERDATA, "", k) { t.Fatal("expected stale") } } + +func TestFilteredStaleDb(t *testing.T) { + ctx := context.Background() + mdb := memdb.NewMemDb() + err := mdb.Connect(ctx, "") + if err != nil { + t.Fatal(err) + } + + k := []byte("foo") + tdb := NewTimedDb(mdb, time.Duration(time.Millisecond)) + tdb = tdb.WithMatch(db.DATATYPE_STATE, []byte("in")) + tdb.SetPrefix(db.DATATYPE_USERDATA) + tdb.SetSession("inky") + err = tdb.Put(ctx, k, []byte("bar")) + if err != nil { + t.Fatal(err) + } + tdb.SetPrefix(db.DATATYPE_STATE) + tdb.SetSession("inky") + err = tdb.Put(ctx, k, []byte("pinky")) + if err != nil { + t.Fatal(err) + } + tdb.SetSession("blinky") + err = tdb.Put(ctx, k, []byte("clyde")) + if err != nil { + t.Fatal(err) + } + + if tdb.Stale(ctx, db.DATATYPE_USERDATA, "inky", k) { + t.Fatal("expected not stale") + } + if tdb.Stale(ctx, db.DATATYPE_STATE, "inky", k) { + t.Fatal("expected not stale") + } + if tdb.Stale(ctx, db.DATATYPE_STATE, "blinky", k) { + t.Fatal("expected not stale") + } + time.Sleep(time.Millisecond) + if tdb.Stale(ctx, db.DATATYPE_USERDATA, "inky", k) { + t.Fatal("expected not stale") + } + if !tdb.Stale(ctx, db.DATATYPE_STATE, "inky", k) { + t.Fatal("expected stale") + } + if tdb.Stale(ctx, db.DATATYPE_STATE, "blinky", k) { + t.Fatal("expected not stale") + } +} -- 2.45.2 From 659fd00c53cd248e2e2e9871197948cc5c45c165 Mon Sep 17 00:00:00 2001 From: lash Date: Wed, 2 Oct 2024 23:59:41 +0100 Subject: [PATCH 3/5] Remove noop get override in timed db --- internal/storage/timed.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/storage/timed.go b/internal/storage/timed.go index eedf00c..cf05dbb 100644 --- a/internal/storage/timed.go +++ b/internal/storage/timed.go @@ -92,11 +92,6 @@ func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error { return nil } -func(tib *TimedDb) Get(ctx context.Context, key []byte) ([]byte, error) { - v, err := tib.Db.Get(ctx, key) - return v, err -} - func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, sessionId string, key []byte) bool { tib.tdb.SetSession("") b := db.ToSessionKey(pfx, []byte(sessionId), key) -- 2.45.2 From ce30cb740eb9d4c55d7872bd00386a79d4fb22bf Mon Sep 17 00:00:00 2001 From: lash Date: Thu, 3 Oct 2024 01:44:28 +0100 Subject: [PATCH 4/5] Remove session id from filter match key --- internal/storage/timed.go | 8 +++---- internal/storage/timed_test.go | 42 +++++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/internal/storage/timed.go b/internal/storage/timed.go index cf05dbb..256bd9d 100644 --- a/internal/storage/timed.go +++ b/internal/storage/timed.go @@ -44,12 +44,10 @@ func(tib *TimedDb) checkPrefix(pfx uint8, key []byte) bool { } for _, v = range(tib.matchPfx[pfx]) { l := len(v) - k := append(tib.parentSession, key...) - if l > len(k) { + if l > len(key) { continue } - logg.Debugf("check the prefix", "v", v, "k", k, "l", l ) - if bytes.Equal(v, k[:l]) { + if bytes.Equal(v, key[:l]) { return true } } @@ -98,7 +96,7 @@ func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, sessionId string, key [ b = append([]byte{pfx}, b...) v, err := tib.tdb.Get(ctx, b) if err != nil { - logg.ErrorCtxf(ctx, "no time entry", "key", key, "b", b) + logg.WarnCtxf(ctx, "no time entry", "key", key, "b", b) return false } t_now := time.Now() diff --git a/internal/storage/timed_test.go b/internal/storage/timed_test.go index 38fb63f..a5c7980 100644 --- a/internal/storage/timed_test.go +++ b/internal/storage/timed_test.go @@ -44,7 +44,7 @@ func TestFilteredStaleDb(t *testing.T) { k := []byte("foo") tdb := NewTimedDb(mdb, time.Duration(time.Millisecond)) - tdb = tdb.WithMatch(db.DATATYPE_STATE, []byte("in")) + tdb = tdb.WithMatch(db.DATATYPE_STATE, []byte("fo")) tdb.SetPrefix(db.DATATYPE_USERDATA) tdb.SetSession("inky") err = tdb.Put(ctx, k, []byte("bar")) @@ -83,3 +83,43 @@ func TestFilteredStaleDb(t *testing.T) { t.Fatal("expected not stale") } } + +func TestFilteredSameKeypartStaleDb(t *testing.T) { + ctx := context.Background() + mdb := memdb.NewMemDb() + err := mdb.Connect(ctx, "") + if err != nil { + t.Fatal(err) + } + + tdb := NewTimedDb(mdb, time.Duration(time.Millisecond)) + tdb = tdb.WithMatch(db.DATATYPE_USERDATA, []byte("ba")) + tdb.SetPrefix(db.DATATYPE_USERDATA) + tdb.SetSession("xyzzy") + err = tdb.Put(ctx, []byte("bar"), []byte("inky")) + if err != nil { + t.Fatal(err) + } + tdb.SetPrefix(db.DATATYPE_USERDATA) + tdb.SetSession("xyzzy") + err = tdb.Put(ctx, []byte("baz"), []byte("pinky")) + if err != nil { + t.Fatal(err) + } + tdb.SetPrefix(db.DATATYPE_USERDATA) + tdb.SetSession("xyzzy") + err = tdb.Put(ctx, []byte("foo"), []byte("blinky")) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Millisecond) + if !tdb.Stale(ctx, db.DATATYPE_USERDATA, "xyzzy", []byte("bar")) { + t.Fatal("expected stale") + } + if !tdb.Stale(ctx, db.DATATYPE_USERDATA, "xyzzy", []byte("baz")) { + t.Fatal("expected stale") + } + if tdb.Stale(ctx, db.DATATYPE_USERDATA, "xyzzy", []byte("foo")) { + t.Fatal("expected not stale") + } +} -- 2.45.2 From 94551ba37fc3780a17c8af36342670482defb2c9 Mon Sep 17 00:00:00 2001 From: lash Date: Thu, 31 Oct 2024 20:51:02 +0000 Subject: [PATCH 5/5] Update govise dep --- go.mod | 4 ++-- go.sum | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 391c1a5..eef74b9 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,14 @@ go 1.23.0 toolchain go1.23.2 require ( - git.defalsify.org/vise.git v0.2.1-0.20241017112704-307fa6fcdc6b + git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed github.com/alecthomas/assert/v2 v2.2.2 github.com/grassrootseconomics/eth-custodial v1.3.0-beta github.com/peteole/testdata-loader v0.3.0 gopkg.in/leonelquinteros/gotext.v1 v1.3.1 ) -require github.com/grassrootseconomics/ussd-data-service v0.0.0-20241003123429-4904b4438a3a // indirect +require github.com/grassrootseconomics/ussd-data-service v0.0.0-20241003123429-4904b4438a3a require ( github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/go.sum b/go.sum index 0ba38c1..c8fa4cd 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -git.defalsify.org/vise.git v0.2.1-0.20241017112704-307fa6fcdc6b h1:dxBplsIlzJHV+5EH+gzB+w08Blt7IJbb2jeRe1OEjLU= -git.defalsify.org/vise.git v0.2.1-0.20241017112704-307fa6fcdc6b/go.mod h1:jyBMe1qTYUz3mmuoC9JQ/TvFeW0vTanCUcPu3H8p4Ck= +git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed h1:4TrsfbK7NKgsa7KjMPlnV/tjYTkAAXP5PWAZzUfzCdI= +git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed/go.mod h1:jyBMe1qTYUz3mmuoC9JQ/TvFeW0vTanCUcPu3H8p4Ck= github.com/alecthomas/assert/v2 v2.2.2 h1:Z/iVC0xZfWTaFNE6bA3z07T86hd45Xe2eLt6WVy2bbk= github.com/alecthomas/assert/v2 v2.2.2/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ= github.com/alecthomas/participle/v2 v2.0.0 h1:Fgrq+MbuSsJwIkw3fEj9h75vDP0Er5JzepJ0/HNHv0g= -- 2.45.2