mirror of
https://github.com/GrassrootsEconomics/cic-dw.git
synced 2025-02-08 12:57:37 +01:00
refactor: syncer structure
- move syncer jobs to internal dir
This commit is contained in:
parent
9882a9ddc6
commit
29af3b5c21
@ -1,28 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/georgysavva/scany/pgxscan"
|
|
||||||
"github.com/hibiken/asynq"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
type tableCount struct {
|
|
||||||
Count int `db:"count"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func cacheSyncer(ctx context.Context, t *asynq.Task) error {
|
|
||||||
_, err := db.Exec(ctx, queries["cache-syncer"])
|
|
||||||
if err != nil {
|
|
||||||
return asynq.SkipRetry
|
|
||||||
}
|
|
||||||
|
|
||||||
var count tableCount
|
|
||||||
if err := pgxscan.Get(ctx, db, &count, "SELECT COUNT(*) from transactions"); err != nil {
|
|
||||||
return asynq.SkipRetry
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info().Msgf("=> %d transactions synced", count.Count)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
42
cmd/syncer.go
Normal file
42
cmd/syncer.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"cic-dw/internal/syncer"
|
||||||
|
"github.com/hibiken/asynq"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func bootstrapScheduler(redis asynq.RedisConnOpt) (*asynq.Scheduler, error) {
|
||||||
|
scheduler := asynq.NewScheduler(redis, nil)
|
||||||
|
|
||||||
|
for k, v := range conf.Syncers {
|
||||||
|
task := asynq.NewTask(k, nil)
|
||||||
|
|
||||||
|
_, err := scheduler.Register(v, task)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Msgf("successfully registered %s syncer", k)
|
||||||
|
}
|
||||||
|
|
||||||
|
return scheduler, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func bootstrapProcessor(redis asynq.RedisConnOpt) (*asynq.Server, *asynq.ServeMux) {
|
||||||
|
processorServer := asynq.NewServer(
|
||||||
|
redis,
|
||||||
|
asynq.Config{
|
||||||
|
Concurrency: 5,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
syncer := syncer.New(db, redis, cicnetClient, preparedQueries.core)
|
||||||
|
|
||||||
|
mux := asynq.NewServeMux()
|
||||||
|
mux.HandleFunc("token", syncer.TokenSyncer)
|
||||||
|
mux.HandleFunc("cache", syncer.CacheSyncer)
|
||||||
|
mux.HandleFunc("ussd", syncer.UssdSyncer)
|
||||||
|
|
||||||
|
return processorServer, mux
|
||||||
|
}
|
@ -1,71 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/georgysavva/scany/pgxscan"
|
|
||||||
"github.com/hibiken/asynq"
|
|
||||||
"github.com/jackc/pgx/v4"
|
|
||||||
"github.com/lmittmann/w3"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
"math/big"
|
|
||||||
"strconv"
|
|
||||||
)
|
|
||||||
|
|
||||||
type tokenCursor struct {
|
|
||||||
CursorPos string `db:"cursor_pos"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func tokenSyncer(ctx context.Context, t *asynq.Task) error {
|
|
||||||
var lastCursor tokenCursor
|
|
||||||
|
|
||||||
if err := pgxscan.Get(ctx, db, &lastCursor, queries["cursor-pos"], 3); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
latestChainIdx, err := cicnetClient.EntryCount(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
lastCursorPos, err := strconv.ParseInt(lastCursor.CursorPos, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
latestChainPos := latestChainIdx.Int64() - 1
|
|
||||||
log.Info().Msgf("=> %d tokens synced", lastCursorPos)
|
|
||||||
if latestChainPos >= lastCursorPos {
|
|
||||||
batch := &pgx.Batch{}
|
|
||||||
|
|
||||||
for i := lastCursorPos; i <= latestChainPos; i++ {
|
|
||||||
nextTokenAddress, err := cicnetClient.AddressAtIndex(ctx, big.NewInt(i))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
tokenInfo, err := cicnetClient.TokenInfo(ctx, w3.A(nextTokenAddress))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
batch.Queue(queries["insert-token-data"], nextTokenAddress[2:], tokenInfo.Name, tokenInfo.Symbol, tokenInfo.Decimals.Int64())
|
|
||||||
}
|
|
||||||
|
|
||||||
res := db.SendBatch(ctx, batch)
|
|
||||||
for i := 0; i < batch.Len(); i++ {
|
|
||||||
_, err := res.Exec()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err := res.Close()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = db.Exec(ctx, queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,24 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/georgysavva/scany/pgxscan"
|
|
||||||
"github.com/hibiken/asynq"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
func ussdSyncer(ctx context.Context, t *asynq.Task) error {
|
|
||||||
_, err := db.Exec(ctx, queries["ussd-syncer"])
|
|
||||||
if err != nil {
|
|
||||||
return asynq.SkipRetry
|
|
||||||
}
|
|
||||||
|
|
||||||
var count tableCount
|
|
||||||
if err := pgxscan.Get(ctx, db, &count, "SELECT COUNT(*) from users"); err != nil {
|
|
||||||
return asynq.SkipRetry
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info().Msgf("=> %d users synced", count.Count)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
30
internal/syncer/cache.go
Normal file
30
internal/syncer/cache.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package syncer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/georgysavva/scany/pgxscan"
|
||||||
|
"github.com/hibiken/asynq"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type tableCount struct {
|
||||||
|
Count int `db:"count"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syncer) CacheSyncer(ctx context.Context, t *asynq.Task) error {
|
||||||
|
_, err := s.db.Exec(ctx, s.queries["cache-syncer"])
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("cache syncer task failed")
|
||||||
|
return asynq.SkipRetry
|
||||||
|
}
|
||||||
|
|
||||||
|
var table tableCount
|
||||||
|
if err := pgxscan.Get(ctx, s.db, &table, "SELECT COUNT(*) from transactions"); err != nil {
|
||||||
|
log.Err(err).Msg("cache syncer task failed")
|
||||||
|
return asynq.SkipRetry
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Msgf("=> %d transactions synced", table.Count)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
24
internal/syncer/syncer.go
Normal file
24
internal/syncer/syncer.go
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package syncer
|
||||||
|
|
||||||
|
import (
|
||||||
|
cic_net "github.com/grassrootseconomics/cic-go/net"
|
||||||
|
"github.com/hibiken/asynq"
|
||||||
|
"github.com/jackc/pgx/v4/pgxpool"
|
||||||
|
"github.com/nleof/goyesql"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Syncer struct {
|
||||||
|
db *pgxpool.Pool
|
||||||
|
rClient asynq.RedisConnOpt
|
||||||
|
cicnetClient *cic_net.CicNet
|
||||||
|
queries goyesql.Queries
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(db *pgxpool.Pool, rClient asynq.RedisConnOpt, cicnetClient *cic_net.CicNet, queries goyesql.Queries) *Syncer {
|
||||||
|
return &Syncer{
|
||||||
|
db: db,
|
||||||
|
rClient: rClient,
|
||||||
|
cicnetClient: cicnetClient,
|
||||||
|
queries: queries,
|
||||||
|
}
|
||||||
|
}
|
78
internal/syncer/token.go
Normal file
78
internal/syncer/token.go
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
package syncer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/georgysavva/scany/pgxscan"
|
||||||
|
"github.com/hibiken/asynq"
|
||||||
|
"github.com/jackc/pgx/v4"
|
||||||
|
"github.com/lmittmann/w3"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
"math/big"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
type tokenCursor struct {
|
||||||
|
cursorPos string `db:"cursor_pos"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Syncer) TokenSyncer(ctx context.Context, t *asynq.Task) error {
|
||||||
|
var lastCursor tokenCursor
|
||||||
|
|
||||||
|
if err := pgxscan.Get(ctx, s.db, &lastCursor, s.queries["cursor-pos"], 3); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
latestChainIdx, err := s.cicnetClient.EntryCount(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("token syncer task failed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
lastCursorPos, err := strconv.ParseInt(lastCursor.cursorPos, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("token syncer task failed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
latestChainPos := latestChainIdx.Int64() - 1
|
||||||
|
log.Info().Msgf("=> %d tokens synced", lastCursorPos)
|
||||||
|
if latestChainPos >= lastCursorPos {
|
||||||
|
batch := &pgx.Batch{}
|
||||||
|
|
||||||
|
for i := lastCursorPos; i <= latestChainPos; i++ {
|
||||||
|
nextTokenAddress, err := s.cicnetClient.AddressAtIndex(ctx, big.NewInt(i))
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("token syncer task failed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tokenInfo, err := s.cicnetClient.ERC20TokenInfo(ctx, w3.A(nextTokenAddress))
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("token syncer task failed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
batch.Queue(s.queries["insert-token-data"], nextTokenAddress[2:], tokenInfo.Name, tokenInfo.Symbol, tokenInfo.Decimals.Int64())
|
||||||
|
}
|
||||||
|
|
||||||
|
res := s.db.SendBatch(ctx, batch)
|
||||||
|
for i := 0; i < batch.Len(); i++ {
|
||||||
|
_, err := res.Exec()
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("token syncer task failed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := res.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("token syncer task failed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.db.Exec(ctx, s.queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3)
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("token syncer task failed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
26
internal/syncer/ussd.go
Normal file
26
internal/syncer/ussd.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package syncer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/georgysavva/scany/pgxscan"
|
||||||
|
"github.com/hibiken/asynq"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Syncer) UssdSyncer(ctx context.Context, t *asynq.Task) error {
|
||||||
|
_, err := s.db.Exec(ctx, s.queries["ussd-syncer"])
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("ussd syncer task failed")
|
||||||
|
return asynq.SkipRetry
|
||||||
|
}
|
||||||
|
|
||||||
|
var table tableCount
|
||||||
|
if err := pgxscan.Get(ctx, s.db, &table, "SELECT COUNT(*) from users"); err != nil {
|
||||||
|
log.Err(err).Msg("ussd syncer task failed")
|
||||||
|
return asynq.SkipRetry
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Msgf("=> %d users synced", table.Count)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user