Get rid of 'Dapp is being downloaded' page
This commit is contained in:
@@ -16,17 +16,18 @@
|
||||
|
||||
//! Hyper Server Handler that fetches a file during a request (proxy).
|
||||
|
||||
use std::fmt;
|
||||
use std::{fs, fmt};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{mpsc, Arc};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::{Instant, Duration};
|
||||
use util::Mutex;
|
||||
|
||||
use hyper::{header, server, Decoder, Encoder, Next, Method, Control};
|
||||
use hyper::{server, Decoder, Encoder, Next, Method, Control};
|
||||
use hyper::net::HttpStream;
|
||||
use hyper::status::StatusCode;
|
||||
|
||||
use handlers::ContentHandler;
|
||||
use handlers::{ContentHandler, Redirection};
|
||||
use handlers::client::{Client, FetchResult};
|
||||
use apps::redirection_address;
|
||||
use apps::urlhint::GithubApp;
|
||||
@@ -36,12 +37,9 @@ const FETCH_TIMEOUT: u64 = 30;
|
||||
|
||||
enum FetchState {
|
||||
NotStarted(GithubApp),
|
||||
InProgress(mpsc::Receiver<FetchResult>),
|
||||
Error(ContentHandler),
|
||||
InProgress {
|
||||
deadline: Instant,
|
||||
receiver: mpsc::Receiver<FetchResult>,
|
||||
},
|
||||
Done(Manifest),
|
||||
Done(Manifest, Redirection),
|
||||
}
|
||||
|
||||
pub trait ContentValidator {
|
||||
@@ -51,10 +49,93 @@ pub trait ContentValidator {
|
||||
fn done(&self, Option<&Manifest>);
|
||||
}
|
||||
|
||||
pub struct ContentFetcherHandler<H: ContentValidator> {
|
||||
pub struct FetchControl {
|
||||
abort: Arc<AtomicBool>,
|
||||
control: Option<Control>,
|
||||
listeners: Mutex<Vec<(Control, mpsc::Sender<FetchState>)>>,
|
||||
deadline: Instant,
|
||||
}
|
||||
|
||||
impl Default for FetchControl {
|
||||
fn default() -> Self {
|
||||
FetchControl {
|
||||
abort: Arc::new(AtomicBool::new(false)),
|
||||
listeners: Mutex::new(Vec::new()),
|
||||
deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FetchControl {
|
||||
fn notify<F: Fn() -> FetchState>(&self, status: F) {
|
||||
let mut listeners = self.listeners.lock();
|
||||
for (control, sender) in listeners.drain(..) {
|
||||
if let Err(e) = sender.send(status()) {
|
||||
trace!(target: "dapps", "Waiting listener notification failed: {:?}", e);
|
||||
} else {
|
||||
let _ = control.ready(Next::read());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn set_status(&self, status: &FetchState) {
|
||||
match *status {
|
||||
FetchState::Error(ref handler) => self.notify(|| FetchState::Error(handler.clone())),
|
||||
FetchState::Done(ref manifest, ref handler) => self.notify(|| FetchState::Done(manifest.clone(), handler.clone())),
|
||||
FetchState::NotStarted(_) | FetchState::InProgress(_) => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn abort(&self) {
|
||||
self.abort.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn to_handler(&self, control: Control) -> Box<server::Handler<HttpStream> + Send> {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
self.listeners.lock().push((control, tx));
|
||||
|
||||
Box::new(WaitingHandler {
|
||||
receiver: rx,
|
||||
state: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WaitingHandler {
|
||||
receiver: mpsc::Receiver<FetchState>,
|
||||
state: Option<FetchState>,
|
||||
}
|
||||
|
||||
impl server::Handler<HttpStream> for WaitingHandler {
|
||||
fn on_request(&mut self, _request: server::Request<HttpStream>) -> Next {
|
||||
Next::wait()
|
||||
}
|
||||
|
||||
fn on_request_readable(&mut self, _decoder: &mut Decoder<HttpStream>) -> Next {
|
||||
self.state = self.receiver.try_recv().ok();
|
||||
Next::write()
|
||||
}
|
||||
|
||||
fn on_response(&mut self, res: &mut server::Response) -> Next {
|
||||
match self.state {
|
||||
Some(FetchState::Done(_, ref mut handler)) => handler.on_response(res),
|
||||
Some(FetchState::Error(ref mut handler)) => handler.on_response(res),
|
||||
_ => Next::end(),
|
||||
}
|
||||
}
|
||||
|
||||
fn on_response_writable(&mut self, encoder: &mut Encoder<HttpStream>) -> Next {
|
||||
match self.state {
|
||||
Some(FetchState::Done(_, ref mut handler)) => handler.on_response_writable(encoder),
|
||||
Some(FetchState::Error(ref mut handler)) => handler.on_response_writable(encoder),
|
||||
_ => Next::end(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ContentFetcherHandler<H: ContentValidator> {
|
||||
fetch_control: Arc<FetchControl>,
|
||||
status: FetchState,
|
||||
control: Option<Control>,
|
||||
client: Option<Client>,
|
||||
using_dapps_domains: bool,
|
||||
dapp: H,
|
||||
@@ -63,7 +144,7 @@ pub struct ContentFetcherHandler<H: ContentValidator> {
|
||||
impl<H: ContentValidator> Drop for ContentFetcherHandler<H> {
|
||||
fn drop(&mut self) {
|
||||
let manifest = match self.status {
|
||||
FetchState::Done(ref manifest) => Some(manifest),
|
||||
FetchState::Done(ref manifest, _) => Some(manifest),
|
||||
_ => None,
|
||||
};
|
||||
self.dapp.done(manifest);
|
||||
@@ -74,20 +155,22 @@ impl<H: ContentValidator> ContentFetcherHandler<H> {
|
||||
|
||||
pub fn new(
|
||||
app: GithubApp,
|
||||
abort: Arc<AtomicBool>,
|
||||
control: Control,
|
||||
using_dapps_domains: bool,
|
||||
handler: H) -> Self {
|
||||
handler: H) -> (Self, Arc<FetchControl>) {
|
||||
|
||||
let fetch_control = Arc::new(FetchControl::default());
|
||||
let client = Client::new();
|
||||
ContentFetcherHandler {
|
||||
abort: abort,
|
||||
let handler = ContentFetcherHandler {
|
||||
fetch_control: fetch_control.clone(),
|
||||
control: Some(control),
|
||||
client: Some(client),
|
||||
status: FetchState::NotStarted(app),
|
||||
using_dapps_domains: using_dapps_domains,
|
||||
dapp: handler,
|
||||
}
|
||||
};
|
||||
|
||||
(handler, fetch_control)
|
||||
}
|
||||
|
||||
fn close_client(client: &mut Option<Client>) {
|
||||
@@ -96,13 +179,13 @@ impl<H: ContentValidator> ContentFetcherHandler<H> {
|
||||
.close();
|
||||
}
|
||||
|
||||
|
||||
fn fetch_app(client: &mut Client, app: &GithubApp, abort: Arc<AtomicBool>, control: Control) -> Result<mpsc::Receiver<FetchResult>, String> {
|
||||
client.request(app.url(), abort, Box::new(move || {
|
||||
let res = client.request(app.url(), abort, Box::new(move || {
|
||||
trace!(target: "dapps", "Fetching finished.");
|
||||
// Ignoring control errors
|
||||
let _ = control.ready(Next::read());
|
||||
})).map_err(|e| format!("{:?}", e))
|
||||
})).map_err(|e| format!("{:?}", e));
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,12 +198,9 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
trace!(target: "dapps", "Fetching dapp: {:?}", app);
|
||||
let control = self.control.take().expect("on_request is called only once, thus control is always Some");
|
||||
let client = self.client.as_mut().expect("on_request is called before client is closed.");
|
||||
let fetch = Self::fetch_app(client, app, self.abort.clone(), control);
|
||||
let fetch = Self::fetch_app(client, app, self.fetch_control.abort.clone(), control);
|
||||
match fetch {
|
||||
Ok(receiver) => FetchState::InProgress {
|
||||
deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT),
|
||||
receiver: receiver,
|
||||
},
|
||||
Ok(receiver) => FetchState::InProgress(receiver),
|
||||
Err(e) => FetchState::Error(ContentHandler::error(
|
||||
StatusCode::BadGateway,
|
||||
"Unable To Start Dapp Download",
|
||||
@@ -140,6 +220,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
} else { None };
|
||||
|
||||
if let Some(status) = status {
|
||||
self.fetch_control.set_status(&status);
|
||||
self.status = status;
|
||||
}
|
||||
|
||||
@@ -149,7 +230,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
fn on_request_readable(&mut self, decoder: &mut Decoder<HttpStream>) -> Next {
|
||||
let (status, next) = match self.status {
|
||||
// Request may time out
|
||||
FetchState::InProgress { ref deadline, .. } if *deadline < Instant::now() => {
|
||||
FetchState::InProgress(_) if self.fetch_control.deadline < Instant::now() => {
|
||||
trace!(target: "dapps", "Fetching dapp failed because of timeout.");
|
||||
let timeout = ContentHandler::error(
|
||||
StatusCode::GatewayTimeout,
|
||||
@@ -160,7 +241,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
Self::close_client(&mut self.client);
|
||||
(Some(FetchState::Error(timeout)), Next::write())
|
||||
},
|
||||
FetchState::InProgress { ref receiver, .. } => {
|
||||
FetchState::InProgress(ref receiver) => {
|
||||
// Check if there is an answer
|
||||
let rec = receiver.try_recv();
|
||||
match rec {
|
||||
@@ -179,11 +260,13 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
Some(&format!("{:?}", e))
|
||||
))
|
||||
},
|
||||
Ok(manifest) => FetchState::Done(manifest)
|
||||
Ok(manifest) => {
|
||||
let address = redirection_address(self.using_dapps_domains, &manifest.id);
|
||||
FetchState::Done(manifest, Redirection::new(&address))
|
||||
},
|
||||
};
|
||||
// Remove temporary zip file
|
||||
// TODO [todr] Uncomment me
|
||||
// let _ = fs::remove_file(path);
|
||||
let _ = fs::remove_file(path);
|
||||
(Some(state), Next::write())
|
||||
},
|
||||
Ok(Err(e)) => {
|
||||
@@ -205,6 +288,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
};
|
||||
|
||||
if let Some(status) = status {
|
||||
self.fetch_control.set_status(&status);
|
||||
self.status = status;
|
||||
}
|
||||
|
||||
@@ -213,12 +297,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
|
||||
fn on_response(&mut self, res: &mut server::Response) -> Next {
|
||||
match self.status {
|
||||
FetchState::Done(ref manifest) => {
|
||||
trace!(target: "dapps", "Fetching dapp finished. Redirecting to {}", manifest.id);
|
||||
res.set_status(StatusCode::Found);
|
||||
res.headers_mut().set(header::Location(redirection_address(self.using_dapps_domains, &manifest.id)));
|
||||
Next::write()
|
||||
},
|
||||
FetchState::Done(_, ref mut handler) => handler.on_response(res),
|
||||
FetchState::Error(ref mut handler) => handler.on_response(res),
|
||||
_ => Next::end(),
|
||||
}
|
||||
@@ -226,9 +305,9 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
|
||||
fn on_response_writable(&mut self, encoder: &mut Encoder<HttpStream>) -> Next {
|
||||
match self.status {
|
||||
FetchState::Done(_, ref mut handler) => handler.on_response_writable(encoder),
|
||||
FetchState::Error(ref mut handler) => handler.on_response_writable(encoder),
|
||||
_ => Next::end(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user