mirror of
https://github.com/grassrootseconomics/cic-chain-events.git
synced 2026-05-27 13:17:56 +02:00
feat: rpc block fetcher, move filters to internal
* move filters to internal folder * rpc block fetcher * add benchmarks: goos: linux goarch: amd64 pkg: github.com/grassrootseconomics/cic-chain-events/pkg/fetch cpu: AMD EPYC Processor Benchmark_RPC Benchmark_RPC/RPC_Block_Fetcher_Benchmark Benchmark_RPC/RPC_Block_Fetcher_Benchmark-4 25 46000646 ns/op 221697 B/op 844 allocs/op Benchmark_GraphQL Benchmark_GraphQL/GraphQL_Block_Fetcher_Benchmark Benchmark_GraphQL/GraphQL_Block_Fetcher_Benchmark-4 56 21219962 ns/op 56686 B/op 94 allocs/op PASS ok github.com/grassrootseconomics/cic-chain-events/pkg/fetch 2.920s
This commit is contained in:
50
pkg/fetch/benchmark_test.go
Normal file
50
pkg/fetch/benchmark_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package fetch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
celo "github.com/grassrootseconomics/cic-celo-sdk"
|
||||
)
|
||||
|
||||
func Benchmark_RPC(b *testing.B) {
|
||||
celoProvider, err := celo.NewProvider(celo.ProviderOpts{
|
||||
ChainId: celo.MainnetChainId,
|
||||
RpcEndpoint: rpcEndpoint,
|
||||
})
|
||||
|
||||
rpc := NewRPCFetcher(RPCOpts{
|
||||
RPCProvider: celoProvider,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
b.Run("RPC_Block_Fetcher_Benchmark", func(b *testing.B) {
|
||||
for n := 0; n < b.N; n++ {
|
||||
_, err := rpc.Block(context.Background(), 14974600)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
b.ReportAllocs()
|
||||
})
|
||||
}
|
||||
|
||||
func Benchmark_GraphQL(b *testing.B) {
|
||||
graphql := NewGraphqlFetcher(GraphqlOpts{
|
||||
GraphqlEndpoint: graphqlEndpoint,
|
||||
})
|
||||
|
||||
b.Run("GraphQL_Block_Fetcher_Benchmark", func(b *testing.B) {
|
||||
for n := 0; n < b.N; n++ {
|
||||
_, err := graphql.Block(context.Background(), 14974600)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
b.ReportAllocs()
|
||||
})
|
||||
|
||||
}
|
||||
@@ -10,7 +10,7 @@ type Fetch interface {
|
||||
// Transaction reprsents a JSON object of all important mined transaction information
|
||||
type Transaction struct {
|
||||
Block struct {
|
||||
Number uint `json:"number"`
|
||||
Number uint64 `json:"number"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
} `json:"block"`
|
||||
Hash string `json:"hash"`
|
||||
@@ -23,8 +23,8 @@ type Transaction struct {
|
||||
} `json:"to"`
|
||||
Value string `json:"value"`
|
||||
InputData string `json:"inputData"`
|
||||
Status uint `json:"status"`
|
||||
GasUsed uint `json:"gasUsed"`
|
||||
Status uint64 `json:"status"`
|
||||
GasUsed uint64 `json:"gasUsed"`
|
||||
}
|
||||
|
||||
// BlockResponse represents a full fetch JSON response
|
||||
|
||||
@@ -3,7 +3,9 @@ package fetch
|
||||
import (
|
||||
"context"
|
||||
"math/big"
|
||||
"strings"
|
||||
|
||||
"github.com/celo-org/celo-blockchain/common/hexutil"
|
||||
"github.com/celo-org/celo-blockchain/core/types"
|
||||
celo "github.com/grassrootseconomics/cic-celo-sdk"
|
||||
"github.com/grassrootseconomics/w3-celo-patch/module/eth"
|
||||
@@ -37,7 +39,7 @@ func (f *RPC) Block(ctx context.Context, blockNumber uint64) (FetchResponse, err
|
||||
ctx,
|
||||
eth.BlockByNumber(big.NewInt(int64(blockNumber))).Returns(&block),
|
||||
); err != nil {
|
||||
return FetchResponse{}, nil
|
||||
return FetchResponse{}, err
|
||||
}
|
||||
|
||||
txCount := len(block.Transactions())
|
||||
@@ -58,7 +60,31 @@ func (f *RPC) Block(ctx context.Context, blockNumber uint64) (FetchResponse, err
|
||||
return FetchResponse{}, nil
|
||||
}
|
||||
|
||||
// TODO: Create FetchResponse
|
||||
for i := 0; i < txCount; i++ {
|
||||
tx := Transaction{}
|
||||
|
||||
tx.Block.Number = block.NumberU64()
|
||||
tx.Block.Timestamp = hexutil.EncodeUint64(block.Time())
|
||||
tx.Hash = txsReceipt[i].TxHash.Hex()
|
||||
tx.Index = txsReceipt[i].TransactionIndex
|
||||
|
||||
from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i])
|
||||
if err != nil {
|
||||
return FetchResponse{}, err
|
||||
}
|
||||
|
||||
tx.From.Address = strings.ToLower(from.Hex())
|
||||
tx.To.Address = strings.ToLower(txs[i].To().Hex())
|
||||
tx.Value = hexutil.EncodeBig(txs[i].Value())
|
||||
tx.InputData = hexutil.Encode(txs[i].Data())
|
||||
tx.Status = txsReceipt[i].Status
|
||||
tx.GasUsed = txsReceipt[i].GasUsed
|
||||
|
||||
fetchResponse.Data.Block.Transactions = append(
|
||||
fetchResponse.Data.Block.Transactions,
|
||||
tx,
|
||||
)
|
||||
}
|
||||
|
||||
return fetchResponse, nil
|
||||
}
|
||||
|
||||
49
pkg/fetch/rpc_test.go
Normal file
49
pkg/fetch/rpc_test.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package fetch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
celo "github.com/grassrootseconomics/cic-celo-sdk"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
var (
|
||||
rpcEndpoint = os.Getenv("TEST_RPC_ENDPOINT")
|
||||
)
|
||||
|
||||
type RPCTestSuite struct {
|
||||
suite.Suite
|
||||
fetch Fetch
|
||||
}
|
||||
|
||||
func (s *RPCTestSuite) SetupSuite() {
|
||||
celoProvider, err := celo.NewProvider(celo.ProviderOpts{
|
||||
ChainId: celo.MainnetChainId,
|
||||
RpcEndpoint: rpcEndpoint,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
s.fetch = NewRPCFetcher(RPCOpts{
|
||||
RPCProvider: celoProvider,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *RPCTestSuite) Test_E2E_Fetch_Existing_Block() {
|
||||
resp, err := s.fetch.Block(context.Background(), 14974600)
|
||||
s.NoError(err)
|
||||
s.Len(resp.Data.Block.Transactions, 3)
|
||||
}
|
||||
|
||||
func (s *RPCTestSuite) Test_E2E_Fetch_Non_Existing_Block() {
|
||||
_, err := s.fetch.Block(context.Background(), 14974600000)
|
||||
s.Error(err)
|
||||
}
|
||||
|
||||
func TestRPCSuite(t *testing.T) {
|
||||
suite.Run(t, new(RPCTestSuite))
|
||||
}
|
||||
@@ -1,34 +0,0 @@
|
||||
package filter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
|
||||
"github.com/zerodha/logf"
|
||||
)
|
||||
|
||||
type AddressFilterOpts struct {
|
||||
Cache *sync.Map
|
||||
Logg logf.Logger
|
||||
}
|
||||
|
||||
type AddressFilter struct {
|
||||
cache *sync.Map
|
||||
logg logf.Logger
|
||||
}
|
||||
|
||||
func NewAddressFilter(o AddressFilterOpts) Filter {
|
||||
return &AddressFilter{
|
||||
cache: o.Cache,
|
||||
logg: o.Logg,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
|
||||
if _, found := f.cache.Load(transaction.To.Address); found {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
@@ -1,77 +0,0 @@
|
||||
package filter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/zerodha/logf"
|
||||
)
|
||||
|
||||
type AddressFilterSuite struct {
|
||||
suite.Suite
|
||||
filter Filter
|
||||
}
|
||||
|
||||
func (s *AddressFilterSuite) SetupSuite() {
|
||||
addressCache := &sync.Map{}
|
||||
|
||||
addressCache.Store("0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f", "")
|
||||
|
||||
logg := logf.New(
|
||||
logf.Opts{
|
||||
Level: logf.DebugLevel,
|
||||
},
|
||||
)
|
||||
|
||||
s.filter = NewAddressFilter(AddressFilterOpts{
|
||||
Cache: addressCache,
|
||||
Logg: logg,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *AddressFilterSuite) TestAddresses() {
|
||||
type testCase struct {
|
||||
transactionData fetch.Transaction
|
||||
want bool
|
||||
wantErr bool
|
||||
}
|
||||
|
||||
// Generated with eth-encode
|
||||
tests := []testCase{
|
||||
{
|
||||
transactionData: fetch.Transaction{
|
||||
To: struct {
|
||||
Address string "json:\"address\""
|
||||
}{
|
||||
Address: "0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f",
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
transactionData: fetch.Transaction{
|
||||
To: struct {
|
||||
Address string "json:\"address\""
|
||||
}{
|
||||
Address: "0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9x",
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
next, err := s.filter.Execute(context.Background(), test.transactionData)
|
||||
s.NoError(err)
|
||||
s.Equal(test.want, next)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddressFilterSuite(t *testing.T) {
|
||||
suite.Run(t, new(AddressFilterSuite))
|
||||
}
|
||||
@@ -1,92 +0,0 @@
|
||||
package filter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/big"
|
||||
|
||||
"github.com/celo-org/celo-blockchain/common"
|
||||
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
|
||||
"github.com/grassrootseconomics/w3-celo-patch"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/zerodha/logf"
|
||||
)
|
||||
|
||||
var (
|
||||
transferSig = w3.MustNewFunc("transfer(address, uint256)", "bool")
|
||||
transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool")
|
||||
mintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool")
|
||||
)
|
||||
|
||||
type DecodeFilterOpts struct {
|
||||
Logg logf.Logger
|
||||
JSCtx nats.JetStreamContext
|
||||
}
|
||||
|
||||
type DecodeFilter struct {
|
||||
logg logf.Logger
|
||||
js nats.JetStreamContext
|
||||
}
|
||||
|
||||
func NewDecodeFilter(o DecodeFilterOpts) Filter {
|
||||
return &DecodeFilter{
|
||||
logg: o.Logg,
|
||||
js: o.JSCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
|
||||
switch transaction.InputData[:10] {
|
||||
case "0xa9059cbb":
|
||||
var (
|
||||
to common.Address
|
||||
value big.Int
|
||||
)
|
||||
|
||||
if err := transferSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
_, err := f.js.Publish("CHAIN.transfer", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case "0x23b872dd":
|
||||
var (
|
||||
from common.Address
|
||||
to common.Address
|
||||
value big.Int
|
||||
)
|
||||
|
||||
if err := transferFromSig.DecodeArgs(w3.B(transaction.InputData), &from, &to, &value); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
_, err := f.js.Publish("CHAIN.transferFrom", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case "0x449a52f8":
|
||||
var (
|
||||
to common.Address
|
||||
value big.Int
|
||||
)
|
||||
|
||||
if err := mintToSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
_, err := f.js.Publish("CHAIN.mintTo", []byte(transaction.Hash), nats.MsgId(transaction.Hash))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
default:
|
||||
f.logg.Debug("unknownSignature", "inpuData", transaction.InputData)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
@@ -1,77 +0,0 @@
|
||||
package filter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/zerodha/logf"
|
||||
)
|
||||
|
||||
type DecodeFilterSuite struct {
|
||||
suite.Suite
|
||||
filter Filter
|
||||
}
|
||||
|
||||
func (s *DecodeFilterSuite) SetupSuite() {
|
||||
logg := logf.New(
|
||||
logf.Opts{
|
||||
Level: logf.DebugLevel,
|
||||
},
|
||||
)
|
||||
|
||||
s.filter = NewDecodeFilter(DecodeFilterOpts{
|
||||
Logg: logg,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *DecodeFilterSuite) TestTranfserInputs() {
|
||||
type testCase struct {
|
||||
transactionData fetch.Transaction
|
||||
want bool
|
||||
wantErr bool
|
||||
}
|
||||
|
||||
// Generated with eth-encode
|
||||
tests := []testCase{
|
||||
{
|
||||
transactionData: fetch.Transaction{
|
||||
InputData: "0xa9059cbb000000000000000000000000000000000000000000000000000000000000dEaD00000000000000000000000000000000000000000000000000000000000003e8",
|
||||
},
|
||||
want: true,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
transactionData: fetch.Transaction{
|
||||
InputData: "0x23b872dd000000000000000000000000000000000000000000000000000000000000dEaD000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000003e8",
|
||||
},
|
||||
want: true,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
transactionData: fetch.Transaction{
|
||||
InputData: "0x449a52f8000000000000000000000000000000000000000000000000000000000000dEaD00000000000000000000000000000000000000000000000000000000000003e8",
|
||||
},
|
||||
want: true,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
transactionData: fetch.Transaction{
|
||||
InputData: "0x8d72ec9d000000000000000000000000000000000000000000000000000000000000dEaD00000000000000000000000000000000000000000000000000000000000003e8",
|
||||
},
|
||||
want: false,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
next, err := s.filter.Execute(context.Background(), test.transactionData)
|
||||
s.NoError(err)
|
||||
s.Equal(test.want, next)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeFilterSuite(t *testing.T) {
|
||||
suite.Run(t, new(DecodeFilterSuite))
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
package filter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
|
||||
)
|
||||
|
||||
// Filter defines a read only filter which must return next as true/false or an error
|
||||
type Filter interface {
|
||||
Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error)
|
||||
}
|
||||
Reference in New Issue
Block a user