fix: jetstream switch to new API, discard buffer on close

This commit is contained in:
Mohamed Sohail 2024-10-31 09:52:00 +03:00
parent d8bb140f94
commit 44a9b5cd29
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
3 changed files with 33 additions and 40 deletions

View File

@ -20,7 +20,7 @@ type (
}
JetStreamSub struct {
jsConsumer jetstream.Consumer
jsIter jetstream.MessagesContext
logg *slog.Logger
natsConn *nats.Conn
router *router.Router
@ -33,7 +33,7 @@ const (
pullSubject = "TRACKER.*"
)
func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
func NewJetStreamSub(o JetStreamOpts) (*JetStreamSub, error) {
natsConn, err := nats.Connect(o.Endpoint)
if err != nil {
return nil, err
@ -55,14 +55,23 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: o.JetStreamID,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: pullSubject,
})
if err != nil {
return nil, err
}
o.Logg.Info("successfully connected to NATS server")
iter, err := consumer.Messages(
jetstream.WithMessagesErrOnMissingHeartbeat(false),
jetstream.PullMaxMessages(100),
)
if err != nil {
return nil, err
}
return &JetStreamSub{
jsConsumer: consumer,
jsIter: iter,
router: o.Router,
natsConn: natsConn,
logg: o.Logg,
@ -71,34 +80,25 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
}
func (s *JetStreamSub) Close() {
if s.natsConn != nil {
s.natsConn.Close()
}
s.jsIter.Stop()
}
func (s *JetStreamSub) Process() error {
iter, err := s.jsConsumer.Messages(jetstream.WithMessagesErrOnMissingHeartbeat(false))
if err != nil {
return err
}
defer iter.Stop()
func (s *JetStreamSub) Process() {
for {
msg, err := iter.Next()
msg, err := s.jsIter.Next()
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
s.logg.Error("jetstream: iter fetch timeout")
continue
} else if errors.Is(err, nats.ErrConnectionClosed) {
return nil
if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
s.logg.Debug("jetstream: iterator closed")
return
} else {
return err
s.logg.Debug("jetstream: unknown iterator error", "error", err)
continue
}
}
s.logg.Info("processing nats message", "subject", msg.Subject())
s.logg.Debug("processing nats message", "subject", msg.Subject())
if err := s.router.Handle(context.Background(), msg); err != nil {
s.logg.Error("router: error processing nats message", "error", err)
s.logg.Error("jetstream: router: error processing nats message", "error", err)
}
}
}

View File

@ -1,6 +0,0 @@
package sub
type Sub interface {
Process() error
Close()
}

View File

@ -19,8 +19,7 @@ type (
)
const (
NOTIFY_LOW_BALANCE_ON_GAS_FAUCET = `
Gas faucet balance is low. Top is required soon!`
NOTIFY_LOW_BALANCE_ON_GAS_FAUCET = "Gas faucet balance is low. Top-up is required soon!"
)
func New(o TelegramOpts) *Telegram {