mirror of
https://github.com/grassrootseconomics/eth-tracker.git
synced 2025-04-22 08:21:03 +02:00
update start dependency routines
This commit is contained in:
parent
ab750b7e28
commit
656381aa7c
105
cmd/main.go
105
cmd/main.go
@ -7,18 +7,19 @@ import (
|
|||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/celo-org/celo-blockchain/core/types"
|
|
||||||
"github.com/ef-ds/deque/v2"
|
|
||||||
"github.com/grassrootseconomics/celo-tracker/internal/cache"
|
"github.com/grassrootseconomics/celo-tracker/internal/cache"
|
||||||
"github.com/grassrootseconomics/celo-tracker/internal/db"
|
"github.com/grassrootseconomics/celo-tracker/internal/db"
|
||||||
|
"github.com/grassrootseconomics/celo-tracker/internal/pool"
|
||||||
"github.com/grassrootseconomics/celo-tracker/internal/processor"
|
"github.com/grassrootseconomics/celo-tracker/internal/processor"
|
||||||
"github.com/grassrootseconomics/celo-tracker/internal/pub"
|
"github.com/grassrootseconomics/celo-tracker/internal/pub"
|
||||||
"github.com/grassrootseconomics/celo-tracker/internal/stats"
|
"github.com/grassrootseconomics/celo-tracker/internal/stats"
|
||||||
"github.com/grassrootseconomics/celo-tracker/internal/syncer"
|
"github.com/grassrootseconomics/celo-tracker/internal/syncer"
|
||||||
|
"github.com/grassrootseconomics/celo-tracker/internal/verifier"
|
||||||
"github.com/grassrootseconomics/celo-tracker/pkg/chain"
|
"github.com/grassrootseconomics/celo-tracker/pkg/chain"
|
||||||
"github.com/knadh/koanf/v2"
|
"github.com/knadh/koanf/v2"
|
||||||
)
|
)
|
||||||
@ -45,21 +46,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// mux := http.NewServeMux()
|
var wg sync.WaitGroup
|
||||||
// statsviz.Register(mux)
|
|
||||||
|
|
||||||
// go func() {
|
|
||||||
// lo.Info("metrics", "host:port", http.ListenAndServe("localhost:8080", mux))
|
|
||||||
// }()
|
|
||||||
// go func() {
|
|
||||||
// lo.Info("profiler", "host:port", http.ListenAndServe("localhost:6060", nil))
|
|
||||||
// }()
|
|
||||||
|
|
||||||
var (
|
|
||||||
batchQueue deque.Deque[uint64]
|
|
||||||
blocksQueue deque.Deque[types.Block]
|
|
||||||
wg sync.WaitGroup
|
|
||||||
)
|
|
||||||
ctx, stop := notifyShutdown()
|
ctx, stop := notifyShutdown()
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -68,16 +55,20 @@ func main() {
|
|||||||
- Stats
|
- Stats
|
||||||
- Chain
|
- Chain
|
||||||
- DB
|
- DB
|
||||||
- HistoricalSyncer
|
- Cache
|
||||||
- RealtimeSyncer
|
- JetStream Pub
|
||||||
|
- Worker Pool
|
||||||
- Block Processor
|
- Block Processor
|
||||||
|
- Chain Syncer
|
||||||
|
- Verifier
|
||||||
|
|
||||||
*/
|
*/
|
||||||
stats := stats.New(lo)
|
stats := stats.New(lo)
|
||||||
|
|
||||||
chain, err := chain.New(chain.ChainOpts{
|
chain, err := chain.New(chain.ChainOpts{
|
||||||
|
Logg: lo,
|
||||||
RPCEndpoint: ko.MustString("chain.rpc_endpoint"),
|
RPCEndpoint: ko.MustString("chain.rpc_endpoint"),
|
||||||
TestNet: ko.Bool("chain.testnet"),
|
TestNet: ko.Bool("chain.testnet"),
|
||||||
Logg: lo,
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lo.Error("could not initialize chain client", "error", err)
|
lo.Error("could not initialize chain client", "error", err)
|
||||||
@ -92,23 +83,6 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
chainSyncer, err := syncer.New(syncer.SyncerOpts{
|
|
||||||
WebSocketEndpoint: ko.MustString("chain.ws_endpoint"),
|
|
||||||
EnableHistorical: ko.Bool("chain.historical"),
|
|
||||||
StartBlock: uint64(ko.MustInt64("chain.start_block")),
|
|
||||||
BatchSize: ko.MustInt("chain.batch_size"),
|
|
||||||
BatchQueue: &batchQueue,
|
|
||||||
BlocksQueue: &blocksQueue,
|
|
||||||
Chain: chain,
|
|
||||||
Logg: lo,
|
|
||||||
Stats: stats,
|
|
||||||
DB: db,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
lo.Error("could not initialize chain syncer", "error", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
cache, err := cache.New(cache.CacheOpts{
|
cache, err := cache.New(cache.CacheOpts{
|
||||||
Logg: lo,
|
Logg: lo,
|
||||||
Chain: chain,
|
Chain: chain,
|
||||||
@ -132,39 +106,52 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
blockProcessor := processor.NewProcessor(processor.ProcessorOpts{
|
workerPool := pool.NewPool(pool.PoolOpts{
|
||||||
Chain: chain,
|
PoolSize: runtime.NumCPU(),
|
||||||
BlocksQueue: &blocksQueue,
|
|
||||||
Logg: lo,
|
|
||||||
Stats: stats,
|
|
||||||
DB: db,
|
|
||||||
Cache: cache,
|
|
||||||
Pub: jetStreamPub,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
if ko.Bool("chain.historical") {
|
blockProcessor := processor.NewProcessor(processor.ProcessorOpts{
|
||||||
wg.Add(1)
|
Cache: cache,
|
||||||
go func() {
|
Chain: chain,
|
||||||
defer wg.Done()
|
DB: db,
|
||||||
if err := chainSyncer.BootstrapHistoricalSyncer(); err != nil {
|
Logg: lo,
|
||||||
lo.Error("could not bootstrap historical syncer", "error", err)
|
Pub: jetStreamPub,
|
||||||
|
Stats: stats,
|
||||||
|
})
|
||||||
|
|
||||||
|
chainSyncer, err := syncer.New(syncer.SyncerOpts{
|
||||||
|
BlockWorker: workerPool,
|
||||||
|
BlockProcessor: blockProcessor,
|
||||||
|
Chain: chain,
|
||||||
|
DB: db,
|
||||||
|
Logg: lo,
|
||||||
|
Stats: stats,
|
||||||
|
WebSocketEndpoint: ko.MustString("chain.ws_endpoint"),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
lo.Error("could not initialize chain syncer", "error", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
chainSyncer.StartHistoricalSyncer()
|
verifier := verifier.New(verifier.VerifierOpts{
|
||||||
}()
|
BlockWorker: workerPool,
|
||||||
}
|
BlockProcessor: blockProcessor,
|
||||||
|
Chain: chain,
|
||||||
|
DB: db,
|
||||||
|
Logg: lo,
|
||||||
|
Stats: stats,
|
||||||
|
})
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
chainSyncer.StartRealtime()
|
chainSyncer.Start()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
blockProcessor.Start()
|
verifier.Start()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
@ -174,10 +161,10 @@ func main() {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
blockProcessor.Stop()
|
workerPool.StopWait()
|
||||||
|
chainSyncer.Stop()
|
||||||
|
verifier.Stop()
|
||||||
jetStreamPub.Close()
|
jetStreamPub.Close()
|
||||||
chainSyncer.StopHistoricalSyncer()
|
|
||||||
chainSyncer.StopRealtime()
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -1,11 +1,15 @@
|
|||||||
package pool
|
package pool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"github.com/gammazero/workerpool"
|
"github.com/gammazero/workerpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewPool() *workerpool.WorkerPool {
|
type (
|
||||||
return workerpool.New(runtime.NumCPU())
|
PoolOpts struct {
|
||||||
|
PoolSize int
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewPool(o PoolOpts) *workerpool.WorkerPool {
|
||||||
|
return workerpool.New(o.PoolSize)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user