update: deps initializers

This commit is contained in:
Mohamed Sohail 2023-02-09 14:23:37 +03:00
parent 8a0880fcfc
commit 773474cad9
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
7 changed files with 81 additions and 14 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/queries"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/logg"
"github.com/grassrootseconomics/cic-custodial/pkg/postgres"
@ -19,6 +20,7 @@ import (
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf"
)
@ -121,8 +123,8 @@ func initCommonRedisPool() (*redis.RedisPool, error) {
return pool, nil
}
// Load postgres based keystore
func initPostgresKeystore(postgresPool *pgxpool.Pool) (keystore.Keystore, error) {
// Load SQL statements into struct.
func initQueries(queriesPath string) (*queries.Queries, error) {
parsedQueries, err := goyesql.ParseFile(queriesFlag)
if err != nil {
return nil, err
@ -133,9 +135,14 @@ func initPostgresKeystore(postgresPool *pgxpool.Pool) (keystore.Keystore, error)
return nil, err
}
return loadedQueries, nil
}
// Load postgres based keystore.
func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries) (keystore.Keystore, error) {
keystore := keystore.NewPostgresKeytore(keystore.Opts{
PostgresPool: postgresPool,
Queries: loadedQueries,
Queries: queries,
})
return keystore, nil
@ -161,3 +168,42 @@ func initTaskerClient(redisPool *redis.RedisPool) *tasker.TaskerClient {
TaskRetention: time.Duration(ko.MustInt64("asynq.task_retention_hrs")) * time.Hour,
})
}
// Load Postgres store
func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) store.Store {
return store.NewPostgresStore(store.Opts{
PostgresPool: postgresPool,
Queries: queries,
})
}
// Init JetStream context for tasker events.
func initJetStream() (nats.JetStreamContext, error) {
natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint"))
if err != nil {
return nil, err
}
js, err := natsConn.JetStream()
if err != nil {
return nil, err
}
// Bootstrap stream if it does not exist
stream, _ := js.StreamInfo(ko.MustString("jetstream.stream_name"))
if stream == nil {
lo.Info("jetstream: bootstrapping stream")
_, err = js.AddStream(&nats.StreamConfig{
Name: ko.MustString("jetstream.stream_name"),
MaxAge: time.Duration(ko.MustInt("jetstream.persist_duration_hours")) * time.Hour,
Storage: nats.FileStorage,
Subjects: ko.MustStrings("jetstream.stream_subjects"),
Duplicates: time.Duration(ko.MustInt("jetstream.dedup_duration_hours")) * time.Hour,
})
if err != nil {
return nil, err
}
}
return js, nil
}

View File

@ -13,6 +13,7 @@ import (
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/knadh/koanf"
"github.com/labstack/echo/v4"
@ -33,6 +34,7 @@ type custodial struct {
keystore keystore.Keystore
lockProvider *redislock.Client
noncestore nonce.Noncestore
pgStore store.Store
systemContainer *tasker.SystemContainer
taskerClient *tasker.TaskerClient
}
@ -62,6 +64,11 @@ func main() {
lo.Fatal("main: critical error loading chain provider", "error", err)
}
queries, err := initQueries(queriesFlag)
if err != nil {
lo.Fatal("main: critical error loading SQL queries", "error", err)
}
postgresPool, err := initPostgresPool()
if err != nil {
lo.Fatal("main: critical error connecting to postgres", "error", err)
@ -77,11 +84,12 @@ func main() {
lo.Fatal("main: critical error connecting to common redis db", "error", err)
}
postgresKeystore, err := initPostgresKeystore(postgresPool)
postgresKeystore, err := initPostgresKeystore(postgresPool, queries)
if err != nil {
lo.Fatal("main: critical error loading keystore")
}
pgStore := initPostgresStore(postgresPool, queries)
redisNoncestore := initRedisNoncestore(redisPool, celoProvider)
lockProvider := initLockProvider(redisPool.Client)
taskerClient := initTaskerClient(asynqRedisPool)
@ -96,6 +104,7 @@ func main() {
keystore: postgresKeystore,
lockProvider: lockProvider,
noncestore: redisNoncestore,
pgStore: pgStore,
systemContainer: systemContainer,
taskerClient: taskerClient,
}

View File

@ -10,6 +10,10 @@ import (
// Load tasker handlers, injecting any necessary handler dependencies from the system container.
func initTasker(custodialContainer *custodial, redisPool *redis.RedisPool) *tasker.TaskerServer {
lo.Debug("Bootstrapping tasker")
js, err := initJetStream()
if err != nil {
lo.Fatal("filters: critical error loading jetstream", "error", err)
}
taskerServerOpts := tasker.TaskerServerOpts{
Concurrency: ko.MustInt("asynq.worker_count"),
@ -28,30 +32,40 @@ func initTasker(custodialContainer *custodial, redisPool *redis.RedisPool) *task
taskerServer.RegisterHandlers(tasker.PrepareAccountTask, task.PrepareAccount(
custodialContainer.noncestore,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.GiftGasTask, task.GiftGasProcessor(
custodialContainer.celoProvider,
custodialContainer.noncestore,
custodialContainer.lockProvider,
custodialContainer.noncestore,
custodialContainer.pgStore,
custodialContainer.systemContainer,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.GiftTokenTask, task.GiftTokenProcessor(
custodialContainer.celoProvider,
custodialContainer.noncestore,
custodialContainer.lockProvider,
custodialContainer.noncestore,
custodialContainer.pgStore,
custodialContainer.systemContainer,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.RefillGasTask, task.RefillGasProcessor(
taskerServer.RegisterHandlers(tasker.SignTransferTask, task.SignTransfer(
custodialContainer.celoProvider,
custodialContainer.noncestore,
custodialContainer.keystore,
custodialContainer.lockProvider,
custodialContainer.noncestore,
custodialContainer.pgStore,
custodialContainer.systemContainer,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.TxDispatchTask, task.TxDispatch(
custodialContainer.celoProvider,
custodialContainer.pgStore,
js,
))
return taskerServer

View File

@ -50,7 +50,7 @@ func SignTransferHandler(
}
_, err = taskerClient.CreateTask(
tasker.TransferTokenTask,
tasker.SignTransferTask,
tasker.HighPriority,
&tasker.Task{
Id: transferRequest.TrackingId,

View File

@ -29,8 +29,8 @@ type (
func TxDispatch(
celoProvider *celo.Provider,
js nats.JetStreamContext,
pg store.Store,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {

View File

@ -15,7 +15,6 @@ import (
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/hibiken/asynq"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf"
)
type (
@ -37,14 +36,13 @@ type (
func SignTransfer(
celoProvider *celo.Provider,
js nats.JetStreamContext,
keystore keystore.Keystore,
lockProvider *redislock.Client,
noncestore nonce.Noncestore,
pg store.Store,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
logger logf.Logger,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (

View File

@ -42,7 +42,7 @@ const (
RefillGasTask TaskName = "admin:refill_gas"
SweepGasTask TaskName = "admin:sweep_gas"
AdminTokenApprovalTask TaskName = "admin:token_approval"
TransferTokenTask TaskName = "usr:transfer_token"
SignTransferTask TaskName = "usr:sign_transfer"
TxDispatchTask TaskName = "rpc:dispatch"
)