diff --git a/cmd/cache_syncer.go b/cmd/cache_syncer.go
deleted file mode 100644
index 954b1d6..0000000
--- a/cmd/cache_syncer.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package main
-
-import (
-	"context"
-	"github.com/georgysavva/scany/pgxscan"
-	"github.com/hibiken/asynq"
-	"github.com/rs/zerolog/log"
-)
-
-type tableCount struct {
-	Count int `db:"count"`
-}
-
-func cacheSyncer(ctx context.Context, t *asynq.Task) error {
-	_, err := db.Exec(ctx, queries["cache-syncer"])
-	if err != nil {
-		return asynq.SkipRetry
-	}
-
-	var count tableCount
-	if err := pgxscan.Get(ctx, db, &count, "SELECT COUNT(*) from transactions"); err != nil {
-		return asynq.SkipRetry
-	}
-
-	log.Info().Msgf("=> %d transactions synced", count.Count)
-
-	return nil
-}
diff --git a/cmd/syncer.go b/cmd/syncer.go
new file mode 100644
index 0000000..89502a3
--- /dev/null
+++ b/cmd/syncer.go
@@ -0,0 +1,42 @@
+package main
+
+import (
+	"cic-dw/internal/syncer"
+	"github.com/hibiken/asynq"
+	"github.com/rs/zerolog/log"
+)
+
+func bootstrapScheduler(redis asynq.RedisConnOpt) (*asynq.Scheduler, error) {
+	scheduler := asynq.NewScheduler(redis, nil)
+
+	for k, v := range conf.Syncers {
+		task := asynq.NewTask(k, nil)
+
+		_, err := scheduler.Register(v, task)
+		if err != nil {
+			return nil, err
+		}
+
+		log.Info().Msgf("successfully registered %s syncer", k)
+	}
+
+	return scheduler, nil
+}
+
+func bootstrapProcessor(redis asynq.RedisConnOpt) (*asynq.Server, *asynq.ServeMux) {
+	processorServer := asynq.NewServer(
+		redis,
+		asynq.Config{
+			Concurrency: 5,
+		},
+	)
+
+	syncer := syncer.New(db, redis, cicnetClient, preparedQueries.core)
+
+	mux := asynq.NewServeMux()
+	mux.HandleFunc("token", syncer.TokenSyncer)
+	mux.HandleFunc("cache", syncer.CacheSyncer)
+	mux.HandleFunc("ussd", syncer.UssdSyncer)
+
+	return processorServer, mux
+}
diff --git a/cmd/token_syncer.go b/cmd/token_syncer.go
deleted file mode 100644
index dfb3e20..0000000
--- a/cmd/token_syncer.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package main
-
-import (
-	"context"
-	"github.com/georgysavva/scany/pgxscan"
-	"github.com/hibiken/asynq"
-	"github.com/jackc/pgx/v4"
-	"github.com/lmittmann/w3"
-	"github.com/rs/zerolog/log"
-	"math/big"
-	"strconv"
-)
-
-type tokenCursor struct {
-	CursorPos string `db:"cursor_pos"`
-}
-
-func tokenSyncer(ctx context.Context, t *asynq.Task) error {
-	var lastCursor tokenCursor
-
-	if err := pgxscan.Get(ctx, db, &lastCursor, queries["cursor-pos"], 3); err != nil {
-		return err
-	}
-	latestChainIdx, err := cicnetClient.EntryCount(ctx)
-	if err != nil {
-		return err
-	}
-
-	lastCursorPos, err := strconv.ParseInt(lastCursor.CursorPos, 10, 64)
-	if err != nil {
-		return err
-	}
-
-	latestChainPos := latestChainIdx.Int64() - 1
-	log.Info().Msgf("=> %d tokens synced", lastCursorPos)
-	if latestChainPos >= lastCursorPos {
-		batch := &pgx.Batch{}
-
-		for i := lastCursorPos; i <= latestChainPos; i++ {
-			nextTokenAddress, err := cicnetClient.AddressAtIndex(ctx, big.NewInt(i))
-			if err != nil {
-				return err
-			}
-			tokenInfo, err := cicnetClient.TokenInfo(ctx, w3.A(nextTokenAddress))
-			if err != nil {
-				return err
-			}
-
-			batch.Queue(queries["insert-token-data"], nextTokenAddress[2:], tokenInfo.Name, tokenInfo.Symbol, tokenInfo.Decimals.Int64())
-		}
-
-		res := db.SendBatch(ctx, batch)
-		for i := 0; i < batch.Len(); i++ {
-			_, err := res.Exec()
-			if err != nil {
-				return err
-			}
-		}
-		err := res.Close()
-		if err != nil {
-			return err
-		}
-
-		_, err = db.Exec(ctx, queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
diff --git a/cmd/ussd_syncer.go b/cmd/ussd_syncer.go
deleted file mode 100644
index b7f9b24..0000000
--- a/cmd/ussd_syncer.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package main
-
-import (
-	"context"
-	"github.com/georgysavva/scany/pgxscan"
-	"github.com/hibiken/asynq"
-	"github.com/rs/zerolog/log"
-)
-
-func ussdSyncer(ctx context.Context, t *asynq.Task) error {
-	_, err := db.Exec(ctx, queries["ussd-syncer"])
-	if err != nil {
-		return asynq.SkipRetry
-	}
-
-	var count tableCount
-	if err := pgxscan.Get(ctx, db, &count, "SELECT COUNT(*) from users"); err != nil {
-		return asynq.SkipRetry
-	}
-
-	log.Info().Msgf("=> %d users synced", count.Count)
-
-	return nil
-}
diff --git a/internal/syncer/cache.go b/internal/syncer/cache.go
new file mode 100644
index 0000000..6ed83ab
--- /dev/null
+++ b/internal/syncer/cache.go
@@ -0,0 +1,30 @@
+package syncer
+
+import (
+	"context"
+	"github.com/georgysavva/scany/pgxscan"
+	"github.com/hibiken/asynq"
+	"github.com/rs/zerolog/log"
+)
+
+type tableCount struct {
+	Count int `db:"count"`
+}
+
+func (s *Syncer) CacheSyncer(ctx context.Context, t *asynq.Task) error {
+	_, err := s.db.Exec(ctx, s.queries["cache-syncer"])
+	if err != nil {
+		log.Err(err).Msg("cache syncer task failed")
+		return asynq.SkipRetry
+	}
+
+	var table tableCount
+	if err := pgxscan.Get(ctx, s.db, &table, "SELECT COUNT(*) from transactions"); err != nil {
+		log.Err(err).Msg("cache syncer task failed")
+		return asynq.SkipRetry
+	}
+
+	log.Info().Msgf("=> %d transactions synced", table.Count)
+
+	return nil
+}
diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go
new file mode 100644
index 0000000..970ab2d
--- /dev/null
+++ b/internal/syncer/syncer.go
@@ -0,0 +1,24 @@
+package syncer
+
+import (
+	cic_net "github.com/grassrootseconomics/cic-go/net"
+	"github.com/hibiken/asynq"
+	"github.com/jackc/pgx/v4/pgxpool"
+	"github.com/nleof/goyesql"
+)
+
+type Syncer struct {
+	db           *pgxpool.Pool
+	rClient      asynq.RedisConnOpt
+	cicnetClient *cic_net.CicNet
+	queries      goyesql.Queries
+}
+
+func New(db *pgxpool.Pool, rClient asynq.RedisConnOpt, cicnetClient *cic_net.CicNet, queries goyesql.Queries) *Syncer {
+	return &Syncer{
+		db:           db,
+		rClient:      rClient,
+		cicnetClient: cicnetClient,
+		queries:      queries,
+	}
+}
diff --git a/internal/syncer/token.go b/internal/syncer/token.go
new file mode 100644
index 0000000..3c68bdb
--- /dev/null
+++ b/internal/syncer/token.go
@@ -0,0 +1,78 @@
+package syncer
+
+import (
+	"context"
+	"github.com/georgysavva/scany/pgxscan"
+	"github.com/hibiken/asynq"
+	"github.com/jackc/pgx/v4"
+	"github.com/lmittmann/w3"
+	"github.com/rs/zerolog/log"
+	"math/big"
+	"strconv"
+)
+
+type tokenCursor struct {
+	CursorPos string `db:"cursor_pos"`
+}
+
+func (s *Syncer) TokenSyncer(ctx context.Context, t *asynq.Task) error {
+	var lastCursor tokenCursor
+
+	if err := pgxscan.Get(ctx, s.db, &lastCursor, s.queries["cursor-pos"], 3); err != nil {
+		return err
+	}
+	latestChainIdx, err := s.cicnetClient.EntryCount(ctx)
+	if err != nil {
+		log.Err(err).Msg("token syncer task failed")
+		return err
+	}
+
+	lastCursorPos, err := strconv.ParseInt(lastCursor.CursorPos, 10, 64)
+	if err != nil {
+		log.Err(err).Msg("token syncer task failed")
+		return err
+	}
+
+	latestChainPos := latestChainIdx.Int64() - 1
+	log.Info().Msgf("=> %d tokens synced", lastCursorPos)
+	if latestChainPos >= lastCursorPos {
+		batch := &pgx.Batch{}
+
+		for i := lastCursorPos; i <= latestChainPos; i++ {
+			nextTokenAddress, err := s.cicnetClient.AddressAtIndex(ctx, big.NewInt(i))
+			if err != nil {
+				log.Err(err).Msg("token syncer task failed")
+				return err
+			}
+			tokenInfo, err := s.cicnetClient.ERC20TokenInfo(ctx, w3.A(nextTokenAddress))
+			if err != nil {
+				log.Err(err).Msg("token syncer task failed")
+				return err
+			}
+
+			batch.Queue(s.queries["insert-token-data"], nextTokenAddress[2:], tokenInfo.Name, tokenInfo.Symbol, tokenInfo.Decimals.Int64())
+		}
+
+		res := s.db.SendBatch(ctx, batch)
+		for i := 0; i < batch.Len(); i++ {
+			_, err := res.Exec()
+			if err != nil {
+				log.Err(err).Msg("token syncer task failed")
+				return err
+			}
+		}
+		err := res.Close()
+		if err != nil {
+			log.Err(err).Msg("token syncer task failed")
+			return err
+		}
+
+		_, err = s.db.Exec(ctx, s.queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3)
+		if err != nil {
+			log.Err(err).Msg("token syncer task failed")
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/internal/syncer/ussd.go b/internal/syncer/ussd.go
new file mode 100644
index 0000000..a83a8ba
--- /dev/null
+++ b/internal/syncer/ussd.go
@@ -0,0 +1,26 @@
+package syncer
+
+import (
+	"context"
+	"github.com/georgysavva/scany/pgxscan"
+	"github.com/hibiken/asynq"
+	"github.com/rs/zerolog/log"
+)
+
+func (s *Syncer) UssdSyncer(ctx context.Context, t *asynq.Task) error {
+	_, err := s.db.Exec(ctx, s.queries["ussd-syncer"])
+	if err != nil {
+		log.Err(err).Msg("ussd syncer task failed")
+		return asynq.SkipRetry
+	}
+
+	var table tableCount
+	if err := pgxscan.Get(ctx, s.db, &table, "SELECT COUNT(*) from users"); err != nil {
+		log.Err(err).Msg("ussd syncer task failed")
+		return asynq.SkipRetry
+	}
+
+	log.Info().Msgf("=> %d users synced", table.Count)
+
+	return nil
+}