ussd-data-connect/internal/syncer/syncer.go

123 lines
2.5 KiB
Go
Raw Normal View History

2025-01-02 10:05:09 +01:00
package syncer
import (
"context"
"fmt"
2025-01-02 10:05:09 +01:00
"log/slog"
"time"
"git.grassecon.net/urdt/ussd-data-connect/internal/store"
"git.grassecon.net/urdt/ussd-data-connect/pkg/data"
"github.com/jackc/pgx/v5"
)
// syncInterval is the period between sync ticks for a Syncer built by New.
const syncInterval = 5 * time.Second
2025-01-02 10:05:09 +01:00
type (
KVRow struct {
Key []byte `db:"key"`
Value []byte `db:"value"`
Updated time.Time `db:"updated"`
}
SyncerOpts struct {
Logg *slog.Logger
Store *store.Store
}
Syncer struct {
interval time.Duration
done chan struct{}
logg *slog.Logger
2025-01-02 10:05:09 +01:00
store *store.Store
}
)
// New builds a Syncer from the given options. The returned Syncer ticks every
// syncInterval and does nothing until Run is called.
func New(o SyncerOpts) *Syncer {
	s := new(Syncer)
	s.interval = syncInterval
	s.done = make(chan struct{})
	s.logg = o.Logg
	s.store = o.Store
	return s
}
func (s *Syncer) Run() {
ticker := time.NewTicker(s.interval)
s.logg.Info("syncer ticker started")
for {
select {
case <-s.done:
ticker.Stop()
return
case <-ticker.C:
s.logg.Debug("syncer tick")
if err := s.Process(context.Background()); err != nil {
2025-01-02 10:05:09 +01:00
s.logg.Error("failed to process sync tick", "error", err)
}
}
}
}
// Stop tells Run to return by closing the done channel.
//
// Calling Stop again after a previous call has returned is a no-op instead of
// a "close of closed channel" panic. The check is not synchronized, so Stop
// must not be called from several goroutines at the same time.
func (s *Syncer) Stop() {
	select {
	case <-s.done:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(s.done)
	}
}
func (s *Syncer) Process(ctx context.Context) error {
2025-01-02 10:05:09 +01:00
return s.store.ExecuteTransaction(ctx, func(tx pgx.Tx) error {
rows, err := tx.Query(ctx, s.store.Queries.ExtractEntries)
if err != nil {
return err
}
defer rows.Close()
kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) {
var kvRow KVRow
err := row.Scan(&kvRow.Key, &kvRow.Value, &kvRow.Updated)
return kvRow, err
})
if err != nil {
return err
}
2025-01-02 10:05:09 +01:00
var batchTimestamp *time.Time
for _, row := range kvRows {
decodedKeyDataType, sessionID := data.DecodeKey(row.Key)
column, exists := data.ValidDataTypeLookup[decodedKeyDataType]
if exists {
if batchTimestamp == nil {
batchTimestamp = &row.Updated
}
decodedValue := data.DecodeValue(row.Value)
s.logg.Debug("processing row", "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column)
query := fmt.Sprintf(`
INSERT INTO ussd_data (phone_number, %s)
VALUES ($1, $2)
ON CONFLICT (phone_number) DO UPDATE
SET %s = $2;
`, column, column)
_, err = tx.Exec(
ctx,
query,
sessionID,
decodedValue,
)
if err != nil {
return err
}
2025-01-02 10:05:09 +01:00
}
}
if batchTimestamp != nil {
_, err = tx.Exec(ctx, s.store.Queries.UpdateCursor, batchTimestamp)
if err != nil {
return err
}
}
return nil
})
}