sub: update handlers for latest CHAIN.* events

* Minor fixes and improvements
This commit is contained in:
Mohamed Sohail 2023-04-11 10:18:06 +00:00
parent 21a17d2735
commit b137088d38
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
2 changed files with 39 additions and 24 deletions

View File

@ -4,33 +4,52 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
func (s *Sub) handler(ctx context.Context, msg *nats.Msg) error { type (
ChainEvent struct {
Block uint64 `json:"block"`
From string `json:"from"`
To string `json:"to"`
ContractAddress string `json:"contractAddress"`
Success bool `json:"success"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
Value uint64 `json:"value"`
}
)
func (s *Sub) processEventHandler(ctx context.Context, msg *nats.Msg) error {
var ( var (
chainEvent store.MinimalTxInfo chainEvent ChainEvent
) )
if err := json.Unmarshal(msg.Data, &chainEvent); err != nil { if err := json.Unmarshal(msg.Data, &chainEvent); err != nil {
return err return err
} }
if err := s.cu.PgStore.UpdateOtxStatusFromChainEvent(ctx, chainEvent); err != nil { if err := s.cu.Store.UpdateDispatchStatus(
ctx,
chainEvent.Success,
chainEvent.TxHash,
chainEvent.Block,
); err != nil {
return err return err
} }
if chainEvent.Success {
switch msg.Subject { switch msg.Subject {
case "CHAIN.register": case "CHAIN.register":
if chainEvent.Success { if err := s.cu.Store.ActivateAccount(ctx, chainEvent.To); err != nil {
if err := s.cu.PgStore.ActivateAccount(ctx, chainEvent.To); err != nil {
return err return err
} }
if err := s.cu.Store.ResetGasQuota(ctx, chainEvent.To); err != nil {
return err
} }
case "CHAIN.gas": case "CHAIN.gas":
if chainEvent.Success { if err := s.cu.Store.ResetGasQuota(ctx, chainEvent.To); err != nil {
if err := s.cu.PgStore.ResetGasQuota(ctx, chainEvent.To); err != nil {
return err return err
} }
} }

View File

@ -3,9 +3,9 @@ package sub
import ( import (
"context" "context"
"errors" "errors"
"time"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/pkg/util"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
@ -14,8 +14,6 @@ const (
durableId = "cic-custodial" durableId = "cic-custodial"
pullStream = "CHAIN" pullStream = "CHAIN"
pullSubject = "CHAIN.*" pullSubject = "CHAIN.*"
actionTimeout = 5 * time.Second
waitDelay = 1 * time.Second
) )
type ( type (
@ -67,7 +65,6 @@ func (s *Sub) Process() error {
events, err := natsSub.Fetch(1) events, err := natsSub.Fetch(1)
if err != nil { if err != nil {
if errors.Is(err, nats.ErrTimeout) { if errors.Is(err, nats.ErrTimeout) {
s.logg.Debug("sub: no msg to pull")
continue continue
} else if errors.Is(err, nats.ErrConnectionClosed) { } else if errors.Is(err, nats.ErrConnectionClosed) {
return nil return nil
@ -78,15 +75,14 @@ func (s *Sub) Process() error {
if len(events) > 0 { if len(events) > 0 {
msg := events[0] msg := events[0]
ctx, cancel := context.WithTimeout(context.Background(), actionTimeout) ctx, cancel := context.WithTimeout(context.Background(), util.SLATimeout)
if err := s.handler(ctx, msg); err != nil { if err := s.processEventHandler(ctx, msg); err != nil {
s.logg.Error("sub: handler error", "error", err) s.logg.Error("sub: handler error", "error", err)
msg.Nak() msg.Nak()
} } else {
s.logg.Debug("sub: processed msg", "subject", msg.Subject)
msg.Ack() msg.Ack()
}
cancel() cancel()
} }
} }