diff --git a/cmd/service/custodial.go b/cmd/service/custodial.go index 08e2787..c14ee8f 100644 --- a/cmd/service/custodial.go +++ b/cmd/service/custodial.go @@ -7,8 +7,8 @@ import ( eth_crypto "github.com/celo-org/celo-blockchain/crypto" "github.com/go-redis/redis/v8" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/nonce" - "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/w3-celo-patch" ) @@ -31,12 +31,12 @@ func initAbis() map[string]*w3.Func { // Bootstrap the internal custodial system configs and system signer key. // This container is passed down to individual tasker and API handlers. -func initSystemContainer(ctx context.Context, noncestore nonce.Noncestore) *tasker.SystemContainer { +func initSystemContainer(ctx context.Context, noncestore nonce.Noncestore) *custodial.SystemContainer { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // Some custodial system defaults loaded from the config file. - systemContainer := &tasker.SystemContainer{ + systemContainer := &custodial.SystemContainer{ Abis: initAbis(), AccountIndexContract: w3.A(ko.MustString("system.account_index_address")), GasFaucetContract: w3.A(ko.MustString("system.gas_faucet_address")), @@ -55,10 +55,10 @@ func initSystemContainer(ctx context.Context, noncestore nonce.Noncestore) *task // Check if system signer account nonce is present. // If not (first boot), we bootstrap it from the network. currentSystemNonce, err := noncestore.Peek(ctx, ko.MustString("system.public_key")) - lo.Info("custodial: loaded (noncestore) system nonce", "nonce", currentSystemNonce) + lo.Info("custodial: loaded system nonce from noncestore", "nonce", currentSystemNonce) if err == redis.Nil { nonce, err := noncestore.SyncNetworkNonce(ctx, ko.MustString("system.public_key")) - lo.Info("custodial: syncing system nonce", "nonce", nonce) + lo.Info("custodial: syncing system nonce from network", "nonce", nonce) if err != nil { lo.Fatal("custodial: critical error bootstrapping system container", "error", err) } diff --git a/cmd/service/init.go b/cmd/service/init.go index 4e8ecdc..c6c7d63 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -7,11 +7,13 @@ import ( "github.com/bsm/redislock" "github.com/grassrootseconomics/celoutils" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/queries" "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/internal/sub" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/logg" "github.com/grassrootseconomics/cic-custodial/pkg/postgres" @@ -22,6 +24,7 @@ import ( "github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/v2" + "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) @@ -46,14 +49,14 @@ func initConfig() *koanf.Koanf { confFile := file.Provider(confFlag) if err := ko.Load(confFile, toml.Parser()); err != nil { - lo.Fatal("Could not load config file", "error", err) + lo.Fatal("init: could not load config file", "error", err) } if err := ko.Load(env.Provider("CUSTODIAL_", ".", func(s string) string { return strings.ReplaceAll(strings.ToLower( strings.TrimPrefix(s, "CUSTODIAL_")), "__", ".") }), nil); err != nil { - lo.Fatal("Could not override config from env vars", "error", err) + lo.Fatal("init: could not override config from env vars", "error", err) } if debugFlag { @@ -169,12 +172,11 @@ func initLockProvider(redisPool redislock.RedisClient) *redislock.Client { // Load tasker client. func initTaskerClient(redisPool *redis.RedisPool) *tasker.TaskerClient { return tasker.NewTaskerClient(tasker.TaskerClientOpts{ - RedisPool: redisPool, - TaskRetention: time.Duration(ko.MustInt64("asynq.task_retention_hrs")) * time.Hour, + RedisPool: redisPool, }) } -// Load Postgres store +// Load Postgres store. func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) store.Store { return store.NewPostgresStore(store.Opts{ PostgresPool: postgresPool, @@ -182,20 +184,44 @@ func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) sto }) } -// Init JetStream context for tasker events. -func initJetStream(pgStore store.Store) *events.JetStream { - jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{ - Logg: lo, - PgStore: pgStore, - ServerUrl: ko.MustString("jetstream.endpoint"), - PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, - DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour, - }) - +// Init JetStream context for both pub/sub. +func initJetStream() (*nats.Conn, nats.JetStreamContext) { + natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint")) if err != nil { - lo.Fatal("main: critical error loading jetstream event emitter") + lo.Fatal("init: critical error connecting to NATS", "error", err) + } + + js, err := natsConn.JetStream() + if err != nil { + lo.Fatal("init: bad JetStream opts", "error", err) } - return jsEmitter + return natsConn, js +} + +func initPub(jsCtx nats.JetStreamContext) *pub.Pub { + pub, err := pub.NewPub(pub.PubOpts{ + DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour, + JsCtx: jsCtx, + PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, + }) + if err != nil { + lo.Fatal("init: critical error bootstrapping pub", "error", err) + } + + return pub +} +func initSub(natsConn *nats.Conn, jsCtx nats.JetStreamContext, cu *custodial.Custodial) *sub.Sub { + sub, err := sub.NewSub(sub.SubOpts{ + CustodialContainer: cu, + JsCtx: jsCtx, + Logg: lo, + NatsConn: natsConn, + }) + if err != nil { + lo.Fatal("init: critical error bootstrapping sub", "error", err) + } + + return sub } diff --git a/cmd/service/main.go b/cmd/service/main.go index 1f180b6..bdadbab 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -7,7 +7,7 @@ import ( "sync" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/sub" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/knadh/koanf/v2" "github.com/labstack/echo/v4" @@ -17,7 +17,7 @@ import ( type ( internalServiceContainer struct { apiService *echo.Echo - jetstreamSub *events.JetStream + jetstreamSub *sub.Sub taskerService *tasker.TaskerServer } ) @@ -57,15 +57,16 @@ func main() { taskerClient := initTaskerClient(asynqRedisPool) systemContainer := initSystemContainer(context.Background(), redisNoncestore) - jsEventEmitter := initJetStream(pgStore) + natsConn, jsCtx := initJetStream() + jsPub := initPub(jsCtx) custodial := &custodial.Custodial{ CeloProvider: celoProvider, - EventEmitter: jsEventEmitter, Keystore: postgresKeystore, LockProvider: lockProvider, Noncestore: redisNoncestore, PgStore: pgStore, + Pub: jsPub, SystemContainer: systemContainer, TaskerClient: taskerClient, } @@ -80,8 +81,9 @@ func main() { wg.Add(1) go func() { defer wg.Done() - lo.Info("main: starting API server") - if err := internalServices.apiService.Start(ko.MustString("service.address")); err != nil { + host := ko.MustString("service.address") + lo.Info("main: starting API server", "host", host) + if err := internalServices.apiService.Start(host); err != nil { if strings.Contains(err.Error(), "Server closed") { lo.Info("main: shutting down server") } else { @@ -94,23 +96,23 @@ func main() { wg.Add(1) go func() { defer wg.Done() - lo.Info("Starting tasker") + lo.Info("main: starting tasker") if err := internalServices.taskerService.Start(); err != nil { lo.Fatal("main: could not start task server", "err", err) } }() - internalServices.jetstreamSub = jsEventEmitter + internalServices.jetstreamSub = initSub(natsConn, jsCtx, custodial) wg.Add(1) go func() { defer wg.Done() - lo.Info("Starting jetstream sub") - if err := internalServices.jetstreamSub.Subscriber(); err != nil { + lo.Info("main: starting jetstream sub") + if err := internalServices.jetstreamSub.Process(); err != nil { lo.Fatal("main: error running jetstream sub", "err", err) } }() - <-signalCh + lo.Info("main: graceful shutdown triggered", "signal", <-signalCh) startGracefulShutdown(context.Background(), internalServices) wg.Wait() diff --git a/cmd/service/tasker.go b/cmd/service/tasker.go index 25b29de..0e69a10 100644 --- a/cmd/service/tasker.go +++ b/cmd/service/tasker.go @@ -1,24 +1,40 @@ package main import ( + "context" + "time" + + "github.com/bsm/redislock" "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker/task" "github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/hibiken/asynq" + "github.com/zerodha/logf" +) + +const ( + fixedRetryCount = 25 + fixedRetryPeriod = time.Second * 1 ) // Load tasker handlers, injecting any necessary handler dependencies from the system container. func initTasker(custodialContainer *custodial.Custodial, redisPool *redis.RedisPool) *tasker.TaskerServer { taskerServerOpts := tasker.TaskerServerOpts{ - Concurrency: ko.MustInt("asynq.worker_count"), - Logg: lo, - LogLevel: asynq.InfoLevel, - RedisPool: redisPool, + Concurrency: ko.MustInt("asynq.worker_count"), + IsFailureHandler: isFailureHandler, + Logg: lo, + LogLevel: asynq.InfoLevel, + RedisPool: redisPool, + RetryHandler: retryHandler, } taskerServer := tasker.NewTaskerServer(taskerServerOpts) + taskerServer.RegisterMiddlewareStack([]asynq.MiddlewareFunc{ + observibilityMiddleware(lo), + }) + taskerServer.RegisterHandlers(tasker.AccountPrepareTask, task.AccountPrepare(custodialContainer)) taskerServer.RegisterHandlers(tasker.AccountRegisterTask, task.AccountRegisterOnChainProcessor(custodialContainer)) taskerServer.RegisterHandlers(tasker.AccountGiftGasTask, task.AccountGiftGasProcessor(custodialContainer)) @@ -29,3 +45,38 @@ func initTasker(custodialContainer *custodial.Custodial, redisPool *redis.RedisP return taskerServer } + +func isFailureHandler(err error) bool { + switch err { + // Ignore lock contention errors; retry until lock obtain. + case redislock.ErrNotObtained: + return false + default: + return true + } +} + +func retryHandler(count int, err error, task *asynq.Task) time.Duration { + if count < fixedRetryCount { + return fixedRetryPeriod + } else { + return asynq.DefaultRetryDelayFunc(count, err, task) + } +} + +func observibilityMiddleware(logg logf.Logger) asynq.MiddlewareFunc { + return func(handler asynq.Handler) asynq.Handler { + return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error { + taskId, _ := asynq.GetTaskID(ctx) + + err := handler.ProcessTask(ctx, task) + if err != nil && isFailureHandler(err) { + lo.Error("tasker: handler error", "err", err, "task_type", task.Type(), "task_id", taskId) + } else { + lo.Info("tasker: process task", "task_type", task.Type(), "task_id", taskId) + } + + return err + }) + } +} diff --git a/internal/custodial/custodial.go b/internal/custodial/custodial.go index 4e44b35..1ee9ae6 100644 --- a/internal/custodial/custodial.go +++ b/internal/custodial/custodial.go @@ -1,22 +1,46 @@ package custodial import ( + "crypto/ecdsa" + "math/big" + "time" + "github.com/bsm/redislock" + "github.com/celo-org/celo-blockchain/common" "github.com/grassrootseconomics/celoutils" - "github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/grassrootseconomics/w3-celo-patch" ) -type Custodial struct { - CeloProvider *celoutils.Provider - EventEmitter *events.JetStream - Keystore keystore.Keystore - LockProvider *redislock.Client - Noncestore nonce.Noncestore - PgStore store.Store - SystemContainer *tasker.SystemContainer - TaskerClient *tasker.TaskerClient -} +type ( + SystemContainer struct { + Abis map[string]*w3.Func + AccountIndexContract common.Address + GasFaucetContract common.Address + GasRefillThreshold *big.Int + GasRefillValue *big.Int + GiftableGasValue *big.Int + GiftableToken common.Address + GiftableTokenValue *big.Int + LockPrefix string + LockTimeout time.Duration + PrivateKey *ecdsa.PrivateKey + PublicKey string + TokenDecimals int + TokenTransferGasLimit uint64 + } + Custodial struct { + CeloProvider *celoutils.Provider + Keystore keystore.Keystore + LockProvider *redislock.Client + Noncestore nonce.Noncestore + PgStore store.Store + Pub *pub.Pub + SystemContainer *SystemContainer + TaskerClient *tasker.TaskerClient + } +) diff --git a/internal/events/jetstream.go b/internal/events/jetstream.go deleted file mode 100644 index c28dc3a..0000000 --- a/internal/events/jetstream.go +++ /dev/null @@ -1,169 +0,0 @@ -package events - -import ( - "context" - "encoding/json" - "errors" - "time" - - "github.com/grassrootseconomics/cic-custodial/internal/store" - "github.com/nats-io/nats.go" - "github.com/zerodha/logf" -) - -const ( - // Pub - StreamName string = "CUSTODIAL" - StreamSubjects string = "CUSTODIAL.*" - AccountNewNonce string = "CUSTODIAL.accountNewNonce" - AccountRegister string = "CUSTODIAL.accountRegister" - AccountGiftGas string = "CUSTODIAL.systemNewAccountGas" - AccountGiftVoucher string = "CUSTODIAL.systemNewAccountVoucher" - AccountRefillGas string = "CUSTODIAL.systemRefillAccountGas" - DispatchFail string = "CUSTODIAL.dispatchFail" - DispatchSuccess string = "CUSTODIAL.dispatchSuccess" - SignTransfer string = "CUSTODIAL.signTransfer" - - // Sub - durableId = "cic-custodial" - pullStream = "CHAIN" - pullSubject = "CHAIN.*" - actionTimeout = 5 * time.Second -) - -type ( - JetStreamOpts struct { - Logg logf.Logger - ServerUrl string - PersistDuration time.Duration - PgStore store.Store - DedupDuration time.Duration - } - - JetStream struct { - logg logf.Logger - jsCtx nats.JetStreamContext - pgStore store.Store - natsConn *nats.Conn - } - - EventPayload struct { - OtxId uint `json:"otxId"` - TrackingId string `json:"trackingId"` - TxHash string `json:"txHash"` - } -) - -func NewJetStreamEventEmitter(o JetStreamOpts) (*JetStream, error) { - natsConn, err := nats.Connect(o.ServerUrl) - if err != nil { - return nil, err - } - - js, err := natsConn.JetStream() - if err != nil { - return nil, err - } - - // Bootstrap stream if it doesn't exist. - stream, _ := js.StreamInfo(StreamName) - if stream == nil { - _, err = js.AddStream(&nats.StreamConfig{ - Name: StreamName, - MaxAge: o.PersistDuration, - Storage: nats.FileStorage, - Subjects: []string{StreamSubjects}, - Duplicates: o.DedupDuration, - }) - if err != nil { - return nil, err - } - } - - // Add a durable consumer - _, err = js.AddConsumer(pullStream, &nats.ConsumerConfig{ - Durable: durableId, - AckPolicy: nats.AckExplicitPolicy, - FilterSubject: pullSubject, - }) - if err != nil { - return nil, err - } - - return &JetStream{ - logg: o.Logg, - jsCtx: js, - pgStore: o.PgStore, - natsConn: natsConn, - }, nil -} - -// Close gracefully shutdowns the JetStream connection. -func (js *JetStream) Close() { - if js.natsConn != nil { - js.natsConn.Close() - } -} - -// Publish publishes the JSON data to the NATS stream. -func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error { - jsonData, err := json.Marshal(eventPayload) - if err != nil { - return err - } - - _, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId)) - if err != nil { - return err - } - - return nil -} - -func (js *JetStream) Subscriber() error { - subOpts := []nats.SubOpt{ - nats.ManualAck(), - nats.Bind(pullStream, durableId), - } - - natsSub, err := js.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...) - if err != nil { - return err - } - - for { - events, err := natsSub.Fetch(1) - if err != nil { - if errors.Is(err, nats.ErrTimeout) { - continue - } else if errors.Is(err, nats.ErrConnectionClosed) { - return nil - } else { - return err - } - } - if len(events) > 0 { - var ( - chainEvent store.MinimalTxInfo - - msg = events[0] - ) - - if err := json.Unmarshal(msg.Data, &chainEvent); err != nil { - msg.Nak() - js.logg.Error("jetstream sub: json unmarshal fail", "error", err) - } else { - ctx, cancel := context.WithTimeout(context.Background(), actionTimeout) - - if err := js.pgStore.UpdateOtxStatusFromChainEvent(ctx, chainEvent); err != nil { - msg.Nak() - js.logg.Error("jetstream sub: otx marker failed to update state", "error", err) - } else { - msg.Ack() - } - cancel() - } - - } - } -} diff --git a/internal/pub/js_pub.go b/internal/pub/js_pub.go new file mode 100644 index 0000000..fcdcf14 --- /dev/null +++ b/internal/pub/js_pub.go @@ -0,0 +1,73 @@ +package pub + +import ( + "encoding/json" + "time" + + "github.com/nats-io/nats.go" +) + +const ( + StreamName string = "CUSTODIAL" + StreamSubjects string = "CUSTODIAL.*" + AccountNewNonce string = "CUSTODIAL.accountNewNonce" + AccountRegister string = "CUSTODIAL.accountRegister" + AccountGiftGas string = "CUSTODIAL.systemNewAccountGas" + AccountGiftVoucher string = "CUSTODIAL.systemNewAccountVoucher" + AccountRefillGas string = "CUSTODIAL.systemRefillAccountGas" + DispatchFail string = "CUSTODIAL.dispatchFail" + DispatchSuccess string = "CUSTODIAL.dispatchSuccess" + SignTransfer string = "CUSTODIAL.signTransfer" +) + +type ( + PubOpts struct { + DedupDuration time.Duration + JsCtx nats.JetStreamContext + PersistDuration time.Duration + } + + Pub struct { + jsCtx nats.JetStreamContext + } + + EventPayload struct { + OtxId uint `json:"otxId"` + TrackingId string `json:"trackingId"` + TxHash string `json:"txHash"` + } +) + +func NewPub(o PubOpts) (*Pub, error) { + stream, _ := o.JsCtx.StreamInfo(StreamName) + if stream == nil { + _, err := o.JsCtx.AddStream(&nats.StreamConfig{ + Name: StreamName, + MaxAge: o.PersistDuration, + Storage: nats.FileStorage, + Subjects: []string{StreamSubjects}, + Duplicates: o.DedupDuration, + }) + if err != nil { + return nil, err + } + } + + return &Pub{ + jsCtx: o.JsCtx, + }, nil +} + +func (p *Pub) Publish(subject string, dedupId string, eventPayload interface{}) error { + jsonData, err := json.Marshal(eventPayload) + if err != nil { + return err + } + + _, err = p.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId)) + if err != nil { + return err + } + + return nil +} diff --git a/internal/sub/handler.go b/internal/sub/handler.go new file mode 100644 index 0000000..75577f8 --- /dev/null +++ b/internal/sub/handler.go @@ -0,0 +1,30 @@ +package sub + +import ( + "context" + "encoding/json" + + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/nats-io/nats.go" +) + +func (s *Sub) handler(ctx context.Context, msg *nats.Msg) error { + var ( + chainEvent store.MinimalTxInfo + ) + + if err := json.Unmarshal(msg.Data, &chainEvent); err != nil { + return err + } + + if err := s.cu.PgStore.UpdateOtxStatusFromChainEvent(ctx, chainEvent); err != nil { + return err + } + + switch msg.Subject { + case "CHAIN.gas": + // + } + + return nil +} diff --git a/internal/sub/js_sub.go b/internal/sub/js_sub.go new file mode 100644 index 0000000..022ef50 --- /dev/null +++ b/internal/sub/js_sub.go @@ -0,0 +1,100 @@ +package sub + +import ( + "context" + "errors" + "time" + + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/nats-io/nats.go" + "github.com/zerodha/logf" +) + +const ( + durableId = "cic-custodial" + pullStream = "CHAIN" + pullSubject = "CHAIN.*" + actionTimeout = 5 * time.Second + waitDelay = 1 * time.Second +) + +type ( + SubOpts struct { + CustodialContainer *custodial.Custodial + JsCtx nats.JetStreamContext + Logg logf.Logger + NatsConn *nats.Conn + } + + Sub struct { + cu *custodial.Custodial + jsCtx nats.JetStreamContext + logg logf.Logger + natsConn *nats.Conn + } +) + +func NewSub(o SubOpts) (*Sub, error) { + _, err := o.JsCtx.AddConsumer(pullStream, &nats.ConsumerConfig{ + Durable: durableId, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: pullSubject, + }) + if err != nil { + return nil, err + } + + return &Sub{ + cu: o.CustodialContainer, + jsCtx: o.JsCtx, + logg: o.Logg, + natsConn: o.NatsConn, + }, nil +} + +func (s *Sub) Process() error { + subOpts := []nats.SubOpt{ + nats.ManualAck(), + nats.Bind(pullStream, durableId), + } + + natsSub, err := s.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...) + if err != nil { + return err + } + + for { + events, err := natsSub.Fetch(1) + if err != nil { + if errors.Is(err, nats.ErrTimeout) { + s.logg.Debug("sub: no msg to pull") + time.Sleep(waitDelay) + continue + } else if errors.Is(err, nats.ErrConnectionClosed) { + return nil + } else { + return err + } + } + + if len(events) > 0 { + msg := events[0] + ctx, cancel := context.WithTimeout(context.Background(), actionTimeout) + + if err := s.handler(ctx, msg); err != nil { + s.logg.Error("sub: handler error", "error", err) + msg.Nak() + } else { + s.logg.Debug("sub: processed msg", "subject", msg.Subject) + msg.Ack() + } + cancel() + } + } +} + +func (s *Sub) Close() { + if s.natsConn != nil { + s.natsConn.Close() + } +} diff --git a/internal/tasker/client.go b/internal/tasker/client.go index 8510bf3..657524f 100644 --- a/internal/tasker/client.go +++ b/internal/tasker/client.go @@ -10,17 +10,16 @@ import ( ) const ( - taskTimeout = 60 + taskTimeout = 60 * time.Second + taskRetention = 48 * time.Hour ) type TaskerClientOpts struct { - RedisPool *redis.RedisPool - TaskRetention time.Duration + RedisPool *redis.RedisPool } type TaskerClient struct { - Client *asynq.Client - taskRetention time.Duration + Client *asynq.Client } func NewTaskerClient(o TaskerClientOpts) *TaskerClient { @@ -39,8 +38,8 @@ func (c *TaskerClient) CreateTask(ctx context.Context, taskName TaskName, queueN task.Payload, asynq.Queue(string(queueName)), asynq.TaskID(task.Id), - asynq.Retention(c.taskRetention), - asynq.Timeout(taskTimeout*time.Second), + asynq.Retention(taskRetention), + asynq.Timeout(taskTimeout), ) taskInfo, err := c.Client.EnqueueContext(ctx, qTask) diff --git a/internal/tasker/server.go b/internal/tasker/server.go index 1d30156..44185bb 100644 --- a/internal/tasker/server.go +++ b/internal/tasker/server.go @@ -4,22 +4,24 @@ import ( "context" "time" - "github.com/bsm/redislock" + "github.com/grassrootseconomics/cic-custodial/pkg/logg" "github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/hibiken/asynq" "github.com/zerodha/logf" ) const ( - fixedRetryCount = 25 - fixedRetryPeriod = time.Second * 1 + retryRequeueInterval = 2 * time.Second ) type TaskerServerOpts struct { - Concurrency int - Logg logf.Logger - LogLevel asynq.LogLevel - RedisPool *redis.RedisPool + Concurrency int + ErrorHandler asynq.ErrorHandler + IsFailureHandler func(error) bool + Logg logf.Logger + LogLevel asynq.LogLevel + RedisPool *redis.RedisPool + RetryHandler asynq.RetryDelayFunc } type TaskerServer struct { @@ -31,14 +33,17 @@ func NewTaskerServer(o TaskerServerOpts) *TaskerServer { server := asynq.NewServer( o.RedisPool, asynq.Config{ - Concurrency: o.Concurrency, - IsFailure: expectedFailures, - LogLevel: o.LogLevel, + Concurrency: o.Concurrency, + DelayedTaskCheckInterval: retryRequeueInterval, + ErrorHandler: o.ErrorHandler, + IsFailure: o.IsFailureHandler, + LogLevel: o.LogLevel, + Logger: logg.AsynqCompatibleLogger(o.Logg), Queues: map[string]int{ - string(HighPriority): 5, - string(DefaultPriority): 2, + string(HighPriority): 7, + string(DefaultPriority): 3, }, - RetryDelayFunc: retryDelay, + RetryDelayFunc: o.RetryHandler, }, ) @@ -50,6 +55,10 @@ func NewTaskerServer(o TaskerServerOpts) *TaskerServer { } } +func (ts *TaskerServer) RegisterMiddlewareStack(middlewareStack []asynq.MiddlewareFunc) { + ts.mux.Use(middlewareStack...) +} + func (ts *TaskerServer) RegisterHandlers(taskName TaskName, taskHandler func(context.Context, *asynq.Task) error) { ts.mux.HandleFunc(string(taskName), taskHandler) } @@ -66,22 +75,3 @@ func (ts *TaskerServer) Stop() { ts.server.Stop() ts.server.Shutdown() } - -func expectedFailures(err error) bool { - switch err { - // Ignore lock contention errors; retry until lock obtain. - case redislock.ErrNotObtained: - return false - default: - return true - } -} - -// Immidiatel -func retryDelay(count int, err error, task *asynq.Task) time.Duration { - if count < fixedRetryCount { - return fixedRetryPeriod - } else { - return asynq.DefaultRetryDelayFunc(count, err, task) - } -} diff --git a/internal/tasker/task/account_gift_gas.go b/internal/tasker/task/account_gift_gas.go index 42a9aab..5c58408 100644 --- a/internal/tasker/task/account_gift_gas.go +++ b/internal/tasker/task/account_gift_gas.go @@ -8,7 +8,7 @@ import ( "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/enum" @@ -105,14 +105,14 @@ func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asy return err } - eventPayload := &events.EventPayload{ + eventPayload := &pub.EventPayload{ OtxId: id, TrackingId: payload.TrackingId, TxHash: builtTx.Hash().Hex(), } - if err := cu.EventEmitter.Publish( - events.AccountGiftGas, + if err := cu.Pub.Publish( + pub.AccountGiftGas, builtTx.Hash().Hex(), eventPayload, ); err != nil { diff --git a/internal/tasker/task/account_gift_voucher.go b/internal/tasker/task/account_gift_voucher.go index f07b02e..788b48f 100644 --- a/internal/tasker/task/account_gift_voucher.go +++ b/internal/tasker/task/account_gift_voucher.go @@ -7,7 +7,7 @@ import ( "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/enum" @@ -115,14 +115,14 @@ func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq. return err } - eventPayload := &events.EventPayload{ + eventPayload := &pub.EventPayload{ OtxId: id, TrackingId: payload.TrackingId, TxHash: builtTx.Hash().Hex(), } - if err := cu.EventEmitter.Publish( - events.AccountGiftVoucher, + if err := cu.Pub.Publish( + pub.AccountGiftVoucher, builtTx.Hash().Hex(), eventPayload, ); err != nil { diff --git a/internal/tasker/task/account_prepare.go b/internal/tasker/task/account_prepare.go index e3b90db..00755ac 100644 --- a/internal/tasker/task/account_prepare.go +++ b/internal/tasker/task/account_prepare.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/hibiken/asynq" ) @@ -64,12 +64,12 @@ func AccountPrepare(cu *custodial.Custodial) func(context.Context, *asynq.Task) return err } - eventPayload := events.EventPayload{ + eventPayload := pub.EventPayload{ TrackingId: payload.TrackingId, } - if err := cu.EventEmitter.Publish( - events.AccountNewNonce, + if err := cu.Pub.Publish( + pub.AccountNewNonce, payload.PublicKey, eventPayload, ); err != nil { diff --git a/internal/tasker/task/account_refill_gas.go b/internal/tasker/task/account_refill_gas.go index 576ddf3..5d0bf52 100644 --- a/internal/tasker/task/account_refill_gas.go +++ b/internal/tasker/task/account_refill_gas.go @@ -9,7 +9,7 @@ import ( "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/enum" @@ -120,14 +120,14 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a return err } - eventPayload := &events.EventPayload{ + eventPayload := &pub.EventPayload{ OtxId: id, TrackingId: payload.TrackingId, TxHash: builtTx.Hash().Hex(), } - if err := cu.EventEmitter.Publish( - events.AccountRefillGas, + if err := cu.Pub.Publish( + pub.AccountRefillGas, builtTx.Hash().Hex(), eventPayload, ); err != nil { diff --git a/internal/tasker/task/account_register_onchain.go b/internal/tasker/task/account_register_onchain.go index bdf7f17..7a2029f 100644 --- a/internal/tasker/task/account_register_onchain.go +++ b/internal/tasker/task/account_register_onchain.go @@ -8,7 +8,7 @@ import ( "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/enum" @@ -113,14 +113,14 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte return err } - eventPayload := &events.EventPayload{ + eventPayload := &pub.EventPayload{ OtxId: id, TrackingId: payload.TrackingId, TxHash: builtTx.Hash().Hex(), } - if err := cu.EventEmitter.Publish( - events.AccountRegister, + if err := cu.Pub.Publish( + pub.AccountRegister, builtTx.Hash().Hex(), eventPayload, ); err != nil { diff --git a/internal/tasker/task/dispatch_tx.go b/internal/tasker/task/dispatch_tx.go index 46cb873..3271d0d 100644 --- a/internal/tasker/task/dispatch_tx.go +++ b/internal/tasker/task/dispatch_tx.go @@ -9,7 +9,7 @@ import ( "github.com/celo-org/celo-blockchain/core/types" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/w3-celo-patch/module/eth" @@ -26,7 +26,7 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro var ( payload TxPayload dispatchStatus store.DispatchStatus - eventPayload events.EventPayload + eventPayload pub.EventPayload dispathchTx common.Hash ) @@ -58,7 +58,7 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } - if err := cu.EventEmitter.Publish(events.DispatchFail, txHash, eventPayload); err != nil { + if err := cu.Pub.Publish(pub.DispatchFail, txHash, eventPayload); err != nil { return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } @@ -71,7 +71,7 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } - if err := cu.EventEmitter.Publish(events.DispatchSuccess, txHash, eventPayload); err != nil { + if err := cu.Pub.Publish(pub.DispatchSuccess, txHash, eventPayload); err != nil { return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } diff --git a/internal/tasker/task/sign_transfer.go b/internal/tasker/task/sign_transfer.go index 29a2583..93e9dc5 100644 --- a/internal/tasker/task/sign_transfer.go +++ b/internal/tasker/task/sign_transfer.go @@ -9,7 +9,7 @@ import ( "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/enum" @@ -160,8 +160,8 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er TxHash: builtTx.Hash().Hex(), } - if err := cu.EventEmitter.Publish( - events.SignTransfer, + if err := cu.Pub.Publish( + pub.SignTransfer, builtTx.Hash().Hex(), eventPayload, ); err != nil { diff --git a/internal/tasker/types.go b/internal/tasker/types.go index 1870f40..99d3188 100644 --- a/internal/tasker/types.go +++ b/internal/tasker/types.go @@ -1,13 +1,7 @@ package tasker import ( - "crypto/ecdsa" "encoding/json" - "math/big" - "time" - - "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/w3-celo-patch" ) type ( @@ -15,23 +9,6 @@ type ( TaskName string ) -type SystemContainer struct { - Abis map[string]*w3.Func - AccountIndexContract common.Address - GasFaucetContract common.Address - GasRefillThreshold *big.Int - GasRefillValue *big.Int - GiftableGasValue *big.Int - GiftableToken common.Address - GiftableTokenValue *big.Int - LockPrefix string - LockTimeout time.Duration - PrivateKey *ecdsa.PrivateKey - PublicKey string - TokenDecimals int - TokenTransferGasLimit uint64 -} - type Task struct { Id string `json:"id"` Payload json.RawMessage `json:"payload"` diff --git a/pkg/logg/asynq.go b/pkg/logg/asynq.go index cdc986a..8bdcfa9 100644 --- a/pkg/logg/asynq.go +++ b/pkg/logg/asynq.go @@ -13,21 +13,21 @@ func AsynqCompatibleLogger(lo logf.Logger) AsynqLogger { } func (l AsynqLogger) Debug(args ...interface{}) { - l.Lo.Debug("asynq", "debug", args[0]) + l.Lo.Debug("asynq: server", "msg", args[0]) } func (l AsynqLogger) Info(args ...interface{}) { - l.Lo.Info("asynq", "info", args[0]) + l.Lo.Info("asynq: server", "msg", args[0]) } func (l AsynqLogger) Warn(args ...interface{}) { - l.Lo.Warn("asynq", "warn", args[0]) + l.Lo.Warn("asynq: server", "msg", args[0]) } func (l AsynqLogger) Error(args ...interface{}) { - l.Lo.Error("asynq", "error", args[0]) + l.Lo.Error("asynq: server", "msg", args[0]) } func (l AsynqLogger) Fatal(args ...interface{}) { - l.Lo.Fatal("asynq", "fatal", args[0]) + l.Lo.Fatal("asynq: server", "msg", args[0]) }