diff --git a/.env.example b/.env.example index df795b6..1a282c9 100644 --- a/.env.example +++ b/.env.example @@ -1,7 +1,3 @@ -#Serve Http -PORT=7123 -HOST=127.0.0.1 - #PostgreSQL DB_HOST=localhost DB_USER=postgres @@ -19,3 +15,7 @@ DB_TIMEZONE=Africa/Nairobi #BALANCE_URL=/api/account/status/ #CUSTODIAL_URL_BASE=http://localhost:5003 #DATA_URL_BASE=http://localhost:5006 + +#Data stream +#NATS_JETSTREAM_URL=http://localhost:4222 +#NATS_JETSTREAM_CLIENT_NAME=omnom diff --git a/cmd/main.go b/cmd/main.go index 35780a8..dead575 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,18 +9,11 @@ import ( "git.defalsify.org/vise.git/db/mem" - "git.grassecon.net/urdt/ussd/config" - "git.grassecon.net/urdt/ussd/initializers" + "git.grassecon.net/term/config" "git.grassecon.net/term/event/nats" ) -func init() { - initializers.LoadEnvVariables() -} - func main() { - config.LoadConfig() - ctx := context.Background() db := mem.NewMemDb() err := db.Connect(ctx, "") @@ -29,7 +22,7 @@ func main() { os.Exit(1) } n := nats.NewNatsSubscription(db) - err = n.Connect(ctx, "localhost:4222") + err = n.Connect(ctx, config.JetstreamURL) if err != nil { fmt.Fprintf(os.Stderr, "Stream connect err: %v", err) os.Exit(1) diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..88f9f33 --- /dev/null +++ b/config/config.go @@ -0,0 +1,22 @@ +package config + +import ( + urdtconfig "git.grassecon.net/urdt/ussd/config" + "git.grassecon.net/urdt/ussd/initializers" +) + +var ( + JetstreamURL string + JetstreamClientName string +) + +func init() { + initializers.LoadEnvVariables() +} + +func LoadConfig() { + urdtconfig.LoadConfig() + + JetstreamURL = initializers.GetEnv("NATS_JETSTREAM_URL", "localhost:4222") + JetstreamClientName = initializers.GetEnv("NATS_JETSTREAM_CLIENT_NAME", "omnom") +} diff --git a/event/nats/nats.go b/event/nats/nats.go index c13559d..af8f64a 100644 --- a/event/nats/nats.go +++ b/event/nats/nats.go @@ -3,6 +3,7 @@ package nats import ( "context" "encoding/json" + "fmt" nats "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" @@ -11,6 +12,7 @@ import ( "git.defalsify.org/vise.git/db" "git.grassecon.net/urdt/ussd/common" "git.grassecon.net/term/event" + "git.grassecon.net/term/config" ) var ( @@ -36,6 +38,19 @@ func NewNatsSubscription(store db.Db) *NatsSubscription { } } +func toServerInfo(conn *nats.Conn) string { + return fmt.Sprintf("%s@%s (v%s)", conn.ConnectedServerName(), conn.ConnectedUrlRedacted(), conn.ConnectedServerVersion()) +} + +func disconnectHandler(conn *nats.Conn, err error) { + logg.Errorf("nats disconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "err", err) +} + +func reconnectHandler(conn *nats.Conn) { + serverInfo := toServerInfo(conn) + logg.Errorf("nats reconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "server", serverInfo) +} + func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error { var err error @@ -43,19 +58,23 @@ func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error { if err != nil { return err } + n.conn.SetDisconnectErrHandler(disconnectHandler) + n.conn.SetReconnectHandler(reconnectHandler) n.js, err = jetstream.New(n.conn) if err != nil { return err } n.cs, err = n.js.CreateConsumer(ctx, "TRACKER", jetstream.ConsumerConfig{ - Name: "omnom", - Durable: "omnom", + Name: config.JetstreamClientName, + Durable: config.JetstreamClientName, FilterSubjects: []string{"TRACKER.*"}, }) if err != nil { return err } + serverInfo := toServerInfo(n.conn) + logg.DebugCtxf(ctx, "nats connected, starting consumer", "status", n.conn.Status(), "server", serverInfo) n.ctx = ctx n.cctx, err = n.cs.Consume(n.handleEvent) if err != nil { diff --git a/go.mod b/go.mod index 8b24fb8..6a8085b 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.2 require ( git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed - git.grassecon.net/urdt/ussd v0.0.0-20241103143426-0506a8c452f1 + git.grassecon.net/urdt/ussd v0.0.0-20241103151417-7189235bee57 github.com/grassrootseconomics/eth-tracker v1.3.0-rc github.com/grassrootseconomics/ussd-data-service v0.0.0-20241003123429-4904b4438a3a github.com/nats-io/nats.go v1.37.0 diff --git a/go.sum b/go.sum index 871205a..c4ca7cc 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed h1:4TrsfbK7NKgsa7KjMPlnV/tjYTkAAXP5PWAZzUfzCdI= git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed/go.mod h1:jyBMe1qTYUz3mmuoC9JQ/TvFeW0vTanCUcPu3H8p4Ck= -git.grassecon.net/urdt/ussd v0.0.0-20241103143426-0506a8c452f1 h1:uTscFuyKCqWshcN+pgoJiE0jIVzRrUrgBfI/RsiM7qE= -git.grassecon.net/urdt/ussd v0.0.0-20241103143426-0506a8c452f1/go.mod h1:ADB/wpwvI6umvYzGqpJGm/GYj8msxYGiczzWCCdXegs= +git.grassecon.net/urdt/ussd v0.0.0-20241103151417-7189235bee57 h1:JKfbF1EY21ChL1ck/WTsdNS05orMiIgJ7LQETwqc3jA= +git.grassecon.net/urdt/ussd v0.0.0-20241103151417-7189235bee57/go.mod h1:ADB/wpwvI6umvYzGqpJGm/GYj8msxYGiczzWCCdXegs= github.com/alecthomas/assert/v2 v2.2.2 h1:Z/iVC0xZfWTaFNE6bA3z07T86hd45Xe2eLt6WVy2bbk= github.com/alecthomas/assert/v2 v2.2.2/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ= github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk=