SecretStore; fixed warnings for futures 0.1.15

This commit is contained in:
Svyatoslav Nikolsky 2017-09-28 14:43:05 +03:00
parent cc759530fe
commit 61daa5f3e7
2 changed files with 23 additions and 28 deletions

View File

@ -20,7 +20,7 @@ use std::sync::Arc;
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::collections::btree_map::Entry; use std::collections::btree_map::Entry;
use std::net::{SocketAddr, IpAddr}; use std::net::{SocketAddr, IpAddr};
use futures::{finished, failed, Future, Stream, BoxFuture}; use futures::{finished, failed, Future, Stream};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use parking_lot::{RwLock, Mutex}; use parking_lot::{RwLock, Mutex};
use tokio_io::IoFuture; use tokio_io::IoFuture;
@ -55,9 +55,8 @@ const KEEP_ALIVE_SEND_INTERVAL: u64 = 30;
/// we must treat this node as non-responding && disconnect from it. /// we must treat this node as non-responding && disconnect from it.
const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60; const KEEP_ALIVE_DISCONNECT_INTERVAL: u64 = 60;
/// Encryption sesion timeout interval. It works
/// Empty future. /// Empty future.
type BoxedEmptyFuture = BoxFuture<(), ()>; type BoxedEmptyFuture = ::std::boxed::Box<Future<Item = (), Error = ()> + Send>;
/// Cluster interface for external clients. /// Cluster interface for external clients.
pub trait ClusterClient: Send + Sync { pub trait ClusterClient: Send + Sync {
@ -261,23 +260,21 @@ impl ClusterCore {
/// Connect to socket using given context and handle. /// Connect to socket using given context and handle.
fn connect_future(handle: &Handle, data: Arc<ClusterData>, node_address: SocketAddr) -> BoxedEmptyFuture { fn connect_future(handle: &Handle, data: Arc<ClusterData>, node_address: SocketAddr) -> BoxedEmptyFuture {
let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect(); let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect();
net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes) Box::new(net_connect(&node_address, handle, data.self_key_pair.clone(), disconnected_nodes)
.then(move |result| ClusterCore::process_connection_result(data, Some(node_address), result)) .then(move |result| ClusterCore::process_connection_result(data, Some(node_address), result))
.then(|_| finished(())) .then(|_| finished(())))
.boxed()
} }
/// Start listening for incoming connections. /// Start listening for incoming connections.
fn listen(handle: &Handle, data: Arc<ClusterData>, listen_address: SocketAddr) -> Result<BoxedEmptyFuture, Error> { fn listen(handle: &Handle, data: Arc<ClusterData>, listen_address: SocketAddr) -> Result<BoxedEmptyFuture, Error> {
Ok(TcpListener::bind(&listen_address, &handle)? Ok(Box::new(TcpListener::bind(&listen_address, &handle)?
.incoming() .incoming()
.and_then(move |(stream, node_address)| { .and_then(move |(stream, node_address)| {
ClusterCore::accept_connection(data.clone(), stream, node_address); ClusterCore::accept_connection(data.clone(), stream, node_address);
Ok(()) Ok(())
}) })
.for_each(|_| Ok(())) .for_each(|_| Ok(()))
.then(|_| finished(())) .then(|_| finished(()))))
.boxed())
} }
/// Accept connection. /// Accept connection.
@ -289,21 +286,19 @@ impl ClusterCore {
/// Accept connection future. /// Accept connection future.
fn accept_connection_future(handle: &Handle, data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture { fn accept_connection_future(handle: &Handle, data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture {
net_accept_connection(node_address, stream, handle, data.self_key_pair.clone()) Box::new(net_accept_connection(node_address, stream, handle, data.self_key_pair.clone())
.then(move |result| ClusterCore::process_connection_result(data, None, result)) .then(move |result| ClusterCore::process_connection_result(data, None, result))
.then(|_| finished(())) .then(|_| finished(())))
.boxed()
} }
/// Schedule mainatain procedures. /// Schedule mainatain procedures.
fn schedule_maintain(handle: &Handle, data: Arc<ClusterData>) { fn schedule_maintain(handle: &Handle, data: Arc<ClusterData>) {
let d = data.clone(); let d = data.clone();
let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle) let interval: BoxedEmptyFuture = Box::new(Interval::new(time::Duration::new(MAINTAIN_INTERVAL, 0), handle)
.expect("failed to create interval") .expect("failed to create interval")
.and_then(move |_| Ok(ClusterCore::maintain(data.clone()))) .and_then(move |_| Ok(ClusterCore::maintain(data.clone())))
.for_each(|_| Ok(())) .for_each(|_| Ok(()))
.then(|_| finished(())) .then(|_| finished(())));
.boxed();
d.spawn(interval); d.spawn(interval);
} }
@ -319,7 +314,7 @@ impl ClusterCore {
/// Called for every incomming mesage. /// Called for every incomming mesage.
fn process_connection_messages(data: Arc<ClusterData>, connection: Arc<Connection>) -> IoFuture<Result<(), Error>> { fn process_connection_messages(data: Arc<ClusterData>, connection: Arc<Connection>) -> IoFuture<Result<(), Error>> {
connection Box::new(connection
.read_message() .read_message()
.then(move |result| .then(move |result|
match result { match result {
@ -327,22 +322,22 @@ impl ClusterCore {
ClusterCore::process_connection_message(data.clone(), connection.clone(), message); ClusterCore::process_connection_message(data.clone(), connection.clone(), message);
// continue serving connection // continue serving connection
data.spawn(ClusterCore::process_connection_messages(data.clone(), connection)); data.spawn(ClusterCore::process_connection_messages(data.clone(), connection));
finished(Ok(())).boxed() Box::new(finished(Ok(())))
}, },
Ok((_, Err(err))) => { Ok((_, Err(err))) => {
warn!(target: "secretstore_net", "{}: protocol error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); warn!(target: "secretstore_net", "{}: protocol error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id());
// continue serving connection // continue serving connection
data.spawn(ClusterCore::process_connection_messages(data.clone(), connection)); data.spawn(ClusterCore::process_connection_messages(data.clone(), connection));
finished(Err(err)).boxed() Box::new(finished(Err(err)))
}, },
Err(err) => { Err(err) => {
warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id()); warn!(target: "secretstore_net", "{}: network error '{}' when reading message from node {}", data.self_key_pair.public(), err, connection.node_id());
// close connection // close connection
data.connections.remove(connection.node_id(), connection.is_inbound()); data.connections.remove(connection.node_id(), connection.is_inbound());
failed(err).boxed() Box::new(failed(err))
}, },
} }
).boxed() ))
} }
/// Send keepalive messages to every othe node. /// Send keepalive messages to every othe node.
@ -377,26 +372,26 @@ impl ClusterCore {
if data.connections.insert(connection.clone()) { if data.connections.insert(connection.clone()) {
ClusterCore::process_connection_messages(data.clone(), connection) ClusterCore::process_connection_messages(data.clone(), connection)
} else { } else {
finished(Ok(())).boxed() Box::new(finished(Ok(())))
} }
}, },
Ok(DeadlineStatus::Meet(Err(err))) => { Ok(DeadlineStatus::Meet(Err(err))) => {
warn!(target: "secretstore_net", "{}: protocol error '{}' when establishing {} connection{}", warn!(target: "secretstore_net", "{}: protocol error '{}' when establishing {} connection{}",
data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" }, data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
finished(Ok(())).boxed() Box::new(finished(Ok(())))
}, },
Ok(DeadlineStatus::Timeout) => { Ok(DeadlineStatus::Timeout) => {
warn!(target: "secretstore_net", "{}: timeout when establishing {} connection{}", warn!(target: "secretstore_net", "{}: timeout when establishing {} connection{}",
data.self_key_pair.public(), if outbound_addr.is_some() { "outbound" } else { "inbound" }, data.self_key_pair.public(), if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
finished(Ok(())).boxed() Box::new(finished(Ok(())))
}, },
Err(err) => { Err(err) => {
warn!(target: "secretstore_net", "{}: network error '{}' when establishing {} connection{}", warn!(target: "secretstore_net", "{}: network error '{}' when establishing {} connection{}",
data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" }, data.self_key_pair.public(), err, if outbound_addr.is_some() { "outbound" } else { "inbound" },
outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default()); outbound_addr.map(|a| format!(" with {}", a)).unwrap_or_default());
finished(Ok(())).boxed() Box::new(finished(Ok(())))
}, },
} }
} }

View File

@ -16,16 +16,16 @@
use std::io; use std::io;
use std::time::Duration; use std::time::Duration;
use futures::{Future, Select, BoxFuture, Poll, Async}; use futures::{Future, Select, Poll, Async};
use tokio_core::reactor::{Handle, Timeout}; use tokio_core::reactor::{Handle, Timeout};
type DeadlineBox<F> = BoxFuture<DeadlineStatus<<F as Future>::Item>, <F as Future>::Error>; type DeadlineBox<F> = ::std::boxed::Box<Future<Item = DeadlineStatus<<F as Future>::Item>, Error = <F as Future>::Error> + Send>;
/// Complete a passed future or fail if it is not completed within timeout. /// Complete a passed future or fail if it is not completed within timeout.
pub fn deadline<F, T>(duration: Duration, handle: &Handle, future: F) -> Result<Deadline<F>, io::Error> pub fn deadline<F, T>(duration: Duration, handle: &Handle, future: F) -> Result<Deadline<F>, io::Error>
where F: Future<Item = T, Error = io::Error> + Send + 'static, T: 'static { where F: Future<Item = T, Error = io::Error> + Send + 'static, T: 'static {
let timeout = Timeout::new(duration, handle)?.map(|_| DeadlineStatus::Timeout).boxed(); let timeout: DeadlineBox<F> = Box::new(Timeout::new(duration, handle)?.map(|_| DeadlineStatus::Timeout));
let future = future.map(DeadlineStatus::Meet).boxed(); let future: DeadlineBox<F> = Box::new(future.map(DeadlineStatus::Meet));
let deadline = Deadline { let deadline = Deadline {
future: timeout.select(future), future: timeout.select(future),
}; };