refactor: syncer structure and async bootstrapping

- asynq bootstrap handlers
- graceful shutdown of goroutines
- remove unnecessary global App struct
- unmarshal toml/env to koanf struct
This commit is contained in:
Mohamed Sohail 2022-05-05 15:01:34 +03:00
parent 4f868d8d94
commit 9c6310440c
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
8 changed files with 149 additions and 178 deletions

View File

@ -7,28 +7,18 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
type cacheSyncer struct {
app *App
}
type tableCount struct { type tableCount struct {
Count int `db:"count"` Count int `db:"count"`
} }
func newCacheSyncer(app *App) *cacheSyncer { func cacheSyncer(ctx context.Context, t *asynq.Task) error {
return &cacheSyncer{ _, err := db.Exec(ctx, queries["cache-syncer"])
app: app,
}
}
func (s *cacheSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
_, err := s.app.db.Exec(ctx, s.app.queries["cache-syncer"])
if err != nil { if err != nil {
return asynq.SkipRetry return asynq.SkipRetry
} }
var count tableCount var count tableCount
if err := pgxscan.Get(ctx, s.app.db, &count, "SELECT COUNT(*) from transactions"); err != nil { if err := pgxscan.Get(ctx, db, &count, "SELECT COUNT(*) from transactions"); err != nil {
return asynq.SkipRetry return asynq.SkipRetry
} }

View File

@ -1,7 +1,9 @@
package main package main
import ( import (
"cic-dw/pkg/cicnet"
"context" "context"
"github.com/ethereum/go-ethereum/common"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
"github.com/knadh/koanf" "github.com/knadh/koanf"
@ -13,44 +15,98 @@ import (
"strings" "strings"
) )
// TODO: Load into koanf struct type config struct {
func loadConfig(configFilePath string, envOverridePrefix string, conf *koanf.Koanf) error { Db struct {
// assumed to always be at the root folder Postgres string `koanf:"postgres"`
Redis string `koanf:"redis"`
}
Chain struct {
RpcProvider string `koanf:"rpc"`
TokenRegistry string `koanf:"index"`
}
Syncers map[string]string `koanf:"syncers"`
}
func loadConfig(configFilePath string, k *koanf.Koanf) error {
confFile := file.Provider(configFilePath) confFile := file.Provider(configFilePath)
if err := conf.Load(confFile, toml.Parser()); err != nil { if err := k.Load(confFile, toml.Parser()); err != nil {
return err return err
} }
// override with env variables if err := k.Load(env.Provider("", ".", func(s string) string {
if err := conf.Load(env.Provider(envOverridePrefix, ".", func(s string) string {
return strings.ReplaceAll(strings.ToLower( return strings.ReplaceAll(strings.ToLower(
strings.TrimPrefix(s, envOverridePrefix)), "_", ".") strings.TrimPrefix(s, "")), "_", ".")
}), nil); err != nil { }), nil); err != nil {
return err return err
} }
err := k.UnmarshalWithConf("", &conf, koanf.UnmarshalConf{Tag: "koanf"})
if err != nil {
return err
}
return nil return nil
} }
func connectDb(dsn string) *pgxpool.Pool { func connectDb(dsn string) error {
conn, err := pgxpool.Connect(context.Background(), dsn) var err error
db, err = pgxpool.Connect(context.Background(), dsn)
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("failed to connect to db") return err
} }
return conn return nil
} }
func loadQueries(sqlFile string) goyesql.Queries { func connectCicNet(rpcProvider string, tokenIndex common.Address) error {
q, err := goyesql.ParseFile(sqlFile) var err error
cicnetClient, err = cicnet.NewCicNet(rpcProvider, tokenIndex)
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("failed to parse sql queries") return err
} }
return q return nil
} }
func connectQueue(dsn string) asynq.RedisClientOpt { func loadQueries(sqlFile string) error {
rClient := asynq.RedisClientOpt{Addr: dsn} var err error
queries, err = goyesql.ParseFile(sqlFile)
if err != nil {
return err
}
return rClient return nil
}
func bootstrapScheduler(redis asynq.RedisClientOpt) (*asynq.Scheduler, error) {
scheduler := asynq.NewScheduler(redis, nil)
for k, v := range conf.Syncers {
task := asynq.NewTask(k, nil)
_, err := scheduler.Register(v, task)
if err != nil {
return nil, err
}
log.Info().Msgf("successfully registered %s syncer", k)
}
return scheduler, nil
}
func bootstrapProcessor(redis asynq.RedisClientOpt) (*asynq.Server, *asynq.ServeMux) {
processorServer := asynq.NewServer(
redis,
asynq.Config{
Concurrency: 5,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("token", tokenSyncer)
mux.HandleFunc("cache", cacheSyncer)
mux.HandleFunc("ussd", ussdSyncer)
return processorServer, mux
} }

View File

@ -9,56 +9,73 @@ import (
"github.com/nleof/goyesql" "github.com/nleof/goyesql"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"golang.org/x/sys/unix"
"os" "os"
"sync" "os/signal"
)
type App struct {
db *pgxpool.Pool
queries goyesql.Queries
rClient asynq.RedisClientOpt
cicnetClient *cicnet.CicNet
sigChan chan os.Signal
}
const (
confEnvOverridePrefix = ""
) )
var ( var (
conf = koanf.New(".") k = koanf.New(".")
db *pgxpool.Pool
rClient asynq.RedisClientOpt
queries goyesql.Queries queries goyesql.Queries
redisConn asynq.RedisClientOpt conf config
db *pgxpool.Pool
cicnetClient *cicnet.CicNet cicnetClient *cicnet.CicNet
) )
func init() { func init() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
if err := loadConfig("config.toml", confEnvOverridePrefix, conf); err != nil { if err := loadConfig("config.toml", k); err != nil {
log.Fatal().Err(err).Msg("failed to load config") log.Fatal().Err(err).Msg("failed to load config")
} }
db = connectDb(conf.String("db.dsn")) if err := loadQueries("queries.sql"); err != nil {
queries = loadQueries("queries.sql") log.Fatal().Err(err).Msg("failed to load sql file")
redisConn = connectQueue(conf.String("redis.dsn")) }
cicnetClient = cicnet.NewCicNet(conf.String("chain.rpc"), w3.A(conf.String("chain.registry")))
if err := connectDb(conf.Db.Postgres); err != nil {
log.Fatal().Err(err).Msg("failed to connect to postgres")
}
// TODO: Not core, should be handled by job processor
if err := connectCicNet(conf.Chain.RpcProvider, w3.A(conf.Chain.TokenRegistry)); err != nil {
log.Fatal().Err(err).Msg("failed to connect to postgres")
}
} }
func main() { func main() {
// TODO: Graceful shutdown of go routines (handle SIG INT/TERM) scheduler, err := bootstrapScheduler(rClient)
var wg sync.WaitGroup if err != nil {
log.Fatal().Err(err).Msg("could not bootstrap scheduler")
app := &App{
db: db,
queries: queries,
rClient: redisConn,
cicnetClient: cicnetClient,
} }
wg.Add(2) go func() {
go runScheduler(app) if err := scheduler.Run(); err != nil {
go runProcessor(app) log.Fatal().Err(err).Msg("could not start scheduler")
wg.Wait() }
}()
processor, mux := bootstrapProcessor(rClient)
go func() {
if err := processor.Run(mux); err != nil {
log.Fatal().Err(err).Msg("failed to start job processor")
}
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
for {
s := <-sigs
if s == unix.SIGTSTP {
processor.Stop()
scheduler.Shutdown()
continue
}
break
}
processor.Shutdown()
log.Info().Msg("gracefully shutdown processor and scheduler")
} }

View File

@ -1,24 +0,0 @@
package main
import (
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log"
)
func runProcessor(app *App) {
processorServer := asynq.NewServer(
app.rClient,
asynq.Config{
Concurrency: 10,
},
)
mux := asynq.NewServeMux()
mux.Handle("token:sync", newTokenSyncer(app))
mux.Handle("cache:sync", newCacheSyncer(app))
mux.Handle("ussd:sync", newUssdSyncer(app))
if err := processorServer.Run(mux); err != nil {
log.Fatal().Err(err).Msg("failed to start job processor")
}
}

View File

@ -1,39 +0,0 @@
package main
import (
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log"
)
var scheduler *asynq.Scheduler
func runScheduler(app *App) {
scheduler = asynq.NewScheduler(app.rClient, nil)
// TODO: Refactor boilerplate and pull enabled tasks from koanf
tokenTask := asynq.NewTask("token:sync", nil)
cacheTask := asynq.NewTask("cache:sync", nil)
ussdTask := asynq.NewTask("ussd:sync", nil)
_, err := scheduler.Register(conf.String("token.schedule"), tokenTask)
if err != nil {
log.Fatal().Err(err).Msg("failed to register token syncer")
}
log.Info().Msg("successfully registered token syncer")
_, err = scheduler.Register(conf.String("cache.schedule"), cacheTask)
if err != nil {
log.Fatal().Err(err).Msg("failed to register cache syncer")
}
log.Info().Msg("successfully registered cache syncer")
_, err = scheduler.Register(conf.String("ussd.schedule"), ussdTask)
if err != nil {
log.Fatal().Err(err).Msg("failed to register ussd syncer")
}
log.Info().Msg("successfully registered ussd syncer")
if err := scheduler.Run(); err != nil {
log.Fatal().Err(err).Msg("could not start asynq scheduler")
}
}

View File

@ -2,7 +2,6 @@ package main
import ( import (
"context" "context"
"fmt"
"github.com/georgysavva/scany/pgxscan" "github.com/georgysavva/scany/pgxscan"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4"
@ -12,27 +11,17 @@ import (
"strconv" "strconv"
) )
type tokenSyncer struct {
app *App
}
type tokenCursor struct { type tokenCursor struct {
CursorPos string `db:"cursor_pos"` CursorPos string `db:"cursor_pos"`
} }
func newTokenSyncer(app *App) *tokenSyncer { func tokenSyncer(ctx context.Context, t *asynq.Task) error {
return &tokenSyncer{
app: app,
}
}
func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
var lastCursor tokenCursor var lastCursor tokenCursor
if err := pgxscan.Get(ctx, s.app.db, &lastCursor, s.app.queries["cursor-pos"], 3); err != nil { if err := pgxscan.Get(ctx, db, &lastCursor, queries["cursor-pos"], 3); err != nil {
return err return err
} }
latestChainIdx, err := s.app.cicnetClient.EntryCount(ctx) latestChainIdx, err := cicnetClient.EntryCount(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -48,20 +37,19 @@ func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
batch := &pgx.Batch{} batch := &pgx.Batch{}
for i := lastCursorPos; i <= latestChainPos; i++ { for i := lastCursorPos; i <= latestChainPos; i++ {
nextTokenAddress, err := s.app.cicnetClient.AddressAtIndex(ctx, big.NewInt(i)) nextTokenAddress, err := cicnetClient.AddressAtIndex(ctx, big.NewInt(i))
if err != nil {
return err
}
tokenInfo, err := cicnetClient.TokenInfo(ctx, w3.A(nextTokenAddress))
if err != nil { if err != nil {
return err return err
} }
tokenInfo, err := s.app.cicnetClient.TokenInfo(ctx, w3.A(fmt.Sprintf("0x%s", nextTokenAddress))) batch.Queue(queries["insert-token-data"], nextTokenAddress[2:], tokenInfo.Name, tokenInfo.Symbol, tokenInfo.Decimals.Int64())
if err != nil {
return err
}
batch.Queue(s.app.queries["insert-token-data"], nextTokenAddress, tokenInfo.Name, tokenInfo.Symbol, tokenInfo.Decimals.Int64())
} }
res := s.app.db.SendBatch(ctx, batch) res := db.SendBatch(ctx, batch)
for i := 0; i < batch.Len(); i++ { for i := 0; i < batch.Len(); i++ {
_, err := res.Exec() _, err := res.Exec()
if err != nil { if err != nil {
@ -73,7 +61,7 @@ func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
return err return err
} }
_, err = s.app.db.Exec(ctx, s.app.queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3) _, err = db.Exec(ctx, queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3)
if err != nil { if err != nil {
return err return err
} }

View File

@ -7,24 +7,14 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
type ussdSyncer struct { func ussdSyncer(ctx context.Context, t *asynq.Task) error {
app *App _, err := db.Exec(ctx, queries["ussd-syncer"])
}
func newUssdSyncer(app *App) *ussdSyncer {
return &ussdSyncer{
app: app,
}
}
func (s *ussdSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
_, err := s.app.db.Exec(ctx, s.app.queries["ussd-syncer"])
if err != nil { if err != nil {
return asynq.SkipRetry return asynq.SkipRetry
} }
var count tableCount var count tableCount
if err := pgxscan.Get(ctx, s.app.db, &count, "SELECT COUNT(*) from users"); err != nil { if err := pgxscan.Get(ctx, db, &count, "SELECT COUNT(*) from users"); err != nil {
return asynq.SkipRetry return asynq.SkipRetry
} }

View File

@ -1,19 +1,12 @@
[db] [db]
dsn = "postgresql://postgres:postgres@127.0.0.1:5432/cic_dw" postgres = "postgresql://postgres:postgres@127.0.0.1:5432/cic_dw"
redis = "127.0.0.1:6379"
[redis]
dsn = "127.0.0.1:6379"
[chain] [chain]
rpc = "http://127.0.0.1:8545" index = "0x5A1EB529438D8b3cA943A45a48744f4c73d1f098"
registry = "0x5A1EB529438D8b3cA943A45a48744f4c73d1f098" rpc = "http://127.0.0.1:8545"
# syncers [syncers]
[ussd] cache = "@every 20s"
schedule = "@every 15s" ussd = "@every 1m"
token = "@every 10s"
[cache]
schedule = "@every 15s"
[token]
schedule = "@every 15s"