Compare commits

...

44 Commits

Author SHA1 Message Date
nolash
4f1014c5e1 Add ussd cli 2021-03-06 22:25:57 +01:00
Louis Holbrook
1bfc1434b4 Merge branch 'lash/cic-eth-upgrade' into 'master'
Upgrade cic-eth version

See merge request grassrootseconomics/cic-internal-integration!53
2021-03-06 20:00:10 +00:00
nolash
7b9cd2d4b8 Upgrade cic-eth version 2021-03-06 20:58:35 +01:00
Louis Holbrook
30febbd1e0 Merge branch 'lash/transfer-authorization' into 'master'
cic-eth: Make nonce separate task

See merge request grassrootseconomics/cic-internal-integration!52
2021-03-06 17:55:51 +00:00
Louis Holbrook
f0088f20de cic-eth: Make nonce separate task 2021-03-06 17:55:51 +00:00
Louis Holbrook
618769a0d2 Merge branch 'lash/cic-cache-biiig-num' into 'master'
cic-cache: Set value db fields in cic_cache to handle biiig numbers

See merge request grassrootseconomics/cic-internal-integration!51
2021-03-05 20:03:52 +00:00
nolash
e0a980363c Set value db fields in cic_cache to handle biiig numbers 2021-03-05 20:45:01 +01:00
9d36a5f92f Merge branch 'philip/dry-run-fixes' into 'master'
Get that "ussd_menu.json" out there.

See merge request grassrootseconomics/cic-internal-integration!50
2021-03-05 19:22:44 +00:00
2fe6f4125f Get that "ussd_menu.json" out there. 2021-03-05 22:09:22 +03:00
b5c50b348d Merge branch 'philip/dry-run-prep' into 'master'
Philip/dry run prep

See merge request grassrootseconomics/cic-internal-integration!49
2021-03-05 16:28:08 +00:00
ea336283dc Philip/dry run prep 2021-03-05 16:28:07 +00:00
aa99b16ad2 Merge branch 'lash/fix-tx-list' into 'master'
Make tx listing tasks work properly

See merge request grassrootseconomics/cic-internal-integration!45
2021-03-04 16:47:13 +00:00
Louis Holbrook
1e7fff0133 Minor refactors:
- Renames s_assemble to s_brief
-  Link s_local to s_brief
2021-03-04 16:47:13 +00:00
21c9d95c4b Merge branch 'lash/transfer-authorization' into 'master'
cic-eth: Introduce transfer authorization contract

See merge request grassrootseconomics/cic-internal-integration!47
2021-03-04 15:06:15 +00:00
Louis Holbrook
8240e58c0a cic-eth: Introduce transfer authorization contract 2021-03-04 15:06:14 +00:00
1e9bf6b4d3 Update .cic-template.yml 2021-03-04 14:39:32 +00:00
7e089e1083 Merge branch 'bvander/cic-meta-entrypoint-update' into 'master'
meta isn't using the compose command its in the the Dockerfile

See merge request grassrootseconomics/cic-internal-integration!42
2021-03-03 16:26:20 +00:00
Louis Holbrook
32627aad27 Merge branch 'lash/session-nonce-queue' into 'master'
Atomic nonce queue db sessions

See merge request grassrootseconomics/cic-internal-integration!48
2021-03-03 07:37:27 +00:00
Louis Holbrook
d9a8c672de Atomic nonce queue db sessions 2021-03-03 07:37:26 +00:00
Louis Holbrook
f92efa28f9 Merge branch 'bvander/bug/cic-trust-var-default' into 'master'
cic trust address bug

See merge request grassrootseconomics/cic-internal-integration!46
2021-03-02 18:02:00 +00:00
73729d19b0 Update apps/cic-cache/cic_cache/runnable/tracker.py 2021-03-02 17:33:13 +00:00
Louis Holbrook
ae1502a651 Merge branch 'lash/refactor-syncer' into 'master'
Syncer refactor

See merge request grassrootseconomics/cic-internal-integration!40
2021-03-01 20:15:17 +00:00
Louis Holbrook
5001113267 Syncer refactor 2021-03-01 20:15:17 +00:00
451079d004 Update ci_templates/.cic-template.yml 2021-03-01 14:35:53 +00:00
ba8a0b1953 Update ci_templates/.cic-template.yml 2021-03-01 14:29:08 +00:00
bbc948757f Update ci_templates/.cic-template.yml 2021-03-01 14:24:38 +00:00
ca8c1b1f27 Update ci_templates/.cic-template.yml 2021-03-01 14:22:46 +00:00
854753f120 Merge branch 'revert-f37fa1db' into 'master'
Revert "Update ci_templates/.cic-template.yml"

See merge request grassrootseconomics/cic-internal-integration!44
2021-02-28 18:32:41 +00:00
daadbc27e9 Revert "Update ci_templates/.cic-template.yml"
This reverts commit f37fa1dbcf
2021-02-28 18:23:01 +00:00
f37fa1dbcf Update ci_templates/.cic-template.yml 2021-02-28 17:54:13 +00:00
Spencer Ofwiti
ac264314c0 Merge branch 'spencer/cic-meta-exports' into 'master'
Add exports to interface with CICADA.

See merge request grassrootseconomics/cic-internal-integration!43
2021-02-25 09:58:43 +00:00
Spencer Ofwiti
84c1d11b48 Add exports to interface with CICADA. 2021-02-25 09:58:43 +00:00
f940d4a961 meta isn't using this. The entrypoin is in the Dockerfile 2021-02-24 06:47:34 -08:00
Spencer Ofwiti
6fe87652ce Merge branch 'spencer/update-cic-meta' into 'master'
Add changes from stand-alone cic-meta repo.

See merge request grassrootseconomics/cic-internal-integration!41
2021-02-24 12:29:41 +00:00
Spencer Ofwiti
8f65be16b1 Add changes from stand-alone cic-meta repo. 2021-02-24 15:14:17 +03:00
Louis Holbrook
34a48a6c6c Merge branch 'lash/fix-imports-again' into 'master'
Rehabilitate import scripts after leak fixes

See merge request grassrootseconomics/cic-internal-integration!37
2021-02-22 20:00:18 +00:00
Louis Holbrook
c68d9d8404 Rehabilitate import scripts after leak fixes 2021-02-22 20:00:18 +00:00
dd8d4b01e2 Merge branch 'bvander/cic-ussd-workdir' into 'master'
change workdir so ussd can run tasker

See merge request grassrootseconomics/cic-internal-integration!39
2021-02-22 18:07:33 +00:00
4d3cc85573 change workdir so it can run tasker 2021-02-22 09:51:57 -08:00
10717df7d1 Merge branch 'bvander/fixes-to-contract-migration-deps' into 'master'
run reset.sh outside docker and fix deps

See merge request grassrootseconomics/cic-internal-integration!38
2021-02-22 15:14:36 +00:00
3f2e0f5b2e run reset.sh outside docker and fix deps 2021-02-22 06:27:59 -08:00
Louis Holbrook
42ae8e5ed3 Merge branch 'lash/import-scripts-refactor' into 'master'
Refactor import scripts

See merge request grassrootseconomics/cic-internal-integration!28
2021-02-21 15:41:37 +00:00
Louis Holbrook
96b4ad4a72 Refactor import scripts 2021-02-21 15:41:37 +00:00
1274958493 rename docker latest tag 2021-02-20 04:12:18 +00:00
276 changed files with 10454 additions and 3674 deletions

View File

@@ -29,8 +29,8 @@ def upgrade():
sa.Column('source_token', sa.String(42), nullable=False),
sa.Column('destination_token', sa.String(42), nullable=False),
sa.Column('success', sa.Boolean, nullable=False),
sa.Column('from_value', sa.BIGINT(), nullable=False),
sa.Column('to_value', sa.BIGINT(), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=False),
sa.Column('date_block', sa.DateTime, nullable=False),
)
op.create_table(

View File

@@ -312,7 +312,7 @@ class Tracker:
session.close()
(provider, w3) = web3_constructor()
trust = config.get('CIC_TRUST_ADDRESS', []).split(",")
trust = config.get('CIC_TRUST_ADDRESS', "").split(",")
chain_spec = args.i
try:

View File

@@ -1,5 +1,5 @@
[database]
NAME=cic-eth
NAME=cic_cache
USER=postgres
PASSWORD=
HOST=localhost

View File

@@ -8,6 +8,7 @@ from cic_registry import zero_address
# local imports
from cic_eth.db.enum import LockEnum
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.lock import Lock
from cic_eth.error import LockedError
@@ -116,9 +117,10 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
@celery_app.task()
def check_lock(chained_input, chain_str, lock_flags, address=None):
r = Lock.check(chain_str, lock_flags, address=zero_address)
session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)
if address != None:
r |= Lock.check(chain_str, lock_flags, address=address)
r |= Lock.check(chain_str, lock_flags, address=address, session=session)
if r > 0:
logg.debug('lock check {} has match {} for {}'.format(lock_flags, r, address))
raise LockedError(r)

View File

@@ -1,12 +1,25 @@
# standard imports
import datetime
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
from cic_eth.db.models.base import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask
celery_app = celery.current_app
@celery_app.task()
def out_tmp(tag, txt):
f = open('/tmp/err.{}.txt'.format(tag), "w")
f.write(txt)
f.close()
@celery_app.task(base=CriticalSQLAlchemyTask)
def alert(chained_input, tag, txt):
session = SessionBase.create_session()
o = Debug(tag, txt)
session.add(o)
session.commit()
session.close()
return chained_input

View File

@@ -1,5 +1,6 @@
# standard imports
import logging
import sys
# third-party imports
import celery
@@ -317,6 +318,8 @@ class AdminApi:
:return: Transaction details
:rtype: dict
"""
problems = []
if tx_hash != None and tx_raw != None:
ValueError('Specify only one of hash or raw tx')
@@ -444,10 +447,12 @@ class AdminApi:
r = c.w3.eth.getTransactionReceipt(tx_hash)
if r.status == 1:
tx['network_status'] = 'Confirmed'
tx['block'] = r.blockNumber
tx['tx_index'] = r.transactionIndex
else:
tx['network_status'] = 'Reverted'
tx['network_block_number'] = r.blockNumber
tx['network_tx_index'] = r.transactionIndex
if tx['block_number'] == None:
problems.append('Queue is missing block number {} for mined tx'.format(r.blockNumber))
except web3.exceptions.TransactionNotFound:
pass
@@ -457,6 +462,7 @@ class AdminApi:
tx_unpacked = unpack_signed_raw_tx(bytes.fromhex(tx['signed_tx'][2:]), chain_spec.chain_id())
tx['gas_price'] = tx_unpacked['gasPrice']
tx['gas_limit'] = tx_unpacked['gas']
tx['data'] = tx_unpacked['data']
s = celery.signature(
'cic_eth.queue.tx.get_state_log',
@@ -468,4 +474,9 @@ class AdminApi:
t = s.apply_async()
tx['status_log'] = t.get()
if len(problems) > 0:
sys.stderr.write('\n')
for p in problems:
sys.stderr.write('!!!{}\n'.format(p))
return tx

View File

@@ -6,10 +6,13 @@
# standard imports
import logging
# third-party imports
# external imports
import celery
from cic_registry.chain import ChainSpec
#from cic_registry.chain import ChainSpec
from cic_registry import CICRegistry
from chainlib.chain import ChainSpec
# local imports
from cic_eth.eth.factory import TxFactory
from cic_eth.db.enum import LockEnum
@@ -89,6 +92,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -107,7 +115,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -144,6 +153,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -162,7 +176,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_convert.link(self.callback_success)
s_tokens.link(s_convert).on_error(self.callback_error)
@@ -197,6 +212,13 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
from_address,
],
queue=self.queue,
)
s_tokens = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
@@ -214,7 +236,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_tokens)
s_nonce.link(s_tokens)
s_check.link(s_nonce)
if self.callback_param != None:
s_transfer.link(self.callback_success)
s_tokens.link(s_transfer).on_error(self.callback_error)
@@ -225,82 +248,6 @@ class Api:
return t
def transfer_request(self, from_address, to_address, spender_address, value, token_symbol):
"""Executes a chain of celery tasks that issues a transfer request of ERC20 tokens from one address to another.
:param from_address: Ethereum address of sender
:type from_address: str, 0x-hex
:param to_address: Ethereum address of recipient
:type to_address: str, 0x-hex
:param spender_address: Ethereum address that is executing transfer (typically an escrow contract)
:type spender_address: str, 0x-hex
:param value: Estimated return from conversion
:type value: int
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:returns: uuid of root task
:rtype: celery.Task
"""
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
[
[token_symbol],
self.chain_str,
LockEnum.QUEUE,
from_address,
],
queue=self.queue,
)
s_tokens_transfer_approval = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_tokens_approve = celery.signature(
'cic_eth.eth.token.resolve_tokens_by_symbol',
[
self.chain_str,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
from_address,
spender_address,
value,
self.chain_str,
],
queue=self.queue,
)
s_transfer_approval = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
from_address,
to_address,
value,
self.chain_str,
],
queue=self.queue,
)
# TODO: make approve and transfer_approval chainable so callback can be part of the full chain
if self.callback_param != None:
s_transfer_approval.link(self.callback_success)
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval).on_error(self.callback_error)
else:
s_tokens_approve.link(s_approve)
s_tokens_transfer_approval.link(s_transfer_approval)
g = celery.group(s_tokens_approve, s_tokens_transfer_approval) #s_tokens.apply_async(queue=self.queue)
s_check.link(g)
t = s_check.apply_async()
#t = s_tokens.apply_async(queue=self.queue)
return t
def balance(self, address, token_symbol, include_pending=True):
"""Calls the provided callback with the current token balance of the given address.
@@ -393,6 +340,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_account = celery.signature(
'cic_eth.eth.account.create',
[
@@ -400,7 +352,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_account)
s_nonce.link(s_account)
s_check.link(s_nonce)
if self.callback_param != None:
s_account.link(self.callback_success)
@@ -435,6 +388,11 @@ class Api:
],
queue=self.queue,
)
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[],
queue=self.queue,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
@@ -442,7 +400,8 @@ class Api:
],
queue=self.queue,
)
s_check.link(s_refill)
s_nonce.link(s_refill)
s_check.link(s_nonce)
if self.callback_param != None:
s_refill.link(self.callback_success)
@@ -486,8 +445,9 @@ class Api:
],
queue=self.queue,
)
s_local.link(s_brief)
if self.callback_param != None:
s_assemble.link(self.callback_success).on_error(self.callback_error)
s_brief.link(self.callback_success).on_error(self.callback_error)
t = None
if external_task != None:
@@ -512,11 +472,10 @@ class Api:
c = celery.chain(s_external_get, s_external_process)
t = celery.chord([s_local, c])(s_brief)
else:
t = s_local.apply_sync()
t = s_local.apply_async(queue=self.queue)
return t
def ping(self, r):
"""A noop callback ping for testing purposes.

View File

@@ -18,7 +18,11 @@ logg = celery_app.log.get_default_logger()
def redis(self, result, destination, status_code):
(host, port, db, channel) = destination.split(':')
r = redis_interface.Redis(host=host, port=port, db=db)
s = json.dumps(result)
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
logg.debug('redis callback on host {} port {} db {} channel {}'.format(host, port, db, channel))
r.publish(channel, s)
r.publish(channel, json.dumps(data))
r.close()

View File

@@ -20,5 +20,10 @@ def tcp(self, result, destination, status_code):
(host, port) = destination.split(':')
logg.debug('tcp callback to {} {}'.format(host, port))
s.connect((host, int(port)))
s.send(json.dumps(result).encode('utf-8'))
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
s.send(json.dumps(data).encode('utf-8'))
s.close()

View File

@@ -103,6 +103,9 @@ def status_str(v, bits_only=False):
except ValueError:
pass
if v == 0:
return 'NONE'
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:

View File

@@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -0,0 +1,28 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -0,0 +1,30 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@@ -0,0 +1,28 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@@ -0,0 +1,32 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@@ -54,7 +54,7 @@ class SessionBase(Model):
@staticmethod
def connect(dsn, pool_size=8, debug=False):
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.

View File

@@ -0,0 +1,23 @@
# standard imports
import datetime
import logging
# external imports
from sqlalchemy import Column, String, DateTime
# local imports
from .base import SessionBase
class Debug(SessionBase):
__tablename__ = 'debug'
date_created = Column(DateTime, default=datetime.datetime.utcnow)
tag = Column(String)
description = Column(String)
def __init__(self, tag, description):
self.tag = tag
self.description = description

View File

@@ -55,11 +55,9 @@ class Lock(SessionBase):
:returns: New flag state of entry
:rtype: number
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(Lock)
q = session.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
@@ -71,7 +69,8 @@ class Lock(SessionBase):
lock.address = address
lock.blockchain = chain_str
if tx_hash != None:
q = localsession.query(Otx)
session.flush()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
if otx != None:
@@ -80,12 +79,11 @@ class Lock(SessionBase):
lock.flags |= flags
r = lock.flags
localsession.add(lock)
localsession.commit()
session.add(lock)
session.commit()
SessionBase.release_session(session)
if session == None:
localsession.close()
return r
@@ -110,11 +108,9 @@ class Lock(SessionBase):
:returns: New flag state of entry
:rtype: number
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(Lock)
q = session.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
@@ -124,14 +120,13 @@ class Lock(SessionBase):
if lock != None:
lock.flags &= ~flags
if lock.flags == 0:
localsession.delete(lock)
session.delete(lock)
else:
localsession.add(lock)
session.add(lock)
r = lock.flags
localsession.commit()
session.commit()
if session == None:
localsession.close()
SessionBase.release_session(session)
return r
@@ -156,22 +151,20 @@ class Lock(SessionBase):
:rtype: number
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(Lock)
q = session.query(Lock)
#q = q.join(TxCache, isouter=True)
q = q.filter(Lock.address==address)
q = q.filter(Lock.blockchain==chain_str)
q = q.filter(Lock.flags.op('&')(flags)==flags)
lock = q.first()
if session == None:
localsession.close()
r = 0
if lock != None:
r = lock.flags & flags
SessionBase.release_session(session)
return r

View File

@@ -1,11 +1,16 @@
# standard imports
import logging
import datetime
# third-party imports
from sqlalchemy import Column, String, Integer
from sqlalchemy import Column, String, Integer, DateTime
# local imports
from .base import SessionBase
from cic_eth.error import (
InitializationError,
IntegrityError,
)
logg = logging.getLogger()
@@ -21,12 +26,9 @@ class Nonce(SessionBase):
@staticmethod
def get(address, session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(Nonce)
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==address)
nonce = q.first()
@@ -34,8 +36,7 @@ class Nonce(SessionBase):
if nonce != None:
nonce_value = nonce.nonce;
if session == None:
localsession.close()
SessionBase.release_session(session)
return nonce_value
@@ -54,6 +55,28 @@ class Nonce(SessionBase):
conn.execute("UPDATE nonce set nonce = {} WHERE address_hex = '{}'".format(nonce, address))
@staticmethod
def __init(conn, address, nonce):
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
@staticmethod
def init(address, nonce=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Nonce)
q = q.filter(Nonce.address_hex==address)
o = q.first()
if o != None:
session.flush()
raise InitializationError('nonce on {} already exists ({})'.format(address, o.nonce))
session.flush()
Nonce.__init(session, address, nonce)
SessionBase.release_session(session)
# TODO: Incrementing nonce MUST be done by separate tasks.
@staticmethod
def next(address, initial_if_not_exists=0):
"""Generate next nonce for the given address.
@@ -67,20 +90,96 @@ class Nonce(SessionBase):
:returns: Nonce
:rtype: number
"""
#session = SessionBase.bind_session(session)
#session.begin_nested()
conn = Nonce.engine.connect()
if Nonce.transactional:
conn.execute('BEGIN')
conn.execute('LOCK TABLE nonce IN SHARE ROW EXCLUSIVE MODE')
logg.debug('locking nonce table for address {}'.format(address))
nonce = Nonce.__get(conn, address)
logg.debug('get nonce {} for address {}'.format(nonce, address))
if nonce == None:
nonce = initial_if_not_exists
conn.execute("INSERT INTO nonce (nonce, address_hex) VALUES ({}, '{}')".format(nonce, address))
logg.debug('setting default nonce to {} for address {}'.format(nonce, address))
Nonce.__init(conn, address, nonce)
Nonce.__set(conn, address, nonce+1)
if Nonce.transactional:
conn.execute('COMMIT')
logg.debug('unlocking nonce table for address {}'.format(address))
conn.close()
#session.commit()
#SessionBase.release_session(session)
return nonce
class NonceReservation(SessionBase):
__tablename__ = 'nonce_task_reservation'
nonce = Column(Integer)
key = Column(String)
date_created = Column(DateTime, default=datetime.datetime.utcnow)
@staticmethod
def peek(key, session=None):
session = SessionBase.bind_session(session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
nonce = None
if o != None:
nonce = o.nonce
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def release(key, session=None):
session = SessionBase.bind_session(session)
nonce = NonceReservation.peek(key, session=session)
q = session.query(NonceReservation)
q = q.filter(NonceReservation.key==key)
o = q.first()
if o == None:
raise IntegrityError('nonce for key {}'.format(nonce))
SessionBase.release_session(session)
session.delete(o)
session.flush()
SessionBase.release_session(session)
return nonce
@staticmethod
def next(address, key, session=None):
session = SessionBase.bind_session(session)
if NonceReservation.peek(key, session) != None:
raise IntegrityError('nonce for key {}'.format(key))
nonce = Nonce.next(address)
o = NonceReservation()
o.nonce = nonce
o.key = key
session.add(o)
SessionBase.release_session(session)
return nonce

View File

@@ -2,7 +2,7 @@
import datetime
import logging
# third-party imports
# external imports
from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
@@ -79,6 +79,13 @@ class Otx(SessionBase):
return r
def __status_not_set(self, status):
r = not(self.status & status)
if r:
logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None):
"""Set block number transaction was mined in.
@@ -320,6 +327,32 @@ class Otx(SessionBase):
SessionBase.release_session(session)
def dequeue(self, session=None):
"""Marks that a process to execute send attempt is underway
Only manipulates object, does not transaction or commit to backend.
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_not_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__reset_status(StatusBits.QUEUED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None):
"""Marks that transaction was mined but code execution did not succeed.
@@ -367,24 +400,12 @@ class Otx(SessionBase):
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed:
if not self.status & StatusBits.OBSOLETE:
if self.status > 0 and not self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusEnum.CANCELLED, session)
else:
self.__set_status(StatusEnum.OBSOLETED, session)
# if confirmed:
# if self.status != StatusEnum.OBSOLETED:
# logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.CANCELLED, session)
# elif self.status != StatusEnum.OBSOLETED:
# if self.status > StatusEnum.SENT:
# logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)

View File

@@ -24,9 +24,10 @@ class AccountRole(SessionBase):
tag = Column(Text)
address_hex = Column(String(42))
# TODO:
@staticmethod
def get_address(tag):
def get_address(tag, session):
"""Get Ethereum address matching the given tag
:param tag: Tag
@@ -34,14 +35,26 @@ class AccountRole(SessionBase):
:returns: Ethereum address, or zero-address if tag does not exist
:rtype: str, 0x-hex
"""
role = AccountRole.get_role(tag)
if role == None:
return zero_address
return role.address_hex
if session == None:
raise ValueError('nested bind session calls will not succeed as the first call to release_session in the stack will leave the db object detached further down the stack. We will need additional reference count.')
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
r = zero_address
if role != None:
r = role.address_hex
session.flush()
SessionBase.release_session(session)
return r
@staticmethod
def get_role(tag):
def get_role(tag, session=None):
"""Get AccountRole model object matching the given tag
:param tag: Tag
@@ -49,20 +62,27 @@ class AccountRole(SessionBase):
:returns: Role object, if found
:rtype: cic_eth.db.models.role.AccountRole
"""
session = AccountRole.create_session()
role = AccountRole.__get_role(session, tag)
session.close()
#return role.address_hex
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
session.flush()
SessionBase.release_session(session)
return role
@staticmethod
def __get_role(session, tag):
return session.query(AccountRole).filter(AccountRole.tag==tag).first()
def __get_role(tag, session):
q = session.query(AccountRole)
q = q.filter(AccountRole.tag==tag)
r = q.first()
return r
@staticmethod
def set(tag, address_hex):
def set(tag, address_hex, session=None):
"""Persist a tag to Ethereum address association.
This will silently overwrite the existing value.
@@ -74,16 +94,18 @@ class AccountRole(SessionBase):
:returns: Role object
:rtype: cic_eth.db.models.role.AccountRole
"""
#session = AccountRole.create_session()
#role = AccountRole.__get(session, tag)
role = AccountRole.get_role(tag) #session, tag)
session = SessionBase.bind_session(session)
role = AccountRole.__get_role(tag, session)
if role == None:
role = AccountRole(tag)
role.address_hex = address_hex
#session.add(role)
#session.commit()
#session.close()
return role #address_hex
session.flush()
SessionBase.release_session(session)
return role
@staticmethod
@@ -95,20 +117,17 @@ class AccountRole(SessionBase):
:returns: Role tag, or None if no match
:rtype: str or None
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
session = SessionBase.bind_session(session)
q = localsession.query(AccountRole)
q = session.query(AccountRole)
q = q.filter(AccountRole.address_hex==address)
role = q.first()
tag = None
if role != None:
tag = role.tag
if session == None:
localsession.close()
SessionBase.release_session(session)
return tag

View File

@@ -85,26 +85,27 @@ class TxCache(SessionBase):
:param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex
"""
localsession = SessionBase.bind_session(session)
session = SessionBase.bind_session(session)
q = localsession.query(TxCache)
q = session.query(TxCache)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash_original)
txc = q.first()
if txc == None:
SessionBase.release_session(localsession)
SessionBase.release_session(session)
raise NotLocalTxError('original {}'.format(tx_hash_original))
if txc.block_number != None:
SessionBase.release_session(localsession)
SessionBase.release_session(session)
raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original))
q = localsession.query(Otx)
session.flush()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_new)
otx = q.first()
if otx == None:
SessionBase.release_session(localsession)
SessionBase.release_session(session)
raise NotLocalTxError('new {}'.format(tx_hash_new))
txc_new = TxCache(
@@ -115,18 +116,21 @@ class TxCache(SessionBase):
txc.destination_token_address,
int(txc.from_value),
int(txc.to_value),
session=session,
)
localsession.add(txc_new)
localsession.commit()
session.add(txc_new)
session.commit()
SessionBase.release_session(localsession)
SessionBase.release_session(session)
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
localsession = SessionBase.bind_session(session)
tx = localsession.query(Otx).filter(Otx.tx_hash==tx_hash).first()
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
if tx == None:
SessionBase.release_session(localsession)
SessionBase.release_session(session)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id
@@ -139,9 +143,9 @@ class TxCache(SessionBase):
self.block_number = block_number
self.tx_index = tx_index
# not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.now()
self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created
self.date_checked = self.date_created
SessionBase.release_session(localsession)
SessionBase.release_session(session)

View File

@@ -54,6 +54,13 @@ class RoleMissingError(Exception):
pass
class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks
"""
pass
class LockedError(Exception):
"""Exception raised when attempt is made to execute action that is deactivated by lock

View File

@@ -8,6 +8,7 @@ from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from erc20_single_shot_faucet import Faucet
from cic_registry import zero_address
from hexathon import strip_0x
# local import
from cic_eth.eth import RpcClient
@@ -21,6 +22,7 @@ from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.tx import TxCache
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.error import RoleMissingError
from cic_eth.task import CriticalSQLAlchemyTask
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@@ -34,6 +36,8 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
uuid,
session=None,
):
"""Register an Ethereum account address with the on-chain account registry
@@ -56,7 +60,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
@@ -66,6 +70,8 @@ class AccountTxFactory(TxFactory):
self,
address,
chain_spec,
uuid,
session=None,
):
"""Trigger the on-chain faucet to disburse tokens to the provided Ethereum account
@@ -86,7 +92,7 @@ class AccountTxFactory(TxFactory):
'gas': gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
'nonce': self.next_nonce(uuid, session=session),
'value': 0,
})
return tx_add
@@ -101,11 +107,12 @@ def unpack_register(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != '0a3b0a4f':
raise ValueError('Invalid account index register data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
@@ -120,17 +127,19 @@ def unpack_gift(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != '63e4bff4':
raise ValueError('Invalid account index register data ({})'.format(f))
raise ValueError('Invalid gift data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
}
@celery_app.task()
# TODO: Separate out nonce initialization task
@celery_app.task(base=CriticalSQLAlchemyTask)
def create(password, chain_str):
"""Creates and stores a new ethereum account in the keystore.
@@ -149,20 +158,14 @@ def create(password, chain_str):
logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account
n = c.w3.eth.getTransactionCount(a, 'pending')
session = SessionBase.create_session()
o = session.query(Nonce).filter(Nonce.address_hex==a).first()
if o == None:
o = Nonce()
o.address_hex = a
o.nonce = n
session.add(o)
session.commit()
Nonce.init(a, session=session)
session.commit()
session.close()
return a
@celery_app.task(bind=True, throws=(RoleMissingError,))
@celery_app.task(bind=True, throws=(RoleMissingError,), base=CriticalSQLAlchemyTask)
def register(self, account_address, chain_str, writer_address=None):
"""Creates a transaction to add the given address to the accounts index.
@@ -178,21 +181,24 @@ def register(self, account_address, chain_str, writer_address=None):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session()
if writer_address == None:
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER')
writer_address = AccountRole.get_address('ACCOUNTS_INDEX_WRITER', session=session)
if writer_address == zero_address:
session.close()
raise RoleMissingError(account_address)
logg.debug('adding account address {} to index; writer {}'.format(account_address, writer_address))
queue = self.request.delivery_info['routing_key']
c = RpcClient(chain_spec, holder_address=writer_address)
txf = AccountTxFactory(writer_address, c)
tx_add = txf.add(account_address, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data')
tx_add = txf.add(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_account_data', session=session)
session.close()
gas_budget = tx_add['gas'] * tx_add['gasPrice']
@@ -209,7 +215,7 @@ def register(self, account_address, chain_str, writer_address=None):
return account_address
@celery_app.task(bind=True)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def gift(self, account_address, chain_str):
"""Creates a transaction to invoke the faucet contract for the given address.
@@ -228,12 +234,14 @@ def gift(self, account_address, chain_str):
c = RpcClient(chain_spec, holder_address=account_address)
txf = AccountTxFactory(account_address, c)
tx_add = txf.gift(account_address, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data')
session = SessionBase.create_session()
tx_add = txf.gift(account_address, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_add, chain_str, queue, 'cic_eth.eth.account.cache_gift_data', session=session)
session.close()
gas_budget = tx_add['gas'] * tx_add['gasPrice']
logg.debug('register user tx {}'.format(tx_hash_hex))
logg.debug('gift user tx {}'.format(tx_hash_hex))
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
@@ -324,7 +332,7 @@ def cache_gift_data(
return (tx_hash_hex, cache_id)
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_account_data(
tx_hash_hex,
tx_signed_raw_hex,

View File

@@ -32,10 +32,10 @@ class TxFactory:
logg.debug('txfactory instance address {} gas price'.format(self.address, self.gas_price))
def next_nonce(self):
"""Returns the current cached nonce value, and increments it for next transaction.
def next_nonce(self, uuid, session=None):
"""Returns the current reserved nonce value, and increments it for next transaction.
:returns: Nonce
:rtype: number
"""
return self.nonce_oracle.next()
return self.nonce_oracle.next_by_task_uuid(uuid, session=session)

View File

@@ -52,7 +52,10 @@ class GasOracle():
:returns: Etheerum account address
:rtype: str, 0x-hex
"""
return AccountRole.get_address('GAS_GIFTER')
session = SessionBase.create_session()
a = AccountRole.get_address('GAS_GIFTER', session)
session.close()
return a
def gas_price(self, category='safe'):

View File

@@ -1,5 +1,8 @@
# local imports
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
class NonceOracle():
"""Ensures atomic nonce increments for all transactions across all tasks and threads.
@@ -20,4 +23,9 @@ class NonceOracle():
:returns: Nonce
:rtype: number
"""
raise AttributeError('this should not be called')
return Nonce.next(self.address, self.default_nonce)
def next_by_task_uuid(self, uuid, session=None):
return NonceReservation.release(uuid, session=session)

View File

@@ -1,194 +0,0 @@
# standard imports
import logging
# third-party imports
import web3
import celery
from erc20_approval_escrow import TransferApproval
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
# local imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.eth import RpcClient
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.error import TokenCountError
celery_app = celery.current_app
logg = logging.getLogger()
contract_function_signatures = {
'request': 'b0addede',
}
class TransferRequestTxFactory(TxFactory):
"""Factory for creating Transfer request transactions using the TransferApproval contract backend
"""
def request(
self,
token_address,
beneficiary_address,
amount,
chain_spec,
):
"""Create a new TransferApproval.request transaction
:param token_address: Token to create transfer request for
:type token_address: str, 0x-hex
:param beneficiary_address: Beneficiary of token transfer
:type beneficiary_address: str, 0x-hex
:param amount: Amount of tokens to transfer
:type amount: number
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Transaction in standard Ethereum format
:rtype: dict
"""
transfer_approval = CICRegistry.get_contract(chain_spec, 'TransferApproval', 'TransferAuthorization')
fn = transfer_approval.function('createRequest')
tx_approval_buildable = fn(beneficiary_address, token_address, amount)
transfer_approval_gas = transfer_approval.gas('createRequest')
tx_approval = tx_approval_buildable.buildTransaction({
'from': self.address,
'gas': transfer_approval_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_approval
def unpack_transfer_approval_request(data):
"""Verifies that a transaction is an "TransferApproval.request" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
if f != contract_function_signatures['request']:
raise ValueError('Invalid transfer request data ({})'.format(f))
d = data[10:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'token': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
'amount': int(d[128:], 16)
}
@celery_app.task(bind=True)
def transfer_approval_request(self, tokens, holder_address, receiver_address, value, chain_str):
"""Creates a new transfer approval
:param tokens: Token to generate transfer request for
:type tokens: list with single token spec as dict
:param holder_address: Address to generate transfer on behalf of
:type holder_address: str, 0x-hex
:param receiver_address: Address to transfser tokens to
:type receiver_address: str, 0x-hex
:param value: Amount of tokens to transfer
:type value: number
:param chain_spec: Chain spec string representation
:type chain_spec: str
:raises cic_eth.error.TokenCountError: More than one token in tokens argument
:returns: Raw signed transaction
:rtype: list with transaction as only element
"""
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
t = tokens[0]
c = RpcClient(holder_address)
txf = TransferRequestTxFactory(holder_address, c)
tx_transfer = txf.request(t['address'], receiver_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, 'cic_eth.eth.request.otx_cache_transfer_approval_request')
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return [tx_signed_raw_hex]
@celery_app.task()
def otx_cache_transfer_approval_request(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an TransferApproval.request transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('in otx acche transfer approval request')
(txc, cache_id) = cache_transfer_approval_request_data(tx_hash_hex, tx)
return txc
@celery_app.task()
def cache_transfer_approval_request_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_transfer_approval_request
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_transfer_approval_request(tx['data'])
logg.debug('tx approval request data {}'.format(tx_data))
logg.debug('tx approval request {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx_data['token'],
tx_data['token'],
tx_data['amount'],
tx_data['amount'],
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)

View File

@@ -33,7 +33,7 @@ def sign_tx(tx, chain_str):
return (tx_hash_hex, tx_transfer_signed['raw'],)
def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
def sign_and_register_tx(tx, chain_str, queue, cache_task=None, session=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
:param tx: Standard ethereum transaction data
@@ -44,6 +44,7 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
:type queue: str
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
:type cache_task: str
:raises: sqlalchemy.exc.DatabaseError
:returns: Tuple; Transaction hash, signed raw transaction data
:rtype: tuple
"""
@@ -51,25 +52,13 @@ def sign_and_register_tx(tx, chain_str, queue, cache_task=None):
logg.debug('adding queue tx {}'.format(tx_hash_hex))
# s = celery.signature(
# 'cic_eth.queue.tx.create',
# [
# tx['nonce'],
# tx['from'],
# tx_hash_hex,
# tx_signed_raw_hex,
# chain_str,
# ],
# queue=queue,
# )
# TODO: consider returning this as a signature that consequtive tasks can be linked to
queue_create(
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
session=session,
)
if cache_task != None:

View File

@@ -8,6 +8,8 @@ import web3
from cic_registry import CICRegistry
from cic_registry import zero_address
from cic_registry.chain import ChainSpec
from hexathon import strip_0x
from chainlib.status import Status as TxStatus
# platform imports
from cic_eth.db.models.tx import TxCache
@@ -19,6 +21,10 @@ from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
)
celery_app = celery.current_app
logg = logging.getLogger()
@@ -40,6 +46,8 @@ class TokenTxFactory(TxFactory):
spender_address,
amount,
chain_spec,
uuid,
session=None,
):
"""Create an ERC20 "approve" transaction
@@ -67,7 +75,7 @@ class TokenTxFactory(TxFactory):
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
'nonce': self.next_nonce(uuid, session=session),
})
return tx_approve
@@ -78,6 +86,8 @@ class TokenTxFactory(TxFactory):
receiver_address,
value,
chain_spec,
uuid,
session=None,
):
"""Create an ERC20 "transfer" transaction
@@ -106,7 +116,7 @@ class TokenTxFactory(TxFactory):
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
'nonce': self.next_nonce(uuid, session=session),
})
return tx_transfer
@@ -120,11 +130,12 @@ def unpack_transfer(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['transfer']:
raise ValueError('Invalid transfer data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
@@ -140,11 +151,12 @@ def unpack_transferfrom(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['transferfrom']:
raise ValueError('Invalid transferFrom data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
@@ -161,18 +173,19 @@ def unpack_approve(data):
:returns: Parsed parameters
:rtype: dict
"""
f = data[2:10]
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['approve']:
raise ValueError('Invalid approval data ({})'.format(f))
d = data[10:]
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
}
@celery_app.task()
@celery_app.task(base=CriticalWeb3Task)
def balance(tokens, holder_address, chain_str):
"""Return token balances for a list of tokens for given address
@@ -199,7 +212,7 @@ def balance(tokens, holder_address, chain_str):
return tokens
@celery_app.task(bind=True)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
"""Transfer ERC20 tokens between addresses
@@ -235,9 +248,11 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
c = RpcClient(chain_spec, holder_address=holder_address)
txf = TokenTxFactory(holder_address, c)
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer')
session = SessionBase.create_session()
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer', session=session)
session.close()
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
@@ -253,7 +268,7 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
return tx_hash_hex
@celery_app.task(bind=True)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def approve(self, tokens, holder_address, spender_address, value, chain_str):
"""Approve ERC20 transfer on behalf of holder address
@@ -290,8 +305,10 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
txf = TokenTxFactory(holder_address, c)
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve')
session = SessionBase.create_session()
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec, self.request.root_id, session=session)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve', session=session)
session.close()
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
@@ -307,7 +324,7 @@ def approve(self, tokens, holder_address, spender_address, value, chain_str):
return tx_hash_hex
@celery_app.task()
@celery_app.task(base=CriticalWeb3Task)
def resolve_tokens_by_symbol(token_symbols, chain_str):
"""Returns contract addresses of an array of ERC20 token symbols
@@ -330,7 +347,7 @@ def resolve_tokens_by_symbol(token_symbols, chain_str):
return tokens
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def otx_cache_transfer(
tx_hash_hex,
tx_signed_raw_hex,
@@ -354,7 +371,7 @@ def otx_cache_transfer(
return txc
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_transfer_data(
tx_hash_hex,
tx,
@@ -390,7 +407,7 @@ def cache_transfer_data(
return (tx_hash_hex, cache_id)
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def otx_cache_approve(
tx_hash_hex,
tx_signed_raw_hex,
@@ -414,7 +431,7 @@ def otx_cache_approve(
return txc
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_approve_data(
tx_hash_hex,
tx,
@@ -450,6 +467,7 @@ def cache_approve_data(
return (tx_hash_hex, cache_id)
# TODO: Move to dedicated metadata package
class ExtendedTx:
_default_decimals = 6
@@ -470,6 +488,8 @@ class ExtendedTx:
self.destination_token_symbol = ''
self.source_token_decimals = ExtendedTx._default_decimals
self.destination_token_decimals = ExtendedTx._default_decimals
self.status = TxStatus.PENDING.name
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
@@ -497,10 +517,18 @@ class ExtendedTx:
self.destination_token_value = destination_value
def set_status(self, n):
if n:
self.status = TxStatus.ERROR.name
else:
self.status = TxStatus.SUCCESS.name
self.status_code = n
def to_dict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'to_dict']:
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
continue
o[attr] = getattr(self, attr)
return o

View File

@@ -12,6 +12,7 @@ from cic_registry.chain import ChainSpec
from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import NonceReservation
from cic_eth.db.models.lock import Lock
from cic_eth.db.enum import (
LockEnum,
@@ -32,6 +33,10 @@ from cic_eth.eth.nonce import NonceOracle
from cic_eth.error import AlreadyFillingGasError
from cic_eth.eth.util import tx_hex_string
from cic_eth.admin.ctrl import lock_send
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
)
celery_app = celery.current_app
logg = logging.getLogger()
@@ -40,7 +45,7 @@ MAX_NONCE_ATTEMPTS = 3
# TODO this function is too long
@celery_app.task(bind=True, throws=(OutOfGasError))
@celery_app.task(bind=True, throws=(OutOfGasError), base=CriticalSQLAlchemyTask)
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
"""Check the gas level of the sender address of a transaction.
@@ -65,7 +70,6 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
for i in range(len(tx_hashes)):
o = get_tx(tx_hashes[i])
txs.append(o['signed_tx'])
logg.debug('ooooo {}'.format(o))
if address == None:
address = o['address']
@@ -81,15 +85,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
if gas_required > balance:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
address,
chain_str,
],
queue=queue,
)
s_refill_gas.apply_async()
s_nonce.link(s_refill_gas)
s_nonce.apply_async()
wait_tasks = []
for tx_hash in tx_hashes:
s = celery.signature(
@@ -105,15 +117,23 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
safe_gas = c.safe_threshold_amount()
if balance < safe_gas:
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
c.gas_provider(),
],
queue=queue,
)
s_refill_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
address,
chain_str,
],
queue=queue,
)
s_refill_gas.apply_async()
s_nonce.link(s_refill)
s_nonce.apply_async()
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
ready_tasks = []
for tx_hash in tx_hashes:
@@ -131,7 +151,7 @@ def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=Non
# TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead.
@celery_app.task(bind=True)
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def hashes_to_txs(self, tx_hashes):
"""Return a list of raw signed transactions from the local transaction queue corresponding to a list of transaction hashes.
@@ -174,12 +194,7 @@ class ParityNodeHandler:
def handle(self, exception, tx_hash_hex, tx_hex):
meth = self.handle_default
if isinstance(exception, (ValueError)):
# s_debug = celery.signature(
# 'cic_eth.admin.debug.out_tmp',
# [tx_hash_hex, '{}: {}'.format(tx_hash_hex, exception)],
# queue=queue,
# )
# s_debug.apply_async()
earg = exception.args[0]
if earg['code'] == -32010:
logg.debug('skipping lock for code {}'.format(earg['code']))
@@ -187,14 +202,15 @@ class ParityNodeHandler:
elif earg['code'] == -32602:
meth = self.handle_invalid_encoding
else:
# TODO: move to status log db comment field
meth = self.handle_invalid
elif isinstance(exception, (requests.exceptions.ConnectionError)):
meth = self.handle_connection
(t, e_fn, message) = meth(tx_hash_hex, tx_hex)
(t, e_fn, message) = meth(tx_hash_hex, tx_hex, str(exception))
return (t, e_fn, '{} {}'.format(message, exception))
def handle_connection(self, tx_hash_hex, tx_hex):
def handle_connection(self, tx_hash_hex, tx_hex, debugstr=None):
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
[
@@ -207,7 +223,7 @@ class ParityNodeHandler:
return (t, TemporaryTxError, 'Sendfail {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_encoding(self, tx_hash_hex, tx_hex):
def handle_invalid_encoding(self, tx_hash_hex, tx_hex, debugstr=None):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -254,7 +270,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Reject invalid encoding {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid_parameters(self, tx_hash_hex, tx_hex):
def handle_invalid_parameters(self, tx_hash_hex, tx_hex, debugstr=None):
s_sync = celery.signature(
'cic_eth.eth.tx.sync_tx',
[
@@ -267,7 +283,7 @@ class ParityNodeHandler:
return (t, PermanentTxError, 'Reject invalid parameters {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_invalid(self, tx_hash_hex, tx_hex):
def handle_invalid(self, tx_hash_hex, tx_hex, debugstr=None):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -285,12 +301,21 @@ class ParityNodeHandler:
[],
queue=self.queue,
)
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
debugstr,
],
queue=self.queue,
)
s_set_reject.link(s_debug)
s_lock.link(s_set_reject)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
def handle_default(self, tx_hash_hex, tx_hex):
def handle_default(self, tx_hash_hex, tx_hex, debugstr):
tx_bytes = bytes.fromhex(tx_hex[2:])
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
s_lock = celery.signature(
@@ -308,12 +333,22 @@ class ParityNodeHandler:
[],
queue=self.queue,
)
s_debug = celery.signature(
'cic_eth.admin.debug.alert',
[
tx_hash_hex,
debugstr,
],
queue=self.queue,
)
s_set_fubar.link(s_debug)
s_lock.link(s_set_fubar)
t = s_lock.apply_async()
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
return (t, PermanentTxError, 'Fubar {} {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id()), debugstr))
@celery_app.task(bind=True)
# TODO: A lock should be introduced to ensure that the send status change and the transaction send is atomic.
@celery_app.task(bind=True, base=CriticalWeb3Task)
def send(self, txs, chain_str):
"""Send transactions to the network.
@@ -351,13 +386,6 @@ def send(self, txs, chain_str):
c = RpcClient(chain_spec)
r = None
try:
r = c.w3.eth.send_raw_transaction(tx_hex)
except Exception as e:
raiser = ParityNodeHandler(chain_spec, queue)
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
raise e(m)
s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status',
[
@@ -366,6 +394,14 @@ def send(self, txs, chain_str):
],
queue=queue,
)
try:
r = c.w3.eth.send_raw_transaction(tx_hex)
except requests.exceptions.ConnectionError as e:
raise(e)
except Exception as e:
raiser = ParityNodeHandler(chain_spec, queue)
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
raise e(m)
s_set_sent.apply_async()
tx_tail = txs[1:]
@@ -380,7 +416,9 @@ def send(self, txs, chain_str):
return r.hex()
@celery_app.task(bind=True, throws=(AlreadyFillingGasError))
# TODO: if this method fails the nonce will be out of sequence. session needs to be extended to include the queue create, so that nonce is rolled back if the second sql query fails. Better yet, split each state change into separate tasks.
# TODO: method is too long, factor out code for clarity
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
def refill_gas(self, recipient_address, chain_str):
"""Executes a native token transaction to fund the recipient's gas expenditures.
@@ -394,17 +432,21 @@ def refill_gas(self, recipient_address, chain_str):
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
zero_amount = False
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!='0x00')
q = q.filter(TxCache.from_value!=0)
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
session.close()
if c > 0:
raise AlreadyFillingGasError(recipient_address)
#session.close()
#raise AlreadyFillingGasError(recipient_address)
logg.warning('already filling gas {}'.format(str(AlreadyFillingGasError(recipient_address))))
zero_amount = True
session.flush()
queue = self.request.delivery_info['routing_key']
@@ -413,10 +455,13 @@ def refill_gas(self, recipient_address, chain_str):
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
nonce = nonce_generator.next()
#nonce = nonce_generator.next(session=session)
nonce = nonce_generator.next_by_task_uuid(self.request.root_id, session=session)
gas_price = c.gas_price()
gas_limit = c.default_gas_limit
refill_amount = c.refill_amount()
refill_amount = 0
if not zero_amount:
refill_amount = c.refill_amount()
logg.debug('tx send gas price {} nonce {}'.format(gas_price, nonce))
# create and sign transaction
@@ -442,7 +487,9 @@ def refill_gas(self, recipient_address, chain_str):
tx_hash_hex,
tx_send_gas_signed['raw'],
chain_str,
session=session,
)
session.close()
s_tx_cache = celery.signature(
'cic_eth.eth.tx.cache_gas_refill_data',
@@ -460,6 +507,7 @@ def refill_gas(self, recipient_address, chain_str):
queue=queue,
)
celery.group(s_tx_cache, s_status)()
return tx_send_gas_signed['raw']
@@ -485,8 +533,8 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
q = session.query(Otx)
q = q.filter(Otx.tx_hash==txold_hash_hex)
otx = q.first()
session.close()
if otx == None:
session.close()
raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_chain_str(chain_str)
@@ -521,8 +569,10 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
session=session,
)
TxCache.clone(txold_hash_hex, tx_hash_hex)
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close()
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
@@ -537,8 +587,30 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
return tx_hash_hex
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,))
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def reserve_nonce(self, chained_input, address=None):
session = SessionBase.create_session()
if address == None:
address = chained_input
root_id = self.request.root_id
nonce = NonceReservation.next(address, root_id)
session.close()
return chained_input
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,), base=CriticalWeb3Task)
def sync_tx(self, tx_hash_hex, chain_str):
"""Force update of network status of a simgle transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
"""
queue = self.request.delivery_info['routing_key']
@@ -621,7 +693,7 @@ def resume_tx(self, txpending_hash_hex, chain_str):
return txpending_hash_hex
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def otx_cache_parse_tx(
tx_hash_hex,
tx_signed_raw_hex,
@@ -648,7 +720,7 @@ def otx_cache_parse_tx(
return txc
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_gas_refill_data(
tx_hash_hex,
tx,

View File

@@ -3,10 +3,11 @@ import logging
import sha3
import web3
# third-party imports
# external imports
from rlp import decode as rlp_decode
from rlp import encode as rlp_encode
from eth_keys import KeyAPI
from chainlib.eth.tx import unpack
logg = logging.getLogger()
@@ -22,64 +23,65 @@ field_debugs = [
's',
]
unpack_signed_raw_tx = unpack
def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
d = rlp_decode(tx_raw_bytes)
logg.debug('decoding using chain id {}'.format(chain_id))
j = 0
for i in d:
logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
j += 1
vb = chain_id
if chain_id != 0:
v = int.from_bytes(d[6], 'big')
vb = v - (chain_id * 2) - 35
while len(d[7]) < 32:
d[7] = b'\x00' + d[7]
while len(d[8]) < 32:
d[8] = b'\x00' + d[8]
s = b''.join([d[7], d[8], bytes([vb])])
so = KeyAPI.Signature(signature_bytes=s)
h = sha3.keccak_256()
h.update(rlp_encode(d))
signed_hash = h.digest()
d[6] = chain_id
d[7] = b''
d[8] = b''
h = sha3.keccak_256()
h.update(rlp_encode(d))
unsigned_hash = h.digest()
p = so.recover_public_key_from_msg_hash(unsigned_hash)
a = p.to_checksum_address()
logg.debug('decoded recovery byte {}'.format(vb))
logg.debug('decoded address {}'.format(a))
logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
to = d[3].hex() or None
if to != None:
to = web3.Web3.toChecksumAddress('0x' + to)
return {
'from': a,
'nonce': int.from_bytes(d[0], 'big'),
'gasPrice': int.from_bytes(d[1], 'big'),
'gas': int.from_bytes(d[2], 'big'),
'to': to,
'value': int.from_bytes(d[4], 'big'),
'data': '0x' + d[5].hex(),
'v': chain_id,
'r': '0x' + s[:32].hex(),
's': '0x' + s[32:64].hex(),
'chainId': chain_id,
'hash': '0x' + signed_hash.hex(),
'hash_unsigned': '0x' + unsigned_hash.hex(),
}
#def unpack_signed_raw_tx(tx_raw_bytes, chain_id):
# d = rlp_decode(tx_raw_bytes)
#
# logg.debug('decoding {} using chain id {}'.format(tx_raw_bytes.hex(), chain_id))
# j = 0
# for i in d:
# logg.debug('decoded {}: {}'.format(field_debugs[j], i.hex()))
# j += 1
# vb = chain_id
# if chain_id != 0:
# v = int.from_bytes(d[6], 'big')
# vb = v - (chain_id * 2) - 35
# while len(d[7]) < 32:
# d[7] = b'\x00' + d[7]
# while len(d[8]) < 32:
# d[8] = b'\x00' + d[8]
# s = b''.join([d[7], d[8], bytes([vb])])
# so = KeyAPI.Signature(signature_bytes=s)
#
# h = sha3.keccak_256()
# h.update(rlp_encode(d))
# signed_hash = h.digest()
#
# d[6] = chain_id
# d[7] = b''
# d[8] = b''
#
# h = sha3.keccak_256()
# h.update(rlp_encode(d))
# unsigned_hash = h.digest()
#
# p = so.recover_public_key_from_msg_hash(unsigned_hash)
# a = p.to_checksum_address()
# logg.debug('decoded recovery byte {}'.format(vb))
# logg.debug('decoded address {}'.format(a))
# logg.debug('decoded signed hash {}'.format(signed_hash.hex()))
# logg.debug('decoded unsigned hash {}'.format(unsigned_hash.hex()))
#
# to = d[3].hex() or None
# if to != None:
# to = web3.Web3.toChecksumAddress('0x' + to)
#
# return {
# 'from': a,
# 'nonce': int.from_bytes(d[0], 'big'),
# 'gasPrice': int.from_bytes(d[1], 'big'),
# 'gas': int.from_bytes(d[2], 'big'),
# 'to': to,
# 'value': int.from_bytes(d[4], 'big'),
# 'data': '0x' + d[5].hex(),
# 'v': chain_id,
# 'r': '0x' + s[:32].hex(),
# 's': '0x' + s[32:64].hex(),
# 'chainId': chain_id,
# 'hash': '0x' + signed_hash.hex(),
# 'hash_unsigned': '0x' + unsigned_hash.hex(),
# }
def unpack_signed_raw_tx_hex(tx_raw_hex, chain_id):

View File

@@ -149,6 +149,9 @@ def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
txs_by_block = {}
chain_spec = ChainSpec.from_chain_str(chain_str)
if isinstance(tx_batches, dict):
tx_batches = [tx_batches]
for b in tx_batches:
for v in b.values():
tx = None

View File

@@ -14,6 +14,7 @@ from cic_eth.db.enum import (
StatusBits,
dead,
)
from cic_eth.task import CriticalSQLAlchemyTask
celery_app = celery.current_app
@@ -35,7 +36,7 @@ def __balance_outgoing_compatible(token_address, holder_address, chain_str):
return delta
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def balance_outgoing(tokens, holder_address, chain_str):
"""Retrieve accumulated value of unprocessed transactions sent from the given address.
@@ -73,7 +74,7 @@ def __balance_incoming_compatible(token_address, receiver_address, chain_str):
return delta
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def balance_incoming(tokens, receipient_address, chain_str):
"""Retrieve accumulated value of unprocessed transactions to be received by the given address.

View File

@@ -10,6 +10,7 @@ from cic_registry.chain import ChainSpec
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.models.otx import Otx
from cic_eth.error import NotLocalTxError
from cic_eth.task import CriticalSQLAlchemyAndWeb3Task
celery_app = celery.current_app
@@ -17,7 +18,7 @@ logg = logging.getLogger()
# TODO: This method does not belong in the _queue_ module, it operates across queue and network
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyAndWeb3Task)
def tx_times(tx_hash, chain_str):
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)

View File

@@ -26,9 +26,11 @@ from cic_eth.db.enum import (
is_alive,
dead,
)
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
from cic_eth.error import NotLocalTxError
from cic_eth.error import LockedError
from cic_eth.db.enum import status_str
celery_app = celery.current_app
#logg = celery_app.log.get_default_logger()
@@ -86,7 +88,7 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
# TODO: Replace set_* with single task for set status
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_sent_status(tx_hash, fail=False):
"""Used to set the status after a send attempt
@@ -118,7 +120,7 @@ def set_sent_status(tx_hash, fail=False):
return tx_hash
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_final_status(tx_hash, block=None, fail=False):
"""Used to set the status of an incoming transaction result.
@@ -174,7 +176,7 @@ def set_final_status(tx_hash, block=None, fail=False):
return tx_hash
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_cancel(tx_hash, manual=False):
"""Used to set the status when a transaction is cancelled.
@@ -206,7 +208,7 @@ def set_cancel(tx_hash, manual=False):
return tx_hash
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_rejected(tx_hash):
"""Used to set the status when the node rejects sending a transaction to network
@@ -232,7 +234,7 @@ def set_rejected(tx_hash):
return tx_hash
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_fubar(tx_hash):
"""Used to set the status when an unexpected error occurs.
@@ -258,7 +260,7 @@ def set_fubar(tx_hash):
return tx_hash
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_manual(tx_hash):
"""Used to set the status when queue is manually changed
@@ -284,7 +286,7 @@ def set_manual(tx_hash):
return tx_hash
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_ready(tx_hash):
"""Used to mark a transaction as ready to be sent to network
@@ -310,7 +312,27 @@ def set_ready(tx_hash):
return tx_hash
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_dequeue(tx_hash):
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.dequeue(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_waitforgas(tx_hash):
"""Used to set the status when a transaction must be deferred due to gas refill
@@ -336,7 +358,7 @@ def set_waitforgas(tx_hash):
return tx_hash
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_state_log(tx_hash):
logs = []
@@ -355,7 +377,7 @@ def get_state_log(tx_hash):
return logs
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_tx_cache(tx_hash):
"""Returns an aggregate dictionary of outgoing transaction data and metadata
@@ -386,7 +408,7 @@ def get_tx_cache(tx_hash):
'tx_hash': otx.tx_hash,
'signed_tx': otx.signed_tx,
'nonce': otx.nonce,
'status': StatusEnum(otx.status).name,
'status': status_str(otx.status),
'status_code': otx.status,
'source_token': txc.source_token_address,
'destination_token': txc.destination_token_address,
@@ -404,7 +426,7 @@ def get_tx_cache(tx_hash):
return tx
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_lock(address=None):
"""Retrieve all active locks
@@ -442,7 +464,7 @@ def get_lock(address=None):
return locks
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_tx(tx_hash):
"""Retrieve a transaction queue record by transaction hash
@@ -453,7 +475,9 @@ def get_tx(tx_hash):
:rtype: dict
"""
session = SessionBase.create_session()
tx = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
if tx == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
@@ -469,7 +493,7 @@ def get_tx(tx_hash):
return o
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_nonce_tx(nonce, sender, chain_id):
"""Retrieve all transactions for address with specified nonce
@@ -544,7 +568,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
return txs
def get_status_tx(status, before=None, exact=False, limit=0, session=None):
def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None):
"""Retrieve transaction with a specific queue status.
:param status: Status to match transactions with
@@ -560,11 +584,15 @@ def get_status_tx(status, before=None, exact=False, limit=0, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.date_updated<before)
# before = datetime.datetime.utcnow()
if before != None:
q = q.filter(TxCache.date_updated<before)
if exact:
q = q.filter(Otx.status==status.value)
q = q.filter(Otx.status==status)
else:
q = q.filter(Otx.status.op('&')(status.value)==status.value)
q = q.filter(Otx.status.op('&')(status)>0)
if not_status != None:
q = q.filter(Otx.status.op('&')(not_status)==0)
i = 0
for o in q.all():
if limit > 0 and i == limit:
@@ -652,7 +680,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
return txs
@celery_app.task()
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None):
"""Returns all local queue transactions for a given Ethereum address

View File

@@ -0,0 +1,86 @@
# standard imports
import logging
import copy
# external imports
from cic_registry import CICRegistry
from eth_token_index import TokenUniqueSymbolIndex
from eth_accounts_index import AccountRegistry
from chainlib.chain import ChainSpec
from cic_registry.chain import ChainRegistry
from cic_registry.helper.declarator import DeclaratorOracleAdapter
logg = logging.getLogger(__name__)
class TokenOracle:
def __init__(self, conn, chain_spec, registry):
self.tokens = []
self.chain_spec = chain_spec
self.registry = registry
token_registry_contract = CICRegistry.get_contract(chain_spec, 'TokenRegistry', 'Registry')
self.getter = TokenUniqueSymbolIndex(conn, token_registry_contract.address())
def get_tokens(self):
token_count = self.getter.count()
if token_count == len(self.tokens):
return self.tokens
for i in range(len(self.tokens), token_count):
token_address = self.getter.get_index(i)
t = self.registry.get_address(self.chain_spec, token_address)
token_symbol = t.symbol()
self.tokens.append(t)
logg.debug('adding token idx {} symbol {} address {}'.format(i, token_symbol, token_address))
return copy.copy(self.tokens)
class AccountsOracle:
def __init__(self, conn, chain_spec, registry):
self.accounts = []
self.chain_spec = chain_spec
self.registry = registry
accounts_registry_contract = CICRegistry.get_contract(chain_spec, 'AccountRegistry', 'Registry')
self.getter = AccountRegistry(conn, accounts_registry_contract.address())
def get_accounts(self):
accounts_count = self.getter.count()
if accounts_count == len(self.accounts):
return self.accounts
for i in range(len(self.accounts), accounts_count):
account = self.getter.get_index(i)
self.accounts.append(account)
logg.debug('adding account {}'.format(account))
return copy.copy(self.accounts)
def init_registry(config, w3):
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
CICRegistry.init(w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry, True)
declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator')
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
raise ValueError('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses)
chain_registry.add_oracle(oracle, 'naive_erc20_oracle')
return CICRegistry

View File

@@ -76,8 +76,9 @@ def main():
t = api.create_account(register=register)
ps.get_message()
m = ps.get_message(timeout=args.timeout)
print(json.loads(m['data']))
o = ps.get_message(timeout=args.timeout)
m = json.loads(o['data'])
print(m['result'])
if __name__ == '__main__':

View File

@@ -15,21 +15,30 @@ import web3
from web3 import HTTPProvider, WebsocketProvider
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from chainlib.eth.tx import unpack
from hexathon import strip_0x
# local imports
import cic_eth
from cic_eth.eth import RpcClient
from cic_eth.db import SessionBase
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import StatusBits
from cic_eth.db.enum import LockEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_upcoming_tx
from cic_eth.queue.tx import (
get_upcoming_tx,
set_dequeue,
)
from cic_eth.admin.ctrl import lock_send
from cic_eth.sync.error import LoopDone
from cic_eth.eth.tx import send as task_tx_send
from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError
from cic_eth.eth.util import unpack_signed_raw_tx_hex
from cic_eth.error import (
PermanentTxError,
TemporaryTxError,
NotLocalTxError,
)
#from cic_eth.eth.util import unpack_signed_raw_tx_hex
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -91,6 +100,8 @@ run = True
class DispatchSyncer:
yield_delay = 0.005
def __init__(self, chain_spec):
self.chain_spec = chain_spec
self.chain_id = chain_spec.chain_id()
@@ -106,7 +117,15 @@ class DispatchSyncer:
chain_str = str(self.chain_spec)
for k in txs.keys():
tx_raw = txs[k]
tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
#tx = unpack_signed_raw_tx_hex(tx_raw, self.chain_spec.chain_id())
tx_raw_bytes = bytes.fromhex(strip_0x(tx_raw))
tx = unpack(tx_raw_bytes, self.chain_spec.chain_id())
try:
set_dequeue(tx['hash'])
except NotLocalTxError as e:
logg.warning('dispatcher was triggered with non-local tx {}'.format(tx['hash']))
continue
s_check = celery.signature(
'cic_eth.admin.ctrl.check_lock',
@@ -127,18 +146,22 @@ class DispatchSyncer:
)
s_check.link(s_send)
t = s_check.apply_async()
logg.info('processed {}'.format(k))
def loop(self, w3, interval):
while run:
txs = {}
typ = StatusEnum.READYSEND
typ = StatusBits.QUEUED
utxs = get_upcoming_tx(typ, chain_id=self.chain_id)
for k in utxs.keys():
txs[k] = utxs[k]
self.process(w3, txs)
time.sleep(interval)
if len(utxs) > 0:
time.sleep(self.yield_delay)
else:
time.sleep(interval)
def main():

View File

@@ -2,3 +2,4 @@ from .callback import CallbackFilter
from .tx import TxFilter
from .gas import GasFilter
from .register import RegistrationFilter
from .transferauth import TransferAuthFilter

View File

@@ -5,37 +5,46 @@ import logging
import web3
import celery
from cic_registry.error import UnknownContractError
from chainlib.status import Status as TxStatus
from chainlib.eth.address import to_checksum
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
# local imports
from .base import SyncFilter
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.token import (
unpack_transfer,
unpack_transferfrom,
)
from cic_eth.eth.account import unpack_gift
from cic_eth.eth.token import ExtendedTx
from .base import SyncFilter
logg = logging.getLogger()
logg = logging.getLogger(__name__)
transfer_method_signature = '0xa9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '0x23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '0x63e4bff4' # keccak256(giveTo(address))
transfer_method_signature = 'a9059cbb' # keccak256(transfer(address,uint256))
transferfrom_method_signature = '23b872dd' # keccak256(transferFrom(address,address,uint256))
giveto_method_signature = '63e4bff4' # keccak256(giveTo(address))
class CallbackFilter(SyncFilter):
trusted_addresses = []
def __init__(self, method, queue):
def __init__(self, chain_spec, method, queue):
self.queue = queue
self.method = method
self.chain_spec = chain_spec
def call_back(self, transfer_type, result):
logg.debug('result {}'.format(result))
s = celery.signature(
self.method,
[
result,
transfer_type,
int(rcpt.status == 0),
int(result['status_code'] == 0),
],
queue=self.queue,
)
@@ -50,58 +59,85 @@ class CallbackFilter(SyncFilter):
# )
# s_translate.link(s)
# s_translate.apply_async()
s.apply_async()
t = s.apply_async()
return s
def parse_data(self, tx, rcpt):
transfer_type = 'transfer'
def parse_data(self, tx):
transfer_type = None
transfer_data = None
method_signature = tx.input[:10]
logg.debug('have payload {}'.format(tx.payload))
method_signature = tx.payload[:8]
logg.debug('tx status {}'.format(tx.status))
if method_signature == transfer_method_signature:
transfer_data = unpack_transfer(tx.input)
transfer_data = unpack_transfer(tx.payload)
transfer_data['from'] = tx['from']
transfer_data['token_address'] = tx['to']
elif method_signature == transferfrom_method_signature:
transfer_type = 'transferfrom'
transfer_data = unpack_transferfrom(tx.input)
transfer_data = unpack_transferfrom(tx.payload)
transfer_data['token_address'] = tx['to']
# TODO: do not rely on logs here
elif method_signature == giveto_method_signature:
transfer_type = 'tokengift'
transfer_data = unpack_gift(tx.input)
for l in rcpt.logs:
if l.topics[0].hex() == '0x45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=l.data)
token_address_bytes = l.topics[2][32-20:]
transfer_data['token_address'] = web3.Web3.toChecksumAddress(token_address_bytes.hex())
transfer_data['from'] = rcpt.to
transfer_data = unpack_gift(tx.payload)
transfer_data['from'] = tx.inputs[0]
transfer_data['value'] = 0
transfer_data['token_address'] = ZERO_ADDRESS
for l in tx.logs:
topics = l['topics']
logg.debug('topixx {}'.format(topics))
if strip_0x(topics[0]) == '45c201a59ac545000ead84f30b2db67da23353aa1d58ac522c48505412143ffa':
transfer_data['value'] = web3.Web3.toInt(hexstr=strip_0x(l['data']))
#token_address_bytes = topics[2][32-20:]
token_address = strip_0x(topics[2])[64-40:]
transfer_data['token_address'] = to_checksum(token_address)
logg.debug('resolved method {}'.format(transfer_type))
if transfer_data != None:
transfer_data['status'] = tx.status
return (transfer_type, transfer_data)
def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying callback filter "{}:{}"'.format(self.queue, self.method))
chain_str = str(chain_spec)
transfer_data = self.parse_data(tx, rcpt)
def filter(self, conn, block, tx, db_session=None):
chain_str = str(self.chain_spec)
transfer_data = None
if len(tx.input) < 10:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx['hash']))
transfer_type = None
try:
(transfer_type, transfer_data) = self.parse_data(tx)
except TypeError:
logg.debug('invalid method data length for tx {}'.format(tx.hash))
return
logg.debug('checking callbacks filter input {}'.format(tx.input[:10]))
if len(tx.payload) < 8:
logg.debug('callbacks filter data length not sufficient for method signature in tx {}, skipping'.format(tx.hash))
return
logg.debug('checking callbacks filter input {}'.format(tx.payload[:8]))
if transfer_data != None:
logg.debug('wtfoo {}'.format(transfer_data))
token_symbol = None
result = None
try:
tokentx = ExtendedTx(self.chain_spec)
tokentx = ExtendedTx(tx.hash, self.chain_spec)
tokentx.set_actors(transfer_data['from'], transfer_data['to'], self.trusted_addresses)
tokentx.set_tokens(transfer_data['token_address'], transfer_data['value'])
self.call_back(tokentx.to_dict())
if transfer_data['status'] == 0:
tokentx.set_status(1)
else:
tokentx.set_status(0)
t = self.call_back(transfer_type, tokentx.to_dict())
logg.info('callback success task id {} tx {}'.format(t, tx.hash))
except UnknownContractError:
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash.hex()))
logg.debug('callback filter {}:{} skipping "transfer" method on unknown contract {} tx {}'.format(tc.queue, tc.method, transfer_data['to'], tx.hash))
def __str__(self):
return 'cic-eth callbacks'

View File

@@ -1,31 +1,32 @@
# standard imports
import logging
# third-party imports
# external imports
from cic_registry.chain import ChainSpec
from hexathon import add_0x
# local imports
from cic_eth.db.enum import StatusBits
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db import Otx
from cic_eth.db.models.otx import Otx
from cic_eth.queue.tx import get_paused_txs
from cic_eth.eth.task import create_check_gas_and_send_task
from .base import SyncFilter
logg = logging.getLogger()
logg = logging.getLogger(__name__)
class GasFilter(SyncFilter):
def __init__(self, gas_provider, queue=None):
def __init__(self, chain_spec, queue=None):
self.queue = queue
self.gas_provider = gas_provider
self.chain_spec = chain_spec
def filter(self, w3, tx, rcpt, chain_str, session=None):
logg.debug('applying gas filter')
tx_hash_hex = tx.hash.hex()
if tx['value'] > 0:
def filter(self, conn, block, tx, session):
tx_hash_hex = add_0x(tx.hash)
if tx.value > 0:
logg.debug('gas refill tx {}'.format(tx_hash_hex))
session = SessionBase.bind_session(session)
q = session.query(TxCache.recipient)
@@ -34,23 +35,26 @@ class GasFilter(SyncFilter):
r = q.first()
if r == None:
logg.warning('unsolicited gas refill tx {}'.format(tx_hash_hex))
logg.debug('unsolicited gas refill tx {}'.format(tx_hash_hex))
SessionBase.release_session(session)
return
chain_spec = ChainSpec.from_chain_str(chain_str)
txs = get_paused_txs(StatusEnum.WAITFORGAS, r[0], chain_spec.chain_id(), session=session)
txs = get_paused_txs(StatusBits.GAS_ISSUES, r[0], self.chain_spec.chain_id(), session=session)
SessionBase.release_session(session)
logg.info('resuming gas-in-waiting txs for {}'.format(r[0]))
if len(txs) > 0:
logg.info('resuming gas-in-waiting txs for {}: {}'.format(r[0], txs.keys()))
s = create_check_gas_and_send_task(
list(txs.values()),
str(chain_str),
str(self.chain_spec),
r[0],
0,
tx_hashes_hex=list(txs.keys()),
queue=self.queue,
)
s.apply_async()
def __str__(self):
return 'eic-eth gasfilter'

View File

@@ -4,32 +4,54 @@ import logging
# third-party imports
import celery
from chainlib.eth.address import to_checksum
from hexathon import (
add_0x,
strip_0x,
)
# local imports
from .base import SyncFilter
logg = logging.getLogger()
logg = logging.getLogger(__name__)
account_registry_add_log_hash = '0x5ed3bdd47b9af629827a8d129aa39c870b10c03f0153fe9ddb8e84b665061acd' # keccak256(AccountAdded(address,uint256))
class RegistrationFilter(SyncFilter):
def filter(self, w3, tx, rcpt, chain_spec, session=None):
logg.debug('applying registration filter')
def __init__(self, chain_spec, queue):
self.chain_spec = chain_spec
self.queue = queue
def filter(self, conn, block, tx, db_session=None):
registered_address = None
for l in rcpt['logs']:
event_topic_hex = l['topics'][0].hex()
for l in tx.logs:
event_topic_hex = l['topics'][0]
if event_topic_hex == account_registry_add_log_hash:
address_bytes = l.topics[1][32-20:]
address = to_checksum(address_bytes.hex())
logg.debug('request token gift to {}'.format(address))
s = celery.signature(
'cic_eth.eth.account.gift',
# TODO: use abi conversion method instead
address_hex = strip_0x(l['topics'][1])[64-40:]
address = to_checksum(add_0x(address_hex))
logg.info('request token gift to {}'.format(address))
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
address,
str(chain_spec),
],
queue=queue,
queue=self.queue,
)
s.apply_async()
s_gift = celery.signature(
'cic_eth.eth.account.gift',
[
str(self.chain_spec),
],
queue=self.queue,
)
s_nonce.link(s_gift)
s_nonce.apply_async()
def __str__(self):
return 'cic-eth account registration'

View File

@@ -0,0 +1,97 @@
# standard imports
import logging
# external imports
import celery
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.address import to_checksum
from .base import SyncFilter
logg = logging.getLogger(__name__)
transfer_request_signature = 'ed71262a'
def unpack_create_request(data):
data = strip_0x(data)
cursor = 0
f = data[cursor:cursor+8]
cursor += 8
if f != transfer_request_signature:
raise ValueError('Invalid create request data ({})'.format(f))
o = {}
o['sender'] = data[cursor+24:cursor+64]
cursor += 64
o['recipient'] = data[cursor+24:cursor+64]
cursor += 64
o['token'] = data[cursor+24:cursor+64]
cursor += 64
o['value'] = int(data[cursor:], 16)
return o
class TransferAuthFilter(SyncFilter):
def __init__(self, registry, chain_spec, queue=None):
self.queue = queue
self.chain_spec = chain_spec
self.transfer_request_contract = registry.get_contract(self.chain_spec, 'TransferAuthorization')
def filter(self, conn, block, tx, session): #rcpt, chain_str, session=None):
if tx.payload == None:
logg.debug('no payload')
return False
payloadlength = len(tx.payload)
if payloadlength != 8+256:
logg.debug('{} below minimum length for a transfer auth call'.format(payloadlength))
logg.debug('payload {}'.format(tx.payload))
return False
recipient = tx.inputs[0]
if recipient != self.transfer_request_contract.address():
logg.debug('not our transfer auth contract address {}'.format(recipient))
return False
o = unpack_create_request(tx.payload)
sender = add_0x(to_checksum(o['sender']))
recipient = add_0x(to_checksum(recipient))
token = add_0x(to_checksum(o['token']))
token_data = {
'address': token,
}
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
sender,
],
queue=self.queue,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
sender,
recipient,
o['value'],
str(self.chain_spec),
],
queue=self.queue,
)
s_nonce.link(s_approve)
t = s_nonce.apply_async()
return True
def __str__(self):
return 'cic-eth transfer auth filter'

View File

@@ -3,39 +3,47 @@ import logging
# third-party imports
import celery
from hexathon import (
add_0x,
)
# local imports
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.base import SessionBase
from chainsyncer.db.models.base import SessionBase
from chainlib.status import Status
from .base import SyncFilter
logg = logging.getLogger()
logg = logging.getLogger(__name__)
class TxFilter(SyncFilter):
def __init__(self, queue):
def __init__(self, chain_spec, queue):
self.queue = queue
self.chain_spec = chain_spec
def filter(self, w3, tx, rcpt, chain_spec, session=None):
session = SessionBase.bind_session(session)
logg.debug('applying tx filter')
tx_hash_hex = tx.hash.hex()
otx = Otx.load(tx_hash_hex, session=session)
SessionBase.release_session(session)
def filter(self, conn, block, tx, db_session=None):
db_session = SessionBase.bind_session(db_session)
tx_hash_hex = tx.hash
otx = Otx.load(add_0x(tx_hash_hex), session=db_session)
if otx == None:
logg.debug('tx {} not found locally, skipping'.format(tx_hash_hex))
return None
logg.info('otx found {}'.format(otx.tx_hash))
logg.info('tx filter match on {}'.format(otx.tx_hash))
SessionBase.release_session(db_session)
s = celery.signature(
'cic_eth.queue.tx.set_final_status',
[
tx_hash_hex,
rcpt.blockNumber,
rcpt.status == 0,
add_0x(tx_hash_hex),
tx.block.number,
tx.status == Status.ERROR,
],
queue=self.queue,
)
t = s.apply_async()
return t
def __str__(self):
return 'cic-eth erc20 transfer filter'

View File

@@ -1,207 +0,0 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# third-party imports
import confini
import celery
import rlp
import web3
from web3 import HTTPProvider, WebsocketProvider
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from cic_registry import zero_address
from cic_registry.chain import ChainRegistry
from cic_registry.error import UnknownContractError
from cic_bancor.bancor import BancorRegistryClient
# local imports
import cic_eth
from cic_eth.eth import RpcClient
from cic_eth.db import SessionBase
from cic_eth.db import Otx
from cic_eth.db import TxConvertTransfer
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import StatusEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_paused_txs
from cic_eth.sync import Syncer
from cic_eth.sync.error import LoopDone
from cic_eth.db.error import UnknownConvertError
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.sync.backend import SyncerBackend
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.account import unpack_gift
from cic_eth.runnable.daemons.filters import (
CallbackFilter,
GasFilter,
TxFilter,
RegistrationFilter,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL)
logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL)
logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL)
config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true')
argparser.add_argument('-vv', help='be more verbose', action='store_true')
argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head')
args = argparser.parse_args(sys.argv[1:])
if args.v == True:
logging.getLogger().setLevel(logging.INFO)
elif args.vv == True:
logging.getLogger().setLevel(logging.DEBUG)
config_dir = os.path.join(args.c)
os.makedirs(config_dir, 0o777, True)
config = confini.Config(config_dir, args.env_prefix)
config.process()
# override args
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
}
config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
RpcClient.set_constructor(web3_constructor)
c = RpcClient(chain_spec)
CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry, True)
declarator = CICRegistry.get_contract(chain_spec, 'AddressDeclarator', interface='Declarator')
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
def main():
global chain_spec, c, queue
if config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER') != None:
CICRegistry.add_role(chain_spec, config.get('ETH_ACCOUNT_ACCOUNTS_INDEX_WRITER'), 'AccountRegistry', True)
syncers = []
block_offset = c.w3.eth.blockNumber
chain = str(chain_spec)
if SyncerBackend.first(chain):
from cic_eth.sync.history import HistorySyncer
backend = SyncerBackend.initial(chain, block_offset)
syncer = HistorySyncer(backend)
syncers.append(syncer)
if args.mode == 'head':
from cic_eth.sync.head import HeadSyncer
block_sync = SyncerBackend.live(chain, block_offset+1)
syncers.append(HeadSyncer(block_sync))
elif args.mode == 'history':
from cic_eth.sync.history import HistorySyncer
backends = SyncerBackend.resume(chain, block_offset+1)
for backend in backends:
syncers.append(HistorySyncer(backend))
if len(syncers) == 0:
logg.info('found no unsynced history. terminating')
sys.exit(0)
else:
sys.stderr.write("unknown mode '{}'\n".format(args.mode))
sys.exit(1)
# bancor_registry_contract = CICRegistry.get_contract(chain_spec, 'BancorRegistry', interface='Registry')
# bancor_chain_registry = CICRegistry.get_chain_registry(chain_spec)
# bancor_registry = BancorRegistryClient(c.w3, bancor_chain_registry, config.get('ETH_ABI_DIR'))
# bancor_registry.load()
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
CallbackFilter.trusted_addresses = trusted_addresses
callback_filters = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
task_queue = queue
if len(task_split) > 1:
task_queue = task_split[0]
callback_filter = CallbackFilter(task_split[1], task_queue)
callback_filters.append(callback_filter)
tx_filter = TxFilter(queue)
registration_filter = RegistrationFilter()
gas_filter = GasFilter(c.gas_provider(), queue)
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
syncer.filter.append(gas_filter.filter)
syncer.filter.append(registration_filter.filter)
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
syncer.filter.append(tx_filter.filter)
#syncer.filter.append(convert_filter)
for cf in callback_filters:
syncer.filter.append(cf.filter)
try:
syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')))
except LoopDone as e:
sys.stderr.write("sync '{}' done at block {}\n".format(args.mode, e))
i += 1
sys.exit(0)
if __name__ == '__main__':
main()

View File

@@ -138,7 +138,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_str):
# TODO: can we merely use the dispatcher instead?
def dispatch(chain_str):
txs = get_status_tx(StatusEnum.RETRY, datetime.datetime.utcnow())
txs = get_status_tx(StatusEnum.RETRY, before=datetime.datetime.utcnow())
if len(txs) == 0:
logg.debug('no retry state txs found')
return

View File

@@ -55,62 +55,25 @@ SessionBase.connect(dsn)
celery_app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
queue = args.q
re_transfer_approval_request = r'^/transferrequest/?'
re_something = r'^/something/?'
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
def process_transfer_approval_request(session, env):
r = re.match(re_transfer_approval_request, env.get('PATH_INFO'))
def process_something(session, env):
r = re.match(re_something, env.get('PATH_INFO'))
if not r:
return None
if env.get('CONTENT_TYPE') != 'application/json':
raise AttributeError('content type')
#if env.get('CONTENT_TYPE') != 'application/json':
# raise AttributeError('content type')
if env.get('REQUEST_METHOD') != 'POST':
raise AttributeError('method')
#if env.get('REQUEST_METHOD') != 'POST':
# raise AttributeError('method')
post_data = json.load(env.get('wsgi.input'))
token_address = web3.Web3.toChecksumAddress(post_data['token_address'])
holder_address = web3.Web3.toChecksumAddress(post_data['holder_address'])
beneficiary_address = web3.Web3.toChecksumAddress(post_data['beneficiary_address'])
value = int(post_data['value'])
logg.debug('transfer approval request token {} to {} from {} value {}'.format(
token_address,
beneficiary_address,
holder_address,
value,
)
)
s = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
[
{
'address': token_address,
},
],
holder_address,
beneficiary_address,
value,
config.get('CIC_CHAIN_SPEC'),
],
queue=queue,
)
t = s.apply_async()
r = t.get()
tx_raw_bytes = bytes.fromhex(r[0][2:])
tx = unpack_signed_raw_tx(tx_raw_bytes, chain_spec.chain_id())
for r in t.collect():
logg.debug('result {}'.format(r))
if not t.successful():
raise RuntimeError(tx['hash'])
return ('text/plain', tx['hash'].encode('utf-8'),)
#post_data = json.load(env.get('wsgi.input'))
#return ('text/plain', 'foo'.encode('utf-8'),)
# uwsgi application
@@ -125,7 +88,7 @@ def application(env, start_response):
session = SessionBase.create_session()
for handler in [
process_transfer_approval_request,
process_something,
]:
try:
r = handler(session, env)

View File

@@ -26,12 +26,12 @@ from cic_eth.eth import bancor
from cic_eth.eth import token
from cic_eth.eth import tx
from cic_eth.eth import account
from cic_eth.eth import request
from cic_eth.admin import debug
from cic_eth.admin import ctrl
from cic_eth.eth.rpc import RpcClient
from cic_eth.eth.rpc import GasOracle
from cic_eth.queue import tx
from cic_eth.queue import balance
from cic_eth.callbacks import Callback
from cic_eth.callbacks import http
from cic_eth.callbacks import tcp
@@ -39,6 +39,7 @@ from cic_eth.callbacks import redis
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db import dsn_from_config
from cic_eth.ext import tx
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -49,6 +50,7 @@ argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='web3 provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config file')
argparser.add_argument('-q', type=str, default='cic-eth', help='queue name for worker tasks')
argparser.add_argument('-r', type=str, help='CIC registry address')
argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi')
argparser.add_argument('--trace-queue-status', default=None, dest='trace_queue_status', action='store_true', help='set to perist all queue entry status changes to storage')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
@@ -68,6 +70,7 @@ config.process()
args_override = {
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
'ETH_PROVIDER': getattr(args, 'p'),
'TASKS_TRACE_QUEUE_STATUS': getattr(args, 'trace_queue_status'),
}
@@ -228,7 +231,7 @@ def main():
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
oracle = DeclaratorOracleAdapter(declarator.contract, trusted_addresses)
chain_registry.add_oracle('naive_erc20_oracle', oracle)
chain_registry.add_oracle(oracle, 'naive_erc20_oracle')
#chain_spec = CICRegistry.default_chain_spec

View File

@@ -0,0 +1,172 @@
# standard imports
import os
import sys
import logging
import time
import argparse
import sys
import re
# third-party imports
import confini
import celery
import rlp
import web3
from web3 import HTTPProvider, WebsocketProvider
import cic_base.config
import cic_base.log
import cic_base.argparse
import cic_base.rpc
from cic_registry import CICRegistry
from chainlib.chain import ChainSpec
from cic_registry import zero_address
from cic_registry.chain import ChainRegistry
from cic_registry.error import UnknownContractError
from chainlib.eth.connection import HTTPConnection
from chainlib.eth.block import (
block_latest,
)
from hexathon import (
strip_0x,
)
from chainsyncer.backend import SyncerBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.db.models.base import SessionBase
# local imports
from cic_eth.registry import init_registry
from cic_eth.eth import RpcClient
from cic_eth.db import Otx
from cic_eth.db import TxConvertTransfer
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import StatusEnum
from cic_eth.db import dsn_from_config
from cic_eth.queue.tx import get_paused_txs
#from cic_eth.sync import Syncer
#from cic_eth.sync.error import LoopDone
from cic_eth.db.error import UnknownConvertError
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.token import unpack_transfer
from cic_eth.eth.token import unpack_transferfrom
from cic_eth.eth.account import unpack_gift
from cic_eth.runnable.daemons.filters import (
CallbackFilter,
GasFilter,
TxFilter,
RegistrationFilter,
TransferAuthFilter,
)
script_dir = os.path.realpath(os.path.dirname(__file__))
logg = cic_base.log.create()
argparser = cic_base.argparse.create(script_dir, cic_base.argparse.full_template)
#argparser = cic_base.argparse.add(argparser, add_traffic_args, 'traffic')
args = cic_base.argparse.parse(argparser, logg)
config = cic_base.config.create(args.c, args, args.env_prefix)
config.add(args.y, '_KEYSTORE_FILE', True)
config.add(args.q, '_CELERY_QUEUE', True)
cic_base.config.log(config)
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=1, debug=config.true('DATABASE_DEBUG'))
def main():
# parse chain spec object
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
# set up registry
w3 = cic_base.rpc.create(config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored
registry = init_registry(config, w3)
# Connect to blockchain with chainlib
conn = HTTPConnection(config.get('ETH_PROVIDER'))
o = block_latest()
r = conn.do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.debug('starting at block {}'.format(block_offset))
syncers = []
#if SyncerBackend.first(chain_spec):
# backend = SyncerBackend.initial(chain_spec, block_offset)
syncer_backends = SyncerBackend.resume(chain_spec, block_offset)
if len(syncer_backends) == 0:
logg.info('found no backends to resume')
syncer_backends.append(SyncerBackend.initial(chain_spec, block_offset))
else:
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
syncer_backends.append(SyncerBackend.live(chain_spec, block_offset+1))
for syncer_backend in syncer_backends:
try:
syncers.append(HistorySyncer(syncer_backend))
logg.info('Initializing HISTORY syncer on backend {}'.format(syncer_backend))
except AttributeError:
logg.info('Initializing HEAD syncer on backend {}'.format(syncer_backend))
syncers.append(HeadSyncer(syncer_backend))
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
sys.exit(1)
trusted_addresses = trusted_addresses_src.split(',')
for address in trusted_addresses:
logg.info('using trusted address {}'.format(address))
CallbackFilter.trusted_addresses = trusted_addresses
callback_filters = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
task_queue = config.get('_CELERY_QUEUE')
if len(task_split) > 1:
task_queue = task_split[0]
callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
callback_filters.append(callback_filter)
tx_filter = TxFilter(chain_spec, config.get('_CELERY_QUEUE'))
registration_filter = RegistrationFilter(chain_spec, config.get('_CELERY_QUEUE'))
gas_filter = GasFilter(chain_spec, config.get('_CELERY_QUEUE'))
transfer_auth_filter = TransferAuthFilter(registry, chain_spec, config.get('_CELERY_QUEUE'))
i = 0
for syncer in syncers:
logg.debug('running syncer index {}'.format(i))
syncer.add_filter(gas_filter)
syncer.add_filter(registration_filter)
# TODO: the two following filter functions break the filter loop if return uuid. Pro: less code executed. Con: Possibly unintuitive flow break
syncer.add_filter(tx_filter)
syncer.add_filter(transfer_auth_filter)
for cf in callback_filters:
syncer.add_filter(cf)
r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), conn)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1
sys.exit(0)
if __name__ == '__main__':
main()

View File

@@ -18,12 +18,16 @@ import web3
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from cic_registry.chain import ChainRegistry
from hexathon import add_0x
# local imports
from cic_eth.api import AdminApi
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import LockEnum
from cic_eth.db.enum import (
StatusEnum,
status_str,
LockEnum,
)
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -36,9 +40,10 @@ default_abi_dir = '/usr/share/local/cic/solidity/abi'
default_config_dir = os.path.join('/usr/local/etc/cic-eth')
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-r', '--registry-address', type=str, help='CIC registry address')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='Web3 provider url (http only)')
argparser.add_argument('-r', '--registry-address', dest='r', type=str, help='CIC registry address')
argparser.add_argument('-f', '--format', dest='f', default='terminal', type=str, help='Output format')
argparser.add_argument('--status-raw', dest='status_raw', action='store_true', help='Output statis bit enum names only')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
@@ -61,12 +66,16 @@ config.process()
args_override = {
'ETH_PROVIDER': getattr(args, 'p'),
'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_REGISTRY_ADDRESS': getattr(args, 'r'),
}
# override args
config.dict_override(args_override, 'cli args')
config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(add_0x(args.query), '_QUERY', True)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
@@ -114,7 +123,7 @@ def render_tx(o, **kwargs):
for v in o.get('status_log', []):
d = datetime.datetime.fromisoformat(v[0])
e = StatusEnum(v[1]).name
e = status_str(v[1], args.status_raw)
content += '{}: {}\n'.format(d, e)
return content
@@ -148,21 +157,20 @@ def render_lock(o, **kwargs):
# TODO: move each command to submodule
def main():
logg.debug('len {}'.format(len(args.query)))
txs = []
renderer = render_tx
if len(args.query) > 66:
txs = [admin_api.tx(chain_spec, tx_raw=args.query)]
elif len(args.query) > 42:
txs = [admin_api.tx(chain_spec, tx_hash=args.query)]
elif len(args.query) == 42:
txs = admin_api.account(chain_spec, args.query, include_recipient=False)
if len(config.get('_QUERY')) > 66:
txs = [admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'))]
elif len(config.get('_QUERY')) > 42:
txs = [admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'))]
elif len(config.get('_QUERY')) == 42:
txs = admin_api.account(chain_spec, config.get('_QUERY'), include_recipient=False)
renderer = render_account
elif len(args.query) >= 4 and args.query[:4] == 'lock':
elif len(config.get('_QUERY')) >= 4 and config.get('_QUERY')[:4] == 'lock':
txs = admin_api.get_lock()
renderer = render_lock
else:
raise ValueError('cannot parse argument {}'.format(args.query))
raise ValueError('cannot parse argument {}'.format(config.get('_QUERY')))
if len(txs) == 0:
logg.info('no matches found')

View File

@@ -21,7 +21,7 @@ class HistorySyncer(MinedSyncer):
:param mx: Maximum number of blocks to return in one call
:type mx: int
"""
def __init__(self, bc_cache, mx=20):
def __init__(self, bc_cache, mx=500):
super(HistorySyncer, self).__init__(bc_cache)
self.max = mx

View File

@@ -23,6 +23,8 @@ class MinedSyncer(Syncer):
:type bc_cache: Object implementing methods from cic_eth.sync.SyncerBackend
"""
yield_delay = 0.005
def __init__(self, bc_cache):
super(MinedSyncer, self).__init__(bc_cache)
self.block_offset = 0
@@ -98,7 +100,10 @@ class MinedSyncer(Syncer):
logg.debug('got blocks {}'.format(e))
for block in e:
block_number = self.process(c.w3, block.hex())
logg.info('processed block {} {}'.format(block_number, block.hex()))
logg.debug('processed block {} {}'.format(block_number, block.hex()))
self.bc_cache.disconnect()
time.sleep(interval)
if len(e) > 0:
time.sleep(self.yield_delay)
else:
time.sleep(interval)
logg.info("Syncer no longer set to run, gracefully exiting")

View File

@@ -9,7 +9,10 @@ import celery
# local imports
from .base import Syncer
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import get_status_tx
logg = logging.getLogger()
@@ -47,7 +50,8 @@ class RetrySyncer(Syncer):
# )
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx(
StatusEnum.SENT.value,
StatusBits.IN_NETWORK.value,
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
before=before,
)
# return list(failed_txs.keys()) + list(stalled_txs.keys())

View File

@@ -0,0 +1,33 @@
# import
import requests
# external imports
import celery
import sqlalchemy
class CriticalTask(celery.Task):
retry_jitter = True
retry_backoff = True
retry_backoff_max = 8
class CriticalSQLAlchemyTask(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
)
class CriticalWeb3Task(CriticalTask):
autoretry_for = (
requests.exceptions.ConnectionError,
)
class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
autoretry_for = (
sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError,
)

View File

@@ -10,7 +10,7 @@ version = (
0,
10,
0,
'alpha.27',
'alpha.39',
)
version_object = semver.VersionInfo(

View File

@@ -2,4 +2,4 @@
registry_address =
chain_spec =
tx_retry_delay =
trust_address =
trust_address =

View File

@@ -0,0 +1,2 @@
[bancor]
dir = /usr/local/share/cic/bancor

View File

@@ -0,0 +1,3 @@
[celery]
broker_url = redis://localhost:63379
result_url = redis://localhost:63379

View File

@@ -0,0 +1,4 @@
[cic]
registry_address =
chain_spec = evm:bloxberg:8996
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C

View File

@@ -0,0 +1,2 @@
[custody]
account_index_address =

View File

@@ -0,0 +1,9 @@
[database]
NAME=cic_eth
USER=postgres
PASSWORD=tralala
HOST=localhost
PORT=63432
ENGINE=postgresql
DRIVER=psycopg2
DEBUG=0

View File

@@ -0,0 +1,2 @@
[dispatcher]
loop_interval = 0.9

View File

@@ -0,0 +1,8 @@
[eth]
#ws_provider = ws://localhost:8546
#ttp_provider = http://localhost:8545
provider = http://localhost:63545
gas_provider_address =
#chain_id =
abi_dir = /home/lash/src/ext/cic/grassrootseconomics/cic-contracts/abis
account_accounts_index_writer =

View File

@@ -0,0 +1,4 @@
[redis]
host = localhost
port = 63379
db = 0

View File

@@ -0,0 +1,5 @@
[signer]
socket_path = /tmp/crypto-dev-signer/jsonrpc.ipc
secret = deedbeef
database_name = signer_test
dev_keys_path =

View File

@@ -0,0 +1,6 @@
[SSL]
enable_client = false
cert_file =
key_file =
password =
ca_file =

View File

@@ -0,0 +1,2 @@
[SYNCER]
loop_interval = 1

View File

@@ -0,0 +1,3 @@
[tasks]
transfer_callbacks = taskcall:cic_eth.callbacks.noop.noop
trace_queue_status = 1

View File

@@ -16,7 +16,7 @@ ARG root_requirement_file='requirements.txt'
#RUN apk add linux-headers
#RUN apk add libffi-dev
RUN apt-get update && \
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps
apt install -y gcc gnupg libpq-dev wget make g++ gnupg bash procps git
# Copy shared requirements from top of mono-repo
RUN echo "copying root req file ${root_requirement_file}"
@@ -42,7 +42,6 @@ COPY cic-eth/config/ /usr/local/etc/cic-eth/
COPY cic-eth/cic_eth/db/migrations/ /usr/local/share/cic-eth/alembic/
COPY cic-eth/crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
RUN apt-get install -y git && \
git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi

View File

@@ -1,5 +0,0 @@
#!/bin/bash
. ./db.sh
/usr/local/bin/cic-eth-managerd $@

View File

@@ -6,7 +6,7 @@ set -e
# set CONFINI_ENV_PREFIX to override the env prefix to override env vars
echo "!!! starting signer"
python /usr/local/bin/crypto-dev-daemon -vv -c /usr/local/etc/crypto-dev-signer &
python /usr/local/bin/crypto-dev-daemon -c /usr/local/etc/crypto-dev-signer &
echo "!!! starting tracker"
/usr/local/bin/cic-eth-taskerd $@

View File

@@ -0,0 +1,5 @@
#!/bin/bash
. ./db.sh
/usr/local/bin/cic-eth-trackerd $@

View File

@@ -1,15 +1,16 @@
cic-base~=0.1.1a10
web3==5.12.2
celery==4.4.7
crypto-dev-signer~=0.4.13rc2
confini~=0.3.6b1
cic-registry~=0.5.3a18
crypto-dev-signer~=0.4.13rc4
confini~=0.3.6rc3
cic-registry~=0.5.3a22
cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2
websockets==8.1
requests~=2.24.0
eth_accounts_index~=0.0.10a7
erc20-approval-escrow~=0.3.0a5
eth_accounts_index~=0.0.10a10
erc20-transfer-authorization~=0.3.0a10
erc20-single-shot-faucet~=0.2.0a6
rlp==2.0.1
uWSGI==2.0.19.1
@@ -18,5 +19,6 @@ eth-gas-proxy==0.0.1a4
websocket-client==0.57.0
moolb~=0.1.1b2
eth-address-index~=0.1.0a8
chainlib~=0.0.1a12
chainlib~=0.0.1a20
hexathon~=0.0.1a3
chainsyncer~=0.0.1a19

View File

@@ -45,7 +45,7 @@ scripts =
console_scripts =
# daemons
cic-eth-taskerd = cic_eth.runnable.daemons.tasker:main
cic-eth-managerd = cic_eth.runnable.daemons.manager:main
cic-eth-trackerd = cic_eth.runnable.daemons.tracker:main
cic-eth-dispatcherd = cic_eth.runnable.daemons.dispatcher:main
cic-eth-retrierd = cic_eth.runnable.daemons.retry:main
# tools

View File

@@ -13,13 +13,13 @@ def celery_includes():
return [
'cic_eth.eth.bancor',
'cic_eth.eth.token',
'cic_eth.eth.request',
'cic_eth.eth.tx',
'cic_eth.ext.tx',
'cic_eth.queue.tx',
'cic_eth.queue.balance',
'cic_eth.admin.ctrl',
'cic_eth.admin.nonce',
'cic_eth.admin.debug',
'cic_eth.eth.account',
'cic_eth.callbacks.noop',
'cic_eth.callbacks.http',

View File

@@ -27,7 +27,7 @@ def database_engine(
SessionBase.poolable = False
dsn = dsn_from_config(load_config)
#SessionBase.connect(dsn, True)
SessionBase.connect(dsn, load_config.get('DATABASE_DEBUG') != None)
SessionBase.connect(dsn, debug=load_config.get('DATABASE_DEBUG') != None)
return dsn

View File

@@ -15,6 +15,7 @@ from eth_keys import KeyAPI
from cic_eth.eth import RpcClient
from cic_eth.eth.rpc import GasOracle
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
@@ -113,11 +114,17 @@ def init_w3_conn(
@pytest.fixture(scope='function')
def init_w3(
init_database,
init_eth_tester,
init_eth_account_roles,
init_w3_conn,
):
for address in init_w3_conn.eth.accounts:
nonce = init_w3_conn.eth.getTransactionCount(address, 'pending')
Nonce.init(address, nonce=nonce, session=init_database)
init_database.commit()
yield init_w3_conn
logg.debug('mining om nom nom... {}'.format(init_eth_tester.mine_block()))
@@ -128,9 +135,10 @@ def init_eth_account_roles(
w3_account_roles,
):
role = AccountRole.set('GAS_GIFTER', w3_account_roles.get('eth_account_gas_provider'))
address = w3_account_roles.get('eth_account_gas_provider')
role = AccountRole.set('GAS_GIFTER', address)
init_database.add(role)
init_database.commit()
return w3_account_roles
@@ -163,7 +171,6 @@ def w3_account_roles(
role_ids = [
'eth_account_bancor_deployer',
'eth_account_gas_provider',
'eth_account_reserve_owner',
'eth_account_reserve_minter',
'eth_account_accounts_index_owner',
@@ -172,6 +179,7 @@ def w3_account_roles(
'eth_account_sarafu_gifter',
'eth_account_approval_owner',
'eth_account_faucet_owner',
'eth_account_gas_provider',
]
roles = {}
@@ -187,6 +195,7 @@ def w3_account_roles(
return roles
@pytest.fixture(scope='session')
def w3_account_token_owners(
tokens_to_deploy,

View File

@@ -10,6 +10,8 @@ import web3
# local imports
from cic_eth.api import AdminApi
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
@@ -39,18 +41,37 @@ def test_resend_inplace(
c = RpcClient(default_chain_spec)
sigs = []
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
gas_provider = c.gas_provider()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[0],
gas_provider,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
chain_str,
],
queue=None,
)
t = s.apply_async()
tx_raw = t.get()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
o = q.first()
tx_raw = o.signed_tx
tx_dict = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), default_chain_spec.chain_id())
gas_price_before = tx_dict['gasPrice']

View File

@@ -49,28 +49,7 @@ def test_transfer_api(
assert t.successful()
def test_transfer_approval_api(
default_chain_spec,
init_w3,
cic_registry,
init_database,
bancor_registry,
bancor_tokens,
transfer_approval,
celery_session_worker,
):
token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
approval_contract = CICRegistry.get_contract(default_chain_spec, 'TransferApproval')
api = Api(str(default_chain_spec), callback_param='transfer_request', callback_task='cic_eth.callbacks.noop.noop', queue=None)
t = api.transfer_request(init_w3.eth.accounts[2], init_w3.eth.accounts[4], approval_contract.address(), 111, token.symbol())
t.get()
#for r in t.collect():
# print(r)
assert t.successful()
@pytest.mark.skip()
def test_convert_api(
default_chain_spec,
init_w3,
@@ -91,6 +70,7 @@ def test_convert_api(
assert t.successful()
@pytest.mark.skip()
def test_convert_transfer_api(
default_chain_spec,
init_w3,

View File

@@ -9,6 +9,10 @@ from tests.mock.filter import (
block_filter,
tx_filter,
)
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
logg = logging.getLogger()
@@ -28,9 +32,20 @@ def test_list_tx(
tx_hashes = []
# external tx
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
NonceReservation.next(init_w3.eth.accounts[0], 'foo', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 3000, default_chain_spec, 'foo')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
@@ -42,9 +57,12 @@ def test_list_tx(
tx_filter.add(a.to_bytes(4, 'big'))
# external tx
NonceReservation.next(init_w3.eth.accounts[0], 'bar', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(28)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec)
tx = txf.transfer(dummy_token_gifted, init_w3.eth.accounts[1], 4000, default_chain_spec, 'bar')
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
tx_hashes.append(tx_hash_hex)
init_w3.eth.sendRawTransaction(tx_signed_raw_hex)
@@ -56,10 +74,13 @@ def test_list_tx(
tx_filter.add(a.to_bytes(4, 'big'))
# custodial tx
#NonceReservation.next(init_w3.eth.accounts[0], 'blinky', session=init_database)
#init_database.commit()
init_eth_tester.mine_blocks(3)
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
#txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
api = Api(str(default_chain_spec), queue=None)
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM')
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 1000, 'DUM') #, 'blinky')
t.get()
tx_hash_hex = None
for c in t.collect():
@@ -68,9 +89,11 @@ def test_list_tx(
tx_hashes.append(tx_hash_hex)
# custodial tx
#NonceReservation.next(init_w3.eth.accounts[0], 'clyde', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(6)
api = Api(str(default_chain_spec), queue=None)
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM')
t = api.transfer(init_w3.eth.accounts[0], init_w3.eth.accounts[1], 2000, 'DUM') #, 'clyde')
t.get()
tx_hash_hex = None
for c in t.collect():

View File

@@ -64,6 +64,7 @@ def test_register_account(
init_database,
init_eth_tester,
init_w3,
init_rpc,
cic_registry,
celery_session_worker,
eth_empty_accounts,
@@ -71,18 +72,27 @@ def test_register_account(
logg.debug('chainspec {}'.format(str(default_chain_spec)))
s = celery.signature(
'cic_eth.eth.account.register',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
eth_empty_accounts[0],
init_w3.eth.accounts[0],
],
queue=None,
)
s_register = celery.signature(
'cic_eth.eth.account.register',
[
str(default_chain_spec),
init_w3.eth.accounts[0],
],
)
t = s.apply_async()
s_nonce.link(s_register)
t = s_nonce.apply_async()
address = t.get()
r = t.collect()
t.successful()
for r in t.collect():
pass
assert t.successful()
session = SessionBase.create_session()
o = session.query(Otx).first()

View File

@@ -8,6 +8,7 @@ import celery
# local imports
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.nonce import Nonce
from cic_eth.eth.util import unpack_signed_raw_tx
#logg = logging.getLogger(__name__)
@@ -31,18 +32,38 @@ def test_balance_complex(
}
tx_hashes = []
# TODO: Temporary workaround for nonce db cache initialization being made before deployments.
# Instead use different accounts than system ones for transfers for tests
nonce = init_w3.eth.getTransactionCount(init_w3.eth.accounts[0])
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==init_w3.eth.accounts[0])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
for i in range(3):
s = celery.signature(
'cic_eth.eth.token.transfer',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
init_w3.eth.accounts[0],
],
queue=None,
)
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
[
init_w3.eth.accounts[0],
init_w3.eth.accounts[1],
1000*(i+1),
chain_str,
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
t.get()
r = None
for c in t.collect():

View File

@@ -1,14 +1,19 @@
# standard imports
import logging
import os
# external imports
import pytest
import celery
# local imports
from cic_eth.db import TxConvertTransfer
from cic_eth.eth.bancor import BancorTxFactory
logg = logging.getLogger()
@pytest.mark.skip()
def test_transfer_after_convert(
init_w3,
init_database,

View File

@@ -0,0 +1,29 @@
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
def test_debug_alert(
init_database,
celery_session_worker,
):
s = celery.signature(
'cic_eth.admin.debug.alert',
[
'foo',
'bar',
'baz',
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Debug)
q = q.filter(Debug.tag=='bar')
o = q.first()
assert o.description == 'baz'

View File

@@ -0,0 +1,29 @@
# external imports
import celery
# local imports
from cic_eth.db.models.debug import Debug
def test_debug_alert(
init_database,
celery_session_worker,
):
s = celery.signature(
'cic_eth.admin.debug.alert',
[
'foo',
'bar',
'baz',
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Debug)
q = q.filter(Debug.tag=='bar')
o = q.first()
assert o.description == 'baz'

View File

@@ -10,6 +10,9 @@ import celery
from cic_eth.eth.account import unpack_gift
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
logg = logging.getLogger()
@@ -32,10 +35,16 @@ def test_faucet(
init_database,
):
s = celery.signature(
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[7],
],
queue=None,
)
s_gift = celery.signature(
'cic_eth.eth.account.gift',
[
init_w3.eth.accounts[7],
str(default_chain_spec),
],
)
@@ -45,15 +54,21 @@ def test_faucet(
str(default_chain_spec),
],
)
s.link(s_send)
t = s.apply_async()
signed_tx = t.get()
s_gift.link(s_send)
s_nonce.link(s_gift)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
logg.debug('result {}'.format(r))
assert t.successful()
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[0][2:]), default_chain_spec.chain_id())
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==init_w3.eth.accounts[7])
o = q.first()
signed_tx = o.signed_tx
tx = unpack_signed_raw_tx(bytes.fromhex(signed_tx[2:]), default_chain_spec.chain_id())
giveto = unpack_gift(tx['data'])
assert giveto['to'] == init_w3.eth.accounts[7]

View File

@@ -30,6 +30,7 @@ def test_refill_gas(
default_chain_spec,
init_eth_tester,
init_rpc,
init_w3,
init_database,
cic_registry,
init_eth_account_roles,
@@ -37,30 +38,46 @@ def test_refill_gas(
eth_empty_accounts,
):
provider_address = AccountRole.get_address('GAS_GIFTER')
provider_address = AccountRole.get_address('GAS_GIFTER', init_database)
receiver_address = eth_empty_accounts[0]
c = init_rpc
refill_amount = c.refill_amount()
balance = init_rpc.w3.eth.getBalance(receiver_address)
s = celery.signature(
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
eth_empty_accounts[0],
provider_address,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
receiver_address,
str(default_chain_spec),
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
r = t.get()
t.collect()
for c in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==receiver_address)
o = q.first()
signed_tx = o.signed_tx
s = celery.signature(
'cic_eth.eth.tx.send',
[
[r],
[signed_tx],
str(default_chain_spec),
],
)
@@ -74,11 +91,11 @@ def test_refill_gas(
assert balance_new == (balance + refill_amount)
# Verify that entry is added in TxCache
session = SessionBase.create_session()
q = session.query(Otx)
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==receiver_address)
r = q.first()
init_database.commit()
assert r.status == StatusEnum.SENT
@@ -86,6 +103,7 @@ def test_refill_gas(
def test_refill_deduplication(
default_chain_spec,
init_rpc,
init_w3,
init_database,
init_eth_account_roles,
cic_registry,
@@ -93,89 +111,137 @@ def test_refill_deduplication(
eth_empty_accounts,
):
provider_address = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS')
provider_address = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS', init_database)
receiver_address = eth_empty_accounts[0]
c = init_rpc
refill_amount = c.refill_amount()
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
receiver_address,
provider_address,
],
queue=None,
)
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec),
],
queue=None,
)
t = s.apply_async()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
r = t.get()
for e in t.collect():
pass
assert t.successful()
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
receiver_address,
str(default_chain_spec),
provider_address,
],
queue=None,
)
t = s.apply_async()
with pytest.raises(AlreadyFillingGasError):
t.get()
def test_check_gas(
default_chain_spec,
init_eth_tester,
init_w3,
init_rpc,
eth_empty_accounts,
init_database,
cic_registry,
celery_session_worker,
bancor_registry,
bancor_tokens,
):
provider_address = init_w3.eth.accounts[0]
gas_receiver_address = eth_empty_accounts[0]
token_receiver_address = init_w3.eth.accounts[1]
c = init_rpc
txf = TokenTxFactory(gas_receiver_address, c)
tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
gas_price = c.gas_price()
gas_limit = tx_transfer['gas']
s = celery.signature(
'cic_eth.eth.tx.check_gas',
s_refill = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
[tx_hash_hex],
str(default_chain_spec),
[],
gas_receiver_address,
gas_limit * gas_price,
],
)
t = s.apply_async()
with pytest.raises(OutOfGasError):
r = t.get()
#assert len(r) == 0
time.sleep(1)
t.collect()
s_nonce.link(s_refill)
t = s_nonce.apply_async()
#with pytest.raises(AlreadyFillingGasError):
t.get()
for e in t.collect():
pass
assert t.successful()
logg.warning('TODO: complete test by checking that second tx had zero value')
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash_hex)
r = q.first()
session.close()
assert r.status == StatusEnum.WAITFORGAS
# TODO: check gas is part of the transfer chain, and we cannot create the transfer nonce by uuid before the task. Test is subsumed by transfer task test, but should be tested in isolation
#def test_check_gas(
# default_chain_spec,
# init_eth_tester,
# init_w3,
# init_rpc,
# eth_empty_accounts,
# init_database,
# cic_registry,
# celery_session_worker,
# bancor_registry,
# bancor_tokens,
# ):
#
# provider_address = init_w3.eth.accounts[0]
# gas_receiver_address = eth_empty_accounts[0]
# token_receiver_address = init_w3.eth.accounts[1]
#
## c = init_rpc
## txf = TokenTxFactory(gas_receiver_address, c)
## tx_transfer = txf.transfer(bancor_tokens[0], token_receiver_address, 42, default_chain_spec, 'foo')
##
## (tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, str(default_chain_spec), None)
#
# token_data = [
# {
# 'address': bancor_tokens[0],
# },
# ]
#
# s_nonce = celery.signature(
# 'cic_eth.eth.tx.reserve_nonce',
# [
# token_data,
# init_w3.eth.accounts[0],
# ],
# queue=None,
# )
# s_transfer = celery.signature(
# 'cic_eth.eth.token.transfer',
# [
# init_w3.eth.accounts[0],
# init_w3.eth.accounts[1],
# 1024,
# str(default_chain_spec),
# ],
# queue=None,
# )
#
# gas_price = c.gas_price()
# gas_limit = tx_transfer['gas']
#
# s = celery.signature(
# 'cic_eth.eth.tx.check_gas',
# [
# [tx_hash_hex],
# str(default_chain_spec),
# [],
# gas_receiver_address,
# gas_limit * gas_price,
# ],
# )
# s_nonce.link(s_transfer)
# t = s_nonce.apply_async()
# with pytest.raises(OutOfGasError):
# r = t.get()
# #assert len(r) == 0
#
# time.sleep(1)
# t.collect()
#
# session = SessionBase.create_session()
# q = session.query(Otx)
# q = q.filter(Otx.tx_hash==tx_hash_hex)
# r = q.first()
# session.close()
# assert r.status == StatusEnum.WAITFORGAS
def test_resend_with_higher_gas(
@@ -191,39 +257,73 @@ def test_resend_with_higher_gas(
):
c = init_rpc
txf = TokenTxFactory(init_w3.eth.accounts[0], c)
tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec)
logg.debug('txtransfer {}'.format(tx_transfer))
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
logg.debug('signed raw {}'.format(tx_signed_raw_hex))
queue_create(
tx_transfer['nonce'],
tx_transfer['from'],
tx_hash_hex,
tx_signed_raw_hex,
str(default_chain_spec),
token_data = {
'address': bancor_tokens[0],
}
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
[token_data],
init_w3.eth.accounts[0],
],
queue=None,
)
logg.debug('create {}'.format(tx_transfer['from']))
cache_transfer_data(
tx_hash_hex,
tx_transfer, #_signed_raw_hex,
s_transfer = celery.signature(
'cic_eth.eth.token.transfer',
[
init_w3.eth.accounts[0],
init_w3.eth.accounts[1],
1024,
str(default_chain_spec),
],
queue=None,
)
# txf = TokenTxFactory(init_w3.eth.accounts[0], c)
# tx_transfer = txf.transfer(bancor_tokens[0], init_w3.eth.accounts[1], 1024, default_chain_spec, 'foo')
# logg.debug('txtransfer {}'.format(tx_transfer))
# (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx_transfer, str(default_chain_spec))
# logg.debug('signed raw {}'.format(tx_signed_raw_hex))
# queue_create(
# tx_transfer['nonce'],
# tx_transfer['from'],
# tx_hash_hex,
# tx_signed_raw_hex,
# str(default_chain_spec),
# )
# logg.debug('create {}'.format(tx_transfer['from']))
# cache_transfer_data(
# tx_hash_hex,
# tx_transfer, #_signed_raw_hex,
# )
s_nonce.link(s_transfer)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[1])
o = q.first()
tx_hash_hex = o.tx_hash
s_resend = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
tx_hash_hex,
str(default_chain_spec),
],
queue=None,
)
t = s_resend.apply_async()
i = 0
for r in t.collect():
logg.debug('{} {}'.format(i, r[0].get()))
i += 1
t = s_resend.apply_async()
for r in t.collect():
pass
assert t.successful()
#

View File

@@ -1,4 +1,5 @@
# third-party imports
import pytest
import celery
# local imports
@@ -6,7 +7,92 @@ from cic_eth.admin.nonce import shift_nonce
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import otx_cache_parse_tx
from cic_eth.eth.task import sign_tx
from cic_eth.db.models.nonce import (
NonceReservation,
Nonce
)
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
@pytest.mark.skip()
def test_reserve_nonce_task(
init_database,
celery_session_worker,
eth_empty_accounts,
):
s = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
'foo',
eth_empty_accounts[0],
],
queue=None,
)
t = s.apply_async()
r = t.get()
assert r == 'foo'
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
o = q.first()
assert o != None
q = init_database.query(NonceReservation)
q = q.filter(NonceReservation.key==str(t))
o = q.first()
assert o != None
def test_reserve_nonce_chain(
default_chain_spec,
init_database,
celery_session_worker,
init_w3,
init_rpc,
):
provider_address = init_rpc.gas_provider()
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==provider_address)
o = q.first()
o.nonce = 42
init_database.add(o)
init_database.commit()
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
init_w3.eth.accounts[0],
provider_address,
],
queue=None,
)
s_gas = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
str(default_chain_spec),
],
queue=None,
)
s_nonce.link(s_gas)
t = s_nonce.apply_async()
r = t.get()
for c in t.collect():
pass
assert t.successful()
q = init_database.query(Otx)
Q = q.join(TxCache)
q = q.filter(TxCache.recipient==init_w3.eth.accounts[0])
o = q.first()
assert o.nonce == 42
@pytest.mark.skip()
def test_shift_nonce(
default_chain_spec,
init_database,
@@ -47,3 +133,4 @@ def test_shift_nonce(
for _ in t.collect():
pass
assert t.successful()

View File

@@ -27,14 +27,14 @@ def test_states_initial(
tx = {
'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1],
'nonce': 42,
'nonce': 13,
'gas': 21000,
'gasPrice': 1000000,
'value': 128,
'chainId': 666,
'chainId': 42,
'data': '',
}
(tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'Foo:666', None)
(tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'foo:bar:42', None)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
assert otx.status == StatusEnum.PENDING.value
@@ -43,7 +43,7 @@ def test_states_initial(
'cic_eth.eth.tx.check_gas',
[
[tx_hash_hex],
'Foo:666',
'foo:bar:42',
[tx_raw_signed_hex],
init_w3.eth.accounts[0],
8000000,
@@ -67,7 +67,7 @@ def test_states_initial(
'cic_eth.eth.tx.check_gas',
[
[tx_hash_hex],
'Foo:666',
'foo:bar:42',
[tx_raw_signed_hex],
init_w3.eth.accounts[0],
8000000,
@@ -94,14 +94,14 @@ def test_states_failed(
tx = {
'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1],
'nonce': 42,
'nonce': 13,
'gas': 21000,
'gasPrice': 1000000,
'value': 128,
'chainId': 666,
'chainId': 42,
'data': '',
}
(tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'Foo:666', None)
(tx_hash_hex, tx_raw_signed_hex) = sign_and_register_tx(tx, 'foo:bar:42', None)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
otx.sendfail(session=init_database)
@@ -112,7 +112,7 @@ def test_states_failed(
'cic_eth.eth.tx.check_gas',
[
[tx_hash_hex],
'Foo:666',
'foo:bar:42',
[tx_raw_signed_hex],
init_w3.eth.accounts[0],
8000000,

View File

@@ -20,21 +20,30 @@ def test_approve(
cic_registry,
):
s = celery.signature(
'cic_eth.eth.token.approve',
[
[
token_data = [
{
'address': bancor_tokens[0],
},
],
]
s_nonce = celery.signature(
'cic_eth.eth.tx.reserve_nonce',
[
token_data,
init_rpc.w3.eth.accounts[0],
],
queue=None,
)
s_approve = celery.signature(
'cic_eth.eth.token.approve',
[
init_rpc.w3.eth.accounts[0],
init_rpc.w3.eth.accounts[1],
1024,
str(default_chain_spec),
],
)
t = s.apply_async()
s_nonce.link(s_approve)
t = s_nonce.apply_async()
t.get()
for r in t.collect():
logg.debug('result {}'.format(r))

View File

@@ -1,76 +0,0 @@
# standard imports
import logging
import time
# third-party imports
from erc20_approval_escrow import TransferApproval
import celery
import sha3
# local imports
from cic_eth.eth.token import TokenTxFactory
logg = logging.getLogger()
# BUG: transaction receipt only found sometimes
def test_transfer_approval(
default_chain_spec,
transfer_approval,
bancor_tokens,
w3_account_roles,
eth_empty_accounts,
cic_registry,
init_database,
celery_session_worker,
init_eth_tester,
init_w3,
):
s = celery.signature(
'cic_eth.eth.request.transfer_approval_request',
[
[
{
'address': bancor_tokens[0],
},
],
w3_account_roles['eth_account_sarafu_owner'],
eth_empty_accounts[0],
1024,
str(default_chain_spec),
],
)
s_send = celery.signature(
'cic_eth.eth.tx.send',
[
str(default_chain_spec),
],
)
s.link(s_send)
t = s.apply_async()
tx_signed_raws = t.get()
for r in t.collect():
logg.debug('result {}'.format(r))
assert t.successful()
init_eth_tester.mine_block()
h = sha3.keccak_256()
tx_signed_raw = tx_signed_raws[0]
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw[2:])
h.update(tx_signed_raw_bytes)
tx_hash = h.digest()
rcpt = init_w3.eth.getTransactionReceipt(tx_hash)
assert rcpt.status == 1
a = TransferApproval(init_w3, transfer_approval)
assert a.last_serial() == 1
logg.debug('requests {}'.format(a.requests(1)['serial']))

View File

@@ -67,7 +67,7 @@ def test_callback_tcp(
logg.debug('recived {} '.format(data))
o = json.loads(echo)
try:
assert o == data
assert o['result'] == data
except Exception as e:
self.exception = e
@@ -130,7 +130,7 @@ def test_callback_redis(
o = json.loads(echo['data'])
logg.debug('recived {} '.format(o))
try:
assert o == data
assert o['result'] == data
except Exception as e:
self.exception = e

View File

@@ -0,0 +1,16 @@
# local imports
from cic_eth.db.models.debug import Debug
def test_debug(
init_database,
):
o = Debug('foo', 'bar')
init_database.add(o)
init_database.commit()
q = init_database.query(Debug)
q = q.filter(Debug.tag=='foo')
o = q.first()
assert o.description == 'bar'

View File

@@ -1,8 +1,29 @@
# third-party imports
import pytest
import uuid
# local imports
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
from cic_eth.error import (
InitializationError,
IntegrityError,
)
def test_nonce_init(
init_database,
eth_empty_accounts,
):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
init_database.commit()
with pytest.raises(InitializationError):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
def test_nonce_increment(
init_database,
@@ -10,11 +31,46 @@ def test_nonce_increment(
database_engine,
):
# if database_engine[:6] == 'sqlite':
# pytest.skip('sqlite cannot lock tables which is required for this test, skipping')
nonce = Nonce.next(eth_empty_accounts[0], 3)
assert nonce == 3
nonce = Nonce.next(eth_empty_accounts[0], 3)
assert nonce == 4
def test_nonce_reserve(
init_database,
eth_empty_accounts,
):
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
init_database.commit()
uu = uuid.uuid4()
nonce = NonceReservation.next(eth_empty_accounts[0], str(uu), session=init_database)
init_database.commit()
assert nonce == 42
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==eth_empty_accounts[0])
o = q.first()
assert o.nonce == 43
nonce = NonceReservation.release(str(uu))
init_database.commit()
assert nonce == 42
q = init_database.query(NonceReservation)
q = q.filter(NonceReservation.key==str(uu))
o = q.first()
assert o == None
def test_nonce_reserve_integrity(
init_database,
eth_empty_accounts,
):
uu = uuid.uuid4()
nonce = Nonce.init(eth_empty_accounts[0], 42, session=init_database)
with pytest.raises(IntegrityError):
NonceReservation.release(str(uu))

View File

@@ -9,18 +9,18 @@ def test_db_role(
foo = AccountRole.set('foo', eth_empty_accounts[0])
init_database.add(foo)
init_database.commit()
assert AccountRole.get_address('foo') == eth_empty_accounts[0]
assert AccountRole.get_address('foo', init_database) == eth_empty_accounts[0]
bar = AccountRole.set('bar', eth_empty_accounts[1])
init_database.add(bar)
init_database.commit()
assert AccountRole.get_address('bar') == eth_empty_accounts[1]
assert AccountRole.get_address('bar', init_database) == eth_empty_accounts[1]
foo = AccountRole.set('foo', eth_empty_accounts[2])
init_database.add(foo)
init_database.commit()
assert AccountRole.get_address('foo') == eth_empty_accounts[2]
assert AccountRole.get_address('bar') == eth_empty_accounts[1]
assert AccountRole.get_address('foo', init_database) == eth_empty_accounts[2]
assert AccountRole.get_address('bar', init_database) == eth_empty_accounts[1]
tag = AccountRole.role_for(eth_empty_accounts[2])
assert tag == 'foo'

View File

@@ -26,7 +26,7 @@ def test_set(
'data': '',
'chainId': 1,
}
(tx_hash, tx_signed) = sign_tx(tx_def, 'Foo:1')
(tx_hash, tx_signed) = sign_tx(tx_def, 'foo:bar:1')
otx = Otx(
tx_def['nonce'],
tx_def['from'],
@@ -82,7 +82,7 @@ def test_clone(
'data': '',
'chainId': 1,
}
(tx_hash, tx_signed) = sign_tx(tx_def, 'Foo:1')
(tx_hash, tx_signed) = sign_tx(tx_def, 'foo:bar:1')
otx = Otx(
tx_def['nonce'],
tx_def['from'],

View File

@@ -17,6 +17,7 @@ from cic_eth.db.models.tx import TxCache
logg = logging.getLogger()
@pytest.mark.skip()
def test_resolve_converters_by_tokens(
cic_registry,
init_w3,
@@ -43,6 +44,7 @@ def test_resolve_converters_by_tokens(
assert len(t['converters']) == 1
@pytest.mark.skip()
def test_unpack_convert(
default_chain_spec,
cic_registry,
@@ -84,6 +86,7 @@ def test_unpack_convert(
assert convert_data['fee'] == 0
@pytest.mark.skip()
def test_queue_cache_convert(
default_chain_spec,
init_w3,

View File

@@ -1,51 +0,0 @@
# standard imports
import logging
# local imports
from cic_eth.eth.nonce import NonceOracle
logg = logging.getLogger()
def test_nonce_sequence(
eth_empty_accounts,
init_database,
init_rpc,
):
account= init_rpc.w3.eth.personal.new_account('')
no = NonceOracle(account, 0)
n = no.next()
assert n == 0
n = no.next()
assert n == 1
init_rpc.w3.eth.sendTransaction({
'from': init_rpc.w3.eth.accounts[0],
'to': account,
'value': 200000000,
})
init_rpc.w3.eth.sendTransaction({
'from': account,
'to': eth_empty_accounts[0],
'value': 100,
})
c = init_rpc.w3.eth.getTransactionCount(account, 'pending')
logg.debug('nonce {}'.format(c))
account= init_rpc.w3.eth.personal.new_account('')
no = NonceOracle(account, c)
n = no.next()
assert n == 1
n = no.next()
assert n == 2
# try with bogus value
no = NonceOracle(account, 4)
n = no.next()
assert n == 3

Some files were not shown because too many files have changed in this diff Show More