diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 307015812..9e85b9a3e 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -34,6 +34,8 @@ use util::rlp::{DecoderError, RlpStream, Stream, UntrustedRlp, View}; use self::account::Account; use self::block::AbridgedBlock; +use crossbeam::{scope, ScopedJoinHandle}; + mod account; mod block; @@ -292,23 +294,41 @@ impl StateRebuilder { pub fn feed(&mut self, compressed: &[u8]) -> Result<(), Error> { let len = try!(snappy::decompress_into(compressed, &mut self.snappy_buffer)); let rlp = UntrustedRlp::new(&self.snappy_buffer[..len]); + let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect(); let mut pairs = Vec::with_capacity(rlp.item_count()); - for account_pair in rlp.iter() { - let hash: H256 = try!(account_pair.val_at(0)); - let fat_rlp = try!(account_pair.at(1)); - - let thin_rlp = { - let mut acct_db = AccountDBMut::from_hash(self.db.as_hashdb_mut(), hash); - - // fill out the storage trie and code while decoding. - let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp)); - acc.to_thin_rlp() - }; - - pairs.push((hash, thin_rlp)); + // initialize the pairs vector with empty values so we have slots to write into. + for _ in 0..rlp.item_count() { + pairs.push((H256::new(), Vec::new())); } + let chunk_size = account_fat_rlps.len() / ::num_cpus::get(); + + // build account tries in parallel. + try!(scope(|scope| { + let mut handles = Vec::new(); + for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) { + let mut db = self.db.boxed_clone(); + let handle: ScopedJoinHandle> = scope.spawn(move || { + try!(rebuild_account_trie(db.as_hashdb_mut(), account_chunk, out_pairs_chunk)); + + // commit the db changes we made in this thread. + try!(db.commit(0, &H256::zero(), None)); + + Ok(()) + }); + + handles.push(handle); + } + + // see if we got any errors. + for handle in handles { + try!(handle.join()); + } + + Ok::<_, Error>(()) + })); + // batch trie writes { let mut account_trie = if self.state_root != H256::zero() { @@ -328,4 +348,25 @@ impl StateRebuilder { /// Get the state root of the rebuilder. pub fn state_root(&self) -> H256 { self.state_root } +} + +fn rebuild_account_trie(db: &mut HashDB, account_chunk: &[&[u8]], out_chunk: &mut [(H256, Bytes)]) -> Result<(), Error> { + for (account_pair, out) in account_chunk.into_iter().zip(out_chunk) { + let account_rlp = UntrustedRlp::new(account_pair); + + let hash: H256 = try!(account_rlp.val_at(0)); + let fat_rlp = try!(account_rlp.at(1)); + + let thin_rlp = { + let mut acct_db = AccountDBMut::from_hash(db.as_hashdb_mut(), hash); + + // fill out the storage trie and code while decoding. + let acc = try!(Account::from_fat_rlp(&mut acct_db, fat_rlp)); + + acc.to_thin_rlp() + }; + + *out = (hash, thin_rlp); + } + Ok(()) } \ No newline at end of file