Philip/fix import script pointers
parent da3c812bf5
commit 4368d2bf59
@@ -194,8 +194,7 @@ if __name__ == '__main__':
f.write(json.dumps(o))
f.close()

#fi.write('{},{}\n'.format(new_address, old_address))
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), 'cic.person')
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
meta_filepath = os.path.join(meta_dir, '{}.json'.format(new_address_clean.upper()))
os.symlink(os.path.realpath(filepath), meta_filepath)

@@ -221,7 +220,7 @@ if __name__ == '__main__':


# custom data
custom_key = generate_metadata_pointer(phone.encode('utf-8'), ':cic.custom')
custom_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.custom')
custom_filepath = os.path.join(custom_dir, 'meta', custom_key)

filepath = os.path.join(
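
The two pointer fixes above change what feeds generate_metadata_pointer: the person pointer string gains the leading ':' used by the other pointer salts in these scripts (':cic.custom', ':cic.preferences'), and the custom pointer is now derived from the new chain address rather than the phone number. A minimal sketch of why this matters; the hash below is a hypothetical stand-in, not the real cic_types implementation, and only illustrates that writer and reader must agree on both identifier and salt to land on the same key:

import hashlib

def fake_pointer(identifier: bytes, salt: str) -> str:
    # hypothetical stand-in for generate_metadata_pointer: any change to the
    # identifier or the salt produces a completely different lookup key
    return hashlib.sha256(identifier + salt.encode('utf-8')).hexdigest()

address = bytes.fromhex('deadbeef' * 5)        # example 20-byte address
phone = '+254700000000'.encode('utf-8')        # example phone identifier

assert fake_pointer(address, ':cic.custom') != fake_pointer(phone, ':cic.custom')
assert fake_pointer(address, ':cic.person') != fake_pointer(address, 'cic.person')
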
133 apps/data-seeding/cic_meta/import_meta_preferences.js Normal file
@@ -0,0 +1,133 @@
const fs = require('fs');
const path = require('path');
const http = require('http');

const cic = require('@cicnet/cic-client-meta');
const crdt = require('@cicnet/crdt-meta');

//const conf = JSON.parse(fs.readFileSync('./cic.conf'));

const config = new crdt.Config('./config');
config.process();
console.log(config);


function sendit(uid, envelope) {
  const d = envelope.toJSON();

  const contentLength = (new TextEncoder().encode(d)).length;
  const opts = {
    method: 'PUT',
    headers: {
      'Content-Type': 'application/json',
      'Content-Length': contentLength,
      'X-CIC-AUTOMERGE': 'client',

    },
  };
  let url = config.get('META_URL');
  url = url.replace(new RegExp('^(.+://[^/]+)/*$'), '$1/');
  console.log('posting to url: ' + url + uid);
  const req = http.request(url + uid, opts, (res) => {
    res.on('data', process.stdout.write);
    res.on('end', () => {
      console.log('result', res.statusCode, res.headers);
    });
  });
  if (!req.write(d)) {
    console.error('foo', d);
    process.exit(1);
  }
  req.end();
}

function doOne(keystore, filePath, identifier) {
  const signer = new crdt.PGPSigner(keystore);

  const o = JSON.parse(fs.readFileSync(filePath).toString());

  cic.Custom.toKey(identifier).then((uid) => {
    const s = new crdt.Syncable(uid, o);
    s.setSigner(signer);
    s.onwrap = (env) => {
      sendit(identifier, env);
    };
    s.sign();
  });
}

const privateKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
const publicKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
pk = fs.readFileSync(privateKeyPath);
pubk = fs.readFileSync(publicKeyPath);

new crdt.PGPKeyStore(
  config.get('PGP_PASSPHRASE'),
  pk,
  pubk,
  undefined,
  undefined,
  importMetaCustom,
);

const batchSize = 16;
const batchDelay = 1000;
const total = parseInt(process.argv[3]);
const dataDir = process.argv[2];
const workDir = path.join(dataDir, 'preferences/meta');
const userDir = path.join(dataDir, 'preferences/new');
let count = 0;
let batchCount = 0;


function importMetaCustom(keystore) {
  let err;
  let files;

  try {
    err, files = fs.readdirSync(workDir);
  } catch {
    console.error('source directory not yet ready', workDir);
    setTimeout(importMetaCustom, batchDelay, keystore);
    return;
  }
  let limit = batchSize;
  if (files.length < limit) {
    limit = files.length;
  }
  for (let i = 0; i < limit; i++) {
    const file = files[i];
    if (file.length < 3) {
      console.debug('skipping file', file);
      continue;
    }
    //const identifier = file.substr(0,file.length-5);
    const identifier = file;
    const filePath = path.join(workDir, file);
    console.log(filePath);

    //const address = fs.readFileSync(filePath).toString().substring(2).toUpperCase();
    const custom = JSON.parse(fs.readFileSync(filePath).toString());
    const customFilePath = path.join(
      userDir,
      identifier.substring(0, 2),
      identifier.substring(2, 4),
      identifier + '.json',
    );

    doOne(keystore, filePath, identifier);
    fs.unlinkSync(filePath);
    count++;
    batchCount++;
    if (batchCount == batchSize) {
      console.debug('reached batch size, breathing');
      batchCount=0;
      setTimeout(importMetaCustom, batchDelay, keystore);
      return;
    }
  }
  if (count == total) {
    return;
  }
  setTimeout(importMetaCustom, 100, keystore);
}
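
For orientation, importMetaCustom above is a polling batch loop: list the pending pointer files under preferences/meta, sign and PUT up to batchSize of them, delete each processed file, then reschedule itself, pausing longer after a full batch or while the source directory does not exist yet. A rough Python rendering of that control flow; illustration only, with a hypothetical submit() callback standing in for the sign-and-PUT done by doOne()/sendit():

import os
import time

BATCH_SIZE = 16
BATCH_DELAY = 1.0   # seconds; mirrors batchDelay = 1000 ms above

def import_meta_custom(work_dir, total, submit):
    # submit(path, identifier) is assumed to sign the payload and PUT it to
    # the meta server, as doOne()/sendit() do in the script above
    count = 0
    while count < total:
        try:
            files = os.listdir(work_dir)
        except FileNotFoundError:
            print('source directory not yet ready', work_dir)
            time.sleep(BATCH_DELAY)
            continue
        batch = [f for f in files if len(f) >= 3][:BATCH_SIZE]
        for name in batch:
            path = os.path.join(work_dir, name)
            submit(path, name)       # the pointer filename doubles as the identifier
            os.unlink(path)          # processed pointer files are removed
            count += 1
        # breathe after a full batch, poll quickly otherwise
        time.sleep(BATCH_DELAY if len(batch) == BATCH_SIZE else 0.1)
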
@@ -1,6 +1,7 @@
# standard imports
import os
import logging
import random
import urllib.parse
import urllib.error
import urllib.request
@@ -136,6 +137,42 @@ def generate_metadata(self, address, phone):
)
os.symlink(os.path.realpath(filepath), meta_filepath)

# write ussd data
ussd_data = {
    'phone': phone,
    'is_activated': 1,
    'preferred_language': random.sample(['en', 'sw'], 1)[0],
    'is_disabled': False
}
ussd_data_dir = os.path.join(self.import_dir, 'ussd')
ussd_data_file_path = os.path.join(ussd_data_dir, f'{old_address}.json')
f = open(ussd_data_file_path, 'w')
f.write(json.dumps(ussd_data))
f.close()

# write preferences data
preferences_dir = os.path.join(self.import_dir, 'preferences')
preferences_data = {
    'preferred_language': ussd_data['preferred_language']
}

preferences_key = generate_metadata_pointer(bytes.fromhex(new_address_clean[2:]), ':cic.preferences')
preferences_filepath = os.path.join(preferences_dir, 'meta', preferences_key)

filepath = os.path.join(
    preferences_dir,
    'new',
    preferences_key[:2].upper(),
    preferences_key[2:4].upper(),
    preferences_key.upper() + '.json'
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)

f = open(filepath, 'w')
f.write(json.dumps(preferences_data))
f.close()
os.symlink(os.path.realpath(filepath), preferences_filepath)

logg.debug('found metadata {} for phone {}'.format(o, phone))

return address
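
The preferences block added above stores each document twice: the canonical copy goes under preferences/new/ in a two-level fan-out derived from the pointer key, and an entry named after the raw pointer is symlinked into preferences/meta/, which is the directory import_meta_preferences.js later drains. A small sketch of the path construction; the example key is made up, the real one comes from generate_metadata_pointer:

import os

def preferences_paths(preferences_dir, preferences_key):
    # canonical file: new/<first 2 hex chars>/<next 2>/<KEY>.json
    filepath = os.path.join(
        preferences_dir,
        'new',
        preferences_key[:2].upper(),
        preferences_key[2:4].upper(),
        preferences_key.upper() + '.json'
    )
    # pointer entry the meta importer looks for
    meta_link = os.path.join(preferences_dir, 'meta', preferences_key)
    return filepath, meta_link

print(preferences_paths('/tmp/import/preferences', 'ab12cd34'))
# ('/tmp/import/preferences/new/AB/12/AB12CD34.json',
#  '/tmp/import/preferences/meta/ab12cd34')
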
@@ -64,6 +64,12 @@ ps = r.pubsub()
user_new_dir = os.path.join(args.user_dir, 'new')
os.makedirs(user_new_dir)

ussd_data_dir = os.path.join(args.user_dir, 'ussd')
os.makedirs(ussd_data_dir)

preferences_dir = os.path.join(args.user_dir, 'preferences')
os.makedirs(os.path.join(preferences_dir, 'meta'))

meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir)
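
Together with the on-demand os.makedirs in generate_metadata above, these calls give the import run roughly the following working tree (sketch only; annotations summarize the hunks in this commit):

<args.user_dir>/
    new/                 # per-user person data (written elsewhere in the script)
    ussd/                # <old_address>.json ussd_data documents
    meta/                # person metadata pointer symlinks
    preferences/
        meta/            # preference pointer symlinks (input to import_meta_preferences.js)
        new/             # preference documents, two-level fan-out, created on demand
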
@@ -128,59 +134,57 @@ if __name__ == '__main__':
if y[len(y)-5:] != '.json':
continue
# handle json containing person object
filepath = None
if y[:15] != '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
u = Person.deserialize(o)
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)

new_address = register_ussd(i, u)
new_address = register_ussd(i, u)

phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)

s_phone = celery.signature(
    'import_task.resolve_phone',
    [
        phone,
    ],
    queue='cic-import-ussd',
)
s_phone = celery.signature(
    'import_task.resolve_phone',
    [
        phone,
    ],
    queue='cic-import-ussd',
)

s_meta = celery.signature(
    'import_task.generate_metadata',
    [
        phone,
    ],
    queue='cic-import-ussd',
)
s_meta = celery.signature(
    'import_task.generate_metadata',
    [
        phone,
    ],
    queue='cic-import-ussd',
)

s_balance = celery.signature(
    'import_task.opening_balance_tx',
    [
        phone,
        i,
    ],
    queue='cic-import-ussd',
)
s_balance = celery.signature(
    'import_task.opening_balance_tx',
    [
        phone,
        i,
    ],
    queue='cic-import-ussd',
)

s_meta.link(s_balance)
s_phone.link(s_meta)
# block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7)
s_meta.link(s_balance)
s_phone.link(s_meta)
# block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7)

i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")

j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
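
The scheduling at the end of the loop builds one Celery chain per user: resolve_phone runs first, its result feeds generate_metadata through link(), which in turn links opening_balance_tx, and the whole chain is dispatched with a 7-second countdown to leave room for block time and USSD processing. A compact sketch of the same pattern; task names and queue are taken from the diff, the wrapper function and app setup are assumed:

import celery

def enqueue_user(phone, i):
    # each signature names a registered task and the queue it should run on
    s_phone = celery.signature('import_task.resolve_phone', [phone], queue='cic-import-ussd')
    s_meta = celery.signature('import_task.generate_metadata', [phone], queue='cic-import-ussd')
    s_balance = celery.signature('import_task.opening_balance_tx', [phone, i], queue='cic-import-ussd')

    # link() registers a callback that receives the parent task's return value
    # as its first argument, so the phone resolution result flows into
    # generate_metadata, whose result in turn flows into opening_balance_tx
    s_meta.link(s_balance)
    s_phone.link(s_meta)

    # block time plus a bit of time for ussd processing
    s_phone.apply_async(countdown=7)
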
@@ -228,7 +228,6 @@ def prepareLocalFilePath(datadir, address):
if __name__ == '__main__':

base_dir = os.path.join(user_dir, 'old')
ussd_dir = os.path.join(user_dir, 'ussd')
os.makedirs(base_dir, exist_ok=True)

fa = open(os.path.join(user_dir, 'balances.csv'), 'w')
@@ -248,23 +247,11 @@ if __name__ == '__main__':

print(o)

ussd_data = {
    'phone': phone,
    'is_activated': 1,
    'preferred_language': random.sample(['en', 'sw'], 1)[0],
    'is_disabled': False
}

d = prepareLocalFilePath(base_dir, uid)
f = open('{}/{}'.format(d, uid + '.json'), 'w')
json.dump(o.serialize(), f)
f.close()

d = prepareLocalFilePath(ussd_dir, uid)
x = open('{}/{}'.format(d, uid + '_ussd_data.json'), 'w')
json.dump(ussd_data, x)
x.close()

pidx = genPhoneIndex(phone)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx)
f = open('{}/{}'.format(d, pidx), 'w')