Revert "SecretStore; fixed warnings for futures 0.1.15"

This reverts commit 61daa5f3e7.
This commit is contained in:
Svyatoslav Nikolsky 2017-09-28 14:43:51 +03:00
parent 61daa5f3e7
commit 7cc43893d8
2 changed files with 28 additions and 23 deletions

View File

@ -20,7 +20,7 @@ use std::sync::Arc;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::btree_map::Entry;
use std::net::{SocketAddr, IpAddr};
use futures::{finished, failed, Future, Stream};
use futures::{finished, failed, Future, Stream, BoxFuture};
use futures_cpupool::CpuPool;
use parking_lot::{RwLock, Mutex};
use tokio_io::IoFuture;
@ -55,8 +55,9 @@ const KEEP_ALIVE_SEND_INTERVAL: u64 = 30;
/// we must treat this node as non-responding && disconnect from it.
const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60;
/// Encryption sesion timeout interval. It works
/// Empty future.
type BoxedEmptyFuture = ::std::boxed::Box<Future<Item = (), Error = ()> + Send>;
type BoxedEmptyFuture = BoxFuture<(), ()>;
/// Cluster interface for external clients.
pub trait ClusterClient: Send + Sync {
@ -260,21 +261,23 @@ impl ClusterCore {
/// Connect to socket using given context and handle.
fn connect_future(handle: &Handle, data: Arc<ClusterData>, node_address: SocketAddr) -> BoxedEmptyFuture {
let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect();
Box::new(net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes)
net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes)
.then(move |result| ClusterCore::process_connection_result(data, Some(node_address), result))
.then(|_| finished(())))
.then(|_| finished(()))
.boxed()
}
/// Start listening for incoming connections.
fn listen(handle: &Handle, data: Arc<ClusterData>, listen_address: SocketAddr) -> Result<BoxedEmptyFuture, Error> {
Ok(Box::new(TcpListener::bind(&listen_address, &handle)?
Ok(TcpListener::bind(&listen_address, &handle)?
.incoming()
.and_then(move |(stream, node_address)| {
ClusterCore::accept_connection(data.clone(), stream, node_address);
Ok(())
})
.for_each(|_| Ok(()))
.then(|_| finished(()))))
.then(|_| finished(()))
.boxed())
}
/// Accept connection.
@ -286,19 +289,21 @@ impl ClusterCore {
/// Accept connection future.
fn accept_connection_future(handle: &Handle, data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture {
Box::new(net_accept_connection(node_address, stream, handle, data.self_key_pair.clone())
net_accept_connection(node_address, stream, handle, data.self_key_pair.clone())
.then(move |result| ClusterCore::process_connection_result(data, None, result))
.then(|_| finished(())))
.then(|_| finished(()))
.boxed()
}
/// Schedule mainatain procedures.
fn schedule_maintain(handle: &Handle, data: Arc<ClusterData>) {
let d = data.clone();
let interval: BoxedEmptyFuture = Box::new(Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle)
let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle)
.expect("failed to create interval")
.and_then(move |_| Ok(ClusterCore::maintain(data.clone())))
.for_each(|_| Ok(()))
.then(|_| finished(())));
.then(|_| finished(()))
.boxed();
d.spawn(interval);
}
@ -314,7 +319,7 @@ impl ClusterCore {
/// Called for every incomming mesage.
fn process_connection_messages(data: Arc<ClusterData>, connection: Arc<Connection>) -> IoFuture<Result<(), Error>> {
Box::new(connection
connection
.read_message()
.then(move |result|
match result {
@ -322,22 +327,22 @@ impl ClusterCore {
ClusterCore::process_connection_message(data.clone(), connection.clone(), message);
// continue serving connection
data.spawn(ClusterCore::process_connection_messages(data.clone(), connection));
Box::new(finished(Ok(())))
finished(Ok(())).boxed()
},
Ok((_, Err(err))) => {
warn!(target: "secretstore_net", "{}: protocol error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id());
// continue serving connection
data.spawn(ClusterCore::process_connection_messages(data.clone(), connection));
Box::new(finished(Err(err)))
finished(Err(err)).boxed()
},
Err(err) => {
warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id());
// close connection
data.connections.remove(connection.node_id(), connection.is_inbound());
Box::new(failed(err))
failed(err).boxed()
},
}
))
).boxed()
}
/// Send keepalive messages to every othe node.
@ -372,26 +377,26 @@ impl ClusterCore {
if data.connections.insert(connection.clone()) {
ClusterCore::process_connection_messages(data.clone(), connection)
} else {
Box::new(finished(Ok(())))
finished(Ok(())).boxed()
}
},
Ok(DeadlineStatus::Meet(Err(err))) => {
warn!(target: "secretstore_net", "{}: protocol error '{}' when establishing {} connection{}",
data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
Box::new(finished(Ok(())))
finished(Ok(())).boxed()
},
Ok(DeadlineStatus::Timeout) => {
warn!(target: "secretstore_net", "{}: timeout when establishing {} connection{}",
data.self_key_pair.public(), if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
Box::new(finished(Ok(())))
finished(Ok(())).boxed()
},
Err(err) => {
warn!(target: "secretstore_net", "{}: network error '{}' when establishing {} connection{}",
data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
Box::new(finished(Ok(())))
finished(Ok(())).boxed()
},
}
}

View File

@ -16,16 +16,16 @@
use std::io;
use std::time::Duration;
use futures::{Future, Select, Poll, Async};
use futures::{Future, Select, BoxFuture, Poll, Async};
use tokio_core::reactor::{Handle, Timeout};
type DeadlineBox<F> = ::std::boxed::Box<Future<Item = DeadlineStatus<<F as Future>::Item>, Error = <F as Future>::Error> + Send>;
type DeadlineBox<F> = BoxFuture<DeadlineStatus<<F as Future>::Item>, <F as Future>::Error>;
/// Complete a passed future or fail if it is not completed within timeout.
pub fn deadline<F, T>(duration: Duration, handle: &Handle, future: F) -> Result<Deadline<F>, io::Error>
where F: Future<Item = T, Error = io::Error> + Send + 'static, T: 'static {
let timeout: DeadlineBox<F> = Box::new(Timeout::new(duration, handle)?.map(|_| DeadlineStatus::Timeout));
let future: DeadlineBox<F> = Box::new(future.map(DeadlineStatus::Meet));
let timeout = Timeout::new(duration, handle)?.map(|_| DeadlineStatus::Timeout).boxed();
let future = future.map(DeadlineStatus::Meet).boxed();
let deadline = Deadline {
future: timeout.select(future),
};