Merge branch 'master' into feat/rpc-fetcher

* decode_filter changes
This commit is contained in:
Mohamed Sohail 2023-01-19 11:39:12 +03:00
commit d5159fb4c9
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
10 changed files with 49 additions and 65 deletions

View File

@ -1,12 +1,9 @@
package main
import (
"context"
"strings"
"time"
"github.com/alitto/pond"
"github.com/grassrootseconomics/cic-chain-events/internal/pool"
"github.com/grassrootseconomics/cic-chain-events/internal/store"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/jackc/pgx/v5"
@ -74,13 +71,6 @@ func initPgStore() (store.Store[pgx.Rows], error) {
return pgStore, nil
}
// initWorkerPool returns the worker pool created by pool.NewPool, sized from
// the "syncer.concurrency" and "syncer.queue_size" config keys. Both keys are
// read with ko.MustInt.
func initWorkerPool(ctx context.Context) *pond.WorkerPool {
	poolOpts := pool.Opts{
		ConcurrencyFactor: ko.MustInt("syncer.concurrency"),
		PoolQueueSize:     ko.MustInt("syncer.queue_size"),
	}

	return pool.NewPool(ctx, poolOpts)
}
func initFetcher() fetch.Fetch {
return fetch.NewGraphqlFetcher(fetch.GraphqlOpts{
GraphqlEndpoint: ko.MustString("chain.graphql_endpoint"),

View File

@ -10,8 +10,8 @@ import (
"syscall"
"time"
"github.com/grassrootseconomics/cic-chain-events/internal/api"
"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
"github.com/grassrootseconomics/cic-chain-events/internal/pool"
"github.com/grassrootseconomics/cic-chain-events/internal/syncer"
"github.com/grassrootseconomics/cic-chain-events/internal/filter"
"github.com/knadh/goyesql/v2"
@ -41,13 +41,6 @@ func init() {
}
func main() {
// p := profiler.New(profiler.Conf{
// DirPath: "profiles",
// Quiet: true,
// NoShutdownHook: false,
// }, profiler.Cpu, profiler.Mem)
// p.Start()
syncerStats := &syncer.Stats{}
wg := &sync.WaitGroup{}
apiServer := initApiServer()
@ -55,7 +48,10 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
workerPool := initWorkerPool(ctx)
janitorWorkerPool := pool.NewPool(ctx, pool.Opts{
Concurrency: ko.MustInt("syncer.janitor_concurrency"),
QueueSize: ko.MustInt("syncer.janitor_queue_size"),
})
pgStore, err := initPgStore()
if err != nil {
@ -74,10 +70,15 @@ func main() {
Store: pgStore,
})
headSyncerWorker := pool.NewPool(ctx, pool.Opts{
Concurrency: 1,
QueueSize: 1,
})
headSyncer, err := syncer.NewHeadSyncer(syncer.HeadSyncerOpts{
Logg: lo,
Pipeline: pipeline,
Pool: workerPool,
Pool: headSyncerWorker,
Stats: syncerStats,
WsEndpoint: ko.MustString("chain.ws_endpoint"),
})
@ -86,18 +87,15 @@ func main() {
}
janitor := syncer.NewJanitor(syncer.JanitorOpts{
BatchSize: uint64(ko.MustInt64("syncer.batch_size")),
HeadBlockLag: uint64(ko.MustInt64("syncer.head_block_lag")),
BatchSize: uint64(ko.MustInt64("syncer.janitor_queue_size")),
Logg: lo,
Pipeline: pipeline,
Pool: workerPool,
Pool: janitorWorkerPool,
Stats: syncerStats,
Store: pgStore,
SweepInterval: time.Second * time.Duration(ko.MustInt64("syncer.sweep_interval")),
SweepInterval: time.Second * time.Duration(ko.MustInt64("syncer.janitor_sweep_interval")),
})
apiServer.GET("/stats", api.StatsHandler(syncerStats, workerPool))
wg.Add(1)
go func() {
defer wg.Done()
@ -129,12 +127,9 @@ func main() {
<-ctx.Done()
workerPool.Stop()
if err := apiServer.Shutdown(ctx); err != nil {
lo.Error("main: could not gracefully shutdown api server", "err", err)
}
wg.Wait()
// p.Stop()
}

View File

@ -10,24 +10,19 @@ address = ":8080"
# Geth API endpoints
[chain]
graphql_endpoint = "https://rpc.celo.grassecon.net/graphql"
ws_endpoint = "wss://ws.celo.grassecon.net"
ws_endpoint = "wss://socket.celo.grassecon.net"
# Syncer configs
[syncer]
# Maximum number of missing blocks pushed into the worker queue every janitor sweep
batch_size = 200
# Number of goroutines assigned to the worker pool
concurrency = 3
# Prevents reprocessing head block already in queue
head_block_lag = 5
janitor_concurrency = 5
# Max idle time after which a goroutine is returned to the pool
idle_worker_timeout = 1
idle_worker_timeout = 1
# Syncer start block
initial_lower_bound = 17204504
initial_lower_bound = 17269000
# Max blocks in worker queue awaiting processing
queue_size = 500
janitor_queue_size = 500
# Janitor sweep interval in seconds; should take into account concurrency and queue_size
sweep_interval = 10
janitor_sweep_interval = 5
[postgres]
dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events"
@ -45,4 +40,8 @@ stream_subjects = [
"CHAIN.transfer",
"CHAIN.transferFrom",
"CHAIN.mintTo"
]
]
# 77G - snapshot
# 111G - decompressed
# 112G - latest

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/grassrootseconomics/cic-chain-events
go 1.19
require (
github.com/VictoriaMetrics/metrics v1.23.0
github.com/VictoriaMetrics/metrics v1.23.1
github.com/alitto/pond v1.8.2
github.com/celo-org/celo-blockchain v1.6.1
github.com/goccy/go-json v0.10.0

4
go.sum
View File

@ -42,8 +42,8 @@ github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIO
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VictoriaMetrics/fastcache v1.6.0 h1:C/3Oi3EiBCqufydp1neRZkqcwmEiuRT9c3fqvvgKm5o=
github.com/VictoriaMetrics/fastcache v1.6.0/go.mod h1:0qHz5QP0GMX4pfmMA/zt5RgfNuXJrTP0zS7DqpHGGTw=
github.com/VictoriaMetrics/metrics v1.23.0 h1:WzfqyzCaxUZip+OBbg1+lV33WChDSu4ssYII3nxtpeA=
github.com/VictoriaMetrics/metrics v1.23.0/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/VictoriaMetrics/metrics v1.23.1 h1:/j8DzeJBxSpL2qSIdqnRFLvQQhbJyJbbEi22yMm7oL0=
github.com/VictoriaMetrics/metrics v1.23.1/go.mod h1:rAr/llLpEnAdTehiNlUxKgnjcOuROSzpw0GvjpEbvFc=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

View File

@ -2,6 +2,7 @@ package filter
import (
"context"
"fmt"
"math/big"
"github.com/celo-org/celo-blockchain/common"
@ -46,7 +47,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
return false, err
}
_, err := f.js.Publish("CHAIN.transfer", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
_, err := f.js.Publish("CHAIN.transfer", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
if err != nil {
return false, err
}
@ -63,7 +64,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
return false, err
}
_, err := f.js.Publish("CHAIN.transferFrom", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
_, err := f.js.Publish("CHAIN.transferFrom", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
if err != nil {
return false, err
}
@ -79,7 +80,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
return false, err
}
_, err := f.js.Publish("CHAIN.mintTo", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
_, err := f.js.Publish("CHAIN.mintTo", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash))
if err != nil {
return false, err
}

View File

@ -42,7 +42,6 @@ func NewPipeline(o PipelineOpts) *Pipeline {
// - Blocks are processed atomically; a failure partway through means the block is reprocessed from the start
// - Therefore, any side effect/event sink in a filter must support deduplication
func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
md.logg.Debug("pipeline: processing block", "block", blockNumber)
fetchResp, err := md.fetch.Block(ctx, blockNumber)
if err != nil {
return err
@ -63,7 +62,6 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
if err := md.store.CommitBlock(ctx, blockNumber); err != nil {
return err
}
md.logg.Debug("pipeline: committed block", "block", blockNumber)
return nil
}

View File

@ -8,16 +8,16 @@ import (
)
// Opts carries the sizing parameters that NewPool forwards to pond.New.
// NOTE(review): this view lists both the ConcurrencyFactor/PoolQueueSize pair
// and the Concurrency/QueueSize pair; the pool.Opts literals in main set only
// the latter pair — confirm which fields the final struct keeps.
type Opts struct {
// ConcurrencyFactor is passed to pond.New and to pond.MinWorkers by NewPool.
ConcurrencyFactor int
// PoolQueueSize is passed to pond.New by NewPool.
PoolQueueSize int
// Concurrency is passed to pond.New and to pond.MinWorkers by NewPool.
Concurrency int
// QueueSize is passed to pond.New by NewPool.
QueueSize int
}
// NewPool creates a fixed-size (and buffered) goroutine worker pool.
func NewPool(ctx context.Context, o Opts) *pond.WorkerPool {
return pond.New(
o.ConcurrencyFactor,
o.PoolQueueSize,
pond.MinWorkers(o.ConcurrencyFactor),
o.Concurrency,
o.QueueSize,
pond.MinWorkers(o.Concurrency),
pond.IdleTimeout(time.Second*1),
pond.Context(ctx),
)

View File

@ -11,9 +11,12 @@ import (
"github.com/zerodha/logf"
)
const (
// headBlockLag is passed, together with the batch size and the head cursor,
// as the final argument of the query call in QueueMissingBlocks.
// NOTE(review): presumably the same lag the old syncer.head_block_lag
// setting provided ("prevents reprocessing head block already in queue") —
// confirm against the store query.
headBlockLag = 5
)
type JanitorOpts struct {
BatchSize uint64
HeadBlockLag uint64
Logg logf.Logger
Pipeline *pipeline.Pipeline
Pool *pond.WorkerPool
@ -24,7 +27,6 @@ type JanitorOpts struct {
type Janitor struct {
batchSize uint64
headBlockLag uint64
pipeline *pipeline.Pipeline
logg logf.Logger
pool *pond.WorkerPool
@ -36,7 +38,6 @@ type Janitor struct {
func NewJanitor(o JanitorOpts) *Janitor {
return &Janitor{
batchSize: o.BatchSize,
headBlockLag: o.HeadBlockLag,
logg: o.Logg,
pipeline: o.Pipeline,
pool: o.Pool,
@ -73,8 +74,8 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
return nil
}
if j.pool.WaitingTasks() >= j.batchSize {
j.logg.Warn("janitor: (skipping) avoiding queue pressure")
if j.pool.WaitingTasks() != 0 {
j.logg.Debug("janitor: (skipping) queue has pending jobs", "pending_jobs", j.pool.WaitingTasks())
return nil
}
@ -82,7 +83,7 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
ctx,
j.batchSize,
j.stats.GetHeadCursor(),
j.headBlockLag,
headBlockLag,
)
if err != nil {
return err
@ -95,13 +96,13 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
rowsProcessed := 0
for rows.Next() {
var n uint64
if err := rows.Scan(&n); err != nil {
var blockNumber uint64
if err := rows.Scan(&blockNumber); err != nil {
return err
}
j.pool.Submit(func() {
if err := j.pipeline.Run(ctx, n); err != nil {
if err := j.pipeline.Run(ctx, blockNumber); err != nil {
j.logg.Error("janitor: pipeline run error", "error", err)
}
})
@ -110,7 +111,7 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
j.logg.Debug("janitor: missing blocks count", "count", rowsProcessed)
if rowsProcessed == 0 {
j.logg.Debug("janitor: rasing lower bound")
j.logg.Info("janitor: no gap! rasing lower bound", "new_lower_bound", upperBound)
j.stats.UpdateLowerBound(upperBound)
j.store.SetSearchLowerBound(ctx, upperBound)
}

View File

@ -26,7 +26,7 @@ type Graphql struct {
func NewGraphqlFetcher(o GraphqlOpts) Fetch {
return &Graphql{
httpClient: &http.Client{
Timeout: time.Second * 2,
Timeout: time.Second * 5,
},
graphqlEndpoint: o.GraphqlEndpoint,
}