Net service

This commit is contained in:
arkpar 2015-12-17 11:42:30 +01:00
parent c8bac97be1
commit a9bd050d2f
7 changed files with 367 additions and 90 deletions

View File

@ -11,7 +11,7 @@ log = "0.3"
env_logger = "0.3"
rustc-serialize = "0.3"
arrayvec = "0.3"
mio = "0.4.4"
mio = "0.5.*"
rand = "0.3.12"
time = "0.1.34"
tiny-keccak = "1.0"

View File

@ -120,7 +120,7 @@ impl Connection {
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", self.token);
self.interest.insert(EventSet::readable());
event_loop.register_opt(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
event_loop.register(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", self.token, e);
Err(e)
})

View File

@ -4,17 +4,15 @@ use hash::*;
use bytes::Bytes;
use crypto::*;
use crypto;
use network::connection::{Connection, WriteStatus};
use network::connection::{Connection};
use network::host::{NodeId, Host, HostInfo};
use network::Error;
#[derive(PartialEq, Eq)]
#[derive(PartialEq, Eq, Debug)]
enum HandshakeState {
New,
ReadingAuth,
WritingAuth,
ReadingAck,
WritingAck,
StartSession,
}
@ -89,7 +87,7 @@ impl Handshake {
None => {}
};
},
_ => { panic!("Unexpected state") }
_ => { panic!("Unexpected state"); }
}
if self.state != HandshakeState::StartSession {
try!(self.connection.reregister(event_loop));
@ -99,27 +97,7 @@ impl Handshake {
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), Error> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
match self.state {
HandshakeState::WritingAuth => {
match try!(self.connection.writable()) {
WriteStatus::Complete => {
self.connection.expect(ACK_PACKET_SIZE);
self.state = HandshakeState::ReadingAck;
},
_ => {}
};
},
HandshakeState::WritingAck => {
match try!(self.connection.writable()) {
WriteStatus::Complete => {
self.connection.expect(32);
self.state = HandshakeState::StartSession;
},
_ => {}
};
},
_ => { panic!("Unexpected state") }
}
try!(self.connection.writable());
if self.state != HandshakeState::StartSession {
try!(self.connection.reregister(event_loop));
}
@ -185,7 +163,8 @@ impl Handshake {
let message = try!(crypto::ecies::encrypt(&self.id, &data));
self.auth_cipher = message.clone();
self.connection.send(message);
self.state = HandshakeState::WritingAuth;
self.connection.expect(ACK_PACKET_SIZE);
self.state = HandshakeState::ReadingAck;
Ok(())
}
@ -203,7 +182,7 @@ impl Handshake {
let message = try!(crypto::ecies::encrypt(&self.id, &data));
self.ack_cipher = message.clone();
self.connection.send(message);
self.state = HandshakeState::WritingAck;
self.state = HandshakeState::StartSession;
Ok(())
}
}

View File

@ -13,15 +13,17 @@ use crypto::*;
use rlp::*;
use time::Tm;
use network::handshake::Handshake;
use network::session::Session;
use network::Error;
use network::session::{Session, SessionData};
use network::{Error, ProtocolHandler};
const DEFAULT_PORT: u16 = 30304;
const MAX_CONNECTIONS: usize = 1024;
const MAX_USER_TIMERS: usize = 32;
const IDEAL_PEERS:u32 = 10;
pub type NodeId = H512;
type TimerFun = Fn(&mut HostIo) -> bool + Send;
#[derive(Debug)]
struct NetworkConfiguration {
@ -141,33 +143,100 @@ const NODETABLE_MAINTAIN: usize = 5;
const NODETABLE_DISCOVERY: usize = 6;
const FIRST_CONNECTION: usize = 7;
const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1;
const USER_TIMER: usize = LAST_CONNECTION;
const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1;
pub type PacketId = u32;
pub type ProtocolId = &'static str;
pub enum HostMessage {
Shutdown
Shutdown,
AddHandler {
handler: Box<ProtocolHandler+Send>,
protocol: ProtocolId,
versions: Vec<u8>,
},
Send {
peer: PeerId,
packet_id: PacketId,
protocol: ProtocolId,
data: Vec<u8>,
},
AddTimer {
handler: Box<TimerFun>,
delay: u32,
protocol: ProtocolId,
}
}
pub type PeerId = u32;
#[derive(Debug, PartialEq, Eq)]
pub struct CapabilityInfo {
pub protocol: String,
pub version: u32,
pub protocol: ProtocolId,
pub version: u8,
pub packet_count: u8,
}
impl Encodable for CapabilityInfo {
fn encode<E>(&self, encoder: &mut E) -> () where E: Encoder {
encoder.emit_list(|e| {
self.protocol.encode(e);
self.version.encode(e);
(self.version as u32).encode(e);
});
}
}
impl Decodable for CapabilityInfo {
fn decode_untrusted(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
Ok(CapabilityInfo {
protocol: try!(String::decode_untrusted(&try!(rlp.at(0)))),
version: try!(u32::decode_untrusted(&try!(rlp.at(1)))),
})
pub struct HostIo<'s> {
protocol: ProtocolId,
session: Option<&'s mut Session>,
event_loop: &'s mut EventLoop<Host>,
channel: &'s mut Sender<HostMessage>
}
impl<'s> HostIo<'s> {
fn new(protocol: ProtocolId, session: Option<&'s mut Session>, event_loop: &'s mut EventLoop<Host>, channel: &'s mut Sender<HostMessage>) -> HostIo<'s> {
HostIo {
protocol: protocol,
session: session,
event_loop: event_loop,
channel: channel,
}
}
pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error> {
try!(self.channel.send(HostMessage::Send {
peer: peer,
packet_id: packet_id,
protocol: self.protocol,
data: data
}));
Ok(())
}
pub fn respond(&mut self, packet_id: PacketId, data: &[u8]) -> Result<(), Error> {
match self.session.as_mut() {
Some(session) => session.send_packet(self.protocol, packet_id as u8, data),
None => {
panic!("Respond: Session does not exist")
}
}
}
fn register_timer(&mut self, ms: u32, handler: Box<TimerFun>) -> Result<(), Error>{
try!(self.channel.send(HostMessage::AddTimer {
delay: ms,
handler: handler,
protocol: self.protocol,
}));
Ok(())
}
}
struct UserTimer {
handler: Box<TimerFun>,
protocol: ProtocolId,
delay: u32,
}
pub struct HostInfo {
@ -201,16 +270,18 @@ enum ConnectionEntry {
pub struct Host {
info: HostInfo,
sender: Sender<HostMessage>,
udp_socket: UdpSocket,
listener: TcpListener,
connections: Slab<ConnectionEntry>,
timers: Slab<UserTimer>,
nodes: HashMap<NodeId, Node>,
handlers: HashMap<ProtocolId, Box<ProtocolHandler>>,
idle_timeout: Timeout,
channel: Sender<HostMessage>,
}
impl Host {
pub fn start() {
pub fn start(event_loop: &mut EventLoop<Host>) -> Result<(), Error> {
let config = NetworkConfiguration::new();
/*
match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() {
@ -222,19 +293,16 @@ impl Host {
let addr = config.listen_address;
// Setup the server socket
let listener = TcpListener::bind(&addr).unwrap();
// Create an event loop
let mut event_loop = EventLoop::new().unwrap();
let sender = event_loop.channel();
// Start listening for incoming connections
event_loop.register_opt(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap();
event_loop.register(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap();
// Setup the client socket
//let sock = TcpStream::connect(&addr).unwrap();
// Register the socket
//self.event_loop.register_opt(&sock, CLIENT, EventSet::readable(), PollOpt::edge()).unwrap();
//self.event_loop.register(&sock, CLIENT, EventSet::readable(), PollOpt::edge()).unwrap();
let idle_timeout = event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay
// open the udp socket
let udp_socket = UdpSocket::bound(&addr).unwrap();
event_loop.register_opt(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap();
event_loop.register(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap();
event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap();
let port = config.listen_address.port();
@ -246,14 +314,17 @@ impl Host {
protocol_version: 4,
client_version: "parity".to_string(),
listen_port: port,
capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }],
//capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }],
capabilities: Vec::new(),
},
sender: sender,
udp_socket: udp_socket,
listener: listener,
connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS),
timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS),
nodes: HashMap::new(),
idle_timeout: idle_timeout
handlers: HashMap::new(),
idle_timeout: idle_timeout,
channel: event_loop.channel(),
};
host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303");
@ -263,14 +334,8 @@ impl Host {
host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303");
host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303");
event_loop.run(&mut host).unwrap();
}
fn stop(&mut self) {
}
fn have_network(&mut self) -> bool {
true
try!(event_loop.run(&mut host));
Ok(())
}
fn add_node(&mut self, id: &str) {
@ -422,6 +487,8 @@ impl Host {
self.start_session(token, event_loop);
}
}
fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
let mut kill = false;
let mut create_session = false;
@ -435,10 +502,31 @@ impl Host {
create_session = h.done();
},
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.readable(event_loop, &self.info).unwrap_or_else(|e| {
let sd = { s.readable(event_loop, &self.info).unwrap_or_else(|e| {
debug!(target: "net", "Session read error: {:?}", e);
kill = true;
});
SessionData::None
}) };
match sd {
SessionData::Ready => {
for (p, h) in self.handlers.iter_mut() {
if s.have_capability(p) {
h.connected(&mut HostIo::new(p, Some(s), event_loop, &mut self.channel), &(token.as_usize() as u32));
}
}
},
SessionData::Packet {
data,
protocol,
packet_id,
} => {
match self.handlers.get_mut(protocol) {
None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) },
Some(h) => h.read(&mut HostIo::new(protocol, Some(s), event_loop, &mut self.channel), packet_id, &data[1..])
}
},
SessionData::None => {},
}
}
_ => {
warn!(target: "net", "Received event for unknown connection");
@ -504,7 +592,62 @@ impl Handler for Host {
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, event_loop),
NODETABLE_DISCOVERY => {},
NODETABLE_MAINTAIN => {},
_ => panic!("Received unknown timer token"),
USER_TIMER ... LAST_USER_TIMER => {
let timer = self.timers.get_mut(token).expect("Unknown user timer token");
if (*timer.handler)(&mut HostIo::new(timer.protocol, None, event_loop, &mut self.channel)) {
event_loop.timeout_ms(token, timer.delay as u64).expect("Unable to reregister user timer");
}
}
_ => panic!("Unknown timer token"),
}
}
fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
match msg {
HostMessage::Shutdown => event_loop.shutdown(),
HostMessage::AddHandler {
handler,
protocol,
versions
} => {
self.handlers.insert(protocol, handler);
for v in versions {
self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: v, packet_count:0 });
}
},
HostMessage::Send {
peer,
packet_id,
protocol,
data,
} => {
match self.connections.get_mut(Token(peer as usize)) {
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.send_packet(protocol, packet_id as u8, &data).unwrap_or_else(|e| {
warn!(target: "net", "Send error: {:?}", e);
}); //TODO: don't copy vector data
},
_ => {
warn!(target: "net", "Send: Peer does not exist");
}
}
},
HostMessage::AddTimer {
handler,
delay,
protocol,
} => {
match self.timers.insert(UserTimer {
handler: handler,
delay: delay,
protocol: protocol,
}) {
Ok(token) => {
event_loop.timeout_ms(token, delay as u64).expect("Error registering user timer");
},
_ => { panic!("Max timers reached") }
}
}
}
}
}

View File

@ -4,6 +4,7 @@ mod connection;
mod handshake;
mod session;
mod discovery;
mod service;
#[derive(Debug, Copy, Clone)]
pub enum DisconnectReason
@ -31,6 +32,7 @@ pub enum Error {
AddressParse(::std::net::AddrParseError),
AddressResolve(Option<::std::io::Error>),
NodeIdParse(::error::EthcoreError),
PeerNotFound,
Disconnect(DisconnectReason)
}
@ -61,7 +63,22 @@ impl From<::rlp::DecoderError> for Error {
}
}
pub fn start_host()
{
let _ = host::Host::start();
impl From<::mio::NotifyError<host::HostMessage>> for Error {
fn from(_err: ::mio::NotifyError<host::HostMessage>) -> Error {
Error::Io(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error"))
}
}
pub type PeerId = host::PeerId;
pub type HandlerIo<'s> = host::HostIo<'s>;
pub trait ProtocolHandler: Send {
fn initialize(&mut self, io: &mut HandlerIo);
fn read(&mut self, io: &mut HandlerIo, packet_id: u8, data: &[u8]);
fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId);
fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId);
}
pub struct NetworkClient;

52
src/network/service.rs Normal file
View File

@ -0,0 +1,52 @@
#![allow(dead_code)] //TODO: remove this after everything is done
use std::thread::{self, JoinHandle};
use mio::*;
use network::{Error, ProtocolHandler};
use network::host::{Host, HostMessage, PeerId, PacketId, ProtocolId};
pub struct NetworkService {
thread: Option<JoinHandle<()>>,
host_channel: Sender<HostMessage>
}
impl NetworkService {
pub fn start() -> Result<NetworkService, Error> {
let mut event_loop = EventLoop::new().unwrap();
let channel = event_loop.channel();
let thread = thread::spawn(move || {
Host::start(&mut event_loop).unwrap(); //TODO:
});
Ok(NetworkService {
thread: Some(thread),
host_channel: channel
})
}
pub fn send(&mut self, peer: &PeerId, packet_id: PacketId, protocol: ProtocolId, data: &[u8]) -> Result<(), Error> {
try!(self.host_channel.send(HostMessage::Send {
peer: *peer,
packet_id: packet_id,
protocol: protocol,
data: data.to_vec()
}));
Ok(())
}
pub fn register_protocol(&mut self, handler: Box<ProtocolHandler+Send>, protocol: ProtocolId, versions: &[u8]) -> Result<(), Error> {
try!(self.host_channel.send(HostMessage::AddHandler {
handler: handler,
protocol: protocol,
versions: versions.to_vec(),
}));
Ok(())
}
}
impl Drop for NetworkService {
fn drop(&mut self) {
self.host_channel.send(HostMessage::Shutdown).unwrap();
self.thread.take().unwrap().join().unwrap();
}
}

View File

@ -14,11 +14,44 @@ pub struct Session {
had_hello: bool,
}
pub enum SessionData {
None,
Ready,
Packet {
data: Vec<u8>,
protocol: &'static str,
packet_id: u8,
},
}
pub struct SessionInfo {
pub id: NodeId,
pub client_version: String,
pub protocol_version: u32,
pub capabilities: Vec<CapabilityInfo>,
pub capabilities: Vec<SessionCapabilityInfo>,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PeerCapabilityInfo {
pub protocol: String,
pub version: u8,
}
impl Decodable for PeerCapabilityInfo {
fn decode_untrusted(rlp: &UntrustedRlp) -> Result<Self, DecoderError> {
Ok(PeerCapabilityInfo {
protocol: try!(String::decode_untrusted(&try!(rlp.at(0)))),
version: try!(u32::decode_untrusted(&try!(rlp.at(1)))) as u8,
})
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct SessionCapabilityInfo {
pub protocol: &'static str,
pub version: u8,
pub packet_count: u8,
pub id_offset: u8,
}
const PACKET_HELLO: u8 = 0x80;
@ -50,43 +83,76 @@ impl Session {
Ok(session)
}
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<(), Error> {
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<SessionData, Error> {
match try!(self.connection.readable(event_loop)) {
Some(data) => {
try!(self.read_packet(data, host));
},
None => {}
};
Ok(())
Some(data) => self.read_packet(data, host),
None => Ok(SessionData::None)
}
}
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), Error> {
self.connection.writable(event_loop)
}
pub fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<(), Error> {
let data = &packet.data;
if data.len() < 2 {
pub fn have_capability(&self, protocol: &str) -> bool {
self.info.capabilities.iter().any(|c| c.protocol == protocol)
}
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), Error> {
let mut i = 0usize;
while protocol != self.info.capabilities[i].protocol {
i += 1;
if i == self.info.capabilities.len() {
debug!(target: "net", "Unkown protocol: {:?}", protocol);
return Ok(())
}
}
let pid = self.info.capabilities[i].id_offset + packet_id;
let mut rlp = RlpStream::new();
rlp.append(&(pid as u32));
rlp.append_raw(data, 1);
self.connection.send_packet(&rlp.out())
}
fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<SessionData, Error> {
if packet.data.len() < 2 {
return Err(Error::BadProtocol);
}
let packet_id = data[0];
let rlp = UntrustedRlp::new(&data[1..]); //TODO: validate rlp expected size
let packet_id = packet.data[0];
if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello {
return Err(Error::BadProtocol);
}
match packet_id {
PACKET_HELLO => self.read_hello(&rlp, host),
PACKET_HELLO => {
let rlp = UntrustedRlp::new(&packet.data[1..]); //TODO: validate rlp expected size
try!(self.read_hello(&rlp, host));
Ok(SessionData::Ready)
}
PACKET_DISCONNECT => Err(Error::Disconnect(DisconnectReason::DisconnectRequested)),
PACKET_PING => self.write_pong(),
PACKET_GET_PEERS => Ok(()), //TODO;
PACKET_PEERS => Ok(()),
PACKET_PING => {
try!(self.write_pong());
Ok(SessionData::None)
}
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;
PACKET_PEERS => Ok(SessionData::None),
PACKET_USER ... PACKET_LAST => {
warn!(target: "net", "User packet: {:?}", rlp);
Ok(())
let mut i = 0usize;
while packet_id < self.info.capabilities[i].id_offset {
i += 1;
if i == self.info.capabilities.len() {
debug!(target: "net", "Unkown packet: {:?}", packet_id);
return Ok(SessionData::None)
}
}
// map to protocol
let protocol = self.info.capabilities[i].protocol;
let pid = packet_id - self.info.capabilities[i].id_offset;
return Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
},
_ => {
debug!(target: "net", "Unkown packet: {:?}", rlp);
Ok(())
debug!(target: "net", "Unkown packet: {:?}", packet_id);
Ok(SessionData::None)
}
}
}
@ -106,12 +172,24 @@ impl Session {
fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), Error> {
let protocol = try!(u32::decode_untrusted(&try!(rlp.at(0))));
let client_version = try!(String::decode_untrusted(&try!(rlp.at(1))));
let mut caps: Vec<CapabilityInfo> = try!(Decodable::decode_untrusted(&try!(rlp.at(2))));
let peer_caps: Vec<PeerCapabilityInfo> = try!(Decodable::decode_untrusted(&try!(rlp.at(2))));
let id = try!(NodeId::decode_untrusted(&try!(rlp.at(4))));
// Intersect with host capabilities
// Leave only highset mutually supported capability version
caps.retain(|c| host.capabilities.contains(&c));
let mut caps: Vec<SessionCapabilityInfo> = Vec::new();
for hc in host.capabilities.iter() {
if peer_caps.iter().any(|c| c.protocol == hc.protocol && c.version == hc.version) {
caps.push(SessionCapabilityInfo {
protocol: hc.protocol,
version: hc.version,
id_offset: 0,
packet_count: hc.packet_count,
});
}
}
caps.retain(|c| host.capabilities.iter().any(|hc| hc.protocol == c.protocol && hc.version == c.version));
let mut i = 0;
while i < caps.len() {
if caps.iter().any(|c| c.protocol == caps[i].protocol && c.version > caps[i].version) {
@ -122,7 +200,15 @@ impl Session {
}
}
i = 0;
let mut offset: u8 = PACKET_USER;
while i < caps.len() {
caps[i].id_offset = offset;
offset += caps[i].packet_count;
i += 1;
}
trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
self.info.capabilities = caps;
if protocol != host.protocol_version {
return Err(self.disconnect(DisconnectReason::UselessPeer));
}