Web Based Dapps (#3956)

* Dapps web

Conflicts:
	dapps/src/apps/fetcher.rs
	dapps/src/handlers/fetch.rs

* Rewriting fetch

* Parity-wide fetch service

* Obey the limits and support cancellation.

* Removing temporary files.

* Actually use Fetch for dapps

* Re-implementing file fetching to avoid temporary files.

* Serde to 0.8.19

* Fixing content & dapps fetch

* Serving web content and injecting scripts

* Don't wait for old content, start a new download

* Supporting timeouts and query

* Simple GUI for the browser

* Proxy tokens validation

* Recovering from invalid web-based requests

* Remember last visisted URL

* Removing unused variables

* Addressing review comments

* Setting default account in web3

* Adding WebBrowser dapp to the list

* Actually prune old entries when generating new token
This commit is contained in:
Tomasz Drwięga
2016-12-27 11:15:02 +01:00
committed by Gav Wood
parent 6842d43491
commit c7c309d152
34 changed files with 933 additions and 167 deletions

View File

@@ -17,14 +17,13 @@
//! Fetchable Dapps support.
use std::fs;
use std::sync::{Arc};
use linked_hash_map::LinkedHashMap;
use page::LocalPageEndpoint;
use handlers::FetchControl;
pub enum ContentStatus {
Fetching(Arc<FetchControl>),
Fetching(FetchControl),
Ready(LocalPageEndpoint),
}

View File

@@ -23,7 +23,7 @@ use util::H256;
use util::sha3::sha3;
use page::{LocalPageEndpoint, PageCache};
use handlers::ContentValidator;
use handlers::{ContentValidator, ValidatorResponse};
use apps::manifest::{MANIFEST_FILENAME, deserialize_manifest, serialize_manifest, Manifest};
type OnDone = Box<Fn(Option<LocalPageEndpoint>) + Send>;
@@ -90,7 +90,7 @@ impl Content {
impl ContentValidator for Content {
type Error = ValidationError;
fn validate_and_install(&self, response: fetch::Response) -> Result<LocalPageEndpoint, ValidationError> {
fn validate_and_install(&self, response: fetch::Response) -> Result<ValidatorResponse, ValidationError> {
let validate = |content_path: PathBuf| {
// Create dir
let (_, content_path) = try!(write_response_and_check_hash(self.id.as_str(), content_path.clone(), self.id.as_str(), response));
@@ -108,7 +108,7 @@ impl ContentValidator for Content {
let _ = fs::remove_dir_all(&content_path);
}
(self.on_done)(result.as_ref().ok().cloned());
result
result.map(ValidatorResponse::Local)
}
}
@@ -157,7 +157,7 @@ impl Dapp {
impl ContentValidator for Dapp {
type Error = ValidationError;
fn validate_and_install(&self, response: fetch::Response) -> Result<LocalPageEndpoint, ValidationError> {
fn validate_and_install(&self, response: fetch::Response) -> Result<ValidatorResponse, ValidationError> {
let validate = |dapp_path: PathBuf| {
let (file, zip_path) = try!(write_response_and_check_hash(self.id.as_str(), dapp_path.clone(), &format!("{}.zip", self.id), response));
trace!(target: "dapps", "Opening dapp bundle at {:?}", zip_path);
@@ -211,7 +211,7 @@ impl ContentValidator for Dapp {
let _ = fs::remove_dir_all(&target);
}
(self.on_done)(result.as_ref().ok().cloned());
result
result.map(ValidatorResponse::Local)
}
}

View File

@@ -129,12 +129,12 @@ impl<R: URLHint + Send + Sync + 'static, F: Fetch> Fetcher for ContentFetcher<F,
(None, endpoint.to_async_handler(path, control))
},
// Content is already being fetched
Some(&mut ContentStatus::Fetching(ref fetch_control)) => {
Some(&mut ContentStatus::Fetching(ref fetch_control)) if !fetch_control.is_deadline_reached() => {
trace!(target: "dapps", "Content fetching in progress. Waiting...");
(None, fetch_control.to_async_handler(path, control))
},
// We need to start fetching the content
None => {
_ => {
trace!(target: "dapps", "Content unavailable. Fetching... {:?}", content_id);
let content_hex = content_id.from_hex().expect("to_handler is called only when `contains` returns true.");
let content = self.resolver.resolve(content_hex);
@@ -156,7 +156,7 @@ impl<R: URLHint + Send + Sync + 'static, F: Fetch> Fetcher for ContentFetcher<F,
(None, Self::still_syncing(self.embeddable_on.clone()))
},
Some(URLHintResult::Dapp(dapp)) => {
let (handler, fetch_control) = ContentFetcherHandler::new(
let handler = ContentFetcherHandler::new(
dapp.url(),
path,
control,
@@ -171,10 +171,10 @@ impl<R: URLHint + Send + Sync + 'static, F: Fetch> Fetcher for ContentFetcher<F,
self.fetch.clone(),
);
(Some(ContentStatus::Fetching(fetch_control)), Box::new(handler) as Box<Handler>)
(Some(ContentStatus::Fetching(handler.fetch_control())), Box::new(handler) as Box<Handler>)
},
Some(URLHintResult::Content(content)) => {
let (handler, fetch_control) = ContentFetcherHandler::new(
let handler = ContentFetcherHandler::new(
content.url,
path,
control,
@@ -189,7 +189,7 @@ impl<R: URLHint + Send + Sync + 'static, F: Fetch> Fetcher for ContentFetcher<F,
self.fetch.clone(),
);
(Some(ContentStatus::Fetching(fetch_control)), Box::new(handler) as Box<Handler>)
(Some(ContentStatus::Fetching(handler.fetch_control())), Box::new(handler) as Box<Handler>)
},
None if self.sync.is_major_importing() => {
(None, Self::still_syncing(self.embeddable_on.clone()))

View File

@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use endpoint::{Endpoints, Endpoint};
use page::PageEndpoint;
use proxypac::ProxyPac;
@@ -21,6 +22,7 @@ use web::Web;
use fetch::Fetch;
use parity_dapps::WebApp;
use parity_reactor::Remote;
use {WebProxyTokens};
mod cache;
mod fs;
@@ -40,14 +42,20 @@ pub fn utils() -> Box<Endpoint> {
Box::new(PageEndpoint::with_prefix(parity_ui::App::default(), UTILS_PATH.to_owned()))
}
pub fn all_endpoints<F: Fetch>(dapps_path: String, signer_address: Option<(String, u16)>, remote: Remote, fetch: F) -> Endpoints {
pub fn all_endpoints<F: Fetch>(
dapps_path: String,
signer_address: Option<(String, u16)>,
web_proxy_tokens: Arc<WebProxyTokens>,
remote: Remote,
fetch: F,
) -> Endpoints {
// fetch fs dapps at first to avoid overwriting builtins
let mut pages = fs::local_endpoints(dapps_path, signer_address.clone());
// NOTE [ToDr] Dapps will be currently embeded on 8180
insert::<parity_ui::App>(&mut pages, "ui", Embeddable::Yes(signer_address.clone()));
pages.insert("proxy".into(), ProxyPac::boxed(signer_address));
pages.insert(WEB_PATH.into(), Web::boxed(remote, fetch));
pages.insert("proxy".into(), ProxyPac::boxed(signer_address.clone()));
pages.insert(WEB_PATH.into(), Web::boxed(signer_address.clone(), web_proxy_tokens.clone(), remote.clone(), fetch.clone()));
pages
}

View File

@@ -31,33 +31,41 @@ use hyper::uri::RequestUri;
use hyper::status::StatusCode;
use endpoint::EndpointPath;
use handlers::ContentHandler;
use handlers::{ContentHandler, StreamingHandler};
use page::{LocalPageEndpoint, PageHandlerWaiting};
const FETCH_TIMEOUT: u64 = 30;
pub enum ValidatorResponse {
Local(LocalPageEndpoint),
Streaming(StreamingHandler<fetch::Response>),
}
pub trait ContentValidator: Send + 'static {
type Error: fmt::Debug + fmt::Display;
fn validate_and_install(&self, fetch::Response) -> Result<ValidatorResponse, Self::Error>;
}
enum FetchState {
Waiting,
NotStarted(String),
Error(ContentHandler),
InProgress(mpsc::Receiver<FetchState>),
Streaming(StreamingHandler<fetch::Response>),
Done(LocalPageEndpoint, Box<PageHandlerWaiting>),
}
enum WaitResult {
Error(ContentHandler),
Done(LocalPageEndpoint),
NonAwaitable,
}
pub trait ContentValidator: Send + 'static {
type Error: fmt::Debug + fmt::Display;
fn validate_and_install(&self, fetch::Response) -> Result<LocalPageEndpoint, Self::Error>;
}
#[derive(Clone)]
pub struct FetchControl {
abort: Arc<AtomicBool>,
listeners: Mutex<Vec<(Control, mpsc::Sender<WaitResult>)>>,
listeners: Arc<Mutex<Vec<(Control, mpsc::Sender<WaitResult>)>>>,
deadline: Instant,
}
@@ -65,7 +73,7 @@ impl Default for FetchControl {
fn default() -> Self {
FetchControl {
abort: Arc::new(AtomicBool::new(false)),
listeners: Mutex::new(Vec::new()),
listeners: Arc::new(Mutex::new(Vec::new())),
deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT),
}
}
@@ -88,10 +96,15 @@ impl FetchControl {
match *status {
FetchState::Error(ref handler) => self.notify(|| WaitResult::Error(handler.clone())),
FetchState::Done(ref endpoint, _) => self.notify(|| WaitResult::Done(endpoint.clone())),
FetchState::Streaming(_) => self.notify(|| WaitResult::NonAwaitable),
FetchState::NotStarted(_) | FetchState::InProgress(_) | FetchState::Waiting => {},
}
}
pub fn is_deadline_reached(&self) -> bool {
self.deadline < Instant::now()
}
pub fn abort(&self) {
self.abort.store(true, Ordering::SeqCst);
}
@@ -131,7 +144,7 @@ impl server::Handler<HttpStream> for WaitingHandler {
page_handler.set_uri(&self.uri);
FetchState::Done(endpoint, page_handler)
},
None => {
_ => {
warn!("A result for waiting request was not received.");
FetchState::Waiting
},
@@ -139,6 +152,7 @@ impl server::Handler<HttpStream> for WaitingHandler {
match self.state {
FetchState::Done(_, ref mut handler) => handler.on_request_readable(decoder),
FetchState::Streaming(ref mut handler) => handler.on_request_readable(decoder),
FetchState::Error(ref mut handler) => handler.on_request_readable(decoder),
_ => Next::write(),
}
@@ -147,6 +161,7 @@ impl server::Handler<HttpStream> for WaitingHandler {
fn on_response(&mut self, res: &mut server::Response) -> Next {
match self.state {
FetchState::Done(_, ref mut handler) => handler.on_response(res),
FetchState::Streaming(ref mut handler) => handler.on_response(res),
FetchState::Error(ref mut handler) => handler.on_response(res),
_ => Next::end(),
}
@@ -155,22 +170,69 @@ impl server::Handler<HttpStream> for WaitingHandler {
fn on_response_writable(&mut self, encoder: &mut Encoder<HttpStream>) -> Next {
match self.state {
FetchState::Done(_, ref mut handler) => handler.on_response_writable(encoder),
FetchState::Streaming(ref mut handler) => handler.on_response_writable(encoder),
FetchState::Error(ref mut handler) => handler.on_response_writable(encoder),
_ => Next::end(),
}
}
}
#[derive(Clone)]
struct Errors {
embeddable_on: Option<(String, u16)>,
}
impl Errors {
fn download_error<E: fmt::Debug>(&self, e: E) -> ContentHandler {
ContentHandler::error(
StatusCode::BadGateway,
"Download Error",
"There was an error when fetching the content.",
Some(&format!("{:?}", e)),
self.embeddable_on.clone(),
)
}
fn invalid_content<E: fmt::Debug>(&self, e: E) -> ContentHandler {
ContentHandler::error(
StatusCode::BadGateway,
"Invalid Dapp",
"Downloaded bundle does not contain a valid content.",
Some(&format!("{:?}", e)),
self.embeddable_on.clone(),
)
}
fn timeout_error(&self) -> ContentHandler {
ContentHandler::error(
StatusCode::GatewayTimeout,
"Download Timeout",
&format!("Could not fetch content within {} seconds.", FETCH_TIMEOUT),
None,
self.embeddable_on.clone(),
)
}
fn method_not_allowed(&self) -> ContentHandler {
ContentHandler::error(
StatusCode::MethodNotAllowed,
"Method Not Allowed",
"Only <code>GET</code> requests are allowed.",
None,
self.embeddable_on.clone(),
)
}
}
pub struct ContentFetcherHandler<H: ContentValidator, F: Fetch> {
fetch_control: Arc<FetchControl>,
fetch_control: FetchControl,
control: Control,
remote: Remote,
status: FetchState,
fetch: F,
installer: Option<H>,
path: EndpointPath,
uri: RequestUri,
embeddable_on: Option<(String, u16)>,
errors: Errors,
}
impl<H: ContentValidator, F: Fetch> ContentFetcherHandler<H, F> {
@@ -178,70 +240,64 @@ impl<H: ContentValidator, F: Fetch> ContentFetcherHandler<H, F> {
url: String,
path: EndpointPath,
control: Control,
handler: H,
installer: H,
embeddable_on: Option<(String, u16)>,
remote: Remote,
fetch: F,
) -> (Self, Arc<FetchControl>) {
let fetch_control = Arc::new(FetchControl::default());
let handler = ContentFetcherHandler {
fetch_control: fetch_control.clone(),
) -> Self {
ContentFetcherHandler {
fetch_control: FetchControl::default(),
control: control,
remote: remote,
fetch: fetch,
status: FetchState::NotStarted(url),
installer: Some(handler),
installer: Some(installer),
path: path,
uri: RequestUri::default(),
embeddable_on: embeddable_on,
};
(handler, fetch_control)
errors: Errors {
embeddable_on: embeddable_on,
},
}
}
fn fetch_content(&self, url: &str, installer: H) -> mpsc::Receiver<FetchState> {
pub fn fetch_control(&self) -> FetchControl {
self.fetch_control.clone()
}
fn fetch_content(&self, uri: RequestUri, url: &str, installer: H) -> mpsc::Receiver<FetchState> {
let (tx, rx) = mpsc::channel();
let abort = self.fetch_control.abort.clone();
let control = self.control.clone();
let embeddable_on = self.embeddable_on.clone();
let uri = self.uri.clone();
let path = self.path.clone();
let tx2 = tx.clone();
let control = self.control.clone();
let errors = self.errors.clone();
let future = self.fetch.fetch_with_abort(url, abort.into()).then(move |result| {
trace!(target: "dapps", "Fetching content finished. Starting validation: {:?}", result);
let new_state = match result {
Ok(response) => match installer.validate_and_install(response) {
Ok(endpoint) => {
Ok(ValidatorResponse::Local(endpoint)) => {
trace!(target: "dapps", "Validation OK. Returning response.");
let mut handler = endpoint.to_page_handler(path);
handler.set_uri(&uri);
FetchState::Done(endpoint, handler)
},
Ok(ValidatorResponse::Streaming(handler)) => {
trace!(target: "dapps", "Validation OK. Streaming response.");
FetchState::Streaming(handler)
},
Err(e) => {
trace!(target: "dapps", "Error while validating content: {:?}", e);
FetchState::Error(ContentHandler::error(
StatusCode::BadGateway,
"Invalid Dapp",
"Downloaded bundle does not contain a valid content.",
Some(&format!("{:?}", e)),
embeddable_on,
))
FetchState::Error(errors.invalid_content(e))
},
},
Err(e) => {
warn!(target: "dapps", "Unable to fetch content: {:?}", e);
FetchState::Error(ContentHandler::error(
StatusCode::BadGateway,
"Download Error",
"There was an error when fetching the content.",
Some(&format!("{:?}", e)),
embeddable_on,
))
FetchState::Error(errors.download_error(e))
},
};
// Content may be resolved when the connection is already dropped.
let _ = tx.send(new_state);
let _ = tx2.send(new_state);
// Ignoring control errors
let _ = control.ready(Next::read());
Ok(()) as Result<(), ()>
@@ -250,7 +306,14 @@ impl<H: ContentValidator, F: Fetch> ContentFetcherHandler<H, F> {
// make sure to run within fetch thread pool.
let future = self.fetch.process(future);
// spawn to event loop
self.remote.spawn(future);
let control = self.control.clone();
let errors = self.errors.clone();
self.remote.spawn_with_timeout(|| future, Duration::from_secs(FETCH_TIMEOUT), move || {
// Notify about the timeout
let _ = tx.send(FetchState::Error(errors.timeout_error()));
// Ignoring control errors
let _ = control.ready(Next::read());
});
rx
}
@@ -258,24 +321,19 @@ impl<H: ContentValidator, F: Fetch> ContentFetcherHandler<H, F> {
impl<H: ContentValidator, F: Fetch> server::Handler<HttpStream> for ContentFetcherHandler<H, F> {
fn on_request(&mut self, request: server::Request<HttpStream>) -> Next {
self.uri = request.uri().clone();
let installer = self.installer.take().expect("Installer always set initialy; installer used only in on_request; on_request invoked only once; qed");
let status = if let FetchState::NotStarted(ref url) = self.status {
let uri = request.uri().clone();
let installer = self.installer.take().expect("Installer always set initialy; installer used only in on_request; on_request invoked only once; qed");
Some(match *request.method() {
// Start fetching content
Method::Get => {
trace!(target: "dapps", "Fetching content from: {:?}", url);
let receiver = self.fetch_content(url, installer);
let receiver = self.fetch_content(uri, url, installer);
FetchState::InProgress(receiver)
},
// or return error
_ => FetchState::Error(ContentHandler::error(
StatusCode::MethodNotAllowed,
"Method Not Allowed",
"Only <code>GET</code> requests are allowed.",
None,
self.embeddable_on.clone(),
)),
_ => FetchState::Error(self.errors.method_not_allowed()),
})
} else { None };
@@ -290,16 +348,9 @@ impl<H: ContentValidator, F: Fetch> server::Handler<HttpStream> for ContentFetch
fn on_request_readable(&mut self, decoder: &mut Decoder<HttpStream>) -> Next {
let (status, next) = match self.status {
// Request may time out
FetchState::InProgress(_) if self.fetch_control.deadline < Instant::now() => {
FetchState::InProgress(_) if self.fetch_control.is_deadline_reached() => {
trace!(target: "dapps", "Fetching dapp failed because of timeout.");
let timeout = ContentHandler::error(
StatusCode::GatewayTimeout,
"Download Timeout",
&format!("Could not fetch content within {} seconds.", FETCH_TIMEOUT),
None,
self.embeddable_on.clone(),
);
(Some(FetchState::Error(timeout)), Next::write())
(Some(FetchState::Error(self.errors.timeout_error())), Next::write())
},
FetchState::InProgress(ref receiver) => {
// Check if there is an answer
@@ -326,6 +377,7 @@ impl<H: ContentValidator, F: Fetch> server::Handler<HttpStream> for ContentFetch
fn on_response(&mut self, res: &mut server::Response) -> Next {
match self.status {
FetchState::Done(_, ref mut handler) => handler.on_response(res),
FetchState::Streaming(ref mut handler) => handler.on_response(res),
FetchState::Error(ref mut handler) => handler.on_response(res),
_ => Next::end(),
}
@@ -334,6 +386,7 @@ impl<H: ContentValidator, F: Fetch> server::Handler<HttpStream> for ContentFetch
fn on_response_writable(&mut self, encoder: &mut Encoder<HttpStream>) -> Next {
match self.status {
FetchState::Done(_, ref mut handler) => handler.on_response_writable(encoder),
FetchState::Streaming(ref mut handler) => handler.on_response_writable(encoder),
FetchState::Error(ref mut handler) => handler.on_response_writable(encoder),
_ => Next::end(),
}

View File

@@ -17,16 +17,18 @@
//! Hyper handlers implementations.
mod auth;
mod echo;
mod content;
mod redirect;
mod echo;
mod fetch;
mod redirect;
mod streaming;
pub use self::auth::AuthRequiredHandler;
pub use self::echo::EchoHandler;
pub use self::content::ContentHandler;
pub use self::echo::EchoHandler;
pub use self::fetch::{ContentFetcherHandler, ContentValidator, FetchControl, ValidatorResponse};
pub use self::redirect::Redirection;
pub use self::fetch::{ContentFetcherHandler, ContentValidator, FetchControl};
pub use self::streaming::StreamingHandler;
use url::Url;
use hyper::{server, header, net, uri};
@@ -49,20 +51,30 @@ pub fn add_security_headers(headers: &mut header::Headers, embeddable_on: Option
}
}
/// Extracts URL part from the Request.
pub fn extract_url(req: &server::Request<net::HttpStream>) -> Option<Url> {
match *req.uri() {
convert_uri_to_url(req.uri(), req.headers().get::<header::Host>())
}
/// Extracts URL given URI and Host header.
pub fn convert_uri_to_url(uri: &uri::RequestUri, host: Option<&header::Host>) -> Option<Url> {
match *uri {
uri::RequestUri::AbsoluteUri(ref url) => {
match Url::from_generic_url(url.clone()) {
Ok(url) => Some(url),
_ => None,
}
},
uri::RequestUri::AbsolutePath { ref path, .. } => {
uri::RequestUri::AbsolutePath { ref path, ref query } => {
let query = match *query {
Some(ref query) => format!("?{}", query),
None => "".into(),
};
// Attempt to prepend the Host header (mandatory in HTTP/1.1)
let url_string = match req.headers().get::<header::Host>() {
let url_string = match host {
Some(ref host) => {
format!("http://{}:{}{}", host.hostname, host.port.unwrap_or(80), path)
format!("http://{}:{}{}{}", host.hostname, host.port.unwrap_or(80), path, query)
},
None => return None,
};

View File

@@ -0,0 +1,101 @@
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Content Stream Response
use std::io::{self, Read};
use hyper::{header, server, Decoder, Encoder, Next};
use hyper::net::HttpStream;
use hyper::mime::Mime;
use hyper::status::StatusCode;
use handlers::add_security_headers;
const BUFFER_SIZE: usize = 1024;
pub struct StreamingHandler<R: io::Read> {
buffer: [u8; BUFFER_SIZE],
buffer_leftover: usize,
status: StatusCode,
content: io::BufReader<R>,
mimetype: Mime,
safe_to_embed_on: Option<(String, u16)>,
}
impl<R: io::Read> StreamingHandler<R> {
pub fn new(content: R, status: StatusCode, mimetype: Mime, embeddable_on: Option<(String, u16)>) -> Self {
StreamingHandler {
buffer: [0; BUFFER_SIZE],
buffer_leftover: 0,
status: status,
content: io::BufReader::new(content),
mimetype: mimetype,
safe_to_embed_on: embeddable_on,
}
}
pub fn set_initial_content(&mut self, content: &str) {
assert_eq!(self.buffer_leftover, 0);
let bytes = content.as_bytes();
self.buffer_leftover = bytes.len();
self.buffer[0..self.buffer_leftover].copy_from_slice(bytes);
}
}
impl<R: io::Read> server::Handler<HttpStream> for StreamingHandler<R> {
fn on_request(&mut self, _request: server::Request<HttpStream>) -> Next {
Next::write()
}
fn on_request_readable(&mut self, _decoder: &mut Decoder<HttpStream>) -> Next {
Next::write()
}
fn on_response(&mut self, res: &mut server::Response) -> Next {
res.set_status(self.status);
res.headers_mut().set(header::ContentType(self.mimetype.clone()));
add_security_headers(&mut res.headers_mut(), self.safe_to_embed_on.clone());
Next::write()
}
fn on_response_writable(&mut self, encoder: &mut Encoder<HttpStream>) -> Next {
fn handle_error(e: io::Error) -> Next {
match e.kind() {
::std::io::ErrorKind::WouldBlock => Next::write(),
_ => Next::end(),
}
}
let write_pos = self.buffer_leftover;
match self.content.read(&mut self.buffer[write_pos..]) {
Err(e) => handle_error(e),
Ok(read) => match encoder.write(&self.buffer[..write_pos + read]) {
Err(e) => handle_error(e),
Ok(0) => Next::end(),
Ok(wrote) => {
self.buffer_leftover = write_pos + read - wrote;
if self.buffer_leftover > 0 {
for i in self.buffer_leftover..write_pos + read {
self.buffer.swap(i, i - self.buffer_leftover);
}
}
Next::write()
},
},
}
}
}

View File

@@ -111,12 +111,23 @@ impl<F> SyncStatus for F where F: Fn() -> bool + Send + Sync {
fn is_major_importing(&self) -> bool { self() }
}
/// Validates Web Proxy tokens
pub trait WebProxyTokens: Send + Sync {
/// Should return true if token is a valid web proxy access token.
fn is_web_proxy_token_valid(&self, token: &String) -> bool;
}
impl<F> WebProxyTokens for F where F: Fn(String) -> bool + Send + Sync {
fn is_web_proxy_token_valid(&self, token: &String) -> bool { self(token.to_owned()) }
}
/// Webapps HTTP+RPC server build.
pub struct ServerBuilder {
dapps_path: String,
handler: Arc<IoHandler>,
registrar: Arc<ContractClient>,
sync_status: Arc<SyncStatus>,
web_proxy_tokens: Arc<WebProxyTokens>,
signer_address: Option<(String, u16)>,
remote: Remote,
fetch: Option<FetchClient>,
@@ -136,6 +147,7 @@ impl ServerBuilder {
handler: Arc::new(IoHandler::new()),
registrar: registrar,
sync_status: Arc::new(|| false),
web_proxy_tokens: Arc::new(|_| false),
signer_address: None,
remote: remote,
fetch: None,
@@ -152,6 +164,11 @@ impl ServerBuilder {
self.sync_status = status;
}
/// Change default web proxy tokens validator.
pub fn with_web_proxy_tokens(&mut self, tokens: Arc<WebProxyTokens>) {
self.web_proxy_tokens = tokens;
}
/// Change default signer port.
pub fn with_signer_address(&mut self, signer_address: Option<(String, u16)>) {
self.signer_address = signer_address;
@@ -169,6 +186,7 @@ impl ServerBuilder {
self.signer_address.clone(),
self.registrar.clone(),
self.sync_status.clone(),
self.web_proxy_tokens.clone(),
self.remote.clone(),
try!(self.fetch()),
)
@@ -186,6 +204,7 @@ impl ServerBuilder {
self.signer_address.clone(),
self.registrar.clone(),
self.sync_status.clone(),
self.web_proxy_tokens.clone(),
self.remote.clone(),
try!(self.fetch()),
)
@@ -241,6 +260,7 @@ impl Server {
signer_address: Option<(String, u16)>,
registrar: Arc<ContractClient>,
sync_status: Arc<SyncStatus>,
web_proxy_tokens: Arc<WebProxyTokens>,
remote: Remote,
fetch: F,
) -> Result<Server, ServerError> {
@@ -253,7 +273,7 @@ impl Server {
remote.clone(),
fetch.clone(),
));
let endpoints = Arc::new(apps::all_endpoints(dapps_path, signer_address.clone(), remote.clone(), fetch.clone()));
let endpoints = Arc::new(apps::all_endpoints(dapps_path, signer_address.clone(), web_proxy_tokens, remote.clone(), fetch.clone()));
let cors_domains = Self::cors_domains(signer_address.clone());
let special = Arc::new({

View File

@@ -21,15 +21,16 @@ pub mod auth;
mod host_validation;
use address;
use std::cmp;
use std::sync::Arc;
use std::collections::HashMap;
use url::{Url, Host};
use hyper::{self, server, Next, Encoder, Decoder, Control, StatusCode};
use hyper::{self, server, header, Next, Encoder, Decoder, Control, StatusCode};
use hyper::net::HttpStream;
use apps::{self, DAPPS_DOMAIN};
use apps::fetcher::Fetcher;
use endpoint::{Endpoint, Endpoints, EndpointPath};
use handlers::{Redirection, extract_url, ContentHandler};
use handlers::{self, Redirection, ContentHandler};
use self::auth::{Authorization, Authorized};
/// Special endpoints are accessible on every domain (every dapp)
@@ -57,9 +58,11 @@ impl<A: Authorization + 'static> server::Handler<HttpStream> for Router<A> {
fn on_request(&mut self, req: server::Request<HttpStream>) -> Next {
// Choose proper handler depending on path / domain
let url = extract_url(&req);
let url = handlers::extract_url(&req);
let endpoint = extract_endpoint(&url);
let referer = extract_referer_endpoint(&req);
let is_utils = endpoint.1 == SpecialEndpoint::Utils;
let is_get_request = *req.method() == hyper::Method::Get;
trace!(target: "dapps", "Routing request to {:?}. Details: {:?}", url, req);
@@ -83,25 +86,42 @@ impl<A: Authorization + 'static> server::Handler<HttpStream> for Router<A> {
return self.handler.on_request(req);
}
let control = self.control.take().expect("on_request is called only once; control is always defined at start; qed");
debug!(target: "dapps", "Handling endpoint request: {:?}", endpoint);
self.handler = match endpoint {
self.handler = match (endpoint.0, endpoint.1, referer) {
// Handle invalid web requests that we can recover from
(ref path, SpecialEndpoint::None, Some((ref referer, ref referer_url)))
if is_get_request
&& referer.app_id == apps::WEB_PATH
&& self.endpoints.contains_key(apps::WEB_PATH)
&& !is_web_endpoint(path)
=>
{
trace!(target: "dapps", "Redirecting to correct web request: {:?}", referer_url);
// TODO [ToDr] Some nice util for this!
let using_domain = if referer.using_dapps_domains { 0 } else { 1 };
let len = cmp::min(referer_url.path.len(), using_domain + 3); // token + protocol + hostname
let base = referer_url.path[..len].join("/");
let requested = url.map(|u| u.path.join("/")).unwrap_or_default();
Redirection::boxed(&format!("/{}/{}", base, requested))
},
// First check special endpoints
(ref path, ref endpoint) if self.special.contains_key(endpoint) => {
(ref path, ref endpoint, _) if self.special.contains_key(endpoint) => {
trace!(target: "dapps", "Resolving to special endpoint.");
self.special.get(endpoint)
.expect("special known to contain key; qed")
.to_async_handler(path.clone().unwrap_or_default(), control)
},
// Then delegate to dapp
(Some(ref path), _) if self.endpoints.contains_key(&path.app_id) => {
(Some(ref path), _, _) if self.endpoints.contains_key(&path.app_id) => {
trace!(target: "dapps", "Resolving to local/builtin dapp.");
self.endpoints.get(&path.app_id)
.expect("special known to contain key; qed")
.expect("endpoints known to contain key; qed")
.to_async_handler(path.clone(), control)
},
// Try to resolve and fetch the dapp
(Some(ref path), _) if self.fetch.contains(&path.app_id) => {
(Some(ref path), _, _) if self.fetch.contains(&path.app_id) => {
trace!(target: "dapps", "Resolving to fetchable content.");
self.fetch.to_async_handler(path.clone(), control)
},
@@ -110,7 +130,7 @@ impl<A: Authorization + 'static> server::Handler<HttpStream> for Router<A> {
// It should be safe to remove it in (near) future.
//
// 404 for non-existent content
(Some(ref path), _) if *req.method() == hyper::Method::Get && path.app_id != "home" => {
(Some(ref path), _, _) if is_get_request && path.app_id != "home" => {
trace!(target: "dapps", "Resolving to 404.");
Box::new(ContentHandler::error(
StatusCode::NotFound,
@@ -121,7 +141,7 @@ impl<A: Authorization + 'static> server::Handler<HttpStream> for Router<A> {
))
},
// Redirect any other GET request to signer.
_ if *req.method() == hyper::Method::Get => {
_ if is_get_request => {
if let Some(signer_address) = self.signer_address.clone() {
trace!(target: "dapps", "Redirecting to signer interface.");
Redirection::boxed(&format!("http://{}", address(signer_address)))
@@ -192,6 +212,23 @@ impl<A: Authorization> Router<A> {
}
}
fn is_web_endpoint(path: &Option<EndpointPath>) -> bool {
match *path {
Some(ref path) if path.app_id == apps::WEB_PATH => true,
_ => false,
}
}
fn extract_referer_endpoint(req: &server::Request<HttpStream>) -> Option<(EndpointPath, Url)> {
let referer = req.headers().get::<header::Referer>();
let url = referer.and_then(|referer| Url::parse(&referer.0).ok());
url.and_then(|url| {
let option = Some(url);
extract_endpoint(&option).0.map(|endpoint| (endpoint, option.expect("Just wrapped; qed")))
})
}
fn extract_endpoint(url: &Option<Url>) -> (Option<EndpointPath>, SpecialEndpoint) {
fn special_endpoint(url: &Url) -> SpecialEndpoint {
if url.path.len() <= 1 {

View File

@@ -37,6 +37,9 @@ pub struct Url {
/// Empty entries of `""` correspond to trailing slashes.
pub path: Vec<String>,
/// The URL query.
pub query: Option<String>,
/// The URL username field, from the userinfo section of the URL.
///
/// `None` if the `@` character was not part of the input OR
@@ -86,11 +89,13 @@ impl Url {
let host = try!(raw_url.host().ok_or_else(|| "Valid host, because only data:, mailto: protocols does not have host.".to_owned())).to_owned();
let path = try!(raw_url.path_segments().ok_or_else(|| "Valid path segments. In HTTP we won't get cannot-be-a-base URLs".to_owned()))
.map(|part| part.to_owned()).collect();
let query = raw_url.query().map(|x| x.to_owned());
Ok(Url {
port: port,
host: host,
path: path,
query: query,
raw: raw_url,
username: username,
password: password,

View File

@@ -16,24 +16,36 @@
//! Serving web-based content (proxying)
use endpoint::{Endpoint, Handler, EndpointPath};
use handlers::{ContentFetcherHandler, ContentHandler, ContentValidator, Redirection, extract_url};
use page::{LocalPageEndpoint};
use std::sync::Arc;
use fetch::{self, Fetch};
use url::Url;
use parity_reactor::Remote;
use hyper::{self, server, net, Next, Encoder, Decoder};
use hyper::status::StatusCode;
use parity_reactor::Remote;
use apps::WEB_PATH;
use apps;
use endpoint::{Endpoint, Handler, EndpointPath};
use handlers::{
ContentFetcherHandler, ContentHandler, ContentValidator, ValidatorResponse,
StreamingHandler, Redirection, extract_url,
};
use url::Url;
use WebProxyTokens;
pub type Embeddable = Option<(String, u16)>;
pub struct Web<F> {
embeddable_on: Embeddable,
web_proxy_tokens: Arc<WebProxyTokens>,
remote: Remote,
fetch: F,
}
impl<F: Fetch> Web<F> {
pub fn boxed(remote: Remote, fetch: F) -> Box<Endpoint> {
pub fn boxed(embeddable_on: Embeddable, web_proxy_tokens: Arc<WebProxyTokens>, remote: Remote, fetch: F) -> Box<Endpoint> {
Box::new(Web {
embeddable_on: embeddable_on,
web_proxy_tokens: web_proxy_tokens,
remote: remote,
fetch: fetch,
})
@@ -48,20 +60,33 @@ impl<F: Fetch> Endpoint for Web<F> {
path: path,
remote: self.remote.clone(),
fetch: self.fetch.clone(),
web_proxy_tokens: self.web_proxy_tokens.clone(),
embeddable_on: self.embeddable_on.clone(),
})
}
}
pub struct WebInstaller;
struct WebInstaller {
embeddable_on: Embeddable,
}
impl ContentValidator for WebInstaller {
type Error = String;
fn validate_and_install(&self, _response: fetch::Response) -> Result<LocalPageEndpoint, String> {
// let path = unimplemented!();
// let mime = response.content_type().unwrap_or(mime!(Text/Html));
// Ok(LocalPageEndpoint::single_file(path, mime, PageCache::Enabled))
Err("unimplemented".into())
fn validate_and_install(&self, response: fetch::Response) -> Result<ValidatorResponse, String> {
let status = StatusCode::from_u16(response.status().to_u16());
let is_html = response.is_html();
let mime = response.content_type().unwrap_or(mime!(Text/Html));
let mut handler = StreamingHandler::new(
response,
status,
mime,
self.embeddable_on.clone(),
);
if is_html {
handler.set_initial_content(&format!(r#"<script src="/{}/inject.js"></script>"#, apps::UTILS_PATH));
}
Ok(ValidatorResponse::Streaming(handler))
}
}
@@ -78,46 +103,61 @@ struct WebHandler<F: Fetch> {
path: EndpointPath,
remote: Remote,
fetch: F,
web_proxy_tokens: Arc<WebProxyTokens>,
embeddable_on: Embeddable,
}
impl<F: Fetch> WebHandler<F> {
fn extract_target_url(url: Option<Url>) -> Result<String, State<F>> {
let path = match url {
Some(url) => url.path,
fn extract_target_url(&self, url: Option<Url>) -> Result<String, State<F>> {
let (path, query) = match url {
Some(url) => (url.path, url.query),
None => {
return Err(State::Error(
ContentHandler::error(StatusCode::BadRequest, "Invalid URL", "Couldn't parse URL", None, None)
));
return Err(State::Error(ContentHandler::error(
StatusCode::BadRequest, "Invalid URL", "Couldn't parse URL", None, self.embeddable_on.clone()
)));
}
};
// TODO [ToDr] Check if token supplied in URL is correct.
// Support domain based routing.
let idx = match path.get(0).map(|m| m.as_ref()) {
Some(WEB_PATH) => 1,
Some(apps::WEB_PATH) => 1,
_ => 0,
};
// Check if token supplied in URL is correct.
match path.get(idx) {
Some(ref token) if self.web_proxy_tokens.is_web_proxy_token_valid(token) => {},
_ => {
return Err(State::Error(ContentHandler::error(
StatusCode::BadRequest, "Invalid Access Token", "Invalid or old web proxy access token supplied.", Some("Try refreshing the page."), self.embeddable_on.clone()
)));
}
}
// Validate protocol
let protocol = match path.get(idx).map(|a| a.as_str()) {
let protocol = match path.get(idx + 1).map(|a| a.as_str()) {
Some("http") => "http",
Some("https") => "https",
_ => {
return Err(State::Error(
ContentHandler::error(StatusCode::BadRequest, "Invalid Protocol", "Invalid protocol used", None, None)
));
return Err(State::Error(ContentHandler::error(
StatusCode::BadRequest, "Invalid Protocol", "Invalid protocol used.", None, self.embeddable_on.clone()
)));
}
};
// Redirect if address to main page does not end with /
if let None = path.get(idx + 2) {
if let None = path.get(idx + 3) {
return Err(State::Redirecting(
Redirection::new(&format!("/{}/", path.join("/")))
));
}
Ok(format!("{}://{}", protocol, path[2..].join("/")))
let query = match query {
Some(query) => format!("?{}", query),
None => "".into(),
};
Ok(format!("{}://{}{}", protocol, path[idx + 2..].join("/"), query))
}
}
@@ -126,7 +166,7 @@ impl<F: Fetch> server::Handler<net::HttpStream> for WebHandler<F> {
let url = extract_url(&request);
// First extract the URL (reject invalid URLs)
let target_url = match Self::extract_target_url(url) {
let target_url = match self.extract_target_url(url) {
Ok(url) => url,
Err(error) => {
self.state = error;
@@ -134,12 +174,14 @@ impl<F: Fetch> server::Handler<net::HttpStream> for WebHandler<F> {
}
};
let (mut handler, _control) = ContentFetcherHandler::new(
let mut handler = ContentFetcherHandler::new(
target_url,
self.path.clone(),
self.control.clone(),
WebInstaller,
None,
WebInstaller {
embeddable_on: self.embeddable_on.clone(),
},
self.embeddable_on.clone(),
self.remote.clone(),
self.fetch.clone(),
);