eth-tracker/internal/processor/processor.go

148 lines
4.2 KiB
Go
Raw Permalink Normal View History

2024-05-23 08:41:39 +02:00
package processor
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/internal/chain"
"github.com/grassrootseconomics/celo-tracker/internal/db"
"github.com/grassrootseconomics/celo-tracker/internal/handler"
"github.com/grassrootseconomics/celo-tracker/internal/pub"
"github.com/grassrootseconomics/celo-tracker/internal/stats"
)
type (
ProcessorOpts struct {
Cache cache.Cache
DB db.DB
Chain chain.Chain
Pub pub.Pub
Logg *slog.Logger
Stats *stats.Stats
}
Processor struct {
cache cache.Cache
db db.DB
chain chain.Chain
handlerPipeline handler.HandlerPipeline
logg *slog.Logger
stats *stats.Stats
}
)
func NewProcessor(o ProcessorOpts) *Processor {
return &Processor{
cache: o.Cache,
db: o.DB,
handlerPipeline: handler.New(o.Pub, o.Cache),
chain: o.Chain,
logg: o.Logg,
stats: o.Stats,
}
}
func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error {
block, err := p.chain.GetBlock(ctx, blockNumber)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("block %d error: %v", blockNumber, err)
}
receiptsResp, err := p.chain.GetReceipts(ctx, block)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("receipts fetch error: block %d: %v", blockNumber, err)
}
for _, receipt := range receiptsResp {
if receipt.Status > 0 {
for _, log := range receipt.Logs {
if p.cache.Exists(log.Address.Hex()) {
msg := handler.LogMessage{
Log: log,
Timestamp: block.Time(),
}
if err := p.handleLog(ctx, msg); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("handle logs error: block %d: %v", blockNumber, err)
}
}
}
} else if p.isTrieAvailable(blockNumber) {
tx, err := p.chain.GetTransaction(ctx, receipt.TxHash)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("get transaction error: tx %s: %v", receipt.TxHash.Hex(), err)
}
if tx.To() == nil {
return nil
}
if p.cache.Exists(tx.To().Hex()) {
from, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), tx)
if err != nil {
return fmt.Errorf("transaction decode error: tx %s: %v", receipt.TxHash.Hex(), err)
}
revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("get revert reason error: tx %s: %v", receipt.TxHash.Hex(), err)
}
msg := handler.RevertMessage{
From: from.Hex(),
RevertReason: revertReason,
InputData: common.Bytes2Hex(tx.Data()),
Block: blockNumber,
ContractAddress: tx.To().Hex(),
Timestamp: block.Time(),
TxHash: receipt.TxHash.Hex(),
}
if err := p.handleRevert(ctx, msg); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("handle revert error: tx %s: %v", receipt.TxHash.Hex(), err)
}
}
}
}
if err := p.db.SetValue(blockNumber); err != nil {
return err
}
p.logg.Debug("successfully processed block", "block", blockNumber)
return nil
}
func (p *Processor) isTrieAvailable(blockNumber uint64) bool {
2024-05-23 09:21:42 +02:00
available := p.chain.IsArchiveNode() || p.stats.GetLatestBlock()-blockNumber <= 128
2024-05-23 08:41:39 +02:00
if !available {
p.logg.Warn("skipping block due to potentially missing trie", "block_number", blockNumber)
}
return available
}
func (p *Processor) handleLog(ctx context.Context, msg handler.LogMessage) error {
for _, handler := range p.handlerPipeline {
if err := handler.HandleLog(ctx, msg); err != nil {
return fmt.Errorf("log handler: %s err: %v", handler.Name(), err)
}
}
return nil
}
func (p *Processor) handleRevert(ctx context.Context, msg handler.RevertMessage) error {
for _, handler := range p.handlerPipeline {
if err := handler.HandleRevert(ctx, msg); err != nil {
return fmt.Errorf("revert handler: %s err: %v", handler.Name(), err)
}
}
return nil
}