diff --git a/cmd/service/init.go b/cmd/service/init.go index 3d936e3..a23c29d 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -9,6 +9,7 @@ import ( "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/queries" + "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/logg" "github.com/grassrootseconomics/cic-custodial/pkg/postgres" @@ -19,6 +20,7 @@ import ( "github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/file" + "github.com/nats-io/nats.go" "github.com/zerodha/logf" ) @@ -121,8 +123,8 @@ func initCommonRedisPool() (*redis.RedisPool, error) { return pool, nil } -// Load postgres based keystore -func initPostgresKeystore(postgresPool *pgxpool.Pool) (keystore.Keystore, error) { +// Load SQL statements into struct. +func initQueries(queriesPath string) (*queries.Queries, error) { parsedQueries, err := goyesql.ParseFile(queriesFlag) if err != nil { return nil, err @@ -133,9 +135,14 @@ func initPostgresKeystore(postgresPool *pgxpool.Pool) (keystore.Keystore, error) return nil, err } + return loadedQueries, nil +} + +// Load postgres based keystore. +func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries) (keystore.Keystore, error) { keystore := keystore.NewPostgresKeytore(keystore.Opts{ PostgresPool: postgresPool, - Queries: loadedQueries, + Queries: queries, }) return keystore, nil @@ -161,3 +168,42 @@ func initTaskerClient(redisPool *redis.RedisPool) *tasker.TaskerClient { TaskRetention: time.Duration(ko.MustInt64("asynq.task_retention_hrs")) * time.Hour, }) } + +// Load Postgres store +func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) store.Store { + return store.NewPostgresStore(store.Opts{ + PostgresPool: postgresPool, + Queries: queries, + }) +} + +// Init JetStream context for tasker events. +func initJetStream() (nats.JetStreamContext, error) { + natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint")) + if err != nil { + return nil, err + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + + // Bootstrap stream if it does not exist + stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name")) + if stream == nil { + lo.Info("jetstream: bootstrapping stream") + _, err = js.AddStream(&nats.StreamConfig{ + Name: ko.MustString("jetstream.stream_name"), + MaxAge: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour, + Storage: nats.FileStorage, + Subjects: ko.MustStrings("jetstream.stream_subjects"), + Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour, + }) + if err != nil { + return nil, err + } + } + + return js, nil +} diff --git a/cmd/service/main.go b/cmd/service/main.go index 4b8b6a8..9672430 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -13,6 +13,7 @@ import ( celo "github.com/grassrootseconomics/cic-celo-sdk" "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" + "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/knadh/koanf" "github.com/labstack/echo/v4" @@ -33,6 +34,7 @@ type custodial struct { keystore keystore.Keystore lockProvider *redislock.Client noncestore nonce.Noncestore + pgStore store.Store systemContainer *tasker.SystemContainer taskerClient *tasker.TaskerClient } @@ -62,6 +64,11 @@ func main() { lo.Fatal("main: critical error loading chain provider", "error", err) } + queries, err := initQueries(queriesFlag) + if err != nil { + lo.Fatal("main: critical error loading SQL queries", "error", err) + } + postgresPool, err := initPostgresPool() if err != nil { lo.Fatal("main: critical error connecting to postgres", "error", err) @@ -77,11 +84,12 @@ func main() { lo.Fatal("main: critical error connecting to common redis db", "error", err) } - postgresKeystore, err := initPostgresKeystore(postgresPool) + postgresKeystore, err := initPostgresKeystore(postgresPool, queries) if err != nil { lo.Fatal("main: critical error loading keystore") } + pgStore := initPostgresStore(postgresPool, queries) redisNoncestore := initRedisNoncestore(redisPool, celoProvider) lockProvider := initLockProvider(redisPool.Client) taskerClient := initTaskerClient(asynqRedisPool) @@ -96,6 +104,7 @@ func main() { keystore: postgresKeystore, lockProvider: lockProvider, noncestore: redisNoncestore, + pgStore: pgStore, systemContainer: systemContainer, taskerClient: taskerClient, } diff --git a/cmd/service/tasker.go b/cmd/service/tasker.go index 52e4343..a3b97aa 100644 --- a/cmd/service/tasker.go +++ b/cmd/service/tasker.go @@ -10,6 +10,10 @@ import ( // Load tasker handlers, injecting any necessary handler dependencies from the system container. func initTasker(custodialContainer *custodial, redisPool *redis.RedisPool) *tasker.TaskerServer { lo.Debug("Bootstrapping tasker") + js, err := initJetStream() + if err != nil { + lo.Fatal("filters: critical error loading jetstream", "error", err) + } taskerServerOpts := tasker.TaskerServerOpts{ Concurrency: ko.MustInt("asynq.worker_count"), @@ -28,30 +32,40 @@ func initTasker(custodialContainer *custodial, redisPool *redis.RedisPool) *task taskerServer.RegisterHandlers(tasker.PrepareAccountTask, task.PrepareAccount( custodialContainer.noncestore, custodialContainer.taskerClient, + js, )) taskerServer.RegisterHandlers(tasker.GiftGasTask, task.GiftGasProcessor( custodialContainer.celoProvider, - custodialContainer.noncestore, custodialContainer.lockProvider, + custodialContainer.noncestore, + custodialContainer.pgStore, custodialContainer.systemContainer, custodialContainer.taskerClient, + js, )) taskerServer.RegisterHandlers(tasker.GiftTokenTask, task.GiftTokenProcessor( custodialContainer.celoProvider, - custodialContainer.noncestore, custodialContainer.lockProvider, + custodialContainer.noncestore, + custodialContainer.pgStore, custodialContainer.systemContainer, custodialContainer.taskerClient, + js, )) - taskerServer.RegisterHandlers(tasker.RefillGasTask, task.RefillGasProcessor( + taskerServer.RegisterHandlers(tasker.SignTransferTask, task.SignTransfer( custodialContainer.celoProvider, - custodialContainer.noncestore, + custodialContainer.keystore, custodialContainer.lockProvider, + custodialContainer.noncestore, + custodialContainer.pgStore, custodialContainer.systemContainer, custodialContainer.taskerClient, + js, )) taskerServer.RegisterHandlers(tasker.TxDispatchTask, task.TxDispatch( custodialContainer.celoProvider, + custodialContainer.pgStore, + js, )) return taskerServer diff --git a/internal/api/sign.go b/internal/api/sign.go index 6b587b0..aebbac6 100644 --- a/internal/api/sign.go +++ b/internal/api/sign.go @@ -50,7 +50,7 @@ func SignTransferHandler( } _, err = taskerClient.CreateTask( - tasker.TransferTokenTask, + tasker.SignTransferTask, tasker.HighPriority, &tasker.Task{ Id: transferRequest.TrackingId, diff --git a/internal/tasker/task/dispatch.go b/internal/tasker/task/dispatch.go index 08ca0d5..abe6ef1 100644 --- a/internal/tasker/task/dispatch.go +++ b/internal/tasker/task/dispatch.go @@ -29,8 +29,8 @@ type ( func TxDispatch( celoProvider *celo.Provider, - js nats.JetStreamContext, pg store.Store, + js nats.JetStreamContext, ) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { diff --git a/internal/tasker/task/sign.go b/internal/tasker/task/sign.go index 27d8de0..aa497a5 100644 --- a/internal/tasker/task/sign.go +++ b/internal/tasker/task/sign.go @@ -15,7 +15,6 @@ import ( "github.com/grassrootseconomics/w3-celo-patch" "github.com/hibiken/asynq" "github.com/nats-io/nats.go" - "github.com/zerodha/logf" ) type ( @@ -37,14 +36,13 @@ type ( func SignTransfer( celoProvider *celo.Provider, - js nats.JetStreamContext, keystore keystore.Keystore, lockProvider *redislock.Client, noncestore nonce.Noncestore, pg store.Store, system *tasker.SystemContainer, taskerClient *tasker.TaskerClient, - logger logf.Logger, + js nats.JetStreamContext, ) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { var ( diff --git a/internal/tasker/types.go b/internal/tasker/types.go index 9f39ae7..1ba08a7 100644 --- a/internal/tasker/types.go +++ b/internal/tasker/types.go @@ -42,7 +42,7 @@ const ( RefillGasTask TaskName = "admin:refill_gas" SweepGasTask TaskName = "admin:sweep_gas" AdminTokenApprovalTask TaskName = "admin:token_approval" - TransferTokenTask TaskName = "usr:transfer_token" + SignTransferTask TaskName = "usr:sign_transfer" TxDispatchTask TaskName = "rpc:dispatch" )