mirror of
				https://github.com/grassrootseconomics/cic-chain-events.git
				synced 2025-11-04 09:28:22 +01:00 
			
		
		
		
	refactor: filters, pipeline, minor fixes, remove RPC support (#27)
* devnet: snapshot * refactor: pass struct through pipeline fllters * refactor: replace timer with ticker * refactor: filters, jetstream emitter * add register filter * update gas filter * refactor: remove RPC fetcher support
This commit is contained in:
		
							parent
							
								
									6df00ddfce
								
							
						
					
					
						commit
						9d1b77e907
					
				@ -4,33 +4,50 @@ import (
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/events"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/filter"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	systemAddress = strings.ToLower("0x3D85285e39f05773aC92EAD27CB50a4385A529E4")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func initAddressFilter() filter.Filter {
 | 
			
		||||
	// TODO: Bootstrap addresses from smart contract
 | 
			
		||||
	// TODO: Add route to update cache
 | 
			
		||||
	cache := &sync.Map{}
 | 
			
		||||
 | 
			
		||||
	// Example bootstrap addresses
 | 
			
		||||
	cache.Store(strings.ToLower("0x617f3112bf5397D0467D315cC709EF968D9ba546"), "USDT")
 | 
			
		||||
	cache.Store(strings.ToLower("0x765DE816845861e75A25fCA122bb6898B8B1282a"), "cUSD")
 | 
			
		||||
	cache.Store(strings.ToLower("0xD8763CBa276a3738E6DE85b4b3bF5FDed6D6cA73"), "cEUR")
 | 
			
		||||
	cache.Store(strings.ToLower("0xB92463E2262E700e29c16416270c9Fdfa17934D7"), "TRNVoucher")
 | 
			
		||||
	cache.Store(strings.ToLower("0xf2a1fc19Ad275A0EAe3445798761FeD1Eea725d5"), "GasFaucet")
 | 
			
		||||
	cache.Store(strings.ToLower("0x1e041282695C66944BfC53cabce947cf35CEaf87"), "AddressIndex")
 | 
			
		||||
 | 
			
		||||
	return filter.NewAddressFilter(filter.AddressFilterOpts{
 | 
			
		||||
		Cache: cache,
 | 
			
		||||
		Logg:  lo,
 | 
			
		||||
		Cache:         cache,
 | 
			
		||||
		Logg:          lo,
 | 
			
		||||
		SystemAddress: systemAddress,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initDecodeFilter() filter.Filter {
 | 
			
		||||
	js, err := initJetStream()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		lo.Fatal("filters: critical error loading jetstream", "error", err)
 | 
			
		||||
	}
 | 
			
		||||
func initTransferFilter(eventEmitter events.EventEmitter) filter.Filter {
 | 
			
		||||
	return filter.NewTransferFilter(filter.TransferFilterOpts{
 | 
			
		||||
		EventEmitter: eventEmitter,
 | 
			
		||||
		Logg:         lo,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return filter.NewDecodeFilter(filter.DecodeFilterOpts{
 | 
			
		||||
		Logg:  lo,
 | 
			
		||||
		JSCtx: js,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initGasGiftFilter(eventEmitter events.EventEmitter) filter.Filter {
 | 
			
		||||
	return filter.NewGasFilter(filter.GasFilterOpts{
 | 
			
		||||
		EventEmitter:  eventEmitter,
 | 
			
		||||
		Logg:          lo,
 | 
			
		||||
		SystemAddress: systemAddress,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initRegisterFilter(eventEmitter events.EventEmitter) filter.Filter {
 | 
			
		||||
	return filter.NewRegisterFilter(filter.RegisterFilterOpts{
 | 
			
		||||
		EventEmitter: eventEmitter,
 | 
			
		||||
		Logg:         lo,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										35
									
								
								cmd/init.go
									
									
									
									
									
								
							
							
						
						
									
										35
									
								
								cmd/init.go
									
									
									
									
									
								
							@ -4,6 +4,7 @@ import (
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/events"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/store"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/jackc/pgx/v5"
 | 
			
		||||
@ -12,7 +13,6 @@ import (
 | 
			
		||||
	"github.com/knadh/koanf/parsers/toml"
 | 
			
		||||
	"github.com/knadh/koanf/providers/env"
 | 
			
		||||
	"github.com/knadh/koanf/providers/file"
 | 
			
		||||
	"github.com/nats-io/nats.go"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -23,6 +23,7 @@ func initLogger(debug bool) logf.Logger {
 | 
			
		||||
 | 
			
		||||
	if debug {
 | 
			
		||||
		loggOpts.Level = logf.DebugLevel
 | 
			
		||||
		loggOpts.EnableCaller = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return logf.New(loggOpts)
 | 
			
		||||
@ -77,32 +78,16 @@ func initFetcher() fetch.Fetch {
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initJetStream() (nats.JetStreamContext, error) {
 | 
			
		||||
	natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint"))
 | 
			
		||||
func initJetStream() (events.EventEmitter, error) {
 | 
			
		||||
	jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{
 | 
			
		||||
		ServerUrl:       ko.MustString("jetstream.endpoint"),
 | 
			
		||||
		PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour,
 | 
			
		||||
		DedupDuration:   time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	js, err := natsConn.JetStream()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Bootstrap stream if it does not exist
 | 
			
		||||
	stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name"))
 | 
			
		||||
	if stream == nil {
 | 
			
		||||
		lo.Info("jetstream: bootstrapping stream")
 | 
			
		||||
		_, err = js.AddStream(&nats.StreamConfig{
 | 
			
		||||
			Name:       ko.MustString("jetstream.stream_name"),
 | 
			
		||||
			MaxAge:     time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour,
 | 
			
		||||
			Storage:    nats.FileStorage,
 | 
			
		||||
			Subjects:   ko.MustStrings("jetstream.stream_subjects"),
 | 
			
		||||
			Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour,
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return js, nil
 | 
			
		||||
	return jsEmitter, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										11
									
								
								cmd/main.go
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								cmd/main.go
									
									
									
									
									
								
							@ -10,10 +10,10 @@ import (
 | 
			
		||||
	"syscall"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/filter"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pool"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/syncer"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/filter"
 | 
			
		||||
	"github.com/knadh/goyesql/v2"
 | 
			
		||||
	"github.com/knadh/koanf"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
@ -58,13 +58,20 @@ func main() {
 | 
			
		||||
		lo.Fatal("main: critical error loading pg store", "error", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	jsCtx, err := initJetStream()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		lo.Fatal("main: critical error loading jetstream context", "error", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	graphqlFetcher := initFetcher()
 | 
			
		||||
 | 
			
		||||
	pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{
 | 
			
		||||
		BlockFetcher: graphqlFetcher,
 | 
			
		||||
		Filters: []filter.Filter{
 | 
			
		||||
			initAddressFilter(),
 | 
			
		||||
			initDecodeFilter(),
 | 
			
		||||
			initGasGiftFilter(jsCtx),
 | 
			
		||||
			initTransferFilter(jsCtx),
 | 
			
		||||
			initRegisterFilter(jsCtx),
 | 
			
		||||
		},
 | 
			
		||||
		Logg:  lo,
 | 
			
		||||
		Store: pgStore,
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										28
									
								
								config.toml
									
									
									
									
									
								
							
							
						
						
									
										28
									
								
								config.toml
									
									
									
									
									
								
							@ -1,43 +1,35 @@
 | 
			
		||||
[metrics]
 | 
			
		||||
# Exposes Prometheus metrics
 | 
			
		||||
# /metrics endpoint
 | 
			
		||||
go_process = true
 | 
			
		||||
 | 
			
		||||
# API server
 | 
			
		||||
[api]
 | 
			
		||||
# Host and port
 | 
			
		||||
address = ":8080"
 | 
			
		||||
address = ":8085"
 | 
			
		||||
 | 
			
		||||
# Geth API endpoints
 | 
			
		||||
[chain]
 | 
			
		||||
graphql_endpoint = "https://rpc.celo.grassecon.net/graphql"
 | 
			
		||||
ws_endpoint      = "wss://socket.celo.grassecon.net"
 | 
			
		||||
graphql_endpoint = "https://rpc.alfajores.celo.grassecon.net/graphql"
 | 
			
		||||
ws_endpoint      = "wss://ws.alfajores.celo.grassecon.net"
 | 
			
		||||
 | 
			
		||||
# Syncer configs
 | 
			
		||||
[syncer]
 | 
			
		||||
# Number of goroutines assigned to the janitor worker pool
 | 
			
		||||
# Maximum number of missing blocks pushed into the worker queue every janitor sweep
 | 
			
		||||
janitor_queue_size     = 500
 | 
			
		||||
# Number of goroutines assigned to the worker pool
 | 
			
		||||
janitor_concurrency    = 5
 | 
			
		||||
# Syncer start block
 | 
			
		||||
initial_lower_bound    = 17269000
 | 
			
		||||
# Max blocks in worker queue awaiting processing
 | 
			
		||||
janitor_queue_size     = 500
 | 
			
		||||
# Janitor sweep interval
 | 
			
		||||
initial_lower_bound    = 16373156
 | 
			
		||||
# Janitor sweep interval, should take into account concurrency and queue_size
 | 
			
		||||
janitor_sweep_interval = 5
 | 
			
		||||
 | 
			
		||||
[postgres]
 | 
			
		||||
# Default is the Docker container DSN
 | 
			
		||||
dsn   = "postgres://postgres:postgres@localhost:5432/cic_chain_events"
 | 
			
		||||
 | 
			
		||||
# https://docs.nats.io/
 | 
			
		||||
[jetstream]
 | 
			
		||||
endpoint               = "nats://localhost:4222"
 | 
			
		||||
stream_name            = "CHAIN"
 | 
			
		||||
# Duration JetStream should keep the message before remocing it from the persistent store
 | 
			
		||||
# Duration JetStream should keep the message before GC
 | 
			
		||||
persist_duration_hours = 48
 | 
			
		||||
# Duration to ignore duplicate transactions (e.g. due to restart)
 | 
			
		||||
dedup_duration_hours   = 6
 | 
			
		||||
# Stream subjects
 | 
			
		||||
stream_subjects        = [
 | 
			
		||||
    "CHAIN.transfer",
 | 
			
		||||
    "CHAIN.transferFrom",
 | 
			
		||||
    "CHAIN.mintTo"
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										1
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								go.mod
									
									
									
									
									
								
							@ -7,7 +7,6 @@ require (
 | 
			
		||||
	github.com/alitto/pond v1.8.2
 | 
			
		||||
	github.com/celo-org/celo-blockchain v1.6.1
 | 
			
		||||
	github.com/goccy/go-json v0.10.0
 | 
			
		||||
	github.com/grassrootseconomics/cic-celo-sdk v0.3.1
 | 
			
		||||
	github.com/grassrootseconomics/w3-celo-patch v0.1.0
 | 
			
		||||
	github.com/jackc/pgx/v5 v5.2.0
 | 
			
		||||
	github.com/knadh/goyesql/v2 v2.2.0
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							@ -248,8 +248,6 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
 | 
			
		||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
 | 
			
		||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 | 
			
		||||
github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
 | 
			
		||||
github.com/grassrootseconomics/cic-celo-sdk v0.3.1 h1:SzmMFrqxSIdgePqwbUdoS3PNP82MFnlOecycVk2ZYWg=
 | 
			
		||||
github.com/grassrootseconomics/cic-celo-sdk v0.3.1/go.mod h1:EiR6d03GYu6jlVKNL1MbTAw/bqAW2WP3J/lkrZxPMdU=
 | 
			
		||||
github.com/grassrootseconomics/w3-celo-patch v0.1.0 h1:0fev2hYkGEyFX2D4oUG8yy4jXhtHv7qUtLLboXL5ycw=
 | 
			
		||||
github.com/grassrootseconomics/w3-celo-patch v0.1.0/go.mod h1:JtkXc+yDUiQQJdhYTqddZI/itdYGHY7H8PNZzBo4hCk=
 | 
			
		||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										17
									
								
								internal/events/events.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										17
									
								
								internal/events/events.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,17 @@
 | 
			
		||||
package events
 | 
			
		||||
 | 
			
		||||
type EventEmitter interface {
 | 
			
		||||
	Close()
 | 
			
		||||
	Publish(subject string, dedupId string, eventPayload interface{}) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MinimalTxInfo struct {
 | 
			
		||||
	Block           uint64 `json:"block"`
 | 
			
		||||
	From            string `json:"from"`
 | 
			
		||||
	To              string `json:"to"`
 | 
			
		||||
	ContractAddress string `json:"contractAddress"`
 | 
			
		||||
	Success         bool   `json:"success"`
 | 
			
		||||
	TxHash          string `json:"transactionHash"`
 | 
			
		||||
	TxIndex         uint   `json:"transactionIndex"`
 | 
			
		||||
	Value           uint64 `json:"value"`
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										78
									
								
								internal/events/jetstream.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										78
									
								
								internal/events/jetstream.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,78 @@
 | 
			
		||||
package events
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/goccy/go-json"
 | 
			
		||||
	"github.com/nats-io/nats.go"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	StreamName     string = "CHAIN"
 | 
			
		||||
	StreamSubjects string = "CHAIN.*"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type JetStreamOpts struct {
 | 
			
		||||
	ServerUrl       string
 | 
			
		||||
	PersistDuration time.Duration
 | 
			
		||||
	DedupDuration   time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type JetStream struct {
 | 
			
		||||
	jsCtx nats.JetStreamContext
 | 
			
		||||
	nc    *nats.Conn
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) {
 | 
			
		||||
	natsConn, err := nats.Connect(o.ServerUrl)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	js, err := natsConn.JetStream()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Bootstrap stream if it doesn't exist.
 | 
			
		||||
	stream, _ := js.StreamInfo(StreamName)
 | 
			
		||||
	if stream == nil {
 | 
			
		||||
		_, err = js.AddStream(&nats.StreamConfig{
 | 
			
		||||
			Name:       StreamName,
 | 
			
		||||
			MaxAge:     o.PersistDuration,
 | 
			
		||||
			Storage:    nats.FileStorage,
 | 
			
		||||
			Subjects:   []string{StreamSubjects},
 | 
			
		||||
			Duplicates: o.DedupDuration,
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &JetStream{
 | 
			
		||||
		jsCtx: js,
 | 
			
		||||
		nc:    natsConn,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Close gracefully shutdowns the JetStream connection.
 | 
			
		||||
func (js *JetStream) Close() {
 | 
			
		||||
	if js.nc != nil {
 | 
			
		||||
		js.nc.Close()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Publish publishes the JSON data to the NATS stream.
 | 
			
		||||
func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error {
 | 
			
		||||
	jsonData, err := json.Marshal(eventPayload)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@ -9,23 +9,30 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type AddressFilterOpts struct {
 | 
			
		||||
	Cache *sync.Map
 | 
			
		||||
	Logg  logf.Logger
 | 
			
		||||
	Cache         *sync.Map
 | 
			
		||||
	Logg          logf.Logger
 | 
			
		||||
	SystemAddress string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type AddressFilter struct {
 | 
			
		||||
	cache *sync.Map
 | 
			
		||||
	logg  logf.Logger
 | 
			
		||||
	cache         *sync.Map
 | 
			
		||||
	logg          logf.Logger
 | 
			
		||||
	systemAddress string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewAddressFilter(o AddressFilterOpts) Filter {
 | 
			
		||||
	return &AddressFilter{
 | 
			
		||||
		cache: o.Cache,
 | 
			
		||||
		logg:  o.Logg,
 | 
			
		||||
		cache:         o.Cache,
 | 
			
		||||
		logg:          o.Logg,
 | 
			
		||||
		systemAddress: o.SystemAddress,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
 | 
			
		||||
func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
 | 
			
		||||
	if transaction.From.Address == f.systemAddress {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if _, found := f.cache.Load(transaction.To.Address); found {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -7,7 +7,6 @@ import (
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/stretchr/testify/suite"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type AddressFilterSuite struct {
 | 
			
		||||
@ -20,15 +19,8 @@ func (s *AddressFilterSuite) SetupSuite() {
 | 
			
		||||
 | 
			
		||||
	addressCache.Store("0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f", "")
 | 
			
		||||
 | 
			
		||||
	logg := logf.New(
 | 
			
		||||
		logf.Opts{
 | 
			
		||||
			Level: logf.DebugLevel,
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	s.filter = NewAddressFilter(AddressFilterOpts{
 | 
			
		||||
		Cache: addressCache,
 | 
			
		||||
		Logg:  logg,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -66,7 +58,7 @@ func (s *AddressFilterSuite) TestAddresses() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		next, err := s.filter.Execute(context.Background(), &test.transactionData)
 | 
			
		||||
		next, err := s.filter.Execute(context.Background(), test.transactionData)
 | 
			
		||||
		s.NoError(err)
 | 
			
		||||
		s.Equal(test.want, next)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -1,161 +0,0 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"math/big"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/nats-io/nats.go"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	transferSig     = w3.MustNewFunc("transfer(address, uint256)", "bool")
 | 
			
		||||
	transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool")
 | 
			
		||||
	mintToSig       = w3.MustNewFunc("mintTo(address, uint256)", "bool")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type DecodeFilterOpts struct {
 | 
			
		||||
	Logg  logf.Logger
 | 
			
		||||
	JSCtx nats.JetStreamContext
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type DecodeFilter struct {
 | 
			
		||||
	logg logf.Logger
 | 
			
		||||
	js   nats.JetStreamContext
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type minimalTxInfo struct {
 | 
			
		||||
	Block        uint64 `json:"block"`
 | 
			
		||||
	From         string `json:"from"`
 | 
			
		||||
	Success      bool   `json:"success"`
 | 
			
		||||
	To           string `json:"to"`
 | 
			
		||||
	TokenAddress string `json:"tokenAddress"`
 | 
			
		||||
	TxHash       string `json:"transactionHash"`
 | 
			
		||||
	TxIndex      uint   `json:"transactionIndex"`
 | 
			
		||||
	Value        uint64 `json:"value"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewDecodeFilter(o DecodeFilterOpts) Filter {
 | 
			
		||||
	return &DecodeFilter{
 | 
			
		||||
		logg: o.Logg,
 | 
			
		||||
		js:   o.JSCtx,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *DecodeFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
 | 
			
		||||
	switch transaction.InputData[:10] {
 | 
			
		||||
	case "0xa9059cbb":
 | 
			
		||||
		var (
 | 
			
		||||
			to    common.Address
 | 
			
		||||
			value big.Int
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if err := transferSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		transferEvent := &minimalTxInfo{
 | 
			
		||||
			Block:        transaction.Block.Number,
 | 
			
		||||
			From:         transaction.From.Address,
 | 
			
		||||
			To:           to.Hex(),
 | 
			
		||||
			TokenAddress: transaction.To.Address,
 | 
			
		||||
			TxHash:       transaction.Hash,
 | 
			
		||||
			TxIndex:      transaction.Index,
 | 
			
		||||
			Value:        value.Uint64(),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			transferEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		json, err := json.Marshal(transferEvent)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		_, err = f.js.Publish("CHAIN.transfer", json, nats.MsgId(transaction.Hash))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	case "0x23b872dd":
 | 
			
		||||
		var (
 | 
			
		||||
			from  common.Address
 | 
			
		||||
			to    common.Address
 | 
			
		||||
			value big.Int
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if err := transferFromSig.DecodeArgs(w3.B(transaction.InputData), &from, &to, &value); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		transferFromEvent := &minimalTxInfo{
 | 
			
		||||
			Block:        transaction.Block.Number,
 | 
			
		||||
			From:         from.Hex(),
 | 
			
		||||
			To:           to.Hex(),
 | 
			
		||||
			TokenAddress: transaction.To.Address,
 | 
			
		||||
			TxHash:       transaction.Hash,
 | 
			
		||||
			TxIndex:      transaction.Index,
 | 
			
		||||
			Value:        value.Uint64(),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			transferFromEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		json, err := json.Marshal(transferFromEvent)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		_, err = f.js.Publish("CHAIN.transferFrom", json, nats.MsgId(transaction.Hash))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	case "0x449a52f8":
 | 
			
		||||
		var (
 | 
			
		||||
			to    common.Address
 | 
			
		||||
			value big.Int
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if err := mintToSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		mintToEvent := &minimalTxInfo{
 | 
			
		||||
			Block:        transaction.Block.Number,
 | 
			
		||||
			From:         transaction.From.Address,
 | 
			
		||||
			To:           to.Hex(),
 | 
			
		||||
			TokenAddress: transaction.To.Address,
 | 
			
		||||
			TxHash:       transaction.Hash,
 | 
			
		||||
			TxIndex:      transaction.Index,
 | 
			
		||||
			Value:        value.Uint64(),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			mintToEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		json, err := json.Marshal(mintToEvent)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		_, err = f.js.Publish("CHAIN.mintTo", json, nats.MsgId(transaction.Hash))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	default:
 | 
			
		||||
		f.logg.Debug("unknownSignature", "inpuData", transaction.InputData)
 | 
			
		||||
		return false, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -8,5 +8,5 @@ import (
 | 
			
		||||
 | 
			
		||||
// Filter defines a read only filter which must return next as true/false or an error
 | 
			
		||||
type Filter interface {
 | 
			
		||||
	Execute(ctx context.Context, inputTransaction *fetch.Transaction) (next bool, err error)
 | 
			
		||||
	Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										68
									
								
								internal/filter/gas_filter.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								internal/filter/gas_filter.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,68 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common/hexutil"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/events"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	gasFilterEventSubject = "CHAIN.gas"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type GasFilterOpts struct {
 | 
			
		||||
	EventEmitter  events.EventEmitter
 | 
			
		||||
	Logg          logf.Logger
 | 
			
		||||
	SystemAddress string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type GasFilter struct {
 | 
			
		||||
	eventEmitter  events.EventEmitter
 | 
			
		||||
	logg          logf.Logger
 | 
			
		||||
	systemAddress string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewGasFilter(o GasFilterOpts) Filter {
 | 
			
		||||
	return &GasFilter{
 | 
			
		||||
		eventEmitter:  o.EventEmitter,
 | 
			
		||||
		logg:          o.Logg,
 | 
			
		||||
		systemAddress: o.SystemAddress,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
 | 
			
		||||
	transferValue, err := hexutil.DecodeUint64(transaction.Value)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return false, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: This is a temporary shortcut to gift gas. Switch to gas faucet contract.
 | 
			
		||||
	if transaction.From.Address == f.systemAddress && transferValue > 0 {
 | 
			
		||||
		transferEvent := &events.MinimalTxInfo{
 | 
			
		||||
			Block:   transaction.Block.Number,
 | 
			
		||||
			To:      transaction.To.Address,
 | 
			
		||||
			TxHash:  transaction.Hash,
 | 
			
		||||
			TxIndex: transaction.Index,
 | 
			
		||||
			Value:   transferValue,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			transferEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.eventEmitter.Publish(
 | 
			
		||||
			gasFilterEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			transferEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										74
									
								
								internal/filter/register_filter.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										74
									
								
								internal/filter/register_filter.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,74 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/events"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	registerEventSubject = "CHAIN.register"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	addSig = w3.MustNewFunc("add(address)", "bool")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type RegisterFilterOpts struct {
 | 
			
		||||
	EventEmitter events.EventEmitter
 | 
			
		||||
	Logg         logf.Logger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type RegisterFilter struct {
 | 
			
		||||
	eventEmitter events.EventEmitter
 | 
			
		||||
	logg         logf.Logger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewRegisterFilter(o RegisterFilterOpts) Filter {
 | 
			
		||||
	return &RegisterFilter{
 | 
			
		||||
		eventEmitter: o.EventEmitter,
 | 
			
		||||
		logg:         o.Logg,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
 | 
			
		||||
	if len(transaction.InputData) < 10 {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if transaction.InputData[:10] == "0x0a3b0a4f" {
 | 
			
		||||
		var address common.Address
 | 
			
		||||
 | 
			
		||||
		if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		addEvent := &events.MinimalTxInfo{
 | 
			
		||||
			Block:           transaction.Block.Number,
 | 
			
		||||
			ContractAddress: transaction.To.Address,
 | 
			
		||||
			To:              transaction.To.Address,
 | 
			
		||||
			TxHash:          transaction.Hash,
 | 
			
		||||
			TxIndex:         transaction.Index,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			addEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.eventEmitter.Publish(
 | 
			
		||||
			registerEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			addEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
	return true, nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										156
									
								
								internal/filter/transfer_filter.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										156
									
								
								internal/filter/transfer_filter.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,156 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"math/big"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/events"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	transferFilterEventSubject = "CHAIN.transfer"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	transferSig     = w3.MustNewFunc("transfer(address, uint256)", "bool")
 | 
			
		||||
	transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool")
 | 
			
		||||
	mintToSig       = w3.MustNewFunc("mintTo(address, uint256)", "bool")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type TransferFilterOpts struct {
 | 
			
		||||
	EventEmitter events.EventEmitter
 | 
			
		||||
	Logg         logf.Logger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type TransferFilter struct {
 | 
			
		||||
	eventEmitter events.EventEmitter
 | 
			
		||||
	logg         logf.Logger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewTransferFilter(o TransferFilterOpts) Filter {
 | 
			
		||||
	return &TransferFilter{
 | 
			
		||||
		eventEmitter: o.EventEmitter,
 | 
			
		||||
		logg:         o.Logg,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) {
 | 
			
		||||
	if len(transaction.InputData) < 10 {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch transaction.InputData[:10] {
 | 
			
		||||
	case "0xa9059cbb":
 | 
			
		||||
		var (
 | 
			
		||||
			to    common.Address
 | 
			
		||||
			value big.Int
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if err := transferSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		f.logg.Debug("transfer_filter: new reg", "transfer", to)
 | 
			
		||||
 | 
			
		||||
		transferEvent := &events.MinimalTxInfo{
 | 
			
		||||
			Block:           transaction.Block.Number,
 | 
			
		||||
			From:            transaction.From.Address,
 | 
			
		||||
			To:              to.Hex(),
 | 
			
		||||
			ContractAddress: transaction.To.Address,
 | 
			
		||||
			TxHash:          transaction.Hash,
 | 
			
		||||
			TxIndex:         transaction.Index,
 | 
			
		||||
			Value:           value.Uint64(),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			transferEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.eventEmitter.Publish(
 | 
			
		||||
			transferFilterEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			transferEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	case "0x23b872dd":
 | 
			
		||||
		var (
 | 
			
		||||
			from  common.Address
 | 
			
		||||
			to    common.Address
 | 
			
		||||
			value big.Int
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if err := transferFromSig.DecodeArgs(w3.B(transaction.InputData), &from, &to, &value); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		f.logg.Debug("transfer_filter: new reg", "transferFrom", to)
 | 
			
		||||
 | 
			
		||||
		transferEvent := &events.MinimalTxInfo{
 | 
			
		||||
			Block:           transaction.Block.Number,
 | 
			
		||||
			From:            from.Hex(),
 | 
			
		||||
			To:              to.Hex(),
 | 
			
		||||
			ContractAddress: transaction.To.Address,
 | 
			
		||||
			TxHash:          transaction.Hash,
 | 
			
		||||
			TxIndex:         transaction.Index,
 | 
			
		||||
			Value:           value.Uint64(),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			transferEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.eventEmitter.Publish(
 | 
			
		||||
			transferFilterEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			transferEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	case "0x449a52f8":
 | 
			
		||||
		var (
 | 
			
		||||
			to    common.Address
 | 
			
		||||
			value big.Int
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if err := mintToSig.DecodeArgs(w3.B(transaction.InputData), &to, &value); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		f.logg.Debug("transfer_filter: new reg", "mintTo", to)
 | 
			
		||||
 | 
			
		||||
		transferEvent := &events.MinimalTxInfo{
 | 
			
		||||
			Block:           transaction.Block.Number,
 | 
			
		||||
			From:            transaction.From.Address,
 | 
			
		||||
			To:              to.Hex(),
 | 
			
		||||
			ContractAddress: transaction.To.Address,
 | 
			
		||||
			TxHash:          transaction.Hash,
 | 
			
		||||
			TxIndex:         transaction.Index,
 | 
			
		||||
			Value:           value.Uint64(),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			transferEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.eventEmitter.Publish(
 | 
			
		||||
			transferFilterEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			transferEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	default:
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -9,24 +9,24 @@ import (
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type DecodeFilterSuite struct {
 | 
			
		||||
type TransferFilterSuite struct {
 | 
			
		||||
	suite.Suite
 | 
			
		||||
	filter Filter
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *DecodeFilterSuite) SetupSuite() {
 | 
			
		||||
func (s *TransferFilterSuite) SetupSuite() {
 | 
			
		||||
	logg := logf.New(
 | 
			
		||||
		logf.Opts{
 | 
			
		||||
			Level: logf.DebugLevel,
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	s.filter = NewDecodeFilter(DecodeFilterOpts{
 | 
			
		||||
	s.filter = NewTransferFilter(TransferFilterOpts{
 | 
			
		||||
		Logg: logg,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *DecodeFilterSuite) TestTranfserInputs() {
 | 
			
		||||
func (s *TransferFilterSuite) TestTranfserInputs() {
 | 
			
		||||
	type testCase struct {
 | 
			
		||||
		transactionData fetch.Transaction
 | 
			
		||||
		want            bool
 | 
			
		||||
@ -66,12 +66,12 @@ func (s *DecodeFilterSuite) TestTranfserInputs() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		next, err := s.filter.Execute(context.Background(), &test.transactionData)
 | 
			
		||||
		next, err := s.filter.Execute(context.Background(), test.transactionData)
 | 
			
		||||
		s.NoError(err)
 | 
			
		||||
		s.Equal(test.want, next)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestDecodeFilterSuite(t *testing.T) {
 | 
			
		||||
	suite.Run(t, new(DecodeFilterSuite))
 | 
			
		||||
func TestTransferFilterSuite(t *testing.T) {
 | 
			
		||||
	suite.Run(t, new(TransferFilterSuite))
 | 
			
		||||
}
 | 
			
		||||
@ -49,7 +49,7 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
 | 
			
		||||
 | 
			
		||||
	for _, tx := range fetchResp.Data.Block.Transactions {
 | 
			
		||||
		for _, filter := range md.filters {
 | 
			
		||||
			next, err := filter.Execute(ctx, &tx)
 | 
			
		||||
			next, err := filter.Execute(ctx, tx)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
@ -48,20 +48,18 @@ func NewJanitor(o JanitorOpts) *Janitor {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (j *Janitor) Start(ctx context.Context) error {
 | 
			
		||||
	timer := time.NewTimer(j.sweepInterval)
 | 
			
		||||
	ticker := time.NewTicker(j.sweepInterval)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			j.logg.Info("janitor: shutdown signal received")
 | 
			
		||||
			return nil
 | 
			
		||||
		case <-timer.C:
 | 
			
		||||
		case <-ticker.C:
 | 
			
		||||
			j.logg.Debug("janitor: starting sweep")
 | 
			
		||||
			if err := j.QueueMissingBlocks(context.Background()); err != nil {
 | 
			
		||||
				j.logg.Error("janitor: queue missing blocks error", "error", err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			timer.Reset(j.sweepInterval)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,50 +0,0 @@
 | 
			
		||||
package fetch
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	celo "github.com/grassrootseconomics/cic-celo-sdk"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Benchmark_RPC(b *testing.B) {
 | 
			
		||||
	celoProvider, err := celo.NewProvider(celo.ProviderOpts{
 | 
			
		||||
		ChainId:     celo.MainnetChainId,
 | 
			
		||||
		RpcEndpoint: rpcEndpoint,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	rpc := NewRPCFetcher(RPCOpts{
 | 
			
		||||
		RPCProvider: celoProvider,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b.Run("RPC_Block_Fetcher_Benchmark", func(b *testing.B) {
 | 
			
		||||
		for n := 0; n < b.N; n++ {
 | 
			
		||||
			_, err := rpc.Block(context.Background(), 14974600)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				b.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		b.ReportAllocs()
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Benchmark_GraphQL(b *testing.B) {
 | 
			
		||||
	graphql := NewGraphqlFetcher(GraphqlOpts{
 | 
			
		||||
		GraphqlEndpoint: graphqlEndpoint,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	b.Run("GraphQL_Block_Fetcher_Benchmark", func(b *testing.B) {
 | 
			
		||||
		for n := 0; n < b.N; n++ {
 | 
			
		||||
			_, err := graphql.Block(context.Background(), 14974600)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				b.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		b.ReportAllocs()
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -1,97 +0,0 @@
 | 
			
		||||
package fetch
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"math/big"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common/hexutil"
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/core/types"
 | 
			
		||||
	celo "github.com/grassrootseconomics/cic-celo-sdk"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch/module/eth"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch/w3types"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// RPCOpts reprsents the required paramters for an RPC fetcher.
 | 
			
		||||
type RPCOpts struct {
 | 
			
		||||
	RPCProvider *celo.Provider
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RPC is a RPC based block and transaction fetcher.
 | 
			
		||||
type RPC struct {
 | 
			
		||||
	provider *celo.Provider
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewRPCFetcher returns a new RPC fetcher which implemnts Fetch.
 | 
			
		||||
// Note: No rate limiting feeature.
 | 
			
		||||
func NewRPCFetcher(o RPCOpts) Fetch {
 | 
			
		||||
	return &RPC{
 | 
			
		||||
		provider: o.RPCProvider,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Block fetches via RPC and transforms the response to adapt to the GraphQL JSON response struct.
 | 
			
		||||
func (f *RPC) Block(ctx context.Context, blockNumber uint64) (FetchResponse, error) {
 | 
			
		||||
	var (
 | 
			
		||||
		block         types.Block
 | 
			
		||||
		fetchResponse FetchResponse
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	if err := f.provider.Client.CallCtx(
 | 
			
		||||
		ctx,
 | 
			
		||||
		eth.BlockByNumber(big.NewInt(int64(blockNumber))).Returns(&block),
 | 
			
		||||
	); err != nil {
 | 
			
		||||
		return fetchResponse, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	txCount := len(block.Transactions())
 | 
			
		||||
	batchCalls := make([]w3types.Caller, txCount*2)
 | 
			
		||||
 | 
			
		||||
	txs := make([]types.Transaction, txCount)
 | 
			
		||||
	txsReceipt := make([]types.Receipt, txCount)
 | 
			
		||||
 | 
			
		||||
	// Prepare batch calls.
 | 
			
		||||
	for i, tx := range block.Transactions() {
 | 
			
		||||
		batchCalls[i] = eth.Tx(tx.Hash()).Returns(&txs[i])
 | 
			
		||||
		batchCalls[txCount+i] = eth.TxReceipt(tx.Hash()).Returns(&txsReceipt[i])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := f.provider.Client.CallCtx(
 | 
			
		||||
		ctx,
 | 
			
		||||
		batchCalls...,
 | 
			
		||||
	); err != nil {
 | 
			
		||||
		return fetchResponse, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Transform response and adapt to FetchResponse.
 | 
			
		||||
	for i := 0; i < txCount; i++ {
 | 
			
		||||
		var txObject Transaction
 | 
			
		||||
 | 
			
		||||
		txObject.Block.Number = block.NumberU64()
 | 
			
		||||
		txObject.Block.Timestamp = hexutil.EncodeUint64(block.Time())
 | 
			
		||||
 | 
			
		||||
		from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fetchResponse, err
 | 
			
		||||
		}
 | 
			
		||||
		txObject.From.Address = strings.ToLower(from.Hex())
 | 
			
		||||
		// This check ignores contract deployment transactions.
 | 
			
		||||
		if txs[i].To() != nil {
 | 
			
		||||
			txObject.To.Address = strings.ToLower(txs[i].To().Hex())
 | 
			
		||||
		}
 | 
			
		||||
		txObject.Value = hexutil.EncodeBig(txs[i].Value())
 | 
			
		||||
		txObject.InputData = hexutil.Encode(txs[i].Data())
 | 
			
		||||
 | 
			
		||||
		txObject.Hash = txsReceipt[i].TxHash.Hex()
 | 
			
		||||
		txObject.Index = txsReceipt[i].TransactionIndex
 | 
			
		||||
		txObject.Status = txsReceipt[i].Status
 | 
			
		||||
		txObject.GasUsed = txsReceipt[i].GasUsed
 | 
			
		||||
 | 
			
		||||
		fetchResponse.Data.Block.Transactions = append(
 | 
			
		||||
			fetchResponse.Data.Block.Transactions,
 | 
			
		||||
			txObject,
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return fetchResponse, nil
 | 
			
		||||
}
 | 
			
		||||
@ -1,49 +0,0 @@
 | 
			
		||||
package fetch
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"os"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	celo "github.com/grassrootseconomics/cic-celo-sdk"
 | 
			
		||||
	"github.com/stretchr/testify/suite"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	rpcEndpoint = os.Getenv("TEST_RPC_ENDPOINT")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type RPCTestSuite struct {
 | 
			
		||||
	suite.Suite
 | 
			
		||||
	fetch Fetch
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *RPCTestSuite) SetupSuite() {
 | 
			
		||||
	celoProvider, err := celo.NewProvider(celo.ProviderOpts{
 | 
			
		||||
		ChainId:     celo.MainnetChainId,
 | 
			
		||||
		RpcEndpoint: rpcEndpoint,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.fetch = NewRPCFetcher(RPCOpts{
 | 
			
		||||
		RPCProvider: celoProvider,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *RPCTestSuite) Test_E2E_Fetch_Existing_Block() {
 | 
			
		||||
	resp, err := s.fetch.Block(context.Background(), 14974600)
 | 
			
		||||
	s.NoError(err)
 | 
			
		||||
	s.Len(resp.Data.Block.Transactions, 3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *RPCTestSuite) Test_E2E_Fetch_Non_Existing_Block() {
 | 
			
		||||
	_, err := s.fetch.Block(context.Background(), 14974600000)
 | 
			
		||||
	s.Error(err)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestRPCSuite(t *testing.T) {
 | 
			
		||||
	suite.Run(t, new(RPCTestSuite))
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user