cic-custodial/internal/tasker/client.go
Mohammed Sohail add7f2a442
refactor: ctx propagation, api handlers
* use context timeout middleware for correct ctx propagation
* Fix bind error handling
* Fix validation error handling
* Fix HTTP error handling (4XX)
* tasker client now accepts ctx
* add recovery and body size middleware
2023-02-24 16:46:46 +00:00

53 lines
1004 B
Go

package tasker
import (
"context"
"time"
"github.com/google/uuid"
"github.com/grassrootseconomics/cic-custodial/pkg/redis"
"github.com/hibiken/asynq"
)
// taskTimeout is the per-task processing timeout, expressed in seconds. It is
// multiplied by time.Second where it is handed to asynq.Timeout.
const taskTimeout = 60
// TaskerClientOpts carries the settings consumed by NewTaskerClient.
type TaskerClientOpts struct {
// RedisPool is the Redis connection option handed to asynq.NewClient.
RedisPool *redis.RedisPool
// TaskRetention is presumably the retention period meant for enqueued
// tasks (compare TaskerClient.taskRetention) — TODO confirm with callers.
TaskRetention time.Duration
}
// TaskerClient wraps an asynq client and is the entry point for enqueueing
// tasks through CreateTask.
type TaskerClient struct {
// Client is the underlying asynq client used to enqueue tasks.
Client *asynq.Client
// taskRetention is passed to asynq.Retention for every task created via
// CreateTask.
taskRetention time.Duration
}
// NewTaskerClient builds a TaskerClient from o.
//
// The asynq client is created from o.RedisPool, and o.TaskRetention is stored
// on the returned client so that CreateTask applies it (via asynq.Retention)
// to every task it enqueues.
func NewTaskerClient(o TaskerClientOpts) *TaskerClient {
	return &TaskerClient{
		Client: asynq.NewClient(o.RedisPool),
		// Without this assignment the configured retention is silently
		// dropped and every task is enqueued with a zero retention.
		taskRetention: o.TaskRetention,
	}
}
// CreateTask enqueues task on the queue queueName under the task type taskName.
//
// If task.Id is empty, a freshly generated UUID is written back into task
// before enqueueing, so the caller's Task value is mutated. The task is
// enqueued with the client's retention and a timeout of taskTimeout seconds.
//
// It returns the asynq.TaskInfo reported by the enqueue call, or a nil info
// together with the error when enqueueing fails.
func (c *TaskerClient) CreateTask(ctx context.Context, taskName TaskName, queueName QueueName, task *Task) (*asynq.TaskInfo, error) {
	if task.Id == "" {
		task.Id = uuid.NewString()
	}

	// Options are listed in the same order as they are applied by asynq.
	opts := []asynq.Option{
		asynq.Queue(string(queueName)),
		asynq.TaskID(task.Id),
		asynq.Retention(c.taskRetention),
		asynq.Timeout(taskTimeout * time.Second),
	}

	info, err := c.Client.EnqueueContext(ctx, asynq.NewTask(string(taskName), task.Payload, opts...))
	if err != nil {
		return nil, err
	}
	return info, nil
}