forked from grassrootseconomics/visedriver
		
	switch to postgres once the flag is set
This commit is contained in:
		
							parent
							
								
									1aeb18379c
								
							
						
					
					
						commit
						4968cdff37
					
				
							
								
								
									
										115
									
								
								internal/storage/postgres.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										115
									
								
								internal/storage/postgres.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,115 @@ | ||||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"git.defalsify.org/vise.git/db" | ||||
| 	postgres "git.defalsify.org/vise.git/db/postgres" | ||||
| 	"git.defalsify.org/vise.git/lang" | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	pdbC map[string]chan db.Db | ||||
| ) | ||||
| 
 | ||||
| type ThreadPostgresDb struct { | ||||
| 	db      db.Db | ||||
| 	connStr string | ||||
| } | ||||
| 
 | ||||
| func NewThreadPostgresDb() *ThreadPostgresDb { | ||||
| 	if pdbC == nil { | ||||
| 		pdbC = make(map[string]chan db.Db) | ||||
| 	} | ||||
| 	return &ThreadPostgresDb{} | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) Connect(ctx context.Context, connStr string) error { | ||||
| 	var ok bool | ||||
| 	_, ok = pdbC[connStr] | ||||
| 	if ok { | ||||
| 		logg.WarnCtxf(ctx, "already registered thread postgres, skipping", "connStr", connStr) | ||||
| 		return nil | ||||
| 	} | ||||
| 	postgresdb := postgres.NewPgDb().WithSchema("public") | ||||
| 	err := postgresdb.Connect(ctx, connStr) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	pdbC[connStr] = make(chan db.Db, 1) | ||||
| 	pdbC[connStr] <- postgresdb | ||||
| 	tpdb.connStr = connStr | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) reserve() { | ||||
| 	if tpdb.db == nil { | ||||
| 		tpdb.db = <-pdbC[tpdb.connStr] | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) release() { | ||||
| 	if tpdb.db == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	pdbC[tpdb.connStr] <- tpdb.db | ||||
| 	tpdb.db = nil | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) SetPrefix(pfx uint8) { | ||||
| 	tpdb.reserve() | ||||
| 	tpdb.db.SetPrefix(pfx) | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) SetSession(sessionId string) { | ||||
| 	tpdb.reserve() | ||||
| 	tpdb.db.SetSession(sessionId) | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) SetLanguage(lng *lang.Language) { | ||||
| 	tpdb.reserve() | ||||
| 	tpdb.db.SetLanguage(lng) | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) Safe() bool { | ||||
| 	tpdb.reserve() | ||||
| 	v := tpdb.db.Safe() | ||||
| 	tpdb.release() | ||||
| 	return v | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) Prefix() uint8 { | ||||
| 	tpdb.reserve() | ||||
| 	v := tpdb.db.Prefix() | ||||
| 	tpdb.release() | ||||
| 	return v | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) SetLock(typ uint8, locked bool) error { | ||||
| 	tpdb.reserve() | ||||
| 	err := tpdb.db.SetLock(typ, locked) | ||||
| 	tpdb.release() | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) Put(ctx context.Context, key []byte, val []byte) error { | ||||
| 	tpdb.reserve() | ||||
| 	err := tpdb.db.Put(ctx, key, val) | ||||
| 	tpdb.release() | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) Get(ctx context.Context, key []byte) ([]byte, error) { | ||||
| 	tpdb.reserve() | ||||
| 	v, err := tpdb.db.Get(ctx, key) | ||||
| 	tpdb.release() | ||||
| 	return v, err | ||||
| } | ||||
| 
 | ||||
| func (tpdb *ThreadPostgresDb) Close() error { | ||||
| 	tpdb.reserve() | ||||
| 	close(pdbC[tpdb.connStr]) | ||||
| 	err := tpdb.db.Close() | ||||
| 	tpdb.db = nil | ||||
| 	return err | ||||
| } | ||||
| @ -8,14 +8,15 @@ import ( | ||||
| 
 | ||||
| 	"git.defalsify.org/vise.git/db" | ||||
| 	fsdb "git.defalsify.org/vise.git/db/fs" | ||||
| 	"git.defalsify.org/vise.git/logging" | ||||
| 	"git.defalsify.org/vise.git/persist" | ||||
| 	"git.defalsify.org/vise.git/resource" | ||||
| 	"git.defalsify.org/vise.git/logging" | ||||
| 	"git.grassecon.net/urdt/ussd/initializers" | ||||
| ) | ||||
| 
 | ||||
| var ( | ||||
| 	logg = logging.NewVanilla().WithDomain("storage") | ||||
| )	 | ||||
| ) | ||||
| 
 | ||||
| type StorageService interface { | ||||
| 	GetPersister(ctx context.Context) (*persist.Persister, error) | ||||
| @ -24,17 +25,30 @@ type StorageService interface { | ||||
| 	EnsureDbDir() error | ||||
| } | ||||
| 
 | ||||
| type MenuStorageService struct{ | ||||
| 	dbDir string | ||||
| 	resourceDir string | ||||
| type MenuStorageService struct { | ||||
| 	dbDir         string | ||||
| 	resourceDir   string | ||||
| 	resourceStore db.Db | ||||
| 	stateStore db.Db | ||||
| 	stateStore    db.Db | ||||
| 	userDataStore db.Db | ||||
| } | ||||
| 
 | ||||
| func buildConnStr() string { | ||||
| 	host := initializers.GetEnv("DB_HOST", "localhost") | ||||
| 	user := initializers.GetEnv("DB_USER", "postgres") | ||||
| 	password := initializers.GetEnv("DB_PASSWORD", "") | ||||
| 	dbName := initializers.GetEnv("DB_NAME", "") | ||||
| 	port := initializers.GetEnv("DB_PORT", "5432") | ||||
| 
 | ||||
| 	return fmt.Sprintf( | ||||
| 		"postgres://%s:%s@%s:%s/%s", | ||||
| 		user, password, host, port, dbName, | ||||
| 	) | ||||
| } | ||||
| 
 | ||||
| func NewMenuStorageService(dbDir string, resourceDir string) *MenuStorageService { | ||||
| 	return &MenuStorageService{ | ||||
| 		dbDir: dbDir, | ||||
| 		dbDir:       dbDir, | ||||
| 		resourceDir: resourceDir, | ||||
| 	} | ||||
| } | ||||
| @ -52,12 +66,27 @@ func (ms *MenuStorageService) GetPersister(ctx context.Context) (*persist.Persis | ||||
| } | ||||
| 
 | ||||
| func (ms *MenuStorageService) GetUserdataDb(ctx context.Context) (db.Db, error) { | ||||
| 	ms.userDataStore = NewThreadGdbmDb() | ||||
| 	storeFile := path.Join(ms.dbDir, "userdata.gdbm") | ||||
| 	err := ms.userDataStore.Connect(ctx, storeFile) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	database, ok := ctx.Value("Database").(string) | ||||
| 	if !ok { | ||||
| 		fmt.Println("The database is not set") | ||||
| 	} | ||||
| 
 | ||||
| 	if database == "postgres" { | ||||
| 		ms.userDataStore = NewThreadPostgresDb() | ||||
| 		connStr := buildConnStr() | ||||
| 		err := ms.userDataStore.Connect(ctx, connStr) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} else { | ||||
| 		ms.userDataStore = NewThreadGdbmDb() | ||||
| 		storeFile := path.Join(ms.dbDir, "userdata.gdbm") | ||||
| 		err := ms.userDataStore.Connect(ctx, storeFile) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return ms.userDataStore, nil | ||||
| } | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user