Merge pull request #1478 from ethcore/seal-import

Sealed block importing and propagation optimization
This commit is contained in:
Arkadiy Paronyan 2016-06-30 08:13:07 +02:00 committed by GitHub
commit 946c1fd76a
10 changed files with 163 additions and 72 deletions

View File

@ -164,6 +164,12 @@ pub trait IsBlock {
fn uncles(&self) -> &Vec<Header> { &self.block().base.uncles }
}
/// Trait for a object that has a state database.
pub trait Drain {
/// Drop this object and return the underlieing database.
fn drain(self) -> Box<JournalDB>;
}
impl IsBlock for ExecutedBlock {
fn block(&self) -> &ExecutedBlock { self }
}
@ -436,9 +442,11 @@ impl LockedBlock {
_ => Ok(SealedBlock { block: s.block, uncle_bytes: s.uncle_bytes }),
}
}
}
impl Drain for LockedBlock {
/// Drop this object and return the underlieing database.
pub fn drain(self) -> Box<JournalDB> { self.block.state.drop().1 }
fn drain(self) -> Box<JournalDB> { self.block.state.drop().1 }
}
impl SealedBlock {
@ -450,9 +458,11 @@ impl SealedBlock {
block_rlp.append_raw(&self.uncle_bytes, 1);
block_rlp.out()
}
}
impl Drain for SealedBlock {
/// Drop this object and return the underlieing database.
pub fn drain(self) -> Box<JournalDB> { self.block.state.drop().1 }
fn drain(self) -> Box<JournalDB> { self.block.state.drop().1 }
}
impl IsBlock for SealedBlock {

View File

@ -249,7 +249,7 @@ impl Client {
Ok(locked_block)
}
fn calculate_enacted_retracted(&self, import_results: Vec<ImportRoute>) -> (Vec<H256>, Vec<H256>) {
fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}
@ -259,12 +259,12 @@ impl Client {
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = import_results.into_iter().fold(HashMap::new(), |mut map, route| {
for hash in route.enacted {
map.insert(hash, true);
let map = import_results.iter().fold(HashMap::new(), |mut map, route| {
for hash in &route.enacted {
map.insert(hash.clone(), true);
}
for hash in route.retracted {
map.insert(hash, false);
for hash in &route.retracted {
map.insert(hash.clone(), false);
}
map
});
@ -301,36 +301,10 @@ impl Client {
invalid_blocks.insert(header.hash());
continue;
}
let closed_block = closed_block.unwrap();
imported_blocks.push(header.hash());
// Are we committing an era?
let ancient = if header.number() >= HISTORY {
let n = header.number() - HISTORY;
Some((n, self.chain.block_hash(n).unwrap()))
} else {
None
};
// Commit results
let closed_block = closed_block.unwrap();
let receipts = closed_block.block().receipts().clone();
let traces = From::from(closed_block.block().traces().clone().unwrap_or_else(Vec::new));
closed_block.drain()
.commit(header.number(), &header.hash(), ancient)
.expect("State DB commit failed.");
// And update the chain after commit to prevent race conditions
// (when something is in chain but you are not able to fetch details)
let route = self.chain.insert_block(&block.bytes, receipts);
self.tracedb.import(TraceImportRequest {
traces: traces,
block_hash: header.hash(),
block_number: header.number(),
enacted: route.enacted.clone(),
retracted: route.retracted.len()
});
let route = self.commit_block(closed_block, &header.hash(), &block.bytes);
import_results.push(route);
self.report.write().unwrap().accrue_block(&block);
@ -351,7 +325,7 @@ impl Client {
{
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
let (enacted, retracted) = self.calculate_enacted_retracted(import_results);
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);
if self.queue_info().is_empty() {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
@ -362,19 +336,47 @@ impl Client {
invalid: invalid_blocks,
enacted: enacted,
retracted: retracted,
sealed: Vec::new(),
})).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
}
}
{
if self.chain_info().best_block_hash != original_best {
self.miner.update_sealing(self);
}
if self.chain_info().best_block_hash != original_best {
self.miner.update_sealing(self);
}
imported
}
fn commit_block<B>(&self, block: B, hash: &H256, block_data: &Bytes) -> ImportRoute where B: IsBlock + Drain {
let number = block.header().number();
// Are we committing an era?
let ancient = if number >= HISTORY {
let n = number - HISTORY;
Some((n, self.chain.block_hash(n).unwrap()))
} else {
None
};
// Commit results
let receipts = block.receipts().clone();
let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new));
block.drain().commit(number, hash, ancient).expect("State DB commit failed.");
// And update the chain after commit to prevent race conditions
// (when something is in chain but you are not able to fetch details)
let route = self.chain.insert_block(block_data, receipts);
self.tracedb.import(TraceImportRequest {
traces: traces,
block_hash: hash.clone(),
block_number: number,
enacted: route.enacted.clone(),
retracted: route.retracted.len()
});
route
}
/// Import transactions from the IO queue
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
let _timer = PerfTimer::new("import_queued_transactions");
@ -830,6 +832,40 @@ impl MiningBlockChainClient for Client {
fn vm_factory(&self) -> &EvmFactory {
&self.vm_factory
}
fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
let _import_lock = self.import_lock.lock();
let _timer = PerfTimer::new("import_sealed_block");
let original_best = self.chain_info().best_block_hash;
let h = block.header().hash();
let number = block.header().number();
let block_data = block.rlp_bytes();
let route = self.commit_block(block, &h, &block_data);
trace!(target: "client", "Imported sealed block #{} ({})", number, h);
{
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);
self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
imported: vec![h.clone()],
invalid: vec![],
enacted: enacted,
retracted: retracted,
sealed: vec![h.clone()],
})).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
}
if self.chain_info().best_block_hash != original_best {
self.miner.update_sealing(self);
}
info!("Block {} ({}) submitted and imported.", h.hex(), number);
Ok(h)
}
}
impl MayPanic for Client {

View File

@ -37,7 +37,7 @@ use util::numbers::U256;
use util::Itertools;
use blockchain::TreeRoute;
use block_queue::BlockQueueInfo;
use block::OpenBlock;
use block::{OpenBlock, SealedBlock};
use header::{BlockNumber, Header};
use transaction::{LocalizedTransaction, SignedTransaction};
use log_entry::LocalizedLogEntry;
@ -253,4 +253,7 @@ pub trait MiningBlockChainClient : BlockChainClient {
/// Returns EvmFactory.
fn vm_factory(&self) -> &EvmFactory;
/// Import sealed block. Skips all verifications.
fn import_sealed_block(&self, block: SealedBlock) -> ImportResult;
}

View File

@ -32,7 +32,7 @@ use miner::{Miner, MinerService};
use spec::Spec;
use block_queue::BlockQueueInfo;
use block::OpenBlock;
use block::{OpenBlock, SealedBlock};
use executive::Executed;
use error::{ExecutionError};
use trace::LocalizedTrace;
@ -248,6 +248,10 @@ impl MiningBlockChainClient for TestBlockChainClient {
fn vm_factory(&self) -> &EvmFactory {
unimplemented!();
}
fn import_sealed_block(&self, _block: SealedBlock) -> ImportResult {
unimplemented!();
}
}
impl BlockChainClient for TestBlockChainClient {

View File

@ -589,26 +589,25 @@ impl MinerService for Miner {
}
fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
if let Some(b) = self.sealing_work.lock().unwrap().take_used_if(|b| &b.hash() == &pow_hash) {
let result = if let Some(b) = self.sealing_work.lock().unwrap().take_used_if(|b| &b.hash() == &pow_hash) {
match b.lock().try_seal(self.engine(), seal) {
Err(_) => {
info!(target: "miner", "Mined block rejected, PoW was invalid.");
Err(Error::PowInvalid)
}
Ok(sealed) => {
info!(target: "miner", "New block mined, hash: {}", sealed.header().hash());
// TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice.
let b = sealed.rlp_bytes();
let h = b.sha3();
try!(chain.import_block(b));
info!("Block {} submitted and imported.", h);
Ok(())
info!(target: "miner", "New block mined, hash: {}", sealed.header().hash().hex());
Ok(sealed)
}
}
} else {
info!(target: "miner", "Mined block rejected, PoW hash invalid or out of date.");
Err(Error::PowHashInvalid)
}
};
result.and_then(|sealed| {
try!(chain.import_sealed_block(sealed));
Ok(())
})
}
fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) {

View File

@ -36,6 +36,8 @@ pub enum SyncMessage {
retracted: Vec<H256>,
/// Hashes of blocks that are now included in cannonical chain
enacted: Vec<H256>,
/// Hashes of blocks that are sealed by this node
sealed: Vec<H256>,
},
/// Best Block Hash in chain has been changed
NewChainHead,

View File

@ -17,7 +17,7 @@
use client::{BlockChainClient, Client, ClientConfig};
use common::*;
use spec::*;
use block::{OpenBlock};
use block::{OpenBlock, Drain};
use blockchain::{BlockChain, Config as BlockChainConfig};
use state::*;
use evm::Schedule;

View File

@ -1231,6 +1231,14 @@ impl ChainSync {
rlp_stream.out()
}
/// creates latest block rlp for the given client
fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
let mut rlp_stream = RlpStream::new_list(2);
rlp_stream.append_raw(&chain.block(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed"), 1);
rlp_stream.append(&chain.block_total_difficulty(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed."));
rlp_stream.out()
}
/// returns peer ids that have less blocks than our chain
fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> {
let latest_hash = chain_info.best_block_hash;
@ -1250,7 +1258,6 @@ impl ChainSync {
.collect::<Vec<_>>()
}
fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> {
use rand::Rng;
let mut lagging_peers = self.get_lagging_peers(chain_info, io);
@ -1263,13 +1270,24 @@ impl ChainSync {
}
/// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize {
let lucky_peers = self.select_lagging_peers(chain_info, io);
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256]) -> usize {
let lucky_peers: Vec<_> = if sealed.is_empty() {
self.select_lagging_peers(chain_info, io).iter().map(|&(id, _)| id).collect()
} else {
self.peers.keys().cloned().collect()
};
trace!(target: "sync", "Sending NewBlocks to {:?}", lucky_peers);
let mut sent = 0;
for (peer_id, _) in lucky_peers {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
for peer_id in lucky_peers {
if sealed.is_empty() {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
} else {
for h in sealed {
let rlp = ChainSync::create_new_block_rlp(io.chain(), h);
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp);
}
}
self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone();
self.peers.get_mut(&peer_id).unwrap().latest_number = Some(chain_info.best_block_number);
sent += 1;
@ -1346,11 +1364,11 @@ impl ChainSync {
sent
}
fn propagate_latest_blocks(&mut self, io: &mut SyncIo) {
fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) {
let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let hashes = self.propagate_new_hashes(&chain_info, io);
let blocks = self.propagate_blocks(&chain_info, io);
let blocks = self.propagate_blocks(&chain_info, io, sealed);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
}
@ -1365,10 +1383,10 @@ impl ChainSync {
}
/// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) {
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) {
if io.is_chain_queue_empty() {
// Propagate latests blocks
self.propagate_latest_blocks(io);
self.propagate_latest_blocks(io, sealed);
}
if !invalid.is_empty() {
trace!(target: "sync", "Bad blocks in the queue, restarting");
@ -1637,7 +1655,26 @@ mod tests {
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info();
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_blocks(&chain_info, &mut io);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[]);
// 1 message should be send
assert_eq!(1, io.queue.len());
// 1 peer should be updated
assert_eq!(1, peer_count);
// NEW_BLOCK_PACKET
assert_eq!(0x07, io.queue[0].packet_id);
}
#[test]
fn sends_sealed_block() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
let mut queue = VecDeque::new();
let hash = client.block_hash(BlockID::Number(99)).unwrap();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
let chain_info = client.chain_info();
let mut io = TestIo::new(&mut client, &mut queue, None);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()]);
// 1 message should be send
assert_eq!(1, io.queue.len());
@ -1761,7 +1798,7 @@ mod tests {
let chain_info = client.chain_info();
let mut io = TestIo::new(&mut client, &mut queue, None);
sync.propagate_blocks(&chain_info, &mut io);
sync.propagate_blocks(&chain_info, &mut io, &[]);
let data = &io.queue[0].data.clone();
let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(data));
@ -1794,7 +1831,7 @@ mod tests {
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 1);
}
@ -1808,7 +1845,7 @@ mod tests {
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]);
}
// then
@ -1833,10 +1870,10 @@ mod tests {
let mut io = TestIo::new(&mut client, &mut queue, None);
// when
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 0);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]);
// then
let status = io.chain.miner.status();

View File

@ -196,9 +196,9 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
#[cfg_attr(feature="dev", allow(single_match))]
fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
match *message {
SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted } => {
SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted, ref sealed } => {
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted);
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted, sealed);
},
_ => {/* Ignore other messages */},
}

View File

@ -173,6 +173,6 @@ impl TestNet {
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let mut peer = self.peer_mut(peer_id);
peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], &[]);
}
}