diff --git a/cmd/tracker/main.go b/cmd/tracker/main.go index 4e62e20..03a2e4e 100644 --- a/cmd/tracker/main.go +++ b/cmd/tracker/main.go @@ -16,6 +16,7 @@ import ( "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/chain" "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/emitter" "github.com/grassrootseconomics/celo-tracker/internal/processor" "github.com/grassrootseconomics/celo-tracker/internal/stats" "github.com/grassrootseconomics/celo-tracker/internal/syncer" @@ -120,6 +121,10 @@ func main() { os.Exit(1) } + defaultEmitter := emitter.New(emitter.EmitterOpts{ + Logg: lo, + }) + blockProcessor := processor.NewProcessor(processor.ProcessorOpts{ Chain: chain, BlocksQueue: &blocksQueue, @@ -127,6 +132,7 @@ func main() { Stats: stats, DB: db, Cache: cache, + Emitter: defaultEmitter, }) // wg.Add(1) diff --git a/internal/emitter/console.go b/internal/emitter/console.go index 9666326..4be9378 100644 --- a/internal/emitter/console.go +++ b/internal/emitter/console.go @@ -18,13 +18,12 @@ func NewConsoleEmitter(logg *slog.Logger) *ConsoleEmitter { } } -func (l *ConsoleEmitter) Emit(_ context.Context, payload []byte) error { - var event map[string]interface{} - - if err := json.Unmarshal(payload, &event); err != nil { +func (l *ConsoleEmitter) Emit(_ context.Context, payload interface{}) error { + jsonData, err := json.Marshal(payload) + if err != nil { return err } - l.logg.Info("emitted event", "json_payload", event) + l.logg.Info("emitted event", "json_payload", string(jsonData)) return nil } diff --git a/internal/emitter/emitter.go b/internal/emitter/emitter.go index 47786f4..9cb4285 100644 --- a/internal/emitter/emitter.go +++ b/internal/emitter/emitter.go @@ -7,7 +7,7 @@ import ( type ( Emitter interface { - Emit(context.Context, []byte) error + Emit(context.Context, interface{}) error } EmitterOpts struct { diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 23c0fc5..c4cc824 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -5,22 +5,42 @@ import ( "github.com/celo-org/celo-blockchain/core/types" "github.com/grassrootseconomics/celo-tracker/internal/emitter" - "github.com/grassrootseconomics/w3-celo" ) type ( Handler interface { - Handle(context.Context, *types.Log, emitter.Emitter) error + HandleLog(context.Context, LogMessage, emitter.Emitter) error + HandleRevert(context.Context, RevertMessage, emitter.Emitter) error + } + + LogMessage struct { + Log *types.Log + BlockTime uint64 + } + + RevertMessage struct { + From string + RevertReason string + InputData string + Block uint64 + ContractAddress string + Timestamp uint64 + TxHash string + } + + Event struct { + Block uint64 `json:"block"` + ContractAddress string `json:"contractAddress"` + Success bool `json:"success"` + Timestamp uint64 `json:"timestamp"` + TxHash string `json:"transactionHash"` + TxType string `json:"transactionType"` + Payload map[string]any `json:"payload"` } ) func New() []Handler { - transferHandler := &TransferHandler{ - topicHash: w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), - event: w3.MustNewEvent("Transfer(address indexed _from, address indexed _to, uint256 _value)"), - } - return []Handler{ - transferHandler, + &TransferHandler{}, } } diff --git a/internal/handler/mint.go b/internal/handler/mint.go new file mode 100644 index 0000000..abeebd1 --- /dev/null +++ b/internal/handler/mint.go @@ -0,0 +1 @@ +package handler diff --git a/internal/handler/transfer.go b/internal/handler/transfer.go index ffe571a..3ee5de5 100644 --- a/internal/handler/transfer.go +++ b/internal/handler/transfer.go @@ -2,11 +2,9 @@ package handler import ( "context" - "encoding/json" "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/celo-org/celo-blockchain/core/types" "github.com/grassrootseconomics/celo-tracker/internal/emitter" "github.com/grassrootseconomics/w3-celo" ) @@ -16,40 +14,106 @@ type ( topicHash common.Hash event *w3.Event } - - TransferEvent struct { - Contract string - From string - To string - Value uint64 - } ) -func (h *TransferHandler) Handle(ctx context.Context, log *types.Log, emitter emitter.Emitter) error { - if log.Topics[0] == h.topicHash { +var ( + transferTopicHash = w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef") + transferEvent = w3.MustNewEvent("Transfer(address indexed _from, address indexed _to, uint256 _value)") + transferSig = w3.MustNewFunc("transfer(address, uint256)", "bool") + transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool") +) + +func (h *TransferHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { + if msg.Log.Topics[0] == transferTopicHash { var ( from common.Address to common.Address value big.Int ) - if err := h.event.DecodeArgs(log, &from, &to, &value); err != nil { + if err := transferEvent.DecodeArgs(msg.Log, &from, &to, &value); err != nil { return err } - transferEvent := &TransferEvent{ - Contract: log.Address.Hex(), - From: from.Hex(), - To: to.Hex(), - Value: value.Uint64(), + transferEvent := Event{ + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.BlockTime, + TxHash: msg.Log.TxHash.Hex(), + TxType: "TRANSFER", + Payload: map[string]any{ + "from": from.Hex(), + "to": to.Hex(), + "value": value.String(), + }, } - jsonData, err := json.Marshal(transferEvent) - if err != nil { - return err - } - - return emitter.Emit(ctx, jsonData) + return emitter.Emit(ctx, transferEvent) + } + + return nil +} + +func (h *TransferHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "a9059cbb": + var ( + to common.Address + value big.Int + ) + + if err := transferSig.DecodeArgs(w3.B(msg.InputData), &to, &value); err != nil { + return err + } + + transferEvent := Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: "TRANSFER", + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "from": msg.From, + "to": to.Hex(), + "value": value.String(), + }, + } + + return emitter.Emit(ctx, transferEvent) + case "23b872dd": + var ( + from common.Address + to common.Address + value big.Int + ) + + if err := transferFromSig.DecodeArgs(w3.B(msg.InputData), &from, &to, &value); err != nil { + return err + } + + transferEvent := Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: "TRANSFER", + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "from": from.Hex(), + "to": to.Hex(), + "value": value.String(), + }, + } + + return emitter.Emit(ctx, transferEvent) } return nil diff --git a/internal/processor/block.go b/internal/processor/block.go index 00be435..43751fc 100644 --- a/internal/processor/block.go +++ b/internal/processor/block.go @@ -5,7 +5,7 @@ import ( "github.com/celo-org/celo-blockchain/common" "github.com/celo-org/celo-blockchain/core/types" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/handler" ) func (p *Processor) processBlock(ctx context.Context, block types.Block) error { @@ -27,15 +27,41 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { if receipt.Status > 0 { for _, log := range receipt.Logs { if p.cache.Exists(log.Address.Hex()) { - if err := p.handleLogs(ctx, log); err != nil { - p.logg.Error("hanlder error", "error", err) + msg := handler.LogMessage{ + Log: log, + BlockTime: block.Time(), + } + + if err := p.handleLogs(ctx, msg); err != nil { + p.logg.Error("handler error", "handler_type", "log", "error", err) } } } } else { if p.cache.Exists(txs[i].To().Hex()) { - revertReason, _ := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber) - p.logg.Debug("tx reverted", "hash", receipt.TxHash, "revert_reason", revertReason, "input_data", common.Bytes2Hex(txs[i].Data())) + from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i]) + if err != nil { + p.logg.Error("hanlder error", "handler_type", "revert", "error", err) + } + + revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber) + if err != nil { + p.logg.Error("handler error", "handler_type", "revert", "error", err) + } + + msg := handler.RevertMessage{ + From: from.Hex(), + RevertReason: revertReason, + InputData: common.Bytes2Hex(txs[i].Data()), + Block: blockNumber, + ContractAddress: txs[i].To().Hex(), + Timestamp: block.Time(), + TxHash: receipt.TxHash.Hex(), + } + + if err := p.handleRevert(ctx, msg); err != nil { + p.logg.Error("handler error", "handler_type", "revert", "error", err) + } } } } @@ -48,15 +74,22 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { return nil } -func (p *Processor) handleLogs(ctx context.Context, log *types.Log) error { - defaultEmitter := emitter.New(emitter.EmitterOpts{ - Logg: p.logg, - }) - +func (p *Processor) handleLogs(ctx context.Context, msg handler.LogMessage) error { for _, handler := range p.handlers { - if err := handler.Handle(ctx, log, defaultEmitter); err != nil { + if err := handler.HandleLog(ctx, msg, p.emitter); err != nil { return err } } + + return nil +} + +func (p *Processor) handleRevert(ctx context.Context, msg handler.RevertMessage) error { + for _, handler := range p.handlers { + if err := handler.HandleRevert(ctx, msg, p.emitter); err != nil { + return err + } + } + return nil } diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 40e1205..e04a987 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -11,6 +11,7 @@ import ( "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/chain" "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/emitter" "github.com/grassrootseconomics/celo-tracker/internal/handler" "github.com/grassrootseconomics/celo-tracker/internal/pool" "github.com/grassrootseconomics/celo-tracker/internal/stats" @@ -24,6 +25,7 @@ type ( Stats *stats.Stats DB *db.DB Cache cache.Cache + Emitter emitter.Emitter } Processor struct { @@ -36,6 +38,7 @@ type ( quit chan struct{} handlers []handler.Handler cache cache.Cache + emitter emitter.Emitter } ) @@ -54,6 +57,7 @@ func NewProcessor(o ProcessorOpts) *Processor { quit: make(chan struct{}), handlers: handler.New(), cache: o.Cache, + emitter: o.Emitter, } }