101 lines
3.0 KiB
Python
101 lines
3.0 KiB
Python
# standard imports
|
|
import unittest
|
|
import logging
|
|
import os
|
|
|
|
# external imports
|
|
from potaahto.symbols import snake_and_camel
|
|
from hexathon import (
|
|
strip_0x,
|
|
)
|
|
from chainqueue.adapters.eth import EthAdapter
|
|
from chainqueue.unittest.db import (
|
|
db_config,
|
|
dsn_from_config,
|
|
)
|
|
from chainqueue.sql.backend import SQLBackend
|
|
from chainqueue.enum import is_alive
|
|
from chainqueue.sql.query import get_tx
|
|
from chainlib.eth.gas import (
|
|
RPCGasOracle,
|
|
Gas,
|
|
)
|
|
from chainlib.eth.nonce import (
|
|
RPCNonceOracle,
|
|
)
|
|
from chainlib.eth.tx import (
|
|
TxFormat,
|
|
raw,
|
|
unpack,
|
|
receipt,
|
|
Tx,
|
|
)
|
|
from chainlib.eth.block import (
|
|
block_by_hash,
|
|
Block,
|
|
)
|
|
from chainqueue.sql.state import (
|
|
set_sent,
|
|
set_reserved,
|
|
set_ready,
|
|
)
|
|
|
|
# local imports
|
|
from chaind_eth.filter import StateFilter
|
|
from chaind_eth.chain import EthChainInterface
|
|
|
|
# test imports
|
|
from tests.chaind_eth_base import TestBase
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
logg = logging.getLogger()
|
|
|
|
|
|
class TestFilter(TestBase):
|
|
|
|
def test_filter(self):
|
|
gas_oracle = RPCGasOracle(conn=self.rpc)
|
|
nonce_oracle = RPCNonceOracle(self.accounts[0], conn=self.rpc)
|
|
c = Gas(self.chain_spec, signer=self.signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
|
|
(tx_hash, tx_raw_rlp_signed) = c.create(self.accounts[0], self.accounts[1], 1024, tx_format=TxFormat.RLP_SIGNED)
|
|
o = raw(tx_raw_rlp_signed)
|
|
self.rpc.do(o)
|
|
|
|
#o = receipt(tx_hash)
|
|
o = self.interface.tx_receipt(tx_hash)
|
|
rcpt = self.rpc.do(o)
|
|
|
|
#o = block_by_hash(rcpt['block_hash'])
|
|
o = self.interface.block_by_number(rcpt['block_number'])
|
|
block_src = self.rpc.do(o)
|
|
#block = Block(block_src)
|
|
block = self.interface.block_from_src(block_src)
|
|
|
|
dsn = dsn_from_config(db_config)
|
|
backend = SQLBackend(dsn, debug=bool(os.environ.get('DATABASE_DEBUG')))
|
|
adapter = EthAdapter(backend)
|
|
|
|
tx_raw_rlp_signed_bytes = bytes.fromhex(strip_0x(tx_raw_rlp_signed))
|
|
adapter.add(tx_raw_rlp_signed_bytes, self.chain_spec, session=self.session_chainqueue)
|
|
|
|
set_ready(self.chain_spec, tx_hash, session=self.session_chainqueue)
|
|
set_reserved(self.chain_spec, tx_hash, session=self.session_chainqueue)
|
|
set_sent(self.chain_spec, tx_hash, session=self.session_chainqueue)
|
|
|
|
tx_src = unpack(tx_raw_rlp_signed_bytes, self.chain_spec)
|
|
tx_src = self.interface.src_normalize(tx_src)
|
|
tx = Tx(tx_src, block=block, rcpt=rcpt)
|
|
|
|
tx_repr = get_tx(self.chain_spec, tx_hash, session=self.session_chainqueue)
|
|
assert is_alive(tx_repr['status'])
|
|
|
|
fltr = StateFilter(self.chain_spec)
|
|
fltr.filter(self.rpc, block, tx, session=self.session_chainqueue)
|
|
|
|
tx_repr = get_tx(self.chain_spec, tx_hash, session=self.session_chainqueue)
|
|
assert not is_alive(tx_repr['status'])
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|