Replace reqwest with hyper (#8099)

* Very primitive test of the Client API

* [WIP] getting rid of request

* Add support for redirects.

* Remove CpuPool from `fetch::Client`.

* Adapt code to API changes and fix tests.

* Use reference counter to stop background thread.

On `clone` the counter is incremented, on `drop` decremented. Once 0 we
send `None` over the channel, expecting the background thread to end.

* Fix tests.

* Comment.

* Change expect messages.

* Use local test server for testing fetch client.

* Ensure max_size also in BodyReader.

* Replace `Condvar` with `sync_channel`.

* Re-export `url::Url` from `fetch` crate.

* Remove spaces.

* Use random ports in local test server.
This commit is contained in:
Toralf Wittner 2018-03-14 13:40:54 +01:00 committed by Fredrik Harrysson
parent 1bad20ae38
commit 322dfbcd78
29 changed files with 1051 additions and 753 deletions

531
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -19,9 +19,10 @@ use std::{fs, fmt};
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::path::PathBuf; use std::path::PathBuf;
use ethereum_types::H256; use ethereum_types::H256;
use fetch::{self, Mime}; use fetch;
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use hash::keccak_buffer; use hash::keccak_buffer;
use mime_guess::Mime;
use apps::manifest::{MANIFEST_FILENAME, deserialize_manifest, serialize_manifest, Manifest}; use apps::manifest::{MANIFEST_FILENAME, deserialize_manifest, serialize_manifest, Manifest};
use handlers::{ContentValidator, ValidatorResponse}; use handlers::{ContentValidator, ValidatorResponse};
@ -53,7 +54,7 @@ fn write_response_and_check_hash(
// Now write the response // Now write the response
let mut file = io::BufWriter::new(fs::File::create(&content_path)?); let mut file = io::BufWriter::new(fs::File::create(&content_path)?);
let mut reader = io::BufReader::new(response); let mut reader = io::BufReader::new(fetch::BodyReader::new(response));
io::copy(&mut reader, &mut file)?; io::copy(&mut reader, &mut file)?;
file.flush()?; file.flush()?;

View File

@ -216,6 +216,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
), ),
self.embeddable_on.clone(), self.embeddable_on.clone(),
self.fetch.clone(), self.fetch.clone(),
self.pool.clone(),
) )
}, },
URLHintResult::GithubDapp(content) => { URLHintResult::GithubDapp(content) => {
@ -232,6 +233,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
), ),
self.embeddable_on.clone(), self.embeddable_on.clone(),
self.fetch.clone(), self.fetch.clone(),
self.pool.clone(),
) )
}, },
URLHintResult::Content(content) => { URLHintResult::Content(content) => {
@ -248,6 +250,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
), ),
self.embeddable_on.clone(), self.embeddable_on.clone(),
self.fetch.clone(), self.fetch.clone(),
self.pool.clone(),
) )
}, },
}; };
@ -280,7 +283,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
mod tests { mod tests {
use std::env; use std::env;
use std::sync::Arc; use std::sync::Arc;
use fetch::{Fetch, Client}; use fetch::Client;
use futures::{future, Future}; use futures::{future, Future};
use hash_fetch::urlhint::{URLHint, URLHintResult}; use hash_fetch::urlhint::{URLHint, URLHintResult};
use ethereum_types::H256; use ethereum_types::H256;

View File

@ -81,7 +81,7 @@ pub fn all_endpoints<F: Fetch>(
insert::<parity_ui::old::App>(&mut pages, "v1", Embeddable::Yes(embeddable.clone()), pool.clone()); insert::<parity_ui::old::App>(&mut pages, "v1", Embeddable::Yes(embeddable.clone()), pool.clone());
pages.insert("proxy".into(), ProxyPac::boxed(embeddable.clone(), dapps_domain.to_owned())); pages.insert("proxy".into(), ProxyPac::boxed(embeddable.clone(), dapps_domain.to_owned()));
pages.insert(WEB_PATH.into(), Web::boxed(embeddable.clone(), web_proxy_tokens.clone(), fetch.clone())); pages.insert(WEB_PATH.into(), Web::boxed(embeddable.clone(), web_proxy_tokens.clone(), fetch.clone(), pool.clone()));
(local_endpoints, pages) (local_endpoints, pages)
} }

View File

@ -23,6 +23,7 @@ use std::time::{Instant, Duration};
use fetch::{self, Fetch}; use fetch::{self, Fetch};
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::{self, Future}; use futures::{self, Future};
use futures_cpupool::CpuPool;
use hyper::{self, Method, StatusCode}; use hyper::{self, Method, StatusCode};
use parking_lot::Mutex; use parking_lot::Mutex;
@ -35,7 +36,7 @@ const FETCH_TIMEOUT: u64 = 300;
pub enum ValidatorResponse { pub enum ValidatorResponse {
Local(local::Dapp), Local(local::Dapp),
Streaming(StreamingHandler<fetch::Response>), Streaming(StreamingHandler<fetch::BodyReader>),
} }
pub trait ContentValidator: Sized + Send + 'static { pub trait ContentValidator: Sized + Send + 'static {
@ -252,6 +253,7 @@ impl ContentFetcherHandler {
installer: H, installer: H,
embeddable_on: Embeddable, embeddable_on: Embeddable,
fetch: F, fetch: F,
pool: CpuPool,
) -> Self { ) -> Self {
let fetch_control = FetchControl::default(); let fetch_control = FetchControl::default();
let errors = Errors { embeddable_on }; let errors = Errors { embeddable_on };
@ -262,6 +264,7 @@ impl ContentFetcherHandler {
Method::Get => { Method::Get => {
trace!(target: "dapps", "Fetching content from: {:?}", url); trace!(target: "dapps", "Fetching content from: {:?}", url);
FetchState::InProgress(Self::fetch_content( FetchState::InProgress(Self::fetch_content(
pool,
fetch, fetch,
url, url,
fetch_control.abort.clone(), fetch_control.abort.clone(),
@ -282,6 +285,7 @@ impl ContentFetcherHandler {
} }
fn fetch_content<H: ContentValidator, F: Fetch>( fn fetch_content<H: ContentValidator, F: Fetch>(
pool: CpuPool,
fetch: F, fetch: F,
url: &str, url: &str,
abort: Arc<AtomicBool>, abort: Arc<AtomicBool>,
@ -290,8 +294,8 @@ impl ContentFetcherHandler {
installer: H, installer: H,
) -> Box<Future<Item=FetchState, Error=()> + Send> { ) -> Box<Future<Item=FetchState, Error=()> + Send> {
// Start fetching the content // Start fetching the content
let fetch2 = fetch.clone(); let pool2 = pool.clone();
let future = fetch.fetch_with_abort(url, abort.into()).then(move |result| { let future = fetch.fetch(url, abort.into()).then(move |result| {
trace!(target: "dapps", "Fetching content finished. Starting validation: {:?}", result); trace!(target: "dapps", "Fetching content finished. Starting validation: {:?}", result);
Ok(match result { Ok(match result {
Ok(response) => match installer.validate_and_install(response) { Ok(response) => match installer.validate_and_install(response) {
@ -303,7 +307,7 @@ impl ContentFetcherHandler {
Ok(ValidatorResponse::Streaming(stream)) => { Ok(ValidatorResponse::Streaming(stream)) => {
trace!(target: "dapps", "Validation OK. Streaming response."); trace!(target: "dapps", "Validation OK. Streaming response.");
let (reading, response) = stream.into_response(); let (reading, response) = stream.into_response();
fetch2.process_and_forget(reading); pool.spawn(reading).forget();
FetchState::Streaming(response) FetchState::Streaming(response)
}, },
Err(e) => { Err(e) => {
@ -319,7 +323,7 @@ impl ContentFetcherHandler {
}); });
// make sure to run within fetch thread pool. // make sure to run within fetch thread pool.
fetch.process(future) Box::new(pool2.spawn(future))
} }
} }

View File

@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{io, thread, time}; use std::{thread, time};
use std::sync::{atomic, mpsc, Arc}; use std::sync::{atomic, mpsc, Arc};
use parking_lot::Mutex; use parking_lot::Mutex;
use hyper;
use futures::{self, Future}; use futures::{self, Future};
use fetch::{self, Fetch}; use fetch::{self, Fetch, Url};
pub struct FetchControl { pub struct FetchControl {
sender: mpsc::Sender<()>, sender: mpsc::Sender<()>,
@ -96,11 +97,8 @@ impl FakeFetch {
impl Fetch for FakeFetch { impl Fetch for FakeFetch {
type Result = Box<Future<Item = fetch::Response, Error = fetch::Error> + Send>; type Result = Box<Future<Item = fetch::Response, Error = fetch::Error> + Send>;
fn new() -> Result<Self, fetch::Error> where Self: Sized { fn fetch(&self, url: &str, abort: fetch::Abort) -> Self::Result {
Ok(FakeFetch::default()) let u = Url::parse(url).unwrap();
}
fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result {
self.requested.lock().push(url.into()); self.requested.lock().push(url.into());
let manual = self.manual.clone(); let manual = self.manual.clone();
let response = self.response.clone(); let response = self.response.clone();
@ -111,23 +109,10 @@ impl Fetch for FakeFetch {
// wait for manual resume // wait for manual resume
let _ = rx.recv(); let _ = rx.recv();
} }
let data = response.lock().take().unwrap_or(b"Some content"); let data = response.lock().take().unwrap_or(b"Some content");
let cursor = io::Cursor::new(data); tx.send(fetch::Response::new(u, hyper::Response::new().with_body(data), abort)).unwrap();
tx.send(fetch::Response::from_reader(cursor)).unwrap();
}); });
Box::new(rx.map_err(|_| fetch::Error::Aborted)) Box::new(rx.map_err(|_| fetch::Error::Aborted))
} }
fn process_and_forget<F, I, E>(&self, f: F) where
F: Future<Item=I, Error=E> + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
// Spawn the task in a separate thread.
thread::spawn(|| {
let _ = f.wait();
});
}
} }

View File

@ -63,7 +63,7 @@ pub fn init_server<F, B>(process: F, io: IoHandler) -> (Server, Arc<FakeRegistra
let mut dapps_path = env::temp_dir(); let mut dapps_path = env::temp_dir();
dapps_path.push("non-existent-dir-to-prevent-fs-files-from-loading"); dapps_path.push("non-existent-dir-to-prevent-fs-files-from-loading");
let mut builder = ServerBuilder::new(&dapps_path, registrar.clone()); let mut builder = ServerBuilder::new(FetchClient::new().unwrap(), &dapps_path, registrar.clone());
builder.signer_address = Some(("127.0.0.1".into(), SIGNER_PORT)); builder.signer_address = Some(("127.0.0.1".into(), SIGNER_PORT));
let server = process(builder).start_unsecured_http(&"127.0.0.1:0".parse().unwrap(), io).unwrap(); let server = process(builder).start_unsecured_http(&"127.0.0.1:0".parse().unwrap(), io).unwrap();
( (
@ -149,13 +149,13 @@ pub struct ServerBuilder<T: Fetch = FetchClient> {
web_proxy_tokens: Arc<WebProxyTokens>, web_proxy_tokens: Arc<WebProxyTokens>,
signer_address: Option<(String, u16)>, signer_address: Option<(String, u16)>,
allowed_hosts: DomainsValidation<Host>, allowed_hosts: DomainsValidation<Host>,
fetch: Option<T>, fetch: T,
serve_ui: bool, serve_ui: bool,
} }
impl ServerBuilder { impl ServerBuilder {
/// Construct new dapps server /// Construct new dapps server
pub fn new<P: AsRef<Path>>(dapps_path: P, registrar: Arc<RegistrarClient<Call=Asynchronous>>) -> Self { pub fn new<P: AsRef<Path>>(fetch: FetchClient, dapps_path: P, registrar: Arc<RegistrarClient<Call=Asynchronous>>) -> Self {
ServerBuilder { ServerBuilder {
dapps_path: dapps_path.as_ref().to_owned(), dapps_path: dapps_path.as_ref().to_owned(),
registrar: registrar, registrar: registrar,
@ -163,7 +163,7 @@ impl ServerBuilder {
web_proxy_tokens: Arc::new(|_| None), web_proxy_tokens: Arc::new(|_| None),
signer_address: None, signer_address: None,
allowed_hosts: DomainsValidation::Disabled, allowed_hosts: DomainsValidation::Disabled,
fetch: None, fetch: fetch,
serve_ui: false, serve_ui: false,
} }
} }
@ -179,7 +179,7 @@ impl<T: Fetch> ServerBuilder<T> {
web_proxy_tokens: self.web_proxy_tokens, web_proxy_tokens: self.web_proxy_tokens,
signer_address: self.signer_address, signer_address: self.signer_address,
allowed_hosts: self.allowed_hosts, allowed_hosts: self.allowed_hosts,
fetch: Some(fetch), fetch: fetch,
serve_ui: self.serve_ui, serve_ui: self.serve_ui,
} }
} }
@ -187,7 +187,6 @@ impl<T: Fetch> ServerBuilder<T> {
/// Asynchronously start server with no authentication, /// Asynchronously start server with no authentication,
/// returns result with `Server` handle on success or an error. /// returns result with `Server` handle on success or an error.
pub fn start_unsecured_http(self, addr: &SocketAddr, io: IoHandler) -> io::Result<Server> { pub fn start_unsecured_http(self, addr: &SocketAddr, io: IoHandler) -> io::Result<Server> {
let fetch = self.fetch_client();
Server::start_http( Server::start_http(
addr, addr,
io, io,
@ -199,17 +198,10 @@ impl<T: Fetch> ServerBuilder<T> {
self.sync_status, self.sync_status,
self.web_proxy_tokens, self.web_proxy_tokens,
Remote::new_sync(), Remote::new_sync(),
fetch, self.fetch,
self.serve_ui, self.serve_ui,
) )
} }
fn fetch_client(&self) -> T {
match self.fetch.clone() {
Some(fetch) => fetch,
None => T::new().unwrap(),
}
}
} }
const DAPPS_DOMAIN: &'static str = "web3.site"; const DAPPS_DOMAIN: &'static str = "web3.site";

View File

@ -25,6 +25,7 @@ use hyper::{mime, StatusCode};
use apps; use apps;
use endpoint::{Endpoint, EndpointPath, Request, Response}; use endpoint::{Endpoint, EndpointPath, Request, Response};
use futures::future; use futures::future;
use futures_cpupool::CpuPool;
use handlers::{ use handlers::{
ContentFetcherHandler, ContentHandler, ContentValidator, ValidatorResponse, ContentFetcherHandler, ContentHandler, ContentValidator, ValidatorResponse,
StreamingHandler, StreamingHandler,
@ -35,6 +36,7 @@ pub struct Web<F> {
embeddable_on: Embeddable, embeddable_on: Embeddable,
web_proxy_tokens: Arc<WebProxyTokens>, web_proxy_tokens: Arc<WebProxyTokens>,
fetch: F, fetch: F,
pool: CpuPool,
} }
impl<F: Fetch> Web<F> { impl<F: Fetch> Web<F> {
@ -42,11 +44,13 @@ impl<F: Fetch> Web<F> {
embeddable_on: Embeddable, embeddable_on: Embeddable,
web_proxy_tokens: Arc<WebProxyTokens>, web_proxy_tokens: Arc<WebProxyTokens>,
fetch: F, fetch: F,
pool: CpuPool,
) -> Box<Endpoint> { ) -> Box<Endpoint> {
Box::new(Web { Box::new(Web {
embeddable_on, embeddable_on,
web_proxy_tokens, web_proxy_tokens,
fetch, fetch,
pool,
}) })
} }
@ -129,6 +133,7 @@ impl<F: Fetch> Endpoint for Web<F> {
}, },
self.embeddable_on.clone(), self.embeddable_on.clone(),
self.fetch.clone(), self.fetch.clone(),
self.pool.clone(),
)) ))
} }
} }
@ -146,7 +151,7 @@ impl ContentValidator for WebInstaller {
let is_html = response.is_html(); let is_html = response.is_html();
let mime = response.content_type().unwrap_or(mime::TEXT_HTML); let mime = response.content_type().unwrap_or(mime::TEXT_HTML);
let mut handler = StreamingHandler::new( let mut handler = StreamingHandler::new(
response, fetch::BodyReader::new(response),
status, status,
mime, mime,
self.embeddable_on, self.embeddable_on,

View File

@ -33,6 +33,7 @@ ethjson = { path = "../json" }
ethkey = { path = "../ethkey" } ethkey = { path = "../ethkey" }
ethstore = { path = "../ethstore" } ethstore = { path = "../ethstore" }
evm = { path = "evm" } evm = { path = "evm" }
futures-cpupool = "0.1"
hardware-wallet = { path = "../hw" } hardware-wallet = { path = "../hw" }
heapsize = "0.4" heapsize = "0.4"
itertools = "0.5" itertools = "0.5"

View File

@ -71,6 +71,7 @@ extern crate ethcore_transaction as transaction;
extern crate ethereum_types; extern crate ethereum_types;
extern crate ethjson; extern crate ethjson;
extern crate ethkey; extern crate ethkey;
extern crate futures_cpupool;
extern crate hardware_wallet; extern crate hardware_wallet;
extern crate hashdb; extern crate hashdb;
extern crate itertools; extern crate itertools;

View File

@ -35,6 +35,7 @@ use ethcore_miner::transaction_queue::{
AccountDetails, AccountDetails,
TransactionOrigin, TransactionOrigin,
}; };
use futures_cpupool::CpuPool;
use ethcore_miner::work_notify::{WorkPoster, NotifyWork}; use ethcore_miner::work_notify::{WorkPoster, NotifyWork};
use miner::service_transaction_checker::ServiceTransactionChecker; use miner::service_transaction_checker::ServiceTransactionChecker;
use miner::{MinerService, MinerStatus}; use miner::{MinerService, MinerStatus};
@ -218,11 +219,11 @@ pub enum GasPricer {
impl GasPricer { impl GasPricer {
/// Create a new Calibrated `GasPricer`. /// Create a new Calibrated `GasPricer`.
pub fn new_calibrated(options: GasPriceCalibratorOptions, fetch: FetchClient) -> GasPricer { pub fn new_calibrated(options: GasPriceCalibratorOptions, fetch: FetchClient, p: CpuPool) -> GasPricer {
GasPricer::Calibrated(GasPriceCalibrator { GasPricer::Calibrated(GasPriceCalibrator {
options: options, options: options,
next_calibration: Instant::now(), next_calibration: Instant::now(),
price_info: PriceInfoClient::new(fetch), price_info: PriceInfoClient::new(fetch, p),
}) })
} }

View File

@ -8,6 +8,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
futures = "0.1" futures = "0.1"
futures-cpupool = "0.1"
log = "0.3" log = "0.3"
mime = "0.3" mime = "0.3"
mime_guess = "2.0.0-alpha.2" mime_guess = "2.0.0-alpha.2"
@ -25,4 +26,5 @@ ethabi-derive = "5.0"
ethabi-contract = "5.0" ethabi-contract = "5.0"
[dev-dependencies] [dev-dependencies]
hyper = "0.11"
parking_lot = "0.5" parking_lot = "0.5"

View File

@ -22,7 +22,8 @@ use std::sync::Arc;
use std::path::PathBuf; use std::path::PathBuf;
use hash::keccak_buffer; use hash::keccak_buffer;
use fetch::{Fetch, Response, Error as FetchError, Client as FetchClient}; use fetch::{self, Fetch};
use futures_cpupool::CpuPool;
use futures::{Future, IntoFuture}; use futures::{Future, IntoFuture};
use parity_reactor::Remote; use parity_reactor::Remote;
use urlhint::{URLHintContract, URLHint, URLHintResult}; use urlhint::{URLHintContract, URLHint, URLHintResult};
@ -37,7 +38,7 @@ pub trait HashFetch: Send + Sync + 'static {
/// 2. `on_done` - callback function invoked when the content is ready (or there was error during fetch) /// 2. `on_done` - callback function invoked when the content is ready (or there was error during fetch)
/// ///
/// This function may fail immediately when fetch cannot be initialized or content cannot be resolved. /// This function may fail immediately when fetch cannot be initialized or content cannot be resolved.
fn fetch(&self, hash: H256, on_done: Box<Fn(Result<PathBuf, Error>) + Send>); fn fetch(&self, hash: H256, abort: fetch::Abort, on_done: Box<Fn(Result<PathBuf, Error>) + Send>);
} }
/// Hash-fetching error. /// Hash-fetching error.
@ -57,7 +58,7 @@ pub enum Error {
/// IO Error while validating hash. /// IO Error while validating hash.
IO(io::Error), IO(io::Error),
/// Error during fetch. /// Error during fetch.
Fetch(FetchError), Fetch(fetch::Error),
} }
#[cfg(test)] #[cfg(test)]
@ -77,8 +78,8 @@ impl PartialEq for Error {
} }
} }
impl From<FetchError> for Error { impl From<fetch::Error> for Error {
fn from(error: FetchError) -> Self { fn from(error: fetch::Error) -> Self {
Error::Fetch(error) Error::Fetch(error)
} }
} }
@ -89,14 +90,9 @@ impl From<io::Error> for Error {
} }
} }
fn validate_hash(path: PathBuf, hash: H256, result: Result<Response, FetchError>) -> Result<PathBuf, Error> { fn validate_hash(path: PathBuf, hash: H256, body: fetch::BodyReader) -> Result<PathBuf, Error> {
let response = result?;
if !response.is_success() {
return Err(Error::InvalidStatus);
}
// Read the response // Read the response
let mut reader = io::BufReader::new(response); let mut reader = io::BufReader::new(body);
let mut writer = io::BufWriter::new(fs::File::create(&path)?); let mut writer = io::BufWriter::new(fs::File::create(&path)?);
io::copy(&mut reader, &mut writer)?; io::copy(&mut reader, &mut writer)?;
writer.flush()?; writer.flush()?;
@ -112,24 +108,19 @@ fn validate_hash(path: PathBuf, hash: H256, result: Result<Response, FetchError>
} }
/// Default Hash-fetching client using on-chain contract to resolve hashes to URLs. /// Default Hash-fetching client using on-chain contract to resolve hashes to URLs.
pub struct Client<F: Fetch + 'static = FetchClient> { pub struct Client<F: Fetch + 'static = fetch::Client> {
pool: CpuPool,
contract: URLHintContract, contract: URLHintContract,
fetch: F, fetch: F,
remote: Remote, remote: Remote,
random_path: Arc<Fn() -> PathBuf + Sync + Send>, random_path: Arc<Fn() -> PathBuf + Sync + Send>,
} }
impl Client {
/// Creates new instance of the `Client` given on-chain contract client and task runner.
pub fn new(contract: Arc<RegistrarClient<Call=Asynchronous>>, remote: Remote) -> Self {
Client::with_fetch(contract, FetchClient::new().unwrap(), remote)
}
}
impl<F: Fetch + 'static> Client<F> { impl<F: Fetch + 'static> Client<F> {
/// Creates new instance of the `Client` given on-chain contract client, fetch service and task runner. /// Creates new instance of the `Client` given on-chain contract client, fetch service and task runner.
pub fn with_fetch(contract: Arc<RegistrarClient<Call=Asynchronous>>, fetch: F, remote: Remote) -> Self { pub fn with_fetch(contract: Arc<RegistrarClient<Call=Asynchronous>>, pool: CpuPool, fetch: F, remote: Remote) -> Self {
Client { Client {
pool,
contract: URLHintContract::new(contract), contract: URLHintContract::new(contract),
fetch: fetch, fetch: fetch,
remote: remote, remote: remote,
@ -139,11 +130,12 @@ impl<F: Fetch + 'static> Client<F> {
} }
impl<F: Fetch + 'static> HashFetch for Client<F> { impl<F: Fetch + 'static> HashFetch for Client<F> {
fn fetch(&self, hash: H256, on_done: Box<Fn(Result<PathBuf, Error>) + Send>) { fn fetch(&self, hash: H256, abort: fetch::Abort, on_done: Box<Fn(Result<PathBuf, Error>) + Send>) {
debug!(target: "fetch", "Fetching: {:?}", hash); debug!(target: "fetch", "Fetching: {:?}", hash);
let random_path = self.random_path.clone(); let random_path = self.random_path.clone();
let remote_fetch = self.fetch.clone(); let remote_fetch = self.fetch.clone();
let pool = self.pool.clone();
let future = self.contract.resolve(hash) let future = self.contract.resolve(hash)
.map_err(|e| { warn!("Error resolving URL: {}", e); Error::NoResolution }) .map_err(|e| { warn!("Error resolving URL: {}", e); Error::NoResolution })
.and_then(|maybe_url| maybe_url.ok_or(Error::NoResolution)) .and_then(|maybe_url| maybe_url.ok_or(Error::NoResolution))
@ -161,19 +153,26 @@ impl<F: Fetch + 'static> HashFetch for Client<F> {
.into_future() .into_future()
.and_then(move |url| { .and_then(move |url| {
debug!(target: "fetch", "Resolved {:?} to {:?}. Fetching...", hash, url); debug!(target: "fetch", "Resolved {:?} to {:?}. Fetching...", hash, url);
let future = remote_fetch.fetch(&url).then(move |result| { remote_fetch.fetch(&url, abort).from_err()
debug!(target: "fetch", "Content fetched, validating hash ({:?})", hash);
let path = random_path();
let res = validate_hash(path.clone(), hash, result);
if let Err(ref err) = res {
trace!(target: "fetch", "Error: {:?}", err);
// Remove temporary file in case of error
let _ = fs::remove_file(&path);
}
res
});
remote_fetch.process(future)
}) })
.and_then(move |response| {
if !response.is_success() {
Err(Error::InvalidStatus)
} else {
Ok(response)
}
})
.and_then(move |response| pool.spawn_fn(move || {
debug!(target: "fetch", "Content fetched, validating hash ({:?})", hash);
let path = random_path();
let res = validate_hash(path.clone(), hash, fetch::BodyReader::new(response));
if let Err(ref err) = res {
trace!(target: "fetch", "Error: {:?}", err);
// Remove temporary file in case of error
let _ = fs::remove_file(&path);
}
res
}))
.then(move |res| { on_done(res); Ok(()) as Result<(), ()> }); .then(move |res| { on_done(res); Ok(()) as Result<(), ()> });
self.remote.spawn(future); self.remote.spawn(future);
@ -194,14 +193,17 @@ fn random_temp_path() -> PathBuf {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
extern crate hyper;
use rustc_hex::FromHex; use rustc_hex::FromHex;
use std::sync::{Arc, mpsc}; use std::sync::{Arc, mpsc};
use parking_lot::Mutex; use parking_lot::Mutex;
use futures::future; use futures::future;
use fetch::{self, Fetch}; use futures_cpupool::CpuPool;
use fetch::{self, Fetch, Url};
use parity_reactor::Remote; use parity_reactor::Remote;
use urlhint::tests::{FakeRegistrar, URLHINT}; use urlhint::tests::{FakeRegistrar, URLHINT};
use super::{Error, Client, HashFetch, random_temp_path}; use super::{Error, Client, HashFetch, random_temp_path};
use self::hyper::StatusCode;
#[derive(Clone)] #[derive(Clone)]
@ -212,17 +214,13 @@ mod tests {
impl Fetch for FakeFetch { impl Fetch for FakeFetch {
type Result = future::Ok<fetch::Response, fetch::Error>; type Result = future::Ok<fetch::Response, fetch::Error>;
fn new() -> Result<Self, fetch::Error> where Self: Sized { fn fetch(&self, url: &str, abort: fetch::Abort) -> Self::Result {
Ok(FakeFetch { return_success: true })
}
fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result {
assert_eq!(url, "https://parity.io/assets/images/ethcore-black-horizontal.png"); assert_eq!(url, "https://parity.io/assets/images/ethcore-black-horizontal.png");
let u = Url::parse(url).unwrap();
future::ok(if self.return_success { future::ok(if self.return_success {
let cursor = ::std::io::Cursor::new(b"result"); fetch::client::Response::new(u, hyper::Response::new().with_body(&b"result"[..]), abort)
fetch::Response::from_reader(cursor)
} else { } else {
fetch::Response::not_found() fetch::client::Response::new(u, hyper::Response::new().with_status(StatusCode::NotFound), abort)
}) })
} }
} }
@ -241,11 +239,11 @@ mod tests {
// given // given
let contract = Arc::new(FakeRegistrar::new()); let contract = Arc::new(FakeRegistrar::new());
let fetch = FakeFetch { return_success: false }; let fetch = FakeFetch { return_success: false };
let client = Client::with_fetch(contract.clone(), fetch, Remote::new_sync()); let client = Client::with_fetch(contract.clone(), CpuPool::new(1), fetch, Remote::new_sync());
// when // when
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
client.fetch(2.into(), Box::new(move |result| { client.fetch(2.into(), Default::default(), Box::new(move |result| {
tx.send(result).unwrap(); tx.send(result).unwrap();
})); }));
@ -259,11 +257,11 @@ mod tests {
// given // given
let registrar = Arc::new(registrar()); let registrar = Arc::new(registrar());
let fetch = FakeFetch { return_success: false }; let fetch = FakeFetch { return_success: false };
let client = Client::with_fetch(registrar.clone(), fetch, Remote::new_sync()); let client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
// when // when
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
client.fetch(2.into(), Box::new(move |result| { client.fetch(2.into(), Default::default(), Box::new(move |result| {
tx.send(result).unwrap(); tx.send(result).unwrap();
})); }));
@ -277,14 +275,14 @@ mod tests {
// given // given
let registrar = Arc::new(registrar()); let registrar = Arc::new(registrar());
let fetch = FakeFetch { return_success: true }; let fetch = FakeFetch { return_success: true };
let mut client = Client::with_fetch(registrar.clone(), fetch, Remote::new_sync()); let mut client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
let path = random_temp_path(); let path = random_temp_path();
let path2 = path.clone(); let path2 = path.clone();
client.random_path = Arc::new(move || path2.clone()); client.random_path = Arc::new(move || path2.clone());
// when // when
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
client.fetch(2.into(), Box::new(move |result| { client.fetch(2.into(), Default::default(), Box::new(move |result| {
tx.send(result).unwrap(); tx.send(result).unwrap();
})); }));
@ -300,16 +298,17 @@ mod tests {
// given // given
let registrar = Arc::new(registrar()); let registrar = Arc::new(registrar());
let fetch = FakeFetch { return_success: true }; let fetch = FakeFetch { return_success: true };
let client = Client::with_fetch(registrar.clone(), fetch, Remote::new_sync()); let client = Client::with_fetch(registrar.clone(), CpuPool::new(1), fetch, Remote::new_sync());
// when // when
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
client.fetch("0x06b0a4f426f6713234b2d4b2468640bc4e0bb72657a920ad24c5087153c593c8".into(), Box::new(move |result| { client.fetch("0x06b0a4f426f6713234b2d4b2468640bc4e0bb72657a920ad24c5087153c593c8".into(),
tx.send(result).unwrap(); Default::default(),
})); Box::new(move |result| { tx.send(result).unwrap(); }));
// then // then
let result = rx.recv().unwrap(); let result = rx.recv().unwrap();
assert!(result.is_ok(), "Should return path, got: {:?}", result); assert!(result.is_ok(), "Should return path, got: {:?}", result);
} }
} }

View File

@ -25,6 +25,7 @@ extern crate ethabi;
extern crate ethcore_bytes as bytes; extern crate ethcore_bytes as bytes;
extern crate ethereum_types; extern crate ethereum_types;
extern crate futures; extern crate futures;
extern crate futures_cpupool;
extern crate keccak_hash as hash; extern crate keccak_hash as hash;
extern crate mime; extern crate mime;
extern crate mime_guess; extern crate mime_guess;
@ -47,3 +48,4 @@ mod client;
pub mod urlhint; pub mod urlhint;
pub use client::{HashFetch, Client, Error}; pub use client::{HashFetch, Client, Error};
pub use fetch::Abort;

View File

@ -939,6 +939,7 @@ impl Configuration {
_ => return Err("Invalid value for `--releases-track`. See `--help` for more information.".into()), _ => return Err("Invalid value for `--releases-track`. See `--help` for more information.".into()),
}, },
path: default_hypervisor_path(), path: default_hypervisor_path(),
max_size: 128 * 1024 * 1024,
}) })
} }
@ -1389,7 +1390,8 @@ mod tests {
require_consensus: true, require_consensus: true,
filter: UpdateFilter::Critical, filter: UpdateFilter::Critical,
track: ReleaseTrack::Unknown, track: ReleaseTrack::Unknown,
path: default_hypervisor_path() path: default_hypervisor_path(),
max_size: 128 * 1024 * 1024,
}, },
mode: Default::default(), mode: Default::default(),
tracing: Default::default(), tracing: Default::default(),
@ -1459,9 +1461,30 @@ mod tests {
let conf3 = parse(&["parity", "--auto-update=xxx"]); let conf3 = parse(&["parity", "--auto-update=xxx"]);
// then // then
assert_eq!(conf0.update_policy().unwrap(), UpdatePolicy{enable_downloading: true, require_consensus: true, filter: UpdateFilter::Critical, track: ReleaseTrack::Testing, path: default_hypervisor_path()}); assert_eq!(conf0.update_policy().unwrap(), UpdatePolicy {
assert_eq!(conf1.update_policy().unwrap(), UpdatePolicy{enable_downloading: true, require_consensus: false, filter: UpdateFilter::All, track: ReleaseTrack::Unknown, path: default_hypervisor_path()}); enable_downloading: true,
assert_eq!(conf2.update_policy().unwrap(), UpdatePolicy{enable_downloading: false, require_consensus: true, filter: UpdateFilter::All, track: ReleaseTrack::Beta, path: default_hypervisor_path()}); require_consensus: true,
filter: UpdateFilter::Critical,
track: ReleaseTrack::Testing,
path: default_hypervisor_path(),
max_size: 128 * 1024 * 1024,
});
assert_eq!(conf1.update_policy().unwrap(), UpdatePolicy {
enable_downloading: true,
require_consensus: false,
filter: UpdateFilter::All,
track: ReleaseTrack::Unknown,
path: default_hypervisor_path(),
max_size: 128 * 1024 * 1024,
});
assert_eq!(conf2.update_policy().unwrap(), UpdatePolicy {
enable_downloading: false,
require_consensus: true,
filter: UpdateFilter::All,
track: ReleaseTrack::Beta,
path: default_hypervisor_path(),
max_size: 128 * 1024 * 1024,
});
assert!(conf3.update_policy().is_err()); assert!(conf3.update_policy().is_err());
} }

View File

@ -23,6 +23,7 @@ use dir::helpers::replace_home;
use ethcore::client::{Client, BlockChainClient, BlockId, CallContract}; use ethcore::client::{Client, BlockChainClient, BlockId, CallContract};
use ethsync::LightSync; use ethsync::LightSync;
use futures::{Future, future, IntoFuture}; use futures::{Future, future, IntoFuture};
use futures_cpupool::CpuPool;
use hash_fetch::fetch::Client as FetchClient; use hash_fetch::fetch::Client as FetchClient;
use registrar::{RegistrarClient, Asynchronous}; use registrar::{RegistrarClient, Asynchronous};
use light::client::LightChainClient; use light::client::LightChainClient;
@ -160,6 +161,7 @@ pub struct Dependencies {
pub sync_status: Arc<SyncStatus>, pub sync_status: Arc<SyncStatus>,
pub contract_client: Arc<RegistrarClient<Call=Asynchronous>>, pub contract_client: Arc<RegistrarClient<Call=Asynchronous>>,
pub fetch: FetchClient, pub fetch: FetchClient,
pub pool: CpuPool,
pub signer: Arc<SignerService>, pub signer: Arc<SignerService>,
pub ui_address: Option<(String, u16)>, pub ui_address: Option<(String, u16)>,
} }
@ -253,7 +255,7 @@ mod server {
let web_proxy_tokens = Arc::new(move |token| signer.web_proxy_access_token_domain(&token)); let web_proxy_tokens = Arc::new(move |token| signer.web_proxy_access_token_domain(&token));
Ok(parity_dapps::Middleware::dapps( Ok(parity_dapps::Middleware::dapps(
deps.fetch.pool(), deps.pool,
deps.node_health, deps.node_health,
deps.ui_address, deps.ui_address,
extra_embed_on, extra_embed_on,
@ -273,7 +275,7 @@ mod server {
dapps_domain: &str, dapps_domain: &str,
) -> Result<Middleware, String> { ) -> Result<Middleware, String> {
Ok(parity_dapps::Middleware::ui( Ok(parity_dapps::Middleware::ui(
deps.fetch.pool(), deps.pool,
deps.node_health, deps.node_health,
dapps_domain, dapps_domain,
deps.contract_client, deps.contract_client,

View File

@ -17,6 +17,7 @@
use std::{str, fs, fmt}; use std::{str, fs, fmt};
use std::time::Duration; use std::time::Duration;
use ethereum_types::{U256, Address}; use ethereum_types::{U256, Address};
use futures_cpupool::CpuPool;
use parity_version::version_data; use parity_version::version_data;
use journaldb::Algorithm; use journaldb::Algorithm;
use ethcore::spec::{Spec, SpecParams}; use ethcore::spec::{Spec, SpecParams};
@ -240,7 +241,7 @@ impl Default for GasPricerConfig {
} }
impl GasPricerConfig { impl GasPricerConfig {
pub fn to_gas_pricer(&self, fetch: FetchClient) -> GasPricer { pub fn to_gas_pricer(&self, fetch: FetchClient, p: CpuPool) -> GasPricer {
match *self { match *self {
GasPricerConfig::Fixed(u) => GasPricer::Fixed(u), GasPricerConfig::Fixed(u) => GasPricer::Fixed(u),
GasPricerConfig::Calibrated { usd_per_tx, recalibration_period, .. } => { GasPricerConfig::Calibrated { usd_per_tx, recalibration_period, .. } => {
@ -249,7 +250,8 @@ impl GasPricerConfig {
usd_per_tx: usd_per_tx, usd_per_tx: usd_per_tx,
recalibration_period: recalibration_period, recalibration_period: recalibration_period,
}, },
fetch fetch,
p,
) )
} }
} }

View File

@ -28,6 +28,7 @@ use ethcore::miner::Miner;
use ethcore::snapshot::SnapshotService; use ethcore::snapshot::SnapshotService;
use ethcore_logger::RotatingLogger; use ethcore_logger::RotatingLogger;
use ethsync::{ManageNetwork, SyncProvider, LightSync}; use ethsync::{ManageNetwork, SyncProvider, LightSync};
use futures_cpupool::CpuPool;
use hash_fetch::fetch::Client as FetchClient; use hash_fetch::fetch::Client as FetchClient;
use jsonrpc_core::{self as core, MetaIoHandler}; use jsonrpc_core::{self as core, MetaIoHandler};
use light::client::LightChainClient; use light::client::LightChainClient;
@ -225,6 +226,7 @@ pub struct FullDependencies {
pub dapps_address: Option<Host>, pub dapps_address: Option<Host>,
pub ws_address: Option<Host>, pub ws_address: Option<Host>,
pub fetch: FetchClient, pub fetch: FetchClient,
pub pool: CpuPool,
pub remote: parity_reactor::Remote, pub remote: parity_reactor::Remote,
pub whisper_rpc: Option<::whisper::RpcFactory>, pub whisper_rpc: Option<::whisper::RpcFactory>,
pub gas_price_percentile: usize, pub gas_price_percentile: usize,
@ -253,7 +255,7 @@ impl FullDependencies {
} }
} }
let nonces = Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))); let nonces = Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.pool.clone())));
let dispatcher = FullDispatcher::new( let dispatcher = FullDispatcher::new(
self.client.clone(), self.client.clone(),
self.miner.clone(), self.miner.clone(),
@ -355,6 +357,7 @@ impl FullDependencies {
&self.net_service, &self.net_service,
self.dapps_service.clone(), self.dapps_service.clone(),
self.fetch.clone(), self.fetch.clone(),
self.pool.clone(),
).to_delegate()) ).to_delegate())
}, },
Api::Traces => { Api::Traces => {
@ -430,6 +433,7 @@ pub struct LightDependencies<T> {
pub dapps_address: Option<Host>, pub dapps_address: Option<Host>,
pub ws_address: Option<Host>, pub ws_address: Option<Host>,
pub fetch: FetchClient, pub fetch: FetchClient,
pub pool: CpuPool,
pub geth_compatibility: bool, pub geth_compatibility: bool,
pub remote: parity_reactor::Remote, pub remote: parity_reactor::Remote,
pub whisper_rpc: Option<::whisper::RpcFactory>, pub whisper_rpc: Option<::whisper::RpcFactory>,
@ -451,7 +455,7 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
self.on_demand.clone(), self.on_demand.clone(),
self.cache.clone(), self.cache.clone(),
self.transaction_queue.clone(), self.transaction_queue.clone(),
Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.fetch.pool()))), Arc::new(Mutex::new(dispatch::Reservations::with_pool(self.pool.clone()))),
self.gas_price_percentile, self.gas_price_percentile,
); );
@ -564,6 +568,7 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
self.sync.clone(), self.sync.clone(),
self.dapps_service.clone(), self.dapps_service.clone(),
self.fetch.clone(), self.fetch.clone(),
self.pool.clone(),
).to_delegate()) ).to_delegate())
}, },
Api::Traces => { Api::Traces => {

View File

@ -35,8 +35,8 @@ use ethcore_logger::{Config as LogConfig, RotatingLogger};
use ethcore_service::ClientService; use ethcore_service::ClientService;
use ethsync::{self, SyncConfig}; use ethsync::{self, SyncConfig};
use fdlimit::raise_fd_limit; use fdlimit::raise_fd_limit;
use hash_fetch::fetch::{Fetch, Client as FetchClient}; use futures_cpupool::CpuPool;
use hash_fetch; use hash_fetch::{self, fetch};
use informant::{Informant, LightNodeInformantData, FullNodeInformantData}; use informant::{Informant, LightNodeInformantData, FullNodeInformantData};
use journaldb::Algorithm; use journaldb::Algorithm;
use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb_rocksdb::{Database, DatabaseConfig};
@ -308,8 +308,10 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
// start the network. // start the network.
light_sync.start_network(); light_sync.start_network();
let cpu_pool = CpuPool::new(4);
// fetch service // fetch service
let fetch = FetchClient::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?; let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
let passwords = passwords_from_files(&cmd.acc_conf.password_files)?; let passwords = passwords_from_files(&cmd.acc_conf.password_files)?;
// prepare account provider // prepare account provider
@ -342,7 +344,7 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
let sync_status = Arc::new(LightSyncStatus(light_sync.clone())); let sync_status = Arc::new(LightSyncStatus(light_sync.clone()));
let node_health = node_health::NodeHealth::new( let node_health = node_health::NodeHealth::new(
sync_status.clone(), sync_status.clone(),
node_health::TimeChecker::new(&cmd.ntp_servers, fetch.pool()), node_health::TimeChecker::new(&cmd.ntp_servers, cpu_pool.clone()),
event_loop.remote(), event_loop.remote(),
); );
@ -351,6 +353,7 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
node_health, node_health,
contract_client: Arc::new(contract_client), contract_client: Arc::new(contract_client),
fetch: fetch.clone(), fetch: fetch.clone(),
pool: cpu_pool.clone(),
signer: signer_service.clone(), signer: signer_service.clone(),
ui_address: cmd.ui_conf.redirection_address(), ui_address: cmd.ui_conf.redirection_address(),
}) })
@ -377,6 +380,7 @@ fn execute_light_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger
dapps_address: cmd.dapps_conf.address(cmd.http_conf.address()), dapps_address: cmd.dapps_conf.address(cmd.http_conf.address()),
ws_address: cmd.ws_conf.address(), ws_address: cmd.ws_conf.address(),
fetch: fetch, fetch: fetch,
pool: cpu_pool.clone(),
geth_compatibility: cmd.geth_compatibility, geth_compatibility: cmd.geth_compatibility,
remote: event_loop.remote(), remote: event_loop.remote(),
whisper_rpc: whisper_factory, whisper_rpc: whisper_factory,
@ -530,12 +534,14 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
// prepare account provider // prepare account provider
let account_provider = Arc::new(prepare_account_provider(&cmd.spec, &cmd.dirs, &spec.data_dir, cmd.acc_conf, &passwords)?); let account_provider = Arc::new(prepare_account_provider(&cmd.spec, &cmd.dirs, &spec.data_dir, cmd.acc_conf, &passwords)?);
let cpu_pool = CpuPool::new(4);
// fetch service // fetch service
let fetch = FetchClient::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?; let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
// create miner // create miner
let initial_min_gas_price = cmd.gas_pricer_conf.initial_min(); let initial_min_gas_price = cmd.gas_pricer_conf.initial_min();
let miner = Miner::new(cmd.miner_options, cmd.gas_pricer_conf.to_gas_pricer(fetch.clone()), &spec, Some(account_provider.clone())); let miner = Miner::new(cmd.miner_options, cmd.gas_pricer_conf.to_gas_pricer(fetch.clone(), cpu_pool.clone()), &spec, Some(account_provider.clone()));
miner.set_author(cmd.miner_extras.author); miner.set_author(cmd.miner_extras.author);
miner.set_gas_floor_target(cmd.miner_extras.gas_floor_target); miner.set_gas_floor_target(cmd.miner_extras.gas_floor_target);
miner.set_gas_ceil_target(cmd.miner_extras.gas_ceil_target); miner.set_gas_ceil_target(cmd.miner_extras.gas_ceil_target);
@ -694,15 +700,12 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
let contract_client = Arc::new(::dapps::FullRegistrar::new(client.clone())); let contract_client = Arc::new(::dapps::FullRegistrar::new(client.clone()));
// the updater service // the updater service
let mut updater_fetch = fetch.clone(); let updater_fetch = fetch.clone();
// parity binaries should be smaller than 128MB
updater_fetch.set_limit(Some(128 * 1024 * 1024));
let updater = Updater::new( let updater = Updater::new(
Arc::downgrade(&(service.client() as Arc<BlockChainClient>)), Arc::downgrade(&(service.client() as Arc<BlockChainClient>)),
Arc::downgrade(&sync_provider), Arc::downgrade(&sync_provider),
update_policy, update_policy,
hash_fetch::Client::with_fetch(contract_client.clone(), updater_fetch, event_loop.remote()) hash_fetch::Client::with_fetch(contract_client.clone(), cpu_pool.clone(), updater_fetch, event_loop.remote())
); );
service.add_notify(updater.clone()); service.add_notify(updater.clone());
@ -738,7 +741,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
let sync_status = Arc::new(SyncStatus(sync, client, net_conf)); let sync_status = Arc::new(SyncStatus(sync, client, net_conf));
let node_health = node_health::NodeHealth::new( let node_health = node_health::NodeHealth::new(
sync_status.clone(), sync_status.clone(),
node_health::TimeChecker::new(&cmd.ntp_servers, fetch.pool()), node_health::TimeChecker::new(&cmd.ntp_servers, cpu_pool.clone()),
event_loop.remote(), event_loop.remote(),
); );
(node_health.clone(), dapps::Dependencies { (node_health.clone(), dapps::Dependencies {
@ -746,6 +749,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
node_health, node_health,
contract_client, contract_client,
fetch: fetch.clone(), fetch: fetch.clone(),
pool: cpu_pool.clone(),
signer: signer_service.clone(), signer: signer_service.clone(),
ui_address: cmd.ui_conf.redirection_address(), ui_address: cmd.ui_conf.redirection_address(),
}) })
@ -773,6 +777,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
dapps_address: cmd.dapps_conf.address(cmd.http_conf.address()), dapps_address: cmd.dapps_conf.address(cmd.http_conf.address()),
ws_address: cmd.ws_conf.address(), ws_address: cmd.ws_conf.address(),
fetch: fetch.clone(), fetch: fetch.clone(),
pool: cpu_pool.clone(),
remote: event_loop.remote(), remote: event_loop.remote(),
whisper_rpc: whisper_factory, whisper_rpc: whisper_factory,
gas_price_percentile: cmd.gas_price_percentile, gas_price_percentile: cmd.gas_price_percentile,

View File

@ -9,8 +9,10 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
fetch = { path = "../util/fetch" } fetch = { path = "../util/fetch" }
futures = "0.1" futures = "0.1"
futures-cpupool = "0.1"
log = "0.3" log = "0.3"
serde_json = "1.0" serde_json = "1.0"
[dev-dependencies] [dev-dependencies]
hyper = "0.11"
parking_lot = "0.5" parking_lot = "0.5"

View File

@ -19,6 +19,7 @@
//! A simple client to get the current ETH price using an external API. //! A simple client to get the current ETH price using an external API.
extern crate futures; extern crate futures;
extern crate futures_cpupool;
extern crate serde_json; extern crate serde_json;
#[macro_use] #[macro_use]
@ -29,10 +30,12 @@ pub extern crate fetch;
use std::cmp; use std::cmp;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::io::Read; use std::str;
use fetch::{Client as FetchClient, Fetch}; use fetch::{Client as FetchClient, Fetch};
use futures::Future; use futures::{Future, Stream};
use futures::future::{self, Either};
use futures_cpupool::CpuPool;
use serde_json::Value; use serde_json::Value;
/// Current ETH price information. /// Current ETH price information.
@ -48,7 +51,7 @@ pub enum Error {
/// The API returned an unexpected status code. /// The API returned an unexpected status code.
StatusCode(&'static str), StatusCode(&'static str),
/// The API returned an unexpected status content. /// The API returned an unexpected status content.
UnexpectedResponse(String), UnexpectedResponse(Option<String>),
/// There was an error when trying to reach the API. /// There was an error when trying to reach the API.
Fetch(fetch::Error), Fetch(fetch::Error),
/// IO error when reading API response. /// IO error when reading API response.
@ -65,6 +68,7 @@ impl From<fetch::Error> for Error {
/// A client to get the current ETH price using an external API. /// A client to get the current ETH price using an external API.
pub struct Client<F = FetchClient> { pub struct Client<F = FetchClient> {
pool: CpuPool,
api_endpoint: String, api_endpoint: String,
fetch: F, fetch: F,
} }
@ -85,23 +89,25 @@ impl<F> cmp::PartialEq for Client<F> {
impl<F: Fetch> Client<F> { impl<F: Fetch> Client<F> {
/// Creates a new instance of the `Client` given a `fetch::Client`. /// Creates a new instance of the `Client` given a `fetch::Client`.
pub fn new(fetch: F) -> Client<F> { pub fn new(fetch: F, pool: CpuPool) -> Client<F> {
let api_endpoint = "https://api.etherscan.io/api?module=stats&action=ethprice".to_owned(); let api_endpoint = "https://api.etherscan.io/api?module=stats&action=ethprice".to_owned();
Client { api_endpoint, fetch } Client { pool, api_endpoint, fetch }
} }
/// Gets the current ETH price and calls `set_price` with the result. /// Gets the current ETH price and calls `set_price` with the result.
pub fn get<G: Fn(PriceInfo) + Sync + Send + 'static>(&self, set_price: G) { pub fn get<G: Fn(PriceInfo) + Sync + Send + 'static>(&self, set_price: G) {
self.fetch.process_and_forget(self.fetch.fetch(&self.api_endpoint) let future = self.fetch.fetch(&self.api_endpoint, fetch::Abort::default())
.map_err(|err| Error::Fetch(err)) .from_err()
.and_then(move |mut response| { .and_then(|response| {
if !response.is_success() { if !response.is_success() {
return Err(Error::StatusCode(response.status().canonical_reason().unwrap_or("unknown"))); let s = Error::StatusCode(response.status().canonical_reason().unwrap_or("unknown"));
return Either::A(future::err(s));
} }
let mut result = String::new(); Either::B(response.concat2().from_err())
response.read_to_string(&mut result)?; })
.map(move |body| {
let value: Option<Value> = serde_json::from_str(&result).ok(); let body_str = str::from_utf8(&body).ok();
let value: Option<Value> = body_str.and_then(|s| serde_json::from_str(s).ok());
let ethusd = value let ethusd = value
.as_ref() .as_ref()
@ -114,63 +120,65 @@ impl<F: Fetch> Client<F> {
set_price(PriceInfo { ethusd }); set_price(PriceInfo { ethusd });
Ok(()) Ok(())
}, },
None => Err(Error::UnexpectedResponse(result)), None => Err(Error::UnexpectedResponse(body_str.map(From::from))),
} }
}) })
.map_err(|err| { .map_err(|err| {
warn!("Failed to auto-update latest ETH price: {:?}", err); warn!("Failed to auto-update latest ETH price: {:?}", err);
err err
}) });
); self.pool.spawn(future).forget()
} }
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
extern crate hyper;
extern crate parking_lot; extern crate parking_lot;
use self::parking_lot::Mutex; use self::parking_lot::Mutex;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use fetch; use fetch;
use fetch::Fetch; use fetch::{Fetch, Url};
use futures; use futures_cpupool::CpuPool;
use futures::future::{Future, FutureResult}; use futures::future::{self, FutureResult};
use Client; use Client;
use self::hyper::StatusCode;
#[derive(Clone)] #[derive(Clone)]
struct FakeFetch(Option<String>, Arc<Mutex<u64>>); struct FakeFetch(Option<String>, Arc<Mutex<u64>>);
impl FakeFetch {
fn new() -> Result<Self, fetch::Error> {
Ok(FakeFetch(None, Default::default()))
}
}
impl Fetch for FakeFetch { impl Fetch for FakeFetch {
type Result = FutureResult<fetch::Response, fetch::Error>; type Result = FutureResult<fetch::Response, fetch::Error>;
fn new() -> Result<Self, fetch::Error> where Self: Sized { Ok(FakeFetch(None, Default::default())) }
fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result { fn fetch(&self, url: &str, abort: fetch::Abort) -> Self::Result {
assert_eq!(url, "https://api.etherscan.io/api?module=stats&action=ethprice"); assert_eq!(url, "https://api.etherscan.io/api?module=stats&action=ethprice");
let u = Url::parse(url).unwrap();
let mut val = self.1.lock(); let mut val = self.1.lock();
*val = *val + 1; *val = *val + 1;
if let Some(ref response) = self.0 { if let Some(ref response) = self.0 {
let data = ::std::io::Cursor::new(response.clone()); let r = hyper::Response::new().with_body(response.clone());
futures::future::ok(fetch::Response::from_reader(data)) future::ok(fetch::client::Response::new(u, r, abort))
} else { } else {
futures::future::ok(fetch::Response::not_found()) let r = hyper::Response::new().with_status(StatusCode::NotFound);
future::ok(fetch::client::Response::new(u, r, abort))
} }
} }
// this guarantees that the calls to price_info::Client::get will block for execution
fn process_and_forget<F, I, E>(&self, f: F) where
F: Future<Item=I, Error=E> + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
let _ = f.wait();
}
} }
fn price_info_ok(response: &str) -> Client<FakeFetch> { fn price_info_ok(response: &str) -> Client<FakeFetch> {
Client::new(FakeFetch(Some(response.to_owned()), Default::default())) Client::new(FakeFetch(Some(response.to_owned()), Default::default()), CpuPool::new(1))
} }
fn price_info_not_found() -> Client<FakeFetch> { fn price_info_not_found() -> Client<FakeFetch> {
Client::new(FakeFetch::new().unwrap()) Client::new(FakeFetch::new().unwrap(), CpuPool::new(1))
} }
#[test] #[test]

View File

@ -21,7 +21,8 @@ use std::io;
use std::sync::Arc; use std::sync::Arc;
use ethsync::ManageNetwork; use ethsync::ManageNetwork;
use fetch::Fetch; use fetch::{self, Fetch};
use futures_cpupool::CpuPool;
use hash::keccak_buffer; use hash::keccak_buffer;
use jsonrpc_core::{Result, BoxFuture}; use jsonrpc_core::{Result, BoxFuture};
@ -36,15 +37,17 @@ pub struct ParitySetClient<F> {
net: Arc<ManageNetwork>, net: Arc<ManageNetwork>,
dapps: Option<Arc<DappsService>>, dapps: Option<Arc<DappsService>>,
fetch: F, fetch: F,
pool: CpuPool,
} }
impl<F: Fetch> ParitySetClient<F> { impl<F: Fetch> ParitySetClient<F> {
/// Creates new `ParitySetClient` with given `Fetch`. /// Creates new `ParitySetClient` with given `Fetch`.
pub fn new(net: Arc<ManageNetwork>, dapps: Option<Arc<DappsService>>, fetch: F) -> Self { pub fn new(net: Arc<ManageNetwork>, dapps: Option<Arc<DappsService>>, fetch: F, p: CpuPool) -> Self {
ParitySetClient { ParitySetClient {
net: net, net: net,
dapps: dapps, dapps: dapps,
fetch: fetch, fetch: fetch,
pool: p,
} }
} }
} }
@ -125,14 +128,16 @@ impl<F: Fetch> ParitySet for ParitySetClient<F> {
} }
fn hash_content(&self, url: String) -> BoxFuture<H256> { fn hash_content(&self, url: String) -> BoxFuture<H256> {
self.fetch.process(self.fetch.fetch(&url).then(move |result| { let future = self.fetch.fetch(&url, Default::default()).then(move |result| {
result result
.map_err(errors::fetch) .map_err(errors::fetch)
.and_then(|response| { .and_then(move |response| {
keccak_buffer(&mut io::BufReader::new(response)).map_err(errors::fetch) let mut reader = io::BufReader::new(fetch::BodyReader::new(response));
keccak_buffer(&mut reader).map_err(errors::fetch)
}) })
.map(Into::into) .map(Into::into)
})) });
Box::new(self.pool.spawn(future))
} }
fn dapps_refresh(&self) -> Result<bool> { fn dapps_refresh(&self) -> Result<bool> {

View File

@ -23,6 +23,7 @@ use ethcore::client::MiningBlockChainClient;
use ethcore::mode::Mode; use ethcore::mode::Mode;
use ethsync::ManageNetwork; use ethsync::ManageNetwork;
use fetch::{self, Fetch}; use fetch::{self, Fetch};
use futures_cpupool::CpuPool;
use hash::keccak_buffer; use hash::keccak_buffer;
use updater::{Service as UpdateService}; use updater::{Service as UpdateService};
@ -41,6 +42,7 @@ pub struct ParitySetClient<C, M, U, F = fetch::Client> {
net: Arc<ManageNetwork>, net: Arc<ManageNetwork>,
dapps: Option<Arc<DappsService>>, dapps: Option<Arc<DappsService>>,
fetch: F, fetch: F,
pool: CpuPool,
eip86_transition: u64, eip86_transition: u64,
} }
@ -55,6 +57,7 @@ impl<C, M, U, F> ParitySetClient<C, M, U, F>
net: &Arc<ManageNetwork>, net: &Arc<ManageNetwork>,
dapps: Option<Arc<DappsService>>, dapps: Option<Arc<DappsService>>,
fetch: F, fetch: F,
pool: CpuPool,
) -> Self { ) -> Self {
ParitySetClient { ParitySetClient {
client: client.clone(), client: client.clone(),
@ -63,6 +66,7 @@ impl<C, M, U, F> ParitySetClient<C, M, U, F>
net: net.clone(), net: net.clone(),
dapps: dapps, dapps: dapps,
fetch: fetch, fetch: fetch,
pool: pool,
eip86_transition: client.eip86_transition(), eip86_transition: client.eip86_transition(),
} }
} }
@ -166,14 +170,16 @@ impl<C, M, U, F> ParitySet for ParitySetClient<C, M, U, F> where
} }
fn hash_content(&self, url: String) -> BoxFuture<H256> { fn hash_content(&self, url: String) -> BoxFuture<H256> {
self.fetch.process(self.fetch.fetch(&url).then(move |result| { let future = self.fetch.fetch(&url, Default::default()).then(move |result| {
result result
.map_err(errors::fetch) .map_err(errors::fetch)
.and_then(|response| { .and_then(move |response| {
keccak_buffer(&mut io::BufReader::new(response)).map_err(errors::fetch) let mut reader = io::BufReader::new(fetch::BodyReader::new(response));
keccak_buffer(&mut reader).map_err(errors::fetch)
}) })
.map(Into::into) .map(Into::into)
})) });
Box::new(self.pool.spawn(future))
} }
fn dapps_refresh(&self) -> Result<bool> { fn dapps_refresh(&self) -> Result<bool> {

View File

@ -16,9 +16,10 @@
//! Test implementation of fetch client. //! Test implementation of fetch client.
use std::{io, thread}; use std::thread;
use jsonrpc_core::futures::{self, Future}; use jsonrpc_core::futures::{self, Future};
use fetch::{self, Fetch}; use fetch::{self, Fetch, Url};
use hyper;
/// Test implementation of fetcher. Will always return the same file. /// Test implementation of fetcher. Will always return the same file.
#[derive(Default, Clone)] #[derive(Default, Clone)]
@ -27,15 +28,12 @@ pub struct TestFetch;
impl Fetch for TestFetch { impl Fetch for TestFetch {
type Result = Box<Future<Item = fetch::Response, Error = fetch::Error> + Send + 'static>; type Result = Box<Future<Item = fetch::Response, Error = fetch::Error> + Send + 'static>;
fn new() -> Result<Self, fetch::Error> where Self: Sized { fn fetch(&self, url: &str, abort: fetch::Abort) -> Self::Result {
Ok(TestFetch) let u = Url::parse(url).unwrap();
}
fn fetch_with_abort(&self, _url: &str, _abort: fetch::Abort) -> Self::Result {
let (tx, rx) = futures::oneshot(); let (tx, rx) = futures::oneshot();
thread::spawn(move || { thread::spawn(move || {
let cursor = io::Cursor::new(b"Some content"); let r = hyper::Response::new().with_body(&b"Some content"[..]);
tx.send(fetch::Response::from_reader(cursor)).unwrap(); tx.send(fetch::Response::new(u, r, abort)).unwrap();
}); });
Box::new(rx.map_err(|_| fetch::Error::Aborted)) Box::new(rx.map_err(|_| fetch::Error::Aborted))

View File

@ -22,6 +22,7 @@ use ethereum_types::{U256, Address};
use ethcore::miner::MinerService; use ethcore::miner::MinerService;
use ethcore::client::TestBlockChainClient; use ethcore::client::TestBlockChainClient;
use ethsync::ManageNetwork; use ethsync::ManageNetwork;
use futures_cpupool::CpuPool;
use jsonrpc_core::IoHandler; use jsonrpc_core::IoHandler;
use v1::{ParitySet, ParitySetClient}; use v1::{ParitySet, ParitySetClient};
@ -53,7 +54,8 @@ fn parity_set_client(
net: &Arc<TestManageNetwork>, net: &Arc<TestManageNetwork>,
) -> TestParitySetClient { ) -> TestParitySetClient {
let dapps_service = Arc::new(TestDappsService); let dapps_service = Arc::new(TestDappsService);
ParitySetClient::new(client, miner, updater, &(net.clone() as Arc<ManageNetwork>), Some(dapps_service), TestFetch::default()) let pool = CpuPool::new(1);
ParitySetClient::new(client, miner, updater, &(net.clone() as Arc<ManageNetwork>), Some(dapps_service), TestFetch::default(), pool)
} }
#[test] #[test]

View File

@ -58,6 +58,8 @@ pub struct UpdatePolicy {
pub track: ReleaseTrack, pub track: ReleaseTrack,
/// Path for the updates to go. /// Path for the updates to go.
pub path: String, pub path: String,
/// Maximum download size.
pub max_size: usize,
} }
impl Default for UpdatePolicy { impl Default for UpdatePolicy {
@ -68,6 +70,7 @@ impl Default for UpdatePolicy {
filter: UpdateFilter::None, filter: UpdateFilter::None,
track: ReleaseTrack::Unknown, track: ReleaseTrack::Unknown,
path: Default::default(), path: Default::default(),
max_size: 128 * 1024 * 1024,
} }
} }
} }
@ -341,7 +344,8 @@ impl Updater {
drop(s); drop(s);
let weak_self = self.weak_self.lock().clone(); let weak_self = self.weak_self.lock().clone();
let f = move |r: Result<PathBuf, fetch::Error>| if let Some(this) = weak_self.upgrade() { this.fetch_done(r) }; let f = move |r: Result<PathBuf, fetch::Error>| if let Some(this) = weak_self.upgrade() { this.fetch_done(r) };
self.fetcher.fetch(b, Box::new(f)); let a = fetch::Abort::default().with_max_size(self.update_policy.max_size);
self.fetcher.fetch(b, a, Box::new(f));
} }
} }
} }

View File

@ -8,10 +8,12 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
futures = "0.1" futures = "0.1"
futures-cpupool = "0.1" futures-timer = "0.1"
parking_lot = "0.5" hyper = "0.11"
log = "0.3" hyper-rustls = "0.11"
reqwest = "0.8" log = "0.4"
tokio-core = "0.1"
url = "1"
[features] [features]
default = [] default = []

View File

@ -14,347 +14,665 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Fetching use futures::future::{self, Loop};
use futures::sync::{mpsc, oneshot};
use std::{io, fmt, time}; use futures::{self, Future, Async, Sink, Stream};
use futures_timer::FutureExt;
use hyper::header::{UserAgent, Location, ContentLength, ContentType};
use hyper::mime::Mime;
use hyper::{self, Request, Method, StatusCode};
use hyper_rustls;
use std;
use std::cmp::min;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::thread;
use std::time::Duration;
use std::{io, fmt};
use tokio_core::reactor;
use url::{self, Url};
use futures::{self, Future}; const MAX_SIZE: usize = 64 * 1024 * 1024;
use futures_cpupool::{CpuPool, CpuFuture}; const MAX_SECS: u64 = 5;
use parking_lot::RwLock; const MAX_REDR: usize = 5;
use reqwest;
use reqwest::mime::Mime;
type BoxFuture<A, B> = Box<Future<Item = A, Error = B> + Send>; /// A handle to abort requests.
///
/// Requests are either aborted based on reaching thresholds such as
/// maximum response size, timeouts or too many redirects, or else
/// they can be aborted explicitly by the calling code.
#[derive(Clone, Debug)]
pub struct Abort {
abort: Arc<AtomicBool>,
size: usize,
time: Duration,
redir: usize,
}
/// Fetch abort control impl Default for Abort {
#[derive(Default, Debug, Clone)] fn default() -> Abort {
pub struct Abort(Arc<AtomicBool>); Abort {
abort: Arc::new(AtomicBool::new(false)),
impl Abort { size: MAX_SIZE,
/// Returns `true` if request is aborted. time: Duration::from_secs(MAX_SECS),
pub fn is_aborted(&self) -> bool { redir: MAX_REDR,
self.0.load(atomic::Ordering::SeqCst) }
} }
} }
impl From<Arc<AtomicBool>> for Abort { impl From<Arc<AtomicBool>> for Abort {
fn from(a: Arc<AtomicBool>) -> Self { fn from(a: Arc<AtomicBool>) -> Abort {
Abort(a) Abort {
abort: a,
size: MAX_SIZE,
time: Duration::from_secs(MAX_SECS),
redir: MAX_REDR,
}
} }
} }
/// Fetch impl Abort {
/// True if `abort` has been invoked.
pub fn is_aborted(&self) -> bool {
self.abort.load(Ordering::SeqCst)
}
/// The maximum response body size.
pub fn max_size(&self) -> usize {
self.size
}
/// The maximum total time, including redirects.
pub fn max_duration(&self) -> Duration {
self.time
}
/// The maximum number of redirects to allow.
pub fn max_redirects(&self) -> usize {
self.redir
}
/// Mark as aborted.
pub fn abort(&self) {
self.abort.store(true, Ordering::SeqCst)
}
/// Set the maximum reponse body size.
pub fn with_max_size(self, n: usize) -> Abort {
Abort { size: n, .. self }
}
/// Set the maximum duration (including redirects).
pub fn with_max_duration(self, d: Duration) -> Abort {
Abort { time: d, .. self }
}
/// Set the maximum number of redirects to follow.
pub fn with_max_redirects(self, n: usize) -> Abort {
Abort { redir: n, .. self }
}
}
/// Types which retrieve content from some URL.
pub trait Fetch: Clone + Send + Sync + 'static { pub trait Fetch: Clone + Send + Sync + 'static {
/// Result type /// The result future.
type Result: Future<Item=Response, Error=Error> + Send + 'static; type Result: Future<Item=Response, Error=Error> + Send + 'static;
/// Creates new Fetch object. /// Get content from some URL.
fn new() -> Result<Self, Error> where Self: Sized; fn fetch(&self, url: &str, abort: Abort) -> Self::Result;
/// Spawn the future in context of this `Fetch` thread pool.
/// Implementation is optional.
fn process<F, I, E>(&self, f: F) -> BoxFuture<I, E> where
F: Future<Item=I, Error=E> + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
Box::new(f)
}
/// Spawn the future in context of this `Fetch` thread pool as "fire and forget", i.e. dropping this future without
/// canceling the underlying future.
/// Implementation is optional.
fn process_and_forget<F, I, E>(&self, _: F) where
F: Future<Item=I, Error=E> + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
panic!("Attempting to process and forget future on unsupported Fetch.");
}
/// Fetch URL and get a future for the result.
/// Supports aborting the request in the middle of execution.
fn fetch_with_abort(&self, url: &str, abort: Abort) -> Self::Result;
/// Fetch URL and get a future for the result.
fn fetch(&self, url: &str) -> Self::Result {
self.fetch_with_abort(url, Default::default())
}
/// Fetch URL and get the result synchronously.
fn fetch_sync(&self, url: &str) -> Result<Response, Error> {
self.fetch(url).wait()
}
/// Closes this client
fn close(self) where Self: Sized {}
} }
const CLIENT_TIMEOUT_SECONDS: u64 = 5; type TxResponse = oneshot::Sender<Result<Response, Error>>;
type TxStartup = std::sync::mpsc::SyncSender<Result<(), io::Error>>;
type ChanItem = Option<(Url, Abort, TxResponse)>;
/// Fetch client /// An implementation of `Fetch` using a `hyper` client.
// Due to the `Send` bound of `Fetch` we spawn a background thread for
// actual request/response processing as `hyper::Client` itself does
// not implement `Send` currently.
#[derive(Debug)]
pub struct Client { pub struct Client {
client: RwLock<(time::Instant, Arc<reqwest::Client>)>, core: mpsc::Sender<ChanItem>,
pool: CpuPool, refs: Arc<AtomicUsize>,
limit: Option<usize>,
} }
// When cloning a client we increment the internal reference counter.
impl Clone for Client { impl Clone for Client {
fn clone(&self) -> Self { fn clone(&self) -> Client {
let (ref time, ref client) = *self.client.read(); self.refs.fetch_add(1, Ordering::SeqCst);
Client { Client {
client: RwLock::new((time.clone(), client.clone())), core: self.core.clone(),
pool: self.pool.clone(), refs: self.refs.clone(),
limit: self.limit.clone(), }
}
}
// When dropping a client, we decrement the reference counter.
// Once it reaches 0 we terminate the background thread.
impl Drop for Client {
fn drop(&mut self) {
if self.refs.fetch_sub(1, Ordering::SeqCst) == 1 {
// ignore send error as it means the background thread is gone already
let _ = self.core.clone().send(None).wait();
} }
} }
} }
impl Client { impl Client {
fn new_client() -> Result<Arc<reqwest::Client>, Error> { /// Create a new fetch client.
let mut client = reqwest::ClientBuilder::new(); pub fn new() -> Result<Self, Error> {
client.redirect(reqwest::RedirectPolicy::limited(5)); let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
Ok(Arc::new(client.build()?)) let (tx_proto, rx_proto) = mpsc::channel(64);
}
Client::background_thread(tx_start, rx_proto)?;
match rx_start.recv_timeout(Duration::from_secs(10)) {
Err(RecvTimeoutError::Timeout) => {
error!(target: "fetch", "timeout starting background thread");
return Err(Error::BackgroundThreadDead)
}
Err(RecvTimeoutError::Disconnected) => {
error!(target: "fetch", "background thread gone");
return Err(Error::BackgroundThreadDead)
}
Ok(Err(e)) => {
error!(target: "fetch", "error starting background thread: {}", e);
return Err(e.into())
}
Ok(Ok(())) => {}
}
fn with_limit(limit: Option<usize>) -> Result<Self, Error> {
Ok(Client { Ok(Client {
client: RwLock::new((time::Instant::now(), Self::new_client()?)), core: tx_proto,
pool: CpuPool::new(4), refs: Arc::new(AtomicUsize::new(1)),
limit: limit,
}) })
} }
/// Sets a limit on the maximum download size. fn background_thread(tx_start: TxStartup, rx_proto: mpsc::Receiver<ChanItem>) -> io::Result<thread::JoinHandle<()>> {
pub fn set_limit(&mut self, limit: Option<usize>) { thread::Builder::new().name("fetch".into()).spawn(move || {
self.limit = limit let mut core = match reactor::Core::new() {
} Ok(c) => c,
Err(e) => return tx_start.send(Err(e)).unwrap_or(())
};
fn client(&self) -> Result<Arc<reqwest::Client>, Error> { let handle = core.handle();
{ let hyper = hyper::Client::configure()
let (ref time, ref client) = *self.client.read(); .connector(hyper_rustls::HttpsConnector::new(4, &core.handle()))
if time.elapsed() < time::Duration::from_secs(CLIENT_TIMEOUT_SECONDS) { .build(&core.handle());
return Ok(client.clone());
let future = rx_proto.take_while(|item| Ok(item.is_some()))
.map(|item| item.expect("`take_while` is only passing on channel items != None; qed"))
.for_each(|(url, abort, sender)|
{
trace!(target: "fetch", "new request to {}", url);
if abort.is_aborted() {
return future::ok(sender.send(Err(Error::Aborted)).unwrap_or(()))
}
let ini = (hyper.clone(), url, abort, 0);
let fut = future::loop_fn(ini, |(client, url, abort, redirects)| {
let url2 = url.clone();
let abort2 = abort.clone();
client.request(get(&url))
.map(move |resp| Response::new(url2, resp, abort2))
.from_err()
.and_then(move |resp| {
if abort.is_aborted() {
debug!(target: "fetch", "fetch of {} aborted", url);
return Err(Error::Aborted)
}
if let Some(next_url) = redirect_location(url, &resp) {
if redirects >= abort.max_redirects() {
return Err(Error::TooManyRedirects)
}
Ok(Loop::Continue((client, next_url, abort, redirects + 1)))
} else {
let content_len = resp.headers.get::<ContentLength>().cloned();
if content_len.map(|n| *n > abort.max_size() as u64).unwrap_or(false) {
return Err(Error::SizeLimit)
}
Ok(Loop::Break(resp))
}
})
})
.then(|result| {
future::ok(sender.send(result).unwrap_or(()))
});
handle.spawn(fut);
trace!(target: "fetch", "waiting for next request ...");
future::ok(())
});
tx_start.send(Ok(())).unwrap_or(());
debug!(target: "fetch", "processing requests ...");
if let Err(()) = core.run(future) {
error!(target: "fetch", "error while executing future")
} }
} debug!(target: "fetch", "fetch background thread finished")
})
let client = Self::new_client()?;
*self.client.write() = (time::Instant::now(), client.clone());
Ok(client)
}
/// Returns a handle to underlying CpuPool of this client.
pub fn pool(&self) -> CpuPool {
self.pool.clone()
} }
} }
impl Fetch for Client { impl Fetch for Client {
type Result = CpuFuture<Response, Error>; type Result = Box<Future<Item=Response, Error=Error> + Send>;
fn new() -> Result<Self, Error> { fn fetch(&self, url: &str, abort: Abort) -> Self::Result {
// Max 64MB will be downloaded. debug!(target: "fetch", "fetching: {:?}", url);
Self::with_limit(Some(64 * 1024 * 1024)) if abort.is_aborted() {
return Box::new(future::err(Error::Aborted))
}
let url: Url = match url.parse() {
Ok(u) => u,
Err(e) => return Box::new(future::err(e.into()))
};
let (tx_res, rx_res) = oneshot::channel();
let maxdur = abort.max_duration();
let sender = self.core.clone();
let future = sender.send(Some((url.clone(), abort, tx_res)))
.map_err(|e| {
error!(target: "fetch", "failed to schedule request: {}", e);
Error::BackgroundThreadDead
})
.and_then(|_| rx_res.map_err(|oneshot::Canceled| Error::BackgroundThreadDead))
.and_then(future::result)
.timeout(maxdur)
.map_err(|err| {
if let Error::Io(ref e) = err {
if let io::ErrorKind::TimedOut = e.kind() {
return Error::Timeout
}
}
err.into()
});
Box::new(future)
} }
}
fn process<F, I, E>(&self, f: F) -> BoxFuture<I, E> where // Extract redirect location from response.
F: Future<Item=I, Error=E> + Send + 'static, fn redirect_location(u: Url, r: &Response) -> Option<Url> {
I: Send + 'static, use hyper::StatusCode::*;
E: Send + 'static, match r.status() {
{ MovedPermanently
Box::new(self.pool.spawn(f)) | PermanentRedirect
| TemporaryRedirect
| Found
| SeeOther => {
if let Some(loc) = r.headers.get::<Location>() {
u.join(loc).ok()
} else {
None
}
}
_ => None
} }
}
fn process_and_forget<F, I, E>(&self, f: F) where // Build a simple GET request for the given Url.
F: Future<Item=I, Error=E> + Send + 'static, fn get(u: &Url) -> hyper::Request {
I: Send + 'static, let uri = u.as_ref().parse().expect("Every valid URL is aso a URI.");
E: Send + 'static, let mut rq = Request::new(Method::Get, uri);
{ rq.headers_mut().set(UserAgent::new("Parity Fetch Neo"));
self.pool.spawn(f).forget() rq
} }
fn fetch_with_abort(&self, url: &str, abort: Abort) -> Self::Result { /// An HTTP response.
debug!(target: "fetch", "Fetching from: {:?}", url); #[derive(Debug)]
pub struct Response {
url: Url,
status: StatusCode,
headers: hyper::Headers,
body: hyper::Body,
abort: Abort,
nread: usize,
}
match self.client() { impl Response {
Ok(client) => { /// Create a new response, wrapping a hyper response.
self.pool.spawn(FetchTask { pub fn new(u: Url, r: hyper::Response, a: Abort) -> Response {
url: url.into(), Response {
client: client, url: u,
limit: self.limit, status: r.status(),
abort: abort, headers: r.headers().clone(),
}) body: r.body(),
}, abort: a,
Err(err) => { nread: 0,
self.pool.spawn(futures::future::err(err))
},
} }
} }
/// The response status.
pub fn status(&self) -> StatusCode {
self.status
}
/// Status code == OK (200)?
pub fn is_success(&self) -> bool {
self.status() == StatusCode::Ok
}
/// Is the content-type text/html?
pub fn is_html(&self) -> bool {
if let Some(ref mime) = self.content_type() {
mime.type_() == "text" && mime.subtype() == "html"
} else {
false
}
}
/// The conten-type header value.
pub fn content_type(&self) -> Option<Mime> {
self.headers.get::<ContentType>().map(|ct| ct.0.clone())
}
} }
struct FetchTask { impl Stream for Response {
url: String, type Item = hyper::Chunk;
client: Arc<reqwest::Client>,
limit: Option<usize>,
abort: Abort,
}
impl Future for FetchTask {
// TODO [ToDr] timeouts handling?
type Item = Response;
type Error = Error; type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
if self.abort.is_aborted() { if self.abort.is_aborted() {
trace!(target: "fetch", "Fetch of {:?} aborted.", self.url); debug!(target: "fetch", "fetch of {} aborted", self.url);
return Err(Error::Aborted); return Err(Error::Aborted)
}
match try_ready!(self.body.poll()) {
None => Ok(Async::Ready(None)),
Some(c) => {
if self.nread + c.len() > self.abort.max_size() {
debug!(target: "fetch", "size limit {:?} for {} exceeded", self.abort.max_size(), self.url);
return Err(Error::SizeLimit)
}
self.nread += c.len();
Ok(Async::Ready(Some(c)))
}
} }
trace!(target: "fetch", "Starting fetch task: {:?}", self.url);
let result = self.client.get(&self.url)
.header(reqwest::header::UserAgent::new("Parity Fetch"))
.send()?;
Ok(futures::Async::Ready(Response {
inner: ResponseInner::Response(result),
abort: self.abort.clone(),
limit: self.limit,
read: 0,
}))
} }
} }
/// Fetch Error /// `BodyReader` serves as an adapter from async to sync I/O.
///
/// It implements `io::Read` by repedately waiting for the next `Chunk`
/// of hyper's response `Body` which blocks the current thread.
pub struct BodyReader {
chunk: hyper::Chunk,
body: Option<hyper::Body>,
abort: Abort,
offset: usize,
count: usize,
}
impl BodyReader {
/// Create a new body reader for the given response.
pub fn new(r: Response) -> BodyReader {
BodyReader {
body: Some(r.body),
chunk: Default::default(),
abort: r.abort,
offset: 0,
count: 0,
}
}
}
impl io::Read for BodyReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut n = 0;
while self.body.is_some() {
// Can we still read from the current chunk?
if self.offset < self.chunk.len() {
let k = min(self.chunk.len() - self.offset, buf.len() - n);
if self.count + k > self.abort.max_size() {
debug!(target: "fetch", "size limit {:?} exceeded", self.abort.max_size());
return Err(io::Error::new(io::ErrorKind::PermissionDenied, "size limit exceeded"))
}
let c = &self.chunk[self.offset .. self.offset + k];
(&mut buf[n .. n + k]).copy_from_slice(c);
self.offset += k;
self.count += k;
n += k;
if n == buf.len() {
break
}
} else {
let body = self.body.take().expect("loop condition ensures `self.body` is always defined; qed");
match body.into_future().wait() { // wait for next chunk
Err((e, _)) => {
error!(target: "fetch", "failed to read chunk: {}", e);
return Err(io::Error::new(io::ErrorKind::Other, "failed to read body chunk"))
}
Ok((None, _)) => break, // body is exhausted, break out of the loop
Ok((Some(c), b)) => {
self.body = Some(b);
self.chunk = c;
self.offset = 0
}
}
}
}
Ok(n)
}
}
/// Fetch error cases.
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
/// Internal fetch error /// Hyper gave us an error.
Fetch(reqwest::Error), Hyper(hyper::Error),
/// Request aborted /// Some I/O error occured.
Io(io::Error),
/// Invalid URLs where attempted to parse.
Url(url::ParseError),
/// Calling code invoked `Abort::abort`.
Aborted, Aborted,
/// Too many redirects have been encountered.
TooManyRedirects,
/// The maximum duration was reached.
Timeout,
/// The response body is too large.
SizeLimit,
/// The background processing thread does not run.
BackgroundThreadDead,
} }
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
Error::Aborted => write!(fmt, "The request has been aborted."), Error::Aborted => write!(fmt, "The request has been aborted."),
Error::Fetch(ref err) => write!(fmt, "{}", err), Error::Hyper(ref e) => write!(fmt, "{}", e),
Error::Url(ref e) => write!(fmt, "{}", e),
Error::Io(ref e) => write!(fmt, "{}", e),
Error::BackgroundThreadDead => write!(fmt, "background thread gond"),
Error::TooManyRedirects => write!(fmt, "too many redirects"),
Error::Timeout => write!(fmt, "request timed out"),
Error::SizeLimit => write!(fmt, "size limit reached"),
} }
} }
} }
impl From<reqwest::Error> for Error { impl From<hyper::Error> for Error {
fn from(error: reqwest::Error) -> Self { fn from(e: hyper::Error) -> Self {
Error::Fetch(error) Error::Hyper(e)
} }
} }
enum ResponseInner { impl From<io::Error> for Error {
Response(reqwest::Response), fn from(e: io::Error) -> Self {
Reader(Box<io::Read + Send>), Error::Io(e)
NotFound,
}
impl fmt::Debug for ResponseInner {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ResponseInner::Response(ref response) => response.fmt(f),
ResponseInner::NotFound => write!(f, "Not found"),
ResponseInner::Reader(_) => write!(f, "io Reader"),
}
} }
} }
/// A fetch response type. impl From<url::ParseError> for Error {
#[derive(Debug)] fn from(e: url::ParseError) -> Self {
pub struct Response { Error::Url(e)
inner: ResponseInner,
abort: Abort,
limit: Option<usize>,
read: usize,
}
impl Response {
/// Creates new successfuly response reading from a file.
pub fn from_reader<R: io::Read + Send + 'static>(reader: R) -> Self {
Response {
inner: ResponseInner::Reader(Box::new(reader)),
abort: Abort::default(),
limit: None,
read: 0,
}
}
/// Creates 404 response (useful for tests)
pub fn not_found() -> Self {
Response {
inner: ResponseInner::NotFound,
abort: Abort::default(),
limit: None,
read: 0,
}
}
/// Returns status code of this response.
pub fn status(&self) -> reqwest::StatusCode {
match self.inner {
ResponseInner::Response(ref r) => r.status(),
ResponseInner::NotFound => reqwest::StatusCode::NotFound,
_ => reqwest::StatusCode::Ok,
}
}
/// Returns `true` if response status code is successful.
pub fn is_success(&self) -> bool {
self.status() == reqwest::StatusCode::Ok
}
/// Returns `true` if content type of this response is `text/html`
pub fn is_html(&self) -> bool {
match self.content_type() {
Some(ref mime) if mime.type_() == "text" && mime.subtype() == "html" => true,
_ => false,
}
}
/// Returns content type of this response (if present)
pub fn content_type(&self) -> Option<Mime> {
match self.inner {
ResponseInner::Response(ref r) => {
let content_type = r.headers().get::<reqwest::header::ContentType>();
content_type.map(|mime| mime.0.clone())
},
_ => None,
}
} }
} }
impl io::Read for Response { #[cfg(test)]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { mod test {
if self.abort.is_aborted() { use super::*;
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "Fetch aborted.")); use futures::future;
use futures::sync::mpsc;
use futures_timer::Delay;
use hyper::StatusCode;
use hyper::server::{Http, Request, Response, Service};
use std;
use std::io::Read;
use std::net::SocketAddr;
const ADDRESS: &str = "127.0.0.1:0";
#[test]
fn it_should_fetch() {
let server = TestServer::run();
let client = Client::new().unwrap();
let future = client.fetch(&format!("http://{}?123", server.addr()), Default::default());
let resp = future.wait().unwrap();
assert!(resp.is_success());
let body = resp.concat2().wait().unwrap();
assert_eq!(&body[..], b"123")
}
#[test]
fn it_should_timeout() {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_duration(Duration::from_secs(1));
match client.fetch(&format!("http://{}/delay?3", server.addr()), abort).wait() {
Err(Error::Timeout) => {}
other => panic!("expected timeout, got {:?}", other)
} }
}
let res = match self.inner { #[test]
ResponseInner::Response(ref mut response) => response.read(buf), fn it_should_follow_redirects() {
ResponseInner::NotFound => return Ok(0), let server = TestServer::run();
ResponseInner::Reader(ref mut reader) => reader.read(buf), let client = Client::new().unwrap();
}; let abort = Abort::default();
let future = client.fetch(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort);
assert!(future.wait().unwrap().is_success())
}
// increase bytes read #[test]
if let Ok(read) = res { fn it_should_follow_relative_redirects() {
self.read += read; let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_redirects(4);
let future = client.fetch(&format!("http://{}/redirect?/", server.addr()), abort);
assert!(future.wait().unwrap().is_success())
}
#[test]
fn it_should_not_follow_too_many_redirects() {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_redirects(3);
match client.fetch(&format!("http://{}/loop", server.addr()), abort).wait() {
Err(Error::TooManyRedirects) => {}
other => panic!("expected too many redirects error, got {:?}", other)
} }
}
// check limit #[test]
match self.limit { fn it_should_read_data() {
Some(limit) if limit < self.read => { let server = TestServer::run();
return Err(io::Error::new(io::ErrorKind::PermissionDenied, "Size limit reached.")); let client = Client::new().unwrap();
}, let abort = Abort::default();
_ => {}, let future = client.fetch(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort);
let resp = future.wait().unwrap();
assert!(resp.is_success());
assert_eq!(&resp.concat2().wait().unwrap()[..], b"abcdefghijklmnopqrstuvwxyz")
}
#[test]
fn it_should_not_read_too_much_data() {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_size(3);
let resp = client.fetch(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
assert!(resp.is_success());
match resp.concat2().wait() {
Err(Error::SizeLimit) => {}
other => panic!("expected size limit error, got {:?}", other)
} }
}
res #[test]
fn it_should_not_read_too_much_data_sync() {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_size(3);
let resp = client.fetch(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
assert!(resp.is_success());
let mut buffer = Vec::new();
let mut reader = BodyReader::new(resp);
match reader.read_to_end(&mut buffer) {
Err(ref e) if e.kind() == io::ErrorKind::PermissionDenied => {}
other => panic!("expected size limit error, got {:?}", other)
}
}
struct TestServer;
impl Service for TestServer {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
fn call(&self, req: Request) -> Self::Future {
match req.uri().path() {
"/" => {
let body = req.uri().query().unwrap_or("").to_string();
let req = Response::new().with_body(body);
Box::new(future::ok(req))
}
"/redirect" => {
let loc = Location::new(req.uri().query().unwrap_or("/").to_string());
let req = Response::new()
.with_status(StatusCode::MovedPermanently)
.with_header(loc);
Box::new(future::ok(req))
}
"/loop" => {
let req = Response::new()
.with_status(StatusCode::MovedPermanently)
.with_header(Location::new("/loop".to_string()));
Box::new(future::ok(req))
}
"/delay" => {
let d = Duration::from_secs(req.uri().query().unwrap_or("0").parse().unwrap());
Box::new(Delay::new(d).from_err().map(|_| Response::new()))
}
_ => Box::new(future::ok(Response::new().with_status(StatusCode::NotFound)))
}
}
}
impl TestServer {
fn run() -> Handle {
let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
let (tx_end, rx_end) = mpsc::channel(0);
let rx_end_fut = rx_end.into_future().map(|_| ()).map_err(|_| ());
thread::spawn(move || {
let addr = ADDRESS.parse().unwrap();
let server = Http::new().bind(&addr, || Ok(TestServer)).unwrap();
tx_start.send(server.local_addr().unwrap()).unwrap_or(());
server.run_until(rx_end_fut).unwrap();
});
Handle(rx_start.recv().unwrap(), tx_end)
}
}
struct Handle(SocketAddr, mpsc::Sender<()>);
impl Handle {
fn addr(&self) -> SocketAddr {
self.0
}
}
impl Drop for Handle {
fn drop(&mut self) {
self.1.clone().send(()).wait().unwrap();
}
} }
} }

View File

@ -21,14 +21,19 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate futures; extern crate futures;
extern crate futures_cpupool; extern crate futures_timer;
extern crate parking_lot;
extern crate reqwest;
extern crate hyper;
extern crate hyper_rustls;
extern crate tokio_core;
extern crate url;
/// Fetch client implementation.
pub mod client; pub mod client;
pub use self::reqwest::StatusCode; pub use url::Url;
pub use self::reqwest::mime::Mime; pub use self::client::{Client, Fetch, Error, Response, Abort, BodyReader};
pub use self::client::{Client, Fetch, Error, Response, Abort};