diff --git a/internal/sub/handler.go b/internal/sub/handler.go index a627966..66d5a0b 100644 --- a/internal/sub/handler.go +++ b/internal/sub/handler.go @@ -4,33 +4,52 @@ import ( "context" "encoding/json" - "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/nats-io/nats.go" ) -func (s *Sub) handler(ctx context.Context, msg *nats.Msg) error { +type ( + ChainEvent struct { + Block uint64 `json:"block"` + From string `json:"from"` + To string `json:"to"` + ContractAddress string `json:"contractAddress"` + Success bool `json:"success"` + TxHash string `json:"transactionHash"` + TxIndex uint `json:"transactionIndex"` + Value uint64 `json:"value"` + } +) + +func (s *Sub) processEventHandler(ctx context.Context, msg *nats.Msg) error { var ( - chainEvent store.MinimalTxInfo + chainEvent ChainEvent ) if err := json.Unmarshal(msg.Data, &chainEvent); err != nil { return err } - if err := s.cu.PgStore.UpdateOtxStatusFromChainEvent(ctx, chainEvent); err != nil { + if err := s.cu.Store.UpdateDispatchStatus( + ctx, + chainEvent.Success, + chainEvent.TxHash, + chainEvent.Block, + ); err != nil { return err } - switch msg.Subject { - case "CHAIN.register": - if chainEvent.Success { - if err := s.cu.PgStore.ActivateAccount(ctx, chainEvent.To); err != nil { + if chainEvent.Success { + switch msg.Subject { + case "CHAIN.register": + if err := s.cu.Store.ActivateAccount(ctx, chainEvent.To); err != nil { return err } - } - case "CHAIN.gas": - if chainEvent.Success { - if err := s.cu.PgStore.ResetGasQuota(ctx, chainEvent.To); err != nil { + + if err := s.cu.Store.ResetGasQuota(ctx, chainEvent.To); err != nil { + return err + } + case "CHAIN.gas": + if err := s.cu.Store.ResetGasQuota(ctx, chainEvent.To); err != nil { return err } } diff --git a/internal/sub/js_sub.go b/internal/sub/jetstream.go similarity index 79% rename from internal/sub/js_sub.go rename to internal/sub/jetstream.go index 095a501..b49af01 100644 --- a/internal/sub/js_sub.go +++ b/internal/sub/jetstream.go @@ -3,19 +3,17 @@ package sub import ( "context" "errors" - "time" "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/pkg/util" "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) const ( - durableId = "cic-custodial" - pullStream = "CHAIN" - pullSubject = "CHAIN.*" - actionTimeout = 5 * time.Second - waitDelay = 1 * time.Second + durableId = "cic-custodial" + pullStream = "CHAIN" + pullSubject = "CHAIN.*" ) type ( @@ -67,7 +65,6 @@ func (s *Sub) Process() error { events, err := natsSub.Fetch(1) if err != nil { if errors.Is(err, nats.ErrTimeout) { - s.logg.Debug("sub: no msg to pull") continue } else if errors.Is(err, nats.ErrConnectionClosed) { return nil @@ -78,15 +75,14 @@ func (s *Sub) Process() error { if len(events) > 0 { msg := events[0] - ctx, cancel := context.WithTimeout(context.Background(), actionTimeout) + ctx, cancel := context.WithTimeout(context.Background(), util.SLATimeout) - if err := s.handler(ctx, msg); err != nil { + if err := s.processEventHandler(ctx, msg); err != nil { s.logg.Error("sub: handler error", "error", err) msg.Nak() + } else { + msg.Ack() } - - s.logg.Debug("sub: processed msg", "subject", msg.Subject) - msg.Ack() cancel() } }