From 4f9a95dfebe72e936a7ff2a28f04265e66e7217e Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 2 Jan 2025 12:05:09 +0300 Subject: [PATCH] feat: working state syncer --- cmd/main.go | 99 +++++++++++++++++++++++ config.toml | 12 +++ go.mod | 37 +++++++++ go.sum | 108 +++++++++++++++++++++++++ internal/store/store.go | 121 ++++++++++++++++++++++++++++ internal/syncer/syncer.go | 104 ++++++++++++++++++++++++ internal/util/init.go | 53 ++++++++++++ migrations/001_init_sync_cursor.sql | 4 + pkg/data/type.go | 11 +++ queries.sql | 8 ++ 10 files changed, 557 insertions(+) create mode 100644 cmd/main.go create mode 100644 config.toml create mode 100644 internal/store/store.go create mode 100644 internal/syncer/syncer.go create mode 100644 internal/util/init.go create mode 100644 migrations/001_init_sync_cursor.sql create mode 100644 queries.sql diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..3466b9a --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,99 @@ +package main + +import ( + "context" + "errors" + "flag" + "log/slog" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "git.grassecon.net/urdt/ussd-data-connect/internal/store" + "git.grassecon.net/urdt/ussd-data-connect/internal/syncer" + "git.grassecon.net/urdt/ussd-data-connect/internal/util" + "github.com/knadh/koanf/v2" +) + +const defaultGracefulShutdownPeriod = time.Second * 20 + +var ( + build = "dev" + + confFlag string + migrationsFolderFlag string + queriesFlag string + + lo *slog.Logger + ko *koanf.Koanf +) + +func init() { + flag.StringVar(&confFlag, "config", "config.toml", "Config file location") + flag.StringVar(&migrationsFolderFlag, "migrations", "migrations/", "Migrations folder location") + flag.StringVar(&queriesFlag, "queries", "queries.sql", "Queries file location") + flag.Parse() + + lo = util.InitLogger() + ko = util.InitConfig(lo, confFlag) + + lo.Info("starting ussd sync", "build", build) +} + +func main() { + var wg sync.WaitGroup + ctx, stop := notifyShutdown() + + store, err := store.NewStore(store.StoreOpts{ + Logg: lo, + DSN: ko.MustString("postgres.dsn"), + MigrationsFolderPath: migrationsFolderFlag, + QueriesFolderPath: queriesFlag, + }) + if err != nil { + lo.Error("could not initialize postgres store", "error", err) + os.Exit(1) + } + + syncer := syncer.New(syncer.SyncerOpts{ + Logg: lo, + Store: store, + }) + + wg.Add(1) + go func() { + defer wg.Done() + syncer.Run() + }() + + <-ctx.Done() + lo.Info("shutdown signal received") + shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultGracefulShutdownPeriod) + + wg.Add(1) + go func() { + syncer.Stop() + defer wg.Done() + }() + + go func() { + wg.Wait() + stop() + cancel() + os.Exit(0) + }() + + <-shutdownCtx.Done() + if errors.Is(shutdownCtx.Err(), context.DeadlineExceeded) { + stop() + cancel() + lo.Error("graceful shutdown period exceeded, forcefully shutting down") + } + os.Exit(1) +} + +func notifyShutdown() (context.Context, context.CancelFunc) { + return signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT) +} diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..8b585e7 --- /dev/null +++ b/config.toml @@ -0,0 +1,12 @@ +[metrics] +go_process = true + +[api] +address = ":5031" + +[postgres] +dsn = "postgres://postgres:postgres@127.0.0.1:5432/urdt_ussd" + +[jetstream] +endpoint = "nats://127.0.0.1:4222" +id = "ussd-sync" diff --git a/go.mod b/go.mod index 9770634..c57c4c7 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,40 @@ module git.grassecon.net/urdt/ussd-data-connect go 1.23.3 + +require ( + github.com/georgysavva/scany/v2 v2.1.3 + github.com/jackc/pgx/v5 v5.7.2 + github.com/jackc/tern/v2 v2.3.2 + github.com/kamikazechaser/common v1.0.0 + github.com/knadh/goyesql/v2 v2.2.0 + github.com/knadh/koanf/parsers/toml v0.1.0 + github.com/knadh/koanf/providers/env v1.0.0 + github.com/knadh/koanf/providers/file v1.1.2 + github.com/knadh/koanf/v2 v2.1.2 +) + +require ( + dario.cat/mergo v1.0.1 // indirect + github.com/Masterminds/goutils v1.1.1 // indirect + github.com/Masterminds/semver/v3 v3.3.0 // indirect + github.com/Masterminds/sprig/v3 v3.3.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/huandu/xstrings v1.5.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/lmittmann/tint v1.0.4 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/pelletier/go-toml v1.9.5 // indirect + github.com/shopspring/decimal v1.4.0 // indirect + github.com/spf13/cast v1.7.0 // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect +) diff --git a/go.sum b/go.sum index e69de29..a1240ac 100644 --- a/go.sum +++ b/go.sum @@ -0,0 +1,108 @@ +dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= +dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= +github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= +github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+hmvYS0= +github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= +github.com/Masterminds/sprig/v3 v3.3.0 h1:mQh0Yrg1XPo6vjYXgtf5OtijNAKJRNcTdOOGZe3tPhs= +github.com/Masterminds/sprig/v3 v3.3.0/go.mod h1:Zy1iXRYNqNLUolqCpL4uhk6SHUMAOSCzdgBfDb35Lz0= +github.com/cockroachdb/cockroach-go/v2 v2.2.0 h1:/5znzg5n373N/3ESjHF5SMLxiW4RKB05Ql//KWfeTFs= +github.com/cockroachdb/cockroach-go/v2 v2.2.0/go.mod h1:u3MiKYGupPPjkn3ozknpMUpxPaNLTFWAya419/zv6eI= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/georgysavva/scany/v2 v2.1.3 h1:Zd4zm/ej79Den7tBSU2kaTDPAH64suq4qlQdhiBeGds= +github.com/georgysavva/scany/v2 v2.1.3/go.mod h1:fqp9yHZzM/PFVa3/rYEC57VmDx+KDch0LoqrJzkvtos= +github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI= +github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= +github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackc/tern/v2 v2.3.2 h1:/d3ML6jyQGDDtvKCGnHp8HY0swh86VcNvTMkC65+frk= +github.com/jackc/tern/v2 v2.3.2/go.mod h1:cJYmwlpXLs3vBtbkfKdgoZL0G96mH56W+fugKx+k3zw= +github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= +github.com/kamikazechaser/common v1.0.0 h1:a/47O/TyQb417CUwsBDI7zlnJEnmhJz9czSPirpjlLg= +github.com/kamikazechaser/common v1.0.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY= +github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem4= +github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/parsers/toml v0.1.0 h1:S2hLqS4TgWZYj4/7mI5m1CQQcWurxUz6ODgOub/6LCI= +github.com/knadh/koanf/parsers/toml v0.1.0/go.mod h1:yUprhq6eo3GbyVXFFMdbfZSo928ksS+uo0FFqNMnO18= +github.com/knadh/koanf/providers/env v1.0.0 h1:ufePaI9BnWH+ajuxGGiJ8pdTG0uLEUWC7/HDDPGLah0= +github.com/knadh/koanf/providers/env v1.0.0/go.mod h1:mzFyRZueYhb37oPmC1HAv/oGEEuyvJDA98r3XAa8Gak= +github.com/knadh/koanf/providers/file v1.1.2 h1:aCC36YGOgV5lTtAFz2qkgtWdeQsgfxUkxDOe+2nQY3w= +github.com/knadh/koanf/providers/file v1.1.2/go.mod h1:/faSBcv2mxPVjFrXck95qeoyoZ5myJ6uxN8OOVNJJCI= +github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ= +github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= +github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= +github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= +github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= +github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= +github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/store/store.go b/internal/store/store.go new file mode 100644 index 0000000..c6dfb7d --- /dev/null +++ b/internal/store/store.go @@ -0,0 +1,121 @@ +package store + +import ( + "context" + "fmt" + "log/slog" + "os" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/tern/v2/migrate" + "github.com/knadh/goyesql/v2" +) + +type ( + Queries struct { + ExtractEntries string `query:"extract-entries"` + UpdateCursor string `query:"update-cursor"` + } + + StoreOpts struct { + Logg *slog.Logger + DSN string + MigrationsFolderPath string + QueriesFolderPath string + } + + Store struct { + Provider *pgxpool.Pool + Queries *Queries + } +) + +func NewStore(o StoreOpts) (*Store, error) { + parsedConfig, err := pgxpool.ParseConfig(o.DSN) + if err != nil { + return nil, err + } + + dbPool, err := pgxpool.NewWithConfig(context.Background(), parsedConfig) + if err != nil { + return nil, err + } + + queries, err := loadQueries(o.QueriesFolderPath) + if err != nil { + return nil, err + } + + if err := runMigrations(dbPool, o.MigrationsFolderPath); err != nil { + return nil, err + } + o.Logg.Info("migrations ran successfully") + + return &Store{ + Provider: dbPool, + Queries: queries, + }, nil +} + +func (s *Store) ExecuteTransaction(ctx context.Context, fn func(tx pgx.Tx) error) error { + tx, err := s.Provider.Begin(ctx) + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback(ctx) + } else { + tx.Commit(ctx) + } + }() + + if err = fn(tx); err != nil { + return err + } + + return nil +} + +func loadQueries(queriesPath string) (*Queries, error) { + parsedQueries, err := goyesql.ParseFile(queriesPath) + if err != nil { + return nil, err + } + + loadedQueries := &Queries{} + + if err := goyesql.ScanToStruct(loadedQueries, parsedQueries, nil); err != nil { + return nil, fmt.Errorf("failed to scan queries %v", err) + } + + return loadedQueries, nil +} + +func runMigrations(dbPool *pgxpool.Pool, migrationsPath string) error { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + conn, err := dbPool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + migrator, err := migrate.NewMigrator(ctx, conn.Conn(), "schema_version") + if err != nil { + return err + } + + if err := migrator.LoadMigrations(os.DirFS(migrationsPath)); err != nil { + return err + } + + if err := migrator.Migrate(ctx); err != nil { + return err + } + + return nil +} diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go new file mode 100644 index 0000000..9ca5511 --- /dev/null +++ b/internal/syncer/syncer.go @@ -0,0 +1,104 @@ +package syncer + +import ( + "context" + "log/slog" + "time" + + "git.grassecon.net/urdt/ussd-data-connect/internal/store" + "git.grassecon.net/urdt/ussd-data-connect/pkg/data" + "github.com/georgysavva/scany/v2/pgxscan" + "github.com/jackc/pgx/v5" +) + +const syncInterval = time.Second * 10 + +type ( + KVRow struct { + Key []byte `db:"key"` + Value []byte `db:"value"` + Updated time.Time `db:"updated"` + } + + SyncerOpts struct { + Logg *slog.Logger + Store *store.Store + } + + Syncer struct { + logg *slog.Logger + interval time.Duration + done chan struct{} + store *store.Store + } +) + +func New(o SyncerOpts) *Syncer { + return &Syncer{ + done: make(chan struct{}), + interval: syncInterval, + logg: o.Logg, + store: o.Store, + } +} + +func (s *Syncer) Run() { + ticker := time.NewTicker(s.interval) + s.logg.Info("syncer ticker started") + for { + select { + case <-s.done: + ticker.Stop() + return + case <-ticker.C: + s.logg.Debug("syncer tick") + if err := s.process(context.Background()); err != nil { + s.logg.Error("failed to process sync tick", "error", err) + } + } + } +} + +func (s *Syncer) Stop() { + close(s.done) +} + +func (s *Syncer) process(ctx context.Context) error { + return s.store.ExecuteTransaction(ctx, func(tx pgx.Tx) error { + rows, err := tx.Query(ctx, s.store.Queries.ExtractEntries) + if err != nil { + return err + } + defer rows.Close() + + rs := pgxscan.NewRowScanner(rows) + var batchTimestamp *time.Time + for rows.Next() { + var row KVRow + if err := rs.Scan(&row); err != nil { + return err + } + if batchTimestamp == nil { + batchTimestamp = &row.Updated + } + decodedKeyDataType, sessionID := data.DecodeKey(row.Key) + decodedValue := data.DecodeValue(row.Value) + + if _, ok := data.ValidDataTypeLookup[decodedKeyDataType]; ok { + s.logg.Debug("processing row", "batch_timestamp", batchTimestamp, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated) + } + } + if err := rows.Err(); err != nil { + return err + } + + if batchTimestamp != nil { + _, err = tx.Exec(ctx, s.store.Queries.UpdateCursor, batchTimestamp) + if err != nil { + return err + } + } + + return nil + }) +} diff --git a/internal/util/init.go b/internal/util/init.go new file mode 100644 index 0000000..e4c772e --- /dev/null +++ b/internal/util/init.go @@ -0,0 +1,53 @@ +package util + +import ( + "log/slog" + "os" + "strings" + + "github.com/kamikazechaser/common/logg" + "github.com/knadh/koanf/parsers/toml" + "github.com/knadh/koanf/providers/env" + "github.com/knadh/koanf/providers/file" + "github.com/knadh/koanf/v2" +) + +func InitLogger() *slog.Logger { + loggOpts := logg.LoggOpts{ + FormatType: logg.Logfmt, + LogLevel: slog.LevelInfo, + } + + if os.Getenv("DEBUG") != "" { + loggOpts.LogLevel = slog.LevelDebug + } + + if os.Getenv("DEV") != "" { + loggOpts.LogLevel = slog.LevelDebug + loggOpts.FormatType = logg.Human + } + + return logg.NewLogg(loggOpts) +} + +func InitConfig(lo *slog.Logger, confFilePath string) *koanf.Koanf { + var ( + ko = koanf.New(".") + ) + + confFile := file.Provider(confFilePath) + if err := ko.Load(confFile, toml.Parser()); err != nil { + lo.Error("could not parse configuration file", "error", err) + os.Exit(1) + } + + if err := ko.Load(env.Provider("SYNC_", ".", func(s string) string { + return strings.ReplaceAll(strings.ToLower( + strings.TrimPrefix(s, "SYNC_")), "__", ".") + }), nil); err != nil { + lo.Error("could not override config from env vars", "error", err) + os.Exit(1) + } + + return ko +} diff --git a/migrations/001_init_sync_cursor.sql b/migrations/001_init_sync_cursor.sql new file mode 100644 index 0000000..79a0617 --- /dev/null +++ b/migrations/001_init_sync_cursor.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS ussd_sync( + last_sync TIMESTAMP +); +INSERT INTO ussd_sync (last_sync) VALUES ('2024-01-01'); diff --git a/pkg/data/type.go b/pkg/data/type.go index 49dd78e..79f3b94 100644 --- a/pkg/data/type.go +++ b/pkg/data/type.go @@ -14,3 +14,14 @@ const ( ACCOUNT_COMMODITIES = 8 ACCOUNT_ACTIVE_VOUCHER = 17 ) + +var ValidDataTypeLookup = map[uint16]struct{}{ + ACCOUNT_BLOCKCHAIN_ADDRESS: {}, + ACCOUNT_FIRST_NAME: {}, + ACCOUNT_LAST_NAME: {}, + ACCOUNT_YOB: {}, + ACCOUNT_LOCATION: {}, + ACCOUNT_GENDER: {}, + ACCOUNT_COMMODITIES: {}, + ACCOUNT_ACTIVE_VOUCHER: {}, +} diff --git a/queries.sql b/queries.sql new file mode 100644 index 0000000..4b438ae --- /dev/null +++ b/queries.sql @@ -0,0 +1,8 @@ +--name: extract-entries +SELECT key, value, updated FROM kv_vise +WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1) +ORDER BY updated DESC + +--name: update-cursor +-- $1: timestamp +UPDATE ussd_sync SET last_sync = ($1) \ No newline at end of file