SecretStore: do not cache ACL contract + on-chain key servers configuration (#6107)
* do not cache ACL storage contract * when error comes before initialization * initial KeyServerSet commit * update_nodes_set in maintain * do not connect to self * fixed connection establishing * removed println * improved KeyServerSet tracing * moved parsing to KeyServerSet * re-read only when blockchain is changed * do not try to connect if not a part of cluster * improved logging * fixed tests
This commit is contained in:
parent
a20892e5e6
commit
872e5537bb
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -723,6 +723,7 @@ dependencies = [
|
|||||||
"futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
"futures-cpupool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"hyper 0.10.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
"hyper 0.10.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"native-contracts 0.1.0",
|
"native-contracts 0.1.0",
|
||||||
"parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -21,6 +21,7 @@ use std::fs::File;
|
|||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
|
||||||
// TODO: just walk the "res" directory and generate whole crate automatically.
|
// TODO: just walk the "res" directory and generate whole crate automatically.
|
||||||
|
const KEY_SERVER_SET_ABI: &'static str = include_str!("res/key_server_set.json");
|
||||||
const REGISTRY_ABI: &'static str = include_str!("res/registrar.json");
|
const REGISTRY_ABI: &'static str = include_str!("res/registrar.json");
|
||||||
const URLHINT_ABI: &'static str = include_str!("res/urlhint.json");
|
const URLHINT_ABI: &'static str = include_str!("res/urlhint.json");
|
||||||
const SERVICE_TRANSACTION_ABI: &'static str = include_str!("res/service_transaction.json");
|
const SERVICE_TRANSACTION_ABI: &'static str = include_str!("res/service_transaction.json");
|
||||||
@ -45,6 +46,7 @@ fn build_test_contracts() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
build_file("KeyServerSet", KEY_SERVER_SET_ABI, "key_server_set.rs");
|
||||||
build_file("Registry", REGISTRY_ABI, "registry.rs");
|
build_file("Registry", REGISTRY_ABI, "registry.rs");
|
||||||
build_file("Urlhint", URLHINT_ABI, "urlhint.rs");
|
build_file("Urlhint", URLHINT_ABI, "urlhint.rs");
|
||||||
build_file("ServiceTransactionChecker", SERVICE_TRANSACTION_ABI, "service_transaction.rs");
|
build_file("ServiceTransactionChecker", SERVICE_TRANSACTION_ABI, "service_transaction.rs");
|
||||||
|
1
ethcore/native_contracts/res/key_server_set.json
Normal file
1
ethcore/native_contracts/res/key_server_set.json
Normal file
@ -0,0 +1 @@
|
|||||||
|
[{"constant":true,"inputs":[{"name":"","type":"uint256"}],"name":"keyServersList","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"_new","type":"address"}],"name":"setOwner","outputs":[],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerPublic","outputs":[{"name":"","type":"bytes"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"getKeyServers","outputs":[{"name":"","type":"address[]"}],"payable":false,"type":"function"},{"constant":true,"inputs":[],"name":"owner","outputs":[{"name":"","type":"address"}],"payable":false,"type":"function"},{"constant":true,"inputs":[{"name":"keyServer","type":"address"}],"name":"getKeyServerAddress","outputs":[{"name":"","type":"string"}],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServer","type":"address"}],"name":"removeKeyServer","outputs":[],"payable":false,"type":"function"},{"constant":false,"inputs":[{"name":"keyServerPublic","type":"bytes"},{"name":"keyServerIp","type":"string"}],"name":"addKeyServer","outputs":[],"payable":false,"type":"function"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerAdded","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"name":"keyServer","type":"address"}],"name":"KeyServerRemoved","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"old","type":"address"},{"indexed":true,"name":"current","type":"address"}],"name":"NewOwner","type":"event"}]
|
21
ethcore/native_contracts/src/key_server_set.rs
Normal file
21
ethcore/native_contracts/src/key_server_set.rs
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity.
|
||||||
|
|
||||||
|
// Parity is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
#![allow(unused_mut, unused_variables, unused_imports)]
|
||||||
|
|
||||||
|
//! Secret store Key Server set contract.
|
||||||
|
|
||||||
|
include!(concat!(env!("OUT_DIR"), "/key_server_set.rs"));
|
@ -23,6 +23,7 @@ extern crate byteorder;
|
|||||||
extern crate ethabi;
|
extern crate ethabi;
|
||||||
extern crate ethcore_util as util;
|
extern crate ethcore_util as util;
|
||||||
|
|
||||||
|
mod key_server_set;
|
||||||
mod registry;
|
mod registry;
|
||||||
mod urlhint;
|
mod urlhint;
|
||||||
mod service_transaction;
|
mod service_transaction;
|
||||||
@ -32,6 +33,7 @@ mod validator_report;
|
|||||||
|
|
||||||
pub mod test_contracts;
|
pub mod test_contracts;
|
||||||
|
|
||||||
|
pub use self::key_server_set::KeyServerSet;
|
||||||
pub use self::registry::Registry;
|
pub use self::registry::Registry;
|
||||||
pub use self::urlhint::Urlhint;
|
pub use self::urlhint::Urlhint;
|
||||||
pub use self::service_transaction::ServiceTransactionChecker;
|
pub use self::service_transaction::ServiceTransactionChecker;
|
||||||
|
@ -35,3 +35,4 @@ ethcore-logger = { path = "../logger" }
|
|||||||
ethcrypto = { path = "../ethcrypto" }
|
ethcrypto = { path = "../ethcrypto" }
|
||||||
ethkey = { path = "../ethkey" }
|
ethkey = { path = "../ethkey" }
|
||||||
native-contracts = { path = "../ethcore/native_contracts" }
|
native-contracts = { path = "../ethcore/native_contracts" }
|
||||||
|
lazy_static = "0.2"
|
||||||
|
@ -14,12 +14,13 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Weak};
|
||||||
use futures::{future, Future};
|
use futures::{future, Future};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use ethkey::public_to_address;
|
use ethkey::public_to_address;
|
||||||
use ethcore::client::{Client, BlockChainClient, BlockId};
|
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
|
||||||
use native_contracts::SecretStoreAclStorage;
|
use native_contracts::SecretStoreAclStorage;
|
||||||
|
use util::{H256, Address, Bytes};
|
||||||
use types::all::{Error, ServerKeyId, Public};
|
use types::all::{Error, ServerKeyId, Public};
|
||||||
|
|
||||||
const ACL_CHECKER_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_acl_checker";
|
const ACL_CHECKER_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_acl_checker";
|
||||||
@ -32,40 +33,82 @@ pub trait AclStorage: Send + Sync {
|
|||||||
|
|
||||||
/// On-chain ACL storage implementation.
|
/// On-chain ACL storage implementation.
|
||||||
pub struct OnChainAclStorage {
|
pub struct OnChainAclStorage {
|
||||||
|
/// Cached on-chain contract.
|
||||||
|
contract: Mutex<CachedContract>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cached on-chain ACL storage contract.
|
||||||
|
struct CachedContract {
|
||||||
/// Blockchain client.
|
/// Blockchain client.
|
||||||
client: Arc<Client>,
|
client: Weak<Client>,
|
||||||
/// On-chain contract.
|
/// Contract address.
|
||||||
contract: Mutex<Option<SecretStoreAclStorage>>,
|
contract_addr: Option<Address>,
|
||||||
|
/// Contract at given address.
|
||||||
|
contract: Option<SecretStoreAclStorage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OnChainAclStorage {
|
impl OnChainAclStorage {
|
||||||
pub fn new(client: Arc<Client>) -> Self {
|
pub fn new(client: &Arc<Client>) -> Arc<Self> {
|
||||||
OnChainAclStorage {
|
let acl_storage = Arc::new(OnChainAclStorage {
|
||||||
client: client,
|
contract: Mutex::new(CachedContract::new(client)),
|
||||||
contract: Mutex::new(None),
|
});
|
||||||
}
|
client.add_notify(acl_storage.clone());
|
||||||
|
acl_storage
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AclStorage for OnChainAclStorage {
|
impl AclStorage for OnChainAclStorage {
|
||||||
fn check(&self, public: &Public, document: &ServerKeyId) -> Result<bool, Error> {
|
fn check(&self, public: &Public, document: &ServerKeyId) -> Result<bool, Error> {
|
||||||
let mut contract = self.contract.lock();
|
self.contract.lock().check(public, document)
|
||||||
if !contract.is_some() {
|
}
|
||||||
*contract = self.client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned())
|
}
|
||||||
.and_then(|contract_addr| {
|
|
||||||
|
impl ChainNotify for OnChainAclStorage {
|
||||||
|
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
|
||||||
|
if !enacted.is_empty() || !retracted.is_empty() {
|
||||||
|
self.contract.lock().update()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CachedContract {
|
||||||
|
pub fn new(client: &Arc<Client>) -> Self {
|
||||||
|
CachedContract {
|
||||||
|
client: Arc::downgrade(client),
|
||||||
|
contract_addr: None,
|
||||||
|
contract: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update(&mut self) {
|
||||||
|
if let Some(client) = self.client.upgrade() {
|
||||||
|
let new_contract_addr = client.registry_address(ACL_CHECKER_CONTRACT_REGISTRY_NAME.to_owned());
|
||||||
|
if self.contract_addr.as_ref() != new_contract_addr.as_ref() {
|
||||||
|
self.contract = new_contract_addr.map(|contract_addr| {
|
||||||
trace!(target: "secretstore", "Configuring for ACL checker contract from {}", contract_addr);
|
trace!(target: "secretstore", "Configuring for ACL checker contract from {}", contract_addr);
|
||||||
|
|
||||||
Some(SecretStoreAclStorage::new(contract_addr))
|
SecretStoreAclStorage::new(contract_addr)
|
||||||
})
|
});
|
||||||
|
|
||||||
|
self.contract_addr = new_contract_addr;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Some(ref contract) = *contract {
|
}
|
||||||
let address = public_to_address(&public);
|
|
||||||
let do_call = |a, d| future::done(self.client.call_contract(BlockId::Latest, a, d));
|
pub fn check(&mut self, public: &Public, document: &ServerKeyId) -> Result<bool, Error> {
|
||||||
contract.check_permissions(do_call, address, document.clone())
|
match self.contract.as_ref() {
|
||||||
.map_err(|err| Error::Internal(err))
|
Some(contract) => {
|
||||||
.wait()
|
let address = public_to_address(&public);
|
||||||
} else {
|
let do_call = |a, d| future::done(
|
||||||
Err(Error::Internal("ACL checker contract is not configured".to_owned()))
|
self.client
|
||||||
|
.upgrade()
|
||||||
|
.ok_or("Calling contract without client".into())
|
||||||
|
.and_then(|c| c.call_contract(BlockId::Latest, a, d)));
|
||||||
|
contract.check_permissions(do_call, address, document.clone())
|
||||||
|
.map_err(|err| Error::Internal(err))
|
||||||
|
.wait()
|
||||||
|
},
|
||||||
|
None => Err(Error::Internal("ACL checker contract is not configured".to_owned())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ use ethcrypto;
|
|||||||
use ethkey;
|
use ethkey;
|
||||||
use super::acl_storage::AclStorage;
|
use super::acl_storage::AclStorage;
|
||||||
use super::key_storage::KeyStorage;
|
use super::key_storage::KeyStorage;
|
||||||
|
use super::key_server_set::KeyServerSet;
|
||||||
use key_server_cluster::{math, ClusterCore};
|
use key_server_cluster::{math, ClusterCore};
|
||||||
use traits::{ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer};
|
use traits::{ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer};
|
||||||
use types::all::{Error, Public, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow,
|
use types::all::{Error, Public, RequestSignature, ServerKeyId, EncryptedDocumentKey, EncryptedDocumentKeyShadow,
|
||||||
@ -44,9 +45,9 @@ pub struct KeyServerCore {
|
|||||||
|
|
||||||
impl KeyServerImpl {
|
impl KeyServerImpl {
|
||||||
/// Create new key server instance
|
/// Create new key server instance
|
||||||
pub fn new(config: &ClusterConfiguration, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
|
pub fn new(config: &ClusterConfiguration, key_server_set: Arc<KeyServerSet>, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
|
||||||
Ok(KeyServerImpl {
|
Ok(KeyServerImpl {
|
||||||
data: Arc::new(Mutex::new(KeyServerCore::new(config, acl_storage, key_storage)?)),
|
data: Arc::new(Mutex::new(KeyServerCore::new(config, key_server_set, acl_storage, key_storage)?)),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,14 +144,12 @@ impl MessageSigner for KeyServerImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl KeyServerCore {
|
impl KeyServerCore {
|
||||||
pub fn new(config: &ClusterConfiguration, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
|
pub fn new(config: &ClusterConfiguration, key_server_set: Arc<KeyServerSet>, acl_storage: Arc<AclStorage>, key_storage: Arc<KeyStorage>) -> Result<Self, Error> {
|
||||||
let config = NetClusterConfiguration {
|
let config = NetClusterConfiguration {
|
||||||
threads: config.threads,
|
threads: config.threads,
|
||||||
self_key_pair: ethkey::KeyPair::from_secret_slice(&config.self_private)?,
|
self_key_pair: ethkey::KeyPair::from_secret_slice(&config.self_private)?,
|
||||||
listen_address: (config.listener_address.address.clone(), config.listener_address.port),
|
listen_address: (config.listener_address.address.clone(), config.listener_address.port),
|
||||||
nodes: config.nodes.iter()
|
key_server_set: key_server_set,
|
||||||
.map(|(node_id, node_address)| (node_id.clone(), (node_address.address.clone(), node_address.port)))
|
|
||||||
.collect(),
|
|
||||||
allow_connecting_to_higher_nodes: config.allow_connecting_to_higher_nodes,
|
allow_connecting_to_higher_nodes: config.allow_connecting_to_higher_nodes,
|
||||||
acl_storage: acl_storage,
|
acl_storage: acl_storage,
|
||||||
key_storage: key_storage,
|
key_storage: key_storage,
|
||||||
@ -193,10 +192,13 @@ impl Drop for KeyServerCore {
|
|||||||
pub mod tests {
|
pub mod tests {
|
||||||
use std::time;
|
use std::time;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
use ethcrypto;
|
use ethcrypto;
|
||||||
use ethkey::{self, Secret, Random, Generator};
|
use ethkey::{self, Secret, Random, Generator};
|
||||||
use acl_storage::tests::DummyAclStorage;
|
use acl_storage::tests::DummyAclStorage;
|
||||||
use key_storage::tests::DummyKeyStorage;
|
use key_storage::tests::DummyKeyStorage;
|
||||||
|
use key_server_set::tests::MapKeyServerSet;
|
||||||
use key_server_cluster::math;
|
use key_server_cluster::math;
|
||||||
use util::H256;
|
use util::H256;
|
||||||
use types::all::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId,
|
use types::all::{Error, Public, ClusterConfiguration, NodeAddress, RequestSignature, ServerKeyId,
|
||||||
@ -254,8 +256,11 @@ pub mod tests {
|
|||||||
})).collect(),
|
})).collect(),
|
||||||
allow_connecting_to_higher_nodes: false,
|
allow_connecting_to_higher_nodes: false,
|
||||||
}).collect();
|
}).collect();
|
||||||
|
let key_servers_set: BTreeMap<Public, SocketAddr> = configs[0].nodes.iter()
|
||||||
|
.map(|(k, a)| (k.clone(), format!("{}:{}", a.address, a.port).parse().unwrap()))
|
||||||
|
.collect();
|
||||||
let key_servers: Vec<_> = configs.into_iter().map(|cfg|
|
let key_servers: Vec<_> = configs.into_iter().map(|cfg|
|
||||||
KeyServerImpl::new(&cfg, Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap()
|
KeyServerImpl::new(&cfg, Arc::new(MapKeyServerSet::new(key_servers_set.clone())), Arc::new(DummyAclStorage::default()), Arc::new(DummyKeyStorage::default())).unwrap()
|
||||||
).collect();
|
).collect();
|
||||||
|
|
||||||
// wait until connections are established. It is fast => do not bother with events here
|
// wait until connections are established. It is fast => do not bother with events here
|
||||||
|
@ -28,7 +28,7 @@ use tokio_core::reactor::{Handle, Remote, Interval};
|
|||||||
use tokio_core::net::{TcpListener, TcpStream};
|
use tokio_core::net::{TcpListener, TcpStream};
|
||||||
use ethkey::{Public, KeyPair, Signature, Random, Generator};
|
use ethkey::{Public, KeyPair, Signature, Random, Generator};
|
||||||
use util::H256;
|
use util::H256;
|
||||||
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage};
|
use key_server_cluster::{Error, NodeId, SessionId, AclStorage, KeyStorage, KeyServerSet};
|
||||||
use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, GenerationSessionWrapper, EncryptionSessionWrapper,
|
use key_server_cluster::cluster_sessions::{ClusterSession, ClusterSessions, GenerationSessionWrapper, EncryptionSessionWrapper,
|
||||||
DecryptionSessionWrapper, SigningSessionWrapper};
|
DecryptionSessionWrapper, SigningSessionWrapper};
|
||||||
use key_server_cluster::message::{self, Message, ClusterMessage, GenerationMessage, EncryptionMessage, DecryptionMessage,
|
use key_server_cluster::message::{self, Message, ClusterMessage, GenerationMessage, EncryptionMessage, DecryptionMessage,
|
||||||
@ -102,8 +102,8 @@ pub struct ClusterConfiguration {
|
|||||||
pub self_key_pair: KeyPair,
|
pub self_key_pair: KeyPair,
|
||||||
/// Interface to listen to.
|
/// Interface to listen to.
|
||||||
pub listen_address: (String, u16),
|
pub listen_address: (String, u16),
|
||||||
/// Cluster nodes.
|
/// Cluster nodes set.
|
||||||
pub nodes: BTreeMap<NodeId, (String, u16)>,
|
pub key_server_set: Arc<KeyServerSet>,
|
||||||
/// Reference to key storage
|
/// Reference to key storage
|
||||||
pub key_storage: Arc<KeyStorage>,
|
pub key_storage: Arc<KeyStorage>,
|
||||||
/// Reference to ACL storage
|
/// Reference to ACL storage
|
||||||
@ -158,9 +158,17 @@ pub struct ClusterConnections {
|
|||||||
/// Self node id.
|
/// Self node id.
|
||||||
pub self_node_id: NodeId,
|
pub self_node_id: NodeId,
|
||||||
/// All known other key servers.
|
/// All known other key servers.
|
||||||
pub nodes: BTreeMap<NodeId, SocketAddr>,
|
pub key_server_set: Arc<KeyServerSet>,
|
||||||
|
/// Connections data.
|
||||||
|
pub data: RwLock<ClusterConnectionsData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cluster connections data.
|
||||||
|
pub struct ClusterConnectionsData {
|
||||||
|
/// Active key servers set.
|
||||||
|
pub nodes: BTreeMap<Public, SocketAddr>,
|
||||||
/// Active connections to key servers.
|
/// Active connections to key servers.
|
||||||
pub connections: RwLock<BTreeMap<NodeId, Arc<Connection>>>,
|
pub connections: BTreeMap<NodeId, Arc<Connection>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cluster view core.
|
/// Cluster view core.
|
||||||
@ -281,8 +289,7 @@ impl ClusterCore {
|
|||||||
|
|
||||||
/// Accept connection future.
|
/// Accept connection future.
|
||||||
fn accept_connection_future(handle: &Handle, data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture {
|
fn accept_connection_future(handle: &Handle, data: Arc<ClusterData>, stream: TcpStream, node_address: SocketAddr) -> BoxedEmptyFuture {
|
||||||
let disconnected_nodes = data.connections.disconnected_nodes().keys().cloned().collect();
|
net_accept_connection(node_address, stream, handle, data.self_key_pair.clone())
|
||||||
net_accept_connection(node_address, stream, handle, data.self_key_pair.clone(), disconnected_nodes)
|
|
||||||
.then(move |result| ClusterCore::process_connection_result(data, true, result))
|
.then(move |result| ClusterCore::process_connection_result(data, true, result))
|
||||||
.then(|_| finished(()))
|
.then(|_| finished(()))
|
||||||
.boxed()
|
.boxed()
|
||||||
@ -354,6 +361,7 @@ impl ClusterCore {
|
|||||||
|
|
||||||
/// Try to connect to every disconnected node.
|
/// Try to connect to every disconnected node.
|
||||||
fn connect_disconnected_nodes(data: Arc<ClusterData>) {
|
fn connect_disconnected_nodes(data: Arc<ClusterData>) {
|
||||||
|
data.connections.update_nodes_set();
|
||||||
for (node_id, node_address) in data.connections.disconnected_nodes() {
|
for (node_id, node_address) in data.connections.disconnected_nodes() {
|
||||||
if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id {
|
if data.config.allow_connecting_to_higher_nodes || data.self_key_pair.public() < &node_id {
|
||||||
ClusterCore::connect(data.clone(), node_address);
|
ClusterCore::connect(data.clone(), node_address);
|
||||||
@ -372,14 +380,16 @@ impl ClusterCore {
|
|||||||
finished(Ok(())).boxed()
|
finished(Ok(())).boxed()
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok(DeadlineStatus::Meet(Err(_))) => {
|
Ok(DeadlineStatus::Meet(Err(err))) => {
|
||||||
|
warn!(target: "secretstore_net", "{}: protocol error {} when establishind connection", data.self_key_pair.public(), err);
|
||||||
finished(Ok(())).boxed()
|
finished(Ok(())).boxed()
|
||||||
},
|
},
|
||||||
Ok(DeadlineStatus::Timeout) => {
|
Ok(DeadlineStatus::Timeout) => {
|
||||||
|
warn!(target: "secretstore_net", "{}: timeout when establishind connection", data.self_key_pair.public());
|
||||||
finished(Ok(())).boxed()
|
finished(Ok(())).boxed()
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(err) => {
|
||||||
// network error
|
warn!(target: "secretstore_net", "{}: network error {} when establishind connection", data.self_key_pair.public(), err);
|
||||||
finished(Ok(())).boxed()
|
finished(Ok(())).boxed()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -665,33 +675,38 @@ impl ClusterCore {
|
|||||||
|
|
||||||
impl ClusterConnections {
|
impl ClusterConnections {
|
||||||
pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
|
pub fn new(config: &ClusterConfiguration) -> Result<Self, Error> {
|
||||||
let mut connections = ClusterConnections {
|
let mut nodes = config.key_server_set.get();
|
||||||
|
nodes.remove(config.self_key_pair.public());
|
||||||
|
|
||||||
|
Ok(ClusterConnections {
|
||||||
self_node_id: config.self_key_pair.public().clone(),
|
self_node_id: config.self_key_pair.public().clone(),
|
||||||
nodes: BTreeMap::new(),
|
key_server_set: config.key_server_set.clone(),
|
||||||
connections: RwLock::new(BTreeMap::new()),
|
data: RwLock::new(ClusterConnectionsData {
|
||||||
};
|
nodes: nodes,
|
||||||
|
connections: BTreeMap::new(),
|
||||||
for (node_id, &(ref node_addr, node_port)) in config.nodes.iter().filter(|&(node_id, _)| node_id != config.self_key_pair.public()) {
|
}),
|
||||||
let socket_address = make_socket_address(&node_addr, node_port)?;
|
})
|
||||||
connections.nodes.insert(node_id.clone(), socket_address);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(connections)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cluster_state(&self) -> ClusterState {
|
pub fn cluster_state(&self) -> ClusterState {
|
||||||
ClusterState {
|
ClusterState {
|
||||||
connected: self.connections.read().keys().cloned().collect(),
|
connected: self.data.read().connections.keys().cloned().collect(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get(&self, node: &NodeId) -> Option<Arc<Connection>> {
|
pub fn get(&self, node: &NodeId) -> Option<Arc<Connection>> {
|
||||||
self.connections.read().get(node).cloned()
|
self.data.read().connections.get(node).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(&self, connection: Arc<Connection>) -> bool {
|
pub fn insert(&self, connection: Arc<Connection>) -> bool {
|
||||||
let mut connections = self.connections.write();
|
let mut data = self.data.write();
|
||||||
if connections.contains_key(connection.node_id()) {
|
if !data.nodes.contains_key(connection.node_id()) {
|
||||||
|
// incoming connections are checked here
|
||||||
|
trace!(target: "secretstore_net", "{}: ignoring unknown connection from {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
|
||||||
|
debug_assert!(connection.is_inbound());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if data.connections.contains_key(connection.node_id()) {
|
||||||
// we have already connected to the same node
|
// we have already connected to the same node
|
||||||
// the agreement is that node with lower id must establish connection to node with higher id
|
// the agreement is that node with lower id must establish connection to node with higher id
|
||||||
if (&self.self_node_id < connection.node_id() && connection.is_inbound())
|
if (&self.self_node_id < connection.node_id() && connection.is_inbound())
|
||||||
@ -700,14 +715,15 @@ impl ClusterConnections {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!(target: "secretstore_net", "{}: inserting connection to {} at {}", self.self_node_id, connection.node_id(), connection.node_address());
|
trace!(target: "secretstore_net", "{}: inserting connection to {} at {}. Connected to {} of {} nodes",
|
||||||
connections.insert(connection.node_id().clone(), connection);
|
self.self_node_id, connection.node_id(), connection.node_address(), data.connections.len() + 1, data.nodes.len());
|
||||||
|
data.connections.insert(connection.node_id().clone(), connection);
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove(&self, node: &NodeId, is_inbound: bool) {
|
pub fn remove(&self, node: &NodeId, is_inbound: bool) {
|
||||||
let mut connections = self.connections.write();
|
let mut data = self.data.write();
|
||||||
if let Entry::Occupied(entry) = connections.entry(node.clone()) {
|
if let Entry::Occupied(entry) = data.connections.entry(node.clone()) {
|
||||||
if entry.get().is_inbound() != is_inbound {
|
if entry.get().is_inbound() != is_inbound {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -718,20 +734,64 @@ impl ClusterConnections {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn connected_nodes(&self) -> BTreeSet<NodeId> {
|
pub fn connected_nodes(&self) -> BTreeSet<NodeId> {
|
||||||
self.connections.read().keys().cloned().collect()
|
self.data.read().connections.keys().cloned().collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn active_connections(&self)-> Vec<Arc<Connection>> {
|
pub fn active_connections(&self)-> Vec<Arc<Connection>> {
|
||||||
self.connections.read().values().cloned().collect()
|
self.data.read().connections.values().cloned().collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disconnected_nodes(&self) -> BTreeMap<NodeId, SocketAddr> {
|
pub fn disconnected_nodes(&self) -> BTreeMap<NodeId, SocketAddr> {
|
||||||
let connections = self.connections.read();
|
let data = self.data.read();
|
||||||
self.nodes.iter()
|
data.nodes.iter()
|
||||||
.filter(|&(node_id, _)| !connections.contains_key(node_id))
|
.filter(|&(node_id, _)| !data.connections.contains_key(node_id))
|
||||||
.map(|(node_id, node_address)| (node_id.clone(), node_address.clone()))
|
.map(|(node_id, node_address)| (node_id.clone(), node_address.clone()))
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn update_nodes_set(&self) {
|
||||||
|
let mut data = self.data.write();
|
||||||
|
let mut new_nodes = self.key_server_set.get();
|
||||||
|
// we do not need to connect to self
|
||||||
|
// + we do not need to try to connect to any other node if we are not the part of a cluster
|
||||||
|
if new_nodes.remove(&self.self_node_id).is_none() {
|
||||||
|
new_nodes.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut num_added_nodes = 0;
|
||||||
|
let mut num_removed_nodes = 0;
|
||||||
|
let mut num_changed_nodes = 0;
|
||||||
|
|
||||||
|
for obsolete_node in data.nodes.keys().cloned().collect::<Vec<_>>() {
|
||||||
|
if !new_nodes.contains_key(&obsolete_node) {
|
||||||
|
if let Entry::Occupied(entry) = data.connections.entry(obsolete_node) {
|
||||||
|
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
|
||||||
|
entry.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
data.nodes.remove(&obsolete_node);
|
||||||
|
num_removed_nodes += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (new_node_public, new_node_addr) in new_nodes {
|
||||||
|
match data.nodes.insert(new_node_public, new_node_addr) {
|
||||||
|
None => num_added_nodes += 1,
|
||||||
|
Some(old_node_addr) => if new_node_addr != old_node_addr {
|
||||||
|
if let Entry::Occupied(entry) = data.connections.entry(new_node_public) {
|
||||||
|
trace!(target: "secretstore_net", "{}: removing connection to {} at {}", self.self_node_id, entry.get().node_id(), entry.get().node_address());
|
||||||
|
entry.remove();
|
||||||
|
}
|
||||||
|
num_changed_nodes += 1;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if num_added_nodes != 0 || num_removed_nodes != 0 || num_changed_nodes != 0 {
|
||||||
|
trace!(target: "secretstore_net", "{}: updated nodes set: removed {}, added {}, changed {}. Connected to {} of {} nodes",
|
||||||
|
self.self_node_id, num_removed_nodes, num_added_nodes, num_changed_nodes, data.connections.len(), data.nodes.len());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClusterData {
|
impl ClusterData {
|
||||||
@ -929,7 +989,7 @@ pub mod tests {
|
|||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use tokio_core::reactor::Core;
|
use tokio_core::reactor::Core;
|
||||||
use ethkey::{Random, Generator, Public};
|
use ethkey::{Random, Generator, Public};
|
||||||
use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage};
|
use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet};
|
||||||
use key_server_cluster::message::Message;
|
use key_server_cluster::message::Message;
|
||||||
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration};
|
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration};
|
||||||
use key_server_cluster::generation_session::{Session as GenerationSession, SessionState as GenerationSessionState};
|
use key_server_cluster::generation_session::{Session as GenerationSession, SessionState as GenerationSessionState};
|
||||||
@ -999,7 +1059,7 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn all_connections_established(cluster: &Arc<ClusterCore>) -> bool {
|
pub fn all_connections_established(cluster: &Arc<ClusterCore>) -> bool {
|
||||||
cluster.config().nodes.keys()
|
cluster.config().key_server_set.get().keys()
|
||||||
.filter(|p| *p != cluster.config().self_key_pair.public())
|
.filter(|p| *p != cluster.config().self_key_pair.public())
|
||||||
.all(|p| cluster.connection(p).is_some())
|
.all(|p| cluster.connection(p).is_some())
|
||||||
}
|
}
|
||||||
@ -1010,9 +1070,9 @@ pub mod tests {
|
|||||||
threads: 1,
|
threads: 1,
|
||||||
self_key_pair: key_pairs[i].clone(),
|
self_key_pair: key_pairs[i].clone(),
|
||||||
listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16),
|
listen_address: ("127.0.0.1".to_owned(), ports_begin + i as u16),
|
||||||
nodes: key_pairs.iter().enumerate()
|
key_server_set: Arc::new(MapKeyServerSet::new(key_pairs.iter().enumerate()
|
||||||
.map(|(j, kp)| (kp.public().clone(), ("127.0.0.1".into(), ports_begin + j as u16)))
|
.map(|(j, kp)| (kp.public().clone(), format!("127.0.0.1:{}", ports_begin + j as u16).parse().unwrap()))
|
||||||
.collect(),
|
.collect())),
|
||||||
allow_connecting_to_higher_nodes: false,
|
allow_connecting_to_higher_nodes: false,
|
||||||
key_storage: Arc::new(DummyKeyStorage::default()),
|
key_storage: Arc::new(DummyKeyStorage::default()),
|
||||||
acl_storage: Arc::new(DummyAclStorage::default()),
|
acl_storage: Arc::new(DummyAclStorage::default()),
|
||||||
|
@ -135,7 +135,7 @@ impl ClusterSessions {
|
|||||||
pub fn new(config: &ClusterConfiguration) -> Self {
|
pub fn new(config: &ClusterConfiguration) -> Self {
|
||||||
ClusterSessions {
|
ClusterSessions {
|
||||||
self_node_id: config.self_key_pair.public().clone(),
|
self_node_id: config.self_key_pair.public().clone(),
|
||||||
nodes: config.nodes.keys().cloned().collect(),
|
nodes: config.key_server_set.get().keys().cloned().collect(),
|
||||||
acl_storage: config.acl_storage.clone(),
|
acl_storage: config.acl_storage.clone(),
|
||||||
key_storage: config.key_storage.clone(),
|
key_storage: config.key_storage.clone(),
|
||||||
generation_sessions: ClusterSessionsContainer::new(),
|
generation_sessions: ClusterSessionsContainer::new(),
|
||||||
|
@ -45,7 +45,7 @@ pub fn handshake_with_plain_confirmation<A>(a: A, self_confirmation_plain: Resul
|
|||||||
state: state,
|
state: state,
|
||||||
self_key_pair: self_key_pair,
|
self_key_pair: self_key_pair,
|
||||||
self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()),
|
self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()),
|
||||||
trusted_nodes: trusted_nodes,
|
trusted_nodes: Some(trusted_nodes),
|
||||||
other_node_id: None,
|
other_node_id: None,
|
||||||
other_confirmation_plain: None,
|
other_confirmation_plain: None,
|
||||||
shared_key: None,
|
shared_key: None,
|
||||||
@ -53,7 +53,7 @@ pub fn handshake_with_plain_confirmation<A>(a: A, self_confirmation_plain: Resul
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Wait for handshake procedure to be started by another node from the cluster.
|
/// Wait for handshake procedure to be started by another node from the cluster.
|
||||||
pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet<NodeId>) -> Handshake<A> where A: AsyncWrite + AsyncRead {
|
pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair) -> Handshake<A> where A: AsyncWrite + AsyncRead {
|
||||||
let self_confirmation_plain = Random.generate().map(|kp| *kp.secret().clone()).map_err(Into::into);
|
let self_confirmation_plain = Random.generate().map(|kp| *kp.secret().clone()).map_err(Into::into);
|
||||||
let (error, state) = match self_confirmation_plain.clone() {
|
let (error, state) = match self_confirmation_plain.clone() {
|
||||||
Ok(_) => (None, HandshakeState::ReceivePublicKey(read_message(a))),
|
Ok(_) => (None, HandshakeState::ReceivePublicKey(read_message(a))),
|
||||||
@ -66,7 +66,7 @@ pub fn accept_handshake<A>(a: A, self_key_pair: KeyPair, trusted_nodes: BTreeSet
|
|||||||
state: state,
|
state: state,
|
||||||
self_key_pair: self_key_pair,
|
self_key_pair: self_key_pair,
|
||||||
self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()),
|
self_confirmation_plain: self_confirmation_plain.unwrap_or(Default::default()),
|
||||||
trusted_nodes: trusted_nodes,
|
trusted_nodes: None,
|
||||||
other_node_id: None,
|
other_node_id: None,
|
||||||
other_confirmation_plain: None,
|
other_confirmation_plain: None,
|
||||||
shared_key: None,
|
shared_key: None,
|
||||||
@ -89,7 +89,7 @@ pub struct Handshake<A> {
|
|||||||
state: HandshakeState<A>,
|
state: HandshakeState<A>,
|
||||||
self_key_pair: KeyPair,
|
self_key_pair: KeyPair,
|
||||||
self_confirmation_plain: H256,
|
self_confirmation_plain: H256,
|
||||||
trusted_nodes: BTreeSet<NodeId>,
|
trusted_nodes: Option<BTreeSet<NodeId>>,
|
||||||
other_node_id: Option<NodeId>,
|
other_node_id: Option<NodeId>,
|
||||||
other_confirmation_plain: Option<H256>,
|
other_confirmation_plain: Option<H256>,
|
||||||
shared_key: Option<KeyPair>,
|
shared_key: Option<KeyPair>,
|
||||||
@ -172,7 +172,7 @@ impl<A> Future for Handshake<A> where A: AsyncRead + AsyncWrite {
|
|||||||
Err(err) => return Ok((stream, Err(err.into())).into()),
|
Err(err) => return Ok((stream, Err(err.into())).into()),
|
||||||
};
|
};
|
||||||
|
|
||||||
if !self.trusted_nodes.contains(&*message.node_id) {
|
if !self.trusted_nodes.as_ref().map(|tn| tn.contains(&*message.node_id)).unwrap_or(true) {
|
||||||
return Ok((stream, Err(Error::InvalidNodeId)).into());
|
return Ok((stream, Err(Error::InvalidNodeId)).into());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -300,7 +300,7 @@ mod tests {
|
|||||||
let trusted_nodes: BTreeSet<_> = vec![io.peer_public().clone()].into_iter().collect();
|
let trusted_nodes: BTreeSet<_> = vec![io.peer_public().clone()].into_iter().collect();
|
||||||
let shared_key = compute_shared_key(self_key_pair.secret(), trusted_nodes.iter().nth(0).unwrap()).unwrap();
|
let shared_key = compute_shared_key(self_key_pair.secret(), trusted_nodes.iter().nth(0).unwrap()).unwrap();
|
||||||
|
|
||||||
let mut handshake = accept_handshake(io, self_key_pair, trusted_nodes);
|
let mut handshake = accept_handshake(io, self_key_pair);
|
||||||
handshake.set_self_confirmation_plain(self_confirmation_plain);
|
handshake.set_self_confirmation_plain(self_confirmation_plain);
|
||||||
|
|
||||||
let handshake_result = handshake.wait().unwrap();
|
let handshake_result = handshake.wait().unwrap();
|
||||||
|
@ -299,22 +299,22 @@ impl<Executor, Transport> JobSession<Executor, Transport> where Executor: JobExe
|
|||||||
return Err(Error::ConsensusUnreachable);
|
return Err(Error::ConsensusUnreachable);
|
||||||
}
|
}
|
||||||
|
|
||||||
let active_data = self.data.active_data.as_mut()
|
if let Some(active_data) = self.data.active_data.as_mut() {
|
||||||
.expect("we have checked that we are on master node; on master nodes active_data is filled during initialization; qed");
|
if active_data.rejects.contains(node) {
|
||||||
if active_data.rejects.contains(node) {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() {
|
|
||||||
active_data.rejects.insert(node.clone());
|
|
||||||
if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 {
|
|
||||||
self.data.state = JobSessionState::Active;
|
|
||||||
}
|
|
||||||
if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 {
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
if active_data.requests.remove(node) || active_data.responses.remove(node).is_some() {
|
||||||
|
active_data.rejects.insert(node.clone());
|
||||||
|
if self.data.state == JobSessionState::Finished && active_data.responses.len() < self.meta.threshold + 1 {
|
||||||
|
self.data.state = JobSessionState::Active;
|
||||||
|
}
|
||||||
|
if active_data.requests.len() + active_data.responses.len() >= self.meta.threshold + 1 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
self.data.state = JobSessionState::Failed;
|
self.data.state = JobSessionState::Failed;
|
||||||
return Err(Error::ConsensusUnreachable);
|
return Err(Error::ConsensusUnreachable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -23,6 +23,7 @@ use super::types::all::ServerKeyId;
|
|||||||
pub use super::types::all::{NodeId, EncryptedDocumentKeyShadow};
|
pub use super::types::all::{NodeId, EncryptedDocumentKeyShadow};
|
||||||
pub use super::acl_storage::AclStorage;
|
pub use super::acl_storage::AclStorage;
|
||||||
pub use super::key_storage::{KeyStorage, DocumentKeyShare};
|
pub use super::key_storage::{KeyStorage, DocumentKeyShare};
|
||||||
|
pub use super::key_server_set::KeyServerSet;
|
||||||
pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash};
|
pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash};
|
||||||
pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient};
|
pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient};
|
||||||
pub use self::generation_session::Session as GenerationSession;
|
pub use self::generation_session::Session as GenerationSession;
|
||||||
@ -33,6 +34,8 @@ pub use self::decryption_session::Session as DecryptionSession;
|
|||||||
pub use super::key_storage::tests::DummyKeyStorage;
|
pub use super::key_storage::tests::DummyKeyStorage;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub use super::acl_storage::tests::DummyAclStorage;
|
pub use super::acl_storage::tests::DummyAclStorage;
|
||||||
|
#[cfg(test)]
|
||||||
|
pub use super::key_server_set::tests::MapKeyServerSet;
|
||||||
|
|
||||||
pub type SessionId = ServerKeyId;
|
pub type SessionId = ServerKeyId;
|
||||||
|
|
||||||
|
@ -17,19 +17,18 @@
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::collections::BTreeSet;
|
|
||||||
use futures::{Future, Poll};
|
use futures::{Future, Poll};
|
||||||
use tokio_core::reactor::Handle;
|
use tokio_core::reactor::Handle;
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_core::net::TcpStream;
|
||||||
use ethkey::KeyPair;
|
use ethkey::KeyPair;
|
||||||
use key_server_cluster::{Error, NodeId};
|
use key_server_cluster::Error;
|
||||||
use key_server_cluster::io::{accept_handshake, Handshake, Deadline, deadline};
|
use key_server_cluster::io::{accept_handshake, Handshake, Deadline, deadline};
|
||||||
use key_server_cluster::net::Connection;
|
use key_server_cluster::net::Connection;
|
||||||
|
|
||||||
/// Create future for accepting incoming connection.
|
/// Create future for accepting incoming connection.
|
||||||
pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair, trusted_nodes: BTreeSet<NodeId>) -> Deadline<AcceptConnection> {
|
pub fn accept_connection(address: SocketAddr, stream: TcpStream, handle: &Handle, self_key_pair: KeyPair) -> Deadline<AcceptConnection> {
|
||||||
let accept = AcceptConnection {
|
let accept = AcceptConnection {
|
||||||
handshake: accept_handshake(stream, self_key_pair, trusted_nodes),
|
handshake: accept_handshake(stream, self_key_pair),
|
||||||
address: address,
|
address: address,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
204
secret_store/src/key_server_set.rs
Normal file
204
secret_store/src/key_server_set.rs
Normal file
@ -0,0 +1,204 @@
|
|||||||
|
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity.
|
||||||
|
|
||||||
|
// Parity is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
use std::sync::{Arc, Weak};
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use futures::{future, Future};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use ethcore::filter::Filter;
|
||||||
|
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
|
||||||
|
use native_contracts::KeyServerSet as KeyServerSetContract;
|
||||||
|
use util::{H256, Address, Bytes, Hashable};
|
||||||
|
use types::all::{Error, Public, NodeAddress};
|
||||||
|
|
||||||
|
const KEY_SERVER_SET_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_server_set";
|
||||||
|
|
||||||
|
/// Key server has been added to the set.
|
||||||
|
const ADDED_EVENT_NAME: &'static [u8] = &*b"KeyServerAdded(address)";
|
||||||
|
/// Key server has been removed from the set.
|
||||||
|
const REMOVED_EVENT_NAME: &'static [u8] = &*b"KeyServerRemoved(address)";
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref ADDED_EVENT_NAME_HASH: H256 = ADDED_EVENT_NAME.sha3();
|
||||||
|
static ref REMOVED_EVENT_NAME_HASH: H256 = REMOVED_EVENT_NAME.sha3();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Key Server set
|
||||||
|
pub trait KeyServerSet: Send + Sync {
|
||||||
|
/// Get set of configured key servers
|
||||||
|
fn get(&self) -> BTreeMap<Public, SocketAddr>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// On-chain Key Server set implementation.
|
||||||
|
pub struct OnChainKeyServerSet {
|
||||||
|
/// Cached on-chain contract.
|
||||||
|
contract: Mutex<CachedContract>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cached on-chain Key Server set contract.
|
||||||
|
struct CachedContract {
|
||||||
|
/// Blockchain client.
|
||||||
|
client: Weak<Client>,
|
||||||
|
/// Contract address.
|
||||||
|
contract_addr: Option<Address>,
|
||||||
|
/// Active set of key servers.
|
||||||
|
key_servers: BTreeMap<Public, SocketAddr>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OnChainKeyServerSet {
|
||||||
|
pub fn new(client: &Arc<Client>, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Arc<Self>, Error> {
|
||||||
|
let mut cached_contract = CachedContract::new(client, key_servers)?;
|
||||||
|
let key_server_contract_address = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
|
||||||
|
// only initialize from contract if it is installed. otherwise - use default nodes
|
||||||
|
// once the contract is installed, all default nodes are lost (if not in the contract' set)
|
||||||
|
if key_server_contract_address.is_some() {
|
||||||
|
cached_contract.read_from_registry(&*client, key_server_contract_address);
|
||||||
|
}
|
||||||
|
|
||||||
|
let key_server_set = Arc::new(OnChainKeyServerSet {
|
||||||
|
contract: Mutex::new(cached_contract),
|
||||||
|
});
|
||||||
|
client.add_notify(key_server_set.clone());
|
||||||
|
Ok(key_server_set)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl KeyServerSet for OnChainKeyServerSet {
|
||||||
|
fn get(&self) -> BTreeMap<Public, SocketAddr> {
|
||||||
|
self.contract.lock().get()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChainNotify for OnChainKeyServerSet {
|
||||||
|
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
|
||||||
|
if !enacted.is_empty() || !retracted.is_empty() {
|
||||||
|
self.contract.lock().update(enacted, retracted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CachedContract {
|
||||||
|
pub fn new(client: &Arc<Client>, key_servers: BTreeMap<Public, NodeAddress>) -> Result<Self, Error> {
|
||||||
|
Ok(CachedContract {
|
||||||
|
client: Arc::downgrade(client),
|
||||||
|
contract_addr: None,
|
||||||
|
key_servers: key_servers.into_iter()
|
||||||
|
.map(|(p, addr)| {
|
||||||
|
let addr = format!("{}:{}", addr.address, addr.port).parse()
|
||||||
|
.map_err(|err| Error::Internal(format!("error parsing node address: {}", err)))?;
|
||||||
|
Ok((p, addr))
|
||||||
|
})
|
||||||
|
.collect::<Result<BTreeMap<_, _>, Error>>()?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update(&mut self, enacted: Vec<H256>, retracted: Vec<H256>) {
|
||||||
|
if let Some(client) = self.client.upgrade() {
|
||||||
|
let new_contract_addr = client.registry_address(KEY_SERVER_SET_CONTRACT_REGISTRY_NAME.to_owned());
|
||||||
|
|
||||||
|
// new contract installed => read nodes set from the contract
|
||||||
|
if self.contract_addr.as_ref() != new_contract_addr.as_ref() {
|
||||||
|
self.read_from_registry(&*client, new_contract_addr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check for contract events
|
||||||
|
let is_set_changed = self.contract_addr.is_some() && enacted.iter()
|
||||||
|
.chain(retracted.iter())
|
||||||
|
.any(|block_hash| !client.logs(Filter {
|
||||||
|
from_block: BlockId::Hash(block_hash.clone()),
|
||||||
|
to_block: BlockId::Hash(block_hash.clone()),
|
||||||
|
address: self.contract_addr.clone().map(|a| vec![a]),
|
||||||
|
topics: vec![
|
||||||
|
Some(vec![*ADDED_EVENT_NAME_HASH, *REMOVED_EVENT_NAME_HASH]),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
],
|
||||||
|
limit: Some(1),
|
||||||
|
}).is_empty());
|
||||||
|
// to simplify processing - just re-read the whole nodes set from the contract
|
||||||
|
if is_set_changed {
|
||||||
|
self.read_from_registry(&*client, new_contract_addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self) -> BTreeMap<Public, SocketAddr> {
|
||||||
|
self.key_servers.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_from_registry(&mut self, client: &Client, new_contract_address: Option<Address>) {
|
||||||
|
self.key_servers = new_contract_address.map(|contract_addr| {
|
||||||
|
trace!(target: "secretstore", "Configuring for key server set contract from {}", contract_addr);
|
||||||
|
|
||||||
|
KeyServerSetContract::new(contract_addr)
|
||||||
|
})
|
||||||
|
.map(|contract| {
|
||||||
|
let mut key_servers = BTreeMap::new();
|
||||||
|
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
|
||||||
|
let key_servers_list = contract.get_key_servers(do_call).wait()
|
||||||
|
.map_err(|err| { trace!(target: "secretstore", "Error {} reading list of key servers from contract", err); err })
|
||||||
|
.unwrap_or_default();
|
||||||
|
for key_server in key_servers_list {
|
||||||
|
let key_server_public = contract.get_key_server_public(
|
||||||
|
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
|
||||||
|
.and_then(|p| if p.len() == 64 { Ok(Public::from_slice(&p)) } else { Err(format!("Invalid public length {}", p.len())) });
|
||||||
|
let key_server_ip = contract.get_key_server_address(
|
||||||
|
|a, d| future::done(client.call_contract(BlockId::Latest, a, d)), key_server).wait()
|
||||||
|
.and_then(|a| a.parse().map_err(|e| format!("Invalid ip address: {}", e)));
|
||||||
|
|
||||||
|
// only add successfully parsed nodes
|
||||||
|
match (key_server_public, key_server_ip) {
|
||||||
|
(Ok(key_server_public), Ok(key_server_ip)) => { key_servers.insert(key_server_public, key_server_ip); },
|
||||||
|
(Err(public_err), _) => warn!(target: "secretstore_net", "received invalid public from key server set contract: {}", public_err),
|
||||||
|
(_, Err(ip_err)) => warn!(target: "secretstore_net", "received invalid IP from key server set contract: {}", ip_err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
key_servers
|
||||||
|
})
|
||||||
|
.unwrap_or_default();
|
||||||
|
self.contract_addr = new_contract_address;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub mod tests {
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use ethkey::Public;
|
||||||
|
use super::KeyServerSet;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct MapKeyServerSet {
|
||||||
|
nodes: BTreeMap<Public, SocketAddr>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MapKeyServerSet {
|
||||||
|
pub fn new(nodes: BTreeMap<Public, SocketAddr>) -> Self {
|
||||||
|
MapKeyServerSet {
|
||||||
|
nodes: nodes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl KeyServerSet for MapKeyServerSet {
|
||||||
|
fn get(&self) -> BTreeMap<Public, SocketAddr> {
|
||||||
|
self.nodes.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -21,6 +21,8 @@ extern crate log;
|
|||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate futures_cpupool;
|
extern crate futures_cpupool;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
|
#[macro_use]
|
||||||
|
extern crate lazy_static;
|
||||||
extern crate parking_lot;
|
extern crate parking_lot;
|
||||||
extern crate rustc_hex;
|
extern crate rustc_hex;
|
||||||
extern crate serde;
|
extern crate serde;
|
||||||
@ -56,6 +58,7 @@ mod http_listener;
|
|||||||
mod key_server;
|
mod key_server;
|
||||||
mod key_storage;
|
mod key_storage;
|
||||||
mod serialization;
|
mod serialization;
|
||||||
|
mod key_server_set;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use ethcore::client::Client;
|
use ethcore::client::Client;
|
||||||
@ -68,9 +71,10 @@ pub use traits::{KeyServer};
|
|||||||
pub fn start(client: Arc<Client>, config: ServiceConfiguration) -> Result<Box<KeyServer>, Error> {
|
pub fn start(client: Arc<Client>, config: ServiceConfiguration) -> Result<Box<KeyServer>, Error> {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
let acl_storage = Arc::new(acl_storage::OnChainAclStorage::new(client));
|
let acl_storage = acl_storage::OnChainAclStorage::new(&client);
|
||||||
|
let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, config.cluster_config.nodes.clone())?;
|
||||||
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
|
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
|
||||||
let key_server = key_server::KeyServerImpl::new(&config.cluster_config, acl_storage, key_storage)?;
|
let key_server = key_server::KeyServerImpl::new(&config.cluster_config, key_server_set, acl_storage, key_storage)?;
|
||||||
let listener = http_listener::KeyServerHttpListener::start(&config.listener_address, key_server)?;
|
let listener = http_listener::KeyServerHttpListener::start(&config.listener_address, key_server)?;
|
||||||
Ok(Box::new(listener))
|
Ok(Box::new(listener))
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user