From 68e02ba5b8d4d813b634ace054ba2a50956f3d7c Mon Sep 17 00:00:00 2001 From: nolash Date: Fri, 2 Apr 2021 18:45:51 +0200 Subject: [PATCH] WIP Expunge obsolete db migrations, bring in chainqueue --- apps/cic-eth/cic_eth/api/api_admin.py | 4 +- .../versions/0ec0d6d1e785_add_chainqueue.py | 29 + .../default/versions/1f1b3b641d08_roles.py | 29 + .../2a07b543335e_add_new_syncer_table.py | 35 - .../3b693afd526a_nonce_reservation.py | 31 - .../versions/49b348246d70_add_nonce_index.py | 29 - .../52c7c59cd0b1_add_account_roles.py | 31 - ..._debug_output.py => 5ca4b77ce205_debug.py} | 14 +- .../6ac7a1dadc46_add_otx_state_log.py | 30 - ...bd_add_attempts_and_version_log_for_otx.py | 31 - ...d_account_lock.py => 75d4767b3031_lock.py} | 13 +- .../7cb65b893934_add_blocknumber_pointer.py | 31 - .../versions/7e8d7626e38f_add_block_sync.py | 45 -- .../8593fa1ca0f4_add_transaction_queue.py | 35 - .../versions/9c420530eeb2_nonce.py} | 22 +- .../9c4bd7491015_rename_block_sync_table.py | 26 - .../9daa16518a91_add_tx_sync_state.py | 30 - ...e2aab8f331_add_date_accessed_to_txcache.py | 34 - ...rt_tx_index.py => aee12aeb47ec_convert.py} | 14 +- ...er.py => b125cbf81e32_add_chain_syncer.py} | 13 +- .../versions/df19f4e69676_add_tx_track.py | 31 - .../default/versions/e3b5330ee71c_.py | 38 - .../db/migrations/postgresql/alembic.ini | 85 -- .../cic_eth/db/migrations/postgresql/env.py | 77 -- .../db/migrations/postgresql/script.py.mako | 24 - .../2a07b543335e_add_new_syncer_table.py | 35 - .../versions/49b348246d70_add_nonce_index.py | 29 - .../52c7c59cd0b1_add_account_roles.py | 31 - .../6ac7a1dadc46_add_otx_state_log.py | 30 - ...bd_add_attempts_and_version_log_for_otx.py | 31 - .../7cb65b893934_add_blocknumber_pointer.py | 31 - .../versions/7e8d7626e38f_add_block_sync.py | 42 - .../8593fa1ca0f4_add_transaction_queue.py | 35 - .../versions/89e1e9baa53c_add_account_lock.py | 33 - .../9c4bd7491015_rename_block_sync_table.py | 26 - .../9daa16518a91_add_tx_sync_state.py | 30 - ...e2aab8f331_add_date_accessed_to_txcache.py | 33 - .../versions/cd2052be6db2_convert_tx_index.py | 34 - .../versions/df19f4e69676_add_tx_track.py | 31 - .../postgresql/versions/e3b5330ee71c_.py | 37 - .../versions/ec40ac0974c1_add_chain_syncer.py | 28 - .../versions/f738d9962fdf_debug_output.py | 32 - apps/cic-eth/cic_eth/eth/bancor.py.bak | 2 +- apps/cic-eth/cic_eth/eth/tx.py | 12 +- apps/cic-eth/cic_eth/queue/balance.py | 12 +- apps/cic-eth/cic_eth/queue/lock.py | 46 ++ apps/cic-eth/cic_eth/queue/query.py | 131 +++ apps/cic-eth/cic_eth/queue/state.py | 67 ++ apps/cic-eth/cic_eth/queue/time.py | 4 +- apps/cic-eth/cic_eth/queue/tx.py | 764 +----------------- .../cic_eth/runnable/daemons/filters/tx.py | 2 +- .../cic-eth/cic_eth/runnable/daemons/retry.py | 2 +- .../cic_eth/runnable/daemons/tasker.py | 5 +- apps/cic-eth/requirements.txt | 1 + .../cic-eth/tests/unit/queue/test_balances.py | 6 +- apps/cic-eth/tests/unit/queue/test_list_tx.py | 76 -- .../tests/unit/queue/test_otx_state_log.py | 22 - .../tests/unit/queue/test_queue_lock.py | 2 +- .../cic-eth/tests/unit/queue/test_tx_queue.py | 577 ------------- 59 files changed, 383 insertions(+), 2677 deletions(-) create mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/0ec0d6d1e785_add_chainqueue.py create mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/1f1b3b641d08_roles.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/2a07b543335e_add_new_syncer_table.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/49b348246d70_add_nonce_index.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/52c7c59cd0b1_add_account_roles.py rename apps/cic-eth/cic_eth/db/migrations/default/versions/{f738d9962fdf_debug_output.py => 5ca4b77ce205_debug.py} (73%) delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/6ac7a1dadc46_add_otx_state_log.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/71708e943dbd_add_attempts_and_version_log_for_otx.py rename apps/cic-eth/cic_eth/db/migrations/default/versions/{89e1e9baa53c_add_account_lock.py => 75d4767b3031_lock.py} (81%) delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/7cb65b893934_add_blocknumber_pointer.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/7e8d7626e38f_add_block_sync.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/8593fa1ca0f4_add_transaction_queue.py rename apps/cic-eth/cic_eth/db/migrations/{postgresql/versions/3b693afd526a_nonce_reservation.py => default/versions/9c420530eeb2_nonce.py} (54%) delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/9c4bd7491015_rename_block_sync_table.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/9daa16518a91_add_tx_sync_state.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/a2e2aab8f331_add_date_accessed_to_txcache.py rename apps/cic-eth/cic_eth/db/migrations/default/versions/{cd2052be6db2_convert_tx_index.py => aee12aeb47ec_convert.py} (68%) rename apps/cic-eth/cic_eth/db/migrations/default/versions/{ec40ac0974c1_add_chain_syncer.py => b125cbf81e32_add_chain_syncer.py} (72%) delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/df19f4e69676_add_tx_track.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/default/versions/e3b5330ee71c_.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/alembic.ini delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/env.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/script.py.mako delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/2a07b543335e_add_new_syncer_table.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/49b348246d70_add_nonce_index.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/52c7c59cd0b1_add_account_roles.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/6ac7a1dadc46_add_otx_state_log.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/71708e943dbd_add_attempts_and_version_log_for_otx.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/7cb65b893934_add_blocknumber_pointer.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/7e8d7626e38f_add_block_sync.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/8593fa1ca0f4_add_transaction_queue.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/89e1e9baa53c_add_account_lock.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/9c4bd7491015_rename_block_sync_table.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/9daa16518a91_add_tx_sync_state.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/a2e2aab8f331_add_date_accessed_to_txcache.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/cd2052be6db2_convert_tx_index.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/df19f4e69676_add_tx_track.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/e3b5330ee71c_.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/ec40ac0974c1_add_chain_syncer.py delete mode 100644 apps/cic-eth/cic_eth/db/migrations/postgresql/versions/f738d9962fdf_debug_output.py create mode 100644 apps/cic-eth/cic_eth/queue/lock.py create mode 100644 apps/cic-eth/cic_eth/queue/query.py create mode 100644 apps/cic-eth/cic_eth/queue/state.py delete mode 100644 apps/cic-eth/tests/unit/queue/test_list_tx.py delete mode 100644 apps/cic-eth/tests/unit/queue/test_otx_state_log.py delete mode 100644 apps/cic-eth/tests/unit/queue/test_tx_queue.py diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index 458b03fb..67be4176 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -92,7 +92,7 @@ class AdminApi: def get_lock(self): s_lock = celery.signature( - 'cic_eth.queue.tx.get_lock', + 'cic_eth.queue.lock.get_lock', [], queue=self.queue, ) @@ -165,7 +165,7 @@ class AdminApi: ) s_manual = celery.signature( - 'cic_eth.queue.tx.set_manual', + 'cic_eth.queue.state.set_manual', [ tx_hash_hex, ], diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/0ec0d6d1e785_add_chainqueue.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/0ec0d6d1e785_add_chainqueue.py new file mode 100644 index 00000000..09217067 --- /dev/null +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/0ec0d6d1e785_add_chainqueue.py @@ -0,0 +1,29 @@ +"""Add chainqueue + +Revision ID: 0ec0d6d1e785 +Revises: +Create Date: 2021-04-02 18:30:55.398388 + +""" +from alembic import op +import sqlalchemy as sa + +from chainqueue.db.migrations.sqlalchemy import ( + chainqueue_upgrade, + chainqueue_downgrade, + ) + +# revision identifiers, used by Alembic. +revision = '0ec0d6d1e785' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + chainqueue_upgrade(0, 0, 1) + + +def downgrade(): + chainqueue_downgrade(0, 0, 1) + diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/1f1b3b641d08_roles.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/1f1b3b641d08_roles.py new file mode 100644 index 00000000..35fed2a2 --- /dev/null +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/1f1b3b641d08_roles.py @@ -0,0 +1,29 @@ +"""Roles + +Revision ID: 1f1b3b641d08 +Revises: 9c420530eeb2 +Create Date: 2021-04-02 18:40:27.787631 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '1f1b3b641d08' +down_revision = '9c420530eeb2' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + 'account_role', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('tag', sa.Text, nullable=False, unique=True), + sa.Column('address_hex', sa.String(42), nullable=False), + ) + + +def downgrade(): + op.drop_table('account_role') diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/2a07b543335e_add_new_syncer_table.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/2a07b543335e_add_new_syncer_table.py deleted file mode 100644 index 30ddb96f..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/2a07b543335e_add_new_syncer_table.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Add new syncer table - -Revision ID: 2a07b543335e -Revises: a2e2aab8f331 -Create Date: 2020-12-27 09:35:44.017981 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '2a07b543335e' -down_revision = 'a2e2aab8f331' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'blockchain_sync', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('blockchain', sa.String, nullable=False), - sa.Column('block_start', sa.Integer, nullable=False, default=0), - sa.Column('tx_start', sa.Integer, nullable=False, default=0), - sa.Column('block_cursor', sa.Integer, nullable=False, default=0), - sa.Column('tx_cursor', sa.Integer, nullable=False, default=0), - sa.Column('block_target', sa.Integer, nullable=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime), - ) - - -def downgrade(): - op.drop_table('blockchain_sync') diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py deleted file mode 100644 index 580d0345..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/3b693afd526a_nonce_reservation.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Nonce reservation - -Revision ID: 3b693afd526a -Revises: f738d9962fdf -Create Date: 2021-03-05 07:09:50.898728 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '3b693afd526a' -down_revision = 'f738d9962fdf' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'nonce_task_reservation', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('address_hex', sa.String(42), nullable=False), - sa.Column('nonce', sa.Integer, nullable=False), - sa.Column('key', sa.String, nullable=False), - sa.Column('date_created', sa.DateTime, nullable=False), - ) - - -def downgrade(): - op.drop_table('nonce_task_reservation') diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/49b348246d70_add_nonce_index.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/49b348246d70_add_nonce_index.py deleted file mode 100644 index 3abee1b6..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/49b348246d70_add_nonce_index.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Add nonce index - -Revision ID: 49b348246d70 -Revises: 52c7c59cd0b1 -Create Date: 2020-12-19 09:45:36.186446 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '49b348246d70' -down_revision = '52c7c59cd0b1' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'nonce', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('address_hex', sa.String(42), nullable=False, unique=True), - sa.Column('nonce', sa.Integer, nullable=False), - ) - - -def downgrade(): - op.drop_table('nonce') diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/52c7c59cd0b1_add_account_roles.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/52c7c59cd0b1_add_account_roles.py deleted file mode 100644 index 7aa74059..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/52c7c59cd0b1_add_account_roles.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Add account roles - -Revision ID: 52c7c59cd0b1 -Revises: 9c4bd7491015 -Create Date: 2020-12-19 07:21:38.249237 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '52c7c59cd0b1' -down_revision = '9c4bd7491015' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'account_role', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('tag', sa.Text, nullable=False, unique=True), - sa.Column('address_hex', sa.String(42), nullable=False), - ) - pass - - -def downgrade(): - op.drop_table('account_role') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/f738d9962fdf_debug_output.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/5ca4b77ce205_debug.py similarity index 73% rename from apps/cic-eth/cic_eth/db/migrations/default/versions/f738d9962fdf_debug_output.py rename to apps/cic-eth/cic_eth/db/migrations/default/versions/5ca4b77ce205_debug.py index 378f2965..5014fb2d 100644 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/f738d9962fdf_debug_output.py +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/5ca4b77ce205_debug.py @@ -1,8 +1,8 @@ -"""debug output +"""DEbug -Revision ID: f738d9962fdf -Revises: ec40ac0974c1 -Create Date: 2021-03-04 08:32:43.281214 +Revision ID: 5ca4b77ce205 +Revises: 75d4767b3031 +Create Date: 2021-04-02 18:42:12.257244 """ from alembic import op @@ -10,8 +10,8 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = 'f738d9962fdf' -down_revision = 'ec40ac0974c1' +revision = '5ca4b77ce205' +down_revision = '75d4767b3031' branch_labels = None depends_on = None @@ -24,9 +24,7 @@ def upgrade(): sa.Column('description', sa.String, nullable=False), sa.Column('date_created', sa.DateTime, nullable=False), ) - pass def downgrade(): op.drop_table('debug') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/6ac7a1dadc46_add_otx_state_log.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/6ac7a1dadc46_add_otx_state_log.py deleted file mode 100644 index f15834d9..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/6ac7a1dadc46_add_otx_state_log.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Add otx state log - -Revision ID: 6ac7a1dadc46 -Revises: 89e1e9baa53c -Create Date: 2021-01-30 13:59:49.022373 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '6ac7a1dadc46' -down_revision = '89e1e9baa53c' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'otx_state_log', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False), - sa.Column('date', sa.DateTime, nullable=False), - sa.Column('status', sa.Integer, nullable=False), - ) - - -def downgrade(): - op.drop_table('otx_state_log') diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/71708e943dbd_add_attempts_and_version_log_for_otx.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/71708e943dbd_add_attempts_and_version_log_for_otx.py deleted file mode 100644 index 3be687eb..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/71708e943dbd_add_attempts_and_version_log_for_otx.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Add attempts and version log for otx - -Revision ID: 71708e943dbd -Revises: 7e8d7626e38f -Create Date: 2020-09-26 14:41:19.298651 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '71708e943dbd' -down_revision = '7e8d7626e38f' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'otx_attempts', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False), - sa.Column('date', sa.DateTime, nullable=False), - ) - pass - - -def downgrade(): - op.drop_table('otx_attempts') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/89e1e9baa53c_add_account_lock.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/75d4767b3031_lock.py similarity index 81% rename from apps/cic-eth/cic_eth/db/migrations/default/versions/89e1e9baa53c_add_account_lock.py rename to apps/cic-eth/cic_eth/db/migrations/default/versions/75d4767b3031_lock.py index 4b1c4401..82e4384b 100644 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/89e1e9baa53c_add_account_lock.py +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/75d4767b3031_lock.py @@ -1,8 +1,8 @@ -"""Add account lock +"""Lock -Revision ID: 89e1e9baa53c -Revises: 2a07b543335e -Create Date: 2021-01-27 19:57:36.793882 +Revision ID: 75d4767b3031 +Revises: 1f1b3b641d08 +Create Date: 2021-04-02 18:41:20.864265 """ from alembic import op @@ -10,8 +10,8 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '89e1e9baa53c' -down_revision = '2a07b543335e' +revision = '75d4767b3031' +down_revision = '1f1b3b641d08' branch_labels = None depends_on = None @@ -28,6 +28,7 @@ def upgrade(): ) op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True) + def downgrade(): op.drop_index('idx_chain_address') op.drop_table('lock') diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/7cb65b893934_add_blocknumber_pointer.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/7cb65b893934_add_blocknumber_pointer.py deleted file mode 100644 index 74a97f83..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/7cb65b893934_add_blocknumber_pointer.py +++ /dev/null @@ -1,31 +0,0 @@ -"""add blocknumber pointer - -Revision ID: 7cb65b893934 -Revises: 8593fa1ca0f4 -Create Date: 2020-09-24 19:29:13.543648 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '7cb65b893934' -down_revision = '8593fa1ca0f4' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'watcher_state', - sa.Column('block_number', sa.Integer) - ) - conn = op.get_bind() - conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);') - pass - - -def downgrade(): - op.drop_table('watcher_state') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/7e8d7626e38f_add_block_sync.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/7e8d7626e38f_add_block_sync.py deleted file mode 100644 index 52f29a7b..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/7e8d7626e38f_add_block_sync.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Add block sync - -Revision ID: 7e8d7626e38f -Revises: cd2052be6db2 -Create Date: 2020-09-26 11:12:27.818524 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '7e8d7626e38f' -down_revision = 'cd2052be6db2' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'block_sync', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('blockchain', sa.String, nullable=False, unique=True), - sa.Column('block_height_backlog', sa.Integer, nullable=False, default=0), - sa.Column('tx_height_backlog', sa.Integer, nullable=False, default=0), - sa.Column('block_height_session', sa.Integer, nullable=False, default=0), - sa.Column('tx_height_session', sa.Integer, nullable=False, default=0), - sa.Column('block_height_head', sa.Integer, nullable=False, default=0), - sa.Column('tx_height_head', sa.Integer, nullable=False, default=0), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime), - ) - op.drop_table('watcher_state') - pass - - -def downgrade(): - op.drop_table('block_sync') - op.create_table( - 'watcher_state', - sa.Column('block_number', sa.Integer) - ) - conn = op.get_bind() - conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/8593fa1ca0f4_add_transaction_queue.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/8593fa1ca0f4_add_transaction_queue.py deleted file mode 100644 index e9f9cf6a..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/8593fa1ca0f4_add_transaction_queue.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Add transaction queue - -Revision ID: 8593fa1ca0f4 -Revises: -Create Date: 2020-09-22 21:56:42.117047 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '8593fa1ca0f4' -down_revision = None -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'otx', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('nonce', sa.Integer, nullable=False), - sa.Column('tx_hash', sa.String(66), nullable=False), - sa.Column('signed_tx', sa.Text, nullable=False), - sa.Column('status', sa.Integer, nullable=False, default=-9), - sa.Column('block', sa.Integer), - ) - op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True) - - -def downgrade(): - op.drop_index('idx_otx_tx') - op.drop_table('otx') diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/9c420530eeb2_nonce.py similarity index 54% rename from apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py rename to apps/cic-eth/cic_eth/db/migrations/default/versions/9c420530eeb2_nonce.py index 580d0345..3dffac98 100644 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/3b693afd526a_nonce_reservation.py +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/9c420530eeb2_nonce.py @@ -1,8 +1,8 @@ -"""Nonce reservation +"""Nonce -Revision ID: 3b693afd526a -Revises: f738d9962fdf -Create Date: 2021-03-05 07:09:50.898728 +Revision ID: 9c420530eeb2 +Revises: b125cbf81e32 +Create Date: 2021-04-02 18:38:56.459334 """ from alembic import op @@ -10,15 +10,22 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '3b693afd526a' -down_revision = 'f738d9962fdf' +revision = '9c420530eeb2' +down_revision = 'b125cbf81e32' branch_labels = None depends_on = None def upgrade(): op.create_table( - 'nonce_task_reservation', + 'nonce', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('address_hex', sa.String(42), nullable=False, unique=True), + sa.Column('nonce', sa.Integer, nullable=False), + ) + + op.create_table( + 'nonce_task_reservation', sa.Column('id', sa.Integer, primary_key=True), sa.Column('address_hex', sa.String(42), nullable=False), sa.Column('nonce', sa.Integer, nullable=False), @@ -29,3 +36,4 @@ def upgrade(): def downgrade(): op.drop_table('nonce_task_reservation') + op.drop_table('nonce') diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/9c4bd7491015_rename_block_sync_table.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/9c4bd7491015_rename_block_sync_table.py deleted file mode 100644 index f58721b2..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/9c4bd7491015_rename_block_sync_table.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Rename block sync table - -Revision ID: 9c4bd7491015 -Revises: 9daa16518a91 -Create Date: 2020-10-15 23:45:56.306898 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '9c4bd7491015' -down_revision = '9daa16518a91' -branch_labels = None -depends_on = None - - -def upgrade(): - op.rename_table('block_sync', 'otx_sync') - pass - - -def downgrade(): - op.rename_table('otx_sync', 'block_sync') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/9daa16518a91_add_tx_sync_state.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/9daa16518a91_add_tx_sync_state.py deleted file mode 100644 index 2c6cc0fe..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/9daa16518a91_add_tx_sync_state.py +++ /dev/null @@ -1,30 +0,0 @@ -"""add tx sync state - -Revision ID: 9daa16518a91 -Revises: e3b5330ee71c -Create Date: 2020-10-10 14:43:18.699276 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '9daa16518a91' -down_revision = 'e3b5330ee71c' -branch_labels = None -depends_on = None - - -def upgrade(): -# op.create_table( -# 'tx_sync', -# sa.Column('tx', sa.String(66), nullable=False), -# ) -# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')") - pass - - -def downgrade(): -# op.drop_table('tx_sync') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/a2e2aab8f331_add_date_accessed_to_txcache.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/a2e2aab8f331_add_date_accessed_to_txcache.py deleted file mode 100644 index f82f2615..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/a2e2aab8f331_add_date_accessed_to_txcache.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Add date accessed to txcache - -Revision ID: a2e2aab8f331 -Revises: 49b348246d70 -Create Date: 2020-12-24 18:58:06.137812 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'a2e2aab8f331' -down_revision = '49b348246d70' -branch_labels = None -depends_on = None - - -def upgrade(): - op.add_column( - 'tx_cache', - sa.Column( - 'date_checked', - sa.DateTime, - nullable=False - ) - ) - pass - - -def downgrade(): - # drop does not work withs qlite - #op.drop_column('tx_cache', 'date_checked') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/cd2052be6db2_convert_tx_index.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/aee12aeb47ec_convert.py similarity index 68% rename from apps/cic-eth/cic_eth/db/migrations/default/versions/cd2052be6db2_convert_tx_index.py rename to apps/cic-eth/cic_eth/db/migrations/default/versions/aee12aeb47ec_convert.py index 5b59d734..21aced4e 100644 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/cd2052be6db2_convert_tx_index.py +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/aee12aeb47ec_convert.py @@ -1,8 +1,8 @@ -"""convert tx index +"""Convert -Revision ID: cd2052be6db2 -Revises: 7cb65b893934 -Create Date: 2020-09-24 21:20:51.580500 +Revision ID: aee12aeb47ec +Revises: 5ca4b77ce205 +Create Date: 2021-04-02 18:42:45.233356 """ from alembic import op @@ -10,8 +10,8 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = 'cd2052be6db2' -down_revision = '7cb65b893934' +revision = 'aee12aeb47ec' +down_revision = '5ca4b77ce205' branch_labels = None depends_on = None @@ -20,10 +20,8 @@ def upgrade(): op.create_table( 'tx_convert_transfer', sa.Column('id', sa.Integer, primary_key=True), - #sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True), sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True), sa.Column('transfer_tx_hash', sa.String(66), unique=True), -# sa.Column('holder_address', sa.String(42), nullable=False), sa.Column('recipient_address', sa.String(42), nullable=False), ) op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address']) diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/ec40ac0974c1_add_chain_syncer.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/b125cbf81e32_add_chain_syncer.py similarity index 72% rename from apps/cic-eth/cic_eth/db/migrations/default/versions/ec40ac0974c1_add_chain_syncer.py rename to apps/cic-eth/cic_eth/db/migrations/default/versions/b125cbf81e32_add_chain_syncer.py index a278997f..8f3be98f 100644 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/ec40ac0974c1_add_chain_syncer.py +++ b/apps/cic-eth/cic_eth/db/migrations/default/versions/b125cbf81e32_add_chain_syncer.py @@ -1,12 +1,13 @@ """Add chain syncer -Revision ID: ec40ac0974c1 -Revises: 6ac7a1dadc46 -Create Date: 2021-02-23 06:10:19.246304 +Revision ID: b125cbf81e32 +Revises: 0ec0d6d1e785 +Create Date: 2021-04-02 18:36:44.459603 """ from alembic import op import sqlalchemy as sa + from chainsyncer.db.migrations.sqlalchemy import ( chainsyncer_upgrade, chainsyncer_downgrade, @@ -14,15 +15,15 @@ from chainsyncer.db.migrations.sqlalchemy import ( # revision identifiers, used by Alembic. -revision = 'ec40ac0974c1' -down_revision = '6ac7a1dadc46' +revision = 'b125cbf81e32' +down_revision = '0ec0d6d1e785' branch_labels = None depends_on = None - def upgrade(): chainsyncer_upgrade(0, 0, 1) def downgrade(): chainsyncer_downgrade(0, 0, 1) + diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/df19f4e69676_add_tx_track.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/df19f4e69676_add_tx_track.py deleted file mode 100644 index 2c1ea138..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/df19f4e69676_add_tx_track.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Add tx tracker record - -Revision ID: df19f4e69676 -Revises: 71708e943dbd -Create Date: 2020-10-09 23:31:44.563498 - -""" -from alembic import op -import sqlalchemy as sa - -# revision identifiers, used by Alembic. -revision = 'df19f4e69676' -down_revision = '71708e943dbd' -branch_labels = None -depends_on = None - - -def upgrade(): -# op.create_table( -# 'tx', -# sa.Column('id', sa.Integer, primary_key=True), -# sa.Column('date_added', sa.DateTime, nullable=False), -# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True), -# sa.Column('success', sa.Boolean(), nullable=False), -# ) - pass - - -def downgrade(): -# op.drop_table('tx') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/default/versions/e3b5330ee71c_.py b/apps/cic-eth/cic_eth/db/migrations/default/versions/e3b5330ee71c_.py deleted file mode 100644 index 6c9115b7..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/default/versions/e3b5330ee71c_.py +++ /dev/null @@ -1,38 +0,0 @@ -"""Add cached values for tx - -Revision ID: e3b5330ee71c -Revises: df19f4e69676 -Create Date: 2020-10-10 00:17:07.094893 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'e3b5330ee71c' -down_revision = 'df19f4e69676' -branch_labels = None -depends_on = None - -def upgrade(): - op.create_table( - 'tx_cache', - sa.Column('id', sa.Integer, primary_key=True), -# sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime, nullable=False), - sa.Column('source_token_address', sa.String(42), nullable=False), - sa.Column('destination_token_address', sa.String(42), nullable=False), - sa.Column('sender', sa.String(42), nullable=False), - sa.Column('recipient', sa.String(42), nullable=False), - sa.Column('from_value', sa.NUMERIC(), nullable=False), - sa.Column('to_value', sa.NUMERIC(), nullable=True), - sa.Column('block_number', sa.BIGINT(), nullable=True), - sa.Column('tx_index', sa.Integer, nullable=True), - ) - -def downgrade(): - op.drop_table('tx_cache') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/alembic.ini b/apps/cic-eth/cic_eth/db/migrations/postgresql/alembic.ini deleted file mode 100644 index 1555f68c..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/alembic.ini +++ /dev/null @@ -1,85 +0,0 @@ -# A generic, single database configuration. - -[alembic] -# path to migration scripts -script_location = . - -# template used to generate migration files -# file_template = %%(rev)s_%%(slug)s - -# timezone to use when rendering the date -# within the migration file as well as the filename. -# string value is passed to dateutil.tz.gettz() -# leave blank for localtime -# timezone = - -# max length of characters to apply to the -# "slug" field -# truncate_slug_length = 40 - -# set to 'true' to run the environment during -# the 'revision' command, regardless of autogenerate -# revision_environment = false - -# set to 'true' to allow .pyc and .pyo files without -# a source .py file to be detected as revisions in the -# versions/ directory -# sourceless = false - -# version location specification; this defaults -# to migrations/versions. When using multiple version -# directories, initial revisions must be specified with --version-path -# version_locations = %(here)s/bar %(here)s/bat migrations/versions - -# the output encoding used when revision files -# are written from script.py.mako -# output_encoding = utf-8 - -#sqlalchemy.url = driver://user:pass@localhost/dbname -sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-eth - -[post_write_hooks] -# post_write_hooks defines scripts or Python functions that are run -# on newly generated revision scripts. See the documentation for further -# detail and examples - -# format using "black" - use the console_scripts runner, against the "black" entrypoint -# hooks=black -# black.type=console_scripts -# black.entrypoint=black -# black.options=-l 79 - -# Logging configuration -[loggers] -keys = root,sqlalchemy,alembic - -[handlers] -keys = console - -[formatters] -keys = generic - -[logger_root] -level = WARN -handlers = console -qualname = - -[logger_sqlalchemy] -level = WARN -handlers = -qualname = sqlalchemy.engine - -[logger_alembic] -level = INFO -handlers = -qualname = alembic - -[handler_console] -class = StreamHandler -args = (sys.stderr,) -level = NOTSET -formatter = generic - -[formatter_generic] -format = %(levelname)-5.5s [%(name)s] %(message)s -datefmt = %H:%M:%S diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/env.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/env.py deleted file mode 100644 index 70518a2e..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/env.py +++ /dev/null @@ -1,77 +0,0 @@ -from logging.config import fileConfig - -from sqlalchemy import engine_from_config -from sqlalchemy import pool - -from alembic import context - -# this is the Alembic Config object, which provides -# access to the values within the .ini file in use. -config = context.config - -# Interpret the config file for Python logging. -# This line sets up loggers basically. -fileConfig(config.config_file_name) - -# add your model's MetaData object here -# for 'autogenerate' support -# from myapp import mymodel -# target_metadata = mymodel.Base.metadata -target_metadata = None - -# other values from the config, defined by the needs of env.py, -# can be acquired: -# my_important_option = config.get_main_option("my_important_option") -# ... etc. - - -def run_migrations_offline(): - """Run migrations in 'offline' mode. - - This configures the context with just a URL - and not an Engine, though an Engine is acceptable - here as well. By skipping the Engine creation - we don't even need a DBAPI to be available. - - Calls to context.execute() here emit the given string to the - script output. - - """ - url = config.get_main_option("sqlalchemy.url") - context.configure( - url=url, - target_metadata=target_metadata, - literal_binds=True, - dialect_opts={"paramstyle": "named"}, - ) - - with context.begin_transaction(): - context.run_migrations() - - -def run_migrations_online(): - """Run migrations in 'online' mode. - - In this scenario we need to create an Engine - and associate a connection with the context. - - """ - connectable = engine_from_config( - config.get_section(config.config_ini_section), - prefix="sqlalchemy.", - poolclass=pool.NullPool, - ) - - with connectable.connect() as connection: - context.configure( - connection=connection, target_metadata=target_metadata - ) - - with context.begin_transaction(): - context.run_migrations() - - -if context.is_offline_mode(): - run_migrations_offline() -else: - run_migrations_online() diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/script.py.mako b/apps/cic-eth/cic_eth/db/migrations/postgresql/script.py.mako deleted file mode 100644 index 2c015630..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/script.py.mako +++ /dev/null @@ -1,24 +0,0 @@ -"""${message} - -Revision ID: ${up_revision} -Revises: ${down_revision | comma,n} -Create Date: ${create_date} - -""" -from alembic import op -import sqlalchemy as sa -${imports if imports else ""} - -# revision identifiers, used by Alembic. -revision = ${repr(up_revision)} -down_revision = ${repr(down_revision)} -branch_labels = ${repr(branch_labels)} -depends_on = ${repr(depends_on)} - - -def upgrade(): - ${upgrades if upgrades else "pass"} - - -def downgrade(): - ${downgrades if downgrades else "pass"} diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/2a07b543335e_add_new_syncer_table.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/2a07b543335e_add_new_syncer_table.py deleted file mode 100644 index 30ddb96f..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/2a07b543335e_add_new_syncer_table.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Add new syncer table - -Revision ID: 2a07b543335e -Revises: a2e2aab8f331 -Create Date: 2020-12-27 09:35:44.017981 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '2a07b543335e' -down_revision = 'a2e2aab8f331' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'blockchain_sync', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('blockchain', sa.String, nullable=False), - sa.Column('block_start', sa.Integer, nullable=False, default=0), - sa.Column('tx_start', sa.Integer, nullable=False, default=0), - sa.Column('block_cursor', sa.Integer, nullable=False, default=0), - sa.Column('tx_cursor', sa.Integer, nullable=False, default=0), - sa.Column('block_target', sa.Integer, nullable=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime), - ) - - -def downgrade(): - op.drop_table('blockchain_sync') diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/49b348246d70_add_nonce_index.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/49b348246d70_add_nonce_index.py deleted file mode 100644 index 3abee1b6..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/49b348246d70_add_nonce_index.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Add nonce index - -Revision ID: 49b348246d70 -Revises: 52c7c59cd0b1 -Create Date: 2020-12-19 09:45:36.186446 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '49b348246d70' -down_revision = '52c7c59cd0b1' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'nonce', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('address_hex', sa.String(42), nullable=False, unique=True), - sa.Column('nonce', sa.Integer, nullable=False), - ) - - -def downgrade(): - op.drop_table('nonce') diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/52c7c59cd0b1_add_account_roles.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/52c7c59cd0b1_add_account_roles.py deleted file mode 100644 index 7aa74059..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/52c7c59cd0b1_add_account_roles.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Add account roles - -Revision ID: 52c7c59cd0b1 -Revises: 9c4bd7491015 -Create Date: 2020-12-19 07:21:38.249237 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '52c7c59cd0b1' -down_revision = '9c4bd7491015' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'account_role', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('tag', sa.Text, nullable=False, unique=True), - sa.Column('address_hex', sa.String(42), nullable=False), - ) - pass - - -def downgrade(): - op.drop_table('account_role') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/6ac7a1dadc46_add_otx_state_log.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/6ac7a1dadc46_add_otx_state_log.py deleted file mode 100644 index f15834d9..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/6ac7a1dadc46_add_otx_state_log.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Add otx state log - -Revision ID: 6ac7a1dadc46 -Revises: 89e1e9baa53c -Create Date: 2021-01-30 13:59:49.022373 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '6ac7a1dadc46' -down_revision = '89e1e9baa53c' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'otx_state_log', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False), - sa.Column('date', sa.DateTime, nullable=False), - sa.Column('status', sa.Integer, nullable=False), - ) - - -def downgrade(): - op.drop_table('otx_state_log') diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/71708e943dbd_add_attempts_and_version_log_for_otx.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/71708e943dbd_add_attempts_and_version_log_for_otx.py deleted file mode 100644 index 3be687eb..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/71708e943dbd_add_attempts_and_version_log_for_otx.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Add attempts and version log for otx - -Revision ID: 71708e943dbd -Revises: 7e8d7626e38f -Create Date: 2020-09-26 14:41:19.298651 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '71708e943dbd' -down_revision = '7e8d7626e38f' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'otx_attempts', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False), - sa.Column('date', sa.DateTime, nullable=False), - ) - pass - - -def downgrade(): - op.drop_table('otx_attempts') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/7cb65b893934_add_blocknumber_pointer.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/7cb65b893934_add_blocknumber_pointer.py deleted file mode 100644 index 74a97f83..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/7cb65b893934_add_blocknumber_pointer.py +++ /dev/null @@ -1,31 +0,0 @@ -"""add blocknumber pointer - -Revision ID: 7cb65b893934 -Revises: 8593fa1ca0f4 -Create Date: 2020-09-24 19:29:13.543648 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '7cb65b893934' -down_revision = '8593fa1ca0f4' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'watcher_state', - sa.Column('block_number', sa.Integer) - ) - conn = op.get_bind() - conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);') - pass - - -def downgrade(): - op.drop_table('watcher_state') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/7e8d7626e38f_add_block_sync.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/7e8d7626e38f_add_block_sync.py deleted file mode 100644 index 6924fa26..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/7e8d7626e38f_add_block_sync.py +++ /dev/null @@ -1,42 +0,0 @@ -"""Add block sync - -Revision ID: 7e8d7626e38f -Revises: cd2052be6db2 -Create Date: 2020-09-26 11:12:27.818524 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '7e8d7626e38f' -down_revision = 'cd2052be6db2' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'block_sync', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('blockchain', sa.String, nullable=False, unique=True), - sa.Column('height_backlog', sa.Integer, nullable=False, default=0), - sa.Column('height_session', sa.Integer, nullable=False, default=0), - sa.Column('height_head', sa.Integer, nullable=False, default=0), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime), - ) - op.drop_table('watcher_state') - pass - - -def downgrade(): - op.drop_table('block_sync') - op.create_table( - 'watcher_state', - sa.Column('block_number', sa.Integer) - ) - conn = op.get_bind() - conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/8593fa1ca0f4_add_transaction_queue.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/8593fa1ca0f4_add_transaction_queue.py deleted file mode 100644 index e9f9cf6a..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/8593fa1ca0f4_add_transaction_queue.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Add transaction queue - -Revision ID: 8593fa1ca0f4 -Revises: -Create Date: 2020-09-22 21:56:42.117047 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '8593fa1ca0f4' -down_revision = None -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'otx', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('nonce', sa.Integer, nullable=False), - sa.Column('tx_hash', sa.String(66), nullable=False), - sa.Column('signed_tx', sa.Text, nullable=False), - sa.Column('status', sa.Integer, nullable=False, default=-9), - sa.Column('block', sa.Integer), - ) - op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True) - - -def downgrade(): - op.drop_index('idx_otx_tx') - op.drop_table('otx') diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/89e1e9baa53c_add_account_lock.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/89e1e9baa53c_add_account_lock.py deleted file mode 100644 index 4b1c4401..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/89e1e9baa53c_add_account_lock.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Add account lock - -Revision ID: 89e1e9baa53c -Revises: 2a07b543335e -Create Date: 2021-01-27 19:57:36.793882 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '89e1e9baa53c' -down_revision = '2a07b543335e' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'lock', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column("address", sa.String(42), nullable=True), - sa.Column('blockchain', sa.String), - sa.Column("flags", sa.BIGINT(), nullable=False, default=0), - sa.Column("date_created", sa.DateTime, nullable=False), - sa.Column("otx_id", sa.Integer, nullable=True), - ) - op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True) - -def downgrade(): - op.drop_index('idx_chain_address') - op.drop_table('lock') diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/9c4bd7491015_rename_block_sync_table.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/9c4bd7491015_rename_block_sync_table.py deleted file mode 100644 index f58721b2..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/9c4bd7491015_rename_block_sync_table.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Rename block sync table - -Revision ID: 9c4bd7491015 -Revises: 9daa16518a91 -Create Date: 2020-10-15 23:45:56.306898 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '9c4bd7491015' -down_revision = '9daa16518a91' -branch_labels = None -depends_on = None - - -def upgrade(): - op.rename_table('block_sync', 'otx_sync') - pass - - -def downgrade(): - op.rename_table('otx_sync', 'block_sync') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/9daa16518a91_add_tx_sync_state.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/9daa16518a91_add_tx_sync_state.py deleted file mode 100644 index 2c6cc0fe..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/9daa16518a91_add_tx_sync_state.py +++ /dev/null @@ -1,30 +0,0 @@ -"""add tx sync state - -Revision ID: 9daa16518a91 -Revises: e3b5330ee71c -Create Date: 2020-10-10 14:43:18.699276 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '9daa16518a91' -down_revision = 'e3b5330ee71c' -branch_labels = None -depends_on = None - - -def upgrade(): -# op.create_table( -# 'tx_sync', -# sa.Column('tx', sa.String(66), nullable=False), -# ) -# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')") - pass - - -def downgrade(): -# op.drop_table('tx_sync') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/a2e2aab8f331_add_date_accessed_to_txcache.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/a2e2aab8f331_add_date_accessed_to_txcache.py deleted file mode 100644 index 808a503c..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/a2e2aab8f331_add_date_accessed_to_txcache.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Add date accessed to txcache - -Revision ID: a2e2aab8f331 -Revises: 49b348246d70 -Create Date: 2020-12-24 18:58:06.137812 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'a2e2aab8f331' -down_revision = '49b348246d70' -branch_labels = None -depends_on = None - - -def upgrade(): - op.add_column( - 'tx_cache', - sa.Column( - 'date_checked', - sa.DateTime, - nullable=False - ) - ) - pass - - -def downgrade(): - op.drop_column('tx_cache', 'date_checked') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/cd2052be6db2_convert_tx_index.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/cd2052be6db2_convert_tx_index.py deleted file mode 100644 index 5b59d734..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/cd2052be6db2_convert_tx_index.py +++ /dev/null @@ -1,34 +0,0 @@ -"""convert tx index - -Revision ID: cd2052be6db2 -Revises: 7cb65b893934 -Create Date: 2020-09-24 21:20:51.580500 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'cd2052be6db2' -down_revision = '7cb65b893934' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'tx_convert_transfer', - sa.Column('id', sa.Integer, primary_key=True), - #sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True), - sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True), - sa.Column('transfer_tx_hash', sa.String(66), unique=True), -# sa.Column('holder_address', sa.String(42), nullable=False), - sa.Column('recipient_address', sa.String(42), nullable=False), - ) - op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address']) - - -def downgrade(): - op.drop_index('idx_tx_convert_address') - op.drop_table('tx_convert_transfer') diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/df19f4e69676_add_tx_track.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/df19f4e69676_add_tx_track.py deleted file mode 100644 index 2c1ea138..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/df19f4e69676_add_tx_track.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Add tx tracker record - -Revision ID: df19f4e69676 -Revises: 71708e943dbd -Create Date: 2020-10-09 23:31:44.563498 - -""" -from alembic import op -import sqlalchemy as sa - -# revision identifiers, used by Alembic. -revision = 'df19f4e69676' -down_revision = '71708e943dbd' -branch_labels = None -depends_on = None - - -def upgrade(): -# op.create_table( -# 'tx', -# sa.Column('id', sa.Integer, primary_key=True), -# sa.Column('date_added', sa.DateTime, nullable=False), -# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True), -# sa.Column('success', sa.Boolean(), nullable=False), -# ) - pass - - -def downgrade(): -# op.drop_table('tx') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/e3b5330ee71c_.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/e3b5330ee71c_.py deleted file mode 100644 index 3abafb73..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/e3b5330ee71c_.py +++ /dev/null @@ -1,37 +0,0 @@ -"""Add cached values for tx - -Revision ID: e3b5330ee71c -Revises: df19f4e69676 -Create Date: 2020-10-10 00:17:07.094893 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'e3b5330ee71c' -down_revision = 'df19f4e69676' -branch_labels = None -depends_on = None - -def upgrade(): - op.create_table( - 'tx_cache', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime, nullable=False), - sa.Column('source_token_address', sa.String(42), nullable=False), - sa.Column('destination_token_address', sa.String(42), nullable=False), - sa.Column('sender', sa.String(42), nullable=False), - sa.Column('recipient', sa.String(42), nullable=False), - sa.Column('from_value', sa.NUMERIC(), nullable=False), - sa.Column('to_value', sa.NUMERIC(), nullable=True), - sa.Column('block_number', sa.BIGINT(), nullable=True), - sa.Column('tx_index', sa.Integer, nullable=True), - ) - -def downgrade(): - op.drop_table('tx_cache') - pass diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/ec40ac0974c1_add_chain_syncer.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/ec40ac0974c1_add_chain_syncer.py deleted file mode 100644 index a278997f..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/ec40ac0974c1_add_chain_syncer.py +++ /dev/null @@ -1,28 +0,0 @@ -"""Add chain syncer - -Revision ID: ec40ac0974c1 -Revises: 6ac7a1dadc46 -Create Date: 2021-02-23 06:10:19.246304 - -""" -from alembic import op -import sqlalchemy as sa -from chainsyncer.db.migrations.sqlalchemy import ( - chainsyncer_upgrade, - chainsyncer_downgrade, - ) - - -# revision identifiers, used by Alembic. -revision = 'ec40ac0974c1' -down_revision = '6ac7a1dadc46' -branch_labels = None -depends_on = None - - -def upgrade(): - chainsyncer_upgrade(0, 0, 1) - - -def downgrade(): - chainsyncer_downgrade(0, 0, 1) diff --git a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/f738d9962fdf_debug_output.py b/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/f738d9962fdf_debug_output.py deleted file mode 100644 index 378f2965..00000000 --- a/apps/cic-eth/cic_eth/db/migrations/postgresql/versions/f738d9962fdf_debug_output.py +++ /dev/null @@ -1,32 +0,0 @@ -"""debug output - -Revision ID: f738d9962fdf -Revises: ec40ac0974c1 -Create Date: 2021-03-04 08:32:43.281214 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'f738d9962fdf' -down_revision = 'ec40ac0974c1' -branch_labels = None -depends_on = None - - -def upgrade(): - op.create_table( - 'debug', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('tag', sa.String, nullable=False), - sa.Column('description', sa.String, nullable=False), - sa.Column('date_created', sa.DateTime, nullable=False), - ) - pass - - -def downgrade(): - op.drop_table('debug') - pass diff --git a/apps/cic-eth/cic_eth/eth/bancor.py.bak b/apps/cic-eth/cic_eth/eth/bancor.py.bak index fdc01144..19008e24 100644 --- a/apps/cic-eth/cic_eth/eth/bancor.py.bak +++ b/apps/cic-eth/cic_eth/eth/bancor.py.bak @@ -222,7 +222,7 @@ def convert_with_default_reserve(self, tokens, from_address, source_amount, mini # ) # # s_set_sent = celery.signature( -# 'cic_eth.queue.tx.set_sent_status', +# 'cic_eth.queue.state.set_sent_status', # [False], # ) # s_send.link(s_set_sent) diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index a930b847..67000ec1 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -162,7 +162,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir wait_tasks = [] for tx_hash in tx_hashes: s = celery.signature( - 'cic_eth.queue.tx.set_waitforgas', + 'cic_eth.queue.state.set_waitforgas', [ tx_hash, ], @@ -195,7 +195,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir ready_tasks = [] for tx_hash in tx_hashes: s = celery.signature( - 'cic_eth.queue.tx.set_ready', + 'cic_eth.queue.state.set_ready', [ tx_hash, ], @@ -278,7 +278,7 @@ def send(self, txs, chain_spec_dict): r = None s_set_sent = celery.signature( - 'cic_eth.queue.tx.set_sent_status', + 'cic_eth.queue.state.set_sent_status', [ tx_hash_hex, False @@ -366,7 +366,7 @@ def refill_gas(self, recipient_address, chain_spec_dict): # add transaction to send queue s_status = celery.signature( - 'cic_eth.queue.tx.set_ready', + 'cic_eth.queue.state.set_ready', [ tx_hash_hex, ], @@ -525,7 +525,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict): logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success)) s = celery.signature( - 'cic_eth.queue.tx.set_final_status', + 'cic_eth.queue.state.set_final_status', [ tx_hash_hex, rcpt['blockNumber'], @@ -537,7 +537,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict): logg.debug('sync tx {} mempool'.format(tx_hash_hex)) s = celery.signature( - 'cic_eth.queue.tx.set_sent_status', + 'cic_eth.queue.state.set_sent_status', [ tx_hash_hex, ], diff --git a/apps/cic-eth/cic_eth/queue/balance.py b/apps/cic-eth/cic_eth/queue/balance.py index 1a5cf2db..a927a17a 100644 --- a/apps/cic-eth/cic_eth/queue/balance.py +++ b/apps/cic-eth/cic_eth/queue/balance.py @@ -5,15 +5,15 @@ import logging import celery from chainlib.chain import ChainSpec from hexathon import strip_0x - -# local imports -from cic_eth.db import SessionBase -from cic_eth.db.models.otx import Otx -from cic_eth.db.models.tx import TxCache -from cic_eth.db.enum import ( +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache +from chainqueue.db.enum import ( StatusBits, dead, ) + +# local imports +from cic_eth.db import SessionBase from cic_eth.task import CriticalSQLAlchemyTask celery_app = celery.current_app diff --git a/apps/cic-eth/cic_eth/queue/lock.py b/apps/cic-eth/cic_eth/queue/lock.py new file mode 100644 index 00000000..f25e4c2b --- /dev/null +++ b/apps/cic-eth/cic_eth/queue/lock.py @@ -0,0 +1,46 @@ +# external imports +from chainqueue.db.models.otx import Otx +import celery + +# local imports +from cic_eth.task import CriticalSQLAlchemyTask +from cic_eth.db import SessionBase +from cic_eth.db.models.lock import Lock + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def get_lock(address=None): + """Retrieve all active locks + + If address is set, the query will look up the lock for the specified address only. A list of zero or one elements is returned, depending on whether a lock is set or not. + + :param address: Get lock for only the specified address + :type address: str, 0x-hex + :returns: List of locks + :rtype: list of dicts + """ + session = SessionBase.create_session() + q = session.query( + Lock.date_created, + Lock.address, + Lock.flags, + Otx.tx_hash, + ) + q = q.join(Otx, isouter=True) + if address != None: + q = q.filter(Lock.address==address) + else: + q = q.order_by(Lock.date_created.asc()) + + locks = [] + for lock in q.all(): + o = { + 'date': lock[0], + 'address': lock[1], + 'tx_hash': lock[3], + 'flags': lock[2], + } + locks.append(o) + session.close() + + return locks diff --git a/apps/cic-eth/cic_eth/queue/query.py b/apps/cic-eth/cic_eth/queue/query.py new file mode 100644 index 00000000..615aa9d5 --- /dev/null +++ b/apps/cic-eth/cic_eth/queue/query.py @@ -0,0 +1,131 @@ +# external imports +from chainlib.chain import ChainSpec +from chainlib.eth.tx import unpack +import chainqueue.query +from sqlalchemy import func +from sqlalchemy import or_ + +# local imports +from cic_eth.task import CriticalSQLAlchemyTask +from cic_eth.db.models.lock import Lock + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def get_tx_cache(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.query.get_tx_cache(chain_spec, tx_hash) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def get_tx(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.query.get_tx(chain_spec, tx_hash) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def get_account_tx(chain_spec_dict, address, as_sender=True, as_recipient=True, counterpart=None): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.query.get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def get_upcoming_tx_nolock(chain_spec_dict, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.query.get_upcoming_tx(chain_spec, status, not_status=not_status, recipient=recipient, before=before, limit=limit, session=session, decoder=unpack): + + +def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None): + return chainqueue.query.get_status_tx_cache(chain_spec, status, not_status=not_status, before=before, exact=exact, limit=limit, session=session, decoder=unpack) + + +def get_paused_tx(chain_spec, status=None, sender=None, session=None, decoder=None): + return chainqueue.query.get_paused_tx_cache(chain_spec, status=status, sender=sender, session=session, decoder=unpack) + + +def get_nonce_tx(chain_spec, nonce, sender): + return get_nonce_tx_cache(chain_spec, nonce, sender, decoder=unpack): + + +def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None): + """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. + + Will omit addresses that have the LockEnum.SEND bit in Lock set. + + (TODO) Will not return any rows if LockEnum.SEND bit in Lock is set for zero address. + + :param status: Defines the status used to filter as upcoming. + :type status: cic_eth.db.enum.StatusEnum + :param recipient: Ethereum address of recipient to return transaction for + :type recipient: str, 0x-hex + :param before: Only return transactions if their modification date is older than the given timestamp + :type before: datetime.datetime + :param chain_id: Chain id to use to parse signed transaction data + :type chain_id: number + :raises ValueError: Status is finalized, sent or never attempted sent + :returns: Transactions + :rtype: dict, with transaction hash as key, signed raw transaction as value + """ + session = SessionBase.bind_session(session) + q_outer = session.query( + TxCache.sender, + func.min(Otx.nonce).label('nonce'), + ) + q_outer = q_outer.join(TxCache) + q_outer = q_outer.join(Lock, isouter=True) + q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) + + if not is_alive(status): + SessionBase.release_session(session) + raise ValueError('not a valid non-final tx value: {}'.format(status)) + if status == StatusEnum.PENDING: + q_outer = q_outer.filter(Otx.status==status.value) + else: + q_outer = q_outer.filter(Otx.status.op('&')(status)==status) + + if not_status != None: + q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0) + + if recipient != None: + q_outer = q_outer.filter(TxCache.recipient==recipient) + + q_outer = q_outer.group_by(TxCache.sender) + + txs = {} + + i = 0 + for r in q_outer.all(): + q = session.query(Otx) + q = q.join(TxCache) + q = q.filter(TxCache.sender==r.sender) + q = q.filter(Otx.nonce==r.nonce) + + if before != None: + q = q.filter(TxCache.date_checked 0 and limit == i: + break + + SessionBase.release_session(session) + + return txs + diff --git a/apps/cic-eth/cic_eth/queue/state.py b/apps/cic-eth/cic_eth/queue/state.py new file mode 100644 index 00000000..44c11b63 --- /dev/null +++ b/apps/cic-eth/cic_eth/queue/state.py @@ -0,0 +1,67 @@ +# external imports +from chainlib.chain import ChainSpec +import chainqueue.state + +# local imports +from cic_eth.task import CriticalSQLAlchemyTask + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_sent_status(chain_spec_dict, tx_hash, fail=False): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_sent_status(chain_spec, tx_hash, fail) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_final_status(chain_spec_dict, tx_hash, block=None, fail=False): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_final_status(chain_spec, tx_hash, block, fail) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_cancel(chain_spec_dict, tx_hash, manual=False): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_cancel(chain_spec, tx_hash, manual) + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_rejected(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_rejected(chain_spec, tx_hash) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_fubar(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_fubar(chain_spec, tx_hash) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_manual(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_manual(chain_spec, tx_hash) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_ready(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_ready(chain_spec, tx_hash) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_reserved(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_reserved(chain_spec, tx_hash) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def set_waitforgas(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.set_waitforgas(chain_spec, tx_hash) + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def get_state_log(chain_spec_dict, tx_hash): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + return chainqueue.state.get_state_log(chain_spec, tx_hash) + + diff --git a/apps/cic-eth/cic_eth/queue/time.py b/apps/cic-eth/cic_eth/queue/time.py index f85e55c0..e7ff7b64 100644 --- a/apps/cic-eth/cic_eth/queue/time.py +++ b/apps/cic-eth/cic_eth/queue/time.py @@ -7,10 +7,10 @@ from chainlib.chain import ChainSpec from chainlib.connection import RPCConnection from chainlib.eth.block import block_by_hash from chainlib.eth.tx import receipt +from chainqueue.db.models.otx import Otx +from chainqueue.error import NotLocalTxError # local imports -from cic_eth.db.models.otx import Otx -from cic_eth.error import NotLocalTxError from cic_eth.task import CriticalSQLAlchemyAndWeb3Task celery_app = celery.current_app diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 66b4e3a2..4d30b83e 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -5,95 +5,37 @@ import datetime # external imports import celery +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.otx import OtxStateLog +from chainqueue.db.models.tx import TxCache from hexathon import strip_0x from sqlalchemy import or_ from sqlalchemy import not_ from sqlalchemy import tuple_ from sqlalchemy import func +from chainlib.chain import ChainSpec from chainlib.eth.tx import unpack - -# local imports -from cic_eth.db.models.otx import Otx -from cic_eth.db.models.otx import OtxStateLog -from cic_eth.db.models.tx import TxCache -from cic_eth.db.models.lock import Lock -from cic_eth.db import SessionBase -from cic_eth.db.enum import ( +import chainqueue.state +from chainqueue.db.enum import ( StatusEnum, - LockEnum, StatusBits, is_alive, dead, ) +from chainqueue.error import NotLocalTxError +from chainqueue.db.enum import status_str + +# local imports +from cic_eth.db.models.lock import Lock +from cic_eth.db import SessionBase +from cic_eth.db.enum import LockEnum from cic_eth.task import CriticalSQLAlchemyTask -from cic_eth.error import NotLocalTxError from cic_eth.error import LockedError -from cic_eth.db.enum import status_str celery_app = celery.current_app -#logg = celery_app.log.get_default_logger() logg = logging.getLogger() -def create(nonce, holder_address, tx_hash, signed_tx, chain_spec, obsolete_predecessors=True, session=None): - """Create a new transaction queue record. - - :param nonce: Transaction nonce - :type nonce: int - :param holder_address: Sender address - :type holder_address: str, 0x-hex - :param tx_hash: Transaction hash - :type tx_hash: str, 0x-hex - :param signed_tx: Signed raw transaction - :type signed_tx: str, 0x-hex - :param chain_spec: Chain spec to create transaction for - :type chain_spec: ChainSpec - :returns: transaction hash - :rtype: str, 0x-hash - """ - session = SessionBase.bind_session(session) - lock = Lock.check_aggregate(str(chain_spec), LockEnum.QUEUE, holder_address, session=session) - if lock > 0: - SessionBase.release_session(session) - raise LockedError(lock) - - o = Otx.add( - nonce=nonce, - address=holder_address, - tx_hash=tx_hash, - signed_tx=signed_tx, - session=session, - ) - session.flush() - - if obsolete_predecessors: - q = session.query(Otx) - q = q.join(TxCache) - q = q.filter(Otx.nonce==nonce) - q = q.filter(TxCache.sender==holder_address) - q = q.filter(Otx.tx_hash!=tx_hash) - q = q.filter(Otx.status.op('&')(StatusBits.FINAL)==0) - - for otx in q.all(): - logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash)) - try: - otx.cancel(confirmed=False, session=session) - except TxStateChangeError as e: - logg.exception('obsolete fail: {}'.format(e)) - session.close() - raise(e) - except Exception as e: - logg.exception('obsolete UNEXPECTED fail: {}'.format(e)) - session.close() - raise(e) - - - session.commit() - SessionBase.release_session(session) - logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash)) - return tx_hash - - def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None): """Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING). @@ -134,683 +76,3 @@ def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=No queue=queue, ) s_cache.apply_async() - - return (tx_hash_hex, tx_signed_raw_hex,) - - -# TODO: Replace set_* with single task for set status -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_sent_status(tx_hash, fail=False): - """Used to set the status after a send attempt - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :param fail: if True, will set a SENDFAIL status, otherwise a SENT status. (Default: False) - :type fail: boolean - :raises NotLocalTxError: If transaction not found in queue. - :returns: True if tx is known, False otherwise - :rtype: boolean - """ - session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - o = q.first() - if o == None: - logg.warning('not local tx, skippingĀ {}'.format(tx_hash)) - session.close() - return False - - try: - if fail: - o.sendfail(session=session) - else: - o.sent(session=session) - except TxStateChangeError as e: - logg.exception('set sent fail: {}'.format(e)) - session.close() - raise(e) - except Exception as e: - logg.exception('set sent UNEXPECED fail: {}'.format(e)) - session.close() - raise(e) - - - session.commit() - session.close() - - return tx_hash - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_final_status(tx_hash, block=None, fail=False): - """Used to set the status of an incoming transaction result. - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :param block: Block number if final status represents a confirmation on the network - :type block: number - :param fail: if True, will set a SUCCESS status, otherwise a REVERTED status. (Default: False) - :type fail: boolean - :raises NotLocalTxError: If transaction not found in queue. - """ - session = SessionBase.create_session() - q = session.query( - Otx.nonce.label('nonce'), - TxCache.sender.label('sender'), - Otx.id.label('otxid'), - ) - q = q.join(TxCache) - q = q.filter(Otx.tx_hash==tx_hash) - o = q.first() - - if o == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - - session.flush() - - nonce = o.nonce - sender = o.sender - otxid = o.otxid - - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - o = q.first() - - try: - if fail: - o.minefail(block, session=session) - else: - o.success(block, session=session) - session.commit() - except TxStateChangeError as e: - logg.exception('set final fail: {}'.format(e)) - session.close() - raise(e) - except Exception as e: - logg.exception('set final UNEXPECED fail: {}'.format(e)) - session.close() - raise(e) - - q = session.query(Otx) - q = q.join(TxCache) - q = q.filter(Otx.nonce==nonce) - q = q.filter(TxCache.sender==sender) - q = q.filter(Otx.tx_hash!=tx_hash) - - for otwo in q.all(): - try: - otwo.cancel(True, session=session) - except TxStateChangeError as e: - logg.exception('cancel non-final fail: {}'.format(e)) - session.close() - raise(e) - except Exception as e: - logg.exception('cancel non-final UNEXPECTED fail: {}'.format(e)) - session.close() - raise(e) - session.commit() - session.close() - - return tx_hash - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_cancel(tx_hash, manual=False): - """Used to set the status when a transaction is cancelled. - - Will set the state to CANCELLED or OVERRIDDEN - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :param manual: If set, status will be OVERRIDDEN. Otherwise CANCELLED. - :type manual: boolean - :raises NotLocalTxError: If transaction not found in queue. - """ - - session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() - if o == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - - session.flush() - - try: - if manual: - o.override(session=session) - else: - o.cancel(session=session) - session.commit() - except TxStateChangeError as e: - logg.exception('set cancel fail: {}'.format(e)) - except Exception as e: - logg.exception('set cancel UNEXPECTED fail: {}'.format(e)) - session.close() - - return tx_hash - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_rejected(tx_hash): - """Used to set the status when the node rejects sending a transaction to network - - Will set the state to REJECTED - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :raises NotLocalTxError: If transaction not found in queue. - """ - - session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() - if o == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - - session.flush() - - o.reject(session=session) - session.commit() - session.close() - - return tx_hash - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_fubar(tx_hash): - """Used to set the status when an unexpected error occurs. - - Will set the state to FUBAR - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :raises NotLocalTxError: If transaction not found in queue. - """ - - session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() - if o == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - - session.flush() - - o.fubar(session=session) - session.commit() - session.close() - - return tx_hash - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_manual(tx_hash): - """Used to set the status when queue is manually changed - - Will set the state to MANUAL - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :raises NotLocalTxError: If transaction not found in queue. - """ - - session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() - if o == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - - session.flush() - - o.manual(session=session) - session.commit() - session.close() - - return tx_hash - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_ready(tx_hash): - """Used to mark a transaction as ready to be sent to network - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :raises NotLocalTxError: If transaction not found in queue. - """ - session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() - if o == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - session.flush() - - if o.status & StatusBits.GAS_ISSUES or o.status == StatusEnum.PENDING: - o.readysend(session=session) - else: - o.retry(session=session) - - session.commit() - session.close() - - return tx_hash - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_dequeue(tx_hash): - session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - o = q.first() - if o == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - - session.flush() - - o.dequeue(session=session) - session.commit() - session.close() - - return tx_hash - - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def set_waitforgas(tx_hash): - """Used to set the status when a transaction must be deferred due to gas refill - - Will set the state to WAITFORGAS - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :raises NotLocalTxError: If transaction not found in queue. - """ - - session = SessionBase.create_session() - o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() - if o == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - - session.flush() - - o.waitforgas(session=session) - session.commit() - session.close() - - return tx_hash - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def get_state_log(tx_hash): - - logs = [] - - session = SessionBase.create_session() - - q = session.query(OtxStateLog) - q = q.join(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - q = q.order_by(OtxStateLog.date.asc()) - for l in q.all(): - logs.append((l.date, l.status,)) - - session.close() - - return logs - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def get_tx_cache(tx_hash): - """Returns an aggregate dictionary of outgoing transaction data and metadata - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :raises NotLocalTxError: If transaction not found in queue. - :returns: Transaction data - :rtype: dict - """ - session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - otx = q.first() - - if otx == None: - session.close() - raise NotLocalTxError(tx_hash) - - session.flush() - - q = session.query(TxCache) - q = q.filter(TxCache.otx_id==otx.id) - txc = q.first() - - session.close() - - tx = { - 'tx_hash': otx.tx_hash, - 'signed_tx': otx.signed_tx, - 'nonce': otx.nonce, - 'status': status_str(otx.status), - 'status_code': otx.status, - 'source_token': txc.source_token_address, - 'destination_token': txc.destination_token_address, - 'block_number': txc.block_number, - 'tx_index': txc.tx_index, - 'sender': txc.sender, - 'recipient': txc.recipient, - 'from_value': int(txc.from_value), - 'to_value': int(txc.to_value), - 'date_created': txc.date_created, - 'date_updated': txc.date_updated, - 'date_checked': txc.date_checked, - } - - return tx - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def get_lock(address=None): - """Retrieve all active locks - - If address is set, the query will look up the lock for the specified address only. A list of zero or one elements is returned, depending on whether a lock is set or not. - - :param address: Get lock for only the specified address - :type address: str, 0x-hex - :returns: List of locks - :rtype: list of dicts - """ - session = SessionBase.create_session() - q = session.query( - Lock.date_created, - Lock.address, - Lock.flags, - Otx.tx_hash, - ) - q = q.join(Otx, isouter=True) - if address != None: - q = q.filter(Lock.address==address) - else: - q = q.order_by(Lock.date_created.asc()) - - locks = [] - for lock in q.all(): - o = { - 'date': lock[0], - 'address': lock[1], - 'tx_hash': lock[3], - 'flags': lock[2], - } - locks.append(o) - session.close() - - return locks - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def get_tx(tx_hash): - """Retrieve a transaction queue record by transaction hash - - :param tx_hash: Transaction hash of record to modify - :type tx_hash: str, 0x-hex - :raises NotLocalTxError: If transaction not found in queue. - :returns: nonce, address and signed_tx (raw signed transaction) - :rtype: dict - """ - session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - tx = q.first() - if tx == None: - session.close() - raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) - - o = { - 'otx_id': tx.id, - 'nonce': tx.nonce, - 'signed_tx': tx.signed_tx, - 'status': tx.status, - } - logg.debug('get tx {}'.format(o)) - session.close() - return o - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def get_nonce_tx(nonce, sender, chain_id): - """Retrieve all transactions for address with specified nonce - - :param nonce: Nonce - :type nonce: number - :param address: Ethereum address - :type address: str, 0x-hex - :returns: Transactions - :rtype: dict, with transaction hash as key, signed raw transaction as value - """ - session = SessionBase.create_session() - q = session.query(Otx) - q = q.join(TxCache) - q = q.filter(TxCache.sender==sender) - q = q.filter(Otx.nonce==nonce) - - txs = {} - for r in q.all(): - tx_signed_bytes = bytes.fromhex(r.signed_tx[2:]) - tx = unpack(tx_signed_bytes, chain_id) - if sender == None or tx['from'] == sender: - txs[r.tx_hash] = r.signed_tx - - session.close() - - return txs - - - -# TODO: pass chain spec instead of chain id -def get_paused_txs(status=None, sender=None, chain_id=0, session=None): - """Returns not finalized transactions that have been attempted sent without success. - - :param status: If set, will return transactions with this local queue status only - :type status: cic_eth.db.enum.StatusEnum - :param recipient: Recipient address to return transactions for - :type recipient: str, 0x-hex - :param chain_id: Numeric chain id to use to parse signed transaction data - :type chain_id: number - :raises ValueError: Status is finalized, sent or never attempted sent - :returns: Transactions - :rtype: dict, with transaction hash as key, signed raw transaction as value - """ - session = SessionBase.bind_session(session) - q = session.query(Otx) - - if status != None: - #if status == StatusEnum.PENDING or status >= StatusEnum.SENT: - if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status): - SessionBase.release_session(session) - raise ValueError('not a valid paused tx value: {}'.format(status)) - q = q.filter(Otx.status.op('&')(status.value)==status.value) - q = q.join(TxCache) - else: - q = q.filter(Otx.status>StatusEnum.PENDING.value) - q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)>0)) - - if sender != None: - q = q.filter(TxCache.sender==sender) - - txs = {} - - for r in q.all(): - tx_signed_bytes = bytes.fromhex(r.signed_tx[2:]) - tx = unpack(tx_signed_bytes, chain_id) - if sender == None or tx['from'] == sender: - #gas += tx['gas'] * tx['gasPrice'] - txs[r.tx_hash] = r.signed_tx - - SessionBase.release_session(session) - - return txs - - -def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None): - """Retrieve transaction with a specific queue status. - - :param status: Status to match transactions with - :type status: str - :param before: If set, return only transactions older than the timestamp - :type status: datetime.dateTime - :param limit: Limit amount of returned transactions - :type limit: number - :returns: Transactions - :rtype: list of cic_eth.db.models.otx.Otx - """ - txs = {} - session = SessionBase.bind_session(session) - q = session.query(Otx) - q = q.join(TxCache) - # before = datetime.datetime.utcnow() - if before != None: - q = q.filter(TxCache.date_updated0) - if not_status != None: - q = q.filter(Otx.status.op('&')(not_status)==0) - q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) - i = 0 - for o in q.all(): - if limit > 0 and i == limit: - break - txs[o.tx_hash] = o.signed_tx - i += 1 - SessionBase.release_session(session) - return txs - - -# TODO: move query to model -def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None): - """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. - - Will omit addresses that have the LockEnum.SEND bit in Lock set. - - (TODO) Will not return any rows if LockEnum.SEND bit in Lock is set for zero address. - - :param status: Defines the status used to filter as upcoming. - :type status: cic_eth.db.enum.StatusEnum - :param recipient: Ethereum address of recipient to return transaction for - :type recipient: str, 0x-hex - :param before: Only return transactions if their modification date is older than the given timestamp - :type before: datetime.datetime - :param chain_id: Chain id to use to parse signed transaction data - :type chain_id: number - :raises ValueError: Status is finalized, sent or never attempted sent - :returns: Transactions - :rtype: dict, with transaction hash as key, signed raw transaction as value - """ - session = SessionBase.bind_session(session) - q_outer = session.query( - TxCache.sender, - func.min(Otx.nonce).label('nonce'), - ) - q_outer = q_outer.join(TxCache) - q_outer = q_outer.join(Lock, isouter=True) - q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) - - if not is_alive(status): - SessionBase.release_session(session) - raise ValueError('not a valid non-final tx value: {}'.format(status)) - if status == StatusEnum.PENDING: - q_outer = q_outer.filter(Otx.status==status.value) - else: - q_outer = q_outer.filter(Otx.status.op('&')(status)==status) - - if not_status != None: - q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0) - - if recipient != None: - q_outer = q_outer.filter(TxCache.recipient==recipient) - - q_outer = q_outer.group_by(TxCache.sender) - - txs = {} - - i = 0 - for r in q_outer.all(): - q = session.query(Otx) - q = q.join(TxCache) - q = q.filter(TxCache.sender==r.sender) - q = q.filter(Otx.nonce==r.nonce) - - if before != None: - q = q.filter(TxCache.date_checked 0 and limit == i: - break - - SessionBase.release_session(session) - - return txs - - -@celery_app.task(base=CriticalSQLAlchemyTask) -def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None): - """Returns all local queue transactions for a given Ethereum address - - :param address: Ethereum address - :type address: str, 0x-hex - :param as_sender: If False, will omit transactions where address is sender - :type as_sender: bool - :param as_sender: If False, will omit transactions where address is recipient - :type as_sender: bool - :param counterpart: Only return transactions where this Ethereum address is the other end of the transaction (not in use) - :type counterpart: str, 0x-hex - :raises ValueError: If address is set to be neither sender nor recipient - :returns: Transactions - :rtype: dict, with transaction hash as key, signed raw transaction as value - """ - if not as_sender and not as_recipient: - raise ValueError('at least one of as_sender and as_recipient must be True') - - txs = {} - - session = SessionBase.create_session() - q = session.query(Otx) - q = q.join(TxCache) - if as_sender and as_recipient: - q = q.filter(or_(TxCache.sender==address, TxCache.recipient==address)) - elif as_sender: - q = q.filter(TxCache.sender==address) - else: - q = q.filter(TxCache.recipient==address) - q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) - - results = q.all() - for r in results: - if txs.get(r.tx_hash) != None: - logg.debug('tx {} already recorded'.format(r.tx_hash)) - continue - txs[r.tx_hash] = r.signed_tx - session.close() - - return txs - diff --git a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py index 52ec2532..7e783c3e 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/filters/tx.py @@ -34,7 +34,7 @@ class TxFilter(SyncFilter): db_session.flush() SessionBase.release_session(db_session) s = celery.signature( - 'cic_eth.queue.tx.set_final_status', + 'cic_eth.queue.state.set_final_status', [ add_0x(tx_hash_hex), tx.block.number, diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index b7889a32..e15712ae 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -113,7 +113,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_spec): # ) # s_retry_status = celery.signature( -# 'cic_eth.queue.tx.set_ready', +# 'cic_eth.queue.state.set_ready', # [], # queue=queue, # ) diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index fd316be0..3a8ae728 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -14,6 +14,7 @@ import confini from chainlib.connection import RPCConnection from chainlib.eth.connection import EthUnixSignerConnection from chainlib.chain import ChainSpec +from chainqueue.db.models.otx import Otx # local imports from cic_eth.eth import erc20 @@ -21,14 +22,14 @@ from cic_eth.eth import tx from cic_eth.eth import account from cic_eth.admin import debug from cic_eth.admin import ctrl -from cic_eth.queue import tx +from cic_eth.queue import state +from cic_eth.queue import query from cic_eth.queue import balance from cic_eth.callbacks import Callback from cic_eth.callbacks import http from cic_eth.callbacks import tcp from cic_eth.callbacks import redis from cic_eth.db.models.base import SessionBase -from cic_eth.db.models.otx import Otx from cic_eth.db import dsn_from_config from cic_eth.ext import tx from cic_eth.registry import ( diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index e382cd39..ab7824f7 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -19,6 +19,7 @@ eth-address-index~=0.1.1a5 chainlib~=0.0.1a43 hexathon~=0.0.1a7 chainsyncer~=0.0.1a20 +chainqueue~=0.0.1a1 pysha3==1.0.2 coincurve==15.0.0 sarafu-faucet~=0.0.2a15 diff --git a/apps/cic-eth/tests/unit/queue/test_balances.py b/apps/cic-eth/tests/unit/queue/test_balances.py index 168401c9..8d028b75 100644 --- a/apps/cic-eth/tests/unit/queue/test_balances.py +++ b/apps/cic-eth/tests/unit/queue/test_balances.py @@ -2,12 +2,12 @@ import os import logging -# third-party imports +# external imports import pytest +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache # local imports -from cic_eth.db.models.otx import Otx -from cic_eth.db.models.tx import TxCache from cic_eth.queue.balance import ( balance_outgoing, balance_incoming, diff --git a/apps/cic-eth/tests/unit/queue/test_list_tx.py b/apps/cic-eth/tests/unit/queue/test_list_tx.py deleted file mode 100644 index 397ed07e..00000000 --- a/apps/cic-eth/tests/unit/queue/test_list_tx.py +++ /dev/null @@ -1,76 +0,0 @@ -# standard imports -import logging - -# external imports -from chainlib.connection import RPCConnection -from chainlib.eth.gas import RPCGasOracle -from chainlib.eth.nonce import RPCNonceOracle -from chainlib.eth.gas import Gas - -# local imports -from cic_eth.queue.tx import get_status_tx -from cic_eth.db.enum import ( - StatusEnum, - StatusBits, - ) -from cic_eth.queue.tx import create as queue_create -from cic_eth.eth.tx import cache_gas_data -from cic_eth.queue.tx import register_tx -from cic_eth.db.models.otx import Otx - -logg = logging.getLogger() - - -def test_status_tx_list( - default_chain_spec, - init_database, - eth_rpc, - eth_signer, - agent_roles, - ): - - rpc = RPCConnection.connect(default_chain_spec, 'default') - - nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) - gas_oracle = RPCGasOracle(eth_rpc) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - - (tx_hash_hex, o) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024) - r = rpc.do(o) - - tx_signed_raw_hex = o['params'][0] - #queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - q = init_database.query(Otx) - otx = q.get(1) - otx.sendfail(session=init_database) - init_database.add(otx) - init_database.commit() - init_database.refresh(otx) - - txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database) - assert len(txs) == 1 - - otx.sendfail(session=init_database) - otx.retry(session=init_database) - init_database.add(otx) - init_database.commit() - init_database.refresh(otx) - - txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database) - assert len(txs) == 1 - - txs = get_status_tx(StatusBits.QUEUED, session=init_database) - assert len(txs) == 1 - - txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.LOCAL_ERROR, session=init_database) - assert len(txs) == 0 - - txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK, session=init_database) - assert len(txs) == 1 - - txs = get_status_tx(StatusBits.IN_NETWORK, session=init_database) - assert len(txs) == 0 - diff --git a/apps/cic-eth/tests/unit/queue/test_otx_state_log.py b/apps/cic-eth/tests/unit/queue/test_otx_state_log.py deleted file mode 100644 index 9c5194da..00000000 --- a/apps/cic-eth/tests/unit/queue/test_otx_state_log.py +++ /dev/null @@ -1,22 +0,0 @@ -# standard imports -import os - -# local imports -from cic_eth.db.models.otx import Otx -from cic_eth.queue.tx import get_state_log - - -def test_otx_state_log( - init_database, - ): - - Otx.tracing = True - - address = '0x' + os.urandom(20).hex() - tx_hash = '0x' + os.urandom(32).hex() - signed_tx = '0x' + os.urandom(128).hex() - otx = Otx.add(0, address, tx_hash, signed_tx, session=init_database) - init_database.commit() - - log = get_state_log(tx_hash) - assert len(log) == 1 diff --git a/apps/cic-eth/tests/unit/queue/test_queue_lock.py b/apps/cic-eth/tests/unit/queue/test_queue_lock.py index fe18fa16..4ca65ab2 100644 --- a/apps/cic-eth/tests/unit/queue/test_queue_lock.py +++ b/apps/cic-eth/tests/unit/queue/test_queue_lock.py @@ -3,9 +3,9 @@ import os # third-party imports import pytest +from chainqueue.tx import create as queue_create # local imports -from cic_eth.queue.tx import create as queue_create from cic_eth.db.models.lock import Lock from cic_eth.db.enum import LockEnum from cic_eth.error import LockedError diff --git a/apps/cic-eth/tests/unit/queue/test_tx_queue.py b/apps/cic-eth/tests/unit/queue/test_tx_queue.py deleted file mode 100644 index 385b3fb5..00000000 --- a/apps/cic-eth/tests/unit/queue/test_tx_queue.py +++ /dev/null @@ -1,577 +0,0 @@ -# standard imports import logging -import datetime -import os -import logging - -# external imports -import pytest -from sqlalchemy import DateTime -from chainlib.connection import RPCConnection -from chainlib.eth.nonce import OverrideNonceOracle -from chainlib.eth.tx import unpack -from chainlib.eth.gas import ( - RPCGasOracle, - Gas, - ) -from chainlib.eth.constant import ZERO_ADDRESS -from hexathon import strip_0x - -# local imports -from cic_eth.eth.tx import cache_gas_data -from cic_eth.db.models.otx import Otx -from cic_eth.db.models.otx import OtxSync -from cic_eth.db.models.tx import TxCache -from cic_eth.db.models.lock import Lock -from cic_eth.db.models.base import SessionBase -from cic_eth.db.enum import ( - StatusEnum, - LockEnum, - StatusBits, - is_alive, - is_error_status, - status_str, - ) -from cic_eth.queue.tx import create as queue_create -from cic_eth.queue.tx import set_final_status -from cic_eth.queue.tx import set_sent_status -from cic_eth.queue.tx import set_waitforgas -from cic_eth.queue.tx import set_ready -from cic_eth.queue.tx import get_paused_txs -from cic_eth.queue.tx import get_upcoming_tx -from cic_eth.queue.tx import get_account_tx -from cic_eth.queue.tx import get_tx -from cic_eth.db.error import TxStateChangeError -from cic_eth.queue.tx import register_tx - -# test imports -from tests.util.nonce import StaticNonceOracle - -logg = logging.getLogger() - - -def test_finalize( - default_chain_spec, - eth_rpc, - eth_signer, - init_database, - agent_roles, - ): - - rpc = RPCConnection.connect(default_chain_spec, 'default') - nonce_oracle = StaticNonceOracle(0) - gas_oracle = RPCGasOracle(eth_rpc) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - - txs_rpc = [ - c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)), - ] - - nonce_oracle = StaticNonceOracle(1) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6))) - - tx_hashes = [] - i = 0 - for entry in txs_rpc: - tx_hash_hex = entry[0] - tx_rpc = entry[1] - tx_signed_raw_hex = tx_rpc['params'][0] - - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - tx_hashes.append(tx_hash_hex) - - if i < 3: - set_sent_status(tx_hash_hex) - - i += 1 - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first() - assert otx.status & StatusBits.OBSOLETE - assert not is_alive(otx.status) - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first() - assert otx.status & StatusBits.OBSOLETE - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first() - assert otx.status & StatusBits.OBSOLETE - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first() - assert otx.status == StatusEnum.PENDING - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[4]).first() - assert otx.status == StatusEnum.PENDING - - set_sent_status(tx_hashes[3], False) - set_sent_status(tx_hashes[4], False) - set_final_status(tx_hashes[3], 1024) - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first() - assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL) - assert not is_alive(otx.status) - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first() - assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL) - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first() - assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL) - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first() - assert otx.status & (StatusBits.IN_NETWORK | StatusBits.FINAL) - assert not is_error_status(otx.status) - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[4]).first() - assert otx.status & (StatusBits.IN_NETWORK | StatusBits.FINAL) - assert not is_error_status(otx.status) - - -def test_expired( - default_chain_spec, - init_database, - eth_rpc, - eth_signer, - agent_roles, - ): - - rpc = RPCConnection.connect(default_chain_spec, 'default') - nonce_oracle = StaticNonceOracle(42) - gas_oracle = RPCGasOracle(eth_rpc) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - - txs_rpc = [ - c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)), - ] - - nonce_oracle = StaticNonceOracle(43) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - txs_rpc += [ - c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)), - ] - - nonce_oracle = StaticNonceOracle(44) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6))) - - tx_hashes = [] - - i = 0 - for entry in txs_rpc: - tx_hash_hex = entry[0] - tx_rpc = entry[1] - tx_signed_raw_hex = tx_rpc['params'][0] - - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - tx_hashes.append(tx_hash_hex) - - set_sent_status(tx_hash_hex, False) - - otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first() - fake_created = datetime.datetime.utcnow() - datetime.timedelta(seconds=40*i) - otx.date_created = fake_created - init_database.add(otx) - init_database.commit() - init_database.refresh(otx) - - i += 1 - - now = datetime.datetime.utcnow() - delta = datetime.timedelta(seconds=61) - then = now - delta - - otxs = OtxSync.get_expired(then) - nonce_acc = 0 - for otx in otxs: - nonce_acc += otx.nonce - - assert nonce_acc == (43 + 44) - - -def test_get_paused( - init_database, - default_chain_spec, - eth_rpc, - eth_signer, - agent_roles, - ): - - chain_id = default_chain_spec.chain_id() - rpc = RPCConnection.connect(default_chain_spec, 'default') - nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 42) - gas_oracle = RPCGasOracle(eth_rpc) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - - txs_rpc = [ - c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)), - ] - - tx_hashes = [] - for entry in txs_rpc: - tx_hash_hex = entry[0] - tx_rpc = entry[1] - tx_signed_raw_hex = tx_rpc['params'][0] - - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - tx_hashes.append(tx_hash_hex) - - txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) - assert len(txs.keys()) == 0 - - q = init_database.query(Otx) - q = q.filter(Otx.tx_hash==tx_hashes[0]) - r = q.first() - r.waitforgas(session=init_database) - init_database.add(r) - init_database.commit() - - chain_id = default_chain_spec.chain_id() - txs = get_paused_txs(chain_id=chain_id) - assert len(txs.keys()) == 1 - - txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0]) - assert len(txs.keys()) == 1 - - txs = get_paused_txs(status=StatusBits.GAS_ISSUES) - assert len(txs.keys()) == 1 - - txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) - assert len(txs.keys()) == 1 - - - q = init_database.query(Otx) - q = q.filter(Otx.tx_hash==tx_hashes[1]) - o = q.first() - o.waitforgas(session=init_database) - init_database.add(o) - init_database.commit() - - txs = get_paused_txs() - assert len(txs.keys()) == 2 - - txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0]) - assert len(txs.keys()) == 2 - - txs = get_paused_txs(status=StatusBits.GAS_ISSUES, chain_id=chain_id) - assert len(txs.keys()) == 2 - - txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) # init_w3.eth.accounts[0]) - assert len(txs.keys()) == 2 - - q = init_database.query(Otx) - q = q.filter(Otx.tx_hash==tx_hashes[1]) - o = q.first() - o.sendfail(session=init_database) - init_database.add(o) - init_database.commit() - - txs = get_paused_txs() - assert len(txs.keys()) == 2 - - txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0]) - assert len(txs.keys()) == 2 - - txs = get_paused_txs(status=StatusBits.GAS_ISSUES, chain_id=chain_id) - txs = get_paused_txs(status=StatusEnum.WAITFORGAS, chain_id=chain_id) - assert len(txs.keys()) == 1 - - txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) # init_w3.eth.accounts[0]) - assert len(txs.keys()) == 1 - - -def test_get_upcoming( - default_chain_spec, - eth_rpc, - eth_signer, - init_database, - agent_roles, - ): - - chain_id = default_chain_spec.chain_id() - rpc = RPCConnection.connect(default_chain_spec, 'default') - nonce_oracle = StaticNonceOracle(42) - gas_oracle = RPCGasOracle(eth_rpc) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - - txs_rpc = [ - c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)), - c.create(agent_roles['BOB'], agent_roles['DAVE'], 200 * (10 ** 6)), - c.create(agent_roles['CAROL'], agent_roles['DAVE'], 300 * (10 ** 6)), - ] - - nonce_oracle = StaticNonceOracle(43) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - txs_rpc += [ - c.create(agent_roles['ALICE'], agent_roles['DAVE'], 400 * (10 ** 6)), - c.create(agent_roles['BOB'], agent_roles['DAVE'], 500 * (10 ** 6)), - c.create(agent_roles['CAROL'], agent_roles['DAVE'], 600 * (10 ** 6)), - ] - - nonce_oracle = StaticNonceOracle(44) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - txs_rpc += [ - c.create(agent_roles['ALICE'], agent_roles['DAVE'], 700 * (10 ** 6)), - ] - - tx_hashes = [] - for entry in txs_rpc: - tx_hash_hex = entry[0] - tx_rpc = entry[1] - tx_signed_raw_hex = tx_rpc['params'][0] - - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - tx_hashes.append(tx_hash_hex) - - set_ready(tx_hash_hex) - - txs = get_upcoming_tx(StatusBits.QUEUED, chain_id=chain_id) - assert len(txs.keys()) == 3 - - tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[0]])), chain_id) - assert tx['nonce'] == 42 - - tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[1]])), chain_id) - assert tx['nonce'] == 42 - - tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[2]])), chain_id) - assert tx['nonce'] == 42 - - q = init_database.query(TxCache) - q = q.filter(TxCache.sender==agent_roles['ALICE']) - for o in q.all(): - o.date_checked -= datetime.timedelta(seconds=30) - init_database.add(o) - init_database.commit() - - before = datetime.datetime.now() - datetime.timedelta(seconds=20) - logg.debug('before {}'.format(before)) - txs = get_upcoming_tx(StatusBits.QUEUED, before=before) - logg.debug('txs {} {}'.format(txs.keys(), txs.values())) - assert len(txs.keys()) == 1 - - # Now date checked has been set to current time, and the check returns no results - txs = get_upcoming_tx(StatusBits.QUEUED, before=before) - logg.debug('txs {} {}'.format(txs.keys(), txs.values())) - assert len(txs.keys()) == 0 - - set_sent_status(tx_hashes[0]) - - txs = get_upcoming_tx(StatusBits.QUEUED) - assert len(txs.keys()) == 3 - with pytest.raises(KeyError): - tx = txs[tx_hashes[0]] - - tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[3]])), chain_id) - assert tx['nonce'] == 43 - - set_waitforgas(tx_hashes[1]) - txs = get_upcoming_tx(StatusBits.QUEUED) - assert len(txs.keys()) == 3 - with pytest.raises(KeyError): - tx = txs[tx_hashes[1]] - - tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[3]])), chain_id) - assert tx['nonce'] == 43 - - txs = get_upcoming_tx(StatusBits.GAS_ISSUES) - assert len(txs.keys()) == 1 - - -def test_upcoming_with_lock( - default_chain_spec, - init_database, - eth_rpc, - eth_signer, - agent_roles, - ): - - chain_id = int(default_chain_spec.chain_id()) - - rpc = RPCConnection.connect(default_chain_spec, 'default') - nonce_oracle = StaticNonceOracle(42) - gas_oracle = RPCGasOracle(eth_rpc) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - - (tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)) - tx_signed_raw_hex = tx_rpc['params'][0] - - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) - assert len(txs.keys()) == 1 - - Lock.set(str(default_chain_spec), LockEnum.SEND, address=agent_roles['ALICE']) - - txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) - assert len(txs.keys()) == 0 - - (tx_hash_hex, tx_rpc) = c.create(agent_roles['BOB'], agent_roles['ALICE'], 100 * (10 ** 6)) - tx_signed_raw_hex = tx_rpc['params'][0] - - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) - assert len(txs.keys()) == 1 - - -def test_obsoletion( - default_chain_spec, - init_database, - eth_rpc, - eth_signer, - agent_roles, - ): - - chain_id = default_chain_spec.chain_id() - rpc = RPCConnection.connect(default_chain_spec, 'default') - nonce_oracle = StaticNonceOracle(42) - gas_oracle = RPCGasOracle(eth_rpc) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - - txs_rpc = [ - c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['DAVE'], 200 * (10 ** 6)), - c.create(agent_roles['BOB'], agent_roles['DAVE'], 300 * (10 ** 6)), - ] - - nonce_oracle = StaticNonceOracle(43) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - txs_rpc += [ - c.create(agent_roles['BOB'], agent_roles['DAVE'], 400 * (10 ** 6)), - ] - - tx_hashes = [] - i = 0 - for entry in txs_rpc: - tx_hash_hex = entry[0] - tx_rpc = entry[1] - tx_signed_raw_hex = tx_rpc['params'][0] - - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - tx_hashes.append(tx_hash_hex) - - if i < 2: - set_sent_status(tx_hash_hex) - - i += 1 - - session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.status.op('&')(StatusEnum.OBSOLETED.value)==StatusEnum.OBSOLETED.value) - z = 0 - for o in q.all(): - z += o.nonce - - session.close() - assert z == 42 - - set_final_status(tx_hashes[1], 1023, True) - - session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.OBSOLETED.value) - zo = 0 - for o in q.all(): - zo += o.nonce - - q = session.query(Otx) - q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.CANCELLED.value) - zc = 0 - for o in q.all(): - zc += o.nonce - - session.close() - assert zo == 0 - assert zc == 42 - - -def test_retry( - init_database, - ): - - address = '0x' + os.urandom(20).hex() - tx_hash = '0x' + os.urandom(32).hex() - signed_tx = '0x' + os.urandom(128).hex() - otx = Otx(0, address, tx_hash, signed_tx) - init_database.add(otx) - init_database.commit() - - set_sent_status(tx_hash, True) - set_ready(tx_hash) - - q = init_database.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - otx = q.first() - - assert (otx.status & StatusEnum.RETRY.value) == StatusEnum.RETRY.value - assert is_error_status(otx.status) - - set_sent_status(tx_hash, False) - set_ready(tx_hash) - - init_database.commit() - - q = init_database.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - otx = q.first() - - assert (otx.status & StatusEnum.RETRY.value) == StatusBits.QUEUED.value - assert not is_error_status(otx.status) - - -def test_get_account_tx( - default_chain_spec, - init_database, - eth_rpc, - eth_signer, - agent_roles, - ): - - chain_id = default_chain_spec.chain_id() - rpc = RPCConnection.connect(default_chain_spec, 'default') - nonce_oracle = OverrideNonceOracle(ZERO_ADDRESS, 42) - gas_oracle = RPCGasOracle(eth_rpc) - c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) - - txs_rpc = [ - c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['CAROL'], 200 * (10 ** 6)), - c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)), - c.create(agent_roles['BOB'], agent_roles['ALICE'], 300 * (10 ** 6)), - ] - - tx_hashes = [] - for entry in txs_rpc: - tx_hash_hex = entry[0] - tx_rpc = entry[1] - tx_signed_raw_hex = tx_rpc['params'][0] - - register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) - cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) - - tx_hashes.append(tx_hash_hex) - - txs = get_account_tx(agent_roles['ALICE']) - logg.debug('tx {} tx {}'.format(list(txs.keys()), tx_hashes)) - assert list(txs.keys()) == tx_hashes - - txs = get_account_tx(agent_roles['ALICE'], as_recipient=False) - assert list(txs.keys()) == tx_hashes[:3] - - txs = get_account_tx(agent_roles['ALICE'], as_sender=False) - assert list(txs.keys()) == tx_hashes[3:]