Compare commits

..

No commits in common. "master" and "v0.2.0-beta" have entirely different histories.

7 changed files with 10 additions and 19 deletions

1
.gitignore vendored
View File

@ -1 +0,0 @@
.vscode

View File

@ -71,7 +71,7 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
// syncer.Run() syncer.Run()
}() }()
<-ctx.Done() <-ctx.Done()
@ -80,7 +80,7 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
// syncer.Stop() syncer.Stop()
defer wg.Done() defer wg.Done()
}() }()

View File

@ -11,11 +11,10 @@ import (
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
) )
const syncInterval = time.Hour * 1 const syncInterval = time.Second * 5
type ( type (
KVRow struct { KVRow struct {
ID int `db:"id"`
Key []byte `db:"key"` Key []byte `db:"key"`
Value []byte `db:"value"` Value []byte `db:"value"`
Updated time.Time `db:"updated"` Updated time.Time `db:"updated"`
@ -74,7 +73,7 @@ func (s *Syncer) Process(ctx context.Context) error {
kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) { kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) {
var kvRow KVRow var kvRow KVRow
err := row.Scan(&kvRow.ID, &kvRow.Key, &kvRow.Value, &kvRow.Updated) err := row.Scan(&kvRow.Key, &kvRow.Value, &kvRow.Updated)
return kvRow, err return kvRow, err
}) })
if err != nil { if err != nil {
@ -92,8 +91,7 @@ func (s *Syncer) Process(ctx context.Context) error {
} }
decodedValue := data.DecodeValue(row.Value) decodedValue := data.DecodeValue(row.Value)
s.logg.Info("processing row", "id", row.ID) s.logg.Debug("processing row", "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column)
s.logg.Debug("processing row", "id", row.ID, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column)
query := fmt.Sprintf(` query := fmt.Sprintf(`
INSERT INTO ussd_data (phone_number, %s) INSERT INTO ussd_data (phone_number, %s)
VALUES ($1, $2) VALUES ($1, $2)

View File

@ -8,13 +8,12 @@ const ()
// DecodeKey specifically only decodes user data keys stored as bytes into its respective session ID and data type // DecodeKey specifically only decodes user data keys stored as bytes into its respective session ID and data type
// TODO: Replace return data type with imported data types from the common package once lib-gdbm dependency is removed. // TODO: Replace return data type with imported data types from the common package once lib-gdbm dependency is removed.
// Note: 0x2e was added herehttps://holbrook.no/src/go-vise/file/db/db.go.html#l147, so we discard the last 3 bytes
func DecodeKey(key []byte) (uint16, string) { func DecodeKey(key []byte) (uint16, string) {
if key[0] != keyPrefix { if key[0] != keyPrefix {
return 0, "" return 0, ""
} }
return binary.BigEndian.Uint16(key[len(key)-2:]), string(key[1 : len(key)-3]) return binary.BigEndian.Uint16(key[len(key)-2:]), string(key[1 : len(key)-2])
} }
// DecodeValue returns the utf-8 string representation of the value stored in the storage backend // DecodeValue returns the utf-8 string representation of the value stored in the storage backend

View File

@ -21,10 +21,10 @@ func TestDecodeKey(t *testing.T) {
{ {
"blockchain_address", "blockchain_address",
args{ args{
keyBytesHex: "202b3235343731313737373733342e0001", keyBytesHex: "202b3235343731313030303132330001",
}, },
want{ want{
sessionID: "+254711777734", sessionID: "+254711000123",
dataType: ACCOUNT_BLOCKCHAIN_ADDRESS, dataType: ACCOUNT_BLOCKCHAIN_ADDRESS,
}, },
}, },

View File

@ -13,7 +13,6 @@ const (
ACCOUNT_GENDER = 7 ACCOUNT_GENDER = 7
ACCOUNT_COMMODITIES = 8 ACCOUNT_COMMODITIES = 8
ACCOUNT_ACTIVE_VOUCHER = 17 ACCOUNT_ACTIVE_VOUCHER = 17
ACCOUNT_LANG_CODE = 19
) )
// ValidDataTypeLookup allows us to filter go-vise data types, additionally the value maps to the ussd_data coulmn // ValidDataTypeLookup allows us to filter go-vise data types, additionally the value maps to the ussd_data coulmn
@ -26,5 +25,4 @@ var ValidDataTypeLookup = map[uint16]string{
ACCOUNT_GENDER: "gender", ACCOUNT_GENDER: "gender",
ACCOUNT_COMMODITIES: "commodities", ACCOUNT_COMMODITIES: "commodities",
ACCOUNT_ACTIVE_VOUCHER: "active_voucher", ACCOUNT_ACTIVE_VOUCHER: "active_voucher",
ACCOUNT_LANG_CODE: "lang_code",
} }

View File

@ -1,11 +1,8 @@
--name: extract-entries --name: extract-entries
SELECT id, key, value, updated FROM kv_vise SELECT key, value, updated FROM kv_vise
WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1) WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1)
ORDER BY updated DESC ORDER BY updated DESC
--name: update-cursor --name: update-cursor
-- $1: timestamp -- $1: timestamp
UPDATE ussd_sync SET last_sync = ($1) UPDATE ussd_sync SET last_sync = ($1)
-- 202b3235343732343934323039370013
-- 202b3235343731313030303132330013