From ad5ff05a3277f6be8c644e6d638c6e4b66564023 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Sat, 14 Jan 2023 10:17:34 +0000 Subject: [PATCH] feat: add jetstream integration * closes #11 --- Makefile | 9 ++++++++- cmd/filters.go | 23 +++++++++++++++++++++-- cmd/init.go | 32 ++++++++++++++++++++++++++++++++ config.toml | 19 +++++++++++++++++-- go.mod | 3 +++ go.sum | 7 +++++++ pkg/filter/address_filter.go | 7 ++----- pkg/filter/decode_filter.go | 24 ++++++++++++++++++++---- 8 files changed, 110 insertions(+), 14 deletions(-) diff --git a/Makefile b/Makefile index 365da5f..8a7227e 100644 --- a/Makefile +++ b/Makefile @@ -14,4 +14,11 @@ mod: go mod verify test-pkg: - TEST_GRAPHQL_ENDPOINT=https://rpc.alfajores.celo.grassecon.net/graphql go test -v -covermode atomic -coverprofile=covprofile ./pkg/... \ No newline at end of file + TEST_GRAPHQL_ENDPOINT=https://rpc.alfajores.celo.grassecon.net/graphql go test -v -covermode atomic -coverprofile=covprofile ./pkg/... + +migrate: + tern migrate -c migrations/tern.conf + +docker-clean: + docker-compose down + docker volume rm cic-chain-events_cic-indexer-pg cic-chain-events_cic-indexer-nats diff --git a/cmd/filters.go b/cmd/filters.go index 1439c2f..db6aa59 100644 --- a/cmd/filters.go +++ b/cmd/filters.go @@ -1,17 +1,36 @@ package main import ( + "strings" + "sync" + "github.com/grassrootseconomics/cic-chain-events/pkg/filter" ) func initAddressFilter() filter.Filter { + // TODO: Bootstrap addresses from smart contract + // TODO: Add route to update cache + cache := &sync.Map{} + + // Example bootstrap addresses + cache.Store(strings.ToLower("0x617f3112bf5397D0467D315cC709EF968D9ba546"), "USDT") + cache.Store(strings.ToLower("0x765DE816845861e75A25fCA122bb6898B8B1282a"), "cUSD") + cache.Store(strings.ToLower("0xD8763CBa276a3738E6DE85b4b3bF5FDed6D6cA73"), "cEUR") + return filter.NewAddressFilter(filter.AddressFilterOpts{ - Logg: lo, + Cache: cache, + Logg: lo, }) } func initDecodeFilter() filter.Filter { + js, err := initJetStream() + if err != nil { + lo.Fatal("filters: critical error loading jetstream", "error", err) + } + return filter.NewDecodeFilter(filter.DecodeFilterOpts{ - Logg: lo, + Logg: lo, + JSCtx: js, }) } diff --git a/cmd/init.go b/cmd/init.go index 4f03cc7..1a03296 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -3,6 +3,7 @@ package main import ( "context" "strings" + "time" "github.com/alitto/pond" "github.com/grassrootseconomics/cic-chain-events/internal/pool" @@ -14,6 +15,7 @@ import ( "github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/file" + "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) @@ -84,3 +86,33 @@ func initFetcher() fetch.Fetch { GraphqlEndpoint: ko.MustString("chain.graphql_endpoint"), }) } + +func initJetStream() (nats.JetStreamContext, error) { + natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint")) + if err != nil { + return nil, err + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + + // Bootstrap stream if it does not exist + stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name")) + if stream == nil { + lo.Info("jetstream: bootstrapping stream") + _, err = js.AddStream(&nats.StreamConfig{ + Name: ko.MustString("jetstream.stream_name"), + MaxAge: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour, + Storage: nats.FileStorage, + Subjects: ko.MustStrings("jetstream.stream_subjects"), + Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour, + }) + if err != nil { + return nil, err + } + } + + return js, nil +} diff --git a/config.toml b/config.toml index 72ef033..7de922c 100644 --- a/config.toml +++ b/config.toml @@ -23,11 +23,26 @@ head_block_lag = 5 # Max idle time after which goroutine is returned back to the pool idle_worker_timeout = 1 # Syncer start block -initial_lower_bound = 17034445 +initial_lower_bound = 17204504 # Max blocks in worker queue awaiting processing queue_size = 500 # Janitor sweep interval, should take into account concurrency and queue_size sweep_interval = 10 [postgres] -dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events" \ No newline at end of file +dsn = "postgres://postgres:postgres@localhost:5432/cic_chain_events" + +# https://docs.nats.io/ +[jetstream] +endpoint = "nats://localhost:4222" +stream_name = "CHAIN" +# Duration JetStream should keep the message before GC +persist_duration_hours = 48 +# Duration to ignore duplicate transactions (e.g. due to restart) +dedup_duration_hours = 6 +# Stream subjects +stream_subjects = [ + "CHAIN.transfer", + "CHAIN.transferFrom", + "CHAIN.mintTo" +] \ No newline at end of file diff --git a/go.mod b/go.mod index 9b13901..60f6b0f 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,9 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/nats-io/nats.go v1.22.1 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onsi/gomega v1.10.1 // indirect github.com/pelletier/go-toml v1.7.0 // indirect diff --git a/go.sum b/go.sum index c1649e4..6f6171c 100644 --- a/go.sum +++ b/go.sum @@ -443,6 +443,12 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= +github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE= +github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -597,6 +603,7 @@ golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= diff --git a/pkg/filter/address_filter.go b/pkg/filter/address_filter.go index 96ec2c3..12efe72 100644 --- a/pkg/filter/address_filter.go +++ b/pkg/filter/address_filter.go @@ -4,15 +4,13 @@ import ( "context" "sync" - celo "github.com/grassrootseconomics/cic-celo-sdk" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/zerodha/logf" ) type AddressFilterOpts struct { - Cache *sync.Map - CeloProvider *celo.Provider - Logg logf.Logger + Cache *sync.Map + Logg logf.Logger } type AddressFilter struct { @@ -21,7 +19,6 @@ type AddressFilter struct { } func NewAddressFilter(o AddressFilterOpts) Filter { - // TODO: Bootstrap addresses from registry smart contract return &AddressFilter{ cache: o.Cache, logg: o.Logg, diff --git a/pkg/filter/decode_filter.go b/pkg/filter/decode_filter.go index 6d4f529..9324666 100644 --- a/pkg/filter/decode_filter.go +++ b/pkg/filter/decode_filter.go @@ -7,6 +7,7 @@ import ( "github.com/celo-org/celo-blockchain/common" "github.com/grassrootseconomics/cic-chain-events/pkg/fetch" "github.com/grassrootseconomics/w3-celo-patch" + "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) @@ -17,16 +18,19 @@ var ( ) type DecodeFilterOpts struct { - Logg logf.Logger + Logg logf.Logger + JSCtx nats.JetStreamContext } type DecodeFilter struct { logg logf.Logger + js nats.JetStreamContext } func NewDecodeFilter(o DecodeFilterOpts) Filter { return &DecodeFilter{ logg: o.Logg, + js: o.JSCtx, } } @@ -42,7 +46,11 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) return false, err } - f.logg.Debug("transfer", "to", to.Hex(), "value", value.String()) + _, err := f.js.Publish("CHAIN.transfer", []byte(transaction.Hash), nats.MsgId(transaction.Hash)) + if err != nil { + return false, err + } + return true, nil case "0x23b872dd": var ( @@ -55,7 +63,11 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) return false, err } - f.logg.Debug("transferFrom", "from", from.Hex(), "to", to.Hex(), "value", value.String()) + _, err := f.js.Publish("CHAIN.transferFrom", []byte(transaction.Hash), nats.MsgId(transaction.Hash)) + if err != nil { + return false, err + } + return true, nil case "0x449a52f8": var ( @@ -67,7 +79,11 @@ func (f *DecodeFilter) Execute(_ context.Context, transaction fetch.Transaction) return false, err } - f.logg.Debug("mintTo", "to", to.Hex(), "value", value.String()) + _, err := f.js.Publish("CHAIN.mintTo", []byte(transaction.Hash), nats.MsgId(transaction.Hash)) + if err != nil { + return false, err + } + return true, nil default: f.logg.Debug("unknownSignature", "inpuData", transaction.InputData)