eth-tracker/internal/queue/queue.go
Mohammed Sohail fe01fd1f72
perf: increase worker pool queue buffer size 1 -> ~4k
* Previosuly the worker queue pool buffer size was 1 which applies a backpressure on the fast missing blocks producer

* Define our queue size and why we chose this value
2024-06-20 14:19:06 +08:00

69 lines
1.2 KiB
Go

package queue
import (
"context"
"log/slog"
"github.com/alitto/pond"
"github.com/grassrootseconomics/celo-tracker/internal/processor"
)
type (
QueueOpts struct {
QueueSize int
Logg *slog.Logger
Processor *processor.Processor
Pool *pond.WorkerPool
}
Queue struct {
logg *slog.Logger
processChan chan uint64
stopSignal chan interface{}
processor *processor.Processor
pool *pond.WorkerPool
}
)
func New(o QueueOpts) *Queue {
return &Queue{
logg: o.Logg,
processChan: make(chan uint64, o.QueueSize),
stopSignal: make(chan interface{}),
processor: o.Processor,
pool: o.Pool,
}
}
func (q *Queue) Stop() {
q.stopSignal <- struct{}{}
}
func (q *Queue) Process() {
for {
select {
case <-q.stopSignal:
q.logg.Info("shutdown signal received stopping queue processing")
return
case block, ok := <-q.processChan:
if !ok {
return
}
q.pool.Submit(func() {
err := q.processor.ProcessBlock(context.Background(), block)
if err != nil {
q.logg.Error("block processor error", "block_number", block, "error", err)
}
})
}
}
}
func (q *Queue) Push(block uint64) {
q.processChan <- block
}
func (q *Queue) Size() int {
return len(q.processChan)
}