mirror of
https://github.com/GrassrootsEconomics/cic-dw.git
synced 2024-12-22 19:07:33 +01:00
Mohammed Sohail
37538c68ff
- see docs for pagination api usage - added control for syncer/api goroutines
124 lines
2.9 KiB
Go
124 lines
2.9 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"time"
|
|
|
|
batch_balance "github.com/grassrootseconomics/cic-go/batch_balance"
|
|
cic_net "github.com/grassrootseconomics/cic-go/net"
|
|
"github.com/grassrootseconomics/cic-go/provider"
|
|
"github.com/hibiken/asynq"
|
|
"github.com/jackc/pgx/v4/pgxpool"
|
|
"github.com/knadh/koanf"
|
|
"github.com/lmittmann/w3"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
"golang.org/x/sys/unix"
|
|
)
|
|
|
|
var (
|
|
k = koanf.New(".")
|
|
|
|
preparedQueries *queries
|
|
conf config
|
|
db *pgxpool.Pool
|
|
rpcProvider *provider.Provider
|
|
cicnetClient *cic_net.CicNet
|
|
batchBalance *batch_balance.BatchBalance
|
|
rClient asynq.RedisConnOpt
|
|
)
|
|
|
|
func init() {
|
|
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
|
|
|
|
if err := loadConfig("config.toml", k); err != nil {
|
|
log.Fatal().Err(err).Msg("failed to load config")
|
|
}
|
|
|
|
if err := loadQueries("queries"); err != nil {
|
|
log.Fatal().Err(err).Msg("failed to load sql file")
|
|
}
|
|
|
|
if err := connectDb(conf.Db.Postgres); err != nil {
|
|
log.Fatal().Err(err).Msg("failed to connect to postgres")
|
|
}
|
|
|
|
if err := loadProvider(conf.Chain.RpcProvider); err != nil {
|
|
log.Fatal().Err(err).Msg("failed to connect to rpc endpoint")
|
|
}
|
|
|
|
if err := loadCicNet(w3.A(conf.Chain.TokenRegistry)); err != nil {
|
|
log.Fatal().Err(err).Msg("failed to load cicnet")
|
|
}
|
|
|
|
if err := loadBatchBalance(w3.A(conf.Chain.BalanceResolver)); err != nil {
|
|
log.Fatal().Err(err).Msg("failed to load balance resolver")
|
|
}
|
|
|
|
if err := parseRedis(conf.Db.Redis); err != nil {
|
|
|
|
log.Fatal().Err(err).Msg("could not parse redis connection string")
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
scheduler, err := bootstrapScheduler(rClient)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("could not bootstrap scheduler")
|
|
}
|
|
|
|
processor, mux := bootstrapProcessor(rClient)
|
|
|
|
if conf.Syncer.Enabled {
|
|
go func() {
|
|
if err := scheduler.Run(); err != nil {
|
|
log.Fatal().Err(err).Msg("could not start scheduler")
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
if err := processor.Run(mux); err != nil {
|
|
log.Fatal().Err(err).Msg("failed to start job processor")
|
|
}
|
|
}()
|
|
}
|
|
|
|
server := initHTTPServer()
|
|
|
|
if conf.Api.Enabled {
|
|
go func() {
|
|
if err := server.Start(conf.Server.Address); err != nil {
|
|
if strings.Contains(err.Error(), "Server closed") {
|
|
log.Info().Msg("shutting down server")
|
|
} else {
|
|
log.Fatal().Err(err).Msg("could not start server")
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
|
|
for {
|
|
s := <-sigs
|
|
if s == unix.SIGTSTP {
|
|
processor.Stop()
|
|
scheduler.Shutdown()
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
processor.Shutdown()
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
err = server.Shutdown(ctx)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("could not shut down server")
|
|
}
|
|
log.Info().Msg("gracefully shutdown processor, scheduler and server")
|
|
}
|