major refactor: bootstrap from registry map and token index (see notes)

* the cache is now bootstrapped from the token index and registry map
* a token index filter auto updates the cache when a token is added (using add sig)
* minor fixes to filters
This commit is contained in:
Mohamed Sohail 2023-03-29 16:17:30 +00:00
parent f592a6237d
commit 8085168ed3
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
12 changed files with 232 additions and 88 deletions

View File

@ -1,33 +1,71 @@
package main package main
import ( import (
"context"
"fmt"
"math/big"
"strings" "strings"
"sync" "sync"
"time"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/filter"
"github.com/grassrootseconomics/cic-chain-events/internal/pub" "github.com/grassrootseconomics/cic-chain-events/internal/pub"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/grassrootseconomics/w3-celo-patch/module/eth"
"github.com/grassrootseconomics/w3-celo-patch/w3types"
) )
var ( func initAddressFilter(celoProvider *celoutils.Provider, cache *sync.Map) filter.Filter {
systemAddress string var (
) tokenIndexEntryCount big.Int
)
func initAddressFilter() filter.Filter { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// TODO: Temporary shortcut defer cancel()
systemAddress = strings.ToLower(ko.MustString("chain.system_address"))
// TODO: Bootstrap addresses from smart contract registryMap, err := celoProvider.RegistryMap(ctx, celoutils.HexToAddress(ko.MustString("chain.registry_address")))
// TODO: Add route to update cache if err != nil {
cache := &sync.Map{} lo.Fatal("init: critical error creating address filter", "error", err)
}
cache.Store(strings.ToLower(ko.MustString("chain.token_index_address")), "TokenIndex") for k, v := range registryMap {
cache.Store(strings.ToLower(ko.MustString("chain.gas_faucet_address")), "GasFaucet") cache.Store(strings.ToLower(v.Hex()), k)
cache.Store(strings.ToLower(ko.MustString("chain.user_index_address")), "UserIndex") }
if err := celoProvider.Client.CallCtx(
ctx,
eth.CallFunc(w3.MustNewFunc("entryCount()", "uint256"), registryMap[celoutils.TokenIndex]).Returns(&tokenIndexEntryCount),
); err != nil {
lo.Fatal("init: critical error creating address filter", "error", err)
}
calls := make([]w3types.Caller, tokenIndexEntryCount.Int64())
tokenAddresses := make([]common.Address, tokenIndexEntryCount.Int64())
entrySig := w3.MustNewFunc("entry(uint256 _idx)", "address")
// TODO: There is a 5MB limit to a RPC batch call size.
// Test if 10k entries will raise an error (future proofed for a lot of years)
for i := 0; i < int(tokenIndexEntryCount.Int64()); i++ {
calls[i] = eth.CallFunc(entrySig, registryMap[celoutils.TokenIndex], new(big.Int).SetInt64(int64(i))).Returns(&tokenAddresses[i])
}
if err := celoProvider.Client.CallCtx(
ctx,
calls...,
); err != nil {
lo.Fatal("init: critical error creating address filter", "error", err)
}
for i, v := range tokenAddresses {
cache.Store(strings.ToLower(v.Hex()), fmt.Sprintf("TOKEN_%d", i))
}
return filter.NewAddressFilter(filter.AddressFilterOpts{ return filter.NewAddressFilter(filter.AddressFilterOpts{
Cache: cache, Cache: cache,
Logg: lo, Logg: lo,
SystemAddress: systemAddress,
}) })
} }
@ -41,9 +79,8 @@ func initTransferFilter(pub *pub.Pub) filter.Filter {
func initGasGiftFilter(pub *pub.Pub) filter.Filter { func initGasGiftFilter(pub *pub.Pub) filter.Filter {
return filter.NewGasFilter(filter.GasFilterOpts{ return filter.NewGasFilter(filter.GasFilterOpts{
Pub: pub, Pub: pub,
Logg: lo, Logg: lo,
SystemAddress: systemAddress,
}) })
} }
@ -53,3 +90,11 @@ func initRegisterFilter(pub *pub.Pub) filter.Filter {
Logg: lo, Logg: lo,
}) })
} }
func initTokenIndexFilter(cache *sync.Map, pub *pub.Pub) filter.Filter {
return filter.NewTokenIndexFilter(filter.TokenIndexFilterOpts{
Cache: cache,
Pub: pub,
Logg: lo,
})
}

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/alitto/pond" "github.com/alitto/pond"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-chain-events/internal/pool" "github.com/grassrootseconomics/cic-chain-events/internal/pool"
"github.com/grassrootseconomics/cic-chain-events/internal/pub" "github.com/grassrootseconomics/cic-chain-events/internal/pub"
"github.com/grassrootseconomics/cic-chain-events/internal/store" "github.com/grassrootseconomics/cic-chain-events/internal/store"
@ -128,3 +129,22 @@ func initPub(natsConn *nats.Conn, jsCtx nats.JetStreamContext) *pub.Pub {
return pub return pub
} }
func initCeloProvider() *celoutils.Provider {
providerOpts := celoutils.ProviderOpts{
RpcEndpoint: ko.MustString("chain.rpc_endpoint"),
}
if ko.Bool("chain.testnet") {
providerOpts.ChainId = celoutils.TestnetChainId
} else {
providerOpts.ChainId = celoutils.MainnetChainId
}
provider, err := celoutils.NewProvider(providerOpts)
if err != nil {
lo.Fatal("init: critical error loading chain provider", "error", err)
}
return provider
}

View File

@ -17,7 +17,7 @@ import (
) )
type ( type (
internalServiceContainer struct { internalServicesContainer struct {
apiService *echo.Echo apiService *echo.Echo
pub *pub.Pub pub *pub.Pub
} }
@ -55,19 +55,23 @@ func main() {
natsConn, jsCtx := initJetStream() natsConn, jsCtx := initJetStream()
jsPub := initPub(natsConn, jsCtx) jsPub := initPub(natsConn, jsCtx)
celoProvider := initCeloProvider()
cache := &sync.Map{}
pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{ pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{
BlockFetcher: graphqlFetcher, BlockFetcher: graphqlFetcher,
Filters: []filter.Filter{ Filters: []filter.Filter{
initAddressFilter(), initAddressFilter(celoProvider, cache),
initGasGiftFilter(jsPub), initGasGiftFilter(jsPub),
initTransferFilter(jsPub), initTransferFilter(jsPub),
initRegisterFilter(jsPub), initRegisterFilter(jsPub),
initTokenIndexFilter(cache, jsPub),
}, },
Logg: lo, Logg: lo,
Store: pgStore, Store: pgStore,
}) })
internalServices := &internalServiceContainer{ internalServices := &internalServicesContainer{
pub: jsPub, pub: jsPub,
} }
syncerStats := &syncer.Stats{} syncerStats := &syncer.Stats{}

View File

@ -17,7 +17,7 @@ func createSigChannel() (chan os.Signal, func()) {
} }
} }
func startGracefulShutdown(ctx context.Context, internalServices *internalServiceContainer) { func startGracefulShutdown(ctx context.Context, internalServices *internalServicesContainer) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()

View File

@ -9,12 +9,11 @@ address = ":5000"
# Geth API endpoints # Geth API endpoints
[chain] [chain]
graphql_endpoint = "" graphql_endpoint = ""
ws_endpoint = "" ws_endpoint = ""
system_address = "" rpc_endpoint = ""
token_index_address = "" testnet = true
gas_faucet_address = "" registry_address = ""
user_index_address = ""
# Syncer configs # Syncer configs
[syncer] [syncer]

2
go.mod
View File

@ -6,7 +6,7 @@ require (
github.com/VictoriaMetrics/metrics v1.23.1 github.com/VictoriaMetrics/metrics v1.23.1
github.com/alitto/pond v1.8.3 github.com/alitto/pond v1.8.3
github.com/celo-org/celo-blockchain v1.7.2 github.com/celo-org/celo-blockchain v1.7.2
github.com/grassrootseconomics/celoutils v1.1.1 github.com/grassrootseconomics/celoutils v1.2.1
github.com/grassrootseconomics/w3-celo-patch v0.2.0 github.com/grassrootseconomics/w3-celo-patch v0.2.0
github.com/jackc/pgx/v5 v5.3.1 github.com/jackc/pgx/v5 v5.3.1
github.com/jackc/tern/v2 v2.0.1 github.com/jackc/tern/v2 v2.0.1

2
go.sum
View File

@ -269,6 +269,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/grassrootseconomics/celoutils v1.1.1 h1:REsndvfBkPN8UKOoQFNEGm/sCwKtTm+woYtgMl3bfZ0= github.com/grassrootseconomics/celoutils v1.1.1 h1:REsndvfBkPN8UKOoQFNEGm/sCwKtTm+woYtgMl3bfZ0=
github.com/grassrootseconomics/celoutils v1.1.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q= github.com/grassrootseconomics/celoutils v1.1.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q=
github.com/grassrootseconomics/celoutils v1.2.1 h1:ndM4h7Df0d57m2kdRXRStrnunqOL61wQ51rnOanX1KI=
github.com/grassrootseconomics/celoutils v1.2.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q=
github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U= github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U=
github.com/grassrootseconomics/w3-celo-patch v0.2.0/go.mod h1:WhBXNzNIvHmS6B2hAeShs56oa9Azb4jQSrOMKuMdBWw= github.com/grassrootseconomics/w3-celo-patch v0.2.0/go.mod h1:WhBXNzNIvHmS6B2hAeShs56oa9Azb4jQSrOMKuMdBWw=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=

View File

@ -10,31 +10,24 @@ import (
type ( type (
AddressFilterOpts struct { AddressFilterOpts struct {
Cache *sync.Map Cache *sync.Map
Logg logf.Logger Logg logf.Logger
SystemAddress string
} }
AddressFilter struct { AddressFilter struct {
cache *sync.Map cache *sync.Map
logg logf.Logger logg logf.Logger
systemAddress string
} }
) )
func NewAddressFilter(o AddressFilterOpts) Filter { func NewAddressFilter(o AddressFilterOpts) Filter {
return &AddressFilter{ return &AddressFilter{
cache: o.Cache, cache: o.Cache,
logg: o.Logg, logg: o.Logg,
systemAddress: o.SystemAddress,
} }
} }
func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
if transaction.From.Address == f.systemAddress {
return true, nil
}
if _, found := f.cache.Load(transaction.To.Address); found { if _, found := f.cache.Load(transaction.To.Address); found {
return true, nil return true, nil
} }

View File

@ -3,63 +3,69 @@ package filter
import ( import (
"context" "context"
"github.com/celo-org/celo-blockchain/common/hexutil" "github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-chain-events/internal/pub" "github.com/grassrootseconomics/cic-chain-events/internal/pub"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
const ( const (
gasFilterEventSubject = "CHAIN.gas" gasEventSubject = "CHAIN.gas"
)
var (
giveToSig = w3.MustNewFunc("giveTo(address)", "uint256")
) )
type ( type (
GasFilterOpts struct { GasFilterOpts struct {
Logg logf.Logger Logg logf.Logger
Pub *pub.Pub Pub *pub.Pub
SystemAddress string
} }
GasFilter struct { GasFilter struct {
logg logf.Logger logg logf.Logger
pub *pub.Pub pub *pub.Pub
systemAddress string
} }
) )
func NewGasFilter(o GasFilterOpts) Filter { func NewGasFilter(o GasFilterOpts) Filter {
return &GasFilter{ return &GasFilter{
logg: o.Logg, logg: o.Logg,
pub: o.Pub, pub: o.Pub,
systemAddress: o.SystemAddress,
} }
} }
func (f *GasFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { func (f *GasFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
transferValue, err := hexutil.DecodeUint64(transaction.Value) if len(transaction.InputData) < 10 {
if err != nil { return true, nil
return false, err
} }
// TODO: This is a temporary shortcut to gift gas. Switch to gas faucet contract. if transaction.InputData[:10] == "0x63e4bff4" {
if transaction.From.Address == f.systemAddress && transferValue > 0 { var address common.Address
transferEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number, if err := giveToSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
To: celoutils.ChecksumAddress(transaction.To.Address), return false, err
TxHash: transaction.Hash, }
TxIndex: transaction.Index,
Value: transferValue, giveToEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number,
ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
To: address.Hex(),
TxHash: transaction.Hash,
TxIndex: transaction.Index,
} }
if transaction.Status == 1 { if transaction.Status == 1 {
transferEvent.Success = true giveToEvent.Success = true
} }
if err := f.pub.Publish( if err := f.pub.Publish(
gasFilterEventSubject, gasEventSubject,
transaction.Hash, transaction.Hash,
transferEvent, giveToEvent,
); err != nil { ); err != nil {
return false, err return false, err
} }

View File

@ -16,44 +16,44 @@ const (
) )
var ( var (
addSig = w3.MustNewFunc("add(address)", "bool") registerSig = w3.MustNewFunc("register(address)", "")
) )
type ( type (
RegisterFilterOpts struct { RegisterFilterOpts struct {
Logg logf.Logger Logg logf.Logger
Pub *pub.Pub Pub *pub.Pub
} }
RegisterFilter struct { RegisterFilter struct {
logg logf.Logger logg logf.Logger
pub *pub.Pub pub *pub.Pub
} }
) )
func NewRegisterFilter(o RegisterFilterOpts) Filter { func NewRegisterFilter(o RegisterFilterOpts) Filter {
return &RegisterFilter{ return &RegisterFilter{
logg: o.Logg, logg: o.Logg,
pub: o.Pub, pub: o.Pub,
} }
} }
func (f *RegisterFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) { func (f *RegisterFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
if len(transaction.InputData) < 10 { if len(transaction.InputData) < 10 {
return true, nil return true, nil
} }
if transaction.InputData[:10] == "0x0a3b0a4f" { if transaction.InputData[:10] == "0x4420e486" {
var address common.Address var address common.Address
if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil { if err := registerSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
return false, err return false, err
} }
addEvent := &pub.MinimalTxInfo{ addEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number, Block: transaction.Block.Number,
ContractAddress: transaction.To.Address, ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
To: celoutils.ChecksumAddress(transaction.To.Address), To: address.Hex(),
TxHash: transaction.Hash, TxHash: transaction.Hash,
TxIndex: transaction.Index, TxIndex: transaction.Index,
} }

View File

@ -0,0 +1,81 @@
package filter
import (
"context"
"strings"
"sync"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-chain-events/internal/pub"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/zerodha/logf"
)
const (
tokenIndexFilterEventSubject = "CHAIN.tokenAdded"
)
var (
addSig = w3.MustNewFunc("add(address)", "bool")
)
type (
TokenIndexFilterOpts struct {
Cache *sync.Map
Logg logf.Logger
Pub *pub.Pub
}
TokenIndexFilter struct {
pub *pub.Pub
cache *sync.Map
logg logf.Logger
}
)
func NewTokenIndexFilter(o TokenIndexFilterOpts) Filter {
return &TokenIndexFilter{
cache: o.Cache,
logg: o.Logg,
pub: o.Pub,
}
}
func (f *TokenIndexFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
if len(transaction.InputData) < 10 {
return true, nil
}
if transaction.InputData[:10] == "0x0a3b0a4f" {
var address common.Address
if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
return false, err
}
f.cache.Store(strings.ToLower(address.Hex()), transaction.Hash)
addEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number,
ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
To: address.Hex(),
TxHash: transaction.Hash,
TxIndex: transaction.Index,
}
if transaction.Status == 1 {
addEvent.Success = true
}
if err := f.pub.Publish(
tokenIndexFilterEventSubject,
transaction.Hash,
addEvent,
); err != nil {
return false, err
}
}
return true, nil
}

View File

@ -57,13 +57,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti
return false, err return false, err
} }
f.logg.Debug("transfer_filter: new reg", "transfer", to)
transferEvent := &pub.MinimalTxInfo{ transferEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number, Block: transaction.Block.Number,
From: celoutils.ChecksumAddress(transaction.From.Address), From: celoutils.ChecksumAddress(transaction.From.Address),
To: to.Hex(), To: to.Hex(),
ContractAddress: transaction.To.Address, ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
TxHash: transaction.Hash, TxHash: transaction.Hash,
TxIndex: transaction.Index, TxIndex: transaction.Index,
Value: value.Uint64(), Value: value.Uint64(),
@ -93,13 +91,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti
return false, err return false, err
} }
f.logg.Debug("transfer_filter: new reg", "transferFrom", to)
transferEvent := &pub.MinimalTxInfo{ transferEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number, Block: transaction.Block.Number,
From: from.Hex(), From: from.Hex(),
To: to.Hex(), To: to.Hex(),
ContractAddress: transaction.To.Address, ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
TxHash: transaction.Hash, TxHash: transaction.Hash,
TxIndex: transaction.Index, TxIndex: transaction.Index,
Value: value.Uint64(), Value: value.Uint64(),
@ -128,13 +124,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti
return false, err return false, err
} }
f.logg.Debug("transfer_filter: new reg", "mintTo", to)
transferEvent := &pub.MinimalTxInfo{ transferEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number, Block: transaction.Block.Number,
From: celoutils.ChecksumAddress(transaction.From.Address), From: celoutils.ChecksumAddress(transaction.From.Address),
To: to.Hex(), To: to.Hex(),
ContractAddress: transaction.To.Address, ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
TxHash: transaction.Hash, TxHash: transaction.Hash,
TxIndex: transaction.Index, TxIndex: transaction.Index,
Value: value.Uint64(), Value: value.Uint64(),