From 8085168ed3452e7076c57014aa232c5fef3ed491 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Wed, 29 Mar 2023 16:17:30 +0000 Subject: [PATCH] major refactor: bootstrap from registry map and token index (see notes) * the cache is now bootstrapped from the token index and registry map * a token index filter auto updates the cache when a token is added (using add sig) * minor fixes to filters --- cmd/service/filters.go | 81 +++++++++++++++++++++------ cmd/service/init.go | 20 +++++++ cmd/service/main.go | 10 +++- cmd/service/utils.go | 2 +- config.toml | 11 ++-- go.mod | 2 +- go.sum | 2 + internal/filter/address_filter.go | 19 ++----- internal/filter/gas_filter.go | 56 +++++++++--------- internal/filter/register_filter.go | 24 ++++---- internal/filter/token_index_filter.go | 81 +++++++++++++++++++++++++++ internal/filter/transfer_filter.go | 12 +--- 12 files changed, 232 insertions(+), 88 deletions(-) create mode 100644 internal/filter/token_index_filter.go diff --git a/cmd/service/filters.go b/cmd/service/filters.go index b499395..ff1c3b5 100644 --- a/cmd/service/filters.go +++ b/cmd/service/filters.go @@ -1,33 +1,71 @@ package main import ( + "context" + "fmt" + "math/big" "strings" "sync" + "time" + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/pub" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/grassrootseconomics/w3-celo-patch/module/eth" + "github.com/grassrootseconomics/w3-celo-patch/w3types" ) -var ( - systemAddress string -) +func initAddressFilter(celoProvider *celoutils.Provider, cache *sync.Map) filter.Filter { + var ( + tokenIndexEntryCount big.Int + ) -func initAddressFilter() filter.Filter { - // TODO: Temporary shortcut - systemAddress = strings.ToLower(ko.MustString("chain.system_address")) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - // TODO: Bootstrap addresses from smart contract - // TODO: Add route to update cache - cache := &sync.Map{} + registryMap, err := celoProvider.RegistryMap(ctx, celoutils.HexToAddress(ko.MustString("chain.registry_address"))) + if err != nil { + lo.Fatal("init: critical error creating address filter", "error", err) + } - cache.Store(strings.ToLower(ko.MustString("chain.token_index_address")), "TokenIndex") - cache.Store(strings.ToLower(ko.MustString("chain.gas_faucet_address")), "GasFaucet") - cache.Store(strings.ToLower(ko.MustString("chain.user_index_address")), "UserIndex") + for k, v := range registryMap { + cache.Store(strings.ToLower(v.Hex()), k) + } + + if err := celoProvider.Client.CallCtx( + ctx, + eth.CallFunc(w3.MustNewFunc("entryCount()", "uint256"), registryMap[celoutils.TokenIndex]).Returns(&tokenIndexEntryCount), + ); err != nil { + lo.Fatal("init: critical error creating address filter", "error", err) + } + + calls := make([]w3types.Caller, tokenIndexEntryCount.Int64()) + tokenAddresses := make([]common.Address, tokenIndexEntryCount.Int64()) + + entrySig := w3.MustNewFunc("entry(uint256 _idx)", "address") + + // TODO: There is a 5MB limit to a RPC batch call size. + // Test if 10k entries will raise an error (future proofed for a lot of years) + for i := 0; i < int(tokenIndexEntryCount.Int64()); i++ { + calls[i] = eth.CallFunc(entrySig, registryMap[celoutils.TokenIndex], new(big.Int).SetInt64(int64(i))).Returns(&tokenAddresses[i]) + } + + if err := celoProvider.Client.CallCtx( + ctx, + calls..., + ); err != nil { + lo.Fatal("init: critical error creating address filter", "error", err) + } + + for i, v := range tokenAddresses { + cache.Store(strings.ToLower(v.Hex()), fmt.Sprintf("TOKEN_%d", i)) + } return filter.NewAddressFilter(filter.AddressFilterOpts{ - Cache: cache, - Logg: lo, - SystemAddress: systemAddress, + Cache: cache, + Logg: lo, }) } @@ -41,9 +79,8 @@ func initTransferFilter(pub *pub.Pub) filter.Filter { func initGasGiftFilter(pub *pub.Pub) filter.Filter { return filter.NewGasFilter(filter.GasFilterOpts{ - Pub: pub, - Logg: lo, - SystemAddress: systemAddress, + Pub: pub, + Logg: lo, }) } @@ -53,3 +90,11 @@ func initRegisterFilter(pub *pub.Pub) filter.Filter { Logg: lo, }) } + +func initTokenIndexFilter(cache *sync.Map, pub *pub.Pub) filter.Filter { + return filter.NewTokenIndexFilter(filter.TokenIndexFilterOpts{ + Cache: cache, + Pub: pub, + Logg: lo, + }) +} diff --git a/cmd/service/init.go b/cmd/service/init.go index f5798e7..4728b2e 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -6,6 +6,7 @@ import ( "time" "github.com/alitto/pond" + "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-chain-events/internal/pool" "github.com/grassrootseconomics/cic-chain-events/internal/pub" "github.com/grassrootseconomics/cic-chain-events/internal/store" @@ -128,3 +129,22 @@ func initPub(natsConn *nats.Conn, jsCtx nats.JetStreamContext) *pub.Pub { return pub } + +func initCeloProvider() *celoutils.Provider { + providerOpts := celoutils.ProviderOpts{ + RpcEndpoint: ko.MustString("chain.rpc_endpoint"), + } + + if ko.Bool("chain.testnet") { + providerOpts.ChainId = celoutils.TestnetChainId + } else { + providerOpts.ChainId = celoutils.MainnetChainId + } + + provider, err := celoutils.NewProvider(providerOpts) + if err != nil { + lo.Fatal("init: critical error loading chain provider", "error", err) + } + + return provider +} diff --git a/cmd/service/main.go b/cmd/service/main.go index 5d3a00e..ea82a5f 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -17,7 +17,7 @@ import ( ) type ( - internalServiceContainer struct { + internalServicesContainer struct { apiService *echo.Echo pub *pub.Pub } @@ -55,19 +55,23 @@ func main() { natsConn, jsCtx := initJetStream() jsPub := initPub(natsConn, jsCtx) + celoProvider := initCeloProvider() + cache := &sync.Map{} + pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{ BlockFetcher: graphqlFetcher, Filters: []filter.Filter{ - initAddressFilter(), + initAddressFilter(celoProvider, cache), initGasGiftFilter(jsPub), initTransferFilter(jsPub), initRegisterFilter(jsPub), + initTokenIndexFilter(cache, jsPub), }, Logg: lo, Store: pgStore, }) - internalServices := &internalServiceContainer{ + internalServices := &internalServicesContainer{ pub: jsPub, } syncerStats := &syncer.Stats{} diff --git a/cmd/service/utils.go b/cmd/service/utils.go index 0a89073..340e11d 100644 --- a/cmd/service/utils.go +++ b/cmd/service/utils.go @@ -17,7 +17,7 @@ func createSigChannel() (chan os.Signal, func()) { } } -func startGracefulShutdown(ctx context.Context, internalServices *internalServiceContainer) { +func startGracefulShutdown(ctx context.Context, internalServices *internalServicesContainer) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() diff --git a/config.toml b/config.toml index 82671a0..8348d69 100644 --- a/config.toml +++ b/config.toml @@ -9,12 +9,11 @@ address = ":5000" # Geth API endpoints [chain] -graphql_endpoint = "" -ws_endpoint = "" -system_address = "" -token_index_address = "" -gas_faucet_address = "" -user_index_address = "" +graphql_endpoint = "" +ws_endpoint = "" +rpc_endpoint = "" +testnet = true +registry_address = "" # Syncer configs [syncer] diff --git a/go.mod b/go.mod index a163155..3e1a711 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/VictoriaMetrics/metrics v1.23.1 github.com/alitto/pond v1.8.3 github.com/celo-org/celo-blockchain v1.7.2 - github.com/grassrootseconomics/celoutils v1.1.1 + github.com/grassrootseconomics/celoutils v1.2.1 github.com/grassrootseconomics/w3-celo-patch v0.2.0 github.com/jackc/pgx/v5 v5.3.1 github.com/jackc/tern/v2 v2.0.1 diff --git a/go.sum b/go.sum index ab338b2..dc35df5 100644 --- a/go.sum +++ b/go.sum @@ -269,6 +269,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/grassrootseconomics/celoutils v1.1.1 h1:REsndvfBkPN8UKOoQFNEGm/sCwKtTm+woYtgMl3bfZ0= github.com/grassrootseconomics/celoutils v1.1.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q= +github.com/grassrootseconomics/celoutils v1.2.1 h1:ndM4h7Df0d57m2kdRXRStrnunqOL61wQ51rnOanX1KI= +github.com/grassrootseconomics/celoutils v1.2.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q= github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U= github.com/grassrootseconomics/w3-celo-patch v0.2.0/go.mod h1:WhBXNzNIvHmS6B2hAeShs56oa9Azb4jQSrOMKuMdBWw= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= diff --git a/internal/filter/address_filter.go b/internal/filter/address_filter.go index c9899d8..e1a726e 100644 --- a/internal/filter/address_filter.go +++ b/internal/filter/address_filter.go @@ -10,31 +10,24 @@ import ( type ( AddressFilterOpts struct { - Cache *sync.Map - Logg logf.Logger - SystemAddress string + Cache *sync.Map + Logg logf.Logger } AddressFilter struct { - cache *sync.Map - logg logf.Logger - systemAddress string + cache *sync.Map + logg logf.Logger } ) func NewAddressFilter(o AddressFilterOpts) Filter { return &AddressFilter{ - cache: o.Cache, - logg: o.Logg, - systemAddress: o.SystemAddress, + cache: o.Cache, + logg: o.Logg, } } func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { - if transaction.From.Address == f.systemAddress { - return true, nil - } - if _, found := f.cache.Load(transaction.To.Address); found { return true, nil } diff --git a/internal/filter/gas_filter.go b/internal/filter/gas_filter.go index 807bc92..6d073f5 100644 --- a/internal/filter/gas_filter.go +++ b/internal/filter/gas_filter.go @@ -3,63 +3,69 @@ package filter import ( "context" - "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/celo-org/celo-blockchain/common" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-chain-events/internal/pub" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" + "github.com/grassrootseconomics/w3-celo-patch" "github.com/zerodha/logf" ) const ( - gasFilterEventSubject = "CHAIN.gas" + gasEventSubject = "CHAIN.gas" +) + +var ( + giveToSig = w3.MustNewFunc("giveTo(address)", "uint256") ) type ( GasFilterOpts struct { - Logg logf.Logger - Pub *pub.Pub - SystemAddress string + Logg logf.Logger + Pub *pub.Pub } GasFilter struct { - logg logf.Logger - pub *pub.Pub - systemAddress string + logg logf.Logger + pub *pub.Pub } ) func NewGasFilter(o GasFilterOpts) Filter { return &GasFilter{ - logg: o.Logg, - pub: o.Pub, - systemAddress: o.SystemAddress, + logg: o.Logg, + pub: o.Pub, } } func (f *GasFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { - transferValue, err := hexutil.DecodeUint64(transaction.Value) - if err != nil { - return false, err + if len(transaction.InputData) < 10 { + return true, nil } - // TODO: This is a temporary shortcut to gift gas. Switch to gas faucet contract. - if transaction.From.Address == f.systemAddress && transferValue > 0 { - transferEvent := &pub.MinimalTxInfo{ - Block: transaction.Block.Number, - To: celoutils.ChecksumAddress(transaction.To.Address), - TxHash: transaction.Hash, - TxIndex: transaction.Index, - Value: transferValue, + if transaction.InputData[:10] == "0x63e4bff4" { + var address common.Address + + if err := giveToSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil { + return false, err + } + + giveToEvent := &pub.MinimalTxInfo{ + Block: transaction.Block.Number, + ContractAddress: celoutils.ChecksumAddress(transaction.To.Address), + To: address.Hex(), + TxHash: transaction.Hash, + TxIndex: transaction.Index, } if transaction.Status == 1 { - transferEvent.Success = true + giveToEvent.Success = true } if err := f.pub.Publish( - gasFilterEventSubject, + gasEventSubject, transaction.Hash, - transferEvent, + giveToEvent, ); err != nil { return false, err } diff --git a/internal/filter/register_filter.go b/internal/filter/register_filter.go index f76bf49..f87e8ec 100644 --- a/internal/filter/register_filter.go +++ b/internal/filter/register_filter.go @@ -16,44 +16,44 @@ const ( ) var ( - addSig = w3.MustNewFunc("add(address)", "bool") + registerSig = w3.MustNewFunc("register(address)", "") ) type ( RegisterFilterOpts struct { - Logg logf.Logger - Pub *pub.Pub + Logg logf.Logger + Pub *pub.Pub } RegisterFilter struct { - logg logf.Logger - pub *pub.Pub + logg logf.Logger + pub *pub.Pub } ) func NewRegisterFilter(o RegisterFilterOpts) Filter { return &RegisterFilter{ - logg: o.Logg, - pub: o.Pub, + logg: o.Logg, + pub: o.Pub, } } func (f *RegisterFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { - if len(transaction.InputData) < 10 { + if len(transaction.InputData) < 10 { return true, nil } - if transaction.InputData[:10] == "0x0a3b0a4f" { + if transaction.InputData[:10] == "0x4420e486" { var address common.Address - if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil { + if err := registerSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil { return false, err } addEvent := &pub.MinimalTxInfo{ Block: transaction.Block.Number, - ContractAddress: transaction.To.Address, - To: celoutils.ChecksumAddress(transaction.To.Address), + ContractAddress: celoutils.ChecksumAddress(transaction.To.Address), + To: address.Hex(), TxHash: transaction.Hash, TxIndex: transaction.Index, } diff --git a/internal/filter/token_index_filter.go b/internal/filter/token_index_filter.go new file mode 100644 index 0000000..d3a4cb9 --- /dev/null +++ b/internal/filter/token_index_filter.go @@ -0,0 +1,81 @@ +package filter + +import ( + "context" + "strings" + "sync" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-chain-events/internal/pub" + "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/zerodha/logf" +) + +const ( + tokenIndexFilterEventSubject = "CHAIN.tokenAdded" +) + +var ( + addSig = w3.MustNewFunc("add(address)", "bool") +) + +type ( + TokenIndexFilterOpts struct { + Cache *sync.Map + Logg logf.Logger + Pub *pub.Pub + } + + TokenIndexFilter struct { + pub *pub.Pub + cache *sync.Map + logg logf.Logger + } +) + +func NewTokenIndexFilter(o TokenIndexFilterOpts) Filter { + return &TokenIndexFilter{ + cache: o.Cache, + logg: o.Logg, + pub: o.Pub, + } +} + +func (f *TokenIndexFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { + if len(transaction.InputData) < 10 { + return true, nil + } + + if transaction.InputData[:10] == "0x0a3b0a4f" { + var address common.Address + + if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil { + return false, err + } + + f.cache.Store(strings.ToLower(address.Hex()), transaction.Hash) + + addEvent := &pub.MinimalTxInfo{ + Block: transaction.Block.Number, + ContractAddress: celoutils.ChecksumAddress(transaction.To.Address), + To: address.Hex(), + TxHash: transaction.Hash, + TxIndex: transaction.Index, + } + + if transaction.Status == 1 { + addEvent.Success = true + } + + if err := f.pub.Publish( + tokenIndexFilterEventSubject, + transaction.Hash, + addEvent, + ); err != nil { + return false, err + } + } + return true, nil +} diff --git a/internal/filter/transfer_filter.go b/internal/filter/transfer_filter.go index 1c20020..73a0d6c 100644 --- a/internal/filter/transfer_filter.go +++ b/internal/filter/transfer_filter.go @@ -57,13 +57,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti return false, err } - f.logg.Debug("transfer_filter: new reg", "transfer", to) - transferEvent := &pub.MinimalTxInfo{ Block: transaction.Block.Number, From: celoutils.ChecksumAddress(transaction.From.Address), To: to.Hex(), - ContractAddress: transaction.To.Address, + ContractAddress: celoutils.ChecksumAddress(transaction.To.Address), TxHash: transaction.Hash, TxIndex: transaction.Index, Value: value.Uint64(), @@ -93,13 +91,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti return false, err } - f.logg.Debug("transfer_filter: new reg", "transferFrom", to) - transferEvent := &pub.MinimalTxInfo{ Block: transaction.Block.Number, From: from.Hex(), To: to.Hex(), - ContractAddress: transaction.To.Address, + ContractAddress: celoutils.ChecksumAddress(transaction.To.Address), TxHash: transaction.Hash, TxIndex: transaction.Index, Value: value.Uint64(), @@ -128,13 +124,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti return false, err } - f.logg.Debug("transfer_filter: new reg", "mintTo", to) - transferEvent := &pub.MinimalTxInfo{ Block: transaction.Block.Number, From: celoutils.ChecksumAddress(transaction.From.Address), To: to.Hex(), - ContractAddress: transaction.To.Address, + ContractAddress: celoutils.ChecksumAddress(transaction.To.Address), TxHash: transaction.Hash, TxIndex: transaction.Index, Value: value.Uint64(),