From a1b6cb08d818297cdb97995eb06c896a072446c5 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 2 Mar 2023 09:12:39 +0000 Subject: [PATCH] add: wip jetstream durable consumer --- cmd/service/init.go | 7 ++++-- cmd/service/main.go | 9 +++++++ config.toml | 1 + internal/events/jetstream.go | 5 ++-- internal/events/jetstream_sub.go | 43 +++++++++++++++++++++++--------- internal/store/dispatch.go | 16 ------------ internal/store/otx.go | 21 +++++++++++++++- internal/store/store.go | 12 ++++++++- 8 files changed, 80 insertions(+), 34 deletions(-) diff --git a/cmd/service/init.go b/cmd/service/init.go index 18707b5..ed550e2 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -48,13 +48,15 @@ func initConfig(configFilePath string) *koanf.Koanf { lo.Fatal("Could not load config file", "error", err) } - if err := ko.Load(env.Provider("", ".", func(s string) string { + if err := ko.Load(env.Provider("CUSTODIAL_", ".", func(s string) string { return strings.ReplaceAll(strings.ToLower( - strings.TrimPrefix(s, "")), "_", ".") + strings.TrimPrefix(s, "CUSTODIAL_")), "__", ".") }), nil); err != nil { lo.Fatal("Could not override config from env vars", "error", err) } + ko.Print() + return ko } @@ -180,6 +182,7 @@ func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) sto // Init JetStream context for tasker events. func initJetStream() (*events.JetStream, error) { jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{ + Logg: lo, ServerUrl: ko.MustString("jetstream.endpoint"), PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour, diff --git a/cmd/service/main.go b/cmd/service/main.go index de083ad..ee586b6 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -128,6 +128,15 @@ func main() { } }() + wg.Add(1) + go func() { + defer wg.Done() + lo.Info("Starting jetstream subscriber") + if err := jsEventEmitter.ChainSubscription(ctx, pgStore); err != nil { + lo.Fatal("main: jetstream subscriber", "err", err) + } + }() + <-ctx.Done() lo.Info("main: stopping tasker") diff --git a/config.toml b/config.toml index db0258b..41655c6 100644 --- a/config.toml +++ b/config.toml @@ -7,6 +7,7 @@ metrics = true [chain] rpc_endpoint = "" testnet = true +devnet = false [system] # System default values diff --git a/internal/events/jetstream.go b/internal/events/jetstream.go index 0348e5b..e6afbad 100644 --- a/internal/events/jetstream.go +++ b/internal/events/jetstream.go @@ -23,14 +23,14 @@ const ( ) type JetStreamOpts struct { - Logg logf.Logger + Logg logf.Logger ServerUrl string PersistDuration time.Duration DedupDuration time.Duration } type JetStream struct { - logg logf.Logger + logg logf.Logger jsCtx nats.JetStreamContext natsConn *nats.Conn } @@ -62,6 +62,7 @@ func NewJetStreamEventEmitter(o JetStreamOpts) (*JetStream, error) { } return &JetStream{ + logg: o.Logg, jsCtx: js, natsConn: natsConn, }, nil diff --git a/internal/events/jetstream_sub.go b/internal/events/jetstream_sub.go index 0bc9784..3af5405 100644 --- a/internal/events/jetstream_sub.go +++ b/internal/events/jetstream_sub.go @@ -2,21 +2,29 @@ package events import ( "context" + "encoding/json" "errors" - "time" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/nats-io/nats.go" ) const ( - backOffTimer = 2 * time.Second - durableId = "cic-custodial" - pullStream = "CHAIN" - pullSubject = "CHAIN.*" + durableId = "cic-custodial" + pullStream = "CHAIN" + pullSubject = "CHAIN.*" ) -func (js *JetStream) ChainSubscription(ctx context.Context, store store.Store) error { +func (js *JetStream) ChainSubscription(ctx context.Context, pgStore store.Store) error { + _, err := js.jsCtx.AddConsumer(pullStream, &nats.ConsumerConfig{ + Durable: durableId, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: pullSubject, + }) + if err != nil { + return err + } + subOpts := []nats.SubOpt{ nats.ManualAck(), nats.Bind(pullStream, durableId), @@ -31,23 +39,34 @@ func (js *JetStream) ChainSubscription(ctx context.Context, store store.Store) e select { case <-ctx.Done(): js.logg.Info("jetstream chain sub: shutdown signal received") + js.Close() return nil default: events, err := natsSub.Fetch(1) if err != nil { if errors.Is(err, nats.ErrTimeout) { - // Supressed retry - js.logg.Error("jetstream chain sub: fetch NATS timeout", "error", err) - time.Sleep(backOffTimer) continue } else { js.logg.Error("jetstream chain sub: fetch other error", "error", err) } } - if len(events) > 0 { - // TODO: Unmarshal - // TODO: UpdateOtxStatus + if len(events) == 0 { + continue } + var ( + chainEvent store.MinimalTxInfo + ) + + if err := json.Unmarshal(events[0].Data, &chainEvent); err != nil { + js.logg.Error("jetstream chain sub: json unmarshal fail", "error", err) + } + + if err := pgStore.UpdateOtxStatusFromChainEvent(context.Background(), chainEvent); err != nil { + events[0].Nak() + js.logg.Error("jetstream chain sub: otx marker failed to update state", "error", err) + } + events[0].Ack() + js.logg.Debug("jetstream chain sub: successfully updated status", "tx", chainEvent.TxHash) } } diff --git a/internal/store/dispatch.go b/internal/store/dispatch.go index 6d36bea..cfb21e3 100644 --- a/internal/store/dispatch.go +++ b/internal/store/dispatch.go @@ -2,8 +2,6 @@ package store import ( "context" - - "github.com/grassrootseconomics/cic-custodial/pkg/enum" ) func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error { @@ -18,17 +16,3 @@ func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch Dispa return nil } - -func (s *PostgresStore) UpdateChainStatus(ctx context.Context, txHash string, status enum.OtxStatus, block uint64) error { - if _, err := s.db.Exec( - ctx, - s.queries.UpdateChainStatus, - txHash, - status, - block, - ); err != nil { - return err - } - - return nil -} diff --git a/internal/store/otx.go b/internal/store/otx.go index a985702..00e0b2a 100644 --- a/internal/store/otx.go +++ b/internal/store/otx.go @@ -5,6 +5,7 @@ import ( "time" "github.com/georgysavva/scany/v2/pgxscan" + "github.com/grassrootseconomics/cic-custodial/pkg/enum" ) type TxStatus struct { @@ -58,6 +59,24 @@ func (s *PostgresStore) GetTxStatusByTrackingId(ctx context.Context, trackingId return txs, nil } -func (s *PostgresStore) UpdateOtxStatus(ctx context.Context, status string) error { +func (s *PostgresStore) UpdateOtxStatusFromChainEvent(ctx context.Context, chainEvent MinimalTxInfo) error { + var ( + status = enum.SUCCESS + ) + + if !chainEvent.Success { + status = enum.REVERTED + } + + if _, err := s.db.Exec( + ctx, + s.queries.UpdateChainStatus, + chainEvent.TxHash, + status, + chainEvent.Block, + ); err != nil { + return err + } + return nil } diff --git a/internal/store/store.go b/internal/store/store.go index ee369e0..efba0e7 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -7,6 +7,16 @@ import ( ) type ( + MinimalTxInfo struct { + Block uint64 `json:"block"` + From string `json:"from"` + To string `json:"to"` + ContractAddress string `json:"contractAddress"` + Success bool `json:"success"` + TxHash string `json:"transactionHash"` + TxIndex uint `json:"transactionIndex"` + Value uint64 `json:"value"` + } OTX struct { TrackingId string Type enum.OtxType @@ -29,6 +39,6 @@ type ( CreateOtx(ctx context.Context, otx OTX) (id uint, err error) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error GetTxStatusByTrackingId(ctx context.Context, trackingId string) ([]*TxStatus, error) - UpdateChainStatus(ctx context.Context, txHash string, status enum.OtxStatus, block uint64) error + UpdateOtxStatusFromChainEvent(ctx context.Context, chainEvent MinimalTxInfo) error } )