Merge branch 'philip/fix-import-script-pointers' into 'master'

Philip/fix import script pointers

See merge request grassrootseconomics/cic-internal-integration!194
This commit is contained in:
Blair Vanderlugt 2021-06-25 15:36:06 +00:00
commit 8529c349ca
5 changed files with 222 additions and 62 deletions

View File

@ -194,8 +194,7 @@ if __name__ == '__main__':
f.write(json.dumps(o)) f.write(json.dumps(o))
f.close() f.close()
#fi.write('{},{}\n'.format(new_address, old_address)) meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), 'cic.person')
meta_filepath = os.path.join(meta_dir, '{}.json'.format(new_address_clean.upper())) meta_filepath = os.path.join(meta_dir, '{}.json'.format(new_address_clean.upper()))
os.symlink(os.path.realpath(filepath), meta_filepath) os.symlink(os.path.realpath(filepath), meta_filepath)
@ -221,7 +220,7 @@ if __name__ == '__main__':
# custom data # custom data
custom_key = generate_metadata_pointer(phone.encode('utf-8'), ':cic.custom') custom_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.custom')
custom_filepath = os.path.join(custom_dir, 'meta', custom_key) custom_filepath = os.path.join(custom_dir, 'meta', custom_key)
filepath = os.path.join( filepath = os.path.join(

View File

@ -0,0 +1,133 @@
const fs = require('fs');
const path = require('path');
const http = require('http');
const cic = require('@cicnet/cic-client-meta');
const crdt = require('@cicnet/crdt-meta');
//const conf = JSON.parse(fs.readFileSync('./cic.conf'));
const config = new crdt.Config('./config');
config.process();
console.log(config);
function sendit(uid, envelope) {
const d = envelope.toJSON();
const contentLength = (new TextEncoder().encode(d)).length;
const opts = {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Content-Length': contentLength,
'X-CIC-AUTOMERGE': 'client',
},
};
let url = config.get('META_URL');
url = url.replace(new RegExp('^(.+://[^/]+)/*$'), '$1/');
console.log('posting to url: ' + url + uid);
const req = http.request(url + uid, opts, (res) => {
res.on('data', process.stdout.write);
res.on('end', () => {
console.log('result', res.statusCode, res.headers);
});
});
if (!req.write(d)) {
console.error('foo', d);
process.exit(1);
}
req.end();
}
function doOne(keystore, filePath, identifier) {
const signer = new crdt.PGPSigner(keystore);
const o = JSON.parse(fs.readFileSync(filePath).toString());
cic.Custom.toKey(identifier).then((uid) => {
const s = new crdt.Syncable(uid, o);
s.setSigner(signer);
s.onwrap = (env) => {
sendit(identifier, env);
};
s.sign();
});
}
const privateKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
const publicKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
pk = fs.readFileSync(privateKeyPath);
pubk = fs.readFileSync(publicKeyPath);
new crdt.PGPKeyStore(
config.get('PGP_PASSPHRASE'),
pk,
pubk,
undefined,
undefined,
importMetaCustom,
);
const batchSize = 16;
const batchDelay = 1000;
const total = parseInt(process.argv[3]);
const dataDir = process.argv[2];
const workDir = path.join(dataDir, 'preferences/meta');
const userDir = path.join(dataDir, 'preferences/new');
let count = 0;
let batchCount = 0;
function importMetaCustom(keystore) {
let err;
let files;
try {
err, files = fs.readdirSync(workDir);
} catch {
console.error('source directory not yet ready', workDir);
setTimeout(importMetaCustom, batchDelay, keystore);
return;
}
let limit = batchSize;
if (files.length < limit) {
limit = files.length;
}
for (let i = 0; i < limit; i++) {
const file = files[i];
if (file.length < 3) {
console.debug('skipping file', file);
continue;
}
//const identifier = file.substr(0,file.length-5);
const identifier = file;
const filePath = path.join(workDir, file);
console.log(filePath);
//const address = fs.readFileSync(filePath).toString().substring(2).toUpperCase();
const custom = JSON.parse(fs.readFileSync(filePath).toString());
const customFilePath = path.join(
userDir,
identifier.substring(0, 2),
identifier.substring(2, 4),
identifier + '.json',
);
doOne(keystore, filePath, identifier);
fs.unlinkSync(filePath);
count++;
batchCount++;
if (batchCount == batchSize) {
console.debug('reached batch size, breathing');
batchCount=0;
setTimeout(importMetaCustom, batchDelay, keystore);
return;
}
}
if (count == total) {
return;
}
setTimeout(importMetaCustom, 100, keystore);
}

View File

@ -1,6 +1,7 @@
# standard imports # standard imports
import os import os
import logging import logging
import random
import urllib.parse import urllib.parse
import urllib.error import urllib.error
import urllib.request import urllib.request
@ -136,6 +137,42 @@ def generate_metadata(self, address, phone):
) )
os.symlink(os.path.realpath(filepath), meta_filepath) os.symlink(os.path.realpath(filepath), meta_filepath)
# write ussd data
ussd_data = {
'phone': phone,
'is_activated': 1,
'preferred_language': random.sample(['en', 'sw'], 1)[0],
'is_disabled': False
}
ussd_data_dir = os.path.join(self.import_dir, 'ussd')
ussd_data_file_path = os.path.join(ussd_data_dir, f'{old_address}.json')
f = open(ussd_data_file_path, 'w')
f.write(json.dumps(ussd_data))
f.close()
# write preferences data
preferences_dir = os.path.join(self.import_dir, 'preferences')
preferences_data = {
'preferred_language': ussd_data['preferred_language']
}
preferences_key = generate_metadata_pointer(bytes.fromhex(new_address_clean[2:]), ':cic.preferences')
preferences_filepath = os.path.join(preferences_dir, 'meta', preferences_key)
filepath = os.path.join(
preferences_dir,
'new',
preferences_key[:2].upper(),
preferences_key[2:4].upper(),
preferences_key.upper() + '.json'
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
f = open(filepath, 'w')
f.write(json.dumps(preferences_data))
f.close()
os.symlink(os.path.realpath(filepath), preferences_filepath)
logg.debug('found metadata {} for phone {}'.format(o, phone)) logg.debug('found metadata {} for phone {}'.format(o, phone))
return address return address

View File

@ -64,6 +64,12 @@ ps = r.pubsub()
user_new_dir = os.path.join(args.user_dir, 'new') user_new_dir = os.path.join(args.user_dir, 'new')
os.makedirs(user_new_dir) os.makedirs(user_new_dir)
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
os.makedirs(ussd_data_dir)
preferences_dir = os.path.join(args.user_dir, 'preferences')
os.makedirs(os.path.join(preferences_dir, 'meta'))
meta_dir = os.path.join(args.user_dir, 'meta') meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir) os.makedirs(meta_dir)
@ -128,59 +134,57 @@ if __name__ == '__main__':
if y[len(y)-5:] != '.json': if y[len(y)-5:] != '.json':
continue continue
# handle json containing person object # handle json containing person object
filepath = None filepath = os.path.join(x[0], y)
if y[:15] != '_ussd_data.json': f = open(filepath, 'r')
filepath = os.path.join(x[0], y) try:
f = open(filepath, 'r') o = json.load(f)
try: except json.decoder.JSONDecodeError as e:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close() f.close()
u = Person.deserialize(o) logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
new_address = register_ussd(i, u) new_address = register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel) phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164) phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature( s_phone = celery.signature(
'import_task.resolve_phone', 'import_task.resolve_phone',
[ [
phone, phone,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_meta = celery.signature( s_meta = celery.signature(
'import_task.generate_metadata', 'import_task.generate_metadata',
[ [
phone, phone,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_balance = celery.signature( s_balance = celery.signature(
'import_task.opening_balance_tx', 'import_task.opening_balance_tx',
[ [
phone, phone,
i, i,
], ],
queue='cic-import-ussd', queue='cic-import-ussd',
) )
s_meta.link(s_balance) s_meta.link(s_balance)
s_phone.link(s_meta) s_phone.link(s_meta)
# block time plus a bit of time for ussd processing # block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7) s_phone.apply_async(countdown=7)
i += 1 i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r") sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1 j += 1
if j == batch_size: if j == batch_size:
time.sleep(batch_delay) time.sleep(batch_delay)
j = 0 j = 0

View File

@ -228,7 +228,6 @@ def prepareLocalFilePath(datadir, address):
if __name__ == '__main__': if __name__ == '__main__':
base_dir = os.path.join(user_dir, 'old') base_dir = os.path.join(user_dir, 'old')
ussd_dir = os.path.join(user_dir, 'ussd')
os.makedirs(base_dir, exist_ok=True) os.makedirs(base_dir, exist_ok=True)
fa = open(os.path.join(user_dir, 'balances.csv'), 'w') fa = open(os.path.join(user_dir, 'balances.csv'), 'w')
@ -248,23 +247,11 @@ if __name__ == '__main__':
print(o) print(o)
ussd_data = {
'phone': phone,
'is_activated': 1,
'preferred_language': random.sample(['en', 'sw'], 1)[0],
'is_disabled': False
}
d = prepareLocalFilePath(base_dir, uid) d = prepareLocalFilePath(base_dir, uid)
f = open('{}/{}'.format(d, uid + '.json'), 'w') f = open('{}/{}'.format(d, uid + '.json'), 'w')
json.dump(o.serialize(), f) json.dump(o.serialize(), f)
f.close() f.close()
d = prepareLocalFilePath(ussd_dir, uid)
x = open('{}/{}'.format(d, uid + '_ussd_data.json'), 'w')
json.dump(ussd_data, x)
x.close()
pidx = genPhoneIndex(phone) pidx = genPhoneIndex(phone)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx) d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx)
f = open('{}/{}'.format(d, pidx), 'w') f = open('{}/{}'.format(d, pidx), 'w')