add ticker to verifier

This commit is contained in:
Mohamed Sohail 2024-05-06 14:50:44 +08:00
parent 73ea42bfa5
commit ab750b7e28
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"log/slog" "log/slog"
"time"
"github.com/gammazero/workerpool" "github.com/gammazero/workerpool"
"github.com/grassrootseconomics/celo-tracker/internal/db" "github.com/grassrootseconomics/celo-tracker/internal/db"
@ -34,7 +35,8 @@ type (
) )
const ( const (
blockBatchSize = 25 blockBatchSize = 25
verifierInterval = 1 * time.Minute
) )
func New(o VerifierOpts) *Verifier { func New(o VerifierOpts) *Verifier {
@ -48,27 +50,67 @@ func New(o VerifierOpts) *Verifier {
} }
} }
func (v *Verifier) Start() func (v *Verifier) Start() {
ticker := time.NewTicker(verifierInterval)
func (v *Verifier) getMissingBlocks() error { for {
select {
case <-v.quit:
v.logg.Info("janitor: shutdown signal received")
return
case <-ticker.C:
batch, err := v.getMissingBlocks()
if err != nil {
v.logg.Error("verifier error getting missing blocks", "err", err)
}
if batch != nil {
v.logg.Info("verifier found missing block gap requeuing missing blocks")
blocks, err := v.chain.GetBlocks(context.Background(), batch)
if err != nil {
v.logg.Error("batch blocks fetcher error", "error", "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err)
}
for _, block := range blocks {
v.blockWorker.Submit(func() {
if err := v.blockProcessor.ProcessBlock(context.Background(), block); err != nil {
v.logg.Error("block processor error", "source", "verifier", "block", block.NumberU64(), "error", err)
}
})
}
} else {
v.logg.Debug("verifier found no missing blocks running db compactor")
if err := v.db.Cleanup(); err != nil {
v.logg.Error("verifier compactor error", "error", err)
}
}
}
}
}
func (v *Verifier) Stop() {
// TODO: Run with sync.Once
v.quit <- struct{}{}
}
func (v *Verifier) getMissingBlocks() ([]uint64, error) {
lower, err := v.db.GetLowerBound() lower, err := v.db.GetLowerBound()
if err != nil { if err != nil {
return err return nil, err
} }
upper, err := v.db.GetUpperBound() upper, err := v.db.GetUpperBound()
if err != nil { if err != nil {
return err return nil, err
} }
missingBlocks, err := v.db.GetMissingValuesBitSet(lower, upper-1) missingBlocks, err := v.db.GetMissingValuesBitSet(lower, upper-1)
if err != nil { if err != nil {
return err return nil, err
} }
missingBlocksCount := missingBlocks.Count() missingBlocksCount := missingBlocks.Count()
if missingBlocksCount > 0 { if missingBlocksCount > 0 {
v.logg.Info("verifier found block gap", "missing_blocks_count", missingBlocksCount, "lower_bound", lower, "upper_bound", upper)
buffer := make([]uint, missingBlocksCount) buffer := make([]uint, missingBlocksCount)
missingBlocks.NextSetMany(0, buffer) missingBlocks.NextSetMany(0, buffer)
@ -83,29 +125,9 @@ func (v *Verifier) getMissingBlocks() error {
batch[j-i] = uint64(buffer[j]) batch[j-i] = uint64(buffer[j])
} }
v.processMissingBlocksBatch(batch) return batch, nil
}
} else {
v.logg.Debug("verifier running db compactor")
if err := v.db.Cleanup(); err != nil {
v.logg.Error("verifier compactor error", "error", err)
} }
} }
return nil return nil, nil
}
func (v *Verifier) processMissingBlocksBatch(batch []uint64) {
blocks, err := v.chain.GetBlocks(context.Background(), batch)
if err != nil {
v.logg.Error("batch blocks fetcher error", "error", "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err)
}
for _, block := range blocks {
v.blockWorker.Submit(func() {
if err := v.blockProcessor.ProcessBlock(context.Background(), block); err != nil {
v.logg.Error("block processor error", "source", "verifier", "block", block.NumberU64(), "error", err)
}
})
}
} }