diff --git a/.env.example b/.env.example index 90d70fc..675abb3 100644 --- a/.env.example +++ b/.env.example @@ -1,5 +1,9 @@ EVENTS_CHAIN__GRAPHQL_ENDPOINT= EVENTS_CHAIN__WS_ENDPOINT= +EVENTS_CHAIN__SYSTEM_ADDRESS= +EVENTS_CHAIN__TOKEN_INDEX_ADDRESS= +EVENTS_CHAIN__GAS_FAUCET_ADDRESS= +EVENTS_CHAIN__USER_INDEX_ADDRESS= EVENTS_SYNCER__INITIAL_LOWER_BOUND= EVENTS_POSTGRES__DSN= EVENTS_JETSTREAM__ENDPOINT= diff --git a/cmd/service/filters.go b/cmd/service/filters.go index b26bd27..27de989 100644 --- a/cmd/service/filters.go +++ b/cmd/service/filters.go @@ -9,7 +9,7 @@ import ( ) var ( - systemAddress = strings.ToLower("0x3D85285e39f05773aC92EAD27CB50a4385A529E4") + systemAddress = ko.MustString("chain.system_address") ) func initAddressFilter() filter.Filter { @@ -17,14 +17,14 @@ func initAddressFilter() filter.Filter { // TODO: Add route to update cache cache := &sync.Map{} - // Example bootstrap addresses - cache.Store(strings.ToLower("0xB92463E2262E700e29c16416270c9Fdfa17934D7"), "TRNVoucher") - cache.Store(strings.ToLower("0xf2a1fc19Ad275A0EAe3445798761FeD1Eea725d5"), "GasFaucet") - cache.Store(strings.ToLower("0x1e041282695C66944BfC53cabce947cf35CEaf87"), "AddressIndex") + cache.Store(strings.ToLower(ko.MustString("chain.token_index_address")), "TokenIndex") + cache.Store(strings.ToLower(ko.MustString("chain.gas_faucet_address")), "GasFaucet") + cache.Store(strings.ToLower(ko.MustString("chain.user_index_address")), "UserIndex") return filter.NewAddressFilter(filter.AddressFilterOpts{ - Cache: cache, - Logg: lo, + Cache: cache, + Logg: lo, + // TODO: Temporary shortcut SystemAddress: systemAddress, }) } diff --git a/config.toml b/config.toml index 7c606fe..82671a0 100644 --- a/config.toml +++ b/config.toml @@ -9,8 +9,12 @@ address = ":5000" # Geth API endpoints [chain] -graphql_endpoint = "" -ws_endpoint = "" +graphql_endpoint = "" +ws_endpoint = "" +system_address = "" +token_index_address = "" +gas_faucet_address = "" +user_index_address = "" # Syncer configs [syncer] diff --git a/go.mod b/go.mod index 653f4ae..a163155 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/VictoriaMetrics/metrics v1.23.1 github.com/alitto/pond v1.8.3 github.com/celo-org/celo-blockchain v1.7.2 - github.com/goccy/go-json v0.10.0 + github.com/grassrootseconomics/celoutils v1.1.1 github.com/grassrootseconomics/w3-celo-patch v0.2.0 github.com/jackc/pgx/v5 v5.3.1 github.com/jackc/tern/v2 v2.0.1 diff --git a/go.sum b/go.sum index 18d69fb..ab338b2 100644 --- a/go.sum +++ b/go.sum @@ -200,8 +200,6 @@ github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= -github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= -github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -269,6 +267,8 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= +github.com/grassrootseconomics/celoutils v1.1.1 h1:REsndvfBkPN8UKOoQFNEGm/sCwKtTm+woYtgMl3bfZ0= +github.com/grassrootseconomics/celoutils v1.1.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q= github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U= github.com/grassrootseconomics/w3-celo-patch v0.2.0/go.mod h1:WhBXNzNIvHmS6B2hAeShs56oa9Azb4jQSrOMKuMdBWw= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= diff --git a/internal/filter/address_filter.go b/internal/filter/address_filter.go index 74c3f7f..c9899d8 100644 --- a/internal/filter/address_filter.go +++ b/internal/filter/address_filter.go @@ -30,7 +30,7 @@ func NewAddressFilter(o AddressFilterOpts) Filter { } } -func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { +func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { if transaction.From.Address == f.systemAddress { return true, nil } diff --git a/internal/filter/address_filter_test.go b/internal/filter/address_filter_test.go index f7a6a16..5fabc67 100644 --- a/internal/filter/address_filter_test.go +++ b/internal/filter/address_filter_test.go @@ -58,7 +58,7 @@ func (s *AddressFilterSuite) TestAddresses() { } for _, test := range tests { - next, err := s.filter.Execute(context.Background(), test.transactionData) + next, err := s.filter.Execute(context.Background(), &test.transactionData) s.NoError(err) s.Equal(test.want, next) } diff --git a/internal/filter/filter.go b/internal/filter/filter.go index 4006a94..77d340c 100644 --- a/internal/filter/filter.go +++ b/internal/filter/filter.go @@ -8,5 +8,5 @@ import ( // Filter defines a read only filter which must return next as true/false or an error type Filter interface { - Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error) + Execute(ctx context.Context, inputTransaction *fetch.Transaction) (next bool, err error) } diff --git a/internal/filter/gas_filter.go b/internal/filter/gas_filter.go index add861c..807bc92 100644 --- a/internal/filter/gas_filter.go +++ b/internal/filter/gas_filter.go @@ -4,6 +4,7 @@ import ( "context" "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-chain-events/internal/pub" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/zerodha/logf" @@ -35,7 +36,7 @@ func NewGasFilter(o GasFilterOpts) Filter { } } -func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { +func (f *GasFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { transferValue, err := hexutil.DecodeUint64(transaction.Value) if err != nil { return false, err @@ -45,7 +46,7 @@ func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (b if transaction.From.Address == f.systemAddress && transferValue > 0 { transferEvent := &pub.MinimalTxInfo{ Block: transaction.Block.Number, - To: transaction.To.Address, + To: celoutils.ChecksumAddress(transaction.To.Address), TxHash: transaction.Hash, TxIndex: transaction.Index, Value: transferValue, diff --git a/internal/filter/register_filter.go b/internal/filter/register_filter.go index a6ddcfe..f76bf49 100644 --- a/internal/filter/register_filter.go +++ b/internal/filter/register_filter.go @@ -4,6 +4,7 @@ import ( "context" "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-chain-events/internal/pub" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/w3-celo-patch" @@ -37,7 +38,7 @@ func NewRegisterFilter(o RegisterFilterOpts) Filter { } } -func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { +func (f *RegisterFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { if len(transaction.InputData) < 10 { return true, nil } @@ -52,7 +53,7 @@ func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transactio addEvent := &pub.MinimalTxInfo{ Block: transaction.Block.Number, ContractAddress: transaction.To.Address, - To: transaction.To.Address, + To: celoutils.ChecksumAddress(transaction.To.Address), TxHash: transaction.Hash, TxIndex: transaction.Index, } diff --git a/internal/filter/transfer_filter.go b/internal/filter/transfer_filter.go index 26c2eeb..1c20020 100644 --- a/internal/filter/transfer_filter.go +++ b/internal/filter/transfer_filter.go @@ -5,6 +5,7 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-chain-events/internal/pub" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/w3-celo-patch" @@ -40,7 +41,7 @@ func NewTransferFilter(o TransferFilterOpts) Filter { } } -func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { +func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { if len(transaction.InputData) < 10 { return true, nil } @@ -60,7 +61,7 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio transferEvent := &pub.MinimalTxInfo{ Block: transaction.Block.Number, - From: transaction.From.Address, + From: celoutils.ChecksumAddress(transaction.From.Address), To: to.Hex(), ContractAddress: transaction.To.Address, TxHash: transaction.Hash, @@ -131,7 +132,7 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio transferEvent := &pub.MinimalTxInfo{ Block: transaction.Block.Number, - From: transaction.From.Address, + From: celoutils.ChecksumAddress(transaction.From.Address), To: to.Hex(), ContractAddress: transaction.To.Address, TxHash: transaction.Hash, diff --git a/internal/filter/transfer_filter_test.go b/internal/filter/transfer_filter_test.go index 092c65f..1642f92 100644 --- a/internal/filter/transfer_filter_test.go +++ b/internal/filter/transfer_filter_test.go @@ -66,7 +66,7 @@ func (s *TransferFilterSuite) TestTranfserInputs() { } for _, test := range tests { - next, err := s.filter.Execute(context.Background(), test.transactionData) + next, err := s.filter.Execute(context.Background(), &test.transactionData) s.NoError(err) s.Equal(test.want, next) } diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index c4ffec4..0032909 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -51,7 +51,7 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error { for _, tx := range fetchResp.Data.Block.Transactions { for _, filter := range md.filters { - next, err := filter.Execute(ctx, tx) + next, err := filter.Execute(ctx, &tx) if err != nil { return err } diff --git a/internal/pub/jetstream.go b/internal/pub/jetstream.go index 43c58e3..36f58b3 100644 --- a/internal/pub/jetstream.go +++ b/internal/pub/jetstream.go @@ -1,9 +1,9 @@ package pub import ( + "encoding/json" "time" - "github.com/goccy/go-json" "github.com/nats-io/nats.go" ) diff --git a/pkg/fetch/graphql.go b/pkg/fetch/graphql.go index 6c406e9..169ffdc 100644 --- a/pkg/fetch/graphql.go +++ b/pkg/fetch/graphql.go @@ -3,11 +3,10 @@ package fetch import ( "bytes" "context" + "encoding/json" "fmt" "net/http" "time" - - "github.com/goccy/go-json" ) const ( @@ -26,6 +25,11 @@ type Graphql struct { func NewGraphqlFetcher(o GraphqlOpts) Fetch { return &Graphql{ httpClient: &http.Client{ + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 60 * time.Second, + }, Timeout: time.Second * 5, }, graphqlEndpoint: o.GraphqlEndpoint, @@ -33,28 +37,26 @@ func NewGraphqlFetcher(o GraphqlOpts) Fetch { } func (f *Graphql) Block(ctx context.Context, blockNumber uint64) (FetchResponse, error) { - var ( - fetchResponse FetchResponse - ) + fetchResponse := FetchResponse{} req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.graphqlEndpoint, bytes.NewBufferString(fmt.Sprintf(graphqlQuery, blockNumber))) if err != nil { - return FetchResponse{}, err + return fetchResponse, err } req.Header.Set("Content-Type", "application/json") resp, err := f.httpClient.Do(req) if err != nil { - return FetchResponse{}, err - } - - if resp.StatusCode >= http.StatusBadRequest { - return FetchResponse{}, fmt.Errorf("error fetching block %s", resp.Status) + return fetchResponse, err } defer resp.Body.Close() + if resp.StatusCode >= http.StatusBadRequest { + return fetchResponse, fmt.Errorf("error fetching block %s", resp.Status) + } + if err := json.NewDecoder(resp.Body).Decode(&fetchResponse); err != nil { - return FetchResponse{}, err + return fetchResponse, err } return fetchResponse, nil