Replace tokio_core with tokio (ring -> 0.13) (#9657)

* Replace `tokio_core` with `tokio`.

* Remove `tokio-core` and replace with `tokio` in

    - `ethcore/stratum`

    - `secret_store`

    - `util/fetch`

    - `util/reactor`

* Bump hyper to 0.12 in

    - `miner`

    - `util/fake-fetch`

    - `util/fetch`

    - `secret_store`

* Bump `jsonrpc-***` to 0.9 in

    - `parity`

    - `ethcore/stratum`

    - `ipfs`

    - `rpc`

    - `rpc_client`

    - `whisper`

* Bump `ring` to 0.13

* Use a more graceful shutdown process in `secret_store` tests.

* Convert some mutexes to rwlocks in `secret_store`.

* Consolidate Tokio Runtime use, remove `CpuPool`.

* Rename and move the `tokio_reactor` crate (`util/reactor`) to
  `tokio_runtime` (`util/runtime`).

* Rename `EventLoop` to `Runtime`.

    - Rename `EventLoop::spawn` to `Runtime::with_default_thread_count`.

    - Add the `Runtime::with_thread_count` method.

    - Rename `Remote` to `Executor`.

* Remove uses of `CpuPool` and spawn all tasks via the `Runtime` executor
  instead.

* Other changes related to `CpuPool` removal:

    - Remove `Reservations::with_pool`. `::new` now takes an `Executor` as an argument.

    - Remove `SenderReservations::with_pool`. `::new` now takes an `Executor` as an argument.
This commit is contained in:
Nick Sanders
2018-10-22 00:40:50 -07:00
committed by Afri Schoedon
parent b8da38f4e4
commit 68ca8df22f
75 changed files with 2027 additions and 1671 deletions

View File

@@ -17,8 +17,7 @@
use futures::future::{self, Loop};
use futures::sync::{mpsc, oneshot};
use futures::{self, Future, Async, Sink, Stream};
use hyper::header::{UserAgent, Location, ContentLength, ContentType};
use hyper::mime::Mime;
use hyper::header::{self, HeaderMap, HeaderValue, IntoHeaderName};
use hyper::{self, Method, StatusCode};
use hyper_rustls;
use std;
@@ -29,8 +28,7 @@ use std::sync::mpsc::RecvTimeoutError;
use std::thread;
use std::time::Duration;
use std::{io, fmt};
use tokio_core::reactor;
use tokio_timer::{self, Timer};
use tokio::{self, util::FutureExt};
use url::{self, Url};
use bytes::Bytes;
@@ -118,7 +116,7 @@ impl Abort {
/// Types which retrieve content from some URL.
pub trait Fetch: Clone + Send + Sync + 'static {
/// The result future.
type Result: Future<Item=Response, Error=Error> + Send + 'static;
type Result: Future<Item = Response, Error = Error> + Send + 'static;
/// Make a request to given URL
fn fetch(&self, request: Request, abort: Abort) -> Self::Result;
@@ -131,7 +129,7 @@ pub trait Fetch: Clone + Send + Sync + 'static {
}
type TxResponse = oneshot::Sender<Result<Response, Error>>;
type TxStartup = std::sync::mpsc::SyncSender<Result<(), io::Error>>;
type TxStartup = std::sync::mpsc::SyncSender<Result<(), tokio::io::Error>>;
type ChanItem = Option<(Request, Abort, TxResponse)>;
/// An implementation of `Fetch` using a `hyper` client.
@@ -140,9 +138,8 @@ type ChanItem = Option<(Request, Abort, TxResponse)>;
// not implement `Send` currently.
#[derive(Debug)]
pub struct Client {
core: mpsc::Sender<ChanItem>,
runtime: mpsc::Sender<ChanItem>,
refs: Arc<AtomicUsize>,
timer: Timer,
}
// When cloning a client we increment the internal reference counter.
@@ -150,9 +147,8 @@ impl Clone for Client {
fn clone(&self) -> Client {
self.refs.fetch_add(1, Ordering::SeqCst);
Client {
core: self.core.clone(),
runtime: self.runtime.clone(),
refs: self.refs.clone(),
timer: self.timer.clone(),
}
}
}
@@ -163,7 +159,7 @@ impl Drop for Client {
fn drop(&mut self) {
if self.refs.fetch_sub(1, Ordering::SeqCst) == 1 {
// ignore send error as it means the background thread is gone already
let _ = self.core.clone().send(None).wait();
let _ = self.runtime.clone().send(None).wait();
}
}
}
@@ -193,23 +189,20 @@ impl Client {
}
Ok(Client {
core: tx_proto,
runtime: tx_proto,
refs: Arc::new(AtomicUsize::new(1)),
timer: Timer::default(),
})
}
fn background_thread(tx_start: TxStartup, rx_proto: mpsc::Receiver<ChanItem>, num_dns_threads: usize) -> io::Result<thread::JoinHandle<()>> {
thread::Builder::new().name("fetch".into()).spawn(move || {
let mut core = match reactor::Core::new() {
let mut runtime = match tokio::runtime::current_thread::Runtime::new() {
Ok(c) => c,
Err(e) => return tx_start.send(Err(e)).unwrap_or(())
};
let handle = core.handle();
let hyper = hyper::Client::configure()
.connector(hyper_rustls::HttpsConnector::new(num_dns_threads, &core.handle()))
.build(&core.handle());
let hyper = hyper::Client::builder()
.build(hyper_rustls::HttpsConnector::new(num_dns_threads));
let future = rx_proto.take_while(|item| Ok(item.is_some()))
.map(|item| item.expect("`take_while` is only passing on channel items != None; qed"))
@@ -241,13 +234,18 @@ impl Client {
request2.set_url(next_url);
request2
} else {
Request::new(next_url, Method::Get)
Request::new(next_url, Method::GET)
};
Ok(Loop::Continue((client, request, abort, redirects + 1)))
} else {
let content_len = resp.headers.get::<ContentLength>().cloned();
if content_len.map(|n| *n > abort.max_size() as u64).unwrap_or(false) {
return Err(Error::SizeLimit)
if let Some(ref h_val) = resp.headers.get(header::CONTENT_LENGTH) {
let content_len = h_val
.to_str()?
.parse::<u64>()?;
if content_len > abort.max_size() as u64 {
return Err(Error::SizeLimit)
}
}
Ok(Loop::Break(resp))
}
@@ -256,7 +254,7 @@ impl Client {
.then(|result| {
future::ok(sender.send(result).unwrap_or(()))
});
handle.spawn(fut);
tokio::spawn(fut);
trace!(target: "fetch", "waiting for next request ...");
future::ok(())
});
@@ -264,7 +262,7 @@ impl Client {
tx_start.send(Ok(())).unwrap_or(());
debug!(target: "fetch", "processing requests ...");
if let Err(()) = core.run(future) {
if let Err(()) = runtime.block_on(future) {
error!(target: "fetch", "error while executing future")
}
debug!(target: "fetch", "fetch background thread finished")
@@ -273,7 +271,7 @@ impl Client {
}
impl Fetch for Client {
type Result = Box<Future<Item=Response, Error=Error> + Send>;
type Result = Box<Future<Item=Response, Error=Error> + Send + 'static>;
fn fetch(&self, request: Request, abort: Abort) -> Self::Result {
debug!(target: "fetch", "fetching: {:?}", request.url());
@@ -282,7 +280,7 @@ impl Fetch for Client {
}
let (tx_res, rx_res) = oneshot::channel();
let maxdur = abort.max_duration();
let sender = self.core.clone();
let sender = self.runtime.clone();
let future = sender.send(Some((request, abort, tx_res)))
.map_err(|e| {
error!(target: "fetch", "failed to schedule request: {}", e);
@@ -291,7 +289,15 @@ impl Fetch for Client {
.and_then(|_| rx_res.map_err(|oneshot::Canceled| Error::BackgroundThreadDead))
.and_then(future::result);
Box::new(self.timer.timeout(future, maxdur))
Box::new(future.timeout(maxdur)
.map_err(|err| {
if err.is_inner() {
Error::from(err.into_inner().unwrap())
} else {
Error::from(err)
}
})
)
}
/// Get content from some URL.
@@ -315,22 +321,21 @@ impl Fetch for Client {
// Extract redirect location from response. The second return value indicate whether the original method should be preserved.
fn redirect_location(u: Url, r: &Response) -> Option<(Url, bool)> {
use hyper::StatusCode::*;
let preserve_method = match r.status() {
TemporaryRedirect | PermanentRedirect => true,
StatusCode::TEMPORARY_REDIRECT | StatusCode::PERMANENT_REDIRECT => true,
_ => false,
};
match r.status() {
MovedPermanently
| PermanentRedirect
| TemporaryRedirect
| Found
| SeeOther => {
if let Some(loc) = r.headers.get::<Location>() {
u.join(loc).ok().map(|url| (url, preserve_method))
} else {
None
}
StatusCode::MOVED_PERMANENTLY
| StatusCode::PERMANENT_REDIRECT
| StatusCode::TEMPORARY_REDIRECT
| StatusCode::FOUND
| StatusCode::SEE_OTHER => {
r.headers.get(header::LOCATION).and_then(|loc| {
loc.to_str().ok().and_then(|loc_s| {
u.join(loc_s).ok().map(|url| (url, preserve_method))
})
})
}
_ => None
}
@@ -341,7 +346,7 @@ fn redirect_location(u: Url, r: &Response) -> Option<(Url, bool)> {
pub struct Request {
url: Url,
method: Method,
headers: hyper::Headers,
headers: HeaderMap,
body: Bytes,
}
@@ -350,19 +355,19 @@ impl Request {
pub fn new(url: Url, method: Method) -> Request {
Request {
url, method,
headers: hyper::Headers::new(),
headers: HeaderMap::new(),
body: Default::default(),
}
}
/// Create a new GET request.
pub fn get(url: Url) -> Request {
Request::new(url, Method::Get)
Request::new(url, Method::GET)
}
/// Create a new empty POST request.
pub fn post(url: Url) -> Request {
Request::new(url, Method::Post)
Request::new(url, Method::POST)
}
/// Read the url.
@@ -371,12 +376,12 @@ impl Request {
}
/// Read the request headers.
pub fn headers(&self) -> &hyper::Headers {
pub fn headers(&self) -> &HeaderMap {
&self.headers
}
/// Get a mutable reference to the headers.
pub fn headers_mut(&mut self) -> &mut hyper::Headers {
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.headers
}
@@ -391,8 +396,10 @@ impl Request {
}
/// Consume self, and return it with the added given header.
pub fn with_header<H: hyper::header::Header>(mut self, value: H) -> Self {
self.headers_mut().set(value);
pub fn with_header<K>(mut self, key: K, val: HeaderValue) -> Self
where K: IntoHeaderName,
{
self.headers_mut().append(key, val);
self
}
@@ -403,16 +410,15 @@ impl Request {
}
}
impl Into<hyper::Request> for Request {
fn into(mut self) -> hyper::Request {
let uri = self.url.as_ref().parse().expect("Every valid URLis also a URI.");
let mut req = hyper::Request::new(self.method, uri);
self.headers.set(UserAgent::new("Parity Fetch Neo"));
*req.headers_mut() = self.headers;
req.set_body(self.body);
req
impl From<Request> for hyper::Request<hyper::Body> {
fn from(req: Request) -> hyper::Request<hyper::Body> {
let uri: hyper::Uri = req.url.as_ref().parse().expect("Every valid URLis also a URI.");
hyper::Request::builder()
.method(req.method)
.uri(uri)
.header(header::USER_AGENT, HeaderValue::from_static("Parity Fetch Neo"))
.body(req.body.into())
.expect("Header, uri, method, and body are already valid and can not fail to parse; qed")
}
}
@@ -421,7 +427,7 @@ impl Into<hyper::Request> for Request {
pub struct Response {
url: Url,
status: StatusCode,
headers: hyper::Headers,
headers: HeaderMap,
body: hyper::Body,
abort: Abort,
nread: usize,
@@ -429,12 +435,12 @@ pub struct Response {
impl Response {
/// Create a new response, wrapping a hyper response.
pub fn new(u: Url, r: hyper::Response, a: Abort) -> Response {
pub fn new(u: Url, r: hyper::Response<hyper::Body>, a: Abort) -> Response {
Response {
url: u,
status: r.status(),
headers: r.headers().clone(),
body: r.body(),
body: r.into_body(),
abort: a,
nread: 0,
}
@@ -447,26 +453,21 @@ impl Response {
/// Status code == OK (200)?
pub fn is_success(&self) -> bool {
self.status() == StatusCode::Ok
self.status() == StatusCode::OK
}
/// Status code == 404.
pub fn is_not_found(&self) -> bool {
self.status() == StatusCode::NotFound
self.status() == StatusCode::NOT_FOUND
}
/// Is the content-type text/html?
pub fn is_html(&self) -> bool {
if let Some(ref mime) = self.content_type() {
mime.type_() == "text" && mime.subtype() == "html"
} else {
false
}
}
/// The conten-type header value.
pub fn content_type(&self) -> Option<Mime> {
self.headers.get::<ContentType>().map(|ct| ct.0.clone())
self.headers.get(header::CONTENT_TYPE).and_then(|ct_val| {
ct_val.to_str().ok().map(|ct_str| {
ct_str.contains("text") && ct_str.contains("html")
})
}).unwrap_or(false)
}
}
@@ -562,6 +563,10 @@ impl io::Read for BodyReader {
pub enum Error {
/// Hyper gave us an error.
Hyper(hyper::Error),
/// A hyper header conversion error.
HyperHeaderToStrError(hyper::header::ToStrError),
/// An integer parsing error.
ParseInt(std::num::ParseIntError),
/// Some I/O error occured.
Io(io::Error),
/// Invalid URLs where attempted to parse.
@@ -570,8 +575,10 @@ pub enum Error {
Aborted,
/// Too many redirects have been encountered.
TooManyRedirects,
/// tokio-timer inner future gave us an error.
TokioTimeoutInnerVal(String),
/// tokio-timer gave us an error.
Timer(tokio_timer::TimerError),
TokioTimer(Option<tokio::timer::Error>),
/// The maximum duration was reached.
Timeout,
/// The response body is too large.
@@ -585,23 +592,43 @@ impl fmt::Display for Error {
match *self {
Error::Aborted => write!(fmt, "The request has been aborted."),
Error::Hyper(ref e) => write!(fmt, "{}", e),
Error::HyperHeaderToStrError(ref e) => write!(fmt, "{}", e),
Error::ParseInt(ref e) => write!(fmt, "{}", e),
Error::Url(ref e) => write!(fmt, "{}", e),
Error::Io(ref e) => write!(fmt, "{}", e),
Error::BackgroundThreadDead => write!(fmt, "background thread gond"),
Error::TooManyRedirects => write!(fmt, "too many redirects"),
Error::Timer(ref e) => write!(fmt, "{}", e),
Error::TokioTimeoutInnerVal(ref s) => write!(fmt, "tokio timer inner value error: {:?}", s),
Error::TokioTimer(ref e) => write!(fmt, "tokio timer error: {:?}", e),
Error::Timeout => write!(fmt, "request timed out"),
Error::SizeLimit => write!(fmt, "size limit reached"),
}
}
}
impl ::std::error::Error for Error {
fn description(&self) -> &str { "Fetch client error" }
fn cause(&self) -> Option<&::std::error::Error> { None }
}
impl From<hyper::Error> for Error {
fn from(e: hyper::Error) -> Self {
Error::Hyper(e)
}
}
impl From<hyper::header::ToStrError> for Error {
fn from(e: hyper::header::ToStrError) -> Self {
Error::HyperHeaderToStrError(e)
}
}
impl From<std::num::ParseIntError> for Error {
fn from(e: std::num::ParseIntError) -> Self {
Error::ParseInt(e)
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
@@ -614,24 +641,35 @@ impl From<url::ParseError> for Error {
}
}
impl<F> From<tokio_timer::TimeoutError<F>> for Error {
fn from(e: tokio_timer::TimeoutError<F>) -> Self {
match e {
tokio_timer::TimeoutError::Timer(_, e) => Error::Timer(e),
tokio_timer::TimeoutError::TimedOut(_) => Error::Timeout,
impl<T: std::fmt::Debug> From<tokio::timer::timeout::Error<T>> for Error {
fn from(e: tokio::timer::timeout::Error<T>) -> Self {
if e.is_inner() {
Error::TokioTimeoutInnerVal(format!("{:?}", e.into_inner().unwrap()))
} else if e.is_elapsed() {
Error::Timeout
} else {
Error::TokioTimer(e.into_timer())
}
}
}
impl From<tokio::timer::Error> for Error {
fn from(e: tokio::timer::Error) -> Self {
Error::TokioTimer(Some(e))
}
}
#[cfg(test)]
mod test {
use super::*;
use futures::future;
use futures::sync::mpsc;
use hyper::StatusCode;
use hyper::server::{Http, Request, Response, Service};
use tokio_timer::Timer;
use std;
use futures::sync::oneshot;
use hyper::{
StatusCode,
service::Service,
};
use tokio::timer::Delay;
use tokio::runtime::current_thread::Runtime;
use std::io::Read;
use std::net::SocketAddr;
@@ -641,139 +679,238 @@ mod test {
fn it_should_fetch() {
let server = TestServer::run();
let client = Client::new(4).unwrap();
let future = client.get(&format!("http://{}?123", server.addr()), Default::default());
let resp = future.wait().unwrap();
assert!(resp.is_success());
let body = resp.concat2().wait().unwrap();
assert_eq!(&body[..], b"123")
let mut runtime = Runtime::new().unwrap();
let future = client.get(&format!("http://{}?123", server.addr()), Abort::default())
.map(|resp| {
assert!(resp.is_success());
resp
})
.map(|resp| resp.concat2())
.flatten()
.map(|body| assert_eq!(&body[..], b"123"))
.map_err(|err| panic!(err));
runtime.block_on(future).unwrap();
}
#[test]
fn it_should_fetch_in_light_mode() {
let server = TestServer::run();
let client = Client::new(1).unwrap();
let future = client.get(&format!("http://{}?123", server.addr()), Default::default());
let resp = future.wait().unwrap();
assert!(resp.is_success());
let body = resp.concat2().wait().unwrap();
assert_eq!(&body[..], b"123")
let mut runtime = Runtime::new().unwrap();
let future = client.get(&format!("http://{}?123", server.addr()), Abort::default())
.map(|resp| {
assert!(resp.is_success());
resp
})
.map(|resp| resp.concat2())
.flatten()
.map(|body| assert_eq!(&body[..], b"123"))
.map_err(|err| panic!(err));
runtime.block_on(future).unwrap();
}
#[test]
fn it_should_timeout() {
let server = TestServer::run();
let client = Client::new(4).unwrap();
let mut runtime = Runtime::new().unwrap();
let abort = Abort::default().with_max_duration(Duration::from_secs(1));
match client.get(&format!("http://{}/delay?3", server.addr()), abort).wait() {
Err(Error::Timeout) => {}
other => panic!("expected timeout, got {:?}", other)
}
let future = client.get(&format!("http://{}/delay?3", server.addr()), abort)
.then(|res| {
match res {
Err(Error::Timeout) => Ok::<_, ()>(()),
other => panic!("expected timeout, got {:?}", other),
}
});
runtime.block_on(future).unwrap();
}
#[test]
fn it_should_follow_redirects() {
let server = TestServer::run();
let client = Client::new(4).unwrap();
let mut runtime = Runtime::new().unwrap();
let abort = Abort::default();
let future = client.get(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort);
assert!(future.wait().unwrap().is_success())
let future = client.get(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort)
.and_then(|resp| {
if resp.is_success() { Ok(()) } else { panic!("Response unsuccessful") }
});
runtime.block_on(future).unwrap();
}
#[test]
fn it_should_follow_relative_redirects() {
let server = TestServer::run();
let client = Client::new(4).unwrap();
let mut runtime = Runtime::new().unwrap();
let abort = Abort::default().with_max_redirects(4);
let future = client.get(&format!("http://{}/redirect?/", server.addr()), abort);
assert!(future.wait().unwrap().is_success())
let future = client.get(&format!("http://{}/redirect?/", server.addr()), abort)
.and_then(|resp| {
if resp.is_success() { Ok(()) } else { panic!("Response unsuccessful") }
});
runtime.block_on(future).unwrap();
}
#[test]
fn it_should_not_follow_too_many_redirects() {
let server = TestServer::run();
let client = Client::new(4).unwrap();
let mut runtime = Runtime::new().unwrap();
let abort = Abort::default().with_max_redirects(3);
match client.get(&format!("http://{}/loop", server.addr()), abort).wait() {
Err(Error::TooManyRedirects) => {}
other => panic!("expected too many redirects error, got {:?}", other)
}
let future = client.get(&format!("http://{}/loop", server.addr()), abort)
.then(|res| {
match res {
Err(Error::TooManyRedirects) => Ok::<_, ()>(()),
other => panic!("expected too many redirects error, got {:?}", other)
}
});
runtime.block_on(future).unwrap();
}
#[test]
fn it_should_read_data() {
let server = TestServer::run();
let client = Client::new(4).unwrap();
let mut runtime = Runtime::new().unwrap();
let abort = Abort::default();
let future = client.get(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort);
let resp = future.wait().unwrap();
assert!(resp.is_success());
assert_eq!(&resp.concat2().wait().unwrap()[..], b"abcdefghijklmnopqrstuvwxyz")
let future = client.get(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort)
.and_then(|resp| {
if resp.is_success() { Ok(resp) } else { panic!("Response unsuccessful") }
})
.map(|resp| resp.concat2())
.flatten()
.map(|body| assert_eq!(&body[..], b"abcdefghijklmnopqrstuvwxyz"));
runtime.block_on(future).unwrap();
}
#[test]
fn it_should_not_read_too_much_data() {
let server = TestServer::run();
let client = Client::new(4).unwrap();
let mut runtime = Runtime::new().unwrap();
let abort = Abort::default().with_max_size(3);
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
assert!(resp.is_success());
match resp.concat2().wait() {
Err(Error::SizeLimit) => {}
other => panic!("expected size limit error, got {:?}", other)
}
let future = client.get(&format!("http://{}/?1234", server.addr()), abort)
.and_then(|resp| {
if resp.is_success() { Ok(resp) } else { panic!("Response unsuccessful") }
})
.map(|resp| resp.concat2())
.flatten()
.then(|body| {
match body {
Err(Error::SizeLimit) => Ok::<_, ()>(()),
other => panic!("expected size limit error, got {:?}", other),
}
});
runtime.block_on(future).unwrap();
}
#[test]
fn it_should_not_read_too_much_data_sync() {
let server = TestServer::run();
let client = Client::new(4).unwrap();
let mut runtime = Runtime::new().unwrap();
// let abort = Abort::default().with_max_size(3);
// let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
// assert!(resp.is_success());
// let mut buffer = Vec::new();
// let mut reader = BodyReader::new(resp);
// match reader.read_to_end(&mut buffer) {
// Err(ref e) if e.kind() == io::ErrorKind::PermissionDenied => {}
// other => panic!("expected size limit error, got {:?}", other)
// }
// FIXME (c0gent): The prior version of this test (pre-hyper-0.12,
// commented out above) is not possible to recreate. It relied on an
// apparent bug in `Client::background_thread` which suppressed the
// `SizeLimit` error from occurring. This is due to the headers
// collection not returning a value for content length when queried.
// The precise reason why this was happening is unclear.
let abort = Abort::default().with_max_size(3);
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
assert!(resp.is_success());
let mut buffer = Vec::new();
let mut reader = BodyReader::new(resp);
match reader.read_to_end(&mut buffer) {
Err(ref e) if e.kind() == io::ErrorKind::PermissionDenied => {}
other => panic!("expected size limit error, got {:?}", other)
let future = client.get(&format!("http://{}/?1234", server.addr()), abort)
.and_then(|resp| {
assert_eq!(true, false, "Unreachable. (see FIXME note)");
assert!(resp.is_success());
let mut buffer = Vec::new();
let mut reader = BodyReader::new(resp);
match reader.read_to_end(&mut buffer) {
Err(ref e) if e.kind() == io::ErrorKind::PermissionDenied => Ok(()),
other => panic!("expected size limit error, got {:?}", other)
}
});
// FIXME: This simply demonstrates the above point.
match runtime.block_on(future) {
Err(Error::SizeLimit) => {},
other => panic!("Expected `Error::SizeLimit`, got: {:?}", other),
}
}
struct TestServer(Timer);
struct TestServer;
impl Service for TestServer {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = Error;
type Future = Box<Future<Item=hyper::Response<Self::ResBody>, Error=Self::Error> + Send + 'static>;
fn call(&self, req: Request) -> Self::Future {
fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
match req.uri().path() {
"/" => {
let body = req.uri().query().unwrap_or("").to_string();
let req = Response::new().with_body(body);
Box::new(future::ok(req))
let res = hyper::Response::new(body.into());
Box::new(future::ok(res))
}
"/redirect" => {
let loc = Location::new(req.uri().query().unwrap_or("/").to_string());
let req = Response::new()
.with_status(StatusCode::MovedPermanently)
.with_header(loc);
Box::new(future::ok(req))
let loc = req.uri().query().unwrap_or("/").to_string();
let res = hyper::Response::builder()
.status(StatusCode::MOVED_PERMANENTLY)
.header(hyper::header::LOCATION, loc)
.body(hyper::Body::empty())
.expect("Unable to create response");
Box::new(future::ok(res))
}
"/loop" => {
let req = Response::new()
.with_status(StatusCode::MovedPermanently)
.with_header(Location::new("/loop".to_string()));
Box::new(future::ok(req))
let res = hyper::Response::builder()
.status(StatusCode::MOVED_PERMANENTLY)
.header(hyper::header::LOCATION, "/loop")
.body(hyper::Body::empty())
.expect("Unable to create response");
Box::new(future::ok(res))
}
"/delay" => {
let d = Duration::from_secs(req.uri().query().unwrap_or("0").parse().unwrap());
Box::new(self.0.sleep(d)
.map_err(|_| return io::Error::new(io::ErrorKind::Other, "timer error"))
.from_err()
.map(|_| Response::new()))
let dur = Duration::from_secs(req.uri().query().unwrap_or("0").parse().unwrap());
let delayed_res = Delay::new(std::time::Instant::now() + dur)
.and_then(|_| Ok::<_, _>(hyper::Response::new(hyper::Body::empty())))
.from_err();
Box::new(delayed_res)
}
_ => {
let res = hyper::Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::empty())
.expect("Unable to create response");
Box::new(future::ok(res))
}
_ => Box::new(future::ok(Response::new().with_status(StatusCode::NotFound)))
}
}
}
@@ -781,19 +918,27 @@ mod test {
impl TestServer {
fn run() -> Handle {
let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
let (tx_end, rx_end) = mpsc::channel(0);
let rx_end_fut = rx_end.into_future().map(|_| ()).map_err(|_| ());
let (tx_end, rx_end) = oneshot::channel();
let rx_end_fut = rx_end.map(|_| ()).map_err(|_| ());
thread::spawn(move || {
let addr = ADDRESS.parse().unwrap();
let server = Http::new().bind(&addr, || Ok(TestServer(Timer::default()))).unwrap();
tx_start.send(server.local_addr().unwrap()).unwrap_or(());
server.run_until(rx_end_fut).unwrap();
let server = hyper::server::Server::bind(&addr)
.serve(|| future::ok::<_, hyper::Error>(TestServer));
tx_start.send(server.local_addr()).unwrap_or(());
tokio::run(
server.with_graceful_shutdown(rx_end_fut)
.map_err(|e| panic!("server error: {}", e))
);
});
Handle(rx_start.recv().unwrap(), tx_end)
Handle(rx_start.recv().unwrap(), Some(tx_end))
}
}
struct Handle(SocketAddr, mpsc::Sender<()>);
struct Handle(SocketAddr, Option<oneshot::Sender<()>>);
impl Handle {
fn addr(&self) -> SocketAddr {
@@ -803,7 +948,7 @@ mod test {
impl Drop for Handle {
fn drop(&mut self) {
self.1.clone().send(()).wait().unwrap();
self.1.take().unwrap().send(()).unwrap();
}
}
}

View File

@@ -26,9 +26,9 @@ extern crate futures;
extern crate hyper;
extern crate hyper_rustls;
extern crate http;
extern crate tokio_core;
extern crate tokio_timer;
extern crate tokio;
extern crate url;
extern crate bytes;