From 9ec091e0cffbb36e1a5e81c1ef3d3ab767a5533f Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Mon, 24 Oct 2016 16:24:35 +0200 Subject: [PATCH] Move snapshot sync to a subprotocol (#2820) --- sync/src/api.rs | 27 +++++++++++++++++++++------ sync/src/chain.rs | 32 +++++++++++++++++++++----------- sync/src/sync_io.rs | 12 +++++++++--- sync/src/tests/helpers.rs | 7 ++++++- util/network/src/host.rs | 12 +++++++----- util/network/src/lib.rs | 8 ++------ util/network/src/service.rs | 3 ++- util/network/src/session.rs | 23 ++++++++++++++++++++++- util/network/src/tests.rs | 4 ++-- 9 files changed, 92 insertions(+), 36 deletions(-) diff --git a/sync/src/api.rs b/sync/src/api.rs index 1ea0a5bbb..d0d734024 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::collections::HashMap; use util::Bytes; -use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, +use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId, NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError}; use util::{U256, H256}; use io::{TimerToken}; @@ -30,6 +30,9 @@ use std::net::{SocketAddr, AddrParseError}; use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; use std::str::FromStr; use parking_lot::RwLock; +use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT}; + +pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"bam"; /// Sync configuration #[derive(Debug, Clone, Copy)] @@ -78,7 +81,7 @@ pub struct PeerInfo { /// Node client ID pub client_version: String, /// Capabilities - pub capabilities: Vec, + pub capabilities: Vec, /// Remote endpoint address pub remote_address: String, /// Local endpoint address @@ -150,7 +153,9 @@ struct SyncProtocolHandler { impl NetworkProtocolHandler for SyncProtocolHandler { fn initialize(&self, io: &NetworkContext) { - io.register_timer(0, 1000).expect("Error registering sync timer"); + if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { + io.register_timer(0, 1000).expect("Error registering sync timer"); + } } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { @@ -158,11 +163,18 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); + // If warp protocol is supported only allow warp handshake + let warp_protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0) != 0; + let warp_context = io.subprotocol_name() == WARP_SYNC_PROTOCOL_ID; + if warp_protocol == warp_context { + self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); + } } fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); + if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { + self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); + } } fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { @@ -195,8 +207,11 @@ impl ChainNotify for EthSync { fn start(&self) { self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); - self.network.register_protocol(self.handler.clone(), self.subprotocol_name, &[62u8, 63u8, 64u8]) + self.network.register_protocol(self.handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8]) .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); + // register the warp sync subprotocol + self.network.register_protocol(self.handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8]) + .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e)); } fn stop(&self) { diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 181d8dda3..850dff228 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -101,14 +101,14 @@ use super::SyncConfig; use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError}; use snapshot::{Snapshot, ChunkType}; use rand::{thread_rng, Rng}; -use api::PeerInfo as PeerInfoDigest; +use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID}; known_heap_size!(0, PeerInfo); type PacketDecodeError = DecoderError; const PROTOCOL_VERSION_63: u8 = 63; -const PROTOCOL_VERSION_64: u8 = 64; +const PROTOCOL_VERSION_1: u8 = 1; const MAX_BODIES_TO_SEND: usize = 256; const MAX_HEADERS_TO_SEND: usize = 512; const MAX_NODE_DATA_TO_SEND: usize = 1024; @@ -137,11 +137,16 @@ const GET_NODE_DATA_PACKET: u8 = 0x0d; const NODE_DATA_PACKET: u8 = 0x0e; const GET_RECEIPTS_PACKET: u8 = 0x0f; const RECEIPTS_PACKET: u8 = 0x10; + +pub const ETH_PACKET_COUNT: u8 = 0x11; + const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11; const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12; const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13; const SNAPSHOT_DATA_PACKET: u8 = 0x14; +pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x15; + const HEADERS_TIMEOUT_SEC: f64 = 15f64; const BODIES_TIMEOUT_SEC: f64 = 10f64; const RECEIPTS_TIMEOUT_SEC: f64 = 10f64; @@ -354,7 +359,7 @@ impl ChainSync { let last_imported_number = self.new_blocks.last_imported_block_number(); SyncStatus { state: self.state.clone(), - protocol_version: if self.state == SyncState::SnapshotData { PROTOCOL_VERSION_64 } else { PROTOCOL_VERSION_63 }, + protocol_version: PROTOCOL_VERSION_63, network_id: self.network_id, start_block_number: self.starting_block, last_imported_block_number: Some(last_imported_number), @@ -471,6 +476,7 @@ impl ChainSync { /// Called by peer to report status fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let protocol_version: u8 = try!(r.val_at(0)); + let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0; let peer = PeerInfo { protocol_version: protocol_version, network_id: try!(r.val_at(1)), @@ -485,8 +491,8 @@ impl ChainSync { expired: false, confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed }, asking_snapshot_data: None, - snapshot_hash: if protocol_version == PROTOCOL_VERSION_64 { Some(try!(r.val_at(5))) } else { None }, - snapshot_number: if protocol_version == PROTOCOL_VERSION_64 { Some(try!(r.val_at(6))) } else { None }, + snapshot_hash: if warp_protocol { Some(try!(r.val_at(5))) } else { None }, + snapshot_number: if warp_protocol { Some(try!(r.val_at(6))) } else { None }, block_set: None, }; @@ -511,7 +517,7 @@ impl ChainSync { trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id); return Ok(()); } - if peer.protocol_version != PROTOCOL_VERSION_64 && peer.protocol_version != PROTOCOL_VERSION_63 { + if (warp_protocol && peer.protocol_version != PROTOCOL_VERSION_1) || (!warp_protocol && peer.protocol_version != PROTOCOL_VERSION_63) { io.disable_peer(peer_id); trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version); return Ok(()); @@ -1291,17 +1297,17 @@ impl ChainSync { /// Send Status message fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), NetworkError> { - let protocol = io.eth_protocol_version(peer); + let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer) != 0; + let protocol = if warp_protocol { PROTOCOL_VERSION_1 } else { PROTOCOL_VERSION_63 }; trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol); - let pv64 = protocol >= PROTOCOL_VERSION_64; - let mut packet = RlpStream::new_list(if pv64 { 7 } else { 5 }); + let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 }); let chain = io.chain().chain_info(); packet.append(&(protocol as u32)); packet.append(&self.network_id); packet.append(&chain.total_difficulty); packet.append(&chain.best_block_hash); packet.append(&chain.genesis_hash); - if pv64 { + if warp_protocol { let manifest = io.snapshot_service().manifest(); let block_number = manifest.as_ref().map_or(0, |m| m.block_number); let manifest_hash = manifest.map_or(H256::new(), |m| m.into_rlp().sha3()); @@ -1354,6 +1360,7 @@ impl ChainSync { let mut data = Bytes::new(); let inc = (skip + 1) as BlockNumber; let overlay = io.chain_overlay().read(); + while number <= last && count < max_count { if let Some(hdr) = overlay.get(&number) { trace!(target: "sync", "{}: Returning cached fork header", peer_id); @@ -1362,6 +1369,9 @@ impl ChainSync { } else if let Some(mut hdr) = io.chain().block_header(BlockID::Number(number)) { data.append(&mut hdr); count += 1; + } else { + // No required block. + break; } if reverse { if number <= inc || number == 0 { @@ -1471,7 +1481,7 @@ impl ChainSync { Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp))) } - /// Respond to GetSnapshotManifest request + /// Respond to GetSnapshotData request fn return_snapshot_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult { let hash: H256 = try!(r.val_at(0)); trace!(target: "sync", "{} -> GetSnapshotData {:?}", peer_id, hash); diff --git a/sync/src/sync_io.rs b/sync/src/sync_io.rs index 24a73437b..25d235c60 100644 --- a/sync/src/sync_io.rs +++ b/sync/src/sync_io.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . use std::collections::HashMap; -use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo}; +use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo, ProtocolId}; use util::Bytes; use ethcore::client::BlockChainClient; use ethcore::header::BlockNumber; @@ -44,8 +44,10 @@ pub trait SyncIo { } /// Returns information on p2p session fn peer_session_info(&self, peer_id: PeerId) -> Option; - /// Maximum mutuallt supported ETH protocol version + /// Maximum mutually supported ETH protocol version fn eth_protocol_version(&self, peer_id: PeerId) -> u8; + /// Maximum mutually supported version of a gien protocol. + fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8; /// Returns if the chain block queue empty fn is_chain_queue_empty(&self) -> bool { self.chain().queue_info().is_empty() @@ -117,7 +119,11 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { } fn eth_protocol_version(&self, peer_id: PeerId) -> u8 { - self.network.protocol_version(peer_id, self.network.subprotocol_name()).unwrap_or(0) + self.network.protocol_version(self.network.subprotocol_name(), peer_id).unwrap_or(0) + } + + fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 { + self.network.protocol_version(*protocol, peer_id).unwrap_or(0) } } diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 7fb2319c7..801db234d 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -21,6 +21,7 @@ use ethcore::client::{TestBlockChainClient, BlockChainClient}; use ethcore::header::BlockNumber; use ethcore::snapshot::SnapshotService; use sync_io::SyncIo; +use api::WARP_SYNC_PROTOCOL_ID; use chain::ChainSync; use ::SyncConfig; @@ -90,7 +91,11 @@ impl<'p> SyncIo for TestIo<'p> { } fn eth_protocol_version(&self, _peer: PeerId) -> u8 { - 64 + 63 + } + + fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 { + if protocol == &WARP_SYNC_PROTOCOL_ID { 1 } else { self.eth_protocol_version(peer_id) } } fn chain_overlay(&self) -> &RwLock> { diff --git a/util/network/src/host.rs b/util/network/src/host.rs index f5c14b0f9..a6d61d26f 100644 --- a/util/network/src/host.rs +++ b/util/network/src/host.rs @@ -155,6 +155,8 @@ pub enum NetworkIoMessage { protocol: ProtocolId, /// Supported protocol versions. versions: Vec, + /// Number of packet IDs reserved by the protocol. + packet_count: u8, }, /// Register a new protocol timer AddTimer { @@ -251,9 +253,8 @@ impl<'s> NetworkContext<'s> { self.io.channel() } - /// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected. + /// Disconnect a peer and prevent it from connecting again. pub fn disable_peer(&self, peer: PeerId) { - //TODO: remove capability, disconnect if no capabilities left self.io.message(NetworkIoMessage::DisablePeer(peer)) .unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e)); } @@ -290,7 +291,7 @@ impl<'s> NetworkContext<'s> { } /// Returns max version for a given protocol. - pub fn protocol_version(&self, peer: PeerId, protocol: ProtocolId) -> Option { + pub fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option { let session = self.resolve_session(peer); session.and_then(|s| s.lock().capability_version(protocol)) } @@ -1018,7 +1019,8 @@ impl IoHandler for Host { NetworkIoMessage::AddHandler { ref handler, ref protocol, - ref versions + ref versions, + ref packet_count, } => { let h = handler.clone(); let reserved = self.reserved_nodes.read(); @@ -1026,7 +1028,7 @@ impl IoHandler for Host { self.handlers.write().insert(*protocol, h); let mut info = self.info.write(); for v in versions { - info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count:0 }); + info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count: *packet_count }); } }, NetworkIoMessage::AddTimer { diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index 50396b6fb..627458c1c 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -45,7 +45,7 @@ //! //! fn main () { //! let mut service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service"); -//! service.register_protocol(Arc::new(MyHandler), *b"myp", &[1u8]); +//! service.register_protocol(Arc::new(MyHandler), *b"myp", 1, &[1u8]); //! service.start().expect("Error starting service"); //! //! // Wait for quit condition @@ -91,13 +91,9 @@ mod ip_utils; #[cfg(test)] mod tests; -pub use host::PeerId; -pub use host::PacketId; -pub use host::NetworkContext; +pub use host::{PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration}; pub use service::NetworkService; -pub use host::NetworkIoMessage; pub use error::NetworkError; -pub use host::NetworkConfiguration; pub use stats::NetworkStats; pub use session::SessionInfo; diff --git a/util/network/src/service.rs b/util/network/src/service.rs index 0b59f8bc7..3fe6ae04a 100644 --- a/util/network/src/service.rs +++ b/util/network/src/service.rs @@ -73,11 +73,12 @@ impl NetworkService { } /// Regiter a new protocol handler with the event loop. - pub fn register_protocol(&self, handler: Arc, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> { + pub fn register_protocol(&self, handler: Arc, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), NetworkError> { try!(self.io_service.send_message(NetworkIoMessage::AddHandler { handler: handler, protocol: protocol, versions: versions.to_vec(), + packet_count: packet_count, })); Ok(()) } diff --git a/util/network/src/session.rs b/util/network/src/session.rs index c7f196680..1791a441d 100644 --- a/util/network/src/session.rs +++ b/util/network/src/session.rs @@ -16,6 +16,7 @@ use std::{str, io}; use std::net::SocketAddr; +use std::cmp::Ordering; use std::sync::*; use mio::*; use mio::tcp::*; @@ -122,7 +123,7 @@ impl ToString for PeerCapabilityInfo { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct SessionCapabilityInfo { pub protocol: [u8; 3], pub version: u8, @@ -130,6 +131,23 @@ pub struct SessionCapabilityInfo { pub id_offset: u8, } +impl PartialOrd for SessionCapabilityInfo { + fn partial_cmp(&self, other: &SessionCapabilityInfo) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for SessionCapabilityInfo { + fn cmp(&self, b: &SessionCapabilityInfo) -> Ordering { + // By protocol id first + if self.protocol != b.protocol { + return self.protocol.cmp(&b.protocol); + } + // By version + self.version.cmp(&b.version) + } +} + const PACKET_HELLO: u8 = 0x80; const PACKET_DISCONNECT: u8 = 0x01; const PACKET_PING: u8 = 0x02; @@ -441,6 +459,9 @@ impl Session { } } + // Sort capabilities alphabeticaly. + caps.sort(); + i = 0; let mut offset: u8 = PACKET_USER; while i < caps.len() { diff --git a/util/network/src/tests.rs b/util/network/src/tests.rs index 1b0c6eb6c..467dbcbd8 100644 --- a/util/network/src/tests.rs +++ b/util/network/src/tests.rs @@ -41,7 +41,7 @@ impl TestProtocol { /// Creates and register protocol with the network service pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc { let handler = Arc::new(TestProtocol::new(drop_session)); - service.register_protocol(handler.clone(), *b"tst", &[42u8, 43u8]).expect("Error registering test protocol handler"); + service.register_protocol(handler.clone(), *b"tst", 1, &[42u8, 43u8]).expect("Error registering test protocol handler"); handler } @@ -93,7 +93,7 @@ impl NetworkProtocolHandler for TestProtocol { fn net_service() { let service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service"); service.start().unwrap(); - service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[1u8]).unwrap(); + service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", 1, &[1u8]).unwrap(); } #[test]