feat: jetstream event full tx object

* minor pass pointer down the pipleine
This commit is contained in:
Mohamed Sohail 2023-01-19 13:36:21 +03:00
parent 7852d95335
commit 2161e8936b
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
7 changed files with 81 additions and 11 deletions

View File

@ -25,7 +25,7 @@ func NewAddressFilter(o AddressFilterOpts) Filter {
} }
} }
func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
if _, found := f.cache.Load(transaction.To.Address); found { if _, found := f.cache.Load(transaction.To.Address); found {
return true, nil return true, nil
} }

View File

@ -66,7 +66,7 @@ func (s *AddressFilterSuite) TestAddresses() {
} }
for _, test := range tests { for _, test := range tests {
next, err := s.filter.Execute(context.Background(), test.transactionData) next, err := s.filter.Execute(context.Background(), &test.transactionData)
s.NoError(err) s.NoError(err)
s.Equal(test.want, next) s.Equal(test.want, next)
} }

View File

@ -2,7 +2,7 @@ package filter
import ( import (
"context" "context"
"fmt" "encoding/json"
"math/big" "math/big"
"github.com/celo-org/celo-blockchain/common" "github.com/celo-org/celo-blockchain/common"
@ -28,6 +28,17 @@ type DecodeFilter struct {
js nats.JetStreamContext js nats.JetStreamContext
} }
type minimalTxInfo struct {
Block uint64 `json:"block"`
From string `json:"from"`
Success bool `json:"success"`
To string `json:"to"`
TokenAddress string `json:"tokenAddress"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
Value uint64 `json:"value"`
}
func NewDecodeFilter(o DecodeFilterOpts) Filter { func NewDecodeFilter(o DecodeFilterOpts) Filter {
return &DecodeFilter{ return &DecodeFilter{
logg: o.Logg, logg: o.Logg,
@ -35,7 +46,7 @@ func NewDecodeFilter(o DecodeFilterOpts) Filter {
} }
} }
func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { func (f *DecodeFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
switch transaction.InputData[:10] { switch transaction.InputData[:10] {
case "0xa9059cbb": case "0xa9059cbb":
var ( var (
@ -47,7 +58,26 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
return false, err return false, err
} }
_, err := f.js.Publish("CHAIN.transfer", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash)) transferEvent := &minimalTxInfo{
Block: transaction.Block.Number,
From: transaction.From.Address,
To: to.Hex(),
TokenAddress: transaction.To.Address,
TxHash: transaction.Hash,
TxIndex: transaction.Index,
Value: value.Uint64(),
}
if transaction.Status == 1 {
transferEvent.Success = true
}
json, err := json.Marshal(transferEvent)
if err != nil {
return false, err
}
_, err = f.js.Publish("CHAIN.transfer", json, nats.MsgId(transaction.Hash))
if err != nil { if err != nil {
return false, err return false, err
} }
@ -64,7 +94,26 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
return false, err return false, err
} }
_, err := f.js.Publish("CHAIN.transferFrom", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash)) transferFromEvent := &minimalTxInfo{
Block: transaction.Block.Number,
From: from.Hex(),
To: to.Hex(),
TokenAddress: transaction.To.Address,
TxHash: transaction.Hash,
TxIndex: transaction.Index,
Value: value.Uint64(),
}
if transaction.Status == 1 {
transferFromEvent.Success = true
}
json, err := json.Marshal(transferFromEvent)
if err != nil {
return false, err
}
_, err = f.js.Publish("CHAIN.transferFrom", json, nats.MsgId(transaction.Hash))
if err != nil { if err != nil {
return false, err return false, err
} }
@ -80,7 +129,26 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction)
return false, err return false, err
} }
_, err := f.js.Publish("CHAIN.mintTo", []byte(fmt.Sprintf("%d:%d:%s", transaction.Block.Number, transaction.Index, transaction.Hash)), nats.MsgId(transaction.Hash)) mintToEvent := &minimalTxInfo{
Block: transaction.Block.Number,
From: transaction.From.Address,
To: to.Hex(),
TokenAddress: transaction.To.Address,
TxHash: transaction.Hash,
TxIndex: transaction.Index,
Value: value.Uint64(),
}
if transaction.Status == 1 {
mintToEvent.Success = true
}
json, err := json.Marshal(mintToEvent)
if err != nil {
return false, err
}
_, err = f.js.Publish("CHAIN.mintTo", json, nats.MsgId(transaction.Hash))
if err != nil { if err != nil {
return false, err return false, err
} }

View File

@ -66,7 +66,7 @@ func (s *DecodeFilterSuite) TestTranfserInputs() {
} }
for _, test := range tests { for _, test := range tests {
next, err := s.filter.Execute(context.Background(), test.transactionData) next, err := s.filter.Execute(context.Background(), &test.transactionData)
s.NoError(err) s.NoError(err)
s.Equal(test.want, next) s.Equal(test.want, next)
} }

View File

@ -8,5 +8,5 @@ import (
// Filter defines a read only filter which must return next as true/false or an error // Filter defines a read only filter which must return next as true/false or an error
type Filter interface { type Filter interface {
Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error) Execute(ctx context.Context, inputTransaction *fetch.Transaction) (next bool, err error)
} }

View File

@ -49,7 +49,7 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
for _, tx := range fetchResp.Data.Block.Transactions { for _, tx := range fetchResp.Data.Block.Transactions {
for _, filter := range md.filters { for _, filter := range md.filters {
next, err := filter.Execute(ctx, tx) next, err := filter.Execute(ctx, &tx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -94,6 +94,8 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
return err return err
} }
j.logg.Info("janitor: missing blocks", "count", j.stats.GetHeadCursor()-lowerBound)
rowsProcessed := 0 rowsProcessed := 0
for rows.Next() { for rows.Next() {
var blockNumber uint64 var blockNumber uint64
@ -109,7 +111,7 @@ func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
rowsProcessed++ rowsProcessed++
} }
j.logg.Debug("janitor: missing blocks count", "count", rowsProcessed) j.logg.Debug("janitor: missing blocks to be processed", "count", rowsProcessed)
if rowsProcessed == 0 { if rowsProcessed == 0 {
j.logg.Info("janitor: no gap! rasing lower bound", "new_lower_bound", upperBound) j.logg.Info("janitor: no gap! rasing lower bound", "new_lower_bound", upperBound)
j.stats.UpdateLowerBound(upperBound) j.stats.UpdateLowerBound(upperBound)