cic-custodial/internal/tasker/server/server.go
Mohammed Sohail 1bc8d65016
refactor: remove action provider
* allows better control over accounting  locking
2022-10-26 09:11:15 +00:00

111 lines
2.9 KiB
Go

package server
import (
"crypto/ecdsa"
"time"
"github.com/bsm/redislock"
eth_crypto "github.com/ethereum/go-ethereum/crypto"
"github.com/go-redis/redis/v8"
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/noncestore"
tasker_client "github.com/grassrootseconomics/cic-custodial/internal/tasker/client"
"github.com/grassrootseconomics/cic-go-sdk/chain"
"github.com/hibiken/asynq"
"github.com/zerodha/logf"
)
const (
LockTTL = 1 * time.Second
)
type Opts struct {
SystemPublicKey string
SystemPrivateKey string
ChainProvider *chain.Provider
Keystore keystore.Keystore
Noncestore noncestore.Noncestore
TaskerClient *tasker_client.TaskerClient
RedisDSN string
RedisLockDB int
RedisLockMinIdleConns int
RedisLockPoolSize int
Concurrency int
Logger logf.Logger
}
type TaskerProcessor struct {
SystemPublicKey string
SystemPrivateKey *ecdsa.PrivateKey
ChainProvider *chain.Provider
Noncestore noncestore.Noncestore
Keystore keystore.Keystore
LockProvider *redislock.Client
TaskerClient *tasker_client.TaskerClient
}
type TaskerServer struct {
Server *asynq.Server
Mux *asynq.ServeMux
}
func NewTaskerServer(o Opts) (*TaskerServer, error) {
loadedPrivateKey, err := eth_crypto.HexToECDSA(o.SystemPrivateKey)
if err != nil {
return nil, err
}
redisLockClient := redis.NewClient(&redis.Options{
Addr: o.RedisDSN,
DB: o.RedisLockDB,
MinIdleConns: o.RedisLockMinIdleConns,
PoolSize: o.RedisLockPoolSize,
})
taskerProcessor := &TaskerProcessor{
SystemPublicKey: o.SystemPublicKey,
SystemPrivateKey: loadedPrivateKey,
ChainProvider: o.ChainProvider,
Noncestore: o.Noncestore,
Keystore: o.Keystore,
TaskerClient: o.TaskerClient,
LockProvider: redislock.New(redisLockClient),
}
asynqServer := asynq.NewServer(
asynq.RedisClientOpt{
Addr: o.RedisDSN,
},
asynq.Config{
Concurrency: o.Concurrency,
Logger: asynqCompatibleLogger(o.Logger),
RetryDelayFunc: func(n int, e error, t *asynq.Task) time.Duration {
if n < 6 {
return 1 * time.Second
} else {
return asynq.DefaultRetryDelayFunc(n, e, t)
}
},
IsFailure: func(err error) bool {
switch err {
case redislock.ErrNotObtained:
return false
default:
return true
}
},
},
)
mux := asynq.NewServeMux()
mux.HandleFunc(string(tasker_client.ActivateAccountTask), taskerProcessor.activateAccountProcessor)
mux.HandleFunc(string(tasker_client.SetNewAccountNonceTask), taskerProcessor.setNewAccountNonce)
mux.HandleFunc(string(tasker_client.GiftGasTask), taskerProcessor.giftGasProcessor)
mux.HandleFunc(string(tasker_client.TxDispatchTask), taskerProcessor.txDispatcher)
return &TaskerServer{
Server: asynqServer,
Mux: mux,
}, nil
}