cic-custodial/internal/tasker/client.go

53 lines
1004 B
Go
Raw Permalink Normal View History

2022-11-30 10:51:24 +01:00
package tasker
import (
"context"
2022-11-30 10:51:24 +01:00
"time"
"github.com/google/uuid"
"github.com/grassrootseconomics/cic-custodial/pkg/redis"
"github.com/hibiken/asynq"
)
// taskTimeout is the per-task timeout expressed in seconds. It is kept as an
// untyped constant and scaled by time.Second at the point where it is handed
// to asynq.Timeout.
const taskTimeout = 60
2022-11-30 10:51:24 +01:00
// TaskerClientOpts carries the settings accepted by NewTaskerClient.
type TaskerClientOpts struct {
	// RedisPool is handed to asynq.NewClient as the Redis connection source.
	RedisPool *redis.RedisPool

	// TaskRetention is the desired asynq retention period for tasks created
	// by the client.
	TaskRetention time.Duration
}
// TaskerClient wraps an asynq client and enqueues tasks through CreateTask.
type TaskerClient struct {
	// Client is the underlying asynq client used to enqueue tasks.
	Client *asynq.Client

	// taskRetention is passed to asynq.Retention for every task that
	// CreateTask builds.
	taskRetention time.Duration
}
// NewTaskerClient builds a TaskerClient from o.
//
// The asynq client is created from o.RedisPool, and o.TaskRetention is stored
// on the returned client so that CreateTask applies it to every task it
// enqueues. Previously the retention option was dropped and tasks were always
// enqueued with a zero retention.
func NewTaskerClient(o TaskerClientOpts) *TaskerClient {
	return &TaskerClient{
		Client:        asynq.NewClient(o.RedisPool),
		taskRetention: o.TaskRetention,
	}
}
func (c *TaskerClient) CreateTask(ctx context.Context, taskName TaskName, queueName QueueName, task *Task) (*asynq.TaskInfo, error) {
2022-11-30 10:51:24 +01:00
if task.Id == "" {
task.Id = uuid.NewString()
}
qTask := asynq.NewTask(
string(taskName),
task.Payload,
asynq.Queue(string(queueName)),
asynq.TaskID(task.Id),
asynq.Retention(c.taskRetention),
asynq.Timeout(taskTimeout*time.Second),
2022-11-30 10:51:24 +01:00
)
taskInfo, err := c.Client.EnqueueContext(ctx, qTask)
2022-11-30 10:51:24 +01:00
if err != nil {
return nil, err
}
return taskInfo, nil
}