Compare commits

...

12 Commits

Author SHA1 Message Date
8d26e7bf3d
feat [UNTESTED]: set removed flags on pools and tokens 2025-02-10 13:37:20 +03:00
16ddf2b2ce
release: v2.2.0-stable 2025-01-20 15:59:33 +03:00
1f52616f97
release: v2.1.1-stable 2025-01-08 11:45:01 +03:00
d79aaafcd1
fix: migration file 2024-11-25 14:22:28 +03:00
defde73bd4
feat: index ownership changes, bump min go version 2024-11-25 13:53:50 +03:00
afd8e3f30b
hotfix: burn router handler 2024-11-21 15:00:33 +03:00
93118a8e9f
fix: sinkAddress getter 2024-11-14 12:43:39 +03:00
bba5be8964
fix: pool index query 2024-11-12 11:38:00 +03:00
dependabot[bot]
d6a23c08ed
build(deps): bump github.com/grassrootseconomics/eth-tracker (#19)
Bumps [github.com/grassrootseconomics/eth-tracker](https://github.com/grassrootseconomics/eth-tracker) from 1.2.2-rc to 1.3.0-rc.
- [Commits](https://github.com/grassrootseconomics/eth-tracker/compare/v1.2.2-rc...v1.3.0-rc)

---
updated-dependencies:
- dependency-name: github.com/grassrootseconomics/eth-tracker
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-10-31 10:46:30 +03:00
dependabot[bot]
af468f5942
build(deps): bump github.com/jackc/pgx/v5 from 5.6.0 to 5.7.1 (#21)
Bumps [github.com/jackc/pgx/v5](https://github.com/jackc/pgx) from 5.6.0 to 5.7.1.
- [Changelog](https://github.com/jackc/pgx/blob/master/CHANGELOG.md)
- [Commits](https://github.com/jackc/pgx/compare/v5.6.0...v5.7.1)

---
updated-dependencies:
- dependency-name: github.com/jackc/pgx/v5
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-10-31 10:46:21 +03:00
ac508f4830
conf: update to latest spec 2024-10-31 10:46:03 +03:00
2793d92343
Feat/consolidate functionality (#22)
* breaking: use event router, adsd telegram notifier, update indexer schema

* fix: early ack on handler not found

* fix: jetstream switch to new API, discard buffer on close

* remove telegram dependency, rely on log alters instead, which indirectly connect to telegram via uptrace

* feat (breaking): switch to self contained auto bootstrapper
2024-10-31 10:41:43 +03:00
34 changed files with 602 additions and 459 deletions

View File

@ -1,4 +1,4 @@
FROM golang:1.23.0-bookworm as build
FROM golang:1.23.3-bookworm as build
ENV CGO_ENABLED=1
@ -11,7 +11,6 @@ WORKDIR /build
COPY . .
RUN go mod download
RUN go build -o eth-indexer-bootstrap -ldflags="-X main.build=${BUILD} -s -w" cmd/bootstrap/main.go
RUN go build -o eth-indexer -ldflags="-X main.build=${BUILD} -s -w" cmd/service/*.go
FROM debian:bookworm-slim

View File

@ -1,5 +1,4 @@
BIN := eth-indexer
BOOTSTRAP_BIN := eth-indexer-cache-bootstrap
DB_FILE := tracker_db
BUILD_CONF := CGO_ENABLED=1 GOOS=linux GOARCH=amd64
BUILD_COMMIT := $(shell git rev-parse --short HEAD 2> /dev/null)
@ -8,17 +7,13 @@ DEBUG := DEV=true
.PHONY: build run run-bootstrap clean clean-debug
clean:
rm ${BIN} ${BOOTSTRAP_BIN}
rm ${BIN}
clean-db:
rm ${DB_FILE}
build:
${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BOOTSTRAP_BIN} cmd/bootstrap/main.go
${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BIN} cmd/service/*.go
run-bootstrap:
${BUILD_CONF} ${DEBUG} go run cmd/bootstrap/main.go
run:
${BUILD_CONF} ${DEBUG} go run cmd/service/*.go

View File

@ -2,18 +2,21 @@
![GitHub Tag](https://img.shields.io/github/v/tag/grassrootseconomics/eth-indexer)
A lightweight Postgres chain indexer designed to couple with [eth-tracker](https://github.com/grassrootseconomics/eth-tracker) to index all relevant GE related blockchain data on any EVM chain.
A lightweight Postgres chain indexer designed to couple with
[eth-tracker](https://github.com/grassrootseconomics/eth-tracker) to index all
relevant GE related blockchain data on any EVM chain.
## Getting Started
### Prerequisites
* Git
* Docker
* Postgres server
* Access to a `eth-tracker` instance
- Git
- Docker
- Postgres server
- Access to a `eth-tracker` instance
See [docker-compose.yaml](dev/docker-compose.yaml) for an example on how to run and deploy a single instance.
See [docker-compose.yaml](dev/docker-compose.yaml) for an example on how to run
and deploy a single instance.
### 1. Build the Docker image
@ -34,7 +37,9 @@ For an example, see `dev/docker-compose.postgres.yaml`.
### 3. Update config values
See `.env.example` on how to override default values defined in `config.toml` using env variables. Alternatively, mount your own config.toml either during build time or Docker runtime.
See `.env.example` on how to override default values defined in `config.toml`
using env variables. Alternatively, mount your own config.toml either during
build time or Docker runtime.
```bash
# Override only specific config values
@ -44,10 +49,10 @@ mv .env.example .env
Special env variables:
* DEV=*
Refer to [`config.toml`](config.toml) to understand different config value settings.
- DEV=*
Refer to [`config.toml`](config.toml) to understand different config value
settings.
### 4. Run the indexer
@ -58,4 +63,4 @@ docker compose up
## License
[AGPL-3.0](LICENSE).
[AGPL-3.0](LICENSE).

View File

@ -1,251 +0,0 @@
package main
import (
"context"
"flag"
"log/slog"
"os"
"time"
"github.com/grassrootseconomics/eth-indexer/internal/store"
"github.com/grassrootseconomics/eth-indexer/internal/util"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/knadh/koanf/v2"
"github.com/ethereum/go-ethereum/common"
"github.com/grassrootseconomics/ethutils"
"github.com/lmittmann/w3"
"github.com/lmittmann/w3/module/eth"
)
type TokenArgs struct {
ContractAddress string
TokenName string
TokenSymbol string
TokenDecimals uint8
}
const (
insertTokenQuery = `INSERT INTO tokens(
contract_address,
token_name,
token_symbol,
token_decimals
) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING`
)
var (
build = "dev"
confFlag string
migrationsFolderFlag string
queriesFlag string
lo *slog.Logger
ko *koanf.Koanf
dbPool *pgxpool.Pool
)
func init() {
flag.StringVar(&confFlag, "config", "config.toml", "Config file location")
flag.StringVar(&migrationsFolderFlag, "migrations", "migrations/", "Migrations folder location")
flag.StringVar(&queriesFlag, "queries", "queries.sql", "Queries file location")
flag.Parse()
lo = util.InitLogger()
ko = util.InitConfig(lo, confFlag)
lo.Info("starting GE indexer token bootstrapper", "build", build)
}
func main() {
var (
tokenRegistryGetter = w3.MustNewFunc("tokenRegistry()", "address")
nameGetter = w3.MustNewFunc("name()", "string")
symbolGetter = w3.MustNewFunc("symbol()", "string")
decimalsGetter = w3.MustNewFunc("decimals()", "uint8")
)
chainProvider := ethutils.NewProvider(ko.MustString("chain.rpc_endpoint"), ko.MustInt64("chain.chainid"))
var err error
dbPool, err = newPgStore()
if err != nil {
lo.Error("could not initialize postgres store", "error", err)
os.Exit(1)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
for _, registry := range ko.MustStrings("bootstrap.ge_registries") {
registryMap, err := chainProvider.RegistryMap(ctx, ethutils.HexToAddress(registry))
if err != nil {
lo.Error("could not fetch registry", "error", err)
os.Exit(1)
}
if tokenIndex := registryMap[ethutils.TokenIndex]; tokenIndex != ethutils.ZeroAddress {
tokenIndexIter, err := chainProvider.NewBatchIterator(ctx, tokenIndex)
if err != nil {
lo.Error("could not create token index iter", "error", err)
os.Exit(1)
}
for {
batch, err := tokenIndexIter.Next(ctx)
if err != nil {
lo.Error("error fetching next token index batch", "error", err)
os.Exit(1)
}
if batch == nil {
break
}
lo.Debug("index batch", "index", tokenIndex.Hex(), "size", len(batch))
for _, address := range batch {
if address != ethutils.ZeroAddress {
var (
tokenName string
tokenSymbol string
tokenDecimals uint8
)
err := chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(address, nameGetter).Returns(&tokenName),
eth.CallFunc(address, symbolGetter).Returns(&tokenSymbol),
eth.CallFunc(address, decimalsGetter).Returns(&tokenDecimals),
)
if err != nil {
lo.Error("error fetching token details", "error", err)
os.Exit(1)
}
if err := insertToken(ctx, TokenArgs{
ContractAddress: address.Hex(),
TokenName: tokenName,
TokenSymbol: tokenSymbol,
TokenDecimals: tokenDecimals,
}); err != nil {
lo.Error("pg insert error", "error", err)
os.Exit(1)
}
}
}
}
}
if poolIndex := registryMap[ethutils.PoolIndex]; poolIndex != ethutils.ZeroAddress {
poolIndexIter, err := chainProvider.NewBatchIterator(ctx, poolIndex)
if err != nil {
lo.Error("cache could create pool index iter", "error", err)
os.Exit(1)
}
for {
batch, err := poolIndexIter.Next(ctx)
if err != nil {
lo.Error("error fetching next pool index batch", "error", err)
os.Exit(1)
}
if batch == nil {
break
}
lo.Debug("index batch", "index", poolIndex.Hex(), "size", len(batch))
for _, address := range batch {
var poolTokenIndex common.Address
err := chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(address, tokenRegistryGetter).Returns(&poolTokenIndex),
)
if err != nil {
lo.Error("error fetching pool token index and/or quoter", "error", err)
os.Exit(1)
}
if poolTokenIndex != ethutils.ZeroAddress {
poolTokenIndexIter, err := chainProvider.NewBatchIterator(ctx, poolTokenIndex)
if err != nil {
lo.Error("error creating pool token index iter", "error", err)
os.Exit(1)
}
for {
batch, err := poolTokenIndexIter.Next(ctx)
if err != nil {
lo.Error("error fetching next pool token index batch", "error", err)
os.Exit(1)
}
if batch == nil {
break
}
lo.Debug("index batch", "index", poolTokenIndex.Hex(), "size", len(batch))
for _, address := range batch {
if address != ethutils.ZeroAddress {
var (
tokenName string
tokenSymbol string
tokenDecimals uint8
)
err := chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(address, nameGetter).Returns(&tokenName),
eth.CallFunc(address, symbolGetter).Returns(&tokenSymbol),
eth.CallFunc(address, decimalsGetter).Returns(&tokenDecimals),
)
if err != nil {
lo.Error("error fetching token details", "error", err)
os.Exit(1)
}
if err := insertToken(ctx, TokenArgs{
ContractAddress: address.Hex(),
TokenName: tokenName,
TokenSymbol: tokenSymbol,
TokenDecimals: tokenDecimals,
}); err != nil {
lo.Error("pg insert error", "error", err)
os.Exit(1)
}
}
}
}
}
}
}
}
}
lo.Info("tokens bootstrap complete")
}
func newPgStore() (*pgxpool.Pool, error) {
store, err := store.NewPgStore(store.PgOpts{
Logg: lo,
DSN: ko.MustString("postgres.dsn"),
MigrationsFolderPath: migrationsFolderFlag,
QueriesFolderPath: queriesFlag,
})
if err != nil {
lo.Error("could not initialize postgres store", "error", err)
os.Exit(1)
}
return store.Pool(), nil
}
func insertToken(ctx context.Context, insertArgs TokenArgs) error {
_, err := dbPool.Exec(
ctx,
insertTokenQuery,
insertArgs.ContractAddress,
insertArgs.TokenName,
insertArgs.TokenSymbol,
insertArgs.TokenDecimals,
)
if err != nil {
return err
}
return nil
}

View File

@ -12,11 +12,13 @@ import (
"syscall"
"time"
"github.com/grassrootseconomics/eth-indexer/internal/api"
"github.com/grassrootseconomics/eth-indexer/internal/handler"
"github.com/grassrootseconomics/eth-indexer/internal/store"
"github.com/grassrootseconomics/eth-indexer/internal/sub"
"github.com/grassrootseconomics/eth-indexer/internal/util"
"github.com/grassrootseconomics/eth-indexer/v2/internal/api"
"github.com/grassrootseconomics/eth-indexer/v2/internal/cache"
"github.com/grassrootseconomics/eth-indexer/v2/internal/handler"
"github.com/grassrootseconomics/eth-indexer/v2/internal/store"
"github.com/grassrootseconomics/eth-indexer/v2/internal/sub"
"github.com/grassrootseconomics/eth-indexer/v2/internal/util"
"github.com/grassrootseconomics/ethutils"
"github.com/knadh/koanf/v2"
)
@ -60,14 +62,25 @@ func main() {
os.Exit(1)
}
handler := handler.NewHandler(handler.HandlerOpts{
Store: store,
cache := cache.New()
chainProvider := ethutils.NewProvider(
ko.MustString("chain.rpc_endpoint"),
ko.MustInt64("chain.chainid"),
)
handlerContainer := handler.NewHandler(handler.HandlerOpts{
Store: store,
Cache: cache,
ChainProvider: chainProvider,
Logg: lo,
})
router := bootstrapRouter(handlerContainer)
jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{
Logg: lo,
Store: store,
Handler: handler,
Router: router,
Endpoint: ko.MustString("jetstream.endpoint"),
JetStreamID: ko.MustString("jetstream.id"),
})

52
cmd/service/router.go Normal file
View File

@ -0,0 +1,52 @@
package main
import (
"github.com/grassrootseconomics/eth-indexer/v2/internal/handler"
"github.com/grassrootseconomics/eth-indexer/v2/pkg/router"
)
func bootstrapRouter(handlerContainer *handler.Handler) *router.Router {
router := router.New(lo)
router.RegisterRoute(
"TRACKER.TOKEN_TRANSFER",
handlerContainer.IndexTransfer,
handlerContainer.AddToken,
)
router.RegisterRoute(
"TRACKER.TOKEN_MINT",
handlerContainer.IndexTokenMint,
handlerContainer.AddToken,
)
router.RegisterRoute(
"TRACKER.TOKEN_BURN",
handlerContainer.IndexTokenBurn,
handlerContainer.AddToken,
)
router.RegisterRoute(
"TRACKER.POOL_SWAP",
handlerContainer.IndexPoolSwap,
handlerContainer.AddPool,
)
router.RegisterRoute(
"TRACKER.POOL_DEPOSIT",
handlerContainer.IndexPoolDeposit,
handlerContainer.AddPool,
)
router.RegisterRoute(
"TRACKER.FAUCET_GIVE",
handlerContainer.IndexFaucetGive,
handlerContainer.FaucetHealthCheck,
)
router.RegisterRoute(
"TRACKER.OWNERSHIP_TRANSFERRED",
handlerContainer.IndexOwnershipChange,
)
router.RegisterRoute(
"TRACKER.INDEX_REMOVE",
handlerContainer.IndexRemove,
)
return router
}

View File

@ -5,16 +5,12 @@ go_process = true
address = ":5002"
[postgres]
dsn = "postgres://postgres:postgres@127.0.0.1:5433/ge_celo_data"
dsn = "postgres://postgres:postgres@127.0.0.1:5433/chain_data"
[jetstream]
endpoint = "nats://127.0.0.1:4222"
id = "celo-indexer-1"
id = "eth-indexer-1"
[chain]
rpc_endpoint = "http://localhost:8545"
chainid = 1337
[bootstrap]
# This will bootstrap the cache on which addresses to track
ge_registries = ["0xE979a64D375F5D363d7cecF3c93B9aFD40Ba9f55"]

View File

@ -16,8 +16,8 @@ services:
interval: 10s
timeout: 5s
retries: 5
celo-indexer:
image: celo-indexer:latest
eth-indexer:
image: ghcr.io/grassrootseconomics/eth-indexer:latest
restart: unless-stopped
depends_on:
postgres:

View File

@ -1 +1 @@
CREATE DATABASE ge_celo_data;
CREATE DATABASE chain_data;

18
go.mod
View File

@ -1,13 +1,14 @@
module github.com/grassrootseconomics/eth-indexer
module github.com/grassrootseconomics/eth-indexer/v2
go 1.23.0
go 1.23.3
require (
github.com/VictoriaMetrics/metrics v1.35.1
github.com/ethereum/go-ethereum v1.14.8
github.com/grassrootseconomics/eth-tracker v1.2.2-rc
github.com/go-chi/chi/v5 v5.1.0
github.com/grassrootseconomics/eth-tracker v1.3.0-rc
github.com/grassrootseconomics/ethutils v1.3.0
github.com/jackc/pgx/v5 v5.6.0
github.com/jackc/pgx/v5 v5.7.1
github.com/jackc/tern/v2 v2.2.3
github.com/kamikazechaser/common v0.2.0
github.com/knadh/goyesql/v2 v2.2.0
@ -17,7 +18,8 @@ require (
github.com/knadh/koanf/v2 v2.1.1
github.com/lmittmann/w3 v0.17.1
github.com/nats-io/nats.go v1.37.0
github.com/uptrace/bunrouter v1.0.22
github.com/puzpuzpuz/xsync/v3 v3.4.0
github.com/sourcegraph/conc v0.3.0
)
require (
@ -43,8 +45,8 @@ require (
github.com/holiman/uint256 v1.3.1 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/lmittmann/tint v1.0.4 // indirect
@ -63,6 +65,8 @@ require (
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/sync v0.8.0 // indirect

29
go.sum
View File

@ -47,6 +47,7 @@ github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c/go.mod h1:geZJ
github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI=
github.com/crate-crypto/go-kzg-4844 v1.0.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
@ -67,6 +68,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
@ -91,8 +94,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grassrootseconomics/eth-tracker v1.2.2-rc h1:71iSlRXMl9fgVUBNuMopGINiVy0Z9Lyx+rps7PTc8OY=
github.com/grassrootseconomics/eth-tracker v1.2.2-rc/go.mod h1:rLXM5u8FDHnMEdah8ACgo/wfawu4o2sljHGkky2rQKE=
github.com/grassrootseconomics/eth-tracker v1.3.0-rc h1:iYe2rwCBrU5O8x0+HSJRjcPT1h68k/uGd3i/cJJQuTQ=
github.com/grassrootseconomics/eth-tracker v1.3.0-rc/go.mod h1:rLXM5u8FDHnMEdah8ACgo/wfawu4o2sljHGkky2rQKE=
github.com/grassrootseconomics/ethutils v1.3.0 h1:0uX9HG7EujqoNyueYN2gB40zki50AIdxuNLmU0FZroU=
github.com/grassrootseconomics/ethutils v1.3.0/go.mod h1:Wuv1VEZrkLIXqTSEYI3Nh9HG/ZHOUQ+U+xvWJ8QtjgQ=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
@ -103,12 +106,12 @@ github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI
github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs=
github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackc/tern/v2 v2.2.3 h1:UWD24+m3zP7eRSlX9vYg2tb6Bf0V161IdOuo4YWWyd4=
github.com/jackc/tern/v2 v2.2.3/go.mod h1:EStqJVUowhII9OpCTcZISE1BfpGlwE4oq0oQtHAGuuI=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
@ -176,6 +179,8 @@ github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuI
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
@ -184,6 +189,8 @@ github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -199,12 +206,14 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/uptrace/bunrouter v1.0.22 h1:634bRGogHxjMaSqc5a3MjM/sisS/MkfXhWJ/WZXrktc=
github.com/uptrace/bunrouter v1.0.22/go.mod h1:O3jAcl+5qgnF+ejhgkmbceEk0E/mqaK+ADOocdNpY8M=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=

View File

@ -4,20 +4,19 @@ import (
"net/http"
"github.com/VictoriaMetrics/metrics"
"github.com/uptrace/bunrouter"
"github.com/go-chi/chi/v5"
)
func New() *bunrouter.Router {
router := bunrouter.New()
func New() *chi.Mux {
r := chi.NewRouter()
router.GET("/metrics", metricsHandler())
r.Get("/metrics", metricsHandler())
return router
return r
}
func metricsHandler() bunrouter.HandlerFunc {
return func(w http.ResponseWriter, _ bunrouter.Request) error {
func metricsHandler() http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
metrics.WritePrometheus(w, true)
return nil
}
}

26
internal/cache/cache.go vendored Normal file
View File

@ -0,0 +1,26 @@
package cache
import "github.com/puzpuzpuz/xsync/v3"
type Cache struct {
provider *xsync.MapOf[string, bool]
}
func New() *Cache {
return &Cache{
provider: xsync.NewMapOf[string, bool](),
}
}
func (c *Cache) Set(key string) {
c.provider.Store(key, true)
}
func (c *Cache) Get(key string) bool {
v, _ := c.provider.Load(key)
return v
}
func (c *Cache) Size() int {
return c.provider.Size()
}

View File

@ -0,0 +1,81 @@
package handler
import (
"context"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/grassrootseconomics/ethutils"
"github.com/lmittmann/w3"
"github.com/lmittmann/w3/module/eth"
)
var (
nameGetter = w3.MustNewFunc("name()", "string")
symbolGetter = w3.MustNewFunc("symbol()", "string")
decimalsGetter = w3.MustNewFunc("decimals()", "uint8")
sinkAddressGetter = w3.MustNewFunc("sinkAddress()", "address")
)
func (h *Handler) AddToken(ctx context.Context, event event.Event) error {
if h.cache.Get(event.ContractAddress) {
return nil
}
var (
tokenName string
tokenSymbol string
tokenDecimals uint8
sinkAddress common.Address
batchErr w3.CallErrors
)
contractAddress := w3.A(event.ContractAddress)
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName),
eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol),
eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals),
); errors.As(err, &batchErr) {
return batchErr
} else if err != nil {
return err
}
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(contractAddress, sinkAddressGetter).Returns(&sinkAddress),
); err != nil {
// This will most likely revert if the contract does not have a sinkAddress
// Instead of handling the error we just ignore it and set the value to 0
sinkAddress = ethutils.ZeroAddress
}
return h.store.InsertToken(ctx, event.ContractAddress, tokenName, tokenSymbol, tokenDecimals, sinkAddress.Hex())
}
func (h *Handler) AddPool(ctx context.Context, event event.Event) error {
if h.cache.Get(event.ContractAddress) {
return nil
}
var (
tokenName string
tokenSymbol string
)
contractAddress := w3.A(event.ContractAddress)
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName),
eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol),
); err != nil {
return err
}
return h.store.InsertPool(ctx, event.ContractAddress, tokenName, tokenSymbol)
}

View File

@ -0,0 +1,33 @@
package handler
import (
"context"
"math/big"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/lmittmann/w3"
"github.com/lmittmann/w3/module/eth"
)
const balanceThreshold = 5
func (h *Handler) IndexFaucetGive(ctx context.Context, event event.Event) error {
return h.store.InsertFaucetGive(ctx, event)
}
func (h *Handler) FaucetHealthCheck(ctx context.Context, event event.Event) error {
var balance *big.Int
if err := h.chainProvider.Client.CallCtx(
ctx,
eth.Balance(w3.A(event.ContractAddress), nil).Returns(&balance),
); err != nil {
return err
}
if balance.Cmp(new(big.Int).Mul(w3.BigEther, big.NewInt(balanceThreshold))) < 0 {
h.logg.Warn("faucet balance is less than 5 ether", "faucet", event.ContractAddress)
}
return nil
}

View File

@ -1,52 +1,34 @@
package handler
import (
"context"
"encoding/json"
"log/slog"
"github.com/grassrootseconomics/eth-indexer/internal/store"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/grassrootseconomics/eth-indexer/v2/internal/cache"
"github.com/grassrootseconomics/eth-indexer/v2/internal/store"
"github.com/grassrootseconomics/ethutils"
)
type (
HandlerOpts struct {
Store store.Store
Store store.Store
Cache *cache.Cache
ChainProvider *ethutils.Provider
Logg *slog.Logger
}
Handler struct {
store store.Store
store store.Store
cache *cache.Cache
chainProvider *ethutils.Provider
logg *slog.Logger
}
)
func NewHandler(o HandlerOpts) *Handler {
return &Handler{
store: o.Store,
store: o.Store,
cache: o.Cache,
chainProvider: o.ChainProvider,
logg: o.Logg,
}
}
func (h *Handler) Handle(ctx context.Context, msgSubject string, msgData []byte) error {
var chainEvent event.Event
if err := json.Unmarshal(msgData, &chainEvent); err != nil {
return err
}
switch msgSubject {
case "TRACKER.TOKEN_TRANSFER":
return h.store.InsertTokenTransfer(ctx, chainEvent)
case "TRACKER.POOL_SWAP":
return h.store.InsertPoolSwap(ctx, chainEvent)
case "TRACKER.FAUCET_GIVE":
return h.store.InsertFaucetGive(ctx, chainEvent)
case "TRACKER.POOL_DEPOSIT":
return h.store.InsertPoolDeposit(ctx, chainEvent)
case "TRACKER.TOKEN_MINT":
return h.store.InsertTokenMint(ctx, chainEvent)
case "TRACKER.TOKEN_BURN":
return h.store.InsertTokenBurn(ctx, chainEvent)
case "TRACKER.QUOTER_PRICE_INDEX_UPDATED":
return h.store.InsertPriceQuoteUpdate(ctx, chainEvent)
}
return nil
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexRemove(ctx context.Context, event event.Event) error {
return h.store.RemoveContractAddress(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexOwnershipChange(ctx context.Context, event event.Event) error {
return h.store.InsertOwnershipChange(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexPoolDeposit(ctx context.Context, event event.Event) error {
return h.store.InsertPoolDeposit(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexPoolSwap(ctx context.Context, event event.Event) error {
return h.store.InsertPoolSwap(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexTokenBurn(ctx context.Context, event event.Event) error {
return h.store.InsertTokenBurn(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexTokenMint(ctx context.Context, event event.Event) error {
return h.store.InsertTokenMint(ctx, event)
}

View File

@ -0,0 +1,11 @@
package handler
import (
"context"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
)
func (h *Handler) IndexTransfer(ctx context.Context, event event.Event) error {
return h.store.InsertTokenTransfer(ctx, event)
}

View File

@ -29,14 +29,18 @@ type (
}
queries struct {
InsertTx string `query:"insert-tx"`
InsertTokenTransfer string `query:"insert-token-transfer"`
InsertTokenMint string `query:"insert-token-mint"`
InsertTokenBurn string `query:"insert-token-burn"`
InsertFaucetGive string `query:"insert-faucet-give"`
InsertPoolSwap string `query:"insert-pool-swap"`
InsertPoolDeposit string `query:"insert-pool-deposit"`
InsertPriceQuoteUpdate string `query:"insert-price-quote-update"`
InsertTx string `query:"insert-tx"`
InsertTokenTransfer string `query:"insert-token-transfer"`
InsertTokenMint string `query:"insert-token-mint"`
InsertTokenBurn string `query:"insert-token-burn"`
InsertFaucetGive string `query:"insert-faucet-give"`
InsertPoolSwap string `query:"insert-pool-swap"`
InsertPoolDeposit string `query:"insert-pool-deposit"`
InsertOwnershipChange string `query:"insert-ownership-change"`
InsertToken string `query:"insert-token"`
InsertPool string `query:"insert-pool"`
RemovePool string `query:"remove-pool"`
RemoveToken string `query:"remove-token"`
}
)
@ -198,7 +202,7 @@ func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) e
})
}
func (pg *Pg) InsertPriceQuoteUpdate(ctx context.Context, eventPayload event.Event) error {
func (pg *Pg) InsertOwnershipChange(ctx context.Context, eventPayload event.Event) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
txID, err := pg.insertTx(ctx, tx, eventPayload)
if err != nil {
@ -207,16 +211,68 @@ func (pg *Pg) InsertPriceQuoteUpdate(ctx context.Context, eventPayload event.Eve
_, err = tx.Exec(
ctx,
pg.queries.InsertPriceQuoteUpdate,
pg.queries.InsertOwnershipChange,
txID,
eventPayload.Payload["token"].(string),
eventPayload.Payload["exchangeRate"].(string),
eventPayload.Payload["previousOwner"].(string),
eventPayload.Payload["newOwner"].(string),
eventPayload.ContractAddress,
)
return err
})
}
func (pg *Pg) InsertToken(ctx context.Context, contractAddress string, name string, symbol string, decimals uint8, sinkAddress string) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
_, err := tx.Exec(
ctx,
pg.queries.InsertToken,
contractAddress,
name,
symbol,
decimals,
sinkAddress,
)
return err
})
}
func (pg *Pg) InsertPool(ctx context.Context, contractAddress string, name string, symbol string) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
_, err := tx.Exec(
ctx,
pg.queries.InsertPool,
contractAddress,
name,
symbol,
)
return err
})
}
func (pg *Pg) RemoveContractAddress(ctx context.Context, eventPayload event.Event) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
_, err := tx.Exec(
ctx,
pg.queries.RemovePool,
eventPayload.Payload["address"].(string),
)
if err != nil {
return err
}
_, err = tx.Exec(
ctx,
pg.queries.RemoveToken,
eventPayload.Payload["address"].(string),
)
if err != nil {
return err
}
return nil
})
}
func (pg *Pg) insertTx(ctx context.Context, tx pgx.Tx, eventPayload event.Event) (int, error) {
var txID int
if err := tx.QueryRow(

View File

@ -15,7 +15,10 @@ type (
InsertFaucetGive(context.Context, event.Event) error
InsertPoolSwap(context.Context, event.Event) error
InsertPoolDeposit(context.Context, event.Event) error
InsertPriceQuoteUpdate(context.Context, event.Event) error
InsertOwnershipChange(context.Context, event.Event) error
InsertToken(context.Context, string, string, string, uint8, string) error
InsertPool(context.Context, string, string, string) error
RemoveContractAddress(context.Context, event.Event) error
Pool() *pgxpool.Pool
Close()
}

View File

@ -6,28 +6,25 @@ import (
"log/slog"
"time"
"github.com/grassrootseconomics/eth-indexer/internal/handler"
"github.com/grassrootseconomics/eth-indexer/internal/store"
"github.com/grassrootseconomics/eth-indexer/v2/pkg/router"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
type (
JetStreamOpts struct {
Store store.Store
Logg *slog.Logger
Handler *handler.Handler
Endpoint string
JetStreamID string
Logg *slog.Logger
Router *router.Router
}
JetStreamSub struct {
jsConsumer jetstream.Consumer
store store.Store
handler *handler.Handler
natsConn *nats.Conn
logg *slog.Logger
durableID string
jsIter jetstream.MessagesContext
logg *slog.Logger
natsConn *nats.Conn
router *router.Router
durableID string
}
)
@ -36,7 +33,7 @@ const (
pullSubject = "TRACKER.*"
)
func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
func NewJetStreamSub(o JetStreamOpts) (*JetStreamSub, error) {
natsConn, err := nats.Connect(o.Endpoint)
if err != nil {
return nil, err
@ -56,50 +53,52 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
}
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: o.JetStreamID,
AckPolicy: jetstream.AckExplicitPolicy,
Durable: o.JetStreamID,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: pullSubject,
})
if err != nil {
return nil, err
}
o.Logg.Info("successfully connected to NATS server")
iter, err := consumer.Messages(
jetstream.WithMessagesErrOnMissingHeartbeat(false),
jetstream.PullMaxMessages(10),
)
if err != nil {
return nil, err
}
return &JetStreamSub{
jsConsumer: consumer,
store: o.Store,
handler: o.Handler,
natsConn: natsConn,
logg: o.Logg,
durableID: o.JetStreamID,
jsIter: iter,
router: o.Router,
natsConn: natsConn,
logg: o.Logg,
durableID: o.JetStreamID,
}, nil
}
func (s *JetStreamSub) Close() {
if s.natsConn != nil {
s.natsConn.Close()
}
s.jsIter.Stop()
}
func (s *JetStreamSub) Process() error {
func (s *JetStreamSub) Process() {
for {
events, err := s.jsConsumer.Fetch(100, jetstream.FetchMaxWait(1*time.Second))
msg, err := s.jsIter.Next()
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
continue
} else if errors.Is(err, nats.ErrConnectionClosed) {
return nil
if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
s.logg.Debug("jetstream: iterator closed")
return
} else {
return err
s.logg.Debug("jetstream: unknown iterator error", "error", err)
continue
}
}
for msg := range events.Messages() {
if err := s.handler.Handle(context.Background(), msg.Subject(), msg.Data()); err != nil {
s.logg.Error("error processing nats message", "error", err)
msg.Nak()
} else {
msg.Ack()
}
s.logg.Debug("processing nats message", "subject", msg.Subject())
if err := s.router.Handle(context.Background(), msg); err != nil {
s.logg.Error("jetstream: router: error processing nats message", "error", err)
}
}
}

View File

@ -1,8 +0,0 @@
package sub
type (
Sub interface {
Process() error
Close()
}
)

View File

@ -2,7 +2,6 @@ CREATE TABLE IF NOT EXISTS tx (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tx_hash VARCHAR(66) NOT NULL UNIQUE,
block_number INT NOT NULL,
contract_address VARCHAR(42) NOT NULL,
date_block TIMESTAMP NOT NULL,
success BOOLEAN NOT NULL
);
@ -12,6 +11,7 @@ CREATE TABLE IF NOT EXISTS token_transfer (
tx_id INT REFERENCES tx(id),
sender_address VARCHAR(42) NOT NULL,
recipient_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
transfer_value NUMERIC NOT NULL
);
@ -20,6 +20,7 @@ CREATE TABLE IF NOT EXISTS token_mint (
tx_id INT REFERENCES tx(id),
minter_address VARCHAR(42) NOT NULL,
recipient_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
mint_value NUMERIC NOT NULL
);
@ -27,6 +28,7 @@ CREATE TABLE IF NOT EXISTS token_burn (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tx_id INT REFERENCES tx(id),
burner_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
burn_value NUMERIC NOT NULL
);
@ -35,6 +37,7 @@ CREATE TABLE IF NOT EXISTS faucet_give (
tx_id INT REFERENCES tx(id),
token_address VARCHAR(42) NOT NULL,
recipient_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
give_value NUMERIC NOT NULL
);
@ -47,6 +50,7 @@ CREATE TABLE IF NOT EXISTS pool_swap (
token_out_address VARCHAR(42) NOT NULL,
in_value NUMERIC NOT NULL,
out_value NUMERIC NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
fee NUMERIC NOT NULL
);
@ -55,29 +59,22 @@ CREATE TABLE IF NOT EXISTS pool_deposit (
tx_id INT REFERENCES tx(id),
initiator_address VARCHAR(42) NOT NULL,
token_in_address VARCHAR(42) NOT NULL,
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
in_value NUMERIC NOT NULL
);
CREATE TABLE IF NOT EXISTS price_index_updates (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tx_id INT REFERENCES tx(id),
token VARCHAR(42) NOT NULL,
exchange_rate NUMERIC NOT NULL
);
CREATE TABLE IF NOT EXISTS contracts (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) UNIQUE NOT NULL,
contract_description TEXT NOT NULL,
is_token BOOLEAN NOT NULL
);
CREATE TABLE IF NOT EXISTS tokens (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) UNIQUE NOT NULL,
contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
token_name TEXT NOT NULL,
token_symbol TEXT NOT NULL,
token_decimals INT NOT NULL,
token_version TEXT NOT NULL,
token_type TEXT NOT NULL
sink_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000'
);
CREATE TABLE IF NOT EXISTS pools (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) UNIQUE NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
pool_name TEXT NOT NULL,
pool_symbol TEXT NOT NULL
);

View File

@ -1,29 +0,0 @@
ALTER TABLE tx DROP COLUMN contract_address;
ALTER TABLE token_transfer ADD COLUMN contract_address VARCHAR(42);
UPDATE token_transfer SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE token_transfer ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE token_mint ADD COLUMN contract_address VARCHAR(42);
UPDATE token_mint SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE token_mint ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE token_burn ADD COLUMN contract_address VARCHAR(42);
UPDATE token_burn SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE token_burn ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE faucet_give ADD COLUMN contract_address VARCHAR(42);
UPDATE faucet_give SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE faucet_give ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE pool_swap ADD COLUMN contract_address VARCHAR(42);
UPDATE pool_swap SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE pool_swap ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE pool_deposit ADD COLUMN contract_address VARCHAR(42);
UPDATE pool_deposit SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE pool_deposit ALTER COLUMN contract_address SET NOT NULL;
ALTER TABLE price_index_updates ADD COLUMN contract_address VARCHAR(42);
UPDATE price_index_updates SET contract_address = '0x0000000000000000000000000000000000000000';
ALTER TABLE price_index_updates ALTER COLUMN contract_address SET NOT NULL;

View File

@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS ownership_change (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tx_id INT REFERENCES tx(id),
previous_owner VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
new_owner VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000',
contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000'
);

View File

@ -0,0 +1,5 @@
ALTER TABLE tokens
ADD COLUMN removed BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE pools
ADD COLUMN removed BOOLEAN NOT NULL DEFAULT false;

View File

@ -1 +0,0 @@
ALTER TABLE tokens ALTER token_version DROP NOT NULL, ALTER token_type DROP NOT NULL;

62
pkg/router/router.go Normal file
View File

@ -0,0 +1,62 @@
package router
import (
"context"
"encoding/json"
"log/slog"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/nats-io/nats.go/jetstream"
"github.com/sourcegraph/conc/pool"
)
type (
HandlerFunc func(context.Context, event.Event) error
Router struct {
logg *slog.Logger
handlers map[string][]HandlerFunc
}
)
func New(logg *slog.Logger) *Router {
return &Router{
handlers: make(map[string][]HandlerFunc),
logg: logg,
}
}
func (r *Router) RegisterRoute(subject string, handlerFunc ...HandlerFunc) {
r.handlers[subject] = handlerFunc
}
func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error {
handlers, ok := r.handlers[msg.Subject()]
if !ok {
r.logg.Debug("handler not found sending ack", "subject", msg.Subject())
return msg.Ack()
}
var chainEvent event.Event
if err := json.Unmarshal(msg.Data(), &chainEvent); err != nil {
return err
}
p := pool.New().WithErrors()
for _, handler := range handlers {
p.Go(func() error {
return handler(ctx, chainEvent)
})
}
if err := p.Wait(); err != nil {
r.logg.Error("handler error sending nack", "subject", msg.Subject(), "error", err)
if err := msg.Nak(); err != nil {
return err
}
return err
}
return msg.Ack()
}

View File

@ -104,14 +104,46 @@ INSERT INTO pool_deposit(
contract_address
) VALUES($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING
--name: insert-price-quote-update
--name: insert-ownership-change
-- $1: tx_id
-- $2: token
-- $3: exchange_rate
-- $2: previous_owner
-- $3: new_owner
-- $4: contract_address
INSERT INTO price_index_updates(
INSERT INTO ownership_change(
tx_id,
token,
exchange_rate,
previous_owner,
new_owner,
contract_address
) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING
--name: insert-token
-- $1: contract_address
-- $2: token_name
-- $3: token_symbol
-- $4: token_decimals
-- $5: sink_address
INSERT INTO tokens(
contract_address,
token_name,
token_symbol,
token_decimals,
sink_address
) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING
--name: insert-pool
-- $1: contract_address
-- $2: pool_name
-- $3: pool_symbol
INSERT INTO pools(
contract_address,
pool_name,
pool_symbol
) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING
--name: remove-pool
-- $1: contract_address
UPDATE pools SET removed = true WHERE contract_address = $1
--name: remove-token
-- $1: contract_address
UPDATE tokens SET removed = true WHERE contract_address = $1