feat: working state syncer
This commit is contained in:
121
internal/store/store.go
Normal file
121
internal/store/store.go
Normal file
@@ -0,0 +1,121 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/jackc/tern/v2/migrate"
|
||||
"github.com/knadh/goyesql/v2"
|
||||
)
|
||||
|
||||
type (
|
||||
Queries struct {
|
||||
ExtractEntries string `query:"extract-entries"`
|
||||
UpdateCursor string `query:"update-cursor"`
|
||||
}
|
||||
|
||||
StoreOpts struct {
|
||||
Logg *slog.Logger
|
||||
DSN string
|
||||
MigrationsFolderPath string
|
||||
QueriesFolderPath string
|
||||
}
|
||||
|
||||
Store struct {
|
||||
Provider *pgxpool.Pool
|
||||
Queries *Queries
|
||||
}
|
||||
)
|
||||
|
||||
func NewStore(o StoreOpts) (*Store, error) {
|
||||
parsedConfig, err := pgxpool.ParseConfig(o.DSN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dbPool, err := pgxpool.NewWithConfig(context.Background(), parsedConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queries, err := loadQueries(o.QueriesFolderPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := runMigrations(dbPool, o.MigrationsFolderPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o.Logg.Info("migrations ran successfully")
|
||||
|
||||
return &Store{
|
||||
Provider: dbPool,
|
||||
Queries: queries,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Store) ExecuteTransaction(ctx context.Context, fn func(tx pgx.Tx) error) error {
|
||||
tx, err := s.Provider.Begin(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
tx.Rollback(ctx)
|
||||
} else {
|
||||
tx.Commit(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
if err = fn(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadQueries(queriesPath string) (*Queries, error) {
|
||||
parsedQueries, err := goyesql.ParseFile(queriesPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
loadedQueries := &Queries{}
|
||||
|
||||
if err := goyesql.ScanToStruct(loadedQueries, parsedQueries, nil); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan queries %v", err)
|
||||
}
|
||||
|
||||
return loadedQueries, nil
|
||||
}
|
||||
|
||||
func runMigrations(dbPool *pgxpool.Pool, migrationsPath string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
conn, err := dbPool.Acquire(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Release()
|
||||
|
||||
migrator, err := migrate.NewMigrator(ctx, conn.Conn(), "schema_version")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := migrator.LoadMigrations(os.DirFS(migrationsPath)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := migrator.Migrate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
104
internal/syncer/syncer.go
Normal file
104
internal/syncer/syncer.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package syncer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"git.grassecon.net/urdt/ussd-data-connect/internal/store"
|
||||
"git.grassecon.net/urdt/ussd-data-connect/pkg/data"
|
||||
"github.com/georgysavva/scany/v2/pgxscan"
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
const syncInterval = time.Second * 10
|
||||
|
||||
type (
|
||||
KVRow struct {
|
||||
Key []byte `db:"key"`
|
||||
Value []byte `db:"value"`
|
||||
Updated time.Time `db:"updated"`
|
||||
}
|
||||
|
||||
SyncerOpts struct {
|
||||
Logg *slog.Logger
|
||||
Store *store.Store
|
||||
}
|
||||
|
||||
Syncer struct {
|
||||
logg *slog.Logger
|
||||
interval time.Duration
|
||||
done chan struct{}
|
||||
store *store.Store
|
||||
}
|
||||
)
|
||||
|
||||
func New(o SyncerOpts) *Syncer {
|
||||
return &Syncer{
|
||||
done: make(chan struct{}),
|
||||
interval: syncInterval,
|
||||
logg: o.Logg,
|
||||
store: o.Store,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) Run() {
|
||||
ticker := time.NewTicker(s.interval)
|
||||
s.logg.Info("syncer ticker started")
|
||||
for {
|
||||
select {
|
||||
case <-s.done:
|
||||
ticker.Stop()
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.logg.Debug("syncer tick")
|
||||
if err := s.process(context.Background()); err != nil {
|
||||
s.logg.Error("failed to process sync tick", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) Stop() {
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
func (s *Syncer) process(ctx context.Context) error {
|
||||
return s.store.ExecuteTransaction(ctx, func(tx pgx.Tx) error {
|
||||
rows, err := tx.Query(ctx, s.store.Queries.ExtractEntries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
rs := pgxscan.NewRowScanner(rows)
|
||||
var batchTimestamp *time.Time
|
||||
for rows.Next() {
|
||||
var row KVRow
|
||||
if err := rs.Scan(&row); err != nil {
|
||||
return err
|
||||
}
|
||||
if batchTimestamp == nil {
|
||||
batchTimestamp = &row.Updated
|
||||
}
|
||||
decodedKeyDataType, sessionID := data.DecodeKey(row.Key)
|
||||
decodedValue := data.DecodeValue(row.Value)
|
||||
|
||||
if _, ok := data.ValidDataTypeLookup[decodedKeyDataType]; ok {
|
||||
s.logg.Debug("processing row", "batch_timestamp", batchTimestamp, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated)
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if batchTimestamp != nil {
|
||||
_, err = tx.Exec(ctx, s.store.Queries.UpdateCursor, batchTimestamp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
53
internal/util/init.go
Normal file
53
internal/util/init.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/kamikazechaser/common/logg"
|
||||
"github.com/knadh/koanf/parsers/toml"
|
||||
"github.com/knadh/koanf/providers/env"
|
||||
"github.com/knadh/koanf/providers/file"
|
||||
"github.com/knadh/koanf/v2"
|
||||
)
|
||||
|
||||
func InitLogger() *slog.Logger {
|
||||
loggOpts := logg.LoggOpts{
|
||||
FormatType: logg.Logfmt,
|
||||
LogLevel: slog.LevelInfo,
|
||||
}
|
||||
|
||||
if os.Getenv("DEBUG") != "" {
|
||||
loggOpts.LogLevel = slog.LevelDebug
|
||||
}
|
||||
|
||||
if os.Getenv("DEV") != "" {
|
||||
loggOpts.LogLevel = slog.LevelDebug
|
||||
loggOpts.FormatType = logg.Human
|
||||
}
|
||||
|
||||
return logg.NewLogg(loggOpts)
|
||||
}
|
||||
|
||||
func InitConfig(lo *slog.Logger, confFilePath string) *koanf.Koanf {
|
||||
var (
|
||||
ko = koanf.New(".")
|
||||
)
|
||||
|
||||
confFile := file.Provider(confFilePath)
|
||||
if err := ko.Load(confFile, toml.Parser()); err != nil {
|
||||
lo.Error("could not parse configuration file", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err := ko.Load(env.Provider("SYNC_", ".", func(s string) string {
|
||||
return strings.ReplaceAll(strings.ToLower(
|
||||
strings.TrimPrefix(s, "SYNC_")), "__", ".")
|
||||
}), nil); err != nil {
|
||||
lo.Error("could not override config from env vars", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return ko
|
||||
}
|
||||
Reference in New Issue
Block a user