89 lines
2.8 KiB
Python
89 lines
2.8 KiB
Python
|
# standard imports
|
|||
|
import logging
|
|||
|
|
|||
|
# third-party imports
|
|||
|
import pytest
|
|||
|
from cic_registry import CICRegistry
|
|||
|
|
|||
|
# local imports
|
|||
|
from cic_eth.eth.token import TokenTxFactory, unpack_transfer, otx_cache_transfer
|
|||
|
from cic_eth.eth.util import unpack_signed_raw_tx
|
|||
|
from cic_eth.queue.tx import create as queue_create
|
|||
|
from cic_eth.db.models.otx import Otx
|
|||
|
from cic_eth.db.models.tx import TxCache
|
|||
|
|
|||
|
logg = logging.getLogger()
|
|||
|
|
|||
|
|
|||
|
def test_unpack_transfer(
|
|||
|
default_chain_spec,
|
|||
|
init_w3,
|
|||
|
init_rpc,
|
|||
|
cic_registry,
|
|||
|
bancor_tokens,
|
|||
|
bancor_registry,
|
|||
|
):
|
|||
|
|
|||
|
source_token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
|
|||
|
logg.debug('bancor tokens {} {}'.format(bancor_tokens, source_token))
|
|||
|
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
|||
|
transfer_tx = txf.transfer(
|
|||
|
source_token.address(),
|
|||
|
init_w3.eth.accounts[1],
|
|||
|
42,
|
|||
|
default_chain_spec,
|
|||
|
)
|
|||
|
s = init_w3.eth.sign_transaction(transfer_tx)
|
|||
|
s_bytes = bytes.fromhex(s['raw'][2:])
|
|||
|
tx_dict = unpack_signed_raw_tx(s_bytes, default_chain_spec.chain_id())
|
|||
|
assert tx_dict['from'] == init_w3.eth.accounts[0]
|
|||
|
assert tx_dict['to'] == bancor_tokens[0]
|
|||
|
assert tx_dict['value'] == 0
|
|||
|
|
|||
|
transfer_data = unpack_transfer(tx_dict['data'])
|
|||
|
|
|||
|
assert transfer_data['to'] == init_w3.eth.accounts[1]
|
|||
|
assert transfer_data['amount'] == 42
|
|||
|
|
|||
|
|
|||
|
def test_queue_cache_transfer(
|
|||
|
default_chain_spec,
|
|||
|
init_w3,
|
|||
|
init_rpc,
|
|||
|
init_database,
|
|||
|
cic_registry,
|
|||
|
bancor_tokens,
|
|||
|
bancor_registry,
|
|||
|
):
|
|||
|
|
|||
|
source_token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
|
|||
|
txf = TokenTxFactory(init_w3.eth.accounts[0], init_rpc)
|
|||
|
value = 42
|
|||
|
transfer_tx = txf.transfer(
|
|||
|
source_token.address(),
|
|||
|
init_w3.eth.accounts[1],
|
|||
|
value,
|
|||
|
default_chain_spec,
|
|||
|
)
|
|||
|
tx_signed = init_w3.eth.sign_transaction(transfer_tx)
|
|||
|
tx_hash = init_w3.eth.sendRawTransaction(tx_signed['raw'])
|
|||
|
tx_hash_hex = tx_hash.hex()
|
|||
|
nonce = int(tx_signed['nonce'][2:], 16)
|
|||
|
tx_hash_queue = queue_create(nonce, init_w3.eth.accounts[0], tx_hash_hex, tx_signed['raw'], str(default_chain_spec))
|
|||
|
tx_hash_cache = otx_cache_transfer(tx_hash_hex, tx_signed['raw'], str(default_chain_spec))
|
|||
|
|
|||
|
assert tx_hash_hex == tx_hash_queue
|
|||
|
assert tx_hash_hex == tx_hash_cache
|
|||
|
|
|||
|
session = Otx.create_session()
|
|||
|
otx = session.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
|
|||
|
assert otx.tx_hash == tx_hash_hex
|
|||
|
|
|||
|
txc = session.query(TxCache).filter(TxCache.otx_id==otx.id).first()
|
|||
|
|
|||
|
assert txc.sender == init_w3.eth.accounts[0]
|
|||
|
assert txc.recipient == init_w3.eth.accounts[1]
|
|||
|
assert txc.source_token_address == bancor_tokens[0]
|
|||
|
assert txc.destination_token_address == bancor_tokens[0]
|
|||
|
assert txc.values() == (value, value)
|