devnet: snapshot

This commit is contained in:
Mohamed Sohail 2023-02-24 10:22:06 +03:00
parent 6df00ddfce
commit 85ef2ffaac
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
5 changed files with 114 additions and 26 deletions

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/filter"
"github.com/nats-io/nats.go"
) )
func initAddressFilter() filter.Filter { func initAddressFilter() filter.Filter {
@ -13,9 +14,8 @@ func initAddressFilter() filter.Filter {
cache := &sync.Map{} cache := &sync.Map{}
// Example bootstrap addresses // Example bootstrap addresses
cache.Store(strings.ToLower("0x617f3112bf5397D0467D315cC709EF968D9ba546"), "USDT") cache.Store(strings.ToLower("0x54c8D8718Ea9E7b2b4542e630fd36Ccab32cE74E"), "BABVoucher")
cache.Store(strings.ToLower("0x765DE816845861e75A25fCA122bb6898B8B1282a"), "cUSD") cache.Store(strings.ToLower("0xdD4F5ea484F6b16f031eF7B98F3810365493BC20"), "GasFaucet")
cache.Store(strings.ToLower("0xD8763CBa276a3738E6DE85b4b3bF5FDed6D6cA73"), "cEUR")
return filter.NewAddressFilter(filter.AddressFilterOpts{ return filter.NewAddressFilter(filter.AddressFilterOpts{
Cache: cache, Cache: cache,
@ -23,14 +23,17 @@ func initAddressFilter() filter.Filter {
}) })
} }
func initDecodeFilter() filter.Filter { func initTransferFilter(jsCtx nats.JetStreamContext) filter.Filter {
js, err := initJetStream() return filter.NewTransferFilter(filter.TransferFilterOpts{
if err != nil {
lo.Fatal("filters: critical error loading jetstream", "error", err)
}
return filter.NewDecodeFilter(filter.DecodeFilterOpts{
Logg: lo, Logg: lo,
JSCtx: js, JSCtx: jsCtx,
})
}
func initGasGiftFilter(jsCtx nats.JetStreamContext) filter.Filter {
return filter.NewGasFilter(filter.GasFilterOpts{
Logg: lo,
JSCtx: jsCtx,
}) })
} }

View File

@ -10,10 +10,10 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/grassrootseconomics/cic-chain-events/internal/filter"
"github.com/grassrootseconomics/cic-chain-events/internal/pipeline" "github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
"github.com/grassrootseconomics/cic-chain-events/internal/pool" "github.com/grassrootseconomics/cic-chain-events/internal/pool"
"github.com/grassrootseconomics/cic-chain-events/internal/syncer" "github.com/grassrootseconomics/cic-chain-events/internal/syncer"
"github.com/grassrootseconomics/cic-chain-events/internal/filter"
"github.com/knadh/goyesql/v2" "github.com/knadh/goyesql/v2"
"github.com/knadh/koanf" "github.com/knadh/koanf"
"github.com/zerodha/logf" "github.com/zerodha/logf"
@ -58,13 +58,19 @@ func main() {
lo.Fatal("main: critical error loading pg store", "error", err) lo.Fatal("main: critical error loading pg store", "error", err)
} }
jsCtx, err := initJetStream()
if err != nil {
lo.Fatal("main: critical error loading jetstream context", "error", err)
}
graphqlFetcher := initFetcher() graphqlFetcher := initFetcher()
pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{ pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{
BlockFetcher: graphqlFetcher, BlockFetcher: graphqlFetcher,
Filters: []filter.Filter{ Filters: []filter.Filter{
initAddressFilter(), initAddressFilter(),
initDecodeFilter(), initTransferFilter(jsCtx),
initGasGiftFilter(jsCtx),
}, },
Logg: lo, Logg: lo,
Store: pgStore, Store: pgStore,

View File

@ -0,0 +1,79 @@
package filter
import (
"context"
"encoding/json"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf"
)
var (
giveToSig = w3.MustNewFunc("giveTo(address)", "uint256")
)
type GasFilterOpts struct {
Logg logf.Logger
JSCtx nats.JetStreamContext
}
type GasFilter struct {
logg logf.Logger
js nats.JetStreamContext
}
type minimalGasGiftTxInfo struct {
Block uint64 `json:"block"`
Success bool `json:"success"`
To string `json:"to"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
}
func NewGasFilter(o GasFilterOpts) Filter {
return &GasFilter{
logg: o.Logg,
js: o.JSCtx,
}
}
func (f *GasFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
switch transaction.InputData[:10] {
case "0x63e4bff4":
var (
to common.Address
)
if err := giveToSig.DecodeArgs(w3.B(transaction.InputData), &to); err != nil {
return false, err
}
transferEvent := &minimalGasGiftTxInfo{
Block: transaction.Block.Number,
To: to.Hex(),
TxHash: transaction.Hash,
TxIndex: transaction.Index,
}
if transaction.Status == 1 {
transferEvent.Success = true
}
json, err := json.Marshal(transferEvent)
if err != nil {
return false, err
}
_, err = f.js.Publish("CHAIN.gasGiveTo", json, nats.MsgId(transaction.Hash))
if err != nil {
return false, err
}
return true, nil
default:
return false, nil
}
}

View File

@ -18,12 +18,12 @@ var (
mintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool") mintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool")
) )
type DecodeFilterOpts struct { type TransferFilterOpts struct {
Logg logf.Logger Logg logf.Logger
JSCtx nats.JetStreamContext JSCtx nats.JetStreamContext
} }
type DecodeFilter struct { type TransferFilter struct {
logg logf.Logger logg logf.Logger
js nats.JetStreamContext js nats.JetStreamContext
} }
@ -39,14 +39,14 @@ type minimalTxInfo struct {
Value uint64 `json:"value"` Value uint64 `json:"value"`
} }
func NewDecodeFilter(o DecodeFilterOpts) Filter { func NewTransferFilter(o TransferFilterOpts) Filter {
return &DecodeFilter{ return &TransferFilter{
logg: o.Logg, logg: o.Logg,
js: o.JSCtx, js: o.JSCtx,
} }
} }
func (f *DecodeFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
switch transaction.InputData[:10] { switch transaction.InputData[:10] {
case "0xa9059cbb": case "0xa9059cbb":
var ( var (
@ -155,7 +155,7 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction *fetch.Transaction
return true, nil return true, nil
default: default:
f.logg.Debug("unknownSignature", "inpuData", transaction.InputData) // Skip and continue to next filter
return false, nil return true, nil
} }
} }

View File

@ -9,24 +9,24 @@ import (
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
type DecodeFilterSuite struct { type TransferFilterSuite struct {
suite.Suite suite.Suite
filter Filter filter Filter
} }
func (s *DecodeFilterSuite) SetupSuite() { func (s *TransferFilterSuite) SetupSuite() {
logg := logf.New( logg := logf.New(
logf.Opts{ logf.Opts{
Level: logf.DebugLevel, Level: logf.DebugLevel,
}, },
) )
s.filter = NewDecodeFilter(DecodeFilterOpts{ s.filter = NewTransferFilter(TransferFilterOpts{
Logg: logg, Logg: logg,
}) })
} }
func (s *DecodeFilterSuite) TestTranfserInputs() { func (s *TransferFilterSuite) TestTranfserInputs() {
type testCase struct { type testCase struct {
transactionData fetch.Transaction transactionData fetch.Transaction
want bool want bool
@ -72,6 +72,6 @@ func (s *DecodeFilterSuite) TestTranfserInputs() {
} }
} }
func TestDecodeFilterSuite(t *testing.T) { func TestTransferFilterSuite(t *testing.T) {
suite.Run(t, new(DecodeFilterSuite)) suite.Run(t, new(TransferFilterSuite))
} }