Move snapshot sync to a subprotocol (#2820)

This commit is contained in:
Arkadiy Paronyan 2016-10-24 16:24:35 +02:00 committed by Gav Wood
parent ff347da8d3
commit 9ec091e0cf
9 changed files with 92 additions and 36 deletions

View File

@ -17,7 +17,7 @@
use std::sync::Arc;
use std::collections::HashMap;
use util::Bytes;
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId,
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId,
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError};
use util::{U256, H256};
use io::{TimerToken};
@ -30,6 +30,9 @@ use std::net::{SocketAddr, AddrParseError};
use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig};
use std::str::FromStr;
use parking_lot::RwLock;
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT};
pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"bam";
/// Sync configuration
#[derive(Debug, Clone, Copy)]
@ -150,20 +153,29 @@ struct SyncProtocolHandler {
impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(0, 1000).expect("Error registering sync timer");
}
}
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
// If warp protocol is supported only allow warp handshake
let warp_protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0) != 0;
let warp_context = io.subprotocol_name() == WARP_SYNC_PROTOCOL_ID;
if warp_protocol == warp_context {
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
}
}
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
}
}
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay));
@ -195,8 +207,11 @@ impl ChainNotify for EthSync {
fn start(&self) {
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
self.network.register_protocol(self.handler.clone(), self.subprotocol_name, &[62u8, 63u8, 64u8])
self.network.register_protocol(self.handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8])
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
// register the warp sync subprotocol
self.network.register_protocol(self.handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8])
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
}
fn stop(&self) {

View File

@ -101,14 +101,14 @@ use super::SyncConfig;
use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError};
use snapshot::{Snapshot, ChunkType};
use rand::{thread_rng, Rng};
use api::PeerInfo as PeerInfoDigest;
use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
known_heap_size!(0, PeerInfo);
type PacketDecodeError = DecoderError;
const PROTOCOL_VERSION_63: u8 = 63;
const PROTOCOL_VERSION_64: u8 = 64;
const PROTOCOL_VERSION_1: u8 = 1;
const MAX_BODIES_TO_SEND: usize = 256;
const MAX_HEADERS_TO_SEND: usize = 512;
const MAX_NODE_DATA_TO_SEND: usize = 1024;
@ -137,11 +137,16 @@ const GET_NODE_DATA_PACKET: u8 = 0x0d;
const NODE_DATA_PACKET: u8 = 0x0e;
const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10;
pub const ETH_PACKET_COUNT: u8 = 0x11;
const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
const SNAPSHOT_DATA_PACKET: u8 = 0x14;
pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x15;
const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 10f64;
const RECEIPTS_TIMEOUT_SEC: f64 = 10f64;
@ -354,7 +359,7 @@ impl ChainSync {
let last_imported_number = self.new_blocks.last_imported_block_number();
SyncStatus {
state: self.state.clone(),
protocol_version: if self.state == SyncState::SnapshotData { PROTOCOL_VERSION_64 } else { PROTOCOL_VERSION_63 },
protocol_version: PROTOCOL_VERSION_63,
network_id: self.network_id,
start_block_number: self.starting_block,
last_imported_block_number: Some(last_imported_number),
@ -471,6 +476,7 @@ impl ChainSync {
/// Called by peer to report status
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let protocol_version: u8 = try!(r.val_at(0));
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
let peer = PeerInfo {
protocol_version: protocol_version,
network_id: try!(r.val_at(1)),
@ -485,8 +491,8 @@ impl ChainSync {
expired: false,
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
asking_snapshot_data: None,
snapshot_hash: if protocol_version == PROTOCOL_VERSION_64 { Some(try!(r.val_at(5))) } else { None },
snapshot_number: if protocol_version == PROTOCOL_VERSION_64 { Some(try!(r.val_at(6))) } else { None },
snapshot_hash: if warp_protocol { Some(try!(r.val_at(5))) } else { None },
snapshot_number: if warp_protocol { Some(try!(r.val_at(6))) } else { None },
block_set: None,
};
@ -511,7 +517,7 @@ impl ChainSync {
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id);
return Ok(());
}
if peer.protocol_version != PROTOCOL_VERSION_64 && peer.protocol_version != PROTOCOL_VERSION_63 {
if (warp_protocol && peer.protocol_version != PROTOCOL_VERSION_1) || (!warp_protocol && peer.protocol_version != PROTOCOL_VERSION_63) {
io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
return Ok(());
@ -1291,17 +1297,17 @@ impl ChainSync {
/// Send Status message
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), NetworkError> {
let protocol = io.eth_protocol_version(peer);
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer) != 0;
let protocol = if warp_protocol { PROTOCOL_VERSION_1 } else { PROTOCOL_VERSION_63 };
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
let pv64 = protocol >= PROTOCOL_VERSION_64;
let mut packet = RlpStream::new_list(if pv64 { 7 } else { 5 });
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 });
let chain = io.chain().chain_info();
packet.append(&(protocol as u32));
packet.append(&self.network_id);
packet.append(&chain.total_difficulty);
packet.append(&chain.best_block_hash);
packet.append(&chain.genesis_hash);
if pv64 {
if warp_protocol {
let manifest = io.snapshot_service().manifest();
let block_number = manifest.as_ref().map_or(0, |m| m.block_number);
let manifest_hash = manifest.map_or(H256::new(), |m| m.into_rlp().sha3());
@ -1354,6 +1360,7 @@ impl ChainSync {
let mut data = Bytes::new();
let inc = (skip + 1) as BlockNumber;
let overlay = io.chain_overlay().read();
while number <= last && count < max_count {
if let Some(hdr) = overlay.get(&number) {
trace!(target: "sync", "{}: Returning cached fork header", peer_id);
@ -1362,6 +1369,9 @@ impl ChainSync {
} else if let Some(mut hdr) = io.chain().block_header(BlockID::Number(number)) {
data.append(&mut hdr);
count += 1;
} else {
// No required block.
break;
}
if reverse {
if number <= inc || number == 0 {
@ -1471,7 +1481,7 @@ impl ChainSync {
Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp)))
}
/// Respond to GetSnapshotManifest request
/// Respond to GetSnapshotData request
fn return_snapshot_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
let hash: H256 = try!(r.val_at(0));
trace!(target: "sync", "{} -> GetSnapshotData {:?}", peer_id, hash);

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo};
use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo, ProtocolId};
use util::Bytes;
use ethcore::client::BlockChainClient;
use ethcore::header::BlockNumber;
@ -44,8 +44,10 @@ pub trait SyncIo {
}
/// Returns information on p2p session
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>;
/// Maximum mutuallt supported ETH protocol version
/// Maximum mutually supported ETH protocol version
fn eth_protocol_version(&self, peer_id: PeerId) -> u8;
/// Maximum mutually supported version of a gien protocol.
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8;
/// Returns if the chain block queue empty
fn is_chain_queue_empty(&self) -> bool {
self.chain().queue_info().is_empty()
@ -117,7 +119,11 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
}
fn eth_protocol_version(&self, peer_id: PeerId) -> u8 {
self.network.protocol_version(peer_id, self.network.subprotocol_name()).unwrap_or(0)
self.network.protocol_version(self.network.subprotocol_name(), peer_id).unwrap_or(0)
}
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
self.network.protocol_version(*protocol, peer_id).unwrap_or(0)
}
}

View File

@ -21,6 +21,7 @@ use ethcore::client::{TestBlockChainClient, BlockChainClient};
use ethcore::header::BlockNumber;
use ethcore::snapshot::SnapshotService;
use sync_io::SyncIo;
use api::WARP_SYNC_PROTOCOL_ID;
use chain::ChainSync;
use ::SyncConfig;
@ -90,7 +91,11 @@ impl<'p> SyncIo for TestIo<'p> {
}
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
64
63
}
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
if protocol == &WARP_SYNC_PROTOCOL_ID { 1 } else { self.eth_protocol_version(peer_id) }
}
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {

View File

@ -155,6 +155,8 @@ pub enum NetworkIoMessage {
protocol: ProtocolId,
/// Supported protocol versions.
versions: Vec<u8>,
/// Number of packet IDs reserved by the protocol.
packet_count: u8,
},
/// Register a new protocol timer
AddTimer {
@ -251,9 +253,8 @@ impl<'s> NetworkContext<'s> {
self.io.channel()
}
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
/// Disconnect a peer and prevent it from connecting again.
pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
self.io.message(NetworkIoMessage::DisablePeer(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
@ -290,7 +291,7 @@ impl<'s> NetworkContext<'s> {
}
/// Returns max version for a given protocol.
pub fn protocol_version(&self, peer: PeerId, protocol: ProtocolId) -> Option<u8> {
pub fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> {
let session = self.resolve_session(peer);
session.and_then(|s| s.lock().capability_version(protocol))
}
@ -1018,7 +1019,8 @@ impl IoHandler<NetworkIoMessage> for Host {
NetworkIoMessage::AddHandler {
ref handler,
ref protocol,
ref versions
ref versions,
ref packet_count,
} => {
let h = handler.clone();
let reserved = self.reserved_nodes.read();
@ -1026,7 +1028,7 @@ impl IoHandler<NetworkIoMessage> for Host {
self.handlers.write().insert(*protocol, h);
let mut info = self.info.write();
for v in versions {
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count:0 });
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count: *packet_count });
}
},
NetworkIoMessage::AddTimer {

View File

@ -45,7 +45,7 @@
//!
//! fn main () {
//! let mut service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service");
//! service.register_protocol(Arc::new(MyHandler), *b"myp", &[1u8]);
//! service.register_protocol(Arc::new(MyHandler), *b"myp", 1, &[1u8]);
//! service.start().expect("Error starting service");
//!
//! // Wait for quit condition
@ -91,13 +91,9 @@ mod ip_utils;
#[cfg(test)]
mod tests;
pub use host::PeerId;
pub use host::PacketId;
pub use host::NetworkContext;
pub use host::{PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration};
pub use service::NetworkService;
pub use host::NetworkIoMessage;
pub use error::NetworkError;
pub use host::NetworkConfiguration;
pub use stats::NetworkStats;
pub use session::SessionInfo;

View File

@ -73,11 +73,12 @@ impl NetworkService {
}
/// Regiter a new protocol handler with the event loop.
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), NetworkError> {
try!(self.io_service.send_message(NetworkIoMessage::AddHandler {
handler: handler,
protocol: protocol,
versions: versions.to_vec(),
packet_count: packet_count,
}));
Ok(())
}

View File

@ -16,6 +16,7 @@
use std::{str, io};
use std::net::SocketAddr;
use std::cmp::Ordering;
use std::sync::*;
use mio::*;
use mio::tcp::*;
@ -122,7 +123,7 @@ impl ToString for PeerCapabilityInfo {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionCapabilityInfo {
pub protocol: [u8; 3],
pub version: u8,
@ -130,6 +131,23 @@ pub struct SessionCapabilityInfo {
pub id_offset: u8,
}
impl PartialOrd for SessionCapabilityInfo {
fn partial_cmp(&self, other: &SessionCapabilityInfo) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SessionCapabilityInfo {
fn cmp(&self, b: &SessionCapabilityInfo) -> Ordering {
// By protocol id first
if self.protocol != b.protocol {
return self.protocol.cmp(&b.protocol);
}
// By version
self.version.cmp(&b.version)
}
}
const PACKET_HELLO: u8 = 0x80;
const PACKET_DISCONNECT: u8 = 0x01;
const PACKET_PING: u8 = 0x02;
@ -441,6 +459,9 @@ impl Session {
}
}
// Sort capabilities alphabeticaly.
caps.sort();
i = 0;
let mut offset: u8 = PACKET_USER;
while i < caps.len() {

View File

@ -41,7 +41,7 @@ impl TestProtocol {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
let handler = Arc::new(TestProtocol::new(drop_session));
service.register_protocol(handler.clone(), *b"tst", &[42u8, 43u8]).expect("Error registering test protocol handler");
service.register_protocol(handler.clone(), *b"tst", 1, &[42u8, 43u8]).expect("Error registering test protocol handler");
handler
}
@ -93,7 +93,7 @@ impl NetworkProtocolHandler for TestProtocol {
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[1u8]).unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", 1, &[1u8]).unwrap();
}
#[test]