eth-indexer/pkg/router/router.go
Mohamed Sohail 2793d92343
Feat/consolidate functionality (#22)
* breaking: use event router, adsd telegram notifier, update indexer schema

* fix: early ack on handler not found

* fix: jetstream switch to new API, discard buffer on close

* remove telegram dependency, rely on log alters instead, which indirectly connect to telegram via uptrace

* feat (breaking): switch to self contained auto bootstrapper
2024-10-31 10:41:43 +03:00

63 lines
1.2 KiB
Go

package router
import (
"context"
"encoding/json"
"log/slog"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/nats-io/nats.go/jetstream"
"github.com/sourcegraph/conc/pool"
)
type (
HandlerFunc func(context.Context, event.Event) error
Router struct {
logg *slog.Logger
handlers map[string][]HandlerFunc
}
)
func New(logg *slog.Logger) *Router {
return &Router{
handlers: make(map[string][]HandlerFunc),
logg: logg,
}
}
func (r *Router) RegisterRoute(subject string, handlerFunc ...HandlerFunc) {
r.handlers[subject] = handlerFunc
}
func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error {
handlers, ok := r.handlers[msg.Subject()]
if !ok {
r.logg.Debug("handler not found sending ack", "subject", msg.Subject())
return msg.Ack()
}
var chainEvent event.Event
if err := json.Unmarshal(msg.Data(), &chainEvent); err != nil {
return err
}
p := pool.New().WithErrors()
for _, handler := range handlers {
p.Go(func() error {
return handler(ctx, chainEvent)
})
}
if err := p.Wait(); err != nil {
r.logg.Error("handler error sending nack", "subject", msg.Subject(), "error", err)
if err := msg.Nak(); err != nil {
return err
}
return err
}
return msg.Ack()
}