From 1d67a7a37343b8756a15539d1815ffd591b41fd0 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 27 Dec 2015 00:48:03 +0100 Subject: [PATCH] sync fixed; more tests --- src/header.rs | 24 ++++---- src/sync/chain.rs | 108 ++++++++++++++++++++++++----------- src/sync/tests.rs | 140 ++++++++++++++++++++++++++++++++-------------- 3 files changed, 185 insertions(+), 87 deletions(-) diff --git a/src/header.rs b/src/header.rs index fdc51a549..b9281144f 100644 --- a/src/header.rs +++ b/src/header.rs @@ -37,22 +37,22 @@ pub struct Header { impl Header { pub fn new() -> Header { Header { - parent_hash: ZERO_H256.clone(), - timestamp: BAD_U256.clone(), - number: ZERO_U256.clone(), - author: ZERO_ADDRESS.clone(), + parent_hash: ZERO_H256, + timestamp: BAD_U256, + number: ZERO_U256, + author: ZERO_ADDRESS, - transactions_root: ZERO_H256.clone(), - uncles_hash: ZERO_H256.clone(), + transactions_root: SHA3_NULL_RLP, + uncles_hash: SHA3_EMPTY_LIST_RLP, extra_data: vec![], - state_root: ZERO_H256.clone(), - receipts_root: ZERO_H256.clone(), - log_bloom: ZERO_LOGBLOOM.clone(), - gas_used: ZERO_U256.clone(), - gas_limit: ZERO_U256.clone(), + state_root: SHA3_NULL_RLP, + receipts_root: SHA3_NULL_RLP, + log_bloom: ZERO_LOGBLOOM, + gas_used: ZERO_U256, + gas_limit: ZERO_U256, - difficulty: ZERO_U256.clone(), + difficulty: ZERO_U256, seal: vec![], hash: Cell::new(None), } diff --git a/src/sync/chain.rs b/src/sync/chain.rs index 0b921f552..d1d7df03b 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -2,7 +2,7 @@ use std::collections::{HashSet, HashMap}; use std::cmp::{min, max}; use std::mem::{replace}; use util::network::{PeerId, PacketId}; -use util::hash::{H256}; +use util::hash::{H256, FixedHash}; use util::bytes::{Bytes}; use util::uint::{U256}; use util::rlp::{Rlp, RlpStream, self}; //TODO: use UntrustedRlp @@ -26,10 +26,10 @@ impl FromUsize for BlockNumber { const PROTOCOL_VERSION: u8 = 63u8; const MAX_BODIES_TO_SEND: usize = 256; -const MAX_HEADERS_TO_SEND: usize = 1024; +const MAX_HEADERS_TO_SEND: usize = 512; const MAX_NODE_DATA_TO_SEND: usize = 1024; const MAX_RECEIPTS_TO_SEND: usize = 1024; -const MAX_HEADERS_TO_REQUEST: usize = 1024; +const MAX_HEADERS_TO_REQUEST: usize = 512; const MAX_BODIES_TO_REQUEST: usize = 256; const STATUS_PACKET: u8 = 0x00; @@ -127,6 +127,8 @@ pub struct ChainSync { header_ids: HashMap, /// Last impoted block number last_imported_block: BlockNumber, + /// Last impoted block hash + last_imported_hash: H256, /// Syncing total difficulty syncing_difficulty: U256, /// True if common block for our and remote chain has been found @@ -147,6 +149,7 @@ impl ChainSync { peers: HashMap::new(), header_ids: HashMap::new(), last_imported_block: 0, + last_imported_hash: H256::new(), syncing_difficulty: U256::from(0u64), have_common_block: false } @@ -193,6 +196,7 @@ impl ChainSync { pub fn restart(&mut self, io: &mut SyncIo) { self.reset(); self.last_imported_block = 0; + self.last_imported_hash = H256::new(); self.starting_block = 0; self.highest_block = 0; self.have_common_block = false; @@ -224,8 +228,9 @@ impl ChainSync { /// Called by peer once it has new block headers during sync fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders); let item_count = r.item_count(); - trace!(target: "sync", "BlockHeaders ({} entries)", item_count); + trace!(target: "sync", "{}-> BlockHeaders ({} entries)", peer_id, item_count); self.clear_peer_download(peer_id); if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { trace!(target: "sync", "Ignored unexpected block headers"); @@ -250,10 +255,16 @@ impl ChainSync { BlockStatus::InChain => { self.have_common_block = true; self.last_imported_block = number; + self.last_imported_hash = info.hash(); + trace!(target: "sync", "Found common header {} ({})", number, info.hash()); }, _ => { if self.have_common_block { //validate chain + if number == self.last_imported_block + 1 && info.parent_hash != self.last_imported_hash { + debug!(target: "sync", "Mismatched block header {} {}", number, info.hash()); + continue; + } if self.headers.find_item(&(number - 1)).map_or(false, |p| p.hash != info.parent_hash) { // mismatching parent id, delete the previous block and don't add this one // TODO: lower peer rating @@ -277,11 +288,12 @@ impl ChainSync { transactions_root: info.transactions_root, uncles: info.uncles_hash }; + trace!(target: "sync", "Got header {} ({})", number, info.hash()); if header_id.transactions_root == rlp::SHA3_NULL_RLP && header_id.uncles == rlp::SHA3_EMPTY_LIST_RLP { //empty body, just mark as downloaded let mut body_stream = RlpStream::new_list(2); - body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); body_stream.append_raw(&rlp::NULL_RLP, 1); + body_stream.append_raw(&rlp::EMPTY_LIST_RLP, 1); self.bodies.insert_item(number, body_stream.out()); } else { @@ -289,15 +301,16 @@ impl ChainSync { } } } - self.collect_blocks(io); - self.continue_sync(io); } + self.collect_blocks(io); + self.continue_sync(io); } /// Called by peer once it has new block bodies fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) { + self.reset_peer_asking(peer_id, PeerAsking::BlockBodies); let item_count = r.item_count(); - trace!(target: "sync", "BlockBodies ({} entries)", item_count); + trace!(target: "sync", "{}-> BlockBodies ({} entries)", peer_id, item_count); self.clear_peer_download(peer_id); if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting { trace!(target: "sync", "Ignored unexpected block bodies"); @@ -320,6 +333,7 @@ impl ChainSync { Some(n) => { self.header_ids.remove(&header_id); self.bodies.insert_item(n, body.raw().to_vec()); + trace!(target: "sync", "Got body {}", n); } None => { debug!(target: "sync", "Ignored unknown block body"); @@ -336,6 +350,7 @@ impl ChainSync { let header_rlp = block_rlp.at(0); let h = header_rlp.raw().sha3(); + trace!(target: "sync", "{}-> NewBlock ({})", peer_id, h); match io.chain().import_block(block_rlp.raw()) { ImportResult::AlreadyInChain => { trace!(target: "sync", "New block already in chain {:?}", h); @@ -368,6 +383,7 @@ impl ChainSync { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); return; } + trace!(target: "sync", "{}-> NewHashes ({} entries)", peer_id, r.item_count()); let hashes = r.iter().map(|item| (item.val_at::(0), item.val_at::(1))); let mut max_height: U256 = From::from(0); for (h, d) in hashes { @@ -397,11 +413,13 @@ impl ChainSync { /// Called by peer when it is disconnecting pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: &PeerId) { + trace!(target: "sync", "== Disconnected {}", peer); self.clear_peer_download(peer); self.continue_sync(io); } pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: &PeerId) { + trace!(target: "sync", "== Connected {}", peer); self.send_status(io, peer); } @@ -415,6 +433,7 @@ impl ChainSync { /// Called after all blocks have been donloaded fn complete_sync(&mut self) { + trace!(target: "sync", "Sync complete"); self.reset(); self.state = SyncState::Idle; } @@ -428,13 +447,10 @@ impl ChainSync { fn sync_peer(&mut self, io: &mut SyncIo, peer_id: &PeerId, force: bool) { let (peer_latest, peer_difficulty) = { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - if peer.asking != PeerAsking::Nothing - { - trace!(target: "sync", "Can't sync with this peer - outstanding asks."); + if peer.asking != PeerAsking::Nothing { return; } - if self.state == SyncState::Waiting - { + if self.state == SyncState::Waiting { trace!(target: "sync", "Waiting for block queue"); return; } @@ -449,6 +465,7 @@ impl ChainSync { if self.state == SyncState::Idle || self.state == SyncState::NotSynced { self.state = SyncState::Blocks; } + trace!(target: "sync", "Starting sync with better chain"); self.request_headers_by_hash(io, peer_id, &peer_latest, 1, 0, false); } else if self.state == SyncState::Blocks { @@ -470,7 +487,7 @@ impl ChainSync { if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.last_imported_block + 1 { for (start, ref items) in self.headers.range_iter() { - if needed_bodies.len() >= MAX_BODIES_TO_REQUEST { + if needed_bodies.len() > MAX_BODIES_TO_REQUEST { break; } let mut index: BlockNumber = 0; @@ -498,36 +515,34 @@ impl ChainSync { if !self.headers.is_empty() { start = min(start, self.headers.range_iter().next().unwrap().0 as usize - 1); } - if start <= 1 { + if start == 0 { self.have_common_block = true; //reached genesis } } if self.have_common_block { - - - let mut headers: Vec = Vec::new(); let mut prev = self.last_imported_block + 1; - for (start, ref items) in self.headers.range_iter() { - if headers.len() >= MAX_HEADERS_TO_REQUEST { + for (next, ref items) in self.headers.range_iter() { + if !headers.is_empty() { break; } - if start > prev { + if next <= prev { + prev = next + items.len() as BlockNumber; continue; } - let mut index = 0; - while index != items.len() as BlockNumber && headers.len() < MAX_BODIES_TO_REQUEST { - let block = prev + index; + let mut block = prev; + while block < next && headers.len() <= MAX_HEADERS_TO_REQUEST { if !self.downloading_headers.contains(&(block as BlockNumber)) { headers.push(block as BlockNumber); self.downloading_headers.insert(block as BlockNumber); } - index += 1; + block += 1; } - prev = start + items.len() as BlockNumber; + prev = next + items.len() as BlockNumber; } if !headers.is_empty() { + start = headers[0] as usize; let count = headers.len(); replace(&mut self.peers.get_mut(peer_id).unwrap().asking_blocks, headers); assert!(!self.headers.have_item(&(start as BlockNumber))); @@ -563,23 +578,31 @@ impl ChainSync { return; } - for i in 0..min(headers.1.len(), bodies.1.len()) { + let count = min(headers.1.len(), bodies.1.len()); + let mut imported = 0; + for i in 0..count { let mut block_rlp = RlpStream::new_list(3); block_rlp.append_raw(&headers.1[i].data, 1); - block_rlp.append_raw(&bodies.1[i], 2); + let body = Rlp::new(&bodies.1[i]); + block_rlp.append_raw(body.at(0).raw(), 1); + block_rlp.append_raw(body.at(1).raw(), 1); let h = &headers.1[i].hash; match io.chain().import_block(&block_rlp.out()) { ImportResult::AlreadyInChain => { trace!(target: "sync", "Block already in chain {:?}", h); self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = *h; }, ImportResult::AlreadyQueued(_) => { trace!(target: "sync", "Block already queued {:?}", h); self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = *h; }, ImportResult::Queued(QueueStatus::Known) => { trace!(target: "sync", "Block queued {:?}", h); self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = *h; + imported += 1; }, ImportResult::Queued(QueueStatus::Unknown) => { panic!("Queued out of order block"); @@ -590,6 +613,7 @@ impl ChainSync { } } } + trace!(target: "sync", "Imported {} of {}", imported, count); } if restart { @@ -627,6 +651,7 @@ impl ChainSync { } fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: &PeerId, h: &H256, count: usize, skip: usize, reverse: bool) { + trace!(target: "sync", "{}<- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); let mut rlp = RlpStream::new_list(4); rlp.append(h); rlp.append(&count); @@ -637,6 +662,7 @@ impl ChainSync { fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: &PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) { let mut rlp = RlpStream::new_list(4); + trace!(target: "sync", "{}<- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); rlp.append(&n); rlp.append(&count); rlp.append(&skip); @@ -646,12 +672,23 @@ impl ChainSync { fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: &PeerId, hashes: Vec) { let mut rlp = RlpStream::new_list(hashes.len()); + trace!(target: "sync", "{}<- GetBlockBodies: {} entries", peer_id, hashes.len()); for h in hashes { rlp.append(&h); } self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); } + fn reset_peer_asking(&mut self, peer_id: &PeerId, asking: PeerAsking) { + let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); + if peer.asking != asking { + warn!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); + } + else { + peer.asking = PeerAsking::Nothing; + } + } + fn send_request(&mut self, sync: &mut SyncIo, peer_id: &PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); @@ -703,22 +740,26 @@ impl ChainSync { let mut number = if r.at(0).size() == 32 { // id is a hash let hash: H256 = r.val_at(0); - trace!(target: "sync", "GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); + trace!(target: "sync", "-> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", hash, max_headers, skip, reverse); match io.chain().block_header(&hash) { Some(hdr) => From::from(rlp::decode::(&hdr).number), None => last } } else { + trace!(target: "sync", "-> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", r.val_at::(0), max_headers, skip, reverse); r.val_at(0) }; - number = max(1, number); - number = min(last, number); + if reverse { + number = min(last, number); + } else { + number = max(1, number); + } let max_count = min(MAX_HEADERS_TO_SEND, max_headers); let mut count = 0; let mut data = Bytes::new(); - while number < last && number > 1 && count < max_count { + while number <= last && number > 0 && count < max_count { match io.chain().block_header_at(number) { Some(mut hdr) => { data.append(&mut hdr); @@ -737,6 +778,7 @@ impl ChainSync { rlp.append_raw(&data, count as usize); io.respond(BLOCK_HEADERS_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); + trace!(target: "sync", "-> GetBlockHeaders: returned {} entries", count); } fn return_block_bodies(&self, io: &mut SyncIo, r: &Rlp) { @@ -745,6 +787,7 @@ impl ChainSync { debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); return; } + trace!(target: "sync", "-> GetBlockBodies: {} entries", count); count = min(count, MAX_BODIES_TO_SEND); let mut added = 0usize; let mut data = Bytes::new(); @@ -761,6 +804,7 @@ impl ChainSync { rlp.append_raw(&data, added); io.respond(BLOCK_BODIES_PACKET, rlp.out()).unwrap_or_else(|e| debug!(target: "sync", "Error sending headers: {:?}", e)); + trace!(target: "sync", "-> GetBlockBodies: returned {} entries", added); } fn return_node_data(&self, io: &mut SyncIo, r: &Rlp) { diff --git a/src/sync/tests.rs b/src/sync/tests.rs index 7be66ff2f..6b2db9c9b 100644 --- a/src/sync/tests.rs +++ b/src/sync/tests.rs @@ -10,8 +10,8 @@ use sync::{SyncIo}; use sync::chain::{ChainSync}; struct TestBlockChainClient { - blocks: Vec, - hashes: HashMap, + blocks: HashMap, + numbers: HashMap, genesis_hash: H256, last_hash: H256, difficulty: U256 @@ -21,8 +21,8 @@ impl TestBlockChainClient { fn new() -> TestBlockChainClient { let mut client = TestBlockChainClient { - blocks: Vec::new(), - hashes: HashMap::new(), + blocks: HashMap::new(), + numbers: HashMap::new(), genesis_hash: H256::new(), last_hash: H256::new(), difficulty: From::from(0), @@ -33,20 +33,20 @@ impl TestBlockChainClient { } pub fn add_blocks(&mut self, count: usize, empty: bool) { - for n in self.blocks.len()..(self.blocks.len() + count) { + for n in self.numbers.len()..(self.numbers.len() + count) { let mut header = BlockHeader::new(); header.difficulty = From::from(n); header.parent_hash = self.last_hash; header.number = From::from(n); let mut uncles = RlpStream::new_list(if empty {0} else {1}); if !empty { - uncles.append(&H256::random()); + uncles.append(&H256::from(&U256::from(n))); header.uncles_hash = uncles.raw().sha3(); } let mut rlp = RlpStream::new_list(3); rlp.append(&header); - rlp.append_raw(uncles.raw(), 1); rlp.append_raw(&rlp::NULL_RLP, 1); + rlp.append_raw(uncles.raw(), 1); self.import_block(rlp.raw()); } } @@ -54,27 +54,12 @@ impl TestBlockChainClient { impl BlockChainClient for TestBlockChainClient { fn block_header(&self, h: &H256) -> Option { - self.hashes.get(h).and_then(|i| self.block_header_at(*i as BlockNumber)) + self.blocks.get(h).map(|r| Rlp::new(r).at(0).raw().to_vec()) + } fn block_body(&self, h: &H256) -> Option { - self.hashes.get(h).and_then(|i| self.block_body_at(*i as BlockNumber)) - } - - fn block(&self, h: &H256) -> Option { - self.hashes.get(h).map(|i| self.blocks[*i].clone()) - } - - fn block_status(&self, h: &H256) -> BlockStatus { - self.hashes.get(h).map(|i| self.block_status_at(*i as BlockNumber)).unwrap_or(BlockStatus::Unknown) - } - - fn block_header_at(&self, n: BlockNumber) -> Option { - self.blocks.get(n as usize).map(|r| Rlp::new(r).at(0).raw().to_vec()) - } - - fn block_body_at(&self, n: BlockNumber) -> Option { - self.blocks.get(n as usize).map(|r| { + self.blocks.get(h).map(|r| { let mut stream = RlpStream::new_list(2); stream.append_raw(Rlp::new(&r).at(1).raw(), 1); stream.append_raw(Rlp::new(&r).at(2).raw(), 1); @@ -82,8 +67,27 @@ impl BlockChainClient for TestBlockChainClient { }) } + fn block(&self, h: &H256) -> Option { + self.blocks.get(h).map(|b| b.clone()) + } + + fn block_status(&self, h: &H256) -> BlockStatus { + match self.blocks.get(h) { + Some(_) => BlockStatus::InChain, + None => BlockStatus::Unknown + } + } + + fn block_header_at(&self, n: BlockNumber) -> Option { + self.numbers.get(&(n as usize)).and_then(|h| self.block_header(h)) + } + + fn block_body_at(&self, n: BlockNumber) -> Option { + self.numbers.get(&(n as usize)).and_then(|h| self.block_body(h)) + } + fn block_at(&self, n: BlockNumber) -> Option { - self.blocks.get(n as usize).map(|b| b.clone()) + self.numbers.get(&(n as usize)).map(|h| self.blocks.get(h).unwrap().clone()) } fn block_status_at(&self, n: BlockNumber) -> BlockStatus { @@ -112,19 +116,41 @@ impl BlockChainClient for TestBlockChainClient { fn import_block(&mut self, b: &[u8]) -> ImportResult { let header = Rlp::new(&b).val_at::(0); - if header.number != From::from(self.blocks.len()) { - panic!("Unexpected block number"); + let number: usize = header.number.low_u64() as usize; + if number > self.blocks.len() { + panic!("Unexpected block number. Expected {}, got {}", self.blocks.len(), number); } - if !self.blocks.is_empty() { - let parent = Rlp::new(self.blocks.last().unwrap()).val_at::(0); - if header.parent_hash != parent.hash() { - panic!("Unexpected block header"); + if number > 0 { + match self.blocks.get(&header.parent_hash) { + Some(parent) => { + let parent = Rlp::new(parent).val_at::(0); + if parent.number != (header.number - From::from(1)) { + panic!("Unexpected block parent"); + } + }, + None => { + panic!("Unknown block parent {:?} for block {}", header.parent_hash, number); + } } } - self.difficulty = self.difficulty + header.difficulty; - self.last_hash = header.hash(); - self.hashes.insert(header.hash(), self.blocks.len()); - self.blocks.push(b.to_vec()); + if number == self.numbers.len() { + self.difficulty = self.difficulty + header.difficulty; + self.last_hash = header.hash(); + self.blocks.insert(header.hash(), b.to_vec()); + self.numbers.insert(number, header.hash()); + let mut parent_hash = header.parent_hash; + if number > 0 { + let mut n = number - 1; + while n > 0 && self.numbers[&n] != parent_hash { + *self.numbers.get_mut(&n).unwrap() = parent_hash; + n -= 1; + parent_hash = Rlp::new(&self.blocks[&parent_hash]).val_at::(0).parent_hash; + } + } + } + else { + self.blocks.insert(header.hash(), b.to_vec()); + } ImportResult::Queued(QueueStatus::Known) } @@ -222,7 +248,11 @@ impl TestNet { net } - pub fn peer(&mut self, i: usize) -> &mut TestPeer { + pub fn peer(&self, i: usize) -> &TestPeer { + self.peers.get(i).unwrap() + } + + pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer { self.peers.get_mut(i).unwrap() } @@ -242,7 +272,9 @@ impl TestNet { match self.peers[peer].queue.pop_front() { Some(packet) => { let mut p = self.peers.get_mut(packet.recipient).unwrap(); + trace!("--- {} -> {} ---", peer, packet.recipient); p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), &(peer as PeerId), packet.packet_id, &packet.data); + trace!("----------------"); }, None => {} } @@ -267,19 +299,41 @@ impl TestNet { #[test] fn full_sync_two_peers() { let mut net = TestNet::new(3); - net.peer(1).chain.add_blocks(1000, false); - net.peer(2).chain.add_blocks(1000, false); + net.peer_mut(1).chain.add_blocks(1000, false); + net.peer_mut(2).chain.add_blocks(1000, false); net.sync(); - assert_eq!(net.peer(0).chain.block_at(50000), net.peer(1).chain.block_at(50000)); + assert!(net.peer(0).chain.block_at(1000).is_some()); + assert_eq!(net.peer(0).chain.blocks, net.peer(1).chain.blocks); } #[test] fn full_sync_empty_blocks() { let mut net = TestNet::new(3); for n in 0..200 { - net.peer(1).chain.add_blocks(5, n % 2 == 0); - net.peer(2).chain.add_blocks(5, n % 2 == 0); + net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); + net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); } net.sync(); - assert_eq!(net.peer(0).chain.block_at(50000), net.peer(1).chain.block_at(50000)); + assert!(net.peer(0).chain.block_at(1000).is_some()); + assert_eq!(net.peer(0).chain.blocks, net.peer(1).chain.blocks); +} + +#[test] +fn forked_sync() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer_mut(0).chain.add_blocks(300, false); + net.peer_mut(1).chain.add_blocks(300, false); + net.peer_mut(2).chain.add_blocks(300, false); + net.peer_mut(0).chain.add_blocks(100, true); //fork + net.peer_mut(1).chain.add_blocks(200, false); + net.peer_mut(2).chain.add_blocks(200, false); + net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, true); + // peer 1 has the best chain of 601 blocks + let peer1_chain = net.peer(1).chain.numbers.clone(); + net.sync(); + assert_eq!(net.peer(0).chain.numbers, peer1_chain); + assert_eq!(net.peer(1).chain.numbers, peer1_chain); + assert_eq!(net.peer(2).chain.numbers, peer1_chain); }