feat: enable historical syncer, fix ignore contract deploy txs in revert handler

This commit is contained in:
Mohamed Sohail 2024-04-22 12:00:37 +08:00
parent 9782b3c9c5
commit 03b765a3bb
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
5 changed files with 27 additions and 24 deletions

View File

@ -23,7 +23,7 @@ import (
"github.com/knadh/koanf/v2" "github.com/knadh/koanf/v2"
) )
const defaultGracefulShutdownPeriod = time.Second * 15 const defaultGracefulShutdownPeriod = time.Second * 20
var ( var (
build = "dev" build = "dev"
@ -95,7 +95,7 @@ func main() {
chainSyncer, err := syncer.New(syncer.SyncerOpts{ chainSyncer, err := syncer.New(syncer.SyncerOpts{
WebSocketEndpoint: ko.MustString("chain.ws_endpoint"), WebSocketEndpoint: ko.MustString("chain.ws_endpoint"),
EnableHistorical: ko.Bool("chain.historical"), EnableHistorical: ko.Bool("chain.historical"),
StartBlock: uint64(ko.MustInt64("bootstrap.start_block")), StartBlock: uint64(ko.MustInt64("chain.start_block")),
BatchQueue: &batchQueue, BatchQueue: &batchQueue,
BlocksQueue: &blocksQueue, BlocksQueue: &blocksQueue,
Chain: chain, Chain: chain,
@ -107,10 +107,6 @@ func main() {
lo.Error("could not initialize chain syncer", "error", err) lo.Error("could not initialize chain syncer", "error", err)
os.Exit(1) os.Exit(1)
} }
// if err := chainSyncer.BootstrapHistoricalSyncer(); err != nil {
// lo.Error("could not bootstrap historical syncer", "error", err)
// os.Exit(1)
// }
cache, err := cache.New(cache.CacheOpts{ cache, err := cache.New(cache.CacheOpts{
Logg: lo, Logg: lo,
@ -138,11 +134,18 @@ func main() {
Emitter: defaultEmitter, Emitter: defaultEmitter,
}) })
// wg.Add(1) if ko.Bool("chain.historical") {
// go func() { wg.Add(1)
// defer wg.Done() go func() {
// chainSyncer.StartHistoricalSyncer(ctx) defer wg.Done()
// }() if err := chainSyncer.BootstrapHistoricalSyncer(); err != nil {
lo.Error("could not bootstrap historical syncer", "error", err)
os.Exit(1)
}
chainSyncer.StartHistoricalSyncer()
}()
}
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -160,16 +163,12 @@ func main() {
lo.Info("shutdown signal received") lo.Info("shutdown signal received")
shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultGracefulShutdownPeriod) shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultGracefulShutdownPeriod)
wg.Add(1)
go func() {
defer wg.Done()
chainSyncer.StopRealtime()
}()
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
blockProcessor.Stop() blockProcessor.Stop()
chainSyncer.StopHistoricalSyncer()
chainSyncer.StopRealtime()
}() }()
go func() { go func() {

View File

@ -6,11 +6,11 @@ address = ":5001"
[chain] [chain]
ws_endpoint = "wss://ws.celo.grassecon.net" ws_endpoint = "wss://ws.celo.grassecon.net"
rpc_endpoint = "https://1rpc.io/celo" rpc_endpoint = "https://celo.grassecon.net"
testnet = false testnet = false
realtime = true realtime = true
historical = false historical = true
start_block = 24905000 start_block = 25217425
[bootstrap] [bootstrap]
# https://software.grassecon.org/addresses # https://software.grassecon.org/addresses
@ -19,4 +19,4 @@ ge_registries = [
"0x0cc9f4fff962def35bb34a53691180b13e653030", "0x0cc9f4fff962def35bb34a53691180b13e653030",
] ]
watchlist = [""] watchlist = [""]
blacklist = [""] blacklist = ["0x765DE816845861e75A25fCA122bb6898B8B1282a"]

View File

@ -22,6 +22,10 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error {
return err return err
} }
if len(receiptsResp) != len(txs) {
return fmt.Errorf("block txs receipts len mismatch %d", blockNumber)
}
for i, receipt := range receiptsResp { for i, receipt := range receiptsResp {
if receipt.Status > 0 { if receipt.Status > 0 {
for _, log := range receipt.Logs { for _, log := range receipt.Logs {
@ -37,7 +41,7 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error {
} }
} }
} else { } else {
if p.cache.Exists(txs[i].To().Hex()) { if txs[i].To() != nil && p.cache.Exists(txs[i].To().Hex()) {
from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i]) from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i])
if err != nil { if err != nil {
p.logg.Error("handler error", "handler_type", "revert", "error", err) p.logg.Error("handler error", "handler_type", "revert", "error", err)

View File

@ -7,7 +7,7 @@ import (
) )
const ( const (
blockBatchSize = 100 blockBatchSize = 10
emptyQueueIdelTime = 2 * time.Second emptyQueueIdelTime = 2 * time.Second
) )

View File

@ -16,7 +16,7 @@ type (
) )
const ( const (
resubscribeInterval = 5 * time.Second resubscribeInterval = 2 * time.Second
) )
func (s *Syncer) StartRealtime() { func (s *Syncer) StartRealtime() {