diff --git a/internal/verifier/verifier.go b/internal/verifier/verifier.go index b710f11..d493afd 100644 --- a/internal/verifier/verifier.go +++ b/internal/verifier/verifier.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/gammazero/workerpool" "github.com/grassrootseconomics/celo-tracker/internal/db" @@ -34,7 +35,8 @@ type ( ) const ( - blockBatchSize = 25 + blockBatchSize = 25 + verifierInterval = 1 * time.Minute ) func New(o VerifierOpts) *Verifier { @@ -48,27 +50,67 @@ func New(o VerifierOpts) *Verifier { } } -func (v *Verifier) Start() +func (v *Verifier) Start() { + ticker := time.NewTicker(verifierInterval) -func (v *Verifier) getMissingBlocks() error { + for { + select { + case <-v.quit: + v.logg.Info("janitor: shutdown signal received") + return + case <-ticker.C: + batch, err := v.getMissingBlocks() + if err != nil { + v.logg.Error("verifier error getting missing blocks", "err", err) + } + + if batch != nil { + v.logg.Info("verifier found missing block gap requeuing missing blocks") + blocks, err := v.chain.GetBlocks(context.Background(), batch) + if err != nil { + v.logg.Error("batch blocks fetcher error", "error", "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err) + } + + for _, block := range blocks { + v.blockWorker.Submit(func() { + if err := v.blockProcessor.ProcessBlock(context.Background(), block); err != nil { + v.logg.Error("block processor error", "source", "verifier", "block", block.NumberU64(), "error", err) + } + }) + } + } else { + v.logg.Debug("verifier found no missing blocks running db compactor") + if err := v.db.Cleanup(); err != nil { + v.logg.Error("verifier compactor error", "error", err) + } + } + } + } +} + +func (v *Verifier) Stop() { + // TODO: Run with sync.Once + v.quit <- struct{}{} +} + +func (v *Verifier) getMissingBlocks() ([]uint64, error) { lower, err := v.db.GetLowerBound() if err != nil { - return err + return nil, err } upper, err := v.db.GetUpperBound() if err != nil { - return err + return nil, err } missingBlocks, err := v.db.GetMissingValuesBitSet(lower, upper-1) if err != nil { - return err + return nil, err } missingBlocksCount := missingBlocks.Count() if missingBlocksCount > 0 { - v.logg.Info("verifier found block gap", "missing_blocks_count", missingBlocksCount, "lower_bound", lower, "upper_bound", upper) buffer := make([]uint, missingBlocksCount) missingBlocks.NextSetMany(0, buffer) @@ -83,29 +125,9 @@ func (v *Verifier) getMissingBlocks() error { batch[j-i] = uint64(buffer[j]) } - v.processMissingBlocksBatch(batch) - } - } else { - v.logg.Debug("verifier running db compactor") - if err := v.db.Cleanup(); err != nil { - v.logg.Error("verifier compactor error", "error", err) + return batch, nil } } - return nil -} - -func (v *Verifier) processMissingBlocksBatch(batch []uint64) { - blocks, err := v.chain.GetBlocks(context.Background(), batch) - if err != nil { - v.logg.Error("batch blocks fetcher error", "error", "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err) - } - - for _, block := range blocks { - v.blockWorker.Submit(func() { - if err := v.blockProcessor.ProcessBlock(context.Background(), block); err != nil { - v.logg.Error("block processor error", "source", "verifier", "block", block.NumberU64(), "error", err) - } - }) - } + return nil, nil }