Compare commits

...

41 Commits

Author SHA1 Message Date
nolash
ed55c62347 Revert bump in contract migration dockerfile 2021-03-17 19:07:42 +01:00
nolash
73fbc48603 Seed rehabilitation 2021-03-17 18:42:16 +01:00
7fe5b6bea3 add libpq to runtime 2021-03-17 07:30:34 -07:00
fc27dd6826 Merge branch 'bvander/more-contract-migration-permissions' into 'master'
add permissions and move some pip installs

See merge request grassrootseconomics/cic-internal-integration!69
2021-03-16 05:34:20 +00:00
75262dae5d add permissions and move some pip installs 2021-03-16 05:34:19 +00:00
c59f9da0fc try the multiple dest flags... 2021-03-15 14:31:48 +00:00
20a26045eb Update .cic-template.yml 2021-03-15 14:21:39 +00:00
09a79e10d5 Merge branch 'bvander/fix-contract-migration-permissions' into 'master'
fix home dir permissions

See merge request grassrootseconomics/cic-internal-integration!68
2021-03-15 14:17:09 +00:00
c0dff41b3c fix home dir permissions 2021-03-15 07:10:08 -07:00
18be4c48a7 Merge branch 'fix-cache-tasker' into 'master'
adding cic cache details

See merge request grassrootseconomics/cic-internal-integration!65
2021-03-15 13:22:39 +00:00
432dbe9fee adding cic cache details 2021-03-15 13:22:39 +00:00
93338aebea Merge branch 'bvander/add-cache-dbinit' into 'master'
cic cache db fixes

See merge request grassrootseconomics/cic-internal-integration!64
2021-03-15 13:20:18 +00:00
097a80e8de cic cache db fixes 2021-03-15 13:20:18 +00:00
bd4e5b0a40 Merge branch 'contract-migration-file-issue' into 'master'
Contract migration file issue

See merge request grassrootseconomics/cic-internal-integration!67
2021-03-15 03:27:33 +00:00
4927d92215 seperate module install 2021-03-14 20:21:07 -07:00
650472252d Merge branch 'philip/ussd_session_resumption' into 'master'
Add barebone session resumption feature

See merge request grassrootseconomics/cic-internal-integration!60
2021-03-14 18:17:51 +00:00
746196d2b1 Merge branch 'drop-bancor-contracts' into 'master'
contract migration image build improvements

See merge request grassrootseconomics/cic-internal-integration!66
2021-03-13 18:50:36 +00:00
842cbadf00 contract migration image build improvements 2021-03-13 18:50:36 +00:00
3661e48fd1 Merge branch 'cic-ussd-image-normalize' into 'master'
use the ubuntu slim image

See merge request grassrootseconomics/cic-internal-integration!63
2021-03-12 16:48:40 +00:00
f136504988 use the ubuntu slim image 2021-03-12 16:48:39 +00:00
f6a7956fdf Update .cic-template.yml 2021-03-10 03:53:58 +00:00
562292bd01 Merge branch 'kaniko-builds' into 'master'
Update ci_templates/.cic-template.yml

See merge request grassrootseconomics/cic-internal-integration!61
2021-03-10 03:48:17 +00:00
3cdf7b9965 Update ci_templates/.cic-template.yml 2021-03-10 03:48:17 +00:00
16d88d389b move envlist to dockercontainer 2021-03-09 16:54:33 -08:00
29da44bb9f Add barebone session resumption feature 2021-03-09 19:05:01 +03:00
15445b8d0f Merge branch 'cic-ussd-reqfix' into 'master'
cic types

See merge request grassrootseconomics/cic-internal-integration!59
2021-03-07 20:55:16 +00:00
bb90ceea0b cic types 2021-03-07 12:54:41 -08:00
8904e2abb1 Merge branch 'cic-ussd-req' into 'master'
revert the new requirements for ussd

See merge request grassrootseconomics/cic-internal-integration!58
2021-03-07 20:27:03 +00:00
94d3e61d0c revert the new requirements for ussd 2021-03-07 12:25:36 -08:00
nolash
fc08f3d17a bump cic base for cic ussd 2021-03-07 20:35:26 +01:00
Louis Holbrook
8da5219290 Merge branch 'lash/cli-rehabilitations' into 'master'
Rehabilitate CLI and API after nonce changes

See merge request grassrootseconomics/cic-internal-integration!57
2021-03-07 18:01:44 +00:00
Louis Holbrook
5f01135b04 Rehabilitate CLI and API after nonce changes 2021-03-07 18:01:44 +00:00
Louis Holbrook
543c6249b9 Merge branch 'lash/retry-on-signer-error' into 'master'
Retry on signer error

See merge request grassrootseconomics/cic-internal-integration!55
2021-03-07 13:51:59 +00:00
Louis Holbrook
db4f8f8955 Retry on signer error 2021-03-07 13:51:59 +00:00
Louis Holbrook
0c45e12ce1 Merge branch 'lash/ussd-cli' into 'master'
Add ussd cli

See merge request grassrootseconomics/cic-internal-integration!54
2021-03-06 22:27:40 +00:00
nolash
4f1014c5e1 Add ussd cli 2021-03-06 22:25:57 +01:00
Louis Holbrook
1bfc1434b4 Merge branch 'lash/cic-eth-upgrade' into 'master'
Upgrade cic-eth version

See merge request grassrootseconomics/cic-internal-integration!53
2021-03-06 20:00:10 +00:00
nolash
7b9cd2d4b8 Upgrade cic-eth version 2021-03-06 20:58:35 +01:00
Louis Holbrook
30febbd1e0 Merge branch 'lash/transfer-authorization' into 'master'
cic-eth: Make nonce separate task

See merge request grassrootseconomics/cic-internal-integration!52
2021-03-06 17:55:51 +00:00
Louis Holbrook
f0088f20de cic-eth: Make nonce separate task 2021-03-06 17:55:51 +00:00
Louis Holbrook
618769a0d2 Merge branch 'lash/cic-cache-biiig-num' into 'master'
cic-cache: Set value db fields in cic_cache to handle biiig numbers

See merge request grassrootseconomics/cic-internal-integration!51
2021-03-05 20:03:52 +00:00
87 changed files with 1689 additions and 670 deletions

View File

@@ -1,5 +1,5 @@
[database]
NAME=cic-eth
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost

View File

@@ -36,7 +36,7 @@ script_location = .
# output_encoding = utf-8
#sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-cache
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic_cache
[post_write_hooks]

View File

@@ -1,5 +0,0 @@
CREATE DATABASE "cic-cache";
CREATE DATABASE "cic-eth";
CREATE DATABASE "cic-notify";
CREATE DATABASE "cic-meta";
CREATE DATABASE "cic-signer";

View File

@@ -25,6 +25,7 @@ licence_files =
python_requires = >= 3.6
packages =
cic_cache
cic_cache.tasks
cic_cache.db
cic_cache.db.models
cic_cache.runnable
@@ -33,5 +34,6 @@ scripts =
[options.entry_points]
console_scripts =
cic-cache-tracker = cic_cache.runnable.tracker:main
cic-cache-server = cic_cache.runnable.server:main
cic-cache-trackerd = cic_cache.runnable.tracker:main
cic-cache-serverd = cic_cache.runnable.server:main
cic-cache-taskerd = cic_cache.runnable.tasker:main

View File

@@ -10,12 +10,15 @@ from cic_registry import zero_address
from cic_eth.db.enum import LockEnum
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.lock import Lock
from cic_eth.task import (
CriticalSQLAlchemyTask,
)
from cic_eth.error import LockedError
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks
@@ -33,7 +36,7 @@ def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks
@@ -51,7 +54,7 @@ def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
"""Task wrapper to set send lock
@@ -67,7 +70,7 @@ def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_str, address=zero_address):
"""Task wrapper to reset send lock
@@ -83,7 +86,7 @@ def unlock_send(chained_input, chain_str, address=zero_address):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
"""Task wrapper to set queue direct lock
@@ -99,7 +102,7 @@ def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_str, address=zero_address):
"""Task wrapper to reset queue direct lock
@@ -115,7 +118,7 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
return chained_input
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_str, lock_flags, address=None):
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)

View File

@@ -1,5 +1,6 @@
# standard imports
import logging
import sys
# third-party imports
import celery
@@ -317,6 +318,8 @@ class AdminApi:
:return: Transaction details
:rtype: dict
"""
problems = []
if tx_hash != None and tx_raw != None:
ValueError('Specify only one of hash or raw tx')
@@ -444,10 +447,12 @@ class AdminApi:
r = c.w3.eth.getTransactionReceipt(tx_hash)
if r.status == 1:
tx['network_status'] = 'Confirmed'
tx['block'] = r.blockNumber
tx['tx_index'] = r.transactionIndex
else:
tx['network_status'] = 'Reverted'
tx['network_block_number'] = r.blockNumber
tx['network_tx_index'] = r.transactionIndex
if tx['block_number'] == None:
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
except web3.exceptions.TransactionNotFound:
pass
@@ -469,4 +474,9 @@ class AdminApi:
t = s.apply_async()
tx['status_log'] = t.get()
if len(problems) > 0:
sys.stderr.write('\n')
for p in problems:
sys.stderr.write('!!!{}\n'.format(p))
return tx

View File

@@ -82,6 +82,7 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
@@ -92,6 +93,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -110,7 +116,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -137,6 +144,7 @@ class Api:
:returns: uuid of root task
:rtype: celery.Task
"""
raise NotImplementedError('out of service until new DEX migration is done')
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
@@ -147,6 +155,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -165,7 +178,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -200,6 +214,13 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
from_address,
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -217,7 +238,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_transfer.link(self.callback_success)
s_tokens.link(s_transfer).on_error(self.callback_error)
@@ -228,82 +250,6 @@ class Api:
return t
def transfer_request(self, from_address, to_address, spender_address, value, token_symbol):
"""Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:type to_address: str, 0x-hex
:param spender_address: Ethereum address that is executing transfer (typically an escrow contract)
:type spender_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_tokens_transfer_approval = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_tokens_approve = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
from_address,
spender_address,
value,
self.chain_str,
],
queue=self.queue,
)
s_transfer_approval = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
from_address,
to_address,
value,
self.chain_str,
],
queue=self.queue,
)
# TODO: make approve and transfer_approval chainable so callback can be part of the full chain
if self.callback_param != None:
s_transfer_approval.link(self.callback_success)
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error)
else:
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval)
g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue)
s_check.link(g)
t = s_check.apply_async()
#t = s_tokens.apply_async(queue=self.queue)
return t
def balance(self, address, token_symbol, include_pending=True):
"""Calls the provided callback with the current token balance of the given address.
@@ -408,6 +354,13 @@ class Api:
s_account.link(self.callback_success)
if register:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'ACCOUNTS_INDEX_WRITER',
],
queue=self.queue,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
@@ -415,7 +368,8 @@ class Api:
],
queue=self.queue,
)
s_account.link(s_register)
s_nonce.link(s_register)
s_account.link(s_nonce)
t = s_check.apply_async(queue=self.queue)
return t
@@ -438,6 +392,13 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'GAS_GIFTER',
],
queue=self.queue,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
@@ -445,7 +406,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_refill)
s_nonce.link(s_refill)
s_check.link(s_nonce)
if self.callback_param != None:
s_refill.link(self.callback_success)

View File

@@ -103,6 +103,9 @@ def status_str(v, bits_only=False):
except ValueError:
pass
if v == 0:
return 'NONE'
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:

View File

@@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -54,7 +54,7 @@ class SessionBase(Model):
@staticmethod
def connect(dsn, pool_size=8, debug=False):
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.

View File

@@ -1,11 +1,16 @@
# standard imports
import logging
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer
from sqlalchemy import Column, String, Integer, DateTime
# local imports
from .base import SessionBase
from cic_eth.error import (
InitializationError,
IntegrityError,
)
logg = logging.getLogger()
@@ -37,23 +42,43 @@ class Nonce(SessionBase):
@staticmethod
def __get(session, address):
r = session.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
def __get(conn, address):
r = conn.execute("SELECT nonce FROM nonce WHERE address_hex = '{}'".format(address))
nonce = r.fetchone()
session.flush()
if nonce == None:
return None
return nonce[0]
@staticmethod
def __set(session, address, nonce):
session.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
session.flush()
def __set(conn, address, nonce):
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
@staticmethod
def next(address, initial_if_not_exists=0, session=None):
def __init(conn, address, nonce):
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
@staticmethod
def init(address, nonce=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==address)
o = q.first()
if o != None:
session.flush()
raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce))
session.flush()
Nonce.__init(session, address, nonce)
SessionBase.release_session(session)
# TODO: Incrementing nonce MUST be done by separate tasks.
@staticmethod
def next(address, initial_if_not_exists=0):
"""Generate next nonce for the given address.
If there is no previous nonce record for the address, the nonce may be initialized to a specified value, or 0 if no value has been given.
@@ -65,32 +90,96 @@ class Nonce(SessionBase):
:returns: Nonce
:rtype: number
"""
session = SessionBase.bind_session(session)
#session = SessionBase.bind_session(session)
SessionBase.release_session(session)
session.begin_nested()
#conn = Nonce.engine.connect()
#session.begin_nested()
conn = Nonce.engine.connect()
if Nonce.transactional:
#session.execute('BEGIN')
session.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
session.flush()
nonce = Nonce.__get(session, address)
conn.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
logg.debug('locking nonce table for address {}'.format(address))
nonce = Nonce.__get(conn, address)
logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None:
nonce = initial_if_not_exists
session.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
session.flush()
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__set(session, address, nonce+1)
#if Nonce.transactional:
#session.execute('COMMIT')
#session.execute('UNLOCK TABLE nonce')
#conn.close()
session.commit()
# session.commit()
Nonce.__init(conn, address, nonce)
Nonce.__set(conn, address, nonce+1)
if Nonce.transactional:
conn.execute('COMMIT')
logg.debug('unlocking nonce table for address {}'.format(address))
conn.close()
#session.commit()
SessionBase.release_session(session)
#SessionBase.release_session(session)
return nonce
class NonceReservation(SessionBase):
__tablename__ = 'nonce_task_reservation'
nonce = Column(Integer)
key = Column(String)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
@staticmethod
def peek(key, session=None):
session = SessionBase.bind_session(session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
nonce = None
if o != None:
nonce = o.nonce
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def release(key, session=None):
session = SessionBase.bind_session(session)
nonce = NonceReservation.peek(key, session=session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
if o == None:
raise IntegrityError('nonce for key {}'.format(nonce))
SessionBase.release_session(session)
session.delete(o)
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def next(address, key, session=None):
session = SessionBase.bind_session(session)
if NonceReservation.peek(key, session) != None:
raise IntegrityError('nonce for key {}'.format(key))
nonce = Nonce.next(address)
o = NonceReservation()
o.nonce = nonce
o.key = key
session.add(o)
SessionBase.release_session(session)
return nonce

View File

@@ -400,7 +400,7 @@ class Otx(SessionBase):
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed:
if not self.status & StatusBits.OBSOLETE:
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusEnum.CANCELLED, session)
else:

View File

@@ -143,7 +143,7 @@ class TxCache(SessionBase):
self.block_number = block_number
self.tx_index = tx_index
# not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.now()
self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created
self.date_checked = self.date_created

View File

@@ -54,8 +54,29 @@ class RoleMissingError(Exception):
pass
class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks
"""
pass
class LockedError(Exception):
"""Exception raised when attempt is made to execute action that is deactivated by lock
"""
pass
class SignerError(Exception):
"""Exception raised when signer is unavailable or generates an error
"""
pass
class EthError(Exception):
"""Exception raised when unspecified error from evm node is encountered
"""
pass

View File

@@ -21,8 +21,14 @@ from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.error import RoleMissingError
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.error import (
RoleMissingError,
SignerError,
)
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalSQLAlchemyAndSignerTask,
)
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@@ -36,6 +42,7 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
uuid,
session=None,
):
"""Register an Ethereum account address with the on-chain account registry
@@ -59,7 +66,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
@@ -69,6 +76,7 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
uuid,
session=None,
):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
@@ -90,7 +98,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
@@ -137,7 +145,7 @@ def unpack_gift(data):
# TODO: Separate out nonce initialization task
@celery_app.task(base=CriticalSQLAlchemyTask)
@celery_app.task(base=CriticalSQLAlchemyAndSignerTask)
def create(password, chain_str):
"""Creates and stores a new ethereum account in the keystore.
@@ -152,28 +160,24 @@ def create(password, chain_str):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
a = c.w3.eth.personal.new_account(password)
a = None
try:
a = c.w3.eth.personal.new_account(password)
except FileNotFoundError:
pass
if a == None:
raise SignerError('create account')
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
# TODO: this can safely be set to zero, since we are randomly creating account
n = c.w3.eth.getTransactionCount(a, 'pending')
session = SessionBase.create_session()
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==a)
o = q.first()
session.flush()
if o == None:
o = Nonce()
o.address_hex = a
o.nonce = n
session.add(o)
session.commit()
Nonce.init(a, session=session)
session.commit()
session.close()
return a
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyAndSignerTask)
def register(self, account_address, chain_str, writer_address=None):
"""Creates a transaction to add the given address to the accounts index.
@@ -203,7 +207,7 @@ def register(self, account_address, chain_str, writer_address=None):
c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c)
tx_add = txf.add(account_address, chain_spec, session=session)
tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
session.close()
@@ -223,7 +227,7 @@ def register(self, account_address, chain_str, writer_address=None):
return account_address
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def gift(self, account_address, chain_str):
"""Creates a transaction to invoke the faucet contract for the given address.
@@ -243,7 +247,7 @@ def gift(self, account_address, chain_str):
txf = AccountTxFactory(account_address, c)
session = SessionBase.create_session()
tx_add = txf.gift(account_address, chain_spec, session=session)
tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
session.close()

View File

@@ -32,10 +32,10 @@ class TxFactory:
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self, session=None):
"""Returns the current cached nonce value, and increments it for next transaction.
def next_nonce(self, uuid, session=None):
"""Returns the current reserved nonce value, and increments it for next transaction.
:returns: Nonce
:rtype: number
"""
return self.nonce_oracle.next(session=session)
return self.nonce_oracle.next_by_task_uuid(uuid, session=session)

View File

@@ -54,6 +54,7 @@ class GasOracle():
"""
session = SessionBase.create_session()
a = AccountRole.get_address('GAS_GIFTER', session)
logg.debug('gasgifter {}'.format(a))
session.close()
return a

View File

@@ -1,5 +1,8 @@
# local imports
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
class NonceOracle():
"""Ensures atomic nonce increments for all transactions across all tasks and threads.
@@ -14,10 +17,15 @@ class NonceOracle():
self.default_nonce = default_nonce
def next(self, session=None):
def next(self):
"""Get next unique nonce.
:returns: Nonce
:rtype: number
"""
return Nonce.next(self.address, self.default_nonce, session=session)
raise AttributeError('this should not be called')
return Nonce.next(self.address, self.default_nonce)
def next_by_task_uuid(self, uuid, session=None):
return NonceReservation.release(uuid, session=session)

View File

@@ -8,6 +8,7 @@ from cic_registry.chain import ChainSpec
# local imports
from cic_eth.eth import RpcClient
from cic_eth.queue.tx import create as queue_create
from cic_eth.error import SignerError
celery_app = celery.current_app
logg = celery_app.log.get_default_logger()
@@ -26,7 +27,13 @@ def sign_tx(tx, chain_str):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
tx_transfer_signed = None
try:
tx_transfer_signed = c.w3.eth.sign_transaction(tx)
except FileNotFoundError:
pass
if tx_transfer_signed == None:
raise SignerError('sign tx')
logg.debug('tx_transfer_signed {}'.format(tx_transfer_signed))
tx_hash = c.w3.keccak(hexstr=tx_transfer_signed['raw'])
tx_hash_hex = tx_hash.hex()

View File

@@ -24,6 +24,7 @@ from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalSQLAlchemyAndSignerTask,
)
celery_app = celery.current_app
@@ -46,6 +47,7 @@ class TokenTxFactory(TxFactory):
spender_address,
amount,
chain_spec,
uuid,
session=None,
):
"""Create an ERC20 "approve" transaction
@@ -74,7 +76,7 @@ class TokenTxFactory(TxFactory):
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(uuid, session=session),
})
return tx_approve
@@ -85,6 +87,7 @@ class TokenTxFactory(TxFactory):
receiver_address,
value,
chain_spec,
uuid,
session=None,
):
"""Create an ERC20 "transfer" transaction
@@ -114,7 +117,7 @@ class TokenTxFactory(TxFactory):
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(session=session),
'nonce': self.next_nonce(uuid, session=session),
})
return tx_transfer
@@ -210,7 +213,7 @@ def balance(tokens, holder_address, chain_str):
return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
"""Transfer ERC20 tokens between addresses
@@ -248,7 +251,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
txf = TokenTxFactory(holder_address, c)
session = SessionBase.create_session()
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, session=session)
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session)
session.close()
@@ -266,7 +269,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def approve(self, tokens, holder_address, spender_address, value, chain_str):
"""Approve ERC20 transfer on behalf of holder address
@@ -304,7 +307,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
txf = TokenTxFactory(holder_address, c)
session = SessionBase.create_session()
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, session=session)
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session)
session.close()

View File

@@ -12,7 +12,9 @@ from cic_registry.chain import ChainSpec
from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation
from cic_eth.db.models.lock import Lock
from cic_eth.db.models.role import AccountRole
from cic_eth.db.enum import (
LockEnum,
StatusBits,
@@ -35,6 +37,9 @@ from cic_eth.admin.ctrl import lock_send
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalWeb3AndSignerTask,
CriticalSQLAlchemyAndSignerTask,
CriticalSQLAlchemyAndWeb3Task,
)
celery_app = celery.current_app
@@ -44,7 +49,7 @@ MAX_NONCE_ATTEMPTS = 3
# TODO this function is too long
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyTask)
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyAndWeb3Task)
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
@@ -72,6 +77,9 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
if address == None:
address = o['address']
if not web3.Web3.isChecksumAddress(address):
raise ValueError('invalid address {}'.format(address))
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
@@ -80,19 +88,32 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
c = RpcClient(chain_spec)
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
balance = c.w3.eth.getBalance(address)
balance = 0
try:
balance = c.w3.eth.getBalance(address)
except ValueError as e:
raise EthError('balance call for {}'.format())
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
if gas_required > balance:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
address,
chain_str,
],
queue=queue,
)
s_refill_gas.apply_async()
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
@@ -108,15 +129,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
safe_gas = c.safe_threshold_amount()
if balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
address,
chain_str,
],
queue=queue,
)
s_refill_gas.apply_async()
s_nonce.link(s_refill)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
ready_tasks = []
for tx_hash in tx_hashes:
@@ -287,11 +316,10 @@ class ParityNodeHandler:
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
tx_hash_hex,
debugstr,
],
queue=queue,
queue=self.queue,
)
s_set_reject.link(s_debug)
s_lock.link(s_set_reject)
@@ -299,7 +327,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_default(self, tx_hash_hex, tx_hex):
def handle_default(self, tx_hash_hex, tx_hex, debugstr):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -317,9 +345,18 @@ class ParityNodeHandler:
[],
queue=self.queue,
)
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
debugstr,
],
queue=self.queue,
)
s_set_fubar.link(s_debug)
s_lock.link(s_set_fubar)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr))
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
@@ -393,7 +430,7 @@ def send(self, txs, chain_str):
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3AndSignerTask)
def refill_gas(self, recipient_address, chain_str):
"""Executes a native token transaction to fund the recipient's gas expenditures.
@@ -407,6 +444,7 @@ def refill_gas(self, recipient_address, chain_str):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
@@ -416,8 +454,11 @@ def refill_gas(self, recipient_address, chain_str):
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
if c > 0:
session.close()
raise AlreadyFillingGasError(recipient_address)
#session.close()
#raise AlreadyFillingGasError(recipient_address)
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
queue = self.request.delivery_info['routing_key']
@@ -426,10 +467,13 @@ def refill_gas(self, recipient_address, chain_str):
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
nonce = nonce_generator.next(session=session)
#nonce = nonce_generator.next(session=session)
nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session)
gas_price = c.gas_price()
gas_limit = c.default_gas_limit
refill_amount = c.refill_amount()
refill_amount = 0
if not zero_amount:
refill_amount = c.refill_amount()
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
# create and sign transaction
@@ -475,10 +519,11 @@ def refill_gas(self, recipient_address, chain_str):
queue=queue,
)
celery.group(s_tx_cache, s_status)()
return tx_send_gas_signed['raw']
@celery_app.task(bind=True)
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price.
@@ -554,6 +599,33 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, signer=None):
session = SessionBase.create_session()
address = None
if signer == None:
address = chained_input
logg.debug('non-explicit address for reserve nonce, using arg head {}'.format(chained_input))
else:
if web3.Web3.isChecksumAddress(signer):
address = signer
logg.debug('explicit address for reserve nonce {}'.format(signer))
else:
address = AccountRole.get_address(signer, session=session)
logg.debug('role for reserve nonce {} -> {}'.format(signer, address))
if not web3.Web3.isChecksumAddress(address):
raise ValueError('invalid result when resolving address for nonce {}'.format(address))
root_id = self.request.root_id
nonce = NonceReservation.next(address, root_id)
session.close()
return chained_input
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_str):
"""Force update of network status of a simgle transaction

View File

@@ -3,10 +3,11 @@ import logging
import sha3
import web3
# third-party imports
# external imports
from rlp import decode as rlp_decode
from rlp import encode as rlp_encode
from eth_keys import KeyAPI
from chainlib.eth.tx import unpack
logg = logging.getLogger()
@@ -22,64 +23,65 @@ field_debugs = [
's',
]
unpack_signed_raw_tx = unpack
def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
d = rlp_decode(tx_raw_bytes)
logg.debug('decoding using chain id {}'.format(chain_id))
j = 0
for i in d:
logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
j += 1
vb = chain_id
if chain_id != 0:
v = int.from_bytes(d[6], 'big')
vb = v - (chain_id * 2) - 35
while len(d[7]) < 32:
d[7] = b'\x00' + d[7]
while len(d[8]) < 32:
d[8] = b'\x00' + d[8]
s = b''.join([d[7], d[8], bytes([vb])])
so = KeyAPI.Signature(signature_bytes=s)
h = sha3.keccak_256()
h.update(rlp_encode(d))
signed_hash = h.digest()
d[6] = chain_id
d[7] = b''
d[8] = b''
h = sha3.keccak_256()
h.update(rlp_encode(d))
unsigned_hash = h.digest()
p = so.recover_public_key_from_msg_hash(unsigned_hash)
a = p.to_checksum_address()
logg.debug('decoded recovery byte {}'.format(vb))
logg.debug('decoded address {}'.format(a))
logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
to = d[3].hex() or None
if to != None:
to = web3.Web3.toChecksumAddress('0x' + to)
return {
'from': a,
'nonce': int.from_bytes(d[0], 'big'),
'gasPrice': int.from_bytes(d[1], 'big'),
'gas': int.from_bytes(d[2], 'big'),
'to': to,
'value': int.from_bytes(d[4], 'big'),
'data': '0x' + d[5].hex(),
'v': chain_id,
'r': '0x' + s[:32].hex(),
's': '0x' + s[32:64].hex(),
'chainId': chain_id,
'hash': '0x' + signed_hash.hex(),
'hash_unsigned': '0x' + unsigned_hash.hex(),
}
#def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
# d = rlp_decode(tx_raw_bytes)
#
# logg.debug('decoding {} using chain id {}'.format(tx_raw_bytes.hex(), chain_id))
# j = 0
# for i in d:
# logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
# j += 1
# vb = chain_id
# if chain_id != 0:
# v = int.from_bytes(d[6], 'big')
# vb = v - (chain_id * 2) - 35
# while len(d[7]) < 32:
# d[7] = b'\x00' + d[7]
# while len(d[8]) < 32:
# d[8] = b'\x00' + d[8]
# s = b''.join([d[7], d[8], bytes([vb])])
# so = KeyAPI.Signature(signature_bytes=s)
#
# h = sha3.keccak_256()
# h.update(rlp_encode(d))
# signed_hash = h.digest()
#
# d[6] = chain_id
# d[7] = b''
# d[8] = b''
#
# h = sha3.keccak_256()
# h.update(rlp_encode(d))
# unsigned_hash = h.digest()
#
# p = so.recover_public_key_from_msg_hash(unsigned_hash)
# a = p.to_checksum_address()
# logg.debug('decoded recovery byte {}'.format(vb))
# logg.debug('decoded address {}'.format(a))
# logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
# logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
#
# to = d[3].hex() or None
# if to != None:
# to = web3.Web3.toChecksumAddress('0x' + to)
#
# return {
# 'from': a,
# 'nonce': int.from_bytes(d[0], 'big'),
# 'gasPrice': int.from_bytes(d[1], 'big'),
# 'gas': int.from_bytes(d[2], 'big'),
# 'to': to,
# 'value': int.from_bytes(d[4], 'big'),
# 'data': '0x' + d[5].hex(),
# 'v': chain_id,
# 'r': '0x' + s[:32].hex(),
# 's': '0x' + s[32:64].hex(),
# 'chainId': chain_id,
# 'hash': '0x' + signed_hash.hex(),
# 'hash_unsigned': '0x' + unsigned_hash.hex(),
# }
def unpack_signed_raw_tx_hex(tx_raw_hex, chain_id):

View File

@@ -315,7 +315,9 @@ def set_ready(tx_hash):
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_dequeue(tx_hash):
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
@@ -566,7 +568,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
return txs
def get_status_tx(status, before=None, exact=False, limit=0, session=None):
def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None):
"""Retrieve transaction with a specific queue status.
:param status: Status to match transactions with
@@ -582,11 +584,15 @@ def get_status_tx(status, before=None, exact=False, limit=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.date_updated<before)
# before = datetime.datetime.utcnow()
if before != None:
q = q.filter(TxCache.date_updated<before)
if exact:
q = q.filter(Otx.status==status.value)
q = q.filter(Otx.status==status)
else:
q = q.filter(Otx.status.op('&')(status.value)==status.value)
q = q.filter(Otx.status.op('&')(status)>0)
if not_status != None:
q = q.filter(Otx.status.op('&')(not_status)==0)
i = 0
for o in q.all():
if limit > 0 and i == limit:

View File

@@ -5,6 +5,7 @@ import os
import logging
import uuid
import json
from xdg.BaseDirectory import xdg_config_home
import celery
from cic_eth.api import Api

View File

@@ -25,7 +25,7 @@ logging.getLogger('urllib3').setLevel(logging.WARNING)
default_abi_dir = '/usr/share/local/cic/solidity/abi'
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')

View File

@@ -15,6 +15,8 @@ import web3
from web3 import HTTPProvider, WebsocketProvider
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from chainlib.eth.tx import unpack
from hexathon import strip_0x
# local imports
import cic_eth
@@ -36,7 +38,7 @@ from cic_eth.error import (
TemporaryTxError,
NotLocalTxError,
)
from cic_eth.eth.util import unpack_signed_raw_tx_hex
#from cic_eth.eth.util import unpack_signed_raw_tx_hex
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -115,12 +117,15 @@ class DispatchSyncer:
chain_str = str(self.chain_spec)
for k in txs.keys():
tx_raw = txs[k]
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
#tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec.chain_id())
try:
set_dequeue(tx['hash'])
except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
continue
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',

View File

@@ -26,7 +26,6 @@ class RegistrationFilter(SyncFilter):
def filter(self, conn, block, tx, db_session=None):
registered_address = None
logg.debug('register filter checking log {}'.format(tx.logs))
for l in tx.logs:
event_topic_hex = l['topics'][0]
if event_topic_hex == account_registry_add_log_hash:
@@ -34,16 +33,23 @@ class RegistrationFilter(SyncFilter):
address_hex = strip_0x(l['topics'][1])[64-40:]
address = to_checksum(add_0x(address_hex))
logg.debug('request token gift to {}'.format(address))
s = celery.signature(
'cic_eth.eth.account.gift',
logg.info('request token gift to {}'.format(address))
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
],
queue=self.queue,
)
s_gift = celery.signature(
'cic_eth.eth.account.gift',
[
str(self.chain_spec),
],
queue=self.queue,
)
s.apply_async()
s_nonce.link(s_gift)
s_nonce.apply_async()
def __str__(self):

View File

@@ -66,14 +66,21 @@ class TransferAuthFilter(SyncFilter):
sender = add_0x(to_checksum(o['sender']))
recipient = add_0x(to_checksum(recipient))
token = add_0x(to_checksum(o['token']))
s = celery.signature(
token_data = {
'address': token,
}
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
sender,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
[
{
'address': token,
},
],
sender,
recipient,
o['value'],
@@ -81,7 +88,8 @@ class TransferAuthFilter(SyncFilter):
],
queue=self.queue,
)
t = s.apply_async()
s_nonce.link(s_approve)
t = s_nonce.apply_async()
return True

View File

@@ -30,7 +30,7 @@ class TxFilter(SyncFilter):
if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None
logg.info('local tx match {}'.format(otx.tx_hash))
logg.info('tx filter match on {}'.format(otx.tx_hash))
SessionBase.release_session(db_session)
s = celery.signature(
'cic_eth.queue.tx.set_final_status',

View File

@@ -138,7 +138,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str):
# TODO: can we merely use the dispatcher instead?
def dispatch(chain_str):
txs = get_status_tx(StatusEnum.RETRY, datetime.datetime.utcnow())
txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow())
if len(txs) == 0:
logg.debug('no retry state txs found')
return

View File

@@ -22,7 +22,7 @@ logg = logging.getLogger()
logging.getLogger('web3').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()

View File

@@ -37,12 +37,13 @@ logging.getLogger('urllib3').setLevel(logging.WARNING)
default_abi_dir = '/usr/share/local/cic/solidity/abi'
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
argparser.add_argument('-f', '--format', dest='f', default='terminal', type=str, help='Output format')
argparser.add_argument('--status-raw', dest='status_raw', action='store_true', help='Output statis bit enum names only')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
@@ -122,7 +123,7 @@ def render_tx(o, **kwargs):
for v in o.get('status_log', []):
d = datetime.datetime.fromisoformat(v[0])
e = status_str(v[1])
e = status_str(v[1], args.status_raw)
content += '{}: {}\n'.format(d, e)
return content

View File

@@ -9,7 +9,10 @@ import celery
# local imports
from .base import Syncer
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import get_status_tx
logg = logging.getLogger()
@@ -47,7 +50,8 @@ class RetrySyncer(Syncer):
# )
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx(
StatusEnum.SENT.value,
StatusBits.IN_NETWORK.value,
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
before=before,
)
# return list(failed_txs.keys()) + list(stalled_txs.keys())

View File

@@ -5,6 +5,12 @@ import requests
import celery
import sqlalchemy
# local imports
from cic_eth.error import (
SignerError,
EthError,
)
class CriticalTask(celery.Task):
retry_jitter = True
@@ -30,4 +36,18 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError,
EthError,
)
class CriticalSQLAlchemyAndSignerTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
SignerError,
)
class CriticalWeb3AndSignerTask(CriticalTask):
autoretry_for = (
requests.exceptions.ConnectionError,
SignerError,
)

View File

@@ -10,7 +10,7 @@ version = (
0,
10,
0,
'alpha.38',
'alpha.41',
)
version_object = semver.VersionInfo(

View File

@@ -6,4 +6,4 @@ HOST=localhost
PORT=5432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=
DEBUG=0

View File

@@ -1,3 +1,3 @@
[tasks]
transfer_callbacks = taskcall:cic_eth.callbacks.noop.noop
trace_queue_status =
trace_queue_status = 1

View File

@@ -6,7 +6,7 @@ set -e
# set CONFINI_ENV_PREFIX to override the env prefix to override env vars
echo "!!! starting signer"
python /usr/local/bin/crypto-dev-daemon -vv -c /usr/local/etc/crypto-dev-signer &
python /usr/local/bin/crypto-dev-daemon -c /usr/local/etc/crypto-dev-signer &
echo "!!! starting tracker"
/usr/local/bin/cic-eth-taskerd $@

View File

@@ -1,4 +1,4 @@
cic-base~=0.1.1a10
cic-base~=0.1.1a20
web3==5.12.2
celery==4.4.7
crypto-dev-signer~=0.4.13rc4
@@ -19,6 +19,6 @@ eth-gas-proxy==0.0.1a4
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.0a8
chainlib~=0.0.1a19
chainlib~=0.0.1a20
hexathon~=0.0.1a3
chainsyncer~=0.0.1a19

View File

@@ -27,7 +27,7 @@ def database_engine(
SessionBase.poolable = False
dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True)
SessionBase.connect(dsn, load_config.get('DATABASE_DEBUG') != None)
SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
return dsn

View File

@@ -15,6 +15,7 @@ from eth_keys import KeyAPI
from cic_eth.eth import RpcClient
from cic_eth.eth.rpc import GasOracle
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@@ -113,11 +114,17 @@ def init_w3_conn(
@pytest.fixture(scope='function')
def init_w3(
init_database,
init_eth_tester,
init_eth_account_roles,
init_w3_conn,
):
for address in init_w3_conn.eth.accounts:
nonce = init_w3_conn.eth.getTransactionCount(address, 'pending')
Nonce.init(address, nonce=nonce, session=init_database)
init_database.commit()
yield init_w3_conn
logg.debug('mining om nom nom... {}'.format(init_eth_tester.mine_block()))
@@ -128,9 +135,10 @@ def init_eth_account_roles(
w3_account_roles,
):
role = AccountRole.set('GAS_GIFTER', w3_account_roles.get('eth_account_gas_provider'))
address = w3_account_roles.get('eth_account_gas_provider')
role = AccountRole.set('GAS_GIFTER', address)
init_database.add(role)
init_database.commit()
return w3_account_roles
@@ -163,7 +171,6 @@ def w3_account_roles(
role_ids = [
'eth_account_bancor_deployer',
'eth_account_gas_provider',
'eth_account_reserve_owner',
'eth_account_reserve_minter',
'eth_account_accounts_index_owner',
@@ -172,6 +179,7 @@ def w3_account_roles(
'eth_account_sarafu_gifter',
'eth_account_approval_owner',
'eth_account_faucet_owner',
'eth_account_gas_provider',
]
roles = {}
@@ -187,6 +195,7 @@ def w3_account_roles(
return roles
@pytest.fixture(scope='session')
def w3_account_token_owners(
tokens_to_deploy,

View File

@@ -10,6 +10,8 @@ import web3
# local imports
from cic_eth.api import AdminApi
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
@@ -39,18 +41,37 @@ def test_resend_inplace(
c = RpcClient(default_chain_spec)
sigs = []
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
gas_provider = c.gas_provider()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[0],
gas_provider,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_str,
],
queue=None,
)
t = s.apply_async()
tx_raw = t.get()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
o = q.first()
tx_raw = o.signed_tx
tx_dict = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), default_chain_spec.chain_id())
gas_price_before = tx_dict['gasPrice']

View File

@@ -49,28 +49,7 @@ def test_transfer_api(
assert t.successful()
def test_transfer_approval_api(
default_chain_spec,
init_w3,
cic_registry,
init_database,
bancor_registry,
bancor_tokens,
transfer_approval,
celery_session_worker,
):
token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
approval_contract = CICRegistry.get_contract(default_chain_spec, 'TransferApproval')
api = Api(str(default_chain_spec), callback_param='transfer_request', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.transfer_request(init_w3.eth.accounts[2], init_w3.eth.accounts[4], approval_contract.address(), 111, token.symbol())
t.get()
#for r in t.collect():
# print(r)
assert t.successful()
@pytest.mark.skip()
def test_convert_api(
default_chain_spec,
init_w3,
@@ -91,6 +70,7 @@ def test_convert_api(
assert t.successful()
@pytest.mark.skip()
def test_convert_transfer_api(
default_chain_spec,
init_w3,

View File

@@ -9,6 +9,10 @@ from tests.mock.filter import (
block_filter,
tx_filter,
)
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
logg = logging.getLogger()
@@ -28,9 +32,20 @@ def test_list_tx(
tx_hashes = []
# external tx
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
NonceReservation.next(init_w3.eth.accounts[0], 'foo', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec, 'foo')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
@@ -42,9 +57,12 @@ def test_list_tx(
tx_filter.add(a.to_bytes(4, 'big'))
# external tx
NonceReservation.next(init_w3.eth.accounts[0], 'bar', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(28)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec, 'bar')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
@@ -56,10 +74,13 @@ def test_list_tx(
tx_filter.add(a.to_bytes(4, 'big'))
# custodial tx
#NonceReservation.next(init_w3.eth.accounts[0], 'blinky', session=init_database)
#init_database.commit()
init_eth_tester.mine_blocks(3)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
#txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
api = Api(str(default_chain_spec), queue=None)
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM')
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM') #, 'blinky')
t.get()
tx_hash_hex = None
for c in t.collect():
@@ -68,9 +89,11 @@ def test_list_tx(
tx_hashes.append(tx_hash_hex)
# custodial tx
#NonceReservation.next(init_w3.eth.accounts[0], 'clyde', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(6)
api = Api(str(default_chain_spec), queue=None)
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM')
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM') #, 'clyde')
t.get()
tx_hash_hex = None
for c in t.collect():

View File

@@ -64,6 +64,7 @@ def test_register_account(
init_database,
init_eth_tester,
init_w3,
init_rpc,
cic_registry,
celery_session_worker,
eth_empty_accounts,
@@ -71,18 +72,27 @@ def test_register_account(
logg.debug('chainspec {}'.format(str(default_chain_spec)))
s = celery.signature(
'cic_eth.eth.account.register',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
eth_empty_accounts[0],
init_w3.eth.accounts[0],
],
queue=None,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
str(default_chain_spec),
init_w3.eth.accounts[0],
],
)
t = s.apply_async()
s_nonce.link(s_register)
t = s_nonce.apply_async()
address = t.get()
r = t.collect()
t.successful()
for r in t.collect():
pass
assert t.successful()
session = SessionBase.create_session()
o = session.query(Otx).first()

View File

@@ -8,6 +8,7 @@ import celery
# local imports
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.nonce import Nonce
from cic_eth.eth.util import unpack_signed_raw_tx
#logg = logging.getLogger(__name__)
@@ -31,18 +32,38 @@ def test_balance_complex(
}
tx_hashes = []
# TODO: Temporary workaround for nonce db cache initialization being made before deployments.
# Instead use different accounts than system ones for transfers for tests
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
for i in range(3):
s = celery.signature(
'cic_eth.eth.token.transfer',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
init_w3.eth.accounts[0],
],
queue=None,
)
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
[
init_w3.eth.accounts[0],
init_w3.eth.accounts[1],
1000*(i+1),
chain_str,
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
t.get()
r = None
for c in t.collect():

View File

@@ -1,14 +1,19 @@
# standard imports
import logging
import os
# external imports
import pytest
import celery
# local imports
from cic_eth.db import TxConvertTransfer
from cic_eth.eth.bancor import BancorTxFactory
logg = logging.getLogger()
@pytest.mark.skip()
def test_transfer_after_convert(
init_w3,
init_database,

View File

@@ -10,6 +10,9 @@ import celery
from cic_eth.eth.account import unpack_gift
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
logg = logging.getLogger()
@@ -32,10 +35,16 @@ def test_faucet(
init_database,
):
s = celery.signature(
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[7],
],
queue=None,
)
s_gift = celery.signature(
'cic_eth.eth.account.gift',
[
init_w3.eth.accounts[7],
str(default_chain_spec),
],
)
@@ -45,15 +54,21 @@ def test_faucet(
str(default_chain_spec),
],
)
s.link(s_send)
t = s.apply_async()
signed_tx = t.get()
s_gift.link(s_send)
s_nonce.link(s_gift)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
logg.debug('result {}'.format(r))
assert t.successful()
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[0][2:]), default_chain_spec.chain_id())
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==init_w3.eth.accounts[7])
o = q.first()
signed_tx = o.signed_tx
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[2:]), default_chain_spec.chain_id())
giveto = unpack_gift(tx['data'])
assert giveto['to'] == init_w3.eth.accounts[7]

View File

@@ -30,6 +30,7 @@ def test_refill_gas(
default_chain_spec,
init_eth_tester,
init_rpc,
init_w3,
init_database,
cic_registry,
init_eth_account_roles,
@@ -44,23 +45,39 @@ def test_refill_gas(
refill_amount = c.refill_amount()
balance = init_rpc.w3.eth.getBalance(receiver_address)
s = celery.signature(
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
eth_empty_accounts[0],
provider_address,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
receiver_address,
str(default_chain_spec),
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
r = t.get()
t.collect()
for c in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==receiver_address)
o = q.first()
signed_tx = o.signed_tx
s = celery.signature(
'cic_eth.eth.tx.send',
[
[r],
[signed_tx],
str(default_chain_spec),
],
)
@@ -74,11 +91,11 @@ def test_refill_gas(
assert balance_new == (balance + refill_amount)
# Verify that entry is added in TxCache
session = SessionBase.create_session()
q = session.query(Otx)
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==receiver_address)
r = q.first()
init_database.commit()
assert r.status == StatusEnum.SENT
@@ -86,6 +103,7 @@ def test_refill_gas(
def test_refill_deduplication(
default_chain_spec,
init_rpc,
init_w3,
init_database,
init_eth_account_roles,
cic_registry,
@@ -99,83 +117,131 @@ def test_refill_deduplication(
c = init_rpc
refill_amount = c.refill_amount()
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
receiver_address,
provider_address,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec),
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
r = t.get()
for e in t.collect():
pass
assert t.successful()
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
receiver_address,
str(default_chain_spec),
provider_address,
],
queue=None,
)
t = s.apply_async()
with pytest.raises(AlreadyFillingGasError):
t.get()
def test_check_gas(
default_chain_spec,
init_eth_tester,
init_w3,
init_rpc,
eth_empty_accounts,
init_database,
cic_registry,
celery_session_worker,
bancor_registry,
bancor_tokens,
):
provider_address = init_w3.eth.accounts[0]
gas_receiver_address = eth_empty_accounts[0]
token_receiver_address = init_w3.eth.accounts[1]
c = init_rpc
txf = TokenTxFactory(gas_receiver_address, c)
tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
gas_price = c.gas_price()
gas_limit = tx_transfer['gas']
s = celery.signature(
'cic_eth.eth.tx.check_gas',
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
[tx_hash_hex],
str(default_chain_spec),
[],
gas_receiver_address,
gas_limit * gas_price,
],
)
t = s.apply_async()
with pytest.raises(OutOfGasError):
r = t.get()
#assert len(r) == 0
time.sleep(1)
t.collect()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
#with pytest.raises(AlreadyFillingGasError):
t.get()
for e in t.collect():
pass
assert t.successful()
logg.warning('TODO: complete test by checking that second tx had zero value')
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_hex)
r = q.first()
session.close()
assert r.status == StatusEnum.WAITFORGAS
# TODO: check gas is part of the transfer chain, and we cannot create the transfer nonce by uuid before the task. Test is subsumed by transfer task test, but should be tested in isolation
#def test_check_gas(
# default_chain_spec,
# init_eth_tester,
# init_w3,
# init_rpc,
# eth_empty_accounts,
# init_database,
# cic_registry,
# celery_session_worker,
# bancor_registry,
# bancor_tokens,
# ):
#
# provider_address = init_w3.eth.accounts[0]
# gas_receiver_address = eth_empty_accounts[0]
# token_receiver_address = init_w3.eth.accounts[1]
#
## c = init_rpc
## txf = TokenTxFactory(gas_receiver_address, c)
## tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec, 'foo')
##
## (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
#
# token_data = [
# {
# 'address': bancor_tokens[0],
# },
# ]
#
# s_nonce = celery.signature(
# 'cic_eth.eth.tx.reserve_nonce',
# [
# token_data,
# init_w3.eth.accounts[0],
# ],
# queue=None,
# )
# s_transfer = celery.signature(
# 'cic_eth.eth.token.transfer',
# [
# init_w3.eth.accounts[0],
# init_w3.eth.accounts[1],
# 1024,
# str(default_chain_spec),
# ],
# queue=None,
# )
#
# gas_price = c.gas_price()
# gas_limit = tx_transfer['gas']
#
# s = celery.signature(
# 'cic_eth.eth.tx.check_gas',
# [
# [tx_hash_hex],
# str(default_chain_spec),
# [],
# gas_receiver_address,
# gas_limit * gas_price,
# ],
# )
# s_nonce.link(s_transfer)
# t = s_nonce.apply_async()
# with pytest.raises(OutOfGasError):
# r = t.get()
# #assert len(r) == 0
#
# time.sleep(1)
# t.collect()
#
# session = SessionBase.create_session()
# q = session.query(Otx)
# q = q.filter(Otx.tx_hash==tx_hash_hex)
# r = q.first()
# session.close()
# assert r.status == StatusEnum.WAITFORGAS
def test_resend_with_higher_gas(
@@ -191,39 +257,73 @@ def test_resend_with_higher_gas(
):
c = init_rpc
txf = TokenTxFactory(init_w3.eth.accounts[0], c)
tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec)
logg.debug('txtransfer {}'.format(tx_transfer))
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
logg.debug('signed raw {}'.format(tx_signed_raw_hex))
queue_create(
tx_transfer['nonce'],
tx_transfer['from'],
tx_hash_hex,
tx_signed_raw_hex,
str(default_chain_spec),
token_data = {
'address': bancor_tokens[0],
}
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
init_w3.eth.accounts[0],
],
queue=None,
)
logg.debug('create {}'.format(tx_transfer['from']))
cache_transfer_data(
tx_hash_hex,
tx_transfer, #_signed_raw_hex,
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
[
init_w3.eth.accounts[0],
init_w3.eth.accounts[1],
1024,
str(default_chain_spec),
],
queue=None,
)
# txf = TokenTxFactory(init_w3.eth.accounts[0], c)
# tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec, 'foo')
# logg.debug('txtransfer {}'.format(tx_transfer))
# (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
# logg.debug('signed raw {}'.format(tx_signed_raw_hex))
# queue_create(
# tx_transfer['nonce'],
# tx_transfer['from'],
# tx_hash_hex,
# tx_signed_raw_hex,
# str(default_chain_spec),
# )
# logg.debug('create {}'.format(tx_transfer['from']))
# cache_transfer_data(
# tx_hash_hex,
# tx_transfer, #_signed_raw_hex,
# )
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[1])
o = q.first()
tx_hash_hex = o.tx_hash
s_resend = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
tx_hash_hex,
str(default_chain_spec),
],
queue=None,
)
t = s_resend.apply_async()
i = 0
for r in t.collect():
logg.debug('{} {}'.format(i, r[0].get()))
i += 1
t = s_resend.apply_async()
for r in t.collect():
pass
assert t.successful()
#

View File

@@ -1,4 +1,5 @@
# third-party imports
import pytest
import celery
# local imports
@@ -6,7 +7,92 @@ from cic_eth.admin.nonce import shift_nonce
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import otx_cache_parse_tx
from cic_eth.eth.task import sign_tx
from cic_eth.db.models.nonce import (
NonceReservation,
Nonce
)
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
@pytest.mark.skip()
def test_reserve_nonce_task(
init_database,
celery_session_worker,
eth_empty_accounts,
):
s = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'foo',
eth_empty_accounts[0],
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
o = q.first()
assert o != None
q = init_database.query(NonceReservation)
q = q.filter(NonceReservation.key==str(t))
o = q.first()
assert o != None
def test_reserve_nonce_chain(
default_chain_spec,
init_database,
celery_session_worker,
init_w3,
init_rpc,
):
provider_address = init_rpc.gas_provider()
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==provider_address)
o = q.first()
o.nonce = 42
init_database.add(o)
init_database.commit()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[0],
provider_address,
],
queue=None,
)
s_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec),
],
queue=None,
)
s_nonce.link(s_gas)
t = s_nonce.apply_async()
r = t.get()
for c in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
Q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
o = q.first()
assert o.nonce == 42
@pytest.mark.skip()
def test_shift_nonce(
default_chain_spec,
init_database,
@@ -47,3 +133,4 @@ def test_shift_nonce(
for _ in t.collect():
pass
assert t.successful()

View File

@@ -20,21 +20,30 @@ def test_approve(
cic_registry,
):
s = celery.signature(
'cic_eth.eth.token.approve',
[
[
token_data = [
{
'address': bancor_tokens[0],
},
],
]
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
token_data,
init_rpc.w3.eth.accounts[0],
],
queue=None,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
init_rpc.w3.eth.accounts[0],
init_rpc.w3.eth.accounts[1],
1024,
str(default_chain_spec),
],
)
t = s.apply_async()
s_nonce.link(s_approve)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
logg.debug('result {}'.format(r))

View File

@@ -1,8 +1,29 @@
# third-party imports
import pytest
import uuid
# local imports
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
from cic_eth.error import (
InitializationError,
IntegrityError,
)
def test_nonce_init(
init_database,
eth_empty_accounts,
):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
init_database.commit()
with pytest.raises(InitializationError):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
def test_nonce_increment(
init_database,
@@ -10,11 +31,46 @@ def test_nonce_increment(
database_engine,
):
# if database_engine[:6] == 'sqlite':
# pytest.skip('sqlite cannot lock tables which is required for this test, skipping')
nonce = Nonce.next(eth_empty_accounts[0], 3)
assert nonce == 3
nonce = Nonce.next(eth_empty_accounts[0], 3)
assert nonce == 4
def test_nonce_reserve(
init_database,
eth_empty_accounts,
):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
init_database.commit()
uu = uuid.uuid4()
nonce = NonceReservation.next(eth_empty_accounts[0], str(uu), session=init_database)
init_database.commit()
assert nonce == 42
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
o = q.first()
assert o.nonce == 43
nonce = NonceReservation.release(str(uu))
init_database.commit()
assert nonce == 42
q = init_database.query(NonceReservation)
q = q.filter(NonceReservation.key==str(uu))
o = q.first()
assert o == None
def test_nonce_reserve_integrity(
init_database,
eth_empty_accounts,
):
uu = uuid.uuid4()
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
with pytest.raises(IntegrityError):
NonceReservation.release(str(uu))

View File

@@ -17,6 +17,7 @@ from cic_eth.db.models.tx import TxCache
logg = logging.getLogger()
@pytest.mark.skip()
def test_resolve_converters_by_tokens(
cic_registry,
init_w3,
@@ -43,6 +44,7 @@ def test_resolve_converters_by_tokens(
assert len(t['converters']) == 1
@pytest.mark.skip()
def test_unpack_convert(
default_chain_spec,
cic_registry,
@@ -84,6 +86,7 @@ def test_unpack_convert(
assert convert_data['fee'] == 0
@pytest.mark.skip()
def test_queue_cache_convert(
default_chain_spec,
init_w3,

View File

@@ -1,51 +0,0 @@
# standard imports
import logging
# local imports
from cic_eth.eth.nonce import NonceOracle
logg = logging.getLogger()
def test_nonce_sequence(
eth_empty_accounts,
init_database,
init_rpc,
):
account= init_rpc.w3.eth.personal.new_account('')
no = NonceOracle(account, 0)
n = no.next()
assert n == 0
n = no.next()
assert n == 1
init_rpc.w3.eth.sendTransaction({
'from': init_rpc.w3.eth.accounts[0],
'to': account,
'value': 200000000,
})
init_rpc.w3.eth.sendTransaction({
'from': account,
'to': eth_empty_accounts[0],
'value': 100,
})
c = init_rpc.w3.eth.getTransactionCount(account, 'pending')
logg.debug('nonce {}'.format(c))
account= init_rpc.w3.eth.personal.new_account('')
no = NonceOracle(account, c)
n = no.next()
assert n == 1
n = no.next()
assert n == 2
# try with bogus value
no = NonceOracle(account, 4)
n = no.next()
assert n == 3

View File

@@ -11,12 +11,14 @@ from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.queue.tx import create as queue_create
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation
logg = logging.getLogger()
def test_unpack_transfer(
default_chain_spec,
init_database,
init_w3,
init_rpc,
cic_registry,
@@ -24,6 +26,9 @@ def test_unpack_transfer(
bancor_registry,
):
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
init_database.commit()
source_token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
logg.debug('bancor tokens {} {}'.format(bancor_tokens, source_token))
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
@@ -32,6 +37,7 @@ def test_unpack_transfer(
init_w3.eth.accounts[1],
42,
default_chain_spec,
'foo',
)
s = init_w3.eth.sign_transaction(transfer_tx)
s_bytes = bytes.fromhex(s['raw'][2:])
@@ -56,6 +62,9 @@ def test_queue_cache_transfer(
bancor_registry,
):
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
init_database.commit()
source_token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
value = 42
@@ -64,6 +73,7 @@ def test_queue_cache_transfer(
init_w3.eth.accounts[1],
value,
default_chain_spec,
'foo',
)
tx_signed = init_w3.eth.sign_transaction(transfer_tx)
tx_hash = init_w3.eth.sendRawTransaction(tx_signed['raw'])

View File

@@ -8,12 +8,17 @@ import moolb
# local imports
from cic_eth.eth.token import TokenTxFactory
from cic_eth.eth.task import sign_tx
from cic_eth.db.models.nonce import (
NonceReservation,
Nonce,
)
logg = logging.getLogger()
# TODO: This test fails when not run alone. Identify which fixture leaves a dirty state
def test_filter_process(
init_database,
init_rpc,
default_chain_spec,
default_chain_registry,
@@ -29,9 +34,22 @@ def test_filter_process(
tx_hashes = []
# external tx
# TODO: it does not make sense to use the db setup for nonce here, but we need it as long as we are using the factory to assemble to tx
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
NonceReservation.next(init_w3.eth.accounts[0], 'foo', init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec, 'foo')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
@@ -43,9 +61,12 @@ def test_filter_process(
t.add(a.to_bytes(4, 'big'))
# external tx
NonceReservation.next(init_w3.eth.accounts[0], 'bar', init_database)
init_database.commit()
init_eth_tester.mine_blocks(28)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec, 'bar')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)

View File

@@ -0,0 +1,71 @@
# standard imports
import logging
# local imports
from cic_eth.queue.tx import get_status_tx
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import cache_gas_refill_data
from cic_eth.db.models.otx import Otx
logg = logging.getLogger()
def test_status_tx_list(
default_chain_spec,
init_database,
init_w3,
):
tx = {
'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1],
'nonce': 42,
'gas': 21000,
'gasPrice': 1000000,
'value': 128,
'chainId': 666,
'data': '',
}
logg.debug('nonce {}'.format(tx['nonce']))
tx_signed = init_w3.eth.sign_transaction(tx)
#tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw'])
tx_hash = init_w3.keccak(hexstr=tx_signed['raw'])
queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec))
cache_gas_refill_data(tx_hash.hex(), tx)
tx_hash_hex = tx_hash.hex()
q = init_database.query(Otx)
otx = q.get(1)
otx.sendfail(session=init_database)
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 1
otx.sendfail(session=init_database)
otx.retry(session=init_database)
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.QUEUED, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 0
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.IN_NETWORK, session=init_database)
assert len(txs) == 0

View File

@@ -10,7 +10,7 @@ from cic_notify.error import PleaseCommitFirstError
logg = logging.getLogger()
version = (0, 4, 0, 'alpha.2')
version = (0, 4, 0, 'alpha.3')
version_object = semver.VersionInfo(
major=version[0],
@@ -24,9 +24,6 @@ version_string = str(version_object)
def git_hash():
import subprocess
git_diff = subprocess.run(['git', 'diff'], capture_output=True)
if len(git_diff.stdout) > 0:
raise PleaseCommitFirstError()
git_hash = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True)
git_hash_brief = git_hash.stdout.decode('utf-8')[:8]
@@ -35,7 +32,7 @@ def git_hash():
try:
version_git = git_hash()
version_string += '.build.{}'.format(version_git)
version_string += '+build.{}'.format(version_git)
except FileNotFoundError:
time_string_pair = str(time.time()).split('.')
version_string += '+build.{}{:<09d}'.format(

View File

@@ -1,5 +1,5 @@
celery~=4.4.7
confini~=0.3.6a1
alembic~=1.4.2
celery~=4.4.7
confini~=0.3.6rc3
redis~=3.5.3
semver==2.13.0

View File

@@ -1,6 +1,6 @@
[metadata]
name = cic-notify
version= attr: cic_notify.version.__version_string__
version= 0.4.0a2
description = CIC notifications service
author = Louis Holbrook
author_email = dev@holbrook.no

View File

@@ -0,0 +1,2 @@
[app]
service_code = *483*46#

View File

@@ -0,0 +1,4 @@
[client]
host =
port =
ssl =

View File

@@ -0,0 +1,3 @@
[ussd]
user =
pass =

View File

@@ -14,7 +14,7 @@ from cic_ussd.db.models.user import User
from cic_ussd.db.models.ussd_session import UssdSession
from cic_ussd.db.models.task_tracker import TaskTracker
from cic_ussd.menu.ussd_menu import UssdMenu
from cic_ussd.processor import custom_display_text, process_request
from cic_ussd.processor import custom_display_text, process_request, retrieve_most_recent_ussd_session
from cic_ussd.redis import InMemoryStore
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
from cic_ussd.validator import check_known_user, validate_response_type
@@ -60,7 +60,8 @@ def create_ussd_session(
phone: str,
service_code: str,
user_input: str,
current_menu: str) -> InMemoryUssdSession:
current_menu: str,
session_data: Optional[dict] = None) -> InMemoryUssdSession:
"""
Creates a new ussd session
:param external_session_id: Session id value provided by AT
@@ -73,6 +74,8 @@ def create_ussd_session(
:type user_input: str
:param current_menu: Menu name that is currently being displayed on the ussd session
:type current_menu: str
:param session_data: Any additional data that was persisted during the user's interaction with the system.
:type session_data: dict.
:return: ussd session object
:rtype: Session
"""
@@ -81,7 +84,8 @@ def create_ussd_session(
msisdn=phone,
user_input=user_input,
state=current_menu,
service_code=service_code
service_code=service_code,
session_data=session_data
)
return session
@@ -126,7 +130,9 @@ def create_or_update_session(
phone=phone,
service_code=service_code,
user_input=user_input,
current_menu=current_menu)
current_menu=current_menu,
session_data=session_data
)
return ussd_session
@@ -338,14 +344,26 @@ def process_menu_interaction_requests(chain_str: str,
user_input=user_input
)
# create or update the ussd session as appropriate
ussd_session = create_or_update_session(
external_session_id=external_session_id,
phone=phone_number,
service_code=service_code,
user_input=user_input,
current_menu=current_menu.get('name')
)
last_ussd_session = retrieve_most_recent_ussd_session(phone_number=user.phone_number)
if last_ussd_session:
# create or update the ussd session as appropriate
ussd_session = create_or_update_session(
external_session_id=external_session_id,
phone=phone_number,
service_code=service_code,
user_input=user_input,
current_menu=current_menu.get('name'),
session_data=last_ussd_session.session_data
)
else:
ussd_session = create_or_update_session(
external_session_id=external_session_id,
phone=phone_number,
service_code=service_code,
user_input=user_input,
current_menu=current_menu.get('name')
)
# define appropriate response
response = custom_display_text(

View File

@@ -6,7 +6,7 @@ from typing import Optional
# third party imports
import celery
from cic_types.models.person import Person
from sqlalchemy import desc
from tinydb.table import Document
# local imports
@@ -315,6 +315,16 @@ def process_start_menu(display_key: str, user: User):
)
def retrieve_most_recent_ussd_session(phone_number: str) -> UssdSession:
# get last ussd session based on user phone number
last_ussd_session = UssdSession.session\
.query(UssdSession)\
.filter_by(msisdn=phone_number)\
.order_by(desc(UssdSession.created))\
.first()
return last_ussd_session
def process_request(user_input: str, user: User, ussd_session: Optional[dict] = None) -> Document:
"""This function assesses a request based on the user from the request comes, the session_id and the user's
input. It determines whether the request translates to a return to an existing session by checking whether the
@@ -337,7 +347,23 @@ def process_request(user_input: str, user: User, ussd_session: Optional[dict] =
return UssdMenu.find_by_name(name=successive_state)
else:
if user.has_valid_pin():
return UssdMenu.find_by_name(name='start')
last_ussd_session = retrieve_most_recent_ussd_session(phone_number=user.phone_number)
key = create_cached_data_key(
identifier=blockchain_address_to_metadata_pointer(blockchain_address=user.blockchain_address),
salt='cic.person'
)
user_metadata = get_cached_data(key=key)
if last_ussd_session:
# get last state
last_state = last_ussd_session.state
logg.debug(f'LAST USSD SESSION STATE: {last_state}')
# if last state is account_creation_prompt and metadata exists, show start menu
if last_state == 'account_creation_prompt' and user_metadata is not None:
return UssdMenu.find_by_name(name='start')
else:
return UssdMenu.find_by_name(name=last_state)
else:
if user.failed_pin_attempts >= 3 and user.get_account_status() == AccountStatus.LOCKED.name:
return UssdMenu.find_by_name(name='exit_pin_blocked')

View File

@@ -0,0 +1,107 @@
#!/usr/bin/python3
# Author: Louis Holbrook <dev@holbrook.no> (https://holbrook.no)
# Description: interactive console for Sempo USSD session
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import sys
import uuid
import json
import argparse
import logging
import urllib
from xdg.BaseDirectory import xdg_config_home
from urllib import request
# third-party imports
from confini import Config
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic')
argparser = argparse.ArgumentParser(description='CLI tool to interface a Sempo USSD session')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
#argparser.add_argument('-d', type=str, default='local', help='deployment name to interface (config root subdirectory)')
argparser.add_argument('--host', type=str, default='localhost')
argparser.add_argument('--port', type=int, default=9000)
argparser.add_argument('--nossl', help='do not use ssl (careful)', action='store_true')
argparser.add_argument('phone', help='phone number for USSD session')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
#config_dir = os.path.join(args.c, args.d)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = Config(config_dir)
config.process()
logg.debug('config loaded from {}'.format(config_dir))
host = config.get('CLIENT_HOST')
port = config.get('CLIENT_PORT')
ssl = config.get('CLIENT_SSL')
if host == None:
host = args.host
if port == None:
port = args.port
if ssl == None:
ssl = not args.nossl
elif ssl == 0:
ssl = False
else:
ssl = True
def main():
# TODO: improve url building
url = 'http'
if ssl:
url += 's'
url += '://{}:{}'.format(host, port)
url += '/?username={}&password={}'.format(config.get('USSD_USER'), config.get('USSD_PASS'))
logg.info('service url {}'.format(url))
logg.info('phone {}'.format(args.phone))
session = uuid.uuid4().hex
data = {
'sessionId': session,
'serviceCode': config.get('APP_SERVICE_CODE'),
'phoneNumber': args.phone,
'text': config.get('APP_SERVICE_CODE'),
}
state = "_BEGIN"
while state != "END":
if state != "_BEGIN":
user_input = input('next> ')
data['text'] = user_input
req = urllib.request.Request(url)
data_str = json.dumps(data)
data_bytes = data_str.encode('utf-8')
req.add_header('Content-Type', 'application/json')
req.data = data_bytes
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
state = response_data[:3]
out = response_data[4:]
print(out)
if __name__ == "__main__":
main()

View File

@@ -1,7 +1,7 @@
# standard imports
import semver
version = (0, 3, 0, 'alpha.1')
version = (0, 3, 0, 'alpha.5')
version_object = semver.VersionInfo(
major=version[0],

View File

@@ -1,4 +1,5 @@
FROM python:3.8.5-alpine
# FROM python:3.8.5-alpine
FROM python:3.8.6-slim-buster
# set working directory
WORKDIR /usr/src
@@ -6,10 +7,8 @@ WORKDIR /usr/src
# add args for installing from self-hosted packages
ARG pip_extra_index_url_flag='--extra-index-url https://pip.grassrootseconomics.net:8433'
# add alpine sys packages
RUN apk update && \
apk add git linux-headers postgresql-dev gnupg bash
RUN apk add --update musl-dev gcc libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# create secrets directory
RUN mkdir -vp pgp/keys

View File

@@ -16,9 +16,16 @@ div#session {
<textarea id="monitor" disabled="1"></textarea>
<div id="login">
<label for="user">API username</label>
<input type="text" id="user" name="user" type="text" /><br/>
<input type="text" id="user" name="user" />
<label for="user">API password</label>
<input type="text" id="pass" name="pass" type="text" /><br/>
<input type="text" id="pass" name="pass" /> <br/>
<label for="host">API host</label>
<input type="text" id="host" name="host" />
<label for="host">API port</label>
<input type="text" id="port" name="port" />
<label for="host">SSL</label>
<input type="checkbox" id="ssl" name="ssl" checked="1"/> <br/>
<hr/>
<input type="text" id="phone" /> <button onclick="setPhone(document.getElementById('phone').value);" id="send_phone">set phone number</button>
</div>

View File

@@ -1,10 +1,12 @@
//var proto = 'http';
//var host = 'localhost:9000';
var ssl = false;
var host = 'localhost';
var port = 9000;
var proto = 'https';
var host = 'staging.sarafu.network';
var user = 'admin_bert_token_inc.';
var pass = '197781ed60bf16d5dc12d84e3df37e35';
var serviceCode = '*483*061#';
var user = 'foo';
var pass = 'bar';
var path = '/';
var serviceCode = '*483*46#';
// cheekily stolen from https://www.tutorialspoint.com/how-to-create-guid-uuid-in-javascript
function createUUID() {
@@ -23,9 +25,17 @@ function send(s) {
document.getElementById('send_input').disabled = true;
var xhr = new XMLHttpRequest();
xhr.responseType = 'text';
current_user = document.getElementById('user').value;
current_pass = document.getElementById('pass').value;
xhr.open('POST', proto + '://' + host + '/api/v1/ussd/kenya?username=' + current_user + '&password=' + current_pass, true);
const current_user = document.getElementById('user').value;
const current_pass = document.getElementById('pass').value;
const current_host = document.getElementById('host').value;
const current_port = document.getElementById('port').value;
let current_scheme = 'http';
if (document.getElementById('ssl').checked) {
current_scheme += 's';
}
const url = current_scheme + '://' + current_host + ':' + current_port + '?username=' + current_user + '&password=' + current_pass
console.debug('connecting to', url);
xhr.open('POST', url, true);
xhr.setRequestHeader('Content-Type', 'application/json');
data = {
sessionId: uuid,
@@ -106,6 +116,8 @@ function abort() {
window.addEventListener('load', () => {
document.getElementById('user').value = user;
document.getElementById('pass').value = pass;
document.getElementById('host').value = host;
document.getElementById('port').value = port;
document.getElementById('phone').addEventListener('keyup', (e) => {
if (e.keyCode == '13') {
document.getElementById('input').value = '';

View File

@@ -6,12 +6,11 @@ betterpath==0.2.2
billiard==3.6.3.0
celery==4.4.7
cffi==1.14.3
chainlib~=0.0.1a15
cic-eth==0.10.0a38
cic-notify==0.3.1
cic-types==0.1.0a8
cic-eth~=0.10.0a41
cic-notify~=0.4.0a3
cic-types~=0.1.0a8
click==7.1.2
confini==0.3.5
confini~=0.3.6rc3
cryptography==3.2.1
faker==4.17.1
iniconfig==1.1.1
@@ -36,7 +35,6 @@ python-i18n==0.3.9
pytz==2020.1
PyYAML==5.3.1
redis==3.5.3
requests==2.24.0
semver==2.13.0
six==1.15.0
SQLAlchemy==1.3.20
@@ -46,4 +44,4 @@ transitions==0.8.4
uWSGI==2.0.19.1
vcversioner==2.16.0.0
vine==1.3.0
zope.interface==5.1.2
zope.interface==5.1.2

View File

@@ -44,4 +44,5 @@ scripts =
[options.entry_points]
console_scripts =
cic-ussd-tasker = cic_ussd.runnable.tasker:main
cic-ussd-tasker = cic_ussd.runnable.tasker:main
cic-ussd-client = cic_ussd.runnable.client:main

View File

@@ -1,14 +1,22 @@
#FROM ethereum/solc:0.6.12
FROM ethereum/solc:0.8.0
# syntax = docker/dockerfile:1.2
FROM python:3.8.6-slim-buster as compile-image
# The solc image messes up the alpine environment, so we have to go all over again
FROM alpine
COPY --from=0 /usr/bin/solc /usr/bin/solc
RUN apt-get update
RUN apt-get install -y --no-install-recommends git gcc g++ libpq-dev gawk jq telnet wget openssl iputils-ping gnupg socat bash procps make python2 cargo
RUN apk update && \
apk add make git
WORKDIR /usr/src
RUN apt-get install -y software-properties-common
RUN add-apt-repository ppa:ethereum/ethereum
RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 1C52189C923F6CA9
RUN apt-get update
RUN apt-get install solc
RUN pip install --upgrade pip
WORKDIR /root
RUN mkdir -vp /usr/local/etc/cic
COPY contract-migration/nvm.sh .
ENV CONFINI_DIR /usr/local/etc/cic/
RUN mkdir -vp $CONFINI_DIR
ARG cic_config_commit=35c69ba75f00c8147150acf325565d5391cf25bf
ARG cic_config_url=https://gitlab.com/grassrootseconomics/cic-config.git/
@@ -16,11 +24,8 @@ RUN echo Install confini schema files && \
git clone --depth 1 $cic_config_url cic-config && \
cd cic-config && \
git fetch --depth 1 origin $cic_config_commit && \
git checkout $cic_config_commit && \
mkdir -vp /usr/local/etc/cic && \
cp -v *.ini /usr/local/etc/cic/
ENV CONFINI_DIR /usr/local/etc/cic
git checkout $cic_config_commit && \
cp -v *.ini $CONFINI_DIR
ARG cic_contracts_commit=698ef3a30fde8d7f2c498f1208fb0ff45d665501
ARG cic_contracts_url=https://gitlab.com/grassrootseconomics/cic-contracts.git/
@@ -31,30 +36,6 @@ RUN echo Install ABI collection for solidity interfaces used across all componen
git checkout $cic_contracts_commit && \
make install
#COPY ./Makefile ./cic-contracts/Makefile
#COPY ./*.sol ./cic-contracts/
#RUN cd cic-contracts && \
# make -B && make install -B
FROM python:3.8.6-slim-buster
COPY --from=1 /usr/local/share/cic/ /usr/local/share/cic/
COPY --from=1 /usr/local/etc/ /usr/local/etc/
LABEL authors="Louis Holbrook <dev@holbrook.no> 0826EDA1702D1E87C6E2875121D2E7BB88C2A746"
LABEL spdx-license-identifier="GPL-3.0-or-later"
LABEL description="Base layer for buiding development images for the cic component suite"
RUN apt-get update && \
apt-get install -y git gcc g++ libpq-dev && \
apt-get install -y vim gawk jq telnet openssl iputils-ping curl wget gnupg socat bash procps make python2 postgresql-client
RUN echo installing nodejs tooling
COPY contract-migration/nvm.sh /root/
# Install nvm with node and npm
# https://stackoverflow.com/questions/25899912/how-to-install-nvm-in-docker
ENV NVM_DIR /root/.nvm
@@ -65,67 +46,100 @@ RUN wget -qO- https://raw.githubusercontent.com/nvm-sh/nvm/v0.37.2/install.sh |
&& . $NVM_DIR/nvm.sh \
&& nvm install $NODE_VERSION \
&& nvm alias default $NODE_VERSION \
&& nvm use $NODE_VERSION \
# So many ridiculously stupid issues with node in docker that take oceans of absolutely wasted time to resolve
# owner of these files is "1001" by default - wtf
&& chown -R root:root "$NVM_DIR/versions/node/v$NODE_VERSION"
&& nvm use $NODE_VERSION
# && chown -R root:root "$NVM_DIR/versions/node/v$NODE_VERSION"
ENV NODE_PATH $NVM_DIR/versions/node//v$NODE_VERSION/lib/node_modules
ENV PATH $NVM_DIR/versions/node//v$NODE_VERSION/bin:$PATH
# RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version
RUN useradd --create-home grassroots
WORKDIR /home/grassroots
USER grassroots
ARG pip_extra_index_url=https://pip.grassrootseconomics.net:8433
ARG cic_base_version=0.1.1a23
ARG cic_registry_version=0.5.3a24
ARG cic_eth_version=0.10.0a41
ARG chainlib_version=0.0.1a21
ARG cic_contracts_version=0.0.2a2
RUN pip install --user --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version \
cic-registry==$cic_registry_version \
cic-eth==$cic_eth_version \
chainlib==$chainlib_version \
cic-contracts==$cic_contracts_version
# ARG cic_bancor_url=https://gitlab.com/grassrootseconomics/cic-bancor.git/
# ARG cic_bancor_contracts_url=https://github.com/bancorprotocol/contracts-solidity
# RUN echo Compile and install bancor protocol contracts && \
# git clone --depth 1 $cic_bancor_url cic-bancor && \
# cd cic-bancor
# RUN cd cic-bancor/python && \
# pip install --extra-index-url $pip_extra_index_url .
# This is a temporary solution for building the Bancor contracts using the bancor protocol repository truffle setup
# We should instead flatten the files ourselves and build them with solc in the first image layer in this file
ARG cic_bancor_commit=a04c7ae6882ea515938d852cc861d59a35070094
ARG cic_bancor_url=https://gitlab.com/grassrootseconomics/cic-bancor.git/
ARG cic_bancor_contracts_url=https://github.com/bancorprotocol/contracts-solidity
RUN echo Compile and install bancor protocol contracts && \
git clone --depth 1 $cic_bancor_url cic-bancor && \
cd cic-bancor && \
git fetch --depth 1 origin $cic_bancor_commit && \
git checkout $cic_bancor_commit && \
# Apparently the git version here doesn't have set-url as a command. *sigh*
#if [ ! -z $cic_bancor_contracts_url ]; then
# git submodule set-url bancor $cic_bancor_contracts_url
#fi
git submodule init && \
git submodule update
RUN cd root && \
. $NVM_DIR/nvm.sh &&\
nvm install $BANCOR_NODE_VERSION && \
nvm use $BANCOR_NODE_VERSION && \
cd - && \
cd cic-bancor/bancor && \
npm install --python=/usr/bin/python2 && \
node_modules/truffle/build/cli.bundled.js compile && \
mkdir -vp /usr/local/share/cic/bancor/solidity/build && \
cp -vR solidity/build/contracts /usr/local/share/cic/bancor/solidity/build/
RUN cd cic-bancor/python && \
pip install --extra-index-url $pip_extra_index_url .
# ARG cic_bancor_commit=a04c7ae6882ea515938d852cc861d59a35070094
# ARG cic_bancor_url=https://gitlab.com/grassrootseconomics/cic-bancor.git/
# ARG cic_bancor_contracts_url=https://github.com/bancorprotocol/contracts-solidity
# RUN echo Compile and install bancor protocol contracts && \
# git clone --depth 1 $cic_bancor_url cic-bancor && \
# cd cic-bancor && \
# git fetch --depth 1 origin $cic_bancor_commit && \
# git checkout $cic_bancor_commit && \
# # Apparently the git version here doesn't have set-url as a command. *sigh*
# #if [ ! -z $cic_bancor_contracts_url ]; then
# # git submodule set-url bancor $cic_bancor_contracts_url
# #fi
# git submodule init && \
# git submodule update
# RUN cd root && \
# . $NVM_DIR/nvm.sh &&\
# nvm install $BANCOR_NODE_VERSION && \
# nvm use $BANCOR_NODE_VERSION && \
# cd - && \
# cd cic-bancor/bancor && \
# npm install --python=/usr/bin/python2 && \
# node_modules/truffle/build/cli.bundled.js compile && \
# mkdir -vp /usr/local/share/cic/bancor/solidity/build && \
# cp -vR solidity/build/contracts /usr/local/share/cic/bancor/solidity/build/
# RUN cd cic-bancor/python && \
# pip install --extra-index-url $pip_extra_index_url .
FROM python:3.8.6-slim-buster as runtime-image
RUN apt-get install -y cargo
ARG cic_base_version=0.1.1a10
RUN pip install --extra-index-url $pip_extra_index_url cic-base[full_graph]==$cic_base_version
RUN apt-get update
RUN apt-get install -y --no-install-recommends gnupg libpq-dev
ARG cic_registry_version=0.5.3a22
RUN pip install --extra-index-url $pip_extra_index_url cic-registry==$cic_registry_version
COPY --from=compile-image /usr/local/bin/ /usr/local/bin/
COPY --from=compile-image /usr/local/etc/cic/ /usr/local/etc/cic/
WORKDIR /root
RUN useradd --create-home grassroots
WORKDIR /home/grassroots
# COPY python dependencies to user dir
COPY --from=compile-image /home/grassroots/.local .local
ENV PATH=/home/grassroots/.local/bin:$PATH
COPY contract-migration/testdata/pgp testdata/pgp
COPY contract-migration/wait-for-it.sh .
RUN chmod +x ./wait-for-it.sh
# COPY contract-migration/.env_config_template .env_config_template
# COPY contract-migration/.env_dockercompose_template .env_dockercompose_template
COPY contract-migration/reset.sh reset.sh
COPY contract-migration/from_env.sh from_env.sh
COPY contract-migration/seed_cic_eth.sh seed_cic_eth.sh
COPY contract-migration/sarafu_declaration.json sarafu_declaration.json
COPY contract-migration/keystore keystore
COPY contract-migration/envlist .
ENTRYPOINT [ "/bin/bash" ]
# RUN chown grassroots:grassroots .local/
RUN chown grassroots:grassroots ./
RUN chmod gu+x *.sh
RUN mkdir -p /tmp/cic/config
RUN chown grassroots:grassroots /tmp/cic/config
# A shared output dir for environment configs
RUN chmod a+rwx /tmp/cic/config
USER grassroots
ENTRYPOINT [ ]

View File

@@ -0,0 +1,61 @@
SYNCER_LOOP_INTERVAL
SSL_ENABLE_CLIENT
SSL_CERT_FILE
SSL_KEY_FILE
SSL_PASSWORD
SSL_CA_FILE
BANCOR_DIR
REDIS_HOST
REDIS_PORT
REDIS_DB
PGP_EXPORTS_DIR
PGP_PRIVATEKEY_FILE
PGP_PASSPHRASE
DATABASE_USER
DATABASE_PASSWORD
DATABASE_NAME
DATABASE_HOST
DATABASE_PORT
DATABASE_ENGINE
DATABASE_DRIVER
DATABASE_DEBUG
TASKS_AFRICASTALKING
TASKS_SMS_DB
TASKS_LOG
TASKS_TRACE_QUEUE_STATUS
TASKS_TRANSFER_CALLBACKS
DEV_MNEMONIC
DEV_ETH_RESERVE_ADDRESS
DEV_ETH_ACCOUNTS_INDEX_ADDRESS
DEV_ETH_RESERVE_AMOUNT
DEV_ETH_ACCOUNT_BANCOR_DEPLOYER
DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER
DEV_ETH_ACCOUNT_GAS_PROVIDER
DEV_ETH_ACCOUNT_RESERVE_OWNER
DEV_ETH_ACCOUNT_RESERVE_MINTER
DEV_ETH_ACCOUNT_ACCOUNTS_INDEX_OWNER
DEV_ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER
DEV_ETH_ACCOUNT_SARAFU_OWNER
DEV_ETH_ACCOUNT_SARAFU_GIFTER
DEV_ETH_ACCOUNT_APPROVAL_ESCROW_OWNER
DEV_ETH_ACCOUNT_SINGLE_SHOT_FAUCET_OWNER
DEV_ETH_SARAFU_TOKEN_NAME
DEV_ETH_SARAFU_TOKEN_SYMBOL
DEV_ETH_SARAFU_TOKEN_DECIMALS
DEV_ETH_SARAFU_TOKEN_ADDRESS
DEV_PGP_PUBLICKEYS_ACTIVE_FILE
DEV_PGP_PUBLICKEYS_TRUSTED_FILE
DEV_PGP_PUBLICKEYS_ENCRYPT_FILE
CIC_REGISTRY_ADDRESS
CIC_APPROVAL_ESCROW_ADDRESS
CIC_TOKEN_INDEX_ADDRESS
CIC_ACCOUNTS_INDEX_ADDRESS
CIC_DECLARATOR_ADDRESS
CIC_CHAIN_SPEC
ETH_PROVIDER
ETH_ABI_DIR
SIGNER_SOCKET_PATH
SIGNER_SECRET
CELERY_BROKER_URL
CELERY_RESULT_URL
META_PROVIDER

View File

@@ -61,10 +61,10 @@ export DEV_ETH_ACCOUNTS_INDEX_ADDRESS=$CIC_ACCOUNTS_INDEX_ADDRESS
export BANCOR_REGISTRY_ADDRESS=$BANCOR_REGISTRY_ADDRESS
export CIC_REGISTRY_ADDRESS=$CIC_REGISTRY_ADDRESS
export CIC_TRUST_ADDRESS=$DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER
export CIC_DECLARATOR_ADDRESS=$CIC_DECLARATOR_ADDRESS
EOF
cat $CIC_DATA_DIR/envlist | bash from_env.sh > $CIC_DATA_DIR/.env_all
cat ./envlist | bash from_env.sh > $CIC_DATA_DIR/.env_all
# popd
set +a

View File

@@ -191,9 +191,16 @@ if __name__ == '__main__':
fa = open(os.path.join(user_dir, 'balances.csv'), 'w')
for i in range(user_count):
(eth, phone, o) = gen()
i = 0
while i < user_count:
eth = None
phone = None
o = None
try:
(eth, phone, o) = gen()
except Exception as e:
logg.warning('generate failed, trying anew: {}'.format(e))
continue
uid = eth[2:].upper()
print(o)
@@ -212,5 +219,7 @@ if __name__ == '__main__':
amount = genAmount()
fa.write('{},{}\n'.format(eth,amount))
logg.debug('pidx {}, uid {}, eth {}, amount {}'.format(pidx, uid, eth, amount))
i += 1
fa.close()

View File

@@ -74,7 +74,7 @@ new cic.PGPKeyStore(
importMeta,
);
const batchSize = 50;
const batchSize = 16;
const batchDelay = 1000;
const total = parseInt(process.argv[3]);
const workDir = path.join(process.argv[2], 'meta');

View File

@@ -38,7 +38,7 @@ argparser.add_argument('--redis-host-callback', dest='redis_host_callback', defa
argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
argparser.add_argument('--batch-size', dest='batch_size', default=50, type=int, help='burst size of sending transactions to node')
argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=20.0, type=float, help='Callback timeout')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
@@ -97,11 +97,15 @@ def register_eth(i, u):
callback_queue=args.q,
)
t = api.create_account(register=True)
logg.debug('register {} -> {}'.format(u, t))
while True:
ps.get_message()
m = ps.get_message(timeout=args.timeout)
address = None
if m == None:
logg.debug('message timeout')
return
if m['type'] == 'subscribe':
logg.debug('skipping subscribe message')
continue
@@ -109,11 +113,11 @@ def register_eth(i, u):
r = json.loads(m['data'])
address = r['result']
break
except TypeError as e:
except Exception as e:
if m == None:
logg.critical('empty response from redis callback (did the service crash?)')
logg.critical('empty response from redis callback (did the service crash?) {}'.format(e))
else:
logg.critical('unexpected response from redis callback: {}'.format(m))
logg.critical('unexpected response from redis callback: {} {}'.format(m, e))
sys.exit(1)
logg.debug('[{}] register eth {} {}'.format(i, u, address))

View File

@@ -1,3 +1,3 @@
cic-base[full_graph]==0.1.1a12
cic-eth==0.10.0a37
cic-base[full_graph]==0.1.1a23
cic-eth==0.10.0a41
cic-types==0.1.0a8

View File

@@ -58,6 +58,7 @@ argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spe
argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-x', '--exit-on-error', dest='x', action='store_true', help='Halt exection on error')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('user_dir', type=str, help='user export directory')
@@ -91,6 +92,27 @@ old_chain_spec = ChainSpec.from_chain_str(args.old_chain_spec)
old_chain_str = str(old_chain_spec)
user_dir = args.user_dir # user_out_dir from import_users.py
meta_url = args.meta_provider
exit_on_error = args.x
class VerifierState:
def __init__(self, item_keys):
self.items = {}
for k in item_keys:
logg.info('k {}'.format(k))
self.items[k] = 0
def poke(self, item_key):
self.items[item_key] += 1
def __str__(self):
r = ''
for k in self.items.keys():
r += '{}: {}\n'.format(k, self.items[k])
return r
class VerifierError(Exception):
@@ -107,7 +129,8 @@ class VerifierError(Exception):
class Verifier:
def __init__(self, conn, cic_eth_api, gas_oracle, chain_spec, index_address, token_address, data_dir):
# TODO: what an awful function signature
def __init__(self, conn, cic_eth_api, gas_oracle, chain_spec, index_address, token_address, data_dir, exit_on_error=False):
self.conn = conn
self.gas_oracle = gas_oracle
self.chain_spec = chain_spec
@@ -117,9 +140,18 @@ class Verifier:
self.tx_factory = TxFactory(chain_id=chain_spec.chain_id(), gas_oracle=gas_oracle)
self.api = cic_eth_api
self.data_dir = data_dir
self.exit_on_error = exit_on_error
verifymethods = []
for k in dir(self):
if len(k) > 7 and k[:7] == 'verify_':
logg.info('adding verify method {}'.format(k))
verifymethods.append(k[7:])
self.state = VerifierState(verifymethods)
def verify_accounts_index(self, address):
def verify_accounts_index(self, address, balance=None):
tx = self.tx_factory.template(ZERO_ADDRESS, self.index_address)
data = keccak256_string_to_hex('have(address)')[:8]
data += eth_abi.encode_single('address', address).hex()
@@ -145,14 +177,14 @@ class Verifier:
raise VerifierError((actual_balance, balance), 'balance')
def verify_local_key(self, address):
def verify_local_key(self, address, balance=None):
r = self.api.have_account(address, str(self.chain_spec))
logg.debug('verify local key result {}'.format(r))
if r != address:
raise VerifierError((address, r), 'local key')
def verify_metadata(self, address):
def verify_metadata(self, address, balance=None):
k = generate_metadata_pointer(bytes.fromhex(strip_0x(address)), ':cic.person')
url = os.path.join(meta_url, k)
logg.debug('verify metadata url {}'.format(url))
@@ -184,15 +216,33 @@ class Verifier:
def verify(self, address, balance):
logg.debug('verify {} {}'.format(address, balance))
try:
self.verify_local_key(address)
self.verify_accounts_index(address)
self.verify_balance(address, balance)
self.verify_metadata(address)
except VerifierError as e:
logg.critical('verification failed: {}'.format(e))
sys.exit(1)
methods = [
'local_key',
'accounts_index',
'balance',
'metadata',
]
for k in methods:
try:
m = getattr(self, 'verify_{}'.format(k))
m(address, balance)
# self.verify_local_key(address)
# self.verify_accounts_index(address)
# self.verify_balance(address, balance)
# self.verify_metadata(address)
except VerifierError as e:
logline = 'verification {} failed for {}: {}'.format(k, address, str(e))
if self.exit_on_error:
logg.critical(logline)
sys.exit(1)
logg.error(logline)
self.state.poke(k)
def __str__(self):
return str(self.state)
class MockClient:
@@ -263,7 +313,8 @@ def main():
r = l.split(',')
try:
address = to_checksum(r[0])
sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r")
#sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r")
logg.debug('loading balance {} {}'.format(i, address).ljust(200))
except ValueError:
break
balance = int(r[1].rstrip())
@@ -274,7 +325,7 @@ def main():
api = AdminApi(MockClient())
verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, user_dir)
verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, user_dir, exit_on_error)
user_new_dir = os.path.join(user_dir, 'new')
for x in os.walk(user_new_dir):
@@ -298,11 +349,17 @@ def main():
new_address = u.identities['evm'][subchain_str][0]
subchain_str = '{}:{}'.format(old_chain_spec.common_name(), old_chain_spec.network_id())
old_address = u.identities['evm'][subchain_str][0]
balance = balances[old_address]
balance = 0
try:
balance = balances[old_address]
except KeyError:
logg.info('no old balance found for {}, assuming 0'.format(old_address))
logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance))
verifier.verify(new_address, balance)
print(verifier)
if __name__ == '__main__':
main()

6
apps/contract-migration/seed_cic_eth.sh Normal file → Executable file
View File

@@ -26,14 +26,11 @@ env_out_file=${CIC_DATA_DIR}/.env_seed
init_level_file=${CIC_DATA_DIR}/.init
truncate $env_out_file -s 0
pip install --extra-index-url https://pip.grassrootseconomics.net:8433 chainlib==0.0.1a22
set -e
set -a
# We need to not install these here...
pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL cic-eth==0.10.0a37 chainlib==0.0.1a19 cic-contracts==0.0.2a2
pip install --extra-index-url $DEV_PIP_EXTRA_INDEX_URL --force-reinstall erc20-transfer-authorization==0.3.0a10
>&2 echo "create account for gas gifter"
old_gas_provider=$DEV_ETH_ACCOUNT_GAS_PROVIDER
DEV_ETH_ACCOUNT_GAS_GIFTER=`cic-eth-create $debug --redis-host-callback=$REDIS_HOST --redis-port-callback=$REDIS_PORT --no-register`
@@ -129,6 +126,7 @@ export CIC_TOKEN_INDEX_ADDRESS=$CIC_TOKEN_INDEX_ADDRESS
>&2 echo "add declarations for sarafu token"
token_description_one=`sha256sum sarafu_declaration.json | awk '{ print $1; }'`
token_description_two=0x54686973206973207468652053617261667520746f6b656e0000000000000000
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> foo $CIC_DECLARATOR_ADDRESSh"
>&2 eth-address-declarator-add -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -r $CIC_DECLARATOR_ADDRESS -w $debug $DEV_ETH_SARAFU_TOKEN_ADDRESS $token_description_one
>&2 eth-address-declarator-add -y $keystore_file -i $CIC_CHAIN_SPEC -p $ETH_PROVIDER -r $CIC_DECLARATOR_ADDRESS -w $debug $DEV_ETH_SARAFU_TOKEN_ADDRESS $token_description_two

View File

@@ -42,6 +42,6 @@ rlp==2.0.1
cryptocurrency-cli-tools==0.0.4
giftable-erc20-token==0.0.7b12
hexathon==0.0.1a3
chainlib==0.0.1a19
chainlib==0.0.1a20
chainsyncer==0.0.1a19
cic-registry==0.5.3.a22

View File

@@ -1,54 +1,35 @@
image: docker:19.03.13
image:
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
variables:
# docker host
DOCKER_HOST: tcp://docker:2376
# container, thanks to volume mount from config.toml
DOCKER_TLS_CERTDIR: "/certs"
# These are usually specified by the entrypoint, however the
# Kubernetes executor doesn't run entrypoints
# https://gitlab.com/gitlab-org/gitlab-runner/-/issues/4125
DOCKER_TLS_VERIFY: 1
DOCKER_CERT_PATH: "$DOCKER_TLS_CERTDIR/client"
# We are building these from the apps dir to easily share the requirements file there.
# It would be nicer to build from the app dir context. TODO figure out a nice way to do this in local DOCKER_TLS_VERIFY
CONTEXT: apps/
services:
- docker:19.03.13-dind
before_script:
- docker info
KANIKO_CACHE_ARGS: "--cache=true --cache-copy-layers=true --cache-ttl=24h"
CONTEXT: $CI_PROJECT_DIR/apps/
.py_build_merge_request:
stage: build
before_script:
- cd $CONTEXT
variables:
CI_DEBUG_TRACE: "true"
IMAGE_TAG: $APP_NAME:$CI_COMMIT_SHORT_SHA
- CI_DEBUG_TRACE: "true"
script:
- docker build -t $IMAGE_TAG -f $DOCKERFILE_PATH .
- mkdir -p /kaniko/.docker
- echo "{\"auths\":{\"$CI_REGISTRY\":{\"username\":\"$CI_REGISTRY_USER\",\"password\":\"$CI_REGISTRY_PASSWORD\"}}}" > "/kaniko/.docker/config.json"
- /kaniko/executor --context $CONTEXT --dockerfile $DOCKERFILE_PATH $KANIKO_CACHE_ARGS --cache-repo $CI_REGISTRY_IMAGE --no-push
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
when: always
.py_build_push:
stage: build
before_script:
- cd $CONTEXT
- echo "$CI_REGISTRY_PASSWORD" | docker login -u "$CI_REGISTRY_USER" $CI_REGISTRY --password-stdin
variables:
CI_DEBUG_TRACE: "true"
IMAGE_TAG_BASE: $CI_REGISTRY_IMAGE/$APP_NAME:$CI_COMMIT_BRANCH-$CI_COMMIT_SHORT_SHA
LATEST_TAG: $CI_REGISTRY_IMAGE/$APP_NAME:latest
script:
- export IMAGE_TAG="$IMAGE_TAG_BASE-$(date +%F.%H%M%S)"
- docker build -t $IMAGE_TAG -f $DOCKERFILE_PATH .
- docker push $IMAGE_TAG
- docker tag $IMAGE_TAG $LATEST_TAG
- docker push $LATEST_TAG
rules:
stage: build
variables:
IMAGE_TAG_BASE: $CI_REGISTRY_IMAGE/$APP_NAME:$CI_COMMIT_BRANCH-$CI_COMMIT_SHORT_SHA
LATEST_TAG: $CI_REGISTRY_IMAGE/$APP_NAME:latest
script:
- export IMAGE_TAG="$IMAGE_TAG_BASE-$(date +%F.%H%M%S)"
- mkdir -p /kaniko/.docker
- echo "{\"auths\":{\"$CI_REGISTRY\":{\"username\":\"$CI_REGISTRY_USER\",\"password\":\"$CI_REGISTRY_PASSWORD\"}}}" > "/kaniko/.docker/config.json"
# - /kaniko/executor --context $CONTEXT --dockerfile $DOCKERFILE_PATH $KANIKO_CACHE_ARGS --destination $IMAGE_TAG
- /kaniko/executor --context $CONTEXT --dockerfile $DOCKERFILE_PATH $KANIKO_CACHE_ARGS --destination $IMAGE_TAG --destination $CI_REGISTRY_IMAGE/$APP_NAME:latest
rules:
- if: $CI_COMMIT_BRANCH == "master"
when: always

View File

@@ -50,9 +50,11 @@ services:
# PGDATA: /tmp/cic/postgres
ports:
- ${DEV_POSTGRES_PORT:-63432}:5432
command: [ "-c", "max_connections=200" ]
volumes:
- ./scripts/initdb/create_db.sql:/docker-entrypoint-initdb.d/1-create_all_db.sql
- ./apps/cic-meta/scripts/initdb/postgresql.sh:/docker-entrypoint-initdb.d/2-init-cic-meta.sh
- ./apps/cic-cache/db/psycopg2/db.sql:/docker-entrypoint-initdb.d/3-init-cic-meta.sql
- postgres-db:/var/lib/postgresql/data
redis:
@@ -161,7 +163,43 @@ services:
- -c
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
/usr/local/bin/cic-cache-tracker -vv
/usr/local/bin/cic-cache-trackerd -vv
volumes:
- contract-config:/tmp/cic/config/:ro
cic-cache-tasker:
build:
context: apps
dockerfile: cic-cache/docker/Dockerfile
environment:
CIC_REGISTRY_ADDRESS: $CIC_REGISTRY_ADDRESS # supplied at contract-config after contract provisioning
ETH_PROVIDER: ${ETH_PROVIDER:-http://eth:8545}
DATABASE_USER: ${DATABASE_USER:-grassroots}
DATABASE_PASSWORD: ${DATABASE_PASSWORD:-tralala} # this is is set at initdb see: postgres/initdb/create_db.sql
DATABASE_HOST: ${DATABASE_HOST:-postgres}
DATABASE_PORT: ${DATABASE_PORT:-5432}
DATABASE_NAME: ${DATABASE_NAME_CIC_CACHE:-cic_cache}
DATABASE_ENGINE: ${DATABASE_ENGINE:-postgres}
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: 1
ETH_ABI_DIR: ${ETH_ABI_DIR:-/usr/local/share/cic/solidity/abi}
CIC_TRUST_ADDRESS: ${DEV_ETH_ACCOUNT_CONTRACT_DEPLOYER:-0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996}
CELERY_BROKER_URL: redis://redis:6379
CELERY_RESULT_URL: redis://redis:6379
deploy:
restart_policy:
condition: on-failure
depends_on:
- redis
- postgres
- eth
command:
- /bin/bash
- -c
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
/usr/local/bin/cic-cache-taskerd -vv
volumes:
- contract-config:/tmp/cic/config/:ro
@@ -191,7 +229,7 @@ services:
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
"/usr/local/bin/uwsgi" \
--wsgi-file /usr/src/cic-cache/cic_cache/runnable/server.py \
--wsgi-file /usr/src/cic-cache/cic_cache/runnable/serverd.py \
--http :8000 \
--pyargv -vv
@@ -237,7 +275,7 @@ services:
- -c
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_tasker.sh -q cic-eth -v
./start_tasker.sh -q cic-eth -vv
# command: [/bin/sh, "./start_tasker.sh", -q, cic-eth, -vv ]
cic-eth-tracker:
@@ -313,7 +351,7 @@ services:
- -c
- |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_dispatcher.sh -q cic-eth -v
./start_dispatcher.sh -q cic-eth -vv
# command: "/root/start_dispatcher.sh -q cic-eth -vv"