// Copyright 2015, 2016 Ethcore (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . //! HTTP format processor use std::io::{self, Cursor, Write}; use std::cmp; #[derive(Debug, Clone, Copy, PartialEq)] pub enum State { WaitingForStatus, WaitingForHeaders, WaitingForChunk, WritingBody, WritingChunk(usize), Finished, } pub struct HttpProcessor { state: State, buffer: Cursor>, status: Option, headers: Vec, body_writer: io::BufWriter>, size_limit: Option, } const BREAK_LEN: usize = 2; impl HttpProcessor { pub fn new(body_writer: Box, size_limit: Option) -> Self { HttpProcessor { state: State::WaitingForStatus, buffer: Cursor::new(Vec::new()), status: None, headers: Vec::new(), body_writer: io::BufWriter::new(body_writer), size_limit: size_limit, } } #[cfg(test)] pub fn status(&self) -> Option<&String> { self.status.as_ref() } #[cfg(test)] pub fn headers(&self) -> &[String] { &self.headers } fn find_break_index(&mut self) -> Option { let data = self.buffer.get_ref(); let mut idx = 0; let mut got_r = false; // looks for \r\n in data for b in data { idx += 1; if got_r && b == &10u8 { return Some(idx); } else if !got_r && b == &13u8 { got_r = true; } else { got_r = false; } } None } // Consumes bytes from internal buffer fn buffer_consume(&mut self, bytes: usize) { let bytes = cmp::min(bytes, self.buffer.get_ref().len()); // Drain data self.buffer.get_mut().drain(0..bytes); let len = self.buffer.position(); self.buffer.set_position(len - bytes as u64); } fn buffer_to_string(&mut self, bytes: usize) -> String { let val = String::from_utf8_lossy(&self.buffer.get_ref()[0..bytes-BREAK_LEN]).into_owned(); self.buffer_consume(bytes); val } fn is_chunked(&self) -> bool { self.headers .iter() .find(|item| item.to_lowercase().contains("transfer-encoding: chunked")) .is_some() } fn set_state(&mut self, state: State) { self.state = state; trace!("Changing state to {:?}", state); } fn process_buffer(&mut self) -> io::Result<()> { // consume data and perform state transitions loop { match self.state { State::WaitingForStatus => { if let Some(break_index) = self.find_break_index() { let status = self.buffer_to_string(break_index); debug!("Read status: {:?}", status); self.status = Some(status); self.set_state(State::WaitingForHeaders); } else { // wait for more data return Ok(()); } }, State::WaitingForHeaders => { match self.find_break_index() { // Last header - got empty line, body starts Some(BREAK_LEN) => { self.buffer_consume(BREAK_LEN); let is_chunked = self.is_chunked(); self.set_state(match is_chunked { true => State::WaitingForChunk, false => State::WritingBody, }); }, Some(break_index) => { let header = self.buffer_to_string(break_index); debug!("Found header: {:?}", header); self.headers.push(header); }, None => return Ok(()), } }, State::WritingBody => { let len = self.buffer.get_ref().len(); match self.size_limit { None => {}, Some(limit) if limit > len => {}, _ => { warn!("Finishing file fetching because limit was reached."); self.set_state(State::Finished); continue; } } try!(self.body_writer.write_all(self.buffer.get_ref())); self.buffer_consume(len); return Ok(()); }, State::WaitingForChunk => { match self.find_break_index() { None => return Ok(()), // last chunk - size 0 Some(BREAK_LEN) => { self.state = State::Finished; }, Some(break_index) => { let chunk_size = self.buffer_to_string(break_index); self.set_state(if let Ok(size) = usize::from_str_radix(&chunk_size, 16) { State::WritingChunk(size) } else { warn!("Error parsing server chunked response. Invalid chunk size."); State::Finished }); } } }, State::WritingChunk(0) => { self.set_state(State::Finished); }, // Buffers the data until we have a full chunk State::WritingChunk(left) if self.buffer.get_ref().len() >= left => { match self.size_limit { None => {}, Some(limit) if limit > left => { self.size_limit = Some(limit - left); }, _ => { warn!("Finishing file fetching because limit was reached."); self.set_state(State::Finished); continue; } } try!(self.body_writer.write_all(&self.buffer.get_ref()[0..left])); self.buffer_consume(left + BREAK_LEN); self.set_state(State::WaitingForChunk); }, // Wait for more data State::WritingChunk(_) => return Ok(()), // Just consume buffer State::Finished => { let len = self.buffer.get_ref().len(); self.buffer_consume(len); return Ok(()); }, } } } #[cfg(test)] pub fn state(&self) -> State { self.state } } impl io::Write for HttpProcessor { fn write(&mut self, bytes: &[u8]) -> io::Result { let result = self.buffer.write(bytes); try!(self.process_buffer()); result } fn flush(&mut self) -> io::Result<()> { self.buffer.flush().and_then(|_| { self.body_writer.flush() }) } } #[cfg(test)] mod tests { use std::rc::Rc; use std::cell::RefCell; use std::io::{self, Write, Cursor}; use super::*; struct Writer { data: Rc>>>, } impl Writer { fn new() -> (Self, Rc>>>) { let data = Rc::new(RefCell::new(Cursor::new(Vec::new()))); (Writer { data: data.clone() }, data) } } impl Write for Writer { fn write(&mut self, buf: &[u8]) -> io::Result { self.data.borrow_mut().write(buf) } fn flush(&mut self) -> io::Result<()> { self.data.borrow_mut().flush() } } #[test] fn should_be_able_to_process_status_line() { // given let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())), None); // when let out = "\ HTTP/1.1 200 OK\r\n\ Server: Pari "; http.write_all(out.as_bytes()).unwrap(); http.flush().unwrap(); // then assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); assert_eq!(http.state(), State::WaitingForHeaders); } #[test] fn should_be_able_to_process_headers() { // given let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())), None); // when let out = "\ HTTP/1.1 200 OK\r\n\ Server: Parity/1.1.1\r\n\ Connection: close\r\n\ Content-Length: 2\r\n\ Content-Type: application/json\r\n\ \r\n\ "; http.write_all(out.as_bytes()).unwrap(); http.flush().unwrap(); // then assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); assert_eq!(http.headers().len(), 4); assert_eq!(http.state(), State::WritingBody); } #[test] fn should_be_able_to_consume_body() { // given let (writer, data) = Writer::new(); let mut http = HttpProcessor::new(Box::new(writer), None); // when let out = "\ HTTP/1.1 200 OK\r\n\ Server: Parity/1.1.1\r\n\ Connection: close\r\n\ Content-Length: 2\r\n\ Content-Type: application/json\r\n\ \r\n\ Some data\ "; http.write_all(out.as_bytes()).unwrap(); http.flush().unwrap(); // then assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); assert_eq!(http.headers().len(), 4); assert_eq!(http.state(), State::WritingBody); assert_eq!(data.borrow().get_ref()[..], b"Some data"[..]); } #[test] fn should_correctly_handle_chunked_content() { // given let (writer, data) = Writer::new(); let mut http = HttpProcessor::new(Box::new(writer), None); // when let out = "\ HTTP/1.1 200 OK\r\n\ Host: 127.0.0.1:8080\r\n\ Transfer-Encoding: chunked\r\n\ Connection: close\r\n\ \r\n\ 4\r\n\ Pari\r\n\ 3\r\n\ ty \r\n\ D\r\n\ in\r\n\ \r\n\ chunks.\r\n\ 0\r\n\ \r\n\ "; http.write_all(out.as_bytes()).unwrap(); http.flush().unwrap(); // then assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); assert_eq!(http.headers().len(), 3); assert_eq!(data.borrow().get_ref()[..], b"Parity in\r\n\r\nchunks."[..]); assert_eq!(http.state(), State::Finished); } #[test] fn should_stop_fetching_when_limit_is_reached() { // given let (writer, data) = Writer::new(); let mut http = HttpProcessor::new(Box::new(writer), Some(5)); // when let out = "\ HTTP/1.1 200 OK\r\n\ Host: 127.0.0.1:8080\r\n\ Transfer-Encoding: chunked\r\n\ Connection: close\r\n\ \r\n\ 4\r\n\ Pari\r\n\ 3\r\n\ ty \r\n\ D\r\n\ in\r\n\ \r\n\ chunks.\r\n\ 0\r\n\ \r\n\ "; http.write_all(out.as_bytes()).unwrap(); http.flush().unwrap(); // then assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); assert_eq!(http.headers().len(), 3); assert_eq!(data.borrow().get_ref()[..], b"Pari"[..]); assert_eq!(http.state(), State::Finished); } }