From fe01fd1f72131c0cf33372d2e8cbcdbf1c06fbd7 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 20 Jun 2024 14:19:06 +0800 Subject: [PATCH] perf: increase worker pool queue buffer size 1 -> ~4k * Previously the worker queue pool buffer size was 1 which applies backpressure on the fast missing blocks producer * Define our queue size and why we chose this value --- cmd/main.go | 17 +++++++++++++---- internal/backfiller/backfiller.go | 28 +++++++++++++++------------- internal/pool/pool.go | 7 ++++--- internal/queue/queue.go | 3 ++- 4 files changed, 34 insertions(+), 21 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index be4ba8c..7ff9f9c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -27,7 +27,12 @@ import ( "github.com/knadh/koanf/v2" ) -const defaultGracefulShutdownPeriod = time.Second * 30 +const ( + defaultGracefulShutdownPeriod = time.Second * 30 + + // 24 hrs worth of blocks + defaultMaxQueueSize = 17_280 +) var ( build = "dev" @@ -113,8 +118,11 @@ func main() { poolOpts := pool.PoolOpts{ Logg: lo, WorkerCount: ko.Int("core.pool_size"), + // Immediately allow processing of up to 6 hrs of missing blocks + BlocksBuffer: defaultMaxQueueSize / 4, } if ko.Int("core.pool_size") <= 0 { + // TODO: Benchmark to determine optimum size poolOpts.WorkerCount = runtime.NumCPU() * 3 } workerPool := pool.NewPool(poolOpts) @@ -155,9 +163,10 @@ func main() { } backfiller := backfiller.New(backfiller.BackfillerOpts{ - DB: db, - Logg: lo, - Queue: queue, + MaxQueueSize: defaultMaxQueueSize, + DB: db, + Logg: lo, + Queue: queue, }) apiServer := &http.Server{ diff --git a/internal/backfiller/backfiller.go b/internal/backfiller/backfiller.go index 38c3f77..d970794 100644 --- a/internal/backfiller/backfiller.go +++ b/internal/backfiller/backfiller.go @@ -12,24 +12,23 @@ import ( type ( BackfillerOpts struct { - DB db.DB - Logg *slog.Logger - Queue *queue.Queue + MaxQueueSize int + DB db.DB + Logg *slog.Logger + Queue *queue.Queue } backfiller struct { - db db.DB - logg 
*slog.Logger - queue *queue.Queue - stopCh chan struct{} - ticker *time.Ticker + maxQueueSize int + db db.DB + logg *slog.Logger + queue *queue.Queue + stopCh chan struct{} + ticker *time.Ticker } ) -const ( - verifierInterval = 20 * time.Second - epochBlocksCount = 17_280 -) +const verifierInterval = 20 * time.Second func New(o BackfillerOpts) *backfiller { return &backfiller{ @@ -58,6 +57,9 @@ func (b *backfiller) Start() { if err := b.Run(true); err != nil { b.logg.Error("verifier tick run error", "err", err) } + b.logg.Debug("verifier successful run", "queue_size", b.queue.Size()) + } else { + b.logg.Debug("skipping verifier run") } } } @@ -84,7 +86,7 @@ func (b *backfiller) Run(skipLatest bool) error { missingBlocksCount := missingBlocks.Count() if missingBlocksCount > 0 { - if missingBlocksCount >= epochBlocksCount { + if missingBlocksCount >= uint(b.maxQueueSize) { b.logg.Warn("large number of blocks missing this may result in degraded RPC performance set FORCE_BACKFILL=* to continue", "missing_blocks", missingBlocksCount) _, ok := os.LookupEnv("FORCE_BACKFILL") if !ok { diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 40a8cdb..a9eec1b 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -8,14 +8,15 @@ import ( ) type PoolOpts struct { - Logg *slog.Logger - WorkerCount int + Logg *slog.Logger + WorkerCount int + BlocksBuffer int } func NewPool(o PoolOpts) *pond.WorkerPool { return pond.New( o.WorkerCount, - 1, + o.BlocksBuffer, pond.Strategy(pond.Balanced()), pond.PanicHandler(panicHandler(o.Logg)), ) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 0bc6a0e..9e79434 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -10,6 +10,7 @@ import ( type ( QueueOpts struct { + QueueSize int Logg *slog.Logger Processor *processor.Processor Pool *pond.WorkerPool @@ -27,7 +28,7 @@ type ( func New(o QueueOpts) *Queue { return &Queue{ logg: o.Logg, - processChan: make(chan uint64, 17_280), + processChan: 
make(chan uint64, o.QueueSize), stopSignal: make(chan interface{}), processor: o.Processor, pool: o.Pool,