Compare commits
7 Commits
v0.2.0-bet
...
master
Author | SHA1 | Date | |
---|---|---|---|
ad4963e5a0 | |||
fbd5f28d54 | |||
8b7e6ca8bc | |||
ef477c736f | |||
1e0c853d40 | |||
6fff0ba538 | |||
3c212c0612 |
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
.vscode
|
@ -71,7 +71,7 @@ func main() {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
syncer.Run()
|
// syncer.Run()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
@ -80,7 +80,7 @@ func main() {
|
|||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
syncer.Stop()
|
// syncer.Stop()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -11,10 +11,11 @@ import (
|
|||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
)
|
)
|
||||||
|
|
||||||
const syncInterval = time.Second * 5
|
const syncInterval = time.Hour * 1
|
||||||
|
|
||||||
type (
|
type (
|
||||||
KVRow struct {
|
KVRow struct {
|
||||||
|
ID int `db:"id"`
|
||||||
Key []byte `db:"key"`
|
Key []byte `db:"key"`
|
||||||
Value []byte `db:"value"`
|
Value []byte `db:"value"`
|
||||||
Updated time.Time `db:"updated"`
|
Updated time.Time `db:"updated"`
|
||||||
@ -73,7 +74,7 @@ func (s *Syncer) Process(ctx context.Context) error {
|
|||||||
|
|
||||||
kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) {
|
kvRows, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (KVRow, error) {
|
||||||
var kvRow KVRow
|
var kvRow KVRow
|
||||||
err := row.Scan(&kvRow.Key, &kvRow.Value, &kvRow.Updated)
|
err := row.Scan(&kvRow.ID, &kvRow.Key, &kvRow.Value, &kvRow.Updated)
|
||||||
return kvRow, err
|
return kvRow, err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -91,7 +92,8 @@ func (s *Syncer) Process(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
decodedValue := data.DecodeValue(row.Value)
|
decodedValue := data.DecodeValue(row.Value)
|
||||||
|
|
||||||
s.logg.Debug("processing row", "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column)
|
s.logg.Info("processing row", "id", row.ID)
|
||||||
|
s.logg.Debug("processing row", "id", row.ID, "key_type", decodedKeyDataType, "session_id", sessionID, "value", decodedValue, "timestamp", row.Updated, "column", column)
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
INSERT INTO ussd_data (phone_number, %s)
|
INSERT INTO ussd_data (phone_number, %s)
|
||||||
VALUES ($1, $2)
|
VALUES ($1, $2)
|
||||||
|
@ -8,12 +8,13 @@ const ()
|
|||||||
|
|
||||||
// DecodeKey specifically only decodes user data keys stored as bytes into its respective session ID and data type
|
// DecodeKey specifically only decodes user data keys stored as bytes into its respective session ID and data type
|
||||||
// TODO: Replace return data type with imported data types from the common package once lib-gdbm dependency is removed.
|
// TODO: Replace return data type with imported data types from the common package once lib-gdbm dependency is removed.
|
||||||
|
// Note: 0x2e was added herehttps://holbrook.no/src/go-vise/file/db/db.go.html#l147, so we discard the last 3 bytes
|
||||||
func DecodeKey(key []byte) (uint16, string) {
|
func DecodeKey(key []byte) (uint16, string) {
|
||||||
if key[0] != keyPrefix {
|
if key[0] != keyPrefix {
|
||||||
return 0, ""
|
return 0, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
return binary.BigEndian.Uint16(key[len(key)-2:]), string(key[1 : len(key)-2])
|
return binary.BigEndian.Uint16(key[len(key)-2:]), string(key[1 : len(key)-3])
|
||||||
}
|
}
|
||||||
|
|
||||||
// DecodeValue returns the utf-8 string representation of the value stored in the storage backend
|
// DecodeValue returns the utf-8 string representation of the value stored in the storage backend
|
||||||
|
@ -21,10 +21,10 @@ func TestDecodeKey(t *testing.T) {
|
|||||||
{
|
{
|
||||||
"blockchain_address",
|
"blockchain_address",
|
||||||
args{
|
args{
|
||||||
keyBytesHex: "202b3235343731313030303132330001",
|
keyBytesHex: "202b3235343731313737373733342e0001",
|
||||||
},
|
},
|
||||||
want{
|
want{
|
||||||
sessionID: "+254711000123",
|
sessionID: "+254711777734",
|
||||||
dataType: ACCOUNT_BLOCKCHAIN_ADDRESS,
|
dataType: ACCOUNT_BLOCKCHAIN_ADDRESS,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -13,6 +13,7 @@ const (
|
|||||||
ACCOUNT_GENDER = 7
|
ACCOUNT_GENDER = 7
|
||||||
ACCOUNT_COMMODITIES = 8
|
ACCOUNT_COMMODITIES = 8
|
||||||
ACCOUNT_ACTIVE_VOUCHER = 17
|
ACCOUNT_ACTIVE_VOUCHER = 17
|
||||||
|
ACCOUNT_LANG_CODE = 19
|
||||||
)
|
)
|
||||||
|
|
||||||
// ValidDataTypeLookup allows us to filter go-vise data types, additionally the value maps to the ussd_data coulmn
|
// ValidDataTypeLookup allows us to filter go-vise data types, additionally the value maps to the ussd_data coulmn
|
||||||
@ -25,4 +26,5 @@ var ValidDataTypeLookup = map[uint16]string{
|
|||||||
ACCOUNT_GENDER: "gender",
|
ACCOUNT_GENDER: "gender",
|
||||||
ACCOUNT_COMMODITIES: "commodities",
|
ACCOUNT_COMMODITIES: "commodities",
|
||||||
ACCOUNT_ACTIVE_VOUCHER: "active_voucher",
|
ACCOUNT_ACTIVE_VOUCHER: "active_voucher",
|
||||||
|
ACCOUNT_LANG_CODE: "lang_code",
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,11 @@
|
|||||||
--name: extract-entries
|
--name: extract-entries
|
||||||
SELECT key, value, updated FROM kv_vise
|
SELECT id, key, value, updated FROM kv_vise
|
||||||
WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1)
|
WHERE updated > (SELECT last_sync FROM ussd_sync LIMIT 1)
|
||||||
ORDER BY updated DESC
|
ORDER BY updated DESC
|
||||||
|
|
||||||
--name: update-cursor
|
--name: update-cursor
|
||||||
-- $1: timestamp
|
-- $1: timestamp
|
||||||
UPDATE ussd_sync SET last_sync = ($1)
|
UPDATE ussd_sync SET last_sync = ($1)
|
||||||
|
|
||||||
|
-- 202b3235343732343934323039370013
|
||||||
|
-- 202b3235343731313030303132330013
|
Loading…
Reference in New Issue
Block a user