eth-indexer/internal/sub/jetstream.go

146 lines
3.1 KiB
Go
Raw Normal View History

2024-04-23 13:33:05 +02:00
package sub
import (
"context"
"encoding/json"
"errors"
"log/slog"
"time"
2024-04-23 13:33:05 +02:00
"github.com/grassrootseconomics/celo-indexer/internal/store"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
2024-04-23 13:33:05 +02:00
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
2024-04-23 13:33:05 +02:00
)
type (
JetStreamOpts struct {
Store store.Store
Logg *slog.Logger
Endpoint string
JetStreamID string
2024-04-23 13:33:05 +02:00
}
JetStreamSub struct {
jsConsumer jetstream.Consumer
store store.Store
natsConn *nats.Conn
logg *slog.Logger
durableID string
2024-04-23 13:33:05 +02:00
}
)
// pullStream is the name of the JetStream stream that events are pulled from.
const pullStream = "TRACKER"

// pullSubject is the subject pattern for events on that stream; in NATS the
// "*" wildcard matches exactly one subject token after the "TRACKER." prefix.
const pullSubject = "TRACKER.*"
2024-04-23 13:33:05 +02:00
func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
natsConn, err := nats.Connect(o.Endpoint)
if err != nil {
return nil, err
}
js, err := jetstream.New(natsConn)
2024-04-23 13:33:05 +02:00
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := js.Stream(ctx, pullStream)
if err != nil {
return nil, err
}
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: o.JetStreamID,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: pullStream,
2024-04-23 13:33:05 +02:00
})
if err != nil {
return nil, err
}
o.Logg.Info("successfully connected to NATS server")
2024-04-23 13:33:05 +02:00
return &JetStreamSub{
jsConsumer: consumer,
store: o.Store,
natsConn: natsConn,
logg: o.Logg,
durableID: o.JetStreamID,
2024-04-23 13:33:05 +02:00
}, nil
}
// Close shuts down the underlying NATS connection. It does nothing when the
// subscriber holds no connection.
func (s *JetStreamSub) Close() {
	if s.natsConn == nil {
		return
	}
	s.natsConn.Close()
}
func (s *JetStreamSub) Process() error {
for {
events, err := s.jsConsumer.Fetch(100, jetstream.FetchMaxWait(1*time.Second))
2024-04-23 13:33:05 +02:00
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
continue
} else if errors.Is(err, nats.ErrConnectionClosed) {
return nil
} else {
return err
}
}
for msg := range events.Messages() {
if err := s.processEventHandler(context.Background(), msg.Subject(), msg.Data()); err != nil {
2024-04-23 13:33:05 +02:00
s.logg.Error("error processing nats message", "error", err)
msg.Nak()
} else {
msg.Ack()
}
}
}
}
func (s *JetStreamSub) processEventHandler(ctx context.Context, msgSubject string, msgData []byte) error {
var chainEvent event.Event
2024-04-23 13:33:05 +02:00
if err := json.Unmarshal(msgData, &chainEvent); err != nil {
2024-04-23 13:33:05 +02:00
return err
}
switch msgSubject {
2024-04-23 13:33:05 +02:00
case "TRACKER.TOKEN_TRANSFER":
if err := s.store.InsertTokenTransfer(ctx, chainEvent); err != nil {
return err
}
case "TRACKER.POOL_SWAP":
if err := s.store.InsertPoolSwap(ctx, chainEvent); err != nil {
return err
}
2024-06-03 05:00:28 +02:00
case "TRACKER.FAUCET_GIVE":
if err := s.store.InsertFaucetGive(ctx, chainEvent); err != nil {
return err
}
2024-04-23 13:33:05 +02:00
case "TRACKER.POOL_DEPOSIT":
if err := s.store.InsertPoolDeposit(ctx, chainEvent); err != nil {
return err
}
2024-06-03 05:00:28 +02:00
case "TRACKER.TOKEN_MINT":
if err := s.store.InsertTokenMint(ctx, chainEvent); err != nil {
return err
}
case "TRACKER.TOKEN_BURN":
if err := s.store.InsertTokenBurn(ctx, chainEvent); err != nil {
return err
}
case "TRACKER.QUOTER_PRICE_INDEX_UPDATED":
if err := s.store.InsertPriceQuoteUpdate(ctx, chainEvent); err != nil {
return err
}
2024-04-23 13:33:05 +02:00
}
return nil
}