Deregister handshake properly when converting to session
This commit is contained in:
parent
b66f88181a
commit
69df91de68
@ -175,6 +175,18 @@ impl Connection {
|
|||||||
self.socket.peer_addr()
|
self.socket.peer_addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn try_clone(&self) -> io::Result<Self> {
|
||||||
|
Ok(Connection {
|
||||||
|
token: self.token,
|
||||||
|
socket: try!(self.socket.try_clone()),
|
||||||
|
rec_buf: Vec::new(),
|
||||||
|
rec_size: 0,
|
||||||
|
send_queue: VecDeque::new(),
|
||||||
|
interest: EventSet::hup() | EventSet::readable(),
|
||||||
|
stats: self.stats.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Register this connection with the IO event loop.
|
/// Register this connection with the IO event loop.
|
||||||
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
||||||
trace!(target: "net", "connection register; token={:?}", reg);
|
trace!(target: "net", "connection register; token={:?}", reg);
|
||||||
@ -265,7 +277,7 @@ impl EncryptedConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
|
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
|
||||||
pub fn new(mut handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
|
pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, UtilError> {
|
||||||
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
|
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
|
||||||
let mut nonce_material = H512::new();
|
let mut nonce_material = H512::new();
|
||||||
if handshake.originated {
|
if handshake.originated {
|
||||||
@ -300,9 +312,8 @@ impl EncryptedConnection {
|
|||||||
ingress_mac.update(&mac_material);
|
ingress_mac.update(&mac_material);
|
||||||
ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher });
|
ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher });
|
||||||
|
|
||||||
handshake.connection.expect(ENCRYPTED_HEADER_LEN);
|
let mut enc = EncryptedConnection {
|
||||||
Ok(EncryptedConnection {
|
connection: try!(handshake.connection.try_clone()),
|
||||||
connection: handshake.connection,
|
|
||||||
encoder: encoder,
|
encoder: encoder,
|
||||||
decoder: decoder,
|
decoder: decoder,
|
||||||
mac_encoder: mac_encoder,
|
mac_encoder: mac_encoder,
|
||||||
@ -311,7 +322,9 @@ impl EncryptedConnection {
|
|||||||
read_state: EncryptedConnectionState::Header,
|
read_state: EncryptedConnectionState::Header,
|
||||||
protocol_id: 0,
|
protocol_id: 0,
|
||||||
payload_len: 0
|
payload_len: 0
|
||||||
})
|
};
|
||||||
|
enc.connection.expect(ENCRYPTED_HEADER_LEN);
|
||||||
|
Ok(enc)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a packet
|
/// Send a packet
|
||||||
@ -440,6 +453,12 @@ impl EncryptedConnection {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register socket with the event lpop. This should be called at the end of the event loop.
|
||||||
|
pub fn register_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
|
try!(self.connection.register_socket(reg, event_loop));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Update connection registration. This should be called at the end of the event loop.
|
/// Update connection registration. This should be called at the end of the event loop.
|
||||||
pub fn update_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
pub fn update_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
try!(self.connection.update_socket(reg, event_loop));
|
try!(self.connection.update_socket(reg, event_loop));
|
||||||
|
@ -544,7 +544,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
if let Some(handshake) = handshake {
|
if let Some(handshake) = handshake {
|
||||||
let mut h = handshake.lock().unwrap();
|
let mut h = handshake.lock().unwrap();
|
||||||
if let Err(e) = h.writable(io, &self.info.read().unwrap()) {
|
if let Err(e) = h.writable(io, &self.info.read().unwrap()) {
|
||||||
debug!(target: "net", "Handshake write error: {}:{:?}", token, e);
|
trace!(target: "net", "Handshake write error: {}: {:?}", token, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -554,7 +554,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
if let Some(session) = session {
|
if let Some(session) = session {
|
||||||
let mut s = session.lock().unwrap();
|
let mut s = session.lock().unwrap();
|
||||||
if let Err(e) = s.writable(io, &self.info.read().unwrap()) {
|
if let Err(e) = s.writable(io, &self.info.read().unwrap()) {
|
||||||
debug!(target: "net", "Session write error: {}:{:?}", token, e);
|
trace!(target: "net", "Session write error: {}: {:?}", token, e);
|
||||||
}
|
}
|
||||||
io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
||||||
}
|
}
|
||||||
@ -571,7 +571,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
if let Some(handshake) = handshake {
|
if let Some(handshake) = handshake {
|
||||||
let mut h = handshake.lock().unwrap();
|
let mut h = handshake.lock().unwrap();
|
||||||
if let Err(e) = h.readable(io, &self.info.read().unwrap()) {
|
if let Err(e) = h.readable(io, &self.info.read().unwrap()) {
|
||||||
debug!(target: "net", "Handshake read error: {}:{:?}", token, e);
|
debug!(target: "net", "Handshake read error: {}: {:?}", token, e);
|
||||||
kill = true;
|
kill = true;
|
||||||
}
|
}
|
||||||
if h.done() {
|
if h.done() {
|
||||||
@ -583,7 +583,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
return;
|
return;
|
||||||
} else if create_session {
|
} else if create_session {
|
||||||
self.start_session(token, io);
|
self.start_session(token, io);
|
||||||
io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
return;
|
||||||
}
|
}
|
||||||
io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Token registration error: {:?}", e));
|
io.update_registration(token).unwrap_or_else(|e| debug!(target: "net", "Token registration error: {:?}", e));
|
||||||
}
|
}
|
||||||
@ -597,7 +597,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
let mut s = session.lock().unwrap();
|
let mut s = session.lock().unwrap();
|
||||||
match s.readable(io, &self.info.read().unwrap()) {
|
match s.readable(io, &self.info.read().unwrap()) {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!(target: "net", "Session read error: {}:{:?}", token, e);
|
debug!(target: "net", "Session read error: {}: {:?}", token, e);
|
||||||
kill = true;
|
kill = true;
|
||||||
},
|
},
|
||||||
Ok(SessionData::Ready) => {
|
Ok(SessionData::Ready) => {
|
||||||
@ -642,16 +642,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
|
|
||||||
// turn a handshake into a session
|
// turn a handshake into a session
|
||||||
let mut sessions = self.sessions.write().unwrap();
|
let mut sessions = self.sessions.write().unwrap();
|
||||||
let mut h = handshakes.remove(token).unwrap();
|
let mut h = handshakes.get_mut(token).unwrap().lock().unwrap();
|
||||||
// wait for other threads to stop using it
|
|
||||||
{
|
|
||||||
while Arc::get_mut(&mut h).is_none() {
|
|
||||||
h.lock().ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let h = Arc::try_unwrap(h).ok().unwrap().into_inner().unwrap();
|
|
||||||
let originated = h.originated;
|
let originated = h.originated;
|
||||||
let mut session = match Session::new(h, &self.info.read().unwrap()) {
|
let mut session = match Session::new(&mut h, &self.info.read().unwrap()) {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Session creation error: {:?}", e);
|
debug!("Session creation error: {:?}", e);
|
||||||
@ -660,7 +653,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
};
|
};
|
||||||
let result = sessions.insert_with(move |session_token| {
|
let result = sessions.insert_with(move |session_token| {
|
||||||
session.set_token(session_token);
|
session.set_token(session_token);
|
||||||
io.update_registration(session_token).expect("Error updating session registration");
|
io.deregister_stream(token).expect("Error deleting handshake registration");
|
||||||
|
io.register_stream(session_token).expect("Error creating session registration");
|
||||||
self.stats.inc_sessions();
|
self.stats.inc_sessions();
|
||||||
if !originated {
|
if !originated {
|
||||||
// Add it no node table
|
// Add it no node table
|
||||||
@ -872,7 +866,10 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
|
fn register_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
|
||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => {
|
FIRST_SESSION ... LAST_SESSION => {
|
||||||
warn!("Unexpected session stream registration");
|
let session = { self.sessions.read().unwrap().get(stream).cloned() };
|
||||||
|
if let Some(session) = session {
|
||||||
|
session.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
|
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
|
||||||
let connection = { self.handshakes.read().unwrap().get(stream).cloned() };
|
let connection = { self.handshakes.read().unwrap().get(stream).cloned() };
|
||||||
|
@ -109,8 +109,8 @@ const PACKET_USER: u8 = 0x10;
|
|||||||
const PACKET_LAST: u8 = 0x7f;
|
const PACKET_LAST: u8 = 0x7f;
|
||||||
|
|
||||||
impl Session {
|
impl Session {
|
||||||
/// Create a new session out of comepleted handshake. Consumes handshake object.
|
/// Create a new session out of comepleted handshake.
|
||||||
pub fn new(h: Handshake, host: &HostInfo) -> Result<Session, UtilError> {
|
pub fn new(h: &mut Handshake, host: &HostInfo) -> Result<Session, UtilError> {
|
||||||
let id = h.id.clone();
|
let id = h.id.clone();
|
||||||
let connection = try!(EncryptedConnection::new(h));
|
let connection = try!(EncryptedConnection::new(h));
|
||||||
let mut session = Session {
|
let mut session = Session {
|
||||||
@ -169,6 +169,12 @@ impl Session {
|
|||||||
self.info.capabilities.iter().any(|c| c.protocol == protocol)
|
self.info.capabilities.iter().any(|c| c.protocol == protocol)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register the session socket with the event loop
|
||||||
|
pub fn register_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
|
try!(self.connection.register_socket(reg, event_loop));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Update registration with the event loop. Should be called at the end of the IO handler.
|
/// Update registration with the event loop. Should be called at the end of the IO handler.
|
||||||
pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
self.connection.update_socket(reg, event_loop)
|
self.connection.update_socket(reg, event_loop)
|
||||||
|
Loading…
Reference in New Issue
Block a user