Disconnect peers on a fork (#1738)

This commit is contained in:
Arkadiy Paronyan 2016-07-27 21:38:22 +02:00 committed by Gav Wood
parent eaa41ea568
commit 6b1e722a6b
10 changed files with 122 additions and 16 deletions

View File

@ -18,7 +18,9 @@
"accountStartNonce": "0x00", "accountStartNonce": "0x00",
"maximumExtraDataSize": "0x20", "maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388", "minGasLimit": "0x1388",
"networkID" : "0x1" "networkID" : "0x1",
"forkBlock": "0x1d4c00",
"forkCanonHash": "0x94365e3a8c0b35089c1d1195081fe7489b528a84b22199c916180db8b28ade7f"
}, },
"genesis": { "genesis": {
"seal": { "seal": {

View File

@ -137,7 +137,9 @@
"accountStartNonce": "0x00", "accountStartNonce": "0x00",
"maximumExtraDataSize": "0x20", "maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388", "minGasLimit": "0x1388",
"networkID" : "0x1" "networkID" : "0x1",
"forkBlock": "0x1d4c00",
"forkCanonHash": "0x4985f5ca3d2afbec36529aa96f74de3cc10a2a4a6c44f2157a57d2c6059a11bb"
}, },
"genesis": { "genesis": {
"seal": { "seal": {

View File

@ -38,6 +38,8 @@ pub struct CommonParams {
pub network_id: U256, pub network_id: U256,
/// Minimum gas limit. /// Minimum gas limit.
pub min_gas_limit: U256, pub min_gas_limit: U256,
/// Fork block to check.
pub fork_block: Option<(BlockNumber, H256)>,
} }
impl From<ethjson::spec::Params> for CommonParams { impl From<ethjson::spec::Params> for CommonParams {
@ -47,6 +49,7 @@ impl From<ethjson::spec::Params> for CommonParams {
maximum_extra_data_size: p.maximum_extra_data_size.into(), maximum_extra_data_size: p.maximum_extra_data_size.into(),
network_id: p.network_id.into(), network_id: p.network_id.into(),
min_gas_limit: p.min_gas_limit.into(), min_gas_limit: p.min_gas_limit.into(),
fork_block: if let (Some(n), Some(h)) = (p.fork_block, p.fork_hash) { Some((n.into(), h.into())) } else { None },
} }
} }
} }
@ -151,6 +154,9 @@ impl Spec {
/// Get the configured Network ID. /// Get the configured Network ID.
pub fn network_id(&self) -> U256 { self.params.network_id } pub fn network_id(&self) -> U256 { self.params.network_id }
/// Get the configured network fork block.
pub fn fork_block(&self) -> Option<(BlockNumber, H256)> { self.params.fork_block }
/// Get the header of the genesis block. /// Get the header of the genesis block.
pub fn genesis_header(&self) -> Header { pub fn genesis_header(&self) -> Header {
Header { Header {

View File

@ -17,6 +17,7 @@
//! Spec params deserialization. //! Spec params deserialization.
use uint::Uint; use uint::Uint;
use hash::H256;
/// Spec params. /// Spec params.
#[derive(Debug, PartialEq, Deserialize)] #[derive(Debug, PartialEq, Deserialize)]
@ -33,6 +34,12 @@ pub struct Params {
/// Minimum gas limit. /// Minimum gas limit.
#[serde(rename="minGasLimit")] #[serde(rename="minGasLimit")]
pub min_gas_limit: Uint, pub min_gas_limit: Uint,
/// Option fork block number to check.
#[serde(rename="forkBlock")]
pub fork_block: Option<Uint>,
/// Expected fork block hash.
#[serde(rename="forkCanonHash")]
pub fork_hash: Option<H256>,
} }
#[cfg(test)] #[cfg(test)]

View File

@ -79,6 +79,8 @@ mod tests {
"maximumExtraDataSize": "0x20", "maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388", "minGasLimit": "0x1388",
"networkID" : "0x2" "networkID" : "0x2"
"forkBlock": "0xffffffffffffffff",
"forkCanonHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
}, },
"genesis": { "genesis": {
"seal": { "seal": {

View File

@ -130,6 +130,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
Some(id) => id, Some(id) => id,
None => spec.network_id(), None => spec.network_id(),
}; };
sync_config.fork_block = spec.fork_block().clone();
// prepare account provider // prepare account provider
let account_provider = Arc::new(try!(prepare_account_provider(&cmd.dirs, cmd.acc_conf))); let account_provider = Arc::new(try!(prepare_account_provider(&cmd.dirs, cmd.acc_conf)));

View File

@ -20,6 +20,7 @@ use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, Peer
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode}; NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode};
use util::{TimerToken, U256, H256, UtilError, Secret, Populatable}; use util::{TimerToken, U256, H256, UtilError, Secret, Populatable};
use ethcore::client::{BlockChainClient, ChainNotify}; use ethcore::client::{BlockChainClient, ChainNotify};
use ethcore::header::BlockNumber;
use io::NetSyncIo; use io::NetSyncIo;
use chain::{ChainSync, SyncStatus}; use chain::{ChainSync, SyncStatus};
use std::net::{SocketAddr, AddrParseError}; use std::net::{SocketAddr, AddrParseError};
@ -38,6 +39,8 @@ pub struct SyncConfig {
pub max_download_ahead_blocks: usize, pub max_download_ahead_blocks: usize,
/// Network ID /// Network ID
pub network_id: U256, pub network_id: U256,
/// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>,
} }
impl Default for SyncConfig { impl Default for SyncConfig {
@ -45,6 +48,7 @@ impl Default for SyncConfig {
SyncConfig { SyncConfig {
max_download_ahead_blocks: 20000, max_download_ahead_blocks: 20000,
network_id: U256::from(1), network_id: U256::from(1),
fork_block: None,
} }
} }
} }

View File

@ -137,6 +137,7 @@ const RECEIPTS_PACKET: u8 = 0x10;
const HEADERS_TIMEOUT_SEC: f64 = 15f64; const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 5f64; const BODIES_TIMEOUT_SEC: f64 = 5f64;
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
#[derive(Copy, Clone, Eq, PartialEq, Debug)] #[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state /// Sync state
@ -191,6 +192,7 @@ impl SyncStatus {
/// Peer data type requested /// Peer data type requested
enum PeerAsking { enum PeerAsking {
Nothing, Nothing,
ForkHeader,
BlockHeaders, BlockHeaders,
BlockBodies, BlockBodies,
Heads, Heads,
@ -221,6 +223,14 @@ struct PeerInfo {
ask_time: f64, ask_time: f64,
/// Pending request is expird and result should be ignored /// Pending request is expird and result should be ignored
expired: bool, expired: bool,
/// Peer fork confirmed
confirmed: bool,
}
impl PeerInfo {
fn is_available(&self) -> bool {
self.confirmed && !self.expired
}
} }
/// Blockchain sync handler. /// Blockchain sync handler.
@ -254,6 +264,8 @@ pub struct ChainSync {
round_parents: VecDeque<(H256, H256)>, round_parents: VecDeque<(H256, H256)>,
/// Network ID /// Network ID
network_id: U256, network_id: U256,
/// Optional fork block to check
fork_block: Option<(BlockNumber, H256)>,
} }
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@ -277,6 +289,7 @@ impl ChainSync {
round_parents: VecDeque::new(), round_parents: VecDeque::new(),
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), _max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id, network_id: config.network_id,
fork_block: config.fork_block,
}; };
sync.reset(); sync.reset();
sync sync
@ -293,8 +306,8 @@ impl ChainSync {
highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)), highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)),
blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 }, blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 },
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
num_peers: self.peers.len(), num_peers: self.peers.values().filter(|p| p.confirmed).count(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), num_active_peers: self.peers.values().filter(|p| p.confirmed && p.asking != PeerAsking::Nothing).count(),
mem_used: mem_used:
self.blocks.heap_size() self.blocks.heap_size()
+ self.peers.heap_size_of_children() + self.peers.heap_size_of_children()
@ -316,7 +329,7 @@ impl ChainSync {
p.asking_blocks.clear(); p.asking_blocks.clear();
p.asking_hash = None; p.asking_hash = None;
// mark any pending requests as expired // mark any pending requests as expired
if p.asking != PeerAsking::Nothing { if p.asking != PeerAsking::Nothing && p.confirmed {
p.expired = true; p.expired = true;
} }
} }
@ -370,6 +383,7 @@ impl ChainSync {
asking_hash: None, asking_hash: None,
ask_time: 0f64, ask_time: 0f64,
expired: false, expired: false,
confirmed: self.fork_block.is_none(),
}; };
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis); trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
@ -397,16 +411,41 @@ impl ChainSync {
self.peers.insert(peer_id.clone(), peer); self.peers.insert(peer_id.clone(), peer);
self.active_peers.insert(peer_id.clone()); self.active_peers.insert(peer_id.clone());
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
self.sync_peer(io, peer_id, false); if let Some((fork_block, _)) = self.fork_block {
self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader);
} else {
self.sync_peer(io, peer_id, false);
}
Ok(()) Ok(())
} }
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))] #[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
/// Called by peer once it has new block headers during sync /// Called by peer once it has new block headers during sync
fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let confirmed = match self.peers.get_mut(&peer_id) {
Some(ref mut peer) if peer.asking == PeerAsking::ForkHeader => {
let item_count = r.item_count();
if item_count == 0 || (item_count == 1 && try!(r.at(0)).as_raw().sha3() == self.fork_block.unwrap().1) {
trace!(target: "sync", "{}: Confirmed peer", peer_id);
peer.asking = PeerAsking::Nothing;
peer.confirmed = true;
true
} else {
trace!(target: "sync", "{}: Fork mismatch", peer_id);
io.disconnect_peer(peer_id);
false
}
},
_ => false,
};
if confirmed {
self.sync_peer(io, peer_id, false);
return Ok(());
}
self.clear_peer_download(peer_id); self.clear_peer_download(peer_id);
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders }; let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() { if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
trace!(target: "sync", "{}: Ignored unexpected headers", peer_id); trace!(target: "sync", "{}: Ignored unexpected headers", peer_id);
self.continue_sync(io); self.continue_sync(io);
@ -474,14 +513,14 @@ impl ChainSync {
// Disable the peer for this syncing round if it gives invalid chain // Disable the peer for this syncing round if it gives invalid chain
if !valid_response { if !valid_response {
trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id); trace!(target: "sync", "{} Disabled for invalid headers response", peer_id);
self.deactivate_peer(io, peer_id); io.disable_peer(peer_id);
} }
if headers.is_empty() { if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another // Peer does not have any new subchain heads, deactivate it nd try with another
trace!(target: "sync", "{} Deactivated for no data", peer_id); trace!(target: "sync", "{} Disabled for no data", peer_id);
self.deactivate_peer(io, peer_id); io.disable_peer(peer_id);
} }
match self.state { match self.state {
SyncState::ChainHead => { SyncState::ChainHead => {
@ -692,7 +731,8 @@ impl ChainSync {
/// Resume downloading /// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) { fn continue_sync(&mut self, io: &mut SyncIo) {
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect(); let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
if p.is_available() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
thread_rng().shuffle(&mut peers); //TODO: sort by rating thread_rng().shuffle(&mut peers); //TODO: sort by rating
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len()); trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
for (p, _) in peers { for (p, _) in peers {
@ -700,7 +740,7 @@ impl ChainSync {
self.sync_peer(io, p, false); self.sync_peer(io, p, false);
} }
} }
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && !p.expired) { if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.is_available()) {
self.complete_sync(); self.complete_sync();
} }
} }
@ -726,7 +766,7 @@ impl ChainSync {
} }
let (peer_latest, peer_difficulty) = { let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).unwrap(); let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing { if peer.asking != PeerAsking::Nothing || !peer.is_available() {
return; return;
} }
if self.state == SyncState::Waiting { if self.state == SyncState::Waiting {
@ -924,6 +964,17 @@ impl ChainSync {
.asking_hash = Some(h.clone()); .asking_hash = Some(h.clone());
} }
/// Request headers from a peer by block number
#[cfg_attr(feature="dev", allow(too_many_arguments))]
fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool, asking: PeerAsking) {
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n);
let mut rlp = RlpStream::new_list(4);
rlp.append(&n);
rlp.append(&count);
rlp.append(&skip);
rlp.append(&if reverse {1u32} else {0u32});
self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out());
}
/// Request block bodies from a peer /// Request block bodies from a peer
fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>) { fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>) {
let mut rlp = RlpStream::new_list(hashes.len()); let mut rlp = RlpStream::new_list(hashes.len());
@ -977,6 +1028,9 @@ impl ChainSync {
if !io.is_chain_queue_empty() { if !io.is_chain_queue_empty() {
return Ok(()); return Ok(());
} }
if self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
}
let mut item_count = r.item_count(); let mut item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
@ -1212,6 +1266,7 @@ impl ChainSync {
PeerAsking::BlockHeaders | PeerAsking::Heads => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC, PeerAsking::BlockHeaders | PeerAsking::Heads => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC, PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
PeerAsking::Nothing => false, PeerAsking::Nothing => false,
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
}; };
if timeout { if timeout {
trace!(target:"sync", "Timeout {}", peer_id); trace!(target:"sync", "Timeout {}", peer_id);
@ -1629,6 +1684,7 @@ mod tests {
asking_hash: None, asking_hash: None,
ask_time: 0f64, ask_time: 0f64,
expired: false, expired: false,
confirmed: false,
}); });
sync sync
} }

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use util::*; use util::*;
use ethcore::client::{BlockChainClient, BlockID, EachBlockWith}; use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockID, EachBlockWith};
use chain::{SyncState}; use chain::{SyncState};
use super::helpers::*; use super::helpers::*;
@ -95,6 +95,25 @@ fn forked() {
assert_eq!(net.peer(2).chain.numbers.read().deref(), &peer1_chain); assert_eq!(net.peer(2).chain.numbers.read().deref(), &peer1_chain);
} }
#[test]
fn net_hard_fork() {
::env_logger::init().ok();
let ref_client = TestBlockChainClient::new();
ref_client.add_blocks(50, EachBlockWith::Uncle);
{
let mut net = TestNet::new_with_fork(2, Some((50, ref_client.block_hash(BlockID::Number(50)).unwrap())));
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
net.sync();
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 100);
}
{
let mut net = TestNet::new_with_fork(2, Some((50, ref_client.block_hash(BlockID::Number(50)).unwrap())));
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing);
net.sync();
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 0);
}
}
#[test] #[test]
fn restart() { fn restart() {
let mut net = TestNet::new(3); let mut net = TestNet::new(3);

View File

@ -16,6 +16,7 @@
use util::*; use util::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient}; use ethcore::client::{TestBlockChainClient, BlockChainClient};
use ethcore::header::BlockNumber;
use io::SyncIo; use io::SyncIo;
use chain::ChainSync; use chain::ChainSync;
use ::SyncConfig; use ::SyncConfig;
@ -89,13 +90,19 @@ pub struct TestNet {
impl TestNet { impl TestNet {
pub fn new(n: usize) -> TestNet { pub fn new(n: usize) -> TestNet {
Self::new_with_fork(n, None)
}
pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet {
let mut net = TestNet { let mut net = TestNet {
peers: Vec::new(), peers: Vec::new(),
started: false, started: false,
}; };
for _ in 0..n { for _ in 0..n {
let chain = TestBlockChainClient::new(); let chain = TestBlockChainClient::new();
let sync = ChainSync::new(SyncConfig::default(), &chain); let mut config = SyncConfig::default();
config.fork_block = fork;
let sync = ChainSync::new(config, &chain);
net.peers.push(TestPeer { net.peers.push(TestPeer {
sync: RwLock::new(sync), sync: RwLock::new(sync),
chain: chain, chain: chain,