Use durable consumer
This commit is contained in:
parent
e29cb4b5a2
commit
51389476ca
@ -42,8 +42,10 @@ func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.cs, err = n.js.OrderedConsumer(ctx, "TRACKER", jetstream.OrderedConsumerConfig{
|
||||
//FilterSubjects: []string{"TRACKER.*"},
|
||||
n.cs, err = n.js.CreateConsumer(ctx, "TRACKER", jetstream.ConsumerConfig{
|
||||
Name: "omnom",
|
||||
Durable: "omnom",
|
||||
FilterSubjects: []string{"TRACKER.*"},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
@ -87,14 +89,13 @@ func(n *NatsSubscription) handleEvent(m jetstream.Msg) {
|
||||
err = n.Route(&ev)
|
||||
if err != nil {
|
||||
logg.Error("handler route fail", "err", err)
|
||||
fail(m)
|
||||
return
|
||||
//fail(m)
|
||||
}
|
||||
}
|
||||
err = m.Term()
|
||||
err = m.Ack()
|
||||
if err != nil {
|
||||
logg.Error("term fail", "err", err)
|
||||
panic("term fail")
|
||||
logg.Error("ack fail", "err", err)
|
||||
panic("ack fail")
|
||||
}
|
||||
logg.Debug("handle msg complete")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user