From 6f321d9849c8be23c8e89558d653d3ebd6173fff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 30 Aug 2016 14:04:52 +0200 Subject: [PATCH] LRU cache for dapps (#2006) Conflicts: dapps/Cargo.toml dapps/src/lib.rs --- Cargo.lock | 7 ++ dapps/Cargo.toml | 1 + dapps/src/apps/cache.rs | 128 ++++++++++++++++++++++++ dapps/src/apps/fetcher.rs | 51 +++++----- dapps/src/apps/mod.rs | 1 + dapps/src/handlers/client/fetch_file.rs | 32 +++++- dapps/src/handlers/fetch.rs | 26 +++-- dapps/src/handlers/mod.rs | 2 +- dapps/src/lib.rs | 1 + dapps/src/page/local.rs | 4 + 10 files changed, 214 insertions(+), 39 deletions(-) create mode 100644 dapps/src/apps/cache.rs diff --git a/Cargo.lock b/Cargo.lock index 119e87fdf..79bb6dba9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,6 +295,7 @@ dependencies = [ "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 6.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git)", + "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "mime_guess 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "parity-dapps 1.4.0 (git+https://github.com/ethcore/parity-ui.git)", @@ -793,6 +794,11 @@ name = "libc" version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "linked-hash-map" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "log" version = "0.3.6" @@ -1699,6 +1705,7 @@ dependencies = [ "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" "checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f" "checksum libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "97def9dc7ce1d8e153e693e3a33020bc69972181adb2f871e87e888876feae49" +"checksum linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d262045c5b87c0861b3f004610afd0e2c851e2908d08b6c870cbb9d5f494ecd" "checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054" "checksum matches 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "15305656809ce5a4805b1ff2946892810992197ce1270ff79baded852187942e" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" diff --git a/dapps/Cargo.toml b/dapps/Cargo.toml index 2c7c9db9c..1f5a0c491 100644 --- a/dapps/Cargo.toml +++ b/dapps/Cargo.toml @@ -22,6 +22,7 @@ serde_json = "0.7.0" serde_macros = { version = "0.7.0", optional = true } zip = { version = "0.1", default-features = false } ethabi = "0.2.1" +linked-hash-map = "0.3" ethcore-rpc = { path = "../rpc" } ethcore-util = { path = "../util" } parity-dapps = { git = "https://github.com/ethcore/parity-ui.git", version = "1.4" } diff --git a/dapps/src/apps/cache.rs b/dapps/src/apps/cache.rs new file mode 100644 index 000000000..bf1c5f3cc --- /dev/null +++ b/dapps/src/apps/cache.rs @@ -0,0 +1,128 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Fetchable Dapps support. + +use std::fs; +use std::sync::{Arc}; +use std::sync::atomic::{AtomicBool, Ordering}; + +use linked_hash_map::LinkedHashMap; +use page::LocalPageEndpoint; + +pub enum ContentStatus { + Fetching(Arc), + Ready(LocalPageEndpoint), +} + +#[derive(Default)] +pub struct ContentCache { + cache: LinkedHashMap, +} + +impl ContentCache { + pub fn insert(&mut self, content_id: String, status: ContentStatus) -> Option { + self.cache.insert(content_id, status) + } + + pub fn remove(&mut self, content_id: &str) -> Option { + self.cache.remove(content_id) + } + + pub fn get(&mut self, content_id: &str) -> Option<&mut ContentStatus> { + self.cache.get_refresh(content_id) + } + + pub fn clear_garbage(&mut self, expected_size: usize) -> Vec<(String, ContentStatus)> { + let mut len = self.cache.len(); + + if len <= expected_size { + return Vec::new(); + } + + let mut removed = Vec::with_capacity(len - expected_size); + while len > expected_size { + let entry = self.cache.pop_front().unwrap(); + match entry.1 { + ContentStatus::Fetching(ref abort) => { + trace!(target: "dapps", "Aborting {} because of limit.", entry.0); + // Mark as aborted + abort.store(true, Ordering::Relaxed); + }, + ContentStatus::Ready(ref endpoint) => { + trace!(target: "dapps", "Removing {} because of limit.", entry.0); + // Remove path + let res = fs::remove_dir_all(&endpoint.path()); + if let Err(e) = res { + warn!(target: "dapps", "Unable to remove dapp: {:?}", e); + } + } + } + + removed.push(entry); + len -= 1; + } + removed + } + + #[cfg(test)] + pub fn len(&self) -> usize { + self.cache.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn only_keys(data: Vec<(String, ContentStatus)>) -> Vec { + data.into_iter().map(|x| x.0).collect() + } + + #[test] + fn should_remove_least_recently_used() { + // given + let mut cache = ContentCache::default(); + cache.insert("a".into(), ContentStatus::Fetching(Default::default())); + cache.insert("b".into(), ContentStatus::Fetching(Default::default())); + cache.insert("c".into(), ContentStatus::Fetching(Default::default())); + + // when + let res = cache.clear_garbage(2); + + // then + assert_eq!(cache.len(), 2); + assert_eq!(only_keys(res), vec!["a"]); + } + + #[test] + fn should_update_lru_if_accessed() { + // given + let mut cache = ContentCache::default(); + cache.insert("a".into(), ContentStatus::Fetching(Default::default())); + cache.insert("b".into(), ContentStatus::Fetching(Default::default())); + cache.insert("c".into(), ContentStatus::Fetching(Default::default())); + + // when + cache.get("a"); + let res = cache.clear_garbage(2); + + // then + assert_eq!(cache.len(), 2); + assert_eq!(only_keys(res), vec!["b"]); + } + +} diff --git a/dapps/src/apps/fetcher.rs b/dapps/src/apps/fetcher.rs index 347c8da5f..e31aae55d 100644 --- a/dapps/src/apps/fetcher.rs +++ b/dapps/src/apps/fetcher.rs @@ -23,7 +23,7 @@ use std::{fs, env}; use std::io::{self, Read, Write}; use std::path::PathBuf; use std::sync::Arc; -use std::collections::HashMap; +use std::sync::atomic::{AtomicBool}; use rustc_serialize::hex::FromHex; use hyper::Control; @@ -33,20 +33,18 @@ use random_filename; use util::{Mutex, H256}; use util::sha3::sha3; use page::LocalPageEndpoint; -use handlers::{ContentHandler, AppFetcherHandler, DappHandler}; +use handlers::{ContentHandler, ContentFetcherHandler, ContentValidator}; use endpoint::{Endpoint, EndpointPath, Handler}; +use apps::cache::{ContentCache, ContentStatus}; use apps::manifest::{MANIFEST_FILENAME, deserialize_manifest, serialize_manifest, Manifest}; use apps::urlhint::{URLHintContract, URLHint}; -enum AppStatus { - Fetching, - Ready(LocalPageEndpoint), -} +const MAX_CACHED_DAPPS: usize = 10; pub struct AppFetcher { dapps_path: PathBuf, resolver: R, - dapps: Arc>>, + dapps: Arc>, } impl Drop for AppFetcher { @@ -65,17 +63,17 @@ impl AppFetcher { AppFetcher { dapps_path: dapps_path, resolver: resolver, - dapps: Arc::new(Mutex::new(HashMap::new())), + dapps: Arc::new(Mutex::new(ContentCache::default())), } } #[cfg(test)] - fn set_status(&self, app_id: &str, status: AppStatus) { + fn set_status(&self, app_id: &str, status: ContentStatus) { self.dapps.lock().insert(app_id.to_owned(), status); } pub fn contains(&self, app_id: &str) -> bool { - let dapps = self.dapps.lock(); + let mut dapps = self.dapps.lock(); match dapps.get(app_id) { // Check if we already have the app Some(_) => true, @@ -95,11 +93,11 @@ impl AppFetcher { let status = dapps.get(&app_id); match status { // Just server dapp - Some(&AppStatus::Ready(ref endpoint)) => { + Some(&mut ContentStatus::Ready(ref endpoint)) => { (None, endpoint.to_handler(path)) }, // App is already being fetched - Some(&AppStatus::Fetching) => { + Some(&mut ContentStatus::Fetching(_)) => { (None, Box::new(ContentHandler::html( StatusCode::ServiceUnavailable, format!( @@ -111,11 +109,13 @@ impl AppFetcher { }, // We need to start fetching app None => { - // TODO [todr] Keep only last N dapps available! let app_hex = app_id.from_hex().expect("to_handler is called only when `contains` returns true."); let app = self.resolver.resolve(app_hex).expect("to_handler is called only when `contains` returns true."); - (Some(AppStatus::Fetching), Box::new(AppFetcherHandler::new( + let abort = Arc::new(AtomicBool::new(false)); + + (Some(ContentStatus::Fetching(abort.clone())), Box::new(ContentFetcherHandler::new( app, + abort, control, path.using_dapps_domains, DappInstaller { @@ -129,6 +129,7 @@ impl AppFetcher { }; if let Some(status) = new_status { + dapps.clear_garbage(MAX_CACHED_DAPPS); dapps.insert(app_id, status); } @@ -161,7 +162,7 @@ impl From for ValidationError { struct DappInstaller { dapp_id: String, dapps_path: PathBuf, - dapps: Arc>>, + dapps: Arc>, } impl DappInstaller { @@ -196,7 +197,7 @@ impl DappInstaller { } } -impl DappHandler for DappInstaller { +impl ContentValidator for DappInstaller { type Error = ValidationError; fn validate_and_install(&self, app_path: PathBuf) -> Result { @@ -262,7 +263,7 @@ impl DappHandler for DappInstaller { Some(manifest) => { let path = self.dapp_target_path(manifest); let app = LocalPageEndpoint::new(path, manifest.clone().into()); - dapps.insert(self.dapp_id.clone(), AppStatus::Ready(app)); + dapps.insert(self.dapp_id.clone(), ContentStatus::Ready(app)); }, // In case of error None => { @@ -274,12 +275,13 @@ impl DappHandler for DappInstaller { #[cfg(test)] mod tests { - use std::path::PathBuf; - use super::{AppFetcher, AppStatus}; - use apps::urlhint::{GithubApp, URLHint}; + use std::env; + use util::Bytes; use endpoint::EndpointInfo; use page::LocalPageEndpoint; - use util::Bytes; + use apps::cache::ContentStatus; + use apps::urlhint::{GithubApp, URLHint}; + use super::AppFetcher; struct FakeResolver; impl URLHint for FakeResolver { @@ -291,8 +293,9 @@ mod tests { #[test] fn should_true_if_contains_the_app() { // given + let path = env::temp_dir(); let fetcher = AppFetcher::new(FakeResolver); - let handler = LocalPageEndpoint::new(PathBuf::from("/tmp/test"), EndpointInfo { + let handler = LocalPageEndpoint::new(path, EndpointInfo { name: "fake".into(), description: "".into(), version: "".into(), @@ -301,8 +304,8 @@ mod tests { }); // when - fetcher.set_status("test", AppStatus::Ready(handler)); - fetcher.set_status("test2", AppStatus::Fetching); + fetcher.set_status("test", ContentStatus::Ready(handler)); + fetcher.set_status("test2", ContentStatus::Fetching(Default::default())); // then assert_eq!(fetcher.contains("test"), true); diff --git a/dapps/src/apps/mod.rs b/dapps/src/apps/mod.rs index 84a3c5ddf..65bee587d 100644 --- a/dapps/src/apps/mod.rs +++ b/dapps/src/apps/mod.rs @@ -19,6 +19,7 @@ use page::PageEndpoint; use proxypac::ProxyPac; use parity_dapps::WebApp; +mod cache; mod fs; pub mod urlhint; pub mod fetcher; diff --git a/dapps/src/handlers/client/fetch_file.rs b/dapps/src/handlers/client/fetch_file.rs index 27b8bbe8e..f11827ed8 100644 --- a/dapps/src/handlers/client/fetch_file.rs +++ b/dapps/src/handlers/client/fetch_file.rs @@ -18,7 +18,8 @@ use std::{env, io, fs, fmt}; use std::path::PathBuf; -use std::sync::mpsc; +use std::sync::{mpsc, Arc}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use random_filename; @@ -29,6 +30,7 @@ use hyper::{self, Decoder, Encoder, Next}; #[derive(Debug)] pub enum Error { + Aborted, NotStarted, UnexpectedStatus(StatusCode), IoError(io::Error), @@ -40,6 +42,7 @@ pub type OnDone = Box; pub struct Fetch { path: PathBuf, + abort: Arc, file: Option, result: Option, sender: mpsc::Sender, @@ -56,7 +59,7 @@ impl Drop for Fetch { fn drop(&mut self) { let res = self.result.take().unwrap_or(Err(Error::NotStarted)); // Remove file if there was an error - if res.is_err() { + if res.is_err() || self.is_aborted() { if let Some(file) = self.file.take() { drop(file); // Remove file @@ -72,12 +75,13 @@ impl Drop for Fetch { } impl Fetch { - pub fn new(sender: mpsc::Sender, on_done: OnDone) -> Self { + pub fn new(sender: mpsc::Sender, abort: Arc, on_done: OnDone) -> Self { let mut dir = env::temp_dir(); dir.push(random_filename()); Fetch { path: dir, + abort: abort, file: None, result: None, sender: sender, @@ -86,17 +90,36 @@ impl Fetch { } } +impl Fetch { + fn is_aborted(&self) -> bool { + self.abort.load(Ordering::Relaxed) + } + fn mark_aborted(&mut self) -> Next { + self.result = Some(Err(Error::Aborted)); + Next::end() + } +} + impl hyper::client::Handler for Fetch { fn on_request(&mut self, req: &mut Request) -> Next { + if self.is_aborted() { + return self.mark_aborted(); + } req.headers_mut().set(Connection::close()); read() } fn on_request_writable(&mut self, _encoder: &mut Encoder) -> Next { + if self.is_aborted() { + return self.mark_aborted(); + } read() } fn on_response(&mut self, res: Response) -> Next { + if self.is_aborted() { + return self.mark_aborted(); + } if *res.status() != StatusCode::Ok { self.result = Some(Err(Error::UnexpectedStatus(*res.status()))); return Next::end(); @@ -117,6 +140,9 @@ impl hyper::client::Handler for Fetch { } fn on_response_readable(&mut self, decoder: &mut Decoder) -> Next { + if self.is_aborted() { + return self.mark_aborted(); + } match io::copy(decoder, self.file.as_mut().expect("File is there because on_response has created it.")) { Ok(0) => Next::end(), Ok(_) => read(), diff --git a/dapps/src/handlers/fetch.rs b/dapps/src/handlers/fetch.rs index 94bce1492..d4919562a 100644 --- a/dapps/src/handlers/fetch.rs +++ b/dapps/src/handlers/fetch.rs @@ -18,7 +18,8 @@ use std::{fs, fmt}; use std::path::PathBuf; -use std::sync::mpsc; +use std::sync::{mpsc, Arc}; +use std::sync::atomic::AtomicBool; use std::time::{Instant, Duration}; use hyper::{header, server, Decoder, Encoder, Next, Method, Control, Client}; @@ -38,19 +39,20 @@ enum FetchState { Error(ContentHandler), InProgress { deadline: Instant, - receiver: mpsc::Receiver + receiver: mpsc::Receiver, }, Done(Manifest), } -pub trait DappHandler { +pub trait ContentValidator { type Error: fmt::Debug; fn validate_and_install(&self, app: PathBuf) -> Result; fn done(&self, Option<&Manifest>); } -pub struct AppFetcherHandler { +pub struct ContentFetcherHandler { + abort: Arc, control: Option, status: FetchState, client: Option>, @@ -58,7 +60,7 @@ pub struct AppFetcherHandler { dapp: H, } -impl Drop for AppFetcherHandler { +impl Drop for ContentFetcherHandler { fn drop(&mut self) { let manifest = match self.status { FetchState::Done(ref manifest) => Some(manifest), @@ -68,16 +70,18 @@ impl Drop for AppFetcherHandler { } } -impl AppFetcherHandler { +impl ContentFetcherHandler { pub fn new( app: GithubApp, + abort: Arc, control: Control, using_dapps_domains: bool, handler: H) -> Self { let client = Client::new().expect("Failed to create a Client"); - AppFetcherHandler { + ContentFetcherHandler { + abort: abort, control: Some(control), client: Some(client), status: FetchState::NotStarted(app), @@ -94,12 +98,12 @@ impl AppFetcherHandler { // TODO [todr] https support - fn fetch_app(client: &mut Client, app: &GithubApp, control: Control) -> Result, String> { + fn fetch_app(client: &mut Client, app: &GithubApp, abort: Arc, control: Control) -> Result, String> { let url = try!(app.url().parse().map_err(|e| format!("{:?}", e))); trace!(target: "dapps", "Fetching from: {:?}", url); let (tx, rx) = mpsc::channel(); - let res = client.request(url, Fetch::new(tx, Box::new(move || { + let res = client.request(url, Fetch::new(tx, abort, Box::new(move || { trace!(target: "dapps", "Fetching finished."); // Ignoring control errors let _ = control.ready(Next::read()); @@ -111,7 +115,7 @@ impl AppFetcherHandler { } } -impl server::Handler for AppFetcherHandler { +impl server::Handler for ContentFetcherHandler { fn on_request(&mut self, request: server::Request) -> Next { let status = if let FetchState::NotStarted(ref app) = self.status { Some(match *request.method() { @@ -120,7 +124,7 @@ impl server::Handler for AppFetcherHandler { trace!(target: "dapps", "Fetching dapp: {:?}", app); let control = self.control.take().expect("on_request is called only once, thus control is always Some"); let client = self.client.as_mut().expect("on_request is called before client is closed."); - let fetch = Self::fetch_app(client, app, control); + let fetch = Self::fetch_app(client, app, self.abort.clone(), control); match fetch { Ok(receiver) => FetchState::InProgress { deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT), diff --git a/dapps/src/handlers/mod.rs b/dapps/src/handlers/mod.rs index 85a8bd439..6f6423b58 100644 --- a/dapps/src/handlers/mod.rs +++ b/dapps/src/handlers/mod.rs @@ -27,7 +27,7 @@ pub use self::auth::AuthRequiredHandler; pub use self::echo::EchoHandler; pub use self::content::ContentHandler; pub use self::redirect::Redirection; -pub use self::fetch::{AppFetcherHandler, DappHandler}; +pub use self::fetch::{ContentFetcherHandler, ContentValidator}; use url::Url; use hyper::{server, header, net, uri}; diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index e50bc2006..a2c17a42c 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -60,6 +60,7 @@ extern crate rustc_serialize; extern crate parity_dapps; extern crate ethcore_rpc; extern crate ethcore_util as util; +extern crate linked_hash_map; mod endpoint; mod apps; diff --git a/dapps/src/page/local.rs b/dapps/src/page/local.rs index 52e32bf5e..dcfd9bed2 100644 --- a/dapps/src/page/local.rs +++ b/dapps/src/page/local.rs @@ -33,6 +33,10 @@ impl LocalPageEndpoint { info: info, } } + + pub fn path(&self) -> PathBuf { + self.path.clone() + } } impl Endpoint for LocalPageEndpoint {