From ef9f2b2b7ff94203195f75c5f2084aaf86f5cf5c Mon Sep 17 00:00:00 2001
From: Mohammed Sohail
Date: Wed, 18 Jan 2023 22:40:14 +0300
Subject: [PATCH] hotfix: drain janitor queue, worker pool, js

* fully drain worker queue before reading gap from db
* dedicated goroutine for head syncer
* emit block, tx index and hash to JetStream
---
 cmd/init.go                   | 10 ----------
 cmd/main.go                   | 33 ++++++++++++++-------------------
 config.toml                   | 23 +++++++++++------------
 internal/pipeline/pipeline.go |  2 --
 internal/pool/pool.go         | 10 +++++-----
 internal/syncer/janitor.go    | 21 +++++++++++----------
 pkg/fetch/graphql.go          |  2 +-
 pkg/filter/decode_filter.go   |  7 ++++---
 8 files changed, 46 insertions(+), 62 deletions(-)

diff --git a/cmd/init.go b/cmd/init.go
index 1a03296..c10f605 100644
--- a/cmd/init.go
+++ b/cmd/init.go
@@ -1,12 +1,9 @@
 package main
 
 import (
-	"context"
 	"strings"
 	"time"
 
-	"github.com/alitto/pond"
-	"github.com/grassrootseconomics/cic-chain-events/internal/pool"
 	"github.com/grassrootseconomics/cic-chain-events/internal/store"
 	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 	"github.com/jackc/pgx/v5"
@@ -74,13 +71,6 @@ func initPgStore() (store.Store[pgx.Rows], error) {
 	return pgStore, nil
 }
 
-func initWorkerPool(ctx context.Context) *pond.WorkerPool {
-	return pool.NewPool(ctx, pool.Opts{
-		ConcurrencyFactor: ko.MustInt("syncer.concurrency"),
-		PoolQueueSize:     ko.MustInt("syncer.queue_size"),
-	})
-}
-
 func initFetcher() fetch.Fetch {
 	return fetch.NewGraphqlFetcher(fetch.GraphqlOpts{
 		GraphqlEndpoint: ko.MustString("chain.graphql_endpoint"),
diff --git a/cmd/main.go b/cmd/main.go
index 78d85bd..ba6fa26 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -10,8 +10,8 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/grassrootseconomics/cic-chain-events/internal/api"
 	"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
+	"github.com/grassrootseconomics/cic-chain-events/internal/pool"
 	"github.com/grassrootseconomics/cic-chain-events/internal/syncer"
 	"github.com/grassrootseconomics/cic-chain-events/pkg/filter"
 	"github.com/knadh/goyesql/v2"
@@ -41,13 +41,6 @@ func init() {
 }
 
 func main() {
-	// p := profiler.New(profiler.Conf{
-	// 	DirPath:        "profiles",
-	// 	Quiet:          true,
-	// 	NoShutdownHook: false,
-	// }, profiler.Cpu, profiler.Mem)
-	// p.Start()
-
 	syncerStats := &syncer.Stats{}
 	wg := &sync.WaitGroup{}
 	apiServer := initApiServer()
@@ -55,7 +48,10 @@ func main() {
 	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
 	defer stop()
 
-	workerPool := initWorkerPool(ctx)
+	janitorWorkerPool := pool.NewPool(ctx, pool.Opts{
+		Concurrency: ko.MustInt("syncer.janitor_concurrency"),
+		QueueSize:   ko.MustInt("syncer.janitor_queue_size"),
+	})
 
 	pgStore, err := initPgStore()
 	if err != nil {
@@ -74,10 +70,15 @@ func main() {
 		Store: pgStore,
 	})
 
+	headSyncerWorker := pool.NewPool(ctx, pool.Opts{
+		Concurrency: 1,
+		QueueSize:   1,
+	})
+
 	headSyncer, err := syncer.NewHeadSyncer(syncer.HeadSyncerOpts{
 		Logg:       lo,
 		Pipeline:   pipeline,
-		Pool:       workerPool,
+		Pool:       headSyncerWorker,
 		Stats:      syncerStats,
 		WsEndpoint: ko.MustString("chain.ws_endpoint"),
 	})
@@ -86,18 +87,15 @@
 	}
 
 	janitor := syncer.NewJanitor(syncer.JanitorOpts{
-		BatchSize:     uint64(ko.MustInt64("syncer.batch_size")),
-		HeadBlockLag:  uint64(ko.MustInt64("syncer.head_block_lag")),
+		BatchSize:     uint64(ko.MustInt64("syncer.janitor_queue_size")),
 		Logg:          lo,
 		Pipeline:      pipeline,
-		Pool:          workerPool,
+		Pool:          janitorWorkerPool,
 		Stats:         syncerStats,
 		Store:         pgStore,
-		SweepInterval: time.Second * time.Duration(ko.MustInt64("syncer.sweep_interval")),
+		SweepInterval: time.Second * time.Duration(ko.MustInt64("syncer.janitor_sweep_interval")),
 	})
 
-	apiServer.GET("/stats", api.StatsHandler(syncerStats, workerPool))
-
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -129,12 +127,9 @@ func main() {
 
 	<-ctx.Done()
 
-	workerPool.Stop()
-
 	if err := apiServer.Shutdown(ctx); err != nil {
 		lo.Error("main: could not gracefully shutdown api server", "err", err)
 	}
 
 	wg.Wait()
-	// p.Stop()
 }
diff --git a/config.toml b/config.toml
index 7de922c..c8dcbf5 100644
--- a/config.toml
+++ b/config.toml
@@ -10,24 +10,19 @@ address = ":8080"
 # Geth API endpoints
 [chain]
 graphql_endpoint = "https://rpc.celo.grassecon.net/graphql"
-ws_endpoint = "wss://ws.celo.grassecon.net"
+ws_endpoint = "wss://socket.celo.grassecon.net"
 
-# Syncer configs
 [syncer]
-# Maximum number of missing blocks pushed into the worker queue every janitor sweep
-batch_size = 200
 # Number of goroutines assigned to the worker pool
-concurrency = 3
-# Prevents reprocessing head block already in queue
-head_block_lag = 5
+janitor_concurrency = 5
 # Max idle time after which goroutine is returned back to the pool
-idle_worker_timeout = 1 
+idle_worker_timeout = 1
 # Syncer start block
-initial_lower_bound = 17204504
+initial_lower_bound = 17269000
 # Max blocks in worker queue awaiting processing
-queue_size = 500
+janitor_queue_size = 500
 # Janitor sweep interval, should take into account concurrency and queue_size
-sweep_interval = 10
+janitor_sweep_interval = 5
 
 [postgres]
 dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events"
@@ -45,4 +40,8 @@ stream_subjects = [
     "CHAIN.transfer",
     "CHAIN.transferFrom",
     "CHAIN.mintTo"
-]
\ No newline at end of file
+]
+
+# 77G - snapshot
+# 111G - decompressed
+# 112G - latest
\ No newline at end of file
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index b6f589c..c85898d 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -42,7 +42,6 @@ func NewPipeline(o PipelineOpts) *Pipeline {
 // - Blocks are processed atomically, a failure in-between will process the block from the start
 // - Therefore, any side effect/event sink in the filter should support dedup
 func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
-	md.logg.Debug("pipeline: processing block", "block", blockNumber)
 	fetchResp, err := md.fetch.Block(ctx, blockNumber)
 	if err != nil {
 		return err
@@ -63,7 +62,6 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
 	if err := md.store.CommitBlock(ctx, blockNumber); err != nil {
 		return err
 	}
-	md.logg.Debug("pipeline: committed block", "block", blockNumber)
 
 	return nil
 }
diff --git a/internal/pool/pool.go b/internal/pool/pool.go
index aa20b6c..cc5f9f5 100644
--- a/internal/pool/pool.go
+++ b/internal/pool/pool.go
@@ -8,16 +8,16 @@
 )
 
 type Opts struct {
-	ConcurrencyFactor int
-	PoolQueueSize     int
+	Concurrency int
+	QueueSize   int
 }
 
 // NewPool creates a fixed size (and buffered) go routine worker pool.
 func NewPool(ctx context.Context, o Opts) *pond.WorkerPool {
 	return pond.New(
-		o.ConcurrencyFactor,
-		o.PoolQueueSize,
-		pond.MinWorkers(o.ConcurrencyFactor),
+		o.Concurrency,
+		o.QueueSize,
+		pond.MinWorkers(o.Concurrency),
 		pond.IdleTimeout(time.Second*1),
 		pond.Context(ctx),
 	)
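Note: the head syncer now gets its own pool with Concurrency: 1 and QueueSize: 1, so head blocks are processed serially and can never queue behind janitor backfill jobs. A minimal sketch of how a pond pool behaves under those settings (a standalone illustration, not part of this patch; the sleep is a stand-in for pipeline work):

package main

import (
	"fmt"
	"time"

	"github.com/alitto/pond"
)

func main() {
	// One worker, one buffered slot: mirrors the dedicated head syncer pool above.
	p := pond.New(1, 1, pond.MinWorkers(1))

	p.Submit(func() { time.Sleep(time.Second) }) // picked up by the single worker
	p.Submit(func() {})                          // occupies the single queue slot

	time.Sleep(10 * time.Millisecond) // let the first task reach the worker

	// With the worker busy and the slot taken, TrySubmit rejects instead of blocking.
	fmt.Println("accepted:", p.TrySubmit(func() {}))

	p.StopAndWait()
}
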
diff --git a/internal/syncer/janitor.go b/internal/syncer/janitor.go
index b721bd7..c78ab52 100644
--- a/internal/syncer/janitor.go
+++ b/internal/syncer/janitor.go
@@ -11,9 +11,12 @@ import (
 	"github.com/zerodha/logf"
 )
 
+const (
+	headBlockLag = 5
+)
+
 type JanitorOpts struct {
 	BatchSize     uint64
-	HeadBlockLag  uint64
 	Logg          logf.Logger
 	Pipeline      *pipeline.Pipeline
 	Pool          *pond.WorkerPool
@@ -24,7 +27,6 @@ type JanitorOpts struct {
 
 type Janitor struct {
 	batchSize     uint64
-	headBlockLag  uint64
 	pipeline      *pipeline.Pipeline
 	logg          logf.Logger
 	pool          *pond.WorkerPool
@@ -36,7 +38,6 @@ type Janitor struct {
 func NewJanitor(o JanitorOpts) *Janitor {
 	return &Janitor{
 		batchSize:     o.BatchSize,
-		headBlockLag:  o.HeadBlockLag,
 		logg:          o.Logg,
 		pipeline:      o.Pipeline,
 		pool:          o.Pool,
@@ -73,8 +74,8 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
 		return nil
 	}
 
-	if j.pool.WaitingTasks() >= j.batchSize {
-		j.logg.Warn("janitor: (skipping) avoiding queue pressure")
+	if j.pool.WaitingTasks() != 0 {
+		j.logg.Debug("janitor: (skipping) queue has pending jobs", "pending_jobs", j.pool.WaitingTasks())
 		return nil
 	}
 
@@ -82,7 +83,7 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
 		ctx,
 		j.batchSize,
 		j.stats.GetHeadCursor(),
-		j.headBlockLag,
+		headBlockLag,
 	)
 	if err != nil {
 		return err
@@ -95,13 +96,13 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
 
 	rowsProcessed := 0
 	for rows.Next() {
-		var n uint64
-		if err := rows.Scan(&n); err != nil {
+		var blockNumber uint64
+		if err := rows.Scan(&blockNumber); err != nil {
 			return err
 		}
 
 		j.pool.Submit(func() {
-			if err := j.pipeline.Run(ctx, n); err != nil {
+			if err := j.pipeline.Run(ctx, blockNumber); err != nil {
 				j.logg.Error("janitor: pipeline run error", "error", err)
 			}
 		})
@@ -110,7 +111,7 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
 	j.logg.Debug("janitor: missing blocks count", "count", rowsProcessed)
 
 	if rowsProcessed == 0 {
-		j.logg.Debug("janitor: rasing lower bound")
+		j.logg.Info("janitor: no gap! raising lower bound", "new_lower_bound", upperBound)
 		j.stats.UpdateLowerBound(upperBound)
 		j.store.SetSearchLowerBound(ctx, upperBound)
 	}
diff --git a/pkg/fetch/graphql.go b/pkg/fetch/graphql.go
index 7440494..6c406e9 100644
--- a/pkg/fetch/graphql.go
+++ b/pkg/fetch/graphql.go
@@ -26,7 +26,7 @@ type Graphql struct {
 func NewGraphqlFetcher(o GraphqlOpts) Fetch {
 	return &Graphql{
 		httpClient: &http.Client{
-			Timeout: time.Second * 2,
+			Timeout: time.Second * 5,
 		},
 		graphqlEndpoint: o.GraphqlEndpoint,
 	}
diff --git a/pkg/filter/decode_filter.go b/pkg/filter/decode_filter.go
index 9324666..bd17655 100644
--- a/pkg/filter/decode_filter.go
+++ b/pkg/filter/decode_filter.go
@@ -2,6 +2,7 @@ package filter
 
 import (
 	"context"
+	"fmt"
 	"math/big"
 
 	"github.com/celo-org/celo-blockchain/common"
@@ -46,7 +47,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
 			return false, err
 		}
 
-		_, err := f.js.Publish("CHAIN.transfer", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
+		_, err := f.js.Publish("CHAIN.transfer", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
 		if err != nil {
 			return false, err
 		}
@@ -63,7 +64,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
 			return false, err
 		}
 
-		_, err := f.js.Publish("CHAIN.transferFrom", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
+		_, err := f.js.Publish("CHAIN.transferFrom", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
 		if err != nil {
 			return false, err
 		}
@@ -79,7 +80,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
 			return false, err
 		}
 
-		_, err := f.js.Publish("CHAIN.mintTo", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
+		_, err := f.js.Publish("CHAIN.mintTo", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
 		if err != nil {
 			return false, err
 		}
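
With this change the JetStream payload carries "<block>:<index>:<hash>" instead of the bare transaction hash (see the fmt.Sprintf calls in decode_filter.go above), while the Nats-Msg-Id used for dedup remains the transaction hash. A minimal consumer-side sketch for the new format — the server URL and durable name below are illustrative placeholders, not values from this patch:

package main

import (
	"log"
	"strconv"
	"strings"

	"github.com/nats-io/nats.go"
)

func main() {
	// Placeholder URL; point this at the deployment's NATS server.
	nc, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Subject from stream_subjects in config.toml; the durable name is hypothetical.
	_, err = js.Subscribe("CHAIN.transfer", func(m *nats.Msg) {
		// Payload format introduced by this patch: "<block>:<index>:<hash>".
		parts := strings.SplitN(string(m.Data), ":", 3)
		if len(parts) != 3 {
			log.Printf("unexpected payload: %q", m.Data)
			return
		}
		block, _ := strconv.ParseUint(parts[0], 10, 64)
		index, _ := strconv.ParseUint(parts[1], 10, 64)
		log.Printf("block=%d tx_index=%d hash=%s", block, index, parts[2])
		m.Ack()
	}, nats.Durable("transfer-consumer"), nats.ManualAck())
	if err != nil {
		log.Fatal(err)
	}

	select {} // block forever; a real consumer would handle shutdown signals
}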