From ad58d1da47c7f1fae2e400fe713b0ea07f32b416 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Tue, 14 Mar 2023 16:02:25 +0000 Subject: [PATCH] feat: add lock retry strategy * previouly we relied on the task being re-queued which generally reduces the throughput of tasks --- internal/tasker/task/account_gift_gas.go | 5 ++++- internal/tasker/task/account_gift_voucher.go | 5 ++++- internal/tasker/task/account_refill_gas.go | 5 ++++- .../tasker/task/account_register_onchain.go | 5 ++++- internal/tasker/task/sign_transfer.go | 5 ++++- internal/tasker/task/utils.go | 20 +++++++++++++++++-- 6 files changed, 38 insertions(+), 7 deletions(-) diff --git a/internal/tasker/task/account_gift_gas.go b/internal/tasker/task/account_gift_gas.go index 21ac631..1ca2fee 100644 --- a/internal/tasker/task/account_gift_gas.go +++ b/internal/tasker/task/account_gift_gas.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" @@ -36,7 +37,9 @@ func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asy ctx, lockPrefix+cu.SystemContainer.PublicKey, cu.SystemContainer.LockTimeout, - nil, + &redislock.Options{ + RetryStrategy: lockRetry(), + }, ) if err != nil { return err diff --git a/internal/tasker/task/account_gift_voucher.go b/internal/tasker/task/account_gift_voucher.go index 42c6386..2ffca28 100644 --- a/internal/tasker/task/account_gift_voucher.go +++ b/internal/tasker/task/account_gift_voucher.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" + "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" @@ -30,7 +31,9 @@ func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq. ctx, lockPrefix+cu.SystemContainer.PublicKey, cu.SystemContainer.LockTimeout, - nil, + &redislock.Options{ + RetryStrategy: lockRetry(), + }, ) if err != nil { return err diff --git a/internal/tasker/task/account_refill_gas.go b/internal/tasker/task/account_refill_gas.go index 5a2e43f..76ac118 100644 --- a/internal/tasker/task/account_refill_gas.go +++ b/internal/tasker/task/account_refill_gas.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" @@ -55,7 +56,9 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a ctx, lockPrefix+cu.SystemContainer.PublicKey, cu.SystemContainer.LockTimeout, - nil, + &redislock.Options{ + RetryStrategy: lockRetry(), + }, ) if err != nil { return err diff --git a/internal/tasker/task/account_register_onchain.go b/internal/tasker/task/account_register_onchain.go index 85dc315..c74d2d3 100644 --- a/internal/tasker/task/account_register_onchain.go +++ b/internal/tasker/task/account_register_onchain.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" + "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" @@ -31,7 +32,9 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte ctx, lockPrefix+cu.SystemContainer.PublicKey, cu.SystemContainer.LockTimeout, - nil, + &redislock.Options{ + RetryStrategy: lockRetry(), + }, ) if err != nil { return err diff --git a/internal/tasker/task/sign_transfer.go b/internal/tasker/task/sign_transfer.go index a906a89..f44c2f4 100644 --- a/internal/tasker/task/sign_transfer.go +++ b/internal/tasker/task/sign_transfer.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" + "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" @@ -49,7 +50,9 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er ctx, lockPrefix+payload.From, cu.SystemContainer.LockTimeout, - nil, + &redislock.Options{ + RetryStrategy: lockRetry(), + }, ) if err != nil { return err diff --git a/internal/tasker/task/utils.go b/internal/tasker/task/utils.go index 266cc28..e75aa17 100644 --- a/internal/tasker/task/utils.go +++ b/internal/tasker/task/utils.go @@ -1,5 +1,21 @@ package task -const ( - lockPrefix = "lock:" +import ( + "time" + + "github.com/bsm/redislock" ) + +const ( + lockPrefix = "lock:" + lockRetryDelay = 25 * time.Millisecond +) + +// lockRetry will at most try to obtain the lock 20 times within ~0.5s. +// it is expected to prevent immidiate requeue of the task at the expense of more redis calls. +func lockRetry() redislock.RetryStrategy { + return redislock.LimitRetry( + redislock.LinearBackoff(lockRetryDelay), + 20, + ) +}