122 lines
2.2 KiB
Go
122 lines
2.2 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/jackc/tern/v2/migrate"
|
|
"github.com/knadh/goyesql/v2"
|
|
)
|
|
|
|
type (
|
|
Queries struct {
|
|
ExtractEntries string `query:"extract-entries"`
|
|
UpdateCursor string `query:"update-cursor"`
|
|
}
|
|
|
|
StoreOpts struct {
|
|
Logg *slog.Logger
|
|
DSN string
|
|
MigrationsFolderPath string
|
|
QueriesFolderPath string
|
|
}
|
|
|
|
Store struct {
|
|
Provider *pgxpool.Pool
|
|
Queries *Queries
|
|
}
|
|
)
|
|
|
|
func NewStore(o StoreOpts) (*Store, error) {
|
|
parsedConfig, err := pgxpool.ParseConfig(o.DSN)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dbPool, err := pgxpool.NewWithConfig(context.Background(), parsedConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queries, err := loadQueries(o.QueriesFolderPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := runMigrations(dbPool, o.MigrationsFolderPath); err != nil {
|
|
return nil, err
|
|
}
|
|
o.Logg.Info("migrations ran successfully")
|
|
|
|
return &Store{
|
|
Provider: dbPool,
|
|
Queries: queries,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Store) ExecuteTransaction(ctx context.Context, fn func(tx pgx.Tx) error) error {
|
|
tx, err := s.Provider.Begin(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
tx.Rollback(ctx)
|
|
} else {
|
|
tx.Commit(ctx)
|
|
}
|
|
}()
|
|
|
|
if err = fn(tx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func loadQueries(queriesPath string) (*Queries, error) {
|
|
parsedQueries, err := goyesql.ParseFile(queriesPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
loadedQueries := &Queries{}
|
|
|
|
if err := goyesql.ScanToStruct(loadedQueries, parsedQueries, nil); err != nil {
|
|
return nil, fmt.Errorf("failed to scan queries %v", err)
|
|
}
|
|
|
|
return loadedQueries, nil
|
|
}
|
|
|
|
func runMigrations(dbPool *pgxpool.Pool, migrationsPath string) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
|
|
conn, err := dbPool.Acquire(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Release()
|
|
|
|
migrator, err := migrate.NewMigrator(ctx, conn.Conn(), "schema_version")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := migrator.LoadMigrations(os.DirFS(migrationsPath)); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := migrator.Migrate(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|