From 1bc8d650168249f7edf9b1e6a021dd92158bb227 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Wed, 26 Oct 2022 09:11:15 +0000 Subject: [PATCH] refactor: remove action provider * allows better control over accounting locking --- cmd/service/init.go | 18 ++++- cmd/service/main.go | 36 +++++----- internal/actions/actions.go | 71 -------------------- internal/actions/dispatch.go | 22 ------ internal/actions/storage.go | 36 ---------- internal/actions/system.go | 53 --------------- internal/api/registration.go | 13 ++-- internal/api/server.go | 8 +-- internal/noncestore/providers/redis/redis.go | 5 ++ internal/tasker/server/dispatch.go | 11 ++- internal/tasker/server/registration.go | 25 +++++-- internal/tasker/server/server.go | 41 ++++++++--- 12 files changed, 109 insertions(+), 230 deletions(-) delete mode 100644 internal/actions/actions.go delete mode 100644 internal/actions/dispatch.go delete mode 100644 internal/actions/storage.go delete mode 100644 internal/actions/system.go diff --git a/cmd/service/init.go b/cmd/service/init.go index 68f0f9e..21a0b4d 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "strings" @@ -90,6 +91,8 @@ func initKeystore() keystore.Keystore { } func initNoncestore() noncestore.Noncestore { + var loadedNoncestore noncestore.Noncestore + switch provider := ko.MustString("noncestore.provider"); provider { case "redis": redisNoncestore, err := redis_noncestore.NewRedisNoncestore(redis_noncestore.Opts{ @@ -103,11 +106,22 @@ func initNoncestore() noncestore.Noncestore { lo.Fatal("initNoncestore", "error", err) } - return redisNoncestore + loadedNoncestore = redisNoncestore case "postgres": lo.Fatal("initNoncestore", "error", "not implemented") default: lo.Fatal("initNoncestore", "error", "no noncestore provider selected") } - return nil + + currentSystemNonce, err := loadedNoncestore.Peek(context.Background(), ko.MustString("admin.public")) + lo.Debug("initNoncestore: loaded (noncestore) system nonce", "nonce", currentSystemNonce) + if err != nil { + nonce, err := loadedNoncestore.SyncNetworkNonce(context.Background(), ko.MustString("admin.public")) + lo.Debug("initNoncestore: syncing system nonce", "nonce", nonce) + if err != nil { + lo.Fatal("initNonceStore", "error", "system account nonce sync failed") + } + } + + return loadedNoncestore } diff --git a/cmd/service/main.go b/cmd/service/main.go index 32786fd..a2b351e 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -8,7 +8,6 @@ import ( "sync" "syscall" - "github.com/grassrootseconomics/cic-custodial/internal/actions" "github.com/grassrootseconomics/cic-custodial/internal/api" tasker_client "github.com/grassrootseconomics/cic-custodial/internal/tasker/client" tasker_server "github.com/grassrootseconomics/cic-custodial/internal/tasker/server" @@ -40,35 +39,32 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - taskerClient = initTaskerClient() - - actionsProvider, err := actions.NewActionsProvider(actions.Opts{ - SystemPublicKey: ko.MustString("admin.public"), - SystemPrivateKey: ko.MustString("admin.key"), - ChainProvider: chainProvider, - Keystore: initKeystore(), - Noncestore: initNoncestore(), - Logger: lo, - }) - if err != nil { - lo.Fatal("initActionsProvider", "err", err) - } + taskerClient := initTaskerClient() + keystore := initKeystore() + noncestore := initNoncestore() httpServer = api.BootstrapHTTPServer(api.Opts{ - ActionsProvider: actionsProvider, - TaskerClient: taskerClient, + Keystore: keystore, + TaskerClient: taskerClient, }) - taskerServer = tasker_server.NewTaskerServer(tasker_server.Opts{ - ActionsProvider: actionsProvider, + taskerServer, err := tasker_server.NewTaskerServer(tasker_server.Opts{ + SystemPublicKey: ko.MustString("admin.public"), + SystemPrivateKey: ko.MustString("admin.key"), + ChainProvider: chainProvider, + Keystore: keystore, + Noncestore: noncestore, TaskerClient: taskerClient, RedisDSN: ko.MustString("redis.dsn"), - Concurrency: 20, - Logger: lo, RedisLockDB: 1, RedisLockMinIdleConns: 3, RedisLockPoolSize: 6, + Concurrency: 20, + Logger: lo, }) + if err != nil { + lo.Fatal("initTaskerServer", "err", err) + } var wg sync.WaitGroup diff --git a/internal/actions/actions.go b/internal/actions/actions.go deleted file mode 100644 index 4700e75..0000000 --- a/internal/actions/actions.go +++ /dev/null @@ -1,71 +0,0 @@ -package actions - -import ( - "context" - "crypto/ecdsa" - - "github.com/ethereum/go-ethereum/core/types" - eth_crypto "github.com/ethereum/go-ethereum/crypto" - "github.com/grassrootseconomics/cic-custodial/internal/ethereum" - "github.com/grassrootseconomics/cic-custodial/internal/keystore" - "github.com/grassrootseconomics/cic-custodial/internal/noncestore" - "github.com/grassrootseconomics/cic-go-sdk/chain" - "github.com/zerodha/logf" -) - -type Actions interface { - CreateNewKeyPair(context.Context) (ethereum.Key, error) - ActivateCustodialAccount(context.Context, string) error - SetNewAccountNonce(context.Context, string) error - - SignGiftGasTx(context.Context, string) (*types.Transaction, error) - SignTopUpGasTx(context.Context, string) (*types.Transaction, error) - SignGiftVouchertx(context.Context, string) (*types.Transaction, error) - - DispatchSignedTx(context.Context, *types.Transaction) (string, error) -} - -type Opts struct { - SystemPublicKey string - SystemPrivateKey string - ChainProvider *chain.Provider - Keystore keystore.Keystore - Noncestore noncestore.Noncestore - Logger logf.Logger -} - -type ActionsProvider struct { - SystemPublicKey string - SystemPrivateKey *ecdsa.PrivateKey - ChainProvider *chain.Provider - Keystore keystore.Keystore - Noncestore noncestore.Noncestore - Lo logf.Logger -} - -func NewActionsProvider(o Opts) (*ActionsProvider, error) { - var _ Actions = (*ActionsProvider)(nil) - - loadedPrivateKey, err := eth_crypto.HexToECDSA(o.SystemPrivateKey) - if err != nil { - return nil, err - } - - _, err = o.Noncestore.Peek(context.Background(), o.SystemPublicKey) - if err != nil { - nonce, err := o.Noncestore.SyncNetworkNonce(context.Background(), o.SystemPublicKey) - o.Logger.Debug("actionsProvider: syncing system nonce", "nonce", nonce) - if err != nil { - return nil, err - } - } - - return &ActionsProvider{ - SystemPublicKey: o.SystemPublicKey, - SystemPrivateKey: loadedPrivateKey, - ChainProvider: o.ChainProvider, - Keystore: o.Keystore, - Noncestore: o.Noncestore, - Lo: o.Logger, - }, nil -} diff --git a/internal/actions/dispatch.go b/internal/actions/dispatch.go deleted file mode 100644 index 987df29..0000000 --- a/internal/actions/dispatch.go +++ /dev/null @@ -1,22 +0,0 @@ -package actions - -import ( - "context" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/lmittmann/w3/module/eth" -) - -func (ap *ActionsProvider) DispatchSignedTx(ctx context.Context, builtTx *types.Transaction) (string, error) { - var txHash common.Hash - - if err := ap.ChainProvider.EthClient.CallCtx( - ctx, - eth.SendTx(builtTx).Returns(&txHash), - ); err != nil { - return "", err - } - - return txHash.String(), nil -} diff --git a/internal/actions/storage.go b/internal/actions/storage.go deleted file mode 100644 index d0d2549..0000000 --- a/internal/actions/storage.go +++ /dev/null @@ -1,36 +0,0 @@ -package actions - -import ( - "context" - - "github.com/grassrootseconomics/cic-custodial/internal/ethereum" -) - -func (ap *ActionsProvider) CreateNewKeyPair(ctx context.Context) (ethereum.Key, error) { - generatedKeyPair, err := ethereum.GenerateKeyPair() - if err != nil { - return ethereum.Key{}, err - } - - if err := ap.Keystore.WriteKeyPair(ctx, generatedKeyPair); err != nil { - return ethereum.Key{}, err - } - - return generatedKeyPair, nil -} - -func (ap *ActionsProvider) ActivateCustodialAccount(ctx context.Context, publicKey string) error { - if err := ap.Keystore.ActivateAccount(ctx, publicKey); err != nil { - return err - } - - return nil -} - -func (ap *ActionsProvider) SetNewAccountNonce(ctx context.Context, publicKey string) error { - if err := ap.Noncestore.SetNewAccountNonce(ctx, publicKey); err != nil { - return err - } - - return nil -} diff --git a/internal/actions/system.go b/internal/actions/system.go deleted file mode 100644 index d106a9c..0000000 --- a/internal/actions/system.go +++ /dev/null @@ -1,53 +0,0 @@ -package actions - -import ( - "context" - "math/big" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/grassrootseconomics/cic-go-sdk/chain" - "github.com/lmittmann/w3" -) - -const ( - initialGiftGasValue = 1000000 - topupGiftGasValue = 500000 -) - -func (ap *ActionsProvider) SignGiftGasTx(ctx context.Context, giftTo string) (*types.Transaction, error) { - nonce, err := ap.Noncestore.Acquire(ctx, ap.SystemPublicKey) - if err != nil { - return nil, err - } - - builtTx, err := ap.ChainProvider.BuildGasTransferTx(ap.SystemPrivateKey, chain.TransactionData{ - To: w3.A(giftTo), - Nonce: nonce, - }, big.NewInt(initialGiftGasValue)) - if err != nil { - return &types.Transaction{}, err - } - - return builtTx, nil -} - -func (ap *ActionsProvider) SignTopUpGasTx(ctx context.Context, giftTo string) (*types.Transaction, error) { - nonce, err := ap.Noncestore.Acquire(ctx, ap.SystemPublicKey) - if err != nil { - return nil, err - } - - builtTx, err := ap.ChainProvider.BuildGasTransferTx(ap.SystemPrivateKey, chain.TransactionData{ - To: w3.A(giftTo), - Nonce: nonce, - }, big.NewInt(topupGiftGasValue)) - if err != nil { - return &types.Transaction{}, err - } - - return builtTx, nil -} - -func (ap *ActionsProvider) SignGiftVouchertx(ctx context.Context, giftTo string) (*types.Transaction, error) { - return &types.Transaction{}, nil -} diff --git a/internal/api/registration.go b/internal/api/registration.go index f4da35a..54353ee 100644 --- a/internal/api/registration.go +++ b/internal/api/registration.go @@ -3,27 +3,32 @@ package api import ( "net/http" - "github.com/grassrootseconomics/cic-custodial/internal/actions" + "github.com/grassrootseconomics/cic-custodial/internal/ethereum" + "github.com/grassrootseconomics/cic-custodial/internal/keystore" tasker_client "github.com/grassrootseconomics/cic-custodial/internal/tasker/client" "github.com/labstack/echo/v4" ) type registrationResponse struct { PublicKey string `json:"publicKey"` - JobRef string `json:"jobRef"` + JobRef string `json:"jobRef"` } func handleRegistration(c echo.Context) error { var ( - ap = c.Get("actions").(*actions.ActionsProvider) tc = c.Get("tasker_client").(*tasker_client.TaskerClient) + ks = c.Get("keystore").(keystore.Keystore) ) - generatedKeyPair, err := ap.CreateNewKeyPair(c.Request().Context()) + generatedKeyPair, err := ethereum.GenerateKeyPair() if err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "ERR_GEN_KEYPAIR") } + if err := ks.WriteKeyPair(c.Request().Context(), generatedKeyPair); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, "ERR_SAVE_KEYPAIR") + } + job, err := tc.CreateRegistrationTask(tasker_client.RegistrationPayload{ PublicKey: generatedKeyPair.Public, }, tasker_client.SetNewAccountNonceTask) diff --git a/internal/api/server.go b/internal/api/server.go index 220b2dc..f15fbd9 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -4,7 +4,7 @@ import ( "net/http" "github.com/arl/statsviz" - "github.com/grassrootseconomics/cic-custodial/internal/actions" + "github.com/grassrootseconomics/cic-custodial/internal/keystore" tasker_client "github.com/grassrootseconomics/cic-custodial/internal/tasker/client" "github.com/labstack/echo/v4" ) @@ -14,8 +14,8 @@ type okResp struct { } type Opts struct { - ActionsProvider *actions.ActionsProvider - TaskerClient *tasker_client.TaskerClient + Keystore keystore.Keystore + TaskerClient *tasker_client.TaskerClient } func BootstrapHTTPServer(o Opts) *echo.Echo { @@ -31,7 +31,7 @@ func BootstrapHTTPServer(o Opts) *echo.Echo { server.Use(func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { - c.Set("actions", o.ActionsProvider) + c.Set("keystore", o.Keystore) c.Set("tasker_client", o.TaskerClient) return next(c) } diff --git a/internal/noncestore/providers/redis/redis.go b/internal/noncestore/providers/redis/redis.go index af36c83..83ddf4b 100644 --- a/internal/noncestore/providers/redis/redis.go +++ b/internal/noncestore/providers/redis/redis.go @@ -8,6 +8,7 @@ import ( "github.com/grassrootseconomics/cic-go-sdk/chain" "github.com/lmittmann/w3" "github.com/lmittmann/w3/module/eth" + "github.com/zerodha/logf" ) // Opts represents the Redis nonce store specific params @@ -17,12 +18,14 @@ type Opts struct { MinIdleConns int PoolSize int ChainProvider *chain.Provider + Lo logf.Logger } // RedisNoncestore implements `noncestore.Noncestore` type RedisNoncestore struct { chainProvider *chain.Provider redis *redis.Client + lo logf.Logger } func NewRedisNoncestore(o Opts) (noncestore.Noncestore, error) { @@ -40,6 +43,7 @@ func NewRedisNoncestore(o Opts) (noncestore.Noncestore, error) { return &RedisNoncestore{ redis: redisClient, chainProvider: o.ChainProvider, + lo: o.Lo, }, nil } @@ -117,6 +121,7 @@ func (ns *RedisNoncestore) SyncNetworkNonce(ctx context.Context, publicKey strin func (ns *RedisNoncestore) SetNewAccountNonce(ctx context.Context, publicKey string) error { err := ns.redis.Set(ctx, publicKey, 0, 0).Err() if err != nil { + ns.lo.Error("noncestore", "err", err) return err } diff --git a/internal/tasker/server/dispatch.go b/internal/tasker/server/dispatch.go index e9fd207..9490bc1 100644 --- a/internal/tasker/server/dispatch.go +++ b/internal/tasker/server/dispatch.go @@ -5,21 +5,26 @@ import ( "encoding/json" "fmt" + "github.com/ethereum/go-ethereum/common" "github.com/grassrootseconomics/cic-custodial/internal/tasker/client" "github.com/hibiken/asynq" + "github.com/lmittmann/w3/module/eth" ) func (tp *TaskerProcessor) txDispatcher(ctx context.Context, t *asynq.Task) error { var ( - p client.TxPayload + p client.TxPayload + txHash common.Hash ) if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - _, err := tp.ActionsProvider.DispatchSignedTx(ctx, p.Tx) - if err != nil { + if err := tp.ChainProvider.EthClient.CallCtx( + ctx, + eth.SendTx(p.Tx).Returns(&txHash), + ); err != nil { return err } diff --git a/internal/tasker/server/registration.go b/internal/tasker/server/registration.go index e6d48f7..5a6981e 100644 --- a/internal/tasker/server/registration.go +++ b/internal/tasker/server/registration.go @@ -4,9 +4,16 @@ import ( "context" "encoding/json" "fmt" + "math/big" "github.com/grassrootseconomics/cic-custodial/internal/tasker/client" + "github.com/grassrootseconomics/cic-go-sdk/chain" "github.com/hibiken/asynq" + "github.com/lmittmann/w3" +) + +const ( + initialGiftGasValue = 1000000 ) func (tp *TaskerProcessor) setNewAccountNonce(ctx context.Context, t *asynq.Task) error { @@ -18,7 +25,7 @@ func (tp *TaskerProcessor) setNewAccountNonce(ctx context.Context, t *asynq.Task return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - if err := tp.ActionsProvider.SetNewAccountNonce(ctx, p.PublicKey); err != nil { + if err := tp.Noncestore.SetNewAccountNonce(ctx, p.PublicKey); err != nil { return err } @@ -39,19 +46,27 @@ func (tp *TaskerProcessor) giftGasProcessor(ctx context.Context, t *asynq.Task) return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - lock, err := tp.LockProvider.Obtain(ctx, tp.ActionsProvider.SystemPublicKey, LockTTL, nil) + lock, err := tp.LockProvider.Obtain(ctx, tp.SystemPublicKey, LockTTL, nil) if err != nil { return err } defer lock.Release(ctx) - signedTx, err := tp.ActionsProvider.SignGiftGasTx(ctx, p.PublicKey) + nonce, err := tp.Noncestore.Acquire(ctx, tp.SystemPublicKey) + if err != nil { + return err + } + + builtTx, err := tp.ChainProvider.BuildGasTransferTx(tp.SystemPrivateKey, chain.TransactionData{ + To: w3.A(p.PublicKey), + Nonce: nonce, + }, big.NewInt(initialGiftGasValue)) if err != nil { return err } _, err = tp.TaskerClient.CreateTxDispatchTask(client.TxPayload{ - Tx: signedTx, + Tx: builtTx, }, client.TxDispatchTask) if err != nil { return err @@ -74,7 +89,7 @@ func (tp *TaskerProcessor) activateAccountProcessor(ctx context.Context, t *asyn return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - if err := tp.ActionsProvider.ActivateCustodialAccount(ctx, p.PublicKey); err != nil { + if err := tp.Keystore.ActivateAccount(ctx, p.PublicKey); err != nil { return err } diff --git a/internal/tasker/server/server.go b/internal/tasker/server/server.go index 9069954..e5996d0 100644 --- a/internal/tasker/server/server.go +++ b/internal/tasker/server/server.go @@ -1,12 +1,16 @@ package server import ( + "crypto/ecdsa" "time" "github.com/bsm/redislock" + eth_crypto "github.com/ethereum/go-ethereum/crypto" "github.com/go-redis/redis/v8" - "github.com/grassrootseconomics/cic-custodial/internal/actions" + "github.com/grassrootseconomics/cic-custodial/internal/keystore" + "github.com/grassrootseconomics/cic-custodial/internal/noncestore" tasker_client "github.com/grassrootseconomics/cic-custodial/internal/tasker/client" + "github.com/grassrootseconomics/cic-go-sdk/chain" "github.com/hibiken/asynq" "github.com/zerodha/logf" ) @@ -16,7 +20,11 @@ const ( ) type Opts struct { - ActionsProvider *actions.ActionsProvider + SystemPublicKey string + SystemPrivateKey string + ChainProvider *chain.Provider + Keystore keystore.Keystore + Noncestore noncestore.Noncestore TaskerClient *tasker_client.TaskerClient RedisDSN string RedisLockDB int @@ -27,9 +35,13 @@ type Opts struct { } type TaskerProcessor struct { - LockProvider *redislock.Client - ActionsProvider *actions.ActionsProvider - TaskerClient *tasker_client.TaskerClient + SystemPublicKey string + SystemPrivateKey *ecdsa.PrivateKey + ChainProvider *chain.Provider + Noncestore noncestore.Noncestore + Keystore keystore.Keystore + LockProvider *redislock.Client + TaskerClient *tasker_client.TaskerClient } type TaskerServer struct { @@ -37,7 +49,12 @@ type TaskerServer struct { Mux *asynq.ServeMux } -func NewTaskerServer(o Opts) *TaskerServer { +func NewTaskerServer(o Opts) (*TaskerServer, error) { + loadedPrivateKey, err := eth_crypto.HexToECDSA(o.SystemPrivateKey) + if err != nil { + return nil, err + } + redisLockClient := redis.NewClient(&redis.Options{ Addr: o.RedisDSN, DB: o.RedisLockDB, @@ -46,9 +63,13 @@ func NewTaskerServer(o Opts) *TaskerServer { }) taskerProcessor := &TaskerProcessor{ - ActionsProvider: o.ActionsProvider, - TaskerClient: o.TaskerClient, - LockProvider: redislock.New(redisLockClient), + SystemPublicKey: o.SystemPublicKey, + SystemPrivateKey: loadedPrivateKey, + ChainProvider: o.ChainProvider, + Noncestore: o.Noncestore, + Keystore: o.Keystore, + TaskerClient: o.TaskerClient, + LockProvider: redislock.New(redisLockClient), } asynqServer := asynq.NewServer( @@ -85,5 +106,5 @@ func NewTaskerServer(o Opts) *TaskerServer { return &TaskerServer{ Server: asynqServer, Mux: mux, - } + }, nil }