From 9d1b77e907c083a2643ce915a485cad04e9d3cf9 Mon Sep 17 00:00:00 2001 From: Mohamed Sohail Date: Fri, 24 Feb 2023 13:28:30 +0300 Subject: [PATCH] refactor: filters, pipeline, minor fixes, remove RPC support (#27) * devnet: snapshot * refactor: pass struct through pipeline fllters * refactor: replace timer with ticker * refactor: filters, jetstream emitter * add register filter * update gas filter * refactor: remove RPC fetcher support --- cmd/filters.go | 43 +++-- cmd/init.go | 35 ++-- cmd/main.go | 11 +- config.toml | 28 ++- go.mod | 1 - go.sum | 2 - internal/events/events.go | 17 ++ internal/events/jetstream.go | 78 +++++++++ internal/filter/address_filter.go | 21 ++- internal/filter/address_filter_test.go | 10 +- internal/filter/decode_filter.go | 161 ------------------ internal/filter/filter.go | 2 +- internal/filter/gas_filter.go | 68 ++++++++ internal/filter/register_filter.go | 74 ++++++++ internal/filter/transfer_filter.go | 156 +++++++++++++++++ ...filter_test.go => transfer_filter_test.go} | 14 +- internal/pipeline/pipeline.go | 2 +- internal/syncer/janitor.go | 6 +- pkg/fetch/benchmark_test.go | 50 ------ pkg/fetch/rpc.go | 97 ----------- pkg/fetch/rpc_test.go | 49 ------ 21 files changed, 478 insertions(+), 447 deletions(-) create mode 100644 internal/events/events.go create mode 100644 internal/events/jetstream.go delete mode 100644 internal/filter/decode_filter.go create mode 100644 internal/filter/gas_filter.go create mode 100644 internal/filter/register_filter.go create mode 100644 internal/filter/transfer_filter.go rename internal/filter/{decode_filter_test.go => transfer_filter_test.go} (82%) delete mode 100644 pkg/fetch/benchmark_test.go delete mode 100644 pkg/fetch/rpc.go delete mode 100644 pkg/fetch/rpc_test.go diff --git a/cmd/filters.go b/cmd/filters.go index beaa591..8656835 100644 --- a/cmd/filters.go +++ b/cmd/filters.go @@ -4,33 +4,50 @@ import ( "strings" "sync" + "github.com/grassrootseconomics/cic-chain-events/internal/events" "github.com/grassrootseconomics/cic-chain-events/internal/filter" ) +var ( + systemAddress = strings.ToLower("0x3D85285e39f05773aC92EAD27CB50a4385A529E4") +) + func initAddressFilter() filter.Filter { // TODO: Bootstrap addresses from smart contract // TODO: Add route to update cache cache := &sync.Map{} // Example bootstrap addresses - cache.Store(strings.ToLower("0x617f3112bf5397D0467D315cC709EF968D9ba546"), "USDT") - cache.Store(strings.ToLower("0x765DE816845861e75A25fCA122bb6898B8B1282a"), "cUSD") - cache.Store(strings.ToLower("0xD8763CBa276a3738E6DE85b4b3bF5FDed6D6cA73"), "cEUR") + cache.Store(strings.ToLower("0xB92463E2262E700e29c16416270c9Fdfa17934D7"), "TRNVoucher") + cache.Store(strings.ToLower("0xf2a1fc19Ad275A0EAe3445798761FeD1Eea725d5"), "GasFaucet") + cache.Store(strings.ToLower("0x1e041282695C66944BfC53cabce947cf35CEaf87"), "AddressIndex") return filter.NewAddressFilter(filter.AddressFilterOpts{ - Cache: cache, - Logg: lo, + Cache: cache, + Logg: lo, + SystemAddress: systemAddress, }) } -func initDecodeFilter() filter.Filter { - js, err := initJetStream() - if err != nil { - lo.Fatal("filters: critical error loading jetstream", "error", err) - } +func initTransferFilter(eventEmitter events.EventEmitter) filter.Filter { + return filter.NewTransferFilter(filter.TransferFilterOpts{ + EventEmitter: eventEmitter, + Logg: lo, + }) - return filter.NewDecodeFilter(filter.DecodeFilterOpts{ - Logg: lo, - JSCtx: js, +} + +func initGasGiftFilter(eventEmitter events.EventEmitter) filter.Filter { + return filter.NewGasFilter(filter.GasFilterOpts{ + EventEmitter: eventEmitter, + Logg: lo, + SystemAddress: systemAddress, + }) +} + +func initRegisterFilter(eventEmitter events.EventEmitter) filter.Filter { + return filter.NewRegisterFilter(filter.RegisterFilterOpts{ + EventEmitter: eventEmitter, + Logg: lo, }) } diff --git a/cmd/init.go b/cmd/init.go index c10f605..e085194 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -4,6 +4,7 @@ import ( "strings" "time" + "github.com/grassrootseconomics/cic-chain-events/internal/events" "github.com/grassrootseconomics/cic-chain-events/internal/store" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/jackc/pgx/v5" @@ -12,7 +13,6 @@ import ( "github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/file" - "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) @@ -23,6 +23,7 @@ func initLogger(debug bool) logf.Logger { if debug { loggOpts.Level = logf.DebugLevel + loggOpts.EnableCaller = true } return logf.New(loggOpts) @@ -77,32 +78,16 @@ func initFetcher() fetch.Fetch { }) } -func initJetStream() (nats.JetStreamContext, error) { - natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint")) +func initJetStream() (events.EventEmitter, error) { + jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{ + ServerUrl: ko.MustString("jetstream.endpoint"), + PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour, + DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour, + }) + if err != nil { return nil, err } - js, err := natsConn.JetStream() - if err != nil { - return nil, err - } - - // Bootstrap stream if it does not exist - stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name")) - if stream == nil { - lo.Info("jetstream: bootstrapping stream") - _, err = js.AddStream(&nats.StreamConfig{ - Name: ko.MustString("jetstream.stream_name"), - MaxAge: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour, - Storage: nats.FileStorage, - Subjects: ko.MustStrings("jetstream.stream_subjects"), - Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour, - }) - if err != nil { - return nil, err - } - } - - return js, nil + return jsEmitter, nil } diff --git a/cmd/main.go b/cmd/main.go index 4091832..e8e31bb 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,10 +10,10 @@ import ( "syscall" "time" + "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/pipeline" "github.com/grassrootseconomics/cic-chain-events/internal/pool" "github.com/grassrootseconomics/cic-chain-events/internal/syncer" - "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/knadh/goyesql/v2" "github.com/knadh/koanf" "github.com/zerodha/logf" @@ -58,13 +58,20 @@ func main() { lo.Fatal("main: critical error loading pg store", "error", err) } + jsCtx, err := initJetStream() + if err != nil { + lo.Fatal("main: critical error loading jetstream context", "error", err) + } + graphqlFetcher := initFetcher() pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{ BlockFetcher: graphqlFetcher, Filters: []filter.Filter{ initAddressFilter(), - initDecodeFilter(), + initGasGiftFilter(jsCtx), + initTransferFilter(jsCtx), + initRegisterFilter(jsCtx), }, Logg: lo, Store: pgStore, diff --git a/config.toml b/config.toml index caa2e08..dbfa45b 100644 --- a/config.toml +++ b/config.toml @@ -1,43 +1,35 @@ [metrics] # Exposes Prometheus metrics -# /metrics endpoint go_process = true # API server [api] # Host and port -address = ":8080" +address = ":8085" # Geth API endpoints [chain] -graphql_endpoint = "https://rpc.celo.grassecon.net/graphql" -ws_endpoint = "wss://socket.celo.grassecon.net" +graphql_endpoint = "https://rpc.alfajores.celo.grassecon.net/graphql" +ws_endpoint = "wss://ws.alfajores.celo.grassecon.net" +# Syncer configs [syncer] -# Number of goroutines assigned to the janitor worker pool +# Maximum number of missing blocks pushed into the worker queue every janitor sweep +janitor_queue_size = 500 +# Number of goroutines assigned to the worker pool janitor_concurrency = 5 # Syncer start block -initial_lower_bound = 17269000 -# Max blocks in worker queue awaiting processing -janitor_queue_size = 500 -# Janitor sweep interval +initial_lower_bound = 16373156 +# Janitor sweep interval, should take into account concurrency and queue_size janitor_sweep_interval = 5 [postgres] -# Default is the Docker container DSN dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events" # https://docs.nats.io/ [jetstream] endpoint = "nats://localhost:4222" -stream_name = "CHAIN" -# Duration JetStream should keep the message before remocing it from the persistent store +# Duration JetStream should keep the message before GC persist_duration_hours = 48 # Duration to ignore duplicate transactions (e.g. due to restart) dedup_duration_hours = 6 -# Stream subjects -stream_subjects = [ - "CHAIN.transfer", - "CHAIN.transferFrom", - "CHAIN.mintTo" -] diff --git a/go.mod b/go.mod index 632fd4c..b6c65de 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/alitto/pond v1.8.2 github.com/celo-org/celo-blockchain v1.6.1 github.com/goccy/go-json v0.10.0 - github.com/grassrootseconomics/cic-celo-sdk v0.3.1 github.com/grassrootseconomics/w3-celo-patch v0.1.0 github.com/jackc/pgx/v5 v5.2.0 github.com/knadh/goyesql/v2 v2.2.0 diff --git a/go.sum b/go.sum index ce0bbd9..14f95a8 100644 --- a/go.sum +++ b/go.sum @@ -248,8 +248,6 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= -github.com/grassrootseconomics/cic-celo-sdk v0.3.1 h1:SzmMFrqxSIdgePqwbUdoS3PNP82MFnlOecycVk2ZYWg= -github.com/grassrootseconomics/cic-celo-sdk v0.3.1/go.mod h1:EiR6d03GYu6jlVKNL1MbTAw/bqAW2WP3J/lkrZxPMdU= github.com/grassrootseconomics/w3-celo-patch v0.1.0 h1:0fev2hYkGEyFX2D4oUG8yy4jXhtHv7qUtLLboXL5ycw= github.com/grassrootseconomics/w3-celo-patch v0.1.0/go.mod h1:JtkXc+yDUiQQJdhYTqddZI/itdYGHY7H8PNZzBo4hCk= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 0000000..02c2bc7 --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,17 @@ +package events + +type EventEmitter interface { + Close() + Publish(subject string, dedupId string, eventPayload interface{}) error +} + +type MinimalTxInfo struct { + Block uint64 `json:"block"` + From string `json:"from"` + To string `json:"to"` + ContractAddress string `json:"contractAddress"` + Success bool `json:"success"` + TxHash string `json:"transactionHash"` + TxIndex uint `json:"transactionIndex"` + Value uint64 `json:"value"` +} diff --git a/internal/events/jetstream.go b/internal/events/jetstream.go new file mode 100644 index 0000000..31bbd40 --- /dev/null +++ b/internal/events/jetstream.go @@ -0,0 +1,78 @@ +package events + +import ( + "time" + + "github.com/goccy/go-json" + "github.com/nats-io/nats.go" +) + +const ( + StreamName string = "CHAIN" + StreamSubjects string = "CHAIN.*" +) + +type JetStreamOpts struct { + ServerUrl string + PersistDuration time.Duration + DedupDuration time.Duration +} + +type JetStream struct { + jsCtx nats.JetStreamContext + nc *nats.Conn +} + +func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) { + natsConn, err := nats.Connect(o.ServerUrl) + if err != nil { + return nil, err + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + + // Bootstrap stream if it doesn't exist. + stream, _ := js.StreamInfo(StreamName) + if stream == nil { + _, err = js.AddStream(&nats.StreamConfig{ + Name: StreamName, + MaxAge: o.PersistDuration, + Storage: nats.FileStorage, + Subjects: []string{StreamSubjects}, + Duplicates: o.DedupDuration, + }) + if err != nil { + return nil, err + } + } + + return &JetStream{ + jsCtx: js, + nc: natsConn, + }, nil +} + +// Close gracefully shutdowns the JetStream connection. +func (js *JetStream) Close() { + if js.nc != nil { + js.nc.Close() + } +} + +// Publish publishes the JSON data to the NATS stream. +func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error { + jsonData, err := json.Marshal(eventPayload) + if err != nil { + return err + } + + _, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId)) + if err != nil { + return err + } + + return nil +} diff --git a/internal/filter/address_filter.go b/internal/filter/address_filter.go index 6705826..00f4d74 100644 --- a/internal/filter/address_filter.go +++ b/internal/filter/address_filter.go @@ -9,23 +9,30 @@ import ( ) type AddressFilterOpts struct { - Cache *sync.Map - Logg logf.Logger + Cache *sync.Map + Logg logf.Logger + SystemAddress string } type AddressFilter struct { - cache *sync.Map - logg logf.Logger + cache *sync.Map + logg logf.Logger + systemAddress string } func NewAddressFilter(o AddressFilterOpts) Filter { return &AddressFilter{ - cache: o.Cache, - logg: o.Logg, + cache: o.Cache, + logg: o.Logg, + systemAddress: o.SystemAddress, } } -func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { +func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { + if transaction.From.Address == f.systemAddress { + return true, nil + } + if _, found := f.cache.Load(transaction.To.Address); found { return true, nil } diff --git a/internal/filter/address_filter_test.go b/internal/filter/address_filter_test.go index 45461fb..f7a6a16 100644 --- a/internal/filter/address_filter_test.go +++ b/internal/filter/address_filter_test.go @@ -7,7 +7,6 @@ import ( "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/stretchr/testify/suite" - "github.com/zerodha/logf" ) type AddressFilterSuite struct { @@ -20,15 +19,8 @@ func (s *AddressFilterSuite) SetupSuite() { addressCache.Store("0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f", "") - logg := logf.New( - logf.Opts{ - Level: logf.DebugLevel, - }, - ) - s.filter = NewAddressFilter(AddressFilterOpts{ Cache: addressCache, - Logg: logg, }) } @@ -66,7 +58,7 @@ func (s *AddressFilterSuite) TestAddresses() { } for _, test := range tests { - next, err := s.filter.Execute(context.Background(), &test.transactionData) + next, err := s.filter.Execute(context.Background(), test.transactionData) s.NoError(err) s.Equal(test.want, next) } diff --git a/internal/filter/decode_filter.go b/internal/filter/decode_filter.go deleted file mode 100644 index de4909a..0000000 --- a/internal/filter/decode_filter.go +++ /dev/null @@ -1,161 +0,0 @@ -package filter - -import ( - "context" - "encoding/json" - "math/big" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" - "github.com/grassrootseconomics/w3-celo-patch" - "github.com/nats-io/nats.go" - "github.com/zerodha/logf" -) - -var ( - transferSig = w3.MustNewFunc("transfer(address, uint256)", "bool") - transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool") - mintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool") -) - -type DecodeFilterOpts struct { - Logg logf.Logger - JSCtx nats.JetStreamContext -} - -type DecodeFilter struct { - logg logf.Logger - js nats.JetStreamContext -} - -type minimalTxInfo struct { - Block uint64 `json:"block"` - From string `json:"from"` - Success bool `json:"success"` - To string `json:"to"` - TokenAddress string `json:"tokenAddress"` - TxHash string `json:"transactionHash"` - TxIndex uint `json:"transactionIndex"` - Value uint64 `json:"value"` -} - -func NewDecodeFilter(o DecodeFilterOpts) Filter { - return &DecodeFilter{ - logg: o.Logg, - js: o.JSCtx, - } -} - -func (f *DecodeFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { - switch transaction.InputData[:10] { - case "0xa9059cbb": - var ( - to common.Address - value big.Int - ) - - if err := transferSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil { - return false, err - } - - transferEvent := &minimalTxInfo{ - Block: transaction.Block.Number, - From: transaction.From.Address, - To: to.Hex(), - TokenAddress: transaction.To.Address, - TxHash: transaction.Hash, - TxIndex: transaction.Index, - Value: value.Uint64(), - } - - if transaction.Status == 1 { - transferEvent.Success = true - } - - json, err := json.Marshal(transferEvent) - if err != nil { - return false, err - } - - _, err = f.js.Publish("CHAIN.transfer", json, nats.MsgId(transaction.Hash)) - if err != nil { - return false, err - } - - return true, nil - case "0x23b872dd": - var ( - from common.Address - to common.Address - value big.Int - ) - - if err := transferFromSig.DecodeArgs(w3.B(transaction.InputData), &from, &to, &value); err != nil { - return false, err - } - - transferFromEvent := &minimalTxInfo{ - Block: transaction.Block.Number, - From: from.Hex(), - To: to.Hex(), - TokenAddress: transaction.To.Address, - TxHash: transaction.Hash, - TxIndex: transaction.Index, - Value: value.Uint64(), - } - - if transaction.Status == 1 { - transferFromEvent.Success = true - } - - json, err := json.Marshal(transferFromEvent) - if err != nil { - return false, err - } - - _, err = f.js.Publish("CHAIN.transferFrom", json, nats.MsgId(transaction.Hash)) - if err != nil { - return false, err - } - - return true, nil - case "0x449a52f8": - var ( - to common.Address - value big.Int - ) - - if err := mintToSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil { - return false, err - } - - mintToEvent := &minimalTxInfo{ - Block: transaction.Block.Number, - From: transaction.From.Address, - To: to.Hex(), - TokenAddress: transaction.To.Address, - TxHash: transaction.Hash, - TxIndex: transaction.Index, - Value: value.Uint64(), - } - - if transaction.Status == 1 { - mintToEvent.Success = true - } - - json, err := json.Marshal(mintToEvent) - if err != nil { - return false, err - } - - _, err = f.js.Publish("CHAIN.mintTo", json, nats.MsgId(transaction.Hash)) - if err != nil { - return false, err - } - - return true, nil - default: - f.logg.Debug("unknownSignature", "inpuData", transaction.InputData) - return false, nil - } -} diff --git a/internal/filter/filter.go b/internal/filter/filter.go index 77d340c..4006a94 100644 --- a/internal/filter/filter.go +++ b/internal/filter/filter.go @@ -8,5 +8,5 @@ import ( // Filter defines a read only filter which must return next as true/false or an error type Filter interface { - Execute(ctx context.Context, inputTransaction *fetch.Transaction) (next bool, err error) + Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error) } diff --git a/internal/filter/gas_filter.go b/internal/filter/gas_filter.go new file mode 100644 index 0000000..e164d52 --- /dev/null +++ b/internal/filter/gas_filter.go @@ -0,0 +1,68 @@ +package filter + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/grassrootseconomics/cic-chain-events/internal/events" + "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" + "github.com/zerodha/logf" +) + +const ( + gasFilterEventSubject = "CHAIN.gas" +) + +type GasFilterOpts struct { + EventEmitter events.EventEmitter + Logg logf.Logger + SystemAddress string +} + +type GasFilter struct { + eventEmitter events.EventEmitter + logg logf.Logger + systemAddress string +} + +func NewGasFilter(o GasFilterOpts) Filter { + return &GasFilter{ + eventEmitter: o.EventEmitter, + logg: o.Logg, + systemAddress: o.SystemAddress, + } +} + +func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { + transferValue, err := hexutil.DecodeUint64(transaction.Value) + if err != nil { + return false, err + } + + // TODO: This is a temporary shortcut to gift gas. Switch to gas faucet contract. + if transaction.From.Address == f.systemAddress && transferValue > 0 { + transferEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + To: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: transferValue, + } + + if transaction.Status == 1 { + transferEvent.Success = true + } + + if err := f.eventEmitter.Publish( + gasFilterEventSubject, + transaction.Hash, + transferEvent, + ); err != nil { + return false, err + } + + return true, nil + } + + return true, nil +} diff --git a/internal/filter/register_filter.go b/internal/filter/register_filter.go new file mode 100644 index 0000000..093dcea --- /dev/null +++ b/internal/filter/register_filter.go @@ -0,0 +1,74 @@ +package filter + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/cic-chain-events/internal/events" + "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/zerodha/logf" +) + +const ( + registerEventSubject = "CHAIN.register" +) + +var ( + addSig = w3.MustNewFunc("add(address)", "bool") +) + +type RegisterFilterOpts struct { + EventEmitter events.EventEmitter + Logg logf.Logger +} + +type RegisterFilter struct { + eventEmitter events.EventEmitter + logg logf.Logger +} + +func NewRegisterFilter(o RegisterFilterOpts) Filter { + return &RegisterFilter{ + eventEmitter: o.EventEmitter, + logg: o.Logg, + } +} + +func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { + if len(transaction.InputData) < 10 { + return true, nil + } + + if transaction.InputData[:10] == "0x0a3b0a4f" { + var address common.Address + + if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil { + return false, err + } + + addEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + ContractAddress: transaction.To.Address, + To: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + } + + if transaction.Status == 1 { + addEvent.Success = true + } + + if err := f.eventEmitter.Publish( + registerEventSubject, + transaction.Hash, + addEvent, + ); err != nil { + return false, err + } + + return true, nil + } + + return true, nil +} diff --git a/internal/filter/transfer_filter.go b/internal/filter/transfer_filter.go new file mode 100644 index 0000000..5bbe30a --- /dev/null +++ b/internal/filter/transfer_filter.go @@ -0,0 +1,156 @@ +package filter + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/cic-chain-events/internal/events" + "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/zerodha/logf" +) + +const ( + transferFilterEventSubject = "CHAIN.transfer" +) + +var ( + transferSig = w3.MustNewFunc("transfer(address, uint256)", "bool") + transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool") + mintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool") +) + +type TransferFilterOpts struct { + EventEmitter events.EventEmitter + Logg logf.Logger +} + +type TransferFilter struct { + eventEmitter events.EventEmitter + logg logf.Logger +} + +func NewTransferFilter(o TransferFilterOpts) Filter { + return &TransferFilter{ + eventEmitter: o.EventEmitter, + logg: o.Logg, + } +} + +func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { + if len(transaction.InputData) < 10 { + return true, nil + } + + switch transaction.InputData[:10] { + case "0xa9059cbb": + var ( + to common.Address + value big.Int + ) + + if err := transferSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil { + return false, err + } + + f.logg.Debug("transfer_filter: new reg", "transfer", to) + + transferEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + From: transaction.From.Address, + To: to.Hex(), + ContractAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), + } + + if transaction.Status == 1 { + transferEvent.Success = true + } + + if err := f.eventEmitter.Publish( + transferFilterEventSubject, + transaction.Hash, + transferEvent, + ); err != nil { + return false, err + } + + return true, nil + case "0x23b872dd": + var ( + from common.Address + to common.Address + value big.Int + ) + + if err := transferFromSig.DecodeArgs(w3.B(transaction.InputData), &from, &to, &value); err != nil { + return false, err + } + + f.logg.Debug("transfer_filter: new reg", "transferFrom", to) + + transferEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + From: from.Hex(), + To: to.Hex(), + ContractAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), + } + + if transaction.Status == 1 { + transferEvent.Success = true + } + + if err := f.eventEmitter.Publish( + transferFilterEventSubject, + transaction.Hash, + transferEvent, + ); err != nil { + return false, err + } + + return true, nil + case "0x449a52f8": + var ( + to common.Address + value big.Int + ) + + if err := mintToSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil { + return false, err + } + + f.logg.Debug("transfer_filter: new reg", "mintTo", to) + + transferEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + From: transaction.From.Address, + To: to.Hex(), + ContractAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), + } + + if transaction.Status == 1 { + transferEvent.Success = true + } + + if err := f.eventEmitter.Publish( + transferFilterEventSubject, + transaction.Hash, + transferEvent, + ); err != nil { + return false, err + } + + return true, nil + default: + return true, nil + } +} diff --git a/internal/filter/decode_filter_test.go b/internal/filter/transfer_filter_test.go similarity index 82% rename from internal/filter/decode_filter_test.go rename to internal/filter/transfer_filter_test.go index 234d304..092c65f 100644 --- a/internal/filter/decode_filter_test.go +++ b/internal/filter/transfer_filter_test.go @@ -9,24 +9,24 @@ import ( "github.com/zerodha/logf" ) -type DecodeFilterSuite struct { +type TransferFilterSuite struct { suite.Suite filter Filter } -func (s *DecodeFilterSuite) SetupSuite() { +func (s *TransferFilterSuite) SetupSuite() { logg := logf.New( logf.Opts{ Level: logf.DebugLevel, }, ) - s.filter = NewDecodeFilter(DecodeFilterOpts{ + s.filter = NewTransferFilter(TransferFilterOpts{ Logg: logg, }) } -func (s *DecodeFilterSuite) TestTranfserInputs() { +func (s *TransferFilterSuite) TestTranfserInputs() { type testCase struct { transactionData fetch.Transaction want bool @@ -66,12 +66,12 @@ func (s *DecodeFilterSuite) TestTranfserInputs() { } for _, test := range tests { - next, err := s.filter.Execute(context.Background(), &test.transactionData) + next, err := s.filter.Execute(context.Background(), test.transactionData) s.NoError(err) s.Equal(test.want, next) } } -func TestDecodeFilterSuite(t *testing.T) { - suite.Run(t, new(DecodeFilterSuite)) +func TestTransferFilterSuite(t *testing.T) { + suite.Run(t, new(TransferFilterSuite)) } diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index a64edbc..78872ff 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -49,7 +49,7 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error { for _, tx := range fetchResp.Data.Block.Transactions { for _, filter := range md.filters { - next, err := filter.Execute(ctx, &tx) + next, err := filter.Execute(ctx, tx) if err != nil { return err } diff --git a/internal/syncer/janitor.go b/internal/syncer/janitor.go index be7bf97..1a3cd63 100644 --- a/internal/syncer/janitor.go +++ b/internal/syncer/janitor.go @@ -48,20 +48,18 @@ func NewJanitor(o JanitorOpts) *Janitor { } func (j *Janitor) Start(ctx context.Context) error { - timer := time.NewTimer(j.sweepInterval) + ticker := time.NewTicker(j.sweepInterval) for { select { case <-ctx.Done(): j.logg.Info("janitor: shutdown signal received") return nil - case <-timer.C: + case <-ticker.C: j.logg.Debug("janitor: starting sweep") if err := j.QueueMissingBlocks(context.Background()); err != nil { j.logg.Error("janitor: queue missing blocks error", "error", err) } - - timer.Reset(j.sweepInterval) } } } diff --git a/pkg/fetch/benchmark_test.go b/pkg/fetch/benchmark_test.go deleted file mode 100644 index 910ef54..0000000 --- a/pkg/fetch/benchmark_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package fetch - -import ( - "context" - "testing" - - celo "github.com/grassrootseconomics/cic-celo-sdk" -) - -func Benchmark_RPC(b *testing.B) { - celoProvider, err := celo.NewProvider(celo.ProviderOpts{ - ChainId: celo.MainnetChainId, - RpcEndpoint: rpcEndpoint, - }) - - rpc := NewRPCFetcher(RPCOpts{ - RPCProvider: celoProvider, - }) - - if err != nil { - return - } - - b.Run("RPC_Block_Fetcher_Benchmark", func(b *testing.B) { - for n := 0; n < b.N; n++ { - _, err := rpc.Block(context.Background(), 14974600) - if err != nil { - b.Fatal(err) - } - } - b.ReportAllocs() - }) -} - -func Benchmark_GraphQL(b *testing.B) { - graphql := NewGraphqlFetcher(GraphqlOpts{ - GraphqlEndpoint: graphqlEndpoint, - }) - - b.Run("GraphQL_Block_Fetcher_Benchmark", func(b *testing.B) { - for n := 0; n < b.N; n++ { - _, err := graphql.Block(context.Background(), 14974600) - if err != nil { - b.Fatal(err) - } - } - b.ReportAllocs() - }) - -} diff --git a/pkg/fetch/rpc.go b/pkg/fetch/rpc.go deleted file mode 100644 index f3f61f8..0000000 --- a/pkg/fetch/rpc.go +++ /dev/null @@ -1,97 +0,0 @@ -package fetch - -import ( - "context" - "math/big" - "strings" - - "github.com/celo-org/celo-blockchain/common/hexutil" - "github.com/celo-org/celo-blockchain/core/types" - celo "github.com/grassrootseconomics/cic-celo-sdk" - "github.com/grassrootseconomics/w3-celo-patch/module/eth" - "github.com/grassrootseconomics/w3-celo-patch/w3types" -) - -// RPCOpts reprsents the required paramters for an RPC fetcher. -type RPCOpts struct { - RPCProvider *celo.Provider -} - -// RPC is a RPC based block and transaction fetcher. -type RPC struct { - provider *celo.Provider -} - -// NewRPCFetcher returns a new RPC fetcher which implemnts Fetch. -// Note: No rate limiting feeature. -func NewRPCFetcher(o RPCOpts) Fetch { - return &RPC{ - provider: o.RPCProvider, - } -} - -// Block fetches via RPC and transforms the response to adapt to the GraphQL JSON response struct. -func (f *RPC) Block(ctx context.Context, blockNumber uint64) (FetchResponse, error) { - var ( - block types.Block - fetchResponse FetchResponse - ) - - if err := f.provider.Client.CallCtx( - ctx, - eth.BlockByNumber(big.NewInt(int64(blockNumber))).Returns(&block), - ); err != nil { - return fetchResponse, err - } - - txCount := len(block.Transactions()) - batchCalls := make([]w3types.Caller, txCount*2) - - txs := make([]types.Transaction, txCount) - txsReceipt := make([]types.Receipt, txCount) - - // Prepare batch calls. - for i, tx := range block.Transactions() { - batchCalls[i] = eth.Tx(tx.Hash()).Returns(&txs[i]) - batchCalls[txCount+i] = eth.TxReceipt(tx.Hash()).Returns(&txsReceipt[i]) - } - - if err := f.provider.Client.CallCtx( - ctx, - batchCalls..., - ); err != nil { - return fetchResponse, err - } - - // Transform response and adapt to FetchResponse. - for i := 0; i < txCount; i++ { - var txObject Transaction - - txObject.Block.Number = block.NumberU64() - txObject.Block.Timestamp = hexutil.EncodeUint64(block.Time()) - - from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i]) - if err != nil { - return fetchResponse, err - } - txObject.From.Address = strings.ToLower(from.Hex()) - // This check ignores contract deployment transactions. - if txs[i].To() != nil { - txObject.To.Address = strings.ToLower(txs[i].To().Hex()) - } - txObject.Value = hexutil.EncodeBig(txs[i].Value()) - txObject.InputData = hexutil.Encode(txs[i].Data()) - - txObject.Hash = txsReceipt[i].TxHash.Hex() - txObject.Index = txsReceipt[i].TransactionIndex - txObject.Status = txsReceipt[i].Status - txObject.GasUsed = txsReceipt[i].GasUsed - - fetchResponse.Data.Block.Transactions = append( - fetchResponse.Data.Block.Transactions, - txObject, - ) - } - - return fetchResponse, nil -} diff --git a/pkg/fetch/rpc_test.go b/pkg/fetch/rpc_test.go deleted file mode 100644 index e586139..0000000 --- a/pkg/fetch/rpc_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package fetch - -import ( - "context" - "os" - "testing" - - celo "github.com/grassrootseconomics/cic-celo-sdk" - "github.com/stretchr/testify/suite" -) - -var ( - rpcEndpoint = os.Getenv("TEST_RPC_ENDPOINT") -) - -type RPCTestSuite struct { - suite.Suite - fetch Fetch -} - -func (s *RPCTestSuite) SetupSuite() { - celoProvider, err := celo.NewProvider(celo.ProviderOpts{ - ChainId: celo.MainnetChainId, - RpcEndpoint: rpcEndpoint, - }) - - if err != nil { - return - } - - s.fetch = NewRPCFetcher(RPCOpts{ - RPCProvider: celoProvider, - }) -} - -func (s *RPCTestSuite) Test_E2E_Fetch_Existing_Block() { - resp, err := s.fetch.Block(context.Background(), 14974600) - s.NoError(err) - s.Len(resp.Data.Block.Transactions, 3) -} - -func (s *RPCTestSuite) Test_E2E_Fetch_Non_Existing_Block() { - _, err := s.fetch.Block(context.Background(), 14974600000) - s.Error(err) -} - -func TestRPCSuite(t *testing.T) { - suite.Run(t, new(RPCTestSuite)) -}