feat: add JS publisher, fix syncer batch timestamp to correctly use user data only
This commit is contained in:
parent
4f9a95dfeb
commit
3c68e1384c
18
cmd/main.go
18
cmd/main.go
@ -11,6 +11,7 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.grassecon.net/urdt/ussd-data-connect/internal/pub"
|
||||||
"git.grassecon.net/urdt/ussd-data-connect/internal/store"
|
"git.grassecon.net/urdt/ussd-data-connect/internal/store"
|
||||||
"git.grassecon.net/urdt/ussd-data-connect/internal/syncer"
|
"git.grassecon.net/urdt/ussd-data-connect/internal/syncer"
|
||||||
"git.grassecon.net/urdt/ussd-data-connect/internal/util"
|
"git.grassecon.net/urdt/ussd-data-connect/internal/util"
|
||||||
@ -57,11 +58,28 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub, err := pub.NewJetStreamPub(pub.JetStreamOpts{
|
||||||
|
Endpoint: ko.MustString("jetstream.endpoint"),
|
||||||
|
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour,
|
||||||
|
Logg: lo,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
lo.Error("could not initialize jetstream publisher", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
syncer := syncer.New(syncer.SyncerOpts{
|
syncer := syncer.New(syncer.SyncerOpts{
|
||||||
|
Pub: pub,
|
||||||
Logg: lo,
|
Logg: lo,
|
||||||
Store: store,
|
Store: store,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Trigger an initial sync before the ticker routine takes over
|
||||||
|
if err := syncer.Process(context.Background()); err != nil {
|
||||||
|
lo.Error("failed to process initial sync", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
@ -9,4 +9,5 @@ dsn = "postgres://postgres:postgres@127.0.0.1:5432/urdt_ussd"
|
|||||||
|
|
||||||
[jetstream]
|
[jetstream]
|
||||||
endpoint = "nats://127.0.0.1:4222"
|
endpoint = "nats://127.0.0.1:4222"
|
||||||
|
persist_duration_hrs = 24
|
||||||
id = "ussd-sync"
|
id = "ussd-sync"
|
||||||
|
4
go.mod
4
go.mod
@ -12,6 +12,7 @@ require (
|
|||||||
github.com/knadh/koanf/providers/env v1.0.0
|
github.com/knadh/koanf/providers/env v1.0.0
|
||||||
github.com/knadh/koanf/providers/file v1.1.2
|
github.com/knadh/koanf/providers/file v1.1.2
|
||||||
github.com/knadh/koanf/v2 v2.1.2
|
github.com/knadh/koanf/v2 v2.1.2
|
||||||
|
github.com/nats-io/nats.go v1.38.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@ -26,10 +27,13 @@ require (
|
|||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
|
github.com/klauspost/compress v1.17.9 // indirect
|
||||||
github.com/knadh/koanf/maps v0.1.1 // indirect
|
github.com/knadh/koanf/maps v0.1.1 // indirect
|
||||||
github.com/lmittmann/tint v1.0.4 // indirect
|
github.com/lmittmann/tint v1.0.4 // indirect
|
||||||
github.com/mitchellh/copystructure v1.2.0 // indirect
|
github.com/mitchellh/copystructure v1.2.0 // indirect
|
||||||
github.com/mitchellh/reflectwalk v1.0.2 // indirect
|
github.com/mitchellh/reflectwalk v1.0.2 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.4.9 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/pelletier/go-toml v1.9.5 // indirect
|
github.com/pelletier/go-toml v1.9.5 // indirect
|
||||||
github.com/shopspring/decimal v1.4.0 // indirect
|
github.com/shopspring/decimal v1.4.0 // indirect
|
||||||
github.com/spf13/cast v1.7.0 // indirect
|
github.com/spf13/cast v1.7.0 // indirect
|
||||||
|
8
go.sum
8
go.sum
@ -42,6 +42,8 @@ github.com/jackc/tern/v2 v2.3.2/go.mod h1:cJYmwlpXLs3vBtbkfKdgoZL0G96mH56W+fugKx
|
|||||||
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
|
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
|
||||||
github.com/kamikazechaser/common v1.0.0 h1:a/47O/TyQb417CUwsBDI7zlnJEnmhJz9czSPirpjlLg=
|
github.com/kamikazechaser/common v1.0.0 h1:a/47O/TyQb417CUwsBDI7zlnJEnmhJz9czSPirpjlLg=
|
||||||
github.com/kamikazechaser/common v1.0.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY=
|
github.com/kamikazechaser/common v1.0.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY=
|
||||||
|
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
|
||||||
|
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||||
github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem4=
|
github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem4=
|
||||||
github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI=
|
github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI=
|
||||||
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
|
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
|
||||||
@ -68,6 +70,12 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1
|
|||||||
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
|
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
|
||||||
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
|
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
|
||||||
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
|
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
|
||||||
|
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
|
||||||
|
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
|
||||||
|
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
|
||||||
|
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
|
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
|
||||||
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
|
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
|
||||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||||
|
89
internal/pub/pub.go
Normal file
89
internal/pub/pub.go
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
package pub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.grassecon.net/urdt/ussd-data-connect/pkg/event"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
Pub interface {
|
||||||
|
Send(context.Context, event.Event) error
|
||||||
|
Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
JetStreamOpts struct {
|
||||||
|
Endpoint string
|
||||||
|
PersistDuration time.Duration
|
||||||
|
Logg *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
jetStreamPub struct {
|
||||||
|
js jetstream.JetStream
|
||||||
|
natsConn *nats.Conn
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
const streamName string = "USSD_DATA"
|
||||||
|
|
||||||
|
var streamSubjects = []string{
|
||||||
|
"USSD_DATA.*",
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewJetStreamPub(o JetStreamOpts) (Pub, error) {
|
||||||
|
natsConn, err := nats.Connect(o.Endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
js, err := jetstream.New(natsConn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
js.CreateStream(ctx, jetstream.StreamConfig{
|
||||||
|
Name: streamName,
|
||||||
|
Subjects: streamSubjects,
|
||||||
|
MaxAge: o.PersistDuration,
|
||||||
|
Storage: jetstream.FileStorage,
|
||||||
|
Duplicates: time.Minute,
|
||||||
|
})
|
||||||
|
|
||||||
|
return &jetStreamPub{
|
||||||
|
natsConn: natsConn,
|
||||||
|
js: js,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *jetStreamPub) Close() {
|
||||||
|
if p.natsConn != nil {
|
||||||
|
p.natsConn.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error {
|
||||||
|
data, err := payload.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = p.js.Publish(
|
||||||
|
ctx,
|
||||||
|
fmt.Sprintf("%s.%d", streamName, payload.Type),
|
||||||
|
data,
|
||||||
|
jetstream.WithMsgID(fmt.Sprintf("%d:%s", payload.Type, payload.Value)),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -5,13 +5,15 @@ import (
|
|||||||
"log/slog"
|
"log/slog"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.grassecon.net/urdt/ussd-data-connect/internal/pub"
|
||||||
"git.grassecon.net/urdt/ussd-data-connect/internal/store"
|
"git.grassecon.net/urdt/ussd-data-connect/internal/store"
|
||||||
"git.grassecon.net/urdt/ussd-data-connect/pkg/data"
|
"git.grassecon.net/urdt/ussd-data-connect/pkg/data"
|
||||||
|
"git.grassecon.net/urdt/ussd-data-connect/pkg/event"
|
||||||
"github.com/georgysavva/scany/v2/pgxscan"
|
"github.com/georgysavva/scany/v2/pgxscan"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
)
|
)
|
||||||
|
|
||||||
const syncInterval = time.Second * 10
|
const syncInterval = time.Second * 5
|
||||||
|
|
||||||
type (
|
type (
|
||||||
KVRow struct {
|
KVRow struct {
|
||||||
@ -21,14 +23,16 @@ type (
|
|||||||
}
|
}
|
||||||
|
|
||||||
SyncerOpts struct {
|
SyncerOpts struct {
|
||||||
|
Pub pub.Pub
|
||||||
Logg *slog.Logger
|
Logg *slog.Logger
|
||||||
Store *store.Store
|
Store *store.Store
|
||||||
}
|
}
|
||||||
|
|
||||||
Syncer struct {
|
Syncer struct {
|
||||||
logg *slog.Logger
|
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
pub pub.Pub
|
||||||
|
logg *slog.Logger
|
||||||
store *store.Store
|
store *store.Store
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@ -37,6 +41,7 @@ func New(o SyncerOpts) *Syncer {
|
|||||||
return &Syncer{
|
return &Syncer{
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
interval: syncInterval,
|
interval: syncInterval,
|
||||||
|
pub: o.Pub,
|
||||||
logg: o.Logg,
|
logg: o.Logg,
|
||||||
store: o.Store,
|
store: o.Store,
|
||||||
}
|
}
|
||||||
@ -52,7 +57,7 @@ func (s *Syncer) Run() {
|
|||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
s.logg.Debug("syncer tick")
|
s.logg.Debug("syncer tick")
|
||||||
if err := s.process(context.Background()); err != nil {
|
if err := s.Process(context.Background()); err != nil {
|
||||||
s.logg.Error("failed to process sync tick", "error", err)
|
s.logg.Error("failed to process sync tick", "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -63,7 +68,7 @@ func (s *Syncer) Stop() {
|
|||||||
close(s.done)
|
close(s.done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Syncer) process(ctx context.Context) error {
|
func (s *Syncer) Process(ctx context.Context) error {
|
||||||
return s.store.ExecuteTransaction(ctx, func(tx pgx.Tx) error {
|
return s.store.ExecuteTransaction(ctx, func(tx pgx.Tx) error {
|
||||||
rows, err := tx.Query(ctx, s.store.Queries.ExtractEntries)
|
rows, err := tx.Query(ctx, s.store.Queries.ExtractEntries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -78,14 +83,22 @@ func (s *Syncer) process(ctx context.Context) error {
|
|||||||
if err := rs.Scan(&row); err != nil {
|
if err := rs.Scan(&row); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
decodedKeyDataType, sessionID := data.DecodeKey(row.Key)
|
||||||
|
if _, ok := data.ValidDataTypeLookup[decodedKeyDataType]; ok {
|
||||||
if batchTimestamp == nil {
|
if batchTimestamp == nil {
|
||||||
batchTimestamp = &row.Updated
|
batchTimestamp = &row.Updated
|
||||||
}
|
}
|
||||||
decodedKeyDataType, sessionID := data.DecodeKey(row.Key)
|
|
||||||
decodedValue := data.DecodeValue(row.Value)
|
decodedValue := data.DecodeValue(row.Value)
|
||||||
|
|
||||||
if _, ok := data.ValidDataTypeLookup[decodedKeyDataType]; ok {
|
|
||||||
s.logg.Debug("processing row", "batch_timestamp", batchTimestamp, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated)
|
s.logg.Debug("processing row", "batch_timestamp", batchTimestamp, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated)
|
||||||
|
if err := s.pub.Send(ctx, event.Event{
|
||||||
|
Timestamp: row.Updated.Unix(),
|
||||||
|
Type: decodedKeyDataType,
|
||||||
|
Value: decodedValue,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
|
32
pkg/event/event.go
Normal file
32
pkg/event/event.go
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
package event
|
||||||
|
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
|
type (
|
||||||
|
Event struct {
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
Type uint16 `json:"type"`
|
||||||
|
Value string `json:"payload"`
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (e Event) Serialize() ([]byte, error) {
|
||||||
|
jsonData, err := json.Marshal(e)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonData, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func Deserialize(jsonData []byte) (Event, error) {
|
||||||
|
var (
|
||||||
|
event Event
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := json.Unmarshal(jsonData, &event); err != nil {
|
||||||
|
return event, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return event, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user