eth-tracker/internal/pool/pool.go

56 lines
957 B
Go

package pool
import (
"context"
"log/slog"
"github.com/alitto/pond/v2"
"github.com/grassrootseconomics/eth-tracker/internal/processor"
)
type (
PoolOpts struct {
Logg *slog.Logger
WorkerCount int
Processor *processor.Processor
}
Pool struct {
logg *slog.Logger
workerPool pond.Pool
processor *processor.Processor
}
)
func New(o PoolOpts) *Pool {
return &Pool{
logg: o.Logg,
workerPool: pond.NewPool(
o.WorkerCount,
),
processor: o.Processor,
}
}
func (p *Pool) Stop() {
p.workerPool.StopAndWait()
}
// non-blocking
func (p *Pool) Push(block uint64) {
p.workerPool.Submit(func() {
err := p.processor.ProcessBlock(context.Background(), block)
if err != nil {
p.logg.Error("block processor error", "block_number", block, "error", err)
}
})
}
func (p *Pool) Size() uint64 {
return p.workerPool.WaitingTasks()
}
func (p *Pool) ActiveWorkers() int64 {
return p.workerPool.RunningWorkers()
}