Add remaning query tests
This commit is contained in:
parent
167ff0f5e4
commit
1e602e1352
@ -18,7 +18,7 @@ logg = logging.getLogger(__name__)
|
||||
|
||||
re_transactions_all_bloom = r'/tx/(\d+)?/?(\d+)/?'
|
||||
re_transactions_account_bloom = r'/tx/user/((0x)?[a-fA-F0-9]+)(/(\d+)(/(\d+))?)?/?'
|
||||
re_transactions_all_data = r'/txa/(\d+)/(\d+)/?'
|
||||
re_transactions_all_data = r'/txa/(\d+)?/?(\d+)/?'
|
||||
|
||||
DEFAULT_LIMIT = 100
|
||||
|
||||
|
@ -86,7 +86,7 @@ def test_query_regex(
|
||||
('alice', 2, None, []), # 420000 == list_defaults['block']
|
||||
],
|
||||
)
|
||||
def test_query_process_txs(
|
||||
def test_query_process_txs_account(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
@ -134,3 +134,97 @@ def test_query_process_txs(
|
||||
for (block, tx) in query_match:
|
||||
block = block.to_bytes(4, byteorder='big')
|
||||
assert block_filter.check(block)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'query_offset, query_limit, query_match',
|
||||
[
|
||||
(None, 2, [(420000, 13), (419999, 42)]),
|
||||
(0, 1, [(420000, 13)]),
|
||||
(1, 1, [(419999, 42)]),
|
||||
(2, 0, []),
|
||||
],
|
||||
)
|
||||
def test_query_process_txs_bloom(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
list_tokens,
|
||||
txs,
|
||||
zero_filter,
|
||||
query_offset,
|
||||
query_limit,
|
||||
query_match,
|
||||
):
|
||||
|
||||
path_info = '/tx'
|
||||
if query_offset != None:
|
||||
path_info += '/' + str(query_offset)
|
||||
if query_limit != None:
|
||||
if query_offset == None:
|
||||
path_info += '/0'
|
||||
path_info += '/' + str(query_limit)
|
||||
env = {
|
||||
'PATH_INFO': path_info,
|
||||
}
|
||||
logg.debug('using path {}'.format(path_info))
|
||||
r = process_transactions_all_bloom(init_database, env)
|
||||
assert r != None
|
||||
|
||||
o = json.loads(r[1])
|
||||
block_filter_data = base64.b64decode(o['block_filter'].encode('utf-8'))
|
||||
zero_filter_data = zero_filter.to_bytes()
|
||||
if len(query_match) == 0:
|
||||
assert block_filter_data == zero_filter_data
|
||||
return
|
||||
|
||||
assert block_filter_data != zero_filter_data
|
||||
block_filter = copy.copy(zero_filter)
|
||||
block_filter.merge(block_filter_data)
|
||||
block_filter_data = block_filter.to_bytes()
|
||||
assert block_filter_data != zero_filter_data
|
||||
|
||||
for (block, tx) in query_match:
|
||||
block = block.to_bytes(4, byteorder='big')
|
||||
assert block_filter.check(block)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'query_block_start, query_block_end, query_match_count',
|
||||
[
|
||||
(None, 42, 0),
|
||||
(420000, 420001, 1),
|
||||
(419999, 419999, 1), # matches are inclusive
|
||||
(419999, 420000, 2),
|
||||
(419999, 420001, 2),
|
||||
],
|
||||
)
|
||||
def test_query_process_txs_data(
|
||||
init_database,
|
||||
list_defaults,
|
||||
list_actors,
|
||||
list_tokens,
|
||||
txs,
|
||||
zero_filter,
|
||||
query_block_start,
|
||||
query_block_end,
|
||||
query_match_count,
|
||||
):
|
||||
|
||||
path_info = '/txa'
|
||||
if query_block_start != None:
|
||||
path_info += '/' + str(query_block_start)
|
||||
if query_block_end != None:
|
||||
if query_block_start == None:
|
||||
path_info += '/0'
|
||||
path_info += '/' + str(query_block_end)
|
||||
env = {
|
||||
'PATH_INFO': path_info,
|
||||
'HTTP_X_CIC_CACHE_MODE': 'all',
|
||||
}
|
||||
logg.debug('using path {}'.format(path_info))
|
||||
r = process_transactions_all_data(init_database, env)
|
||||
assert r != None
|
||||
|
||||
o = json.loads(r[1])
|
||||
assert len(o['data']) == query_match_count
|
||||
|
Loading…
Reference in New Issue
Block a user