Fetching logs by hash in blockchain database (#8463)

* Fetch logs by hash in blockchain database

* Fix tests

* Add unit test for branch block logs fetching

* Add docs that blocks must already be sorted

* Handle branch block cases properly

* typo: empty -> is_empty

* Remove return_empty_if_none by using a closure

* Use BTreeSet to avoid sorting again

* Move is_canon to BlockChain

* typo: pass value by reference

* Use loop and wrap inside blocks to simplify the code

Borrowed from https://github.com/paritytech/parity/pull/8463#discussion_r183453326

* typo: missed a comment
This commit is contained in:
Wei Tang 2018-05-02 15:40:27 +08:00 committed by Marek Kotewicz
parent 8e8679807d
commit b10094508f
3 changed files with 129 additions and 24 deletions

View File

@ -57,6 +57,12 @@ pub trait BlockProvider {
/// (though not necessarily a part of the canon chain). /// (though not necessarily a part of the canon chain).
fn is_known(&self, hash: &H256) -> bool; fn is_known(&self, hash: &H256) -> bool;
/// Returns true if the given block is known and in the canon chain.
fn is_canon(&self, hash: &H256) -> bool {
let is_canon = || Some(hash == &self.block_hash(self.block_number(hash)?)?);
is_canon().unwrap_or(false)
}
/// Get the first block of the best part of the chain. /// Get the first block of the best part of the chain.
/// Return `None` if there is no gap and the first block is the genesis. /// Return `None` if there is no gap and the first block is the genesis.
/// Any queries of blocks which precede this one are not guaranteed to /// Any queries of blocks which precede this one are not guaranteed to
@ -148,7 +154,7 @@ pub trait BlockProvider {
fn blocks_with_bloom(&self, bloom: &Bloom, from_block: BlockNumber, to_block: BlockNumber) -> Vec<BlockNumber>; fn blocks_with_bloom(&self, bloom: &Bloom, from_block: BlockNumber, to_block: BlockNumber) -> Vec<BlockNumber>;
/// Returns logs matching given filter. /// Returns logs matching given filter.
fn logs<F>(&self, blocks: Vec<BlockNumber>, matches: F, limit: Option<usize>) -> Vec<LocalizedLogEntry> fn logs<F>(&self, blocks: Vec<H256>, matches: F, limit: Option<usize>) -> Vec<LocalizedLogEntry>
where F: Fn(&LogEntry) -> bool + Send + Sync, Self: Sized; where F: Fn(&LogEntry) -> bool + Send + Sync, Self: Sized;
} }
@ -332,16 +338,18 @@ impl BlockProvider for BlockChain {
.collect() .collect()
} }
fn logs<F>(&self, mut blocks: Vec<BlockNumber>, matches: F, limit: Option<usize>) -> Vec<LocalizedLogEntry> /// Returns logs matching given filter. The order of logs returned will be the same as the order of the blocks
/// provided. And it's the callers responsibility to sort blocks provided in advance.
fn logs<F>(&self, mut blocks: Vec<H256>, matches: F, limit: Option<usize>) -> Vec<LocalizedLogEntry>
where F: Fn(&LogEntry) -> bool + Send + Sync, Self: Sized { where F: Fn(&LogEntry) -> bool + Send + Sync, Self: Sized {
// sort in reverse order // sort in reverse order
blocks.sort_by(|a, b| b.cmp(a)); blocks.reverse();
let mut logs = blocks let mut logs = blocks
.chunks(128) .chunks(128)
.flat_map(move |blocks_chunk| { .flat_map(move |blocks_chunk| {
blocks_chunk.into_par_iter() blocks_chunk.into_par_iter()
.filter_map(|number| self.block_hash(*number).map(|hash| (*number, hash))) .filter_map(|hash| self.block_number(&hash).map(|r| (r, hash)))
.filter_map(|(number, hash)| self.block_receipts(&hash).map(|r| (number, hash, r.receipts))) .filter_map(|(number, hash)| self.block_receipts(&hash).map(|r| (number, hash, r.receipts)))
.filter_map(|(number, hash, receipts)| self.block_body(&hash).map(|ref b| (number, hash, receipts, b.transaction_hashes()))) .filter_map(|(number, hash, receipts)| self.block_body(&hash).map(|ref b| (number, hash, receipts, b.transaction_hashes())))
.flat_map(|(number, hash, mut receipts, mut hashes)| { .flat_map(|(number, hash, mut receipts, mut hashes)| {
@ -368,7 +376,7 @@ impl BlockProvider for BlockChain {
.enumerate() .enumerate()
.map(move |(i, log)| LocalizedLogEntry { .map(move |(i, log)| LocalizedLogEntry {
entry: log, entry: log,
block_hash: hash, block_hash: *hash,
block_number: number, block_number: number,
transaction_hash: tx_hash, transaction_hash: tx_hash,
// iterating in reverse order // iterating in reverse order
@ -1933,17 +1941,33 @@ mod tests {
value: 103.into(), value: 103.into(),
data: "601080600c6000396000f3006000355415600957005b60203560003555".from_hex().unwrap(), data: "601080600c6000396000f3006000355415600957005b60203560003555".from_hex().unwrap(),
}.sign(&secret(), None); }.sign(&secret(), None);
let t4 = Transaction {
nonce: 0.into(),
gas_price: 0.into(),
gas: 100_000.into(),
action: Action::Create,
value: 104.into(),
data: "601080600c6000396000f3006000355415600957005b60203560003555".from_hex().unwrap(),
}.sign(&secret(), None);
let tx_hash1 = t1.hash(); let tx_hash1 = t1.hash();
let tx_hash2 = t2.hash(); let tx_hash2 = t2.hash();
let tx_hash3 = t3.hash(); let tx_hash3 = t3.hash();
let tx_hash4 = t4.hash();
let genesis = BlockBuilder::genesis(); let genesis = BlockBuilder::genesis();
let b1 = genesis.add_block_with_transactions(vec![t1, t2]); let b1 = genesis.add_block_with_transactions(vec![t1, t2]);
let b2 = b1.add_block_with_transactions(iter::once(t3)); let b2 = b1.add_block_with_transactions(iter::once(t3));
let b3 = genesis.add_block_with(|| BlockOptions {
transactions: vec![t4.clone()],
difficulty: U256::from(9),
..Default::default()
}); // Branch block
let b1_hash = b1.last().hash(); let b1_hash = b1.last().hash();
let b1_number = b1.last().number(); let b1_number = b1.last().number();
let b2_hash = b2.last().hash(); let b2_hash = b2.last().hash();
let b2_number = b2.last().number(); let b2_number = b2.last().number();
let b3_hash = b3.last().hash();
let b3_number = b3.last().number();
let db = new_db(); let db = new_db();
let bc = new_chain(&genesis.last().encoded(), db.clone()); let bc = new_chain(&genesis.last().encoded(), db.clone());
@ -1974,10 +1998,21 @@ mod tests {
], ],
} }
]); ]);
insert_block(&db, &bc, &b3.last().encoded(), vec![
Receipt {
outcome: TransactionOutcome::StateRoot(H256::default()),
gas_used: 10_000.into(),
log_bloom: Default::default(),
logs: vec![
LogEntry { address: Default::default(), topics: vec![], data: vec![5], },
],
}
]);
// when // when
let logs1 = bc.logs(vec![1, 2], |_| true, None); let logs1 = bc.logs(vec![b1_hash, b2_hash], |_| true, None);
let logs2 = bc.logs(vec![1, 2], |_| true, Some(1)); let logs2 = bc.logs(vec![b1_hash, b2_hash], |_| true, Some(1));
let logs3 = bc.logs(vec![b3_hash], |_| true, None);
// then // then
assert_eq!(logs1, vec![ assert_eq!(logs1, vec![
@ -2029,6 +2064,17 @@ mod tests {
log_index: 0, log_index: 0,
} }
]); ]);
assert_eq!(logs3, vec![
LocalizedLogEntry {
entry: LogEntry { address: Default::default(), topics: vec![], data: vec![5] },
block_hash: b3_hash,
block_number: b3_number,
transaction_hash: tx_hash4,
transaction_index: 0,
transaction_log_index: 0,
log_index: 0,
}
]);
} }
#[test] #[test]

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, HashMap, BTreeMap, VecDeque}; use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque};
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
@ -1848,23 +1848,82 @@ impl BlockChainClient for Client {
} }
fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry> { fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry> {
let (from, to) = match (self.block_number_ref(&filter.from_block), self.block_number_ref(&filter.to_block)) { // Wrap the logic inside a closure so that we can take advantage of question mark syntax.
(Some(from), Some(to)) => (from, to), let fetch_logs = || {
_ => return Vec::new(), let chain = self.chain.read();
// First, check whether `filter.from_block` and `filter.to_block` is on the canon chain. If so, we can use the
// optimized version.
let is_canon = |id| {
match id {
// If it is referred by number, then it is always on the canon chain.
&BlockId::Earliest | &BlockId::Latest | &BlockId::Number(_) => true,
// If it is referred by hash, we see whether a hash -> number -> hash conversion gives us the same
// result.
&BlockId::Hash(ref hash) => chain.is_canon(hash),
}
}; };
let chain = self.chain.read(); let blocks = if is_canon(&filter.from_block) && is_canon(&filter.to_block) {
let blocks = filter.bloom_possibilities().iter() // If we are on the canon chain, use bloom filter to fetch required hashes.
.map(move |bloom| { let from = self.block_number_ref(&filter.from_block)?;
let to = self.block_number_ref(&filter.to_block)?;
filter.bloom_possibilities().iter()
.map(|bloom| {
chain.blocks_with_bloom(bloom, from, to) chain.blocks_with_bloom(bloom, from, to)
}) })
.flat_map(|m| m) .flat_map(|m| m)
// remove duplicate elements // remove duplicate elements
.collect::<HashSet<u64>>() .collect::<BTreeSet<u64>>()
.into_iter() .into_iter()
.collect::<Vec<u64>>(); .filter_map(|n| chain.block_hash(n))
.collect::<Vec<H256>>()
self.chain.read().logs(blocks, |entry| filter.matches(entry), filter.limit) } else {
// Otherwise, we use a slower version that finds a link between from_block and to_block.
let from_hash = Self::block_hash(&chain, filter.from_block)?;
let from_number = chain.block_number(&from_hash)?;
let to_hash = Self::block_hash(&chain, filter.from_block)?;
let blooms = filter.bloom_possibilities();
let bloom_match = |header: &encoded::Header| {
blooms.iter().any(|bloom| header.log_bloom().contains_bloom(bloom))
};
let (blocks, last_hash) = {
let mut blocks = Vec::new();
let mut current_hash = to_hash;
loop {
let header = chain.block_header_data(&current_hash)?;
if bloom_match(&header) {
blocks.push(current_hash);
}
// Stop if `from` block is reached.
if header.number() <= from_number {
break;
}
current_hash = header.parent_hash();
}
blocks.reverse();
(blocks, current_hash)
};
// Check if we've actually reached the expected `from` block.
if last_hash != from_hash || blocks.is_empty() {
return None;
}
blocks
};
Some(self.chain.read().logs(blocks, |entry| filter.matches(entry), filter.limit))
};
fetch_logs().unwrap_or_default()
} }
fn filter_traces(&self, filter: TraceFilter) -> Option<Vec<LocalizedTrace>> { fn filter_traces(&self, filter: TraceFilter) -> Option<Vec<LocalizedTrace>> {

View File

@ -476,7 +476,7 @@ mod tests {
unimplemented!() unimplemented!()
} }
fn logs<F>(&self, _blocks: Vec<BlockNumber>, _matches: F, _limit: Option<usize>) -> Vec<LocalizedLogEntry> fn logs<F>(&self, _blocks: Vec<H256>, _matches: F, _limit: Option<usize>) -> Vec<LocalizedLogEntry>
where F: Fn(&LogEntry) -> bool, Self: Sized { where F: Fn(&LogEntry) -> bool, Self: Sized {
unimplemented!() unimplemented!()
} }