105 lines
2.1 KiB
Go
105 lines
2.1 KiB
Go
package sub
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/grassrootseconomics/eth-indexer/v2/pkg/router"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
)
|
|
|
|
type (
|
|
JetStreamOpts struct {
|
|
Endpoint string
|
|
JetStreamID string
|
|
Logg *slog.Logger
|
|
Router *router.Router
|
|
}
|
|
|
|
JetStreamSub struct {
|
|
jsIter jetstream.MessagesContext
|
|
logg *slog.Logger
|
|
natsConn *nats.Conn
|
|
router *router.Router
|
|
durableID string
|
|
}
|
|
)
|
|
|
|
const (
|
|
pullStream = "TRACKER"
|
|
pullSubject = "TRACKER.*"
|
|
)
|
|
|
|
func NewJetStreamSub(o JetStreamOpts) (*JetStreamSub, error) {
|
|
natsConn, err := nats.Connect(o.Endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
js, err := jetstream.New(natsConn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
stream, err := js.Stream(ctx, pullStream)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
|
Durable: o.JetStreamID,
|
|
AckPolicy: jetstream.AckExplicitPolicy,
|
|
FilterSubject: pullSubject,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
o.Logg.Info("successfully connected to NATS server")
|
|
|
|
iter, err := consumer.Messages(
|
|
jetstream.WithMessagesErrOnMissingHeartbeat(false),
|
|
jetstream.PullMaxMessages(10),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &JetStreamSub{
|
|
jsIter: iter,
|
|
router: o.Router,
|
|
natsConn: natsConn,
|
|
logg: o.Logg,
|
|
durableID: o.JetStreamID,
|
|
}, nil
|
|
}
|
|
|
|
func (s *JetStreamSub) Close() {
|
|
s.jsIter.Stop()
|
|
}
|
|
|
|
func (s *JetStreamSub) Process() {
|
|
for {
|
|
msg, err := s.jsIter.Next()
|
|
if err != nil {
|
|
if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
|
|
s.logg.Debug("jetstream: iterator closed")
|
|
return
|
|
} else {
|
|
s.logg.Debug("jetstream: unknown iterator error", "error", err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
s.logg.Debug("processing nats message", "subject", msg.Subject())
|
|
if err := s.router.Handle(context.Background(), msg); err != nil {
|
|
s.logg.Error("jetstream: router: error processing nats message", "error", err)
|
|
}
|
|
}
|
|
}
|