term/event/nats/nats.go

107 lines
2.0 KiB
Go
Raw Normal View History

2024-10-23 22:01:10 +02:00
package nats
import (
"context"
"encoding/json"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
geEvent "github.com/grassrootseconomics/eth-tracker/pkg/event"
2024-11-03 01:34:28 +01:00
"git.defalsify.org/vise.git/logging"
"git.defalsify.org/vise.git/db"
"git.grassecon.net/urdt/ussd/common"
2024-10-23 22:01:10 +02:00
"git.grassecon.net/term/event"
)
var (
2024-11-03 01:34:28 +01:00
logg = logging.NewVanilla().WithDomain("term-nats")
2024-10-23 22:01:10 +02:00
)
type NatsSubscription struct {
2024-10-24 17:17:04 +02:00
event.Router
2024-10-23 22:01:10 +02:00
ctx context.Context
conn *nats.Conn
js jetstream.JetStream
cs jetstream.Consumer
cctx jetstream.ConsumeContext
}
func NewNatsSubscription(store db.Db) *NatsSubscription {
return &NatsSubscription{
Router: event.Router{
2024-11-02 17:08:05 +01:00
Store: &common.UserDataStore{
Db: store,
},
},
}
2024-10-23 22:01:10 +02:00
}
func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error {
var err error
n.conn, err = nats.Connect(connStr)
if err != nil {
return err
}
n.js, err = jetstream.New(n.conn)
if err != nil {
return err
}
2024-10-24 17:30:24 +02:00
n.cs, err = n.js.CreateConsumer(ctx, "TRACKER", jetstream.ConsumerConfig{
Name: "omnom",
Durable: "omnom",
FilterSubjects: []string{"TRACKER.*"},
2024-10-23 22:01:10 +02:00
})
if err != nil {
return err
}
n.ctx = ctx
n.cctx, err = n.cs.Consume(n.handleEvent)
if err != nil {
return err
}
return nil
}
// Close stops message consumption and then closes the NATS connection.
//
// It blocks until the channel returned by the consume context's Closed
// method is signalled, and only then closes the connection. Parts that were
// never set up (for example because Connect was not called or failed) are
// skipped, so Close does not panic on an unconnected subscription.
// It always returns nil.
func (n *NatsSubscription) Close() error {
	if n.cctx != nil {
		n.cctx.Stop()
		// Wait for the consumer to report shutdown before tearing down
		// the connection.
		<-n.cctx.Closed()
	}
	if n.conn != nil {
		n.conn.Close()
	}
	return nil
}
func fail(m jetstream.Msg) {
err := m.Nak()
if err != nil {
2024-11-03 01:34:28 +01:00
logg.Errorf("nats nak fail", "err", err)
2024-10-23 22:01:10 +02:00
}
}
func(n *NatsSubscription) handleEvent(m jetstream.Msg) {
var ev geEvent.Event
2024-11-03 01:34:28 +01:00
logg.DebugCtxf(n.ctx, "have msg", "err", m)
2024-10-23 22:01:10 +02:00
b := m.Data()
err := json.Unmarshal(b, &ev)
if err != nil {
2024-11-03 01:34:28 +01:00
logg.ErrorCtxf(n.ctx, "nats msg deserialize fail", "err", err)
2024-10-24 17:00:46 +02:00
//fail(m)
} else {
err = n.Route(n.ctx, &ev)
2024-10-24 17:00:46 +02:00
if err != nil {
2024-11-03 01:34:28 +01:00
logg.ErrorCtxf(n.ctx, "handler route fail", "err", err)
2024-10-24 17:30:24 +02:00
//fail(m)
2024-10-24 17:00:46 +02:00
}
2024-10-23 22:01:10 +02:00
}
2024-10-24 17:30:24 +02:00
err = m.Ack()
2024-10-23 22:01:10 +02:00
if err != nil {
2024-11-03 01:34:28 +01:00
logg.ErrorCtxf(n.ctx, "ack fail", "err", err)
2024-10-24 17:30:24 +02:00
panic("ack fail")
2024-10-23 22:01:10 +02:00
}
2024-11-03 01:34:28 +01:00
logg.DebugCtxf(n.ctx, "handle msg complete")
2024-10-23 22:01:10 +02:00
}