From 1ddff06502b6c97e257c2c27a0d0eea7a6abe662 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Wed, 1 Mar 2023 17:13:23 +0000 Subject: [PATCH] feat (wip): add in-built chain subscription * BREAKING: remove events interface -> use jetstream struct * in-built chain subscription to update otx status * potential fix for tasks: defer rollback nonce error detection --- cmd/service/init.go | 8 +-- internal/custodial/custodial.go | 2 +- internal/events/events.go | 5 -- internal/events/jetstream.go | 17 +++--- internal/events/jetstream_sub.go | 54 +++++++++++++++++++ internal/store/otx.go | 4 ++ internal/tasker/task/account_gift_gas.go | 1 + internal/tasker/task/account_gift_voucher.go | 1 + internal/tasker/task/account_refill_gas.go | 3 +- .../tasker/task/account_register_onchain.go | 1 + internal/tasker/task/sign_transfer.go | 1 + 11 files changed, 79 insertions(+), 18 deletions(-) create mode 100644 internal/events/jetstream_sub.go diff --git a/cmd/service/init.go b/cmd/service/init.go index 5a31347..18707b5 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -48,9 +48,9 @@ func initConfig(configFilePath string) *koanf.Koanf { lo.Fatal("Could not load config file", "error", err) } - if err := ko.Load(env.Provider("CUSTODIAL_", ".", func(s string) string { - return strings.Replace(strings.ToLower( - strings.TrimPrefix(s, "")), "_", ".", -1) + if err := ko.Load(env.Provider("", ".", func(s string) string { + return strings.ReplaceAll(strings.ToLower( + strings.TrimPrefix(s, "")), "_", ".") }), nil); err != nil { lo.Fatal("Could not override config from env vars", "error", err) } @@ -178,7 +178,7 @@ func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) sto } // Init JetStream context for tasker events. -func initJetStream() (events.EventEmitter, error) { +func initJetStream() (*events.JetStream, error) { jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{ ServerUrl: ko.MustString("jetstream.endpoint"), PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, diff --git a/internal/custodial/custodial.go b/internal/custodial/custodial.go index 35a2c81..4e44b35 100644 --- a/internal/custodial/custodial.go +++ b/internal/custodial/custodial.go @@ -12,7 +12,7 @@ import ( type Custodial struct { CeloProvider *celoutils.Provider - EventEmitter events.EventEmitter + EventEmitter *events.JetStream Keystore keystore.Keystore LockProvider *redislock.Client Noncestore nonce.Noncestore diff --git a/internal/events/events.go b/internal/events/events.go index 83a16ea..e51f73c 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -1,10 +1,5 @@ package events -type EventEmitter interface { - Close() - Publish(subject string, dedupId string, eventPayload interface{}) error -} - type EventPayload struct { OtxId uint `json:"otxId"` TrackingId string `json:"trackingId"` diff --git a/internal/events/jetstream.go b/internal/events/jetstream.go index f258274..0348e5b 100644 --- a/internal/events/jetstream.go +++ b/internal/events/jetstream.go @@ -5,6 +5,7 @@ import ( "time" "github.com/nats-io/nats.go" + "github.com/zerodha/logf" ) const ( @@ -22,17 +23,19 @@ const ( ) type JetStreamOpts struct { + Logg logf.Logger ServerUrl string PersistDuration time.Duration DedupDuration time.Duration } type JetStream struct { - jsCtx nats.JetStreamContext - nc *nats.Conn + logg logf.Logger + jsCtx nats.JetStreamContext + natsConn *nats.Conn } -func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) { +func NewJetStreamEventEmitter(o JetStreamOpts) (*JetStream, error) { natsConn, err := nats.Connect(o.ServerUrl) if err != nil { return nil, err @@ -59,15 +62,15 @@ func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) { } return &JetStream{ - jsCtx: js, - nc: natsConn, + jsCtx: js, + natsConn: natsConn, }, nil } // Close gracefully shutdowns the JetStream connection. func (js *JetStream) Close() { - if js.nc != nil { - js.nc.Close() + if js.natsConn != nil { + js.natsConn.Close() } } diff --git a/internal/events/jetstream_sub.go b/internal/events/jetstream_sub.go new file mode 100644 index 0000000..0bc9784 --- /dev/null +++ b/internal/events/jetstream_sub.go @@ -0,0 +1,54 @@ +package events + +import ( + "context" + "errors" + "time" + + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/nats-io/nats.go" +) + +const ( + backOffTimer = 2 * time.Second + durableId = "cic-custodial" + pullStream = "CHAIN" + pullSubject = "CHAIN.*" +) + +func (js *JetStream) ChainSubscription(ctx context.Context, store store.Store) error { + subOpts := []nats.SubOpt{ + nats.ManualAck(), + nats.Bind(pullStream, durableId), + } + + natsSub, err := js.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + js.logg.Info("jetstream chain sub: shutdown signal received") + return nil + default: + events, err := natsSub.Fetch(1) + if err != nil { + if errors.Is(err, nats.ErrTimeout) { + // Supressed retry + js.logg.Error("jetstream chain sub: fetch NATS timeout", "error", err) + time.Sleep(backOffTimer) + continue + } else { + js.logg.Error("jetstream chain sub: fetch other error", "error", err) + } + } + if len(events) > 0 { + // TODO: Unmarshal + // TODO: UpdateOtxStatus + } + } + + } +} diff --git a/internal/store/otx.go b/internal/store/otx.go index 7d0ac7a..a985702 100644 --- a/internal/store/otx.go +++ b/internal/store/otx.go @@ -57,3 +57,7 @@ func (s *PostgresStore) GetTxStatusByTrackingId(ctx context.Context, trackingId return txs, nil } + +func (s *PostgresStore) UpdateOtxStatus(ctx context.Context, status string) error { + return nil +} diff --git a/internal/tasker/task/account_gift_gas.go b/internal/tasker/task/account_gift_gas.go index 1fc7239..42a9aab 100644 --- a/internal/tasker/task/account_gift_gas.go +++ b/internal/tasker/task/account_gift_gas.go @@ -19,6 +19,7 @@ import ( func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { var ( + err error payload AccountPayload ) diff --git a/internal/tasker/task/account_gift_voucher.go b/internal/tasker/task/account_gift_voucher.go index cca5a01..f07b02e 100644 --- a/internal/tasker/task/account_gift_voucher.go +++ b/internal/tasker/task/account_gift_voucher.go @@ -18,6 +18,7 @@ import ( func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { var ( + err error payload AccountPayload ) diff --git a/internal/tasker/task/account_refill_gas.go b/internal/tasker/task/account_refill_gas.go index 5276d9a..576ddf3 100644 --- a/internal/tasker/task/account_refill_gas.go +++ b/internal/tasker/task/account_refill_gas.go @@ -21,8 +21,9 @@ import ( func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { var ( - payload AccountPayload balance big.Int + err error + payload AccountPayload ) if err := json.Unmarshal(t.Payload(), &payload); err != nil { diff --git a/internal/tasker/task/account_register_onchain.go b/internal/tasker/task/account_register_onchain.go index 883bf25..bdf7f17 100644 --- a/internal/tasker/task/account_register_onchain.go +++ b/internal/tasker/task/account_register_onchain.go @@ -19,6 +19,7 @@ import ( func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { var ( + err error payload AccountPayload ) diff --git a/internal/tasker/task/sign_transfer.go b/internal/tasker/task/sign_transfer.go index b90488e..29a2583 100644 --- a/internal/tasker/task/sign_transfer.go +++ b/internal/tasker/task/sign_transfer.go @@ -37,6 +37,7 @@ type ( func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { var ( + err error payload TransferPayload )