From 44a9b5cd29a9a4bed350b3311caa253e00ca6b9e Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 31 Oct 2024 09:52:00 +0300 Subject: [PATCH] fix: jetstream switch to new API, discard buffer on close --- internal/sub/jetstream.go | 64 +++++++++++++++++------------------ internal/sub/sub.go | 6 ---- internal/telegram/telegram.go | 3 +- 3 files changed, 33 insertions(+), 40 deletions(-) delete mode 100644 internal/sub/sub.go diff --git a/internal/sub/jetstream.go b/internal/sub/jetstream.go index acbf82c..d1a315e 100644 --- a/internal/sub/jetstream.go +++ b/internal/sub/jetstream.go @@ -20,11 +20,11 @@ type ( } JetStreamSub struct { - jsConsumer jetstream.Consumer - logg *slog.Logger - natsConn *nats.Conn - router *router.Router - durableID string + jsIter jetstream.MessagesContext + logg *slog.Logger + natsConn *nats.Conn + router *router.Router + durableID string } ) @@ -33,7 +33,7 @@ const ( pullSubject = "TRACKER.*" ) -func NewJetStreamSub(o JetStreamOpts) (Sub, error) { +func NewJetStreamSub(o JetStreamOpts) (*JetStreamSub, error) { natsConn, err := nats.Connect(o.Endpoint) if err != nil { return nil, err @@ -53,52 +53,52 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) { } consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ - Durable: o.JetStreamID, - AckPolicy: jetstream.AckExplicitPolicy, + Durable: o.JetStreamID, + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: pullSubject, }) if err != nil { return nil, err } o.Logg.Info("successfully connected to NATS server") + iter, err := consumer.Messages( + jetstream.WithMessagesErrOnMissingHeartbeat(false), + jetstream.PullMaxMessages(100), + ) + if err != nil { + return nil, err + } + return &JetStreamSub{ - jsConsumer: consumer, - router: o.Router, - natsConn: natsConn, - logg: o.Logg, - durableID: o.JetStreamID, + jsIter: iter, + router: o.Router, + natsConn: natsConn, + logg: o.Logg, + durableID: o.JetStreamID, }, nil } func (s *JetStreamSub) Close() { - if s.natsConn != nil { - s.natsConn.Close() - } + s.jsIter.Stop() } -func (s *JetStreamSub) Process() error { - iter, err := s.jsConsumer.Messages(jetstream.WithMessagesErrOnMissingHeartbeat(false)) - if err != nil { - return err - } - defer iter.Stop() - +func (s *JetStreamSub) Process() { for { - msg, err := iter.Next() + msg, err := s.jsIter.Next() if err != nil { - if errors.Is(err, nats.ErrTimeout) { - s.logg.Error("jetstream: iter fetch timeout") - continue - } else if errors.Is(err, nats.ErrConnectionClosed) { - return nil + if errors.Is(err, jetstream.ErrMsgIteratorClosed) { + s.logg.Debug("jetstream: iterator closed") + return } else { - return err + s.logg.Debug("jetstream: unknown iterator error", "error", err) + continue } } - s.logg.Info("processing nats message", "subject", msg.Subject()) + s.logg.Debug("processing nats message", "subject", msg.Subject()) if err := s.router.Handle(context.Background(), msg); err != nil { - s.logg.Error("router: error processing nats message", "error", err) + s.logg.Error("jetstream: router: error processing nats message", "error", err) } } } diff --git a/internal/sub/sub.go b/internal/sub/sub.go deleted file mode 100644 index 6bb5fd5..0000000 --- a/internal/sub/sub.go +++ /dev/null @@ -1,6 +0,0 @@ -package sub - -type Sub interface { - Process() error - Close() -} diff --git a/internal/telegram/telegram.go b/internal/telegram/telegram.go index ebdef1a..8d349f2 100644 --- a/internal/telegram/telegram.go +++ b/internal/telegram/telegram.go @@ -19,8 +19,7 @@ type ( ) const ( - NOTIFY_LOW_BALANCE_ON_GAS_FAUCET = ` - Gas faucet balance is low. Top is required soon!` + NOTIFY_LOW_BALANCE_ON_GAS_FAUCET = "Gas faucet balance is low. Top-up is required soon!" ) func New(o TelegramOpts) *Telegram {