Compare commits
23 Commits
lash/cic-e
...
lash/refac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9119c04f1f
|
||
|
|
3ceafa90f4
|
||
|
|
bcf20cbfd3
|
||
|
|
4af7592212
|
||
|
|
4ea3933f07
|
||
|
|
1edacf70cf
|
||
|
|
2720e9ce6a
|
||
|
|
a0d405f582
|
||
|
|
435bfa88f1
|
||
|
|
5b77a2ea20
|
||
|
|
1124bd2d26
|
||
|
|
744c7b6840
|
||
|
|
841a366a2f
|
||
|
|
12f887c92c
|
||
|
|
72fed82bd5
|
||
|
|
a766a9de70
|
||
|
|
dfd0de32e1
|
||
|
|
7752ab5c5d
|
||
|
|
5a6fe7327d
|
||
|
|
1f126037fe
|
||
|
|
01387993b5
|
||
|
|
9d6bfef73d | ||
|
|
26c622f42f
|
@@ -29,8 +29,8 @@ def upgrade():
|
||||
sa.Column('source_token', sa.String(42), nullable=False),
|
||||
sa.Column('destination_token', sa.String(42), nullable=False),
|
||||
sa.Column('success', sa.Boolean, nullable=False),
|
||||
sa.Column('from_value', sa.NUMERIC(), nullable=False),
|
||||
sa.Column('to_value', sa.NUMERIC(), nullable=False),
|
||||
sa.Column('from_value', sa.BIGINT(), nullable=False),
|
||||
sa.Column('to_value', sa.BIGINT(), nullable=False),
|
||||
sa.Column('date_block', sa.DateTime, nullable=False),
|
||||
)
|
||||
op.create_table(
|
||||
|
||||
@@ -312,7 +312,7 @@ class Tracker:
|
||||
session.close()
|
||||
|
||||
(provider, w3) = web3_constructor()
|
||||
trust = config.get('CIC_TRUST_ADDRESS', "").split(",")
|
||||
trust = config.get('CIC_TRUST_ADDRESS', []).split(",")
|
||||
chain_spec = args.i
|
||||
|
||||
try:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[database]
|
||||
NAME=cic_cache
|
||||
NAME=cic-eth
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
HOST=localhost
|
||||
|
||||
@@ -1,25 +1,12 @@
|
||||
# standard imports
|
||||
import datetime
|
||||
|
||||
# external imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.models.debug import Debug
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.task import CriticalSQLAlchemyTask
|
||||
|
||||
celery_app = celery.current_app
|
||||
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def alert(chained_input, tag, txt):
|
||||
session = SessionBase.create_session()
|
||||
|
||||
o = Debug(tag, txt)
|
||||
session.add(o)
|
||||
session.commit()
|
||||
|
||||
session.close()
|
||||
|
||||
return chained_input
|
||||
@celery_app.task()
|
||||
def out_tmp(tag, txt):
|
||||
f = open('/tmp/err.{}.txt'.format(tag), "w")
|
||||
f.write(txt)
|
||||
f.close()
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import sys
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
@@ -318,8 +317,6 @@ class AdminApi:
|
||||
:return: Transaction details
|
||||
:rtype: dict
|
||||
"""
|
||||
problems = []
|
||||
|
||||
if tx_hash != None and tx_raw != None:
|
||||
ValueError('Specify only one of hash or raw tx')
|
||||
|
||||
@@ -447,12 +444,10 @@ class AdminApi:
|
||||
r = c.w3.eth.getTransactionReceipt(tx_hash)
|
||||
if r.status == 1:
|
||||
tx['network_status'] = 'Confirmed'
|
||||
tx['block'] = r.blockNumber
|
||||
tx['tx_index'] = r.transactionIndex
|
||||
else:
|
||||
tx['network_status'] = 'Reverted'
|
||||
tx['network_block_number'] = r.blockNumber
|
||||
tx['network_tx_index'] = r.transactionIndex
|
||||
if tx['block_number'] == None:
|
||||
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
|
||||
except web3.exceptions.TransactionNotFound:
|
||||
pass
|
||||
|
||||
@@ -474,9 +469,4 @@ class AdminApi:
|
||||
t = s.apply_async()
|
||||
tx['status_log'] = t.get()
|
||||
|
||||
if len(problems) > 0:
|
||||
sys.stderr.write('\n')
|
||||
for p in problems:
|
||||
sys.stderr.write('!!!{}\n'.format(p))
|
||||
|
||||
return tx
|
||||
|
||||
@@ -92,11 +92,6 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
@@ -115,8 +110,7 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce.link(s_tokens)
|
||||
s_check.link(s_nonce)
|
||||
s_check.link(s_tokens)
|
||||
if self.callback_param != None:
|
||||
s_convert.link(self.callback_success)
|
||||
s_tokens.link(s_convert).on_error(self.callback_error)
|
||||
@@ -153,11 +147,6 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
@@ -176,8 +165,7 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce.link(s_tokens)
|
||||
s_check.link(s_nonce)
|
||||
s_check.link(s_tokens)
|
||||
if self.callback_param != None:
|
||||
s_convert.link(self.callback_success)
|
||||
s_tokens.link(s_convert).on_error(self.callback_error)
|
||||
@@ -212,13 +200,6 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
from_address,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
@@ -236,8 +217,7 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce.link(s_tokens)
|
||||
s_check.link(s_nonce)
|
||||
s_check.link(s_tokens)
|
||||
if self.callback_param != None:
|
||||
s_transfer.link(self.callback_success)
|
||||
s_tokens.link(s_transfer).on_error(self.callback_error)
|
||||
@@ -248,6 +228,82 @@ class Api:
|
||||
return t
|
||||
|
||||
|
||||
def transfer_request(self, from_address, to_address, spender_address, value, token_symbol):
|
||||
"""Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another.
|
||||
|
||||
:param from_address: Ethereum address of sender
|
||||
:type from_address: str, 0x-hex
|
||||
:param to_address: Ethereum address of recipient
|
||||
:type to_address: str, 0x-hex
|
||||
:param spender_address: Ethereum address that is executing transfer (typically an escrow contract)
|
||||
:type spender_address: str, 0x-hex
|
||||
:param value: Estimated return from conversion
|
||||
:type value: int
|
||||
:param token_symbol: ERC20 token symbol of token to send
|
||||
:type token_symbol: str
|
||||
:returns: uuid of root task
|
||||
:rtype: celery.Task
|
||||
"""
|
||||
s_check = celery.signature(
|
||||
'cic_eth.admin.ctrl.check_lock',
|
||||
[
|
||||
[token_symbol],
|
||||
self.chain_str,
|
||||
LockEnum.QUEUE,
|
||||
from_address,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens_transfer_approval = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_tokens_approve = celery.signature(
|
||||
'cic_eth.eth.token.resolve_tokens_by_symbol',
|
||||
[
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_approve = celery.signature(
|
||||
'cic_eth.eth.token.approve',
|
||||
[
|
||||
from_address,
|
||||
spender_address,
|
||||
value,
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_transfer_approval = celery.signature(
|
||||
'cic_eth.eth.request.transfer_approval_request',
|
||||
[
|
||||
from_address,
|
||||
to_address,
|
||||
value,
|
||||
self.chain_str,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
# TODO: make approve and transfer_approval chainable so callback can be part of the full chain
|
||||
if self.callback_param != None:
|
||||
s_transfer_approval.link(self.callback_success)
|
||||
s_tokens_approve.link(s_approve)
|
||||
s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error)
|
||||
else:
|
||||
s_tokens_approve.link(s_approve)
|
||||
s_tokens_transfer_approval.link(s_transfer_approval)
|
||||
|
||||
g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue)
|
||||
s_check.link(g)
|
||||
t = s_check.apply_async()
|
||||
#t = s_tokens.apply_async(queue=self.queue)
|
||||
return t
|
||||
|
||||
|
||||
def balance(self, address, token_symbol, include_pending=True):
|
||||
"""Calls the provided callback with the current token balance of the given address.
|
||||
|
||||
@@ -340,11 +396,6 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_account = celery.signature(
|
||||
'cic_eth.eth.account.create',
|
||||
[
|
||||
@@ -352,8 +403,7 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce.link(s_account)
|
||||
s_check.link(s_nonce)
|
||||
s_check.link(s_account)
|
||||
if self.callback_param != None:
|
||||
s_account.link(self.callback_success)
|
||||
|
||||
@@ -388,11 +438,6 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_refill = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
@@ -400,8 +445,7 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce.link(s_refill)
|
||||
s_check.link(s_nonce)
|
||||
s_check.link(s_refill)
|
||||
if self.callback_param != None:
|
||||
s_refill.link(self.callback_success)
|
||||
|
||||
@@ -445,7 +489,6 @@ class Api:
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_local.link(s_brief)
|
||||
if self.callback_param != None:
|
||||
s_brief.link(self.callback_success).on_error(self.callback_error)
|
||||
|
||||
@@ -472,10 +515,12 @@ class Api:
|
||||
c = celery.chain(s_external_get, s_external_process)
|
||||
t = celery.chord([s_local, c])(s_brief)
|
||||
else:
|
||||
t = s_local.apply_async(queue=self.queue)
|
||||
s_local.link(s_brief)
|
||||
t = s_local.apply_async()
|
||||
|
||||
return t
|
||||
|
||||
|
||||
def ping(self, r):
|
||||
"""A noop callback ping for testing purposes.
|
||||
|
||||
|
||||
@@ -103,9 +103,6 @@ def status_str(v, bits_only=False):
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if v == 0:
|
||||
return 'NONE'
|
||||
|
||||
for i in range(16):
|
||||
b = (1 << i)
|
||||
if (b & 0xffff) & v:
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
"""Nonce reservation
|
||||
|
||||
Revision ID: 3b693afd526a
|
||||
Revises: f738d9962fdf
|
||||
Create Date: 2021-03-05 07:09:50.898728
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '3b693afd526a'
|
||||
down_revision = 'f738d9962fdf'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'nonce_task_reservation',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('nonce', sa.Integer, nullable=False),
|
||||
sa.Column('key', sa.String, nullable=False),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('nonce_task_reservation')
|
||||
@@ -1,32 +0,0 @@
|
||||
"""debug output
|
||||
|
||||
Revision ID: f738d9962fdf
|
||||
Revises: ec40ac0974c1
|
||||
Create Date: 2021-03-04 08:32:43.281214
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f738d9962fdf'
|
||||
down_revision = 'ec40ac0974c1'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'debug',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('tag', sa.String, nullable=False),
|
||||
sa.Column('description', sa.String, nullable=False),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
)
|
||||
pass
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('debug')
|
||||
pass
|
||||
@@ -1,30 +0,0 @@
|
||||
"""Nonce reservation
|
||||
|
||||
Revision ID: 3b693afd526a
|
||||
Revises: f738d9962fdf
|
||||
Create Date: 2021-03-05 07:09:50.898728
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '3b693afd526a'
|
||||
down_revision = 'f738d9962fdf'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'nonce_task_reservation',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('nonce', sa.Integer, nullable=False),
|
||||
sa.Column('key', sa.String, nullable=False),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('nonce_task_reservation')
|
||||
@@ -1,32 +0,0 @@
|
||||
"""debug output
|
||||
|
||||
Revision ID: f738d9962fdf
|
||||
Revises: ec40ac0974c1
|
||||
Create Date: 2021-03-04 08:32:43.281214
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f738d9962fdf'
|
||||
down_revision = 'ec40ac0974c1'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'debug',
|
||||
sa.Column('id', sa.Integer, primary_key=True),
|
||||
sa.Column('tag', sa.String, nullable=False),
|
||||
sa.Column('description', sa.String, nullable=False),
|
||||
sa.Column('date_created', sa.DateTime, nullable=False),
|
||||
)
|
||||
pass
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table('debug')
|
||||
pass
|
||||
@@ -54,7 +54,7 @@ class SessionBase(Model):
|
||||
|
||||
|
||||
@staticmethod
|
||||
def connect(dsn, pool_size=16, debug=False):
|
||||
def connect(dsn, pool_size=8, debug=False):
|
||||
"""Create new database connection engine and connect to database backend.
|
||||
|
||||
:param dsn: DSN string defining connection.
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
# standard imports
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from sqlalchemy import Column, String, DateTime
|
||||
|
||||
# local imports
|
||||
from .base import SessionBase
|
||||
|
||||
|
||||
class Debug(SessionBase):
|
||||
|
||||
__tablename__ = 'debug'
|
||||
|
||||
date_created = Column(DateTime, default=datetime.datetime.utcnow)
|
||||
tag = Column(String)
|
||||
description = Column(String)
|
||||
|
||||
|
||||
def __init__(self, tag, description):
|
||||
self.tag = tag
|
||||
self.description = description
|
||||
@@ -1,16 +1,11 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, String, Integer, DateTime
|
||||
from sqlalchemy import Column, String, Integer
|
||||
|
||||
# local imports
|
||||
from .base import SessionBase
|
||||
from cic_eth.error import (
|
||||
InitializationError,
|
||||
IntegrityError,
|
||||
)
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
@@ -42,43 +37,23 @@ class Nonce(SessionBase):
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __get(conn, address):
|
||||
r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
|
||||
def __get(session, address):
|
||||
r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
|
||||
nonce = r.fetchone()
|
||||
session.flush()
|
||||
if nonce == None:
|
||||
return None
|
||||
return nonce[0]
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __set(conn, address, nonce):
|
||||
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
|
||||
|
||||
|
||||
@staticmethod
|
||||
def __init(conn, address, nonce):
|
||||
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
|
||||
|
||||
|
||||
@staticmethod
|
||||
def init(address, nonce=0, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==address)
|
||||
o = q.first()
|
||||
if o != None:
|
||||
session.flush()
|
||||
raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce))
|
||||
def __set(session, address, nonce):
|
||||
session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
|
||||
session.flush()
|
||||
Nonce.__init(session, address, nonce)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
|
||||
# TODO: Incrementing nonce MUST be done by separate tasks.
|
||||
@staticmethod
|
||||
def next(address, initial_if_not_exists=0):
|
||||
def next(address, initial_if_not_exists=0, session=None):
|
||||
"""Generate next nonce for the given address.
|
||||
|
||||
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
|
||||
@@ -90,96 +65,32 @@ class Nonce(SessionBase):
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
#session = SessionBase.bind_session(session)
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
#session.begin_nested()
|
||||
conn = Nonce.engine.connect()
|
||||
SessionBase.release_session(session)
|
||||
|
||||
session.begin_nested()
|
||||
#conn = Nonce.engine.connect()
|
||||
if Nonce.transactional:
|
||||
conn.execute('BEGIN')
|
||||
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
|
||||
logg.debug('locking nonce table for address {}'.format(address))
|
||||
nonce = Nonce.__get(conn, address)
|
||||
#session.execute('BEGIN')
|
||||
session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
|
||||
session.flush()
|
||||
nonce = Nonce.__get(session, address)
|
||||
logg.debug('get nonce {} for address {}'.format(nonce, address))
|
||||
if nonce == None:
|
||||
nonce = initial_if_not_exists
|
||||
session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
|
||||
session.flush()
|
||||
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
|
||||
Nonce.__init(conn, address, nonce)
|
||||
Nonce.__set(conn, address, nonce+1)
|
||||
if Nonce.transactional:
|
||||
conn.execute('COMMIT')
|
||||
logg.debug('unlocking nonce table for address {}'.format(address))
|
||||
conn.close()
|
||||
#session.commit()
|
||||
|
||||
#SessionBase.release_session(session)
|
||||
return nonce
|
||||
|
||||
|
||||
class NonceReservation(SessionBase):
|
||||
|
||||
__tablename__ = 'nonce_task_reservation'
|
||||
|
||||
nonce = Column(Integer)
|
||||
key = Column(String)
|
||||
date_created = Column(DateTime, default=datetime.datetime.utcnow)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def peek(key, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
q = session.query(NonceReservation)
|
||||
q = q.filter(NonceReservation.key==key)
|
||||
o = q.first()
|
||||
|
||||
nonce = None
|
||||
if o != None:
|
||||
nonce = o.nonce
|
||||
|
||||
session.flush()
|
||||
Nonce.__set(session, address, nonce+1)
|
||||
#if Nonce.transactional:
|
||||
#session.execute('COMMIT')
|
||||
#session.execute('UNLOCK TABLE nonce')
|
||||
#conn.close()
|
||||
session.commit()
|
||||
session.commit()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return nonce
|
||||
|
||||
|
||||
@staticmethod
|
||||
def release(key, session=None):
|
||||
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
nonce = NonceReservation.peek(key, session=session)
|
||||
|
||||
q = session.query(NonceReservation)
|
||||
q = q.filter(NonceReservation.key==key)
|
||||
o = q.first()
|
||||
|
||||
if o == None:
|
||||
raise IntegrityError('nonce for key {}'.format(nonce))
|
||||
SessionBase.release_session(session)
|
||||
|
||||
session.delete(o)
|
||||
session.flush()
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return nonce
|
||||
|
||||
|
||||
@staticmethod
|
||||
def next(address, key, session=None):
|
||||
session = SessionBase.bind_session(session)
|
||||
|
||||
if NonceReservation.peek(key, session) != None:
|
||||
raise IntegrityError('nonce for key {}'.format(key))
|
||||
|
||||
nonce = Nonce.next(address)
|
||||
|
||||
o = NonceReservation()
|
||||
o.nonce = nonce
|
||||
o.key = key
|
||||
session.add(o)
|
||||
|
||||
SessionBase.release_session(session)
|
||||
|
||||
return nonce
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
# third-party imports
|
||||
from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey
|
||||
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
|
||||
|
||||
@@ -400,7 +400,7 @@ class Otx(SessionBase):
|
||||
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
|
||||
|
||||
if confirmed:
|
||||
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
|
||||
if not self.status & StatusBits.OBSOLETE:
|
||||
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
|
||||
self.__set_status(StatusEnum.CANCELLED, session)
|
||||
else:
|
||||
|
||||
@@ -143,7 +143,7 @@ class TxCache(SessionBase):
|
||||
self.block_number = block_number
|
||||
self.tx_index = tx_index
|
||||
# not automatically set in sqlite, it seems:
|
||||
self.date_created = datetime.datetime.utcnow()
|
||||
self.date_created = datetime.datetime.now()
|
||||
self.date_updated = self.date_created
|
||||
self.date_checked = self.date_created
|
||||
|
||||
|
||||
@@ -54,13 +54,6 @@ class RoleMissingError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class IntegrityError(Exception):
|
||||
"""Exception raised to signal irregularities with deduplication and ordering of tasks
|
||||
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class LockedError(Exception):
|
||||
"""Exception raised when attempt is made to execute action that is deactivated by lock
|
||||
|
||||
|
||||
@@ -36,7 +36,6 @@ class AccountTxFactory(TxFactory):
|
||||
self,
|
||||
address,
|
||||
chain_spec,
|
||||
uuid,
|
||||
session=None,
|
||||
):
|
||||
"""Register an Ethereum account address with the on-chain account registry
|
||||
@@ -60,7 +59,7 @@ class AccountTxFactory(TxFactory):
|
||||
'gas': gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(uuid, session=session),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'value': 0,
|
||||
})
|
||||
return tx_add
|
||||
@@ -70,7 +69,6 @@ class AccountTxFactory(TxFactory):
|
||||
self,
|
||||
address,
|
||||
chain_spec,
|
||||
uuid,
|
||||
session=None,
|
||||
):
|
||||
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
|
||||
@@ -92,7 +90,7 @@ class AccountTxFactory(TxFactory):
|
||||
'gas': gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(uuid, session=session),
|
||||
'nonce': self.next_nonce(session=session),
|
||||
'value': 0,
|
||||
})
|
||||
return tx_add
|
||||
@@ -158,9 +156,19 @@ def create(password, chain_str):
|
||||
logg.debug('created account {}'.format(a))
|
||||
|
||||
# Initialize nonce provider record for account
|
||||
# TODO: this can safely be set to zero, since we are randomly creating account
|
||||
n = c.w3.eth.getTransactionCount(a, 'pending')
|
||||
session = SessionBase.create_session()
|
||||
Nonce.init(a, session=session)
|
||||
session.commit()
|
||||
q = session.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==a)
|
||||
o = q.first()
|
||||
session.flush()
|
||||
if o == None:
|
||||
o = Nonce()
|
||||
o.address_hex = a
|
||||
o.nonce = n
|
||||
session.add(o)
|
||||
session.commit()
|
||||
session.close()
|
||||
return a
|
||||
|
||||
@@ -195,10 +203,9 @@ def register(self, account_address, chain_str, writer_address=None):
|
||||
c = RpcClient(chain_spec, holder_address=writer_address)
|
||||
txf = AccountTxFactory(writer_address, c)
|
||||
|
||||
tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
|
||||
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
|
||||
tx_add = txf.add(account_address, chain_spec, session=session)
|
||||
session.close()
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data')
|
||||
|
||||
gas_budget = tx_add['gas'] * tx_add['gasPrice']
|
||||
|
||||
@@ -234,14 +241,12 @@ def gift(self, account_address, chain_str):
|
||||
c = RpcClient(chain_spec, holder_address=account_address)
|
||||
txf = AccountTxFactory(account_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
|
||||
session.close()
|
||||
tx_add = txf.gift(account_address, chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data')
|
||||
|
||||
gas_budget = tx_add['gas'] * tx_add['gasPrice']
|
||||
|
||||
logg.debug('gift user tx {}'.format(tx_hash_hex))
|
||||
logg.debug('register user tx {}'.format(tx_hash_hex))
|
||||
s = create_check_gas_and_send_task(
|
||||
[tx_signed_raw_hex],
|
||||
chain_str,
|
||||
|
||||
@@ -32,10 +32,10 @@ class TxFactory:
|
||||
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
|
||||
|
||||
|
||||
def next_nonce(self, uuid, session=None):
|
||||
"""Returns the current reserved nonce value, and increments it for next transaction.
|
||||
def next_nonce(self, session=None):
|
||||
"""Returns the current cached nonce value, and increments it for next transaction.
|
||||
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
return self.nonce_oracle.next_by_task_uuid(uuid, session=session)
|
||||
return self.nonce_oracle.next(session=session)
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
# local imports
|
||||
from cic_eth.db.models.nonce import (
|
||||
Nonce,
|
||||
NonceReservation,
|
||||
)
|
||||
from cic_eth.db.models.nonce import Nonce
|
||||
|
||||
class NonceOracle():
|
||||
"""Ensures atomic nonce increments for all transactions across all tasks and threads.
|
||||
@@ -17,15 +14,10 @@ class NonceOracle():
|
||||
self.default_nonce = default_nonce
|
||||
|
||||
|
||||
def next(self):
|
||||
def next(self, session=None):
|
||||
"""Get next unique nonce.
|
||||
|
||||
:returns: Nonce
|
||||
:rtype: number
|
||||
"""
|
||||
raise AttributeError('this should not be called')
|
||||
return Nonce.next(self.address, self.default_nonce)
|
||||
|
||||
|
||||
def next_by_task_uuid(self, uuid, session=None):
|
||||
return NonceReservation.release(uuid, session=session)
|
||||
return Nonce.next(self.address, self.default_nonce, session=session)
|
||||
|
||||
194
apps/cic-eth/cic_eth/eth/request.py
Normal file
194
apps/cic-eth/cic_eth/eth/request.py
Normal file
@@ -0,0 +1,194 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
import web3
|
||||
import celery
|
||||
from erc20_approval_escrow import TransferApproval
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry.chain import ChainSpec
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.eth import RpcClient
|
||||
from cic_eth.eth.factory import TxFactory
|
||||
from cic_eth.eth.task import sign_and_register_tx
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.eth.task import create_check_gas_and_send_task
|
||||
from cic_eth.error import TokenCountError
|
||||
|
||||
celery_app = celery.current_app
|
||||
logg = logging.getLogger()
|
||||
|
||||
contract_function_signatures = {
|
||||
'request': 'b0addede',
|
||||
}
|
||||
|
||||
|
||||
class TransferRequestTxFactory(TxFactory):
|
||||
"""Factory for creating Transfer request transactions using the TransferApproval contract backend
|
||||
"""
|
||||
def request(
|
||||
self,
|
||||
token_address,
|
||||
beneficiary_address,
|
||||
amount,
|
||||
chain_spec,
|
||||
):
|
||||
"""Create a new TransferApproval.request transaction
|
||||
|
||||
:param token_address: Token to create transfer request for
|
||||
:type token_address: str, 0x-hex
|
||||
:param beneficiary_address: Beneficiary of token transfer
|
||||
:type beneficiary_address: str, 0x-hex
|
||||
:param amount: Amount of tokens to transfer
|
||||
:type amount: number
|
||||
:param chain_spec: Chain spec
|
||||
:type chain_spec: cic_registry.chain.ChainSpec
|
||||
:returns: Transaction in standard Ethereum format
|
||||
:rtype: dict
|
||||
"""
|
||||
transfer_approval = CICRegistry.get_contract(chain_spec, 'TransferApproval', 'TransferAuthorization')
|
||||
fn = transfer_approval.function('createRequest')
|
||||
tx_approval_buildable = fn(beneficiary_address, token_address, amount)
|
||||
transfer_approval_gas = transfer_approval.gas('createRequest')
|
||||
|
||||
tx_approval = tx_approval_buildable.buildTransaction({
|
||||
'from': self.address,
|
||||
'gas': transfer_approval_gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(),
|
||||
})
|
||||
return tx_approval
|
||||
|
||||
|
||||
def unpack_transfer_approval_request(data):
|
||||
"""Verifies that a transaction is an "TransferApproval.request" transaction, and extracts call parameters from it.
|
||||
|
||||
:param data: Raw input data from Ethereum transaction.
|
||||
:type data: str, 0x-hex
|
||||
:raises ValueError: Function signature does not match AccountRegister.add
|
||||
:returns: Parsed parameters
|
||||
:rtype: dict
|
||||
"""
|
||||
f = data[2:10]
|
||||
if f != contract_function_signatures['request']:
|
||||
raise ValueError('Invalid transfer request data ({})'.format(f))
|
||||
|
||||
d = data[10:]
|
||||
return {
|
||||
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
|
||||
'token': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
|
||||
'amount': int(d[128:], 16)
|
||||
}
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def transfer_approval_request(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||
"""Creates a new transfer approval
|
||||
|
||||
:param tokens: Token to generate transfer request for
|
||||
:type tokens: list with single token spec as dict
|
||||
:param holder_address: Address to generate transfer on behalf of
|
||||
:type holder_address: str, 0x-hex
|
||||
:param receiver_address: Address to transfser tokens to
|
||||
:type receiver_address: str, 0x-hex
|
||||
:param value: Amount of tokens to transfer
|
||||
:type value: number
|
||||
:param chain_spec: Chain spec string representation
|
||||
:type chain_spec: str
|
||||
:raises cic_eth.error.TokenCountError: More than one token in tokens argument
|
||||
:returns: Raw signed transaction
|
||||
:rtype: list with transaction as only element
|
||||
"""
|
||||
|
||||
if len(tokens) != 1:
|
||||
raise TokenCountError
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
|
||||
t = tokens[0]
|
||||
|
||||
c = RpcClient(holder_address)
|
||||
|
||||
txf = TransferRequestTxFactory(holder_address, c)
|
||||
|
||||
tx_transfer = txf.request(t['address'], receiver_address, value, chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, 'cic_eth.eth.request.otx_cache_transfer_approval_request')
|
||||
|
||||
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
|
||||
|
||||
s = create_check_gas_and_send_task(
|
||||
[tx_signed_raw_hex],
|
||||
chain_str,
|
||||
holder_address,
|
||||
gas_budget,
|
||||
[tx_hash_hex],
|
||||
queue,
|
||||
)
|
||||
s.apply_async()
|
||||
return [tx_signed_raw_hex]
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
def otx_cache_transfer_approval_request(
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
chain_str,
|
||||
):
|
||||
"""Generates and commits transaction cache metadata for an TransferApproval.request transaction
|
||||
|
||||
:param tx_hash_hex: Transaction hash
|
||||
:type tx_hash_hex: str, 0x-hex
|
||||
:param tx_signed_raw_hex: Raw signed transaction
|
||||
:type tx_signed_raw_hex: str, 0x-hex
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
:returns: Transaction hash and id of cache element in storage backend, respectively
|
||||
:rtype: tuple
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
|
||||
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
|
||||
logg.debug('in otx acche transfer approval request')
|
||||
(txc, cache_id) = cache_transfer_approval_request_data(tx_hash_hex, tx)
|
||||
return txc
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
def cache_transfer_approval_request_data(
|
||||
tx_hash_hex,
|
||||
tx,
|
||||
):
|
||||
"""Helper function for otx_cache_transfer_approval_request
|
||||
|
||||
:param tx_hash_hex: Transaction hash
|
||||
:type tx_hash_hex: str, 0x-hex
|
||||
:param tx: Signed raw transaction
|
||||
:type tx: str, 0x-hex
|
||||
:returns: Transaction hash and id of cache element in storage backend, respectively
|
||||
:rtype: tuple
|
||||
"""
|
||||
tx_data = unpack_transfer_approval_request(tx['data'])
|
||||
logg.debug('tx approval request data {}'.format(tx_data))
|
||||
logg.debug('tx approval request {}'.format(tx))
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_cache = TxCache(
|
||||
tx_hash_hex,
|
||||
tx['from'],
|
||||
tx_data['to'],
|
||||
tx_data['token'],
|
||||
tx_data['token'],
|
||||
tx_data['amount'],
|
||||
tx_data['amount'],
|
||||
)
|
||||
session.add(tx_cache)
|
||||
session.commit()
|
||||
cache_id = tx_cache.id
|
||||
session.close()
|
||||
return (tx_hash_hex, cache_id)
|
||||
@@ -46,8 +46,6 @@ class TokenTxFactory(TxFactory):
|
||||
spender_address,
|
||||
amount,
|
||||
chain_spec,
|
||||
uuid,
|
||||
session=None,
|
||||
):
|
||||
"""Create an ERC20 "approve" transaction
|
||||
|
||||
@@ -75,7 +73,7 @@ class TokenTxFactory(TxFactory):
|
||||
'gas': source_token_gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(uuid, session=session),
|
||||
'nonce': self.next_nonce(),
|
||||
})
|
||||
return tx_approve
|
||||
|
||||
@@ -86,8 +84,6 @@ class TokenTxFactory(TxFactory):
|
||||
receiver_address,
|
||||
value,
|
||||
chain_spec,
|
||||
uuid,
|
||||
session=None,
|
||||
):
|
||||
"""Create an ERC20 "transfer" transaction
|
||||
|
||||
@@ -116,7 +112,7 @@ class TokenTxFactory(TxFactory):
|
||||
'gas': source_token_gas,
|
||||
'gasPrice': self.gas_price,
|
||||
'chainId': chain_spec.chain_id(),
|
||||
'nonce': self.next_nonce(uuid, session=session),
|
||||
'nonce': self.next_nonce(),
|
||||
})
|
||||
return tx_transfer
|
||||
|
||||
@@ -248,11 +244,9 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
|
||||
c = RpcClient(chain_spec, holder_address=holder_address)
|
||||
|
||||
txf = TokenTxFactory(holder_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, self.request.root_id, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session)
|
||||
session.close()
|
||||
|
||||
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer')
|
||||
|
||||
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
|
||||
|
||||
@@ -305,10 +299,8 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
|
||||
|
||||
txf = TokenTxFactory(holder_address, c)
|
||||
|
||||
session = SessionBase.create_session()
|
||||
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, self.request.root_id, session=session)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session)
|
||||
session.close()
|
||||
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve')
|
||||
|
||||
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
|
||||
|
||||
@@ -467,7 +459,6 @@ def cache_approve_data(
|
||||
return (tx_hash_hex, cache_id)
|
||||
|
||||
|
||||
# TODO: Move to dedicated metadata package
|
||||
class ExtendedTx:
|
||||
|
||||
_default_decimals = 6
|
||||
|
||||
@@ -12,7 +12,6 @@ from cic_registry.chain import ChainSpec
|
||||
from .rpc import RpcClient
|
||||
from cic_eth.db import Otx, SessionBase
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.nonce import NonceReservation
|
||||
from cic_eth.db.models.lock import Lock
|
||||
from cic_eth.db.enum import (
|
||||
LockEnum,
|
||||
@@ -70,6 +69,7 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
|
||||
for i in range(len(tx_hashes)):
|
||||
o = get_tx(tx_hashes[i])
|
||||
txs.append(o['signed_tx'])
|
||||
logg.debug('ooooo {}'.format(o))
|
||||
if address == None:
|
||||
address = o['address']
|
||||
|
||||
@@ -85,23 +85,15 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
|
||||
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
|
||||
|
||||
if gas_required > balance:
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
address,
|
||||
c.gas_provider(),
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
s_refill_gas = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
address,
|
||||
chain_str,
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
s_nonce.link(s_refill_gas)
|
||||
s_nonce.apply_async()
|
||||
s_refill_gas.apply_async()
|
||||
wait_tasks = []
|
||||
for tx_hash in tx_hashes:
|
||||
s = celery.signature(
|
||||
@@ -117,23 +109,15 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
|
||||
|
||||
safe_gas = c.safe_threshold_amount()
|
||||
if balance < safe_gas:
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
address,
|
||||
c.gas_provider(),
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
s_refill_gas = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
address,
|
||||
chain_str,
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
s_nonce.link(s_refill)
|
||||
s_nonce.apply_async()
|
||||
s_refill_gas.apply_async()
|
||||
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
|
||||
ready_tasks = []
|
||||
for tx_hash in tx_hashes:
|
||||
@@ -194,7 +178,12 @@ class ParityNodeHandler:
|
||||
def handle(self, exception, tx_hash_hex, tx_hex):
|
||||
meth = self.handle_default
|
||||
if isinstance(exception, (ValueError)):
|
||||
|
||||
# s_debug = celery.signature(
|
||||
# 'cic_eth.admin.debug.out_tmp',
|
||||
# [tx_hash_hex, '{}: {}'.format(tx_hash_hex, exception)],
|
||||
# queue=queue,
|
||||
# )
|
||||
# s_debug.apply_async()
|
||||
earg = exception.args[0]
|
||||
if earg['code'] == -32010:
|
||||
logg.debug('skipping lock for code {}'.format(earg['code']))
|
||||
@@ -202,15 +191,14 @@ class ParityNodeHandler:
|
||||
elif earg['code'] == -32602:
|
||||
meth = self.handle_invalid_encoding
|
||||
else:
|
||||
# TODO: move to status log db comment field
|
||||
meth = self.handle_invalid
|
||||
elif isinstance(exception, (requests.exceptions.ConnectionError)):
|
||||
meth = self.handle_connection
|
||||
(t, e_fn, message) = meth(tx_hash_hex, tx_hex, str(exception))
|
||||
(t, e_fn, message) = meth(tx_hash_hex, tx_hex)
|
||||
return (t, e_fn, '{} {}'.format(message, exception))
|
||||
|
||||
|
||||
def handle_connection(self, tx_hash_hex, tx_hex, debugstr=None):
|
||||
def handle_connection(self, tx_hash_hex, tx_hex):
|
||||
s_set_sent = celery.signature(
|
||||
'cic_eth.queue.tx.set_sent_status',
|
||||
[
|
||||
@@ -223,7 +211,7 @@ class ParityNodeHandler:
|
||||
return (t, TemporaryTxError, 'Sendfail {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
|
||||
|
||||
def handle_invalid_encoding(self, tx_hash_hex, tx_hex, debugstr=None):
|
||||
def handle_invalid_encoding(self, tx_hash_hex, tx_hex):
|
||||
tx_bytes = bytes.fromhex(tx_hex[2:])
|
||||
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
|
||||
s_lock = celery.signature(
|
||||
@@ -270,7 +258,7 @@ class ParityNodeHandler:
|
||||
return (t, PermanentTxError, 'Reject invalid encoding {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
|
||||
|
||||
def handle_invalid_parameters(self, tx_hash_hex, tx_hex, debugstr=None):
|
||||
def handle_invalid_parameters(self, tx_hash_hex, tx_hex):
|
||||
s_sync = celery.signature(
|
||||
'cic_eth.eth.tx.sync_tx',
|
||||
[
|
||||
@@ -283,7 +271,7 @@ class ParityNodeHandler:
|
||||
return (t, PermanentTxError, 'Reject invalid parameters {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
|
||||
|
||||
def handle_invalid(self, tx_hash_hex, tx_hex, debugstr=None):
|
||||
def handle_invalid(self, tx_hash_hex, tx_hex):
|
||||
tx_bytes = bytes.fromhex(tx_hex[2:])
|
||||
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
|
||||
s_lock = celery.signature(
|
||||
@@ -301,21 +289,12 @@ class ParityNodeHandler:
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_debug = celery.signature(
|
||||
'cic_eth.admin.debug.alert',
|
||||
[
|
||||
tx_hash_hex,
|
||||
debugstr,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_set_reject.link(s_debug)
|
||||
s_lock.link(s_set_reject)
|
||||
t = s_lock.apply_async()
|
||||
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
|
||||
|
||||
def handle_default(self, tx_hash_hex, tx_hex, debugstr):
|
||||
def handle_default(self, tx_hash_hex, tx_hex):
|
||||
tx_bytes = bytes.fromhex(tx_hex[2:])
|
||||
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
|
||||
s_lock = celery.signature(
|
||||
@@ -333,18 +312,9 @@ class ParityNodeHandler:
|
||||
[],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_debug = celery.signature(
|
||||
'cic_eth.admin.debug.alert',
|
||||
[
|
||||
tx_hash_hex,
|
||||
debugstr,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_set_fubar.link(s_debug)
|
||||
s_lock.link(s_set_fubar)
|
||||
t = s_lock.apply_async()
|
||||
return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr))
|
||||
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||||
|
||||
|
||||
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
|
||||
@@ -417,7 +387,6 @@ def send(self, txs, chain_str):
|
||||
|
||||
|
||||
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
|
||||
# TODO: method is too long, factor out code for clarity
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
|
||||
def refill_gas(self, recipient_address, chain_str):
|
||||
"""Executes a native token transaction to fund the recipient's gas expenditures.
|
||||
@@ -432,7 +401,6 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
"""
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
|
||||
zero_amount = False
|
||||
session = SessionBase.create_session()
|
||||
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
|
||||
q = session.query(Otx.tx_hash)
|
||||
@@ -441,12 +409,9 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
q = q.filter(TxCache.from_value!=0)
|
||||
q = q.filter(TxCache.recipient==recipient_address)
|
||||
c = q.count()
|
||||
session.close()
|
||||
if c > 0:
|
||||
#session.close()
|
||||
#raise AlreadyFillingGasError(recipient_address)
|
||||
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
|
||||
zero_amount = True
|
||||
session.flush()
|
||||
raise AlreadyFillingGasError(recipient_address)
|
||||
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
|
||||
@@ -455,13 +420,10 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
|
||||
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
|
||||
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
|
||||
#nonce = nonce_generator.next(session=session)
|
||||
nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session)
|
||||
nonce = nonce_generator.next()
|
||||
gas_price = c.gas_price()
|
||||
gas_limit = c.default_gas_limit
|
||||
refill_amount = 0
|
||||
if not zero_amount:
|
||||
refill_amount = c.refill_amount()
|
||||
refill_amount = c.refill_amount()
|
||||
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
|
||||
|
||||
# create and sign transaction
|
||||
@@ -487,9 +449,7 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
tx_hash_hex,
|
||||
tx_send_gas_signed['raw'],
|
||||
chain_str,
|
||||
session=session,
|
||||
)
|
||||
session.close()
|
||||
|
||||
s_tx_cache = celery.signature(
|
||||
'cic_eth.eth.tx.cache_gas_refill_data',
|
||||
@@ -507,7 +467,6 @@ def refill_gas(self, recipient_address, chain_str):
|
||||
queue=queue,
|
||||
)
|
||||
celery.group(s_tx_cache, s_status)()
|
||||
|
||||
return tx_send_gas_signed['raw']
|
||||
|
||||
|
||||
@@ -533,8 +492,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==txold_hash_hex)
|
||||
otx = q.first()
|
||||
session.close()
|
||||
if otx == None:
|
||||
session.close()
|
||||
raise NotLocalTxError(txold_hash_hex)
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||||
@@ -569,10 +528,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
chain_str,
|
||||
session=session,
|
||||
)
|
||||
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
|
||||
session.close()
|
||||
TxCache.clone(txold_hash_hex, tx_hash_hex)
|
||||
|
||||
s = create_check_gas_and_send_task(
|
||||
[tx_signed_raw_hex],
|
||||
@@ -587,30 +544,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
||||
return tx_hash_hex
|
||||
|
||||
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
def reserve_nonce(self, chained_input, address=None):
|
||||
session = SessionBase.create_session()
|
||||
|
||||
if address == None:
|
||||
address = chained_input
|
||||
|
||||
root_id = self.request.root_id
|
||||
nonce = NonceReservation.next(address, root_id)
|
||||
|
||||
session.close()
|
||||
|
||||
return chained_input
|
||||
|
||||
|
||||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
|
||||
def sync_tx(self, tx_hash_hex, chain_str):
|
||||
"""Force update of network status of a simgle transaction
|
||||
|
||||
:param tx_hash_hex: Transaction hash
|
||||
:type tx_hash_hex: str, 0x-hex
|
||||
:param chain_str: Chain spec string representation
|
||||
:type chain_str: str
|
||||
"""
|
||||
|
||||
queue = self.request.delivery_info['routing_key']
|
||||
|
||||
|
||||
@@ -3,11 +3,10 @@ import logging
|
||||
import sha3
|
||||
import web3
|
||||
|
||||
# external imports
|
||||
# third-party imports
|
||||
from rlp import decode as rlp_decode
|
||||
from rlp import encode as rlp_encode
|
||||
from eth_keys import KeyAPI
|
||||
from chainlib.eth.tx import unpack
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
@@ -23,65 +22,64 @@ field_debugs = [
|
||||
's',
|
||||
]
|
||||
|
||||
unpack_signed_raw_tx = unpack
|
||||
|
||||
#def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
|
||||
# d = rlp_decode(tx_raw_bytes)
|
||||
#
|
||||
# logg.debug('decoding {} using chain id {}'.format(tx_raw_bytes.hex(), chain_id))
|
||||
# j = 0
|
||||
# for i in d:
|
||||
# logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
|
||||
# j += 1
|
||||
# vb = chain_id
|
||||
# if chain_id != 0:
|
||||
# v = int.from_bytes(d[6], 'big')
|
||||
# vb = v - (chain_id * 2) - 35
|
||||
# while len(d[7]) < 32:
|
||||
# d[7] = b'\x00' + d[7]
|
||||
# while len(d[8]) < 32:
|
||||
# d[8] = b'\x00' + d[8]
|
||||
# s = b''.join([d[7], d[8], bytes([vb])])
|
||||
# so = KeyAPI.Signature(signature_bytes=s)
|
||||
#
|
||||
# h = sha3.keccak_256()
|
||||
# h.update(rlp_encode(d))
|
||||
# signed_hash = h.digest()
|
||||
#
|
||||
# d[6] = chain_id
|
||||
# d[7] = b''
|
||||
# d[8] = b''
|
||||
#
|
||||
# h = sha3.keccak_256()
|
||||
# h.update(rlp_encode(d))
|
||||
# unsigned_hash = h.digest()
|
||||
#
|
||||
# p = so.recover_public_key_from_msg_hash(unsigned_hash)
|
||||
# a = p.to_checksum_address()
|
||||
# logg.debug('decoded recovery byte {}'.format(vb))
|
||||
# logg.debug('decoded address {}'.format(a))
|
||||
# logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
|
||||
# logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
|
||||
#
|
||||
# to = d[3].hex() or None
|
||||
# if to != None:
|
||||
# to = web3.Web3.toChecksumAddress('0x' + to)
|
||||
#
|
||||
# return {
|
||||
# 'from': a,
|
||||
# 'nonce': int.from_bytes(d[0], 'big'),
|
||||
# 'gasPrice': int.from_bytes(d[1], 'big'),
|
||||
# 'gas': int.from_bytes(d[2], 'big'),
|
||||
# 'to': to,
|
||||
# 'value': int.from_bytes(d[4], 'big'),
|
||||
# 'data': '0x' + d[5].hex(),
|
||||
# 'v': chain_id,
|
||||
# 'r': '0x' + s[:32].hex(),
|
||||
# 's': '0x' + s[32:64].hex(),
|
||||
# 'chainId': chain_id,
|
||||
# 'hash': '0x' + signed_hash.hex(),
|
||||
# 'hash_unsigned': '0x' + unsigned_hash.hex(),
|
||||
# }
|
||||
def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
|
||||
d = rlp_decode(tx_raw_bytes)
|
||||
|
||||
logg.debug('decoding using chain id {}'.format(chain_id))
|
||||
j = 0
|
||||
for i in d:
|
||||
logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
|
||||
j += 1
|
||||
vb = chain_id
|
||||
if chain_id != 0:
|
||||
v = int.from_bytes(d[6], 'big')
|
||||
vb = v - (chain_id * 2) - 35
|
||||
while len(d[7]) < 32:
|
||||
d[7] = b'\x00' + d[7]
|
||||
while len(d[8]) < 32:
|
||||
d[8] = b'\x00' + d[8]
|
||||
s = b''.join([d[7], d[8], bytes([vb])])
|
||||
so = KeyAPI.Signature(signature_bytes=s)
|
||||
|
||||
h = sha3.keccak_256()
|
||||
h.update(rlp_encode(d))
|
||||
signed_hash = h.digest()
|
||||
|
||||
d[6] = chain_id
|
||||
d[7] = b''
|
||||
d[8] = b''
|
||||
|
||||
h = sha3.keccak_256()
|
||||
h.update(rlp_encode(d))
|
||||
unsigned_hash = h.digest()
|
||||
|
||||
p = so.recover_public_key_from_msg_hash(unsigned_hash)
|
||||
a = p.to_checksum_address()
|
||||
logg.debug('decoded recovery byte {}'.format(vb))
|
||||
logg.debug('decoded address {}'.format(a))
|
||||
logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
|
||||
logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
|
||||
|
||||
to = d[3].hex() or None
|
||||
if to != None:
|
||||
to = web3.Web3.toChecksumAddress('0x' + to)
|
||||
|
||||
return {
|
||||
'from': a,
|
||||
'nonce': int.from_bytes(d[0], 'big'),
|
||||
'gasPrice': int.from_bytes(d[1], 'big'),
|
||||
'gas': int.from_bytes(d[2], 'big'),
|
||||
'to': to,
|
||||
'value': int.from_bytes(d[4], 'big'),
|
||||
'data': '0x' + d[5].hex(),
|
||||
'v': chain_id,
|
||||
'r': '0x' + s[:32].hex(),
|
||||
's': '0x' + s[32:64].hex(),
|
||||
'chainId': chain_id,
|
||||
'hash': '0x' + signed_hash.hex(),
|
||||
'hash_unsigned': '0x' + unsigned_hash.hex(),
|
||||
}
|
||||
|
||||
|
||||
def unpack_signed_raw_tx_hex(tx_raw_hex, chain_id):
|
||||
|
||||
@@ -30,7 +30,6 @@ from cic_eth.task import CriticalSQLAlchemyTask
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
|
||||
from cic_eth.error import NotLocalTxError
|
||||
from cic_eth.error import LockedError
|
||||
from cic_eth.db.enum import status_str
|
||||
|
||||
celery_app = celery.current_app
|
||||
#logg = celery_app.log.get_default_logger()
|
||||
@@ -315,9 +314,7 @@ def set_ready(tx_hash):
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def set_dequeue(tx_hash):
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash)
|
||||
o = q.first()
|
||||
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
|
||||
if o == None:
|
||||
session.close()
|
||||
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||
@@ -408,7 +405,7 @@ def get_tx_cache(tx_hash):
|
||||
'tx_hash': otx.tx_hash,
|
||||
'signed_tx': otx.signed_tx,
|
||||
'nonce': otx.nonce,
|
||||
'status': status_str(otx.status),
|
||||
'status': StatusEnum(otx.status).name,
|
||||
'status_code': otx.status,
|
||||
'source_token': txc.source_token_address,
|
||||
'destination_token': txc.destination_token_address,
|
||||
@@ -568,7 +565,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
|
||||
return txs
|
||||
|
||||
|
||||
def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None):
|
||||
def get_status_tx(status, before=None, exact=False, limit=0, session=None):
|
||||
"""Retrieve transaction with a specific queue status.
|
||||
|
||||
:param status: Status to match transactions with
|
||||
@@ -584,15 +581,11 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se
|
||||
session = SessionBase.bind_session(session)
|
||||
q = session.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
# before = datetime.datetime.utcnow()
|
||||
if before != None:
|
||||
q = q.filter(TxCache.date_updated<before)
|
||||
q = q.filter(TxCache.date_updated<before)
|
||||
if exact:
|
||||
q = q.filter(Otx.status==status)
|
||||
q = q.filter(Otx.status==status.value)
|
||||
else:
|
||||
q = q.filter(Otx.status.op('&')(status)>0)
|
||||
if not_status != None:
|
||||
q = q.filter(Otx.status.op('&')(not_status)==0)
|
||||
q = q.filter(Otx.status.op('&')(status.value)==status.value)
|
||||
i = 0
|
||||
for o in q.all():
|
||||
if limit > 0 and i == limit:
|
||||
|
||||
@@ -15,8 +15,6 @@ import web3
|
||||
from web3 import HTTPProvider, WebsocketProvider
|
||||
from cic_registry import CICRegistry
|
||||
from cic_registry.chain import ChainSpec
|
||||
from chainlib.eth.tx import unpack
|
||||
from hexathon import strip_0x
|
||||
|
||||
# local imports
|
||||
import cic_eth
|
||||
@@ -38,7 +36,7 @@ from cic_eth.error import (
|
||||
TemporaryTxError,
|
||||
NotLocalTxError,
|
||||
)
|
||||
#from cic_eth.eth.util import unpack_signed_raw_tx_hex
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx_hex
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
@@ -117,15 +115,12 @@ class DispatchSyncer:
|
||||
chain_str = str(self.chain_spec)
|
||||
for k in txs.keys():
|
||||
tx_raw = txs[k]
|
||||
#tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
|
||||
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
|
||||
tx = unpack(tx_raw_bytes, self.chain_spec.chain_id())
|
||||
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
|
||||
|
||||
try:
|
||||
set_dequeue(tx['hash'])
|
||||
except NotLocalTxError as e:
|
||||
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
|
||||
continue
|
||||
|
||||
s_check = celery.signature(
|
||||
'cic_eth.admin.ctrl.check_lock',
|
||||
|
||||
@@ -2,4 +2,3 @@ from .callback import CallbackFilter
|
||||
from .tx import TxFilter
|
||||
from .gas import GasFilter
|
||||
from .register import RegistrationFilter
|
||||
from .transferauth import TransferAuthFilter
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
# third-party imports
|
||||
from cic_registry.chain import ChainSpec
|
||||
from hexathon import add_0x
|
||||
|
||||
@@ -24,7 +24,7 @@ class GasFilter(SyncFilter):
|
||||
self.chain_spec = chain_spec
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, session):
|
||||
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
|
||||
tx_hash_hex = add_0x(tx.hash)
|
||||
if tx.value > 0:
|
||||
logg.debug('gas refill tx {}'.format(tx_hash_hex))
|
||||
|
||||
@@ -26,6 +26,7 @@ class RegistrationFilter(SyncFilter):
|
||||
|
||||
def filter(self, conn, block, tx, db_session=None):
|
||||
registered_address = None
|
||||
logg.debug('register filter checking log {}'.format(tx.logs))
|
||||
for l in tx.logs:
|
||||
event_topic_hex = l['topics'][0]
|
||||
if event_topic_hex == account_registry_add_log_hash:
|
||||
@@ -33,23 +34,16 @@ class RegistrationFilter(SyncFilter):
|
||||
|
||||
address_hex = strip_0x(l['topics'][1])[64-40:]
|
||||
address = to_checksum(add_0x(address_hex))
|
||||
logg.info('request token gift to {}'.format(address))
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
address,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_gift = celery.signature(
|
||||
logg.debug('request token gift to {}'.format(address))
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.account.gift',
|
||||
[
|
||||
address,
|
||||
str(self.chain_spec),
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce.link(s_gift)
|
||||
s_nonce.apply_async()
|
||||
s.apply_async()
|
||||
|
||||
|
||||
def __str__(self):
|
||||
|
||||
@@ -1,97 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
import celery
|
||||
from hexathon import (
|
||||
strip_0x,
|
||||
add_0x,
|
||||
)
|
||||
from chainlib.eth.address import to_checksum
|
||||
from .base import SyncFilter
|
||||
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
transfer_request_signature = 'ed71262a'
|
||||
|
||||
def unpack_create_request(data):
|
||||
|
||||
data = strip_0x(data)
|
||||
cursor = 0
|
||||
f = data[cursor:cursor+8]
|
||||
cursor += 8
|
||||
|
||||
if f != transfer_request_signature:
|
||||
raise ValueError('Invalid create request data ({})'.format(f))
|
||||
|
||||
o = {}
|
||||
o['sender'] = data[cursor+24:cursor+64]
|
||||
cursor += 64
|
||||
o['recipient'] = data[cursor+24:cursor+64]
|
||||
cursor += 64
|
||||
o['token'] = data[cursor+24:cursor+64]
|
||||
cursor += 64
|
||||
o['value'] = int(data[cursor:], 16)
|
||||
return o
|
||||
|
||||
|
||||
class TransferAuthFilter(SyncFilter):
|
||||
|
||||
def __init__(self, registry, chain_spec, queue=None):
|
||||
self.queue = queue
|
||||
self.chain_spec = chain_spec
|
||||
self.transfer_request_contract = registry.get_contract(self.chain_spec, 'TransferAuthorization')
|
||||
|
||||
|
||||
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
|
||||
|
||||
if tx.payload == None:
|
||||
logg.debug('no payload')
|
||||
return False
|
||||
|
||||
payloadlength = len(tx.payload)
|
||||
if payloadlength != 8+256:
|
||||
logg.debug('{} below minimum length for a transfer auth call'.format(payloadlength))
|
||||
logg.debug('payload {}'.format(tx.payload))
|
||||
return False
|
||||
|
||||
recipient = tx.inputs[0]
|
||||
if recipient != self.transfer_request_contract.address():
|
||||
logg.debug('not our transfer auth contract address {}'.format(recipient))
|
||||
return False
|
||||
|
||||
o = unpack_create_request(tx.payload)
|
||||
|
||||
sender = add_0x(to_checksum(o['sender']))
|
||||
recipient = add_0x(to_checksum(recipient))
|
||||
token = add_0x(to_checksum(o['token']))
|
||||
token_data = {
|
||||
'address': token,
|
||||
}
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
[token_data],
|
||||
sender,
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_approve = celery.signature(
|
||||
'cic_eth.eth.token.approve',
|
||||
[
|
||||
sender,
|
||||
recipient,
|
||||
o['value'],
|
||||
str(self.chain_spec),
|
||||
],
|
||||
queue=self.queue,
|
||||
)
|
||||
s_nonce.link(s_approve)
|
||||
t = s_nonce.apply_async()
|
||||
return True
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return 'cic-eth transfer auth filter'
|
||||
@@ -30,7 +30,7 @@ class TxFilter(SyncFilter):
|
||||
if otx == None:
|
||||
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
|
||||
return None
|
||||
logg.info('tx filter match on {}'.format(otx.tx_hash))
|
||||
logg.info('local tx match {}'.format(otx.tx_hash))
|
||||
SessionBase.release_session(db_session)
|
||||
s = celery.signature(
|
||||
'cic_eth.queue.tx.set_final_status',
|
||||
|
||||
@@ -138,7 +138,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str):
|
||||
|
||||
# TODO: can we merely use the dispatcher instead?
|
||||
def dispatch(chain_str):
|
||||
txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow())
|
||||
txs = get_status_tx(StatusEnum.RETRY, datetime.datetime.utcnow())
|
||||
if len(txs) == 0:
|
||||
logg.debug('no retry state txs found')
|
||||
return
|
||||
|
||||
@@ -26,6 +26,7 @@ from cic_eth.eth import bancor
|
||||
from cic_eth.eth import token
|
||||
from cic_eth.eth import tx
|
||||
from cic_eth.eth import account
|
||||
from cic_eth.eth import request
|
||||
from cic_eth.admin import debug
|
||||
from cic_eth.admin import ctrl
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
@@ -40,6 +41,7 @@ from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db import dsn_from_config
|
||||
from cic_eth.ext import tx
|
||||
from cic_eth.ext import address
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
|
||||
@@ -58,7 +58,6 @@ from cic_eth.runnable.daemons.filters import (
|
||||
GasFilter,
|
||||
TxFilter,
|
||||
RegistrationFilter,
|
||||
TransferAuthFilter,
|
||||
)
|
||||
|
||||
script_dir = os.path.realpath(os.path.dirname(__file__))
|
||||
@@ -147,8 +146,6 @@ def main():
|
||||
|
||||
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
|
||||
|
||||
transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
|
||||
|
||||
i = 0
|
||||
for syncer in syncers:
|
||||
logg.debug('running syncer index {}'.format(i))
|
||||
@@ -156,7 +153,6 @@ def main():
|
||||
syncer.add_filter(registration_filter)
|
||||
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
|
||||
syncer.add_filter(tx_filter)
|
||||
syncer.add_filter(transfer_auth_filter)
|
||||
for cf in callback_filters:
|
||||
syncer.add_filter(cf)
|
||||
|
||||
|
||||
@@ -55,25 +55,62 @@ SessionBase.connect(dsn)
|
||||
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
|
||||
queue = args.q
|
||||
|
||||
re_something = r'^/something/?'
|
||||
re_transfer_approval_request = r'^/transferrequest/?'
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
|
||||
|
||||
def process_something(session, env):
|
||||
r = re.match(re_something, env.get('PATH_INFO'))
|
||||
def process_transfer_approval_request(session, env):
|
||||
r = re.match(re_transfer_approval_request, env.get('PATH_INFO'))
|
||||
if not r:
|
||||
return None
|
||||
|
||||
#if env.get('CONTENT_TYPE') != 'application/json':
|
||||
# raise AttributeError('content type')
|
||||
if env.get('CONTENT_TYPE') != 'application/json':
|
||||
raise AttributeError('content type')
|
||||
|
||||
#if env.get('REQUEST_METHOD') != 'POST':
|
||||
# raise AttributeError('method')
|
||||
if env.get('REQUEST_METHOD') != 'POST':
|
||||
raise AttributeError('method')
|
||||
|
||||
#post_data = json.load(env.get('wsgi.input'))
|
||||
|
||||
#return ('text/plain', 'foo'.encode('utf-8'),)
|
||||
post_data = json.load(env.get('wsgi.input'))
|
||||
token_address = web3.Web3.toChecksumAddress(post_data['token_address'])
|
||||
holder_address = web3.Web3.toChecksumAddress(post_data['holder_address'])
|
||||
beneficiary_address = web3.Web3.toChecksumAddress(post_data['beneficiary_address'])
|
||||
value = int(post_data['value'])
|
||||
|
||||
logg.debug('transfer approval request token {} to {} from {} value {}'.format(
|
||||
token_address,
|
||||
beneficiary_address,
|
||||
holder_address,
|
||||
value,
|
||||
)
|
||||
)
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.request.transfer_approval_request',
|
||||
[
|
||||
[
|
||||
{
|
||||
'address': token_address,
|
||||
},
|
||||
],
|
||||
holder_address,
|
||||
beneficiary_address,
|
||||
value,
|
||||
config.get('CIC_CHAIN_SPEC'),
|
||||
],
|
||||
queue=queue,
|
||||
)
|
||||
t = s.apply_async()
|
||||
r = t.get()
|
||||
tx_raw_bytes = bytes.fromhex(r[0][2:])
|
||||
tx = unpack_signed_raw_tx(tx_raw_bytes, chain_spec.chain_id())
|
||||
for r in t.collect():
|
||||
logg.debug('result {}'.format(r))
|
||||
|
||||
if not t.successful():
|
||||
raise RuntimeError(tx['hash'])
|
||||
|
||||
return ('text/plain', tx['hash'].encode('utf-8'),)
|
||||
|
||||
|
||||
# uwsgi application
|
||||
@@ -88,7 +125,7 @@ def application(env, start_response):
|
||||
|
||||
session = SessionBase.create_session()
|
||||
for handler in [
|
||||
process_something,
|
||||
process_transfer_approval_request,
|
||||
]:
|
||||
try:
|
||||
r = handler(session, env)
|
||||
@@ -23,11 +23,8 @@ from hexathon import add_0x
|
||||
# local imports
|
||||
from cic_eth.api import AdminApi
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
from cic_eth.db.enum import (
|
||||
StatusEnum,
|
||||
status_str,
|
||||
LockEnum,
|
||||
)
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.db.enum import LockEnum
|
||||
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logg = logging.getLogger()
|
||||
@@ -43,7 +40,6 @@ argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
|
||||
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
|
||||
argparser.add_argument('-f', '--format', dest='f', default='terminal', type=str, help='Output format')
|
||||
argparser.add_argument('--status-raw', dest='status_raw', action='store_true', help='Output statis bit enum names only')
|
||||
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
|
||||
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
|
||||
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
|
||||
@@ -123,7 +119,7 @@ def render_tx(o, **kwargs):
|
||||
|
||||
for v in o.get('status_log', []):
|
||||
d = datetime.datetime.fromisoformat(v[0])
|
||||
e = status_str(v[1], args.status_raw)
|
||||
e = StatusEnum(v[1]).name
|
||||
content += '{}: {}\n'.format(d, e)
|
||||
|
||||
return content
|
||||
|
||||
@@ -9,10 +9,7 @@ import celery
|
||||
# local imports
|
||||
from .base import Syncer
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
from cic_eth.db.enum import (
|
||||
StatusEnum,
|
||||
StatusBits,
|
||||
)
|
||||
from cic_eth.db.enum import StatusEnum
|
||||
from cic_eth.queue.tx import get_status_tx
|
||||
|
||||
logg = logging.getLogger()
|
||||
@@ -50,8 +47,7 @@ class RetrySyncer(Syncer):
|
||||
# )
|
||||
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
|
||||
stalled_txs = get_status_tx(
|
||||
StatusBits.IN_NETWORK.value,
|
||||
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
|
||||
StatusEnum.SENT.value,
|
||||
before=before,
|
||||
)
|
||||
# return list(failed_txs.keys()) + list(stalled_txs.keys())
|
||||
|
||||
@@ -10,7 +10,7 @@ version = (
|
||||
0,
|
||||
10,
|
||||
0,
|
||||
'alpha.39',
|
||||
'alpha.36',
|
||||
)
|
||||
|
||||
version_object = semver.VersionInfo(
|
||||
|
||||
@@ -6,7 +6,7 @@ set -e
|
||||
# set CONFINI_ENV_PREFIX to override the env prefix to override env vars
|
||||
|
||||
echo "!!! starting signer"
|
||||
python /usr/local/bin/crypto-dev-daemon -c /usr/local/etc/crypto-dev-signer &
|
||||
python /usr/local/bin/crypto-dev-daemon -vv -c /usr/local/etc/crypto-dev-signer &
|
||||
|
||||
echo "!!! starting tracker"
|
||||
/usr/local/bin/cic-eth-taskerd $@
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
cic-base~=0.1.1a10
|
||||
web3==5.12.2
|
||||
celery==4.4.7
|
||||
crypto-dev-signer~=0.4.13rc4
|
||||
crypto-dev-signer~=0.4.13rc3
|
||||
confini~=0.3.6rc3
|
||||
cic-registry~=0.5.3a22
|
||||
cic-bancor~=0.0.6
|
||||
@@ -10,7 +9,7 @@ alembic==1.4.2
|
||||
websockets==8.1
|
||||
requests~=2.24.0
|
||||
eth_accounts_index~=0.0.10a10
|
||||
erc20-transfer-authorization~=0.3.0a10
|
||||
erc20-approval-escrow~=0.3.0a5
|
||||
erc20-single-shot-faucet~=0.2.0a6
|
||||
rlp==2.0.1
|
||||
uWSGI==2.0.19.1
|
||||
@@ -19,6 +18,7 @@ eth-gas-proxy==0.0.1a4
|
||||
websocket-client==0.57.0
|
||||
moolb~=0.1.1b2
|
||||
eth-address-index~=0.1.0a8
|
||||
chainlib~=0.0.1a20
|
||||
chainlib~=0.0.1a19
|
||||
hexathon~=0.0.1a3
|
||||
chainsyncer~=0.0.1a19
|
||||
cic-base==0.1.1a10
|
||||
|
||||
@@ -13,13 +13,13 @@ def celery_includes():
|
||||
return [
|
||||
'cic_eth.eth.bancor',
|
||||
'cic_eth.eth.token',
|
||||
'cic_eth.eth.request',
|
||||
'cic_eth.eth.tx',
|
||||
'cic_eth.ext.tx',
|
||||
'cic_eth.queue.tx',
|
||||
'cic_eth.queue.balance',
|
||||
'cic_eth.admin.ctrl',
|
||||
'cic_eth.admin.nonce',
|
||||
'cic_eth.admin.debug',
|
||||
'cic_eth.eth.account',
|
||||
'cic_eth.callbacks.noop',
|
||||
'cic_eth.callbacks.http',
|
||||
|
||||
@@ -27,7 +27,7 @@ def database_engine(
|
||||
SessionBase.poolable = False
|
||||
dsn = dsn_from_config(load_config)
|
||||
#SessionBase.connect(dsn, True)
|
||||
SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
|
||||
SessionBase.connect(dsn, load_config.get('DATABASE_DEBUG') != None)
|
||||
return dsn
|
||||
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@ from eth_keys import KeyAPI
|
||||
from cic_eth.eth import RpcClient
|
||||
from cic_eth.eth.rpc import GasOracle
|
||||
from cic_eth.db.models.role import AccountRole
|
||||
from cic_eth.db.models.nonce import Nonce
|
||||
|
||||
#logg = logging.getLogger(__name__)
|
||||
logg = logging.getLogger()
|
||||
@@ -114,17 +113,11 @@ def init_w3_conn(
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def init_w3(
|
||||
init_database,
|
||||
init_eth_tester,
|
||||
init_eth_account_roles,
|
||||
init_w3_conn,
|
||||
):
|
||||
|
||||
for address in init_w3_conn.eth.accounts:
|
||||
nonce = init_w3_conn.eth.getTransactionCount(address, 'pending')
|
||||
Nonce.init(address, nonce=nonce, session=init_database)
|
||||
init_database.commit()
|
||||
|
||||
yield init_w3_conn
|
||||
logg.debug('mining om nom nom... {}'.format(init_eth_tester.mine_block()))
|
||||
|
||||
@@ -135,10 +128,9 @@ def init_eth_account_roles(
|
||||
w3_account_roles,
|
||||
):
|
||||
|
||||
address = w3_account_roles.get('eth_account_gas_provider')
|
||||
role = AccountRole.set('GAS_GIFTER', address)
|
||||
role = AccountRole.set('GAS_GIFTER', w3_account_roles.get('eth_account_gas_provider'))
|
||||
init_database.add(role)
|
||||
|
||||
init_database.commit()
|
||||
return w3_account_roles
|
||||
|
||||
|
||||
@@ -171,6 +163,7 @@ def w3_account_roles(
|
||||
|
||||
role_ids = [
|
||||
'eth_account_bancor_deployer',
|
||||
'eth_account_gas_provider',
|
||||
'eth_account_reserve_owner',
|
||||
'eth_account_reserve_minter',
|
||||
'eth_account_accounts_index_owner',
|
||||
@@ -179,7 +172,6 @@ def w3_account_roles(
|
||||
'eth_account_sarafu_gifter',
|
||||
'eth_account_approval_owner',
|
||||
'eth_account_faucet_owner',
|
||||
'eth_account_gas_provider',
|
||||
]
|
||||
roles = {}
|
||||
|
||||
@@ -195,7 +187,6 @@ def w3_account_roles(
|
||||
|
||||
return roles
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def w3_account_token_owners(
|
||||
tokens_to_deploy,
|
||||
|
||||
@@ -10,8 +10,6 @@ import web3
|
||||
# local imports
|
||||
from cic_eth.api import AdminApi
|
||||
from cic_eth.db.models.role import AccountRole
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.enum import (
|
||||
StatusEnum,
|
||||
StatusBits,
|
||||
@@ -41,37 +39,18 @@ def test_resend_inplace(
|
||||
c = RpcClient(default_chain_spec)
|
||||
|
||||
sigs = []
|
||||
|
||||
gas_provider = c.gas_provider()
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
init_w3.eth.accounts[0],
|
||||
gas_provider,
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_refill = celery.signature(
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
init_w3.eth.accounts[0],
|
||||
chain_str,
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_nonce.link(s_refill)
|
||||
t = s_nonce.apply_async()
|
||||
t.get()
|
||||
for r in t.collect():
|
||||
pass
|
||||
t = s.apply_async()
|
||||
tx_raw = t.get()
|
||||
assert t.successful()
|
||||
|
||||
q = init_database.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
|
||||
o = q.first()
|
||||
tx_raw = o.signed_tx
|
||||
|
||||
tx_dict = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), default_chain_spec.chain_id())
|
||||
gas_price_before = tx_dict['gasPrice']
|
||||
|
||||
|
||||
@@ -49,7 +49,28 @@ def test_transfer_api(
|
||||
assert t.successful()
|
||||
|
||||
|
||||
@pytest.mark.skip()
|
||||
def test_transfer_approval_api(
|
||||
default_chain_spec,
|
||||
init_w3,
|
||||
cic_registry,
|
||||
init_database,
|
||||
bancor_registry,
|
||||
bancor_tokens,
|
||||
transfer_approval,
|
||||
celery_session_worker,
|
||||
):
|
||||
|
||||
token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
|
||||
approval_contract = CICRegistry.get_contract(default_chain_spec, 'TransferApproval')
|
||||
|
||||
api = Api(str(default_chain_spec), callback_param='transfer_request', callback_task='cic_eth.callbacks.noop.noop', queue=None)
|
||||
t = api.transfer_request(init_w3.eth.accounts[2], init_w3.eth.accounts[4], approval_contract.address(), 111, token.symbol())
|
||||
t.get()
|
||||
#for r in t.collect():
|
||||
# print(r)
|
||||
assert t.successful()
|
||||
|
||||
|
||||
def test_convert_api(
|
||||
default_chain_spec,
|
||||
init_w3,
|
||||
@@ -70,7 +91,6 @@ def test_convert_api(
|
||||
assert t.successful()
|
||||
|
||||
|
||||
@pytest.mark.skip()
|
||||
def test_convert_transfer_api(
|
||||
default_chain_spec,
|
||||
init_w3,
|
||||
|
||||
@@ -9,10 +9,6 @@ from tests.mock.filter import (
|
||||
block_filter,
|
||||
tx_filter,
|
||||
)
|
||||
from cic_eth.db.models.nonce import (
|
||||
Nonce,
|
||||
NonceReservation,
|
||||
)
|
||||
|
||||
|
||||
logg = logging.getLogger()
|
||||
@@ -32,20 +28,9 @@ def test_list_tx(
|
||||
|
||||
tx_hashes = []
|
||||
# external tx
|
||||
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
|
||||
q = init_database.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
|
||||
o = q.first()
|
||||
o.nonce = nonce
|
||||
init_database.add(o)
|
||||
init_database.commit()
|
||||
|
||||
NonceReservation.next(init_w3.eth.accounts[0], 'foo', session=init_database)
|
||||
init_database.commit()
|
||||
|
||||
init_eth_tester.mine_blocks(13)
|
||||
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec, 'foo')
|
||||
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
|
||||
tx_hashes.append(tx_hash_hex)
|
||||
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
|
||||
@@ -57,12 +42,9 @@ def test_list_tx(
|
||||
tx_filter.add(a.to_bytes(4, 'big'))
|
||||
|
||||
# external tx
|
||||
NonceReservation.next(init_w3.eth.accounts[0], 'bar', session=init_database)
|
||||
init_database.commit()
|
||||
|
||||
init_eth_tester.mine_blocks(28)
|
||||
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec, 'bar')
|
||||
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
|
||||
tx_hashes.append(tx_hash_hex)
|
||||
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
|
||||
@@ -74,13 +56,10 @@ def test_list_tx(
|
||||
tx_filter.add(a.to_bytes(4, 'big'))
|
||||
|
||||
# custodial tx
|
||||
#NonceReservation.next(init_w3.eth.accounts[0], 'blinky', session=init_database)
|
||||
#init_database.commit()
|
||||
|
||||
init_eth_tester.mine_blocks(3)
|
||||
#txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||
api = Api(str(default_chain_spec), queue=None)
|
||||
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM') #, 'blinky')
|
||||
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM')
|
||||
t.get()
|
||||
tx_hash_hex = None
|
||||
for c in t.collect():
|
||||
@@ -89,11 +68,9 @@ def test_list_tx(
|
||||
tx_hashes.append(tx_hash_hex)
|
||||
|
||||
# custodial tx
|
||||
#NonceReservation.next(init_w3.eth.accounts[0], 'clyde', session=init_database)
|
||||
init_database.commit()
|
||||
init_eth_tester.mine_blocks(6)
|
||||
api = Api(str(default_chain_spec), queue=None)
|
||||
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM') #, 'clyde')
|
||||
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM')
|
||||
t.get()
|
||||
tx_hash_hex = None
|
||||
for c in t.collect():
|
||||
|
||||
@@ -64,7 +64,6 @@ def test_register_account(
|
||||
init_database,
|
||||
init_eth_tester,
|
||||
init_w3,
|
||||
init_rpc,
|
||||
cic_registry,
|
||||
celery_session_worker,
|
||||
eth_empty_accounts,
|
||||
@@ -72,27 +71,18 @@ def test_register_account(
|
||||
|
||||
logg.debug('chainspec {}'.format(str(default_chain_spec)))
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
eth_empty_accounts[0],
|
||||
init_w3.eth.accounts[0],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_register = celery.signature(
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.account.register',
|
||||
[
|
||||
eth_empty_accounts[0],
|
||||
str(default_chain_spec),
|
||||
init_w3.eth.accounts[0],
|
||||
],
|
||||
)
|
||||
s_nonce.link(s_register)
|
||||
t = s_nonce.apply_async()
|
||||
t = s.apply_async()
|
||||
address = t.get()
|
||||
for r in t.collect():
|
||||
pass
|
||||
assert t.successful()
|
||||
r = t.collect()
|
||||
t.successful()
|
||||
|
||||
session = SessionBase.create_session()
|
||||
o = session.query(Otx).first()
|
||||
|
||||
@@ -8,7 +8,6 @@ import celery
|
||||
# local imports
|
||||
from cic_eth.eth.rpc import RpcClient
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db.models.nonce import Nonce
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
|
||||
#logg = logging.getLogger(__name__)
|
||||
@@ -32,38 +31,18 @@ def test_balance_complex(
|
||||
}
|
||||
|
||||
tx_hashes = []
|
||||
|
||||
# TODO: Temporary workaround for nonce db cache initialization being made before deployments.
|
||||
# Instead use different accounts than system ones for transfers for tests
|
||||
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
|
||||
q = init_database.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
|
||||
o = q.first()
|
||||
o.nonce = nonce
|
||||
init_database.add(o)
|
||||
init_database.commit()
|
||||
|
||||
for i in range(3):
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
[token_data],
|
||||
init_w3.eth.accounts[0],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_transfer = celery.signature(
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.token.transfer',
|
||||
[
|
||||
[token_data],
|
||||
init_w3.eth.accounts[0],
|
||||
init_w3.eth.accounts[1],
|
||||
1000*(i+1),
|
||||
chain_str,
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_nonce.link(s_transfer)
|
||||
t = s_nonce.apply_async()
|
||||
t = s.apply_async()
|
||||
t.get()
|
||||
r = None
|
||||
for c in t.collect():
|
||||
|
||||
@@ -1,19 +1,14 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import os
|
||||
|
||||
# external imports
|
||||
import pytest
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_eth.db import TxConvertTransfer
|
||||
from cic_eth.eth.bancor import BancorTxFactory
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
@pytest.mark.skip()
|
||||
def test_transfer_after_convert(
|
||||
init_w3,
|
||||
init_database,
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
# external imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.models.debug import Debug
|
||||
|
||||
|
||||
def test_debug_alert(
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
):
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.admin.debug.alert',
|
||||
[
|
||||
'foo',
|
||||
'bar',
|
||||
'baz',
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
t = s.apply_async()
|
||||
r = t.get()
|
||||
assert r == 'foo'
|
||||
|
||||
q = init_database.query(Debug)
|
||||
q = q.filter(Debug.tag=='bar')
|
||||
o = q.first()
|
||||
assert o.description == 'baz'
|
||||
@@ -1,29 +0,0 @@
|
||||
# external imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.models.debug import Debug
|
||||
|
||||
|
||||
def test_debug_alert(
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
):
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.admin.debug.alert',
|
||||
[
|
||||
'foo',
|
||||
'bar',
|
||||
'baz',
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
t = s.apply_async()
|
||||
r = t.get()
|
||||
assert r == 'foo'
|
||||
|
||||
q = init_database.query(Debug)
|
||||
q = q.filter(Debug.tag=='bar')
|
||||
o = q.first()
|
||||
assert o.description == 'baz'
|
||||
@@ -10,9 +10,6 @@ import celery
|
||||
from cic_eth.eth.account import unpack_gift
|
||||
from cic_eth.eth.factory import TxFactory
|
||||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.db.models.nonce import Nonce
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
@@ -35,16 +32,10 @@ def test_faucet(
|
||||
init_database,
|
||||
):
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
init_w3.eth.accounts[7],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_gift = celery.signature(
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.account.gift',
|
||||
[
|
||||
init_w3.eth.accounts[7],
|
||||
str(default_chain_spec),
|
||||
],
|
||||
)
|
||||
@@ -54,21 +45,15 @@ def test_faucet(
|
||||
str(default_chain_spec),
|
||||
],
|
||||
)
|
||||
s_gift.link(s_send)
|
||||
s_nonce.link(s_gift)
|
||||
t = s_nonce.apply_async()
|
||||
t.get()
|
||||
s.link(s_send)
|
||||
t = s.apply_async()
|
||||
signed_tx = t.get()
|
||||
for r in t.collect():
|
||||
logg.debug('result {}'.format(r))
|
||||
|
||||
assert t.successful()
|
||||
|
||||
q = init_database.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(TxCache.sender==init_w3.eth.accounts[7])
|
||||
o = q.first()
|
||||
signed_tx = o.signed_tx
|
||||
|
||||
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[2:]), default_chain_spec.chain_id())
|
||||
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[0][2:]), default_chain_spec.chain_id())
|
||||
giveto = unpack_gift(tx['data'])
|
||||
assert giveto['to'] == init_w3.eth.accounts[7]
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@ def test_refill_gas(
|
||||
default_chain_spec,
|
||||
init_eth_tester,
|
||||
init_rpc,
|
||||
init_w3,
|
||||
init_database,
|
||||
cic_registry,
|
||||
init_eth_account_roles,
|
||||
@@ -45,39 +44,23 @@ def test_refill_gas(
|
||||
refill_amount = c.refill_amount()
|
||||
|
||||
balance = init_rpc.w3.eth.getBalance(receiver_address)
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
eth_empty_accounts[0],
|
||||
provider_address,
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_refill = celery.signature(
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
receiver_address,
|
||||
str(default_chain_spec),
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
|
||||
s_nonce.link(s_refill)
|
||||
t = s_nonce.apply_async()
|
||||
t = s.apply_async()
|
||||
r = t.get()
|
||||
for c in t.collect():
|
||||
pass
|
||||
t.collect()
|
||||
assert t.successful()
|
||||
|
||||
q = init_database.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(TxCache.recipient==receiver_address)
|
||||
o = q.first()
|
||||
signed_tx = o.signed_tx
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.send',
|
||||
[
|
||||
[signed_tx],
|
||||
[r],
|
||||
str(default_chain_spec),
|
||||
],
|
||||
)
|
||||
@@ -91,11 +74,11 @@ def test_refill_gas(
|
||||
assert balance_new == (balance + refill_amount)
|
||||
|
||||
# Verify that entry is added in TxCache
|
||||
q = init_database.query(Otx)
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(TxCache.recipient==receiver_address)
|
||||
r = q.first()
|
||||
init_database.commit()
|
||||
|
||||
assert r.status == StatusEnum.SENT
|
||||
|
||||
@@ -103,7 +86,6 @@ def test_refill_gas(
|
||||
def test_refill_deduplication(
|
||||
default_chain_spec,
|
||||
init_rpc,
|
||||
init_w3,
|
||||
init_database,
|
||||
init_eth_account_roles,
|
||||
cic_registry,
|
||||
@@ -117,131 +99,83 @@ def test_refill_deduplication(
|
||||
c = init_rpc
|
||||
refill_amount = c.refill_amount()
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
receiver_address,
|
||||
provider_address,
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_refill = celery.signature(
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
receiver_address,
|
||||
str(default_chain_spec),
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
|
||||
s_nonce.link(s_refill)
|
||||
t = s_nonce.apply_async()
|
||||
t = s.apply_async()
|
||||
r = t.get()
|
||||
for e in t.collect():
|
||||
pass
|
||||
assert t.successful()
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
receiver_address,
|
||||
provider_address,
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_refill = celery.signature(
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
receiver_address,
|
||||
str(default_chain_spec),
|
||||
],
|
||||
)
|
||||
|
||||
s_nonce.link(s_refill)
|
||||
t = s_nonce.apply_async()
|
||||
#with pytest.raises(AlreadyFillingGasError):
|
||||
t.get()
|
||||
for e in t.collect():
|
||||
pass
|
||||
assert t.successful()
|
||||
logg.warning('TODO: complete test by checking that second tx had zero value')
|
||||
t = s.apply_async()
|
||||
with pytest.raises(AlreadyFillingGasError):
|
||||
t.get()
|
||||
|
||||
|
||||
# TODO: check gas is part of the transfer chain, and we cannot create the transfer nonce by uuid before the task. Test is subsumed by transfer task test, but should be tested in isolation
|
||||
#def test_check_gas(
|
||||
# default_chain_spec,
|
||||
# init_eth_tester,
|
||||
# init_w3,
|
||||
# init_rpc,
|
||||
# eth_empty_accounts,
|
||||
# init_database,
|
||||
# cic_registry,
|
||||
# celery_session_worker,
|
||||
# bancor_registry,
|
||||
# bancor_tokens,
|
||||
# ):
|
||||
#
|
||||
# provider_address = init_w3.eth.accounts[0]
|
||||
# gas_receiver_address = eth_empty_accounts[0]
|
||||
# token_receiver_address = init_w3.eth.accounts[1]
|
||||
#
|
||||
## c = init_rpc
|
||||
## txf = TokenTxFactory(gas_receiver_address, c)
|
||||
## tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec, 'foo')
|
||||
##
|
||||
## (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
|
||||
#
|
||||
# token_data = [
|
||||
# {
|
||||
# 'address': bancor_tokens[0],
|
||||
# },
|
||||
# ]
|
||||
#
|
||||
# s_nonce = celery.signature(
|
||||
# 'cic_eth.eth.tx.reserve_nonce',
|
||||
# [
|
||||
# token_data,
|
||||
# init_w3.eth.accounts[0],
|
||||
# ],
|
||||
# queue=None,
|
||||
# )
|
||||
# s_transfer = celery.signature(
|
||||
# 'cic_eth.eth.token.transfer',
|
||||
# [
|
||||
# init_w3.eth.accounts[0],
|
||||
# init_w3.eth.accounts[1],
|
||||
# 1024,
|
||||
# str(default_chain_spec),
|
||||
# ],
|
||||
# queue=None,
|
||||
# )
|
||||
#
|
||||
# gas_price = c.gas_price()
|
||||
# gas_limit = tx_transfer['gas']
|
||||
#
|
||||
# s = celery.signature(
|
||||
# 'cic_eth.eth.tx.check_gas',
|
||||
# [
|
||||
# [tx_hash_hex],
|
||||
# str(default_chain_spec),
|
||||
# [],
|
||||
# gas_receiver_address,
|
||||
# gas_limit * gas_price,
|
||||
# ],
|
||||
# )
|
||||
# s_nonce.link(s_transfer)
|
||||
# t = s_nonce.apply_async()
|
||||
# with pytest.raises(OutOfGasError):
|
||||
# r = t.get()
|
||||
# #assert len(r) == 0
|
||||
#
|
||||
# time.sleep(1)
|
||||
# t.collect()
|
||||
#
|
||||
# session = SessionBase.create_session()
|
||||
# q = session.query(Otx)
|
||||
# q = q.filter(Otx.tx_hash==tx_hash_hex)
|
||||
# r = q.first()
|
||||
# session.close()
|
||||
# assert r.status == StatusEnum.WAITFORGAS
|
||||
def test_check_gas(
|
||||
default_chain_spec,
|
||||
init_eth_tester,
|
||||
init_w3,
|
||||
init_rpc,
|
||||
eth_empty_accounts,
|
||||
init_database,
|
||||
cic_registry,
|
||||
celery_session_worker,
|
||||
bancor_registry,
|
||||
bancor_tokens,
|
||||
):
|
||||
|
||||
provider_address = init_w3.eth.accounts[0]
|
||||
gas_receiver_address = eth_empty_accounts[0]
|
||||
token_receiver_address = init_w3.eth.accounts[1]
|
||||
|
||||
c = init_rpc
|
||||
txf = TokenTxFactory(gas_receiver_address, c)
|
||||
tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec)
|
||||
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
|
||||
|
||||
gas_price = c.gas_price()
|
||||
gas_limit = tx_transfer['gas']
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.check_gas',
|
||||
[
|
||||
[tx_hash_hex],
|
||||
str(default_chain_spec),
|
||||
[],
|
||||
gas_receiver_address,
|
||||
gas_limit * gas_price,
|
||||
],
|
||||
)
|
||||
t = s.apply_async()
|
||||
with pytest.raises(OutOfGasError):
|
||||
r = t.get()
|
||||
#assert len(r) == 0
|
||||
|
||||
time.sleep(1)
|
||||
t.collect()
|
||||
|
||||
session = SessionBase.create_session()
|
||||
q = session.query(Otx)
|
||||
q = q.filter(Otx.tx_hash==tx_hash_hex)
|
||||
r = q.first()
|
||||
session.close()
|
||||
assert r.status == StatusEnum.WAITFORGAS
|
||||
|
||||
|
||||
def test_resend_with_higher_gas(
|
||||
@@ -257,73 +191,39 @@ def test_resend_with_higher_gas(
|
||||
):
|
||||
|
||||
c = init_rpc
|
||||
txf = TokenTxFactory(init_w3.eth.accounts[0], c)
|
||||
|
||||
token_data = {
|
||||
'address': bancor_tokens[0],
|
||||
}
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
[token_data],
|
||||
init_w3.eth.accounts[0],
|
||||
],
|
||||
queue=None,
|
||||
tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec)
|
||||
logg.debug('txtransfer {}'.format(tx_transfer))
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
|
||||
logg.debug('signed raw {}'.format(tx_signed_raw_hex))
|
||||
queue_create(
|
||||
tx_transfer['nonce'],
|
||||
tx_transfer['from'],
|
||||
tx_hash_hex,
|
||||
tx_signed_raw_hex,
|
||||
str(default_chain_spec),
|
||||
)
|
||||
s_transfer = celery.signature(
|
||||
'cic_eth.eth.token.transfer',
|
||||
[
|
||||
init_w3.eth.accounts[0],
|
||||
init_w3.eth.accounts[1],
|
||||
1024,
|
||||
str(default_chain_spec),
|
||||
],
|
||||
queue=None,
|
||||
logg.debug('create {}'.format(tx_transfer['from']))
|
||||
cache_transfer_data(
|
||||
tx_hash_hex,
|
||||
tx_transfer, #_signed_raw_hex,
|
||||
)
|
||||
|
||||
# txf = TokenTxFactory(init_w3.eth.accounts[0], c)
|
||||
|
||||
# tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec, 'foo')
|
||||
# logg.debug('txtransfer {}'.format(tx_transfer))
|
||||
# (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
|
||||
# logg.debug('signed raw {}'.format(tx_signed_raw_hex))
|
||||
# queue_create(
|
||||
# tx_transfer['nonce'],
|
||||
# tx_transfer['from'],
|
||||
# tx_hash_hex,
|
||||
# tx_signed_raw_hex,
|
||||
# str(default_chain_spec),
|
||||
# )
|
||||
# logg.debug('create {}'.format(tx_transfer['from']))
|
||||
# cache_transfer_data(
|
||||
# tx_hash_hex,
|
||||
# tx_transfer, #_signed_raw_hex,
|
||||
# )
|
||||
s_nonce.link(s_transfer)
|
||||
t = s_nonce.apply_async()
|
||||
t.get()
|
||||
for r in t.collect():
|
||||
pass
|
||||
assert t.successful()
|
||||
|
||||
q = init_database.query(Otx)
|
||||
q = q.join(TxCache)
|
||||
q = q.filter(TxCache.recipient==init_w3.eth.accounts[1])
|
||||
o = q.first()
|
||||
tx_hash_hex = o.tx_hash
|
||||
|
||||
s_resend = celery.signature(
|
||||
'cic_eth.eth.tx.resend_with_higher_gas',
|
||||
[
|
||||
tx_hash_hex,
|
||||
str(default_chain_spec),
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
|
||||
t = s_resend.apply_async()
|
||||
|
||||
i = 0
|
||||
for r in t.collect():
|
||||
pass
|
||||
logg.debug('{} {}'.format(i, r[0].get()))
|
||||
i += 1
|
||||
|
||||
assert t.successful()
|
||||
|
||||
#
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
# third-party imports
|
||||
import pytest
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
@@ -7,92 +6,7 @@ from cic_eth.admin.nonce import shift_nonce
|
||||
from cic_eth.queue.tx import create as queue_create
|
||||
from cic_eth.eth.tx import otx_cache_parse_tx
|
||||
from cic_eth.eth.task import sign_tx
|
||||
from cic_eth.db.models.nonce import (
|
||||
NonceReservation,
|
||||
Nonce
|
||||
)
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
|
||||
|
||||
@pytest.mark.skip()
|
||||
def test_reserve_nonce_task(
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
eth_empty_accounts,
|
||||
):
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
'foo',
|
||||
eth_empty_accounts[0],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
t = s.apply_async()
|
||||
r = t.get()
|
||||
|
||||
assert r == 'foo'
|
||||
|
||||
q = init_database.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
|
||||
o = q.first()
|
||||
assert o != None
|
||||
|
||||
q = init_database.query(NonceReservation)
|
||||
q = q.filter(NonceReservation.key==str(t))
|
||||
o = q.first()
|
||||
assert o != None
|
||||
|
||||
|
||||
def test_reserve_nonce_chain(
|
||||
default_chain_spec,
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
init_w3,
|
||||
init_rpc,
|
||||
):
|
||||
|
||||
provider_address = init_rpc.gas_provider()
|
||||
q = init_database.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==provider_address)
|
||||
o = q.first()
|
||||
o.nonce = 42
|
||||
init_database.add(o)
|
||||
init_database.commit()
|
||||
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
init_w3.eth.accounts[0],
|
||||
provider_address,
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_gas = celery.signature(
|
||||
'cic_eth.eth.tx.refill_gas',
|
||||
[
|
||||
str(default_chain_spec),
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_nonce.link(s_gas)
|
||||
t = s_nonce.apply_async()
|
||||
r = t.get()
|
||||
for c in t.collect():
|
||||
pass
|
||||
assert t.successful()
|
||||
|
||||
q = init_database.query(Otx)
|
||||
Q = q.join(TxCache)
|
||||
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
|
||||
o = q.first()
|
||||
|
||||
assert o.nonce == 42
|
||||
|
||||
|
||||
@pytest.mark.skip()
|
||||
def test_shift_nonce(
|
||||
default_chain_spec,
|
||||
init_database,
|
||||
@@ -133,4 +47,3 @@ def test_shift_nonce(
|
||||
for _ in t.collect():
|
||||
pass
|
||||
assert t.successful()
|
||||
|
||||
|
||||
@@ -20,30 +20,21 @@ def test_approve(
|
||||
cic_registry,
|
||||
):
|
||||
|
||||
token_data = [
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.token.approve',
|
||||
[
|
||||
[
|
||||
{
|
||||
'address': bancor_tokens[0],
|
||||
},
|
||||
]
|
||||
s_nonce = celery.signature(
|
||||
'cic_eth.eth.tx.reserve_nonce',
|
||||
[
|
||||
token_data,
|
||||
init_rpc.w3.eth.accounts[0],
|
||||
],
|
||||
queue=None,
|
||||
)
|
||||
s_approve = celery.signature(
|
||||
'cic_eth.eth.token.approve',
|
||||
[
|
||||
],
|
||||
init_rpc.w3.eth.accounts[0],
|
||||
init_rpc.w3.eth.accounts[1],
|
||||
1024,
|
||||
str(default_chain_spec),
|
||||
],
|
||||
)
|
||||
s_nonce.link(s_approve)
|
||||
t = s_nonce.apply_async()
|
||||
t = s.apply_async()
|
||||
t.get()
|
||||
for r in t.collect():
|
||||
logg.debug('result {}'.format(r))
|
||||
|
||||
76
apps/cic-eth/tests/tasks/test_transfer_approval.py
Normal file
76
apps/cic-eth/tests/tasks/test_transfer_approval.py
Normal file
@@ -0,0 +1,76 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import time
|
||||
|
||||
# third-party imports
|
||||
from erc20_approval_escrow import TransferApproval
|
||||
import celery
|
||||
import sha3
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth.token import TokenTxFactory
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
# BUG: transaction receipt only found sometimes
|
||||
def test_transfer_approval(
|
||||
default_chain_spec,
|
||||
transfer_approval,
|
||||
bancor_tokens,
|
||||
w3_account_roles,
|
||||
eth_empty_accounts,
|
||||
cic_registry,
|
||||
init_database,
|
||||
celery_session_worker,
|
||||
init_eth_tester,
|
||||
init_w3,
|
||||
):
|
||||
|
||||
s = celery.signature(
|
||||
'cic_eth.eth.request.transfer_approval_request',
|
||||
[
|
||||
[
|
||||
{
|
||||
'address': bancor_tokens[0],
|
||||
},
|
||||
],
|
||||
w3_account_roles['eth_account_sarafu_owner'],
|
||||
eth_empty_accounts[0],
|
||||
1024,
|
||||
str(default_chain_spec),
|
||||
],
|
||||
)
|
||||
|
||||
s_send = celery.signature(
|
||||
'cic_eth.eth.tx.send',
|
||||
[
|
||||
str(default_chain_spec),
|
||||
],
|
||||
|
||||
)
|
||||
s.link(s_send)
|
||||
t = s.apply_async()
|
||||
|
||||
tx_signed_raws = t.get()
|
||||
for r in t.collect():
|
||||
logg.debug('result {}'.format(r))
|
||||
|
||||
assert t.successful()
|
||||
|
||||
init_eth_tester.mine_block()
|
||||
|
||||
h = sha3.keccak_256()
|
||||
tx_signed_raw = tx_signed_raws[0]
|
||||
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw[2:])
|
||||
h.update(tx_signed_raw_bytes)
|
||||
tx_hash = h.digest()
|
||||
rcpt = init_w3.eth.getTransactionReceipt(tx_hash)
|
||||
|
||||
assert rcpt.status == 1
|
||||
|
||||
a = TransferApproval(init_w3, transfer_approval)
|
||||
assert a.last_serial() == 1
|
||||
|
||||
logg.debug('requests {}'.format(a.requests(1)['serial']))
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
# local imports
|
||||
from cic_eth.db.models.debug import Debug
|
||||
|
||||
|
||||
def test_debug(
|
||||
init_database,
|
||||
):
|
||||
|
||||
o = Debug('foo', 'bar')
|
||||
init_database.add(o)
|
||||
init_database.commit()
|
||||
|
||||
q = init_database.query(Debug)
|
||||
q = q.filter(Debug.tag=='foo')
|
||||
o = q.first()
|
||||
assert o.description == 'bar'
|
||||
@@ -1,29 +1,8 @@
|
||||
# third-party imports
|
||||
import pytest
|
||||
import uuid
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.models.nonce import (
|
||||
Nonce,
|
||||
NonceReservation,
|
||||
)
|
||||
from cic_eth.error import (
|
||||
InitializationError,
|
||||
IntegrityError,
|
||||
)
|
||||
|
||||
|
||||
def test_nonce_init(
|
||||
init_database,
|
||||
eth_empty_accounts,
|
||||
):
|
||||
|
||||
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
|
||||
init_database.commit()
|
||||
|
||||
with pytest.raises(InitializationError):
|
||||
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
|
||||
|
||||
from cic_eth.db.models.nonce import Nonce
|
||||
|
||||
def test_nonce_increment(
|
||||
init_database,
|
||||
@@ -31,46 +10,11 @@ def test_nonce_increment(
|
||||
database_engine,
|
||||
):
|
||||
|
||||
# if database_engine[:6] == 'sqlite':
|
||||
# pytest.skip('sqlite cannot lock tables which is required for this test, skipping')
|
||||
|
||||
nonce = Nonce.next(eth_empty_accounts[0], 3)
|
||||
assert nonce == 3
|
||||
|
||||
nonce = Nonce.next(eth_empty_accounts[0], 3)
|
||||
assert nonce == 4
|
||||
|
||||
|
||||
def test_nonce_reserve(
|
||||
init_database,
|
||||
eth_empty_accounts,
|
||||
):
|
||||
|
||||
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
|
||||
init_database.commit()
|
||||
uu = uuid.uuid4()
|
||||
nonce = NonceReservation.next(eth_empty_accounts[0], str(uu), session=init_database)
|
||||
init_database.commit()
|
||||
assert nonce == 42
|
||||
|
||||
q = init_database.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
|
||||
o = q.first()
|
||||
assert o.nonce == 43
|
||||
|
||||
nonce = NonceReservation.release(str(uu))
|
||||
init_database.commit()
|
||||
assert nonce == 42
|
||||
|
||||
q = init_database.query(NonceReservation)
|
||||
q = q.filter(NonceReservation.key==str(uu))
|
||||
o = q.first()
|
||||
assert o == None
|
||||
|
||||
|
||||
def test_nonce_reserve_integrity(
|
||||
init_database,
|
||||
eth_empty_accounts,
|
||||
):
|
||||
|
||||
uu = uuid.uuid4()
|
||||
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
|
||||
with pytest.raises(IntegrityError):
|
||||
NonceReservation.release(str(uu))
|
||||
|
||||
@@ -17,7 +17,6 @@ from cic_eth.db.models.tx import TxCache
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
@pytest.mark.skip()
|
||||
def test_resolve_converters_by_tokens(
|
||||
cic_registry,
|
||||
init_w3,
|
||||
@@ -44,7 +43,6 @@ def test_resolve_converters_by_tokens(
|
||||
assert len(t['converters']) == 1
|
||||
|
||||
|
||||
@pytest.mark.skip()
|
||||
def test_unpack_convert(
|
||||
default_chain_spec,
|
||||
cic_registry,
|
||||
@@ -86,7 +84,6 @@ def test_unpack_convert(
|
||||
assert convert_data['fee'] == 0
|
||||
|
||||
|
||||
@pytest.mark.skip()
|
||||
def test_queue_cache_convert(
|
||||
default_chain_spec,
|
||||
init_w3,
|
||||
|
||||
51
apps/cic-eth/tests/unit/eth/test_nonce.py
Normal file
51
apps/cic-eth/tests/unit/eth/test_nonce.py
Normal file
@@ -0,0 +1,51 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth.nonce import NonceOracle
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def test_nonce_sequence(
|
||||
eth_empty_accounts,
|
||||
init_database,
|
||||
init_rpc,
|
||||
):
|
||||
|
||||
account= init_rpc.w3.eth.personal.new_account('')
|
||||
no = NonceOracle(account, 0)
|
||||
n = no.next()
|
||||
assert n == 0
|
||||
|
||||
n = no.next()
|
||||
assert n == 1
|
||||
|
||||
init_rpc.w3.eth.sendTransaction({
|
||||
'from': init_rpc.w3.eth.accounts[0],
|
||||
'to': account,
|
||||
'value': 200000000,
|
||||
})
|
||||
init_rpc.w3.eth.sendTransaction({
|
||||
'from': account,
|
||||
'to': eth_empty_accounts[0],
|
||||
'value': 100,
|
||||
})
|
||||
|
||||
c = init_rpc.w3.eth.getTransactionCount(account, 'pending')
|
||||
logg.debug('nonce {}'.format(c))
|
||||
|
||||
account= init_rpc.w3.eth.personal.new_account('')
|
||||
no = NonceOracle(account, c)
|
||||
|
||||
n = no.next()
|
||||
assert n == 1
|
||||
|
||||
n = no.next()
|
||||
assert n == 2
|
||||
|
||||
# try with bogus value
|
||||
no = NonceOracle(account, 4)
|
||||
n = no.next()
|
||||
assert n == 3
|
||||
|
||||
@@ -11,14 +11,12 @@ from cic_eth.eth.util import unpack_signed_raw_tx
|
||||
from cic_eth.queue.tx import create as queue_create
|
||||
from cic_eth.db.models.otx import Otx
|
||||
from cic_eth.db.models.tx import TxCache
|
||||
from cic_eth.db.models.nonce import NonceReservation
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def test_unpack_transfer(
|
||||
default_chain_spec,
|
||||
init_database,
|
||||
init_w3,
|
||||
init_rpc,
|
||||
cic_registry,
|
||||
@@ -26,9 +24,6 @@ def test_unpack_transfer(
|
||||
bancor_registry,
|
||||
):
|
||||
|
||||
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
|
||||
init_database.commit()
|
||||
|
||||
source_token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
|
||||
logg.debug('bancor tokens {} {}'.format(bancor_tokens, source_token))
|
||||
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||
@@ -37,7 +32,6 @@ def test_unpack_transfer(
|
||||
init_w3.eth.accounts[1],
|
||||
42,
|
||||
default_chain_spec,
|
||||
'foo',
|
||||
)
|
||||
s = init_w3.eth.sign_transaction(transfer_tx)
|
||||
s_bytes = bytes.fromhex(s['raw'][2:])
|
||||
@@ -62,9 +56,6 @@ def test_queue_cache_transfer(
|
||||
bancor_registry,
|
||||
):
|
||||
|
||||
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
|
||||
init_database.commit()
|
||||
|
||||
source_token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
|
||||
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||
value = 42
|
||||
@@ -73,7 +64,6 @@ def test_queue_cache_transfer(
|
||||
init_w3.eth.accounts[1],
|
||||
value,
|
||||
default_chain_spec,
|
||||
'foo',
|
||||
)
|
||||
tx_signed = init_w3.eth.sign_transaction(transfer_tx)
|
||||
tx_hash = init_w3.eth.sendRawTransaction(tx_signed['raw'])
|
||||
|
||||
@@ -8,17 +8,12 @@ import moolb
|
||||
# local imports
|
||||
from cic_eth.eth.token import TokenTxFactory
|
||||
from cic_eth.eth.task import sign_tx
|
||||
from cic_eth.db.models.nonce import (
|
||||
NonceReservation,
|
||||
Nonce,
|
||||
)
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
# TODO: This test fails when not run alone. Identify which fixture leaves a dirty state
|
||||
def test_filter_process(
|
||||
init_database,
|
||||
init_rpc,
|
||||
default_chain_spec,
|
||||
default_chain_registry,
|
||||
@@ -34,22 +29,9 @@ def test_filter_process(
|
||||
|
||||
tx_hashes = []
|
||||
# external tx
|
||||
|
||||
# TODO: it does not make sense to use the db setup for nonce here, but we need it as long as we are using the factory to assemble to tx
|
||||
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
|
||||
q = init_database.query(Nonce)
|
||||
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
|
||||
o = q.first()
|
||||
o.nonce = nonce
|
||||
init_database.add(o)
|
||||
init_database.commit()
|
||||
|
||||
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
|
||||
init_database.commit()
|
||||
|
||||
init_eth_tester.mine_blocks(13)
|
||||
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec, 'foo')
|
||||
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
|
||||
tx_hashes.append(tx_hash_hex)
|
||||
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
|
||||
@@ -61,12 +43,9 @@ def test_filter_process(
|
||||
t.add(a.to_bytes(4, 'big'))
|
||||
|
||||
# external tx
|
||||
NonceReservation.next(init_w3.eth.accounts[0], 'bar', init_database)
|
||||
init_database.commit()
|
||||
|
||||
init_eth_tester.mine_blocks(28)
|
||||
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
||||
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec, 'bar')
|
||||
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
|
||||
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
|
||||
tx_hashes.append(tx_hash_hex)
|
||||
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# local imports
|
||||
from cic_eth.queue.tx import get_status_tx
|
||||
from cic_eth.db.enum import (
|
||||
StatusEnum,
|
||||
StatusBits,
|
||||
)
|
||||
from cic_eth.queue.tx import create as queue_create
|
||||
from cic_eth.eth.tx import cache_gas_refill_data
|
||||
from cic_eth.db.models.otx import Otx
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def test_status_tx_list(
|
||||
default_chain_spec,
|
||||
init_database,
|
||||
init_w3,
|
||||
):
|
||||
|
||||
tx = {
|
||||
'from': init_w3.eth.accounts[0],
|
||||
'to': init_w3.eth.accounts[1],
|
||||
'nonce': 42,
|
||||
'gas': 21000,
|
||||
'gasPrice': 1000000,
|
||||
'value': 128,
|
||||
'chainId': 666,
|
||||
'data': '',
|
||||
}
|
||||
logg.debug('nonce {}'.format(tx['nonce']))
|
||||
tx_signed = init_w3.eth.sign_transaction(tx)
|
||||
#tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw'])
|
||||
tx_hash = init_w3.keccak(hexstr=tx_signed['raw'])
|
||||
queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec))
|
||||
cache_gas_refill_data(tx_hash.hex(), tx)
|
||||
tx_hash_hex = tx_hash.hex()
|
||||
|
||||
q = init_database.query(Otx)
|
||||
otx = q.get(1)
|
||||
otx.sendfail(session=init_database)
|
||||
init_database.add(otx)
|
||||
init_database.commit()
|
||||
init_database.refresh(otx)
|
||||
|
||||
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
|
||||
assert len(txs) == 1
|
||||
|
||||
otx.sendfail(session=init_database)
|
||||
otx.retry(session=init_database)
|
||||
init_database.add(otx)
|
||||
init_database.commit()
|
||||
init_database.refresh(otx)
|
||||
|
||||
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
|
||||
assert len(txs) == 1
|
||||
|
||||
txs = get_status_tx(StatusBits.QUEUED, session=init_database)
|
||||
assert len(txs) == 1
|
||||
|
||||
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.LOCAL_ERROR, session=init_database)
|
||||
assert len(txs) == 0
|
||||
|
||||
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK, session=init_database)
|
||||
assert len(txs) == 1
|
||||
|
||||
txs = get_status_tx(StatusBits.IN_NETWORK, session=init_database)
|
||||
assert len(txs) == 0
|
||||
|
||||
1
apps/cic-meta/.gitignore
vendored
1
apps/cic-meta/.gitignore
vendored
@@ -1,6 +1,5 @@
|
||||
node_modules
|
||||
dist
|
||||
dist-web
|
||||
dist-server
|
||||
scratch
|
||||
tests
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
* 0.0.7-pending
|
||||
- Add immutable content
|
||||
- Add db lock on server
|
||||
- Add ArgPair and KeyStore to src exports
|
||||
* 0.0.6
|
||||
- Add server build
|
||||
* 0.0.5
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "cic-client-meta",
|
||||
"version": "0.0.7-alpha.3",
|
||||
"version": "0.0.7-alpha.2",
|
||||
"description": "Signed CRDT metadata graphs for the CIC network",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
@@ -32,12 +32,6 @@
|
||||
"webpack-cli": "^4.2.0"
|
||||
},
|
||||
"author": "Louis Holbrook <dev@holbrook.no>",
|
||||
"contributors": [
|
||||
{
|
||||
"name": "Spencer Ofwiti",
|
||||
"email": "maxspencer56@gmail.com"
|
||||
}
|
||||
],
|
||||
"license": "GPL-3.0-or-later",
|
||||
"engines": {
|
||||
"node": "~15.3.0"
|
||||
|
||||
@@ -101,18 +101,6 @@ function parseDigest(url) {
|
||||
|
||||
async function processRequest(req, res) {
|
||||
let digest = undefined;
|
||||
const headers = {
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
"Access-Control-Allow-Methods": "OPTIONS, POST, GET, PUT",
|
||||
"Access-Control-Max-Age": 2592000, // 30 days
|
||||
"Access-Control-Allow-Headers": 'Access-Control-Allow-Origin, Content-Type, x-cic-automerge'
|
||||
};
|
||||
|
||||
if (req.method === "OPTIONS") {
|
||||
res.writeHead(200, headers);
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!['PUT', 'GET', 'POST'].includes(req.method)) {
|
||||
res.writeHead(405, {"Content-Type": "text/plain"});
|
||||
@@ -213,7 +201,6 @@ async function processRequest(req, res) {
|
||||
|
||||
const responseContentLength = (new TextEncoder().encode(content)).length;
|
||||
res.writeHead(200, {
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
"Content-Type": contentType,
|
||||
"Content-Length": responseContentLength,
|
||||
});
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
export { PGPSigner, PGPKeyStore, Signer, KeyStore } from './auth';
|
||||
export { ArgPair, Envelope, Syncable } from './sync';
|
||||
export { PGPSigner, PGPKeyStore } from './auth';
|
||||
export { Envelope, Syncable } from './sync';
|
||||
export { User } from './assets/user';
|
||||
export { Phone } from './assets/phone';
|
||||
export { Config } from './config';
|
||||
|
||||
@@ -93,7 +93,7 @@ function orderDict(src) {
|
||||
return dst;
|
||||
}
|
||||
|
||||
class Syncable implements JSONSerializable, Authoritative, Signable {
|
||||
class Syncable implements JSONSerializable, Authoritative {
|
||||
|
||||
id: string
|
||||
timestamp: number
|
||||
|
||||
@@ -7,8 +7,10 @@ PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
|
||||
SERVICE_CODE=*483*46#
|
||||
|
||||
[ussd]
|
||||
MENU_FILE=/usr/src/data/ussd_menu.json
|
||||
MENU_FILE=/usr/local/lib/python3.8/site-packages/cic_ussd/db/ussd_menu.json
|
||||
|
||||
[statemachine]
|
||||
STATES=/usr/src/cic-ussd/states/
|
||||
TRANSITIONS=/usr/src/cic-ussd/transitions/
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,2 @@
|
||||
[cic]
|
||||
engine = evm
|
||||
common_name = bloxberg
|
||||
network_id = 8996
|
||||
meta_url = http://localhost:63380
|
||||
chain_spec = Bloxberg:8995
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[database]
|
||||
NAME=cic_ussd
|
||||
USER=postgres
|
||||
PASSWORD=
|
||||
PASSWORD=password
|
||||
HOST=localhost
|
||||
PORT=5432
|
||||
ENGINE=postgresql
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
[pgp]
|
||||
export_dir = /usr/src/pgp/keys/
|
||||
keys_path = /usr/src/secrets/
|
||||
private_keys = privatekeys_meta.asc
|
||||
passphrase =
|
||||
@@ -7,8 +7,8 @@ PASSWORD_PEPPER=QYbzKff6NhiQzY3ygl2BkiKOpER8RE/Upqs/5aZWW+I=
|
||||
SERVICE_CODE=*483*46#
|
||||
|
||||
[ussd]
|
||||
MENU_FILE=/usr/local/lib/python3.8/site-packages/cic_ussd/db/ussd_menu.json
|
||||
MENU_FILE=cic_ussd/db/ussd_menu.json
|
||||
|
||||
[statemachine]
|
||||
STATES=/usr/src/cic-ussd/states/
|
||||
TRANSITIONS=/usr/src/cic-ussd/transitions/
|
||||
STATES=states/
|
||||
TRANSITIONS=transitions/
|
||||
|
||||
@@ -1,5 +1,2 @@
|
||||
[cic]
|
||||
engine = evm
|
||||
common_name = bloxberg
|
||||
network_id = 8996
|
||||
meta_url = http://localhost:63380
|
||||
chain_spec = Bloxberg:8995
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
[pgp]
|
||||
export_dir = /usr/src/pgp/keys/
|
||||
keys_path = /usr/src/secrets/
|
||||
private_keys = privatekeys_meta.asc
|
||||
passphrase =
|
||||
@@ -1,49 +0,0 @@
|
||||
# standard imports
|
||||
import json
|
||||
|
||||
# third-party imports
|
||||
from cic_eth.api import Api
|
||||
from cic_types.models.person import Person
|
||||
from cic_types.processor import generate_metadata_pointer
|
||||
|
||||
# local imports
|
||||
from cic_ussd.chain import Chain
|
||||
from cic_ussd.db.models.user import User
|
||||
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
||||
from cic_ussd.redis import get_cached_data
|
||||
|
||||
|
||||
def define_account_tx_metadata(user: User):
|
||||
# get sender metadata
|
||||
identifier = blockchain_address_to_metadata_pointer(
|
||||
blockchain_address=user.blockchain_address
|
||||
)
|
||||
key = generate_metadata_pointer(
|
||||
identifier=identifier,
|
||||
cic_type='cic.person'
|
||||
)
|
||||
account_metadata = get_cached_data(key=key)
|
||||
|
||||
if account_metadata:
|
||||
account_metadata = json.loads(account_metadata)
|
||||
person = Person()
|
||||
deserialized_person = person.deserialize(metadata=account_metadata)
|
||||
given_name = deserialized_person.given_name
|
||||
family_name = deserialized_person.family_name
|
||||
phone_number = deserialized_person.tel
|
||||
|
||||
return f'{given_name} {family_name} {phone_number}'
|
||||
else:
|
||||
phone_number = user.phone_number
|
||||
return phone_number
|
||||
|
||||
|
||||
def retrieve_account_statement(blockchain_address: str):
|
||||
chain_str = Chain.spec.__str__()
|
||||
cic_eth_api = Api(
|
||||
chain_str=chain_str,
|
||||
callback_queue='cic-ussd',
|
||||
callback_task='cic_ussd.tasks.callback_handler.process_statement_callback',
|
||||
callback_param=blockchain_address
|
||||
)
|
||||
result = cic_eth_api.list(address=blockchain_address, limit=9)
|
||||
39
apps/cic-ussd/cic_ussd/accounts.py
Normal file
39
apps/cic-ussd/cic_ussd/accounts.py
Normal file
@@ -0,0 +1,39 @@
|
||||
# standard imports
|
||||
import logging
|
||||
from collections import deque
|
||||
|
||||
# third-party imports
|
||||
from cic_eth.api import Api
|
||||
|
||||
# local imports
|
||||
from cic_ussd.transactions import from_wei
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class BalanceManager:
|
||||
|
||||
def __init__(self, address: str, chain_str: str, token_symbol: str):
|
||||
"""
|
||||
:param address: Ethereum address of account whose balance is being queried
|
||||
:type address: str, 0x-hex
|
||||
:param chain_str: The chain name and network id.
|
||||
:type chain_str: str
|
||||
:param token_symbol: ERC20 token symbol of whose balance is being queried
|
||||
:type token_symbol: str
|
||||
"""
|
||||
self.address = address
|
||||
self.chain_str = chain_str
|
||||
self.token_symbol = token_symbol
|
||||
|
||||
def get_operational_balance(self) -> float:
|
||||
"""This question queries cic-eth for an account's balance
|
||||
:return: The current balance of the account as reflected on the blockchain.
|
||||
:rtype: int
|
||||
"""
|
||||
cic_eth_api = Api(chain_str=self.chain_str, callback_task=None)
|
||||
balance_request_task = cic_eth_api.balance(address=self.address, token_symbol=self.token_symbol)
|
||||
balance_request_task_results = balance_request_task.collect()
|
||||
balance_result = deque(balance_request_task_results, maxlen=1).pop()
|
||||
balance = from_wei(value=balance_result[-1])
|
||||
return balance
|
||||
@@ -1,90 +0,0 @@
|
||||
# standard imports
|
||||
import json
|
||||
import logging
|
||||
from typing import Union
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
from cic_eth.api import Api
|
||||
|
||||
# local imports
|
||||
from cic_ussd.error import CachedDataNotFoundError
|
||||
from cic_ussd.redis import create_cached_data_key, get_cached_data
|
||||
from cic_ussd.conversions import from_wei
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class BalanceManager:
|
||||
|
||||
def __init__(self, address: str, chain_str: str, token_symbol: str):
|
||||
"""
|
||||
:param address: Ethereum address of account whose balance is being queried
|
||||
:type address: str, 0x-hex
|
||||
:param chain_str: The chain name and network id.
|
||||
:type chain_str: str
|
||||
:param token_symbol: ERC20 token symbol of whose balance is being queried
|
||||
:type token_symbol: str
|
||||
"""
|
||||
self.address = address
|
||||
self.chain_str = chain_str
|
||||
self.token_symbol = token_symbol
|
||||
|
||||
def get_balances(self, asynchronous: bool = False) -> Union[celery.Task, dict]:
|
||||
"""
|
||||
This function queries cic-eth for an account's balances, It provides a means to receive the balance either
|
||||
asynchronously or synchronously depending on the provided value for teh asynchronous parameter. It returns a
|
||||
dictionary containing network, outgoing and incoming balances.
|
||||
:param asynchronous: Boolean value checking whether to return balances asynchronously
|
||||
:type asynchronous: bool
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
if asynchronous:
|
||||
cic_eth_api = Api(
|
||||
chain_str=self.chain_str,
|
||||
callback_queue='cic-ussd',
|
||||
callback_task='cic_ussd.tasks.callback_handler.process_balances_callback',
|
||||
callback_param=''
|
||||
)
|
||||
cic_eth_api.balance(address=self.address, token_symbol=self.token_symbol)
|
||||
else:
|
||||
cic_eth_api = Api(chain_str=self.chain_str)
|
||||
balance_request_task = cic_eth_api.balance(
|
||||
address=self.address,
|
||||
token_symbol=self.token_symbol)
|
||||
return balance_request_task.get()[0]
|
||||
|
||||
|
||||
def compute_operational_balance(balances: dict) -> float:
|
||||
"""This function calculates the right balance given incoming and outgoing
|
||||
:param balances:
|
||||
:type balances:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
incoming_balance = balances.get('balance_incoming')
|
||||
outgoing_balance = balances.get('balance_outgoing')
|
||||
network_balance = balances.get('balance_network')
|
||||
|
||||
operational_balance = (network_balance + incoming_balance) - outgoing_balance
|
||||
return from_wei(value=operational_balance)
|
||||
|
||||
|
||||
def get_cached_operational_balance(blockchain_address: str):
|
||||
"""
|
||||
:param blockchain_address:
|
||||
:type blockchain_address:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
key = create_cached_data_key(
|
||||
identifier=bytes.fromhex(blockchain_address[2:]),
|
||||
salt='cic.balances_data'
|
||||
)
|
||||
cached_balance = get_cached_data(key=key)
|
||||
if cached_balance:
|
||||
operational_balance = compute_operational_balance(balances=json.loads(cached_balance))
|
||||
return operational_balance
|
||||
else:
|
||||
raise CachedDataNotFoundError('Cached operational balance not found.')
|
||||
@@ -1,10 +0,0 @@
|
||||
# local imports
|
||||
|
||||
# third-party imports
|
||||
from chainlib.chain import ChainSpec
|
||||
|
||||
# local imports
|
||||
|
||||
|
||||
class Chain:
|
||||
spec: ChainSpec = None
|
||||
@@ -1,41 +0,0 @@
|
||||
# standard imports
|
||||
import decimal
|
||||
|
||||
# third-party imports
|
||||
|
||||
# local imports
|
||||
|
||||
|
||||
def truncate(value: float, decimals: int):
|
||||
"""This function truncates a value to a specified number of decimals places.
|
||||
:param value: The value to be truncated.
|
||||
:type value: float
|
||||
:param decimals: The number of decimals for the value to be truncated to
|
||||
:type decimals: int
|
||||
:return: The truncated value.
|
||||
:rtype: int
|
||||
"""
|
||||
decimal.getcontext().rounding = decimal.ROUND_DOWN
|
||||
contextualized_value = decimal.Decimal(value)
|
||||
return round(contextualized_value, decimals)
|
||||
|
||||
|
||||
def from_wei(value: int) -> float:
|
||||
"""This function converts values in Wei to a token in the cic network.
|
||||
:param value: Value in Wei
|
||||
:type value: int
|
||||
:return: SRF equivalent of value in Wei
|
||||
:rtype: float
|
||||
"""
|
||||
value = float(value) / 1e+6
|
||||
return truncate(value=value, decimals=2)
|
||||
|
||||
|
||||
def to_wei(value: int) -> int:
|
||||
"""This functions converts values from a token in the cic network to Wei.
|
||||
:param value: Value in SRF
|
||||
:type value: int
|
||||
:return: Wei equivalent of value in SRF
|
||||
:rtype: int
|
||||
"""
|
||||
return int(value * 1e+6)
|
||||
@@ -1,237 +1,213 @@
|
||||
{
|
||||
"ussd_menu": {
|
||||
"1": {
|
||||
"description": "Entry point for users to select their preferred language.",
|
||||
"display_key": "ussd.kenya.initial_language_selection",
|
||||
"name": "initial_language_selection",
|
||||
"parent": null
|
||||
},
|
||||
"2": {
|
||||
"description": "Entry point for users to enter a pin to secure their account.",
|
||||
"display_key": "ussd.kenya.initial_pin_entry",
|
||||
"name": "initial_pin_entry",
|
||||
"parent": null
|
||||
},
|
||||
"3": {
|
||||
"description": "Pin confirmation entry menu.",
|
||||
"display_key": "ussd.kenya.initial_pin_confirmation",
|
||||
"name": "initial_pin_confirmation",
|
||||
"parent": "initial_pin_entry"
|
||||
},
|
||||
"4": {
|
||||
"description": "The signup process has been initiated and the account is being created.",
|
||||
"description": "The self signup process has been initiated and the account is being created",
|
||||
"display_key": "ussd.kenya.account_creation_prompt",
|
||||
"name": "account_creation_prompt",
|
||||
"parent": null
|
||||
},
|
||||
"2": {
|
||||
"description": "Start menu. This is the entry point for users to select their preferred language",
|
||||
"display_key": "ussd.kenya.initial_language_selection",
|
||||
"name": "initial_language_selection",
|
||||
"parent": null
|
||||
},
|
||||
"3": {
|
||||
"description": "PIN setup entry menu",
|
||||
"display_key": "ussd.kenya.initial_pin_entry",
|
||||
"name": "initial_pin_entry",
|
||||
"parent": "initial_language_selection"
|
||||
},
|
||||
"4": {
|
||||
"description": "Confirm new PIN menu",
|
||||
"display_key": "ussd.kenya.initial_pin_confirmation",
|
||||
"name": "initial_pin_confirmation",
|
||||
"parent": "initial_pin_entry"
|
||||
},
|
||||
"5": {
|
||||
"description": "Entry point for activated users.",
|
||||
"description": "Start menu. This is the entry point for activated users",
|
||||
"display_key": "ussd.kenya.start",
|
||||
"name": "start",
|
||||
"parent": null
|
||||
},
|
||||
"6": {
|
||||
"description": "Given name entry menu.",
|
||||
"display_key": "ussd.kenya.enter_given_name",
|
||||
"name": "enter_given_name",
|
||||
"parent": "metadata_management"
|
||||
},
|
||||
"7": {
|
||||
"description": "Family name entry menu.",
|
||||
"display_key": "ussd.kenya.enter_family_name",
|
||||
"name": "enter_family_name",
|
||||
"parent": "metadata_management"
|
||||
},
|
||||
"8": {
|
||||
"description": "Gender entry menu.",
|
||||
"display_key": "ussd.kenya.enter_gender",
|
||||
"name": "enter_gender",
|
||||
"parent": "metadata_management"
|
||||
},
|
||||
"9": {
|
||||
"description": "Age entry menu.",
|
||||
"display_key": "ussd.kenya.enter_gender",
|
||||
"name": "enter_gender",
|
||||
"parent": "metadata_management"
|
||||
},
|
||||
"10": {
|
||||
"description": "Location entry menu.",
|
||||
"display_key": "ussd.kenya.enter_location",
|
||||
"name": "enter_location",
|
||||
"parent": "metadata_management"
|
||||
},
|
||||
"11": {
|
||||
"description": "Products entry menu.",
|
||||
"display_key": "ussd.kenya.enter_products",
|
||||
"name": "enter_products",
|
||||
"parent": "metadata_management"
|
||||
},
|
||||
"12": {
|
||||
"description": "Entry point for activated users.",
|
||||
"display_key": "ussd.kenya.start",
|
||||
"name": "start",
|
||||
"parent": null
|
||||
},
|
||||
"13": {
|
||||
"description": "Send Token recipient entry.",
|
||||
"description": "Send Token recipient entry",
|
||||
"display_key": "ussd.kenya.enter_transaction_recipient",
|
||||
"name": "enter_transaction_recipient",
|
||||
"parent": "start"
|
||||
},
|
||||
"14": {
|
||||
"description": "Send Token amount prompt menu.",
|
||||
"7": {
|
||||
"description": "Send Token amount prompt menu",
|
||||
"display_key": "ussd.kenya.enter_transaction_amount",
|
||||
"name": "enter_transaction_amount",
|
||||
"parent": "start"
|
||||
},
|
||||
"15": {
|
||||
"description": "Pin entry for authorization to send token.",
|
||||
"8": {
|
||||
"description": "PIN entry for authorization to send token",
|
||||
"display_key": "ussd.kenya.transaction_pin_authorization",
|
||||
"name": "transaction_pin_authorization",
|
||||
"parent": "start"
|
||||
},
|
||||
"16": {
|
||||
"description": "Manage account menu.",
|
||||
"display_key": "ussd.kenya.account_management",
|
||||
"name": "account_management",
|
||||
"9": {
|
||||
"description": "Terminal of a menu flow where an SMS is expected after.",
|
||||
"display_key": "ussd.kenya.complete",
|
||||
"name": "complete",
|
||||
"parent": null
|
||||
},
|
||||
"10": {
|
||||
"description": "Help menu",
|
||||
"display_key": "ussd.kenya.help",
|
||||
"name": "help",
|
||||
"parent": "start"
|
||||
},
|
||||
"17": {
|
||||
"description": "Manage metadata menu.",
|
||||
"display_key": "ussd.kenya.metadata_management",
|
||||
"name": "metadata_management",
|
||||
"11": {
|
||||
"description": "Manage account menu",
|
||||
"display_key": "ussd.kenya.profile_management",
|
||||
"name": "profile_management",
|
||||
"parent": "start"
|
||||
},
|
||||
"18": {
|
||||
"description": "Manage user's preferred language menu.",
|
||||
"12": {
|
||||
"description": "Manage business directory info",
|
||||
"display_key": "ussd.kenya.select_preferred_language",
|
||||
"name": "select_preferred_language",
|
||||
"parent": "account_management"
|
||||
},
|
||||
"19": {
|
||||
"description": "Retrieve mini-statement menu.",
|
||||
"13": {
|
||||
"description": "About business directory info",
|
||||
"display_key": "ussd.kenya.mini_statement_pin_authorization",
|
||||
"name": "mini_statement_pin_authorization",
|
||||
"parent": "account_management"
|
||||
},
|
||||
"20": {
|
||||
"description": "Manage user's pin menu.",
|
||||
"14": {
|
||||
"description": "Change business directory info",
|
||||
"display_key": "ussd.kenya.enter_current_pin",
|
||||
"name": "enter_current_pin",
|
||||
"parent": "account_management"
|
||||
},
|
||||
"21": {
|
||||
"description": "New pin entry menu.",
|
||||
"15": {
|
||||
"description": "New PIN entry menu",
|
||||
"display_key": "ussd.kenya.enter_new_pin",
|
||||
"name": "enter_new_pin",
|
||||
"parent": "account_management"
|
||||
},
|
||||
"16": {
|
||||
"description": "First name entry menu",
|
||||
"display_key": "ussd.kenya.enter_first_name",
|
||||
"name": "enter_first_name",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"17": {
|
||||
"description": "Last name entry menu",
|
||||
"display_key": "ussd.kenya.enter_last_name",
|
||||
"name": "enter_last_name",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"18": {
|
||||
"description": "Gender entry menu",
|
||||
"display_key": "ussd.kenya.enter_gender",
|
||||
"name": "enter_gender",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"19": {
|
||||
"description": "Location entry menu",
|
||||
"display_key": "ussd.kenya.enter_location",
|
||||
"name": "enter_location",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"20": {
|
||||
"description": "Business profile entry menu",
|
||||
"display_key": "ussd.kenya.enter_business_profile",
|
||||
"name": "enter_business_profile",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"21": {
|
||||
"description": "Menu to display a user's entire profile",
|
||||
"display_key": "ussd.kenya.display_user_profile_data",
|
||||
"name": "display_user_profile_data",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"22": {
|
||||
"description": "Pin entry menu.",
|
||||
"display_key": "ussd.kenya.standard_pin_authorization",
|
||||
"name": "standard_pin_authorization",
|
||||
"parent": "start"
|
||||
"description": "Pin authorization to change name",
|
||||
"display_key": "ussd.kenya.name_management_pin_authorization",
|
||||
"name": "name_management_pin_authorization",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"23": {
|
||||
"description": "Exit menu.",
|
||||
"description": "Pin authorization to change gender",
|
||||
"display_key": "ussd.kenya.gender_management_pin_authorization",
|
||||
"name": "gender_management_pin_authorization",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"24": {
|
||||
"description": "Pin authorization to change location",
|
||||
"display_key": "ussd.kenya.location_management_pin_authorization",
|
||||
"name": "location_management_pin_authorization",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"26": {
|
||||
"description": "Pin authorization to display user's profile",
|
||||
"display_key": "ussd.kenya.view_profile_pin_authorization",
|
||||
"name": "view_profile_pin_authorization",
|
||||
"parent": "profile_management"
|
||||
},
|
||||
"27": {
|
||||
"description": "Exit menu",
|
||||
"display_key": "ussd.kenya.exit",
|
||||
"name": "exit",
|
||||
"parent": null
|
||||
},
|
||||
"24": {
|
||||
"description": "Invalid menu option.",
|
||||
"28": {
|
||||
"description": "Invalid menu option",
|
||||
"display_key": "ussd.kenya.exit_invalid_menu_option",
|
||||
"name": "exit_invalid_menu_option",
|
||||
"parent": null
|
||||
},
|
||||
"25": {
|
||||
"description": "Pin policy violation.",
|
||||
"29": {
|
||||
"description": "PIN policy violation",
|
||||
"display_key": "ussd.kenya.exit_invalid_pin",
|
||||
"name": "exit_invalid_pin",
|
||||
"parent": null
|
||||
},
|
||||
"26": {
|
||||
"description": "Pin mismatch. New pin and the new pin confirmation do not match",
|
||||
"30": {
|
||||
"description": "PIN mismatch. New PIN and the new PIN confirmation do not match",
|
||||
"display_key": "ussd.kenya.exit_pin_mismatch",
|
||||
"name": "exit_pin_mismatch",
|
||||
"parent": null
|
||||
},
|
||||
"27": {
|
||||
"description": "Ussd pin blocked Menu",
|
||||
"31": {
|
||||
"description": "Ussd PIN Blocked Menu",
|
||||
"display_key": "ussd.kenya.exit_pin_blocked",
|
||||
"name": "exit_pin_blocked",
|
||||
"parent": null
|
||||
},
|
||||
"28": {
|
||||
"description": "Key params missing in request.",
|
||||
"32": {
|
||||
"description": "Key params missing in request",
|
||||
"display_key": "ussd.kenya.exit_invalid_request",
|
||||
"name": "exit_invalid_request",
|
||||
"parent": null
|
||||
},
|
||||
"29": {
|
||||
"description": "The user did not select a choice.",
|
||||
"33": {
|
||||
"description": "The user did not select a choice",
|
||||
"display_key": "ussd.kenya.exit_invalid_input",
|
||||
"name": "exit_invalid_input",
|
||||
"parent": null
|
||||
},
|
||||
"30": {
|
||||
"description": "Exit following unsuccessful transaction due to insufficient account balance.",
|
||||
"display_key": "ussd.kenya.exit_insufficient_balance",
|
||||
"name": "exit_insufficient_balance",
|
||||
"parent": null
|
||||
},
|
||||
"31": {
|
||||
"34": {
|
||||
"description": "Exit following a successful transaction.",
|
||||
"display_key": "ussd.kenya.exit_successful_transaction",
|
||||
"name": "exit_successful_transaction",
|
||||
"parent": null
|
||||
},
|
||||
"32": {
|
||||
"description": "End of a menu flow.",
|
||||
"display_key": "ussd.kenya.complete",
|
||||
"name": "complete",
|
||||
"parent": null
|
||||
},
|
||||
"33": {
|
||||
"description": "Pin entry menu to view account balances.",
|
||||
"display_key": "ussd.kenya.account_balances_pin_authorization",
|
||||
"name": "account_balances_pin_authorization",
|
||||
"parent": "account_management"
|
||||
},
|
||||
"34": {
|
||||
"description": "Pin entry menu to view account statement.",
|
||||
"display_key": "ussd.kenya.account_statement_pin_authorization",
|
||||
"name": "account_statement_pin_authorization",
|
||||
"parent": "account_management"
|
||||
},
|
||||
"35": {
|
||||
"description": "Menu to display account balances.",
|
||||
"display_key": "ussd.kenya.account_balances",
|
||||
"name": "account_balances",
|
||||
"parent": "account_management"
|
||||
"description": "Manage account menu",
|
||||
"display_key": "ussd.kenya.account_management",
|
||||
"name": "account_management",
|
||||
"parent": "start"
|
||||
},
|
||||
"36": {
|
||||
"description": "Menu to display first set of transactions in statement.",
|
||||
"display_key": "ussd.kenya.first_transaction_set",
|
||||
"name": "first_transaction_set",
|
||||
"parent": null
|
||||
},
|
||||
"37": {
|
||||
"description": "Menu to display middle set of transactions in statement.",
|
||||
"display_key": "ussd.kenya.middle_transaction_set",
|
||||
"name": "middle_transaction_set",
|
||||
"parent": null
|
||||
},
|
||||
"38": {
|
||||
"description": "Menu to display last set of transactions in statement.",
|
||||
"display_key": "ussd.kenya.last_transaction_set",
|
||||
"name": "last_transaction_set",
|
||||
"parent": null
|
||||
},
|
||||
"39": {
|
||||
"description": "Menu to instruct users to call the office.",
|
||||
"display_key": "ussd.key.help",
|
||||
"name": "help",
|
||||
"description": "Exit following insufficient balance to perform a transaction.",
|
||||
"display_key": "ussd.kenya.exit_insufficient_balance",
|
||||
"name": "exit_insufficient_balance",
|
||||
"parent": null
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,17 +17,3 @@ class ActionDataNotFoundError(OSError):
|
||||
"""Raised when action data matching a specific task uuid is not found in the redis cache"""
|
||||
pass
|
||||
|
||||
|
||||
class UserMetadataNotFoundError(OSError):
|
||||
"""Raised when metadata is expected but not available in cache."""
|
||||
pass
|
||||
|
||||
|
||||
class UnsupportedMethodError(OSError):
|
||||
"""Raised when the method passed to the make request function is unsupported."""
|
||||
pass
|
||||
|
||||
|
||||
class CachedDataNotFoundError(OSError):
|
||||
"""Raised when the method passed to the make request function is unsupported."""
|
||||
pass
|
||||
|
||||
@@ -1,43 +0,0 @@
|
||||
# standard imports
|
||||
|
||||
# third-party imports
|
||||
import requests
|
||||
from chainlib.eth.address import to_checksum
|
||||
from hexathon import add_0x
|
||||
|
||||
# local imports
|
||||
from cic_ussd.error import UnsupportedMethodError
|
||||
|
||||
|
||||
def make_request(method: str, url: str, data: any = None, headers: dict = None):
|
||||
"""
|
||||
:param method:
|
||||
:type method:
|
||||
:param url:
|
||||
:type url:
|
||||
:param data:
|
||||
:type data:
|
||||
:param headers:
|
||||
:type headers:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
if method == 'GET':
|
||||
result = requests.get(url=url)
|
||||
elif method == 'POST':
|
||||
result = requests.post(url=url, data=data, headers=headers)
|
||||
elif method == 'PUT':
|
||||
result = requests.put(url=url, data=data, headers=headers)
|
||||
else:
|
||||
raise UnsupportedMethodError(f'Unsupported method: {method}')
|
||||
return result
|
||||
|
||||
|
||||
def blockchain_address_to_metadata_pointer(blockchain_address: str):
|
||||
"""
|
||||
:param blockchain_address:
|
||||
:type blockchain_address:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
return bytes.fromhex(blockchain_address[2:])
|
||||
@@ -1,63 +0,0 @@
|
||||
# standard imports
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
# third-party imports
|
||||
import gnupg
|
||||
|
||||
# local imports
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class Signer:
|
||||
"""
|
||||
:cvar gpg_path:
|
||||
:type gpg_path:
|
||||
:cvar gpg_passphrase:
|
||||
:type gpg_passphrase:
|
||||
:cvar key_file_path:
|
||||
:type key_file_path:
|
||||
|
||||
"""
|
||||
gpg_path: str = None
|
||||
gpg_passphrase: str = None
|
||||
key_file_path: str = None
|
||||
|
||||
def __init__(self):
|
||||
self.gpg = gnupg.GPG(gnupghome=self.gpg_path)
|
||||
|
||||
# parse key file data
|
||||
key_file = open(self.key_file_path, 'r')
|
||||
self.key_data = key_file.read()
|
||||
key_file.close()
|
||||
|
||||
def get_operational_key(self):
|
||||
"""
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
# import key data into keyring
|
||||
self.gpg.import_keys(key_data=self.key_data)
|
||||
gpg_keys = self.gpg.list_keys()
|
||||
key_algorithm = gpg_keys[0].get('algo')
|
||||
key_id = gpg_keys[0].get("keyid")
|
||||
logg.info(f'using signing key: {key_id}, algorithm: {key_algorithm}')
|
||||
return gpg_keys[0]
|
||||
|
||||
def sign_digest(self, data: bytes):
|
||||
"""
|
||||
:param data:
|
||||
:type data:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
data = json.loads(data)
|
||||
digest = data['digest']
|
||||
key_id = self.get_operational_key().get('keyid')
|
||||
signature = self.gpg.sign(digest, passphrase=self.gpg_passphrase, keyid=key_id)
|
||||
return str(signature)
|
||||
|
||||
|
||||
@@ -1,102 +0,0 @@
|
||||
# standard imports
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
# third-party imports
|
||||
import requests
|
||||
from cic_types.models.person import generate_metadata_pointer, Person
|
||||
|
||||
# local imports
|
||||
from cic_ussd.chain import Chain
|
||||
from cic_ussd.metadata import make_request
|
||||
from cic_ussd.metadata.signer import Signer
|
||||
from cic_ussd.redis import cache_data
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class UserMetadata:
|
||||
"""
|
||||
:cvar base_url:
|
||||
:type base_url:
|
||||
"""
|
||||
base_url = None
|
||||
|
||||
def __init__(self, identifier: bytes):
|
||||
"""
|
||||
:param identifier:
|
||||
:type identifier:
|
||||
"""
|
||||
self. headers = {
|
||||
'X-CIC-AUTOMERGE': 'server',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
self.identifier = identifier
|
||||
self.metadata_pointer = generate_metadata_pointer(
|
||||
identifier=self.identifier,
|
||||
cic_type='cic.person'
|
||||
)
|
||||
if self.base_url:
|
||||
self.url = os.path.join(self.base_url, self.metadata_pointer)
|
||||
|
||||
def create(self, data: dict):
|
||||
try:
|
||||
data = json.dumps(data).encode('utf-8')
|
||||
result = make_request(method='POST', url=self.url, data=data, headers=self.headers)
|
||||
metadata = result.content
|
||||
self.edit(data=metadata, engine='pgp')
|
||||
logg.info(f'Get sign material response status: {result.status_code}')
|
||||
result.raise_for_status()
|
||||
except requests.exceptions.HTTPError as error:
|
||||
raise RuntimeError(error)
|
||||
|
||||
def edit(self, data: bytes, engine: str):
|
||||
"""
|
||||
:param data:
|
||||
:type data:
|
||||
:param engine:
|
||||
:type engine:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
cic_meta_signer = Signer()
|
||||
signature = cic_meta_signer.sign_digest(data=data)
|
||||
algorithm = cic_meta_signer.get_operational_key().get('algo')
|
||||
formatted_data = {
|
||||
'm': data.decode('utf-8'),
|
||||
's': {
|
||||
'engine': engine,
|
||||
'algo': algorithm,
|
||||
'data': signature,
|
||||
'digest': json.loads(data).get('digest'),
|
||||
}
|
||||
}
|
||||
formatted_data = json.dumps(formatted_data).encode('utf-8')
|
||||
|
||||
try:
|
||||
result = make_request(method='PUT', url=self.url, data=formatted_data, headers=self.headers)
|
||||
logg.info(f'Signed content submission status: {result.status_code}.')
|
||||
result.raise_for_status()
|
||||
except requests.exceptions.HTTPError as error:
|
||||
raise RuntimeError(error)
|
||||
|
||||
def query(self):
|
||||
result = make_request(method='GET', url=self.url)
|
||||
status = result.status_code
|
||||
logg.info(f'Get latest data status: {status}')
|
||||
try:
|
||||
if status == 200:
|
||||
response_data = result.content
|
||||
data = json.loads(response_data.decode())
|
||||
|
||||
# validate data
|
||||
person = Person()
|
||||
deserialized_person = person.deserialize(metadata=json.loads(data))
|
||||
|
||||
cache_data(key=self.metadata_pointer, data=json.dumps(deserialized_person.serialize()))
|
||||
elif status == 404:
|
||||
logg.info('The data is not available and might need to be added.')
|
||||
result.raise_for_status()
|
||||
except requests.exceptions.HTTPError as error:
|
||||
raise RuntimeError(error)
|
||||
@@ -5,6 +5,7 @@ import logging
|
||||
# third party imports
|
||||
import celery
|
||||
import i18n
|
||||
import phonenumbers
|
||||
from cic_eth.api.api_task import Api
|
||||
from tinydb.table import Document
|
||||
from typing import Optional
|
||||
@@ -238,7 +239,7 @@ def persist_session_to_db_task(external_session_id: str, queue: str):
|
||||
:type queue: str
|
||||
"""
|
||||
s_persist_session_to_db = celery.signature(
|
||||
'cic_ussd.tasks.ussd_session.persist_session_to_db',
|
||||
'cic_ussd.tasks.ussd.persist_session_to_db',
|
||||
[external_session_id]
|
||||
)
|
||||
s_persist_session_to_db.apply_async(queue=queue)
|
||||
@@ -452,3 +453,37 @@ def save_to_in_memory_ussd_session_data(queue: str, session_data: dict, ussd_ses
|
||||
)
|
||||
persist_session_to_db_task(external_session_id=external_session_id, queue=queue)
|
||||
|
||||
|
||||
def process_phone_number(phone_number: str, region: str):
|
||||
"""This function parses any phone number for the provided region
|
||||
:param phone_number: A string with a phone number.
|
||||
:type phone_number: str
|
||||
:param region: Caller defined region
|
||||
:type region: str
|
||||
:return: The parsed phone number value based on the defined region
|
||||
:rtype: str
|
||||
"""
|
||||
if not isinstance(phone_number, str):
|
||||
try:
|
||||
phone_number = str(int(phone_number))
|
||||
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
phone_number_object = phonenumbers.parse(phone_number, region)
|
||||
parsed_phone_number = phonenumbers.format_number(phone_number_object, phonenumbers.PhoneNumberFormat.E164)
|
||||
|
||||
return parsed_phone_number
|
||||
|
||||
|
||||
def get_user_by_phone_number(phone_number: str) -> Optional[User]:
|
||||
"""This function queries the database for a user based on the provided phone number.
|
||||
:param phone_number: A valid phone number.
|
||||
:type phone_number: str
|
||||
:return: A user object matching a given phone number
|
||||
:rtype: User|None
|
||||
"""
|
||||
# consider adding region to user's metadata
|
||||
phone_number = process_phone_number(phone_number=phone_number, region='KE')
|
||||
user = User.session.query(User).filter_by(phone_number=phone_number).first()
|
||||
return user
|
||||
|
||||
@@ -1,43 +0,0 @@
|
||||
# standard imports
|
||||
from typing import Optional
|
||||
|
||||
# third-party imports
|
||||
import phonenumbers
|
||||
|
||||
# local imports
|
||||
from cic_ussd.db.models.user import User
|
||||
|
||||
|
||||
def process_phone_number(phone_number: str, region: str):
|
||||
"""This function parses any phone number for the provided region
|
||||
:param phone_number: A string with a phone number.
|
||||
:type phone_number: str
|
||||
:param region: Caller defined region
|
||||
:type region: str
|
||||
:return: The parsed phone number value based on the defined region
|
||||
:rtype: str
|
||||
"""
|
||||
if not isinstance(phone_number, str):
|
||||
try:
|
||||
phone_number = str(int(phone_number))
|
||||
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
phone_number_object = phonenumbers.parse(phone_number, region)
|
||||
parsed_phone_number = phonenumbers.format_number(phone_number_object, phonenumbers.PhoneNumberFormat.E164)
|
||||
|
||||
return parsed_phone_number
|
||||
|
||||
|
||||
def get_user_by_phone_number(phone_number: str) -> Optional[User]:
|
||||
"""This function queries the database for a user based on the provided phone number.
|
||||
:param phone_number: A valid phone number.
|
||||
:type phone_number: str
|
||||
:return: A user object matching a given phone number
|
||||
:rtype: User|None
|
||||
"""
|
||||
# consider adding region to user's metadata
|
||||
phone_number = process_phone_number(phone_number=phone_number, region='KE')
|
||||
user = User.session.query(User).filter_by(phone_number=phone_number).first()
|
||||
return user
|
||||
@@ -1,26 +1,17 @@
|
||||
# standard imports
|
||||
import logging
|
||||
import json
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
# third party imports
|
||||
import celery
|
||||
from cic_types.models.person import Person
|
||||
from tinydb.table import Document
|
||||
|
||||
# local imports
|
||||
from cic_ussd.account import define_account_tx_metadata, retrieve_account_statement
|
||||
from cic_ussd.balance import BalanceManager, compute_operational_balance, get_cached_operational_balance
|
||||
from cic_ussd.chain import Chain
|
||||
from cic_ussd.accounts import BalanceManager
|
||||
from cic_ussd.db.models.user import AccountStatus, User
|
||||
from cic_ussd.db.models.ussd_session import UssdSession
|
||||
from cic_ussd.menu.ussd_menu import UssdMenu
|
||||
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
||||
from cic_ussd.phone_number import get_user_by_phone_number
|
||||
from cic_ussd.redis import cache_data, create_cached_data_key, get_cached_data
|
||||
from cic_ussd.state_machine import UssdStateMachine
|
||||
from cic_ussd.conversions import to_wei, from_wei
|
||||
from cic_ussd.transactions import to_wei, from_wei
|
||||
from cic_ussd.translation import translation_for
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
@@ -66,17 +57,17 @@ def process_exit_insufficient_balance(display_key: str, user: User, ussd_session
|
||||
:rtype: str
|
||||
"""
|
||||
# get account balance
|
||||
operational_balance = get_cached_operational_balance(blockchain_address=user.blockchain_address)
|
||||
balance_manager = BalanceManager(address=user.blockchain_address,
|
||||
chain_str=UssdStateMachine.chain_str,
|
||||
token_symbol='SRF')
|
||||
balance = balance_manager.get_operational_balance()
|
||||
|
||||
# compile response data
|
||||
user_input = ussd_session.get('user_input').split('*')[-1]
|
||||
transaction_amount = to_wei(value=int(user_input))
|
||||
token_symbol = 'SRF'
|
||||
|
||||
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
|
||||
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
|
||||
|
||||
tx_recipient_information = define_account_tx_metadata(user=recipient)
|
||||
tx_recipient_information = recipient_phone_number
|
||||
|
||||
return translation_for(
|
||||
key=display_key,
|
||||
@@ -84,7 +75,7 @@ def process_exit_insufficient_balance(display_key: str, user: User, ussd_session
|
||||
amount=from_wei(transaction_amount),
|
||||
token_symbol=token_symbol,
|
||||
recipient_information=tx_recipient_information,
|
||||
token_balance=operational_balance
|
||||
token_balance=balance
|
||||
)
|
||||
|
||||
|
||||
@@ -102,9 +93,9 @@ def process_exit_successful_transaction(display_key: str, user: User, ussd_sessi
|
||||
transaction_amount = to_wei(int(ussd_session.get('session_data').get('transaction_amount')))
|
||||
token_symbol = 'SRF'
|
||||
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
|
||||
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
|
||||
tx_recipient_information = define_account_tx_metadata(user=recipient)
|
||||
tx_sender_information = define_account_tx_metadata(user=user)
|
||||
sender_phone_number = user.phone_number
|
||||
tx_recipient_information = recipient_phone_number
|
||||
tx_sender_information = sender_phone_number
|
||||
|
||||
return translation_for(
|
||||
key=display_key,
|
||||
@@ -131,10 +122,9 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
|
||||
"""
|
||||
# compile response data
|
||||
recipient_phone_number = ussd_session.get('session_data').get('recipient_phone_number')
|
||||
recipient = get_user_by_phone_number(phone_number=recipient_phone_number)
|
||||
tx_recipient_information = define_account_tx_metadata(user=recipient)
|
||||
tx_sender_information = define_account_tx_metadata(user=user)
|
||||
|
||||
tx_recipient_information = recipient_phone_number
|
||||
tx_sender_information = user.phone_number
|
||||
logg.debug('Requires integration with cic-meta to get user name.')
|
||||
token_symbol = 'SRF'
|
||||
user_input = ussd_session.get('user_input').split('*')[-1]
|
||||
transaction_amount = to_wei(value=int(user_input))
|
||||
@@ -149,122 +139,6 @@ def process_transaction_pin_authorization(user: User, display_key: str, ussd_ses
|
||||
)
|
||||
|
||||
|
||||
def process_account_balances(user: User, display_key: str, ussd_session: dict):
|
||||
"""
|
||||
:param user:
|
||||
:type user:
|
||||
:param display_key:
|
||||
:type display_key:
|
||||
:param ussd_session:
|
||||
:type ussd_session:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
# retrieve cached balance
|
||||
operational_balance = get_cached_operational_balance(blockchain_address=user.blockchain_address)
|
||||
|
||||
logg.debug('Requires call to retrieve tax and bonus amounts')
|
||||
tax = ''
|
||||
bonus = ''
|
||||
|
||||
return translation_for(
|
||||
key=display_key,
|
||||
preferred_language=user.preferred_language,
|
||||
operational_balance=operational_balance,
|
||||
tax=tax,
|
||||
bonus=bonus,
|
||||
token_symbol='SRF'
|
||||
)
|
||||
|
||||
|
||||
def format_transactions(transactions: list, preferred_language: str):
|
||||
|
||||
formatted_transactions = ''
|
||||
if len(transactions) > 0:
|
||||
for transaction in transactions:
|
||||
recipient_phone_number = transaction.get('recipient_phone_number')
|
||||
sender_phone_number = transaction.get('sender_phone_number')
|
||||
value = transaction.get('to_value')
|
||||
timestamp = transaction.get('timestamp')
|
||||
action_tag = transaction.get('action_tag')
|
||||
token_symbol = 'SRF'
|
||||
|
||||
if action_tag == 'SENT' or action_tag == 'ULITUMA':
|
||||
formatted_transactions += f'{action_tag} {value} {token_symbol} {recipient_phone_number} {timestamp}.\n'
|
||||
else:
|
||||
formatted_transactions += f'{action_tag} {value} {token_symbol} {sender_phone_number} {timestamp}. \n'
|
||||
return formatted_transactions
|
||||
else:
|
||||
if preferred_language == 'en':
|
||||
formatted_transactions = 'Empty'
|
||||
else:
|
||||
formatted_transactions = 'Hamna historia'
|
||||
return formatted_transactions
|
||||
|
||||
|
||||
def process_account_statement(user: User, display_key: str, ussd_session: dict):
|
||||
"""
|
||||
:param user:
|
||||
:type user:
|
||||
:param display_key:
|
||||
:type display_key:
|
||||
:param ussd_session:
|
||||
:type ussd_session:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
# retrieve cached statement
|
||||
identifier = blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address)
|
||||
key = create_cached_data_key(identifier=identifier, salt='cic.statement')
|
||||
transactions = get_cached_data(key=key)
|
||||
|
||||
first_transaction_set = []
|
||||
middle_transaction_set = []
|
||||
last_transaction_set = []
|
||||
|
||||
transactions = json.loads(transactions)
|
||||
|
||||
if len(transactions) > 6:
|
||||
last_transaction_set += transactions[6:]
|
||||
middle_transaction_set += transactions[3:][:3]
|
||||
first_transaction_set += transactions[:3]
|
||||
# there are probably much cleaner and operational inexpensive ways to do this so find them
|
||||
elif 4 < len(transactions) < 7:
|
||||
middle_transaction_set += transactions[3:]
|
||||
first_transaction_set += transactions[:3]
|
||||
else:
|
||||
first_transaction_set += transactions[:3]
|
||||
|
||||
if display_key == 'ussd.kenya.first_transaction_set':
|
||||
return translation_for(
|
||||
key=display_key,
|
||||
preferred_language=user.preferred_language,
|
||||
first_transaction_set=format_transactions(
|
||||
transactions=first_transaction_set,
|
||||
preferred_language=user.preferred_language
|
||||
)
|
||||
)
|
||||
elif display_key == 'ussd.kenya.middle_transaction_set':
|
||||
return translation_for(
|
||||
key=display_key,
|
||||
preferred_language=user.preferred_language,
|
||||
middle_transaction_set=format_transactions(
|
||||
transactions=middle_transaction_set,
|
||||
preferred_language=user.preferred_language
|
||||
)
|
||||
)
|
||||
|
||||
elif display_key == 'ussd.kenya.last_transaction_set':
|
||||
return translation_for(
|
||||
key=display_key,
|
||||
preferred_language=user.preferred_language,
|
||||
last_transaction_set=format_transactions(
|
||||
transactions=last_transaction_set,
|
||||
preferred_language=user.preferred_language
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def process_start_menu(display_key: str, user: User):
|
||||
"""This function gets data on an account's balance and token in order to append it to the start of the start menu's
|
||||
title. It passes said arguments to the translation function and returns the appropriate corresponding text from the
|
||||
@@ -276,41 +150,16 @@ def process_start_menu(display_key: str, user: User):
|
||||
:return: Corresponding translation text response
|
||||
:rtype: str
|
||||
"""
|
||||
chain_str = Chain.spec.__str__()
|
||||
blockchain_address = user.blockchain_address
|
||||
balance_manager = BalanceManager(address=blockchain_address,
|
||||
chain_str=chain_str,
|
||||
balance_manager = BalanceManager(address=user.blockchain_address,
|
||||
chain_str=UssdStateMachine.chain_str,
|
||||
token_symbol='SRF')
|
||||
|
||||
# get balances synchronously for display on start menu
|
||||
balances_data = balance_manager.get_balances()
|
||||
|
||||
key = create_cached_data_key(
|
||||
identifier=bytes.fromhex(blockchain_address[2:]),
|
||||
salt='cic.balances_data'
|
||||
)
|
||||
cache_data(key=key, data=json.dumps(balances_data))
|
||||
|
||||
# get operational balance
|
||||
operational_balance = compute_operational_balance(balances=balances_data)
|
||||
|
||||
# retrieve and cache account's metadata
|
||||
s_query_user_metadata = celery.signature(
|
||||
'cic_ussd.tasks.metadata.query_user_metadata',
|
||||
[blockchain_address]
|
||||
)
|
||||
s_query_user_metadata.apply_async(queue='cic-ussd')
|
||||
|
||||
# retrieve and cache account's statement
|
||||
retrieve_account_statement(blockchain_address=blockchain_address)
|
||||
|
||||
# TODO [Philip]: figure out how to get token symbol from a metadata layer of sorts.
|
||||
balance = balance_manager.get_operational_balance()
|
||||
token_symbol = 'SRF'
|
||||
|
||||
logg.debug("Requires integration to determine user's balance and token.")
|
||||
return translation_for(
|
||||
key=display_key,
|
||||
preferred_language=user.preferred_language,
|
||||
account_balance=operational_balance,
|
||||
account_balance=balance,
|
||||
account_token_name=token_symbol
|
||||
)
|
||||
|
||||
@@ -392,11 +241,5 @@ def custom_display_text(
|
||||
return process_exit_successful_transaction(display_key=display_key, user=user, ussd_session=ussd_session)
|
||||
elif menu_name == 'start':
|
||||
return process_start_menu(display_key=display_key, user=user)
|
||||
elif 'pin_authorization' in menu_name:
|
||||
return process_pin_authorization(display_key=display_key, user=user)
|
||||
elif menu_name == 'account_balances':
|
||||
return process_account_balances(display_key=display_key, user=user, ussd_session=ussd_session)
|
||||
elif 'transaction_set' in menu_name:
|
||||
return process_account_statement(display_key=display_key, user=user, ussd_session=ussd_session)
|
||||
else:
|
||||
return translation_for(key=display_key, preferred_language=user.preferred_language)
|
||||
|
||||
@@ -1,52 +1,6 @@
|
||||
# standard imports
|
||||
import hashlib
|
||||
import logging
|
||||
|
||||
# third-party imports
|
||||
from redis import Redis
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
class InMemoryStore:
|
||||
cache: Redis = None
|
||||
|
||||
|
||||
def cache_data(key: str, data: str):
|
||||
"""
|
||||
:param key:
|
||||
:type key:
|
||||
:param data:
|
||||
:type data:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
cache = InMemoryStore.cache
|
||||
cache.set(name=key, value=data)
|
||||
cache.persist(name=key)
|
||||
|
||||
|
||||
def get_cached_data(key: str):
|
||||
"""
|
||||
:param key:
|
||||
:type key:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
cache = InMemoryStore.cache
|
||||
return cache.get(name=key)
|
||||
|
||||
|
||||
def create_cached_data_key(identifier: bytes, salt: str):
|
||||
"""
|
||||
:param identifier:
|
||||
:type identifier:
|
||||
:param salt:
|
||||
:type salt:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
hash_object = hashlib.new("sha256")
|
||||
hash_object.update(identifier)
|
||||
hash_object.update(salt.encode(encoding="utf-8"))
|
||||
return hash_object.digest().hex()
|
||||
|
||||
@@ -12,18 +12,14 @@ import redis
|
||||
|
||||
# third-party imports
|
||||
from confini import Config
|
||||
from chainlib.chain import ChainSpec
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
# local imports
|
||||
from cic_ussd.chain import Chain
|
||||
from cic_ussd.db import dsn_from_config
|
||||
from cic_ussd.db.models.base import SessionBase
|
||||
from cic_ussd.encoder import PasswordEncoder
|
||||
from cic_ussd.files.local_files import create_local_file_data_stores, json_file_parser
|
||||
from cic_ussd.menu.ussd_menu import UssdMenu
|
||||
from cic_ussd.metadata.signer import Signer
|
||||
from cic_ussd.metadata.user import UserMetadata
|
||||
from cic_ussd.operations import (define_response_with_content,
|
||||
process_menu_interaction_requests,
|
||||
define_multilingual_responses)
|
||||
@@ -63,7 +59,6 @@ config.censor('PASSWORD', 'DATABASE')
|
||||
# define log levels
|
||||
if args.vv:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
|
||||
elif args.v:
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
@@ -97,14 +92,6 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
|
||||
decode_responses=True)
|
||||
InMemoryUssdSession.redis_cache = InMemoryStore.cache
|
||||
|
||||
# define metadata URL
|
||||
UserMetadata.base_url = config.get('CIC_META_URL')
|
||||
|
||||
# define signer values
|
||||
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
|
||||
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
|
||||
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
|
||||
|
||||
# initialize celery app
|
||||
celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
|
||||
|
||||
@@ -112,13 +99,7 @@ celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY
|
||||
states = json_file_parser(filepath=config.get('STATEMACHINE_STATES'))
|
||||
transitions = json_file_parser(filepath=config.get('STATEMACHINE_TRANSITIONS'))
|
||||
|
||||
chain_spec = ChainSpec(
|
||||
common_name=config.get('CIC_COMMON_NAME'),
|
||||
engine=config.get('CIC_ENGINE'),
|
||||
network_id=config.get('CIC_NETWORK_ID')
|
||||
)
|
||||
|
||||
Chain.spec = chain_spec
|
||||
UssdStateMachine.chain_str = config.get('CIC_CHAIN_SPEC')
|
||||
UssdStateMachine.states = states
|
||||
UssdStateMachine.transitions = transitions
|
||||
|
||||
@@ -171,8 +152,7 @@ def application(env, start_response):
|
||||
return []
|
||||
|
||||
# handle menu interaction requests
|
||||
chain_str = chain_spec.__str__()
|
||||
response = process_menu_interaction_requests(chain_str=chain_str,
|
||||
response = process_menu_interaction_requests(chain_str=config.get('CIC_CHAIN_SPEC'),
|
||||
external_session_id=external_session_id,
|
||||
phone_number=phone_number,
|
||||
queue=args.q,
|
||||
|
||||
@@ -12,8 +12,6 @@ from confini import Config
|
||||
# local imports
|
||||
from cic_ussd.db import dsn_from_config
|
||||
from cic_ussd.db.models.base import SessionBase
|
||||
from cic_ussd.metadata.signer import Signer
|
||||
from cic_ussd.metadata.user import UserMetadata
|
||||
from cic_ussd.redis import InMemoryStore
|
||||
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
|
||||
|
||||
@@ -61,14 +59,6 @@ InMemoryStore.cache = redis.StrictRedis(host=config.get('REDIS_HOSTNAME'),
|
||||
decode_responses=True)
|
||||
InMemoryUssdSession.redis_cache = InMemoryStore.cache
|
||||
|
||||
# define metadata URL
|
||||
UserMetadata.base_url = config.get('CIC_META_URL')
|
||||
|
||||
# define signer values
|
||||
Signer.gpg_path = config.get('PGP_EXPORT_DIR')
|
||||
Signer.gpg_passphrase = config.get('PGP_PASSPHRASE')
|
||||
Signer.key_file_path = f"{config.get('PGP_KEYS_PATH')}{config.get('PGP_PRIVATE_KEYS')}"
|
||||
|
||||
# set up celery
|
||||
current_app = celery.Celery(__name__)
|
||||
|
||||
|
||||
@@ -1,18 +1,14 @@
|
||||
# standard imports
|
||||
import json
|
||||
import logging
|
||||
from typing import Tuple
|
||||
|
||||
# third party imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_ussd.balance import BalanceManager, compute_operational_balance
|
||||
from cic_ussd.chain import Chain
|
||||
from cic_ussd.accounts import BalanceManager
|
||||
from cic_ussd.db.models.user import AccountStatus, User
|
||||
from cic_ussd.operations import save_to_in_memory_ussd_session_data
|
||||
from cic_ussd.phone_number import get_user_by_phone_number
|
||||
from cic_ussd.redis import create_cached_data_key, get_cached_data
|
||||
from cic_ussd.operations import get_user_by_phone_number, save_to_in_memory_ussd_session_data
|
||||
from cic_ussd.state_machine.state_machine import UssdStateMachine
|
||||
from cic_ussd.transactions import OutgoingTransactionProcessor
|
||||
|
||||
|
||||
@@ -31,7 +27,22 @@ def is_valid_recipient(state_machine_data: Tuple[str, dict, User]) -> bool:
|
||||
recipient = get_user_by_phone_number(phone_number=user_input)
|
||||
is_not_initiator = user_input != user.phone_number
|
||||
has_active_account_status = user.get_account_status() == AccountStatus.ACTIVE.name
|
||||
return is_not_initiator and has_active_account_status and recipient is not None
|
||||
logg.debug('This section requires implementation of checks for user roles and authorization status of an account.')
|
||||
return is_not_initiator and has_active_account_status
|
||||
|
||||
|
||||
def is_valid_token_agent(state_machine_data: Tuple[str, dict, User]) -> bool:
|
||||
"""This function checks that a user exists, is not the initiator of the transaction, has an active account status
|
||||
and is authorized to perform exchange transactions.
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
:type state_machine_data: tuple
|
||||
:return: A user's validity
|
||||
:rtype: bool
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
# is_token_agent = AccountRole.TOKEN_AGENT.value in user.get_user_roles()
|
||||
logg.debug('This section requires implementation of user roles and authorization to facilitate exchanges.')
|
||||
return is_valid_recipient(state_machine_data=state_machine_data)
|
||||
|
||||
|
||||
def is_valid_transaction_amount(state_machine_data: Tuple[str, dict, User]) -> bool:
|
||||
@@ -59,17 +70,10 @@ def has_sufficient_balance(state_machine_data: Tuple[str, dict, User]) -> bool:
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
balance_manager = BalanceManager(address=user.blockchain_address,
|
||||
chain_str=Chain.spec.__str__(),
|
||||
chain_str=UssdStateMachine.chain_str,
|
||||
token_symbol='SRF')
|
||||
# get cached balance
|
||||
key = create_cached_data_key(
|
||||
identifier=bytes.fromhex(user.blockchain_address[2:]),
|
||||
salt='cic.balances_data'
|
||||
)
|
||||
cached_balance = get_cached_data(key=key)
|
||||
operational_balance = compute_operational_balance(balances=json.loads(cached_balance))
|
||||
|
||||
return int(user_input) <= operational_balance
|
||||
balance = balance_manager.get_operational_balance()
|
||||
return int(user_input) <= balance
|
||||
|
||||
|
||||
def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, User]):
|
||||
@@ -84,25 +88,6 @@ def save_recipient_phone_to_session_data(state_machine_data: Tuple[str, dict, Us
|
||||
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
|
||||
|
||||
|
||||
def retrieve_recipient_metadata(state_machine_data: Tuple[str, dict, User]):
|
||||
"""
|
||||
:param state_machine_data:
|
||||
:type state_machine_data:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
|
||||
recipient = get_user_by_phone_number(phone_number=user_input)
|
||||
blockchain_address = recipient.blockchain_address
|
||||
# retrieve and cache account's metadata
|
||||
s_query_user_metadata = celery.signature(
|
||||
'cic_ussd.tasks.metadata.query_user_metadata',
|
||||
[blockchain_address]
|
||||
)
|
||||
s_query_user_metadata.apply_async(queue='cic-ussd')
|
||||
|
||||
|
||||
def save_transaction_amount_to_session_data(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function saves the phone number corresponding the intended recipients blockchain account.
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
@@ -128,8 +113,7 @@ def process_transaction_request(state_machine_data: Tuple[str, dict, User]):
|
||||
to_address = recipient.blockchain_address
|
||||
from_address = user.blockchain_address
|
||||
amount = int(ussd_session.get('session_data').get('transaction_amount'))
|
||||
chain_str = Chain.spec.__str__()
|
||||
outgoing_tx_processor = OutgoingTransactionProcessor(chain_str=chain_str,
|
||||
outgoing_tx_processor = OutgoingTransactionProcessor(chain_str=UssdStateMachine.chain_str,
|
||||
from_address=from_address,
|
||||
to_address=to_address)
|
||||
outgoing_tx_processor.process_outgoing_transfer_transaction(amount=amount)
|
||||
|
||||
@@ -1,20 +1,10 @@
|
||||
# standard imports
|
||||
import json
|
||||
import logging
|
||||
from typing import Tuple
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
from cic_types.models.person import Person, generate_metadata_pointer
|
||||
from cic_types.models.person import generate_vcard_from_contact_data, manage_identity_data
|
||||
|
||||
# local imports
|
||||
from cic_ussd.chain import Chain
|
||||
from cic_ussd.db.models.user import User
|
||||
from cic_ussd.error import UserMetadataNotFoundError
|
||||
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
||||
from cic_ussd.operations import save_to_in_memory_ussd_session_data
|
||||
from cic_ussd.redis import get_cached_data
|
||||
|
||||
logg = logging.getLogger(__file__)
|
||||
|
||||
@@ -52,29 +42,7 @@ def update_account_status_to_active(state_machine_data: Tuple[str, dict, User]):
|
||||
User.session.commit()
|
||||
|
||||
|
||||
def process_gender_user_input(user: User, user_input: str):
|
||||
"""
|
||||
:param user:
|
||||
:type user:
|
||||
:param user_input:
|
||||
:type user_input:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
if user.preferred_language == 'en':
|
||||
if user_input == '1':
|
||||
gender = 'Male'
|
||||
else:
|
||||
gender = 'Female'
|
||||
else:
|
||||
if user_input == '1':
|
||||
gender = 'Mwanaume'
|
||||
else:
|
||||
gender = 'Mwanamke'
|
||||
return gender
|
||||
|
||||
|
||||
def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict, User]):
|
||||
def save_profile_attribute_to_session_data(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function saves first name data to the ussd session in the redis cache.
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
:type state_machine_data: tuple
|
||||
@@ -86,17 +54,16 @@ def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict,
|
||||
|
||||
# define session data key from current state
|
||||
key = ''
|
||||
if 'given_name' in current_state:
|
||||
key = 'given_name'
|
||||
elif 'family_name' in current_state:
|
||||
key = 'family_name'
|
||||
if 'first_name' in current_state:
|
||||
key = 'first_name'
|
||||
elif 'last_name' in current_state:
|
||||
key = 'last_name'
|
||||
elif 'gender' in current_state:
|
||||
key = 'gender'
|
||||
user_input = process_gender_user_input(user=user, user_input=user_input)
|
||||
elif 'location' in current_state:
|
||||
key = 'location'
|
||||
elif 'products' in current_state:
|
||||
key = 'products'
|
||||
elif 'business_profile' in current_state:
|
||||
key = 'business_profile'
|
||||
|
||||
# check if there is existing session data
|
||||
if ussd_session.get('session_data'):
|
||||
@@ -109,120 +76,14 @@ def save_metadata_attribute_to_session_data(state_machine_data: Tuple[str, dict,
|
||||
save_to_in_memory_ussd_session_data(queue='cic-ussd', session_data=session_data, ussd_session=ussd_session)
|
||||
|
||||
|
||||
def format_user_metadata(metadata: dict, user: User):
|
||||
"""
|
||||
:param metadata:
|
||||
:type metadata:
|
||||
:param user:
|
||||
:type user:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
gender = metadata.get('gender')
|
||||
given_name = metadata.get('given_name')
|
||||
family_name = metadata.get('family_name')
|
||||
location = {
|
||||
"area_name": metadata.get('location')
|
||||
}
|
||||
products = []
|
||||
if metadata.get('products'):
|
||||
products = metadata.get('products').split(',')
|
||||
phone_number = user.phone_number
|
||||
date_registered = int(user.created.replace().timestamp())
|
||||
blockchain_address = user.blockchain_address
|
||||
chain_spec = f'{Chain.spec.common_name()}:{Chain.spec.network_id()}'
|
||||
identities = manage_identity_data(
|
||||
blockchain_address=blockchain_address,
|
||||
blockchain_type=Chain.spec.engine(),
|
||||
chain_spec=chain_spec
|
||||
)
|
||||
return {
|
||||
"date_registered": date_registered,
|
||||
"gender": gender,
|
||||
"identities": identities,
|
||||
"location": location,
|
||||
"products": products,
|
||||
"vcard": generate_vcard_from_contact_data(
|
||||
family_name=family_name,
|
||||
given_name=given_name,
|
||||
tel=phone_number
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
def save_complete_user_metadata(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function persists elements of the user metadata stored in session data
|
||||
def persist_profile_data(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function persists elements of the user profile stored in session data
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
:type state_machine_data: tuple
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
|
||||
# get session data
|
||||
metadata = ussd_session.get('session_data')
|
||||
profile_data = ussd_session.get('session_data')
|
||||
logg.debug('This section requires implementation of user metadata.')
|
||||
|
||||
# format metadata appropriately
|
||||
user_metadata = format_user_metadata(metadata=metadata, user=user)
|
||||
|
||||
blockchain_address = user.blockchain_address
|
||||
s_create_user_metadata = celery.signature(
|
||||
'cic_ussd.tasks.metadata.create_user_metadata',
|
||||
[blockchain_address, user_metadata]
|
||||
)
|
||||
s_create_user_metadata.apply_async(queue='cic-ussd')
|
||||
|
||||
|
||||
def edit_user_metadata_attribute(state_machine_data: Tuple[str, dict, User]):
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
blockchain_address = user.blockchain_address
|
||||
key = generate_metadata_pointer(
|
||||
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
|
||||
cic_type='cic.person'
|
||||
)
|
||||
user_metadata = get_cached_data(key=key)
|
||||
|
||||
if not user_metadata:
|
||||
raise UserMetadataNotFoundError(f'Expected user metadata but found none in cache for key: {blockchain_address}')
|
||||
|
||||
given_name = ussd_session.get('session_data').get('given_name')
|
||||
family_name = ussd_session.get('session_data').get('family_name')
|
||||
gender = ussd_session.get('session_data').get('gender')
|
||||
location = ussd_session.get('session_data').get('location')
|
||||
products = ussd_session.get('session_data').get('products')
|
||||
|
||||
# validate user metadata
|
||||
person = Person()
|
||||
user_metadata = json.loads(user_metadata)
|
||||
deserialized_person = person.deserialize(metadata=user_metadata)
|
||||
|
||||
# edit specific metadata attribute
|
||||
if given_name:
|
||||
deserialized_person.given_name = given_name
|
||||
elif family_name:
|
||||
deserialized_person.family_name = family_name
|
||||
elif gender:
|
||||
deserialized_person.gender = gender
|
||||
elif location:
|
||||
# get existing location metadata:
|
||||
location_data = user_metadata.get('location')
|
||||
location_data['area_name'] = location
|
||||
deserialized_person.location = location_data
|
||||
elif products:
|
||||
deserialized_person.products = products
|
||||
|
||||
edited_metadata = deserialized_person.serialize()
|
||||
|
||||
s_edit_user_metadata = celery.signature(
|
||||
'cic_ussd.tasks.metadata.edit_user_metadata',
|
||||
[blockchain_address, edited_metadata, 'pgp']
|
||||
)
|
||||
s_edit_user_metadata.apply_async(queue='cic-ussd')
|
||||
|
||||
|
||||
def get_user_metadata(state_machine_data: Tuple[str, dict, User]):
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
blockchain_address = user.blockchain_address
|
||||
s_get_user_metadata = celery.signature(
|
||||
'cic_ussd.tasks.metadata.query_user_metadata',
|
||||
[blockchain_address]
|
||||
)
|
||||
s_get_user_metadata.apply_async(queue='cic-ussd')
|
||||
|
||||
@@ -3,30 +3,55 @@ import logging
|
||||
import re
|
||||
from typing import Tuple
|
||||
|
||||
# third-party imports
|
||||
from cic_types.models.person import generate_metadata_pointer
|
||||
|
||||
# local imports
|
||||
from cic_ussd.db.models.user import User
|
||||
from cic_ussd.metadata import blockchain_address_to_metadata_pointer
|
||||
from cic_ussd.redis import get_cached_data
|
||||
|
||||
logg = logging.getLogger()
|
||||
|
||||
|
||||
def has_cached_user_metadata(state_machine_data: Tuple[str, dict, User]):
|
||||
def has_complete_profile_data(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function checks whether the attributes of the user's metadata constituting a profile are filled out.
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
:type state_machine_data: str
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
# check for user metadata in cache
|
||||
key = generate_metadata_pointer(
|
||||
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
|
||||
cic_type='cic.person'
|
||||
)
|
||||
user_metadata = get_cached_data(key=key)
|
||||
return user_metadata is not None
|
||||
logg.debug('This section requires implementation of user metadata.')
|
||||
|
||||
|
||||
def has_empty_username_data(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function checks whether the aspects of the user's name metadata is filled out.
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
:type state_machine_data: str
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
logg.debug('This section requires implementation of user metadata.')
|
||||
|
||||
|
||||
def has_empty_gender_data(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function checks whether the aspects of the user's gender metadata is filled out.
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
:type state_machine_data: str
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
logg.debug('This section requires implementation of user metadata.')
|
||||
|
||||
|
||||
def has_empty_location_data(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function checks whether the aspects of the user's location metadata is filled out.
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
:type state_machine_data: str
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
logg.debug('This section requires implementation of user metadata.')
|
||||
|
||||
|
||||
def has_empty_business_profile_data(state_machine_data: Tuple[str, dict, User]):
|
||||
"""This function checks whether the aspects of the user's business profile metadata is filled out.
|
||||
:param state_machine_data: A tuple containing user input, a ussd session and user object.
|
||||
:type state_machine_data: str
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
logg.debug('This section requires implementation of user metadata.')
|
||||
|
||||
|
||||
def is_valid_name(state_machine_data: Tuple[str, dict, User]):
|
||||
@@ -41,18 +66,3 @@ def is_valid_name(state_machine_data: Tuple[str, dict, User]):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def is_valid_gender_selection(state_machine_data: Tuple[str, dict, User]):
|
||||
"""
|
||||
:param state_machine_data:
|
||||
:type state_machine_data:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
user_input, ussd_session, user = state_machine_data
|
||||
selection_matcher = "^[1-2]$"
|
||||
if re.match(selection_matcher, user_input):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@@ -14,11 +14,15 @@ class UssdStateMachine(Machine):
|
||||
menu as well as providing a means for navigating through these states based on different user inputs.
|
||||
It defines different helper functions that co-ordinate with the stakeholder components of the ussd menu: i.e the
|
||||
User, UssdSession, UssdMenu to facilitate user interaction with ussd menu.
|
||||
|
||||
:cvar chain_str: The chain name and network id.
|
||||
:type chain_str: str
|
||||
:cvar states: A list of pre-defined states.
|
||||
:type states: list
|
||||
:cvar transitions: A list of pre-defined transitions.
|
||||
:type transitions: list
|
||||
"""
|
||||
chain_str = None
|
||||
states = []
|
||||
transitions = []
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ import celery
|
||||
|
||||
celery_app = celery.current_app
|
||||
# export external celery task modules
|
||||
from .logger import *
|
||||
from .ussd_session import *
|
||||
from .callback_handler import *
|
||||
from .metadata import *
|
||||
from .foo import log_it_plz
|
||||
from .ussd import persist_session_to_db
|
||||
from .callback_handler import process_account_creation_callback
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
# standard imports
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
import sqlalchemy
|
||||
|
||||
# local imports
|
||||
|
||||
|
||||
class CriticalTask(celery.Task):
|
||||
retry_jitter = True
|
||||
retry_backoff = True
|
||||
retry_backoff_max = 8
|
||||
|
||||
|
||||
class CriticalSQLAlchemyTask(CriticalTask):
|
||||
autoretry_for = (
|
||||
sqlalchemy.exc.DatabaseError,
|
||||
sqlalchemy.exc.TimeoutError,
|
||||
)
|
||||
@@ -1,26 +1,23 @@
|
||||
# standard imports
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import timedelta
|
||||
|
||||
# third-party imports
|
||||
import celery
|
||||
|
||||
# local imports
|
||||
from cic_ussd.conversions import from_wei
|
||||
from cic_ussd.db.models.base import SessionBase
|
||||
from cic_ussd.db.models.user import User
|
||||
from cic_ussd.account import define_account_tx_metadata
|
||||
from cic_ussd.error import ActionDataNotFoundError
|
||||
from cic_ussd.redis import InMemoryStore, cache_data, create_cached_data_key
|
||||
from cic_ussd.tasks.base import CriticalSQLAlchemyTask
|
||||
from cic_ussd.redis import InMemoryStore
|
||||
from cic_ussd.transactions import IncomingTransactionProcessor
|
||||
|
||||
logg = logging.getLogger(__file__)
|
||||
celery_app = celery.current_app
|
||||
|
||||
|
||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
|
||||
@celery_app.task(bind=True)
|
||||
def process_account_creation_callback(self, result: str, url: str, status_code: int):
|
||||
"""This function defines a task that creates a user and
|
||||
:param result: The blockchain address for the created account
|
||||
@@ -52,14 +49,14 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
|
||||
user = User(blockchain_address=result, phone_number=phone_number)
|
||||
session.add(user)
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
# expire cache
|
||||
cache.expire(task_id, timedelta(seconds=180))
|
||||
cache.expire(task_id, timedelta(seconds=30))
|
||||
session.close()
|
||||
|
||||
else:
|
||||
cache.expire(task_id, timedelta(seconds=30))
|
||||
session.close()
|
||||
cache.expire(task_id, timedelta(seconds=180))
|
||||
|
||||
else:
|
||||
session.close()
|
||||
@@ -68,8 +65,9 @@ def process_account_creation_callback(self, result: str, url: str, status_code:
|
||||
|
||||
@celery_app.task
|
||||
def process_incoming_transfer_callback(result: dict, param: str, status_code: int):
|
||||
logg.debug(f'PARAM: {param}, RESULT: {result}, STATUS_CODE: {status_code}')
|
||||
session = SessionBase.create_session()
|
||||
if status_code == 0:
|
||||
if result and status_code == 0:
|
||||
|
||||
# collect result data
|
||||
recipient_blockchain_address = result.get('recipient')
|
||||
@@ -95,123 +93,22 @@ def process_incoming_transfer_callback(result: dict, param: str, status_code: in
|
||||
value=value)
|
||||
|
||||
if param == 'tokengift':
|
||||
incoming_tx_processor.process_token_gift_incoming_transactions()
|
||||
logg.debug('Name information would require integration with cic meta.')
|
||||
incoming_tx_processor.process_token_gift_incoming_transactions(first_name="")
|
||||
elif param == 'transfer':
|
||||
logg.debug('Name information would require integration with cic meta.')
|
||||
if sender_user:
|
||||
sender_information = define_account_tx_metadata(user=sender_user)
|
||||
incoming_tx_processor.process_transfer_incoming_transaction(
|
||||
sender_information=sender_information,
|
||||
recipient_blockchain_address=recipient_blockchain_address
|
||||
)
|
||||
sender_information = f'{sender_user.phone_number}, {""}, {""}'
|
||||
incoming_tx_processor.process_transfer_incoming_transaction(sender_information=sender_information)
|
||||
else:
|
||||
logg.warning(
|
||||
f'Tx with sender: {sender_blockchain_address} was received but has no matching user in the system.'
|
||||
)
|
||||
incoming_tx_processor.process_transfer_incoming_transaction(
|
||||
sender_information='GRASSROOTS ECONOMICS',
|
||||
recipient_blockchain_address=recipient_blockchain_address
|
||||
)
|
||||
sender_information=sender_blockchain_address)
|
||||
else:
|
||||
session.close()
|
||||
raise ValueError(f'Unexpected transaction param: {param}.')
|
||||
else:
|
||||
session.close()
|
||||
raise ValueError(f'Unexpected status code: {status_code}.')
|
||||
|
||||
|
||||
@celery_app.task
|
||||
def process_balances_callback(result: list, param: str, status_code: int):
|
||||
if status_code == 0:
|
||||
balances_data = result[0]
|
||||
blockchain_address = balances_data.get('address')
|
||||
key = create_cached_data_key(
|
||||
identifier=bytes.fromhex(blockchain_address[2:]),
|
||||
salt='cic.balances_data'
|
||||
)
|
||||
cache_data(key=key, data=json.dumps(balances_data))
|
||||
else:
|
||||
raise ValueError(f'Unexpected status code: {status_code}.')
|
||||
|
||||
|
||||
# TODO: clean up this handler
|
||||
def define_transaction_action_tag(
|
||||
preferred_language: str,
|
||||
sender_blockchain_address: str,
|
||||
param: str):
|
||||
# check if out going ot incoming transaction
|
||||
if sender_blockchain_address == param:
|
||||
# check preferred language
|
||||
if preferred_language == 'en':
|
||||
action_tag = 'SENT'
|
||||
else:
|
||||
action_tag = 'ULITUMA'
|
||||
else:
|
||||
if preferred_language == 'en':
|
||||
action_tag = 'RECEIVED'
|
||||
else:
|
||||
action_tag = 'ULIPOKEA'
|
||||
return action_tag
|
||||
|
||||
|
||||
@celery_app.task
def process_statement_callback(result, param: str, status_code: int):
    """Build and cache a processed account statement from raw transaction data.

    Each raw transaction is annotated with sender/recipient phone numbers
    (looked up in the local db), a localized action tag, converted values and a
    formatted timestamp; "gassy" transactions (zero-address source token) are
    skipped. The resulting list is cached as JSON under a key derived from
    ``param``.

    :param result: list of raw transaction dicts from the statement query.
    :param param: '0x'-prefixed hex blockchain address of the statement's owner.
    :param status_code: callback status; any non-zero value is an error.
    :raises ValueError: if ``status_code`` is non-zero.
    """
    if status_code != 0:
        raise ValueError(f'Unexpected status code: {status_code}.')

    # create session
    session = SessionBase.create_session()
    processed_transactions = []

    # fix: ensure the session is always released — it was previously never
    # closed on the success path, leaking a db connection per task run.
    try:
        # process transaction data to cache
        for transaction in result:
            sender_blockchain_address = transaction.get('sender')
            recipient_address = transaction.get('recipient')
            source_token = transaction.get('source_token')

            # filter out any transactions that are "gassy"
            if '0x0000000000000000000000000000000000000000' in source_token:
                continue

            # describe a processed transaction
            processed_transaction = {}

            # check if sender is in the system
            sender: User = session.query(User).filter_by(blockchain_address=sender_blockchain_address).first()
            if sender:
                processed_transaction['sender_phone_number'] = sender.phone_number
                action_tag = define_transaction_action_tag(
                    preferred_language=sender.preferred_language,
                    sender_blockchain_address=sender_blockchain_address,
                    param=param
                )
                processed_transaction['action_tag'] = action_tag
            else:
                processed_transaction['sender_phone_number'] = 'GRASSROOTS ECONOMICS'

            # check if recipient is in the system
            recipient: User = session.query(User).filter_by(blockchain_address=recipient_address).first()
            if recipient:
                processed_transaction['recipient_phone_number'] = recipient.phone_number
            else:
                logg.warning('Tx with recipient not found in cic-ussd')

            # add transaction values
            processed_transaction['to_value'] = from_wei(value=transaction.get('to_value'))
            processed_transaction['from_value'] = from_wei(value=transaction.get('from_value'))

            raw_timestamp = transaction.get('timestamp')
            # NOTE(review): this calls utcfromtimestamp on the bare `datetime`
            # name — confirm this module does `from datetime import datetime`;
            # with a plain `import datetime` this line raises AttributeError.
            timestamp = datetime.utcfromtimestamp(raw_timestamp).strftime('%d/%m/%y, %H:%M')
            processed_transaction['timestamp'] = timestamp

            processed_transactions.append(processed_transaction)
    finally:
        session.close()

    # cache account statement
    identifier = bytes.fromhex(param[2:])
    key = create_cached_data_key(identifier=identifier, salt='cic.statement')
    data = json.dumps(processed_transactions)

    # cache statement data
    cache_data(key=key, data=data)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user