From 4968cdff37cc8a9ab5e3f20db99fce6c684e25e6 Mon Sep 17 00:00:00 2001 From: alfred-mk Date: Thu, 17 Oct 2024 12:47:57 +0300 Subject: [PATCH] switch to postgres once the flag is set --- internal/storage/postgres.go | 115 +++++++++++++++++++++++++++++ internal/storage/storageservice.go | 53 ++++++++++--- 2 files changed, 156 insertions(+), 12 deletions(-) create mode 100644 internal/storage/postgres.go diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go new file mode 100644 index 0000000..ed4a711 --- /dev/null +++ b/internal/storage/postgres.go @@ -0,0 +1,115 @@ +package storage + +import ( + "context" + + "git.defalsify.org/vise.git/db" + postgres "git.defalsify.org/vise.git/db/postgres" + "git.defalsify.org/vise.git/lang" +) + +var ( + pdbC map[string]chan db.Db +) + +type ThreadPostgresDb struct { + db db.Db + connStr string +} + +func NewThreadPostgresDb() *ThreadPostgresDb { + if pdbC == nil { + pdbC = make(map[string]chan db.Db) + } + return &ThreadPostgresDb{} +} + +func (tpdb *ThreadPostgresDb) Connect(ctx context.Context, connStr string) error { + var ok bool + _, ok = pdbC[connStr] + if ok { + logg.WarnCtxf(ctx, "already registered thread postgres, skipping", "connStr", connStr) + return nil + } + postgresdb := postgres.NewPgDb().WithSchema("public") + err := postgresdb.Connect(ctx, connStr) + if err != nil { + return err + } + pdbC[connStr] = make(chan db.Db, 1) + pdbC[connStr] <- postgresdb + tpdb.connStr = connStr + return nil +} + +func (tpdb *ThreadPostgresDb) reserve() { + if tpdb.db == nil { + tpdb.db = <-pdbC[tpdb.connStr] + } +} + +func (tpdb *ThreadPostgresDb) release() { + if tpdb.db == nil { + return + } + pdbC[tpdb.connStr] <- tpdb.db + tpdb.db = nil +} + +func (tpdb *ThreadPostgresDb) SetPrefix(pfx uint8) { + tpdb.reserve() + tpdb.db.SetPrefix(pfx) +} + +func (tpdb *ThreadPostgresDb) SetSession(sessionId string) { + tpdb.reserve() + tpdb.db.SetSession(sessionId) +} + +func (tpdb *ThreadPostgresDb) SetLanguage(lng *lang.Language) { + tpdb.reserve() + tpdb.db.SetLanguage(lng) +} + +func (tpdb *ThreadPostgresDb) Safe() bool { + tpdb.reserve() + v := tpdb.db.Safe() + tpdb.release() + return v +} + +func (tpdb *ThreadPostgresDb) Prefix() uint8 { + tpdb.reserve() + v := tpdb.db.Prefix() + tpdb.release() + return v +} + +func (tpdb *ThreadPostgresDb) SetLock(typ uint8, locked bool) error { + tpdb.reserve() + err := tpdb.db.SetLock(typ, locked) + tpdb.release() + return err +} + +func (tpdb *ThreadPostgresDb) Put(ctx context.Context, key []byte, val []byte) error { + tpdb.reserve() + err := tpdb.db.Put(ctx, key, val) + tpdb.release() + return err +} + +func (tpdb *ThreadPostgresDb) Get(ctx context.Context, key []byte) ([]byte, error) { + tpdb.reserve() + v, err := tpdb.db.Get(ctx, key) + tpdb.release() + return v, err +} + +func (tpdb *ThreadPostgresDb) Close() error { + tpdb.reserve() + close(pdbC[tpdb.connStr]) + err := tpdb.db.Close() + tpdb.db = nil + return err +} diff --git a/internal/storage/storageservice.go b/internal/storage/storageservice.go index 07bccd6..d99e0ca 100644 --- a/internal/storage/storageservice.go +++ b/internal/storage/storageservice.go @@ -8,14 +8,15 @@ import ( "git.defalsify.org/vise.git/db" fsdb "git.defalsify.org/vise.git/db/fs" + "git.defalsify.org/vise.git/logging" "git.defalsify.org/vise.git/persist" "git.defalsify.org/vise.git/resource" - "git.defalsify.org/vise.git/logging" + "git.grassecon.net/urdt/ussd/initializers" ) var ( logg = logging.NewVanilla().WithDomain("storage") -) +) type StorageService interface { GetPersister(ctx context.Context) (*persist.Persister, error) @@ -24,17 +25,30 @@ type StorageService interface { EnsureDbDir() error } -type MenuStorageService struct{ - dbDir string - resourceDir string +type MenuStorageService struct { + dbDir string + resourceDir string resourceStore db.Db - stateStore db.Db + stateStore db.Db userDataStore db.Db } +func buildConnStr() string { + host := initializers.GetEnv("DB_HOST", "localhost") + user := initializers.GetEnv("DB_USER", "postgres") + password := initializers.GetEnv("DB_PASSWORD", "") + dbName := initializers.GetEnv("DB_NAME", "") + port := initializers.GetEnv("DB_PORT", "5432") + + return fmt.Sprintf( + "postgres://%s:%s@%s:%s/%s", + user, password, host, port, dbName, + ) +} + func NewMenuStorageService(dbDir string, resourceDir string) *MenuStorageService { return &MenuStorageService{ - dbDir: dbDir, + dbDir: dbDir, resourceDir: resourceDir, } } @@ -52,12 +66,27 @@ func (ms *MenuStorageService) GetPersister(ctx context.Context) (*persist.Persis } func (ms *MenuStorageService) GetUserdataDb(ctx context.Context) (db.Db, error) { - ms.userDataStore = NewThreadGdbmDb() - storeFile := path.Join(ms.dbDir, "userdata.gdbm") - err := ms.userDataStore.Connect(ctx, storeFile) - if err != nil { - return nil, err + database, ok := ctx.Value("Database").(string) + if !ok { + fmt.Println("The database is not set") } + + if database == "postgres" { + ms.userDataStore = NewThreadPostgresDb() + connStr := buildConnStr() + err := ms.userDataStore.Connect(ctx, connStr) + if err != nil { + return nil, err + } + } else { + ms.userDataStore = NewThreadGdbmDb() + storeFile := path.Join(ms.dbDir, "userdata.gdbm") + err := ms.userDataStore.Connect(ctx, storeFile) + if err != nil { + return nil, err + } + } + return ms.userDataStore, nil }