From 0ccdaa034537b0104bd945cf424768a63be5a514 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Fri, 6 Jan 2023 11:32:20 +0000 Subject: [PATCH] feat: add stats api, fix pipeline exe * added cUSD sample filters * decouple stats from metrics --- cmd/api.go | 21 ++++++++++++ cmd/filters.go | 16 +++++++++- cmd/main.go | 46 ++++++++++++++------------- config.toml | 19 ++++++----- docker-compose.yaml | 2 +- go.mod | 17 +++++++--- go.sum | 36 ++++++++++++++------- internal/api/stats.go | 51 ++++++++++++++++++++++++++++++ internal/api/types.go | 15 +++++++++ internal/exporter/exporter.go | 24 -------------- internal/filter/address_filter.go | 32 +++++++++++++++++++ internal/filter/noop_filter.go | 2 +- internal/filter/transfer_filter.go | 33 +++++++++++++++++++ internal/pipeline/pipeline.go | 3 +- internal/syncer/janitor.go | 1 - 15 files changed, 243 insertions(+), 75 deletions(-) create mode 100644 cmd/api.go create mode 100644 internal/api/stats.go create mode 100644 internal/api/types.go delete mode 100644 internal/exporter/exporter.go create mode 100644 internal/filter/address_filter.go create mode 100644 internal/filter/transfer_filter.go diff --git a/cmd/api.go b/cmd/api.go new file mode 100644 index 0000000..d1c90a4 --- /dev/null +++ b/cmd/api.go @@ -0,0 +1,21 @@ +package main + +import ( + "github.com/VictoriaMetrics/metrics" + "github.com/labstack/echo/v4" +) + +func initApiServer() *echo.Echo { + server := echo.New() + server.HideBanner = true + server.HidePort = true + + if ko.Bool("metrics.go_process") { + server.GET("/metrics", func(c echo.Context) error { + metrics.WritePrometheus(c.Response(), true) + return nil + }) + } + + return server +} diff --git a/cmd/filters.go b/cmd/filters.go index edf9d55..c79a3e7 100644 --- a/cmd/filters.go +++ b/cmd/filters.go @@ -1,6 +1,20 @@ package main -import "github.com/grassrootseconomics/cic-chain-events/internal/filter" +import ( + "github.com/grassrootseconomics/cic-chain-events/internal/filter" +) + +func initAddressFilter() filter.Filter { + return filter.NewAddressFilter(filter.AddressFilterOpts{ + Logg: lo, + }) +} + +func initTransferFilter() filter.Filter { + return filter.NewTransferFilter(filter.TransferFilterOpts{ + Logg: lo, + }) +} func initNoopFilter() filter.Filter { return filter.NewNoopFilter(filter.NoopFilterOpts{ diff --git a/cmd/main.go b/cmd/main.go index 5a8b9ce..866677c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,15 +3,14 @@ package main import ( "context" "flag" - "net/http" "os" "os/signal" + "strings" "sync" "syscall" "time" - "github.com/VictoriaMetrics/metrics" - "github.com/grassrootseconomics/cic-chain-events/internal/exporter" + "github.com/grassrootseconomics/cic-chain-events/internal/api" "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/pipeline" "github.com/grassrootseconomics/cic-chain-events/internal/syncer" @@ -44,6 +43,7 @@ func init() { func main() { syncerStats := &syncer.Stats{} wg := &sync.WaitGroup{} + apiServer := initApiServer() ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() @@ -58,7 +58,9 @@ func main() { pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{ BlockFetcher: graphqlFetcher, Filters: []filter.Filter{ - initNoopFilter(), + initAddressFilter(), + initTransferFilter(), + // initNoopFilter(), }, Logg: lo, Store: pgStore, @@ -86,6 +88,8 @@ func main() { SweepInterval: time.Second * time.Duration(ko.MustInt64("indexer.sweep_interval")), }) + apiServer.GET("/stats", api.StatsHandler(syncerStats, workerPool, lo)) + wg.Add(1) go func() { defer wg.Done() @@ -102,28 +106,26 @@ func main() { } }() - if ko.Bool("metrics.expose") { - metricsServer := &http.Server{ - Addr: ":9090", - } - - wg.Add(1) - go func() { - defer wg.Done() - exporter.Register(syncerStats) - - http.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) { - metrics.WritePrometheus(w, true) - }) - - if err := metricsServer.ListenAndServe(); err != nil { - lo.Fatal("metrics server error", "error", err) + wg.Add(1) + go func() { + defer wg.Done() + lo.Info("starting API server") + if err := apiServer.Start(ko.MustString("api.address")); err != nil { + if strings.Contains(err.Error(), "Server closed") { + lo.Info("shutting down server") + } else { + lo.Fatal("could not start api server", "err", err) } - }() - } + } + }() <-ctx.Done() lo.Info("graceful shutdown triggered") workerPool.Stop() + if err := apiServer.Shutdown(ctx); err != nil { + lo.Error("could not gracefully shutdown api server", "err", err) + } + + wg.Wait() } diff --git a/config.toml b/config.toml index 6ccb85b..8d9ad4e 100644 --- a/config.toml +++ b/config.toml @@ -1,18 +1,21 @@ [metrics] -expose = true +go_process = true + +[api] +address = ":8080" [chain] -graphql_endpoint = "https://rpc.alfajores.celo.grassecon.net/graphql" -ws_endpoint = "wss://ws.alfajores.celo.grassecon.net" +graphql_endpoint = "https://rpc.celo.grassecon.net/graphql" +ws_endpoint = "wss://ws.celo.grassecon.net" [indexer] -batch_size = 500 -concurrency = 10 -head_block_lag = 5 +batch_size = 200 +concurrency = 3 +head_block_lag = 5 idle_worker_timeout = 1 -initial_lower_bound = 15491311 +initial_lower_bound = 17034445 queue_size = 1250 -sweep_interval = 5 +sweep_interval = 10 [postgres] dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events" \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 6a4e829..523a9b5 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -7,7 +7,7 @@ services: environment: - POSTGRES_PASSWORD=postgres - POSTGRES_USER=postgres - - POSTGRES_DB=cic_indexer + - POSTGRES_DB=cic_chain_events volumes: - cic-indexer-pg:/var/lib/postgresql/data ports: diff --git a/go.mod b/go.mod index 33a8b38..11ac124 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,8 @@ require ( github.com/jackc/pgx/v5 v5.2.0 github.com/knadh/goyesql/v2 v2.2.0 github.com/knadh/koanf v1.4.5 - github.com/stretchr/testify v1.8.0 + github.com/labstack/echo/v4 v4.2.1 + github.com/stretchr/testify v1.8.1 github.com/zerodha/logf v0.5.5 ) @@ -19,6 +20,7 @@ require ( github.com/celo-org/celo-bls-go v0.2.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea // indirect + github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-stack/stack v1.8.0 // indirect @@ -26,6 +28,9 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/puddle/v2 v2.1.2 // indirect + github.com/labstack/gommon v0.3.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect @@ -35,13 +40,17 @@ require ( github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect + github.com/valyala/fasttemplate v1.2.1 // indirect github.com/valyala/histogram v1.2.0 // indirect go.uber.org/atomic v1.10.0 // indirect - golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect + golang.org/x/crypto v0.5.0 // indirect + golang.org/x/net v0.5.0 // indirect golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 // indirect - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect - golang.org/x/text v0.3.8 // indirect + golang.org/x/sys v0.4.0 // indirect + golang.org/x/text v0.6.0 // indirect + golang.org/x/time v0.2.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4ac14a7..ece8d17 100644 --- a/go.sum +++ b/go.sum @@ -129,6 +129,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea h1:j4317fAZh7X6GqbFowYdYdI0L9bwxL07jyPZIdepyZ0= github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ= +github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= +github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/deepmap/oapi-codegen v1.6.0/go.mod h1:ryDa9AgbELGeB+YEXE1dR53yAjHwFvE9iAUlWl9Al3M= github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= @@ -359,7 +361,9 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/labstack/echo/v4 v4.2.1 h1:LF5Iq7t/jrtUuSutNuiEWtB5eiHfZ5gSe2pcu5exjQw= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= +github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -372,8 +376,9 @@ github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc= github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= @@ -382,8 +387,10 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= -github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= @@ -511,6 +518,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -518,8 +526,9 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= @@ -529,10 +538,12 @@ github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefld github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM= github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= @@ -570,8 +581,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM= -golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= +golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -630,7 +641,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -702,8 +714,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -714,13 +727,14 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.2.0 h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE= +golang.org/x/time v0.2.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/internal/api/stats.go b/internal/api/stats.go new file mode 100644 index 0000000..da35c34 --- /dev/null +++ b/internal/api/stats.go @@ -0,0 +1,51 @@ +package api + +import ( + "net/http" + + "github.com/alitto/pond" + "github.com/grassrootseconomics/cic-chain-events/internal/syncer" + "github.com/labstack/echo/v4" + "github.com/zerodha/logf" +) + +type statsResponse struct { + HeadCursor uint64 `json:"headCursor"` + LowerBound uint64 `json:"lowerBound"` + MissingBlocks uint64 `json:"missingBlocks"` + WorkerQueueSize uint64 `json:"workerQueueSize"` + WorkerCount int `json:"workerCount"` + WorkerSuccessfulTasks uint64 `json:"workerSuccessfulTasks"` + WorkerFailedTasks uint64 `json:"workerFailedTasks"` +} + +func StatsHandler( + syncerStats *syncer.Stats, + poolStats *pond.WorkerPool, + logg logf.Logger, +) func(echo.Context) error { + return func(ctx echo.Context) error { + headCursor := syncerStats.GetHeadCursor() + lowerBound := syncerStats.GetLowerBound() + + stats := statsResponse{ + HeadCursor: headCursor, + LowerBound: lowerBound, + WorkerCount: poolStats.RunningWorkers(), + WorkerQueueSize: poolStats.WaitingTasks(), + WorkerSuccessfulTasks: poolStats.SuccessfulTasks(), + WorkerFailedTasks: poolStats.FailedTasks(), + } + + if headCursor-lowerBound < 10 { + stats.MissingBlocks = 0 + } else { + stats.MissingBlocks = headCursor - lowerBound + } + + return ctx.JSON(http.StatusOK, okResp{ + Ok: true, + Data: stats, + }) + } +} diff --git a/internal/api/types.go b/internal/api/types.go new file mode 100644 index 0000000..c115612 --- /dev/null +++ b/internal/api/types.go @@ -0,0 +1,15 @@ +package api + +const ( + INTERNAL_ERROR = "ERR_INTERNAL" +) + +type okResp struct { + Ok bool `json:"ok"` + Data interface{} `json:"data"` +} + +type errResp struct { + Ok bool `json:"ok"` + Error string `json:"error"` +} diff --git a/internal/exporter/exporter.go b/internal/exporter/exporter.go deleted file mode 100644 index 01897cc..0000000 --- a/internal/exporter/exporter.go +++ /dev/null @@ -1,24 +0,0 @@ -package exporter - -import ( - "github.com/VictoriaMetrics/metrics" - "github.com/grassrootseconomics/cic-chain-events/internal/syncer" -) - -func Register(stats *syncer.Stats) { - metrics.NewGauge("indexer_head_cursor", func() float64 { - return float64(stats.GetHeadCursor()) - }) - - metrics.NewGauge("indexer_lower_bound", func() float64 { - return float64(stats.GetLowerBound()) - }) - - metrics.NewGauge("indexer_missing_blocks", func() float64 { - if stats.GetHeadCursor()-stats.GetLowerBound() < 10 { - return float64(0) - } else { - return float64(stats.GetHeadCursor() - stats.GetLowerBound()) - } - }) -} diff --git a/internal/filter/address_filter.go b/internal/filter/address_filter.go new file mode 100644 index 0000000..365336e --- /dev/null +++ b/internal/filter/address_filter.go @@ -0,0 +1,32 @@ +package filter + +import ( + "github.com/grassrootseconomics/cic-chain-events/internal/fetch" + "github.com/zerodha/logf" +) + +const ( + cUSD = "0x765de816845861e75a25fca122bb6898b8b1282a" +) + +type AddressFilterOpts struct { + Logg logf.Logger +} + +type AddressFilter struct { + logg logf.Logger +} + +func NewAddressFilter(o AddressFilterOpts) Filter { + return &AddressFilter{ + logg: o.Logg, + } +} + +func (f *AddressFilter) Execute(transaction fetch.Transaction) (bool, error) { + if transaction.To.Address == cUSD { + return true, nil + } + + return false, nil +} diff --git a/internal/filter/noop_filter.go b/internal/filter/noop_filter.go index 2bb0592..fcfb5c1 100644 --- a/internal/filter/noop_filter.go +++ b/internal/filter/noop_filter.go @@ -20,6 +20,6 @@ func NewNoopFilter(o NoopFilterOpts) Filter { } func (f *NoopFilter) Execute(transaction fetch.Transaction) (bool, error) { - f.logg.Debug("noop filter", "block", transaction.Block.Number, "tx", transaction.Hash) + f.logg.Debug("noop filter", "block", transaction.Block.Number, "index", transaction.Index) return true, nil } diff --git a/internal/filter/transfer_filter.go b/internal/filter/transfer_filter.go new file mode 100644 index 0000000..a69c1b3 --- /dev/null +++ b/internal/filter/transfer_filter.go @@ -0,0 +1,33 @@ +package filter + +import ( + "github.com/grassrootseconomics/cic-chain-events/internal/fetch" + "github.com/zerodha/logf" +) + +type TransferFilterOpts struct { + Logg logf.Logger +} + +type TransferFilter struct { + logg logf.Logger +} + +func NewTransferFilter(o TransferFilterOpts) Filter { + return &TransferFilter{ + logg: o.Logg, + } +} + +func (f *TransferFilter) Execute(transaction fetch.Transaction) (bool, error) { + switch transaction.InputData[:10] { + case "0xa9059cbb": + f.logg.Info("cUSD transfer", "block", transaction.Block.Number, "index", transaction.Index) + case "0x23b872dd": + f.logg.Info("cUSD transferFrom", "block", transaction.Block.Number, "index", transaction.Index) + default: + f.logg.Info("cUSD otherMethod", "block", transaction.Block.Number, "index", transaction.Index) + } + + return true, nil +} diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 0aa1100..51eecd8 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -47,8 +47,7 @@ func (md *Pipeline) Run(blockNumber uint64) error { return err } if !next { - md.logg.Debug("pipeline exit") - return nil + break } } } diff --git a/internal/syncer/janitor.go b/internal/syncer/janitor.go index 734030c..e9a159a 100644 --- a/internal/syncer/janitor.go +++ b/internal/syncer/janitor.go @@ -98,7 +98,6 @@ func (j *Janitor) QueueMissingBlocks() error { return err } - j.logg.Debug("submitting missing block for processing", "block", n) j.pool.Submit(func() { if err := j.pipeline.Run(n); err != nil { j.logg.Error("pipeline run error", "error", err)