diff --git a/config.toml b/config.toml index 8348d69..a6af3a5 100644 --- a/config.toml +++ b/config.toml @@ -5,7 +5,7 @@ go_process = true # API server [service] # Host and port -address = ":5000" +address = ":5001" # Geth API endpoints [chain] diff --git a/internal/syncer/head.go b/internal/syncer/head.go index 818ca24..6209311 100644 --- a/internal/syncer/head.go +++ b/internal/syncer/head.go @@ -7,12 +7,14 @@ import ( "github.com/alitto/pond" "github.com/celo-org/celo-blockchain/core/types" "github.com/celo-org/celo-blockchain/ethclient" + "github.com/celo-org/celo-blockchain/event" "github.com/grassrootseconomics/cic-chain-events/internal/pipeline" "github.com/zerodha/logf" ) const ( - jobTimeout = 5 * time.Second + jobTimeout = 5 * time.Second + resubscribeBackoff = 2 * time.Second ) type ( @@ -53,10 +55,13 @@ func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) { func (hs *HeadSyncer) Start(ctx context.Context) error { headerReceiver := make(chan *types.Header, 1) - sub, err := hs.ethClient.SubscribeNewHead(context.Background(), headerReceiver) - if err != nil { - return err - } + sub := event.ResubscribeErr(resubscribeBackoff, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + hs.logg.Error("head syncer: resubscribe error", "error", err) + } + + return hs.ethClient.SubscribeNewHead(ctx, headerReceiver) + }) defer sub.Unsubscribe() for { @@ -64,8 +69,6 @@ func (hs *HeadSyncer) Start(ctx context.Context) error { case <-ctx.Done(): hs.logg.Info("head syncer: shutdown signal received") return nil - case err := <-sub.Err(): - return err case header := <-headerReceiver: blockNumber := header.Number.Uint64() hs.logg.Debug("head syncer: received new block", "block", blockNumber) @@ -75,7 +78,7 @@ func (hs *HeadSyncer) Start(ctx context.Context) error { defer cancel() if err := hs.pipeline.Run(ctx, blockNumber); err != nil { - hs.logg.Error("head syncer: piepline run error", "error", err) + hs.logg.Error("head syncer: pipeline run error", "error", err) } }) }