diff --git a/dapps/src/apps/cache.rs b/dapps/src/apps/cache.rs index b5acbcb15..be9521cf9 100644 --- a/dapps/src/apps/cache.rs +++ b/dapps/src/apps/cache.rs @@ -18,13 +18,13 @@ use std::fs; use std::sync::{Arc}; -use std::sync::atomic::{AtomicBool, Ordering}; use linked_hash_map::LinkedHashMap; use page::LocalPageEndpoint; +use handlers::FetchControl; pub enum ContentStatus { - Fetching(Arc), + Fetching(Arc), Ready(LocalPageEndpoint), } @@ -57,10 +57,10 @@ impl ContentCache { while len > expected_size { let entry = self.cache.pop_front().unwrap(); match entry.1 { - ContentStatus::Fetching(ref abort) => { + ContentStatus::Fetching(ref fetch) => { trace!(target: "dapps", "Aborting {} because of limit.", entry.0); // Mark as aborted - abort.store(true, Ordering::SeqCst); + fetch.abort() }, ContentStatus::Ready(ref endpoint) => { trace!(target: "dapps", "Removing {} because of limit.", entry.0); diff --git a/dapps/src/apps/fetcher.rs b/dapps/src/apps/fetcher.rs index 8c0e7c421..8702e4706 100644 --- a/dapps/src/apps/fetcher.rs +++ b/dapps/src/apps/fetcher.rs @@ -23,7 +23,6 @@ use std::{fs, env, fmt}; use std::io::{self, Read, Write}; use std::path::PathBuf; use std::sync::Arc; -use std::sync::atomic::{AtomicBool}; use rustc_serialize::hex::FromHex; use hyper; @@ -76,10 +75,12 @@ impl ContentFetcher { } pub fn contains(&self, content_id: &str) -> bool { - let mut cache = self.cache.lock(); - // Check if we already have the app - if cache.get(content_id).is_some() { - return true; + { + let mut cache = self.cache.lock(); + // Check if we already have the app + if cache.get(content_id).is_some() { + return true; + } } // fallback to resolver if let Ok(content_id) = content_id.from_hex() { @@ -115,49 +116,60 @@ impl ContentFetcher { (None, endpoint.to_async_handler(path, control)) }, // App is already being fetched - Some(&mut ContentStatus::Fetching(_)) => { - (None, Box::new(ContentHandler::error_with_refresh( - StatusCode::ServiceUnavailable, - "Download In Progress", - "This dapp is already being downloaded. Please wait...", - None, - )) as Box) + Some(&mut ContentStatus::Fetching(ref fetch_control)) => { + trace!(target: "dapps", "Content fetching in progress. Waiting..."); + (None, fetch_control.to_handler(control)) }, // We need to start fetching app None => { + trace!(target: "dapps", "Content unavailable. Fetching..."); let content_hex = content_id.from_hex().expect("to_handler is called only when `contains` returns true."); let content = self.resolver.resolve(content_hex); - let abort = Arc::new(AtomicBool::new(false)); + + let cache = self.cache.clone(); + let on_done = move |id: String, result: Option| { + let mut cache = cache.lock(); + match result { + Some(endpoint) => { + cache.insert(id, ContentStatus::Ready(endpoint)); + }, + // In case of error + None => { + cache.remove(&id); + }, + } + }; match content { - Some(URLHintResult::Dapp(dapp)) => ( - Some(ContentStatus::Fetching(abort.clone())), - Box::new(ContentFetcherHandler::new( + Some(URLHintResult::Dapp(dapp)) => { + let (handler, fetch_control) = ContentFetcherHandler::new( dapp.url(), - abort, control, path.using_dapps_domains, DappInstaller { id: content_id.clone(), dapps_path: self.dapps_path.clone(), - cache: self.cache.clone(), - })) as Box - ), - Some(URLHintResult::Content(content)) => ( - Some(ContentStatus::Fetching(abort.clone())), - Box::new(ContentFetcherHandler::new( + on_done: Box::new(on_done), + } + ); + + (Some(ContentStatus::Fetching(fetch_control)), Box::new(handler) as Box) + }, + Some(URLHintResult::Content(content)) => { + let (handler, fetch_control) = ContentFetcherHandler::new( content.url, - abort, control, path.using_dapps_domains, ContentInstaller { id: content_id.clone(), mime: content.mime, content_path: self.dapps_path.clone(), - cache: self.cache.clone(), + on_done: Box::new(on_done), } - )) as Box, - ), + ); + + (Some(ContentStatus::Fetching(fetch_control)), Box::new(handler) as Box) + }, None => { // This may happen when sync status changes in between // `contains` and `to_handler` @@ -225,14 +237,13 @@ struct ContentInstaller { id: String, mime: String, content_path: PathBuf, - cache: Arc>, + on_done: Box) + Send>, } impl ContentValidator for ContentInstaller { type Error = ValidationError; - type Result = PathBuf; - fn validate_and_install(&self, path: PathBuf) -> Result<(String, PathBuf), ValidationError> { + fn validate_and_install(&self, path: PathBuf) -> Result<(String, LocalPageEndpoint), ValidationError> { // Create dir try!(fs::create_dir_all(&self.content_path)); @@ -247,21 +258,11 @@ impl ContentValidator for ContentInstaller { try!(fs::copy(&path, &content_path)); - Ok((self.id.clone(), content_path)) + Ok((self.id.clone(), LocalPageEndpoint::single_file(content_path, self.mime.clone()))) } - fn done(&self, result: Option<&PathBuf>) { - let mut cache = self.cache.lock(); - match result { - Some(result) => { - let page = LocalPageEndpoint::single_file(result.clone(), self.mime.clone()); - cache.insert(self.id.clone(), ContentStatus::Ready(page)); - }, - // In case of error - None => { - cache.remove(&self.id); - }, - } + fn done(&self, endpoint: Option) { + (self.on_done)(self.id.clone(), endpoint) } } @@ -269,7 +270,7 @@ impl ContentValidator for ContentInstaller { struct DappInstaller { id: String, dapps_path: PathBuf, - cache: Arc>, + on_done: Box) + Send>, } impl DappInstaller { @@ -306,9 +307,8 @@ impl DappInstaller { impl ContentValidator for DappInstaller { type Error = ValidationError; - type Result = Manifest; - fn validate_and_install(&self, app_path: PathBuf) -> Result<(String, Manifest), ValidationError> { + fn validate_and_install(&self, app_path: PathBuf) -> Result<(String, LocalPageEndpoint), ValidationError> { trace!(target: "dapps", "Opening dapp bundle at {:?}", app_path); let mut file_reader = io::BufReader::new(try!(fs::File::open(app_path))); let hash = try!(sha3(&mut file_reader)); @@ -362,23 +362,15 @@ impl ContentValidator for DappInstaller { let mut manifest_file = try!(fs::File::create(manifest_path)); try!(manifest_file.write_all(manifest_str.as_bytes())); + // Create endpoint + let app = LocalPageEndpoint::new(target, manifest.clone().into()); + // Return modified app manifest - Ok((manifest.id.clone(), manifest)) + Ok((manifest.id.clone(), app)) } - fn done(&self, manifest: Option<&Manifest>) { - let mut cache = self.cache.lock(); - match manifest { - Some(manifest) => { - let path = self.dapp_target_path(manifest); - let app = LocalPageEndpoint::new(path, manifest.clone().into()); - cache.insert(self.id.clone(), ContentStatus::Ready(app)); - }, - // In case of error - None => { - cache.remove(&self.id); - }, - } + fn done(&self, endpoint: Option) { + (self.on_done)(self.id.clone(), endpoint) } } diff --git a/dapps/src/error_tpl.html b/dapps/src/error_tpl.html index 6551431a6..c6b4db0e7 100644 --- a/dapps/src/error_tpl.html +++ b/dapps/src/error_tpl.html @@ -3,7 +3,6 @@ - {meta} {title} diff --git a/dapps/src/handlers/content.rs b/dapps/src/handlers/content.rs index f283fbb6a..4dc011475 100644 --- a/dapps/src/handlers/content.rs +++ b/dapps/src/handlers/content.rs @@ -23,6 +23,7 @@ use hyper::status::StatusCode; use util::version; +#[derive(Clone)] pub struct ContentHandler { code: StatusCode, content: String, @@ -57,18 +58,6 @@ impl ContentHandler { Self::html(code, format!( include_str!("../error_tpl.html"), title=title, - meta="", - message=message, - details=details.unwrap_or_else(|| ""), - version=version(), - )) - } - - pub fn error_with_refresh(code: StatusCode, title: &str, message: &str, details: Option<&str>) -> Self { - Self::html(code, format!( - include_str!("../error_tpl.html"), - title=title, - meta="", message=message, details=details.unwrap_or_else(|| ""), version=version(), diff --git a/dapps/src/handlers/fetch.rs b/dapps/src/handlers/fetch.rs index 790bf4710..c463d3710 100644 --- a/dapps/src/handlers/fetch.rs +++ b/dapps/src/handlers/fetch.rs @@ -19,41 +19,122 @@ use std::{fs, fmt}; use std::path::PathBuf; use std::sync::{mpsc, Arc}; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Instant, Duration}; +use util::Mutex; -use hyper::{header, server, Decoder, Encoder, Next, Method, Control}; +use hyper::{server, Decoder, Encoder, Next, Method, Control}; use hyper::net::HttpStream; use hyper::status::StatusCode; -use handlers::ContentHandler; +use handlers::{ContentHandler, Redirection}; use handlers::client::{Client, FetchResult}; use apps::redirection_address; +use page::LocalPageEndpoint; const FETCH_TIMEOUT: u64 = 30; -enum FetchState { +enum FetchState { NotStarted(String), Error(ContentHandler), - InProgress { - deadline: Instant, - receiver: mpsc::Receiver, - }, - Done((String, T)), + InProgress(mpsc::Receiver), + Done(String, LocalPageEndpoint, Redirection), } pub trait ContentValidator { type Error: fmt::Debug + fmt::Display; - type Result: fmt::Debug; - fn validate_and_install(&self, app: PathBuf) -> Result<(String, Self::Result), Self::Error>; - fn done(&self, Option<&Self::Result>); + fn validate_and_install(&self, app: PathBuf) -> Result<(String, LocalPageEndpoint), Self::Error>; + fn done(&self, Option); +} + +pub struct FetchControl { + abort: Arc, + listeners: Mutex)>>, + deadline: Instant, +} + +impl Default for FetchControl { + fn default() -> Self { + FetchControl { + abort: Arc::new(AtomicBool::new(false)), + listeners: Mutex::new(Vec::new()), + deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT), + } + } +} + +impl FetchControl { + fn notify FetchState>(&self, status: F) { + let mut listeners = self.listeners.lock(); + for (control, sender) in listeners.drain(..) { + if let Err(e) = sender.send(status()) { + trace!(target: "dapps", "Waiting listener notification failed: {:?}", e); + } else { + let _ = control.ready(Next::read()); + } + } + } + + fn set_status(&self, status: &FetchState) { + match *status { + FetchState::Error(ref handler) => self.notify(|| FetchState::Error(handler.clone())), + FetchState::Done(ref id, ref endpoint, ref handler) => self.notify(|| FetchState::Done(id.clone(), endpoint.clone(), handler.clone())), + FetchState::NotStarted(_) | FetchState::InProgress(_) => {}, + } + } + + pub fn abort(&self) { + self.abort.store(true, Ordering::SeqCst); + } + + pub fn to_handler(&self, control: Control) -> Box + Send> { + let (tx, rx) = mpsc::channel(); + self.listeners.lock().push((control, tx)); + + Box::new(WaitingHandler { + receiver: rx, + state: None, + }) + } +} + +pub struct WaitingHandler { + receiver: mpsc::Receiver, + state: Option, +} + +impl server::Handler for WaitingHandler { + fn on_request(&mut self, _request: server::Request) -> Next { + Next::wait() + } + + fn on_request_readable(&mut self, _decoder: &mut Decoder) -> Next { + self.state = self.receiver.try_recv().ok(); + Next::write() + } + + fn on_response(&mut self, res: &mut server::Response) -> Next { + match self.state { + Some(FetchState::Done(_, _, ref mut handler)) => handler.on_response(res), + Some(FetchState::Error(ref mut handler)) => handler.on_response(res), + _ => Next::end(), + } + } + + fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { + match self.state { + Some(FetchState::Done(_, _, ref mut handler)) => handler.on_response_writable(encoder), + Some(FetchState::Error(ref mut handler)) => handler.on_response_writable(encoder), + _ => Next::end(), + } + } } pub struct ContentFetcherHandler { - abort: Arc, + fetch_control: Arc, control: Option, - status: FetchState, + status: FetchState, client: Option, using_dapps_domains: bool, installer: H, @@ -62,7 +143,7 @@ pub struct ContentFetcherHandler { impl Drop for ContentFetcherHandler { fn drop(&mut self) { let result = match self.status { - FetchState::Done((_, ref result)) => Some(result), + FetchState::Done(_, ref result, _) => Some(result.clone()), _ => None, }; self.installer.done(result); @@ -73,20 +154,22 @@ impl ContentFetcherHandler { pub fn new( url: String, - abort: Arc, control: Control, using_dapps_domains: bool, - handler: H) -> Self { + handler: H) -> (Self, Arc) { + let fetch_control = Arc::new(FetchControl::default()); let client = Client::new(); - ContentFetcherHandler { - abort: abort, + let handler = ContentFetcherHandler { + fetch_control: fetch_control.clone(), control: Some(control), client: Some(client), status: FetchState::NotStarted(url), using_dapps_domains: using_dapps_domains, installer: handler, - } + }; + + (handler, fetch_control) } fn close_client(client: &mut Option) { @@ -95,7 +178,6 @@ impl ContentFetcherHandler { .close(); } - fn fetch_content(client: &mut Client, url: &str, abort: Arc, control: Control) -> Result, String> { client.request(url, abort, Box::new(move || { trace!(target: "dapps", "Fetching finished."); @@ -114,12 +196,9 @@ impl server::Handler for ContentFetcherHandler< trace!(target: "dapps", "Fetching content from: {:?}", url); let control = self.control.take().expect("on_request is called only once, thus control is always Some"); let client = self.client.as_mut().expect("on_request is called before client is closed."); - let fetch = Self::fetch_content(client, url, self.abort.clone(), control); + let fetch = Self::fetch_content(client, url, self.fetch_control.abort.clone(), control); match fetch { - Ok(receiver) => FetchState::InProgress { - deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT), - receiver: receiver, - }, + Ok(receiver) => FetchState::InProgress(receiver), Err(e) => FetchState::Error(ContentHandler::error( StatusCode::BadGateway, "Unable To Start Dapp Download", @@ -139,6 +218,7 @@ impl server::Handler for ContentFetcherHandler< } else { None }; if let Some(status) = status { + self.fetch_control.set_status(&status); self.status = status; } @@ -148,7 +228,7 @@ impl server::Handler for ContentFetcherHandler< fn on_request_readable(&mut self, decoder: &mut Decoder) -> Next { let (status, next) = match self.status { // Request may time out - FetchState::InProgress { ref deadline, .. } if *deadline < Instant::now() => { + FetchState::InProgress(_) if self.fetch_control.deadline < Instant::now() => { trace!(target: "dapps", "Fetching dapp failed because of timeout."); let timeout = ContentHandler::error( StatusCode::GatewayTimeout, @@ -159,7 +239,7 @@ impl server::Handler for ContentFetcherHandler< Self::close_client(&mut self.client); (Some(FetchState::Error(timeout)), Next::write()) }, - FetchState::InProgress { ref receiver, .. } => { + FetchState::InProgress(ref receiver) => { // Check if there is an answer let rec = receiver.try_recv(); match rec { @@ -178,7 +258,10 @@ impl server::Handler for ContentFetcherHandler< Some(&format!("{:?}", e)) )) }, - Ok(result) => FetchState::Done(result) + Ok((id, result)) => { + let address = redirection_address(self.using_dapps_domains, &id); + FetchState::Done(id, result, Redirection::new(&address)) + }, }; // Remove temporary zip file let _ = fs::remove_file(path); @@ -203,6 +286,7 @@ impl server::Handler for ContentFetcherHandler< }; if let Some(status) = status { + self.fetch_control.set_status(&status); self.status = status; } @@ -211,12 +295,7 @@ impl server::Handler for ContentFetcherHandler< fn on_response(&mut self, res: &mut server::Response) -> Next { match self.status { - FetchState::Done((ref id, _)) => { - trace!(target: "dapps", "Fetching content finished. Redirecting to {}", id); - res.set_status(StatusCode::Found); - res.headers_mut().set(header::Location(redirection_address(self.using_dapps_domains, id))); - Next::write() - }, + FetchState::Done(_, _, ref mut handler) => handler.on_response(res), FetchState::Error(ref mut handler) => handler.on_response(res), _ => Next::end(), } @@ -224,9 +303,9 @@ impl server::Handler for ContentFetcherHandler< fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { match self.status { + FetchState::Done(_, _, ref mut handler) => handler.on_response_writable(encoder), FetchState::Error(ref mut handler) => handler.on_response_writable(encoder), _ => Next::end(), } } } - diff --git a/dapps/src/handlers/mod.rs b/dapps/src/handlers/mod.rs index 6f6423b58..62b13eaa8 100644 --- a/dapps/src/handlers/mod.rs +++ b/dapps/src/handlers/mod.rs @@ -27,7 +27,7 @@ pub use self::auth::AuthRequiredHandler; pub use self::echo::EchoHandler; pub use self::content::ContentHandler; pub use self::redirect::Redirection; -pub use self::fetch::{ContentFetcherHandler, ContentValidator}; +pub use self::fetch::{ContentFetcherHandler, ContentValidator, FetchControl}; use url::Url; use hyper::{server, header, net, uri}; diff --git a/dapps/src/handlers/redirect.rs b/dapps/src/handlers/redirect.rs index 8b6158266..e43d32e24 100644 --- a/dapps/src/handlers/redirect.rs +++ b/dapps/src/handlers/redirect.rs @@ -20,15 +20,20 @@ use hyper::{header, server, Decoder, Encoder, Next}; use hyper::net::HttpStream; use hyper::status::StatusCode; +#[derive(Clone)] pub struct Redirection { to_url: String } impl Redirection { - pub fn new(url: &str) -> Box { - Box::new(Redirection { + pub fn new(url: &str) -> Self { + Redirection { to_url: url.to_owned() - }) + } + } + + pub fn boxed(url: &str) -> Box { + Box::new(Self::new(url)) } } diff --git a/dapps/src/page/local.rs b/dapps/src/page/local.rs index 10b6f08b1..e34cc6434 100644 --- a/dapps/src/page/local.rs +++ b/dapps/src/page/local.rs @@ -21,6 +21,7 @@ use std::path::{Path, PathBuf}; use page::handler; use endpoint::{Endpoint, EndpointInfo, EndpointPath, Handler}; +#[derive(Debug, Clone)] pub struct LocalPageEndpoint { path: PathBuf, mime: Option, diff --git a/dapps/src/router/mod.rs b/dapps/src/router/mod.rs index b908203d6..f54d6bf3d 100644 --- a/dapps/src/router/mod.rs +++ b/dapps/src/router/mod.rs @@ -104,7 +104,7 @@ impl server::Handler for Router { // Redirect any GET request to home. _ if *req.method() == hyper::method::Method::Get => { let address = apps::redirection_address(false, self.main_page); - Redirection::new(address.as_str()) + Redirection::boxed(address.as_str()) }, // RPC by default _ => {