Hash Content RPC method (#2355)

* Moving file fetching to separate crate.

* ethcore_hashContent

* Tests running on mocked fetch.

* Limiting size of downloadable assets
This commit is contained in:
Tomasz Drwięga
2016-09-27 16:27:06 +02:00
committed by Gav Wood
parent 3fb3f1f54e
commit d7bbc5cc3f
21 changed files with 486 additions and 173 deletions

18
util/fetch/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
description = "HTTP/HTTPS fetching library"
homepage = "http://ethcore.io"
license = "GPL-3.0"
name = "fetch"
version = "0.1.0"
authors = ["Ethcore <admin@ethcore.io>"]
[dependencies]
log = "0.3"
rand = "0.3"
hyper = { default-features = false, git = "https://github.com/ethcore/hyper" }
https-fetch = { path = "../https-fetch" }
clippy = { version = "0.0.90", optional = true}
[features]
default = []
dev = ["clippy"]

146
util/fetch/src/client.rs Normal file
View File

@@ -0,0 +1,146 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Fetching
use std::{env, io};
use std::sync::{mpsc, Arc};
use std::sync::atomic::AtomicBool;
use std::path::PathBuf;
use hyper;
use https_fetch as https;
use fetch_file::{FetchHandler, Error as HttpFetchError};
pub type FetchResult = Result<PathBuf, FetchError>;
#[derive(Debug)]
pub enum FetchError {
InvalidUrl,
Http(HttpFetchError),
Https(https::FetchError),
Io(io::Error),
Other(String),
}
impl From<HttpFetchError> for FetchError {
fn from(e: HttpFetchError) -> Self {
FetchError::Http(e)
}
}
impl From<io::Error> for FetchError {
fn from(e: io::Error) -> Self {
FetchError::Io(e)
}
}
pub trait Fetch: Default + Send {
/// Fetch URL and get the result in callback.
fn request_async(&mut self, url: &str, abort: Arc<AtomicBool>, on_done: Box<Fn(FetchResult) + Send>) -> Result<(), FetchError>;
/// Fetch URL and get a result Receiver. You will be notified when receiver is ready by `on_done` callback.
fn request(&mut self, url: &str, abort: Arc<AtomicBool>, on_done: Box<Fn() + Send>) -> Result<mpsc::Receiver<FetchResult>, FetchError> {
let (tx, rx) = mpsc::channel();
try!(self.request_async(url, abort, Box::new(move |result| {
let res = tx.send(result);
if let Err(_) = res {
warn!("Fetch finished, but no one was listening");
}
on_done();
})));
Ok(rx)
}
/// Closes this client
fn close(self) {}
/// Returns a random filename
fn random_filename() -> String {
use ::rand::Rng;
let mut rng = ::rand::OsRng::new().unwrap();
rng.gen_ascii_chars().take(12).collect()
}
}
pub struct Client {
http_client: hyper::Client<FetchHandler>,
https_client: https::Client,
limit: Option<usize>,
}
impl Default for Client {
fn default() -> Self {
// Max 15MB will be downloaded.
Client::with_limit(Some(15*1024*1024))
}
}
impl Client {
fn with_limit(limit: Option<usize>) -> Self {
Client {
http_client: hyper::Client::new().expect("Unable to initialize http client."),
https_client: https::Client::with_limit(limit).expect("Unable to initialize https client."),
limit: limit,
}
}
fn convert_url(url: hyper::Url) -> Result<https::Url, FetchError> {
let host = format!("{}", try!(url.host().ok_or(FetchError::InvalidUrl)));
let port = try!(url.port_or_known_default().ok_or(FetchError::InvalidUrl));
https::Url::new(&host, port, url.path()).map_err(|_| FetchError::InvalidUrl)
}
fn temp_path() -> PathBuf {
let mut dir = env::temp_dir();
dir.push(Self::random_filename());
dir
}
}
impl Fetch for Client {
fn close(self) {
self.http_client.close();
self.https_client.close();
}
fn request_async(&mut self, url: &str, abort: Arc<AtomicBool>, on_done: Box<Fn(FetchResult) + Send>) -> Result<(), FetchError> {
let is_https = url.starts_with("https://");
let url = try!(url.parse().map_err(|_| FetchError::InvalidUrl));
let temp_path = Self::temp_path();
trace!(target: "fetch", "Fetching from: {:?}", url);
if is_https {
let url = try!(Self::convert_url(url));
try!(self.https_client.fetch_to_file(
url,
temp_path.clone(),
abort,
move |result| on_done(result.map(|_| temp_path).map_err(FetchError::Https)),
).map_err(|e| FetchError::Other(format!("{:?}", e))));
} else {
try!(self.http_client.request(
url,
FetchHandler::new(temp_path, abort, Box::new(move |result| on_done(result)), self.limit.map(|v| v as u64).clone()),
).map_err(|e| FetchError::Other(format!("{:?}", e))));
}
Ok(())
}
}

View File

@@ -0,0 +1,176 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Hyper Client Handler to Fetch File
use std::{io, fs, fmt};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use hyper::status::StatusCode;
use hyper::client::{Request, Response, DefaultTransport as HttpStream};
use hyper::header::Connection;
use hyper::{self, Decoder, Encoder, Next};
use super::FetchError;
#[derive(Debug)]
pub enum Error {
Aborted,
NotStarted,
SizeLimit,
UnexpectedStatus(StatusCode),
IoError(io::Error),
HyperError(hyper::Error),
}
pub type FetchResult = Result<PathBuf, FetchError>;
pub type OnDone = Box<Fn(FetchResult) + Send>;
pub struct FetchHandler {
path: PathBuf,
abort: Arc<AtomicBool>,
file: Option<fs::File>,
result: Option<FetchResult>,
on_done: Option<OnDone>,
size_limit: Option<u64>,
}
impl fmt::Debug for FetchHandler {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Fetch {{ path: {:?}, file: {:?}, result: {:?} }}", self.path, self.file, self.result)
}
}
impl Drop for FetchHandler {
fn drop(&mut self) {
let res = self.result.take().unwrap_or(Err(Error::NotStarted.into()));
// Remove file if there was an error
if res.is_err() || self.is_aborted() {
if let Some(file) = self.file.take() {
drop(file);
// Remove file
let _ = fs::remove_file(&self.path);
}
}
// send result
if let Some(f) = self.on_done.take() {
f(res);
}
}
}
impl FetchHandler {
pub fn new(path: PathBuf, abort: Arc<AtomicBool>, on_done: OnDone, size_limit: Option<u64>) -> Self {
FetchHandler {
path: path,
abort: abort,
file: None,
result: None,
on_done: Some(on_done),
size_limit: size_limit,
}
}
fn is_aborted(&self) -> bool {
self.abort.load(Ordering::SeqCst)
}
fn mark_aborted(&mut self) -> Next {
self.result = Some(Err(Error::Aborted.into()));
Next::end()
}
}
impl hyper::client::Handler<HttpStream> for FetchHandler {
fn on_request(&mut self, req: &mut Request) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
req.headers_mut().set(Connection::close());
read()
}
fn on_request_writable(&mut self, _encoder: &mut Encoder<HttpStream>) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
read()
}
fn on_response(&mut self, res: Response) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
if *res.status() != StatusCode::Ok {
self.result = Some(Err(Error::UnexpectedStatus(*res.status()).into()));
return Next::end();
}
// Open file to write
match fs::File::create(&self.path) {
Ok(file) => {
self.file = Some(file);
self.result = Some(Ok(self.path.clone()));
read()
},
Err(err) => {
self.result = Some(Err(Error::IoError(err).into()));
Next::end()
},
}
}
fn on_response_readable(&mut self, decoder: &mut Decoder<HttpStream>) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
match io::copy(decoder, self.file.as_mut().expect("File is there because on_response has created it.")) {
Ok(0) => Next::end(),
Ok(bytes_read) => match self.size_limit {
None => read(),
// Check limit
Some(limit) if limit > bytes_read => {
self.size_limit = Some(limit - bytes_read);
read()
},
// Size limit reached
_ => {
self.result = Some(Err(Error::SizeLimit.into()));
Next::end()
},
},
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => Next::read(),
_ => {
self.result = Some(Err(Error::IoError(e).into()));
Next::end()
}
}
}
}
fn on_error(&mut self, err: hyper::Error) -> Next {
self.result = Some(Err(Error::HyperError(err).into()));
Next::remove()
}
}
fn read() -> Next {
Next::read().timeout(Duration::from_secs(15))
}

29
util/fetch/src/lib.rs Normal file
View File

@@ -0,0 +1,29 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! A service to fetch any HTTP / HTTPS content.
#[macro_use]
extern crate log;
extern crate hyper;
extern crate https_fetch;
extern crate rand;
pub mod client;
pub mod fetch_file;
pub use self::client::{Client, Fetch, FetchError, FetchResult};

View File

@@ -78,6 +78,10 @@ impl Drop for Client {
impl Client {
pub fn new() -> Result<Self, FetchError> {
Self::with_limit(None)
}
pub fn with_limit(size_limit: Option<usize>) -> Result<Self, FetchError> {
let mut event_loop = try!(mio::EventLoop::new());
let channel = event_loop.channel();
@@ -85,6 +89,7 @@ impl Client {
let mut client = ClientLoop {
next_token: 0,
sessions: HashMap::new(),
size_limit: size_limit,
};
event_loop.run(&mut client).unwrap();
});
@@ -128,6 +133,7 @@ impl Client {
pub struct ClientLoop {
next_token: usize,
sessions: HashMap<usize, TlsClient>,
size_limit: Option<usize>,
}
impl mio::Handler for ClientLoop {
@@ -154,7 +160,7 @@ impl mio::Handler for ClientLoop {
let token = self.next_token;
self.next_token += 1;
if let Ok(mut tlsclient) = TlsClient::new(mio::Token(token), &url, writer, abort, callback) {
if let Ok(mut tlsclient) = TlsClient::new(mio::Token(token), &url, writer, abort, callback, self.size_limit.clone()) {
let httpreq = format!(
"GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\nAccept-Encoding: identity\r\n\r\n",
url.path(),

View File

@@ -35,18 +35,20 @@ pub struct HttpProcessor {
status: Option<String>,
headers: Vec<String>,
body_writer: io::BufWriter<Box<io::Write>>,
size_limit: Option<usize>,
}
const BREAK_LEN: usize = 2;
impl HttpProcessor {
pub fn new(body_writer: Box<io::Write>) -> Self {
pub fn new(body_writer: Box<io::Write>, size_limit: Option<usize>) -> Self {
HttpProcessor {
state: State::WaitingForStatus,
buffer: Cursor::new(Vec::new()),
status: None,
headers: Vec::new(),
body_writer: io::BufWriter::new(body_writer)
body_writer: io::BufWriter::new(body_writer),
size_limit: size_limit,
}
}
@@ -140,6 +142,15 @@ impl HttpProcessor {
},
State::WritingBody => {
let len = self.buffer.get_ref().len();
match self.size_limit {
None => {},
Some(limit) if limit > len => {},
_ => {
warn!("Finishing file fetching because limit was reached.");
self.set_state(State::Finished);
continue;
}
}
try!(self.body_writer.write_all(self.buffer.get_ref()));
self.buffer_consume(len);
return Ok(());
@@ -167,6 +178,17 @@ impl HttpProcessor {
},
// Buffers the data until we have a full chunk
State::WritingChunk(left) if self.buffer.get_ref().len() >= left => {
match self.size_limit {
None => {},
Some(limit) if limit > left => {
self.size_limit = Some(limit - left);
},
_ => {
warn!("Finishing file fetching because limit was reached.");
self.set_state(State::Finished);
continue;
}
}
try!(self.body_writer.write_all(&self.buffer.get_ref()[0..left]));
self.buffer_consume(left + BREAK_LEN);
@@ -230,7 +252,7 @@ mod tests {
#[test]
fn should_be_able_to_process_status_line() {
// given
let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())));
let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())), None);
// when
let out =
@@ -249,7 +271,7 @@ mod tests {
#[test]
fn should_be_able_to_process_headers() {
// given
let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())));
let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())), None);
// when
let out =
@@ -274,7 +296,7 @@ mod tests {
fn should_be_able_to_consume_body() {
// given
let (writer, data) = Writer::new();
let mut http = HttpProcessor::new(Box::new(writer));
let mut http = HttpProcessor::new(Box::new(writer), None);
// when
let out =
@@ -301,7 +323,7 @@ mod tests {
fn should_correctly_handle_chunked_content() {
// given
let (writer, data) = Writer::new();
let mut http = HttpProcessor::new(Box::new(writer));
let mut http = HttpProcessor::new(Box::new(writer), None);
// when
let out =
@@ -331,4 +353,40 @@ mod tests {
assert_eq!(data.borrow().get_ref()[..], b"Parity in\r\n\r\nchunks."[..]);
assert_eq!(http.state(), State::Finished);
}
#[test]
fn should_stop_fetching_when_limit_is_reached() {
// given
let (writer, data) = Writer::new();
let mut http = HttpProcessor::new(Box::new(writer), Some(5));
// when
let out =
"\
HTTP/1.1 200 OK\r\n\
Host: 127.0.0.1:8080\r\n\
Transfer-Encoding: chunked\r\n\
Connection: close\r\n\
\r\n\
4\r\n\
Pari\r\n\
3\r\n\
ty \r\n\
D\r\n\
in\r\n\
\r\n\
chunks.\r\n\
0\r\n\
\r\n\
";
http.write_all(out.as_bytes()).unwrap();
http.flush().unwrap();
// then
assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK");
assert_eq!(http.headers().len(), 3);
assert_eq!(data.borrow().get_ref()[..], b"Pari"[..]);
assert_eq!(http.state(), State::Finished);
}
}

View File

@@ -87,6 +87,7 @@ impl TlsClient {
writer: Box<io::Write + Send>,
abort: Arc<AtomicBool>,
mut callback: Box<FnMut(FetchResult) + Send>,
size_limit: Option<usize>,
) -> Result<Self, FetchError> {
let res = TlsClient::make_config().and_then(|cfg| {
TcpStream::connect(url.address()).map(|sock| {
@@ -98,7 +99,7 @@ impl TlsClient {
Ok((cfg, sock)) => Ok(TlsClient {
abort: abort,
token: token,
writer: HttpProcessor::new(writer),
writer: HttpProcessor::new(writer, size_limit),
socket: sock,
closing: false,
error: None,