feat: add cache bootstrapper

This commit is contained in:
Mohamed Sohail 2024-04-18 12:56:05 +08:00
parent 0aa1db902e
commit b3b4503565
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
11 changed files with 87 additions and 26 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/celo-org/celo-blockchain/core/types"
"github.com/ef-ds/deque/v2"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/internal/chain"
"github.com/grassrootseconomics/celo-tracker/internal/db"
"github.com/grassrootseconomics/celo-tracker/internal/processor"
@ -98,7 +99,7 @@ func main() {
Logg: lo,
Stats: stats,
DB: db,
InitialLowerBound: uint64(ko.MustInt64("chain.start_block")),
InitialLowerBound: uint64(ko.MustInt64("bootstrap.start_block")),
})
if err != nil {
lo.Error("could not initialize chain syncer", "error", err)
@ -109,12 +110,23 @@ func main() {
// os.Exit(1)
// }
cache, err := cache.New(cache.CacheOpts{
Logg: lo,
Chain: chain,
Registries: ko.MustStrings("bootstrap.registries"),
})
if err != nil {
lo.Error("could not initialize cache", "error", err)
os.Exit(1)
}
blockProcessor := processor.NewProcessor(processor.ProcessorOpts{
Chain: chain,
BlocksQueue: &blocksQueue,
Logg: lo,
Stats: stats,
DB: db,
Cache: cache,
})
// wg.Add(1)

View File

@ -1,16 +1,18 @@
[metrics]
# Exposes Prometheus metrics
go_process = true
# API server
[service]
# Host and port
[api]
address = ":5001"
[chain]
start_block = 24905000
graphql_endpoint = ""
ws_endpoint = "wss://ws.celo.grassecon.net"
rpc_endpoint = "https://rpc.ankr.com/celo/bae2b7745f52c50974d7ecb1a7c23dc05d9ab5b68caf498a7c73f09a3e8bc04a"
testnet = false
registry_address = ""
[bootstrap]
start_block = 24905000
# https://software.grassecon.org/addresses
registries = [
"0xd1FB944748aca327a1ba036B082993D9dd9Bfa0C",
"0x0cc9f4fff962def35bb34a53691180b13e653030",
]

2
go.mod
View File

@ -9,7 +9,7 @@ require (
github.com/celo-org/celo-blockchain v1.8.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/ef-ds/deque/v2 v2.0.2
github.com/grassrootseconomics/celoutils/v2 v2.4.2
github.com/grassrootseconomics/celoutils/v2 v2.6.0
github.com/grassrootseconomics/w3-celo v0.16.0
github.com/kamikazechaser/common v0.2.0
github.com/knadh/koanf/parsers/toml v0.1.0

2
go.sum
View File

@ -263,6 +263,8 @@ github.com/grassrootseconomics/celoutils/v2 v2.4.1 h1:8S4+TfXVevxu3+tBIyzGM4z6iT
github.com/grassrootseconomics/celoutils/v2 v2.4.1/go.mod h1:DB9sh7lY9zw0/cyCu8uYunAe+IDM8/104l+KEhkJnqg=
github.com/grassrootseconomics/celoutils/v2 v2.4.2 h1:EAXLMLJhv9ukAlM2me8A+jHInxXeSSOmEWKY9zHPONQ=
github.com/grassrootseconomics/celoutils/v2 v2.4.2/go.mod h1:DB9sh7lY9zw0/cyCu8uYunAe+IDM8/104l+KEhkJnqg=
github.com/grassrootseconomics/celoutils/v2 v2.6.0 h1:Ccr4KQ7lvsO73xaOmu+OW1jn1p9l0/lGm5vEw81HXGQ=
github.com/grassrootseconomics/celoutils/v2 v2.6.0/go.mod h1:DB9sh7lY9zw0/cyCu8uYunAe+IDM8/104l+KEhkJnqg=
github.com/grassrootseconomics/w3-celo v0.16.0 h1:AKPd+LGqR4YgkLw44V4Jgq/+prhJfTnaWzFOdS8JRgg=
github.com/grassrootseconomics/w3-celo v0.16.0/go.mod h1:SVduFQshhMPMIRFKix6JwOZmv5a/e0NTObVeB4lXrH4=
github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE=

View File

@ -1,9 +1,13 @@
package cache
import (
"context"
"log/slog"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/internal/chain"
"github.com/grassrootseconomics/celoutils/v2"
"github.com/grassrootseconomics/w3-celo"
)
type (
@ -15,13 +19,14 @@ type (
}
CacheOpts struct {
Logg *slog.Logger
Chain *chain.Chain
CacheType string
Logg *slog.Logger
Chain *chain.Chain
CacheType string
Registries []string
}
)
func New(o CacheOpts) Cache {
func New(o CacheOpts) (Cache, error) {
var (
cache Cache
)
@ -32,7 +37,42 @@ func New(o CacheOpts) Cache {
default:
cache = NewMapCache()
}
o.Logg.Debug("bootstrapping cache")
return cache
ctx := context.Background()
for _, registry := range o.Registries {
registryMap, err := o.Chain.Provider.RegistryMap(ctx, w3.A(registry))
if err != nil {
return nil, err
}
for _, v := range registryMap {
cache.Add(v.Hex())
}
if registryMap[celoutils.TokenIndex] != common.ZeroAddress {
tokens, err := o.Chain.GetAllTokensFromTokenIndex(ctx, registryMap[celoutils.TokenIndex])
if err != nil {
return nil, err
}
for _, token := range tokens {
cache.Add(token.Hex())
}
}
if registryMap[celoutils.PoolIndex] != common.ZeroAddress {
pools, err := o.Chain.GetAllTokensFromTokenIndex(ctx, registryMap[celoutils.PoolIndex])
if err != nil {
return nil, err
}
for _, pool := range pools {
cache.Add(pool.Hex())
}
}
}
o.Logg.Debug("cache bootstrap complete", "cached_addresses", cache.Size())
return cache, nil
}

View File

@ -19,7 +19,7 @@ func (c *Chain) GetBlocks(ctx context.Context, blockNumbers []uint64) ([]types.B
calls[i] = eth.BlockByNumber(new(big.Int).SetUint64(v)).Returns(&blocks[i])
}
if err := c.provider.Client.CallCtx(ctx, calls...); err != nil {
if err := c.Provider.Client.CallCtx(ctx, calls...); err != nil {
return nil, err
}
@ -33,7 +33,7 @@ func (c *Chain) GetBlock(ctx context.Context, blockNumber uint64) (types.Block,
blockCall := eth.BlockByNumber(new(big.Int).SetUint64(blockNumber)).Returns(&block)
if err := c.provider.Client.CallCtx(ctx, blockCall); err != nil {
if err := c.Provider.Client.CallCtx(ctx, blockCall); err != nil {
return block, err
}
@ -47,7 +47,7 @@ func (c *Chain) GetLatestBlock(ctx context.Context) (uint64, error) {
latestBlockCall := eth.BlockNumber().Returns(&latestBlock)
if err := c.provider.Client.CallCtx(ctx, latestBlockCall); err != nil {
if err := c.Provider.Client.CallCtx(ctx, latestBlockCall); err != nil {
return 0, err
}

View File

@ -14,7 +14,7 @@ type (
}
Chain struct {
provider *celoutils.Provider
Provider *celoutils.Provider
logg *slog.Logger
}
)
@ -35,7 +35,7 @@ func New(o ChainOpts) (*Chain, error) {
}
return &Chain{
provider: provider,
Provider: provider,
logg: o.Logg,
}, nil
}

View File

@ -8,5 +8,5 @@ import (
)
func (c *Chain) GetRevertReason(ctx context.Context, txHash common.Hash, blockNumber *big.Int) (string, error) {
return c.provider.SimulateRevertedTx(ctx, txHash, blockNumber)
return c.Provider.SimulateRevertedTx(ctx, txHash, blockNumber)
}

View File

@ -20,7 +20,7 @@ func (c *Chain) GetAllTokensFromTokenIndex(ctx context.Context, tokenIndex commo
tokenIndexEntryCount big.Int
)
if err := c.provider.Client.CallCtx(
if err := c.Provider.Client.CallCtx(
ctx,
eth.CallFunc(tokenIndex, entryCountFunc).Returns(&tokenIndexEntryCount),
); err != nil {
@ -34,7 +34,7 @@ func (c *Chain) GetAllTokensFromTokenIndex(ctx context.Context, tokenIndex commo
calls[i] = eth.CallFunc(tokenIndex, entrySig, new(big.Int).SetInt64(int64(i))).Returns(&tokenAddresses[i])
}
if err := c.provider.Client.CallCtx(ctx, calls...); err != nil {
if err := c.Provider.Client.CallCtx(ctx, calls...); err != nil {
return nil, err
}

View File

@ -18,7 +18,7 @@ func (c *Chain) GetTransactions(ctx context.Context, block types.Block) ([]types
calls[i] = eth.Tx(tx.Hash()).Returns(&transactions[i])
}
if err := c.provider.Client.CallCtx(ctx, calls...); err != nil {
if err := c.Provider.Client.CallCtx(ctx, calls...); err != nil {
return nil, err
}
@ -35,7 +35,7 @@ func (c *Chain) GetReceipts(ctx context.Context, block types.Block) ([]types.Rec
calls[i] = eth.TxReceipt(tx.Hash()).Returns(&receipts[i])
}
if err := c.provider.Client.CallCtx(ctx, calls...); err != nil {
if err := c.Provider.Client.CallCtx(ctx, calls...); err != nil {
return nil, err
}

View File

@ -8,6 +8,7 @@ import (
"github.com/alitto/pond"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/ef-ds/deque/v2"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/internal/chain"
"github.com/grassrootseconomics/celo-tracker/internal/db"
"github.com/grassrootseconomics/celo-tracker/internal/handler"
@ -22,6 +23,7 @@ type (
Logg *slog.Logger
Stats *stats.Stats
DB *db.DB
Cache cache.Cache
}
Processor struct {
@ -33,6 +35,7 @@ type (
db *db.DB
quit chan struct{}
handlers []handler.Handler
cache cache.Cache
}
)
@ -50,10 +53,12 @@ func NewProcessor(o ProcessorOpts) *Processor {
db: o.DB,
quit: make(chan struct{}),
handlers: handler.New(),
cache: o.Cache,
}
}
func (p *Processor) Start() {
p.logg.Info("processor started")
for {
select {
case <-p.quit:
@ -67,7 +72,7 @@ func (p *Processor) Start() {
if p.blocksQueue.Len() > 0 {
v, _ := p.blocksQueue.PopFront()
p.pool.Submit(func() {
p.logg.Info("processing", "block", v.Number())
p.logg.Debug("processing", "block", v.Number())
if err := p.processBlock(context.Background(), v); err != nil {
p.logg.Info("block processor error", "block", v.NumberU64(), "error", err)
}