From 7cc43893d8c8214979dfe226301bed6bfcbbfd50 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 28 Sep 2017 14:43:51 +0300 Subject: [PATCH] Revert "SecretStore; fixed warnings for futures 0.1.15" This reverts commit 61daa5f3e7c86a3a5a8b259598334fb0240da076. --- .../src/key_server_cluster/cluster.rs | 43 +++++++++++-------- .../src/key_server_cluster/io/deadline.rs | 8 ++-- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index d97edfdfa..4deeb1244 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::collections::{BTreeMap, BTreeSet}; use std::collections::btree_map::Entry; use std::net::{SocketAddr, IpAddr}; -use futures::{finished, failed, Future, Stream}; +use futures::{finished, failed, Future, Stream, BoxFuture}; use futures_cpupool::CpuPool; use parking_lot::{RwLock, Mutex}; use tokio_io::IoFuture; @@ -55,8 +55,9 @@ const KEEP_ALIVE_SEND_INTERVAL: u64 = 30; /// we must treat this node as non-responding && disconnect from it. const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60; +/// Encryption sesion timeout interval. It works /// Empty future. -type BoxedEmptyFuture = ::std::boxed::Box + Send>; +type BoxedEmptyFuture = BoxFuture<(), ()>; /// Cluster interface for external clients. pub trait ClusterClient: Send + Sync { @@ -260,21 +261,23 @@ impl ClusterCore { /// Connect to socket using given context and handle. fn connect_future(handle: &Handle, data: Arc, node_address: SocketAddr) -> BoxedEmptyFuture { let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect(); - Box::new(net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes) + net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes) .then(move |result| ClusterCore::process_connection_result(data, Some(node_address), result)) - .then(|_| finished(()))) + .then(|_| finished(())) + .boxed() } /// Start listening for incoming connections. fn listen(handle: &Handle, data: Arc, listen_address: SocketAddr) -> Result { - Ok(Box::new(TcpListener::bind(&listen_address, &handle)? + Ok(TcpListener::bind(&listen_address, &handle)? .incoming() .and_then(move |(stream, node_address)| { ClusterCore::accept_connection(data.clone(), stream, node_address); Ok(()) }) .for_each(|_| Ok(())) - .then(|_| finished(())))) + .then(|_| finished(())) + .boxed()) } /// Accept connection. @@ -286,19 +289,21 @@ impl ClusterCore { /// Accept connection future. fn accept_connection_future(handle: &Handle, data: Arc, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture { - Box::new(net_accept_connection(node_address, stream, handle, data.self_key_pair.clone()) + net_accept_connection(node_address, stream, handle, data.self_key_pair.clone()) .then(move |result| ClusterCore::process_connection_result(data, None, result)) - .then(|_| finished(()))) + .then(|_| finished(())) + .boxed() } /// Schedule mainatain procedures. fn schedule_maintain(handle: &Handle, data: Arc) { let d = data.clone(); - let interval: BoxedEmptyFuture = Box::new(Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle) + let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle) .expect("failed to create interval") .and_then(move |_| Ok(ClusterCore::maintain(data.clone()))) .for_each(|_| Ok(())) - .then(|_| finished(()))); + .then(|_| finished(())) + .boxed(); d.spawn(interval); } @@ -314,7 +319,7 @@ impl ClusterCore { /// Called for every incomming mesage. fn process_connection_messages(data: Arc, connection: Arc) -> IoFuture> { - Box::new(connection + connection .read_message() .then(move |result| match result { @@ -322,22 +327,22 @@ impl ClusterCore { ClusterCore::process_connection_message(data.clone(), connection.clone(), message); // continue serving connection data.spawn(ClusterCore::process_connection_messages(data.clone(), connection)); - Box::new(finished(Ok(()))) + finished(Ok(())).boxed() }, Ok((_, Err(err))) => { warn!(target: "secretstore_net", "{}: protocol error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); // continue serving connection data.spawn(ClusterCore::process_connection_messages(data.clone(), connection)); - Box::new(finished(Err(err))) + finished(Err(err)).boxed() }, Err(err) => { warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); // close connection data.connections.remove(connection.node_id(), connection.is_inbound()); - Box::new(failed(err)) + failed(err).boxed() }, } - )) + ).boxed() } /// Send keepalive messages to every othe node. @@ -372,26 +377,26 @@ impl ClusterCore { if data.connections.insert(connection.clone()) { ClusterCore::process_connection_messages(data.clone(), connection) } else { - Box::new(finished(Ok(()))) + finished(Ok(())).boxed() } }, Ok(DeadlineStatus::Meet(Err(err))) => { warn!(target: "secretstore_net", "{}: protocol error '{}' when establishing {} connection{}", data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" }, outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - Box::new(finished(Ok(()))) + finished(Ok(())).boxed() }, Ok(DeadlineStatus::Timeout) => { warn!(target: "secretstore_net", "{}: timeout when establishing {} connection{}", data.self_key_pair.public(), if outbound_addr.is_some() { "outbound" } else { "inbound" }, outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - Box::new(finished(Ok(()))) + finished(Ok(())).boxed() }, Err(err) => { warn!(target: "secretstore_net", "{}: network error '{}' when establishing {} connection{}", data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" }, outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); - Box::new(finished(Ok(()))) + finished(Ok(())).boxed() }, } } diff --git a/secret_store/src/key_server_cluster/io/deadline.rs b/secret_store/src/key_server_cluster/io/deadline.rs index 620003775..a2f794e76 100644 --- a/secret_store/src/key_server_cluster/io/deadline.rs +++ b/secret_store/src/key_server_cluster/io/deadline.rs @@ -16,16 +16,16 @@ use std::io; use std::time::Duration; -use futures::{Future, Select, Poll, Async}; +use futures::{Future, Select, BoxFuture, Poll, Async}; use tokio_core::reactor::{Handle, Timeout}; -type DeadlineBox = ::std::boxed::Box::Item>, Error = ::Error> + Send>; +type DeadlineBox = BoxFuture::Item>, ::Error>; /// Complete a passed future or fail if it is not completed within timeout. pub fn deadline(duration: Duration, handle: &Handle, future: F) -> Result, io::Error> where F: Future + Send + 'static, T: 'static { - let timeout: DeadlineBox = Box::new(Timeout::new(duration, handle)?.map(|_| DeadlineStatus::Timeout)); - let future: DeadlineBox = Box::new(future.map(DeadlineStatus::Meet)); + let timeout = Timeout::new(duration, handle)?.map(|_| DeadlineStatus::Timeout).boxed(); + let future = future.map(DeadlineStatus::Meet).boxed(); let deadline = Deadline { future: timeout.select(future), };