Compare commits

..

39 Commits

Author SHA1 Message Date
nolash
0490d41aa5 Merge remote-tracking branch 'origin/master' into philip/fix-filter-callbacks 2021-04-15 18:11:30 +02:00
nolash
22ec8e2e0e Update sql backend symbol name, deps 2021-04-15 18:04:47 +02:00
Louis Holbrook
a8529ae2ef Merge branch 'lash/improve-cic-cache-service' into 'master'
Iimprove cic cache service

See merge request grassrootseconomics/cic-internal-integration!100
2021-04-15 14:02:11 +00:00
Louis Holbrook
98ddf56a1d Iimprove cic cache service 2021-04-15 14:02:09 +00:00
nolash
e2eca335f1 More dependency fun 2021-04-14 20:42:27 +02:00
nolash
be32fc75fc Add value to token data from giftto 2021-04-14 20:28:05 +02:00
nolash
bb930ce148 Add token lookup for faucet gift callback 2021-04-14 19:21:35 +02:00
nolash
5f99048154 Add mock block in callback test code 2021-04-14 18:33:33 +02:00
nolash
2df8e8f299 Add registry address to eth tracker 2021-04-14 15:47:05 +02:00
nolash
c0a8be637f Propagate chainlib, cic-eth-registry upgrade 2021-04-14 14:09:04 +02:00
nolash
ddf3ba0d22 Bump version, upgrade cic-eth-registry 2021-04-14 14:06:17 +02:00
nolash
60033e9f9e Upgrade chainlib 2021-04-14 14:04:01 +02:00
nolash
30b47f05ef Merge remote-tracking branch 'origin/master' into philip/fix-filter-callbacks 2021-04-14 13:42:58 +02:00
nolash
7a958c6f89 Add toplevel callback filter test 2021-04-14 13:42:14 +02:00
nolash
be65da9924 Add test for callback filter 2021-04-14 13:26:12 +02:00
nolash
fe4bef5290 Add test for giftto filter 2021-04-14 13:20:34 +02:00
nolash
5345803dc7 Add transferfrom filter 2021-04-14 12:52:39 +02:00
nolash
bf8ddd0a5c Pass parse filter 2021-04-14 12:44:00 +02:00
nolash
537ea782c7 SOLVED dependency hell 2021-04-14 11:28:01 +02:00
nolash
0c7b78fdf2 Dependency hell 2021-04-14 11:17:57 +02:00
bee602b16a Merge branch 'philip/leaner-metadata-handling' into 'master'
Philip/leaner metadata handling

See merge request grassrootseconomics/cic-internal-integration!94
2021-04-14 09:00:11 +00:00
c67274846f Philip/leaner metadata handling 2021-04-14 09:00:10 +00:00
nolash
bb556b5c45 Merge remote-tracking branch 'origin/master' into philip/fix-filter-callbacks 2021-04-14 10:19:14 +02:00
nolash
25847de75e Add callback filter test file stub 2021-04-14 10:18:31 +02:00
Louis Holbrook
48570b2338 Merge branch 'lash/update-syncer-imports' into 'master'
Update syncer imports

See merge request grassrootseconomics/cic-internal-integration!97
2021-04-14 08:17:48 +00:00
Louis Holbrook
c80b8771b9 Update syncer imports 2021-04-14 08:17:47 +00:00
nolash
6535760abb Simplify conditional scope 2021-04-14 08:11:06 +02:00
689f04c1be Merge branch 'master' of gitlab.com:grassrootseconomics/cic-internal-integration into philip/fix-filter-callbacks 2021-04-13 21:42:16 +03:00
f03d3c4390 Minor fixes but still failing. 2021-04-13 21:41:32 +03:00
32ce66ce43 updates deps. 2021-04-13 21:40:55 +03:00
Louis Holbrook
6c6db7bc7b Merge branch 'lash/cache-tracker-history' into 'master'
Fix missing history syncer in cic-cache-tracker

See merge request grassrootseconomics/cic-internal-integration!96
2021-04-13 14:48:25 +00:00
nolash
bb941acd7e Fix missing history syncer in cic-cache-tracker 2021-04-13 15:31:40 +02:00
b8fbf512fb Adds rudimentary fix for "parse_gifto" method. 2021-04-12 16:53:49 +03:00
3aaa42a1a7 Updates sarafu-faucet deps 2021-04-12 16:52:43 +03:00
Louis Holbrook
7dee7de26e Merge branch 'lash/import-ussd' into 'master'
Implement migration script with ussd and notify

See merge request grassrootseconomics/cic-internal-integration!87
2021-04-09 13:00:15 +00:00
Louis Holbrook
7b16a36a62 Implement migration script with ussd and notify 2021-04-09 13:00:15 +00:00
Louis Holbrook
5a4e0b8eba Merge branch 'lash/external-notify-tasks' into 'master'
Discover queue for external tasks

See merge request grassrootseconomics/cic-internal-integration!89
2021-04-07 06:31:26 +00:00
Louis Holbrook
226699568f Discover queue for external tasks 2021-04-07 06:31:26 +00:00
Louis Holbrook
ec2b0e56e5 Merge branch 'lash/add-missing-state-file' into 'master'
Add stat file

See merge request grassrootseconomics/cic-internal-integration!92
2021-04-07 06:26:44 +00:00
94 changed files with 4556 additions and 1837 deletions

View File

@@ -26,9 +26,10 @@ from chainlib.eth.block import (
from hexathon import (
strip_0x,
)
from chainsyncer.backend import SyncerBackend
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
@@ -70,19 +71,21 @@ def main():
syncers = []
#if SyncerBackend.first(chain_spec):
# backend = SyncerBackend.initial(chain_spec, block_offset)
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
syncers.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend))
syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')

View File

@@ -17,7 +17,7 @@ RUN apt-get update && \
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a58
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a76
COPY cic-cache/requirements.txt ./
COPY cic-cache/setup.cfg \
@@ -47,6 +47,9 @@ RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
COPY cic-cache/docker/start_tracker.sh ./start_tracker.sh
COPY cic-cache/docker/db.sh ./db.sh
RUN chmod 755 ./*.sh
# Tracker
# ENTRYPOINT ["/usr/local/bin/cic-cache-tracker", "-vv"]
# Server

View File

@@ -0,0 +1,6 @@
#!/bin/bash
set -e
>&2 echo executing database migration
python scripts/migrate.py -c /usr/local/etc/cic-cache --migrations-dir /usr/local/share/cic-cache/alembic -vv
set +e

View File

@@ -0,0 +1,5 @@
#!/bin/bash
. ./db.sh
/usr/local/bin/cic-cache-trackerd $@

View File

@@ -1,13 +1,12 @@
cic-base~=0.1.2a62
cic-base~=0.1.2a77
alembic==1.4.2
confini~=0.3.6rc3
uwsgi==2.0.19.1
moolb~=0.1.0
cic-eth-registry~=0.5.4a12
cic-eth-registry~=0.5.4a16
SQLAlchemy==1.3.20
semver==2.13.0
psycopg2==2.8.6
celery==4.4.7
redis==3.5.3
chainlib~=0.0.2a5
chainsyncer~=0.0.1a21
chainsyncer[sql]~=0.0.2a2

View File

@@ -10,6 +10,7 @@ from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger()
@@ -64,6 +65,7 @@ class SessionBase(Model):
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
@@ -74,17 +76,22 @@ class SessionBase(Model):
echo=debug,
)
else:
if debug:
if pool_size == 0:
logg.info('db using nullpool')
poolclass = NullPool
elif debug:
logg.info('db using assertion pool')
poolclass = AssertionPool
else:
logg.info('db using static pool')
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db not poolable')
e = create_engine(
dsn,
echo=debug,

View File

@@ -1,6 +1,10 @@
# extended imports
# external imports
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.status import Status as TxStatus
from cic_eth_registry.erc20 import ERC20Token
# local imports
from cic_eth.ext.address import translate_address
class ExtendedTx:
@@ -27,12 +31,12 @@ class ExtendedTx:
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
def set_actors(self, sender, recipient, trusted_declarator_addresses=None, caller_address=ZERO_ADDRESS):
self.sender = sender
self.recipient = recipient
if trusted_declarator_addresses != None:
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain_spec)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain_spec)
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain_spec, sender_address=caller_address)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain_spec, sender_address=caller_address)
def set_tokens(self, source, source_value, destination=None, destination_value=None):
@@ -40,8 +44,8 @@ class ExtendedTx:
destination = source
if destination_value == None:
destination_value = source_value
st = ERC20Token(self.rpc, source)
dt = ERC20Token(self.rpc, destination)
st = ERC20Token(self.chain_spec, self.rpc, source)
dt = ERC20Token(self.chain_spec, self.rpc, destination)
self.source_token = source
self.source_token_symbol = st.symbol
self.source_token_name = st.name

View File

@@ -15,7 +15,6 @@ from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from chainlib.connection import RPCConnection
from chainsyncer.error import SyncDone
from hexathon import strip_0x
from chainqueue.db.enum import (
StatusEnum,
@@ -153,10 +152,7 @@ class DispatchSyncer:
def main():
syncer = DispatchSyncer(chain_spec)
conn = RPCConnection.connect(chain_spec, 'default')
try:
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
except SyncDone as e:
sys.stderr.write("dispatcher done at block {}\n".format(e))
syncer.loop(conn, float(config.get('DISPATCHER_LOOP_INTERVAL')))
sys.exit(0)

View File

@@ -1,7 +1,7 @@
# standard imports
import logging
# third-party imports
# external imports
import celery
from cic_eth_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus
@@ -9,7 +9,13 @@ from chainlib.eth.address import to_checksum_address
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.erc20 import ERC20
from hexathon import strip_0x
from hexathon import (
strip_0x,
add_0x,
)
# TODO: use sarafu_Faucet for both when inheritance has been implemented
from erc20_single_shot_faucet import SingleShotFaucet
from sarafu_faucet import MinterFaucet as Faucet
# local imports
from .base import SyncFilter
@@ -18,55 +24,63 @@ from cic_eth.eth.meta import ExtendedTx
logg = logging.getLogger().getChild(__name__)
def parse_transfer(tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = r[1]
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
return ('transfer', transfer_data)
def parse_transferfrom(tx):
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['from'] = r[0]
transfer_data['to'] = r[1]
transfer_data['value'] = r[2]
transfer_data['token_address'] = tx['to']
return ('transferfrom', transfer_data)
def parse_giftto(tx):
# TODO: broken
logg.error('broken')
return
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
# TODO: would be better to query the gift amount from the block state
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
#transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
transfer_data['value'] = int.from_bytes(bytes.fromhex(strip_0x(l_data)))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum_address(token_address)
return ('tokengift', transfer_data)
class CallbackFilter(SyncFilter):
trusted_addresses = []
def __init__(self, chain_spec, method, queue):
def __init__(self, chain_spec, method, queue, caller_address=ZERO_ADDRESS):
self.queue = queue
self.method = method
self.chain_spec = chain_spec
self.caller_address = caller_address
def parse_transfer(self, tx, conn):
if not tx.payload:
return (None, None)
r = ERC20.parse_transfer_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = r[1]
transfer_data['from'] = tx.outputs[0]
transfer_data['token_address'] = tx.inputs[0]
return ('transfer', transfer_data)
def parse_transferfrom(self, tx, conn):
if not tx.payload:
return (None, None)
r = ERC20.parse_transfer_from_request(tx.payload)
transfer_data = {}
transfer_data['from'] = r[0]
transfer_data['to'] = r[1]
transfer_data['value'] = r[2]
transfer_data['token_address'] = tx.inputs[0]
return ('transferfrom', transfer_data)
def parse_giftto(self, tx, conn):
if not tx.payload:
return (None, None)
r = Faucet.parse_give_to_request(tx.payload)
transfer_data = {}
transfer_data['to'] = r[0]
transfer_data['value'] = tx.value
transfer_data['from'] = tx.outputs[0]
#transfer_data['token_address'] = tx.inputs[0]
faucet_contract = tx.inputs[0]
c = SingleShotFaucet(self.chain_spec)
o = c.token(faucet_contract, sender_address=self.caller_address)
r = conn.do(o)
transfer_data['token_address'] = add_0x(c.parse_token(r))
o = c.amount(faucet_contract, sender_address=self.caller_address)
r = conn.do(o)
transfer_data['amount'] = add_0x(c.parse_amount(r))
return ('tokengift', transfer_data)
def call_back(self, transfer_type, result):
@@ -95,23 +109,26 @@ class CallbackFilter(SyncFilter):
return s
def parse_data(self, tx):
def parse_data(self, tx, conn):
transfer_type = None
transfer_data = None
# TODO: what's with the mix of attributes and dict keys
logg.debug('have payload {}'.format(tx.payload))
method_signature = tx.payload[:8]
logg.debug('tx status {}'.format(tx.status))
for parser in [
parse_transfer,
parse_transferfrom,
parse_giftto,
self.parse_transfer,
self.parse_transferfrom,
self.parse_giftto,
]:
try:
(transfer_type, transfer_data) = parser(tx)
break
if tx:
(transfer_type, transfer_data) = parser(tx, conn)
if transfer_type == None:
continue
else:
pass
except RequestMismatchException:
continue
@@ -128,7 +145,7 @@ class CallbackFilter(SyncFilter):
transfer_data = None
transfer_type = None
try:
(transfer_type, transfer_data) = self.parse_data(tx)
(transfer_type, transfer_data) = self.parse_data(tx, conn)
except TypeError:
logg.debug('invalid method data length for tx {}'.format(tx.hash))
return
@@ -144,7 +161,7 @@ class CallbackFilter(SyncFilter):
result = None
try:
tokentx = ExtendedTx(conn, tx.hash, self.chain_spec)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses, caller_address=self.caller_address)
tokentx.set_tokens(transfer_data['token_address'], transfer_data['value'])
if transfer_data['status'] == 0:
tokentx.set_status(1)
@@ -153,7 +170,7 @@ class CallbackFilter(SyncFilter):
t = self.call_back(transfer_type, tokentx.to_dict())
logg.info('callback success task id {} tx {}'.format(t, tx.hash))
except UnknownContractError:
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash))
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tx.queue, tx.method, transfer_data['to'], tx.hash))
def __str__(self):

View File

@@ -91,7 +91,7 @@ logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# connect to database
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=50, debug=config.true('DATABASE_DEBUG'))
SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()

View File

@@ -15,7 +15,6 @@ import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_eth_registry import CICRegistry
from cic_eth_registry.error import UnknownContractError
from chainlib.chain import ChainSpec
from chainlib.eth.constant import ZERO_ADDRESS
@@ -26,7 +25,7 @@ from chainlib.eth.block import (
from hexathon import (
strip_0x,
)
from chainsyncer.backend import SyncerBackend
from chainsyncer.backend.sql import SQLBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
@@ -43,6 +42,12 @@ from cic_eth.runnable.daemons.filters import (
TransferAuthFilter,
)
from cic_eth.stat import init_chain_stat
from cic_eth.registry import (
connect as connect_registry,
connect_declarator,
connect_token_registry,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
@@ -88,18 +93,18 @@ def main():
syncers = []
#if SyncerBackend.first(chain_spec):
# backend = SyncerBackend.initial(chain_spec, block_offset)
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
#if SQLBackend.first(chain_spec):
# backend = SQLBackend.initial(chain_spec, block_offset)
syncer_backends = SQLBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
syncer_backends.append(SQLBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
syncer_backends.append(SQLBackend.live(chain_spec, block_offset+1))
for syncer_backend in syncer_backends:
try:
@@ -109,6 +114,8 @@ def main():
logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
syncers.append(HeadSyncer(syncer_backend))
connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
@@ -116,6 +123,8 @@ def main():
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec)
CallbackFilter.trusted_addresses = trusted_addresses
callback_filters = []

View File

@@ -4,7 +4,7 @@ import datetime
# external imports
from chainsyncer.driver import HeadSyncer
from chainsyncer.backend import MemBackend
from chainsyncer.backend.memory import MemBackend
from chainsyncer.error import NoBlockForYou
from chainlib.eth.block import (
block_by_number,

View File

@@ -10,7 +10,7 @@ version = (
0,
11,
0,
'beta.1',
'beta.6',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,5 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
POOL_SIZE=50
DEBUG=0

View File

@@ -6,4 +6,5 @@ HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
POOL_SIZE=50
DEBUG=0

View File

@@ -29,7 +29,7 @@ RUN /usr/local/bin/python -m pip install --upgrade pip
# python merge_requirements.py | tee merged_requirements.txt
#RUN cd cic-base && \
# pip install $pip_extra_index_url_flag -r ./merged_requirements.txt
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a62
RUN pip install $pip_extra_index_url_flag cic-base[full_graph]==0.1.2a77
COPY cic-eth/scripts/ scripts/
COPY cic-eth/setup.cfg cic-eth/setup.py ./

View File

@@ -1,25 +1,25 @@
cic-base~=0.1.2a62
cic-base~=0.1.2a76
celery==4.4.7
crypto-dev-signer~=0.4.14a17
crypto-dev-signer~=0.4.14b2
confini~=0.3.6rc3
cic-eth-registry~=0.5.4a12
cic-eth-registry~=0.5.4a16
#cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
websockets==8.1
requests~=2.24.0
eth_accounts_index~=0.0.11a7
erc20-transfer-authorization~=0.3.1a3
#simple-rlp==0.1.2
eth_accounts_index~=0.0.11a9
erc20-transfer-authorization~=0.3.1a5
uWSGI==2.0.19.1
semver==2.13.0
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.1a7
chainlib~=0.0.2a5
eth-address-index~=0.1.1a9
chainlib~=0.0.2a13
hexathon~=0.0.1a7
chainsyncer~=0.0.1a21
chainsyncer[sql]~=0.0.2a2
chainqueue~=0.0.1a7
pysha3==1.0.2
coincurve==15.0.0
sarafu-faucet~=0.0.2a19
sarafu-faucet==0.0.2a28
potaahto~=0.0.1a1

View File

@@ -4,4 +4,4 @@ pytest-mock==3.3.1
pytest-cov==2.10.1
eth-tester==0.5.0b3
py-evm==0.3.0a20
giftable-erc20-token==0.0.8a4
giftable-erc20-token==0.0.8a9

View File

@@ -0,0 +1,223 @@
# standard import
import logging
import datetime
import os
# external imports
import pytest
from chainlib.connection import RPCConnection
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.tx import (
receipt,
transaction,
Tx,
)
from chainlib.eth.block import Block
from chainlib.eth.erc20 import ERC20
from sarafu_faucet import MinterFaucet
from eth_accounts_index import AccountRegistry
from potaahto.symbols import snake_and_camel
from hexathon import add_0x
# local imports
from cic_eth.runnable.daemons.filters.callback import CallbackFilter
logg = logging.getLogger()
@pytest.mark.skip()
def test_transfer_tx(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
foo_token,
agent_roles,
token_roles,
contract_roles,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], rpc)
gas_oracle = OverrideGasOracle(conn=rpc, limit=200000)
txf = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], 1024)
r = rpc.do(o)
o = transaction(tx_hash_hex)
r = rpc.do(o)
logg.debug(r)
tx_src = snake_and_camel(r)
tx = Tx(tx_src)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
rcpt = snake_and_camel(r)
tx.apply_receipt(rcpt)
fltr = CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER'])
(transfer_type, transfer_data) = fltr.parse_transfer(tx, eth_rpc)
assert transfer_type == 'transfer'
@pytest.mark.skip()
def test_transfer_from_tx(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
foo_token,
agent_roles,
token_roles,
contract_roles,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], rpc)
gas_oracle = OverrideGasOracle(conn=rpc, limit=200000)
txf = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.approve(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], 1024)
r = rpc.do(o)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], rpc)
txf = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.transfer_from(foo_token, agent_roles['ALICE'], token_roles['FOO_TOKEN_OWNER'], agent_roles['BOB'], 1024)
r = rpc.do(o)
o = transaction(tx_hash_hex)
r = rpc.do(o)
tx_src = snake_and_camel(r)
tx = Tx(tx_src)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
rcpt = snake_and_camel(r)
tx.apply_receipt(rcpt)
fltr = CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER'])
(transfer_type, transfer_data) = fltr.parse_transferfrom(tx, eth_rpc)
assert transfer_type == 'transferfrom'
def test_faucet_gift_to_tx(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
foo_token,
agent_roles,
contract_roles,
faucet,
account_registry,
celery_session_worker,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
gas_oracle = OverrideGasOracle(conn=rpc, limit=800000)
nonce_oracle = RPCNonceOracle(contract_roles['ACCOUNT_REGISTRY_WRITER'], rpc)
txf = AccountRegistry(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.add(account_registry, contract_roles['ACCOUNT_REGISTRY_WRITER'], agent_roles['ALICE'])
r = rpc.do(o)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], rpc)
txf = MinterFaucet(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.give_to(faucet, agent_roles['ALICE'], agent_roles['ALICE'])
r = rpc.do(o)
o = transaction(tx_hash_hex)
r = rpc.do(o)
tx_src = snake_and_camel(r)
tx = Tx(tx_src)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
rcpt = snake_and_camel(r)
tx.apply_receipt(rcpt)
fltr = CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER'])
(transfer_type, transfer_data) = fltr.parse_giftto(tx, eth_rpc)
assert transfer_type == 'tokengift'
assert transfer_data['token_address'] == foo_token
def test_callback_filter(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
foo_token,
token_roles,
agent_roles,
contract_roles,
register_lookups,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(token_roles['FOO_TOKEN_OWNER'], rpc)
gas_oracle = OverrideGasOracle(conn=rpc, limit=200000)
txf = ERC20(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, o) = txf.transfer(foo_token, token_roles['FOO_TOKEN_OWNER'], agent_roles['ALICE'], 1024)
r = rpc.do(o)
o = transaction(tx_hash_hex)
r = rpc.do(o)
logg.debug(r)
mockblock_src = {
'hash': add_0x(os.urandom(32).hex()),
'number': '0x2a',
'transactions': [tx_hash_hex],
'timestamp': datetime.datetime.utcnow().timestamp(),
}
mockblock = Block(mockblock_src)
tx_src = snake_and_camel(r)
tx = Tx(tx_src, block=mockblock)
o = receipt(tx_hash_hex)
r = rpc.do(o)
assert r['status'] == 1
rcpt = snake_and_camel(r)
tx.apply_receipt(rcpt)
fltr = CallbackFilter(default_chain_spec, None, None, caller_address=contract_roles['CONTRACT_DEPLOYER'])
class CallbackMock:
def __init__(self):
self.results = {}
def call_back(self, transfer_type, result):
self.results[transfer_type] = result
mock = CallbackMock()
fltr.call_back = mock.call_back
fltr.filter(eth_rpc, mockblock, tx, init_database)
assert mock.results.get('transfer') != None
assert mock.results['transfer']['destination_token'] == foo_token

View File

@@ -1,6 +1,6 @@
{
"name": "cic-client-meta",
"version": "0.0.7-alpha.3",
"version": "0.0.7-alpha.6",
"description": "Signed CRDT metadata graphs for the CIC network",
"main": "dist/index.js",
"types": "dist/index.d.ts",
@@ -40,6 +40,6 @@
],
"license": "GPL-3.0-or-later",
"engines": {
"node": "~15.3.0"
"node": "~14.16.1"
}
}

View File

@@ -114,6 +114,7 @@ async function processRequest(req, res) {
return;
}
if (!['PUT', 'GET', 'POST'].includes(req.method)) {
res.writeHead(405, {"Content-Type": "text/plain"});
res.end();
@@ -123,6 +124,7 @@ async function processRequest(req, res) {
try {
digest = parseDigest(req.url);
} catch(e) {
console.error('digest error: ' + e)
res.writeHead(400, {"Content-Type": "text/plain"});
res.end();
return;

View File

@@ -1,12 +1,12 @@
import { ArgPair, Syncable } from '../sync';
import { Addressable, addressToBytes, bytesToHex, toKey } from '../digest';
import { Addressable, mergeKey } from '../digest';
class Phone extends Syncable implements Addressable {
address: string
value: number
constructor(address:string, v:number) {
constructor(address:string, v:string) {
const o = {
msisdn: v,
}
@@ -17,8 +17,8 @@ class Phone extends Syncable implements Addressable {
});
}
public static async toKey(msisdn:number) {
return await toKey(msisdn.toString(), ':cic.msisdn');
public static async toKey(msisdn:string) {
return await mergeKey(Buffer.from(msisdn), Buffer.from(':cic.phone'));
}
public key(): string {

View File

@@ -61,6 +61,7 @@ function addressToBytes(s:string) {
export {
toKey,
toAddressKey,
mergeKey,
bytesToHex,
addressToBytes,
Addressable,

View File

@@ -3,6 +3,7 @@ import logging
import re
# third-party imports
from celery.app.control import Inspect
import celery
# local imports
@@ -15,6 +16,29 @@ logg = logging.getLogger()
sms_tasks_matcher = r"^(cic_notify.tasks.sms)(\.\w+)?"
re_q = r'^cic-notify'
def get_sms_queue_tasks(app, task_prefix='cic_notify.tasks.sms.'):
host_queues = []
i = Inspect(app=app)
qs = i.active_queues()
for host in qs.keys():
for q in qs[host]:
if re.match(re_q, q['name']):
host_queues.append((host, q['name'],))
task_prefix_len = len(task_prefix)
queue_tasks = []
for (host, queue) in host_queues:
i = Inspect(app=app, destination=[host])
for tasks in i.registered_tasks().values():
for task in tasks:
if len(task) >= task_prefix_len and task[:task_prefix_len] == task_prefix:
queue_tasks.append((queue, task,))
return queue_tasks
class Api:
# TODO: Implement callback strategy
def __init__(self, queue='cic-notify'):
@@ -22,17 +46,9 @@ class Api:
:param queue: The queue on which to execute notification tasks
:type queue: str
"""
registered_tasks = app.tasks
self.sms_tasks = []
self.sms_tasks = get_sms_queue_tasks(app)
logg.debug('sms tasks {}'.format(self.sms_tasks))
for task in registered_tasks.keys():
logg.debug(f'Found: {task} {registered_tasks[task]}')
match = re.match(sms_tasks_matcher, task)
if match:
self.sms_tasks.append(task)
self.queue = queue
logg.info(f'api using queue: {self.queue}')
def sms(self, message, recipient):
"""This function chains all sms tasks in order to send a message, log and persist said data to disk
@@ -44,12 +60,17 @@ class Api:
:rtype: Celery.Task
"""
signatures = []
for task in self.sms_tasks:
signature = celery.signature(task)
for q in self.sms_tasks:
signature = celery.signature(
q[1],
[
message,
recipient,
],
queue=q[0],
)
signatures.append(signature)
signature_group = celery.group(signatures)
result = signature_group.apply_async(
args=[message, recipient],
queue=self.queue
)
return result
t = celery.group(signatures)()
return t

View File

@@ -0,0 +1,76 @@
# standard imports
import sys
import os
import logging
import argparse
import tempfile
# external imports
import celery
import confini
# local imports
from cic_notify.api import Api
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = os.path.join('/usr/local/etc/cic-notify')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
argparser.add_argument('recipient', type=str, help='notification recipient')
argparser.add_argument('message', type=str, help='message text')
args = argparser.parse_args()
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
config = confini.Config(args.c, args.env_prefix)
config.process()
config.censor('PASSWORD', 'DATABASE')
config.add(args.recipient, '_RECIPIENT', True)
config.add(args.message, '_MESSAGE', True)
# set up celery
app = celery.Celery(__name__)
broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
app.conf.update({
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
app.conf.update({
'broker_url': broker,
})
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
rq = tempfile.mkdtemp()
app.conf.update({
'result_backend': 'file://{}'.format(rq),
})
logg.warning('celery backend store dir {} created, will NOT be deleted on shutdown'.format(rq))
else:
app.conf.update({
'result_backend': result,
})
if __name__ == '__main__':
a = Api()
t = a.sms(config.get('_RECIPIENT'), config.get('_MESSAGE'))

View File

@@ -1,4 +1,4 @@
FROM python:3.8.6
FROM python:3.8.6-slim-buster
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps

View File

@@ -1 +1 @@
cic_base[full_graph]~=0.1.2a46
cic_base[full_graph]~=0.1.2a61

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= 0.4.0a2
version= 0.4.0a3
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -45,3 +45,4 @@ testing =
[options.entry_points]
console_scripts =
cic-notify-tasker = cic_notify.runnable.tasker:main
cic-notify-send = cic_notify.runnable.send:main

View File

@@ -1,14 +1,24 @@
[app]
ALLOWED_IP=127.0.0.1
ALLOWED_IP=0.0.0.0/0
LOCALE_FALLBACK=en
LOCALE_PATH=var/lib/locale/
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =

View File

@@ -6,3 +6,5 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0
POOL_SIZE=1

View File

@@ -1,9 +1,9 @@
[celery]
BROKER_URL=redis://
RESULT_URL=redis://
BROKER_URL=redis://redis:6379
RESULT_URL=redis://redis:6379
[redis]
HOSTNAME=localhost
HOSTNAME=redis
PASSWORD=
PORT=6379
DATABASE=0

View File

@@ -1,47 +1,129 @@
# standard imports
# stanard imports
import logging
import datetime
# third-party imports
# external imports
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger().getChild(__name__)
Model = declarative_base(name='Model')
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
"""
__abstract__ = True
id = Column(Integer, primary_key=True)
created = Column(DateTime, default=datetime.datetime.utcnow)
updated = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
id = Column(Integer, primary_key=True)
engine = None
session = None
query = None
"""Database connection engine of the running aplication"""
sessionmaker = None
"""Factory object responsible for creating sessions from the connection pool"""
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
def create_session():
session = sessionmaker(bind=SessionBase.engine)
return session()
"""Creates a new database session.
"""
return SessionBase.sessionmaker()
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@staticmethod
def build():
Model.metadata.create_all(bind=SessionBase.engine)
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
:type dsn: str
"""
e = None
if SessionBase.poolable:
poolclass = QueuePool
if pool_size > 1:
logg.info('db using queue pool')
e = create_engine(
dsn,
max_overflow=pool_size*3,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
poolclass=poolclass,
echo=debug,
)
else:
if pool_size == 0:
poolclass = NullPool
elif debug:
poolclass = AssertionPool
else:
poolclass = StaticPool
e = create_engine(
dsn,
poolclass=poolclass,
echo=debug,
)
else:
logg.info('db connection not poolable')
e = create_engine(
dsn,
echo=debug,
)
SessionBase._set_engine(e)
@staticmethod
# https://docs.sqlalchemy.org/en/13/core/pooling.html#pool-disconnects
def connect(data_source_name):
engine = create_engine(data_source_name, pool_pre_ping=True)
SessionBase._set_engine(engine)
@staticmethod
def disconnect():
"""Disconnect from database and free resources.
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('commit and destroy session {}'.format(session_key))
session.commit()
session.close()

View File

@@ -18,7 +18,7 @@ class ActionDataNotFoundError(OSError):
pass
class UserMetadataNotFoundError(OSError):
class MetadataNotFoundError(OSError):
"""Raised when metadata is expected but not available in cache."""
pass
@@ -31,3 +31,10 @@ class UnsupportedMethodError(OSError):
class CachedDataNotFoundError(OSError):
"""Raised when the method passed to the make request function is unsupported."""
pass
class MetadataStoreError(Exception):
"""Raised when metadata storage fails"""
pass

View File

@@ -3,7 +3,10 @@
# third-party imports
import requests
from chainlib.eth.address import to_checksum
from hexathon import add_0x
from hexathon import (
add_0x,
strip_0x,
)
# local imports
from cic_ussd.error import UnsupportedMethodError
@@ -40,4 +43,4 @@ def blockchain_address_to_metadata_pointer(blockchain_address: str):
:return:
:rtype:
"""
return bytes.fromhex(blockchain_address[2:])
return bytes.fromhex(strip_0x(blockchain_address))

View File

@@ -0,0 +1,126 @@
# standard imports
import json
import logging
import os
from typing import Dict, Union
# third-part imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
from cic_ussd.error import MetadataStoreError
logg = logging.getLogger().getChild(__name__)
class Metadata:
"""
:cvar base_url: The base url or the metadata server.
:type base_url: str
"""
base_url = None
def metadata_http_error_handler(result: requests.Response):
""" This function handles and appropriately raises errors from http requests interacting with the metadata server.
:param result: The response object from a http request.
:type result: requests.Response
"""
status_code = result.status_code
if 100 <= status_code < 200:
raise MetadataStoreError(f'Informational errors: {status_code}, reason: {result.reason}')
elif 300 <= status_code < 400:
raise MetadataStoreError(f'Redirect Issues: {status_code}, reason: {result.reason}')
elif 400 <= status_code < 500:
raise MetadataStoreError(f'Client Error: {status_code}, reason: {result.reason}')
elif 500 <= status_code < 600:
raise MetadataStoreError(f'Server Error: {status_code}, reason: {result.reason}')
class MetadataRequestsHandler(Metadata):
def __init__(self, cic_type: str, identifier: bytes, engine: str = 'pgp'):
"""
:param cic_type: The salt value with which to hash a specific metadata identifier.
:type cic_type: str
:param engine: Encryption used for sending data to the metadata server.
:type engine: str
:param identifier: A unique element of data in bytes necessary for creating a metadata pointer.
:type identifier: bytes
"""
self.cic_type = cic_type
self.engine = engine
self.headers = {
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
self.identifier = identifier
self.metadata_pointer = generate_metadata_pointer(
identifier=self.identifier,
cic_type=self.cic_type
)
if self.base_url:
self.url = os.path.join(self.base_url, self.metadata_pointer)
def create(self, data: Union[Dict, str]):
""" This function is responsible for posting data to the metadata server with a corresponding metadata pointer
for storage.
:param data: The data to be stored in the metadata server.
:type data: dict|str
"""
data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
metadata_http_error_handler(result=result)
metadata = result.content
self.edit(data=metadata)
def edit(self, data: bytes):
""" This function is responsible for editing data in the metadata server corresponding to a unique pointer.
:param data: The data to be edited in the metadata server.
:type data: bytes
"""
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get('algo')
decoded_data = data.decode('utf-8')
formatted_data = {
'm': data.decode('utf-8'),
's': {
'engine': self.engine,
'algo': algorithm,
'data': signature,
'digest': json.loads(data).get('digest'),
}
}
formatted_data = json.dumps(formatted_data).encode('utf-8')
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'signed metadata submission status: {result.status_code}.')
metadata_http_error_handler(result=result)
try:
decoded_identifier = self.identifier.decode("utf-8")
except UnicodeDecodeError:
decoded_identifier = self.identifier.hex()
logg.info(f'identifier: {decoded_identifier}. metadata pointer: {self.metadata_pointer} set to: {decoded_data}.')
def query(self):
"""This function is responsible for querying the metadata server for data corresponding to a unique pointer."""
result = make_request(method='GET', url=self.url)
metadata_http_error_handler(result=result)
response_data = result.content
data = json.loads(response_data.decode('utf-8'))
if result.status_code == 200 and self.cic_type == 'cic.person':
person = Person()
deserialized_person = person.deserialize(person_data=json.loads(data))
data = json.dumps(deserialized_person.serialize())
cache_data(self.metadata_pointer, data=data)
logg.debug(f'caching: {data} with key: {self.metadata_pointer}')

View File

@@ -0,0 +1,12 @@
# standard imports
# third-party imports
# local imports
from .base import MetadataRequestsHandler
class PersonMetadata(MetadataRequestsHandler):
def __init__(self, identifier: bytes):
super().__init__(cic_type='cic.person', identifier=identifier)

View File

@@ -0,0 +1,13 @@
# standard imports
import logging
# external imports
# local imports
from .base import MetadataRequestsHandler
class PhonePointerMetadata(MetadataRequestsHandler):
def __init__(self, identifier: bytes):
super().__init__(cic_type='cic.msisdn', identifier=identifier)

View File

@@ -44,7 +44,7 @@ class Signer:
gpg_keys = self.gpg.list_keys()
key_algorithm = gpg_keys[0].get('algo')
key_id = gpg_keys[0].get("keyid")
logg.info(f'using signing key: {key_id}, algorithm: {key_algorithm}')
logg.debug(f'using signing key: {key_id}, algorithm: {key_algorithm}')
return gpg_keys[0]
def sign_digest(self, data: bytes):

View File

@@ -1,102 +0,0 @@
# standard imports
import json
import logging
import os
# third-party imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.chain import Chain
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
logg = logging.getLogger()
class UserMetadata:
"""
:cvar base_url:
:type base_url:
"""
base_url = None
def __init__(self, identifier: bytes):
"""
:param identifier:
:type identifier:
"""
self. headers = {
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
self.identifier = identifier
self.metadata_pointer = generate_metadata_pointer(
identifier=self.identifier,
cic_type='cic.person'
)
if self.base_url:
self.url = os.path.join(self.base_url, self.metadata_pointer)
def create(self, data: dict):
try:
data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
metadata = result.content
self.edit(data=metadata, engine='pgp')
logg.info(f'Get sign material response status: {result.status_code}')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
def edit(self, data: bytes, engine: str):
"""
:param data:
:type data:
:param engine:
:type engine:
:return:
:rtype:
"""
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get('algo')
formatted_data = {
'm': data.decode('utf-8'),
's': {
'engine': engine,
'algo': algorithm,
'data': signature,
'digest': json.loads(data).get('digest'),
}
}
formatted_data = json.dumps(formatted_data).encode('utf-8')
try:
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'Signed content submission status: {result.status_code}.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
def query(self):
result = make_request(method='GET', url=self.url)
status = result.status_code
logg.info(f'Get latest data status: {status}')
try:
if status == 200:
response_data = result.content
data = json.loads(response_data.decode())
# validate data
person = Person()
deserialized_person = person.deserialize(person_data=json.loads(data))
cache_data(key=self.metadata_pointer, data=json.dumps(deserialized_person.serialize()))
elif status == 404:
logg.info('The data is not available and might need to be added.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)

View File

@@ -15,7 +15,7 @@ from cic_ussd.balance import BalanceManager, compute_operational_balance, get_ca
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.phone_number import get_user_by_phone_number
@@ -235,7 +235,7 @@ def process_display_user_metadata(user: User, display_key: str):
products=products
)
else:
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {user.blockchain_address}')
raise MetadataNotFoundError(f'Expected person metadata but found none in cache for key: {key}')
def process_account_statement(user: User, display_key: str, ussd_session: dict):
@@ -331,11 +331,11 @@ def process_start_menu(display_key: str, user: User):
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's metadata
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_user_metadata.apply_async(queue='cic-ussd')
s_query_person_metadata.apply_async(queue='cic-ussd')
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)
@@ -389,14 +389,11 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
cic_type='cic.person'
)
logg.debug(f'METADATA POINTER: {key}')
user_metadata = get_cached_data(key=key)
logg.debug(f'METADATA: {user_metadata}')
person_metadata = get_cached_data(key=key)
if last_ussd_session:
# get last state
last_state = last_ussd_session.state
logg.debug(f'LAST USSD SESSION STATE: {last_state}')
# if last state is account_creation_prompt and metadata exists, show start menu
if last_state in [
'account_creation_prompt',
@@ -405,7 +402,7 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
'exit_invalid_new_pin',
'exit_pin_mismatch',
'exit_invalid_request'
] and user_metadata is not None:
] and person_metadata is not None:
return UssdMenu.find_by_name(name='start')
else:
return UssdMenu.find_by_name(name=last_state)

View File

@@ -23,10 +23,11 @@ from cic_ussd.encoder import PasswordEncoder
from cic_ussd.files.local_files import create_local_file_data_stores, json_file_parser
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.base import Metadata
from cic_ussd.operations import (define_response_with_content,
process_menu_interaction_requests,
define_multilingual_responses)
from cic_ussd.phone_number import process_phone_number
from cic_ussd.redis import InMemoryStore
from cic_ussd.requests import (get_request_endpoint,
get_request_method,
@@ -64,7 +65,6 @@ config.censor('PASSWORD', 'DATABASE')
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
@@ -86,7 +86,7 @@ UssdMenu.ussd_menu_db = ussd_menu_db
# set up db
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name=data_source_name)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# create session for the life time of http request
SessionBase.session = SessionBase.create_session()
@@ -99,7 +99,7 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
UserMetadata.base_url = config.get('CIC_META_URL')
Metadata.base_url = config.get('CIC_META_URL')
# define signer values
export_dir = config.get('PGP_EXPORT_DIR')
@@ -151,6 +151,10 @@ def application(env, start_response):
external_session_id = post_data.get('sessionId')
user_input = post_data.get('text')
# add validation for phone number
if phone_number:
phone_number = process_phone_number(phone_number=phone_number, region=config.get('PHONE_NUMBER_REGION'))
# validate ip address
if not check_ip(config=config, env=env):
start_response('403 Sneaky, sneaky', errors_headers)
@@ -174,8 +178,10 @@ def application(env, start_response):
# validate phone number
if not validate_phone_number(phone_number):
logg.error('invalid phone number {}'.format(phone_number))
start_response('400 Invalid phone number format', errors_headers)
return []
logg.debug('session {} started for {}'.format(external_session_id, phone_number))
# handle menu interaction requests
chain_str = chain_spec.__str__()

View File

@@ -13,13 +13,14 @@ from confini import Config
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.base import Metadata
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.validator import validate_presence
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('gnupg').setLevel(logging.WARNING)
config_directory = '/usr/local/etc/cic-ussd/'
@@ -47,7 +48,7 @@ logg.debug(config)
# connect to database
data_source_name = dsn_from_config(config)
SessionBase.connect(data_source_name=data_source_name)
SessionBase.connect(data_source_name, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()
@@ -63,7 +64,7 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
UserMetadata.base_url = config.get('CIC_META_URL')
Metadata.base_url = config.get('CIC_META_URL')
# define signer values
export_dir = config.get('PGP_EXPORT_DIR')

View File

@@ -97,11 +97,11 @@ def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, User]):
recipient = get_user_by_phone_number(phone_number=user_input)
blockchain_address = recipient.blockchain_address
# retrieve and cache account's metadata
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_user_metadata.apply_async(queue='cic-ussd')
s_query_person_metadata.apply_async(queue='cic-ussd')
def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, User]):

View File

@@ -11,7 +11,7 @@ from cic_types.models.person import generate_vcard_from_contact_data, manage_ide
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import User
from cic_ussd.error import UserMetadataNotFoundError
from cic_ussd.error import MetadataNotFoundError
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.operations import save_to_in_memory_ussd_session_data
from cic_ussd.redis import get_cached_data
@@ -164,11 +164,11 @@ def save_complete_user_metadata(state_machine_data: Tuple[str, dict, User]):
user_metadata = format_user_metadata(metadata=metadata, user=user)
blockchain_address = user.blockchain_address
s_create_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_user_metadata',
s_create_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.create_person_metadata',
[blockchain_address, user_metadata]
)
s_create_user_metadata.apply_async(queue='cic-ussd')
s_create_person_metadata.apply_async(queue='cic-ussd')
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
@@ -181,7 +181,7 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
user_metadata = get_cached_data(key=key)
if not user_metadata:
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
raise MetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
given_name = ussd_session.get('session_data').get('given_name')
family_name = ussd_session.get('session_data').get('family_name')
@@ -211,18 +211,18 @@ def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
edited_metadata = deserialized_person.serialize()
s_edit_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.edit_user_metadata',
[blockchain_address, edited_metadata, 'pgp']
s_edit_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.edit_person_metadata',
[blockchain_address, edited_metadata]
)
s_edit_user_metadata.apply_async(queue='cic-ussd')
s_edit_person_metadata.apply_async(queue='cic-ussd')
def get_user_metadata(state_machine_data: Tuple[str, dict, User]):
user_input, ussd_session, user = state_machine_data
blockchain_address = user.blockchain_address
s_get_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_get_user_metadata.apply_async(queue='cic-ussd')

View File

@@ -5,9 +5,24 @@ import celery
import sqlalchemy
# local imports
from cic_ussd.error import MetadataStoreError
from cic_ussd.db.models.base import SessionBase
class CriticalTask(celery.Task):
class BaseTask(celery.Task):
session_func = SessionBase.create_session
def create_session(self):
return BaseTask.session_func()
def log_banner(self):
logg.debug('task {} root uuid {}'.format(self.__class__.__name__, self.request.root_id))
return
class CriticalTask(BaseTask):
retry_jitter = True
retry_backoff = True
retry_backoff_max = 8
@@ -17,4 +32,11 @@ class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError,
)
class CriticalMetadataTask(CriticalTask):
autoretry_for = (
MetadataStoreError,
)

View File

@@ -53,6 +53,13 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
session.add(user)
session.commit()
session.close()
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'cic_ussd.tasks.metadata.add_phone_pointer',
[result, phone_number]
)
s.apply_async(queue=queue)
# expire cache
cache.expire(task_id, timedelta(seconds=180))
@@ -65,6 +72,8 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
session.close()
raise ActionDataNotFoundError(f'Account creation task: {task_id}, returned unexpected response: {status_code}')
session.close()
@celery_app.task
def process_incoming_transfer_callback(result: dict, param: str, status_code: int):
@@ -118,6 +127,7 @@ def process_incoming_transfer_callback(result: dict, param: str, status_code: in
session.close()
raise ValueError(f'Unexpected status code: {status_code}.')
session.close()
@celery_app.task
def process_balances_callback(result: list, param: str, status_code: int):
@@ -161,7 +171,6 @@ def define_transaction_action_tag(
def process_statement_callback(result, param: str, status_code: int):
if status_code == 0:
# create session
session = SessionBase.create_session()
processed_transactions = []
# process transaction data to cache
@@ -174,6 +183,7 @@ def process_statement_callback(result, param: str, status_code: int):
if '0x0000000000000000000000000000000000000000' in source_token:
pass
else:
session = SessionBase.create_session()
# describe a processed transaction
processed_transaction = {}
@@ -202,6 +212,8 @@ def process_statement_callback(result, param: str, status_code: int):
else:
logg.warning(f'Tx with recipient not found in cic-ussd')
session.close()
# add transaction values
processed_transaction['to_value'] = from_wei(value=transaction.get('to_value')).__str__()
processed_transaction['from_value'] = from_wei(value=transaction.get('from_value')).__str__()

View File

@@ -1,20 +1,22 @@
# standard imports
import json
import logging
# third-party imports
import celery
from hexathon import strip_0x
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.metadata.phone import PhonePointerMetadata
from cic_ussd.tasks.base import CriticalMetadataTask
celery_app = celery.current_app
logg = logging.getLogger()
logg = logging.getLogger().getChild(__name__)
@celery_app.task
def query_user_metadata(blockchain_address: str):
def query_person_metadata(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
@@ -22,12 +24,12 @@ def query_user_metadata(blockchain_address: str):
:rtype:
"""
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.query()
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.query()
@celery_app.task
def create_user_metadata(blockchain_address: str, data: dict):
def create_person_metadata(blockchain_address: str, data: dict):
"""
:param blockchain_address:
:type blockchain_address:
@@ -37,12 +39,20 @@ def create_user_metadata(blockchain_address: str, data: dict):
:rtype:
"""
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.create(data=data)
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.create(data=data)
@celery_app.task
def edit_user_metadata(blockchain_address: str, data: bytes, engine: str):
def edit_person_metadata(blockchain_address: str, data: bytes):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
user_metadata_client.edit(data=data, engine=engine)
person_metadata_client = PersonMetadata(identifier=identifier)
person_metadata_client.edit(data=data)
@celery_app.task(bind=True, base=CriticalMetadataTask)
def add_phone_pointer(self, blockchain_address: str, phone_number: str):
identifier = phone_number.encode('utf-8')
stripped_address = strip_0x(blockchain_address)
phone_metadata_client = PhonePointerMetadata(identifier=identifier)
phone_metadata_client.create(data=stripped_address)

View File

@@ -70,3 +70,4 @@ def persist_session_to_db(external_session_id: str):
session.close()
raise SessionNotFoundError('Session does not exist!')
session.close()

View File

@@ -2,6 +2,7 @@
import logging
import os
import re
import ipaddress
# third-party imports
from confini import Config
@@ -21,7 +22,14 @@ def check_ip(config: Config, env: dict):
:return: Request IP validity
:rtype: boolean
"""
return env.get('REMOTE_ADDR') == config.get('APP_ALLOWED_IP')
# TODO: do once at boot time
actual_ip = ipaddress.ip_network(env.get('REMOTE_ADDR') + '/32')
for allowed_net_src in config.get('APP_ALLOWED_IP').split(','):
allowed_net = ipaddress.ip_network(allowed_net_src)
if actual_ip.subnet_of(allowed_net):
return True
return False
def check_request_content_length(config: Config, env: dict):

View File

@@ -51,4 +51,4 @@ RUN cd cic-ussd && \
COPY cic-ussd/.config/ /usr/local/etc/cic-ussd/
COPY cic-ussd/cic_ussd/db/migrations/ /usr/local/share/cic-ussd/alembic
WORKDIR /root
WORKDIR /root

View File

@@ -2,4 +2,4 @@
. /root/db.sh
/usr/local/bin/cic-ussd-tasker -vv "$@"
/usr/local/bin/cic-ussd-tasker $@

View File

@@ -2,4 +2,6 @@
. /root/db.sh
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :9000 --pyargv "-vv"
server_port=${SERVER_PORT:-9000}
/usr/local/bin/uwsgi --wsgi-file /usr/local/lib/python3.8/site-packages/cic_ussd/runnable/server.py --http :$server_port --pyargv "$@"

View File

@@ -1,4 +1,4 @@
cic_base[full_graph]~=0.1.2a58
cic-eth~=0.11.0a4
cic_base[full_graph]~=0.1.2a68
cic-eth~=0.11.0b3
cic-notify~=0.4.0a3
cic-types~=0.1.0a10

View File

@@ -9,26 +9,26 @@ from cic_types.models.person import generate_metadata_pointer
# local imports
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.redis import get_cached_data
def test_user_metadata(create_activated_user, define_metadata_pointer_url, load_config):
UserMetadata.base_url = load_config.get('CIC_META_URL')
PersonMetadata.base_url = load_config.get('CIC_META_URL')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
person_metadata_client = PersonMetadata(identifier=identifier)
assert user_metadata_client.url == define_metadata_pointer_url
assert person_metadata_client.url == define_metadata_pointer_url
def test_create_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
mock_meta_post_response,
person_metadata):
def test_create_person_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
mock_meta_post_response,
person_metadata):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
@@ -38,7 +38,7 @@ def test_create_user_metadata(caplog,
reason='CREATED',
content=json.dumps(mock_meta_post_response).encode('utf-8')
)
user_metadata_client.create(data=person_metadata)
person_metadata_client.create(data=person_metadata)
assert 'Get signed material response status: 201' in caplog.text
with pytest.raises(RuntimeError) as error:
@@ -49,19 +49,19 @@ def test_create_user_metadata(caplog,
status_code=400,
reason='BAD REQUEST'
)
user_metadata_client.create(data=person_metadata)
person_metadata_client.create(data=person_metadata)
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
def test_edit_user_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
person_metadata,
setup_metadata_signer):
def test_edit_person_metadata(caplog,
create_activated_user,
define_metadata_pointer_url,
load_config,
person_metadata,
setup_metadata_signer):
Signer.gpg_passphrase = load_config.get('KEYS_PASSPHRASE')
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'PUT',
@@ -69,7 +69,7 @@ def test_edit_user_metadata(caplog,
status_code=200,
reason='OK'
)
user_metadata_client.edit(data=person_metadata, engine='pgp')
person_metadata_client.edit(data=person_metadata)
assert 'Signed content submission status: 200' in caplog.text
with pytest.raises(RuntimeError) as error:
@@ -80,7 +80,7 @@ def test_edit_user_metadata(caplog,
status_code=400,
reason='BAD REQUEST'
)
user_metadata_client.edit(data=person_metadata, engine='pgp')
person_metadata_client.edit(data=person_metadata)
assert str(error.value) == f'400 Client Error: BAD REQUEST for url: {define_metadata_pointer_url}'
@@ -92,7 +92,7 @@ def test_get_user_metadata(caplog,
person_metadata,
setup_metadata_signer):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
user_metadata_client = UserMetadata(identifier=identifier)
person_metadata_client = PersonMetadata(identifier=identifier)
with requests_mock.Mocker(real_http=False) as request_mocker:
request_mocker.register_uri(
'GET',
@@ -101,7 +101,7 @@ def test_get_user_metadata(caplog,
content=json.dumps(person_metadata).encode('utf-8'),
reason='OK'
)
user_metadata_client.query()
person_metadata_client.query()
assert 'Get latest data status: 200' in caplog.text
key = generate_metadata_pointer(
identifier=identifier,
@@ -118,6 +118,6 @@ def test_get_user_metadata(caplog,
status_code=404,
reason='NOT FOUND'
)
user_metadata_client.query()
person_metadata_client.query()
assert 'The data is not available and might need to be added.' in caplog.text
assert str(error.value) == f'400 Client Error: NOT FOUND for url: {define_metadata_pointer_url}'

View File

@@ -15,7 +15,7 @@ from cic_ussd.state_machine.logic.user import (
get_user_metadata,
save_complete_user_metadata,
process_gender_user_input,
save_profile_attribute_to_session_data,
save_metadata_attribute_to_session_data,
update_account_status_to_active)
@@ -41,14 +41,14 @@ def test_update_account_status_to_active(create_pending_user, create_in_db_ussd_
("enter_location", "location", "Kangemi", "Kangemi"),
("enter_products", "products", "Mandazi", "Mandazi"),
])
def test_save_save_profile_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
def test_save_metadata_attribute_to_session_data(current_state,
expected_key,
expected_result,
user_input,
celery_session_worker,
create_activated_user,
create_in_db_ussd_session,
create_in_redis_ussd_session):
create_in_db_ussd_session.state = current_state
serialized_in_db_ussd_session = create_in_db_ussd_session.to_json()
state_machine_data = (user_input, serialized_in_db_ussd_session, create_activated_user)
@@ -56,7 +56,7 @@ def test_save_save_profile_attribute_to_session_data(current_state,
in_memory_ussd_session = json.loads(in_memory_ussd_session)
assert in_memory_ussd_session.get('session_data') == {}
serialized_in_db_ussd_session['state'] = current_state
save_profile_attribute_to_session_data(state_machine_data=state_machine_data)
save_metadata_attribute_to_session_data(state_machine_data=state_machine_data)
in_memory_ussd_session = InMemoryStore.cache.get('AT974186')
in_memory_ussd_session = json.loads(in_memory_ussd_session)
@@ -87,18 +87,18 @@ def test_format_user_metadata(create_activated_user,
def test_save_complete_user_metadata(celery_session_worker,
complete_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
mocker,
setup_chain_spec,
ussd_session_data):
complete_user_metadata,
create_activated_user,
create_in_redis_ussd_session,
mocker,
setup_chain_spec,
ussd_session_data):
ussd_session = create_in_redis_ussd_session.get(ussd_session_data.get('external_session_id'))
ussd_session = json.loads(ussd_session)
ussd_session['session_data'] = complete_user_metadata
user_metadata = format_user_metadata(metadata=ussd_session.get('session_data'), user=create_activated_user)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_user_metadata.apply_async')
mocked_create_metadata_task = mocker.patch('cic_ussd.tasks.metadata.create_person_metadata.apply_async')
save_complete_user_metadata(state_machine_data=state_machine_data)
mocked_create_metadata_task.assert_called_with(
(user_metadata, create_activated_user.blockchain_address),
@@ -127,7 +127,7 @@ def test_edit_user_metadata_attribute(celery_session_worker,
}
state_machine_data = ('', ussd_session, create_activated_user)
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_user_metadata.apply_async')
mocked_edit_metadata = mocker.patch('cic_ussd.tasks.metadata.edit_person_metadata.apply_async')
edit_user_metadata_attribute(state_machine_data=state_machine_data)
person_metadata['location']['area_name'] = 'nairobi'
mocked_edit_metadata.assert_called_with(
@@ -146,7 +146,7 @@ def test_get_user_metadata_attribute(celery_session_worker,
ussd_session = json.loads(ussd_session)
state_machine_data = ('', ussd_session, create_activated_user)
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_user_metadata.apply_async')
mocked_get_metadata = mocker.patch('cic_ussd.tasks.metadata.query_person_metadata.apply_async')
get_user_metadata(state_machine_data=state_machine_data)
mocked_get_metadata.assert_called_with(
(create_activated_user.blockchain_address,),

View File

@@ -18,7 +18,7 @@ from cic_ussd.files.local_files import create_local_file_data_stores, json_file_
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.metadata.person import PersonMetadata
from cic_ussd.state_machine import UssdStateMachine
@@ -121,9 +121,9 @@ def setup_metadata_signer(load_config):
@pytest.fixture(scope='function')
def define_metadata_pointer_url(load_config, create_activated_user):
identifier = blockchain_address_to_metadata_pointer(blockchain_address=create_activated_user.blockchain_address)
UserMetadata.base_url = load_config.get('CIC_META_URL')
user_metadata_client = UserMetadata(identifier=identifier)
return user_metadata_client.url
PersonMetadata.base_url = load_config.get('CIC_META_URL')
person_metadata_client = PersonMetadata(identifier=identifier)
return person_metadata_client.url
@pytest.fixture(scope='function')

View File

@@ -57,9 +57,9 @@ WORKDIR /home/grassroots
USER grassroots
ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433
ARG cic_base_version=0.1.2a62
ARG cic_eth_version=0.11.0b1
ARG sarafu_faucet_version=0.0.2a19
ARG cic_base_version=0.1.2a77
ARG cic_eth_version=0.11.0b6
ARG sarafu_faucet_version=0.0.2a28
ARG cic_contracts_version=0.0.2a2
RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version \
cic-eth==$cic_eth_version \

View File

@@ -72,12 +72,13 @@ if [[ -n "${ETH_PROVIDER}" ]]; then
# Sarafu faucet contract
>&2 echo "deploy token faucet contract"
DEV_FAUCET_ADDRESS=`sarafu-faucet-deploy -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -w -v --account-index-address $DEV_ACCOUNT_INDEX_ADDRESS $DEV_RESERVE_ADDRESS`
>&2 echo "set token faucet amount"
sarafu-faucet-set -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -a $DEV_FAUCET_ADDRESS $faucet_amount
eth-contract-registry-set -w -y $keystore_file -r $CIC_REGISTRY_ADDRESS -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -vv Faucet $DEV_FAUCET_ADDRESS
>&2 echo "set faucet as token minter"
giftable-token-minter -w -y $keystore_file -a $DEV_RESERVE_ADDRESS -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -vv $DEV_FAUCET_ADDRESS
>&2 echo "set token faucet amount"
sarafu-faucet-set -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -a $DEV_FAUCET_ADDRESS $faucet_amount
else
echo "\$ETH_PROVIDER not set!"

View File

@@ -2,88 +2,234 @@
This folder contains tools to generate and import test data.
## DATA CREATION
## OVERVIEW
Does not need the cluster to run.
Three sets of tools are available, sorted by respective subdirectories.
Vanilla:
* **eth**: Import using sovereign wallets.
* **cic_eth**: Import using the `cic_eth` custodial engine.
* **cic_ussd**: Import using the `cic_ussd` interface (backed by `cic_eth`)
Each of the modules include two main scripts:
* **import_users.py**: Registers all created accounts in the network
* **import_balance.py**: Transfer an opening balance using an external keystore wallet
The balance script will sync with the blockchain, processing transactions and triggering actions when it finds. In its current version it does not keep track of any other state, so it will run indefinitly and needs You the Human to decide when it has done what it needs to do.
In addition the following common tools are available:
* **create_import_users.py**: User creation script
* **verify.py**: Import verification script
* **cic_meta**: Metadata imports
## REQUIREMENTS
A virtual environment for the python scripts is recommended. We know it works with `python 3.8.x`. Let us know if you run it successfully with other minor versions.
```
python3 -m venv .venv
source .venv/bin/activate
```
Install all requirements from the `requirements.txt` file:
`pip install --extra-index-url https://pip.grassrootseconomics.net:8433 -r requirements.txt`
If you are importing metadata, also do ye olde:
`npm install`
## HOW TO USE
### Step 1 - Data creation
Before running any of the imports, the user data to import has to be generated and saved to disk.
The script does not need any services to run.
Vanilla version:
`python create_import_users.py [--dir <datadir>] <number_of_users>`
If you want to use the `import_balance.py` script to add to the user's balance from an external address, add:
If you want to use a `import_balance.py` script to add to the user's balance from an external address, use:
`python create_import_users.py --gift-threshold <max_units_to_send> [--dir <datadir>] <number_of_users>`
## IMPORT
### Step 2 - Services
Make sure the following is running in the cluster:
* eth
* postgres
* redis
* cic-meta-server
Unless you know what you are doing, start with a clean slate, and execute (in the repository root):
`docker-compose down -v`
Then go through, in sequence:
#### Base requirements
If you are importing using `eth` and _not_ importing metadata, then the only service you need running in the cluster is:
* eth
In all other cases you will _also_ need:
* postgres
* redis
If using the _custodial_ alternative for user imports, also run:
* cic-eth-tasker
* cic-eth-dispatcher
* cic-eth-tracker
#### EVM provisions
This step is needed in *all* cases.
`RUN_MASK=1 docker-compose up contract-migration`
After this step is run, you can find top-level ethereum addresses (like the cic registry address, which you will need below) in `<repository_root>/service-configs/.env`
You will want to run these in sequence:
#### Custodial provisions
This step is _only_ needed if you are importing using `cic_eth` or `cic_ussd`
`RUN_MASK=2 docker-compose up contract-migration`
## 1. Metadata
#### Custodial services
`node import_meta.js <datadir> <number_of_users>`
If importing using `cic_eth` or `cic_ussd` also run:
* cic-eth-tasker
* cic-eth-dispatcher
* cic-eth-tracker
* cic-eth-retrier
If importing using `cic_ussd` also run:
* cic-ussd-tasker
* cic-ussd-server
* cic-notify-tasker
If metadata is to be imported, also run:
* cic-meta-server
### Step 3 - User imports
If you did not change the docker-compose setup, your `eth_provider` the you need for the commands below will be `http://localhost:63545`.
Only run _one_ of the alternatives.
The keystore file used for transferring external opening balances tracker is relative to the directory you found this README in. Of course you can use a different wallet, but then you will have to provide it with tokens yourself (hint: `../reset.sh`)
All external balance transactions are saved in raw wire format in `<datadir>/txs`, with transaction hash as file name.
#### Alternative 1 - Sovereign wallet import - `eth`
First, make a note of the **block height** before running anything.
To import, run to _completion_:
`python eth/import_users.py -v -c config -p <eth_provider> -r <cic_registry_address> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c <datadir>`
After the script completes, keystore files for all generated accouts will be found in `<datadir>/keystore`, all with `foo` as password (would set it empty, but believe it or not some interfaces out there won't work unless you have one).
Then run:
`python eth/import_balance.py -v -c config -r <cic_registry_address> -p <eth_provider> --offset <block_height_at_start> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c <datadir>`
#### Alternative 2 - Custodial engine import - `cic_eth`
Run in sequence, in first terminal:
`python cic_eth/import_balance.py -v -c config -p <eth_provider> -r <cic_registry_address> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c --head out`
In another terminal:
`python cic_eth/import_users.py -v -c config --redis-host-callback <redis_hostname_in_docker> out`
The `redis_hostname_in_docker` value is the hostname required to reach the redis server from within the docker cluster, and should be `redis` if you left the docker-compose unchanged. The `import_users` script will receive the address of each newly created custodial account on a redis subscription fed by a callback task in the `cic_eth` account creation task chain.
#### Alternative 3 - USSD import - `cic_ussd`
If you have previously run the `cic_ussd` import incompletely, it could be a good idea to purge the queue. If you have left docker-compose unchanged, `redis_url` should be `redis://localhost:63379`.
`celery -A cic_ussd.import_task purge -Q cic-import-ussd --broker <redis_url>`
Then, in sequence, run in first terminal:
`python cic_eth/import_balance.py -v -c config -p <eth_provider> -r <cic_registry_address> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c out`
In second terminal:
`python cic_ussd/import_users.py -v -c config out`
The balance script is a celery task worker, and will not exit by itself in its current version. However, after it's done doing its job, you will find "reached nonce ... exiting" among the last lines of the log.
The connection parameters for the `cic-ussd-server` is currently _hardcoded_ in the `import_users.py` script file.
### Step 4 - Metadata import (optional)
The metadata import scripts can be run at any time after step 1 has been completed.
#### Importing user metadata
To import the main user metadata structs, run:
`node cic_meta/import_meta.js <datadir> <number_of_users>`
Monitors a folder for output from the `import_users.py` script, adding the metadata found to the `cic-meta` service.
## 2. Balances
(Only if you used the `--gift-threshold` option above)
`python -c config -i <newchain:id> -r <cic_registry_address> -p <eth_provider> --head -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c <datadir>`
This will monitor new mined blocks and send balances to the newly created accounts.
If _number of users_ is omitted the script will run until manually interrupted.
### 3. Users
#### Importing phone pointer
Only use **one** of the following
`node cic_meta/import_meta_phone.js <datadir> <number_of_users>`
#### Custodial
If you imported using `cic_ussd`, the phone pointer is _already added_ and this script will do nothing.
This alternative generates accounts using the `cic-eth` custodial engine
Without any modifications to the cluster and config files:
### Step 5 - Verify
`python import_users.py -c config --redis-host-callback redis <datadir>`
`python verify.py -v -c config -r <cic_registry_address> -p <eth_provider> <datadir>`
** A note on the The callback**: The script uses a redis callback to retrieve the newly generated custodial address. This is the redis server _from the perspective of the cic-eth component_.
Included checks:
* Private key is in cic-eth keystore
* Address is in accounts index
* Address has gas balance
* Address has triggered the token faucet
* Address has token balance matching the gift threshold
* Personal metadata can be retrieved and has exact match
* Phone pointer metadata can be retrieved and matches address
* USSD menu response is initial state after registration
#### Sovereign
Checks can be selectively included and excluded. See `--help` for details.
This alternative generates keystore files, while registering corresponding addresses in the accounts registry directly
`python import_sovereign_users.py -c config -i <newchain:id> -r <cic_registry_address> -p <eth_provider> -y ../keystore/UTC--2021-01-08T17-18-44.521011372Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c <datadir>`
A `keystore` sub-directory in the data path is created, with ethereum keystore files for all generated private keys. Passphrase is set to empty string for all of them.
## VERIFY
`python verify.py -c config -i <newchain:id> -r <cic_registry_address> -p <eth_provider> <datadir>`
Checks
* Private key is in cic-eth keystore
* Address is in accounts index
* Address has balance matching the gift threshold
* Metadata can be retrieved and has exact match
Will output one line for each check, with name of check and number of errors found per check.
Should exit with code 0 if all input data is found in the respective services.
## KNOWN ISSUES
If the faucet disbursement is set to a non-zero amount, the balances will be off. The verify script needs to be improved to check the faucet amount.
- If the faucet disbursement is set to a non-zero amount, the balances will be off. The verify script needs to be improved to check the faucet amount.
- When the account callback in `cic_eth` fails, the `cic_eth/import_users.py` script will exit with a cryptic complaint concerning a `None` value.
- Sovereign import scripts use the same keystore, and running them simultaneously will mess up the transaction nonce sequence. Better would be to use two different keystore wallets so balance and users scripts can be run simultaneously.
- `pycrypto` and `pycryptodome` _have to be installed in that order_. If you get errors concerning `Crypto.KDF` then uninstall both and re-install in that order. Make sure you use the versions listed in `requirements.txt`. `pycryptodome` is a legacy dependency and will be removed as soon as possible.
- Sovereign import script is very slow because it's scrypt'ing keystore files for the accounts that it creates. An improvement would be optional and/or asynchronous keyfile generation.
- Running the balance script should be _optional_ in all cases, but is currently required in the case of `cic_ussd` because it is needed to generate the metadata. An improvement would be moving the task to `import_users.py`, for a different queue than the balance tx handler.
- `cic_ussd` imports is poorly implemented, and consumes a lot of resources. Therefore it takes a long time to complete. Reducing the amount of polls for the phone pointer would go a long way to improve it.

View File

@@ -10,14 +10,14 @@ import hashlib
import csv
import json
# third-party impotts
# external imports
import eth_abi
import confini
from hexathon import (
strip_0x,
add_0x,
)
from chainsyncer.backend import MemBackend
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver import HeadSyncer
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import (
@@ -42,7 +42,7 @@ from cic_types.models.person import Person
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = '/usr/local/etc/cic-syncer'
config_dir = './config'
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
@@ -162,6 +162,15 @@ class Handler:
(tx_hash_hex, o) = self.tx_factory.transfer(self.token_address, signer_address, recipient, balance_full)
logg.info('submitting erc20 transfer tx {} for recipient {}'.format(tx_hash_hex, recipient))
r = conn.do(o)
tx_path = os.path.join(
user_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(strip_0x(o['params'][0]))
f.close()
# except TypeError as e:
# logg.warning('typerror {}'.format(e))
# pass
@@ -195,8 +204,8 @@ class Handler:
# return b
def progress_callback(block_number, tx_index, s):
sys.stdout.write(str(s).ljust(200) + "\n")
def progress_callback(block_number, tx_index):
sys.stdout.write(str(block_number).ljust(200) + "\n")
@@ -290,7 +299,7 @@ def main():
f.close()
syncer_backend.set(block_offset, 0)
syncer = HeadSyncer(syncer_backend, progress_callback=progress_callback)
syncer = HeadSyncer(syncer_backend, block_callback=progress_callback)
handler = Handler(conn, chain_spec, user_dir, balances, sarafu_token_address, signer, gas_oracle, nonce_oracle)
syncer.add_filter(handler)
syncer.loop(1, conn)

View File

@@ -7,6 +7,7 @@ import argparse
import uuid
import datetime
import time
import phonenumbers
from glob import glob
# third-party imports
@@ -17,7 +18,7 @@ from hexathon import (
add_0x,
strip_0x,
)
from chainlib.eth.address import to_checksum
from chainlib.eth.address import to_checksum_address
from cic_types.models.person import Person
from cic_eth.api.api_task import Api
from chainlib.chain import ChainSpec
@@ -75,9 +76,15 @@ os.makedirs(user_new_dir)
meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir)
phone_dir = os.path.join(args.user_dir, 'phone')
os.makedirs(os.path.join(phone_dir, 'meta'))
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
@@ -165,12 +172,32 @@ if __name__ == '__main__':
f.write(json.dumps(o))
f.close()
#old_address = to_checksum(add_0x(y[:len(y)-5]))
#fi.write('{},{}\n'.format(new_address, old_address))
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), 'cic.person')
meta_filepath = os.path.join(meta_dir, '{}.json'.format(new_address_clean.upper()))
os.symlink(os.path.realpath(filepath), meta_filepath)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
meta_phone_key = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
meta_phone_filepath = os.path.join(phone_dir, 'meta', meta_phone_key)
filepath = os.path.join(
phone_dir,
'new',
meta_phone_key[:2].upper(),
meta_phone_key[2:4].upper(),
meta_phone_key.upper(),
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
f = open(filepath, 'w')
f.write(to_checksum_address(new_address_clean))
f.close()
os.symlink(os.path.realpath(filepath), meta_phone_filepath)
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")

View File

@@ -101,6 +101,7 @@ function importMeta(keystore) {
const file = files[i];
if (file.substr(-5) != '.json') {
console.debug('skipping file', file);
continue;
}
const filePath = path.join(workDir, file);
doOne(keystore, filePath);

View File

@@ -0,0 +1,134 @@
const fs = require('fs');
const path = require('path');
const http = require('http');
const cic = require('cic-client-meta');
const vcfp = require('vcard-parser');
//const conf = JSON.parse(fs.readFileSync('./cic.conf'));
const config = new cic.Config('./config');
config.process();
console.log(config);
function sendit(uid, envelope) {
const d = envelope.toJSON();
const contentLength = (new TextEncoder().encode(d)).length;
const opts = {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Content-Length': contentLength,
'X-CIC-AUTOMERGE': 'client',
},
};
let url = config.get('META_URL');
url = url.replace(new RegExp('^(.+://[^/]+)/*$'), '$1/');
console.log('posting to url: ' + url + uid);
const req = http.request(url + uid, opts, (res) => {
res.on('data', process.stdout.write);
res.on('end', () => {
console.log('result', res.statusCode, res.headers);
});
});
if (!req.write(d)) {
console.error('foo', d);
process.exit(1);
}
req.end();
}
function doOne(keystore, filePath, address) {
const signer = new cic.PGPSigner(keystore);
const j = JSON.parse(fs.readFileSync(filePath).toString());
const b = Buffer.from(j['vcard'], 'base64');
const s = b.toString();
const o = vcfp.parse(s);
const phone = o.tel[0].value;
cic.Phone.toKey(phone).then((uid) => {
const o = fs.readFileSync(filePath, 'utf-8');
const s = new cic.Syncable(uid, o);
s.setSigner(signer);
s.onwrap = (env) => {
sendit(uid, env);
};
s.sign();
});
}
const privateKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
const publicKeyPath = path.join(config.get('PGP_EXPORTS_DIR'), config.get('PGP_PRIVATE_KEY_FILE'));
pk = fs.readFileSync(privateKeyPath);
pubk = fs.readFileSync(publicKeyPath);
new cic.PGPKeyStore(
config.get('PGP_PASSPHRASE'),
pk,
pubk,
undefined,
undefined,
importMetaPhone,
);
const batchSize = 16;
const batchDelay = 1000;
const total = parseInt(process.argv[3]);
const dataDir = process.argv[2];
const workDir = path.join(dataDir, 'phone/meta');
const userDir = path.join(dataDir, 'new');
let count = 0;
let batchCount = 0;
function importMetaPhone(keystore) {
let err;
let files;
try {
err, files = fs.readdirSync(workDir);
} catch {
console.error('source directory not yet ready', workDir);
setTimeout(importMetaPhone, batchDelay, keystore);
return;
}
let limit = batchSize;
if (files.length < limit) {
limit = files.length;
}
for (let i = 0; i < limit; i++) {
const file = files[i];
if (file.substr(0) == '.') {
console.debug('skipping file', file);
}
const filePath = path.join(workDir, file);
const address = fs.readFileSync(filePath).toString().substring(2).toUpperCase();
const metaFilePath = path.join(
userDir,
address.substring(0, 2),
address.substring(2, 4),
address + '.json',
);
doOne(keystore, metaFilePath, address);
fs.unlinkSync(filePath);
count++;
batchCount++;
if (batchCount == batchSize) {
console.debug('reached batch size, breathing');
batchCount=0;
setTimeout(importMeta, batchDelay, keystore);
return;
}
}
if (count == total) {
return;
}
setTimeout(importMetaPhone, 100, keystore);
}

View File

@@ -0,0 +1,157 @@
# standard imports
import os
import sys
import logging
import argparse
import hashlib
import redis
import celery
# external imports
import confini
from chainlib.eth.connection import EthHTTPConnection
from chainlib.chain import ChainSpec
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum_address
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.dict import DictKeystore
from cic_types.models.person import Person
# local imports
from import_util import BalanceProcessor
from import_task import *
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = './config'
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--meta-host', dest='meta_host', type=str, help='metadata server host')
argparser.add_argument('--meta-port', dest='meta_port', type=int, help='metadata server host')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--token-symbol', default='SRF', type=str, dest='token_symbol', help='Token symbol to use for trnsactions')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', default='out', type=str, help='user export directory')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
'REDIS_HOST': getattr(args, 'redis_host'),
'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'),
'META_HOST': getattr(args, 'meta_host'),
'META_PORT': getattr(args, 'meta_port'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
signer_address = None
keystore = DictKeystore()
if args.y != None:
logg.debug('loading keystore file {}'.format(args.y))
signer_address = keystore.import_keystore_file(args.y)
logg.debug('now have key for signer address {}'.format(signer_address))
signer = EIP155Signer(keystore)
queue = args.q
chain_str = config.get('CIC_CHAIN_SPEC')
block_offset = 0
if args.head:
block_offset = -1
else:
block_offset = args.offset
chain_spec = ChainSpec.from_chain_str(chain_str)
old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py
token_symbol = args.token_symbol
MetadataTask.meta_host = config.get('META_HOST')
MetadataTask.meta_port = config.get('META_PORT')
ImportTask.chain_spec = chain_spec
def main():
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
ImportTask.balance_processor = BalanceProcessor(conn, chain_spec, config.get('CIC_REGISTRY_ADDRESS'), signer_address, signer)
ImportTask.balance_processor.init()
# TODO get decimals from token
balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r'))
remove_zeros = 10**6
i = 0
while True:
l = f.readline()
if l == None:
break
r = l.split(',')
try:
address = to_checksum_address(r[0])
sys.stdout.write('loading balance {} {} {}'.format(i, address, r[1]).ljust(200) + "\r")
except ValueError:
break
balance = int(int(r[1].rstrip()) / remove_zeros)
balances[address] = balance
i += 1
f.close()
ImportTask.balances = balances
ImportTask.count = i
s = celery.signature(
'import_task.send_txs',
[
MetadataTask.balance_processor.nonce_offset,
],
queue='cic-import-ussd',
)
s.apply_async()
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG']
celery_app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,218 @@
# standard imports
import os
import logging
import urllib.parse
import urllib.error
import urllib.request
import json
# external imports
import celery
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum_address
from chainlib.eth.tx import (
unpack,
raw,
)
from cic_types.processor import generate_metadata_pointer
from cic_types.models.person import Person
#logg = logging.getLogger().getChild(__name__)
logg = logging.getLogger()
celery_app = celery.current_app
class ImportTask(celery.Task):
balances = None
import_dir = 'out'
count = 0
chain_spec = None
balance_processor = None
max_retries = None
class MetadataTask(ImportTask):
meta_host = None
meta_port = None
meta_path = ''
meta_ssl = False
autoretry_for = (
urllib.error.HTTPError,
OSError,
)
retry_jitter = True
retry_backoff = True
retry_backoff_max = 60
@classmethod
def meta_url(self):
scheme = 'http'
if self.meta_ssl:
scheme += s
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
def old_address_from_phone(base_path, phone):
pidx = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
phone_idx_path = os.path.join('{}/phone/{}/{}/{}'.format(
base_path,
pidx[:2],
pidx[2:4],
pidx,
)
)
f = open(phone_idx_path, 'r')
old_address = f.read()
f.close()
return old_address
@celery_app.task(bind=True, base=MetadataTask)
def resolve_phone(self, phone):
identifier = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
url = urllib.parse.urljoin(self.meta_url(), identifier)
logg.debug('attempt getting phone pointer at {} for phone {}'.format(url, phone))
r = urllib.request.urlopen(url)
address = json.load(r)
address = address.replace('"', '')
logg.debug('address {} for phone {}'.format(address, phone))
return address
@celery_app.task(bind=True, base=MetadataTask)
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> foo')
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
self.import_dir,
old_address_upper[:2],
old_address_upper[2:4],
old_address_upper,
)
f = open(metadata_path, 'r')
o = json.load(f)
f.close()
u = Person.deserialize(o)
if u.identities.get('evm') == None:
u.identities['evm'] = {}
sub_chain_str = '{}:{}'.format(self.chain_spec.common_name(), self.chain_spec.network_id())
u.identities['evm'][sub_chain_str] = [add_0x(address)]
new_address_clean = strip_0x(address)
filepath = os.path.join(
self.import_dir,
'new',
new_address_clean[:2].upper(),
new_address_clean[2:4].upper(),
new_address_clean.upper() + '.json',
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
o = u.serialize()
f = open(filepath, 'w')
f.write(json.dumps(o))
f.close()
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
meta_filepath = os.path.join(
self.import_dir,
'meta',
'{}.json'.format(new_address_clean.upper()),
)
os.symlink(os.path.realpath(filepath), meta_filepath)
logg.debug('found metadata {} for phone {}'.format(o, phone))
return address
@celery_app.task(bind=True, base=MetadataTask)
def opening_balance_tx(self, address, phone, serial):
old_address = old_address_from_phone(self.import_dir, phone)
k = to_checksum_address(strip_0x(old_address))
balance = self.balances[k]
logg.debug('found balance {} for address {} phone {}'.format(balance, old_address, phone))
decimal_balance = self.balance_processor.get_decimal_amount(int(balance))
(tx_hash_hex, o) = self.balance_processor.get_rpc_tx(address, decimal_balance, serial)
tx = unpack(bytes.fromhex(strip_0x(o)), self.chain_spec)
logg.debug('generated tx token value {} to {} tx hash {}'.format(decimal_balance, address, tx_hash_hex))
tx_path = os.path.join(
self.import_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(strip_0x(o))
f.close()
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(tx['nonce']),
)
os.symlink(os.path.realpath(tx_path), tx_nonce_path)
return tx['hash']
@celery_app.task(bind=True, base=ImportTask, autoretry_for=(FileNotFoundError,), max_retries=None, default_retry_delay=0.1)
def send_txs(self, nonce):
if nonce == self.count + self.balance_processor.nonce_offset:
logg.info('reached nonce {} (offset {} + count {}) exiting'.format(nonce, self.balance_processor.nonce_offset, self.count))
return
logg.debug('attempt to open symlink for nonce {}'.format(nonce))
tx_nonce_path = os.path.join(
self.import_dir,
'txs',
'.' + str(nonce),
)
f = open(tx_nonce_path, 'r')
tx_signed_raw_hex = f.read()
f.close()
os.unlink(tx_nonce_path)
o = raw(add_0x(tx_signed_raw_hex))
tx_hash_hex = self.balance_processor.conn.do(o)
logg.info('sent nonce {} tx hash {}'.format(nonce, tx_hash_hex)) #tx_signed_raw_hex))
nonce += 1
queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'import_task.send_txs',
[
nonce,
],
queue=queue,
)
s.apply_async()
return nonce

View File

@@ -0,0 +1,191 @@
# standard imports
import os
import sys
import json
import logging
import argparse
import uuid
import datetime
import time
import urllib.request
from glob import glob
# third-party imports
import redis
import confini
import celery
from hexathon import (
add_0x,
strip_0x,
)
from chainlib.eth.address import to_checksum
from cic_types.models.person import Person
from cic_eth.api.api_task import Api
from chainlib.chain import ChainSpec
from cic_types.processor import generate_metadata_pointer
import phonenumbers
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
argparser.add_argument('--redis-host', dest='redis_host', type=str, help='redis host to use for task submission')
argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis host to use for task submission')
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--batch-size', dest='batch_size', default=100, type=int, help='burst size of sending transactions to node') # batch size should be slightly below cumulative gas limit worth, eg 80000 gas txs with 8000000 limit is a bit less than 100 batch size
argparser.add_argument('--batch-delay', dest='batch_delay', default=3, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = argparser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'REDIS_HOST': getattr(args, 'redis_host'),
'REDIS_PORT': getattr(args, 'redis_port'),
'REDIS_DB': getattr(args, 'redis_db'),
}
config.dict_override(args_override, 'cli')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
redis_host = config.get('REDIS_HOST')
redis_port = config.get('REDIS_PORT')
redis_db = config.get('REDIS_DB')
r = redis.Redis(redis_host, redis_port, redis_db)
ps = r.pubsub()
user_new_dir = os.path.join(args.user_dir, 'new')
os.makedirs(user_new_dir)
meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir)
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
url = 'http'
if ssl:
url += 's'
url += '://{}:{}'.format(host, port)
url += '/?username={}&password={}'.format(username, password) #config.get('USSD_USER'), config.get('USSD_PASS'))
logg.info('ussd service url {}'.format(url))
logg.info('ussd phone {}'.format(phone))
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': service_code,
'phoneNumber': phone,
'text': service_code,
}
req = urllib.request.Request(url)
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
return req
def register_ussd(i, u):
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
logg.debug('tel {} {}'.format(u.tel, phone))
req = build_ussd_request(phone, 'localhost', 63315, '*483*46#', '', '')
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
state = response_data[:3]
out = response_data[4:]
logg.debug('ussd reponse: {}'.format(out))
if __name__ == '__main__':
i = 0
j = 0
for x in os.walk(user_old_dir):
for y in x[2]:
if y[len(y)-5:] != '.json':
continue
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
new_address = register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_meta.link(s_balance)
s_phone.link(s_meta)
s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
#fi.close()

View File

@@ -0,0 +1,72 @@
# standard imports
import logging
# external imports
from eth_contract_registry import Registry
from eth_token_index import TokenUniqueSymbolIndex
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import OverrideNonceOracle
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import (
count,
TxFormat,
)
logg = logging.getLogger().getChild(__name__)
class BalanceProcessor:
def __init__(self, conn, chain_spec, registry_address, signer_address, signer):
self.chain_spec = chain_spec
self.conn = conn
#self.signer_address = signer_address
self.registry_address = registry_address
self.token_index_address = None
self.token_address = None
self.signer_address = signer_address
self.signer = signer
o = count(signer_address)
c = self.conn.do(o)
self.nonce_offset = int(c, 16)
self.gas_oracle = OverrideGasOracle(conn=conn, limit=8000000)
self.value_multiplier = 1
def init(self):
# Get Token registry address
registry = Registry(self.chain_spec)
o = registry.address_of(self.registry_address, 'TokenRegistry')
r = self.conn.do(o)
self.token_index_address = registry.parse_address_of(r)
logg.info('found token index address {}'.format(self.token_index_address))
token_registry = TokenUniqueSymbolIndex(self.chain_spec)
o = token_registry.address_of(self.token_index_address, 'SRF')
r = self.conn.do(o)
self.token_address = token_registry.parse_address_of(r)
logg.info('found SRF token address {}'.format(self.token_address))
tx_factory = ERC20(self.chain_spec)
o = tx_factory.decimals(self.token_address)
r = self.conn.do(o)
n = tx_factory.parse_decimals(r)
self.value_multiplier = 10 ** n
def get_rpc_tx(self, recipient, value, i):
logg.debug('initiating nonce offset {} for recipient {}'.format(self.nonce_offset + i, recipient))
nonce_oracle = OverrideNonceOracle(self.signer_address, self.nonce_offset + i)
tx_factory = ERC20(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.gas_oracle)
return tx_factory.transfer(self.token_address, self.signer_address, recipient, value, tx_format=TxFormat.RLP_SIGNED)
#(tx_hash_hex, o) = tx_factory.transfer(self.token_address, self.signer_address, recipient, value)
#self.conn.do(o)
#return tx_hash_hex
def get_decimal_amount(self, value):
return value * self.value_multiplier

View File

@@ -0,0 +1,24 @@
[app]
ALLOWED_IP=0.0.0.0/0
LOCALE_FALLBACK=en
LOCALE_PATH=/usr/src/cic-ussd/var/lib/locale/
MAX_BODY_LENGTH=1024
PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[phone_number]
REGION=KE
[ussd]
MENU_FILE=/usr/src/data/ussd_menu.json
user =
pass =
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/
[client]
host =
port =
ssl =

View File

@@ -1,2 +1,5 @@
[meta]
url = http://localhost:63380
host = localhost
port = 63380
ssl = 0

View File

@@ -24,6 +24,7 @@ from cic_types.models.person import (
get_contact_data_from_vcard,
)
from chainlib.eth.address import to_checksum_address
import phonenumbers
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -80,10 +81,12 @@ phone_idx = []
user_dir = args.dir
user_count = args.user_count
random.seed()
def genPhoneIndex(phone):
h = hashlib.new('sha256')
h.update(phone.encode('utf-8'))
h.update(b'cic.msisdn')
h.update(b':cic.phone')
return h.digest().hex()
@@ -101,7 +104,9 @@ def genDate():
def genPhone():
return fake.msisdn()
phone_str = '+254' + str(random.randint(100000000, 999999999))
phone_object = phonenumbers.parse(phone_str)
return phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
def genPersonal(phone):
@@ -205,14 +210,14 @@ if __name__ == '__main__':
f.close()
pidx = genPhoneIndex(phone)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), uid)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx)
f = open('{}/{}'.format(d, pidx), 'w')
f.write(eth)
f.close()
amount = genAmount()
fa.write('{},{}\n'.format(eth,amount))
logg.debug('pidx {}, uid {}, eth {}, amount {}'.format(pidx, uid, eth, amount))
logg.debug('pidx {}, uid {}, eth {}, amount {}, phone {}'.format(pidx, uid, eth, amount, phone))
i += 1

View File

@@ -0,0 +1,309 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
import hashlib
import csv
import json
# external imports
import eth_abi
import confini
from hexathon import (
strip_0x,
add_0x,
)
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver import HeadSyncer
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainlib.eth.hash import keccak256_string_to_hex
from chainlib.eth.address import to_checksum_address
from chainlib.eth.erc20 import ERC20
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import TxFactory
from chainlib.eth.rpc import jsonrpc_template
from chainlib.eth.error import EthException
from chainlib.chain import ChainSpec
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.dict import DictKeystore
from cic_types.models.person import Person
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
config_dir = './config'
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--token-symbol', default='SRF', type=str, dest='token_symbol', help='Token symbol to use for trnsactions')
argparser.add_argument('--head', action='store_true', help='start at current block height (overrides --offset)')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('--offset', type=int, default=0, help='block offset to start syncer from')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', type=str, help='user export directory')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
#app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
signer_address = None
keystore = DictKeystore()
if args.y != None:
logg.debug('loading keystore file {}'.format(args.y))
signer_address = keystore.import_keystore_file(args.y)
logg.debug('now have key for signer address {}'.format(signer_address))
signer = EIP155Signer(keystore)
queue = args.q
chain_str = config.get('CIC_CHAIN_SPEC')
block_offset = 0
if args.head:
block_offset = -1
else:
block_offset = args.offset
chain_spec = ChainSpec.from_chain_str(chain_str)
old_chain_spec_str = args.old_chain_spec
old_chain_spec = ChainSpec.from_chain_str(old_chain_spec_str)
user_dir = args.user_dir # user_out_dir from import_users.py
token_symbol = args.token_symbol
class Handler:
account_index_add_signature = keccak256_string_to_hex('add(address)')[:8]
def __init__(self, conn, chain_spec, user_dir, balances, token_address, signer, gas_oracle, nonce_oracle):
self.token_address = token_address
self.user_dir = user_dir
self.balances = balances
self.chain_spec = chain_spec
self.tx_factory = ERC20(chain_spec, signer, gas_oracle, nonce_oracle)
def name(self):
return 'balance_handler'
def filter(self, conn, block, tx, db_session):
if tx.payload == None or len(tx.payload) == 0:
logg.debug('no payload, skipping {}'.format(tx))
return
if tx.payload[:8] == self.account_index_add_signature:
recipient = eth_abi.decode_single('address', bytes.fromhex(tx.payload[-64:]))
#original_address = to_checksum_address(self.addresses[to_checksum_address(recipient)])
user_file = 'new/{}/{}/{}.json'.format(
recipient[2:4].upper(),
recipient[4:6].upper(),
recipient[2:].upper(),
)
filepath = os.path.join(self.user_dir, user_file)
o = None
try:
f = open(filepath, 'r')
o = json.load(f)
f.close()
except FileNotFoundError:
logg.error('no import record of address {}'.format(recipient))
return
u = Person.deserialize(o)
original_address = u.identities[old_chain_spec.engine()]['{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id())][0]
try:
balance = self.balances[original_address]
except KeyError as e:
logg.error('balance get fail orig {} new {}'.format(original_address, recipient))
return
# TODO: store token object in handler ,get decimals from there
multiplier = 10**6
balance_full = balance * multiplier
logg.info('registered {} originally {} ({}) tx hash {} balance {}'.format(recipient, original_address, u, tx.hash, balance_full))
(tx_hash_hex, o) = self.tx_factory.transfer(self.token_address, signer_address, recipient, balance_full)
logg.info('submitting erc20 transfer tx {} for recipient {}'.format(tx_hash_hex, recipient))
r = conn.do(o)
tx_path = os.path.join(
user_dir,
'txs',
strip_0x(tx_hash_hex),
)
f = open(tx_path, 'w')
f.write(strip_0x(o['params'][0]))
f.close()
# except TypeError as e:
# logg.warning('typerror {}'.format(e))
# pass
# except IndexError as e:
# logg.warning('indexerror {}'.format(e))
# pass
# except EthException as e:
# logg.error('send error {}'.format(e).ljust(200))
#except KeyError as e:
# logg.error('key record not found in imports: {}'.format(e).ljust(200))
#class BlockGetter:
#
# def __init__(self, conn, gas_oracle, nonce_oracle, chain_spec):
# self.conn = conn
# self.tx_factory = ERC20(signer=signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle, chain_id=chain_id)
#
#
# def get(self, n):
# o = block_by_number(n)
# r = self.conn.do(o)
# b = None
# try:
# b = Block(r)
# except TypeError as e:
# if r == None:
# logg.debug('block not found {}'.format(n))
# else:
# logg.error('block retrieve error {}'.format(e))
# return b
def progress_callback(block_number, tx_index):
sys.stdout.write(str(block_number).ljust(200) + "\n")
def main():
global chain_str, block_offset, user_dir
conn = EthHTTPConnection(config.get('ETH_PROVIDER'))
gas_oracle = OverrideGasOracle(conn=conn, limit=8000000)
nonce_oracle = RPCNonceOracle(signer_address, conn)
# Get Token registry address
txf = TxFactory(chain_spec, signer=signer, gas_oracle=gas_oracle, nonce_oracle=None)
tx = txf.template(signer_address, config.get('CIC_REGISTRY_ADDRESS'))
registry_addressof_method = keccak256_string_to_hex('addressOf(bytes32)')[:8]
data = add_0x(registry_addressof_method)
data += eth_abi.encode_single('bytes32', b'TokenRegistry').hex()
txf.set_code(tx, data)
o = jsonrpc_template()
o['method'] = 'eth_call'
o['params'].append(txf.normalize(tx))
o['params'].append('latest')
r = conn.do(o)
token_index_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r))))
logg.info('found token index address {}'.format(token_index_address))
# Get Sarafu token address
tx = txf.template(signer_address, token_index_address)
data = add_0x(registry_addressof_method)
h = hashlib.new('sha256')
h.update(token_symbol.encode('utf-8'))
z = h.digest()
data += eth_abi.encode_single('bytes32', z).hex()
txf.set_code(tx, data)
o = jsonrpc_template()
o['method'] = 'eth_call'
o['params'].append(txf.normalize(tx))
o['params'].append('latest')
r = conn.do(o)
try:
sarafu_token_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r))))
except ValueError as e:
logg.critical('lookup failed for token {}: {}'.format(token_symbol, e))
sys.exit(1)
logg.info('found token address {}'.format(sarafu_token_address))
syncer_backend = MemBackend(chain_str, 0)
if block_offset == -1:
o = block_latest()
r = conn.do(o)
block_offset = int(strip_0x(r), 16) + 1
#
# addresses = {}
# f = open('{}/addresses.csv'.format(user_dir, 'r'))
# while True:
# l = f.readline()
# if l == None:
# break
# r = l.split(',')
# try:
# k = r[0]
# v = r[1].rstrip()
# addresses[k] = v
# sys.stdout.write('loading address mapping {} -> {}'.format(k, v).ljust(200) + "\r")
# except IndexError as e:
# break
# f.close()
# TODO get decimals from token
balances = {}
f = open('{}/balances.csv'.format(user_dir, 'r'))
remove_zeros = 10**6
i = 0
while True:
l = f.readline()
if l == None:
break
r = l.split(',')
try:
address = to_checksum_address(r[0])
sys.stdout.write('loading balance {} {} {}'.format(i, address, r[1]).ljust(200) + "\r")
except ValueError:
break
balance = int(int(r[1].rstrip()) / remove_zeros)
balances[address] = balance
i += 1
f.close()
syncer_backend.set(block_offset, 0)
syncer = HeadSyncer(syncer_backend, block_callback=progress_callback)
handler = Handler(conn, chain_spec, user_dir, balances, sarafu_token_address, signer, gas_oracle, nonce_oracle)
syncer.add_filter(handler)
syncer.loop(1, conn)
if __name__ == '__main__':
main()

View File

@@ -7,6 +7,7 @@ import argparse
import uuid
import datetime
import time
import phonenumbers
from glob import glob
# external imports
@@ -67,9 +68,15 @@ os.makedirs(user_new_dir)
meta_dir = os.path.join(args.user_dir, 'meta')
os.makedirs(meta_dir)
phone_dir = os.path.join(args.user_dir, 'phone')
os.makedirs(os.path.join(phone_dir, 'meta'))
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
txs_dir = os.path.join(args.user_dir, 'txs')
os.makedirs(txs_dir)
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
chain_str = str(chain_spec)
@@ -88,7 +95,7 @@ signer = EIP155Signer(keystore)
nonce_oracle = RPCNonceOracle(signer_address, rpc)
registry = Registry()
registry = Registry(chain_spec)
o = registry.address_of(config.get('CIC_REGISTRY_ADDRESS'), 'AccountRegistry')
r = rpc.do(o)
account_registry_address = registry.parse_address_of(r)
@@ -109,7 +116,7 @@ def register_eth(i, u):
rpc.do(o)
pk = keystore.get(address)
keyfile_content = to_keyfile_dict(pk, '')
keyfile_content = to_keyfile_dict(pk, 'foo')
keyfile_path = os.path.join(keyfile_dir, '{}.json'.format(address))
f = open(keyfile_path, 'w')
json.dump(keyfile_content, f)
@@ -166,10 +173,31 @@ if __name__ == '__main__':
f.write(json.dumps(o))
f.close()
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), 'cic.person')
meta_key = generate_metadata_pointer(bytes.fromhex(new_address_clean), ':cic.person')
meta_filepath = os.path.join(meta_dir, '{}.json'.format(new_address_clean.upper()))
os.symlink(os.path.realpath(filepath), meta_filepath)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
logg.debug('>>>>> Using phone {}'.format(phone))
meta_phone_key = generate_metadata_pointer(phone.encode('utf-8'), ':cic.phone')
meta_phone_filepath = os.path.join(phone_dir, 'meta', meta_phone_key)
filepath = os.path.join(
phone_dir,
'new',
meta_phone_key[:2].upper(),
meta_phone_key[2:4].upper(),
meta_phone_key.upper(),
)
os.makedirs(os.path.dirname(filepath), exist_ok=True)
f = open(filepath, 'w')
f.write(to_checksum_address(new_address_clean))
f.close()
os.symlink(os.path.realpath(filepath), meta_phone_filepath)
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")

View File

@@ -1 +0,0 @@
{"metricId":"ea11447f-da1c-49e6-b0a2-8a988a99e3ce","metrics":{"from":"2021-02-12T18:59:21.666Z","to":"2021-02-12T18:59:21.666Z","successfulInstalls":0,"failedInstalls":1}}

View File

@@ -1 +0,0 @@
python import_balance.py -c config -i evm:bloxberg:8996 -y /home/lash/tmp/d/keystore/UTC--2021-02-07T09-58-35.341813355Z--eb3907ecad74a0013c259d5874ae7f22dcbcc95c -v $@

View File

@@ -1 +0,0 @@
python import_users.py -c config --redis-host-callback redis -vv $@

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,6 @@
{
"dependencies": {
"cic-client-meta": "^0.0.7-alpha.6",
"vcard-parser": "^1.0.0"
}
}

View File

@@ -1,5 +1,5 @@
cic-base[full_graph]==0.1.2a61
sarafu-faucet==0.0.2a17
cic-eth==0.11.0b1
cic-base[full_graph]==0.1.2a77
sarafu-faucet==0.0.2a28
cic-eth==0.11.0b6
cic-types==0.1.0a10
crypto-dev-signer==0.4.14a17
crypto-dev-signer==0.4.14b2

View File

@@ -11,6 +11,8 @@ import csv
import json
import urllib
import copy
import uuid
import urllib.request
# external imports
import celery
@@ -57,18 +59,29 @@ custodial_tests = [
'faucet',
]
all_tests = custodial_tests + [
metadata_tests = [
'metadata',
'metadata_phone',
]
eth_tests = [
'accounts_index',
'balance',
'metadata',
]
phone_tests = [
'ussd',
]
all_tests = eth_tests + custodial_tests + metadata_tests + phone_tests
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url')
argparser.add_argument('--ussd-provider', type=str, dest='ussd_provider', default='http://localhost:63315', help='cic-ussd url')
argparser.add_argument('--skip-custodial', dest='skip_custodial', action='store_true', help='skip all custodial verifications')
argparser.add_argument('--exclude', action='append', type=str, default=[], help='skip specified verification')
argparser.add_argument('--include', action='append', type=str, help='include specified verification')
@@ -98,6 +111,9 @@ args_override = {
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
config.add(args.meta_provider, '_META_PROVIDER', True)
config.add(args.ussd_provider, '_USSD_PROVIDER', True)
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
@@ -107,7 +123,6 @@ chain_str = str(chain_spec)
old_chain_spec = ChainSpec.from_chain_str(args.old_chain_spec)
old_chain_str = str(old_chain_spec)
user_dir = args.user_dir # user_out_dir from import_users.py
meta_url = args.meta_provider
exit_on_error = args.x
active_tests = []
@@ -271,7 +286,7 @@ class Verifier:
def verify_metadata(self, address, balance=None):
k = generate_metadata_pointer(bytes.fromhex(strip_0x(address)), ':cic.person')
url = os.path.join(meta_url, k)
url = os.path.join(config.get('_META_PROVIDER'), k)
logg.debug('verify metadata url {}'.format(url))
try:
res = urllib.request.urlopen(url)
@@ -299,6 +314,83 @@ class Verifier:
raise VerifierError(o_retrieved, 'metadata (person)')
def verify_metadata_phone(self, address, balance=None):
upper_address = strip_0x(address).upper()
f = open(os.path.join(
self.data_dir,
'new',
upper_address[:2],
upper_address[2:4],
upper_address + '.json',
), 'r'
)
o = json.load(f)
f.close()
p = Person.deserialize(o)
k = generate_metadata_pointer(p.tel.encode('utf-8'), ':cic.phone')
url = os.path.join(config.get('_META_PROVIDER'), k)
logg.debug('verify metadata phone url {}'.format(url))
try:
res = urllib.request.urlopen(url)
except urllib.error.HTTPError as e:
raise VerifierError(
'({}) {}'.format(url, e),
'metadata (phone)',
)
b = res.read()
address_recovered = json.loads(b.decode('utf-8'))
address_recovered = address_recovered.replace('"', '')
try:
address = strip_0x(address)
address_recovered = strip_0x(address_recovered)
except ValueError:
raise VerifierError(address_recovered, 'metadata (phone) address {} address recovered {}'.format(address, address_recovered))
if address != address_recovered:
raise VerifierError(address_recovered, 'metadata (phone)')
def verify_ussd(self, address, balance=None):
upper_address = strip_0x(address).upper()
f = open(os.path.join(
self.data_dir,
'new',
upper_address[:2],
upper_address[2:4],
upper_address + '.json',
), 'r'
)
o = json.load(f)
f.close()
p = Person.deserialize(o)
phone = p.tel
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': phone,
'text': config.get('APP_SERVICE_CODE'),
}
req = urllib.request.Request(config.get('_USSD_PROVIDER'))
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
state = response_data[:3]
out = response_data[4:]
m = '{} {}'.format(state, out[:7])
if m != 'CON Welcome':
raise VerifierError(response_data, 'ussd')
def verify(self, address, balance, debug_stem=None):
for k in active_tests:

View File

@@ -145,7 +145,7 @@ services:
- -c
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
/usr/local/bin/cic-cache-trackerd -vv -c /usr/local/etc/cic-cache
./start_tracker.sh -c /usr/local/etc/cic-cache -vv
volumes:
- contract-config:/tmp/cic/config/:ro
@@ -164,6 +164,7 @@ services:
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: 1
DATABASE_POOL_SIZE: 0
ETH_ABI_DIR: ${ETH_ABI_DIR:-/usr/local/share/cic/solidity/abi}
CIC_TRUST_ADDRESS: ${DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER:-0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996}
@@ -190,13 +191,13 @@ services:
context: apps
dockerfile: cic-cache/docker/Dockerfile
environment:
DATABASE_USER: $DATABASE_USER
DATABASE_HOST: $DATABASE_HOST
DATABASE_PORT: $DATABASE_PORT
DATABASE_PASSWORD: $DATABASE_PASSWORD
DATABASE_NAME: $DATABASE_NAME_CIC_CACHE
DATABASE_USER: ${DATABASE_USER:-grassroots}
DATABASE_HOST: ${DATABASE_HOST:-postgres}
DATABASE_PORT: ${DATABASE_PORT:-5432}
#DATABASE_PASSWORD: ${DATABASE_PASSWORD:-
DATABASE_NAME: ${DATABASE_NAME_CIC_CACHE:-cic_cache}
DATABASE_DEBUG: 1
PGPASSWORD: $DATABASE_PASSWORD
#PGPASSWORD: $DATABASE_PASSWORD
SERVER_PORT: 8000
ports:
- ${HTTP_PORT_CIC_CACHE:-63313}:8000
@@ -211,9 +212,10 @@ services:
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
"/usr/local/bin/uwsgi" \
--wsgi-file /usr/src/cic-cache/cic_cache/runnable/serverd.py \
--wsgi-file /usr/src/cic-cache/cic_cache/runnable/daemons/server.py \
--http :8000 \
--pyargv -vv
--pyargv "-vv"
cic-eth-tasker:
# image: grassrootseconomics:cic-eth-service
@@ -233,6 +235,7 @@ services:
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
DATABASE_POOL_SIZE: 0
PGPASSWORD: ${DATABASE_PASSWORD:-tralala}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996}
BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}
@@ -430,6 +433,7 @@ services:
DATABASE_NAME: ${DATABASE_NAME_CIC_NOTIFY:-cic_notify}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_POOL_SIZE: 0
PGPASSWORD: ${DATABASE_PASSWORD:-tralala}
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_BROKER_URL:-redis://redis}
@@ -446,6 +450,7 @@ services:
cic-meta-server:
hostname: meta
build:
context: apps/
dockerfile: cic-meta/docker/Dockerfile
@@ -490,9 +495,10 @@ services:
DATABASE_ENGINE: postgresql
DATABASE_DRIVER: psycopg2
PGP_PASSPHRASE: merman
SERVER_PORT: 8000
SERVER_PORT: 9000
CIC_META_URL: ${CIC_META_URL:-http://meta:8000}
ports:
- ${HTTP_PORT_CIC_USSD:-63315}:8000
- ${HTTP_PORT_CIC_USSD:-63315}:9000
depends_on:
- postgres
- redis
@@ -501,7 +507,7 @@ services:
deploy:
restart_policy:
condition: on-failure
command: "/root/start_uwsgi.sh"
command: "/root/start_uwsgi.sh -vv"
cic-ussd-tasker:
# image: grassrootseconomics:cic-ussd
@@ -516,9 +522,11 @@ services:
DATABASE_NAME: cic_ussd
DATABASE_ENGINE: postgresql
DATABASE_DRIVER: psycopg2
DATABASE_POOL_SIZE: 0
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_BROKER_URL:-redis://redis}
PGP_PASSPHRASE: merman
CIC_META_URL: ${CIC_META_URL:-http://meta:8000}
depends_on:
- postgres
- redis
@@ -527,4 +535,4 @@ services:
deploy:
restart_policy:
condition: on-failure
command: "/root/start_tasker.sh -q cic-ussd"
command: "/root/start_tasker.sh -q cic-ussd -vv"