Backports to beta v1.3.6 (#2571)

* v1.3.6

* Print backtrace on panic (#2535)

* Don't activate peers on connect; Test (#2537)

* Removing unwarps from sync module (#2551)

* Remove unwrap from client module (#2554)

* remove unwraps in client

* imporve block hash expect message

* mining perf trace

* Fixed race condition in trace import (#2555)
This commit is contained in:
Arkadiy Paronyan 2016-10-11 15:53:45 +02:00 committed by GitHub
parent 4b7c97783b
commit 8492e3e61d
13 changed files with 244 additions and 179 deletions

26
Cargo.lock generated
View File

@ -1,6 +1,6 @@
[root] [root]
name = "parity" name = "parity"
version = "1.3.5" version = "1.3.6"
dependencies = [ dependencies = [
"ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
@ -20,7 +20,7 @@ dependencies = [
"ethcore-logger 1.3.0", "ethcore-logger 1.3.0",
"ethcore-rpc 1.3.0", "ethcore-rpc 1.3.0",
"ethcore-signer 1.3.0", "ethcore-signer 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"ethsync 1.3.0", "ethsync 1.3.0",
"fdlimit 0.1.0", "fdlimit 0.1.0",
"hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -270,7 +270,7 @@ dependencies = [
"ethcore-ipc 1.3.0", "ethcore-ipc 1.3.0",
"ethcore-ipc-codegen 1.3.0", "ethcore-ipc-codegen 1.3.0",
"ethcore-ipc-nano 1.3.0", "ethcore-ipc-nano 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"ethjson 0.1.0", "ethjson 0.1.0",
"ethstore 0.1.0", "ethstore 0.1.0",
"evmjit 1.3.0", "evmjit 1.3.0",
@ -294,7 +294,7 @@ version = "1.3.0"
dependencies = [ dependencies = [
"clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-rpc 1.3.0", "ethcore-rpc 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "hyper 0.9.4 (git+https://github.com/ethcore/hyper)",
"jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-http-server 6.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git)", "jsonrpc-http-server 6.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git)",
@ -336,7 +336,7 @@ name = "ethcore-ipc"
version = "1.3.0" version = "1.3.0"
dependencies = [ dependencies = [
"ethcore-devtools 1.3.0", "ethcore-devtools 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
"semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -381,7 +381,7 @@ dependencies = [
"ethcore-ipc 1.3.0", "ethcore-ipc 1.3.0",
"ethcore-ipc-codegen 1.3.0", "ethcore-ipc-codegen 1.3.0",
"ethcore-ipc-nano 1.3.0", "ethcore-ipc-nano 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
"semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
@ -393,7 +393,7 @@ name = "ethcore-logger"
version = "1.3.0" version = "1.3.0"
dependencies = [ dependencies = [
"env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -408,7 +408,7 @@ dependencies = [
"ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-devtools 1.3.0", "ethcore-devtools 1.3.0",
"ethcore-io 1.3.0", "ethcore-io 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"igd 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "igd 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -432,7 +432,7 @@ dependencies = [
"ethcore-devtools 1.3.0", "ethcore-devtools 1.3.0",
"ethcore-io 1.3.0", "ethcore-io 1.3.0",
"ethcore-ipc 1.3.0", "ethcore-ipc 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"ethjson 0.1.0", "ethjson 0.1.0",
"ethsync 1.3.0", "ethsync 1.3.0",
"json-ipc-server 0.2.4 (git+https://github.com/ethcore/json-ipc-server.git?branch=beta)", "json-ipc-server 0.2.4 (git+https://github.com/ethcore/json-ipc-server.git?branch=beta)",
@ -455,7 +455,7 @@ dependencies = [
"env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-io 1.3.0", "ethcore-io 1.3.0",
"ethcore-rpc 1.3.0", "ethcore-rpc 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-dapps-signer 1.4.0 (git+https://github.com/ethcore/parity-ui.git)", "parity-dapps-signer 1.4.0 (git+https://github.com/ethcore/parity-ui.git)",
@ -466,7 +466,7 @@ dependencies = [
[[package]] [[package]]
name = "ethcore-util" name = "ethcore-util"
version = "1.3.5" version = "1.3.6"
dependencies = [ dependencies = [
"ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"arrayvec 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", "arrayvec 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
@ -499,7 +499,7 @@ dependencies = [
name = "ethjson" name = "ethjson"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_codegen 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", "serde_codegen 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)",
@ -547,7 +547,7 @@ dependencies = [
"ethcore-ipc-codegen 1.3.0", "ethcore-ipc-codegen 1.3.0",
"ethcore-ipc-nano 1.3.0", "ethcore-ipc-nano 1.3.0",
"ethcore-network 1.3.0", "ethcore-network 1.3.0",
"ethcore-util 1.3.5", "ethcore-util 1.3.6",
"heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,7 +1,7 @@
[package] [package]
description = "Ethcore client." description = "Ethcore client."
name = "parity" name = "parity"
version = "1.3.5" version = "1.3.6"
license = "GPL-3.0" license = "GPL-3.0"
authors = ["Ethcore <admin@ethcore.io>"] authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs" build = "build.rs"

View File

@ -159,13 +159,6 @@ pub const DB_COL_ACCOUNT_BLOOM: Option<u32> = Some(5);
/// Number of columns in DB /// Number of columns in DB
pub const DB_NO_OF_COLUMNS: Option<u32> = Some(6); pub const DB_NO_OF_COLUMNS: Option<u32> = Some(6);
/// Append a path element to the given path and return the string.
pub fn append_path<P>(path: P, item: &str) -> String where P: AsRef<Path> {
let mut p = path.as_ref().to_path_buf();
p.push(item);
p.to_str().unwrap().to_owned()
}
impl Client { impl Client {
/// Create a new client with given spec and DB path and custom verifier. /// Create a new client with given spec and DB path and custom verifier.
pub fn new( pub fn new(
@ -182,7 +175,7 @@ impl Client {
db_config.compaction = config.db_compaction.compaction_profile(); db_config.compaction = config.db_compaction.compaction_profile();
db_config.wal = config.db_wal; db_config.wal = config.db_wal;
let db = Arc::new(try!(Database::open(&db_config, &path.to_str().unwrap()).map_err(ClientError::Database))); let db = Arc::new(try!(Database::open(&db_config, &path.to_str().expect("DB path could not be converted to string.")).map_err(ClientError::Database)));
let chain = Arc::new(BlockChain::new(config.blockchain, &gb, db.clone())); let chain = Arc::new(BlockChain::new(config.blockchain, &gb, db.clone()));
let tracedb = Arc::new(try!(TraceDB::new(config.tracing, db.clone(), chain.clone()))); let tracedb = Arc::new(try!(TraceDB::new(config.tracing, db.clone(), chain.clone())));
@ -295,31 +288,28 @@ impl Client {
}; };
// Check if Parent is in chain // Check if Parent is in chain
let chain_has_parent = self.chain.block_header(&header.parent_hash); let chain_has_parent = self.chain.block_header(header.parent_hash());
if let None = chain_has_parent { if let Some(parent) = chain_has_parent {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash);
return Err(());
};
// Enact Verified Block // Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header.parent_hash().clone()); let last_hashes = self.build_last_hashes(header.parent_hash().clone());
let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash); let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash());
let enact_result = enact_verified(block, engine, self.tracedb.tracing_enabled(), db, &parent, last_hashes, &self.vm_factory, self.trie_factory.clone()); let enact_result = enact_verified(block, engine, self.tracedb.tracing_enabled(), db, &parent, last_hashes, &self.vm_factory, self.trie_factory.clone());
if let Err(e) = enact_result { let locked_block = try!(enact_result.map_err(|e| {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(()); }));
};
// Final Verification // Final Verification
let locked_block = enact_result.unwrap();
if let Err(e) = self.verifier.verify_block_final(header, locked_block.block().header()) { if let Err(e) = self.verifier.verify_block_final(header, locked_block.block().header()) {
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(()); return Err(());
} }
Ok(locked_block) Ok(locked_block)
} else {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash());
Err(())
}
} }
fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) { fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
@ -372,14 +362,17 @@ impl Client {
invalid_blocks.insert(header.hash()); invalid_blocks.insert(header.hash());
continue; continue;
} }
let is_invalid = invalid_blocks.contains(header.parent_hash());
let closed_block = closed_block.unwrap(); if let (false, Ok(closed_block)) = (is_invalid, self.check_and_close_block(&block)) {
imported_blocks.push(header.hash()); imported_blocks.push(header.hash());
let route = self.commit_block(closed_block, &header.hash(), &block.bytes); let route = self.commit_block(closed_block, &header.hash(), &block.bytes);
import_results.push(route); import_results.push(route);
self.report.write().accrue_block(&block); self.report.write().accrue_block(&block);
} else {
invalid_blocks.insert(header.hash());
}
} }
let imported = imported_blocks.len(); let imported = imported_blocks.len();
@ -428,7 +421,7 @@ impl Client {
// Are we committing an era? // Are we committing an era?
let ancient = if number >= HISTORY { let ancient = if number >= HISTORY {
let n = number - HISTORY; let n = number - HISTORY;
Some((n, self.chain.block_hash(n).unwrap())) Some((n, self.chain.block_hash(n).expect("only verified blocks can be commited; verified block has hash; qed")))
} else { } else {
None None
}; };
@ -835,8 +828,9 @@ impl BlockChainClient for Client {
let t = self.chain.block_body(&address.block_hash) let t = self.chain.block_body(&address.block_hash)
.and_then(|block| BodyView::new(&block).localized_transaction_at(&address.block_hash, block_number, address.index)); .and_then(|block| BodyView::new(&block).localized_transaction_at(&address.block_hash, block_number, address.index));
match (t, self.chain.transaction_receipt(&address)) { let tx_and_sender = t.and_then(|tx| tx.sender().ok().map(|sender| (tx, sender)));
(Some(tx), Some(receipt)) => { match (tx_and_sender, self.chain.transaction_receipt(&address)) {
(Some((tx, sender)), Some(receipt)) => {
let block_hash = tx.block_hash.clone(); let block_hash = tx.block_hash.clone();
let block_number = tx.block_number.clone(); let block_number = tx.block_number.clone();
let transaction_hash = tx.hash(); let transaction_hash = tx.hash();
@ -858,7 +852,7 @@ impl BlockChainClient for Client {
gas_used: receipt.gas_used - prior_gas_used, gas_used: receipt.gas_used - prior_gas_used,
contract_address: match tx.action { contract_address: match tx.action {
Action::Call(_) => None, Action::Call(_) => None,
Action::Create => Some(contract_address(&tx.sender().unwrap(), &tx.nonce)) Action::Create => Some(contract_address(&sender, &tx.nonce))
}, },
logs: receipt.logs.into_iter().enumerate().map(|(i, log)| LocalizedLogEntry { logs: receipt.logs.into_iter().enumerate().map(|(i, log)| LocalizedLogEntry {
entry: log, entry: log,
@ -981,17 +975,18 @@ impl BlockChainClient for Client {
let start = self.block_number(filter.range.start); let start = self.block_number(filter.range.start);
let end = self.block_number(filter.range.end); let end = self.block_number(filter.range.end);
if start.is_some() && end.is_some() { match (start, end) {
(Some(s), Some(e)) => {
let filter = trace::Filter { let filter = trace::Filter {
range: start.unwrap() as usize..end.unwrap() as usize, range: s as usize..e as usize,
from_address: From::from(filter.from_address), from_address: From::from(filter.from_address),
to_address: From::from(filter.to_address), to_address: From::from(filter.to_address),
}; };
let traces = self.tracedb.filter(&filter); let traces = self.tracedb.filter(&filter);
Some(traces) Some(traces)
} else { },
None _ => None,
} }
} }
@ -1063,11 +1058,15 @@ impl MiningBlockChainClient for Client {
// Add uncles // Add uncles
self.chain self.chain
.find_uncle_headers(&h, engine.maximum_uncle_age()) .find_uncle_headers(&h, engine.maximum_uncle_age())
.unwrap() .unwrap_or_else(Vec::new)
.into_iter() .into_iter()
.take(engine.maximum_uncle_count()) .take(engine.maximum_uncle_count())
.foreach(|h| { .foreach(|h| {
open_block.push_uncle(h).unwrap(); open_block.push_uncle(h).expect("pushing maximum_uncle_count;
open_block was just created;
push_uncle is not ok only if more than maximum_uncle_count is pushed;
so all push_uncle are Ok;
qed");
}); });
open_block open_block

View File

@ -53,6 +53,8 @@ pub struct TestBlockChainClient {
pub genesis_hash: H256, pub genesis_hash: H256,
/// Last block hash. /// Last block hash.
pub last_hash: RwLock<H256>, pub last_hash: RwLock<H256>,
/// Extra data do set for each block
pub extra_data: Bytes,
/// Difficulty. /// Difficulty.
pub difficulty: RwLock<U256>, pub difficulty: RwLock<U256>,
/// Balances. /// Balances.
@ -99,11 +101,17 @@ impl Default for TestBlockChainClient {
impl TestBlockChainClient { impl TestBlockChainClient {
/// Creates new test client. /// Creates new test client.
pub fn new() -> Self { pub fn new() -> Self {
Self::new_with_extra_data(Bytes::new())
}
/// Creates new test client with specified extra data for each block
pub fn new_with_extra_data(extra_data: Bytes) -> Self {
let spec = Spec::new_test(); let spec = Spec::new_test();
let mut client = TestBlockChainClient { let mut client = TestBlockChainClient {
blocks: RwLock::new(HashMap::new()), blocks: RwLock::new(HashMap::new()),
numbers: RwLock::new(HashMap::new()), numbers: RwLock::new(HashMap::new()),
genesis_hash: H256::new(), genesis_hash: H256::new(),
extra_data: extra_data,
last_hash: RwLock::new(H256::new()), last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)), difficulty: RwLock::new(From::from(0)),
balances: RwLock::new(HashMap::new()), balances: RwLock::new(HashMap::new()),
@ -166,6 +174,7 @@ impl TestBlockChainClient {
header.parent_hash = self.last_hash.read().clone(); header.parent_hash = self.last_hash.read().clone();
header.number = n as BlockNumber; header.number = n as BlockNumber;
header.gas_limit = U256::from(1_000_000); header.gas_limit = U256::from(1_000_000);
header.extra_data = self.extra_data.clone();
let uncles = match with { let uncles = match with {
EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
let mut uncles = RlpStream::new_list(1); let mut uncles = RlpStream::new_list(1);

View File

@ -249,6 +249,7 @@ impl Miner {
fn prepare_sealing(&self, chain: &MiningBlockChainClient) { fn prepare_sealing(&self, chain: &MiningBlockChainClient) {
trace!(target: "miner", "prepare_sealing: entering"); trace!(target: "miner", "prepare_sealing: entering");
let _timer = PerfTimer::new("prepare_sealing");
{ {
trace!(target: "miner", "recalibrating..."); trace!(target: "miner", "recalibrating...");
let txq = self.transaction_queue.clone(); let txq = self.transaction_queue.clone();

View File

@ -268,16 +268,6 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
return; return;
} }
// at first, let's insert new block traces
{
let mut traces = self.traces.write();
// it's important to use overwrite here,
// cause this value might be queried by hash later
batch.write_with_cache(DB_COL_TRACE, &mut *traces, request.block_hash, request.traces, CacheUpdatePolicy::Overwrite);
// note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection
self.note_used(CacheID::Trace(request.block_hash.clone()));
}
// now let's rebuild the blooms // now let's rebuild the blooms
{ {
let range_start = request.block_number as Number + 1 - request.enacted.len(); let range_start = request.block_number as Number + 1 - request.enacted.len();
@ -288,12 +278,25 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
// all traces are expected to be found here. That's why `expect` has been used // all traces are expected to be found here. That's why `expect` has been used
// instead of `filter_map`. If some traces haven't been found, it meens that // instead of `filter_map`. If some traces haven't been found, it meens that
// traces database is corrupted or incomplete. // traces database is corrupted or incomplete.
.map(|block_hash| self.traces(block_hash).expect("Traces database is incomplete.")) .map(|block_hash| if block_hash == &request.block_hash {
.map(|block_traces| block_traces.bloom()) request.traces.bloom()
} else {
self.traces(block_hash).expect("Traces database is incomplete.").bloom()
})
.map(blooms::Bloom::from) .map(blooms::Bloom::from)
.map(Into::into) .map(Into::into)
.collect(); .collect();
// insert new block traces into the cache and the database
{
let mut traces = self.traces.write();
// it's important to use overwrite here,
// cause this value might be queried by hash later
batch.write_with_cache(DB_COL_TRACE, &mut *traces, request.block_hash, request.traces, CacheUpdatePolicy::Overwrite);
// note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection
self.note_used(CacheID::Trace(request.block_hash.clone()));
}
let chain = BloomGroupChain::new(self.bloom_config, self); let chain = BloomGroupChain::new(self.bloom_config, self);
let trace_blooms = chain.replace(&replaced_range, enacted_blooms); let trace_blooms = chain.replace(&replaced_range, enacted_blooms);
let blooms_to_insert = trace_blooms.into_iter() let blooms_to_insert = trace_blooms.into_iter()

View File

@ -4,7 +4,7 @@
!define DESCRIPTION "Fast, light, robust Ethereum implementation" !define DESCRIPTION "Fast, light, robust Ethereum implementation"
!define VERSIONMAJOR 1 !define VERSIONMAJOR 1
!define VERSIONMINOR 3 !define VERSIONMINOR 3
!define VERSIONBUILD 5 !define VERSIONBUILD 6
!addplugindir .\ !addplugindir .\

View File

@ -117,6 +117,8 @@ fn start() -> Result<String, String> {
} }
fn main() { fn main() {
// Always print backtrace on panic.
::std::env::set_var("RUST_BACKTRACE", "1");
// just redirect to the sync::main() // just redirect to the sync::main()
if std::env::args().nth(1).map_or(false, |arg| arg == "sync") { if std::env::args().nth(1).map_or(false, |arg| arg == "sync") {
sync::main(); sync::main();

View File

@ -183,8 +183,8 @@ impl BlockCollection {
{ {
let mut blocks = Vec::new(); let mut blocks = Vec::new();
let mut head = self.head; let mut head = self.head;
while head.is_some() { while let Some(h) = head {
head = self.parents.get(&head.unwrap()).cloned(); head = self.parents.get(&h).cloned();
if let Some(head) = head { if let Some(head) = head {
match self.blocks.get(&head) { match self.blocks.get(&head) {
Some(block) if block.body.is_some() => { Some(block) if block.body.is_some() => {
@ -200,7 +200,7 @@ impl BlockCollection {
for block in blocks.drain(..) { for block in blocks.drain(..) {
let mut block_rlp = RlpStream::new_list(3); let mut block_rlp = RlpStream::new_list(3);
block_rlp.append_raw(&block.header, 1); block_rlp.append_raw(&block.header, 1);
let body = Rlp::new(block.body.as_ref().unwrap()); // incomplete blocks are filtered out in the loop above let body = Rlp::new(block.body.as_ref().expect("blocks contains only full blocks; qed"));
block_rlp.append_raw(body.at(0).as_raw(), 1); block_rlp.append_raw(body.at(0).as_raw(), 1);
block_rlp.append_raw(body.at(1).as_raw(), 1); block_rlp.append_raw(body.at(1).as_raw(), 1);
drained.push(block_rlp.out()); drained.push(block_rlp.out());

View File

@ -89,7 +89,6 @@
use util::*; use util::*;
use network::*; use network::*;
use std::mem::{replace};
use ethcore::views::{HeaderView, BlockView}; use ethcore::views::{HeaderView, BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo, BlockImportError}; use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo, BlockImportError};
@ -138,7 +137,7 @@ const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10; const RECEIPTS_PACKET: u8 = 0x10;
const HEADERS_TIMEOUT_SEC: f64 = 15f64; const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 5f64; const BODIES_TIMEOUT_SEC: f64 = 10f64;
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64; const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
#[derive(Copy, Clone, Eq, PartialEq, Debug)] #[derive(Copy, Clone, Eq, PartialEq, Debug)]
@ -230,8 +229,6 @@ struct PeerInfo {
network_id: U256, network_id: U256,
/// Peer best block hash /// Peer best block hash
latest_hash: H256, latest_hash: H256,
/// Peer best block number if known
latest_number: Option<BlockNumber>,
/// Peer total difficulty if known /// Peer total difficulty if known
difficulty: Option<U256>, difficulty: Option<U256>,
/// Type of data currenty being requested from peer. /// Type of data currenty being requested from peer.
@ -360,6 +357,8 @@ impl ChainSync {
} }
self.syncing_difficulty = From::from(0u64); self.syncing_difficulty = From::from(0u64);
self.state = SyncState::Idle; self.state = SyncState::Idle;
// Reactivate peers only if some progress has been made
// since the last sync round of if starting fresh.
self.active_peers = self.peers.keys().cloned().collect(); self.active_peers = self.peers.keys().cloned().collect();
} }
@ -371,7 +370,8 @@ impl ChainSync {
self.continue_sync(io); self.continue_sync(io);
} }
/// Remove peer from active peer set /// Remove peer from active peer set. Peer will be reactivated on the next sync
/// round.
fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) { fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "Deactivating peer {}", peer_id); trace!(target: "sync", "Deactivating peer {}", peer_id);
self.active_peers.remove(&peer_id); self.active_peers.remove(&peer_id);
@ -401,7 +401,6 @@ impl ChainSync {
network_id: try!(r.val_at(1)), network_id: try!(r.val_at(1)),
difficulty: Some(try!(r.val_at(2))), difficulty: Some(try!(r.val_at(2))),
latest_hash: try!(r.val_at(3)), latest_hash: try!(r.val_at(3)),
latest_number: None,
genesis: try!(r.val_at(4)), genesis: try!(r.val_at(4)),
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),
@ -434,7 +433,11 @@ impl ChainSync {
} }
self.peers.insert(peer_id.clone(), peer); self.peers.insert(peer_id.clone(), peer);
// Don't activate peer immediatelly when searching for common block.
// Let the current sync round complete first.
if self.state != SyncState::ChainHead {
self.active_peers.insert(peer_id.clone()); self.active_peers.insert(peer_id.clone());
}
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
if let Some((fork_block, _)) = self.fork_block { if let Some((fork_block, _)) = self.fork_block {
self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader); self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader);
@ -521,7 +524,7 @@ impl ChainSync {
continue; continue;
} }
if self.highest_block == None || number > self.highest_block.unwrap() { if self.highest_block.as_ref().map_or(true, |n| number > *n) {
self.highest_block = Some(number); self.highest_block = Some(number);
} }
let hash = info.hash(); let hash = info.hash();
@ -553,9 +556,9 @@ impl ChainSync {
} }
if headers.is_empty() { if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another // Peer does not have any new subchain heads, deactivate it and try with another.
trace!(target: "sync", "{} Disabled for no data", peer_id); trace!(target: "sync", "{} Disabled for no data", peer_id);
io.disable_peer(peer_id); self.deactivate_peer(io, peer_id);
} }
match self.state { match self.state {
SyncState::ChainHead => { SyncState::ChainHead => {
@ -634,9 +637,9 @@ impl ChainSync {
} }
let mut unknown = false; let mut unknown = false;
{ {
let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.latest_hash = header.hash(); peer.latest_hash = header.hash();
peer.latest_number = Some(header.number()); }
} }
if self.last_imported_block > header.number() && self.last_imported_block - header.number() > MAX_NEW_BLOCK_AGE { if self.last_imported_block > header.number() && self.last_imported_block - header.number() > MAX_NEW_BLOCK_AGE {
trace!(target: "sync", "Ignored ancient new block {:?}", h); trace!(target: "sync", "Ignored ancient new block {:?}", h);
@ -729,9 +732,9 @@ impl ChainSync {
new_hashes.push(hash.clone()); new_hashes.push(hash.clone());
if number > max_height { if number > max_height {
trace!(target: "sync", "New unknown block hash {:?}", hash); trace!(target: "sync", "New unknown block hash {:?}", hash);
let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.latest_hash = hash.clone(); peer.latest_hash = hash.clone();
peer.latest_number = Some(number); }
max_height = number; max_height = number;
} }
}, },
@ -808,7 +811,7 @@ impl ChainSync {
return; return;
} }
let (peer_latest, peer_difficulty) = { let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if peer.asking != PeerAsking::Nothing || !peer.can_sync() { if peer.asking != PeerAsking::Nothing || !peer.can_sync() {
return; return;
} }
@ -817,6 +820,9 @@ impl ChainSync {
return; return;
} }
(peer.latest_hash.clone(), peer.difficulty.clone()) (peer.latest_hash.clone(), peer.difficulty.clone())
} else {
return;
}
}; };
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
let td = chain_info.pending_total_difficulty; let td = chain_info.pending_total_difficulty;
@ -893,21 +899,25 @@ impl ChainSync {
// check to see if we need to download any block bodies first // check to see if we need to download any block bodies first
let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, ignore_others); let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, ignore_others);
if !needed_bodies.is_empty() { if !needed_bodies.is_empty() {
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_bodies.clone()); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.asking_blocks = needed_bodies.clone();
}
self.request_bodies(io, peer_id, needed_bodies); self.request_bodies(io, peer_id, needed_bodies);
return; return;
} }
// find subchain to download // find subchain to download
if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, ignore_others) { if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, ignore_others) {
replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, vec![h.clone()]); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.asking_blocks = vec![h.clone()];
}
self.request_headers_by_hash(io, peer_id, &h, count, 0, false, PeerAsking::BlockHeaders); self.request_headers_by_hash(io, peer_id, &h, count, 0, false, PeerAsking::BlockHeaders);
} }
} }
/// Clear all blocks/headers marked as being downloaded by a peer. /// Clear all blocks/headers marked as being downloaded by a peer.
fn clear_peer_download(&mut self, peer_id: PeerId) { fn clear_peer_download(&mut self, peer_id: PeerId) {
let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
match peer.asking { match peer.asking {
PeerAsking::BlockHeaders | PeerAsking::Heads => { PeerAsking::BlockHeaders | PeerAsking::Heads => {
for b in &peer.asking_blocks { for b in &peer.asking_blocks {
@ -923,6 +933,7 @@ impl ChainSync {
} }
peer.asking_blocks.clear(); peer.asking_blocks.clear();
} }
}
fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) { fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
self.last_imported_block = number; self.last_imported_block = number;
@ -1030,7 +1041,7 @@ impl ChainSync {
/// Reset peer status after request is complete. /// Reset peer status after request is complete.
fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool { fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool {
let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.expired = false; peer.expired = false;
if peer.asking != asking { if peer.asking != asking {
trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking);
@ -1041,11 +1052,14 @@ impl ChainSync {
peer.asking = PeerAsking::Nothing; peer.asking = PeerAsking::Nothing;
true true
} }
} else {
false
}
} }
/// Generic request sender /// Generic request sender
fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if peer.asking != PeerAsking::Nothing { if peer.asking != PeerAsking::Nothing {
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
} }
@ -1056,6 +1070,7 @@ impl ChainSync {
sync.disable_peer(peer_id); sync.disable_peer(peer_id);
} }
} }
}
/// Generic packet sender /// Generic packet sender
fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
@ -1371,7 +1386,7 @@ impl ChainSync {
/// creates latest block rlp for the given client /// creates latest block rlp for the given client
fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
let mut rlp_stream = RlpStream::new_list(2); let mut rlp_stream = RlpStream::new_list(2);
rlp_stream.append_raw(&chain.block(BlockID::Hash(chain.chain_info().best_block_hash)).unwrap(), 1); rlp_stream.append_raw(&chain.block(BlockID::Hash(chain.chain_info().best_block_hash)).expect("Best block always exists"), 1);
rlp_stream.append(&chain.chain_info().total_difficulty); rlp_stream.append(&chain.chain_info().total_difficulty);
rlp_stream.out() rlp_stream.out()
} }
@ -1385,25 +1400,23 @@ impl ChainSync {
} }
/// returns peer ids that have less blocks than our chain /// returns peer ids that have less blocks than our chain
fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> { fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<PeerId> {
let latest_hash = chain_info.best_block_hash; let latest_hash = chain_info.best_block_hash;
let latest_number = chain_info.best_block_number;
self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)|
match io.chain().block_status(BlockID::Hash(peer_info.latest_hash.clone())) { match io.chain().block_status(BlockID::Hash(peer_info.latest_hash.clone())) {
BlockStatus::InChain => { BlockStatus::InChain => {
if peer_info.latest_number.is_none() { if peer_info.latest_hash != latest_hash {
peer_info.latest_number = Some(HeaderView::new(&io.chain().block_header(BlockID::Hash(peer_info.latest_hash.clone())).unwrap()).number()); Some(id)
} else {
None
} }
if peer_info.latest_hash != latest_hash && latest_number > peer_info.latest_number.unwrap() {
Some((id, peer_info.latest_number.unwrap()))
} else { None }
}, },
_ => None _ => None
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
fn select_random_lagging_peers(&mut self, peers: &[(PeerId, BlockNumber)]) -> Vec<(PeerId, BlockNumber)> { fn select_random_lagging_peers(&mut self, peers: &[PeerId]) -> Vec<PeerId> {
use rand::Rng; use rand::Rng;
// take sqrt(x) peers // take sqrt(x) peers
let mut peers = peers.to_vec(); let mut peers = peers.to_vec();
@ -1416,46 +1429,42 @@ impl ChainSync {
} }
/// propagates latest block to lagging peers /// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256], peers: &[(PeerId, BlockNumber)]) -> usize { fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256], peers: &[PeerId]) -> usize {
trace!(target: "sync", "Sending NewBlocks to {:?}", peers); trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
let mut sent = 0; let mut sent = 0;
for &(peer_id, _) in peers { for peer_id in peers {
if sealed.is_empty() { if sealed.is_empty() {
let rlp = ChainSync::create_latest_block_rlp(io.chain()); let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
} else { } else {
for h in sealed { for h in sealed {
let rlp = ChainSync::create_new_block_rlp(io.chain(), h); let rlp = ChainSync::create_new_block_rlp(io.chain(), h);
self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
} }
} }
self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone(); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
self.peers.get_mut(&peer_id).unwrap().latest_number = Some(chain_info.best_block_number); peer.latest_hash = chain_info.best_block_hash.clone();
}
sent += 1; sent += 1;
} }
sent sent
} }
/// propagates new known hashes to all peers /// propagates new known hashes to all peers
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[(PeerId, BlockNumber)]) -> usize { fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize {
trace!(target: "sync", "Sending NewHashes to {:?}", peers); trace!(target: "sync", "Sending NewHashes to {:?}", peers);
let mut sent = 0; let mut sent = 0;
let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone()))
for &(peer_id, peer_number) in peers { .expect("Best block always exists")).parent_hash();
let peer_best = if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber { for peer_id in peers {
// If we think peer is too far behind just send one latest hash sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) {
last_parent.clone()
} else {
self.peers.get(&peer_id).unwrap().latest_hash.clone()
};
sent += match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &chain_info.best_block_hash) {
Some(rlp) => { Some(rlp) => {
{ {
let peer = self.peers.get_mut(&peer_id).unwrap(); if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.latest_hash = chain_info.best_block_hash.clone(); peer.latest_hash = chain_info.best_block_hash.clone();
peer.latest_number = Some(chain_info.best_block_number);
} }
self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); }
self.send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp);
1 1
}, },
None => 0 None => 0
@ -1735,7 +1744,6 @@ mod tests {
genesis: H256::zero(), genesis: H256::zero(),
network_id: U256::zero(), network_id: U256::zero(),
latest_hash: peer_latest_hash, latest_hash: peer_latest_hash,
latest_number: None,
difficulty: None, difficulty: None,
asking: PeerAsking::Nothing, asking: PeerAsking::Nothing,
asking_blocks: Vec::new(), asking_blocks: Vec::new(),

View File

@ -95,6 +95,27 @@ fn forked() {
assert_eq!(&*net.peer(2).chain.numbers.read(), &peer1_chain); assert_eq!(&*net.peer(2).chain.numbers.read(), &peer1_chain);
} }
#[test]
fn forked_with_misbehaving_peer() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
// peer 0 is on a totally different chain with higher total difficulty
net.peer_mut(0).chain = TestBlockChainClient::new_with_extra_data(b"fork".to_vec());
net.peer_mut(0).chain.add_blocks(500, EachBlockWith::Nothing);
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Nothing);
net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Nothing);
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Nothing);
net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle);
// peer 1 should sync to peer 2, others should not change
let peer0_chain = net.peer(0).chain.numbers.read().clone();
let peer2_chain = net.peer(2).chain.numbers.read().clone();
net.sync();
assert_eq!(&*net.peer(0).chain.numbers.read(), &peer0_chain);
assert_eq!(&*net.peer(1).chain.numbers.read(), &peer2_chain);
assert_eq!(&*net.peer(2).chain.numbers.read(), &peer2_chain);
}
#[test] #[test]
fn net_hard_fork() { fn net_hard_fork() {
::env_logger::init().ok(); ::env_logger::init().ok();
@ -116,11 +137,12 @@ fn net_hard_fork() {
#[test] #[test]
fn restart() { fn restart() {
::env_logger::init().ok();
let mut net = TestNet::new(3); let mut net = TestNet::new(3);
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync_steps(8); net.sync();
// make sure that sync has actually happened // make sure that sync has actually happened
assert!(net.peer(0).chain.chain_info().best_block_number > 100); assert!(net.peer(0).chain.chain_info().best_block_number > 100);

View File

@ -26,6 +26,7 @@ pub struct TestIo<'p> {
pub chain: &'p mut TestBlockChainClient, pub chain: &'p mut TestBlockChainClient,
pub queue: &'p mut VecDeque<TestPacket>, pub queue: &'p mut VecDeque<TestPacket>,
pub sender: Option<PeerId>, pub sender: Option<PeerId>,
pub to_disconnect: HashSet<PeerId>,
overlay: RwLock<HashMap<BlockNumber, Bytes>>, overlay: RwLock<HashMap<BlockNumber, Bytes>>,
} }
@ -35,16 +36,19 @@ impl<'p> TestIo<'p> {
chain: chain, chain: chain,
queue: queue, queue: queue,
sender: sender, sender: sender,
to_disconnect: HashSet::new(),
overlay: RwLock::new(HashMap::new()), overlay: RwLock::new(HashMap::new()),
} }
} }
} }
impl<'p> SyncIo for TestIo<'p> { impl<'p> SyncIo for TestIo<'p> {
fn disable_peer(&mut self, _peer_id: PeerId) { fn disable_peer(&mut self, peer_id: PeerId) {
self.disconnect_peer(peer_id);
} }
fn disconnect_peer(&mut self, _peer_id: PeerId) { fn disconnect_peer(&mut self, peer_id: PeerId) {
self.to_disconnect.insert(peer_id);
} }
fn is_expired(&self) -> bool { fn is_expired(&self) -> bool {
@ -141,13 +145,30 @@ impl TestNet {
pub fn sync_step(&mut self) { pub fn sync_step(&mut self) {
for peer in 0..self.peers.len() { for peer in 0..self.peers.len() {
if let Some(packet) = self.peers[peer].queue.pop_front() { if let Some(packet) = self.peers[peer].queue.pop_front() {
let disconnecting = {
let mut p = self.peers.get_mut(packet.recipient).unwrap(); let mut p = self.peers.get_mut(packet.recipient).unwrap();
trace!("--- {} -> {} ---", peer, packet.recipient); trace!("--- {} -> {} ---", peer, packet.recipient);
ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); let to_disconnect = {
trace!("----------------"); let mut io = TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId));
ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data);
io.to_disconnect
};
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
let mut io = TestIo::new(&mut p.chain, &mut p.queue, Some(*d));
p.sync.write().on_peer_aborting(&mut io, *d);
} }
let mut p = self.peers.get_mut(peer).unwrap(); to_disconnect
p.sync.write().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); };
for d in &disconnecting {
// notify other peers that this peer is disconnecting
let mut p = self.peers.get_mut(*d).unwrap();
let mut io = TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId));
p.sync.write().on_peer_aborting(&mut io, peer as PeerId);
}
}
self.sync_step_peer(peer);
} }
} }

View File

@ -3,7 +3,7 @@ description = "Ethcore utility library"
homepage = "http://ethcore.io" homepage = "http://ethcore.io"
license = "GPL-3.0" license = "GPL-3.0"
name = "ethcore-util" name = "ethcore-util"
version = "1.3.5" version = "1.3.6"
authors = ["Ethcore <admin@ethcore.io>"] authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs" build = "build.rs"