diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index b6976a933..5ed51eee8 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1236,7 +1236,7 @@ impl BlockChainClient for Client { self.miner.pending_transactions(self.chain.read().best_block_number()) } - fn queue_infinity_message(&self, message: Bytes) { + fn queue_consensus_message(&self, message: Bytes) { if let Err(e) = self.io_channel.lock().send(ClientIoMessage::NewMessage(message)) { debug!("Ignoring the message, error queueing: {}", e); } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index cf713cb4f..6a9ab4b68 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -651,7 +651,7 @@ impl BlockChainClient for TestBlockChainClient { self.miner.import_external_transactions(self, txs); } - fn queue_infinity_message(&self, _packet: Bytes) { + fn queue_consensus_message(&self, _packet: Bytes) { unimplemented!(); } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 61077ceb1..493f623f6 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -189,8 +189,8 @@ pub trait BlockChainClient : Sync + Send { /// Queue transactions for importing. fn queue_transactions(&self, transactions: Vec); - /// Queue packet - fn queue_infinity_message(&self, message: Bytes); + /// Queue conensus engine message. + fn queue_consensus_message(&self, message: Bytes); /// list all transactions fn pending_transactions(&self) -> Vec; diff --git a/sync/src/api.rs b/sync/src/api.rs index 8d7d08037..ee9031d0e 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -28,19 +28,16 @@ use ethcore::snapshot::SnapshotService; use ethcore::header::BlockNumber; use sync_io::NetSyncIo; use chain::{ChainSync, SyncStatus}; -use infinity::{InfinitySync}; use std::net::{SocketAddr, AddrParseError}; use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; use std::str::FromStr; use parking_lot::RwLock; use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT}; +/// Parity sync protocol pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par"; - /// Ethereum sync protocol -pub const ETH_PROTOCOL: [u8; 3] = *b"eth"; -/// Infinity protocol -pub const INF_PROTOCOL: [u8; 3] = *b"inf"; +pub const ETH_PROTOCOL: ProtocolId = *b"eth"; /// Sync configuration #[derive(Debug, Clone, Copy)] @@ -124,8 +121,6 @@ pub struct EthSync { network: NetworkService, /// Ethereum Protocol handler eth_handler: Arc, - /// Infinity Protocol handler - inf_handler: Arc, /// The main subprotocol name subprotocol_name: [u8; 3], /// Configuration @@ -135,7 +130,6 @@ pub struct EthSync { impl EthSync { /// Creates and register protocol with the network service pub fn new(config: SyncConfig, chain: Arc, snapshot_service: Arc, network_config: NetworkConfiguration) -> Result, NetworkError> { - let inf_sync = InfinitySync::new(&config, chain.clone()); let chain_sync = ChainSync::new(config, &*chain); let service = try!(NetworkService::new(try!(network_config.clone().into_basic()))); let sync = Arc::new(EthSync{ @@ -146,12 +140,6 @@ impl EthSync { snapshot_service: snapshot_service.clone(), overlay: RwLock::new(HashMap::new()), }), - inf_handler: Arc::new(InfProtocolHandler { - sync: RwLock::new(inf_sync), - chain: chain, - snapshot_service: snapshot_service, - overlay: RwLock::new(HashMap::new()), - }), subprotocol_name: config.subprotocol_name, config: network_config, }); @@ -232,37 +220,6 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } } -struct InfProtocolHandler { - /// Shared blockchain client. - chain: Arc, - /// Shared snapshot service. - snapshot_service: Arc, - /// Sync strategy - sync: RwLock, - /// Chain overlay used to cache data such as fork block. - overlay: RwLock>, -} - -impl NetworkProtocolHandler for InfProtocolHandler { - fn initialize(&self, _io: &NetworkContext) { - } - - fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - InfinitySync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data); - } - - fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); - } - - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); - } - - fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) { - } -} - impl ChainNotify for EthSync { fn new_blocks(&self, imported: Vec, @@ -295,9 +252,6 @@ impl ChainNotify for EthSync { // register the warp sync subprotocol self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8]) .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e)); - // register the inf sync subprotocol - self.network.register_protocol(self.inf_handler.clone(), INF_PROTOCOL, ETH_PACKET_COUNT, &[1u8]) - .unwrap_or_else(|e| warn!("Error registering infinity protocol: {:?}", e)); } fn stop(&self) { @@ -308,7 +262,7 @@ impl ChainNotify for EthSync { fn broadcast(&self, message: Vec) { self.network.with_context(ETH_PROTOCOL, |context| { let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay); - self.inf_handler.sync.write().propagate_packet(&mut sync_io, message.clone()); + self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message.clone()); }); } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index e36dcffa3..e317232ba 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -112,6 +112,7 @@ type PacketDecodeError = DecoderError; const PROTOCOL_VERSION_63: u8 = 63; const PROTOCOL_VERSION_1: u8 = 1; +const PROTOCOL_VERSION_2: u8 = 2; const MAX_BODIES_TO_SEND: usize = 256; const MAX_HEADERS_TO_SEND: usize = 512; const MAX_NODE_DATA_TO_SEND: usize = 1024; @@ -148,8 +149,9 @@ const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11; const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12; const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13; const SNAPSHOT_DATA_PACKET: u8 = 0x14; +const CONSENSUS_DATA_PACKET: u8 = 0x15; -pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x15; +pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x16; const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3; @@ -607,7 +609,7 @@ impl ChainSync { trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id); return Ok(()); } - if (warp_protocol && peer.protocol_version != PROTOCOL_VERSION_1) || (!warp_protocol && peer.protocol_version != PROTOCOL_VERSION_63) { + if (warp_protocol && peer.protocol_version != PROTOCOL_VERSION_1 && peer.protocol_version != PROTOCOL_VERSION_2) || (!warp_protocol && peer.protocol_version != PROTOCOL_VERSION_63) { io.disable_peer(peer_id); trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); return Ok(()); @@ -1416,8 +1418,9 @@ impl ChainSync { /// Send Status message fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), NetworkError> { - let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer) != 0; - let protocol = if warp_protocol { PROTOCOL_VERSION_1 } else { PROTOCOL_VERSION_63 }; + let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer); + let warp_protocol = warp_protocol_version != 0; + let protocol = if warp_protocol { warp_protocol_version } else { PROTOCOL_VERSION_63 }; trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol); let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 }); let chain = io.chain().chain_info(); @@ -1663,7 +1666,7 @@ impl ChainSync { GET_SNAPSHOT_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer, ChainSync::return_snapshot_data, |e| format!("Error sending snapshot data: {:?}", e)), - + CONSENSUS_DATA_PACKET => ChainSync::on_consensus_packet(io, peer, &rlp), _ => { sync.write().on_packet(io, peer, packet_id, data); Ok(()) @@ -1996,6 +1999,21 @@ impl ChainSync { self.restart(io); } } + + /// Called when peer sends us new consensus packet + fn on_consensus_packet(io: &mut SyncIo, _peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + io.chain().queue_consensus_message(r.as_raw().to_vec()); + Ok(()) + } + + /// Broadcast consensus message to peers. + pub fn propagate_consensus_packet(&mut self, io: &mut SyncIo, packet: Bytes) { + let lucky_peers: Vec<_> = self.peers.iter().filter_map(|(id, p)| if p.protocol_version == PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect(); + trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers); + for peer_id in lucky_peers { + self.send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); + } + } } #[cfg(test)] diff --git a/sync/src/infinity.rs b/sync/src/infinity.rs deleted file mode 100644 index 936060a1d..000000000 --- a/sync/src/infinity.rs +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2015, 2016 Ethcore (UK) Ltd. -// This file is part of Parity. - -// Parity is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity. If not, see . - -/// Infinity networking - -use util::*; -use network::*; -use rlp::{UntrustedRlp, DecoderError, RlpStream, View, Stream}; -use ethcore::client::{BlockChainClient}; -use sync_io::SyncIo; -use super::SyncConfig; - -known_heap_size!(0, PeerInfo); - -type PacketDecodeError = DecoderError; - -const PROTOCOL_VERSION: u8 = 1u8; - -const STATUS_PACKET: u8 = 0x00; -const GENERIC_PACKET: u8 = 0x01; - -/// Syncing status and statistics -#[derive(Clone)] -pub struct NetworkStatus { - pub protocol_version: u8, - /// The underlying p2p network version. - pub network_id: usize, - /// Total number of connected peers - pub num_peers: usize, - /// Total number of active peers - pub num_active_peers: usize, -} - -#[derive(Clone)] -/// Inf peer information -struct PeerInfo { - /// inf protocol version - protocol_version: u32, - /// Peer chain genesis hash - genesis: H256, - /// Peer network id - network_id: usize, -} - -/// Infinity protocol handler. -pub struct InfinitySync { - chain: Arc, - /// All connected peers - peers: HashMap, - /// Network ID - network_id: usize, -} - -impl InfinitySync { - /// Create a new instance of syncing strategy. - pub fn new(config: &SyncConfig, chain: Arc) -> InfinitySync { - let mut sync = InfinitySync { - chain: chain, - peers: HashMap::new(), - network_id: config.network_id, - }; - sync.reset(); - sync - } - - /// @returns Synchonization status - pub fn _status(&self) -> NetworkStatus { - NetworkStatus { - protocol_version: 1, - network_id: self.network_id, - num_peers: self.peers.len(), - num_active_peers: 0, - } - } - - #[cfg_attr(feature="dev", allow(for_kv_map))] // Because it's not possible to get `values_mut()` - /// Reset sync. Clear all downloaded data but keep the queue - fn reset(&mut self) { - } - - /// Called by peer to report status - fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let peer = PeerInfo { - protocol_version: try!(r.val_at(0)), - network_id: try!(r.val_at(1)), - genesis: try!(r.val_at(2)), - }; - trace!(target: "inf", "New peer {} (protocol: {}, network: {:?}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.genesis); - if self.peers.contains_key(&peer_id) { - debug!(target: "inf", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); - return Ok(()); - } - let chain_info = io.chain().chain_info(); - if peer.genesis != chain_info.genesis_hash { - io.disable_peer(peer_id); - trace!(target: "inf", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, chain_info.genesis_hash, peer.genesis); - return Ok(()); - } - if peer.network_id != self.network_id { - io.disable_peer(peer_id); - trace!(target: "inf", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id); - return Ok(()); - } - - self.peers.insert(peer_id.clone(), peer); - Ok(()) - } - - /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "inf", "== Connected {}: {}", peer, io.peer_info(peer)); - if let Err(e) = self.send_status(io) { - debug!(target:"inf", "Error sending status request: {:?}", e); - io.disable_peer(peer); - } - } - - /// Generic packet sender - fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { - if self.peers.contains_key(&peer_id) { - if let Err(e) = sync.send(peer_id, packet_id, packet) { - debug!(target:"inf", "Error sending request: {:?}", e); - sync.disable_peer(peer_id); - } - } - } - - /// Called when peer sends us new transactions - fn on_peer_packet(&mut self, _io: &mut SyncIo, _peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - self.chain.queue_infinity_message(r.as_raw().to_vec()); - Ok(()) - } - - /// Called by peer when it is disconnecting - pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "inf", "== Disconnecting {}: {}", peer, io.peer_info(peer)); - if self.peers.contains_key(&peer) { - debug!(target: "inf", "Disconnected {}", peer); - self.peers.remove(&peer); - } - } - - /// Send Status message - fn send_status(&mut self, io: &mut SyncIo) -> Result<(), NetworkError> { - let mut packet = RlpStream::new_list(5); - let chain = io.chain().chain_info(); - packet.append(&(PROTOCOL_VERSION as u32)); - packet.append(&self.network_id); - packet.append(&chain.total_difficulty); - packet.append(&chain.best_block_hash); - packet.append(&chain.genesis_hash); - io.respond(STATUS_PACKET, packet.out()) - } - - pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { - let rlp = UntrustedRlp::new(data); - match packet_id { - STATUS_PACKET => sync.write().on_peer_status(io, peer, &rlp).unwrap_or_else( - |e| trace!(target: "inf", "Error processing packet: {:?}", e)), - GENERIC_PACKET => sync.write().on_peer_packet(io, peer, &rlp).unwrap_or_else( - |e| warn!(target: "inf", "Error queueing packet: {:?}", e)), - p @ _ => trace!(target: "inf", "Unexpected packet {} from {}", p, peer), - }; - } - - pub fn propagate_packet(&mut self, io: &mut SyncIo, packet: Bytes) { - let lucky_peers: Vec<_> = self.peers.keys().cloned().collect(); - trace!(target: "inf", "Sending packets to {:?}", lucky_peers); - for peer_id in lucky_peers { - self.send_packet(io, peer_id, GENERIC_PACKET, packet.clone()); - } - } -} - -#[cfg(test)] -mod tests { -} - diff --git a/sync/src/lib.rs b/sync/src/lib.rs index d7c208030..2061e4e3a 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -50,7 +50,6 @@ mod chain; mod blocks; mod block_sync; mod sync_io; -mod infinity; mod snapshot; mod transactions_stats;