Local config, nats state visibility in logs
This commit is contained in:
parent
a3e2293047
commit
4491f0155f
@ -1,7 +1,3 @@
|
||||
#Serve Http
|
||||
PORT=7123
|
||||
HOST=127.0.0.1
|
||||
|
||||
#PostgreSQL
|
||||
DB_HOST=localhost
|
||||
DB_USER=postgres
|
||||
@ -19,3 +15,7 @@ DB_TIMEZONE=Africa/Nairobi
|
||||
#BALANCE_URL=/api/account/status/
|
||||
#CUSTODIAL_URL_BASE=http://localhost:5003
|
||||
#DATA_URL_BASE=http://localhost:5006
|
||||
|
||||
#Data stream
|
||||
#NATS_JETSTREAM_URL=http://localhost:4222
|
||||
#NATS_JETSTREAM_CLIENT_NAME=omnom
|
||||
|
11
cmd/main.go
11
cmd/main.go
@ -9,18 +9,11 @@ import (
|
||||
|
||||
"git.defalsify.org/vise.git/db/mem"
|
||||
|
||||
"git.grassecon.net/urdt/ussd/config"
|
||||
"git.grassecon.net/urdt/ussd/initializers"
|
||||
"git.grassecon.net/term/config"
|
||||
"git.grassecon.net/term/event/nats"
|
||||
)
|
||||
|
||||
func init() {
|
||||
initializers.LoadEnvVariables()
|
||||
}
|
||||
|
||||
func main() {
|
||||
config.LoadConfig()
|
||||
|
||||
ctx := context.Background()
|
||||
db := mem.NewMemDb()
|
||||
err := db.Connect(ctx, "")
|
||||
@ -29,7 +22,7 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
n := nats.NewNatsSubscription(db)
|
||||
err = n.Connect(ctx, "localhost:4222")
|
||||
err = n.Connect(ctx, config.JetstreamURL)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Stream connect err: %v", err)
|
||||
os.Exit(1)
|
||||
|
22
config/config.go
Normal file
22
config/config.go
Normal file
@ -0,0 +1,22 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
urdtconfig "git.grassecon.net/urdt/ussd/config"
|
||||
"git.grassecon.net/urdt/ussd/initializers"
|
||||
)
|
||||
|
||||
var (
|
||||
JetstreamURL string
|
||||
JetstreamClientName string
|
||||
)
|
||||
|
||||
func init() {
|
||||
initializers.LoadEnvVariables()
|
||||
}
|
||||
|
||||
func LoadConfig() {
|
||||
urdtconfig.LoadConfig()
|
||||
|
||||
JetstreamURL = initializers.GetEnv("NATS_JETSTREAM_URL", "localhost:4222")
|
||||
JetstreamClientName = initializers.GetEnv("NATS_JETSTREAM_CLIENT_NAME", "omnom")
|
||||
}
|
@ -3,6 +3,7 @@ package nats
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
nats "github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
@ -11,6 +12,7 @@ import (
|
||||
"git.defalsify.org/vise.git/db"
|
||||
"git.grassecon.net/urdt/ussd/common"
|
||||
"git.grassecon.net/term/event"
|
||||
"git.grassecon.net/term/config"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -36,6 +38,19 @@ func NewNatsSubscription(store db.Db) *NatsSubscription {
|
||||
}
|
||||
}
|
||||
|
||||
func toServerInfo(conn *nats.Conn) string {
|
||||
return fmt.Sprintf("%s@%s (v%s)", conn.ConnectedServerName(), conn.ConnectedUrlRedacted(), conn.ConnectedServerVersion())
|
||||
}
|
||||
|
||||
func disconnectHandler(conn *nats.Conn, err error) {
|
||||
logg.Errorf("nats disconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "err", err)
|
||||
}
|
||||
|
||||
func reconnectHandler(conn *nats.Conn) {
|
||||
serverInfo := toServerInfo(conn)
|
||||
logg.Errorf("nats reconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "server", serverInfo)
|
||||
}
|
||||
|
||||
func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error {
|
||||
var err error
|
||||
|
||||
@ -43,19 +58,23 @@ func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.conn.SetDisconnectErrHandler(disconnectHandler)
|
||||
n.conn.SetReconnectHandler(reconnectHandler)
|
||||
n.js, err = jetstream.New(n.conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.cs, err = n.js.CreateConsumer(ctx, "TRACKER", jetstream.ConsumerConfig{
|
||||
Name: "omnom",
|
||||
Durable: "omnom",
|
||||
Name: config.JetstreamClientName,
|
||||
Durable: config.JetstreamClientName,
|
||||
FilterSubjects: []string{"TRACKER.*"},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
serverInfo := toServerInfo(n.conn)
|
||||
logg.DebugCtxf(ctx, "nats connected, starting consumer", "status", n.conn.Status(), "server", serverInfo)
|
||||
n.ctx = ctx
|
||||
n.cctx, err = n.cs.Consume(n.handleEvent)
|
||||
if err != nil {
|
||||
|
2
go.mod
2
go.mod
@ -4,7 +4,7 @@ go 1.23.2
|
||||
|
||||
require (
|
||||
git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed
|
||||
git.grassecon.net/urdt/ussd v0.0.0-20241103143426-0506a8c452f1
|
||||
git.grassecon.net/urdt/ussd v0.0.0-20241103151417-7189235bee57
|
||||
github.com/grassrootseconomics/eth-tracker v1.3.0-rc
|
||||
github.com/grassrootseconomics/ussd-data-service v0.0.0-20241003123429-4904b4438a3a
|
||||
github.com/nats-io/nats.go v1.37.0
|
||||
|
4
go.sum
4
go.sum
@ -1,7 +1,7 @@
|
||||
git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed h1:4TrsfbK7NKgsa7KjMPlnV/tjYTkAAXP5PWAZzUfzCdI=
|
||||
git.defalsify.org/vise.git v0.2.1-0.20241031204035-b588301738ed/go.mod h1:jyBMe1qTYUz3mmuoC9JQ/TvFeW0vTanCUcPu3H8p4Ck=
|
||||
git.grassecon.net/urdt/ussd v0.0.0-20241103143426-0506a8c452f1 h1:uTscFuyKCqWshcN+pgoJiE0jIVzRrUrgBfI/RsiM7qE=
|
||||
git.grassecon.net/urdt/ussd v0.0.0-20241103143426-0506a8c452f1/go.mod h1:ADB/wpwvI6umvYzGqpJGm/GYj8msxYGiczzWCCdXegs=
|
||||
git.grassecon.net/urdt/ussd v0.0.0-20241103151417-7189235bee57 h1:JKfbF1EY21ChL1ck/WTsdNS05orMiIgJ7LQETwqc3jA=
|
||||
git.grassecon.net/urdt/ussd v0.0.0-20241103151417-7189235bee57/go.mod h1:ADB/wpwvI6umvYzGqpJGm/GYj8msxYGiczzWCCdXegs=
|
||||
github.com/alecthomas/assert/v2 v2.2.2 h1:Z/iVC0xZfWTaFNE6bA3z07T86hd45Xe2eLt6WVy2bbk=
|
||||
github.com/alecthomas/assert/v2 v2.2.2/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ=
|
||||
github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk=
|
||||
|
Loading…
Reference in New Issue
Block a user