feat (wip): add in-built chain subscription

* BREAKING: remove events interface -> use jetstream struct
* in-built chain subscription to update otx status
* potential fix for tasks: defer rollback nonce error detection
This commit is contained in:
Mohamed Sohail 2023-03-01 17:13:23 +00:00
parent 885163e48a
commit 1ddff06502
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
11 changed files with 79 additions and 18 deletions

View File

@ -48,9 +48,9 @@ func initConfig(configFilePath string) *koanf.Koanf {
lo.Fatal("Could not load config file", "error", err) lo.Fatal("Could not load config file", "error", err)
} }
if err := ko.Load(env.Provider("CUSTODIAL_", ".", func(s string) string { if err := ko.Load(env.Provider("", ".", func(s string) string {
return strings.Replace(strings.ToLower( return strings.ReplaceAll(strings.ToLower(
strings.TrimPrefix(s, "")), "_", ".", -1) strings.TrimPrefix(s, "")), "_", ".")
}), nil); err != nil { }), nil); err != nil {
lo.Fatal("Could not override config from env vars", "error", err) lo.Fatal("Could not override config from env vars", "error", err)
} }
@ -178,7 +178,7 @@ func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) sto
} }
// Init JetStream context for tasker events. // Init JetStream context for tasker events.
func initJetStream() (events.EventEmitter, error) { func initJetStream() (*events.JetStream, error) {
jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{ jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{
ServerUrl: ko.MustString("jetstream.endpoint"), ServerUrl: ko.MustString("jetstream.endpoint"),
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour,

View File

@ -12,7 +12,7 @@ import (
type Custodial struct { type Custodial struct {
CeloProvider *celoutils.Provider CeloProvider *celoutils.Provider
EventEmitter events.EventEmitter EventEmitter *events.JetStream
Keystore keystore.Keystore Keystore keystore.Keystore
LockProvider *redislock.Client LockProvider *redislock.Client
Noncestore nonce.Noncestore Noncestore nonce.Noncestore

View File

@ -1,10 +1,5 @@
package events package events
type EventEmitter interface {
Close()
Publish(subject string, dedupId string, eventPayload interface{}) error
}
type EventPayload struct { type EventPayload struct {
OtxId uint `json:"otxId"` OtxId uint `json:"otxId"`
TrackingId string `json:"trackingId"` TrackingId string `json:"trackingId"`

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/zerodha/logf"
) )
const ( const (
@ -22,17 +23,19 @@ const (
) )
type JetStreamOpts struct { type JetStreamOpts struct {
Logg logf.Logger
ServerUrl string ServerUrl string
PersistDuration time.Duration PersistDuration time.Duration
DedupDuration time.Duration DedupDuration time.Duration
} }
type JetStream struct { type JetStream struct {
jsCtx nats.JetStreamContext logg logf.Logger
nc *nats.Conn jsCtx nats.JetStreamContext
natsConn *nats.Conn
} }
func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) { func NewJetStreamEventEmitter(o JetStreamOpts) (*JetStream, error) {
natsConn, err := nats.Connect(o.ServerUrl) natsConn, err := nats.Connect(o.ServerUrl)
if err != nil { if err != nil {
return nil, err return nil, err
@ -59,15 +62,15 @@ func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) {
} }
return &JetStream{ return &JetStream{
jsCtx: js, jsCtx: js,
nc: natsConn, natsConn: natsConn,
}, nil }, nil
} }
// Close gracefully shutdowns the JetStream connection. // Close gracefully shutdowns the JetStream connection.
func (js *JetStream) Close() { func (js *JetStream) Close() {
if js.nc != nil { if js.natsConn != nil {
js.nc.Close() js.natsConn.Close()
} }
} }

View File

@ -0,0 +1,54 @@
package events
import (
"context"
"errors"
"time"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/nats-io/nats.go"
)
const (
backOffTimer = 2 * time.Second
durableId = "cic-custodial"
pullStream = "CHAIN"
pullSubject = "CHAIN.*"
)
func (js *JetStream) ChainSubscription(ctx context.Context, store store.Store) error {
subOpts := []nats.SubOpt{
nats.ManualAck(),
nats.Bind(pullStream, durableId),
}
natsSub, err := js.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
js.logg.Info("jetstream chain sub: shutdown signal received")
return nil
default:
events, err := natsSub.Fetch(1)
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
// Supressed retry
js.logg.Error("jetstream chain sub: fetch NATS timeout", "error", err)
time.Sleep(backOffTimer)
continue
} else {
js.logg.Error("jetstream chain sub: fetch other error", "error", err)
}
}
if len(events) > 0 {
// TODO: Unmarshal
// TODO: UpdateOtxStatus
}
}
}
}

View File

@ -57,3 +57,7 @@ func (s *PostgresStore) GetTxStatusByTrackingId(ctx context.Context, trackingId
return txs, nil return txs, nil
} }
func (s *PostgresStore) UpdateOtxStatus(ctx context.Context, status string) error {
return nil
}

View File

@ -19,6 +19,7 @@ import (
func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error {
var ( var (
err error
payload AccountPayload payload AccountPayload
) )

View File

@ -18,6 +18,7 @@ import (
func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error {
var ( var (
err error
payload AccountPayload payload AccountPayload
) )

View File

@ -21,8 +21,9 @@ import (
func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error {
var ( var (
payload AccountPayload
balance big.Int balance big.Int
err error
payload AccountPayload
) )
if err := json.Unmarshal(t.Payload(), &payload); err != nil { if err := json.Unmarshal(t.Payload(), &payload); err != nil {

View File

@ -19,6 +19,7 @@ import (
func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error {
var ( var (
err error
payload AccountPayload payload AccountPayload
) )

View File

@ -37,6 +37,7 @@ type (
func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error {
var ( var (
err error
payload TransferPayload payload TransferPayload
) )