Merge branch 'master' into lash/rehabilitate-traffic-2

This commit is contained in:
nolash 2021-05-19 11:57:51 +02:00
commit 30d86884a4
47 changed files with 1454 additions and 257 deletions

View File

@ -1,22 +1,28 @@
# standard imports
import logging
import datetime
# third-party imports
# external imports
import moolb
# local imports
from cic_cache.db import list_transactions_mined
from cic_cache.db import list_transactions_account_mined
from cic_cache.db.list import (
list_transactions_mined,
list_transactions_account_mined,
list_transactions_mined_with_data,
)
logg = logging.getLogger()
class BloomCache:
class Cache:
def __init__(self, session):
self.session = session
class BloomCache(Cache):
@staticmethod
def __get_filter_size(n):
n = 8192 * 8
@ -87,3 +93,43 @@ class BloomCache:
f_blocktx.add(block + tx)
logg.debug('added block {} tx {} lo {} hi {}'.format(r[0], r[1], lowest_block, highest_block))
return (lowest_block, highest_block, f_block.to_bytes(), f_blocktx.to_bytes(),)
class DataCache(Cache):
def load_transactions_with_data(self, offset, end):
rows = list_transactions_mined_with_data(self.session, offset, end)
tx_cache = []
highest_block = -1;
lowest_block = -1;
date_is_str = None # stick this in startup
for r in rows:
if highest_block == -1:
highest_block = r['block_number']
lowest_block = r['block_number']
tx_type = 'unknown'
if r['value'] != None:
tx_type = '{}.{}'.format(r['domain'], r['value'])
if date_is_str == None:
date_is_str = type(r['date_block']).__name__ == 'str'
o = {
'block_number': r['block_number'],
'tx_hash': r['tx_hash'],
'date_block': r['date_block'],
'sender': r['sender'],
'recipient': r['recipient'],
'from_value': int(r['from_value']),
'to_value': int(r['to_value']),
'source_token': r['source_token'],
'destination_token': r['destination_token'],
'tx_type': tx_type,
}
if date_is_str:
o['date_block'] = datetime.datetime.fromisoformat(r['date_block'])
tx_cache.append(o)
return (lowest_block, highest_block, tx_cache)

View File

@ -28,6 +28,26 @@ def list_transactions_mined(
return r
def list_transactions_mined_with_data(
session,
offset,
end,
):
"""Executes db query to return all confirmed transactions according to the specified offset and limit.
:param offset: Offset in data set to return transactions from
:type offset: int
:param limit: Max number of transactions to retrieve
:type limit: int
:result: Result set
:rtype: SQLAlchemy.ResultProxy
"""
s = "SELECT tx_hash, block_number, date_block, sender, recipient, from_value, to_value, source_token, destination_token, domain, value FROM tx LEFT JOIN tag_tx_link ON tx.id = tag_tx_link.tx_id LEFT JOIN tag ON tag_tx_link.tag_id = tag.id WHERE block_number >= {} AND block_number <= {} ORDER BY block_number ASC, tx_index ASC".format(offset, end)
r = session.execute(s)
return r
def list_transactions_account_mined(
session,
address,

View File

@ -0,0 +1,110 @@
# standard imports
import logging
import json
import re
import base64
# local imports
from cic_cache.cache import (
BloomCache,
DataCache,
)
logg = logging.getLogger(__name__)
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
re_transactions_all_data = r'/txa/(\d+)/(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_data(session, env):
r = re.match(re_transactions_all_data, env.get('PATH_INFO'))
if not r:
return None
if env.get('HTTP_X_CIC_CACHE_MODE') != 'all':
return None
offset = r[1]
end = r[2]
if r[2] < r[1]:
raise ValueError('cart before the horse, dude')
c = DataCache(session)
(lowest_block, highest_block, tx_cache) = c.load_transactions_with_data(offset, end)
for r in tx_cache:
r['date_block'] = r['date_block'].timestamp()
o = {
'low': lowest_block,
'high': highest_block,
'data': tx_cache,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)

View File

@ -1,18 +1,20 @@
# standard imports
import os
import re
import logging
import argparse
import json
import base64
# third-party imports
# external imports
import confini
# local imports
from cic_cache import BloomCache
from cic_cache.db import dsn_from_config
from cic_cache.db.models.base import SessionBase
from cic_cache.runnable.daemons.query import (
process_transactions_account_bloom,
process_transactions_all_bloom,
process_transactions_all_data,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@ -44,72 +46,6 @@ logg.debug('config:\n{}'.format(config))
dsn = dsn_from_config(config)
SessionBase.connect(dsn, config.true('DATABASE_DEBUG'))
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)/?(\d+)?/?(\d+)/?'
DEFAULT_LIMIT = 100
def process_transactions_account_bloom(session, env):
r = re.match(re_transactions_account_bloom, env.get('PATH_INFO'))
if not r:
return None
address = r[1]
if r[2] == None:
address = '0x' + address
offset = DEFAULT_LIMIT
if r.lastindex > 2:
offset = r[3]
limit = 0
if r.lastindex > 3:
limit = r[4]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions_account(address, offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
def process_transactions_all_bloom(session, env):
r = re.match(re_transactions_all_bloom, env.get('PATH_INFO'))
if not r:
return None
offset = DEFAULT_LIMIT
if r.lastindex > 0:
offset = r[1]
limit = 0
if r.lastindex > 1:
limit = r[2]
c = BloomCache(session)
(lowest_block, highest_block, bloom_filter_block, bloom_filter_tx) = c.load_transactions(offset, limit)
o = {
'alg': 'sha256',
'low': lowest_block,
'high': highest_block,
'block_filter': base64.b64encode(bloom_filter_block).decode('utf-8'),
'blocktx_filter': base64.b64encode(bloom_filter_tx).decode('utf-8'),
'filter_rounds': 3,
}
j = json.dumps(o)
return ('application/json', j.encode('utf-8'),)
# uwsgi application
def application(env, start_response):
@ -119,10 +55,16 @@ def application(env, start_response):
session = SessionBase.create_session()
for handler in [
process_transactions_all_data,
process_transactions_all_bloom,
process_transactions_account_bloom,
]:
r = handler(session, env)
r = None
try:
r = handler(session, env)
except ValueError as e:
start_response('400 {}'.format(str(e)))
return []
if r != None:
(mime_type, content) = r
break

View File

@ -88,3 +88,16 @@ def txs(
tx_hash_first,
tx_hash_second,
]
@pytest.fixture(scope='function')
def tag_txs(
init_database,
txs,
):
db.add_tag(init_database, 'taag', domain='test')
init_database.commit()
db.tag_transaction(init_database, txs[1], 'taag', domain='test')

View File

@ -0,0 +1,31 @@
# standard imports
import json
# external imports
import pytest
# local imports
from cic_cache.runnable.daemons.query import process_transactions_all_data
def test_api_all_data(
init_database,
txs,
):
env = {
'PATH_INFO': '/txa/410000/420000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
j = process_transactions_all_data(init_database, env)
o = json.loads(j[1])
assert len(o['data']) == 2
env = {
'PATH_INFO': '/txa/420000/410000',
'HTTP_X_CIC_CACHE_MODE': 'all',
}
with pytest.raises(ValueError):
j = process_transactions_all_data(init_database, env)

View File

@ -9,6 +9,7 @@ import pytest
# local imports
from cic_cache import BloomCache
from cic_cache.cache import DataCache
logg = logging.getLogger()
@ -33,3 +34,23 @@ def test_cache(
assert b[0] == list_defaults['block'] - 1
def test_cache_data(
init_database,
list_defaults,
list_actors,
list_tokens,
txs,
tag_txs,
):
session = init_database
c = DataCache(session)
b = c.load_transactions_with_data(410000, 420000)
assert len(b[2]) == 2
assert b[2][0]['tx_hash'] == txs[1]
assert b[2][1]['tx_type'] == 'unknown'
assert b[2][0]['tx_type'] == 'test.taag'

View File

@ -16,4 +16,6 @@ def default_token(self):
return {
'symbol': self.default_token_symbol,
'address': self.default_token_address,
'name': self.default_token_name,
'decimals': self.default_token_decimals,
}

View File

@ -22,6 +22,7 @@ from chainlib.eth.connection import (
from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx
from cic_eth_registry.error import UnknownContractError
from cic_eth_registry.erc20 import ERC20Token
import liveness.linux
@ -36,7 +37,7 @@ from cic_eth.eth import (
from cic_eth.admin import (
debug,
ctrl,
token
token,
)
from cic_eth.queue import (
query,
@ -75,7 +76,6 @@ argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address')
argparser.add_argument('--default-token-symbol', dest='default_token_symbol', type=str, help='Symbol of default token to use')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
@ -121,20 +121,25 @@ broker = config.get('CELERY_BROKER_URL')
if broker[:4] == 'file':
bq = tempfile.mkdtemp()
bp = tempfile.mkdtemp()
current_app.conf.update({
conf_update = {
'broker_url': broker,
'broker_transport_options': {
'data_folder_in': bq,
'data_folder_out': bq,
'data_folder_processed': bp,
},
},
)
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
logg.warning('celery broker dirs queue i/o {} processed {}, will NOT be deleted on shutdown'.format(bq, bp))
else:
current_app.conf.update({
'broker_url': broker,
})
conf_update = {
'broker_url': broker,
}
if config.true('CELERY_DEBUG'):
conf_update['result_extended'] = True
current_app.conf.update(conf_update)
result = config.get('CELERY_RESULT_URL')
if result[:4] == 'file':
@ -203,6 +208,11 @@ def main():
BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol)
default_token = ERC20Token(chain_spec, rpc, BaseTask.default_token_address)
default_token.load(rpc)
BaseTask.default_token_decimals = default_token.decimals
BaseTask.default_token_name = default_token.name
BaseTask.run_dir = config.get('CIC_RUN_DIR')
logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))

View File

@ -60,14 +60,16 @@ admin_api = AdminApi(None)
def main():
t = admin_api.registry()
registry = t.get()
print('Registry address: {}'.format(registry))
registry_address = t.get()
print('Registry: {}'.format(registry_address))
t = api.default_token()
token_info = t.get()
print('Default token symbol: {}'.format(token_info['symbol']))
print('Default token address: {}'.format(token_info['address']))
logg.debug('Default token name: {}'.format(token_info['name']))
logg.debug('Default token decimals: {}'.format(token_info['decimals']))
if __name__ == '__main__':
main()

View File

@ -33,6 +33,8 @@ class BaseTask(celery.Task):
create_gas_oracle = RPCGasOracle
default_token_address = None
default_token_symbol = None
default_token_name = None
default_token_decimals = None
run_dir = '/run'
def create_session(self):

View File

@ -1,3 +1,4 @@
[celery]
broker_url = redis://
result_url = redis://
debug = 0

View File

@ -1,3 +1,4 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379
debug = 0

View File

@ -0,0 +1,22 @@
@node cic-eth-accounts
@section Accounts
Accounts are private keys in the signer component keyed by "addresses," a one-way transformation of a public key. Data can be signed by using the account as identifier for corresponding RPC requests.
Any account to be managed by @code{cic-eth} must be created by the corresponding task. This is because @code{cic-eth} creates a @code{nonce} entry for each newly created account, and guarantees that every nonce will only be used once in its threaded environment.
The calling code receives the account address upon creation. It never receives or has access to the private key.
@subsection Signer RPC
The signer is expected to handle a subset of the standard JSON-RPC:
@table @code
@item personal_newAccount(password)
Creates a new account, returning the account address.
@item eth_signTransactions(tx_dict)
Sign the transaction represented as a dictionary.
@item eth_sign(address, message)
Signs an arbtirary message with the standard Ethereum prefix.
@end table

View File

@ -0,0 +1,60 @@
@node cic-eth system maintenance
@appendix Admin API
The admin API is still in an early stage of refinement. User friendliness can be considerably improved.
All of the API calls are celery task proxies, and return @code{Celery.AsyncResult} unless otherwise noted.
In contrast to the client API module, this API does not currently implement a pluggable callback.
@appendixsection registry
Returns the @code{ContractRegistry} this instance of @code{cic-eth-tasker} is running on.
@appendixsection proxy-do
Execute an arbitary JSON-RPC request using the @code{cic-eth-tasker} blockchain node RPC connection.
@appendixsection default_token
Returns the default token symbol and address.
@appendixsection lock
Set lock bits, globally or per address
@appendixsection unlock
Opposite of lock
@appendixsection get_lock
Get the current state of a lock
@appendixsection tag_account
Associate an identifier with an account address (@xref{cic-eth system accounts})
@appendixsection have_account
Check whether a private key exists in the keystore able to sign on behalf of the given account (it actually performs a signature).
@appendixsection resend
Clone or resend a transaction
@appendixsection check_nonce
Returns diagnostics for nonce sequences per account, e.g. detect nonce gaps that block execution of further transactions.
@appendixsection fix_nonce
Re-orders all nonces by shifting all transaction nonces after the given transaction down by one. This has the additional effect of obsoleting the given transaction. Can be used to close gaps in the nonce sequencing. Use with care!
@appendixsection account
Return brief transaction info lists per account
@appendixsection tx
Return a complex transaction metadata object for a single transaction. The object assembles state from both the blockchain node and the custodial queue system.

View File

@ -0,0 +1,18 @@
\input texinfo
@setfilename index.html
@settitle CIC custodial services reference deployment
@copying
Released 2021 under GPL3
@end copying
@titlepage
@title CIC custodial services reference deployment
@author Louis Holbrook
@end titlepage
@c
@contents
@include index.texi

View File

@ -0,0 +1,4 @@
@node cic-eth Appendix Task chains
@appendix Task chains
TBC - explain here how to generate these chain diagrams

View File

@ -0,0 +1,108 @@
@node cic-eth configuration
@section Configuration
(refer to @code{cic-base} for a general overview of the config pipeline)
Configuration parameters are grouped by configuration filename.
@subsection cic
@table @var
@item registry_address
Ethereum address of the @var{ContractRegistry} contract
@item chain_spec
String representation of the connected blockchain according to the @var{chainlib} @var{ChainSpec} format.
@item tx_retry_delay
Minimum time in seconds to wait before retrying a transaction
@item trust_address
Comma-separated list of one or more ethereum addresses regarded as trusted for describing other resources, Used by @var{cic-eth-registry} in the context of the @var{AddressDeclarator}.
@item defalt_token_symbol
Fallback token to operate on when no other context is given.
@item health_modules
Comma-separated list of methods to execute liveness tests against. (see ...)
@item run_dir
Directory to use for session-scoped variables for @var{cic-eth} daemon parent processes.
@end table
@subsection celery
@table @var
@item broker_url
Message broker URL
@item result_url
Result backend URL
@item debug
Boolean value. If set, the amount of available context for a task in the result backend will be maximized@footnote{This is a @emph{required} setting for the task graph documenter to enabled it to display task names in the graph}.
@end table
@subsection database
See ref cic-base when ready
@subsection eth
@table @var
@item provider
Address of default RPC endpoint for transactions and state queries.
@item gas_gifter_minimum_balance
The minimum gas balance that must be held by the @code{GAS GIFTER} token before the queue processing shuts down@footnote{You should really make sure that this threshold is never hit}
@end table
@subsection redis
Defines connection to the redis server used outside of the context of @var{celery}. This is usually the same server, but should be a different db.
@table @var
@item host
Redis hostname
@item port
Redis port
@item db
Redis db
@end table
@subsection signer
Parameters
@table @var
@item socket_path
The connection string for the signer JSON-RPC service.@footnote{The @var{crypto-dev-signer} supports UNIX socket or a HTTP(S) connections}
@item secret
If set, this password is used to add obfuscation on top of the encryption already applied by the signer for the keystore.
@end table
@subsection ssl
Certificate information for https api callbacks.
@table @var
@item enable_client
Boolean value. If set, client certificate will be used to authenticate the callback request.
@item cert_file
Client certificate file in PEM or DER format
@item key_file
Client key file in PEM or DER format
@item password
Password for unlocking the client key
@item ca_file
Certificate authority bundle, to verify the certificate sent by the callback server.
@end table
@subsection syncer
@table @var
@item loop_interval
Seconds to pause before each execution of the @var{chainsyncer} poll loop.
@end table

View File

@ -0,0 +1,46 @@
@node cic-eth-dependencies
@section Dependencies
This application is written in Python 3.8. It is tightly coupled with @code{python-celery}, which provides the task worker ecosystem. It also uses @code{SQLAlchemy} which provides useful abstractions for persistent storage though SQL, and @code{alembic} for database schema migrations.
There is currently also a somewhat explicit coupling with @code{Redis}, which is used as message broker for @code{python-celery}. @code{Redis} is also explicitly used by some CLI tools to retrieve results from command execution. This coupling may be relaxed in the future to allow other key-value pubsub solutions instead.
@subsection Generalized project dependencies
The core features are built around four main independent components that have been developed for the purpose of this project, but are separated and maintained as general-purpose libraries.
@table @samp
@item chainlib
A cross-chain library prototype that can provide encodings for transactions on a Solidity-based EVM contract network.
@item chainqueue
Queue manager that guarantees delivery of outgoing blockchain transactions.
@item chainsyncer
Monitors blockchains and guarantees execution of an arbitrary count of pluggable code objects for each block transaction.
@item crypto-dev-signer
An keystore capable of signing for the EVM chain through a standard Ethereum JSON-RPC interface.
@end table
@anchor{cic-eth-dependencies-smart-contracts}
@subsection Smart contract dependencies
The Smart contracts needed by the network must be discoverable through a single entry point called the Contract Registry. The contract registry is expected to reference itself in its records. The authenticity of the contract registry must be guaranteed by external sources of trust.
The contract registry maps contract addresses to well-known identifiers. The contracts are as follows:
@table @code
@item ContractRegistry (points to self)
Resolves plaintext identifiers to contract addresses.
@item AccountRegistry
An append-only store of accounts hosted by the custodial system
@item TokenRegistry
Unique symbol-to-address mappings for token contracts
@item AddressDeclarator
Reverse address to resource lookup
@item TokenAuthorization
Escrow contract for external spending on behalf of custodial users
@item Faucet
Called by newly created accounts to receive initial token balance
@end table
The dependency @code{cic-eth-registry} abstracts and facilitates lookups of resources on the blockchain network. In its current state it resolves tokens by symbol or address, and contracts by common-name identifiers. In the @code{cic-eth} code all lookups for EVM network resources will be performed through this dependency.

View File

@ -0,0 +1,49 @@
@node cic-eth-incoming
@section Incoming transactions
All transactions in mined blocks will be passed to a selection of plugin filters to the @code{chainsyncer} component. Each of these filters are individual python module files in @code{cic_eth.runnable.daemons.filters}. This section describes their function.
The status bits refer to the bits definining the @code{chainqueue} state.
@subsection tx
Looks up the transaction in the local queue, and if found it sets the @code{FINAL} state bit. If the contract code execution was unsuccessful, the @code{NETWORK ERROR} state bit is also set.
@subsection gas
If the transaction is a gas token transfer, it checks if the recipient is a custodial account awaiting gas refill to execute a transaction (the queue item will have the @code{GAS ISSUES} bit set). If this is the case, the transaction will be activated by setting the @code{QUEUED} bit.
@subsection register
If the transaction is an account registration@footnote{The contract keyed by @var{AccountRegistry} in the @var{ContractRegistry} contract}, a Faucet transaction will be triggered for the registered account@footnote{The faucet contract used in the reference implementation will verify whether the account calling it is registered in the @var{AccountRegistry}. Thus it cannot be called before the account registration has succeeded.}
@subsection callback
Executes, in order, Celery tasks defined in the configuration variable @var{TASKS_TRANSFER_CALLBACKS}. Each of these tasks are registered as individual filters in the @code{chainsyncer} component, with the corresponding execution guarantees.
The callbacks will receive the following arguments
@enumerate
@item @strong{result}
A complex representation of the transaction (see section ?)
@item @strong{transfertype}
A string describing the type of transaction found@footnote{See appendix ? for an overview of possible values}
@item @strong{status}
0 if contract code executed successfully. Any other value is an error@footnote{The values 1-1024 are reserved for system specific errors. In the current implementation only a general error state with value 1 is defined. See appendix ?.}
@end enumerate
@subsection transferauth
If a valid transfer authorization request has been made, a token @emph{allowance}@footnote{@code{approve} for ERC20 tokens} transaction is executed on behalf of the custodial account, with the @var{TransferAuthorization} contract as spender.
@subsection convert
If the transaction is a token conversion, @emph{and} there is a pending transfer registered for the conversion, the corresponding token transfer transaction will be executed. Not currently implemented

View File

@ -0,0 +1,14 @@
@top cic-eth
@include intro.texi
@include dependencies.texi
@include configuration.texi
@include system.texi
@include interacting.texi
@include outgoing.texi
@include incoming.texi
@include services.texi
@include tools.texi
@include admin.texi
@include chains.texi
@include transfertypes.texi

View File

@ -0,0 +1,109 @@
@node cic-eth-interacting
@section Interacting with the system
The API to the @var{cic-eth} component is a proxy for executing @emph{chains of Celery tasks}. The tasks that compose individual chains are documented in @ref{cic-eth Appendix Task chains,the Task Chain appendix}, which also describes a CLI tool that can generate graph representationso of them.
There are two API classes, @var{Api} and @var{AdminApi}. The former is described later in this section, the latter described in @ref{cic-eth system maintenance,the Admin API appendix}.
@subsection Interface
API calls are constructed by creating @emph{Celery task signatures} and linking them together, sequentially and/or in parallell. In turn, the tasks themselves may spawn other asynchronous tasks. This means that the code in @file{cic_eth.api.*} does not necessarily specify the full task graph that will be executed for any one command.
The operational guarantee that tasks will be executed, not forgotten, and retried under certain circumstances is deferred to @var{Celery}. On top of this, the @var{chainqueue} component ensures that semantic state changes that the @code{Celery} tasks ask of it are valid.
@anchor{cic-eth-locking}
@subsection Locking
All methods that make a change to the blockchain network must pass @emph{locking layer checks}. Locks may be applied on a global or per-address basis. Lock states are defined by a combination of bit flags. The implemented lock bits are:
@table @var
@item INIT
The system has not yet been initialized. In this state, writes are limited to creating unregistered accounts only.
@item QUEUE
Items may not be added to the queue
@item SEND
Queued items may not be attempted sent to the network
@item CREATE (global-only)
New accounts may not be created
@item STICKY
Until reset, no other part of the locking state can be reset
@end table
@subsection Callback
All API calls provide the option to attach a callback to the end of the task chain. This callback will be executed regardless of whether task chain execution succeeded or not.
Refer to @file{cic-eth.callbacks.noop.noop} for the expected callback signature.
@subsection API Methods that change state
@subsubsection create_account
Creates a new account in the keystore, optionally registering the account with the @var{AccountRegistry} contract.
@subsubsection transfer
Attempts to execute a token transaction between two addresses. It is the caller's responsibility to check whether the token balance is sufficient for the transactions.
@subsubsection refill_gas
Executes a gas token transfer to a custodial address from the @var{GAS GIFTER} system account.
@subsubsection convert
Converts a token to another token for the given custodial account. Currently not implemented.
@anchor{cic-eth-convert-and-transfer}
@subsubsection convert_and_transfer
Same as convert, but will automatically execute a token transfer to another custodial account when conversion has been completed. Currently not implemented.
@subsection Read-only API methods
@subsubsection balance
Retrieves a complex balance statement of a single account, including:
@itemize
@item The network balance at the current block height
@item Value reductions due to by pending outgoing transactions
@item Value increments due to by pending incoming transactions
@end itemize
Only the first of these balance items has guaranteed finality. The reduction by outgoing transaction can be reasonably be assumed to eventually become final. The same applies for the increment by incoming transaction, @emph{unless} the transfer is part of a multiple-transaction operation. For example, a @ref{cic-eth-convert-and-transfer,convert_and_transfer} operation may fail in the convert stage and/or may yield less tokens then expected after conversion.
@subsubsection list
Returns an aggregate iist of all token value changes for a given address. As not all value transfers are a result of literal value transfer contract calls (e.g. @var{transfer} and @var{transferFrom} in @var{ERC20}), this data may come from a number of sources, including:
@itemize
@item Literal value transfers within the custodial system
@item Literal value transfers from or to an external address
@item Faucet invocations (token minting)
@item Demurrage and redistribution built into the token contract
@end itemize
@subsubsection default_token
Return the symbol and address of the token used by default in the network.
@subsubsection ping
Convenience method for the caller to check whether the @var{cic-eth} engine is alive.

View File

@ -0,0 +1,74 @@
@node cic-eth-outgoing
@section Outgoing transactions
@strong{Important! A pre-requisite for proper functioning of the component is that no other agent is sending transactions to the network for any of the keys in the keystore.}
The term @var{state bit} refers to the bits definining the @code{chainqueue} state.
@subsection Lock
Any task that changes blockchain state @strong{must} apply a @code{QUEUE} lock for the address it operates on. This is to ensure that transactions are sent to the network in order.@footnote{If too many transactions arrive out of order to the blockchain node, it may arbitrarily prune those that cannot directly be included in a block. This puts unnecessary strain (and reliance) on the transaction retry mechanism.}
This lock will be released once the blockchain node confirms handover of the transaction.@footnote{This is the responsibility of the @var{dispatcher} service}
@subsection Nonce
A separate task step is executed for binding a transaction nonce to a Celery task root id, which uniquely identifies the task chain. This provides atomicity of the nonce across the parallell task environment, and also recoverability in case unexpected program interruption.
The nonce of a permanently failed task must be @emph{manually} unlocked. Celery tasks that involve nonces who permanently fail are to be considered @emph{critical anomalies} and should not happen. The queue locking mechanism is designed to prevent the amount of out-of-sequence transactions for an account to escalate.
@subsection Choosing fee prices
@code{cic-eth} uses the @code{chainlib} module to resolve gas price lookups.
Optimizing gas price discovery should be the responsibility of the chainlib layer. It already accommodates using an separate RPC for the @code{eth_gasPrice} call.@footnote{A sample implementation of a gas price tracker speaking JSON-RPC (also built using chainlib/chainsyncer) can be found at @url{https://gitlab.com/nolash/eth-stat-syncer}.}
@subsection Choosing gas limits
To determine the gas limit of a transaction, normally the EVM node will be used to perform a dry-run exection of the inputs against the current chain state.
As the current state of the custodial system should only rely on known, trusted contract bytecode, there is no real need for this mechanism. The @code{chainlib}-based contract interfaces are expected to provide a method call that return safe gas limit values for contract interactions.@footnote{Of course, this method call may in turn conceal more sophisticated gas limit heuristics.}
Note that it is still the responsibility of @code{cic-eth} to make sure that the gas limit of the network is sufficient to allow execution of all needed contracts.
@subsection Gas refills
If the gas balance of a custodial account is below a certain threshold, a gas refill task will be spawned. The gas will be transferred from the @code{GAS GIFTER} system account.
In the event that the balance is insufficient even for the imminent transaction@footnote{This will of course be the case when an account is first created, whereupon it has a balance of 0. The subsequent faucet call will spawn a gas refill task.}, execution of the transaction will be deferred until the gas refill transaction is completed. In this case the transaction will be marked with the @code{GAS ISSUES} state bit.
The value chosen for the gas refill threshold should ideally allow enough of a margin to avoid the need of deferring transactions in the future.
@subsection Queueing transactions
Once the lock, nonce and gas processing parts has been completed, the transaction will be queued for sending. This means that the @code{QUEUED} state bit is set. From here the @ref{cic-eth-services-dispatcher,dispatcher service} takes over responsibility.
@subsection Retrying transactions
There are three conditions create the need to defer and retry transactions.
The first is communication problems with the blockchain node itself, for example if it is overloaded or being restarted. As far as possible, retries of this nature will be left to the Celery task workers. There may be cases, however, where it is appropriate to hand the responsibility to the @code{chainqueue} instead. In this case, the queue item will have the @code{NODE ERROR} state bit set.
The second condition occurs when transactions take too long to be confirmed by the network. In this case, the transaction will be re-submitted, but with a higher gas price.
The third condition occurs when the blockchain node purges the transaction from the mempool before it is sent to the network. @code{cic-eth} does not distinguish this case from the second, as the issue is solved using the same mechanism.
@subsubsection Transaction obsoletion
"Re-submitting" a transaction means creating a transaction with a previously used nonce for an account address.
When this happens, The @code{chainqueue} will still contain all previous transactions with the same nonce. The transaction being superseded will have the @code{OBSOLETED} state bit set.
Once a transaction has been mined, all other transactions with the same node will have the @code{OBSOLETED} and @code{FINAL} state bits set.
@subsection Unexpected conditions
Any unexpected condition exposing the need for urgent code improvement and/or manual intervention will be signalled by marking the transaction with the @code{FUBAR} state bit set.

View File

@ -1,24 +0,0 @@
@node cic-eth
@chapter cic-eth
@section Overview
@code{cic-eth} is the heart of the custodial account component. It is a combination of python-celery task queues and daemons that sign, dispatch and monitor blockchain transactions, aswell as triggering tasks contingent on other transactions.
@subsection Dependencies
The @code{cic-registry} module is used as a cache for contracts and tokens on the network.
A web3 JSON-RPC service that transparently proxies a keystore and provides transaction and message signing. The current development version uses the python web3 middleware feature to route methodsi involving the keystore to the module @code{crypto-dev-signer}, which is hosted on @file{pypi.org}.
@subsection What does it do
@subsection Tasks
Two main categories exist for tasks, @code{eth} and @code{queue}.
The @code{eth} tasks provide means to construct and decode Ethereum transactions, as well as interfacing the underlying key store.
Tasks in the @code{queue} module operate on the state of transactions queued for processing by @code{cic-eth}.

View File

@ -0,0 +1,50 @@
@node cic-eth-services
@section Services
There are four daemons that together orchestrate all of the aforementioned recipes. This section will provide a high level description of them.
Each of them have their own set of command line flags. These are available in the CLI help text provided by @kbd{-h} @kbd{--help} and are not recited here.
Daemon executable scripts are located in the @file{cic_eth.runnable.daemons} package. If @var{cic-eth} is installed as a python package, they are installed as executables in @var{PATH}.
@subsection tasker
This is the heart of the custodial system. Tasker is the parent process for the celery workers executing all tasks interacting with and changing the state of the queue and the chain. It is also the only service that interfaces with the signer/keystore.
The other @var{cic-eth} daemons all interface with this component, along with any client adapter bridging an end-user gateway (e.g. @var{cic-ussd}). However, the service itself does not have to be actively running for the other services to run; @var{Celery} handles queueing up the incoming tasks until the @var{tasker} comes back online.@footnote{Whereas this is true, there is currently no fail-safe implemented to handles the event of task backlog overflow in Celery. Furthermore, no targeted testing has yet been performed to asses the stability of the system over time if a sudden, sustained surge of resumed task executions occurs. It may be advisable to suspend activity that adds new queue items to the system if volume is high and/or the @var{cic-eth} outage endures. However, there is no panacea for this condition, as every usage scenario is different}
The tasker has a set of pre-requisites that must be fulfilled before it will start
@itemize
@item It must be given a valid @var{ContractRegistry} address, which must include valid references to all contracts specified in @ref{cic-eth-dependencies-smart-contracts,Smart contract dependencies}
@item The gas gifter balance must be above the minimum threshold (See "eth" section in configurations).
@item There must be a valid alembic migration record in the storage database
@item The redis backend must be reachable and writable
@item There must be a reachable JSON-RPC server at the other end of the signer socket path (see "signer" section in configurations)
@end itemize
@subsection tracker
Implements the @var{chainsyncer}, and registers the filters described in @ref{cic-eth-incoming,Incoming Transactions} to be executed for every transaction. It consumes the appropriate @var{TASKS_TRANSFER_CALLBACKS} configuration setting to add externally defined filters at without having to change the daemon code.
The @var{tracker} has the same requisities for the @var{ContractRegistry} as the @var{tasker}.
@strong{Important! Guarantees of filter executions has some caveats. Refer to the @var{chainsyncer} documentation for more details.}
@anchor{cic-eth-services-dispatcher}
@subsection dispatcher
Uses the @code{get_upcoming_tx} method call from @var{chainqueue} to receive batches of queued transactions that are ready to send to the blockchain node. Every batch will only contain a single transaction by any one address, which will be the transaction with the next nonce not previously seen by the network. There is no limit currently set to how many transactions that will be included in a single batch.
@subsection retrier
The responsibility of the @var{retrier} is to re-queue transactions that failed to be sent to the blockchain node, as well as create @emph{replacements} for transactions whose processing by the network has been delayed. @strong{[refer transaction obolestion]}.
It is in turn the responsiblity of the @var{dispatcher} to send these (re-)queued transactions to the blockchain node.

View File

@ -0,0 +1,17 @@
@node cic-eth system accounts
@section System initialization
When the system starts for the first time, it is locked for any state change request other than account creation@footnote{Specifically, the @code{INIT}, @code{SEND} and @code{QUEUE} lock bits are set.}. These locks should be @emph{reset} once system initialization has been completed. Currently, system initialization only involves creating and tagging required system accounts, as specified below.
See @ref{cic-eth-locking,Locking} and @ref{cic-eth-tools-ctrl,ctrl in Tools} for details on locking.
@subsection System accounts
Certain accounts in the system have special roles. These are defined by @emph{tagging} certain accounts addresses with well-known identifiers.
@table @var
@item GAS_GIFTER
This account @strong{must} at all times have enough gas token to fund any custodial account address in need.
@item ACCOUNT_REGISTRY_WRITER
This account @strong{must} have access to add newly created account addresses to the (@xref{cic-eth-dependencies-smart-contracts,Smart contract dependencies})
@end table

View File

@ -0,0 +1,51 @@
@node cic-eth-tools
@section Tools
A collection of CLI tools have been provided to help with diagnostics and other administrative tasks. These use the same configuration infrastructure as the daemons.
Tool scripts are located in the @file{cic_eth.runnable} package. If @var{cic-eth} is installed as a python package, they are installed as executables in @var{PATH}.
@subsection info (cic-eth-info)
Returns self-explanatory metadata for the blockchain network, and optionally an address.
@subsection inspect (cic-eth-inspect)
Returns information about a specific resource related to the tranasaction queue. The results returned depend on the type of the argument.
@table @var
@item lock
If the argument is the literal string @kbd{lock}, it will list all active lock settings currently in effect. (@xref{cic-eth-locking})
@item <address>
If the argument is a 0x-prefixed hex string of 42 characters, it returns all transactions where the specified address is a sender or recipient@footnote{If the address is the gas gifter or the accounts index writer, this may be a @emph{lot} of transactions. Use with care!}
@item <tx_hash>
If the argument is a 0x-prefixed hex string of 66 characters, it returns data from the custodial queueing system aswell as the network for a single transaction whose hash matches the input. Fails if the transaction does not exist in the queue
@item <code>
If the argument is a 0x-prefixed hex string longer than 66 bytes, the argument will be interpreted as raw RLP serialized transaction data, and attempt to match this with an entry in the queue. If a match is found, the result is the same as for @var{<tx_hash>}
@end table
@subsection create (cic-eth-create)
Create a new account, optionally registering the account in the accounts registry, and optionally receiving the newly created address through a redis subscription.
@subsection transfer (cic-eth-transfer)
Execute a token transfer on behalf of a custodial account.
@subsection tag (cic-eth-tag)
Associate an account address with a string identifier. @xref{cic-eth system accounts}
@anchor{cic-eth-tools-ctrl}
@subsection ctrl (cic-eth-ctrl)
Set or reset lock bits, globally or per account address.
@subsection resend (cic-eth-resend)
Resend a transaction. This can either be done "in-place," which means increasing the gas price and re-queueing@footnote{this is the same thing that the retrier does}. It can also be used to @emph{clone} a transaction, which obviously will duplicate the effect of the cloned transaction on the blockchain network.

View File

@ -0,0 +1,11 @@
@node cic-eth Appendix Transaction types
@appendix Transfer types
@table @var
@item transfer
A regular token transfer, e.g. ERC20 @code{transfer}
@item transferfrom
A token transfer performed on behalf of another party, e.g. ERC20 @code{transferFrom}
@item tokengift
Result of a successful faucet request.
@end table

View File

@ -1,4 +1,4 @@
[AFRICASTALKING]
api_username = foo
api_key = bar
api_sender_id = baz
api_username =
api_key =
api_sender_id =

View File

@ -120,7 +120,7 @@ class MetadataRequestsHandler(Metadata):
data = json.loads(response_data.decode('utf-8'))
if result.status_code == 200 and self.cic_type == ':cic.person':
person = Person()
deserialized_person = person.deserialize(person_data=json.loads(data))
deserialized_person = person.deserialize(person_data=data)
data = json.dumps(deserialized_person.serialize())
cache_data(self.metadata_pointer, data=data)
logg.debug(f'caching: {data} with key: {self.metadata_pointer}')

View File

@ -325,6 +325,14 @@ def process_menu_interaction_requests(chain_str: str,
# get user
user = Account.session.query(Account).filter_by(phone_number=phone_number).first()
# retrieve and cache user's metadata
blockchain_address = user.blockchain_address
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
# find any existing ussd session
existing_ussd_session = UssdSession.session.query(UssdSession).filter_by(
external_session_id=external_session_id).first()

View File

@ -371,13 +371,6 @@ def process_start_menu(display_key: str, user: Account):
# get operational balance
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's metadata
s_query_person_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_person_metadata',
[blockchain_address]
)
s_query_person_metadata.apply_async(queue='cic-ussd')
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)

View File

@ -1,7 +1,7 @@
en:
account_successfully_created: |-
Hello, you have been registered on Sarafu Network! Your balance is %{balance} %{token_symbol}. To use dial *483*46#. For help 0757628885.
You have been registered on Sarafu Network! To use dial *384*96# on Safaricom and *483*96# on other networks. For help %{support_phone}.
received_tokens: |-
Successfully received %{amount} %{token_symbol} from %{tx_sender_information} %{timestamp}. New balance is %{balance} %{token_symbol}.
terms: |-
By using the service, you agree to the terms and conditions at https://www.grassrootseconomics.org/terms-and-conditions.
By using the service, you agree to the terms and conditions at http://grassecon.org/tos

View File

@ -1,7 +1,7 @@
sw:
account_successfully_created: |-
Habari, umesajiliwa kwa huduma ya sarafu! Salio lako ni %{token_symbol} %{balance}. Kutumia bonyeza *483*46#. Kwa Usaidizi 0757628885.
Umesajiliwa kwa huduma ya Sarafu! Kutumia bonyeza *384*96# Safaricom ama *483*46# kwa utandao tofauti. Kwa Usaidizi %{support_phone}.
received_tokens: |-
Umepokea %{amount} %{token_symbol} kutoka kwa %{tx_sender_information} %{timestamp}. Salio la %{token_symbol} ni %{balance}.
terms: |-
Kwa kutumia hii huduma, umekubali sheria na masharti yafuatayo https://www.grassrootseconomics.org/terms-and-conditions.
Kwa kutumia hii huduma, umekubali sheria na masharti yafuatayo http://grassecon.org/tos

View File

@ -1,29 +1,30 @@
en:
kenya:
initial_language_selection: |-
CON Welcome to Sarafu
CON Welcome to Sarafu Network
1. English
2. Kiswahili
3. Help
initial_pin_entry: |-
CON Please enter a PIN to manage your account.
CON Please enter a new four number PIN for your account.
0. Back
initial_pin_confirmation: |-
CON Enter your PIN again
CON Enter your four number PIN again
0. Back
enter_given_name: |-
CON Enter first name
0. Back
enter_family_name: |-
CON Enter last name
CON Enter family name
0. Back
enter_gender: |-
CON Enter gender
1. Male
2. Female
3. Other
0. Back
enter_location: |-
CON Enter location
CON Enter your location
0. Back
enter_products: |-
CON Please enter a product or service you offer
@ -83,34 +84,34 @@ en:
Please enter your PIN to confirm.
0. Back
retry: |-
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining.
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining
0. Back
display_metadata_pin_authorization:
first: |-
CON Please enter your PIN.
CON Please enter your PIN
0. Back
retry: |-
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining.
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining
0. Back
account_balances_pin_authorization:
first: |-
CON Please enter your PIN to view balances.
CON Please enter your PIN to view balances
0. Back
retry: |-
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining.
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining
0. Back
account_statement_pin_authorization:
first: |-
CON Please enter your PIN to view statement.
CON Please enter your PIN to view statement
0. Back
retry: |-
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining.
CON Please enter your PIN. You have %{remaining_attempts} attempts remaining
0. Back
account_balances: |-
CON Your balances are as follows:
balance: %{operational_balance} %{token_symbol}
taxes: %{tax} %{token_symbol}
bonsuses: %{bonus} %{token_symbol}
fees: %{tax} %{token_symbol}
rewards: %{bonus} %{token_symbol}
0. Back
first_transaction_set: |-
CON %{first_transaction_set}
@ -140,9 +141,9 @@ en:
exit_pin_blocked: |-
END Your PIN has been blocked. For help, please call %{support_phone}.
exit_invalid_pin: |-
END The PIN you have entered is Invalid. PIN must consist of 4 digits. For help, call %{support_phone}.
END The PIN you have entered is invalid. PIN must consist of 4 digits. For help, call %{support_phone}.
exit_invalid_new_pin: |-
END The PIN you have entered is Invalid. PIN must be different from your current PIN. For help, call %{support_phone}.
END The PIN you have entered is invalid. PIN must be different from your current PIN. For help, call %{support_phone}.
exit_pin_mismatch: |-
END The new PIN does not match the one you entered. Please try again. For help, call %{support_phone}.
exit_invalid_recipient: |-
@ -169,4 +170,4 @@ en:
00. Back
99. Exit
account_creation_prompt: |-
Your account is being created. You will receive an SMS when your account is ready.
Your account is being created. You will receive an SMS when your account is ready.

View File

@ -89,7 +89,12 @@ After this step is run, you can find top-level ethereum addresses (like the cic
#### Custodial provisions
response_data = send_ussd_request(address, self.data_dir)
state = response_data[:3]
out = response_data[4:]
m = '{} {}'.format(state, out[:7])
if m != 'CON Welcome':
raise VerifierError(response_data, 'ussd')
This step is _only_ needed if you are importing using `cic_eth` or `cic_ussd`
`RUN_MASK=2 docker-compose up contract-migration`
@ -104,8 +109,8 @@ If importing using `cic_eth` or `cic_ussd` also run:
* cic-eth-retrier
If importing using `cic_ussd` also run:
* cic-ussd-tasker
* cic-ussd-server
* cic-user-tasker
* cic-user-ussd-server
* cic-notify-tasker
If metadata is to be imported, also run:
@ -169,6 +174,26 @@ In second terminal:
`python cic_ussd/import_users.py -v -c config out`
##### Importing pins and ussd data (optional)
Once the user imports are complete the next step should be importing the user's pins and auxiliary ussd data. This can be done in 3 steps:
In one terminal run:
`python create_import_pins.py -c config -v --userdir <path to the users export dir tree> pinsdir <path to pin export dir tree>`
This script will recursively walk through all the directories defining user data in the users export directory and generate a csv file containing phone numbers and password hashes generated using fernet in a manner reflecting the nature of said hashes in the old system.
This csv file will be stored in the pins export dir defined as the positional argument.
Once the creation of the pins file is complete, proceed to import the pins and ussd data as follows:
- To import the pins:
`python cic_ussd/import_pins.py -c config -v pinsdir <path to pin export dir tree>`
- To import ussd data:
`python cic_ussd/import_ussd_data.py -c config -v userdir <path to the users export dir tree>`
The balance script is a celery task worker, and will not exit by itself in its current version. However, after it's done doing its job, you will find "reached nonce ... exiting" among the last lines of the log.
The connection parameters for the `cic-ussd-server` is currently _hardcoded_ in the `import_users.py` script file.

View File

@ -171,6 +171,7 @@ if __name__ == '__main__':
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
logg.debug('deserializing {} {}'.format(filepath, o))
u = Person.deserialize(o)
new_address = register_eth(i, u)

View File

@ -0,0 +1,50 @@
# standard imports
import sys
import os
import logging
import json
# external imports
import celery
import confini
# local imports
from cic_eth.api import Api
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
config_dir = os.path.join(script_dir, '..', 'config')
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'), result_extended=True)
class Fmtr(celery.utils.graph.GraphFormatter):
def label(self, obj):
super(Fmtr, self).label(obj)
if obj != None:
if obj.name == None:
raise RuntimeError('task name is not defined. Did you run celery with result_extended=True?')
return obj.name
def main():
api = Api(
config.get('CIC_CHAIN_SPEC'),
queue='cic-eth',
#callback_param='{}:{}:{}:{}'.format(args.redis_host_callback, args.redis_port_callback, redis_db, redis_channel),
#callback_task='cic_eth.callbacks.redis.redis',
#callback_queue=args.q,
)
t = api.create_account(register=False)
t.get_leaf()
t.build_graph(intermediate=True, formatter=Fmtr()).to_dot(sys.stdout)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,70 @@
# standard import
import argparse
import csv
import logging
import os
# third-party imports
import celery
import confini
# local imports
from import_task import *
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = './config'
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
arg_parser.add_argument('--env-prefix',
default=os.environ.get('CONFINI_ENV_PREFIX'),
dest='env_prefix',
type=str,
help='environment prefix for variables to overwrite configuration')
arg_parser.add_argument('-q', type=str, default='cic-import-ussd', help='celery queue to submit transaction tasks to')
arg_parser.add_argument('-v', help='be verbose', action='store_true')
arg_parser.add_argument('-vv', help='be more verbose', action='store_true')
arg_parser.add_argument('pins_dir', default='out', type=str, help='user export directory')
args = arg_parser.parse_args()
# set log levels
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
# process configs
config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def main():
with open(f'{args.pins_dir}/pins.csv') as pins_file:
phone_to_pins = [tuple(row) for row in csv.reader(pins_file)]
s_import_pins = celery.signature(
'import_task.set_pins',
(db_configs, phone_to_pins),
queue=args.q
)
s_import_pins.apply_async()
argv = ['worker', '-Q', 'cic-import-ussd', '--loglevel=DEBUG']
celery_app.worker_main(argv)
if __name__ == '__main__':
main()

View File

@ -8,6 +8,8 @@ import json
# external imports
import celery
import psycopg2
from psycopg2 import extras
from hexathon import (
strip_0x,
add_0x,
@ -53,7 +55,7 @@ class MetadataTask(ImportTask):
def meta_url(self):
scheme = 'http'
if self.meta_ssl:
scheme += s
scheme += 's'
url = urllib.parse.urlparse('{}://{}:{}/{}'.format(scheme, self.meta_host, self.meta_port, self.meta_path))
return urllib.parse.urlunparse(url)
@ -91,7 +93,6 @@ def resolve_phone(self, phone):
def generate_metadata(self, address, phone):
old_address = old_address_from_phone(self.import_dir, phone)
logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> foo')
logg.debug('address {}'.format(address))
old_address_upper = strip_0x(old_address).upper()
metadata_path = '{}/old/{}/{}/{}.json'.format(
@ -216,3 +217,60 @@ def send_txs(self, nonce):
return nonce
@celery_app.task
def set_pins(config: dict, phone_to_pins: list):
# define db connection
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
port=config.get('port'),
user=config.get('user'),
password=config.get('password')
)
db_cursor = db_conn.cursor()
# update db
for element in phone_to_pins:
sql = 'UPDATE account SET password_hash = %s WHERE phone_number = %s'
db_cursor.execute(sql, (element[1], element[0]))
logg.debug(f'Updating: {element[0]} with: {element[1]}')
# commit changes
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()
@celery_app.task
def set_ussd_data(config: dict, ussd_data: dict):
# define db connection
db_conn = psycopg2.connect(
database=config.get('database'),
host=config.get('host'),
port=config.get('port'),
user=config.get('user'),
password=config.get('password')
)
db_cursor = db_conn.cursor()
# process ussd_data
account_status = 1
if ussd_data['is_activated'] == 1:
account_status = 2
preferred_language = ussd_data['preferred_language']
phone_number = ussd_data['phone']
sql = 'UPDATE account SET account_status = %s, preferred_language = %s WHERE phone_number = %s'
db_cursor.execute(sql, (account_status, preferred_language, phone_number))
# commit changes
db_conn.commit()
# close connections
db_cursor.close()
db_conn.close()

View File

@ -87,6 +87,13 @@ chain_str = str(chain_spec)
batch_size = args.batch_size
batch_delay = args.batch_delay
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
def build_ussd_request(phone, host, port, service_code, username, password, ssl=False):
@ -135,57 +142,60 @@ if __name__ == '__main__':
for y in x[2]:
if y[len(y)-5:] != '.json':
continue
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
# handle json containing person object
filepath = None
if y[:15] != '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
u = Person.deserialize(o)
new_address = register_ussd(i, u)
new_address = register_ussd(i, u)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
)
s_phone = celery.signature(
'import_task.resolve_phone',
[
phone,
],
queue='cic-import-ussd',
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_meta = celery.signature(
'import_task.generate_metadata',
[
phone,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_balance = celery.signature(
'import_task.opening_balance_tx',
[
phone,
i,
],
queue='cic-import-ussd',
)
s_meta.link(s_balance)
s_phone.link(s_meta)
s_phone.apply_async(countdown=7) # block time plus a bit of time for ussd processing
s_meta.link(s_balance)
s_phone.link(s_meta)
# block time plus a bit of time for ussd processing
s_phone.apply_async(countdown=7)
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
i += 1
sys.stdout.write('imported {} {}'.format(i, u).ljust(200) + "\r")
j += 1
if j == batch_size:
time.sleep(batch_delay)
j = 0
#fi.close()

View File

@ -0,0 +1,70 @@
# standard imports
import argparse
import json
import logging
import os
# external imports
import celery
from confini import Config
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = '/usr/local/etc/cic'
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='config file')
arg_parser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('user_dir', type=str, help='path to users export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config_dir = args.c
config = Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
user_old_dir = os.path.join(args.user_dir, 'old')
os.stat(user_old_dir)
db_configs = {
'database': config.get('DATABASE_NAME'),
'host': config.get('DATABASE_HOST'),
'port': config.get('DATABASE_PORT'),
'user': config.get('DATABASE_USER'),
'password': config.get('DATABASE_PASSWORD')
}
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
if y[len(y) - 5:] != '.json':
continue
# handle ussd_data json object
if y[:15] == '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
ussd_data = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
s_set_ussd_data = celery.signature(
'import_task.set_ussd_data',
[db_configs, ussd_data]
)
s_set_ussd_data.apply_async(queue='cic-import-ussd')

View File

@ -0,0 +1,90 @@
# standard imports
import argparse
import json
import logging
import os
# third-party imports
import bcrypt
import celery
import confini
import phonenumbers
import random
from cic_types.models.person import Person
from cryptography.fernet import Fernet
# local imports
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__))
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(script_dir, 'config'))
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-c', type=str, default=default_config_dir, help='Config dir')
arg_parser.add_argument('-v', action='store_true', help='Be verbose')
arg_parser.add_argument('-vv', action='store_true', help='Be more verbose')
arg_parser.add_argument('--userdir', type=str, help='path to users export dir tree')
arg_parser.add_argument('pins_dir', type=str, help='path to pin export dir tree')
args = arg_parser.parse_args()
if args.v:
logg.setLevel(logging.INFO)
elif args.vv:
logg.setLevel(logging.DEBUG)
config = confini.Config(args.c, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
logg.info('loaded config\n{}'.format(config))
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
user_dir = args.userdir
pins_dir = args.pins_dir
def generate_password_hash():
key = Fernet.generate_key()
fnt = Fernet(key)
pin = str(random.randint(1000, 9999))
return fnt.encrypt(bcrypt.hashpw(pin.encode('utf-8'), bcrypt.gensalt())).decode()
user_old_dir = os.path.join(user_dir, 'old')
logg.debug(f'reading user data from: {user_old_dir}')
pins_file = open(f'{pins_dir}/pins.csv', 'w')
if __name__ == '__main__':
for x in os.walk(user_old_dir):
for y in x[2]:
# skip non-json files
if y[len(y) - 5:] != '.json':
continue
# define file path for
filepath = None
if y[:15] != '_ussd_data.json':
filepath = os.path.join(x[0], y)
f = open(filepath, 'r')
try:
o = json.load(f)
except json.decoder.JSONDecodeError as e:
f.close()
logg.error('load error for {}: {}'.format(y, e))
continue
f.close()
u = Person.deserialize(o)
phone_object = phonenumbers.parse(u.tel)
phone = phonenumbers.format_number(phone_object, phonenumbers.PhoneNumberFormat.E164)
password_hash = generate_password_hash()
pins_file.write(f'{phone},{password_hash}\n')
logg.info(f'Writing phone: {phone}, password_hash: {password_hash}')
pins_file.close()

View File

@ -105,7 +105,7 @@ def genId(addr, typ):
def genDate():
ts = random.randint(ts_then, ts_now)
return datetime.datetime.fromtimestamp(ts).timestamp()
return int(datetime.datetime.fromtimestamp(ts).timestamp())
def genPhone():
@ -130,6 +130,7 @@ def genCats():
def genAmount():
return random.randint(0, gift_max) * gift_factor
def genDob():
dob_src = fake.date_of_birth(minimum_age=15)
dob = {}
@ -168,8 +169,9 @@ def gen():
}
p.location['area_name'] = city
if random.randint(0, 1):
p.identities['latitude'] = (random.random() + 180) - 90 #fake.local_latitude()
p.identities['longitude'] = (random.random() + 360) - 180 #fake.local_latitude()
p.location['latitude'] = (random.random() + 180) - 90 #fake.local_latitude()
p.location['longitude'] = (random.random() + 360) - 180 #fake.local_latitude()
return (old_blockchain_checksum_address, phone, p)
@ -191,6 +193,7 @@ def prepareLocalFilePath(datadir, address):
if __name__ == '__main__':
base_dir = os.path.join(user_dir, 'old')
ussd_dir = os.path.join(user_dir, 'ussd')
os.makedirs(base_dir, exist_ok=True)
fa = open(os.path.join(user_dir, 'balances.csv'), 'w')
@ -210,11 +213,23 @@ if __name__ == '__main__':
print(o)
ussd_data = {
'phone': phone,
'is_activated': 1,
'preferred_language': random.sample(['en', 'sw'], 1)[0],
'is_disabled': False
}
d = prepareLocalFilePath(base_dir, uid)
f = open('{}/{}'.format(d, uid + '.json'), 'w')
json.dump(o.serialize(), f)
f.close()
d = prepareLocalFilePath(ussd_dir, uid)
x = open('{}/{}'.format(d, uid + '_ussd_data.json'), 'w')
json.dump(ussd_data, x)
x.close()
pidx = genPhoneIndex(phone)
d = prepareLocalFilePath(os.path.join(user_dir, 'phone'), pidx)
f = open('{}/{}'.format(d, pidx), 'w')

View File

@ -23,7 +23,7 @@ from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.gas import RPCGasOracle
from chainlib.eth.nonce import RPCNonceOracle
from cic_types.processor import generate_metadata_pointer
from eth_accounts_index import AccountRegistry
from eth_accounts_index.registry import AccountRegistry
from eth_contract_registry import Registry
from crypto_dev_signer.keystore.dict import DictKeystore
from crypto_dev_signer.eth.signer.defaultsigner import ReferenceSigner as EIP155Signer

View File

@ -1,52 +1,37 @@
# standard imports
import argparse
import copy
import hashlib
import json
import logging
import os
import sys
import logging
import time
import argparse
import sys
import re
import hashlib
import csv
import json
import urllib
import copy
import uuid
import urllib.request
import uuid
# external imports
import celery
import eth_abi
import confini
from hexathon import (
strip_0x,
add_0x,
)
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver import HeadSyncer
import eth_abi
from chainlib.chain import ChainSpec
from chainlib.eth.address import to_checksum_address
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainlib.hash import keccak256_string_to_hex
from chainlib.eth.address import to_checksum_address
from chainlib.eth.gas import (
OverrideGasOracle,
balance,
)
OverrideGasOracle,
balance,
)
from chainlib.eth.tx import TxFactory
from chainlib.hash import keccak256_string_to_hex
from chainlib.jsonrpc import jsonrpc_template
from chainlib.eth.error import EthException
from cic_types.models.person import (
Person,
generate_metadata_pointer,
)
Person,
generate_metadata_pointer,
)
from erc20_faucet import Faucet
from eth_erc20 import ERC20
from hexathon.parse import strip_0x, add_0x
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@ -72,6 +57,7 @@ eth_tests = [
phone_tests = [
'ussd',
'ussd_pins'
]
all_tests = eth_tests + custodial_tests + metadata_tests + phone_tests
@ -171,6 +157,39 @@ if logg.isEnabledFor(logging.DEBUG):
outfunc = logg.debug
def send_ussd_request(address, data_dir):
upper_address = strip_0x(address).upper()
f = open(os.path.join(
data_dir,
'new',
upper_address[:2],
upper_address[2:4],
upper_address + '.json',
), 'r'
)
o = json.load(f)
f.close()
p = Person.deserialize(o)
phone = p.tel
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': phone,
'text': '',
}
req = urllib.request.Request(config.get('_USSD_PROVIDER'))
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
return response.read().decode('utf-8')
class VerifierState:
def __init__(self, item_keys, active_tests=None):
@ -354,42 +373,18 @@ class Verifier:
def verify_ussd(self, address, balance=None):
upper_address = strip_0x(address).upper()
f = open(os.path.join(
self.data_dir,
'new',
upper_address[:2],
upper_address[2:4],
upper_address + '.json',
), 'r'
)
o = json.load(f)
f.close()
p = Person.deserialize(o)
phone = p.tel
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': phone,
'text': config.get('APP_SERVICE_CODE'),
}
req = urllib.request.Request(config.get('_USSD_PROVIDER'))
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
response_data = send_ussd_request(address, self.data_dir)
state = response_data[:3]
out = response_data[4:]
m = '{} {}'.format(state, out[:7])
if m != 'CON Welcome':
raise VerifierError(response_data, 'ussd')
def verify_ussd_pins(self, address, balance):
response_data = send_ussd_request(address, self.data_dir)
if response_data[:11] != 'CON Balance':
raise VerifierError(response_data, 'pins')
def verify(self, address, balance, debug_stem=None):

View File

@ -248,6 +248,7 @@ services:
BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
CELERY_DEBUG: ${CELERY_DEBUG:-1}
SIGNER_SOCKET_PATH: ${SIGNER_SOCKET_PATH:-ipc:///run/crypto-dev-signer/jsonrpc.ipc}
SIGNER_SECRET: ${SIGNER_SECRET:-deadbeef}
ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER: ${DEV_ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER:-0xACB0BC74E1686D62dE7DC6414C999EA60C09F0eA}