Compare commits

..

43 Commits

Author SHA1 Message Date
Spencer Ofwiti
1c846234a3 Remove leading '0x' from saved address. 2021-04-22 14:58:26 +03:00
Spencer Ofwiti
d0e39a6fb2 Refactor import meta phone to set account address. 2021-04-21 15:04:04 +03:00
Spencer Ofwiti
6ac04cbdc6 Merge branch 'master' into spencer/refactor-meta-library 2021-04-21 09:38:22 +03:00
Louis Holbrook
0123ce13ea Merge branch 'lash/settable-gas-price' into 'master'
Adapt deployment to Bloxberg

See merge request grassrootseconomics/cic-internal-integration!99
2021-04-21 05:46:42 +00:00
Louis Holbrook
03b3e8cd3f Adapt deployment to Bloxberg 2021-04-21 05:46:42 +00:00
Spencer Ofwiti
df8a031b5a Merge branch 'master' into spencer/refactor-meta-library
# Conflicts:
#	apps/contract-migration/scripts/package-lock.json
#	apps/contract-migration/scripts/package.json
2021-04-20 18:53:33 +03:00
Louis Holbrook
3ee84f780e Merge branch 'lash/cic-cache-syncer-backend-mixup' into 'master'
CIC-cache backend syncer mixup

See merge request grassrootseconomics/cic-internal-integration!106
2021-04-20 13:25:03 +00:00
Louis Holbrook
95269f69ed CIC-cache backend syncer mixup 2021-04-20 13:25:02 +00:00
Spencer Ofwiti
266edd20c8 Merge branch 'master' into spencer/refactor-meta-library 2021-04-20 08:06:30 +03:00
621780e9b6 Update .cic-template.yml 2021-04-19 17:56:19 +00:00
Spencer Ofwiti
9310f7cce1 Merge branch 'master' into spencer/refactor-meta-library 2021-04-19 14:55:26 +03:00
eecdca1a55 Merge branch 'philip/ussd-db-fixes' into 'master'
Philip/ussd db fixes

See merge request grassrootseconomics/cic-internal-integration!107
2021-04-19 08:44:41 +00:00
6fef0ecec9 Philip/ussd db fixes 2021-04-19 08:44:40 +00:00
Louis Holbrook
6b89a2da89 Merge branch 'lash/chainlib-regression' into 'master'
Correct chainlib import paths

See merge request grassrootseconomics/cic-internal-integration!101
2021-04-16 21:44:14 +00:00
Louis Holbrook
254f2a266b Correct chainlib import paths 2021-04-16 21:44:14 +00:00
ba18914498 Merge branch 'philip/fix-filter-callbacks' into 'master'
Philip/fix filter callbacks

See merge request grassrootseconomics/cic-internal-integration!95
2021-04-16 20:24:07 +00:00
f410e8b7e3 Philip/fix filter callbacks 2021-04-16 20:24:07 +00:00
01454c9ac0 Merge branch 'add-chainsync-db' into 'master'
Add chainsync db

See merge request grassrootseconomics/cic-internal-integration!105
2021-04-15 21:22:07 +00:00
462d7046ed Add chainsync db 2021-04-15 21:22:07 +00:00
f91b491251 Merge branch 'ida/changes-to-args-commandline' into 'master'
Ida/changes to args commandline

See merge request grassrootseconomics/cic-internal-integration!104
2021-04-15 17:04:17 +00:00
0de79521dc Ida/changes to args commandline 2021-04-15 17:04:16 +00:00
nolash
22ec8e2e0e Update sql backend symbol name, deps 2021-04-15 18:04:47 +02:00
Louis Holbrook
a8529ae2ef Merge branch 'lash/improve-cic-cache-service' into 'master'
Iimprove cic cache service

See merge request grassrootseconomics/cic-internal-integration!100
2021-04-15 14:02:11 +00:00
Louis Holbrook
98ddf56a1d Iimprove cic cache service 2021-04-15 14:02:09 +00:00
Spencer Ofwiti
395080340b Fix depenndency import. 2021-04-15 11:12:20 +03:00
Spencer Ofwiti
2f8531c195 Refactor scripts to use imports from crdt-meta. 2021-04-14 16:42:41 +03:00
Spencer Ofwiti
426818baa1 Bump dependency versions. 2021-04-14 16:37:55 +03:00
Spencer Ofwiti
5853aab840 Merge branch 'master' into spencer/refactor-meta-library 2021-04-14 13:59:04 +03:00
bee602b16a Merge branch 'philip/leaner-metadata-handling' into 'master'
Philip/leaner metadata handling

See merge request grassrootseconomics/cic-internal-integration!94
2021-04-14 09:00:11 +00:00
c67274846f Philip/leaner metadata handling 2021-04-14 09:00:10 +00:00
Louis Holbrook
48570b2338 Merge branch 'lash/update-syncer-imports' into 'master'
Update syncer imports

See merge request grassrootseconomics/cic-internal-integration!97
2021-04-14 08:17:48 +00:00
Louis Holbrook
c80b8771b9 Update syncer imports 2021-04-14 08:17:47 +00:00
Spencer Ofwiti
edbd7ffdf5 Remove library files into crdt-meta. 2021-04-14 09:48:29 +03:00
Louis Holbrook
6c6db7bc7b Merge branch 'lash/cache-tracker-history' into 'master'
Fix missing history syncer in cic-cache-tracker

See merge request grassrootseconomics/cic-internal-integration!96
2021-04-13 14:48:25 +00:00
nolash
bb941acd7e Fix missing history syncer in cic-cache-tracker 2021-04-13 15:31:40 +02:00
Louis Holbrook
7dee7de26e Merge branch 'lash/import-ussd' into 'master'
Implement migration script with ussd and notify

See merge request grassrootseconomics/cic-internal-integration!87
2021-04-09 13:00:15 +00:00
Louis Holbrook
7b16a36a62 Implement migration script with ussd and notify 2021-04-09 13:00:15 +00:00
Louis Holbrook
5a4e0b8eba Merge branch 'lash/external-notify-tasks' into 'master'
Discover queue for external tasks

See merge request grassrootseconomics/cic-internal-integration!89
2021-04-07 06:31:26 +00:00
Louis Holbrook
226699568f Discover queue for external tasks 2021-04-07 06:31:26 +00:00
Louis Holbrook
ec2b0e56e5 Merge branch 'lash/add-missing-state-file' into 'master'
Add stat file

See merge request grassrootseconomics/cic-internal-integration!92
2021-04-07 06:26:44 +00:00
nolash
6ffaca5207 Add stat file 2021-04-07 08:24:10 +02:00
Louis Holbrook
5c6375c9ec Merge branch 'lash/calculate-block-time' into 'master'
Add block time calc to retry, tracker

See merge request grassrootseconomics/cic-internal-integration!90
2021-04-06 19:11:43 +00:00
Louis Holbrook
99f55f01ed Add block time calc to retry, tracker 2021-04-06 19:11:42 +00:00
143 changed files with 2963 additions and 3525 deletions

4
.gitignore vendored
View File

@@ -1,2 +1,6 @@
service-configs/*
!service-configs/.gitkeep
**/node_modules/
__pycache__
*.pyc
*.o

View File

@@ -67,6 +67,7 @@ class ERC20TransferFilter(SyncFilter):
tx.status == Status.SUCCESS,
block.timestamp,
)
db_session.flush()
#db_session.flush()
db_session.commit()
return True

View File

@@ -26,9 +26,10 @@ from chainlib.eth.block import (
from hexathon import (
strip_0x,
)
from chainsyncer.backend import SyncerBackend
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
@@ -70,19 +71,21 @@ def main():
syncers = []
#if SyncerBackend.first(chain_spec):
# backend = SyncerBackend.initial(chain_spec, block_offset)
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')

View File

@@ -17,7 +17,7 @@ RUN apt-get update && \
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a58
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a76
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
@@ -47,6 +47,9 @@ RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server

View File

@@ -0,0 +1,6 @@
#!/bin/bash
set -e
>&2 echo executing database migration
python scripts/migrate.py -c /usr/local/etc/cic-cache --migrations-dir /usr/local/share/cic-cache/alembic -vv
set +e

View File

@@ -0,0 +1,5 @@
#!/bin/bash
. ./db.sh
/usr/local/bin/cic-cache-trackerd $@

View File

@@ -1,13 +1,12 @@
cic-base~=0.1.2a62
cic-base~=0.1.2a77
alembic==1.4.2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.4a12
cic-eth-registry~=0.5.4a16
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainlib~=0.0.2a5
chainsyncer~=0.0.1a21
chainsyncer[sql]~=0.0.2a2

View File

@@ -10,6 +10,7 @@ from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger()
@@ -64,6 +65,7 @@ class SessionBase(Model):
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
@@ -74,17 +76,22 @@ class SessionBase(Model):
echo=debug,
)
else:
if debug:
if pool_size == 0:
logg.info('db using nullpool')
poolclass = NullPool
elif debug:
logg.info('db using assertion pool')
poolclass = AssertionPool
else:
logg.info('db using static pool')
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db not poolable')
e = create_engine(
dsn,
echo=debug,

View File

@@ -1,6 +1,10 @@
# extended imports
# external imports
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status as TxStatus
from cic_eth_registry.erc20 import ERC20Token
# local imports
from cic_eth.ext.address import translate_address
class ExtendedTx:
@@ -27,12 +31,12 @@ class ExtendedTx:
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
def set_actors(self, sender, recipient, trusted_declarator_addresses=None, caller_address=ZERO_ADDRESS):
self.sender = sender
self.recipient = recipient
if trusted_declarator_addresses != None:
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain_spec)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain_spec)
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain_spec, sender_address=caller_address)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain_spec, sender_address=caller_address)
def set_tokens(self, source, source_value, destination=None, destination_value=None):
@@ -40,8 +44,8 @@ class ExtendedTx:
destination = source
if destination_value == None:
destination_value = source_value
st = ERC20Token(self.rpc, source)
dt = ERC20Token(self.rpc, destination)
st = ERC20Token(self.chain_spec, self.rpc, source)
dt = ERC20Token(self.chain_spec, self.rpc, destination)
self.source_token = source
self.source_token_symbol = st.symbol
self.source_token_name = st.name
@@ -62,10 +66,10 @@ class ExtendedTx:
self.status_code = n
def to_dict(self):
def asdict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'asdict', 'rpc']:
continue
o[attr] = getattr(self, attr)
return o

View File

@@ -15,7 +15,6 @@ from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from chainlib.connection import RPCConnection
from chainsyncer.error import SyncDone
from hexathon import strip_0x
from chainqueue.db.enum import (
StatusEnum,
@@ -153,10 +152,7 @@ class DispatchSyncer:
def main():
syncer = DispatchSyncer(chain_spec)
conn = RPCConnection.connect(chain_spec, 'default')
try:
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
except SyncDone as e:
sys.stderr.write("dispatcher done at block {}\n".format(e))
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
sys.exit(0)

View File

@@ -1,7 +1,7 @@
# standard imports
import logging
# third-party imports
# external imports
import celery
from cic_eth_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus
@@ -9,7 +9,13 @@ from chainlib.eth.address import to_checksum_address
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.erc20 import ERC20
from hexathon import strip_0x
from hexathon import (
strip_0x,
add_0x,
)
# TODO: use sarafu_Faucet for both when inheritance has been implemented
from erc20_single_shot_faucet import SingleShotFaucet
from sarafu_faucet import MinterFaucet as Faucet
# local imports
from .base import SyncFilter
@@ -18,65 +24,73 @@ from cic_eth.eth.meta import ExtendedTx
logg = logging.getLogger().getChild(__name__)
def parse_transfer(tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = r[1]
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
return ('transfer', transfer_data)
def parse_transferfrom(tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['from'] = r[0]
transfer_data['to'] = r[1]
transfer_data['value'] = r[2]
transfer_data['token_address'] = tx['to']
return ('transferfrom', transfer_data)
def parse_giftto(tx):
# TODO: broken
logg.error('broken')
return
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
# TODO: would be better to query the gift amount from the block state
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
#transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
transfer_data['value'] = int.from_bytes(bytes.fromhex(strip_0x(l_data)))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum_address(token_address)
return ('tokengift', transfer_data)
class CallbackFilter(SyncFilter):
trusted_addresses = []
def __init__(self, chain_spec, method, queue):
def __init__(self, chain_spec, method, queue, caller_address=ZERO_ADDRESS):
self.queue = queue
self.method = method
self.chain_spec = chain_spec
self.caller_address = caller_address
def parse_transfer(self, tx, conn):
if not tx.payload:
return (None, None)
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = r[1]
transfer_data['from'] = tx.outputs[0]
transfer_data['token_address'] = tx.inputs[0]
return ('transfer', transfer_data)
def parse_transferfrom(self, tx, conn):
if not tx.payload:
return (None, None)
r = ERC20.parse_transfer_from_request(tx.payload)
transfer_data = {}
transfer_data['from'] = r[0]
transfer_data['to'] = r[1]
transfer_data['value'] = r[2]
transfer_data['token_address'] = tx.inputs[0]
return ('transferfrom', transfer_data)
def parse_giftto(self, tx, conn):
if not tx.payload:
return (None, None)
r = Faucet.parse_give_to_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = tx.value
transfer_data['from'] = tx.outputs[0]
#transfer_data['token_address'] = tx.inputs[0]
faucet_contract = tx.inputs[0]
c = SingleShotFaucet(self.chain_spec)
o = c.token(faucet_contract, sender_address=self.caller_address)
r = conn.do(o)
transfer_data['token_address'] = add_0x(c.parse_token(r))
o = c.amount(faucet_contract, sender_address=self.caller_address)
r = conn.do(o)
transfer_data['value'] = c.parse_amount(r)
return ('tokengift', transfer_data)
def call_back(self, transfer_type, result):
logg.debug('result {}'.format(result))
result['chain_spec'] = result['chain_spec'].asdict()
s = celery.signature(
self.method,
[
result,
transfer_type,
int(result['status_code'] == 0),
int(result['status_code'] != 0),
],
queue=self.queue,
)
@@ -92,26 +106,29 @@ class CallbackFilter(SyncFilter):
# s_translate.link(s)
# s_translate.apply_async()
t = s.apply_async()
return s
return t
def parse_data(self, tx):
def parse_data(self, tx, conn):
transfer_type = None
transfer_data = None
# TODO: what's with the mix of attributes and dict keys
logg.debug('have payload {}'.format(tx.payload))
method_signature = tx.payload[:8]
logg.debug('tx status {}'.format(tx.status))
for parser in [
parse_transfer,
parse_transferfrom,
parse_giftto,
self.parse_transfer,
self.parse_transferfrom,
self.parse_giftto,
]:
try:
(transfer_type, transfer_data) = parser(tx)
break
if tx:
(transfer_type, transfer_data) = parser(tx, conn)
if transfer_type == None:
continue
else:
pass
except RequestMismatchException:
continue
@@ -128,7 +145,7 @@ class CallbackFilter(SyncFilter):
transfer_data = None
transfer_type = None
try:
(transfer_type, transfer_data) = self.parse_data(tx)
(transfer_type, transfer_data) = self.parse_data(tx, conn)
except TypeError:
logg.debug('invalid method data length for tx {}'.format(tx.hash))
return
@@ -144,16 +161,17 @@ class CallbackFilter(SyncFilter):
result = None
try:
tokentx = ExtendedTx(conn, tx.hash, self.chain_spec)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses, caller_address=self.caller_address)
tokentx.set_tokens(transfer_data['token_address'], transfer_data['value'])
if transfer_data['status'] == 0:
tokentx.set_status(1)
else:
tokentx.set_status(0)
t = self.call_back(transfer_type, tokentx.to_dict())
logg.info('callback success task id {} tx {}'.format(t, tx.hash))
result = tokentx.asdict()
t = self.call_back(transfer_type, result)
logg.info('callback success task id {} tx {} queue {}'.format(t, tx.hash, t.queue))
except UnknownContractError:
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash))
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tx.queue, tx.method, transfer_data['to'], tx.hash))
def __str__(self):

View File

@@ -39,6 +39,7 @@ from cic_eth.queue import (
from cic_eth.callbacks import (
Callback,
http,
noop,
#tcp,
redis,
)
@@ -91,7 +92,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=50, debug=config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()

View File

@@ -15,7 +15,6 @@ import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
@@ -26,7 +25,7 @@ from chainlib.eth.block import (
from hexathon import (
strip_0x,
)
from chainsyncer.backend import SyncerBackend
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
@@ -43,6 +42,12 @@ from cic_eth.runnable.daemons.filters import (
TransferAuthFilter,
)
from cic_eth.stat import init_chain_stat
from cic_eth.registry import (
connect as connect_registry,
connect_declarator,
connect_token_registry,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
@@ -88,18 +93,18 @@ def main():
syncers = []
#if SyncerBackend.first(chain_spec):
# backend = SyncerBackend.initial(chain_spec, block_offset)
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
syncer_backends.append(SQLBackend.live(chain_spec, block_offset+1))
for syncer_backend in syncer_backends:
try:
@@ -109,6 +114,8 @@ def main():
logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
syncers.append(HeadSyncer(syncer_backend))
connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
@@ -116,6 +123,8 @@ def main():
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec)
CallbackFilter.trusted_addresses = trusted_addresses
callback_filters = []

View File

@@ -0,0 +1,33 @@
# standard imports
import logging
# external imports
from chainlib.stat import ChainStat
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
logg = logging.getLogger().getChild(__name__)
BLOCK_SAMPLES = 10
def init_chain_stat(rpc, block_start=0):
stat = ChainStat()
if block_start == 0:
o = block_latest()
r = rpc.do(o)
block_start = int(r, 16)
for i in range(BLOCK_SAMPLES):
o = block_by_number(block_start-10+i)
block_src = rpc.do(o)
logg.debug('block {}'.format(block_src))
block = Block(block_src)
stat.block_apply(block)
logg.debug('calculated block time {} from {} block samples'.format(stat.block_average(), BLOCK_SAMPLES))
return stat

View File

@@ -4,7 +4,7 @@ import datetime
# external imports
from chainsyncer.driver import HeadSyncer
from chainsyncer.backend import MemBackend
from chainsyncer.backend.memory import MemBackend
from chainsyncer.error import NoBlockForYou
from chainlib.eth.block import (
block_by_number,

View File

@@ -10,7 +10,7 @@ version = (
0,
11,
0,
'beta.1',
'beta.6',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,5 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
POOL_SIZE=50
DEBUG=0

View File

@@ -6,4 +6,5 @@ HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
POOL_SIZE=50
DEBUG=0

View File

@@ -29,7 +29,7 @@ RUN /usr/local/bin/python -m pip install --upgrade pip
# python merge_requirements.py | tee merged_requirements.txt
#RUN cd cic-base && \
# pip install $pip_extra_index_url_flag -r ./merged_requirements.txt
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a62
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a77
COPY cic-eth/scripts/ scripts/
COPY cic-eth/setup.cfg cic-eth/setup.py ./

View File

@@ -1,25 +1,25 @@
cic-base~=0.1.2a62
cic-base~=0.1.2a76
celery==4.4.7
crypto-dev-signer~=0.4.14a17
crypto-dev-signer~=0.4.14b2
confini~=0.3.6rc3
cic-eth-registry~=0.5.4a12
cic-eth-registry~=0.5.4a16
#cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
websockets==8.1
requests~=2.24.0
eth_accounts_index~=0.0.11a7
erc20-transfer-authorization~=0.3.1a3
#simple-rlp==0.1.2
eth_accounts_index~=0.0.11a9
erc20-transfer-authorization~=0.3.1a5
uWSGI==2.0.19.1
semver==2.13.0
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.1a7
chainlib~=0.0.2a5
eth-address-index~=0.1.1a9
chainlib~=0.0.2a13
hexathon~=0.0.1a7
chainsyncer~=0.0.1a21
chainsyncer[sql]~=0.0.2a2
chainqueue~=0.0.1a7
pysha3==1.0.2
coincurve==15.0.0
sarafu-faucet~=0.0.2a19
sarafu-faucet==0.0.2a28
potaahto~=0.0.1a1

View File

@@ -4,4 +4,4 @@ pytest-mock==3.3.1
pytest-cov==2.10.1
eth-tester==0.5.0b3
py-evm==0.3.0a20
giftable-erc20-token==0.0.8a4
giftable-erc20-token==0.0.8a9

View File

@@ -0,0 +1,223 @@
# standard import
import logging
import datetime
import os
# external imports
import pytest
from chainlib.connection import RPCConnection
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.tx import (
receipt,
transaction,
Tx,
)
from chainlib.eth.block import Block
from chainlib.eth.erc20 import ERC20
from sarafu_faucet import MinterFaucet
from eth_accounts_index import AccountRegistry
from potaahto.symbols import snake_and_camel
from hexathon import add_0x
# local imports
from cic_eth.runnable.daemons.filters.callback import CallbackFilter
logg = logging.getLogger()
@pytest.mark.skip()
def test_transfer_tx(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
foo_token,
agent_roles,
token_roles,
contract_roles,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], rpc)
gas_oracle = OverrideGasOracle(conn=rpc, limit=200000)
txf = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], 1024)
r = rpc.do(o)
o = transaction(tx_hash_hex)
r = rpc.do(o)
logg.debug(r)
tx_src = snake_and_camel(r)
tx = Tx(tx_src)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
rcpt = snake_and_camel(r)
tx.apply_receipt(rcpt)
fltr = CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER'])
(transfer_type, transfer_data) = fltr.parse_transfer(tx, eth_rpc)
assert transfer_type == 'transfer'
@pytest.mark.skip()
def test_transfer_from_tx(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
foo_token,
agent_roles,
token_roles,
contract_roles,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], rpc)
gas_oracle = OverrideGasOracle(conn=rpc, limit=200000)
txf = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.approve(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], 1024)
r = rpc.do(o)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], rpc)
txf = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.transfer_from(foo_token, agent_roles['ALICE'], token_roles['FOO_TOKEN_OWNER'], agent_roles['BOB'], 1024)
r = rpc.do(o)
o = transaction(tx_hash_hex)
r = rpc.do(o)
tx_src = snake_and_camel(r)
tx = Tx(tx_src)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
rcpt = snake_and_camel(r)
tx.apply_receipt(rcpt)
fltr = CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER'])
(transfer_type, transfer_data) = fltr.parse_transferfrom(tx, eth_rpc)
assert transfer_type == 'transferfrom'
def test_faucet_gift_to_tx(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
foo_token,
agent_roles,
contract_roles,
faucet,
account_registry,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
gas_oracle = OverrideGasOracle(conn=rpc, limit=800000)
nonce_oracle = RPCNonceOracle(contract_roles['ACCOUNT_REGISTRY_WRITER'], rpc)
txf = AccountRegistry(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.add(account_registry, contract_roles['ACCOUNT_REGISTRY_WRITER'], agent_roles['ALICE'])
r = rpc.do(o)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], rpc)
txf = MinterFaucet(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.give_to(faucet, agent_roles['ALICE'], agent_roles['ALICE'])
r = rpc.do(o)
o = transaction(tx_hash_hex)
r = rpc.do(o)
tx_src = snake_and_camel(r)
tx = Tx(tx_src)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
rcpt = snake_and_camel(r)
tx.apply_receipt(rcpt)
fltr = CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER'])
(transfer_type, transfer_data) = fltr.parse_giftto(tx, eth_rpc)
assert transfer_type == 'tokengift'
assert transfer_data['token_address'] == foo_token
def test_callback_filter(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
foo_token,
token_roles,
agent_roles,
contract_roles,
register_lookups,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], rpc)
gas_oracle = OverrideGasOracle(conn=rpc, limit=200000)
txf = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], 1024)
r = rpc.do(o)
o = transaction(tx_hash_hex)
r = rpc.do(o)
logg.debug(r)
mockblock_src = {
'hash': add_0x(os.urandom(32).hex()),
'number': '0x2a',
'transactions': [tx_hash_hex],
'timestamp': datetime.datetime.utcnow().timestamp(),
}
mockblock = Block(mockblock_src)
tx_src = snake_and_camel(r)
tx = Tx(tx_src, block=mockblock)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
rcpt = snake_and_camel(r)
tx.apply_receipt(rcpt)
fltr = CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER'])
class CallbackMock:
def __init__(self):
self.results = {}
def call_back(self, transfer_type, result):
self.results[transfer_type] = result
mock = CallbackMock()
fltr.call_back = mock.call_back
fltr.filter(eth_rpc, mockblock, tx, init_database)
assert mock.results.get('transfer') != None
assert mock.results['transfer']['destination_token'] == foo_token

View File

@@ -3,4 +3,3 @@ dist
dist-web
dist-server
scratch
tests

View File

@@ -1,6 +1,6 @@
{
"name": "cic-client-meta",
"version": "0.0.7-alpha.2",
"version": "0.0.7-alpha.7",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@@ -852,6 +852,75 @@
"printj": "~1.1.0"
}
},
"crdt-meta": {
"version": "0.0.8",
"resolved": "https://registry.npmjs.org/crdt-meta/-/crdt-meta-0.0.8.tgz",
"integrity": "sha512-CS0sS0L2QWthz7vmu6vzl3p4kcpJ+IKILBJ4tbgN4A3iNG8wnBeuDIv/z3KFFQjcfuP4QAh6E9LywKUTxtDc3g==",
"requires": {
"automerge": "^0.14.2",
"ini": "^1.3.8",
"openpgp": "^4.10.8",
"pg": "^8.5.1",
"sqlite3": "^5.0.2"
},
"dependencies": {
"automerge": {
"version": "0.14.2",
"resolved": "https://registry.npmjs.org/automerge/-/automerge-0.14.2.tgz",
"integrity": "sha512-shiwuJHCbNRI23WZyIECLV4Ovf3WiAFJ7P9BH4l5gON1In/UUbjcSJKRygtIirObw2UQumeYxp3F2XBdSvQHnA==",
"requires": {
"immutable": "^3.8.2",
"transit-immutable-js": "^0.7.0",
"transit-js": "^0.8.861",
"uuid": "^3.4.0"
}
},
"node-addon-api": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-3.1.0.tgz",
"integrity": "sha512-flmrDNB06LIl5lywUz7YlNGZH/5p0M7W28k8hzd9Lshtdh1wshD2Y+U4h9LD6KObOy1f+fEVdgprPrEymjM5uw=="
},
"pg": {
"version": "8.6.0",
"resolved": "https://registry.npmjs.org/pg/-/pg-8.6.0.tgz",
"integrity": "sha512-qNS9u61lqljTDFvmk/N66EeGq3n6Ujzj0FFyNMGQr6XuEv4tgNTXvJQTfJdcvGit5p5/DWPu+wj920hAJFI+QQ==",
"requires": {
"buffer-writer": "2.0.0",
"packet-reader": "1.0.0",
"pg-connection-string": "^2.5.0",
"pg-pool": "^3.3.0",
"pg-protocol": "^1.5.0",
"pg-types": "^2.1.0",
"pgpass": "1.x"
}
},
"pg-connection-string": {
"version": "2.5.0",
"resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz",
"integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ=="
},
"pg-pool": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.3.0.tgz",
"integrity": "sha512-0O5huCql8/D6PIRFAlmccjphLYWC+JIzvUhSzXSpGaf+tjTZc4nn+Lr7mLXBbFJfvwbP0ywDv73EiaBsxn7zdg=="
},
"pg-protocol": {
"version": "1.5.0",
"resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.5.0.tgz",
"integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ=="
},
"sqlite3": {
"version": "5.0.2",
"resolved": "https://registry.npmjs.org/sqlite3/-/sqlite3-5.0.2.tgz",
"integrity": "sha512-1SdTNo+BVU211Xj1csWa8lV6KM0CtucDwRyA0VHl91wEH1Mgh7RxUpI4rVvG7OhHrzCSGaVyW5g8vKvlrk9DJA==",
"requires": {
"node-addon-api": "^3.0.0",
"node-gyp": "3.x",
"node-pre-gyp": "^0.11.0"
}
}
}
},
"create-hash": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/create-hash/-/create-hash-1.2.0.tgz",
@@ -966,17 +1035,17 @@
"dev": true
},
"elliptic": {
"version": "6.5.3",
"resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.3.tgz",
"integrity": "sha512-IMqzv5wNQf+E6aHeIqATs0tOLeOTwj1QKbRcS3jBbYkl5oLAserA8yJTT7/VyHUYG91PRmPyeQDObKLPpeS4dw==",
"version": "6.5.4",
"resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.4.tgz",
"integrity": "sha512-iLhC6ULemrljPZb+QutR5TQGB+pdW6KGD5RSegS+8sorOZT+rdQFbsQFJgvN3eRqNALqJer4oQ16YvJHlU8hzQ==",
"requires": {
"bn.js": "^4.4.0",
"brorand": "^1.0.1",
"bn.js": "^4.11.9",
"brorand": "^1.1.0",
"hash.js": "^1.0.0",
"hmac-drbg": "^1.0.0",
"inherits": "^2.0.1",
"minimalistic-assert": "^1.0.0",
"minimalistic-crypto-utils": "^1.0.0"
"hmac-drbg": "^1.0.1",
"inherits": "^2.0.4",
"minimalistic-assert": "^1.0.1",
"minimalistic-crypto-utils": "^1.0.1"
}
},
"emoji-regex": {
@@ -1489,9 +1558,9 @@
"integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ=="
},
"ini": {
"version": "1.3.5",
"resolved": "https://registry.npmjs.org/ini/-/ini-1.3.5.tgz",
"integrity": "sha512-RZY5huIKCMRWDUqZlEi72f/lmXKMvuszcMBduliQ3nnWbx9X/ZBQO7DijMEYS9EhHBb2qacRUMtC7svLwe0lcw=="
"version": "1.3.8",
"resolved": "https://registry.npmjs.org/ini/-/ini-1.3.8.tgz",
"integrity": "sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew=="
},
"interpret": {
"version": "2.2.0",
@@ -1957,9 +2026,9 @@
}
},
"y18n": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.0.tgz",
"integrity": "sha512-r9S/ZyXu/Xu9q1tYlpsLIsa3EeLXXk0VwlxqTcFRfg9EhMW+17kbt9G0NrgCmhGb5vT2hyhJZLfDGx+7+5Uj/w==",
"version": "4.0.3",
"resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz",
"integrity": "sha512-JKhqTOwSrqNA1NY5lSztJ1GrBiUodLMmIZuLiDaMRJ+itFd+ABVE8XBjOvIWL+rSqNDC74LCSFmlb/U4UZ4hJQ==",
"dev": true
},
"yargs": {

View File

@@ -1,6 +1,6 @@
{
"name": "cic-client-meta",
"version": "0.0.7-alpha.3",
"version": "0.0.7-alpha.8",
"description": "Signed CRDT metadata graphs for the CIC network",
"main": "dist/index.js",
"types": "dist/index.d.ts",
@@ -15,8 +15,9 @@
"dependencies": {
"@ethereumjs/tx": "^3.0.0-beta.1",
"automerge": "^0.14.1",
"crdt-meta": "0.0.8",
"ethereumjs-wallet": "^1.0.1",
"ini": "^1.3.5",
"ini": "^1.3.8",
"openpgp": "^4.10.8",
"pg": "^8.4.2",
"sqlite3": "^5.0.0",
@@ -40,6 +41,6 @@
],
"license": "GPL-3.0-or-later",
"engines": {
"node": "~15.3.0"
"node": ">=14.16.1"
}
}

View File

@@ -1,4 +1,4 @@
const config = require('./src/config');
import { Config } from 'crdt-meta';
const fs = require('fs');
if (process.argv[2] === undefined) {
@@ -15,6 +15,6 @@ try {
process.exit(1);
}
const c = new config.Config(process.argv[2], process.env['CONFINI_ENV_PREFIX']);
const c = new Config(process.argv[2], process.env['CONFINI_ENV_PREFIX']);
c.process();
process.stdout.write(c.toString());

View File

@@ -1,8 +1,7 @@
import * as Automerge from 'automerge';
import * as pgp from 'openpgp';
import * as pg from 'pg';
import { Envelope, Syncable } from '../../src/sync';
import { Envelope, Syncable } from 'crdt-meta';
function handleNoMergeGet(db, digest, keystore) {

View File

@@ -1,15 +1,11 @@
import * as http from 'http';
import * as fs from 'fs';
import * as path from 'path';
import * as pgp from 'openpgp';
import * as handlers from './handlers';
import { Envelope, Syncable } from '../../src/sync';
import { PGPKeyStore, PGPSigner } from '../../src/auth';
import { PGPKeyStore, PGPSigner, Config, SqliteAdapter, PostgresAdapter } from 'crdt-meta';
import { standardArgs } from './args';
import { Config } from '../../src/config';
import { SqliteAdapter, PostgresAdapter } from '../../src/db';
let configPath = '/usr/local/etc/cic-meta';
@@ -114,6 +110,7 @@ async function processRequest(req, res) {
return;
}
if (!['PUT', 'GET', 'POST'].includes(req.method)) {
res.writeHead(405, {"Content-Type": "text/plain"});
res.end();
@@ -123,6 +120,7 @@ async function processRequest(req, res) {
try {
digest = parseDigest(req.url);
} catch(e) {
console.error('digest error: ' + e)
res.writeHead(400, {"Content-Type": "text/plain"});
res.end();
return;

View File

@@ -1,191 +0,0 @@
import * as pgp from 'openpgp';
import * as crypto from 'crypto';
interface Signable {
digest():string;
}
type KeyGetter = () => any;
type Signature = {
engine:string
algo:string
data:string
digest:string
}
interface Signer {
prepare(Signable):boolean;
onsign(Signature):void;
onverify(boolean):void;
sign(digest:string):void
verify(digest:string, signature:Signature):void
fingerprint():string
}
interface Authoritative {
}
interface KeyStore {
getPrivateKey: KeyGetter
getFingerprint: () => string
getTrustedKeys: () => Array<any>
getTrustedActiveKeys: () => Array<any>
getEncryptKeys: () => Array<any>
}
class PGPKeyStore implements KeyStore {
fingerprint: string
pk: any
pubk = {
active: [],
trusted: [],
encrypt: [],
}
loads = 0x00;
loadsTarget = 0x0f;
onload: (k:KeyStore) => void;
constructor(passphrase:string, pkArmor:string, pubkActiveArmor:string, pubkTrustedArmor:string, pubkEncryptArmor:string, onload = (ks:KeyStore) => {}) {
this._readKey(pkArmor, undefined, 1, passphrase);
this._readKey(pubkActiveArmor, 'active', 2);
this._readKey(pubkTrustedArmor, 'trusted', 4);
this._readKey(pubkEncryptArmor, 'encrypt', 8);
this.onload = onload;
}
private _readKey(a:string, x:any, n:number, pass?:string) {
pgp.key.readArmored(a).then((k) => {
if (pass !== undefined) {
this.pk = k.keys[0];
this.pk.decrypt(pass).then(() => {
this.fingerprint = this.pk.getFingerprint();
console.log('private key (sign)', this.fingerprint);
this._registerLoad(n);
});
} else {
this.pubk[x] = k.keys;
k.keys.forEach((pubk) => {
console.log('public key (' + x + ')', pubk.getFingerprint());
});
this._registerLoad(n);
}
});
}
private _registerLoad(b:number) {
this.loads |= b;
if (this.loads == this.loadsTarget) {
this.onload(this);
}
}
public getTrustedKeys(): Array<any> {
return this.pubk['trusted'];
}
public getTrustedActiveKeys(): Array<any> {
return this.pubk['active'];
}
public getEncryptKeys(): Array<any> {
return this.pubk['encrypt'];
}
public getPrivateKey(): any {
return this.pk;
}
public getFingerprint(): string {
return this.fingerprint;
}
}
class PGPSigner implements Signer {
engine = 'pgp'
algo = 'sha256'
dgst: string
signature: Signature
keyStore: KeyStore
onsign: (Signature) => void
onverify: (boolean) => void
constructor(keyStore:KeyStore) {
this.keyStore = keyStore
this.onsign = (string) => {};
this.onverify = (boolean) => {};
}
public fingerprint(): string {
return this.keyStore.getFingerprint();
}
public prepare(material:Signable):boolean {
this.dgst = material.digest();
return true;
}
public verify(digest:string, signature:Signature) {
pgp.signature.readArmored(signature.data).then((s) => {
const opts = {
message: pgp.cleartext.fromText(digest),
publicKeys: this.keyStore.getTrustedKeys(),
signature: s,
};
pgp.verify(opts).then((v) => {
let i = 0;
for (i = 0; i < v.signatures.length; i++) {
const s = v.signatures[i];
if (s.valid) {
this.onverify(s);
return;
}
}
console.error('checked ' + i + ' signature(s) but none valid');
this.onverify(false);
});
}).catch((e) => {
console.error(e);
this.onverify(false);
});
}
public sign(digest:string) {
const m = pgp.cleartext.fromText(digest);
const pk = this.keyStore.getPrivateKey();
const opts = {
message: m,
privateKeys: [pk],
detached: true,
}
pgp.sign(opts).then((s) => {
this.signature = {
engine: this.engine,
algo: this.algo,
data: s.signature,
// TODO: fix for browser later
digest: digest,
};
this.onsign(this.signature);
}).catch((e) => {
console.error(e);
this.onsign(undefined);
});
}
}
export {
Signature,
Authoritative,
Signer,
KeyGetter,
Signable,
KeyStore,
PGPSigner,
PGPKeyStore,
};

View File

@@ -1,71 +0,0 @@
import * as fs from 'fs';
import * as ini from 'ini';
import * as path from 'path';
class Config {
filepath: string
store: Object
censor: Array<string>
require: Array<string>
env_prefix: string
constructor(filepath:string, env_prefix?:string) {
this.filepath = filepath;
this.store = {};
this.censor = [];
this.require = [];
this.env_prefix = '';
if (env_prefix !== undefined) {
this.env_prefix = env_prefix + "_";
}
}
public process() {
const d = fs.readdirSync(this.filepath);
const r = /.*\.ini$/;
for (let i = 0; i < d.length; i++) {
const f = d[i];
if (!f.match(r)) {
return;
}
const fp = path.join(this.filepath, f);
const v = fs.readFileSync(fp, 'utf-8');
const inid = ini.decode(v);
const inik = Object.keys(inid);
for (let j = 0; j < inik.length; j++) {
const k_section = inik[j]
const k = k_section.toUpperCase();
Object.keys(inid[k_section]).forEach((k_directive) => {
const kk = k_directive.toUpperCase();
const kkk = k + '_' + kk;
let r = inid[k_section][k_directive];
const k_env = this.env_prefix + kkk
const env = process.env[k_env];
if (env !== undefined) {
console.debug('Environment variable ' + k_env + ' overrides ' + kkk);
r = env;
}
this.store[kkk] = r;
});
}
}
}
public get(s:string) {
return this.store[s];
}
public toString() {
let s = '';
Object.keys(this.store).forEach((k) => {
s += k + '=' + this.store[k] + '\n';
});
return s;
}
}
export { Config };

View File

@@ -1,38 +0,0 @@
import { JSONSerializable } from './format';
const ENGINE_NAME = 'automerge';
const ENGINE_VERSION = '0.14.1';
const NETWORK_NAME = 'cic';
const NETWORK_VERSION = '1';
const CRYPTO_NAME = 'pgp';
const CRYPTO_VERSION = '2';
type VersionedSpec = {
name: string
version: string
ext?: Object
}
const engineSpec:VersionedSpec = {
name: ENGINE_NAME,
version: ENGINE_VERSION,
}
const cryptoSpec:VersionedSpec = {
name: CRYPTO_NAME,
version: CRYPTO_VERSION,
}
const networkSpec:VersionedSpec = {
name: NETWORK_NAME,
version: NETWORK_VERSION,
}
export {
engineSpec,
cryptoSpec,
networkSpec,
VersionedSpec,
};

View File

@@ -1,27 +0,0 @@
import * as crypto from 'crypto';
const _algs = {
'SHA-256': 'sha256',
}
function cryptoWrapper() {
}
cryptoWrapper.prototype.digest = async function(s, d) {
const h = crypto.createHash(_algs[s]);
h.update(d);
return h.digest();
}
let subtle = undefined;
if (typeof window !== 'undefined') {
subtle = window.crypto.subtle;
} else {
subtle = new cryptoWrapper();
}
export {
subtle,
}

View File

@@ -1,90 +0,0 @@
import * as pg from 'pg';
import * as sqlite from 'sqlite3';
type DbConfig = {
name: string
host: string
port: number
user: string
password: string
}
interface DbAdapter {
query: (s:string, callback:(e:any, rs:any) => void) => void
close: () => void
}
const re_creatematch = /^(CREATE)/i
const re_getmatch = /^(SELECT)/i;
const re_setmatch = /^(INSERT|UPDATE)/i;
class SqliteAdapter implements DbAdapter {
db: any
constructor(dbConfig:DbConfig, callback?:(any) => void) {
this.db = new sqlite.Database(dbConfig.name); //, callback);
}
public query(s:string, callback:(e:any, rs?:any) => void): void {
const local_callback = (e, rs) => {
let r = undefined;
if (rs !== undefined) {
r = {
rowCount: rs.length,
rows: rs,
}
}
callback(e, r);
};
if (s.match(re_getmatch)) {
this.db.all(s, local_callback);
} else if (s.match(re_setmatch)) {
this.db.run(s, local_callback);
} else if (s.match(re_creatematch)) {
this.db.run(s, callback);
} else {
throw 'unhandled query';
}
}
public close() {
this.db.close();
}
}
class PostgresAdapter implements DbAdapter {
db: any
constructor(dbConfig:DbConfig) {
let o = dbConfig;
o['database'] = o.name;
this.db = new pg.Pool(o);
return this.db;
}
public query(s:string, callback:(e:any, rs:any) => void): void {
this.db.query(s, (e, rs) => {
let r = {
length: rs.rowCount,
}
rs.length = rs.rowCount;
if (e === undefined) {
e = null;
}
console.debug(e, rs);
callback(e, rs);
});
}
public close() {
this.db.end();
}
}
export {
DbConfig,
SqliteAdapter,
PostgresAdapter,
}

View File

@@ -1,67 +0,0 @@
import * as crypto from './crypto';
interface Addressable {
key(): string
digest(): string
}
function stringToBytes(s:string) {
const a = new Uint8Array(20);
let j = 2;
for (let i = 0; i < a.byteLength; i++) {
const n = parseInt(s.substring(j, j+2), 16);
a[i] = n;
j += 2;
}
return a;
}
function bytesToHex(a:Uint8Array) {
let s = '';
for (let i = 0; i < a.byteLength; i++) {
const h = '00' + a[i].toString(16);
s += h.slice(-2);
}
return s;
}
async function mergeKey(a:Uint8Array, s:Uint8Array) {
const y = new Uint8Array(a.byteLength + s.byteLength);
for (let i = 0; i < a.byteLength; i++) {
y[i] = a[i];
}
for (let i = 0; i < s.byteLength; i++) {
y[a.byteLength + i] = s[i];
}
const z = await crypto.subtle.digest('SHA-256', y);
return bytesToHex(new Uint8Array(z));
}
async function toKey(v:string, salt:string) {
const a = stringToBytes(v);
const s = new TextEncoder().encode(salt);
return await mergeKey(a, s);
}
async function toAddressKey(zeroExHex:string, salt:string) {
const a = addressToBytes(zeroExHex);
const s = new TextEncoder().encode(salt);
return await mergeKey(a, s);
}
const re_addrHex = /^0[xX][a-fA-F0-9]{40}$/;
function addressToBytes(s:string) {
if (!s.match(re_addrHex)) {
throw 'invalid address hex';
}
return stringToBytes(s);
}
export {
toKey,
toAddressKey,
bytesToHex,
addressToBytes,
Addressable,
}

View File

@@ -1,58 +0,0 @@
import { v4 as uuidv4 } from 'uuid';
import { Syncable } from './sync';
import { Store } from './store';
import { PubSub } from './transport';
function toIndexKey(id:string):string {
const d = Date.now();
return d + '_' + id + '_' + uuidv4();
}
const _re_indexKey = /^\d+_(.+)_[-\d\w]+$/;
function fromIndexKey(s:string):string {
const m = s.match(_re_indexKey);
if (m === null) {
throw 'Invalid index key';
}
return m[1];
}
class Dispatcher {
idx: Array<string>
syncer: PubSub
store: Store
constructor(store:Store, syncer:PubSub) {
this.idx = new Array<string>()
this.syncer = syncer;
this.store = store;
}
public isDirty(): boolean {
return this.idx.length > 0;
}
public add(id:string, item:Syncable): string {
const v = item.toJSON();
const k = toIndexKey(id);
this.store.put(k, v, true);
localStorage.setItem(k, v);
this.idx.push(k);
return k;
}
public sync(offset:number): number {
let i = 0;
this.idx.forEach((k) => {
const v = localStorage.getItem(k);
const k_id = fromIndexKey(k);
this.syncer.pub(v); // this must block until guaranteed delivery
localStorage.removeItem(k);
i++;
});
return i;
}
}
export { Dispatcher, toIndexKey, fromIndexKey }

View File

@@ -1,5 +0,0 @@
interface JSONSerializable {
toJSON(): string
}
export { JSONSerializable };

View File

@@ -1,5 +1,2 @@
export { PGPSigner, PGPKeyStore, Signer, KeyStore } from './auth';
export { ArgPair,  Envelope, Syncable } from './sync';
export { User } from './assets/user';
export { Phone } from './assets/phone';
export { Config } from './config';
export { User } from './user';
export { Phone } from './phone';

View File

@@ -1,12 +1,11 @@
import { ArgPair, Syncable } from '../sync';
import { Addressable, addressToBytes, bytesToHex, toKey } from '../digest';
import { Syncable, Addressable, mergeKey } from 'crdt-meta';
class Phone extends Syncable implements Addressable {
address: string
value: number
constructor(address:string, v:number) {
constructor(address:string, v:string) {
const o = {
msisdn: v,
}
@@ -17,8 +16,8 @@ class Phone extends Syncable implements Addressable {
});
}
public static async toKey(msisdn:number) {
return await toKey(msisdn.toString(), ':cic.msisdn');
public static async toKey(msisdn:string) {
return await mergeKey(Buffer.from(msisdn), Buffer.from(':cic.phone'));
}
public key(): string {

View File

@@ -1,9 +0,0 @@
import { Syncable } from './sync';
interface Store {
put(string, Syncable, boolean?)
get(string):Syncable
delete(string)
}
export { Store };

View File

@@ -1,266 +0,0 @@
import * as Automerge from 'automerge';
import { JSONSerializable } from './format';
import { Authoritative, Signer, PGPSigner, Signable, Signature } from './auth';
import { engineSpec, cryptoSpec, networkSpec, VersionedSpec } from './constants';
const fullSpec:VersionedSpec = {
name: 'cic',
version: '1',
ext: {
network: cryptoSpec,
engine: engineSpec,
},
}
class Envelope {
o = fullSpec
constructor(payload:Object) {
this.set(payload);
}
public set(payload:Object) {
this.o['payload'] = payload
}
public get():string {
return this.o['payload'];
}
public toJSON() {
return JSON.stringify(this.o);
}
public static fromJSON(s:string): Envelope {
const e = new Envelope(undefined);
e.o = JSON.parse(s);
return e;
}
public unwrap(): Syncable {
return Syncable.fromJSON(this.o['payload']);
}
}
class ArgPair {
k:string
v:any
constructor(k:string, v:any) {
this.k = k;
this.v = v;
}
}
class SignablePart implements Signable {
s: string
constructor(s:string) {
this.s = s;
}
public digest():string {
return this.s;
}
}
function orderDict(src) {
let dst;
if (Array.isArray(src)) {
dst = [];
src.forEach((v) => {
if (typeof(v) == 'object') {
v = orderDict(v);
}
dst.push(v);
});
} else {
dst = {}
Object.keys(src).sort().forEach((k) => {
let v = src[k];
if (typeof(v) == 'object') {
v = orderDict(v);
}
dst[k] = v;
});
}
return dst;
}
class Syncable implements JSONSerializable, Authoritative, Signable {
id: string
timestamp: number
m: any // automerge object
e: Envelope
signer: Signer
onwrap: (string) => void
onauthenticate: (boolean) => void
// TODO: Move data to sub-object so timestamp, id, signature don't collide
constructor(id:string, v:Object) {
this.id = id;
const o = {
'id': id,
'timestamp': Math.floor(Date.now() / 1000),
'data': v,
}
//this.m = Automerge.from(v)
this.m = Automerge.from(o)
}
public setSigner(signer:Signer) {
this.signer = signer;
this.signer.onsign = (s) => {
this.wrap(s);
};
}
// TODO: To keep integrity, the non-link key/value pairs for each step also need to be hashed
public digest(): string {
const links = [];
Automerge.getAllChanges(this.m).forEach((ch:Object) => {
const op:Array<any> = ch['ops'];
ch['ops'].forEach((op:Array<Object>) => {
if (op['action'] == 'link') {
//console.log('op link', op);
links.push([op['obj'], op['value']]);
}
});
});
//return JSON.stringify(links);
const j = JSON.stringify(links);
return Buffer.from(j).toString('base64');
}
private wrap(s:any) {
this.m = Automerge.change(this.m, 'sign', (doc) => {
doc['signature'] = s;
});
this.e = new Envelope(this.toJSON());
console.log('wrappin s', s, typeof(s));
this.e.o['digest'] = s.digest;
if (this.onwrap !== undefined) {
this.onwrap(this.e);
}
}
// private _verifyLoop(i:number, history:Array<any>, signable:Signable, result:boolean) {
// if (!result) {
// this.onauthenticate(false);
// return;
// } else if (history.length == 0) {
// this.onauthenticate(true);
// return;
// }
// const h = history.shift()
// if (i % 2 == 0) {
// i++;
// signable = {
// digest: () => {
// return Automerge.save(h.snapshot)
// },
// };
// this._verifyLoop(i, history, signable, true);
// } else {
// i++;
// const signature = h.snapshot['signature'];
// console.debug('signature', signature, signable.digest());
// this.signer.onverify = (v) => {
// this._verifyLoop(i, history, signable, v)
// }
// this.signer.verify(signable, signature);
// }
// }
//
// // TODO: This should replay the graph and check signatures on each step
// public _authenticate(full:boolean=false) {
// let h = Automerge.getHistory(this.m);
// h.forEach((m) => {
// //console.debug(m.snapshot);
// });
// const signable = {
// digest: () => { return '' },
// }
// if (!full) {
// h = h.slice(h.length-2);
// }
// this._verifyLoop(0, h, signable, true);
// }
public authenticate(full:boolean=false) {
if (full) {
console.warn('only doing shallow authentication for now, sorry');
}
//console.log('authenticating', signable.digest());
//console.log('signature', this.m.signature);
this.signer.onverify = (v) => {
//this._verifyLoop(i, history, signable, v)
this.onauthenticate(v);
}
this.signer.verify(this.m.signature.digest, this.m.signature);
}
public sign() {
//this.signer.prepare(this);
this.signer.sign(this.digest());
}
public update(changes:Array<ArgPair>, changesDescription:string) {
this.m = Automerge.change(this.m, changesDescription, (m) => {
changes.forEach((c) => {
let path = c.k.split('.');
let target = m['data'];
while (path.length > 1) {
const part = path.shift();
target = target[part];
}
target[path[0]] = c.v;
});
m['timestamp'] = Math.floor(Date.now() / 1000);
});
}
public replace(o:Object, changesDescription:string) {
this.m = Automerge.change(this.m, changesDescription, (m) => {
Object.keys(o).forEach((k) => {
m['data'][k] = o[k];
});
Object.keys(m).forEach((k) => {
if (o[k] == undefined) {
delete m['data'][k];
}
});
m['timestamp'] = Math.floor(Date.now() / 1000);
});
}
public merge(s:Syncable) {
this.m = Automerge.merge(s.m, this.m);
}
public toJSON(): string {
const s = Automerge.save(this.m);
const o = JSON.parse(s);
const oo = orderDict(o)
return JSON.stringify(oo);
}
public static fromJSON(s:string): Syncable {
const doc = Automerge.load(s);
let y = new Syncable(doc['id'], {});
y.m = doc
return y
}
}
export { JSONSerializable, Syncable, ArgPair, Envelope };

View File

@@ -1,11 +0,0 @@
interface SubConsumer {
post(string)
}
interface PubSub {
pub(v:string):boolean
close()
}
export { PubSub, SubConsumer };

View File

@@ -1,5 +1,4 @@
import { ArgPair, Syncable } from '../sync';
import { Addressable, addressToBytes, bytesToHex, toAddressKey } from '../digest';
import { Syncable, Addressable, toAddressKey } from 'crdt-meta';
const keySalt = new TextEncoder().encode(':cic.person');
class User extends Syncable implements Addressable {

View File

@@ -1,50 +0,0 @@
import * as Automerge from 'automerge';
import assert = require('assert');
import { Dispatcher, toIndexKey, fromIndexKey } from '../src/dispatch';
import { User } from '../src/assets/user';
import { Syncable, ArgPair } from '../src/sync';
import { MockSigner, MockStore } from './mock';
describe('basic', () => {
it('store', () => {
const store = new MockStore('s');
assert.equal(store.name, 's');
const mockSigner = new MockSigner();
const v = new Syncable('foo', {baz: 42});
v.setSigner(mockSigner);
store.put('foo', v);
const one = store.get('foo').toJSON();
const vv = new Syncable('bar', {baz: 666});
vv.setSigner(mockSigner);
assert.throws(() => {
store.put('foo', vv)
});
store.put('foo', vv, true);
const other = store.get('foo').toJSON();
assert.notEqual(one, other);
store.delete('foo');
assert.equal(store.get('foo'), undefined);
});
it('add_doc_to_dispatcher', () => {
const store = new MockStore('s');
//const syncer = new MockSyncer();
const dispatcher = new Dispatcher(store, undefined);
const user = new User('foo');
dispatcher.add(user.id, user);
assert(dispatcher.isDirty());
});
it('dispatch_keyindex', () => {
const s = 'foo';
const k = toIndexKey(s);
const v = fromIndexKey(k);
assert.equal(s, v);
});
});

View File

@@ -1,212 +0,0 @@
import * as Automerge from 'automerge';
import assert = require('assert');
import * as pgp from 'openpgp';
import * as fs from 'fs';
import { PGPSigner } from '../src/auth';
import { Syncable, ArgPair } from '../src/sync';
import { MockKeyStore, MockSigner } from './mock';
describe('sync', async () => {
it('sync_merge', () => {
const mockSigner = new MockSigner();
const s = new Syncable('foo', {
bar: 'baz',
});
s.setSigner(mockSigner);
const changePair = new ArgPair('xyzzy', 42);
s.update([changePair], 'ch-ch-cha-changes');
assert.equal(s.m.data['xyzzy'], 42)
assert.equal(s.m.data['bar'], 'baz')
assert.equal(s.m['id'], 'foo')
assert.equal(Automerge.getHistory(s.m).length, 2);
});
it('sync_serialize', () => {
const mockSigner = new MockSigner();
const s = new Syncable('foo', {
bar: 'baz',
});
s.setSigner(mockSigner);
const j = s.toJSON();
const ss = Syncable.fromJSON(j);
assert.equal(ss.m['id'], 'foo');
assert.equal(ss.m['data']['bar'], 'baz');
assert.equal(Automerge.getHistory(ss.m).length, 1);
});
it('sync_sign_and_wrap', () => {
const mockSigner = new MockSigner();
const s = new Syncable('foo', {
bar: 'baz',
});
s.setSigner(mockSigner);
s.onwrap = (e) => {
const j = e.toJSON();
const v = JSON.parse(j);
assert.deepEqual(v.payload, e.o.payload);
}
s.sign();
});
it('sync_verify_success', async () => {
const pksa = fs.readFileSync(__dirname + '/privatekeys.asc');
const pks = await pgp.key.readArmored(pksa);
await pks.keys[0].decrypt('merman');
await pks.keys[1].decrypt('beastman');
const pubksa = fs.readFileSync(__dirname + '/publickeys.asc');
const pubks = await pgp.key.readArmored(pubksa);
const oneStore = new MockKeyStore(pks.keys[0], pubks.keys);
const twoStore = new MockKeyStore(pks.keys[1], pubks.keys);
const threeStore = new MockKeyStore(pks.keys[2], [pubks.keys[0], pubks.keys[2]]);
const oneSigner = new PGPSigner(oneStore);
const twoSigner = new PGPSigner(twoStore);
const threeSigner = new PGPSigner(threeStore);
const x = new Syncable('foo', {
bar: 'baz',
});
x.setSigner(oneSigner);
// TODO: make this look better
x.onwrap = (e) => {
let updateData = new ArgPair('bar', 'xyzzy');
x.update([updateData], 'change one');
x.onwrap = (e) => {
x.setSigner(twoSigner);
updateData = new ArgPair('bar', 42);
x.update([updateData], 'change two');
x.onwrap = (e) => {
const p = e.unwrap();
p.setSigner(twoSigner);
p.onauthenticate = (v) => {
assert(v);
}
p.authenticate();
}
x.sign();
};
x.sign();
}
x.sign();
});
it('sync_verify_fail', async () => {
const pksa = fs.readFileSync(__dirname + '/privatekeys.asc');
const pks = await pgp.key.readArmored(pksa);
await pks.keys[0].decrypt('merman');
await pks.keys[1].decrypt('beastman');
const pubksa = fs.readFileSync(__dirname + '/publickeys.asc');
const pubks = await pgp.key.readArmored(pubksa);
const oneStore = new MockKeyStore(pks.keys[0], pubks.keys);
const twoStore = new MockKeyStore(pks.keys[1], pubks.keys);
const threeStore = new MockKeyStore(pks.keys[2], [pubks.keys[0], pubks.keys[2]]);
const oneSigner = new PGPSigner(oneStore);
const twoSigner = new PGPSigner(twoStore);
const threeSigner = new PGPSigner(threeStore);
const x = new Syncable('foo', {
bar: 'baz',
});
x.setSigner(oneSigner);
// TODO: make this look better
x.onwrap = (e) => {
let updateData = new ArgPair('bar', 'xyzzy');
x.update([updateData], 'change one');
x.onwrap = (e) => {
x.setSigner(twoSigner);
updateData = new ArgPair('bar', 42);
x.update([updateData], 'change two');
x.onwrap = (e) => {
const p = e.unwrap();
p.setSigner(threeSigner);
p.onauthenticate = (v) => {
assert(!v);
}
p.authenticate();
}
x.sign();
};
x.sign();
}
x.sign();
});
xit('sync_verify_shallow_tricked', async () => {
const pksa = fs.readFileSync(__dirname + '/privatekeys.asc');
const pks = await pgp.key.readArmored(pksa);
await pks.keys[0].decrypt('merman');
await pks.keys[1].decrypt('beastman');
const pubksa = fs.readFileSync(__dirname + '/publickeys.asc');
const pubks = await pgp.key.readArmored(pubksa);
const oneStore = new MockKeyStore(pks.keys[0], pubks.keys);
const twoStore = new MockKeyStore(pks.keys[1], pubks.keys);
const threeStore = new MockKeyStore(pks.keys[2], [pubks.keys[0], pubks.keys[2]]);
const oneSigner = new PGPSigner(oneStore);
const twoSigner = new PGPSigner(twoStore);
const threeSigner = new PGPSigner(threeStore);
const x = new Syncable('foo', {
bar: 'baz',
});
x.setSigner(twoSigner);
// TODO: make this look better
x.onwrap = (e) => {
let updateData = new ArgPair('bar', 'xyzzy');
x.update([updateData], 'change one');
x.onwrap = (e) => {
updateData = new ArgPair('bar', 42);
x.update([updateData], 'change two');
x.setSigner(oneSigner);
x.onwrap = (e) => {
const p = e.unwrap();
p.setSigner(threeSigner);
p.onauthenticate = (v) => {
assert(v);
p.onauthenticate = (v) => {
assert(!v);
}
p.authenticate(true);
}
p.authenticate();
}
x.sign();
};
x.sign();
}
x.sign();
});
});

View File

@@ -1,14 +0,0 @@
import * as assert from 'assert';
import { MockPubSub, MockConsumer } from './mock';
describe('transport', () => {
it('pub_sub', () => {
const c = new MockConsumer();
const ps = new MockPubSub('foo', c);
ps.pub('foo');
ps.pub('bar');
ps.flush();
assert.deepEqual(c.omnoms, ['foo', 'bar']);
});
});

View File

@@ -1,46 +0,0 @@
import assert = require('assert');
import pgp = require('openpgp');
import crypto = require('crypto');
import { Syncable, ArgPair } from '../src/sync';
import { MockKeyStore, MockSignable } from './mock';
import { PGPSigner } from '../src/auth';
describe('auth', async () => {
await it('digest', async () => {
const opts = {
userIds: [
{
name: 'John Marston',
email: 'red@dead.com',
},
],
numBits: 2048,
passphrase: 'foo',
};
const pkgen = await pgp.generateKey(opts);
const pka = pkgen.privateKeyArmored;
const pks = await pgp.key.readArmored(pka);
await pks.keys[0].decrypt('foo');
const pubka = pkgen.publicKeyArmored;
const pubks = await pgp.key.readArmored(pubka);
const keyStore = new MockKeyStore(pks.keys[0], pubks.keys);
const s = new PGPSigner(keyStore);
const message = await pgp.cleartext.fromText('foo');
s.onverify = (ok) => {
assert(ok);
}
s.onsign = (signature) => {
s.onverify((v) => {
console.log('bar', v);
});
s.verify('foo', signature);
}
await s.sign('foo');
});
});

View File

@@ -1,47 +0,0 @@
import * as assert from 'assert';
import * as pgp from 'openpgp';
import { Dispatcher } from '../src/dispatch';
import { User } from '../src/assets/user';
import { PGPSigner, KeyStore } from '../src/auth';
import { SubConsumer } from '../src/transport';
import { MockStore, MockPubSub, MockConsumer, MockKeyStore } from './mock';
async function createKeyStore() {
const opts = {
userIds: [
{
name: 'John Marston',
email: 'red@dead.com',
},
],
numBits: 2048,
passphrase: 'foo',
};
const pkgen = await pgp.generateKey(opts);
const pka = pkgen.privateKeyArmored;
const pks = await pgp.key.readArmored(pka);
await pks.keys[0].decrypt('foo');
return new MockKeyStore(pks.keys[0], []);
}
describe('fullchain', async () => {
it('dispatch_and_publish_user', async () => {
const g = await createKeyStore();
const n = new PGPSigner(g);
const u = new User('u1', {});
u.setSigner(n);
u.setName('Nico', 'Bellic');
const s = new MockStore('fooStore');
const c = new MockConsumer();
const p = new MockPubSub('fooPubSub', c);
const d = new Dispatcher(s, p);
u.onwrap = (e) => {
d.add(u.id, e);
d.sync(0);
assert.equal(p.pubs.length, 1);
};
u.sign();
});
});

View File

@@ -1,150 +0,0 @@
import * as crypto from 'crypto';
import { Signable, Signature, KeyStore } from '../src/auth';
import { Store } from '../src/store';
import { PubSub, SubConsumer } from '../src/transport';
import { Syncable } from '../src/sync';
class MockStore implements Store {
contents: Object
name: string
constructor(name:string) {
this.name = name;
this.contents = {};
}
public put(k:string, v:Syncable, existsOk = false) {
if (!existsOk && this.contents[k] !== undefined) {
throw '"' + k + '" already exists in store ' + this.name;
} 
this.contents[k] = v;
}
public get(k:string): Syncable {
return this.contents[k];
}
public delete(k:string) {
delete this.contents[k];
}
}
class MockSigner {
onsign: (string) => void
onverify: (boolean) => void
public verify(src:string, signature:Signature) {
return true;
}
public sign(s:string):boolean {
this.onsign('there would be a signature here');
return true;
}
public prepare(m:Signable):boolean {
return true;
}
public fingerprint():string {
return '';
}
}
class MockConsumer implements SubConsumer {
omnoms: Array<string>
constructor() {
this.omnoms = Array<string>();
}
public post(v:string) {
this.omnoms.push(v);
}
}
class MockPubSub implements PubSub {
pubs: Array<string>
consumer: SubConsumer
constructor(name:string, consumer:SubConsumer) {
this.pubs = Array<string>();
this.consumer = consumer;
}
public pub(v:string): boolean {
this.pubs.push(v);
return true;
}
public flush() {
while (this.pubs.length > 0) {
const s = this.pubs.shift();
this.consumer.post(s);
}
}
public close() {
}
}
class MockSignable implements Signable {
src: string
dst: string
constructor(src:string) {
this.src = src;
}
public digest():string {
const h = crypto.createHash('sha256');
h.update(this.src);
this.dst= h.digest('hex');
return this.dst;
}
}
// KeyStore backed by one fixed private key and a fixed trusted-key list;
// active/encrypt key sets and the fingerprint are intentionally empty.
class MockKeyStore implements KeyStore {

  pk: any
  pubks: Array<any>

  constructor(pk:any, pubks:Array<any>) {
    this.pk = pk;
    this.pubks = pubks;
  }

  public getPrivateKey(): any {
    return this.pk;
  }

  public getTrustedKeys(): Array<any> {
    return this.pubks;
  }

  public getTrustedActiveKeys(): Array<any> {
    return [];
  }

  public getEncryptKeys(): Array<any> {
    return [];
  }

  public getFingerprint(): string {
    return '';
  }
}
export {
MockStore,
MockPubSub,
MockConsumer,
MockSignable,
MockKeyStore,
MockSigner,
};

View File

@@ -1,13 +1,10 @@
import Automerge = require('automerge');
import assert = require('assert');
import fs = require('fs');
import pgp = require('openpgp');
import sqlite = require('sqlite3');
import * as handlers from '../scripts/server/handlers';
import { Envelope, Syncable, ArgPair } from '../src/sync';
import { PGPKeyStore, PGPSigner, KeyStore, Signer } from '../src/auth';
import { SqliteAdapter } from '../src/db';
import { Envelope, Syncable, ArgPair, PGPKeyStore, PGPSigner, KeyStore, Signer, SqliteAdapter } from 'crdt-meta';
function createKeystore() {
const pksa = fs.readFileSync(__dirname + '/privatekeys.asc', 'utf-8');

View File

@@ -3,6 +3,7 @@ import logging
import re
# third-party imports
from celery.app.control import Inspect
import celery
# local imports
@@ -15,6 +16,29 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
    """Discover sms tasks registered on workers serving cic-notify queues.

    Queries the celery control Inspect API for active queues matching the
    ``re_q`` pattern, then asks each matching host for its registered tasks
    and keeps those whose name starts with task_prefix.

    :param app: Celery application instance to inspect.
    :param task_prefix: Task-name prefix identifying sms tasks.
    :type task_prefix: str
    :return: List of (queue_name, task_name) tuples.
    :rtype: list
    """
    host_queues = []
    i = Inspect(app=app)
    # Inspect methods return None (not {}) when no workers reply; guard so an
    # empty cluster yields an empty task list instead of an AttributeError.
    qs = i.active_queues() or {}
    for host, queues in qs.items():
        for q in queues:
            if re.match(re_q, q['name']):
                host_queues.append((host, q['name'],))

    queue_tasks = []
    for (host, queue) in host_queues:
        i = Inspect(app=app, destination=[host])
        registered = i.registered_tasks() or {}
        for tasks in registered.values():
            for task in tasks:
                if task.startswith(task_prefix):
                    queue_tasks.append((queue, task,))

    return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
@@ -22,17 +46,9 @@ class Api:
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
registered_tasks = app.tasks
self.sms_tasks = []
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
@@ -44,12 +60,17 @@ class Api:
:rtype: Celery.Task
"""
signatures = []
for task in self.sms_tasks:
signature = celery.signature(task)
for q in self.sms_tasks:
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
signatures.append(signature)
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result
t = celery.group(signatures)()
return t

View File

@@ -0,0 +1,76 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile

# external imports
import celery
import confini

# local imports
from cic_notify.api import Api

logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

config_dir = os.path.join('/usr/local/etc/cic-notify')

argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()

if args.vv:
    logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
    logging.getLogger().setLevel(logging.INFO)

# load configuration, overlaying cli arguments as censored config entries
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)

# set up celery
app = celery.Celery(__name__)

broker = config.get('CELERY_BROKER_URL')
if broker.startswith('file'):
    bq = tempfile.mkdtemp()
    bp = tempfile.mkdtemp()
    # filesystem transport: in/out share one dir so this single process sees
    # its own messages; processed messages land in a separate dir
    app.conf.update({
            'broker_url': broker,
            'broker_transport_options': {
                'data_folder_in': bq,
                'data_folder_out': bq,
                'data_folder_processed': bp,
                },
            },
            )
    logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
    app.conf.update({
        'broker_url': broker,
        })

result = config.get('CELERY_RESULT_URL')
if result.startswith('file'):
    rq = tempfile.mkdtemp()
    app.conf.update({
        'result_backend': 'file://{}'.format(rq),
        })
    logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
    app.conf.update({
        'result_backend': result,
        })

if __name__ == '__main__':
    a = Api()
    # Api.sms takes (message, recipient) in that order -- passing the
    # recipient first would swap the sms body and destination.
    t = a.sms(config.get('_MESSAGE'), config.get('_RECIPIENT'))

View File

@@ -33,7 +33,9 @@ elif args.v:
config = confini.Config(args.c, args.env_prefix)
config.process()
config.add(args.q, '_CELERY_QUEUE', True)
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database
dsn = dsn_from_config(config)

View File

@@ -6,11 +6,10 @@ import time
import semver
# local imports
from cic_notify.error import PleaseCommitFirstError
logg = logging.getLogger()
version = (0, 4, 0, 'alpha.3')
version = (0, 4, 0, 'alpha.4')
version_object = semver.VersionInfo(
major=version[0],
@@ -18,27 +17,4 @@ version_object = semver.VersionInfo(
patch=version[2],
prerelease=version[3],
)
version_string = str(version_object)
def git_hash():
import subprocess
git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
git_hash_brief = git_hash.stdout.decode('utf-8')[:8]
return git_hash_brief
try:
version_git = git_hash()
version_string += '+build.{}'.format(version_git)
except FileNotFoundError:
time_string_pair = str(time.time()).split('.')
version_string += '+build.{}{:<09d}'.format(
time_string_pair[0],
int(time_string_pair[1]),
)
logg.info(f'Final version string will be {version_string}')
__version_string__ = version_string

View File

@@ -1,4 +1,4 @@
FROM python:3.8.6
FROM python:3.8.6-slim-buster
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps

View File

@@ -1 +1 @@
cic_base[full_graph]~=0.1.2a46
cic_base[full_graph]~=0.1.2a61

View File

@@ -1,6 +1,5 @@
[metadata]
name = cic-notify
version= 0.4.0a2
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -45,3 +44,4 @@ testing =
[options.entry_points]
console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main

View File

@@ -1,9 +1,31 @@
# standard imports
import logging
import subprocess
import time
from setuptools import setup
# third-party imports
# local imports
from cic_notify.version import version_string
logg = logging.getLogger()
def git_hash():
    """Return the first 8 hex digits of the current git HEAD commit.

    Raises FileNotFoundError when the git binary is absent; the caller
    falls back to a timestamp-based build tag in that case.
    """
    proc = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
    return proc.stdout.decode('utf-8')[:8]
try:
version_git = git_hash()
version_string += '+build.{}'.format(version_git)
except FileNotFoundError:
time_string_pair = str(time.time()).split('.')
version_string += '+build.{}{:<09d}'.format(
time_string_pair[0],
int(time_string_pair[1]),
)
logg.info(f'Final version string will be {version_string}')
requirements = []
@@ -25,6 +47,6 @@ while True:
test_requirements_file.close()
setup(
version=version_string,
install_requires=requirements,
tests_require=test_requirements,
)
tests_require=test_requirements)

View File

@@ -1,14 +1,24 @@
[app]
ALLOWED_IP=127.0.0.1
ALLOWED_IP=0.0.0.0/0
LOCALE_FALLBACK=en
LOCALE_PATH=var/lib/locale/
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =

View File

@@ -6,3 +6,5 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
POOL_SIZE=1

View File

@@ -3,7 +3,7 @@ BROKER_URL=redis://
RESULT_URL=redis://
[redis]
HOSTNAME=localhost
HOSTNAME=redis
PASSWORD=
PORT=6379
DATABASE=0

View File

@@ -8,12 +8,12 @@ from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.redis import get_cached_data
def define_account_tx_metadata(user: User):
def define_account_tx_metadata(user: Account):
# get sender metadata
identifier = blockchain_address_to_metadata_pointer(
blockchain_address=user.blockchain_address

View File

@@ -1,4 +1,4 @@
"""Create user table
"""Create account table
Revision ID: f289e8510444
Revises:
@@ -17,7 +17,7 @@ depends_on = None
def upgrade():
op.create_table('user',
op.create_table('account',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('blockchain_address', sa.String(), nullable=False),
sa.Column('phone_number', sa.String(), nullable=False),
@@ -29,11 +29,11 @@ def upgrade():
sa.Column('updated', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_phone_number'), 'user', ['phone_number'], unique=True)
op.create_index(op.f('ix_user_blockchain_address'), 'user', ['blockchain_address'], unique=True)
op.create_index(op.f('ix_account_phone_number'), 'account', ['phone_number'], unique=True)
op.create_index(op.f('ix_account_blockchain_address'), 'account', ['blockchain_address'], unique=True)
def downgrade():
op.drop_index(op.f('ix_user_blockchain_address'), table_name='user')
op.drop_index(op.f('ix_user_phone_number'), table_name='user')
op.drop_table('user')
op.drop_index(op.f('ix_account_blockchain_address'), table_name='account')
op.drop_index(op.f('ix_account_phone_number'), table_name='account')
op.drop_table('account')

View File

@@ -16,12 +16,12 @@ class AccountStatus(IntEnum):
RESET = 4
class User(SessionBase):
class Account(SessionBase):
"""
This class defines a user record along with functions responsible for hashing the user's corresponding password and
subsequently verifying a password's validity given an input to compare against the persisted hash.
"""
__tablename__ = 'user'
__tablename__ = 'account'
blockchain_address = Column(String)
phone_number = Column(String)
@@ -38,7 +38,7 @@ class User(SessionBase):
self.account_status = AccountStatus.PENDING.value
def __repr__(self):
return f'<User: {self.blockchain_address}>'
return f'<Account: {self.blockchain_address}>'
def create_password(self, password):
"""This method takes a password value and hashes the value before assigning it to the corresponding

View File

@@ -1,47 +1,129 @@
# standard imports
# stanard imports
import logging
import datetime
# third-party imports
# external imports
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger().getChild(__name__)
Model = declarative_base(name='Model')
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
"""
__abstract__ = True
id = Column(Integer, primary_key=True)
created = Column(DateTime, default=datetime.datetime.utcnow)
updated = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
id = Column(Integer, primary_key=True)
engine = None
session = None
query = None
"""Database connection engine of the running aplication"""
sessionmaker = None
"""Factory object responsible for creating sessions from the connection pool"""
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
def create_session():
session = sessionmaker(bind=SessionBase.engine)
return session()
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def build():
Model.metadata.create_all(bind=SessionBase.engine)
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
:type dsn: str
"""
e = None
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if pool_size == 0:
poolclass = NullPool
elif debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db connection not poolable')
e = create_engine(
dsn,
echo=debug,
)
SessionBase._set_engine(e)
@staticmethod
# https://docs.sqlalchemy.org/en/13/core/pooling.html#pool-disconnects
def connect(data_source_name):
engine = create_engine(data_source_name, pool_pre_ping=True)
SessionBase._set_engine(engine)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()

View File

@@ -18,7 +18,7 @@ class ActionDataNotFoundError(OSError):
pass
class UserMetadataNotFoundError(OSError):
class MetadataNotFoundError(OSError):
"""Raised when metadata is expected but not available in cache."""
pass
@@ -31,3 +31,10 @@ class UnsupportedMethodError(OSError):
class CachedDataNotFoundError(OSError):
"""Raised when the method passed to the make request function is unsupported."""
pass
class MetadataStoreError(Exception):
    """Raised when metadata storage fails."""

View File

@@ -3,7 +3,10 @@
# third-party imports
import requests
from chainlib.eth.address import to_checksum
from hexathon import add_0x
from hexathon import (
add_0x,
strip_0x,
)
# local imports
from cic_ussd.error import UnsupportedMethodError
@@ -40,4 +43,4 @@ def blockchain_address_to_metadata_pointer(blockchain_address: str):
:return:
:rtype:
"""
return bytes.fromhex(blockchain_address[2:])
return bytes.fromhex(strip_0x(blockchain_address))

View File

@@ -0,0 +1,126 @@
# standard imports
import json
import logging
import os
from typing import Dict, Union
# third-part imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
from cic_ussd.error import MetadataStoreError
logg = logging.getLogger().getChild(__name__)
class Metadata:
    """Shared configuration base for metadata-server clients.

    :cvar base_url: The base url of the metadata server.
    :type base_url: str
    """

    # expected to be assigned by application initialization code before any
    # handler derived from this class is instantiated
    base_url = None
def metadata_http_error_handler(result: requests.Response):
    """ This function handles and appropriately raises errors from http requests interacting with the metadata server.

    Responses in the 2xx range (and any status outside 100-599) pass through
    silently; everything else raises MetadataStoreError.

    :param result: The response object from a http request.
    :type result: requests.Response
    :raises MetadataStoreError: describing the failing status class and reason.
    """
    status_code = result.status_code
    reason = result.reason
    if 100 <= status_code < 200:
        raise MetadataStoreError(f'Informational errors: {status_code}, reason: {reason}')
    if 300 <= status_code < 400:
        raise MetadataStoreError(f'Redirect Issues: {status_code}, reason: {reason}')
    if 400 <= status_code < 500:
        raise MetadataStoreError(f'Client Error: {status_code}, reason: {reason}')
    if 500 <= status_code < 600:
        raise MetadataStoreError(f'Server Error: {status_code}, reason: {reason}')
class MetadataRequestsHandler(Metadata):
    """Base client performing create/edit/query operations against the
    metadata server for one (cic_type, identifier) pair.
    """

    def __init__(self, cic_type: str, identifier: bytes, engine: str = 'pgp'):
        """
        :param cic_type: The salt value with which to hash a specific metadata identifier.
        :type cic_type: str
        :param engine: Encryption used for sending data to the metadata server.
        :type engine: str
        :param identifier: A unique element of data in bytes necessary for creating a metadata pointer.
        :type identifier: bytes
        """
        self.cic_type = cic_type
        self.engine = engine
        # server-side automerge; payloads are JSON throughout
        self.headers = {
            'X-CIC-AUTOMERGE': 'server',
            'Content-Type': 'application/json'
        }
        self.identifier = identifier
        self.metadata_pointer = generate_metadata_pointer(
            identifier=self.identifier,
            cic_type=self.cic_type
        )
        # NOTE(review): if the class-level base_url was never set, self.url is
        # never assigned and the request methods below will raise
        # AttributeError -- confirm initialization order with the caller.
        if self.base_url:
            self.url = os.path.join(self.base_url, self.metadata_pointer)

    def create(self, data: Union[Dict, str]):
        """ This function is responsible for posting data to the metadata server with a corresponding metadata pointer
        for storage.

        :param data: The data to be stored in the metadata server.
        :type data: dict|str
        :raises MetadataStoreError: on any non-2xx response.
        """
        data = json.dumps(data).encode('utf-8')
        result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
        metadata_http_error_handler(result=result)
        # the POST response body is sign material; immediately sign and submit it
        metadata = result.content
        self.edit(data=metadata)

    def edit(self, data: bytes):
        """ This function is responsible for editing data in the metadata server corresponding to a unique pointer.

        :param data: The data to be edited in the metadata server.
        :type data: bytes
        :raises MetadataStoreError: on any non-2xx response.
        """
        cic_meta_signer = Signer()
        signature = cic_meta_signer.sign_digest(data=data)
        algorithm = cic_meta_signer.get_operational_key().get('algo')
        decoded_data = data.decode('utf-8')
        # envelope expected by the metadata server: message plus signature block
        formatted_data = {
            'm': data.decode('utf-8'),
            's': {
                'engine': self.engine,
                'algo': algorithm,
                'data': signature,
                'digest': json.loads(data).get('digest'),
            }
        }
        formatted_data = json.dumps(formatted_data).encode('utf-8')
        result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
        logg.info(f'signed metadata submission status: {result.status_code}.')
        metadata_http_error_handler(result=result)
        # identifiers may be raw bytes (e.g. an address); fall back to hex for logging
        try:
            decoded_identifier = self.identifier.decode("utf-8")
        except UnicodeDecodeError:
            decoded_identifier = self.identifier.hex()
        logg.info(f'identifier: {decoded_identifier}. metadata pointer: {self.metadata_pointer} set to: {decoded_data}.')

    def query(self):
        """This function is responsible for querying the metadata server for data corresponding to a unique pointer."""
        result = make_request(method='GET', url=self.url)
        metadata_http_error_handler(result=result)
        response_data = result.content
        data = json.loads(response_data.decode('utf-8'))
        # person metadata is round-tripped through the Person model to validate it
        if result.status_code == 200 and self.cic_type == 'cic.person':
            person = Person()
            deserialized_person = person.deserialize(person_data=json.loads(data))
            data = json.dumps(deserialized_person.serialize())
        # cache whichever representation we ended up with under the pointer key
        cache_data(self.metadata_pointer, data=data)
        logg.debug(f'caching: {data} with key: {self.metadata_pointer}')

View File

@@ -0,0 +1,12 @@
# standard imports
# third-party imports
# local imports
from .base import MetadataRequestsHandler
class PersonMetadata(MetadataRequestsHandler):
    """Metadata requests handler bound to the 'cic.person' type."""

    def __init__(self, identifier: bytes):
        """
        :param identifier: Unique bytes value from which the metadata pointer is derived.
        :type identifier: bytes
        """
        super().__init__(cic_type='cic.person', identifier=identifier)

View File

@@ -0,0 +1,13 @@
# standard imports
import logging
# external imports
# local imports
from .base import MetadataRequestsHandler
class PhonePointerMetadata(MetadataRequestsHandler):
    """Metadata requests handler bound to the 'cic.msisdn' (phone number) type."""

    def __init__(self, identifier: bytes):
        """
        :param identifier: Unique bytes value from which the metadata pointer is derived.
        :type identifier: bytes
        """
        super().__init__(cic_type='cic.msisdn', identifier=identifier)

View File

@@ -44,7 +44,7 @@ class Signer:
gpg_keys = self.gpg.list_keys()
key_algorithm = gpg_keys[0].get('algo')
key_id = gpg_keys[0].get("keyid")
logg.info(f'using signing key: {key_id}, algorithm: {key_algorithm}')
logg.debug(f'using signing key: {key_id}, algorithm: {key_algorithm}')
return gpg_keys[0]
def sign_digest(self, data: bytes):

View File

@@ -1,102 +0,0 @@
# standard imports
import json
import logging
import os
# third-party imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.chain import Chain
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
logg = logging.getLogger()
class UserMetadata:
    """Legacy client for person metadata on the metadata server.

    :cvar base_url: The base url of the metadata server; expected to be set by
        initialization code. NOTE(review): when unset, ``self.url`` is never
        assigned and the request methods raise AttributeError.
    :type base_url: str
    """

    base_url = None

    def __init__(self, identifier: bytes):
        """
        :param identifier: Unique bytes value from which the metadata pointer is derived.
        :type identifier: bytes
        """
        self.headers = {
            'X-CIC-AUTOMERGE': 'server',
            'Content-Type': 'application/json'
        }
        self.identifier = identifier
        self.metadata_pointer = generate_metadata_pointer(
            identifier=self.identifier,
            cic_type='cic.person'
        )
        if self.base_url:
            self.url = os.path.join(self.base_url, self.metadata_pointer)

    def create(self, data: dict):
        """Post person data to the metadata server, then sign and submit the
        returned sign material via edit().

        :param data: Person metadata to store.
        :type data: dict
        :raises RuntimeError: wrapping any HTTP error raised by the server.
        """
        try:
            data = json.dumps(data).encode('utf-8')
            result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
            metadata = result.content
            self.edit(data=metadata, engine='pgp')
            logg.info(f'Get sign material response status: {result.status_code}')
            result.raise_for_status()
        except requests.exceptions.HTTPError as error:
            raise RuntimeError(error)

    def edit(self, data: bytes, engine: str):
        """Sign data and PUT the signed envelope to the metadata server.

        :param data: Sign material returned by the server on create.
        :type data: bytes
        :param engine: Name of the signing engine (e.g. 'pgp').
        :type engine: str
        :raises RuntimeError: wrapping any HTTP error raised by the server.
        """
        cic_meta_signer = Signer()
        signature = cic_meta_signer.sign_digest(data=data)
        algorithm = cic_meta_signer.get_operational_key().get('algo')
        # envelope expected by the metadata server: message plus signature block
        formatted_data = {
            'm': data.decode('utf-8'),
            's': {
                'engine': engine,
                'algo': algorithm,
                'data': signature,
                'digest': json.loads(data).get('digest'),
            }
        }
        formatted_data = json.dumps(formatted_data).encode('utf-8')
        try:
            result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
            logg.info(f'Signed content submission status: {result.status_code}.')
            result.raise_for_status()
        except requests.exceptions.HTTPError as error:
            raise RuntimeError(error)

    def query(self):
        """GET the latest metadata for this pointer; on 200, validate it
        through the Person model and cache the serialized result.

        :raises RuntimeError: wrapping any HTTP error raised by the server.
        """
        result = make_request(method='GET', url=self.url)
        status = result.status_code
        logg.info(f'Get latest data status: {status}')
        try:
            if status == 200:
                response_data = result.content
                data = json.loads(response_data.decode())
                # validate data
                person = Person()
                deserialized_person = person.deserialize(person_data=json.loads(data))
                cache_data(key=self.metadata_pointer, data=json.dumps(deserialized_person.serialize()))
            elif status == 404:
                logg.info('The data is not available and might need to be added.')
            result.raise_for_status()
        except requests.exceptions.HTTPError as error:
            raise RuntimeError(error)

View File

@@ -10,7 +10,7 @@ from tinydb.table import Document
from typing import Optional
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.db.models.task_tracker import TaskTracker
from cic_ussd.menu.ussd_menu import UssdMenu
@@ -143,10 +143,10 @@ def get_account_status(phone_number) -> str:
:return: The user account status.
:rtype: str
"""
user = User.session.query(User).filter_by(phone_number=phone_number).first()
user = Account.session.query(Account).filter_by(phone_number=phone_number).first()
status = user.get_account_status()
User.session.add(user)
User.session.commit()
Account.session.add(user)
Account.session.commit()
return status
@@ -269,12 +269,12 @@ def cache_account_creation_task_id(phone_number: str, task_id: str):
redis_cache.persist(name=task_id)
def process_current_menu(ussd_session: Optional[dict], user: User, user_input: str) -> Document:
def process_current_menu(ussd_session: Optional[dict], user: Account, user_input: str) -> Document:
"""This function checks user input and returns a corresponding ussd menu
:param ussd_session: An in db ussd session object.
:type ussd_session: UssdSession
:param user: A user object.
:type user: User
:type user: Account
:param user_input: The user's input.
:type user_input: str
:return: An in memory ussd menu object.
@@ -324,7 +324,7 @@ def process_menu_interaction_requests(chain_str: str,
else:
# get user
user = User.session.query(User).filter_by(phone_number=phone_number).first()
user = Account.session.query(Account).filter_by(phone_number=phone_number).first()
# find any existing ussd session
existing_ussd_session = UssdSession.session.query(UssdSession).filter_by(
@@ -390,10 +390,10 @@ def reset_pin(phone_number: str) -> str:
:return: The status of the pin reset.
:rtype: str
"""
user = User.session.query(User).filter_by(phone_number=phone_number).first()
user = Account.session.query(Account).filter_by(phone_number=phone_number).first()
user.reset_account_pin()
User.session.add(user)
User.session.commit()
Account.session.add(user)
Account.session.commit()
response = f'Pin reset for user {phone_number} is successful!'
return response

View File

@@ -5,7 +5,7 @@ from typing import Optional
import phonenumbers
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
def process_phone_number(phone_number: str, region: str):
@@ -30,14 +30,14 @@ def process_phone_number(phone_number: str, region: str):
return parsed_phone_number
def get_user_by_phone_number(phone_number: str) -> Optional[User]:
def get_user_by_phone_number(phone_number: str) -> Optional[Account]:
"""This function queries the database for a user based on the provided phone number.
:param phone_number: A valid phone number.
:type phone_number: str
:return: A user object matching a given phone number
:rtype: User|None
:rtype: Account|None
"""
# consider adding region to user's metadata
phone_number = process_phone_number(phone_number=phone_number, region='KE')
user = User.session.query(User).filter_by(phone_number=phone_number).first()
user = Account.session.query(Account).filter_by(phone_number=phone_number).first()
return user

View File

@@ -13,9 +13,9 @@ from tinydb.table import Document
from cic_ussd.account import define_account_tx_metadata, retrieve_account_statement
from cic_ussd.balance import BalanceManager, compute_operational_balance, get_cached_operational_balance
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.account import AccountStatus, Account
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.phone_number import get_user_by_phone_number
@@ -28,13 +28,13 @@ from cic_types.models.person import generate_metadata_pointer, get_contact_data_
logg = logging.getLogger(__name__)
def process_pin_authorization(display_key: str, user: User, **kwargs) -> str:
def process_pin_authorization(display_key: str, user: Account, **kwargs) -> str:
"""
This method provides translation for all ussd menu entries that follow the pin authorization pattern.
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param user: The user in a running USSD session.
:type user: User
:type user: Account
:param kwargs: Any additional information required by the text values in the internationalization files.
:type kwargs
:return: A string value corresponding the ussd menu's text value.
@@ -55,13 +55,13 @@ def process_pin_authorization(display_key: str, user: User, **kwargs) -> str:
)
def process_exit_insufficient_balance(display_key: str, user: User, ussd_session: dict):
def process_exit_insufficient_balance(display_key: str, user: Account, ussd_session: dict):
"""This function processes the exit menu letting users their account balance is insufficient to perform a specific
transaction.
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param user: The user requesting access to the ussd menu.
:type user: User
:type user: Account
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:return: Corresponding translation text response
@@ -90,12 +90,12 @@ def process_exit_insufficient_balance(display_key: str, user: User, ussd_session
)
def process_exit_successful_transaction(display_key: str, user: User, ussd_session: dict):
def process_exit_successful_transaction(display_key: str, user: Account, ussd_session: dict):
"""This function processes the exit menu after a successful initiation for a transfer of tokens.
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param user: The user requesting access to the ussd menu.
:type user: User
:type user: Account
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:return: Corresponding translation text response
@@ -118,11 +118,11 @@ def process_exit_successful_transaction(display_key: str, user: User, ussd_sessi
)
def process_transaction_pin_authorization(user: User, display_key: str, ussd_session: dict):
def process_transaction_pin_authorization(user: Account, display_key: str, ussd_session: dict):
"""This function processes pin authorization where making a transaction is concerned. It constructs a
pre-transaction response menu that shows the details of the transaction.
:param user: The user requesting access to the ussd menu.
:type user: User
:type user: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:param ussd_session: The USSD session determining what user data needs to be extracted and added to the menu's
@@ -151,7 +151,7 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
)
def process_account_balances(user: User, display_key: str, ussd_session: dict):
def process_account_balances(user: Account, display_key: str, ussd_session: dict):
"""
:param user:
:type user:
@@ -205,7 +205,7 @@ def format_transactions(transactions: list, preferred_language: str):
return formatted_transactions
def process_display_user_metadata(user: User, display_key: str):
def process_display_user_metadata(user: Account, display_key: str):
"""
:param user:
:type user:
@@ -235,10 +235,10 @@ def process_display_user_metadata(user: User, display_key: str):
products=products
)
else:
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {user.blockchain_address}')
raise MetadataNotFoundError(f'Expected person metadata but found none in cache for key: {key}')
def process_account_statement(user: User, display_key: str, ussd_session: dict):
def process_account_statement(user: Account, display_key: str, ussd_session: dict):
"""
:param user:
:type user:
@@ -301,12 +301,12 @@ def process_account_statement(user: User, display_key: str, ussd_session: dict):
)
def process_start_menu(display_key: str, user: User):
def process_start_menu(display_key: str, user: Account):
"""This function gets data on an account's balance and token in order to append it to the start of the start menu's
title. It passes said arguments to the translation function and returns the appropriate corresponding text from the
translation files.
:param user: The user requesting access to the ussd menu.
:type user: User
:type user: Account
:param display_key: The path in the translation files defining an appropriate ussd response
:type display_key: str
:return: Corresponding translation text response
@@ -331,11 +331,11 @@ def process_start_menu(display_key: str, user: User):
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's metadata
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_user_metadata.apply_async(queue='cic-ussd')
s_query_person_metadata.apply_async(queue='cic-ussd')
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)
@@ -361,13 +361,13 @@ def retrieve_most_recent_ussd_session(phone_number: str) -> UssdSession:
return last_ussd_session
def process_request(user_input: str, user: User, ussd_session: Optional[dict] = None) -> Document:
def process_request(user_input: str, user: Account, ussd_session: Optional[dict] = None) -> Document:
"""This function assesses a request based on the user from the request comes, the session_id and the user's
input. It determines whether the request translates to a return to an existing session by checking whether the
provided session id exists in the database or whether the creation of a new ussd session object is warranted.
It then returns the appropriate ussd menu text values.
:param user: The user requesting access to the ussd menu.
:type user: User
:type user: Account
:param user_input: The value a user enters in the ussd menu.
:type user_input: str
:param ussd_session: A JSON serialized in-memory ussd session object
@@ -389,14 +389,11 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
logg.debug(f'METADATA POINTER: {key}')
user_metadata = get_cached_data(key=key)
logg.debug(f'METADATA: {user_metadata}')
person_metadata = get_cached_data(key=key)
if last_ussd_session:
# get last state
last_state = last_ussd_session.state
logg.debug(f'LAST USSD SESSION STATE: {last_state}')
# if last state is account_creation_prompt and metadata exists, show start menu
if last_state in [
'account_creation_prompt',
@@ -405,7 +402,7 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
'exit_invalid_new_pin',
'exit_pin_mismatch',
'exit_invalid_request'
] and user_metadata is not None:
] and person_metadata is not None:
return UssdMenu.find_by_name(name='start')
else:
return UssdMenu.find_by_name(name=last_state)
@@ -418,14 +415,14 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
return UssdMenu.find_by_name(name='initial_pin_entry')
def next_state(ussd_session: dict, user: User, user_input: str) -> str:
def next_state(ussd_session: dict, user: Account, user_input: str) -> str:
"""This function navigates the state machine based on the ussd session object and user inputs it receives.
It checks the user input and provides the successive state in the state machine. It then updates the session's
state attribute with the new state.
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:param user: The user requesting access to the ussd menu.
:type user: User
:type user: Account
:param user_input: The value a user enters in the ussd menu.
:type user_input: str
:return: A string value corresponding the successive give a specific state in the state machine.
@@ -441,7 +438,7 @@ def custom_display_text(
display_key: str,
menu_name: str,
ussd_session: dict,
user: User) -> str:
user: Account) -> str:
"""This function extracts the appropriate session data based on the current menu name. It then inserts them as
keywords in the i18n function.
:param display_key: The path in the translation files defining an appropriate ussd response
@@ -449,7 +446,7 @@ def custom_display_text(
:param menu_name: The name by which a specific menu can be identified.
:type menu_name: str
:param user: The user in a running USSD session.
:type user: User
:type user: Account
:param ussd_session: A JSON serialized in-memory ussd session object
:type ussd_session: dict
:return: A string value corresponding the ussd menu's text value.

View File

@@ -10,7 +10,7 @@ from urllib.parse import urlparse, parse_qs
from sqlalchemy import desc
# local imports
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.account import AccountStatus, Account
from cic_ussd.operations import get_account_status, reset_pin
from cic_ussd.validator import check_known_user
@@ -123,9 +123,9 @@ def process_locked_accounts_requests(env: dict) -> tuple:
else:
limit = r[1]
locked_accounts = User.session.query(User.blockchain_address).filter(
User.account_status == AccountStatus.LOCKED.value,
User.failed_pin_attempts >= 3).order_by(desc(User.updated)).offset(offset).limit(limit).all()
locked_accounts = Account.session.query(Account.blockchain_address).filter(
Account.account_status == AccountStatus.LOCKED.value,
Account.failed_pin_attempts >= 3).order_by(desc(Account.updated)).offset(offset).limit(limit).all()
# convert lists to scalar blockchain addresses
locked_accounts = [blockchain_address for (blockchain_address, ) in locked_accounts]

View File

@@ -23,10 +23,11 @@ from cic_ussd.encoder import PasswordEncoder
from cic_ussd.files.local_files import create_local_file_data_stores, json_file_parser
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.base import Metadata
from cic_ussd.operations import (define_response_with_content,
process_menu_interaction_requests,
define_multilingual_responses)
from cic_ussd.phone_number import process_phone_number
from cic_ussd.redis import InMemoryStore
from cic_ussd.requests import (get_request_endpoint,
get_request_method,
@@ -56,20 +57,17 @@ arg_parser.add_argument('--env-prefix',
help='environment prefix for variables to overwrite configuration')
args = arg_parser.parse_args()
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
# parse config
config = Config(config_dir=args.c, env_prefix=args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
# log config vars
logg.debug(config)
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# initialize elements
# set up translations
@@ -86,7 +84,7 @@ UssdMenu.ussd_menu_db = ussd_menu_db
# set up db
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name=data_source_name)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# create session for the life time of http request
SessionBase.session = SessionBase.create_session()
@@ -99,7 +97,7 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
UserMetadata.base_url = config.get('CIC_META_URL')
Metadata.base_url = config.get('CIC_META_URL')
# define signer values
export_dir = config.get('PGP_EXPORT_DIR')
@@ -151,6 +149,10 @@ def application(env, start_response):
external_session_id = post_data.get('sessionId')
user_input = post_data.get('text')
# add validation for phone number
if phone_number:
phone_number = process_phone_number(phone_number=phone_number, region=config.get('PHONE_NUMBER_REGION'))
# validate ip address
if not check_ip(config=config, env=env):
start_response('403 Sneaky, sneaky', errors_headers)
@@ -174,8 +176,10 @@ def application(env, start_response):
# validate phone number
if not validate_phone_number(phone_number):
logg.error('invalid phone number {}'.format(phone_number))
start_response('400 Invalid phone number format', errors_headers)
return []
logg.debug('session {} started for {}'.format(external_session_id, phone_number))
# handle menu interaction requests
chain_str = chain_spec.__str__()

View File

@@ -6,6 +6,7 @@ import tempfile
# third party imports
import celery
import i18n
import redis
from confini import Config
@@ -13,13 +14,14 @@ from confini import Config
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.base import Metadata
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.validator import validate_presence
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('gnupg').setLevel(logging.WARNING)
config_directory = '/usr/local/etc/cic-ussd/'
@@ -32,22 +34,22 @@ arg_parser.add_argument('-vv', action='store_true', help='be more verbose')
arg_parser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
args = arg_parser.parse_args()
# parse config
config = Config(config_dir=args.c, env_prefix=args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
logg.debug(config)
# parse config
config = Config(args.c, args.env_prefix)
config.process()
config.add(args.q, '_CELERY_QUEUE', True)
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name=data_source_name)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()
@@ -63,7 +65,7 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
UserMetadata.base_url = config.get('CIC_META_URL')
Metadata.base_url = config.get('CIC_META_URL')
# define signer values
export_dir = config.get('PGP_EXPORT_DIR')
@@ -76,6 +78,10 @@ if key_file_path:
validate_presence(path=key_file_path)
Signer.key_file_path = key_file_path
# set up translations
i18n.load_path.append(config.get('APP_LOCALE_PATH'))
i18n.set('fallback', config.get('APP_LOCALE_FALLBACK'))
# set up celery
current_app = celery.Celery(__name__)

View File

@@ -5,12 +5,12 @@ from typing import Tuple
# third-party imports
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
logg = logging.getLogger(__file__)
def process_mini_statement_request(state_machine_data: Tuple[str, dict, User]):
def process_mini_statement_request(state_machine_data: Tuple[str, dict, Account]):
"""This function compiles a brief statement of a user's last three inbound and outbound transactions and send the
same as a message on their selected avenue for notification.
:param state_machine_data: A tuple containing user input, a ussd session and user object.

View File

@@ -6,10 +6,10 @@ ussd menu facilitating the return of appropriate menu responses based on said us
from typing import Tuple
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
def menu_one_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
def menu_one_selected(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks that user input matches a string with value '1'
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
@@ -20,7 +20,7 @@ def menu_one_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
return user_input == '1'
def menu_two_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
def menu_two_selected(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks that user input matches a string with value '2'
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -31,7 +31,7 @@ def menu_two_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
return user_input == '2'
def menu_three_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
def menu_three_selected(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks that user input matches a string with value '3'
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -42,7 +42,7 @@ def menu_three_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
return user_input == '3'
def menu_four_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
def menu_four_selected(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""
This function checks that user input matches a string with value '4'
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@@ -54,7 +54,7 @@ def menu_four_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
return user_input == '4'
def menu_five_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
def menu_five_selected(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""
This function checks that user input matches a string with value '5'
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@@ -66,7 +66,7 @@ def menu_five_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
return user_input == '5'
def menu_zero_zero_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
def menu_zero_zero_selected(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""
This function checks that user input matches a string with value '00'
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@@ -78,7 +78,7 @@ def menu_zero_zero_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
return user_input == '00'
def menu_ninety_nine_selected(state_machine_data: Tuple[str, dict, User]) -> bool:
def menu_ninety_nine_selected(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""
This function checks that user input matches a string with value '99'
:param state_machine_data: A tuple containing user input, a ussd session and user object.

View File

@@ -12,7 +12,7 @@ from typing import Tuple
import bcrypt
# local imports
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.account import AccountStatus, Account
from cic_ussd.encoder import PasswordEncoder, create_password_hash
from cic_ussd.operations import persist_session_to_db_task, create_or_update_session
from cic_ussd.redis import InMemoryStore
@@ -21,7 +21,7 @@ from cic_ussd.redis import InMemoryStore
logg = logging.getLogger(__file__)
def is_valid_pin(state_machine_data: Tuple[str, dict, User]) -> bool:
def is_valid_pin(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks a pin's validity by ensuring it has a length of for characters and the characters are
numeric.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@@ -37,7 +37,7 @@ def is_valid_pin(state_machine_data: Tuple[str, dict, User]) -> bool:
return pin_is_valid
def is_authorized_pin(state_machine_data: Tuple[str, dict, User]) -> bool:
def is_authorized_pin(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -48,7 +48,7 @@ def is_authorized_pin(state_machine_data: Tuple[str, dict, User]) -> bool:
return user.verify_password(password=user_input)
def is_locked_account(state_machine_data: Tuple[str, dict, User]) -> bool:
def is_locked_account(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether a user's account is locked due to too many failed attempts.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -59,7 +59,7 @@ def is_locked_account(state_machine_data: Tuple[str, dict, User]) -> bool:
return user.get_account_status() == AccountStatus.LOCKED.name
def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, User]):
def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, Account]):
"""This function hashes a pin and stores it in session data.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -94,7 +94,7 @@ def save_initial_pin_to_session_data(state_machine_data: Tuple[str, dict, User])
persist_session_to_db_task(external_session_id=external_session_id, queue='cic-ussd')
def pins_match(state_machine_data: Tuple[str, dict, User]) -> bool:
def pins_match(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -108,7 +108,7 @@ def pins_match(state_machine_data: Tuple[str, dict, User]) -> bool:
return bcrypt.checkpw(user_input.encode(), initial_pin)
def complete_pin_change(state_machine_data: Tuple[str, dict, User]):
def complete_pin_change(state_machine_data: Tuple[str, dict, Account]):
"""This function persists the user's pin to the database
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -116,11 +116,11 @@ def complete_pin_change(state_machine_data: Tuple[str, dict, User]):
user_input, ussd_session, user = state_machine_data
password_hash = ussd_session.get('session_data').get('initial_pin')
user.password_hash = password_hash
User.session.add(user)
User.session.commit()
Account.session.add(user)
Account.session.commit()
def is_blocked_pin(state_machine_data: Tuple[str, dict, User]) -> bool:
def is_blocked_pin(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether the user input confirming a specific pin matches the initial pin entered.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -131,7 +131,7 @@ def is_blocked_pin(state_machine_data: Tuple[str, dict, User]) -> bool:
return user.get_account_status() == AccountStatus.LOCKED.name
def is_valid_new_pin(state_machine_data: Tuple[str, dict, User]) -> bool:
def is_valid_new_pin(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks whether the user's new pin is a valid pin and that it isn't the same as the old one.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple

View File

@@ -3,21 +3,21 @@ import logging
from typing import Tuple
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
logg = logging.getLogger()
def send_terms_to_user_if_required(state_machine_data: Tuple[str, dict, User]):
def send_terms_to_user_if_required(state_machine_data: Tuple[str, dict, Account]):
user_input, ussd_session, user = state_machine_data
logg.debug('Requires integration to cic-notify.')
def process_mini_statement_request(state_machine_data: Tuple[str, dict, User]):
def process_mini_statement_request(state_machine_data: Tuple[str, dict, Account]):
user_input, ussd_session, user = state_machine_data
logg.debug('Requires integration to cic-notify.')
def upsell_unregistered_recipient(state_machine_data: Tuple[str, dict, User]):
def upsell_unregistered_recipient(state_machine_data: Tuple[str, dict, Account]):
user_input, ussd_session, user = state_machine_data
logg.debug('Requires integration to cic-notify.')

View File

@@ -9,7 +9,7 @@ import celery
# local imports
from cic_ussd.balance import BalanceManager, compute_operational_balance
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.account import AccountStatus, Account
from cic_ussd.operations import save_to_in_memory_ussd_session_data
from cic_ussd.phone_number import get_user_by_phone_number
from cic_ussd.redis import create_cached_data_key, get_cached_data
@@ -19,7 +19,7 @@ from cic_ussd.transactions import OutgoingTransactionProcessor
logg = logging.getLogger(__file__)
def is_valid_recipient(state_machine_data: Tuple[str, dict, User]) -> bool:
def is_valid_recipient(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks that a user exists, is not the initiator of the transaction, has an active account status
and is authorized to perform standard transactions.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@@ -34,7 +34,7 @@ def is_valid_recipient(state_machine_data: Tuple[str, dict, User]) -> bool:
return is_not_initiator and has_active_account_status and recipient is not None
def is_valid_transaction_amount(state_machine_data: Tuple[str, dict, User]) -> bool:
def is_valid_transaction_amount(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks that the transaction amount provided is valid as per the criteria for the transaction
being attempted.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@@ -49,7 +49,7 @@ def is_valid_transaction_amount(state_machine_data: Tuple[str, dict, User]) -> b
return False
def has_sufficient_balance(state_machine_data: Tuple[str, dict, User]) -> bool:
def has_sufficient_balance(state_machine_data: Tuple[str, dict, Account]) -> bool:
"""This function checks that the transaction amount provided is valid as per the criteria for the transaction
being attempted.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
@@ -72,7 +72,7 @@ def has_sufficient_balance(state_machine_data: Tuple[str, dict, User]) -> bool:
return int(user_input) <= operational_balance
def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, User]):
def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, Account]):
"""This function saves the phone number corresponding the intended recipients blockchain account.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
@@ -85,7 +85,7 @@ def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, Us
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, User]):
def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, Account]):
"""
:param state_machine_data:
:type state_machine_data:
@@ -97,14 +97,14 @@ def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, User]):
recipient = get_user_by_phone_number(phone_number=user_input)
blockchain_address = recipient.blockchain_address
# retrieve and cache account's metadata
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_user_metadata.apply_async(queue='cic-ussd')
s_query_person_metadata.apply_async(queue='cic-ussd')
def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, User]):
def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, Account]):
"""This function saves the phone number corresponding the intended recipients blockchain account.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
@@ -117,7 +117,7 @@ def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict,
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
def process_transaction_request(state_machine_data: Tuple[str, dict, User]):
def process_transaction_request(state_machine_data: Tuple[str, dict, Account]):
"""This function saves the phone number corresponding the intended recipients blockchain account.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str

View File

@@ -10,8 +10,8 @@ from cic_types.models.person import generate_vcard_from_contact_data, manage_ide
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import User
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.db.models.account import Account
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.operations import save_to_in_memory_ussd_session_data
from cic_ussd.redis import get_cached_data
@@ -19,40 +19,40 @@ from cic_ussd.redis import get_cached_data
logg = logging.getLogger(__file__)
def change_preferred_language_to_en(state_machine_data: Tuple[str, dict, User]):
def change_preferred_language_to_en(state_machine_data: Tuple[str, dict, Account]):
"""This function changes the user's preferred language to english.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, user = state_machine_data
user.preferred_language = 'en'
User.session.add(user)
User.session.commit()
Account.session.add(user)
Account.session.commit()
def change_preferred_language_to_sw(state_machine_data: Tuple[str, dict, User]):
def change_preferred_language_to_sw(state_machine_data: Tuple[str, dict, Account]):
"""This function changes the user's preferred language to swahili.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, user = state_machine_data
user.preferred_language = 'sw'
User.session.add(user)
User.session.commit()
Account.session.add(user)
Account.session.commit()
def update_account_status_to_active(state_machine_data: Tuple[str, dict, User]):
def update_account_status_to_active(state_machine_data: Tuple[str, dict, Account]):
"""This function sets user's account to active.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
"""
user_input, ussd_session, user = state_machine_data
user.activate_account()
User.session.add(user)
User.session.commit()
Account.session.add(user)
Account.session.commit()
def process_gender_user_input(user: User, user_input: str):
def process_gender_user_input(user: Account, user_input: str):
"""
:param user:
:type user:
@@ -74,7 +74,7 @@ def process_gender_user_input(user: User, user_input: str):
return gender
def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict, User]):
def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict, Account]):
"""This function saves first name data to the ussd session in the redis cache.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -109,7 +109,7 @@ def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict,
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
def format_user_metadata(metadata: dict, user: User):
def format_user_metadata(metadata: dict, user: Account):
"""
:param metadata:
:type metadata:
@@ -150,7 +150,7 @@ def format_user_metadata(metadata: dict, user: User):
}
def save_complete_user_metadata(state_machine_data: Tuple[str, dict, User]):
def save_complete_user_metadata(state_machine_data: Tuple[str, dict, Account]):
"""This function persists elements of the user metadata stored in session data
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: tuple
@@ -164,14 +164,14 @@ def save_complete_user_metadata(state_machine_data: Tuple[str, dict, User]):
user_metadata = format_user_metadata(metadata=metadata, user=user)
blockchain_address = user.blockchain_address
s_create_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_user_metadata',
s_create_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_person_metadata',
[blockchain_address, user_metadata]
)
s_create_user_metadata.apply_async(queue='cic-ussd')
s_create_person_metadata.apply_async(queue='cic-ussd')
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, Account]):
user_input, ussd_session, user = state_machine_data
blockchain_address = user.blockchain_address
key = generate_metadata_pointer(
@@ -181,7 +181,7 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
user_metadata = get_cached_data(key=key)
if not user_metadata:
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
raise MetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
given_name = ussd_session.get('session_data').get('given_name')
family_name = ussd_session.get('session_data').get('family_name')
@@ -211,18 +211,18 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
edited_metadata = deserialized_person.serialize()
s_edit_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.edit_user_metadata',
[blockchain_address, edited_metadata, 'pgp']
s_edit_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.edit_person_metadata',
[blockchain_address, edited_metadata]
)
s_edit_user_metadata.apply_async(queue='cic-ussd')
s_edit_person_metadata.apply_async(queue='cic-ussd')
def get_user_metadata(state_machine_data: Tuple[str, dict, User]):
def get_user_metadata(state_machine_data: Tuple[str, dict, Account]):
user_input, ussd_session, user = state_machine_data
blockchain_address = user.blockchain_address
s_get_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_get_user_metadata.apply_async(queue='cic-ussd')

View File

@@ -7,14 +7,14 @@ from typing import Tuple
from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.redis import get_cached_data
logg = logging.getLogger()
def has_cached_user_metadata(state_machine_data: Tuple[str, dict, User]):
def has_cached_user_metadata(state_machine_data: Tuple[str, dict, Account]):
"""This function checks whether the attributes of the user's metadata constituting a profile are filled out.
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
@@ -29,7 +29,7 @@ def has_cached_user_metadata(state_machine_data: Tuple[str, dict, User]):
return user_metadata is not None
def is_valid_name(state_machine_data: Tuple[str, dict, User]):
def is_valid_name(state_machine_data: Tuple[str, dict, Account]):
"""This function checks that a user provided name is valid
:param state_machine_data: A tuple containing user input, a ussd session and user object.
:type state_machine_data: str
@@ -43,7 +43,7 @@ def is_valid_name(state_machine_data: Tuple[str, dict, User]):
return False
def is_valid_gender_selection(state_machine_data: Tuple[str, dict, User]):
def is_valid_gender_selection(state_machine_data: Tuple[str, dict, Account]):
"""
:param state_machine_data:
:type state_machine_data:

View File

@@ -13,7 +13,7 @@ class UssdStateMachine(Machine):
"""This class describes a finite state machine responsible for maintaining all the states that describe the ussd
menu as well as providing a means for navigating through these states based on different user inputs.
It defines different helper functions that co-ordinate with the stakeholder components of the ussd menu: i.e the
User, UssdSession, UssdMenu to facilitate user interaction with ussd menu.
Account, UssdSession, UssdMenu to facilitate user interaction with ussd menu.
:cvar states: A list of pre-defined states.
:type states: list
:cvar transitions: A list of pre-defined transitions.

View File

@@ -5,9 +5,24 @@ import celery
import sqlalchemy
# local imports
from cic_ussd.error import MetadataStoreError
from cic_ussd.db.models.base import SessionBase
class CriticalTask(celery.Task):
class BaseTask(celery.Task):
session_func = SessionBase.create_session
def create_session(self):
return BaseTask.session_func()
def log_banner(self):
logg.debug('task {} root uuid {}'.format(self.__class__.__name__, self.request.root_id))
return
class CriticalTask(BaseTask):
retry_jitter = True
retry_backoff = True
retry_backoff_max = 8
@@ -17,4 +32,11 @@ class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError,
)
class CriticalMetadataTask(CriticalTask):
autoretry_for = (
MetadataStoreError,
)

View File

@@ -9,7 +9,7 @@ import celery
# local imports
from cic_ussd.conversions import from_wei
from cic_ussd.db.models.base import SessionBase
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
from cic_ussd.account import define_account_tx_metadata
from cic_ussd.error import ActionDataNotFoundError
from cic_ussd.redis import InMemoryStore, cache_data, create_cached_data_key
@@ -49,10 +49,17 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
phone_number = account_creation_data.get('phone_number')
# create user
user = User(blockchain_address=result, phone_number=phone_number)
user = Account(blockchain_address=result, phone_number=phone_number)
session.add(user)
session.commit()
session.close()
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'cic_ussd.tasks.metadata.add_phone_pointer',
[result, phone_number]
)
s.apply_async(queue=queue)
# expire cache
cache.expire(task_id, timedelta(seconds=180))
@@ -65,6 +72,8 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
session.close()
raise ActionDataNotFoundError(f'Account creation task: {task_id}, returned unexpected response: {status_code}')
session.close()
@celery_app.task
def process_incoming_transfer_callback(result: dict, param: str, status_code: int):
@@ -74,12 +83,12 @@ def process_incoming_transfer_callback(result: dict, param: str, status_code: in
# collect result data
recipient_blockchain_address = result.get('recipient')
sender_blockchain_address = result.get('sender')
token_symbol = result.get('token_symbol')
value = result.get('destination_value')
token_symbol = result.get('destination_token_symbol')
value = result.get('destination_token_value')
# try to find users in system
recipient_user = session.query(User).filter_by(blockchain_address=recipient_blockchain_address).first()
sender_user = session.query(User).filter_by(blockchain_address=sender_blockchain_address).first()
recipient_user = session.query(Account).filter_by(blockchain_address=recipient_blockchain_address).first()
sender_user = session.query(Account).filter_by(blockchain_address=sender_blockchain_address).first()
# check whether recipient is in the system
if not recipient_user:
@@ -118,6 +127,7 @@ def process_incoming_transfer_callback(result: dict, param: str, status_code: in
session.close()
raise ValueError(f'Unexpected status code: {status_code}.')
session.close()
@celery_app.task
def process_balances_callback(result: list, param: str, status_code: int):
@@ -161,7 +171,6 @@ def define_transaction_action_tag(
def process_statement_callback(result, param: str, status_code: int):
if status_code == 0:
# create session
session = SessionBase.create_session()
processed_transactions = []
# process transaction data to cache
@@ -174,12 +183,13 @@ def process_statement_callback(result, param: str, status_code: int):
if '0x0000000000000000000000000000000000000000' in source_token:
pass
else:
session = SessionBase.create_session()
# describe a processed transaction
processed_transaction = {}
# check if sender is in the system
sender: User = session.query(User).filter_by(blockchain_address=sender_blockchain_address).first()
owner: User = session.query(User).filter_by(blockchain_address=param).first()
sender: Account = session.query(Account).filter_by(blockchain_address=sender_blockchain_address).first()
owner: Account = session.query(Account).filter_by(blockchain_address=param).first()
if sender:
processed_transaction['sender_phone_number'] = sender.phone_number
@@ -195,13 +205,15 @@ def process_statement_callback(result, param: str, status_code: int):
processed_transaction['sender_phone_number'] = 'GRASSROOTS ECONOMICS'
# check if recipient is in the system
recipient: User = session.query(User).filter_by(blockchain_address=recipient_address).first()
recipient: Account = session.query(Account).filter_by(blockchain_address=recipient_address).first()
if recipient:
processed_transaction['recipient_phone_number'] = recipient.phone_number
else:
logg.warning(f'Tx with recipient not found in cic-ussd')
session.close()
# add transaction values
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value')).__str__()
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value')).__str__()

View File

@@ -1,20 +1,22 @@
# standard imports
import json
import logging
# third-party imports
import celery
from hexathon import strip_0x
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.metadata.phone import PhonePointerMetadata
from cic_ussd.tasks.base import CriticalMetadataTask
celery_app = celery.current_app
logg = logging.getLogger()
logg = logging.getLogger().getChild(__name__)
@celery_app.task
def query_user_metadata(blockchain_address: str):
def query_person_metadata(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
@@ -22,12 +24,12 @@ def query_user_metadata(blockchain_address: str):
:rtype:
"""
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.query()
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.query()
@celery_app.task
def create_user_metadata(blockchain_address: str, data: dict):
def create_person_metadata(blockchain_address: str, data: dict):
"""
:param blockchain_address:
:type blockchain_address:
@@ -37,12 +39,20 @@ def create_user_metadata(blockchain_address: str, data: dict):
:rtype:
"""
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.create(data=data)
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.create(data=data)
@celery_app.task
def edit_user_metadata(blockchain_address: str, data: bytes, engine: str):
def edit_person_metadata(blockchain_address: str, data: bytes):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.edit(data=data, engine=engine)
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.edit(data=data)
@celery_app.task(bind=True, base=CriticalMetadataTask)
def add_phone_pointer(self, blockchain_address: str, phone_number: str):
identifier = phone_number.encode('utf-8')
stripped_address = strip_0x(blockchain_address)
phone_metadata_client = PhonePointerMetadata(identifier=identifier)
phone_metadata_client.create(data=stripped_address)

View File

@@ -70,3 +70,4 @@ def persist_session_to_db(external_session_id: str):
session.close()
raise SessionNotFoundError('Session does not exist!')
session.close()

View File

@@ -8,7 +8,7 @@ from typing import Optional
def translation_for(key: str, preferred_language: Optional[str] = None, **kwargs) -> str:
"""
Translates text mapped to a specific YAML key into the user's set preferred language.
:param preferred_language: User's preferred language in which to view the ussd menu.
:param preferred_language: Account's preferred language in which to view the ussd menu.
:type preferred_language str
:param key: Key to a specific YAML test entry
:type key: str

View File

@@ -2,12 +2,13 @@
import logging
import os
import re
import ipaddress
# third-party imports
from confini import Config
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
logg = logging.getLogger(__file__)
@@ -21,7 +22,14 @@ def check_ip(config: Config, env: dict):
:return: Request IP validity
:rtype: boolean
"""
return env.get('REMOTE_ADDR') == config.get('APP_ALLOWED_IP')
# TODO: do once at boot time
actual_ip = ipaddress.ip_network(env.get('REMOTE_ADDR') + '/32')
for allowed_net_src in config.get('APP_ALLOWED_IP').split(','):
allowed_net = ipaddress.ip_network(allowed_net_src)
if actual_ip.subnet_of(allowed_net):
return True
return False
def check_request_content_length(config: Config, env: dict):
@@ -60,7 +68,7 @@ def check_known_user(phone: str):
:return: Is known phone number
:rtype: boolean
"""
user = User.session.query(User).filter_by(phone_number=phone).first()
user = Account.session.query(Account).filter_by(phone_number=phone).first()
return user is not None

View File

@@ -1,7 +1,7 @@
# standard imports
import semver
version = (0, 3, 0, 'alpha.8')
version = (0, 3, 0, 'alpha.9')
version_object = semver.VersionInfo(
major=version[0],

View File

@@ -51,4 +51,4 @@ RUN cd cic-ussd && \
COPY cic-ussd/.config/ /usr/local/etc/cic-ussd/
COPY cic-ussd/cic_ussd/db/migrations/ /usr/local/share/cic-ussd/alembic
WORKDIR /root
WORKDIR /root

View File

@@ -2,4 +2,4 @@
. /root/db.sh
/usr/local/bin/cic-ussd-tasker -vv "$@"
/usr/local/bin/cic-ussd-tasker $@

View File

@@ -2,4 +2,6 @@
. /root/db.sh
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :9000 --pyargv "-vv"
server_port=${SERVER_PORT:-9000}
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :$server_port --pyargv "$@"

View File

@@ -1,4 +1,4 @@
cic_base[full_graph]~=0.1.2a58
cic-eth~=0.11.0a4
cic-notify~=0.4.0a3
cic_base[full_graph]~=0.1.2a68
cic-eth~=0.11.0b3
cic-notify~=0.4.0a4
cic-types~=0.1.0a10

View File

@@ -4,19 +4,19 @@
import pytest
# platform imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
def test_user(init_database, set_fernet_key):
user = User(blockchain_address='0x417f5962fc52dc33ff0689659b25848680dec6dcedc6785b03d1df60fc6d5c51',
phone_number='+254700000000')
user = Account(blockchain_address='0x417f5962fc52dc33ff0689659b25848680dec6dcedc6785b03d1df60fc6d5c51',
phone_number='+254700000000')
user.create_password('0000')
session = User.session
session = Account.session
session.add(user)
session.commit()
queried_user = session.query(User).get(1)
queried_user = session.query(Account).get(1)
assert queried_user.blockchain_address == '0x417f5962fc52dc33ff0689659b25848680dec6dcedc6785b03d1df60fc6d5c51'
assert queried_user.phone_number == '+254700000000'
assert queried_user.failed_pin_attempts == 0
@@ -25,7 +25,7 @@ def test_user(init_database, set_fernet_key):
def test_user_state_transition(create_pending_user):
user = create_pending_user
session = User.session
session = Account.session
assert user.get_account_status() == 'PENDING'
user.activate_account()

View File

@@ -9,26 +9,26 @@ from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.redis import get_cached_data
def test_user_metadata(create_activated_user, define_metadata_pointer_url, load_config):
UserMetadata.base_url = load_config.get('CIC_META_URL')
PersonMetadata.base_url = load_config.get('CIC_META_URL')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
person_metadata_client = PersonMetadata(identifier=identifier)
assert user_metadata_client.url == define_metadata_pointer_url
assert person_metadata_client.url == define_metadata_pointer_url
def test_create_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
mock_meta_post_response,
person_metadata):
def test_create_person_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
mock_meta_post_response,
person_metadata):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
@@ -38,7 +38,7 @@ def test_create_user_metadata(caplog,
reason='CREATED',
content=json.dumps(mock_meta_post_response).encode('utf-8')
)
user_metadata_client.create(data=person_metadata)
person_metadata_client.create(data=person_metadata)
assert 'Get signed material response status: 201' in caplog.text
with pytest.raises(RuntimeError) as error:
@@ -49,19 +49,19 @@ def test_create_user_metadata(caplog,
status_code=400,
reason='BAD REQUEST'
)
user_metadata_client.create(data=person_metadata)
person_metadata_client.create(data=person_metadata)
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
def test_edit_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
person_metadata,
setup_metadata_signer):
def test_edit_person_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
person_metadata,
setup_metadata_signer):
Signer.gpg_passphrase = load_config.get('KEYS_PASSPHRASE')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
@@ -69,7 +69,7 @@ def test_edit_user_metadata(caplog,
status_code=200,
reason='OK'
)
user_metadata_client.edit(data=person_metadata, engine='pgp')
person_metadata_client.edit(data=person_metadata)
assert 'Signed content submission status: 200' in caplog.text
with pytest.raises(RuntimeError) as error:
@@ -80,7 +80,7 @@ def test_edit_user_metadata(caplog,
status_code=400,
reason='BAD REQUEST'
)
user_metadata_client.edit(data=person_metadata, engine='pgp')
person_metadata_client.edit(data=person_metadata)
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
@@ -92,7 +92,7 @@ def test_get_user_metadata(caplog,
person_metadata,
setup_metadata_signer):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
@@ -101,7 +101,7 @@ def test_get_user_metadata(caplog,
content=json.dumps(person_metadata).encode('utf-8'),
reason='OK'
)
user_metadata_client.query()
person_metadata_client.query()
assert 'Get latest data status: 200' in caplog.text
key = generate_metadata_pointer(
identifier=identifier,
@@ -118,6 +118,6 @@ def test_get_user_metadata(caplog,
status_code=404,
reason='NOT FOUND'
)
user_metadata_client.query()
person_metadata_client.query()
assert 'The data is not available and might need to be added.' in caplog.text
assert str(error.value) == f'400 Client Error: NOT FOUND for url: {define_metadata_pointer_url}'

View File

@@ -15,7 +15,7 @@ from cic_ussd.state_machine.logic.user import (
get_user_metadata,
save_complete_user_metadata,
process_gender_user_input,
save_profile_attribute_to_session_data,
save_metadata_attribute_to_session_data,
update_account_status_to_active)
@@ -41,14 +41,14 @@ def test_update_account_status_to_active(create_pending_user, create_in_db_ussd_
("enter_location", "location", "Kangemi", "Kangemi"),
("enter_products", "products", "Mandazi", "Mandazi"),
])
def test_save_save_profile_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
def test_save_metadata_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
create_in_db_ussd_session.state = current_state
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
@@ -56,7 +56,7 @@ def test_save_save_profile_attribute_to_session_data(current_state,
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session['state'] = current_state
save_profile_attribute_to_session_data(state_machine_data=state_machine_data)
save_metadata_attribute_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
@@ -87,18 +87,18 @@ def test_format_user_metadata(create_activated_user,
def test_save_complete_user_metadata(celery_session_worker,
complete_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
mocker,
setup_chain_spec,
ussd_session_data):
complete_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
mocker,
setup_chain_spec,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
ussd_session['session_data'] = complete_user_metadata
user_metadata = format_user_metadata(metadata=ussd_session.get('session_data'), user=create_activated_user)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_user_metadata.apply_async')
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_person_metadata.apply_async')
save_complete_user_metadata(state_machine_data=state_machine_data)
mocked_create_metadata_task.assert_called_with(
(user_metadata, create_activated_user.blockchain_address),
@@ -127,7 +127,7 @@ def test_edit_user_metadata_attribute(celery_session_worker,
}
state_machine_data = ('', ussd_session, create_activated_user)
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_user_metadata.apply_async')
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_person_metadata.apply_async')
edit_user_metadata_attribute(state_machine_data=state_machine_data)
person_metadata['location']['area_name'] = 'nairobi'
mocked_edit_metadata.assert_called_with(
@@ -146,7 +146,7 @@ def test_get_user_metadata_attribute(celery_session_worker,
ussd_session = json.loads(ussd_session)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_user_metadata.apply_async')
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_person_metadata.apply_async')
get_user_metadata(state_machine_data=state_machine_data)
mocked_get_metadata.assert_called_with(
(create_activated_user.blockchain_address,),

View File

@@ -8,7 +8,7 @@ import celery
import pytest
# local imports
from cic_ussd.db.models.user import User
from cic_ussd.db.models.account import Account
from cic_ussd.error import ActionDataNotFoundError
from cic_ussd.conversions import from_wei
@@ -29,7 +29,7 @@ def test_successful_process_account_creation_callback_task(account_creation_acti
# WARNING: [THE SETTING OF THE ROOT ID IS A HACK AND SHOULD BE REVIEWED OR IMPROVED]
mocked_task_request.root_id = task_id
user = init_database.query(User).filter_by(phone_number=phone_number).first()
user = init_database.query(Account).filter_by(phone_number=phone_number).first()
assert user is None
redis_cache = init_redis_cache
@@ -48,7 +48,7 @@ def test_successful_process_account_creation_callback_task(account_creation_acti
)
s_process_callback_request.apply_async().get()
user = init_database.query(User).filter_by(phone_number=phone_number).first()
user = init_database.query(Account).filter_by(phone_number=phone_number).first()
assert user.blockchain_address == result
action_data = redis_cache.get(task_id)

Some files were not shown because too many files have changed in this diff Show More