feat: refactor historical syncer

This commit is contained in:
Mohamed Sohail 2024-04-19 16:46:23 +08:00
parent f981007e71
commit 9782b3c9c5
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
9 changed files with 80 additions and 58 deletions

View File

@ -94,13 +94,14 @@ func main() {
chainSyncer, err := syncer.New(syncer.SyncerOpts{
WebSocketEndpoint: ko.MustString("chain.ws_endpoint"),
EnableHistorical: ko.Bool("chain.historical"),
StartBlock: uint64(ko.MustInt64("bootstrap.start_block")),
BatchQueue: &batchQueue,
BlocksQueue: &blocksQueue,
Chain: chain,
Logg: lo,
Stats: stats,
DB: db,
InitialLowerBound: uint64(ko.MustInt64("bootstrap.start_block")),
})
if err != nil {
lo.Error("could not initialize chain syncer", "error", err)
@ -114,7 +115,9 @@ func main() {
cache, err := cache.New(cache.CacheOpts{
Logg: lo,
Chain: chain,
Registries: ko.MustStrings("bootstrap.registries"),
Registries: ko.MustStrings("bootstrap.ge_registries"),
Blacklist: ko.MustStrings("bootstrap.blacklist"),
Watchlist: ko.MustStrings("bootstrap.watchlist"),
})
if err != nil {
lo.Error("could not initialize cache", "error", err)

View File

@ -6,14 +6,17 @@ address = ":5001"
[chain]
ws_endpoint = "wss://ws.celo.grassecon.net"
rpc_endpoint = "https://rpc.ankr.com/celo/bae2b7745f52c50974d7ecb1a7c23dc05d9ab5b68caf498a7c73f09a3e8bc04a"
rpc_endpoint = "https://1rpc.io/celo"
testnet = false
realtime = true
historical = false
start_block = 24905000
[bootstrap]
start_block = 24905000
# https://software.grassecon.org/addresses
registries = [
ge_registries = [
"0xd1FB944748aca327a1ba036B082993D9dd9Bfa0C",
"0x0cc9f4fff962def35bb34a53691180b13e653030",
]
watchlist = [""]
blacklist = [""]

View File

@ -10,7 +10,7 @@ import (
"github.com/grassrootseconomics/w3-celo/module/eth"
)
func bootstrapAllGESmartContracts(ctx context.Context, registries []string, chain *chain.Chain, cache Cache) (WatchableIndex, error) {
func bootstrapGESmartContracts(ctx context.Context, registries []string, chain *chain.Chain, cache Cache) (WatchableIndex, error) {
var (
watchableIndex = make(WatchableIndex)
)

View File

@ -25,6 +25,7 @@ type (
CacheType string
Registries []string
Blacklist []string
Watchlist []string
}
WatchableIndex map[string]bool
@ -47,7 +48,7 @@ func New(o CacheOpts) (Cache, error) {
cache = NewMapCache()
}
watchableIndex, err := bootstrapAllGESmartContracts(
watchableIndex, err := bootstrapGESmartContracts(
context.Background(),
o.Registries,
o.Chain,
@ -56,12 +57,16 @@ func New(o CacheOpts) (Cache, error) {
if err != nil {
return nil, err
}
// We only watch the token and pool indexes
// If at some point we want to eatch the user index, this line should be removed
cache.SetWatchableIndex(watchableIndex)
for _, address := range o.Watchlist {
cache.Add(address)
}
for _, address := range o.Blacklist {
cache.Remove(address)
}
o.Logg.Debug("cache bootstrap complete", "cached_addresses", cache.Size())
return cache, nil

View File

@ -13,13 +13,11 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error {
blockNumber := block.NumberU64()
txs, err := p.chain.GetTransactions(ctx, block)
p.logg.Debug("successfully fetched transactions", "txs", len(txs))
if err != nil {
return err
}
receiptsResp, err := p.chain.GetReceipts(ctx, block)
p.logg.Debug("successfully fetched receipts", "receipts", len(txs))
if err != nil {
return err
}
@ -42,7 +40,7 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error {
if p.cache.Exists(txs[i].To().Hex()) {
from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i])
if err != nil {
p.logg.Error("hanlder error", "handler_type", "revert", "error", err)
p.logg.Error("handler error", "handler_type", "revert", "error", err)
}
revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber)

View File

@ -83,7 +83,7 @@ func (p *Processor) Start() {
})
} else {
time.Sleep(emptyQueueIdleTime)
p.logg.Debug("queue empty slept for 1 second")
p.logg.Debug("processor queue empty slept for 1 second")
}
}
}

View File

@ -3,41 +3,31 @@ package syncer
import (
"context"
"fmt"
"github.com/dgraph-io/badger/v4"
"time"
)
const (
blockBatchSize = 100
blockBatchSize = 100
emptyQueueIdelTime = 2 * time.Second
)
func (s *Syncer) BootstrapHistoricalSyncer() error {
v, err := s.db.GetLowerBound()
if err != nil {
if err == badger.ErrKeyNotFound {
if err := s.db.SetLowerBound(s.initialLowerBound); err != nil {
return err
}
v = s.initialLowerBound
} else {
return err
}
}
latestBlock, err := s.chain.GetLatestBlock(context.Background())
lower, err := s.db.GetLowerBound()
if err != nil {
return err
}
if err := s.db.SetUpperBound(latestBlock); err != nil {
upper, err := s.db.GetUpperBound()
if err != nil {
return err
}
missingBlocks, err := s.db.GetMissingValuesBitSet(v, latestBlock)
missingBlocks, err := s.db.GetMissingValuesBitSet(lower, upper)
if err != nil {
return err
}
missingBlocksCount := missingBlocks.Count()
s.logg.Info("bootstrapping historical syncer", "missing_blocks", missingBlocksCount, "lower_bound", v, "upper_bound", latestBlock)
s.logg.Info("bootstrapping historical syncer", "missing_blocks", missingBlocksCount, "lower_bound", lower, "upper_bound", upper)
buffer := make([]uint, missingBlocksCount)
missingBlocks.NextSetMany(0, buffer)
@ -48,12 +38,12 @@ func (s *Syncer) BootstrapHistoricalSyncer() error {
return nil
}
func (s *Syncer) StartHistoricalSyncer(ctx context.Context) error {
func (s *Syncer) StartHistoricalSyncer() error {
s.logg.Info("starting historical syncer", "batch_size", blockBatchSize)
for {
select {
case <-ctx.Done():
s.logg.Info("historical syncer shutting down")
case <-s.quit:
s.logg.Info("historical syncer stopped")
return nil
default:
if s.batchQueue.Len() > 0 {
@ -78,7 +68,15 @@ func (s *Syncer) StartHistoricalSyncer(ctx context.Context) error {
for _, v := range blocks {
s.blocksQueue.PushBack(v)
}
} else {
time.Sleep(emptyQueueIdelTime)
s.logg.Debug("historical batcher queue empty slept for 2 seconds")
}
}
}
}
func (s *Syncer) StopHistoricalSyncer() {
s.logg.Info("signaling historical syncer shutdown")
s.quit <- struct{}{}
}

View File

@ -16,7 +16,7 @@ type (
)
const (
resubscribeInterval = 15 * time.Second
resubscribeInterval = 5 * time.Second
)
func (s *Syncer) StartRealtime() {
@ -55,7 +55,6 @@ func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (ce
for {
select {
case header := <-newHeadersReceiver:
s.logg.Debug("received block", "block", header.Number.Uint64())
if err := fn(eventsCtx, header.Number.Uint64()); err != nil {
if !errors.Is(err, context.Canceled) {
s.logg.Error("realtime block queuer error", "error", err)
@ -79,7 +78,6 @@ func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) err
}
}
s.blocksQueue.PushFront(block)
s.logg.Debug("queued block", "block", blockNumber)
return nil
}

View File

@ -1,7 +1,7 @@
package syncer
import (
"errors"
"context"
"log/slog"
"github.com/celo-org/celo-blockchain"
@ -16,32 +16,48 @@ import (
type (
SyncerOpts struct {
WebSocketEndpoint string
EnableHistorical bool
StartBlock uint64
BatchQueue *deque.Deque[uint64]
BlocksQueue *deque.Deque[types.Block]
Chain *chain.Chain
Logg *slog.Logger
Stats *stats.Stats
DB *db.DB
InitialLowerBound uint64
}
Syncer struct {
batchQueue *deque.Deque[uint64]
blocksQueue *deque.Deque[types.Block]
chain *chain.Chain
logg *slog.Logger
stats *stats.Stats
ethClient *ethclient.Client
db *db.DB
initialLowerBound uint64
//
batchQueue *deque.Deque[uint64]
blocksQueue *deque.Deque[types.Block]
chain *chain.Chain
logg *slog.Logger
stats *stats.Stats
ethClient *ethclient.Client
db *db.DB
quit chan struct{}
startBlock uint64
realtimeSub celo.Subscription
}
)
func New(o SyncerOpts) (*Syncer, error) {
if o.InitialLowerBound == 0 {
return nil, errors.New("initial lower bound not set")
if o.EnableHistorical {
latestBlock, err := o.Chain.GetLatestBlock(context.Background())
if err != nil {
return nil, err
}
if o.StartBlock == 0 {
o.StartBlock = latestBlock
}
if err := o.DB.SetLowerBound(o.StartBlock); err != nil {
return nil, err
}
if err := o.DB.SetUpperBound(latestBlock); err != nil {
return nil, err
}
}
ethClient, err := ethclient.Dial(o.WebSocketEndpoint)
@ -50,13 +66,14 @@ func New(o SyncerOpts) (*Syncer, error) {
}
return &Syncer{
batchQueue: o.BatchQueue,
blocksQueue: o.BlocksQueue,
chain: o.Chain,
logg: o.Logg,
stats: o.Stats,
ethClient: ethClient,
db: o.DB,
initialLowerBound: o.InitialLowerBound,
batchQueue: o.BatchQueue,
blocksQueue: o.BlocksQueue,
chain: o.Chain,
logg: o.Logg,
stats: o.Stats,
ethClient: ethClient,
db: o.DB,
quit: make(chan struct{}),
startBlock: o.StartBlock,
}, nil
}