diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index 4deeb1244..d97edfdfa 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::collections::{BTreeMap, BTreeSet}; use std::collections::btree_map::Entry; use std::net::{SocketAddr, IpAddr}; -use futures::{finished, failed, Future, Stream, BoxFuture}; +use futures::{finished, failed, Future, Stream}; use futures_cpupool::CpuPool; use parking_lot::{RwLock, Mutex}; use tokio_io::IoFuture; @@ -55,9 +55,8 @@ const KEEP_ALIVE_SEND_INTERVAL: u64 = 30; /// we must treat this node as non-responding && disconnect from it. const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60; -/// Encryption sesion timeout interval. It works /// Empty future. -type BoxedEmptyFuture = BoxFuture<(), ()>; +type BoxedEmptyFuture = ::std::boxed::Box + Send>; /// Cluster interface for external clients. pub trait ClusterClient: Send + Sync { @@ -261,23 +260,21 @@ impl ClusterCore { /// Connect to socket using given context and handle. fn connect_future(handle: &Handle, data: Arc, node_address: SocketAddr) -> BoxedEmptyFuture { let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect(); - net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes) + Box::new(net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes) .then(move |result| ClusterCore::process_connection_result(data, Some(node_address), result)) - .then(|_| finished(())) - .boxed() + .then(|_| finished(()))) } /// Start listening for incoming connections. fn listen(handle: &Handle, data: Arc, listen_address: SocketAddr) -> Result { - Ok(TcpListener::bind(&listen_address, &handle)? + Ok(Box::new(TcpListener::bind(&listen_address, &handle)? .incoming() .and_then(move |(stream, node_address)| { ClusterCore::accept_connection(data.clone(), stream, node_address); Ok(()) }) .for_each(|_| Ok(())) - .then(|_| finished(())) - .boxed()) + .then(|_| finished(())))) } /// Accept connection. @@ -289,21 +286,19 @@ impl ClusterCore { /// Accept connection future. fn accept_connection_future(handle: &Handle, data: Arc, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture { - net_accept_connection(node_address, stream, handle, data.self_key_pair.clone()) + Box::new(net_accept_connection(node_address, stream, handle, data.self_key_pair.clone()) .then(move |result| ClusterCore::process_connection_result(data, None, result)) - .then(|_| finished(())) - .boxed() + .then(|_| finished(()))) } /// Schedule mainatain procedures. fn schedule_maintain(handle: &Handle, data: Arc) { let d = data.clone(); - let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle) + let interval: BoxedEmptyFuture = Box::new(Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle) .expect("failed to create interval") .and_then(move |_| Ok(ClusterCore::maintain(data.clone()))) .for_each(|_| Ok(())) - .then(|_| finished(())) - .boxed(); + .then(|_| finished(()))); d.spawn(interval); } @@ -319,7 +314,7 @@ impl ClusterCore { /// Called for every incomming mesage. fn process_connection_messages(data: Arc, connection: Arc) -> IoFuture> { - connection + Box::new(connection .read_message() .then(move |result| match result { @@ -327,22 +322,22 @@ impl ClusterCore { ClusterCore::process_connection_message(data.clone(), connection.clone(), message); // continue serving connection data.spawn(ClusterCore::process_connection_messages(data.clone(), connection)); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, Ok((_, Err(err))) => { warn!(target: "secretstore_net", "{}: protocol error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); // continue serving connection data.spawn(ClusterCore::process_connection_messages(data.clone(), connection)); - finished(Err(err)).boxed() + Box::new(finished(Err(err))) }, Err(err) => { warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); // close connection data.connections.remove(connection.node_id(), connection.is_inbound()); - failed(err).boxed() + Box::new(failed(err)) }, } - ).boxed() + )) } /// Send keepalive messages to every othe node. @@ -377,26 +372,26 @@ impl ClusterCore { if data.connections.insert(connection.clone()) { ClusterCore::process_connection_messages(data.clone(), connection) } else { - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) } }, Ok(DeadlineStatus::Meet(Err(err))) => { warn!(target: "secretstore_net", "{}: protocol error '{}' when establishing {} connection{}", data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" }, outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, Ok(DeadlineStatus::Timeout) => { warn!(target: "secretstore_net", "{}: timeout when establishing {} connection{}", data.self_key_pair.public(), if outbound_addr.is_some() { "outbound" } else { "inbound" }, outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, Err(err) => { warn!(target: "secretstore_net", "{}: network error '{}' when establishing {} connection{}", data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" }, outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - finished(Ok(())).boxed() + Box::new(finished(Ok(()))) }, } } diff --git a/secret_store/src/key_server_cluster/io/deadline.rs b/secret_store/src/key_server_cluster/io/deadline.rs index a2f794e76..620003775 100644 --- a/secret_store/src/key_server_cluster/io/deadline.rs +++ b/secret_store/src/key_server_cluster/io/deadline.rs @@ -16,16 +16,16 @@ use std::io; use std::time::Duration; -use futures::{Future, Select, BoxFuture, Poll, Async}; +use futures::{Future, Select, Poll, Async}; use tokio_core::reactor::{Handle, Timeout}; -type DeadlineBox = BoxFuture::Item>, ::Error>; +type DeadlineBox = ::std::boxed::Box::Item>, Error = ::Error> + Send>; /// Complete a passed future or fail if it is not completed within timeout. pub fn deadline(duration: Duration, handle: &Handle, future: F) -> Result, io::Error> where F: Future + Send + 'static, T: 'static { - let timeout = Timeout::new(duration, handle)?.map(|_| DeadlineStatus::Timeout).boxed(); - let future = future.map(DeadlineStatus::Meet).boxed(); + let timeout: DeadlineBox = Box::new(Timeout::new(duration, handle)?.map(|_| DeadlineStatus::Timeout)); + let future: DeadlineBox = Box::new(future.map(DeadlineStatus::Meet)); let deadline = Deadline { future: timeout.select(future), };