From 4368d2bf59819f94318d2f3e4299e20df913c8d0 Mon Sep 17 00:00:00 2001
From: Philip Wafula
Date: Fri, 25 Jun 2021 15:36:05 +0000
Subject: [PATCH] Philip/fix import script pointers

---
Review notes (below the "---" marker, so not part of the commit message):
 * Line breaks, indentation and blank context lines were reconstructed from a
   whitespace-collapsed copy of this patch; verify them against the target
   tree before applying.
 * import_meta_preferences.js was revised in review. It is still 133 lines, so
   its hunk header and the diffstat are unchanged, but the blob id on its
   "index" line was not recomputed.

 apps/data-seeding/cic_eth/import_users.py     |   5 +-
 .../cic_meta/import_meta_preferences.js       | 133 ++++++++++++++++++
 apps/data-seeding/cic_ussd/import_task.py     |  37 +++++
 apps/data-seeding/cic_ussd/import_users.py    |  96 +++++++------
 apps/data-seeding/create_import_users.py      |  13 --
 5 files changed, 222 insertions(+), 62 deletions(-)
 create mode 100644 apps/data-seeding/cic_meta/import_meta_preferences.js

diff --git a/apps/data-seeding/cic_eth/import_users.py b/apps/data-seeding/cic_eth/import_users.py
index f803913b..8faa152c 100644
--- a/apps/data-seeding/cic_eth/import_users.py
+++ b/apps/data-seeding/cic_eth/import_users.py
@@ -194,8 +194,7 @@ if __name__ == '__main__':
             f.write(json.dumps(o))
             f.close()
 
-            #fi.write('{},{}\n'.format(new_address, old_address))
-            meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), 'cic.person')
+            meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
             meta_filepath = os.path.join(meta_dir, '{}.json'.format(new_address_clean.upper()))
             os.symlink(os.path.realpath(filepath), meta_filepath)
 
@@ -221,7 +220,7 @@ if __name__ == '__main__':
 
 
             # custom data
-            custom_key = generate_metadata_pointer(phone.encode('utf-8'), ':cic.custom')
+            custom_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.custom')
             custom_filepath = os.path.join(custom_dir, 'meta', custom_key)
 
             filepath = os.path.join(
diff --git a/apps/data-seeding/cic_meta/import_meta_preferences.js b/apps/data-seeding/cic_meta/import_meta_preferences.js
new file mode 100644
index 00000000..be9be563
--- /dev/null
+++ b/apps/data-seeding/cic_meta/import_meta_preferences.js
@@ -0,0 +1,133 @@
+const fs = require('fs');
+const path = require('path');
+const http = require('http');
+
+const cic = require('@cicnet/cic-client-meta');
+const crdt = require('@cicnet/crdt-meta');
+
+// Settings come from the ./config directory; the keys read below are
+// META_URL, PGP_EXPORTS_DIR, PGP_PRIVATE_KEY_FILE and PGP_PASSPHRASE.
+const config = new crdt.Config('./config');
+config.process();
+console.log(config);
+
+
+// PUT the JSON form of `envelope` to META_URL + uid and log the response.
+function sendit(uid, envelope) {
+    const d = envelope.toJSON();
+
+    const contentLength = (new TextEncoder().encode(d)).length;
+    const opts = {
+        method: 'PUT',
+        headers: {
+            'Content-Type': 'application/json',
+            'Content-Length': contentLength,
+            'X-CIC-AUTOMERGE': 'client',
+        },
+    };
+    let url = config.get('META_URL');
+    // a bare "scheme://host" url (no path) is rewritten to end in exactly one slash
+    url = url.replace(new RegExp('^(.+://[^/]+)/*$'), '$1/');
+    console.log('posting to url: ' + url + uid);
+    const req = http.request(url + uid, opts, (res) => {
+        // wrap the call: a detached stdout.write would be invoked with `this` set to `res`
+        res.on('data', (chunk) => process.stdout.write(chunk));
+        res.on('end', () => {
+            console.log('result', res.statusCode, res.headers);
+        });
+    });
+    // NOTE(review): write() returning false signals backpressure, not failure - confirm exiting is intended
+    if (!req.write(d)) {
+        console.error('foo', d);
+        process.exit(1);
+    }
+    req.end();
+}
+
+// Parse the JSON file at filePath, wrap it in a Syncable keyed by
+// cic.Custom.toKey(identifier), sign it, and pass the wrapped envelope to sendit.
+function doOne(keystore, filePath, identifier) {
+    const signer = new crdt.PGPSigner(keystore);
+
+    const o = JSON.parse(fs.readFileSync(filePath).toString());
+
+    cic.Custom.toKey(identifier).then((uid) => {
+        const s = new crdt.Syncable(uid, o);
+        s.setSigner(signer);
+        s.onwrap = (env) => {
+            sendit(identifier, env);
+        };
+        s.sign();
+    });
+}
+
+const privateKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
+// NOTE(review): this path is also built from PGP_PRIVATE_KEY_FILE - confirm whether a public key file was intended
+const publicKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
+const pk = fs.readFileSync(privateKeyPath);
+const pubk = fs.readFileSync(publicKeyPath);
+
+// importMetaCustom is the final constructor argument; presumably the keystore calls it once the keys are loaded - TODO confirm
+new crdt.PGPKeyStore(
+    config.get('PGP_PASSPHRASE'),
+    pk,
+    pubk,
+    undefined,
+    undefined,
+    importMetaCustom,
+);
+
+const batchSize = 16;
+const batchDelay = 1000;
+// argv[2]: data directory, argv[3]: processed-file count at which the loop stops
+const total = parseInt(process.argv[3], 10);
+const dataDir = process.argv[2];
+const workDir = path.join(dataDir, 'preferences/meta');
+let count = 0;
+let batchCount = 0;
+
+
+// Upload at most batchSize files from workDir per call, unlinking each file right
+// after it is handed to doOne, and reschedule itself until count equals total.
+function importMetaCustom(keystore) {
+    let files;
+
+    try {
+        files = fs.readdirSync(workDir);
+    } catch {
+        console.error('source directory not yet ready', workDir);
+        setTimeout(importMetaCustom, batchDelay, keystore);
+        return;
+    }
+    let limit = batchSize;
+    if (files.length < limit) {
+        limit = files.length;
+    }
+    for (let i = 0; i < limit; i++) {
+        const file = files[i];
+        if (file.length < 3) {
+            console.debug('skipping file', file);
+            continue;
+        }
+        // the directory entry name is used directly as the identifier
+        const identifier = file;
+        const filePath = path.join(workDir, file);
+        console.log(filePath);
+
+        doOne(keystore, filePath, identifier);
+        fs.unlinkSync(filePath);
+        count++;
+        batchCount++;
+        if (batchCount === batchSize) {
+            console.debug('reached batch size, breathing');
+            batchCount = 0;
+            setTimeout(importMetaCustom, batchDelay, keystore);
+            return;
+        }
+    }
+    // stop rescheduling once the processed-file count matches the expected total
+    if (count === total) {
+        return;
+    }
+    setTimeout(importMetaCustom, 100, keystore);
+}
diff --git a/apps/data-seeding/cic_ussd/import_task.py b/apps/data-seeding/cic_ussd/import_task.py
index bc06b4e8..c5917688 100644
--- a/apps/data-seeding/cic_ussd/import_task.py
+++ b/apps/data-seeding/cic_ussd/import_task.py
@@ -1,6 +1,7 @@
 # standard imports
 import os
 import logging
+import random
 import urllib.parse
 import urllib.error
 import urllib.request
@@ -136,6 +137,42 @@ def generate_metadata(self, address, phone):
     )
     os.symlink(os.path.realpath(filepath), meta_filepath)
 
+    # write ussd data
+    ussd_data = {
+        'phone': phone,
+        'is_activated': 1,
+        'preferred_language': random.sample(['en', 'sw'], 1)[0],
+        'is_disabled': False
+    }
+    ussd_data_dir = os.path.join(self.import_dir, 'ussd')
+    ussd_data_file_path = os.path.join(ussd_data_dir, f'{old_address}.json')
+    f = open(ussd_data_file_path, 'w')
+    f.write(json.dumps(ussd_data))
+    f.close()
+
+    # write preferences data
+    preferences_dir = os.path.join(self.import_dir, 'preferences')
+    preferences_data = {
+        'preferred_language': ussd_data['preferred_language']
+    }
+
+    preferences_key = generate_metadata_pointer(bytes.fromhex(new_address_clean[2:]), ':cic.preferences')
+    preferences_filepath = os.path.join(preferences_dir, 'meta', preferences_key)
+
+    filepath = os.path.join(
+        preferences_dir,
+        'new',
+        preferences_key[:2].upper(),
+        preferences_key[2:4].upper(),
+        preferences_key.upper() + '.json'
+    )
+    os.makedirs(os.path.dirname(filepath), exist_ok=True)
+
+    f = open(filepath, 'w')
+    f.write(json.dumps(preferences_data))
+    f.close()
+    os.symlink(os.path.realpath(filepath), preferences_filepath)
+
     logg.debug('found metadata {} for phone {}'.format(o, phone))
 
     return address
diff --git a/apps/data-seeding/cic_ussd/import_users.py b/apps/data-seeding/cic_ussd/import_users.py
index 7dd4ff5e..81fd63e2 100644
--- a/apps/data-seeding/cic_ussd/import_users.py
+++ b/apps/data-seeding/cic_ussd/import_users.py
@@ -64,6 +64,12 @@ ps = r.pubsub()
 user_new_dir = os.path.join(args.user_dir, 'new')
 os.makedirs(user_new_dir)
 
+ussd_data_dir = os.path.join(args.user_dir, 'ussd')
+os.makedirs(ussd_data_dir)
+
+preferences_dir = os.path.join(args.user_dir, 'preferences')
+os.makedirs(os.path.join(preferences_dir, 'meta'))
+
 meta_dir = os.path.join(args.user_dir, 'meta')
 os.makedirs(meta_dir)
 
@@ -128,59 +134,57 @@ if __name__ == '__main__':
             if y[len(y)-5:] != '.json':
                 continue
             # handle json containing person object
-            filepath = None
-            if y[:15] != '_ussd_data.json':
-                filepath = os.path.join(x[0], y)
-                f = open(filepath, 'r')
-                try:
-                    o = json.load(f)
-                except json.decoder.JSONDecodeError as e:
-                    f.close()
-                    logg.error('load error for {}: {}'.format(y, e))
-                    continue
+            filepath = os.path.join(x[0], y)
+            f = open(filepath, 'r')
+            try:
+                o = json.load(f)
+            except json.decoder.JSONDecodeError as e:
                 f.close()
-                u = Person.deserialize(o)
+                logg.error('load error for {}: {}'.format(y, e))
+                continue
+            f.close()
+            u = Person.deserialize(o)
 
-                new_address = register_ussd(i, u)
+            new_address = register_ussd(i, u)
 
-                phone_object = phonenumbers.parse(u.tel)
-                phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
+            phone_object = phonenumbers.parse(u.tel)
+            phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
 
-                s_phone = celery.signature(
-                    'import_task.resolve_phone',
-                    [
-                        phone,
-                    ],
-                    queue='cic-import-ussd',
-                )
+            s_phone = celery.signature(
+                'import_task.resolve_phone',
+                [
+                    phone,
+                ],
+                queue='cic-import-ussd',
+            )
 
-                s_meta = celery.signature(
-                    'import_task.generate_metadata',
-                    [
-                        phone,
-                    ],
-                    queue='cic-import-ussd',
-                )
+            s_meta = celery.signature(
+                'import_task.generate_metadata',
+                [
+                    phone,
+                ],
+                queue='cic-import-ussd',
+            )
 
-                s_balance = celery.signature(
-                    'import_task.opening_balance_tx',
-                    [
-                        phone,
-                        i,
-                    ],
-                    queue='cic-import-ussd',
-                )
+            s_balance = celery.signature(
+                'import_task.opening_balance_tx',
+                [
+                    phone,
+                    i,
+                ],
+                queue='cic-import-ussd',
+            )
 
-                s_meta.link(s_balance)
-                s_phone.link(s_meta)
-                # block time plus a bit of time for ussd processing
-                s_phone.apply_async(countdown=7)
+            s_meta.link(s_balance)
+            s_phone.link(s_meta)
+            # block time plus a bit of time for ussd processing
+            s_phone.apply_async(countdown=7)
 
-                i += 1
-                sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
+            i += 1
+            sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
 
-                j += 1
-                if j == batch_size:
-                    time.sleep(batch_delay)
-                    j = 0
+            j += 1
+            if j == batch_size:
+                time.sleep(batch_delay)
+                j = 0
diff --git a/apps/data-seeding/create_import_users.py b/apps/data-seeding/create_import_users.py
index 58e0f137..0e0b51ad 100644
--- a/apps/data-seeding/create_import_users.py
+++ b/apps/data-seeding/create_import_users.py
@@ -228,7 +228,6 @@ def prepareLocalFilePath(datadir, address):
 
 if __name__ == '__main__':
     base_dir = os.path.join(user_dir, 'old')
-    ussd_dir = os.path.join(user_dir, 'ussd')
     os.makedirs(base_dir, exist_ok=True)
 
     fa = open(os.path.join(user_dir, 'balances.csv'), 'w')
@@ -248,23 +247,11 @@ if __name__ == '__main__':
 
         print(o)
 
-        ussd_data = {
-            'phone': phone,
-            'is_activated': 1,
-            'preferred_language': random.sample(['en', 'sw'], 1)[0],
-            'is_disabled': False
-        }
-
         d = prepareLocalFilePath(base_dir, uid)
         f = open('{}/{}'.format(d, uid + '.json'), 'w')
         json.dump(o.serialize(), f)
         f.close()
 
-        d = prepareLocalFilePath(ussd_dir, uid)
-        x = open('{}/{}'.format(d, uid + '_ussd_data.json'), 'w')
-        json.dump(ussd_data, x)
-        x.close()
-
         pidx = genPhoneIndex(phone)
         d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx)
         f = open('{}/{}'.format(d, pidx), 'w')