From 1c65a11460f2c315ff2a0bd21a5cc1e69e70fed9 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Tue, 3 May 2022 21:37:48 +0300 Subject: [PATCH] add: ussd and cache syncer tasks - no repeat on failure, picked up on next schedule - enforce uniq on users and tx table to prevent duplicates --- Makefile | 5 +++++ cmd/cache_syncer.go | 38 ++++++++++++++++++++++++++++++++ cmd/init.go | 1 + cmd/processor.go | 2 ++ cmd/scheduler.go | 17 +++++++++++++- cmd/token_syncer.go | 8 +++---- cmd/ussd_syncer.go | 34 ++++++++++++++++++++++++++++ config.toml | 11 +++++++-- migrations/002_cic_dw_tables.sql | 15 +++++++------ queries.sql | 30 ++++++++++++------------- 10 files changed, 131 insertions(+), 30 deletions(-) create mode 100644 Makefile create mode 100644 cmd/cache_syncer.go create mode 100644 cmd/ussd_syncer.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0702c06 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +BIN := cic-dw + +.PHONY: build +build: + CGO_ENABLED=1 GOOS=linux go build -o ${BIN} -ldflags="-s -w" cmd/*.go \ No newline at end of file diff --git a/cmd/cache_syncer.go b/cmd/cache_syncer.go new file mode 100644 index 0000000..a9de2c7 --- /dev/null +++ b/cmd/cache_syncer.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "github.com/georgysavva/scany/pgxscan" + "github.com/hibiken/asynq" + "github.com/rs/zerolog/log" +) + +type cacheSyncer struct { + app *App +} + +type tableCount struct { + Count int `db:"count"` +} + +func newCacheSyncer(app *App) *cacheSyncer { + return &cacheSyncer{ + app: app, + } +} + +func (s *cacheSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error { + _, err := s.app.db.Exec(ctx, s.app.queries["cache-syncer"]) + if err != nil { + return asynq.SkipRetry + } + + var count tableCount + if err := pgxscan.Get(ctx, s.app.db, &count, "SELECT COUNT(*) from transactions"); err != nil { + return asynq.SkipRetry + } + + log.Info().Msgf("=> %d transactions synced", count.Count) + + return nil +} diff --git 
a/cmd/init.go b/cmd/init.go index c07062c..d841541 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -13,6 +13,7 @@ import ( "strings" ) +// TODO: Load into koanf struct func loadConfig(configFilePath string, envOverridePrefix string, conf *koanf.Koanf) error { // assumed to always be at the root folder confFile := file.Provider(configFilePath) diff --git a/cmd/processor.go b/cmd/processor.go index 15ce979..8a731c4 100644 --- a/cmd/processor.go +++ b/cmd/processor.go @@ -15,6 +15,8 @@ func runProcessor(app *App) { mux := asynq.NewServeMux() mux.Handle("token:sync", newTokenSyncer(app)) + mux.Handle("cache:sync", newCacheSyncer(app)) + mux.Handle("ussd:sync", newUssdSyncer(app)) if err := processorServer.Run(mux); err != nil { log.Fatal().Err(err).Msg("failed to start job processor") diff --git a/cmd/scheduler.go b/cmd/scheduler.go index 3e7a7be..9031ebd 100644 --- a/cmd/scheduler.go +++ b/cmd/scheduler.go @@ -10,14 +10,29 @@ var scheduler *asynq.Scheduler func runScheduler(app *App) { scheduler = asynq.NewScheduler(app.rClient, nil) + // TODO: Refactor boilerplate and pull enabled tasks from koanf tokenTask := asynq.NewTask("token:sync", nil) + cacheTask := asynq.NewTask("cache:sync", nil) + ussdTask := asynq.NewTask("ussd:sync", nil) - _, err := scheduler.Register(conf.String("schedule.token"), tokenTask) + _, err := scheduler.Register(conf.String("token.schedule"), tokenTask) if err != nil { log.Fatal().Err(err).Msg("failed to register token syncer") } log.Info().Msg("successfully registered token syncer") + _, err = scheduler.Register(conf.String("cache.schedule"), cacheTask) + if err != nil { + log.Fatal().Err(err).Msg("failed to register cache syncer") + } + log.Info().Msg("successfully registered cache syncer") + + _, err = scheduler.Register(conf.String("ussd.schedule"), ussdTask) + if err != nil { + log.Fatal().Err(err).Msg("failed to register ussd syncer") + } + log.Info().Msg("successfully registered ussd syncer") + if err := scheduler.Run(); err != nil { 
log.Fatal().Err(err).Msg("could not start asynq scheduler") } diff --git a/cmd/token_syncer.go b/cmd/token_syncer.go index 3148182..2046fa1 100644 --- a/cmd/token_syncer.go +++ b/cmd/token_syncer.go @@ -27,10 +27,9 @@ func newTokenSyncer(app *App) *tokenSyncer { } func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error { - log.Info().Msgf("running task type: %s", t.Type()) var lastCursor tokenCursor - if err := pgxscan.Get(ctx, s.app.db, &lastCursor, s.app.queries["token-cursor-pos"]); err != nil { + if err := pgxscan.Get(ctx, s.app.db, &lastCursor, s.app.queries["cursor-pos"], 3); err != nil { return err } latestChainIdx, err := s.app.cicnetClient.EntryCount(ctx) @@ -44,7 +43,7 @@ func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error { } latestChainPos := latestChainIdx.Int64() - 1 - log.Info().Msgf("current db cursor: %s, latest chain pos: %d", lastCursor.CursorPos, latestChainPos) + log.Info().Msgf("=> token cursor: %d, latest chain pos: %d", lastCursorPos, latestChainPos) if latestChainPos >= lastCursorPos { batch := &pgx.Batch{} @@ -63,7 +62,6 @@ func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error { } res := s.app.db.SendBatch(ctx, batch) - log.Info().Msgf("inserting %d new records", batch.Len()) for i := 0; i < batch.Len(); i++ { _, err := res.Exec() if err != nil { @@ -75,7 +73,7 @@ func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error { return err } - _, err = s.app.db.Exec(ctx, s.app.queries["update-token-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10)) + _, err = s.app.db.Exec(ctx, s.app.queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3) if err != nil { return err } diff --git a/cmd/ussd_syncer.go b/cmd/ussd_syncer.go new file mode 100644 index 0000000..7e31c07 --- /dev/null +++ b/cmd/ussd_syncer.go @@ -0,0 +1,34 @@ +package main + +import ( + "context" + "github.com/georgysavva/scany/pgxscan" + "github.com/hibiken/asynq" + "github.com/rs/zerolog/log" +) + +type ussdSyncer
struct { + app *App +} + +func newUssdSyncer(app *App) *ussdSyncer { + return &ussdSyncer{ + app: app, + } +} + +func (s *ussdSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error { + _, err := s.app.db.Exec(ctx, s.app.queries["ussd-syncer"]) + if err != nil { + return asynq.SkipRetry + } + + var count tableCount + if err := pgxscan.Get(ctx, s.app.db, &count, "SELECT COUNT(*) from users"); err != nil { + return asynq.SkipRetry + } + + log.Info().Msgf("=> %d users synced", count.Count) + + return nil +} diff --git a/config.toml b/config.toml index efe7335..6c6e5f8 100644 --- a/config.toml +++ b/config.toml @@ -8,5 +8,12 @@ dsn = "127.0.0.1:6379" rpc = "http://127.0.0.1:8545" registry = "0x5A1EB529438D8b3cA943A45a48744f4c73d1f098" -[schedule] -token = "@every 15s" \ No newline at end of file +# syncers +[ussd] +schedule = "@every 15s" + +[cache] +schedule = "@every 15s" + +[token] +schedule = "@every 15s" diff --git a/migrations/002_cic_dw_tables.sql b/migrations/002_cic_dw_tables.sql index 06a3428..f312f6c 100644 --- a/migrations/002_cic_dw_tables.sql +++ b/migrations/002_cic_dw_tables.sql @@ -1,7 +1,7 @@ -- tx table CREATE TABLE IF NOT EXISTS transactions ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - tx_hash VARCHAR(64) NOT NULL, + tx_hash VARCHAR(64) NOT NULL UNIQUE, block_number INT NOT NULL, tx_index INT NOT NULL, token_address VARCHAR(40) NOT NULL, @@ -9,7 +9,8 @@ CREATE TABLE IF NOT EXISTS transactions ( recipient_address VARCHAR(40) NOT NULL, tx_value BIGINT NOT NULL, tx_type VARCHAR(16) NOT NULL, - date_block TIMESTAMP NOT NULL + date_block TIMESTAMP NOT NULL, + success BOOLEAN NOT NULL ); CREATE INDEX IF NOT EXISTS token_idx ON transactions USING hash(token_address); @@ -28,8 +29,8 @@ CREATE TABLE IF NOT EXISTS tokens ( -- users table CREATE TABLE IF NOT EXISTS users ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - phone_number VARCHAR(16) NOT NULL, - blockchain_address VARCHAR(40) NOT NULL, + phone_number VARCHAR(16) NOT NULL UNIQUE, + 
blockchain_address VARCHAR(40) NOT NULL UNIQUE, date_registered TIMESTAMP NOT NULL, failed_pin_attempts INT NOT NULL, ussd_account_status INT NOT NULL @@ -78,12 +79,12 @@ INSERT INTO cursors (id, cursor_pos, cursor_description) SELECT 1, blockchain_address, 'cic_ussd.account.block_chain_address remote cursor' FROM users ORDER BY id DESC LIMIT 1; -- bootstrap first tx row -INSERT INTO transactions (tx_hash, block_number, tx_index, token_address, sender_address, recipient_address, tx_value, date_block, tx_type) -SELECT tx.tx_hash, tx.block_number, tx.tx_index, tx.source_token, tx.sender, tx.recipient, tx.from_value, tx.date_block, concat(tag.domain, '_', tag.value) AS tx_type +INSERT INTO transactions (tx_hash, block_number, tx_index, token_address, sender_address, recipient_address, tx_value, date_block, tx_type, success) +SELECT tx.tx_hash, tx.block_number, tx.tx_index, tx.source_token, tx.sender, tx.recipient, tx.from_value, tx.date_block, concat(tag.domain, '_', tag.value) AS tx_type, tx.success FROM cic_cache.tx INNER JOIN cic_cache.tag_tx_link ON tx.id = cic_cache.tag_tx_link.tx_id INNER JOIN cic_cache.tag ON cic_cache.tag_tx_link.tag_id = cic_cache.tag.id -WHERE tx.success = true AND tx.id = 1; +WHERE tx.id = 1; -- id 2 = cic_cache cursor INSERT INTO cursors (id, cursor_pos, cursor_description) diff --git a/queries.sql b/queries.sql index f9aad7c..c03bfd2 100644 --- a/queries.sql +++ b/queries.sql @@ -7,9 +7,9 @@ WITH current_ussd_cursor AS ( SELECT id FROM cic_ussd.account WHERE blockchain_address = (SELECT cursor_pos FROM cursors WHERE id = 1) ) -INSERT INTO users (phone_number, blockchain_address, date_registered) -SELECT phone_number, blockchain_address, created -FROM cic_ussd.account WHERE id > (SELECT id FROM current_ussd_cursor) ORDER BY id ASC LIMIT 10; +INSERT INTO users (phone_number, blockchain_address, date_registered, failed_pin_attempts, ussd_account_status) +SELECT cic_ussd.account.phone_number, cic_ussd.account.blockchain_address, 
cic_ussd.account.created, cic_ussd.account.failed_pin_attempts, cic_ussd.account.status +FROM cic_ussd.account WHERE cic_ussd.account.id > (SELECT id FROM current_ussd_cursor) ORDER BY cic_ussd.account.id ASC LIMIT 200; UPDATE cursors SET cursor_pos = (SELECT blockchain_address FROM users ORDER BY id DESC LIMIT 1) WHERE cursors.id = 1; @@ -20,22 +20,22 @@ WITH current_cache_cursor AS ( SELECT id FROM cic_cache.tx WHERE tx_hash = (SELECT cursor_pos FROM cursors WHERE id = 2) ) -INSERT INTO transactions (tx_hash, block_number, tx_index, token_address, sender_address, recipient_address, tx_value, date_block, tx_type) -SELECT tx.tx_hash, tx.block_number, tx.tx_index, tx.source_token, tx.sender, tx.recipient, tx.from_value, tx.date_block, concat(tag.domain, '_', tag.value) AS tx_type -FROM cic_cache.tx INNER JOIN cic_cache.tag_tx_link ON tx.id = cic_cache.tag_tx_link.tx_id INNER JOIN cic_cache.tag ON cic_cache.tag_tx_link.tag_id = cic_cache.tag.id -WHERE tx.success = true AND tx.id > (SELECT id FROM current_cache_cursor) ORDER BY tx.id ASC LIMIT 10; +INSERT INTO transactions (tx_hash, block_number, tx_index, token_address, sender_address, recipient_address, tx_value, date_block, tx_type, success) +SELECT cic_cache.tx.tx_hash, cic_cache.tx.block_number, cic_cache.tx.tx_index, cic_cache.tx.source_token, cic_cache.tx.sender, cic_cache.tx.recipient, cic_cache.tx.from_value, cic_cache.tx.date_block, concat(cic_cache.tag.domain, '_', cic_cache.tag.value) AS tx_type, cic_cache.tx.success +FROM cic_cache.tx INNER JOIN cic_cache.tag_tx_link ON cic_cache.tx.id = cic_cache.tag_tx_link.tx_id INNER JOIN cic_cache.tag ON cic_cache.tag_tx_link.tag_id = cic_cache.tag.id +WHERE cic_cache.tx.id > (SELECT id FROM current_cache_cursor) ORDER BY cic_cache.tx.id ASC LIMIT 200; -UPDATE cursors SET cursor_pos = (SELECT tx_hash FROM tx ORDER BY id DESC LIMIT 1) WHERE cursors.id = 2; +UPDATE cursors SET cursor_pos = (SELECT tx_hash FROM transactions ORDER BY id DESC LIMIT 1) WHERE cursors.id = 
2; --- name: token-cursor-pos --- Check last synced token -SELECT cursor_pos from cursors WHERE id = 3; +-- name: cursor-pos +-- Generic cursor query +SELECT cursor_pos from cursors WHERE id = $1; -- name: insert-token-data -- Insert new token INSERT INTO tokens (token_address, token_name, token_symbol, token_decimals) VALUES -($1, $2, $3, $4) +($1, $2, $3, $4); --- name: update-token-cursor --- Updates token cursor to the last synced token idx -UPDATE cursors SET cursor_pos = $1 WHERE cursors.id = 3; +-- name: update-cursor +-- Generic cursor update +UPDATE cursors SET cursor_pos = $1 WHERE cursors.id = $2; \ No newline at end of file