Compare commits

..

12 Commits

Author SHA1 Message Date
8c453a9190 Fix minor syntactical error. 2021-04-07 10:15:41 +03:00
b101496402 Add top level phone number validation. 2021-04-07 10:15:15 +03:00
nolash
80dc84eb3a Merge remote-tracking branch 'origin/master' into lash/meta-index-phone 2021-04-07 08:31:27 +02:00
Louis Holbrook
5a4e0b8eba Merge branch 'lash/external-notify-tasks' into 'master'
Discover queue for external tasks

See merge request grassrootseconomics/cic-internal-integration!89
2021-04-07 06:31:26 +00:00
Louis Holbrook
226699568f Discover queue for external tasks 2021-04-07 06:31:26 +00:00
nolash
fe07d28193 Swap key and value on phone index 2021-04-07 08:29:12 +02:00
Louis Holbrook
ec2b0e56e5 Merge branch 'lash/add-missing-state-file' into 'master'
Add stat file

See merge request grassrootseconomics/cic-internal-integration!92
2021-04-07 06:26:44 +00:00
nolash
6ffaca5207 Add stat file 2021-04-07 08:24:10 +02:00
nolash
aed58e62df Add phone metadata generator 2021-04-07 08:21:15 +02:00
Louis Holbrook
5c6375c9ec Merge branch 'lash/calculate-block-time' into 'master'
Add block time calc to retry, tracker

See merge request grassrootseconomics/cic-internal-integration!90
2021-04-06 19:11:43 +00:00
Louis Holbrook
99f55f01ed Add block time calc to retry, tracker 2021-04-06 19:11:42 +00:00
Louis Holbrook
086308fdb8 Merge branch 'lash/remove-stray-stat' into 'master'
Remove stray chainlib stat

See merge request grassrootseconomics/cic-internal-integration!88
2021-04-06 18:11:31 +00:00
26 changed files with 285 additions and 87 deletions

View File

@@ -1,13 +1,13 @@
cic-base~=0.1.2a58
cic-base~=0.1.2a62
alembic==1.4.2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.4a10
cic-eth-registry~=0.5.4a12
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainlib~=0.0.2a2
chainlib~=0.0.2a5
chainsyncer~=0.0.1a21

View File

@@ -20,6 +20,7 @@ from cic_eth.admin.ctrl import lock_send
from cic_eth.db.enum import LockEnum
from cic_eth.runnable.daemons.filters.straggler import StragglerFilter
from cic_eth.sync.retry import RetrySyncer
from cic_eth.stat import init_chain_stat
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -71,57 +72,21 @@ RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='def
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
## TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here
#def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
# tx_dict = get_tx(tx_hash)
# tx = unpack(tx_dict['signed_tx'], chain_spec)
# logg.debug('submitting tx {} for retry'.format(tx_hash))
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
# [
# tx_hash,
# chain_str,
# LockEnum.QUEUE,
# tx['from'],
# ],
# queue=queue,
# )
## s_resume = celery.signature(
## 'cic_eth.eth.tx.resume_tx',
## [
## chain_str,
## ],
## queue=queue,
## )
#
## s_retry_status = celery.signature(
## 'cic_eth.queue.state.set_ready',
## [],
## queue=queue,
## )
# s_resend = celery.signature(
# 'cic_eth.eth.gas.resend_with_higher_gas',
# [
# chain_str,
# ],
# queue=queue,
# )
#
# #s_resume.link(s_retry_status)
# #s_check.link(s_resume)
# s_check.link(s_resend)
# s_check.apply_async()
def main():
conn = RPCConnection.connect(chain_spec, 'default')
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None:
stat = init_chain_stat(conn)
loop_interval = stat.block_average()
syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE'))
syncer.backend.set(0, 0)
fltr = StragglerFilter(chain_spec, queue=queue)
syncer.add_filter(fltr)
syncer.loop(float(straggler_delay), conn)
syncer.loop(int(loop_interval), conn)
if __name__ == '__main__':

View File

@@ -7,7 +7,7 @@ import argparse
import sys
import re
# third-party imports
# external imports
import confini
import celery
import rlp
@@ -42,6 +42,7 @@ from cic_eth.runnable.daemons.filters import (
RegistrationFilter,
TransferAuthFilter,
)
from cic_eth.stat import init_chain_stat
script_dir = os.path.realpath(os.path.dirname(__file__))
@@ -78,6 +79,11 @@ def main():
block_current = int(r, 16)
block_offset = block_current + 1
loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None:
stat = init_chain_stat(rpc, block_start=block_current)
loop_interval = stat.block_average()
logg.debug('starting at block {}'.format(block_offset))
syncers = []
@@ -140,7 +146,8 @@ def main():
for cf in callback_filters:
syncer.add_filter(cf)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
#r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
r = syncer.loop(int(loop_interval), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1

View File

@@ -0,0 +1,33 @@
# standard imports
import logging
# external imports
from chainlib.stat import ChainStat
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
logg = logging.getLogger().getChild(__name__)
BLOCK_SAMPLES = 10
def init_chain_stat(rpc, block_start=0):
stat = ChainStat()
if block_start == 0:
o = block_latest()
r = rpc.do(o)
block_start = int(r, 16)
for i in range(BLOCK_SAMPLES):
o = block_by_number(block_start-10+i)
block_src = rpc.do(o)
logg.debug('block {}'.format(block_src))
block = Block(block_src)
stat.block_apply(block)
logg.debug('calculated block time {} from {} block samples'.format(stat.block_average(), BLOCK_SAMPLES))
return stat

View File

@@ -2,3 +2,4 @@
registry_address =
chain_spec = evm:bloxberg:8996
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C
tx_retry_delay = 20

View File

@@ -1,2 +1,2 @@
[SYNCER]
loop_interval = 1
loop_interval =

View File

@@ -1,2 +1,2 @@
[SYNCER]
loop_interval = 1
loop_interval =

View File

@@ -29,7 +29,7 @@ RUN /usr/local/bin/python -m pip install --upgrade pip
# python merge_requirements.py | tee merged_requirements.txt
#RUN cd cic-base && \
# pip install $pip_extra_index_url_flag -r ./merged_requirements.txt
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a61
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a62
COPY cic-eth/scripts/ scripts/
COPY cic-eth/setup.cfg cic-eth/setup.py ./

View File

@@ -1,4 +1,4 @@
cic-base~=0.1.2a60
cic-base~=0.1.2a62
celery==4.4.7
crypto-dev-signer~=0.4.14a17
confini~=0.3.6rc3
@@ -16,7 +16,7 @@ semver==2.13.0
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.1a7
chainlib~=0.0.2a4
chainlib~=0.0.2a5
hexathon~=0.0.1a7
chainsyncer~=0.0.1a21
chainqueue~=0.0.1a7

View File

@@ -3,6 +3,7 @@ import logging
import re
# third-party imports
from celery.app.control import Inspect
import celery
# local imports
@@ -15,6 +16,29 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
@@ -22,17 +46,9 @@ class Api:
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
registered_tasks = app.tasks
self.sms_tasks = []
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
@@ -44,12 +60,17 @@ class Api:
:rtype: Celery.Task
"""
signatures = []
for task in self.sms_tasks:
signature = celery.signature(task)
for q in self.sms_tasks:
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
signatures.append(signature)
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result
t = celery.group(signatures)()
return t

View File

@@ -0,0 +1,76 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile
# external imports
import celery
import confini
# local imports
from cic_notify.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
if __name__ == '__main__':
a = Api()
t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))

View File

@@ -6,7 +6,7 @@ RUN apt-get update && \
WORKDIR /usr/src/cic-notify
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a44
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a62
COPY cic-notify/setup.cfg \
cic-notify/setup.py \

View File

@@ -1 +1 @@
cic_base[full_graph]~=0.1.2a46
cic_base[full_graph]~=0.1.2a61

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= 0.4.0a2
version= 0.4.0a3
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -45,3 +45,4 @@ testing =
[options.entry_points]
console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main

View File

@@ -6,6 +6,9 @@ MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json

View File

@@ -18,7 +18,7 @@ class ActionDataNotFoundError(OSError):
pass
class UserMetadataNotFoundError(OSError):
class MetadataNotFoundError(OSError):
"""Raised when metadata is expected but not available in cache."""
pass
@@ -31,3 +31,10 @@ class UnsupportedMethodError(OSError):
class CachedDataNotFoundError(OSError):
"""Raised when the method passed to the make request function is unsupported."""
pass
class MetadataStoreError(Exception):
"""Raised when metadata storage fails"""
pass

View File

@@ -3,7 +3,10 @@
# third-party imports
import requests
from chainlib.eth.address import to_checksum
from hexathon import add_0x
from hexathon import (
add_0x,
strip_0x,
)
# local imports
from cic_ussd.error import UnsupportedMethodError
@@ -40,4 +43,4 @@ def blockchain_address_to_metadata_pointer(blockchain_address: str):
:return:
:rtype:
"""
return bytes.fromhex(blockchain_address[2:])
return bytes.fromhex(strip_0x(blockchain_address))

View File

@@ -0,0 +1,47 @@
# standard imports
import logging
import os
# external imports
from cic_types.models.person import generate_metadata_pointer
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
# local imports
from cic_ussd.error import MetadataStoreError
logg = logging.getLogger().getChild(__name__)
class PhonePointerMetadata:
base_url = None
def __init__(self, identifier: bytes, engine: str):
"""
:param identifier:
:type identifier:
"""
self.headers = {
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
self.identifier = identifier
self.metadata_pointer = generate_metadata_pointer(
identifier=self.identifier,
cic_type='cic.phone'
)
if self.base_url:
self.url = os.path.join(self.base_url, self.metadata_pointer)
self.engine = engine
def create(self, data: str):
try:
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
metadata = result.content
self.edit(data=metadata, engine=self.engine)
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise MetadataStoreError(error)

View File

@@ -12,6 +12,7 @@ from cic_ussd.chain import Chain
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
from cic_ussd.error import MetadataStoreError
logg = logging.getLogger()
@@ -28,7 +29,7 @@ class UserMetadata:
:param identifier:
:type identifier:
"""
self. headers = {
self.headers = {
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
@@ -49,7 +50,7 @@ class UserMetadata:
logg.info(f'Get sign material response status: {result.status_code}')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
raise MetadataStoreError(error)
def edit(self, data: bytes, engine: str):
"""
@@ -79,7 +80,7 @@ class UserMetadata:
logg.info(f'Signed content submission status: {result.status_code}.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
raise MetadataStoreError(error)
def query(self):
result = make_request(method='GET', url=self.url)
@@ -99,4 +100,4 @@ class UserMetadata:
logg.info('The data is not available and might need to be added.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
raise MetadataNotFoundError(error)

View File

@@ -15,7 +15,7 @@ from cic_ussd.balance import BalanceManager, compute_operational_balance, get_ca
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.phone_number import get_user_by_phone_number
@@ -235,7 +235,7 @@ def process_display_user_metadata(user: User, display_key: str):
products=products
)
else:
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {user.blockchain_address}')
raise MetadataNotFoundError(f'Expected user metadata but found none in cache for key: {user.blockchain_address}')
def process_account_statement(user: User, display_key: str, ussd_session: dict):

View File

@@ -27,6 +27,7 @@ from cic_ussd.metadata.user import UserMetadata
from cic_ussd.operations import (define_response_with_content,
process_menu_interaction_requests,
define_multilingual_responses)
from cic_ussd.phone_number import process_phone_number
from cic_ussd.redis import InMemoryStore
from cic_ussd.requests import (get_request_endpoint,
get_request_method,
@@ -151,6 +152,10 @@ def application(env, start_response):
external_session_id = post_data.get('sessionId')
user_input = post_data.get('text')
# add validation for phone number
if phone_number:
phone_number = process_phone_number(phone_number=phone_number, region=config.get('PHONE_NUMBER_REGION'))
# validate ip address
if not check_ip(config=config, env=env):
start_response('403 Sneaky, sneaky', errors_headers)

View File

@@ -11,7 +11,7 @@ from cic_types.models.person import generate_vcard_from_contact_data, manage_ide
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import User
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.operations import save_to_in_memory_ussd_session_data
from cic_ussd.redis import get_cached_data
@@ -181,7 +181,7 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
user_metadata = get_cached_data(key=key)
if not user_metadata:
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
raise MetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
given_name = ussd_session.get('session_data').get('given_name')
family_name = ussd_session.get('session_data').get('family_name')

View File

@@ -5,6 +5,7 @@ import celery
import sqlalchemy
# local imports
from cic_ussd.error import MetadataStoreError
class CriticalTask(celery.Task):
@@ -18,3 +19,9 @@ class CriticalSQLAlchemyTask(CriticalTask):
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
)
class CriticalMetadataTask(CriticalTask):
autoretry_for = (
MetadataStoreError,
)

View File

@@ -53,6 +53,18 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
session.add(user)
session.commit()
session.close()
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'cic_ussd.tasks.metadata.add_phone_pointer',
[
result,
phone_number,
'pgp',
],
queue=queue,
)
s.apply_async()
# expire cache
cache.expire(task_id, timedelta(seconds=180))

View File

@@ -8,6 +8,8 @@ import celery
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.phone import PhonePointerMetadata
from cic_ussd.tasks.base import CriticalMetadataTask
celery_app = celery.current_app
logg = logging.getLogger()
@@ -46,3 +48,10 @@ def edit_user_metadata(blockchain_address: str, data: bytes, engine: str):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.edit(data=data, engine=engine)
@celery_app.task(bind=True, base=CriticalMetadataTask)
def add_phone_pointer(blockchain_address: str, phone: str, engine: str):
stripped_address = strip_0x(blockchain_address)
phone_metadata_client = PhonePointerMetadata(identifier=phone, engine=engine)
phone_metadata_client.create(data=stripped_address)

View File

@@ -57,7 +57,7 @@ WORKDIR /home/grassroots
USER grassroots
ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433
ARG cic_base_version=0.1.2a61
ARG cic_base_version=0.1.2a62
ARG cic_eth_version=0.11.0b1
ARG sarafu_faucet_version=0.0.2a19
ARG cic_contracts_version=0.0.2a2