From 20fc30c34a97b3f0f323144c33b7586c6d252eec Mon Sep 17 00:00:00 2001 From: Mohamed Sohail Date: Thu, 19 Jan 2023 11:42:59 +0300 Subject: [PATCH] feat: RPC block fetcher (#15) * feat: init base logic for rpc fetcher * feat: rpc block fetcher, move filters to internal * move filters to internal folder * rpc block fetcher * add benchmarks: goos: linux goarch: amd64 pkg: github.com/grassrootseconomics/cic-chain-events/pkg/fetch cpu: AMD EPYC Processor Benchmark_RPC Benchmark_RPC/RPC_Block_Fetcher_Benchmark Benchmark_RPC/RPC_Block_Fetcher_Benchmark-4 25 46000646 ns/op 221697 B/op 844 allocs/op Benchmark_GraphQL Benchmark_GraphQL/GraphQL_Block_Fetcher_Benchmark Benchmark_GraphQL/GraphQL_Block_Fetcher_Benchmark-4 56 21219962 ns/op 56686 B/op 94 allocs/op PASS ok github.com/grassrootseconomics/cic-chain-events/pkg/fetch 2.920s * inline-docs: Describe RPC fetcher --- Makefile | 3 + cmd/filters.go | 2 +- cmd/main.go | 2 +- go.mod | 1 + go.sum | 2 + {pkg => internal}/filter/address_filter.go | 0 .../filter/address_filter_test.go | 0 {pkg => internal}/filter/decode_filter.go | 0 .../filter/decode_filter_test.go | 0 {pkg => internal}/filter/filter.go | 0 internal/pipeline/pipeline.go | 2 +- pkg/fetch/benchmark_test.go | 50 ++++++++++ pkg/fetch/fetch.go | 6 +- pkg/fetch/rpc.go | 97 +++++++++++++++++++ pkg/fetch/rpc_test.go | 49 ++++++++++ 15 files changed, 208 insertions(+), 6 deletions(-) rename {pkg => internal}/filter/address_filter.go (100%) rename {pkg => internal}/filter/address_filter_test.go (100%) rename {pkg => internal}/filter/decode_filter.go (100%) rename {pkg => internal}/filter/decode_filter_test.go (100%) rename {pkg => internal}/filter/filter.go (100%) create mode 100644 pkg/fetch/benchmark_test.go create mode 100644 pkg/fetch/rpc.go create mode 100644 pkg/fetch/rpc_test.go diff --git a/Makefile b/Makefile index 8a7227e..e6edbf0 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,9 @@ mod: test-pkg: TEST_GRAPHQL_ENDPOINT=https://rpc.alfajores.celo.grassecon.net/graphql go test -v -covermode atomic -coverprofile=covprofile ./pkg/... +bench-pkg: + TEST_RPC_ENDPOINT=https://rpc.alfajores.celo.grassecon.net TEST_GRAPHQL_ENDPOINT=https://rpc.alfajores.celo.grassecon.net/graphql go test -v -bench=. -run=^Benchmark ./pkg/... + migrate: tern migrate -c migrations/tern.conf diff --git a/cmd/filters.go b/cmd/filters.go index db6aa59..beaa591 100644 --- a/cmd/filters.go +++ b/cmd/filters.go @@ -4,7 +4,7 @@ import ( "strings" "sync" - "github.com/grassrootseconomics/cic-chain-events/pkg/filter" + "github.com/grassrootseconomics/cic-chain-events/internal/filter" ) func initAddressFilter() filter.Filter { diff --git a/cmd/main.go b/cmd/main.go index ba6fa26..4091832 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,7 +13,7 @@ import ( "github.com/grassrootseconomics/cic-chain-events/internal/pipeline" "github.com/grassrootseconomics/cic-chain-events/internal/pool" "github.com/grassrootseconomics/cic-chain-events/internal/syncer" - "github.com/grassrootseconomics/cic-chain-events/pkg/filter" + "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/knadh/goyesql/v2" "github.com/knadh/koanf" "github.com/zerodha/logf" diff --git a/go.mod b/go.mod index d3f0fec..434edeb 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/alitto/pond v1.8.2 github.com/celo-org/celo-blockchain v1.6.1 github.com/goccy/go-json v0.10.0 + github.com/grassrootseconomics/cic-celo-sdk v0.3.1 github.com/grassrootseconomics/w3-celo-patch v0.1.0 github.com/jackc/pgx/v5 v5.2.0 github.com/knadh/goyesql/v2 v2.2.0 diff --git a/go.sum b/go.sum index 833b213..bbefb0d 100644 --- a/go.sum +++ b/go.sum @@ -248,6 +248,8 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= +github.com/grassrootseconomics/cic-celo-sdk v0.3.1 h1:SzmMFrqxSIdgePqwbUdoS3PNP82MFnlOecycVk2ZYWg= +github.com/grassrootseconomics/cic-celo-sdk v0.3.1/go.mod h1:EiR6d03GYu6jlVKNL1MbTAw/bqAW2WP3J/lkrZxPMdU= github.com/grassrootseconomics/w3-celo-patch v0.1.0 h1:0fev2hYkGEyFX2D4oUG8yy4jXhtHv7qUtLLboXL5ycw= github.com/grassrootseconomics/w3-celo-patch v0.1.0/go.mod h1:JtkXc+yDUiQQJdhYTqddZI/itdYGHY7H8PNZzBo4hCk= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= diff --git a/pkg/filter/address_filter.go b/internal/filter/address_filter.go similarity index 100% rename from pkg/filter/address_filter.go rename to internal/filter/address_filter.go diff --git a/pkg/filter/address_filter_test.go b/internal/filter/address_filter_test.go similarity index 100% rename from pkg/filter/address_filter_test.go rename to internal/filter/address_filter_test.go diff --git a/pkg/filter/decode_filter.go b/internal/filter/decode_filter.go similarity index 100% rename from pkg/filter/decode_filter.go rename to internal/filter/decode_filter.go diff --git a/pkg/filter/decode_filter_test.go b/internal/filter/decode_filter_test.go similarity index 100% rename from pkg/filter/decode_filter_test.go rename to internal/filter/decode_filter_test.go diff --git a/pkg/filter/filter.go b/internal/filter/filter.go similarity index 100% rename from pkg/filter/filter.go rename to internal/filter/filter.go diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index c85898d..78872ff 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -3,9 +3,9 @@ package pipeline import ( "context" + "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/store" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" - "github.com/grassrootseconomics/cic-chain-events/pkg/filter" "github.com/jackc/pgx/v5" "github.com/zerodha/logf" ) diff --git a/pkg/fetch/benchmark_test.go b/pkg/fetch/benchmark_test.go new file mode 100644 index 0000000..910ef54 --- /dev/null +++ b/pkg/fetch/benchmark_test.go @@ -0,0 +1,50 @@ +package fetch + +import ( + "context" + "testing" + + celo "github.com/grassrootseconomics/cic-celo-sdk" +) + +func Benchmark_RPC(b *testing.B) { + celoProvider, err := celo.NewProvider(celo.ProviderOpts{ + ChainId: celo.MainnetChainId, + RpcEndpoint: rpcEndpoint, + }) + + rpc := NewRPCFetcher(RPCOpts{ + RPCProvider: celoProvider, + }) + + if err != nil { + return + } + + b.Run("RPC_Block_Fetcher_Benchmark", func(b *testing.B) { + for n := 0; n < b.N; n++ { + _, err := rpc.Block(context.Background(), 14974600) + if err != nil { + b.Fatal(err) + } + } + b.ReportAllocs() + }) +} + +func Benchmark_GraphQL(b *testing.B) { + graphql := NewGraphqlFetcher(GraphqlOpts{ + GraphqlEndpoint: graphqlEndpoint, + }) + + b.Run("GraphQL_Block_Fetcher_Benchmark", func(b *testing.B) { + for n := 0; n < b.N; n++ { + _, err := graphql.Block(context.Background(), 14974600) + if err != nil { + b.Fatal(err) + } + } + b.ReportAllocs() + }) + +} diff --git a/pkg/fetch/fetch.go b/pkg/fetch/fetch.go index 5eb6c50..8539d74 100644 --- a/pkg/fetch/fetch.go +++ b/pkg/fetch/fetch.go @@ -10,7 +10,7 @@ type Fetch interface { // Transaction reprsents a JSON object of all important mined transaction information type Transaction struct { Block struct { - Number uint `json:"number"` + Number uint64 `json:"number"` Timestamp string `json:"timestamp"` } `json:"block"` Hash string `json:"hash"` @@ -23,8 +23,8 @@ type Transaction struct { } `json:"to"` Value string `json:"value"` InputData string `json:"inputData"` - Status uint `json:"status"` - GasUsed uint `json:"gasUsed"` + Status uint64 `json:"status"` + GasUsed uint64 `json:"gasUsed"` } // BlockResponse represents a full fetch JSON response diff --git a/pkg/fetch/rpc.go b/pkg/fetch/rpc.go new file mode 100644 index 0000000..f3f61f8 --- /dev/null +++ b/pkg/fetch/rpc.go @@ -0,0 +1,97 @@ +package fetch + +import ( + "context" + "math/big" + "strings" + + "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/celo-org/celo-blockchain/core/types" + celo "github.com/grassrootseconomics/cic-celo-sdk" + "github.com/grassrootseconomics/w3-celo-patch/module/eth" + "github.com/grassrootseconomics/w3-celo-patch/w3types" +) + +// RPCOpts reprsents the required paramters for an RPC fetcher. +type RPCOpts struct { + RPCProvider *celo.Provider +} + +// RPC is a RPC based block and transaction fetcher. +type RPC struct { + provider *celo.Provider +} + +// NewRPCFetcher returns a new RPC fetcher which implemnts Fetch. +// Note: No rate limiting feeature. +func NewRPCFetcher(o RPCOpts) Fetch { + return &RPC{ + provider: o.RPCProvider, + } +} + +// Block fetches via RPC and transforms the response to adapt to the GraphQL JSON response struct. +func (f *RPC) Block(ctx context.Context, blockNumber uint64) (FetchResponse, error) { + var ( + block types.Block + fetchResponse FetchResponse + ) + + if err := f.provider.Client.CallCtx( + ctx, + eth.BlockByNumber(big.NewInt(int64(blockNumber))).Returns(&block), + ); err != nil { + return fetchResponse, err + } + + txCount := len(block.Transactions()) + batchCalls := make([]w3types.Caller, txCount*2) + + txs := make([]types.Transaction, txCount) + txsReceipt := make([]types.Receipt, txCount) + + // Prepare batch calls. + for i, tx := range block.Transactions() { + batchCalls[i] = eth.Tx(tx.Hash()).Returns(&txs[i]) + batchCalls[txCount+i] = eth.TxReceipt(tx.Hash()).Returns(&txsReceipt[i]) + } + + if err := f.provider.Client.CallCtx( + ctx, + batchCalls..., + ); err != nil { + return fetchResponse, err + } + + // Transform response and adapt to FetchResponse. + for i := 0; i < txCount; i++ { + var txObject Transaction + + txObject.Block.Number = block.NumberU64() + txObject.Block.Timestamp = hexutil.EncodeUint64(block.Time()) + + from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i]) + if err != nil { + return fetchResponse, err + } + txObject.From.Address = strings.ToLower(from.Hex()) + // This check ignores contract deployment transactions. + if txs[i].To() != nil { + txObject.To.Address = strings.ToLower(txs[i].To().Hex()) + } + txObject.Value = hexutil.EncodeBig(txs[i].Value()) + txObject.InputData = hexutil.Encode(txs[i].Data()) + + txObject.Hash = txsReceipt[i].TxHash.Hex() + txObject.Index = txsReceipt[i].TransactionIndex + txObject.Status = txsReceipt[i].Status + txObject.GasUsed = txsReceipt[i].GasUsed + + fetchResponse.Data.Block.Transactions = append( + fetchResponse.Data.Block.Transactions, + txObject, + ) + } + + return fetchResponse, nil +} diff --git a/pkg/fetch/rpc_test.go b/pkg/fetch/rpc_test.go new file mode 100644 index 0000000..e586139 --- /dev/null +++ b/pkg/fetch/rpc_test.go @@ -0,0 +1,49 @@ +package fetch + +import ( + "context" + "os" + "testing" + + celo "github.com/grassrootseconomics/cic-celo-sdk" + "github.com/stretchr/testify/suite" +) + +var ( + rpcEndpoint = os.Getenv("TEST_RPC_ENDPOINT") +) + +type RPCTestSuite struct { + suite.Suite + fetch Fetch +} + +func (s *RPCTestSuite) SetupSuite() { + celoProvider, err := celo.NewProvider(celo.ProviderOpts{ + ChainId: celo.MainnetChainId, + RpcEndpoint: rpcEndpoint, + }) + + if err != nil { + return + } + + s.fetch = NewRPCFetcher(RPCOpts{ + RPCProvider: celoProvider, + }) +} + +func (s *RPCTestSuite) Test_E2E_Fetch_Existing_Block() { + resp, err := s.fetch.Block(context.Background(), 14974600) + s.NoError(err) + s.Len(resp.Data.Block.Transactions, 3) +} + +func (s *RPCTestSuite) Test_E2E_Fetch_Non_Existing_Block() { + _, err := s.fetch.Block(context.Background(), 14974600000) + s.Error(err) +} + +func TestRPCSuite(t *testing.T) { + suite.Run(t, new(RPCTestSuite)) +}