Merge remote-tracking branch 'parity/master' into auth-round
This commit is contained in:
commit
099468107e
@ -18,13 +18,13 @@
|
||||
|
||||
use std::fs;
|
||||
use std::sync::{Arc};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
use page::LocalPageEndpoint;
|
||||
use handlers::FetchControl;
|
||||
|
||||
pub enum ContentStatus {
|
||||
Fetching(Arc<AtomicBool>),
|
||||
Fetching(Arc<FetchControl>),
|
||||
Ready(LocalPageEndpoint),
|
||||
}
|
||||
|
||||
@ -57,10 +57,10 @@ impl ContentCache {
|
||||
while len > expected_size {
|
||||
let entry = self.cache.pop_front().unwrap();
|
||||
match entry.1 {
|
||||
ContentStatus::Fetching(ref abort) => {
|
||||
ContentStatus::Fetching(ref fetch) => {
|
||||
trace!(target: "dapps", "Aborting {} because of limit.", entry.0);
|
||||
// Mark as aborted
|
||||
abort.store(true, Ordering::SeqCst);
|
||||
fetch.abort()
|
||||
},
|
||||
ContentStatus::Ready(ref endpoint) => {
|
||||
trace!(target: "dapps", "Removing {} because of limit.", entry.0);
|
||||
|
@ -23,7 +23,6 @@ use std::{fs, env, fmt};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool};
|
||||
use rustc_serialize::hex::FromHex;
|
||||
|
||||
use hyper;
|
||||
@ -76,10 +75,12 @@ impl<R: URLHint> ContentFetcher<R> {
|
||||
}
|
||||
|
||||
pub fn contains(&self, content_id: &str) -> bool {
|
||||
let mut cache = self.cache.lock();
|
||||
// Check if we already have the app
|
||||
if cache.get(content_id).is_some() {
|
||||
return true;
|
||||
{
|
||||
let mut cache = self.cache.lock();
|
||||
// Check if we already have the app
|
||||
if cache.get(content_id).is_some() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// fallback to resolver
|
||||
if let Ok(content_id) = content_id.from_hex() {
|
||||
@ -115,49 +116,60 @@ impl<R: URLHint> ContentFetcher<R> {
|
||||
(None, endpoint.to_async_handler(path, control))
|
||||
},
|
||||
// App is already being fetched
|
||||
Some(&mut ContentStatus::Fetching(_)) => {
|
||||
(None, Box::new(ContentHandler::error_with_refresh(
|
||||
StatusCode::ServiceUnavailable,
|
||||
"Download In Progress",
|
||||
"This dapp is already being downloaded. Please wait...",
|
||||
None,
|
||||
)) as Box<Handler>)
|
||||
Some(&mut ContentStatus::Fetching(ref fetch_control)) => {
|
||||
trace!(target: "dapps", "Content fetching in progress. Waiting...");
|
||||
(None, fetch_control.to_handler(control))
|
||||
},
|
||||
// We need to start fetching app
|
||||
None => {
|
||||
trace!(target: "dapps", "Content unavailable. Fetching...");
|
||||
let content_hex = content_id.from_hex().expect("to_handler is called only when `contains` returns true.");
|
||||
let content = self.resolver.resolve(content_hex);
|
||||
let abort = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let cache = self.cache.clone();
|
||||
let on_done = move |id: String, result: Option<LocalPageEndpoint>| {
|
||||
let mut cache = cache.lock();
|
||||
match result {
|
||||
Some(endpoint) => {
|
||||
cache.insert(id, ContentStatus::Ready(endpoint));
|
||||
},
|
||||
// In case of error
|
||||
None => {
|
||||
cache.remove(&id);
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
match content {
|
||||
Some(URLHintResult::Dapp(dapp)) => (
|
||||
Some(ContentStatus::Fetching(abort.clone())),
|
||||
Box::new(ContentFetcherHandler::new(
|
||||
Some(URLHintResult::Dapp(dapp)) => {
|
||||
let (handler, fetch_control) = ContentFetcherHandler::new(
|
||||
dapp.url(),
|
||||
abort,
|
||||
control,
|
||||
path.using_dapps_domains,
|
||||
DappInstaller {
|
||||
id: content_id.clone(),
|
||||
dapps_path: self.dapps_path.clone(),
|
||||
cache: self.cache.clone(),
|
||||
})) as Box<Handler>
|
||||
),
|
||||
Some(URLHintResult::Content(content)) => (
|
||||
Some(ContentStatus::Fetching(abort.clone())),
|
||||
Box::new(ContentFetcherHandler::new(
|
||||
on_done: Box::new(on_done),
|
||||
}
|
||||
);
|
||||
|
||||
(Some(ContentStatus::Fetching(fetch_control)), Box::new(handler) as Box<Handler>)
|
||||
},
|
||||
Some(URLHintResult::Content(content)) => {
|
||||
let (handler, fetch_control) = ContentFetcherHandler::new(
|
||||
content.url,
|
||||
abort,
|
||||
control,
|
||||
path.using_dapps_domains,
|
||||
ContentInstaller {
|
||||
id: content_id.clone(),
|
||||
mime: content.mime,
|
||||
content_path: self.dapps_path.clone(),
|
||||
cache: self.cache.clone(),
|
||||
on_done: Box::new(on_done),
|
||||
}
|
||||
)) as Box<Handler>,
|
||||
),
|
||||
);
|
||||
|
||||
(Some(ContentStatus::Fetching(fetch_control)), Box::new(handler) as Box<Handler>)
|
||||
},
|
||||
None => {
|
||||
// This may happen when sync status changes in between
|
||||
// `contains` and `to_handler`
|
||||
@ -225,14 +237,13 @@ struct ContentInstaller {
|
||||
id: String,
|
||||
mime: String,
|
||||
content_path: PathBuf,
|
||||
cache: Arc<Mutex<ContentCache>>,
|
||||
on_done: Box<Fn(String, Option<LocalPageEndpoint>) + Send>,
|
||||
}
|
||||
|
||||
impl ContentValidator for ContentInstaller {
|
||||
type Error = ValidationError;
|
||||
type Result = PathBuf;
|
||||
|
||||
fn validate_and_install(&self, path: PathBuf) -> Result<(String, PathBuf), ValidationError> {
|
||||
fn validate_and_install(&self, path: PathBuf) -> Result<(String, LocalPageEndpoint), ValidationError> {
|
||||
// Create dir
|
||||
try!(fs::create_dir_all(&self.content_path));
|
||||
|
||||
@ -247,21 +258,11 @@ impl ContentValidator for ContentInstaller {
|
||||
|
||||
try!(fs::copy(&path, &content_path));
|
||||
|
||||
Ok((self.id.clone(), content_path))
|
||||
Ok((self.id.clone(), LocalPageEndpoint::single_file(content_path, self.mime.clone())))
|
||||
}
|
||||
|
||||
fn done(&self, result: Option<&PathBuf>) {
|
||||
let mut cache = self.cache.lock();
|
||||
match result {
|
||||
Some(result) => {
|
||||
let page = LocalPageEndpoint::single_file(result.clone(), self.mime.clone());
|
||||
cache.insert(self.id.clone(), ContentStatus::Ready(page));
|
||||
},
|
||||
// In case of error
|
||||
None => {
|
||||
cache.remove(&self.id);
|
||||
},
|
||||
}
|
||||
fn done(&self, endpoint: Option<LocalPageEndpoint>) {
|
||||
(self.on_done)(self.id.clone(), endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
@ -269,7 +270,7 @@ impl ContentValidator for ContentInstaller {
|
||||
struct DappInstaller {
|
||||
id: String,
|
||||
dapps_path: PathBuf,
|
||||
cache: Arc<Mutex<ContentCache>>,
|
||||
on_done: Box<Fn(String, Option<LocalPageEndpoint>) + Send>,
|
||||
}
|
||||
|
||||
impl DappInstaller {
|
||||
@ -306,9 +307,8 @@ impl DappInstaller {
|
||||
|
||||
impl ContentValidator for DappInstaller {
|
||||
type Error = ValidationError;
|
||||
type Result = Manifest;
|
||||
|
||||
fn validate_and_install(&self, app_path: PathBuf) -> Result<(String, Manifest), ValidationError> {
|
||||
fn validate_and_install(&self, app_path: PathBuf) -> Result<(String, LocalPageEndpoint), ValidationError> {
|
||||
trace!(target: "dapps", "Opening dapp bundle at {:?}", app_path);
|
||||
let mut file_reader = io::BufReader::new(try!(fs::File::open(app_path)));
|
||||
let hash = try!(sha3(&mut file_reader));
|
||||
@ -362,23 +362,15 @@ impl ContentValidator for DappInstaller {
|
||||
let mut manifest_file = try!(fs::File::create(manifest_path));
|
||||
try!(manifest_file.write_all(manifest_str.as_bytes()));
|
||||
|
||||
// Create endpoint
|
||||
let app = LocalPageEndpoint::new(target, manifest.clone().into());
|
||||
|
||||
// Return modified app manifest
|
||||
Ok((manifest.id.clone(), manifest))
|
||||
Ok((manifest.id.clone(), app))
|
||||
}
|
||||
|
||||
fn done(&self, manifest: Option<&Manifest>) {
|
||||
let mut cache = self.cache.lock();
|
||||
match manifest {
|
||||
Some(manifest) => {
|
||||
let path = self.dapp_target_path(manifest);
|
||||
let app = LocalPageEndpoint::new(path, manifest.clone().into());
|
||||
cache.insert(self.id.clone(), ContentStatus::Ready(app));
|
||||
},
|
||||
// In case of error
|
||||
None => {
|
||||
cache.remove(&self.id);
|
||||
},
|
||||
}
|
||||
fn done(&self, endpoint: Option<LocalPageEndpoint>) {
|
||||
(self.on_done)(self.id.clone(), endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,6 @@
|
||||
<head>
|
||||
<meta charset="utf-8">
|
||||
<meta name="viewport" content="width=device-width">
|
||||
{meta}
|
||||
<title>{title}</title>
|
||||
<link rel="stylesheet" href="/parity-utils/styles.css">
|
||||
</head>
|
||||
|
@ -23,6 +23,7 @@ use hyper::status::StatusCode;
|
||||
|
||||
use util::version;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ContentHandler {
|
||||
code: StatusCode,
|
||||
content: String,
|
||||
@ -57,18 +58,6 @@ impl ContentHandler {
|
||||
Self::html(code, format!(
|
||||
include_str!("../error_tpl.html"),
|
||||
title=title,
|
||||
meta="",
|
||||
message=message,
|
||||
details=details.unwrap_or_else(|| ""),
|
||||
version=version(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn error_with_refresh(code: StatusCode, title: &str, message: &str, details: Option<&str>) -> Self {
|
||||
Self::html(code, format!(
|
||||
include_str!("../error_tpl.html"),
|
||||
title=title,
|
||||
meta="<meta http-equiv=\"refresh\" content=\"1\">",
|
||||
message=message,
|
||||
details=details.unwrap_or_else(|| ""),
|
||||
version=version(),
|
||||
|
@ -19,41 +19,122 @@
|
||||
use std::{fs, fmt};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{mpsc, Arc};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::{Instant, Duration};
|
||||
use util::Mutex;
|
||||
|
||||
use hyper::{header, server, Decoder, Encoder, Next, Method, Control};
|
||||
use hyper::{server, Decoder, Encoder, Next, Method, Control};
|
||||
use hyper::net::HttpStream;
|
||||
use hyper::status::StatusCode;
|
||||
|
||||
use handlers::ContentHandler;
|
||||
use handlers::{ContentHandler, Redirection};
|
||||
use handlers::client::{Client, FetchResult};
|
||||
use apps::redirection_address;
|
||||
use page::LocalPageEndpoint;
|
||||
|
||||
const FETCH_TIMEOUT: u64 = 30;
|
||||
|
||||
enum FetchState<T: fmt::Debug> {
|
||||
enum FetchState {
|
||||
NotStarted(String),
|
||||
Error(ContentHandler),
|
||||
InProgress {
|
||||
deadline: Instant,
|
||||
receiver: mpsc::Receiver<FetchResult>,
|
||||
},
|
||||
Done((String, T)),
|
||||
InProgress(mpsc::Receiver<FetchResult>),
|
||||
Done(String, LocalPageEndpoint, Redirection),
|
||||
}
|
||||
|
||||
pub trait ContentValidator {
|
||||
type Error: fmt::Debug + fmt::Display;
|
||||
type Result: fmt::Debug;
|
||||
|
||||
fn validate_and_install(&self, app: PathBuf) -> Result<(String, Self::Result), Self::Error>;
|
||||
fn done(&self, Option<&Self::Result>);
|
||||
fn validate_and_install(&self, app: PathBuf) -> Result<(String, LocalPageEndpoint), Self::Error>;
|
||||
fn done(&self, Option<LocalPageEndpoint>);
|
||||
}
|
||||
|
||||
pub struct FetchControl {
|
||||
abort: Arc<AtomicBool>,
|
||||
listeners: Mutex<Vec<(Control, mpsc::Sender<FetchState>)>>,
|
||||
deadline: Instant,
|
||||
}
|
||||
|
||||
impl Default for FetchControl {
|
||||
fn default() -> Self {
|
||||
FetchControl {
|
||||
abort: Arc::new(AtomicBool::new(false)),
|
||||
listeners: Mutex::new(Vec::new()),
|
||||
deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FetchControl {
|
||||
fn notify<F: Fn() -> FetchState>(&self, status: F) {
|
||||
let mut listeners = self.listeners.lock();
|
||||
for (control, sender) in listeners.drain(..) {
|
||||
if let Err(e) = sender.send(status()) {
|
||||
trace!(target: "dapps", "Waiting listener notification failed: {:?}", e);
|
||||
} else {
|
||||
let _ = control.ready(Next::read());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn set_status(&self, status: &FetchState) {
|
||||
match *status {
|
||||
FetchState::Error(ref handler) => self.notify(|| FetchState::Error(handler.clone())),
|
||||
FetchState::Done(ref id, ref endpoint, ref handler) => self.notify(|| FetchState::Done(id.clone(), endpoint.clone(), handler.clone())),
|
||||
FetchState::NotStarted(_) | FetchState::InProgress(_) => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn abort(&self) {
|
||||
self.abort.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn to_handler(&self, control: Control) -> Box<server::Handler<HttpStream> + Send> {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
self.listeners.lock().push((control, tx));
|
||||
|
||||
Box::new(WaitingHandler {
|
||||
receiver: rx,
|
||||
state: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WaitingHandler {
|
||||
receiver: mpsc::Receiver<FetchState>,
|
||||
state: Option<FetchState>,
|
||||
}
|
||||
|
||||
impl server::Handler<HttpStream> for WaitingHandler {
|
||||
fn on_request(&mut self, _request: server::Request<HttpStream>) -> Next {
|
||||
Next::wait()
|
||||
}
|
||||
|
||||
fn on_request_readable(&mut self, _decoder: &mut Decoder<HttpStream>) -> Next {
|
||||
self.state = self.receiver.try_recv().ok();
|
||||
Next::write()
|
||||
}
|
||||
|
||||
fn on_response(&mut self, res: &mut server::Response) -> Next {
|
||||
match self.state {
|
||||
Some(FetchState::Done(_, _, ref mut handler)) => handler.on_response(res),
|
||||
Some(FetchState::Error(ref mut handler)) => handler.on_response(res),
|
||||
_ => Next::end(),
|
||||
}
|
||||
}
|
||||
|
||||
fn on_response_writable(&mut self, encoder: &mut Encoder<HttpStream>) -> Next {
|
||||
match self.state {
|
||||
Some(FetchState::Done(_, _, ref mut handler)) => handler.on_response_writable(encoder),
|
||||
Some(FetchState::Error(ref mut handler)) => handler.on_response_writable(encoder),
|
||||
_ => Next::end(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ContentFetcherHandler<H: ContentValidator> {
|
||||
abort: Arc<AtomicBool>,
|
||||
fetch_control: Arc<FetchControl>,
|
||||
control: Option<Control>,
|
||||
status: FetchState<H::Result>,
|
||||
status: FetchState,
|
||||
client: Option<Client>,
|
||||
using_dapps_domains: bool,
|
||||
installer: H,
|
||||
@ -62,7 +143,7 @@ pub struct ContentFetcherHandler<H: ContentValidator> {
|
||||
impl<H: ContentValidator> Drop for ContentFetcherHandler<H> {
|
||||
fn drop(&mut self) {
|
||||
let result = match self.status {
|
||||
FetchState::Done((_, ref result)) => Some(result),
|
||||
FetchState::Done(_, ref result, _) => Some(result.clone()),
|
||||
_ => None,
|
||||
};
|
||||
self.installer.done(result);
|
||||
@ -73,20 +154,22 @@ impl<H: ContentValidator> ContentFetcherHandler<H> {
|
||||
|
||||
pub fn new(
|
||||
url: String,
|
||||
abort: Arc<AtomicBool>,
|
||||
control: Control,
|
||||
using_dapps_domains: bool,
|
||||
handler: H) -> Self {
|
||||
handler: H) -> (Self, Arc<FetchControl>) {
|
||||
|
||||
let fetch_control = Arc::new(FetchControl::default());
|
||||
let client = Client::new();
|
||||
ContentFetcherHandler {
|
||||
abort: abort,
|
||||
let handler = ContentFetcherHandler {
|
||||
fetch_control: fetch_control.clone(),
|
||||
control: Some(control),
|
||||
client: Some(client),
|
||||
status: FetchState::NotStarted(url),
|
||||
using_dapps_domains: using_dapps_domains,
|
||||
installer: handler,
|
||||
}
|
||||
};
|
||||
|
||||
(handler, fetch_control)
|
||||
}
|
||||
|
||||
fn close_client(client: &mut Option<Client>) {
|
||||
@ -95,7 +178,6 @@ impl<H: ContentValidator> ContentFetcherHandler<H> {
|
||||
.close();
|
||||
}
|
||||
|
||||
|
||||
fn fetch_content(client: &mut Client, url: &str, abort: Arc<AtomicBool>, control: Control) -> Result<mpsc::Receiver<FetchResult>, String> {
|
||||
client.request(url, abort, Box::new(move || {
|
||||
trace!(target: "dapps", "Fetching finished.");
|
||||
@ -114,12 +196,9 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
trace!(target: "dapps", "Fetching content from: {:?}", url);
|
||||
let control = self.control.take().expect("on_request is called only once, thus control is always Some");
|
||||
let client = self.client.as_mut().expect("on_request is called before client is closed.");
|
||||
let fetch = Self::fetch_content(client, url, self.abort.clone(), control);
|
||||
let fetch = Self::fetch_content(client, url, self.fetch_control.abort.clone(), control);
|
||||
match fetch {
|
||||
Ok(receiver) => FetchState::InProgress {
|
||||
deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT),
|
||||
receiver: receiver,
|
||||
},
|
||||
Ok(receiver) => FetchState::InProgress(receiver),
|
||||
Err(e) => FetchState::Error(ContentHandler::error(
|
||||
StatusCode::BadGateway,
|
||||
"Unable To Start Dapp Download",
|
||||
@ -139,6 +218,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
} else { None };
|
||||
|
||||
if let Some(status) = status {
|
||||
self.fetch_control.set_status(&status);
|
||||
self.status = status;
|
||||
}
|
||||
|
||||
@ -148,7 +228,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
fn on_request_readable(&mut self, decoder: &mut Decoder<HttpStream>) -> Next {
|
||||
let (status, next) = match self.status {
|
||||
// Request may time out
|
||||
FetchState::InProgress { ref deadline, .. } if *deadline < Instant::now() => {
|
||||
FetchState::InProgress(_) if self.fetch_control.deadline < Instant::now() => {
|
||||
trace!(target: "dapps", "Fetching dapp failed because of timeout.");
|
||||
let timeout = ContentHandler::error(
|
||||
StatusCode::GatewayTimeout,
|
||||
@ -159,7 +239,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
Self::close_client(&mut self.client);
|
||||
(Some(FetchState::Error(timeout)), Next::write())
|
||||
},
|
||||
FetchState::InProgress { ref receiver, .. } => {
|
||||
FetchState::InProgress(ref receiver) => {
|
||||
// Check if there is an answer
|
||||
let rec = receiver.try_recv();
|
||||
match rec {
|
||||
@ -178,7 +258,10 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
Some(&format!("{:?}", e))
|
||||
))
|
||||
},
|
||||
Ok(result) => FetchState::Done(result)
|
||||
Ok((id, result)) => {
|
||||
let address = redirection_address(self.using_dapps_domains, &id);
|
||||
FetchState::Done(id, result, Redirection::new(&address))
|
||||
},
|
||||
};
|
||||
// Remove temporary zip file
|
||||
let _ = fs::remove_file(path);
|
||||
@ -203,6 +286,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
};
|
||||
|
||||
if let Some(status) = status {
|
||||
self.fetch_control.set_status(&status);
|
||||
self.status = status;
|
||||
}
|
||||
|
||||
@ -211,12 +295,7 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
|
||||
fn on_response(&mut self, res: &mut server::Response) -> Next {
|
||||
match self.status {
|
||||
FetchState::Done((ref id, _)) => {
|
||||
trace!(target: "dapps", "Fetching content finished. Redirecting to {}", id);
|
||||
res.set_status(StatusCode::Found);
|
||||
res.headers_mut().set(header::Location(redirection_address(self.using_dapps_domains, id)));
|
||||
Next::write()
|
||||
},
|
||||
FetchState::Done(_, _, ref mut handler) => handler.on_response(res),
|
||||
FetchState::Error(ref mut handler) => handler.on_response(res),
|
||||
_ => Next::end(),
|
||||
}
|
||||
@ -224,9 +303,9 @@ impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<
|
||||
|
||||
fn on_response_writable(&mut self, encoder: &mut Encoder<HttpStream>) -> Next {
|
||||
match self.status {
|
||||
FetchState::Done(_, _, ref mut handler) => handler.on_response_writable(encoder),
|
||||
FetchState::Error(ref mut handler) => handler.on_response_writable(encoder),
|
||||
_ => Next::end(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ pub use self::auth::AuthRequiredHandler;
|
||||
pub use self::echo::EchoHandler;
|
||||
pub use self::content::ContentHandler;
|
||||
pub use self::redirect::Redirection;
|
||||
pub use self::fetch::{ContentFetcherHandler, ContentValidator};
|
||||
pub use self::fetch::{ContentFetcherHandler, ContentValidator, FetchControl};
|
||||
|
||||
use url::Url;
|
||||
use hyper::{server, header, net, uri};
|
||||
|
@ -20,15 +20,20 @@ use hyper::{header, server, Decoder, Encoder, Next};
|
||||
use hyper::net::HttpStream;
|
||||
use hyper::status::StatusCode;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Redirection {
|
||||
to_url: String
|
||||
}
|
||||
|
||||
impl Redirection {
|
||||
pub fn new(url: &str) -> Box<Self> {
|
||||
Box::new(Redirection {
|
||||
pub fn new(url: &str) -> Self {
|
||||
Redirection {
|
||||
to_url: url.to_owned()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn boxed(url: &str) -> Box<Self> {
|
||||
Box::new(Self::new(url))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ use std::path::{Path, PathBuf};
|
||||
use page::handler;
|
||||
use endpoint::{Endpoint, EndpointInfo, EndpointPath, Handler};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LocalPageEndpoint {
|
||||
path: PathBuf,
|
||||
mime: Option<String>,
|
||||
|
@ -104,7 +104,7 @@ impl<A: Authorization + 'static> server::Handler<HttpStream> for Router<A> {
|
||||
// Redirect any GET request to home.
|
||||
_ if *req.method() == hyper::method::Method::Get => {
|
||||
let address = apps::redirection_address(false, self.main_page);
|
||||
Redirection::new(address.as_str())
|
||||
Redirection::boxed(address.as_str())
|
||||
},
|
||||
// RPC by default
|
||||
_ => {
|
||||
|
@ -81,6 +81,7 @@
|
||||
//! - It removes all transactions (either from `current` or `future`) with nonce < client nonce
|
||||
//! - It moves matching `future` transactions to `current`
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::cmp::Ordering;
|
||||
use std::cmp;
|
||||
use std::collections::{HashSet, HashMap, BTreeSet, BTreeMap};
|
||||
@ -215,7 +216,48 @@ impl VerifiedTransaction {
|
||||
}
|
||||
|
||||
fn sender(&self) -> Address {
|
||||
self.transaction.sender().unwrap()
|
||||
self.transaction.sender().expect("Sender is verified in new; qed")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct GasPriceQueue {
|
||||
backing: BTreeMap<U256, HashSet<H256>>,
|
||||
}
|
||||
|
||||
impl GasPriceQueue {
|
||||
/// Insert an item into a BTreeMap/HashSet "multimap".
|
||||
pub fn insert(&mut self, gas_price: U256, hash: H256) -> bool {
|
||||
self.backing.entry(gas_price).or_insert_with(Default::default).insert(hash)
|
||||
}
|
||||
|
||||
/// Remove an item from a BTreeMap/HashSet "multimap".
|
||||
/// Returns true if the item was removed successfully.
|
||||
pub fn remove(&mut self, gas_price: &U256, hash: &H256) -> bool {
|
||||
if let Some(mut hashes) = self.backing.get_mut(gas_price) {
|
||||
let only_one_left = hashes.len() == 1;
|
||||
if !only_one_left {
|
||||
// Operation may be ok: only if hash is in gas-price's Set.
|
||||
return hashes.remove(hash);
|
||||
}
|
||||
if hash != hashes.iter().next().expect("We know there is only one element in collection, tested above; qed") {
|
||||
// Operation failed: hash not the single item in gas-price's Set.
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
// Operation failed: gas-price not found in Map.
|
||||
return false;
|
||||
}
|
||||
// Operation maybe ok: only if hash not found in gas-price Set.
|
||||
self.backing.remove(gas_price).is_some()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for GasPriceQueue {
|
||||
type Target=BTreeMap<U256, HashSet<H256>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.backing
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,7 +269,7 @@ impl VerifiedTransaction {
|
||||
struct TransactionSet {
|
||||
by_priority: BTreeSet<TransactionOrder>,
|
||||
by_address: Table<Address, U256, TransactionOrder>,
|
||||
by_gas_price: BTreeMap<U256, HashSet<H256>>,
|
||||
by_gas_price: GasPriceQueue,
|
||||
limit: usize,
|
||||
}
|
||||
|
||||
@ -245,12 +287,12 @@ impl TransactionSet {
|
||||
// If transaction was replaced remove it from priority queue
|
||||
if let Some(ref old_order) = by_address_replaced {
|
||||
assert!(self.by_priority.remove(old_order), "hash is in `by_address`; all transactions in `by_address` must be in `by_priority`; qed");
|
||||
assert!(Self::remove_item(&mut self.by_gas_price, &old_order.gas_price, &old_order.hash),
|
||||
assert!(self.by_gas_price.remove(&old_order.gas_price, &old_order.hash),
|
||||
"hash is in `by_address`; all transactions' gas_prices in `by_address` must be in `by_gas_limit`; qed");
|
||||
}
|
||||
Self::insert_item(&mut self.by_gas_price, order_gas_price, order_hash);
|
||||
debug_assert_eq!(self.by_priority.len(), self.by_address.len());
|
||||
debug_assert_eq!(self.by_gas_price.iter().map(|(_, v)| v.len()).fold(0, |a, b| a + b), self.by_address.len());
|
||||
self.by_gas_price.insert(order_gas_price, order_hash);
|
||||
assert_eq!(self.by_priority.len(), self.by_address.len());
|
||||
assert_eq!(self.by_gas_price.values().map(|v| v.len()).fold(0, |a, b| a + b), self.by_address.len());
|
||||
by_address_replaced
|
||||
}
|
||||
|
||||
@ -263,6 +305,7 @@ impl TransactionSet {
|
||||
if len <= self.limit {
|
||||
return None;
|
||||
}
|
||||
|
||||
let to_drop : Vec<(Address, U256)> = {
|
||||
self.by_priority
|
||||
.iter()
|
||||
@ -290,13 +333,16 @@ impl TransactionSet {
|
||||
/// Drop transaction from this set (remove from `by_priority` and `by_address`)
|
||||
fn drop(&mut self, sender: &Address, nonce: &U256) -> Option<TransactionOrder> {
|
||||
if let Some(tx_order) = self.by_address.remove(sender, nonce) {
|
||||
assert!(Self::remove_item(&mut self.by_gas_price, &tx_order.gas_price, &tx_order.hash),
|
||||
assert!(self.by_gas_price.remove(&tx_order.gas_price, &tx_order.hash),
|
||||
"hash is in `by_address`; all transactions' gas_prices in `by_address` must be in `by_gas_limit`; qed");
|
||||
self.by_priority.remove(&tx_order);
|
||||
assert!(self.by_priority.remove(&tx_order),
|
||||
"hash is in `by_address`; all transactions' gas_prices in `by_address` must be in `by_priority`; qed");
|
||||
assert_eq!(self.by_priority.len(), self.by_address.len());
|
||||
assert_eq!(self.by_gas_price.values().map(|v| v.len()).fold(0, |a, b| a + b), self.by_address.len());
|
||||
return Some(tx_order);
|
||||
}
|
||||
assert_eq!(self.by_priority.len(), self.by_address.len());
|
||||
assert_eq!(self.by_gas_price.values().map(|v| v.len()).fold(0, |a, b| a + b), self.by_address.len());
|
||||
None
|
||||
}
|
||||
|
||||
@ -304,7 +350,7 @@ impl TransactionSet {
|
||||
fn clear(&mut self) {
|
||||
self.by_priority.clear();
|
||||
self.by_address.clear();
|
||||
self.by_gas_price.clear();
|
||||
self.by_gas_price.backing.clear();
|
||||
}
|
||||
|
||||
/// Sets new limit for number of transactions in this `TransactionSet`.
|
||||
@ -321,32 +367,6 @@ impl TransactionSet {
|
||||
_ => U256::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert an item into a BTreeMap/HashSet "multimap".
|
||||
fn insert_item(into: &mut BTreeMap<U256, HashSet<H256>>, gas_price: U256, hash: H256) -> bool {
|
||||
into.entry(gas_price).or_insert_with(Default::default).insert(hash)
|
||||
}
|
||||
|
||||
/// Remove an item from a BTreeMap/HashSet "multimap".
|
||||
/// Returns true if the item was removed successfully.
|
||||
fn remove_item(from: &mut BTreeMap<U256, HashSet<H256>>, gas_price: &U256, hash: &H256) -> bool {
|
||||
if let Some(mut hashes) = from.get_mut(gas_price) {
|
||||
let only_one_left = hashes.len() == 1;
|
||||
if !only_one_left {
|
||||
// Operation may be ok: only if hash is in gas-price's Set.
|
||||
return hashes.remove(hash);
|
||||
}
|
||||
if hashes.iter().next().unwrap() != hash {
|
||||
// Operation failed: hash not the single item in gas-price's Set.
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
// Operation failed: gas-price not found in Map.
|
||||
return false;
|
||||
}
|
||||
// Operation maybe ok: only if hash not found in gas-price Set.
|
||||
from.remove(gas_price).is_some()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -588,7 +608,7 @@ impl TransactionQueue {
|
||||
return;
|
||||
}
|
||||
|
||||
let transaction = transaction.unwrap();
|
||||
let transaction = transaction.expect("None is tested in early-exit condition above; qed");
|
||||
let sender = transaction.sender();
|
||||
let nonce = transaction.nonce();
|
||||
let current_nonce = fetch_account(&sender).nonce;
|
||||
@ -623,7 +643,7 @@ impl TransactionQueue {
|
||||
None => vec![],
|
||||
};
|
||||
for k in all_nonces_from_sender {
|
||||
let order = self.future.drop(sender, &k).unwrap();
|
||||
let order = self.future.drop(sender, &k).expect("iterating over a collection that has been retrieved above; qed");
|
||||
if k >= current_nonce {
|
||||
self.future.insert(*sender, k, order.update_height(k, current_nonce));
|
||||
} else {
|
||||
@ -644,7 +664,8 @@ impl TransactionQueue {
|
||||
|
||||
for k in all_nonces_from_sender {
|
||||
// Goes to future or is removed
|
||||
let order = self.current.drop(sender, &k).unwrap();
|
||||
let order = self.current.drop(sender, &k).expect("iterating over a collection that has been retrieved above;
|
||||
qed");
|
||||
if k >= current_nonce {
|
||||
self.future.insert(*sender, k, order.update_height(k, current_nonce));
|
||||
} else {
|
||||
@ -704,10 +725,11 @@ impl TransactionQueue {
|
||||
if let None = by_nonce {
|
||||
return;
|
||||
}
|
||||
let mut by_nonce = by_nonce.unwrap();
|
||||
let mut by_nonce = by_nonce.expect("None is tested in early-exit condition above; qed");
|
||||
while let Some(order) = by_nonce.remove(¤t_nonce) {
|
||||
// remove also from priority and hash
|
||||
// remove also from priority and gas_price
|
||||
self.future.by_priority.remove(&order);
|
||||
self.future.by_gas_price.remove(&order.gas_price, &order.hash);
|
||||
// Put to current
|
||||
let order = order.update_height(current_nonce, first_nonce);
|
||||
self.current.insert(address, current_nonce, order);
|
||||
@ -1395,6 +1417,9 @@ mod test {
|
||||
let stats = txq.status();
|
||||
assert_eq!(stats.pending, 3);
|
||||
assert_eq!(stats.future, 0);
|
||||
assert_eq!(txq.future.by_priority.len(), 0);
|
||||
assert_eq!(txq.future.by_address.len(), 0);
|
||||
assert_eq!(txq.future.by_gas_price.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -354,6 +354,10 @@ impl Service {
|
||||
// destroy the old snapshot reader.
|
||||
*reader = None;
|
||||
|
||||
if snapshot_dir.exists() {
|
||||
try!(fs::remove_dir_all(&snapshot_dir));
|
||||
}
|
||||
|
||||
try!(fs::rename(temp_dir, &snapshot_dir));
|
||||
|
||||
*reader = Some(try!(LooseReader::new(snapshot_dir)));
|
||||
@ -428,16 +432,11 @@ impl Service {
|
||||
|
||||
let snapshot_dir = self.snapshot_dir();
|
||||
|
||||
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
|
||||
if let Err(e) = fs::remove_dir_all(&snapshot_dir) {
|
||||
match e.kind() {
|
||||
ErrorKind::NotFound => {}
|
||||
_ => return Err(e.into()),
|
||||
}
|
||||
if snapshot_dir.exists() {
|
||||
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
|
||||
try!(fs::remove_dir_all(&snapshot_dir));
|
||||
}
|
||||
|
||||
try!(fs::create_dir(&snapshot_dir));
|
||||
|
||||
trace!(target: "snapshot", "copying restored snapshot files over");
|
||||
try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir));
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user