// Package syncer periodically runs the store's ExtractEntries query and
// upserts the decoded rows into the ussd_data table.
package syncer

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"git.grassecon.net/urdt/ussd-data-connect/internal/store"
	"git.grassecon.net/urdt/ussd-data-connect/pkg/data"
	"github.com/jackc/pgx/v5"
)

// syncInterval is the delay between two consecutive sync ticks.
const syncInterval = time.Second * 5

type (
	// KVRow is a single row produced by the ExtractEntries query: the raw
	// key and value bytes plus the row's "updated" timestamp.
	KVRow struct {
		Key     []byte    `db:"key"`
		Value   []byte    `db:"value"`
		Updated time.Time `db:"updated"`
	}

	// SyncerOpts carries the dependencies handed to New.
	SyncerOpts struct {
		Logg  *slog.Logger
		Store *store.Store
	}

	// Syncer runs Process on a fixed interval until Stop is called.
	Syncer struct {
		interval time.Duration
		done     chan struct{}
		logg     *slog.Logger
		store    *store.Store
	}
)

// New builds a Syncer from o that ticks every syncInterval. The returned
// Syncer does nothing until Run is called.
func New(o SyncerOpts) *Syncer {
	return &Syncer{
		done:     make(chan struct{}),
		interval: syncInterval,
		logg:     o.Logg,
		store:    o.Store,
	}
}

// Run blocks, calling Process once per tick, and returns after Stop has been
// called. A failing tick is logged and does not end the loop.
func (s *Syncer) Run() {
	ticker := time.NewTicker(s.interval)
	// Deferred so the ticker is released on every exit path.
	defer ticker.Stop()

	s.logg.Info("syncer ticker started")

	for {
		select {
		case <-s.done:
			return
		case <-ticker.C:
			s.logg.Debug("syncer tick")
			if err := s.Process(context.Background()); err != nil {
				s.logg.Error("failed to process sync tick", "error", err)
			}
		}
	}
}

// Stop signals Run to return by closing the done channel. It must be called
// at most once: closing an already closed channel panics.
func (s *Syncer) Stop() {
	close(s.done)
}

// Process runs one sync pass inside a single store transaction. It reads the
// rows returned by ExtractEntries, upserts every row whose decoded key type
// is present in data.ValidDataTypeLookup into ussd_data, and then runs
// UpdateCursor with the timestamp of the first such row. When no row
// qualifies the cursor is left untouched.
//
// Any error is returned wrapped (compatible with errors.Is / errors.As) from
// inside the transaction callback.
func (s *Syncer) Process(ctx context.Context) error {
	return s.store.ExecuteTransaction(ctx, func(tx pgx.Tx) error {
		rows, err := tx.Query(ctx, s.store.Queries.ExtractEntries)
		if err != nil {
			return fmt.Errorf("querying entries: %w", err)
		}
		defer rows.Close()

		kvRows, err := pgx.CollectRows(rows, scanKVRow)
		if err != nil {
			return fmt.Errorf("collecting entries: %w", err)
		}

		var batchTimestamp *time.Time
		for _, row := range kvRows {
			decodedKeyDataType, sessionID := data.DecodeKey(row.Key)

			column, exists := data.ValidDataTypeLookup[decodedKeyDataType]
			if !exists {
				continue
			}

			if batchTimestamp == nil {
				// Copy the value instead of taking &row.Updated: before Go 1.22
				// the range variable is shared between iterations, so a pointer
				// into it would end up holding the last row's timestamp rather
				// than the first matching one.
				// NOTE(review): this relies on ExtractEntries returning rows in
				// an order where the first matching row carries the timestamp
				// the cursor should move to — confirm against the query.
				firstUpdated := row.Updated
				batchTimestamp = &firstUpdated
			}

			decodedValue := data.DecodeValue(row.Value)
			s.logg.Debug("processing row", "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column)

			// column is taken from data.ValidDataTypeLookup, never from the row
			// itself, so interpolating it into the statement is limited to the
			// names defined in that table. Values still go through placeholders.
			query := fmt.Sprintf(` INSERT INTO ussd_data (phone_number, %s) VALUES ($1, $2) ON CONFLICT (phone_number) DO UPDATE SET %s = $2; 
`, column, column)

			if _, err := tx.Exec(ctx, query, sessionID, decodedValue); err != nil {
				return fmt.Errorf("upserting column %v: %w", column, err)
			}
		}

		if batchTimestamp == nil {
			return nil
		}

		if _, err := tx.Exec(ctx, s.store.Queries.UpdateCursor, batchTimestamp); err != nil {
			return fmt.Errorf("updating cursor: %w", err)
		}
		return nil
	})
}

// scanKVRow scans the current row's three columns into a KVRow, in the order
// key, value, updated.
func scanKVRow(row pgx.CollectableRow) (KVRow, error) {
	var kvRow KVRow
	err := row.Scan(&kvRow.Key, &kvRow.Value, &kvRow.Updated)
	return kvRow, err
}