Compare commits

...

11 Commits

Author SHA1 Message Date
nolash
a4a656707c Restore session worker in gas task tests 2021-09-01 18:43:21 +02:00
nolash
4a0dabe531 Merge branch 'master' into lash/lockfix 2021-09-01 18:03:08 +02:00
nolash
da751b64ce Upgrade deps 2021-09-01 18:03:03 +02:00
nolash
bff9d5bfd7 Fix 0x in check gas 2021-09-01 15:04:38 +02:00
c21c1eb2ef fix data seeding node installs 2021-08-31 11:43:01 -07:00
eb5e612105 minor update to import_ussd script 2021-08-30 11:09:47 -07:00
e017d11770 update readme 2021-08-30 10:14:22 -07:00
e327af68e1 Merge branch 'philip/refactor-import-scripts' into 'master'
Consolidated ussd dataseeding script

See merge request grassrootseconomics/cic-internal-integration!252
2021-08-29 09:55:47 +00:00
92cc6a3f27 Consolidated ussd dataseeding script 2021-08-29 09:55:47 +00:00
f42bf7754a Merge branch 'lash/lockfix' into 'master'
Normalize initial INIT lock address

See merge request grassrootseconomics/cic-internal-integration!260
2021-08-29 09:07:46 +00:00
nolash
7342927e91 Normalize initial INIT lock address 2021-08-29 10:43:12 +02:00
30 changed files with 2788 additions and 715 deletions

View File

@@ -2,25 +2,21 @@
## Getting started
## Make some keys
This repo uses docker-compose and docker buildkit. Set the following environment variables to get started:
```
docker build -t bloxie . && docker run -v "$(pwd)/keys:/root/keys" --rm -it -t bloxie account new --chain /root/bloxberg.json --keys-path /root/keys
export COMPOSE_DOCKER_CLI_BUILD=1
export DOCKER_BUILDKIT=1
```
### Prepare the repo
This is stuff we need to put in makefile but for now...
File mounts and permisssions need to be set
start services, database, redis and local ethereum node
```
chmod -R 755 scripts/initdb apps/cic-meta/scripts/initdb
````
start cluster
docker-compose up -d
```
docker-compose up
Run app/contract-migration to deploy contracts
```
RUN_MASK=3 docker-compose up contract-migration
```
stop cluster
@@ -28,7 +24,7 @@ stop cluster
docker-compose down
```
delete data
stop cluster and delete data
```
docker-compose down -v
```
@@ -38,5 +34,4 @@ rebuild an images
docker-compose up --build <service_name>
```
Deployment variables are writtend to service-configs/.env after everthing is up.

View File

@@ -8,8 +8,8 @@ Create Date: 2021-04-02 18:41:20.864265
import datetime
from alembic import op
import sqlalchemy as sa
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth.db.enum import LockEnum
from cic_eth.encode import ZERO_ADDRESS_NORMAL
# revision identifiers, used by Alembic.
@@ -30,7 +30,7 @@ def upgrade():
sa.Column("otx_id", sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS_NORMAL, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
def downgrade():

View File

@@ -3,7 +3,10 @@ import logging
# external imports
import celery
from hexathon import strip_0x
from hexathon import (
strip_0x,
add_0x,
)
#from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
@@ -180,6 +183,7 @@ def check_gas(self, tx_hashes_hex, chain_spec_dict, txs_hex=[], address=None, ga
if not is_checksum_address(address):
raise ValueError('invalid address {}'.format(address))
address = tx_normalize.wallet_address(address)
address = add_0x(address)
tx_hashes = []
txs = []

View File

@@ -13,7 +13,6 @@ from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import RPCGasOracle
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
import liveness.linux
# local imports
from cic_eth.error import SeppukuError
@@ -48,6 +47,7 @@ class BaseTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, SeppukuError):
import liveness.linux
liveness.linux.reset(rundir=self.run_dir)
logg.critical(einfo)
msg = 'received critical exception {}, calling shutdown'.format(str(exc))

View File

@@ -10,7 +10,7 @@ version = (
0,
12,
4,
'alpha.7',
'alpha.8',
)
version_object = semver.VersionInfo(

View File

@@ -1,8 +1,6 @@
@node cic-eth configuration
@section Configuration
(refer to @code{cic-base} for a general overview of the config pipeline)
Configuration parameters are grouped by configuration filename.
@@ -40,7 +38,26 @@ Boolean value. If set, the amount of available context for a task in the result
@subsection database
See ref cic-base when ready
@table @var
@item host
Database host
@item port
Database port
@item name
Database name
@item user
Database user
@item password
Database password
@item engine
The engine part of the dsn connection string (@code{postgresql} in @code{postgresql+psycopg2})
@item driver
The driver part of the dsn connection string (@code{psycopg2} in @code{postgresql+psycopg2})
@item pool_size
Connection pool size for database drivers that provide connection pooling
@item debug
Output actual sql queries to logs. Potentially very verbose
@end table
@subsection eth

View File

@@ -75,7 +75,7 @@ def test_task_check_gas_ok(
'cic_eth.eth.gas.check_gas',
[
[
tx_hash_hex,
strip_0x(tx_hash_hex),
],
default_chain_spec.asdict(),
[],
@@ -283,4 +283,3 @@ def test_task_resend_explicit(
tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec)
logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice']))
assert tx_after['gasPrice'] > tx_before['gasPrice']

View File

@@ -11,12 +11,12 @@ celery_app = celery.current_app
@celery_app.task
def persist_notification(recipient, message):
def persist_notification(message, recipient):
"""
:param recipient:
:type recipient:
:param message:
:type message:
:param recipient:
:type recipient:
:return:
:rtype:
"""

View File

@@ -11,12 +11,13 @@ local_logg = logging.getLogger(__name__)
@celery_app.task
def log(recipient, message):
def log(message, recipient):
"""
:param recipient:
:type recipient:
:param message:
:type message:
:param recipient:
:type recipient:
:return:
:rtype:
"""

View File

@@ -1,5 +1,6 @@
# standard import
import decimal
import json
import logging
from typing import Dict, Tuple
@@ -8,6 +9,8 @@ from cic_eth.api import Api
from sqlalchemy.orm.session import Session
# local import
from cic_ussd.account.chain import Chain
from cic_ussd.account.tokens import get_cached_default_token
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.error import UnknownUssdRecipient
@@ -59,7 +62,9 @@ def from_wei(value: int) -> float:
:return: SRF equivalent of value in Wei
:rtype: float
"""
value = float(value) / 1e+6
cached_token_data = json.loads(get_cached_default_token(Chain.spec.__str__()))
token_decimals: int = cached_token_data.get('decimals')
value = float(value) / (10**token_decimals)
return truncate(value=value, decimals=2)
@@ -70,7 +75,9 @@ def to_wei(value: int) -> int:
:return: Wei equivalent of value in SRF
:rtype: int
"""
return int(value * 1e+6)
cached_token_data = json.loads(get_cached_default_token(Chain.spec.__str__()))
token_decimals: int = cached_token_data.get('decimals')
return int(value * (10**token_decimals))
def truncate(value: float, decimals: int):

View File

@@ -44,7 +44,7 @@ class MetadataRequestsHandler(Metadata):
def create(self, data: Union[Dict, str]):
""""""
data = json.dumps(data)
data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
error_handler(result=result)

View File

@@ -146,7 +146,7 @@ def create_ussd_session(
)
def update_ussd_session(ussd_session: UssdSession,
def update_ussd_session(ussd_session: DbUssdSession,
user_input: str,
state: str,
data: Optional[dict] = None) -> UssdSession:

View File

@@ -138,26 +138,14 @@ def transaction_balances_callback(self, result: list, param: dict, status_code:
balances_data = result[0]
available_balance = calculate_available_balance(balances_data)
transaction = param
blockchain_address = transaction.get('blockchain_address')
transaction['available_balance'] = available_balance
queue = self.request.delivery_info.get('routing_key')
s_preferences_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_preferences_metadata', [blockchain_address], queue=queue
)
s_process_account_metadata = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [transaction], queue=queue
)
s_notify_account = celery.signature('cic_ussd.tasks.notifications.transaction', queue=queue)
if transaction.get('transaction_type') == 'transfer':
celery.chain(s_preferences_metadata, s_process_account_metadata, s_notify_account).apply_async()
if transaction.get('transaction_type') == 'tokengift':
s_process_account_metadata = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [{}, transaction], queue=queue
)
celery.chain(s_process_account_metadata, s_notify_account).apply_async()
celery.chain(s_process_account_metadata, s_notify_account).apply_async()
@celery_app.task

View File

@@ -8,6 +8,7 @@ import i18n
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.statement import get_cached_statement
from cic_ussd.account.transaction import aux_transaction_data, validate_transaction_account
from cic_ussd.cache import cache_data, cache_data_key
@@ -58,19 +59,17 @@ def cache_statement(parsed_transaction: dict, querying_party: str):
@celery_app.task
def parse_transaction(preferences: dict, transaction: dict) -> dict:
def parse_transaction(transaction: dict) -> dict:
"""This function parses transaction objects and collates all relevant data for system use i.e:
- An account's set preferred language.
- Account identifier that facilitates notification.
- Contextual tags i.e action and direction tags.
:param preferences: An account's set preferences.
:type preferences: dict
:param transaction: Transaction object.
:type transaction: dict
:return: Transaction object with contextual data for use in the system.
:rtype: dict
"""
preferred_language = preferences.get('preferred_language')
preferred_language = get_cached_preferred_language(transaction.get('blockchain_address'))
if not preferred_language:
preferred_language = i18n.config.get('fallback')
transaction['preferred_language'] = preferred_language
@@ -83,6 +82,8 @@ def parse_transaction(preferences: dict, transaction: dict) -> dict:
alt_account = session.query(Account).filter_by(blockchain_address=alt_blockchain_address).first()
if alt_account:
transaction['alt_metadata_id'] = alt_account.standard_metadata_id()
else:
transaction['alt_metadata_id'] = 'GRASSROOTS ECONOMICS'
transaction['metadata_id'] = account.standard_metadata_id()
transaction['phone_number'] = account.phone_number
session.close()

View File

@@ -1,6 +1,6 @@
cic-eth[tools]==0.12.4a4
chainlib-eth>=0.0.9a7,<0.1.0
eth-erc20>=0.1.2a2,<0.2.0
cic-eth[tools]==0.12.4a8
chainlib-eth>=0.0.9a9,<0.1.0
eth-erc20>=0.1.2a3,<0.2.0
erc20-demurrage-token>=0.0.5a2,<0.1.0
eth-accounts-index>=0.1.2a2,<0.2.0
eth-address-index>=0.2.3a4,<0.3.0

View File

@@ -2,7 +2,7 @@
.cache
.dot
**/doc
**/node_modules
node_modules/
**/venv
**/.venv

View File

@@ -1,64 +1,61 @@
# standard imports
import argparse
import logging
import sys
import os
import sys
# external imports
import celery
import confini
import redis
from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address
from chainlib.eth.connection import EthHTTPConnection
from confini import Config
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.dict import DictKeystore
# local imports
from import_task import ImportTask, MetadataTask
from import_util import BalanceProcessor, get_celery_worker_status
from import_task import ImportTask, MetadataTask
logging.basicConfig(level=logging.WARNING)
default_config_dir = './config'
logg = logging.getLogger()
config_dir = './config'
arg_parser = argparse.ArgumentParser(description='Daemon worker that handles data seeding tasks.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
arg_parser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
arg_parser.add_argument('--include-balances', dest='include_balances', help='include opening balance transactions',
action='store_true')
arg_parser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
arg_parser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server host')
arg_parser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit data seeding tasks to.')
arg_parser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
arg_parser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
arg_parser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
arg_parser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
arg_parser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
help='Token symbol to use for transactions')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
arg_parser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
arg_parser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1',
help='chain spec')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
argparser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server host')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
help='Token symbol to use for transactions')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str,
help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', default='out', type=str, help='user export directory')
args = argparser.parse_args(sys.argv[1:])
if args.v:
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
elif args.vv:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config = Config(args.c, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
@@ -73,88 +70,76 @@ args_override = {
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
logg.debug(f'config loaded from {args.c}:\n{config}')
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
db_config = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
ImportTask.db_config = db_config
# create celery apps
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
status = get_celery_worker_status(celery_app=celery_app)
signer_address = None
keystore = DictKeystore()
if args.y is not None:
logg.debug('loading keystore file {}'.format(args.y))
signer_address = keystore.import_keystore_file(args.y)
logg.debug('now have key for signer address {}'.format(signer_address))
# define signer
os.path.isfile(args.y)
logg.debug(f'loading keystore file {args.y}')
signer_address = keystore.import_keystore_file(args.y)
logg.debug(f'now have key for signer address {signer_address}')
signer = EIP155Signer(keystore)
queue = args.q
chain_str = config.get('CIC_CHAIN_SPEC')
block_offset = 0
if args.head:
block_offset = -1
else:
block_offset = args.offset
block_offset = -1 if args.head else args.offset
chain_str = config.get('CIC_CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_str)
ImportTask.chain_spec = chain_spec
old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py
token_symbol = args.token_symbol
MetadataTask.meta_host = config.get('META_HOST')
MetadataTask.meta_port = config.get('META_PORT')
ImportTask.chain_spec = chain_spec
txs_dir = os.path.join(args.import_dir, 'txs')
os.makedirs(txs_dir, exist_ok=True)
sys.stdout.write(f'created txs dir: {txs_dir}')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app)
def main():
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'),
signer_address, signer)
ImportTask.balance_processor.init(token_symbol)
# TODO get decimals from token
ImportTask.balance_processor = BalanceProcessor(conn,
chain_spec,
config.get('CIC_REGISTRY_ADDRESS'),
signer_address,
signer)
ImportTask.balance_processor.init(args.token_symbol)
balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r'))
remove_zeros = 10 ** 6
i = 0
while True:
l = f.readline()
if l is None:
break
r = l.split(',')
try:
address = to_checksum_address(r[0])
sys.stdout.write('loading balance {} {} {}'.format(i, address, r[1]).ljust(200) + "\r")
except ValueError:
break
balance = int(int(r[1].rstrip()) / remove_zeros)
balances[address] = balance
i += 1
f.close()
accuracy = 10 ** 6
count = 0
with open(f'{args.import_dir}/balances.csv', 'r') as balances_file:
while True:
line = balances_file.readline()
if line is None:
break
balance_data = line.split(',')
try:
blockchain_address = to_checksum_address(balance_data[0])
logg.info(
'loading balance: {} {} {}'.format(count, blockchain_address, balance_data[1].ljust(200) + "\r"))
except ValueError:
break
balance = int(int(balance_data[1].rstrip()) / accuracy)
balances[blockchain_address] = balance
count += 1
ImportTask.balances = balances
ImportTask.count = i
ImportTask.import_dir = user_dir
s = celery.signature(
'import_task.send_txs',
[
MetadataTask.balance_processor.nonce_offset,
],
queue=queue,
)
s.apply_async()
ImportTask.count = count
ImportTask.include_balances = args.include_balances is True
ImportTask.import_dir = args.import_dir
s_send_txs = celery.signature(
'import_task.send_txs', [ImportTask.balance_processor.nonce_offset], queue=args.q)
s_send_txs.apply_async()
argv = ['worker']
if args.vv:
@@ -165,6 +150,7 @@ def main():
argv.append(args.q)
argv.append('-n')
argv.append(args.q)
argv.append(f'--pidfile={args.q}.pid')
celery_app.worker_main(argv)

View File

@@ -1,71 +1,63 @@
# standard import
# standard imports
import argparse
import csv
import logging
import os
import psycopg2
# third-party imports
import celery
import confini
# external imports
from confini import Config
# local imports
from import_util import get_celery_worker_status
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = './config'
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
arg_parser = argparse.ArgumentParser(description='Pins import script.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('pins_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
# set log levels
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
# process configs
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config = Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
status = get_celery_worker_status(celery_app=celery_app)
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
logg.debug(f'config loaded from {args.c}:\n{config}')
def main():
with open(f'{args.pins_dir}/pins.csv') as pins_file:
with open(f'{args.import_dir}/pins.csv') as pins_file:
phone_to_pins = [tuple(row) for row in csv.reader(pins_file)]
s_import_pins = celery.signature(
'import_task.set_pins',
(db_configs, phone_to_pins),
queue=args.q
db_conn = psycopg2.connect(
database=config.get('DATABASE_NAME'),
host=config.get('DATABASE_HOST'),
port=config.get('DATABASE_PORT'),
user=config.get('DATABASE_USER'),
password=config.get('DATABASE_PASSWORD')
)
result = s_import_pins.apply_async()
logg.debug(f'TASK: {result.id}, STATUS: {result.status}')
db_cursor = db_conn.cursor()
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
for element in phone_to_pins:
db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating account: {element[0]} with: {element[1]}')
db_conn.commit()
db_cursor.close()
db_conn.close()
if __name__ == '__main__':

View File

@@ -1,38 +1,37 @@
# standard imports
import csv
import json
import logging
import os
import random
import urllib.error
import urllib.parse
import urllib.request
import uuid
from urllib import error, parse, request
# external imports
import celery
import psycopg2
from celery import Task
from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
unpack,
raw,
)
from cic_types.models.person import Person
from cic_types.processor import generate_metadata_pointer
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.tx import raw, unpack
from cic_types.models.person import Person, generate_metadata_pointer
from hexathon import add_0x, strip_0x
# local imports
logg = logging.getLogger()
celery_app = celery.current_app
logg = logging.getLogger()
class ImportTask(celery.Task):
class ImportTask(Task):
balances = None
import_dir = 'out'
count = 0
chain_spec = None
balance_processor = None
chain_spec: ChainSpec = None
count = 0
db_config: dict = None
import_dir = ''
include_balances = False
max_retries = None
@@ -41,121 +40,70 @@ class MetadataTask(ImportTask):
meta_port = None
meta_path = ''
meta_ssl = False
autoretry_for = (
urllib.error.HTTPError,
OSError,
)
autoretry_for = (error.HTTPError, OSError,)
retry_jitter = True
retry_backoff = True
retry_backoff_max = 60
@classmethod
def meta_url(self):
def meta_url(cls):
scheme = 'http'
if self.meta_ssl:
if cls.meta_ssl:
scheme += 's'
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
url = parse.urlparse(f'{scheme}://{cls.meta_host}:{cls.meta_port}/{cls.meta_path}')
return parse.urlunparse(url)
def old_address_from_phone(base_path, phone):
pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format(
base_path,
pidx[:2],
pidx[2:4],
pidx,
)
)
f = open(phone_idx_path, 'r')
old_address = f.read()
f.close()
def old_address_from_phone(base_path: str, phone_number: str):
pid_x = generate_metadata_pointer(phone_number.encode('utf-8'), ':cic.phone')
phone_idx_path = os.path.join(f'{base_path}/phone/{pid_x[:2]}/{pid_x[2:4]}/{pid_x}')
with open(phone_idx_path, 'r') as f:
old_address = f.read()
return old_address
@celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone):
identifier = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
url = urllib.parse.urljoin(self.meta_url(), identifier)
logg.debug('attempt getting phone pointer at {} for phone {}'.format(url, phone))
r = urllib.request.urlopen(url)
address = json.load(r)
address = address.replace('"', '')
logg.debug('address {} for phone {}'.format(address, phone))
return address
@celery_app.task(bind=True, base=MetadataTask)
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
self.import_dir,
old_address_upper[:2],
old_address_upper[2:4],
old_address_upper,
)
f = open(metadata_path, 'r')
o = json.load(f)
f.close()
u = Person.deserialize(o)
if u.identities.get('evm') == None:
u.identities['evm'] = {}
sub_chain_str = '{}:{}'.format(self.chain_spec.common_name(), self.chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [add_0x(address)]
new_address_clean = strip_0x(address)
filepath = os.path.join(
def generate_person_metadata(self, blockchain_address: str, phone_number: str):
logg.debug(f'blockchain address: {blockchain_address}')
old_blockchain_address = old_address_from_phone(self.import_dir, phone_number)
old_address_upper = strip_0x(old_blockchain_address).upper()
metadata_path = f'{self.import_dir}/old/{old_address_upper[:2]}/{old_address_upper[2:4]}/{old_address_upper}.json'
with open(metadata_path, 'r') as metadata_file:
person_metadata = json.load(metadata_file)
person = Person.deserialize(person_metadata)
if not person.identities.get('evm'):
person.identities['evm'] = {}
sub_chain_str = f'{self.chain_spec.common_name()}:{self.chain_spec.network_id()}'
person.identities['evm'][sub_chain_str] = [add_0x(blockchain_address)]
blockchain_address = strip_0x(blockchain_address)
file_path = os.path.join(
self.import_dir,
'new',
new_address_clean[:2].upper(),
new_address_clean[2:4].upper(),
new_address_clean.upper() + '.json',
blockchain_address[:2].upper(),
blockchain_address[2:4].upper(),
blockchain_address.upper() + '.json'
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
o = u.serialize()
f = open(filepath, 'w')
f.write(json.dumps(o))
f.close()
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
os.makedirs(os.path.dirname(file_path), exist_ok=True)
serialized_person_metadata = person.serialize()
with open(file_path, 'w') as metadata_file:
metadata_file.write(json.dumps(serialized_person_metadata))
logg.debug(f'written person metadata for address: {blockchain_address}')
meta_filepath = os.path.join(
self.import_dir,
'meta',
'{}.json'.format(new_address_clean.upper()),
'{}.json'.format(blockchain_address.upper()),
)
os.symlink(os.path.realpath(filepath), meta_filepath)
os.symlink(os.path.realpath(file_path), meta_filepath)
return blockchain_address
# write ussd data
ussd_data = {
'phone': phone,
'is_activated': 1,
'preferred_language': random.sample(['en', 'sw'], 1)[0],
'is_disabled': False
}
ussd_data_dir = os.path.join(self.import_dir, 'ussd')
ussd_data_file_path = os.path.join(ussd_data_dir, f'{old_address}.json')
f = open(ussd_data_file_path, 'w')
f.write(json.dumps(ussd_data))
f.close()
# write preferences data
@celery_app.task(bind=True, base=MetadataTask)
def generate_preferences_data(self, data: tuple):
blockchain_address: str = data[0]
preferences = data[1]
preferences_dir = os.path.join(self.import_dir, 'preferences')
preferences_data = {
'preferred_language': ussd_data['preferred_language']
}
preferences_key = generate_metadata_pointer(bytes.fromhex(new_address_clean[2:]), ':cic.preferences')
preferences_key = generate_metadata_pointer(bytes.fromhex(strip_0x(blockchain_address)), ':cic.preferences')
preferences_filepath = os.path.join(preferences_dir, 'meta', preferences_key)
filepath = os.path.join(
preferences_dir,
'new',
@@ -164,95 +112,95 @@ def generate_metadata(self, address, phone):
preferences_key.upper() + '.json'
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
f = open(filepath, 'w')
f.write(json.dumps(preferences_data))
f.close()
with open(filepath, 'w') as preferences_file:
preferences_file.write(json.dumps(preferences))
logg.debug(f'written preferences metadata: {preferences} for address: {blockchain_address}')
os.symlink(os.path.realpath(filepath), preferences_filepath)
logg.debug('found metadata {} for phone {}'.format(o, phone))
return address
return blockchain_address
@celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, address, phone, serial):
old_address = old_address_from_phone(self.import_dir, phone)
def generate_pins_data(self, blockchain_address: str, phone_number: str):
pins_file = f'{self.import_dir}/pins.csv'
file_op = 'a' if os.path.exists(pins_file) else 'w'
with open(pins_file, file_op) as pins_file:
password_hash = uuid.uuid4().hex
pins_file.write(f'{phone_number},{password_hash}\n')
logg.debug(f'written pin data for address: {blockchain_address}')
return blockchain_address
k = to_checksum_address(strip_0x(old_address))
balance = self.balances[k]
logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
@celery_app.task(bind=True, base=MetadataTask)
def generate_ussd_data(self, blockchain_address: str, phone_number: str):
ussd_data_file = f'{self.import_dir}/ussd_data.csv'
file_op = 'a' if os.path.exists(ussd_data_file) else 'w'
preferred_language = random.sample(["en", "sw"], 1)[0]
preferences = {'preferred_language': preferred_language}
with open(ussd_data_file, file_op) as ussd_data_file:
ussd_data_file.write(f'{phone_number}, { 1}, {preferred_language}, {False}\n')
logg.debug(f'written ussd data for address: {blockchain_address}')
return blockchain_address, preferences
@celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, blockchain_address: str, phone_number: str, serial: str):
old_blockchain_address = old_address_from_phone(self.import_dir, phone_number)
address = to_checksum_address(strip_0x(old_blockchain_address))
balance = self.balances[address]
logg.debug(f'found balance: {balance} for address: {address} phone: {phone_number}')
decimal_balance = self.balance_processor.get_decimal_amount(int(balance))
(tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
tx_hash_hex, o = self.balance_processor.get_rpc_tx(blockchain_address, decimal_balance, serial)
tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec)
logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
tx_path = os.path.join(
self.import_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(strip_0x(o))
f.close()
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(tx['nonce']),
)
logg.debug(f'generated tx token value: {decimal_balance}: {blockchain_address} tx hash {tx_hash_hex}')
tx_path = os.path.join(self.import_dir, 'txs', strip_0x(tx_hash_hex))
with open(tx_path, 'w') as tx_file:
tx_file.write(strip_0x(o))
logg.debug(f'written tx with tx hash: {tx["hash"]} for address: {blockchain_address}')
tx_nonce_path = os.path.join(self.import_dir, 'txs', '.' + str(tx['nonce']))
os.symlink(os.path.realpath(tx_path), tx_nonce_path)
return tx['hash']
@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None,
@celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone_number: str):
identifier = generate_metadata_pointer(phone_number.encode('utf-8'), ':cic.phone')
url = parse.urljoin(self.meta_url(), identifier)
logg.debug(f'attempt getting phone pointer at: {url} for phone: {phone_number}')
r = request.urlopen(url)
address = json.load(r)
address = address.replace('"', '')
logg.debug(f'address: {address} for phone: {phone_number}')
return address
@celery_app.task(autoretry_for=(FileNotFoundError,),
bind=True,
base=ImportTask,
max_retries=None,
default_retry_delay=0.1)
def send_txs(self, nonce):
if nonce == self.count + self.balance_processor.nonce_offset:
logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset,
self.count))
return
logg.debug('attempt to open symlink for nonce {}'.format(nonce))
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(nonce),
)
f = open(tx_nonce_path, 'r')
tx_signed_raw_hex = f.read()
f.close()
os.unlink(tx_nonce_path)
o = raw(add_0x(tx_signed_raw_hex))
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) # tx_signed_raw_hex))
nonce += 1
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'import_task.send_txs',
[
nonce,
],
queue=queue,
)
s.apply_async()
if nonce == self.count + self.balance_processor.nonce_offset:
logg.info(f'reached nonce {nonce} (offset {self.balance_processor.nonce_offset} + count {self.count}).')
celery_app.control.broadcast('shutdown', destination=[f'celery@{queue}'])
logg.debug(f'attempt to open symlink for nonce {nonce}')
tx_nonce_path = os.path.join(self.import_dir, 'txs', '.' + str(nonce))
with open(tx_nonce_path, 'r') as tx_nonce_file:
tx_signed_raw_hex = tx_nonce_file.read()
os.unlink(tx_nonce_path)
o = raw(add_0x(tx_signed_raw_hex))
if self.include_balances:
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info(f'sent nonce {nonce} tx hash {tx_hash_hex}')
nonce += 1
s = celery.signature('import_task.send_txs', [nonce], queue=queue)
s.apply_async()
return nonce
@celery_app.task
def set_pins(config: dict, phone_to_pins: list):
# define db connection
@celery_app.task()
def set_pin_data(config: dict, phone_to_pins: list):
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
@@ -261,24 +209,17 @@ def set_pins(config: dict, phone_to_pins: list):
password=config.get('password')
)
db_cursor = db_conn.cursor()
# update db
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
for element in phone_to_pins:
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating: {element[0]} with: {element[1]}')
# commit changes
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()
@celery_app.task
def set_ussd_data(config: dict, ussd_data: dict):
# define db connection
def set_ussd_data(config: dict, ussd_data: list):
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
@@ -287,20 +228,12 @@ def set_ussd_data(config: dict, ussd_data: dict):
password=config.get('password')
)
db_cursor = db_conn.cursor()
# process ussd_data
account_status = 1
if ussd_data['is_activated'] == 1:
account_status = 2
preferred_language = ussd_data['preferred_language']
phone_number = ussd_data['phone']
sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s'
db_cursor.execute(sql, (account_status, preferred_language, phone_number))
# commit changes
for element in ussd_data:
status = 2 if int(element[1]) == 1 else 1
preferred_language = element[2]
phone_number = element[0]
db_cursor.execute(sql, (status, preferred_language, phone_number))
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()

View File

@@ -3,56 +3,61 @@ import argparse
import json
import logging
import os
import redis
import sys
import time
import urllib.request
import uuid
from urllib import request
from urllib.parse import urlencode
# external imports
import celery
import confini
import phonenumbers
import redis
from chainlib.chain import ChainSpec
from cic_types.models.person import Person
from confini import Config
# local imports
from import_util import get_celery_worker_status
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
arg_parser = argparse.ArgumentParser(description='Daemon worker that handles data seeding tasks.')
# batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
arg_parser.add_argument('--batch-size',
dest='batch_size',
default=100,
type=int,
help='burst size of sending transactions to node')
arg_parser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit data seeding tasks to.')
arg_parser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
arg_parser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
arg_parser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
arg_parser.add_argument('--ussd-host', dest='ussd_host', type=str,
help="host to ussd app responsible for processing ussd requests.")
arg_parser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
arg_parser.add_argument('--ussd-port', dest='ussd_port', type=str,
help="port to ussd app responsible for processing ussd requests.")
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int,
help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('--ussd-host', dest='ussd_host', type=str,
help="host to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-port', dest='ussd_port', type=str,
help="port to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config = Config(args.c, args.env_prefix)
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
@@ -60,44 +65,29 @@ args_override = {
'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'),
}
config.dict_override(args_override, 'cli')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug(f'config loaded from {args.c}:\n{config}')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app=celery_app)
old_account_dir = os.path.join(args.import_dir, 'old')
os.stat(old_account_dir)
logg.debug(f'created old system data dir: {old_account_dir}')
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
new_account_dir = os.path.join(args.import_dir, 'new')
os.makedirs(new_account_dir, exist_ok=True)
logg.debug(f'created new system data dir: {new_account_dir}')
ps = r.pubsub()
person_metadata_dir = os.path.join(args.import_dir, 'meta')
os.makedirs(person_metadata_dir, exist_ok=True)
logg.debug(f'created person metadata dir: {person_metadata_dir}')
user_new_dir = os.path.join(args.user_dir, 'new')
os.makedirs(user_new_dir, exist_ok=True)
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
os.makedirs(ussd_data_dir, exist_ok=True)
preferences_dir = os.path.join(args.user_dir, 'preferences')
preferences_dir = os.path.join(args.import_dir, 'preferences')
os.makedirs(os.path.join(preferences_dir, 'meta'), exist_ok=True)
logg.debug(f'created preferences metadata dir: {preferences_dir}')
meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir, exist_ok=True)
valid_service_codes = config.get('USSD_SERVICE_CODE').split(",")
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir, exist_ok=True)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
ussd_port = args.ussd_port
ussd_host = args.ussd_host
ussd_no_ssl = args.ussd_no_ssl
if ussd_no_ssl is True:
ussd_ssl = False
@@ -105,7 +95,17 @@ else:
ussd_ssl = True
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app)
def build_ussd_request(host: str,
password: str,
phone_number: str,
port: str,
service_code: str,
username: str,
ssl: bool = False):
url = 'http'
if ssl:
url += 's'
@@ -115,16 +115,16 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
url += '/?username={}&password={}'.format(username, password)
logg.info('ussd service url {}'.format(url))
logg.info('ussd phone {}'.format(phone))
logg.info('ussd phone {}'.format(phone_number))
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': service_code,
'phoneNumber': phone,
'phoneNumber': phone_number,
'text': service_code,
}
req = urllib.request.Request(url)
req = request.Request(url)
req.method = 'POST'
data_str = urlencode(data)
data_bytes = data_str.encode('utf-8')
@@ -134,85 +134,77 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
return req
def register_ussd(i, u):
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
logg.debug('tel {} {}'.format(u.tel, phone))
req = build_ussd_request(
phone,
ussd_host,
ussd_port,
config.get('APP_SERVICE_CODE'),
'',
'',
ussd_ssl
)
response = urllib.request.urlopen(req)
def e164_phone_number(phone_number: str):
phone_object = phonenumbers.parse(phone_number)
return phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
def register_account(person: Person):
phone_number = e164_phone_number(person.tel)
logg.debug(f'tel: {phone_number}')
req = build_ussd_request(args.ussd_host,
'',
phone_number,
args.ussd_port,
valid_service_codes[0],
'',
ussd_ssl)
response = request.urlopen(req)
response_data = response.read().decode('utf-8')
state = response_data[:3]
out = response_data[4:]
logg.debug('ussd reponse: {}'.format(out))
logg.debug(f'ussd response: {response_data[4:]}')
if __name__ == '__main__':
i = 0
j = 0
for x in os.walk(user_old_dir):
for x in os.walk(old_account_dir):
for y in x[2]:
if y[len(y) - 5:] != '.json':
continue
# handle json containing person object
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
file_path = os.path.join(x[0], y)
with open(file_path, 'r') as account_file:
try:
account_data = json.load(account_file)
except json.decoder.JSONDecodeError as e:
logg.error('load error for {}: {}'.format(y, e))
continue
person = Person.deserialize(account_data)
register_account(person)
phone_number = e164_phone_number(person.tel)
s_resolve_phone = celery.signature(
'import_task.resolve_phone', [phone_number], queue=args.q
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
s_person_metadata = celery.signature(
'import_task.generate_person_metadata', [phone_number], queue=args.q
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
s_ussd_data = celery.signature(
'import_task.generate_ussd_data', [phone_number], queue=args.q
)
s_meta.link(s_balance)
s_phone.link(s_meta)
# block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7)
s_preferences_metadata = celery.signature(
'import_task.generate_preferences_data', [], queue=args.q
)
s_pins_data = celery.signature(
'import_task.generate_pins_data', [phone_number], queue=args.q
)
s_opening_balance = celery.signature(
'import_task.opening_balance_tx', [phone_number, i], queue=args.q
)
celery.chain(s_resolve_phone,
s_person_metadata,
s_ussd_data,
s_preferences_metadata,
s_pins_data,
s_opening_balance).apply_async(countdown=7)
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
sys.stdout.write('imported: {} {}'.format(i, person).ljust(200) + "\r\n")
j += 1
if j == batch_size:
time.sleep(batch_delay)
if j == args.batch_size:
time.sleep(args.batch_delay)
j = 0

View File

@@ -1,67 +1,67 @@
# standard imports
import argparse
import json
import csv
import logging
import os
import psycopg2
# external imports
import celery
from confini import Config
# local imports
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
arg_parser = argparse.ArgumentParser(description='Pins import script.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config file')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='Task queue')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config_dir = args.c
config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config = Config(args.c, args.env_prefix)
config.process()
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
config.censor('PASSWORD', 'DATABASE')
logg.debug(f'config loaded from {args.c}:\n{config}')
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
def main():
with open(f'{args.import_dir}/ussd_data.csv') as ussd_data_file:
ussd_data = [tuple(row) for row in csv.reader(ussd_data_file)]
db_conn = psycopg2.connect(
database=config.get('DATABASE_NAME'),
host=config.get('DATABASE_HOST'),
port=config.get('DATABASE_PORT'),
user=config.get('DATABASE_USER'),
password=config.get('DATABASE_PASSWORD')
)
db_cursor = db_conn.cursor()
sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s'
for element in ussd_data:
status = 2 if int(element[1]) == 1 else 1
preferred_language = element[2]
phone_number = element[0]
db_cursor.execute(sql, (status, preferred_language, phone_number))
logg.debug(f'Updating account:{phone_number} with: preferred language: {preferred_language} status: {status}.')
db_conn.commit()
db_cursor.close()
db_conn.close()
if __name__ == '__main__':
for x in os.walk(ussd_data_dir):
for y in x[2]:
if y[len(y) - 5:] == '.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
ussd_data = json.load(f)
logg.debug(f'LOADING USSD DATA: {ussd_data}')
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
s_set_ussd_data = celery.signature(
'import_task.set_ussd_data',
[db_configs, ussd_data]
)
s_set_ussd_data.apply_async(queue='cic-import-ussd')
main()

View File

@@ -1,27 +1,4 @@
[app]
ALLOWED_IP=0.0.0.0/0
LOCALE_FALLBACK=en
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =
[keystore]
file_path = keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c
allowed_ip=0.0.0.0/0
max_body_length=1024
password_pepper=

View File

@@ -1,10 +1,10 @@
[database]
NAME=sempo
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
POOL_SIZE=1
name=cic_ussd
user=postgres
password=
host=localhost
port=5432
engine=postgresql
driver=psycopg2
debug=0
pool_size=1

View File

@@ -0,0 +1,5 @@
[ussd]
menu_file=data/ussd_menu.json
service_code=*483*46#,*483*061#,*384*96#
user =
pass =

View File

@@ -1,91 +0,0 @@
# standard imports
import argparse
import json
import logging
import os
import uuid
# third-party imports
import bcrypt
import celery
import confini
import phonenumbers
import random
from cic_types.models.person import Person
from cryptography.fernet import Fernet
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(script_dir, 'config'))
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='Config dir')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('--userdir', type=str, help='path to users export dir tree')
arg_parser.add_argument('pins_dir', type=str, help='path to pin export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config = confini.Config(args.c, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
logg.info('loaded config\n{}'.format(config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
user_dir = args.userdir
pins_dir = args.pins_dir
def generate_password_hash():
key = Fernet.generate_key()
fnt = Fernet(key)
pin = str(random.randint(1000, 9999))
return fnt.encrypt(bcrypt.hashpw(pin.encode('utf-8'), bcrypt.gensalt())).decode()
user_old_dir = os.path.join(user_dir, 'old')
logg.debug(f'reading user data from: {user_old_dir}')
pins_file = open(f'{pins_dir}/pins.csv', 'w')
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
# skip non-json files
if y[len(y) - 5:] != '.json':
continue
# define file path for
filepath = None
if y[:15] != '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
password_hash = uuid.uuid4().hex
pins_file.write(f'{phone},{password_hash}\n')
logg.info(f'Writing phone: {phone}, password_hash: {password_hash}')
pins_file.close()

View File

@@ -9,7 +9,9 @@ COPY package.json \
package-lock.json \
.
RUN --mount=type=cache,mode=0755,target=/root/node_modules npm install
RUN npm ci --production
#RUN --mount=type=cache,mode=0755,target=/root/node_modules npm install
COPY requirements.txt .

View File

@@ -0,0 +1,60 @@
#!/usr/bin/env bash
set -e
echo "Creating seed data..."
python create_import_users.py -vv --dir "$IMPORT_DIR" "$ACCOUNT_COUNT"
wait $!
echo "Purge tasks from celery worker"
celery -A cic_ussd.import_task purge -Q "$CELERY_QUEUE" --broker redis://"$REDIS_HOST":"$REDIS_PORT" -f
echo "Start celery work and import balance job"
if [ "$INCLUDE_BALANCES" != "y" ]
then
echo "Running worker without opening balance transactions"
TARGET_TX_COUNT=$ACCOUNT_COUNT
python cic_ussd/import_balance.py -vv -c "$CONFIG" -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --token-symbol "$TOKEN_SYMBOL" -y "$KEYSTORE_PATH" "$IMPORT_DIR" &
else
echo "Running worker with opening balance transactions"
TARGET_TX_COUNT=$((ACCOUNT_COUNT*2))
python cic_ussd/import_balance.py -vv -c "$CONFIG" -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --include-balances --token-symbol "$TOKEN_SYMBOL" -y "$KEYSTORE_PATH" "$IMPORT_DIR" &
fi
until [ -f ./cic-import-ussd.pid ]
do
echo "Polling for celery worker pid file..."
sleep 1
done
IMPORT_BALANCE_JOB=$(<cic-import-ussd.pid)
echo "Start import users job"
if [ "$USSD_SSL" == "y" ]
then
echo "Targeting secure ussd-user server"
python cic_ussd/import_users.py -vv -c "$CONFIG" --ussd-host "$USSD_HOST" --ussd-port "$USSD_PORT" "$IMPORT_DIR"
else
python cic_ussd/import_users.py -vv -c "$CONFIG" --ussd-host "$USSD_HOST" --ussd-port "$USSD_PORT" --ussd-no-ssl "$IMPORT_DIR"
fi
echo "Waiting for import balance job to complete ..."
tail --pid="$IMPORT_BALANCE_JOB" -f /dev/null
set -e
echo "Importing pins"
python cic_ussd/import_pins.py -c "$CONFIG" -vv "$IMPORT_DIR"
set +e
wait $!
set -e
echo "Importing ussd data"
python cic_ussd/import_ussd_data.py -c "$CONFIG" -vv "$IMPORT_DIR"
set +e
wait $!
echo "Importing person metadata"
node cic_meta/import_meta.js "$IMPORT_DIR" "$ACCOUNT_COUNT"
echo "Import preferences metadata"
node cic_meta/import_meta_preferences.js "$IMPORT_DIR" "$ACCOUNT_COUNT"
CIC_NOTIFY_DATABASE=postgres://$DATABASE_USER:$DATABASE_PASSWORD@$DATABASE_HOST:$DATABASE_PORT/$NOTIFY_DATABASE_NAME
NOTIFICATION_COUNT=$(psql -qtA "$CIC_NOTIFY_DATABASE" -c 'SELECT COUNT(message) FROM notification WHERE message IS NOT NULL')
while [[ "$NOTIFICATION_COUNT" < "$TARGET_TX_COUNT" ]]
do
NOTIFICATION_COUNT=$(psql -qtA "$CIC_NOTIFY_DATABASE" -c 'SELECT COUNT(message) FROM notification WHERE message IS NOT NULL')
sleep 5
echo "Notification count is: ${NOTIFICATION_COUNT}. Checking after 5 ..."
done
python verify.py -c "$CONFIG" -v -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --exclude "$EXCLUSIONS" --token-symbol "$TOKEN_SYMBOL" "$IMPORT_DIR"

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
sarafu-faucet~=0.0.7a1
cic-eth[tools]~=0.12.4a7
cic-eth[tools]~=0.12.4a8
cic-types~=0.1.0a14
crypto-dev-signer>=0.4.15a1,<=0.4.15
faker==4.17.1

View File

@@ -64,6 +64,10 @@ phone_tests = [
'ussd_pins'
]
admin_tests = [
'local_key',
]
all_tests = eth_tests + custodial_tests + metadata_tests + phone_tests
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
@@ -74,6 +78,7 @@ argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spe
argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url')
argparser.add_argument('--ussd-provider', type=str, dest='ussd_provider', default='http://localhost:63315', help='cic-ussd url')
argparser.add_argument('--skip-custodial', dest='skip_custodial', action='store_true', help='skip all custodial verifications')
argparser.add_argument('--skip-ussd', dest='skip_ussd', action='store_true', help='skip all ussd verifications')
argparser.add_argument('--skip-metadata', dest='skip_metadata', action='store_true', help='skip all metadata verifications')
argparser.add_argument('--exclude', action='append', type=str, default=[], help='skip specified verification')
argparser.add_argument('--include', action='append', type=str, help='include specified verification')
@@ -134,6 +139,11 @@ if args.skip_custodial:
for t in custodial_tests:
if t not in exclude:
exclude.append(t)
if args.skip_ussd:
logg.info('will skip all ussd verifications ({})'.format(','.join(phone_tests)))
for t in phone_tests:
if t not in exclude:
exclude.append(t)
if args.skip_metadata:
logg.info('will skip all metadata verifications ({})'.format(','.join(metadata_tests)))
for t in metadata_tests:
@@ -187,9 +197,10 @@ def send_ussd_request(address, data_dir):
phone = p.tel
session = uuid.uuid4().hex
valid_service_codes = config.get('USSD_SERVICE_CODE').split(",")
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'serviceCode': valid_service_codes[0],
'phoneNumber': phone,
'text': '',
}