diff --git a/Cargo.lock b/Cargo.lock index a66df64da..e705e39f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -723,6 +723,7 @@ dependencies = [ "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.10.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "native-contracts 0.1.0", "parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/native_contracts/build.rs b/ethcore/native_contracts/build.rs index cec830929..bcb64067c 100644 --- a/ethcore/native_contracts/build.rs +++ b/ethcore/native_contracts/build.rs @@ -21,6 +21,7 @@ use std::fs::File; use std::io::Write; // TODO: just walk the "res" directory and generate whole crate automatically. +const KEY_SERVER_SET_ABI: &'static str = include_str!("res/key_server_set.json"); const REGISTRY_ABI: &'static str = include_str!("res/registrar.json"); const URLHINT_ABI: &'static str = include_str!("res/urlhint.json"); const SERVICE_TRANSACTION_ABI: &'static str = include_str!("res/service_transaction.json"); @@ -45,6 +46,7 @@ fn build_test_contracts() { } fn main() { + build_file("KeyServerSet", KEY_SERVER_SET_ABI, "key_server_set.rs"); build_file("Registry", REGISTRY_ABI, "registry.rs"); build_file("Urlhint", URLHINT_ABI, "urlhint.rs"); build_file("ServiceTransactionChecker", SERVICE_TRANSACTION_ABI, "service_transaction.rs"); diff --git a/ethcore/native_contracts/res/key_server_set.json b/ethcore/native_contracts/res/key_server_set.json new file mode 100644 index 000000000..93f68837a --- /dev/null +++ b/ethcore/native_contracts/res/key_server_set.json @@ -0,0 +1 @@ +[{"constant":true,"inputs":[{"name":"","type":"uint256"}],"name":"keyServersList","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"_new","type":"address"}],"name":"setOwner","outputs":[],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"getKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"owner","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServer","type":"address"}],"name":"removeKeyServer","outputs":[],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServerPublic","type":"bytes"},{"name":"keyServerIp","type":"string"}],"name":"addKeyServer","outputs":[],"payable":false,"type":"function"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerAdded","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerRemoved","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"old","type":"address"},{"indexed":true,"name":"current","type":"address"}],"name":"NewOwner","type":"event"}] \ No newline at end of file diff --git a/ethcore/native_contracts/src/key_server_set.rs b/ethcore/native_contracts/src/key_server_set.rs new file mode 100644 index 000000000..60b137aae --- /dev/null +++ b/ethcore/native_contracts/src/key_server_set.rs @@ -0,0 +1,21 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +#![allow(unused_mut, unused_variables, unused_imports)] + +//! Secret store Key Server set contract. + +include!(concat!(env!("OUT_DIR"), "/key_server_set.rs")); diff --git a/ethcore/native_contracts/src/lib.rs b/ethcore/native_contracts/src/lib.rs index e35a4ec19..33cb91563 100644 --- a/ethcore/native_contracts/src/lib.rs +++ b/ethcore/native_contracts/src/lib.rs @@ -23,6 +23,7 @@ extern crate byteorder; extern crate ethabi; extern crate ethcore_util as util; +mod key_server_set; mod registry; mod urlhint; mod service_transaction; @@ -32,6 +33,7 @@ mod validator_report; pub mod test_contracts; +pub use self::key_server_set::KeyServerSet; pub use self::registry::Registry; pub use self::urlhint::Urlhint; pub use self::service_transaction::ServiceTransactionChecker; diff --git a/secret_store/Cargo.toml b/secret_store/Cargo.toml index eea49978d..19f342aa9 100644 --- a/secret_store/Cargo.toml +++ b/secret_store/Cargo.toml @@ -35,3 +35,4 @@ ethcore-logger = { path = "../logger" } ethcrypto = { path = "../ethcrypto" } ethkey = { path = "../ethkey" } native-contracts = { path = "../ethcore/native_contracts" } +lazy_static = "0.2" diff --git a/secret_store/src/acl_storage.rs b/secret_store/src/acl_storage.rs index 816d100dc..37d5bcd25 100644 --- a/secret_store/src/acl_storage.rs +++ b/secret_store/src/acl_storage.rs @@ -14,12 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Arc; +use std::sync::{Arc, Weak}; use futures::{future, Future}; use parking_lot::Mutex; use ethkey::public_to_address; -use ethcore::client::{Client, BlockChainClient, BlockId}; +use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify}; use native_contracts::SecretStoreAclStorage; +use util::{H256, Address, Bytes}; use types::all::{Error, ServerKeyId, Public}; const ACL_CHECKER_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_acl_checker"; @@ -32,40 +33,82 @@ pub trait AclStorage: Send + Sync { /// On-chain ACL storage implementation. pub struct OnChainAclStorage { + /// Cached on-chain contract. + contract: Mutex, +} + +/// Cached on-chain ACL storage contract. +struct CachedContract { /// Blockchain client. - client: Arc, - /// On-chain contract. - contract: Mutex>, + client: Weak, + /// Contract address. + contract_addr: Option
, + /// Contract at given address. + contract: Option, } impl OnChainAclStorage { - pub fn new(client: Arc) -> Self { - OnChainAclStorage { - client: client, - contract: Mutex::new(None), - } + pub fn new(client: &Arc) -> Arc { + let acl_storage = Arc::new(OnChainAclStorage { + contract: Mutex::new(CachedContract::new(client)), + }); + client.add_notify(acl_storage.clone()); + acl_storage } } impl AclStorage for OnChainAclStorage { fn check(&self, public: &Public, document: &ServerKeyId) -> Result { - let mut contract = self.contract.lock(); - if !contract.is_some() { - *contract = self.client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned()) - .and_then(|contract_addr| { + self.contract.lock().check(public, document) + } +} + +impl ChainNotify for OnChainAclStorage { + fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { + if !enacted.is_empty() || !retracted.is_empty() { + self.contract.lock().update() + } + } +} + +impl CachedContract { + pub fn new(client: &Arc) -> Self { + CachedContract { + client: Arc::downgrade(client), + contract_addr: None, + contract: None, + } + } + + pub fn update(&mut self) { + if let Some(client) = self.client.upgrade() { + let new_contract_addr = client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned()); + if self.contract_addr.as_ref() != new_contract_addr.as_ref() { + self.contract = new_contract_addr.map(|contract_addr| { trace!(target: "secretstore", "Configuring for ACL checker contract from {}", contract_addr); - Some(SecretStoreAclStorage::new(contract_addr)) - }) + SecretStoreAclStorage::new(contract_addr) + }); + + self.contract_addr = new_contract_addr; + } } - if let Some(ref contract) = *contract { - let address = public_to_address(&public); - let do_call = |a, d| future::done(self.client.call_contract(BlockId::Latest, a, d)); - contract.check_permissions(do_call, address, document.clone()) - .map_err(|err| Error::Internal(err)) - .wait() - } else { - Err(Error::Internal("ACL checker contract is not configured".to_owned())) + } + + pub fn check(&mut self, public: &Public, document: &ServerKeyId) -> Result { + match self.contract.as_ref() { + Some(contract) => { + let address = public_to_address(&public); + let do_call = |a, d| future::done( + self.client + .upgrade() + .ok_or("Calling contract without client".into()) + .and_then(|c| c.call_contract(BlockId::Latest, a, d))); + contract.check_permissions(do_call, address, document.clone()) + .map_err(|err| Error::Internal(err)) + .wait() + }, + None => Err(Error::Internal("ACL checker contract is not configured".to_owned())), } } } diff --git a/secret_store/src/key_server.rs b/secret_store/src/key_server.rs index fd4e154fa..c83e460f3 100644 --- a/secret_store/src/key_server.rs +++ b/secret_store/src/key_server.rs @@ -24,6 +24,7 @@ use ethcrypto; use ethkey; use super::acl_storage::AclStorage; use super::key_storage::KeyStorage; +use super::key_server_set::KeyServerSet; use key_server_cluster::{math, ClusterCore}; use traits::{ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer}; use types::all::{Error, Public, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow, @@ -44,9 +45,9 @@ pub struct KeyServerCore { impl KeyServerImpl { /// Create new key server instance - pub fn new(config: &ClusterConfiguration, acl_storage: Arc, key_storage: Arc) -> Result { + pub fn new(config: &ClusterConfiguration, key_server_set: Arc, acl_storage: Arc, key_storage: Arc) -> Result { Ok(KeyServerImpl { - data: Arc::new(Mutex::new(KeyServerCore::new(config, acl_storage, key_storage)?)), + data: Arc::new(Mutex::new(KeyServerCore::new(config, key_server_set, acl_storage, key_storage)?)), }) } @@ -143,14 +144,12 @@ impl MessageSigner for KeyServerImpl { } impl KeyServerCore { - pub fn new(config: &ClusterConfiguration, acl_storage: Arc, key_storage: Arc) -> Result { + pub fn new(config: &ClusterConfiguration, key_server_set: Arc, acl_storage: Arc, key_storage: Arc) -> Result { let config = NetClusterConfiguration { threads: config.threads, self_key_pair: ethkey::KeyPair::from_secret_slice(&config.self_private)?, listen_address: (config.listener_address.address.clone(), config.listener_address.port), - nodes: config.nodes.iter() - .map(|(node_id, node_address)| (node_id.clone(), (node_address.address.clone(), node_address.port))) - .collect(), + key_server_set: key_server_set, allow_connecting_to_higher_nodes: config.allow_connecting_to_higher_nodes, acl_storage: acl_storage, key_storage: key_storage, @@ -193,10 +192,13 @@ impl Drop for KeyServerCore { pub mod tests { use std::time; use std::sync::Arc; + use std::net::SocketAddr; + use std::collections::BTreeMap; use ethcrypto; use ethkey::{self, Secret, Random, Generator}; use acl_storage::tests::DummyAclStorage; use key_storage::tests::DummyKeyStorage; + use key_server_set::tests::MapKeyServerSet; use key_server_cluster::math; use util::H256; use types::all::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId, @@ -254,8 +256,11 @@ pub mod tests { })).collect(), allow_connecting_to_higher_nodes: false, }).collect(); + let key_servers_set: BTreeMap = configs[0].nodes.iter() + .map(|(k, a)| (k.clone(), format!("{}:{}", a.address, a.port).parse().unwrap())) + .collect(); let key_servers: Vec<_> = configs.into_iter().map(|cfg| - KeyServerImpl::new(&cfg, Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap() + KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(key_servers_set.clone())), Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap() ).collect(); // wait until connections are established. It is fast => do not bother with events here diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index c86f30267..d77a82431 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -28,7 +28,7 @@ use tokio_core::reactor::{Handle, Remote, Interval}; use tokio_core::net::{TcpListener, TcpStream}; use ethkey::{Public, KeyPair, Signature, Random, Generator}; use util::H256; -use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage}; +use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, KeyServerSet}; use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, GenerationSessionWrapper, EncryptionSessionWrapper, DecryptionSessionWrapper, SigningSessionWrapper}; use key_server_cluster::message::{self, Message, ClusterMessage, GenerationMessage, EncryptionMessage, DecryptionMessage, @@ -102,8 +102,8 @@ pub struct ClusterConfiguration { pub self_key_pair: KeyPair, /// Interface to listen to. pub listen_address: (String, u16), - /// Cluster nodes. - pub nodes: BTreeMap, + /// Cluster nodes set. + pub key_server_set: Arc, /// Reference to key storage pub key_storage: Arc, /// Reference to ACL storage @@ -158,9 +158,17 @@ pub struct ClusterConnections { /// Self node id. pub self_node_id: NodeId, /// All known other key servers. - pub nodes: BTreeMap, + pub key_server_set: Arc, + /// Connections data. + pub data: RwLock, +} + +/// Cluster connections data. +pub struct ClusterConnectionsData { + /// Active key servers set. + pub nodes: BTreeMap, /// Active connections to key servers. - pub connections: RwLock>>, + pub connections: BTreeMap>, } /// Cluster view core. @@ -281,8 +289,7 @@ impl ClusterCore { /// Accept connection future. fn accept_connection_future(handle: &Handle, data: Arc, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture { - let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect(); - net_accept_connection(node_address, stream, handle, data.self_key_pair.clone(), disconnected_nodes) + net_accept_connection(node_address, stream, handle, data.self_key_pair.clone()) .then(move |result| ClusterCore::process_connection_result(data, true, result)) .then(|_| finished(())) .boxed() @@ -354,6 +361,7 @@ impl ClusterCore { /// Try to connect to every disconnected node. fn connect_disconnected_nodes(data: Arc) { + data.connections.update_nodes_set(); for (node_id, node_address) in data.connections.disconnected_nodes() { if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id { ClusterCore::connect(data.clone(), node_address); @@ -372,14 +380,16 @@ impl ClusterCore { finished(Ok(())).boxed() } }, - Ok(DeadlineStatus::Meet(Err(_))) => { + Ok(DeadlineStatus::Meet(Err(err))) => { + warn!(target: "secretstore_net", "{}: protocol error {} when establishind connection", data.self_key_pair.public(), err); finished(Ok(())).boxed() }, Ok(DeadlineStatus::Timeout) => { + warn!(target: "secretstore_net", "{}: timeout when establishind connection", data.self_key_pair.public()); finished(Ok(())).boxed() }, - Err(_) => { - // network error + Err(err) => { + warn!(target: "secretstore_net", "{}: network error {} when establishind connection", data.self_key_pair.public(), err); finished(Ok(())).boxed() }, } @@ -665,33 +675,38 @@ impl ClusterCore { impl ClusterConnections { pub fn new(config: &ClusterConfiguration) -> Result { - let mut connections = ClusterConnections { + let mut nodes = config.key_server_set.get(); + nodes.remove(config.self_key_pair.public()); + + Ok(ClusterConnections { self_node_id: config.self_key_pair.public().clone(), - nodes: BTreeMap::new(), - connections: RwLock::new(BTreeMap::new()), - }; - - for (node_id, &(ref node_addr, node_port)) in config.nodes.iter().filter(|&(node_id, _)| node_id != config.self_key_pair.public()) { - let socket_address = make_socket_address(&node_addr, node_port)?; - connections.nodes.insert(node_id.clone(), socket_address); - } - - Ok(connections) + key_server_set: config.key_server_set.clone(), + data: RwLock::new(ClusterConnectionsData { + nodes: nodes, + connections: BTreeMap::new(), + }), + }) } pub fn cluster_state(&self) -> ClusterState { ClusterState { - connected: self.connections.read().keys().cloned().collect(), + connected: self.data.read().connections.keys().cloned().collect(), } } pub fn get(&self, node: &NodeId) -> Option> { - self.connections.read().get(node).cloned() + self.data.read().connections.get(node).cloned() } pub fn insert(&self, connection: Arc) -> bool { - let mut connections = self.connections.write(); - if connections.contains_key(connection.node_id()) { + let mut data = self.data.write(); + if !data.nodes.contains_key(connection.node_id()) { + // incoming connections are checked here + trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); + debug_assert!(connection.is_inbound()); + return false; + } + if data.connections.contains_key(connection.node_id()) { // we have already connected to the same node // the agreement is that node with lower id must establish connection to node with higher id if (&self.self_node_id < connection.node_id() && connection.is_inbound()) @@ -700,14 +715,15 @@ impl ClusterConnections { } } - trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address()); - connections.insert(connection.node_id().clone(), connection); + trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes", + self.self_node_id, connection.node_id(), connection.node_address(), data.connections.len() + 1, data.nodes.len()); + data.connections.insert(connection.node_id().clone(), connection); true } pub fn remove(&self, node: &NodeId, is_inbound: bool) { - let mut connections = self.connections.write(); - if let Entry::Occupied(entry) = connections.entry(node.clone()) { + let mut data = self.data.write(); + if let Entry::Occupied(entry) = data.connections.entry(node.clone()) { if entry.get().is_inbound() != is_inbound { return; } @@ -718,20 +734,64 @@ impl ClusterConnections { } pub fn connected_nodes(&self) -> BTreeSet { - self.connections.read().keys().cloned().collect() + self.data.read().connections.keys().cloned().collect() } pub fn active_connections(&self)-> Vec> { - self.connections.read().values().cloned().collect() + self.data.read().connections.values().cloned().collect() } pub fn disconnected_nodes(&self) -> BTreeMap { - let connections = self.connections.read(); - self.nodes.iter() - .filter(|&(node_id, _)| !connections.contains_key(node_id)) + let data = self.data.read(); + data.nodes.iter() + .filter(|&(node_id, _)| !data.connections.contains_key(node_id)) .map(|(node_id, node_address)| (node_id.clone(), node_address.clone())) .collect() } + + pub fn update_nodes_set(&self) { + let mut data = self.data.write(); + let mut new_nodes = self.key_server_set.get(); + // we do not need to connect to self + // + we do not need to try to connect to any other node if we are not the part of a cluster + if new_nodes.remove(&self.self_node_id).is_none() { + new_nodes.clear(); + } + + let mut num_added_nodes = 0; + let mut num_removed_nodes = 0; + let mut num_changed_nodes = 0; + + for obsolete_node in data.nodes.keys().cloned().collect::>() { + if !new_nodes.contains_key(&obsolete_node) { + if let Entry::Occupied(entry) = data.connections.entry(obsolete_node) { + trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); + entry.remove(); + } + + data.nodes.remove(&obsolete_node); + num_removed_nodes += 1; + } + } + + for (new_node_public, new_node_addr) in new_nodes { + match data.nodes.insert(new_node_public, new_node_addr) { + None => num_added_nodes += 1, + Some(old_node_addr) => if new_node_addr != old_node_addr { + if let Entry::Occupied(entry) = data.connections.entry(new_node_public) { + trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address()); + entry.remove(); + } + num_changed_nodes += 1; + }, + } + } + + if num_added_nodes != 0 || num_removed_nodes != 0 || num_changed_nodes != 0 { + trace!(target: "secretstore_net", "{}: updated nodes set: removed {}, added {}, changed {}. Connected to {} of {} nodes", + self.self_node_id, num_removed_nodes, num_added_nodes, num_changed_nodes, data.connections.len(), data.nodes.len()); + } + } } impl ClusterData { @@ -929,7 +989,7 @@ pub mod tests { use parking_lot::Mutex; use tokio_core::reactor::Core; use ethkey::{Random, Generator, Public}; - use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage}; + use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet}; use key_server_cluster::message::Message; use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration}; use key_server_cluster::generation_session::{Session as GenerationSession, SessionState as GenerationSessionState}; @@ -999,7 +1059,7 @@ pub mod tests { } pub fn all_connections_established(cluster: &Arc) -> bool { - cluster.config().nodes.keys() + cluster.config().key_server_set.get().keys() .filter(|p| *p != cluster.config().self_key_pair.public()) .all(|p| cluster.connection(p).is_some()) } @@ -1010,9 +1070,9 @@ pub mod tests { threads: 1, self_key_pair: key_pairs[i].clone(), listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16), - nodes: key_pairs.iter().enumerate() - .map(|(j, kp)| (kp.public().clone(), ("127.0.0.1".into(), ports_begin + j as u16))) - .collect(), + key_server_set: Arc::new(MapKeyServerSet::new(key_pairs.iter().enumerate() + .map(|(j, kp)| (kp.public().clone(), format!("127.0.0.1:{}", ports_begin + j as u16).parse().unwrap())) + .collect())), allow_connecting_to_higher_nodes: false, key_storage: Arc::new(DummyKeyStorage::default()), acl_storage: Arc::new(DummyAclStorage::default()), diff --git a/secret_store/src/key_server_cluster/cluster_sessions.rs b/secret_store/src/key_server_cluster/cluster_sessions.rs index f66ad972f..f8e4974b1 100644 --- a/secret_store/src/key_server_cluster/cluster_sessions.rs +++ b/secret_store/src/key_server_cluster/cluster_sessions.rs @@ -135,7 +135,7 @@ impl ClusterSessions { pub fn new(config: &ClusterConfiguration) -> Self { ClusterSessions { self_node_id: config.self_key_pair.public().clone(), - nodes: config.nodes.keys().cloned().collect(), + nodes: config.key_server_set.get().keys().cloned().collect(), acl_storage: config.acl_storage.clone(), key_storage: config.key_storage.clone(), generation_sessions: ClusterSessionsContainer::new(), diff --git a/secret_store/src/key_server_cluster/io/handshake.rs b/secret_store/src/key_server_cluster/io/handshake.rs index 38d8a6ac1..df8f6cbf7 100644 --- a/secret_store/src/key_server_cluster/io/handshake.rs +++ b/secret_store/src/key_server_cluster/io/handshake.rs @@ -45,7 +45,7 @@ pub fn handshake_with_plain_confirmation(a: A, self_confirmation_plain: Resul state: state, self_key_pair: self_key_pair, self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()), - trusted_nodes: trusted_nodes, + trusted_nodes: Some(trusted_nodes), other_node_id: None, other_confirmation_plain: None, shared_key: None, @@ -53,7 +53,7 @@ pub fn handshake_with_plain_confirmation(a: A, self_confirmation_plain: Resul } /// Wait for handshake procedure to be started by another node from the cluster. -pub fn accept_handshake(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet) -> Handshake where A: AsyncWrite + AsyncRead { +pub fn accept_handshake(a: A, self_key_pair: KeyPair) -> Handshake where A: AsyncWrite + AsyncRead { let self_confirmation_plain = Random.generate().map(|kp| *kp.secret().clone()).map_err(Into::into); let (error, state) = match self_confirmation_plain.clone() { Ok(_) => (None, HandshakeState::ReceivePublicKey(read_message(a))), @@ -66,7 +66,7 @@ pub fn accept_handshake(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet state: state, self_key_pair: self_key_pair, self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()), - trusted_nodes: trusted_nodes, + trusted_nodes: None, other_node_id: None, other_confirmation_plain: None, shared_key: None, @@ -89,7 +89,7 @@ pub struct Handshake { state: HandshakeState, self_key_pair: KeyPair, self_confirmation_plain: H256, - trusted_nodes: BTreeSet, + trusted_nodes: Option>, other_node_id: Option, other_confirmation_plain: Option, shared_key: Option, @@ -172,7 +172,7 @@ impl Future for Handshake where A: AsyncRead + AsyncWrite { Err(err) => return Ok((stream, Err(err.into())).into()), }; - if !self.trusted_nodes.contains(&*message.node_id) { + if !self.trusted_nodes.as_ref().map(|tn| tn.contains(&*message.node_id)).unwrap_or(true) { return Ok((stream, Err(Error::InvalidNodeId)).into()); } @@ -300,7 +300,7 @@ mod tests { let trusted_nodes: BTreeSet<_> = vec![io.peer_public().clone()].into_iter().collect(); let shared_key = compute_shared_key(self_key_pair.secret(), trusted_nodes.iter().nth(0).unwrap()).unwrap(); - let mut handshake = accept_handshake(io, self_key_pair, trusted_nodes); + let mut handshake = accept_handshake(io, self_key_pair); handshake.set_self_confirmation_plain(self_confirmation_plain); let handshake_result = handshake.wait().unwrap(); diff --git a/secret_store/src/key_server_cluster/jobs/job_session.rs b/secret_store/src/key_server_cluster/jobs/job_session.rs index 7ae1da42a..6608397dd 100644 --- a/secret_store/src/key_server_cluster/jobs/job_session.rs +++ b/secret_store/src/key_server_cluster/jobs/job_session.rs @@ -299,22 +299,22 @@ impl JobSession where Executor: JobExe return Err(Error::ConsensusUnreachable); } - let active_data = self.data.active_data.as_mut() - .expect("we have checked that we are on master node; on master nodes active_data is filled during initialization; qed"); - if active_data.rejects.contains(node) { - return Ok(()); - } - if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() { - active_data.rejects.insert(node.clone()); - if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 { - self.data.state = JobSessionState::Active; - } - if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 { + if let Some(active_data) = self.data.active_data.as_mut() { + if active_data.rejects.contains(node) { return Ok(()); } + if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() { + active_data.rejects.insert(node.clone()); + if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 { + self.data.state = JobSessionState::Active; + } + if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 { + return Ok(()); + } - self.data.state = JobSessionState::Failed; - return Err(Error::ConsensusUnreachable); + self.data.state = JobSessionState::Failed; + return Err(Error::ConsensusUnreachable); + } } Ok(()) diff --git a/secret_store/src/key_server_cluster/mod.rs b/secret_store/src/key_server_cluster/mod.rs index 71c505f95..8f6ae4add 100644 --- a/secret_store/src/key_server_cluster/mod.rs +++ b/secret_store/src/key_server_cluster/mod.rs @@ -23,6 +23,7 @@ use super::types::all::ServerKeyId; pub use super::types::all::{NodeId, EncryptedDocumentKeyShadow}; pub use super::acl_storage::AclStorage; pub use super::key_storage::{KeyStorage, DocumentKeyShare}; +pub use super::key_server_set::KeyServerSet; pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash}; pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient}; pub use self::generation_session::Session as GenerationSession; @@ -33,6 +34,8 @@ pub use self::decryption_session::Session as DecryptionSession; pub use super::key_storage::tests::DummyKeyStorage; #[cfg(test)] pub use super::acl_storage::tests::DummyAclStorage; +#[cfg(test)] +pub use super::key_server_set::tests::MapKeyServerSet; pub type SessionId = ServerKeyId; diff --git a/secret_store/src/key_server_cluster/net/accept_connection.rs b/secret_store/src/key_server_cluster/net/accept_connection.rs index 0daa8b2da..339625f3f 100644 --- a/secret_store/src/key_server_cluster/net/accept_connection.rs +++ b/secret_store/src/key_server_cluster/net/accept_connection.rs @@ -17,19 +17,18 @@ use std::io; use std::net::SocketAddr; use std::time::Duration; -use std::collections::BTreeSet; use futures::{Future, Poll}; use tokio_core::reactor::Handle; use tokio_core::net::TcpStream; use ethkey::KeyPair; -use key_server_cluster::{Error, NodeId}; +use key_server_cluster::Error; use key_server_cluster::io::{accept_handshake, Handshake, Deadline, deadline}; use key_server_cluster::net::Connection; /// Create future for accepting incoming connection. -pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair, trusted_nodes: BTreeSet) -> Deadline { +pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair) -> Deadline { let accept = AcceptConnection { - handshake: accept_handshake(stream, self_key_pair, trusted_nodes), + handshake: accept_handshake(stream, self_key_pair), address: address, }; diff --git a/secret_store/src/key_server_set.rs b/secret_store/src/key_server_set.rs new file mode 100644 index 000000000..e17dceed5 --- /dev/null +++ b/secret_store/src/key_server_set.rs @@ -0,0 +1,204 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::sync::{Arc, Weak}; +use std::net::SocketAddr; +use std::collections::BTreeMap; +use futures::{future, Future}; +use parking_lot::Mutex; +use ethcore::filter::Filter; +use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify}; +use native_contracts::KeyServerSet as KeyServerSetContract; +use util::{H256, Address, Bytes, Hashable}; +use types::all::{Error, Public, NodeAddress}; + +const KEY_SERVER_SET_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_server_set"; + +/// Key server has been added to the set. +const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)"; +/// Key server has been removed from the set. +const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)"; + +lazy_static! { + static ref ADDED_EVENT_NAME_HASH: H256 = ADDED_EVENT_NAME.sha3(); + static ref REMOVED_EVENT_NAME_HASH: H256 = REMOVED_EVENT_NAME.sha3(); +} + +/// Key Server set +pub trait KeyServerSet: Send + Sync { + /// Get set of configured key servers + fn get(&self) -> BTreeMap; +} + +/// On-chain Key Server set implementation. +pub struct OnChainKeyServerSet { + /// Cached on-chain contract. + contract: Mutex, +} + +/// Cached on-chain Key Server set contract. +struct CachedContract { + /// Blockchain client. + client: Weak, + /// Contract address. + contract_addr: Option
, + /// Active set of key servers. + key_servers: BTreeMap, +} + +impl OnChainKeyServerSet { + pub fn new(client: &Arc, key_servers: BTreeMap) -> Result, Error> { + let mut cached_contract = CachedContract::new(client, key_servers)?; + let key_server_contract_address = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned()); + // only initialize from contract if it is installed. otherwise - use default nodes + // once the contract is installed, all default nodes are lost (if not in the contract' set) + if key_server_contract_address.is_some() { + cached_contract.read_from_registry(&*client, key_server_contract_address); + } + + let key_server_set = Arc::new(OnChainKeyServerSet { + contract: Mutex::new(cached_contract), + }); + client.add_notify(key_server_set.clone()); + Ok(key_server_set) + } +} + +impl KeyServerSet for OnChainKeyServerSet { + fn get(&self) -> BTreeMap { + self.contract.lock().get() + } +} + +impl ChainNotify for OnChainKeyServerSet { + fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { + if !enacted.is_empty() || !retracted.is_empty() { + self.contract.lock().update(enacted, retracted) + } + } +} + +impl CachedContract { + pub fn new(client: &Arc, key_servers: BTreeMap) -> Result { + Ok(CachedContract { + client: Arc::downgrade(client), + contract_addr: None, + key_servers: key_servers.into_iter() + .map(|(p, addr)| { + let addr = format!("{}:{}", addr.address, addr.port).parse() + .map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?; + Ok((p, addr)) + }) + .collect::, Error>>()?, + }) + } + + pub fn update(&mut self, enacted: Vec, retracted: Vec) { + if let Some(client) = self.client.upgrade() { + let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned()); + + // new contract installed => read nodes set from the contract + if self.contract_addr.as_ref() != new_contract_addr.as_ref() { + self.read_from_registry(&*client, new_contract_addr); + return; + } + + // check for contract events + let is_set_changed = self.contract_addr.is_some() && enacted.iter() + .chain(retracted.iter()) + .any(|block_hash| !client.logs(Filter { + from_block: BlockId::Hash(block_hash.clone()), + to_block: BlockId::Hash(block_hash.clone()), + address: self.contract_addr.clone().map(|a| vec![a]), + topics: vec![ + Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH]), + None, + None, + None, + ], + limit: Some(1), + }).is_empty()); + // to simplify processing - just re-read the whole nodes set from the contract + if is_set_changed { + self.read_from_registry(&*client, new_contract_addr); + } + } + } + + pub fn get(&self) -> BTreeMap { + self.key_servers.clone() + } + + fn read_from_registry(&mut self, client: &Client, new_contract_address: Option
) { + self.key_servers = new_contract_address.map(|contract_addr| { + trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr); + + KeyServerSetContract::new(contract_addr) + }) + .map(|contract| { + let mut key_servers = BTreeMap::new(); + let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d)); + let key_servers_list = contract.get_key_servers(do_call).wait() + .map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err }) + .unwrap_or_default(); + for key_server in key_servers_list { + let key_server_public = contract.get_key_server_public( + |a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait() + .and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) }); + let key_server_ip = contract.get_key_server_address( + |a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait() + .and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e))); + + // only add successfully parsed nodes + match (key_server_public, key_server_ip) { + (Ok(key_server_public), Ok(key_server_ip)) => { key_servers.insert(key_server_public, key_server_ip); }, + (Err(public_err), _) => warn!(target: "secretstore_net", "received invalid public from key server set contract: {}", public_err), + (_, Err(ip_err)) => warn!(target: "secretstore_net", "received invalid IP from key server set contract: {}", ip_err), + } + } + key_servers + }) + .unwrap_or_default(); + self.contract_addr = new_contract_address; + } +} + +#[cfg(test)] +pub mod tests { + use std::collections::BTreeMap; + use std::net::SocketAddr; + use ethkey::Public; + use super::KeyServerSet; + + #[derive(Default)] + pub struct MapKeyServerSet { + nodes: BTreeMap, + } + + impl MapKeyServerSet { + pub fn new(nodes: BTreeMap) -> Self { + MapKeyServerSet { + nodes: nodes, + } + } + } + + impl KeyServerSet for MapKeyServerSet { + fn get(&self) -> BTreeMap { + self.nodes.clone() + } + } +} diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index f8a74dd1a..9750f7223 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -21,6 +21,8 @@ extern crate log; extern crate futures; extern crate futures_cpupool; extern crate hyper; +#[macro_use] +extern crate lazy_static; extern crate parking_lot; extern crate rustc_hex; extern crate serde; @@ -56,6 +58,7 @@ mod http_listener; mod key_server; mod key_storage; mod serialization; +mod key_server_set; use std::sync::Arc; use ethcore::client::Client; @@ -68,9 +71,10 @@ pub use traits::{KeyServer}; pub fn start(client: Arc, config: ServiceConfiguration) -> Result, Error> { use std::sync::Arc; - let acl_storage = Arc::new(acl_storage::OnChainAclStorage::new(client)); + let acl_storage = acl_storage::OnChainAclStorage::new(&client); + let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, config.cluster_config.nodes.clone())?; let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?); - let key_server = key_server::KeyServerImpl::new(&config.cluster_config, acl_storage, key_storage)?; + let key_server = key_server::KeyServerImpl::new(&config.cluster_config, key_server_set, acl_storage, key_storage)?; let listener = http_listener::KeyServerHttpListener::start(&config.listener_address, key_server)?; Ok(Box::new(listener)) }