mirror of
https://github.com/grassrootseconomics/eth-tracker.git
synced 2025-04-24 09:21:01 +02:00
96 lines
2.3 KiB
Go
96 lines
2.3 KiB
Go
package processor
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/alitto/pond"
|
|
"github.com/celo-org/celo-blockchain/core/types"
|
|
"github.com/ef-ds/deque/v2"
|
|
"github.com/grassrootseconomics/celo-tracker/internal/cache"
|
|
"github.com/grassrootseconomics/celo-tracker/internal/chain"
|
|
"github.com/grassrootseconomics/celo-tracker/internal/db"
|
|
"github.com/grassrootseconomics/celo-tracker/internal/handler"
|
|
"github.com/grassrootseconomics/celo-tracker/internal/pool"
|
|
"github.com/grassrootseconomics/celo-tracker/internal/pub"
|
|
"github.com/grassrootseconomics/celo-tracker/internal/stats"
|
|
)
|
|
|
|
type (
|
|
ProcessorOpts struct {
|
|
Chain *chain.Chain
|
|
BlocksQueue *deque.Deque[types.Block]
|
|
Logg *slog.Logger
|
|
Stats *stats.Stats
|
|
DB *db.DB
|
|
Cache cache.Cache
|
|
Pub pub.Pub
|
|
}
|
|
|
|
Processor struct {
|
|
chain *chain.Chain
|
|
pool *pond.WorkerPool
|
|
blocksQueue *deque.Deque[types.Block]
|
|
logg *slog.Logger
|
|
stats *stats.Stats
|
|
db *db.DB
|
|
quit chan struct{}
|
|
handlers []handler.Handler
|
|
cache cache.Cache
|
|
pub pub.Pub
|
|
}
|
|
)
|
|
|
|
const (
|
|
emptyQueueIdleTime = 1 * time.Second
|
|
)
|
|
|
|
func NewProcessor(o ProcessorOpts) *Processor {
|
|
return &Processor{
|
|
chain: o.Chain,
|
|
pool: pool.NewPool(o.Logg),
|
|
blocksQueue: o.BlocksQueue,
|
|
logg: o.Logg,
|
|
stats: o.Stats,
|
|
db: o.DB,
|
|
quit: make(chan struct{}),
|
|
handlers: handler.New(o.Cache),
|
|
cache: o.Cache,
|
|
pub: o.Pub,
|
|
}
|
|
}
|
|
|
|
func (p *Processor) Start() {
|
|
p.logg.Info("processor started")
|
|
for {
|
|
select {
|
|
case <-p.quit:
|
|
p.logg.Info("processor stopped, draining workerpool queue")
|
|
p.pool.StopAndWait()
|
|
if err := p.db.Close(); err != nil {
|
|
p.logg.Info("error closing db", "error", err)
|
|
}
|
|
return
|
|
default:
|
|
if p.blocksQueue.Len() > 0 {
|
|
v, _ := p.blocksQueue.PopFront()
|
|
p.pool.Submit(func() {
|
|
p.logg.Debug("processing", "block", v.Number())
|
|
if err := p.processBlock(context.Background(), v); err != nil {
|
|
p.logg.Info("block processor error", "block", v.NumberU64(), "error", err)
|
|
}
|
|
})
|
|
} else {
|
|
time.Sleep(emptyQueueIdleTime)
|
|
p.logg.Debug("processor queue empty slept for 1 second")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Processor) Stop() {
|
|
p.logg.Info("signaling processor shutdown")
|
|
p.quit <- struct{}{}
|
|
}
|