eth-indexer/internal/sub/jetstream.go
Mohamed Sohail 2793d92343
Feat/consolidate functionality (#22)
* breaking: use event router, add telegram notifier, update indexer schema

* fix: early ack on handler not found

* fix: jetstream switch to new API, discard buffer on close

* remove telegram dependency, rely on log alerts instead, which indirectly connect to telegram via uptrace

* feat (breaking): switch to self contained auto bootstrapper
2024-10-31 10:41:43 +03:00

105 lines
2.1 KiB
Go

package sub
import (
"context"
"errors"
"log/slog"
"time"
"github.com/grassrootseconomics/eth-indexer/pkg/router"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// JetStreamOpts carries the settings NewJetStreamSub needs to build a
// JetStreamSub.
type JetStreamOpts struct {
	// Endpoint is the address handed to nats.Connect.
	Endpoint string
	// JetStreamID is used as the durable name of the JetStream consumer.
	JetStreamID string
	// Logg receives connection and processing log output.
	Logg *slog.Logger
	// Router is handed every message pulled from the consumer.
	Router *router.Router
}

// JetStreamSub pulls messages from a JetStream consumer and passes each one
// to a router.
type JetStreamSub struct {
	// jsIter is the message iterator obtained from the consumer.
	jsIter jetstream.MessagesContext
	logg   *slog.Logger
	// natsConn is the connection the iterator was created on.
	natsConn *nats.Conn
	router   *router.Router
	// durableID mirrors JetStreamOpts.JetStreamID.
	durableID string
}
// Stream and subject filter used when creating the pull consumer.
const (
	// pullStream is the name of the JetStream stream that is looked up.
	pullStream = "TRACKER"
	// pullSubject is the consumer's filter subject: every single-token
	// subject directly under the stream name ("TRACKER.*").
	pullSubject = pullStream + ".*"
)
// NewJetStreamSub connects to the NATS server at o.Endpoint, looks up the
// pullStream stream and creates or updates a durable, explicit-ack consumer
// named o.JetStreamID filtered on pullSubject. It then opens a message
// iterator on that consumer and returns a JetStreamSub wrapping it.
//
// The stream lookup and consumer creation share a single 10 second timeout.
// If o.Logg is nil, slog.Default() is used instead.
//
// On any failure after the connection has been established the connection is
// closed before the error is returned, so a failed call leaks nothing.
func NewJetStreamSub(o JetStreamOpts) (*JetStreamSub, error) {
	logg := o.Logg
	if logg == nil {
		// Avoid a nil-pointer panic here and later in Process.
		logg = slog.Default()
	}

	natsConn, err := nats.Connect(o.Endpoint)
	if err != nil {
		return nil, err
	}

	// Every step below can fail; release the connection unless we reach the
	// successful return at the bottom.
	ready := false
	defer func() {
		if !ready {
			natsConn.Close()
		}
	}()

	js, err := jetstream.New(natsConn)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := js.Stream(ctx, pullStream)
	if err != nil {
		return nil, err
	}

	consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Durable:       o.JetStreamID,
		AckPolicy:     jetstream.AckExplicitPolicy,
		FilterSubject: pullSubject,
	})
	if err != nil {
		return nil, err
	}
	logg.Info("successfully connected to NATS server")

	iter, err := consumer.Messages(
		jetstream.WithMessagesErrOnMissingHeartbeat(false),
		jetstream.PullMaxMessages(10),
	)
	if err != nil {
		return nil, err
	}

	ready = true
	return &JetStreamSub{
		jsIter:    iter,
		router:    o.Router,
		natsConn:  natsConn,
		logg:      logg,
		durableID: o.JetStreamID,
	}, nil
}
// Close stops the message iterator and then closes the NATS connection that
// was opened by NewJetStreamSub, so the subscriber releases everything it owns.
//
// The iterator is stopped first so that a concurrent Process call sees
// jetstream.ErrMsgIteratorClosed and returns. Close is safe to call on a nil
// or partially initialised receiver.
//
// NOTE(review): a message still being handled when Close runs may fail to ack
// once the connection is gone; with an explicit-ack durable consumer it is
// presumably redelivered later — confirm this is acceptable for the handlers.
func (s *JetStreamSub) Close() {
	if s == nil {
		return
	}
	if s.jsIter != nil {
		s.jsIter.Stop()
	}
	if s.natsConn != nil {
		s.natsConn.Close()
	}
}
// Process pulls messages from the iterator one at a time and passes each to
// the router. It blocks until the iterator reports
// jetstream.ErrMsgIteratorClosed, then returns.
//
// Any other iterator error is logged at debug level and the loop carries on.
// A router error is logged at error level; this method does not ack or nak
// messages itself.
func (s *JetStreamSub) Process() {
	for {
		nextMsg, nextErr := s.jsIter.Next()

		// A closed iterator is the only condition that ends the loop.
		if errors.Is(nextErr, jetstream.ErrMsgIteratorClosed) {
			s.logg.Debug("jetstream: iterator closed")
			return
		}
		if nextErr != nil {
			s.logg.Debug("jetstream: unknown iterator error", "error", nextErr)
			continue
		}

		s.logg.Debug("processing nats message", "subject", nextMsg.Subject())

		handleErr := s.router.Handle(context.Background(), nextMsg)
		if handleErr != nil {
			s.logg.Error("jetstream: router: error processing nats message", "error", handleErr)
		}
	}
}