feat: RPC block fetcher (#15)

* feat: init base logic for rpc fetcher

* feat: rpc block fetcher, move filters to internal

* move filters to internal folder
* rpc block fetcher
* add benchmarks:

goos: linux
goarch: amd64
pkg: github.com/grassrootseconomics/cic-chain-events/pkg/fetch
cpu: AMD EPYC Processor
Benchmark_RPC
Benchmark_RPC/RPC_Block_Fetcher_Benchmark
Benchmark_RPC/RPC_Block_Fetcher_Benchmark-4                   25          46000646 ns/op          221697 B/op        844 allocs/op
Benchmark_GraphQL
Benchmark_GraphQL/GraphQL_Block_Fetcher_Benchmark
Benchmark_GraphQL/GraphQL_Block_Fetcher_Benchmark-4           56          21219962 ns/op           56686 B/op         94 allocs/op
PASS
ok      github.com/grassrootseconomics/cic-chain-events/pkg/fetch       2.920s

* inline-docs: Describe RPC fetcher
This commit is contained in:
Mohamed Sohail 2023-01-19 11:42:59 +03:00 committed by GitHub
parent ef9f2b2b7f
commit 20fc30c34a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 208 additions and 6 deletions

View File

@ -16,6 +16,9 @@ mod:
test-pkg:
TEST_GRAPHQL_ENDPOINT=https://rpc.alfajores.celo.grassecon.net/graphql go test -v -covermode atomic -coverprofile=covprofile ./pkg/...
bench-pkg:
TEST_RPC_ENDPOINT=https://rpc.alfajores.celo.grassecon.net TEST_GRAPHQL_ENDPOINT=https://rpc.alfajores.celo.grassecon.net/graphql go test -v -bench=. -run=^Benchmark ./pkg/...
migrate:
tern migrate -c migrations/tern.conf

View File

@ -4,7 +4,7 @@ import (
"strings"
"sync"
"github.com/grassrootseconomics/cic-chain-events/pkg/filter"
"github.com/grassrootseconomics/cic-chain-events/internal/filter"
)
func initAddressFilter() filter.Filter {

View File

@ -13,7 +13,7 @@ import (
"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
"github.com/grassrootseconomics/cic-chain-events/internal/pool"
"github.com/grassrootseconomics/cic-chain-events/internal/syncer"
"github.com/grassrootseconomics/cic-chain-events/pkg/filter"
"github.com/grassrootseconomics/cic-chain-events/internal/filter"
"github.com/knadh/goyesql/v2"
"github.com/knadh/koanf"
"github.com/zerodha/logf"

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/alitto/pond v1.8.2
github.com/celo-org/celo-blockchain v1.6.1
github.com/goccy/go-json v0.10.0
github.com/grassrootseconomics/cic-celo-sdk v0.3.1
github.com/grassrootseconomics/w3-celo-patch v0.1.0
github.com/jackc/pgx/v5 v5.2.0
github.com/knadh/goyesql/v2 v2.2.0

2
go.sum
View File

@ -248,6 +248,8 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/grassrootseconomics/cic-celo-sdk v0.3.1 h1:SzmMFrqxSIdgePqwbUdoS3PNP82MFnlOecycVk2ZYWg=
github.com/grassrootseconomics/cic-celo-sdk v0.3.1/go.mod h1:EiR6d03GYu6jlVKNL1MbTAw/bqAW2WP3J/lkrZxPMdU=
github.com/grassrootseconomics/w3-celo-patch v0.1.0 h1:0fev2hYkGEyFX2D4oUG8yy4jXhtHv7qUtLLboXL5ycw=
github.com/grassrootseconomics/w3-celo-patch v0.1.0/go.mod h1:JtkXc+yDUiQQJdhYTqddZI/itdYGHY7H8PNZzBo4hCk=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=

View File

@ -3,9 +3,9 @@ package pipeline
import (
"context"
"github.com/grassrootseconomics/cic-chain-events/internal/filter"
"github.com/grassrootseconomics/cic-chain-events/internal/store"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/cic-chain-events/pkg/filter"
"github.com/jackc/pgx/v5"
"github.com/zerodha/logf"
)

View File

@ -0,0 +1,50 @@
package fetch
import (
"context"
"testing"
celo "github.com/grassrootseconomics/cic-celo-sdk"
)
func Benchmark_RPC(b *testing.B) {
celoProvider, err := celo.NewProvider(celo.ProviderOpts{
ChainId: celo.MainnetChainId,
RpcEndpoint: rpcEndpoint,
})
rpc := NewRPCFetcher(RPCOpts{
RPCProvider: celoProvider,
})
if err != nil {
return
}
b.Run("RPC_Block_Fetcher_Benchmark", func(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := rpc.Block(context.Background(), 14974600)
if err != nil {
b.Fatal(err)
}
}
b.ReportAllocs()
})
}
func Benchmark_GraphQL(b *testing.B) {
graphql := NewGraphqlFetcher(GraphqlOpts{
GraphqlEndpoint: graphqlEndpoint,
})
b.Run("GraphQL_Block_Fetcher_Benchmark", func(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := graphql.Block(context.Background(), 14974600)
if err != nil {
b.Fatal(err)
}
}
b.ReportAllocs()
})
}

View File

@ -10,7 +10,7 @@ type Fetch interface {
// Transaction reprsents a JSON object of all important mined transaction information
type Transaction struct {
Block struct {
Number uint `json:"number"`
Number uint64 `json:"number"`
Timestamp string `json:"timestamp"`
} `json:"block"`
Hash string `json:"hash"`
@ -23,8 +23,8 @@ type Transaction struct {
} `json:"to"`
Value string `json:"value"`
InputData string `json:"inputData"`
Status uint `json:"status"`
GasUsed uint `json:"gasUsed"`
Status uint64 `json:"status"`
GasUsed uint64 `json:"gasUsed"`
}
// BlockResponse represents a full fetch JSON response

97
pkg/fetch/rpc.go Normal file
View File

@ -0,0 +1,97 @@
package fetch
import (
"context"
"math/big"
"strings"
"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/celo-org/celo-blockchain/core/types"
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/grassrootseconomics/w3-celo-patch/module/eth"
"github.com/grassrootseconomics/w3-celo-patch/w3types"
)
// RPCOpts reprsents the required paramters for an RPC fetcher.
type RPCOpts struct {
RPCProvider *celo.Provider
}
// RPC is a RPC based block and transaction fetcher.
type RPC struct {
provider *celo.Provider
}
// NewRPCFetcher returns a new RPC fetcher which implemnts Fetch.
// Note: No rate limiting feeature.
func NewRPCFetcher(o RPCOpts) Fetch {
return &RPC{
provider: o.RPCProvider,
}
}
// Block fetches via RPC and transforms the response to adapt to the GraphQL JSON response struct.
func (f *RPC) Block(ctx context.Context, blockNumber uint64) (FetchResponse, error) {
var (
block types.Block
fetchResponse FetchResponse
)
if err := f.provider.Client.CallCtx(
ctx,
eth.BlockByNumber(big.NewInt(int64(blockNumber))).Returns(&block),
); err != nil {
return fetchResponse, err
}
txCount := len(block.Transactions())
batchCalls := make([]w3types.Caller, txCount*2)
txs := make([]types.Transaction, txCount)
txsReceipt := make([]types.Receipt, txCount)
// Prepare batch calls.
for i, tx := range block.Transactions() {
batchCalls[i] = eth.Tx(tx.Hash()).Returns(&txs[i])
batchCalls[txCount+i] = eth.TxReceipt(tx.Hash()).Returns(&txsReceipt[i])
}
if err := f.provider.Client.CallCtx(
ctx,
batchCalls...,
); err != nil {
return fetchResponse, err
}
// Transform response and adapt to FetchResponse.
for i := 0; i < txCount; i++ {
var txObject Transaction
txObject.Block.Number = block.NumberU64()
txObject.Block.Timestamp = hexutil.EncodeUint64(block.Time())
from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i])
if err != nil {
return fetchResponse, err
}
txObject.From.Address = strings.ToLower(from.Hex())
// This check ignores contract deployment transactions.
if txs[i].To() != nil {
txObject.To.Address = strings.ToLower(txs[i].To().Hex())
}
txObject.Value = hexutil.EncodeBig(txs[i].Value())
txObject.InputData = hexutil.Encode(txs[i].Data())
txObject.Hash = txsReceipt[i].TxHash.Hex()
txObject.Index = txsReceipt[i].TransactionIndex
txObject.Status = txsReceipt[i].Status
txObject.GasUsed = txsReceipt[i].GasUsed
fetchResponse.Data.Block.Transactions = append(
fetchResponse.Data.Block.Transactions,
txObject,
)
}
return fetchResponse, nil
}

49
pkg/fetch/rpc_test.go Normal file
View File

@ -0,0 +1,49 @@
package fetch
import (
"context"
"os"
"testing"
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/stretchr/testify/suite"
)
var (
rpcEndpoint = os.Getenv("TEST_RPC_ENDPOINT")
)
type RPCTestSuite struct {
suite.Suite
fetch Fetch
}
func (s *RPCTestSuite) SetupSuite() {
celoProvider, err := celo.NewProvider(celo.ProviderOpts{
ChainId: celo.MainnetChainId,
RpcEndpoint: rpcEndpoint,
})
if err != nil {
return
}
s.fetch = NewRPCFetcher(RPCOpts{
RPCProvider: celoProvider,
})
}
func (s *RPCTestSuite) Test_E2E_Fetch_Existing_Block() {
resp, err := s.fetch.Block(context.Background(), 14974600)
s.NoError(err)
s.Len(resp.Data.Block.Transactions, 3)
}
func (s *RPCTestSuite) Test_E2E_Fetch_Non_Existing_Block() {
_, err := s.fetch.Block(context.Background(), 14974600000)
s.Error(err)
}
func TestRPCSuite(t *testing.T) {
suite.Run(t, new(RPCTestSuite))
}