From cd5b31d07ed9a6152d360ed8653201cf5024fd50 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 9 Jan 2025 09:28:41 +0300 Subject: [PATCH] feat: switch to postgres only sync, removed nats publishing --- cmd/main.go | 12 ----- go.mod | 5 -- go.sum | 20 -------- internal/pub/pub.go | 89 ------------------------------------ internal/syncer/syncer.go | 51 +++++++++++---------- migrations/002_ussd_data.sql | 14 ++++++ pkg/data/type.go | 19 ++++---- 7 files changed, 52 insertions(+), 158 deletions(-) delete mode 100644 internal/pub/pub.go create mode 100644 migrations/002_ussd_data.sql diff --git a/cmd/main.go b/cmd/main.go index ef69b50..9f86c1d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -11,7 +11,6 @@ import ( "syscall" "time" - "git.grassecon.net/urdt/ussd-data-connect/internal/pub" "git.grassecon.net/urdt/ussd-data-connect/internal/store" "git.grassecon.net/urdt/ussd-data-connect/internal/syncer" "git.grassecon.net/urdt/ussd-data-connect/internal/util" @@ -58,18 +57,7 @@ func main() { os.Exit(1) } - pub, err := pub.NewJetStreamPub(pub.JetStreamOpts{ - Endpoint: ko.MustString("jetstream.endpoint"), - PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, - Logg: lo, - }) - if err != nil { - lo.Error("could not initialize jetstream publisher", "error", err) - os.Exit(1) - } - syncer := syncer.New(syncer.SyncerOpts{ - Pub: pub, Logg: lo, Store: store, }) diff --git a/go.mod b/go.mod index 16f0df7..8bdcc8e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module git.grassecon.net/urdt/ussd-data-connect go 1.23.3 require ( - github.com/georgysavva/scany/v2 v2.1.3 github.com/jackc/pgx/v5 v5.7.2 github.com/jackc/tern/v2 v2.3.2 github.com/kamikazechaser/common v1.0.0 @@ -12,7 +11,6 @@ require ( github.com/knadh/koanf/providers/env v1.0.0 github.com/knadh/koanf/providers/file v1.1.2 github.com/knadh/koanf/v2 v2.1.2 - github.com/nats-io/nats.go v1.38.0 ) require ( @@ -27,13 +25,10 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/klauspost/compress v1.17.9 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/lmittmann/tint v1.0.4 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect - github.com/nats-io/nkeys v0.4.9 // indirect - github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/spf13/cast v1.7.0 // indirect diff --git a/go.sum b/go.sum index 8fdeaf5..c1fbfb4 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+ github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/Masterminds/sprig/v3 v3.3.0 h1:mQh0Yrg1XPo6vjYXgtf5OtijNAKJRNcTdOOGZe3tPhs= github.com/Masterminds/sprig/v3 v3.3.0/go.mod h1:Zy1iXRYNqNLUolqCpL4uhk6SHUMAOSCzdgBfDb35Lz0= -github.com/cockroachdb/cockroach-go/v2 v2.2.0 h1:/5znzg5n373N/3ESjHF5SMLxiW4RKB05Ql//KWfeTFs= -github.com/cockroachdb/cockroach-go/v2 v2.2.0/go.mod h1:u3MiKYGupPPjkn3ozknpMUpxPaNLTFWAya419/zv6eI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -15,13 +13,9 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/georgysavva/scany/v2 v2.1.3 h1:Zd4zm/ej79Den7tBSU2kaTDPAH64suq4qlQdhiBeGds= -github.com/georgysavva/scany/v2 v2.1.3/go.mod h1:fqp9yHZzM/PFVa3/rYEC57VmDx+KDch0LoqrJzkvtos= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= -github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= -github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -42,8 +36,6 @@ github.com/jackc/tern/v2 v2.3.2/go.mod h1:cJYmwlpXLs3vBtbkfKdgoZL0G96mH56W+fugKx github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/kamikazechaser/common v1.0.0 h1:a/47O/TyQb417CUwsBDI7zlnJEnmhJz9czSPirpjlLg= github.com/kamikazechaser/common v1.0.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem4= github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= @@ -61,8 +53,6 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= -github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= @@ -70,16 +60,8 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1 github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= -github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= -github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= -github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= @@ -89,8 +71,6 @@ github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+D github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= diff --git a/internal/pub/pub.go b/internal/pub/pub.go deleted file mode 100644 index 2bf2129..0000000 --- a/internal/pub/pub.go +++ /dev/null @@ -1,89 +0,0 @@ -package pub - -import ( - "context" - "fmt" - "log/slog" - "time" - - "git.grassecon.net/urdt/ussd-data-connect/pkg/event" - "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" -) - -type ( - Pub interface { - Send(context.Context, event.Event) error - Close() - } - - JetStreamOpts struct { - Endpoint string - PersistDuration time.Duration - Logg *slog.Logger - } - - jetStreamPub struct { - js jetstream.JetStream - natsConn *nats.Conn - } -) - -const streamName string = "USSD_DATA" - -var streamSubjects = []string{ - "USSD_DATA.*", -} - -func NewJetStreamPub(o JetStreamOpts) (Pub, error) { - natsConn, err := nats.Connect(o.Endpoint) - if err != nil { - return nil, err - } - - js, err := jetstream.New(natsConn) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - js.CreateStream(ctx, jetstream.StreamConfig{ - Name: streamName, - Subjects: streamSubjects, - MaxAge: o.PersistDuration, - Storage: jetstream.FileStorage, - Duplicates: time.Minute, - }) - - return &jetStreamPub{ - natsConn: natsConn, - js: js, - }, nil -} - -func (p *jetStreamPub) Close() { - if p.natsConn != nil { - p.natsConn.Close() - } -} - -func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error { - data, err := payload.Serialize() - if err != nil { - return err - } - - _, err = p.js.Publish( - ctx, - fmt.Sprintf("%s.%d", streamName, payload.Type), - data, - jetstream.WithMsgID(fmt.Sprintf("%d:%s", payload.Type, payload.Value)), - ) - if err != nil { - return err - } - - return nil -} diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 42a1479..e81d6f6 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -2,14 +2,12 @@ package syncer import ( "context" + "fmt" "log/slog" "time" - "git.grassecon.net/urdt/ussd-data-connect/internal/pub" "git.grassecon.net/urdt/ussd-data-connect/internal/store" "git.grassecon.net/urdt/ussd-data-connect/pkg/data" - "git.grassecon.net/urdt/ussd-data-connect/pkg/event" - "github.com/georgysavva/scany/v2/pgxscan" "github.com/jackc/pgx/v5" ) @@ -23,7 +21,6 @@ type ( } SyncerOpts struct { - Pub pub.Pub Logg *slog.Logger Store *store.Store } @@ -31,7 +28,6 @@ type ( Syncer struct { interval time.Duration done chan struct{} - pub pub.Pub logg *slog.Logger store *store.Store } @@ -41,7 +37,6 @@ func New(o SyncerOpts) *Syncer { return &Syncer{ done: make(chan struct{}), interval: syncInterval, - pub: o.Pub, logg: o.Logg, store: o.Store, } @@ -76,34 +71,44 @@ func (s *Syncer) Process(ctx context.Context) error { } defer rows.Close() - rs := pgxscan.NewRowScanner(rows) - var batchTimestamp *time.Time - for rows.Next() { - var row KVRow - if err := rs.Scan(&row); err != nil { - return err - } + kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) { + var kvRow KVRow + err := row.Scan(&kvRow.Key, &kvRow.Value, &kvRow.Updated) + return kvRow, err + }) + if err != nil { + return err + } + var batchTimestamp *time.Time + + for _, row := range kvRows { decodedKeyDataType, sessionID := data.DecodeKey(row.Key) - if _, ok := data.ValidDataTypeLookup[decodedKeyDataType]; ok { + column, exists := data.ValidDataTypeLookup[decodedKeyDataType] + if exists { if batchTimestamp == nil { batchTimestamp = &row.Updated } decodedValue := data.DecodeValue(row.Value) - s.logg.Debug("processing row", "batch_timestamp", batchTimestamp, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated) - if err := s.pub.Send(ctx, event.Event{ - Timestamp: row.Updated.Unix(), - Type: decodedKeyDataType, - Value: decodedValue, - }); err != nil { + s.logg.Debug("processing row", "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column) + query := fmt.Sprintf(` + INSERT INTO ussd_data (phone_number, %s) + VALUES ($1, $2) + ON CONFLICT (phone_number) DO UPDATE + SET %s = $2; + `, column, column) + _, err = tx.Exec( + ctx, + query, + sessionID, + decodedValue, + ) + if err != nil { return err } } } - if err := rows.Err(); err != nil { - return err - } if batchTimestamp != nil { _, err = tx.Exec(ctx, s.store.Queries.UpdateCursor, batchTimestamp) diff --git a/migrations/002_ussd_data.sql b/migrations/002_ussd_data.sql new file mode 100644 index 0000000..cf2763a --- /dev/null +++ b/migrations/002_ussd_data.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS ussd_data( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + phone_number TEXT UNIQUE, + blockchain_address TEXT UNIQUE, + first_name TEXT, + last_name TEXT, + yob INT, + location_name TEXT, + gender TEXT, + commodities TEXT, + active_voucher TEXT, + lang_code TEXT +); + diff --git a/pkg/data/type.go b/pkg/data/type.go index 79f3b94..9450ebd 100644 --- a/pkg/data/type.go +++ b/pkg/data/type.go @@ -15,13 +15,14 @@ const ( ACCOUNT_ACTIVE_VOUCHER = 17 ) -var ValidDataTypeLookup = map[uint16]struct{}{ - ACCOUNT_BLOCKCHAIN_ADDRESS: {}, - ACCOUNT_FIRST_NAME: {}, - ACCOUNT_LAST_NAME: {}, - ACCOUNT_YOB: {}, - ACCOUNT_LOCATION: {}, - ACCOUNT_GENDER: {}, - ACCOUNT_COMMODITIES: {}, - ACCOUNT_ACTIVE_VOUCHER: {}, +// ValidDataTypeLookup allows us to filter go-vise data types, additionally the value maps to the ussd_data coulmn +var ValidDataTypeLookup = map[uint16]string{ + ACCOUNT_BLOCKCHAIN_ADDRESS: "blockchain_address", + ACCOUNT_FIRST_NAME: "first_name", + ACCOUNT_LAST_NAME: "last_name", + ACCOUNT_YOB: "yob", + ACCOUNT_LOCATION: "location_name", + ACCOUNT_GENDER: "gender", + ACCOUNT_COMMODITIES: "commodities", + ACCOUNT_ACTIVE_VOUCHER: "active_voucher", }