Merge branch 'master' into lash/lockfix

This commit is contained in:
nolash 2021-09-01 18:03:08 +02:00
commit 4a0dabe531
48 changed files with 2984 additions and 1232 deletions

View File

@ -1,14 +1,36 @@
include:
- local: 'ci_templates/.cic-template.yml'
- local: 'apps/contract-migration/.gitlab-ci.yml'
#- local: 'ci_templates/.cic-template.yml' #kaniko build templates
# these includes are app specific unit tests
- local: 'apps/cic-eth/.gitlab-ci.yml'
- local: 'apps/cic-ussd/.gitlab-ci.yml'
- local: 'apps/cic-notify/.gitlab-ci.yml'
- local: 'apps/cic-meta/.gitlab-ci.yml'
- local: 'apps/cic-cache/.gitlab-ci.yml'
- local: 'apps/data-seeding/.gitlab-ci.yml'
#- local: 'apps/contract-migration/.gitlab-ci.yml'
#- local: 'apps/data-seeding/.gitlab-ci.yml'
stages:
- build
- test
- release
- deploy
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/docker-with-compose:latest
variables:
DOCKER_BUILDKIT: "1"
COMPOSE_DOCKER_CLI_BUILD: "1"
CI_DEBUG_TRACE: "true"
before_script:
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
# runs on protected branches and pushes to repo
build-push:
stage: build
tags:
- integration
script:
- TAG=$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA sh ./scripts/build-push.sh
rules:
- if: $CI_COMMIT_REF_PROTECTED == "true"
when: always

View File

@ -2,25 +2,21 @@
## Getting started
## Make some keys
This repo uses docker-compose and docker buildkit. Set the following environment variables to get started:
```
docker build -t bloxie . && docker run -v "$(pwd)/keys:/root/keys" --rm -it -t bloxie account new --chain /root/bloxberg.json --keys-path /root/keys
export COMPOSE_DOCKER_CLI_BUILD=1
export DOCKER_BUILDKIT=1
```
### Prepare the repo
This is stuff we need to put in makefile but for now...
File mounts and permisssions need to be set
start services, database, redis and local ethereum node
```
chmod -R 755 scripts/initdb apps/cic-meta/scripts/initdb
````
start cluster
docker-compose up -d
```
docker-compose up
Run app/contract-migration to deploy contracts
```
RUN_MASK=3 docker-compose up contract-migration
```
stop cluster
@ -28,7 +24,7 @@ stop cluster
docker-compose down
```
delete data
stop cluster and delete data
```
docker-compose down -v
```
@ -38,5 +34,4 @@ rebuild an images
docker-compose up --build <service_name>
```
Deployment variables are writtend to service-configs/.env after everthing is up.

View File

@ -1,34 +0,0 @@
# The solc image messes up the alpine environment, so we have to go all over again
FROM python:3.8.6-slim-buster
LABEL authors="Louis Holbrook <dev@holbrook.no> 0826EDA1702D1E87C6E2875121D2E7BB88C2A746"
LABEL spdx-license-identifier="GPL-3.0-or-later"
LABEL description="Base layer for buiding development images for the cic component suite"
RUN apt-get update && \
apt-get install -y git gcc g++ libpq-dev && \
apt-get install -y vim gawk jq telnet openssl iputils-ping curl wget gnupg socat bash procps make python2 postgresql-client
RUN echo installing nodejs tooling
COPY ./dev/nvm.sh /root/
# Install nvm with node and npm
# https://stackoverflow.com/questions/25899912/how-to-install-nvm-in-docker
ENV NVM_DIR /root/.nvm
ENV NODE_VERSION 15.3.0
ENV BANCOR_NODE_VERSION 10.16.0
RUN wget -qO- https://raw.githubusercontent.com/nvm-sh/nvm/v0.37.2/install.sh | bash \
&& . $NVM_DIR/nvm.sh \
&& nvm install $NODE_VERSION \
&& nvm alias default $NODE_VERSION \
&& nvm use $NODE_VERSION \
# So many ridiculously stupid issues with node in docker that take oceans of absolutely wasted time to resolve
# owner of these files is "1001" by default - wtf
&& chown -R root:root "$NVM_DIR/versions/node/v$NODE_VERSION"
ENV NODE_PATH $NVM_DIR/versions/node//v$NODE_VERSION/lib/node_modules
ENV PATH $NVM_DIR/versions/node//v$NODE_VERSION/bin:$PATH

View File

@ -1 +0,0 @@
## this is an example base image if we wanted one for all the other apps. Its just OS level things

View File

@ -1,52 +1,17 @@
.cic_cache_variables:
variables:
APP_NAME: cic-cache
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
build-mr-cic-cache:
extends:
- .py_build_merge_request
- .cic_cache_variables
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-cache/**/*
when: always
test-mr-cic-cache:
stage: test
extends:
- .cic_cache_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_cache --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-cache"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
build-push-cic-cache:
extends:
- .py_build_push
- .cic_cache_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-cache/**/*
when: always
build-test-cic-cache:
stage: test
tags:
- integration
variables:
APP_NAME: cic-cache
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-cache
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
allow_failure: true
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always

View File

@ -0,0 +1 @@
# CIC-CACHE

View File

@ -1,38 +0,0 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
# RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2b9
COPY requirements.txt .
#RUN pip install $pip_extra_index_url_flag -r test_requirements.txt
#RUN pip install $pip_extra_index_url_flag .
#RUN pip install .[server]
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
ARG EXTRA_PIP_ARGS=""
RUN pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL $EXTRA_PIP_ARGS \
-r requirements.txt
COPY . .
RUN python setup.py install
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-cache/
# for db migrations
RUN git clone https://github.com/vishnubob/wait-for-it.git /usr/local/bin/wait-for-it/
COPY cic_cache/db/migrations/ /usr/local/share/cic-cache/alembic/
COPY /docker/start_tracker.sh ./start_tracker.sh
COPY /docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server
# ENTRYPOINT [ "/usr/local/bin/uwsgi", "--wsgi-file", "/usr/local/lib/python3.8/site-packages/cic_cache/runnable/server.py", "--http", ":80", "--pyargv", "-vv" ]
ENTRYPOINT []

View File

@ -0,0 +1,10 @@
#! /bin/bash
set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 \
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r test_requirements.txt
export PYTHONPATH=. && pytest -x --cov=cic_cache --cov-fail-under=90 --cov-report term-missing tests

View File

@ -1,52 +1,16 @@
.cic_eth_variables:
variables:
APP_NAME: cic-eth
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
build-mr-cic-eth:
extends:
- .cic_eth_variables
- .py_build_target_dev
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-eth/**/*
when: always
test-mr-cic-eth:
stage: test
extends:
- .cic_eth_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r admin_requirements.txt
-r services_requirements.txt
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-eth"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-eth/**/*
when: always
build-push-cic-eth:
extends:
- .py_build_push
- .cic_eth_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-eth/**/*
when: always
build-test-cic-eth:
stage: test
tags:
- integration
variables:
APP_NAME: cic-eth
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-eth
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
#rules:
#- if: $CI_PIPELINE_SOURCE == "merge_request_event"
# changes:
# - apps/$APP_NAME/**/*
# when: always

View File

@ -1,71 +0,0 @@
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
WORKDIR /usr/src/cic-eth
# Copy just the requirements and install....this _might_ give docker a hint on caching but we
# do load these all into setup.py later
# TODO can we take all the requirements out of setup.py and just do a pip install -r requirements.txt && python setup.py
#COPY cic-eth/requirements.txt .
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
ARG EXTRA_PIP_ARGS=""
#RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \
# pip install --index-url https://pypi.org/simple \
# --force-reinstall \
# --extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
# -r requirements.txt
COPY *requirements.txt .
RUN pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY \
--extra-index-url $EXTRA_INDEX_URL \
$EXTRA_PIP_ARGS \
-r requirements.txt \
-r services_requirements.txt \
-r admin_requirements.txt
COPY . .
RUN python setup.py install
COPY docker/entrypoints/* ./
RUN chmod 755 *.sh
# # ini files in config directory defines the configurable parameters for the application
# # they can all be overridden by environment variables
# # to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY config/ /usr/local/etc/cic-eth/
COPY cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/
COPY crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
# TODO this kind of code sharing across projects should be discouraged...can we make util a library?
#COPY util/liveness/health.sh /usr/local/bin/health.sh
ENTRYPOINT []
# ------------------ PRODUCTION CONTAINER ----------------------
#FROM python:3.8.6-slim-buster as prod
#
#RUN apt-get update && \
# apt install -y gnupg libpq-dev procps
#
#WORKDIR /root
#
#COPY --from=dev /usr/local/bin/ /usr/local/bin/
#COPY --from=dev /usr/local/lib/python3.8/site-packages/ \
# /usr/local/lib/python3.8/site-packages/
#
#COPY docker/entrypoints/* ./
#RUN chmod 755 *.sh
#
## # ini files in config directory defines the configurable parameters for the application
## # they can all be overridden by environment variables
## # to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
#COPY config/ /usr/local/etc/cic-eth/
#COPY cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/
#COPY crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
#COPY scripts/ scripts/
#
## TODO this kind of code sharing across projects should be discouraged...can we make util a library?
##COPY util/liveness/health.sh /usr/local/bin/health.sh
#
#ENTRYPOINT []
#

View File

@ -0,0 +1,11 @@
#! /bin/bash
set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 --extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r admin_requirements.txt \
-r services_requirements.txt \
-r test_requirements.txt
export PYTHONPATH=. && pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests

View File

@ -0,0 +1,10 @@
#! /bin/bash
set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 --extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r admin_requirements.txt
-r services_requirements.txt
-r test_requirements.txt
export PYTHONPATH=. && pytest -x --cov=cic_eth --cov-fail-under=90 --cov-report term-missing tests

View File

@ -1,43 +1,16 @@
.cic_meta_variables:
variables:
APP_NAME: cic-meta
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
build-mr-cic-meta:
extends:
- .py_build_merge_request
- .cic_meta_variables
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-meta/**/*
when: always
test-mr-cic-meta:
extends:
- .cic_meta_variables
stage: test
image: $MR_IMAGE_TAG
script:
- cd /root
- npm install --dev
- npm run test
- npm run test:coverage
needs: ["build-mr-cic-meta"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-meta/**/*
when: always
build-push-cic-meta:
extends:
- .py_build_push
- .cic_meta_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-meta/**/*
when: always
build-test-cic-meta:
stage: test
tags:
- integration
variables:
APP_NAME: cic-meta
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-meta
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run --entrypoint=sh $MR_IMAGE_TAG docker/run_tests.sh
#rules:
#- if: $CI_PIPELINE_SOURCE == "merge_request_event"
# changes:
# - apps/$APP_NAME/**/*
# when: always

View File

@ -15,11 +15,10 @@ RUN --mount=type=cache,mode=0755,target=/root/.npm \
COPY webpack.config.js .
COPY tsconfig.json .
## required to build the cic-client-meta module
COPY src/ src/
COPY scripts/ scripts/
COPY tests/ tests/
COPY . .
COPY tests/*.asc /root/pgp/
## copy runtime configs
COPY .config/ /usr/local/etc/cic-meta/
#

View File

@ -1,32 +0,0 @@
# syntax = docker/dockerfile:1.2
#FROM node:15.3.0-alpine3.10
FROM node:lts-alpine3.14
WORKDIR /root
RUN apk add --no-cache postgresql bash
# copy the dependencies
COPY package.json package-lock.json .
RUN npm set cache /root/.npm && \
npm ci
COPY webpack.config.js .
COPY tsconfig.json .
## required to build the cic-client-meta module
COPY src/ src/
COPY scripts/ scripts/
COPY tests/ tests/
COPY tests/*.asc /root/pgp/
## copy runtime configs
COPY .config/ /usr/local/etc/cic-meta/
#
## db migrations
COPY docker/db.sh ./db.sh
RUN chmod 755 ./db.sh
#
RUN alias tsc=node_modules/typescript/bin/tsc
COPY docker/start_server.sh ./start_server.sh
RUN chmod 755 ./start_server.sh
ENTRYPOINT ["sh", "./start_server.sh"]

View File

@ -0,0 +1,7 @@
#! /bin/bash
set -e
npm install --dev
npm run test
npm run test:coverage

View File

@ -1,52 +1,17 @@
.cic_notify_variables:
variables:
APP_NAME: cic-notify
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
build-mr-cic-notify:
extends:
- .py_build_merge_request
- .cic_notify_variables
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-notify/**/*
when: always
test-mr-cic-notify:
stage: test
extends:
- .cic_notify_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_notify --cov-fail-under=90 --cov-report term-missing tests
needs: ["build-mr-cic-notify"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
build-push-cic-notify:
extends:
- .py_build_push
- .cic_notify_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-notify/**/*
when: always
build-test-cic-notify:
stage: test
tags:
- integration
variables:
APP_NAME: cic-notify
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-notify
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
allow_failure: true
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always

View File

@ -11,12 +11,12 @@ celery_app = celery.current_app
@celery_app.task
def persist_notification(recipient, message):
def persist_notification(message, recipient):
"""
:param recipient:
:type recipient:
:param message:
:type message:
:param recipient:
:type recipient:
:return:
:rtype:
"""

View File

@ -11,12 +11,13 @@ local_logg = logging.getLogger(__name__)
@celery_app.task
def log(recipient, message):
def log(message, recipient):
"""
:param recipient:
:type recipient:
:param message:
:type message:
:param recipient:
:type recipient:
:return:
:rtype:
"""

View File

@ -1,27 +0,0 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
#RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a62
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
COPY requirements.txt .
RUN pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
-r requirements.txt
COPY . .
RUN python setup.py install
COPY docker/*.sh .
RUN chmod +x *.sh
# ini files in config directory defines the configurable parameters for the application
# they can all be overridden by environment variables
# to generate a list of environment variables from configuration, use: confini-dump -z <dir> (executable provided by confini package)
COPY .config/ /usr/local/etc/cic-notify/
COPY cic_notify/db/migrations/ /usr/local/share/cic-notify/alembic/
ENTRYPOINT []

View File

@ -0,0 +1,9 @@
#! /bin/bash
set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 \
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r test_requirements.txt
export PYTHONPATH=. && pytest -x --cov=cic_notify --cov-fail-under=90 --cov-report term-missing tests

View File

@ -1,52 +1,16 @@
.cic_ussd_variables:
variables:
APP_NAME: cic-ussd
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
build-mr-cic-ussd:
extends:
- .py_build_merge_request
- .cic_ussd_variables
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/cic-ussd/**/*
when: always
test-mr-cic-ussd:
stage: test
extends:
- .cic_ussd_variables
cache:
key:
files:
- test_requirements.txt
paths:
- /root/.cache/pip
image: $MR_IMAGE_TAG
script:
- cd apps/$APP_NAME/
- >
pip install --extra-index-url https://pip.grassrootseconomics.net:8433
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple
-r test_requirements.txt
- export PYTHONPATH=. && pytest -x --cov=cic_ussd --cov-fail-under=90 --cov-report term-missing tests/cic_ussd
needs: ["build-mr-cic-ussd"]
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always
build-push-cic-ussd:
extends:
- .py_build_push
- .cic_ussd_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/cic-ussd/**/*
when: always
build-test-cic-ussd:
stage: test
tags:
- integration
variables:
APP_NAME: cic-ussd
MR_IMAGE_TAG: mr-$APP_NAME-$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA
script:
- cd apps/cic-ussd
- docker build -t $MR_IMAGE_TAG -f docker/Dockerfile .
- docker run $MR_IMAGE_TAG sh docker/run_tests.sh
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/$APP_NAME/**/*
when: always

View File

@ -1,5 +1,6 @@
# standard import
import decimal
import json
import logging
from typing import Dict, Tuple
@ -8,6 +9,8 @@ from cic_eth.api import Api
from sqlalchemy.orm.session import Session
# local import
from cic_ussd.account.chain import Chain
from cic_ussd.account.tokens import get_cached_default_token
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.base import SessionBase
from cic_ussd.error import UnknownUssdRecipient
@ -59,7 +62,9 @@ def from_wei(value: int) -> float:
:return: SRF equivalent of value in Wei
:rtype: float
"""
value = float(value) / 1e+6
cached_token_data = json.loads(get_cached_default_token(Chain.spec.__str__()))
token_decimals: int = cached_token_data.get('decimals')
value = float(value) / (10**token_decimals)
return truncate(value=value, decimals=2)
@ -70,7 +75,9 @@ def to_wei(value: int) -> int:
:return: Wei equivalent of value in SRF
:rtype: int
"""
return int(value * 1e+6)
cached_token_data = json.loads(get_cached_default_token(Chain.spec.__str__()))
token_decimals: int = cached_token_data.get('decimals')
return int(value * (10**token_decimals))
def truncate(value: float, decimals: int):

View File

@ -44,7 +44,7 @@ class MetadataRequestsHandler(Metadata):
def create(self, data: Union[Dict, str]):
""""""
data = json.dumps(data)
data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
error_handler(result=result)

View File

@ -146,7 +146,7 @@ def create_ussd_session(
)
def update_ussd_session(ussd_session: UssdSession,
def update_ussd_session(ussd_session: DbUssdSession,
user_input: str,
state: str,
data: Optional[dict] = None) -> UssdSession:

View File

@ -138,26 +138,14 @@ def transaction_balances_callback(self, result: list, param: dict, status_code:
balances_data = result[0]
available_balance = calculate_available_balance(balances_data)
transaction = param
blockchain_address = transaction.get('blockchain_address')
transaction['available_balance'] = available_balance
queue = self.request.delivery_info.get('routing_key')
s_preferences_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_preferences_metadata', [blockchain_address], queue=queue
)
s_process_account_metadata = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [transaction], queue=queue
)
s_notify_account = celery.signature('cic_ussd.tasks.notifications.transaction', queue=queue)
if transaction.get('transaction_type') == 'transfer':
celery.chain(s_preferences_metadata, s_process_account_metadata, s_notify_account).apply_async()
if transaction.get('transaction_type') == 'tokengift':
s_process_account_metadata = celery.signature(
'cic_ussd.tasks.processor.parse_transaction', [{}, transaction], queue=queue
)
celery.chain(s_process_account_metadata, s_notify_account).apply_async()
celery.chain(s_process_account_metadata, s_notify_account).apply_async()
@celery_app.task

View File

@ -8,6 +8,7 @@ import i18n
from chainlib.hash import strip_0x
# local imports
from cic_ussd.account.metadata import get_cached_preferred_language
from cic_ussd.account.statement import get_cached_statement
from cic_ussd.account.transaction import aux_transaction_data, validate_transaction_account
from cic_ussd.cache import cache_data, cache_data_key
@ -58,19 +59,17 @@ def cache_statement(parsed_transaction: dict, querying_party: str):
@celery_app.task
def parse_transaction(preferences: dict, transaction: dict) -> dict:
def parse_transaction(transaction: dict) -> dict:
"""This function parses transaction objects and collates all relevant data for system use i.e:
- An account's set preferred language.
- Account identifier that facilitates notification.
- Contextual tags i.e action and direction tags.
:param preferences: An account's set preferences.
:type preferences: dict
:param transaction: Transaction object.
:type transaction: dict
:return: Transaction object with contextual data for use in the system.
:rtype: dict
"""
preferred_language = preferences.get('preferred_language')
preferred_language = get_cached_preferred_language(transaction.get('blockchain_address'))
if not preferred_language:
preferred_language = i18n.config.get('fallback')
transaction['preferred_language'] = preferred_language
@ -83,6 +82,8 @@ def parse_transaction(preferences: dict, transaction: dict) -> dict:
alt_account = session.query(Account).filter_by(blockchain_address=alt_blockchain_address).first()
if alt_account:
transaction['alt_metadata_id'] = alt_account.standard_metadata_id()
else:
transaction['alt_metadata_id'] = 'GRASSROOTS ECONOMICS'
transaction['metadata_id'] = account.standard_metadata_id()
transaction['phone_number'] = account.phone_number
session.close()

View File

@ -1,32 +0,0 @@
# syntax = docker/dockerfile:1.2
FROM registry.gitlab.com/grassrootseconomics/cic-base-images:python-3.8.6-dev-55da5f4e as dev
RUN apt-get install -y redis-server
# create secrets directory
RUN mkdir -vp pgp/keys
# create application directory
RUN mkdir -vp cic-ussd
RUN mkdir -vp data
COPY requirements.txt .
ARG EXTRA_INDEX_URL="https://pip.grassrootseconomics.net:8433"
ARG GITLAB_PYTHON_REGISTRY="https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple"
RUN pip install --index-url https://pypi.org/simple \
--extra-index-url $GITLAB_PYTHON_REGISTRY --extra-index-url $EXTRA_INDEX_URL \
-r requirements.txt
COPY . .
RUN python setup.py install
COPY cic_ussd/db/ussd_menu.json data/
COPY docker/*.sh .
RUN chmod +x /root/*.sh
# copy config and migration files to definitive file so they can be referenced in path definitions for running scripts
COPY config/ /usr/local/etc/cic-ussd/
COPY cic_ussd/db/migrations/ /usr/local/share/cic-ussd/alembic
ENTRYPOINT []

View File

@ -0,0 +1,10 @@
#! /bin/bash
set -e
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 \
--extra-index-url https://gitlab.com/api/v4/projects/27624814/packages/pypi/simple \
-r test_requirements.txt
export PYTHONPATH=. && pytest -x --cov=cic_ussd --cov-fail-under=90 --cov-report term-missing tests/cic_ussd

View File

@ -1,25 +1,25 @@
.contract_migration_variables:
variables:
APP_NAME: contract-migration
DOCKERFILE_PATH: docker/Dockerfile_ci
CONTEXT: apps/$APP_NAME
build-mr-contract-migration:
extends:
- .py_build_merge_request
- .contract_migration_variables
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
changes:
- apps/contract-migration/**/*
when: always
build-push-contract-migration:
extends:
- .py_build_push
- .contract_migration_variables
rules:
- if: $CI_COMMIT_BRANCH == "master"
changes:
- apps/contract-migration/**/*
when: always
#.contract_migration_variables:
# variables:
# APP_NAME: contract-migration
# DOCKERFILE_PATH: docker/Dockerfile_ci
# CONTEXT: apps/$APP_NAME
#
#build-mr-contract-migration:
# extends:
# - .py_build_merge_request
# - .contract_migration_variables
# rules:
# - if: $CI_PIPELINE_SOURCE == "merge_request_event"
# changes:
# - apps/contract-migration/**/*
# when: always
#
#build-push-contract-migration:
# extends:
# - .py_build_push
# - .contract_migration_variables
# rules:
# - if: $CI_COMMIT_BRANCH == "master"
# changes:
# - apps/contract-migration/**/*
# when: always

View File

@ -2,7 +2,7 @@
.cache
.dot
**/doc
**/node_modules
node_modules/
**/venv
**/.venv

View File

@ -1,64 +1,61 @@
# standard imports
import argparse
import logging
import sys
import os
import sys
# external imports
import celery
import confini
import redis
from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address
from chainlib.eth.connection import EthHTTPConnection
from confini import Config
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.dict import DictKeystore
# local imports
from import_task import ImportTask, MetadataTask
from import_util import BalanceProcessor, get_celery_worker_status
from import_task import ImportTask, MetadataTask
logging.basicConfig(level=logging.WARNING)
default_config_dir = './config'
logg = logging.getLogger()
config_dir = './config'
arg_parser = argparse.ArgumentParser(description='Daemon worker that handles data seeding tasks.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
arg_parser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
arg_parser.add_argument('--include-balances', dest='include_balances', help='include opening balance transactions',
action='store_true')
arg_parser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
arg_parser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server host')
arg_parser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit data seeding tasks to.')
arg_parser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
arg_parser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
arg_parser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
arg_parser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
arg_parser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
help='Token symbol to use for transactions')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
arg_parser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
arg_parser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1',
help='chain spec')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
argparser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server host')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--token-symbol', default='GFT', type=str, dest='token_symbol',
help='Token symbol to use for transactions')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str,
help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', default='out', type=str, help='user export directory')
args = argparser.parse_args(sys.argv[1:])
if args.v:
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
elif args.vv:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config = Config(args.c, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
@ -73,88 +70,76 @@ args_override = {
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
logg.debug(f'config loaded from {args.c}:\n{config}')
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
db_config = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
ImportTask.db_config = db_config
# create celery apps
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
status = get_celery_worker_status(celery_app=celery_app)
signer_address = None
keystore = DictKeystore()
if args.y is not None:
logg.debug('loading keystore file {}'.format(args.y))
signer_address = keystore.import_keystore_file(args.y)
logg.debug('now have key for signer address {}'.format(signer_address))
# define signer
os.path.isfile(args.y)
logg.debug(f'loading keystore file {args.y}')
signer_address = keystore.import_keystore_file(args.y)
logg.debug(f'now have key for signer address {signer_address}')
signer = EIP155Signer(keystore)
queue = args.q
chain_str = config.get('CIC_CHAIN_SPEC')
block_offset = 0
if args.head:
block_offset = -1
else:
block_offset = args.offset
block_offset = -1 if args.head else args.offset
chain_str = config.get('CIC_CHAIN_SPEC')
chain_spec = ChainSpec.from_chain_str(chain_str)
ImportTask.chain_spec = chain_spec
old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py
token_symbol = args.token_symbol
MetadataTask.meta_host = config.get('META_HOST')
MetadataTask.meta_port = config.get('META_PORT')
ImportTask.chain_spec = chain_spec
txs_dir = os.path.join(args.import_dir, 'txs')
os.makedirs(txs_dir, exist_ok=True)
sys.stdout.write(f'created txs dir: {txs_dir}')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app)
def main():
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'),
signer_address, signer)
ImportTask.balance_processor.init(token_symbol)
# TODO get decimals from token
ImportTask.balance_processor = BalanceProcessor(conn,
chain_spec,
config.get('CIC_REGISTRY_ADDRESS'),
signer_address,
signer)
ImportTask.balance_processor.init(args.token_symbol)
balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r'))
remove_zeros = 10 ** 6
i = 0
while True:
l = f.readline()
if l is None:
break
r = l.split(',')
try:
address = to_checksum_address(r[0])
sys.stdout.write('loading balance {} {} {}'.format(i, address, r[1]).ljust(200) + "\r")
except ValueError:
break
balance = int(int(r[1].rstrip()) / remove_zeros)
balances[address] = balance
i += 1
f.close()
accuracy = 10 ** 6
count = 0
with open(f'{args.import_dir}/balances.csv', 'r') as balances_file:
while True:
line = balances_file.readline()
if line is None:
break
balance_data = line.split(',')
try:
blockchain_address = to_checksum_address(balance_data[0])
logg.info(
'loading balance: {} {} {}'.format(count, blockchain_address, balance_data[1].ljust(200) + "\r"))
except ValueError:
break
balance = int(int(balance_data[1].rstrip()) / accuracy)
balances[blockchain_address] = balance
count += 1
ImportTask.balances = balances
ImportTask.count = i
ImportTask.import_dir = user_dir
s = celery.signature(
'import_task.send_txs',
[
MetadataTask.balance_processor.nonce_offset,
],
queue=queue,
)
s.apply_async()
ImportTask.count = count
ImportTask.include_balances = args.include_balances is True
ImportTask.import_dir = args.import_dir
s_send_txs = celery.signature(
'import_task.send_txs', [ImportTask.balance_processor.nonce_offset], queue=args.q)
s_send_txs.apply_async()
argv = ['worker']
if args.vv:
@ -165,6 +150,7 @@ def main():
argv.append(args.q)
argv.append('-n')
argv.append(args.q)
argv.append(f'--pidfile={args.q}.pid')
celery_app.worker_main(argv)

View File

@ -1,71 +1,63 @@
# standard import
# standard imports
import argparse
import csv
import logging
import os
import psycopg2
# third-party imports
import celery
import confini
# external imports
from confini import Config
# local imports
from import_util import get_celery_worker_status
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = './config'
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
arg_parser = argparse.ArgumentParser(description='Pins import script.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('pins_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
# set log levels
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
# process configs
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config = Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
status = get_celery_worker_status(celery_app=celery_app)
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
logg.debug(f'config loaded from {args.c}:\n{config}')
def main():
with open(f'{args.pins_dir}/pins.csv') as pins_file:
with open(f'{args.import_dir}/pins.csv') as pins_file:
phone_to_pins = [tuple(row) for row in csv.reader(pins_file)]
s_import_pins = celery.signature(
'import_task.set_pins',
(db_configs, phone_to_pins),
queue=args.q
db_conn = psycopg2.connect(
database=config.get('DATABASE_NAME'),
host=config.get('DATABASE_HOST'),
port=config.get('DATABASE_PORT'),
user=config.get('DATABASE_USER'),
password=config.get('DATABASE_PASSWORD')
)
result = s_import_pins.apply_async()
logg.debug(f'TASK: {result.id}, STATUS: {result.status}')
db_cursor = db_conn.cursor()
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
for element in phone_to_pins:
db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating account: {element[0]} with: {element[1]}')
db_conn.commit()
db_cursor.close()
db_conn.close()
if __name__ == '__main__':

View File

@ -1,38 +1,37 @@
# standard imports
import csv
import json
import logging
import os
import random
import urllib.error
import urllib.parse
import urllib.request
import uuid
from urllib import error, parse, request
# external imports
import celery
import psycopg2
from celery import Task
from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
unpack,
raw,
)
from cic_types.models.person import Person
from cic_types.processor import generate_metadata_pointer
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.tx import raw, unpack
from cic_types.models.person import Person, generate_metadata_pointer
from hexathon import add_0x, strip_0x
# local imports
logg = logging.getLogger()
celery_app = celery.current_app
logg = logging.getLogger()
class ImportTask(celery.Task):
class ImportTask(Task):
balances = None
import_dir = 'out'
count = 0
chain_spec = None
balance_processor = None
chain_spec: ChainSpec = None
count = 0
db_config: dict = None
import_dir = ''
include_balances = False
max_retries = None
@ -41,121 +40,70 @@ class MetadataTask(ImportTask):
meta_port = None
meta_path = ''
meta_ssl = False
autoretry_for = (
urllib.error.HTTPError,
OSError,
)
autoretry_for = (error.HTTPError, OSError,)
retry_jitter = True
retry_backoff = True
retry_backoff_max = 60
@classmethod
def meta_url(self):
def meta_url(cls):
scheme = 'http'
if self.meta_ssl:
if cls.meta_ssl:
scheme += 's'
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
url = parse.urlparse(f'{scheme}://{cls.meta_host}:{cls.meta_port}/{cls.meta_path}')
return parse.urlunparse(url)
def old_address_from_phone(base_path, phone):
pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format(
base_path,
pidx[:2],
pidx[2:4],
pidx,
)
)
f = open(phone_idx_path, 'r')
old_address = f.read()
f.close()
def old_address_from_phone(base_path: str, phone_number: str):
pid_x = generate_metadata_pointer(phone_number.encode('utf-8'), ':cic.phone')
phone_idx_path = os.path.join(f'{base_path}/phone/{pid_x[:2]}/{pid_x[2:4]}/{pid_x}')
with open(phone_idx_path, 'r') as f:
old_address = f.read()
return old_address
@celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone):
identifier = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
url = urllib.parse.urljoin(self.meta_url(), identifier)
logg.debug('attempt getting phone pointer at {} for phone {}'.format(url, phone))
r = urllib.request.urlopen(url)
address = json.load(r)
address = address.replace('"', '')
logg.debug('address {} for phone {}'.format(address, phone))
return address
@celery_app.task(bind=True, base=MetadataTask)
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
self.import_dir,
old_address_upper[:2],
old_address_upper[2:4],
old_address_upper,
)
f = open(metadata_path, 'r')
o = json.load(f)
f.close()
u = Person.deserialize(o)
if u.identities.get('evm') == None:
u.identities['evm'] = {}
sub_chain_str = '{}:{}'.format(self.chain_spec.common_name(), self.chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [add_0x(address)]
new_address_clean = strip_0x(address)
filepath = os.path.join(
def generate_person_metadata(self, blockchain_address: str, phone_number: str):
logg.debug(f'blockchain address: {blockchain_address}')
old_blockchain_address = old_address_from_phone(self.import_dir, phone_number)
old_address_upper = strip_0x(old_blockchain_address).upper()
metadata_path = f'{self.import_dir}/old/{old_address_upper[:2]}/{old_address_upper[2:4]}/{old_address_upper}.json'
with open(metadata_path, 'r') as metadata_file:
person_metadata = json.load(metadata_file)
person = Person.deserialize(person_metadata)
if not person.identities.get('evm'):
person.identities['evm'] = {}
sub_chain_str = f'{self.chain_spec.common_name()}:{self.chain_spec.network_id()}'
person.identities['evm'][sub_chain_str] = [add_0x(blockchain_address)]
blockchain_address = strip_0x(blockchain_address)
file_path = os.path.join(
self.import_dir,
'new',
new_address_clean[:2].upper(),
new_address_clean[2:4].upper(),
new_address_clean.upper() + '.json',
blockchain_address[:2].upper(),
blockchain_address[2:4].upper(),
blockchain_address.upper() + '.json'
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
o = u.serialize()
f = open(filepath, 'w')
f.write(json.dumps(o))
f.close()
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
os.makedirs(os.path.dirname(file_path), exist_ok=True)
serialized_person_metadata = person.serialize()
with open(file_path, 'w') as metadata_file:
metadata_file.write(json.dumps(serialized_person_metadata))
logg.debug(f'written person metadata for address: {blockchain_address}')
meta_filepath = os.path.join(
self.import_dir,
'meta',
'{}.json'.format(new_address_clean.upper()),
'{}.json'.format(blockchain_address.upper()),
)
os.symlink(os.path.realpath(filepath), meta_filepath)
os.symlink(os.path.realpath(file_path), meta_filepath)
return blockchain_address
# write ussd data
ussd_data = {
'phone': phone,
'is_activated': 1,
'preferred_language': random.sample(['en', 'sw'], 1)[0],
'is_disabled': False
}
ussd_data_dir = os.path.join(self.import_dir, 'ussd')
ussd_data_file_path = os.path.join(ussd_data_dir, f'{old_address}.json')
f = open(ussd_data_file_path, 'w')
f.write(json.dumps(ussd_data))
f.close()
# write preferences data
@celery_app.task(bind=True, base=MetadataTask)
def generate_preferences_data(self, data: tuple):
blockchain_address: str = data[0]
preferences = data[1]
preferences_dir = os.path.join(self.import_dir, 'preferences')
preferences_data = {
'preferred_language': ussd_data['preferred_language']
}
preferences_key = generate_metadata_pointer(bytes.fromhex(new_address_clean[2:]), ':cic.preferences')
preferences_key = generate_metadata_pointer(bytes.fromhex(strip_0x(blockchain_address)), ':cic.preferences')
preferences_filepath = os.path.join(preferences_dir, 'meta', preferences_key)
filepath = os.path.join(
preferences_dir,
'new',
@ -164,95 +112,95 @@ def generate_metadata(self, address, phone):
preferences_key.upper() + '.json'
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
f = open(filepath, 'w')
f.write(json.dumps(preferences_data))
f.close()
with open(filepath, 'w') as preferences_file:
preferences_file.write(json.dumps(preferences))
logg.debug(f'written preferences metadata: {preferences} for address: {blockchain_address}')
os.symlink(os.path.realpath(filepath), preferences_filepath)
logg.debug('found metadata {} for phone {}'.format(o, phone))
return address
return blockchain_address
@celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, address, phone, serial):
old_address = old_address_from_phone(self.import_dir, phone)
def generate_pins_data(self, blockchain_address: str, phone_number: str):
pins_file = f'{self.import_dir}/pins.csv'
file_op = 'a' if os.path.exists(pins_file) else 'w'
with open(pins_file, file_op) as pins_file:
password_hash = uuid.uuid4().hex
pins_file.write(f'{phone_number},{password_hash}\n')
logg.debug(f'written pin data for address: {blockchain_address}')
return blockchain_address
k = to_checksum_address(strip_0x(old_address))
balance = self.balances[k]
logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
@celery_app.task(bind=True, base=MetadataTask)
def generate_ussd_data(self, blockchain_address: str, phone_number: str):
ussd_data_file = f'{self.import_dir}/ussd_data.csv'
file_op = 'a' if os.path.exists(ussd_data_file) else 'w'
preferred_language = random.sample(["en", "sw"], 1)[0]
preferences = {'preferred_language': preferred_language}
with open(ussd_data_file, file_op) as ussd_data_file:
ussd_data_file.write(f'{phone_number}, { 1}, {preferred_language}, {False}\n')
logg.debug(f'written ussd data for address: {blockchain_address}')
return blockchain_address, preferences
@celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, blockchain_address: str, phone_number: str, serial: str):
old_blockchain_address = old_address_from_phone(self.import_dir, phone_number)
address = to_checksum_address(strip_0x(old_blockchain_address))
balance = self.balances[address]
logg.debug(f'found balance: {balance} for address: {address} phone: {phone_number}')
decimal_balance = self.balance_processor.get_decimal_amount(int(balance))
(tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
tx_hash_hex, o = self.balance_processor.get_rpc_tx(blockchain_address, decimal_balance, serial)
tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec)
logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
tx_path = os.path.join(
self.import_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(strip_0x(o))
f.close()
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(tx['nonce']),
)
logg.debug(f'generated tx token value: {decimal_balance}: {blockchain_address} tx hash {tx_hash_hex}')
tx_path = os.path.join(self.import_dir, 'txs', strip_0x(tx_hash_hex))
with open(tx_path, 'w') as tx_file:
tx_file.write(strip_0x(o))
logg.debug(f'written tx with tx hash: {tx["hash"]} for address: {blockchain_address}')
tx_nonce_path = os.path.join(self.import_dir, 'txs', '.' + str(tx['nonce']))
os.symlink(os.path.realpath(tx_path), tx_nonce_path)
return tx['hash']
@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None,
@celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone_number: str):
identifier = generate_metadata_pointer(phone_number.encode('utf-8'), ':cic.phone')
url = parse.urljoin(self.meta_url(), identifier)
logg.debug(f'attempt getting phone pointer at: {url} for phone: {phone_number}')
r = request.urlopen(url)
address = json.load(r)
address = address.replace('"', '')
logg.debug(f'address: {address} for phone: {phone_number}')
return address
@celery_app.task(autoretry_for=(FileNotFoundError,),
bind=True,
base=ImportTask,
max_retries=None,
default_retry_delay=0.1)
def send_txs(self, nonce):
if nonce == self.count + self.balance_processor.nonce_offset:
logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset,
self.count))
return
logg.debug('attempt to open symlink for nonce {}'.format(nonce))
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(nonce),
)
f = open(tx_nonce_path, 'r')
tx_signed_raw_hex = f.read()
f.close()
os.unlink(tx_nonce_path)
o = raw(add_0x(tx_signed_raw_hex))
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) # tx_signed_raw_hex))
nonce += 1
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'import_task.send_txs',
[
nonce,
],
queue=queue,
)
s.apply_async()
if nonce == self.count + self.balance_processor.nonce_offset:
logg.info(f'reached nonce {nonce} (offset {self.balance_processor.nonce_offset} + count {self.count}).')
celery_app.control.broadcast('shutdown', destination=[f'celery@{queue}'])
logg.debug(f'attempt to open symlink for nonce {nonce}')
tx_nonce_path = os.path.join(self.import_dir, 'txs', '.' + str(nonce))
with open(tx_nonce_path, 'r') as tx_nonce_file:
tx_signed_raw_hex = tx_nonce_file.read()
os.unlink(tx_nonce_path)
o = raw(add_0x(tx_signed_raw_hex))
if self.include_balances:
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info(f'sent nonce {nonce} tx hash {tx_hash_hex}')
nonce += 1
s = celery.signature('import_task.send_txs', [nonce], queue=queue)
s.apply_async()
return nonce
@celery_app.task
def set_pins(config: dict, phone_to_pins: list):
# define db connection
@celery_app.task()
def set_pin_data(config: dict, phone_to_pins: list):
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
@ -261,24 +209,17 @@ def set_pins(config: dict, phone_to_pins: list):
password=config.get('password')
)
db_cursor = db_conn.cursor()
# update db
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
for element in phone_to_pins:
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating: {element[0]} with: {element[1]}')
# commit changes
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()
@celery_app.task
def set_ussd_data(config: dict, ussd_data: dict):
# define db connection
def set_ussd_data(config: dict, ussd_data: list):
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
@ -287,20 +228,12 @@ def set_ussd_data(config: dict, ussd_data: dict):
password=config.get('password')
)
db_cursor = db_conn.cursor()
# process ussd_data
account_status = 1
if ussd_data['is_activated'] == 1:
account_status = 2
preferred_language = ussd_data['preferred_language']
phone_number = ussd_data['phone']
sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s'
db_cursor.execute(sql, (account_status, preferred_language, phone_number))
# commit changes
for element in ussd_data:
status = 2 if int(element[1]) == 1 else 1
preferred_language = element[2]
phone_number = element[0]
db_cursor.execute(sql, (status, preferred_language, phone_number))
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()

View File

@ -3,56 +3,61 @@ import argparse
import json
import logging
import os
import redis
import sys
import time
import urllib.request
import uuid
from urllib import request
from urllib.parse import urlencode
# external imports
import celery
import confini
import phonenumbers
import redis
from chainlib.chain import ChainSpec
from cic_types.models.person import Person
from confini import Config
# local imports
from import_util import get_celery_worker_status
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
arg_parser = argparse.ArgumentParser(description='Daemon worker that handles data seeding tasks.')
# batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
arg_parser.add_argument('--batch-size',
dest='batch_size',
default=100,
type=int,
help='burst size of sending transactions to node')
arg_parser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit data seeding tasks to.')
arg_parser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
arg_parser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
arg_parser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
arg_parser.add_argument('--ussd-host', dest='ussd_host', type=str,
help="host to ussd app responsible for processing ussd requests.")
arg_parser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
arg_parser.add_argument('--ussd-port', dest='ussd_port', type=str,
help="port to ussd app responsible for processing ussd requests.")
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int,
help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('--ussd-host', dest='ussd_host', type=str,
help="host to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-port', dest='ussd_port', type=str,
help="port to ussd app responsible for processing ussd requests.")
argparser.add_argument('--ussd-no-ssl', dest='ussd_no_ssl', help='do not use ssl (careful)', action='store_true')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config = Config(args.c, args.env_prefix)
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
@ -60,44 +65,29 @@ args_override = {
'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'),
}
config.dict_override(args_override, 'cli')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug(f'config loaded from {args.c}:\n{config}')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app=celery_app)
old_account_dir = os.path.join(args.import_dir, 'old')
os.stat(old_account_dir)
logg.debug(f'created old system data dir: {old_account_dir}')
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
new_account_dir = os.path.join(args.import_dir, 'new')
os.makedirs(new_account_dir, exist_ok=True)
logg.debug(f'created new system data dir: {new_account_dir}')
ps = r.pubsub()
person_metadata_dir = os.path.join(args.import_dir, 'meta')
os.makedirs(person_metadata_dir, exist_ok=True)
logg.debug(f'created person metadata dir: {person_metadata_dir}')
user_new_dir = os.path.join(args.user_dir, 'new')
os.makedirs(user_new_dir, exist_ok=True)
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
os.makedirs(ussd_data_dir, exist_ok=True)
preferences_dir = os.path.join(args.user_dir, 'preferences')
preferences_dir = os.path.join(args.import_dir, 'preferences')
os.makedirs(os.path.join(preferences_dir, 'meta'), exist_ok=True)
logg.debug(f'created preferences metadata dir: {preferences_dir}')
meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir, exist_ok=True)
valid_service_codes = config.get('USSD_SERVICE_CODE').split(",")
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir, exist_ok=True)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
ussd_port = args.ussd_port
ussd_host = args.ussd_host
ussd_no_ssl = args.ussd_no_ssl
if ussd_no_ssl is True:
ussd_ssl = False
@ -105,7 +95,17 @@ else:
ussd_ssl = True
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
get_celery_worker_status(celery_app)
def build_ussd_request(host: str,
password: str,
phone_number: str,
port: str,
service_code: str,
username: str,
ssl: bool = False):
url = 'http'
if ssl:
url += 's'
@ -115,16 +115,16 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
url += '/?username={}&password={}'.format(username, password)
logg.info('ussd service url {}'.format(url))
logg.info('ussd phone {}'.format(phone))
logg.info('ussd phone {}'.format(phone_number))
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': service_code,
'phoneNumber': phone,
'phoneNumber': phone_number,
'text': service_code,
}
req = urllib.request.Request(url)
req = request.Request(url)
req.method = 'POST'
data_str = urlencode(data)
data_bytes = data_str.encode('utf-8')
@ -134,85 +134,77 @@ def build_ussd_request(phone, host, port, service_code, username, password, ssl=
return req
def register_ussd(i, u):
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
logg.debug('tel {} {}'.format(u.tel, phone))
req = build_ussd_request(
phone,
ussd_host,
ussd_port,
config.get('APP_SERVICE_CODE'),
'',
'',
ussd_ssl
)
response = urllib.request.urlopen(req)
def e164_phone_number(phone_number: str):
phone_object = phonenumbers.parse(phone_number)
return phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
def register_account(person: Person):
phone_number = e164_phone_number(person.tel)
logg.debug(f'tel: {phone_number}')
req = build_ussd_request(args.ussd_host,
'',
phone_number,
args.ussd_port,
valid_service_codes[0],
'',
ussd_ssl)
response = request.urlopen(req)
response_data = response.read().decode('utf-8')
state = response_data[:3]
out = response_data[4:]
logg.debug('ussd reponse: {}'.format(out))
logg.debug(f'ussd response: {response_data[4:]}')
if __name__ == '__main__':
i = 0
j = 0
for x in os.walk(user_old_dir):
for x in os.walk(old_account_dir):
for y in x[2]:
if y[len(y) - 5:] != '.json':
continue
# handle json containing person object
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
file_path = os.path.join(x[0], y)
with open(file_path, 'r') as account_file:
try:
account_data = json.load(account_file)
except json.decoder.JSONDecodeError as e:
logg.error('load error for {}: {}'.format(y, e))
continue
person = Person.deserialize(account_data)
register_account(person)
phone_number = e164_phone_number(person.tel)
s_resolve_phone = celery.signature(
'import_task.resolve_phone', [phone_number], queue=args.q
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
s_person_metadata = celery.signature(
'import_task.generate_person_metadata', [phone_number], queue=args.q
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
s_ussd_data = celery.signature(
'import_task.generate_ussd_data', [phone_number], queue=args.q
)
s_meta.link(s_balance)
s_phone.link(s_meta)
# block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7)
s_preferences_metadata = celery.signature(
'import_task.generate_preferences_data', [], queue=args.q
)
s_pins_data = celery.signature(
'import_task.generate_pins_data', [phone_number], queue=args.q
)
s_opening_balance = celery.signature(
'import_task.opening_balance_tx', [phone_number, i], queue=args.q
)
celery.chain(s_resolve_phone,
s_person_metadata,
s_ussd_data,
s_preferences_metadata,
s_pins_data,
s_opening_balance).apply_async(countdown=7)
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
sys.stdout.write('imported: {} {}'.format(i, person).ljust(200) + "\r\n")
j += 1
if j == batch_size:
time.sleep(batch_delay)
if j == args.batch_size:
time.sleep(args.batch_delay)
j = 0

View File

@ -1,67 +1,67 @@
# standard imports
import argparse
import json
import csv
import logging
import os
import psycopg2
# external imports
import celery
from confini import Config
# local imports
default_config_dir = './config'
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
arg_parser = argparse.ArgumentParser(description='Pins import script.')
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use.')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration.')
arg_parser.add_argument('import_dir', default='out', type=str, help='user export directory')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config file')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='Task queue')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config_dir = args.c
config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config = Config(args.c, args.env_prefix)
config.process()
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
config.censor('PASSWORD', 'DATABASE')
logg.debug(f'config loaded from {args.c}:\n{config}')
ussd_data_dir = os.path.join(args.user_dir, 'ussd')
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
def main():
with open(f'{args.import_dir}/ussd_data.csv') as ussd_data_file:
ussd_data = [tuple(row) for row in csv.reader(ussd_data_file)]
db_conn = psycopg2.connect(
database=config.get('DATABASE_NAME'),
host=config.get('DATABASE_HOST'),
port=config.get('DATABASE_PORT'),
user=config.get('DATABASE_USER'),
password=config.get('DATABASE_PASSWORD')
)
db_cursor = db_conn.cursor()
sql = 'UPDATE account SET status = %s, preferred_language = %s WHERE phone_number = %s'
for element in ussd_data:
status = 2 if int(element[1]) == 1 else 1
preferred_language = element[2]
phone_number = element[0]
db_cursor.execute(sql, (status, preferred_language, phone_number))
logg.debug(f'Updating account:{phone_number} with: preferred language: {preferred_language} status: {status}.')
db_conn.commit()
db_cursor.close()
db_conn.close()
if __name__ == '__main__':
for x in os.walk(ussd_data_dir):
for y in x[2]:
if y[len(y) - 5:] == '.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
ussd_data = json.load(f)
logg.debug(f'LOADING USSD DATA: {ussd_data}')
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
s_set_ussd_data = celery.signature(
'import_task.set_ussd_data',
[db_configs, ussd_data]
)
s_set_ussd_data.apply_async(queue='cic-import-ussd')
main()

View File

@ -1,27 +1,4 @@
[app]
ALLOWED_IP=0.0.0.0/0
LOCALE_FALLBACK=en
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =
[keystore]
file_path = keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c
allowed_ip=0.0.0.0/0
max_body_length=1024
password_pepper=

View File

@ -1,10 +1,10 @@
[database]
NAME=sempo
USER=postgres
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
POOL_SIZE=1
name=cic_ussd
user=postgres
password=
host=localhost
port=5432
engine=postgresql
driver=psycopg2
debug=0
pool_size=1

View File

@ -0,0 +1,5 @@
[ussd]
menu_file=data/ussd_menu.json
service_code=*483*46#,*483*061#,*384*96#
user =
pass =

View File

@ -1,91 +0,0 @@
# standard imports
import argparse
import json
import logging
import os
import uuid
# third-party imports
import bcrypt
import celery
import confini
import phonenumbers
import random
from cic_types.models.person import Person
from cryptography.fernet import Fernet
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(script_dir, 'config'))
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='Config dir')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('--userdir', type=str, help='path to users export dir tree')
arg_parser.add_argument('pins_dir', type=str, help='path to pin export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config = confini.Config(args.c, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
logg.info('loaded config\n{}'.format(config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
user_dir = args.userdir
pins_dir = args.pins_dir
def generate_password_hash():
key = Fernet.generate_key()
fnt = Fernet(key)
pin = str(random.randint(1000, 9999))
return fnt.encrypt(bcrypt.hashpw(pin.encode('utf-8'), bcrypt.gensalt())).decode()
user_old_dir = os.path.join(user_dir, 'old')
logg.debug(f'reading user data from: {user_old_dir}')
pins_file = open(f'{pins_dir}/pins.csv', 'w')
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
# skip non-json files
if y[len(y) - 5:] != '.json':
continue
# define file path for
filepath = None
if y[:15] != '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
password_hash = uuid.uuid4().hex
pins_file.write(f'{phone},{password_hash}\n')
logg.info(f'Writing phone: {phone}, password_hash: {password_hash}')
pins_file.close()

View File

@ -9,7 +9,9 @@ COPY package.json \
package-lock.json \
.
RUN --mount=type=cache,mode=0755,target=/root/node_modules npm install
RUN npm ci --production
#RUN --mount=type=cache,mode=0755,target=/root/node_modules npm install
COPY requirements.txt .

View File

@ -0,0 +1,60 @@
#!/usr/bin/env bash
set -e
echo "Creating seed data..."
python create_import_users.py -vv --dir "$IMPORT_DIR" "$ACCOUNT_COUNT"
wait $!
echo "Purge tasks from celery worker"
celery -A cic_ussd.import_task purge -Q "$CELERY_QUEUE" --broker redis://"$REDIS_HOST":"$REDIS_PORT" -f
echo "Start celery work and import balance job"
if [ "$INCLUDE_BALANCES" != "y" ]
then
echo "Running worker without opening balance transactions"
TARGET_TX_COUNT=$ACCOUNT_COUNT
python cic_ussd/import_balance.py -vv -c "$CONFIG" -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --token-symbol "$TOKEN_SYMBOL" -y "$KEYSTORE_PATH" "$IMPORT_DIR" &
else
echo "Running worker with opening balance transactions"
TARGET_TX_COUNT=$((ACCOUNT_COUNT*2))
python cic_ussd/import_balance.py -vv -c "$CONFIG" -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --include-balances --token-symbol "$TOKEN_SYMBOL" -y "$KEYSTORE_PATH" "$IMPORT_DIR" &
fi
until [ -f ./cic-import-ussd.pid ]
do
echo "Polling for celery worker pid file..."
sleep 1
done
IMPORT_BALANCE_JOB=$(<cic-import-ussd.pid)
echo "Start import users job"
if [ "$USSD_SSL" == "y" ]
then
echo "Targeting secure ussd-user server"
python cic_ussd/import_users.py -vv -c "$CONFIG" --ussd-host "$USSD_HOST" --ussd-port "$USSD_PORT" "$IMPORT_DIR"
else
python cic_ussd/import_users.py -vv -c "$CONFIG" --ussd-host "$USSD_HOST" --ussd-port "$USSD_PORT" --ussd-no-ssl "$IMPORT_DIR"
fi
echo "Waiting for import balance job to complete ..."
tail --pid="$IMPORT_BALANCE_JOB" -f /dev/null
set -e
echo "Importing pins"
python cic_ussd/import_pins.py -c "$CONFIG" -vv "$IMPORT_DIR"
set +e
wait $!
set -e
echo "Importing ussd data"
python cic_ussd/import_ussd_data.py -c "$CONFIG" -vv "$IMPORT_DIR"
set +e
wait $!
echo "Importing person metadata"
node cic_meta/import_meta.js "$IMPORT_DIR" "$ACCOUNT_COUNT"
echo "Import preferences metadata"
node cic_meta/import_meta_preferences.js "$IMPORT_DIR" "$ACCOUNT_COUNT"
CIC_NOTIFY_DATABASE=postgres://$DATABASE_USER:$DATABASE_PASSWORD@$DATABASE_HOST:$DATABASE_PORT/$NOTIFY_DATABASE_NAME
NOTIFICATION_COUNT=$(psql -qtA "$CIC_NOTIFY_DATABASE" -c 'SELECT COUNT(message) FROM notification WHERE message IS NOT NULL')
while [[ "$NOTIFICATION_COUNT" < "$TARGET_TX_COUNT" ]]
do
NOTIFICATION_COUNT=$(psql -qtA "$CIC_NOTIFY_DATABASE" -c 'SELECT COUNT(message) FROM notification WHERE message IS NOT NULL')
sleep 5
echo "Notification count is: ${NOTIFICATION_COUNT}. Checking after 5 ..."
done
python verify.py -c "$CONFIG" -v -p "$ETH_PROVIDER" -r "$CIC_REGISTRY_ADDRESS" --exclude "$EXCLUSIONS" --token-symbol "$TOKEN_SYMBOL" "$IMPORT_DIR"

File diff suppressed because it is too large Load Diff

View File

@ -197,9 +197,10 @@ def send_ussd_request(address, data_dir):
phone = p.tel
session = uuid.uuid4().hex
valid_service_codes = config.get('USSD_SERVICE_CODE').split(",")
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'serviceCode': valid_service_codes[0],
'phoneNumber': phone,
'text': '',
}

View File

@ -13,20 +13,8 @@ networks:
name: cic-network
services:
# eth:
# image: trufflesuite/ganache-cli
# ports:
# - ${HTTP_PORT_ETH:-8545}
# - ${WS_PORT_ETH:-8546}
# # Note! -e switch doesnt work, whatever you put there, it will be 100
# command: "-i 8996 -e 1000 -l 90000000 \
# -m '${DEV_MNEMONIC:-\"history stumble mystery avoid embark arrive mom foil pledge keep grain dice\"}' \
# -v --db /tmp/cic/ganache/ganache.db \
# --noVMErrorsOnRPCResponse --allowUnlimitedContractSize"
# volumes:
# - ganache-db:/tmp/cic/ganache
eth:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/bloxberg-node:${TAG:-latest}
build:
context: apps/bloxbergValidatorSetup
restart: unless-stopped
@ -71,6 +59,7 @@ services:
- bee-data:/tmp/cic/bee
contract-migration:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/contract-migration:${TAG:-latest}
profiles:
- migrations
build:
@ -129,6 +118,7 @@ services:
- contract-config:/tmp/cic/config
cic-cache-tracker:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-cache:${TAG:-latest}
profiles:
- cache
build:
@ -170,6 +160,7 @@ services:
- contract-config:/tmp/cic/config/:ro
cic-cache-tasker:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-cache:${TAG:-latest}
profiles:
- cache
build:
@ -210,6 +201,7 @@ services:
- contract-config:/tmp/cic/config/:ro
cic-cache-server:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-cache:${TAG:-latest}
profiles:
- cache
build:
@ -245,6 +237,7 @@ services:
cic-eth-tasker:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-eth:${TAG:-latest}
build:
context: apps/cic-eth
dockerfile: docker/Dockerfile
@ -298,6 +291,7 @@ services:
# command: [/bin/sh, "./start_tasker.sh", -q, cic-eth, -vv ]
cic-eth-tracker:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-eth:${TAG:-latest}
build:
context: apps/cic-eth
dockerfile: docker/Dockerfile
@ -342,6 +336,7 @@ services:
cic-eth-dispatcher:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-eth:${TAG:-latest}
build:
context: apps/cic-eth
dockerfile: docker/Dockerfile
@ -386,6 +381,7 @@ services:
cic-eth-retrier:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-eth:${TAG:-latest}
build:
context: apps/cic-eth
dockerfile: docker/Dockerfile
@ -433,6 +429,7 @@ services:
cic-notify-tasker:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-notify:${TAG:-latest}
build:
context: apps/cic-notify
dockerfile: docker/Dockerfile
@ -461,6 +458,7 @@ services:
cic-meta-server:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-meta:${TAG:-latest}
profiles:
- custodial-meta
hostname: meta
@ -496,6 +494,7 @@ services:
# command: "/root/start_server.sh -vv"
cic-user-ussd-server:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-ussd:${TAG:-latest}
profiles:
- custodial-ussd
build:
@ -528,6 +527,7 @@ services:
command: "/root/start_cic_user_ussd_server.sh -vv"
cic-user-server:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-ussd:${TAG:-latest}
profiles:
- custodial-ussd
build:
@ -553,6 +553,7 @@ services:
command: "/root/start_cic_user_server.sh -vv"
cic-user-tasker:
image: registry.gitlab.com/grassrootseconomics/cic-internal-integration/cic-ussd:${TAG:-latest}
profiles:
- custodial-ussd
build:

9
scripts/build-push.sh Executable file
View File

@ -0,0 +1,9 @@
#! /usr/bin/env sh
# Exit in case of error
set -e
TAG=${TAG?Variable not set} \
sh ./scripts/build.sh
docker-compose -f docker-compose.yml push

9
scripts/build.sh Executable file
View File

@ -0,0 +1,9 @@
#! /usr/bin/env sh
# Exit in case of error
set -e
TAG=${TAG?Variable not set} \
docker-compose \
-f docker-compose.yml \
build

15
scripts/test-local.sh Executable file
View File

@ -0,0 +1,15 @@
#! /usr/bin/env bash
# Exit in case of error
set -e
docker-compose down -v --remove-orphans # Remove possibly previous broken stacks left hanging after an error
if [ $(uname -s) = "Linux" ]; then
echo "Remove __pycache__ files"
sudo find . -type d -name __pycache__ -exec rm -r {} \+
fi
docker-compose build
docker-compose up -d
docker-compose exec -T backend bash /app/tests-start.sh "$@"