diff --git a/cmd/main.go b/cmd/main.go index 3466b9a..d8e7d37 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -11,6 +11,7 @@ import ( "syscall" "time" + "git.grassecon.net/urdt/ussd-data-connect/internal/pub" "git.grassecon.net/urdt/ussd-data-connect/internal/store" "git.grassecon.net/urdt/ussd-data-connect/internal/syncer" "git.grassecon.net/urdt/ussd-data-connect/internal/util" @@ -57,11 +58,28 @@ func main() { os.Exit(1) } + pub, err := pub.NewJetStreamPub(pub.JetStreamOpts{ + Endpoint: ko.MustString("jetstream.endpoint"), + PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, + Logg: lo, + }) + if err != nil { + lo.Error("could not initialize jetstream publisher", "error", err) + os.Exit(1) + } + syncer := syncer.New(syncer.SyncerOpts{ + Pub: pub, Logg: lo, Store: store, }) + // Trigger an initial sync before the ticker routine takes over + if err := syncer.Process(context.Background()); err != nil { + lo.Error("failed to process initial sync", "error", err) + os.Exit(1) + } + wg.Add(1) go func() { defer wg.Done() diff --git a/config.toml b/config.toml index 8b585e7..bb98f99 100644 --- a/config.toml +++ b/config.toml @@ -9,4 +9,5 @@ dsn = "postgres://postgres:postgres@127.0.0.1:5432/urdt_ussd" [jetstream] endpoint = "nats://127.0.0.1:4222" +persist_duration_hrs = 24 id = "ussd-sync" diff --git a/go.mod b/go.mod index c57c4c7..16f0df7 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/knadh/koanf/providers/env v1.0.0 github.com/knadh/koanf/providers/file v1.1.2 github.com/knadh/koanf/v2 v2.1.2 + github.com/nats-io/nats.go v1.38.0 ) require ( @@ -26,10 +27,13 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/lmittmann/tint v1.0.4 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.9 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/spf13/cast v1.7.0 // indirect diff --git a/go.sum b/go.sum index a1240ac..8fdeaf5 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/jackc/tern/v2 v2.3.2/go.mod h1:cJYmwlpXLs3vBtbkfKdgoZL0G96mH56W+fugKx github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/kamikazechaser/common v1.0.0 h1:a/47O/TyQb417CUwsBDI7zlnJEnmhJz9czSPirpjlLg= github.com/kamikazechaser/common v1.0.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem4= github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= @@ -68,6 +70,12 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1 github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= +github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/internal/pub/pub.go b/internal/pub/pub.go new file mode 100644 index 0000000..2bf2129 --- /dev/null +++ b/internal/pub/pub.go @@ -0,0 +1,89 @@ +package pub + +import ( + "context" + "fmt" + "log/slog" + "time" + + "git.grassecon.net/urdt/ussd-data-connect/pkg/event" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +type ( + Pub interface { + Send(context.Context, event.Event) error + Close() + } + + JetStreamOpts struct { + Endpoint string + PersistDuration time.Duration + Logg *slog.Logger + } + + jetStreamPub struct { + js jetstream.JetStream + natsConn *nats.Conn + } +) + +const streamName string = "USSD_DATA" + +var streamSubjects = []string{ + "USSD_DATA.*", +} + +func NewJetStreamPub(o JetStreamOpts) (Pub, error) { + natsConn, err := nats.Connect(o.Endpoint) + if err != nil { + return nil, err + } + + js, err := jetstream.New(natsConn) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + js.CreateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: streamSubjects, + MaxAge: o.PersistDuration, + Storage: jetstream.FileStorage, + Duplicates: time.Minute, + }) + + return &jetStreamPub{ + natsConn: natsConn, + js: js, + }, nil +} + +func (p *jetStreamPub) Close() { + if p.natsConn != nil { + p.natsConn.Close() + } +} + +func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error { + data, err := payload.Serialize() + if err != nil { + return err + } + + _, err = p.js.Publish( + ctx, + fmt.Sprintf("%s.%d", streamName, payload.Type), + data, + jetstream.WithMsgID(fmt.Sprintf("%d:%s", payload.Type, payload.Value)), + ) + if err != nil { + return err + } + + return nil +} diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 9ca5511..42a1479 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -5,13 +5,15 @@ import ( "log/slog" "time" + "git.grassecon.net/urdt/ussd-data-connect/internal/pub" "git.grassecon.net/urdt/ussd-data-connect/internal/store" "git.grassecon.net/urdt/ussd-data-connect/pkg/data" + "git.grassecon.net/urdt/ussd-data-connect/pkg/event" "github.com/georgysavva/scany/v2/pgxscan" "github.com/jackc/pgx/v5" ) -const syncInterval = time.Second * 10 +const syncInterval = time.Second * 5 type ( KVRow struct { @@ -21,14 +23,16 @@ type ( } SyncerOpts struct { + Pub pub.Pub Logg *slog.Logger Store *store.Store } Syncer struct { - logg *slog.Logger interval time.Duration done chan struct{} + pub pub.Pub + logg *slog.Logger store *store.Store } ) @@ -37,6 +41,7 @@ func New(o SyncerOpts) *Syncer { return &Syncer{ done: make(chan struct{}), interval: syncInterval, + pub: o.Pub, logg: o.Logg, store: o.Store, } @@ -52,7 +57,7 @@ func (s *Syncer) Run() { return case <-ticker.C: s.logg.Debug("syncer tick") - if err := s.process(context.Background()); err != nil { + if err := s.Process(context.Background()); err != nil { s.logg.Error("failed to process sync tick", "error", err) } } @@ -63,7 +68,7 @@ func (s *Syncer) Stop() { close(s.done) } -func (s *Syncer) process(ctx context.Context) error { +func (s *Syncer) Process(ctx context.Context) error { return s.store.ExecuteTransaction(ctx, func(tx pgx.Tx) error { rows, err := tx.Query(ctx, s.store.Queries.ExtractEntries) if err != nil { @@ -78,14 +83,22 @@ func (s *Syncer) process(ctx context.Context) error { if err := rs.Scan(&row); err != nil { return err } - if batchTimestamp == nil { - batchTimestamp = &row.Updated - } - decodedKeyDataType, sessionID := data.DecodeKey(row.Key) - decodedValue := data.DecodeValue(row.Value) + decodedKeyDataType, sessionID := data.DecodeKey(row.Key) if _, ok := data.ValidDataTypeLookup[decodedKeyDataType]; ok { + if batchTimestamp == nil { + batchTimestamp = &row.Updated + } + decodedValue := data.DecodeValue(row.Value) + s.logg.Debug("processing row", "batch_timestamp", batchTimestamp, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated) + if err := s.pub.Send(ctx, event.Event{ + Timestamp: row.Updated.Unix(), + Type: decodedKeyDataType, + Value: decodedValue, + }); err != nil { + return err + } } } if err := rows.Err(); err != nil { diff --git a/pkg/event/event.go b/pkg/event/event.go new file mode 100644 index 0000000..cce9bdc --- /dev/null +++ b/pkg/event/event.go @@ -0,0 +1,32 @@ +package event + +import "encoding/json" + +type ( + Event struct { + Timestamp int64 `json:"timestamp"` + Type uint16 `json:"type"` + Value string `json:"payload"` + } +) + +func (e Event) Serialize() ([]byte, error) { + jsonData, err := json.Marshal(e) + if err != nil { + return nil, err + } + + return jsonData, err +} + +func Deserialize(jsonData []byte) (Event, error) { + var ( + event Event + ) + + if err := json.Unmarshal(jsonData, &event); err != nil { + return event, err + } + + return event, nil +}