diff --git a/cmd/service/init.go b/cmd/service/init.go index c48a422..68f0f9e 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -66,7 +66,7 @@ func initChainProvider() *chain.Provider { func initTaskerClient() *tasker_client.TaskerClient { return tasker_client.NewTaskerClient(tasker_client.Opts{ - RedisDSN: ko.MustString("tasker.dsn"), + RedisDSN: ko.MustString("redis.dsn"), }) } @@ -93,10 +93,10 @@ func initNoncestore() noncestore.Noncestore { switch provider := ko.MustString("noncestore.provider"); provider { case "redis": redisNoncestore, err := redis_noncestore.NewRedisNoncestore(redis_noncestore.Opts{ - RedisDSN: ko.MustString("noncestore.dsn"), + RedisDSN: ko.MustString("redis.dsn"), RedisDB: 2, - MinIdleConns: 8, - PoolSize: 15, + MinIdleConns: 5, + PoolSize: 10, ChainProvider: chainProvider, }) if err != nil { diff --git a/cmd/service/main.go b/cmd/service/main.go index ee19d04..94ef031 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -60,11 +60,14 @@ func main() { }) taskerServer = tasker_server.NewTaskerServer(tasker_server.Opts{ - ActionsProvider: actionsProvider, - TaskerClient: taskerClient, - RedisDSN: ko.MustString("tasker.dsn"), - Concurrency: 15, - Logger: lo, + ActionsProvider: actionsProvider, + TaskerClient: taskerClient, + RedisDSN: ko.MustString("redis.dsn"), + Concurrency: 15, + Logger: lo, + RedisLockDB: 1, + RedisLockMinIdleConns: 3, + RedisLockPoolSize: 6, }) var wg sync.WaitGroup diff --git a/config.toml b/config.toml index 00884b3..8ea4167 100644 --- a/config.toml +++ b/config.toml @@ -6,10 +6,10 @@ address = ":5000" endpoint = "https://rpc.sarafu.network" [admin] -public = "0x28Daa9bA426383fA0296A669Af13f3EeF57A2877" -key = "c7f7e36c62a1ecfc4fba508421919c6449e7dced0ea846ce00fe3e93a9aaa608" +public = "0x80097c773B3E83472FC7952c5206a7DB35d42bEF" +key = "a6af6c597c614e3c8ee4b7638ab7c3f737aece3773a5413ca8caf4338e6b06d1" -[tasker] +[redis] dsn = "127.0.0.1:6379" [keystore] @@ -18,4 +18,3 @@ dsn = "postgres://postgres:postgres@localhost:5432/cic_custodial" [noncestore] provider = "redis" -dsn = "127.0.0.1:6379" diff --git a/internal/noncestore/providers/redis/redis.go b/internal/noncestore/providers/redis/redis.go index 3c15e28..af36c83 100644 --- a/internal/noncestore/providers/redis/redis.go +++ b/internal/noncestore/providers/redis/redis.go @@ -2,9 +2,7 @@ package redis import ( "context" - "time" - "github.com/bsm/redislock" "github.com/go-redis/redis/v8" "github.com/grassrootseconomics/cic-custodial/internal/noncestore" "github.com/grassrootseconomics/cic-go-sdk/chain" @@ -12,11 +10,6 @@ import ( "github.com/lmittmann/w3/module/eth" ) -const ( - mutexLockTTL = 200 * time.Millisecond - mutexKeyPrefix = "lock_" -) - // Opts represents the Redis nonce store specific params type Opts struct { RedisDSN string @@ -28,9 +21,8 @@ type Opts struct { // RedisNoncestore implements `noncestore.Noncestore` type RedisNoncestore struct { - redisLockProvider *redislock.Client - chainProvider *chain.Provider - redis *redis.Client + chainProvider *chain.Provider + redis *redis.Client } func NewRedisNoncestore(o Opts) (noncestore.Noncestore, error) { @@ -46,19 +38,12 @@ func NewRedisNoncestore(o Opts) (noncestore.Noncestore, error) { } return &RedisNoncestore{ - redisLockProvider: redislock.New(redisClient), - redis: redisClient, - chainProvider: o.ChainProvider, + redis: redisClient, + chainProvider: o.ChainProvider, }, nil } func (ns *RedisNoncestore) Peek(ctx context.Context, publicKey string) (uint64, error) { - lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil) - if err != nil { - return 0, err - } - defer lock.Release(ctx) - nonce, err := ns.redis.Get(ctx, publicKey).Uint64() if err != nil { return 0, err @@ -72,13 +57,7 @@ func (ns *RedisNoncestore) Acquire(ctx context.Context, publicKey string) (uint6 nonce uint64 ) - lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil) - if err != nil { - return 0, err - } - defer lock.Release(ctx) - - nonce, err = ns.redis.Get(ctx, publicKey).Uint64() + nonce, err := ns.redis.Get(ctx, publicKey).Uint64() if err == redis.Nil { networkNonce, err := ns.SyncNetworkNonce(ctx, publicKey) if err != nil { @@ -99,12 +78,6 @@ func (ns *RedisNoncestore) Acquire(ctx context.Context, publicKey string) (uint6 } func (ns *RedisNoncestore) Return(ctx context.Context, publicKey string) (uint64, error) { - lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil) - if err != nil { - return 0, err - } - defer lock.Release(ctx) - nonce, err := ns.redis.Get(ctx, publicKey).Uint64() if err != nil { return 0, err @@ -125,13 +98,7 @@ func (ns *RedisNoncestore) SyncNetworkNonce(ctx context.Context, publicKey strin networkNonce uint64 ) - lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil) - if err != nil { - return 0, err - } - defer lock.Release(ctx) - - err = ns.chainProvider.EthClient.CallCtx( + err := ns.chainProvider.EthClient.CallCtx( ctx, eth.Nonce(w3.A(publicKey), nil).Returns(&networkNonce), ) @@ -148,13 +115,7 @@ func (ns *RedisNoncestore) SyncNetworkNonce(ctx context.Context, publicKey strin } func (ns *RedisNoncestore) SetNewAccountNonce(ctx context.Context, publicKey string) error { - lock, err := ns.redisLockProvider.Obtain(ctx, mutexKeyPrefix+publicKey, mutexLockTTL, nil) - if err != nil { - return err - } - defer lock.Release(ctx) - - err = ns.redis.Set(ctx, publicKey, 0, 0).Err() + err := ns.redis.Set(ctx, publicKey, 0, 0).Err() if err != nil { return err } diff --git a/internal/tasker/server/registration.go b/internal/tasker/server/registration.go index a3d2f9f..e6d48f7 100644 --- a/internal/tasker/server/registration.go +++ b/internal/tasker/server/registration.go @@ -39,6 +39,12 @@ func (tp *TaskerProcessor) giftGasProcessor(ctx context.Context, t *asynq.Task) return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } + lock, err := tp.LockProvider.Obtain(ctx, tp.ActionsProvider.SystemPublicKey, LockTTL, nil) + if err != nil { + return err + } + defer lock.Release(ctx) + signedTx, err := tp.ActionsProvider.SignGiftGasTx(ctx, p.PublicKey) if err != nil { return err diff --git a/internal/tasker/server/server.go b/internal/tasker/server/server.go index 6a3dbee..9069954 100644 --- a/internal/tasker/server/server.go +++ b/internal/tasker/server/server.go @@ -4,21 +4,30 @@ import ( "time" "github.com/bsm/redislock" + "github.com/go-redis/redis/v8" "github.com/grassrootseconomics/cic-custodial/internal/actions" tasker_client "github.com/grassrootseconomics/cic-custodial/internal/tasker/client" "github.com/hibiken/asynq" "github.com/zerodha/logf" ) +const ( + LockTTL = 1 * time.Second +) + type Opts struct { - ActionsProvider *actions.ActionsProvider - TaskerClient *tasker_client.TaskerClient - RedisDSN string - Concurrency int - Logger logf.Logger + ActionsProvider *actions.ActionsProvider + TaskerClient *tasker_client.TaskerClient + RedisDSN string + RedisLockDB int + RedisLockMinIdleConns int + RedisLockPoolSize int + Concurrency int + Logger logf.Logger } type TaskerProcessor struct { + LockProvider *redislock.Client ActionsProvider *actions.ActionsProvider TaskerClient *tasker_client.TaskerClient } @@ -29,9 +38,17 @@ type TaskerServer struct { } func NewTaskerServer(o Opts) *TaskerServer { + redisLockClient := redis.NewClient(&redis.Options{ + Addr: o.RedisDSN, + DB: o.RedisLockDB, + MinIdleConns: o.RedisLockMinIdleConns, + PoolSize: o.RedisLockPoolSize, + }) + taskerProcessor := &TaskerProcessor{ ActionsProvider: o.ActionsProvider, TaskerClient: o.TaskerClient, + LockProvider: redislock.New(redisLockClient), } asynqServer := asynq.NewServer(