fix: tight loop on processor, refactor realtime routine

This commit is contained in:
Mohamed Sohail 2024-04-04 15:28:48 +08:00
parent 2007b34908
commit 239d706042
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
3 changed files with 110 additions and 36 deletions

View File

@ -3,6 +3,7 @@ package processor
import (
"context"
"log/slog"
"time"
"github.com/alitto/pond"
"github.com/celo-org/celo-blockchain/core/types"
@ -32,6 +33,10 @@ type (
}
)
const (
emptyQueueIdleTime = 1 * time.Second
)
func NewProcessor(o ProcessorOpts) *Processor {
return &Processor{
chain: o.Chain,
@ -43,26 +48,21 @@ func NewProcessor(o ProcessorOpts) *Processor {
}
}
func (p *Processor) Start(ctx context.Context) {
func (p *Processor) Start() {
for {
select {
case <-ctx.Done():
p.logg.Info("block processor shutting down")
p.Stop()
return
default:
if p.blocksQueue.Len() > 0 {
v, _ := p.blocksQueue.PopFront()
p.pool.Submit(func() {
if err := p.processBlock(v); err != nil {
p.logg.Info("block processor error", "block", v.NumberU64(), "error", err)
}
})
}
if p.blocksQueue.Len() > 0 {
v, _ := p.blocksQueue.PopFront()
p.pool.Submit(func() {
if err := p.processBlock(v); err != nil {
p.logg.Info("block processor error", "block", v.NumberU64(), "error", err)
}
})
} else {
time.Sleep(emptyQueueIdleTime)
}
}
}
func (p *Processor) Stop() {
p.pool.StopAndWait()
}

View File

@ -2,39 +2,110 @@ package syncer
import (
"context"
"fmt"
"time"
"github.com/celo-org/celo-blockchain"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/event"
)
type (
BlockQueueFn func(context.Context, uint64) error
)
const (
resubscribeInterval = 5 * time.Second
)
func (s *Syncer) StartRealtimeSyncer(ctx context.Context) error {
newHeadersReceiver := make(chan *types.Header, 1)
// func (s *Syncer) StartRealtimeSyncer(ctx context.Context) error {
// newHeadersReceiver := make(chan *types.Header, 1)
sub := event.ResubscribeErr(resubscribeInterval, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
s.logg.Error("realtime syncer resubscribe error", "error", err)
}
return s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver)
})
defer sub.Unsubscribe()
// sub := event.ResubscribeErr(resubscribeInterval, func(ctx context.Context, err error) (event.Subscription, error) {
// if err != nil {
// s.logg.Error("realtime syncer resubscribe error", "error", err)
// }
// return s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver)
// })
// defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
s.logg.Info("realtime syncer shutting down")
return nil
case header := <-newHeadersReceiver:
blockNumber := header.Number.Uint64()
block, err := s.chain.GetBlock(context.Background(), blockNumber)
if err != nil {
s.logg.Error("realtime block fetcher error", "block", blockNumber, "error", err)
// for {
// select {
// case <-ctx.Done():
// s.logg.Info("realtime syncer shutting down")
// return nil
// case header := <-newHeadersReceiver:
// blockNumber := header.Number.Uint64()
// block, err := s.chain.GetBlock(context.Background(), blockNumber)
// if err != nil {
// s.logg.Error("realtime block fetcher error", "block", blockNumber, "error", err)
// }
// s.blocksQueue.PushFront(block)
// }
// }
// }
func (s *Syncer) StartRealtime() {
s.realtimeSub = event.ResubscribeErr(resubscribeInterval, s.resubscribeFn())
}
func (s *Syncer) StopRealtime() {
s.realtimeSub.Unsubscribe()
s.realtimeSub = nil
}
func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (celo.Subscription, error) {
newHeadersReceiver := make(chan *types.Header, 10)
sub, err := s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver)
if err != nil {
return nil, err
}
return event.NewSubscription(func(quit <-chan struct{}) error {
eventsCtx, eventsCancel := context.WithCancel(context.Background())
defer sub.Unsubscribe()
defer eventsCancel()
go func() {
select {
case <-quit:
eventsCancel()
case <-eventsCtx.Done():
return
}
}()
for {
select {
case header := <-newHeadersReceiver:
s.logg.Debug("received block", "block", header.Number.Uint64())
if err := fn(eventsCtx, header.Number.Uint64()); err != nil {
s.logg.Error("realtime block queuer error", "error", err)
}
case <-eventsCtx.Done():
return nil
case err := <-sub.Err():
return err
}
s.blocksQueue.PushFront(block)
}
}), nil
}
func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) error {
block, err := s.chain.GetBlock(ctx, blockNumber)
if err != nil {
return fmt.Errorf("block %d error: %v", blockNumber, err)
}
s.blocksQueue.PushFront(block)
s.logg.Debug("queued block", "block", blockNumber)
return nil
}
func (s *Syncer) resubscribeFn() event.ResubscribeErrFunc {
return func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
s.logg.Error("resubscribing after failed suibscription", "error", err)
}
return s.receiveRealtimeBlocks(ctx, s.queueRealtimeBlock)
}
}

View File

@ -4,6 +4,7 @@ import (
"errors"
"log/slog"
"github.com/celo-org/celo-blockchain"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/ethclient"
"github.com/ef-ds/deque/v2"
@ -33,6 +34,8 @@ type (
ethClient *ethclient.Client
db *db.DB
initialLowerBound uint64
//
realtimeSub celo.Subscription
}
)