Use hyper 0.11 in ethcore-miner and improvements in parity-reactor (#8335)

* parity-reactor: Pass over Handle in spawning fn to allow normal tokio ops

* Allow fetch to work with arbitrary requests

* typo: Fix missing handler closure

* miner, work_notify: use fetch and parity-reactor

* Fix work_notify pushing in parity CLI
This commit is contained in:
Wei Tang
2018-04-10 19:51:29 +08:00
committed by Marek Kotewicz
parent 86446d713a
commit 692cd10d4a
20 changed files with 302 additions and 221 deletions

View File

@@ -20,7 +20,7 @@ use futures::{self, Future, Async, Sink, Stream};
use futures_timer::FutureExt;
use hyper::header::{UserAgent, Location, ContentLength, ContentType};
use hyper::mime::Mime;
use hyper::{self, Request, Method, StatusCode};
use hyper::{self, Method, StatusCode};
use hyper_rustls;
use std;
use std::cmp::min;
@@ -32,6 +32,7 @@ use std::time::Duration;
use std::{io, fmt};
use tokio_core::reactor;
use url::{self, Url};
use bytes::Bytes;
const MAX_SIZE: usize = 64 * 1024 * 1024;
const MAX_SECS: Duration = Duration::from_secs(5);
@@ -120,22 +121,18 @@ pub trait Fetch: Clone + Send + Sync + 'static {
type Result: Future<Item=Response, Error=Error> + Send + 'static;
/// Make a request to given URL
fn fetch(&self, url: &str, method: Method, abort: Abort) -> Self::Result;
fn fetch(&self, request: Request, abort: Abort) -> Self::Result;
/// Get content from some URL.
fn get(&self, url: &str, abort: Abort) -> Self::Result {
self.fetch(url, Method::Get, abort)
}
fn get(&self, url: &str, abort: Abort) -> Self::Result;
/// Post content to some URL.
fn post(&self, url: &str, abort: Abort) -> Self::Result {
self.fetch(url, Method::Post, abort)
}
fn post(&self, url: &str, abort: Abort) -> Self::Result;
}
type TxResponse = oneshot::Sender<Result<Response, Error>>;
type TxStartup = std::sync::mpsc::SyncSender<Result<(), io::Error>>;
type ChanItem = Option<(Url, Method, Abort, TxResponse)>;
type ChanItem = Option<(Request, Abort, TxResponse)>;
/// An implementation of `Fetch` using a `hyper` client.
// Due to the `Send` bound of `Fetch` we spawn a background thread for
@@ -213,29 +210,37 @@ impl Client {
let future = rx_proto.take_while(|item| Ok(item.is_some()))
.map(|item| item.expect("`take_while` is only passing on channel items != None; qed"))
.for_each(|(url, method, abort, sender)|
.for_each(|(request, abort, sender)|
{
trace!(target: "fetch", "new request to {}", url);
trace!(target: "fetch", "new request to {}", request.url());
if abort.is_aborted() {
return future::ok(sender.send(Err(Error::Aborted)).unwrap_or(()))
}
let ini = (hyper.clone(), url, method, abort, 0);
let fut = future::loop_fn(ini, |(client, url, method, abort, redirects)| {
let url2 = url.clone();
let ini = (hyper.clone(), request, abort, 0);
let fut = future::loop_fn(ini, |(client, request, abort, redirects)| {
let request2 = request.clone();
let url2 = request2.url().clone();
let abort2 = abort.clone();
client.request(build_request(&url, method.clone()))
client.request(request.into())
.map(move |resp| Response::new(url2, resp, abort2))
.from_err()
.and_then(move |resp| {
if abort.is_aborted() {
debug!(target: "fetch", "fetch of {} aborted", url);
debug!(target: "fetch", "fetch of {} aborted", request2.url());
return Err(Error::Aborted)
}
if let Some(next_url) = redirect_location(url, &resp) {
if let Some((next_url, preserve_method)) = redirect_location(request2.url().clone(), &resp) {
if redirects >= abort.max_redirects() {
return Err(Error::TooManyRedirects)
}
Ok(Loop::Continue((client, next_url, method, abort, redirects + 1)))
let request = if preserve_method {
let mut request2 = request2.clone();
request2.set_url(next_url);
request2
} else {
Request::new(next_url, Method::Get)
};
Ok(Loop::Continue((client, request, abort, redirects + 1)))
} else {
let content_len = resp.headers.get::<ContentLength>().cloned();
if content_len.map(|n| *n > abort.max_size() as u64).unwrap_or(false) {
@@ -267,19 +272,15 @@ impl Client {
impl Fetch for Client {
type Result = Box<Future<Item=Response, Error=Error> + Send>;
fn fetch(&self, url: &str, method: Method, abort: Abort) -> Self::Result {
debug!(target: "fetch", "fetching: {:?}", url);
fn fetch(&self, request: Request, abort: Abort) -> Self::Result {
debug!(target: "fetch", "fetching: {:?}", request.url());
if abort.is_aborted() {
return Box::new(future::err(Error::Aborted))
}
let url: Url = match url.parse() {
Ok(u) => u,
Err(e) => return Box::new(future::err(e.into()))
};
let (tx_res, rx_res) = oneshot::channel();
let maxdur = abort.max_duration();
let sender = self.core.clone();
let future = sender.send(Some((url.clone(), method, abort, tx_res)))
let future = sender.send(Some((request, abort, tx_res)))
.map_err(|e| {
error!(target: "fetch", "failed to schedule request: {}", e);
Error::BackgroundThreadDead
@@ -297,11 +298,33 @@ impl Fetch for Client {
});
Box::new(future)
}
/// Get content from some URL.
fn get(&self, url: &str, abort: Abort) -> Self::Result {
let url: Url = match url.parse() {
Ok(u) => u,
Err(e) => return Box::new(future::err(e.into()))
};
self.fetch(Request::get(url), abort)
}
/// Post content to some URL.
fn post(&self, url: &str, abort: Abort) -> Self::Result {
let url: Url = match url.parse() {
Ok(u) => u,
Err(e) => return Box::new(future::err(e.into()))
};
self.fetch(Request::post(url), abort)
}
}
// Extract redirect location from response.
fn redirect_location(u: Url, r: &Response) -> Option<Url> {
// Extract redirect location from response. The second return value indicate whether the original method should be preserved.
fn redirect_location(u: Url, r: &Response) -> Option<(Url, bool)> {
use hyper::StatusCode::*;
let preserve_method = match r.status() {
TemporaryRedirect | PermanentRedirect => true,
_ => false,
};
match r.status() {
MovedPermanently
| PermanentRedirect
@@ -309,7 +332,7 @@ fn redirect_location(u: Url, r: &Response) -> Option<Url> {
| Found
| SeeOther => {
if let Some(loc) = r.headers.get::<Location>() {
u.join(loc).ok()
u.join(loc).ok().map(|url| (url, preserve_method))
} else {
None
}
@@ -318,12 +341,84 @@ fn redirect_location(u: Url, r: &Response) -> Option<Url> {
}
}
// Build a simple request for the given Url and method
fn build_request(u: &Url, method: Method) -> hyper::Request {
let uri = u.as_ref().parse().expect("Every valid URL is aso a URI.");
let mut rq = Request::new(method, uri);
rq.headers_mut().set(UserAgent::new("Parity Fetch Neo"));
rq
/// A wrapper for hyper::Request using Url and with methods.
#[derive(Debug, Clone)]
pub struct Request {
url: Url,
method: Method,
headers: hyper::Headers,
body: Bytes,
}
impl Request {
/// Create a new request, with given url and method.
pub fn new(url: Url, method: Method) -> Request {
Request {
url, method,
headers: hyper::Headers::new(),
body: Default::default(),
}
}
/// Create a new GET request.
pub fn get(url: Url) -> Request {
Request::new(url, Method::Get)
}
/// Create a new empty POST request.
pub fn post(url: Url) -> Request {
Request::new(url, Method::Post)
}
/// Read the url.
pub fn url(&self) -> &Url {
&self.url
}
/// Read the request headers.
pub fn headers(&self) -> &hyper::Headers {
&self.headers
}
/// Get a mutable reference to the headers.
pub fn headers_mut(&mut self) -> &mut hyper::Headers {
&mut self.headers
}
/// Set the body of the request.
pub fn set_body<T: Into<Bytes>>(&mut self, body: T) {
self.body = body.into();
}
/// Set the url of the request.
pub fn set_url(&mut self, url: Url) {
self.url = url;
}
/// Consume self, and return it with the added given header.
pub fn with_header<H: hyper::header::Header>(mut self, value: H) -> Self {
self.headers_mut().set(value);
self
}
/// Consume self, and return it with the body.
pub fn with_body<T: Into<Bytes>>(mut self, body: T) -> Self {
self.set_body(body);
self
}
}
impl Into<hyper::Request> for Request {
fn into(mut self) -> hyper::Request {
let uri = self.url.as_ref().parse().expect("Every valid URLis also a URI.");
let mut req = hyper::Request::new(self.method, uri);
self.headers.set(UserAgent::new("Parity Fetch Neo"));
*req.headers_mut() = self.headers;
req.set_body(self.body);
req
}
}
/// An HTTP response.
@@ -527,7 +622,7 @@ mod test {
use futures::future;
use futures::sync::mpsc;
use futures_timer::Delay;
use hyper::{StatusCode, Method};
use hyper::StatusCode;
use hyper::server::{Http, Request, Response, Service};
use std;
use std::io::Read;
@@ -539,7 +634,7 @@ mod test {
fn it_should_fetch() {
let server = TestServer::run();
let client = Client::new().unwrap();
let future = client.fetch(&format!("http://{}?123", server.addr()), Method::Get, Default::default());
let future = client.get(&format!("http://{}?123", server.addr()), Default::default());
let resp = future.wait().unwrap();
assert!(resp.is_success());
let body = resp.concat2().wait().unwrap();
@@ -551,7 +646,7 @@ mod test {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_duration(Duration::from_secs(1));
match client.fetch(&format!("http://{}/delay?3", server.addr()), Method::Get, abort).wait() {
match client.get(&format!("http://{}/delay?3", server.addr()), abort).wait() {
Err(Error::Timeout) => {}
other => panic!("expected timeout, got {:?}", other)
}
@@ -562,7 +657,7 @@ mod test {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default();
let future = client.fetch(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), Method::Get, abort);
let future = client.get(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort);
assert!(future.wait().unwrap().is_success())
}
@@ -571,7 +666,7 @@ mod test {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_redirects(4);
let future = client.fetch(&format!("http://{}/redirect?/", server.addr()), Method::Get, abort);
let future = client.get(&format!("http://{}/redirect?/", server.addr()), abort);
assert!(future.wait().unwrap().is_success())
}
@@ -580,7 +675,7 @@ mod test {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_redirects(3);
match client.fetch(&format!("http://{}/loop", server.addr()), Method::Get, abort).wait() {
match client.get(&format!("http://{}/loop", server.addr()), abort).wait() {
Err(Error::TooManyRedirects) => {}
other => panic!("expected too many redirects error, got {:?}", other)
}
@@ -591,7 +686,7 @@ mod test {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default();
let future = client.fetch(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), Method::Get, abort);
let future = client.get(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort);
let resp = future.wait().unwrap();
assert!(resp.is_success());
assert_eq!(&resp.concat2().wait().unwrap()[..], b"abcdefghijklmnopqrstuvwxyz")
@@ -602,7 +697,7 @@ mod test {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_size(3);
let resp = client.fetch(&format!("http://{}/?1234", server.addr()), Method::Get, abort).wait().unwrap();
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
assert!(resp.is_success());
match resp.concat2().wait() {
Err(Error::SizeLimit) => {}
@@ -615,7 +710,7 @@ mod test {
let server = TestServer::run();
let client = Client::new().unwrap();
let abort = Abort::default().with_max_size(3);
let resp = client.fetch(&format!("http://{}/?1234", server.addr()), Method::Get, abort).wait().unwrap();
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
assert!(resp.is_success());
let mut buffer = Vec::new();
let mut reader = BodyReader::new(resp);

View File

@@ -30,11 +30,11 @@ extern crate hyper_rustls;
extern crate tokio_core;
extern crate url;
extern crate bytes;
/// Fetch client implementation.
pub mod client;
pub use url::Url;
pub use self::client::{Client, Fetch, Error, Response, Abort, BodyReader};
pub use self::client::{Client, Fetch, Error, Response, Request, Abort, BodyReader};
pub use hyper::Method;