2024-05-23 08:41:39 +02:00
package backfiller
import (
"fmt"
"log/slog"
"os"
"time"
"github.com/grassrootseconomics/celo-tracker/internal/db"
"github.com/grassrootseconomics/celo-tracker/internal/queue"
)
type (
BackfillerOpts struct {
2024-06-20 08:19:06 +02:00
MaxQueueSize int
DB db . DB
Logg * slog . Logger
Queue * queue . Queue
2024-05-23 08:41:39 +02:00
}
backfiller struct {
2024-06-20 08:19:06 +02:00
maxQueueSize int
db db . DB
logg * slog . Logger
queue * queue . Queue
stopCh chan struct { }
ticker * time . Ticker
2024-05-23 08:41:39 +02:00
}
)
2024-06-20 08:19:06 +02:00
const verifierInterval = 20 * time . Second
2024-05-23 08:41:39 +02:00
func New ( o BackfillerOpts ) * backfiller {
return & backfiller {
2024-08-05 10:45:14 +02:00
maxQueueSize : o . MaxQueueSize ,
db : o . DB ,
logg : o . Logg ,
queue : o . Queue ,
stopCh : make ( chan struct { } ) ,
ticker : time . NewTicker ( verifierInterval ) ,
2024-05-23 08:41:39 +02:00
}
}
func ( b * backfiller ) Stop ( ) {
b . ticker . Stop ( )
b . stopCh <- struct { } { }
}
func ( b * backfiller ) Start ( ) {
for {
select {
case <- b . stopCh :
b . logg . Info ( "verifier shutting down" )
b . ticker . Stop ( )
return
case <- b . ticker . C :
if b . queue . Size ( ) <= 1 {
if err := b . Run ( true ) ; err != nil {
b . logg . Error ( "verifier tick run error" , "err" , err )
}
2024-06-20 08:19:06 +02:00
b . logg . Debug ( "verifier successful run" , "queue_size" , b . queue . Size ( ) )
} else {
b . logg . Debug ( "skipping verifier run" )
2024-05-23 08:41:39 +02:00
}
}
}
}
func ( b * backfiller ) Run ( skipLatest bool ) error {
lower , err := b . db . GetLowerBound ( )
if err != nil {
return fmt . Errorf ( "verifier could not get lower bound from db: err %v" , err )
}
upper , err := b . db . GetUpperBound ( )
if err != nil {
return fmt . Errorf ( "verifier could not get upper bound from db: err %v" , err )
}
if skipLatest {
upper --
}
missingBlocks , err := b . db . GetMissingValuesBitSet ( lower , upper )
if err != nil {
return fmt . Errorf ( "verifier could not get missing values bitset: err %v" , err )
}
missingBlocksCount := missingBlocks . Count ( )
if missingBlocksCount > 0 {
2024-06-20 08:19:06 +02:00
if missingBlocksCount >= uint ( b . maxQueueSize ) {
2024-08-05 10:45:14 +02:00
b . logg . Warn ( "large number of blocks missing this may result in degraded RPC performance set FORCE_BACKFILL=* to continue" , "missing_blocks" , missingBlocksCount , "max_queue_size" , b . maxQueueSize )
2024-05-23 08:41:39 +02:00
_ , ok := os . LookupEnv ( "FORCE_BACKFILL" )
if ! ok {
os . Exit ( 0 )
}
}
b . logg . Info ( "bootstrapping queue with missing blocks" )
b . logg . Info ( "found missing blocks" , "skip_latest" , skipLatest , "missing_blocks_count" , missingBlocksCount )
buffer := make ( [ ] uint , missingBlocksCount )
missingBlocks . NextSetMany ( 0 , buffer )
defer missingBlocks . ClearAll ( )
for _ , block := range buffer {
b . queue . Push ( uint64 ( block ) )
}
}
return nil
}