Allow multiple db connections in menuservice
This commit is contained in:
parent
c5bb1c80a5
commit
348fff8936
73
storage/conn.go
Normal file
73
storage/conn.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DBTYPE_NONE = iota
|
||||||
|
DBTYPE_MEM
|
||||||
|
DBTYPE_FS
|
||||||
|
DBTYPE_GDBM
|
||||||
|
DBTYPE_POSTGRES
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
STORETYPE_STATE = iota
|
||||||
|
STORETYPE_RESOURCE
|
||||||
|
STORETYPE_USER
|
||||||
|
_STORETYPE_MAX
|
||||||
|
)
|
||||||
|
|
||||||
|
type Conns map[int8]ConnData
|
||||||
|
|
||||||
|
func NewConns() Conns {
|
||||||
|
c := make(Conns)
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Conns) Set(typ int8, conn ConnData) {
|
||||||
|
if typ < 0 || typ >= _STORETYPE_MAX {
|
||||||
|
panic(fmt.Errorf("invalid store type: %d", typ))
|
||||||
|
}
|
||||||
|
c[typ] = conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Conns) Have(conn *ConnData) int8 {
|
||||||
|
for i := range(_STORETYPE_MAX) {
|
||||||
|
ii := int8(i)
|
||||||
|
v, ok := c[ii]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if v.String() == conn.String() {
|
||||||
|
return ii
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConnData struct {
|
||||||
|
typ int
|
||||||
|
str string
|
||||||
|
domain string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cd *ConnData) DbType() int {
|
||||||
|
return cd.typ
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cd *ConnData) String() string {
|
||||||
|
return cd.str
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cd *ConnData) Domain() string {
|
||||||
|
return cd.domain
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cd *ConnData) Path() string {
|
||||||
|
v, _ := url.Parse(cd.str)
|
||||||
|
v.RawQuery = ""
|
||||||
|
return v.String()
|
||||||
|
}
|
@ -141,3 +141,7 @@ func(tdb *ThreadGdbmDb) Start(ctx context.Context) error {
|
|||||||
func(tdb *ThreadGdbmDb) Stop(ctx context.Context) error {
|
func(tdb *ThreadGdbmDb) Stop(ctx context.Context) error {
|
||||||
return tdb.db.Stop(ctx)
|
return tdb.db.Stop(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func(tdb *ThreadGdbmDb) Connection() string {
|
||||||
|
return tdb.db.Connection()
|
||||||
|
}
|
||||||
|
@ -7,38 +7,6 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
DBTYPE_NONE = iota
|
|
||||||
DBTYPE_MEM
|
|
||||||
DBTYPE_FS
|
|
||||||
DBTYPE_GDBM
|
|
||||||
DBTYPE_POSTGRES
|
|
||||||
)
|
|
||||||
|
|
||||||
type ConnData struct {
|
|
||||||
typ int
|
|
||||||
str string
|
|
||||||
domain string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cd *ConnData) DbType() int {
|
|
||||||
return cd.typ
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cd *ConnData) String() string {
|
|
||||||
return cd.str
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cd *ConnData) Domain() string {
|
|
||||||
return cd.domain
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cd *ConnData) Path() string {
|
|
||||||
v, _ := url.Parse(cd.str)
|
|
||||||
v.RawQuery = ""
|
|
||||||
return v.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
func probePostgres(s string) (string, string, bool) {
|
func probePostgres(s string) (string, string, bool) {
|
||||||
domain := "public"
|
domain := "public"
|
||||||
v, err := url.Parse(s)
|
v, err := url.Parse(s)
|
||||||
|
@ -2,6 +2,7 @@ package storage
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
@ -29,53 +30,77 @@ type StorageService interface {
|
|||||||
|
|
||||||
// TODO: Support individual backend for each store (conndata)
|
// TODO: Support individual backend for each store (conndata)
|
||||||
type MenuStorageService struct {
|
type MenuStorageService struct {
|
||||||
conn ConnData
|
conns Conns
|
||||||
resourceDir string
|
|
||||||
poResource resource.Resource
|
poResource resource.Resource
|
||||||
resourceStore db.Db
|
store map[int8]db.Db
|
||||||
stateStore db.Db
|
|
||||||
userDataStore db.Db
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMenuStorageService(conn ConnData, resourceDir string) *MenuStorageService {
|
func NewMenuStorageService(conn Conns) *MenuStorageService {
|
||||||
return &MenuStorageService{
|
return &MenuStorageService{
|
||||||
conn: conn,
|
conns: conn,
|
||||||
resourceDir: resourceDir,
|
store: make(map[int8]db.Db),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MenuStorageService) WithResourceDir(resourceDir string) *MenuStorageService {
|
func (ms *MenuStorageService) WithDb(store db.Db, typ int8) *MenuStorageService {
|
||||||
ms.resourceDir = resourceDir
|
var err error
|
||||||
|
if ms.store[typ] != nil {
|
||||||
|
panic(fmt.Errorf("db already set for typ: %d", typ))
|
||||||
|
}
|
||||||
|
ms.store[typ] = store
|
||||||
|
ms.conns[typ], err = ToConnData(store.Connection())
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
return ms
|
return ms
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: allow fsdb, memdb
|
func (ms *MenuStorageService) checkDb(ctx context.Context,typ int8) db.Db {
|
||||||
func (ms *MenuStorageService) getOrCreateDb(ctx context.Context, existingDb db.Db, section string, typ string) (db.Db, error) {
|
store := ms.store[typ]
|
||||||
var newDb db.Db
|
if store != nil {
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
connData := ms.conns[typ]
|
||||||
|
v := ms.conns.Have(&connData)
|
||||||
|
if v == -1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
src := ms.store[v]
|
||||||
|
if src == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ms.store[typ] = ms.store[v]
|
||||||
|
logg.InfoCtxf(ctx, "found existing db", "typ", typ, "srctyp", v, "store", ms.store[typ], "srcstore", ms.store[v])
|
||||||
|
return ms.store[typ]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *MenuStorageService) getOrCreateDb(ctx context.Context, section string, typ int8) (db.Db, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if existingDb != nil {
|
newDb := ms.checkDb(ctx, typ)
|
||||||
return existingDb, nil
|
if newDb != nil {
|
||||||
|
return newDb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
connStr := ms.conn.String()
|
connData := ms.conns[typ]
|
||||||
dbTyp := ms.conn.DbType()
|
connStr := connData.String()
|
||||||
|
dbTyp := connData.DbType()
|
||||||
if dbTyp == DBTYPE_POSTGRES {
|
if dbTyp == DBTYPE_POSTGRES {
|
||||||
// TODO: move to vise
|
// TODO: move to vise
|
||||||
err = ensureSchemaExists(ctx, ms.conn)
|
err = ensureSchemaExists(ctx, connData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
newDb = postgres.NewPgDb().WithSchema(ms.conn.Domain())
|
newDb = postgres.NewPgDb().WithSchema(connData.Domain())
|
||||||
} else if dbTyp == DBTYPE_GDBM {
|
} else if dbTyp == DBTYPE_GDBM {
|
||||||
err = ms.ensureDbDir()
|
err = ms.ensureDbDir(connStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
connStr = path.Join(connStr, section)
|
connStr = path.Join(connStr, section)
|
||||||
newDb = gdbmstorage.NewThreadGdbmDb()
|
newDb = gdbmstorage.NewThreadGdbmDb()
|
||||||
} else if dbTyp == DBTYPE_FS {
|
} else if dbTyp == DBTYPE_FS {
|
||||||
err = ms.ensureDbDir()
|
err = ms.ensureDbDir(connStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -83,13 +108,14 @@ func (ms *MenuStorageService) getOrCreateDb(ctx context.Context, existingDb db.D
|
|||||||
} else if dbTyp == DBTYPE_MEM {
|
} else if dbTyp == DBTYPE_MEM {
|
||||||
logg.WarnCtxf(ctx, "using volatile storage (memdb)")
|
logg.WarnCtxf(ctx, "using volatile storage (memdb)")
|
||||||
} else {
|
} else {
|
||||||
return nil, fmt.Errorf("unsupported connection string: '%s'\n", ms.conn.String())
|
return nil, fmt.Errorf("unsupported connection string: '%s'\n", connData.String())
|
||||||
}
|
}
|
||||||
logg.DebugCtxf(ctx, "connecting to db", "conn", connStr, "conndata", ms.conn, "typ", typ)
|
logg.DebugCtxf(ctx, "connecting to db", "conn", connData, "typ", typ)
|
||||||
err = newDb.Connect(ctx, connStr)
|
err = newDb.Connect(ctx, connStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
ms.store[typ] = newDb
|
||||||
|
|
||||||
return newDb, nil
|
return newDb, nil
|
||||||
}
|
}
|
||||||
@ -145,26 +171,15 @@ func (ms *MenuStorageService) GetPersister(ctx context.Context) (*persist.Persis
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MenuStorageService) GetUserdataDb(ctx context.Context) (db.Db, error) {
|
func (ms *MenuStorageService) GetUserdataDb(ctx context.Context) (db.Db, error) {
|
||||||
if ms.userDataStore != nil {
|
return ms.getOrCreateDb(ctx, "userdata.gdbm", STORETYPE_USER)
|
||||||
return ms.userDataStore, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
userDataStore, err := ms.getOrCreateDb(ctx, ms.userDataStore, "userdata.gdbm", "userdata")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ms.userDataStore = userDataStore
|
|
||||||
return ms.userDataStore, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MenuStorageService) GetResource(ctx context.Context) (resource.Resource, error) {
|
func (ms *MenuStorageService) GetResource(ctx context.Context) (resource.Resource, error) {
|
||||||
ms.resourceStore = fsdb.NewFsDb()
|
store, err := ms.getOrCreateDb(ctx, "resource.gdbm", STORETYPE_RESOURCE)
|
||||||
err := ms.resourceStore.Connect(ctx, ms.resourceDir)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rfs := resource.NewDbResource(ms.resourceStore)
|
rfs := resource.NewDbResource(store)
|
||||||
if ms.poResource != nil {
|
if ms.poResource != nil {
|
||||||
logg.InfoCtxf(ctx, "using poresource for menu and template")
|
logg.InfoCtxf(ctx, "using poresource for menu and template")
|
||||||
rfs.WithMenuGetter(ms.poResource.GetMenu)
|
rfs.WithMenuGetter(ms.poResource.GetMenu)
|
||||||
@ -174,34 +189,34 @@ func (ms *MenuStorageService) GetResource(ctx context.Context) (resource.Resourc
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MenuStorageService) GetStateStore(ctx context.Context) (db.Db, error) {
|
func (ms *MenuStorageService) GetStateStore(ctx context.Context) (db.Db, error) {
|
||||||
if ms.stateStore != nil {
|
return ms.getOrCreateDb(ctx, "state.gdbm", STORETYPE_STATE)
|
||||||
return ms.stateStore, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
stateStore, err := ms.getOrCreateDb(ctx, ms.stateStore, "state.gdbm", "state")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ms.stateStore = stateStore
|
|
||||||
return ms.stateStore, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MenuStorageService) ensureDbDir() error {
|
func (ms *MenuStorageService) ensureDbDir(path string) error {
|
||||||
err := os.MkdirAll(ms.conn.String(), 0700)
|
err := os.MkdirAll(path, 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("state dir create exited with error: %v\n", err)
|
return fmt.Errorf("store dir create exited with error: %v\n", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: how to handle persister here?
|
// TODO: how to handle persister here?
|
||||||
func (ms *MenuStorageService) Close(ctx context.Context) error {
|
func (ms *MenuStorageService) Close(ctx context.Context) error {
|
||||||
errA := ms.stateStore.Close(ctx)
|
var errs []error
|
||||||
errB := ms.userDataStore.Close(ctx)
|
var haveErr bool
|
||||||
errC := ms.resourceStore.Close(ctx)
|
for i := range(_STORETYPE_MAX) {
|
||||||
if errA != nil || errB != nil || errC != nil {
|
err := ms.store[int8(i)].Close(ctx)
|
||||||
return fmt.Errorf("%v %v %v", errA, errB, errC)
|
if err != nil {
|
||||||
|
haveErr = true
|
||||||
|
}
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
if haveErr {
|
||||||
|
errStr := ""
|
||||||
|
for i, err := range(errs) {
|
||||||
|
errStr += fmt.Sprintf("(%d: %v)", i, err)
|
||||||
|
}
|
||||||
|
return errors.New(errStr)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
113
storage/storage_service_test.go
Normal file
113
storage/storage_service_test.go
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
fsdb "git.defalsify.org/vise.git/db/fs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMenuStorageServiceOneSet(t *testing.T) {
|
||||||
|
d, err := os.MkdirTemp("", "visedriver-menustorageservice")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(d)
|
||||||
|
conns := NewConns()
|
||||||
|
connData, err := ToConnData(d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
conns.Set(STORETYPE_STATE, connData)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ms := NewMenuStorageService(conns)
|
||||||
|
_, err = ms.GetStateStore(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = ms.GetResource(ctx)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error getting resource")
|
||||||
|
}
|
||||||
|
_, err = ms.GetUserdataDb(ctx)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error getting userdata")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMenuStorageServiceExplicit(t *testing.T) {
|
||||||
|
d, err := os.MkdirTemp("", "visedriver-menustorageservice")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(d)
|
||||||
|
conns := NewConns()
|
||||||
|
connData, err := ToConnData(d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
conns.Set(STORETYPE_STATE, connData)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
d, err = os.MkdirTemp("", "visedriver-menustorageservice")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(d)
|
||||||
|
store := fsdb.NewFsDb()
|
||||||
|
err = store.Connect(ctx, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ms := NewMenuStorageService(conns)
|
||||||
|
ms = ms.WithDb(store, STORETYPE_RESOURCE)
|
||||||
|
_, err = ms.GetStateStore(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = ms.GetResource(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = ms.GetUserdataDb(ctx)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error getting userdata")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMenuStorageServiceReuse(t *testing.T) {
|
||||||
|
d, err := os.MkdirTemp("", "visedriver-menustorageservice")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(d)
|
||||||
|
conns := NewConns()
|
||||||
|
connData, err := ToConnData(d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
conns.Set(STORETYPE_STATE, connData)
|
||||||
|
conns.Set(STORETYPE_USER, connData)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ms := NewMenuStorageService(conns)
|
||||||
|
stateStore, err := ms.GetStateStore(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = ms.GetResource(ctx)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error getting resource")
|
||||||
|
}
|
||||||
|
userStore, err := ms.GetUserdataDb(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if userStore != stateStore {
|
||||||
|
t.Fatalf("expected same store, but they are %p and %p", userStore, stateStore)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user