hotfix: drain janitor queue, worker pool, js

* fully drain worker queue before reading gap from db
* dedicated goroutine for head syncer
* emit block, tx index and hash to JetStream
Mohamed Sohail 2023-01-18 22:40:14 +03:00
parent 696263e35f
commit ef9f2b2b7f
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
8 changed files with 46 additions and 62 deletions

View File

@@ -1,12 +1,9 @@
 package main
 
 import (
-	"context"
 	"strings"
 	"time"
 
-	"github.com/alitto/pond"
-	"github.com/grassrootseconomics/cic-chain-events/internal/pool"
 	"github.com/grassrootseconomics/cic-chain-events/internal/store"
 	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 	"github.com/jackc/pgx/v5"
@@ -74,13 +71,6 @@ func initPgStore() (store.Store[pgx.Rows], error) {
 	return pgStore, nil
 }
 
-func initWorkerPool(ctx context.Context) *pond.WorkerPool {
-	return pool.NewPool(ctx, pool.Opts{
-		ConcurrencyFactor: ko.MustInt("syncer.concurrency"),
-		PoolQueueSize:     ko.MustInt("syncer.queue_size"),
-	})
-}
-
 func initFetcher() fetch.Fetch {
 	return fetch.NewGraphqlFetcher(fetch.GraphqlOpts{
 		GraphqlEndpoint: ko.MustString("chain.graphql_endpoint"),

View File

@@ -10,8 +10,8 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/grassrootseconomics/cic-chain-events/internal/api"
 	"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
+	"github.com/grassrootseconomics/cic-chain-events/internal/pool"
 	"github.com/grassrootseconomics/cic-chain-events/internal/syncer"
 	"github.com/grassrootseconomics/cic-chain-events/pkg/filter"
 	"github.com/knadh/goyesql/v2"
@@ -41,13 +41,6 @@ func init() {
 }
 
 func main() {
-	// p := profiler.New(profiler.Conf{
-	// DirPath: "profiles",
-	// Quiet: true,
-	// NoShutdownHook: false,
-	// }, profiler.Cpu, profiler.Mem)
-	// p.Start()
-
 	syncerStats := &syncer.Stats{}
 	wg := &sync.WaitGroup{}
 	apiServer := initApiServer()
@@ -55,7 +48,10 @@ func main() {
 	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
 	defer stop()
 
-	workerPool := initWorkerPool(ctx)
+	janitorWorkerPool := pool.NewPool(ctx, pool.Opts{
+		Concurrency: ko.MustInt("syncer.janitor_concurrency"),
+		QueueSize:   ko.MustInt("syncer.janitor_queue_size"),
+	})
 
 	pgStore, err := initPgStore()
 	if err != nil {
@@ -74,10 +70,15 @@ func main() {
 		Store: pgStore,
 	})
 
+	headSyncerWorker := pool.NewPool(ctx, pool.Opts{
+		Concurrency: 1,
+		QueueSize:   1,
+	})
+
 	headSyncer, err := syncer.NewHeadSyncer(syncer.HeadSyncerOpts{
 		Logg:       lo,
 		Pipeline:   pipeline,
-		Pool:       workerPool,
+		Pool:       headSyncerWorker,
 		Stats:      syncerStats,
 		WsEndpoint: ko.MustString("chain.ws_endpoint"),
 	})
@@ -86,18 +87,15 @@ func main() {
 	}
 
 	janitor := syncer.NewJanitor(syncer.JanitorOpts{
-		BatchSize:     uint64(ko.MustInt64("syncer.batch_size")),
-		HeadBlockLag:  uint64(ko.MustInt64("syncer.head_block_lag")),
+		BatchSize:     uint64(ko.MustInt64("syncer.janitor_queue_size")),
 		Logg:          lo,
 		Pipeline:      pipeline,
-		Pool:          workerPool,
+		Pool:          janitorWorkerPool,
 		Stats:         syncerStats,
 		Store:         pgStore,
-		SweepInterval: time.Second * time.Duration(ko.MustInt64("syncer.sweep_interval")),
+		SweepInterval: time.Second * time.Duration(ko.MustInt64("syncer.janitor_sweep_interval")),
 	})
 
-	apiServer.GET("/stats", api.StatsHandler(syncerStats, workerPool))
-
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -129,12 +127,9 @@ func main() {
 	<-ctx.Done()
 
-	workerPool.Stop()
-
 	if err := apiServer.Shutdown(ctx); err != nil {
 		lo.Error("main: could not gracefully shutdown api server", "err", err)
 	}
 
 	wg.Wait()
 
-	// p.Stop()
 }

View File

@@ -10,24 +10,19 @@ address = ":8080"
 # Geth API endpoints
 [chain]
 graphql_endpoint = "https://rpc.celo.grassecon.net/graphql"
-ws_endpoint = "wss://ws.celo.grassecon.net"
+ws_endpoint = "wss://socket.celo.grassecon.net"
 
-# Syncer configs
 [syncer]
-# Maximum number of missing blocks pushed into the worker queue every janitor sweep
-batch_size = 200
 # Number of goroutines assigned to the worker pool
-concurrency = 3
-# Prevents reprocessing head block already in queue
-head_block_lag = 5
+janitor_concurrency = 5
 # Max idle time after which goroutine is returned back to the pool
 idle_worker_timeout = 1
 # Syncer start block
-initial_lower_bound = 17204504
+initial_lower_bound = 17269000
 # Max blocks in worker queue awaiting processing
-queue_size = 500
+janitor_queue_size = 500
 # Janitor sweep interval, should take into account concurrency and queue_size
-sweep_interval = 10
+janitor_sweep_interval = 5
 
 [postgres]
 dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events"
@@ -45,4 +40,8 @@ stream_subjects = [
 "CHAIN.transfer",
 "CHAIN.transferFrom",
 "CHAIN.mintTo"
 ]
+
+# 77G - snapshot
+# 111G - decompressed
+# 112G - latest

View File

@@ -42,7 +42,6 @@ func NewPipeline(o PipelineOpts) *Pipeline {
 // - Blocks are processed atomically, a failure in-between will process the block from the start
 // - Therefore, any side effect/event sink in the filter should support dedup
 func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
-	md.logg.Debug("pipeline: processing block", "block", blockNumber)
 	fetchResp, err := md.fetch.Block(ctx, blockNumber)
 	if err != nil {
 		return err
@@ -63,7 +62,6 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
 	if err := md.store.CommitBlock(ctx, blockNumber); err != nil {
 		return err
 	}
-	md.logg.Debug("pipeline: committed block", "block", blockNumber)
 
 	return nil
 }

View File

@@ -8,16 +8,16 @@ import (
 )
 
 type Opts struct {
-	ConcurrencyFactor int
-	PoolQueueSize     int
+	Concurrency int
+	QueueSize   int
 }
 
 // NewPool creates a fixed size (and buffered) go routine worker pool.
 func NewPool(ctx context.Context, o Opts) *pond.WorkerPool {
 	return pond.New(
-		o.ConcurrencyFactor,
-		o.PoolQueueSize,
-		pond.MinWorkers(o.ConcurrencyFactor),
+		o.Concurrency,
+		o.QueueSize,
+		pond.MinWorkers(o.Concurrency),
 		pond.IdleTimeout(time.Second*1),
 		pond.Context(ctx),
 	)
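
For reference, a minimal usage sketch of the renamed options. The standalone program and the concrete values are illustrative only, and since the pool package is internal to this module the snippet only compiles from within the repo:

package main

import (
	"context"
	"fmt"

	"github.com/grassrootseconomics/cic-chain-events/internal/pool"
)

func main() {
	ctx := context.Background()

	// Create a fixed-size, buffered worker pool with the renamed options.
	p := pool.NewPool(ctx, pool.Opts{
		Concurrency: 5,   // number of worker goroutines
		QueueSize:   500, // buffered task queue
	})

	// Tasks are plain closures submitted to the pool.
	p.Submit(func() {
		fmt.Println("process one block")
	})

	// Wait for queued tasks to finish before exiting.
	p.StopAndWait()
}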

View File

@@ -11,9 +11,12 @@ import (
 	"github.com/zerodha/logf"
 )
 
+const (
+	headBlockLag = 5
+)
+
 type JanitorOpts struct {
 	BatchSize     uint64
-	HeadBlockLag  uint64
 	Logg          logf.Logger
 	Pipeline      *pipeline.Pipeline
 	Pool          *pond.WorkerPool
@@ -24,7 +27,6 @@ type JanitorOpts struct {
 
 type Janitor struct {
 	batchSize    uint64
-	headBlockLag uint64
 	pipeline     *pipeline.Pipeline
 	logg         logf.Logger
 	pool         *pond.WorkerPool
@@ -36,7 +38,6 @@ type Janitor struct {
 func NewJanitor(o JanitorOpts) *Janitor {
 	return &Janitor{
 		batchSize:    o.BatchSize,
-		headBlockLag: o.HeadBlockLag,
 		logg:         o.Logg,
 		pipeline:     o.Pipeline,
 		pool:         o.Pool,
@@ -73,8 +74,8 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
 		return nil
 	}
 
-	if j.pool.WaitingTasks() >= j.batchSize {
-		j.logg.Warn("janitor: (skipping) avoiding queue pressure")
+	if j.pool.WaitingTasks() != 0 {
+		j.logg.Debug("janitor: (skipping) queue has pending jobs", "pending_jobs", j.pool.WaitingTasks())
 		return nil
 	}
 
@@ -82,7 +83,7 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
 		ctx,
 		j.batchSize,
 		j.stats.GetHeadCursor(),
-		j.headBlockLag,
+		headBlockLag,
 	)
 	if err != nil {
 		return err
@@ -95,13 +96,13 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
 	rowsProcessed := 0
 	for rows.Next() {
-		var n uint64
-		if err := rows.Scan(&n); err != nil {
+		var blockNumber uint64
+		if err := rows.Scan(&blockNumber); err != nil {
 			return err
 		}
 
 		j.pool.Submit(func() {
-			if err := j.pipeline.Run(ctx, n); err != nil {
+			if err := j.pipeline.Run(ctx, blockNumber); err != nil {
 				j.logg.Error("janitor: pipeline run error", "error", err)
 			}
 		})
 
@@ -110,7 +111,7 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
 	j.logg.Debug("janitor: missing blocks count", "count", rowsProcessed)
 
 	if rowsProcessed == 0 {
-		j.logg.Debug("janitor: rasing lower bound")
+		j.logg.Info("janitor: no gap! rasing lower bound", "new_lower_bound", upperBound)
 		j.stats.UpdateLowerBound(upperBound)
 		j.store.SetSearchLowerBound(ctx, upperBound)
 	}
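
To make the first bullet of the commit message concrete, here is a condensed, non-verbatim sketch of the sweep flow after this change. GetMissingBlocks is a placeholder name (the diff only shows that call's arguments), and error handling is abbreviated:

// Condensed sketch, not verbatim. BatchSize is now wired to
// syncer.janitor_queue_size, so a single sweep fills the queue at most once;
// the next sweep only reads the gap from the DB after that batch has drained.
func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
	if j.pool.WaitingTasks() != 0 {
		// Previous batch still queued: skip this sweep instead of piling on.
		return nil
	}

	// headBlockLag (package-level const, 5) keeps the janitor clear of blocks
	// the head syncer is still expected to handle.
	rows, err := j.store.GetMissingBlocks(ctx, j.batchSize, j.stats.GetHeadCursor(), headBlockLag)
	if err != nil {
		return err
	}

	for rows.Next() {
		var blockNumber uint64
		if err := rows.Scan(&blockNumber); err != nil {
			return err
		}
		j.pool.Submit(func() {
			if err := j.pipeline.Run(ctx, blockNumber); err != nil {
				j.logg.Error("janitor: pipeline run error", "error", err)
			}
		})
	}
	return nil
}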

View File

@@ -26,7 +26,7 @@ type Graphql struct {
 func NewGraphqlFetcher(o GraphqlOpts) Fetch {
 	return &Graphql{
 		httpClient: &http.Client{
-			Timeout: time.Second * 2,
+			Timeout: time.Second * 5,
 		},
 		graphqlEndpoint: o.GraphqlEndpoint,
 	}

View File

@@ -2,6 +2,7 @@ package filter
 
 import (
 	"context"
+	"fmt"
 	"math/big"
 
 	"github.com/celo-org/celo-blockchain/common"
@@ -46,7 +47,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
 			return false, err
 		}
 
-		_, err := f.js.Publish("CHAIN.transfer", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
+		_, err := f.js.Publish("CHAIN.transfer", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
 		if err != nil {
 			return false, err
 		}
@@ -63,7 +64,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
 			return false, err
 		}
 
-		_, err := f.js.Publish("CHAIN.transferFrom", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
+		_, err := f.js.Publish("CHAIN.transferFrom", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
 		if err != nil {
 			return false, err
 		}
@@ -79,7 +80,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
 			return false, err
 		}
 
-		_, err := f.js.Publish("CHAIN.mintTo", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
+		_, err := f.js.Publish("CHAIN.mintTo", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
 		if err != nil {
 			return false, err
 		}
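
The payload published above is now block:txIndex:hash rather than the bare transaction hash. A hypothetical consumer-side sketch (not part of this commit; the package and function names are invented) that splits it back apart:

package consumer

import (
	"fmt"
	"strconv"
	"strings"
)

// parseChainEvent splits the "%d:%d:%s" payload emitted by the filters above
// into its block number, transaction index and transaction hash.
func parseChainEvent(data []byte) (block uint64, txIndex uint64, hash string, err error) {
	parts := strings.SplitN(string(data), ":", 3)
	if len(parts) != 3 {
		return 0, 0, "", fmt.Errorf("malformed payload: %q", data)
	}
	if block, err = strconv.ParseUint(parts[0], 10, 64); err != nil {
		return 0, 0, "", err
	}
	if txIndex, err = strconv.ParseUint(parts[1], 10, 64); err != nil {
		return 0, 0, "", err
	}
	return block, txIndex, parts[2], nil
}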