refactor: filters, jetstream emitter

* add register filter
* update gas filter
This commit is contained in:
Mohamed Sohail 2023-02-24 10:21:54 +00:00
parent 0d5bc3a333
commit f0b65a59ad
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
11 changed files with 320 additions and 176 deletions

View File

@ -4,8 +4,12 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/grassrootseconomics/cic-chain-events/internal/events"
"github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/filter"
"github.com/nats-io/nats.go" )
var (
systemAddress = strings.ToLower("0x3D85285e39f05773aC92EAD27CB50a4385A529E4")
) )
func initAddressFilter() filter.Filter { func initAddressFilter() filter.Filter {
@ -14,26 +18,36 @@ func initAddressFilter() filter.Filter {
cache := &sync.Map{} cache := &sync.Map{}
// Example bootstrap addresses // Example bootstrap addresses
cache.Store(strings.ToLower("0x54c8D8718Ea9E7b2b4542e630fd36Ccab32cE74E"), "BABVoucher") cache.Store(strings.ToLower("0xB92463E2262E700e29c16416270c9Fdfa17934D7"), "TRNVoucher")
cache.Store(strings.ToLower("0xdD4F5ea484F6b16f031eF7B98F3810365493BC20"), "GasFaucet") cache.Store(strings.ToLower("0xf2a1fc19Ad275A0EAe3445798761FeD1Eea725d5"), "GasFaucet")
cache.Store(strings.ToLower("0x1e041282695C66944BfC53cabce947cf35CEaf87"), "AddressIndex")
return filter.NewAddressFilter(filter.AddressFilterOpts{ return filter.NewAddressFilter(filter.AddressFilterOpts{
Cache: cache, Cache: cache,
Logg: lo, Logg: lo,
SystemAddress: systemAddress,
}) })
} }
func initTransferFilter(jsCtx nats.JetStreamContext) filter.Filter { func initTransferFilter(eventEmitter events.EventEmitter) filter.Filter {
return filter.NewTransferFilter(filter.TransferFilterOpts{ return filter.NewTransferFilter(filter.TransferFilterOpts{
Logg: lo, EventEmitter: eventEmitter,
JSCtx: jsCtx, Logg: lo,
}) })
} }
func initGasGiftFilter(jsCtx nats.JetStreamContext) filter.Filter { func initGasGiftFilter(eventEmitter events.EventEmitter) filter.Filter {
return filter.NewGasFilter(filter.GasFilterOpts{ return filter.NewGasFilter(filter.GasFilterOpts{
Logg: lo, EventEmitter: eventEmitter,
JSCtx: jsCtx, Logg: lo,
SystemAddress: systemAddress,
})
}
func initRegisterFilter(eventEmitter events.EventEmitter) filter.Filter {
return filter.NewRegisterFilter(filter.RegisterFilterOpts{
EventEmitter: eventEmitter,
Logg: lo,
}) })
} }

View File

@ -4,6 +4,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/grassrootseconomics/cic-chain-events/internal/events"
"github.com/grassrootseconomics/cic-chain-events/internal/store" "github.com/grassrootseconomics/cic-chain-events/internal/store"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
@ -12,7 +13,6 @@ import (
"github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/providers/file"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
@ -23,6 +23,7 @@ func initLogger(debug bool) logf.Logger {
if debug { if debug {
loggOpts.Level = logf.DebugLevel loggOpts.Level = logf.DebugLevel
loggOpts.EnableCaller = true
} }
return logf.New(loggOpts) return logf.New(loggOpts)
@ -77,32 +78,16 @@ func initFetcher() fetch.Fetch {
}) })
} }
func initJetStream() (nats.JetStreamContext, error) { func initJetStream() (events.EventEmitter, error) {
natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint")) jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{
ServerUrl: ko.MustString("jetstream.endpoint"),
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour,
DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
js, err := natsConn.JetStream() return jsEmitter, nil
if err != nil {
return nil, err
}
// Bootstrap stream if it does not exist
stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name"))
if stream == nil {
lo.Info("jetstream: bootstrapping stream")
_, err = js.AddStream(&nats.StreamConfig{
Name: ko.MustString("jetstream.stream_name"),
MaxAge: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour,
Storage: nats.FileStorage,
Subjects: ko.MustStrings("jetstream.stream_subjects"),
Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour,
})
if err != nil {
return nil, err
}
}
return js, nil
} }

View File

@ -69,8 +69,9 @@ func main() {
BlockFetcher: graphqlFetcher, BlockFetcher: graphqlFetcher,
Filters: []filter.Filter{ Filters: []filter.Filter{
initAddressFilter(), initAddressFilter(),
initTransferFilter(jsCtx),
initGasGiftFilter(jsCtx), initGasGiftFilter(jsCtx),
initTransferFilter(jsCtx),
initRegisterFilter(jsCtx),
}, },
Logg: lo, Logg: lo,
Store: pgStore, Store: pgStore,

View File

@ -1,43 +1,35 @@
[metrics] [metrics]
# Exposes Prometheus metrics # Exposes Prometheus metrics
# /metrics endpoint
go_process = true go_process = true
# API server # API server
[api] [api]
# Host and port # Host and port
address = ":8080" address = ":8085"
# Geth API endpoints # Geth API endpoints
[chain] [chain]
graphql_endpoint = "https://rpc.celo.grassecon.net/graphql" graphql_endpoint = "https://rpc.alfajores.celo.grassecon.net/graphql"
ws_endpoint = "wss://socket.celo.grassecon.net" ws_endpoint = "wss://ws.alfajores.celo.grassecon.net"
# Syncer configs
[syncer] [syncer]
# Number of goroutines assigned to the janitor worker pool # Maximum number of missing blocks pushed into the worker queue every janitor sweep
janitor_queue_size = 500
# Number of goroutines assigned to the worker pool
janitor_concurrency = 5 janitor_concurrency = 5
# Syncer start block # Syncer start block
initial_lower_bound = 17269000 initial_lower_bound = 16373156
# Max blocks in worker queue awaiting processing # Janitor sweep interval, should take into account concurrency and queue_size
janitor_queue_size = 500
# Janitor sweep interval
janitor_sweep_interval = 5 janitor_sweep_interval = 5
[postgres] [postgres]
# Default is the Docker container DSN
dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events" dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events"
# https://docs.nats.io/ # https://docs.nats.io/
[jetstream] [jetstream]
endpoint = "nats://localhost:4222" endpoint = "nats://localhost:4222"
stream_name = "CHAIN" # Duration JetStream should keep the message before GC
# Duration JetStream should keep the message before remocing it from the persistent store
persist_duration_hours = 48 persist_duration_hours = 48
# Duration to ignore duplicate transactions (e.g. due to restart) # Duration to ignore duplicate transactions (e.g. due to restart)
dedup_duration_hours = 6 dedup_duration_hours = 6
# Stream subjects
stream_subjects = [
"CHAIN.transfer",
"CHAIN.transferFrom",
"CHAIN.mintTo"
]

17
internal/events/events.go Normal file
View File

@ -0,0 +1,17 @@
package events
type EventEmitter interface {
Close()
Publish(subject string, dedupId string, eventPayload interface{}) error
}
type MinimalTxInfo struct {
Block uint64 `json:"block"`
From string `json:"from"`
To string `json:"to"`
ContractAddress string `json:"contractAddress"`
Success bool `json:"success"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
Value uint64 `json:"value"`
}

View File

@ -0,0 +1,78 @@
package events
import (
"time"
"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
)
const (
StreamName string = "CHAIN"
StreamSubjects string = "CHAIN.*"
)
type JetStreamOpts struct {
ServerUrl string
PersistDuration time.Duration
DedupDuration time.Duration
}
type JetStream struct {
jsCtx nats.JetStreamContext
nc *nats.Conn
}
func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) {
natsConn, err := nats.Connect(o.ServerUrl)
if err != nil {
return nil, err
}
js, err := natsConn.JetStream()
if err != nil {
return nil, err
}
// Bootstrap stream if it doesn't exist.
stream, _ := js.StreamInfo(StreamName)
if stream == nil {
_, err = js.AddStream(&nats.StreamConfig{
Name: StreamName,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: []string{StreamSubjects},
Duplicates: o.DedupDuration,
})
if err != nil {
return nil, err
}
}
return &JetStream{
jsCtx: js,
nc: natsConn,
}, nil
}
// Close gracefully shutdowns the JetStream connection.
func (js *JetStream) Close() {
if js.nc != nil {
js.nc.Close()
}
}
// Publish publishes the JSON data to the NATS stream.
func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error {
jsonData, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId))
if err != nil {
return err
}
return nil
}

View File

@ -9,23 +9,30 @@ import (
) )
type AddressFilterOpts struct { type AddressFilterOpts struct {
Cache *sync.Map Cache *sync.Map
Logg logf.Logger Logg logf.Logger
SystemAddress string
} }
type AddressFilter struct { type AddressFilter struct {
cache *sync.Map cache *sync.Map
logg logf.Logger logg logf.Logger
systemAddress string
} }
func NewAddressFilter(o AddressFilterOpts) Filter { func NewAddressFilter(o AddressFilterOpts) Filter {
return &AddressFilter{ return &AddressFilter{
cache: o.Cache, cache: o.Cache,
logg: o.Logg, logg: o.Logg,
systemAddress: o.SystemAddress,
} }
} }
func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
if transaction.From.Address == f.systemAddress {
return true, nil
}
if _, found := f.cache.Load(transaction.To.Address); found { if _, found := f.cache.Load(transaction.To.Address); found {
return true, nil return true, nil
} }

View File

@ -7,7 +7,6 @@ import (
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/zerodha/logf"
) )
type AddressFilterSuite struct { type AddressFilterSuite struct {
@ -20,15 +19,8 @@ func (s *AddressFilterSuite) SetupSuite() {
addressCache.Store("0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f", "") addressCache.Store("0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f", "")
logg := logf.New(
logf.Opts{
Level: logf.DebugLevel,
},
)
s.filter = NewAddressFilter(AddressFilterOpts{ s.filter = NewAddressFilter(AddressFilterOpts{
Cache: addressCache, Cache: addressCache,
Logg: logg,
}) })
} }

View File

@ -2,78 +2,67 @@ package filter
import ( import (
"context" "context"
"encoding/json"
"github.com/celo-org/celo-blockchain/common" "github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/cic-chain-events/internal/events"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
var ( const (
giveToSig = w3.MustNewFunc("giveTo(address)", "uint256") gasFilterEventSubject = "CHAIN.gas"
) )
type GasFilterOpts struct { type GasFilterOpts struct {
Logg logf.Logger EventEmitter events.EventEmitter
JSCtx nats.JetStreamContext Logg logf.Logger
SystemAddress string
} }
type GasFilter struct { type GasFilter struct {
logg logf.Logger eventEmitter events.EventEmitter
js nats.JetStreamContext logg logf.Logger
} systemAddress string
type minimalGasGiftTxInfo struct {
Block uint64 `json:"block"`
Success bool `json:"success"`
To string `json:"to"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
} }
func NewGasFilter(o GasFilterOpts) Filter { func NewGasFilter(o GasFilterOpts) Filter {
return &GasFilter{ return &GasFilter{
logg: o.Logg, eventEmitter: o.EventEmitter,
js: o.JSCtx, logg: o.Logg,
systemAddress: o.SystemAddress,
} }
} }
func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
switch transaction.InputData[:10] { transferValue, err := hexutil.DecodeUint64(transaction.Value)
case "0x63e4bff4": if err != nil {
var ( return false, err
to common.Address }
)
if err := giveToSig.DecodeArgs(w3.B(transaction.InputData), &to); err != nil { // TODO: This is a temporary shortcut to gift gas. Switch to gas faucet contract.
return false, err if transaction.From.Address == f.systemAddress && transferValue > 0 {
} transferEvent := &events.MinimalTxInfo{
transferEvent := &minimalGasGiftTxInfo{
Block: transaction.Block.Number, Block: transaction.Block.Number,
To: to.Hex(), To: transaction.To.Address,
TxHash: transaction.Hash, TxHash: transaction.Hash,
TxIndex: transaction.Index, TxIndex: transaction.Index,
Value: transferValue,
} }
if transaction.Status == 1 { if transaction.Status == 1 {
transferEvent.Success = true transferEvent.Success = true
} }
json, err := json.Marshal(transferEvent) if err := f.eventEmitter.Publish(
if err != nil { gasFilterEventSubject,
return false, err transaction.Hash,
} transferEvent,
); err != nil {
_, err = f.js.Publish("CHAIN.gasGiveTo", json, nats.MsgId(transaction.Hash))
if err != nil {
return false, err return false, err
} }
return true, nil return true, nil
default:
return false, nil
} }
return true, nil
} }

View File

@ -0,0 +1,74 @@
package filter
import (
"context"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/cic-chain-events/internal/events"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/zerodha/logf"
)
const (
registerEventSubject = "CHAIN.register"
)
var (
addSig = w3.MustNewFunc("add(address)", "bool")
)
type RegisterFilterOpts struct {
EventEmitter events.EventEmitter
Logg logf.Logger
}
type RegisterFilter struct {
eventEmitter events.EventEmitter
logg logf.Logger
}
func NewRegisterFilter(o RegisterFilterOpts) Filter {
return &RegisterFilter{
eventEmitter: o.EventEmitter,
logg: o.Logg,
}
}
func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
if len(transaction.InputData) < 10 {
return true, nil
}
if transaction.InputData[:10] == "0x0a3b0a4f" {
var address common.Address
if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
return false, err
}
addEvent := &events.MinimalTxInfo{
Block: transaction.Block.Number,
ContractAddress: transaction.To.Address,
To: transaction.To.Address,
TxHash: transaction.Hash,
TxIndex: transaction.Index,
}
if transaction.Status == 1 {
addEvent.Success = true
}
if err := f.eventEmitter.Publish(
registerEventSubject,
transaction.Hash,
addEvent,
); err != nil {
return false, err
}
return true, nil
}
return true, nil
}

View File

@ -2,16 +2,19 @@ package filter
import ( import (
"context" "context"
"encoding/json"
"math/big" "math/big"
"github.com/celo-org/celo-blockchain/common" "github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/cic-chain-events/internal/events"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/w3-celo-patch" "github.com/grassrootseconomics/w3-celo-patch"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
const (
transferFilterEventSubject = "CHAIN.transfer"
)
var ( var (
transferSig = w3.MustNewFunc("transfer(address, uint256)", "bool") transferSig = w3.MustNewFunc("transfer(address, uint256)", "bool")
transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool") transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool")
@ -19,34 +22,27 @@ var (
) )
type TransferFilterOpts struct { type TransferFilterOpts struct {
Logg logf.Logger EventEmitter events.EventEmitter
JSCtx nats.JetStreamContext Logg logf.Logger
} }
type TransferFilter struct { type TransferFilter struct {
logg logf.Logger eventEmitter events.EventEmitter
js nats.JetStreamContext logg logf.Logger
}
type minimalTxInfo struct {
Block uint64 `json:"block"`
From string `json:"from"`
Success bool `json:"success"`
To string `json:"to"`
TokenAddress string `json:"tokenAddress"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
Value uint64 `json:"value"`
} }
func NewTransferFilter(o TransferFilterOpts) Filter { func NewTransferFilter(o TransferFilterOpts) Filter {
return &TransferFilter{ return &TransferFilter{
logg: o.Logg, eventEmitter: o.EventEmitter,
js: o.JSCtx, logg: o.Logg,
} }
} }
func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
if len(transaction.InputData) < 10 {
return true, nil
}
switch transaction.InputData[:10] { switch transaction.InputData[:10] {
case "0xa9059cbb": case "0xa9059cbb":
var ( var (
@ -58,27 +54,27 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
return false, err return false, err
} }
transferEvent := &minimalTxInfo{ f.logg.Debug("transfer_filter: new reg", "transfer", to)
Block: transaction.Block.Number,
From: transaction.From.Address, transferEvent := &events.MinimalTxInfo{
To: to.Hex(), Block: transaction.Block.Number,
TokenAddress: transaction.To.Address, From: transaction.From.Address,
TxHash: transaction.Hash, To: to.Hex(),
TxIndex: transaction.Index, ContractAddress: transaction.To.Address,
Value: value.Uint64(), TxHash: transaction.Hash,
TxIndex: transaction.Index,
Value: value.Uint64(),
} }
if transaction.Status == 1 { if transaction.Status == 1 {
transferEvent.Success = true transferEvent.Success = true
} }
json, err := json.Marshal(transferEvent) if err := f.eventEmitter.Publish(
if err != nil { transferFilterEventSubject,
return false, err transaction.Hash,
} transferEvent,
); err != nil {
_, err = f.js.Publish("CHAIN.transfer", json, nats.MsgId(transaction.Hash))
if err != nil {
return false, err return false, err
} }
@ -94,27 +90,27 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
return false, err return false, err
} }
transferFromEvent := &minimalTxInfo{ f.logg.Debug("transfer_filter: new reg", "transferFrom", to)
Block: transaction.Block.Number,
From: from.Hex(), transferEvent := &events.MinimalTxInfo{
To: to.Hex(), Block: transaction.Block.Number,
TokenAddress: transaction.To.Address, From: from.Hex(),
TxHash: transaction.Hash, To: to.Hex(),
TxIndex: transaction.Index, ContractAddress: transaction.To.Address,
Value: value.Uint64(), TxHash: transaction.Hash,
TxIndex: transaction.Index,
Value: value.Uint64(),
} }
if transaction.Status == 1 { if transaction.Status == 1 {
transferFromEvent.Success = true transferEvent.Success = true
} }
json, err := json.Marshal(transferFromEvent) if err := f.eventEmitter.Publish(
if err != nil { transferFilterEventSubject,
return false, err transaction.Hash,
} transferEvent,
); err != nil {
_, err = f.js.Publish("CHAIN.transferFrom", json, nats.MsgId(transaction.Hash))
if err != nil {
return false, err return false, err
} }
@ -129,33 +125,32 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
return false, err return false, err
} }
mintToEvent := &minimalTxInfo{ f.logg.Debug("transfer_filter: new reg", "mintTo", to)
Block: transaction.Block.Number,
From: transaction.From.Address, transferEvent := &events.MinimalTxInfo{
To: to.Hex(), Block: transaction.Block.Number,
TokenAddress: transaction.To.Address, From: transaction.From.Address,
TxHash: transaction.Hash, To: to.Hex(),
TxIndex: transaction.Index, ContractAddress: transaction.To.Address,
Value: value.Uint64(), TxHash: transaction.Hash,
TxIndex: transaction.Index,
Value: value.Uint64(),
} }
if transaction.Status == 1 { if transaction.Status == 1 {
mintToEvent.Success = true transferEvent.Success = true
} }
json, err := json.Marshal(mintToEvent) if err := f.eventEmitter.Publish(
if err != nil { transferFilterEventSubject,
return false, err transaction.Hash,
} transferEvent,
); err != nil {
_, err = f.js.Publish("CHAIN.mintTo", json, nats.MsgId(transaction.Hash))
if err != nil {
return false, err return false, err
} }
return true, nil return true, nil
default: default:
// Skip and continue to next filter
return true, nil return true, nil
} }
} }