LRU cache for dapps (#2006)

Conflicts:
	dapps/Cargo.toml
	dapps/src/lib.rs
This commit is contained in:
Tomasz Drwięga
2016-08-30 14:04:52 +02:00
committed by Arkadiy Paronyan
parent 6da60afaba
commit 6f321d9849
10 changed files with 214 additions and 39 deletions

View File

@@ -18,7 +18,8 @@
use std::{env, io, fs, fmt};
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use random_filename;
@@ -29,6 +30,7 @@ use hyper::{self, Decoder, Encoder, Next};
#[derive(Debug)]
pub enum Error {
Aborted,
NotStarted,
UnexpectedStatus(StatusCode),
IoError(io::Error),
@@ -40,6 +42,7 @@ pub type OnDone = Box<Fn() + Send>;
pub struct Fetch {
path: PathBuf,
abort: Arc<AtomicBool>,
file: Option<fs::File>,
result: Option<FetchResult>,
sender: mpsc::Sender<FetchResult>,
@@ -56,7 +59,7 @@ impl Drop for Fetch {
fn drop(&mut self) {
let res = self.result.take().unwrap_or(Err(Error::NotStarted));
// Remove file if there was an error
if res.is_err() {
if res.is_err() || self.is_aborted() {
if let Some(file) = self.file.take() {
drop(file);
// Remove file
@@ -72,12 +75,13 @@ impl Drop for Fetch {
}
impl Fetch {
pub fn new(sender: mpsc::Sender<FetchResult>, on_done: OnDone) -> Self {
pub fn new(sender: mpsc::Sender<FetchResult>, abort: Arc<AtomicBool>, on_done: OnDone) -> Self {
let mut dir = env::temp_dir();
dir.push(random_filename());
Fetch {
path: dir,
abort: abort,
file: None,
result: None,
sender: sender,
@@ -86,17 +90,36 @@ impl Fetch {
}
}
impl Fetch {
fn is_aborted(&self) -> bool {
self.abort.load(Ordering::Relaxed)
}
fn mark_aborted(&mut self) -> Next {
self.result = Some(Err(Error::Aborted));
Next::end()
}
}
impl hyper::client::Handler<HttpStream> for Fetch {
fn on_request(&mut self, req: &mut Request) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
req.headers_mut().set(Connection::close());
read()
}
fn on_request_writable(&mut self, _encoder: &mut Encoder<HttpStream>) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
read()
}
fn on_response(&mut self, res: Response) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
if *res.status() != StatusCode::Ok {
self.result = Some(Err(Error::UnexpectedStatus(*res.status())));
return Next::end();
@@ -117,6 +140,9 @@ impl hyper::client::Handler<HttpStream> for Fetch {
}
fn on_response_readable(&mut self, decoder: &mut Decoder<HttpStream>) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
match io::copy(decoder, self.file.as_mut().expect("File is there because on_response has created it.")) {
Ok(0) => Next::end(),
Ok(_) => read(),

View File

@@ -18,7 +18,8 @@
use std::{fs, fmt};
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use std::sync::atomic::AtomicBool;
use std::time::{Instant, Duration};
use hyper::{header, server, Decoder, Encoder, Next, Method, Control, Client};
@@ -38,19 +39,20 @@ enum FetchState {
Error(ContentHandler),
InProgress {
deadline: Instant,
receiver: mpsc::Receiver<FetchResult>
receiver: mpsc::Receiver<FetchResult>,
},
Done(Manifest),
}
pub trait DappHandler {
pub trait ContentValidator {
type Error: fmt::Debug;
fn validate_and_install(&self, app: PathBuf) -> Result<Manifest, Self::Error>;
fn done(&self, Option<&Manifest>);
}
pub struct AppFetcherHandler<H: DappHandler> {
pub struct ContentFetcherHandler<H: ContentValidator> {
abort: Arc<AtomicBool>,
control: Option<Control>,
status: FetchState,
client: Option<Client<Fetch>>,
@@ -58,7 +60,7 @@ pub struct AppFetcherHandler<H: DappHandler> {
dapp: H,
}
impl<H: DappHandler> Drop for AppFetcherHandler<H> {
impl<H: ContentValidator> Drop for ContentFetcherHandler<H> {
fn drop(&mut self) {
let manifest = match self.status {
FetchState::Done(ref manifest) => Some(manifest),
@@ -68,16 +70,18 @@ impl<H: DappHandler> Drop for AppFetcherHandler<H> {
}
}
impl<H: DappHandler> AppFetcherHandler<H> {
impl<H: ContentValidator> ContentFetcherHandler<H> {
pub fn new(
app: GithubApp,
abort: Arc<AtomicBool>,
control: Control,
using_dapps_domains: bool,
handler: H) -> Self {
let client = Client::new().expect("Failed to create a Client");
AppFetcherHandler {
ContentFetcherHandler {
abort: abort,
control: Some(control),
client: Some(client),
status: FetchState::NotStarted(app),
@@ -94,12 +98,12 @@ impl<H: DappHandler> AppFetcherHandler<H> {
// TODO [todr] https support
fn fetch_app(client: &mut Client<Fetch>, app: &GithubApp, control: Control) -> Result<mpsc::Receiver<FetchResult>, String> {
fn fetch_app(client: &mut Client<Fetch>, app: &GithubApp, abort: Arc<AtomicBool>, control: Control) -> Result<mpsc::Receiver<FetchResult>, String> {
let url = try!(app.url().parse().map_err(|e| format!("{:?}", e)));
trace!(target: "dapps", "Fetching from: {:?}", url);
let (tx, rx) = mpsc::channel();
let res = client.request(url, Fetch::new(tx, Box::new(move || {
let res = client.request(url, Fetch::new(tx, abort, Box::new(move || {
trace!(target: "dapps", "Fetching finished.");
// Ignoring control errors
let _ = control.ready(Next::read());
@@ -111,7 +115,7 @@ impl<H: DappHandler> AppFetcherHandler<H> {
}
}
impl<H: DappHandler> server::Handler<HttpStream> for AppFetcherHandler<H> {
impl<H: ContentValidator> server::Handler<HttpStream> for ContentFetcherHandler<H> {
fn on_request(&mut self, request: server::Request<HttpStream>) -> Next {
let status = if let FetchState::NotStarted(ref app) = self.status {
Some(match *request.method() {
@@ -120,7 +124,7 @@ impl<H: DappHandler> server::Handler<HttpStream> for AppFetcherHandler<H> {
trace!(target: "dapps", "Fetching dapp: {:?}", app);
let control = self.control.take().expect("on_request is called only once, thus control is always Some");
let client = self.client.as_mut().expect("on_request is called before client is closed.");
let fetch = Self::fetch_app(client, app, control);
let fetch = Self::fetch_app(client, app, self.abort.clone(), control);
match fetch {
Ok(receiver) => FetchState::InProgress {
deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT),

View File

@@ -27,7 +27,7 @@ pub use self::auth::AuthRequiredHandler;
pub use self::echo::EchoHandler;
pub use self::content::ContentHandler;
pub use self::redirect::Redirection;
pub use self::fetch::{AppFetcherHandler, DappHandler};
pub use self::fetch::{ContentFetcherHandler, ContentValidator};
use url::Url;
use hyper::{server, header, net, uri};