From 85ef2ffaac98de307b9098324e905e13482eec3a Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Fri, 24 Feb 2023 10:22:06 +0300 Subject: [PATCH] devnet: snapshot --- cmd/filters.go | 25 +++--- cmd/main.go | 10 ++- internal/filter/gas_filter.go | 79 +++++++++++++++++++ .../{decode_filter.go => transfer_filter.go} | 14 ++-- ...filter_test.go => transfer_filter_test.go} | 12 +-- 5 files changed, 114 insertions(+), 26 deletions(-) create mode 100644 internal/filter/gas_filter.go rename internal/filter/{decode_filter.go => transfer_filter.go} (91%) rename internal/filter/{decode_filter_test.go => transfer_filter_test.go} (86%) diff --git a/cmd/filters.go b/cmd/filters.go index beaa591..0b40fc0 100644 --- a/cmd/filters.go +++ b/cmd/filters.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/grassrootseconomics/cic-chain-events/internal/filter" + "github.com/nats-io/nats.go" ) func initAddressFilter() filter.Filter { @@ -13,9 +14,8 @@ func initAddressFilter() filter.Filter { cache := &sync.Map{} // Example bootstrap addresses - cache.Store(strings.ToLower("0x617f3112bf5397D0467D315cC709EF968D9ba546"), "USDT") - cache.Store(strings.ToLower("0x765DE816845861e75A25fCA122bb6898B8B1282a"), "cUSD") - cache.Store(strings.ToLower("0xD8763CBa276a3738E6DE85b4b3bF5FDed6D6cA73"), "cEUR") + cache.Store(strings.ToLower("0x54c8D8718Ea9E7b2b4542e630fd36Ccab32cE74E"), "BABVoucher") + cache.Store(strings.ToLower("0xdD4F5ea484F6b16f031eF7B98F3810365493BC20"), "GasFaucet") return filter.NewAddressFilter(filter.AddressFilterOpts{ Cache: cache, @@ -23,14 +23,17 @@ func initAddressFilter() filter.Filter { }) } -func initDecodeFilter() filter.Filter { - js, err := initJetStream() - if err != nil { - lo.Fatal("filters: critical error loading jetstream", "error", err) - } - - return filter.NewDecodeFilter(filter.DecodeFilterOpts{ +func initTransferFilter(jsCtx nats.JetStreamContext) filter.Filter { + return filter.NewTransferFilter(filter.TransferFilterOpts{ Logg: lo, - JSCtx: js, + JSCtx: jsCtx, + }) + +} + +func initGasGiftFilter(jsCtx nats.JetStreamContext) filter.Filter { + return filter.NewGasFilter(filter.GasFilterOpts{ + Logg: lo, + JSCtx: jsCtx, }) } diff --git a/cmd/main.go b/cmd/main.go index 4091832..7ba961c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,10 +10,10 @@ import ( "syscall" "time" + "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/pipeline" "github.com/grassrootseconomics/cic-chain-events/internal/pool" "github.com/grassrootseconomics/cic-chain-events/internal/syncer" - "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/knadh/goyesql/v2" "github.com/knadh/koanf" "github.com/zerodha/logf" @@ -58,13 +58,19 @@ func main() { lo.Fatal("main: critical error loading pg store", "error", err) } + jsCtx, err := initJetStream() + if err != nil { + lo.Fatal("main: critical error loading jetstream context", "error", err) + } + graphqlFetcher := initFetcher() pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{ BlockFetcher: graphqlFetcher, Filters: []filter.Filter{ initAddressFilter(), - initDecodeFilter(), + initTransferFilter(jsCtx), + initGasGiftFilter(jsCtx), }, Logg: lo, Store: pgStore, diff --git a/internal/filter/gas_filter.go b/internal/filter/gas_filter.go new file mode 100644 index 0000000..bf841b0 --- /dev/null +++ b/internal/filter/gas_filter.go @@ -0,0 +1,79 @@ +package filter + +import ( + "context" + "encoding/json" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/nats-io/nats.go" + "github.com/zerodha/logf" +) + +var ( + giveToSig = w3.MustNewFunc("giveTo(address)", "uint256") +) + +type GasFilterOpts struct { + Logg logf.Logger + JSCtx nats.JetStreamContext +} + +type GasFilter struct { + logg logf.Logger + js nats.JetStreamContext +} + +type minimalGasGiftTxInfo struct { + Block uint64 `json:"block"` + Success bool `json:"success"` + To string `json:"to"` + TxHash string `json:"transactionHash"` + TxIndex uint `json:"transactionIndex"` +} + +func NewGasFilter(o GasFilterOpts) Filter { + return &GasFilter{ + logg: o.Logg, + js: o.JSCtx, + } +} + +func (f *GasFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { + switch transaction.InputData[:10] { + case "0x63e4bff4": + var ( + to common.Address + ) + + if err := giveToSig.DecodeArgs(w3.B(transaction.InputData), &to); err != nil { + return false, err + } + + transferEvent := &minimalGasGiftTxInfo{ + Block: transaction.Block.Number, + To: to.Hex(), + TxHash: transaction.Hash, + TxIndex: transaction.Index, + } + + if transaction.Status == 1 { + transferEvent.Success = true + } + + json, err := json.Marshal(transferEvent) + if err != nil { + return false, err + } + + _, err = f.js.Publish("CHAIN.gasGiveTo", json, nats.MsgId(transaction.Hash)) + if err != nil { + return false, err + } + + return true, nil + default: + return false, nil + } +} diff --git a/internal/filter/decode_filter.go b/internal/filter/transfer_filter.go similarity index 91% rename from internal/filter/decode_filter.go rename to internal/filter/transfer_filter.go index de4909a..cadc2ab 100644 --- a/internal/filter/decode_filter.go +++ b/internal/filter/transfer_filter.go @@ -18,12 +18,12 @@ var ( mintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool") ) -type DecodeFilterOpts struct { +type TransferFilterOpts struct { Logg logf.Logger JSCtx nats.JetStreamContext } -type DecodeFilter struct { +type TransferFilter struct { logg logf.Logger js nats.JetStreamContext } @@ -39,14 +39,14 @@ type minimalTxInfo struct { Value uint64 `json:"value"` } -func NewDecodeFilter(o DecodeFilterOpts) Filter { - return &DecodeFilter{ +func NewTransferFilter(o TransferFilterOpts) Filter { + return &TransferFilter{ logg: o.Logg, js: o.JSCtx, } } -func (f *DecodeFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { +func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { switch transaction.InputData[:10] { case "0xa9059cbb": var ( @@ -155,7 +155,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction *fetch.Transaction return true, nil default: - f.logg.Debug("unknownSignature", "inpuData", transaction.InputData) - return false, nil + // Skip and continue to next filter + return true, nil } } diff --git a/internal/filter/decode_filter_test.go b/internal/filter/transfer_filter_test.go similarity index 86% rename from internal/filter/decode_filter_test.go rename to internal/filter/transfer_filter_test.go index 234d304..1642f92 100644 --- a/internal/filter/decode_filter_test.go +++ b/internal/filter/transfer_filter_test.go @@ -9,24 +9,24 @@ import ( "github.com/zerodha/logf" ) -type DecodeFilterSuite struct { +type TransferFilterSuite struct { suite.Suite filter Filter } -func (s *DecodeFilterSuite) SetupSuite() { +func (s *TransferFilterSuite) SetupSuite() { logg := logf.New( logf.Opts{ Level: logf.DebugLevel, }, ) - s.filter = NewDecodeFilter(DecodeFilterOpts{ + s.filter = NewTransferFilter(TransferFilterOpts{ Logg: logg, }) } -func (s *DecodeFilterSuite) TestTranfserInputs() { +func (s *TransferFilterSuite) TestTranfserInputs() { type testCase struct { transactionData fetch.Transaction want bool @@ -72,6 +72,6 @@ func (s *DecodeFilterSuite) TestTranfserInputs() { } } -func TestDecodeFilterSuite(t *testing.T) { - suite.Run(t, new(DecodeFilterSuite)) +func TestTransferFilterSuite(t *testing.T) { + suite.Run(t, new(TransferFilterSuite)) }