Logs limit & log_index bug (#2073)

* Limiting number of logs

* Test for logs

* Fixing logs ordering and indexing

* Fixing sort

* unwrap -> expect

* Revert "unwrap -> expect"

This reverts commit e99e6e77f37692fe568448e768aa72775de8d0cd.
This commit is contained in:
Tomasz Drwięga 2016-09-14 12:02:30 +02:00 committed by Arkadiy Paronyan
parent 21cc368066
commit 9ed9857fba
10 changed files with 268 additions and 56 deletions

View File

@ -23,6 +23,7 @@ use header::*;
use super::extras::*;
use transaction::*;
use views::*;
use log_entry::{LogEntry, LocalizedLogEntry};
use receipt::Receipt;
use blooms::{Bloom, BloomGroup};
use blockchain::block_info::{BlockInfo, BlockLocation, BranchBecomingCanonChainData};
@ -127,6 +128,10 @@ pub trait BlockProvider {
/// Returns numbers of blocks containing given bloom.
fn blocks_with_bloom(&self, bloom: &H2048, from_block: BlockNumber, to_block: BlockNumber) -> Vec<BlockNumber>;
/// Returns logs matching given filter.
fn logs<F>(&self, mut blocks: Vec<BlockNumber>, matches: F, limit: Option<usize>) -> Vec<LocalizedLogEntry>
where F: Fn(&LogEntry) -> bool, Self: Sized;
}
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
@ -315,6 +320,51 @@ impl BlockProvider for BlockChain {
.map(|b| b as BlockNumber)
.collect()
}
fn logs<F>(&self, mut blocks: Vec<BlockNumber>, matches: F, limit: Option<usize>) -> Vec<LocalizedLogEntry>
where F: Fn(&LogEntry) -> bool, Self: Sized {
// sort in reverse order
blocks.sort_by(|a, b| b.cmp(a));
let mut log_index = 0;
let mut logs = blocks.into_iter()
.filter_map(|number| self.block_hash(number).map(|hash| (number, hash)))
.filter_map(|(number, hash)| self.block_receipts(&hash).map(|r| (number, hash, r.receipts)))
.filter_map(|(number, hash, receipts)| self.block_body(&hash).map(|ref b| (number, hash, receipts, BodyView::new(b).transaction_hashes())))
.flat_map(|(number, hash, mut receipts, hashes)| {
assert_eq!(receipts.len(), hashes.len());
log_index = receipts.iter().fold(0, |sum, receipt| sum + receipt.logs.len());
let receipts_len = receipts.len();
receipts.reverse();
receipts.into_iter()
.map(|receipt| receipt.logs)
.zip(hashes)
.enumerate()
.flat_map(move |(index, (mut logs, tx_hash))| {
let current_log_index = log_index;
log_index -= logs.len();
logs.reverse();
logs.into_iter()
.enumerate()
.map(move |(i, log)| LocalizedLogEntry {
entry: log,
block_hash: hash,
block_number: number,
transaction_hash: tx_hash,
// iterating in reverse order
transaction_index: receipts_len - index - 1,
log_index: current_log_index - i - 1,
})
})
})
.filter(|log_entry| matches(&log_entry.entry))
.take(limit.unwrap_or(::std::usize::MAX))
.collect::<Vec<LocalizedLogEntry>>();
logs.reverse();
logs
}
}
pub struct AncestryIter<'a> {
@ -1160,6 +1210,7 @@ mod tests {
use blockchain::extras::TransactionAddress;
use views::BlockView;
use transaction::{Transaction, Action};
use log_entry::{LogEntry, LocalizedLogEntry};
fn new_db(path: &str) -> Arc<Database> {
Arc::new(Database::open(&DatabaseConfig::with_columns(::db::NUM_COLUMNS), path).unwrap())
@ -1235,7 +1286,7 @@ mod tests {
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
let mut block_hashes = vec![genesis_hash.clone()];
let mut batch =db.transaction();
let mut batch = db.transaction();
for _ in 0..10 {
let block = canon_chain.generate(&mut finalizer).unwrap();
block_hashes.push(BlockView::new(&block).header_view().sha3());
@ -1612,13 +1663,134 @@ mod tests {
}
fn insert_block(db: &Arc<Database>, bc: &BlockChain, bytes: &[u8], receipts: Vec<Receipt>) -> ImportRoute {
let mut batch =db.transaction();
let mut batch = db.transaction();
let res = bc.insert_block(&mut batch, bytes, receipts);
db.write(batch).unwrap();
bc.commit();
res
}
#[test]
fn test_logs() {
// given
let mut canon_chain = ChainGenerator::default();
let mut finalizer = BlockFinalizer::default();
let genesis = canon_chain.generate(&mut finalizer).unwrap();
// just insert dummy transaction so that #transactions=#receipts
let t1 = Transaction {
nonce: 0.into(),
gas_price: 0.into(),
gas: 100_000.into(),
action: Action::Create,
value: 100.into(),
data: "601080600c6000396000f3006000355415600957005b60203560003555".from_hex().unwrap(),
}.sign(&"".sha3());
let t2 = Transaction {
nonce: 0.into(),
gas_price: 0.into(),
gas: 100_000.into(),
action: Action::Create,
value: 100.into(),
data: "601080600c6000396000f3006000355415600957005b60203560003555".from_hex().unwrap(),
}.sign(&"".sha3());
let t3 = Transaction {
nonce: 0.into(),
gas_price: 0.into(),
gas: 100_000.into(),
action: Action::Create,
value: 100.into(),
data: "601080600c6000396000f3006000355415600957005b60203560003555".from_hex().unwrap(),
}.sign(&"".sha3());
let tx_hash1 = t1.hash();
let tx_hash2 = t2.hash();
let tx_hash3 = t3.hash();
let b1 = canon_chain.with_transaction(t1).with_transaction(t2).generate(&mut finalizer).unwrap();
let b2 = canon_chain.with_transaction(t3).generate(&mut finalizer).unwrap();
let temp = RandomTempPath::new();
let db = new_db(temp.as_str());
let bc = BlockChain::new(Config::default(), &genesis, db.clone());
insert_block(&db, &bc, &b1, vec![Receipt {
state_root: H256::default(),
gas_used: 10_000.into(),
log_bloom: Default::default(),
logs: vec![
LogEntry { address: Default::default(), topics: vec![], data: vec![1], },
LogEntry { address: Default::default(), topics: vec![], data: vec![2], },
],
},
Receipt {
state_root: H256::default(),
gas_used: 10_000.into(),
log_bloom: Default::default(),
logs: vec![
LogEntry { address: Default::default(), topics: vec![], data: vec![3], },
],
}]);
insert_block(&db, &bc, &b2, vec![
Receipt {
state_root: H256::default(),
gas_used: 10_000.into(),
log_bloom: Default::default(),
logs: vec![
LogEntry { address: Default::default(), topics: vec![], data: vec![4], },
],
}
]);
// when
let block1 = BlockView::new(&b1);
let block2 = BlockView::new(&b2);
let logs1 = bc.logs(vec![1, 2], |_| true, None);
let logs2 = bc.logs(vec![1, 2], |_| true, Some(1));
// then
assert_eq!(logs1, vec![
LocalizedLogEntry {
entry: LogEntry { address: Default::default(), topics: vec![], data: vec![1] },
block_hash: block1.hash(),
block_number: block1.header().number(),
transaction_hash: tx_hash1.clone(),
transaction_index: 0,
log_index: 0,
},
LocalizedLogEntry {
entry: LogEntry { address: Default::default(), topics: vec![], data: vec![2] },
block_hash: block1.hash(),
block_number: block1.header().number(),
transaction_hash: tx_hash1.clone(),
transaction_index: 0,
log_index: 1,
},
LocalizedLogEntry {
entry: LogEntry { address: Default::default(), topics: vec![], data: vec![3] },
block_hash: block1.hash(),
block_number: block1.header().number(),
transaction_hash: tx_hash2.clone(),
transaction_index: 1,
log_index: 2,
},
LocalizedLogEntry {
entry: LogEntry { address: Default::default(), topics: vec![], data: vec![4] },
block_hash: block2.hash(),
block_number: block2.header().number(),
transaction_hash: tx_hash3.clone(),
transaction_index: 0,
log_index: 0,
}
]);
assert_eq!(logs2, vec![
LocalizedLogEntry {
entry: LogEntry { address: Default::default(), topics: vec![], data: vec![4] },
block_hash: block2.hash(),
block_number: block2.header().number(),
transaction_hash: tx_hash3.clone(),
transaction_index: 0,
log_index: 0,
}
]);
}
#[test]
fn test_bloom_filter_simple() {
// TODO: From here

View File

@ -957,10 +957,8 @@ impl BlockChainClient for Client {
}
}
fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry> {
// TODO: lock blockchain only once
let mut blocks = filter.bloom_possibilities().iter()
fn logs(&self, filter: Filter, limit: Option<usize>) -> Vec<LocalizedLogEntry> {
let blocks = filter.bloom_possibilities().iter()
.filter_map(|bloom| self.blocks_with_bloom(bloom, filter.from_block.clone(), filter.to_block.clone()))
.flat_map(|m| m)
// remove duplicate elements
@ -968,35 +966,7 @@ impl BlockChainClient for Client {
.into_iter()
.collect::<Vec<u64>>();
blocks.sort();
let chain = self.chain.read();
blocks.into_iter()
.filter_map(|number| chain.block_hash(number).map(|hash| (number, hash)))
.filter_map(|(number, hash)| chain.block_receipts(&hash).map(|r| (number, hash, r.receipts)))
.filter_map(|(number, hash, receipts)| chain.block_body(&hash).map(|ref b| (number, hash, receipts, BodyView::new(b).transaction_hashes())))
.flat_map(|(number, hash, receipts, hashes)| {
let mut log_index = 0;
receipts.into_iter()
.enumerate()
.flat_map(|(index, receipt)| {
log_index += receipt.logs.len();
receipt.logs.into_iter()
.enumerate()
.filter(|tuple| filter.matches(&tuple.1))
.map(|(i, log)| LocalizedLogEntry {
entry: log,
block_hash: hash.clone(),
block_number: number,
transaction_hash: hashes.get(index).cloned().unwrap_or_else(H256::default),
transaction_index: index,
log_index: log_index + i
})
.collect::<Vec<LocalizedLogEntry>>()
})
.collect::<Vec<LocalizedLogEntry>>()
})
.collect()
self.chain.read().logs(blocks, |entry| filter.matches(entry), limit)
}
fn filter_traces(&self, filter: TraceFilter) -> Option<Vec<LocalizedTrace>> {

View File

@ -390,8 +390,8 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}
fn logs(&self, _filter: Filter) -> Vec<LocalizedLogEntry> {
unimplemented!();
fn logs(&self, _filter: Filter, _limit: Option<usize>) -> Vec<LocalizedLogEntry> {
Vec::new()
}
fn last_hashes(&self) -> LastHashes {

View File

@ -156,7 +156,7 @@ pub trait BlockChainClient : Sync + Send {
fn blocks_with_bloom(&self, bloom: &H2048, from_block: BlockID, to_block: BlockID) -> Option<Vec<BlockNumber>>;
/// Returns logs matching given filter.
fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry>;
fn logs(&self, filter: Filter, limit: Option<usize>) -> Vec<LocalizedLogEntry>;
/// Makes a non-persistent transaction call.
fn call(&self, t: &SignedTransaction, block: BlockID, analytics: CallAnalytics) -> Result<Executed, CallError>;

View File

@ -19,6 +19,7 @@ use client::{BlockChainClient, MiningBlockChainClient, Client, ClientConfig, Blo
use ethereum;
use block::IsBlock;
use tests::helpers::*;
use types::filter::Filter;
use common::*;
use devtools::*;
use miner::Miner;
@ -131,6 +132,34 @@ fn returns_chain_info() {
assert_eq!(info.best_block_hash, block.header().hash());
}
#[test]
fn returns_logs() {
let dummy_block = get_good_dummy_block();
let client_result = get_test_client_with_blocks(vec![dummy_block.clone()]);
let client = client_result.reference();
let logs = client.logs(Filter {
from_block: BlockID::Earliest,
to_block: BlockID::Latest,
address: None,
topics: vec![],
}, None);
assert_eq!(logs.len(), 0);
}
#[test]
fn returns_logs_with_limit() {
let dummy_block = get_good_dummy_block();
let client_result = get_test_client_with_blocks(vec![dummy_block.clone()]);
let client = client_result.reference();
let logs = client.logs(Filter {
from_block: BlockID::Earliest,
to_block: BlockID::Latest,
address: None,
topics: vec![],
}, Some(2));
assert_eq!(logs.len(), 0);
}
#[test]
fn returns_block_body() {
let dummy_block = get_good_dummy_block();

View File

@ -241,6 +241,7 @@ mod tests {
use spec::*;
use transaction::*;
use tests::helpers::*;
use types::log_entry::{LogEntry, LocalizedLogEntry};
use rlp::View;
fn check_ok(result: Result<(), Error>) {
@ -333,6 +334,12 @@ mod tests {
fn block_receipts(&self, _hash: &H256) -> Option<BlockReceipts> {
unimplemented!()
}
fn logs<F>(&self, _blocks: Vec<BlockNumber>, _matches: F, _limit: Option<usize>) -> Vec<LocalizedLogEntry>
where F: Fn(&LogEntry) -> bool, Self: Sized {
unimplemented!()
}
}
fn basic_test(bytes: &[u8], engine: &Engine) -> Result<(), Error> {

View File

@ -28,7 +28,8 @@ pub fn expect_no_params(params: Params) -> Result<(), Error> {
}
}
fn params_len(params: &Params) -> usize {
/// Returns number of different parameters in given `Params` object.
pub fn params_len(params: &Params) -> usize {
match params {
&Params::Array(ref vec) => vec.len(),
_ => 0,

View File

@ -45,7 +45,7 @@ use v1::traits::Eth;
use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, Transaction, CallRequest, Index, Filter, Log, Receipt, H64 as RpcH64, H256 as RpcH256, H160 as RpcH160, U256 as RpcU256};
use v1::helpers::{CallRequest as CRequest, errors};
use v1::helpers::dispatch::{default_gas_price, dispatch_transaction};
use v1::helpers::params::{expect_no_params, from_params_default_second, from_params_default_third};
use v1::helpers::params::{expect_no_params, params_len, from_params_default_second, from_params_default_third};
/// Eth RPC options
pub struct EthClientOptions {
@ -498,22 +498,33 @@ impl<C, S: ?Sized, M, EM> Eth for EthClient<C, S, M, EM> where
fn logs(&self, params: Params) -> Result<Value, Error> {
try!(self.active());
from_params::<(Filter,)>(params)
.and_then(|(filter,)| {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();
let mut logs = take_weak!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
let params = match params_len(&params) {
1 => from_params::<(Filter, )>(params).map(|(filter, )| (filter, None)),
_ => from_params::<(Filter, usize)>(params).map(|(filter, val)| (filter, Some(val))),
};
params.and_then(|(filter, limit)| {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();
let mut logs = take_weak!(self.client).logs(filter.clone(), limit)
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
if include_pending {
let pending = pending_logs(&*take_weak!(self.miner), &filter);
logs.extend(pending);
}
if include_pending {
let pending = pending_logs(&*take_weak!(self.miner), &filter);
logs.extend(pending);
}
Ok(to_value(&logs))
})
let len = logs.len();
match limit {
Some(limit) if len >= limit => {
logs = logs.split_off(len - limit);
},
_ => {},
}
Ok(to_value(&logs))
})
}
fn work(&self, params: Params) -> Result<Value, Error> {

View File

@ -152,7 +152,7 @@ impl<C, M> EthFilter for EthFilterClient<C, M> where
filter.to_block = BlockID::Latest;
// retrieve logs in range from_block..min(BlockID::Latest..to_block)
let mut logs = client.logs(filter.clone())
let mut logs = client.logs(filter.clone(), None)
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
@ -194,7 +194,7 @@ impl<C, M> EthFilter for EthFilterClient<C, M> where
Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.clone().into();
let mut logs = take_weak!(self.client).logs(filter.clone())
let mut logs = take_weak!(self.client).logs(filter.clone(), None)
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();

View File

@ -149,6 +149,28 @@ fn rpc_eth_hashrate() {
assert_eq!(tester.io.handle_request_sync(request), Some(response.to_owned()));
}
#[test]
fn rpc_eth_logs() {
let tester = EthTester::default();
let request = r#"{"jsonrpc": "2.0", "method": "eth_getLogs", "params": [{}], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":[],"id":1}"#;
assert_eq!(tester.io.handle_request_sync(request), Some(response.to_owned()));
}
#[test]
fn rpc_eth_logs_with_limit() {
let tester = EthTester::default();
let request1 = r#"{"jsonrpc": "2.0", "method": "eth_getLogs", "params": [{}, 1], "id": 1}"#;
let request2 = r#"{"jsonrpc": "2.0", "method": "eth_getLogs", "params": [{}, 0], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":[],"id":1}"#;
assert_eq!(tester.io.handle_request_sync(request1), Some(response.to_owned()));
assert_eq!(tester.io.handle_request_sync(request2), Some(response.to_owned()));
}
#[test]
fn rpc_eth_submit_hashrate() {
let tester = EthTester::default();