WIP Expunge obsolete db migrations, bring in chainqueue

This commit is contained in:
nolash 2021-04-02 18:45:51 +02:00
parent 1a97f1e97d
commit 68e02ba5b8
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
59 changed files with 383 additions and 2677 deletions

View File

@ -92,7 +92,7 @@ class AdminApi:
def get_lock(self): def get_lock(self):
s_lock = celery.signature( s_lock = celery.signature(
'cic_eth.queue.tx.get_lock', 'cic_eth.queue.lock.get_lock',
[], [],
queue=self.queue, queue=self.queue,
) )
@ -165,7 +165,7 @@ class AdminApi:
) )
s_manual = celery.signature( s_manual = celery.signature(
'cic_eth.queue.tx.set_manual', 'cic_eth.queue.state.set_manual',
[ [
tx_hash_hex, tx_hash_hex,
], ],

View File

@ -0,0 +1,29 @@
"""Add chainqueue
Revision ID: 0ec0d6d1e785
Revises:
Create Date: 2021-04-02 18:30:55.398388
"""
from alembic import op
import sqlalchemy as sa
from chainqueue.db.migrations.sqlalchemy import (
chainqueue_upgrade,
chainqueue_downgrade,
)
# revision identifiers, used by Alembic.
revision = '0ec0d6d1e785'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
chainqueue_upgrade(0, 0, 1)
def downgrade():
chainqueue_downgrade(0, 0, 1)

View File

@ -0,0 +1,29 @@
"""Roles
Revision ID: 1f1b3b641d08
Revises: 9c420530eeb2
Create Date: 2021-04-02 18:40:27.787631
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1f1b3b641d08'
down_revision = '9c420530eeb2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
def downgrade():
op.drop_table('account_role')

View File

@ -1,35 +0,0 @@
"""Add new syncer table
Revision ID: 2a07b543335e
Revises: a2e2aab8f331
Create Date: 2020-12-27 09:35:44.017981
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a07b543335e'
down_revision = 'a2e2aab8f331'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'blockchain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
def downgrade():
op.drop_table('blockchain_sync')

View File

@ -1,31 +0,0 @@
"""Nonce reservation
Revision ID: 3b693afd526a
Revises: f738d9962fdf
Create Date: 2021-03-05 07:09:50.898728
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3b693afd526a'
down_revision = 'f738d9962fdf'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('key', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
def downgrade():
op.drop_table('nonce_task_reservation')

View File

@ -1,29 +0,0 @@
"""Add nonce index
Revision ID: 49b348246d70
Revises: 52c7c59cd0b1
Create Date: 2020-12-19 09:45:36.186446
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49b348246d70'
down_revision = '52c7c59cd0b1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('nonce')

View File

@ -1,31 +0,0 @@
"""Add account roles
Revision ID: 52c7c59cd0b1
Revises: 9c4bd7491015
Create Date: 2020-12-19 07:21:38.249237
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '52c7c59cd0b1'
down_revision = '9c4bd7491015'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
pass
def downgrade():
op.drop_table('account_role')
pass

View File

@ -1,8 +1,8 @@
"""debug output """DEbug
Revision ID: f738d9962fdf Revision ID: 5ca4b77ce205
Revises: ec40ac0974c1 Revises: 75d4767b3031
Create Date: 2021-03-04 08:32:43.281214 Create Date: 2021-04-02 18:42:12.257244
""" """
from alembic import op from alembic import op
@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = 'f738d9962fdf' revision = '5ca4b77ce205'
down_revision = 'ec40ac0974c1' down_revision = '75d4767b3031'
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -24,9 +24,7 @@ def upgrade():
sa.Column('description', sa.String, nullable=False), sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False), sa.Column('date_created', sa.DateTime, nullable=False),
) )
pass
def downgrade(): def downgrade():
op.drop_table('debug') op.drop_table('debug')
pass

View File

@ -1,30 +0,0 @@
"""Add otx state log
Revision ID: 6ac7a1dadc46
Revises: 89e1e9baa53c
Create Date: 2021-01-30 13:59:49.022373
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ac7a1dadc46'
down_revision = '89e1e9baa53c'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_state_log',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
sa.Column('status', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('otx_state_log')

View File

@ -1,31 +0,0 @@
"""Add attempts and version log for otx
Revision ID: 71708e943dbd
Revises: 7e8d7626e38f
Create Date: 2020-09-26 14:41:19.298651
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '71708e943dbd'
down_revision = '7e8d7626e38f'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_attempts',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('otx_attempts')
pass

View File

@ -1,8 +1,8 @@
"""Add account lock """Lock
Revision ID: 89e1e9baa53c Revision ID: 75d4767b3031
Revises: 2a07b543335e Revises: 1f1b3b641d08
Create Date: 2021-01-27 19:57:36.793882 Create Date: 2021-04-02 18:41:20.864265
""" """
from alembic import op from alembic import op
@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = '89e1e9baa53c' revision = '75d4767b3031'
down_revision = '2a07b543335e' down_revision = '1f1b3b641d08'
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -28,6 +28,7 @@ def upgrade():
) )
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True) op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
def downgrade(): def downgrade():
op.drop_index('idx_chain_address') op.drop_index('idx_chain_address')
op.drop_table('lock') op.drop_table('lock')

View File

@ -1,31 +0,0 @@
"""add blocknumber pointer
Revision ID: 7cb65b893934
Revises: 8593fa1ca0f4
Create Date: 2020-09-24 19:29:13.543648
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7cb65b893934'
down_revision = '8593fa1ca0f4'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass
def downgrade():
op.drop_table('watcher_state')
pass

View File

@ -1,45 +0,0 @@
"""Add block sync
Revision ID: 7e8d7626e38f
Revises: cd2052be6db2
Create Date: 2020-09-26 11:12:27.818524
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7e8d7626e38f'
down_revision = 'cd2052be6db2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'block_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False, unique=True),
sa.Column('block_height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('block_height_session', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_session', sa.Integer, nullable=False, default=0),
sa.Column('block_height_head', sa.Integer, nullable=False, default=0),
sa.Column('tx_height_head', sa.Integer, nullable=False, default=0),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.drop_table('watcher_state')
pass
def downgrade():
op.drop_table('block_sync')
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass

View File

@ -1,35 +0,0 @@
"""Add transaction queue
Revision ID: 8593fa1ca0f4
Revises:
Create Date: 2020-09-22 21:56:42.117047
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '8593fa1ca0f4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('signed_tx', sa.Text, nullable=False),
sa.Column('status', sa.Integer, nullable=False, default=-9),
sa.Column('block', sa.Integer),
)
op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True)
def downgrade():
op.drop_index('idx_otx_tx')
op.drop_table('otx')

View File

@ -1,8 +1,8 @@
"""Nonce reservation """Nonce
Revision ID: 3b693afd526a Revision ID: 9c420530eeb2
Revises: f738d9962fdf Revises: b125cbf81e32
Create Date: 2021-03-05 07:09:50.898728 Create Date: 2021-04-02 18:38:56.459334
""" """
from alembic import op from alembic import op
@ -10,15 +10,22 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = '3b693afd526a' revision = '9c420530eeb2'
down_revision = 'f738d9962fdf' down_revision = 'b125cbf81e32'
branch_labels = None branch_labels = None
depends_on = None depends_on = None
def upgrade(): def upgrade():
op.create_table( op.create_table(
'nonce_task_reservation', 'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
op.create_table(
'nonce_task_reservation',
sa.Column('id', sa.Integer, primary_key=True), sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False), sa.Column('address_hex', sa.String(42), nullable=False),
sa.Column('nonce', sa.Integer, nullable=False), sa.Column('nonce', sa.Integer, nullable=False),
@ -29,3 +36,4 @@ def upgrade():
def downgrade(): def downgrade():
op.drop_table('nonce_task_reservation') op.drop_table('nonce_task_reservation')
op.drop_table('nonce')

View File

@ -1,26 +0,0 @@
"""Rename block sync table
Revision ID: 9c4bd7491015
Revises: 9daa16518a91
Create Date: 2020-10-15 23:45:56.306898
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c4bd7491015'
down_revision = '9daa16518a91'
branch_labels = None
depends_on = None
def upgrade():
op.rename_table('block_sync', 'otx_sync')
pass
def downgrade():
op.rename_table('otx_sync', 'block_sync')
pass

View File

@ -1,30 +0,0 @@
"""add tx sync state
Revision ID: 9daa16518a91
Revises: e3b5330ee71c
Create Date: 2020-10-10 14:43:18.699276
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9daa16518a91'
down_revision = 'e3b5330ee71c'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx_sync',
# sa.Column('tx', sa.String(66), nullable=False),
# )
# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')")
pass
def downgrade():
# op.drop_table('tx_sync')
pass

View File

@ -1,34 +0,0 @@
"""Add date accessed to txcache
Revision ID: a2e2aab8f331
Revises: 49b348246d70
Create Date: 2020-12-24 18:58:06.137812
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a2e2aab8f331'
down_revision = '49b348246d70'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'tx_cache',
sa.Column(
'date_checked',
sa.DateTime,
nullable=False
)
)
pass
def downgrade():
# drop does not work withs qlite
#op.drop_column('tx_cache', 'date_checked')
pass

View File

@ -1,8 +1,8 @@
"""convert tx index """Convert
Revision ID: cd2052be6db2 Revision ID: aee12aeb47ec
Revises: 7cb65b893934 Revises: 5ca4b77ce205
Create Date: 2020-09-24 21:20:51.580500 Create Date: 2021-04-02 18:42:45.233356
""" """
from alembic import op from alembic import op
@ -10,8 +10,8 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = 'cd2052be6db2' revision = 'aee12aeb47ec'
down_revision = '7cb65b893934' down_revision = '5ca4b77ce205'
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -20,10 +20,8 @@ def upgrade():
op.create_table( op.create_table(
'tx_convert_transfer', 'tx_convert_transfer',
sa.Column('id', sa.Integer, primary_key=True), sa.Column('id', sa.Integer, primary_key=True),
#sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True), sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('transfer_tx_hash', sa.String(66), unique=True), sa.Column('transfer_tx_hash', sa.String(66), unique=True),
# sa.Column('holder_address', sa.String(42), nullable=False),
sa.Column('recipient_address', sa.String(42), nullable=False), sa.Column('recipient_address', sa.String(42), nullable=False),
) )
op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address']) op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address'])

View File

@ -1,12 +1,13 @@
"""Add chain syncer """Add chain syncer
Revision ID: ec40ac0974c1 Revision ID: b125cbf81e32
Revises: 6ac7a1dadc46 Revises: 0ec0d6d1e785
Create Date: 2021-02-23 06:10:19.246304 Create Date: 2021-04-02 18:36:44.459603
""" """
from alembic import op from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import ( from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade, chainsyncer_upgrade,
chainsyncer_downgrade, chainsyncer_downgrade,
@ -14,15 +15,15 @@ from chainsyncer.db.migrations.sqlalchemy import (
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = 'ec40ac0974c1' revision = 'b125cbf81e32'
down_revision = '6ac7a1dadc46' down_revision = '0ec0d6d1e785'
branch_labels = None branch_labels = None
depends_on = None depends_on = None
def upgrade(): def upgrade():
chainsyncer_upgrade(0, 0, 1) chainsyncer_upgrade(0, 0, 1)
def downgrade(): def downgrade():
chainsyncer_downgrade(0, 0, 1) chainsyncer_downgrade(0, 0, 1)

View File

@ -1,31 +0,0 @@
"""Add tx tracker record
Revision ID: df19f4e69676
Revises: 71708e943dbd
Create Date: 2020-10-09 23:31:44.563498
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'df19f4e69676'
down_revision = '71708e943dbd'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx',
# sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('date_added', sa.DateTime, nullable=False),
# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True),
# sa.Column('success', sa.Boolean(), nullable=False),
# )
pass
def downgrade():
# op.drop_table('tx')
pass

View File

@ -1,38 +0,0 @@
"""Add cached values for tx
Revision ID: e3b5330ee71c
Revises: df19f4e69676
Create Date: 2020-10-10 00:17:07.094893
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e3b5330ee71c'
down_revision = 'df19f4e69676'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('tx_id', sa.Integer, sa.ForeignKey('tx.id'), nullable=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
sa.Column('source_token_address', sa.String(42), nullable=False),
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)
def downgrade():
op.drop_table('tx_cache')
pass

View File

@ -1,85 +0,0 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = .
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat migrations/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
#sqlalchemy.url = driver://user:pass@localhost/dbname
sqlalchemy.url = postgresql+psycopg2://postgres@localhost:5432/cic-eth
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -1,77 +0,0 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -1,24 +0,0 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -1,35 +0,0 @@
"""Add new syncer table
Revision ID: 2a07b543335e
Revises: a2e2aab8f331
Create Date: 2020-12-27 09:35:44.017981
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a07b543335e'
down_revision = 'a2e2aab8f331'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'blockchain_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False),
sa.Column('block_start', sa.Integer, nullable=False, default=0),
sa.Column('tx_start', sa.Integer, nullable=False, default=0),
sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
sa.Column('block_target', sa.Integer, nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
def downgrade():
op.drop_table('blockchain_sync')

View File

@ -1,29 +0,0 @@
"""Add nonce index
Revision ID: 49b348246d70
Revises: 52c7c59cd0b1
Create Date: 2020-12-19 09:45:36.186446
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '49b348246d70'
down_revision = '52c7c59cd0b1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'nonce',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('address_hex', sa.String(42), nullable=False, unique=True),
sa.Column('nonce', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('nonce')

View File

@ -1,31 +0,0 @@
"""Add account roles
Revision ID: 52c7c59cd0b1
Revises: 9c4bd7491015
Create Date: 2020-12-19 07:21:38.249237
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '52c7c59cd0b1'
down_revision = '9c4bd7491015'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'account_role',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.Text, nullable=False, unique=True),
sa.Column('address_hex', sa.String(42), nullable=False),
)
pass
def downgrade():
op.drop_table('account_role')
pass

View File

@ -1,30 +0,0 @@
"""Add otx state log
Revision ID: 6ac7a1dadc46
Revises: 89e1e9baa53c
Create Date: 2021-01-30 13:59:49.022373
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6ac7a1dadc46'
down_revision = '89e1e9baa53c'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_state_log',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
sa.Column('status', sa.Integer, nullable=False),
)
def downgrade():
op.drop_table('otx_state_log')

View File

@ -1,31 +0,0 @@
"""Add attempts and version log for otx
Revision ID: 71708e943dbd
Revises: 7e8d7626e38f
Create Date: 2020-09-26 14:41:19.298651
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '71708e943dbd'
down_revision = '7e8d7626e38f'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx_attempts',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False),
sa.Column('date', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('otx_attempts')
pass

View File

@ -1,31 +0,0 @@
"""add blocknumber pointer
Revision ID: 7cb65b893934
Revises: 8593fa1ca0f4
Create Date: 2020-09-24 19:29:13.543648
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7cb65b893934'
down_revision = '8593fa1ca0f4'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass
def downgrade():
op.drop_table('watcher_state')
pass

View File

@ -1,42 +0,0 @@
"""Add block sync
Revision ID: 7e8d7626e38f
Revises: cd2052be6db2
Create Date: 2020-09-26 11:12:27.818524
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '7e8d7626e38f'
down_revision = 'cd2052be6db2'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'block_sync',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('blockchain', sa.String, nullable=False, unique=True),
sa.Column('height_backlog', sa.Integer, nullable=False, default=0),
sa.Column('height_session', sa.Integer, nullable=False, default=0),
sa.Column('height_head', sa.Integer, nullable=False, default=0),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime),
)
op.drop_table('watcher_state')
pass
def downgrade():
op.drop_table('block_sync')
op.create_table(
'watcher_state',
sa.Column('block_number', sa.Integer)
)
conn = op.get_bind()
conn.execute('INSERT INTO watcher_state (block_number) VALUES (0);')
pass

View File

@ -1,35 +0,0 @@
"""Add transaction queue
Revision ID: 8593fa1ca0f4
Revises:
Create Date: 2020-09-22 21:56:42.117047
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '8593fa1ca0f4'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'otx',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('nonce', sa.Integer, nullable=False),
sa.Column('tx_hash', sa.String(66), nullable=False),
sa.Column('signed_tx', sa.Text, nullable=False),
sa.Column('status', sa.Integer, nullable=False, default=-9),
sa.Column('block', sa.Integer),
)
op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True)
def downgrade():
op.drop_index('idx_otx_tx')
op.drop_table('otx')

View File

@ -1,33 +0,0 @@
"""Add account lock
Revision ID: 89e1e9baa53c
Revises: 2a07b543335e
Create Date: 2021-01-27 19:57:36.793882
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '89e1e9baa53c'
down_revision = '2a07b543335e'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'lock',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column("address", sa.String(42), nullable=True),
sa.Column('blockchain', sa.String),
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
sa.Column("date_created", sa.DateTime, nullable=False),
sa.Column("otx_id", sa.Integer, nullable=True),
)
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
def downgrade():
op.drop_index('idx_chain_address')
op.drop_table('lock')

View File

@ -1,26 +0,0 @@
"""Rename block sync table
Revision ID: 9c4bd7491015
Revises: 9daa16518a91
Create Date: 2020-10-15 23:45:56.306898
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9c4bd7491015'
down_revision = '9daa16518a91'
branch_labels = None
depends_on = None
def upgrade():
op.rename_table('block_sync', 'otx_sync')
pass
def downgrade():
op.rename_table('otx_sync', 'block_sync')
pass

View File

@ -1,30 +0,0 @@
"""add tx sync state
Revision ID: 9daa16518a91
Revises: e3b5330ee71c
Create Date: 2020-10-10 14:43:18.699276
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9daa16518a91'
down_revision = 'e3b5330ee71c'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx_sync',
# sa.Column('tx', sa.String(66), nullable=False),
# )
# op.execute("INSERT INTO tx_sync VALUES('0x0000000000000000000000000000000000000000000000000000000000000000')")
pass
def downgrade():
# op.drop_table('tx_sync')
pass

View File

@ -1,33 +0,0 @@
"""Add date accessed to txcache
Revision ID: a2e2aab8f331
Revises: 49b348246d70
Create Date: 2020-12-24 18:58:06.137812
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a2e2aab8f331'
down_revision = '49b348246d70'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'tx_cache',
sa.Column(
'date_checked',
sa.DateTime,
nullable=False
)
)
pass
def downgrade():
op.drop_column('tx_cache', 'date_checked')
pass

View File

@ -1,34 +0,0 @@
"""convert tx index
Revision ID: cd2052be6db2
Revises: 7cb65b893934
Create Date: 2020-09-24 21:20:51.580500
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'cd2052be6db2'
down_revision = '7cb65b893934'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_convert_transfer',
sa.Column('id', sa.Integer, primary_key=True),
#sa.Column('approve_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('convert_tx_hash', sa.String(66), nullable=False, unique=True),
sa.Column('transfer_tx_hash', sa.String(66), unique=True),
# sa.Column('holder_address', sa.String(42), nullable=False),
sa.Column('recipient_address', sa.String(42), nullable=False),
)
op.create_index('idx_tx_convert_address', 'tx_convert_transfer', ['recipient_address'])
def downgrade():
op.drop_index('idx_tx_convert_address')
op.drop_table('tx_convert_transfer')

View File

@ -1,31 +0,0 @@
"""Add tx tracker record
Revision ID: df19f4e69676
Revises: 71708e943dbd
Create Date: 2020-10-09 23:31:44.563498
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'df19f4e69676'
down_revision = '71708e943dbd'
branch_labels = None
depends_on = None
def upgrade():
# op.create_table(
# 'tx',
# sa.Column('id', sa.Integer, primary_key=True),
# sa.Column('date_added', sa.DateTime, nullable=False),
# sa.Column('tx_hash', sa.String(66), nullable=False, unique=True),
# sa.Column('success', sa.Boolean(), nullable=False),
# )
pass
def downgrade():
# op.drop_table('tx')
pass

View File

@ -1,37 +0,0 @@
"""Add cached values for tx
Revision ID: e3b5330ee71c
Revises: df19f4e69676
Create Date: 2020-10-10 00:17:07.094893
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e3b5330ee71c'
down_revision = 'df19f4e69676'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'tx_cache',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
sa.Column('date_created', sa.DateTime, nullable=False),
sa.Column('date_updated', sa.DateTime, nullable=False),
sa.Column('source_token_address', sa.String(42), nullable=False),
sa.Column('destination_token_address', sa.String(42), nullable=False),
sa.Column('sender', sa.String(42), nullable=False),
sa.Column('recipient', sa.String(42), nullable=False),
sa.Column('from_value', sa.NUMERIC(), nullable=False),
sa.Column('to_value', sa.NUMERIC(), nullable=True),
sa.Column('block_number', sa.BIGINT(), nullable=True),
sa.Column('tx_index', sa.Integer, nullable=True),
)
def downgrade():
op.drop_table('tx_cache')
pass

View File

@ -1,28 +0,0 @@
"""Add chain syncer
Revision ID: ec40ac0974c1
Revises: 6ac7a1dadc46
Create Date: 2021-02-23 06:10:19.246304
"""
from alembic import op
import sqlalchemy as sa
from chainsyncer.db.migrations.sqlalchemy import (
chainsyncer_upgrade,
chainsyncer_downgrade,
)
# revision identifiers, used by Alembic.
revision = 'ec40ac0974c1'
down_revision = '6ac7a1dadc46'
branch_labels = None
depends_on = None
def upgrade():
chainsyncer_upgrade(0, 0, 1)
def downgrade():
chainsyncer_downgrade(0, 0, 1)

View File

@ -1,32 +0,0 @@
"""debug output
Revision ID: f738d9962fdf
Revises: ec40ac0974c1
Create Date: 2021-03-04 08:32:43.281214
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f738d9962fdf'
down_revision = 'ec40ac0974c1'
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'debug',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('tag', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False),
sa.Column('date_created', sa.DateTime, nullable=False),
)
pass
def downgrade():
op.drop_table('debug')
pass

View File

@ -222,7 +222,7 @@ def convert_with_default_reserve(self, tokens, from_address, source_amount, mini
# ) # )
# #
# s_set_sent = celery.signature( # s_set_sent = celery.signature(
# 'cic_eth.queue.tx.set_sent_status', # 'cic_eth.queue.state.set_sent_status',
# [False], # [False],
# ) # )
# s_send.link(s_set_sent) # s_send.link(s_set_sent)

View File

@ -162,7 +162,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
wait_tasks = [] wait_tasks = []
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
s = celery.signature( s = celery.signature(
'cic_eth.queue.tx.set_waitforgas', 'cic_eth.queue.state.set_waitforgas',
[ [
tx_hash, tx_hash,
], ],
@ -195,7 +195,7 @@ def check_gas(self, tx_hashes, chain_spec_dict, txs=[], address=None, gas_requir
ready_tasks = [] ready_tasks = []
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
s = celery.signature( s = celery.signature(
'cic_eth.queue.tx.set_ready', 'cic_eth.queue.state.set_ready',
[ [
tx_hash, tx_hash,
], ],
@ -278,7 +278,7 @@ def send(self, txs, chain_spec_dict):
r = None r = None
s_set_sent = celery.signature( s_set_sent = celery.signature(
'cic_eth.queue.tx.set_sent_status', 'cic_eth.queue.state.set_sent_status',
[ [
tx_hash_hex, tx_hash_hex,
False False
@ -366,7 +366,7 @@ def refill_gas(self, recipient_address, chain_spec_dict):
# add transaction to send queue # add transaction to send queue
s_status = celery.signature( s_status = celery.signature(
'cic_eth.queue.tx.set_ready', 'cic_eth.queue.state.set_ready',
[ [
tx_hash_hex, tx_hash_hex,
], ],
@ -525,7 +525,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success)) logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
s = celery.signature( s = celery.signature(
'cic_eth.queue.tx.set_final_status', 'cic_eth.queue.state.set_final_status',
[ [
tx_hash_hex, tx_hash_hex,
rcpt['blockNumber'], rcpt['blockNumber'],
@ -537,7 +537,7 @@ def sync_tx(self, tx_hash_hex, chain_spec_dict):
logg.debug('sync tx {} mempool'.format(tx_hash_hex)) logg.debug('sync tx {} mempool'.format(tx_hash_hex))
s = celery.signature( s = celery.signature(
'cic_eth.queue.tx.set_sent_status', 'cic_eth.queue.state.set_sent_status',
[ [
tx_hash_hex, tx_hash_hex,
], ],

View File

@ -5,15 +5,15 @@ import logging
import celery import celery
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from hexathon import strip_0x from hexathon import strip_0x
from chainqueue.db.models.otx import Otx
# local imports from chainqueue.db.models.tx import TxCache
from cic_eth.db import SessionBase from chainqueue.db.enum import (
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import (
StatusBits, StatusBits,
dead, dead,
) )
# local imports
from cic_eth.db import SessionBase
from cic_eth.task import CriticalSQLAlchemyTask from cic_eth.task import CriticalSQLAlchemyTask
celery_app = celery.current_app celery_app = celery.current_app

View File

@ -0,0 +1,46 @@
# external imports
from chainqueue.db.models.otx import Otx
import celery
# local imports
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.db import SessionBase
from cic_eth.db.models.lock import Lock
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_lock(address=None):
"""Retrieve all active locks
If address is set, the query will look up the lock for the specified address only. A list of zero or one elements is returned, depending on whether a lock is set or not.
:param address: Get lock for only the specified address
:type address: str, 0x-hex
:returns: List of locks
:rtype: list of dicts
"""
session = SessionBase.create_session()
q = session.query(
Lock.date_created,
Lock.address,
Lock.flags,
Otx.tx_hash,
)
q = q.join(Otx, isouter=True)
if address != None:
q = q.filter(Lock.address==address)
else:
q = q.order_by(Lock.date_created.asc())
locks = []
for lock in q.all():
o = {
'date': lock[0],
'address': lock[1],
'tx_hash': lock[3],
'flags': lock[2],
}
locks.append(o)
session.close()
return locks

View File

@ -0,0 +1,131 @@
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
import chainqueue.query
from sqlalchemy import func
from sqlalchemy import or_
# local imports
from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.db.models.lock import Lock
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_tx_cache(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.query.get_tx_cache(chain_spec, tx_hash)
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_tx(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.query.get_tx(chain_spec, tx_hash)
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_account_tx(chain_spec_dict, address, as_sender=True, as_recipient=True, counterpart=None):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.query.get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None)
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_upcoming_tx_nolock(chain_spec_dict, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.query.get_upcoming_tx(chain_spec, status, not_status=not_status, recipient=recipient, before=before, limit=limit, session=session, decoder=unpack):
def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None):
return chainqueue.query.get_status_tx_cache(chain_spec, status, not_status=not_status, before=before, exact=exact, limit=limit, session=session, decoder=unpack)
def get_paused_tx(chain_spec, status=None, sender=None, session=None, decoder=None):
return chainqueue.query.get_paused_tx_cache(chain_spec, status=status, sender=sender, session=session, decoder=unpack)
def get_nonce_tx(chain_spec, nonce, sender):
return get_nonce_tx_cache(chain_spec, nonce, sender, decoder=unpack):
def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None):
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
Will omit addresses that have the LockEnum.SEND bit in Lock set.
(TODO) Will not return any rows if LockEnum.SEND bit in Lock is set for zero address.
:param status: Defines the status used to filter as upcoming.
:type status: cic_eth.db.enum.StatusEnum
:param recipient: Ethereum address of recipient to return transaction for
:type recipient: str, 0x-hex
:param before: Only return transactions if their modification date is older than the given timestamp
:type before: datetime.datetime
:param chain_id: Chain id to use to parse signed transaction data
:type chain_id: number
:raises ValueError: Status is finalized, sent or never attempted sent
:returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value
"""
session = SessionBase.bind_session(session)
q_outer = session.query(
TxCache.sender,
func.min(Otx.nonce).label('nonce'),
)
q_outer = q_outer.join(TxCache)
q_outer = q_outer.join(Lock, isouter=True)
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid non-final tx value: {}'.format(status))
if status == StatusEnum.PENDING:
q_outer = q_outer.filter(Otx.status==status.value)
else:
q_outer = q_outer.filter(Otx.status.op('&')(status)==status)
if not_status != None:
q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0)
if recipient != None:
q_outer = q_outer.filter(TxCache.recipient==recipient)
q_outer = q_outer.group_by(TxCache.sender)
txs = {}
i = 0
for r in q_outer.all():
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==r.sender)
q = q.filter(Otx.nonce==r.nonce)
if before != None:
q = q.filter(TxCache.date_checked<before)
q = q.order_by(TxCache.date_created.desc())
o = q.first()
# TODO: audit; should this be possible if a row is found in the initial query? If not, at a minimum log error.
if o == None:
continue
tx_signed_bytes = bytes.fromhex(o.signed_tx[2:])
tx = unpack(tx_signed_bytes, chain_id)
txs[o.tx_hash] = o.signed_tx
q = session.query(TxCache)
q = q.filter(TxCache.otx_id==o.id)
o = q.first()
o.date_checked = datetime.datetime.now()
session.add(o)
session.commit()
i += 1
if limit > 0 and limit == i:
break
SessionBase.release_session(session)
return txs

View File

@ -0,0 +1,67 @@
# external imports
from chainlib.chain import ChainSpec
import chainqueue.state
# local imports
from cic_eth.task import CriticalSQLAlchemyTask
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_sent_status(chain_spec_dict, tx_hash, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_sent_status(chain_spec, tx_hash, fail)
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_final_status(chain_spec_dict, tx_hash, block=None, fail=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_final_status(chain_spec, tx_hash, block, fail)
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_cancel(chain_spec_dict, tx_hash, manual=False):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_cancel(chain_spec, tx_hash, manual)
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_rejected(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_rejected(chain_spec, tx_hash)
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_fubar(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_fubar(chain_spec, tx_hash)
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_manual(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_manual(chain_spec, tx_hash)
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_ready(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_ready(chain_spec, tx_hash)
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_reserved(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_reserved(chain_spec, tx_hash)
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_waitforgas(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.set_waitforgas(chain_spec, tx_hash)
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_state_log(chain_spec_dict, tx_hash):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
return chainqueue.state.get_state_log(chain_spec, tx_hash)

View File

@ -7,10 +7,10 @@ from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.eth.block import block_by_hash from chainlib.eth.block import block_by_hash
from chainlib.eth.tx import receipt from chainlib.eth.tx import receipt
from chainqueue.db.models.otx import Otx
from chainqueue.error import NotLocalTxError
# local imports # local imports
from cic_eth.db.models.otx import Otx
from cic_eth.error import NotLocalTxError
from cic_eth.task import CriticalSQLAlchemyAndWeb3Task from cic_eth.task import CriticalSQLAlchemyAndWeb3Task
celery_app = celery.current_app celery_app = celery.current_app

View File

@ -5,95 +5,37 @@ import datetime
# external imports # external imports
import celery import celery
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.otx import OtxStateLog
from chainqueue.db.models.tx import TxCache
from hexathon import strip_0x from hexathon import strip_0x
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy import not_ from sqlalchemy import not_
from sqlalchemy import tuple_ from sqlalchemy import tuple_
from sqlalchemy import func from sqlalchemy import func
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack from chainlib.eth.tx import unpack
import chainqueue.state
# local imports from chainqueue.db.enum import (
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.otx import OtxStateLog
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.lock import Lock
from cic_eth.db import SessionBase
from cic_eth.db.enum import (
StatusEnum, StatusEnum,
LockEnum,
StatusBits, StatusBits,
is_alive, is_alive,
dead, dead,
) )
from chainqueue.error import NotLocalTxError
from chainqueue.db.enum import status_str
# local imports
from cic_eth.db.models.lock import Lock
from cic_eth.db import SessionBase
from cic_eth.db.enum import LockEnum
from cic_eth.task import CriticalSQLAlchemyTask from cic_eth.task import CriticalSQLAlchemyTask
from cic_eth.error import NotLocalTxError
from cic_eth.error import LockedError from cic_eth.error import LockedError
from cic_eth.db.enum import status_str
celery_app = celery.current_app celery_app = celery.current_app
#logg = celery_app.log.get_default_logger()
logg = logging.getLogger() logg = logging.getLogger()
def create(nonce, holder_address, tx_hash, signed_tx, chain_spec, obsolete_predecessors=True, session=None):
"""Create a new transaction queue record.
:param nonce: Transaction nonce
:type nonce: int
:param holder_address: Sender address
:type holder_address: str, 0x-hex
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param signed_tx: Signed raw transaction
:type signed_tx: str, 0x-hex
:param chain_spec: Chain spec to create transaction for
:type chain_spec: ChainSpec
:returns: transaction hash
:rtype: str, 0x-hash
"""
session = SessionBase.bind_session(session)
lock = Lock.check_aggregate(str(chain_spec), LockEnum.QUEUE, holder_address, session=session)
if lock > 0:
SessionBase.release_session(session)
raise LockedError(lock)
o = Otx.add(
nonce=nonce,
address=holder_address,
tx_hash=tx_hash,
signed_tx=signed_tx,
session=session,
)
session.flush()
if obsolete_predecessors:
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(Otx.nonce==nonce)
q = q.filter(TxCache.sender==holder_address)
q = q.filter(Otx.tx_hash!=tx_hash)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL)==0)
for otx in q.all():
logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash))
try:
otx.cancel(confirmed=False, session=session)
except TxStateChangeError as e:
logg.exception('obsolete fail: {}'.format(e))
session.close()
raise(e)
except Exception as e:
logg.exception('obsolete UNEXPECTED fail: {}'.format(e))
session.close()
raise(e)
session.commit()
SessionBase.release_session(session)
logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash))
return tx_hash
def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None): def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING). """Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
@ -134,683 +76,3 @@ def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=No
queue=queue, queue=queue,
) )
s_cache.apply_async() s_cache.apply_async()
return (tx_hash_hex, tx_signed_raw_hex,)
# TODO: Replace set_* with single task for set status
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_sent_status(tx_hash, fail=False):
"""Used to set the status after a send attempt
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:param fail: if True, will set a SENDFAIL status, otherwise a SENT status. (Default: False)
:type fail: boolean
:raises NotLocalTxError: If transaction not found in queue.
:returns: True if tx is known, False otherwise
:rtype: boolean
"""
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None:
logg.warning('not local tx, skipping {}'.format(tx_hash))
session.close()
return False
try:
if fail:
o.sendfail(session=session)
else:
o.sent(session=session)
except TxStateChangeError as e:
logg.exception('set sent fail: {}'.format(e))
session.close()
raise(e)
except Exception as e:
logg.exception('set sent UNEXPECED fail: {}'.format(e))
session.close()
raise(e)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_final_status(tx_hash, block=None, fail=False):
"""Used to set the status of an incoming transaction result.
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:param block: Block number if final status represents a confirmation on the network
:type block: number
:param fail: if True, will set a SUCCESS status, otherwise a REVERTED status. (Default: False)
:type fail: boolean
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
q = session.query(
Otx.nonce.label('nonce'),
TxCache.sender.label('sender'),
Otx.id.label('otxid'),
)
q = q.join(TxCache)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
nonce = o.nonce
sender = o.sender
otxid = o.otxid
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
try:
if fail:
o.minefail(block, session=session)
else:
o.success(block, session=session)
session.commit()
except TxStateChangeError as e:
logg.exception('set final fail: {}'.format(e))
session.close()
raise(e)
except Exception as e:
logg.exception('set final UNEXPECED fail: {}'.format(e))
session.close()
raise(e)
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(Otx.nonce==nonce)
q = q.filter(TxCache.sender==sender)
q = q.filter(Otx.tx_hash!=tx_hash)
for otwo in q.all():
try:
otwo.cancel(True, session=session)
except TxStateChangeError as e:
logg.exception('cancel non-final fail: {}'.format(e))
session.close()
raise(e)
except Exception as e:
logg.exception('cancel non-final UNEXPECTED fail: {}'.format(e))
session.close()
raise(e)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_cancel(tx_hash, manual=False):
"""Used to set the status when a transaction is cancelled.
Will set the state to CANCELLED or OVERRIDDEN
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:param manual: If set, status will be OVERRIDDEN. Otherwise CANCELLED.
:type manual: boolean
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
try:
if manual:
o.override(session=session)
else:
o.cancel(session=session)
session.commit()
except TxStateChangeError as e:
logg.exception('set cancel fail: {}'.format(e))
except Exception as e:
logg.exception('set cancel UNEXPECTED fail: {}'.format(e))
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_rejected(tx_hash):
"""Used to set the status when the node rejects sending a transaction to network
Will set the state to REJECTED
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.reject(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_fubar(tx_hash):
"""Used to set the status when an unexpected error occurs.
Will set the state to FUBAR
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.fubar(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_manual(tx_hash):
"""Used to set the status when queue is manually changed
Will set the state to MANUAL
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.manual(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_ready(tx_hash):
"""Used to mark a transaction as ready to be sent to network
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
if o.status & StatusBits.GAS_ISSUES or o.status == StatusEnum.PENDING:
o.readysend(session=session)
else:
o.retry(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_dequeue(tx_hash):
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
o = q.first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.dequeue(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def set_waitforgas(tx_hash):
"""Used to set the status when a transaction must be deferred due to gas refill
Will set the state to WAITFORGAS
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.waitforgas(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_state_log(tx_hash):
logs = []
session = SessionBase.create_session()
q = session.query(OtxStateLog)
q = q.join(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
q = q.order_by(OtxStateLog.date.asc())
for l in q.all():
logs.append((l.date, l.status,))
session.close()
return logs
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_tx_cache(tx_hash):
"""Returns an aggregate dictionary of outgoing transaction data and metadata
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
:returns: Transaction data
:rtype: dict
"""
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
if otx == None:
session.close()
raise NotLocalTxError(tx_hash)
session.flush()
q = session.query(TxCache)
q = q.filter(TxCache.otx_id==otx.id)
txc = q.first()
session.close()
tx = {
'tx_hash': otx.tx_hash,
'signed_tx': otx.signed_tx,
'nonce': otx.nonce,
'status': status_str(otx.status),
'status_code': otx.status,
'source_token': txc.source_token_address,
'destination_token': txc.destination_token_address,
'block_number': txc.block_number,
'tx_index': txc.tx_index,
'sender': txc.sender,
'recipient': txc.recipient,
'from_value': int(txc.from_value),
'to_value': int(txc.to_value),
'date_created': txc.date_created,
'date_updated': txc.date_updated,
'date_checked': txc.date_checked,
}
return tx
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_lock(address=None):
"""Retrieve all active locks
If address is set, the query will look up the lock for the specified address only. A list of zero or one elements is returned, depending on whether a lock is set or not.
:param address: Get lock for only the specified address
:type address: str, 0x-hex
:returns: List of locks
:rtype: list of dicts
"""
session = SessionBase.create_session()
q = session.query(
Lock.date_created,
Lock.address,
Lock.flags,
Otx.tx_hash,
)
q = q.join(Otx, isouter=True)
if address != None:
q = q.filter(Lock.address==address)
else:
q = q.order_by(Lock.date_created.asc())
locks = []
for lock in q.all():
o = {
'date': lock[0],
'address': lock[1],
'tx_hash': lock[3],
'flags': lock[2],
}
locks.append(o)
session.close()
return locks
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_tx(tx_hash):
"""Retrieve a transaction queue record by transaction hash
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
:returns: nonce, address and signed_tx (raw signed transaction)
:rtype: dict
"""
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
tx = q.first()
if tx == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
o = {
'otx_id': tx.id,
'nonce': tx.nonce,
'signed_tx': tx.signed_tx,
'status': tx.status,
}
logg.debug('get tx {}'.format(o))
session.close()
return o
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_nonce_tx(nonce, sender, chain_id):
"""Retrieve all transactions for address with specified nonce
:param nonce: Nonce
:type nonce: number
:param address: Ethereum address
:type address: str, 0x-hex
:returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value
"""
session = SessionBase.create_session()
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==sender)
q = q.filter(Otx.nonce==nonce)
txs = {}
for r in q.all():
tx_signed_bytes = bytes.fromhex(r.signed_tx[2:])
tx = unpack(tx_signed_bytes, chain_id)
if sender == None or tx['from'] == sender:
txs[r.tx_hash] = r.signed_tx
session.close()
return txs
# TODO: pass chain spec instead of chain id
def get_paused_txs(status=None, sender=None, chain_id=0, session=None):
"""Returns not finalized transactions that have been attempted sent without success.
:param status: If set, will return transactions with this local queue status only
:type status: cic_eth.db.enum.StatusEnum
:param recipient: Recipient address to return transactions for
:type recipient: str, 0x-hex
:param chain_id: Numeric chain id to use to parse signed transaction data
:type chain_id: number
:raises ValueError: Status is finalized, sent or never attempted sent
:returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value
"""
session = SessionBase.bind_session(session)
q = session.query(Otx)
if status != None:
#if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid paused tx value: {}'.format(status))
q = q.filter(Otx.status.op('&')(status.value)==status.value)
q = q.join(TxCache)
else:
q = q.filter(Otx.status>StatusEnum.PENDING.value)
q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)>0))
if sender != None:
q = q.filter(TxCache.sender==sender)
txs = {}
for r in q.all():
tx_signed_bytes = bytes.fromhex(r.signed_tx[2:])
tx = unpack(tx_signed_bytes, chain_id)
if sender == None or tx['from'] == sender:
#gas += tx['gas'] * tx['gasPrice']
txs[r.tx_hash] = r.signed_tx
SessionBase.release_session(session)
return txs
def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, session=None):
"""Retrieve transaction with a specific queue status.
:param status: Status to match transactions with
:type status: str
:param before: If set, return only transactions older than the timestamp
:type status: datetime.dateTime
:param limit: Limit amount of returned transactions
:type limit: number
:returns: Transactions
:rtype: list of cic_eth.db.models.otx.Otx
"""
txs = {}
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.join(TxCache)
# before = datetime.datetime.utcnow()
if before != None:
q = q.filter(TxCache.date_updated<before)
if exact:
q = q.filter(Otx.status==status)
else:
q = q.filter(Otx.status.op('&')(status)>0)
if not_status != None:
q = q.filter(Otx.status.op('&')(not_status)==0)
q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc())
i = 0
for o in q.all():
if limit > 0 and i == limit:
break
txs[o.tx_hash] = o.signed_tx
i += 1
SessionBase.release_session(session)
return txs
# TODO: move query to model
def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None):
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
Will omit addresses that have the LockEnum.SEND bit in Lock set.
(TODO) Will not return any rows if LockEnum.SEND bit in Lock is set for zero address.
:param status: Defines the status used to filter as upcoming.
:type status: cic_eth.db.enum.StatusEnum
:param recipient: Ethereum address of recipient to return transaction for
:type recipient: str, 0x-hex
:param before: Only return transactions if their modification date is older than the given timestamp
:type before: datetime.datetime
:param chain_id: Chain id to use to parse signed transaction data
:type chain_id: number
:raises ValueError: Status is finalized, sent or never attempted sent
:returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value
"""
session = SessionBase.bind_session(session)
q_outer = session.query(
TxCache.sender,
func.min(Otx.nonce).label('nonce'),
)
q_outer = q_outer.join(TxCache)
q_outer = q_outer.join(Lock, isouter=True)
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if not is_alive(status):
SessionBase.release_session(session)
raise ValueError('not a valid non-final tx value: {}'.format(status))
if status == StatusEnum.PENDING:
q_outer = q_outer.filter(Otx.status==status.value)
else:
q_outer = q_outer.filter(Otx.status.op('&')(status)==status)
if not_status != None:
q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0)
if recipient != None:
q_outer = q_outer.filter(TxCache.recipient==recipient)
q_outer = q_outer.group_by(TxCache.sender)
txs = {}
i = 0
for r in q_outer.all():
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.sender==r.sender)
q = q.filter(Otx.nonce==r.nonce)
if before != None:
q = q.filter(TxCache.date_checked<before)
q = q.order_by(TxCache.date_created.desc())
o = q.first()
# TODO: audit; should this be possible if a row is found in the initial query? If not, at a minimum log error.
if o == None:
continue
tx_signed_bytes = bytes.fromhex(o.signed_tx[2:])
tx = unpack(tx_signed_bytes, chain_id)
txs[o.tx_hash] = o.signed_tx
q = session.query(TxCache)
q = q.filter(TxCache.otx_id==o.id)
o = q.first()
o.date_checked = datetime.datetime.now()
session.add(o)
session.commit()
i += 1
if limit > 0 and limit == i:
break
SessionBase.release_session(session)
return txs
@celery_app.task(base=CriticalSQLAlchemyTask)
def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None):
"""Returns all local queue transactions for a given Ethereum address
:param address: Ethereum address
:type address: str, 0x-hex
:param as_sender: If False, will omit transactions where address is sender
:type as_sender: bool
:param as_sender: If False, will omit transactions where address is recipient
:type as_sender: bool
:param counterpart: Only return transactions where this Ethereum address is the other end of the transaction (not in use)
:type counterpart: str, 0x-hex
:raises ValueError: If address is set to be neither sender nor recipient
:returns: Transactions
:rtype: dict, with transaction hash as key, signed raw transaction as value
"""
if not as_sender and not as_recipient:
raise ValueError('at least one of as_sender and as_recipient must be True')
txs = {}
session = SessionBase.create_session()
q = session.query(Otx)
q = q.join(TxCache)
if as_sender and as_recipient:
q = q.filter(or_(TxCache.sender==address, TxCache.recipient==address))
elif as_sender:
q = q.filter(TxCache.sender==address)
else:
q = q.filter(TxCache.recipient==address)
q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc())
results = q.all()
for r in results:
if txs.get(r.tx_hash) != None:
logg.debug('tx {} already recorded'.format(r.tx_hash))
continue
txs[r.tx_hash] = r.signed_tx
session.close()
return txs

View File

@ -34,7 +34,7 @@ class TxFilter(SyncFilter):
db_session.flush() db_session.flush()
SessionBase.release_session(db_session) SessionBase.release_session(db_session)
s = celery.signature( s = celery.signature(
'cic_eth.queue.tx.set_final_status', 'cic_eth.queue.state.set_final_status',
[ [
add_0x(tx_hash_hex), add_0x(tx_hash_hex),
tx.block.number, tx.block.number,

View File

@ -113,7 +113,7 @@ def sendfail_filter(w3, tx_hash, rcpt, chain_spec):
# ) # )
# s_retry_status = celery.signature( # s_retry_status = celery.signature(
# 'cic_eth.queue.tx.set_ready', # 'cic_eth.queue.state.set_ready',
# [], # [],
# queue=queue, # queue=queue,
# ) # )

View File

@ -14,6 +14,7 @@ import confini
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.eth.connection import EthUnixSignerConnection from chainlib.eth.connection import EthUnixSignerConnection
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx
# local imports # local imports
from cic_eth.eth import erc20 from cic_eth.eth import erc20
@ -21,14 +22,14 @@ from cic_eth.eth import tx
from cic_eth.eth import account from cic_eth.eth import account
from cic_eth.admin import debug from cic_eth.admin import debug
from cic_eth.admin import ctrl from cic_eth.admin import ctrl
from cic_eth.queue import tx from cic_eth.queue import state
from cic_eth.queue import query
from cic_eth.queue import balance from cic_eth.queue import balance
from cic_eth.callbacks import Callback from cic_eth.callbacks import Callback
from cic_eth.callbacks import http from cic_eth.callbacks import http
from cic_eth.callbacks import tcp from cic_eth.callbacks import tcp
from cic_eth.callbacks import redis from cic_eth.callbacks import redis
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.ext import tx from cic_eth.ext import tx
from cic_eth.registry import ( from cic_eth.registry import (

View File

@ -19,6 +19,7 @@ eth-address-index~=0.1.1a5
chainlib~=0.0.1a43 chainlib~=0.0.1a43
hexathon~=0.0.1a7 hexathon~=0.0.1a7
chainsyncer~=0.0.1a20 chainsyncer~=0.0.1a20
chainqueue~=0.0.1a1
pysha3==1.0.2 pysha3==1.0.2
coincurve==15.0.0 coincurve==15.0.0
sarafu-faucet~=0.0.2a15 sarafu-faucet~=0.0.2a15

View File

@ -2,12 +2,12 @@
import os import os
import logging import logging
# third-party imports # external imports
import pytest import pytest
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
# local imports # local imports
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.queue.balance import ( from cic_eth.queue.balance import (
balance_outgoing, balance_outgoing,
balance_incoming, balance_incoming,

View File

@ -1,76 +0,0 @@
# standard imports
import logging
# external imports
from chainlib.connection import RPCConnection
from chainlib.eth.gas import RPCGasOracle
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import Gas
# local imports
from cic_eth.queue.tx import get_status_tx
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import cache_gas_data
from cic_eth.queue.tx import register_tx
from cic_eth.db.models.otx import Otx
logg = logging.getLogger()
def test_status_tx_list(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
(tx_hash_hex, o) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024)
r = rpc.do(o)
tx_signed_raw_hex = o['params'][0]
#queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec))
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
q = init_database.query(Otx)
otx = q.get(1)
otx.sendfail(session=init_database)
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 1
otx.sendfail(session=init_database)
otx.retry(session=init_database)
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
txs = get_status_tx(StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.QUEUED, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.LOCAL_ERROR, session=init_database)
assert len(txs) == 0
txs = get_status_tx(StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK, session=init_database)
assert len(txs) == 1
txs = get_status_tx(StatusBits.IN_NETWORK, session=init_database)
assert len(txs) == 0

View File

@ -1,22 +0,0 @@
# standard imports
import os
# local imports
from cic_eth.db.models.otx import Otx
from cic_eth.queue.tx import get_state_log
def test_otx_state_log(
init_database,
):
Otx.tracing = True
address = '0x' + os.urandom(20).hex()
tx_hash = '0x' + os.urandom(32).hex()
signed_tx = '0x' + os.urandom(128).hex()
otx = Otx.add(0, address, tx_hash, signed_tx, session=init_database)
init_database.commit()
log = get_state_log(tx_hash)
assert len(log) == 1

View File

@ -3,9 +3,9 @@ import os
# third-party imports # third-party imports
import pytest import pytest
from chainqueue.tx import create as queue_create
# local imports # local imports
from cic_eth.queue.tx import create as queue_create
from cic_eth.db.models.lock import Lock from cic_eth.db.models.lock import Lock
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
from cic_eth.error import LockedError from cic_eth.error import LockedError

View File

@ -1,577 +0,0 @@
# standard imports import logging
import datetime
import os
import logging
# external imports
import pytest
from sqlalchemy import DateTime
from chainlib.connection import RPCConnection
from chainlib.eth.nonce import OverrideNonceOracle
from chainlib.eth.tx import unpack
from chainlib.eth.gas import (
RPCGasOracle,
Gas,
)
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
# local imports
from cic_eth.eth.tx import cache_gas_data
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.otx import OtxSync
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.lock import Lock
from cic_eth.db.models.base import SessionBase
from cic_eth.db.enum import (
StatusEnum,
LockEnum,
StatusBits,
is_alive,
is_error_status,
status_str,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.queue.tx import set_final_status
from cic_eth.queue.tx import set_sent_status
from cic_eth.queue.tx import set_waitforgas
from cic_eth.queue.tx import set_ready
from cic_eth.queue.tx import get_paused_txs
from cic_eth.queue.tx import get_upcoming_tx
from cic_eth.queue.tx import get_account_tx
from cic_eth.queue.tx import get_tx
from cic_eth.db.error import TxStateChangeError
from cic_eth.queue.tx import register_tx
# test imports
from tests.util.nonce import StaticNonceOracle
logg = logging.getLogger()
def test_finalize(
default_chain_spec,
eth_rpc,
eth_signer,
init_database,
agent_roles,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = StaticNonceOracle(0)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc = [
c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)),
]
nonce_oracle = StaticNonceOracle(1)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6)))
tx_hashes = []
i = 0
for entry in txs_rpc:
tx_hash_hex = entry[0]
tx_rpc = entry[1]
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_hashes.append(tx_hash_hex)
if i < 3:
set_sent_status(tx_hash_hex)
i += 1
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first()
assert otx.status & StatusBits.OBSOLETE
assert not is_alive(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first()
assert otx.status & StatusBits.OBSOLETE
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first()
assert otx.status & StatusBits.OBSOLETE
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first()
assert otx.status == StatusEnum.PENDING
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[4]).first()
assert otx.status == StatusEnum.PENDING
set_sent_status(tx_hashes[3], False)
set_sent_status(tx_hashes[4], False)
set_final_status(tx_hashes[3], 1024)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first()
assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
assert not is_alive(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first()
assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first()
assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first()
assert otx.status & (StatusBits.IN_NETWORK | StatusBits.FINAL)
assert not is_error_status(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[4]).first()
assert otx.status & (StatusBits.IN_NETWORK | StatusBits.FINAL)
assert not is_error_status(otx.status)
def test_expired(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
):
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = StaticNonceOracle(42)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc = [
c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)),
]
nonce_oracle = StaticNonceOracle(43)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc += [
c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)),
]
nonce_oracle = StaticNonceOracle(44)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6)))
tx_hashes = []
i = 0
for entry in txs_rpc:
tx_hash_hex = entry[0]
tx_rpc = entry[1]
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_hashes.append(tx_hash_hex)
set_sent_status(tx_hash_hex, False)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
fake_created = datetime.datetime.utcnow() - datetime.timedelta(seconds=40*i)
otx.date_created = fake_created
init_database.add(otx)
init_database.commit()
init_database.refresh(otx)
i += 1
now = datetime.datetime.utcnow()
delta = datetime.timedelta(seconds=61)
then = now - delta
otxs = OtxSync.get_expired(then)
nonce_acc = 0
for otx in otxs:
nonce_acc += otx.nonce
assert nonce_acc == (43 + 44)
def test_get_paused(
init_database,
default_chain_spec,
eth_rpc,
eth_signer,
agent_roles,
):
chain_id = default_chain_spec.chain_id()
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 42)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc = [
c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)),
]
tx_hashes = []
for entry in txs_rpc:
tx_hash_hex = entry[0]
tx_rpc = entry[1]
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_hashes.append(tx_hash_hex)
txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id)
assert len(txs.keys()) == 0
q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==tx_hashes[0])
r = q.first()
r.waitforgas(session=init_database)
init_database.add(r)
init_database.commit()
chain_id = default_chain_spec.chain_id()
txs = get_paused_txs(chain_id=chain_id)
assert len(txs.keys()) == 1
txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0])
assert len(txs.keys()) == 1
txs = get_paused_txs(status=StatusBits.GAS_ISSUES)
assert len(txs.keys()) == 1
txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id)
assert len(txs.keys()) == 1
q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==tx_hashes[1])
o = q.first()
o.waitforgas(session=init_database)
init_database.add(o)
init_database.commit()
txs = get_paused_txs()
assert len(txs.keys()) == 2
txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0])
assert len(txs.keys()) == 2
txs = get_paused_txs(status=StatusBits.GAS_ISSUES, chain_id=chain_id)
assert len(txs.keys()) == 2
txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) # init_w3.eth.accounts[0])
assert len(txs.keys()) == 2
q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==tx_hashes[1])
o = q.first()
o.sendfail(session=init_database)
init_database.add(o)
init_database.commit()
txs = get_paused_txs()
assert len(txs.keys()) == 2
txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0])
assert len(txs.keys()) == 2
txs = get_paused_txs(status=StatusBits.GAS_ISSUES, chain_id=chain_id)
txs = get_paused_txs(status=StatusEnum.WAITFORGAS, chain_id=chain_id)
assert len(txs.keys()) == 1
txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) # init_w3.eth.accounts[0])
assert len(txs.keys()) == 1
def test_get_upcoming(
default_chain_spec,
eth_rpc,
eth_signer,
init_database,
agent_roles,
):
chain_id = default_chain_spec.chain_id()
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = StaticNonceOracle(42)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc = [
c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)),
c.create(agent_roles['BOB'], agent_roles['DAVE'], 200 * (10 ** 6)),
c.create(agent_roles['CAROL'], agent_roles['DAVE'], 300 * (10 ** 6)),
]
nonce_oracle = StaticNonceOracle(43)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc += [
c.create(agent_roles['ALICE'], agent_roles['DAVE'], 400 * (10 ** 6)),
c.create(agent_roles['BOB'], agent_roles['DAVE'], 500 * (10 ** 6)),
c.create(agent_roles['CAROL'], agent_roles['DAVE'], 600 * (10 ** 6)),
]
nonce_oracle = StaticNonceOracle(44)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc += [
c.create(agent_roles['ALICE'], agent_roles['DAVE'], 700 * (10 ** 6)),
]
tx_hashes = []
for entry in txs_rpc:
tx_hash_hex = entry[0]
tx_rpc = entry[1]
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_hashes.append(tx_hash_hex)
set_ready(tx_hash_hex)
txs = get_upcoming_tx(StatusBits.QUEUED, chain_id=chain_id)
assert len(txs.keys()) == 3
tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[0]])), chain_id)
assert tx['nonce'] == 42
tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[1]])), chain_id)
assert tx['nonce'] == 42
tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[2]])), chain_id)
assert tx['nonce'] == 42
q = init_database.query(TxCache)
q = q.filter(TxCache.sender==agent_roles['ALICE'])
for o in q.all():
o.date_checked -= datetime.timedelta(seconds=30)
init_database.add(o)
init_database.commit()
before = datetime.datetime.now() - datetime.timedelta(seconds=20)
logg.debug('before {}'.format(before))
txs = get_upcoming_tx(StatusBits.QUEUED, before=before)
logg.debug('txs {} {}'.format(txs.keys(), txs.values()))
assert len(txs.keys()) == 1
# Now date checked has been set to current time, and the check returns no results
txs = get_upcoming_tx(StatusBits.QUEUED, before=before)
logg.debug('txs {} {}'.format(txs.keys(), txs.values()))
assert len(txs.keys()) == 0
set_sent_status(tx_hashes[0])
txs = get_upcoming_tx(StatusBits.QUEUED)
assert len(txs.keys()) == 3
with pytest.raises(KeyError):
tx = txs[tx_hashes[0]]
tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[3]])), chain_id)
assert tx['nonce'] == 43
set_waitforgas(tx_hashes[1])
txs = get_upcoming_tx(StatusBits.QUEUED)
assert len(txs.keys()) == 3
with pytest.raises(KeyError):
tx = txs[tx_hashes[1]]
tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[3]])), chain_id)
assert tx['nonce'] == 43
txs = get_upcoming_tx(StatusBits.GAS_ISSUES)
assert len(txs.keys()) == 1
def test_upcoming_with_lock(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
):
chain_id = int(default_chain_spec.chain_id())
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = StaticNonceOracle(42)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
(tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6))
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id)
assert len(txs.keys()) == 1
Lock.set(str(default_chain_spec), LockEnum.SEND, address=agent_roles['ALICE'])
txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id)
assert len(txs.keys()) == 0
(tx_hash_hex, tx_rpc) = c.create(agent_roles['BOB'], agent_roles['ALICE'], 100 * (10 ** 6))
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id)
assert len(txs.keys()) == 1
def test_obsoletion(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
):
chain_id = default_chain_spec.chain_id()
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = StaticNonceOracle(42)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc = [
c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['DAVE'], 200 * (10 ** 6)),
c.create(agent_roles['BOB'], agent_roles['DAVE'], 300 * (10 ** 6)),
]
nonce_oracle = StaticNonceOracle(43)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc += [
c.create(agent_roles['BOB'], agent_roles['DAVE'], 400 * (10 ** 6)),
]
tx_hashes = []
i = 0
for entry in txs_rpc:
tx_hash_hex = entry[0]
tx_rpc = entry[1]
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_hashes.append(tx_hash_hex)
if i < 2:
set_sent_status(tx_hash_hex)
i += 1
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.status.op('&')(StatusEnum.OBSOLETED.value)==StatusEnum.OBSOLETED.value)
z = 0
for o in q.all():
z += o.nonce
session.close()
assert z == 42
set_final_status(tx_hashes[1], 1023, True)
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.OBSOLETED.value)
zo = 0
for o in q.all():
zo += o.nonce
q = session.query(Otx)
q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.CANCELLED.value)
zc = 0
for o in q.all():
zc += o.nonce
session.close()
assert zo == 0
assert zc == 42
def test_retry(
init_database,
):
address = '0x' + os.urandom(20).hex()
tx_hash = '0x' + os.urandom(32).hex()
signed_tx = '0x' + os.urandom(128).hex()
otx = Otx(0, address, tx_hash, signed_tx)
init_database.add(otx)
init_database.commit()
set_sent_status(tx_hash, True)
set_ready(tx_hash)
q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
assert (otx.status & StatusEnum.RETRY.value) == StatusEnum.RETRY.value
assert is_error_status(otx.status)
set_sent_status(tx_hash, False)
set_ready(tx_hash)
init_database.commit()
q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
assert (otx.status & StatusEnum.RETRY.value) == StatusBits.QUEUED.value
assert not is_error_status(otx.status)
def test_get_account_tx(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
):
chain_id = default_chain_spec.chain_id()
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = OverrideNonceOracle(ZERO_ADDRESS, 42)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc = [
c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['CAROL'], 200 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)),
c.create(agent_roles['BOB'], agent_roles['ALICE'], 300 * (10 ** 6)),
]
tx_hashes = []
for entry in txs_rpc:
tx_hash_hex = entry[0]
tx_rpc = entry[1]
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_hashes.append(tx_hash_hex)
txs = get_account_tx(agent_roles['ALICE'])
logg.debug('tx {} tx {}'.format(list(txs.keys()), tx_hashes))
assert list(txs.keys()) == tx_hashes
txs = get_account_tx(agent_roles['ALICE'], as_recipient=False)
assert list(txs.keys()) == tx_hashes[:3]
txs = get_account_tx(agent_roles['ALICE'], as_sender=False)
assert list(txs.keys()) == tx_hashes[3:]