Merge branch 'master' of github.com:ethcore/parity
This commit is contained in:
commit
24fa2888ab
@ -17,7 +17,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use util::Bytes;
|
||||
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId,
|
||||
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, ProtocolId,
|
||||
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode, NetworkError};
|
||||
use util::{U256, H256};
|
||||
use io::{TimerToken};
|
||||
@ -30,6 +30,9 @@ use std::net::{SocketAddr, AddrParseError};
|
||||
use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig};
|
||||
use std::str::FromStr;
|
||||
use parking_lot::RwLock;
|
||||
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT};
|
||||
|
||||
pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"bam";
|
||||
|
||||
/// Sync configuration
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@ -78,7 +81,7 @@ pub struct PeerInfo {
|
||||
/// Node client ID
|
||||
pub client_version: String,
|
||||
/// Capabilities
|
||||
pub capabilities: Vec<String>,
|
||||
pub capabilities: Vec<String>,
|
||||
/// Remote endpoint address
|
||||
pub remote_address: String,
|
||||
/// Local endpoint address
|
||||
@ -150,7 +153,9 @@ struct SyncProtocolHandler {
|
||||
|
||||
impl NetworkProtocolHandler for SyncProtocolHandler {
|
||||
fn initialize(&self, io: &NetworkContext) {
|
||||
io.register_timer(0, 1000).expect("Error registering sync timer");
|
||||
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
|
||||
io.register_timer(0, 1000).expect("Error registering sync timer");
|
||||
}
|
||||
}
|
||||
|
||||
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||
@ -158,11 +163,18 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
|
||||
}
|
||||
|
||||
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
|
||||
// If warp protocol is supported only allow warp handshake
|
||||
let warp_protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0) != 0;
|
||||
let warp_context = io.subprotocol_name() == WARP_SYNC_PROTOCOL_ID;
|
||||
if warp_protocol == warp_context {
|
||||
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
|
||||
}
|
||||
}
|
||||
|
||||
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
|
||||
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
|
||||
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
|
||||
}
|
||||
}
|
||||
|
||||
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
|
||||
@ -195,8 +207,11 @@ impl ChainNotify for EthSync {
|
||||
|
||||
fn start(&self) {
|
||||
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
|
||||
self.network.register_protocol(self.handler.clone(), self.subprotocol_name, &[62u8, 63u8, 64u8])
|
||||
self.network.register_protocol(self.handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8])
|
||||
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
|
||||
// register the warp sync subprotocol
|
||||
self.network.register_protocol(self.handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8])
|
||||
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
|
@ -101,14 +101,14 @@ use super::SyncConfig;
|
||||
use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError};
|
||||
use snapshot::{Snapshot, ChunkType};
|
||||
use rand::{thread_rng, Rng};
|
||||
use api::PeerInfo as PeerInfoDigest;
|
||||
use api::{PeerInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
|
||||
|
||||
known_heap_size!(0, PeerInfo);
|
||||
|
||||
type PacketDecodeError = DecoderError;
|
||||
|
||||
const PROTOCOL_VERSION_63: u8 = 63;
|
||||
const PROTOCOL_VERSION_64: u8 = 64;
|
||||
const PROTOCOL_VERSION_1: u8 = 1;
|
||||
const MAX_BODIES_TO_SEND: usize = 256;
|
||||
const MAX_HEADERS_TO_SEND: usize = 512;
|
||||
const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
||||
@ -137,11 +137,16 @@ const GET_NODE_DATA_PACKET: u8 = 0x0d;
|
||||
const NODE_DATA_PACKET: u8 = 0x0e;
|
||||
const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
||||
const RECEIPTS_PACKET: u8 = 0x10;
|
||||
|
||||
pub const ETH_PACKET_COUNT: u8 = 0x11;
|
||||
|
||||
const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
|
||||
const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
|
||||
const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
|
||||
const SNAPSHOT_DATA_PACKET: u8 = 0x14;
|
||||
|
||||
pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x15;
|
||||
|
||||
const HEADERS_TIMEOUT_SEC: f64 = 15f64;
|
||||
const BODIES_TIMEOUT_SEC: f64 = 10f64;
|
||||
const RECEIPTS_TIMEOUT_SEC: f64 = 10f64;
|
||||
@ -354,7 +359,7 @@ impl ChainSync {
|
||||
let last_imported_number = self.new_blocks.last_imported_block_number();
|
||||
SyncStatus {
|
||||
state: self.state.clone(),
|
||||
protocol_version: if self.state == SyncState::SnapshotData { PROTOCOL_VERSION_64 } else { PROTOCOL_VERSION_63 },
|
||||
protocol_version: PROTOCOL_VERSION_63,
|
||||
network_id: self.network_id,
|
||||
start_block_number: self.starting_block,
|
||||
last_imported_block_number: Some(last_imported_number),
|
||||
@ -471,6 +476,7 @@ impl ChainSync {
|
||||
/// Called by peer to report status
|
||||
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
let protocol_version: u8 = try!(r.val_at(0));
|
||||
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
|
||||
let peer = PeerInfo {
|
||||
protocol_version: protocol_version,
|
||||
network_id: try!(r.val_at(1)),
|
||||
@ -485,8 +491,8 @@ impl ChainSync {
|
||||
expired: false,
|
||||
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
||||
asking_snapshot_data: None,
|
||||
snapshot_hash: if protocol_version == PROTOCOL_VERSION_64 { Some(try!(r.val_at(5))) } else { None },
|
||||
snapshot_number: if protocol_version == PROTOCOL_VERSION_64 { Some(try!(r.val_at(6))) } else { None },
|
||||
snapshot_hash: if warp_protocol { Some(try!(r.val_at(5))) } else { None },
|
||||
snapshot_number: if warp_protocol { Some(try!(r.val_at(6))) } else { None },
|
||||
block_set: None,
|
||||
};
|
||||
|
||||
@ -511,7 +517,7 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id);
|
||||
return Ok(());
|
||||
}
|
||||
if peer.protocol_version != PROTOCOL_VERSION_64 && peer.protocol_version != PROTOCOL_VERSION_63 {
|
||||
if (warp_protocol && peer.protocol_version != PROTOCOL_VERSION_1) || (!warp_protocol && peer.protocol_version != PROTOCOL_VERSION_63) {
|
||||
io.disable_peer(peer_id);
|
||||
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
|
||||
return Ok(());
|
||||
@ -1291,17 +1297,17 @@ impl ChainSync {
|
||||
|
||||
/// Send Status message
|
||||
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), NetworkError> {
|
||||
let protocol = io.eth_protocol_version(peer);
|
||||
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer) != 0;
|
||||
let protocol = if warp_protocol { PROTOCOL_VERSION_1 } else { PROTOCOL_VERSION_63 };
|
||||
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
|
||||
let pv64 = protocol >= PROTOCOL_VERSION_64;
|
||||
let mut packet = RlpStream::new_list(if pv64 { 7 } else { 5 });
|
||||
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 });
|
||||
let chain = io.chain().chain_info();
|
||||
packet.append(&(protocol as u32));
|
||||
packet.append(&self.network_id);
|
||||
packet.append(&chain.total_difficulty);
|
||||
packet.append(&chain.best_block_hash);
|
||||
packet.append(&chain.genesis_hash);
|
||||
if pv64 {
|
||||
if warp_protocol {
|
||||
let manifest = io.snapshot_service().manifest();
|
||||
let block_number = manifest.as_ref().map_or(0, |m| m.block_number);
|
||||
let manifest_hash = manifest.map_or(H256::new(), |m| m.into_rlp().sha3());
|
||||
@ -1354,6 +1360,7 @@ impl ChainSync {
|
||||
let mut data = Bytes::new();
|
||||
let inc = (skip + 1) as BlockNumber;
|
||||
let overlay = io.chain_overlay().read();
|
||||
|
||||
while number <= last && count < max_count {
|
||||
if let Some(hdr) = overlay.get(&number) {
|
||||
trace!(target: "sync", "{}: Returning cached fork header", peer_id);
|
||||
@ -1362,6 +1369,9 @@ impl ChainSync {
|
||||
} else if let Some(mut hdr) = io.chain().block_header(BlockID::Number(number)) {
|
||||
data.append(&mut hdr);
|
||||
count += 1;
|
||||
} else {
|
||||
// No required block.
|
||||
break;
|
||||
}
|
||||
if reverse {
|
||||
if number <= inc || number == 0 {
|
||||
@ -1471,7 +1481,7 @@ impl ChainSync {
|
||||
Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp)))
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotManifest request
|
||||
/// Respond to GetSnapshotData request
|
||||
fn return_snapshot_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let hash: H256 = try!(r.val_at(0));
|
||||
trace!(target: "sync", "{} -> GetSnapshotData {:?}", peer_id, hash);
|
||||
|
@ -15,7 +15,7 @@
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo};
|
||||
use network::{NetworkContext, PeerId, PacketId, NetworkError, SessionInfo, ProtocolId};
|
||||
use util::Bytes;
|
||||
use ethcore::client::BlockChainClient;
|
||||
use ethcore::header::BlockNumber;
|
||||
@ -44,8 +44,10 @@ pub trait SyncIo {
|
||||
}
|
||||
/// Returns information on p2p session
|
||||
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>;
|
||||
/// Maximum mutuallt supported ETH protocol version
|
||||
/// Maximum mutually supported ETH protocol version
|
||||
fn eth_protocol_version(&self, peer_id: PeerId) -> u8;
|
||||
/// Maximum mutually supported version of a gien protocol.
|
||||
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8;
|
||||
/// Returns if the chain block queue empty
|
||||
fn is_chain_queue_empty(&self) -> bool {
|
||||
self.chain().queue_info().is_empty()
|
||||
@ -117,7 +119,11 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
|
||||
}
|
||||
|
||||
fn eth_protocol_version(&self, peer_id: PeerId) -> u8 {
|
||||
self.network.protocol_version(peer_id, self.network.subprotocol_name()).unwrap_or(0)
|
||||
self.network.protocol_version(self.network.subprotocol_name(), peer_id).unwrap_or(0)
|
||||
}
|
||||
|
||||
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
|
||||
self.network.protocol_version(*protocol, peer_id).unwrap_or(0)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ use ethcore::client::{TestBlockChainClient, BlockChainClient};
|
||||
use ethcore::header::BlockNumber;
|
||||
use ethcore::snapshot::SnapshotService;
|
||||
use sync_io::SyncIo;
|
||||
use api::WARP_SYNC_PROTOCOL_ID;
|
||||
use chain::ChainSync;
|
||||
use ::SyncConfig;
|
||||
|
||||
@ -90,7 +91,11 @@ impl<'p> SyncIo for TestIo<'p> {
|
||||
}
|
||||
|
||||
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
|
||||
64
|
||||
63
|
||||
}
|
||||
|
||||
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
|
||||
if protocol == &WARP_SYNC_PROTOCOL_ID { 1 } else { self.eth_protocol_version(peer_id) }
|
||||
}
|
||||
|
||||
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
|
||||
|
@ -155,6 +155,8 @@ pub enum NetworkIoMessage {
|
||||
protocol: ProtocolId,
|
||||
/// Supported protocol versions.
|
||||
versions: Vec<u8>,
|
||||
/// Number of packet IDs reserved by the protocol.
|
||||
packet_count: u8,
|
||||
},
|
||||
/// Register a new protocol timer
|
||||
AddTimer {
|
||||
@ -251,9 +253,8 @@ impl<'s> NetworkContext<'s> {
|
||||
self.io.channel()
|
||||
}
|
||||
|
||||
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
|
||||
/// Disconnect a peer and prevent it from connecting again.
|
||||
pub fn disable_peer(&self, peer: PeerId) {
|
||||
//TODO: remove capability, disconnect if no capabilities left
|
||||
self.io.message(NetworkIoMessage::DisablePeer(peer))
|
||||
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
|
||||
}
|
||||
@ -290,7 +291,7 @@ impl<'s> NetworkContext<'s> {
|
||||
}
|
||||
|
||||
/// Returns max version for a given protocol.
|
||||
pub fn protocol_version(&self, peer: PeerId, protocol: ProtocolId) -> Option<u8> {
|
||||
pub fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> {
|
||||
let session = self.resolve_session(peer);
|
||||
session.and_then(|s| s.lock().capability_version(protocol))
|
||||
}
|
||||
@ -1018,7 +1019,8 @@ impl IoHandler<NetworkIoMessage> for Host {
|
||||
NetworkIoMessage::AddHandler {
|
||||
ref handler,
|
||||
ref protocol,
|
||||
ref versions
|
||||
ref versions,
|
||||
ref packet_count,
|
||||
} => {
|
||||
let h = handler.clone();
|
||||
let reserved = self.reserved_nodes.read();
|
||||
@ -1026,7 +1028,7 @@ impl IoHandler<NetworkIoMessage> for Host {
|
||||
self.handlers.write().insert(*protocol, h);
|
||||
let mut info = self.info.write();
|
||||
for v in versions {
|
||||
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count:0 });
|
||||
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count: *packet_count });
|
||||
}
|
||||
},
|
||||
NetworkIoMessage::AddTimer {
|
||||
|
@ -45,7 +45,7 @@
|
||||
//!
|
||||
//! fn main () {
|
||||
//! let mut service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service");
|
||||
//! service.register_protocol(Arc::new(MyHandler), *b"myp", &[1u8]);
|
||||
//! service.register_protocol(Arc::new(MyHandler), *b"myp", 1, &[1u8]);
|
||||
//! service.start().expect("Error starting service");
|
||||
//!
|
||||
//! // Wait for quit condition
|
||||
@ -91,13 +91,9 @@ mod ip_utils;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub use host::PeerId;
|
||||
pub use host::PacketId;
|
||||
pub use host::NetworkContext;
|
||||
pub use host::{PeerId, PacketId, ProtocolId, NetworkContext, NetworkIoMessage, NetworkConfiguration};
|
||||
pub use service::NetworkService;
|
||||
pub use host::NetworkIoMessage;
|
||||
pub use error::NetworkError;
|
||||
pub use host::NetworkConfiguration;
|
||||
pub use stats::NetworkStats;
|
||||
pub use session::SessionInfo;
|
||||
|
||||
|
@ -73,11 +73,12 @@ impl NetworkService {
|
||||
}
|
||||
|
||||
/// Regiter a new protocol handler with the event loop.
|
||||
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
|
||||
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), NetworkError> {
|
||||
try!(self.io_service.send_message(NetworkIoMessage::AddHandler {
|
||||
handler: handler,
|
||||
protocol: protocol,
|
||||
versions: versions.to_vec(),
|
||||
packet_count: packet_count,
|
||||
}));
|
||||
Ok(())
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
use std::{str, io};
|
||||
use std::net::SocketAddr;
|
||||
use std::cmp::Ordering;
|
||||
use std::sync::*;
|
||||
use mio::*;
|
||||
use mio::tcp::*;
|
||||
@ -122,7 +123,7 @@ impl ToString for PeerCapabilityInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct SessionCapabilityInfo {
|
||||
pub protocol: [u8; 3],
|
||||
pub version: u8,
|
||||
@ -130,6 +131,23 @@ pub struct SessionCapabilityInfo {
|
||||
pub id_offset: u8,
|
||||
}
|
||||
|
||||
impl PartialOrd for SessionCapabilityInfo {
|
||||
fn partial_cmp(&self, other: &SessionCapabilityInfo) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for SessionCapabilityInfo {
|
||||
fn cmp(&self, b: &SessionCapabilityInfo) -> Ordering {
|
||||
// By protocol id first
|
||||
if self.protocol != b.protocol {
|
||||
return self.protocol.cmp(&b.protocol);
|
||||
}
|
||||
// By version
|
||||
self.version.cmp(&b.version)
|
||||
}
|
||||
}
|
||||
|
||||
const PACKET_HELLO: u8 = 0x80;
|
||||
const PACKET_DISCONNECT: u8 = 0x01;
|
||||
const PACKET_PING: u8 = 0x02;
|
||||
@ -441,6 +459,9 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
// Sort capabilities alphabeticaly.
|
||||
caps.sort();
|
||||
|
||||
i = 0;
|
||||
let mut offset: u8 = PACKET_USER;
|
||||
while i < caps.len() {
|
||||
|
@ -41,7 +41,7 @@ impl TestProtocol {
|
||||
/// Creates and register protocol with the network service
|
||||
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
|
||||
let handler = Arc::new(TestProtocol::new(drop_session));
|
||||
service.register_protocol(handler.clone(), *b"tst", &[42u8, 43u8]).expect("Error registering test protocol handler");
|
||||
service.register_protocol(handler.clone(), *b"tst", 1, &[42u8, 43u8]).expect("Error registering test protocol handler");
|
||||
handler
|
||||
}
|
||||
|
||||
@ -93,7 +93,7 @@ impl NetworkProtocolHandler for TestProtocol {
|
||||
fn net_service() {
|
||||
let service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service");
|
||||
service.start().unwrap();
|
||||
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[1u8]).unwrap();
|
||||
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", 1, &[1u8]).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Loading…
Reference in New Issue
Block a user