// Source: cic-dw/cmd/main.go (127 lines, 3.0 KiB, Go)

package main
import (
"context"
"os"
"os/signal"
"strings"
"time"
batch_balance "github.com/grassrootseconomics/cic-go/batch_balance"
"github.com/grassrootseconomics/cic-go/meta"
cic_net "github.com/grassrootseconomics/cic-go/net"
"github.com/grassrootseconomics/cic-go/provider"
"github.com/hibiken/asynq"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/knadh/koanf"
"github.com/lmittmann/w3"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sys/unix"
)
// Package-level state shared between init and main. Apart from k, nothing
// here is initialised at declaration; the values are expected to be filled in
// by the load*/connect*/parse* helpers that init calls (those helpers are
// defined elsewhere in the package).
var (
	// k is the koanf instance (using "." as delimiter) handed to loadConfig.
	k = koanf.New(".")

	// conf holds the application configuration; init and main read its
	// Db, Chain, Meta, Syncer, Api and Server sections.
	conf config

	// preparedQueries: presumably populated by loadQueries — confirm there.
	preparedQueries *queries

	// db is the Postgres connection pool; presumably set by connectDb.
	db *pgxpool.Pool

	// rClient is the Redis connection option passed to bootstrapScheduler
	// and bootstrapProcessor in main; presumably set by parseRedis.
	rClient asynq.RedisConnOpt

	// rpcProvider: presumably set by loadProvider — confirm there.
	rpcProvider *provider.Provider

	// cicnetClient: presumably set by loadCicNet — confirm there.
	cicnetClient *cic_net.CicNet

	// batchBalance: presumably set by loadBatchBalance — confirm there.
	batchBalance *batch_balance.BatchBalance

	// metaClient: presumably set by loadCicMeta — confirm there.
	metaClient *meta.CicMeta
)
// init configures the console logger and then runs every bootstrap step in a
// fixed order: config, SQL queries, Postgres, RPC provider, CicNet, balance
// resolver, Redis options and finally the meta client. The first step that
// returns an error terminates the process through log.Fatal, so later steps
// never run against partially initialised state.
func init() {
	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})

	// fatalOnErr aborts the process with msg when a bootstrap step failed.
	fatalOnErr := func(stepErr error, msg string) {
		if stepErr != nil {
			log.Fatal().Err(stepErr).Msg(msg)
		}
	}

	// Each call completes (and may exit) before the next line is evaluated,
	// so the conf.* arguments below are only read after loadConfig has
	// returned successfully.
	fatalOnErr(loadConfig("config.toml", k), "failed to load config")
	fatalOnErr(loadQueries("queries"), "failed to load sql file")
	fatalOnErr(connectDb(conf.Db.Postgres), "failed to connect to postgres")
	fatalOnErr(loadProvider(conf.Chain.RpcProvider), "failed to connect to rpc endpoint")
	fatalOnErr(loadCicNet(w3.A(conf.Chain.TokenRegistry)), "failed to load cicnet")
	fatalOnErr(loadBatchBalance(w3.A(conf.Chain.BalanceResolver)), "failed to load balance resolver")
	fatalOnErr(parseRedis(conf.Db.Redis), "could not parse redis connection string")

	// loadCicMeta reports no error, so there is nothing to check here.
	loadCicMeta(conf.Meta.Endpoint)
}
// main bootstraps the scheduler and job processor, optionally starts them
// (conf.Syncer.Enabled) and the HTTP server (conf.Api.Enabled) in background
// goroutines, and then blocks until a signal arrives.
//
// Signal handling:
//   - SIGTSTP: call processor.Stop() and shut the scheduler down, then keep
//     waiting for further signals.
//   - SIGTERM / SIGINT: leave the wait loop, shut down the processor, the
//     scheduler (unless a previous SIGTSTP already did) and the HTTP server.
//
// Any startup failure, and a failed HTTP server shutdown, terminates the
// process through log.Fatal.
func main() {
	scheduler, err := bootstrapScheduler(rClient)
	if err != nil {
		log.Fatal().Err(err).Msg("could not bootstrap scheduler")
	}

	processor, mux := bootstrapProcessor(rClient)

	if conf.Syncer.Enabled {
		go func() {
			if err := scheduler.Run(); err != nil {
				log.Fatal().Err(err).Msg("could not start scheduler")
			}
		}()

		go func() {
			if err := processor.Run(mux); err != nil {
				log.Fatal().Err(err).Msg("failed to start job processor")
			}
		}()
	}

	server := initHTTPServer()
	if conf.Api.Enabled {
		go func() {
			if err := server.Start(conf.Server.Address); err != nil {
				// A "Server closed" error is the expected result of the
				// Shutdown call at the end of main, not a startup failure.
				// NOTE(review): matching on the message text is brittle; if
				// the server surfaces net/http's ErrServerClosed, prefer
				// errors.Is(err, http.ErrServerClosed) — confirm against
				// initHTTPServer.
				if strings.Contains(err.Error(), "Server closed") {
					log.Info().Msg("shutting down server")
				} else {
					log.Fatal().Err(err).Msg("could not start server")
				}
			}
		}()
	}

	// Buffered so a signal delivered while main is busy is not dropped.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)

	// schedulerDown records whether main has already shut the scheduler
	// down, so that it is invoked at most once from here.
	schedulerDown := false

	// Keep consuming SIGTSTP; any other registered signal ends the loop.
	for s := <-sigs; s == unix.SIGTSTP; s = <-sigs {
		processor.Stop()
		if !schedulerDown {
			scheduler.Shutdown()
			schedulerDown = true
		}
	}

	processor.Shutdown()

	// Previously the scheduler was only shut down on SIGTSTP, so a plain
	// SIGTERM/SIGINT exited without this call even though the final log
	// line reports the scheduler as shut down.
	if !schedulerDown {
		scheduler.Shutdown()
	}

	// Give in-flight HTTP requests at most one second to complete.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	if err = server.Shutdown(ctx); err != nil {
		log.Fatal().Err(err).Msg("could not shut down server")
	}

	log.Info().Msg("gracefully shutdown processor, scheduler and server")
}