Compare commits


11 Commits

Author SHA1 Message Date
nolash 428c0983f6 Merge remote-tracking branch 'origin/master' into lash/external-chain-queue 2021-04-05 17:02:09 +02:00
nolash e2879cd22f Add db session creator to retrier backend 2021-04-05 16:56:20 +02:00
nolash a32955cc6a Add tracker tx filter test 2021-04-05 12:21:37 +02:00
nolash b1e228b8e3 Bump cic-eth, cic-ussd versions 2021-04-04 19:34:13 +02:00
nolash 6210fe05ef Update use chain_spec instead of chain_id for tx factory, unpack 2021-04-04 14:36:17 +02:00
nolash 65d3efd72b Move nonce, gas tasks to respective modules, chainspec instead of chainid for chainlib tx 2021-04-04 13:27:28 +02:00
nolash d3ccac7dac Bump version 2021-04-03 07:43:44 +02:00
nolash 0079fc3fbb Upgrade deps 2021-04-03 00:23:42 +02:00
nolash 08fba819e4 WIP implement chainqueue in daemons 2021-04-03 00:16:31 +02:00
nolash ffe7416040 Rehabilitate tests, tx_list tests still failing 2021-04-02 23:26:13 +02:00
nolash 68e02ba5b8 WIP Expunge obsolete db migrations, bring in chainqueue 2021-04-02 18:45:51 +02:00
114 changed files with 1883 additions and 4452 deletions

View File

@@ -26,10 +26,9 @@ from chainlib.eth.block import (
from hexathon import (
strip_0x,
)
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.backend import SyncerBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
@@ -71,21 +70,19 @@ def main():
syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
#if SyncerBackend.first(chain_spec):
# backend = SyncerBackend.initial(chain_spec, block_offset)
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncers.append(SQLBackend.initial(chain_spec, block_offset))
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend))
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
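Taken together, this hunk swaps the old SyncerBackend for chainsyncer 0.0.2's SQLBackend on the branch side. A minimal sketch of the resulting resume-or-bootstrap pattern, assuming the SQLBackend and driver APIs exactly as imported in this file (build_syncers is a hypothetical helper name):

    from chainsyncer.backend.sql import SQLBackend
    from chainsyncer.driver import HeadSyncer, HistorySyncer

    def build_syncers(chain_spec, block_offset):
        syncers = []
        # resume any unfinished sync sessions recorded in the database
        syncer_backends = SQLBackend.resume(chain_spec, block_offset)
        if len(syncer_backends) == 0:
            # no stored session: bootstrap an initial history sync
            syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
        for syncer_backend in syncer_backends:
            syncers.append(HistorySyncer(syncer_backend))
        # always follow new blocks from the next block onwards
        syncers.append(HeadSyncer(SQLBackend.live(chain_spec, block_offset + 1)))
        return syncers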

View File

@@ -47,9 +47,6 @@ RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server

View File

@@ -1,6 +0,0 @@
#!/bin/bash
set -e
>&2 echo executing database migration
python scripts/migrate.py -c /usr/local/etc/cic-cache --migrations-dir /usr/local/share/cic-cache/alembic -vv
set +e

View File

@@ -1,5 +0,0 @@
#!/bin/bash
. ./db.sh
/usr/local/bin/cic-cache-trackerd $@

View File

@@ -1,12 +1,13 @@
cic-base~=0.1.2a77
cic-base~=0.1.2a58
alembic==1.4.2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.4a16
cic-eth-registry~=0.5.4a10
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainsyncer[sql]~=0.0.2a2
chainlib~=0.0.2a2
chainsyncer~=0.0.1a21

View File

@@ -10,7 +10,6 @@ from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger()
@@ -65,7 +64,6 @@ class SessionBase(Model):
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
@@ -76,22 +74,17 @@ class SessionBase(Model):
echo=debug,
)
else:
if pool_size == 0:
logg.info('db using nullpool')
poolclass = NullPool
elif debug:
logg.info('db using assertion pool')
if debug:
poolclass = AssertionPool
else:
logg.info('db using static pool')
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db not poolable')
e = create_engine(
dsn,
echo=debug,
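Reading through the interleaved old and new lines, the pool selection on the branch side reduces to: a QueuePool when pool_size > 1, a NullPool when pool_size == 0, an AssertionPool under debug, and a StaticPool otherwise. A condensed sketch, assuming SQLAlchemy 1.3 as pinned in the requirements (engine_for is a hypothetical helper name):

    from sqlalchemy import create_engine
    from sqlalchemy.pool import StaticPool, QueuePool, AssertionPool, NullPool

    def engine_for(dsn, pool_size=16, debug=False, poolable=True):
        if poolable and pool_size > 1:
            # persistent pool with overflow headroom and stale-connection checks
            return create_engine(dsn, pool_size=pool_size, max_overflow=pool_size * 3,
                                 pool_pre_ping=True, pool_recycle=60,
                                 poolclass=QueuePool, echo=debug)
        if poolable:
            if pool_size == 0:
                poolclass = NullPool       # open and close a connection per use
            elif debug:
                poolclass = AssertionPool  # allow at most one checked-out connection
            else:
                poolclass = StaticPool     # a single connection shared by all sessions
            return create_engine(dsn, poolclass=poolclass, echo=debug)
        return create_engine(dsn, echo=debug)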

View File

@@ -61,8 +61,8 @@ def balance(tokens, holder_address, chain_spec_dict):
for t in tokens:
address = t['address']
token = ERC20Token(chain_spec, rpc, address)
c = ERC20(chain_spec)
token = ERC20Token(rpc, address)
c = ERC20()
o = c.balance_of(address, holder_address, sender_address=caller_address)
r = rpc.do(o)
t['balance_network'] = c.parse_balance(r)
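On the branch side, both the token cache and the call builder take the chain spec explicitly. A minimal sketch of the new call shape, with imports as already present in this module and placeholder addresses:

    token = ERC20Token(chain_spec, rpc, token_address)  # token metadata cache
    c = ERC20(chain_spec)                               # contract call builder
    o = c.balance_of(token_address, holder_address, sender_address=caller_address)
    r = rpc.do(o)                                       # execute the JSON-RPC call
    balance = c.parse_balance(r)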

View File

@@ -171,7 +171,6 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
except NotFoundEthException as e:
pass
# TODO: apply receipt in tx object to validate and normalize input
if rcpt != None:
success = rcpt['status'] == 1
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
@@ -181,7 +180,6 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
[
tx_hash_hex,
rcpt['blockNumber'],
rcpt['transactionIndex'],
not success,
],
queue=queue,

View File

@@ -23,7 +23,7 @@ def translate_address(address, trusted_addresses, chain_spec, sender_address=ZER
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
c = AddressDeclarator(chain_spec)
c = AddressDeclarator()
for trusted_address in trusted_addresses:
o = c.declaration(declarator_address, trusted_address, address, sender_address=sender_address)

View File

@@ -20,10 +20,10 @@ def set_sent(chain_spec_dict, tx_hash, fail=False):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_final(chain_spec_dict, tx_hash, block=None, tx_index=None, fail=False):
def set_final(chain_spec_dict, tx_hash, block=None, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
session = SessionBase.create_session()
r = chainqueue.state.set_final(chain_spec, tx_hash, block=block, tx_index=tx_index, fail=fail, session=session)
r = chainqueue.state.set_final(chain_spec, tx_hash, block, fail, session=session)
session.close()
return r
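The restored tx_index parameter is threaded straight through to chainqueue. A minimal sketch of dispatching the extended task, assuming it is registered under its module path (as with cic_eth.queue.state.set_ready mentioned elsewhere in this compare) and using placeholder values:

    s = celery.signature(
        'cic_eth.queue.state.set_final',
        [
            chain_spec.asdict(),
            tx_hash,       # placeholder: hex hash of the mined tx
            block_number,  # block the tx was mined in
            tx_index,      # position of the tx within that block
            False,         # fail flag
        ],
        queue=queue,
    )
    s.apply_async()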

View File

@@ -15,6 +15,7 @@ from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from chainlib.connection import RPCConnection
from chainsyncer.error import SyncDone
from hexathon import strip_0x
from chainqueue.db.enum import (
StatusEnum,
@@ -152,7 +153,10 @@ class DispatchSyncer:
def main():
syncer = DispatchSyncer(chain_spec)
conn = RPCConnection.connect(chain_spec, 'default')
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
try:
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
except SyncDone as e:
sys.stderr.write("dispatcher done at block {}\n".format(e))
sys.exit(0)

View File

@@ -39,7 +39,6 @@ class TxFilter(SyncFilter):
self.chain_spec.asdict(),
add_0x(tx_hash_hex),
tx.block.number,
tx.index,
tx.status == Status.ERROR,
],
queue=self.queue,

View File

@@ -20,7 +20,6 @@ from cic_eth.admin.ctrl import lock_send
from cic_eth.db.enum import LockEnum
from cic_eth.runnable.daemons.filters.straggler import StragglerFilter
from cic_eth.sync.retry import RetrySyncer
from cic_eth.stat import init_chain_stat
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -72,21 +71,57 @@ RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='def
dsn = dsn_from_config(config)
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
## TODO: we already have the signed raw tx in get, so its a waste of cycles to get_tx here
#def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
# tx_dict = get_tx(tx_hash)
# tx = unpack(tx_dict['signed_tx'], chain_spec)
# logg.debug('submitting tx {} for retry'.format(tx_hash))
# s_check = celery.signature(
# 'cic_eth.admin.ctrl.check_lock',
#	[
# tx_hash,
# chain_str,
# LockEnum.QUEUE,
# tx['from'],
# ],
# queue=queue,
# )
## s_resume = celery.signature(
## 'cic_eth.eth.tx.resume_tx',
## [
## chain_str,
## ],
## queue=queue,
## )
#
## s_retry_status = celery.signature(
## 'cic_eth.queue.state.set_ready',
## [],
## queue=queue,
## )
# s_resend = celery.signature(
# 'cic_eth.eth.gas.resend_with_higher_gas',
# [
# chain_str,
# ],
# queue=queue,
# )
#
# #s_resume.link(s_retry_status)
# #s_check.link(s_resume)
# s_check.link(s_resend)
# s_check.apply_async()
def main():
conn = RPCConnection.connect(chain_spec, 'default')
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None:
stat = init_chain_stat(conn)
loop_interval = stat.block_average()
syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE'))
syncer.backend.set(0, 0)
fltr = StragglerFilter(chain_spec, queue=queue)
syncer.add_filter(fltr)
syncer.loop(int(loop_interval), conn)
syncer.loop(float(straggler_delay), conn)
if __name__ == '__main__':

View File

@@ -91,7 +91,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn, pool_size=50, debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()

View File

@@ -7,7 +7,7 @@ import argparse
import sys
import re
# external imports
# third-party imports
import confini
import celery
import rlp
@@ -26,7 +26,7 @@ from chainlib.eth.block import (
from hexathon import (
strip_0x,
)
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.backend import SyncerBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
@@ -42,7 +42,6 @@ from cic_eth.runnable.daemons.filters import (
RegistrationFilter,
TransferAuthFilter,
)
from cic_eth.stat import init_chain_stat
script_dir = os.path.realpath(os.path.dirname(__file__))
@@ -67,6 +66,7 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
#RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
cic_base.rpc.setup(chain_spec, config.get('ETH_PROVIDER'))
def main():
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
@@ -76,30 +76,24 @@ def main():
o = block_latest()
r = rpc.do(o)
block_current = int(r, 16)
block_offset = block_current + 1
loop_interval = config.get('SYNCER_LOOP_INTERVAL')
if loop_interval == None:
stat = init_chain_stat(rpc, block_start=block_current)
loop_interval = stat.block_average()
block_offset = int(strip_0x(r), 16) + 1
logg.debug('starting at block {}'.format(block_offset))
syncers = []
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
#if SyncerBackend.first(chain_spec):
# backend = SyncerBackend.initial(chain_spec, block_offset)
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
syncer_backends.append(SQLBackend.live(chain_spec, block_offset+1))
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
for syncer_backend in syncer_backends:
try:
@@ -146,8 +140,7 @@ def main():
for cf in callback_filters:
syncer.add_filter(cf)
#r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
r = syncer.loop(int(loop_interval), rpc)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1

View File

@@ -1,33 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.stat import ChainStat
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
logg = logging.getLogger().getChild(__name__)
BLOCK_SAMPLES = 10
def init_chain_stat(rpc, block_start=0):
stat = ChainStat()
if block_start == 0:
o = block_latest()
r = rpc.do(o)
block_start = int(r, 16)
for i in range(BLOCK_SAMPLES):
o = block_by_number(block_start-10+i)
block_src = rpc.do(o)
logg.debug('block {}'.format(block_src))
block = Block(block_src)
stat.block_apply(block)
logg.debug('calculated block time {} from {} block samples'.format(stat.block_average(), BLOCK_SAMPLES))
return stat
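The syncer and retry daemons above use this helper to derive a default polling interval when SYNCER_LOOP_INTERVAL is unset. A minimal usage sketch, assuming a connected chainlib RPC object:

    loop_interval = config.get('SYNCER_LOOP_INTERVAL')
    if loop_interval is None:
        stat = init_chain_stat(rpc)           # samples the last BLOCK_SAMPLES blocks
        loop_interval = stat.block_average()  # average seconds between blocks
    syncer.loop(int(loop_interval), rpc)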

View File

@@ -4,7 +4,7 @@ import datetime
# external imports
from chainsyncer.driver import HeadSyncer
from chainsyncer.backend.memory import MemBackend
from chainsyncer.backend import MemBackend
from chainsyncer.error import NoBlockForYou
from chainlib.eth.block import (
block_by_number,

View File

@@ -10,7 +10,7 @@ version = (
0,
11,
0,
'beta.3',
'alpha.4',
)
version_object = semver.VersionInfo(

View File

@@ -6,5 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
POOL_SIZE=50
DEBUG=0

View File

@@ -2,4 +2,3 @@
registry_address =
chain_spec = evm:bloxberg:8996
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C
tx_retry_delay = 20

View File

@@ -6,5 +6,4 @@ HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
POOL_SIZE=50
DEBUG=0

View File

@@ -1,2 +1,2 @@
[SYNCER]
loop_interval =
loop_interval = 1

View File

@@ -1,2 +1,2 @@
[SYNCER]
loop_interval =
loop_interval = 1

View File

@@ -29,8 +29,7 @@ RUN /usr/local/bin/python -m pip install --upgrade pip
# python merge_requirements.py | tee merged_requirements.txt
#RUN cd cic-base && \
# pip install $pip_extra_index_url_flag -r ./merged_requirements.txt
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a76
RUN pip install $pip_extra_index_url_flag chainsyncer~=0.0.2a2
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a60
COPY cic-eth/scripts/ scripts/
COPY cic-eth/setup.cfg cic-eth/setup.py ./

View File

@@ -1,8 +1,8 @@
cic-base~=0.1.2a67
cic-base~=0.1.2a60
celery==4.4.7
crypto-dev-signer~=0.4.14a17
confini~=0.3.6rc3
cic-eth-registry~=0.5.4a13
cic-eth-registry~=0.5.4a11
#cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
@@ -16,10 +16,10 @@ semver==2.13.0
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.1a7
chainlib~=0.0.2a5
chainlib~=0.0.2a4
hexathon~=0.0.1a7
chainsyncer[sql]~=0.0.2a2
chainqueue~=0.0.1a7
chainsyncer~=0.0.1a21
chainqueue~=0.0.1a5
pysha3==1.0.2
coincurve==15.0.0
sarafu-faucet~=0.0.2a19
sarafu-faucet~=0.0.2a16

View File

@@ -37,7 +37,7 @@ def test_tx(
eth_rpc,
eth_signer,
agent_roles,
celery_session_worker,
celery_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')

View File

@@ -37,7 +37,7 @@ def test_tx(
eth_rpc,
eth_signer,
agent_roles,
celery_session_worker,
celery_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')

View File

@@ -38,12 +38,11 @@ def test_transfer_api(
agent_roles,
cic_registry,
register_tokens,
register_lookups,
celery_session_worker,
):
#token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
foo_token_cache = ERC20Token(default_chain_spec, eth_rpc, foo_token)
foo_token_cache = ERC20Token(eth_rpc, foo_token)
api = Api(str(default_chain_spec), callback_param='transfer', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.transfer(custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1024, foo_token_cache.symbol)

View File

@@ -116,7 +116,7 @@ def test_register_account(
init_eth_tester.mine_block()
c = AccountRegistry(default_chain_spec)
c = AccountRegistry()
o = c.have(account_registry, eth_empty_accounts[0], sender_address=call_sender)
r = eth_rpc.do(o)
assert int(strip_0x(r), 16) == 1

View File

@@ -43,7 +43,7 @@ def test_filter_process(
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
init_eth_tester.mine_blocks(13)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 1024)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
@@ -56,7 +56,7 @@ def test_filter_process(
# external tx
init_eth_tester.mine_blocks(28)
c = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 512)
eth_rpc.do(o)
o = receipt(tx_hash_hex)

View File

@@ -1,6 +1,6 @@
{
"name": "cic-client-meta",
"version": "0.0.7-alpha.6",
"version": "0.0.7-alpha.3",
"description": "Signed CRDT metadata graphs for the CIC network",
"main": "dist/index.js",
"types": "dist/index.d.ts",
@@ -40,6 +40,6 @@
],
"license": "GPL-3.0-or-later",
"engines": {
"node": "~14.16.1"
"node": "~15.3.0"
}
}

View File

@@ -114,7 +114,6 @@ async function processRequest(req, res) {
return;
}
if (!['PUT', 'GET', 'POST'].includes(req.method)) {
res.writeHead(405, {"Content-Type": "text/plain"});
res.end();
@@ -124,7 +123,6 @@ async function processRequest(req, res) {
try {
digest = parseDigest(req.url);
} catch(e) {
console.error('digest error: ' + e)
res.writeHead(400, {"Content-Type": "text/plain"});
res.end();
return;

View File

@@ -1,12 +1,12 @@
import { ArgPair, Syncable } from '../sync';
import { Addressable, mergeKey } from '../digest';
import { Addressable, addressToBytes, bytesToHex, toKey } from '../digest';
class Phone extends Syncable implements Addressable {
address: string
value: number
constructor(address:string, v:string) {
constructor(address:string, v:number) {
const o = {
msisdn: v,
}
@@ -17,8 +17,8 @@ class Phone extends Syncable implements Addressable {
});
}
public static async toKey(msisdn:string) {
return await mergeKey(Buffer.from(msisdn), Buffer.from(':cic.phone'));
public static async toKey(msisdn:number) {
return await toKey(msisdn.toString(), ':cic.msisdn');
}
public key(): string {

View File

@@ -61,7 +61,6 @@ function addressToBytes(s:string) {
export {
toKey,
toAddressKey,
mergeKey,
bytesToHex,
addressToBytes,
Addressable,

View File

@@ -3,7 +3,6 @@ import logging
import re
# third-party imports
from celery.app.control import Inspect
import celery
# local imports
@@ -16,29 +15,6 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
@@ -46,9 +22,17 @@ class Api:
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
registered_tasks = app.tasks
self.sms_tasks = []
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
@@ -60,17 +44,12 @@ class Api:
:rtype: Celery.Task
"""
signatures = []
for q in self.sms_tasks:
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
for task in self.sms_tasks:
signature = celery.signature(task)
signatures.append(signature)
t = celery.group(signatures)()
return t
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result
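For reference, the send script deleted below exercised this API directly. A minimal usage sketch, assuming a configured celery app with live workers (message and recipient are placeholders):

    a = Api()                                  # inspects workers for sms tasks
    t = a.sms('hello world', '+254700000000')  # returns the celery group result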

View File

@@ -1,76 +0,0 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile
# external imports
import celery
import confini
# local imports
from cic_notify.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
if __name__ == '__main__':
a = Api()
t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))

View File

@@ -1,4 +1,4 @@
FROM python:3.8.6-slim-buster
FROM python:3.8.6
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps
@@ -6,7 +6,7 @@ RUN apt-get update && \
WORKDIR /usr/src/cic-notify
ARG pip_extra_index_url_flag='--index https://pypi.org/simple --extra-index-url https://pip.grassrootseconomics.net:8433'
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a62
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a44
COPY cic-notify/setup.cfg \
cic-notify/setup.py \

View File

@@ -1 +1 @@
cic_base[full_graph]~=0.1.2a61
cic_base[full_graph]~=0.1.2a46

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= 0.4.0a3
version= 0.4.0a2
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -45,4 +45,3 @@ testing =
[options.entry_points]
console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main

View File

@@ -1,24 +1,14 @@
[app]
ALLOWED_IP=0.0.0.0/0
ALLOWED_IP=127.0.0.1
LOCALE_FALLBACK=en
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/
LOCALE_PATH=var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =

View File

@@ -6,5 +6,3 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
POOL_SIZE=1

View File

@@ -1,9 +1,9 @@
[celery]
BROKER_URL=redis://redis:6379
RESULT_URL=redis://redis:6379
BROKER_URL=redis://
RESULT_URL=redis://
[redis]
HOSTNAME=redis
HOSTNAME=localhost
PASSWORD=
PORT=6379
DATABASE=0

View File

@@ -27,7 +27,7 @@ def define_account_tx_metadata(user: User):
if account_metadata:
account_metadata = json.loads(account_metadata)
person = Person()
deserialized_person = person.deserialize(person_data=account_metadata)
deserialized_person = person.deserialize(metadata=account_metadata)
given_name = deserialized_person.given_name
family_name = deserialized_person.family_name
phone_number = deserialized_person.tel
@@ -46,4 +46,4 @@ def retrieve_account_statement(blockchain_address: str):
callback_task='cic_ussd.tasks.callback_handler.process_statement_callback',
callback_param=blockchain_address
)
cic_eth_api.list(address=blockchain_address, limit=9)
result = cic_eth_api.list(address=blockchain_address, limit=9)

View File

@@ -1,129 +1,47 @@
# stanard imports
import logging
# standard imports
import datetime
# external imports
# third-party imports
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger().getChild(__name__)
Model = declarative_base(name='Model')
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
"""
__abstract__ = True
id = Column(Integer, primary_key=True)
created = Column(DateTime, default=datetime.datetime.utcnow)
updated = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
id = Column(Integer, primary_key=True)
engine = None
"""Database connection engine of the running aplication"""
sessionmaker = None
"""Factory object responsible for creating sessions from the connection pool"""
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
session = None
query = None
@staticmethod
def create_session():
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
session = sessionmaker(bind=SessionBase.engine)
return session()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
:type dsn: str
"""
e = None
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if pool_size == 0:
poolclass = NullPool
elif debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db connection not poolable')
e = create_engine(
dsn,
echo=debug,
)
SessionBase._set_engine(e)
def build():
Model.metadata.create_all(bind=SessionBase.engine)
@staticmethod
# https://docs.sqlalchemy.org/en/13/core/pooling.html#pool-disconnects
def connect(data_source_name):
engine = create_engine(data_source_name, pool_pre_ping=True)
SessionBase._set_engine(engine)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()
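A minimal sketch of the session bookkeeping shown above, where bind_session hands back the given session or lazily creates and tracks one, and release_session commits and closes only sessions it finds in the local registry:

    session = SessionBase.bind_session()      # no argument: create and register
    ...                                       # perform queries on session
    SessionBase.release_session(session)      # commit and close the tracked session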

View File

@@ -128,8 +128,8 @@
},
"22": {
"description": "Pin entry menu.",
"display_key": "ussd.kenya.display_metadata_pin_authorization",
"name": "display_metadata_pin_authorization",
"display_key": "ussd.kenya.standard_pin_authorization",
"name": "standard_pin_authorization",
"parent": "start"
},
"23": {
@@ -230,22 +230,9 @@
},
"39": {
"description": "Menu to instruct users to call the office.",
"display_key": "ussd.kenya.help",
"display_key": "ussd.key.help",
"name": "help",
"parent": null
},
"40": {
"description": "Menu to display a user's entire profile",
"display_key": "ussd.kenya.display_user_metadata",
"name": "display_user_metadata",
"parent": "account_management"
},
"41": {
"description": "The recipient is not in the system",
"display_key": "ussd.kenya.exit_invalid_recipient",
"name": "exit_invalid_recipient",
"parent": null
}
}
}

View File

@@ -18,7 +18,7 @@ class ActionDataNotFoundError(OSError):
pass
class MetadataNotFoundError(OSError):
class UserMetadataNotFoundError(OSError):
"""Raised when metadata is expected but not available in cache."""
pass
@@ -31,10 +31,3 @@ class UnsupportedMethodError(OSError):
class CachedDataNotFoundError(OSError):
"""Raised when the method passed to the make request function is unsupported."""
pass
class MetadataStoreError(Exception):
"""Raised when metadata storage fails"""
pass

View File

@@ -3,10 +3,7 @@
# third-party imports
import requests
from chainlib.eth.address import to_checksum
from hexathon import (
add_0x,
strip_0x,
)
from hexathon import add_0x
# local imports
from cic_ussd.error import UnsupportedMethodError
@@ -43,4 +40,4 @@ def blockchain_address_to_metadata_pointer(blockchain_address: str):
:return:
:rtype:
"""
return bytes.fromhex(strip_0x(blockchain_address))
return bytes.fromhex(blockchain_address[2:])

View File

@@ -1,126 +0,0 @@
# standard imports
import json
import logging
import os
from typing import Dict, Union
# third-party imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
from cic_ussd.error import MetadataStoreError
logg = logging.getLogger().getChild(__name__)
class Metadata:
"""
:cvar base_url: The base url of the metadata server.
:type base_url: str
"""
base_url = None
def metadata_http_error_handler(result: requests.Response):
""" This function handles and appropriately raises errors from http requests interacting with the metadata server.
:param result: The response object from a http request.
:type result: requests.Response
"""
status_code = result.status_code
if 100 <= status_code < 200:
raise MetadataStoreError(f'Informational errors: {status_code}, reason: {result.reason}')
elif 300 <= status_code < 400:
raise MetadataStoreError(f'Redirect Issues: {status_code}, reason: {result.reason}')
elif 400 <= status_code < 500:
raise MetadataStoreError(f'Client Error: {status_code}, reason: {result.reason}')
elif 500 <= status_code < 600:
raise MetadataStoreError(f'Server Error: {status_code}, reason: {result.reason}')
class MetadataRequestsHandler(Metadata):
def __init__(self, cic_type: str, identifier: bytes, engine: str = 'pgp'):
"""
:param cic_type: The salt value with which to hash a specific metadata identifier.
:type cic_type: str
:param engine: Encryption used for sending data to the metadata server.
:type engine: str
:param identifier: A unique element of data in bytes necessary for creating a metadata pointer.
:type identifier: bytes
"""
self.cic_type = cic_type
self.engine = engine
self.headers = {
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
self.identifier = identifier
self.metadata_pointer = generate_metadata_pointer(
identifier=self.identifier,
cic_type=self.cic_type
)
if self.base_url:
self.url = os.path.join(self.base_url, self.metadata_pointer)
def create(self, data: Union[Dict, str]):
""" This function is responsible for posting data to the metadata server with a corresponding metadata pointer
for storage.
:param data: The data to be stored in the metadata server.
:type data: dict|str
"""
data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
metadata_http_error_handler(result=result)
metadata = result.content
self.edit(data=metadata)
def edit(self, data: bytes):
""" This function is responsible for editing data in the metadata server corresponding to a unique pointer.
:param data: The data to be edited in the metadata server.
:type data: bytes
"""
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get('algo')
decoded_data = data.decode('utf-8')
formatted_data = {
'm': data.decode('utf-8'),
's': {
'engine': self.engine,
'algo': algorithm,
'data': signature,
'digest': json.loads(data).get('digest'),
}
}
formatted_data = json.dumps(formatted_data).encode('utf-8')
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'signed metadata submission status: {result.status_code}.')
metadata_http_error_handler(result=result)
try:
decoded_identifier = self.identifier.decode("utf-8")
except UnicodeDecodeError:
decoded_identifier = self.identifier.hex()
logg.info(f'identifier: {decoded_identifier}. metadata pointer: {self.metadata_pointer} set to: {decoded_data}.')
def query(self):
"""This function is responsible for querying the metadata server for data corresponding to a unique pointer."""
result = make_request(method='GET', url=self.url)
metadata_http_error_handler(result=result)
response_data = result.content
data = json.loads(response_data.decode('utf-8'))
if result.status_code == 200 and self.cic_type == 'cic.person':
person = Person()
deserialized_person = person.deserialize(person_data=json.loads(data))
data = json.dumps(deserialized_person.serialize())
cache_data(self.metadata_pointer, data=data)
logg.debug(f'caching: {data} with key: {self.metadata_pointer}')

View File

@@ -1,12 +0,0 @@
# standard imports
# third-party imports
# local imports
from .base import MetadataRequestsHandler
class PersonMetadata(MetadataRequestsHandler):
def __init__(self, identifier: bytes):
super().__init__(cic_type='cic.person', identifier=identifier)
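A minimal usage sketch of the subclass, mirroring the task code later in this compare; the identifier is a placeholder 20-byte address:

    identifier = bytes.fromhex('00' * 20)  # placeholder address bytes
    person_metadata_client = PersonMetadata(identifier=identifier)
    person_metadata_client.query()         # fetch, deserialize and cache the record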

View File

@@ -1,13 +0,0 @@
# standard imports
import logging
# external imports
# local imports
from .base import MetadataRequestsHandler
class PhonePointerMetadata(MetadataRequestsHandler):
def __init__(self, identifier: bytes):
super().__init__(cic_type='cic.msisdn', identifier=identifier)

View File

@@ -44,7 +44,7 @@ class Signer:
gpg_keys = self.gpg.list_keys()
key_algorithm = gpg_keys[0].get('algo')
key_id = gpg_keys[0].get("keyid")
logg.debug(f'using signing key: {key_id}, algorithm: {key_algorithm}')
logg.info(f'using signing key: {key_id}, algorithm: {key_algorithm}')
return gpg_keys[0]
def sign_digest(self, data: bytes):

View File

@@ -0,0 +1,102 @@
# standard imports
import json
import logging
import os
# third-party imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.chain import Chain
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
logg = logging.getLogger()
class UserMetadata:
"""
:cvar base_url:
:type base_url:
"""
base_url = None
def __init__(self, identifier: bytes):
"""
:param identifier:
:type identifier:
"""
self.headers = {
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
self.identifier = identifier
self.metadata_pointer = generate_metadata_pointer(
identifier=self.identifier,
cic_type='cic.person'
)
if self.base_url:
self.url = os.path.join(self.base_url, self.metadata_pointer)
def create(self, data: dict):
try:
data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
metadata = result.content
self.edit(data=metadata, engine='pgp')
logg.info(f'Get sign material response status: {result.status_code}')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
def edit(self, data: bytes, engine: str):
"""
:param data:
:type data:
:param engine:
:type engine:
:return:
:rtype:
"""
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get('algo')
formatted_data = {
'm': data.decode('utf-8'),
's': {
'engine': engine,
'algo': algorithm,
'data': signature,
'digest': json.loads(data).get('digest'),
}
}
formatted_data = json.dumps(formatted_data).encode('utf-8')
try:
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'Signed content submission status: {result.status_code}.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
def query(self):
result = make_request(method='GET', url=self.url)
status = result.status_code
logg.info(f'Get latest data status: {status}')
try:
if status == 200:
response_data = result.content
data = json.loads(response_data.decode())
# validate data
person = Person()
deserialized_person = person.deserialize(metadata=json.loads(data))
cache_data(key=self.metadata_pointer, data=json.dumps(deserialized_person.serialize()))
elif status == 404:
logg.info('The data is not available and might need to be added.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)

View File

@@ -15,7 +15,6 @@ from cic_ussd.balance import BalanceManager, compute_operational_balance, get_ca
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.phone_number import get_user_by_phone_number
@@ -23,7 +22,6 @@ from cic_ussd.redis import cache_data, create_cached_data_key, get_cached_data
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.conversions import to_wei, from_wei
from cic_ussd.translation import translation_for
from cic_types.models.person import generate_metadata_pointer, get_contact_data_from_vcard
logg = logging.getLogger(__name__)
@@ -138,7 +136,7 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
tx_sender_information = define_account_tx_metadata(user=user)
token_symbol = 'SRF'
user_input = ussd_session.get('session_data').get('transaction_amount')
user_input = ussd_session.get('user_input').split('*')[-1]
transaction_amount = to_wei(value=int(user_input))
logg.debug('Requires integration to determine user tokens.')
return process_pin_authorization(
@@ -189,55 +187,21 @@ def format_transactions(transactions: list, preferred_language: str):
value = transaction.get('to_value')
timestamp = transaction.get('timestamp')
action_tag = transaction.get('action_tag')
direction = transaction.get('direction')
token_symbol = 'SRF'
if action_tag == 'SENT' or action_tag == 'ULITUMA':
formatted_transactions += f'{action_tag} {value} {token_symbol} {direction} {recipient_phone_number} {timestamp}.\n'
formatted_transactions += f'{action_tag} {value} {token_symbol} {recipient_phone_number} {timestamp}.\n'
else:
formatted_transactions += f'{action_tag} {value} {token_symbol} {direction} {sender_phone_number} {timestamp}. \n'
formatted_transactions += f'{action_tag} {value} {token_symbol} {sender_phone_number} {timestamp}. \n'
return formatted_transactions
else:
if preferred_language == 'en':
formatted_transactions = 'NO TRANSACTION HISTORY'
formatted_transactions = 'Empty'
else:
formatted_transactions = 'HAMNA RIPOTI YA MATUMIZI'
formatted_transactions = 'Hamna historia'
return formatted_transactions
def process_display_user_metadata(user: User, display_key: str):
"""
:param user:
:type user:
:param display_key:
:type display_key:
"""
key = generate_metadata_pointer(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
user_metadata = get_cached_data(key)
if user_metadata:
user_metadata = json.loads(user_metadata)
contact_data = get_contact_data_from_vcard(vcard=user_metadata.get('vcard'))
logg.debug(f'{contact_data}')
full_name = f'{contact_data.get("given")} {contact_data.get("family")}'
gender = user_metadata.get('gender')
products = ', '.join(user_metadata.get('products'))
location = user_metadata.get('location').get('area_name')
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
full_name=full_name,
gender=gender,
location=location,
products=products
)
else:
raise MetadataNotFoundError(f'Expected person metadata but found none in cache for key: {key}')
def process_account_statement(user: User, display_key: str, ussd_session: dict):
"""
:param user:
@@ -265,7 +229,7 @@ def process_account_statement(user: User, display_key: str, ussd_session: dict):
middle_transaction_set += transactions[3:][:3]
first_transaction_set += transactions[:3]
# there are probably much cleaner and operational inexpensive ways to do this so find them
elif 3 < len(transactions) < 7:
elif 4 < len(transactions) < 7:
middle_transaction_set += transactions[3:]
first_transaction_set += transactions[:3]
else:
@@ -331,11 +295,11 @@ def process_start_menu(display_key: str, user: User):
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's metadata
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
s_query_user_metadata.apply_async(queue='cic-ussd')
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)
@@ -385,24 +349,18 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
if user.has_valid_pin():
last_ussd_session = retrieve_most_recent_ussd_session(phone_number=user.phone_number)
key = generate_metadata_pointer(
key = create_cached_data_key(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
salt='cic.person'
)
person_metadata = get_cached_data(key=key)
user_metadata = get_cached_data(key=key)
if last_ussd_session:
# get last state
last_state = last_ussd_session.state
logg.debug(f'LAST USSD SESSION STATE: {last_state}')
# if last state is account_creation_prompt and metadata exists, show start menu
if last_state in [
'account_creation_prompt',
'exit',
'exit_invalid_pin',
'exit_invalid_new_pin',
'exit_pin_mismatch',
'exit_invalid_request'
] and person_metadata is not None:
if last_state == 'account_creation_prompt' and user_metadata is not None:
return UssdMenu.find_by_name(name='start')
else:
return UssdMenu.find_by_name(name=last_state)
@@ -462,13 +420,9 @@ def custom_display_text(
return process_start_menu(display_key=display_key, user=user)
elif 'pin_authorization' in menu_name:
return process_pin_authorization(display_key=display_key, user=user)
elif 'enter_current_pin' in menu_name:
return process_pin_authorization(display_key=display_key, user=user)
elif menu_name == 'account_balances':
return process_account_balances(display_key=display_key, user=user, ussd_session=ussd_session)
elif 'transaction_set' in menu_name:
return process_account_statement(display_key=display_key, user=user, ussd_session=ussd_session)
elif menu_name == 'display_user_metadata':
return process_display_user_metadata(display_key=display_key, user=user)
else:
return translation_for(key=display_key, preferred_language=user.preferred_language)

View File

@@ -23,11 +23,10 @@ from cic_ussd.encoder import PasswordEncoder
from cic_ussd.files.local_files import create_local_file_data_stores, json_file_parser
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.base import Metadata
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.operations import (define_response_with_content,
process_menu_interaction_requests,
define_multilingual_responses)
from cic_ussd.phone_number import process_phone_number
from cic_ussd.redis import InMemoryStore
from cic_ussd.requests import (get_request_endpoint,
get_request_method,
@@ -36,8 +35,7 @@ from cic_ussd.requests import (get_request_endpoint,
process_pin_reset_requests)
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.validator import check_ip, check_request_content_length, check_service_code, validate_phone_number, \
validate_presence
from cic_ussd.validator import check_ip, check_request_content_length, check_service_code, validate_phone_number
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -65,6 +63,7 @@ config.censor('PASSWORD', 'DATABASE')
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
@@ -86,7 +85,7 @@ UssdMenu.ussd_menu_db = ussd_menu_db
# set up db
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
SessionBase.connect(data_source_name=data_source_name)
# create session for the life time of http request
SessionBase.session = SessionBase.create_session()
@@ -99,18 +98,12 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
Metadata.base_url = config.get('CIC_META_URL')
UserMetadata.base_url = config.get('CIC_META_URL')
# define signer values
export_dir = config.get('PGP_EXPORT_DIR')
if export_dir:
validate_presence(path=export_dir)
Signer.gpg_path = export_dir
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
if key_file_path:
validate_presence(path=key_file_path)
Signer.key_file_path = key_file_path
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
# initialize celery app
celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
@@ -151,10 +144,6 @@ def application(env, start_response):
external_session_id = post_data.get('sessionId')
user_input = post_data.get('text')
# add validation for phone number
if phone_number:
phone_number = process_phone_number(phone_number=phone_number, region=config.get('PHONE_NUMBER_REGION'))
# validate ip address
if not check_ip(config=config, env=env):
start_response('403 Sneaky, sneaky', errors_headers)
@@ -178,10 +167,8 @@ def application(env, start_response):
# validate phone number
if not validate_phone_number(phone_number):
logg.error('invalid phone number {}'.format(phone_number))
start_response('400 Invalid phone number format', errors_headers)
return []
logg.debug('session {} started for {}'.format(external_session_id, phone_number))
# handle menu interaction requests
chain_str = chain_spec.__str__()

View File

@@ -13,14 +13,12 @@ from confini import Config
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.base import Metadata
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.validator import validate_presence
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('gnupg').setLevel(logging.WARNING)
config_directory = '/usr/local/etc/cic-ussd/'
@@ -48,7 +46,7 @@ logg.debug(config)
# connect to database
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
SessionBase.connect(data_source_name=data_source_name)
# verify database connection with minimal sanity query
session = SessionBase.create_session()
@@ -64,18 +62,12 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
Metadata.base_url = config.get('CIC_META_URL')
UserMetadata.base_url = config.get('CIC_META_URL')
# define signer values
export_dir = config.get('PGP_EXPORT_DIR')
if export_dir:
validate_presence(path=export_dir)
Signer.gpg_path = export_dir
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
if key_file_path:
validate_presence(path=key_file_path)
Signer.key_file_path = key_file_path
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
# set up celery
current_app = celery.Celery(__name__)

View File

@@ -78,10 +78,9 @@ def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, Us
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
session_data = ussd_session.get('session_data') or {}
session_data['recipient_phone_number'] = user_input
session_data = {
'recipient_phone_number': user_input
}
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
@@ -97,11 +96,11 @@ def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, User]):
recipient = get_user_by_phone_number(phone_number=user_input)
blockchain_address = recipient.blockchain_address
# retrieve and cache account's metadata
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
s_query_user_metadata.apply_async(queue='cic-ussd')
def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, User]):
@@ -110,10 +109,9 @@ def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict,
:type state_machine_data: str
"""
user_input, ussd_session, user = state_machine_data
session_data = ussd_session.get('session_data') or {}
session_data['transaction_amount'] = user_input
session_data = {
'transaction_amount': user_input
}
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)

View File

@@ -11,7 +11,7 @@ from cic_types.models.person import generate_vcard_from_contact_data, manage_ide
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import User
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.operations import save_to_in_memory_ussd_session_data
from cic_ussd.redis import get_cached_data
@@ -164,11 +164,11 @@ def save_complete_user_metadata(state_machine_data: Tuple[str, dict, User]):
user_metadata = format_user_metadata(metadata=metadata, user=user)
blockchain_address = user.blockchain_address
s_create_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_person_metadata',
s_create_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_user_metadata',
[blockchain_address, user_metadata]
)
s_create_person_metadata.apply_async(queue='cic-ussd')
s_create_user_metadata.apply_async(queue='cic-ussd')
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
@@ -181,7 +181,7 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
user_metadata = get_cached_data(key=key)
if not user_metadata:
raise MetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
given_name = ussd_session.get('session_data').get('given_name')
family_name = ussd_session.get('session_data').get('family_name')
@@ -192,7 +192,7 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
# validate user metadata
person = Person()
user_metadata = json.loads(user_metadata)
deserialized_person = person.deserialize(person_data=user_metadata)
deserialized_person = person.deserialize(metadata=user_metadata)
# edit specific metadata attribute
if given_name:
@@ -211,18 +211,18 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
edited_metadata = deserialized_person.serialize()
s_edit_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.edit_person_metadata',
[blockchain_address, edited_metadata]
s_edit_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.edit_user_metadata',
[blockchain_address, edited_metadata, 'pgp']
)
s_edit_person_metadata.apply_async(queue='cic-ussd')
s_edit_user_metadata.apply_async(queue='cic-ussd')
def get_user_metadata(state_machine_data: Tuple[str, dict, User]):
user_input, ussd_session, user = state_machine_data
blockchain_address = user.blockchain_address
s_get_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
'cic_ussd.tasks.metadata.query_user_metadata',
[blockchain_address]
)
s_get_user_metadata.apply_async(queue='cic-ussd')

View File

@@ -5,24 +5,9 @@ import celery
import sqlalchemy
# local imports
from cic_ussd.error import MetadataStoreError
from cic_ussd.db.models.base import SessionBase
class BaseTask(celery.Task):
session_func = SessionBase.create_session
def create_session(self):
return BaseTask.session_func()
def log_banner(self):
logg.debug('task {} root uuid {}'.format(self.__class__.__name__, self.request.root_id))
return
class CriticalTask(BaseTask):
class CriticalTask(celery.Task):
retry_jitter = True
retry_backoff = True
retry_backoff_max = 8
@@ -32,11 +17,4 @@ class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError,
)
class CriticalMetadataTask(CriticalTask):
autoretry_for = (
MetadataStoreError,
)

View File

@@ -53,13 +53,6 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
session.add(user)
session.commit()
session.close()
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'cic_ussd.tasks.metadata.add_phone_pointer',
[result, phone_number]
)
s.apply_async(queue=queue)
# expire cache
cache.expire(task_id, timedelta(seconds=180))
@@ -72,8 +65,6 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
session.close()
raise ActionDataNotFoundError(f'Account creation task: {task_id}, returned unexpected response: {status_code}')
session.close()
@celery_app.task
def process_incoming_transfer_callback(result: dict, param: str, status_code: int):
@@ -127,7 +118,6 @@ def process_incoming_transfer_callback(result: dict, param: str, status_code: in
session.close()
raise ValueError(f'Unexpected status code: {status_code}.')
session.close()
@celery_app.task
def process_balances_callback(result: list, param: str, status_code: int):
@@ -153,24 +143,21 @@ def define_transaction_action_tag(
# check preferred language
if preferred_language == 'en':
action_tag = 'SENT'
direction = 'TO'
else:
action_tag = 'ULITUMA'
direction = 'KWA'
else:
if preferred_language == 'en':
action_tag = 'RECEIVED'
direction = 'FROM'
else:
action_tag = 'ULIPOKEA'
direction = 'KUTOKA'
return action_tag, direction
return action_tag
@celery_app.task
def process_statement_callback(result, param: str, status_code: int):
if status_code == 0:
# create session
session = SessionBase.create_session()
processed_transactions = []
# process transaction data to cache
@@ -183,23 +170,20 @@ def process_statement_callback(result, param: str, status_code: int):
if '0x0000000000000000000000000000000000000000' in source_token:
pass
else:
session = SessionBase.create_session()
# describe a processed transaction
processed_transaction = {}
# check if sender is in the system
sender: User = session.query(User).filter_by(blockchain_address=sender_blockchain_address).first()
owner: User = session.query(User).filter_by(blockchain_address=param).first()
if sender:
processed_transaction['sender_phone_number'] = sender.phone_number
action_tag, direction = define_transaction_action_tag(
preferred_language=owner.preferred_language,
action_tag = define_transaction_action_tag(
preferred_language=sender.preferred_language,
sender_blockchain_address=sender_blockchain_address,
param=param
)
processed_transaction['action_tag'] = action_tag
processed_transaction['direction'] = direction
else:
processed_transaction['sender_phone_number'] = 'GRASSROOTS ECONOMICS'
@@ -212,11 +196,9 @@ def process_statement_callback(result, param: str, status_code: int):
else:
logg.warning(f'Tx with recipient not found in cic-ussd')
session.close()
# add transaction values
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value')).__str__()
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value')).__str__()
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value'))
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value'))
raw_timestamp = transaction.get('timestamp')
timestamp = datetime.utcfromtimestamp(raw_timestamp).strftime('%d/%m/%y, %H:%M')

View File

@@ -1,22 +1,20 @@
# standard imports
import json
import logging
# third-party imports
import celery
from hexathon import strip_0x
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.metadata.phone import PhonePointerMetadata
from cic_ussd.tasks.base import CriticalMetadataTask
from cic_ussd.metadata.user import UserMetadata
celery_app = celery.current_app
logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
@celery_app.task
def query_person_metadata(blockchain_address: str):
def query_user_metadata(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
@@ -24,12 +22,12 @@ def query_person_metadata(blockchain_address: str):
:rtype:
"""
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.query()
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.query()
@celery_app.task
def create_person_metadata(blockchain_address: str, data: dict):
def create_user_metadata(blockchain_address: str, data: dict):
"""
:param blockchain_address:
:type blockchain_address:
@@ -39,20 +37,12 @@ def create_person_metadata(blockchain_address: str, data: dict):
:rtype:
"""
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.create(data=data)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.create(data=data)
@celery_app.task
def edit_person_metadata(blockchain_address: str, data: bytes):
def edit_user_metadata(blockchain_address: str, data: bytes, engine: str):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.edit(data=data)
@celery_app.task(bind=True, base=CriticalMetadataTask)
def add_phone_pointer(self, blockchain_address: str, phone_number: str):
identifier = phone_number.encode('utf-8')
stripped_address = strip_0x(blockchain_address)
phone_metadata_client = PhonePointerMetadata(identifier=identifier)
phone_metadata_client.create(data=stripped_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.edit(data=data, engine=engine)

View File

@@ -70,4 +70,3 @@ def persist_session_to_db(external_session_id: str):
session.close()
raise SessionNotFoundError('Session does not exist!')
session.close()

View File

@@ -47,7 +47,7 @@ def to_wei(value: int) -> int:
:return: Wei equivalent of value in SRF
:rtype: int
"""
return int(value * 1e+6)
return int(value * 1e+18)
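# Illustrative sketch, not part of the diff: the multiplier is the token's
# decimal count, 10**18 after this change (10**6 before). Pure integer
# arithmetic avoids float rounding on large amounts:
def to_wei_int(value: int, decimals: int = 18) -> int:
    return int(value) * 10 ** decimals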
class IncomingTransactionProcessor:

View File

@@ -1,8 +1,6 @@
# standard imports
import logging
import os
import re
import ipaddress
# third-party imports
from confini import Config
@@ -22,14 +20,7 @@ def check_ip(config: Config, env: dict):
:return: Request IP validity
:rtype: boolean
"""
# TODO: do once at boot time
actual_ip = ipaddress.ip_network(env.get('REMOTE_ADDR') + '/32')
for allowed_net_src in config.get('APP_ALLOWED_IP').split(','):
allowed_net = ipaddress.ip_network(allowed_net_src)
if actual_ip.subnet_of(allowed_net):
return True
return False
return env.get('REMOTE_ADDR') == config.get('APP_ALLOWED_IP')
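# Illustrative sketch, not part of the diff: the removed branch above
# matched the request address against a comma-separated CIDR allow-list.
def ip_allowed(remote_addr: str, allowed_nets_csv: str) -> bool:
    # Treat the single remote address as a /32 network so subnet_of()
    # can compare it against each allowed network.
    actual = ipaddress.ip_network(remote_addr + '/32')
    for net_src in allowed_nets_csv.split(','):
        if actual.subnet_of(ipaddress.ip_network(net_src)):
            return True
    return False
# e.g. ip_allowed('10.0.0.7', '10.0.0.0/24,127.0.0.1/32') -> True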
def check_request_content_length(config: Config, env: dict):
@@ -119,7 +110,7 @@ def validate_phone_number(phone: str):
def validate_response_type(processor_response: str) -> bool:
"""
"""1*3443*3443*Philip*Wanga*1*Juja*Software Developer*2*3
This function checks the prefix for a corresponding menu's text from the response offered by the Ussd Processor and
determines whether the response should prompt the end of a ussd session or the
:param processor_response: A ussd menu's text value.
@@ -135,14 +126,3 @@ def validate_response_type(processor_response: str) -> bool:
return True
return False
def validate_presence(path: str):
"""
"""
is_present = os.path.exists(path=path)
if not is_present:
raise ValueError(f'Directory/File in path: {path} not found.')
else:
logg.debug(f'Loading data from: {path}')

View File

@@ -51,4 +51,4 @@ RUN cd cic-ussd && \
COPY cic-ussd/.config/ /usr/local/etc/cic-ussd/
COPY cic-ussd/cic_ussd/db/migrations/ /usr/local/share/cic-ussd/alembic
WORKDIR /root
WORKDIR /root

View File

@@ -2,4 +2,4 @@
. /root/db.sh
/usr/local/bin/cic-ussd-tasker $@
/usr/local/bin/cic-ussd-tasker -vv "$@"

View File

@@ -2,6 +2,4 @@
. /root/db.sh
server_port=${SERVER_PORT:-9000}
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :$server_port --pyargv "$@"
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :9000 --pyargv "-vv"

View File

@@ -1,4 +1,4 @@
cic_base[full_graph]~=0.1.2a68
cic-eth~=0.11.0b3
cic_base[full_graph]~=0.1.2a58
cic-eth~=0.11.0a4
cic-notify~=0.4.0a3
cic-types~=0.1.0a10

View File

@@ -4,6 +4,5 @@
"enter_gender",
"enter_age",
"enter_location",
"enter_products",
"display_metadata_pin_authorization"
"enter_products"
]

View File

@@ -9,26 +9,26 @@ from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.redis import get_cached_data
def test_user_metadata(create_activated_user, define_metadata_pointer_url, load_config):
PersonMetadata.base_url = load_config.get('CIC_META_URL')
UserMetadata.base_url = load_config.get('CIC_META_URL')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
user_metadata_client = UserMetadata(identifier=identifier)
assert person_metadata_client.url == define_metadata_pointer_url
assert user_metadata_client.url == define_metadata_pointer_url
def test_create_person_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
mock_meta_post_response,
person_metadata):
def test_create_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
mock_meta_post_response,
person_metadata):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
user_metadata_client = UserMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
@@ -38,7 +38,7 @@ def test_create_person_metadata(caplog,
reason='CREATED',
content=json.dumps(mock_meta_post_response).encode('utf-8')
)
person_metadata_client.create(data=person_metadata)
user_metadata_client.create(data=person_metadata)
assert 'Get signed material response status: 201' in caplog.text
with pytest.raises(RuntimeError) as error:
@@ -49,19 +49,19 @@ def test_create_person_metadata(caplog,
status_code=400,
reason='BAD REQUEST'
)
person_metadata_client.create(data=person_metadata)
user_metadata_client.create(data=person_metadata)
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
def test_edit_person_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
person_metadata,
setup_metadata_signer):
def test_edit_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
person_metadata,
setup_metadata_signer):
Signer.gpg_passphrase = load_config.get('KEYS_PASSPHRASE')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
user_metadata_client = UserMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
@@ -69,7 +69,7 @@ def test_edit_person_metadata(caplog,
status_code=200,
reason='OK'
)
person_metadata_client.edit(data=person_metadata)
user_metadata_client.edit(data=person_metadata, engine='pgp')
assert 'Signed content submission status: 200' in caplog.text
with pytest.raises(RuntimeError) as error:
@@ -80,7 +80,7 @@ def test_edit_person_metadata(caplog,
status_code=400,
reason='BAD REQUEST'
)
person_metadata_client.edit(data=person_metadata)
user_metadata_client.edit(data=person_metadata, engine='pgp')
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
@@ -92,7 +92,7 @@ def test_get_user_metadata(caplog,
person_metadata,
setup_metadata_signer):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
person_metadata_client = PersonMetadata(identifier=identifier)
user_metadata_client = UserMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
@@ -101,7 +101,7 @@ def test_get_user_metadata(caplog,
content=json.dumps(person_metadata).encode('utf-8'),
reason='OK'
)
person_metadata_client.query()
user_metadata_client.query()
assert 'Get latest data status: 200' in caplog.text
key = generate_metadata_pointer(
identifier=identifier,
@@ -118,6 +118,6 @@ def test_get_user_metadata(caplog,
status_code=404,
reason='NOT FOUND'
)
person_metadata_client.query()
user_metadata_client.query()
assert 'The data is not available and might need to be added.' in caplog.text
assert str(error.value) == f'400 Client Error: NOT FOUND for url: {define_metadata_pointer_url}'

View File

@@ -15,7 +15,7 @@ from cic_ussd.state_machine.logic.user import (
get_user_metadata,
save_complete_user_metadata,
process_gender_user_input,
save_metadata_attribute_to_session_data,
save_profile_attribute_to_session_data,
update_account_status_to_active)
@@ -41,14 +41,14 @@ def test_update_account_status_to_active(create_pending_user, create_in_db_ussd_
("enter_location", "location", "Kangemi", "Kangemi"),
("enter_products", "products", "Mandazi", "Mandazi"),
])
def test_save_metadata_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
def test_save_profile_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
create_in_db_ussd_session.state = current_state
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
@@ -56,7 +56,7 @@ def test_save_metadata_attribute_to_session_data(current_state,
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session['state'] = current_state
save_metadata_attribute_to_session_data(state_machine_data=state_machine_data)
save_profile_attribute_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
@@ -82,23 +82,23 @@ def test_format_user_metadata(create_activated_user,
from cic_types.models.person import Person
formatted_user_metadata = format_user_metadata(metadata=complete_user_metadata, user=create_activated_user)
person = Person()
user_metadata = person.deserialize(person_data=formatted_user_metadata)
user_metadata = person.deserialize(metadata=formatted_user_metadata)
assert formatted_user_metadata == user_metadata.serialize()
def test_save_complete_user_metadata(celery_session_worker,
complete_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
mocker,
setup_chain_spec,
ussd_session_data):
complete_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
mocker,
setup_chain_spec,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
ussd_session['session_data'] = complete_user_metadata
user_metadata = format_user_metadata(metadata=ussd_session.get('session_data'), user=create_activated_user)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_person_metadata.apply_async')
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_user_metadata.apply_async')
save_complete_user_metadata(state_machine_data=state_machine_data)
mocked_create_metadata_task.assert_called_with(
(user_metadata, create_activated_user.blockchain_address),
@@ -127,7 +127,7 @@ def test_edit_user_metadata_attribute(celery_session_worker,
}
state_machine_data = ('', ussd_session, create_activated_user)
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_person_metadata.apply_async')
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_user_metadata.apply_async')
edit_user_metadata_attribute(state_machine_data=state_machine_data)
person_metadata['location']['area_name'] = 'nairobi'
mocked_edit_metadata.assert_called_with(
@@ -146,7 +146,7 @@ def test_get_user_metadata_attribute(celery_session_worker,
ussd_session = json.loads(ussd_session)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_person_metadata.apply_async')
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_user_metadata.apply_async')
get_user_metadata(state_machine_data=state_machine_data)
mocked_get_metadata.assert_called_with(
(create_activated_user.blockchain_address,),

View File

@@ -18,7 +18,7 @@ from cic_ussd.files.local_files import create_local_file_data_stores, json_file_
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.state_machine import UssdStateMachine
@@ -121,9 +121,9 @@ def setup_metadata_signer(load_config):
@pytest.fixture(scope='function')
def define_metadata_pointer_url(load_config, create_activated_user):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
PersonMetadata.base_url = load_config.get('CIC_META_URL')
person_metadata_client = PersonMetadata(identifier=identifier)
return person_metadata_client.url
UserMetadata.base_url = load_config.get('CIC_META_URL')
user_metadata_client = UserMetadata(identifier=identifier)
return user_metadata_client.url
@pytest.fixture(scope='function')

View File

@@ -38,7 +38,7 @@
{
"trigger": "scan_data",
"source": "exit_invalid_recipient",
"dest": "enter_transaction_recipient",
"dest": "send_enter_recipient",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
@@ -49,13 +49,13 @@
"after": "cic_ussd.state_machine.logic.sms.upsell_unregistered_recipient"
},
{
"trigger": "scan_data",
"trigger": "feed_char",
"source": "exit_successful_transaction",
"dest": "start",
"conditions": "cic_ussd.state_machine.logic.menu.menu_zero_zero_selected"
},
{
"trigger": "scan_data",
"trigger": "feed_char",
"source": "exit_successful_transaction",
"dest": "exit",
"conditions": "cic_ussd.state_machine.logic.menu.menu_ninety_nine_selected"

View File

@@ -26,18 +26,18 @@
{
"trigger": "scan_data",
"source": "metadata_management",
"dest": "display_metadata_pin_authorization",
"dest": "standard_pin_authorization",
"conditions": "cic_ussd.state_machine.logic.menu.menu_five_selected"
},
{
"trigger": "scan_data",
"source": "display_metadata_pin_authorization",
"source": "standard_pin_authorization",
"dest": "display_user_metadata",
"conditions": "cic_ussd.state_machine.logic.pin.is_authorized_pin"
},
{
"trigger": "scan_data",
"source": "display_metadata_pin_authorization",
"source": "standard_pin_authorization",
"dest": "exit_pin_blocked",
"conditions": "cic_ussd.state_machine.logic.pin.is_locked_account"
},

View File

@@ -55,8 +55,8 @@ en:
4. Edit products
5. View my profile
0. Back
display_user_metadata: |-
CON Your details are:
display_user_profile_data: |-
END Your details are:
Name: %{full_name}
Gender: %{gender}
Location: %{location}
@@ -85,7 +85,7 @@ en:
retry: |-
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining.
0. Back
display_metadata_pin_authorization:
standard_pin_authorization:
first: |-
CON Please enter your PIN.
0. Back

View File

@@ -56,7 +56,7 @@ sw:
5. Angalia wasifu wako
0. Nyuma
display_user_metadata: |-
CON Wasifu wako una maelezo yafuatayo:
END Wasifu wako una maelezo yafuatayo:
Jina: %{full_name}
Jinsia: %{gender}
Eneo: %{location}

View File

@@ -57,9 +57,9 @@ WORKDIR /home/grassroots
USER grassroots
ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433
ARG cic_base_version=0.1.2a77
ARG cic_eth_version=0.11.0b6
ARG sarafu_faucet_version=0.0.2a28
ARG cic_base_version=0.1.2a60
ARG cic_eth_version=0.11.0a4
ARG sarafu_faucet_version=0.0.2a16
ARG cic_contracts_version=0.0.2a2
RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version \
cic-eth==$cic_eth_version \

View File

@@ -6,7 +6,6 @@ DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER=0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C
DEV_ETH_ACCOUNT_RESERVE_MINTER=${DEV_ETH_ACCOUNT_RESERVE_MINTER:-$DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER}
DEV_ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER=${DEV_ETH_ACCOUNT_RESERVE_MINTER:-$DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER}
DEV_RESERVE_AMOUNT=${DEV_ETH_RESERVE_AMOUNT:-""10000000000000000000000000000000000}
faucet_amount=${DEV_FAUCET_AMOUNT:-0}
keystore_file=$(realpath ./keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c)
echo "environment:"
@@ -76,9 +75,6 @@ if [[ -n "${ETH_PROVIDER}" ]]; then
>&2 echo "set faucet as token minter"
giftable-token-minter -w -y $keystore_file -a $DEV_RESERVE_ADDRESS -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -vv $DEV_FAUCET_ADDRESS
>&2 echo "set token faucet amount"
sarafu-faucet-set -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -a $DEV_FAUCET_ADDRESS $faucet_amount
else
echo "\$ETH_PROVIDER not set!"

View File

@@ -2,234 +2,88 @@
This folder contains tools to generate and import test data.
## OVERVIEW
## DATA CREATION
Three sets of tools are available, sorted by respective subdirectories.
This step does not need the cluster to run.
* **eth**: Import using sovereign wallets.
* **cic_eth**: Import using the `cic_eth` custodial engine.
* **cic_ussd**: Import using the `cic_ussd` interface (backed by `cic_eth`)
Each of the modules include two main scripts:
* **import_users.py**: Registers all created accounts in the network
* **import_balance.py**: Transfer an opening balance using an external keystore wallet
The balance script will sync with the blockchain, processing transactions and triggering actions when it finds matches. In its current version it does not keep track of any other state, so it will run indefinitely; you, the human, must decide when it has done what it needs to do.
In addition the following common tools are available:
* **create_import_users.py**: User creation script
* **verify.py**: Import verification script
* **cic_meta**: Metadata imports
## REQUIREMENTS
A virtual environment for the python scripts is recommended. We know it works with `python 3.8.x`. Let us know if you run it successfully with other minor versions.
```
python3 -m venv .venv
source .venv/bin/activate
```
Install all requirements from the `requirements.txt` file:
`pip install --extra-index-url https://pip.grassrootseconomics.net:8433 -r requirements.txt`
If you are importing metadata, also do ye olde:
`npm install`
## HOW TO USE
### Step 1 - Data creation
Before running any of the imports, the user data to import has to be generated and saved to disk.
The script does not need any services to run.
Vanilla version:
Vanilla:
`python create_import_users.py [--dir <datadir>] <number_of_users>`
If you want to use an `import_balance.py` script to add to the user's balance from an external address, use:
If you want to use the `import_balance.py` script to add to the user's balance from an external address, add:
`python create_import_users.py --gift-threshold <max_units_to_send> [--dir <datadir>] <number_of_users>`
### Step 2 - Services
## IMPORT
Unless you know what you are doing, start with a clean slate, and execute (in the repository root):
`docker-compose down -v`
Then go through, in sequence:
#### Base requirements
If you are importing using `eth` and _not_ importing metadata, then the only service you need running in the cluster is:
* eth
In all other cases you will _also_ need:
* postgres
* redis
Make sure the following is running in the cluster:
* eth
* postgres
* redis
* cic-meta-server
#### EVM provisions
This step is needed in *all* cases.
`RUN_MASK=1 docker-compose up contract-migration`
After this step is run, you can find top-level ethereum addresses (like the cic registry address, which you will need below) in `<repository_root>/service-configs/.env`
If using the _custodial_ alternative for user imports, also run:
* cic-eth-tasker
* cic-eth-dispatcher
* cic-eth-tracker
#### Custodial provisions
This step is _only_ needed if you are importing using `cic_eth` or `cic_ussd`
`RUN_MASK=2 docker-compose up contract-migration`
You will want to run these in sequence:
#### Custodial services
## 1. Metadata
If importing using `cic_eth` or `cic_ussd` also run:
* cic-eth-tasker
* cic-eth-dispatcher
* cic-eth-tracker
* cic-eth-retrier
If importing using `cic_ussd` also run:
* cic-ussd-tasker
* cic-ussd-server
* cic-notify-tasker
If metadata is to be imported, also run:
* cic-meta-server
### Step 3 - User imports
If you did not change the docker-compose setup, the `eth_provider` you need for the commands below will be `http://localhost:63545`.
Only run _one_ of the alternatives.
The path to the keystore file used for transferring external opening balances is relative to the directory containing this README. Of course you can use a different wallet, but then you will have to provide it with tokens yourself (hint: `../reset.sh`)
All external balance transactions are saved in raw wire format in `<datadir>/txs`, with transaction hash as file name.
#### Alternative 1 - Sovereign wallet import - `eth`
First, make a note of the **block height** before running anything.
To import, run to _completion_:
`python eth/import_users.py -v -c config -p <eth_provider> -r <cic_registry_address> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c <datadir>`
After the script completes, keystore files for all generated accounts will be found in `<datadir>/keystore`, all with `foo` as password (it would be set empty, but believe it or not some interfaces out there won't work unless you have one).
Then run:
`python eth/import_balance.py -v -c config -r <cic_registry_address> -p <eth_provider> --offset <block_height_at_start> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c <datadir>`
#### Alternative 2 - Custodial engine import - `cic_eth`
Run in sequence, in first terminal:
`python cic_eth/import_balance.py -v -c config -p <eth_provider> -r <cic_registry_address> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c --head out`
In another terminal:
`python cic_eth/import_users.py -v -c config --redis-host-callback <redis_hostname_in_docker> out`
The `redis_hostname_in_docker` value is the hostname required to reach the redis server from within the docker cluster, and should be `redis` if you left the docker-compose unchanged. The `import_users` script will receive the address of each newly created custodial account on a redis subscription fed by a callback task in the `cic_eth` account creation task chain.
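For orientation, a minimal sketch (hostname, port, channel name and message shape here are assumptions, not the exact `cic_eth` wire format) of what such a redis callback subscription can look like:

```
import json
import redis

r = redis.Redis(host='localhost', port=63379, db=0)
ps = r.pubsub()
ps.subscribe('cic-import')
for message in ps.listen():
    if message['type'] != 'message':
        continue
    payload = json.loads(message['data'])
    # the task chain publishes the newly created custodial address
    print('new custodial address', payload.get('result'))
```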
#### Alternative 3 - USSD import - `cic_ussd`
If you have previously run the `cic_ussd` import incompletely, it could be a good idea to purge the queue. If you have left docker-compose unchanged, `redis_url` should be `redis://localhost:63379`.
`celery -A cic_ussd.import_task purge -Q cic-import-ussd --broker <redis_url>`
Then, in sequence, run in first terminal:
`python cic_eth/import_balance.py -v -c config -p <eth_provider> -r <cic_registry_address> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c out`
In second terminal:
`python cic_ussd/import_users.py -v -c config out`
The balance script is a celery task worker, and will not exit by itself in its current version. However, after it's done doing its job, you will find "reached nonce ... exiting" among the last lines of the log.
The connection parameters for the `cic-ussd-server` are currently _hardcoded_ in the `import_users.py` script file.
### Step 4 - Metadata import (optional)
The metadata import scripts can be run at any time after step 1 has been completed.
#### Importing user metadata
To import the main user metadata structs, run:
`node cic_meta/import_meta.js <datadir> <number_of_users>`
`node import_meta.js <datadir> <number_of_users>`
Monitors a folder for output from the `import_users.py` script, adding the metadata found to the `cic-meta` service.
If _number of users_ is omitted the script will run until manually interrupted.
## 2. Balances
(Only if you used the `--gift-threshold` option above)
`python import_balance.py -c config -i <newchain:id> -r <cic_registry_address> -p <eth_provider> --head -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c <datadir>`
This will monitor new mined blocks and send balances to the newly created accounts.
#### Importing phone pointer
### 3. Users
`node cic_meta/import_meta_phone.js <datadir> <number_of_users>`
Only use **one** of the following:
If you imported using `cic_ussd`, the phone pointer is _already added_ and this script will do nothing.
#### Custodial
This alternative generates accounts using the `cic-eth` custodial engine
### Step 5 - Verify
Without any modifications to the cluster and config files:
`python verify.py -v -c config -r <cic_registry_address> -p <eth_provider> <datadir>`
`python import_users.py -c config --redis-host-callback redis <datadir>`
Included checks:
* Private key is in cic-eth keystore
* Address is in accounts index
* Address has gas balance
* Address has triggered the token faucet
* Address has token balance matching the gift threshold
* Personal metadata can be retrieved and has exact match
* Phone pointer metadata can be retrieved and matches address
* USSD menu response is initial state after registration
**A note on the callback**: The script uses a redis callback to retrieve the newly generated custodial address. This is the redis server _from the perspective of the cic-eth component_.
Checks can be selectively included and excluded. See `--help` for details.
#### Sovereign
Will output one line for each check, with the name of the check and the number of errors found.
This alternative generates keystore files, while registering corresponding addresses in the accounts registry directly
`python import_sovereign_users.py -c config -i <newchain:id> -r <cic_registry_address> -p <eth_provider> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c <datadir>`
A `keystore` sub-directory in the data path is created, with ethereum keystore files for all generated private keys. Passphrase is set to empty string for all of them.
## VERIFY
`python verify.py -c config -i <newchain:id> -r <cic_registry_address> -p <eth_provider> <datadir>`
Checks
* Private key is in cic-eth keystore
* Address is in accounts index
* Address has balance matching the gift threshold
* Metadata can be retrieved and has exact match
Should exit with code 0 if all input data is found in the respective services.
## KNOWN ISSUES
- If the faucet disbursement is set to a non-zero amount, the balances will be off. The verify script needs to be improved to check the faucet amount.
- When the account callback in `cic_eth` fails, the `cic_eth/import_users.py` script will exit with a cryptic complaint concerning a `None` value.
- Sovereign import scripts use the same keystore, and running them simultaneously will mess up the transaction nonce sequence. Better would be to use two different keystore wallets so balance and users scripts can be run simultaneously.
- `pycrypto` and `pycryptodome` _have to be installed in that order_. If you get errors concerning `Crypto.KDF` then uninstall both and re-install in that order. Make sure you use the versions listed in `requirements.txt`. `pycryptodome` is a legacy dependency and will be removed as soon as possible.
- Sovereign import script is very slow because it's scrypt'ing keystore files for the accounts that it creates. An improvement would be optional and/or asynchronous keyfile generation.
- Running the balance script should be _optional_ in all cases, but is currently required in the case of `cic_ussd` because it is needed to generate the metadata. An improvement would be moving the task to `import_users.py`, for a different queue than the balance tx handler.
- The `cic_ussd` import is poorly implemented and consumes a lot of resources, so it takes a long time to complete. Reducing the number of polls for the phone pointer would go a long way to improve it.
If the faucet disbursement is set to a non-zero amount, the balances will be off. The verify script needs to be improved to check the faucet amount.

View File

@@ -1,134 +0,0 @@
const fs = require('fs');
const path = require('path');
const http = require('http');
const cic = require('cic-client-meta');
const vcfp = require('vcard-parser');
//const conf = JSON.parse(fs.readFileSync('./cic.conf'));
const config = new cic.Config('./config');
config.process();
console.log(config);
function sendit(uid, envelope) {
const d = envelope.toJSON();
const contentLength = (new TextEncoder().encode(d)).length;
const opts = {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Content-Length': contentLength,
'X-CIC-AUTOMERGE': 'client',
},
};
let url = config.get('META_URL');
url = url.replace(new RegExp('^(.+://[^/]+)/*$'), '$1/');
console.log('posting to url: ' + url + uid);
const req = http.request(url + uid, opts, (res) => {
res.on('data', process.stdout.write);
res.on('end', () => {
console.log('result', res.statusCode, res.headers);
});
});
if (!req.write(d)) {
console.error('foo', d);
process.exit(1);
}
req.end();
}
function doOne(keystore, filePath, address) {
const signer = new cic.PGPSigner(keystore);
const j = JSON.parse(fs.readFileSync(filePath).toString());
const b = Buffer.from(j['vcard'], 'base64');
const s = b.toString();
const o = vcfp.parse(s);
const phone = o.tel[0].value;
cic.Phone.toKey(phone).then((uid) => {
const o = fs.readFileSync(filePath, 'utf-8');
const s = new cic.Syncable(uid, o);
s.setSigner(signer);
s.onwrap = (env) => {
sendit(uid, env);
};
s.sign();
});
}
const privateKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
const publicKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
pk = fs.readFileSync(privateKeyPath);
pubk = fs.readFileSync(publicKeyPath);
new cic.PGPKeyStore(
config.get('PGP_PASSPHRASE'),
pk,
pubk,
undefined,
undefined,
importMetaPhone,
);
const batchSize = 16;
const batchDelay = 1000;
const total = parseInt(process.argv[3]);
const dataDir = process.argv[2];
const workDir = path.join(dataDir, 'phone/meta');
const userDir = path.join(dataDir, 'new');
let count = 0;
let batchCount = 0;
function importMetaPhone(keystore) {
let err;
let files;
try {
files = fs.readdirSync(workDir);
} catch {
console.error('source directory not yet ready', workDir);
setTimeout(importMetaPhone, batchDelay, keystore);
return;
}
let limit = batchSize;
if (files.length < limit) {
limit = files.length;
}
for (let i = 0; i < limit; i++) {
const file = files[i];
if (file.substr(0, 1) == '.') {
console.debug('skipping file', file);
continue;
}
const filePath = path.join(workDir, file);
const address = fs.readFileSync(filePath).toString().substring(2).toUpperCase();
const metaFilePath = path.join(
userDir,
address.substring(0, 2),
address.substring(2, 4),
address + '.json',
);
doOne(keystore, metaFilePath, address);
fs.unlinkSync(filePath);
count++;
batchCount++;
if (batchCount == batchSize) {
console.debug('reached batch size, breathing');
batchCount=0;
setTimeout(importMetaPhone, batchDelay, keystore);
return;
}
}
if (count == total) {
return;
}
setTimeout(importMetaPhone, 100, keystore);
}

View File

@@ -1,157 +0,0 @@
# standard imports
import os
import sys
import logging
import argparse
import hashlib
import redis
import celery
# external imports
import confini
from chainlib.eth.connection import EthHTTPConnection
from chainlib.chain import ChainSpec
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum_address
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.dict import DictKeystore
from cic_types.models.person import Person
# local imports
from import_util import BalanceProcessor
from import_task import *
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = './config'
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
argparser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server port')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--token-symbol', default='SRF', type=str, dest='token_symbol', help='Token symbol to use for transactions')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', default='out', type=str, help='user export directory')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
'REDIS_HOST': getattr(args, 'redis_host'),
'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'),
'META_HOST': getattr(args, 'meta_host'),
'META_PORT': getattr(args, 'meta_port'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
signer_address = None
keystore = DictKeystore()
if args.y != None:
logg.debug('loading keystore file {}'.format(args.y))
signer_address = keystore.import_keystore_file(args.y)
logg.debug('now have key for signer address {}'.format(signer_address))
signer = EIP155Signer(keystore)
queue = args.q
chain_str = config.get('CIC_CHAIN_SPEC')
block_offset = 0
if args.head:
block_offset = -1
else:
block_offset = args.offset
chain_spec = ChainSpec.from_chain_str(chain_str)
old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py
token_symbol = args.token_symbol
MetadataTask.meta_host = config.get('META_HOST')
MetadataTask.meta_port = config.get('META_PORT')
ImportTask.chain_spec = chain_spec
def main():
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), signer_address, signer)
ImportTask.balance_processor.init()
# TODO get decimals from token
balances = {}
f = open('{}/balances.csv'.format(user_dir), 'r')
remove_zeros = 10**6
i = 0
while True:
l = f.readline()
if l == '':
break
r = l.split(',')
try:
address = to_checksum_address(r[0])
sys.stdout.write('loading balance {} {} {}'.format(i, address, r[1]).ljust(200) + "\r")
except ValueError:
break
balance = int(int(r[1].rstrip()) / remove_zeros)
balances[address] = balance
i += 1
f.close()
ImportTask.balances = balances
ImportTask.count = i
s = celery.signature(
'import_task.send_txs',
[
MetadataTask.balance_processor.nonce_offset,
],
queue='cic-import-ussd',
)
s.apply_async()
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG']
celery_app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@@ -1,218 +0,0 @@
# standard imports
import os
import logging
import urllib.parse
import urllib.error
import urllib.request
import json
# external imports
import celery
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
unpack,
raw,
)
from cic_types.processor import generate_metadata_pointer
from cic_types.models.person import Person
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
celery_app = celery.current_app
class ImportTask(celery.Task):
balances = None
import_dir = 'out'
count = 0
chain_spec = None
balance_processor = None
max_retries = None
class MetadataTask(ImportTask):
meta_host = None
meta_port = None
meta_path = ''
meta_ssl = False
autoretry_for = (
urllib.error.HTTPError,
OSError,
)
retry_jitter = True
retry_backoff = True
retry_backoff_max = 60
@classmethod
def meta_url(self):
scheme = 'http'
if self.meta_ssl:
scheme += 's'
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
def old_address_from_phone(base_path, phone):
pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format(
base_path,
pidx[:2],
pidx[2:4],
pidx,
)
)
f = open(phone_idx_path, 'r')
old_address = f.read()
f.close()
return old_address
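# Illustrative note, not part of the diff: the ':cic.phone' pointer scheme
# mirrors genPhoneIndex in create_import_users.py further down in this
# changeset: sha256 over the phone number followed by the salt.
def phone_pointer_sketch(phone: str) -> str:
    import hashlib
    h = hashlib.new('sha256')
    h.update(phone.encode('utf-8'))
    h.update(b':cic.phone')
    return h.digest().hex()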
@celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone):
identifier = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
url = urllib.parse.urljoin(self.meta_url(), identifier)
logg.debug('attempt getting phone pointer at {} for phone {}'.format(url, phone))
r = urllib.request.urlopen(url)
address = json.load(r)
address = address.replace('"', '')
logg.debug('address {} for phone {}'.format(address, phone))
return address
@celery_app.task(bind=True, base=MetadataTask)
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> foo')
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
self.import_dir,
old_address_upper[:2],
old_address_upper[2:4],
old_address_upper,
)
f = open(metadata_path, 'r')
o = json.load(f)
f.close()
u = Person.deserialize(o)
if u.identities.get('evm') == None:
u.identities['evm'] = {}
sub_chain_str = '{}:{}'.format(self.chain_spec.common_name(), self.chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [add_0x(address)]
new_address_clean = strip_0x(address)
filepath = os.path.join(
self.import_dir,
'new',
new_address_clean[:2].upper(),
new_address_clean[2:4].upper(),
new_address_clean.upper() + '.json',
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
o = u.serialize()
f = open(filepath, 'w')
f.write(json.dumps(o))
f.close()
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
meta_filepath = os.path.join(
self.import_dir,
'meta',
'{}.json'.format(new_address_clean.upper()),
)
os.symlink(os.path.realpath(filepath), meta_filepath)
logg.debug('found metadata {} for phone {}'.format(o, phone))
return address
@celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, address, phone, serial):
old_address = old_address_from_phone(self.import_dir, phone)
k = to_checksum_address(strip_0x(old_address))
balance = self.balances[k]
logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
decimal_balance = self.balance_processor.get_decimal_amount(int(balance))
(tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec)
logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
tx_path = os.path.join(
self.import_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(strip_0x(o))
f.close()
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(tx['nonce']),
)
os.symlink(os.path.realpath(tx_path), tx_nonce_path)
return tx['hash']
@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None, default_retry_delay=0.1)
def send_txs(self, nonce):
if nonce == self.count + self.balance_processor.nonce_offset:
logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset, self.count))
return
logg.debug('attempt to open symlink for nonce {}'.format(nonce))
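# opening_balance_tx symlinked each signed tx under txs/.<nonce>, so
# walking the nonces in order replays a gapless nonce sequence from the
# signer account.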
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(nonce),
)
f = open(tx_nonce_path, 'r')
tx_signed_raw_hex = f.read()
f.close()
os.unlink(tx_nonce_path)
o = raw(add_0x(tx_signed_raw_hex))
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) #tx_signed_raw_hex))
nonce += 1
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'import_task.send_txs',
[
nonce,
],
queue=queue,
)
s.apply_async()
return nonce

View File

@@ -1,191 +0,0 @@
# standard imports
import os
import sys
import json
import logging
import argparse
import uuid
import datetime
import time
import urllib.request
from glob import glob
# third-party imports
import redis
import confini
import celery
from hexathon import (
add_0x,
strip_0x,
)
from chainlib.eth.address import to_checksum
from cic_types.models.person import Person
from cic_eth.api.api_task import Api
from chainlib.chain import ChainSpec
from cic_types.processor import generate_metadata_pointer
import phonenumbers
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis port to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node') # batch size should be slightly below a block's worth of cumulative gas, e.g. 80000-gas txs against an 8000000 gas limit give a batch size of just under 100
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = argparser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'REDIS_HOST': getattr(args, 'redis_host'),
'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'),
}
config.dict_override(args_override, 'cli')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub()
user_new_dir = os.path.join(args.user_dir, 'new')
os.makedirs(user_new_dir)
meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir)
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
url = 'http'
if ssl:
url += 's'
url += '://{}:{}'.format(host, port)
url += '/?username={}&password={}'.format(username, password) #config.get('USSD_USER'), config.get('USSD_PASS'))
logg.info('ussd service url {}'.format(url))
logg.info('ussd phone {}'.format(phone))
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': service_code,
'phoneNumber': phone,
'text': service_code,
}
req = urllib.request.Request(url)
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
return req
def register_ussd(i, u):
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
logg.debug('tel {} {}'.format(u.tel, phone))
req = build_ussd_request(phone, 'localhost', 63315, '*483*46#', '', '')
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
state = response_data[:3]
out = response_data[4:]
logg.debug('ussd response: {}'.format(out))
if __name__ == '__main__':
i = 0
j = 0
for x in os.walk(user_old_dir):
for y in x[2]:
if y[len(y)-5:] != '.json':
continue
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
new_address = register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_meta.link(s_balance)
s_phone.link(s_meta)
s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
#fi.close()

View File

@@ -1,72 +0,0 @@
# standard imports
import logging
# external imports
from eth_contract_registry import Registry
from eth_token_index import TokenUniqueSymbolIndex
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import OverrideNonceOracle
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import (
count,
TxFormat,
)
logg = logging.getLogger().getChild(__name__)
class BalanceProcessor:
def __init__(self, conn, chain_spec, registry_address, signer_address, signer):
self.chain_spec = chain_spec
self.conn = conn
#self.signer_address = signer_address
self.registry_address = registry_address
self.token_index_address = None
self.token_address = None
self.signer_address = signer_address
self.signer = signer
o = count(signer_address)
c = self.conn.do(o)
self.nonce_offset = int(c, 16)
self.gas_oracle = OverrideGasOracle(conn=conn, limit=8000000)
self.value_multiplier = 1
def init(self):
# Get Token registry address
registry = Registry(self.chain_spec)
o = registry.address_of(self.registry_address, 'TokenRegistry')
r = self.conn.do(o)
self.token_index_address = registry.parse_address_of(r)
logg.info('found token index address {}'.format(self.token_index_address))
token_registry = TokenUniqueSymbolIndex(self.chain_spec)
o = token_registry.address_of(self.token_index_address, 'SRF')
r = self.conn.do(o)
self.token_address = token_registry.parse_address_of(r)
logg.info('found SRF token address {}'.format(self.token_address))
tx_factory = ERC20(self.chain_spec)
o = tx_factory.decimals(self.token_address)
r = self.conn.do(o)
n = tx_factory.parse_decimals(r)
self.value_multiplier = 10 ** n
def get_rpc_tx(self, recipient, value, i):
logg.debug('initiating nonce offset {} for recipient {}'.format(self.nonce_offset + i, recipient))
nonce_oracle = OverrideNonceOracle(self.signer_address, self.nonce_offset + i)
tx_factory = ERC20(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.gas_oracle)
return tx_factory.transfer(self.token_address, self.signer_address, recipient, value, tx_format=TxFormat.RLP_SIGNED)
#(tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value)
#self.conn.do(o)
#return tx_hash_hex
def get_decimal_amount(self, value):
return value * self.value_multiplier
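# Illustrative usage sketch, not part of the diff; the provider URL and
# argument wiring are assumptions mirroring import_balance.py elsewhere
# in this changeset:
#
#   conn = EthHTTPConnection('http://localhost:63545')
#   bp = BalanceProcessor(conn, chain_spec, registry_address, signer_address, signer)
#   bp.init()  # resolve token index, token address and decimals
#   (tx_hash, tx_raw) = bp.get_rpc_tx(recipient, bp.get_decimal_amount(100), 0)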

View File

@@ -1,24 +0,0 @@
[app]
ALLOWED_IP=0.0.0.0/0
LOCALE_FALLBACK=en
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =

View File

@@ -1,5 +1,2 @@
[meta]
url = http://localhost:63380
host = localhost
port = 63380
ssl = 0

View File

@@ -24,7 +24,6 @@ from cic_types.models.person import (
get_contact_data_from_vcard,
)
from chainlib.eth.address import to_checksum_address
import phonenumbers
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -81,12 +80,10 @@ phone_idx = []
user_dir = args.dir
user_count = args.user_count
random.seed()
def genPhoneIndex(phone):
h = hashlib.new('sha256')
h.update(phone.encode('utf-8'))
h.update(b':cic.phone')
h.update(b'cic.msisdn')
return h.digest().hex()
@@ -104,9 +101,7 @@ def genDate():
def genPhone():
phone_str = '+254' + str(random.randint(100000000, 999999999))
phone_object = phonenumbers.parse(phone_str)
return phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
return fake.msisdn()
def genPersonal(phone):
@@ -210,14 +205,14 @@ if __name__ == '__main__':
f.close()
pidx = genPhoneIndex(phone)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), uid)
f = open('{}/{}'.format(d, pidx), 'w')
f.write(eth)
f.close()
amount = genAmount()
fa.write('{},{}\n'.format(eth,amount))
logg.debug('pidx {}, uid {}, eth {}, amount {}, phone {}'.format(pidx, uid, eth, amount, phone))
logg.debug('pidx {}, uid {}, eth {}, amount {}'.format(pidx, uid, eth, amount))
i += 1

View File

@@ -1,309 +0,0 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
import hashlib
import csv
import json
# external imports
import eth_abi
import confini
from hexathon import (
strip_0x,
add_0x,
)
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver import HeadSyncer
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainlib.hash import keccak256_string_to_hex
from chainlib.eth.address import to_checksum_address
from chainlib.eth.erc20 import ERC20
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import TxFactory
from chainlib.jsonrpc import jsonrpc_template
from chainlib.eth.error import EthException
from chainlib.chain import ChainSpec
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.dict import DictKeystore
from cic_types.models.person import Person
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = './config'
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--token-symbol', default='SRF', type=str, dest='token_symbol', help='Token symbol to use for transactions')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', type=str, help='user export directory')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
#app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
signer_address = None
keystore = DictKeystore()
if args.y != None:
logg.debug('loading keystore file {}'.format(args.y))
signer_address = keystore.import_keystore_file(args.y)
logg.debug('now have key for signer address {}'.format(signer_address))
signer = EIP155Signer(keystore)
queue = args.q
chain_str = config.get('CIC_CHAIN_SPEC')
block_offset = 0
if args.head:
block_offset = -1
else:
block_offset = args.offset
chain_spec = ChainSpec.from_chain_str(chain_str)
old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py
token_symbol = args.token_symbol
class Handler:
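# first 4 bytes (8 hex characters) of keccak256('add(address)'): the
# solidity function selector that filter() below matches against the
# start of each transaction payload to spot account registrations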
account_index_add_signature = keccak256_string_to_hex('add(address)')[:8]
def __init__(self, conn, chain_spec, user_dir, balances, token_address, signer, gas_oracle, nonce_oracle):
self.token_address = token_address
self.user_dir = user_dir
self.balances = balances
self.chain_spec = chain_spec
self.tx_factory = ERC20(chain_spec, signer, gas_oracle, nonce_oracle)
def name(self):
return 'balance_handler'
def filter(self, conn, block, tx, db_session):
if tx.payload == None or len(tx.payload) == 0:
logg.debug('no payload, skipping {}'.format(tx))
return
if tx.payload[:8] == self.account_index_add_signature:
recipient = eth_abi.decode_single('address', bytes.fromhex(tx.payload[-64:]))
#original_address = to_checksum_address(self.addresses[to_checksum_address(recipient)])
user_file = 'new/{}/{}/{}.json'.format(
recipient[2:4].upper(),
recipient[4:6].upper(),
recipient[2:].upper(),
)
filepath = os.path.join(self.user_dir, user_file)
o = None
try:
f = open(filepath, 'r')
o = json.load(f)
f.close()
except FileNotFoundError:
logg.error('no import record of address {}'.format(recipient))
return
u = Person.deserialize(o)
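# Person.identities is keyed by chain engine, then by
# '<common_name>:<network_id>'; element 0 is the address the account
# held on the old chain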
original_address = u.identities[old_chain_spec.engine()]['{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id())][0]
try:
balance = self.balances[original_address]
except KeyError as e:
logg.error('balance get fail orig {} new {}'.format(original_address, recipient))
return
# TODO: store token object in handler, get decimals from there
multiplier = 10**6
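# balances.csv values were divided by 10**6 at load time (see main()
# below); re-scaling by 10**6 here assumes the token has 6 decimals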
balance_full = balance * multiplier
logg.info('registered {} originally {} ({}) tx hash {} balance {}'.format(recipient, original_address, u, tx.hash, balance_full))
(tx_hash_hex, o) = self.tx_factory.transfer(self.token_address, signer_address, recipient, balance_full)
logg.info('submitting erc20 transfer tx {} for recipient {}'.format(tx_hash_hex, recipient))
r = conn.do(o)
tx_path = os.path.join(
user_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(strip_0x(o['params'][0]))
f.close()
# except TypeError as e:
# logg.warning('typerror {}'.format(e))
# pass
# except IndexError as e:
# logg.warning('indexerror {}'.format(e))
# pass
# except EthException as e:
# logg.error('send error {}'.format(e).ljust(200))
#except KeyError as e:
# logg.error('key record not found in imports: {}'.format(e).ljust(200))
#class BlockGetter:
#
# def __init__(self, conn, gas_oracle, nonce_oracle, chain_spec):
# self.conn = conn
# self.tx_factory = ERC20(signer=signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle, chain_id=chain_id)
#
#
# def get(self, n):
# o = block_by_number(n)
# r = self.conn.do(o)
# b = None
# try:
# b = Block(r)
# except TypeError as e:
# if r == None:
# logg.debug('block not found {}'.format(n))
# else:
# logg.error('block retrieve error {}'.format(e))
# return b
def progress_callback(block_number, tx_index):
sys.stdout.write(str(block_number).ljust(200) + "\n")
def main():
global chain_str, block_offset, user_dir
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
gas_oracle = OverrideGasOracle(conn=conn, limit=8000000)
nonce_oracle = RPCNonceOracle(signer_address, conn)
# Get Token registry address
txf = TxFactory(chain_spec, signer=signer, gas_oracle=gas_oracle, nonce_oracle=None)
tx = txf.template(signer_address, config.get('CIC_REGISTRY_ADDRESS'))
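# 4-byte selector of the registry's addressOf(bytes32) getter; the
# calldata assembled below is this selector plus the abi-encoded
# bytes32 identifier of the entry to resolve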
registry_addressof_method = keccak256_string_to_hex('addressOf(bytes32)')[:8]
data = add_0x(registry_addressof_method)
data += eth_abi.encode_single('bytes32', b'TokenRegistry').hex()
txf.set_code(tx, data)
o = jsonrpc_template()
o['method'] = 'eth_call'
o['params'].append(txf.normalize(tx))
o['params'].append('latest')
r = conn.do(o)
token_index_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r))))
logg.info('found token index address {}'.format(token_index_address))
# Get Sarafu token address
tx = txf.template(signer_address, token_index_address)
data = add_0x(registry_addressof_method)
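# the token index is keyed by bytes32 values; this lookup assumes the
# key is the sha256 digest of the token symbol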
h = hashlib.new('sha256')
h.update(token_symbol.encode('utf-8'))
z = h.digest()
data += eth_abi.encode_single('bytes32', z).hex()
txf.set_code(tx, data)
o = jsonrpc_template()
o['method'] = 'eth_call'
o['params'].append(txf.normalize(tx))
o['params'].append('latest')
r = conn.do(o)
try:
sarafu_token_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r))))
except ValueError as e:
logg.critical('lookup failed for token {}: {}'.format(token_symbol, e))
sys.exit(1)
logg.info('found token address {}'.format(sarafu_token_address))
syncer_backend = MemBackend(chain_str, 0)
if block_offset == -1:
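# --head mode: fetch the current block number (hex string), parse it,
# and start the syncer one block past the chain head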
o = block_latest()
r = conn.do(o)
block_offset = int(strip_0x(r), 16) + 1
#
# addresses = {}
# f = open('{}/addresses.csv'.format(user_dir, 'r'))
# while True:
# l = f.readline()
# if l == None:
# break
# r = l.split(',')
# try:
# k = r[0]
# v = r[1].rstrip()
# addresses[k] = v
# sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r")
# except IndexError as e:
# break
# f.close()
# TODO get decimals from token
balances = {}
f = open('{}/balances.csv'.format(user_dir), 'r')
remove_zeros = 10**6
i = 0
while True:
l = f.readline()
if l == '':
break
r = l.split(',')
try:
address = to_checksum_address(r[0])
sys.stdout.write('loading balance {} {} {}'.format(i, address, r[1]).ljust(200) + "\r")
except ValueError:
break
balance = int(int(r[1].rstrip()) / remove_zeros)
balances[address] = balance
i += 1
f.close()
syncer_backend.set(block_offset, 0)
syncer = HeadSyncer(syncer_backend, block_callback=progress_callback)
handler = Handler(conn, chain_spec, user_dir, balances, sarafu_token_address, signer, gas_oracle, nonce_oracle)
syncer.add_filter(handler)
syncer.loop(1, conn)
if __name__ == '__main__':
main()

File diff suppressed because it is too large

View File

@@ -0,0 +1 @@
{"metricId":"ea11447f-da1c-49e6-b0a2-8a988a99e3ce","metrics":{"from":"2021-02-12T18:59:21.666Z","to":"2021-02-12T18:59:21.666Z","successfulInstalls":0,"failedInstalls":1}}

View File

@@ -10,14 +10,14 @@ import hashlib
import csv
import json
# external imports
# third-party imports
import eth_abi
import confini
from hexathon import (
strip_0x,
add_0x,
)
from chainsyncer.backend.memory import MemBackend
from chainsyncer.backend import MemBackend
from chainsyncer.driver import HeadSyncer
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import (
@@ -25,13 +25,13 @@ from chainlib.eth.block import (
block_by_number,
Block,
)
from chainlib.hash import keccak256_string_to_hex
from chainlib.eth.hash import keccak256_string_to_hex
from chainlib.eth.address import to_checksum_address
from chainlib.eth.erc20 import ERC20
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import TxFactory
from chainlib.jsonrpc import jsonrpc_template
from chainlib.eth.rpc import jsonrpc_template
from chainlib.eth.error import EthException
from chainlib.chain import ChainSpec
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
@@ -42,7 +42,7 @@ from cic_types.models.person import Person
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = './config'
config_dir = '/usr/local/etc/cic-syncer'
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
@@ -162,15 +162,6 @@ class Handler:
(tx_hash_hex, o) = self.tx_factory.transfer(self.token_address, signer_address, recipient, balance_full)
logg.info('submitting erc20 transfer tx {} for recipient {}'.format(tx_hash_hex, recipient))
r = conn.do(o)
tx_path = os.path.join(
user_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(strip_0x(o['params'][0]))
f.close()
# except TypeError as e:
# logg.warning('typerror {}'.format(e))
# pass
@@ -204,8 +195,8 @@ class Handler:
# return b
def progress_callback(block_number, tx_index):
sys.stdout.write(str(block_number).ljust(200) + "\n")
def progress_callback(block_number, tx_index, s):
sys.stdout.write(str(s).ljust(200) + "\n")
@@ -299,7 +290,7 @@ def main():
f.close()
syncer_backend.set(block_offset, 0)
syncer = HeadSyncer(syncer_backend, block_callback=progress_callback)
syncer = HeadSyncer(syncer_backend, progress_callback=progress_callback)
handler = Handler(conn, chain_spec, user_dir, balances, sarafu_token_address, signer, gas_oracle, nonce_oracle)
syncer.add_filter(handler)
syncer.loop(1, conn)
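This file's hunks track module moves between the chainlib/chainsyncer versions on either side of the diff: MemBackend, keccak256_string_to_hex and jsonrpc_template all relocated, the default config_dir changed, and the HeadSyncer callback keyword went from progress_callback= (callback taking an extra argument s) to block_callback= (callback taking block_number and tx_index). A hedged sketch of imports tolerating both layouts shown above:

try:
    # layout on the new side of this diff
    from chainsyncer.backend.memory import MemBackend
    from chainlib.hash import keccak256_string_to_hex
    from chainlib.jsonrpc import jsonrpc_template
except ImportError:
    # layout on the old side
    from chainsyncer.backend import MemBackend
    from chainlib.eth.hash import keccak256_string_to_hex
    from chainlib.eth.rpc import jsonrpc_template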

View File

@@ -0,0 +1 @@
python import_balance.py -c config -i evm:bloxberg:8996 -y /home/lash/tmp/d/keystore/UTC--2021-02-07T09-58-35.341813355Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c -v $@

Some files were not shown because too many files have changed in this diff.