add: ussd and cache syncer tasks

- no retry on failure; work is picked up on the next scheduled run
- enforce UNIQUE constraints on the users and transactions tables to prevent duplicates
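Both behaviors show up in the new task handlers below: on any error the handler returns asynq.SkipRetry, so asynq records the failure without re-queueing, and the scheduler's next tick enqueues a fresh task that resumes from the stored cursor. A minimal sketch of that pattern (demoSyncer and doSyncWork are illustrative names, not code from this commit):

package main

import (
	"context"

	"github.com/hibiken/asynq"
)

type demoSyncer struct{}

// ProcessTask implements asynq.Handler. Returning asynq.SkipRetry marks
// the task as failed without scheduling a retry; the periodic scheduler
// enqueues a fresh task on its next tick anyway.
func (s *demoSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
	if err := doSyncWork(ctx); err != nil {
		return asynq.SkipRetry
	}
	return nil
}

// doSyncWork stands in for the real sync query (hypothetical placeholder).
func doSyncWork(ctx context.Context) error { return nil }

Combined with the new UNIQUE constraints, a partially applied run cannot insert duplicate rows; the next scheduled run simply picks up from the cursor.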
Mohamed Sohail 2022-05-03 21:37:48 +03:00
parent 05ab865c63
commit 1c65a11460
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
10 changed files with 131 additions and 30 deletions

Makefile (new file, +5)

@@ -0,0 +1,5 @@
+BIN := cic-dw
+
+.PHONY: build
+build:
+	CGO_ENABLED=1 GOOS=linux go build -o ${BIN} -ldflags="-s -w" cmd/*.go

cmd/cache_syncer.go (new file, +38)

@@ -0,0 +1,38 @@
+package main
+
+import (
+	"context"
+
+	"github.com/georgysavva/scany/pgxscan"
+	"github.com/hibiken/asynq"
+	"github.com/rs/zerolog/log"
+)
+
+type cacheSyncer struct {
+	app *App
+}
+
+type tableCount struct {
+	Count int `db:"count"`
+}
+
+func newCacheSyncer(app *App) *cacheSyncer {
+	return &cacheSyncer{
+		app: app,
+	}
+}
+
+func (s *cacheSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
+	_, err := s.app.db.Exec(ctx, s.app.queries["cache-syncer"])
+	if err != nil {
+		return asynq.SkipRetry
+	}
+
+	var count tableCount
+	if err := pgxscan.Get(ctx, s.app.db, &count, "SELECT COUNT(*) from transactions"); err != nil {
+		return asynq.SkipRetry
+	}
+
+	log.Info().Msgf("=> %d transactions synced", count.Count)
+	return nil
+}


@@ -13,6 +13,7 @@ import (
 	"strings"
 )
 
+// TODO: Load into koanf struct
 func loadConfig(configFilePath string, envOverridePrefix string, conf *koanf.Koanf) error {
 	// assumed to always be at the root folder
 	confFile := file.Provider(configFilePath)


@@ -15,6 +15,8 @@ func runProcessor(app *App) {
 	mux := asynq.NewServeMux()
 	mux.Handle("token:sync", newTokenSyncer(app))
+	mux.Handle("cache:sync", newCacheSyncer(app))
+	mux.Handle("ussd:sync", newUssdSyncer(app))
 
 	if err := processorServer.Run(mux); err != nil {
 		log.Fatal().Err(err).Msg("failed to start job processor")


@@ -10,14 +10,29 @@ var scheduler *asynq.Scheduler
 
 func runScheduler(app *App) {
 	scheduler = asynq.NewScheduler(app.rClient, nil)
 
+	// TODO: Refactor boilerplate and pull enabled tasks from koanf
 	tokenTask := asynq.NewTask("token:sync", nil)
+	cacheTask := asynq.NewTask("cache:sync", nil)
+	ussdTask := asynq.NewTask("ussd:sync", nil)
 
-	_, err := scheduler.Register(conf.String("schedule.token"), tokenTask)
+	_, err := scheduler.Register(conf.String("token.schedule"), tokenTask)
 	if err != nil {
 		log.Fatal().Err(err).Msg("failed to register token syncer")
 	}
 	log.Info().Msg("successfully registered token syncer")
 
+	_, err = scheduler.Register(conf.String("cache.schedule"), cacheTask)
+	if err != nil {
+		log.Fatal().Err(err).Msg("failed to register cache syncer")
+	}
+	log.Info().Msg("successfully registered cache syncer")
+
+	_, err = scheduler.Register(conf.String("ussd.schedule"), ussdTask)
+	if err != nil {
+		log.Fatal().Err(err).Msg("failed to register ussd syncer")
+	}
+	log.Info().Msg("successfully registered ussd syncer")
+
 	if err := scheduler.Run(); err != nil {
 		log.Fatal().Err(err).Msg("could not start asynq scheduler")
 	}


@@ -27,10 +27,9 @@ func newTokenSyncer(app *App) *tokenSyncer {
 }
 
 func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
-	log.Info().Msgf("running task type: %s", t.Type())
 	var lastCursor tokenCursor
-	if err := pgxscan.Get(ctx, s.app.db, &lastCursor, s.app.queries["token-cursor-pos"]); err != nil {
+	if err := pgxscan.Get(ctx, s.app.db, &lastCursor, s.app.queries["cursor-pos"], 3); err != nil {
 		return err
 	}
 
 	latestChainIdx, err := s.app.cicnetClient.EntryCount(ctx)
@@ -44,7 +43,7 @@ func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
 	}
 
 	latestChainPos := latestChainIdx.Int64() - 1
-	log.Info().Msgf("current db cursor: %s, latest chain pos: %d", lastCursor.CursorPos, latestChainPos)
+	log.Info().Msgf("=> %d tokens synced", lastCursorPos)
 
 	if latestChainPos >= lastCursorPos {
 		batch := &pgx.Batch{}
@@ -63,7 +62,6 @@ func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
 	}
 
 	res := s.app.db.SendBatch(ctx, batch)
-	log.Info().Msgf("inserting %d new records", batch.Len())
 	for i := 0; i < batch.Len(); i++ {
 		_, err := res.Exec()
 		if err != nil {
@@ -75,7 +73,7 @@ func (s *tokenSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
 		return err
 	}
 
-	_, err = s.app.db.Exec(ctx, s.app.queries["update-token-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10))
+	_, err = s.app.db.Exec(ctx, s.app.queries["update-cursor"], strconv.FormatInt(latestChainIdx.Int64(), 10), 3)
 	if err != nil {
 		return err
 	}

cmd/ussd_syncer.go (new file, +34)

@@ -0,0 +1,34 @@
+package main
+
+import (
+	"context"
+
+	"github.com/georgysavva/scany/pgxscan"
+	"github.com/hibiken/asynq"
+	"github.com/rs/zerolog/log"
+)
+
+type ussdSyncer struct {
+	app *App
+}
+
+func newUssdSyncer(app *App) *ussdSyncer {
+	return &ussdSyncer{
+		app: app,
+	}
+}
+
+func (s *ussdSyncer) ProcessTask(ctx context.Context, t *asynq.Task) error {
+	_, err := s.app.db.Exec(ctx, s.app.queries["ussd-syncer"])
+	if err != nil {
+		return asynq.SkipRetry
+	}
+
+	var count tableCount
+	if err := pgxscan.Get(ctx, s.app.db, &count, "SELECT COUNT(*) from users"); err != nil {
+		return asynq.SkipRetry
+	}
+
+	log.Info().Msgf("=> %d users synced", count.Count)
+	return nil
+}


@ -8,5 +8,12 @@ dsn = "127.0.0.1:6379"
rpc = "http://127.0.0.1:8545" rpc = "http://127.0.0.1:8545"
registry = "0x5A1EB529438D8b3cA943A45a48744f4c73d1f098" registry = "0x5A1EB529438D8b3cA943A45a48744f4c73d1f098"
[schedule] # syncers
token = "@every 15s" [ussd]
schedule = "@every 15s"
[cache]
schedule = "@every 15s"
[token]
schedule = "@every 15s"


@@ -1,7 +1,7 @@
 -- tx table
 CREATE TABLE IF NOT EXISTS transactions (
     id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
-    tx_hash VARCHAR(64) NOT NULL,
+    tx_hash VARCHAR(64) NOT NULL UNIQUE,
     block_number INT NOT NULL,
     tx_index INT NOT NULL,
     token_address VARCHAR(40) NOT NULL,
@@ -9,7 +9,8 @@ CREATE TABLE IF NOT EXISTS transactions (
     recipient_address VARCHAR(40) NOT NULL,
     tx_value BIGINT NOT NULL,
     tx_type VARCHAR(16) NOT NULL,
-    date_block TIMESTAMP NOT NULL
+    date_block TIMESTAMP NOT NULL,
+    success BOOLEAN NOT NULL
 );
 
 CREATE INDEX IF NOT EXISTS token_idx ON transactions USING hash(token_address);
@@ -28,8 +29,8 @@ CREATE TABLE IF NOT EXISTS tokens (
 -- users table
 CREATE TABLE IF NOT EXISTS users (
     id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
-    phone_number VARCHAR(16) NOT NULL,
-    blockchain_address VARCHAR(40) NOT NULL,
+    phone_number VARCHAR(16) NOT NULL UNIQUE,
+    blockchain_address VARCHAR(40) NOT NULL UNIQUE,
     date_registered TIMESTAMP NOT NULL,
     failed_pin_attempts INT NOT NULL,
     ussd_account_status INT NOT NULL
@@ -78,12 +79,12 @@ INSERT INTO cursors (id, cursor_pos, cursor_description)
 SELECT 1, blockchain_address, 'cic_ussd.account.block_chain_address remote cursor' FROM users ORDER BY id DESC LIMIT 1;
 
 -- bootstrap first tx row
-INSERT INTO transactions (tx_hash, block_number, tx_index, token_address, sender_address, recipient_address, tx_value, date_block, tx_type)
-SELECT tx.tx_hash, tx.block_number, tx.tx_index, tx.source_token, tx.sender, tx.recipient, tx.from_value, tx.date_block, concat(tag.domain, '_', tag.value) AS tx_type
+INSERT INTO transactions (tx_hash, block_number, tx_index, token_address, sender_address, recipient_address, tx_value, date_block, tx_type, success)
+SELECT tx.tx_hash, tx.block_number, tx.tx_index, tx.source_token, tx.sender, tx.recipient, tx.from_value, tx.date_block, concat(tag.domain, '_', tag.value) AS tx_type, tx.success
 FROM cic_cache.tx
 INNER JOIN cic_cache.tag_tx_link ON tx.id = cic_cache.tag_tx_link.tx_id
 INNER JOIN cic_cache.tag ON cic_cache.tag_tx_link.tag_id = cic_cache.tag.id
-WHERE tx.success = true AND tx.id = 1;
+WHERE tx.id = 1;
 
 -- id 2 = cic_cache cursor
 INSERT INTO cursors (id, cursor_pos, cursor_description)


@@ -7,9 +7,9 @@ WITH current_ussd_cursor AS (
     SELECT id FROM cic_ussd.account WHERE blockchain_address = (SELECT cursor_pos FROM cursors WHERE id = 1)
 )
-INSERT INTO users (phone_number, blockchain_address, date_registered)
-SELECT phone_number, blockchain_address, created
-FROM cic_ussd.account WHERE id > (SELECT id FROM current_ussd_cursor) ORDER BY id ASC LIMIT 10;
+INSERT INTO users (phone_number, blockchain_address, date_registered, failed_pin_attempts, ussd_account_status)
+SELECT cic_ussd.account.phone_number, cic_ussd.account.blockchain_address, cic_ussd.account.created, cic_ussd.account.failed_pin_attempts, cic_ussd.account.status
+FROM cic_ussd.account WHERE cic_ussd.account.id > (SELECT id FROM current_ussd_cursor) ORDER BY cic_ussd.account.id ASC LIMIT 200;
 
 UPDATE cursors SET cursor_pos = (SELECT blockchain_address FROM users ORDER BY id DESC LIMIT 1) WHERE cursors.id = 1;
@@ -20,22 +20,22 @@ WITH current_cache_cursor AS (
     SELECT id FROM cic_cache.tx WHERE tx_hash = (SELECT cursor_pos FROM cursors WHERE id = 2)
 )
-INSERT INTO transactions (tx_hash, block_number, tx_index, token_address, sender_address, recipient_address, tx_value, date_block, tx_type)
-SELECT tx.tx_hash, tx.block_number, tx.tx_index, tx.source_token, tx.sender, tx.recipient, tx.from_value, tx.date_block, concat(tag.domain, '_', tag.value) AS tx_type
-FROM cic_cache.tx INNER JOIN cic_cache.tag_tx_link ON tx.id = cic_cache.tag_tx_link.tx_id INNER JOIN cic_cache.tag ON cic_cache.tag_tx_link.tag_id = cic_cache.tag.id
-WHERE tx.success = true AND tx.id > (SELECT id FROM current_cache_cursor) ORDER BY tx.id ASC LIMIT 10;
+INSERT INTO transactions (tx_hash, block_number, tx_index, token_address, sender_address, recipient_address, tx_value, date_block, tx_type, success)
+SELECT cic_cache.tx.tx_hash, cic_cache.tx.block_number, cic_cache.tx.tx_index, cic_cache.tx.source_token, cic_cache.tx.sender, cic_cache.tx.recipient, cic_cache.tx.from_value, cic_cache.tx.date_block, concat(cic_cache.tag.domain, '_', cic_cache.tag.value) AS tx_type, cic_cache.tx.success
+FROM cic_cache.tx INNER JOIN cic_cache.tag_tx_link ON cic_cache.tx.id = cic_cache.tag_tx_link.tx_id INNER JOIN cic_cache.tag ON cic_cache.tag_tx_link.tag_id = cic_cache.tag.id
+WHERE cic_cache.tx.id > (SELECT id FROM current_cache_cursor) ORDER BY cic_cache.tx.id ASC LIMIT 200;
 
-UPDATE cursors SET cursor_pos = (SELECT tx_hash FROM tx ORDER BY id DESC LIMIT 1) WHERE cursors.id = 2;
+UPDATE cursors SET cursor_pos = (SELECT tx_hash FROM transactions ORDER BY id DESC LIMIT 1) WHERE cursors.id = 2;
 
--- name: token-cursor-pos
--- Check last synced token
-SELECT cursor_pos from cursors WHERE id = 3;
+-- name: cursor-pos
+-- Generic cursor query
+SELECT cursor_pos from cursors WHERE id = $1;
 
 -- name: insert-token-data
 -- Insert new token
 INSERT INTO tokens (token_address, token_name, token_symbol, token_decimals) VALUES
-($1, $2, $3, $4)
+($1, $2, $3, $4);
 
--- name: update-token-cursor
--- Updates token cursor to the last synced token idx
-UPDATE cursors SET cursor_pos = $1 WHERE cursors.id = 3;
+-- name: update-cursor
+-- Generic cursor update
+UPDATE cursors SET cursor_pos = $1 WHERE cursors.id = $2;