feat: implement handler interface, add example emitter (stdout)

This commit is contained in:
Mohamed Sohail 2024-04-16 13:14:21 +08:00
parent 49feb5bd2e
commit 343f304eaf
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
8 changed files with 141 additions and 45 deletions

View File

@ -5,51 +5,8 @@ import (
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/w3-celo"
"github.com/grassrootseconomics/w3-celo/module/eth"
)
func (c *Chain) GetRevertReason(ctx context.Context, txHash common.Hash, blockNumber *big.Int) (string, error) {
return c.provider.SimulateRevertedTx(ctx, txHash, blockNumber)
}
func (c *Chain) TestDecodeTransfer(ctx context.Context, logs []*types.Log) {
signature := "Transfer(address indexed _from, address indexed _to, uint256 _value)"
eventTransfer := w3.MustNewEvent(signature)
for _, log := range logs {
if log.Topics[0] == w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef") {
var (
from common.Address
to common.Address
value big.Int
tokenSymbol string
tokenDecimals big.Int
)
if err := c.provider.Client.CallCtx(
ctx,
eth.CallFunc(log.Address, w3.MustNewFunc("symbol()", "string")).Returns(&tokenSymbol),
eth.CallFunc(log.Address, w3.MustNewFunc("decimals()", "uint256")).Returns(&tokenDecimals),
); err != nil {
c.logg.Error("token details fetcher", "error", err)
}
if err := eventTransfer.DecodeArgs(log, &from, &to, &value); err != nil {
c.logg.Error("event decoder", "error", err)
}
c.logg.Info("transfer event",
"hash", log.TxHash,
"token", tokenSymbol,
"from", from,
"to", to,
"value", value.Uint64(),
)
}
}
}

View File

@ -0,0 +1,15 @@
package emitter
import (
"log/slog"
"github.com/grassrootseconomics/celo-tracker/internal/handler"
)
func New(logg *slog.Logger) handler.EmitterEmitFunc {
stdOutEmitter := &LogEmitter{
logg: logg,
}
return stdOutEmitter.Emit
}

View File

@ -0,0 +1,24 @@
package emitter
import (
"context"
"encoding/json"
"log/slog"
)
type (
LogEmitter struct {
logg *slog.Logger
}
)
func (l *LogEmitter) Emit(_ context.Context, payload []byte) error {
var event map[string]interface{}
if err := json.Unmarshal(payload, &event); err != nil {
return err
}
l.logg.Info("emitted event", "json_payload", event)
return nil
}

View File

@ -0,0 +1,27 @@
package handler
import (
"context"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/w3-celo"
)
type (
EmitterEmitFunc func(context.Context, []byte) error
Handler interface {
Handle(context.Context, *types.Log, EmitterEmitFunc) error
}
)
func New() []Handler {
transferHandler := &TransferHandler{
topicHash: w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
event: w3.MustNewEvent("Transfer(address indexed _from, address indexed _to, uint256 _value)"),
}
return []Handler{
transferHandler,
}
}

View File

@ -0,0 +1,55 @@
package handler
import (
"context"
"encoding/json"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/w3-celo"
)
type (
TransferHandler struct {
topicHash common.Hash
event *w3.Event
}
TransferEvent struct {
Contract string
From string
To string
Value uint64
}
)
func (h *TransferHandler) Handle(ctx context.Context, log *types.Log, emitFn EmitterEmitFunc) error {
if log.Topics[0] == h.topicHash {
var (
from common.Address
to common.Address
value big.Int
)
if err := h.event.DecodeArgs(log, &from, &to, &value); err != nil {
return err
}
transferEvent := &TransferEvent{
Contract: log.Address.Hex(),
From: from.Hex(),
To: to.Hex(),
Value: value.Uint64(),
}
jsonData, err := json.Marshal(transferEvent)
if err != nil {
return err
}
return emitFn(ctx, jsonData)
}
return nil
}

View File

@ -5,6 +5,7 @@ import (
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celo-tracker/internal/emitter"
)
func (p *Processor) processBlock(ctx context.Context, block types.Block) error {
@ -24,8 +25,11 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error {
for i, receipt := range receiptsResp {
if receipt.Status > 0 {
// test transfers
p.chain.TestDecodeTransfer(ctx, receipt.Logs)
for _, log := range receipt.Logs {
if err := p.handleLogs(ctx, log); err != nil {
p.logg.Error("hanlder error", "error", err)
}
}
} else {
revertReason, _ := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber)
p.logg.Debug("tx reverted", "hash", receipt.TxHash, "revert_reason", revertReason, "input_data", common.Bytes2Hex(txs[i].Data()))
@ -40,3 +44,14 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error {
return nil
}
func (p *Processor) handleLogs(ctx context.Context, log *types.Log) error {
defaultEmitter := emitter.New(p.logg)
for _, handler := range p.handlers {
if err := handler.Handle(ctx, log, defaultEmitter); err != nil {
return err
}
}
return nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/ef-ds/deque/v2"
"github.com/grassrootseconomics/celo-tracker/internal/chain"
"github.com/grassrootseconomics/celo-tracker/internal/db"
"github.com/grassrootseconomics/celo-tracker/internal/handler"
"github.com/grassrootseconomics/celo-tracker/internal/pool"
"github.com/grassrootseconomics/celo-tracker/internal/stats"
)
@ -31,6 +32,7 @@ type (
stats *stats.Stats
db *db.DB
quit chan struct{}
handlers []handler.Handler
}
)
@ -47,6 +49,7 @@ func NewProcessor(o ProcessorOpts) *Processor {
stats: o.Stats,
db: o.DB,
quit: make(chan struct{}),
handlers: handler.New(),
}
}

BIN
test

Binary file not shown.