Merge pull request #416 from ethcore/net

Network/Sync fixes and optimizations
This commit is contained in:
Gav Wood 2016-02-12 09:18:37 +01:00
commit 23d2899e54
5 changed files with 36 additions and 28 deletions

View File

@ -316,12 +316,11 @@ impl Client {
self.report.write().unwrap().accrue_block(&block); self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
ret += 1; ret += 1;
if self.block_queue.read().unwrap().queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::BlockVerified)).unwrap();
}
} }
self.block_queue.write().unwrap().mark_as_good(&good_blocks); self.block_queue.write().unwrap().mark_as_good(&good_blocks);
if !good_blocks.is_empty() && self.block_queue.read().unwrap().queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::BlockVerified)).unwrap();
}
ret ret
} }
@ -350,24 +349,26 @@ impl Client {
self.chain.write().unwrap().configure_cache(pref_cache_size, max_cache_size); self.chain.write().unwrap().configure_cache(pref_cache_size, max_cache_size);
} }
fn block_hash(&self, id: BlockId) -> Option<H256> { fn block_hash(chain: &BlockChain, id: BlockId) -> Option<H256> {
match id { match id {
BlockId::Hash(hash) => Some(hash), BlockId::Hash(hash) => Some(hash),
BlockId::Number(number) => self.chain.read().unwrap().block_hash(number), BlockId::Number(number) => chain.block_hash(number),
BlockId::Earliest => self.chain.read().unwrap().block_hash(0), BlockId::Earliest => chain.block_hash(0),
BlockId::Latest => Some(self.chain.read().unwrap().best_block_hash()) BlockId::Latest => Some(chain.best_block_hash())
} }
} }
} }
impl BlockChainClient for Client { impl BlockChainClient for Client {
fn block_header(&self, id: BlockId) -> Option<Bytes> { fn block_header(&self, id: BlockId) -> Option<Bytes> {
self.block_hash(id).and_then(|hash| self.chain.read().unwrap().block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) let chain = self.chain.read().unwrap();
Self::block_hash(&chain, id).and_then(|hash| chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec()))
} }
fn block_body(&self, id: BlockId) -> Option<Bytes> { fn block_body(&self, id: BlockId) -> Option<Bytes> {
self.block_hash(id).and_then(|hash| { let chain = self.chain.read().unwrap();
self.chain.read().unwrap().block(&hash).map(|bytes| { Self::block_hash(&chain, id).and_then(|hash| {
chain.block(&hash).map(|bytes| {
let rlp = Rlp::new(&bytes); let rlp = Rlp::new(&bytes);
let mut body = RlpStream::new_list(2); let mut body = RlpStream::new_list(2);
body.append_raw(rlp.at(1).as_raw(), 1); body.append_raw(rlp.at(1).as_raw(), 1);
@ -378,21 +379,24 @@ impl BlockChainClient for Client {
} }
fn block(&self, id: BlockId) -> Option<Bytes> { fn block(&self, id: BlockId) -> Option<Bytes> {
self.block_hash(id).and_then(|hash| { let chain = self.chain.read().unwrap();
self.chain.read().unwrap().block(&hash) Self::block_hash(&chain, id).and_then(|hash| {
chain.block(&hash)
}) })
} }
fn block_status(&self, id: BlockId) -> BlockStatus { fn block_status(&self, id: BlockId) -> BlockStatus {
match self.block_hash(id) { let chain = self.chain.read().unwrap();
Some(ref hash) if self.chain.read().unwrap().is_known(hash) => BlockStatus::InChain, match Self::block_hash(&chain, id) {
Some(ref hash) if chain.is_known(hash) => BlockStatus::InChain,
Some(hash) => self.block_queue.read().unwrap().block_status(&hash), Some(hash) => self.block_queue.read().unwrap().block_status(&hash),
None => BlockStatus::Unknown None => BlockStatus::Unknown
} }
} }
fn block_total_difficulty(&self, id: BlockId) -> Option<U256> { fn block_total_difficulty(&self, id: BlockId) -> Option<U256> {
self.block_hash(id).and_then(|hash| self.chain.read().unwrap().block_details(&hash)).map(|d| d.total_difficulty) let chain = self.chain.read().unwrap();
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
} }
fn code(&self, address: &Address) -> Option<Bytes> { fn code(&self, address: &Address) -> Option<Bytes> {
@ -400,13 +404,14 @@ impl BlockChainClient for Client {
} }
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> { fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction> {
let chain = self.chain.read().unwrap();
match id { match id {
TransactionId::Hash(ref hash) => self.chain.read().unwrap().transaction_address(hash), TransactionId::Hash(ref hash) => chain.transaction_address(hash),
TransactionId::Location(id, index) => self.block_hash(id).map(|hash| TransactionAddress { TransactionId::Location(id, index) => Self::block_hash(&chain, id).map(|hash| TransactionAddress {
block_hash: hash, block_hash: hash,
index: index index: index
}) })
}.and_then(|address| self.chain.read().unwrap().transaction(&address)) }.and_then(|address| chain.transaction(&address))
} }
fn tree_route(&self, from: &H256, to: &H256) -> Option<TreeRoute> { fn tree_route(&self, from: &H256, to: &H256) -> Option<TreeRoute> {

View File

@ -67,6 +67,7 @@ Options:
--public-address URL Specify the IP/port on which peers may connect [default: 0.0.0.0:30304]. --public-address URL Specify the IP/port on which peers may connect [default: 0.0.0.0:30304].
--address URL Equivalent to --listen-address URL --public-address URL. --address URL Equivalent to --listen-address URL --public-address URL.
--upnp Use UPnP to try to figure out the correct network settings. --upnp Use UPnP to try to figure out the correct network settings.
--node-key KEY Specify node secret key as hex string.
--cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384]. --cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384].
--cache-max-size BYTES Specify the maximum size of the blockchain cache in bytes [default: 262144]. --cache-max-size BYTES Specify the maximum size of the blockchain cache in bytes [default: 262144].
@ -77,7 +78,7 @@ Options:
-l --logging LOGGING Specify the logging level. -l --logging LOGGING Specify the logging level.
-v --version Show information about version. -v --version Show information about version.
-h --help Show this screen. -h --help Show this screen.
", flag_cache_pref_size: usize, flag_cache_max_size: usize, flag_address: Option<String>); ", flag_cache_pref_size: usize, flag_cache_max_size: usize, flag_address: Option<String>, flag_node_key: Option<String>);
fn setup_log(init: &str) { fn setup_log(init: &str) {
let mut builder = LogBuilder::new(); let mut builder = LogBuilder::new();
@ -205,6 +206,7 @@ fn main() {
let (listen, public) = conf.net_addresses(); let (listen, public) = conf.net_addresses();
net_settings.listen_address = listen; net_settings.listen_address = listen;
net_settings.public_address = public; net_settings.public_address = public;
net_settings.use_secret = conf.args.flag_node_key.as_ref().map(|s| Secret::from_str(&s).expect("Invalid key string"));
// Build client // Build client
let mut service = ClientService::start(spec, net_settings, &Path::new(&conf.path())).unwrap(); let mut service = ClientService::start(spec, net_settings, &Path::new(&conf.path())).unwrap();
@ -259,7 +261,7 @@ impl Informant {
let sync_info = sync.status(); let sync_info = sync.status();
if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) { if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) {
println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, {} downloaded, {}+{} queued ···// {} ({}) bl {} ({}) ex ]", println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// {} ({}) bl {} ({}) ex ]",
chain_info.best_block_number, chain_info.best_block_number,
chain_info.best_block_hash, chain_info.best_block_hash,
(report.blocks_imported - last_report.blocks_imported) / dur, (report.blocks_imported - last_report.blocks_imported) / dur,
@ -268,7 +270,7 @@ impl Informant {
sync_info.num_active_peers, sync_info.num_active_peers,
sync_info.num_peers, sync_info.num_peers,
sync_info.blocks_received, sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number),
queue_info.unverified_queue_size, queue_info.unverified_queue_size,
queue_info.verified_queue_size, queue_info.verified_queue_size,

View File

@ -583,7 +583,7 @@ impl ChainSync {
trace!(target: "sync", "Starting sync with better chain"); trace!(target: "sync", "Starting sync with better chain");
self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false);
} }
else if self.state == SyncState::Blocks { else if self.state == SyncState::Blocks && io.chain().block_status(BlockId::Hash(peer_latest)) == BlockStatus::Unknown {
self.request_blocks(io, peer_id); self.request_blocks(io, peer_id);
} }
} }
@ -607,7 +607,7 @@ impl ChainSync {
if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 { if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 {
for (start, ref items) in self.headers.range_iter() { for (start, ref items) in self.headers.range_iter() {
if needed_bodies.len() > MAX_BODIES_TO_REQUEST { if needed_bodies.len() >= MAX_BODIES_TO_REQUEST {
break; break;
} }
let mut index: BlockNumber = 0; let mut index: BlockNumber = 0;
@ -654,7 +654,7 @@ impl ChainSync {
continue; continue;
} }
let mut block = prev; let mut block = prev;
while block < next && headers.len() <= MAX_HEADERS_TO_REQUEST { while block < next && headers.len() < MAX_HEADERS_TO_REQUEST {
if !self.downloading_headers.contains(&(block as BlockNumber)) { if !self.downloading_headers.contains(&(block as BlockNumber)) {
headers.push(block as BlockNumber); headers.push(block as BlockNumber);
self.downloading_headers.insert(block as BlockNumber); self.downloading_headers.insert(block as BlockNumber);
@ -1045,7 +1045,7 @@ impl ChainSync {
fn check_resume(&mut self, io: &mut SyncIo) { fn check_resume(&mut self, io: &mut SyncIo) {
if !io.chain().queue_info().is_full() && self.state == SyncState::Waiting { if !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
self.state = SyncState::Idle; self.state = SyncState::Blocks;
self.continue_sync(io); self.continue_sync(io);
} }
} }

View File

@ -412,7 +412,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut to_kill = Vec::new(); let mut to_kill = Vec::new();
for e in self.connections.write().unwrap().iter_mut() { for e in self.connections.write().unwrap().iter_mut() {
if let ConnectionEntry::Session(ref mut s) = *e.lock().unwrap().deref_mut() { if let ConnectionEntry::Session(ref mut s) = *e.lock().unwrap().deref_mut() {
if !s.keep_alive() { if !s.keep_alive(io) {
s.disconnect(DisconnectReason::PingTimeout); s.disconnect(DisconnectReason::PingTimeout);
to_kill.push(s.token()); to_kill.push(s.token());
} }

View File

@ -180,7 +180,7 @@ impl Session {
} }
/// Keep this session alive. Returns false if ping timeout happened /// Keep this session alive. Returns false if ping timeout happened
pub fn keep_alive(&mut self) -> bool { pub fn keep_alive<Message>(&mut self, io: &IoContext<Message>) -> bool where Message: Send + Sync + Clone {
let timed_out = if let Some(pong) = self.pong_time_ns { let timed_out = if let Some(pong) = self.pong_time_ns {
pong - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000 pong - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000
} else { } else {
@ -191,6 +191,7 @@ impl Session {
if let Err(e) = self.send_ping() { if let Err(e) = self.send_ping() {
debug!("Error sending ping message: {:?}", e); debug!("Error sending ping message: {:?}", e);
} }
io.update_registration(self.token()).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
} }
!timed_out !timed_out
} }