diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 65e60d6eb..bf7f51d71 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -9,6 +9,25 @@ variables: cache: key: "$CI_BUILD_NAME/$CI_BUILD_REF_NAME" untracked: true +linux-stable: + stage: build + image: ethcore/rust:stable + only: + - master + - beta + - tags + - stable + script: + - export + - cargo build --release --verbose + - strip target/release/parity + tags: + - rust + - rust-stable + artifacts: + paths: + - target/release/parity + name: "${CI_BUILD_NAME}_parity" linux-beta: stage: build image: ethcore/rust:beta @@ -29,12 +48,6 @@ linux-beta: paths: - target/release/parity name: "${CI_BUILD_NAME}_parity" - stage: deploy - tags: - - rust - - rust-beta - script: - - ./deploy.sh linux-nightly: stage: build image: ethcore/rust:nightly @@ -84,6 +97,7 @@ linux-armv7: - stable script: - export + - export CXX="arm-linux-gnueabihf-g++" - rm -rf .cargo - mkdir -p .cargo - echo "[target.armv7-unknown-linux-gnueabihf]" >> .cargo/config @@ -98,6 +112,7 @@ linux-armv7: paths: - target/armv7-unknown-linux-gnueabihf/release/parity name: "${CI_BUILD_NAME}_parity" + allow_failure: true linux-arm: stage: build image: ethcore/rust-arm:latest @@ -108,11 +123,11 @@ linux-arm: - stable script: - export - - rm -rf .cargo - - mkdir -p .cargo - - echo "[target.arm-unknown-linux-gnueabihf]" >> .cargo/config - - echo "linker= \"arm-linux-gnueabihf-gcc\"" >> .cargo/config - - cat .cargo/config + #- rm -rf .cargo + #- mkdir -p .cargo + #- echo "[target.arm-unknown-linux-gnueabihf]" >> .cargo/config + #- echo "linker= \"arm-linux-gnueabihf-gcc\"" >> .cargo/config + #- cat .cargo/config - cargo build --target arm-unknown-linux-gnueabihf --release --verbose - arm-linux-gnueabihf-strip target/arm-unknown-linux-gnueabihf/release/parity tags: @@ -133,11 +148,11 @@ linux-armv6: - stable script: - export - - rm -rf .cargo - - mkdir -p .cargo - - echo "[target.arm-unknown-linux-gnueabi]" >> .cargo/config - - echo "linker= \"arm-linux-gnueabi-gcc\"" >> .cargo/config - - cat .cargo/config + #- rm -rf .cargo + #- mkdir -p .cargo + #- echo "[target.arm-unknown-linux-gnueabi]" >> .cargo/config + #- echo "linker= \"arm-linux-gnueabi-gcc\"" >> .cargo/config + #- cat .cargo/config - cargo build --target arm-unknown-linux-gnueabi --release --verbose - arm-linux-gnueabi-strip target/arm-unknown-linux-gnueabi/release/parity tags: @@ -158,11 +173,11 @@ linux-aarch64: - stable script: - export - - rm -rf .cargo - - mkdir -p .cargo - - echo "[target.aarch64-unknown-linux-gnu]" >> .cargo/config - - echo "linker= \"aarch64-linux-gnu-gcc\"" >> .cargo/config - - cat .cargo/config + #- rm -rf .cargo + #- mkdir -p .cargo + #- echo "[target.aarch64-unknown-linux-gnu]" >> .cargo/config + #- echo "linker= \"aarch64-linux-gnu-gcc\"" >> .cargo/config + #- cat .cargo/config - cargo build --target aarch64-unknown-linux-gnu --release --verbose - aarch64-linux-gnu-strip target/aarch64-unknown-linux-gnu/release/parity tags: @@ -208,30 +223,22 @@ windows: - target/release/parity.exe - target/release/parity.pdb name: "${CI_BUILD_NAME}_parity" -linux-stable: - stage: build - image: ethcore/rust:stable - only: - - master - - beta - - tags - - stable - script: - - export - - cargo build --release --verbose - - strip target/release/parity - tags: - - rust - - rust-stable - artifacts: - paths: - - target/release/parity - name: "${CI_BUILD_NAME}_parity" test-linux: stage: test before_script: - git submodule update --init --recursive script: - ./test.sh --verbose + tags: + - rust-test dependencies: - linux-stable +deploy-binaries: + stage: deploy + only: + - master + - beta + - tags + - stable + script: + - ./deploy.sh diff --git a/Cargo.lock b/Cargo.lock index 18a77833d..af791fe47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,7 @@ name = "ethcore-ipc-nano" version = "1.4.0" dependencies = [ "ethcore-ipc 1.4.0", + "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", ] @@ -570,6 +571,7 @@ name = "ethkey" version = "0.2.0" dependencies = [ "bigint 0.1.0", + "docopt 0.6.80 (registry+https://github.com/rust-lang/crates.io-index)", "eth-secp256k1 0.5.4 (git+https://github.com/ethcore/rust-secp256k1)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -581,6 +583,7 @@ dependencies = [ name = "ethstore" version = "0.1.0" dependencies = [ + "docopt 0.6.80 (registry+https://github.com/rust-lang/crates.io-index)", "ethcrypto 0.1.0", "ethkey 0.2.0", "itertools 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 6349c354d..476212b27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,8 @@ ipc = ["ethcore/ipc"] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"] json-tests = ["ethcore/json-tests"] stratum = ["ipc"] +ethkey-cli = ["ethcore/ethkey-cli"] +ethstore-cli = ["ethcore/ethstore-cli"] [[bin]] path = "parity/main.rs" diff --git a/dapps/Cargo.toml b/dapps/Cargo.toml index 531f7da1b..742d59bf2 100644 --- a/dapps/Cargo.toml +++ b/dapps/Cargo.toml @@ -39,7 +39,7 @@ clippy = { version = "0.0.85", optional = true} serde_codegen = { version = "0.8", optional = true } [features] -default = ["serde_codegen", "extra-dapps", "https-fetch/ca-github-only"] +default = ["serde_codegen", "extra-dapps"] extra-dapps = ["parity-dapps-wallet"] nightly = ["serde_macros"] dev = ["clippy", "ethcore-rpc/dev", "ethcore-util/dev"] diff --git a/dapps/src/apps/fetcher.rs b/dapps/src/apps/fetcher.rs index 502fbe4aa..8c0e7c421 100644 --- a/dapps/src/apps/fetcher.rs +++ b/dapps/src/apps/fetcher.rs @@ -38,65 +38,65 @@ use handlers::{ContentHandler, ContentFetcherHandler, ContentValidator}; use endpoint::{Endpoint, EndpointPath, Handler}; use apps::cache::{ContentCache, ContentStatus}; use apps::manifest::{MANIFEST_FILENAME, deserialize_manifest, serialize_manifest, Manifest}; -use apps::urlhint::{URLHintContract, URLHint}; +use apps::urlhint::{URLHintContract, URLHint, URLHintResult}; const MAX_CACHED_DAPPS: usize = 10; -pub struct AppFetcher { +pub struct ContentFetcher { dapps_path: PathBuf, resolver: R, + cache: Arc>, sync: Arc, - dapps: Arc>, } -impl Drop for AppFetcher { +impl Drop for ContentFetcher { fn drop(&mut self) { // Clear cache path let _ = fs::remove_dir_all(&self.dapps_path); } } -impl AppFetcher { +impl ContentFetcher { pub fn new(resolver: R, sync_status: Arc) -> Self { let mut dapps_path = env::temp_dir(); dapps_path.push(random_filename()); - AppFetcher { + ContentFetcher { dapps_path: dapps_path, resolver: resolver, sync: sync_status, - dapps: Arc::new(Mutex::new(ContentCache::default())), + cache: Arc::new(Mutex::new(ContentCache::default())), } } #[cfg(test)] - fn set_status(&self, app_id: &str, status: ContentStatus) { - self.dapps.lock().insert(app_id.to_owned(), status); + fn set_status(&self, content_id: &str, status: ContentStatus) { + self.cache.lock().insert(content_id.to_owned(), status); } - pub fn contains(&self, app_id: &str) -> bool { - let mut dapps = self.dapps.lock(); + pub fn contains(&self, content_id: &str) -> bool { + let mut cache = self.cache.lock(); // Check if we already have the app - if dapps.get(app_id).is_some() { + if cache.get(content_id).is_some() { return true; } // fallback to resolver - if let Ok(app_id) = app_id.from_hex() { + if let Ok(content_id) = content_id.from_hex() { // if app_id is valid, but we are syncing always return true. if self.sync.is_major_syncing() { return true; } // else try to resolve the app_id - self.resolver.resolve(app_id).is_some() + self.resolver.resolve(content_id).is_some() } else { false } } pub fn to_async_handler(&self, path: EndpointPath, control: hyper::Control) -> Box { - let mut dapps = self.dapps.lock(); - let app_id = path.app_id.clone(); + let mut cache = self.cache.lock(); + let content_id = path.app_id.clone(); if self.sync.is_major_syncing() { return Box::new(ContentHandler::error( @@ -108,7 +108,7 @@ impl AppFetcher { } let (new_status, handler) = { - let status = dapps.get(&app_id); + let status = cache.get(&content_id); match status { // Just server dapp Some(&mut ContentStatus::Ready(ref endpoint)) => { @@ -125,40 +125,57 @@ impl AppFetcher { }, // We need to start fetching app None => { - let app_hex = app_id.from_hex().expect("to_handler is called only when `contains` returns true."); - let app = self.resolver.resolve(app_hex); + let content_hex = content_id.from_hex().expect("to_handler is called only when `contains` returns true."); + let content = self.resolver.resolve(content_hex); + let abort = Arc::new(AtomicBool::new(false)); - if let Some(app) = app { - let abort = Arc::new(AtomicBool::new(false)); - - (Some(ContentStatus::Fetching(abort.clone())), Box::new(ContentFetcherHandler::new( - app, - abort, - control, - path.using_dapps_domains, - DappInstaller { - dapp_id: app_id.clone(), - dapps_path: self.dapps_path.clone(), - dapps: self.dapps.clone(), - } - )) as Box) - } else { - // This may happen when sync status changes in between - // `contains` and `to_handler` - (None, Box::new(ContentHandler::error( - StatusCode::NotFound, - "Resource Not Found", - "Requested resource was not found.", - None - )) as Box) + match content { + Some(URLHintResult::Dapp(dapp)) => ( + Some(ContentStatus::Fetching(abort.clone())), + Box::new(ContentFetcherHandler::new( + dapp.url(), + abort, + control, + path.using_dapps_domains, + DappInstaller { + id: content_id.clone(), + dapps_path: self.dapps_path.clone(), + cache: self.cache.clone(), + })) as Box + ), + Some(URLHintResult::Content(content)) => ( + Some(ContentStatus::Fetching(abort.clone())), + Box::new(ContentFetcherHandler::new( + content.url, + abort, + control, + path.using_dapps_domains, + ContentInstaller { + id: content_id.clone(), + mime: content.mime, + content_path: self.dapps_path.clone(), + cache: self.cache.clone(), + } + )) as Box, + ), + None => { + // This may happen when sync status changes in between + // `contains` and `to_handler` + (None, Box::new(ContentHandler::error( + StatusCode::NotFound, + "Resource Not Found", + "Requested resource was not found.", + None + )) as Box) + }, } }, } }; if let Some(status) = new_status { - dapps.clear_garbage(MAX_CACHED_DAPPS); - dapps.insert(app_id, status); + cache.clear_garbage(MAX_CACHED_DAPPS); + cache.insert(content_id, status); } handler @@ -169,7 +186,7 @@ impl AppFetcher { pub enum ValidationError { Io(io::Error), Zip(zip::result::ZipError), - InvalidDappId, + InvalidContentId, ManifestNotFound, ManifestSerialization(String), HashMismatch { expected: H256, got: H256, }, @@ -180,7 +197,7 @@ impl fmt::Display for ValidationError { match *self { ValidationError::Io(ref io) => write!(f, "Unexpected IO error occured: {:?}", io), ValidationError::Zip(ref zip) => write!(f, "Unable to read ZIP archive: {:?}", zip), - ValidationError::InvalidDappId => write!(f, "Dapp ID is invalid. It should be 32 bytes hash of content."), + ValidationError::InvalidContentId => write!(f, "ID is invalid. It should be 256 bits keccak hash of content."), ValidationError::ManifestNotFound => write!(f, "Downloaded Dapp bundle did not contain valid manifest.json file."), ValidationError::ManifestSerialization(ref err) => { write!(f, "There was an error during Dapp Manifest serialization: {:?}", err) @@ -204,10 +221,55 @@ impl From for ValidationError { } } +struct ContentInstaller { + id: String, + mime: String, + content_path: PathBuf, + cache: Arc>, +} + +impl ContentValidator for ContentInstaller { + type Error = ValidationError; + type Result = PathBuf; + + fn validate_and_install(&self, path: PathBuf) -> Result<(String, PathBuf), ValidationError> { + // Create dir + try!(fs::create_dir_all(&self.content_path)); + + // And prepare path for a file + let filename = path.file_name().expect("We always fetch a file."); + let mut content_path = self.content_path.clone(); + content_path.push(&filename); + + if content_path.exists() { + try!(fs::remove_dir_all(&content_path)) + } + + try!(fs::copy(&path, &content_path)); + + Ok((self.id.clone(), content_path)) + } + + fn done(&self, result: Option<&PathBuf>) { + let mut cache = self.cache.lock(); + match result { + Some(result) => { + let page = LocalPageEndpoint::single_file(result.clone(), self.mime.clone()); + cache.insert(self.id.clone(), ContentStatus::Ready(page)); + }, + // In case of error + None => { + cache.remove(&self.id); + }, + } + } +} + + struct DappInstaller { - dapp_id: String, + id: String, dapps_path: PathBuf, - dapps: Arc>, + cache: Arc>, } impl DappInstaller { @@ -244,15 +306,16 @@ impl DappInstaller { impl ContentValidator for DappInstaller { type Error = ValidationError; + type Result = Manifest; - fn validate_and_install(&self, app_path: PathBuf) -> Result { + fn validate_and_install(&self, app_path: PathBuf) -> Result<(String, Manifest), ValidationError> { trace!(target: "dapps", "Opening dapp bundle at {:?}", app_path); let mut file_reader = io::BufReader::new(try!(fs::File::open(app_path))); let hash = try!(sha3(&mut file_reader)); - let dapp_id = try!(self.dapp_id.as_str().parse().map_err(|_| ValidationError::InvalidDappId)); - if dapp_id != hash { + let id = try!(self.id.as_str().parse().map_err(|_| ValidationError::InvalidContentId)); + if id != hash { return Err(ValidationError::HashMismatch { - expected: dapp_id, + expected: id, got: hash, }); } @@ -262,7 +325,7 @@ impl ContentValidator for DappInstaller { // First find manifest file let (mut manifest, manifest_dir) = try!(Self::find_manifest(&mut zip)); // Overwrite id to match hash - manifest.id = self.dapp_id.clone(); + manifest.id = self.id.clone(); let target = self.dapp_target_path(&manifest); @@ -300,20 +363,20 @@ impl ContentValidator for DappInstaller { try!(manifest_file.write_all(manifest_str.as_bytes())); // Return modified app manifest - Ok(manifest) + Ok((manifest.id.clone(), manifest)) } fn done(&self, manifest: Option<&Manifest>) { - let mut dapps = self.dapps.lock(); + let mut cache = self.cache.lock(); match manifest { Some(manifest) => { let path = self.dapp_target_path(manifest); let app = LocalPageEndpoint::new(path, manifest.clone().into()); - dapps.insert(self.dapp_id.clone(), ContentStatus::Ready(app)); + cache.insert(self.id.clone(), ContentStatus::Ready(app)); }, // In case of error None => { - dapps.remove(&self.dapp_id); + cache.remove(&self.id); }, } } @@ -327,12 +390,12 @@ mod tests { use endpoint::EndpointInfo; use page::LocalPageEndpoint; use apps::cache::ContentStatus; - use apps::urlhint::{GithubApp, URLHint}; - use super::AppFetcher; + use apps::urlhint::{URLHint, URLHintResult}; + use super::ContentFetcher; struct FakeResolver; impl URLHint for FakeResolver { - fn resolve(&self, _app_id: Bytes) -> Option { + fn resolve(&self, _id: Bytes) -> Option { None } } @@ -341,7 +404,7 @@ mod tests { fn should_true_if_contains_the_app() { // given let path = env::temp_dir(); - let fetcher = AppFetcher::new(FakeResolver, Arc::new(|| false)); + let fetcher = ContentFetcher::new(FakeResolver, Arc::new(|| false)); let handler = LocalPageEndpoint::new(path, EndpointInfo { name: "fake".into(), description: "".into(), diff --git a/dapps/src/apps/urlhint.rs b/dapps/src/apps/urlhint.rs index f57e5e0d7..2b86c0777 100644 --- a/dapps/src/apps/urlhint.rs +++ b/dapps/src/apps/urlhint.rs @@ -17,6 +17,7 @@ use std::fmt; use std::sync::Arc; use rustc_serialize::hex::ToHex; +use mime_guess; use ethabi::{Interface, Contract, Token}; use util::{Address, Bytes, Hashable}; @@ -52,6 +53,13 @@ impl GithubApp { } } +#[derive(Debug, PartialEq)] +pub struct Content { + pub url: String, + pub mime: String, + pub owner: Address, +} + /// RAW Contract interface. /// Should execute transaction using current blockchain state. pub trait ContractClient: Send + Sync { @@ -61,10 +69,19 @@ pub trait ContractClient: Send + Sync { fn call(&self, address: Address, data: Bytes) -> Result; } +/// Result of resolving id to URL +#[derive(Debug, PartialEq)] +pub enum URLHintResult { + /// Dapp + Dapp(GithubApp), + /// Content + Content(Content), +} + /// URLHint Contract interface pub trait URLHint { /// Resolves given id to registrar entry. - fn resolve(&self, app_id: Bytes) -> Option; + fn resolve(&self, id: Bytes) -> Option; } pub struct URLHintContract { @@ -110,10 +127,10 @@ impl URLHintContract { } } - fn encode_urlhint_call(&self, app_id: Bytes) -> Option { + fn encode_urlhint_call(&self, id: Bytes) -> Option { let call = self.urlhint .function("entries".into()) - .and_then(|f| f.encode_call(vec![Token::FixedBytes(app_id)])); + .and_then(|f| f.encode_call(vec![Token::FixedBytes(id)])); match call { Ok(res) => { @@ -126,7 +143,7 @@ impl URLHintContract { } } - fn decode_urlhint_output(&self, output: Bytes) -> Option { + fn decode_urlhint_output(&self, output: Bytes) -> Option { trace!(target: "dapps", "Output: {:?}", output.to_hex()); let output = self.urlhint .function("entries".into()) @@ -149,6 +166,17 @@ impl URLHintContract { if owner == Address::default() { return None; } + + let commit = GithubApp::commit(&commit); + if commit == Some(Default::default()) { + let mime = guess_mime_type(&account_slash_repo).unwrap_or("application/octet-stream".into()); + return Some(URLHintResult::Content(Content { + url: account_slash_repo, + mime: mime, + owner: owner, + })); + } + let (account, repo) = { let mut it = account_slash_repo.split('/'); match (it.next(), it.next()) { @@ -157,12 +185,12 @@ impl URLHintContract { } }; - GithubApp::commit(&commit).map(|commit| GithubApp { + commit.map(|commit| URLHintResult::Dapp(GithubApp { account: account, repo: repo, commit: commit, owner: owner, - }) + })) }, e => { warn!(target: "dapps", "Invalid contract output parameters: {:?}", e); @@ -177,10 +205,10 @@ impl URLHintContract { } impl URLHint for URLHintContract { - fn resolve(&self, app_id: Bytes) -> Option { + fn resolve(&self, id: Bytes) -> Option { self.urlhint_address().and_then(|address| { // Prepare contract call - self.encode_urlhint_call(app_id) + self.encode_urlhint_call(id) .and_then(|data| { let call = self.client.call(address, data); if let Err(ref e) = call { @@ -193,6 +221,34 @@ impl URLHint for URLHintContract { } } +fn guess_mime_type(url: &str) -> Option { + const CONTENT_TYPE: &'static str = "content-type="; + + let mut it = url.split('#'); + // skip url + let url = it.next(); + // get meta headers + let metas = it.next(); + if let Some(metas) = metas { + for meta in metas.split('&') { + let meta = meta.to_lowercase(); + if meta.starts_with(CONTENT_TYPE) { + return Some(meta[CONTENT_TYPE.len()..].to_owned()); + } + } + } + url.and_then(|url| { + url.split('.').last() + }).and_then(|extension| { + mime_guess::get_mime_type_str(extension).map(Into::into) + }) +} + +#[cfg(test)] +pub fn test_guess_mime_type(url: &str) -> Option { + guess_mime_type(url) +} + fn as_string(e: T) -> String { format!("{:?}", e) } @@ -201,7 +257,7 @@ fn as_string(e: T) -> String { mod tests { use std::sync::Arc; use std::str::FromStr; - use rustc_serialize::hex::{ToHex, FromHex}; + use rustc_serialize::hex::FromHex; use super::*; use util::{Bytes, Address, Mutex, ToPretty}; @@ -279,12 +335,33 @@ mod tests { let res = urlhint.resolve("test".bytes().collect()); // then - assert_eq!(res, Some(GithubApp { + assert_eq!(res, Some(URLHintResult::Dapp(GithubApp { account: "ethcore".into(), repo: "dao.claim".into(), commit: GithubApp::commit(&"ec4c1fe06c808fe3739858c347109b1f5f1ed4b5".from_hex().unwrap()).unwrap(), owner: Address::from_str("deadcafebeefbeefcafedeaddeedfeedffffffff").unwrap(), - })) + }))) + } + + #[test] + fn should_decode_urlhint_content_output() { + // given + let mut registrar = FakeRegistrar::new(); + registrar.responses = Mutex::new(vec![ + Ok(format!("000000000000000000000000{}", URLHINT).from_hex().unwrap()), + Ok("00000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000deadcafebeefbeefcafedeaddeedfeedffffffff000000000000000000000000000000000000000000000000000000000000003d68747470733a2f2f657468636f72652e696f2f6173736574732f696d616765732f657468636f72652d626c61636b2d686f72697a6f6e74616c2e706e67000000".from_hex().unwrap()), + ]); + let urlhint = URLHintContract::new(Arc::new(registrar)); + + // when + let res = urlhint.resolve("test".bytes().collect()); + + // then + assert_eq!(res, Some(URLHintResult::Content(Content { + url: "https://ethcore.io/assets/images/ethcore-black-horizontal.png".into(), + mime: "image/png".into(), + owner: Address::from_str("deadcafebeefbeefcafedeaddeedfeedffffffff").unwrap(), + }))) } #[test] @@ -303,4 +380,20 @@ mod tests { // then assert_eq!(url, "https://codeload.github.com/test/xyz/zip/000102030405060708090a0b0c0d0e0f10111213".to_owned()); } + + #[test] + fn should_guess_mime_type_from_url() { + let url1 = "https://ethcore.io/parity"; + let url2 = "https://ethcore.io/parity#content-type=image/png"; + let url3 = "https://ethcore.io/parity#something&content-type=image/png"; + let url4 = "https://ethcore.io/parity.png#content-type=image/jpeg"; + let url5 = "https://ethcore.io/parity.png"; + + + assert_eq!(test_guess_mime_type(url1), None); + assert_eq!(test_guess_mime_type(url2), Some("image/png".into())); + assert_eq!(test_guess_mime_type(url3), Some("image/png".into())); + assert_eq!(test_guess_mime_type(url4), Some("image/jpeg".into())); + assert_eq!(test_guess_mime_type(url5), Some("image/png".into())); + } } diff --git a/dapps/src/handlers/client/mod.rs b/dapps/src/handlers/client/mod.rs index 181f60001..3d8551e8a 100644 --- a/dapps/src/handlers/client/mod.rs +++ b/dapps/src/handlers/client/mod.rs @@ -63,7 +63,7 @@ impl Client { self.https_client.close(); } - pub fn request(&mut self, url: String, abort: Arc, on_done: Box) -> Result, FetchError> { + pub fn request(&mut self, url: &str, abort: Arc, on_done: Box) -> Result, FetchError> { let is_https = url.starts_with("https://"); let url = try!(url.parse().map_err(|_| FetchError::InvalidUrl)); trace!(target: "dapps", "Fetching from: {:?}", url); diff --git a/dapps/src/handlers/fetch.rs b/dapps/src/handlers/fetch.rs index 98242f2b3..790bf4710 100644 --- a/dapps/src/handlers/fetch.rs +++ b/dapps/src/handlers/fetch.rs @@ -16,7 +16,7 @@ //! Hyper Server Handler that fetches a file during a request (proxy). -use std::fmt; +use std::{fs, fmt}; use std::path::PathBuf; use std::sync::{mpsc, Arc}; use std::sync::atomic::AtomicBool; @@ -29,51 +29,50 @@ use hyper::status::StatusCode; use handlers::ContentHandler; use handlers::client::{Client, FetchResult}; use apps::redirection_address; -use apps::urlhint::GithubApp; -use apps::manifest::Manifest; const FETCH_TIMEOUT: u64 = 30; -enum FetchState { - NotStarted(GithubApp), +enum FetchState { + NotStarted(String), Error(ContentHandler), InProgress { deadline: Instant, receiver: mpsc::Receiver, }, - Done(Manifest), + Done((String, T)), } pub trait ContentValidator { type Error: fmt::Debug + fmt::Display; + type Result: fmt::Debug; - fn validate_and_install(&self, app: PathBuf) -> Result; - fn done(&self, Option<&Manifest>); + fn validate_and_install(&self, app: PathBuf) -> Result<(String, Self::Result), Self::Error>; + fn done(&self, Option<&Self::Result>); } pub struct ContentFetcherHandler { abort: Arc, control: Option, - status: FetchState, + status: FetchState, client: Option, using_dapps_domains: bool, - dapp: H, + installer: H, } impl Drop for ContentFetcherHandler { fn drop(&mut self) { - let manifest = match self.status { - FetchState::Done(ref manifest) => Some(manifest), + let result = match self.status { + FetchState::Done((_, ref result)) => Some(result), _ => None, }; - self.dapp.done(manifest); + self.installer.done(result); } } impl ContentFetcherHandler { pub fn new( - app: GithubApp, + url: String, abort: Arc, control: Control, using_dapps_domains: bool, @@ -84,9 +83,9 @@ impl ContentFetcherHandler { abort: abort, control: Some(control), client: Some(client), - status: FetchState::NotStarted(app), + status: FetchState::NotStarted(url), using_dapps_domains: using_dapps_domains, - dapp: handler, + installer: handler, } } @@ -97,8 +96,8 @@ impl ContentFetcherHandler { } - fn fetch_app(client: &mut Client, app: &GithubApp, abort: Arc, control: Control) -> Result, String> { - client.request(app.url(), abort, Box::new(move || { + fn fetch_content(client: &mut Client, url: &str, abort: Arc, control: Control) -> Result, String> { + client.request(url, abort, Box::new(move || { trace!(target: "dapps", "Fetching finished."); // Ignoring control errors let _ = control.ready(Next::read()); @@ -108,14 +107,14 @@ impl ContentFetcherHandler { impl server::Handler for ContentFetcherHandler { fn on_request(&mut self, request: server::Request) -> Next { - let status = if let FetchState::NotStarted(ref app) = self.status { + let status = if let FetchState::NotStarted(ref url) = self.status { Some(match *request.method() { // Start fetching content Method::Get => { - trace!(target: "dapps", "Fetching dapp: {:?}", app); + trace!(target: "dapps", "Fetching content from: {:?}", url); let control = self.control.take().expect("on_request is called only once, thus control is always Some"); let client = self.client.as_mut().expect("on_request is called before client is closed."); - let fetch = Self::fetch_app(client, app, self.abort.clone(), control); + let fetch = Self::fetch_content(client, url, self.abort.clone(), control); match fetch { Ok(receiver) => FetchState::InProgress { deadline: Instant::now() + Duration::from_secs(FETCH_TIMEOUT), @@ -154,7 +153,7 @@ impl server::Handler for ContentFetcherHandler< let timeout = ContentHandler::error( StatusCode::GatewayTimeout, "Download Timeout", - &format!("Could not fetch dapp bundle within {} seconds.", FETCH_TIMEOUT), + &format!("Could not fetch content within {} seconds.", FETCH_TIMEOUT), None ); Self::close_client(&mut self.client); @@ -166,32 +165,31 @@ impl server::Handler for ContentFetcherHandler< match rec { // Unpack and validate Ok(Ok(path)) => { - trace!(target: "dapps", "Fetching dapp finished. Starting validation."); + trace!(target: "dapps", "Fetching content finished. Starting validation ({:?})", path); Self::close_client(&mut self.client); // Unpack and verify - let state = match self.dapp.validate_and_install(path.clone()) { + let state = match self.installer.validate_and_install(path.clone()) { Err(e) => { - trace!(target: "dapps", "Error while validating dapp: {:?}", e); + trace!(target: "dapps", "Error while validating content: {:?}", e); FetchState::Error(ContentHandler::error( StatusCode::BadGateway, "Invalid Dapp", - "Downloaded bundle does not contain a valid dapp.", + "Downloaded bundle does not contain a valid content.", Some(&format!("{:?}", e)) )) }, - Ok(manifest) => FetchState::Done(manifest) + Ok(result) => FetchState::Done(result) }; // Remove temporary zip file - // TODO [todr] Uncomment me - // let _ = fs::remove_file(path); + let _ = fs::remove_file(path); (Some(state), Next::write()) }, Ok(Err(e)) => { - warn!(target: "dapps", "Unable to fetch new dapp: {:?}", e); + warn!(target: "dapps", "Unable to fetch content: {:?}", e); let error = ContentHandler::error( StatusCode::BadGateway, "Download Error", - "There was an error when fetching the dapp.", + "There was an error when fetching the content.", Some(&format!("{:?}", e)), ); (Some(FetchState::Error(error)), Next::write()) @@ -213,10 +211,10 @@ impl server::Handler for ContentFetcherHandler< fn on_response(&mut self, res: &mut server::Response) -> Next { match self.status { - FetchState::Done(ref manifest) => { - trace!(target: "dapps", "Fetching dapp finished. Redirecting to {}", manifest.id); + FetchState::Done((ref id, _)) => { + trace!(target: "dapps", "Fetching content finished. Redirecting to {}", id); res.set_status(StatusCode::Found); - res.headers_mut().set(header::Location(redirection_address(self.using_dapps_domains, &manifest.id))); + res.headers_mut().set(header::Location(redirection_address(self.using_dapps_domains, id))); Next::write() }, FetchState::Error(ref mut handler) => handler.on_response(res), diff --git a/dapps/src/lib.rs b/dapps/src/lib.rs index 87563a3ae..4dcf53a44 100644 --- a/dapps/src/lib.rs +++ b/dapps/src/lib.rs @@ -191,7 +191,7 @@ impl Server { ) -> Result { let panic_handler = Arc::new(Mutex::new(None)); let authorization = Arc::new(authorization); - let apps_fetcher = Arc::new(apps::fetcher::AppFetcher::new(apps::urlhint::URLHintContract::new(registrar), sync_status)); + let content_fetcher = Arc::new(apps::fetcher::ContentFetcher::new(apps::urlhint::URLHintContract::new(registrar), sync_status)); let endpoints = Arc::new(apps::all_endpoints(dapps_path)); let special = Arc::new({ let mut special = HashMap::new(); @@ -206,7 +206,7 @@ impl Server { .handle(move |ctrl| router::Router::new( ctrl, apps::main_page(), - apps_fetcher.clone(), + content_fetcher.clone(), endpoints.clone(), special.clone(), authorization.clone(), diff --git a/dapps/src/page/local.rs b/dapps/src/page/local.rs index 86d4273d5..10b6f08b1 100644 --- a/dapps/src/page/local.rs +++ b/dapps/src/page/local.rs @@ -17,20 +17,30 @@ use mime_guess; use std::io::{Seek, Read, SeekFrom}; use std::fs; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use page::handler; use endpoint::{Endpoint, EndpointInfo, EndpointPath, Handler}; pub struct LocalPageEndpoint { path: PathBuf, - info: EndpointInfo, + mime: Option, + info: Option, } impl LocalPageEndpoint { pub fn new(path: PathBuf, info: EndpointInfo) -> Self { LocalPageEndpoint { path: path, - info: info, + mime: None, + info: Some(info), + } + } + + pub fn single_file(path: PathBuf, mime: String) -> Self { + LocalPageEndpoint { + path: path, + mime: Some(mime), + info: None, } } @@ -41,17 +51,40 @@ impl LocalPageEndpoint { impl Endpoint for LocalPageEndpoint { fn info(&self) -> Option<&EndpointInfo> { - Some(&self.info) + self.info.as_ref() } fn to_handler(&self, path: EndpointPath) -> Box { - Box::new(handler::PageHandler { - app: LocalDapp::new(self.path.clone()), - prefix: None, - path: path, - file: Default::default(), - safe_to_embed: false, - }) + if let Some(ref mime) = self.mime { + Box::new(handler::PageHandler { + app: LocalSingleFile { path: self.path.clone(), mime: mime.clone() }, + prefix: None, + path: path, + file: Default::default(), + safe_to_embed: false, + }) + } else { + Box::new(handler::PageHandler { + app: LocalDapp { path: self.path.clone() }, + prefix: None, + path: path, + file: Default::default(), + safe_to_embed: false, + }) + } + } +} + +struct LocalSingleFile { + path: PathBuf, + mime: String, +} + +impl handler::Dapp for LocalSingleFile { + type DappFile = LocalFile; + + fn file(&self, _path: &str) -> Option { + LocalFile::from_path(&self.path, Some(&self.mime)) } } @@ -59,14 +92,6 @@ struct LocalDapp { path: PathBuf, } -impl LocalDapp { - fn new(path: PathBuf) -> Self { - LocalDapp { - path: path - } - } -} - impl handler::Dapp for LocalDapp { type DappFile = LocalFile; @@ -75,18 +100,7 @@ impl handler::Dapp for LocalDapp { for part in file_path.split('/') { path.push(part); } - // Check if file exists - fs::File::open(path.clone()).ok().map(|file| { - let content_type = mime_guess::guess_mime_type(path); - let len = file.metadata().ok().map_or(0, |meta| meta.len()); - LocalFile { - content_type: content_type.to_string(), - buffer: [0; 4096], - file: file, - pos: 0, - len: len, - } - }) + LocalFile::from_path(&path, None) } } @@ -98,6 +112,24 @@ struct LocalFile { pos: u64, } +impl LocalFile { + fn from_path>(path: P, mime: Option<&str>) -> Option { + // Check if file exists + fs::File::open(&path).ok().map(|file| { + let content_type = mime.map(|mime| mime.to_owned()) + .unwrap_or_else(|| mime_guess::guess_mime_type(path).to_string()); + let len = file.metadata().ok().map_or(0, |meta| meta.len()); + LocalFile { + content_type: content_type, + buffer: [0; 4096], + file: file, + pos: 0, + len: len, + } + }) + } +} + impl handler::DappFile for LocalFile { fn content_type(&self) -> &str { &self.content_type diff --git a/dapps/src/router/mod.rs b/dapps/src/router/mod.rs index c93456d71..b908203d6 100644 --- a/dapps/src/router/mod.rs +++ b/dapps/src/router/mod.rs @@ -27,7 +27,7 @@ use url::{Url, Host}; use hyper::{self, server, Next, Encoder, Decoder, Control, StatusCode}; use hyper::net::HttpStream; use apps; -use apps::fetcher::AppFetcher; +use apps::fetcher::ContentFetcher; use endpoint::{Endpoint, Endpoints, EndpointPath}; use handlers::{Redirection, extract_url, ContentHandler}; use self::auth::{Authorization, Authorized}; @@ -45,7 +45,7 @@ pub struct Router { control: Option, main_page: &'static str, endpoints: Arc, - fetch: Arc, + fetch: Arc, special: Arc>>, authorization: Arc, allowed_hosts: Option>, @@ -136,7 +136,7 @@ impl Router { pub fn new( control: Control, main_page: &'static str, - app_fetcher: Arc, + content_fetcher: Arc, endpoints: Arc, special: Arc>>, authorization: Arc, @@ -148,7 +148,7 @@ impl Router { control: Some(control), main_page: main_page, endpoints: endpoints, - fetch: app_fetcher, + fetch: content_fetcher, special: special, authorization: authorization, allowed_hosts: allowed_hosts, diff --git a/dapps/src/tests/helpers.rs b/dapps/src/tests/helpers.rs index 4cd21520c..efbd24a8d 100644 --- a/dapps/src/tests/helpers.rs +++ b/dapps/src/tests/helpers.rs @@ -17,7 +17,7 @@ use std::env; use std::str; use std::sync::Arc; -use rustc_serialize::hex::{ToHex, FromHex}; +use rustc_serialize::hex::FromHex; use ServerBuilder; use Server; diff --git a/db/src/database.rs b/db/src/database.rs index 185618f99..9a52822f6 100644 --- a/db/src/database.rs +++ b/db/src/database.rs @@ -460,7 +460,7 @@ mod client_tests { crossbeam::scope(move |scope| { let stop = Arc::new(AtomicBool::new(false)); run_worker(scope, stop.clone(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); client.close().unwrap(); @@ -477,7 +477,7 @@ mod client_tests { crossbeam::scope(move |scope| { let stop = Arc::new(AtomicBool::new(false)); run_worker(scope, stop.clone(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); client.put("xxx".as_bytes(), "1".as_bytes()).unwrap(); @@ -498,7 +498,7 @@ mod client_tests { crossbeam::scope(move |scope| { let stop = Arc::new(AtomicBool::new(false)); run_worker(scope, stop.clone(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); assert!(client.get("xxx".as_bytes()).unwrap().is_none()); @@ -516,7 +516,7 @@ mod client_tests { crossbeam::scope(move |scope| { let stop = Arc::new(AtomicBool::new(false)); run_worker(scope, stop.clone(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); let transaction = DBTransaction::new(); @@ -541,7 +541,7 @@ mod client_tests { let stop = StopGuard::new(); run_worker(&scope, stop.share(), url); - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::generic_client::>(url).unwrap(); client.open_default(path.as_str().to_owned()).unwrap(); let mut batch = Vec::new(); diff --git a/db/src/lib.rs.in b/db/src/lib.rs.in index 4fa43b977..54fccb097 100644 --- a/db/src/lib.rs.in +++ b/db/src/lib.rs.in @@ -66,13 +66,13 @@ pub fn extras_service_url(db_path: &str) -> Result { pub fn blocks_client(db_path: &str) -> Result { let url = try!(blocks_service_url(db_path)); - let client = try!(nanoipc::init_client::>(&url)); + let client = try!(nanoipc::generic_client::>(&url)); Ok(client) } pub fn extras_client(db_path: &str) -> Result { let url = try!(extras_service_url(db_path)); - let client = try!(nanoipc::init_client::>(&url)); + let client = try!(nanoipc::generic_client::>(&url)); Ok(client) } diff --git a/docker/ubuntu-aarch64/Dockerfile b/docker/ubuntu-aarch64/Dockerfile index aae09f71c..1f4159a54 100644 --- a/docker/ubuntu-aarch64/Dockerfile +++ b/docker/ubuntu-aarch64/Dockerfile @@ -23,15 +23,9 @@ RUN rustup target add aarch64-unknown-linux-gnu # show backtraces ENV RUST_BACKTRACE 1 -# set compilers -ENV CXX aarch64-linux-gnu-g++ -ENV CC aarch64-linux-gnu-gcc - # show tools RUN rustc -vV && \ - cargo -V && \ - gcc -v &&\ - g++ -v + cargo -V # build parity RUN git clone https://github.com/ethcore/parity && \ diff --git a/docker/ubuntu-arm/Dockerfile b/docker/ubuntu-arm/Dockerfile index 54a54ad55..6c2fa2852 100644 --- a/docker/ubuntu-arm/Dockerfile +++ b/docker/ubuntu-arm/Dockerfile @@ -23,15 +23,10 @@ RUN rustup target add armv7-unknown-linux-gnueabihf # show backtraces ENV RUST_BACKTRACE 1 -# set compilers -ENV CXX arm-linux-gnueabihf-g++ -ENV CC arm-linux-gnueabihf-gcc # show tools RUN rustc -vV && \ - cargo -V && \ - gcc -v &&\ - g++ -v + cargo -V # build parity RUN git clone https://github.com/ethcore/parity && \ diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 8acba2266..fe6a682cb 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -51,3 +51,5 @@ dev = ["clippy"] default = [] benches = [] ipc = [] +ethkey-cli = ["ethkey/cli"] +ethstore-cli = ["ethstore/cli"] diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index dfb4f3ad3..b6333902b 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -161,13 +161,10 @@ impl Client { path: &Path, miner: Arc, message_channel: IoChannel, + db_config: &DatabaseConfig, ) -> Result, ClientError> { let path = path.to_path_buf(); let gb = spec.genesis_block(); - let mut db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); - db_config.cache_size = config.db_cache_size; - db_config.compaction = config.db_compaction.compaction_profile(); - db_config.wal = config.db_wal; let db = Arc::new(try!(Database::open(&db_config, &path.to_str().unwrap()).map_err(ClientError::Database))); let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone())); @@ -676,6 +673,8 @@ impl Client { impl snapshot::DatabaseRestore for Client { /// Restart the client with a new backend fn restore_db(&self, new_db: &str) -> Result<(), EthcoreError> { + trace!(target: "snapshot", "Replacing client database with {:?}", new_db); + let _import_lock = self.import_lock.lock(); let mut state_db = self.state_db.write(); let mut chain = self.chain.write(); diff --git a/ethcore/src/engines/basic_authority.rs b/ethcore/src/engines/basic_authority.rs index eef3df6b1..18dfeec46 100644 --- a/ethcore/src/engines/basic_authority.rs +++ b/ethcore/src/engines/basic_authority.rs @@ -187,7 +187,10 @@ mod tests { use spec::Spec; /// Create a new test chain spec with `BasicAuthority` consensus engine. - fn new_test_authority() -> Spec { Spec::load(include_bytes!("../../res/test_authority.json")) } + fn new_test_authority() -> Spec { + let bytes: &[u8] = include_bytes!("../../res/test_authority.json"); + Spec::load(bytes).expect("invalid chain spec") + } #[test] fn has_valid_metadata() { diff --git a/ethcore/src/engines/instant_seal.rs b/ethcore/src/engines/instant_seal.rs index bdb882ee7..3c95f3465 100644 --- a/ethcore/src/engines/instant_seal.rs +++ b/ethcore/src/engines/instant_seal.rs @@ -72,7 +72,10 @@ mod tests { use block::*; /// Create a new test chain spec with `BasicAuthority` consensus engine. - fn new_test_instant() -> Spec { Spec::load(include_bytes!("../../res/instant_seal.json")) } + fn new_test_instant() -> Spec { + let bytes: &[u8] = include_bytes!("../../res/instant_seal.json"); + Spec::load(bytes).expect("invalid chain spec") + } #[test] fn instant_can_seal() { diff --git a/ethcore/src/ethereum/mod.rs b/ethcore/src/ethereum/mod.rs index 1efe001e5..6d46d5551 100644 --- a/ethcore/src/ethereum/mod.rs +++ b/ethcore/src/ethereum/mod.rs @@ -29,29 +29,33 @@ pub use self::denominations::*; use super::spec::*; +fn load(b: &[u8]) -> Spec { + Spec::load(b).expect("chain spec is invalid") +} + /// Create a new Olympic chain spec. -pub fn new_olympic() -> Spec { Spec::load(include_bytes!("../../res/ethereum/olympic.json")) } +pub fn new_olympic() -> Spec { load(include_bytes!("../../res/ethereum/olympic.json")) } /// Create a new Frontier mainnet chain spec. -pub fn new_frontier() -> Spec { Spec::load(include_bytes!("../../res/ethereum/frontier.json")) } +pub fn new_frontier() -> Spec { load(include_bytes!("../../res/ethereum/frontier.json")) } /// Create a new Frontier mainnet chain spec without the DAO hardfork. -pub fn new_classic() -> Spec { Spec::load(include_bytes!("../../res/ethereum/classic.json")) } +pub fn new_classic() -> Spec { load(include_bytes!("../../res/ethereum/classic.json")) } /// Create a new Frontier chain spec as though it never changes to Homestead. -pub fn new_frontier_test() -> Spec { Spec::load(include_bytes!("../../res/ethereum/frontier_test.json")) } +pub fn new_frontier_test() -> Spec { load(include_bytes!("../../res/ethereum/frontier_test.json")) } /// Create a new Homestead chain spec as though it never changed from Frontier. -pub fn new_homestead_test() -> Spec { Spec::load(include_bytes!("../../res/ethereum/homestead_test.json")) } +pub fn new_homestead_test() -> Spec { load(include_bytes!("../../res/ethereum/homestead_test.json")) } /// Create a new Frontier/Homestead/DAO chain spec with transition points at #5 and #8. -pub fn new_daohardfork_test() -> Spec { Spec::load(include_bytes!("../../res/ethereum/daohardfork_test.json")) } +pub fn new_daohardfork_test() -> Spec { load(include_bytes!("../../res/ethereum/daohardfork_test.json")) } /// Create a new Frontier main net chain spec without genesis accounts. -pub fn new_mainnet_like() -> Spec { Spec::load(include_bytes!("../../res/ethereum/frontier_like_test.json")) } +pub fn new_mainnet_like() -> Spec { load(include_bytes!("../../res/ethereum/frontier_like_test.json")) } /// Create a new Morden chain spec. -pub fn new_morden() -> Spec { Spec::load(include_bytes!("../../res/ethereum/morden.json")) } +pub fn new_morden() -> Spec { load(include_bytes!("../../res/ethereum/morden.json")) } #[cfg(test)] mod tests { diff --git a/ethcore/src/json_tests/chain.rs b/ethcore/src/json_tests/chain.rs index 16161e158..93b0cf82c 100644 --- a/ethcore/src/json_tests/chain.rs +++ b/ethcore/src/json_tests/chain.rs @@ -58,12 +58,14 @@ pub fn json_chain_test(json_data: &[u8], era: ChainEra) -> Vec { let temp = RandomTempPath::new(); { + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); let client = Client::new( ClientConfig::default(), &spec, temp.as_path(), Arc::new(Miner::with_spec(&spec)), - IoChannel::disconnected() + IoChannel::disconnected(), + &db_config, ).unwrap(); for b in &blockchain.blocks_rlp() { if Block::is_good(&b) { diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 1f377d0ae..9fa126cc7 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -23,7 +23,7 @@ use error::*; use client::{Client, ClientConfig, ChainNotify}; use miner::Miner; use snapshot::ManifestData; -use snapshot::service::Service as SnapshotService; +use snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams}; use std::sync::atomic::AtomicBool; #[cfg(feature="ipc")] @@ -60,11 +60,12 @@ pub struct ClientService { } impl ClientService { - /// Start the service in a separate thread. + /// Start the `ClientService`. pub fn start( config: ClientConfig, spec: &Spec, - db_path: &Path, + client_path: &Path, + snapshot_path: &Path, ipc_path: &Path, miner: Arc, ) -> Result @@ -78,11 +79,24 @@ impl ClientService { warn!("Your chain is an alternative fork. {}", Colour::Red.bold().paint("TRANSACTIONS MAY BE REPLAYED ON THE MAINNET!")); } - let pruning = config.pruning; - let client = try!(Client::new(config, &spec, db_path, miner, io_service.channel())); - let snapshot = try!(SnapshotService::new(spec, pruning, db_path.into(), io_service.channel(), client.clone())); + let mut db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + db_config.cache_size = config.db_cache_size; + db_config.compaction = config.db_compaction.compaction_profile(); + db_config.wal = config.db_wal; - let snapshot = Arc::new(snapshot); + let pruning = config.pruning; + let client = try!(Client::new(config, &spec, client_path, miner, io_service.channel(), &db_config)); + + let snapshot_params = SnapServiceParams { + engine: spec.engine.clone(), + genesis_block: spec.genesis_block(), + db_config: db_config, + pruning: pruning, + channel: io_service.channel(), + snapshot_root: snapshot_path.into(), + db_restore: client.clone(), + }; + let snapshot = Arc::new(try!(SnapshotService::new(snapshot_params))); panic_handler.forward_from(&*client); let client_io = Arc::new(ClientIoHandler { @@ -172,7 +186,7 @@ impl IoHandler for ClientIoHandler { ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } ClientIoMessage::NewTransactions(ref transactions) => { self.client.import_queued_transactions(transactions); } ClientIoMessage::BeginRestoration(ref manifest) => { - if let Err(e) = self.snapshot.init_restore(manifest.clone()) { + if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) { warn!("Failed to initialize snapshot restoration: {}", e); } } @@ -232,15 +246,25 @@ mod tests { #[test] fn it_can_be_started() { let temp_path = RandomTempPath::new(); - let mut path = temp_path.as_path().to_owned(); - path.push("pruning"); - path.push("db"); + let path = temp_path.as_path().to_owned(); + let client_path = { + let mut path = path.to_owned(); + path.push("client"); + path + }; + + let snapshot_path = { + let mut path = path.to_owned(); + path.push("snapshot"); + path + }; let spec = get_test_spec(); let service = ClientService::start( ClientConfig::default(), &spec, - &path, + &client_path, + &snapshot_path, &path, Arc::new(Miner::with_spec(&spec)), ); diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index ce34a0def..30d5ab716 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -19,9 +19,9 @@ use std::collections::HashSet; use std::io::ErrorKind; use std::fs; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use super::{ManifestData, StateRebuilder, BlockRebuilder, RestorationStatus, SnapshotService}; use super::io::{SnapshotReader, LooseReader, SnapshotWriter, LooseWriter}; @@ -32,11 +32,10 @@ use engines::Engine; use error::Error; use ids::BlockID; use service::ClientIoMessage; -use spec::Spec; use io::IoChannel; -use util::{Bytes, H256, Mutex, RwLock, UtilError}; +use util::{Bytes, H256, Mutex, RwLock, RwLockReadGuard, UtilError}; use util::journaldb::Algorithm; use util::kvdb::{Database, DatabaseConfig}; use util::snappy; @@ -71,7 +70,7 @@ struct Restoration { block_chunks_left: HashSet, state: StateRebuilder, blocks: BlockRebuilder, - writer: LooseWriter, + writer: Option, snappy_buffer: Bytes, final_state_root: H256, guard: Guard, @@ -81,7 +80,8 @@ struct RestorationParams<'a> { manifest: ManifestData, // manifest to base restoration on. pruning: Algorithm, // pruning algorithm for the database. db_path: PathBuf, // database path - writer: LooseWriter, // writer for recovered snapshot. + db_config: &'a DatabaseConfig, // configuration for the database. + writer: Option, // writer for recovered snapshot. genesis: &'a [u8], // genesis block of the chain. guard: Guard, // guard for the restoration directory. } @@ -94,8 +94,7 @@ impl Restoration { let state_chunks = manifest.state_hashes.iter().cloned().collect(); let block_chunks = manifest.block_hashes.iter().cloned().collect(); - let cfg = DatabaseConfig::with_columns(::db::NUM_COLUMNS); - let raw_db = Arc::new(try!(Database::open(&cfg, &*params.db_path.to_string_lossy()) + let raw_db = Arc::new(try!(Database::open(params.db_config, &*params.db_path.to_string_lossy()) .map_err(UtilError::SimpleString))); let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone()); @@ -121,7 +120,10 @@ impl Restoration { let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); try!(self.state.feed(&self.snappy_buffer[..len])); - try!(self.writer.write_state_chunk(hash, chunk)); + + if let Some(ref mut writer) = self.writer.as_mut() { + try!(writer.write_state_chunk(hash, chunk)); + } } Ok(()) @@ -133,7 +135,9 @@ impl Restoration { let len = try!(snappy::decompress_into(chunk, &mut self.snappy_buffer)); try!(self.blocks.feed(&self.snappy_buffer[..len], engine)); - try!(self.writer.write_block_chunk(hash, chunk)); + if let Some(ref mut writer) = self.writer.as_mut() { + try!(writer.write_block_chunk(hash, chunk)); + } } Ok(()) @@ -158,7 +162,9 @@ impl Restoration { // connect out-of-order chunks. self.blocks.glue_chunks(); - try!(self.writer.finish(self.manifest)); + if let Some(writer) = self.writer { + try!(writer.finish(self.manifest)); + } self.guard.disarm(); Ok(()) @@ -173,15 +179,31 @@ impl Restoration { /// Type alias for client io channel. pub type Channel = IoChannel; -/// Service implementation. -/// -/// This will replace the client's state DB as soon as the last state chunk -/// is fed, and will replace the client's blocks DB when the last block chunk -/// is fed. +/// Snapshot service parameters. +pub struct ServiceParams { + /// The consensus engine this is built on. + pub engine: Arc, + /// The chain's genesis block. + pub genesis_block: Bytes, + /// Database configuration options. + pub db_config: DatabaseConfig, + /// State pruning algorithm. + pub pruning: Algorithm, + /// Async IO channel for sending messages. + pub channel: Channel, + /// The directory to put snapshots in. + /// Usually "/snapshot" + pub snapshot_root: PathBuf, + /// A handle for database restoration. + pub db_restore: Arc, +} + +/// `SnapshotService` implementation. +/// This controls taking snapshots and restoring from them. pub struct Service { restoration: Mutex>, - client_db: PathBuf, // "//db" - db_path: PathBuf, // "/" + snapshot_root: PathBuf, + db_config: DatabaseConfig, io_channel: Channel, pruning: Algorithm, status: Mutex, @@ -192,40 +214,31 @@ pub struct Service { block_chunks: AtomicUsize, db_restore: Arc, progress: super::Progress, + taking_snapshot: AtomicBool, } impl Service { - /// Create a new snapshot service. - pub fn new(spec: &Spec, pruning: Algorithm, client_db: PathBuf, io_channel: Channel, db_restore: Arc) -> Result { - let db_path = try!(client_db.parent().and_then(Path::parent) - .ok_or_else(|| UtilError::SimpleString("Failed to find database root.".into()))).to_owned(); - - let reader = { - let mut snapshot_path = db_path.clone(); - snapshot_path.push("snapshot"); - snapshot_path.push("current"); - - LooseReader::new(snapshot_path).ok() - }; - - let service = Service { + /// Create a new snapshot service from the given parameters. + pub fn new(params: ServiceParams) -> Result { + let mut service = Service { restoration: Mutex::new(None), - client_db: client_db, - db_path: db_path, - io_channel: io_channel, - pruning: pruning, + snapshot_root: params.snapshot_root, + db_config: params.db_config, + io_channel: params.channel, + pruning: params.pruning, status: Mutex::new(RestorationStatus::Inactive), - reader: RwLock::new(reader), - engine: spec.engine.clone(), - genesis_block: spec.genesis_block(), + reader: RwLock::new(None), + engine: params.engine, + genesis_block: params.genesis_block, state_chunks: AtomicUsize::new(0), block_chunks: AtomicUsize::new(0), - db_restore: db_restore, + db_restore: params.db_restore, progress: Default::default(), + taking_snapshot: AtomicBool::new(false), }; // create the root snapshot dir if it doesn't exist. - if let Err(e) = fs::create_dir_all(service.root_dir()) { + if let Err(e) = fs::create_dir_all(&service.snapshot_root) { if e.kind() != ErrorKind::AlreadyExists { return Err(e.into()) } @@ -245,33 +258,29 @@ impl Service { } } - Ok(service) - } + let reader = LooseReader::new(service.snapshot_dir()).ok(); + *service.reader.get_mut() = reader; - // get the root path. - fn root_dir(&self) -> PathBuf { - let mut dir = self.db_path.clone(); - dir.push("snapshot"); - dir + Ok(service) } // get the current snapshot dir. fn snapshot_dir(&self) -> PathBuf { - let mut dir = self.root_dir(); + let mut dir = self.snapshot_root.clone(); dir.push("current"); dir } // get the temporary snapshot dir. fn temp_snapshot_dir(&self) -> PathBuf { - let mut dir = self.root_dir(); + let mut dir = self.snapshot_root.clone(); dir.push("in_progress"); dir } // get the restoration directory. fn restoration_dir(&self) -> PathBuf { - let mut dir = self.root_dir(); + let mut dir = self.snapshot_root.clone(); dir.push("restoration"); dir } @@ -294,15 +303,19 @@ impl Service { fn replace_client_db(&self) -> Result<(), Error> { let our_db = self.restoration_db(); - trace!(target: "snapshot", "replacing {:?} with {:?}", self.client_db, our_db); - try!(self.db_restore.restore_db(our_db.to_str().unwrap())); + try!(self.db_restore.restore_db(&*our_db.to_string_lossy())); Ok(()) } + /// Get a reference to the snapshot reader. + pub fn reader(&self) -> RwLockReadGuard> { + self.reader.read() + } + /// Tick the snapshot service. This will log any active snapshot /// being taken. pub fn tick(&self) { - if self.progress.done() { return } + if self.progress.done() || !self.taking_snapshot.load(Ordering::SeqCst) { return } let p = &self.progress; info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size()); @@ -313,6 +326,11 @@ impl Service { /// will lead to a race condition where the first one to finish will /// have their produced snapshot overwritten. pub fn take_snapshot(&self, client: &Client, num: u64) -> Result<(), Error> { + if self.taking_snapshot.compare_and_swap(false, true, Ordering::SeqCst) { + info!("Skipping snapshot at #{} as another one is currently in-progress.", num); + return Ok(()); + } + info!("Taking snapshot at #{}", num); self.progress.reset(); @@ -324,7 +342,10 @@ impl Service { let writer = try!(LooseWriter::new(temp_dir.clone())); let guard = Guard::new(temp_dir.clone()); - try!(client.take_snapshot(writer, BlockID::Number(num), &self.progress)); + let res = client.take_snapshot(writer, BlockID::Number(num), &self.progress); + + self.taking_snapshot.store(false, Ordering::SeqCst); + try!(res); info!("Finished taking snapshot at #{}", num); @@ -342,11 +363,15 @@ impl Service { } /// Initialize the restoration synchronously. - pub fn init_restore(&self, manifest: ManifestData) -> Result<(), Error> { + /// The recover flag indicates whether to recover the restored snapshot. + pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> { let rest_dir = self.restoration_dir(); let mut res = self.restoration.lock(); + self.state_chunks.store(0, Ordering::SeqCst); + self.block_chunks.store(0, Ordering::SeqCst); + // tear down existing restoration. *res = None; @@ -361,12 +386,16 @@ impl Service { try!(fs::create_dir_all(&rest_dir)); // make new restoration. - let writer = try!(LooseWriter::new(self.temp_recovery_dir())); + let writer = match recover { + true => Some(try!(LooseWriter::new(self.temp_recovery_dir()))), + false => None + }; let params = RestorationParams { manifest: manifest, pruning: self.pruning, db_path: self.restoration_db(), + db_config: &self.db_config, writer: writer, genesis: &self.genesis_block, guard: Guard::new(rest_dir), @@ -375,8 +404,8 @@ impl Service { *res = Some(try!(Restoration::new(params))); *self.status.lock() = RestorationStatus::Ongoing { - state_chunks_done: self.state_chunks.load(Ordering::Relaxed) as u32, - block_chunks_done: self.block_chunks.load(Ordering::Relaxed) as u32, + state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32, + block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32, }; Ok(()) } @@ -387,35 +416,35 @@ impl Service { fn finalize_restoration(&self, rest: &mut Option) -> Result<(), Error> { trace!(target: "snapshot", "finalizing restoration"); - self.state_chunks.store(0, Ordering::SeqCst); - self.block_chunks.store(0, Ordering::SeqCst); + let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some()); // destroy the restoration before replacing databases and snapshot. try!(rest.take().map(Restoration::finalize).unwrap_or(Ok(()))); try!(self.replace_client_db()); - let mut reader = self.reader.write(); - *reader = None; // destroy the old reader if it existed. + if recover { + let mut reader = self.reader.write(); + *reader = None; // destroy the old reader if it existed. - let snapshot_dir = self.snapshot_dir(); + let snapshot_dir = self.snapshot_dir(); - trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy()); - if let Err(e) = fs::remove_dir_all(&snapshot_dir) { - match e.kind() { - ErrorKind::NotFound => {} - _ => return Err(e.into()), + trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy()); + if let Err(e) = fs::remove_dir_all(&snapshot_dir) { + match e.kind() { + ErrorKind::NotFound => {} + _ => return Err(e.into()), + } } + + try!(fs::create_dir(&snapshot_dir)); + + trace!(target: "snapshot", "copying restored snapshot files over"); + try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir)); + + *reader = Some(try!(LooseReader::new(snapshot_dir))); } - try!(fs::create_dir(&snapshot_dir)); - - trace!(target: "snapshot", "copying restored snapshot files over"); - try!(fs::rename(self.temp_recovery_dir(), &snapshot_dir)); - let _ = fs::remove_dir_all(self.restoration_dir()); - - *reader = Some(try!(LooseReader::new(snapshot_dir))); - *self.status.lock() = RestorationStatus::Inactive; Ok(()) @@ -496,7 +525,13 @@ impl SnapshotService for Service { } fn status(&self) -> RestorationStatus { - *self.status.lock() + let mut cur_status = self.status.lock(); + if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done } = *cur_status { + *state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32; + *block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32; + } + + cur_status.clone() } fn begin_restore(&self, manifest: ManifestData) { @@ -507,12 +542,6 @@ impl SnapshotService for Service { fn abort_restore(&self) { *self.restoration.lock() = None; *self.status.lock() = RestorationStatus::Inactive; - if let Err(e) = fs::remove_dir_all(&self.restoration_dir()) { - match e.kind() { - ErrorKind::NotFound => {}, - _ => warn!("encountered error {} while deleting snapshot restoration dir.", e), - } - } } fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { @@ -554,19 +583,25 @@ mod tests { #[test] fn sends_async_messages() { let service = IoService::::start().unwrap(); + let spec = get_test_spec(); let dir = RandomTempPath::new(); let mut dir = dir.as_path().to_owned(); - dir.push("pruning"); - dir.push("db"); + let mut client_db = dir.clone(); + dir.push("snapshot"); + client_db.push("client"); - let service = Service::new( - &get_test_spec(), - Algorithm::Archive, - dir, - service.channel(), - Arc::new(NoopDBRestore), - ).unwrap(); + let snapshot_params = ServiceParams { + engine: spec.engine.clone(), + genesis_block: spec.genesis_block(), + db_config: Default::default(), + pruning: Algorithm::Archive, + channel: service.channel(), + snapshot_root: dir, + db_restore: Arc::new(NoopDBRestore), + }; + + let service = Service::new(snapshot_params).unwrap(); assert!(service.manifest().is_none()); assert!(service.chunk(Default::default()).is_none()); diff --git a/ethcore/src/snapshot/tests/mod.rs b/ethcore/src/snapshot/tests/mod.rs index 84096bead..d9c0abc73 100644 --- a/ethcore/src/snapshot/tests/mod.rs +++ b/ethcore/src/snapshot/tests/mod.rs @@ -18,6 +18,7 @@ mod blocks; mod state; +mod service; pub mod helpers; diff --git a/ethcore/src/snapshot/tests/service.rs b/ethcore/src/snapshot/tests/service.rs new file mode 100644 index 000000000..e136985c6 --- /dev/null +++ b/ethcore/src/snapshot/tests/service.rs @@ -0,0 +1,143 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Tests for the snapshot service. + +use std::sync::Arc; + +use client::{BlockChainClient, Client}; +use ids::BlockID; +use snapshot::service::{Service, ServiceParams}; +use snapshot::{self, ManifestData, SnapshotService}; +use spec::Spec; +use tests::helpers::generate_dummy_client_with_spec_and_data; + +use devtools::RandomTempPath; +use io::IoChannel; +use util::kvdb::DatabaseConfig; + +struct NoopDBRestore; + +impl snapshot::DatabaseRestore for NoopDBRestore { + fn restore_db(&self, _new_db: &str) -> Result<(), ::error::Error> { + Ok(()) + } +} + +#[test] +fn restored_is_equivalent() { + const NUM_BLOCKS: u32 = 400; + const TX_PER: usize = 5; + + let gas_prices = vec![1.into(), 2.into(), 3.into(), 999.into()]; + + let client = generate_dummy_client_with_spec_and_data(Spec::new_null, NUM_BLOCKS, TX_PER, &gas_prices); + + let path = RandomTempPath::create_dir(); + let mut path = path.as_path().clone(); + let mut client_db = path.clone(); + + client_db.push("client_db"); + path.push("snapshot"); + + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + + let spec = Spec::new_null(); + let client2 = Client::new( + Default::default(), + &spec, + &client_db, + Arc::new(::miner::Miner::with_spec(&spec)), + IoChannel::disconnected(), + &db_config, + ).unwrap(); + + let service_params = ServiceParams { + engine: spec.engine.clone(), + genesis_block: spec.genesis_block(), + db_config: db_config, + pruning: ::util::journaldb::Algorithm::Archive, + channel: IoChannel::disconnected(), + snapshot_root: path, + db_restore: client2.clone(), + }; + + let service = Service::new(service_params).unwrap(); + service.take_snapshot(&client, NUM_BLOCKS as u64).unwrap(); + + let manifest = service.manifest().unwrap(); + + service.init_restore(manifest.clone(), true).unwrap(); + assert!(service.init_restore(manifest.clone(), true).is_ok()); + + for hash in manifest.state_hashes { + let chunk = service.chunk(hash).unwrap(); + service.feed_state_chunk(hash, &chunk); + } + + for hash in manifest.block_hashes { + let chunk = service.chunk(hash).unwrap(); + service.feed_block_chunk(hash, &chunk); + } + + assert_eq!(service.status(), ::snapshot::RestorationStatus::Inactive); + + for x in 0..NUM_BLOCKS { + let block1 = client.block(BlockID::Number(x as u64)).unwrap(); + let block2 = client2.block(BlockID::Number(x as u64)).unwrap(); + + assert_eq!(block1, block2); + } +} + +#[test] +fn guards_delete_folders() { + let spec = Spec::new_null(); + let path = RandomTempPath::create_dir(); + let mut path = path.as_path().clone(); + let service_params = ServiceParams { + engine: spec.engine.clone(), + genesis_block: spec.genesis_block(), + db_config: DatabaseConfig::with_columns(::db::NUM_COLUMNS), + pruning: ::util::journaldb::Algorithm::Archive, + channel: IoChannel::disconnected(), + snapshot_root: path.clone(), + db_restore: Arc::new(NoopDBRestore), + }; + + let service = Service::new(service_params).unwrap(); + path.push("restoration"); + + let manifest = ManifestData { + state_hashes: vec![], + block_hashes: vec![], + block_number: 0, + block_hash: Default::default(), + state_root: Default::default(), + }; + + service.init_restore(manifest.clone(), true).unwrap(); + assert!(path.exists()); + + service.abort_restore(); + assert!(!path.exists()); + + service.init_restore(manifest.clone(), true).unwrap(); + assert!(path.exists()); + + drop(service); + assert!(!path.exists()); +} \ No newline at end of file diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index 8f9d3833b..65f47efc8 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -33,15 +33,22 @@ trait Oracle: Send + Sync { fn is_major_syncing(&self) -> bool; } -impl Oracle for Client { +struct StandardOracle where F: 'static + Send + Sync + Fn() -> bool { + client: Arc, + sync_status: F, +} + +impl Oracle for StandardOracle + where F: Send + Sync + Fn() -> bool +{ fn to_number(&self, hash: H256) -> Option { - self.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number()) + self.client.block_header(BlockID::Hash(hash)).map(|h| HeaderView::new(&h).number()) } fn is_major_syncing(&self) -> bool { - let queue_info = self.queue_info(); + let queue_info = self.client.queue_info(); - queue_info.unverified_queue_size + queue_info.verified_queue_size > 3 + (self.sync_status)() || queue_info.unverified_queue_size + queue_info.verified_queue_size > 3 } } @@ -68,7 +75,7 @@ impl Broadcast for IoChannel { /// A `ChainNotify` implementation which will trigger a snapshot event /// at certain block numbers. pub struct Watcher { - oracle: Arc, + oracle: Box, broadcast: Box, period: u64, history: u64, @@ -78,9 +85,14 @@ impl Watcher { /// Create a new `Watcher` which will trigger a snapshot event /// once every `period` blocks, but only after that block is /// `history` blocks old. - pub fn new(client: Arc, channel: IoChannel, period: u64, history: u64) -> Self { + pub fn new(client: Arc, sync_status: F, channel: IoChannel, period: u64, history: u64) -> Self + where F: 'static + Send + Sync + Fn() -> bool + { Watcher { - oracle: client, + oracle: Box::new(StandardOracle { + client: client, + sync_status: sync_status, + }), broadcast: Box::new(channel), period: period, history: history, @@ -125,7 +137,6 @@ mod tests { use util::{H256, U256}; use std::collections::HashMap; - use std::sync::Arc; struct TestOracle(HashMap); @@ -152,7 +163,7 @@ mod tests { let map = hashes.clone().into_iter().zip(numbers).collect(); let watcher = Watcher { - oracle: Arc::new(TestOracle(map)), + oracle: Box::new(TestOracle(map)), broadcast: Box::new(TestBroadcast(expected)), period: period, history: history, diff --git a/ethcore/src/spec/spec.rs b/ethcore/src/spec/spec.rs index d80ac0e33..58317e97b 100644 --- a/ethcore/src/spec/spec.rs +++ b/ethcore/src/spec/spec.rs @@ -244,18 +244,21 @@ impl Spec { } /// Loads spec from json file. - pub fn load(reader: &[u8]) -> Self { - From::from(ethjson::spec::Spec::load(reader).expect("invalid json file")) + pub fn load(reader: R) -> Result where R: Read { + match ethjson::spec::Spec::load(reader) { + Ok(spec) => Ok(spec.into()), + _ => Err("Spec json is invalid".into()), + } } /// Create a new Spec which conforms to the Frontier-era Morden chain except that it's a NullEngine consensus. - pub fn new_test() -> Spec { - Spec::load(include_bytes!("../../res/null_morden.json")) + pub fn new_test() -> Self { + Spec::load(include_bytes!("../../res/null_morden.json") as &[u8]).expect("null_morden.json is invalid") } /// Create a new Spec which is a NullEngine consensus with a premine of address whose secret is sha3(''). - pub fn new_null() -> Spec { - Spec::load(include_bytes!("../../res/null.json")) + pub fn new_null() -> Self { + Spec::load(include_bytes!("../../res/null.json") as &[u8]).expect("null.json is invalid") } } @@ -267,6 +270,12 @@ mod tests { use views::*; use super::*; + // https://github.com/ethcore/parity/issues/1840 + #[test] + fn test_load_empty() { + assert!(Spec::load(&vec![] as &[u8]).is_err()); + } + #[test] fn test_chain() { let test_spec = Spec::new_test(); diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 99aae1078..ff4e09dc9 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -28,7 +28,16 @@ use rlp::{Rlp, View}; fn imports_from_empty() { let dir = RandomTempPath::new(); let spec = get_test_spec(); - let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected()).unwrap(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + + let client = Client::new( + ClientConfig::default(), + &spec, + dir.as_path(), + Arc::new(Miner::with_spec(&spec)), + IoChannel::disconnected(), + &db_config + ).unwrap(); client.import_verified_blocks(); client.flush_queue(); } @@ -37,7 +46,16 @@ fn imports_from_empty() { fn should_return_registrar() { let dir = RandomTempPath::new(); let spec = ethereum::new_morden(); - let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected()).unwrap(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + + let client = Client::new( + ClientConfig::default(), + &spec, + dir.as_path(), + Arc::new(Miner::with_spec(&spec)), + IoChannel::disconnected(), + &db_config + ).unwrap(); assert_eq!(client.additional_params().get("registrar"), Some(&"8e4e9b13d4b45cb0befc93c3061b1408f67316b2".to_owned())); } @@ -55,7 +73,16 @@ fn returns_state_root_basic() { fn imports_good_block() { let dir = RandomTempPath::new(); let spec = get_test_spec(); - let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected()).unwrap(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + + let client = Client::new( + ClientConfig::default(), + &spec, + dir.as_path(), + Arc::new(Miner::with_spec(&spec)), + IoChannel::disconnected(), + &db_config + ).unwrap(); let good_block = get_good_dummy_block(); if let Err(_) = client.import_block(good_block) { panic!("error importing block being good by definition"); @@ -71,8 +98,16 @@ fn imports_good_block() { fn query_none_block() { let dir = RandomTempPath::new(); let spec = get_test_spec(); - let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), Arc::new(Miner::with_spec(&spec)), IoChannel::disconnected()).unwrap(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client = Client::new( + ClientConfig::default(), + &spec, + dir.as_path(), + Arc::new(Miner::with_spec(&spec)), + IoChannel::disconnected(), + &db_config + ).unwrap(); let non_existant = client.block_header(BlockID::Number(188)); assert!(non_existant.is_none()); } diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index e05c82c55..c1f99f434 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -133,9 +133,17 @@ pub fn generate_dummy_client_with_data(block_number: u32, txs_per_block: usize, pub fn generate_dummy_client_with_spec_and_data(get_test_spec: F, block_number: u32, txs_per_block: usize, tx_gas_prices: &[U256]) -> GuardedTempResult> where F: Fn()->Spec { let dir = RandomTempPath::new(); - let test_spec = get_test_spec(); - let client = Client::new(ClientConfig::default(), &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected()).unwrap(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + + let client = Client::new( + ClientConfig::default(), + &test_spec, + dir.as_path(), + Arc::new(Miner::with_spec(&test_spec)), + IoChannel::disconnected(), + &db_config + ).unwrap(); let test_engine = &*test_spec.engine; let mut db_result = get_temp_journal_db(); @@ -233,7 +241,17 @@ pub fn push_blocks_to_client(client: &Arc, timestamp_salt: u64, starting pub fn get_test_client_with_blocks(blocks: Vec) -> GuardedTempResult> { let dir = RandomTempPath::new(); let test_spec = get_test_spec(); - let client = Client::new(ClientConfig::default(), &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected()).unwrap(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + + let client = Client::new( + ClientConfig::default(), + &test_spec, + dir.as_path(), + Arc::new(Miner::with_spec(&test_spec)), + IoChannel::disconnected(), + &db_config + ).unwrap(); + for block in &blocks { if let Err(_) = client.import_block(block.clone()) { panic!("panic importing block which is well-formed"); diff --git a/ethcore/src/tests/rpc.rs b/ethcore/src/tests/rpc.rs index afd2cd6a7..d5d88c087 100644 --- a/ethcore/src/tests/rpc.rs +++ b/ethcore/src/tests/rpc.rs @@ -25,18 +25,23 @@ use devtools::*; use miner::Miner; use crossbeam; use io::IoChannel; +use util::kvdb::DatabaseConfig; pub fn run_test_worker(scope: &crossbeam::Scope, stop: Arc, socket_path: &str) { let socket_path = socket_path.to_owned(); scope.spawn(move || { let temp = RandomTempPath::create_dir(); let spec = get_test_spec(); + let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS); + let client = Client::new( ClientConfig::default(), &spec, temp.as_path(), Arc::new(Miner::with_spec(&spec)), - IoChannel::disconnected()).unwrap(); + IoChannel::disconnected(), + &db_config + ).unwrap(); let mut worker = nanoipc::Worker::new(&(client as Arc)); worker.add_reqrep(&socket_path).unwrap(); while !stop.load(Ordering::Relaxed) { @@ -51,7 +56,7 @@ fn can_handshake() { let stop_guard = StopGuard::new(); let socket_path = "ipc:///tmp/parity-client-rpc-10.ipc"; run_test_worker(scope, stop_guard.share(), socket_path); - let remote_client = nanoipc::init_client::>(socket_path).unwrap(); + let remote_client = nanoipc::generic_client::>(socket_path).unwrap(); assert!(remote_client.handshake().is_ok()); }) @@ -63,7 +68,7 @@ fn can_query_block() { let stop_guard = StopGuard::new(); let socket_path = "ipc:///tmp/parity-client-rpc-20.ipc"; run_test_worker(scope, stop_guard.share(), socket_path); - let remote_client = nanoipc::init_client::>(socket_path).unwrap(); + let remote_client = nanoipc::generic_client::>(socket_path).unwrap(); let non_existant_block = remote_client.block_header(BlockID::Number(999)); diff --git a/ipc/hypervisor/src/lib.rs b/ipc/hypervisor/src/lib.rs index 3cfd464e9..78b8b04ce 100644 --- a/ipc/hypervisor/src/lib.rs +++ b/ipc/hypervisor/src/lib.rs @@ -240,7 +240,7 @@ mod tests { ::std::thread::spawn(move || { while !hypervisor_ready.load(Ordering::Relaxed) { } - let client = nanoipc::init_client::>(url).unwrap(); + let client = nanoipc::fast_client::>(url).unwrap(); client.handshake().unwrap(); client.module_ready(test_module_id); }); diff --git a/ipc/hypervisor/src/service.rs.in b/ipc/hypervisor/src/service.rs.in index 938cea345..74d289f50 100644 --- a/ipc/hypervisor/src/service.rs.in +++ b/ipc/hypervisor/src/service.rs.in @@ -110,7 +110,7 @@ impl HypervisorService { let modules = self.modules.read().unwrap(); modules.get(&module_id).map(|module| { trace!(target: "hypervisor", "Sending shutdown to {}({})", module_id, &module.control_url); - let client = nanoipc::init_client::>(&module.control_url).unwrap(); + let client = nanoipc::fast_client::>(&module.control_url).unwrap(); client.shutdown(); trace!(target: "hypervisor", "Sent shutdown to {}", module_id); }); diff --git a/ipc/nano/Cargo.toml b/ipc/nano/Cargo.toml index ee399e60f..b358eb23a 100644 --- a/ipc/nano/Cargo.toml +++ b/ipc/nano/Cargo.toml @@ -10,4 +10,4 @@ license = "GPL-3.0" ethcore-ipc = { path = "../rpc" } nanomsg = { git = "https://github.com/ethcore/nanomsg.rs.git" } log = "0.3" - +lazy_static = "0.2" diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index da48151a6..1157e75d3 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -19,6 +19,7 @@ extern crate ethcore_ipc as ipc; extern crate nanomsg; #[macro_use] extern crate log; +#[macro_use] extern crate lazy_static; pub use ipc::{WithSocket, IpcInterface, IpcConfig}; pub use nanomsg::Socket as NanoSocket; @@ -28,7 +29,8 @@ use nanomsg::{Socket, Protocol, Error, Endpoint, PollRequest, PollFd, PollInOut} use std::ops::Deref; const POLL_TIMEOUT: isize = 200; -const CLIENT_CONNECTION_TIMEOUT: isize = 120000; +const DEFAULT_CONNECTION_TIMEOUT: isize = 30000; +const DEBUG_CONNECTION_TIMEOUT: isize = 5000; /// Generic worker to handle service (binded) sockets pub struct Worker where S: IpcInterface { @@ -68,7 +70,7 @@ pub fn init_duplex_client(socket_addr: &str) -> Result, Sock SocketError::DuplexLink })); - socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap(); + socket.set_receive_timeout(DEFAULT_CONNECTION_TIMEOUT).unwrap(); let endpoint = try!(socket.connect(socket_addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); @@ -84,26 +86,58 @@ pub fn init_duplex_client(socket_addr: &str) -> Result, Sock /// Spawns client <`S`> over specified address /// creates socket and connects endpoint to it /// for request-reply connections to the service -pub fn init_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { +pub fn client(socket_addr: &str, receive_timeout: Option) -> Result, SocketError> where S: WithSocket { let mut socket = try!(Socket::new(Protocol::Req).map_err(|e| { warn!(target: "ipc", "Failed to create ipc socket: {:?}", e); SocketError::RequestLink })); - socket.set_receive_timeout(CLIENT_CONNECTION_TIMEOUT).unwrap(); + if let Some(timeout) = receive_timeout { + socket.set_receive_timeout(timeout).unwrap(); + } let endpoint = try!(socket.connect(socket_addr).map_err(|e| { warn!(target: "ipc", "Failed to bind socket to address '{}': {:?}", socket_addr, e); SocketError::RequestLink })); - trace!(target: "ipc", "Created cleint for {}", socket_addr); + trace!(target: "ipc", "Created client for {}", socket_addr); Ok(GuardedSocket { client: Arc::new(S::init(socket)), _endpoint: endpoint, }) } +lazy_static! { + /// Set PARITY_IPC_DEBUG=1 for fail-fast connectivity problems diagnostic + pub static ref DEBUG_FLAG: bool = { + use std::env; + + if let Ok(debug) = env::var("PARITY_IPC_DEBUG") { + debug == "1" || debug.to_uppercase() == "TRUE" + } + else { false } + }; +} + +/// Client with no default timeout on operations +pub fn generic_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { + if *DEBUG_FLAG { + client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT)) + } else { + client(socket_addr, None) + } +} + +/// Client over interface that is supposed to give quick almost non-blocking responses +pub fn fast_client(socket_addr: &str) -> Result, SocketError> where S: WithSocket { + if *DEBUG_FLAG { + client(socket_addr, Some(DEBUG_CONNECTION_TIMEOUT)) + } else { + client(socket_addr, Some(DEFAULT_CONNECTION_TIMEOUT)) + } +} + /// Error occurred while establising socket or endpoint #[derive(Debug)] pub enum SocketError { diff --git a/parity/blockchain.rs b/parity/blockchain.rs index 10e475fc4..05f2a6f4f 100644 --- a/parity/blockchain.rs +++ b/parity/blockchain.rs @@ -19,7 +19,6 @@ use std::{io, fs}; use std::io::{BufReader, BufRead}; use std::time::Duration; use std::thread::sleep; -use std::path::Path; use std::sync::Arc; use rustc_serialize::hex::FromHex; use ethcore_logger::{setup_log, Config as LogConfig}; @@ -32,6 +31,7 @@ use ethcore::error::ImportError; use ethcore::miner::Miner; use cache::CacheConfig; use informant::Informant; +use io_handler::ImportIoHandler; use params::{SpecType, Pruning}; use helpers::{to_client_config, execute_upgrades}; use dir::Directories; @@ -125,8 +125,9 @@ fn execute_import(cmd: ImportBlockchain) -> Result { // select pruning algorithm let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, spec.fork_name.as_ref()); - // prepare client_path + // prepare client and snapshot paths. let client_path = cmd.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm); + let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref()); // execute upgrades try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile())); @@ -138,8 +139,9 @@ fn execute_import(cmd: ImportBlockchain) -> Result { let service = try!(ClientService::start( client_config, &spec, - Path::new(&client_path), - Path::new(&cmd.dirs.ipc_path()), + &client_path, + &snapshot_path, + &cmd.dirs.ipc_path(), Arc::new(Miner::with_spec(&spec)), ).map_err(|e| format!("Client service error: {:?}", e))); @@ -169,6 +171,10 @@ fn execute_import(cmd: ImportBlockchain) -> Result { let informant = Informant::new(client.clone(), None, None, cmd.logger_config.color); + try!(service.register_io_handler(Arc::new(ImportIoHandler { + info: Arc::new(informant), + })).map_err(|_| "Unable to register informant handler".to_owned())); + let do_import = |bytes| { while client.queue_info().is_full() { sleep(Duration::from_secs(1)); } match client.import_block(bytes) { @@ -180,7 +186,6 @@ fn execute_import(cmd: ImportBlockchain) -> Result { }, Ok(_) => {}, } - informant.tick(); Ok(()) }; @@ -237,8 +242,9 @@ fn execute_export(cmd: ExportBlockchain) -> Result { // select pruning algorithm let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, spec.fork_name.as_ref()); - // prepare client_path + // prepare client and snapshot paths. let client_path = cmd.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm); + let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref()); // execute upgrades try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile())); @@ -249,8 +255,9 @@ fn execute_export(cmd: ExportBlockchain) -> Result { let service = try!(ClientService::start( client_config, &spec, - Path::new(&client_path), - Path::new(&cmd.dirs.ipc_path()), + &client_path, + &snapshot_path, + &cmd.dirs.ipc_path(), Arc::new(Miner::with_spec(&spec)), ).map_err(|e| format!("Client service error: {:?}", e))); @@ -263,10 +270,10 @@ fn execute_export(cmd: ExportBlockchain) -> Result { }; let from = try!(client.block_number(cmd.from_block).ok_or("From block could not be found")); - let to = try!(client.block_number(cmd.to_block).ok_or("From block could not be found")); + let to = try!(client.block_number(cmd.to_block).ok_or("To block could not be found")); for i in from..(to + 1) { - let b = client.block(BlockID::Number(i)).unwrap(); + let b = try!(client.block(BlockID::Number(i)).ok_or("Error exporting incomplete chain")); match format { DataFormat::Binary => { out.write(&b).expect("Couldn't write to stream."); } DataFormat::Hex => { out.write_fmt(format_args!("{}", b.pretty())).expect("Couldn't write to stream."); } diff --git a/parity/boot.rs b/parity/boot.rs index aa0e4b82b..0b0e6b670 100644 --- a/parity/boot.rs +++ b/parity/boot.rs @@ -63,7 +63,7 @@ pub fn payload() -> Result { } pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> GuardedSocket>{ - let hypervisor_client = nanoipc::init_client::>(hv_url).unwrap(); + let hypervisor_client = nanoipc::fast_client::>(hv_url).unwrap(); hypervisor_client.handshake().unwrap(); hypervisor_client.module_ready(module_id, control_url.to_owned()); @@ -73,7 +73,7 @@ pub fn register(hv_url: &str, control_url: &str, module_id: IpcModuleId) -> Guar pub fn dependency>(url: &str) -> Result, BootError> { - nanoipc::init_client::(url).map_err(|socket_err| BootError::DependencyConnect(socket_err)) + nanoipc::generic_client::(url).map_err(|socket_err| BootError::DependencyConnect(socket_err)) } pub fn main_thread() -> Arc { diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index 7d778d4f8..14673eb2b 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -13,7 +13,7 @@ Usage: parity export [ ] [options] parity signer new-token [options] parity snapshot [options] - parity restore [options] + parity restore [ ] [options] Operating Options: --mode MODE Set the operating mode. MODE can be one of: diff --git a/parity/dir.rs b/parity/dir.rs index f1f230163..d31e81e2c 100644 --- a/parity/dir.rs +++ b/parity/dir.rs @@ -52,10 +52,16 @@ impl Directories { Ok(()) } - /// Get the root path for database - pub fn db_version_path(&self, genesis_hash: H256, fork_name: Option<&String>, pruning: Algorithm) -> PathBuf { + /// Get the chain's root path. + pub fn chain_path(&self, genesis_hash: H256, fork_name: Option<&String>) -> PathBuf { let mut dir = Path::new(&self.db).to_path_buf(); dir.push(format!("{:?}{}", H64::from(genesis_hash), fork_name.map(|f| format!("-{}", f)).unwrap_or_default())); + dir + } + + /// Get the root path for database + pub fn db_version_path(&self, genesis_hash: H256, fork_name: Option<&String>, pruning: Algorithm) -> PathBuf { + let mut dir = self.chain_path(genesis_hash, fork_name); dir.push(format!("v{}-sec-{}", LEGACY_CLIENT_DB_VER_STR, pruning.as_internal_name_str())); dir } @@ -67,6 +73,13 @@ impl Directories { dir } + /// Get the path for the snapshot directory given the genesis hash and fork name. + pub fn snapshot_path(&self, genesis_hash: H256, fork_name: Option<&String>) -> PathBuf { + let mut dir = self.chain_path(genesis_hash, fork_name); + dir.push("snapshot"); + dir + } + /// Get the ipc sockets path pub fn ipc_path(&self) -> PathBuf { let mut dir = Path::new(&self.db).to_path_buf(); diff --git a/parity/io_handler.rs b/parity/io_handler.rs index d60f80f9a..bf73f55bb 100644 --- a/parity/io_handler.rs +++ b/parity/io_handler.rs @@ -15,6 +15,7 @@ // along with Parity. If not, see . use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use ethcore::client::Client; use ethcore::service::ClientIoMessage; use ethsync::{SyncProvider, ManageNetwork}; @@ -31,6 +32,7 @@ pub struct ClientIoHandler { pub net: Arc, pub accounts: Arc, pub info: Arc, + pub shutdown: Arc } impl IoHandler for ClientIoHandler { @@ -39,8 +41,24 @@ impl IoHandler for ClientIoHandler { } fn timeout(&self, _io: &IoContext, timer: TimerToken) { - if let INFO_TIMER = timer { + if timer == INFO_TIMER && !self.shutdown.load(Ordering::SeqCst) { self.info.tick(); } } } + +pub struct ImportIoHandler { + pub info: Arc, +} + +impl IoHandler for ImportIoHandler { + fn initialize(&self, io: &IoContext) { + io.register_timer(INFO_TIMER, 5000).expect("Error registering timer"); + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + if let INFO_TIMER = timer { + self.info.tick() + } + } +} \ No newline at end of file diff --git a/parity/modules.rs b/parity/modules.rs index 73de6ca29..53cef4741 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -71,7 +71,7 @@ mod ipc_deps { pub use ethsync::{SyncClient, NetworkManagerClient, ServiceConfiguration}; pub use ethcore::client::ChainNotifyClient; pub use hypervisor::{SYNC_MODULE_ID, BootArgs, HYPERVISOR_IPC_URL}; - pub use nanoipc::{GuardedSocket, NanoSocket, init_client}; + pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client}; pub use ipc::IpcSocket; pub use ipc::binary::serialize; } @@ -134,11 +134,11 @@ pub fn sync hypervisor.start(); hypervisor.wait_for_startup(); - let sync_client = init_client::>( + let sync_client = generic_client::>( &service_urls::with_base(&hypervisor.io_path, service_urls::SYNC)).unwrap(); - let notify_client = init_client::>( + let notify_client = generic_client::>( &service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap(); - let manage_client = init_client::>( + let manage_client = generic_client::>( &service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap(); *hypervisor_ref = Some(hypervisor); diff --git a/parity/params.rs b/parity/params.rs index 54a680414..c67520aa1 100644 --- a/parity/params.rs +++ b/parity/params.rs @@ -17,7 +17,7 @@ use std::str::FromStr; use std::fs; use std::time::Duration; -use util::{contents, H256, Address, U256, version_data}; +use util::{H256, Address, U256, version_data}; use util::journaldb::Algorithm; use ethcore::spec::Spec; use ethcore::ethereum; @@ -61,7 +61,10 @@ impl SpecType { SpecType::Testnet => Ok(ethereum::new_morden()), SpecType::Olympic => Ok(ethereum::new_olympic()), SpecType::Classic => Ok(ethereum::new_classic()), - SpecType::Custom(ref file) => Ok(Spec::load(&try!(contents(file).map_err(|_| "Could not load specification file.")))) + SpecType::Custom(ref filename) => { + let file = try!(fs::File::open(filename).map_err(|_| "Could not load specification file.")); + Spec::load(file) + } } } } diff --git a/parity/run.rs b/parity/run.rs index aaae33b60..cefd8bb21 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -15,7 +15,6 @@ // along with Parity. If not, see . use std::sync::{Arc, Mutex, Condvar}; -use std::path::Path; use std::io::ErrorKind; use ctrlc::CtrlC; use fdlimit::raise_fd_limit; @@ -29,7 +28,7 @@ use ethcore::service::ClientService; use ethcore::account_provider::AccountProvider; use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions}; use ethcore::snapshot; -use ethsync::SyncConfig; +use ethsync::{SyncConfig, SyncProvider}; use informant::Informant; use rpc::{HttpServer, IpcServer, HttpConfiguration, IpcConfiguration}; @@ -51,7 +50,7 @@ use url; const SNAPSHOT_PERIOD: u64 = 10000; // how many blocks to wait before starting a periodic snapshot. -const SNAPSHOT_HISTORY: u64 = 1000; +const SNAPSHOT_HISTORY: u64 = 500; #[derive(Debug, PartialEq)] pub struct RunCmd { @@ -110,8 +109,9 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { // select pruning algorithm let algorithm = cmd.pruning.to_algorithm(&cmd.dirs, genesis_hash, fork_name.as_ref()); - // prepare client_path + // prepare client and snapshot paths. let client_path = cmd.dirs.client_path(genesis_hash, fork_name.as_ref(), algorithm); + let snapshot_path = cmd.dirs.snapshot_path(genesis_hash, fork_name.as_ref()); // execute upgrades try!(execute_upgrades(&cmd.dirs, genesis_hash, fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile())); @@ -171,14 +171,15 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { } // create supervisor - let mut hypervisor = modules::hypervisor(Path::new(&cmd.dirs.ipc_path())); + let mut hypervisor = modules::hypervisor(&cmd.dirs.ipc_path()); // create client service. let service = try!(ClientService::start( client_config, &spec, - Path::new(&client_path), - Path::new(&cmd.dirs.ipc_path()), + &client_path, + &snapshot_path, + &cmd.dirs.ipc_path(), miner.clone(), ).map_err(|e| format!("Client service error: {:?}", e))); @@ -256,15 +257,18 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { sync: sync_provider.clone(), net: manage_network.clone(), accounts: account_provider.clone(), + shutdown: Default::default(), }); - service.register_io_handler(io_handler).expect("Error registering IO handler"); + service.register_io_handler(io_handler.clone()).expect("Error registering IO handler"); // the watcher must be kept alive. let _watcher = match cmd.no_periodic_snapshot { true => None, false => { + let sync = sync_provider.clone(); let watcher = Arc::new(snapshot::Watcher::new( service.client(), + move || sync.status().is_major_syncing(), service.io().channel(), SNAPSHOT_PERIOD, SNAPSHOT_HISTORY, @@ -286,6 +290,11 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> { // Handle exit wait_for_exit(panic_handler, http_server, ipc_server, dapps_server, signer_server); + // to make sure timer does not spawn requests while shutdown is in progress + io_handler.shutdown.store(true, ::std::sync::atomic::Ordering::SeqCst); + // just Arc is dropping here, to allow other reference release in its default time + drop(io_handler); + // hypervisor should be shutdown first while everything still works and can be // terminated gracefully drop(hypervisor); diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 5bf5024ae..73d06426f 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -21,8 +21,9 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use ethcore_logger::{setup_log, Config as LogConfig}; -use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService}; +use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS}; use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter}; +use ethcore::snapshot::service::Service as SnapshotService; use ethcore::service::ClientService; use ethcore::client::{Mode, DatabaseCompactionProfile, Switch, VMType}; use ethcore::miner::Miner; @@ -62,6 +63,60 @@ pub struct SnapshotCommand { pub block_at: BlockID, } +// helper for reading chunks from arbitrary reader and feeding them into the +// service. +fn restore_using(snapshot: Arc, reader: &R, recover: bool) -> Result<(), String> { + let manifest = reader.manifest(); + + info!("Restoring to block #{} (0x{:?})", manifest.block_number, manifest.block_hash); + + try!(snapshot.init_restore(manifest.clone(), recover).map_err(|e| { + format!("Failed to begin restoration: {}", e) + })); + + let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len()); + + let informant_handle = snapshot.clone(); + ::std::thread::spawn(move || { + while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() { + info!("Processed {}/{} state chunks and {}/{} block chunks.", + state_chunks_done, num_state, block_chunks_done, num_blocks); + ::std::thread::sleep(Duration::from_secs(5)); + } + }); + + info!("Restoring state"); + for &state_hash in &manifest.state_hashes { + if snapshot.status() == RestorationStatus::Failed { + return Err("Restoration failed".into()); + } + + let chunk = try!(reader.chunk(state_hash) + .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e))); + snapshot.feed_state_chunk(state_hash, &chunk); + } + + info!("Restoring blocks"); + for &block_hash in &manifest.block_hashes { + if snapshot.status() == RestorationStatus::Failed { + return Err("Restoration failed".into()); + } + + let chunk = try!(reader.chunk(block_hash) + .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e))); + snapshot.feed_block_chunk(block_hash, &chunk); + } + + match snapshot.status() { + RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()), + RestorationStatus::Failed => Err("Snapshot restoration failed.".into()), + RestorationStatus::Inactive => { + info!("Restoration complete."); + Ok(()) + } + } +} + impl SnapshotCommand { // shared portion of snapshot commands: start the client service fn start_service(self) -> Result<(ClientService, Arc), String> { @@ -82,8 +137,9 @@ impl SnapshotCommand { // select pruning algorithm let algorithm = self.pruning.to_algorithm(&self.dirs, genesis_hash, spec.fork_name.as_ref()); - // prepare client_path + // prepare client and snapshot paths. let client_path = self.dirs.client_path(genesis_hash, spec.fork_name.as_ref(), algorithm); + let snapshot_path = self.dirs.snapshot_path(genesis_hash, spec.fork_name.as_ref()); // execute upgrades try!(execute_upgrades(&self.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, self.compaction.compaction_profile())); @@ -94,8 +150,9 @@ impl SnapshotCommand { let service = try!(ClientService::start( client_config, &spec, - Path::new(&client_path), - Path::new(&self.dirs.ipc_path()), + &client_path, + &snapshot_path, + &self.dirs.ipc_path(), Arc::new(Miner::with_spec(&spec)) ).map_err(|e| format!("Client service error: {:?}", e))); @@ -104,69 +161,35 @@ impl SnapshotCommand { /// restore from a snapshot pub fn restore(self) -> Result<(), String> { - let file = try!(self.file_path.clone().ok_or("No file path provided.".to_owned())); + let file = self.file_path.clone(); let (service, _panic_handler) = try!(self.start_service()); warn!("Snapshot restoration is experimental and the format may be subject to change."); warn!("On encountering an unexpected error, please ensure that you have a recent snapshot."); let snapshot = service.snapshot_service(); - let reader = PackedReader::new(Path::new(&file)) - .map_err(|e| format!("Couldn't open snapshot file: {}", e)) - .and_then(|x| x.ok_or("Snapshot file has invalid format.".into())); - let reader = try!(reader); - let manifest = reader.manifest(); + if let Some(file) = file { + info!("Attempting to restore from snapshot at '{}'", file); - // drop the client so we don't restore while it has open DB handles. - drop(service); + let reader = PackedReader::new(Path::new(&file)) + .map_err(|e| format!("Couldn't open snapshot file: {}", e)) + .and_then(|x| x.ok_or("Snapshot file has invalid format.".into())); - try!(snapshot.init_restore(manifest.clone()).map_err(|e| { - format!("Failed to begin restoration: {}", e) - })); + let reader = try!(reader); + try!(restore_using(snapshot, &reader, true)); + } else { + info!("Attempting to restore from local snapshot."); - let (num_state, num_blocks) = (manifest.state_hashes.len(), manifest.block_hashes.len()); - - let informant_handle = snapshot.clone(); - ::std::thread::spawn(move || { - while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done } = informant_handle.status() { - info!("Processed {}/{} state chunks and {}/{} block chunks.", - state_chunks_done, num_state, block_chunks_done, num_blocks); - - ::std::thread::sleep(Duration::from_secs(5)); - } - }); - - info!("Restoring state"); - for &state_hash in &manifest.state_hashes { - if snapshot.status() == RestorationStatus::Failed { - return Err("Restoration failed".into()); - } - - let chunk = try!(reader.chunk(state_hash) - .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", state_hash, e))); - snapshot.feed_state_chunk(state_hash, &chunk); - } - - info!("Restoring blocks"); - for &block_hash in &manifest.block_hashes { - if snapshot.status() == RestorationStatus::Failed { - return Err("Restoration failed".into()); - } - - let chunk = try!(reader.chunk(block_hash) - .map_err(|e| format!("Encountered error while reading chunk {:?}: {}", block_hash, e))); - snapshot.feed_block_chunk(block_hash, &chunk); - } - - match snapshot.status() { - RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()), - RestorationStatus::Failed => Err("Snapshot restoration failed.".into()), - RestorationStatus::Inactive => { - info!("Restoration complete."); - Ok(()) + // attempting restoration with recovery will lead to deadlock + // as we currently hold a read lock on the service's reader. + match *snapshot.reader() { + Some(ref reader) => try!(restore_using(snapshot.clone(), reader, false)), + None => return Err("No local snapshot found.".into()), } } + + Ok(()) } /// Take a snapshot from the head of the chain. diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index b7ad5b943..448fa4734 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -108,7 +108,16 @@ impl EthTester { let dir = RandomTempPath::new(); let account_provider = account_provider(); let miner_service = miner_service(&spec, account_provider.clone()); - let client = Client::new(ClientConfig::default(), &spec, dir.as_path(), miner_service.clone(), IoChannel::disconnected()).unwrap(); + + let db_config = ::util::kvdb::DatabaseConfig::with_columns(::ethcore::db::NUM_COLUMNS); + let client = Client::new( + ClientConfig::default(), + &spec, + dir.as_path(), + miner_service.clone(), + IoChannel::disconnected(), + &db_config + ).unwrap(); let sync_provider = sync_provider(); let external_miner = Arc::new(ExternalMiner::default()); @@ -286,7 +295,7 @@ const POSITIVE_NONCE_SPEC: &'static [u8] = br#"{ #[test] fn eth_transaction_count() { let secret = "8a283037bb19c4fed7b1c569e40c7dcff366165eb869110a1b11532963eb9cb2".into(); - let tester = EthTester::from_spec(Spec::load(TRANSACTION_COUNT_SPEC)); + let tester = EthTester::from_spec(Spec::load(TRANSACTION_COUNT_SPEC).expect("invalid chain spec")); let address = tester.accounts.insert_account(secret, "").unwrap(); tester.accounts.unlock_account_permanently(address, "".into()).unwrap(); @@ -412,7 +421,7 @@ fn verify_transaction_counts(name: String, chain: BlockChain) { #[test] fn starting_nonce_test() { - let tester = EthTester::from_spec(Spec::load(POSITIVE_NONCE_SPEC)); + let tester = EthTester::from_spec(Spec::load(POSITIVE_NONCE_SPEC).expect("invalid chain spec")); let address = Address::from(10); let sample = tester.handler.handle_request_sync(&(r#" diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 177df5fa0..3a89ae293 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -458,8 +458,6 @@ impl Database { let mut backup_db = PathBuf::from(&self.path); backup_db.pop(); backup_db.push("backup_db"); - println!("Path at {:?}", self.path); - println!("Backup at {:?}", backup_db); let existed = match fs::rename(&self.path, &backup_db) { Ok(_) => true, diff --git a/util/src/misc.rs b/util/src/misc.rs index 62e8542db..50b2e7e8d 100644 --- a/util/src/misc.rs +++ b/util/src/misc.rs @@ -16,7 +16,6 @@ //! Diff misc. -use std::fs::File; use common::*; use rlp::{Stream, RlpStream}; use target_info::Target; @@ -33,14 +32,6 @@ pub enum Filth { Dirty, } -/// Read the whole contents of a file `name`. -pub fn contents(name: &str) -> Result { - let mut file = try!(File::open(name)); - let mut ret: Vec = Vec::new(); - try!(file.read_to_end(&mut ret)); - Ok(ret) -} - /// Get the standard version string for this software. pub fn version() -> String { let sha3 = short_sha(); @@ -64,4 +55,4 @@ pub fn version_data() -> Bytes { s.append(&rustc_version()); s.append(&&Target::os()[0..2]); s.out() -} \ No newline at end of file +}