diff --git a/cmd/tracker/main.go b/cmd/tracker/main.go index a1966c8..365ee6c 100644 --- a/cmd/tracker/main.go +++ b/cmd/tracker/main.go @@ -23,7 +23,7 @@ import ( "github.com/knadh/koanf/v2" ) -const defaultGracefulShutdownPeriod = time.Second * 15 +const defaultGracefulShutdownPeriod = time.Second * 20 var ( build = "dev" @@ -95,7 +95,7 @@ func main() { chainSyncer, err := syncer.New(syncer.SyncerOpts{ WebSocketEndpoint: ko.MustString("chain.ws_endpoint"), EnableHistorical: ko.Bool("chain.historical"), - StartBlock: uint64(ko.MustInt64("bootstrap.start_block")), + StartBlock: uint64(ko.MustInt64("chain.start_block")), BatchQueue: &batchQueue, BlocksQueue: &blocksQueue, Chain: chain, @@ -107,10 +107,6 @@ func main() { lo.Error("could not initialize chain syncer", "error", err) os.Exit(1) } - // if err := chainSyncer.BootstrapHistoricalSyncer(); err != nil { - // lo.Error("could not bootstrap historical syncer", "error", err) - // os.Exit(1) - // } cache, err := cache.New(cache.CacheOpts{ Logg: lo, @@ -138,11 +134,18 @@ func main() { Emitter: defaultEmitter, }) - // wg.Add(1) - // go func() { - // defer wg.Done() - // chainSyncer.StartHistoricalSyncer(ctx) - // }() + if ko.Bool("chain.historical") { + wg.Add(1) + go func() { + defer wg.Done() + if err := chainSyncer.BootstrapHistoricalSyncer(); err != nil { + lo.Error("could not bootstrap historical syncer", "error", err) + os.Exit(1) + } + + chainSyncer.StartHistoricalSyncer() + }() + } wg.Add(1) go func() { @@ -160,16 +163,12 @@ func main() { lo.Info("shutdown signal received") shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultGracefulShutdownPeriod) - wg.Add(1) - go func() { - defer wg.Done() - chainSyncer.StopRealtime() - }() - wg.Add(1) go func() { defer wg.Done() blockProcessor.Stop() + chainSyncer.StopHistoricalSyncer() + chainSyncer.StopRealtime() }() go func() { diff --git a/config.toml b/config.toml index 5c4b8ff..e31e656 100644 --- a/config.toml +++ b/config.toml @@ -6,11 +6,11 @@ address = ":5001" [chain] ws_endpoint = "wss://ws.celo.grassecon.net" -rpc_endpoint = "https://1rpc.io/celo" +rpc_endpoint = "https://celo.grassecon.net" testnet = false realtime = true -historical = false -start_block = 24905000 +historical = true +start_block = 25217425 [bootstrap] # https://software.grassecon.org/addresses @@ -19,4 +19,4 @@ ge_registries = [ "0x0cc9f4fff962def35bb34a53691180b13e653030", ] watchlist = [""] -blacklist = [""] +blacklist = ["0x765DE816845861e75A25fCA122bb6898B8B1282a"] diff --git a/internal/processor/block.go b/internal/processor/block.go index 4db1413..0bc43c8 100644 --- a/internal/processor/block.go +++ b/internal/processor/block.go @@ -22,6 +22,10 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { return err } + if len(receiptsResp) != len(txs) { + return fmt.Errorf("block txs receipts len mismatch %d", blockNumber) + } + for i, receipt := range receiptsResp { if receipt.Status > 0 { for _, log := range receipt.Logs { @@ -37,7 +41,7 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { } } } else { - if p.cache.Exists(txs[i].To().Hex()) { + if txs[i].To() != nil && p.cache.Exists(txs[i].To().Hex()) { from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i]) if err != nil { p.logg.Error("handler error", "handler_type", "revert", "error", err) diff --git a/internal/syncer/historical.go b/internal/syncer/historical.go index 482b91f..284e1aa 100644 --- a/internal/syncer/historical.go +++ b/internal/syncer/historical.go @@ -7,7 +7,7 @@ import ( ) const ( - blockBatchSize = 100 + blockBatchSize = 10 emptyQueueIdelTime = 2 * time.Second ) diff --git a/internal/syncer/realtime.go b/internal/syncer/realtime.go index b870c27..d6d232e 100644 --- a/internal/syncer/realtime.go +++ b/internal/syncer/realtime.go @@ -16,7 +16,7 @@ type ( ) const ( - resubscribeInterval = 5 * time.Second + resubscribeInterval = 2 * time.Second ) func (s *Syncer) StartRealtime() {