From bff847b90c7736594428ee838ce47312228cb80b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 31 Aug 2016 10:43:55 +0200 Subject: [PATCH] Chunked encoding parser --- dapps/src/apps/fetcher.rs | 5 +- dapps/src/apps/urlhint.rs | 4 +- dapps/src/handlers/fetch.rs | 5 +- util/https-fetch/examples/fetch.rs | 8 +- util/https-fetch/src/client.rs | 15 +- util/https-fetch/src/http.rs | 327 +++++++++++++++++++++++++++++ util/https-fetch/src/lib.rs | 1 + util/https-fetch/src/tlsclient.rs | 10 +- util/src/sha3.rs | 4 +- 9 files changed, 355 insertions(+), 24 deletions(-) create mode 100644 util/https-fetch/src/http.rs diff --git a/dapps/src/apps/fetcher.rs b/dapps/src/apps/fetcher.rs index afbfda96a..91b6ed990 100644 --- a/dapps/src/apps/fetcher.rs +++ b/dapps/src/apps/fetcher.rs @@ -218,8 +218,8 @@ impl DappHandler for DappInstaller { fn validate_and_install(&self, app_path: PathBuf) -> Result { trace!(target: "dapps", "Opening dapp bundle at {:?}", app_path); - let mut file = try!(fs::File::open(app_path)); - let hash = try!(sha3(&mut file)); + let mut file_reader = io::BufReader::new(try!(fs::File::open(app_path))); + let hash = try!(sha3(&mut file_reader)); let dapp_id = try!(self.dapp_id.as_str().parse().map_err(|_| ValidationError::InvalidDappId)); if dapp_id != hash { return Err(ValidationError::HashMismatch { @@ -227,6 +227,7 @@ impl DappHandler for DappInstaller { got: hash, }); } + let file = file_reader.into_inner(); // Unpack archive let mut zip = try!(zip::ZipArchive::new(file)); // First find manifest file diff --git a/dapps/src/apps/urlhint.rs b/dapps/src/apps/urlhint.rs index 30017cb00..fb3b7c419 100644 --- a/dapps/src/apps/urlhint.rs +++ b/dapps/src/apps/urlhint.rs @@ -33,7 +33,9 @@ pub struct GithubApp { impl GithubApp { pub fn url(&self) -> String { - format!("https://github.com/{}/{}/archive/{}.zip", self.account, self.repo, self.commit.to_hex()) + // Since https fetcher doesn't support redirections we use direct link + // 
format!("https://github.com/{}/{}/archive/{}.zip", self.account, self.repo, self.commit.to_hex()) + format!("https://codeload.github.com/{}/{}/zip/{}", self.account, self.repo, self.commit.to_hex()) } fn commit(bytes: &[u8]) -> Option<[u8;COMMIT_LEN]> { diff --git a/dapps/src/handlers/fetch.rs b/dapps/src/handlers/fetch.rs index 5169af985..94110e534 100644 --- a/dapps/src/handlers/fetch.rs +++ b/dapps/src/handlers/fetch.rs @@ -16,7 +16,7 @@ //! Hyper Server Handler that fetches a file during a request (proxy). -use std::{fs, fmt}; +use std::fmt; use std::path::PathBuf; use std::sync::mpsc; use std::time::{Instant, Duration}; @@ -171,7 +171,8 @@ impl server::Handler for AppFetcherHandler { Ok(manifest) => FetchState::Done(manifest) }; // Remove temporary zip file - let _ = fs::remove_file(path); + // TODO [todr] Uncomment me + // let _ = fs::remove_file(path); (Some(state), Next::write()) }, Ok(Err(e)) => { diff --git a/util/https-fetch/examples/fetch.rs b/util/https-fetch/examples/fetch.rs index febcad080..7d0806275 100644 --- a/util/https-fetch/examples/fetch.rs +++ b/util/https-fetch/examples/fetch.rs @@ -6,9 +6,7 @@ use https_fetch::*; fn main() { let client = Client::new().unwrap(); - let rx = client.fetch(Url::new("github.com", 443, "/").unwrap(), Box::new(io::stdout())).unwrap(); - - let result = rx.recv().unwrap(); - - assert!(result.is_ok()); + client.fetch(Url::new("github.com", 443, "/").unwrap(), Box::new(io::stdout()), |result| { + assert!(result.is_ok()); + }).unwrap(); } diff --git a/util/https-fetch/src/client.rs b/util/https-fetch/src/client.rs index 78bce769e..77f538479 100644 --- a/util/https-fetch/src/client.rs +++ b/util/https-fetch/src/client.rs @@ -171,7 +171,7 @@ impl mio::Handler for ClientLoop { #[test] fn should_successfuly_fetch_a_page() { use std::io::{self, Cursor}; - use std::sync::Arc; + use std::sync::{mpsc, Arc}; use std::sync::atomic::{AtomicUsize, Ordering}; struct Writer { @@ -197,11 +197,12 @@ fn 
should_successfuly_fetch_a_page() { wrote: wrote.clone(), data: Cursor::new(Vec::new()), }; - let rx = client.fetch(Url::new("github.com", 443, "/").unwrap(), Box::new(writer)).unwrap(); - - let result = rx.recv().unwrap(); - - assert!(result.is_ok()); - assert!(wrote.load(Ordering::Relaxed) > 0); + let (tx, rx) = mpsc::channel(); + client.fetch(Url::new("github.com", 443, "/").unwrap(), Box::new(writer), move |result| { + assert!(result.is_ok()); + assert!(wrote.load(Ordering::Relaxed) > 0); + tx.send(result).unwrap(); + }); + let _ = rx.recv().unwrap(); } diff --git a/util/https-fetch/src/http.rs b/util/https-fetch/src/http.rs new file mode 100644 index 000000000..27b019cbb --- /dev/null +++ b/util/https-fetch/src/http.rs @@ -0,0 +1,327 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see <http://www.gnu.org/licenses/>. + +//! 
HTTP format processor + +use std::io::{self, Cursor, Write}; +use std::cmp; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum State { + WaitingForStatus, + WaitingForHeaders, + WaitingForChunk, + WritingBody, + WritingChunk(usize), + Finished, +} + +pub struct HttpProcessor { + state: State, + buffer: Cursor>, + status: Option, + headers: Vec, + body_writer: io::BufWriter>, +} + +const BREAK_LEN: usize = 2; + +impl HttpProcessor { + pub fn new(body_writer: Box) -> Self { + HttpProcessor { + state: State::WaitingForStatus, + buffer: Cursor::new(Vec::new()), + status: None, + headers: Vec::new(), + body_writer: io::BufWriter::new(body_writer) + } + } + + #[cfg(test)] + pub fn status(&self) -> Option<&String> { + self.status.as_ref() + } + + #[cfg(test)] + pub fn headers(&self) -> &[String] { + &self.headers + } + + fn find_break_index(&mut self) -> Option { + let data = self.buffer.get_ref(); + let mut idx = 0; + let mut got_r = false; + // looks for \r\n in data + for b in data { + idx += 1; + if got_r && b == &10u8 { + return Some(idx); + } else if !got_r && b == &13u8 { + got_r = true; + } else { + got_r = false; + } + } + None + } + + // Consumes bytes from internal buffer + fn buffer_consume(&mut self, bytes: usize) { + let bytes = cmp::min(bytes, self.buffer.get_ref().len()); + // Drain data + self.buffer.get_mut().drain(0..bytes); + let len = self.buffer.position(); + self.buffer.set_position(len - bytes as u64); + } + + fn buffer_to_string(&mut self, bytes: usize) -> String { + let val = String::from_utf8_lossy(&self.buffer.get_ref()[0..bytes-BREAK_LEN]).into_owned(); + self.buffer_consume(bytes); + val + } + + fn is_chunked(&self) -> bool { + self.headers + .iter() + .find(|item| item.to_lowercase().contains("transfer-encoding: chunked")) + .is_some() + } + fn set_state(&mut self, state: State) { + self.state = state; + debug!("Changing state to {:?}", state); + } + + fn process_buffer(&mut self) -> io::Result<()> { + // consume data and perform state 
transitions + loop { + match self.state { + State::WaitingForStatus => { + if let Some(break_index) = self.find_break_index() { + let status = self.buffer_to_string(break_index); + trace!("Read status: {:?}", status); + self.status = Some(status); + self.set_state(State::WaitingForHeaders); + } else { + // wait for more data + return Ok(()); + } + }, + State::WaitingForHeaders => { + match self.find_break_index() { + // Last header - got empty line, body starts + Some(BREAK_LEN) => { + self.buffer_consume(BREAK_LEN); + let is_chunked = self.is_chunked(); + self.set_state(match is_chunked { + true => State::WaitingForChunk, + false => State::WritingBody, + }); + }, + Some(break_index) => { + let header = self.buffer_to_string(break_index); + trace!("Found header: {:?}", header); + self.headers.push(header); + }, + None => return Ok(()), + } + }, + State::WritingBody => { + let len = self.buffer.get_ref().len(); + try!(self.body_writer.write_all(self.buffer.get_ref())); + self.buffer_consume(len); + return Ok(()); + }, + State::WaitingForChunk => { + match self.find_break_index() { + None => return Ok(()), + // last chunk - size 0 + Some(BREAK_LEN) => { + self.state = State::Finished; + }, + Some(break_index) => { + let chunk_size = self.buffer_to_string(break_index); + self.set_state(if let Ok(size) = usize::from_str_radix(&chunk_size, 16) { + State::WritingChunk(size) + } else { + warn!("Error parsing server chunked response. 
Invalid chunk size."); + State::Finished + }); + } + } + }, + // Buffers the data until we have a full chunk + State::WritingChunk(left) if self.buffer.get_ref().len() >= left => { + try!(self.body_writer.write_all(&self.buffer.get_ref()[0..left])); + self.buffer_consume(left + BREAK_LEN); + + self.set_state(State::WaitingForChunk); + }, + // Wait for more data + State::WritingChunk(_) => return Ok(()), + // Just consume buffer + State::Finished => { + let len = self.buffer.get_ref().len(); + self.buffer_consume(len); + return Ok(()); + }, + } + } + } + + #[cfg(test)] + pub fn state(&self) -> State { + self.state + } +} + +impl io::Write for HttpProcessor { + fn write(&mut self, bytes: &[u8]) -> io::Result { + let result = self.buffer.write(bytes); + try!(self.process_buffer()); + result + } + + fn flush(&mut self) -> io::Result<()> { + self.buffer.flush().and_then(|_| { + self.body_writer.flush() + }) + } +} + +#[cfg(test)] +mod tests { + use std::rc::Rc; + use std::cell::RefCell; + use std::io::{self, Write, Cursor}; + use super::*; + + struct Writer { + data: Rc>>>, + } + + impl Writer { + fn new() -> (Self, Rc>>>) { + let data = Rc::new(RefCell::new(Cursor::new(Vec::new()))); + (Writer { data: data.clone() }, data) + } + } + + impl Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result { self.data.borrow_mut().write(buf) } + fn flush(&mut self) -> io::Result<()> { self.data.borrow_mut().flush() } + } + + #[test] + fn should_be_able_to_process_status_line() { + // given + let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new()))); + + // when + let out = + "\ + HTTP/1.1 200 OK\r\n\ + Server: Pari + "; + http.write_all(out.as_bytes()).unwrap(); + + // then + assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); + assert_eq!(http.state(), State::WaitingForHeaders); + } + + #[test] + fn should_be_able_to_process_headers() { + // given + let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new()))); + + // when + let out = + "\ + 
HTTP/1.1 200 OK\r\n\ + Server: Parity/1.1.1\r\n\ + Connection: close\r\n\ + Content-Length: 2\r\n\ + Content-Type: application/json\r\n\ + \r\n\ + "; + http.write_all(out.as_bytes()).unwrap(); + + // then + assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); + assert_eq!(http.headers().len(), 4); + assert_eq!(http.state(), State::WritingBody); + } + + #[test] + fn should_be_able_to_consume_body() { + // given + let (writer, data) = Writer::new(); + let mut http = HttpProcessor::new(Box::new(writer)); + + // when + let out = + "\ + HTTP/1.1 200 OK\r\n\ + Server: Parity/1.1.1\r\n\ + Connection: close\r\n\ + Content-Length: 2\r\n\ + Content-Type: application/json\r\n\ + \r\n\ + Some data\ + "; + http.write_all(out.as_bytes()).unwrap(); + + // then + assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); + assert_eq!(http.headers().len(), 4); + assert_eq!(http.state(), State::WritingBody); + assert_eq!(data.borrow().get_ref()[..], b"Some data"[..]); + } + + #[test] + fn should_correctly_handle_chunked_content() { + // given + let (writer, data) = Writer::new(); + let mut http = HttpProcessor::new(Box::new(writer)); + + // when + let out = + "\ + HTTP/1.1 200 OK\r\n\ + Host: 127.0.0.1:8080\r\n\ + Transfer-Encoding: chunked\r\n\ + Connection: close\r\n\ + \r\n\ + 4\r\n\ + Pari\r\n\ + 3\r\n\ + ty \r\n\ + D\r\n\ + in\r\n\ + \r\n\ + chunks.\r\n\ + 0\r\n\ + \r\n\ + "; + http.write_all(out.as_bytes()).unwrap(); + + // then + assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK"); + assert_eq!(http.headers().len(), 3); + assert_eq!(data.borrow().get_ref()[..], b"Parity in\r\n\r\nchunks."[..]); + assert_eq!(http.state(), State::WaitingForChunk); + } +} diff --git a/util/https-fetch/src/lib.rs b/util/https-fetch/src/lib.rs index 7dc37d060..edd1186d0 100644 --- a/util/https-fetch/src/lib.rs +++ b/util/https-fetch/src/lib.rs @@ -21,6 +21,7 @@ extern crate mio; mod tlsclient; mod client; mod url; +mod http; pub use self::client::{Client, FetchError, FetchResult}; pub use 
self::url::{Url, UrlError}; diff --git a/util/https-fetch/src/tlsclient.rs b/util/https-fetch/src/tlsclient.rs index 5e395a88d..8fa7eb11b 100644 --- a/util/https-fetch/src/tlsclient.rs +++ b/util/https-fetch/src/tlsclient.rs @@ -16,14 +16,15 @@ use std::str; use std::sync::Arc; -use std::io::{self, Read, Cursor, BufReader}; +use std::io::{self, Write, Read, Cursor, BufReader}; use mio; use mio::tcp::TcpStream; use rustls::{self, Session}; -use client::{FetchError, ClientLoop, FetchResult}; use url::Url; +use http::HttpProcessor; +use client::{FetchError, ClientLoop, FetchResult}; #[derive(Debug)] pub enum TlsClientError { @@ -40,7 +41,7 @@ pub struct TlsClient { token: mio::Token, socket: TcpStream, tls_session: rustls::ClientSession, - writer: Box, + writer: HttpProcessor, error: Option, closing: bool, callback: Box, @@ -65,7 +66,6 @@ impl io::Read for TlsClient { impl TlsClient { pub fn make_config() -> Result, FetchError> { let mut config = rustls::ClientConfig::new(); - // TODO [ToDr] Windows / MacOs support! let mut cursor = Cursor::new(if cfg!(feature = "ca-github-only") { include_bytes!("./ca-github.crt").to_vec() } else { @@ -92,7 +92,7 @@ impl TlsClient { match res { Ok((cfg, sock)) => Ok(TlsClient { token: token, - writer: writer, + writer: HttpProcessor::new(writer), socket: sock, closing: false, error: None, diff --git a/util/src/sha3.rs b/util/src/sha3.rs index 0dcde2ccb..59b66efa9 100644 --- a/util/src/sha3.rs +++ b/util/src/sha3.rs @@ -17,7 +17,7 @@ //! Wrapper around tiny-keccak crate. extern crate sha3 as sha3_ext; -use std::io; +use std::io::{self, Read}; use std::mem::uninitialized; use tiny_keccak::Keccak; use bytes::{BytesConvertable, Populatable}; @@ -67,7 +67,7 @@ impl Hashable for T where T: BytesConvertable { } /// Calculate SHA3 of given stream. -pub fn sha3(r: &mut R) -> Result { +pub fn sha3(r: &mut io::BufReader) -> Result { let mut output = [0u8; 32]; let mut input = [0u8; 1024]; let mut sha3 = Keccak::new_keccak256();