perf: increase worker pool queue buffer size 1 -> ~4k

* Previosuly the worker queue pool buffer size was 1 which applies a backpressure on the fast missing blocks producer

* Define our queue size and why we chose this value
This commit is contained in:
Mohamed Sohail 2024-06-20 14:19:06 +08:00
parent 1419caeea7
commit fe01fd1f72
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
4 changed files with 34 additions and 21 deletions

View File

@ -27,7 +27,12 @@ import (
"github.com/knadh/koanf/v2" "github.com/knadh/koanf/v2"
) )
const defaultGracefulShutdownPeriod = time.Second * 30 const (
defaultGracefulShutdownPeriod = time.Second * 30
// 24 hrs worth of blocks
defaultMaxQueueSize = 17_280
)
var ( var (
build = "dev" build = "dev"
@ -113,8 +118,11 @@ func main() {
poolOpts := pool.PoolOpts{ poolOpts := pool.PoolOpts{
Logg: lo, Logg: lo,
WorkerCount: ko.Int("core.pool_size"), WorkerCount: ko.Int("core.pool_size"),
// Immidiately allow processing of upto 6 hrs of missing blocks
BlocksBuffer: defaultMaxQueueSize / 4,
} }
if ko.Int("core.pool_size") <= 0 { if ko.Int("core.pool_size") <= 0 {
// TODO: Benchamrk to determine optimum size
poolOpts.WorkerCount = runtime.NumCPU() * 3 poolOpts.WorkerCount = runtime.NumCPU() * 3
} }
workerPool := pool.NewPool(poolOpts) workerPool := pool.NewPool(poolOpts)
@ -155,6 +163,7 @@ func main() {
} }
backfiller := backfiller.New(backfiller.BackfillerOpts{ backfiller := backfiller.New(backfiller.BackfillerOpts{
MaxQueueSize: defaultMaxQueueSize,
DB: db, DB: db,
Logg: lo, Logg: lo,
Queue: queue, Queue: queue,

View File

@ -12,12 +12,14 @@ import (
type ( type (
BackfillerOpts struct { BackfillerOpts struct {
MaxQueueSize int
DB db.DB DB db.DB
Logg *slog.Logger Logg *slog.Logger
Queue *queue.Queue Queue *queue.Queue
} }
backfiller struct { backfiller struct {
maxQueueSize int
db db.DB db db.DB
logg *slog.Logger logg *slog.Logger
queue *queue.Queue queue *queue.Queue
@ -26,10 +28,7 @@ type (
} }
) )
const ( const verifierInterval = 20 * time.Second
verifierInterval = 20 * time.Second
epochBlocksCount = 17_280
)
func New(o BackfillerOpts) *backfiller { func New(o BackfillerOpts) *backfiller {
return &backfiller{ return &backfiller{
@ -58,6 +57,9 @@ func (b *backfiller) Start() {
if err := b.Run(true); err != nil { if err := b.Run(true); err != nil {
b.logg.Error("verifier tick run error", "err", err) b.logg.Error("verifier tick run error", "err", err)
} }
b.logg.Debug("verifier successful run", "queue_size", b.queue.Size())
} else {
b.logg.Debug("skipping verifier run")
} }
} }
} }
@ -84,7 +86,7 @@ func (b *backfiller) Run(skipLatest bool) error {
missingBlocksCount := missingBlocks.Count() missingBlocksCount := missingBlocks.Count()
if missingBlocksCount > 0 { if missingBlocksCount > 0 {
if missingBlocksCount >= epochBlocksCount { if missingBlocksCount >= uint(b.maxQueueSize) {
b.logg.Warn("large number of blocks missing this may result in degraded RPC performance set FORCE_BACKFILL=* to continue", "missing_blocks", missingBlocksCount) b.logg.Warn("large number of blocks missing this may result in degraded RPC performance set FORCE_BACKFILL=* to continue", "missing_blocks", missingBlocksCount)
_, ok := os.LookupEnv("FORCE_BACKFILL") _, ok := os.LookupEnv("FORCE_BACKFILL")
if !ok { if !ok {

View File

@ -10,12 +10,13 @@ import (
type PoolOpts struct { type PoolOpts struct {
Logg *slog.Logger Logg *slog.Logger
WorkerCount int WorkerCount int
BlocksBuffer int
} }
func NewPool(o PoolOpts) *pond.WorkerPool { func NewPool(o PoolOpts) *pond.WorkerPool {
return pond.New( return pond.New(
o.WorkerCount, o.WorkerCount,
1, o.BlocksBuffer,
pond.Strategy(pond.Balanced()), pond.Strategy(pond.Balanced()),
pond.PanicHandler(panicHandler(o.Logg)), pond.PanicHandler(panicHandler(o.Logg)),
) )

View File

@ -10,6 +10,7 @@ import (
type ( type (
QueueOpts struct { QueueOpts struct {
QueueSize int
Logg *slog.Logger Logg *slog.Logger
Processor *processor.Processor Processor *processor.Processor
Pool *pond.WorkerPool Pool *pond.WorkerPool
@ -27,7 +28,7 @@ type (
func New(o QueueOpts) *Queue { func New(o QueueOpts) *Queue {
return &Queue{ return &Queue{
logg: o.Logg, logg: o.Logg,
processChan: make(chan uint64, 17_280), processChan: make(chan uint64, o.QueueSize),
stopSignal: make(chan interface{}), stopSignal: make(chan interface{}),
processor: o.Processor, processor: o.Processor,
pool: o.Pool, pool: o.Pool,