mirror of
https://github.com/GrassrootsEconomics/cic-dw.git
synced 2026-05-27 06:57:56 +02:00
sohail/update deps structure (#5)
* refactor: syncer structure - move syncer jobs to internal dir * refactor: queries struct and pkg updates - update cic-go to latest - separate sql queries by logic * ci: add dependabot
This commit is contained in:
@@ -1,28 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/georgysavva/scany/pgxscan"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type tableCount struct {
|
||||
Count int `db:"count"`
|
||||
}
|
||||
|
||||
func cacheSyncer(ctx context.Context, t *asynq.Task) error {
|
||||
_, err := db.Exec(ctx, queries["cache-syncer"])
|
||||
if err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
var count tableCount
|
||||
if err := pgxscan.Get(ctx, db, &count, "SELECT COUNT(*) from transactions"); err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
log.Info().Msgf("=> %d transactions synced", count.Count)
|
||||
|
||||
return nil
|
||||
}
|
||||
55
cmd/init.go
55
cmd/init.go
@@ -2,8 +2,9 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/grassrootseconomics/cic_go/cic_net"
|
||||
cic_net "github.com/grassrootseconomics/cic-go/net"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/knadh/koanf"
|
||||
@@ -11,7 +12,6 @@ import (
|
||||
"github.com/knadh/koanf/providers/env"
|
||||
"github.com/knadh/koanf/providers/file"
|
||||
"github.com/nleof/goyesql"
|
||||
"github.com/rs/zerolog/log"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -27,6 +27,11 @@ type config struct {
|
||||
Syncers map[string]string `koanf:"syncers"`
|
||||
}
|
||||
|
||||
type queries struct {
|
||||
core goyesql.Queries
|
||||
dashboard goyesql.Queries
|
||||
}
|
||||
|
||||
func loadConfig(configFilePath string, k *koanf.Koanf) error {
|
||||
confFile := file.Provider(configFilePath)
|
||||
if err := k.Load(confFile, toml.Parser()); err != nil {
|
||||
@@ -77,45 +82,21 @@ func connectCicNet(rpcProvider string, tokenIndex common.Address) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadQueries(sqlFile string) error {
|
||||
var err error
|
||||
queries, err = goyesql.ParseFile(sqlFile)
|
||||
func loadQueries(sqlFilesPath string) error {
|
||||
coreQueries, err := goyesql.ParseFile(fmt.Sprintf("%s/core.sql", sqlFilesPath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func bootstrapScheduler(redis asynq.RedisConnOpt) (*asynq.Scheduler, error) {
|
||||
scheduler := asynq.NewScheduler(redis, nil)
|
||||
|
||||
for k, v := range conf.Syncers {
|
||||
task := asynq.NewTask(k, nil)
|
||||
|
||||
_, err := scheduler.Register(v, task)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Info().Msgf("successfully registered %s syncer", k)
|
||||
dashboardQueries, err := goyesql.ParseFile(fmt.Sprintf("%s/dashboard.sql", sqlFilesPath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return scheduler, nil
|
||||
}
|
||||
|
||||
func bootstrapProcessor(redis asynq.RedisConnOpt) (*asynq.Server, *asynq.ServeMux) {
|
||||
processorServer := asynq.NewServer(
|
||||
redis,
|
||||
asynq.Config{
|
||||
Concurrency: 5,
|
||||
},
|
||||
)
|
||||
|
||||
mux := asynq.NewServeMux()
|
||||
mux.HandleFunc("token", tokenSyncer)
|
||||
mux.HandleFunc("cache", cacheSyncer)
|
||||
mux.HandleFunc("ussd", ussdSyncer)
|
||||
|
||||
return processorServer, mux
|
||||
preparedQueries = &queries{
|
||||
core: coreQueries,
|
||||
dashboard: dashboardQueries,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
13
cmd/main.go
13
cmd/main.go
@@ -1,11 +1,10 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/grassrootseconomics/cic_go/cic_net"
|
||||
cic_net "github.com/grassrootseconomics/cic-go/net"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/knadh/koanf"
|
||||
"github.com/lmittmann/w3"
|
||||
"github.com/nleof/goyesql"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"golang.org/x/sys/unix"
|
||||
@@ -16,10 +15,10 @@ import (
|
||||
var (
|
||||
k = koanf.New(".")
|
||||
|
||||
queries goyesql.Queries
|
||||
conf config
|
||||
db *pgxpool.Pool
|
||||
cicnetClient *cic_net.CicNet
|
||||
preparedQueries *queries
|
||||
conf config
|
||||
db *pgxpool.Pool
|
||||
cicnetClient *cic_net.CicNet
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -29,7 +28,7 @@ func init() {
|
||||
log.Fatal().Err(err).Msg("failed to load config")
|
||||
}
|
||||
|
||||
if err := loadQueries("queries.sql"); err != nil {
|
||||
if err := loadQueries("queries"); err != nil {
|
||||
log.Fatal().Err(err).Msg("failed to load sql file")
|
||||
}
|
||||
|
||||
|
||||
42
cmd/syncer.go
Normal file
42
cmd/syncer.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"cic-dw/internal/syncer"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func bootstrapScheduler(redis asynq.RedisConnOpt) (*asynq.Scheduler, error) {
|
||||
scheduler := asynq.NewScheduler(redis, nil)
|
||||
|
||||
for k, v := range conf.Syncers {
|
||||
task := asynq.NewTask(k, nil)
|
||||
|
||||
_, err := scheduler.Register(v, task)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Info().Msgf("successfully registered %s syncer", k)
|
||||
}
|
||||
|
||||
return scheduler, nil
|
||||
}
|
||||
|
||||
func bootstrapProcessor(redis asynq.RedisConnOpt) (*asynq.Server, *asynq.ServeMux) {
|
||||
processorServer := asynq.NewServer(
|
||||
redis,
|
||||
asynq.Config{
|
||||
Concurrency: 5,
|
||||
},
|
||||
)
|
||||
|
||||
syncer := syncer.New(db, redis, cicnetClient, preparedQueries.core)
|
||||
|
||||
mux := asynq.NewServeMux()
|
||||
mux.HandleFunc("token", syncer.TokenSyncer)
|
||||
mux.HandleFunc("cache", syncer.CacheSyncer)
|
||||
mux.HandleFunc("ussd", syncer.UssdSyncer)
|
||||
|
||||
return processorServer, mux
|
||||
}
|
||||
@@ -1,71 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/georgysavva/scany/pgxscan"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/lmittmann/w3"
|
||||
"github.com/rs/zerolog/log"
|
||||
"math/big"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type tokenCursor struct {
|
||||
CursorPos string `db:"cursor_pos"`
|
||||
}
|
||||
|
||||
func tokenSyncer(ctx context.Context, t *asynq.Task) error {
|
||||
var lastCursor tokenCursor
|
||||
|
||||
if err := pgxscan.Get(ctx, db, &lastCursor, queries["cursor-pos"], 3); err != nil {
|
||||
return err
|
||||
}
|
||||
latestChainIdx, err := cicnetClient.EntryCount(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lastCursorPos, err := strconv.ParseInt(lastCursor.CursorPos, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
latestChainPos := latestChainIdx.Int64() - 1
|
||||
log.Info().Msgf("=> %d tokens synced", lastCursorPos)
|
||||
if latestChainPos >= lastCursorPos {
|
||||
batch := &pgx.Batch{}
|
||||
|
||||
for i := lastCursorPos; i <= latestChainPos; i++ {
|
||||
nextTokenAddress, err := cicnetClient.AddressAtIndex(ctx, big.NewInt(i))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tokenInfo, err := cicnetClient.TokenInfo(ctx, w3.A(nextTokenAddress))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
batch.Queue(queries["insert-token-data"], nextTokenAddress[2:], tokenInfo.Name, tokenInfo.Symbol, tokenInfo.Decimals.Int64())
|
||||
}
|
||||
|
||||
res := db.SendBatch(ctx, batch)
|
||||
for i := 0; i < batch.Len(); i++ {
|
||||
_, err := res.Exec()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err := res.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = db.Exec(ctx, queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/georgysavva/scany/pgxscan"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func ussdSyncer(ctx context.Context, t *asynq.Task) error {
|
||||
_, err := db.Exec(ctx, queries["ussd-syncer"])
|
||||
if err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
var count tableCount
|
||||
if err := pgxscan.Get(ctx, db, &count, "SELECT COUNT(*) from users"); err != nil {
|
||||
return asynq.SkipRetry
|
||||
}
|
||||
|
||||
log.Info().Msgf("=> %d users synced", count.Count)
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user