forked from grassrootseconomics/visedriver
Implement timed data row db
This commit is contained in:
parent
2a93ea7a0c
commit
fbcde2f322
@ -10,6 +10,10 @@ const (
|
||||
DATATYPE_USERSUB = 64
|
||||
)
|
||||
|
||||
const (
|
||||
SUBPREFIX_TIME = uint16(1)
|
||||
)
|
||||
|
||||
type SubPrefixDb struct {
|
||||
store db.Db
|
||||
pfx []byte
|
||||
|
@ -12,6 +12,7 @@ var (
|
||||
dbC map[string]chan db.Db
|
||||
)
|
||||
|
||||
|
||||
type ThreadGdbmDb struct {
|
||||
db db.Db
|
||||
connStr string
|
||||
|
75
internal/storage/timed.go
Normal file
75
internal/storage/timed.go
Normal file
@ -0,0 +1,75 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
"encoding/binary"
|
||||
|
||||
"git.defalsify.org/vise.git/db"
|
||||
)
|
||||
|
||||
type TimedDb struct {
|
||||
db.Db
|
||||
tdb *SubPrefixDb
|
||||
ttl time.Duration
|
||||
parentPfx uint8
|
||||
}
|
||||
|
||||
|
||||
func NewTimedDb(db db.Db, ttl time.Duration) *TimedDb {
|
||||
var b [2]byte
|
||||
binary.BigEndian.PutUint16(b[:], SUBPREFIX_TIME)
|
||||
sdb := NewSubPrefixDb(db, b[:])
|
||||
return &TimedDb{
|
||||
Db: db,
|
||||
tdb: sdb,
|
||||
ttl: ttl,
|
||||
}
|
||||
}
|
||||
|
||||
func(tib *TimedDb) SetPrefix(pfx uint8) {
|
||||
tib.Db.SetPrefix(pfx)
|
||||
tib.parentPfx = pfx
|
||||
}
|
||||
|
||||
func(tib *TimedDb) Put(ctx context.Context, key []byte, val []byte) error {
|
||||
t := time.Now()
|
||||
b, err := t.MarshalBinary()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = tib.Db.Put(ctx, key, val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
k := append([]byte{tib.parentPfx}, key...)
|
||||
defer func() {
|
||||
tib.parentPfx = 0
|
||||
}()
|
||||
err = tib.tdb.Put(ctx, k, b)
|
||||
if err != nil {
|
||||
logg.ErrorCtxf(ctx, "failed to update timestamp of record", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func(tib *TimedDb) Get(ctx context.Context, key []byte) ([]byte, error) {
|
||||
v, err := tib.Db.Get(ctx, key)
|
||||
return v, err
|
||||
}
|
||||
|
||||
func(tib *TimedDb) Stale(ctx context.Context, pfx uint8, key []byte) bool {
|
||||
b := append([]byte{pfx}, key...)
|
||||
v, err := tib.tdb.Get(ctx, b)
|
||||
if err != nil {
|
||||
logg.ErrorCtxf(ctx, "no time entry", "key", key)
|
||||
return false
|
||||
}
|
||||
t_now := time.Now()
|
||||
t_then := time.Time{}
|
||||
err = t_then.UnmarshalBinary(v)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return t_now.After(t_then.Add(tib.ttl))
|
||||
}
|
35
internal/storage/timed_test.go
Normal file
35
internal/storage/timed_test.go
Normal file
@ -0,0 +1,35 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.defalsify.org/vise.git/db"
|
||||
memdb "git.defalsify.org/vise.git/db/mem"
|
||||
)
|
||||
|
||||
func TestStaleDb(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mdb := memdb.NewMemDb()
|
||||
err := mdb.Connect(ctx, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tdb := NewTimedDb(mdb, time.Duration(time.Millisecond))
|
||||
tdb.SetPrefix(db.DATATYPE_USERDATA)
|
||||
k := []byte("foo")
|
||||
err = tdb.Put(ctx, k, []byte("bar"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if tdb.Stale(ctx, db.DATATYPE_USERDATA, k) {
|
||||
t.Fatal("expected not stale")
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
if !tdb.Stale(ctx, db.DATATYPE_USERDATA, k) {
|
||||
t.Fatal("expected stale")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user