Compare commits

...

3 Commits

Author SHA1 Message Date
ad4963e5a0
(temp): fix extract entries query
Some checks failed
release / docker (push) Has been cancelled
2025-03-24 12:23:53 +03:00
fbd5f28d54
(temp): fix missing ID scan
Some checks failed
release / docker (push) Has been cancelled
2025-03-24 12:21:00 +03:00
8b7e6ca8bc
(temp): migration (disable dual syncer)
Some checks failed
release / docker (push) Has been cancelled
2025-03-24 12:17:26 +03:00
3 changed files with 11 additions and 6 deletions

View File

@ -71,7 +71,7 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
syncer.Run() // syncer.Run()
}() }()
<-ctx.Done() <-ctx.Done()
@ -80,7 +80,7 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
syncer.Stop() // syncer.Stop()
defer wg.Done() defer wg.Done()
}() }()

View File

@ -15,6 +15,7 @@ const syncInterval = time.Hour * 1
type ( type (
KVRow struct { KVRow struct {
ID int `db:"id"`
Key []byte `db:"key"` Key []byte `db:"key"`
Value []byte `db:"value"` Value []byte `db:"value"`
Updated time.Time `db:"updated"` Updated time.Time `db:"updated"`
@ -73,7 +74,7 @@ func (s *Syncer) Process(ctx context.Context) error {
kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) { kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) {
var kvRow KVRow var kvRow KVRow
err := row.Scan(&kvRow.Key, &kvRow.Value, &kvRow.Updated) err := row.Scan(&kvRow.ID, &kvRow.Key, &kvRow.Value, &kvRow.Updated)
return kvRow, err return kvRow, err
}) })
if err != nil { if err != nil {
@ -91,7 +92,8 @@ func (s *Syncer) Process(ctx context.Context) error {
} }
decodedValue := data.DecodeValue(row.Value) decodedValue := data.DecodeValue(row.Value)
s.logg.Debug("processing row", "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column) s.logg.Info("processing row", "id", row.ID)
s.logg.Debug("processing row", "id", row.ID, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column)
query := fmt.Sprintf(` query := fmt.Sprintf(`
INSERT INTO ussd_data (phone_number, %s) INSERT INTO ussd_data (phone_number, %s)
VALUES ($1, $2) VALUES ($1, $2)

View File

@ -1,8 +1,11 @@
--name: extract-entries --name: extract-entries
SELECT key, value, updated FROM kv_vise SELECT id, key, value, updated FROM kv_vise
WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1) WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1)
ORDER BY updated DESC ORDER BY updated DESC
--name: update-cursor --name: update-cursor
-- $1: timestamp -- $1: timestamp
UPDATE ussd_sync SET last_sync = ($1) UPDATE ussd_sync SET last_sync = ($1)
-- 202b3235343732343934323039370013
-- 202b3235343731313030303132330013