From e203c49049b6f798ffef657a2d026f7a022f4468 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Wed, 29 Mar 2023 16:10:58 +0000 Subject: [PATCH] major refactor: use proxy contract and gas faucet (see notes) * remove uncessary tasks and task handlers * reafctor custodial container * refactor gas refiller. Gas refiller can queue at a later time to match cooldown * refactor sub handler to process chain events --- cmd/service/custodial.go | 73 --------- cmd/service/init.go | 3 +- cmd/service/main.go | 42 ++--- cmd/service/tasker.go | 4 - cmd/service/utils.go | 2 +- config.toml | 18 +-- go.mod | 2 +- go.sum | 2 + internal/api/account.go | 2 +- internal/api/network.go | 5 +- internal/api/sign.go | 29 +++- internal/custodial/abis.go | 27 ++++ internal/custodial/custodial.go | 105 ++++++++++--- internal/nonce/redis.go | 16 +- internal/pub/js_pub.go | 18 +-- internal/sub/handler.go | 37 ++++- internal/tasker/task/account_activate.go | 45 ------ internal/tasker/task/account_gift_gas.go | 145 ------------------ internal/tasker/task/account_gift_voucher.go | 137 ----------------- internal/tasker/task/account_prepare.go | 81 ---------- internal/tasker/task/account_refill_gas.go | 140 ++++++++++------- ...egister_onchain.go => account_register.go} | 45 +++--- internal/tasker/task/dispatch_tx.go | 17 +- internal/tasker/task/sign_transfer.go | 57 ++----- internal/tasker/task/utils.go | 3 + internal/tasker/types.go | 12 +- pkg/enum/enum.go | 2 - 27 files changed, 339 insertions(+), 730 deletions(-) delete mode 100644 cmd/service/custodial.go create mode 100644 internal/custodial/abis.go delete mode 100644 internal/tasker/task/account_activate.go delete mode 100644 internal/tasker/task/account_gift_gas.go delete mode 100644 internal/tasker/task/account_gift_voucher.go delete mode 100644 internal/tasker/task/account_prepare.go rename internal/tasker/task/{account_register_onchain.go => account_register.go} (68%) diff --git a/cmd/service/custodial.go b/cmd/service/custodial.go deleted file mode 100644 index 02653bd..0000000 --- a/cmd/service/custodial.go +++ /dev/null @@ -1,73 +0,0 @@ -package main - -import ( - "context" - "math/big" - "time" - - eth_crypto "github.com/celo-org/celo-blockchain/crypto" - "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/nonce" - "github.com/grassrootseconomics/w3-celo-patch" - "github.com/redis/go-redis/v9" -) - -// Define common smart contrcat ABI's that can be injected into the system container. -// Any relevant function signature that will be used by the custodial system can be defined here. -func initAbis() map[string]*w3.Func { - return map[string]*w3.Func{ - // Keccak hash -> 0x449a52f8 - "mintTo": w3.MustNewFunc("mintTo(address, uint256)", "bool"), - // Keccak hash -> 0xa9059cbb - "transfer": w3.MustNewFunc("transfer(address,uint256)", "bool"), - // Keccak hash -> 0x23b872dd - "transferFrom": w3.MustNewFunc("transferFrom(address, address, uint256)", "bool"), - // Add to account index - "add": w3.MustNewFunc("add(address)", "bool"), - // giveTo gas refill - "giveTo": w3.MustNewFunc("giveTo(address)", "uint256"), - } -} - -// Bootstrap the internal custodial system configs and system signer key. -// This container is passed down to individual tasker and API handlers. -func initSystemContainer(ctx context.Context, noncestore nonce.Noncestore) *custodial.SystemContainer { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - // Some custodial system defaults loaded from the config file. - systemContainer := &custodial.SystemContainer{ - Abis: initAbis(), - AccountIndexContract: w3.A(ko.MustString("system.account_index_address")), - GasFaucetContract: w3.A(ko.MustString("system.gas_faucet_address")), - GasRefillThreshold: big.NewInt(ko.MustInt64("system.gas_refill_threshold")), - GasRefillValue: big.NewInt(ko.MustInt64("system.gas_refill_value")), - GiftableGasValue: big.NewInt(ko.MustInt64("system.giftable_gas_value")), - GiftableToken: w3.A(ko.MustString("system.giftable_token_address")), - GiftableTokenValue: big.NewInt(ko.MustInt64("system.giftable_token_value")), - LockTimeout: 1 * time.Second, - PublicKey: ko.MustString("system.public_key"), - TokenDecimals: ko.MustInt("system.token_decimals"), - TokenTransferGasLimit: uint64(ko.MustInt64("system.token_transfer_gas_limit")), - } - - // Check if system signer account nonce is present. - // If not (first boot), we bootstrap it from the network. - currentSystemNonce, err := noncestore.Peek(ctx, ko.MustString("system.public_key")) - lo.Info("custodial: loaded system nonce from noncestore", "nonce", currentSystemNonce) - if err == redis.Nil { - nonce, err := noncestore.SyncNetworkNonce(ctx, ko.MustString("system.public_key")) - lo.Info("custodial: syncing system nonce from network", "nonce", nonce) - if err != nil { - lo.Fatal("custodial: critical error bootstrapping system container", "error", err) - } - } - - loadedPrivateKey, err := eth_crypto.HexToECDSA(ko.MustString("system.private_key")) - if err != nil { - lo.Fatal("custodial: critical error bootstrapping system container", "error", err) - } - systemContainer.PrivateKey = loadedPrivateKey - - return systemContainer -} diff --git a/cmd/service/init.go b/cmd/service/init.go index 5b1e5a7..b968b42 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -157,10 +157,9 @@ func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries) } // Load redis backed noncestore. -func initRedisNoncestore(redisPool *redis.RedisPool, celoProvider *celoutils.Provider) nonce.Noncestore { +func initRedisNoncestore(redisPool *redis.RedisPool) nonce.Noncestore { return nonce.NewRedisNoncestore(nonce.Opts{ RedisPool: redisPool, - CeloProvider: celoProvider, }) } diff --git a/cmd/service/main.go b/cmd/service/main.go index bc4ada1..4117e8f 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -14,13 +14,11 @@ import ( "github.com/zerodha/logf" ) -type ( - internalServiceContainer struct { - apiService *echo.Echo - jetstreamSub *sub.Sub - taskerService *tasker.TaskerServer - } -) +type internalServicesContainer struct { + apiService *echo.Echo + jetstreamSub *sub.Sub + taskerService *tasker.TaskerServer +} var ( build string @@ -56,27 +54,31 @@ func main() { postgresKeystore := initPostgresKeystore(postgresPool, parsedQueries) pgStore := initPostgresStore(postgresPool, parsedQueries) - redisNoncestore := initRedisNoncestore(redisPool, celoProvider) + redisNoncestore := initRedisNoncestore(redisPool) lockProvider := initLockProvider(redisPool.Client) taskerClient := initTaskerClient(asynqRedisPool) - systemContainer := initSystemContainer(context.Background(), redisNoncestore) natsConn, jsCtx := initJetStream() jsPub := initPub(jsCtx) - custodial := &custodial.Custodial{ - CeloProvider: celoProvider, - Keystore: postgresKeystore, - LockProvider: lockProvider, - Noncestore: redisNoncestore, - PgStore: pgStore, - Pub: jsPub, - RedisClient: redisPool.Client, - SystemContainer: systemContainer, - TaskerClient: taskerClient, + custodial, err := custodial.NewCustodial(custodial.Opts{ + CeloProvider: celoProvider, + Keystore: postgresKeystore, + LockProvider: lockProvider, + Noncestore: redisNoncestore, + PgStore: pgStore, + Pub: jsPub, + RedisClient: redisPool.Client, + RegistryAddress: ko.MustString("chain.registry_address"), + SystemPrivateKey: ko.MustString("system.private_key"), + SystemPublicKey: ko.MustString("system.public_key"), + TaskerClient: taskerClient, + }) + if err != nil { + lo.Fatal("main: crtical error loading custodial container", "error", err) } - internalServices := &internalServiceContainer{} + internalServices := &internalServicesContainer{} wg := &sync.WaitGroup{} signalCh, closeCh := createSigChannel() diff --git a/cmd/service/tasker.go b/cmd/service/tasker.go index dee656a..e2b536d 100644 --- a/cmd/service/tasker.go +++ b/cmd/service/tasker.go @@ -34,11 +34,7 @@ func initTasker(custodialContainer *custodial.Custodial, redisPool *redis.RedisP observibilityMiddleware(), }) - taskerServer.RegisterHandlers(tasker.AccountPrepareTask, task.AccountPrepare(custodialContainer)) taskerServer.RegisterHandlers(tasker.AccountRegisterTask, task.AccountRegisterOnChainProcessor(custodialContainer)) - taskerServer.RegisterHandlers(tasker.AccountGiftGasTask, task.AccountGiftGasProcessor(custodialContainer)) - taskerServer.RegisterHandlers(tasker.AccountGiftVoucherTask, task.GiftVoucherProcessor(custodialContainer)) - taskerServer.RegisterHandlers(tasker.AccountActivateTask, task.AccountActivateProcessor(custodialContainer)) taskerServer.RegisterHandlers(tasker.AccountRefillGasTask, task.AccountRefillGasProcessor(custodialContainer)) taskerServer.RegisterHandlers(tasker.SignTransferTask, task.SignTransfer(custodialContainer)) taskerServer.RegisterHandlers(tasker.DispatchTxTask, task.DispatchTx(custodialContainer)) diff --git a/cmd/service/utils.go b/cmd/service/utils.go index 8cac34e..3c76f9a 100644 --- a/cmd/service/utils.go +++ b/cmd/service/utils.go @@ -17,7 +17,7 @@ func createSigChannel() (chan os.Signal, func()) { } } -func startGracefulShutdown(ctx context.Context, internalServices *internalServiceContainer) { +func startGracefulShutdown(ctx context.Context, internalServices *internalServicesContainer) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() diff --git a/config.toml b/config.toml index 9a37ebd..7a7ce54 100644 --- a/config.toml +++ b/config.toml @@ -8,27 +8,11 @@ metrics = true rpc_endpoint = "" testnet = true devnet = false +registry_address = "" [system] -# System default values - -# Gas values are in wei with 18 d.p. precision unless otherwise stated -# Token values are in wei with 6 d.p. precision unless otherwise stated - -# All addresses MUST be checksumed -account_index_address = "" -gas_faucet_address = "" -gas_refill_threshold = 2500000000000000 -gas_refill_value = 15000000000000000 -giftable_gas_value = 15000000000000000 -giftable_token_address = "" -giftable_token_value = 5000000 -# System private key -# Should always be toped up private_key = "" public_key = "" -token_decimals = 6 -token_transfer_gas_limit = 200000 [postgres] dsn = "" diff --git a/go.mod b/go.mod index 99c00cf..60b51ec 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/georgysavva/scany/v2 v2.0.0 github.com/go-playground/validator/v10 v10.12.0 github.com/google/uuid v1.3.0 - github.com/grassrootseconomics/celoutils v1.1.1 + github.com/grassrootseconomics/celoutils v1.2.1 github.com/grassrootseconomics/w3-celo-patch v0.2.0 github.com/hibiken/asynq v0.24.0 github.com/jackc/pgx/v5 v5.3.1 diff --git a/go.sum b/go.sum index 878be3e..b73f6af 100644 --- a/go.sum +++ b/go.sum @@ -262,6 +262,8 @@ github.com/grassrootseconomics/asynq v0.25.0 h1:2zSz5YwNLu/oCTm/xfNixn86i9aw4zth github.com/grassrootseconomics/asynq v0.25.0/go.mod h1:pe2XOdK1eIbTgTmRFHIYl75lvVuTPJxZq2T9Ocz/+2s= github.com/grassrootseconomics/celoutils v1.1.1 h1:REsndvfBkPN8UKOoQFNEGm/sCwKtTm+woYtgMl3bfZ0= github.com/grassrootseconomics/celoutils v1.1.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q= +github.com/grassrootseconomics/celoutils v1.2.1 h1:ndM4h7Df0d57m2kdRXRStrnunqOL61wQ51rnOanX1KI= +github.com/grassrootseconomics/celoutils v1.2.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q= github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U= github.com/grassrootseconomics/w3-celo-patch v0.2.0/go.mod h1:WhBXNzNIvHmS6B2hAeShs56oa9Azb4jQSrOMKuMdBWw= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= diff --git a/internal/api/account.go b/internal/api/account.go index 3bbe99d..8438a74 100644 --- a/internal/api/account.go +++ b/internal/api/account.go @@ -38,7 +38,7 @@ func HandleAccountCreate(cu *custodial.Custodial) func(echo.Context) error { _, err = cu.TaskerClient.CreateTask( c.Request().Context(), - tasker.AccountPrepareTask, + tasker.AccountRegisterTask, tasker.DefaultPriority, &tasker.Task{ Id: trackingId, diff --git a/internal/api/network.go b/internal/api/network.go index cb020a2..eb502ef 100644 --- a/internal/api/network.go +++ b/internal/api/network.go @@ -5,6 +5,7 @@ import ( "math/big" "net/http" + "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/w3-celo-patch" "github.com/grassrootseconomics/w3-celo-patch/module/eth" @@ -31,8 +32,8 @@ func HandleNetworkAccountStatus(cu *custodial.Custodial) func(echo.Context) erro if err := cu.CeloProvider.Client.CallCtx( c.Request().Context(), - eth.Nonce(w3.A(accountStatusRequest.Address), nil).Returns(&networkNonce), - eth.Balance(w3.A(accountStatusRequest.Address), nil).Returns(&networkBalance), + eth.Nonce(celoutils.HexToAddress(accountStatusRequest.Address), nil).Returns(&networkNonce), + eth.Balance(celoutils.HexToAddress(accountStatusRequest.Address), nil).Returns(&networkBalance), ); err != nil { return err } diff --git a/internal/api/sign.go b/internal/api/sign.go index ea3fbba..2f3abd3 100644 --- a/internal/api/sign.go +++ b/internal/api/sign.go @@ -52,14 +52,36 @@ func HandleSignTransfer(cu *custodial.Custodial) func(echo.Context) error { }) } + trackingId := uuid.NewString() + if gasQuota < 1 { + gasRefillPayload, err := json.Marshal(task.AccountPayload{ + PublicKey: req.From, + TrackingId: trackingId, + }) + if err != nil { + return err + } + + _, err = cu.TaskerClient.CreateTask( + c.Request().Context(), + tasker.AccountRefillGasTask, + tasker.DefaultPriority, + &tasker.Task{ + Id: trackingId, + Payload: gasRefillPayload, + }, + ) + if err != nil { + return err + } + return c.JSON(http.StatusForbidden, ErrResp{ Ok: false, Message: "Out of gas, refill pending. Try again later.", }) } - trackingId := uuid.NewString() taskPayload, err := json.Marshal(task.TransferPayload{ TrackingId: trackingId, From: req.From, @@ -84,11 +106,6 @@ func HandleSignTransfer(cu *custodial.Custodial) func(echo.Context) error { return err } - err = cu.PgStore.DecrGasQuota(c.Request().Context(), req.From) - if err != nil { - return err - } - return c.JSON(http.StatusOK, OkResp{ Ok: true, Result: H{ diff --git a/internal/custodial/abis.go b/internal/custodial/abis.go new file mode 100644 index 0000000..c20658b --- /dev/null +++ b/internal/custodial/abis.go @@ -0,0 +1,27 @@ +package custodial + +import "github.com/grassrootseconomics/w3-celo-patch" + +const ( + Check = "check" + GiveTo = "giveTo" + MintTo = "mintTo" + NextTime = "nextTime" + Register = "register" + Transfer = "transfer" + TransferFrom = "transferFrom" +) + +// Define common smart contrcat ABI's that can be injected into the system container. +// Any relevant function signature that will be used by the custodial system can be defined here. +func initAbis() map[string]*w3.Func { + return map[string]*w3.Func{ + Check: w3.MustNewFunc("check(address)", "bool"), + GiveTo: w3.MustNewFunc("giveTo(address)", "uint256"), + MintTo: w3.MustNewFunc("mintTo(address, uint256)", "bool"), + NextTime: w3.MustNewFunc("nextTime(address)", "uint256"), + Register: w3.MustNewFunc("register(address)", ""), + Transfer: w3.MustNewFunc("transfer(address,uint256)", "bool"), + TransferFrom: w3.MustNewFunc("transferFrom(address, address, uint256)", "bool"), + } +} diff --git a/internal/custodial/custodial.go b/internal/custodial/custodial.go index b33af39..3f64413 100644 --- a/internal/custodial/custodial.go +++ b/internal/custodial/custodial.go @@ -1,12 +1,13 @@ package custodial import ( + "context" "crypto/ecdsa" - "math/big" "time" "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common" + eth_crypto "github.com/celo-org/celo-blockchain/crypto" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" @@ -14,34 +15,90 @@ import ( "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/w3-celo-patch" + "github.com/grassrootseconomics/w3-celo-patch/module/eth" + "github.com/labstack/gommon/log" "github.com/redis/go-redis/v9" ) type ( - SystemContainer struct { - Abis map[string]*w3.Func - AccountIndexContract common.Address - GasFaucetContract common.Address - GasRefillThreshold *big.Int - GasRefillValue *big.Int - GiftableGasValue *big.Int - GiftableToken common.Address - GiftableTokenValue *big.Int - LockTimeout time.Duration - PrivateKey *ecdsa.PrivateKey - PublicKey string - TokenDecimals int - TokenTransferGasLimit uint64 + Opts struct { + CeloProvider *celoutils.Provider + Keystore keystore.Keystore + LockProvider *redislock.Client + Noncestore nonce.Noncestore + PgStore store.Store + Pub *pub.Pub + RedisClient *redis.Client + RegistryAddress string + SystemPrivateKey string + SystemPublicKey string + TaskerClient *tasker.TaskerClient } + Custodial struct { - CeloProvider *celoutils.Provider - Keystore keystore.Keystore - LockProvider *redislock.Client - Noncestore nonce.Noncestore - PgStore store.Store - Pub *pub.Pub - RedisClient *redis.Client - SystemContainer *SystemContainer - TaskerClient *tasker.TaskerClient + Abis map[string]*w3.Func + CeloProvider *celoutils.Provider + Keystore keystore.Keystore + LockProvider *redislock.Client + Noncestore nonce.Noncestore + PgStore store.Store + Pub *pub.Pub + RedisClient *redis.Client + RegistryMap map[string]common.Address + SystemPrivateKey *ecdsa.PrivateKey + SystemPublicKey string + TaskerClient *tasker.TaskerClient } ) + +func NewCustodial(o Opts) (*Custodial, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + registryMap, err := o.CeloProvider.RegistryMap(ctx, celoutils.HexToAddress(o.RegistryAddress)) + if err != nil { + log.Errorf("err: %v", err) + return nil, err + } + + _, err = o.Noncestore.Peek(ctx, o.SystemPublicKey) + if err == redis.Nil { + // TODO: Bootsrap from Postgres first + var networkNonce uint64 + + err := o.CeloProvider.Client.CallCtx( + ctx, + eth.Nonce(celoutils.HexToAddress(o.SystemPublicKey), nil).Returns(&networkNonce), + ) + if err != nil { + return nil, err + } + + if err := o.Noncestore.SetAccountNonce(ctx, o.SystemPublicKey, networkNonce); err != nil { + return nil, err + } + } else if err != nil { + return nil, err + } + + privateKey, err := eth_crypto.HexToECDSA(o.SystemPrivateKey) + if err != nil { + return nil, err + } + + return &Custodial{ + Abis: initAbis(), + CeloProvider: o.CeloProvider, + Keystore: o.Keystore, + LockProvider: o.LockProvider, + Noncestore: o.Noncestore, + PgStore: o.PgStore, + Pub: o.Pub, + RedisClient: o.RedisClient, + RegistryMap: registryMap, + SystemPrivateKey: privateKey, + SystemPublicKey: o.SystemPublicKey, + TaskerClient: o.TaskerClient, + }, nil + +} diff --git a/internal/nonce/redis.go b/internal/nonce/redis.go index c0915fa..16d8b11 100644 --- a/internal/nonce/redis.go +++ b/internal/nonce/redis.go @@ -6,14 +6,16 @@ import ( redispool "github.com/grassrootseconomics/cic-custodial/pkg/redis" ) -type Opts struct { - RedisPool *redispool.RedisPool -} +type ( + Opts struct { + RedisPool *redispool.RedisPool + } -// RedisNoncestore implements `Noncestore` -type RedisNoncestore struct { - redis *redispool.RedisPool -} + // RedisNoncestore implements `Noncestore` + RedisNoncestore struct { + redis *redispool.RedisPool + } +) func NewRedisNoncestore(o Opts) Noncestore { return &RedisNoncestore{ diff --git a/internal/pub/js_pub.go b/internal/pub/js_pub.go index 64d8c07..a3155a8 100644 --- a/internal/pub/js_pub.go +++ b/internal/pub/js_pub.go @@ -8,16 +8,10 @@ import ( ) const ( - streamName string = "CUSTODIAL" - streamSubjects string = "CUSTODIAL.*" - AccountNewNonce string = "CUSTODIAL.accountNewNonce" - AccountRegister string = "CUSTODIAL.accountRegister" - AccountGiftGas string = "CUSTODIAL.systemNewAccountGas" - AccountGiftVoucher string = "CUSTODIAL.systemNewAccountVoucher" - AccountRefillGas string = "CUSTODIAL.systemRefillAccountGas" - DispatchFail string = "CUSTODIAL.dispatchFail" - DispatchSuccess string = "CUSTODIAL.dispatchSuccess" - SignTransfer string = "CUSTODIAL.signTransfer" + streamName string = "CUSTODIAL" + streamSubjects string = "CUSTODIAL.*" + AccountActivated string = "CUSTODIAL.accountActivated" + GasRefilled string = "CUSTODIAL.gasRefilled" ) type ( @@ -32,9 +26,7 @@ type ( } EventPayload struct { - OtxId uint `json:"otxId"` - TrackingId string `json:"trackingId"` - TxHash string `json:"txHash"` + TxHash string `json:"txHash"` } ) diff --git a/internal/sub/handler.go b/internal/sub/handler.go index ea487f1..9047366 100644 --- a/internal/sub/handler.go +++ b/internal/sub/handler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" + "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/nats-io/nats.go" ) @@ -22,9 +23,41 @@ func (s *Sub) handler(ctx context.Context, msg *nats.Msg) error { } switch msg.Subject { + case "CHAIN.register": + if chainEvent.Success { + if err := s.cu.PgStore.ActivateAccount(ctx, chainEvent.To); err != nil { + return err + } + + eventPayload := &pub.EventPayload{ + TxHash: chainEvent.TxHash, + } + + if err := s.cu.Pub.Publish( + pub.AccountActivated, + chainEvent.TxHash, + eventPayload, + ); err != nil { + return err + } + } case "CHAIN.gas": - if err := s.cu.PgStore.ResetGasQuota(ctx, chainEvent.To); err != nil { - return err + if chainEvent.Success { + if err := s.cu.PgStore.ResetGasQuota(ctx, chainEvent.To); err != nil { + return err + } + + eventPayload := &pub.EventPayload{ + TxHash: chainEvent.TxHash, + } + + if err := s.cu.Pub.Publish( + pub.GasRefilled, + chainEvent.TxHash, + eventPayload, + ); err != nil { + return err + } } } diff --git a/internal/tasker/task/account_activate.go b/internal/tasker/task/account_activate.go deleted file mode 100644 index 456209e..0000000 --- a/internal/tasker/task/account_activate.go +++ /dev/null @@ -1,45 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - "errors" - - "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/hibiken/asynq" -) - -const ( - requiredQuorum = 3 -) - -var ( - ErrQuorumNotReached = errors.New("Account activation quorum not reached.") -) - -func AccountActivateProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - payload AccountPayload - ) - - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return err - } - - quorum, err := cu.PgStore.GetAccountActivationQuorum(ctx, payload.TrackingId) - if err != nil { - return err - } - - if quorum < requiredQuorum { - return ErrQuorumNotReached - } - - if err := cu.PgStore.ActivateAccount(ctx, payload.PublicKey); err != nil { - return err - } - - return nil - } -} diff --git a/internal/tasker/task/account_gift_gas.go b/internal/tasker/task/account_gift_gas.go deleted file mode 100644 index 1ca2fee..0000000 --- a/internal/tasker/task/account_gift_gas.go +++ /dev/null @@ -1,145 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/bsm/redislock" - "github.com/celo-org/celo-blockchain/common/hexutil" - "github.com/grassrootseconomics/celoutils" - "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/pub" - "github.com/grassrootseconomics/cic-custodial/internal/store" - "github.com/grassrootseconomics/cic-custodial/internal/tasker" - "github.com/grassrootseconomics/cic-custodial/pkg/enum" - "github.com/grassrootseconomics/w3-celo-patch" - "github.com/hibiken/asynq" -) - -const ( - accountActivationCheckDelay = 5 * time.Second -) - -func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - err error - payload AccountPayload - ) - - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) - } - - lock, err := cu.LockProvider.Obtain( - ctx, - lockPrefix+cu.SystemContainer.PublicKey, - cu.SystemContainer.LockTimeout, - &redislock.Options{ - RetryStrategy: lockRetry(), - }, - ) - if err != nil { - return err - } - defer lock.Release(ctx) - - nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey) - if err != nil { - return err - } - defer func() { - if err != nil { - if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { - err = nErr - } - } - }() - - builtTx, err := cu.CeloProvider.SignGasTransferTx( - cu.SystemContainer.PrivateKey, - celoutils.GasTransferTxOpts{ - To: w3.A(payload.PublicKey), - Nonce: nonce, - Value: cu.SystemContainer.GiftableGasValue, - GasFeeCap: celoutils.SafeGasFeeCap, - GasTipCap: celoutils.SafeGasTipCap, - }, - ) - if err != nil { - return err - } - - rawTx, err := builtTx.MarshalBinary() - if err != nil { - return err - } - - id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ - TrackingId: payload.TrackingId, - Type: enum.GIFT_GAS, - RawTx: hexutil.Encode(rawTx), - TxHash: builtTx.Hash().Hex(), - From: cu.SystemContainer.PublicKey, - Data: hexutil.Encode(builtTx.Data()), - GasPrice: builtTx.GasPrice().Uint64(), - GasLimit: builtTx.Gas(), - TransferValue: cu.SystemContainer.GiftableGasValue.Uint64(), - Nonce: builtTx.Nonce(), - }) - if err != nil { - return err - } - - disptachJobPayload, err := json.Marshal(TxPayload{ - OtxId: id, - Tx: builtTx, - }) - if err != nil { - return err - } - - _, err = cu.TaskerClient.CreateTask( - ctx, - tasker.DispatchTxTask, - tasker.HighPriority, - &tasker.Task{ - Payload: disptachJobPayload, - }, - ) - if err != nil { - return err - } - - _, err = cu.TaskerClient.CreateTask( - ctx, - tasker.AccountActivateTask, - tasker.DefaultPriority, - &tasker.Task{ - Payload: t.Payload(), - }, - asynq.ProcessIn(accountActivationCheckDelay), - ) - if err != nil { - return err - } - - eventPayload := &pub.EventPayload{ - OtxId: id, - TrackingId: payload.TrackingId, - TxHash: builtTx.Hash().Hex(), - } - - if err := cu.Pub.Publish( - pub.AccountGiftGas, - builtTx.Hash().Hex(), - eventPayload, - ); err != nil { - return err - } - - return nil - } -} diff --git a/internal/tasker/task/account_gift_voucher.go b/internal/tasker/task/account_gift_voucher.go deleted file mode 100644 index 2ffca28..0000000 --- a/internal/tasker/task/account_gift_voucher.go +++ /dev/null @@ -1,137 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - - "github.com/bsm/redislock" - "github.com/celo-org/celo-blockchain/common/hexutil" - "github.com/grassrootseconomics/celoutils" - "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/pub" - "github.com/grassrootseconomics/cic-custodial/internal/store" - "github.com/grassrootseconomics/cic-custodial/internal/tasker" - "github.com/grassrootseconomics/cic-custodial/pkg/enum" - "github.com/grassrootseconomics/w3-celo-patch" - "github.com/hibiken/asynq" -) - -func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - err error - payload AccountPayload - ) - - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return err - } - - lock, err := cu.LockProvider.Obtain( - ctx, - lockPrefix+cu.SystemContainer.PublicKey, - cu.SystemContainer.LockTimeout, - &redislock.Options{ - RetryStrategy: lockRetry(), - }, - ) - if err != nil { - return err - } - defer lock.Release(ctx) - - nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey) - if err != nil { - return err - } - defer func() { - if err != nil { - if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { - err = nErr - } - } - }() - - input, err := cu.SystemContainer.Abis["mintTo"].EncodeArgs( - w3.A(payload.PublicKey), - cu.SystemContainer.GiftableTokenValue, - ) - if err != nil { - return err - } - - builtTx, err := cu.CeloProvider.SignContractExecutionTx( - cu.SystemContainer.PrivateKey, - celoutils.ContractExecutionTxOpts{ - ContractAddress: cu.SystemContainer.GiftableToken, - InputData: input, - GasFeeCap: celoutils.SafeGasFeeCap, - GasTipCap: celoutils.SafeGasTipCap, - GasLimit: cu.SystemContainer.TokenTransferGasLimit, - Nonce: nonce, - }, - ) - if err != nil { - return err - } - - rawTx, err := builtTx.MarshalBinary() - if err != nil { - return err - } - - id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ - TrackingId: payload.TrackingId, - Type: enum.GIFT_VOUCHER, - RawTx: hexutil.Encode(rawTx), - TxHash: builtTx.Hash().Hex(), - From: cu.SystemContainer.PublicKey, - Data: hexutil.Encode(builtTx.Data()), - GasPrice: builtTx.GasPrice().Uint64(), - GasLimit: builtTx.Gas(), - TransferValue: cu.SystemContainer.GiftableTokenValue.Uint64(), - Nonce: builtTx.Nonce(), - }) - if err != nil { - - return err - } - - disptachJobPayload, err := json.Marshal(TxPayload{ - OtxId: id, - Tx: builtTx, - }) - if err != nil { - - return err - } - - _, err = cu.TaskerClient.CreateTask( - ctx, - tasker.DispatchTxTask, - tasker.HighPriority, - &tasker.Task{ - Payload: disptachJobPayload, - }, - ) - if err != nil { - return err - } - - eventPayload := &pub.EventPayload{ - OtxId: id, - TrackingId: payload.TrackingId, - TxHash: builtTx.Hash().Hex(), - } - - if err := cu.Pub.Publish( - pub.AccountGiftVoucher, - builtTx.Hash().Hex(), - eventPayload, - ); err != nil { - return err - } - - return nil - } -} diff --git a/internal/tasker/task/account_prepare.go b/internal/tasker/task/account_prepare.go deleted file mode 100644 index 6d75ed6..0000000 --- a/internal/tasker/task/account_prepare.go +++ /dev/null @@ -1,81 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/pub" - "github.com/grassrootseconomics/cic-custodial/internal/tasker" - "github.com/hibiken/asynq" -) - -type AccountPayload struct { - PublicKey string `json:"publicKey"` - TrackingId string `json:"trackingId"` -} - -func AccountPrepare(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var payload AccountPayload - - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) - } - - if err := cu.Noncestore.SetAccountNonce(ctx, payload.PublicKey, 0); err != nil { - return err - } - - _, err := cu.TaskerClient.CreateTask( - ctx, - tasker.AccountRegisterTask, - tasker.DefaultPriority, - &tasker.Task{ - Payload: t.Payload(), - }, - ) - if err != nil { - return err - } - - _, err = cu.TaskerClient.CreateTask( - ctx, - tasker.AccountGiftGasTask, - tasker.DefaultPriority, - &tasker.Task{ - Payload: t.Payload(), - }, - ) - if err != nil { - return err - } - - _, err = cu.TaskerClient.CreateTask( - ctx, - tasker.AccountGiftVoucherTask, - tasker.DefaultPriority, - &tasker.Task{ - Payload: t.Payload(), - }, - ) - if err != nil { - return err - } - - eventPayload := pub.EventPayload{ - TrackingId: payload.TrackingId, - } - - if err := cu.Pub.Publish( - pub.AccountNewNonce, - payload.PublicKey, - eventPayload, - ); err != nil { - return err - } - - return nil - } -} diff --git a/internal/tasker/task/account_refill_gas.go b/internal/tasker/task/account_refill_gas.go index 76ac118..5538100 100644 --- a/internal/tasker/task/account_refill_gas.go +++ b/internal/tasker/task/account_refill_gas.go @@ -3,26 +3,22 @@ package task import ( "context" "encoding/json" - "errors" - "fmt" + "math/big" "time" "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/enum" - "github.com/grassrootseconomics/w3-celo-patch" + "github.com/grassrootseconomics/w3-celo-patch/module/eth" "github.com/hibiken/asynq" - "github.com/redis/go-redis/v9" ) const ( - gasLockPrefix = "gas_lock:" - gasLockExpiry = 1 * time.Hour + gasGiveToLimit = 250000 ) func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { @@ -30,32 +26,75 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a var ( err error payload AccountPayload + + nextTime big.Int + checkStatus bool ) if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) + return err } - // TODO: Check eth-faucet whether we can request for a topup before signing the tx. _, gasQuota, err := cu.PgStore.GetAccountStatusByAddress(ctx, payload.PublicKey) if err != nil { return err } - gasLock, err := cu.RedisClient.Get(ctx, gasLockPrefix+payload.PublicKey).Bool() - if !errors.Is(err, redis.Nil) { - return err - } - - if gasQuota > 0 || gasLock { + // The user has enough gas for atleast 5 more transactions. + if gasQuota > 5 { + return nil + } + + if err := cu.CeloProvider.Client.CallCtx( + ctx, + eth.CallFunc( + cu.Abis[custodial.NextTime], + cu.RegistryMap[celoutils.GasFaucet], + celoutils.HexToAddress(payload.PublicKey), + ).Returns(&nextTime), + ); err != nil { + return err + } + + // The user already requested funds, there is a cooldown applied. + // We can schedule an attempt after the cooldown period has passed. + if nextTime.Int64() > time.Now().Unix() { + _, err = cu.TaskerClient.CreateTask( + ctx, + tasker.AccountRefillGasTask, + tasker.DefaultPriority, + &tasker.Task{ + Payload: t.Payload(), + }, + asynq.ProcessAt(time.Unix(nextTime.Int64(), 0)), + ) + if err != nil { + return err + } + + return nil + } + + if err := cu.CeloProvider.Client.CallCtx( + ctx, + eth.CallFunc( + cu.Abis[custodial.Check], + cu.RegistryMap[celoutils.GasFaucet], + celoutils.HexToAddress(payload.PublicKey), + ).Returns(&checkStatus), + ); err != nil { + return err + } + + // The gas faucet backend returns a false status, a poke will fail. + if !checkStatus { return nil } - // TODO: Use eth-faucet. lock, err := cu.LockProvider.Obtain( ctx, - lockPrefix+cu.SystemContainer.PublicKey, - cu.SystemContainer.LockTimeout, + lockPrefix+cu.SystemPublicKey, + lockTimeout, &redislock.Options{ RetryStrategy: lockRetry(), }, @@ -65,27 +104,33 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a } defer lock.Release(ctx) - nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey) + nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemPublicKey) if err != nil { return err } defer func() { if err != nil { - if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { + if nErr := cu.Noncestore.Return(ctx, cu.SystemPublicKey); nErr != nil { err = nErr } } }() - // TODO: Review gas params - builtTx, err := cu.CeloProvider.SignGasTransferTx( - cu.SystemContainer.PrivateKey, - celoutils.GasTransferTxOpts{ - To: w3.A(payload.PublicKey), - Nonce: nonce, - Value: cu.SystemContainer.GiftableGasValue, - GasFeeCap: celoutils.SafeGasFeeCap, - GasTipCap: celoutils.SafeGasTipCap, + input, err := cu.Abis[custodial.GiveTo].EncodeArgs( + celoutils.HexToAddress(payload.PublicKey), + ) + if err != nil { + return err + } + + builtTx, err := cu.CeloProvider.SignContractExecutionTx( + cu.SystemPrivateKey, + celoutils.ContractExecutionTxOpts{ + ContractAddress: cu.RegistryMap[celoutils.GasFaucet], + InputData: input, + GasFeeCap: celoutils.SafeGasFeeCap, + GasTipCap: celoutils.SafeGasTipCap, + Nonce: nonce, }, ) if err != nil { @@ -98,16 +143,15 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a } id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ - TrackingId: payload.TrackingId, - Type: enum.REFILL_GAS, - RawTx: hexutil.Encode(rawTx), - TxHash: builtTx.Hash().Hex(), - From: cu.SystemContainer.PublicKey, - Data: hexutil.Encode(builtTx.Data()), - GasPrice: builtTx.GasPrice().Uint64(), - GasLimit: builtTx.Gas(), - TransferValue: cu.SystemContainer.GiftableGasValue.Uint64(), - Nonce: builtTx.Nonce(), + TrackingId: payload.TrackingId, + Type: enum.REFILL_GAS, + RawTx: hexutil.Encode(rawTx), + TxHash: builtTx.Hash().Hex(), + From: cu.SystemPublicKey, + Data: hexutil.Encode(builtTx.Data()), + GasPrice: builtTx.GasPrice().Uint64(), + GasLimit: builtTx.Gas(), + Nonce: builtTx.Nonce(), }) if err != nil { return err @@ -133,24 +177,6 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a return err } - eventPayload := &pub.EventPayload{ - OtxId: id, - TrackingId: payload.TrackingId, - TxHash: builtTx.Hash().Hex(), - } - - if err := cu.Pub.Publish( - pub.AccountRefillGas, - builtTx.Hash().Hex(), - eventPayload, - ); err != nil { - return err - } - - if _, err := cu.RedisClient.SetEx(ctx, gasLockPrefix+payload.PublicKey, true, gasLockExpiry).Result(); err != nil { - return err - } - return nil } } diff --git a/internal/tasker/task/account_register_onchain.go b/internal/tasker/task/account_register.go similarity index 68% rename from internal/tasker/task/account_register_onchain.go rename to internal/tasker/task/account_register.go index c74d2d3..b626e2a 100644 --- a/internal/tasker/task/account_register_onchain.go +++ b/internal/tasker/task/account_register.go @@ -3,20 +3,22 @@ package task import ( "context" "encoding/json" - "fmt" "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/enum" - "github.com/grassrootseconomics/w3-celo-patch" "github.com/hibiken/asynq" ) +type AccountPayload struct { + PublicKey string `json:"publicKey"` + TrackingId string `json:"trackingId"` +} + func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { var ( @@ -25,13 +27,13 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte ) if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) + return err } lock, err := cu.LockProvider.Obtain( ctx, - lockPrefix+cu.SystemContainer.PublicKey, - cu.SystemContainer.LockTimeout, + lockPrefix+cu.SystemPublicKey, + lockTimeout, &redislock.Options{ RetryStrategy: lockRetry(), }, @@ -41,34 +43,33 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte } defer lock.Release(ctx) - nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey) + nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemPublicKey) if err != nil { return err } defer func() { if err != nil { - if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { + if nErr := cu.Noncestore.Return(ctx, cu.SystemPublicKey); nErr != nil { err = nErr } } }() - input, err := cu.SystemContainer.Abis["add"].EncodeArgs( - w3.A(payload.PublicKey), + input, err := cu.Abis[custodial.Register].EncodeArgs( + celoutils.HexToAddress(payload.PublicKey), ) if err != nil { return err } - // TODO: Review gas params. builtTx, err := cu.CeloProvider.SignContractExecutionTx( - cu.SystemContainer.PrivateKey, + cu.SystemPrivateKey, celoutils.ContractExecutionTxOpts{ - ContractAddress: cu.SystemContainer.AccountIndexContract, + ContractAddress: cu.RegistryMap[celoutils.CustodialProxy], InputData: input, GasFeeCap: celoutils.SafeGasFeeCap, GasTipCap: celoutils.SafeGasTipCap, - GasLimit: cu.SystemContainer.TokenTransferGasLimit, + GasLimit: gasLimit, Nonce: nonce, }, ) @@ -86,7 +87,7 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte Type: enum.ACCOUNT_REGISTER, RawTx: hexutil.Encode(rawTx), TxHash: builtTx.Hash().Hex(), - From: cu.SystemContainer.PublicKey, + From: cu.SystemPublicKey, Data: hexutil.Encode(builtTx.Data()), GasPrice: builtTx.GasPrice().Uint64(), GasLimit: builtTx.Gas(), @@ -95,7 +96,7 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte if err != nil { return err } - + disptachJobPayload, err := json.Marshal(TxPayload{ OtxId: id, Tx: builtTx, @@ -116,17 +117,7 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte return err } - eventPayload := &pub.EventPayload{ - OtxId: id, - TrackingId: payload.TrackingId, - TxHash: builtTx.Hash().Hex(), - } - - if err := cu.Pub.Publish( - pub.AccountRegister, - builtTx.Hash().Hex(), - eventPayload, - ); err != nil { + if err := cu.Noncestore.SetAccountNonce(ctx, payload.PublicKey, 0); err != nil { return err } diff --git a/internal/tasker/task/dispatch_tx.go b/internal/tasker/task/dispatch_tx.go index 3271d0d..77aeabe 100644 --- a/internal/tasker/task/dispatch_tx.go +++ b/internal/tasker/task/dispatch_tx.go @@ -9,7 +9,6 @@ import ( "github.com/celo-org/celo-blockchain/core/types" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/w3-celo-patch/module/eth" @@ -26,18 +25,14 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro var ( payload TxPayload dispatchStatus store.DispatchStatus - eventPayload pub.EventPayload dispathchTx common.Hash ) if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) + return err } - txHash := payload.Tx.Hash().Hex() - - dispatchStatus.OtxId, eventPayload.OtxId = payload.OtxId, payload.OtxId - eventPayload.TxHash = txHash + dispatchStatus.OtxId = payload.OtxId if err := cu.CeloProvider.Client.CallCtx( ctx, @@ -58,10 +53,6 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } - if err := cu.Pub.Publish(pub.DispatchFail, txHash, eventPayload); err != nil { - return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) - } - return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } @@ -71,10 +62,6 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } - if err := cu.Pub.Publish(pub.DispatchSuccess, txHash, eventPayload); err != nil { - return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) - } - return nil } } diff --git a/internal/tasker/task/sign_transfer.go b/internal/tasker/task/sign_transfer.go index f44c2f4..41680fb 100644 --- a/internal/tasker/task/sign_transfer.go +++ b/internal/tasker/task/sign_transfer.go @@ -3,37 +3,25 @@ package task import ( "context" "encoding/json" - "fmt" "math/big" "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common/hexutil" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/enum" - "github.com/grassrootseconomics/w3-celo-patch" "github.com/hibiken/asynq" ) -type ( - TransferPayload struct { - TrackingId string `json:"trackingId"` - From string `json:"from" ` - To string `json:"to"` - VoucherAddress string `json:"voucherAddress"` - Amount uint64 `json:"amount"` - } - - transferEventPayload struct { - DispatchTaskId string `json:"dispatchTaskId"` - OTXId uint `json:"otxId"` - TrackingId string `json:"trackingId"` - TxHash string `json:"txHash"` - } -) +type TransferPayload struct { + TrackingId string `json:"trackingId"` + From string `json:"from" ` + To string `json:"to"` + VoucherAddress string `json:"voucherAddress"` + Amount uint64 `json:"amount"` +} func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { @@ -43,13 +31,13 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er ) if err := json.Unmarshal(t.Payload(), &payload); err != nil { - return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) + return err } lock, err := cu.LockProvider.Obtain( ctx, lockPrefix+payload.From, - cu.SystemContainer.LockTimeout, + lockTimeout, &redislock.Options{ RetryStrategy: lockRetry(), }, @@ -70,26 +58,25 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er } defer func() { if err != nil { - if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { + if nErr := cu.Noncestore.Return(ctx, cu.SystemPublicKey); nErr != nil { err = nErr } } }() - input, err := cu.SystemContainer.Abis["transfer"].EncodeArgs(w3.A(payload.To), new(big.Int).SetUint64(payload.Amount)) + input, err := cu.Abis[custodial.Transfer].EncodeArgs(celoutils.HexToAddress(payload.To), new(big.Int).SetUint64(payload.Amount)) if err != nil { return err } - // TODO: Review gas params. builtTx, err := cu.CeloProvider.SignContractExecutionTx( key, celoutils.ContractExecutionTxOpts{ - ContractAddress: w3.A(payload.VoucherAddress), + ContractAddress: celoutils.HexToAddress(payload.VoucherAddress), InputData: input, GasFeeCap: celoutils.SafeGasFeeCap, GasTipCap: celoutils.SafeGasTipCap, - GasLimit: cu.SystemContainer.TokenTransferGasLimit, + GasLimit: gasLimit, Nonce: nonce, }, ) @@ -118,6 +105,10 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er return err } + if err := cu.PgStore.DecrGasQuota(ctx, payload.From); err != nil { + return err + } + disptachJobPayload, err := json.Marshal(TxPayload{ OtxId: id, Tx: builtTx, @@ -158,20 +149,6 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er return err } - eventPayload := &transferEventPayload{ - OTXId: id, - TrackingId: payload.TrackingId, - TxHash: builtTx.Hash().Hex(), - } - - if err := cu.Pub.Publish( - pub.SignTransfer, - builtTx.Hash().Hex(), - eventPayload, - ); err != nil { - return err - } - return nil } } diff --git a/internal/tasker/task/utils.go b/internal/tasker/task/utils.go index e75aa17..e3c7d77 100644 --- a/internal/tasker/task/utils.go +++ b/internal/tasker/task/utils.go @@ -7,8 +7,11 @@ import ( ) const ( + gasLimit = 250000 + lockPrefix = "lock:" lockRetryDelay = 25 * time.Millisecond + lockTimeout = 1 * time.Second ) // lockRetry will at most try to obtain the lock 20 times within ~0.5s. diff --git a/internal/tasker/types.go b/internal/tasker/types.go index 81f8c28..a321ad8 100644 --- a/internal/tasker/types.go +++ b/internal/tasker/types.go @@ -15,14 +15,10 @@ type Task struct { } const ( - AccountPrepareTask TaskName = "sys:prepare_account" - AccountRegisterTask TaskName = "sys:register_account" - AccountGiftGasTask TaskName = "sys:gift_gas" - AccountGiftVoucherTask TaskName = "sys:gift_token" - AccountRefillGasTask TaskName = "sys:refill_gas" - AccountActivateTask TaskName = "sys:quorum_check" - SignTransferTask TaskName = "usr:sign_transfer" - DispatchTxTask TaskName = "rpc:dispatch" + AccountRegisterTask TaskName = "sys:register_account" + AccountRefillGasTask TaskName = "sys:refill_gas" + SignTransferTask TaskName = "usr:sign_transfer" + DispatchTxTask TaskName = "rpc:dispatch" ) const ( diff --git a/pkg/enum/enum.go b/pkg/enum/enum.go index 5b4e159..5671e95 100644 --- a/pkg/enum/enum.go +++ b/pkg/enum/enum.go @@ -19,9 +19,7 @@ const ( FAIL_UNKNOWN_RPC_ERROR OtxStatus = "FAIL_UNKNOWN_RPC_ERROR" REVERTED OtxStatus = "REVERTED" - GIFT_GAS OtxType = "GIFT_GAS" ACCOUNT_REGISTER OtxType = "ACCOUNT_REGISTER" - GIFT_VOUCHER OtxType = "GIFT_VOUCHER" REFILL_GAS OtxType = "REFILL_GAS" TRANSFER_VOUCHER OtxType = "TRANSFER_VOUCHER" )