From 2793d92343429cfcc4ad21d25809989d95571151 Mon Sep 17 00:00:00 2001 From: Mohamed Sohail Date: Thu, 31 Oct 2024 10:41:43 +0300 Subject: [PATCH] Feat/consolidate functionality (#22) * breaking: use event router, adsd telegram notifier, update indexer schema * fix: early ack on handler not found * fix: jetstream switch to new API, discard buffer on close * remove telegram dependency, rely on log alters instead, which indirectly connect to telegram via uptrace * feat (breaking): switch to self contained auto bootstrapper --- Dockerfile | 1 - Makefile | 7 +- cmd/bootstrap/main.go | 251 ----------------------- cmd/service/main.go | 21 +- cmd/service/router.go | 43 ++++ go.mod | 6 +- go.sum | 13 +- internal/api/api.go | 15 +- internal/cache/cache.go | 26 +++ internal/handler/add_contract.go | 81 ++++++++ internal/handler/faucet_give.go | 33 +++ internal/handler/handler.go | 48 ++--- internal/handler/pool_deposit.go | 11 + internal/handler/pool_swap.go | 11 + internal/handler/token_burn.go | 11 + internal/handler/token_mint.go | 11 + internal/handler/transfer.go | 11 + internal/store/pg.go | 50 +++-- internal/store/store.go | 3 +- internal/sub/jetstream.go | 73 ++++--- internal/sub/sub.go | 8 - migrations/001_indexer_base.sql | 33 ++- migrations/002_fix_contract_address.sql | 29 --- migrations/003_fix_tokens_constraint.sql | 1 - pkg/router/router.go | 62 ++++++ queries.sql | 34 ++- 26 files changed, 462 insertions(+), 431 deletions(-) delete mode 100644 cmd/bootstrap/main.go create mode 100644 cmd/service/router.go create mode 100644 internal/cache/cache.go create mode 100644 internal/handler/add_contract.go create mode 100644 internal/handler/faucet_give.go create mode 100644 internal/handler/pool_deposit.go create mode 100644 internal/handler/pool_swap.go create mode 100644 internal/handler/token_burn.go create mode 100644 internal/handler/token_mint.go create mode 100644 internal/handler/transfer.go delete mode 100644 internal/sub/sub.go delete mode 100644 migrations/002_fix_contract_address.sql delete mode 100644 migrations/003_fix_tokens_constraint.sql create mode 100644 pkg/router/router.go diff --git a/Dockerfile b/Dockerfile index e538bbc..46d5cb8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,6 @@ WORKDIR /build COPY . . RUN go mod download -RUN go build -o eth-indexer-bootstrap -ldflags="-X main.build=${BUILD} -s -w" cmd/bootstrap/main.go RUN go build -o eth-indexer -ldflags="-X main.build=${BUILD} -s -w" cmd/service/*.go FROM debian:bookworm-slim diff --git a/Makefile b/Makefile index 7cc494f..ac32730 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,4 @@ BIN := eth-indexer -BOOTSTRAP_BIN := eth-indexer-cache-bootstrap DB_FILE := tracker_db BUILD_CONF := CGO_ENABLED=1 GOOS=linux GOARCH=amd64 BUILD_COMMIT := $(shell git rev-parse --short HEAD 2> /dev/null) @@ -8,17 +7,13 @@ DEBUG := DEV=true .PHONY: build run run-bootstrap clean clean-debug clean: - rm ${BIN} ${BOOTSTRAP_BIN} + rm ${BIN} clean-db: rm ${DB_FILE} build: - ${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BOOTSTRAP_BIN} cmd/bootstrap/main.go ${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BIN} cmd/service/*.go -run-bootstrap: - ${BUILD_CONF} ${DEBUG} go run cmd/bootstrap/main.go - run: ${BUILD_CONF} ${DEBUG} go run cmd/service/*.go \ No newline at end of file diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go deleted file mode 100644 index d0c6736..0000000 --- a/cmd/bootstrap/main.go +++ /dev/null @@ -1,251 +0,0 @@ -package main - -import ( - "context" - "flag" - "log/slog" - "os" - "time" - - "github.com/grassrootseconomics/eth-indexer/internal/store" - "github.com/grassrootseconomics/eth-indexer/internal/util" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/knadh/koanf/v2" - - "github.com/ethereum/go-ethereum/common" - "github.com/grassrootseconomics/ethutils" - "github.com/lmittmann/w3" - "github.com/lmittmann/w3/module/eth" -) - -type TokenArgs struct { - ContractAddress string - TokenName string - TokenSymbol string - TokenDecimals uint8 -} - -const ( - insertTokenQuery = `INSERT INTO tokens( - contract_address, - token_name, - token_symbol, - token_decimals - ) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING` -) - -var ( - build = "dev" - - confFlag string - migrationsFolderFlag string - queriesFlag string - - lo *slog.Logger - ko *koanf.Koanf - - dbPool *pgxpool.Pool -) - -func init() { - flag.StringVar(&confFlag, "config", "config.toml", "Config file location") - flag.StringVar(&migrationsFolderFlag, "migrations", "migrations/", "Migrations folder location") - flag.StringVar(&queriesFlag, "queries", "queries.sql", "Queries file location") - flag.Parse() - - lo = util.InitLogger() - ko = util.InitConfig(lo, confFlag) - - lo.Info("starting GE indexer token bootstrapper", "build", build) -} - -func main() { - var ( - tokenRegistryGetter = w3.MustNewFunc("tokenRegistry()", "address") - nameGetter = w3.MustNewFunc("name()", "string") - symbolGetter = w3.MustNewFunc("symbol()", "string") - decimalsGetter = w3.MustNewFunc("decimals()", "uint8") - ) - - chainProvider := ethutils.NewProvider(ko.MustString("chain.rpc_endpoint"), ko.MustInt64("chain.chainid")) - - var err error - dbPool, err = newPgStore() - if err != nil { - lo.Error("could not initialize postgres store", "error", err) - os.Exit(1) - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) - defer cancel() - - for _, registry := range ko.MustStrings("bootstrap.ge_registries") { - registryMap, err := chainProvider.RegistryMap(ctx, ethutils.HexToAddress(registry)) - if err != nil { - lo.Error("could not fetch registry", "error", err) - os.Exit(1) - } - - if tokenIndex := registryMap[ethutils.TokenIndex]; tokenIndex != ethutils.ZeroAddress { - tokenIndexIter, err := chainProvider.NewBatchIterator(ctx, tokenIndex) - if err != nil { - lo.Error("could not create token index iter", "error", err) - os.Exit(1) - } - - for { - batch, err := tokenIndexIter.Next(ctx) - if err != nil { - lo.Error("error fetching next token index batch", "error", err) - os.Exit(1) - } - if batch == nil { - break - } - lo.Debug("index batch", "index", tokenIndex.Hex(), "size", len(batch)) - for _, address := range batch { - if address != ethutils.ZeroAddress { - var ( - tokenName string - tokenSymbol string - tokenDecimals uint8 - ) - - err := chainProvider.Client.CallCtx( - ctx, - eth.CallFunc(address, nameGetter).Returns(&tokenName), - eth.CallFunc(address, symbolGetter).Returns(&tokenSymbol), - eth.CallFunc(address, decimalsGetter).Returns(&tokenDecimals), - ) - if err != nil { - lo.Error("error fetching token details", "error", err) - os.Exit(1) - } - - if err := insertToken(ctx, TokenArgs{ - ContractAddress: address.Hex(), - TokenName: tokenName, - TokenSymbol: tokenSymbol, - TokenDecimals: tokenDecimals, - }); err != nil { - lo.Error("pg insert error", "error", err) - os.Exit(1) - } - } - } - } - } - - if poolIndex := registryMap[ethutils.PoolIndex]; poolIndex != ethutils.ZeroAddress { - poolIndexIter, err := chainProvider.NewBatchIterator(ctx, poolIndex) - if err != nil { - lo.Error("cache could create pool index iter", "error", err) - os.Exit(1) - } - - for { - batch, err := poolIndexIter.Next(ctx) - if err != nil { - lo.Error("error fetching next pool index batch", "error", err) - os.Exit(1) - } - if batch == nil { - break - } - lo.Debug("index batch", "index", poolIndex.Hex(), "size", len(batch)) - for _, address := range batch { - var poolTokenIndex common.Address - err := chainProvider.Client.CallCtx( - ctx, - eth.CallFunc(address, tokenRegistryGetter).Returns(&poolTokenIndex), - ) - if err != nil { - lo.Error("error fetching pool token index and/or quoter", "error", err) - os.Exit(1) - } - if poolTokenIndex != ethutils.ZeroAddress { - poolTokenIndexIter, err := chainProvider.NewBatchIterator(ctx, poolTokenIndex) - if err != nil { - lo.Error("error creating pool token index iter", "error", err) - os.Exit(1) - } - - for { - batch, err := poolTokenIndexIter.Next(ctx) - if err != nil { - lo.Error("error fetching next pool token index batch", "error", err) - os.Exit(1) - } - if batch == nil { - break - } - lo.Debug("index batch", "index", poolTokenIndex.Hex(), "size", len(batch)) - for _, address := range batch { - if address != ethutils.ZeroAddress { - var ( - tokenName string - tokenSymbol string - tokenDecimals uint8 - ) - - err := chainProvider.Client.CallCtx( - ctx, - eth.CallFunc(address, nameGetter).Returns(&tokenName), - eth.CallFunc(address, symbolGetter).Returns(&tokenSymbol), - eth.CallFunc(address, decimalsGetter).Returns(&tokenDecimals), - ) - if err != nil { - lo.Error("error fetching token details", "error", err) - os.Exit(1) - } - - if err := insertToken(ctx, TokenArgs{ - ContractAddress: address.Hex(), - TokenName: tokenName, - TokenSymbol: tokenSymbol, - TokenDecimals: tokenDecimals, - }); err != nil { - lo.Error("pg insert error", "error", err) - os.Exit(1) - } - } - } - } - } - } - } - } - } - lo.Info("tokens bootstrap complete") -} - -func newPgStore() (*pgxpool.Pool, error) { - store, err := store.NewPgStore(store.PgOpts{ - Logg: lo, - DSN: ko.MustString("postgres.dsn"), - MigrationsFolderPath: migrationsFolderFlag, - QueriesFolderPath: queriesFlag, - }) - if err != nil { - lo.Error("could not initialize postgres store", "error", err) - os.Exit(1) - } - - return store.Pool(), nil -} - -func insertToken(ctx context.Context, insertArgs TokenArgs) error { - _, err := dbPool.Exec( - ctx, - insertTokenQuery, - insertArgs.ContractAddress, - insertArgs.TokenName, - insertArgs.TokenSymbol, - insertArgs.TokenDecimals, - ) - if err != nil { - return err - } - - return nil -} diff --git a/cmd/service/main.go b/cmd/service/main.go index c29c20f..bcb4440 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -13,10 +13,12 @@ import ( "time" "github.com/grassrootseconomics/eth-indexer/internal/api" + "github.com/grassrootseconomics/eth-indexer/internal/cache" "github.com/grassrootseconomics/eth-indexer/internal/handler" "github.com/grassrootseconomics/eth-indexer/internal/store" "github.com/grassrootseconomics/eth-indexer/internal/sub" "github.com/grassrootseconomics/eth-indexer/internal/util" + "github.com/grassrootseconomics/ethutils" "github.com/knadh/koanf/v2" ) @@ -60,14 +62,25 @@ func main() { os.Exit(1) } - handler := handler.NewHandler(handler.HandlerOpts{ - Store: store, + cache := cache.New() + + chainProvider := ethutils.NewProvider( + ko.MustString("chain.rpc_endpoint"), + ko.MustInt64("chain.chainid"), + ) + + handlerContainer := handler.NewHandler(handler.HandlerOpts{ + Store: store, + Cache: cache, + ChainProvider: chainProvider, + Logg: lo, }) + router := bootstrapRouter(handlerContainer) + jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{ Logg: lo, - Store: store, - Handler: handler, + Router: router, Endpoint: ko.MustString("jetstream.endpoint"), JetStreamID: ko.MustString("jetstream.id"), }) diff --git a/cmd/service/router.go b/cmd/service/router.go new file mode 100644 index 0000000..7ddc0e8 --- /dev/null +++ b/cmd/service/router.go @@ -0,0 +1,43 @@ +package main + +import ( + "github.com/grassrootseconomics/eth-indexer/internal/handler" + "github.com/grassrootseconomics/eth-indexer/pkg/router" +) + +func bootstrapRouter(handlerContainer *handler.Handler) *router.Router { + router := router.New(lo) + + router.RegisterRoute( + "TRACKER.TOKEN_TRANSFER", + handlerContainer.IndexTransfer, + handlerContainer.AddToken, + ) + router.RegisterRoute( + "TRACKER.TOKEN_MINT", + handlerContainer.IndexTokenMint, + handlerContainer.AddToken, + ) + router.RegisterRoute( + "TRACKER.TOKEN_BURN", + handlerContainer.IndexTokenMint, + handlerContainer.AddToken, + ) + router.RegisterRoute( + "TRACKER.POOL_SWAP", + handlerContainer.IndexPoolSwap, + handlerContainer.AddPool, + ) + router.RegisterRoute( + "TRACKER.POOL_DEPOSIT", + handlerContainer.IndexPoolDeposit, + handlerContainer.AddPool, + ) + router.RegisterRoute( + "TRACKER.FAUCET_GIVE", + handlerContainer.IndexFaucetGive, + handlerContainer.FaucetHealthCheck, + ) + + return router +} diff --git a/go.mod b/go.mod index 517b2b6..d82587e 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.0 require ( github.com/VictoriaMetrics/metrics v1.35.1 github.com/ethereum/go-ethereum v1.14.8 + github.com/go-chi/chi/v5 v5.1.0 github.com/grassrootseconomics/eth-tracker v1.2.2-rc github.com/grassrootseconomics/ethutils v1.3.0 github.com/jackc/pgx/v5 v5.6.0 @@ -17,7 +18,8 @@ require ( github.com/knadh/koanf/v2 v2.1.1 github.com/lmittmann/w3 v0.17.1 github.com/nats-io/nats.go v1.37.0 - github.com/uptrace/bunrouter v1.0.22 + github.com/puzpuzpuz/xsync/v3 v3.4.0 + github.com/sourcegraph/conc v0.3.0 ) require ( @@ -63,6 +65,8 @@ require ( github.com/tklauser/numcpus v0.6.1 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/sync v0.8.0 // indirect diff --git a/go.sum b/go.sum index e668124..1627af9 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,7 @@ github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c/go.mod h1:geZJ github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI= github.com/crate-crypto/go-kzg-4844 v1.0.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= @@ -67,6 +68,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= +github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= @@ -176,6 +179,8 @@ github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuI github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= +github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -184,6 +189,8 @@ github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1 github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -199,12 +206,14 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= -github.com/uptrace/bunrouter v1.0.22 h1:634bRGogHxjMaSqc5a3MjM/sisS/MkfXhWJ/WZXrktc= -github.com/uptrace/bunrouter v1.0.22/go.mod h1:O3jAcl+5qgnF+ejhgkmbceEk0E/mqaK+ADOocdNpY8M= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= diff --git a/internal/api/api.go b/internal/api/api.go index ea47162..29d774e 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -4,20 +4,19 @@ import ( "net/http" "github.com/VictoriaMetrics/metrics" - "github.com/uptrace/bunrouter" + "github.com/go-chi/chi/v5" ) -func New() *bunrouter.Router { - router := bunrouter.New() +func New() *chi.Mux { + r := chi.NewRouter() - router.GET("/metrics", metricsHandler()) + r.Get("/metrics", metricsHandler()) - return router + return r } -func metricsHandler() bunrouter.HandlerFunc { - return func(w http.ResponseWriter, _ bunrouter.Request) error { +func metricsHandler() http.HandlerFunc { + return func(w http.ResponseWriter, _ *http.Request) { metrics.WritePrometheus(w, true) - return nil } } diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 0000000..d22a203 --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,26 @@ +package cache + +import "github.com/puzpuzpuz/xsync/v3" + +type Cache struct { + provider *xsync.MapOf[string, bool] +} + +func New() *Cache { + return &Cache{ + provider: xsync.NewMapOf[string, bool](), + } +} + +func (c *Cache) Set(key string) { + c.provider.Store(key, true) +} + +func (c *Cache) Get(key string) bool { + v, _ := c.provider.Load(key) + return v +} + +func (c *Cache) Size() int { + return c.provider.Size() +} diff --git a/internal/handler/add_contract.go b/internal/handler/add_contract.go new file mode 100644 index 0000000..618e95e --- /dev/null +++ b/internal/handler/add_contract.go @@ -0,0 +1,81 @@ +package handler + +import ( + "context" + "errors" + + "github.com/ethereum/go-ethereum/common" + "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/grassrootseconomics/ethutils" + "github.com/lmittmann/w3" + "github.com/lmittmann/w3/module/eth" +) + +var ( + nameGetter = w3.MustNewFunc("name()", "string") + symbolGetter = w3.MustNewFunc("symbol()", "string") + decimalsGetter = w3.MustNewFunc("decimals()", "uint8") + sinkAddressGetter = w3.MustNewFunc("sinkAddress()", "address") +) + +func (h *Handler) AddToken(ctx context.Context, event event.Event) error { + if h.cache.Get(event.ContractAddress) { + return nil + } + + var ( + tokenName string + tokenSymbol string + tokenDecimals uint8 + sinkAddress common.Address + + batchErr w3.CallErrors + ) + + contractAddress := w3.A(event.ContractAddress) + + if err := h.chainProvider.Client.CallCtx( + ctx, + eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName), + eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol), + eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals), + ); errors.As(err, &batchErr) { + return batchErr + } else if err != nil { + return err + } + + if err := h.chainProvider.Client.CallCtx( + ctx, + eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals), + ); err != nil { + // This will most likely revert if the contract does not have a sinkAddress + // Instead of handling the error we just ignore it and set the value to 0 + sinkAddress = ethutils.ZeroAddress + } + + return h.store.InsertToken(ctx, event.ContractAddress, tokenName, tokenSymbol, tokenDecimals, sinkAddress.Hex()) +} + +func (h *Handler) AddPool(ctx context.Context, event event.Event) error { + if h.cache.Get(event.ContractAddress) { + return nil + } + + var ( + tokenName string + tokenSymbol string + ) + + contractAddress := w3.A(event.ContractAddress) + + if err := h.chainProvider.Client.CallCtx( + ctx, + eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName), + eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol), + ); err != nil { + return err + } + + return h.store.InsertPool(ctx, event.ContractAddress, tokenName, tokenSymbol) +} diff --git a/internal/handler/faucet_give.go b/internal/handler/faucet_give.go new file mode 100644 index 0000000..163643d --- /dev/null +++ b/internal/handler/faucet_give.go @@ -0,0 +1,33 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/lmittmann/w3" + "github.com/lmittmann/w3/module/eth" +) + +const balanceThreshold = 5 + +func (h *Handler) IndexFaucetGive(ctx context.Context, event event.Event) error { + return h.store.InsertFaucetGive(ctx, event) +} + +func (h *Handler) FaucetHealthCheck(ctx context.Context, event event.Event) error { + var balance *big.Int + + if err := h.chainProvider.Client.CallCtx( + ctx, + eth.Balance(w3.A(event.ContractAddress), nil).Returns(&balance), + ); err != nil { + return err + } + + if balance.Cmp(new(big.Int).Mul(w3.BigEther, big.NewInt(balanceThreshold))) < 0 { + h.logg.Warn("faucet balance is less than 5 ether", "faucet", event.ContractAddress) + } + + return nil +} diff --git a/internal/handler/handler.go b/internal/handler/handler.go index cec1ab2..348460f 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -1,52 +1,34 @@ package handler import ( - "context" - "encoding/json" + "log/slog" + "github.com/grassrootseconomics/eth-indexer/internal/cache" "github.com/grassrootseconomics/eth-indexer/internal/store" - "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/grassrootseconomics/ethutils" ) type ( HandlerOpts struct { - Store store.Store + Store store.Store + Cache *cache.Cache + ChainProvider *ethutils.Provider + Logg *slog.Logger } Handler struct { - store store.Store + store store.Store + cache *cache.Cache + chainProvider *ethutils.Provider + logg *slog.Logger } ) func NewHandler(o HandlerOpts) *Handler { return &Handler{ - store: o.Store, + store: o.Store, + cache: o.Cache, + chainProvider: o.ChainProvider, + logg: o.Logg, } } - -func (h *Handler) Handle(ctx context.Context, msgSubject string, msgData []byte) error { - var chainEvent event.Event - - if err := json.Unmarshal(msgData, &chainEvent); err != nil { - return err - } - - switch msgSubject { - case "TRACKER.TOKEN_TRANSFER": - return h.store.InsertTokenTransfer(ctx, chainEvent) - case "TRACKER.POOL_SWAP": - return h.store.InsertPoolSwap(ctx, chainEvent) - case "TRACKER.FAUCET_GIVE": - return h.store.InsertFaucetGive(ctx, chainEvent) - case "TRACKER.POOL_DEPOSIT": - return h.store.InsertPoolDeposit(ctx, chainEvent) - case "TRACKER.TOKEN_MINT": - return h.store.InsertTokenMint(ctx, chainEvent) - case "TRACKER.TOKEN_BURN": - return h.store.InsertTokenBurn(ctx, chainEvent) - case "TRACKER.QUOTER_PRICE_INDEX_UPDATED": - return h.store.InsertPriceQuoteUpdate(ctx, chainEvent) - } - - return nil -} diff --git a/internal/handler/pool_deposit.go b/internal/handler/pool_deposit.go new file mode 100644 index 0000000..50d4cc3 --- /dev/null +++ b/internal/handler/pool_deposit.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexPoolDeposit(ctx context.Context, event event.Event) error { + return h.store.InsertPoolDeposit(ctx, event) +} diff --git a/internal/handler/pool_swap.go b/internal/handler/pool_swap.go new file mode 100644 index 0000000..a19317b --- /dev/null +++ b/internal/handler/pool_swap.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexPoolSwap(ctx context.Context, event event.Event) error { + return h.store.InsertPoolSwap(ctx, event) +} diff --git a/internal/handler/token_burn.go b/internal/handler/token_burn.go new file mode 100644 index 0000000..ab15e6a --- /dev/null +++ b/internal/handler/token_burn.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexTokenBurn(ctx context.Context, event event.Event) error { + return h.store.InsertTokenBurn(ctx, event) +} diff --git a/internal/handler/token_mint.go b/internal/handler/token_mint.go new file mode 100644 index 0000000..f32fe9c --- /dev/null +++ b/internal/handler/token_mint.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexTokenMint(ctx context.Context, event event.Event) error { + return h.store.InsertTokenMint(ctx, event) +} diff --git a/internal/handler/transfer.go b/internal/handler/transfer.go new file mode 100644 index 0000000..323fbcf --- /dev/null +++ b/internal/handler/transfer.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexTransfer(ctx context.Context, event event.Event) error { + return h.store.InsertTokenTransfer(ctx, event) +} diff --git a/internal/store/pg.go b/internal/store/pg.go index be30992..820ffc2 100644 --- a/internal/store/pg.go +++ b/internal/store/pg.go @@ -29,14 +29,15 @@ type ( } queries struct { - InsertTx string `query:"insert-tx"` - InsertTokenTransfer string `query:"insert-token-transfer"` - InsertTokenMint string `query:"insert-token-mint"` - InsertTokenBurn string `query:"insert-token-burn"` - InsertFaucetGive string `query:"insert-faucet-give"` - InsertPoolSwap string `query:"insert-pool-swap"` - InsertPoolDeposit string `query:"insert-pool-deposit"` - InsertPriceQuoteUpdate string `query:"insert-price-quote-update"` + InsertTx string `query:"insert-tx"` + InsertTokenTransfer string `query:"insert-token-transfer"` + InsertTokenMint string `query:"insert-token-mint"` + InsertTokenBurn string `query:"insert-token-burn"` + InsertFaucetGive string `query:"insert-faucet-give"` + InsertPoolSwap string `query:"insert-pool-swap"` + InsertPoolDeposit string `query:"insert-pool-deposit"` + InsertToken string `query:"insert-token"` + InsertPool string `query:"insert-pool"` } ) @@ -198,20 +199,29 @@ func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) e }) } -func (pg *Pg) InsertPriceQuoteUpdate(ctx context.Context, eventPayload event.Event) error { +func (pg *Pg) InsertToken(ctx context.Context, contractAddress string, name string, symbol string, decimals uint8, sinkAddress string) error { return pg.executeTransaction(ctx, func(tx pgx.Tx) error { - txID, err := pg.insertTx(ctx, tx, eventPayload) - if err != nil { - return err - } - - _, err = tx.Exec( + _, err := tx.Exec( ctx, - pg.queries.InsertPriceQuoteUpdate, - txID, - eventPayload.Payload["token"].(string), - eventPayload.Payload["exchangeRate"].(string), - eventPayload.ContractAddress, + pg.queries.InsertToken, + contractAddress, + name, + symbol, + decimals, + sinkAddress, + ) + return err + }) +} + +func (pg *Pg) InsertPool(ctx context.Context, contractAddress string, name string, symbol string) error { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + _, err := tx.Exec( + ctx, + pg.queries.InsertPool, + contractAddress, + name, + symbol, ) return err }) diff --git a/internal/store/store.go b/internal/store/store.go index fe57a3d..8beab2a 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -15,7 +15,8 @@ type ( InsertFaucetGive(context.Context, event.Event) error InsertPoolSwap(context.Context, event.Event) error InsertPoolDeposit(context.Context, event.Event) error - InsertPriceQuoteUpdate(context.Context, event.Event) error + InsertToken(context.Context, string, string, string, uint8, string) error + InsertPool(context.Context, string, string, string) error Pool() *pgxpool.Pool Close() } diff --git a/internal/sub/jetstream.go b/internal/sub/jetstream.go index 147156e..9644b34 100644 --- a/internal/sub/jetstream.go +++ b/internal/sub/jetstream.go @@ -6,28 +6,25 @@ import ( "log/slog" "time" - "github.com/grassrootseconomics/eth-indexer/internal/handler" - "github.com/grassrootseconomics/eth-indexer/internal/store" + "github.com/grassrootseconomics/eth-indexer/pkg/router" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) type ( JetStreamOpts struct { - Store store.Store - Logg *slog.Logger - Handler *handler.Handler Endpoint string JetStreamID string + Logg *slog.Logger + Router *router.Router } JetStreamSub struct { - jsConsumer jetstream.Consumer - store store.Store - handler *handler.Handler - natsConn *nats.Conn - logg *slog.Logger - durableID string + jsIter jetstream.MessagesContext + logg *slog.Logger + natsConn *nats.Conn + router *router.Router + durableID string } ) @@ -36,7 +33,7 @@ const ( pullSubject = "TRACKER.*" ) -func NewJetStreamSub(o JetStreamOpts) (Sub, error) { +func NewJetStreamSub(o JetStreamOpts) (*JetStreamSub, error) { natsConn, err := nats.Connect(o.Endpoint) if err != nil { return nil, err @@ -56,50 +53,52 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) { } consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ - Durable: o.JetStreamID, - AckPolicy: jetstream.AckExplicitPolicy, + Durable: o.JetStreamID, + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: pullSubject, }) if err != nil { return nil, err } o.Logg.Info("successfully connected to NATS server") + iter, err := consumer.Messages( + jetstream.WithMessagesErrOnMissingHeartbeat(false), + jetstream.PullMaxMessages(10), + ) + if err != nil { + return nil, err + } + return &JetStreamSub{ - jsConsumer: consumer, - store: o.Store, - handler: o.Handler, - natsConn: natsConn, - logg: o.Logg, - durableID: o.JetStreamID, + jsIter: iter, + router: o.Router, + natsConn: natsConn, + logg: o.Logg, + durableID: o.JetStreamID, }, nil } func (s *JetStreamSub) Close() { - if s.natsConn != nil { - s.natsConn.Close() - } + s.jsIter.Stop() } -func (s *JetStreamSub) Process() error { +func (s *JetStreamSub) Process() { for { - events, err := s.jsConsumer.Fetch(100, jetstream.FetchMaxWait(1*time.Second)) + msg, err := s.jsIter.Next() if err != nil { - if errors.Is(err, nats.ErrTimeout) { - continue - } else if errors.Is(err, nats.ErrConnectionClosed) { - return nil + if errors.Is(err, jetstream.ErrMsgIteratorClosed) { + s.logg.Debug("jetstream: iterator closed") + return } else { - return err + s.logg.Debug("jetstream: unknown iterator error", "error", err) + continue } } - for msg := range events.Messages() { - if err := s.handler.Handle(context.Background(), msg.Subject(), msg.Data()); err != nil { - s.logg.Error("error processing nats message", "error", err) - msg.Nak() - } else { - msg.Ack() - } + s.logg.Debug("processing nats message", "subject", msg.Subject()) + if err := s.router.Handle(context.Background(), msg); err != nil { + s.logg.Error("jetstream: router: error processing nats message", "error", err) } } } diff --git a/internal/sub/sub.go b/internal/sub/sub.go deleted file mode 100644 index 18a6067..0000000 --- a/internal/sub/sub.go +++ /dev/null @@ -1,8 +0,0 @@ -package sub - -type ( - Sub interface { - Process() error - Close() - } -) diff --git a/migrations/001_indexer_base.sql b/migrations/001_indexer_base.sql index f490459..6dc5341 100644 --- a/migrations/001_indexer_base.sql +++ b/migrations/001_indexer_base.sql @@ -2,7 +2,6 @@ CREATE TABLE IF NOT EXISTS tx ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, tx_hash VARCHAR(66) NOT NULL UNIQUE, block_number INT NOT NULL, - contract_address VARCHAR(42) NOT NULL, date_block TIMESTAMP NOT NULL, success BOOLEAN NOT NULL ); @@ -12,6 +11,7 @@ CREATE TABLE IF NOT EXISTS token_transfer ( tx_id INT REFERENCES tx(id), sender_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', transfer_value NUMERIC NOT NULL ); @@ -20,6 +20,7 @@ CREATE TABLE IF NOT EXISTS token_mint ( tx_id INT REFERENCES tx(id), minter_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', mint_value NUMERIC NOT NULL ); @@ -27,6 +28,7 @@ CREATE TABLE IF NOT EXISTS token_burn ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, tx_id INT REFERENCES tx(id), burner_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', burn_value NUMERIC NOT NULL ); @@ -35,6 +37,7 @@ CREATE TABLE IF NOT EXISTS faucet_give ( tx_id INT REFERENCES tx(id), token_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', give_value NUMERIC NOT NULL ); @@ -47,6 +50,7 @@ CREATE TABLE IF NOT EXISTS pool_swap ( token_out_address VARCHAR(42) NOT NULL, in_value NUMERIC NOT NULL, out_value NUMERIC NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', fee NUMERIC NOT NULL ); @@ -55,29 +59,22 @@ CREATE TABLE IF NOT EXISTS pool_deposit ( tx_id INT REFERENCES tx(id), initiator_address VARCHAR(42) NOT NULL, token_in_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', in_value NUMERIC NOT NULL ); -CREATE TABLE IF NOT EXISTS price_index_updates ( - id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - tx_id INT REFERENCES tx(id), - token VARCHAR(42) NOT NULL, - exchange_rate NUMERIC NOT NULL -); - -CREATE TABLE IF NOT EXISTS contracts ( - id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - contract_address VARCHAR(42) UNIQUE NOT NULL, - contract_description TEXT NOT NULL, - is_token BOOLEAN NOT NULL -); - CREATE TABLE IF NOT EXISTS tokens ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - contract_address VARCHAR(42) UNIQUE NOT NULL, + contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', token_name TEXT NOT NULL, token_symbol TEXT NOT NULL, token_decimals INT NOT NULL, - token_version TEXT NOT NULL, - token_type TEXT NOT NULL + sink_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000' +); + +CREATE TABLE IF NOT EXISTS pools ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', + pool_name TEXT NOT NULL, + pool_symbol TEXT NOT NULL ); \ No newline at end of file diff --git a/migrations/002_fix_contract_address.sql b/migrations/002_fix_contract_address.sql deleted file mode 100644 index 64f9fe1..0000000 --- a/migrations/002_fix_contract_address.sql +++ /dev/null @@ -1,29 +0,0 @@ -ALTER TABLE tx DROP COLUMN contract_address; - -ALTER TABLE token_transfer ADD COLUMN contract_address VARCHAR(42); -UPDATE token_transfer SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE token_transfer ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE token_mint ADD COLUMN contract_address VARCHAR(42); -UPDATE token_mint SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE token_mint ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE token_burn ADD COLUMN contract_address VARCHAR(42); -UPDATE token_burn SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE token_burn ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE faucet_give ADD COLUMN contract_address VARCHAR(42); -UPDATE faucet_give SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE faucet_give ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE pool_swap ADD COLUMN contract_address VARCHAR(42); -UPDATE pool_swap SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE pool_swap ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE pool_deposit ADD COLUMN contract_address VARCHAR(42); -UPDATE pool_deposit SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE pool_deposit ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE price_index_updates ADD COLUMN contract_address VARCHAR(42); -UPDATE price_index_updates SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE price_index_updates ALTER COLUMN contract_address SET NOT NULL; \ No newline at end of file diff --git a/migrations/003_fix_tokens_constraint.sql b/migrations/003_fix_tokens_constraint.sql deleted file mode 100644 index 4d838a0..0000000 --- a/migrations/003_fix_tokens_constraint.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE tokens ALTER token_version DROP NOT NULL, ALTER token_type DROP NOT NULL; \ No newline at end of file diff --git a/pkg/router/router.go b/pkg/router/router.go new file mode 100644 index 0000000..b68b380 --- /dev/null +++ b/pkg/router/router.go @@ -0,0 +1,62 @@ +package router + +import ( + "context" + "encoding/json" + "log/slog" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/nats-io/nats.go/jetstream" + "github.com/sourcegraph/conc/pool" +) + +type ( + HandlerFunc func(context.Context, event.Event) error + + Router struct { + logg *slog.Logger + handlers map[string][]HandlerFunc + } +) + +func New(logg *slog.Logger) *Router { + return &Router{ + handlers: make(map[string][]HandlerFunc), + logg: logg, + } +} + +func (r *Router) RegisterRoute(subject string, handlerFunc ...HandlerFunc) { + r.handlers[subject] = handlerFunc +} + +func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error { + handlers, ok := r.handlers[msg.Subject()] + if !ok { + r.logg.Debug("handler not found sending ack", "subject", msg.Subject()) + return msg.Ack() + } + + var chainEvent event.Event + if err := json.Unmarshal(msg.Data(), &chainEvent); err != nil { + return err + } + + p := pool.New().WithErrors() + + for _, handler := range handlers { + p.Go(func() error { + return handler(ctx, chainEvent) + }) + } + + if err := p.Wait(); err != nil { + r.logg.Error("handler error sending nack", "subject", msg.Subject(), "error", err) + if err := msg.Nak(); err != nil { + return err + } + return err + } + + return msg.Ack() +} diff --git a/queries.sql b/queries.sql index 2c7998d..ab21ed1 100644 --- a/queries.sql +++ b/queries.sql @@ -104,14 +104,26 @@ INSERT INTO pool_deposit( contract_address ) VALUES($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING ---name: insert-price-quote-update --- $1: tx_id --- $2: token --- $3: exchange_rate --- $4: contract_address -INSERT INTO price_index_updates( - tx_id, - token, - exchange_rate, - contract_address -) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING +--name: insert-token +-- $1: contract_address +-- $2: token_name +-- $3: token_symbol +-- $4: token_decimals +-- $5: sink_address +INSERT INTO tokens( + contract_address, + token_name, + token_symbol, + token_decimals, + sink_address +) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING + +--name: insert-pool +-- $1: contract_address +-- $2: pool_name +-- $3: pool_symbol +INSERT INTO tokens( + contract_address, + pool_name, + pool_symbol +) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING