From 3d659ace3a3798041930c764cc886890db745439 Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Thu, 26 Aug 2021 08:05:56 +0000 Subject: [PATCH] Consolidate adapter interface --- .gitignore | 4 + CHANGELOG | 4 + MANIFEST.in | 2 +- chainqueue/adapters/base.py | 101 ++++- chainqueue/data/config/config.ini | 9 + chainqueue/db/migrations/default/export.py | 33 ++ .../2215c497248b_transaction_cache.py | 24 +- .../3e43847c0717_otx_state_history.py | 18 +- .../versions/c537a0fd8466_outgoing_queue.py | 24 +- .../db/migrations/default/versions/src/otx.py | 21 + .../default/versions/src/otx_state_history.py | 15 + .../default/versions/src/tx_cache.py | 25 ++ chainqueue/db/migrations/sqlalchemy.py | 55 --- chainqueue/db/models/base.py | 1 + chainqueue/db/models/tx.py | 2 +- chainqueue/enum.py | 51 ++- chainqueue/error.py | 2 + chainqueue/fs/dir.py | 133 ------ chainqueue/fs/entry.py | 4 + chainqueue/fs/queue.py | 3 +- chainqueue/runnable/list.py | 144 +++++++ chainqueue/runnable/sql.py | 14 + chainqueue/sql/backend.py | 50 ++- chainqueue/sql/query.py | 171 +++++++- chainqueue/sql/state.py | 84 ++++ chainqueue/sql/tx.py | 66 ++- chainqueue/unittest/db.py | 55 +++ requirements.txt | 6 +- run_tests.sh | 13 + setup.cfg | 12 +- tests/base.py | 112 ----- tests/chainqueue_base.py | 85 ++++ tests/test_basic.py | 2 +- tests/test_fs.py | 4 +- tests/test_fs_entry.py | 10 +- tests/test_helo.py | 15 + tests/test_hexdir.py | 84 ---- tests/test_otx.py | 2 +- tests/test_otx_status_log.py | 2 +- tests/test_query.py | 386 ++++++++++++------ tests/test_tx_cache.py | 2 +- 41 files changed, 1190 insertions(+), 660 deletions(-) create mode 100644 chainqueue/data/config/config.ini create mode 100644 chainqueue/db/migrations/default/export.py create mode 100644 chainqueue/db/migrations/default/versions/src/otx.py create mode 100644 chainqueue/db/migrations/default/versions/src/otx_state_history.py create mode 100644 chainqueue/db/migrations/default/versions/src/tx_cache.py delete mode 100644 chainqueue/db/migrations/sqlalchemy.py delete mode 100644 chainqueue/fs/dir.py create mode 100644 chainqueue/runnable/list.py create mode 100644 chainqueue/runnable/sql.py create mode 100644 chainqueue/unittest/db.py create mode 100644 run_tests.sh delete mode 100644 tests/base.py create mode 100644 tests/chainqueue_base.py create mode 100644 tests/test_helo.py delete mode 100644 tests/test_hexdir.py diff --git a/.gitignore b/.gitignore index 4bde206..399d953 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ __pycache__ *.pyc *.sqlite +build/ +dist/ +*.html +*.egg-info/ diff --git a/CHANGELOG b/CHANGELOG index 4eb8173..290642b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,7 @@ +- 0.0.3 + * cli tool for listing queue by address + * ensure lowercase hex input in db + * exportable unittest utilities - 0.0.2 * add fs backend * add flexible data directory diff --git a/MANIFEST.in b/MANIFEST.in index 829a7e6..a31dbb8 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include *requirements.txt LICENSE +include *requirements.txt LICENSE chainqueue/db/migrations/default/* chainqueue/db/migrations/default/versions/* chainqueue/db/migrations/default/versions/src/* chainqueue/data/config/* diff --git a/chainqueue/adapters/base.py b/chainqueue/adapters/base.py index c0d7b45..5bb3e00 100644 --- a/chainqueue/adapters/base.py +++ b/chainqueue/adapters/base.py @@ -1,13 +1,104 @@ +# standard imports +import datetime + +# local imports +from chainqueue.enum import StatusBits + class Adapter: + """Base class defining interface to be implemented by chainqueue adapters. - def __init__(self, backend): + The chainqueue adapter collects the following actions: + + - add: add a transaction to the queue + - upcoming: get queued transactions ready to be sent to network + - dispatch: send a queued transaction to the network + - translate: decode details of a transaction + - create_session, release_session: session management to control queue state integrity + + :param backend: Chainqueue backend + :type backend: TODO - abstract backend class. Must implement get, create_session, release_session + :param pending_retry_threshold: seconds delay before retrying a transaction stalled in the newtork + :type pending_retry_threshold: int + :param error_retry_threshold: seconds delay before retrying a transaction that incurred a recoverable error state + :type error_retry_threshold: int + """ + + def __init__(self, backend, pending_retry_threshold=0, error_retry_threshold=0): self.backend = backend + self.pending_retry_threshold = datetime.timedelta(pending_retry_threshold) + self.error_retry_threshold = datetime.timedelta(error_retry_threshold) - def process(self, chain_spec): - raise NotImplementedEror() + def add(self, bytecode, chain_spec, session=None): + """Add a transaction to the queue. + + :param bytecode: Transaction wire format bytecode, in hex + :type bytecode: str + :param chain_spec: Chain spec to use for transaction decode + :type chain_spec: chainlib.chain.ChainSpec + :param session: Backend state integrity session + :type session: varies + """ + raise NotImplementedError() - def add(self, chain_spec, bytecode): - raise NotImplementedEror() + def translate(self, bytecode, chain_spec): + """Decode details of a transaction. + + :param bytecode: Transaction wire format bytecode, in hex + :type bytecode: str + :param chain_spec: Chain spec to use for transaction decode + :type chain_spec: chainlib.chain.ChainSpec + """ + raise NotImplementedError() + + + def dispatch(self, chain_spec, rpc, tx_hash, signed_tx, session=None): + """Send a queued transaction to the network. + + :param chain_spec: Chain spec to use to identify the transaction network + :type chain_spec: chainlib.chain.ChainSpec + :param rpc: RPC connection to use for transaction send + :type rpc: chainlib.connection.RPCConnection + :param tx_hash: Transaction hash (checksum of transaction), in hex + :type tx_hash: str + :param signed_tx: Transaction wire format bytecode, in hex + :type signed_tx: str + :param session: Backend state integrity session + :type session: varies + """ + raise NotImplementedError() + + + def upcoming(self, chain_spec, session=None): + """Get queued transactions ready to be sent to the network. + + The transactions will be a combination of newly submitted transactions, previously sent but stalled transactions, and transactions that could temporarily not be submitted. + + :param chain_spec: Chain spec to use to identify the transaction network + :type chain_spec: chainlib.chain.ChainSpec + :param session: Backend state integrity session + :type session: varies + """ + raise NotImplementedError() + + + def create_session(self, session=None): + """Create a session context to guarantee atomic state change in backend. + + :param session: If specified, session will be used instead of creating a new one + :type session: varies + """ + return self.backend.create_session(session) + + + def release_session(self, session=None): + """Release a session context created by create_session. + + If session parameter is defined, final session destruction will be deferred to the initial provider of the session. In other words; if create_session was called with a session, release_session should symmetrically be called with the same session. + + :param session: Session context. + :type session: varies + """ + return self.backend.release_session(session) diff --git a/chainqueue/data/config/config.ini b/chainqueue/data/config/config.ini new file mode 100644 index 0000000..795c85c --- /dev/null +++ b/chainqueue/data/config/config.ini @@ -0,0 +1,9 @@ +[database] +name = +engine = +driver = +host = +port = +user = +password = +debug = 0 diff --git a/chainqueue/db/migrations/default/export.py b/chainqueue/db/migrations/default/export.py new file mode 100644 index 0000000..ba85958 --- /dev/null +++ b/chainqueue/db/migrations/default/export.py @@ -0,0 +1,33 @@ +from alembic import op +import sqlalchemy as sa + +from chainqueue.db.migrations.default.versions.src.otx import ( + upgrade as upgrade_otx, + downgrade as downgrade_otx, + ) +from chainqueue.db.migrations.default.versions.src.tx_cache import ( + upgrade as upgrade_tx_cache, + downgrade as downgrade_tx_cache, + ) +from chainqueue.db.migrations.default.versions.src.otx_state_history import ( + upgrade as upgrade_otx_state_history, + downgrade as downgrade_otx_state_history, + ) + +def chainqueue_upgrade(major=0, minor=0, patch=1): + r0_0_1_u() + + +def chainqueue_downgrade(major=0, minor=0, patch=1): + r0_0_1_d() + + +def r0_0_1_u(): + upgrade_otx() + upgrade_tx_cache() + upgrade_otx_state_history() + +def r0_0_1_d(): + downgrade_otx_state_history() + downgrade_tx_cache() + downgrade_otx() diff --git a/chainqueue/db/migrations/default/versions/2215c497248b_transaction_cache.py b/chainqueue/db/migrations/default/versions/2215c497248b_transaction_cache.py index 7e5a8b2..e8bcd6d 100644 --- a/chainqueue/db/migrations/default/versions/2215c497248b_transaction_cache.py +++ b/chainqueue/db/migrations/default/versions/2215c497248b_transaction_cache.py @@ -8,32 +8,10 @@ Create Date: 2021-04-02 10:09:11.923949 from alembic import op import sqlalchemy as sa - # revision identifiers, used by Alembic. revision = '2215c497248b' down_revision = 'c537a0fd8466' branch_labels = None depends_on = None - -def upgrade(): - op.create_table( - 'tx_cache', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime, nullable=False), - sa.Column('date_checked', sa.DateTime, nullable=False), - sa.Column('source_token_address', sa.String(42), nullable=False), - sa.Column('destination_token_address', sa.String(42), nullable=False), - sa.Column('sender', sa.String(42), nullable=False), - sa.Column('recipient', sa.String(42), nullable=False), - sa.Column('from_value', sa.NUMERIC(), nullable=False), - sa.Column('to_value', sa.NUMERIC(), nullable=True), -# sa.Column('block_number', sa.BIGINT(), nullable=True), - sa.Column('tx_index', sa.Integer, nullable=True), - ) - - -def downgrade(): - op.drop_table('tx_cache') +from chainqueue.db.migrations.default.versions.src.tx_cache import upgrade, downgrade diff --git a/chainqueue/db/migrations/default/versions/3e43847c0717_otx_state_history.py b/chainqueue/db/migrations/default/versions/3e43847c0717_otx_state_history.py index 4b3dea3..261c8ae 100644 --- a/chainqueue/db/migrations/default/versions/3e43847c0717_otx_state_history.py +++ b/chainqueue/db/migrations/default/versions/3e43847c0717_otx_state_history.py @@ -5,26 +5,10 @@ Revises: 2215c497248b Create Date: 2021-04-02 10:10:58.656139 """ -from alembic import op -import sqlalchemy as sa - - # revision identifiers, used by Alembic. revision = '3e43847c0717' down_revision = '2215c497248b' branch_labels = None depends_on = None - -def upgrade(): - op.create_table( - 'otx_state_log', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False), - sa.Column('date', sa.DateTime, nullable=False), - sa.Column('status', sa.Integer, nullable=False), - ) - - -def downgrade(): - op.drop_table('otx_state_log') +from chainqueue.db.migrations.default.versions.src.otx_state_history import upgrade, downgrade diff --git a/chainqueue/db/migrations/default/versions/c537a0fd8466_outgoing_queue.py b/chainqueue/db/migrations/default/versions/c537a0fd8466_outgoing_queue.py index f39b07a..75818e6 100644 --- a/chainqueue/db/migrations/default/versions/c537a0fd8466_outgoing_queue.py +++ b/chainqueue/db/migrations/default/versions/c537a0fd8466_outgoing_queue.py @@ -5,32 +5,10 @@ Revises: Create Date: 2021-04-02 10:04:27.092819 """ -from alembic import op -import sqlalchemy as sa - - # revision identifiers, used by Alembic. revision = 'c537a0fd8466' down_revision = None branch_labels = None depends_on = None - -def upgrade(): - op.create_table( - 'otx', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime, nullable=False), - sa.Column('nonce', sa.Integer, nullable=False), - sa.Column('tx_hash', sa.Text, nullable=False), - sa.Column('signed_tx', sa.Text, nullable=False), - sa.Column('status', sa.Integer, nullable=False, default=0), - sa.Column('block', sa.Integer), - ) - op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True) - - -def downgrade(): - op.drop_index('idx_otx_tx') - op.drop_table('otx') +from chainqueue.db.migrations.default.versions.src.otx import upgrade, downgrade diff --git a/chainqueue/db/migrations/default/versions/src/otx.py b/chainqueue/db/migrations/default/versions/src/otx.py new file mode 100644 index 0000000..67ca31f --- /dev/null +++ b/chainqueue/db/migrations/default/versions/src/otx.py @@ -0,0 +1,21 @@ +from alembic import op +import sqlalchemy as sa + +def upgrade(): + op.create_table( + 'otx', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('date_created', sa.DateTime, nullable=False), + sa.Column('date_updated', sa.DateTime, nullable=False), + sa.Column('nonce', sa.Integer, nullable=False), + sa.Column('tx_hash', sa.Text, nullable=False), + sa.Column('signed_tx', sa.Text, nullable=False), + sa.Column('status', sa.Integer, nullable=False, default=0), + sa.Column('block', sa.Integer), + ) + op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True) + + +def downgrade(): + op.drop_index('idx_otx_tx') + op.drop_table('otx') diff --git a/chainqueue/db/migrations/default/versions/src/otx_state_history.py b/chainqueue/db/migrations/default/versions/src/otx_state_history.py new file mode 100644 index 0000000..ff35d5c --- /dev/null +++ b/chainqueue/db/migrations/default/versions/src/otx_state_history.py @@ -0,0 +1,15 @@ +from alembic import op +import sqlalchemy as sa + +def upgrade(): + op.create_table( + 'otx_state_log', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False), + sa.Column('date', sa.DateTime, nullable=False), + sa.Column('status', sa.Integer, nullable=False), + ) + + +def downgrade(): + op.drop_table('otx_state_log') diff --git a/chainqueue/db/migrations/default/versions/src/tx_cache.py b/chainqueue/db/migrations/default/versions/src/tx_cache.py new file mode 100644 index 0000000..a986a3a --- /dev/null +++ b/chainqueue/db/migrations/default/versions/src/tx_cache.py @@ -0,0 +1,25 @@ +# external imports +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'tx_cache', + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True), + sa.Column('date_created', sa.DateTime, nullable=False), + sa.Column('date_updated', sa.DateTime, nullable=False), + sa.Column('date_checked', sa.DateTime, nullable=False), + sa.Column('source_token_address', sa.String(42), nullable=False), + sa.Column('destination_token_address', sa.String(42), nullable=False), + sa.Column('sender', sa.String(42), nullable=False), + sa.Column('recipient', sa.String(42), nullable=False), + sa.Column('from_value', sa.NUMERIC(), nullable=False), + sa.Column('to_value', sa.NUMERIC(), nullable=True), +# sa.Column('block_number', sa.BIGINT(), nullable=True), + sa.Column('tx_index', sa.Integer, nullable=True), + ) + +def downgrade(): + op.drop_table('tx_cache') diff --git a/chainqueue/db/migrations/sqlalchemy.py b/chainqueue/db/migrations/sqlalchemy.py deleted file mode 100644 index 640ad3c..0000000 --- a/chainqueue/db/migrations/sqlalchemy.py +++ /dev/null @@ -1,55 +0,0 @@ -from alembic import op -import sqlalchemy as sa - -def chainqueue_upgrade(major=0, minor=0, patch=1): - r0_0_1_u() - - -def chainqueue_downgrade(major=0, minor=0, patch=1): - r0_0_1_d() - - -def r0_0_1_u(): - op.create_table( - 'otx', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime, nullable=False), - sa.Column('nonce', sa.Integer, nullable=False), - sa.Column('tx_hash', sa.Text, nullable=False), - sa.Column('signed_tx', sa.Text, nullable=False), - sa.Column('status', sa.Integer, nullable=False, default=0), - sa.Column('block', sa.Integer), - ) - op.create_index('idx_otx_tx', 'otx', ['tx_hash'], unique=True) - - op.create_table( - 'otx_state_log', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=False), - sa.Column('date', sa.DateTime, nullable=False), - sa.Column('status', sa.Integer, nullable=False), - ) - - op.create_table( - 'tx_cache', - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('otx_id', sa.Integer, sa.ForeignKey('otx.id'), nullable=True), - sa.Column('date_created', sa.DateTime, nullable=False), - sa.Column('date_updated', sa.DateTime, nullable=False), - sa.Column('date_checked', sa.DateTime, nullable=False), - sa.Column('source_token_address', sa.String(42), nullable=False), - sa.Column('destination_token_address', sa.String(42), nullable=False), - sa.Column('sender', sa.String(42), nullable=False), - sa.Column('recipient', sa.String(42), nullable=False), - sa.Column('from_value', sa.NUMERIC(), nullable=False), - sa.Column('to_value', sa.NUMERIC(), nullable=True), - sa.Column('tx_index', sa.Integer, nullable=True), - ) - - -def r0_0_1_d(): - op.drop_table('tx_cache') - op.drop_table('otx_state_log') - op.drop_index('idx_otx_tx') - op.drop_table('otx') diff --git a/chainqueue/db/models/base.py b/chainqueue/db/models/base.py index fc3541c..b7d30ba 100644 --- a/chainqueue/db/models/base.py +++ b/chainqueue/db/models/base.py @@ -119,3 +119,4 @@ class SessionBase(Model): logg.debug('commit and destroy session {}'.format(session_key)) session.commit() session.close() + del SessionBase.localsessions[session_key] diff --git a/chainqueue/db/models/tx.py b/chainqueue/db/models/tx.py index 03908b5..549d826 100644 --- a/chainqueue/db/models/tx.py +++ b/chainqueue/db/models/tx.py @@ -71,7 +71,7 @@ class TxCache(SessionBase): Only manipulates object, does not transaction or commit to backend. """ - self.date_checked = datetime.datetime.now() + self.date_checked = datetime.datetime.utcnow() @staticmethod diff --git a/chainqueue/enum.py b/chainqueue/enum.py index 8146064..35e494c 100644 --- a/chainqueue/enum.py +++ b/chainqueue/enum.py @@ -26,32 +26,14 @@ class StatusBits(enum.IntEnum): @enum.unique class StatusEnum(enum.IntEnum): - """ - - - Inactive, not finalized. (<0) - * PENDING: The initial state of a newly added transaction record. No action has been performed on this transaction yet. - * SENDFAIL: The transaction was not received by the node. - * RETRY: The transaction is queued for a new send attempt after previously failing. - * READYSEND: The transaction is queued for its first send attempt - * OBSOLETED: A new transaction with the same nonce and higher gas has been sent to network. - * WAITFORGAS: The transaction is on hold pending gas funding. - - Active state: (==0) - * SENT: The transaction has been sent to the mempool. - - Inactive, finalized. (>0) - * FUBAR: Unknown error occurred and transaction is abandoned. Manual intervention needed. - * CANCELLED: The transaction was sent, but was not mined and has disappered from the mempool. This usually follows a transaction being obsoleted. - * OVERRIDDEN: Transaction has been manually overriden. - * REJECTED: The transaction was rejected by the node. - * REVERTED: The transaction was mined, but exception occurred during EVM execution. (Block number will be set) - * SUCCESS: THe transaction was successfully mined. (Block number will be set) - + """Semantic states intended for human consumption """ PENDING = 0 SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR RETRY = StatusBits.QUEUED | StatusBits.DEFERRED READYSEND = StatusBits.QUEUED - RESERVED = StatusBits.QUEUED | StatusBits.RESERVED + RESERVED = StatusBits.RESERVED OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK @@ -105,6 +87,15 @@ def status_str(v, bits_only=False): def status_bytes(status=0): + """Serialize a status bit field integer value to bytes. + + if invoked without an argument, it will return the serialization of an empty state. + + :param status: Status bit field + :type status: number + :returns: Serialized value + :rtype: bytes + """ return status.to_bytes(8, byteorder='big') @@ -116,6 +107,7 @@ def all_errors(): """ return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR +errors = all_errors() def is_error_status(v): """Check if value is an error state @@ -130,10 +122,24 @@ def is_error_status(v): __ignore_manual_value = ~StatusBits.MANUAL def ignore_manual(v): + """Reset the MANUAL bit from the given status + + :param v: Status bit field + :type v: number + :returns: Input state with manual bit removed + :rtype: number + """ return v & __ignore_manual_value def is_nascent(v): + """Check if state is the empty state + + :param v: Status bit field + :type v: number + :returns: True if empty + :rtype: bool + """ return ignore_manual(v) == StatusEnum.PENDING @@ -151,6 +157,9 @@ def is_alive(v): The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set). - :returns: + :param v: Status bit field + :type v: number + :returns: True if status is not yet finalized + :rtype: bool """ return bool(v & dead() == 0) diff --git a/chainqueue/error.py b/chainqueue/error.py index c333d74..c4b4d5e 100644 --- a/chainqueue/error.py +++ b/chainqueue/error.py @@ -1,4 +1,6 @@ class ChainQueueException(Exception): + """Base class for chainqueue related exceptions + """ pass diff --git a/chainqueue/fs/dir.py b/chainqueue/fs/dir.py deleted file mode 100644 index 5911f75..0000000 --- a/chainqueue/fs/dir.py +++ /dev/null @@ -1,133 +0,0 @@ -# standard imports -import os -import stat -import logging - -# external imports -from hexathon import valid as valid_hex - -logg = logging.getLogger(__name__) - - -class HexDir: - - def __init__(self, root_path, key_length, levels=2, prefix_length=0): - self.path = root_path - self.key_length = key_length - self.prefix_length = prefix_length - self.entry_length = key_length + prefix_length - self.__levels = levels + 2 - fi = None - try: - fi = os.stat(self.path) - self.__verify_directory() - except FileNotFoundError: - HexDir.__prepare_directory(self.path) - self.master_file = os.path.join(self.path, 'master') - - - @property - def levels(self): - return self.__levels - 2 - - - def add(self, key, content, prefix=b''): - l = len(key) - if l != self.key_length: - raise ValueError('expected key length {}, got {}'.format(self.key_length, l)) - l = len(prefix) - if l != self.prefix_length: - raise ValueError('expected prefix length {}, got {}'.format(self.prefix_length, l)) - if not isinstance(content, bytes): - raise ValueError('content must be bytes, got {}'.format(type(content).__name__)) - if prefix != None and not isinstance(prefix, bytes): - raise ValueError('prefix must be bytes, got {}'.format(type(content).__name__)) - key_hex = key.hex() - entry_path = self.to_filepath(key_hex) - - c = self.count() - - os.makedirs(os.path.dirname(entry_path), exist_ok=True) - f = open(entry_path, 'wb') - f.write(content) - f.close() - - f = open(self.master_file, 'ab') - if prefix != None: - f.write(prefix) - f.write(key) - f.close() - - logg.info('created new entry {} idx {} in {}'.format(key_hex, c, entry_path)) - - return c - - - def count(self): - fi = os.stat(self.master_file) - c = fi.st_size / self.entry_length - r = int(c) - if r != c: # TODO: verify valid for check if evenly divided - raise IndexError('master file not aligned') - return r - - - def __cursor(self, idx): - return idx * (self.prefix_length + self.key_length) - - - def set_prefix(self, idx, prefix): - l = len(prefix) - if l != self.prefix_length: - raise ValueError('expected prefix length {}, got {}'.format(self.prefix_length, l)) - if not isinstance(prefix, bytes): - raise ValueError('prefix must be bytes, got {}'.format(type(content).__name__)) - cursor = self.__cursor(idx) - f = open(self.master_file, 'rb+') - f.seek(cursor) - f.write(prefix) - f.close() - - - def get(self, idx): - cursor = self.__cursor(idx) - f = open(self.master_file, 'rb') - f.seek(cursor) - prefix = f.read(self.prefix_length) - key = f.read(self.key_length) - f.close() - return (prefix, key) - - def to_subpath(self, hx): - lead = '' - for i in range(0, self.__levels, 2): - lead += hx[i:i+2] + '/' - return lead.upper() - - - def to_dirpath(self, hx): - sub_path = self.to_subpath(hx) - return os.path.join(self.path, sub_path) - - - def to_filepath(self, hx): - dir_path = self.to_dirpath(hx) - file_path = os.path.join(dir_path, hx.upper()) - return file_path - - - def __verify_directory(self): - #if not stat.S_ISDIR(fi.st_mode): - # raise ValueError('{} is not a directory'.format(self.path)) - f = opendir(self.path) - f.close() - return True - - - @staticmethod - def __prepare_directory(path): - os.makedirs(path, exist_ok=True) - state_file = os.path.join(path, 'master') - f = open(state_file, 'w') - f.close() - diff --git a/chainqueue/fs/entry.py b/chainqueue/fs/entry.py index 5f81895..afe6b2a 100644 --- a/chainqueue/fs/entry.py +++ b/chainqueue/fs/entry.py @@ -1,5 +1,6 @@ # standard imports import datetime +import logging # external imports from hexathon import strip_0x @@ -7,6 +8,8 @@ from hexathon import strip_0x # local imports from chainqueue.enum import StatusEnum +logg = logging.getLogger(__name__) + class DefaultApplier: @@ -22,6 +25,7 @@ class Entry: self.signed_tx = strip_0x(signed_tx) self.status = StatusEnum.PENDING + self.applier = applier self.applier.add(bytes.fromhex(tx_hash), bytes.fromhex(signed_tx)) diff --git a/chainqueue/fs/queue.py b/chainqueue/fs/queue.py index 8daf9a8..bf6d530 100644 --- a/chainqueue/fs/queue.py +++ b/chainqueue/fs/queue.py @@ -14,7 +14,6 @@ logg = logging.getLogger(__name__) class FsQueueBackend: - def add(self, label, content, prefix): raise NotImplementedError() @@ -52,7 +51,7 @@ class FsQueue: def add(self, key, value): prefix = status_bytes() - c = self.backend.add(key, value, prefix=prefix) + (c, entry_location) = self.backend.add(key, value, prefix=prefix) key_hex = key.hex() entry_path = os.path.join(self.index_path, key_hex) diff --git a/chainqueue/runnable/list.py b/chainqueue/runnable/list.py new file mode 100644 index 0000000..fe9d824 --- /dev/null +++ b/chainqueue/runnable/list.py @@ -0,0 +1,144 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# standard imports +import os +import logging +import sys + +# external imports +from hexathon import add_0x +import chainlib.cli +from chainlib.chain import ChainSpec +from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer + +# local imports +from chainqueue.enum import ( + StatusBits, + all_errors, + is_alive, + is_error_status, + status_str, + ) + + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + +arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC +argparser = chainlib.cli.ArgumentParser(arg_flags) +argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")') +argparser.add_argument('--start', type=str, help='Oldest transaction hash to include in results') +argparser.add_argument('--end', type=str, help='Newest transaction hash to include in results') +argparser.add_argument('--error', action='store_true', help='Only show transactions which have error state') +argparser.add_argument('--pending', action='store_true', help='Omit finalized transactions') +argparser.add_argument('--status-mask', type=int, dest='status_mask', help='Manually specify status bitmask value to match (overrides --error and --pending)') +argparser.add_argument('--summary', action='store_true', help='output summary for each status category') +argparser.add_positional('address', type=str, help='Ethereum address of recipient') +args = argparser.parse_args() +extra_args = { + 'address': None, + 'backend': None, + 'start': None, + 'end': None, + 'error': None, + 'pending': None, + 'status_mask': None, + 'summary': None, + } +config = chainlib.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + +status_mask = config.get('_STATUS_MASK', None) +not_status_mask = None +if status_mask == None: + if config.get('_ERROR'): + status_mask = all_errors() + if config.get('_PENDING'): + not_status_mask = StatusBits.FINAL + +tx_getter = None +session_method = None +if config.get('_BACKEND') == 'sql': + from chainqueue.sql.query import get_account_tx as tx_lister + from chainqueue.sql.query import get_tx_cache as tx_getter + from chainqueue.runnable.sql import setup_backend + from chainqueue.db.models.base import SessionBase + setup_backend(config, debug=config.true('DATABASE_DEBUG')) + session_method = SessionBase.create_session +else: + raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND'))) + + +class Outputter: + + def __init__(self, chain_spec, writer, session_method=None): + self.writer = writer + self.chain_spec = chain_spec + self.chain_spec_str = str(chain_spec) + self.session = None + if session_method != None: + self.session = session_method() + self.results = { + 'pending_error': 0, + 'final_error': 0, + 'pending': 0, + 'final': 0, + } + + def __del__(self): + if self.session != None: + self.session.close() + + + def add(self, tx_hash): + tx = tx_getter(self.chain_spec, tx_hash, session=self.session) + category = None + if is_alive(tx['status_code']): + category = 'pending' + else: + category = 'final' + self.results[category] += 1 + if is_error_status(tx['status_code']): + logg.debug('registered {} as {} with error'.format(tx_hash, category)) + self.results[category + '_error'] += 1 + else: + logg.debug('registered {} as {}'.format(tx_hash, category)) + + + def decode_summary(self): + self.writer.write('pending\t{}\t{}\n'.format(self.results['pending'], self.results['pending_error'])) + self.writer.write('final\t{}\t{}\n'.format(self.results['final'], self.results['final_error'])) + self.writer.write('total\t{}\t{}\n'.format(self.results['final'] + self.results['pending'], self.results['final_error'] + self.results['pending_error'])) + + + def decode_single(self, tx_hash): + tx = tx_getter(self.chain_spec, tx_hash, session=self.session) + status = tx['status'] + if config.get('_RAW'): + status = status_str(tx['status_code'], bits_only=True) + self.writer.write('{}\t{}\t{}\t{}\n'.format(self.chain_spec_str, add_0x(tx_hash), status, tx['status_code'])) + + +def main(): + since = config.get('_START', None) + if since != None: + since = add_0x(since) + until = config.get('_END', None) + if until != None: + until = add_0x(until) + txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask) + outputter = Outputter(chain_spec, sys.stdout, session_method=session_method) + if config.get('_SUMMARY'): + for k in txs.keys(): + outputter.add(k) + outputter.decode_summary() + else: + for k in txs.keys(): + outputter.decode_single(k) + +if __name__ == '__main__': + main() diff --git a/chainqueue/runnable/sql.py b/chainqueue/runnable/sql.py new file mode 100644 index 0000000..5ea4bdc --- /dev/null +++ b/chainqueue/runnable/sql.py @@ -0,0 +1,14 @@ +# standard imports +import logging + +# local imports +from chainqueue.db.models.base import SessionBase +from chainqueue.db import dsn_from_config + +logg = logging.getLogger(__name__) + + +def setup_backend(config, debug=False): + dsn = dsn_from_config(config) + logg.debug('dsn {}'.format(dsn)) + SessionBase.connect(dsn, debug=debug) diff --git a/chainqueue/sql/backend.py b/chainqueue/sql/backend.py index 581aa4a..16a09a5 100644 --- a/chainqueue/sql/backend.py +++ b/chainqueue/sql/backend.py @@ -1,22 +1,33 @@ # standard imports import logging +import urllib.error # external imports from sqlalchemy.exc import ( IntegrityError, ) from chainlib.error import JSONRPCException +from hexathon import ( + add_0x, + strip_0x, + uniform as hex_uniform, + ) # local imports from chainqueue.sql.tx import create as queue_create from chainqueue.db.models.base import SessionBase from chainqueue.db.models.tx import TxCache -from chainqueue.sql.query import get_upcoming_tx +from chainqueue.sql.query import ( + get_upcoming_tx, + get_tx as backend_get_tx, + ) from chainqueue.sql.state import ( set_ready, set_reserved, set_sent, + set_fubar, ) +from chainqueue.sql.tx import cache_tx_dict logg = logging.getLogger(__name__) @@ -38,15 +49,20 @@ class SQLBackend: def cache(self, tx, session=None): - txc = TxCache(tx['hash'], tx['from'], tx['to'], tx['source_token'], tx['destination_token'], tx['from_value'], tx['to_value'], session=session) - session.add(txc) - session.flush() - logg.debug('cache {}'.format(txc)) + (tx, txc_id) = cache_tx_dict(tx, session=session) + logg.debug('cached {} db insert id {}'.format(tx, txc_id)) return 0 - def get(self, chain_spec, typ, decoder): - txs = get_upcoming_tx(chain_spec, typ, decoder=decoder) + def get_tx(self, chain_spec, tx_hash, session=None): + return backend_get_tx(chain_spec, tx_hash, session=session) + + + def get(self, chain_spec, decoder, session=None, requeue=False, *args, **kwargs): + txs = get_upcoming_tx(chain_spec, status=kwargs.get('status'), decoder=decoder, not_status=kwargs.get('not_status', 0), recipient=kwargs.get('recipient'), before=kwargs.get('before'), limit=kwargs.get('limit', 0)) + if requeue: + for tx_hash in txs.keys(): + set_ready(chain_spec, tx_hash, session=session) return txs @@ -57,15 +73,25 @@ class SQLBackend: try: rpc.do(payload) r = 0 - except ConnectionError: + except ConnectionError as e: + logg.error('dispatch {} connection error {}'.format(tx_hash, e)) + fail = True + except urllib.error.URLError as e: + logg.error('dispatch {} urllib error {}'.format(tx_hash, e)) fail = True except JSONRPCException as e: - logg.error('error! {}'.format(e)) + logg.exception('error! {}'.format(e)) + set_fubar(chain_spec, tx_hash, session=session) + raise e - logg.debug('foo') set_sent(chain_spec, tx_hash, fail=fail, session=session) + return r - def create_session(self): - return SessionBase.create_session() + def create_session(self, session=None): + return SessionBase.bind_session(session=session) + + + def release_session(self, session): + return SessionBase.release_session(session=session) diff --git a/chainqueue/sql/query.py b/chainqueue/sql/query.py index 478899a..35ddaff 100644 --- a/chainqueue/sql/query.py +++ b/chainqueue/sql/query.py @@ -11,6 +11,7 @@ from sqlalchemy import func from hexathon import ( add_0x, strip_0x, + uniform as hex_uniform, ) # local imports @@ -26,16 +27,21 @@ from chainqueue.db.enum import ( ) from chainqueue.error import ( NotLocalTxError, + CacheIntegrityError, ) -logg = logging.getLogger().getChild(__name__) +logg = logging.getLogger(__name__) def get_tx_cache(chain_spec, tx_hash, session=None): """Returns an aggregate dictionary of outgoing transaction data and metadata + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. :returns: Transaction data :rtype: dict @@ -81,8 +87,12 @@ def get_tx_cache(chain_spec, tx_hash, session=None): def get_tx(chain_spec, tx_hash, session=None): """Retrieve a transaction queue record by transaction hash + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. :returns: nonce, address and signed_tx (raw signed transaction) :rtype: dict @@ -107,26 +117,35 @@ def get_tx(chain_spec, tx_hash, session=None): def get_nonce_tx_cache(chain_spec, nonce, sender, decoder=None, session=None): """Retrieve all transactions for address with specified nonce + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param nonce: Nonce :type nonce: number - :param address: Ethereum address - :type address: str, 0x-hex + :param sender: Ethereum address + :type sender: str, 0x-hex + :param decoder: Transaction decoder + :type decoder: TODO - define transaction decoder + :param session: Backend state integrity session + :type session: varies :returns: Transactions :rtype: dict, with transaction hash as key, signed raw transaction as value """ + sender = add_0x(hex_uniform(strip_0x(sender))) + session = SessionBase.bind_session(session) q = session.query(Otx) q = q.join(TxCache) q = q.filter(TxCache.sender==sender) q = q.filter(Otx.nonce==nonce) - + txs = {} for r in q.all(): tx_signed_bytes = bytes.fromhex(r.signed_tx) if decoder != None: tx = decoder(tx_signed_bytes, chain_spec) - if sender != None and tx['from'] != sender: - raise IntegrityError('Cache sender {} does not match sender in tx {} using decoder {}'.format(sender, r.tx_hash, str(decoder))) + tx_from = add_0x(hex_uniform(strip_0x(tx['from']))) + if sender != None and tx_from != sender: + raise CacheIntegrityError('Cache sender {} does not match sender {} in tx {} using decoder {}'.format(sender, tx_from, r.tx_hash, str(decoder))) txs[r.tx_hash] = r.signed_tx SessionBase.release_session(session) @@ -137,12 +156,18 @@ def get_nonce_tx_cache(chain_spec, nonce, sender, decoder=None, session=None): def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, decoder=None): """Returns not finalized transactions that have been attempted sent without success. + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param status: If set, will return transactions with this local queue status only :type status: cic_eth.db.enum.StatusEnum :param recipient: Recipient address to return transactions for :type recipient: str, 0x-hex :param chain_id: Numeric chain id to use to parse signed transaction data :type chain_id: number + :param decoder: Transaction decoder + :type decoder: TODO - define transaction decoder + :param session: Backend state integrity session + :type session: varies :raises ValueError: Status is finalized, sent or never attempted sent :returns: Transactions :rtype: dict, with transaction hash as key, signed raw transaction as value @@ -161,6 +186,7 @@ def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, deco q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)>0)) if sender != None: + sender = add_0x(hex_uniform(strip_0x(sender))) q = q.filter(TxCache.sender==sender) txs = {} @@ -170,8 +196,9 @@ def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, deco tx_signed_bytes = bytes.fromhex(r.signed_tx) if decoder != None: tx = decoder(tx_signed_bytes, chain_spec) - if sender != None and tx['from'] != sender: - raise IntegrityError('Cache sender {} does not match sender in tx {} using decoder {}'.format(sender, r.tx_hash, str(decoder))) + tx_from = add_0x(hex_uniform(strip_0x(tx['from']))) + if sender != None and tx_from != sender: + raise CacheIntegrityError('Cache sender {} does not match sender {} in tx {} using decoder {}'.format(sender, tx_from, r.tx_hash, str(decoder))) gas += tx['gas'] * tx['gasPrice'] txs[r.tx_hash] = r.signed_tx @@ -184,12 +211,20 @@ def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, deco def get_status_tx_cache(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None, decoder=None): """Retrieve transaction with a specific queue status. + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param status: Status to match transactions with :type status: str :param before: If set, return only transactions older than the timestamp - :type status: datetime.dateTime + :type before: datetime.dateTime + :param exact: If set, will match exact status value. If not set, will match any of the status bits set + :type exact: bool :param limit: Limit amount of returned transactions :type limit: number + :param decoder: Transaction decoder + :type decoder: TODO - define transaction decoder + :param session: Backend state integrity session + :type session: varies :returns: Transactions :rtype: list of cic_eth.db.models.otx.Otx """ @@ -223,14 +258,22 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re (TODO) Will not return any rows if LockEnum.SEND bit in Lock is set for zero address. + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param status: Defines the status used to filter as upcoming. :type status: cic_eth.db.enum.StatusEnum + :param not_status: Invalidates any matches matching one of the given bits + :type not_status: cic_eth.db.enum.StatusEnum :param recipient: Ethereum address of recipient to return transaction for :type recipient: str, 0x-hex :param before: Only return transactions if their modification date is older than the given timestamp :type before: datetime.datetime - :param chain_id: Chain id to use to parse signed transaction data - :type chain_id: number + :param limit: Limit amount of returned transactions + :type limit: number + :param decoder: Transaction decoder + :type decoder: TODO - define transaction decoder + :param session: Backend state integrity session + :type session: varies :raises ValueError: Status is finalized, sent or never attempted sent :returns: Transactions :rtype: dict, with transaction hash as key, signed raw transaction as value @@ -285,7 +328,7 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re q = q.filter(TxCache.otx_id==o.id) o = q.first() - o.date_checked = datetime.datetime.now() + o.date_checked = datetime.datetime.utcnow() session.add(o) session.commit() @@ -298,17 +341,68 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re return txs -def get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None, session=None): +def sql_range_filter(session, criteria=None): + """Convert an arbitrary type to a sql query range + + :param session: Backend state integrity session + :type session: varies + :param criteria: Range criteria + :type criteria: any + :raises NotLocalTxError: If criteria is string, transaction hash does not exist in backend + :rtype: tuple + :returns: type string identifier, value + """ + boundary = None + + if criteria == None: + return None + + if isinstance(criteria, str): + q = session.query(Otx) + q = q.filter(Otx.tx_hash==strip_0x(criteria)) + r = q.first() + if r == None: + raise NotLocalTxError('unknown tx hash as bound criteria specified: {}'.format(criteria)) + boundary = ('id', r.id,) + elif isinstance(criteria, int): + boundary = ('id', criteria,) + elif isinstance(criteria, datetime.datetime): + boundary = ('date', criteria,) + + return boundary + + +def get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None, since=None, until=None, status=None, not_status=None, status_target=None, session=None): """Returns all local queue transactions for a given Ethereum address + The since parameter effect depends on its type. Results are returned inclusive of the given parameter condition. + + * str - transaction hash; all transactions added after the given hash + * int - all transactions after the given db insert id + * datetime.datetime - all transactions added since the given datetime + + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param address: Ethereum address :type address: str, 0x-hex :param as_sender: If False, will omit transactions where address is sender :type as_sender: bool - :param as_sender: If False, will omit transactions where address is recipient - :type as_sender: bool + :param as_recipient: If False, will omit transactions where address is recipient + :type as_recipient: bool :param counterpart: Only return transactions where this Ethereum address is the other end of the transaction (not in use) :type counterpart: str, 0x-hex + :param since: Only include transactions submitted before this datetime + :type since: datetime + :param until: Only include transactions submitted before this datetime + :type until: datetime + :param status: Only include transactions where the given status bits are set + :type status: chainqueue.enum.StatusEnum + :param not_status: Only include transactions where the given status bits are not set + :type not_status: chainqueue.enum.StatusEnum + :param status_target: Only include transaction where the status argument is exact match + :type status_target: chainqueue.enum.StatusEnum + :param session: Backend state integrity session + :type session: varies :raises ValueError: If address is set to be neither sender nor recipient :returns: Transactions :rtype: dict, with transaction hash as key, signed raw transaction as value @@ -319,6 +413,15 @@ def get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, count txs = {} session = SessionBase.bind_session(session) + address = add_0x(hex_uniform(strip_0x(address))) + + try: + filter_offset = sql_range_filter(session, criteria=since) + filter_limit = sql_range_filter(session, criteria=until) + except NotLocalTxError as e: + logg.error('query build failed: {}'.format(e)) + return {} + q = session.query(Otx) q = q.join(TxCache) if as_sender and as_recipient: @@ -327,7 +430,28 @@ def get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, count q = q.filter(TxCache.sender==address) else: q = q.filter(TxCache.recipient==address) - q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) + + if filter_offset != None: + if filter_offset[0] == 'id': + q = q.filter(Otx.id>=filter_offset[1]) + elif filter_offset[0] == 'date': + q = q.filter(Otx.date_created>=filter_offset[1]) + + if filter_limit != None: + if filter_limit[0] == 'id': + q = q.filter(Otx.id<=filter_limit[1]) + elif filter_limit[0] == 'date': + q = q.filter(Otx.date_created<=filter_limit[1]) + + if status != None: + if status_target == None: + status_target = status + q = q.filter(Otx.status.op('&')(status)==status_target) + + if not_status != None: + q = q.filter(Otx.status.op('&')(not_status)==0) + + q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) results = q.all() for r in results: @@ -342,6 +466,21 @@ def get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, count def count_tx(chain_spec, address=None, status=None, status_target=None, session=None): + """ + + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec + :param address: Address to count transactions for + :type address: str + :param status: Status to count transactions for + :type status: chainqueue.enum.StatusEnum + :param status_target: If set, will match status argument exactly against the given value + :type status_target: chainqueue.enum.StatusEnum + :param session: Backend state integrity session + :type session: varies + :rtype: int + :returns: Transaction count + """ session = SessionBase.bind_session(session) q = session.query(Otx.id) q = q.join(TxCache) diff --git a/chainqueue/sql/state.py b/chainqueue/sql/state.py index c0646df..8743854 100644 --- a/chainqueue/sql/state.py +++ b/chainqueue/sql/state.py @@ -25,10 +25,14 @@ logg = logging.getLogger().getChild(__name__) def set_sent(chain_spec, tx_hash, fail=False, session=None): """Used to set the status after a send attempt + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex :param fail: if True, will set a SENDFAIL status, otherwise a SENT status. (Default: False) :type fail: boolean + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. :returns: True if tx is known, False otherwise :rtype: boolean @@ -63,13 +67,21 @@ def set_sent(chain_spec, tx_hash, fail=False, session=None): def set_final(chain_spec, tx_hash, block=None, tx_index=None, fail=False, session=None): """Used to set the status of an incoming transaction result. + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex :param block: Block number if final status represents a confirmation on the network :type block: number + :param tx_index: Transaction index if final status represents a confirmation on the network + :type tx_index: number :param fail: if True, will set a SUCCESS status, otherwise a REVERTED status. (Default: False) :type fail: boolean + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. + :rtype: str + :returns: Transaction hash, in hex """ session = SessionBase.bind_session(session) o = Otx.load(tx_hash, session=session) @@ -112,11 +124,17 @@ def set_cancel(chain_spec, tx_hash, manual=False, session=None): Will set the state to CANCELLED or OVERRIDDEN + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex :param manual: If set, status will be OVERRIDDEN. Otherwise CANCELLED. :type manual: boolean + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. + :rtype: str + :returns: Transaction hash, in hex """ session = SessionBase.bind_session(session) o = Otx.load(tx_hash, session=session) @@ -146,9 +164,15 @@ def set_rejected(chain_spec, tx_hash, session=None): Will set the state to REJECTED + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. + :rtype: str + :returns: Transaction hash, in hex """ session = SessionBase.bind_session(session) @@ -171,9 +195,15 @@ def set_fubar(chain_spec, tx_hash, session=None): Will set the state to FUBAR + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. + :rtype: str + :returns: Transaction hash, in hex """ session = SessionBase.bind_session(session) @@ -196,9 +226,15 @@ def set_manual(chain_spec, tx_hash, session=None): Will set the state to MANUAL + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. + :rtype: str + :returns: Transaction hash, in hex """ session = SessionBase.bind_session(session) @@ -219,9 +255,15 @@ def set_manual(chain_spec, tx_hash, session=None): def set_ready(chain_spec, tx_hash, session=None): """Used to mark a transaction as ready to be sent to network + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. + :rtype: str + :returns: Transaction hash, in hex """ session = SessionBase.bind_session(session) o = Otx.load(tx_hash, session=session) @@ -242,6 +284,17 @@ def set_ready(chain_spec, tx_hash, session=None): def set_reserved(chain_spec, tx_hash, session=None): + """Used to mark a transaction as reserved by a worker for processing. + + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies + :raises NotLocalTxError: If transaction not found in queue. + """ + session = SessionBase.bind_session(session) o = Otx.load(tx_hash, session=session) if o == None: @@ -263,9 +316,15 @@ def set_waitforgas(chain_spec, tx_hash, session=None): Will set the state to WAITFORGAS + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec :param tx_hash: Transaction hash of record to modify :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies :raises NotLocalTxError: If transaction not found in queue. + :rtype: str + :returns: Transaction hash, in hex """ session = SessionBase.bind_session(session) o = Otx.load(tx_hash, session=session) @@ -283,7 +342,18 @@ def set_waitforgas(chain_spec, tx_hash, session=None): def get_state_log(chain_spec, tx_hash, session=None): + """If state log is activated, retrieves all state log changes for the given transaction. + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :param session: Backend state integrity session + :type session: varies + :raises NotLocalTxError: If transaction not found in queue. + :rtype: list + :returns: Log items + """ logs = [] session = SessionBase.bind_session(session) @@ -302,6 +372,20 @@ def get_state_log(chain_spec, tx_hash, session=None): def obsolete_by_cache(chain_spec, tx_hash, final, session=None): + """Explicitly obsolete single transaction by transaction with same nonce. + + :param chain_spec: Chain spec for transaction network + :type chain_spec: chainlib.chain.ChainSpec + :param tx_hash: Transaction hash of record to modify, in hex + :type tx_hash: str + :param final: Transaction hash superseding record, in hex + :type tx_hash: str + :param session: Backend state integrity session + :type session: varies + :raises TxStateChangeError: Transaction is not obsoletable + :rtype: str + :returns: Transaction hash, in hex + """ session = SessionBase.bind_session(session) q = session.query( diff --git a/chainqueue/sql/tx.py b/chainqueue/sql/tx.py index 22a8716..8cd73c1 100644 --- a/chainqueue/sql/tx.py +++ b/chainqueue/sql/tx.py @@ -1,5 +1,13 @@ # standard imports import logging +import copy + +# external imports +from hexathon import ( + uniform as hex_uniform, + add_0x, + strip_0x, + ) # local imports from chainqueue.db.models.otx import Otx @@ -16,6 +24,8 @@ logg = logging.getLogger().getChild(__name__) def create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None): """Create a new transaction queue record. + :param chain_spec: Chain spec of transaction network + :type chain_spec: chainlib.chain.ChainSpec :param nonce: Transaction nonce :type nonce: int :param holder_address: Sender address @@ -24,13 +34,18 @@ def create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_prede :type tx_hash: str, 0x-hex :param signed_tx: Signed raw transaction :type signed_tx: str, 0x-hex - :param chain_spec: Chain spec to create transaction for - :type chain_spec: ChainSpec - :returns: transaction hash - :rtype: str, 0x-hash + :param obsolete_predecessors: If true will mark all other transactions with the same nonce as obsolete (should not be retried) + :type obsolete_predecessors: bool + :param session: Backend state integrity session + :type session: varies + :returns: transaction hash, in hex + :rtype: str """ session = SessionBase.bind_session(session) + holder_address = holder_address.lower() + tx_hash = tx_hash.lower() + signed_tx = signed_tx.lower() o = Otx.add( nonce=nonce, tx_hash=tx_hash, @@ -65,3 +80,46 @@ def create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_prede SessionBase.release_session(session) logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash)) return tx_hash + + +def cache_tx_dict(tx_dict, session=None): + """Add a transaction cache entry to backend. + + :param tx_dict: Transaction cache details + :type tx_dict: dict + :param session: Backend state integrity session + :type session: varies + :rtype: tuple + :returns: original transaction, backend insertion id + """ + session = SessionBase.bind_session(session) + + ntx = copy.copy(tx_dict) + for k in [ + 'hash', + 'from', + 'to', + 'source_token', + 'destination_token', + ]: + ntx[k] = add_0x(hex_uniform(strip_0x(ntx[k]))) + + txc = TxCache( + ntx['hash'], + ntx['from'], + ntx['to'], + ntx['source_token'], + ntx['destination_token'], + ntx['from_value'], + ntx['to_value'], + session=session + ) + + session.add(txc) + session.commit() + + insert_id = txc.id + + SessionBase.release_session(session) + + return (ntx, insert_id) diff --git a/chainqueue/unittest/db.py b/chainqueue/unittest/db.py new file mode 100644 index 0000000..f89033f --- /dev/null +++ b/chainqueue/unittest/db.py @@ -0,0 +1,55 @@ +# standard imports +import logging +import os + +# external imports +import alembic +import alembic.config + +# local imports +from chainqueue.db.models.base import SessionBase +from chainqueue.db import dsn_from_config + +logg = logging.getLogger(__name__) + + +db_config = { + 'DATABASE_ENGINE': 'sqlite', + 'DATABASE_DRIVER': 'pysqlite', + 'DATABASE_NAME': 'chainqueue.sqlite', + } + + +class ChainQueueDb: + + base = SessionBase + + def __init__(self, debug=False): + logg.debug('config {}'.format(db_config)) + self.dsn = dsn_from_config(db_config) + + self.base.poolable = False + self.base.transactional = False + self.base.procedural = False + self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0 + + rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..') + dbdir = os.path.join(rootdir, 'chainqueue', 'db') + #migrationsdir = os.path.join(dbdir, 'migrations', db_config.get('DATABASE_ENGINE')) + migrationsdir = os.path.join(dbdir, 'migrations', 'default') + logg.info('using migrations directory {}'.format(migrationsdir)) + + ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini')) + ac.set_main_option('sqlalchemy.url', self.dsn) + ac.set_main_option('script_location', migrationsdir) + + alembic.command.downgrade(ac, 'base') + alembic.command.upgrade(ac, 'head') + + + def bind_session(self, session=None): + return self.base.bind_session(session) + + + def release_session(self, session=None): + return self.base.release_session(session) diff --git a/requirements.txt b/requirements.txt index 004049d..2ffa881 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ pysha3==1.0.2 -hexathon~=0.0.1a7 +hexathon~=0.0.1a8 +leveldir~=0.0.2 alembic==1.4.2 SQLAlchemy==1.3.20 -confini~=0.3.6rc3 +confini>=0.4.1a1,<0.5.0 pyxdg~=0.27 +chainlib~=0.0.9a2 diff --git a/run_tests.sh b/run_tests.sh new file mode 100644 index 0000000..60cb203 --- /dev/null +++ b/run_tests.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +set +e +set +x +export PYTHONPATH=. +for f in `ls tests/*.py`; do + python $f + if [ $? -gt 0 ]; then + exit 1 + fi +done +set -x +set -e diff --git a/setup.cfg b/setup.cfg index 040a37d..060e0c6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainqueue -version = 0.0.2b4 +version = 0.0.4a3 description = Generic blockchain transaction queue control author = Louis Holbrook author_email = dev@holbrook.no @@ -26,14 +26,16 @@ licence_files = [options] python_requires = >= 3.6 +include_package_data = True packages = chainqueue chainqueue.db chainqueue.db.models - chainqueue.db.migrations chainqueue.sql chainqueue.adapters + chainqueue.unittest + chainqueue.runnable -#[options.entry_points] -#console_scripts = -# chainqueue-eth-server = chainqueue.runnable.server_eth:main +[options.entry_points] +console_scripts = + chainqueue-list = chainqueue.runnable.list:main diff --git a/tests/base.py b/tests/base.py deleted file mode 100644 index b1822c7..0000000 --- a/tests/base.py +++ /dev/null @@ -1,112 +0,0 @@ -# standard imports -import logging -import unittest -import tempfile -import os -#import pysqlite - -# external imports -from chainqueue.db.models.otx import Otx -from chainqueue.db.models.tx import TxCache -from chainlib.chain import ChainSpec -import alembic -import alembic.config -from hexathon import ( - add_0x, - strip_0x, - ) - -# local imports -from chainqueue.db import dsn_from_config -from chainqueue.db.models.base import SessionBase -from chainqueue.sql.tx import create - -script_dir = os.path.realpath(os.path.dirname(__file__)) - -#logg = logging.getLogger().getChild(__name__) -logg = logging.getLogger() - - -class TestBase(unittest.TestCase): - - def setUp(self): - rootdir = os.path.dirname(os.path.dirname(__file__)) - dbdir = os.path.join(rootdir, 'chainqueue', 'db') - #migrationsdir = os.path.join(dbdir, 'migrations', load_config.get('DATABASE_ENGINE')) - #if not os.path.isdir(migrationsdir): - migrationsdir = os.path.join(dbdir, 'migrations', 'default') - logg.info('using migrations directory {}'.format(migrationsdir)) - - config = { - 'DATABASE_ENGINE': 'sqlite', - 'DATABASE_DRIVER': 'pysqlite', - 'DATABASE_NAME': 'chainqueue.sqlite', - } - logg.debug('config {}'.format(config)) - - dsn = dsn_from_config(config) - SessionBase.poolable = False - SessionBase.transactional = False - SessionBase.procedural = False - SessionBase.connect(dsn, debug=bool(os.environ.get('DATABASE_DEBUG'))) # TODO: evaluates to "true" even if string is 0 - - ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini')) - ac.set_main_option('sqlalchemy.url', dsn) - ac.set_main_option('script_location', migrationsdir) - - alembic.command.downgrade(ac, 'base') - alembic.command.upgrade(ac, 'head') - - self.session = SessionBase.create_session() - - self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar') - - - def tearDown(self): - self.session.commit() - self.session.close() - - -class TestOtxBase(TestBase): - - def setUp(self): - super(TestOtxBase, self).setUp() - self.tx_hash = os.urandom(32).hex() - self.tx = os.urandom(128).hex() - self.nonce = 42 - self.alice = add_0x(os.urandom(20).hex()) - - tx_hash = create(self.chain_spec, self.nonce, self.alice, self.tx_hash, self.tx, session=self.session) - self.assertEqual(tx_hash, self.tx_hash) - self.session.commit() - - logg.info('using tx hash {}'.format(self.tx_hash)) - - -class TestTxBase(TestOtxBase): - - def setUp(self): - super(TestTxBase, self).setUp() - self.bob = add_0x(os.urandom(20).hex()) - self.carol = add_0x(os.urandom(20).hex()) - self.foo_token = add_0x(os.urandom(20).hex()) - self.bar_token = add_0x(os.urandom(20).hex()) - self.from_value = 42 - self.to_value = 13 - - txc = TxCache( - self.tx_hash, - self.alice, - self.bob, - self.foo_token, - self.bar_token, - self.from_value, - self.to_value, - session=self.session, - ) - self.session.add(txc) - self.session.commit() - - otx = Otx.load(self.tx_hash) - self.assertEqual(txc.otx_id, otx.id) - diff --git a/tests/chainqueue_base.py b/tests/chainqueue_base.py new file mode 100644 index 0000000..00129e5 --- /dev/null +++ b/tests/chainqueue_base.py @@ -0,0 +1,85 @@ +# standard imports +import logging +import unittest +import tempfile +import os +#import pysqlite + +# external imports +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache +from chainlib.chain import ChainSpec +from hexathon import ( + add_0x, + strip_0x, + ) + +# local imports +from chainqueue.sql.tx import create +from chainqueue.unittest.db import ChainQueueDb +from chainqueue.sql.backend import SQLBackend + +script_dir = os.path.realpath(os.path.dirname(__file__)) + +logg = logging.getLogger(__name__) + + +class TestBase(unittest.TestCase): + + def setUp(self): + debug = bool(os.environ.get('DATABASE_DEBUG', False)) + self.db = ChainQueueDb(debug=debug) + self.session = self.db.bind_session() + self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar') + + + def tearDown(self): + self.session.commit() + self.db.release_session(self.session) + + +class TestOtxBase(TestBase): + + def setUp(self): + super(TestOtxBase, self).setUp() + self.tx_hash = os.urandom(32).hex() + self.tx = os.urandom(128).hex() + self.nonce = 42 + self.alice = add_0x(os.urandom(20).hex()) + + tx_hash = create(self.chain_spec, self.nonce, self.alice, self.tx_hash, self.tx, session=self.session) + self.assertEqual(tx_hash, self.tx_hash) + self.session.commit() + + logg.info('using tx hash {}'.format(self.tx_hash)) + + +class TestTxBase(TestOtxBase): + + def setUp(self): + super(TestTxBase, self).setUp() + self.bob = add_0x(os.urandom(20).hex()) + self.carol = add_0x(os.urandom(20).hex()) + self.foo_token = add_0x(os.urandom(20).hex()) + self.bar_token = add_0x(os.urandom(20).hex()) + self.from_value = 42 + self.to_value = 13 + + backend = SQLBackend(self.db.dsn) + tx = { + 'hash': self.tx_hash, + 'from': self.alice, + 'to': self.bob, + 'source_token': self.foo_token, + 'destination_token': self.bar_token, + 'from_value': self.from_value, + 'to_value': self.to_value, + } + backend.cache(tx, session=self.session) + self.session.commit() + + otx = Otx.load(self.tx_hash) + txc = TxCache.load(self.tx_hash) + + self.assertEqual(txc.otx_id, otx.id) + diff --git a/tests/test_basic.py b/tests/test_basic.py index 9aca07f..ddc9427 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -14,7 +14,7 @@ from chainqueue.db.models.otx import Otx from chainqueue.db.models.tx import TxCache # test imports -from tests.base import TestBase +from tests.chainqueue_base import TestBase logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() diff --git a/tests/test_fs.py b/tests/test_fs.py index b7724e4..c2704e3 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -5,9 +5,11 @@ import shutil import logging import os +# external imports +from leveldir.hex import HexDir + # local imports from chainqueue.fs.queue import FsQueue -from chainqueue.fs.dir import HexDir from chainqueue.enum import StatusBits logging.basicConfig(level=logging.DEBUG) diff --git a/tests/test_fs_entry.py b/tests/test_fs_entry.py index 5adcd37..d4145b7 100644 --- a/tests/test_fs_entry.py +++ b/tests/test_fs_entry.py @@ -5,9 +5,11 @@ import shutil import logging import os +# external imports +from leveldir.hex import HexDir + # local imports from chainqueue.fs.queue import FsQueue -from chainqueue.fs.dir import HexDir from chainqueue.fs.entry import Entry from chainqueue.enum import StatusBits @@ -30,9 +32,9 @@ class FsQueueEntryTest(unittest.TestCase): def test_entry(self): - tx_hash = os.urandom(32) - tx_content = os.urandom(128) - Entry(tx_hash, tx_content) + tx_hash = os.urandom(32).hex() + tx_content = os.urandom(128).hex() + Entry(0, tx_hash, tx_content) if __name__ == '__main__': diff --git a/tests/test_helo.py b/tests/test_helo.py new file mode 100644 index 0000000..e32f613 --- /dev/null +++ b/tests/test_helo.py @@ -0,0 +1,15 @@ +# standard imports +import unittest + +# local imports +from tests.chainqueue_base import TestBase + + +class TestHelo(TestBase): + + def test_helo(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_hexdir.py b/tests/test_hexdir.py deleted file mode 100644 index c0402e0..0000000 --- a/tests/test_hexdir.py +++ /dev/null @@ -1,84 +0,0 @@ -# standard imports -import unittest -import tempfile -import shutil -import logging -import os - -# local imports -from chainqueue.fs.dir import HexDir - -logging.basicConfig(level=logging.DEBUG) -logg = logging.getLogger() - - -class HexDirTest(unittest.TestCase): - - def setUp(self): - self.dir = tempfile.mkdtemp() - self.hexdir = HexDir(os.path.join(self.dir, 'q'), 4, 3, 2) - logg.debug('setup hexdir root {}'.format(self.dir)) - - - def tearDown(self): - shutil.rmtree(self.dir) - logg.debug('cleaned hexdir root {}'.format(self.dir)) - - - def test_path(self): - content = b'cdef' - prefix = b'ab' - label = b'\xde\xad\xbe\xef' - self.hexdir.add(label, content, prefix=prefix) - file_path = os.path.join(self.dir, 'q', 'DE', 'AD', 'BE', label.hex().upper()) - - f = open(file_path, 'rb') - r = f.read() - f.close() - self.assertEqual(content, r) - - f = open(self.hexdir.master_file, 'rb') - r = f.read() - f.close() - self.assertEqual(prefix + label, r) - - - def test_size(self): - content = b'cdef' - prefix = b'ab' - label = b'\xde\xad\xbe\xef' - with self.assertRaises(ValueError): - self.hexdir.add(label, content, prefix=b'a') - - - def test_index(self): - self.hexdir.add(b'\xde\xad\xbe\xef', b'foo', b'ab') - self.hexdir.add(b'\xbe\xef\xfe\xed', b'bar', b'cd') - c = self.hexdir.add(b'\x01\x02\x03\x04', b'baz', b'ef') - self.assertEqual(c, 2) - - - def test_edit(self): - self.hexdir.add(b'\xde\xad\xbe\xef', b'foo', b'ab') - self.hexdir.add(b'\xbe\xef\xfe\xed', b'bar', b'cd') - self.hexdir.add(b'\x01\x02\x03\x04', b'baz', b'ef') - self.hexdir.set_prefix(1, b'ff') - - f = open(self.hexdir.master_file, 'rb') - f.seek(6) - r = f.read(2) - f.close() - self.assertEqual(b'ff', r) - - - def test_get(self): - self.hexdir.add(b'\xde\xad\xbe\xef', b'foo', b'ab') - self.hexdir.add(b'\xbe\xef\xfe\xed', b'bar', b'cd') - self.hexdir.add(b'\x01\x02\x03\x04', b'baz', b'ef') - (prefix, key) = self.hexdir.get(1) - self.assertEqual(b'\xbe\xef\xfe\xed', key) - self.assertEqual(b'cd', prefix) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_otx.py b/tests/test_otx.py index 53198e0..d907cc8 100644 --- a/tests/test_otx.py +++ b/tests/test_otx.py @@ -15,7 +15,7 @@ from chainqueue.db.enum import ( from chainqueue.sql.state import * # test imports -from tests.base import TestOtxBase +from tests.chainqueue_base import TestOtxBase logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() diff --git a/tests/test_otx_status_log.py b/tests/test_otx_status_log.py index be01d38..9208cb6 100644 --- a/tests/test_otx_status_log.py +++ b/tests/test_otx_status_log.py @@ -6,7 +6,7 @@ from chainqueue.db.models.otx import Otx from chainqueue.sql.state import * # test imports -from tests.base import TestOtxBase +from tests.chainqueue_base import TestOtxBase class TestOtxState(TestOtxBase): diff --git a/tests/test_query.py b/tests/test_query.py index 74d4c4d..eb8896c 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -22,7 +22,7 @@ from chainqueue.sql.state import ( from chainqueue.enum import StatusBits # test imports -from tests.base import TestTxBase +from tests.chainqueue_base import TestTxBase logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() @@ -30,21 +30,207 @@ logg = logging.getLogger() class TestTxQuery(TestTxBase): - def test_get_tx(self): - tx = get_tx(self.chain_spec, self.tx_hash) - expected_keys = [ - 'otx_id', - 'status', - 'signed_tx', - 'nonce', - ] - for k in tx.keys(): - expected_keys.remove(k) +# def test_get_tx(self): +# tx = get_tx(self.chain_spec, self.tx_hash) +# expected_keys = [ +# 'otx_id', +# 'status', +# 'signed_tx', +# 'nonce', +# ] +# for k in tx.keys(): +# expected_keys.remove(k) +# +# self.assertEqual(len(expected_keys), 0) +# +# +# def test_nonce_tx(self): +# +# nonce_hashes = [self.tx_hash] +# tx_hash = add_0x(os.urandom(32).hex()) +# signed_tx = add_0x(os.urandom(128).hex()) +# create( +# self.chain_spec, +# 42, +# self.alice, +# tx_hash, +# signed_tx, +# session=self.session, +# ) +# txc = TxCache( +# tx_hash, +# self.alice, +# self.bob, +# self.foo_token, +# self.bar_token, +# self.from_value, +# self.to_value, +# session=self.session, +# ) +# self.session.add(txc) +# self.session.commit() +# +# nonce_hashes.append(tx_hash) +# +# tx_hash = add_0x(os.urandom(32).hex()) +# signed_tx = add_0x(os.urandom(128).hex()) +# create( +# self.chain_spec, +# 41, +# self.alice, +# tx_hash, +# signed_tx, +# session=self.session, +# ) +# txc = TxCache( +# tx_hash, +# self.alice, +# self.bob, +# self.foo_token, +# self.bar_token, +# self.from_value, +# self.to_value, +# session=self.session, +# ) +# self.session.add(txc) +# +# txs = get_nonce_tx_cache(self.chain_spec, 42, self.alice) +# self.assertEqual(len(txs.keys()), 2) +# +# for h in nonce_hashes: +# self.assertTrue(strip_0x(h) in txs) +# +# +# def test_paused_tx_cache(self): +# set_waitforgas(self.chain_spec, self.tx_hash) +# +# tx_hash = add_0x(os.urandom(32).hex()) +# signed_tx = add_0x(os.urandom(128).hex()) +# create( +# self.chain_spec, +# 43, +# self.alice, +# tx_hash, +# signed_tx, +# session=self.session, +# ) +# txc = TxCache( +# tx_hash, +# self.alice, +# self.bob, +# self.foo_token, +# self.bar_token, +# self.from_value, +# self.to_value, +# session=self.session, +# ) +# self.session.add(txc) +# self.session.commit() +# +# txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, sender=self.alice, session=self.session) +# self.assertEqual(len(txs.keys()), 1) +# +# txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session) +# self.assertEqual(len(txs.keys()), 1) +# +# tx_hash = add_0x(os.urandom(32).hex()) +# signed_tx = add_0x(os.urandom(128).hex()) +# create( +# self.chain_spec, +# 42, +# self.bob, +# tx_hash, +# signed_tx, +# session=self.session, +# ) +# txc = TxCache( +# tx_hash, +# self.bob, +# self.alice, +# self.bar_token, +# self.foo_token, +# self.to_value, +# self.from_value, +# session=self.session, +# ) +# self.session.add(txc) +# self.session.commit() +# +# txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session) +# self.assertEqual(len(txs.keys()), 1) +# +# set_waitforgas(self.chain_spec, tx_hash) +# self.session.commit() +# +# txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session) +# self.assertEqual(len(txs.keys()), 2) +# +# txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, sender=self.bob, session=self.session) +# self.assertEqual(len(txs.keys()), 1) +# +# +# def test_count(self): +# for i in range(3): +# tx_hash = add_0x(os.urandom(32).hex()) +# signed_tx = add_0x(os.urandom(128).hex()) +# create( +# self.chain_spec, +# i, +# self.alice, +# tx_hash, +# signed_tx, +# session=self.session, +# ) +# txc = TxCache( +# tx_hash, +# self.alice, +# self.bob, +# self.foo_token, +# self.bar_token, +# self.from_value, +# self.to_value, +# session=self.session, +# ) +# self.session.add(txc) +# set_ready(self.chain_spec, tx_hash, session=self.session) +# set_reserved(self.chain_spec, tx_hash, session=self.session) +# if i > 0: +# set_sent(self.chain_spec, tx_hash, session=self.session) +# if i == 2: +# set_final(self.chain_spec, tx_hash, session=self.session) +# +# tx_hash = add_0x(os.urandom(32).hex()) +# signed_tx = add_0x(os.urandom(128).hex()) +# create( +# self.chain_spec, +# i, +# self.bob, +# tx_hash, +# signed_tx, +# session=self.session, +# ) +# txc = TxCache( +# tx_hash, +# self.bob, +# self.carol, +# self.foo_token, +# self.bar_token, +# self.from_value, +# self.to_value, +# session=self.session, +# ) +# +# self.session.add(txc) +# set_ready(self.chain_spec, tx_hash, session=self.session) +# set_reserved(self.chain_spec, tx_hash, session=self.session) +# set_sent(self.chain_spec, tx_hash, session=self.session) +# self.session.commit() +# +# self.assertEqual(count_tx(self.chain_spec, status=StatusBits.IN_NETWORK | StatusBits.FINAL, status_target=StatusBits.IN_NETWORK), 2) +# self.assertEqual(count_tx(self.chain_spec, address=self.alice, status=StatusBits.IN_NETWORK | StatusBits.FINAL, status_target=StatusBits.IN_NETWORK), 1) +# - self.assertEqual(len(expected_keys), 0) - - - def test_nonce_tx(self): + def test_account_tx(self): nonce_hashes = [self.tx_hash] tx_hash = add_0x(os.urandom(32).hex()) @@ -72,6 +258,8 @@ class TestTxQuery(TestTxBase): nonce_hashes.append(tx_hash) + time_between = datetime.datetime.utcnow() + tx_hash = add_0x(os.urandom(32).hex()) signed_tx = add_0x(os.urandom(128).hex()) create( @@ -94,140 +282,68 @@ class TestTxQuery(TestTxBase): ) self.session.add(txc) - txs = get_nonce_tx_cache(self.chain_spec, 42, self.alice) - self.assertEqual(len(txs.keys()), 2) - - for h in nonce_hashes: - self.assertTrue(strip_0x(h) in txs) + nonce_hashes.append(tx_hash) - - def test_paused_tx_cache(self): - set_waitforgas(self.chain_spec, self.tx_hash) - - tx_hash = add_0x(os.urandom(32).hex()) - signed_tx = add_0x(os.urandom(128).hex()) - create( - self.chain_spec, - 43, - self.alice, - tx_hash, - signed_tx, - session=self.session, - ) - txc = TxCache( - tx_hash, - self.alice, - self.bob, - self.foo_token, - self.bar_token, - self.from_value, - self.to_value, - session=self.session, - ) - self.session.add(txc) - self.session.commit() - - txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, sender=self.alice, session=self.session) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session) + self.assertEqual(len(txs.keys()), 3) + + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=tx_hash) self.assertEqual(len(txs.keys()), 1) - txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=nonce_hashes[0]) + self.assertEqual(len(txs.keys()), 3) + + bogus_hash = add_0x(os.urandom(32).hex()) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=bogus_hash) + self.assertEqual(len(txs.keys()), 0) + + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=time_between) self.assertEqual(len(txs.keys()), 1) - tx_hash = add_0x(os.urandom(32).hex()) - signed_tx = add_0x(os.urandom(128).hex()) - create( - self.chain_spec, - 42, - self.bob, - tx_hash, - signed_tx, - session=self.session, - ) - txc = TxCache( - tx_hash, - self.bob, - self.alice, - self.bar_token, - self.foo_token, - self.to_value, - self.from_value, - session=self.session, - ) - self.session.add(txc) - self.session.commit() + time_before = time_between - datetime.timedelta(hours=1) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=time_before) + self.assertEqual(len(txs.keys()), 3) - txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session) - self.assertEqual(len(txs.keys()), 1) + time_after = datetime.datetime.utcnow() + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=time_after) + self.assertEqual(len(txs.keys()), 0) - set_waitforgas(self.chain_spec, tx_hash) - self.session.commit() - txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=2) self.assertEqual(len(txs.keys()), 2) - txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, sender=self.bob, session=self.session) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=1) + self.assertEqual(len(txs.keys()), 3) + + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=4) + self.assertEqual(len(txs.keys()), 0) + + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=1, until=2) + self.assertEqual(len(txs.keys()), 2) + + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=time_before, until=time_between) + self.assertEqual(len(txs.keys()), 2) + + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, session=self.session, since=nonce_hashes[0], until=nonce_hashes[1]) + self.assertEqual(len(txs.keys()), 2) + + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, status=StatusBits.QUEUED, session=self.session) + self.assertEqual(len(txs.keys()), 0) + + set_ready(self.chain_spec, nonce_hashes[1], session=self.session) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, status=StatusBits.QUEUED, session=self.session) self.assertEqual(len(txs.keys()), 1) + set_reserved(self.chain_spec, nonce_hashes[1], session=self.session) + set_sent(self.chain_spec, nonce_hashes[1], session=self.session) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, status=StatusBits.QUEUED, session=self.session) + self.assertEqual(len(txs.keys()), 0) - def test_count(self): - for i in range(3): - tx_hash = add_0x(os.urandom(32).hex()) - signed_tx = add_0x(os.urandom(128).hex()) - create( - self.chain_spec, - i, - self.alice, - tx_hash, - signed_tx, - session=self.session, - ) - txc = TxCache( - tx_hash, - self.alice, - self.bob, - self.foo_token, - self.bar_token, - self.from_value, - self.to_value, - session=self.session, - ) - self.session.add(txc) - set_ready(self.chain_spec, tx_hash, session=self.session) - set_reserved(self.chain_spec, tx_hash, session=self.session) - if i > 0: - set_sent(self.chain_spec, tx_hash, session=self.session) - if i == 2: - set_final(self.chain_spec, tx_hash, session=self.session) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, not_status=StatusBits.QUEUED, session=self.session) + self.assertEqual(len(txs.keys()), 3) - tx_hash = add_0x(os.urandom(32).hex()) - signed_tx = add_0x(os.urandom(128).hex()) - create( - self.chain_spec, - i, - self.bob, - tx_hash, - signed_tx, - session=self.session, - ) - txc = TxCache( - tx_hash, - self.bob, - self.carol, - self.foo_token, - self.bar_token, - self.from_value, - self.to_value, - session=self.session, - ) - - self.session.add(txc) - set_ready(self.chain_spec, tx_hash, session=self.session) - set_reserved(self.chain_spec, tx_hash, session=self.session) - set_sent(self.chain_spec, tx_hash, session=self.session) - self.session.commit() - - self.assertEqual(count_tx(self.chain_spec, status=StatusBits.IN_NETWORK | StatusBits.FINAL, status_target=StatusBits.IN_NETWORK), 2) - self.assertEqual(count_tx(self.chain_spec, address=self.alice, status=StatusBits.IN_NETWORK | StatusBits.FINAL, status_target=StatusBits.IN_NETWORK), 1) + txs = get_account_tx(self.chain_spec, self.alice, as_sender=True, as_recipient=False, not_status=StatusBits.QUEUED, status=StatusBits.IN_NETWORK, session=self.session) + self.assertEqual(len(txs.keys()), 1) if __name__ == '__main__': diff --git a/tests/test_tx_cache.py b/tests/test_tx_cache.py index e5dbb36..3135fef 100644 --- a/tests/test_tx_cache.py +++ b/tests/test_tx_cache.py @@ -11,7 +11,7 @@ from chainqueue.sql.state import * from chainqueue.sql.query import get_tx_cache # test imports -from tests.base import TestTxBase +from tests.chainqueue_base import TestTxBase class TestTxCache(TestTxBase):