fix: early ack on handler not found

This commit is contained in:
Mohamed Sohail 2024-10-30 14:08:09 +03:00
parent b305d4814a
commit d8bb140f94
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
8 changed files with 41 additions and 16 deletions

View File

@ -6,7 +6,7 @@ import (
) )
func bootstrapRouter(handlerContainer *handler.Handler) *router.Router { func bootstrapRouter(handlerContainer *handler.Handler) *router.Router {
router := router.New() router := router.New(lo)
router.RegisterRoute( router.RegisterRoute(
"TRACKER.TOKEN_TRANSFER", "TRACKER.TOKEN_TRANSFER",

View File

@ -15,6 +15,10 @@ id = "celo-indexer-1"
rpc_endpoint = "http://localhost:8545" rpc_endpoint = "http://localhost:8545"
chainid = 1337 chainid = 1337
[telegram]
bot_token = ""
notification_channel = -1
[bootstrap] [bootstrap]
# This will bootstrap the cache on which addresses to track # This will bootstrap the cache on which addresses to track
ge_registries = ["0xE979a64D375F5D363d7cecF3c93B9aFD40Ba9f55"] ge_registries = ["0xE979a64D375F5D363d7cecF3c93B9aFD40Ba9f55"]

View File

@ -2,9 +2,11 @@ package handler
import ( import (
"context" "context"
"errors"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/grassrootseconomics/eth-tracker/pkg/event" "github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/grassrootseconomics/ethutils"
"github.com/lmittmann/w3" "github.com/lmittmann/w3"
"github.com/lmittmann/w3/module/eth" "github.com/lmittmann/w3/module/eth"
) )
@ -13,7 +15,7 @@ var (
nameGetter = w3.MustNewFunc("name()", "string") nameGetter = w3.MustNewFunc("name()", "string")
symbolGetter = w3.MustNewFunc("symbol()", "string") symbolGetter = w3.MustNewFunc("symbol()", "string")
decimalsGetter = w3.MustNewFunc("decimals()", "uint8") decimalsGetter = w3.MustNewFunc("decimals()", "uint8")
sinkAddressGetter = w3.MustNewFunc("sinkAddress", "address") sinkAddressGetter = w3.MustNewFunc("sinkAddress()", "address")
) )
func (h *Handler) AddToken(ctx context.Context, event event.Event) error { func (h *Handler) AddToken(ctx context.Context, event event.Event) error {
@ -26,6 +28,8 @@ func (h *Handler) AddToken(ctx context.Context, event event.Event) error {
tokenSymbol string tokenSymbol string
tokenDecimals uint8 tokenDecimals uint8
sinkAddress common.Address sinkAddress common.Address
batchErr w3.CallErrors
) )
contractAddress := w3.A(event.ContractAddress) contractAddress := w3.A(event.ContractAddress)
@ -35,11 +39,21 @@ func (h *Handler) AddToken(ctx context.Context, event event.Event) error {
eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName), eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName),
eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol), eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol),
eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals), eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals),
eth.CallFunc(contractAddress, sinkAddressGetter).Returns(&sinkAddress), ); errors.As(err, &batchErr) {
); err != nil { return batchErr
} else if err != nil {
return err return err
} }
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals),
); err != nil {
// This will most likely revert if the contract does not have a sinkAddress
// Instead of handling the error we just ignore it and set the value to 0
sinkAddress = ethutils.ZeroAddress
}
return h.store.InsertToken(ctx, event.ContractAddress, tokenName, tokenSymbol, tokenDecimals, sinkAddress.Hex()) return h.store.InsertToken(ctx, event.ContractAddress, tokenName, tokenSymbol, tokenDecimals, sinkAddress.Hex())
} }

View File

@ -11,7 +11,7 @@ import (
"github.com/lmittmann/w3/module/eth" "github.com/lmittmann/w3/module/eth"
) )
const balanceThreshold = 50 const balanceThreshold = 5
func (h *Handler) IndexFaucetGive(ctx context.Context, event event.Event) error { func (h *Handler) IndexFaucetGive(ctx context.Context, event event.Event) error {
return h.store.InsertFaucetGive(ctx, event) return h.store.InsertFaucetGive(ctx, event)
@ -28,7 +28,7 @@ func (h *Handler) FaucetHealthCheck(ctx context.Context, event event.Event) erro
} }
if balance.Cmp(new(big.Int).Mul(w3.BigEther, big.NewInt(balanceThreshold))) < 0 { if balance.Cmp(new(big.Int).Mul(w3.BigEther, big.NewInt(balanceThreshold))) < 0 {
return h.telegram.Notify(ctx, fmt.Sprintf("%s: %s", event.ContractAddress, telegram.NOTIFY_LOW_BALANCE_ON_GAS_FAUCET)) return h.telegram.Notify(ctx, fmt.Sprintf("%s:\n\n %s", event.ContractAddress, telegram.NOTIFY_LOW_BALANCE_ON_GAS_FAUCET))
} }
return nil return nil

View File

@ -96,6 +96,7 @@ func (s *JetStreamSub) Process() error {
} }
} }
s.logg.Info("processing nats message", "subject", msg.Subject())
if err := s.router.Handle(context.Background(), msg); err != nil { if err := s.router.Handle(context.Background(), msg); err != nil {
s.logg.Error("router: error processing nats message", "error", err) s.logg.Error("router: error processing nats message", "error", err)
} }

View File

@ -13,8 +13,8 @@ type (
} }
Telegram struct { Telegram struct {
client *tg.Client client *tg.Client
notificaationChannel int64 notificationChannel int64
} }
) )
@ -25,13 +25,13 @@ const (
func New(o TelegramOpts) *Telegram { func New(o TelegramOpts) *Telegram {
return &Telegram{ return &Telegram{
client: tg.New(o.BotToken, nil), client: tg.New(o.BotToken),
notificaationChannel: o.NotificationChannel, notificationChannel: o.NotificationChannel,
} }
} }
func (t *Telegram) Notify(ctx context.Context, message string) error { func (t *Telegram) Notify(ctx context.Context, message string) error {
_, err := t.client.SendMessage(tg.ChatID(t.notificaationChannel), message).Do(ctx) _, err := t.client.SendMessage(tg.ChatID(t.notificationChannel), message).Do(ctx)
return err return err
} }

View File

@ -65,7 +65,7 @@ CREATE TABLE IF NOT EXISTS pool_deposit (
CREATE TABLE IF NOT EXISTS tokens ( CREATE TABLE IF NOT EXISTS tokens (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
token_name TEXT NOT NULL, token_name TEXT NOT NULL,
token_symbol TEXT NOT NULL, token_symbol TEXT NOT NULL,
token_decimals INT NOT NULL, token_decimals INT NOT NULL,
@ -74,7 +74,7 @@ CREATE TABLE IF NOT EXISTS tokens (
CREATE TABLE IF NOT EXISTS pools ( CREATE TABLE IF NOT EXISTS pools (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
pool_name TEXT NOT NULL, pool_name TEXT NOT NULL,
pool_symbol TEXT NOT NULL pool_symbol TEXT NOT NULL
); );

View File

@ -19,9 +19,10 @@ type (
} }
) )
func New() *Router { func New(logg *slog.Logger) *Router {
return &Router{ return &Router{
handlers: make(map[string][]HandlerFunc), handlers: make(map[string][]HandlerFunc),
logg: logg,
} }
} }
@ -32,7 +33,8 @@ func (r *Router) RegisterRoute(subject string, handlerFunc ...HandlerFunc) {
func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error { func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error {
handlers, ok := r.handlers[msg.Subject()] handlers, ok := r.handlers[msg.Subject()]
if !ok { if !ok {
return nil r.logg.Debug("handler not found sending ack", "subject", msg.Subject())
return msg.Ack()
} }
var chainEvent event.Event var chainEvent event.Event
@ -49,7 +51,11 @@ func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error {
} }
if err := p.Wait(); err != nil { if err := p.Wait(); err != nil {
return msg.Nak() r.logg.Error("handler error sending nack", "subject", msg.Subject(), "error", err)
if err := msg.Nak(); err != nil {
return err
}
return err
} }
return msg.Ack() return msg.Ack()