Don't activate peers on connect; Test (#2537)

This commit is contained in:
Arkadiy Paronyan 2016-10-10 23:05:41 +02:00 committed by Gav Wood
parent 26d7712d30
commit 193cdb1326
4 changed files with 74 additions and 16 deletions

View File

@ -55,6 +55,8 @@ pub struct TestBlockChainClient {
pub genesis_hash: H256, pub genesis_hash: H256,
/// Last block hash. /// Last block hash.
pub last_hash: RwLock<H256>, pub last_hash: RwLock<H256>,
/// Extra data do set for each block
pub extra_data: Bytes,
/// Difficulty. /// Difficulty.
pub difficulty: RwLock<U256>, pub difficulty: RwLock<U256>,
/// Balances. /// Balances.
@ -105,11 +107,17 @@ impl Default for TestBlockChainClient {
impl TestBlockChainClient { impl TestBlockChainClient {
/// Creates new test client. /// Creates new test client.
pub fn new() -> Self { pub fn new() -> Self {
Self::new_with_extra_data(Bytes::new())
}
/// Creates new test client with specified extra data for each block
pub fn new_with_extra_data(extra_data: Bytes) -> Self {
let spec = Spec::new_test(); let spec = Spec::new_test();
let mut client = TestBlockChainClient { let mut client = TestBlockChainClient {
blocks: RwLock::new(HashMap::new()), blocks: RwLock::new(HashMap::new()),
numbers: RwLock::new(HashMap::new()), numbers: RwLock::new(HashMap::new()),
genesis_hash: H256::new(), genesis_hash: H256::new(),
extra_data: extra_data,
last_hash: RwLock::new(H256::new()), last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)), difficulty: RwLock::new(From::from(0)),
balances: RwLock::new(HashMap::new()), balances: RwLock::new(HashMap::new()),
@ -184,6 +192,7 @@ impl TestBlockChainClient {
header.set_parent_hash(self.last_hash.read().clone()); header.set_parent_hash(self.last_hash.read().clone());
header.set_number(n as BlockNumber); header.set_number(n as BlockNumber);
header.set_gas_limit(U256::from(1_000_000)); header.set_gas_limit(U256::from(1_000_000));
header.set_extra_data(self.extra_data.clone());
let uncles = match with { let uncles = match with {
EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
let mut uncles = RlpStream::new_list(1); let mut uncles = RlpStream::new_list(1);

View File

@ -143,7 +143,7 @@ const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
const SNAPSHOT_DATA_PACKET: u8 = 0x14; const SNAPSHOT_DATA_PACKET: u8 = 0x14;
const HEADERS_TIMEOUT_SEC: f64 = 15f64; const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 5f64; const BODIES_TIMEOUT_SEC: f64 = 10f64;
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64; const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64; const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64;
const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 10f64; const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 10f64;
@ -393,6 +393,8 @@ impl ChainSync {
} }
self.syncing_difficulty = From::from(0u64); self.syncing_difficulty = From::from(0u64);
self.state = SyncState::Idle; self.state = SyncState::Idle;
// Reactivate peers only if some progress has been made
// since the last sync round of if starting fresh.
self.active_peers = self.peers.keys().cloned().collect(); self.active_peers = self.peers.keys().cloned().collect();
} }
@ -404,7 +406,8 @@ impl ChainSync {
self.continue_sync(io); self.continue_sync(io);
} }
/// Remove peer from active peer set /// Remove peer from active peer set. Peer will be reactivated on the next sync
/// round.
fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) { fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "Deactivating peer {}", peer_id); trace!(target: "sync", "Deactivating peer {}", peer_id);
self.active_peers.remove(&peer_id); self.active_peers.remove(&peer_id);
@ -477,7 +480,11 @@ impl ChainSync {
} }
self.peers.insert(peer_id.clone(), peer); self.peers.insert(peer_id.clone(), peer);
// Don't activate peer immediatelly when searching for common block.
// Let the current sync round complete first.
if self.state != SyncState::ChainHead {
self.active_peers.insert(peer_id.clone()); self.active_peers.insert(peer_id.clone());
}
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
if let Some((fork_block, _)) = self.fork_block { if let Some((fork_block, _)) = self.fork_block {
self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader); self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader);
@ -592,9 +599,9 @@ impl ChainSync {
} }
if headers.is_empty() { if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another // Peer does not have any new subchain heads, deactivate it and try with another.
trace!(target: "sync", "{} Disabled for no data", peer_id); trace!(target: "sync", "{} Disabled for no data", peer_id);
io.disable_peer(peer_id); self.deactivate_peer(io, peer_id);
} }
match self.state { match self.state {
SyncState::ChainHead => { SyncState::ChainHead => {

View File

@ -95,6 +95,27 @@ fn forked() {
assert_eq!(&*net.peer(2).chain.numbers.read(), &peer1_chain); assert_eq!(&*net.peer(2).chain.numbers.read(), &peer1_chain);
} }
#[test]
fn forked_with_misbehaving_peer() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
// peer 0 is on a totally different chain with higher total difficulty
net.peer_mut(0).chain = TestBlockChainClient::new_with_extra_data(b"fork".to_vec());
net.peer_mut(0).chain.add_blocks(500, EachBlockWith::Nothing);
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Nothing);
net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Nothing);
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Nothing);
net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle);
// peer 1 should sync to peer 2, others should not change
let peer0_chain = net.peer(0).chain.numbers.read().clone();
let peer2_chain = net.peer(2).chain.numbers.read().clone();
net.sync();
assert_eq!(&*net.peer(0).chain.numbers.read(), &peer0_chain);
assert_eq!(&*net.peer(1).chain.numbers.read(), &peer2_chain);
assert_eq!(&*net.peer(2).chain.numbers.read(), &peer2_chain);
}
#[test] #[test]
fn net_hard_fork() { fn net_hard_fork() {
::env_logger::init().ok(); ::env_logger::init().ok();
@ -121,7 +142,7 @@ fn restart() {
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync_steps(8); net.sync();
// make sure that sync has actually happened // make sure that sync has actually happened
assert!(net.peer(0).chain.chain_info().best_block_number > 100); assert!(net.peer(0).chain.chain_info().best_block_number > 100);

View File

@ -29,6 +29,7 @@ pub struct TestIo<'p> {
pub snapshot_service: &'p TestSnapshotService, pub snapshot_service: &'p TestSnapshotService,
pub queue: &'p mut VecDeque<TestPacket>, pub queue: &'p mut VecDeque<TestPacket>,
pub sender: Option<PeerId>, pub sender: Option<PeerId>,
pub to_disconnect: HashSet<PeerId>,
} }
impl<'p> TestIo<'p> { impl<'p> TestIo<'p> {
@ -37,16 +38,19 @@ impl<'p> TestIo<'p> {
chain: chain, chain: chain,
snapshot_service: ss, snapshot_service: ss,
queue: queue, queue: queue,
sender: sender sender: sender,
to_disconnect: HashSet::new(),
} }
} }
} }
impl<'p> SyncIo for TestIo<'p> { impl<'p> SyncIo for TestIo<'p> {
fn disable_peer(&mut self, _peer_id: PeerId) { fn disable_peer(&mut self, peer_id: PeerId) {
self.disconnect_peer(peer_id);
} }
fn disconnect_peer(&mut self, _peer_id: PeerId) { fn disconnect_peer(&mut self, peer_id: PeerId) {
self.to_disconnect.insert(peer_id);
} }
fn is_expired(&self) -> bool { fn is_expired(&self) -> bool {
@ -150,13 +154,30 @@ impl TestNet {
pub fn sync_step(&mut self) { pub fn sync_step(&mut self) {
for peer in 0..self.peers.len() { for peer in 0..self.peers.len() {
if let Some(packet) = self.peers[peer].queue.pop_front() { if let Some(packet) = self.peers[peer].queue.pop_front() {
let disconnecting = {
let mut p = self.peers.get_mut(packet.recipient).unwrap(); let mut p = self.peers.get_mut(packet.recipient).unwrap();
trace!("--- {} -> {} ---", peer, packet.recipient); trace!("--- {} -> {} ---", peer, packet.recipient);
ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); let to_disconnect = {
trace!("----------------"); let mut io = TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(peer as PeerId));
ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data);
io.to_disconnect
};
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
let mut io = TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(*d));
p.sync.write().on_peer_aborting(&mut io, *d);
} }
let mut p = self.peers.get_mut(peer).unwrap(); to_disconnect
p.sync.write().maintain_sync(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, None)); };
for d in &disconnecting {
// notify other peers that this peer is disconnecting
let mut p = self.peers.get_mut(*d).unwrap();
let mut io = TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(peer as PeerId));
p.sync.write().on_peer_aborting(&mut io, peer as PeerId);
}
}
self.sync_step_peer(peer);
} }
} }