deps: upgrade, deprecate redis, upgrade pond worker

This commit is contained in:
2025-05-26 13:00:34 +03:00
parent 20434beb17
commit f90cc75323
8 changed files with 121 additions and 238 deletions

View File

@@ -3,15 +3,13 @@ package pool
import (
"context"
"log/slog"
"runtime/debug"
"github.com/alitto/pond"
"github.com/alitto/pond/v2"
"github.com/grassrootseconomics/eth-tracker/internal/processor"
)
type (
PoolOpts struct {
BatchSize int
Logg *slog.Logger
WorkerCount int
Processor *processor.Processor
@@ -19,7 +17,7 @@ type (
Pool struct {
logg *slog.Logger
workerPool *pond.WorkerPool
workerPool pond.Pool
processor *processor.Processor
}
)
@@ -27,11 +25,8 @@ type (
func New(o PoolOpts) *Pool {
return &Pool{
logg: o.Logg,
workerPool: pond.New(
workerPool: pond.NewPool(
o.WorkerCount,
o.BatchSize,
pond.Strategy(pond.Balanced()),
pond.PanicHandler(panicHandler(o.Logg)),
),
processor: o.Processor,
}
@@ -41,6 +36,7 @@ func (p *Pool) Stop() {
p.workerPool.StopAndWait()
}
// non-blocking
func (p *Pool) Push(block uint64) {
p.workerPool.Submit(func() {
err := p.processor.ProcessBlock(context.Background(), block)
@@ -54,12 +50,6 @@ func (p *Pool) Size() uint64 {
return p.workerPool.WaitingTasks()
}
func (p *Pool) ActiveWorkers() int {
func (p *Pool) ActiveWorkers() int64 {
return p.workerPool.RunningWorkers()
}
func panicHandler(logg *slog.Logger) func(interface{}) {
return func(panic interface{}) {
logg.Error("block processor goroutine exited from a panic", "error", panic, "stack_trace", string(debug.Stack()))
}
}