farmstar-survey-backend/cmd/farmstar/init.go

161 lines
4.0 KiB
Go

package main
import (
"context"
"log/slog"
"os"
"strings"
"github.com/grassrootseconomics/farmstar-survey-backend/internal/worker"
"github.com/grassrootseconomics/farmstar-survey-backend/pkg/custodial"
"github.com/grassrootseconomics/farmstar-survey-backend/pkg/telegram"
"github.com/grassrootseconomics/farmstar-survey-backend/pkg/ussd"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/kamikazechaser/africastalking"
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/v2"
"github.com/pocketbase/pocketbase"
"github.com/pocketbase/pocketbase/plugins/migratecmd"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
// initLogger builds the application's structured logger.
//
// Records are rendered by a slog text handler onto stdout, annotated with the
// source location of the logging call. The minimum level is Error, so
// Debug/Info/Warn records are discarded.
func initLogger() *slog.Logger {
	opts := &slog.HandlerOptions{
		AddSource: true,
		Level:     slog.LevelError,
	}
	handler := slog.NewTextHandler(os.Stdout, opts)

	return slog.New(handler)
}
// initConfig loads the application configuration into a fresh koanf instance
// that uses "." as its key delimiter.
//
// Sources are applied in order, later ones overriding earlier ones:
//  1. the TOML file at the path held in confFlag;
//  2. environment variables prefixed with FARMSTAR_. The prefix is stripped,
//     the remainder is lowercased and every "__" becomes ".", so
//     FARMSTAR_POSTGRES__DSN maps to the key "postgres.dsn". Single
//     underscores are kept as-is.
//
// Any load failure is logged and terminates the process with exit code 1.
func initConfig() *koanf.Koanf {
	ko := koanf.New(".")

	if err := ko.Load(file.Provider(confFlag), toml.Parser()); err != nil {
		// slog expects key/value pairs after the message; a bare error
		// would be rendered as "!BADKEY=<err>".
		lo.Error("could not parse configuration file", "error", err)
		os.Exit(1)
	}

	envProvider := env.Provider("FARMSTAR_", ".", func(s string) string {
		key := strings.ToLower(strings.TrimPrefix(s, "FARMSTAR_"))
		return strings.ReplaceAll(key, "__", ".")
	})
	// The env provider already yields a key/value map, so no parser is needed.
	if err := ko.Load(envProvider, nil); err != nil {
		lo.Error("could not override config from env vars", "error", err)
		os.Exit(1)
	}

	return ko
}
// initPocketbase creates a new PocketBase application and registers the
// migrate command on its root command with Automigrate switched on.
// MustRegister follows the Must convention, so a registration failure is
// presumably fatal (panic) rather than returned — confirm against the
// pocketbase version in use.
func initPocketbase() *pocketbase.PocketBase {
	pb := pocketbase.New()

	migrateConf := migratecmd.Config{Automigrate: true}
	migratecmd.MustRegister(pb, pb.RootCmd, migrateConf)

	return pb
}
// initPostgres creates a pgx connection pool from the "postgres.dsn"
// configuration key.
//
// ko.MustString panics if the key is missing or empty. A DSN that cannot be
// parsed, or a pool that cannot be created, is logged and terminates the
// process with exit code 1.
func initPostgres() *pgxpool.Pool {
	poolConfig, err := pgxpool.ParseConfig(ko.MustString("postgres.dsn"))
	if err != nil {
		// slog expects key/value pairs after the message; a bare error
		// would be rendered as "!BADKEY=<err>".
		lo.Error("could not parse postgres dsn", "error", err)
		os.Exit(1)
	}

	dbPool, err := pgxpool.NewWithConfig(context.Background(), poolConfig)
	if err != nil {
		lo.Error("could not create pgxpool", "error", err)
		os.Exit(1)
	}

	return dbPool
}
// initUSSDClient constructs the USSD client from the "ussd.endpoint"
// configuration key. ko.MustString panics if the key is missing or empty.
func initUSSDClient() *ussd.USSDClient {
	endpoint := ko.MustString("ussd.endpoint")

	return ussd.New(endpoint)
}
// initCustodialClient constructs the custodial client from the
// "custodial.endpoint" configuration key. ko.MustString panics if the key is
// missing or empty.
func initCustodialClient() *custodial.CustodialClient {
	endpoint := ko.MustString("custodial.endpoint")

	return custodial.New(endpoint)
}
// initTelegramClient constructs the Telegram client from the "telegram.key"
// configuration key. ko.MustString panics if the key is missing or empty.
func initTelegramClient() *telegram.TelegramClient {
	key := ko.MustString("telegram.key")

	return telegram.New(key)
}
// initATClient constructs the Africa's Talking client from the "at.key" and
// "at.username" configuration keys (read in that order). ko.MustString panics
// if either key is missing or empty.
func initATClient() *africastalking.AtClient {
	apiKey := ko.MustString("at.key")
	username := ko.MustString("at.username")

	// NOTE(review): the trailing boolean presumably toggles sandbox mode —
	// confirm against the africastalking package before changing it.
	return africastalking.New(apiKey, username, false)
}
// initRiverQueueWWorker prepares the River job queue and returns the worker
// wrapper around its client.
//
// It performs three steps:
//  1. runs River's schema migrations (direction up) inside a single
//     transaction on postgresPool and commits it;
//  2. registers the rewards, SMS and Telegram job workers;
//  3. creates a River client bound to postgresPool that processes the default
//     queue with at most 10 workers.
//
// It relies on the package-level lo, ko, postgresPool, pocketbaseApp,
// ussdClient, custodialClient, atClient and tgClient already being set.
// ko.MustString panics on a missing "rewards.vault", "rewards.voucher" or
// "telegram.group" key. Every other failure is logged and terminates the
// process with exit code 1.
func initRiverQueueWWorker() *worker.Worker {
	ctx := context.Background()

	tx, err := postgresPool.Begin(ctx)
	if err != nil {
		// slog expects key/value pairs after the message; a bare error
		// would be rendered as "!BADKEY=<err>".
		lo.Error("could not begin pgx tx", "error", err)
		os.Exit(1)
	}
	// Safety net for early returns. After a successful Commit the rollback is
	// a harmless no-op, so its error is intentionally ignored. Deferred calls
	// do not run on the os.Exit paths below.
	defer tx.Rollback(ctx)

	migrator := rivermigrate.New(riverpgxv5.New(postgresPool), nil)
	// Use the same ctx as the surrounding transaction instead of a second,
	// unrelated background context.
	if _, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, nil); err != nil {
		lo.Error("could not migrate river queue", "error", err)
		os.Exit(1)
	}

	if err := tx.Commit(ctx); err != nil {
		lo.Error("could not commit pgx tx", "error", err)
		os.Exit(1)
	}

	workers := river.NewWorkers()

	// AddWorkerSafely returns an error where AddWorker would panic.
	if err := river.AddWorkerSafely(workers, &worker.RewardsWorker{
		Pocketbase:      pocketbaseApp,
		USSDClient:      ussdClient,
		CustodialClient: custodialClient,
		VaultAddress:    ko.MustString("rewards.vault"),
		VoucherAddress:  ko.MustString("rewards.voucher"),
	}); err != nil {
		lo.Error("could not bootstrap rewards worker", "error", err)
		os.Exit(1)
	}

	if err := river.AddWorkerSafely(workers, &worker.SMSWorker{
		AtClient: atClient,
	}); err != nil {
		lo.Error("could not bootstrap SMS worker", "error", err)
		os.Exit(1)
	}

	if err := river.AddWorkerSafely(workers, &worker.TelegramWorker{
		TgClient: tgClient,
		ChatID:   ko.MustString("telegram.group"),
	}); err != nil {
		lo.Error("could not bootstrap tg worker", "error", err)
		os.Exit(1)
	}

	riverClient, err := river.NewClient(riverpgxv5.New(postgresPool), &river.Config{
		Logger: lo,
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 10},
		},
		Workers: workers,
	})
	if err != nil {
		lo.Error("could not create a river client", "error", err)
		os.Exit(1)
	}

	return worker.NewWorker(riverClient)
}