refactor: task handlers, emitter, tx signer, et.c.

* fallback to custom ethereum checksum validator -> https://github.com/go-playground/validator/issues/1073
* decouple jetsream emitter to separate package
* refactor task handlers into individual files
* add error handler for echo to capture unexpected errors and log them
* move handler dependencies into single struct container -> custodialContainer
* replace signer to use EIP 1559 signer -> celoutils v1
* Add 1 minutes timeout to all custodial tasks
This commit is contained in:
Mohamed Sohail 2023-02-20 09:56:30 +00:00
parent 55a560ffde
commit cf1f9f34c3
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
30 changed files with 1147 additions and 1093 deletions

View File

@ -5,14 +5,15 @@ import (
"net/http" "net/http"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/go-playground/validator" "github.com/go-playground/validator/v10"
"github.com/grassrootseconomics/cic-custodial/internal/api" "github.com/grassrootseconomics/cic-custodial/internal/api"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
) )
// Bootstrap API server. // Bootstrap API server.
func initApiServer(custodialContainer *custodial) *echo.Echo { func initApiServer(custodialContainer *custodial.Custodial) *echo.Echo {
lo.Debug("api: bootstrapping api server") lo.Debug("api: bootstrapping api server")
server := echo.New() server := echo.New()
server.HideBanner = true server.HideBanner = true
@ -29,6 +30,15 @@ func initApiServer(custodialContainer *custodial) *echo.Echo {
return return
} }
if err.(validator.ValidationErrors) != nil {
c.JSON(http.StatusForbidden, api.ErrResp{
Ok: false,
Code: api.VALIDATION_ERROR,
Message: err.(validator.ValidationErrors).Error(),
})
return
}
// Log internal server error for further investigation. // Log internal server error for further investigation.
lo.Error("api:", "path", c.Path(), "err", err) lo.Error("api:", "path", c.Path(), "err", err)
@ -46,19 +56,16 @@ func initApiServer(custodialContainer *custodial) *echo.Echo {
}) })
} }
customValidator := validator.New()
customValidator.RegisterValidation("eth_checksum", api.EthChecksumValidator)
server.Validator = &api.Validator{ server.Validator = &api.Validator{
ValidatorProvider: validator.New(), ValidatorProvider: customValidator,
} }
apiRoute := server.Group("/api") apiRoute := server.Group("/api")
apiRoute.POST("/account/create", api.CreateAccountHandler( apiRoute.POST("/account/create", api.CreateAccountHandler(custodialContainer))
custodialContainer.keystore, apiRoute.POST("/sign/transfer", api.SignTransferHandler(custodialContainer))
custodialContainer.taskerClient,
))
apiRoute.POST("/sign/transfer", api.SignTransferHandler(
custodialContainer.taskerClient,
))
return server return server
} }

View File

@ -5,7 +5,8 @@ import (
"time" "time"
"github.com/bsm/redislock" "github.com/bsm/redislock"
celo "github.com/grassrootseconomics/cic-celo-sdk" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/queries" "github.com/grassrootseconomics/cic-custodial/internal/queries"
@ -20,10 +21,23 @@ import (
"github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/providers/file"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
// Load logger.
func initLogger(debug bool) logf.Logger {
loggOpts := logg.LoggOpts{
Color: true,
}
if debug {
loggOpts.Caller = true
loggOpts.Debug = true
}
return logg.NewLogg(loggOpts)
}
// Load config file. // Load config file.
func initConfig(configFilePath string) *koanf.Koanf { func initConfig(configFilePath string) *koanf.Koanf {
var ( var (
@ -45,34 +59,19 @@ func initConfig(configFilePath string) *koanf.Koanf {
return ko return ko
} }
// Load logger.
func initLogger(debug bool) logf.Logger {
loggOpts := logg.LoggOpts{
Color: true,
}
if debug {
loggOpts.Caller = true
loggOpts.Debug = true
}
return logg.NewLogg(loggOpts)
}
// Load Celo chain provider. // Load Celo chain provider.
func initCeloProvider() (*celo.Provider, error) { func initCeloProvider() (*celoutils.Provider, error) {
providerOpts := celo.ProviderOpts{ providerOpts := celoutils.ProviderOpts{
RpcEndpoint: ko.MustString("chain.rpc_endpoint"), RpcEndpoint: ko.MustString("chain.rpc_endpoint"),
} }
if ko.Bool("chain.testnet") { if ko.Bool("chain.testnet") {
// Devnet = 1337 providerOpts.ChainId = celoutils.TestnetChainId
providerOpts.ChainId = 1337
} else { } else {
providerOpts.ChainId = celo.MainnetChainId providerOpts.ChainId = celoutils.MainnetChainId
} }
provider, err := celo.NewProvider(providerOpts) provider, err := celoutils.NewProvider(providerOpts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -126,7 +125,7 @@ func initCommonRedisPool() (*redis.RedisPool, error) {
// Load SQL statements into struct. // Load SQL statements into struct.
func initQueries(queriesPath string) (*queries.Queries, error) { func initQueries(queriesPath string) (*queries.Queries, error) {
parsedQueries, err := goyesql.ParseFile(queriesFlag) parsedQueries, err := goyesql.ParseFile(queriesPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -150,7 +149,7 @@ func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries)
} }
// Load redis backed noncestore. // Load redis backed noncestore.
func initRedisNoncestore(redisPool *redis.RedisPool, celoProvider *celo.Provider) nonce.Noncestore { func initRedisNoncestore(redisPool *redis.RedisPool, celoProvider *celoutils.Provider) nonce.Noncestore {
return nonce.NewRedisNoncestore(nonce.Opts{ return nonce.NewRedisNoncestore(nonce.Opts{
RedisPool: redisPool, RedisPool: redisPool,
CeloProvider: celoProvider, CeloProvider: celoProvider,
@ -179,32 +178,16 @@ func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) sto
} }
// Init JetStream context for tasker events. // Init JetStream context for tasker events.
func initJetStream() (nats.JetStreamContext, error) { func initJetStream() (events.EventEmitter, error) {
natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint")) jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{
ServerUrl: ko.MustString("jetstream.endpoint"),
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour,
DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
js, err := natsConn.JetStream() return jsEmitter, nil
if err != nil {
return nil, err
}
// Bootstrap stream if it does not exist
stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name"))
if stream == nil {
lo.Info("jetstream: bootstrapping stream")
_, err = js.AddStream(&nats.StreamConfig{
Name: ko.MustString("jetstream.stream_name"),
MaxAge: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour,
Storage: nats.FileStorage,
Subjects: ko.MustStrings("jetstream.stream_subjects"),
Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour,
})
if err != nil {
return nil, err
}
}
return js, nil
} }

View File

@ -9,11 +9,7 @@ import (
"sync" "sync"
"syscall" "syscall"
"github.com/bsm/redislock" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/knadh/koanf" "github.com/knadh/koanf"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
@ -29,16 +25,6 @@ var (
ko *koanf.Koanf ko *koanf.Koanf
) )
type custodial struct {
celoProvider *celo.Provider
keystore keystore.Keystore
lockProvider *redislock.Client
noncestore nonce.Noncestore
pgStore store.Store
systemContainer *tasker.SystemContainer
taskerClient *tasker.TaskerClient
}
func init() { func init() {
flag.StringVar(&confFlag, "config", "config.toml", "Config file location") flag.StringVar(&confFlag, "config", "config.toml", "Config file location")
flag.BoolVar(&debugFlag, "log", false, "Enable debug logging") flag.BoolVar(&debugFlag, "log", false, "Enable debug logging")
@ -47,28 +33,28 @@ func init() {
lo = initLogger(debugFlag) lo = initLogger(debugFlag)
ko = initConfig(confFlag) ko = initConfig(confFlag)
} }
func main() { func main() {
var ( var (
tasker *tasker.TaskerServer tasker *tasker.TaskerServer
apiServer *echo.Echo apiServer *echo.Echo
wg sync.WaitGroup
) )
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop() defer stop()
celoProvider, err := initCeloProvider()
if err != nil {
lo.Fatal("main: critical error loading chain provider", "error", err)
}
queries, err := initQueries(queriesFlag) queries, err := initQueries(queriesFlag)
if err != nil { if err != nil {
lo.Fatal("main: critical error loading SQL queries", "error", err) lo.Fatal("main: critical error loading SQL queries", "error", err)
} }
celoProvider, err := initCeloProvider()
if err != nil {
lo.Fatal("main: critical error loading chain provider", "error", err)
}
postgresPool, err := initPostgresPool() postgresPool, err := initPostgresPool()
if err != nil { if err != nil {
lo.Fatal("main: critical error connecting to postgres", "error", err) lo.Fatal("main: critical error connecting to postgres", "error", err)
@ -89,6 +75,11 @@ func main() {
lo.Fatal("main: critical error loading keystore") lo.Fatal("main: critical error loading keystore")
} }
jsEventEmitter, err := initJetStream()
if err != nil {
lo.Fatal("main: critical error loading jetstream event emitter")
}
pgStore := initPostgresStore(postgresPool, queries) pgStore := initPostgresStore(postgresPool, queries)
redisNoncestore := initRedisNoncestore(redisPool, celoProvider) redisNoncestore := initRedisNoncestore(redisPool, celoProvider)
lockProvider := initLockProvider(redisPool.Client) lockProvider := initLockProvider(redisPool.Client)
@ -99,16 +90,19 @@ func main() {
lo.Fatal("main: critical error bootstrapping system container", "error", err) lo.Fatal("main: critical error bootstrapping system container", "error", err)
} }
custodial := &custodial{ custodial := &custodial.Custodial{
celoProvider: celoProvider, CeloProvider: celoProvider,
keystore: postgresKeystore, EventEmitter: jsEventEmitter,
lockProvider: lockProvider, Keystore: postgresKeystore,
noncestore: redisNoncestore, LockProvider: lockProvider,
pgStore: pgStore, Noncestore: redisNoncestore,
systemContainer: systemContainer, PgStore: pgStore,
taskerClient: taskerClient, SystemContainer: systemContainer,
TaskerClient: taskerClient,
} }
wg := &sync.WaitGroup{}
apiServer = initApiServer(custodial) apiServer = initApiServer(custodial)
wg.Add(1) wg.Add(1)
go func() { go func() {

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/internal/tasker/task" "github.com/grassrootseconomics/cic-custodial/internal/tasker/task"
"github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/grassrootseconomics/cic-custodial/pkg/redis"
@ -8,75 +9,25 @@ import (
) )
// Load tasker handlers, injecting any necessary handler dependencies from the system container. // Load tasker handlers, injecting any necessary handler dependencies from the system container.
func initTasker(custodialContainer *custodial, redisPool *redis.RedisPool) *tasker.TaskerServer { func initTasker(custodialContainer *custodial.Custodial, redisPool *redis.RedisPool) *tasker.TaskerServer {
lo.Debug("Bootstrapping tasker") lo.Debug("Bootstrapping tasker")
js, err := initJetStream()
if err != nil {
lo.Fatal("filters: critical error loading jetstream", "error", err)
}
taskerServerOpts := tasker.TaskerServerOpts{ taskerServerOpts := tasker.TaskerServerOpts{
Concurrency: ko.MustInt("asynq.worker_count"), Concurrency: ko.MustInt("asynq.worker_count"),
Logg: lo, Logg: lo,
LogLevel: asynq.ErrorLevel, LogLevel: asynq.InfoLevel,
RedisPool: redisPool, RedisPool: redisPool,
SystemContainer: custodialContainer.systemContainer,
TaskerClient: custodialContainer.taskerClient,
}
if debugFlag {
taskerServerOpts.LogLevel = asynq.DebugLevel
} }
taskerServer := tasker.NewTaskerServer(taskerServerOpts) taskerServer := tasker.NewTaskerServer(taskerServerOpts)
taskerServer.RegisterHandlers(tasker.PrepareAccountTask, task.PrepareAccount( taskerServer.RegisterHandlers(tasker.AccountPrepareTask, task.AccountPrepare(custodialContainer))
custodialContainer.noncestore, taskerServer.RegisterHandlers(tasker.AccountRegisterTask, task.AccountRegisterOnChainProcessor(custodialContainer))
custodialContainer.taskerClient, taskerServer.RegisterHandlers(tasker.AccountGiftGasTask, task.AccountGiftGasProcessor(custodialContainer))
js, taskerServer.RegisterHandlers(tasker.AccountGiftVoucherTask, task.GiftVoucherProcessor(custodialContainer))
)) taskerServer.RegisterHandlers(tasker.AccountRefillGasTask, task.AccountRefillGasProcessor(custodialContainer))
taskerServer.RegisterHandlers(tasker.RegisterAccountOnChain, task.RegisterAccountOnChainProcessor( taskerServer.RegisterHandlers(tasker.SignTransferTask, task.SignTransfer(custodialContainer))
custodialContainer.celoProvider, taskerServer.RegisterHandlers(tasker.DispatchTxTask, task.DispatchTx(custodialContainer))
custodialContainer.lockProvider,
custodialContainer.noncestore,
custodialContainer.pgStore,
custodialContainer.systemContainer,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.GiftGasTask, task.GiftGasProcessor(
custodialContainer.celoProvider,
custodialContainer.lockProvider,
custodialContainer.noncestore,
custodialContainer.pgStore,
custodialContainer.systemContainer,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.GiftTokenTask, task.GiftTokenProcessor(
custodialContainer.celoProvider,
custodialContainer.lockProvider,
custodialContainer.noncestore,
custodialContainer.pgStore,
custodialContainer.systemContainer,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.SignTransferTask, task.SignTransfer(
custodialContainer.celoProvider,
custodialContainer.keystore,
custodialContainer.lockProvider,
custodialContainer.noncestore,
custodialContainer.pgStore,
custodialContainer.systemContainer,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.TxDispatchTask, task.TxDispatch(
custodialContainer.celoProvider,
custodialContainer.pgStore,
js,
))
return taskerServer return taskerServer
} }

View File

@ -4,30 +4,30 @@ address = ":5005"
# /metrics endpoint # /metrics endpoint
metrics = true metrics = true
[chain]
rpc_endpoint = "https://rpc.alfajores.celo.grassecon.net"
testnet = true
# System default values # System default values
# Valus are in wei unless otherwise stated # Valus are in wei unless otherwise stated
[system] [system]
# The giftable token is a training voucher # The giftable token is a training voucher
# Every new user is given 5 DGFT # Every new user is given 5 DGFT
gas_faucet = "0xA8b3Ffc715e85792FB361BDee9357B38D5A4a6cF" gas_faucet = "0xf2a1fc19Ad275A0EAe3445798761FeD1Eea725d5"
giftable_token_address = "0xdD4F5ea484F6b16f031eF7B98F3810365493BC20" giftable_token_address = "0xB92463E2262E700e29c16416270c9Fdfa17934D7"
giftable_token_value = 5000000 giftable_token_value = 5000000
gas_refill_threshold = 100000000000000000 gas_refill_threshold = 20000000000000000
gas_refill_value = 100000000000000000 gas_refill_value = 10000000000000000
# Every custodial account is given 2 KES worth of CELO # Every custodial account is given 2 KES worth of CELO
giftable_gas_value = 2000000000000000000 giftable_gas_value = 10000000000000000
# System private key # System private key
# Should always be toped up # Should always be toped up
private_key = "bfa7222a7bea3bde312434abe819b14cf3bc8703ceaabb98a9e3a97ceb0b79fd" private_key = "95f04bc8321cbf693db9cc2bc063411bbd8bee0e4a03dea096c755d8bcca42c6"
lock_prefix = "lock:" lock_prefix = "lock:"
public_key = "0x08eb3a90128D5874da54cf654fCfA88cEd1bb047" public_key = "0x0577AE5A3547AC753aC499E282557CE0275a4942"
token_decimals = 6 token_decimals = 6
token_transfer_gas_limit = 200000 token_transfer_gas_limit = 200000
account_index = "0xdb2550ac5E52A54B6189FFAf17ECfF33AE190db9" account_index = "0x1e041282695C66944BfC53cabce947cf35CEaf87"
[chain]
rpc_endpoint = "http://192.168.0.101:8545"
testnet = true
[postgres] [postgres]
debug = false debug = false
@ -47,18 +47,7 @@ task_retention_hrs = 24
# https://docs.nats.io/ # https://docs.nats.io/
[jetstream] [jetstream]
endpoint = "nats://localhost:4222" endpoint = "nats://localhost:4222"
stream_name = "CUSTODIAL"
# Duration JetStream should keep the message before remocing it from the persistent store # Duration JetStream should keep the message before remocing it from the persistent store
persist_duration_hours = 48 persist_duration_hours = 48
# Duration to ignore duplicate transactions (e.g. due to restart) # Duration to ignore duplicate transactions (e.g. due to restart)
dedup_duration_hours = 6 dedup_duration_hours = 6
# Stream subjects
stream_subjects = [
"CUSTODIAL.accountNewNonce",
"CUSTODIAL.accountRegister",
"CUSTODIAL.giftNewAccountGas",
"CUSTODIAL.giftNewAccountVoucher",
"CUSTODIAL.dispatchFail",
"CUSTODIAL.dispatchSuccess",
"CUSTODIAL.transferSign"
]

9
go.mod
View File

@ -14,7 +14,7 @@ require (
github.com/hibiken/asynq v0.24.0 github.com/hibiken/asynq v0.24.0
github.com/jackc/pgx/v5 v5.3.0 github.com/jackc/pgx/v5 v5.3.0
github.com/knadh/goyesql/v2 v2.2.0 github.com/knadh/goyesql/v2 v2.2.0
github.com/knadh/koanf v1.4.5 github.com/knadh/koanf v1.5.0
github.com/labstack/echo/v4 v4.10.0 github.com/labstack/echo/v4 v4.10.0
github.com/nats-io/nats.go v1.23.0 github.com/nats-io/nats.go v1.23.0
github.com/zerodha/logf v0.5.5 github.com/zerodha/logf v0.5.5
@ -36,13 +36,15 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.11.2 // indirect
github.com/go-stack/stack v1.8.1 // indirect github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect github.com/google/go-cmp v0.5.9 // indirect
github.com/gorilla/websocket v1.5.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect
github.com/grassrootseconomics/celoutils v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hdevalence/ed25519consensus v0.1.0 // indirect github.com/hdevalence/ed25519consensus v0.1.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
@ -52,6 +54,7 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/knadh/koanf/v2 v2.0.0 // indirect
github.com/labstack/gommon v0.4.0 // indirect github.com/labstack/gommon v0.4.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-colorable v0.1.13 // indirect

13
go.sum
View File

@ -200,10 +200,16 @@ github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho=
github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator v9.31.0+incompatible h1:UA72EPEogEnq76ehGdEDp4Mit+3FDh548oRqwVgNsHA= github.com/go-playground/validator v9.31.0+incompatible h1:UA72EPEogEnq76ehGdEDp4Mit+3FDh548oRqwVgNsHA=
github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig= github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig=
github.com/go-playground/validator/v10 v10.11.2 h1:q3SHpufmypg+erIExEKUmsgmhDTyhcJ38oeKGACXohU=
github.com/go-playground/validator/v10 v10.11.2/go.mod h1:NieE624vt4SCTJtD87arVLvdmjPAeV8BQlHtMnw9D7s=
github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
@ -285,6 +291,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/grassrootseconomics/celoutils v1.0.0 h1:+pfiNZA+PJczmpN7RsZ/sr9PM0jKQQHxB6y/Ix3AFwA=
github.com/grassrootseconomics/celoutils v1.0.0/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q=
github.com/grassrootseconomics/cic-celo-sdk v0.4.0 h1:wh7aOQ/oK+q1nBl2koKe45WVbsWM0riPhhtEz6JLub4= github.com/grassrootseconomics/cic-celo-sdk v0.4.0 h1:wh7aOQ/oK+q1nBl2koKe45WVbsWM0riPhhtEz6JLub4=
github.com/grassrootseconomics/cic-celo-sdk v0.4.0/go.mod h1:G8uRw+rEw6yVP/+vBZ2V0UWXfs6iioit+eqVHrB9sBk= github.com/grassrootseconomics/cic-celo-sdk v0.4.0/go.mod h1:G8uRw+rEw6yVP/+vBZ2V0UWXfs6iioit+eqVHrB9sBk=
github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U= github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U=
@ -403,6 +411,11 @@ github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem
github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI= github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI=
github.com/knadh/koanf v1.4.5 h1:yKWFswTrqFc0u7jBAoERUz30+N1b1yPXU01gAPr8IrY= github.com/knadh/koanf v1.4.5 h1:yKWFswTrqFc0u7jBAoERUz30+N1b1yPXU01gAPr8IrY=
github.com/knadh/koanf v1.4.5/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= github.com/knadh/koanf v1.4.5/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs=
github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs=
github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs=
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
github.com/knadh/koanf/v2 v2.0.0 h1:XPQ5ilNnwnNaHrfQ1YpTVhUAjcGHnEKA+lRpipQv02Y=
github.com/knadh/koanf/v2 v2.0.0/go.mod h1:ZeiIlIDXTE7w1lMT6UVcNiRAS2/rCeLn/GdLNvY1Dus=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=

View File

@ -5,7 +5,7 @@ import (
"net/http" "net/http"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/internal/tasker/task" "github.com/grassrootseconomics/cic-custodial/internal/tasker/task"
"github.com/grassrootseconomics/cic-custodial/pkg/keypair" "github.com/grassrootseconomics/cic-custodial/pkg/keypair"
@ -15,10 +15,7 @@ import (
// CreateAccountHandler route. // CreateAccountHandler route.
// POST: /api/account/create // POST: /api/account/create
// Returns the public key. // Returns the public key.
func CreateAccountHandler( func CreateAccountHandler(cu *custodial.Custodial) func(echo.Context) error {
keystore keystore.Keystore,
taskerClient *tasker.TaskerClient,
) func(echo.Context) error {
return func(c echo.Context) error { return func(c echo.Context) error {
trackingId := uuid.NewString() trackingId := uuid.NewString()
@ -27,7 +24,7 @@ func CreateAccountHandler(
return err return err
} }
id, err := keystore.WriteKeyPair(c.Request().Context(), generatedKeyPair) id, err := cu.Keystore.WriteKeyPair(c.Request().Context(), generatedKeyPair)
if err != nil { if err != nil {
return err return err
} }
@ -40,8 +37,8 @@ func CreateAccountHandler(
return err return err
} }
_, err = taskerClient.CreateTask( _, err = cu.TaskerClient.CreateTask(
tasker.PrepareAccountTask, tasker.AccountPrepareTask,
tasker.DefaultPriority, tasker.DefaultPriority,
&tasker.Task{ &tasker.Task{
Id: trackingId, Id: trackingId,

View File

@ -5,30 +5,30 @@ import (
"net/http" "net/http"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/internal/tasker/task"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
) )
// SignTxHandler route. // SignTxHandler route.
// POST: /api/sign/transfer // POST: /api/sign/transfer
// JSON Body: // JSON Body:
// trackingId -> Unique string
// from -> ETH address // from -> ETH address
// to -> ETH address // to -> ETH address
// voucherAddress -> ETH address // voucherAddress -> ETH address
// amount -> int (6 d.p. precision) // amount -> int (6 d.p. precision)
// e.g. 1000000 = 1 VOUCHER // e.g. 1000000 = 1 VOUCHER
// Returns the task id. // Returns the task id.
func SignTransferHandler( func SignTransferHandler(cu *custodial.Custodial) func(echo.Context) error {
taskerClient *tasker.TaskerClient,
) func(echo.Context) error {
return func(c echo.Context) error { return func(c echo.Context) error {
trackingId := uuid.NewString() trackingId := uuid.NewString()
var transferRequest struct { var transferRequest struct {
From string `json:"from" validate:"required,eth_addr"` From string `json:"from" validate:"required,eth_checksum"`
To string `json:"to" validate:"required,eth_addr"` To string `json:"to" validate:"required,eth_checksum"`
VoucherAddress string `json:"voucherAddress" validate:"required,eth_addr"` VoucherAddress string `json:"voucherAddress" validate:"required,eth_checksum"`
Amount int64 `json:"amount" validate:"required,numeric"` Amount int64 `json:"amount" validate:"required,numeric"`
} }
@ -40,12 +40,19 @@ func SignTransferHandler(
return err return err
} }
taskPayload, err := json.Marshal(transferRequest) // TODO: Checksum addresses
taskPayload, err := json.Marshal(task.TransferPayload{
TrackingId: trackingId,
From: transferRequest.From,
To: transferRequest.To,
VoucherAddress: transferRequest.VoucherAddress,
Amount: transferRequest.Amount,
})
if err != nil { if err != nil {
return err return err
} }
_, err = taskerClient.CreateTask( _, err = cu.TaskerClient.CreateTask(
tasker.SignTransferTask, tasker.SignTransferTask,
tasker.HighPriority, tasker.HighPriority,
&tasker.Task{ &tasker.Task{

View File

@ -1,10 +1,8 @@
package api package api
import ( import (
"net/http" "github.com/celo-org/celo-blockchain/common"
"github.com/go-playground/validator/v10"
"github.com/go-playground/validator"
"github.com/labstack/echo/v4"
) )
type Validator struct { type Validator struct {
@ -13,10 +11,16 @@ type Validator struct {
func (v *Validator) Validate(i interface{}) error { func (v *Validator) Validate(i interface{}) error {
if err := v.ValidatorProvider.Struct(i); err != nil { if err := v.ValidatorProvider.Struct(i); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, ErrResp{ return err
Ok: false,
Code: VALIDATION_ERROR,
})
} }
return nil return nil
} }
func EthChecksumValidator(fl validator.FieldLevel) bool {
addr, err := common.NewMixedcaseAddressFromString(fl.Field().String())
if err != nil {
return false
}
return addr.ValidChecksum()
}

View File

@ -0,0 +1,22 @@
package custodial
import (
"github.com/bsm/redislock"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
)
type Custodial struct {
CeloProvider *celoutils.Provider
EventEmitter events.EventEmitter
Keystore keystore.Keystore
LockProvider *redislock.Client
Noncestore nonce.Noncestore
PgStore store.Store
SystemContainer *tasker.SystemContainer
TaskerClient *tasker.TaskerClient
}

14
internal/events/events.go Normal file
View File

@ -0,0 +1,14 @@
package events
type EventEmitter interface {
Close()
Publish(subject string, dedupId string, eventPayload interface{}) error
}
type (
EventPayload struct {
OtxId uint `json:"otxId"`
TrackingId string `json:"trackingId"`
TxHash string `json:"txHash"`
}
)

View File

@ -0,0 +1,87 @@
package events
import (
"encoding/json"
"time"
"github.com/nats-io/nats.go"
)
const (
StreamName string = "CUSTODIAL"
StreamSubjects string = "CUSTODIAL.*"
// Subjects
AccountNewNonce string = "CUSTODIAL.accountNewNonce"
AccountRegister string = "CUSTODIAL.accountRegister"
AccountGiftGas string = "CUSTODIAL.systemNewAccountGas"
AccountGiftVoucher string = "CUSTODIAL.systemNewAccountVoucher"
AccountRefillGas string = "CUSTODIAL.systemRefillAccountGas"
DispatchFail string = "CUSTODIAL.dispatchFail"
DispatchSuccess string = "CUSTODIAL.dispatchSuccess"
SignTransfer string = "CUSTODIAL.signTransfer"
)
type JetStreamOpts struct {
ServerUrl string
PersistDuration time.Duration
DedupDuration time.Duration
}
type JetStream struct {
jsCtx nats.JetStreamContext
nc *nats.Conn
}
func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) {
natsConn, err := nats.Connect(o.ServerUrl)
if err != nil {
return nil, err
}
js, err := natsConn.JetStream()
if err != nil {
return nil, err
}
// Bootstrap stream if it doesn't exist.
stream, _ := js.StreamInfo(StreamName)
if stream == nil {
_, err = js.AddStream(&nats.StreamConfig{
Name: StreamName,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: []string{StreamSubjects},
Duplicates: o.DedupDuration,
})
if err != nil {
return nil, err
}
}
return &JetStream{
jsCtx: js,
nc: natsConn,
}, nil
}
// Close gracefully shutdowns the JetStream connection.
func (js *JetStream) Close() {
if js.nc != nil {
js.nc.Close()
}
}
// Publish publishes the JSON data to the NATS stream.
func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error {
jsonData, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId))
if err != nil {
return err
}
return nil
}

View File

@ -3,7 +3,7 @@ package nonce
import ( import (
"context" "context"
celo "github.com/grassrootseconomics/cic-celo-sdk" "github.com/grassrootseconomics/celoutils"
redispool "github.com/grassrootseconomics/cic-custodial/pkg/redis" redispool "github.com/grassrootseconomics/cic-custodial/pkg/redis"
"github.com/grassrootseconomics/w3-celo-patch" "github.com/grassrootseconomics/w3-celo-patch"
"github.com/grassrootseconomics/w3-celo-patch/module/eth" "github.com/grassrootseconomics/w3-celo-patch/module/eth"
@ -11,12 +11,12 @@ import (
type Opts struct { type Opts struct {
RedisPool *redispool.RedisPool RedisPool *redispool.RedisPool
CeloProvider *celo.Provider CeloProvider *celoutils.Provider
} }
// RedisNoncestore implements `Noncestore` // RedisNoncestore implements `Noncestore`
type RedisNoncestore struct { type RedisNoncestore struct {
chainProvider *celo.Provider chainProvider *celoutils.Provider
redis *redispool.RedisPool redis *redispool.RedisPool
} }

View File

@ -6,19 +6,15 @@ import (
type Status string type Status string
func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) (uint, error) { func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error {
var ( if _, err := s.db.Exec(
id uint
)
if err := s.db.QueryRow(
ctx, ctx,
s.queries.CreateDispatchStatus, s.queries.CreateDispatchStatus,
dispatch.OtxId, dispatch.OtxId,
dispatch.Status, dispatch.Status,
).Scan(&id); err != nil { ); err != nil {
return id, err return err
} }
return id, nil return nil
} }

View File

@ -27,6 +27,6 @@ type (
// OTX (Custodial originating transactions). // OTX (Custodial originating transactions).
CreateOTX(ctx context.Context, otx OTX) (id uint, err error) CreateOTX(ctx context.Context, otx OTX) (id uint, err error)
// Dispatch status. // Dispatch status.
CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) (id uint, err error) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error
} }
) )

View File

@ -8,6 +8,10 @@ import (
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
const (
taskTimeout = 60
)
type TaskerClientOpts struct { type TaskerClientOpts struct {
RedisPool *redis.RedisPool RedisPool *redis.RedisPool
TaskRetention time.Duration TaskRetention time.Duration
@ -35,6 +39,7 @@ func (c *TaskerClient) CreateTask(taskName TaskName, queueName QueueName, task *
asynq.Queue(string(queueName)), asynq.Queue(string(queueName)),
asynq.TaskID(task.Id), asynq.TaskID(task.Id),
asynq.Retention(c.taskRetention), asynq.Retention(c.taskRetention),
asynq.Timeout(taskTimeout*time.Second),
) )
taskInfo, err := c.Client.Enqueue(qTask) taskInfo, err := c.Client.Enqueue(qTask)

View File

@ -5,7 +5,6 @@ import (
"time" "time"
"github.com/bsm/redislock" "github.com/bsm/redislock"
"github.com/grassrootseconomics/cic-custodial/pkg/logg"
"github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/grassrootseconomics/cic-custodial/pkg/redis"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/zerodha/logf" "github.com/zerodha/logf"
@ -17,12 +16,10 @@ const (
) )
type TaskerServerOpts struct { type TaskerServerOpts struct {
Concurrency int Concurrency int
Logg logf.Logger Logg logf.Logger
LogLevel asynq.LogLevel LogLevel asynq.LogLevel
RedisPool *redis.RedisPool RedisPool *redis.RedisPool
SystemContainer *SystemContainer
TaskerClient *TaskerClient
} }
type TaskerServer struct { type TaskerServer struct {
@ -36,7 +33,6 @@ func NewTaskerServer(o TaskerServerOpts) *TaskerServer {
asynq.Config{ asynq.Config{
Concurrency: o.Concurrency, Concurrency: o.Concurrency,
IsFailure: expectedFailures, IsFailure: expectedFailures,
Logger: logg.NewAsynqLogg(o.Logg),
LogLevel: o.LogLevel, LogLevel: o.LogLevel,
Queues: map[string]int{ Queues: map[string]int{
string(HighPriority): 5, string(HighPriority): 5,

View File

@ -1,556 +0,0 @@
package task
import (
"context"
"encoding/json"
"fmt"
"math/big"
"github.com/bsm/redislock"
"github.com/celo-org/celo-blockchain/common/hexutil"
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/grassrootseconomics/w3-celo-patch/module/eth"
"github.com/hibiken/asynq"
"github.com/nats-io/nats.go"
)
type (
AccountPayload struct {
PublicKey string `json:"publicKey"`
TrackingId string `json:"trackingId"`
}
accountEventPayload struct {
TrackingId string `json:"trackingId"`
}
)
func PrepareAccount(
noncestore nonce.Noncestore,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
p AccountPayload
)
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
if err := noncestore.SetNewAccountNonce(ctx, p.PublicKey); err != nil {
return err
}
_, err := taskerClient.CreateTask(
tasker.RegisterAccountOnChain,
tasker.DefaultPriority,
&tasker.Task{
Payload: t.Payload(),
},
)
if err != nil {
return err
}
_, err = taskerClient.CreateTask(
tasker.GiftTokenTask,
tasker.DefaultPriority,
&tasker.Task{
Payload: t.Payload(),
},
)
if err != nil {
return err
}
eventPayload := &accountEventPayload{
TrackingId: p.TrackingId,
}
eventJson, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.Publish("CUSTODIAL.accountNewNonce", eventJson)
if err != nil {
return err
}
return nil
}
}
func RegisterAccountOnChainProcessor(
celoProvider *celo.Provider,
lockProvider *redislock.Client,
noncestore nonce.Noncestore,
pg store.Store,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
p AccountPayload
)
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := noncestore.Acquire(ctx, system.PublicKey)
if err != nil {
return err
}
input, err := system.Abis["add"].EncodeArgs(w3.A(p.PublicKey))
if err != nil {
return err
}
// TODO: Review gas params.
builtTx, err := celoProvider.SignContractExecutionTx(
system.PrivateKey,
celo.ContractExecutionTxOpts{
ContractAddress: system.AccountIndexContract,
InputData: input,
GasPrice: big.NewInt(20000000000),
GasLimit: system.TokenTransferGasLimit,
Nonce: nonce,
},
)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
id, err := pg.CreateOTX(ctx, store.OTX{
TrackingId: p.TrackingId,
Type: "ACCOUNT_REGISTER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: system.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
_, err = taskerClient.CreateTask(
tasker.TxDispatchTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
_, err = taskerClient.CreateTask(
tasker.GiftGasTask,
tasker.DefaultPriority,
&tasker.Task{
Payload: t.Payload(),
},
)
if err != nil {
return err
}
eventPayload := &accountEventPayload{
TrackingId: p.TrackingId,
}
eventJson, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.Publish("CUSTODIAL.accountRegister", eventJson)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
return nil
}
}
func GiftGasProcessor(
celoProvider *celo.Provider,
lockProvider *redislock.Client,
noncestore nonce.Noncestore,
pg store.Store,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
p AccountPayload
)
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := noncestore.Acquire(ctx, system.PublicKey)
if err != nil {
return err
}
// TODO: Review gas params
builtTx, err := celoProvider.SignGasTransferTx(
system.PrivateKey,
celo.GasTransferTxOpts{
To: w3.A(p.PublicKey),
Nonce: nonce,
Value: system.GiftableGasValue,
GasPrice: celo.FixedMinGas,
},
)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
id, err := pg.CreateOTX(ctx, store.OTX{
TrackingId: p.TrackingId,
Type: "GIFT_GAS",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: system.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
_, err = taskerClient.CreateTask(
tasker.TxDispatchTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
eventPayload := &accountEventPayload{
TrackingId: p.TrackingId,
}
eventJson, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.Publish("CUSTODIAL.giftNewAccountGas", eventJson)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
return nil
}
}
func GiftTokenProcessor(
celoProvider *celo.Provider,
lockProvider *redislock.Client,
noncestore nonce.Noncestore,
pg store.Store,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
p AccountPayload
)
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := noncestore.Acquire(ctx, system.PublicKey)
if err != nil {
return err
}
input, err := system.Abis["mintTo"].EncodeArgs(w3.A(p.PublicKey), system.GiftableTokenValue)
if err != nil {
return err
}
// TODO: Review gas params.
builtTx, err := celoProvider.SignContractExecutionTx(
system.PrivateKey,
celo.ContractExecutionTxOpts{
ContractAddress: system.GiftableToken,
InputData: input,
GasPrice: big.NewInt(20000000000),
GasLimit: system.TokenTransferGasLimit,
Nonce: nonce,
},
)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
id, err := pg.CreateOTX(ctx, store.OTX{
TrackingId: p.TrackingId,
Type: "GIFT_VOUCHER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: system.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
_, err = taskerClient.CreateTask(
tasker.TxDispatchTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
eventPayload := &accountEventPayload{
TrackingId: p.TrackingId,
}
eventJson, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.Publish("CUSTODIAL.giftNewAccountVoucher", eventJson)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
return nil
}
}
// TODO: https://github.com/grassrootseconomics/cic-custodial/issues/43
// TODO:
func RefillGasProcessor(
celoProvider *celo.Provider,
nonceProvider nonce.Noncestore,
lockProvider *redislock.Client,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
p AccountPayload
balance big.Int
)
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
if err := celoProvider.Client.CallCtx(
ctx,
eth.Balance(w3.A(p.PublicKey), nil).Returns(&balance),
); err != nil {
return err
}
if belowThreshold := balance.Cmp(system.GasRefillThreshold); belowThreshold > 0 {
return nil
}
lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := nonceProvider.Acquire(ctx, system.PublicKey)
if err != nil {
return err
}
builtTx, err := celoProvider.SignGasTransferTx(
system.PrivateKey,
celo.GasTransferTxOpts{
To: w3.A(p.PublicKey),
Nonce: nonce,
Value: system.GasRefillValue,
GasPrice: celo.FixedMinGas,
},
)
if err != nil {
if err := nonceProvider.Return(ctx, p.PublicKey); err != nil {
return err
}
return fmt.Errorf("nonce.Return failed: %v: %w", err, asynq.SkipRetry)
}
disptachJobPayload, err := json.Marshal(TxPayload{
Tx: builtTx,
})
if err != nil {
return fmt.Errorf("json.Marshal failed: %v: %w", err, asynq.SkipRetry)
}
_, err = taskerClient.CreateTask(
tasker.TxDispatchTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
return err
}
return nil
}
}

View File

@ -0,0 +1,119 @@
package task
import (
"context"
"encoding/json"
"fmt"
"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/hibiken/asynq"
)
func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
payload AccountPayload
)
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry)
}
lock, err := cu.LockProvider.Obtain(
ctx,
cu.SystemContainer.LockPrefix+cu.SystemContainer.PublicKey,
cu.SystemContainer.LockTimeout,
nil,
)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey)
if err != nil {
return err
}
defer func() {
if err != nil {
if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil {
err = nErr
}
}
}()
builtTx, err := cu.CeloProvider.SignGasTransferTx(
cu.SystemContainer.PrivateKey,
celoutils.GasTransferTxOpts{
To: w3.A(payload.PublicKey),
Nonce: nonce,
Value: cu.SystemContainer.GiftableGasValue,
GasFeeCap: celoutils.SafeGasFeeCap,
GasTipCap: celoutils.SafeGasTipCap,
},
)
if err != nil {
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
return err
}
id, err := cu.PgStore.CreateOTX(ctx, store.OTX{
TrackingId: payload.TrackingId,
Type: "GIFT_GAS",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: cu.SystemContainer.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
return err
}
_, err = cu.TaskerClient.CreateTask(
tasker.DispatchTxTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
return err
}
eventPayload := &events.EventPayload{
OtxId: id,
TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(),
}
if err := cu.EventEmitter.Publish(
events.AccountGiftGas,
builtTx.Hash().Hex(),
eventPayload,
); err != nil {
return err
}
return nil
}
}

View File

@ -0,0 +1,129 @@
package task
import (
"context"
"encoding/json"
"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/hibiken/asynq"
)
func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
payload AccountPayload
)
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return err
}
lock, err := cu.LockProvider.Obtain(
ctx,
cu.SystemContainer.LockPrefix+cu.SystemContainer.PublicKey,
cu.SystemContainer.LockTimeout,
nil,
)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey)
if err != nil {
return err
}
defer func() {
if err != nil {
if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil {
err = nErr
}
}
}()
input, err := cu.SystemContainer.Abis["mintTo"].EncodeArgs(
w3.A(payload.PublicKey),
cu.SystemContainer.GiftableTokenValue,
)
if err != nil {
return err
}
builtTx, err := cu.CeloProvider.SignContractExecutionTx(
cu.SystemContainer.PrivateKey,
celoutils.ContractExecutionTxOpts{
ContractAddress: cu.SystemContainer.GiftableToken,
InputData: input,
GasFeeCap: celoutils.SafeGasFeeCap,
GasTipCap: celoutils.SafeGasTipCap,
GasLimit: cu.SystemContainer.TokenTransferGasLimit,
Nonce: nonce,
},
)
if err != nil {
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
return err
}
id, err := cu.PgStore.CreateOTX(ctx, store.OTX{
TrackingId: payload.TrackingId,
Type: "GIFT_VOUCHER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: cu.SystemContainer.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
return err
}
_, err = cu.TaskerClient.CreateTask(
tasker.DispatchTxTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
return err
}
eventPayload := &events.EventPayload{
OtxId: id,
TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(),
}
if err := cu.EventEmitter.Publish(
events.AccountGiftVoucher,
builtTx.Hash().Hex(),
eventPayload,
); err != nil {
return err
}
return nil
}
}

View File

@ -0,0 +1,78 @@
package task
import (
"context"
"encoding/json"
"fmt"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/hibiken/asynq"
)
type AccountPayload struct {
PublicKey string `json:"publicKey"`
TrackingId string `json:"trackingId"`
}
func AccountPrepare(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var payload AccountPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry)
}
if err := cu.Noncestore.SetNewAccountNonce(ctx, payload.PublicKey); err != nil {
return err
}
_, err := cu.TaskerClient.CreateTask(
tasker.AccountRegisterTask,
tasker.DefaultPriority,
&tasker.Task{
Payload: t.Payload(),
},
)
if err != nil {
return err
}
_, err = cu.TaskerClient.CreateTask(
tasker.AccountGiftGasTask,
tasker.DefaultPriority,
&tasker.Task{
Payload: t.Payload(),
},
)
if err != nil {
return err
}
_, err = cu.TaskerClient.CreateTask(
tasker.AccountGiftVoucherTask,
tasker.DefaultPriority,
&tasker.Task{
Payload: t.Payload(),
},
)
if err != nil {
return err
}
eventPayload := events.EventPayload{
TrackingId: payload.TrackingId,
}
if err := cu.EventEmitter.Publish(
events.AccountNewNonce,
payload.PublicKey,
eventPayload,
); err != nil {
return err
}
return nil
}
}

View File

@ -0,0 +1,134 @@
package task
import (
"context"
"encoding/json"
"fmt"
"math/big"
"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/grassrootseconomics/w3-celo-patch/module/eth"
"github.com/hibiken/asynq"
)
func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
payload AccountPayload
balance big.Int
)
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry)
}
if err := cu.CeloProvider.Client.CallCtx(
ctx,
eth.Balance(w3.A(payload.PublicKey), nil).Returns(&balance),
); err != nil {
return err
}
if belowThreshold := balance.Cmp(cu.SystemContainer.GasRefillThreshold); belowThreshold > 0 {
return nil
}
lock, err := cu.LockProvider.Obtain(
ctx,
cu.SystemContainer.LockPrefix+cu.SystemContainer.PublicKey,
cu.SystemContainer.LockTimeout,
nil,
)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey)
if err != nil {
return err
}
defer func() {
if err != nil {
if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil {
err = nErr
}
}
}()
// TODO: Review gas params
builtTx, err := cu.CeloProvider.SignGasTransferTx(
cu.SystemContainer.PrivateKey,
celoutils.GasTransferTxOpts{
To: w3.A(payload.PublicKey),
Nonce: nonce,
Value: cu.SystemContainer.GiftableGasValue,
GasFeeCap: celoutils.SafeGasFeeCap,
GasTipCap: celoutils.SafeGasTipCap,
},
)
if err != nil {
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
return err
}
id, err := cu.PgStore.CreateOTX(ctx, store.OTX{
TrackingId: payload.TrackingId,
Type: "REFILL_GAS",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: cu.SystemContainer.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
return err
}
_, err = cu.TaskerClient.CreateTask(
tasker.DispatchTxTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
return err
}
eventPayload := &events.EventPayload{
OtxId: id,
TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(),
}
if err := cu.EventEmitter.Publish(
events.AccountRefillGas,
builtTx.Hash().Hex(),
eventPayload,
); err != nil {
return err
}
return nil
}
}

View File

@ -0,0 +1,128 @@
package task
import (
"context"
"encoding/json"
"fmt"
"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/hibiken/asynq"
)
func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
payload AccountPayload
)
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry)
}
lock, err := cu.LockProvider.Obtain(
ctx,
cu.SystemContainer.LockPrefix+cu.SystemContainer.PublicKey,
cu.SystemContainer.LockTimeout,
nil,
)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := cu.Noncestore.Acquire(ctx, cu.SystemContainer.PublicKey)
if err != nil {
return err
}
defer func() {
if err != nil {
if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil {
err = nErr
}
}
}()
input, err := cu.SystemContainer.Abis["add"].EncodeArgs(
w3.A(payload.PublicKey),
)
if err != nil {
return err
}
// TODO: Review gas params.
builtTx, err := cu.CeloProvider.SignContractExecutionTx(
cu.SystemContainer.PrivateKey,
celoutils.ContractExecutionTxOpts{
ContractAddress: cu.SystemContainer.AccountIndexContract,
InputData: input,
GasFeeCap: celoutils.SafeGasFeeCap,
GasTipCap: celoutils.SafeGasTipCap,
GasLimit: cu.SystemContainer.TokenTransferGasLimit,
Nonce: nonce,
},
)
if err != nil {
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
return err
}
id, err := cu.PgStore.CreateOTX(ctx, store.OTX{
TrackingId: payload.TrackingId,
Type: "ACCOUNT_REGISTER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: cu.SystemContainer.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
return err
}
_, err = cu.TaskerClient.CreateTask(
tasker.DispatchTxTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
return err
}
eventPayload := &events.EventPayload{
OtxId: id,
TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(),
}
if err := cu.EventEmitter.Publish(
events.AccountRegister,
builtTx.Hash().Hex(),
eventPayload,
); err != nil {
return err
}
return nil
}
}

View File

@ -1,108 +0,0 @@
package task
import (
"context"
"encoding/json"
"fmt"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/pkg/status"
"github.com/grassrootseconomics/w3-celo-patch/module/eth"
"github.com/hibiken/asynq"
"github.com/nats-io/nats.go"
)
type (
TxPayload struct {
OtxId uint `json:"otxId"`
Tx *types.Transaction `json:"tx"`
}
dispatchEventPayload struct {
OtxId uint
TxHash string
DispatchStatus status.Status
}
)
func TxDispatch(
celoProvider *celo.Provider,
pg store.Store,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
p TxPayload
txHash common.Hash
)
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
dispatchStatus := store.DispatchStatus{
OtxId: p.OtxId,
}
eventPayload := &dispatchEventPayload{
OtxId: p.OtxId,
}
if err := celoProvider.Client.CallCtx(
ctx,
eth.SendTx(p.Tx).Returns(&txHash),
); err != nil {
switch err.Error() {
case celo.ErrGasPriceLow:
dispatchStatus.Status = status.FailGasPrice
case celo.ErrInsufficientGas:
dispatchStatus.Status = status.FailInsufficientGas
case celo.ErrNonceLow:
dispatchStatus.Status = status.FailNonce
default:
dispatchStatus.Status = status.Unknown
}
_, err := pg.CreateDispatchStatus(ctx, dispatchStatus)
if err != nil {
return err
}
eventJson, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.Publish("CUSTODIAL.dispatchFail", eventJson, nats.MsgId(txHash.Hex()))
if err != nil {
return err
}
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
}
dispatchStatus.Status = status.Successful
_, err := pg.CreateDispatchStatus(ctx, dispatchStatus)
if err != nil {
return err
}
eventPayload.TxHash = txHash.Hex()
eventJson, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.Publish("CUSTODIAL.dispatchSuccess", eventJson, nats.MsgId(txHash.Hex()))
if err != nil {
return err
}
return nil
}
}

View File

@ -0,0 +1,80 @@
package task
import (
"context"
"encoding/json"
"fmt"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/pkg/status"
"github.com/grassrootseconomics/w3-celo-patch/module/eth"
"github.com/hibiken/asynq"
)
type TxPayload struct {
OtxId uint `json:"otxId"`
Tx *types.Transaction `json:"tx"`
}
func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
payload TxPayload
dispatchStatus store.DispatchStatus
eventPayload events.EventPayload
dispathchTx common.Hash
)
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
}
txHash := payload.Tx.Hash().Hex()
dispatchStatus.OtxId, eventPayload.OtxId = payload.OtxId, payload.OtxId
eventPayload.TxHash = txHash
if err := cu.CeloProvider.Client.CallCtx(
ctx,
eth.SendTx(payload.Tx).Returns(&dispathchTx),
); err != nil {
dispatchStatus.Status = status.Unknown
switch err.Error() {
case celoutils.ErrGasPriceLow:
dispatchStatus.Status = status.FailGasPrice
case celoutils.ErrInsufficientGas:
dispatchStatus.Status = status.FailInsufficientGas
case celoutils.ErrNonceLow:
dispatchStatus.Status = status.FailNonce
}
if err := cu.PgStore.CreateDispatchStatus(ctx, dispatchStatus); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
}
if err := cu.EventEmitter.Publish(events.DispatchFail, txHash, eventPayload); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
}
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
}
dispatchStatus.Status = status.Successful
if err := cu.PgStore.CreateDispatchStatus(ctx, dispatchStatus); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
}
if err := cu.EventEmitter.Publish(events.DispatchSuccess, txHash, eventPayload); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
}
return nil
}
}

View File

@ -1,182 +0,0 @@
package task
import (
"context"
"encoding/json"
"math/big"
"github.com/bsm/redislock"
"github.com/celo-org/celo-blockchain/common/hexutil"
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/hibiken/asynq"
"github.com/nats-io/nats.go"
)
type (
TransferPayload struct {
TrackingId string `json:"trackingId"`
From string `json:"from" `
To string `json:"to"`
VoucherAddress string `json:"voucherAddress"`
Amount int64 `json:"amount"`
}
transferEventPayload struct {
DispatchTaskId string `json:"dispatchTaskId"`
OTXId uint `json:"otxId"`
TrackingId string `json:"trackingId"`
TxHash string `json:"txHash"`
}
)
func SignTransfer(
celoProvider *celo.Provider,
keystore keystore.Keystore,
lockProvider *redislock.Client,
noncestore nonce.Noncestore,
pg store.Store,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
p TransferPayload
)
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
lock, err := lockProvider.Obtain(
ctx,
system.LockPrefix+p.From,
system.LockTimeout,
nil,
)
if err != nil {
return err
}
defer lock.Release(ctx)
key, err := keystore.LoadPrivateKey(ctx, p.From)
if err != nil {
return err
}
nonce, err := noncestore.Acquire(ctx, p.From)
if err != nil {
return err
}
input, err := system.Abis["transfer"].EncodeArgs(w3.A(p.To), big.NewInt(p.Amount))
if err != nil {
return err
}
// TODO: Review gas params.
builtTx, err := celoProvider.SignContractExecutionTx(
key,
celo.ContractExecutionTxOpts{
ContractAddress: w3.A(p.VoucherAddress),
InputData: input,
GasPrice: big.NewInt(20000000000),
GasLimit: system.TokenTransferGasLimit,
Nonce: nonce,
},
)
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {
return err
}
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {
return err
}
return err
}
id, err := pg.CreateOTX(ctx, store.OTX{
TrackingId: p.TrackingId,
Type: "TRANSFER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: p.From,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {
return err
}
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {
return err
}
return err
}
dispatchTask, err := taskerClient.CreateTask(
tasker.TxDispatchTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {
return err
}
return err
}
eventPayload := &transferEventPayload{
DispatchTaskId: dispatchTask.ID,
OTXId: id,
TrackingId: p.TrackingId,
TxHash: builtTx.Hash().Hex(),
}
eventJson, err := json.Marshal(eventPayload)
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {
return err
}
return err
}
_, err = js.Publish("CUSTODIAL.transferSign", eventJson, nats.MsgId(builtTx.Hash().Hex()))
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {
return err
}
return err
}
return nil
}
}

View File

@ -0,0 +1,167 @@
package task
import (
"context"
"encoding/json"
"fmt"
"math/big"
"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/hibiken/asynq"
)
type (
TransferPayload struct {
TrackingId string `json:"trackingId"`
From string `json:"from" `
To string `json:"to"`
VoucherAddress string `json:"voucherAddress"`
Amount int64 `json:"amount"`
}
transferEventPayload struct {
DispatchTaskId string `json:"dispatchTaskId"`
OTXId uint `json:"otxId"`
TrackingId string `json:"trackingId"`
TxHash string `json:"txHash"`
}
)
func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
payload TransferPayload
)
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("account: failed %v: %w", err, asynq.SkipRetry)
}
lock, err := cu.LockProvider.Obtain(
ctx,
cu.SystemContainer.LockPrefix+payload.From,
cu.SystemContainer.LockTimeout,
nil,
)
if err != nil {
return err
}
defer lock.Release(ctx)
key, err := cu.Keystore.LoadPrivateKey(ctx, payload.From)
if err != nil {
return err
}
nonce, err := cu.Noncestore.Acquire(ctx, payload.From)
if err != nil {
return err
}
defer func() {
if err != nil {
if nErr := cu.Noncestore.Return(ctx, cu.SystemContainer.PublicKey); nErr != nil {
err = nErr
}
}
}()
input, err := cu.SystemContainer.Abis["transfer"].EncodeArgs(w3.A(payload.To), big.NewInt(payload.Amount))
if err != nil {
return err
}
// TODO: Review gas params.
builtTx, err := cu.CeloProvider.SignContractExecutionTx(
key,
celoutils.ContractExecutionTxOpts{
ContractAddress: w3.A(payload.VoucherAddress),
InputData: input,
GasFeeCap: celoutils.SafeGasFeeCap,
GasTipCap: celoutils.SafeGasTipCap,
GasLimit: cu.SystemContainer.TokenTransferGasLimit,
Nonce: nonce,
},
)
if err != nil {
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
return err
}
id, err := cu.PgStore.CreateOTX(ctx, store.OTX{
TrackingId: payload.TrackingId,
Type: "TRANSFER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: payload.From,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
return err
}
_, err = cu.TaskerClient.CreateTask(
tasker.DispatchTxTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
return err
}
gasRefillPayload, err := json.Marshal(AccountPayload{
PublicKey: payload.From,
})
if err != nil {
return err
}
_, err = cu.TaskerClient.CreateTask(
tasker.AccountRefillGasTask,
tasker.DefaultPriority,
&tasker.Task{
Payload: gasRefillPayload,
},
)
if err != nil {
return err
}
eventPayload := &transferEventPayload{
OTXId: id,
TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(),
}
if err := cu.EventEmitter.Publish(
events.SignTransfer,
builtTx.Hash().Hex(),
eventPayload,
); err != nil {
return err
}
return nil
}
}

View File

@ -38,15 +38,13 @@ type Task struct {
} }
const ( const (
PrepareAccountTask TaskName = "sys:prepare_account" AccountPrepareTask TaskName = "sys:prepare_account"
RegisterAccountOnChain TaskName = "sys:register_account" AccountRegisterTask TaskName = "sys:register_account"
GiftGasTask TaskName = "sys:gift_gas" AccountGiftGasTask TaskName = "sys:gift_gas"
GiftTokenTask TaskName = "sys:gift_token" AccountGiftVoucherTask TaskName = "sys:gift_token"
RefillGasTask TaskName = "admin:refill_gas" AccountRefillGasTask TaskName = "sys:refill_gas"
SweepGasTask TaskName = "admin:sweep_gas"
AdminTokenApprovalTask TaskName = "admin:token_approval"
SignTransferTask TaskName = "usr:sign_transfer" SignTransferTask TaskName = "usr:sign_transfer"
TxDispatchTask TaskName = "rpc:dispatch" DispatchTxTask TaskName = "rpc:dispatch"
) )
const ( const (

View File

@ -2,33 +2,32 @@ package logg
import "github.com/zerodha/logf" import "github.com/zerodha/logf"
type AsynqLogg struct { type AsynqLogger struct {
logg *logf.Logger Lo *logf.Logger
} }
// NewAsynqLogg creates a logf based logging adapter for asynq. func AsynqCompatibleLogger(lo logf.Logger) AsynqLogger {
func NewAsynqLogg(lo logf.Logger) AsynqLogg { return AsynqLogger{
return AsynqLogg{ Lo: &lo,
logg: &lo,
} }
} }
func (l AsynqLogg) Debug(args ...interface{}) { func (l AsynqLogger) Debug(args ...interface{}) {
l.logg.Debug("asynq", "debug", args[0]) l.Lo.Debug("asynq", "debug", args[0])
} }
func (l AsynqLogg) Info(args ...interface{}) { func (l AsynqLogger) Info(args ...interface{}) {
l.logg.Info("asynq", "info", args[0]) l.Lo.Info("asynq", "info", args[0])
} }
func (l AsynqLogg) Warn(args ...interface{}) { func (l AsynqLogger) Warn(args ...interface{}) {
l.logg.Warn("asynq", "warn", args[0]) l.Lo.Warn("asynq", "warn", args[0])
} }
func (l AsynqLogg) Error(args ...interface{}) { func (l AsynqLogger) Error(args ...interface{}) {
l.logg.Error("asynq", "error", args[0]) l.Lo.Error("asynq", "error", args[0])
} }
func (l AsynqLogg) Fatal(args ...interface{}) { func (l AsynqLogger) Fatal(args ...interface{}) {
l.logg.Fatal("asynq", "fatal", args[0]) l.Lo.Fatal("asynq", "fatal", args[0])
} }