From 2161e8936b0971fca768a7532ca18177a8f5f8ca Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 19 Jan 2023 13:36:21 +0300 Subject: [PATCH] feat: jetstream event full tx object * minor pass pointer down the pipleine --- internal/filter/address_filter.go | 2 +- internal/filter/address_filter_test.go | 2 +- internal/filter/decode_filter.go | 78 ++++++++++++++++++++++++-- internal/filter/decode_filter_test.go | 2 +- internal/filter/filter.go | 2 +- internal/pipeline/pipeline.go | 2 +- internal/syncer/janitor.go | 4 +- 7 files changed, 81 insertions(+), 11 deletions(-) diff --git a/internal/filter/address_filter.go b/internal/filter/address_filter.go index 12efe72..6705826 100644 --- a/internal/filter/address_filter.go +++ b/internal/filter/address_filter.go @@ -25,7 +25,7 @@ func NewAddressFilter(o AddressFilterOpts) Filter { } } -func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { +func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { if _, found := f.cache.Load(transaction.To.Address); found { return true, nil } diff --git a/internal/filter/address_filter_test.go b/internal/filter/address_filter_test.go index 772ce98..45461fb 100644 --- a/internal/filter/address_filter_test.go +++ b/internal/filter/address_filter_test.go @@ -66,7 +66,7 @@ func (s *AddressFilterSuite) TestAddresses() { } for _, test := range tests { - next, err := s.filter.Execute(context.Background(), test.transactionData) + next, err := s.filter.Execute(context.Background(), &test.transactionData) s.NoError(err) s.Equal(test.want, next) } diff --git a/internal/filter/decode_filter.go b/internal/filter/decode_filter.go index bd17655..de4909a 100644 --- a/internal/filter/decode_filter.go +++ b/internal/filter/decode_filter.go @@ -2,7 +2,7 @@ package filter import ( "context" - "fmt" + "encoding/json" "math/big" "github.com/celo-org/celo-blockchain/common" @@ -28,6 +28,17 @@ type DecodeFilter struct { js nats.JetStreamContext } +type minimalTxInfo struct { + Block uint64 `json:"block"` + From string `json:"from"` + Success bool `json:"success"` + To string `json:"to"` + TokenAddress string `json:"tokenAddress"` + TxHash string `json:"transactionHash"` + TxIndex uint `json:"transactionIndex"` + Value uint64 `json:"value"` +} + func NewDecodeFilter(o DecodeFilterOpts) Filter { return &DecodeFilter{ logg: o.Logg, @@ -35,7 +46,7 @@ func NewDecodeFilter(o DecodeFilterOpts) Filter { } } -func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { +func (f *DecodeFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { switch transaction.InputData[:10] { case "0xa9059cbb": var ( @@ -47,7 +58,26 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) return false, err } - _, err := f.js.Publish("CHAIN.transfer", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash)) + transferEvent := &minimalTxInfo{ + Block: transaction.Block.Number, + From: transaction.From.Address, + To: to.Hex(), + TokenAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), + } + + if transaction.Status == 1 { + transferEvent.Success = true + } + + json, err := json.Marshal(transferEvent) + if err != nil { + return false, err + } + + _, err = f.js.Publish("CHAIN.transfer", json, nats.MsgId(transaction.Hash)) if err != nil { return false, err } @@ -64,7 +94,26 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) return false, err } - _, err := f.js.Publish("CHAIN.transferFrom", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash)) + transferFromEvent := &minimalTxInfo{ + Block: transaction.Block.Number, + From: from.Hex(), + To: to.Hex(), + TokenAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), + } + + if transaction.Status == 1 { + transferFromEvent.Success = true + } + + json, err := json.Marshal(transferFromEvent) + if err != nil { + return false, err + } + + _, err = f.js.Publish("CHAIN.transferFrom", json, nats.MsgId(transaction.Hash)) if err != nil { return false, err } @@ -80,7 +129,26 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) return false, err } - _, err := f.js.Publish("CHAIN.mintTo", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash)) + mintToEvent := &minimalTxInfo{ + Block: transaction.Block.Number, + From: transaction.From.Address, + To: to.Hex(), + TokenAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), + } + + if transaction.Status == 1 { + mintToEvent.Success = true + } + + json, err := json.Marshal(mintToEvent) + if err != nil { + return false, err + } + + _, err = f.js.Publish("CHAIN.mintTo", json, nats.MsgId(transaction.Hash)) if err != nil { return false, err } diff --git a/internal/filter/decode_filter_test.go b/internal/filter/decode_filter_test.go index f801487..234d304 100644 --- a/internal/filter/decode_filter_test.go +++ b/internal/filter/decode_filter_test.go @@ -66,7 +66,7 @@ func (s *DecodeFilterSuite) TestTranfserInputs() { } for _, test := range tests { - next, err := s.filter.Execute(context.Background(), test.transactionData) + next, err := s.filter.Execute(context.Background(), &test.transactionData) s.NoError(err) s.Equal(test.want, next) } diff --git a/internal/filter/filter.go b/internal/filter/filter.go index 4006a94..77d340c 100644 --- a/internal/filter/filter.go +++ b/internal/filter/filter.go @@ -8,5 +8,5 @@ import ( // Filter defines a read only filter which must return next as true/false or an error type Filter interface { - Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error) + Execute(ctx context.Context, inputTransaction *fetch.Transaction) (next bool, err error) } diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 78872ff..a64edbc 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -49,7 +49,7 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error { for _, tx := range fetchResp.Data.Block.Transactions { for _, filter := range md.filters { - next, err := filter.Execute(ctx, tx) + next, err := filter.Execute(ctx, &tx) if err != nil { return err } diff --git a/internal/syncer/janitor.go b/internal/syncer/janitor.go index c78ab52..be7bf97 100644 --- a/internal/syncer/janitor.go +++ b/internal/syncer/janitor.go @@ -94,6 +94,8 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error { return err } + j.logg.Info("janitor: missing blocks", "count", j.stats.GetHeadCursor()-lowerBound) + rowsProcessed := 0 for rows.Next() { var blockNumber uint64 @@ -109,7 +111,7 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error { rowsProcessed++ } - j.logg.Debug("janitor: missing blocks count", "count", rowsProcessed) + j.logg.Debug("janitor: missing blocks to be processed", "count", rowsProcessed) if rowsProcessed == 0 { j.logg.Info("janitor: no gap! rasing lower bound", "new_lower_bound", upperBound) j.stats.UpdateLowerBound(upperBound)