mirror of
https://github.com/grassrootseconomics/cic-chain-events.git
synced 2024-11-29 01:56:46 +01:00
Mohammed Sohail
2bbc05bb45
* This is a major refactor and includes general improvements around - context cancellation - build settings - jetstream pub sub - logging - docker builds - conf loading
130 lines
3.0 KiB
Go
130 lines
3.0 KiB
Go
package syncer
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/alitto/pond"
|
|
"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
|
|
"github.com/grassrootseconomics/cic-chain-events/internal/store"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/zerodha/logf"
|
|
)
|
|
|
|
const (
|
|
headBlockLag = 5
|
|
)
|
|
|
|
type (
|
|
JanitorOpts struct {
|
|
BatchSize uint64
|
|
Logg logf.Logger
|
|
Pipeline *pipeline.Pipeline
|
|
Pool *pond.WorkerPool
|
|
Stats *Stats
|
|
Store store.Store[pgx.Rows]
|
|
SweepInterval time.Duration
|
|
}
|
|
|
|
Janitor struct {
|
|
batchSize uint64
|
|
pipeline *pipeline.Pipeline
|
|
logg logf.Logger
|
|
pool *pond.WorkerPool
|
|
stats *Stats
|
|
store store.Store[pgx.Rows]
|
|
sweepInterval time.Duration
|
|
}
|
|
)
|
|
|
|
func NewJanitor(o JanitorOpts) *Janitor {
|
|
return &Janitor{
|
|
batchSize: o.BatchSize,
|
|
logg: o.Logg,
|
|
pipeline: o.Pipeline,
|
|
pool: o.Pool,
|
|
stats: o.Stats,
|
|
store: o.Store,
|
|
sweepInterval: o.SweepInterval,
|
|
}
|
|
}
|
|
|
|
func (j *Janitor) Start(ctx context.Context) error {
|
|
ticker := time.NewTicker(j.sweepInterval)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
j.logg.Info("janitor: shutdown signal received")
|
|
return nil
|
|
case <-ticker.C:
|
|
ctx, cancel := context.WithTimeout(context.Background(), jobTimeout)
|
|
defer cancel()
|
|
|
|
j.logg.Debug("janitor: starting sweep")
|
|
if err := j.QueueMissingBlocks(ctx); err != nil {
|
|
j.logg.Error("janitor: queue missing blocks error", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// QueueMissingBlocks searches for missing block and queues the block for processing.
|
|
// It will run twice for a given search range and only after, raise the lower bound.
|
|
func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
|
|
if j.stats.GetHeadCursor() == 0 {
|
|
j.logg.Warn("janitor: (skipping) awaiting head synchronization")
|
|
return nil
|
|
}
|
|
|
|
if j.pool.WaitingTasks() != 0 {
|
|
j.logg.Debug("janitor: (skipping) queue has pending jobs", "pending_jobs", j.pool.WaitingTasks())
|
|
return nil
|
|
}
|
|
|
|
lowerBound, upperBound, err := j.store.GetSearchBounds(
|
|
ctx,
|
|
j.batchSize,
|
|
j.stats.GetHeadCursor(),
|
|
headBlockLag,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rows, err := j.store.GetMissingBlocks(ctx, lowerBound, upperBound)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
j.logg.Info("janitor: missing blocks", "count", j.stats.GetHeadCursor()-lowerBound)
|
|
|
|
rowsProcessed := 0
|
|
for rows.Next() {
|
|
var blockNumber uint64
|
|
if err := rows.Scan(&blockNumber); err != nil {
|
|
return err
|
|
}
|
|
|
|
j.pool.Submit(func() {
|
|
if err := j.pipeline.Run(ctx, blockNumber); err != nil {
|
|
j.logg.Error("janitor: pipeline run error", "error", err)
|
|
}
|
|
})
|
|
rowsProcessed++
|
|
}
|
|
|
|
j.logg.Debug("janitor: missing blocks to be processed", "count", rowsProcessed)
|
|
if rowsProcessed == 0 {
|
|
j.logg.Info("janitor: no gap! rasing lower bound", "new_lower_bound", upperBound)
|
|
j.stats.UpdateLowerBound(upperBound)
|
|
j.store.SetSearchLowerBound(ctx, upperBound)
|
|
}
|
|
|
|
if rows.Err() != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|