Expanse compatibility (#2369)

* Add support for Expanse.

* Fix build.

* Refactor to be able to alter the eth subprotocol name

* Fix JSON.

* Support exp hardfork.

* Fix exp json again.

* Fixed test

* Fix tests.
This commit is contained in:
Gav Wood
2016-09-28 14:21:59 +02:00
committed by GitHub
parent fb92a98451
commit 15a14a5f49
15 changed files with 190 additions and 45 deletions

View File

@@ -137,7 +137,7 @@ const SYS_TIMER: usize = LAST_SESSION + 1;
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = &'static str;
pub type ProtocolId = [u8; 3];
/// Messages used to communitate with the event loop from other threads.
#[derive(Clone)]
@@ -185,7 +185,7 @@ pub struct CapabilityInfo {
impl Encodable for CapabilityInfo {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(2);
s.append(&self.protocol);
s.append(&&self.protocol[..]);
s.append(&self.version);
}
}
@@ -284,10 +284,13 @@ impl<'s> NetworkContext<'s> {
}
/// Returns max version for a given protocol.
pub fn protocol_version(&self, peer: PeerId, protocol: &str) -> Option<u8> {
pub fn protocol_version(&self, peer: PeerId, protocol: ProtocolId) -> Option<u8> {
let session = self.resolve_session(peer);
session.and_then(|s| s.lock().capability_version(protocol))
}
/// Returns this object's subprotocol name.
pub fn subprotocol_name(&self) -> ProtocolId { self.protocol }
}
/// Shared host information
@@ -801,8 +804,8 @@ impl Host {
}
}
for (p, _) in self.handlers.read().iter() {
if s.have_capability(p) {
ready_data.push(p);
if s.have_capability(*p) {
ready_data.push(*p);
}
}
},
@@ -811,7 +814,7 @@ impl Host {
protocol,
packet_id,
}) => {
match self.handlers.read().get(protocol) {
match self.handlers.read().get(&protocol) {
None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data.push((protocol, packet_id, data)),
}
@@ -826,13 +829,13 @@ impl Host {
}
let handlers = self.handlers.read();
for p in ready_data {
let h = handlers.get(p).unwrap().clone();
let h = handlers.get(&p).unwrap().clone();
self.stats.inc_sessions();
let reserved = self.reserved_nodes.read();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
}
for (p, packet_id, data) in packet_data {
let h = handlers.get(p).unwrap().clone();
let h = handlers.get(&p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
}
@@ -857,8 +860,8 @@ impl Host {
if s.is_ready() {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.read().iter() {
if s.have_capability(p) {
to_disconnect.push(p);
if s.have_capability(*p) {
to_disconnect.push(*p);
}
}
}
@@ -874,7 +877,7 @@ impl Host {
}
}
for p in to_disconnect {
let h = self.handlers.read().get(p).unwrap().clone();
let h = self.handlers.read().get(&p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
@@ -980,7 +983,7 @@ impl IoHandler<NetworkIoMessage> for Host {
self.nodes.write().clear_useless();
},
_ => match self.timers.read().get(&token).cloned() {
Some(timer) => match self.handlers.read().get(timer.protocol).cloned() {
Some(timer) => match self.handlers.read().get(&timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => {
let reserved = self.reserved_nodes.read();
@@ -1004,11 +1007,11 @@ impl IoHandler<NetworkIoMessage> for Host {
} => {
let h = handler.clone();
let reserved = self.reserved_nodes.read();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved));
self.handlers.write().insert(protocol, h);
h.initialize(&NetworkContext::new(io, *protocol, None, self.sessions.clone(), &reserved));
self.handlers.write().insert(*protocol, h);
let mut info = self.info.write();
for v in versions {
info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 });
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count:0 });
}
},
NetworkIoMessage::AddTimer {
@@ -1023,7 +1026,7 @@ impl IoHandler<NetworkIoMessage> for Host {
*counter += 1;
handler_token
};
self.timers.write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
self.timers.write().insert(handler_token, ProtocolTimer { protocol: *protocol, token: *token });
io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
},
NetworkIoMessage::Disconnect(ref peer) => {

View File

@@ -45,7 +45,7 @@
//!
//! fn main () {
//! let mut service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service");
//! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
//! service.register_protocol(Arc::new(MyHandler), *b"myp", &[1u8]);
//! service.start().expect("Error starting service");
//!
//! // Wait for quit condition

View File

@@ -14,8 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{str, io};
use std::net::SocketAddr;
use std::io;
use std::sync::*;
use mio::*;
use mio::tcp::*;
@@ -63,7 +63,7 @@ pub enum SessionData {
/// Packet data
data: Vec<u8>,
/// Packet protocol ID
protocol: &'static str,
protocol: [u8; 3],
/// Zero based packet ID
packet_id: u8,
},
@@ -89,15 +89,21 @@ pub struct SessionInfo {
#[derive(Debug, PartialEq, Eq)]
pub struct PeerCapabilityInfo {
pub protocol: String,
pub protocol: ProtocolId,
pub version: u8,
}
impl Decodable for PeerCapabilityInfo {
fn decode<D>(decoder: &D) -> Result<Self, DecoderError> where D: Decoder {
let c = decoder.as_rlp();
let p: Vec<u8> = try!(c.val_at(0));
if p.len() != 3 {
return Err(DecoderError::Custom("Invalid subprotocol string length. Should be 3"));
}
let mut p2: ProtocolId = [0u8; 3];
p2.clone_from_slice(&p);
Ok(PeerCapabilityInfo {
protocol: try!(c.val_at(0)),
protocol: p2,
version: try!(c.val_at(1))
})
}
@@ -105,7 +111,7 @@ impl Decodable for PeerCapabilityInfo {
#[derive(Debug)]
struct SessionCapabilityInfo {
pub protocol: &'static str,
pub protocol: [u8; 3],
pub version: u8,
pub packet_count: u8,
pub id_offset: u8,
@@ -239,12 +245,12 @@ impl Session {
}
/// Checks if peer supports given capability
pub fn have_capability(&self, protocol: &str) -> bool {
pub fn have_capability(&self, protocol: [u8; 3]) -> bool {
self.info.capabilities.iter().any(|c| c.protocol == protocol)
}
/// Checks if peer supports given capability
pub fn capability_version(&self, protocol: &str) -> Option<u8> {
pub fn capability_version(&self, protocol: [u8; 3]) -> Option<u8> {
self.info.capabilities.iter().filter_map(|c| if c.protocol == protocol { Some(c.version) } else { None }).max()
}
@@ -270,10 +276,10 @@ impl Session {
}
/// Send a protocol packet to peer.
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), NetworkError>
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: [u8; 3], packet_id: u8, data: &[u8]) -> Result<(), NetworkError>
where Message: Send + Sync + Clone {
if self.info.capabilities.is_empty() || !self.had_hello {
debug!(target: "network", "Sending to unconfirmed session {}, protocol: {}, packet: {}", self.token(), protocol, packet_id);
debug!(target: "network", "Sending to unconfirmed session {}, protocol: {}, packet: {}", self.token(), str::from_utf8(&protocol[..]).unwrap_or("??"), packet_id);
return Err(From::from(NetworkError::BadProtocol));
}
if self.expired() {

View File

@@ -41,7 +41,7 @@ impl TestProtocol {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
let handler = Arc::new(TestProtocol::new(drop_session));
service.register_protocol(handler.clone(), "test", &[42u8, 43u8]).expect("Error registering test protocol handler");
service.register_protocol(handler.clone(), *b"tst", &[42u8, 43u8]).expect("Error registering test protocol handler");
handler
}
@@ -93,7 +93,7 @@ impl NetworkProtocolHandler for TestProtocol {
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[1u8]).unwrap();
}
#[test]