mirror of
				https://github.com/grassrootseconomics/cic-custodial.git
				synced 2025-11-04 10:48:24 +01:00 
			
		
		
		
	feat: add service mode flags, other refactors
* closes #32 * bin can now run in full (default), tasker, api mode * move logg bootstrpping earlier * config file loading via flag * rename service to main * graceful shutdown now in main and dependant on mode
This commit is contained in:
		
							parent
							
								
									4b180ea285
								
							
						
					
					
						commit
						611addbf64
					
				@ -10,11 +10,13 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func initApiServer() *echo.Echo {
 | 
			
		||||
	lo.Debug("bootstrapping api server")
 | 
			
		||||
	server := echo.New()
 | 
			
		||||
	server.HideBanner = true
 | 
			
		||||
	server.HidePort = true
 | 
			
		||||
 | 
			
		||||
	if ko.Bool("service.statsviz_debug") {
 | 
			
		||||
		lo.Debug("Starting stats_viz at /debug/statsviz")
 | 
			
		||||
		statsVizMux := http.NewServeMux()
 | 
			
		||||
		_ = statsviz.Register(statsVizMux)
 | 
			
		||||
		server.GET("/debug/statsviz/", echo.WrapHandler(statsVizMux))
 | 
			
		||||
@ -36,5 +38,6 @@ func initApiServer() *echo.Echo {
 | 
			
		||||
		taskerClient,
 | 
			
		||||
	))
 | 
			
		||||
 | 
			
		||||
	lo.Debug("Registered all api handlers")
 | 
			
		||||
	return server
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,6 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"log"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
@ -28,25 +27,30 @@ func initConfig(configFilePath string) *koanf.Koanf {
 | 
			
		||||
 | 
			
		||||
	confFile := file.Provider(configFilePath)
 | 
			
		||||
	if err := ko.Load(confFile, toml.Parser()); err != nil {
 | 
			
		||||
		log.Fatalf("could not load config file: %v", err)
 | 
			
		||||
		lo.Fatal("Could not load config file", "error", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := ko.Load(env.Provider("", ".", func(s string) string {
 | 
			
		||||
		return strings.ReplaceAll(strings.ToLower(
 | 
			
		||||
			strings.TrimPrefix(s, "")), "_", ".")
 | 
			
		||||
	}), nil); err != nil {
 | 
			
		||||
		log.Fatalf("could not override config from env vars: %v", err)
 | 
			
		||||
		lo.Fatal("Could not override config from env vars", "error", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return ko
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initLogger() logf.Logger {
 | 
			
		||||
	return logg.NewLogg(logg.LoggOpts{
 | 
			
		||||
		Debug:  ko.Bool("logg.debug"),
 | 
			
		||||
		Color:  ko.Bool("logg.color"),
 | 
			
		||||
		Caller: ko.Bool("logg.caller"),
 | 
			
		||||
	})
 | 
			
		||||
func initLogger(debug bool) logf.Logger {
 | 
			
		||||
	loggOpts := logg.LoggOpts{
 | 
			
		||||
		Color: true,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if debug {
 | 
			
		||||
		loggOpts.Caller = true
 | 
			
		||||
		loggOpts.Debug = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return logg.NewLogg(loggOpts)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initCeloProvider() *celo.Provider {
 | 
			
		||||
 | 
			
		||||
@ -7,6 +7,8 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func initTasker() *tasker.TaskerServer {
 | 
			
		||||
	lo.Debug("Bootstrapping tasker")
 | 
			
		||||
 | 
			
		||||
	taskerServerOpts := tasker.TaskerServerOpts{
 | 
			
		||||
		Concurrency:     ko.MustInt("asynq.concurrency"),
 | 
			
		||||
		Logg:            lo,
 | 
			
		||||
@ -15,7 +17,7 @@ func initTasker() *tasker.TaskerServer {
 | 
			
		||||
		TaskerClient:    taskerClient,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if ko.Bool("asynq.debug") {
 | 
			
		||||
	if debugFlag {
 | 
			
		||||
		taskerServerOpts.LogLevel = asynq.DebugLevel
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -58,5 +60,6 @@ func initTasker() *tasker.TaskerServer {
 | 
			
		||||
		celoProvider,
 | 
			
		||||
	))
 | 
			
		||||
 | 
			
		||||
	lo.Debug("Registered all tasker handlers")
 | 
			
		||||
	return taskerServer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -2,6 +2,7 @@ package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"flag"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/signal"
 | 
			
		||||
	"strings"
 | 
			
		||||
@ -20,12 +21,12 @@ import (
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type service struct {
 | 
			
		||||
	taskerServer *tasker.TaskerServer
 | 
			
		||||
	apiServer    *echo.Echo
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	confFlag       string
 | 
			
		||||
	debugFlag      bool
 | 
			
		||||
	taskerModeFlag bool
 | 
			
		||||
	apiModeFlag    bool
 | 
			
		||||
 | 
			
		||||
	asynqRedisPool   *redis.RedisPool
 | 
			
		||||
	celoProvider     *celo.Provider
 | 
			
		||||
	commonRedisPool  *redis.RedisPool
 | 
			
		||||
@ -40,8 +41,14 @@ var (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	ko = initConfig("config.toml")
 | 
			
		||||
	lo = initLogger()
 | 
			
		||||
	flag.StringVar(&confFlag, "config", "config.toml", "Config file location")
 | 
			
		||||
	flag.BoolVar(&debugFlag, "log", false, "Enable debug logging")
 | 
			
		||||
	flag.BoolVar(&taskerModeFlag, "tasker", true, "Start tasker")
 | 
			
		||||
	flag.BoolVar(&apiModeFlag, "api", true, "Start API server")
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
 | 
			
		||||
	lo = initLogger(debugFlag)
 | 
			
		||||
	ko = initConfig(confFlag)
 | 
			
		||||
 | 
			
		||||
	celoProvider = initCeloProvider()
 | 
			
		||||
	postgresPool = initPostgresPool()
 | 
			
		||||
@ -55,56 +62,61 @@ func init() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	service := &service{}
 | 
			
		||||
	var (
 | 
			
		||||
		tasker    *tasker.TaskerServer
 | 
			
		||||
		apiServer *echo.Echo
 | 
			
		||||
		wg        sync.WaitGroup
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
 | 
			
		||||
	defer stop()
 | 
			
		||||
 | 
			
		||||
	service.taskerServer = initTasker()
 | 
			
		||||
	service.apiServer = initApiServer()
 | 
			
		||||
	if apiModeFlag {
 | 
			
		||||
		apiServer = initApiServer()
 | 
			
		||||
 | 
			
		||||
	startServices(service, ctx)
 | 
			
		||||
}
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
 | 
			
		||||
func startServices(serviceContainer *service, ctx context.Context) {
 | 
			
		||||
	wg := sync.WaitGroup{}
 | 
			
		||||
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer wg.Done()
 | 
			
		||||
 | 
			
		||||
		if err := serviceContainer.taskerServer.Start(); err != nil {
 | 
			
		||||
			lo.Fatal("Could not start task server", "err", err)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer wg.Done()
 | 
			
		||||
 | 
			
		||||
		if err := serviceContainer.apiServer.Start(ko.MustString("service.address")); err != nil {
 | 
			
		||||
			if strings.Contains(err.Error(), "Server closed") {
 | 
			
		||||
				lo.Info("Shutting down server")
 | 
			
		||||
			} else {
 | 
			
		||||
				lo.Fatal("Could not start api server", "err", err)
 | 
			
		||||
			lo.Info("Starting API server")
 | 
			
		||||
			if err := apiServer.Start(ko.MustString("service.address")); err != nil {
 | 
			
		||||
				if strings.Contains(err.Error(), "Server closed") {
 | 
			
		||||
					lo.Info("Shutting down server")
 | 
			
		||||
				} else {
 | 
			
		||||
					lo.Fatal("Could not start api server", "err", err)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	gracefulShutdown(serviceContainer, ctx)
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
	if taskerModeFlag {
 | 
			
		||||
		tasker = initTasker()
 | 
			
		||||
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
 | 
			
		||||
			lo.Info("Starting tasker")
 | 
			
		||||
			if err := tasker.Start(); err != nil {
 | 
			
		||||
				lo.Fatal("Could not start task server", "err", err)
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
func gracefulShutdown(serviceContainer *service, ctx context.Context) {
 | 
			
		||||
	<-ctx.Done()
 | 
			
		||||
	lo.Info("Graceful shutdown triggered")
 | 
			
		||||
 | 
			
		||||
	lo.Info("Stopping tasker dequeue")
 | 
			
		||||
	serviceContainer.taskerServer.Stop()
 | 
			
		||||
	lo.Info("Stopped tasker")
 | 
			
		||||
 | 
			
		||||
	if err := serviceContainer.apiServer.Shutdown(ctx); err != nil {
 | 
			
		||||
		lo.Error("Could not gracefully shutdown api server", "err", err)
 | 
			
		||||
	if taskerModeFlag {
 | 
			
		||||
		lo.Debug("Stopping tasker")
 | 
			
		||||
		tasker.Stop()
 | 
			
		||||
	}
 | 
			
		||||
	lo.Info("Stopped API server")
 | 
			
		||||
 | 
			
		||||
	if apiModeFlag {
 | 
			
		||||
		lo.Debug("Stopping api server")
 | 
			
		||||
		if err := apiServer.Shutdown(ctx); err != nil {
 | 
			
		||||
			lo.Error("Could not gracefully shutdown api server", "err", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
@ -14,11 +14,6 @@ public_key               = "0x80097c773B3E83472FC7952c5206a7DB35d42bEF"
 | 
			
		||||
token_decimals           = 18
 | 
			
		||||
token_transfer_gas_limit = 100000
 | 
			
		||||
 | 
			
		||||
[logg]
 | 
			
		||||
caller = true
 | 
			
		||||
color  = true
 | 
			
		||||
debug  = true
 | 
			
		||||
 | 
			
		||||
[chain]
 | 
			
		||||
rpc_endpoint = "https://alfajores-forno.celo-testnet.org"
 | 
			
		||||
testnet      = true
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user