ussd-data-connect/internal/pub/pub.go

90 lines
1.5 KiB
Go

package pub
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"git.grassecon.net/urdt/ussd-data-connect/pkg/event"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)
// Pub is the publisher abstraction returned by NewJetStreamPub.
type Pub interface {
	// Send publishes a single event and reports any failure to do so.
	Send(ctx context.Context, ev event.Event) error
	// Close releases the resources held by the publisher.
	Close()
}

// JetStreamOpts carries the settings consumed by NewJetStreamPub.
type JetStreamOpts struct {
	// Endpoint is the NATS server URL handed to nats.Connect.
	Endpoint string
	// PersistDuration is applied as the stream's MaxAge.
	PersistDuration time.Duration
	// Logg is the structured logger supplied by the caller.
	Logg *slog.Logger
}

// jetStreamPub is the JetStream-backed implementation of Pub.
type jetStreamPub struct {
	// js is the JetStream handle used for publishing.
	js jetstream.JetStream
	// natsConn is the underlying NATS connection, closed by Close.
	natsConn *nats.Conn
}
// streamName is the JetStream stream that events are published to; it also
// serves as the prefix of every publish subject.
const streamName string = "USSD_DATA"

// streamSubjects lists the subject patterns bound to the stream.
var streamSubjects = []string{"USSD_DATA.*"}
// NewJetStreamPub connects to the NATS server at o.Endpoint, makes sure the
// USSD_DATA stream is present and returns a Pub that publishes to it.
//
// The stream is requested with file storage, a MaxAge of o.PersistDuration
// and a one-minute de-duplication window. Stream creation is bounded by a
// 10 second timeout.
//
// If the stream already exists with a different configuration, the existing
// stream is used unchanged and a warning is logged through o.Logg (when it is
// non-nil). Any other failure closes the NATS connection and is returned to
// the caller, wrapped with context.
func NewJetStreamPub(o JetStreamOpts) (Pub, error) {
	natsConn, err := nats.Connect(o.Endpoint)
	if err != nil {
		return nil, fmt.Errorf("connecting to nats: %w", err)
	}

	js, err := jetstream.New(natsConn)
	if err != nil {
		// Do not leak the connection when setup cannot complete.
		natsConn.Close()
		return nil, fmt.Errorf("initialising jetstream: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	_, err = js.CreateStream(ctx, jetstream.StreamConfig{
		Name:       streamName,
		Subjects:   streamSubjects,
		MaxAge:     o.PersistDuration,
		Storage:    jetstream.FileStorage,
		Duplicates: time.Minute,
	})
	switch {
	case err == nil:
		// Stream created, or already present with an identical configuration.
	case errors.Is(err, jetstream.ErrStreamNameAlreadyInUse):
		// A stream with this name exists but is configured differently.
		// Publishing still works against it, so keep going rather than
		// refusing to start; surface the mismatch to the operator.
		if o.Logg != nil {
			o.Logg.Warn("stream already exists with a different configuration, using it as is",
				"stream", streamName,
				"error", err,
			)
		}
	default:
		natsConn.Close()
		return nil, fmt.Errorf("creating stream %q: %w", streamName, err)
	}

	return &jetStreamPub{
		natsConn: natsConn,
		js:       js,
	}, nil
}
// Close shuts down the underlying NATS connection. It is a no-op when the
// publisher holds no connection.
func (p *jetStreamPub) Close() {
	if p.natsConn == nil {
		return
	}
	p.natsConn.Close()
}
// Send serializes payload and publishes it to JetStream.
//
// The subject is "<streamName>.<payload.Type>". The message ID is
// "<payload.Type>:<payload.Value>", passed via jetstream.WithMsgID, which
// JetStream uses for de-duplication. Serialization and publish errors are
// returned unchanged.
func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error {
	body, err := payload.Serialize()
	if err != nil {
		return err
	}

	subject := fmt.Sprintf("%s.%d", streamName, payload.Type)
	msgID := fmt.Sprintf("%d:%s", payload.Type, payload.Value)

	// The publish acknowledgement carries nothing the caller needs.
	_, err = p.js.Publish(ctx, subject, body, jetstream.WithMsgID(msgID))
	return err
}