Fix packet count when talking with PAR2 peers (#8555)
* Support diferent packet counts in different protocol versions. * Fix light timeouts and eclipse protection. * Fix devp2p tests. * Fix whisper-cli compilation. * Fix compilation. * Fix ethcore-sync tests. * Revert "Fix light timeouts and eclipse protection." This reverts commit 06285ea8c1d9d184d809f64b5507aece633da6cc. * Increase timeouts.
This commit is contained in:
parent
981554cf74
commit
08abf67a51
@ -75,14 +75,17 @@ const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
|
|||||||
// minimum interval between updates.
|
// minimum interval between updates.
|
||||||
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
|
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
|
||||||
|
|
||||||
|
/// Packet count for PIP.
|
||||||
|
const PACKET_COUNT_V1: u8 = 9;
|
||||||
|
|
||||||
/// Supported protocol versions.
|
/// Supported protocol versions.
|
||||||
pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
|
pub const PROTOCOL_VERSIONS: &'static [(u8, u8)] = &[
|
||||||
|
(1, PACKET_COUNT_V1),
|
||||||
|
];
|
||||||
|
|
||||||
/// Max protocol version.
|
/// Max protocol version.
|
||||||
pub const MAX_PROTOCOL_VERSION: u8 = 1;
|
pub const MAX_PROTOCOL_VERSION: u8 = 1;
|
||||||
|
|
||||||
/// Packet count for PIP.
|
|
||||||
pub const PACKET_COUNT: u8 = 9;
|
|
||||||
|
|
||||||
// packet ID definitions.
|
// packet ID definitions.
|
||||||
mod packet {
|
mod packet {
|
||||||
@ -111,9 +114,9 @@ mod packet {
|
|||||||
mod timeout {
|
mod timeout {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub const HANDSHAKE: Duration = Duration::from_millis(2500);
|
pub const HANDSHAKE: Duration = Duration::from_millis(4_000);
|
||||||
pub const ACKNOWLEDGE_UPDATE: Duration = Duration::from_millis(5000);
|
pub const ACKNOWLEDGE_UPDATE: Duration = Duration::from_millis(5_000);
|
||||||
pub const BASE: u64 = 1500; // base timeout for packet.
|
pub const BASE: u64 = 2_500; // base timeout for packet.
|
||||||
|
|
||||||
// timeouts per request within packet.
|
// timeouts per request within packet.
|
||||||
pub const HEADERS: u64 = 250; // per header?
|
pub const HEADERS: u64 = 250; // per header?
|
||||||
@ -688,7 +691,7 @@ impl LightProtocol {
|
|||||||
Err(e) => { punish(*peer, io, e); return }
|
Err(e) => { punish(*peer, io, e); return }
|
||||||
};
|
};
|
||||||
|
|
||||||
if PROTOCOL_VERSIONS.iter().find(|x| **x == proto_version).is_none() {
|
if PROTOCOL_VERSIONS.iter().find(|x| x.0 == proto_version).is_none() {
|
||||||
punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
|
punish(*peer, io, Error::UnsupportedProtocolVersion(proto_version));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ use chain::{ChainSync, SyncStatus as EthSyncStatus};
|
|||||||
use std::net::{SocketAddr, AddrParseError};
|
use std::net::{SocketAddr, AddrParseError};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
|
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
|
||||||
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
|
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
|
||||||
use light::client::AsLightClient;
|
use light::client::AsLightClient;
|
||||||
use light::Provider;
|
use light::Provider;
|
||||||
@ -202,10 +202,8 @@ pub struct AttachedProtocol {
|
|||||||
pub handler: Arc<NetworkProtocolHandler + Send + Sync>,
|
pub handler: Arc<NetworkProtocolHandler + Send + Sync>,
|
||||||
/// 3-character ID for the protocol.
|
/// 3-character ID for the protocol.
|
||||||
pub protocol_id: ProtocolId,
|
pub protocol_id: ProtocolId,
|
||||||
/// Packet count.
|
/// Supported versions and their packet counts.
|
||||||
pub packet_count: u8,
|
pub versions: &'static [(u8, u8)],
|
||||||
/// Supported versions.
|
|
||||||
pub versions: &'static [u8],
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AttachedProtocol {
|
impl AttachedProtocol {
|
||||||
@ -213,7 +211,6 @@ impl AttachedProtocol {
|
|||||||
let res = network.register_protocol(
|
let res = network.register_protocol(
|
||||||
self.handler.clone(),
|
self.handler.clone(),
|
||||||
self.protocol_id,
|
self.protocol_id,
|
||||||
self.packet_count,
|
|
||||||
self.versions
|
self.versions
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -459,15 +456,15 @@ impl ChainNotify for EthSync {
|
|||||||
Err(err) => warn!("Error starting network: {}", err),
|
Err(err) => warn!("Error starting network: {}", err),
|
||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63])
|
self.network.register_protocol(self.eth_handler.clone(), self.subprotocol_name, &[ETH_PROTOCOL_VERSION_62, ETH_PROTOCOL_VERSION_63])
|
||||||
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
|
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
|
||||||
// register the warp sync subprotocol
|
// register the warp sync subprotocol
|
||||||
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3])
|
self.network.register_protocol(self.eth_handler.clone(), WARP_SYNC_PROTOCOL_ID, &[PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3])
|
||||||
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
|
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
|
||||||
|
|
||||||
// register the light protocol.
|
// register the light protocol.
|
||||||
if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) {
|
if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) {
|
||||||
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
|
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PROTOCOL_VERSIONS)
|
||||||
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
|
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -827,7 +824,7 @@ impl ManageNetwork for LightSync {
|
|||||||
|
|
||||||
let light_proto = self.proto.clone();
|
let light_proto = self.proto.clone();
|
||||||
|
|
||||||
self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
|
self.network.register_protocol(light_proto, self.subprotocol_name, ::light::net::PROTOCOL_VERSIONS)
|
||||||
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
|
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
|
||||||
|
|
||||||
for proto in &self.attached_protos { proto.register(&self.network) }
|
for proto in &self.attached_protos { proto.register(&self.network) }
|
||||||
|
@ -45,7 +45,6 @@ use super::{
|
|||||||
MAX_NEW_BLOCK_AGE,
|
MAX_NEW_BLOCK_AGE,
|
||||||
MAX_NEW_HASHES,
|
MAX_NEW_HASHES,
|
||||||
PAR_PROTOCOL_VERSION_1,
|
PAR_PROTOCOL_VERSION_1,
|
||||||
PAR_PROTOCOL_VERSION_2,
|
|
||||||
PAR_PROTOCOL_VERSION_3,
|
PAR_PROTOCOL_VERSION_3,
|
||||||
BLOCK_BODIES_PACKET,
|
BLOCK_BODIES_PACKET,
|
||||||
BLOCK_HEADERS_PACKET,
|
BLOCK_HEADERS_PACKET,
|
||||||
@ -641,8 +640,11 @@ impl SyncHandler {
|
|||||||
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, sync.network_id, peer.network_id);
|
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, sync.network_id, peer.network_id);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
if (warp_protocol && peer.protocol_version != PAR_PROTOCOL_VERSION_1 && peer.protocol_version != PAR_PROTOCOL_VERSION_2 && peer.protocol_version != PAR_PROTOCOL_VERSION_3)
|
|
||||||
|| (!warp_protocol && peer.protocol_version != ETH_PROTOCOL_VERSION_63 && peer.protocol_version != ETH_PROTOCOL_VERSION_62) {
|
if false
|
||||||
|
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_3.0))
|
||||||
|
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_62.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_63.0))
|
||||||
|
{
|
||||||
io.disable_peer(peer_id);
|
io.disable_peer(peer_id);
|
||||||
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
|
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -127,15 +127,15 @@ known_heap_size!(0, PeerInfo);
|
|||||||
pub type PacketDecodeError = DecoderError;
|
pub type PacketDecodeError = DecoderError;
|
||||||
|
|
||||||
/// 63 version of Ethereum protocol.
|
/// 63 version of Ethereum protocol.
|
||||||
pub const ETH_PROTOCOL_VERSION_63: u8 = 63;
|
pub const ETH_PROTOCOL_VERSION_63: (u8, u8) = (63, 0x11);
|
||||||
/// 62 version of Ethereum protocol.
|
/// 62 version of Ethereum protocol.
|
||||||
pub const ETH_PROTOCOL_VERSION_62: u8 = 62;
|
pub const ETH_PROTOCOL_VERSION_62: (u8, u8) = (62, 0x11);
|
||||||
/// 1 version of Parity protocol.
|
/// 1 version of Parity protocol and the packet count.
|
||||||
pub const PAR_PROTOCOL_VERSION_1: u8 = 1;
|
pub const PAR_PROTOCOL_VERSION_1: (u8, u8) = (1, 0x15);
|
||||||
/// 2 version of Parity protocol (consensus messages added).
|
/// 2 version of Parity protocol (consensus messages added).
|
||||||
pub const PAR_PROTOCOL_VERSION_2: u8 = 2;
|
pub const PAR_PROTOCOL_VERSION_2: (u8, u8) = (2, 0x16);
|
||||||
/// 3 version of Parity protocol (private transactions messages added).
|
/// 3 version of Parity protocol (private transactions messages added).
|
||||||
pub const PAR_PROTOCOL_VERSION_3: u8 = 3;
|
pub const PAR_PROTOCOL_VERSION_3: (u8, u8) = (3, 0x18);
|
||||||
|
|
||||||
pub const MAX_BODIES_TO_SEND: usize = 256;
|
pub const MAX_BODIES_TO_SEND: usize = 256;
|
||||||
pub const MAX_HEADERS_TO_SEND: usize = 512;
|
pub const MAX_HEADERS_TO_SEND: usize = 512;
|
||||||
@ -169,8 +169,6 @@ pub const NODE_DATA_PACKET: u8 = 0x0e;
|
|||||||
pub const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
pub const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
||||||
pub const RECEIPTS_PACKET: u8 = 0x10;
|
pub const RECEIPTS_PACKET: u8 = 0x10;
|
||||||
|
|
||||||
pub const ETH_PACKET_COUNT: u8 = 0x11;
|
|
||||||
|
|
||||||
pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
|
pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
|
||||||
pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
|
pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
|
||||||
pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
|
pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
|
||||||
@ -179,8 +177,6 @@ pub const CONSENSUS_DATA_PACKET: u8 = 0x15;
|
|||||||
const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
|
const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
|
||||||
const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;
|
const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;
|
||||||
|
|
||||||
pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x18;
|
|
||||||
|
|
||||||
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
|
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
|
||||||
|
|
||||||
const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
|
const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
@ -453,7 +449,7 @@ impl ChainSync {
|
|||||||
let last_imported_number = self.new_blocks.last_imported_block_number();
|
let last_imported_number = self.new_blocks.last_imported_block_number();
|
||||||
SyncStatus {
|
SyncStatus {
|
||||||
state: self.state.clone(),
|
state: self.state.clone(),
|
||||||
protocol_version: ETH_PROTOCOL_VERSION_63,
|
protocol_version: ETH_PROTOCOL_VERSION_63.0,
|
||||||
network_id: self.network_id,
|
network_id: self.network_id,
|
||||||
start_block_number: self.starting_block,
|
start_block_number: self.starting_block,
|
||||||
last_imported_block_number: Some(last_imported_number),
|
last_imported_block_number: Some(last_imported_number),
|
||||||
@ -855,7 +851,7 @@ impl ChainSync {
|
|||||||
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
|
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), network::Error> {
|
||||||
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
|
let warp_protocol_version = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer);
|
||||||
let warp_protocol = warp_protocol_version != 0;
|
let warp_protocol = warp_protocol_version != 0;
|
||||||
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63 };
|
let protocol = if warp_protocol { warp_protocol_version } else { ETH_PROTOCOL_VERSION_63.0 };
|
||||||
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
|
trace!(target: "sync", "Sending status to {}, protocol version {}", peer, protocol);
|
||||||
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 });
|
let mut packet = RlpStream::new_list(if warp_protocol { 7 } else { 5 });
|
||||||
let chain = io.chain().chain_info();
|
let chain = io.chain().chain_info();
|
||||||
@ -1019,11 +1015,11 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn get_consensus_peers(&self) -> Vec<PeerId> {
|
fn get_consensus_peers(&self) -> Vec<PeerId> {
|
||||||
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2 { Some(*id) } else { None }).collect()
|
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_2.0 { Some(*id) } else { None }).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_private_transaction_peers(&self) -> Vec<PeerId> {
|
fn get_private_transaction_peers(&self) -> Vec<PeerId> {
|
||||||
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3 { Some(*id) } else { None }).collect()
|
self.peers.iter().filter_map(|(id, p)| if p.protocol_version >= PAR_PROTOCOL_VERSION_3.0 { Some(*id) } else { None }).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Maintain other peers. Send out any new blocks and transactions
|
/// Maintain other peers. Send out any new blocks and transactions
|
||||||
|
@ -28,7 +28,7 @@ use super::{
|
|||||||
BlockSet,
|
BlockSet,
|
||||||
ChainSync,
|
ChainSync,
|
||||||
PeerAsking,
|
PeerAsking,
|
||||||
ETH_PACKET_COUNT,
|
ETH_PROTOCOL_VERSION_63,
|
||||||
GET_BLOCK_BODIES_PACKET,
|
GET_BLOCK_BODIES_PACKET,
|
||||||
GET_BLOCK_HEADERS_PACKET,
|
GET_BLOCK_HEADERS_PACKET,
|
||||||
GET_RECEIPTS_PACKET,
|
GET_RECEIPTS_PACKET,
|
||||||
@ -140,7 +140,8 @@ impl SyncRequester {
|
|||||||
}
|
}
|
||||||
peer.asking = asking;
|
peer.asking = asking;
|
||||||
peer.ask_time = Instant::now();
|
peer.ask_time = Instant::now();
|
||||||
let result = if packet_id >= ETH_PACKET_COUNT {
|
// TODO [ToDr] This seems quite fragile. Be careful when protocol is updated.
|
||||||
|
let result = if packet_id >= ETH_PROTOCOL_VERSION_63.1 {
|
||||||
io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
|
io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
|
||||||
} else {
|
} else {
|
||||||
io.send(peer_id, packet_id, packet)
|
io.send(peer_id, packet_id, packet)
|
||||||
|
@ -134,11 +134,11 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
|
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
|
||||||
ETH_PROTOCOL_VERSION_63
|
ETH_PROTOCOL_VERSION_63.0
|
||||||
}
|
}
|
||||||
|
|
||||||
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
|
fn protocol_version(&self, protocol: &ProtocolId, peer_id: PeerId) -> u8 {
|
||||||
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3 } else { self.eth_protocol_version(peer_id) }
|
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3.0 } else { self.eth_protocol_version(peer_id) }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
|
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
|
||||||
|
@ -89,7 +89,6 @@ pub fn setup(target_pool_size: usize, protos: &mut Vec<AttachedProtocol>)
|
|||||||
|
|
||||||
protos.push(AttachedProtocol {
|
protos.push(AttachedProtocol {
|
||||||
handler: net.clone() as Arc<_>,
|
handler: net.clone() as Arc<_>,
|
||||||
packet_count: whisper_net::PACKET_COUNT,
|
|
||||||
versions: whisper_net::SUPPORTED_VERSIONS,
|
versions: whisper_net::SUPPORTED_VERSIONS,
|
||||||
protocol_id: whisper_net::PROTOCOL_ID,
|
protocol_id: whisper_net::PROTOCOL_ID,
|
||||||
});
|
});
|
||||||
@ -97,7 +96,6 @@ pub fn setup(target_pool_size: usize, protos: &mut Vec<AttachedProtocol>)
|
|||||||
// parity-only extensions to whisper.
|
// parity-only extensions to whisper.
|
||||||
protos.push(AttachedProtocol {
|
protos.push(AttachedProtocol {
|
||||||
handler: Arc::new(whisper_net::ParityExtensions),
|
handler: Arc::new(whisper_net::ParityExtensions),
|
||||||
packet_count: whisper_net::PACKET_COUNT,
|
|
||||||
versions: whisper_net::SUPPORTED_VERSIONS,
|
versions: whisper_net::SUPPORTED_VERSIONS,
|
||||||
protocol_id: whisper_net::PARITY_PROTOCOL_ID,
|
protocol_id: whisper_net::PARITY_PROTOCOL_ID,
|
||||||
});
|
});
|
||||||
|
@ -79,7 +79,9 @@ const NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
|
|||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
/// Protocol info
|
/// Protocol info
|
||||||
pub struct CapabilityInfo {
|
pub struct CapabilityInfo {
|
||||||
|
/// Protocol ID
|
||||||
pub protocol: ProtocolId,
|
pub protocol: ProtocolId,
|
||||||
|
/// Protocol version
|
||||||
pub version: u8,
|
pub version: u8,
|
||||||
/// Total number of packet IDs this protocol support.
|
/// Total number of packet IDs this protocol support.
|
||||||
pub packet_count: u8,
|
pub packet_count: u8,
|
||||||
@ -687,7 +689,7 @@ impl Host {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
let s = session.lock();
|
let s = session.lock();
|
||||||
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
|
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
|
||||||
if let ErrorKind::Disconnect(DisconnectReason::UselessPeer) = *e.kind() {
|
if let ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) = *e.kind() {
|
||||||
if let Some(id) = s.id() {
|
if let Some(id) = s.id() {
|
||||||
if !self.reserved_nodes.read().contains(id) {
|
if !self.reserved_nodes.read().contains(id) {
|
||||||
let mut nodes = self.nodes.write();
|
let mut nodes = self.nodes.write();
|
||||||
@ -990,7 +992,6 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
ref handler,
|
ref handler,
|
||||||
ref protocol,
|
ref protocol,
|
||||||
ref versions,
|
ref versions,
|
||||||
ref packet_count,
|
|
||||||
} => {
|
} => {
|
||||||
let h = handler.clone();
|
let h = handler.clone();
|
||||||
let reserved = self.reserved_nodes.read();
|
let reserved = self.reserved_nodes.read();
|
||||||
@ -1000,8 +1001,12 @@ impl IoHandler<NetworkIoMessage> for Host {
|
|||||||
);
|
);
|
||||||
self.handlers.write().insert(*protocol, h);
|
self.handlers.write().insert(*protocol, h);
|
||||||
let mut info = self.info.write();
|
let mut info = self.info.write();
|
||||||
for v in versions {
|
for &(version, packet_count) in versions {
|
||||||
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count: *packet_count });
|
info.capabilities.push(CapabilityInfo {
|
||||||
|
protocol: *protocol,
|
||||||
|
version,
|
||||||
|
packet_count,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
NetworkIoMessage::AddTimer {
|
NetworkIoMessage::AddTimer {
|
||||||
|
@ -49,7 +49,7 @@
|
|||||||
//! fn main () {
|
//! fn main () {
|
||||||
//! let mut service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
|
//! let mut service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
|
||||||
//! service.start().expect("Error starting service");
|
//! service.start().expect("Error starting service");
|
||||||
//! service.register_protocol(Arc::new(MyHandler), *b"myp", 1, &[1u8]);
|
//! service.register_protocol(Arc::new(MyHandler), *b"myp", &[(1u8, 1u8)]);
|
||||||
//!
|
//!
|
||||||
//! // Wait for quit condition
|
//! // Wait for quit condition
|
||||||
//! // ...
|
//! // ...
|
||||||
|
@ -67,12 +67,17 @@ impl NetworkService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Regiter a new protocol handler with the event loop.
|
/// Regiter a new protocol handler with the event loop.
|
||||||
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId, packet_count: u8, versions: &[u8]) -> Result<(), Error> {
|
pub fn register_protocol(
|
||||||
|
&self,
|
||||||
|
handler: Arc<NetworkProtocolHandler + Send + Sync>,
|
||||||
|
protocol: ProtocolId,
|
||||||
|
// version id + packet count
|
||||||
|
versions: &[(u8, u8)]
|
||||||
|
) -> Result<(), Error> {
|
||||||
self.io_service.send_message(NetworkIoMessage::AddHandler {
|
self.io_service.send_message(NetworkIoMessage::AddHandler {
|
||||||
handler: handler,
|
handler,
|
||||||
protocol: protocol,
|
protocol,
|
||||||
versions: versions.to_vec(),
|
versions: versions.to_vec(),
|
||||||
packet_count: packet_count,
|
|
||||||
})?;
|
})?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,7 @@ impl TestProtocol {
|
|||||||
/// Creates and register protocol with the network service
|
/// Creates and register protocol with the network service
|
||||||
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
|
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
|
||||||
let handler = Arc::new(TestProtocol::new(drop_session));
|
let handler = Arc::new(TestProtocol::new(drop_session));
|
||||||
service.register_protocol(handler.clone(), *b"tst", 1, &[42u8, 43u8]).expect("Error registering test protocol handler");
|
service.register_protocol(handler.clone(), *b"tst", &[(42u8, 1u8), (43u8, 1u8)]).expect("Error registering test protocol handler");
|
||||||
handler
|
handler
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -104,7 +104,7 @@ impl NetworkProtocolHandler for TestProtocol {
|
|||||||
fn net_service() {
|
fn net_service() {
|
||||||
let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
|
let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
|
||||||
service.start().unwrap();
|
service.start().unwrap();
|
||||||
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", 1, &[1u8]).unwrap();
|
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1u8)]).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -64,10 +64,8 @@ pub enum NetworkIoMessage {
|
|||||||
handler: Arc<NetworkProtocolHandler + Sync>,
|
handler: Arc<NetworkProtocolHandler + Sync>,
|
||||||
/// Protocol Id.
|
/// Protocol Id.
|
||||||
protocol: ProtocolId,
|
protocol: ProtocolId,
|
||||||
/// Supported protocol versions.
|
/// Supported protocol versions and number of packet IDs reserved by the protocol (packet count).
|
||||||
versions: Vec<u8>,
|
versions: Vec<(u8, u8)>,
|
||||||
/// Number of packet IDs reserved by the protocol.
|
|
||||||
packet_count: u8,
|
|
||||||
},
|
},
|
||||||
/// Register a new protocol timer
|
/// Register a new protocol timer
|
||||||
AddTimer {
|
AddTimer {
|
||||||
|
@ -218,10 +218,10 @@ fn execute<S, I>(command: I) -> Result<(), Error> where I: IntoIterator<Item=S>,
|
|||||||
network.start()?;
|
network.start()?;
|
||||||
|
|
||||||
// Attach whisper protocol to the network service
|
// Attach whisper protocol to the network service
|
||||||
network.register_protocol(whisper_network_handler.clone(), whisper::net::PROTOCOL_ID, whisper::net::PACKET_COUNT,
|
network.register_protocol(whisper_network_handler.clone(), whisper::net::PROTOCOL_ID,
|
||||||
whisper::net::SUPPORTED_VERSIONS)?;
|
whisper::net::SUPPORTED_VERSIONS)?;
|
||||||
network.register_protocol(Arc::new(whisper::net::ParityExtensions), whisper::net::PARITY_PROTOCOL_ID,
|
network.register_protocol(Arc::new(whisper::net::ParityExtensions), whisper::net::PARITY_PROTOCOL_ID,
|
||||||
whisper::net::PACKET_COUNT, whisper::net::SUPPORTED_VERSIONS)?;
|
whisper::net::SUPPORTED_VERSIONS)?;
|
||||||
|
|
||||||
// Request handler
|
// Request handler
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
|
@ -41,15 +41,17 @@ const RALLY_TIMEOUT: Duration = Duration::from_millis(2500);
|
|||||||
/// Current protocol version.
|
/// Current protocol version.
|
||||||
pub const PROTOCOL_VERSION: usize = 6;
|
pub const PROTOCOL_VERSION: usize = 6;
|
||||||
|
|
||||||
|
/// Number of packets. A bunch are reserved.
|
||||||
|
const PACKET_COUNT: u8 = 128;
|
||||||
|
|
||||||
/// Supported protocol versions.
|
/// Supported protocol versions.
|
||||||
pub const SUPPORTED_VERSIONS: &'static [u8] = &[PROTOCOL_VERSION as u8];
|
pub const SUPPORTED_VERSIONS: &'static [(u8, u8)] = &[
|
||||||
|
(PROTOCOL_VERSION as u8, PACKET_COUNT)
|
||||||
|
];
|
||||||
|
|
||||||
// maximum tolerated delay between messages packets.
|
// maximum tolerated delay between messages packets.
|
||||||
const MAX_TOLERATED_DELAY: Duration = Duration::from_millis(5000);
|
const MAX_TOLERATED_DELAY: Duration = Duration::from_millis(5000);
|
||||||
|
|
||||||
/// Number of packets. A bunch are reserved.
|
|
||||||
pub const PACKET_COUNT: u8 = 128;
|
|
||||||
|
|
||||||
/// Whisper protocol ID
|
/// Whisper protocol ID
|
||||||
pub const PROTOCOL_ID: ::network::ProtocolId = *b"shh";
|
pub const PROTOCOL_ID: ::network::ProtocolId = *b"shh";
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user