diff --git a/Dockerfile b/Dockerfile index d545a1b..3b67371 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,7 +12,7 @@ WORKDIR /build COPY . . RUN go mod download RUN go build -o celo-tracker-cache-bootstrap -ldflags="-X main.build=${BUILD} -s -w" cmd/bootstrap/main.go -RUN go build -o celo-tracker -ldflags="-X main.build=${BUILD} -s -w" cmd/service/main.go +RUN go build -o celo-tracker -ldflags="-X main.build=${BUILD} -s -w" cmd/service/*.go FROM debian:bookworm-slim diff --git a/Makefile b/Makefile index 3b8b9ee..d8b0982 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ clean-db: build: ${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BOOTSTRAP_BIN} cmd/bootstrap/main.go - ${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BIN} cmd/service/main.go + ${BUILD_CONF} go build -ldflags="-X main.build=${BUILD_COMMIT} -s -w" -o ${BIN} cmd/service/*.go run-bootstrap: ${BUILD_CONF} ${DEBUG} go run cmd/bootstrap/main.go diff --git a/cmd/service/main.go b/cmd/service/main.go index 643c4ed..1b88c0f 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -21,7 +21,6 @@ import ( "github.com/grassrootseconomics/celo-tracker/internal/pool" "github.com/grassrootseconomics/celo-tracker/internal/processor" "github.com/grassrootseconomics/celo-tracker/internal/pub" - "github.com/grassrootseconomics/celo-tracker/internal/router" "github.com/grassrootseconomics/celo-tracker/internal/stats" "github.com/grassrootseconomics/celo-tracker/internal/syncer" "github.com/grassrootseconomics/celo-tracker/internal/util" @@ -92,10 +91,7 @@ func main() { os.Exit(1) } - router := router.New(router.RouterOpts{ - Pub: jetStreamPub, - Cache: cache, - }) + router := bootstrapEventRouter(cache, jetStreamPub.Send) blockProcessor := processor.NewProcessor(processor.ProcessorOpts{ Cache: cache, diff --git a/cmd/service/router.go b/cmd/service/router.go new file mode 100644 index 0000000..3114a58 --- /dev/null +++ b/cmd/service/router.go @@ -0,0 +1,42 @@ +package main + +import ( + "github.com/grassrootseconomics/celo-tracker/internal/cache" + "github.com/grassrootseconomics/celo-tracker/internal/handler" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +func bootstrapEventRouter(cacheProvider cache.Cache, pubCB router.Callback) *router.Router { + handlerContainer := handler.New(cacheProvider) + router := router.New(pubCB) + + router.RegisterLogRoute(w3.H("0x26162814817e23ec5035d6a2edc6c422da2da2119e27cfca6be65cc2dc55ca4c"), handler.HandleFaucetGiveLog()) + router.RegisterLogRoute(w3.H("0xa226db3f664042183ee0281230bba26cbf7b5057e50aee7f25a175ff45ce4d7f"), handler.HandleIndexAddLog(handlerContainer)) + router.RegisterLogRoute(w3.H("0x24a12366c02e13fe4a9e03d86a8952e85bb74a456c16e4a18b6d8295700b74bb"), handler.HandleIndexRemoveLog(handlerContainer)) + router.RegisterLogRoute(w3.H("0x8be0079c531659141344cd1fd0a4f28419497f9722a3daafe3b4186f6b6457e0"), handler.HandleOwnershipLog()) + router.RegisterLogRoute(w3.H("0x5548c837ab068cf56a2c2479df0882a4922fd203edb7517321831d95078c5f62"), handler.HandlePoolDepositLog()) + router.RegisterLogRoute(w3.H("0xd6d34547c69c5ee3d2667625c188acf1006abb93e0ee7cf03925c67cf7760413"), handler.HandlePoolSwapLog()) + router.RegisterLogRoute(w3.H("0xdb9ce1a76955721ca61ac50cd1b87f9ab8620325c8619a62192c2dc7871d56b1"), handler.HandleQuoterPriceUpdateLog()) + router.RegisterLogRoute(w3.H("0x6b7e2e653f93b645d4ed7292d6429f96637084363e477c8aaea1a43ed13c284e"), handler.HandleSealStateChangeLog()) + router.RegisterLogRoute(w3.H("0xcc16f5dbb4873280815c1ee09dbd06736cffcc184412cf7a71a0fdb75d397ca5"), handler.HandleTokenBurnLog()) + router.RegisterLogRoute(w3.H("0xab8530f87dc9b59234c4623bf917212bb2536d647574c8e7e5da92c2ede0c9f8"), handler.HandleTokenMintLog()) + router.RegisterLogRoute(w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), handler.HandleTokenTransferLog(handlerContainer)) + + router.RegisterInputDataRoute("63e4bff4", handler.HandleFaucetGiveInputData()) + router.RegisterInputDataRoute("de82efb4", handler.HandleFaucetGiveInputData()) + router.RegisterInputDataRoute("0a3b0a4f", handler.HandleIndexAddInputData()) + router.RegisterInputDataRoute("4420e486", handler.HandleIndexAddInputData()) + router.RegisterInputDataRoute("29092d0e", handler.HandleIndexRemoveInputData()) + router.RegisterInputDataRoute("f2fde38b", handler.HandleOwnershipInputData()) + router.RegisterInputDataRoute("47e7ef24", handler.HandlePoolDepositInputData()) + router.RegisterInputDataRoute("d9caed12", handler.HandlePoolSwapInputData()) + router.RegisterInputDataRoute("ebc59dff", handler.HandleQuoterPriceUpdateInputdata()) + router.RegisterInputDataRoute("86fe212d", handler.HandleSealStateChangeInputData()) + router.RegisterInputDataRoute("42966c68", handler.HandleTokenBurnInputData()) + router.RegisterInputDataRoute("449a52f8", handler.HandleTokenMintInputData()) + router.RegisterInputDataRoute("a9059cbb", handler.HandleTokenTransferInputData(handlerContainer)) + router.RegisterInputDataRoute("23b872dd", handler.HandleTokenTransferInputData(handlerContainer)) + + return router +} diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 77b32f4..7166208 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -39,24 +39,5 @@ func New(o CacheOpts) (Cache, error) { o.Logg.Warn("invalid cache type, using default type (map)") } - // geSmartContracts, err := o.Chain.Provider().GetGESmartContracts( - // context.Background(), - // o.Registries, - // ) - // if err != nil { - // return nil, fmt.Errorf("cache could not bootstrap GE smart contracts: err %v", err) - // } - - // for k, v := range geSmartContracts { - // cache.Add(k, v) - // } - // for _, address := range o.Watchlist { - // cache.Add(address, false) - // } - // for _, address := range o.Blacklist { - // cache.Remove(address) - // } - // o.Logg.Info("cache bootstrap complete", "cached_addresses", cache.Size()) - return cache, nil } diff --git a/internal/handler/faucet_give.go b/internal/handler/faucet_give.go new file mode 100644 index 0000000..3b61dd5 --- /dev/null +++ b/internal/handler/faucet_give.go @@ -0,0 +1,91 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/celoutils/v3" + "github.com/grassrootseconomics/w3-celo" +) + +const faucetGiveEventName = "FAUCET_GIVE" + +var ( + faucetGiveEvent = w3.MustNewEvent("Give(address indexed _recipient, address indexed _token, uint256 _amount)") + faucetGiveToSig = w3.MustNewFunc("giveTo(address)", "uint256") + faucetGimmeSig = w3.MustNewFunc("gimme()", "uint256") +) + +func HandleFaucetGiveLog() router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + recipient common.Address + token common.Address + amount big.Int + ) + + if err := faucetGiveEvent.DecodeArgs(lp.Log, &recipient, &token, &amount); err != nil { + return err + } + + faucetGiveEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: faucetGiveEventName, + Payload: map[string]any{ + "recipient": recipient.Hex(), + "token": token.Hex(), + "amount": amount.String(), + }, + } + + return c(ctx, faucetGiveEvent) + } +} + +func HandleFaucetGiveInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + faucetGiveEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: faucetGiveEventName, + } + + switch idp.InputData[:8] { + case "63e4bff4": + var to common.Address + + if err := faucetGiveToSig.DecodeArgs(w3.B(idp.InputData), &to); err != nil { + return err + } + + faucetGiveEvent.Payload = map[string]any{ + "recipient": to.Hex(), + "token": celoutils.ZeroAddress, + "amount": "0", + } + + return c(ctx, faucetGiveEvent) + case "de82efb4": + faucetGiveEvent.Payload = map[string]any{ + "recipient": celoutils.ZeroAddress, + "token": celoutils.ZeroAddress, + "amount": "0", + } + + return c(ctx, faucetGiveEvent) + } + + return nil + } +} diff --git a/internal/handler/handler.go b/internal/handler/handler.go new file mode 100644 index 0000000..5376cd0 --- /dev/null +++ b/internal/handler/handler.go @@ -0,0 +1,13 @@ +package handler + +import "github.com/grassrootseconomics/celo-tracker/internal/cache" + +type HandlerContainer struct { + cache cache.Cache +} + +func New(cacheProvider cache.Cache) *HandlerContainer { + return &HandlerContainer{ + cache: cacheProvider, + } +} diff --git a/internal/handler/index_add.go b/internal/handler/index_add.go new file mode 100644 index 0000000..decbe9d --- /dev/null +++ b/internal/handler/index_add.go @@ -0,0 +1,89 @@ +package handler + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const indexAddEventName = "INDEX_ADD" + +var ( + indexAddEvent = w3.MustNewEvent("AddressAdded(address _token)") + indexAddSig = w3.MustNewFunc("add(address)", "bool") + indexRegisterSig = w3.MustNewFunc("register(address)", "bool") +) + +func HandleIndexAddLog(hc *HandlerContainer) router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var address common.Address + + if err := indexAddEvent.DecodeArgs(lp.Log, &address); err != nil { + return err + } + + indexAddEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: indexAddEventName, + Payload: map[string]any{ + "address": address.Hex(), + }, + } + + if err := hc.cache.Add(ctx, address.Hex()); err != nil { + return err + } + + return c(ctx, indexAddEvent) + } +} + +func HandleIndexAddInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + indexAddEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: indexAddEventName, + } + + switch idp.InputData[:8] { + case "0a3b0a4f": + var address common.Address + + indexAddEvent.Payload = map[string]any{ + "address": address.Hex(), + } + + if err := indexAddSig.DecodeArgs(w3.B(idp.InputData), &address); err != nil { + return err + } + + return c(ctx, indexAddEvent) + case "4420e486": + var address common.Address + + indexAddEvent.Payload = map[string]any{ + "address": address.Hex(), + } + + if err := indexRegisterSig.DecodeArgs(w3.B(idp.InputData), &address); err != nil { + return err + } + + return c(ctx, indexAddEvent) + } + + return nil + } +} diff --git a/internal/handler/index_remove.go b/internal/handler/index_remove.go new file mode 100644 index 0000000..d284551 --- /dev/null +++ b/internal/handler/index_remove.go @@ -0,0 +1,70 @@ +package handler + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const indexRemoveEventName = "INDEX_REMOVE" + +var ( + indexRemoveEvent = w3.MustNewEvent("AddressRemoved(address _token)") + indexRemoveSig = w3.MustNewFunc("remove(address)", "bool") +) + +func HandleIndexRemoveLog(hc *HandlerContainer) router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var address common.Address + + if err := indexRemoveEvent.DecodeArgs(lp.Log, &address); err != nil { + return err + } + + indexRemoveEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: indexRemoveEventName, + Payload: map[string]any{ + "address": address.Hex(), + }, + } + + if err := hc.cache.Remove(ctx, address.Hex()); err != nil { + return err + } + + return c(ctx, indexRemoveEvent) + } +} + +func HandleIndexRemoveInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + var address common.Address + + if err := indexRemoveSig.DecodeArgs(w3.B(idp.InputData), &address); err != nil { + return err + } + + indexRemoveEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: indexRemoveEventName, + Payload: map[string]any{ + "address": address.Hex(), + }, + } + + return c(ctx, indexRemoveEvent) + } +} diff --git a/internal/handler/ownership.go b/internal/handler/ownership.go new file mode 100644 index 0000000..8f60cdf --- /dev/null +++ b/internal/handler/ownership.go @@ -0,0 +1,71 @@ +package handler + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const ownershipEventName = "OWNERSHIP_TRANSFERRED" + +var ( + ownershipEvent = w3.MustNewEvent("OwnershipTransferred(address indexed previousOwner, address indexed newOwner)") + ownershipToSig = w3.MustNewFunc("transferOwnership(address)", "bool") +) + +func HandleOwnershipLog() router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + previousOwner common.Address + newOwner common.Address + ) + + if err := ownershipEvent.DecodeArgs(lp.Log, &previousOwner, &newOwner); err != nil { + return err + } + + ownershipEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: ownershipEventName, + Payload: map[string]any{ + "previousOwner": previousOwner.Hex(), + "newOwner": newOwner.Hex(), + }, + } + + return c(ctx, ownershipEvent) + } +} + +func HandleOwnershipInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + var newOwner common.Address + + if err := ownershipToSig.DecodeArgs(w3.B(idp.InputData), &newOwner); err != nil { + return err + } + + ownershipEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: ownershipEventName, + Payload: map[string]any{ + "previousOwner": idp.From, + "newOwner": newOwner.Hex(), + }, + } + + return c(ctx, ownershipEvent) + } +} diff --git a/internal/handler/pool_deposit.go b/internal/handler/pool_deposit.go new file mode 100644 index 0000000..a61e32a --- /dev/null +++ b/internal/handler/pool_deposit.go @@ -0,0 +1,83 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const poolDepositEventName = "POOL_DEPOSIT" + +var ( + poolDepositEvent = w3.MustNewEvent("Deposit(address indexed initiator, address indexed tokenIn, uint256 amountIn)") + poolDepositSig = w3.MustNewFunc("deposit(address, uint256)", "") +) + +func HandlePoolDepositLog() router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + initiator common.Address + tokenIn common.Address + amountIn big.Int + ) + + if err := poolDepositEvent.DecodeArgs( + lp.Log, + &initiator, + &tokenIn, + &amountIn, + ); err != nil { + return err + } + + poolDepositEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: poolDepositEventName, + Payload: map[string]any{ + "initiator": initiator.Hex(), + "tokenIn": tokenIn.Hex(), + "amountIn": amountIn.String(), + }, + } + + return c(ctx, poolDepositEvent) + } +} + +func HandlePoolDepositInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + var ( + tokenIn common.Address + amountIn big.Int + ) + + if err := poolDepositSig.DecodeArgs(w3.B(idp.InputData), &tokenIn, &amountIn); err != nil { + return err + } + + poolDepositEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: poolDepositEventName, + Payload: map[string]any{ + "initiator": idp.From, + "tokenIn": tokenIn.Hex(), + "amountIn": amountIn.String(), + }, + } + + return c(ctx, poolDepositEvent) + } +} diff --git a/internal/handler/pool_swap.go b/internal/handler/pool_swap.go new file mode 100644 index 0000000..ddbfefe --- /dev/null +++ b/internal/handler/pool_swap.go @@ -0,0 +1,96 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const poolSwapEventName = "POOL_SWAP" + +var ( + poolSwapEvent = w3.MustNewEvent("Swap(address indexed initiator, address indexed tokenIn, address tokenOut, uint256 amountIn, uint256 amountOut, uint256 fee)") + poolSwapSig = w3.MustNewFunc("withdraw(address, address, uint256)", "") +) + +func HandlePoolSwapLog() router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + initiator common.Address + tokenIn common.Address + tokenOut common.Address + amountIn big.Int + amountOut big.Int + fee big.Int + ) + + if err := poolSwapEvent.DecodeArgs( + lp.Log, + &initiator, + &tokenIn, + &tokenOut, + &amountIn, + &amountOut, + &fee, + ); err != nil { + return err + } + + poolSwapEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: poolSwapEventName, + Payload: map[string]any{ + "initiator": initiator.Hex(), + "tokenIn": tokenIn.Hex(), + "tokenOut": tokenOut.Hex(), + "amountIn": amountIn.String(), + "amountOut": amountOut.String(), + "fee": fee.String(), + }, + } + + return c(ctx, poolSwapEvent) + } +} + +func HandlePoolSwapInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + var ( + tokenOut common.Address + tokenIn common.Address + amountIn big.Int + ) + + if err := poolSwapSig.DecodeArgs(w3.B(idp.InputData), &tokenOut, &tokenIn, &amountIn); err != nil { + return err + } + + poolSwapEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: poolSwapEventName, + Payload: map[string]any{ + "initiator": idp.From, + "tokenIn": tokenIn.Hex(), + "tokenOut": tokenOut.Hex(), + "amountIn": amountIn.String(), + "amountOut": "0", + "fee": "0", + }, + } + + return c(ctx, poolSwapEvent) + } +} diff --git a/internal/handler/quoter_price.go b/internal/handler/quoter_price.go new file mode 100644 index 0000000..e4b39db --- /dev/null +++ b/internal/handler/quoter_price.go @@ -0,0 +1,75 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const quoterPriceEventName = "QUOTER_PRICE_INDEX_UPDATED" + +var ( + quoterPriceEvent = w3.MustNewEvent("PriceIndexUpdated(address _tokenAddress, uint256 _exchangeRate)") + quoterPriceToSig = w3.MustNewFunc("setPriceIndexValue(address, uint256)", "uint256") +) + +func HandleQuoterPriceUpdateLog() router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + token common.Address + exchangeRate big.Int + ) + + if err := quoterPriceEvent.DecodeArgs(lp.Log, &token, &exchangeRate); err != nil { + return err + } + + quoterPriceEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: quoterPriceEventName, + Payload: map[string]any{ + "token": token.Hex(), + "exchangeRate": exchangeRate.String(), + }, + } + + return c(ctx, quoterPriceEvent) + } +} + +func HandleQuoterPriceUpdateInputdata() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + var ( + token common.Address + exchangeRate big.Int + ) + + if err := quoterPriceToSig.DecodeArgs(w3.B(idp.InputData), &token, &exchangeRate); err != nil { + return err + } + + quoterPriceEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: quoterPriceEventName, + Payload: map[string]any{ + "token": token.Hex(), + "exchangeRate": exchangeRate.String(), + }, + } + + return c(ctx, quoterPriceEvent) + } +} diff --git a/internal/handler/seal.go b/internal/handler/seal.go new file mode 100644 index 0000000..5315111 --- /dev/null +++ b/internal/handler/seal.go @@ -0,0 +1,70 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const sealEventName = "SEAL_STATE_CHANGE" + +var ( + sealEvent = w3.MustNewEvent("SealStateChange(bool indexed _final, uint256 _sealState)") + sealToSig = w3.MustNewFunc("seal(uint256)", "uint256") +) + +func HandleSealStateChangeLog() router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + final bool + sealState big.Int + ) + + if err := sealEvent.DecodeArgs(lp.Log, &final, &sealState); err != nil { + return err + } + + sealEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: sealEventName, + Payload: map[string]any{ + "final": final, + "sealState": sealState.String(), + }, + } + + return c(ctx, sealEvent) + } +} + +func HandleSealStateChangeInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + var sealState big.Int + + if err := sealToSig.DecodeArgs(w3.B(idp.InputData), &sealState); err != nil { + return err + } + + sealEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: sealEventName, + Payload: map[string]any{ + "sealState": sealState.String(), + }, + } + + return c(ctx, sealEvent) + } +} diff --git a/internal/handler/token_burn.go b/internal/handler/token_burn.go new file mode 100644 index 0000000..89da620 --- /dev/null +++ b/internal/handler/token_burn.go @@ -0,0 +1,72 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const burnEventName = "TOKEN_BURN" + +var ( + tokenBurnEvent = w3.MustNewEvent("Burn(address indexed _tokenBurner, uint256 _value)") + tokenBurnToSig = w3.MustNewFunc("burn(uint256)", "bool") +) + +func HandleTokenBurnLog() router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + tokenBurner common.Address + value big.Int + ) + + if err := tokenBurnEvent.DecodeArgs(lp.Log, &tokenBurner, &value); err != nil { + return err + } + + tokenBurnEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: burnEventName, + Payload: map[string]any{ + "tokenBurner": tokenBurner.Hex(), + "value": value.String(), + }, + } + + return c(ctx, tokenBurnEvent) + } +} + +func HandleTokenBurnInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + var value big.Int + + if err := tokenBurnToSig.DecodeArgs(w3.B(idp.InputData), &value); err != nil { + return err + } + + tokenBurnEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: burnEventName, + Payload: map[string]any{ + "tokenBurner": idp.From, + "value": value.String(), + }, + } + + return c(ctx, tokenBurnEvent) + } +} diff --git a/internal/handler/token_mint.go b/internal/handler/token_mint.go new file mode 100644 index 0000000..4e9456c --- /dev/null +++ b/internal/handler/token_mint.go @@ -0,0 +1,78 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/w3-celo" +) + +const mintEventName = "TOKEN_MINT" + +var ( + tokenMintEvent = w3.MustNewEvent("Mint(address indexed _tokenMinter, address indexed _beneficiary, uint256 _value)") + tokenMintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool") +) + +func HandleTokenMintLog() router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + tokenMinter common.Address + to common.Address + value big.Int + ) + + if err := tokenMintEvent.DecodeArgs(lp.Log, &tokenMinter, &to, &value); err != nil { + return err + } + + tokenMintEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: mintEventName, + Payload: map[string]any{ + "tokenMinter": tokenMinter.Hex(), + "to": to.Hex(), + "value": value.String(), + }, + } + + return c(ctx, tokenMintEvent) + } +} + +func HandleTokenMintInputData() router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + var ( + to common.Address + value big.Int + ) + + if err := tokenMintToSig.DecodeArgs(w3.B(idp.InputData), &to, &value); err != nil { + return err + } + + tokenMintEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: mintEventName, + Payload: map[string]any{ + "tokenMinter": idp.From, + "to": to.Hex(), + "value": value.String(), + }, + } + + return c(ctx, tokenMintEvent) + } +} diff --git a/internal/handler/token_transfer.go b/internal/handler/token_transfer.go new file mode 100644 index 0000000..675559d --- /dev/null +++ b/internal/handler/token_transfer.go @@ -0,0 +1,155 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/grassrootseconomics/celo-tracker/pkg/router" + "github.com/grassrootseconomics/celoutils/v3" + "github.com/grassrootseconomics/w3-celo" +) + +const transferEventName = "TOKEN_TRANSFER" + +var ( + tokenTransferEvent = w3.MustNewEvent("Transfer(address indexed _from, address indexed _to, uint256 _value)") + tokenTransferSig = w3.MustNewFunc("transfer(address, uint256)", "bool") + tokenTransferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool") + + stables = map[string]bool{ + celoutils.CUSDContractMainnet: true, + celoutils.CKESContractMainnet: true, + celoutils.USDTContractMainnet: true, + celoutils.USDCContractMainnet: true, + } +) + +func HandleTokenTransferLog(hc *HandlerContainer) router.LogHandlerFunc { + return func(ctx context.Context, lp router.LogPayload, c router.Callback) error { + var ( + from common.Address + to common.Address + value big.Int + ) + + if err := tokenTransferEvent.DecodeArgs(lp.Log, &from, &to, &value); err != nil { + return err + } + + proceed, err := hc.checkStables(ctx, from.Hex(), to.Hex(), lp.Log.Address.Hex()) + if err != nil { + return err + } + if !proceed { + return nil + } + + tokenTransferEvent := event.Event{ + Index: lp.Log.Index, + Block: lp.Log.BlockNumber, + ContractAddress: lp.Log.Address.Hex(), + Success: true, + Timestamp: lp.Timestamp, + TxHash: lp.Log.TxHash.Hex(), + TxType: transferEventName, + Payload: map[string]any{ + "from": from.Hex(), + "to": to.Hex(), + "value": value.String(), + }, + } + + return c(ctx, tokenTransferEvent) + } +} + +func HandleTokenTransferInputData(hc *HandlerContainer) router.InputDataHandlerFunc { + return func(ctx context.Context, idp router.InputDataPayload, c router.Callback) error { + tokenTransferEvent := event.Event{ + Block: idp.Block, + ContractAddress: idp.ContractAddress, + Success: false, + Timestamp: idp.Timestamp, + TxHash: idp.TxHash, + TxType: transferEventName, + } + + switch idp.InputData[:8] { + case "a9059cbb": + var ( + to common.Address + value big.Int + ) + + if err := tokenTransferSig.DecodeArgs(w3.B(idp.InputData), &to, &value); err != nil { + return err + } + + proceed, err := hc.checkStables(ctx, idp.From, to.Hex(), idp.ContractAddress) + if err != nil { + return err + } + if !proceed { + return nil + } + + tokenTransferEvent.Payload = map[string]any{ + "from": idp.From, + "to": to.Hex(), + "value": value.String(), + } + + return c(ctx, tokenTransferEvent) + case "23b872dd": + var ( + from common.Address + to common.Address + value big.Int + ) + + if err := tokenTransferFromSig.DecodeArgs(w3.B(idp.InputData), &from, &to, &value); err != nil { + return err + } + + proceed, err := hc.checkStables(ctx, from.Hex(), to.Hex(), idp.ContractAddress) + if err != nil { + return err + } + if !proceed { + return nil + } + + tokenTransferEvent.Payload = map[string]any{ + "from": from.Hex(), + "to": to.Hex(), + "value": value.String(), + } + + return c(ctx, tokenTransferEvent) + } + + return nil + } +} + +func (hc *HandlerContainer) checkStables(ctx context.Context, from string, to string, contractAddress string) (bool, error) { + _, ok := stables[contractAddress] + if !ok { + return true, nil + } + + // TODO: Pipeline this check on Redis with a new method + fromExists, err := hc.cache.Exists(ctx, from) + if err != nil { + return false, err + } + + toExists, err := hc.cache.Exists(ctx, to) + if err != nil { + return false, err + } + + return fromExists || toExists, nil +} diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 4db3c51..181e1c9 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -11,7 +11,7 @@ import ( "github.com/grassrootseconomics/celo-tracker/db" "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/chain" - "github.com/grassrootseconomics/celo-tracker/internal/router" + "github.com/grassrootseconomics/celo-tracker/pkg/router" ) type ( @@ -61,9 +61,9 @@ func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error return err } if exists { - if err := p.router.RouteSuccessTx( + if err := p.router.ProcessLog( ctx, - router.SuccessTx{ + router.LogPayload{ Log: log, Timestamp: block.Time(), }, @@ -89,9 +89,9 @@ func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error return fmt.Errorf("transaction decode error: tx %s: %v", receipt.TxHash.Hex(), err) } - if err := p.router.RouteRevertTx( + if err := p.router.ProcessInputData( ctx, - router.RevertTx{ + router.InputDataPayload{ From: from.Hex(), InputData: common.Bytes2Hex(tx.Data()), Block: blockNumber, diff --git a/internal/queue/queue.go b/internal/queue/queue.go deleted file mode 100644 index 84ee4cd..0000000 --- a/internal/queue/queue.go +++ /dev/null @@ -1,73 +0,0 @@ -package queue - -// -// import ( -// "context" -// "log/slog" - -// "github.com/alitto/pond" -// "github.com/grassrootseconomics/celo-tracker/internal/processor" -// ) - -// type ( -// QueueOpts struct { -// QueueSize int -// Logg *slog.Logger -// Processor *processor.Processor -// Pool *pond.WorkerPool -// } - -// Queue struct { -// logg *slog.Logger -// processChan chan uint64 -// stopSignal chan interface{} -// processor *processor.Processor -// pool *pond.WorkerPool -// } -// ) - -// func New(o QueueOpts) *Queue { -// return &Queue{ -// logg: o.Logg, -// processChan: make(chan uint64, o.QueueSize), -// stopSignal: make(chan interface{}), -// processor: o.Processor, -// pool: o.Pool, -// } -// } - -// func (q *Queue) Stop() { -// q.stopSignal <- struct{}{} -// } - -// func (q *Queue) Process() { -// for { -// select { -// case <-q.stopSignal: -// q.logg.Info("shutdown signal received stopping queue processing") -// return -// case block, ok := <-q.processChan: -// if !ok { -// return -// } -// q.pool.Submit(func() { -// err := q.processor.ProcessBlock(context.Background(), block) -// if err != nil { -// q.logg.Error("block processor error", "block_number", block, "error", err) -// } -// }) -// } -// } -// } - -// func (q *Queue) Push(block uint64) { -// q.processChan <- block -// } - -// func (q *Queue) Size() int { -// return len(q.processChan) -// } - -// func (q *Queue) WaitingSize() uint64 { -// return q.pool.WaitingTasks() -// } diff --git a/internal/router/faucet_give.go b/internal/router/faucet_give.go deleted file mode 100644 index 9574549..0000000 --- a/internal/router/faucet_give.go +++ /dev/null @@ -1,94 +0,0 @@ -package router - -import ( - "context" - "math/big" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/celoutils/v3" - "github.com/grassrootseconomics/w3-celo" -) - -type faucetGiveHandler struct{} - -const faucetGiveEventName = "FAUCET_GIVE" - -var ( - _ Handler = (*faucetGiveHandler)(nil) - - faucetGiveEvent = w3.MustNewEvent("Give(address indexed _recipient, address indexed _token, uint256 _amount)") - faucetGiveToSig = w3.MustNewFunc("giveTo(address)", "uint256") - faucetGimmeSig = w3.MustNewFunc("gimme()", "uint256") -) - -func (h *faucetGiveHandler) Name() string { - return faucetGiveEventName -} - -func (h *faucetGiveHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - recipient common.Address - token common.Address - amount big.Int - ) - - if err := faucetGiveEvent.DecodeArgs(tx.Log, &recipient, &token, &amount); err != nil { - return err - } - - faucetGiveEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: faucetGiveEventName, - Payload: map[string]any{ - "recipient": recipient.Hex(), - "token": token.Hex(), - "amount": amount.String(), - }, - } - - return pubCB(ctx, faucetGiveEvent) -} - -func (h *faucetGiveHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - faucetGiveEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: faucetGiveEventName, - } - - switch tx.InputData[:8] { - case "63e4bff4": - var to common.Address - - if err := faucetGiveToSig.DecodeArgs(w3.B(tx.InputData), &to); err != nil { - return err - } - - faucetGiveEvent.Payload = map[string]any{ - "recipient": to.Hex(), - "token": celoutils.ZeroAddress, - "amount": "0", - } - - return pubCB(ctx, faucetGiveEvent) - case "de82efb4": - faucetGiveEvent.Payload = map[string]any{ - "recipient": celoutils.ZeroAddress, - "token": celoutils.ZeroAddress, - "amount": "0", - } - - return pubCB(ctx, faucetGiveEvent) - } - - return nil -} diff --git a/internal/router/index_add.go b/internal/router/index_add.go deleted file mode 100644 index 7247a02..0000000 --- a/internal/router/index_add.go +++ /dev/null @@ -1,95 +0,0 @@ -package router - -import ( - "context" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type indexAddHandler struct { - cache cache.Cache -} - -const indexAddEventName = "INDEX_ADD" - -var ( - _ Handler = (*indexAddHandler)(nil) - - indexAddEvent = w3.MustNewEvent("AddressAdded(address _token)") - indexAddSig = w3.MustNewFunc("add(address)", "bool") - indexRegisterSig = w3.MustNewFunc("register(address)", "bool") -) - -func (h *indexAddHandler) Name() string { - return indexAddEventName -} - -func (h *indexAddHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var address common.Address - - if err := indexAddEvent.DecodeArgs(tx.Log, &address); err != nil { - return err - } - - indexAddEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: indexAddEventName, - Payload: map[string]any{ - "address": address.Hex(), - }, - } - - if err := h.cache.Add(ctx, address.Hex()); err != nil { - return err - } - - return pubCB(ctx, indexAddEvent) -} - -func (h *indexAddHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - indexAddEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: indexAddEventName, - } - - switch tx.InputData[:8] { - case "0a3b0a4f": - var address common.Address - - indexAddEvent.Payload = map[string]any{ - "address": address.Hex(), - } - - if err := indexAddSig.DecodeArgs(w3.B(tx.InputData), &address); err != nil { - return err - } - - return pubCB(ctx, indexAddEvent) - case "4420e486": - var address common.Address - - indexAddEvent.Payload = map[string]any{ - "address": address.Hex(), - } - - if err := indexRegisterSig.DecodeArgs(w3.B(tx.InputData), &address); err != nil { - return err - } - - return pubCB(ctx, indexAddEvent) - } - - return nil -} diff --git a/internal/router/index_remove.go b/internal/router/index_remove.go deleted file mode 100644 index d63fe32..0000000 --- a/internal/router/index_remove.go +++ /dev/null @@ -1,77 +0,0 @@ -package router - -import ( - "context" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type indexRemoveHandler struct { - cache cache.Cache -} - -const indexRemoveEventName = "INDEX_REMOVE" - -var ( - _ Handler = (*indexRemoveHandler)(nil) - - indexRemoveEvent = w3.MustNewEvent("AddressRemoved(address _token)") - indexRemoveSig = w3.MustNewFunc("remove(address)", "bool") -) - -func (h *indexRemoveHandler) Name() string { - return indexRemoveEventName -} - -func (h *indexRemoveHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var address common.Address - - if err := indexRemoveEvent.DecodeArgs(tx.Log, &address); err != nil { - return err - } - - indexRemoveEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: indexRemoveEventName, - Payload: map[string]any{ - "address": address.Hex(), - }, - } - - if err := h.cache.Remove(ctx, address.Hex()); err != nil { - return err - } - - return pubCB(ctx, indexRemoveEvent) - -} - -func (h *indexRemoveHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - var address common.Address - - if err := indexRemoveSig.DecodeArgs(w3.B(tx.InputData), &address); err != nil { - return err - } - - indexRemoveEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: indexRemoveEventName, - Payload: map[string]any{ - "address": address.Hex(), - }, - } - - return pubCB(ctx, indexRemoveEvent) -} diff --git a/internal/router/ownership.go b/internal/router/ownership.go deleted file mode 100644 index de4f3f1..0000000 --- a/internal/router/ownership.go +++ /dev/null @@ -1,77 +0,0 @@ -package router - -import ( - "context" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type ownershipHandler struct{} - -const ( - ownershipEventName = "OWNERSHIP_TRANSFERRED" -) - -var ( - _ Handler = (*ownershipHandler)(nil) - - ownershipEvent = w3.MustNewEvent("OwnershipTransferred(address indexed previousOwner, address indexed newOwner)") - ownershipToSig = w3.MustNewFunc("transferOwnership(address)", "bool") -) - -func (h *ownershipHandler) Name() string { - return ownershipEventName -} - -func (h *ownershipHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - previousOwner common.Address - newOwner common.Address - ) - - if err := ownershipEvent.DecodeArgs(tx.Log, &previousOwner, &newOwner); err != nil { - return err - } - - ownershipEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: ownershipEventName, - Payload: map[string]any{ - "previousOwner": previousOwner.Hex(), - "newOwner": newOwner.Hex(), - }, - } - - return pubCB(ctx, ownershipEvent) -} - -func (h *ownershipHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - - var newOwner common.Address - - if err := ownershipToSig.DecodeArgs(w3.B(tx.InputData), &newOwner); err != nil { - return err - } - - ownershipEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: ownershipEventName, - Payload: map[string]any{ - "previousOwner": tx.From, - "newOwner": newOwner.Hex(), - }, - } - - return pubCB(ctx, ownershipEvent) -} diff --git a/internal/router/pool_deposit.go b/internal/router/pool_deposit.go deleted file mode 100644 index e8c5cac..0000000 --- a/internal/router/pool_deposit.go +++ /dev/null @@ -1,86 +0,0 @@ -package router - -import ( - "context" - "math/big" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type poolDepositHandler struct{} - -const poolDepositEventName = "POOL_DEPOSIT" - -var ( - _ Handler = (*poolDepositHandler)(nil) - - poolDepositEvent = w3.MustNewEvent("Deposit(address indexed initiator, address indexed tokenIn, uint256 amountIn)") - poolDepositSig = w3.MustNewFunc("deposit(address, uint256)", "") -) - -func (h *poolDepositHandler) Name() string { - return poolDepositEventName -} - -func (h *poolDepositHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - initiator common.Address - tokenIn common.Address - amountIn big.Int - ) - - if err := poolDepositEvent.DecodeArgs( - tx.Log, - &initiator, - &tokenIn, - &amountIn, - ); err != nil { - return err - } - - poolDepositEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: poolDepositEventName, - Payload: map[string]any{ - "initiator": initiator.Hex(), - "tokenIn": tokenIn.Hex(), - "amountIn": amountIn.String(), - }, - } - - return pubCB(ctx, poolDepositEvent) -} - -func (h *poolDepositHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - var ( - tokenIn common.Address - amountIn big.Int - ) - - if err := poolDepositSig.DecodeArgs(w3.B(tx.InputData), &tokenIn, &amountIn); err != nil { - return err - } - - poolDepositEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: poolDepositEventName, - Payload: map[string]any{ - "initiator": tx.From, - "tokenIn": tokenIn.Hex(), - "amountIn": amountIn.String(), - }, - } - - return pubCB(ctx, poolDepositEvent) -} diff --git a/internal/router/pool_swap.go b/internal/router/pool_swap.go deleted file mode 100644 index 874bb20..0000000 --- a/internal/router/pool_swap.go +++ /dev/null @@ -1,99 +0,0 @@ -package router - -import ( - "context" - "math/big" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type poolSwapHandler struct{} - -const poolSwapEventName = "POOL_SWAP" - -var ( - _ Handler = (*poolSwapHandler)(nil) - - poolSwapEvent = w3.MustNewEvent("Swap(address indexed initiator, address indexed tokenIn, address tokenOut, uint256 amountIn, uint256 amountOut, uint256 fee)") - poolSwapSig = w3.MustNewFunc("withdraw(address, address, uint256)", "") -) - -func (h *poolSwapHandler) Name() string { - return poolSwapEventName -} - -func (h *poolSwapHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - initiator common.Address - tokenIn common.Address - tokenOut common.Address - amountIn big.Int - amountOut big.Int - fee big.Int - ) - - if err := poolSwapEvent.DecodeArgs( - tx.Log, - &initiator, - &tokenIn, - &tokenOut, - &amountIn, - &amountOut, - &fee, - ); err != nil { - return err - } - - poolSwapEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: poolSwapEventName, - Payload: map[string]any{ - "initiator": initiator.Hex(), - "tokenIn": tokenIn.Hex(), - "tokenOut": tokenOut.Hex(), - "amountIn": amountIn.String(), - "amountOut": amountOut.String(), - "fee": fee.String(), - }, - } - - return pubCB(ctx, poolSwapEvent) -} - -func (h *poolSwapHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - var ( - tokenOut common.Address - tokenIn common.Address - amountIn big.Int - ) - - if err := poolSwapSig.DecodeArgs(w3.B(tx.InputData), &tokenOut, &tokenIn, &amountIn); err != nil { - return err - } - - poolSwapEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: poolSwapEventName, - Payload: map[string]any{ - "initiator": tx.From, - "tokenIn": tokenIn.Hex(), - "tokenOut": tokenOut.Hex(), - "amountIn": amountIn.String(), - "amountOut": "0", - "fee": "0", - }, - } - - return pubCB(ctx, poolSwapEvent) -} diff --git a/internal/router/quoter_price.go b/internal/router/quoter_price.go deleted file mode 100644 index 131a704..0000000 --- a/internal/router/quoter_price.go +++ /dev/null @@ -1,78 +0,0 @@ -package router - -import ( - "context" - "math/big" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type quoterPriceHandler struct{} - -const quoterPriceEventName = "QUOTER_PRICE_INDEX_UPDATED" - -var ( - _ Handler = (*quoterPriceHandler)(nil) - - quoterPriceEvent = w3.MustNewEvent("PriceIndexUpdated(address _tokenAddress, uint256 _exchangeRate)") - quoterPriceToSig = w3.MustNewFunc("setPriceIndexValue(address, uint256)", "uint256") -) - -func (h *quoterPriceHandler) Name() string { - return quoterPriceEventName -} - -func (h *quoterPriceHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - token common.Address - exchangeRate big.Int - ) - - if err := quoterPriceEvent.DecodeArgs(tx.Log, &token, &exchangeRate); err != nil { - return err - } - - quoterPriceEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: quoterPriceEventName, - Payload: map[string]any{ - "token": token.Hex(), - "exchangeRate": exchangeRate.String(), - }, - } - - return pubCB(ctx, quoterPriceEvent) -} - -func (h *quoterPriceHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - var ( - token common.Address - exchangeRate big.Int - ) - - if err := quoterPriceToSig.DecodeArgs(w3.B(tx.InputData), &token, &exchangeRate); err != nil { - return err - } - - quoterPriceEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: quoterPriceEventName, - Payload: map[string]any{ - "token": token.Hex(), - "exchangeRate": exchangeRate.String(), - }, - } - - return pubCB(ctx, quoterPriceEvent) -} diff --git a/internal/router/router.go b/internal/router/router.go deleted file mode 100644 index 6aeeeb8..0000000 --- a/internal/router/router.go +++ /dev/null @@ -1,123 +0,0 @@ -package router - -import ( - "context" - - "github.com/celo-org/celo-blockchain/common" - "github.com/celo-org/celo-blockchain/core/types" - "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/internal/pub" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type ( - PubCallback func(context.Context, event.Event) error - - SuccessTx struct { - Log *types.Log - Timestamp uint64 - } - - RevertTx struct { - From string - InputData string - Block uint64 - ContractAddress string - Timestamp uint64 - TxHash string - } - - Handler interface { - Name() string - SuccessTx(context.Context, SuccessTx, PubCallback) error - RevertTx(context.Context, RevertTx, PubCallback) error - } - - RouterOpts struct { - Pub pub.Pub - Cache cache.Cache - } - - Router struct { - pub pub.Pub - cache cache.Cache - logHandlers map[common.Hash]Handler - inputDataHandlers map[string]Handler - } -) - -func New(o RouterOpts) *Router { - var ( - indexAddHandler *indexAddHandler = &indexAddHandler{cache: o.Cache} - indexRemoveHandler *indexRemoveHandler = &indexRemoveHandler{cache: o.Cache} - tokenTransferHandler *tokenTransferHandler = &tokenTransferHandler{cache: o.Cache} - faucetGiveHandler *faucetGiveHandler = &faucetGiveHandler{} - ownershipHandler *ownershipHandler = &ownershipHandler{} - poolDepositHandler *poolDepositHandler = &poolDepositHandler{} - poolSwapHandler *poolSwapHandler = &poolSwapHandler{} - quoterPriceHandler *quoterPriceHandler = "erPriceHandler{} - sealHandler *sealHandler = &sealHandler{} - tokenBurnHandler *tokenBurnHandler = &tokenBurnHandler{} - tokenMintHandler *tokenMintHandler = &tokenMintHandler{} - ) - - logHandlers := map[common.Hash]Handler{ - w3.H("0x26162814817e23ec5035d6a2edc6c422da2da2119e27cfca6be65cc2dc55ca4c"): faucetGiveHandler, - w3.H("0xa226db3f664042183ee0281230bba26cbf7b5057e50aee7f25a175ff45ce4d7f"): indexAddHandler, - w3.H("0x24a12366c02e13fe4a9e03d86a8952e85bb74a456c16e4a18b6d8295700b74bb"): indexRemoveHandler, - w3.H("0x8be0079c531659141344cd1fd0a4f28419497f9722a3daafe3b4186f6b6457e0"): ownershipHandler, - w3.H("0x5548c837ab068cf56a2c2479df0882a4922fd203edb7517321831d95078c5f62"): poolDepositHandler, - w3.H("0xd6d34547c69c5ee3d2667625c188acf1006abb93e0ee7cf03925c67cf7760413"): poolSwapHandler, - w3.H("0xdb9ce1a76955721ca61ac50cd1b87f9ab8620325c8619a62192c2dc7871d56b1"): quoterPriceHandler, - w3.H("0x6b7e2e653f93b645d4ed7292d6429f96637084363e477c8aaea1a43ed13c284e"): sealHandler, - w3.H("0xcc16f5dbb4873280815c1ee09dbd06736cffcc184412cf7a71a0fdb75d397ca5"): tokenBurnHandler, - w3.H("0xab8530f87dc9b59234c4623bf917212bb2536d647574c8e7e5da92c2ede0c9f8"): tokenMintHandler, - w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"): tokenTransferHandler, - } - - inputDataHandlers := map[string]Handler{ - "63e4bff4": faucetGiveHandler, - "de82efb4": faucetGiveHandler, - "0a3b0a4f": indexAddHandler, - "4420e486": indexAddHandler, - "29092d0e": indexRemoveHandler, - "f2fde38b": ownershipHandler, - "47e7ef24": poolDepositHandler, - "d9caed12": poolSwapHandler, - "ebc59dff": quoterPriceHandler, - "86fe212d": sealHandler, - "42966c68": tokenBurnHandler, - "449a52f8": tokenMintHandler, - "a9059cbb": tokenTransferHandler, - "23b872dd": tokenTransferHandler, - } - - return &Router{ - pub: o.Pub, - logHandlers: logHandlers, - inputDataHandlers: inputDataHandlers, - } -} - -func (r *Router) RouteSuccessTx(ctx context.Context, msg SuccessTx) error { - handler, ok := r.logHandlers[msg.Log.Topics[0]] - if ok { - return handler.SuccessTx(ctx, msg, r.pub.Send) - } - - return nil -} - -func (r *Router) RouteRevertTx(ctx context.Context, msg RevertTx) error { - if len(msg.InputData) < 8 { - return nil - } - - handler, ok := r.inputDataHandlers[msg.InputData[:8]] - if ok { - return handler.RevertTx(ctx, msg, r.pub.Send) - } - - return nil -} diff --git a/internal/router/seal.go b/internal/router/seal.go deleted file mode 100644 index c932169..0000000 --- a/internal/router/seal.go +++ /dev/null @@ -1,73 +0,0 @@ -package router - -import ( - "context" - "math/big" - - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type sealHandler struct{} - -const sealEventName = "SEAL_STATE_CHANGE" - -var ( - _ Handler = (*sealHandler)(nil) - - sealEvent = w3.MustNewEvent("SealStateChange(bool indexed _final, uint256 _sealState)") - sealToSig = w3.MustNewFunc("seal(uint256)", "uint256") -) - -func (h *sealHandler) Name() string { - return sealEventName -} - -func (h *sealHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - final bool - sealState big.Int - ) - - if err := sealEvent.DecodeArgs(tx.Log, &final, &sealState); err != nil { - return err - } - - sealEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: sealEventName, - Payload: map[string]any{ - "final": final, - "sealState": sealState.String(), - }, - } - - return pubCB(ctx, sealEvent) -} - -func (h *sealHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - var sealState big.Int - - if err := sealToSig.DecodeArgs(w3.B(tx.InputData), &sealState); err != nil { - return err - } - - sealEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: sealEventName, - Payload: map[string]any{ - "sealState": sealState.String(), - }, - } - - return pubCB(ctx, sealEvent) -} diff --git a/internal/router/token_burn.go b/internal/router/token_burn.go deleted file mode 100644 index ee39c6e..0000000 --- a/internal/router/token_burn.go +++ /dev/null @@ -1,75 +0,0 @@ -package router - -import ( - "context" - "math/big" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type tokenBurnHandler struct{} - -const burnEventName = "TOKEN_BURN" - -var ( - _ Handler = (*tokenBurnHandler)(nil) - - tokenBurnEvent = w3.MustNewEvent("Burn(address indexed _tokenBurner, uint256 _value)") - tokenBurnToSig = w3.MustNewFunc("burn(uint256)", "bool") -) - -func (h *tokenBurnHandler) Name() string { - return burnEventName -} - -func (h *tokenBurnHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - tokenBurner common.Address - value big.Int - ) - - if err := tokenBurnEvent.DecodeArgs(tx.Log, &tokenBurner, &value); err != nil { - return err - } - - tokenBurnEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: burnEventName, - Payload: map[string]any{ - "tokenBurner": tokenBurner.Hex(), - "value": value.String(), - }, - } - - return pubCB(ctx, tokenBurnEvent) -} - -func (h *tokenBurnHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - var value big.Int - - if err := tokenBurnToSig.DecodeArgs(w3.B(tx.InputData), &value); err != nil { - return err - } - - tokenBurnEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: burnEventName, - Payload: map[string]any{ - "tokenBurner": tx.From, - "value": value.String(), - }, - } - - return pubCB(ctx, tokenBurnEvent) -} diff --git a/internal/router/token_mint.go b/internal/router/token_mint.go deleted file mode 100644 index ed11050..0000000 --- a/internal/router/token_mint.go +++ /dev/null @@ -1,82 +0,0 @@ -package router - -import ( - "context" - "math/big" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/w3-celo" -) - -type tokenMintHandler struct{} - -const mintEventName = "TOKEN_MINT" - -var ( - _ Handler = (*tokenMintHandler)(nil) - - tokenMintEvent = w3.MustNewEvent("Mint(address indexed _tokenMinter, address indexed _beneficiary, uint256 _value)") - tokenMintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool") -) - -func (h *tokenMintHandler) Name() string { - return mintEventName -} - -func (h *tokenMintHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - tokenMinter common.Address - to common.Address - value big.Int - ) - - if err := tokenMintEvent.DecodeArgs(tx.Log, &tokenMinter, &to, &value); err != nil { - return err - } - - tokenMintEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: mintEventName, - Payload: map[string]any{ - "tokenMinter": tokenMinter.Hex(), - "to": to.Hex(), - "value": value.String(), - }, - } - - return pubCB(ctx, tokenMintEvent) -} - -func (h *tokenMintHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - - var ( - to common.Address - value big.Int - ) - - if err := tokenMintToSig.DecodeArgs(w3.B(tx.InputData), &to, &value); err != nil { - return err - } - - tokenMintEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: mintEventName, - Payload: map[string]any{ - "tokenMinter": tx.From, - "to": to.Hex(), - "value": value.String(), - }, - } - - return pubCB(ctx, tokenMintEvent) -} diff --git a/internal/router/token_transfer.go b/internal/router/token_transfer.go deleted file mode 100644 index 78afb6f..0000000 --- a/internal/router/token_transfer.go +++ /dev/null @@ -1,161 +0,0 @@ -package router - -import ( - "context" - "math/big" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/pkg/event" - "github.com/grassrootseconomics/celoutils/v3" - "github.com/grassrootseconomics/w3-celo" -) - -type tokenTransferHandler struct { - cache cache.Cache -} - -const transferEventName = "TOKEN_TRANSFER" - -var ( - _ Handler = (*tokenTransferHandler)(nil) - - tokenTransferEvent = w3.MustNewEvent("Transfer(address indexed _from, address indexed _to, uint256 _value)") - tokenTransferSig = w3.MustNewFunc("transfer(address, uint256)", "bool") - tokenTransferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool") - - stables = map[string]bool{ - celoutils.CUSDContractMainnet: true, - celoutils.CKESContractMainnet: true, - celoutils.USDTContractMainnet: true, - celoutils.USDCContractMainnet: true, - } -) - -func (h *tokenTransferHandler) Name() string { - return transferEventName -} - -func (h *tokenTransferHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error { - var ( - from common.Address - to common.Address - value big.Int - ) - - if err := tokenTransferEvent.DecodeArgs(tx.Log, &from, &to, &value); err != nil { - return err - } - - proceed, err := h.checkStables(ctx, from.Hex(), to.Hex(), tx.Log.Address.Hex()) - if err != nil { - return err - } - if !proceed { - return nil - } - - tokenTransferEvent := event.Event{ - Index: tx.Log.Index, - Block: tx.Log.BlockNumber, - ContractAddress: tx.Log.Address.Hex(), - Success: true, - Timestamp: tx.Timestamp, - TxHash: tx.Log.TxHash.Hex(), - TxType: transferEventName, - Payload: map[string]any{ - "from": from.Hex(), - "to": to.Hex(), - "value": value.String(), - }, - } - - return pubCB(ctx, tokenTransferEvent) -} - -func (h *tokenTransferHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error { - tokenTransferEvent := event.Event{ - Block: tx.Block, - ContractAddress: tx.ContractAddress, - Success: false, - Timestamp: tx.Timestamp, - TxHash: tx.TxHash, - TxType: transferEventName, - } - - switch tx.InputData[:8] { - case "a9059cbb": - var ( - to common.Address - value big.Int - ) - - if err := tokenTransferSig.DecodeArgs(w3.B(tx.InputData), &to, &value); err != nil { - return err - } - - proceed, err := h.checkStables(ctx, tx.From, to.Hex(), tx.ContractAddress) - if err != nil { - return err - } - if !proceed { - return nil - } - - tokenTransferEvent.Payload = map[string]any{ - "from": tx.From, - "to": to.Hex(), - "value": value.String(), - } - - return pubCB(ctx, tokenTransferEvent) - case "23b872dd": - var ( - from common.Address - to common.Address - value big.Int - ) - - if err := tokenTransferFromSig.DecodeArgs(w3.B(tx.InputData), &from, &to, &value); err != nil { - return err - } - - proceed, err := h.checkStables(ctx, from.Hex(), to.Hex(), tx.ContractAddress) - if err != nil { - return err - } - if !proceed { - return nil - } - - tokenTransferEvent.Payload = map[string]any{ - "from": from.Hex(), - "to": to.Hex(), - "value": value.String(), - } - - return pubCB(ctx, tokenTransferEvent) - } - - return nil -} - -func (h *tokenTransferHandler) checkStables(ctx context.Context, from string, to string, contractAddress string) (bool, error) { - _, ok := stables[contractAddress] - if !ok { - return true, nil - } - - // TODO: Pipeline this check on Redis with a new method - fromExists, err := h.cache.Exists(ctx, from) - if err != nil { - return false, err - } - - toExists, err := h.cache.Exists(ctx, to) - if err != nil { - return false, err - } - - return fromExists || toExists, nil -} diff --git a/pkg/router/router.go b/pkg/router/router.go new file mode 100644 index 0000000..1d7271c --- /dev/null +++ b/pkg/router/router.go @@ -0,0 +1,90 @@ +package router + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/celo-tracker/pkg/event" +) + +type ( + Callback func(context.Context, event.Event) error + + LogPayload struct { + Log *types.Log + Timestamp uint64 + } + + InputDataPayload struct { + From string + InputData string + Block uint64 + ContractAddress string + Timestamp uint64 + TxHash string + } + + LogHandlerFunc func(context.Context, LogPayload, Callback) error + InputDataHandlerFunc func(context.Context, InputDataPayload, Callback) error + + LogRouteEntry struct { + Signature common.Hash + HandlerFunc LogHandlerFunc + } + + InputDataEntry struct { + Signature string + HandlerFunc InputDataHandlerFunc + } + + Router struct { + callbackFn Callback + logHandlers map[common.Hash]LogRouteEntry + inputDataHandlers map[string]InputDataEntry + } +) + +func New(callbackFn Callback) *Router { + return &Router{ + callbackFn: callbackFn, + logHandlers: make(map[common.Hash]LogRouteEntry), + inputDataHandlers: make(map[string]InputDataEntry), + } +} + +func (r *Router) RegisterLogRoute(signature common.Hash, handlerFunc LogHandlerFunc) { + r.logHandlers[signature] = LogRouteEntry{ + Signature: signature, + HandlerFunc: handlerFunc, + } +} + +func (r *Router) RegisterInputDataRoute(signature string, handlerFunc InputDataHandlerFunc) { + r.inputDataHandlers[signature] = InputDataEntry{ + Signature: signature, + HandlerFunc: handlerFunc, + } +} + +func (r *Router) ProcessLog(ctx context.Context, payload LogPayload) error { + handler, ok := r.logHandlers[payload.Log.Topics[0]] + if ok { + return handler.HandlerFunc(ctx, payload, r.callbackFn) + } + + return nil +} + +func (r *Router) ProcessInputData(ctx context.Context, payload InputDataPayload) error { + if len(payload.InputData) < 8 { + return nil + } + + handler, ok := r.inputDataHandlers[payload.InputData[:8]] + if ok { + return handler.HandlerFunc(ctx, payload, r.callbackFn) + } + + return nil +}