From cf1f9f34c315fc706672dab8448216cc27b606ec Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Mon, 20 Feb 2023 09:56:30 +0000 Subject: [PATCH] refactor: task handlers, emitter, tx signer, et.c. * fallback to custom ethereum checksum validator -> https://github.com/go-playground/validator/issues/1073 * decouple jetsream emitter to separate package * refactor task handlers into individual files * add error handler for echo to capture unexpected errors and log them * move handler dependencies into single struct container -> custodialContainer * replace signer to use EIP 1559 signer -> celoutils v1 * Add 1 minutes timeout to all custodial tasks --- cmd/service/api.go | 29 +- cmd/service/init.go | 79 +-- cmd/service/main.go | 52 +- cmd/service/tasker.go | 75 +-- config.toml | 37 +- go.mod | 9 +- go.sum | 13 + internal/api/account.go | 13 +- internal/api/sign.go | 25 +- internal/api/validator.go | 20 +- internal/custodial/custodial.go | 22 + internal/events/events.go | 14 + internal/events/jetstream.go | 87 +++ internal/nonce/redis.go | 6 +- internal/store/dispatch.go | 14 +- internal/store/store.go | 2 +- internal/tasker/client.go | 5 + internal/tasker/server.go | 12 +- internal/tasker/task/account.go | 556 ------------------ internal/tasker/task/account_gift_gas.go | 119 ++++ internal/tasker/task/account_gift_voucher.go | 129 ++++ internal/tasker/task/account_prepare.go | 78 +++ internal/tasker/task/account_refill_gas.go | 134 +++++ .../tasker/task/account_register_onchain.go | 128 ++++ internal/tasker/task/dispatch.go | 108 ---- internal/tasker/task/dispatch_tx.go | 80 +++ internal/tasker/task/sign.go | 182 ------ internal/tasker/task/sign_transfer.go | 167 ++++++ internal/tasker/types.go | 14 +- pkg/logg/asynq.go | 31 +- 30 files changed, 1147 insertions(+), 1093 deletions(-) create mode 100644 internal/custodial/custodial.go create mode 100644 internal/events/events.go create mode 100644 internal/events/jetstream.go delete mode 100644 internal/tasker/task/account.go create mode 100644 internal/tasker/task/account_gift_gas.go create mode 100644 internal/tasker/task/account_gift_voucher.go create mode 100644 internal/tasker/task/account_prepare.go create mode 100644 internal/tasker/task/account_refill_gas.go create mode 100644 internal/tasker/task/account_register_onchain.go delete mode 100644 internal/tasker/task/dispatch.go create mode 100644 internal/tasker/task/dispatch_tx.go delete mode 100644 internal/tasker/task/sign.go create mode 100644 internal/tasker/task/sign_transfer.go diff --git a/cmd/service/api.go b/cmd/service/api.go index 9f83c9b..711e257 100644 --- a/cmd/service/api.go +++ b/cmd/service/api.go @@ -5,14 +5,15 @@ import ( "net/http" "github.com/VictoriaMetrics/metrics" - "github.com/go-playground/validator" + "github.com/go-playground/validator/v10" "github.com/grassrootseconomics/cic-custodial/internal/api" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/hibiken/asynq" "github.com/labstack/echo/v4" ) // Bootstrap API server. -func initApiServer(custodialContainer *custodial) *echo.Echo { +func initApiServer(custodialContainer *custodial.Custodial) *echo.Echo { lo.Debug("api: bootstrapping api server") server := echo.New() server.HideBanner = true @@ -29,6 +30,15 @@ func initApiServer(custodialContainer *custodial) *echo.Echo { return } + if err.(validator.ValidationErrors) != nil { + c.JSON(http.StatusForbidden, api.ErrResp{ + Ok: false, + Code: api.VALIDATION_ERROR, + Message: err.(validator.ValidationErrors).Error(), + }) + return + } + // Log internal server error for further investigation. lo.Error("api:", "path", c.Path(), "err", err) @@ -46,19 +56,16 @@ func initApiServer(custodialContainer *custodial) *echo.Echo { }) } + customValidator := validator.New() + customValidator.RegisterValidation("eth_checksum", api.EthChecksumValidator) + server.Validator = &api.Validator{ - ValidatorProvider: validator.New(), + ValidatorProvider: customValidator, } apiRoute := server.Group("/api") - apiRoute.POST("/account/create", api.CreateAccountHandler( - custodialContainer.keystore, - custodialContainer.taskerClient, - )) - - apiRoute.POST("/sign/transfer", api.SignTransferHandler( - custodialContainer.taskerClient, - )) + apiRoute.POST("/account/create", api.CreateAccountHandler(custodialContainer)) + apiRoute.POST("/sign/transfer", api.SignTransferHandler(custodialContainer)) return server } diff --git a/cmd/service/init.go b/cmd/service/init.go index e070f8d..a2b10fe 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -5,7 +5,8 @@ import ( "time" "github.com/bsm/redislock" - celo "github.com/grassrootseconomics/cic-celo-sdk" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/queries" @@ -20,10 +21,23 @@ import ( "github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/file" - "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) +// Load logger. +func initLogger(debug bool) logf.Logger { + loggOpts := logg.LoggOpts{ + Color: true, + } + + if debug { + loggOpts.Caller = true + loggOpts.Debug = true + } + + return logg.NewLogg(loggOpts) +} + // Load config file. func initConfig(configFilePath string) *koanf.Koanf { var ( @@ -45,34 +59,19 @@ func initConfig(configFilePath string) *koanf.Koanf { return ko } -// Load logger. -func initLogger(debug bool) logf.Logger { - loggOpts := logg.LoggOpts{ - Color: true, - } - - if debug { - loggOpts.Caller = true - loggOpts.Debug = true - } - - return logg.NewLogg(loggOpts) -} - // Load Celo chain provider. -func initCeloProvider() (*celo.Provider, error) { - providerOpts := celo.ProviderOpts{ +func initCeloProvider() (*celoutils.Provider, error) { + providerOpts := celoutils.ProviderOpts{ RpcEndpoint: ko.MustString("chain.rpc_endpoint"), } if ko.Bool("chain.testnet") { - // Devnet = 1337 - providerOpts.ChainId = 1337 + providerOpts.ChainId = celoutils.TestnetChainId } else { - providerOpts.ChainId = celo.MainnetChainId + providerOpts.ChainId = celoutils.MainnetChainId } - provider, err := celo.NewProvider(providerOpts) + provider, err := celoutils.NewProvider(providerOpts) if err != nil { return nil, err } @@ -126,7 +125,7 @@ func initCommonRedisPool() (*redis.RedisPool, error) { // Load SQL statements into struct. func initQueries(queriesPath string) (*queries.Queries, error) { - parsedQueries, err := goyesql.ParseFile(queriesFlag) + parsedQueries, err := goyesql.ParseFile(queriesPath) if err != nil { return nil, err } @@ -150,7 +149,7 @@ func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries) } // Load redis backed noncestore. -func initRedisNoncestore(redisPool *redis.RedisPool, celoProvider *celo.Provider) nonce.Noncestore { +func initRedisNoncestore(redisPool *redis.RedisPool, celoProvider *celoutils.Provider) nonce.Noncestore { return nonce.NewRedisNoncestore(nonce.Opts{ RedisPool: redisPool, CeloProvider: celoProvider, @@ -179,32 +178,16 @@ func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) sto } // Init JetStream context for tasker events. -func initJetStream() (nats.JetStreamContext, error) { - natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint")) +func initJetStream() (events.EventEmitter, error) { + jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{ + ServerUrl: ko.MustString("jetstream.endpoint"), + PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour, + DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour, + }) + if err != nil { return nil, err } - js, err := natsConn.JetStream() - if err != nil { - return nil, err - } - - // Bootstrap stream if it does not exist - stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name")) - if stream == nil { - lo.Info("jetstream: bootstrapping stream") - _, err = js.AddStream(&nats.StreamConfig{ - Name: ko.MustString("jetstream.stream_name"), - MaxAge: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour, - Storage: nats.FileStorage, - Subjects: ko.MustStrings("jetstream.stream_subjects"), - Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour, - }) - if err != nil { - return nil, err - } - } - - return js, nil + return jsEmitter, nil } diff --git a/cmd/service/main.go b/cmd/service/main.go index 9672430..09f5849 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -9,11 +9,7 @@ import ( "sync" "syscall" - "github.com/bsm/redislock" - celo "github.com/grassrootseconomics/cic-celo-sdk" - "github.com/grassrootseconomics/cic-custodial/internal/keystore" - "github.com/grassrootseconomics/cic-custodial/internal/nonce" - "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/knadh/koanf" "github.com/labstack/echo/v4" @@ -29,16 +25,6 @@ var ( ko *koanf.Koanf ) -type custodial struct { - celoProvider *celo.Provider - keystore keystore.Keystore - lockProvider *redislock.Client - noncestore nonce.Noncestore - pgStore store.Store - systemContainer *tasker.SystemContainer - taskerClient *tasker.TaskerClient -} - func init() { flag.StringVar(&confFlag, "config", "config.toml", "Config file location") flag.BoolVar(&debugFlag, "log", false, "Enable debug logging") @@ -47,28 +33,28 @@ func init() { lo = initLogger(debugFlag) ko = initConfig(confFlag) + } func main() { var ( tasker *tasker.TaskerServer apiServer *echo.Echo - wg sync.WaitGroup ) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - celoProvider, err := initCeloProvider() - if err != nil { - lo.Fatal("main: critical error loading chain provider", "error", err) - } - queries, err := initQueries(queriesFlag) if err != nil { lo.Fatal("main: critical error loading SQL queries", "error", err) } + celoProvider, err := initCeloProvider() + if err != nil { + lo.Fatal("main: critical error loading chain provider", "error", err) + } + postgresPool, err := initPostgresPool() if err != nil { lo.Fatal("main: critical error connecting to postgres", "error", err) @@ -89,6 +75,11 @@ func main() { lo.Fatal("main: critical error loading keystore") } + jsEventEmitter, err := initJetStream() + if err != nil { + lo.Fatal("main: critical error loading jetstream event emitter") + } + pgStore := initPostgresStore(postgresPool, queries) redisNoncestore := initRedisNoncestore(redisPool, celoProvider) lockProvider := initLockProvider(redisPool.Client) @@ -99,16 +90,19 @@ func main() { lo.Fatal("main: critical error bootstrapping system container", "error", err) } - custodial := &custodial{ - celoProvider: celoProvider, - keystore: postgresKeystore, - lockProvider: lockProvider, - noncestore: redisNoncestore, - pgStore: pgStore, - systemContainer: systemContainer, - taskerClient: taskerClient, + custodial := &custodial.Custodial{ + CeloProvider: celoProvider, + EventEmitter: jsEventEmitter, + Keystore: postgresKeystore, + LockProvider: lockProvider, + Noncestore: redisNoncestore, + PgStore: pgStore, + SystemContainer: systemContainer, + TaskerClient: taskerClient, } + wg := &sync.WaitGroup{} + apiServer = initApiServer(custodial) wg.Add(1) go func() { diff --git a/cmd/service/tasker.go b/cmd/service/tasker.go index b402abc..c8236b1 100644 --- a/cmd/service/tasker.go +++ b/cmd/service/tasker.go @@ -1,6 +1,7 @@ package main import ( + "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker/task" "github.com/grassrootseconomics/cic-custodial/pkg/redis" @@ -8,75 +9,25 @@ import ( ) // Load tasker handlers, injecting any necessary handler dependencies from the system container. -func initTasker(custodialContainer *custodial, redisPool *redis.RedisPool) *tasker.TaskerServer { +func initTasker(custodialContainer *custodial.Custodial, redisPool *redis.RedisPool) *tasker.TaskerServer { lo.Debug("Bootstrapping tasker") - js, err := initJetStream() - if err != nil { - lo.Fatal("filters: critical error loading jetstream", "error", err) - } taskerServerOpts := tasker.TaskerServerOpts{ - Concurrency: ko.MustInt("asynq.worker_count"), - Logg: lo, - LogLevel: asynq.ErrorLevel, - RedisPool: redisPool, - SystemContainer: custodialContainer.systemContainer, - TaskerClient: custodialContainer.taskerClient, - } - - if debugFlag { - taskerServerOpts.LogLevel = asynq.DebugLevel + Concurrency: ko.MustInt("asynq.worker_count"), + Logg: lo, + LogLevel: asynq.InfoLevel, + RedisPool: redisPool, } taskerServer := tasker.NewTaskerServer(taskerServerOpts) - taskerServer.RegisterHandlers(tasker.PrepareAccountTask, task.PrepareAccount( - custodialContainer.noncestore, - custodialContainer.taskerClient, - js, - )) - taskerServer.RegisterHandlers(tasker.RegisterAccountOnChain, task.RegisterAccountOnChainProcessor( - custodialContainer.celoProvider, - custodialContainer.lockProvider, - custodialContainer.noncestore, - custodialContainer.pgStore, - custodialContainer.systemContainer, - custodialContainer.taskerClient, - js, - )) - taskerServer.RegisterHandlers(tasker.GiftGasTask, task.GiftGasProcessor( - custodialContainer.celoProvider, - custodialContainer.lockProvider, - custodialContainer.noncestore, - custodialContainer.pgStore, - custodialContainer.systemContainer, - custodialContainer.taskerClient, - js, - )) - taskerServer.RegisterHandlers(tasker.GiftTokenTask, task.GiftTokenProcessor( - custodialContainer.celoProvider, - custodialContainer.lockProvider, - custodialContainer.noncestore, - custodialContainer.pgStore, - custodialContainer.systemContainer, - custodialContainer.taskerClient, - js, - )) - taskerServer.RegisterHandlers(tasker.SignTransferTask, task.SignTransfer( - custodialContainer.celoProvider, - custodialContainer.keystore, - custodialContainer.lockProvider, - custodialContainer.noncestore, - custodialContainer.pgStore, - custodialContainer.systemContainer, - custodialContainer.taskerClient, - js, - )) - taskerServer.RegisterHandlers(tasker.TxDispatchTask, task.TxDispatch( - custodialContainer.celoProvider, - custodialContainer.pgStore, - js, - )) + taskerServer.RegisterHandlers(tasker.AccountPrepareTask, task.AccountPrepare(custodialContainer)) + taskerServer.RegisterHandlers(tasker.AccountRegisterTask, task.AccountRegisterOnChainProcessor(custodialContainer)) + taskerServer.RegisterHandlers(tasker.AccountGiftGasTask, task.AccountGiftGasProcessor(custodialContainer)) + taskerServer.RegisterHandlers(tasker.AccountGiftVoucherTask, task.GiftVoucherProcessor(custodialContainer)) + taskerServer.RegisterHandlers(tasker.AccountRefillGasTask, task.AccountRefillGasProcessor(custodialContainer)) + taskerServer.RegisterHandlers(tasker.SignTransferTask, task.SignTransfer(custodialContainer)) + taskerServer.RegisterHandlers(tasker.DispatchTxTask, task.DispatchTx(custodialContainer)) return taskerServer } diff --git a/config.toml b/config.toml index 1d5440e..8a10226 100644 --- a/config.toml +++ b/config.toml @@ -4,30 +4,30 @@ address = ":5005" # /metrics endpoint metrics = true +[chain] +rpc_endpoint = "https://rpc.alfajores.celo.grassecon.net" +testnet = true + # System default values # Valus are in wei unless otherwise stated [system] # The giftable token is a training voucher # Every new user is given 5 DGFT -gas_faucet = "0xA8b3Ffc715e85792FB361BDee9357B38D5A4a6cF" -giftable_token_address = "0xdD4F5ea484F6b16f031eF7B98F3810365493BC20" +gas_faucet = "0xf2a1fc19Ad275A0EAe3445798761FeD1Eea725d5" +giftable_token_address = "0xB92463E2262E700e29c16416270c9Fdfa17934D7" giftable_token_value = 5000000 -gas_refill_threshold = 100000000000000000 -gas_refill_value = 100000000000000000 +gas_refill_threshold = 20000000000000000 +gas_refill_value = 10000000000000000 # Every custodial account is given 2 KES worth of CELO -giftable_gas_value = 2000000000000000000 +giftable_gas_value = 10000000000000000 # System private key # Should always be toped up -private_key = "bfa7222a7bea3bde312434abe819b14cf3bc8703ceaabb98a9e3a97ceb0b79fd" +private_key = "95f04bc8321cbf693db9cc2bc063411bbd8bee0e4a03dea096c755d8bcca42c6" lock_prefix = "lock:" -public_key = "0x08eb3a90128D5874da54cf654fCfA88cEd1bb047" +public_key = "0x0577AE5A3547AC753aC499E282557CE0275a4942" token_decimals = 6 token_transfer_gas_limit = 200000 -account_index = "0xdb2550ac5E52A54B6189FFAf17ECfF33AE190db9" - -[chain] -rpc_endpoint = "http://192.168.0.101:8545" -testnet = true +account_index = "0x1e041282695C66944BfC53cabce947cf35CEaf87" [postgres] debug = false @@ -47,18 +47,7 @@ task_retention_hrs = 24 # https://docs.nats.io/ [jetstream] endpoint = "nats://localhost:4222" -stream_name = "CUSTODIAL" # Duration JetStream should keep the message before remocing it from the persistent store persist_duration_hours = 48 # Duration to ignore duplicate transactions (e.g. due to restart) -dedup_duration_hours = 6 -# Stream subjects -stream_subjects = [ - "CUSTODIAL.accountNewNonce", - "CUSTODIAL.accountRegister", - "CUSTODIAL.giftNewAccountGas", - "CUSTODIAL.giftNewAccountVoucher", - "CUSTODIAL.dispatchFail", - "CUSTODIAL.dispatchSuccess", - "CUSTODIAL.transferSign" -] \ No newline at end of file +dedup_duration_hours = 6 \ No newline at end of file diff --git a/go.mod b/go.mod index 074f42a..82b1580 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/hibiken/asynq v0.24.0 github.com/jackc/pgx/v5 v5.3.0 github.com/knadh/goyesql/v2 v2.2.0 - github.com/knadh/koanf v1.4.5 + github.com/knadh/koanf v1.5.0 github.com/labstack/echo/v4 v4.10.0 github.com/nats-io/nats.go v1.23.0 github.com/zerodha/logf v0.5.5 @@ -36,13 +36,15 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-ole/go-ole v1.2.6 // indirect - github.com/go-playground/locales v0.14.0 // indirect - github.com/go-playground/universal-translator v0.18.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.11.2 // indirect github.com/go-stack/stack v1.8.1 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/grassrootseconomics/celoutils v1.0.0 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hdevalence/ed25519consensus v0.1.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect @@ -52,6 +54,7 @@ require ( github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect + github.com/knadh/koanf/v2 v2.0.0 // indirect github.com/labstack/gommon v0.4.0 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/go.sum b/go.sum index a2bdc31..ff12d09 100644 --- a/go.sum +++ b/go.sum @@ -200,10 +200,16 @@ github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34 github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator v9.31.0+incompatible h1:UA72EPEogEnq76ehGdEDp4Mit+3FDh548oRqwVgNsHA= github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig= +github.com/go-playground/validator/v10 v10.11.2 h1:q3SHpufmypg+erIExEKUmsgmhDTyhcJ38oeKGACXohU= +github.com/go-playground/validator/v10 v10.11.2/go.mod h1:NieE624vt4SCTJtD87arVLvdmjPAeV8BQlHtMnw9D7s= github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= @@ -285,6 +291,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= +github.com/grassrootseconomics/celoutils v1.0.0 h1:+pfiNZA+PJczmpN7RsZ/sr9PM0jKQQHxB6y/Ix3AFwA= +github.com/grassrootseconomics/celoutils v1.0.0/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q= github.com/grassrootseconomics/cic-celo-sdk v0.4.0 h1:wh7aOQ/oK+q1nBl2koKe45WVbsWM0riPhhtEz6JLub4= github.com/grassrootseconomics/cic-celo-sdk v0.4.0/go.mod h1:G8uRw+rEw6yVP/+vBZ2V0UWXfs6iioit+eqVHrB9sBk= github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U= @@ -403,6 +411,11 @@ github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI= github.com/knadh/koanf v1.4.5 h1:yKWFswTrqFc0u7jBAoERUz30+N1b1yPXU01gAPr8IrY= github.com/knadh/koanf v1.4.5/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= +github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= +github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/v2 v2.0.0 h1:XPQ5ilNnwnNaHrfQ1YpTVhUAjcGHnEKA+lRpipQv02Y= +github.com/knadh/koanf/v2 v2.0.0/go.mod h1:ZeiIlIDXTE7w1lMT6UVcNiRAS2/rCeLn/GdLNvY1Dus= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= diff --git a/internal/api/account.go b/internal/api/account.go index 7aeb756..d72451b 100644 --- a/internal/api/account.go +++ b/internal/api/account.go @@ -5,7 +5,7 @@ import ( "net/http" "github.com/google/uuid" - "github.com/grassrootseconomics/cic-custodial/internal/keystore" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker/task" "github.com/grassrootseconomics/cic-custodial/pkg/keypair" @@ -15,10 +15,7 @@ import ( // CreateAccountHandler route. // POST: /api/account/create // Returns the public key. -func CreateAccountHandler( - keystore keystore.Keystore, - taskerClient *tasker.TaskerClient, -) func(echo.Context) error { +func CreateAccountHandler(cu *custodial.Custodial) func(echo.Context) error { return func(c echo.Context) error { trackingId := uuid.NewString() @@ -27,7 +24,7 @@ func CreateAccountHandler( return err } - id, err := keystore.WriteKeyPair(c.Request().Context(), generatedKeyPair) + id, err := cu.Keystore.WriteKeyPair(c.Request().Context(), generatedKeyPair) if err != nil { return err } @@ -40,8 +37,8 @@ func CreateAccountHandler( return err } - _, err = taskerClient.CreateTask( - tasker.PrepareAccountTask, + _, err = cu.TaskerClient.CreateTask( + tasker.AccountPrepareTask, tasker.DefaultPriority, &tasker.Task{ Id: trackingId, diff --git a/internal/api/sign.go b/internal/api/sign.go index 78d4397..92260e8 100644 --- a/internal/api/sign.go +++ b/internal/api/sign.go @@ -5,30 +5,30 @@ import ( "net/http" "github.com/google/uuid" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/grassrootseconomics/cic-custodial/internal/tasker/task" "github.com/labstack/echo/v4" ) // SignTxHandler route. // POST: /api/sign/transfer // JSON Body: -// trackingId -> Unique string // from -> ETH address // to -> ETH address // voucherAddress -> ETH address // amount -> int (6 d.p. precision) // e.g. 1000000 = 1 VOUCHER // Returns the task id. -func SignTransferHandler( - taskerClient *tasker.TaskerClient, -) func(echo.Context) error { +func SignTransferHandler(cu *custodial.Custodial) func(echo.Context) error { return func(c echo.Context) error { trackingId := uuid.NewString() var transferRequest struct { - From string `json:"from" validate:"required,eth_addr"` - To string `json:"to" validate:"required,eth_addr"` - VoucherAddress string `json:"voucherAddress" validate:"required,eth_addr"` + From string `json:"from" validate:"required,eth_checksum"` + To string `json:"to" validate:"required,eth_checksum"` + VoucherAddress string `json:"voucherAddress" validate:"required,eth_checksum"` Amount int64 `json:"amount" validate:"required,numeric"` } @@ -40,12 +40,19 @@ func SignTransferHandler( return err } - taskPayload, err := json.Marshal(transferRequest) + // TODO: Checksum addresses + taskPayload, err := json.Marshal(task.TransferPayload{ + TrackingId: trackingId, + From: transferRequest.From, + To: transferRequest.To, + VoucherAddress: transferRequest.VoucherAddress, + Amount: transferRequest.Amount, + }) if err != nil { return err } - _, err = taskerClient.CreateTask( + _, err = cu.TaskerClient.CreateTask( tasker.SignTransferTask, tasker.HighPriority, &tasker.Task{ diff --git a/internal/api/validator.go b/internal/api/validator.go index b2be293..6e5edfb 100644 --- a/internal/api/validator.go +++ b/internal/api/validator.go @@ -1,10 +1,8 @@ package api import ( - "net/http" - - "github.com/go-playground/validator" - "github.com/labstack/echo/v4" + "github.com/celo-org/celo-blockchain/common" + "github.com/go-playground/validator/v10" ) type Validator struct { @@ -13,10 +11,16 @@ type Validator struct { func (v *Validator) Validate(i interface{}) error { if err := v.ValidatorProvider.Struct(i); err != nil { - return echo.NewHTTPError(http.StatusBadRequest, ErrResp{ - Ok: false, - Code: VALIDATION_ERROR, - }) + return err } return nil } + +func EthChecksumValidator(fl validator.FieldLevel) bool { + addr, err := common.NewMixedcaseAddressFromString(fl.Field().String()) + if err != nil { + return false + } + + return addr.ValidChecksum() +} diff --git a/internal/custodial/custodial.go b/internal/custodial/custodial.go new file mode 100644 index 0000000..35a2c81 --- /dev/null +++ b/internal/custodial/custodial.go @@ -0,0 +1,22 @@ +package custodial + +import ( + "github.com/bsm/redislock" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/keystore" + "github.com/grassrootseconomics/cic-custodial/internal/nonce" + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/internal/tasker" +) + +type Custodial struct { + CeloProvider *celoutils.Provider + EventEmitter events.EventEmitter + Keystore keystore.Keystore + LockProvider *redislock.Client + Noncestore nonce.Noncestore + PgStore store.Store + SystemContainer *tasker.SystemContainer + TaskerClient *tasker.TaskerClient +} diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 0000000..10ff251 --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,14 @@ +package events + +type EventEmitter interface { + Close() + Publish(subject string, dedupId string, eventPayload interface{}) error +} + +type ( + EventPayload struct { + OtxId uint `json:"otxId"` + TrackingId string `json:"trackingId"` + TxHash string `json:"txHash"` + } +) diff --git a/internal/events/jetstream.go b/internal/events/jetstream.go new file mode 100644 index 0000000..f258274 --- /dev/null +++ b/internal/events/jetstream.go @@ -0,0 +1,87 @@ +package events + +import ( + "encoding/json" + "time" + + "github.com/nats-io/nats.go" +) + +const ( + StreamName string = "CUSTODIAL" + StreamSubjects string = "CUSTODIAL.*" + // Subjects + AccountNewNonce string = "CUSTODIAL.accountNewNonce" + AccountRegister string = "CUSTODIAL.accountRegister" + AccountGiftGas string = "CUSTODIAL.systemNewAccountGas" + AccountGiftVoucher string = "CUSTODIAL.systemNewAccountVoucher" + AccountRefillGas string = "CUSTODIAL.systemRefillAccountGas" + DispatchFail string = "CUSTODIAL.dispatchFail" + DispatchSuccess string = "CUSTODIAL.dispatchSuccess" + SignTransfer string = "CUSTODIAL.signTransfer" +) + +type JetStreamOpts struct { + ServerUrl string + PersistDuration time.Duration + DedupDuration time.Duration +} + +type JetStream struct { + jsCtx nats.JetStreamContext + nc *nats.Conn +} + +func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) { + natsConn, err := nats.Connect(o.ServerUrl) + if err != nil { + return nil, err + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + + // Bootstrap stream if it doesn't exist. + stream, _ := js.StreamInfo(StreamName) + if stream == nil { + _, err = js.AddStream(&nats.StreamConfig{ + Name: StreamName, + MaxAge: o.PersistDuration, + Storage: nats.FileStorage, + Subjects: []string{StreamSubjects}, + Duplicates: o.DedupDuration, + }) + if err != nil { + return nil, err + } + } + + return &JetStream{ + jsCtx: js, + nc: natsConn, + }, nil +} + +// Close gracefully shutdowns the JetStream connection. +func (js *JetStream) Close() { + if js.nc != nil { + js.nc.Close() + } +} + +// Publish publishes the JSON data to the NATS stream. +func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error { + jsonData, err := json.Marshal(eventPayload) + if err != nil { + return err + } + + _, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId)) + if err != nil { + return err + } + + return nil +} diff --git a/internal/nonce/redis.go b/internal/nonce/redis.go index 56a9fe8..e7705e5 100644 --- a/internal/nonce/redis.go +++ b/internal/nonce/redis.go @@ -3,7 +3,7 @@ package nonce import ( "context" - celo "github.com/grassrootseconomics/cic-celo-sdk" + "github.com/grassrootseconomics/celoutils" redispool "github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/grassrootseconomics/w3-celo-patch" "github.com/grassrootseconomics/w3-celo-patch/module/eth" @@ -11,12 +11,12 @@ import ( type Opts struct { RedisPool *redispool.RedisPool - CeloProvider *celo.Provider + CeloProvider *celoutils.Provider } // RedisNoncestore implements `Noncestore` type RedisNoncestore struct { - chainProvider *celo.Provider + chainProvider *celoutils.Provider redis *redispool.RedisPool } diff --git a/internal/store/dispatch.go b/internal/store/dispatch.go index 6679783..a584e48 100644 --- a/internal/store/dispatch.go +++ b/internal/store/dispatch.go @@ -6,19 +6,15 @@ import ( type Status string -func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) (uint, error) { - var ( - id uint - ) - - if err := s.db.QueryRow( +func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error { + if _, err := s.db.Exec( ctx, s.queries.CreateDispatchStatus, dispatch.OtxId, dispatch.Status, - ).Scan(&id); err != nil { - return id, err + ); err != nil { + return err } - return id, nil + return nil } diff --git a/internal/store/store.go b/internal/store/store.go index 621a40d..12f6db7 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -27,6 +27,6 @@ type ( // OTX (Custodial originating transactions). CreateOTX(ctx context.Context, otx OTX) (id uint, err error) // Dispatch status. - CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) (id uint, err error) + CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error } ) diff --git a/internal/tasker/client.go b/internal/tasker/client.go index f29be13..6a44236 100644 --- a/internal/tasker/client.go +++ b/internal/tasker/client.go @@ -8,6 +8,10 @@ import ( "github.com/hibiken/asynq" ) +const ( + taskTimeout = 60 +) + type TaskerClientOpts struct { RedisPool *redis.RedisPool TaskRetention time.Duration @@ -35,6 +39,7 @@ func (c *TaskerClient) CreateTask(taskName TaskName, queueName QueueName, task * asynq.Queue(string(queueName)), asynq.TaskID(task.Id), asynq.Retention(c.taskRetention), + asynq.Timeout(taskTimeout*time.Second), ) taskInfo, err := c.Client.Enqueue(qTask) diff --git a/internal/tasker/server.go b/internal/tasker/server.go index 4009f04..1d30156 100644 --- a/internal/tasker/server.go +++ b/internal/tasker/server.go @@ -5,7 +5,6 @@ import ( "time" "github.com/bsm/redislock" - "github.com/grassrootseconomics/cic-custodial/pkg/logg" "github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/hibiken/asynq" "github.com/zerodha/logf" @@ -17,12 +16,10 @@ const ( ) type TaskerServerOpts struct { - Concurrency int - Logg logf.Logger - LogLevel asynq.LogLevel - RedisPool *redis.RedisPool - SystemContainer *SystemContainer - TaskerClient *TaskerClient + Concurrency int + Logg logf.Logger + LogLevel asynq.LogLevel + RedisPool *redis.RedisPool } type TaskerServer struct { @@ -36,7 +33,6 @@ func NewTaskerServer(o TaskerServerOpts) *TaskerServer { asynq.Config{ Concurrency: o.Concurrency, IsFailure: expectedFailures, - Logger: logg.NewAsynqLogg(o.Logg), LogLevel: o.LogLevel, Queues: map[string]int{ string(HighPriority): 5, diff --git a/internal/tasker/task/account.go b/internal/tasker/task/account.go deleted file mode 100644 index 2a8ae1a..0000000 --- a/internal/tasker/task/account.go +++ /dev/null @@ -1,556 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - "fmt" - "math/big" - - "github.com/bsm/redislock" - "github.com/celo-org/celo-blockchain/common/hexutil" - celo "github.com/grassrootseconomics/cic-celo-sdk" - "github.com/grassrootseconomics/cic-custodial/internal/nonce" - "github.com/grassrootseconomics/cic-custodial/internal/store" - "github.com/grassrootseconomics/cic-custodial/internal/tasker" - "github.com/grassrootseconomics/w3-celo-patch" - "github.com/grassrootseconomics/w3-celo-patch/module/eth" - "github.com/hibiken/asynq" - "github.com/nats-io/nats.go" -) - -type ( - AccountPayload struct { - PublicKey string `json:"publicKey"` - TrackingId string `json:"trackingId"` - } - - accountEventPayload struct { - TrackingId string `json:"trackingId"` - } -) - -func PrepareAccount( - noncestore nonce.Noncestore, - taskerClient *tasker.TaskerClient, - js nats.JetStreamContext, -) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - p AccountPayload - ) - - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return err - } - - if err := noncestore.SetNewAccountNonce(ctx, p.PublicKey); err != nil { - return err - } - - _, err := taskerClient.CreateTask( - tasker.RegisterAccountOnChain, - tasker.DefaultPriority, - &tasker.Task{ - Payload: t.Payload(), - }, - ) - if err != nil { - return err - } - - _, err = taskerClient.CreateTask( - tasker.GiftTokenTask, - tasker.DefaultPriority, - &tasker.Task{ - Payload: t.Payload(), - }, - ) - if err != nil { - return err - } - - eventPayload := &accountEventPayload{ - TrackingId: p.TrackingId, - } - - eventJson, err := json.Marshal(eventPayload) - if err != nil { - return err - } - - _, err = js.Publish("CUSTODIAL.accountNewNonce", eventJson) - if err != nil { - return err - } - - return nil - } -} - -func RegisterAccountOnChainProcessor( - celoProvider *celo.Provider, - lockProvider *redislock.Client, - noncestore nonce.Noncestore, - pg store.Store, - system *tasker.SystemContainer, - taskerClient *tasker.TaskerClient, - js nats.JetStreamContext, -) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - p AccountPayload - ) - - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return err - } - - lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil) - if err != nil { - return err - } - defer lock.Release(ctx) - - nonce, err := noncestore.Acquire(ctx, system.PublicKey) - if err != nil { - return err - } - - input, err := system.Abis["add"].EncodeArgs(w3.A(p.PublicKey)) - if err != nil { - return err - } - - // TODO: Review gas params. - builtTx, err := celoProvider.SignContractExecutionTx( - system.PrivateKey, - celo.ContractExecutionTxOpts{ - ContractAddress: system.AccountIndexContract, - InputData: input, - GasPrice: big.NewInt(20000000000), - GasLimit: system.TokenTransferGasLimit, - Nonce: nonce, - }, - ) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - return err - } - - rawTx, err := builtTx.MarshalBinary() - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - id, err := pg.CreateOTX(ctx, store.OTX{ - TrackingId: p.TrackingId, - Type: "ACCOUNT_REGISTER", - RawTx: hexutil.Encode(rawTx), - TxHash: builtTx.Hash().Hex(), - From: system.PublicKey, - Data: hexutil.Encode(builtTx.Data()), - GasPrice: builtTx.GasPrice().Uint64(), - Nonce: builtTx.Nonce(), - }) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - disptachJobPayload, err := json.Marshal(TxPayload{ - OtxId: id, - Tx: builtTx, - }) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - _, err = taskerClient.CreateTask( - tasker.TxDispatchTask, - tasker.HighPriority, - &tasker.Task{ - Payload: disptachJobPayload, - }, - ) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - _, err = taskerClient.CreateTask( - tasker.GiftGasTask, - tasker.DefaultPriority, - &tasker.Task{ - Payload: t.Payload(), - }, - ) - if err != nil { - return err - } - - eventPayload := &accountEventPayload{ - TrackingId: p.TrackingId, - } - - eventJson, err := json.Marshal(eventPayload) - if err != nil { - return err - } - - _, err = js.Publish("CUSTODIAL.accountRegister", eventJson) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - return nil - } -} - -func GiftGasProcessor( - celoProvider *celo.Provider, - lockProvider *redislock.Client, - noncestore nonce.Noncestore, - pg store.Store, - system *tasker.SystemContainer, - taskerClient *tasker.TaskerClient, - js nats.JetStreamContext, -) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - p AccountPayload - ) - - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return err - } - - lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil) - if err != nil { - return err - } - defer lock.Release(ctx) - - nonce, err := noncestore.Acquire(ctx, system.PublicKey) - if err != nil { - return err - } - - // TODO: Review gas params - builtTx, err := celoProvider.SignGasTransferTx( - system.PrivateKey, - celo.GasTransferTxOpts{ - To: w3.A(p.PublicKey), - Nonce: nonce, - Value: system.GiftableGasValue, - GasPrice: celo.FixedMinGas, - }, - ) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - rawTx, err := builtTx.MarshalBinary() - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - id, err := pg.CreateOTX(ctx, store.OTX{ - TrackingId: p.TrackingId, - Type: "GIFT_GAS", - RawTx: hexutil.Encode(rawTx), - TxHash: builtTx.Hash().Hex(), - From: system.PublicKey, - Data: hexutil.Encode(builtTx.Data()), - GasPrice: builtTx.GasPrice().Uint64(), - Nonce: builtTx.Nonce(), - }) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - disptachJobPayload, err := json.Marshal(TxPayload{ - OtxId: id, - Tx: builtTx, - }) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - _, err = taskerClient.CreateTask( - tasker.TxDispatchTask, - tasker.HighPriority, - &tasker.Task{ - Payload: disptachJobPayload, - }, - ) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - eventPayload := &accountEventPayload{ - TrackingId: p.TrackingId, - } - - eventJson, err := json.Marshal(eventPayload) - if err != nil { - return err - } - - _, err = js.Publish("CUSTODIAL.giftNewAccountGas", eventJson) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - return nil - } -} - -func GiftTokenProcessor( - celoProvider *celo.Provider, - lockProvider *redislock.Client, - noncestore nonce.Noncestore, - pg store.Store, - system *tasker.SystemContainer, - taskerClient *tasker.TaskerClient, - js nats.JetStreamContext, -) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - p AccountPayload - ) - - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return err - } - - lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil) - if err != nil { - return err - } - defer lock.Release(ctx) - - nonce, err := noncestore.Acquire(ctx, system.PublicKey) - if err != nil { - return err - } - - input, err := system.Abis["mintTo"].EncodeArgs(w3.A(p.PublicKey), system.GiftableTokenValue) - if err != nil { - return err - } - - // TODO: Review gas params. - builtTx, err := celoProvider.SignContractExecutionTx( - system.PrivateKey, - celo.ContractExecutionTxOpts{ - ContractAddress: system.GiftableToken, - InputData: input, - GasPrice: big.NewInt(20000000000), - GasLimit: system.TokenTransferGasLimit, - Nonce: nonce, - }, - ) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - return err - } - - rawTx, err := builtTx.MarshalBinary() - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - id, err := pg.CreateOTX(ctx, store.OTX{ - TrackingId: p.TrackingId, - Type: "GIFT_VOUCHER", - RawTx: hexutil.Encode(rawTx), - TxHash: builtTx.Hash().Hex(), - From: system.PublicKey, - Data: hexutil.Encode(builtTx.Data()), - GasPrice: builtTx.GasPrice().Uint64(), - Nonce: builtTx.Nonce(), - }) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - disptachJobPayload, err := json.Marshal(TxPayload{ - OtxId: id, - Tx: builtTx, - }) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - _, err = taskerClient.CreateTask( - tasker.TxDispatchTask, - tasker.HighPriority, - &tasker.Task{ - Payload: disptachJobPayload, - }, - ) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - eventPayload := &accountEventPayload{ - TrackingId: p.TrackingId, - } - - eventJson, err := json.Marshal(eventPayload) - if err != nil { - return err - } - - _, err = js.Publish("CUSTODIAL.giftNewAccountVoucher", eventJson) - if err != nil { - if err := noncestore.Return(ctx, system.PublicKey); err != nil { - return err - } - - return err - } - - return nil - } -} - -// TODO: https://github.com/grassrootseconomics/cic-custodial/issues/43 -// TODO: -func RefillGasProcessor( - celoProvider *celo.Provider, - nonceProvider nonce.Noncestore, - lockProvider *redislock.Client, - system *tasker.SystemContainer, - taskerClient *tasker.TaskerClient, -) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - p AccountPayload - balance big.Int - ) - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - - if err := celoProvider.Client.CallCtx( - ctx, - eth.Balance(w3.A(p.PublicKey), nil).Returns(&balance), - ); err != nil { - return err - } - - if belowThreshold := balance.Cmp(system.GasRefillThreshold); belowThreshold > 0 { - return nil - } - - lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil) - if err != nil { - return err - } - defer lock.Release(ctx) - - nonce, err := nonceProvider.Acquire(ctx, system.PublicKey) - if err != nil { - return err - } - - builtTx, err := celoProvider.SignGasTransferTx( - system.PrivateKey, - celo.GasTransferTxOpts{ - To: w3.A(p.PublicKey), - Nonce: nonce, - Value: system.GasRefillValue, - GasPrice: celo.FixedMinGas, - }, - ) - if err != nil { - if err := nonceProvider.Return(ctx, p.PublicKey); err != nil { - return err - } - return fmt.Errorf("nonce.Return failed: %v: %w", err, asynq.SkipRetry) - } - - disptachJobPayload, err := json.Marshal(TxPayload{ - Tx: builtTx, - }) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v: %w", err, asynq.SkipRetry) - } - - _, err = taskerClient.CreateTask( - tasker.TxDispatchTask, - tasker.HighPriority, - &tasker.Task{ - Payload: disptachJobPayload, - }, - ) - if err != nil { - return err - } - - return nil - } -} diff --git a/internal/tasker/task/account_gift_gas.go b/internal/tasker/task/account_gift_gas.go new file mode 100644 index 0000000..c9a8e55 --- /dev/null +++ b/internal/tasker/task/account_gift_gas.go @@ -0,0 +1,119 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/hibiken/asynq" +) + +func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, t *asynq.Task) error { + var ( + payload AccountPayload + ) + + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) + } + + lock, err := cu.LockProvider.Obtain( + ctx, + cu.SystemContainer.LockPrefix+cu.SystemContainer.PublicKey, + cu.SystemContainer.LockTimeout, + nil, + ) + if err != nil { + return err + } + defer lock.Release(ctx) + + nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey) + if err != nil { + return err + } + defer func() { + if err != nil { + if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { + err = nErr + } + } + }() + + builtTx, err := cu.CeloProvider.SignGasTransferTx( + cu.SystemContainer.PrivateKey, + celoutils.GasTransferTxOpts{ + To: w3.A(payload.PublicKey), + Nonce: nonce, + Value: cu.SystemContainer.GiftableGasValue, + GasFeeCap: celoutils.SafeGasFeeCap, + GasTipCap: celoutils.SafeGasTipCap, + }, + ) + if err != nil { + return err + } + + rawTx, err := builtTx.MarshalBinary() + if err != nil { + return err + } + + id, err := cu.PgStore.CreateOTX(ctx, store.OTX{ + TrackingId: payload.TrackingId, + Type: "GIFT_GAS", + RawTx: hexutil.Encode(rawTx), + TxHash: builtTx.Hash().Hex(), + From: cu.SystemContainer.PublicKey, + Data: hexutil.Encode(builtTx.Data()), + GasPrice: builtTx.GasPrice().Uint64(), + Nonce: builtTx.Nonce(), + }) + if err != nil { + return err + } + + disptachJobPayload, err := json.Marshal(TxPayload{ + OtxId: id, + Tx: builtTx, + }) + if err != nil { + return err + } + + _, err = cu.TaskerClient.CreateTask( + tasker.DispatchTxTask, + tasker.HighPriority, + &tasker.Task{ + Payload: disptachJobPayload, + }, + ) + if err != nil { + return err + } + + eventPayload := &events.EventPayload{ + OtxId: id, + TrackingId: payload.TrackingId, + TxHash: builtTx.Hash().Hex(), + } + + if err := cu.EventEmitter.Publish( + events.AccountGiftGas, + builtTx.Hash().Hex(), + eventPayload, + ); err != nil { + return err + } + + return nil + } +} diff --git a/internal/tasker/task/account_gift_voucher.go b/internal/tasker/task/account_gift_voucher.go new file mode 100644 index 0000000..6f1dbe0 --- /dev/null +++ b/internal/tasker/task/account_gift_voucher.go @@ -0,0 +1,129 @@ +package task + +import ( + "context" + "encoding/json" + + "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/hibiken/asynq" +) + +func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, t *asynq.Task) error { + var ( + payload AccountPayload + ) + + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + return err + } + + lock, err := cu.LockProvider.Obtain( + ctx, + cu.SystemContainer.LockPrefix+cu.SystemContainer.PublicKey, + cu.SystemContainer.LockTimeout, + nil, + ) + if err != nil { + return err + } + defer lock.Release(ctx) + + nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey) + if err != nil { + return err + } + defer func() { + if err != nil { + if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { + err = nErr + } + } + }() + + input, err := cu.SystemContainer.Abis["mintTo"].EncodeArgs( + w3.A(payload.PublicKey), + cu.SystemContainer.GiftableTokenValue, + ) + if err != nil { + return err + } + + builtTx, err := cu.CeloProvider.SignContractExecutionTx( + cu.SystemContainer.PrivateKey, + celoutils.ContractExecutionTxOpts{ + ContractAddress: cu.SystemContainer.GiftableToken, + InputData: input, + GasFeeCap: celoutils.SafeGasFeeCap, + GasTipCap: celoutils.SafeGasTipCap, + GasLimit: cu.SystemContainer.TokenTransferGasLimit, + Nonce: nonce, + }, + ) + if err != nil { + return err + } + + rawTx, err := builtTx.MarshalBinary() + if err != nil { + return err + } + + id, err := cu.PgStore.CreateOTX(ctx, store.OTX{ + TrackingId: payload.TrackingId, + Type: "GIFT_VOUCHER", + RawTx: hexutil.Encode(rawTx), + TxHash: builtTx.Hash().Hex(), + From: cu.SystemContainer.PublicKey, + Data: hexutil.Encode(builtTx.Data()), + GasPrice: builtTx.GasPrice().Uint64(), + Nonce: builtTx.Nonce(), + }) + if err != nil { + + return err + } + + disptachJobPayload, err := json.Marshal(TxPayload{ + OtxId: id, + Tx: builtTx, + }) + if err != nil { + + return err + } + + _, err = cu.TaskerClient.CreateTask( + tasker.DispatchTxTask, + tasker.HighPriority, + &tasker.Task{ + Payload: disptachJobPayload, + }, + ) + if err != nil { + return err + } + + eventPayload := &events.EventPayload{ + OtxId: id, + TrackingId: payload.TrackingId, + TxHash: builtTx.Hash().Hex(), + } + + if err := cu.EventEmitter.Publish( + events.AccountGiftVoucher, + builtTx.Hash().Hex(), + eventPayload, + ); err != nil { + return err + } + + return nil + } +} diff --git a/internal/tasker/task/account_prepare.go b/internal/tasker/task/account_prepare.go new file mode 100644 index 0000000..728cf56 --- /dev/null +++ b/internal/tasker/task/account_prepare.go @@ -0,0 +1,78 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/hibiken/asynq" +) + +type AccountPayload struct { + PublicKey string `json:"publicKey"` + TrackingId string `json:"trackingId"` +} + +func AccountPrepare(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, t *asynq.Task) error { + var payload AccountPayload + + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) + } + + if err := cu.Noncestore.SetNewAccountNonce(ctx, payload.PublicKey); err != nil { + return err + } + + _, err := cu.TaskerClient.CreateTask( + tasker.AccountRegisterTask, + tasker.DefaultPriority, + &tasker.Task{ + Payload: t.Payload(), + }, + ) + if err != nil { + return err + } + + _, err = cu.TaskerClient.CreateTask( + tasker.AccountGiftGasTask, + tasker.DefaultPriority, + &tasker.Task{ + Payload: t.Payload(), + }, + ) + if err != nil { + return err + } + + _, err = cu.TaskerClient.CreateTask( + tasker.AccountGiftVoucherTask, + tasker.DefaultPriority, + &tasker.Task{ + Payload: t.Payload(), + }, + ) + if err != nil { + return err + } + + eventPayload := events.EventPayload{ + TrackingId: payload.TrackingId, + } + + if err := cu.EventEmitter.Publish( + events.AccountNewNonce, + payload.PublicKey, + eventPayload, + ); err != nil { + return err + } + + return nil + } +} diff --git a/internal/tasker/task/account_refill_gas.go b/internal/tasker/task/account_refill_gas.go new file mode 100644 index 0000000..07623e6 --- /dev/null +++ b/internal/tasker/task/account_refill_gas.go @@ -0,0 +1,134 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + + "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/grassrootseconomics/w3-celo-patch/module/eth" + "github.com/hibiken/asynq" +) + +func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, t *asynq.Task) error { + var ( + payload AccountPayload + balance big.Int + ) + + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) + } + + if err := cu.CeloProvider.Client.CallCtx( + ctx, + eth.Balance(w3.A(payload.PublicKey), nil).Returns(&balance), + ); err != nil { + return err + } + + if belowThreshold := balance.Cmp(cu.SystemContainer.GasRefillThreshold); belowThreshold > 0 { + return nil + } + + lock, err := cu.LockProvider.Obtain( + ctx, + cu.SystemContainer.LockPrefix+cu.SystemContainer.PublicKey, + cu.SystemContainer.LockTimeout, + nil, + ) + if err != nil { + return err + } + defer lock.Release(ctx) + + nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey) + if err != nil { + return err + } + defer func() { + if err != nil { + if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { + err = nErr + } + } + }() + + // TODO: Review gas params + builtTx, err := cu.CeloProvider.SignGasTransferTx( + cu.SystemContainer.PrivateKey, + celoutils.GasTransferTxOpts{ + To: w3.A(payload.PublicKey), + Nonce: nonce, + Value: cu.SystemContainer.GiftableGasValue, + GasFeeCap: celoutils.SafeGasFeeCap, + GasTipCap: celoutils.SafeGasTipCap, + }, + ) + if err != nil { + return err + } + + rawTx, err := builtTx.MarshalBinary() + if err != nil { + return err + } + + id, err := cu.PgStore.CreateOTX(ctx, store.OTX{ + TrackingId: payload.TrackingId, + Type: "REFILL_GAS", + RawTx: hexutil.Encode(rawTx), + TxHash: builtTx.Hash().Hex(), + From: cu.SystemContainer.PublicKey, + Data: hexutil.Encode(builtTx.Data()), + GasPrice: builtTx.GasPrice().Uint64(), + Nonce: builtTx.Nonce(), + }) + if err != nil { + return err + } + + disptachJobPayload, err := json.Marshal(TxPayload{ + OtxId: id, + Tx: builtTx, + }) + if err != nil { + return err + } + + _, err = cu.TaskerClient.CreateTask( + tasker.DispatchTxTask, + tasker.HighPriority, + &tasker.Task{ + Payload: disptachJobPayload, + }, + ) + if err != nil { + return err + } + + eventPayload := &events.EventPayload{ + OtxId: id, + TrackingId: payload.TrackingId, + TxHash: builtTx.Hash().Hex(), + } + + if err := cu.EventEmitter.Publish( + events.AccountRefillGas, + builtTx.Hash().Hex(), + eventPayload, + ); err != nil { + return err + } + + return nil + } +} diff --git a/internal/tasker/task/account_register_onchain.go b/internal/tasker/task/account_register_onchain.go new file mode 100644 index 0000000..bc35de6 --- /dev/null +++ b/internal/tasker/task/account_register_onchain.go @@ -0,0 +1,128 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/hibiken/asynq" +) + +func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, t *asynq.Task) error { + var ( + payload AccountPayload + ) + + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) + } + + lock, err := cu.LockProvider.Obtain( + ctx, + cu.SystemContainer.LockPrefix+cu.SystemContainer.PublicKey, + cu.SystemContainer.LockTimeout, + nil, + ) + if err != nil { + return err + } + defer lock.Release(ctx) + + nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey) + if err != nil { + return err + } + defer func() { + if err != nil { + if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { + err = nErr + } + } + }() + + input, err := cu.SystemContainer.Abis["add"].EncodeArgs( + w3.A(payload.PublicKey), + ) + if err != nil { + return err + } + + // TODO: Review gas params. + builtTx, err := cu.CeloProvider.SignContractExecutionTx( + cu.SystemContainer.PrivateKey, + celoutils.ContractExecutionTxOpts{ + ContractAddress: cu.SystemContainer.AccountIndexContract, + InputData: input, + GasFeeCap: celoutils.SafeGasFeeCap, + GasTipCap: celoutils.SafeGasTipCap, + GasLimit: cu.SystemContainer.TokenTransferGasLimit, + Nonce: nonce, + }, + ) + if err != nil { + return err + } + + rawTx, err := builtTx.MarshalBinary() + if err != nil { + return err + } + + id, err := cu.PgStore.CreateOTX(ctx, store.OTX{ + TrackingId: payload.TrackingId, + Type: "ACCOUNT_REGISTER", + RawTx: hexutil.Encode(rawTx), + TxHash: builtTx.Hash().Hex(), + From: cu.SystemContainer.PublicKey, + Data: hexutil.Encode(builtTx.Data()), + GasPrice: builtTx.GasPrice().Uint64(), + Nonce: builtTx.Nonce(), + }) + if err != nil { + return err + } + + disptachJobPayload, err := json.Marshal(TxPayload{ + OtxId: id, + Tx: builtTx, + }) + if err != nil { + return err + } + + _, err = cu.TaskerClient.CreateTask( + tasker.DispatchTxTask, + tasker.HighPriority, + &tasker.Task{ + Payload: disptachJobPayload, + }, + ) + if err != nil { + return err + } + + eventPayload := &events.EventPayload{ + OtxId: id, + TrackingId: payload.TrackingId, + TxHash: builtTx.Hash().Hex(), + } + + if err := cu.EventEmitter.Publish( + events.AccountRegister, + builtTx.Hash().Hex(), + eventPayload, + ); err != nil { + return err + } + + return nil + } +} diff --git a/internal/tasker/task/dispatch.go b/internal/tasker/task/dispatch.go deleted file mode 100644 index 088214d..0000000 --- a/internal/tasker/task/dispatch.go +++ /dev/null @@ -1,108 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/celo-org/celo-blockchain/common" - "github.com/celo-org/celo-blockchain/core/types" - celo "github.com/grassrootseconomics/cic-celo-sdk" - "github.com/grassrootseconomics/cic-custodial/internal/store" - "github.com/grassrootseconomics/cic-custodial/pkg/status" - "github.com/grassrootseconomics/w3-celo-patch/module/eth" - "github.com/hibiken/asynq" - "github.com/nats-io/nats.go" -) - -type ( - TxPayload struct { - OtxId uint `json:"otxId"` - Tx *types.Transaction `json:"tx"` - } - - dispatchEventPayload struct { - OtxId uint - TxHash string - DispatchStatus status.Status - } -) - -func TxDispatch( - celoProvider *celo.Provider, - pg store.Store, - js nats.JetStreamContext, - -) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - p TxPayload - txHash common.Hash - ) - - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return err - } - - dispatchStatus := store.DispatchStatus{ - OtxId: p.OtxId, - } - - eventPayload := &dispatchEventPayload{ - OtxId: p.OtxId, - } - - if err := celoProvider.Client.CallCtx( - ctx, - eth.SendTx(p.Tx).Returns(&txHash), - ); err != nil { - switch err.Error() { - case celo.ErrGasPriceLow: - dispatchStatus.Status = status.FailGasPrice - case celo.ErrInsufficientGas: - dispatchStatus.Status = status.FailInsufficientGas - case celo.ErrNonceLow: - dispatchStatus.Status = status.FailNonce - default: - dispatchStatus.Status = status.Unknown - } - - _, err := pg.CreateDispatchStatus(ctx, dispatchStatus) - if err != nil { - return err - } - - eventJson, err := json.Marshal(eventPayload) - if err != nil { - return err - } - - _, err = js.Publish("CUSTODIAL.dispatchFail", eventJson, nats.MsgId(txHash.Hex())) - if err != nil { - return err - } - - return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) - } - - dispatchStatus.Status = status.Successful - _, err := pg.CreateDispatchStatus(ctx, dispatchStatus) - if err != nil { - return err - } - - eventPayload.TxHash = txHash.Hex() - - eventJson, err := json.Marshal(eventPayload) - if err != nil { - return err - } - - _, err = js.Publish("CUSTODIAL.dispatchSuccess", eventJson, nats.MsgId(txHash.Hex())) - if err != nil { - return err - } - - return nil - } -} diff --git a/internal/tasker/task/dispatch_tx.go b/internal/tasker/task/dispatch_tx.go new file mode 100644 index 0000000..d0b362a --- /dev/null +++ b/internal/tasker/task/dispatch_tx.go @@ -0,0 +1,80 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/pkg/status" + "github.com/grassrootseconomics/w3-celo-patch/module/eth" + "github.com/hibiken/asynq" +) + +type TxPayload struct { + OtxId uint `json:"otxId"` + Tx *types.Transaction `json:"tx"` +} + +func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, t *asynq.Task) error { + var ( + payload TxPayload + dispatchStatus store.DispatchStatus + eventPayload events.EventPayload + dispathchTx common.Hash + ) + + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) + } + + txHash := payload.Tx.Hash().Hex() + + dispatchStatus.OtxId, eventPayload.OtxId = payload.OtxId, payload.OtxId + eventPayload.TxHash = txHash + + if err := cu.CeloProvider.Client.CallCtx( + ctx, + eth.SendTx(payload.Tx).Returns(&dispathchTx), + ); err != nil { + dispatchStatus.Status = status.Unknown + + switch err.Error() { + case celoutils.ErrGasPriceLow: + dispatchStatus.Status = status.FailGasPrice + case celoutils.ErrInsufficientGas: + dispatchStatus.Status = status.FailInsufficientGas + case celoutils.ErrNonceLow: + dispatchStatus.Status = status.FailNonce + } + + if err := cu.PgStore.CreateDispatchStatus(ctx, dispatchStatus); err != nil { + return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) + } + + if err := cu.EventEmitter.Publish(events.DispatchFail, txHash, eventPayload); err != nil { + return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) + } + + return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) + } + + dispatchStatus.Status = status.Successful + + if err := cu.PgStore.CreateDispatchStatus(ctx, dispatchStatus); err != nil { + return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) + } + + if err := cu.EventEmitter.Publish(events.DispatchSuccess, txHash, eventPayload); err != nil { + return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) + } + + return nil + } +} diff --git a/internal/tasker/task/sign.go b/internal/tasker/task/sign.go deleted file mode 100644 index 7735f12..0000000 --- a/internal/tasker/task/sign.go +++ /dev/null @@ -1,182 +0,0 @@ -package task - -import ( - "context" - "encoding/json" - "math/big" - - "github.com/bsm/redislock" - "github.com/celo-org/celo-blockchain/common/hexutil" - celo "github.com/grassrootseconomics/cic-celo-sdk" - "github.com/grassrootseconomics/cic-custodial/internal/keystore" - "github.com/grassrootseconomics/cic-custodial/internal/nonce" - "github.com/grassrootseconomics/cic-custodial/internal/store" - "github.com/grassrootseconomics/cic-custodial/internal/tasker" - "github.com/grassrootseconomics/w3-celo-patch" - "github.com/hibiken/asynq" - "github.com/nats-io/nats.go" -) - -type ( - TransferPayload struct { - TrackingId string `json:"trackingId"` - From string `json:"from" ` - To string `json:"to"` - VoucherAddress string `json:"voucherAddress"` - Amount int64 `json:"amount"` - } - - transferEventPayload struct { - DispatchTaskId string `json:"dispatchTaskId"` - OTXId uint `json:"otxId"` - TrackingId string `json:"trackingId"` - TxHash string `json:"txHash"` - } -) - -func SignTransfer( - celoProvider *celo.Provider, - keystore keystore.Keystore, - lockProvider *redislock.Client, - noncestore nonce.Noncestore, - pg store.Store, - system *tasker.SystemContainer, - taskerClient *tasker.TaskerClient, - js nats.JetStreamContext, -) func(context.Context, *asynq.Task) error { - return func(ctx context.Context, t *asynq.Task) error { - var ( - p TransferPayload - ) - - if err := json.Unmarshal(t.Payload(), &p); err != nil { - return err - } - - lock, err := lockProvider.Obtain( - ctx, - system.LockPrefix+p.From, - system.LockTimeout, - nil, - ) - if err != nil { - return err - } - defer lock.Release(ctx) - - key, err := keystore.LoadPrivateKey(ctx, p.From) - if err != nil { - return err - } - - nonce, err := noncestore.Acquire(ctx, p.From) - if err != nil { - return err - } - - input, err := system.Abis["transfer"].EncodeArgs(w3.A(p.To), big.NewInt(p.Amount)) - if err != nil { - return err - } - - // TODO: Review gas params. - builtTx, err := celoProvider.SignContractExecutionTx( - key, - celo.ContractExecutionTxOpts{ - ContractAddress: w3.A(p.VoucherAddress), - InputData: input, - GasPrice: big.NewInt(20000000000), - GasLimit: system.TokenTransferGasLimit, - Nonce: nonce, - }, - ) - if err != nil { - if err := noncestore.Return(ctx, p.From); err != nil { - return err - } - - return err - } - - rawTx, err := builtTx.MarshalBinary() - if err != nil { - if err := noncestore.Return(ctx, p.From); err != nil { - return err - } - - return err - } - - id, err := pg.CreateOTX(ctx, store.OTX{ - TrackingId: p.TrackingId, - Type: "TRANSFER", - RawTx: hexutil.Encode(rawTx), - TxHash: builtTx.Hash().Hex(), - From: p.From, - Data: hexutil.Encode(builtTx.Data()), - GasPrice: builtTx.GasPrice().Uint64(), - Nonce: builtTx.Nonce(), - }) - if err != nil { - if err := noncestore.Return(ctx, p.From); err != nil { - return err - } - - return err - } - - disptachJobPayload, err := json.Marshal(TxPayload{ - OtxId: id, - Tx: builtTx, - }) - if err != nil { - if err := noncestore.Return(ctx, p.From); err != nil { - return err - } - - return err - } - - dispatchTask, err := taskerClient.CreateTask( - tasker.TxDispatchTask, - tasker.HighPriority, - &tasker.Task{ - Payload: disptachJobPayload, - }, - ) - if err != nil { - if err := noncestore.Return(ctx, p.From); err != nil { - return err - } - - return err - } - - eventPayload := &transferEventPayload{ - DispatchTaskId: dispatchTask.ID, - OTXId: id, - TrackingId: p.TrackingId, - TxHash: builtTx.Hash().Hex(), - } - - eventJson, err := json.Marshal(eventPayload) - if err != nil { - if err := noncestore.Return(ctx, p.From); err != nil { - return err - } - - return err - } - - _, err = js.Publish("CUSTODIAL.transferSign", eventJson, nats.MsgId(builtTx.Hash().Hex())) - if err != nil { - if err := noncestore.Return(ctx, p.From); err != nil { - return err - } - - return err - } - - return nil - } -} diff --git a/internal/tasker/task/sign_transfer.go b/internal/tasker/task/sign_transfer.go new file mode 100644 index 0000000..caf92c5 --- /dev/null +++ b/internal/tasker/task/sign_transfer.go @@ -0,0 +1,167 @@ +package task + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + + "github.com/celo-org/celo-blockchain/common/hexutil" + "github.com/grassrootseconomics/celoutils" + "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/internal/events" + "github.com/grassrootseconomics/cic-custodial/internal/store" + "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/grassrootseconomics/w3-celo-patch" + "github.com/hibiken/asynq" +) + +type ( + TransferPayload struct { + TrackingId string `json:"trackingId"` + From string `json:"from" ` + To string `json:"to"` + VoucherAddress string `json:"voucherAddress"` + Amount int64 `json:"amount"` + } + + transferEventPayload struct { + DispatchTaskId string `json:"dispatchTaskId"` + OTXId uint `json:"otxId"` + TrackingId string `json:"trackingId"` + TxHash string `json:"txHash"` + } +) + +func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) error { + return func(ctx context.Context, t *asynq.Task) error { + var ( + payload TransferPayload + ) + + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry) + } + + lock, err := cu.LockProvider.Obtain( + ctx, + cu.SystemContainer.LockPrefix+payload.From, + cu.SystemContainer.LockTimeout, + nil, + ) + if err != nil { + return err + } + defer lock.Release(ctx) + + key, err := cu.Keystore.LoadPrivateKey(ctx, payload.From) + if err != nil { + return err + } + + nonce, err := cu.Noncestore.Acquire(ctx, payload.From) + if err != nil { + return err + } + defer func() { + if err != nil { + if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil { + err = nErr + } + } + }() + + input, err := cu.SystemContainer.Abis["transfer"].EncodeArgs(w3.A(payload.To), big.NewInt(payload.Amount)) + if err != nil { + return err + } + + // TODO: Review gas params. + builtTx, err := cu.CeloProvider.SignContractExecutionTx( + key, + celoutils.ContractExecutionTxOpts{ + ContractAddress: w3.A(payload.VoucherAddress), + InputData: input, + GasFeeCap: celoutils.SafeGasFeeCap, + GasTipCap: celoutils.SafeGasTipCap, + GasLimit: cu.SystemContainer.TokenTransferGasLimit, + Nonce: nonce, + }, + ) + if err != nil { + return err + } + + rawTx, err := builtTx.MarshalBinary() + if err != nil { + return err + } + + id, err := cu.PgStore.CreateOTX(ctx, store.OTX{ + TrackingId: payload.TrackingId, + Type: "TRANSFER", + RawTx: hexutil.Encode(rawTx), + TxHash: builtTx.Hash().Hex(), + From: payload.From, + Data: hexutil.Encode(builtTx.Data()), + GasPrice: builtTx.GasPrice().Uint64(), + Nonce: builtTx.Nonce(), + }) + if err != nil { + return err + } + + disptachJobPayload, err := json.Marshal(TxPayload{ + OtxId: id, + Tx: builtTx, + }) + if err != nil { + return err + } + + _, err = cu.TaskerClient.CreateTask( + tasker.DispatchTxTask, + tasker.HighPriority, + &tasker.Task{ + Payload: disptachJobPayload, + }, + ) + if err != nil { + return err + } + + gasRefillPayload, err := json.Marshal(AccountPayload{ + PublicKey: payload.From, + }) + if err != nil { + return err + } + + _, err = cu.TaskerClient.CreateTask( + tasker.AccountRefillGasTask, + tasker.DefaultPriority, + &tasker.Task{ + Payload: gasRefillPayload, + }, + ) + if err != nil { + return err + } + + eventPayload := &transferEventPayload{ + OTXId: id, + TrackingId: payload.TrackingId, + TxHash: builtTx.Hash().Hex(), + } + + if err := cu.EventEmitter.Publish( + events.SignTransfer, + builtTx.Hash().Hex(), + eventPayload, + ); err != nil { + return err + } + + return nil + } +} diff --git a/internal/tasker/types.go b/internal/tasker/types.go index ff75080..1870f40 100644 --- a/internal/tasker/types.go +++ b/internal/tasker/types.go @@ -38,15 +38,13 @@ type Task struct { } const ( - PrepareAccountTask TaskName = "sys:prepare_account" - RegisterAccountOnChain TaskName = "sys:register_account" - GiftGasTask TaskName = "sys:gift_gas" - GiftTokenTask TaskName = "sys:gift_token" - RefillGasTask TaskName = "admin:refill_gas" - SweepGasTask TaskName = "admin:sweep_gas" - AdminTokenApprovalTask TaskName = "admin:token_approval" + AccountPrepareTask TaskName = "sys:prepare_account" + AccountRegisterTask TaskName = "sys:register_account" + AccountGiftGasTask TaskName = "sys:gift_gas" + AccountGiftVoucherTask TaskName = "sys:gift_token" + AccountRefillGasTask TaskName = "sys:refill_gas" SignTransferTask TaskName = "usr:sign_transfer" - TxDispatchTask TaskName = "rpc:dispatch" + DispatchTxTask TaskName = "rpc:dispatch" ) const ( diff --git a/pkg/logg/asynq.go b/pkg/logg/asynq.go index ab85ce5..cdc986a 100644 --- a/pkg/logg/asynq.go +++ b/pkg/logg/asynq.go @@ -2,33 +2,32 @@ package logg import "github.com/zerodha/logf" -type AsynqLogg struct { - logg *logf.Logger +type AsynqLogger struct { + Lo *logf.Logger } -// NewAsynqLogg creates a logf based logging adapter for asynq. -func NewAsynqLogg(lo logf.Logger) AsynqLogg { - return AsynqLogg{ - logg: &lo, +func AsynqCompatibleLogger(lo logf.Logger) AsynqLogger { + return AsynqLogger{ + Lo: &lo, } } -func (l AsynqLogg) Debug(args ...interface{}) { - l.logg.Debug("asynq", "debug", args[0]) +func (l AsynqLogger) Debug(args ...interface{}) { + l.Lo.Debug("asynq", "debug", args[0]) } -func (l AsynqLogg) Info(args ...interface{}) { - l.logg.Info("asynq", "info", args[0]) +func (l AsynqLogger) Info(args ...interface{}) { + l.Lo.Info("asynq", "info", args[0]) } -func (l AsynqLogg) Warn(args ...interface{}) { - l.logg.Warn("asynq", "warn", args[0]) +func (l AsynqLogger) Warn(args ...interface{}) { + l.Lo.Warn("asynq", "warn", args[0]) } -func (l AsynqLogg) Error(args ...interface{}) { - l.logg.Error("asynq", "error", args[0]) +func (l AsynqLogger) Error(args ...interface{}) { + l.Lo.Error("asynq", "error", args[0]) } -func (l AsynqLogg) Fatal(args ...interface{}) { - l.logg.Fatal("asynq", "fatal", args[0]) +func (l AsynqLogger) Fatal(args ...interface{}) { + l.Lo.Fatal("asynq", "fatal", args[0]) }