package processor import ( "context" "log/slog" "time" "github.com/alitto/pond" "github.com/celo-org/celo-blockchain/core/types" "github.com/ef-ds/deque/v2" "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/chain" "github.com/grassrootseconomics/celo-tracker/internal/db" "github.com/grassrootseconomics/celo-tracker/internal/handler" "github.com/grassrootseconomics/celo-tracker/internal/pool" "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/celo-tracker/internal/stats" ) type ( ProcessorOpts struct { Chain *chain.Chain BlocksQueue *deque.Deque[types.Block] Logg *slog.Logger Stats *stats.Stats DB *db.DB Cache cache.Cache Pub pub.Pub } Processor struct { chain *chain.Chain pool *pond.WorkerPool blocksQueue *deque.Deque[types.Block] logg *slog.Logger stats *stats.Stats db *db.DB quit chan struct{} handlers []handler.Handler cache cache.Cache pub pub.Pub } ) const ( emptyQueueIdleTime = 1 * time.Second ) func NewProcessor(o ProcessorOpts) *Processor { return &Processor{ chain: o.Chain, pool: pool.NewPool(o.Logg), blocksQueue: o.BlocksQueue, logg: o.Logg, stats: o.Stats, db: o.DB, quit: make(chan struct{}), handlers: handler.New(o.Cache), cache: o.Cache, pub: o.Pub, } } func (p *Processor) Start() { p.logg.Info("processor started") for { select { case <-p.quit: p.logg.Info("processor stopped, draining workerpool queue") p.pool.StopAndWait() if err := p.db.Close(); err != nil { p.logg.Info("error closing db", "error", err) } return default: if p.blocksQueue.Len() > 0 { v, _ := p.blocksQueue.PopFront() p.pool.Submit(func() { p.logg.Debug("processing", "block", v.Number()) if err := p.processBlock(context.Background(), v); err != nil { p.logg.Info("block processor error", "block", v.NumberU64(), "error", err) } }) } else { time.Sleep(emptyQueueIdleTime) p.logg.Debug("processor queue empty slept for 1 second") } } } } func (p *Processor) Stop() { p.logg.Info("signaling processor shutdown") p.quit <- struct{}{} }