From 656381aa7c6fed3806e56348c1e5152dc5705404 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Mon, 6 May 2024 15:52:00 +0800 Subject: [PATCH] update start dependency routines --- cmd/main.go | 109 +++++++++++++++++++----------------------- internal/pool/pool.go | 12 +++-- 2 files changed, 56 insertions(+), 65 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 8d43bf4..f32164c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,18 +7,19 @@ import ( "log/slog" "os" "os/signal" + "runtime" "sync" "syscall" "time" - "github.com/celo-org/celo-blockchain/core/types" - "github.com/ef-ds/deque/v2" "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/pool" "github.com/grassrootseconomics/celo-tracker/internal/processor" "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/celo-tracker/internal/stats" "github.com/grassrootseconomics/celo-tracker/internal/syncer" + "github.com/grassrootseconomics/celo-tracker/internal/verifier" "github.com/grassrootseconomics/celo-tracker/pkg/chain" "github.com/knadh/koanf/v2" ) @@ -45,21 +46,7 @@ func init() { } func main() { - // mux := http.NewServeMux() - // statsviz.Register(mux) - - // go func() { - // lo.Info("metrics", "host:port", http.ListenAndServe("localhost:8080", mux)) - // }() - // go func() { - // lo.Info("profiler", "host:port", http.ListenAndServe("localhost:6060", nil)) - // }() - - var ( - batchQueue deque.Deque[uint64] - blocksQueue deque.Deque[types.Block] - wg sync.WaitGroup - ) + var wg sync.WaitGroup ctx, stop := notifyShutdown() /* @@ -68,16 +55,20 @@ func main() { - Stats - Chain - DB - - HistoricalSyncer - - RealtimeSyncer - - BlockProcessor + - Cache + - JetStream Pub + - Worker Pool + - Block Processor + - Chain Syncer + - Verifier + */ stats := stats.New(lo) chain, err := chain.New(chain.ChainOpts{ + Logg: lo, RPCEndpoint: ko.MustString("chain.rpc_endpoint"), TestNet: ko.Bool("chain.testnet"), - Logg: lo, }) if err != nil { lo.Error("could not initialize chain client", "error", err) @@ -92,23 +83,6 @@ func main() { os.Exit(1) } - chainSyncer, err := syncer.New(syncer.SyncerOpts{ - WebSocketEndpoint: ko.MustString("chain.ws_endpoint"), - EnableHistorical: ko.Bool("chain.historical"), - StartBlock: uint64(ko.MustInt64("chain.start_block")), - BatchSize: ko.MustInt("chain.batch_size"), - BatchQueue: &batchQueue, - BlocksQueue: &blocksQueue, - Chain: chain, - Logg: lo, - Stats: stats, - DB: db, - }) - if err != nil { - lo.Error("could not initialize chain syncer", "error", err) - os.Exit(1) - } - cache, err := cache.New(cache.CacheOpts{ Logg: lo, Chain: chain, @@ -132,39 +106,52 @@ func main() { os.Exit(1) } - blockProcessor := processor.NewProcessor(processor.ProcessorOpts{ - Chain: chain, - BlocksQueue: &blocksQueue, - Logg: lo, - Stats: stats, - DB: db, - Cache: cache, - Pub: jetStreamPub, + workerPool := pool.NewPool(pool.PoolOpts{ + PoolSize: runtime.NumCPU(), }) - if ko.Bool("chain.historical") { - wg.Add(1) - go func() { - defer wg.Done() - if err := chainSyncer.BootstrapHistoricalSyncer(); err != nil { - lo.Error("could not bootstrap historical syncer", "error", err) - os.Exit(1) - } + blockProcessor := processor.NewProcessor(processor.ProcessorOpts{ + Cache: cache, + Chain: chain, + DB: db, + Logg: lo, + Pub: jetStreamPub, + Stats: stats, + }) - chainSyncer.StartHistoricalSyncer() - }() + chainSyncer, err := syncer.New(syncer.SyncerOpts{ + BlockWorker: workerPool, + BlockProcessor: blockProcessor, + Chain: chain, + DB: db, + Logg: lo, + Stats: stats, + WebSocketEndpoint: ko.MustString("chain.ws_endpoint"), + }) + if err != nil { + lo.Error("could not initialize chain syncer", "error", err) + os.Exit(1) } + verifier := verifier.New(verifier.VerifierOpts{ + BlockWorker: workerPool, + BlockProcessor: blockProcessor, + Chain: chain, + DB: db, + Logg: lo, + Stats: stats, + }) + wg.Add(1) go func() { defer wg.Done() - chainSyncer.StartRealtime() + chainSyncer.Start() }() wg.Add(1) go func() { defer wg.Done() - blockProcessor.Start() + verifier.Start() }() <-ctx.Done() @@ -174,10 +161,10 @@ func main() { wg.Add(1) go func() { defer wg.Done() - blockProcessor.Stop() + workerPool.StopWait() + chainSyncer.Stop() + verifier.Stop() jetStreamPub.Close() - chainSyncer.StopHistoricalSyncer() - chainSyncer.StopRealtime() }() go func() { diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 3177f48..c5c4b0f 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -1,11 +1,15 @@ package pool import ( - "runtime" - "github.com/gammazero/workerpool" ) -func NewPool() *workerpool.WorkerPool { - return workerpool.New(runtime.NumCPU()) +type ( + PoolOpts struct { + PoolSize int + } +) + +func NewPool(o PoolOpts) *workerpool.WorkerPool { + return workerpool.New(o.PoolSize) }