Compare commits
8 Commits
master
...
lash/stale
Author | SHA1 | Date | |
---|---|---|---|
0e61945cad | |||
|
94551ba37f | ||
|
973a69455e | ||
|
0af7379ae4 | ||
|
ce30cb740e | ||
|
659fd00c53 | ||
|
9b3ed0d6ae | ||
|
fbcde2f322 |
4
go.mod
4
go.mod
@ -5,14 +5,14 @@ go 1.23.0
|
|||||||
toolchain go1.23.2
|
toolchain go1.23.2
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.defalsify.org/vise.git v0.2.1-0.20241017112704-307fa6fcdc6b
|
git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed
|
||||||
github.com/alecthomas/assert/v2 v2.2.2
|
github.com/alecthomas/assert/v2 v2.2.2
|
||||||
github.com/grassrootseconomics/eth-custodial v1.3.0-beta
|
github.com/grassrootseconomics/eth-custodial v1.3.0-beta
|
||||||
github.com/peteole/testdata-loader v0.3.0
|
github.com/peteole/testdata-loader v0.3.0
|
||||||
gopkg.in/leonelquinteros/gotext.v1 v1.3.1
|
gopkg.in/leonelquinteros/gotext.v1 v1.3.1
|
||||||
)
|
)
|
||||||
|
|
||||||
require github.com/grassrootseconomics/ussd-data-service v0.0.0-20241003123429-4904b4438a3a // indirect
|
require github.com/grassrootseconomics/ussd-data-service v0.0.0-20241003123429-4904b4438a3a
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
|
4
go.sum
4
go.sum
@ -1,5 +1,5 @@
|
|||||||
git.defalsify.org/vise.git v0.2.1-0.20241017112704-307fa6fcdc6b h1:dxBplsIlzJHV+5EH+gzB+w08Blt7IJbb2jeRe1OEjLU=
|
git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed h1:4TrsfbK7NKgsa7KjMPlnV/tjYTkAAXP5PWAZzUfzCdI=
|
||||||
git.defalsify.org/vise.git v0.2.1-0.20241017112704-307fa6fcdc6b/go.mod h1:jyBMe1qTYUz3mmuoC9JQ/TvFeW0vTanCUcPu3H8p4Ck=
|
git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed/go.mod h1:jyBMe1qTYUz3mmuoC9JQ/TvFeW0vTanCUcPu3H8p4Ck=
|
||||||
github.com/alecthomas/assert/v2 v2.2.2 h1:Z/iVC0xZfWTaFNE6bA3z07T86hd45Xe2eLt6WVy2bbk=
|
github.com/alecthomas/assert/v2 v2.2.2 h1:Z/iVC0xZfWTaFNE6bA3z07T86hd45Xe2eLt6WVy2bbk=
|
||||||
github.com/alecthomas/assert/v2 v2.2.2/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ=
|
github.com/alecthomas/assert/v2 v2.2.2/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ=
|
||||||
github.com/alecthomas/participle/v2 v2.0.0 h1:Fgrq+MbuSsJwIkw3fEj9h75vDP0Er5JzepJ0/HNHv0g=
|
github.com/alecthomas/participle/v2 v2.0.0 h1:Fgrq+MbuSsJwIkw3fEj9h75vDP0Er5JzepJ0/HNHv0g=
|
||||||
|
@ -10,6 +10,10 @@ const (
|
|||||||
DATATYPE_USERSUB = 64
|
DATATYPE_USERSUB = 64
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
SUBPREFIX_TIME = uint16(1)
|
||||||
|
)
|
||||||
|
|
||||||
// PrefixDb interface abstracts the database operations.
|
// PrefixDb interface abstracts the database operations.
|
||||||
type PrefixDb interface {
|
type PrefixDb interface {
|
||||||
Get(ctx context.Context, key []byte) ([]byte, error)
|
Get(ctx context.Context, key []byte) ([]byte, error)
|
||||||
@ -30,8 +34,12 @@ func NewSubPrefixDb(store db.Db, pfx []byte) *SubPrefixDb {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SubPrefixDb) toKey(k []byte) []byte {
|
func(s *SubPrefixDb) SetSession(sessionId string) {
|
||||||
return append(s.pfx, k...)
|
s.store.SetSession(sessionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func(s *SubPrefixDb) toKey(k []byte) []byte {
|
||||||
|
return append(s.pfx, k...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SubPrefixDb) Get(ctx context.Context, key []byte) ([]byte, error) {
|
func (s *SubPrefixDb) Get(ctx context.Context, key []byte) ([]byte, error) {
|
||||||
|
@ -12,6 +12,7 @@ var (
|
|||||||
dbC map[string]chan db.Db
|
dbC map[string]chan db.Db
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
type ThreadGdbmDb struct {
|
type ThreadGdbmDb struct {
|
||||||
db db.Db
|
db db.Db
|
||||||
connStr string
|
connStr string
|
||||||
|
109
internal/storage/timed.go
Normal file
109
internal/storage/timed.go
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
"encoding/binary"
|
||||||
|
|
||||||
|
"git.defalsify.org/vise.git/db"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TimedDb struct {
|
||||||
|
db.Db
|
||||||
|
tdb *SubPrefixDb
|
||||||
|
ttl time.Duration
|
||||||
|
parentPfx uint8
|
||||||
|
parentSession []byte
|
||||||
|
matchPfx map[uint8][][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb {
|
||||||
|
var b [2]byte
|
||||||
|
binary.BigEndian.PutUint16(b[:], SUBPREFIX_TIME)
|
||||||
|
sdb := NewSubPrefixDb(db, b[:])
|
||||||
|
return &TimedDb{
|
||||||
|
Db: db,
|
||||||
|
tdb: sdb,
|
||||||
|
ttl: ttl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func(tib *TimedDb) WithMatch(pfx uint8, keyPart []byte) *TimedDb {
|
||||||
|
if tib.matchPfx == nil {
|
||||||
|
tib.matchPfx = make(map[uint8][][]byte)
|
||||||
|
}
|
||||||
|
tib.matchPfx[pfx] = append(tib.matchPfx[pfx], keyPart)
|
||||||
|
return tib
|
||||||
|
}
|
||||||
|
|
||||||
|
func(tib *TimedDb) checkPrefix(pfx uint8, key []byte) bool {
|
||||||
|
var v []byte
|
||||||
|
if tib.matchPfx == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
for _, v = range(tib.matchPfx[pfx]) {
|
||||||
|
l := len(v)
|
||||||
|
if l > len(key) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if bytes.Equal(v, key[:l]) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func(tib *TimedDb) SetPrefix(pfx uint8) {
|
||||||
|
tib.Db.SetPrefix(pfx)
|
||||||
|
tib.parentPfx = pfx
|
||||||
|
}
|
||||||
|
|
||||||
|
func(tib *TimedDb) SetSession(session string) {
|
||||||
|
tib.Db.SetSession(session)
|
||||||
|
tib.parentSession = []byte(session)
|
||||||
|
}
|
||||||
|
|
||||||
|
func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error {
|
||||||
|
t := time.Now()
|
||||||
|
b, err := t.MarshalBinary()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = tib.Db.Put(ctx, key, val)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
tib.parentPfx = 0
|
||||||
|
tib.parentSession = nil
|
||||||
|
}()
|
||||||
|
if tib.checkPrefix(tib.parentPfx, key) {
|
||||||
|
tib.tdb.SetSession("")
|
||||||
|
k := db.ToSessionKey(tib.parentPfx, []byte(tib.parentSession), key)
|
||||||
|
k = append([]byte{tib.parentPfx}, k...)
|
||||||
|
err = tib.tdb.Put(ctx, k, b)
|
||||||
|
if err != nil {
|
||||||
|
logg.ErrorCtxf(ctx, "failed to update timestamp of record", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, sessionId string, key []byte) bool {
|
||||||
|
tib.tdb.SetSession("")
|
||||||
|
b := db.ToSessionKey(pfx, []byte(sessionId), key)
|
||||||
|
b = append([]byte{pfx}, b...)
|
||||||
|
v, err := tib.tdb.Get(ctx, b)
|
||||||
|
if err != nil {
|
||||||
|
logg.WarnCtxf(ctx, "no time entry", "key", key, "b", b)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
t_now := time.Now()
|
||||||
|
t_then := time.Time{}
|
||||||
|
err = t_then.UnmarshalBinary(v)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return t_now.After(t_then.Add(tib.ttl))
|
||||||
|
}
|
125
internal/storage/timed_test.go
Normal file
125
internal/storage/timed_test.go
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.defalsify.org/vise.git/db"
|
||||||
|
memdb "git.defalsify.org/vise.git/db/mem"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStaleDb(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
mdb := memdb.NewMemDb()
|
||||||
|
err := mdb.Connect(ctx, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tdb := NewTimedDb(mdb, time.Duration(time.Millisecond))
|
||||||
|
tdb.SetPrefix(db.DATATYPE_USERDATA)
|
||||||
|
k := []byte("foo")
|
||||||
|
err = tdb.Put(ctx, k, []byte("bar"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tdb.Stale(ctx, db.DATATYPE_USERDATA, "", k) {
|
||||||
|
t.Fatal("expected not stale")
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
if !tdb.Stale(ctx, db.DATATYPE_USERDATA, "", k) {
|
||||||
|
t.Fatal("expected stale")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFilteredStaleDb(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
mdb := memdb.NewMemDb()
|
||||||
|
err := mdb.Connect(ctx, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
k := []byte("foo")
|
||||||
|
tdb := NewTimedDb(mdb, time.Duration(time.Millisecond))
|
||||||
|
tdb = tdb.WithMatch(db.DATATYPE_STATE, []byte("fo"))
|
||||||
|
tdb.SetPrefix(db.DATATYPE_USERDATA)
|
||||||
|
tdb.SetSession("inky")
|
||||||
|
err = tdb.Put(ctx, k, []byte("bar"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
tdb.SetPrefix(db.DATATYPE_STATE)
|
||||||
|
tdb.SetSession("inky")
|
||||||
|
err = tdb.Put(ctx, k, []byte("pinky"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
tdb.SetSession("blinky")
|
||||||
|
err = tdb.Put(ctx, k, []byte("clyde"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tdb.Stale(ctx, db.DATATYPE_USERDATA, "inky", k) {
|
||||||
|
t.Fatal("expected not stale")
|
||||||
|
}
|
||||||
|
if tdb.Stale(ctx, db.DATATYPE_STATE, "inky", k) {
|
||||||
|
t.Fatal("expected not stale")
|
||||||
|
}
|
||||||
|
if tdb.Stale(ctx, db.DATATYPE_STATE, "blinky", k) {
|
||||||
|
t.Fatal("expected not stale")
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
if tdb.Stale(ctx, db.DATATYPE_USERDATA, "inky", k) {
|
||||||
|
t.Fatal("expected not stale")
|
||||||
|
}
|
||||||
|
if !tdb.Stale(ctx, db.DATATYPE_STATE, "inky", k) {
|
||||||
|
t.Fatal("expected stale")
|
||||||
|
}
|
||||||
|
if tdb.Stale(ctx, db.DATATYPE_STATE, "blinky", k) {
|
||||||
|
t.Fatal("expected not stale")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFilteredSameKeypartStaleDb(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
mdb := memdb.NewMemDb()
|
||||||
|
err := mdb.Connect(ctx, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tdb := NewTimedDb(mdb, time.Duration(time.Millisecond))
|
||||||
|
tdb = tdb.WithMatch(db.DATATYPE_USERDATA, []byte("ba"))
|
||||||
|
tdb.SetPrefix(db.DATATYPE_USERDATA)
|
||||||
|
tdb.SetSession("xyzzy")
|
||||||
|
err = tdb.Put(ctx, []byte("bar"), []byte("inky"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
tdb.SetPrefix(db.DATATYPE_USERDATA)
|
||||||
|
tdb.SetSession("xyzzy")
|
||||||
|
err = tdb.Put(ctx, []byte("baz"), []byte("pinky"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
tdb.SetPrefix(db.DATATYPE_USERDATA)
|
||||||
|
tdb.SetSession("xyzzy")
|
||||||
|
err = tdb.Put(ctx, []byte("foo"), []byte("blinky"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
if !tdb.Stale(ctx, db.DATATYPE_USERDATA, "xyzzy", []byte("bar")) {
|
||||||
|
t.Fatal("expected stale")
|
||||||
|
}
|
||||||
|
if !tdb.Stale(ctx, db.DATATYPE_USERDATA, "xyzzy", []byte("baz")) {
|
||||||
|
t.Fatal("expected stale")
|
||||||
|
}
|
||||||
|
if tdb.Stale(ctx, db.DATATYPE_USERDATA, "xyzzy", []byte("foo")) {
|
||||||
|
t.Fatal("expected not stale")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user