feat: switch to postgres only sync, removed nats publishing
Some checks failed
release / docker (push) Has been cancelled
Some checks failed
release / docker (push) Has been cancelled
This commit is contained in:
@@ -1,89 +0,0 @@
|
||||
package pub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"git.grassecon.net/urdt/ussd-data-connect/pkg/event"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
type (
|
||||
Pub interface {
|
||||
Send(context.Context, event.Event) error
|
||||
Close()
|
||||
}
|
||||
|
||||
JetStreamOpts struct {
|
||||
Endpoint string
|
||||
PersistDuration time.Duration
|
||||
Logg *slog.Logger
|
||||
}
|
||||
|
||||
jetStreamPub struct {
|
||||
js jetstream.JetStream
|
||||
natsConn *nats.Conn
|
||||
}
|
||||
)
|
||||
|
||||
const streamName string = "USSD_DATA"
|
||||
|
||||
var streamSubjects = []string{
|
||||
"USSD_DATA.*",
|
||||
}
|
||||
|
||||
func NewJetStreamPub(o JetStreamOpts) (Pub, error) {
|
||||
natsConn, err := nats.Connect(o.Endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
js, err := jetstream.New(natsConn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
js.CreateStream(ctx, jetstream.StreamConfig{
|
||||
Name: streamName,
|
||||
Subjects: streamSubjects,
|
||||
MaxAge: o.PersistDuration,
|
||||
Storage: jetstream.FileStorage,
|
||||
Duplicates: time.Minute,
|
||||
})
|
||||
|
||||
return &jetStreamPub{
|
||||
natsConn: natsConn,
|
||||
js: js,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *jetStreamPub) Close() {
|
||||
if p.natsConn != nil {
|
||||
p.natsConn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error {
|
||||
data, err := payload.Serialize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = p.js.Publish(
|
||||
ctx,
|
||||
fmt.Sprintf("%s.%d", streamName, payload.Type),
|
||||
data,
|
||||
jetstream.WithMsgID(fmt.Sprintf("%d:%s", payload.Type, payload.Value)),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -2,14 +2,12 @@ package syncer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"git.grassecon.net/urdt/ussd-data-connect/internal/pub"
|
||||
"git.grassecon.net/urdt/ussd-data-connect/internal/store"
|
||||
"git.grassecon.net/urdt/ussd-data-connect/pkg/data"
|
||||
"git.grassecon.net/urdt/ussd-data-connect/pkg/event"
|
||||
"github.com/georgysavva/scany/v2/pgxscan"
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
@@ -23,7 +21,6 @@ type (
|
||||
}
|
||||
|
||||
SyncerOpts struct {
|
||||
Pub pub.Pub
|
||||
Logg *slog.Logger
|
||||
Store *store.Store
|
||||
}
|
||||
@@ -31,7 +28,6 @@ type (
|
||||
Syncer struct {
|
||||
interval time.Duration
|
||||
done chan struct{}
|
||||
pub pub.Pub
|
||||
logg *slog.Logger
|
||||
store *store.Store
|
||||
}
|
||||
@@ -41,7 +37,6 @@ func New(o SyncerOpts) *Syncer {
|
||||
return &Syncer{
|
||||
done: make(chan struct{}),
|
||||
interval: syncInterval,
|
||||
pub: o.Pub,
|
||||
logg: o.Logg,
|
||||
store: o.Store,
|
||||
}
|
||||
@@ -76,34 +71,44 @@ func (s *Syncer) Process(ctx context.Context) error {
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
rs := pgxscan.NewRowScanner(rows)
|
||||
var batchTimestamp *time.Time
|
||||
for rows.Next() {
|
||||
var row KVRow
|
||||
if err := rs.Scan(&row); err != nil {
|
||||
return err
|
||||
}
|
||||
kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) {
|
||||
var kvRow KVRow
|
||||
err := row.Scan(&kvRow.Key, &kvRow.Value, &kvRow.Updated)
|
||||
return kvRow, err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var batchTimestamp *time.Time
|
||||
|
||||
for _, row := range kvRows {
|
||||
decodedKeyDataType, sessionID := data.DecodeKey(row.Key)
|
||||
if _, ok := data.ValidDataTypeLookup[decodedKeyDataType]; ok {
|
||||
column, exists := data.ValidDataTypeLookup[decodedKeyDataType]
|
||||
if exists {
|
||||
if batchTimestamp == nil {
|
||||
batchTimestamp = &row.Updated
|
||||
}
|
||||
decodedValue := data.DecodeValue(row.Value)
|
||||
|
||||
s.logg.Debug("processing row", "batch_timestamp", batchTimestamp, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated)
|
||||
if err := s.pub.Send(ctx, event.Event{
|
||||
Timestamp: row.Updated.Unix(),
|
||||
Type: decodedKeyDataType,
|
||||
Value: decodedValue,
|
||||
}); err != nil {
|
||||
s.logg.Debug("processing row", "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column)
|
||||
query := fmt.Sprintf(`
|
||||
INSERT INTO ussd_data (phone_number, %s)
|
||||
VALUES ($1, $2)
|
||||
ON CONFLICT (phone_number) DO UPDATE
|
||||
SET %s = $2;
|
||||
`, column, column)
|
||||
_, err = tx.Exec(
|
||||
ctx,
|
||||
query,
|
||||
sessionID,
|
||||
decodedValue,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if batchTimestamp != nil {
|
||||
_, err = tx.Exec(ctx, s.store.Queries.UpdateCursor, batchTimestamp)
|
||||
|
||||
Reference in New Issue
Block a user