eth-tracker/internal/queue/queue.go

69 lines
1.2 KiB
Go
Raw Permalink Normal View History

2024-05-23 08:41:39 +02:00
package queue
import (
"context"
"log/slog"
"github.com/alitto/pond"
"github.com/grassrootseconomics/celo-tracker/internal/processor"
)
type (
QueueOpts struct {
QueueSize int
2024-05-23 08:41:39 +02:00
Logg *slog.Logger
Processor *processor.Processor
Pool *pond.WorkerPool
}
Queue struct {
logg *slog.Logger
processChan chan uint64
stopSignal chan interface{}
processor *processor.Processor
pool *pond.WorkerPool
}
)
func New(o QueueOpts) *Queue {
return &Queue{
logg: o.Logg,
processChan: make(chan uint64, o.QueueSize),
2024-05-23 08:41:39 +02:00
stopSignal: make(chan interface{}),
processor: o.Processor,
pool: o.Pool,
}
}
func (q *Queue) Stop() {
q.stopSignal <- struct{}{}
}
func (q *Queue) Process() {
for {
select {
case <-q.stopSignal:
q.logg.Info("shutdown signal received stopping queue processing")
return
case block, ok := <-q.processChan:
if !ok {
return
}
q.pool.Submit(func() {
err := q.processor.ProcessBlock(context.Background(), block)
if err != nil {
q.logg.Error("block processor error", "block_number", block, "error", err)
}
})
}
}
}
func (q *Queue) Push(block uint64) {
q.processChan <- block
}
func (q *Queue) Size() int {
return len(q.processChan)
}