cic-custodial/internal/tasker/client.go

57 lines
1.1 KiB
Go
Raw Normal View History

2022-11-30 10:51:24 +01:00
package tasker
import (
"context"
2022-11-30 10:51:24 +01:00
"time"
"github.com/google/uuid"
"github.com/grassrootseconomics/cic-custodial/pkg/redis"
"github.com/hibiken/asynq"
)
// Default per-task settings that CreateTask hands to asynq on every enqueue.
const (
	// taskTimeout is the value given to asynq.Timeout.
	taskTimeout = time.Minute
	// taskRetention is the value given to asynq.Retention.
	taskRetention = 2 * 24 * time.Hour
)
2022-11-30 10:51:24 +01:00
type TaskerClientOpts struct {
RedisPool *redis.RedisPool
2022-11-30 10:51:24 +01:00
}
type TaskerClient struct {
Client *asynq.Client
2022-11-30 10:51:24 +01:00
}
// NewTaskerClient builds a TaskerClient whose asynq client is created from the
// Redis pool supplied in o.
func NewTaskerClient(o TaskerClientOpts) *TaskerClient {
	asynqClient := asynq.NewClient(o.RedisPool)

	return &TaskerClient{Client: asynqClient}
}
func (c *TaskerClient) CreateTask(ctx context.Context, taskName TaskName, queueName QueueName, task *Task, extraOpts ...asynq.Option) (*asynq.TaskInfo, error) {
2022-11-30 10:51:24 +01:00
if task.Id == "" {
task.Id = uuid.NewString()
}
defaultOpts := []asynq.Option{
2022-11-30 10:51:24 +01:00
asynq.Queue(string(queueName)),
asynq.TaskID(task.Id),
asynq.Retention(taskRetention),
asynq.Timeout(taskTimeout),
}
2023-03-08 07:59:10 +01:00
defaultOpts = append(defaultOpts, extraOpts...)
qTask := asynq.NewTask(
string(taskName),
task.Payload,
2023-03-08 07:59:10 +01:00
defaultOpts...,
2022-11-30 10:51:24 +01:00
)
taskInfo, err := c.Client.EnqueueContext(ctx, qTask)
2022-11-30 10:51:24 +01:00
if err != nil {
return nil, err
}
return taskInfo, nil
}