From 29e91fafab0395498a0ad89404b22571e264d89e Mon Sep 17 00:00:00 2001 From: Blair Vanderlugt Date: Mon, 28 Jun 2021 22:40:54 +0000 Subject: [PATCH] integration updates: meta imports --- apps/data-seeding/cic_meta/import_meta.js | 19 ++++++++++++---- apps/data-seeding/cic_ussd/import_balance.py | 2 +- apps/data-seeding/cic_ussd/import_users.py | 23 +++++++++++++++----- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/apps/data-seeding/cic_meta/import_meta.js b/apps/data-seeding/cic_meta/import_meta.js index 2884fa9..7bdda53 100644 --- a/apps/data-seeding/cic_meta/import_meta.js +++ b/apps/data-seeding/cic_meta/import_meta.js @@ -31,13 +31,16 @@ function sendit(uid, envelope) { const req = http.request(url + uid, opts, (res) => { res.on('data', process.stdout.write); res.on('end', () => { + if (!res.complete) { + console.log('The connection was terminated while the message was being sent.') + } console.log('result', res.statusCode, res.headers); }); }); - if (!req.write(d)) { - console.error('foo', d); - process.exit(1); - } + req.on('error', (err) => { + console.log('ERROR when talking to meta', err) + }) + req.write(d) req.end(); } @@ -55,6 +58,7 @@ function doOne(keystore, filePath) { const s = new crdt.Syncable(uid, o); s.setSigner(signer); s.onwrap = (env) => { + console.log(`Sending uid: ${uid} and env: ${env} to meta`) sendit(uid, env); }; s.sign(); @@ -84,6 +88,7 @@ let batchCount = 0; function importMeta(keystore) { + console.log('Running importMeta....') let err; let files; @@ -94,6 +99,11 @@ function importMeta(keystore) { setTimeout(importMeta, batchDelay, keystore); return; } + console.log(`Trying to read ${files.length} files`) + if (files === 0) { + console.log(`ERROR did not find any files under ${workDir}. \nLooks like there is no work for me, bailing!`) + process.exit(1) + } let limit = batchSize; if (files.length < limit) { limit = files.length; @@ -108,6 +118,7 @@ function importMeta(keystore) { doOne(keystore, filePath); count++; batchCount++; + //console.log('done one', count, batchCount) if (batchCount == batchSize) { console.debug('reached batch size, breathing'); batchCount=0; diff --git a/apps/data-seeding/cic_ussd/import_balance.py b/apps/data-seeding/cic_ussd/import_balance.py index 19315b1..5643560 100644 --- a/apps/data-seeding/cic_ussd/import_balance.py +++ b/apps/data-seeding/cic_ussd/import_balance.py @@ -70,7 +70,7 @@ args_override = { 'REDIS_DB': getattr(args, 'redis_db'), 'META_HOST': getattr(args, 'meta_host'), 'META_PORT': getattr(args, 'meta_port'), - 'KEYSTORE_FILE_PATH': getattr(args, 'key-file') + 'KEYSTORE_FILE_PATH': getattr(args, 'y') } config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') diff --git a/apps/data-seeding/cic_ussd/import_users.py b/apps/data-seeding/cic_ussd/import_users.py index 07ef4dd..e44b92f 100644 --- a/apps/data-seeding/cic_ussd/import_users.py +++ b/apps/data-seeding/cic_ussd/import_users.py @@ -31,6 +31,9 @@ argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db t argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches') argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout') +argparser.add_argument('--ussd-host', dest='ussd_host', type=str, help="host to ussd app responsible for processing ussd requests.") +argparser.add_argument('--ussd-port', dest='ussd_port', type=str, help="port to ussd app responsible for processing ussd requests.") +argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') argparser.add_argument('-v', action='store_true', help='Be verbose') argparser.add_argument('-vv', action='store_true', help='Be more verbose') @@ -84,13 +87,21 @@ chain_str = str(chain_spec) batch_size = args.batch_size batch_delay = args.batch_delay - +ussd_port = args.ussd_port +ussd_host = args.ussd_host +ussd_no_ssl = args.ussd_no_ssl +if ussd_no_ssl is True: + ussd_ssl = False +else: + ussd_ssl = True def build_ussd_request(phone, host, port, service_code, username, password, ssl=False): url = 'http' if ssl: url += 's' - url += '://{}:{}'.format(host, port) + url += '://{}'.format(host) + if port: + url += ':{}'.format(port) url += '/?username={}&password={}'.format(username, password) logg.info('ussd service url {}'.format(url)) @@ -119,11 +130,13 @@ def register_ussd(i, u): logg.debug('tel {} {}'.format(u.tel, phone)) req = build_ussd_request( phone, - 'localhost', - config.get('CIC_USER_USSD_SVC_SERVICE_PORT'), + ussd_host, + ussd_port, config.get('APP_SERVICE_CODE'), '', - '') + '', + ussd_ssl + ) response = urllib.request.urlopen(req) response_data = response.read().decode('utf-8') state = response_data[:3]