diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 5f098bc26..75853e0ab 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -14,4 +14,4 @@ clippy = "0.0.37" log = "0.3" env_logger = "0.3" time = "0.1.34" - +rand = "0.3.13" diff --git a/sync/src/chain.rs b/sync/src/chain.rs index e143f20b1..590d351df 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -46,6 +46,8 @@ const MAX_NODE_DATA_TO_SEND: usize = 1024; const MAX_RECEIPTS_TO_SEND: usize = 1024; const MAX_HEADERS_TO_REQUEST: usize = 512; const MAX_BODIES_TO_REQUEST: usize = 256; +const MIN_PEERS_PROPAGATION: usize = 4; +const MAX_PEERS_PROPAGATION: usize = 128; const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; @@ -1026,13 +1028,82 @@ impl ChainSync { } } } - /// Maintain other peers. Send out any new blocks and transactions - pub fn maintain_sync(&mut self, io: &mut SyncIo) { + + fn check_resume(&mut self, io: &mut SyncIo) { if !io.chain().queue_info().full && self.state == SyncState::Waiting { self.state = SyncState::Idle; self.continue_sync(io); } } + + fn create_new_hashes_rlp(chain: &BlockChainClient, from: &H256, to: &H256) -> Option { + match chain.tree_route(from, to) { + Some(route) => { + match route.blocks.len() { + 0 => None, + _ => { + let mut rlp_stream = RlpStream::new_list(route.blocks.len()); + for hash in route.blocks { + rlp_stream.append(&hash); + } + Some(rlp_stream.out()) + } + } + }, + None => None + } + } + + fn query_peer_latest_blocks(&self) -> Vec<(usize, H256)> { + self.peers.iter().map(|peer| (peer.0.clone(), peer.1.latest.clone())).collect() + } + + fn propagade_blocks(&mut self, io: &mut SyncIo) -> usize { + let updated_peers = { + let chain = io.chain(); + let chain_info = chain.chain_info(); + let latest_hash = chain_info.best_block_hash; + + let lagging_peers = self.query_peer_latest_blocks().iter().filter(|peer| + match io.chain().block_status(&peer.1) + { + BlockStatus::InChain => peer.1 != latest_hash, + _ => false + }).cloned().collect::>(); + + let lucky_peers = match lagging_peers.len() { + 0 ... MIN_PEERS_PROPAGATION => lagging_peers, + _ => lagging_peers.iter().filter(|_| ::rand::random::() < 64u8).cloned().collect::>() + }; + + match lucky_peers.len() { + 0 ... MAX_PEERS_PROPAGATION => lucky_peers, + _ => lucky_peers.iter().take(MAX_PEERS_PROPAGATION).cloned().collect::>() + } + }; + + let mut sent = 0; + for (peer_id, peer_hash) in updated_peers { + sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_hash, &io.chain().chain_info().best_block_hash) { + Some(rlp) => { + self.send_request(io, peer_id, PeerAsking::Nothing, NEW_BLOCK_HASHES_PACKET, rlp); + 1 + }, + None => 0 + } + } + sent + } + + /// Maintain other peers. Send out any new blocks and transactions + pub fn maintain_sync(&mut self, io: &mut SyncIo) { + self.check_resume(io); + + if self.state == SyncState::Idle { + let blocks_propagaded = self.propagade_blocks(io); + debug!(target: "sync", "Sent new blocks to peers: {:?}", blocks_propagaded); + } + } } #[cfg(test)] diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 1523a8a9f..8847d9611 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -34,6 +34,7 @@ extern crate ethcore_util as util; extern crate ethcore; extern crate env_logger; extern crate time; +extern crate rand; use std::ops::*; use std::sync::*; diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index fcd9b6a7b..e328ba33d 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -88,4 +88,32 @@ fn restart() { fn status_empty() { let net = TestNet::new(2); assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced); +} + +#[test] +fn status_packet() { + let mut net = TestNet::new(2); + net.peer_mut(0).chain.add_blocks(1000, false); + net.peer_mut(1).chain.add_blocks(1, false); + + net.start(); + + net.sync_step_peer(0); + + assert_eq!(1, net.peer(0).queue.len()); + assert_eq!(0x00, net.peer(0).queue[0].packet_id); +} + +#[test] +fn propagade() { + let mut net = TestNet::new(2); + net.peer_mut(0).chain.add_blocks(100, false); + net.peer_mut(1).chain.add_blocks(100, false); + net.sync(); + + net.peer_mut(0).chain.add_blocks(10, false); + net.sync_step_peer(0); + + assert_eq!(1, net.peer(0).queue.len()); + assert_eq!(0x01, net.peer(0).queue[0].packet_id); } \ No newline at end of file diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index c4a4d80cb..54fcc37b0 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -318,6 +318,11 @@ impl TestNet { } } + pub fn sync_step_peer(&mut self, peer_num: usize) { + let mut peer = self.peer_mut(peer_num); + peer.sync.maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + } + pub fn restart_peer(&mut self, i: usize) { let peer = self.peer_mut(i); peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));