// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
//! Fetching
use std::{io, fmt, time};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};
use futures::{self, BoxFuture, Future};
use futures_cpupool::{CpuPool, CpuFuture};
use mime::{self, Mime};
use parking_lot::RwLock;
use reqwest;
/// Fetch abort control
#[derive(Default, Debug, Clone)]
pub struct Abort(Arc);
impl Abort {
/// Returns `true` if request is aborted.
pub fn is_aborted(&self) -> bool {
self.0.load(atomic::Ordering::SeqCst)
}
}
impl From> for Abort {
fn from(a: Arc) -> Self {
Abort(a)
}
}
/// Fetch
pub trait Fetch: Clone + Send + Sync + 'static {
/// Result type
type Result: Future- + Send + 'static;
/// Creates new Fetch object.
fn new() -> Result where Self: Sized;
/// Spawn the future in context of this `Fetch` thread pool.
/// Implementation is optional.
fn process(&self, f: F) -> BoxFuture where
F: Future
- + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
f.boxed()
}
/// Spawn the future in context of this `Fetch` thread pool as "fire and forget", i.e. dropping this future without
/// canceling the underlying future.
/// Implementation is optional.
fn forget(&self, _: F) where
F: Future
- + Send + 'static,
I: Send + 'static,
E: Send + 'static {}
/// Fetch URL and get a future for the result.
/// Supports aborting the request in the middle of execution.
fn fetch_with_abort(&self, url: &str, abort: Abort) -> Self::Result;
/// Fetch URL and get a future for the result.
fn fetch(&self, url: &str) -> Self::Result {
self.fetch_with_abort(url, Default::default())
}
/// Fetch URL and get the result synchronously.
fn fetch_sync(&self, url: &str) -> Result {
self.fetch(url).wait()
}
/// Closes this client
fn close(self) where Self: Sized {}
}
const CLIENT_TIMEOUT_SECONDS: u64 = 5;
/// Fetch client
pub struct Client {
client: RwLock<(time::Instant, Arc)>,
pool: CpuPool,
limit: Option,
}
impl Clone for Client {
fn clone(&self) -> Self {
let (ref time, ref client) = *self.client.read();
Client {
client: RwLock::new((time.clone(), client.clone())),
pool: self.pool.clone(),
limit: self.limit.clone(),
}
}
}
impl Client {
fn new_client() -> Result, Error> {
let mut client = reqwest::Client::new()?;
client.redirect(reqwest::RedirectPolicy::limited(5));
Ok(Arc::new(client))
}
fn with_limit(limit: Option) -> Result {
Ok(Client {
client: RwLock::new((time::Instant::now(), Self::new_client()?)),
pool: CpuPool::new(4),
limit: limit,
})
}
fn client(&self) -> Result, Error> {
{
let (ref time, ref client) = *self.client.read();
if time.elapsed() < time::Duration::from_secs(CLIENT_TIMEOUT_SECONDS) {
return Ok(client.clone());
}
}
let client = Self::new_client()?;
*self.client.write() = (time::Instant::now(), client.clone());
Ok(client)
}
/// Returns a handle to underlying CpuPool of this client.
pub fn pool(&self) -> CpuPool {
self.pool.clone()
}
}
impl Fetch for Client {
type Result = CpuFuture;
fn new() -> Result {
// Max 50MB will be downloaded.
Self::with_limit(Some(50*1024*1024))
}
fn process(&self, f: F) -> BoxFuture where
F: Future
- + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
self.pool.spawn(f).boxed()
}
fn forget(&self, f: F) where
F: Future
- + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
self.pool.spawn(f).forget()
}
fn fetch_with_abort(&self, url: &str, abort: Abort) -> Self::Result {
debug!(target: "fetch", "Fetching from: {:?}", url);
match self.client() {
Ok(client) => {
self.pool.spawn(FetchTask {
url: url.into(),
client: client,
limit: self.limit,
abort: abort,
})
},
Err(err) => {
self.pool.spawn(futures::future::err(err))
},
}
}
}
struct FetchTask {
url: String,
client: Arc,
limit: Option,
abort: Abort,
}
impl Future for FetchTask {
// TODO [ToDr] timeouts handling?
type Item = Response;
type Error = Error;
fn poll(&mut self) -> futures::Poll {
if self.abort.is_aborted() {
trace!(target: "fetch", "Fetch of {:?} aborted.", self.url);
return Err(Error::Aborted);
}
trace!(target: "fetch", "Starting fetch task: {:?}", self.url);
let result = self.client.get(&self.url)
.header(reqwest::header::UserAgent("Parity Fetch".into()))
.send()?;
Ok(futures::Async::Ready(Response {
inner: ResponseInner::Response(result),
abort: self.abort.clone(),
limit: self.limit,
read: 0,
}))
}
}
/// Fetch Error
#[derive(Debug)]
pub enum Error {
/// Internal fetch error
Fetch(reqwest::Error),
/// Request aborted
Aborted,
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Aborted => write!(fmt, "The request has been aborted."),
Error::Fetch(ref err) => write!(fmt, "{}", err),
}
}
}
impl From for Error {
fn from(error: reqwest::Error) -> Self {
Error::Fetch(error)
}
}
enum ResponseInner {
Response(reqwest::Response),
Reader(Box),
NotFound,
}
impl fmt::Debug for ResponseInner {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ResponseInner::Response(ref response) => response.fmt(f),
ResponseInner::NotFound => write!(f, "Not found"),
ResponseInner::Reader(_) => write!(f, "io Reader"),
}
}
}
/// A fetch response type.
#[derive(Debug)]
pub struct Response {
inner: ResponseInner,
abort: Abort,
limit: Option,
read: usize,
}
impl Response {
/// Creates new successfuly response reading from a file.
pub fn from_reader(reader: R) -> Self {
Response {
inner: ResponseInner::Reader(Box::new(reader)),
abort: Abort::default(),
limit: None,
read: 0,
}
}
/// Creates 404 response (useful for tests)
pub fn not_found() -> Self {
Response {
inner: ResponseInner::NotFound,
abort: Abort::default(),
limit: None,
read: 0,
}
}
/// Returns status code of this response.
pub fn status(&self) -> reqwest::StatusCode {
match self.inner {
ResponseInner::Response(ref r) => *r.status(),
ResponseInner::NotFound => reqwest::StatusCode::NotFound,
_ => reqwest::StatusCode::Ok,
}
}
/// Returns `true` if response status code is successful.
pub fn is_success(&self) -> bool {
self.status() == reqwest::StatusCode::Ok
}
/// Returns `true` if content type of this response is `text/html`
pub fn is_html(&self) -> bool {
match self.content_type() {
Some(Mime(mime::TopLevel::Text, mime::SubLevel::Html, _)) => true,
_ => false,
}
}
/// Returns content type of this response (if present)
pub fn content_type(&self) -> Option {
match self.inner {
ResponseInner::Response(ref r) => {
let content_type = r.headers().get::();
content_type.map(|mime| mime.0.clone())
},
_ => None,
}
}
}
impl io::Read for Response {
fn read(&mut self, buf: &mut [u8]) -> io::Result {
if self.abort.is_aborted() {
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "Fetch aborted."));
}
let res = match self.inner {
ResponseInner::Response(ref mut response) => response.read(buf),
ResponseInner::NotFound => return Ok(0),
ResponseInner::Reader(ref mut reader) => reader.read(buf),
};
// increase bytes read
if let Ok(read) = res {
self.read += read;
}
// check limit
match self.limit {
Some(limit) if limit < self.read => {
return Err(io::Error::new(io::ErrorKind::PermissionDenied, "Size limit reached."));
},
_ => {},
}
res
}
}