diff --git a/cmd/service/router.go b/cmd/service/router.go index fc8757b..7ddc0e8 100644 --- a/cmd/service/router.go +++ b/cmd/service/router.go @@ -6,7 +6,7 @@ import ( ) func bootstrapRouter(handlerContainer *handler.Handler) *router.Router { - router := router.New() + router := router.New(lo) router.RegisterRoute( "TRACKER.TOKEN_TRANSFER", diff --git a/config.toml b/config.toml index 9ec9476..da3a28b 100644 --- a/config.toml +++ b/config.toml @@ -15,6 +15,10 @@ id = "celo-indexer-1" rpc_endpoint = "http://localhost:8545" chainid = 1337 +[telegram] +bot_token = "" +notification_channel = -1 + [bootstrap] # This will bootstrap the cache on which addresses to track ge_registries = ["0xE979a64D375F5D363d7cecF3c93B9aFD40Ba9f55"] diff --git a/internal/handler/add_contract.go b/internal/handler/add_contract.go index 19c50f1..618e95e 100644 --- a/internal/handler/add_contract.go +++ b/internal/handler/add_contract.go @@ -2,9 +2,11 @@ package handler import ( "context" + "errors" "github.com/ethereum/go-ethereum/common" "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/grassrootseconomics/ethutils" "github.com/lmittmann/w3" "github.com/lmittmann/w3/module/eth" ) @@ -13,7 +15,7 @@ var ( nameGetter = w3.MustNewFunc("name()", "string") symbolGetter = w3.MustNewFunc("symbol()", "string") decimalsGetter = w3.MustNewFunc("decimals()", "uint8") - sinkAddressGetter = w3.MustNewFunc("sinkAddress", "address") + sinkAddressGetter = w3.MustNewFunc("sinkAddress()", "address") ) func (h *Handler) AddToken(ctx context.Context, event event.Event) error { @@ -26,6 +28,8 @@ func (h *Handler) AddToken(ctx context.Context, event event.Event) error { tokenSymbol string tokenDecimals uint8 sinkAddress common.Address + + batchErr w3.CallErrors ) contractAddress := w3.A(event.ContractAddress) @@ -35,11 +39,21 @@ func (h *Handler) AddToken(ctx context.Context, event event.Event) error { eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName), eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol), eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals), - eth.CallFunc(contractAddress, sinkAddressGetter).Returns(&sinkAddress), - ); err != nil { + ); errors.As(err, &batchErr) { + return batchErr + } else if err != nil { return err } + if err := h.chainProvider.Client.CallCtx( + ctx, + eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals), + ); err != nil { + // This will most likely revert if the contract does not have a sinkAddress + // Instead of handling the error we just ignore it and set the value to 0 + sinkAddress = ethutils.ZeroAddress + } + return h.store.InsertToken(ctx, event.ContractAddress, tokenName, tokenSymbol, tokenDecimals, sinkAddress.Hex()) } diff --git a/internal/handler/faucet_give.go b/internal/handler/faucet_give.go index 5a136d4..d97fc69 100644 --- a/internal/handler/faucet_give.go +++ b/internal/handler/faucet_give.go @@ -11,7 +11,7 @@ import ( "github.com/lmittmann/w3/module/eth" ) -const balanceThreshold = 50 +const balanceThreshold = 5 func (h *Handler) IndexFaucetGive(ctx context.Context, event event.Event) error { return h.store.InsertFaucetGive(ctx, event) @@ -28,7 +28,7 @@ func (h *Handler) FaucetHealthCheck(ctx context.Context, event event.Event) erro } if balance.Cmp(new(big.Int).Mul(w3.BigEther, big.NewInt(balanceThreshold))) < 0 { - return h.telegram.Notify(ctx, fmt.Sprintf("%s: %s", event.ContractAddress, telegram.NOTIFY_LOW_BALANCE_ON_GAS_FAUCET)) + return h.telegram.Notify(ctx, fmt.Sprintf("%s:\n\n %s", event.ContractAddress, telegram.NOTIFY_LOW_BALANCE_ON_GAS_FAUCET)) } return nil diff --git a/internal/sub/jetstream.go b/internal/sub/jetstream.go index 126f2da..acbf82c 100644 --- a/internal/sub/jetstream.go +++ b/internal/sub/jetstream.go @@ -96,6 +96,7 @@ func (s *JetStreamSub) Process() error { } } + s.logg.Info("processing nats message", "subject", msg.Subject()) if err := s.router.Handle(context.Background(), msg); err != nil { s.logg.Error("router: error processing nats message", "error", err) } diff --git a/internal/telegram/telegram.go b/internal/telegram/telegram.go index 17515f4..ebdef1a 100644 --- a/internal/telegram/telegram.go +++ b/internal/telegram/telegram.go @@ -13,8 +13,8 @@ type ( } Telegram struct { - client *tg.Client - notificaationChannel int64 + client *tg.Client + notificationChannel int64 } ) @@ -25,13 +25,13 @@ const ( func New(o TelegramOpts) *Telegram { return &Telegram{ - client: tg.New(o.BotToken, nil), - notificaationChannel: o.NotificationChannel, + client: tg.New(o.BotToken), + notificationChannel: o.NotificationChannel, } } func (t *Telegram) Notify(ctx context.Context, message string) error { - _, err := t.client.SendMessage(tg.ChatID(t.notificaationChannel), message).Do(ctx) + _, err := t.client.SendMessage(tg.ChatID(t.notificationChannel), message).Do(ctx) return err } diff --git a/migrations/001_indexer_base.sql b/migrations/001_indexer_base.sql index 35a45e3..6dc5341 100644 --- a/migrations/001_indexer_base.sql +++ b/migrations/001_indexer_base.sql @@ -65,7 +65,7 @@ CREATE TABLE IF NOT EXISTS pool_deposit ( CREATE TABLE IF NOT EXISTS tokens ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', + contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', token_name TEXT NOT NULL, token_symbol TEXT NOT NULL, token_decimals INT NOT NULL, @@ -74,7 +74,7 @@ CREATE TABLE IF NOT EXISTS tokens ( CREATE TABLE IF NOT EXISTS pools ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', + contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', pool_name TEXT NOT NULL, pool_symbol TEXT NOT NULL ); \ No newline at end of file diff --git a/pkg/router/router.go b/pkg/router/router.go index bcdfeeb..b68b380 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -19,9 +19,10 @@ type ( } ) -func New() *Router { +func New(logg *slog.Logger) *Router { return &Router{ handlers: make(map[string][]HandlerFunc), + logg: logg, } } @@ -32,7 +33,8 @@ func (r *Router) RegisterRoute(subject string, handlerFunc ...HandlerFunc) { func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error { handlers, ok := r.handlers[msg.Subject()] if !ok { - return nil + r.logg.Debug("handler not found sending ack", "subject", msg.Subject()) + return msg.Ack() } var chainEvent event.Event @@ -49,7 +51,11 @@ func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error { } if err := p.Wait(); err != nil { - return msg.Nak() + r.logg.Error("handler error sending nack", "subject", msg.Subject(), "error", err) + if err := msg.Nak(); err != nil { + return err + } + return err } return msg.Ack()