Compare commits

..

33 Commits

Author SHA1 Message Date
Spencer Ofwiti
8560761861 Change database name. 2021-03-10 19:28:16 +03:00
f6a7956fdf Update .cic-template.yml 2021-03-10 03:53:58 +00:00
562292bd01 Merge branch 'kaniko-builds' into 'master'
Update ci_templates/.cic-template.yml

See merge request grassrootseconomics/cic-internal-integration!61
2021-03-10 03:48:17 +00:00
3cdf7b9965 Update ci_templates/.cic-template.yml 2021-03-10 03:48:17 +00:00
16d88d389b move envlist to dockercontainer 2021-03-09 16:54:33 -08:00
15445b8d0f Merge branch 'cic-ussd-reqfix' into 'master'
cic types

See merge request grassrootseconomics/cic-internal-integration!59
2021-03-07 20:55:16 +00:00
bb90ceea0b cic types 2021-03-07 12:54:41 -08:00
8904e2abb1 Merge branch 'cic-ussd-req' into 'master'
revert the new requirements for ussd

See merge request grassrootseconomics/cic-internal-integration!58
2021-03-07 20:27:03 +00:00
94d3e61d0c revert the new requirements for ussd 2021-03-07 12:25:36 -08:00
nolash
fc08f3d17a bump cic base for cic ussd 2021-03-07 20:35:26 +01:00
Louis Holbrook
8da5219290 Merge branch 'lash/cli-rehabilitations' into 'master'
Rehabilitate CLI and API after nonce changes

See merge request grassrootseconomics/cic-internal-integration!57
2021-03-07 18:01:44 +00:00
Louis Holbrook
5f01135b04 Rehabilitate CLI and API after nonce changes 2021-03-07 18:01:44 +00:00
Louis Holbrook
543c6249b9 Merge branch 'lash/retry-on-signer-error' into 'master'
Retry on signer error

See merge request grassrootseconomics/cic-internal-integration!55
2021-03-07 13:51:59 +00:00
Louis Holbrook
db4f8f8955 Retry on signer error 2021-03-07 13:51:59 +00:00
Louis Holbrook
0c45e12ce1 Merge branch 'lash/ussd-cli' into 'master'
Add ussd cli

See merge request grassrootseconomics/cic-internal-integration!54
2021-03-06 22:27:40 +00:00
nolash
4f1014c5e1 Add ussd cli 2021-03-06 22:25:57 +01:00
Louis Holbrook
1bfc1434b4 Merge branch 'lash/cic-eth-upgrade' into 'master'
Upgrade cic-eth version

See merge request grassrootseconomics/cic-internal-integration!53
2021-03-06 20:00:10 +00:00
nolash
7b9cd2d4b8 Upgrade cic-eth version 2021-03-06 20:58:35 +01:00
Louis Holbrook
30febbd1e0 Merge branch 'lash/transfer-authorization' into 'master'
cic-eth: Make nonce separate task

See merge request grassrootseconomics/cic-internal-integration!52
2021-03-06 17:55:51 +00:00
Louis Holbrook
f0088f20de cic-eth: Make nonce separate task 2021-03-06 17:55:51 +00:00
Louis Holbrook
618769a0d2 Merge branch 'lash/cic-cache-biiig-num' into 'master'
cic-cache: Set value db fields in cic_cache to handle biiig numbers

See merge request grassrootseconomics/cic-internal-integration!51
2021-03-05 20:03:52 +00:00
nolash
e0a980363c Set value db fields in cic_cache to handle biiig numbers 2021-03-05 20:45:01 +01:00
9d36a5f92f Merge branch 'philip/dry-run-fixes' into 'master'
Get that "ussd_menu.json" out there.

See merge request grassrootseconomics/cic-internal-integration!50
2021-03-05 19:22:44 +00:00
2fe6f4125f Get that "ussd_menu.json" out there. 2021-03-05 22:09:22 +03:00
b5c50b348d Merge branch 'philip/dry-run-prep' into 'master'
Philip/dry run prep

See merge request grassrootseconomics/cic-internal-integration!49
2021-03-05 16:28:08 +00:00
ea336283dc Philip/dry run prep 2021-03-05 16:28:07 +00:00
aa99b16ad2 Merge branch 'lash/fix-tx-list' into 'master'
Make tx listing tasks work properly

See merge request grassrootseconomics/cic-internal-integration!45
2021-03-04 16:47:13 +00:00
Louis Holbrook
1e7fff0133 Minor refactors:
- Renames s_assemble to s_brief
-  Link s_local to s_brief
2021-03-04 16:47:13 +00:00
21c9d95c4b Merge branch 'lash/transfer-authorization' into 'master'
cic-eth: Introduce transfer authorization contract

See merge request grassrootseconomics/cic-internal-integration!47
2021-03-04 15:06:15 +00:00
Louis Holbrook
8240e58c0a cic-eth: Introduce transfer authorization contract 2021-03-04 15:06:14 +00:00
1e9bf6b4d3 Update .cic-template.yml 2021-03-04 14:39:32 +00:00
7e089e1083 Merge branch 'bvander/cic-meta-entrypoint-update' into 'master'
meta isn't using the compose command its in the the Dockerfile

See merge request grassrootseconomics/cic-internal-integration!42
2021-03-03 16:26:20 +00:00
f940d4a961 meta isn't using this. The entrypoin is in the Dockerfile 2021-02-24 06:47:34 -08:00
187 changed files with 4054 additions and 1697 deletions

View File

@@ -1,5 +1,5 @@
[database]
NAME=cic-eth
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost

View File

@@ -29,8 +29,8 @@ def upgrade():
sa.Column('source_token', sa.String(42), nullable=False),
sa.Column('destination_token', sa.String(42), nullable=False),
sa.Column('success', sa.Boolean, nullable=False),
sa.Column('from_value', sa.BIGINT(), nullable=False),
sa.Column('to_value', sa.BIGINT(), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=False),
sa.Column('date_block', sa.DateTime, nullable=False),
)
op.create_table(

View File

@@ -1,5 +1,5 @@
[database]
NAME=cic-eth
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost

View File

@@ -10,12 +10,15 @@ from cic_registry import zero_address
from cic_eth.db.enum import LockEnum
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.lock import Lock
from cic_eth.task import (
CriticalSQLAlchemyTask,
)
from cic_eth.error import LockedError
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
@@ -33,7 +36,7 @@ def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks
@@ -51,7 +54,7 @@ def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
"""Task wrapper to set send lock
@@ -67,7 +70,7 @@ def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_str, address=zero_address):
"""Task wrapper to reset send lock
@@ -83,7 +86,7 @@ def unlock_send(chained_input, chain_str, address=zero_address):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
"""Task wrapper to set queue direct lock
@@ -99,7 +102,7 @@ def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_str, address=zero_address):
"""Task wrapper to reset queue direct lock
@@ -115,7 +118,7 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_str, lock_flags, address=None):
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)

View File

@@ -1,12 +1,25 @@
# standard imports
import datetime
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
from cic_eth.db.models.base import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask
celery_app = celery.current_app
@celery_app.task()
def out_tmp(tag, txt):
f = open('/tmp/err.{}.txt'.format(tag), "w")
f.write(txt)
f.close()
@celery_app.task(base=CriticalSQLAlchemyTask)
def alert(chained_input, tag, txt):
session = SessionBase.create_session()
o = Debug(tag, txt)
session.add(o)
session.commit()
session.close()
return chained_input

View File

@@ -1,5 +1,6 @@
# standard imports
import logging
import sys
# third-party imports
import celery
@@ -317,6 +318,8 @@ class AdminApi:
:return: Transaction details
:rtype: dict
"""
problems = []
if tx_hash != None and tx_raw != None:
ValueError('Specify only one of hash or raw tx')
@@ -444,10 +447,12 @@ class AdminApi:
r = c.w3.eth.getTransactionReceipt(tx_hash)
if r.status == 1:
tx['network_status'] = 'Confirmed'
tx['block'] = r.blockNumber
tx['tx_index'] = r.transactionIndex
else:
tx['network_status'] = 'Reverted'
tx['network_block_number'] = r.blockNumber
tx['network_tx_index'] = r.transactionIndex
if tx['block_number'] == None:
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
except web3.exceptions.TransactionNotFound:
pass
@@ -469,4 +474,9 @@ class AdminApi:
t = s.apply_async()
tx['status_log'] = t.get()
if len(problems) > 0:
sys.stderr.write('\n')
for p in problems:
sys.stderr.write('!!!{}\n'.format(p))
return tx

View File

@@ -82,6 +82,7 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
@@ -92,6 +93,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -110,7 +116,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -137,6 +144,7 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
@@ -147,6 +155,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -165,7 +178,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -200,6 +214,13 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
from_address,
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -217,7 +238,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_transfer.link(self.callback_success)
s_tokens.link(s_transfer).on_error(self.callback_error)
@@ -228,82 +250,6 @@ class Api:
return t
def transfer_request(self, from_address, to_address, spender_address, value, token_symbol):
"""Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:type to_address: str, 0x-hex
:param spender_address: Ethereum address that is executing transfer (typically an escrow contract)
:type spender_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_tokens_transfer_approval = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_tokens_approve = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
from_address,
spender_address,
value,
self.chain_str,
],
queue=self.queue,
)
s_transfer_approval = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
from_address,
to_address,
value,
self.chain_str,
],
queue=self.queue,
)
# TODO: make approve and transfer_approval chainable so callback can be part of the full chain
if self.callback_param != None:
s_transfer_approval.link(self.callback_success)
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error)
else:
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval)
g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue)
s_check.link(g)
t = s_check.apply_async()
#t = s_tokens.apply_async(queue=self.queue)
return t
def balance(self, address, token_symbol, include_pending=True):
"""Calls the provided callback with the current token balance of the given address.
@@ -408,6 +354,13 @@ class Api:
s_account.link(self.callback_success)
if register:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'ACCOUNTS_INDEX_WRITER',
],
queue=self.queue,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
@@ -415,7 +368,8 @@ class Api:
],
queue=self.queue,
)
s_account.link(s_register)
s_nonce.link(s_register)
s_account.link(s_nonce)
t = s_check.apply_async(queue=self.queue)
return t
@@ -438,6 +392,13 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'GAS_GIFTER',
],
queue=self.queue,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
@@ -445,7 +406,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_refill)
s_nonce.link(s_refill)
s_check.link(s_nonce)
if self.callback_param != None:
s_refill.link(self.callback_success)
@@ -489,8 +451,9 @@ class Api:
],
queue=self.queue,
)
s_local.link(s_brief)
if self.callback_param != None:
s_assemble.link(self.callback_success).on_error(self.callback_error)
s_brief.link(self.callback_success).on_error(self.callback_error)
t = None
if external_task != None:
@@ -515,11 +478,10 @@ class Api:
c = celery.chain(s_external_get, s_external_process)
t = celery.chord([s_local, c])(s_brief)
else:
t = s_local.apply_sync()
t = s_local.apply_async(queue=self.queue)
return t
def ping(self, r):
"""A noop callback ping for testing purposes.

View File

@@ -103,6 +103,9 @@ def status_str(v, bits_only=False):
except ValueError:
pass
if v == 0:
return 'NONE'
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:

View File

@@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -54,7 +54,7 @@ class SessionBase(Model):
@staticmethod
def connect(dsn, pool_size=8, debug=False):
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.

View File

@@ -0,0 +1,23 @@
# standard imports
import datetime
import logging
# external imports
from sqlalchemy import Column, String, DateTime
# local imports
from .base import SessionBase
class Debug(SessionBase):
__tablename__ = 'debug'
date_created = Column(DateTime, default=datetime.datetime.utcnow)
tag = Column(String)
description = Column(String)
def __init__(self, tag, description):
self.tag = tag
self.description = description

View File

@@ -1,11 +1,16 @@
# standard imports
import logging
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer
from sqlalchemy import Column, String, Integer, DateTime
# local imports
from .base import SessionBase
from cic_eth.error import (
InitializationError,
IntegrityError,
)
logg = logging.getLogger()
@@ -37,23 +42,43 @@ class Nonce(SessionBase):
@staticmethod
def __get(session, address):
r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
def __get(conn, address):
r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
nonce = r.fetchone()
session.flush()
if nonce == None:
return None
return nonce[0]
@staticmethod
def __set(session, address, nonce):
session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
session.flush()
def __set(conn, address, nonce):
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
@staticmethod
def next(address, initial_if_not_exists=0, session=None):
def __init(conn, address, nonce):
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
@staticmethod
def init(address, nonce=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==address)
o = q.first()
if o != None:
session.flush()
raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce))
session.flush()
Nonce.__init(session, address, nonce)
SessionBase.release_session(session)
# TODO: Incrementing nonce MUST be done by separate tasks.
@staticmethod
def next(address, initial_if_not_exists=0):
"""Generate next nonce for the given address.
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
@@ -65,32 +90,96 @@ class Nonce(SessionBase):
:returns: Nonce
:rtype: number
"""
session = SessionBase.bind_session(session)
#session = SessionBase.bind_session(session)
SessionBase.release_session(session)
session.begin_nested()
#conn = Nonce.engine.connect()
#session.begin_nested()
conn = Nonce.engine.connect()
if Nonce.transactional:
#session.execute('BEGIN')
session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
session.flush()
nonce = Nonce.__get(session, address)
conn.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
logg.debug('locking nonce table for address {}'.format(address))
nonce = Nonce.__get(conn, address)
logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None:
nonce = initial_if_not_exists
session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
session.flush()
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__set(session, address, nonce+1)
#if Nonce.transactional:
#session.execute('COMMIT')
#session.execute('UNLOCK TABLE nonce')
#conn.close()
session.commit()
session.commit()
Nonce.__init(conn, address, nonce)
Nonce.__set(conn, address, nonce+1)
if Nonce.transactional:
conn.execute('COMMIT')
logg.debug('unlocking nonce table for address {}'.format(address))
conn.close()
#session.commit()
SessionBase.release_session(session)
#SessionBase.release_session(session)
return nonce
class NonceReservation(SessionBase):
__tablename__ = 'nonce_task_reservation'
nonce = Column(Integer)
key = Column(String)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
@staticmethod
def peek(key, session=None):
session = SessionBase.bind_session(session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
nonce = None
if o != None:
nonce = o.nonce
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def release(key, session=None):
session = SessionBase.bind_session(session)
nonce = NonceReservation.peek(key, session=session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
if o == None:
raise IntegrityError('nonce for key {}'.format(nonce))
SessionBase.release_session(session)
session.delete(o)
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def next(address, key, session=None):
session = SessionBase.bind_session(session)
if NonceReservation.peek(key, session) != None:
raise IntegrityError('nonce for key {}'.format(key))
nonce = Nonce.next(address)
o = NonceReservation()
o.nonce = nonce
o.key = key
session.add(o)
SessionBase.release_session(session)
return nonce

View File

@@ -2,7 +2,7 @@
import datetime
import logging
# third-party imports
# external imports
from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
@@ -400,7 +400,7 @@ class Otx(SessionBase):
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed:
if not self.status & StatusBits.OBSOLETE:
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusEnum.CANCELLED, session)
else:

View File

@@ -143,7 +143,7 @@ class TxCache(SessionBase):
self.block_number = block_number
self.tx_index = tx_index
# not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.now()
self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created
self.date_checked = self.date_created

View File

@@ -54,8 +54,29 @@ class RoleMissingError(Exception):
pass
class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks
"""
pass
class LockedError(Exception):
"""Exception raised when attempt is made to execute action that is deactivated by lock
"""
pass
class SignerError(Exception):
"""Exception raised when signer is unavailable or generates an error
"""
pass
class EthError(Exception):
"""Exception raised when unspecified error from evm node is encountered
"""
pass

View File

@@ -21,8 +21,14 @@ from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.error import RoleMissingError
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.error import (
RoleMissingError,
SignerError,
)
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndSignerTask,
)
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@@ -36,6 +42,7 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
uuid,
session=None,
):
"""Register an Ethereum account address with the on-chain account registry
@@ -59,7 +66,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
@@ -69,6 +76,7 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
uuid,
session=None,
):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
@@ -90,7 +98,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
@@ -137,7 +145,7 @@ def unpack_gift(data):
# TODO: Separate out nonce initialization task
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task(base=CriticalSQLAlchemyAndSignerTask)
def create(password, chain_str):
"""Creates and stores a new ethereum account in the keystore.
@@ -152,28 +160,24 @@ def create(password, chain_str):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
a = c.w3.eth.personal.new_account(password)
a = None
try:
a = c.w3.eth.personal.new_account(password)
except FileNotFoundError:
pass
if a == None:
raise SignerError('create account')
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
# TODO: this can safely be set to zero, since we are randomly creating account
n = c.w3.eth.getTransactionCount(a, 'pending')
session = SessionBase.create_session()
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==a)
o = q.first()
session.flush()
if o == None:
o = Nonce()
o.address_hex = a
o.nonce = n
session.add(o)
session.commit()
Nonce.init(a, session=session)
session.commit()
session.close()
return a
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyAndSignerTask)
def register(self, account_address, chain_str, writer_address=None):
"""Creates a transaction to add the given address to the accounts index.
@@ -203,7 +207,7 @@ def register(self, account_address, chain_str, writer_address=None):
c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c)
tx_add = txf.add(account_address, chain_spec, session=session)
tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
session.close()
@@ -223,7 +227,7 @@ def register(self, account_address, chain_str, writer_address=None):
return account_address
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def gift(self, account_address, chain_str):
"""Creates a transaction to invoke the faucet contract for the given address.
@@ -243,7 +247,7 @@ def gift(self, account_address, chain_str):
txf = AccountTxFactory(account_address, c)
session = SessionBase.create_session()
tx_add = txf.gift(account_address, chain_spec, session=session)
tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
session.close()

View File

@@ -32,10 +32,10 @@ class TxFactory:
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self, session=None):
"""Returns the current cached nonce value, and increments it for next transaction.
def next_nonce(self, uuid, session=None):
"""Returns the current reserved nonce value, and increments it for next transaction.
:returns: Nonce
:rtype: number
"""
return self.nonce_oracle.next(session=session)
return self.nonce_oracle.next_by_task_uuid(uuid, session=session)

View File

@@ -54,6 +54,7 @@ class GasOracle():
"""
session = SessionBase.create_session()
a = AccountRole.get_address('GAS_GIFTER', session)
logg.debug('gasgifter {}'.format(a))
session.close()
return a

View File

@@ -1,5 +1,8 @@
# local imports
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
class NonceOracle():
"""Ensures atomic nonce increments for all transactions across all tasks and threads.
@@ -14,10 +17,15 @@ class NonceOracle():
self.default_nonce = default_nonce
def next(self, session=None):
def next(self):
"""Get next unique nonce.
:returns: Nonce
:rtype: number
"""
return Nonce.next(self.address, self.default_nonce, session=session)
raise AttributeError('this should not be called')
return Nonce.next(self.address, self.default_nonce)
def next_by_task_uuid(self, uuid, session=None):
return NonceReservation.release(uuid, session=session)

View File

@@ -1,194 +0,0 @@
# standard imports
import logging
# third-party imports
import web3
import celery
from erc20_approval_escrow import TransferApproval
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.eth import RpcClient
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.error import TokenCountError
celery_app = celery.current_app
logg = logging.getLogger()
contract_function_signatures = {
'request': 'b0addede',
}
class TransferRequestTxFactory(TxFactory):
"""Factory for creating Transfer request transactions using the TransferApproval contract backend
"""
def request(
self,
token_address,
beneficiary_address,
amount,
chain_spec,
):
"""Create a new TransferApproval.request transaction
:param token_address: Token to create transfer request for
:type token_address: str, 0x-hex
:param beneficiary_address: Beneficiary of token transfer
:type beneficiary_address: str, 0x-hex
:param amount: Amount of tokens to transfer
:type amount: number
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Transaction in standard Ethereum format
:rtype: dict
"""
transfer_approval = CICRegistry.get_contract(chain_spec, 'TransferApproval', 'TransferAuthorization')
fn = transfer_approval.function('createRequest')
tx_approval_buildable = fn(beneficiary_address, token_address, amount)
transfer_approval_gas = transfer_approval.gas('createRequest')
tx_approval = tx_approval_buildable.buildTransaction({
'from': self.address,
'gas': transfer_approval_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_approval
def unpack_transfer_approval_request(data):
"""Verifies that a transaction is an "TransferApproval.request" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != contract_function_signatures['request']:
raise ValueError('Invalid transfer request data ({})'.format(f))
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'token': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
'amount': int(d[128:], 16)
}
@celery_app.task(bind=True)
def transfer_approval_request(self, tokens, holder_address, receiver_address, value, chain_str):
"""Creates a new transfer approval
:param tokens: Token to generate transfer request for
:type tokens: list with single token spec as dict
:param holder_address: Address to generate transfer on behalf of
:type holder_address: str, 0x-hex
:param receiver_address: Address to transfser tokens to
:type receiver_address: str, 0x-hex
:param value: Amount of tokens to transfer
:type value: number
:param chain_spec: Chain spec string representation
:type chain_spec: str
:raises cic_eth.error.TokenCountError: More than one token in tokens argument
:returns: Raw signed transaction
:rtype: list with transaction as only element
"""
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
t = tokens[0]
c = RpcClient(holder_address)
txf = TransferRequestTxFactory(holder_address, c)
tx_transfer = txf.request(t['address'], receiver_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, 'cic_eth.eth.request.otx_cache_transfer_approval_request')
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return [tx_signed_raw_hex]
@celery_app.task()
def otx_cache_transfer_approval_request(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an TransferApproval.request transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('in otx acche transfer approval request')
(txc, cache_id) = cache_transfer_approval_request_data(tx_hash_hex, tx)
return txc
@celery_app.task()
def cache_transfer_approval_request_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_transfer_approval_request
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_transfer_approval_request(tx['data'])
logg.debug('tx approval request data {}'.format(tx_data))
logg.debug('tx approval request {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx_data['token'],
tx_data['token'],
tx_data['amount'],
tx_data['amount'],
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -8,6 +8,7 @@ from cic_registry.chain import ChainSpec
# local imports
from cic_eth.eth import RpcClient
from cic_eth.queue.tx import create as queue_create
from cic_eth.error import SignerError
celery_app = celery.current_app
logg = celery_app.log.get_default_logger()
@@ -26,7 +27,13 @@ def sign_tx(tx, chain_str):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
tx_transfer_signed = None
try:
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
except FileNotFoundError:
pass
if tx_transfer_signed == None:
raise SignerError('sign tx')
logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
tx_hash_hex = tx_hash.hex()

View File

@@ -24,6 +24,7 @@ from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalSQLAlchemyAndSignerTask,
)
celery_app = celery.current_app
@@ -46,6 +47,7 @@ class TokenTxFactory(TxFactory):
spender_address,
amount,
chain_spec,
uuid,
session=None,
):
"""Create an ERC20 "approve" transaction
@@ -74,7 +76,7 @@ class TokenTxFactory(TxFactory):
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(uuid, session=session),
})
return tx_approve
@@ -85,6 +87,7 @@ class TokenTxFactory(TxFactory):
receiver_address,
value,
chain_spec,
uuid,
session=None,
):
"""Create an ERC20 "transfer" transaction
@@ -114,7 +117,7 @@ class TokenTxFactory(TxFactory):
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(uuid, session=session),
})
return tx_transfer
@@ -210,7 +213,7 @@ def balance(tokens, holder_address, chain_str):
return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
"""Transfer ERC20 tokens between addresses
@@ -248,7 +251,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
txf = TokenTxFactory(holder_address, c)
session = SessionBase.create_session()
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, session=session)
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session)
session.close()
@@ -266,7 +269,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def approve(self, tokens, holder_address, spender_address, value, chain_str):
"""Approve ERC20 transfer on behalf of holder address
@@ -304,7 +307,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
txf = TokenTxFactory(holder_address, c)
session = SessionBase.create_session()
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, session=session)
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session)
session.close()

View File

@@ -12,7 +12,9 @@ from cic_registry.chain import ChainSpec
from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation
from cic_eth.db.models.lock import Lock
from cic_eth.db.models.role import AccountRole
from cic_eth.db.enum import (
LockEnum,
StatusBits,
@@ -35,6 +37,9 @@ from cic_eth.admin.ctrl import lock_send
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalWeb3AndSignerTask,
CriticalSQLAlchemyAndSignerTask,
CriticalSQLAlchemyAndWeb3Task,
)
celery_app = celery.current_app
@@ -44,7 +49,7 @@ MAX_NONCE_ATTEMPTS = 3
# TODO this function is too long
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
@@ -69,10 +74,12 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
logg.debug('ooooo {}'.format(o))
if address == None:
address = o['address']
if not web3.Web3.isChecksumAddress(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
@@ -81,19 +88,32 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
c = RpcClient(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
balance = c.w3.eth.getBalance(address)
balance = 0
try:
balance = c.w3.eth.getBalance(address)
except ValueError as e:
raise EthError('balance call for {}'.format())
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
if gas_required > balance:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
address,
chain_str,
],
queue=queue,
)
s_refill_gas.apply_async()
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
@@ -109,15 +129,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
safe_gas = c.safe_threshold_amount()
if balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
address,
chain_str,
],
queue=queue,
)
s_refill_gas.apply_async()
s_nonce.link(s_refill)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
ready_tasks = []
for tx_hash in tx_hashes:
@@ -178,12 +206,7 @@ class ParityNodeHandler:
def handle(self, exception, tx_hash_hex, tx_hex):
meth = self.handle_default
if isinstance(exception, (ValueError)):
# s_debug = celery.signature(
# 'cic_eth.admin.debug.out_tmp',
# [tx_hash_hex, '{}: {}'.format(tx_hash_hex, exception)],
# queue=queue,
# )
# s_debug.apply_async()
earg = exception.args[0]
if earg['code'] == -32010:
logg.debug('skipping lock for code {}'.format(earg['code']))
@@ -191,14 +214,15 @@ class ParityNodeHandler:
elif earg['code'] == -32602:
meth = self.handle_invalid_encoding
else:
# TODO: move to status log db comment field
meth = self.handle_invalid
elif isinstance(exception, (requests.exceptions.ConnectionError)):
meth = self.handle_connection
(t, e_fn, message) = meth(tx_hash_hex, tx_hex)
(t, e_fn, message) = meth(tx_hash_hex, tx_hex, str(exception))
return (t, e_fn, '{} {}'.format(message, exception))
def handle_connection(self, tx_hash_hex, tx_hex):
def handle_connection(self, tx_hash_hex, tx_hex, debugstr=None):
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
[
@@ -211,7 +235,7 @@ class ParityNodeHandler:
return (t, TemporaryTxError, 'Sendfail {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_encoding(self, tx_hash_hex, tx_hex):
def handle_invalid_encoding(self, tx_hash_hex, tx_hex, debugstr=None):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -258,7 +282,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Reject invalid encoding {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_parameters(self, tx_hash_hex, tx_hex):
def handle_invalid_parameters(self, tx_hash_hex, tx_hex, debugstr=None):
s_sync = celery.signature(
'cic_eth.eth.tx.sync_tx',
[
@@ -271,7 +295,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Reject invalid parameters {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid(self, tx_hash_hex, tx_hex):
def handle_invalid(self, tx_hash_hex, tx_hex, debugstr=None):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -289,12 +313,21 @@ class ParityNodeHandler:
[],
queue=self.queue,
)
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
debugstr,
],
queue=self.queue,
)
s_set_reject.link(s_debug)
s_lock.link(s_set_reject)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_default(self, tx_hash_hex, tx_hex):
def handle_default(self, tx_hash_hex, tx_hex, debugstr):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -312,9 +345,18 @@ class ParityNodeHandler:
[],
queue=self.queue,
)
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
debugstr,
],
queue=self.queue,
)
s_set_fubar.link(s_debug)
s_lock.link(s_set_fubar)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr))
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
@@ -388,7 +430,7 @@ def send(self, txs, chain_str):
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3AndSignerTask)
def refill_gas(self, recipient_address, chain_str):
"""Executes a native token transaction to fund the recipient's gas expenditures.
@@ -402,6 +444,7 @@ def refill_gas(self, recipient_address, chain_str):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
@@ -411,8 +454,11 @@ def refill_gas(self, recipient_address, chain_str):
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
if c > 0:
session.close()
raise AlreadyFillingGasError(recipient_address)
#session.close()
#raise AlreadyFillingGasError(recipient_address)
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
queue = self.request.delivery_info['routing_key']
@@ -421,10 +467,13 @@ def refill_gas(self, recipient_address, chain_str):
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
nonce = nonce_generator.next(session=session)
#nonce = nonce_generator.next(session=session)
nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session)
gas_price = c.gas_price()
gas_limit = c.default_gas_limit
refill_amount = c.refill_amount()
refill_amount = 0
if not zero_amount:
refill_amount = c.refill_amount()
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
# create and sign transaction
@@ -470,10 +519,11 @@ def refill_gas(self, recipient_address, chain_str):
queue=queue,
)
celery.group(s_tx_cache, s_status)()
return tx_send_gas_signed['raw']
@celery_app.task(bind=True)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price.
@@ -549,6 +599,33 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, signer=None):
session = SessionBase.create_session()
address = None
if signer == None:
address = chained_input
logg.debug('non-explicit address for reserve nonce, using arg head {}'.format(chained_input))
else:
if web3.Web3.isChecksumAddress(signer):
address = signer
logg.debug('explicit address for reserve nonce {}'.format(signer))
else:
address = AccountRole.get_address(signer, session=session)
logg.debug('role for reserve nonce {} -> {}'.format(signer, address))
if not web3.Web3.isChecksumAddress(address):
raise ValueError('invalid result when resolving address for nonce {}'.format(address))
root_id = self.request.root_id
nonce = NonceReservation.next(address, root_id)
session.close()
return chained_input
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_str):
"""Force update of network status of a simgle transaction

View File

@@ -3,10 +3,11 @@ import logging
import sha3
import web3
# third-party imports
# external imports
from rlp import decode as rlp_decode
from rlp import encode as rlp_encode
from eth_keys import KeyAPI
from chainlib.eth.tx import unpack
logg = logging.getLogger()
@@ -22,64 +23,65 @@ field_debugs = [
's',
]
unpack_signed_raw_tx = unpack
def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
d = rlp_decode(tx_raw_bytes)
logg.debug('decoding using chain id {}'.format(chain_id))
j = 0
for i in d:
logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
j += 1
vb = chain_id
if chain_id != 0:
v = int.from_bytes(d[6], 'big')
vb = v - (chain_id * 2) - 35
while len(d[7]) < 32:
d[7] = b'\x00' + d[7]
while len(d[8]) < 32:
d[8] = b'\x00' + d[8]
s = b''.join([d[7], d[8], bytes([vb])])
so = KeyAPI.Signature(signature_bytes=s)
h = sha3.keccak_256()
h.update(rlp_encode(d))
signed_hash = h.digest()
d[6] = chain_id
d[7] = b''
d[8] = b''
h = sha3.keccak_256()
h.update(rlp_encode(d))
unsigned_hash = h.digest()
p = so.recover_public_key_from_msg_hash(unsigned_hash)
a = p.to_checksum_address()
logg.debug('decoded recovery byte {}'.format(vb))
logg.debug('decoded address {}'.format(a))
logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
to = d[3].hex() or None
if to != None:
to = web3.Web3.toChecksumAddress('0x' + to)
return {
'from': a,
'nonce': int.from_bytes(d[0], 'big'),
'gasPrice': int.from_bytes(d[1], 'big'),
'gas': int.from_bytes(d[2], 'big'),
'to': to,
'value': int.from_bytes(d[4], 'big'),
'data': '0x' + d[5].hex(),
'v': chain_id,
'r': '0x' + s[:32].hex(),
's': '0x' + s[32:64].hex(),
'chainId': chain_id,
'hash': '0x' + signed_hash.hex(),
'hash_unsigned': '0x' + unsigned_hash.hex(),
}
#def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
# d = rlp_decode(tx_raw_bytes)
#
# logg.debug('decoding {} using chain id {}'.format(tx_raw_bytes.hex(), chain_id))
# j = 0
# for i in d:
# logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
# j += 1
# vb = chain_id
# if chain_id != 0:
# v = int.from_bytes(d[6], 'big')
# vb = v - (chain_id * 2) - 35
# while len(d[7]) < 32:
# d[7] = b'\x00' + d[7]
# while len(d[8]) < 32:
# d[8] = b'\x00' + d[8]
# s = b''.join([d[7], d[8], bytes([vb])])
# so = KeyAPI.Signature(signature_bytes=s)
#
# h = sha3.keccak_256()
# h.update(rlp_encode(d))
# signed_hash = h.digest()
#
# d[6] = chain_id
# d[7] = b''
# d[8] = b''
#
# h = sha3.keccak_256()
# h.update(rlp_encode(d))
# unsigned_hash = h.digest()
#
# p = so.recover_public_key_from_msg_hash(unsigned_hash)
# a = p.to_checksum_address()
# logg.debug('decoded recovery byte {}'.format(vb))
# logg.debug('decoded address {}'.format(a))
# logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
# logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
#
# to = d[3].hex() or None
# if to != None:
# to = web3.Web3.toChecksumAddress('0x' + to)
#
# return {
# 'from': a,
# 'nonce': int.from_bytes(d[0], 'big'),
# 'gasPrice': int.from_bytes(d[1], 'big'),
# 'gas': int.from_bytes(d[2], 'big'),
# 'to': to,
# 'value': int.from_bytes(d[4], 'big'),
# 'data': '0x' + d[5].hex(),
# 'v': chain_id,
# 'r': '0x' + s[:32].hex(),
# 's': '0x' + s[32:64].hex(),
# 'chainId': chain_id,
# 'hash': '0x' + signed_hash.hex(),
# 'hash_unsigned': '0x' + unsigned_hash.hex(),
# }
def unpack_signed_raw_tx_hex(tx_raw_hex, chain_id):

View File

@@ -149,6 +149,9 @@ def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
txs_by_block = {}
chain_spec = ChainSpec.from_chain_str(chain_str)
if isinstance(tx_batches, dict):
tx_batches = [tx_batches]
for b in tx_batches:
for v in b.values():
tx = None

View File

@@ -30,6 +30,7 @@ from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
from cic_eth.error import NotLocalTxError
from cic_eth.error import LockedError
from cic_eth.db.enum import status_str
celery_app = celery.current_app
#logg = celery_app.log.get_default_logger()
@@ -314,7 +315,9 @@ def set_ready(tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_dequeue(tx_hash):
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
@@ -405,7 +408,7 @@ def get_tx_cache(tx_hash):
'tx_hash': otx.tx_hash,
'signed_tx': otx.signed_tx,
'nonce': otx.nonce,
'status': StatusEnum(otx.status).name,
'status': status_str(otx.status),
'status_code': otx.status,
'source_token': txc.source_token_address,
'destination_token': txc.destination_token_address,
@@ -565,7 +568,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
return txs
def get_status_tx(status, before=None, exact=False, limit=0, session=None):
def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None):
"""Retrieve transaction with a specific queue status.
:param status: Status to match transactions with
@@ -581,11 +584,15 @@ def get_status_tx(status, before=None, exact=False, limit=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.date_updated<before)
# before = datetime.datetime.utcnow()
if before != None:
q = q.filter(TxCache.date_updated<before)
if exact:
q = q.filter(Otx.status==status.value)
q = q.filter(Otx.status==status)
else:
q = q.filter(Otx.status.op('&')(status.value)==status.value)
q = q.filter(Otx.status.op('&')(status)>0)
if not_status != None:
q = q.filter(Otx.status.op('&')(not_status)==0)
i = 0
for o in q.all():
if limit > 0 and i == limit:

View File

@@ -5,6 +5,7 @@ import os
import logging
import uuid
import json
from xdg.BaseDirectory import xdg_config_home
import celery
from cic_eth.api import Api

View File

@@ -25,7 +25,7 @@ logging.getLogger('urllib3').setLevel(logging.WARNING)
default_abi_dir = '/usr/share/local/cic/solidity/abi'
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')

View File

@@ -15,6 +15,8 @@ import web3
from web3 import HTTPProvider, WebsocketProvider
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from chainlib.eth.tx import unpack
from hexathon import strip_0x
# local imports
import cic_eth
@@ -36,7 +38,7 @@ from cic_eth.error import (
TemporaryTxError,
NotLocalTxError,
)
from cic_eth.eth.util import unpack_signed_raw_tx_hex
#from cic_eth.eth.util import unpack_signed_raw_tx_hex
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -115,12 +117,15 @@ class DispatchSyncer:
chain_str = str(self.chain_spec)
for k in txs.keys():
tx_raw = txs[k]
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
#tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec.chain_id())
try:
set_dequeue(tx['hash'])
except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
continue
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',

View File

@@ -2,3 +2,4 @@ from .callback import CallbackFilter
from .tx import TxFilter
from .gas import GasFilter
from .register import RegistrationFilter
from .transferauth import TransferAuthFilter

View File

@@ -1,7 +1,7 @@
# standard imports
import logging
# third-party imports
# external imports
from cic_registry.chain import ChainSpec
from hexathon import add_0x
@@ -24,7 +24,7 @@ class GasFilter(SyncFilter):
self.chain_spec = chain_spec
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
def filter(self, conn, block, tx, session):
tx_hash_hex = add_0x(tx.hash)
if tx.value > 0:
logg.debug('gas refill tx {}'.format(tx_hash_hex))

View File

@@ -26,7 +26,6 @@ class RegistrationFilter(SyncFilter):
def filter(self, conn, block, tx, db_session=None):
registered_address = None
logg.debug('register filter checking log {}'.format(tx.logs))
for l in tx.logs:
event_topic_hex = l['topics'][0]
if event_topic_hex == account_registry_add_log_hash:
@@ -34,16 +33,23 @@ class RegistrationFilter(SyncFilter):
address_hex = strip_0x(l['topics'][1])[64-40:]
address = to_checksum(add_0x(address_hex))
logg.debug('request token gift to {}'.format(address))
s = celery.signature(
'cic_eth.eth.account.gift',
logg.info('request token gift to {}'.format(address))
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
],
queue=self.queue,
)
s_gift = celery.signature(
'cic_eth.eth.account.gift',
[
str(self.chain_spec),
],
queue=self.queue,
)
s.apply_async()
s_nonce.link(s_gift)
s_nonce.apply_async()
def __str__(self):

View File

@@ -0,0 +1,97 @@
# standard imports
import logging
# external imports
import celery
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum
from .base import SyncFilter
logg = logging.getLogger(__name__)
transfer_request_signature = 'ed71262a'
def unpack_create_request(data):
data = strip_0x(data)
cursor = 0
f = data[cursor:cursor+8]
cursor += 8
if f != transfer_request_signature:
raise ValueError('Invalid create request data ({})'.format(f))
o = {}
o['sender'] = data[cursor+24:cursor+64]
cursor += 64
o['recipient'] = data[cursor+24:cursor+64]
cursor += 64
o['token'] = data[cursor+24:cursor+64]
cursor += 64
o['value'] = int(data[cursor:], 16)
return o
class TransferAuthFilter(SyncFilter):
def __init__(self, registry, chain_spec, queue=None):
self.queue = queue
self.chain_spec = chain_spec
self.transfer_request_contract = registry.get_contract(self.chain_spec, 'TransferAuthorization')
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
if tx.payload == None:
logg.debug('no payload')
return False
payloadlength = len(tx.payload)
if payloadlength != 8+256:
logg.debug('{} below minimum length for a transfer auth call'.format(payloadlength))
logg.debug('payload {}'.format(tx.payload))
return False
recipient = tx.inputs[0]
if recipient != self.transfer_request_contract.address():
logg.debug('not our transfer auth contract address {}'.format(recipient))
return False
o = unpack_create_request(tx.payload)
sender = add_0x(to_checksum(o['sender']))
recipient = add_0x(to_checksum(recipient))
token = add_0x(to_checksum(o['token']))
token_data = {
'address': token,
}
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
sender,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
sender,
recipient,
o['value'],
str(self.chain_spec),
],
queue=self.queue,
)
s_nonce.link(s_approve)
t = s_nonce.apply_async()
return True
def __str__(self):
return 'cic-eth transfer auth filter'

View File

@@ -30,7 +30,7 @@ class TxFilter(SyncFilter):
if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None
logg.info('local tx match {}'.format(otx.tx_hash))
logg.info('tx filter match on {}'.format(otx.tx_hash))
SessionBase.release_session(db_session)
s = celery.signature(
'cic_eth.queue.tx.set_final_status',

View File

@@ -138,7 +138,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str):
# TODO: can we merely use the dispatcher instead?
def dispatch(chain_str):
txs = get_status_tx(StatusEnum.RETRY, datetime.datetime.utcnow())
txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow())
if len(txs) == 0:
logg.debug('no retry state txs found')
return

View File

@@ -55,62 +55,25 @@ SessionBase.connect(dsn)
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
re_transfer_approval_request = r'^/transferrequest/?'
re_something = r'^/something/?'
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
def process_transfer_approval_request(session, env):
r = re.match(re_transfer_approval_request, env.get('PATH_INFO'))
def process_something(session, env):
r = re.match(re_something, env.get('PATH_INFO'))
if not r:
return None
if env.get('CONTENT_TYPE') != 'application/json':
raise AttributeError('content type')
#if env.get('CONTENT_TYPE') != 'application/json':
# raise AttributeError('content type')
if env.get('REQUEST_METHOD') != 'POST':
raise AttributeError('method')
#if env.get('REQUEST_METHOD') != 'POST':
# raise AttributeError('method')
post_data = json.load(env.get('wsgi.input'))
token_address = web3.Web3.toChecksumAddress(post_data['token_address'])
holder_address = web3.Web3.toChecksumAddress(post_data['holder_address'])
beneficiary_address = web3.Web3.toChecksumAddress(post_data['beneficiary_address'])
value = int(post_data['value'])
logg.debug('transfer approval request token {} to {} from {} value {}'.format(
token_address,
beneficiary_address,
holder_address,
value,
)
)
s = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
[
{
'address': token_address,
},
],
holder_address,
beneficiary_address,
value,
config.get('CIC_CHAIN_SPEC'),
],
queue=queue,
)
t = s.apply_async()
r = t.get()
tx_raw_bytes = bytes.fromhex(r[0][2:])
tx = unpack_signed_raw_tx(tx_raw_bytes, chain_spec.chain_id())
for r in t.collect():
logg.debug('result {}'.format(r))
if not t.successful():
raise RuntimeError(tx['hash'])
return ('text/plain', tx['hash'].encode('utf-8'),)
#post_data = json.load(env.get('wsgi.input'))
#return ('text/plain', 'foo'.encode('utf-8'),)
# uwsgi application
@@ -125,7 +88,7 @@ def application(env, start_response):
session = SessionBase.create_session()
for handler in [
process_transfer_approval_request,
process_something,
]:
try:
r = handler(session, env)

View File

@@ -26,7 +26,6 @@ from cic_eth.eth import bancor
from cic_eth.eth import token
from cic_eth.eth import tx
from cic_eth.eth import account
from cic_eth.eth import request
from cic_eth.admin import debug
from cic_eth.admin import ctrl
from cic_eth.eth.rpc import RpcClient
@@ -40,6 +39,7 @@ from cic_eth.callbacks import redis
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db import dsn_from_config
from cic_eth.ext import tx
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

View File

@@ -58,6 +58,7 @@ from cic_eth.runnable.daemons.filters import (
GasFilter,
TxFilter,
RegistrationFilter,
TransferAuthFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
@@ -146,6 +147,8 @@ def main():
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
@@ -153,6 +156,7 @@ def main():
syncer.add_filter(registration_filter)
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
syncer.add_filter(tx_filter)
syncer.add_filter(transfer_auth_filter)
for cf in callback_filters:
syncer.add_filter(cf)

View File

@@ -22,7 +22,7 @@ logg = logging.getLogger()
logging.getLogger('web3').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()

View File

@@ -23,8 +23,11 @@ from hexathon import add_0x
# local imports
from cic_eth.api import AdminApi
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import LockEnum
from cic_eth.db.enum import (
StatusEnum,
status_str,
LockEnum,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -34,12 +37,13 @@ logging.getLogger('urllib3').setLevel(logging.WARNING)
default_abi_dir = '/usr/share/local/cic/solidity/abi'
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
argparser.add_argument('-f', '--format', dest='f', default='terminal', type=str, help='Output format')
argparser.add_argument('--status-raw', dest='status_raw', action='store_true', help='Output statis bit enum names only')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
@@ -119,7 +123,7 @@ def render_tx(o, **kwargs):
for v in o.get('status_log', []):
d = datetime.datetime.fromisoformat(v[0])
e = StatusEnum(v[1]).name
e = status_str(v[1], args.status_raw)
content += '{}: {}\n'.format(d, e)
return content

View File

@@ -9,7 +9,10 @@ import celery
# local imports
from .base import Syncer
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import get_status_tx
logg = logging.getLogger()
@@ -47,7 +50,8 @@ class RetrySyncer(Syncer):
# )
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx(
StatusEnum.SENT.value,
StatusBits.IN_NETWORK.value,
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
before=before,
)
# return list(failed_txs.keys()) + list(stalled_txs.keys())

View File

@@ -5,6 +5,12 @@ import requests
import celery
import sqlalchemy
# local imports
from cic_eth.error import (
SignerError,
EthError,
)
class CriticalTask(celery.Task):
retry_jitter = True
@@ -30,4 +36,18 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError,
EthError,
)
class CriticalSQLAlchemyAndSignerTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
SignerError,
)
class CriticalWeb3AndSignerTask(CriticalTask):
autoretry_for = (
requests.exceptions.ConnectionError,
SignerError,
)

View File

@@ -10,7 +10,7 @@ version = (
0,
10,
0,
'alpha.36',
'alpha.41',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=
DEBUG=0

View File

@@ -1,3 +1,3 @@
[tasks]
transfer_callbacks = taskcall:cic_eth.callbacks.noop.noop
trace_queue_status =
trace_queue_status = 1

View File

@@ -6,7 +6,7 @@ set -e
# set CONFINI_ENV_PREFIX to override the env prefix to override env vars
echo "!!! starting signer"
python /usr/local/bin/crypto-dev-daemon -vv -c /usr/local/etc/crypto-dev-signer &
python /usr/local/bin/crypto-dev-daemon -c /usr/local/etc/crypto-dev-signer &
echo "!!! starting tracker"
/usr/local/bin/cic-eth-taskerd $@

View File

@@ -1,6 +1,7 @@
cic-base~=0.1.1a20
web3==5.12.2
celery==4.4.7
crypto-dev-signer~=0.4.13rc3
crypto-dev-signer~=0.4.13rc4
confini~=0.3.6rc3
cic-registry~=0.5.3a22
cic-bancor~=0.0.6
@@ -9,7 +10,7 @@ alembic==1.4.2
websockets==8.1
requests~=2.24.0
eth_accounts_index~=0.0.10a10
erc20-approval-escrow~=0.3.0a5
erc20-transfer-authorization~=0.3.0a10
erc20-single-shot-faucet~=0.2.0a6
rlp==2.0.1
uWSGI==2.0.19.1
@@ -18,7 +19,6 @@ eth-gas-proxy==0.0.1a4
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.0a8
chainlib~=0.0.1a19
chainlib~=0.0.1a20
hexathon~=0.0.1a3
chainsyncer~=0.0.1a19
cic-base==0.1.1a10

View File

@@ -13,13 +13,13 @@ def celery_includes():
return [
'cic_eth.eth.bancor',
'cic_eth.eth.token',
'cic_eth.eth.request',
'cic_eth.eth.tx',
'cic_eth.ext.tx',
'cic_eth.queue.tx',
'cic_eth.queue.balance',
'cic_eth.admin.ctrl',
'cic_eth.admin.nonce',
'cic_eth.admin.debug',
'cic_eth.eth.account',
'cic_eth.callbacks.noop',
'cic_eth.callbacks.http',

View File

@@ -27,7 +27,7 @@ def database_engine(
SessionBase.poolable = False
dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True)
SessionBase.connect(dsn, load_config.get('DATABASE_DEBUG') != None)
SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
return dsn

View File

@@ -15,6 +15,7 @@ from eth_keys import KeyAPI
from cic_eth.eth import RpcClient
from cic_eth.eth.rpc import GasOracle
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@@ -113,11 +114,17 @@ def init_w3_conn(
@pytest.fixture(scope='function')
def init_w3(
init_database,
init_eth_tester,
init_eth_account_roles,
init_w3_conn,
):
for address in init_w3_conn.eth.accounts:
nonce = init_w3_conn.eth.getTransactionCount(address, 'pending')
Nonce.init(address, nonce=nonce, session=init_database)
init_database.commit()
yield init_w3_conn
logg.debug('mining om nom nom... {}'.format(init_eth_tester.mine_block()))
@@ -128,9 +135,10 @@ def init_eth_account_roles(
w3_account_roles,
):
role = AccountRole.set('GAS_GIFTER', w3_account_roles.get('eth_account_gas_provider'))
address = w3_account_roles.get('eth_account_gas_provider')
role = AccountRole.set('GAS_GIFTER', address)
init_database.add(role)
init_database.commit()
return w3_account_roles
@@ -163,7 +171,6 @@ def w3_account_roles(
role_ids = [
'eth_account_bancor_deployer',
'eth_account_gas_provider',
'eth_account_reserve_owner',
'eth_account_reserve_minter',
'eth_account_accounts_index_owner',
@@ -172,6 +179,7 @@ def w3_account_roles(
'eth_account_sarafu_gifter',
'eth_account_approval_owner',
'eth_account_faucet_owner',
'eth_account_gas_provider',
]
roles = {}
@@ -187,6 +195,7 @@ def w3_account_roles(
return roles
@pytest.fixture(scope='session')
def w3_account_token_owners(
tokens_to_deploy,

View File

@@ -10,6 +10,8 @@ import web3
# local imports
from cic_eth.api import AdminApi
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
@@ -39,18 +41,37 @@ def test_resend_inplace(
c = RpcClient(default_chain_spec)
sigs = []
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
gas_provider = c.gas_provider()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[0],
gas_provider,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_str,
],
queue=None,
)
t = s.apply_async()
tx_raw = t.get()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
o = q.first()
tx_raw = o.signed_tx
tx_dict = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), default_chain_spec.chain_id())
gas_price_before = tx_dict['gasPrice']

View File

@@ -49,28 +49,7 @@ def test_transfer_api(
assert t.successful()
def test_transfer_approval_api(
default_chain_spec,
init_w3,
cic_registry,
init_database,
bancor_registry,
bancor_tokens,
transfer_approval,
celery_session_worker,
):
token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
approval_contract = CICRegistry.get_contract(default_chain_spec, 'TransferApproval')
api = Api(str(default_chain_spec), callback_param='transfer_request', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.transfer_request(init_w3.eth.accounts[2], init_w3.eth.accounts[4], approval_contract.address(), 111, token.symbol())
t.get()
#for r in t.collect():
# print(r)
assert t.successful()
@pytest.mark.skip()
def test_convert_api(
default_chain_spec,
init_w3,
@@ -91,6 +70,7 @@ def test_convert_api(
assert t.successful()
@pytest.mark.skip()
def test_convert_transfer_api(
default_chain_spec,
init_w3,

View File

@@ -9,6 +9,10 @@ from tests.mock.filter import (
block_filter,
tx_filter,
)
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
logg = logging.getLogger()
@@ -28,9 +32,20 @@ def test_list_tx(
tx_hashes = []
# external tx
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
NonceReservation.next(init_w3.eth.accounts[0], 'foo', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec, 'foo')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
@@ -42,9 +57,12 @@ def test_list_tx(
tx_filter.add(a.to_bytes(4, 'big'))
# external tx
NonceReservation.next(init_w3.eth.accounts[0], 'bar', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(28)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec, 'bar')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
@@ -56,10 +74,13 @@ def test_list_tx(
tx_filter.add(a.to_bytes(4, 'big'))
# custodial tx
#NonceReservation.next(init_w3.eth.accounts[0], 'blinky', session=init_database)
#init_database.commit()
init_eth_tester.mine_blocks(3)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
#txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
api = Api(str(default_chain_spec), queue=None)
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM')
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM') #, 'blinky')
t.get()
tx_hash_hex = None
for c in t.collect():
@@ -68,9 +89,11 @@ def test_list_tx(
tx_hashes.append(tx_hash_hex)
# custodial tx
#NonceReservation.next(init_w3.eth.accounts[0], 'clyde', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(6)
api = Api(str(default_chain_spec), queue=None)
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM')
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM') #, 'clyde')
t.get()
tx_hash_hex = None
for c in t.collect():

View File

@@ -64,6 +64,7 @@ def test_register_account(
init_database,
init_eth_tester,
init_w3,
init_rpc,
cic_registry,
celery_session_worker,
eth_empty_accounts,
@@ -71,18 +72,27 @@ def test_register_account(
logg.debug('chainspec {}'.format(str(default_chain_spec)))
s = celery.signature(
'cic_eth.eth.account.register',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
eth_empty_accounts[0],
init_w3.eth.accounts[0],
],
queue=None,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
str(default_chain_spec),
init_w3.eth.accounts[0],
],
)
t = s.apply_async()
s_nonce.link(s_register)
t = s_nonce.apply_async()
address = t.get()
r = t.collect()
t.successful()
for r in t.collect():
pass
assert t.successful()
session = SessionBase.create_session()
o = session.query(Otx).first()

View File

@@ -8,6 +8,7 @@ import celery
# local imports
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.nonce import Nonce
from cic_eth.eth.util import unpack_signed_raw_tx
#logg = logging.getLogger(__name__)
@@ -31,18 +32,38 @@ def test_balance_complex(
}
tx_hashes = []
# TODO: Temporary workaround for nonce db cache initialization being made before deployments.
# Instead use different accounts than system ones for transfers for tests
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
for i in range(3):
s = celery.signature(
'cic_eth.eth.token.transfer',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
init_w3.eth.accounts[0],
],
queue=None,
)
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
[
init_w3.eth.accounts[0],
init_w3.eth.accounts[1],
1000*(i+1),
chain_str,
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
t.get()
r = None
for c in t.collect():

View File

@@ -1,14 +1,19 @@
# standard imports
import logging
import os
# external imports
import pytest
import celery
# local imports
from cic_eth.db import TxConvertTransfer
from cic_eth.eth.bancor import BancorTxFactory
logg = logging.getLogger()
@pytest.mark.skip()
def test_transfer_after_convert(
init_w3,
init_database,

View File

@@ -0,0 +1,29 @@
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
def test_debug_alert(
init_database,
celery_session_worker,
):
s = celery.signature(
'cic_eth.admin.debug.alert',
[
'foo',
'bar',
'baz',
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Debug)
q = q.filter(Debug.tag=='bar')
o = q.first()
assert o.description == 'baz'

View File

@@ -10,6 +10,9 @@ import celery
from cic_eth.eth.account import unpack_gift
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
logg = logging.getLogger()
@@ -32,10 +35,16 @@ def test_faucet(
init_database,
):
s = celery.signature(
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[7],
],
queue=None,
)
s_gift = celery.signature(
'cic_eth.eth.account.gift',
[
init_w3.eth.accounts[7],
str(default_chain_spec),
],
)
@@ -45,15 +54,21 @@ def test_faucet(
str(default_chain_spec),
],
)
s.link(s_send)
t = s.apply_async()
signed_tx = t.get()
s_gift.link(s_send)
s_nonce.link(s_gift)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
logg.debug('result {}'.format(r))
assert t.successful()
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[0][2:]), default_chain_spec.chain_id())
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==init_w3.eth.accounts[7])
o = q.first()
signed_tx = o.signed_tx
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[2:]), default_chain_spec.chain_id())
giveto = unpack_gift(tx['data'])
assert giveto['to'] == init_w3.eth.accounts[7]

View File

@@ -30,6 +30,7 @@ def test_refill_gas(
default_chain_spec,
init_eth_tester,
init_rpc,
init_w3,
init_database,
cic_registry,
init_eth_account_roles,
@@ -44,23 +45,39 @@ def test_refill_gas(
refill_amount = c.refill_amount()
balance = init_rpc.w3.eth.getBalance(receiver_address)
s = celery.signature(
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
eth_empty_accounts[0],
provider_address,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
receiver_address,
str(default_chain_spec),
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
r = t.get()
t.collect()
for c in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==receiver_address)
o = q.first()
signed_tx = o.signed_tx
s = celery.signature(
'cic_eth.eth.tx.send',
[
[r],
[signed_tx],
str(default_chain_spec),
],
)
@@ -74,11 +91,11 @@ def test_refill_gas(
assert balance_new == (balance + refill_amount)
# Verify that entry is added in TxCache
session = SessionBase.create_session()
q = session.query(Otx)
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==receiver_address)
r = q.first()
init_database.commit()
assert r.status == StatusEnum.SENT
@@ -86,6 +103,7 @@ def test_refill_gas(
def test_refill_deduplication(
default_chain_spec,
init_rpc,
init_w3,
init_database,
init_eth_account_roles,
cic_registry,
@@ -99,83 +117,131 @@ def test_refill_deduplication(
c = init_rpc
refill_amount = c.refill_amount()
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
receiver_address,
provider_address,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec),
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
r = t.get()
for e in t.collect():
pass
assert t.successful()
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
receiver_address,
str(default_chain_spec),
provider_address,
],
queue=None,
)
t = s.apply_async()
with pytest.raises(AlreadyFillingGasError):
t.get()
def test_check_gas(
default_chain_spec,
init_eth_tester,
init_w3,
init_rpc,
eth_empty_accounts,
init_database,
cic_registry,
celery_session_worker,
bancor_registry,
bancor_tokens,
):
provider_address = init_w3.eth.accounts[0]
gas_receiver_address = eth_empty_accounts[0]
token_receiver_address = init_w3.eth.accounts[1]
c = init_rpc
txf = TokenTxFactory(gas_receiver_address, c)
tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
gas_price = c.gas_price()
gas_limit = tx_transfer['gas']
s = celery.signature(
'cic_eth.eth.tx.check_gas',
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
[tx_hash_hex],
str(default_chain_spec),
[],
gas_receiver_address,
gas_limit * gas_price,
],
)
t = s.apply_async()
with pytest.raises(OutOfGasError):
r = t.get()
#assert len(r) == 0
time.sleep(1)
t.collect()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
#with pytest.raises(AlreadyFillingGasError):
t.get()
for e in t.collect():
pass
assert t.successful()
logg.warning('TODO: complete test by checking that second tx had zero value')
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_hex)
r = q.first()
session.close()
assert r.status == StatusEnum.WAITFORGAS
# TODO: check gas is part of the transfer chain, and we cannot create the transfer nonce by uuid before the task. Test is subsumed by transfer task test, but should be tested in isolation
#def test_check_gas(
# default_chain_spec,
# init_eth_tester,
# init_w3,
# init_rpc,
# eth_empty_accounts,
# init_database,
# cic_registry,
# celery_session_worker,
# bancor_registry,
# bancor_tokens,
# ):
#
# provider_address = init_w3.eth.accounts[0]
# gas_receiver_address = eth_empty_accounts[0]
# token_receiver_address = init_w3.eth.accounts[1]
#
## c = init_rpc
## txf = TokenTxFactory(gas_receiver_address, c)
## tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec, 'foo')
##
## (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
#
# token_data = [
# {
# 'address': bancor_tokens[0],
# },
# ]
#
# s_nonce = celery.signature(
# 'cic_eth.eth.tx.reserve_nonce',
# [
# token_data,
# init_w3.eth.accounts[0],
# ],
# queue=None,
# )
# s_transfer = celery.signature(
# 'cic_eth.eth.token.transfer',
# [
# init_w3.eth.accounts[0],
# init_w3.eth.accounts[1],
# 1024,
# str(default_chain_spec),
# ],
# queue=None,
# )
#
# gas_price = c.gas_price()
# gas_limit = tx_transfer['gas']
#
# s = celery.signature(
# 'cic_eth.eth.tx.check_gas',
# [
# [tx_hash_hex],
# str(default_chain_spec),
# [],
# gas_receiver_address,
# gas_limit * gas_price,
# ],
# )
# s_nonce.link(s_transfer)
# t = s_nonce.apply_async()
# with pytest.raises(OutOfGasError):
# r = t.get()
# #assert len(r) == 0
#
# time.sleep(1)
# t.collect()
#
# session = SessionBase.create_session()
# q = session.query(Otx)
# q = q.filter(Otx.tx_hash==tx_hash_hex)
# r = q.first()
# session.close()
# assert r.status == StatusEnum.WAITFORGAS
def test_resend_with_higher_gas(
@@ -191,39 +257,73 @@ def test_resend_with_higher_gas(
):
c = init_rpc
txf = TokenTxFactory(init_w3.eth.accounts[0], c)
tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec)
logg.debug('txtransfer {}'.format(tx_transfer))
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
logg.debug('signed raw {}'.format(tx_signed_raw_hex))
queue_create(
tx_transfer['nonce'],
tx_transfer['from'],
tx_hash_hex,
tx_signed_raw_hex,
str(default_chain_spec),
token_data = {
'address': bancor_tokens[0],
}
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
init_w3.eth.accounts[0],
],
queue=None,
)
logg.debug('create {}'.format(tx_transfer['from']))
cache_transfer_data(
tx_hash_hex,
tx_transfer, #_signed_raw_hex,
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
[
init_w3.eth.accounts[0],
init_w3.eth.accounts[1],
1024,
str(default_chain_spec),
],
queue=None,
)
# txf = TokenTxFactory(init_w3.eth.accounts[0], c)
# tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec, 'foo')
# logg.debug('txtransfer {}'.format(tx_transfer))
# (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
# logg.debug('signed raw {}'.format(tx_signed_raw_hex))
# queue_create(
# tx_transfer['nonce'],
# tx_transfer['from'],
# tx_hash_hex,
# tx_signed_raw_hex,
# str(default_chain_spec),
# )
# logg.debug('create {}'.format(tx_transfer['from']))
# cache_transfer_data(
# tx_hash_hex,
# tx_transfer, #_signed_raw_hex,
# )
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[1])
o = q.first()
tx_hash_hex = o.tx_hash
s_resend = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
tx_hash_hex,
str(default_chain_spec),
],
queue=None,
)
t = s_resend.apply_async()
i = 0
for r in t.collect():
logg.debug('{} {}'.format(i, r[0].get()))
i += 1
t = s_resend.apply_async()
for r in t.collect():
pass
assert t.successful()
#

View File

@@ -1,4 +1,5 @@
# third-party imports
import pytest
import celery
# local imports
@@ -6,7 +7,92 @@ from cic_eth.admin.nonce import shift_nonce
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import otx_cache_parse_tx
from cic_eth.eth.task import sign_tx
from cic_eth.db.models.nonce import (
NonceReservation,
Nonce
)
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
@pytest.mark.skip()
def test_reserve_nonce_task(
init_database,
celery_session_worker,
eth_empty_accounts,
):
s = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'foo',
eth_empty_accounts[0],
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
o = q.first()
assert o != None
q = init_database.query(NonceReservation)
q = q.filter(NonceReservation.key==str(t))
o = q.first()
assert o != None
def test_reserve_nonce_chain(
default_chain_spec,
init_database,
celery_session_worker,
init_w3,
init_rpc,
):
provider_address = init_rpc.gas_provider()
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==provider_address)
o = q.first()
o.nonce = 42
init_database.add(o)
init_database.commit()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[0],
provider_address,
],
queue=None,
)
s_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec),
],
queue=None,
)
s_nonce.link(s_gas)
t = s_nonce.apply_async()
r = t.get()
for c in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
Q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
o = q.first()
assert o.nonce == 42
@pytest.mark.skip()
def test_shift_nonce(
default_chain_spec,
init_database,
@@ -47,3 +133,4 @@ def test_shift_nonce(
for _ in t.collect():
pass
assert t.successful()

View File

@@ -20,21 +20,30 @@ def test_approve(
cic_registry,
):
s = celery.signature(
'cic_eth.eth.token.approve',
[
[
token_data = [
{
'address': bancor_tokens[0],
},
],
]
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
token_data,
init_rpc.w3.eth.accounts[0],
],
queue=None,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
init_rpc.w3.eth.accounts[0],
init_rpc.w3.eth.accounts[1],
1024,
str(default_chain_spec),
],
)
t = s.apply_async()
s_nonce.link(s_approve)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
logg.debug('result {}'.format(r))

View File

@@ -1,76 +0,0 @@
# standard imports
import logging
import time
# third-party imports
from erc20_approval_escrow import TransferApproval
import celery
import sha3
# local imports
from cic_eth.eth.token import TokenTxFactory
logg = logging.getLogger()
# BUG: transaction receipt only found sometimes
def test_transfer_approval(
default_chain_spec,
transfer_approval,
bancor_tokens,
w3_account_roles,
eth_empty_accounts,
cic_registry,
init_database,
celery_session_worker,
init_eth_tester,
init_w3,
):
s = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
[
{
'address': bancor_tokens[0],
},
],
w3_account_roles['eth_account_sarafu_owner'],
eth_empty_accounts[0],
1024,
str(default_chain_spec),
],
)
s_send = celery.signature(
'cic_eth.eth.tx.send',
[
str(default_chain_spec),
],
)
s.link(s_send)
t = s.apply_async()
tx_signed_raws = t.get()
for r in t.collect():
logg.debug('result {}'.format(r))
assert t.successful()
init_eth_tester.mine_block()
h = sha3.keccak_256()
tx_signed_raw = tx_signed_raws[0]
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw[2:])
h.update(tx_signed_raw_bytes)
tx_hash = h.digest()
rcpt = init_w3.eth.getTransactionReceipt(tx_hash)
assert rcpt.status == 1
a = TransferApproval(init_w3, transfer_approval)
assert a.last_serial() == 1
logg.debug('requests {}'.format(a.requests(1)['serial']))

View File

@@ -0,0 +1,16 @@
# local imports
from cic_eth.db.models.debug import Debug
def test_debug(
init_database,
):
o = Debug('foo', 'bar')
init_database.add(o)
init_database.commit()
q = init_database.query(Debug)
q = q.filter(Debug.tag=='foo')
o = q.first()
assert o.description == 'bar'

View File

@@ -1,8 +1,29 @@
# third-party imports
import pytest
import uuid
# local imports
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
from cic_eth.error import (
InitializationError,
IntegrityError,
)
def test_nonce_init(
init_database,
eth_empty_accounts,
):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
init_database.commit()
with pytest.raises(InitializationError):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
def test_nonce_increment(
init_database,
@@ -10,11 +31,46 @@ def test_nonce_increment(
database_engine,
):
# if database_engine[:6] == 'sqlite':
# pytest.skip('sqlite cannot lock tables which is required for this test, skipping')
nonce = Nonce.next(eth_empty_accounts[0], 3)
assert nonce == 3
nonce = Nonce.next(eth_empty_accounts[0], 3)
assert nonce == 4
def test_nonce_reserve(
init_database,
eth_empty_accounts,
):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
init_database.commit()
uu = uuid.uuid4()
nonce = NonceReservation.next(eth_empty_accounts[0], str(uu), session=init_database)
init_database.commit()
assert nonce == 42
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
o = q.first()
assert o.nonce == 43
nonce = NonceReservation.release(str(uu))
init_database.commit()
assert nonce == 42
q = init_database.query(NonceReservation)
q = q.filter(NonceReservation.key==str(uu))
o = q.first()
assert o == None
def test_nonce_reserve_integrity(
init_database,
eth_empty_accounts,
):
uu = uuid.uuid4()
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
with pytest.raises(IntegrityError):
NonceReservation.release(str(uu))

View File

@@ -17,6 +17,7 @@ from cic_eth.db.models.tx import TxCache
logg = logging.getLogger()
@pytest.mark.skip()
def test_resolve_converters_by_tokens(
cic_registry,
init_w3,
@@ -43,6 +44,7 @@ def test_resolve_converters_by_tokens(
assert len(t['converters']) == 1
@pytest.mark.skip()
def test_unpack_convert(
default_chain_spec,
cic_registry,
@@ -84,6 +86,7 @@ def test_unpack_convert(
assert convert_data['fee'] == 0
@pytest.mark.skip()
def test_queue_cache_convert(
default_chain_spec,
init_w3,

View File

@@ -1,51 +0,0 @@
# standard imports
import logging
# local imports
from cic_eth.eth.nonce import NonceOracle
logg = logging.getLogger()
def test_nonce_sequence(
eth_empty_accounts,
init_database,
init_rpc,
):
account= init_rpc.w3.eth.personal.new_account('')
no = NonceOracle(account, 0)
n = no.next()
assert n == 0
n = no.next()
assert n == 1
init_rpc.w3.eth.sendTransaction({
'from': init_rpc.w3.eth.accounts[0],
'to': account,
'value': 200000000,
})
init_rpc.w3.eth.sendTransaction({
'from': account,
'to': eth_empty_accounts[0],
'value': 100,
})
c = init_rpc.w3.eth.getTransactionCount(account, 'pending')
logg.debug('nonce {}'.format(c))
account= init_rpc.w3.eth.personal.new_account('')
no = NonceOracle(account, c)
n = no.next()
assert n == 1
n = no.next()
assert n == 2
# try with bogus value
no = NonceOracle(account, 4)
n = no.next()
assert n == 3

View File

@@ -11,12 +11,14 @@ from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.queue.tx import create as queue_create
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation
logg = logging.getLogger()
def test_unpack_transfer(
default_chain_spec,
init_database,
init_w3,
init_rpc,
cic_registry,
@@ -24,6 +26,9 @@ def test_unpack_transfer(
bancor_registry,
):
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
init_database.commit()
source_token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
logg.debug('bancor tokens {} {}'.format(bancor_tokens, source_token))
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
@@ -32,6 +37,7 @@ def test_unpack_transfer(
init_w3.eth.accounts[1],
42,
default_chain_spec,
'foo',
)
s = init_w3.eth.sign_transaction(transfer_tx)
s_bytes = bytes.fromhex(s['raw'][2:])
@@ -56,6 +62,9 @@ def test_queue_cache_transfer(
bancor_registry,
):
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
init_database.commit()
source_token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
value = 42
@@ -64,6 +73,7 @@ def test_queue_cache_transfer(
init_w3.eth.accounts[1],
value,
default_chain_spec,
'foo',
)
tx_signed = init_w3.eth.sign_transaction(transfer_tx)
tx_hash = init_w3.eth.sendRawTransaction(tx_signed['raw'])

View File

@@ -8,12 +8,17 @@ import moolb
# local imports
from cic_eth.eth.token import TokenTxFactory
from cic_eth.eth.task import sign_tx
from cic_eth.db.models.nonce import (
NonceReservation,
Nonce,
)
logg = logging.getLogger()
# TODO: This test fails when not run alone. Identify which fixture leaves a dirty state
def test_filter_process(
init_database,
init_rpc,
default_chain_spec,
default_chain_registry,
@@ -29,9 +34,22 @@ def test_filter_process(
tx_hashes = []
# external tx
# TODO: it does not make sense to use the db setup for nonce here, but we need it as long as we are using the factory to assemble to tx
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec, 'foo')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
@@ -43,9 +61,12 @@ def test_filter_process(
t.add(a.to_bytes(4, 'big'))
# external tx
NonceReservation.next(init_w3.eth.accounts[0], 'bar', init_database)
init_database.commit()
init_eth_tester.mine_blocks(28)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec, 'bar')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)

View File

@@ -0,0 +1,71 @@
# standard imports
import logging
# local imports
from cic_eth.queue.tx import get_status_tx
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import cache_gas_refill_data
from cic_eth.db.models.otx import Otx
logg = logging.getLogger()
def test_status_tx_list(
default_chain_spec,
init_database,
init_w3,
):
tx = {
'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1],
'nonce': 42,
'gas': 21000,
'gasPrice': 1000000,
'value': 128,
'chainId': 666,
'data': '',
}
logg.debug('nonce {}'.format(tx['nonce']))
tx_signed = init_w3.eth.sign_transaction(tx)
#tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw'])
tx_hash = init_w3.keccak(hexstr=tx_signed['raw'])
queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec))
cache_gas_refill_data(tx_hash.hex(), tx)
tx_hash_hex = tx_hash.hex()
q = init_database.query(Otx)
otx = q.get(1)
otx.sendfail(session=init_database)
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 1
otx.sendfail(session=init_database)
otx.retry(session=init_database)
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.QUEUED, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 0
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.IN_NETWORK, session=init_database)
assert len(txs) == 0

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= attr: cic_notify.version.__version_string__
version= 0.4.0a2
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no

View File

@@ -7,10 +7,8 @@ PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[ussd]
MENU_FILE=/usr/local/lib/python3.8/site-packages/cic_ussd/db/ussd_menu.json
MENU_FILE=/usr/src/data/ussd_menu.json
[statemachine]
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/

View File

@@ -1,2 +1,5 @@
[cic]
chain_spec = Bloxberg:8995
engine = evm
common_name = bloxberg
network_id = 8996
meta_url = http://localhost:63380

View File

@@ -1,7 +1,7 @@
[database]
NAME=cic_ussd
USER=postgres
PASSWORD=password
PASSWORD=
HOST=localhost
PORT=5432
ENGINE=postgresql

View File

@@ -0,0 +1,5 @@
[pgp]
export_dir = /usr/src/pgp/keys/
keys_path = /usr/src/secrets/
private_keys = privatekeys_meta.asc
passphrase =

View File

@@ -7,8 +7,8 @@ PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
SERVICE_CODE=*483*46#
[ussd]
MENU_FILE=cic_ussd/db/ussd_menu.json
MENU_FILE=/usr/local/lib/python3.8/site-packages/cic_ussd/db/ussd_menu.json
[statemachine]
STATES=states/
TRANSITIONS=transitions/
STATES=/usr/src/cic-ussd/states/
TRANSITIONS=/usr/src/cic-ussd/transitions/

View File

@@ -1,2 +1,5 @@
[cic]
chain_spec = Bloxberg:8995
engine = evm
common_name = bloxberg
network_id = 8996
meta_url = http://localhost:63380

View File

@@ -0,0 +1,5 @@
[pgp]
export_dir = /usr/src/pgp/keys/
keys_path = /usr/src/secrets/
private_keys = privatekeys_meta.asc
passphrase =

View File

@@ -0,0 +1,2 @@
[app]
service_code = *483*46#

View File

@@ -0,0 +1,4 @@
[client]
host =
port =
ssl =

View File

@@ -0,0 +1,3 @@
[ussd]
user =
pass =

View File

@@ -0,0 +1,49 @@
# standard imports
import json
# third-party imports
from cic_eth.api import Api
from cic_types.models.person import Person
from cic_types.processor import generate_metadata_pointer
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import User
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.redis import get_cached_data
def define_account_tx_metadata(user: User):
# get sender metadata
identifier = blockchain_address_to_metadata_pointer(
blockchain_address=user.blockchain_address
)
key = generate_metadata_pointer(
identifier=identifier,
cic_type='cic.person'
)
account_metadata = get_cached_data(key=key)
if account_metadata:
account_metadata = json.loads(account_metadata)
person = Person()
deserialized_person = person.deserialize(metadata=account_metadata)
given_name = deserialized_person.given_name
family_name = deserialized_person.family_name
phone_number = deserialized_person.tel
return f'{given_name} {family_name} {phone_number}'
else:
phone_number = user.phone_number
return phone_number
def retrieve_account_statement(blockchain_address: str):
chain_str = Chain.spec.__str__()
cic_eth_api = Api(
chain_str=chain_str,
callback_queue='cic-ussd',
callback_task='cic_ussd.tasks.callback_handler.process_statement_callback',
callback_param=blockchain_address
)
result = cic_eth_api.list(address=blockchain_address, limit=9)

View File

@@ -1,39 +0,0 @@
# standard imports
import logging
from collections import deque
# third-party imports
from cic_eth.api import Api
# local imports
from cic_ussd.transactions import from_wei
logg = logging.getLogger()
class BalanceManager:
def __init__(self, address: str, chain_str: str, token_symbol: str):
"""
:param address: Ethereum address of account whose balance is being queried
:type address: str, 0x-hex
:param chain_str: The chain name and network id.
:type chain_str: str
:param token_symbol: ERC20 token symbol of whose balance is being queried
:type token_symbol: str
"""
self.address = address
self.chain_str = chain_str
self.token_symbol = token_symbol
def get_operational_balance(self) -> float:
"""This question queries cic-eth for an account's balance
:return: The current balance of the account as reflected on the blockchain.
:rtype: int
"""
cic_eth_api = Api(chain_str=self.chain_str, callback_task=None)
balance_request_task = cic_eth_api.balance(address=self.address, token_symbol=self.token_symbol)
balance_request_task_results = balance_request_task.collect()
balance_result = deque(balance_request_task_results, maxlen=1).pop()
balance = from_wei(value=balance_result[-1])
return balance

View File

@@ -0,0 +1,90 @@
# standard imports
import json
import logging
from typing import Union
# third-party imports
import celery
from cic_eth.api import Api
# local imports
from cic_ussd.error import CachedDataNotFoundError
from cic_ussd.redis import create_cached_data_key, get_cached_data
from cic_ussd.conversions import from_wei
logg = logging.getLogger()
class BalanceManager:
def __init__(self, address: str, chain_str: str, token_symbol: str):
"""
:param address: Ethereum address of account whose balance is being queried
:type address: str, 0x-hex
:param chain_str: The chain name and network id.
:type chain_str: str
:param token_symbol: ERC20 token symbol of whose balance is being queried
:type token_symbol: str
"""
self.address = address
self.chain_str = chain_str
self.token_symbol = token_symbol
def get_balances(self, asynchronous: bool = False) -> Union[celery.Task, dict]:
"""
This function queries cic-eth for an account's balances, It provides a means to receive the balance either
asynchronously or synchronously depending on the provided value for teh asynchronous parameter. It returns a
dictionary containing network, outgoing and incoming balances.
:param asynchronous: Boolean value checking whether to return balances asynchronously
:type asynchronous: bool
:return:
:rtype:
"""
if asynchronous:
cic_eth_api = Api(
chain_str=self.chain_str,
callback_queue='cic-ussd',
callback_task='cic_ussd.tasks.callback_handler.process_balances_callback',
callback_param=''
)
cic_eth_api.balance(address=self.address, token_symbol=self.token_symbol)
else:
cic_eth_api = Api(chain_str=self.chain_str)
balance_request_task = cic_eth_api.balance(
address=self.address,
token_symbol=self.token_symbol)
return balance_request_task.get()[0]
def compute_operational_balance(balances: dict) -> float:
"""This function calculates the right balance given incoming and outgoing
:param balances:
:type balances:
:return:
:rtype:
"""
incoming_balance = balances.get('balance_incoming')
outgoing_balance = balances.get('balance_outgoing')
network_balance = balances.get('balance_network')
operational_balance = (network_balance + incoming_balance) - outgoing_balance
return from_wei(value=operational_balance)
def get_cached_operational_balance(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
:return:
:rtype:
"""
key = create_cached_data_key(
identifier=bytes.fromhex(blockchain_address[2:]),
salt='cic.balances_data'
)
cached_balance = get_cached_data(key=key)
if cached_balance:
operational_balance = compute_operational_balance(balances=json.loads(cached_balance))
return operational_balance
else:
raise CachedDataNotFoundError('Cached operational balance not found.')

View File

@@ -0,0 +1,10 @@
# local imports
# third-party imports
from chainlib.chain import ChainSpec
# local imports
class Chain:
spec: ChainSpec = None

View File

@@ -0,0 +1,41 @@
# standard imports
import decimal
# third-party imports
# local imports
def truncate(value: float, decimals: int):
"""This function truncates a value to a specified number of decimals places.
:param value: The value to be truncated.
:type value: float
:param decimals: The number of decimals for the value to be truncated to
:type decimals: int
:return: The truncated value.
:rtype: int
"""
decimal.getcontext().rounding = decimal.ROUND_DOWN
contextualized_value = decimal.Decimal(value)
return round(contextualized_value, decimals)
def from_wei(value: int) -> float:
"""This function converts values in Wei to a token in the cic network.
:param value: Value in Wei
:type value: int
:return: SRF equivalent of value in Wei
:rtype: float
"""
value = float(value) / 1e+6
return truncate(value=value, decimals=2)
def to_wei(value: int) -> int:
"""This functions converts values from a token in the cic network to Wei.
:param value: Value in SRF
:type value: int
:return: Wei equivalent of value in SRF
:rtype: int
"""
return int(value * 1e+6)

View File

@@ -1,213 +1,237 @@
{
"ussd_menu": {
"1": {
"description": "The self signup process has been initiated and the account is being created",
"display_key": "ussd.kenya.account_creation_prompt",
"name": "account_creation_prompt",
"parent": null
},
"2": {
"description": "Start menu. This is the entry point for users to select their preferred language",
"description": "Entry point for users to select their preferred language.",
"display_key": "ussd.kenya.initial_language_selection",
"name": "initial_language_selection",
"parent": null
},
"3": {
"description": "PIN setup entry menu",
"2": {
"description": "Entry point for users to enter a pin to secure their account.",
"display_key": "ussd.kenya.initial_pin_entry",
"name": "initial_pin_entry",
"parent": "initial_language_selection"
"parent": null
},
"4": {
"description": "Confirm new PIN menu",
"3": {
"description": "Pin confirmation entry menu.",
"display_key": "ussd.kenya.initial_pin_confirmation",
"name": "initial_pin_confirmation",
"parent": "initial_pin_entry"
},
"4": {
"description": "The signup process has been initiated and the account is being created.",
"display_key": "ussd.kenya.account_creation_prompt",
"name": "account_creation_prompt",
"parent": null
},
"5": {
"description": "Start menu. This is the entry point for activated users",
"description": "Entry point for activated users.",
"display_key": "ussd.kenya.start",
"name": "start",
"parent": null
},
"6": {
"description": "Send Token recipient entry",
"description": "Given name entry menu.",
"display_key": "ussd.kenya.enter_given_name",
"name": "enter_given_name",
"parent": "metadata_management"
},
"7": {
"description": "Family name entry menu.",
"display_key": "ussd.kenya.enter_family_name",
"name": "enter_family_name",
"parent": "metadata_management"
},
"8": {
"description": "Gender entry menu.",
"display_key": "ussd.kenya.enter_gender",
"name": "enter_gender",
"parent": "metadata_management"
},
"9": {
"description": "Age entry menu.",
"display_key": "ussd.kenya.enter_gender",
"name": "enter_gender",
"parent": "metadata_management"
},
"10": {
"description": "Location entry menu.",
"display_key": "ussd.kenya.enter_location",
"name": "enter_location",
"parent": "metadata_management"
},
"11": {
"description": "Products entry menu.",
"display_key": "ussd.kenya.enter_products",
"name": "enter_products",
"parent": "metadata_management"
},
"12": {
"description": "Entry point for activated users.",
"display_key": "ussd.kenya.start",
"name": "start",
"parent": null
},
"13": {
"description": "Send Token recipient entry.",
"display_key": "ussd.kenya.enter_transaction_recipient",
"name": "enter_transaction_recipient",
"parent": "start"
},
"7": {
"description": "Send Token amount prompt menu",
"14": {
"description": "Send Token amount prompt menu.",
"display_key": "ussd.kenya.enter_transaction_amount",
"name": "enter_transaction_amount",
"parent": "start"
},
"8": {
"description": "PIN entry for authorization to send token",
"15": {
"description": "Pin entry for authorization to send token.",
"display_key": "ussd.kenya.transaction_pin_authorization",
"name": "transaction_pin_authorization",
"parent": "start"
},
"9": {
"description": "Terminal of a menu flow where an SMS is expected after.",
"display_key": "ussd.kenya.complete",
"name": "complete",
"parent": null
},
"10": {
"description": "Help menu",
"display_key": "ussd.kenya.help",
"name": "help",
"16": {
"description": "Manage account menu.",
"display_key": "ussd.kenya.account_management",
"name": "account_management",
"parent": "start"
},
"11": {
"description": "Manage account menu",
"display_key": "ussd.kenya.profile_management",
"name": "profile_management",
"17": {
"description": "Manage metadata menu.",
"display_key": "ussd.kenya.metadata_management",
"name": "metadata_management",
"parent": "start"
},
"12": {
"description": "Manage business directory info",
"18": {
"description": "Manage user's preferred language menu.",
"display_key": "ussd.kenya.select_preferred_language",
"name": "select_preferred_language",
"parent": "account_management"
},
"13": {
"description": "About business directory info",
"19": {
"description": "Retrieve mini-statement menu.",
"display_key": "ussd.kenya.mini_statement_pin_authorization",
"name": "mini_statement_pin_authorization",
"parent": "account_management"
},
"14": {
"description": "Change business directory info",
"20": {
"description": "Manage user's pin menu.",
"display_key": "ussd.kenya.enter_current_pin",
"name": "enter_current_pin",
"parent": "account_management"
},
"15": {
"description": "New PIN entry menu",
"21": {
"description": "New pin entry menu.",
"display_key": "ussd.kenya.enter_new_pin",
"name": "enter_new_pin",
"parent": "account_management"
},
"16": {
"description": "First name entry menu",
"display_key": "ussd.kenya.enter_first_name",
"name": "enter_first_name",
"parent": "profile_management"
},
"17": {
"description": "Last name entry menu",
"display_key": "ussd.kenya.enter_last_name",
"name": "enter_last_name",
"parent": "profile_management"
},
"18": {
"description": "Gender entry menu",
"display_key": "ussd.kenya.enter_gender",
"name": "enter_gender",
"parent": "profile_management"
},
"19": {
"description": "Location entry menu",
"display_key": "ussd.kenya.enter_location",
"name": "enter_location",
"parent": "profile_management"
},
"20": {
"description": "Business profile entry menu",
"display_key": "ussd.kenya.enter_business_profile",
"name": "enter_business_profile",
"parent": "profile_management"
},
"21": {
"description": "Menu to display a user's entire profile",
"display_key": "ussd.kenya.display_user_profile_data",
"name": "display_user_profile_data",
"parent": "profile_management"
},
"22": {
"description": "Pin authorization to change name",
"display_key": "ussd.kenya.name_management_pin_authorization",
"name": "name_management_pin_authorization",
"parent": "profile_management"
"description": "Pin entry menu.",
"display_key": "ussd.kenya.standard_pin_authorization",
"name": "standard_pin_authorization",
"parent": "start"
},
"23": {
"description": "Pin authorization to change gender",
"display_key": "ussd.kenya.gender_management_pin_authorization",
"name": "gender_management_pin_authorization",
"parent": "profile_management"
},
"24": {
"description": "Pin authorization to change location",
"display_key": "ussd.kenya.location_management_pin_authorization",
"name": "location_management_pin_authorization",
"parent": "profile_management"
},
"26": {
"description": "Pin authorization to display user's profile",
"display_key": "ussd.kenya.view_profile_pin_authorization",
"name": "view_profile_pin_authorization",
"parent": "profile_management"
},
"27": {
"description": "Exit menu",
"description": "Exit menu.",
"display_key": "ussd.kenya.exit",
"name": "exit",
"parent": null
},
"28": {
"description": "Invalid menu option",
"24": {
"description": "Invalid menu option.",
"display_key": "ussd.kenya.exit_invalid_menu_option",
"name": "exit_invalid_menu_option",
"parent": null
},
"29": {
"description": "PIN policy violation",
"25": {
"description": "Pin policy violation.",
"display_key": "ussd.kenya.exit_invalid_pin",
"name": "exit_invalid_pin",
"parent": null
},
"30": {
"description": "PIN mismatch. New PIN and the new PIN confirmation do not match",
"26": {
"description": "Pin mismatch. New pin and the new pin confirmation do not match",
"display_key": "ussd.kenya.exit_pin_mismatch",
"name": "exit_pin_mismatch",
"parent": null
},
"31": {
"description": "Ussd PIN Blocked Menu",
"27": {
"description": "Ussd pin blocked Menu",
"display_key": "ussd.kenya.exit_pin_blocked",
"name": "exit_pin_blocked",
"parent": null
},
"32": {
"description": "Key params missing in request",
"28": {
"description": "Key params missing in request.",
"display_key": "ussd.kenya.exit_invalid_request",
"name": "exit_invalid_request",
"parent": null
},
"33": {
"description": "The user did not select a choice",
"29": {
"description": "The user did not select a choice.",
"display_key": "ussd.kenya.exit_invalid_input",
"name": "exit_invalid_input",
"parent": null
},
"34": {
"30": {
"description": "Exit following unsuccessful transaction due to insufficient account balance.",
"display_key": "ussd.kenya.exit_insufficient_balance",
"name": "exit_insufficient_balance",
"parent": null
},
"31": {
"description": "Exit following a successful transaction.",
"display_key": "ussd.kenya.exit_successful_transaction",
"name": "exit_successful_transaction",
"parent": null
},
"32": {
"description": "End of a menu flow.",
"display_key": "ussd.kenya.complete",
"name": "complete",
"parent": null
},
"33": {
"description": "Pin entry menu to view account balances.",
"display_key": "ussd.kenya.account_balances_pin_authorization",
"name": "account_balances_pin_authorization",
"parent": "account_management"
},
"34": {
"description": "Pin entry menu to view account statement.",
"display_key": "ussd.kenya.account_statement_pin_authorization",
"name": "account_statement_pin_authorization",
"parent": "account_management"
},
"35": {
"description": "Manage account menu",
"display_key": "ussd.kenya.account_management",
"name": "account_management",
"parent": "start"
"description": "Menu to display account balances.",
"display_key": "ussd.kenya.account_balances",
"name": "account_balances",
"parent": "account_management"
},
"36": {
"description": "Exit following insufficient balance to perform a transaction.",
"display_key": "ussd.kenya.exit_insufficient_balance",
"name": "exit_insufficient_balance",
"description": "Menu to display first set of transactions in statement.",
"display_key": "ussd.kenya.first_transaction_set",
"name": "first_transaction_set",
"parent": null
},
"37": {
"description": "Menu to display middle set of transactions in statement.",
"display_key": "ussd.kenya.middle_transaction_set",
"name": "middle_transaction_set",
"parent": null
},
"38": {
"description": "Menu to display last set of transactions in statement.",
"display_key": "ussd.kenya.last_transaction_set",
"name": "last_transaction_set",
"parent": null
},
"39": {
"description": "Menu to instruct users to call the office.",
"display_key": "ussd.key.help",
"name": "help",
"parent": null
}
}

View File

@@ -17,3 +17,17 @@ class ActionDataNotFoundError(OSError):
"""Raised when action data matching a specific task uuid is not found in the redis cache"""
pass
class UserMetadataNotFoundError(OSError):
"""Raised when metadata is expected but not available in cache."""
pass
class UnsupportedMethodError(OSError):
"""Raised when the method passed to the make request function is unsupported."""
pass
class CachedDataNotFoundError(OSError):
"""Raised when the method passed to the make request function is unsupported."""
pass

View File

@@ -0,0 +1,43 @@
# standard imports
# third-party imports
import requests
from chainlib.eth.address import to_checksum
from hexathon import add_0x
# local imports
from cic_ussd.error import UnsupportedMethodError
def make_request(method: str, url: str, data: any = None, headers: dict = None):
"""
:param method:
:type method:
:param url:
:type url:
:param data:
:type data:
:param headers:
:type headers:
:return:
:rtype:
"""
if method == 'GET':
result = requests.get(url=url)
elif method == 'POST':
result = requests.post(url=url, data=data, headers=headers)
elif method == 'PUT':
result = requests.put(url=url, data=data, headers=headers)
else:
raise UnsupportedMethodError(f'Unsupported method: {method}')
return result
def blockchain_address_to_metadata_pointer(blockchain_address: str):
"""
:param blockchain_address:
:type blockchain_address:
:return:
:rtype:
"""
return bytes.fromhex(blockchain_address[2:])

View File

@@ -0,0 +1,63 @@
# standard imports
import json
import logging
from typing import Optional
from urllib.request import Request, urlopen
# third-party imports
import gnupg
# local imports
logg = logging.getLogger()
class Signer:
"""
:cvar gpg_path:
:type gpg_path:
:cvar gpg_passphrase:
:type gpg_passphrase:
:cvar key_file_path:
:type key_file_path:
"""
gpg_path: str = None
gpg_passphrase: str = None
key_file_path: str = None
def __init__(self):
self.gpg = gnupg.GPG(gnupghome=self.gpg_path)
# parse key file data
key_file = open(self.key_file_path, 'r')
self.key_data = key_file.read()
key_file.close()
def get_operational_key(self):
"""
:return:
:rtype:
"""
# import key data into keyring
self.gpg.import_keys(key_data=self.key_data)
gpg_keys = self.gpg.list_keys()
key_algorithm = gpg_keys[0].get('algo')
key_id = gpg_keys[0].get("keyid")
logg.info(f'using signing key: {key_id}, algorithm: {key_algorithm}')
return gpg_keys[0]
def sign_digest(self, data: bytes):
"""
:param data:
:type data:
:return:
:rtype:
"""
data = json.loads(data)
digest = data['digest']
key_id = self.get_operational_key().get('keyid')
signature = self.gpg.sign(digest, passphrase=self.gpg_passphrase, keyid=key_id)
return str(signature)

View File

@@ -0,0 +1,102 @@
# standard imports
import json
import logging
import os
# third-party imports
import requests
from cic_types.models.person import generate_metadata_pointer, Person
# local imports
from cic_ussd.chain import Chain
from cic_ussd.metadata import make_request
from cic_ussd.metadata.signer import Signer
from cic_ussd.redis import cache_data
logg = logging.getLogger()
class UserMetadata:
"""
:cvar base_url:
:type base_url:
"""
base_url = None
def __init__(self, identifier: bytes):
"""
:param identifier:
:type identifier:
"""
self. headers = {
'X-CIC-AUTOMERGE': 'server',
'Content-Type': 'application/json'
}
self.identifier = identifier
self.metadata_pointer = generate_metadata_pointer(
identifier=self.identifier,
cic_type='cic.person'
)
if self.base_url:
self.url = os.path.join(self.base_url, self.metadata_pointer)
def create(self, data: dict):
try:
data = json.dumps(data).encode('utf-8')
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
metadata = result.content
self.edit(data=metadata, engine='pgp')
logg.info(f'Get sign material response status: {result.status_code}')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
def edit(self, data: bytes, engine: str):
"""
:param data:
:type data:
:param engine:
:type engine:
:return:
:rtype:
"""
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get('algo')
formatted_data = {
'm': data.decode('utf-8'),
's': {
'engine': engine,
'algo': algorithm,
'data': signature,
'digest': json.loads(data).get('digest'),
}
}
formatted_data = json.dumps(formatted_data).encode('utf-8')
try:
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
logg.info(f'Signed content submission status: {result.status_code}.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)
def query(self):
result = make_request(method='GET', url=self.url)
status = result.status_code
logg.info(f'Get latest data status: {status}')
try:
if status == 200:
response_data = result.content
data = json.loads(response_data.decode())
# validate data
person = Person()
deserialized_person = person.deserialize(metadata=json.loads(data))
cache_data(key=self.metadata_pointer, data=json.dumps(deserialized_person.serialize()))
elif status == 404:
logg.info('The data is not available and might need to be added.')
result.raise_for_status()
except requests.exceptions.HTTPError as error:
raise RuntimeError(error)

View File

@@ -5,7 +5,6 @@ import logging
# third party imports
import celery
import i18n
import phonenumbers
from cic_eth.api.api_task import Api
from tinydb.table import Document
from typing import Optional
@@ -239,7 +238,7 @@ def persist_session_to_db_task(external_session_id: str, queue: str):
:type queue: str
"""
s_persist_session_to_db = celery.signature(
'cic_ussd.tasks.ussd.persist_session_to_db',
'cic_ussd.tasks.ussd_session.persist_session_to_db',
[external_session_id]
)
s_persist_session_to_db.apply_async(queue=queue)
@@ -453,37 +452,3 @@ def save_to_in_memory_ussd_session_data(queue: str, session_data: dict, ussd_ses
)
persist_session_to_db_task(external_session_id=external_session_id, queue=queue)
def process_phone_number(phone_number: str, region: str):
"""This function parses any phone number for the provided region
:param phone_number: A string with a phone number.
:type phone_number: str
:param region: Caller defined region
:type region: str
:return: The parsed phone number value based on the defined region
:rtype: str
"""
if not isinstance(phone_number, str):
try:
phone_number = str(int(phone_number))
except ValueError:
pass
phone_number_object = phonenumbers.parse(phone_number, region)
parsed_phone_number = phonenumbers.format_number(phone_number_object, phonenumbers.PhoneNumberFormat.E164)
return parsed_phone_number
def get_user_by_phone_number(phone_number: str) -> Optional[User]:
"""This function queries the database for a user based on the provided phone number.
:param phone_number: A valid phone number.
:type phone_number: str
:return: A user object matching a given phone number
:rtype: User|None
"""
# consider adding region to user's metadata
phone_number = process_phone_number(phone_number=phone_number, region='KE')
user = User.session.query(User).filter_by(phone_number=phone_number).first()
return user

View File

@@ -0,0 +1,43 @@
# standard imports
from typing import Optional
# third-party imports
import phonenumbers
# local imports
from cic_ussd.db.models.user import User
def process_phone_number(phone_number: str, region: str):
"""This function parses any phone number for the provided region
:param phone_number: A string with a phone number.
:type phone_number: str
:param region: Caller defined region
:type region: str
:return: The parsed phone number value based on the defined region
:rtype: str
"""
if not isinstance(phone_number, str):
try:
phone_number = str(int(phone_number))
except ValueError:
pass
phone_number_object = phonenumbers.parse(phone_number, region)
parsed_phone_number = phonenumbers.format_number(phone_number_object, phonenumbers.PhoneNumberFormat.E164)
return parsed_phone_number
def get_user_by_phone_number(phone_number: str) -> Optional[User]:
"""This function queries the database for a user based on the provided phone number.
:param phone_number: A valid phone number.
:type phone_number: str
:return: A user object matching a given phone number
:rtype: User|None
"""
# consider adding region to user's metadata
phone_number = process_phone_number(phone_number=phone_number, region='KE')
user = User.session.query(User).filter_by(phone_number=phone_number).first()
return user

View File

@@ -1,17 +1,26 @@
# standard imports
import logging
import json
import re
from typing import Optional
# third party imports
import celery
from cic_types.models.person import Person
from tinydb.table import Document
# local imports
from cic_ussd.accounts import BalanceManager
from cic_ussd.account import define_account_tx_metadata, retrieve_account_statement
from cic_ussd.balance import BalanceManager, compute_operational_balance, get_cached_operational_balance
from cic_ussd.chain import Chain
from cic_ussd.db.models.user import AccountStatus, User
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
from cic_ussd.phone_number import get_user_by_phone_number
from cic_ussd.redis import cache_data, create_cached_data_key, get_cached_data
from cic_ussd.state_machine import UssdStateMachine
from cic_ussd.transactions import to_wei, from_wei
from cic_ussd.conversions import to_wei, from_wei
from cic_ussd.translation import translation_for
logg = logging.getLogger(__name__)
@@ -57,17 +66,17 @@ def process_exit_insufficient_balance(display_key: str, user: User, ussd_session
:rtype: str
"""
# get account balance
balance_manager = BalanceManager(address=user.blockchain_address,
chain_str=UssdStateMachine.chain_str,
token_symbol='SRF')
balance = balance_manager.get_operational_balance()
operational_balance = get_cached_operational_balance(blockchain_address=user.blockchain_address)
# compile response data
user_input = ussd_session.get('user_input').split('*')[-1]
transaction_amount = to_wei(value=int(user_input))
token_symbol = 'SRF'
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
tx_recipient_information = recipient_phone_number
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
tx_recipient_information = define_account_tx_metadata(user=recipient)
return translation_for(
key=display_key,
@@ -75,7 +84,7 @@ def process_exit_insufficient_balance(display_key: str, user: User, ussd_session
amount=from_wei(transaction_amount),
token_symbol=token_symbol,
recipient_information=tx_recipient_information,
token_balance=balance
token_balance=operational_balance
)
@@ -93,9 +102,9 @@ def process_exit_successful_transaction(display_key: str, user: User, ussd_sessi
transaction_amount = to_wei(int(ussd_session.get('session_data').get('transaction_amount')))
token_symbol = 'SRF'
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
sender_phone_number = user.phone_number
tx_recipient_information = recipient_phone_number
tx_sender_information = sender_phone_number
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
tx_recipient_information = define_account_tx_metadata(user=recipient)
tx_sender_information = define_account_tx_metadata(user=user)
return translation_for(
key=display_key,
@@ -122,9 +131,10 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
"""
# compile response data
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
tx_recipient_information = recipient_phone_number
tx_sender_information = user.phone_number
logg.debug('Requires integration with cic-meta to get user name.')
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
tx_recipient_information = define_account_tx_metadata(user=recipient)
tx_sender_information = define_account_tx_metadata(user=user)
token_symbol = 'SRF'
user_input = ussd_session.get('user_input').split('*')[-1]
transaction_amount = to_wei(value=int(user_input))
@@ -139,6 +149,122 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
)
def process_account_balances(user: User, display_key: str, ussd_session: dict):
"""
:param user:
:type user:
:param display_key:
:type display_key:
:param ussd_session:
:type ussd_session:
:return:
:rtype:
"""
# retrieve cached balance
operational_balance = get_cached_operational_balance(blockchain_address=user.blockchain_address)
logg.debug('Requires call to retrieve tax and bonus amounts')
tax = ''
bonus = ''
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
operational_balance=operational_balance,
tax=tax,
bonus=bonus,
token_symbol='SRF'
)
def format_transactions(transactions: list, preferred_language: str):
formatted_transactions = ''
if len(transactions) > 0:
for transaction in transactions:
recipient_phone_number = transaction.get('recipient_phone_number')
sender_phone_number = transaction.get('sender_phone_number')
value = transaction.get('to_value')
timestamp = transaction.get('timestamp')
action_tag = transaction.get('action_tag')
token_symbol = 'SRF'
if action_tag == 'SENT' or action_tag == 'ULITUMA':
formatted_transactions += f'{action_tag} {value} {token_symbol} {recipient_phone_number} {timestamp}.\n'
else:
formatted_transactions += f'{action_tag} {value} {token_symbol} {sender_phone_number} {timestamp}. \n'
return formatted_transactions
else:
if preferred_language == 'en':
formatted_transactions = 'Empty'
else:
formatted_transactions = 'Hamna historia'
return formatted_transactions
def process_account_statement(user: User, display_key: str, ussd_session: dict):
"""
:param user:
:type user:
:param display_key:
:type display_key:
:param ussd_session:
:type ussd_session:
:return:
:rtype:
"""
# retrieve cached statement
identifier = blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address)
key = create_cached_data_key(identifier=identifier, salt='cic.statement')
transactions = get_cached_data(key=key)
first_transaction_set = []
middle_transaction_set = []
last_transaction_set = []
transactions = json.loads(transactions)
if len(transactions) > 6:
last_transaction_set += transactions[6:]
middle_transaction_set += transactions[3:][:3]
first_transaction_set += transactions[:3]
# there are probably much cleaner and operational inexpensive ways to do this so find them
elif 4 < len(transactions) < 7:
middle_transaction_set += transactions[3:]
first_transaction_set += transactions[:3]
else:
first_transaction_set += transactions[:3]
if display_key == 'ussd.kenya.first_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
first_transaction_set=format_transactions(
transactions=first_transaction_set,
preferred_language=user.preferred_language
)
)
elif display_key == 'ussd.kenya.middle_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
middle_transaction_set=format_transactions(
transactions=middle_transaction_set,
preferred_language=user.preferred_language
)
)
elif display_key == 'ussd.kenya.last_transaction_set':
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
last_transaction_set=format_transactions(
transactions=last_transaction_set,
preferred_language=user.preferred_language
)
)
def process_start_menu(display_key: str, user: User):
"""This function gets data on an account's balance and token in order to append it to the start of the start menu's
title. It passes said arguments to the translation function and returns the appropriate corresponding text from the
@@ -150,16 +276,41 @@ def process_start_menu(display_key: str, user: User):
:return: Corresponding translation text response
:rtype: str
"""
balance_manager = BalanceManager(address=user.blockchain_address,
chain_str=UssdStateMachine.chain_str,
chain_str = Chain.spec.__str__()
blockchain_address = user.blockchain_address
balance_manager = BalanceManager(address=blockchain_address,
chain_str=chain_str,
token_symbol='SRF')
balance = balance_manager.get_operational_balance()
# get balances synchronously for display on start menu
balances_data = balance_manager.get_balances()
key = create_cached_data_key(
identifier=bytes.fromhex(blockchain_address[2:]),
salt='cic.balances_data'
)
cache_data(key=key, data=json.dumps(balances_data))
# get operational balance
operational_balance = compute_operational_balance(balances=balances_data)
# retrieve and cache account's metadata
s_query_user_metadata = celery.signature(
'cic_ussd.tasks.metadata.query_user_metadata',
[blockchain_address]
)
s_query_user_metadata.apply_async(queue='cic-ussd')
# retrieve and cache account's statement
retrieve_account_statement(blockchain_address=blockchain_address)
# TODO [Philip]: figure out how to get token symbol from a metadata layer of sorts.
token_symbol = 'SRF'
logg.debug("Requires integration to determine user's balance and token.")
return translation_for(
key=display_key,
preferred_language=user.preferred_language,
account_balance=balance,
account_balance=operational_balance,
account_token_name=token_symbol
)
@@ -241,5 +392,11 @@ def custom_display_text(
return process_exit_successful_transaction(display_key=display_key, user=user, ussd_session=ussd_session)
elif menu_name == 'start':
return process_start_menu(display_key=display_key, user=user)
elif 'pin_authorization' in menu_name:
return process_pin_authorization(display_key=display_key, user=user)
elif menu_name == 'account_balances':
return process_account_balances(display_key=display_key, user=user, ussd_session=ussd_session)
elif 'transaction_set' in menu_name:
return process_account_statement(display_key=display_key, user=user, ussd_session=ussd_session)
else:
return translation_for(key=display_key, preferred_language=user.preferred_language)

View File

@@ -1,6 +1,52 @@
# standard imports
import hashlib
import logging
# third-party imports
from redis import Redis
logg = logging.getLogger()
class InMemoryStore:
cache: Redis = None
def cache_data(key: str, data: str):
"""
:param key:
:type key:
:param data:
:type data:
:return:
:rtype:
"""
cache = InMemoryStore.cache
cache.set(name=key, value=data)
cache.persist(name=key)
def get_cached_data(key: str):
"""
:param key:
:type key:
:return:
:rtype:
"""
cache = InMemoryStore.cache
return cache.get(name=key)
def create_cached_data_key(identifier: bytes, salt: str):
"""
:param identifier:
:type identifier:
:param salt:
:type salt:
:return:
:rtype:
"""
hash_object = hashlib.new("sha256")
hash_object.update(identifier)
hash_object.update(salt.encode(encoding="utf-8"))
return hash_object.digest().hex()

View File

@@ -0,0 +1,107 @@
#!/usr/bin/python3
# Author: Louis Holbrook <dev@holbrook.no> (https://holbrook.no)
# Description: interactive console for Sempo USSD session
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import sys
import uuid
import json
import argparse
import logging
import urllib
from xdg.BaseDirectory import xdg_config_home
from urllib import request
# third-party imports
from confini import Config
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser(description='CLI tool to interface a Sempo USSD session')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
#argparser.add_argument('-d', type=str, default='local', help='deployment name to interface (config root subdirectory)')
argparser.add_argument('--host', type=str, default='localhost')
argparser.add_argument('--port', type=int, default=9000)
argparser.add_argument('--nossl', help='do not use ssl (careful)', action='store_true')
argparser.add_argument('phone', help='phone number for USSD session')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
#config_dir = os.path.join(args.c, args.d)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = Config(config_dir)
config.process()
logg.debug('config loaded from {}'.format(config_dir))
host = config.get('CLIENT_HOST')
port = config.get('CLIENT_PORT')
ssl = config.get('CLIENT_SSL')
if host == None:
host = args.host
if port == None:
port = args.port
if ssl == None:
ssl = not args.nossl
elif ssl == 0:
ssl = False
else:
ssl = True
def main():
# TODO: improve url building
url = 'http'
if ssl:
url += 's'
url += '://{}:{}'.format(host, port)
url += '/?username={}&password={}'.format(config.get('USSD_USER'), config.get('USSD_PASS'))
logg.info('service url {}'.format(url))
logg.info('phone {}'.format(args.phone))
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': args.phone,
'text': config.get('APP_SERVICE_CODE'),
}
state = "_BEGIN"
while state != "END":
if state != "_BEGIN":
user_input = input('next> ')
data['text'] = user_input
req = urllib.request.Request(url)
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
state = response_data[:3]
out = response_data[4:]
print(out)
if __name__ == "__main__":
main()

View File

@@ -12,14 +12,18 @@ import redis
# third-party imports
from confini import Config
from chainlib.chain import ChainSpec
from urllib.parse import quote_plus
# local imports
from cic_ussd.chain import Chain
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.encoder import PasswordEncoder
from cic_ussd.files.local_files import create_local_file_data_stores, json_file_parser
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.operations import (define_response_with_content,
process_menu_interaction_requests,
define_multilingual_responses)
@@ -59,6 +63,7 @@ config.censor('PASSWORD', 'DATABASE')
# define log levels
if args.vv:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
elif args.v:
logging.getLogger().setLevel(logging.INFO)
@@ -92,6 +97,14 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
decode_responses=True)
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
UserMetadata.base_url = config.get('CIC_META_URL')
# define signer values
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
# initialize celery app
celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
@@ -99,7 +112,13 @@ celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY
states = json_file_parser(filepath=config.get('STATEMACHINE_STATES'))
transitions = json_file_parser(filepath=config.get('STATEMACHINE_TRANSITIONS'))
UssdStateMachine.chain_str = config.get('CIC_CHAIN_SPEC')
chain_spec = ChainSpec(
common_name=config.get('CIC_COMMON_NAME'),
engine=config.get('CIC_ENGINE'),
network_id=config.get('CIC_NETWORK_ID')
)
Chain.spec = chain_spec
UssdStateMachine.states = states
UssdStateMachine.transitions = transitions
@@ -152,7 +171,8 @@ def application(env, start_response):
return []
# handle menu interaction requests
response = process_menu_interaction_requests(chain_str=config.get('CIC_CHAIN_SPEC'),
chain_str = chain_spec.__str__()
response = process_menu_interaction_requests(chain_str=chain_str,
external_session_id=external_session_id,
phone_number=phone_number,
queue=args.q,

View File

@@ -12,6 +12,8 @@ from confini import Config
# local imports
from cic_ussd.db import dsn_from_config
from cic_ussd.db.models.base import SessionBase
from cic_ussd.metadata.signer import Signer
from cic_ussd.metadata.user import UserMetadata
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
@@ -59,6 +61,14 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
decode_responses=True)
InMemoryUssdSession.redis_cache = InMemoryStore.cache
# define metadata URL
UserMetadata.base_url = config.get('CIC_META_URL')
# define signer values
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
# set up celery
current_app = celery.Celery(__name__)

Some files were not shown because too many files have changed in this diff Show More