feat: add retryable head subscription wrapper

* closes #18
This commit is contained in:
Mohamed Sohail 2023-04-14 08:27:08 +00:00
parent 8085168ed3
commit 4fcc088784
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
2 changed files with 12 additions and 9 deletions

View File

@ -5,7 +5,7 @@ go_process = true
# API server # API server
[service] [service]
# Host and port # Host and port
address = ":5000" address = ":5001"
# Geth API endpoints # Geth API endpoints
[chain] [chain]

View File

@ -7,12 +7,14 @@ import (
"github.com/alitto/pond" "github.com/alitto/pond"
"github.com/celo-org/celo-blockchain/core/types" "github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/ethclient" "github.com/celo-org/celo-blockchain/ethclient"
"github.com/celo-org/celo-blockchain/event"
"github.com/grassrootseconomics/cic-chain-events/internal/pipeline" "github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
const ( const (
jobTimeout = 5 * time.Second jobTimeout = 5 * time.Second
resubscribeBackoff = 2 * time.Second
) )
type ( type (
@ -53,10 +55,13 @@ func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) {
func (hs *HeadSyncer) Start(ctx context.Context) error { func (hs *HeadSyncer) Start(ctx context.Context) error {
headerReceiver := make(chan *types.Header, 1) headerReceiver := make(chan *types.Header, 1)
sub, err := hs.ethClient.SubscribeNewHead(context.Background(), headerReceiver) sub := event.ResubscribeErr(resubscribeBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil { if err != nil {
return err hs.logg.Error("head syncer: resubscribe error", "error", err)
} }
return hs.ethClient.SubscribeNewHead(ctx, headerReceiver)
})
defer sub.Unsubscribe() defer sub.Unsubscribe()
for { for {
@ -64,8 +69,6 @@ func (hs *HeadSyncer) Start(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
hs.logg.Info("head syncer: shutdown signal received") hs.logg.Info("head syncer: shutdown signal received")
return nil return nil
case err := <-sub.Err():
return err
case header := <-headerReceiver: case header := <-headerReceiver:
blockNumber := header.Number.Uint64() blockNumber := header.Number.Uint64()
hs.logg.Debug("head syncer: received new block", "block", blockNumber) hs.logg.Debug("head syncer: received new block", "block", blockNumber)
@ -75,7 +78,7 @@ func (hs *HeadSyncer) Start(ctx context.Context) error {
defer cancel() defer cancel()
if err := hs.pipeline.Run(ctx, blockNumber); err != nil { if err := hs.pipeline.Run(ctx, blockNumber); err != nil {
hs.logg.Error("head syncer: piepline run error", "error", err) hs.logg.Error("head syncer: pipeline run error", "error", err)
} }
}) })
} }