diff --git a/cmd/filters.go b/cmd/filters.go index 0b40fc0..8656835 100644 --- a/cmd/filters.go +++ b/cmd/filters.go @@ -4,8 +4,12 @@ import ( "strings" "sync" + "github.com/grassrootseconomics/cic-chain-events/internal/events" "github.com/grassrootseconomics/cic-chain-events/internal/filter" - "github.com/nats-io/nats.go" +) + +var ( + systemAddress = strings.ToLower("0x3D85285e39f05773aC92EAD27CB50a4385A529E4") ) func initAddressFilter() filter.Filter { @@ -14,26 +18,36 @@ func initAddressFilter() filter.Filter { cache := &sync.Map{} // Example bootstrap addresses - cache.Store(strings.ToLower("0x54c8D8718Ea9E7b2b4542e630fd36Ccab32cE74E"), "BABVoucher") - cache.Store(strings.ToLower("0xdD4F5ea484F6b16f031eF7B98F3810365493BC20"), "GasFaucet") + cache.Store(strings.ToLower("0xB92463E2262E700e29c16416270c9Fdfa17934D7"), "TRNVoucher") + cache.Store(strings.ToLower("0xf2a1fc19Ad275A0EAe3445798761FeD1Eea725d5"), "GasFaucet") + cache.Store(strings.ToLower("0x1e041282695C66944BfC53cabce947cf35CEaf87"), "AddressIndex") return filter.NewAddressFilter(filter.AddressFilterOpts{ - Cache: cache, - Logg: lo, + Cache: cache, + Logg: lo, + SystemAddress: systemAddress, }) } -func initTransferFilter(jsCtx nats.JetStreamContext) filter.Filter { +func initTransferFilter(eventEmitter events.EventEmitter) filter.Filter { return filter.NewTransferFilter(filter.TransferFilterOpts{ - Logg: lo, - JSCtx: jsCtx, + EventEmitter: eventEmitter, + Logg: lo, }) } -func initGasGiftFilter(jsCtx nats.JetStreamContext) filter.Filter { +func initGasGiftFilter(eventEmitter events.EventEmitter) filter.Filter { return filter.NewGasFilter(filter.GasFilterOpts{ - Logg: lo, - JSCtx: jsCtx, + EventEmitter: eventEmitter, + Logg: lo, + SystemAddress: systemAddress, + }) +} + +func initRegisterFilter(eventEmitter events.EventEmitter) filter.Filter { + return filter.NewRegisterFilter(filter.RegisterFilterOpts{ + EventEmitter: eventEmitter, + Logg: lo, }) } diff --git a/cmd/init.go b/cmd/init.go index c10f605..e085194 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -4,6 +4,7 @@ import ( "strings" "time" + "github.com/grassrootseconomics/cic-chain-events/internal/events" "github.com/grassrootseconomics/cic-chain-events/internal/store" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/jackc/pgx/v5" @@ -12,7 +13,6 @@ import ( "github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/file" - "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) @@ -23,6 +23,7 @@ func initLogger(debug bool) logf.Logger { if debug { loggOpts.Level = logf.DebugLevel + loggOpts.EnableCaller = true } return logf.New(loggOpts) @@ -77,32 +78,16 @@ func initFetcher() fetch.Fetch { }) } -func initJetStream() (nats.JetStreamContext, error) { - natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint")) +func initJetStream() (events.EventEmitter, error) { + jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{ + ServerUrl: ko.MustString("jetstream.endpoint"), + PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour, + DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour, + }) + if err != nil { return nil, err } - js, err := natsConn.JetStream() - if err != nil { - return nil, err - } - - // Bootstrap stream if it does not exist - stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name")) - if stream == nil { - lo.Info("jetstream: bootstrapping stream") - _, err = js.AddStream(&nats.StreamConfig{ - Name: ko.MustString("jetstream.stream_name"), - MaxAge: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour, - Storage: nats.FileStorage, - Subjects: ko.MustStrings("jetstream.stream_subjects"), - Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour, - }) - if err != nil { - return nil, err - } - } - - return js, nil + return jsEmitter, nil } diff --git a/cmd/main.go b/cmd/main.go index 7ba961c..e8e31bb 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -69,8 +69,9 @@ func main() { BlockFetcher: graphqlFetcher, Filters: []filter.Filter{ initAddressFilter(), - initTransferFilter(jsCtx), initGasGiftFilter(jsCtx), + initTransferFilter(jsCtx), + initRegisterFilter(jsCtx), }, Logg: lo, Store: pgStore, diff --git a/config.toml b/config.toml index caa2e08..dbfa45b 100644 --- a/config.toml +++ b/config.toml @@ -1,43 +1,35 @@ [metrics] # Exposes Prometheus metrics -# /metrics endpoint go_process = true # API server [api] # Host and port -address = ":8080" +address = ":8085" # Geth API endpoints [chain] -graphql_endpoint = "https://rpc.celo.grassecon.net/graphql" -ws_endpoint = "wss://socket.celo.grassecon.net" +graphql_endpoint = "https://rpc.alfajores.celo.grassecon.net/graphql" +ws_endpoint = "wss://ws.alfajores.celo.grassecon.net" +# Syncer configs [syncer] -# Number of goroutines assigned to the janitor worker pool +# Maximum number of missing blocks pushed into the worker queue every janitor sweep +janitor_queue_size = 500 +# Number of goroutines assigned to the worker pool janitor_concurrency = 5 # Syncer start block -initial_lower_bound = 17269000 -# Max blocks in worker queue awaiting processing -janitor_queue_size = 500 -# Janitor sweep interval +initial_lower_bound = 16373156 +# Janitor sweep interval, should take into account concurrency and queue_size janitor_sweep_interval = 5 [postgres] -# Default is the Docker container DSN dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events" # https://docs.nats.io/ [jetstream] endpoint = "nats://localhost:4222" -stream_name = "CHAIN" -# Duration JetStream should keep the message before remocing it from the persistent store +# Duration JetStream should keep the message before GC persist_duration_hours = 48 # Duration to ignore duplicate transactions (e.g. due to restart) dedup_duration_hours = 6 -# Stream subjects -stream_subjects = [ - "CHAIN.transfer", - "CHAIN.transferFrom", - "CHAIN.mintTo" -] diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 0000000..02c2bc7 --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,17 @@ +package events + +type EventEmitter interface { + Close() + Publish(subject string, dedupId string, eventPayload interface{}) error +} + +type MinimalTxInfo struct { + Block uint64 `json:"block"` + From string `json:"from"` + To string `json:"to"` + ContractAddress string `json:"contractAddress"` + Success bool `json:"success"` + TxHash string `json:"transactionHash"` + TxIndex uint `json:"transactionIndex"` + Value uint64 `json:"value"` +} diff --git a/internal/events/jetstream.go b/internal/events/jetstream.go new file mode 100644 index 0000000..31bbd40 --- /dev/null +++ b/internal/events/jetstream.go @@ -0,0 +1,78 @@ +package events + +import ( + "time" + + "github.com/goccy/go-json" + "github.com/nats-io/nats.go" +) + +const ( + StreamName string = "CHAIN" + StreamSubjects string = "CHAIN.*" +) + +type JetStreamOpts struct { + ServerUrl string + PersistDuration time.Duration + DedupDuration time.Duration +} + +type JetStream struct { + jsCtx nats.JetStreamContext + nc *nats.Conn +} + +func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) { + natsConn, err := nats.Connect(o.ServerUrl) + if err != nil { + return nil, err + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + + // Bootstrap stream if it doesn't exist. + stream, _ := js.StreamInfo(StreamName) + if stream == nil { + _, err = js.AddStream(&nats.StreamConfig{ + Name: StreamName, + MaxAge: o.PersistDuration, + Storage: nats.FileStorage, + Subjects: []string{StreamSubjects}, + Duplicates: o.DedupDuration, + }) + if err != nil { + return nil, err + } + } + + return &JetStream{ + jsCtx: js, + nc: natsConn, + }, nil +} + +// Close gracefully shutdowns the JetStream connection. +func (js *JetStream) Close() { + if js.nc != nil { + js.nc.Close() + } +} + +// Publish publishes the JSON data to the NATS stream. +func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error { + jsonData, err := json.Marshal(eventPayload) + if err != nil { + return err + } + + _, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId)) + if err != nil { + return err + } + + return nil +} diff --git a/internal/filter/address_filter.go b/internal/filter/address_filter.go index 12efe72..00f4d74 100644 --- a/internal/filter/address_filter.go +++ b/internal/filter/address_filter.go @@ -9,23 +9,30 @@ import ( ) type AddressFilterOpts struct { - Cache *sync.Map - Logg logf.Logger + Cache *sync.Map + Logg logf.Logger + SystemAddress string } type AddressFilter struct { - cache *sync.Map - logg logf.Logger + cache *sync.Map + logg logf.Logger + systemAddress string } func NewAddressFilter(o AddressFilterOpts) Filter { return &AddressFilter{ - cache: o.Cache, - logg: o.Logg, + cache: o.Cache, + logg: o.Logg, + systemAddress: o.SystemAddress, } } func (f *AddressFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { + if transaction.From.Address == f.systemAddress { + return true, nil + } + if _, found := f.cache.Load(transaction.To.Address); found { return true, nil } diff --git a/internal/filter/address_filter_test.go b/internal/filter/address_filter_test.go index 772ce98..f7a6a16 100644 --- a/internal/filter/address_filter_test.go +++ b/internal/filter/address_filter_test.go @@ -7,7 +7,6 @@ import ( "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/stretchr/testify/suite" - "github.com/zerodha/logf" ) type AddressFilterSuite struct { @@ -20,15 +19,8 @@ func (s *AddressFilterSuite) SetupSuite() { addressCache.Store("0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f", "") - logg := logf.New( - logf.Opts{ - Level: logf.DebugLevel, - }, - ) - s.filter = NewAddressFilter(AddressFilterOpts{ Cache: addressCache, - Logg: logg, }) } diff --git a/internal/filter/gas_filter.go b/internal/filter/gas_filter.go index 30f0bfe..e164d52 100644 --- a/internal/filter/gas_filter.go +++ b/internal/filter/gas_filter.go @@ -2,78 +2,67 @@ package filter import ( "context" - "encoding/json" - "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/grassrootseconomics/cic-chain-events/internal/events" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" - "github.com/grassrootseconomics/w3-celo-patch" - "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) -var ( - giveToSig = w3.MustNewFunc("giveTo(address)", "uint256") +const ( + gasFilterEventSubject = "CHAIN.gas" ) type GasFilterOpts struct { - Logg logf.Logger - JSCtx nats.JetStreamContext + EventEmitter events.EventEmitter + Logg logf.Logger + SystemAddress string } type GasFilter struct { - logg logf.Logger - js nats.JetStreamContext -} - -type minimalGasGiftTxInfo struct { - Block uint64 `json:"block"` - Success bool `json:"success"` - To string `json:"to"` - TxHash string `json:"transactionHash"` - TxIndex uint `json:"transactionIndex"` + eventEmitter events.EventEmitter + logg logf.Logger + systemAddress string } func NewGasFilter(o GasFilterOpts) Filter { return &GasFilter{ - logg: o.Logg, - js: o.JSCtx, + eventEmitter: o.EventEmitter, + logg: o.Logg, + systemAddress: o.SystemAddress, } } func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { - switch transaction.InputData[:10] { - case "0x63e4bff4": - var ( - to common.Address - ) + transferValue, err := hexutil.DecodeUint64(transaction.Value) + if err != nil { + return false, err + } - if err := giveToSig.DecodeArgs(w3.B(transaction.InputData), &to); err != nil { - return false, err - } - - transferEvent := &minimalGasGiftTxInfo{ + // TODO: This is a temporary shortcut to gift gas. Switch to gas faucet contract. + if transaction.From.Address == f.systemAddress && transferValue > 0 { + transferEvent := &events.MinimalTxInfo{ Block: transaction.Block.Number, - To: to.Hex(), + To: transaction.To.Address, TxHash: transaction.Hash, TxIndex: transaction.Index, + Value: transferValue, } if transaction.Status == 1 { transferEvent.Success = true } - json, err := json.Marshal(transferEvent) - if err != nil { - return false, err - } - - _, err = f.js.Publish("CHAIN.gasGiveTo", json, nats.MsgId(transaction.Hash)) - if err != nil { + if err := f.eventEmitter.Publish( + gasFilterEventSubject, + transaction.Hash, + transferEvent, + ); err != nil { return false, err } return true, nil - default: - return false, nil } + + return true, nil } diff --git a/internal/filter/register_filter.go b/internal/filter/register_filter.go new file mode 100644 index 0000000..093dcea --- /dev/null +++ b/internal/filter/register_filter.go @@ -0,0 +1,74 @@ +package filter + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/cic-chain-events/internal/events" + "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/zerodha/logf" +) + +const ( + registerEventSubject = "CHAIN.register" +) + +var ( + addSig = w3.MustNewFunc("add(address)", "bool") +) + +type RegisterFilterOpts struct { + EventEmitter events.EventEmitter + Logg logf.Logger +} + +type RegisterFilter struct { + eventEmitter events.EventEmitter + logg logf.Logger +} + +func NewRegisterFilter(o RegisterFilterOpts) Filter { + return &RegisterFilter{ + eventEmitter: o.EventEmitter, + logg: o.Logg, + } +} + +func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { + if len(transaction.InputData) < 10 { + return true, nil + } + + if transaction.InputData[:10] == "0x0a3b0a4f" { + var address common.Address + + if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil { + return false, err + } + + addEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + ContractAddress: transaction.To.Address, + To: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + } + + if transaction.Status == 1 { + addEvent.Success = true + } + + if err := f.eventEmitter.Publish( + registerEventSubject, + transaction.Hash, + addEvent, + ); err != nil { + return false, err + } + + return true, nil + } + + return true, nil +} diff --git a/internal/filter/transfer_filter.go b/internal/filter/transfer_filter.go index d67608c..5bbe30a 100644 --- a/internal/filter/transfer_filter.go +++ b/internal/filter/transfer_filter.go @@ -2,16 +2,19 @@ package filter import ( "context" - "encoding/json" "math/big" "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/cic-chain-events/internal/events" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/w3-celo-patch" - "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) +const ( + transferFilterEventSubject = "CHAIN.transfer" +) + var ( transferSig = w3.MustNewFunc("transfer(address, uint256)", "bool") transferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool") @@ -19,34 +22,27 @@ var ( ) type TransferFilterOpts struct { - Logg logf.Logger - JSCtx nats.JetStreamContext + EventEmitter events.EventEmitter + Logg logf.Logger } type TransferFilter struct { - logg logf.Logger - js nats.JetStreamContext -} - -type minimalTxInfo struct { - Block uint64 `json:"block"` - From string `json:"from"` - Success bool `json:"success"` - To string `json:"to"` - TokenAddress string `json:"tokenAddress"` - TxHash string `json:"transactionHash"` - TxIndex uint `json:"transactionIndex"` - Value uint64 `json:"value"` + eventEmitter events.EventEmitter + logg logf.Logger } func NewTransferFilter(o TransferFilterOpts) Filter { return &TransferFilter{ - logg: o.Logg, - js: o.JSCtx, + eventEmitter: o.EventEmitter, + logg: o.Logg, } } func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transaction) (bool, error) { + if len(transaction.InputData) < 10 { + return true, nil + } + switch transaction.InputData[:10] { case "0xa9059cbb": var ( @@ -58,27 +54,27 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio return false, err } - transferEvent := &minimalTxInfo{ - Block: transaction.Block.Number, - From: transaction.From.Address, - To: to.Hex(), - TokenAddress: transaction.To.Address, - TxHash: transaction.Hash, - TxIndex: transaction.Index, - Value: value.Uint64(), + f.logg.Debug("transfer_filter: new reg", "transfer", to) + + transferEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + From: transaction.From.Address, + To: to.Hex(), + ContractAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), } if transaction.Status == 1 { transferEvent.Success = true } - json, err := json.Marshal(transferEvent) - if err != nil { - return false, err - } - - _, err = f.js.Publish("CHAIN.transfer", json, nats.MsgId(transaction.Hash)) - if err != nil { + if err := f.eventEmitter.Publish( + transferFilterEventSubject, + transaction.Hash, + transferEvent, + ); err != nil { return false, err } @@ -94,27 +90,27 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio return false, err } - transferFromEvent := &minimalTxInfo{ - Block: transaction.Block.Number, - From: from.Hex(), - To: to.Hex(), - TokenAddress: transaction.To.Address, - TxHash: transaction.Hash, - TxIndex: transaction.Index, - Value: value.Uint64(), + f.logg.Debug("transfer_filter: new reg", "transferFrom", to) + + transferEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + From: from.Hex(), + To: to.Hex(), + ContractAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), } if transaction.Status == 1 { - transferFromEvent.Success = true + transferEvent.Success = true } - json, err := json.Marshal(transferFromEvent) - if err != nil { - return false, err - } - - _, err = f.js.Publish("CHAIN.transferFrom", json, nats.MsgId(transaction.Hash)) - if err != nil { + if err := f.eventEmitter.Publish( + transferFilterEventSubject, + transaction.Hash, + transferEvent, + ); err != nil { return false, err } @@ -129,33 +125,32 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio return false, err } - mintToEvent := &minimalTxInfo{ - Block: transaction.Block.Number, - From: transaction.From.Address, - To: to.Hex(), - TokenAddress: transaction.To.Address, - TxHash: transaction.Hash, - TxIndex: transaction.Index, - Value: value.Uint64(), + f.logg.Debug("transfer_filter: new reg", "mintTo", to) + + transferEvent := &events.MinimalTxInfo{ + Block: transaction.Block.Number, + From: transaction.From.Address, + To: to.Hex(), + ContractAddress: transaction.To.Address, + TxHash: transaction.Hash, + TxIndex: transaction.Index, + Value: value.Uint64(), } if transaction.Status == 1 { - mintToEvent.Success = true + transferEvent.Success = true } - json, err := json.Marshal(mintToEvent) - if err != nil { - return false, err - } - - _, err = f.js.Publish("CHAIN.mintTo", json, nats.MsgId(transaction.Hash)) - if err != nil { + if err := f.eventEmitter.Publish( + transferFilterEventSubject, + transaction.Hash, + transferEvent, + ); err != nil { return false, err } return true, nil default: - // Skip and continue to next filter return true, nil } }