add: wip jetstream durable consumer

This commit is contained in:
Mohamed Sohail 2023-03-02 09:12:39 +00:00
parent 1ddff06502
commit a1b6cb08d8
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
8 changed files with 80 additions and 34 deletions

View File

@ -48,13 +48,15 @@ func initConfig(configFilePath string) *koanf.Koanf {
lo.Fatal("Could not load config file", "error", err)
}
if err := ko.Load(env.Provider("", ".", func(s string) string {
if err := ko.Load(env.Provider("CUSTODIAL_", ".", func(s string) string {
return strings.ReplaceAll(strings.ToLower(
strings.TrimPrefix(s, "")), "_", ".")
strings.TrimPrefix(s, "CUSTODIAL_")), "__", ".")
}), nil); err != nil {
lo.Fatal("Could not override config from env vars", "error", err)
}
ko.Print()
return ko
}
@ -180,6 +182,7 @@ func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) sto
// Init JetStream context for tasker events.
func initJetStream() (*events.JetStream, error) {
jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{
Logg: lo,
ServerUrl: ko.MustString("jetstream.endpoint"),
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour,
DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour,

View File

@ -128,6 +128,15 @@ func main() {
}
}()
wg.Add(1)
go func() {
defer wg.Done()
lo.Info("Starting jetstream subscriber")
if err := jsEventEmitter.ChainSubscription(ctx, pgStore); err != nil {
lo.Fatal("main: jetstream subscriber", "err", err)
}
}()
<-ctx.Done()
lo.Info("main: stopping tasker")

View File

@ -7,6 +7,7 @@ metrics = true
[chain]
rpc_endpoint = ""
testnet = true
devnet = false
[system]
# System default values

View File

@ -62,6 +62,7 @@ func NewJetStreamEventEmitter(o JetStreamOpts) (*JetStream, error) {
}
return &JetStream{
logg: o.Logg,
jsCtx: js,
natsConn: natsConn,
}, nil

View File

@ -2,21 +2,29 @@ package events
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/nats-io/nats.go"
)
const (
backOffTimer = 2 * time.Second
durableId = "cic-custodial"
pullStream = "CHAIN"
pullSubject = "CHAIN.*"
)
func (js *JetStream) ChainSubscription(ctx context.Context, store store.Store) error {
func (js *JetStream) ChainSubscription(ctx context.Context, pgStore store.Store) error {
_, err := js.jsCtx.AddConsumer(pullStream, &nats.ConsumerConfig{
Durable: durableId,
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: pullSubject,
})
if err != nil {
return err
}
subOpts := []nats.SubOpt{
nats.ManualAck(),
nats.Bind(pullStream, durableId),
@ -31,23 +39,34 @@ func (js *JetStream) ChainSubscription(ctx context.Context, store store.Store) e
select {
case <-ctx.Done():
js.logg.Info("jetstream chain sub: shutdown signal received")
js.Close()
return nil
default:
events, err := natsSub.Fetch(1)
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
// Supressed retry
js.logg.Error("jetstream chain sub: fetch NATS timeout", "error", err)
time.Sleep(backOffTimer)
continue
} else {
js.logg.Error("jetstream chain sub: fetch other error", "error", err)
}
}
if len(events) > 0 {
// TODO: Unmarshal
// TODO: UpdateOtxStatus
if len(events) == 0 {
continue
}
var (
chainEvent store.MinimalTxInfo
)
if err := json.Unmarshal(events[0].Data, &chainEvent); err != nil {
js.logg.Error("jetstream chain sub: json unmarshal fail", "error", err)
}
if err := pgStore.UpdateOtxStatusFromChainEvent(context.Background(), chainEvent); err != nil {
events[0].Nak()
js.logg.Error("jetstream chain sub: otx marker failed to update state", "error", err)
}
events[0].Ack()
js.logg.Debug("jetstream chain sub: successfully updated status", "tx", chainEvent.TxHash)
}
}

View File

@ -2,8 +2,6 @@ package store
import (
"context"
"github.com/grassrootseconomics/cic-custodial/pkg/enum"
)
func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error {
@ -18,17 +16,3 @@ func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch Dispa
return nil
}
func (s *PostgresStore) UpdateChainStatus(ctx context.Context, txHash string, status enum.OtxStatus, block uint64) error {
if _, err := s.db.Exec(
ctx,
s.queries.UpdateChainStatus,
txHash,
status,
block,
); err != nil {
return err
}
return nil
}

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/georgysavva/scany/v2/pgxscan"
"github.com/grassrootseconomics/cic-custodial/pkg/enum"
)
type TxStatus struct {
@ -58,6 +59,24 @@ func (s *PostgresStore) GetTxStatusByTrackingId(ctx context.Context, trackingId
return txs, nil
}
func (s *PostgresStore) UpdateOtxStatus(ctx context.Context, status string) error {
func (s *PostgresStore) UpdateOtxStatusFromChainEvent(ctx context.Context, chainEvent MinimalTxInfo) error {
var (
status = enum.SUCCESS
)
if !chainEvent.Success {
status = enum.REVERTED
}
if _, err := s.db.Exec(
ctx,
s.queries.UpdateChainStatus,
chainEvent.TxHash,
status,
chainEvent.Block,
); err != nil {
return err
}
return nil
}

View File

@ -7,6 +7,16 @@ import (
)
type (
MinimalTxInfo struct {
Block uint64 `json:"block"`
From string `json:"from"`
To string `json:"to"`
ContractAddress string `json:"contractAddress"`
Success bool `json:"success"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
Value uint64 `json:"value"`
}
OTX struct {
TrackingId string
Type enum.OtxType
@ -29,6 +39,6 @@ type (
CreateOtx(ctx context.Context, otx OTX) (id uint, err error)
CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error
GetTxStatusByTrackingId(ctx context.Context, trackingId string) ([]*TxStatus, error)
UpdateChainStatus(ctx context.Context, txHash string, status enum.OtxStatus, block uint64) error
UpdateOtxStatusFromChainEvent(ctx context.Context, chainEvent MinimalTxInfo) error
}
)