// Copyright 2015-2017 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . //! Hyper Server Handler that fetches a file during a request (proxy). use std::{fmt, mem}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Instant, Duration}; use fetch::{self, Fetch}; use futures::sync::oneshot; use futures::{self, Future}; use futures_cpupool::CpuPool; use hyper::{self, Method, StatusCode}; use parking_lot::Mutex; use endpoint::{self, EndpointPath}; use handlers::{ContentHandler, StreamingHandler}; use page::local; use {Embeddable}; const FETCH_TIMEOUT: Duration = Duration::from_secs(300); pub enum ValidatorResponse { Local(local::Dapp), Streaming(StreamingHandler), } pub trait ContentValidator: Sized + Send + 'static { type Error: fmt::Debug + fmt::Display; fn validate_and_install(self, fetch::Response) -> Result; } #[derive(Debug, Clone)] pub struct FetchControl { abort: Arc, listeners: Arc>>>, deadline: Instant, } impl Default for FetchControl { fn default() -> Self { FetchControl { abort: Arc::new(AtomicBool::new(false)), listeners: Arc::new(Mutex::new(Vec::new())), deadline: Instant::now() + FETCH_TIMEOUT, } } } impl FetchControl { pub fn is_deadline_reached(&self) -> bool { self.deadline < Instant::now() } pub fn abort(&self) { self.abort.store(true, Ordering::SeqCst); } pub fn to_response(&self, path: EndpointPath) -> endpoint::Response { let (tx, receiver) = oneshot::channel(); self.listeners.lock().push(tx); Box::new(WaitingHandler { path, state: WaitState::Waiting(receiver), }) } fn notify WaitResult>(&self, status: F) { let mut listeners = self.listeners.lock(); for sender in listeners.drain(..) { trace!(target: "dapps", "Resuming request waiting for content..."); if let Err(_) = sender.send(status()) { trace!(target: "dapps", "Waiting listener notification failed."); } } } fn set_status(&self, status: &FetchState) { match *status { FetchState::Error(ref handler) => self.notify(|| WaitResult::Error(handler.clone())), FetchState::Done(ref endpoint, _) => self.notify(|| WaitResult::Done(endpoint.clone())), FetchState::Streaming(_) => self.notify(|| WaitResult::NonAwaitable), FetchState::InProgress(_) => {}, FetchState::Empty => {}, } } } enum WaitState { Waiting(oneshot::Receiver), Done(endpoint::Response), } #[derive(Debug)] enum WaitResult { Error(ContentHandler), Done(local::Dapp), NonAwaitable, } pub struct WaitingHandler { path: EndpointPath, state: WaitState, } impl Future for WaitingHandler { type Item = hyper::Response; type Error = hyper::Error; fn poll(&mut self) -> futures::Poll { loop { let new_state = match self.state { WaitState::Waiting(ref mut receiver) => { let result = try_ready!(receiver.poll().map_err(|_| hyper::Error::Timeout)); match result { WaitResult::Error(handler) => { return Ok(futures::Async::Ready(handler.into())); }, WaitResult::NonAwaitable => { let errors = Errors { embeddable_on: None }; return Ok(futures::Async::Ready(errors.streaming().into())); }, WaitResult::Done(endpoint) => { WaitState::Done(endpoint.to_response(&self.path).into()) }, } }, WaitState::Done(ref mut response) => { return response.poll() }, }; self.state = new_state; } } } #[derive(Debug, Clone)] struct Errors { embeddable_on: Embeddable, } impl Errors { fn streaming(&self) -> ContentHandler { ContentHandler::error( StatusCode::BadGateway, "Streaming Error", "This content is being streamed in other place.", None, self.embeddable_on.clone(), ) } fn download_error(&self, e: E) -> ContentHandler { ContentHandler::error( StatusCode::BadGateway, "Download Error", "There was an error when fetching the content.", Some(&format!("{:?}", e)), self.embeddable_on.clone(), ) } fn invalid_content(&self, e: E) -> ContentHandler { ContentHandler::error( StatusCode::BadGateway, "Invalid Dapp", "Downloaded bundle does not contain a valid content.", Some(&format!("{:?}", e)), self.embeddable_on.clone(), ) } fn timeout_error(&self) -> ContentHandler { ContentHandler::error( StatusCode::GatewayTimeout, "Download Timeout", &format!("Could not fetch content within {} seconds.", FETCH_TIMEOUT.as_secs()), None, self.embeddable_on.clone(), ) } fn method_not_allowed(&self) -> ContentHandler { ContentHandler::error( StatusCode::MethodNotAllowed, "Method Not Allowed", "Only GET requests are allowed.", None, self.embeddable_on.clone(), ) } } enum FetchState { Error(ContentHandler), InProgress(Box + Send>), Streaming(hyper::Response), Done(local::Dapp, endpoint::Response), Empty, } impl fmt::Debug for FetchState { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { use self::FetchState::*; write!(fmt, "FetchState(")?; match *self { Error(ref error) => write!(fmt, "error: {:?}", error), InProgress(_) => write!(fmt, "in progress"), Streaming(ref res) => write!(fmt, "streaming: {:?}", res), Done(ref endpoint, _) => write!(fmt, "done: {:?}", endpoint), Empty => write!(fmt, "?"), }?; write!(fmt, ")") } } #[derive(Debug)] pub struct ContentFetcherHandler { fetch_control: FetchControl, status: FetchState, errors: Errors, } impl ContentFetcherHandler { pub fn fetch_control(&self) -> FetchControl { self.fetch_control.clone() } pub fn new( method: &hyper::Method, url: &str, path: EndpointPath, installer: H, embeddable_on: Embeddable, fetch: F, pool: CpuPool, ) -> Self { let fetch_control = FetchControl::default(); let errors = Errors { embeddable_on }; // Validation of method let status = match *method { // Start fetching content Method::Get => { trace!(target: "dapps", "Fetching content from: {:?}", url); FetchState::InProgress(Self::fetch_content( pool, fetch, url, fetch_control.abort.clone(), path, errors.clone(), installer, )) }, // or return error _ => FetchState::Error(errors.method_not_allowed()), }; ContentFetcherHandler { fetch_control, status, errors, } } fn fetch_content( pool: CpuPool, fetch: F, url: &str, abort: Arc, path: EndpointPath, errors: Errors, installer: H, ) -> Box + Send> { // Start fetching the content let pool2 = pool.clone(); let future = fetch.fetch(url, abort.into()).then(move |result| { trace!(target: "dapps", "Fetching content finished. Starting validation: {:?}", result); Ok(match result { Ok(response) => match installer.validate_and_install(response) { Ok(ValidatorResponse::Local(endpoint)) => { trace!(target: "dapps", "Validation OK. Returning response."); let response = endpoint.to_response(&path); FetchState::Done(endpoint, response) }, Ok(ValidatorResponse::Streaming(stream)) => { trace!(target: "dapps", "Validation OK. Streaming response."); let (reading, response) = stream.into_response(); pool.spawn(reading).forget(); FetchState::Streaming(response) }, Err(e) => { trace!(target: "dapps", "Error while validating content: {:?}", e); FetchState::Error(errors.invalid_content(e)) }, }, Err(e) => { warn!(target: "dapps", "Unable to fetch content: {:?}", e); FetchState::Error(errors.download_error(e)) }, }) }); // make sure to run within fetch thread pool. Box::new(pool2.spawn(future)) } } impl Future for ContentFetcherHandler { type Item = hyper::Response; type Error = hyper::Error; fn poll(&mut self) -> futures::Poll { loop { trace!(target: "dapps", "Polling status: {:?}", self.status); self.status = match mem::replace(&mut self.status, FetchState::Empty) { FetchState::Error(error) => { return Ok(futures::Async::Ready(error.into())); }, FetchState::Streaming(response) => { return Ok(futures::Async::Ready(response)); }, any => any, }; let status = match self.status { // Request may time out FetchState::InProgress(_) if self.fetch_control.is_deadline_reached() => { trace!(target: "dapps", "Fetching dapp failed because of timeout."); FetchState::Error(self.errors.timeout_error()) }, FetchState::InProgress(ref mut receiver) => { // Check if there is a response trace!(target: "dapps", "Polling streaming response."); try_ready!(receiver.poll().map_err(|err| { warn!(target: "dapps", "Error while fetching response: {:?}", err); hyper::Error::Timeout })) }, FetchState::Done(_, ref mut response) => { return response.poll() }, FetchState::Empty => panic!("Future polled twice."), _ => unreachable!(), }; trace!(target: "dapps", "New status: {:?}", status); self.fetch_control.set_status(&status); self.status = status; } } }