Compare commits

..

No commits in common. "master" and "v0.1.0-beta" have entirely different histories.

11 changed files with 165 additions and 68 deletions

1
.gitignore vendored
View File

@ -1 +0,0 @@
.vscode

View File

@ -11,6 +11,7 @@ import (
"syscall" "syscall"
"time" "time"
"git.grassecon.net/urdt/ussd-data-connect/internal/pub"
"git.grassecon.net/urdt/ussd-data-connect/internal/store" "git.grassecon.net/urdt/ussd-data-connect/internal/store"
"git.grassecon.net/urdt/ussd-data-connect/internal/syncer" "git.grassecon.net/urdt/ussd-data-connect/internal/syncer"
"git.grassecon.net/urdt/ussd-data-connect/internal/util" "git.grassecon.net/urdt/ussd-data-connect/internal/util"
@ -57,7 +58,18 @@ func main() {
os.Exit(1) os.Exit(1)
} }
pub, err := pub.NewJetStreamPub(pub.JetStreamOpts{
Endpoint: ko.MustString("jetstream.endpoint"),
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour,
Logg: lo,
})
if err != nil {
lo.Error("could not initialize jetstream publisher", "error", err)
os.Exit(1)
}
syncer := syncer.New(syncer.SyncerOpts{ syncer := syncer.New(syncer.SyncerOpts{
Pub: pub,
Logg: lo, Logg: lo,
Store: store, Store: store,
}) })
@ -71,7 +83,7 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
// syncer.Run() syncer.Run()
}() }()
<-ctx.Done() <-ctx.Done()
@ -80,7 +92,7 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
// syncer.Stop() syncer.Stop()
defer wg.Done() defer wg.Done()
}() }()

5
go.mod
View File

@ -3,6 +3,7 @@ module git.grassecon.net/urdt/ussd-data-connect
go 1.23.3 go 1.23.3
require ( require (
github.com/georgysavva/scany/v2 v2.1.3
github.com/jackc/pgx/v5 v5.7.2 github.com/jackc/pgx/v5 v5.7.2
github.com/jackc/tern/v2 v2.3.2 github.com/jackc/tern/v2 v2.3.2
github.com/kamikazechaser/common v1.0.0 github.com/kamikazechaser/common v1.0.0
@ -11,6 +12,7 @@ require (
github.com/knadh/koanf/providers/env v1.0.0 github.com/knadh/koanf/providers/env v1.0.0
github.com/knadh/koanf/providers/file v1.1.2 github.com/knadh/koanf/providers/file v1.1.2
github.com/knadh/koanf/v2 v2.1.2 github.com/knadh/koanf/v2 v2.1.2
github.com/nats-io/nats.go v1.38.0
) )
require ( require (
@ -25,10 +27,13 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/lmittmann/tint v1.0.4 // indirect github.com/lmittmann/tint v1.0.4 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/nats-io/nkeys v0.4.9 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml v1.9.5 // indirect
github.com/shopspring/decimal v1.4.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect
github.com/spf13/cast v1.7.0 // indirect github.com/spf13/cast v1.7.0 // indirect

20
go.sum
View File

@ -6,6 +6,8 @@ github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+
github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
github.com/Masterminds/sprig/v3 v3.3.0 h1:mQh0Yrg1XPo6vjYXgtf5OtijNAKJRNcTdOOGZe3tPhs= github.com/Masterminds/sprig/v3 v3.3.0 h1:mQh0Yrg1XPo6vjYXgtf5OtijNAKJRNcTdOOGZe3tPhs=
github.com/Masterminds/sprig/v3 v3.3.0/go.mod h1:Zy1iXRYNqNLUolqCpL4uhk6SHUMAOSCzdgBfDb35Lz0= github.com/Masterminds/sprig/v3 v3.3.0/go.mod h1:Zy1iXRYNqNLUolqCpL4uhk6SHUMAOSCzdgBfDb35Lz0=
github.com/cockroachdb/cockroach-go/v2 v2.2.0 h1:/5znzg5n373N/3ESjHF5SMLxiW4RKB05Ql//KWfeTFs=
github.com/cockroachdb/cockroach-go/v2 v2.2.0/go.mod h1:u3MiKYGupPPjkn3ozknpMUpxPaNLTFWAya419/zv6eI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -13,9 +15,13 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/georgysavva/scany/v2 v2.1.3 h1:Zd4zm/ej79Den7tBSU2kaTDPAH64suq4qlQdhiBeGds=
github.com/georgysavva/scany/v2 v2.1.3/go.mod h1:fqp9yHZzM/PFVa3/rYEC57VmDx+KDch0LoqrJzkvtos=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
@ -36,6 +42,8 @@ github.com/jackc/tern/v2 v2.3.2/go.mod h1:cJYmwlpXLs3vBtbkfKdgoZL0G96mH56W+fugKx
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/kamikazechaser/common v1.0.0 h1:a/47O/TyQb417CUwsBDI7zlnJEnmhJz9czSPirpjlLg= github.com/kamikazechaser/common v1.0.0 h1:a/47O/TyQb417CUwsBDI7zlnJEnmhJz9czSPirpjlLg=
github.com/kamikazechaser/common v1.0.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY= github.com/kamikazechaser/common v1.0.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem4= github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem4=
github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI= github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI=
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
@ -53,6 +61,8 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E=
github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc=
github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
@ -60,8 +70,16 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
@ -71,6 +89,8 @@ github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+D
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=

89
internal/pub/pub.go Normal file
View File

@ -0,0 +1,89 @@
package pub
import (
"context"
"fmt"
"log/slog"
"time"
"git.grassecon.net/urdt/ussd-data-connect/pkg/event"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
type (
Pub interface {
Send(context.Context, event.Event) error
Close()
}
JetStreamOpts struct {
Endpoint string
PersistDuration time.Duration
Logg *slog.Logger
}
jetStreamPub struct {
js jetstream.JetStream
natsConn *nats.Conn
}
)
const streamName string = "USSD_DATA"
var streamSubjects = []string{
"USSD_DATA.*",
}
func NewJetStreamPub(o JetStreamOpts) (Pub, error) {
natsConn, err := nats.Connect(o.Endpoint)
if err != nil {
return nil, err
}
js, err := jetstream.New(natsConn)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
js.CreateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: streamSubjects,
MaxAge: o.PersistDuration,
Storage: jetstream.FileStorage,
Duplicates: time.Minute,
})
return &jetStreamPub{
natsConn: natsConn,
js: js,
}, nil
}
func (p *jetStreamPub) Close() {
if p.natsConn != nil {
p.natsConn.Close()
}
}
func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error {
data, err := payload.Serialize()
if err != nil {
return err
}
_, err = p.js.Publish(
ctx,
fmt.Sprintf("%s.%d", streamName, payload.Type),
data,
jetstream.WithMsgID(fmt.Sprintf("%d:%s", payload.Type, payload.Value)),
)
if err != nil {
return err
}
return nil
}

View File

@ -2,26 +2,28 @@ package syncer
import ( import (
"context" "context"
"fmt"
"log/slog" "log/slog"
"time" "time"
"git.grassecon.net/urdt/ussd-data-connect/internal/pub"
"git.grassecon.net/urdt/ussd-data-connect/internal/store" "git.grassecon.net/urdt/ussd-data-connect/internal/store"
"git.grassecon.net/urdt/ussd-data-connect/pkg/data" "git.grassecon.net/urdt/ussd-data-connect/pkg/data"
"git.grassecon.net/urdt/ussd-data-connect/pkg/event"
"github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
) )
const syncInterval = time.Hour * 1 const syncInterval = time.Second * 5
type ( type (
KVRow struct { KVRow struct {
ID int `db:"id"`
Key []byte `db:"key"` Key []byte `db:"key"`
Value []byte `db:"value"` Value []byte `db:"value"`
Updated time.Time `db:"updated"` Updated time.Time `db:"updated"`
} }
SyncerOpts struct { SyncerOpts struct {
Pub pub.Pub
Logg *slog.Logger Logg *slog.Logger
Store *store.Store Store *store.Store
} }
@ -29,6 +31,7 @@ type (
Syncer struct { Syncer struct {
interval time.Duration interval time.Duration
done chan struct{} done chan struct{}
pub pub.Pub
logg *slog.Logger logg *slog.Logger
store *store.Store store *store.Store
} }
@ -38,6 +41,7 @@ func New(o SyncerOpts) *Syncer {
return &Syncer{ return &Syncer{
done: make(chan struct{}), done: make(chan struct{}),
interval: syncInterval, interval: syncInterval,
pub: o.Pub,
logg: o.Logg, logg: o.Logg,
store: o.Store, store: o.Store,
} }
@ -72,45 +76,34 @@ func (s *Syncer) Process(ctx context.Context) error {
} }
defer rows.Close() defer rows.Close()
kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) { rs := pgxscan.NewRowScanner(rows)
var kvRow KVRow
err := row.Scan(&kvRow.ID, &kvRow.Key, &kvRow.Value, &kvRow.Updated)
return kvRow, err
})
if err != nil {
return err
}
var batchTimestamp *time.Time var batchTimestamp *time.Time
for rows.Next() {
var row KVRow
if err := rs.Scan(&row); err != nil {
return err
}
for _, row := range kvRows {
decodedKeyDataType, sessionID := data.DecodeKey(row.Key) decodedKeyDataType, sessionID := data.DecodeKey(row.Key)
column, exists := data.ValidDataTypeLookup[decodedKeyDataType] if _, ok := data.ValidDataTypeLookup[decodedKeyDataType]; ok {
if exists {
if batchTimestamp == nil { if batchTimestamp == nil {
batchTimestamp = &row.Updated batchTimestamp = &row.Updated
} }
decodedValue := data.DecodeValue(row.Value) decodedValue := data.DecodeValue(row.Value)
s.logg.Info("processing row", "id", row.ID) s.logg.Debug("processing row", "batch_timestamp", batchTimestamp, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated)
s.logg.Debug("processing row", "id", row.ID, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column) if err := s.pub.Send(ctx, event.Event{
query := fmt.Sprintf(` Timestamp: row.Updated.Unix(),
INSERT INTO ussd_data (phone_number, %s) Type: decodedKeyDataType,
VALUES ($1, $2) Value: decodedValue,
ON CONFLICT (phone_number) DO UPDATE }); err != nil {
SET %s = $2;
`, column, column)
_, err = tx.Exec(
ctx,
query,
sessionID,
decodedValue,
)
if err != nil {
return err return err
} }
} }
} }
if err := rows.Err(); err != nil {
return err
}
if batchTimestamp != nil { if batchTimestamp != nil {
_, err = tx.Exec(ctx, s.store.Queries.UpdateCursor, batchTimestamp) _, err = tx.Exec(ctx, s.store.Queries.UpdateCursor, batchTimestamp)

View File

@ -1,14 +0,0 @@
CREATE TABLE IF NOT EXISTS ussd_data(
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
phone_number TEXT UNIQUE,
blockchain_address TEXT UNIQUE,
first_name TEXT,
last_name TEXT,
yob INT,
location_name TEXT,
gender TEXT,
commodities TEXT,
active_voucher TEXT,
lang_code TEXT
);

View File

@ -8,13 +8,12 @@ const ()
// DecodeKey specifically only decodes user data keys stored as bytes into its respective session ID and data type // DecodeKey specifically only decodes user data keys stored as bytes into its respective session ID and data type
// TODO: Replace return data type with imported data types from the common package once lib-gdbm dependency is removed. // TODO: Replace return data type with imported data types from the common package once lib-gdbm dependency is removed.
// Note: 0x2e was added herehttps://holbrook.no/src/go-vise/file/db/db.go.html#l147, so we discard the last 3 bytes
func DecodeKey(key []byte) (uint16, string) { func DecodeKey(key []byte) (uint16, string) {
if key[0] != keyPrefix { if key[0] != keyPrefix {
return 0, "" return 0, ""
} }
return binary.BigEndian.Uint16(key[len(key)-2:]), string(key[1 : len(key)-3]) return binary.BigEndian.Uint16(key[len(key)-2:]), string(key[1 : len(key)-2])
} }
// DecodeValue returns the utf-8 string representation of the value stored in the storage backend // DecodeValue returns the utf-8 string representation of the value stored in the storage backend

View File

@ -21,10 +21,10 @@ func TestDecodeKey(t *testing.T) {
{ {
"blockchain_address", "blockchain_address",
args{ args{
keyBytesHex: "202b3235343731313737373733342e0001", keyBytesHex: "202b3235343731313030303132330001",
}, },
want{ want{
sessionID: "+254711777734", sessionID: "+254711000123",
dataType: ACCOUNT_BLOCKCHAIN_ADDRESS, dataType: ACCOUNT_BLOCKCHAIN_ADDRESS,
}, },
}, },

View File

@ -13,18 +13,15 @@ const (
ACCOUNT_GENDER = 7 ACCOUNT_GENDER = 7
ACCOUNT_COMMODITIES = 8 ACCOUNT_COMMODITIES = 8
ACCOUNT_ACTIVE_VOUCHER = 17 ACCOUNT_ACTIVE_VOUCHER = 17
ACCOUNT_LANG_CODE = 19
) )
// ValidDataTypeLookup allows us to filter go-vise data types, additionally the value maps to the ussd_data coulmn var ValidDataTypeLookup = map[uint16]struct{}{
var ValidDataTypeLookup = map[uint16]string{ ACCOUNT_BLOCKCHAIN_ADDRESS: {},
ACCOUNT_BLOCKCHAIN_ADDRESS: "blockchain_address", ACCOUNT_FIRST_NAME: {},
ACCOUNT_FIRST_NAME: "first_name", ACCOUNT_LAST_NAME: {},
ACCOUNT_LAST_NAME: "last_name", ACCOUNT_YOB: {},
ACCOUNT_YOB: "yob", ACCOUNT_LOCATION: {},
ACCOUNT_LOCATION: "location_name", ACCOUNT_GENDER: {},
ACCOUNT_GENDER: "gender", ACCOUNT_COMMODITIES: {},
ACCOUNT_COMMODITIES: "commodities", ACCOUNT_ACTIVE_VOUCHER: {},
ACCOUNT_ACTIVE_VOUCHER: "active_voucher",
ACCOUNT_LANG_CODE: "lang_code",
} }

View File

@ -1,11 +1,8 @@
--name: extract-entries --name: extract-entries
SELECT id, key, value, updated FROM kv_vise SELECT key, value, updated FROM kv_vise
WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1) WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1)
ORDER BY updated DESC ORDER BY updated DESC
--name: update-cursor --name: update-cursor
-- $1: timestamp -- $1: timestamp
UPDATE ussd_sync SET last_sync = ($1) UPDATE ussd_sync SET last_sync = ($1)
-- 202b3235343732343934323039370013
-- 202b3235343731313030303132330013