parallelize account trie creation

This commit is contained in:
Robert Habermeier 2016-06-30 20:43:54 +02:00
parent 6ecd6eaa12
commit 456619001a

View File

@ -34,6 +34,8 @@ use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View};
use self::account::Account; use self::account::Account;
use self::block::AbridgedBlock; use self::block::AbridgedBlock;
use crossbeam::{scope, ScopedJoinHandle};
mod account; mod account;
mod block; mod block;
@ -292,23 +294,41 @@ impl StateRebuilder {
pub fn feed(&mut self, compressed: &[u8]) -> Result<(), Error> { pub fn feed(&mut self, compressed: &[u8]) -> Result<(), Error> {
let len = try!(snappy::decompress_into(compressed, &mut self.snappy_buffer)); let len = try!(snappy::decompress_into(compressed, &mut self.snappy_buffer));
let rlp = UntrustedRlp::new(&self.snappy_buffer[..len]); let rlp = UntrustedRlp::new(&self.snappy_buffer[..len]);
let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect();
let mut pairs = Vec::with_capacity(rlp.item_count()); let mut pairs = Vec::with_capacity(rlp.item_count());
for account_pair in rlp.iter() { // initialize the pairs vector with empty values so we have slots to write into.
let hash: H256 = try!(account_pair.val_at(0)); for _ in 0..rlp.item_count() {
let fat_rlp = try!(account_pair.at(1)); pairs.push((H256::new(), Vec::new()));
let thin_rlp = {
let mut acct_db = AccountDBMut::from_hash(self.db.as_hashdb_mut(), hash);
// fill out the storage trie and code while decoding.
let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp));
acc.to_thin_rlp()
};
pairs.push((hash, thin_rlp));
} }
let chunk_size = account_fat_rlps.len() / ::num_cpus::get();
// build account tries in parallel.
try!(scope(|scope| {
let mut handles = Vec::new();
for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) {
let mut db = self.db.boxed_clone();
let handle: ScopedJoinHandle<Result<(), Error>> = scope.spawn(move || {
try!(rebuild_account_trie(db.as_hashdb_mut(), account_chunk, out_pairs_chunk));
// commit the db changes we made in this thread.
try!(db.commit(0, &H256::zero(), None));
Ok(())
});
handles.push(handle);
}
// see if we got any errors.
for handle in handles {
try!(handle.join());
}
Ok::<_, Error>(())
}));
// batch trie writes // batch trie writes
{ {
let mut account_trie = if self.state_root != H256::zero() { let mut account_trie = if self.state_root != H256::zero() {
@ -328,4 +348,25 @@ impl StateRebuilder {
/// Get the state root of the rebuilder. /// Get the state root of the rebuilder.
pub fn state_root(&self) -> H256 { self.state_root } pub fn state_root(&self) -> H256 { self.state_root }
}
fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mut [(H256, Bytes)]) -> Result<(), Error> {
for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) {
let account_rlp = UntrustedRlp::new(account_pair);
let hash: H256 = try!(account_rlp.val_at(0));
let fat_rlp = try!(account_rlp.at(1));
let thin_rlp = {
let mut acct_db = AccountDBMut::from_hash(db.as_hashdb_mut(), hash);
// fill out the storage trie and code while decoding.
let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp));
acc.to_thin_rlp()
};
*out = (hash, thin_rlp);
}
Ok(())
} }