Feat/consolidate functionality (#22)

* breaking: use event router, adsd telegram notifier, update indexer schema

* fix: early ack on handler not found

* fix: jetstream switch to new API, discard buffer on close

* remove telegram dependency, rely on log alters instead, which indirectly connect to telegram via uptrace

* feat (breaking): switch to self contained auto bootstrapper
This commit is contained in:
Mohamed Sohail 2024-10-31 10:41:43 +03:00 committed by GitHub
parent 1acbaa0058
commit 2793d92343
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 462 additions and 431 deletions

View File

@ -11,7 +11,6 @@ WORKDIR /build
COPY . . COPY . .
RUN go mod download RUN go mod download
RUN go build -o eth-indexer-bootstrap -ldflags="-X main.build=${BUILD} -s -w" cmd/bootstrap/main.go
RUN go build -o eth-indexer -ldflags="-X main.build=${BUILD} -s -w" cmd/service/*.go RUN go build -o eth-indexer -ldflags="-X main.build=${BUILD} -s -w" cmd/service/*.go
FROM debian:bookworm-slim FROM debian:bookworm-slim

View File

@ -1,5 +1,4 @@
BIN := eth-indexer BIN := eth-indexer
BOOTSTRAP_BIN := eth-indexer-cache-bootstrap
DB_FILE := tracker_db DB_FILE := tracker_db
BUILD_CONF := CGO_ENABLED=1 GOOS=linux GOARCH=amd64 BUILD_CONF := CGO_ENABLED=1 GOOS=linux GOARCH=amd64
BUILD_COMMIT := $(shell git rev-parse --short HEAD 2> /dev/null) BUILD_COMMIT := $(shell git rev-parse --short HEAD 2> /dev/null)
@ -8,17 +7,13 @@ DEBUG := DEV=true
.PHONY: build run run-bootstrap clean clean-debug .PHONY: build run run-bootstrap clean clean-debug
clean: clean:
rm ${BIN} ${BOOTSTRAP_BIN} rm ${BIN}
clean-db: clean-db:
rm ${DB_FILE} rm ${DB_FILE}
build: build:
${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BOOTSTRAP_BIN} cmd/bootstrap/main.go
${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BIN} cmd/service/*.go ${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BIN} cmd/service/*.go
run-bootstrap:
${BUILD_CONF} ${DEBUG} go run cmd/bootstrap/main.go
run: run:
${BUILD_CONF} ${DEBUG} go run cmd/service/*.go ${BUILD_CONF} ${DEBUG} go run cmd/service/*.go

View File

@ -1,251 +0,0 @@
package main
import (
"context"
"flag"
"log/slog"
"os"
"time"
"github.com/grassrootseconomics/eth-indexer/internal/store"
"github.com/grassrootseconomics/eth-indexer/internal/util"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/knadh/koanf/v2"
"github.com/ethereum/go-ethereum/common"
"github.com/grassrootseconomics/ethutils"
"github.com/lmittmann/w3"
"github.com/lmittmann/w3/module/eth"
)
type TokenArgs struct {
ContractAddress string
TokenName string
TokenSymbol string
TokenDecimals uint8
}
const (
insertTokenQuery = `INSERT INTO tokens(
contract_address,
token_name,
token_symbol,
token_decimals
) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING`
)
var (
build = "dev"
confFlag string
migrationsFolderFlag string
queriesFlag string
lo *slog.Logger
ko *koanf.Koanf
dbPool *pgxpool.Pool
)
func init() {
flag.StringVar(&confFlag, "config", "config.toml", "Config file location")
flag.StringVar(&migrationsFolderFlag, "migrations", "migrations/", "Migrations folder location")
flag.StringVar(&queriesFlag, "queries", "queries.sql", "Queries file location")
flag.Parse()
lo = util.InitLogger()
ko = util.InitConfig(lo, confFlag)
lo.Info("starting GE indexer token bootstrapper", "build", build)
}
func main() {
var (
tokenRegistryGetter = w3.MustNewFunc("tokenRegistry()", "address")
nameGetter = w3.MustNewFunc("name()", "string")
symbolGetter = w3.MustNewFunc("symbol()", "string")
decimalsGetter = w3.MustNewFunc("decimals()", "uint8")
)
chainProvider := ethutils.NewProvider(ko.MustString("chain.rpc_endpoint"), ko.MustInt64("chain.chainid"))
var err error
dbPool, err = newPgStore()
if err != nil {
lo.Error("could not initialize postgres store", "error", err)
os.Exit(1)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
for _, registry := range ko.MustStrings("bootstrap.ge_registries") {
registryMap, err := chainProvider.RegistryMap(ctx, ethutils.HexToAddress(registry))
if err != nil {
lo.Error("could not fetch registry", "error", err)
os.Exit(1)
}
if tokenIndex := registryMap[ethutils.TokenIndex]; tokenIndex != ethutils.ZeroAddress {
tokenIndexIter, err := chainProvider.NewBatchIterator(ctx, tokenIndex)
if err != nil {
lo.Error("could not create token index iter", "error", err)
os.Exit(1)
}
for {
batch, err := tokenIndexIter.Next(ctx)
if err != nil {
lo.Error("error fetching next token index batch", "error", err)
os.Exit(1)
}
if batch == nil {
break
}
lo.Debug("index batch", "index", tokenIndex.Hex(), "size", len(batch))
for _, address := range batch {
if address != ethutils.ZeroAddress {
var (
tokenName string
tokenSymbol string
tokenDecimals uint8
)
err := chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(address, nameGetter).Returns(&tokenName),
eth.CallFunc(address, symbolGetter).Returns(&tokenSymbol),
eth.CallFunc(address, decimalsGetter).Returns(&tokenDecimals),
)
if err != nil {
lo.Error("error fetching token details", "error", err)
os.Exit(1)
}
if err := insertToken(ctx, TokenArgs{
ContractAddress: address.Hex(),
TokenName: tokenName,
TokenSymbol: tokenSymbol,
TokenDecimals: tokenDecimals,
}); err != nil {
lo.Error("pg insert error", "error", err)
os.Exit(1)
}
}
}
}
}
if poolIndex := registryMap[ethutils.PoolIndex]; poolIndex != ethutils.ZeroAddress {
poolIndexIter, err := chainProvider.NewBatchIterator(ctx, poolIndex)
if err != nil {
lo.Error("cache could create pool index iter", "error", err)
os.Exit(1)
}
for {
batch, err := poolIndexIter.Next(ctx)
if err != nil {
lo.Error("error fetching next pool index batch", "error", err)
os.Exit(1)
}
if batch == nil {
break
}
lo.Debug("index batch", "index", poolIndex.Hex(), "size", len(batch))
for _, address := range batch {
var poolTokenIndex common.Address
err := chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(address, tokenRegistryGetter).Returns(&poolTokenIndex),
)
if err != nil {
lo.Error("error fetching pool token index and/or quoter", "error", err)
os.Exit(1)
}
if poolTokenIndex != ethutils.ZeroAddress {
poolTokenIndexIter, err := chainProvider.NewBatchIterator(ctx, poolTokenIndex)
if err != nil {
lo.Error("error creating pool token index iter", "error", err)
os.Exit(1)
}
for {
batch, err := poolTokenIndexIter.Next(ctx)
if err != nil {
lo.Error("error fetching next pool token index batch", "error", err)
os.Exit(1)
}
if batch == nil {
break
}
lo.Debug("index batch", "index", poolTokenIndex.Hex(), "size", len(batch))
for _, address := range batch {
if address != ethutils.ZeroAddress {
var (
tokenName string
tokenSymbol string
tokenDecimals uint8
)
err := chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(address, nameGetter).Returns(&tokenName),
eth.CallFunc(address, symbolGetter).Returns(&tokenSymbol),
eth.CallFunc(address, decimalsGetter).Returns(&tokenDecimals),
)
if err != nil {
lo.Error("error fetching token details", "error", err)
os.Exit(1)
}
if err := insertToken(ctx, TokenArgs{
ContractAddress: address.Hex(),
TokenName: tokenName,
TokenSymbol: tokenSymbol,
TokenDecimals: tokenDecimals,
}); err != nil {
lo.Error("pg insert error", "error", err)
os.Exit(1)
}
}
}
}
}
}
}
}
}
lo.Info("tokens bootstrap complete")
}
func newPgStore() (*pgxpool.Pool, error) {
store, err := store.NewPgStore(store.PgOpts{
Logg: lo,
DSN: ko.MustString("postgres.dsn"),
MigrationsFolderPath: migrationsFolderFlag,
QueriesFolderPath: queriesFlag,
})
if err != nil {
lo.Error("could not initialize postgres store", "error", err)
os.Exit(1)
}
return store.Pool(), nil
}
func insertToken(ctx context.Context, insertArgs TokenArgs) error {
_, err := dbPool.Exec(
ctx,
insertTokenQuery,
insertArgs.ContractAddress,
insertArgs.TokenName,
insertArgs.TokenSymbol,
insertArgs.TokenDecimals,
)
if err != nil {
return err
}
return nil
}

View File

@ -13,10 +13,12 @@ import (
"time" "time"
"github.com/grassrootseconomics/eth-indexer/internal/api" "github.com/grassrootseconomics/eth-indexer/internal/api"
"github.com/grassrootseconomics/eth-indexer/internal/cache"
"github.com/grassrootseconomics/eth-indexer/internal/handler" "github.com/grassrootseconomics/eth-indexer/internal/handler"
"github.com/grassrootseconomics/eth-indexer/internal/store" "github.com/grassrootseconomics/eth-indexer/internal/store"
"github.com/grassrootseconomics/eth-indexer/internal/sub" "github.com/grassrootseconomics/eth-indexer/internal/sub"
"github.com/grassrootseconomics/eth-indexer/internal/util" "github.com/grassrootseconomics/eth-indexer/internal/util"
"github.com/grassrootseconomics/ethutils"
"github.com/knadh/koanf/v2" "github.com/knadh/koanf/v2"
) )
@ -60,14 +62,25 @@ func main() {
os.Exit(1) os.Exit(1)
} }
handler := handler.NewHandler(handler.HandlerOpts{ cache := cache.New()
chainProvider := ethutils.NewProvider(
ko.MustString("chain.rpc_endpoint"),
ko.MustInt64("chain.chainid"),
)
handlerContainer := handler.NewHandler(handler.HandlerOpts{
Store: store, Store: store,
Cache: cache,
ChainProvider: chainProvider,
Logg: lo,
}) })
router := bootstrapRouter(handlerContainer)
jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{ jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{
Logg: lo, Logg: lo,
Store: store, Router: router,
Handler: handler,
Endpoint: ko.MustString("jetstream.endpoint"), Endpoint: ko.MustString("jetstream.endpoint"),
JetStreamID: ko.MustString("jetstream.id"), JetStreamID: ko.MustString("jetstream.id"),
}) })

43
cmd/service/router.go Normal file
View File

@ -0,0 +1,43 @@
package main
import (
"github.com/grassrootseconomics/eth-indexer/internal/handler"
"github.com/grassrootseconomics/eth-indexer/pkg/router"
)
func bootstrapRouter(handlerContainer *handler.Handler) *router.Router {
router := router.New(lo)
router.RegisterRoute(
"TRACKER.TOKEN_TRANSFER",
handlerContainer.IndexTransfer,
handlerContainer.AddToken,
)
router.RegisterRoute(
"TRACKER.TOKEN_MINT",
handlerContainer.IndexTokenMint,
handlerContainer.AddToken,
)
router.RegisterRoute(
"TRACKER.TOKEN_BURN",
handlerContainer.IndexTokenMint,
handlerContainer.AddToken,
)
router.RegisterRoute(
"TRACKER.POOL_SWAP",
handlerContainer.IndexPoolSwap,
handlerContainer.AddPool,
)
router.RegisterRoute(
"TRACKER.POOL_DEPOSIT",
handlerContainer.IndexPoolDeposit,
handlerContainer.AddPool,
)
router.RegisterRoute(
"TRACKER.FAUCET_GIVE",
handlerContainer.IndexFaucetGive,
handlerContainer.FaucetHealthCheck,
)
return router
}

6
go.mod
View File

@ -5,6 +5,7 @@ go 1.23.0
require ( require (
github.com/VictoriaMetrics/metrics v1.35.1 github.com/VictoriaMetrics/metrics v1.35.1
github.com/ethereum/go-ethereum v1.14.8 github.com/ethereum/go-ethereum v1.14.8
github.com/go-chi/chi/v5 v5.1.0
github.com/grassrootseconomics/eth-tracker v1.2.2-rc github.com/grassrootseconomics/eth-tracker v1.2.2-rc
github.com/grassrootseconomics/ethutils v1.3.0 github.com/grassrootseconomics/ethutils v1.3.0
github.com/jackc/pgx/v5 v5.6.0 github.com/jackc/pgx/v5 v5.6.0
@ -17,7 +18,8 @@ require (
github.com/knadh/koanf/v2 v2.1.1 github.com/knadh/koanf/v2 v2.1.1
github.com/lmittmann/w3 v0.17.1 github.com/lmittmann/w3 v0.17.1
github.com/nats-io/nats.go v1.37.0 github.com/nats-io/nats.go v1.37.0
github.com/uptrace/bunrouter v1.0.22 github.com/puzpuzpuz/xsync/v3 v3.4.0
github.com/sourcegraph/conc v0.3.0
) )
require ( require (
@ -63,6 +65,8 @@ require (
github.com/tklauser/numcpus v0.6.1 // indirect github.com/tklauser/numcpus v0.6.1 // indirect
github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect github.com/valyala/histogram v1.2.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.27.0 // indirect golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/sync v0.8.0 // indirect golang.org/x/sync v0.8.0 // indirect

13
go.sum
View File

@ -47,6 +47,7 @@ github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c/go.mod h1:geZJ
github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI= github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI=
github.com/crate-crypto/go-kzg-4844 v1.0.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc= github.com/crate-crypto/go-kzg-4844 v1.0.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
@ -67,6 +68,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
@ -176,6 +179,8 @@ github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuI
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
@ -184,6 +189,8 @@ github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -199,12 +206,14 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/uptrace/bunrouter v1.0.22 h1:634bRGogHxjMaSqc5a3MjM/sisS/MkfXhWJ/WZXrktc=
github.com/uptrace/bunrouter v1.0.22/go.mod h1:O3jAcl+5qgnF+ejhgkmbceEk0E/mqaK+ADOocdNpY8M=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=

View File

@ -4,20 +4,19 @@ import (
"net/http" "net/http"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/uptrace/bunrouter" "github.com/go-chi/chi/v5"
) )
func New() *bunrouter.Router { func New() *chi.Mux {
router := bunrouter.New() r := chi.NewRouter()
router.GET("/metrics", metricsHandler()) r.Get("/metrics", metricsHandler())
return router return r
} }
func metricsHandler() bunrouter.HandlerFunc { func metricsHandler() http.HandlerFunc {
return func(w http.ResponseWriter, _ bunrouter.Request) error { return func(w http.ResponseWriter, _ *http.Request) {
metrics.WritePrometheus(w, true) metrics.WritePrometheus(w, true)
return nil
} }
} }

26
internal/cache/cache.go vendored Normal file
View File

@ -0,0 +1,26 @@
package cache
import "github.com/puzpuzpuz/xsync/v3"
type Cache struct {
provider *xsync.MapOf[string, bool]
}
func New() *Cache {
return &Cache{
provider: xsync.NewMapOf[string, bool](),
}
}
func (c *Cache) Set(key string) {
c.provider.Store(key, true)
}
func (c *Cache) Get(key string) bool {
v, _ := c.provider.Load(key)
return v
}
func (c *Cache) Size() int {
return c.provider.Size()
}

View File

@ -0,0 +1,81 @@
package handler
import (
"context"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/grassrootseconomics/ethutils"
"github.com/lmittmann/w3"
"github.com/lmittmann/w3/module/eth"
)
var (
nameGetter = w3.MustNewFunc("name()", "string")
symbolGetter = w3.MustNewFunc("symbol()", "string")
decimalsGetter = w3.MustNewFunc("decimals()", "uint8")
sinkAddressGetter = w3.MustNewFunc("sinkAddress()", "address")
)
func (h *Handler) AddToken(ctx context.Context, event event.Event) error {
if h.cache.Get(event.ContractAddress) {
return nil
}
var (
tokenName string
tokenSymbol string
tokenDecimals uint8
sinkAddress common.Address
batchErr w3.CallErrors
)
contractAddress := w3.A(event.ContractAddress)
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName),
eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol),
eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals),
); errors.As(err, &batchErr) {
return batchErr
} else if err != nil {
return err
}
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals),
); err != nil {
// This will most likely revert if the contract does not have a sinkAddress
// Instead of handling the error we just ignore it and set the value to 0
sinkAddress = ethutils.ZeroAddress
}
return h.store.InsertToken(ctx, event.ContractAddress, tokenName, tokenSymbol, tokenDecimals, sinkAddress.Hex())
}
func (h *Handler) AddPool(ctx context.Context, event event.Event) error {
if h.cache.Get(event.ContractAddress) {
return nil
}
var (
tokenName string
tokenSymbol string
)
contractAddress := w3.A(event.ContractAddress)
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName),
eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol),
); err != nil {
return err
}
return h.store.InsertPool(ctx, event.ContractAddress, tokenName, tokenSymbol)
}

View File

@ -0,0 +1,33 @@
package handler
import (
"context"
"math/big"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/lmittmann/w3"
"github.com/lmittmann/w3/module/eth"
)
const balanceThreshold = 5
func (h *Handler) IndexFaucetGive(ctx context.Context, event event.Event) error {
return h.store.InsertFaucetGive(ctx, event)
}
func (h *Handler) FaucetHealthCheck(ctx context.Context, event event.Event) error {
var balance *big.Int
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.Balance(w3.A(event.ContractAddress), nil).Returns(&balance),
); err != nil {
return err
}
if balance.Cmp(new(big.Int).Mul(w3.BigEther, big.NewInt(balanceThreshold))) < 0 {
h.logg.Warn("faucet balance is less than 5 ether", "faucet", event.ContractAddress)
}
return nil
}

View File

@ -1,52 +1,34 @@
package handler package handler
import ( import (
"context" "log/slog"
"encoding/json"
"github.com/grassrootseconomics/eth-indexer/internal/cache"
"github.com/grassrootseconomics/eth-indexer/internal/store" "github.com/grassrootseconomics/eth-indexer/internal/store"
"github.com/grassrootseconomics/eth-tracker/pkg/event" "github.com/grassrootseconomics/ethutils"
) )
type ( type (
HandlerOpts struct { HandlerOpts struct {
Store store.Store Store store.Store
Cache *cache.Cache
ChainProvider *ethutils.Provider
Logg *slog.Logger
} }
Handler struct { Handler struct {
store store.Store store store.Store
cache *cache.Cache
chainProvider *ethutils.Provider
logg *slog.Logger
} }
) )
func NewHandler(o HandlerOpts) *Handler { func NewHandler(o HandlerOpts) *Handler {
return &Handler{ return &Handler{
store: o.Store, store: o.Store,
cache: o.Cache,
chainProvider: o.ChainProvider,
logg: o.Logg,
} }
} }
func (h *Handler) Handle(ctx context.Context, msgSubject string, msgData []byte) error {
var chainEvent event.Event
if err := json.Unmarshal(msgData, &chainEvent); err != nil {
return err
}
switch msgSubject {
case "TRACKER.TOKEN_TRANSFER":
return h.store.InsertTokenTransfer(ctx, chainEvent)
case "TRACKER.POOL_SWAP":
return h.store.InsertPoolSwap(ctx, chainEvent)
case "TRACKER.FAUCET_GIVE":
return h.store.InsertFaucetGive(ctx, chainEvent)
case "TRACKER.POOL_DEPOSIT":
return h.store.InsertPoolDeposit(ctx, chainEvent)
case "TRACKER.TOKEN_MINT":
return h.store.InsertTokenMint(ctx, chainEvent)
case "TRACKER.TOKEN_BURN":
return h.store.InsertTokenBurn(ctx, chainEvent)
case "TRACKER.QUOTER_PRICE_INDEX_UPDATED":
return h.store.InsertPriceQuoteUpdate(ctx, chainEvent)
}
return nil
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexPoolDeposit(ctx context.Context, event event.Event) error {
return h.store.InsertPoolDeposit(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexPoolSwap(ctx context.Context, event event.Event) error {
return h.store.InsertPoolSwap(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexTokenBurn(ctx context.Context, event event.Event) error {
return h.store.InsertTokenBurn(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexTokenMint(ctx context.Context, event event.Event) error {
return h.store.InsertTokenMint(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexTransfer(ctx context.Context, event event.Event) error {
return h.store.InsertTokenTransfer(ctx, event)
}

View File

@ -36,7 +36,8 @@ type (
InsertFaucetGive string `query:"insert-faucet-give"` InsertFaucetGive string `query:"insert-faucet-give"`
InsertPoolSwap string `query:"insert-pool-swap"` InsertPoolSwap string `query:"insert-pool-swap"`
InsertPoolDeposit string `query:"insert-pool-deposit"` InsertPoolDeposit string `query:"insert-pool-deposit"`
InsertPriceQuoteUpdate string `query:"insert-price-quote-update"` InsertToken string `query:"insert-token"`
InsertPool string `query:"insert-pool"`
} }
) )
@ -198,20 +199,29 @@ func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) e
}) })
} }
func (pg *Pg) InsertPriceQuoteUpdate(ctx context.Context, eventPayload event.Event) error { func (pg *Pg) InsertToken(ctx context.Context, contractAddress string, name string, symbol string, decimals uint8, sinkAddress string) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error { return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
txID, err := pg.insertTx(ctx, tx, eventPayload) _, err := tx.Exec(
if err != nil { ctx,
pg.queries.InsertToken,
contractAddress,
name,
symbol,
decimals,
sinkAddress,
)
return err return err
})
} }
_, err = tx.Exec( func (pg *Pg) InsertPool(ctx context.Context, contractAddress string, name string, symbol string) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
_, err := tx.Exec(
ctx, ctx,
pg.queries.InsertPriceQuoteUpdate, pg.queries.InsertPool,
txID, contractAddress,
eventPayload.Payload["token"].(string), name,
eventPayload.Payload["exchangeRate"].(string), symbol,
eventPayload.ContractAddress,
) )
return err return err
}) })

View File

@ -15,7 +15,8 @@ type (
InsertFaucetGive(context.Context, event.Event) error InsertFaucetGive(context.Context, event.Event) error
InsertPoolSwap(context.Context, event.Event) error InsertPoolSwap(context.Context, event.Event) error
InsertPoolDeposit(context.Context, event.Event) error InsertPoolDeposit(context.Context, event.Event) error
InsertPriceQuoteUpdate(context.Context, event.Event) error InsertToken(context.Context, string, string, string, uint8, string) error
InsertPool(context.Context, string, string, string) error
Pool() *pgxpool.Pool Pool() *pgxpool.Pool
Close() Close()
} }

View File

@ -6,27 +6,24 @@ import (
"log/slog" "log/slog"
"time" "time"
"github.com/grassrootseconomics/eth-indexer/internal/handler" "github.com/grassrootseconomics/eth-indexer/pkg/router"
"github.com/grassrootseconomics/eth-indexer/internal/store"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream" "github.com/nats-io/nats.go/jetstream"
) )
type ( type (
JetStreamOpts struct { JetStreamOpts struct {
Store store.Store
Logg *slog.Logger
Handler *handler.Handler
Endpoint string Endpoint string
JetStreamID string JetStreamID string
Logg *slog.Logger
Router *router.Router
} }
JetStreamSub struct { JetStreamSub struct {
jsConsumer jetstream.Consumer jsIter jetstream.MessagesContext
store store.Store
handler *handler.Handler
natsConn *nats.Conn
logg *slog.Logger logg *slog.Logger
natsConn *nats.Conn
router *router.Router
durableID string durableID string
} }
) )
@ -36,7 +33,7 @@ const (
pullSubject = "TRACKER.*" pullSubject = "TRACKER.*"
) )
func NewJetStreamSub(o JetStreamOpts) (Sub, error) { func NewJetStreamSub(o JetStreamOpts) (*JetStreamSub, error) {
natsConn, err := nats.Connect(o.Endpoint) natsConn, err := nats.Connect(o.Endpoint)
if err != nil { if err != nil {
return nil, err return nil, err
@ -58,16 +55,24 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: o.JetStreamID, Durable: o.JetStreamID,
AckPolicy: jetstream.AckExplicitPolicy, AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: pullSubject,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
o.Logg.Info("successfully connected to NATS server") o.Logg.Info("successfully connected to NATS server")
iter, err := consumer.Messages(
jetstream.WithMessagesErrOnMissingHeartbeat(false),
jetstream.PullMaxMessages(10),
)
if err != nil {
return nil, err
}
return &JetStreamSub{ return &JetStreamSub{
jsConsumer: consumer, jsIter: iter,
store: o.Store, router: o.Router,
handler: o.Handler,
natsConn: natsConn, natsConn: natsConn,
logg: o.Logg, logg: o.Logg,
durableID: o.JetStreamID, durableID: o.JetStreamID,
@ -75,31 +80,25 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
} }
func (s *JetStreamSub) Close() { func (s *JetStreamSub) Close() {
if s.natsConn != nil { s.jsIter.Stop()
s.natsConn.Close()
}
} }
func (s *JetStreamSub) Process() error { func (s *JetStreamSub) Process() {
for { for {
events, err := s.jsConsumer.Fetch(100, jetstream.FetchMaxWait(1*time.Second)) msg, err := s.jsIter.Next()
if err != nil { if err != nil {
if errors.Is(err, nats.ErrTimeout) { if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
continue s.logg.Debug("jetstream: iterator closed")
} else if errors.Is(err, nats.ErrConnectionClosed) { return
return nil
} else { } else {
return err s.logg.Debug("jetstream: unknown iterator error", "error", err)
continue
} }
} }
for msg := range events.Messages() { s.logg.Debug("processing nats message", "subject", msg.Subject())
if err := s.handler.Handle(context.Background(), msg.Subject(), msg.Data()); err != nil { if err := s.router.Handle(context.Background(), msg); err != nil {
s.logg.Error("error processing nats message", "error", err) s.logg.Error("jetstream: router: error processing nats message", "error", err)
msg.Nak()
} else {
msg.Ack()
}
} }
} }
} }

View File

@ -1,8 +0,0 @@
package sub
type (
Sub interface {
Process() error
Close()
}
)

View File

@ -2,7 +2,6 @@ CREATE TABLE IF NOT EXISTS tx (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tx_hash VARCHAR(66) NOT NULL UNIQUE, tx_hash VARCHAR(66) NOT NULL UNIQUE,
block_number INT NOT NULL, block_number INT NOT NULL,
contract_address VARCHAR(42) NOT NULL,
date_block TIMESTAMP NOT NULL, date_block TIMESTAMP NOT NULL,
success BOOLEAN NOT NULL success BOOLEAN NOT NULL
); );
@ -12,6 +11,7 @@ CREATE TABLE IF NOT EXISTS token_transfer (
tx_id INT REFERENCES tx(id), tx_id INT REFERENCES tx(id),
sender_address VARCHAR(42) NOT NULL, sender_address VARCHAR(42) NOT NULL,
recipient_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
transfer_value NUMERIC NOT NULL transfer_value NUMERIC NOT NULL
); );
@ -20,6 +20,7 @@ CREATE TABLE IF NOT EXISTS token_mint (
tx_id INT REFERENCES tx(id), tx_id INT REFERENCES tx(id),
minter_address VARCHAR(42) NOT NULL, minter_address VARCHAR(42) NOT NULL,
recipient_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
mint_value NUMERIC NOT NULL mint_value NUMERIC NOT NULL
); );
@ -27,6 +28,7 @@ CREATE TABLE IF NOT EXISTS token_burn (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tx_id INT REFERENCES tx(id), tx_id INT REFERENCES tx(id),
burner_address VARCHAR(42) NOT NULL, burner_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
burn_value NUMERIC NOT NULL burn_value NUMERIC NOT NULL
); );
@ -35,6 +37,7 @@ CREATE TABLE IF NOT EXISTS faucet_give (
tx_id INT REFERENCES tx(id), tx_id INT REFERENCES tx(id),
token_address VARCHAR(42) NOT NULL, token_address VARCHAR(42) NOT NULL,
recipient_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
give_value NUMERIC NOT NULL give_value NUMERIC NOT NULL
); );
@ -47,6 +50,7 @@ CREATE TABLE IF NOT EXISTS pool_swap (
token_out_address VARCHAR(42) NOT NULL, token_out_address VARCHAR(42) NOT NULL,
in_value NUMERIC NOT NULL, in_value NUMERIC NOT NULL,
out_value NUMERIC NOT NULL, out_value NUMERIC NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
fee NUMERIC NOT NULL fee NUMERIC NOT NULL
); );
@ -55,29 +59,22 @@ CREATE TABLE IF NOT EXISTS pool_deposit (
tx_id INT REFERENCES tx(id), tx_id INT REFERENCES tx(id),
initiator_address VARCHAR(42) NOT NULL, initiator_address VARCHAR(42) NOT NULL,
token_in_address VARCHAR(42) NOT NULL, token_in_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
in_value NUMERIC NOT NULL in_value NUMERIC NOT NULL
); );
CREATE TABLE IF NOT EXISTS price_index_updates (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tx_id INT REFERENCES tx(id),
token VARCHAR(42) NOT NULL,
exchange_rate NUMERIC NOT NULL
);
CREATE TABLE IF NOT EXISTS contracts (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) UNIQUE NOT NULL,
contract_description TEXT NOT NULL,
is_token BOOLEAN NOT NULL
);
CREATE TABLE IF NOT EXISTS tokens ( CREATE TABLE IF NOT EXISTS tokens (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) UNIQUE NOT NULL, contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
token_name TEXT NOT NULL, token_name TEXT NOT NULL,
token_symbol TEXT NOT NULL, token_symbol TEXT NOT NULL,
token_decimals INT NOT NULL, token_decimals INT NOT NULL,
token_version TEXT NOT NULL, sink_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000'
token_type TEXT NOT NULL );
CREATE TABLE IF NOT EXISTS pools (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
pool_name TEXT NOT NULL,
pool_symbol TEXT NOT NULL
); );

View File

@ -1,29 +0,0 @@
ALTER TABLE tx DROP COLUMN contract_address;
ALTER TABLE token_transfer ADD COLUMN contract_address VARCHAR(42);
UPDATE token_transfer SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE token_transfer ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE token_mint ADD COLUMN contract_address VARCHAR(42);
UPDATE token_mint SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE token_mint ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE token_burn ADD COLUMN contract_address VARCHAR(42);
UPDATE token_burn SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE token_burn ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE faucet_give ADD COLUMN contract_address VARCHAR(42);
UPDATE faucet_give SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE faucet_give ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE pool_swap ADD COLUMN contract_address VARCHAR(42);
UPDATE pool_swap SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE pool_swap ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE pool_deposit ADD COLUMN contract_address VARCHAR(42);
UPDATE pool_deposit SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE pool_deposit ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE price_index_updates ADD COLUMN contract_address VARCHAR(42);
UPDATE price_index_updates SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE price_index_updates ALTER COLUMN contract_address SET NOT NULL;

View File

@ -1 +0,0 @@
ALTER TABLE tokens ALTER token_version DROP NOT NULL, ALTER token_type DROP NOT NULL;

62
pkg/router/router.go Normal file
View File

@ -0,0 +1,62 @@
package router
import (
"context"
"encoding/json"
"log/slog"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/nats-io/nats.go/jetstream"
"github.com/sourcegraph/conc/pool"
)
type (
HandlerFunc func(context.Context, event.Event) error
Router struct {
logg *slog.Logger
handlers map[string][]HandlerFunc
}
)
func New(logg *slog.Logger) *Router {
return &Router{
handlers: make(map[string][]HandlerFunc),
logg: logg,
}
}
func (r *Router) RegisterRoute(subject string, handlerFunc ...HandlerFunc) {
r.handlers[subject] = handlerFunc
}
func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error {
handlers, ok := r.handlers[msg.Subject()]
if !ok {
r.logg.Debug("handler not found sending ack", "subject", msg.Subject())
return msg.Ack()
}
var chainEvent event.Event
if err := json.Unmarshal(msg.Data(), &chainEvent); err != nil {
return err
}
p := pool.New().WithErrors()
for _, handler := range handlers {
p.Go(func() error {
return handler(ctx, chainEvent)
})
}
if err := p.Wait(); err != nil {
r.logg.Error("handler error sending nack", "subject", msg.Subject(), "error", err)
if err := msg.Nak(); err != nil {
return err
}
return err
}
return msg.Ack()
}

View File

@ -104,14 +104,26 @@ INSERT INTO pool_deposit(
contract_address contract_address
) VALUES($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING ) VALUES($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING
--name: insert-price-quote-update --name: insert-token
-- $1: tx_id -- $1: contract_address
-- $2: token -- $2: token_name
-- $3: exchange_rate -- $3: token_symbol
-- $4: contract_address -- $4: token_decimals
INSERT INTO price_index_updates( -- $5: sink_address
tx_id, INSERT INTO tokens(
token, contract_address,
exchange_rate, token_name,
contract_address token_symbol,
) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING token_decimals,
sink_address
) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING
--name: insert-pool
-- $1: contract_address
-- $2: pool_name
-- $3: pool_symbol
INSERT INTO tokens(
contract_address,
pool_name,
pool_symbol
) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING