refactor: moving locking to tasker

This commit is contained in:
Mohamed Sohail 2022-10-25 16:04:43 +00:00
parent 1392c03ab0
commit b8029be6ee
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
6 changed files with 50 additions and 64 deletions

View File

@ -66,7 +66,7 @@ func initChainProvider() *chain.Provider {
func initTaskerClient() *tasker_client.TaskerClient { func initTaskerClient() *tasker_client.TaskerClient {
return tasker_client.NewTaskerClient(tasker_client.Opts{ return tasker_client.NewTaskerClient(tasker_client.Opts{
RedisDSN: ko.MustString("tasker.dsn"), RedisDSN: ko.MustString("redis.dsn"),
}) })
} }
@ -93,10 +93,10 @@ func initNoncestore() noncestore.Noncestore {
switch provider := ko.MustString("noncestore.provider"); provider { switch provider := ko.MustString("noncestore.provider"); provider {
case "redis": case "redis":
redisNoncestore, err := redis_noncestore.NewRedisNoncestore(redis_noncestore.Opts{ redisNoncestore, err := redis_noncestore.NewRedisNoncestore(redis_noncestore.Opts{
RedisDSN: ko.MustString("noncestore.dsn"), RedisDSN: ko.MustString("redis.dsn"),
RedisDB: 2, RedisDB: 2,
MinIdleConns: 8, MinIdleConns: 5,
PoolSize: 15, PoolSize: 10,
ChainProvider: chainProvider, ChainProvider: chainProvider,
}) })
if err != nil { if err != nil {

View File

@ -62,9 +62,12 @@ func main() {
taskerServer = tasker_server.NewTaskerServer(tasker_server.Opts{ taskerServer = tasker_server.NewTaskerServer(tasker_server.Opts{
ActionsProvider: actionsProvider, ActionsProvider: actionsProvider,
TaskerClient: taskerClient, TaskerClient: taskerClient,
RedisDSN: ko.MustString("tasker.dsn"), RedisDSN: ko.MustString("redis.dsn"),
Concurrency: 15, Concurrency: 15,
Logger: lo, Logger: lo,
RedisLockDB: 1,
RedisLockMinIdleConns: 3,
RedisLockPoolSize: 6,
}) })
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -6,10 +6,10 @@ address = ":5000"
endpoint = "https://rpc.sarafu.network" endpoint = "https://rpc.sarafu.network"
[admin] [admin]
public = "0x28Daa9bA426383fA0296A669Af13f3EeF57A2877" public = "0x80097c773B3E83472FC7952c5206a7DB35d42bEF"
key = "c7f7e36c62a1ecfc4fba508421919c6449e7dced0ea846ce00fe3e93a9aaa608" key = "a6af6c597c614e3c8ee4b7638ab7c3f737aece3773a5413ca8caf4338e6b06d1"
[tasker] [redis]
dsn = "127.0.0.1:6379" dsn = "127.0.0.1:6379"
[keystore] [keystore]
@ -18,4 +18,3 @@ dsn = "postgres://postgres:postgres@localhost:5432/cic_custodial"
[noncestore] [noncestore]
provider = "redis" provider = "redis"
dsn = "127.0.0.1:6379"

View File

@ -2,9 +2,7 @@ package redis
import ( import (
"context" "context"
"time"
"github.com/bsm/redislock"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/grassrootseconomics/cic-custodial/internal/noncestore" "github.com/grassrootseconomics/cic-custodial/internal/noncestore"
"github.com/grassrootseconomics/cic-go-sdk/chain" "github.com/grassrootseconomics/cic-go-sdk/chain"
@ -12,11 +10,6 @@ import (
"github.com/lmittmann/w3/module/eth" "github.com/lmittmann/w3/module/eth"
) )
const (
mutexLockTTL = 200 * time.Millisecond
mutexKeyPrefix = "lock_"
)
// Opts represents the Redis nonce store specific params // Opts represents the Redis nonce store specific params
type Opts struct { type Opts struct {
RedisDSN string RedisDSN string
@ -28,7 +21,6 @@ type Opts struct {
// RedisNoncestore implements `noncestore.Noncestore` // RedisNoncestore implements `noncestore.Noncestore`
type RedisNoncestore struct { type RedisNoncestore struct {
redisLockProvider *redislock.Client
chainProvider *chain.Provider chainProvider *chain.Provider
redis *redis.Client redis *redis.Client
} }
@ -46,19 +38,12 @@ func NewRedisNoncestore(o Opts) (noncestore.Noncestore, error) {
} }
return &RedisNoncestore{ return &RedisNoncestore{
redisLockProvider: redislock.New(redisClient),
redis: redisClient, redis: redisClient,
chainProvider: o.ChainProvider, chainProvider: o.ChainProvider,
}, nil }, nil
} }
func (ns *RedisNoncestore) Peek(ctx context.Context, publicKey string) (uint64, error) { func (ns *RedisNoncestore) Peek(ctx context.Context, publicKey string) (uint64, error) {
lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil)
if err != nil {
return 0, err
}
defer lock.Release(ctx)
nonce, err := ns.redis.Get(ctx, publicKey).Uint64() nonce, err := ns.redis.Get(ctx, publicKey).Uint64()
if err != nil { if err != nil {
return 0, err return 0, err
@ -72,13 +57,7 @@ func (ns *RedisNoncestore) Acquire(ctx context.Context, publicKey string) (uint6
nonce uint64 nonce uint64
) )
lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil) nonce, err := ns.redis.Get(ctx, publicKey).Uint64()
if err != nil {
return 0, err
}
defer lock.Release(ctx)
nonce, err = ns.redis.Get(ctx, publicKey).Uint64()
if err == redis.Nil { if err == redis.Nil {
networkNonce, err := ns.SyncNetworkNonce(ctx, publicKey) networkNonce, err := ns.SyncNetworkNonce(ctx, publicKey)
if err != nil { if err != nil {
@ -99,12 +78,6 @@ func (ns *RedisNoncestore) Acquire(ctx context.Context, publicKey string) (uint6
} }
func (ns *RedisNoncestore) Return(ctx context.Context, publicKey string) (uint64, error) { func (ns *RedisNoncestore) Return(ctx context.Context, publicKey string) (uint64, error) {
lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil)
if err != nil {
return 0, err
}
defer lock.Release(ctx)
nonce, err := ns.redis.Get(ctx, publicKey).Uint64() nonce, err := ns.redis.Get(ctx, publicKey).Uint64()
if err != nil { if err != nil {
return 0, err return 0, err
@ -125,13 +98,7 @@ func (ns *RedisNoncestore) SyncNetworkNonce(ctx context.Context, publicKey strin
networkNonce uint64 networkNonce uint64
) )
lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil) err := ns.chainProvider.EthClient.CallCtx(
if err != nil {
return 0, err
}
defer lock.Release(ctx)
err = ns.chainProvider.EthClient.CallCtx(
ctx, ctx,
eth.Nonce(w3.A(publicKey), nil).Returns(&networkNonce), eth.Nonce(w3.A(publicKey), nil).Returns(&networkNonce),
) )
@ -148,13 +115,7 @@ func (ns *RedisNoncestore) SyncNetworkNonce(ctx context.Context, publicKey strin
} }
func (ns *RedisNoncestore) SetNewAccountNonce(ctx context.Context, publicKey string) error { func (ns *RedisNoncestore) SetNewAccountNonce(ctx context.Context, publicKey string) error {
lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil) err := ns.redis.Set(ctx, publicKey, 0, 0).Err()
if err != nil {
return err
}
defer lock.Release(ctx)
err = ns.redis.Set(ctx, publicKey, 0, 0).Err()
if err != nil { if err != nil {
return err return err
} }

View File

@ -39,6 +39,12 @@ func (tp *TaskerProcessor) giftGasProcessor(ctx context.Context, t *asynq.Task)
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
} }
lock, err := tp.LockProvider.Obtain(ctx, tp.ActionsProvider.SystemPublicKey, LockTTL, nil)
if err != nil {
return err
}
defer lock.Release(ctx)
signedTx, err := tp.ActionsProvider.SignGiftGasTx(ctx, p.PublicKey) signedTx, err := tp.ActionsProvider.SignGiftGasTx(ctx, p.PublicKey)
if err != nil { if err != nil {
return err return err

View File

@ -4,21 +4,30 @@ import (
"time" "time"
"github.com/bsm/redislock" "github.com/bsm/redislock"
"github.com/go-redis/redis/v8"
"github.com/grassrootseconomics/cic-custodial/internal/actions" "github.com/grassrootseconomics/cic-custodial/internal/actions"
tasker_client "github.com/grassrootseconomics/cic-custodial/internal/tasker/client" tasker_client "github.com/grassrootseconomics/cic-custodial/internal/tasker/client"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
const (
LockTTL = 1 * time.Second
)
type Opts struct { type Opts struct {
ActionsProvider *actions.ActionsProvider ActionsProvider *actions.ActionsProvider
TaskerClient *tasker_client.TaskerClient TaskerClient *tasker_client.TaskerClient
RedisDSN string RedisDSN string
RedisLockDB int
RedisLockMinIdleConns int
RedisLockPoolSize int
Concurrency int Concurrency int
Logger logf.Logger Logger logf.Logger
} }
type TaskerProcessor struct { type TaskerProcessor struct {
LockProvider *redislock.Client
ActionsProvider *actions.ActionsProvider ActionsProvider *actions.ActionsProvider
TaskerClient *tasker_client.TaskerClient TaskerClient *tasker_client.TaskerClient
} }
@ -29,9 +38,17 @@ type TaskerServer struct {
} }
func NewTaskerServer(o Opts) *TaskerServer { func NewTaskerServer(o Opts) *TaskerServer {
redisLockClient := redis.NewClient(&redis.Options{
Addr: o.RedisDSN,
DB: o.RedisLockDB,
MinIdleConns: o.RedisLockMinIdleConns,
PoolSize: o.RedisLockPoolSize,
})
taskerProcessor := &TaskerProcessor{ taskerProcessor := &TaskerProcessor{
ActionsProvider: o.ActionsProvider, ActionsProvider: o.ActionsProvider,
TaskerClient: o.TaskerClient, TaskerClient: o.TaskerClient,
LockProvider: redislock.New(redisLockClient),
} }
asynqServer := asynq.NewServer( asynqServer := asynq.NewServer(