add: token syncer

core:
- add koanf for runtime config loading
- cicnet connection must dial else panic
- add db connection init
- add goyesql for convenient querying
- add async tasker processor (scheduler, processor)

dev:
- add redis server to dev docker-compose
- update volume to prune-able local
This commit is contained in:
2022-05-03 18:54:51 +03:00
parent d9f42005af
commit 05ab865c63
13 changed files with 630 additions and 63 deletions

View File

@@ -1 +1,55 @@
package main
import (
"context"
"github.com/hibiken/asynq"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file"
"github.com/nleof/goyesql"
"github.com/rs/zerolog/log"
"strings"
)
func loadConfig(configFilePath string, envOverridePrefix string, conf *koanf.Koanf) error {
// assumed to always be at the root folder
confFile := file.Provider(configFilePath)
if err := conf.Load(confFile, toml.Parser()); err != nil {
return err
}
// override with env variables
if err := conf.Load(env.Provider(envOverridePrefix, ".", func(s string) string {
return strings.ReplaceAll(strings.ToLower(
strings.TrimPrefix(s, envOverridePrefix)), "_", ".")
}), nil); err != nil {
return err
}
return nil
}
func connectDb(dsn string) *pgxpool.Pool {
conn, err := pgxpool.Connect(context.Background(), dsn)
if err != nil {
log.Fatal().Err(err).Msg("failed to connect to db")
}
return conn
}
func loadQueries(sqlFile string) goyesql.Queries {
q, err := goyesql.ParseFile(sqlFile)
if err != nil {
log.Fatal().Err(err).Msg("failed to parse sql queries")
}
return q
}
func connectQueue(dsn string) asynq.RedisClientOpt {
rClient := asynq.RedisClientOpt{Addr: dsn}
return rClient
}

View File

@@ -1 +1,64 @@
package main
import (
"cic-dw/pkg/cicnet"
"github.com/hibiken/asynq"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/knadh/koanf"
"github.com/lmittmann/w3"
"github.com/nleof/goyesql"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"os"
"sync"
)
type App struct {
db *pgxpool.Pool
queries goyesql.Queries
rClient asynq.RedisClientOpt
cicnetClient *cicnet.CicNet
sigChan chan os.Signal
}
const (
confEnvOverridePrefix = ""
)
var (
conf = koanf.New(".")
db *pgxpool.Pool
queries goyesql.Queries
redisConn asynq.RedisClientOpt
cicnetClient *cicnet.CicNet
)
func init() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
if err := loadConfig("config.toml", confEnvOverridePrefix, conf); err != nil {
log.Fatal().Err(err).Msg("failed to load config")
}
db = connectDb(conf.String("db.dsn"))
queries = loadQueries("queries.sql")
redisConn = connectQueue(conf.String("redis.dsn"))
cicnetClient = cicnet.NewCicNet(conf.String("chain.rpc"), w3.A(conf.String("chain.registry")))
}
func main() {
// TODO: Graceful shutdown of go routines (handle SIG INT/TERM)
var wg sync.WaitGroup
app := &App{
db: db,
queries: queries,
rClient: redisConn,
cicnetClient: cicnetClient,
}
wg.Add(2)
go runScheduler(app)
go runProcessor(app)
wg.Wait()
}

22
cmd/processor.go Normal file
View File

@@ -0,0 +1,22 @@
package main
import (
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log"
)
func runProcessor(app *App) {
processorServer := asynq.NewServer(
app.rClient,
asynq.Config{
Concurrency: 10,
},
)
mux := asynq.NewServeMux()
mux.Handle("token:sync", newTokenSyncer(app))
if err := processorServer.Run(mux); err != nil {
log.Fatal().Err(err).Msg("failed to start job processor")
}
}

24
cmd/scheduler.go Normal file
View File

@@ -0,0 +1,24 @@
package main
import (
"github.com/hibiken/asynq"
"github.com/rs/zerolog/log"
)
var scheduler *asynq.Scheduler
func runScheduler(app *App) {
scheduler = asynq.NewScheduler(app.rClient, nil)
tokenTask := asynq.NewTask("token:sync", nil)
_, err := scheduler.Register(conf.String("schedule.token"), tokenTask)
if err != nil {
log.Fatal().Err(err).Msg("failed to register token syncer")
}
log.Info().Msg("successfully registered token syncer")
if err := scheduler.Run(); err != nil {
log.Fatal().Err(err).Msg("could not start asynq scheduler")
}
}

85
cmd/token_syncer.go Normal file
View File

@@ -0,0 +1,85 @@
package main
import (
"context"
"fmt"
"github.com/georgysavva/scany/pgxscan"
"github.com/hibiken/asynq"
"github.com/jackc/pgx/v4"
"github.com/lmittmann/w3"
"github.com/rs/zerolog/log"
"math/big"
"strconv"
)
type tokenSyncer struct {
app *App
}
type tokenCursor struct {
CursorPos string `db:"cursor_pos"`
}
func newTokenSyncer(app *App) *tokenSyncer {
return &tokenSyncer{
app: app,
}
}
func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
log.Info().Msgf("running task type: %s", t.Type())
var lastCursor tokenCursor
if err := pgxscan.Get(ctx, s.app.db, &lastCursor, s.app.queries["token-cursor-pos"]); err != nil {
return err
}
latestChainIdx, err := s.app.cicnetClient.EntryCount(ctx)
if err != nil {
return err
}
lastCursorPos, err := strconv.ParseInt(lastCursor.CursorPos, 10, 64)
if err != nil {
return err
}
latestChainPos := latestChainIdx.Int64() - 1
log.Info().Msgf("current db cursor: %s, latest chain pos: %d", lastCursor.CursorPos, latestChainPos)
if latestChainPos >= lastCursorPos {
batch := &pgx.Batch{}
for i := lastCursorPos; i <= latestChainPos; i++ {
nextTokenAddress, err := s.app.cicnetClient.AddressAtIndex(ctx, big.NewInt(i))
if err != nil {
return err
}
tokenInfo, err := s.app.cicnetClient.TokenInfo(ctx, w3.A(fmt.Sprintf("0x%s", nextTokenAddress)))
if err != nil {
return err
}
batch.Queue(s.app.queries["insert-token-data"], nextTokenAddress, tokenInfo.Name, tokenInfo.Symbol, tokenInfo.Decimals.Int64())
}
res := s.app.db.SendBatch(ctx, batch)
log.Info().Msgf("inserting %d new records", batch.Len())
for i := 0; i < batch.Len(); i++ {
_, err := res.Exec()
if err != nil {
return err
}
}
err := res.Close()
if err != nil {
return err
}
_, err = s.app.db.Exec(ctx, s.app.queries["update-token-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10))
if err != nil {
return err
}
}
return nil
}