diff --git a/internal/processor/processor.go b/internal/processor/processor.go index ac8aceb..9a77e0b 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -3,6 +3,7 @@ package processor import ( "context" "log/slog" + "time" "github.com/alitto/pond" "github.com/celo-org/celo-blockchain/core/types" @@ -32,6 +33,10 @@ type ( } ) +const ( + emptyQueueIdleTime = 1 * time.Second +) + func NewProcessor(o ProcessorOpts) *Processor { return &Processor{ chain: o.Chain, @@ -43,26 +48,21 @@ func NewProcessor(o ProcessorOpts) *Processor { } } -func (p *Processor) Start(ctx context.Context) { +func (p *Processor) Start() { for { - select { - case <-ctx.Done(): - p.logg.Info("block processor shutting down") - p.Stop() - return - default: - if p.blocksQueue.Len() > 0 { - v, _ := p.blocksQueue.PopFront() - p.pool.Submit(func() { - if err := p.processBlock(v); err != nil { - p.logg.Info("block processor error", "block", v.NumberU64(), "error", err) - } - }) - } + if p.blocksQueue.Len() > 0 { + v, _ := p.blocksQueue.PopFront() + p.pool.Submit(func() { + if err := p.processBlock(v); err != nil { + p.logg.Info("block processor error", "block", v.NumberU64(), "error", err) + } + }) + } else { + time.Sleep(emptyQueueIdleTime) } } - } + func (p *Processor) Stop() { p.pool.StopAndWait() } diff --git a/internal/syncer/realtime.go b/internal/syncer/realtime.go index e89cb46..39bf723 100644 --- a/internal/syncer/realtime.go +++ b/internal/syncer/realtime.go @@ -2,39 +2,110 @@ package syncer import ( "context" + "fmt" "time" + "github.com/celo-org/celo-blockchain" "github.com/celo-org/celo-blockchain/core/types" "github.com/celo-org/celo-blockchain/event" ) +type ( + BlockQueueFn func(context.Context, uint64) error +) + const ( resubscribeInterval = 5 * time.Second ) -func (s *Syncer) StartRealtimeSyncer(ctx context.Context) error { - newHeadersReceiver := make(chan *types.Header, 1) +// func (s *Syncer) StartRealtimeSyncer(ctx context.Context) error { +// newHeadersReceiver := make(chan *types.Header, 1) - sub := event.ResubscribeErr(resubscribeInterval, func(ctx context.Context, err error) (event.Subscription, error) { - if err != nil { - s.logg.Error("realtime syncer resubscribe error", "error", err) - } - return s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver) - }) - defer sub.Unsubscribe() +// sub := event.ResubscribeErr(resubscribeInterval, func(ctx context.Context, err error) (event.Subscription, error) { +// if err != nil { +// s.logg.Error("realtime syncer resubscribe error", "error", err) +// } +// return s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver) +// }) +// defer sub.Unsubscribe() - for { - select { - case <-ctx.Done(): - s.logg.Info("realtime syncer shutting down") - return nil - case header := <-newHeadersReceiver: - blockNumber := header.Number.Uint64() - block, err := s.chain.GetBlock(context.Background(), blockNumber) - if err != nil { - s.logg.Error("realtime block fetcher error", "block", blockNumber, "error", err) +// for { +// select { +// case <-ctx.Done(): +// s.logg.Info("realtime syncer shutting down") +// return nil +// case header := <-newHeadersReceiver: +// blockNumber := header.Number.Uint64() +// block, err := s.chain.GetBlock(context.Background(), blockNumber) +// if err != nil { +// s.logg.Error("realtime block fetcher error", "block", blockNumber, "error", err) +// } +// s.blocksQueue.PushFront(block) +// } +// } +// } + +func (s *Syncer) StartRealtime() { + s.realtimeSub = event.ResubscribeErr(resubscribeInterval, s.resubscribeFn()) +} + +func (s *Syncer) StopRealtime() { + s.realtimeSub.Unsubscribe() + s.realtimeSub = nil +} + +func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (celo.Subscription, error) { + newHeadersReceiver := make(chan *types.Header, 10) + sub, err := s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver) + if err != nil { + return nil, err + } + + return event.NewSubscription(func(quit <-chan struct{}) error { + eventsCtx, eventsCancel := context.WithCancel(context.Background()) + defer sub.Unsubscribe() + defer eventsCancel() + + go func() { + select { + case <-quit: + eventsCancel() + case <-eventsCtx.Done(): + return + } + }() + + for { + select { + case header := <-newHeadersReceiver: + s.logg.Debug("received block", "block", header.Number.Uint64()) + if err := fn(eventsCtx, header.Number.Uint64()); err != nil { + s.logg.Error("realtime block queuer error", "error", err) + } + case <-eventsCtx.Done(): + return nil + case err := <-sub.Err(): + return err } - s.blocksQueue.PushFront(block) } + }), nil +} + +func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) error { + block, err := s.chain.GetBlock(ctx, blockNumber) + if err != nil { + return fmt.Errorf("block %d error: %v", blockNumber, err) + } + s.blocksQueue.PushFront(block) + s.logg.Debug("queued block", "block", blockNumber) + return nil +} + +func (s *Syncer) resubscribeFn() event.ResubscribeErrFunc { + return func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + s.logg.Error("resubscribing after failed suibscription", "error", err) + } + return s.receiveRealtimeBlocks(ctx, s.queueRealtimeBlock) } } diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index c7286e8..213a391 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -4,6 +4,7 @@ import ( "errors" "log/slog" + "github.com/celo-org/celo-blockchain" "github.com/celo-org/celo-blockchain/core/types" "github.com/celo-org/celo-blockchain/ethclient" "github.com/ef-ds/deque/v2" @@ -33,6 +34,8 @@ type ( ethClient *ethclient.Client db *db.DB initialLowerBound uint64 + // + realtimeSub celo.Subscription } )