From ea9c8a174c6f4fb313213eaefbe6bebb80d71204 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 22 Nov 2017 17:31:34 +0300 Subject: [PATCH] SecretStore: started adding tests for ServiceContractListener --- .../res/secretstore_service.json | 7 +- secret_store/src/key_server.rs | 9 +- .../src/key_server_cluster/cluster.rs | 36 +- secret_store/src/key_server_cluster/mod.rs | 2 + secret_store/src/key_storage.rs | 1 + secret_store/src/lib.rs | 7 +- secret_store/src/listener/http_listener.rs | 2 +- secret_store/src/listener/mod.rs | 17 + secret_store/src/listener/service_contract.rs | 306 +++++++++++++++ .../src/listener/service_contract_listener.rs | 363 +++++++++++------- 10 files changed, 609 insertions(+), 141 deletions(-) create mode 100644 secret_store/src/listener/service_contract.rs diff --git a/ethcore/native_contracts/res/secretstore_service.json b/ethcore/native_contracts/res/secretstore_service.json index 48d9adaa7..fecf5ca14 100644 --- a/ethcore/native_contracts/res/secretstore_service.json +++ b/ethcore/native_contracts/res/secretstore_service.json @@ -1 +1,6 @@ -[{"constant":true,"inputs":[],"name":"serverKeyGenerationRequestsCount","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"authority","type":"address"},{"name":"index","type":"uint256"}],"name":"getServerKeyGenerationRequest","outputs":[{"name":"","type":"bytes32"},{"name":"","type":"uint256"},{"name":"","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"threshold","type":"uint256"}],"name":"generateServerKey","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[],"name":"serverKeyGenerationFee","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[],"name":"drain","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":true,"name":"threshold","type":"uint256"}],"name":"ServerKeyRequested","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":false,"name":"serverKeyPublic","type":"bytes"}],"name":"ServerKeyGenerated","type":"event"}] \ No newline at end of file +[ + {"constant":true,"inputs":[],"name":"serverKeyGenerationRequestsCount","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"}, + {"constant":true,"inputs":[{"name":"authority","type":"address"},{"name":"index","type":"uint256"}],"name":"getServerKeyGenerationRequest","outputs":[{"name":"","type":"bytes32"},{"name":"","type":"uint256"},{"name":"","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"}, + {"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"}, + {"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":true,"name":"threshold","type":"uint256"}],"name":"ServerKeyRequested","type":"event"} +] \ No newline at end of file diff --git a/secret_store/src/key_server.rs b/secret_store/src/key_server.rs index 6a68abe46..2908afd64 100644 --- a/secret_store/src/key_server.rs +++ b/secret_store/src/key_server.rs @@ -204,6 +204,7 @@ pub mod tests { use std::collections::BTreeSet; use std::time; use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; use std::net::SocketAddr; use std::collections::BTreeMap; use ethcrypto; @@ -219,7 +220,10 @@ pub mod tests { use traits::{AdminSessionsServer, ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer}; use super::KeyServerImpl; - pub struct DummyKeyServer; + #[derive(Default)] + pub struct DummyKeyServer { + pub generation_requests_count: AtomicUsize, + } impl KeyServer for DummyKeyServer {} @@ -231,7 +235,8 @@ pub mod tests { impl ServerKeyGenerator for DummyKeyServer { fn generate_key(&self, _key_id: &ServerKeyId, _signature: &RequestSignature, _threshold: usize) -> Result { - unimplemented!() + self.generation_requests_count.fetch_add(1, Ordering::Relaxed); + Err(Error::Internal("test error".into())) } } diff --git a/secret_store/src/key_server_cluster/cluster.rs b/secret_store/src/key_server_cluster/cluster.rs index c8f24034a..ae352a9b4 100644 --- a/secret_store/src/key_server_cluster/cluster.rs +++ b/secret_store/src/key_server_cluster/cluster.rs @@ -1014,12 +1014,21 @@ pub mod tests { use std::collections::{BTreeSet, VecDeque}; use parking_lot::Mutex; use tokio_core::reactor::Core; - use ethkey::{Random, Generator, Public, sign}; - use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet, PlainNodeKeyPair}; + use bigint::hash::H256; + use ethkey::{Random, Generator, Public, Signature, sign}; + use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet, PlainNodeKeyPair, KeyStorage}; use key_server_cluster::message::Message; - use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration}; - use key_server_cluster::cluster_sessions::ClusterSession; - use key_server_cluster::generation_session::SessionState as GenerationSessionState; + use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration, ClusterClient, ClusterState}; + use key_server_cluster::cluster_sessions::{ClusterSession, AdminSession, ClusterSessionsListener}; + use key_server_cluster::generation_session::{SessionImpl as GenerationSession, SessionState as GenerationSessionState}; + use key_server_cluster::decryption_session::{SessionImpl as DecryptionSession}; + use key_server_cluster::encryption_session::{SessionImpl as EncryptionSession}; + use key_server_cluster::signing_session::{SessionImpl as SigningSession}; + use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession, + IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction}; + + #[derive(Default)] + pub struct DummyClusterClient; #[derive(Debug)] pub struct DummyCluster { @@ -1033,6 +1042,23 @@ pub mod tests { messages: VecDeque<(NodeId, Message)>, } + impl ClusterClient for DummyClusterClient { + fn cluster_state(&self) -> ClusterState { unimplemented!() } + fn new_generation_session(&self, session_id: SessionId, author: Public, threshold: usize) -> Result, Error> { unimplemented!() } + fn new_encryption_session(&self, session_id: SessionId, requestor_signature: Signature, common_point: Public, encrypted_point: Public) -> Result, Error> { unimplemented!() } + fn new_decryption_session(&self, session_id: SessionId, requestor_signature: Signature, version: Option, is_shadow_decryption: bool) -> Result, Error> { unimplemented!() } + fn new_signing_session(&self, session_id: SessionId, requestor_signature: Signature, version: Option, message_hash: H256) -> Result, Error> { unimplemented!() } + fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result>, Error> { unimplemented!() } + fn new_servers_set_change_session(&self, session_id: Option, new_nodes_set: BTreeSet, old_set_signature: Signature, new_set_signature: Signature) -> Result, Error> { unimplemented!() } + + fn add_generation_listener(&self, listener: Arc>) {} + + fn make_faulty_generation_sessions(&self) { unimplemented!() } + fn generation_session(&self, session_id: &SessionId) -> Option> { unimplemented!() } + fn connect(&self) { unimplemented!() } + fn key_storage(&self) -> Arc { unimplemented!() } + } + impl DummyCluster { pub fn new(id: NodeId) -> Self { DummyCluster { diff --git a/secret_store/src/key_server_cluster/mod.rs b/secret_store/src/key_server_cluster/mod.rs index bea212460..0692a39ad 100644 --- a/secret_store/src/key_server_cluster/mod.rs +++ b/secret_store/src/key_server_cluster/mod.rs @@ -28,6 +28,8 @@ pub use super::key_server_set::KeyServerSet; pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash}; pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient}; pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener}; +#[cfg(test)] +pub use self::cluster::tests::DummyClusterClient; #[cfg(test)] pub use super::node_key_pair::PlainNodeKeyPair; diff --git a/secret_store/src/key_storage.rs b/secret_store/src/key_storage.rs index 3b1e0e6f2..da698e5e0 100644 --- a/secret_store/src/key_storage.rs +++ b/secret_store/src/key_storage.rs @@ -35,6 +35,7 @@ type CurrentSerializableDocumentKeyVersion = SerializableDocumentKeyShareVersion /// Encrypted key share, stored by key storage on the single key server. #[derive(Debug, Clone, PartialEq)] +#[cfg_attr(test, derive(Default))] pub struct DocumentKeyShare { /// Author of the entry. pub author: Public, diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index a0af4f072..3bf6dde06 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -85,15 +85,16 @@ pub fn start(client: Arc, sync: Arc, self_key_pair: Arc Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?), None => None, }; + let service_contract = Arc::new(listener::service_contract::OnChainServiceContract::new(&client, &sync, self_key_pair.clone())); let contract_listener = listener::service_contract_listener::ServiceContractListener::new(listener::service_contract_listener::ServiceContractListenerParams { - client: Arc::downgrade(&client), - sync: Arc::downgrade(&sync), + contract: service_contract, key_server: key_server.clone(), self_key_pair: self_key_pair, - key_servers_set: key_server_set, + key_server_set: key_server_set, cluster: cluster, key_storage: key_storage, }); + client.add_notify(contract_listener.clone()); let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener)); Ok(Box::new(listener)) } diff --git a/secret_store/src/listener/http_listener.rs b/secret_store/src/listener/http_listener.rs index 3350b51d4..8b1779843 100644 --- a/secret_store/src/listener/http_listener.rs +++ b/secret_store/src/listener/http_listener.rs @@ -334,7 +334,7 @@ mod tests { #[test] fn http_listener_successfully_drops() { - let key_server = Arc::new(DummyKeyServer); + let key_server = Arc::new(DummyKeyServer::default()); let address = NodeAddress { address: "127.0.0.1".into(), port: 9000 }; let listener = KeyServerHttpListener::start(address, key_server).unwrap(); drop(listener); diff --git a/secret_store/src/listener/mod.rs b/secret_store/src/listener/mod.rs index 1ebe4aa47..403eaf549 100644 --- a/secret_store/src/listener/mod.rs +++ b/secret_store/src/listener/mod.rs @@ -1,4 +1,21 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + pub mod http_listener; +pub mod service_contract; pub mod service_contract_listener; use std::collections::BTreeSet; diff --git a/secret_store/src/listener/service_contract.rs b/secret_store/src/listener/service_contract.rs new file mode 100644 index 000000000..30ad8937d --- /dev/null +++ b/secret_store/src/listener/service_contract.rs @@ -0,0 +1,306 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::collections::{VecDeque, HashSet}; +use std::sync::{Arc, Weak}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; +use futures::{future, Future}; +use parking_lot::{RwLock, Mutex, Condvar}; +use ethcore::filter::Filter; +use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify}; +use ethkey::{Random, Generator, Public, Signature, sign, public_to_address}; +use ethsync::SyncProvider; +use native_contracts::SecretStoreService; +use bytes::Bytes; +use hash::keccak; +use bigint::hash::H256; +use bigint::prelude::U256; +use util::Address; +use key_server_set::KeyServerSet; +use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession}; +use key_server_cluster::generation_session::SessionImpl as GenerationSession; +use key_storage::KeyStorage; +use listener::service_contract_listener::{ServiceTask, ServiceContractListenerParams}; +use {ServerKeyId, NodeKeyPair, KeyServer}; + +/// Name of the SecretStore contract in the registry. +const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service"; + +/// Key server has been added to the set. +const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32,uint256)"; + +lazy_static! { + static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME); +} + +/// Service contract trait. +pub trait ServiceContract: Send + Sync { + /// Update contract. + fn update(&self); + /// Is contract installed && up-to-date (i.e. chain is synced)? + fn is_actual(&self) -> bool; + /// Read contract logs from given blocks. Returns topics of every entry. + fn read_logs(&self, first_block: H256, last_block: H256) -> Box>>; + /// Publish generated key. + fn read_pending_requests(&self) -> Box>; + /// Publish server key. + fn publish_server_key(&self, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String>; +} + +/// On-chain service contract. +pub struct OnChainServiceContract { + /// Blockchain client. + client: Weak, + /// Sync provider. + sync: Weak, + /// This node key pair. + self_key_pair: Arc, + /// Contract. + contract: RwLock>, +} + +/// Pending requests iterator. +struct PendingRequestsIterator { + /// Blockchain client. + client: Arc, + /// Contract. + contract: Arc, + /// This node key pair. + self_key_pair: Arc, + /// Current request index. + index: U256, + /// Requests length. + length: U256, +} + +impl OnChainServiceContract { + /// Create new on-chain service contract. + pub fn new(client: &Arc, sync: &Arc, self_key_pair: Arc) -> Self { + let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) + .map(|address| { + trace!(target: "secretstore", "{}: installing service contract from address {}", + self_key_pair.public(), address); + address + }) + .unwrap_or_default(); + + OnChainServiceContract { + client: Arc::downgrade(client), + sync: Arc::downgrade(sync), + self_key_pair: self_key_pair, + contract: RwLock::new(Arc::new(SecretStoreService::new(contract_addr))), + } + } +} + +impl ServiceContract for OnChainServiceContract { + fn update(&self) { + if let (Some(client), Some(sync)) = (self.client.upgrade(), self.sync.upgrade()) { + // do nothing until synced + if sync.status().is_syncing(client.queue_info()) { + return; + } + + // update contract address from registry + let service_contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()).unwrap_or_default(); + if self.contract.read().address != service_contract_addr { + trace!(target: "secretstore", "{}: installing service contract from address {}", + self.self_key_pair.public(), service_contract_addr); + *self.contract.write() = Arc::new(SecretStoreService::new(service_contract_addr)); + } + } + } + + fn is_actual(&self) -> bool { + self.contract.read().address != Default::default() + && match (self.client.upgrade(), self.sync.upgrade()) { + (Some(client), Some(sync)) => !sync.status().is_syncing(client.queue_info()), + _ => false, + } + } + + fn read_logs(&self, first_block: H256, last_block: H256) -> Box>> { + let client = match self.client.upgrade() { + Some(client) => client, + None => { + warn!(target: "secretstore", "{}: client is offline during read_pending_requests call", + self.self_key_pair.public()); + return Box::new(::std::iter::empty()); + }, + }; + + // read server key generation requests + let contract_address = self.contract.read().address.clone(); + let request_logs = client.logs(Filter { + from_block: BlockId::Hash(first_block), + to_block: BlockId::Hash(last_block), + address: Some(vec![contract_address]), + topics: vec![ + Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]), + None, + None, + None, + ], + limit: None, + }); + + Box::new(request_logs.into_iter().map(|log| log.entry.topics)) + } + + fn read_pending_requests(&self) -> Box> { + let client = match self.client.upgrade() { + Some(client) => client, + None => { + warn!(target: "secretstore", "{}: client is offline during read_pending_requests call", + self.self_key_pair.public()); + return Box::new(::std::iter::empty()); + }, + }; + + let contract = self.contract.read(); + let length = match contract.address == Default::default() { + true => 0.into(), + false => { + let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d)); + contract.server_key_generation_requests_count(&do_call).wait() + .map_err(|error| { + warn!(target: "secretstore", "{}: call to server_key_generation_requests_count failed: {}", + self.self_key_pair.public(), error); + error + }) + .unwrap_or_default() + }, + }; + + Box::new(PendingRequestsIterator { + client: client, + contract: contract.clone(), + self_key_pair: self.self_key_pair.clone(), + index: 0.into(), + length: length, + }) + } + + fn publish_server_key(&self, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> { + let server_key_hash = keccak(server_key); + let signed_server_key = self.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; + let signed_server_key: Signature = signed_server_key.into_electrum().into(); + let contract = self.contract.read(); + let transaction_data = contract.encode_server_key_generated_input(server_key_id.clone(), + server_key.to_vec(), + signed_server_key.v(), + signed_server_key.r().into(), + signed_server_key.s().into() + )?; + + if contract.address != Default::default() { + if let Some(client) = self.client.upgrade() { + client.transact_contract( + contract.address.clone(), + transaction_data + ).map_err(|e| format!("{}", e))?; + } // else we will read this in the next refresh cycle + } + + Ok(()) + } +} + +impl Iterator for PendingRequestsIterator { + type Item = (bool, ServiceTask); + + fn next(&mut self) -> Option<(bool, ServiceTask)> { + if self.index >= self.length { + return None; + } + self.index = self.index + 1.into(); + + let do_call = |a, d| future::done(self.client.call_contract(BlockId::Latest, a, d)); + let key_generation_request = self.contract.get_server_key_generation_request(&do_call, + public_to_address(self.self_key_pair.public()), + (self.index - 1.into()).clone().into()).wait(); + let (server_key_id, threshold, is_confirmed) = match key_generation_request { + Ok((server_key_id, threshold, is_confirmed)) => { + (server_key_id, threshold, is_confirmed) + }, + Err(error) => { + warn!(target: "secretstore", "{}: call to get_server_key_generation_request failed: {}", + self.self_key_pair.public(), error); + return None; + }, + }; + + Some((is_confirmed, ServiceTask::GenerateServerKey(server_key_id, threshold.into()))) + } +} + +#[cfg(test)] +pub mod tests { + use std::collections::{VecDeque, HashSet}; + use std::sync::{Arc, Weak}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::thread; + use futures::{future, Future}; + use parking_lot::{RwLock, Mutex, Condvar}; + use ethcore::filter::Filter; + use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify}; + use ethkey::{Random, Generator, Public, Signature, sign, public_to_address}; + use ethsync::SyncProvider; + use native_contracts::SecretStoreService; + use bytes::Bytes; + use hash::keccak; + use bigint::hash::H256; + use bigint::prelude::U256; + use util::Address; + use key_server_set::KeyServerSet; + use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession}; + use key_server_cluster::generation_session::SessionImpl as GenerationSession; + use key_storage::KeyStorage; + use listener::service_contract_listener::{ServiceTask, ServiceContractListenerParams}; + use {ServerKeyId, NodeKeyPair, KeyServer}; + use super::ServiceContract; + + #[derive(Default)] + pub struct DummyServiceContract { + pub is_actual: bool, + pub logs: Vec>, + pub pending_requests: Vec<(bool, ServiceTask)>, + pub published_keys: Mutex>, + } + + impl ServiceContract for DummyServiceContract { + fn update(&self) { + } + + fn is_actual(&self) -> bool { + self.is_actual + } + + fn read_logs(&self, first_block: H256, last_block: H256) -> Box>> { + Box::new(self.logs.clone().into_iter()) + } + + fn read_pending_requests(&self) -> Box> { + Box::new(self.pending_requests.clone().into_iter()) + } + + fn publish_server_key(&self, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> { + self.published_keys.lock().push((server_key_id.clone(), server_key.clone())); + Ok(()) + } + } +} diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index 62a2a387a..48884a73c 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -34,14 +34,9 @@ use key_server_set::KeyServerSet; use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession}; use key_server_cluster::generation_session::SessionImpl as GenerationSession; use key_storage::KeyStorage; +use listener::service_contract::ServiceContract; use {ServerKeyId, NodeKeyPair, KeyServer}; -/// Name of the SecretStore contract in the registry. -const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service"; - -/// Key server has been added to the set. -const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32,uint256)"; - /// Retry interval (in blocks). Every RETRY_INTEVAL_BLOCKS blocks each KeyServer reads pending requests from /// service contract && tries to re-execute. The reason to have this mechanism is primarily because keys /// servers set change takes a lot of time + there could be some races, when blocks are coming to different @@ -52,10 +47,6 @@ const RETRY_INTEVAL_BLOCKS: usize = 30; /// pending requests have failed, then most probably other will fail too. const MAX_FAILED_RETRY_REQUESTS: usize = 1; -lazy_static! { - static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME); -} - /// SecretStore <-> Authority connector responsible for: /// 1. listening for new requests on SecretStore contract /// 2. redirecting requests to key server @@ -69,16 +60,14 @@ pub struct ServiceContractListener { /// Service contract listener parameters. pub struct ServiceContractListenerParams { - /// Blockchain client. - pub client: Weak, - /// Sync provider. - pub sync: Weak, + /// Service contract. + pub contract: Arc, /// Key server reference. pub key_server: Arc, /// This node key pair. pub self_key_pair: Arc, /// Key servers set. - pub key_servers_set: Arc, + pub key_server_set: Arc, /// Cluster reference. pub cluster: Arc, /// Key storage reference. @@ -91,12 +80,19 @@ struct ServiceContractListenerData { pub last_retry: AtomicUsize, /// Retry-related data. pub retry_data: Mutex, - /// Contract. - pub contract: RwLock, /// Service tasks queue. pub tasks_queue: Arc, - /// Cluster params. - pub params: ServiceContractListenerParams, + /// Service contract. + pub contract: Arc, + /// Key server reference. + pub key_server: Arc, + /// This node key pair. + pub self_key_pair: Arc, + /// Key servers set. + pub key_server_set: Arc, + /// Key storage reference. + pub key_storage: Arc, + } /// Retry-related data. @@ -115,8 +111,8 @@ struct TasksQueue { } /// Service task. -#[derive(Debug)] -enum ServiceTask { +#[derive(Debug, Clone, PartialEq)] +pub enum ServiceTask { /// Retry all 'stalled' tasks. Retry, /// Generate server key (server_key_id, threshold). @@ -130,44 +126,58 @@ enum ServiceTask { impl ServiceContractListener { /// Create new service contract listener. pub fn new(params: ServiceContractListenerParams) -> Arc { - let client = params.client.upgrade().expect("client is active in constructor; qed"); - let sync = params.sync.upgrade().expect("sync is active in constructor; qed"); - let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) - .map(|address| { - trace!(target: "secretstore", "{}: installing service contract from address {}", - params.self_key_pair.public(), address); - address - }) - .unwrap_or_default(); - - let is_syncing = sync.status().is_syncing(client.queue_info()); let data = Arc::new(ServiceContractListenerData { last_retry: AtomicUsize::new(0), retry_data: Default::default(), - contract: RwLock::new(SecretStoreService::new(contract_addr)), tasks_queue: Arc::new(TasksQueue::new()), - params: params, + contract: params.contract, + key_server: params.key_server, + self_key_pair: params.self_key_pair, + key_server_set: params.key_server_set, + key_storage: params.key_storage, }); + data.tasks_queue.push(::std::iter::once(ServiceTask::Retry)); - // retry on restart - if !is_syncing { - data.tasks_queue.push(::std::iter::once(ServiceTask::Retry)); - } - - let service_thread_data = data.clone(); - let service_handle = thread::spawn(move || Self::run_service_thread(service_thread_data)); + // we are not starting thread when in test mode + let service_handle = if cfg!(test) { + None + } else { + let service_thread_data = data.clone(); + Some(thread::spawn(move || Self::run_service_thread(service_thread_data))) + }; let contract = Arc::new(ServiceContractListener { data: data, - service_handle: Some(service_handle), + service_handle: service_handle, }); - client.add_notify(contract.clone()); - contract.data.params.cluster.add_generation_listener(contract.clone()); + params.cluster.add_generation_listener(contract.clone()); contract } /// Process incoming events of service contract. - fn process_service_contract_events(&self, client: &Client, service_contract: Address, blocks: Vec) { - debug_assert!(!blocks.is_empty()); + fn process_service_contract_events(&self, first: H256, last: H256) { + self.data.tasks_queue.push(self.data.contract.read_logs(first, last) + .filter_map(|topics| match topics.len() { + // when key is already generated && we have this key + 3 if self.data.key_storage.get(&topics[1]).map(|k| k.is_some()).unwrap_or_default() => { + Some(ServiceTask::RestoreServerKey( + topics[1], + )) + } + // when key is not yet generated && this node should be master of this key generation session + 3 if is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &topics[1]) => { + Some(ServiceTask::GenerateServerKey( + topics[1], + topics[2], + )) + }, + 3 => None, + l @ _ => { + warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l); + None + }, + })); + +/* debug_assert!(!blocks.is_empty()); // read server key generation requests let request_logs = client.logs(Filter { @@ -187,13 +197,13 @@ impl ServiceContractListener { self.data.tasks_queue.push(request_logs.into_iter() .filter_map(|r| match r.entry.topics.len() { // when key is already generated && we have this key - 3 if self.data.params.key_storage.get(&r.entry.topics[1]).map(|k| k.is_some()).unwrap_or_default() => { + 3 if self.data.key_storage.get(&r.entry.topics[1]).map(|k| k.is_some()).unwrap_or_default() => { Some(ServiceTask::RestoreServerKey( r.entry.topics[1], )) } // when key is not yet generated && this node should be master of this key generation session - 3 if is_processed_by_this_key_server(&*self.data.params.key_servers_set, &*self.data.params.self_key_pair, &r.entry.topics[1]) => { + 3 if is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &r.entry.topics[1]) => { Some(ServiceTask::GenerateServerKey( r.entry.topics[1], r.entry.topics[2], @@ -204,14 +214,14 @@ impl ServiceContractListener { warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l); None }, - })); + }));*/ } /// Service thread procedure. fn run_service_thread(data: Arc) { loop { let task = data.tasks_queue.wait(); - trace!(target: "secretstore", "{}: processing {:?} task", data.params.self_key_pair.public(), task); + trace!(target: "secretstore", "{}: processing {:?} task", data.self_key_pair.public(), task); match task { ServiceTask::Shutdown => break, @@ -231,13 +241,13 @@ impl ServiceContractListener { .map(|processed_requests| { if processed_requests != 0 { trace!(target: "secretstore", "{}: successfully retried {} pending requests", - data.params.self_key_pair.public(), processed_requests); + data.self_key_pair.public(), processed_requests); } () }) .map_err(|error| { warn!(target: "secretstore", "{}: retrying pending requests has failed with: {}", - data.params.self_key_pair.public(), error); + data.self_key_pair.public(), error); error }), ServiceTask::RestoreServerKey(server_key_id) => { @@ -246,27 +256,27 @@ impl ServiceContractListener { .and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) .map(|_| { trace!(target: "secretstore", "{}: processed RestoreServerKey({}) request", - data.params.self_key_pair.public(), server_key_id); + data.self_key_pair.public(), server_key_id); () }) .map_err(|error| { warn!(target: "secretstore", "{}: failed to process RestoreServerKey({}) request with: {}", - data.params.self_key_pair.public(), server_key_id, error); + data.self_key_pair.public(), server_key_id, error); error }) - } + }, ServiceTask::GenerateServerKey(server_key_id, threshold) => { data.retry_data.lock().generated_keys.insert(server_key_id.clone()); Self::generate_server_key(&data, &server_key_id, &threshold) .and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) .map(|_| { trace!(target: "secretstore", "{}: processed GenerateServerKey({}, {}) request", - data.params.self_key_pair.public(), server_key_id, threshold); + data.self_key_pair.public(), server_key_id, threshold); () }) .map_err(|error| { warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}, {}) request with: {}", - data.params.self_key_pair.public(), server_key_id, threshold, error); + data.self_key_pair.public(), server_key_id, threshold, error); error }) }, @@ -276,49 +286,33 @@ impl ServiceContractListener { /// Retry processing pending requests. fn retry_pending_requests(data: &Arc) -> Result { - let client = data.params.client.upgrade().ok_or("client is required".to_owned())?; - let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default()); - let contract = data.contract.read(); - - // it is only possible when contract address is set - if contract.address == Default::default() { - return Ok(0); - } - - let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d)); - let generate_server_key_requests_count = contract.server_key_generation_requests_count(&do_call).wait()?; - let mut generate_server_key_request_index = 0.into(); let mut failed_requests = 0; let mut processed_requests = 0; - loop { - if generate_server_key_request_index >= generate_server_key_requests_count { - break; - } - - // read request from the contract - let (server_key_id, threshold, is_confirmed) = contract.get_server_key_generation_request(&do_call, - public_to_address(data.params.self_key_pair.public()), - generate_server_key_request_index).wait()?; - generate_server_key_request_index = generate_server_key_request_index + 1.into(); - + let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default()); + for (is_confirmed, task) in data.contract.read_pending_requests() { // only process requests, which we haven't confirmed yet if is_confirmed { continue; } - // only process request, which haven't been processed recently - // there could be a lag when we've just generated server key && retrying on the same block - // (or before our tx is mined) - state is not updated yet - if retry_data.generated_keys.contains(&server_key_id){ - continue; - } + let request_result = match task { + ServiceTask::GenerateServerKey(server_key_id, threshold) => { + // only process request, which haven't been processed recently + // there could be a lag when we've just generated server key && retrying on the same block + // (or before our tx is mined) - state is not updated yet + if retry_data.generated_keys.contains(&server_key_id){ + continue; + } - // process request - let is_own_request = is_processed_by_this_key_server(&*data.params.key_servers_set, &*data.params.self_key_pair, &server_key_id); - let request_result = Self::process_service_task(data, match is_own_request { - true => ServiceTask::GenerateServerKey(server_key_id, threshold.into()), - false => ServiceTask::RestoreServerKey(server_key_id), - }); + // process request + let is_own_request = is_processed_by_this_key_server(&*data.key_server_set, &*data.self_key_pair, &server_key_id); + Self::process_service_task(data, match is_own_request { + true => ServiceTask::GenerateServerKey(server_key_id, threshold.into()), + false => ServiceTask::RestoreServerKey(server_key_id), + }) + }, + _ => Err("not supported".into()), + }; // process request result match request_result { @@ -331,6 +325,7 @@ impl ServiceContractListener { }, } } + Ok(processed_requests) } @@ -346,13 +341,13 @@ impl ServiceContractListener { // => this API (server key generation) is not suitable for usage in encryption via contract endpoint let author_key = Random.generate().map_err(|e| format!("{}", e))?; let server_key_id_signature = sign(author_key.secret(), server_key_id).map_err(|e| format!("{}", e))?; - data.params.key_server.generate_key(server_key_id, &server_key_id_signature, threshold_num as usize) + data.key_server.generate_key(server_key_id, &server_key_id_signature, threshold_num as usize) .map_err(Into::into) } /// Restore server key. fn restore_server_key(data: &Arc, server_key_id: &ServerKeyId) -> Result { - data.params.key_storage.get(server_key_id) + data.key_storage.get(server_key_id) .map_err(|e| format!("{}", e)) .and_then(|ks| ks.ok_or("missing key".to_owned())) .map(|ks| ks.public) @@ -360,8 +355,9 @@ impl ServiceContractListener { /// Publish server key. fn publish_server_key(data: &Arc, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> { - let server_key_hash = keccak(server_key); - let signed_server_key = data.params.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; + data.contract.publish_server_key(server_key_id, server_key) + /*let server_key_hash = keccak(server_key); + let signed_server_key = data.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; let signed_server_key: Signature = signed_server_key.into_electrum().into(); let transaction_data = data.contract.read().encode_server_key_generated_input(server_key_id.clone(), server_key.to_vec(), @@ -372,7 +368,7 @@ impl ServiceContractListener { let contract = data.contract.read(); if contract.address != Default::default() { - if let Some(client) = data.params.client.upgrade() { + if let Some(client) = data.client.upgrade() { client.transact_contract( contract.address.clone(), transaction_data @@ -381,6 +377,7 @@ impl ServiceContractListener { } Ok(()) + unimplemented!()*/ } } @@ -397,33 +394,52 @@ impl Drop for ServiceContractListener { impl ChainNotify for ServiceContractListener { fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, _retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { let enacted_len = enacted.len(); - if enacted_len != 0 { - if let (Some(client), Some(sync)) = (self.data.params.client.upgrade(), self.data.params.sync.upgrade()) { - // do nothing until synced - if sync.status().is_syncing(client.queue_info()) { - return; - } - - // update contract address from registry - if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) { - if self.data.contract.read().address != service_contract_addr { - trace!(target: "secretstore", "{}: installing service contract from address {}", - self.data.params.self_key_pair.public(), service_contract_addr); - *self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone()); - } - - // and process contract events - self.process_service_contract_events(&*client, service_contract_addr, enacted); - } - - // schedule retry if received enough blocks since last retry - // it maybe inaccurate when switching syncing/synced states, but that's ok - if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTEVAL_BLOCKS { - self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry)); - self.data.last_retry.store(0, Ordering::Relaxed); - } - } + if enacted_len == 0 { + return; } + + if !self.data.contract.is_actual() { + return; + } + + self.data.contract.update(); + self.process_service_contract_events( + enacted.first().expect("TODO").clone(), + enacted.last().expect("TODO").clone()); + + // schedule retry if received enough blocks since last retry + // it maybe inaccurate when switching syncing/synced states, but that's ok + if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTEVAL_BLOCKS { + self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry)); + self.data.last_retry.store(0, Ordering::Relaxed); + } + + +/* if let (Some(client), Some(sync)) = (self.data.client.upgrade(), self.data.sync.upgrade()) { + // do nothing until synced + if sync.status().is_syncing(client.queue_info()) { + return; + } + + // update contract address from registry + if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) { + if self.data.contract.read().address != service_contract_addr { + trace!(target: "secretstore", "{}: installing service contract from address {}", + self.data.self_key_pair.public(), service_contract_addr); + *self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone()); + } + + // and process contract events + self.process_service_contract_events(&*client, service_contract_addr, enacted); + } + + // schedule retry if received enough blocks since last retry + // it maybe inaccurate when switching syncing/synced states, but that's ok + if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTEVAL_BLOCKS { + self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry)); + self.data.last_retry.store(0, Ordering::Relaxed); + } + }*/ } } @@ -433,7 +449,7 @@ impl ClusterSessionsListener for ServiceContractListener { // only publish when the session is started by another node // when it is started by this node, it is published from process_service_task - if !is_processed_by_this_key_server(&*self.data.params.key_servers_set, &*self.data.params.self_key_pair, &session.id()) { + if !is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &session.id()) { // by this time sesion must already be completed - either successfully, or not debug_assert!(session.is_finished()); @@ -443,12 +459,12 @@ impl ClusterSessionsListener for ServiceContractListener { .and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key)) .map(|_| { trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request", - self.data.params.self_key_pair.public(), session.id()); + self.data.self_key_pair.public(), session.id()); () }) .map_err(|error| { warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}", - self.data.params.self_key_pair.public(), session.id(), error); + self.data.self_key_pair.public(), session.id(), error); error }); } @@ -493,8 +509,8 @@ impl TasksQueue { } /// Returns true when session, related to `server_key_id` must be started on this KeyServer. -fn is_processed_by_this_key_server(key_servers_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool { - let servers = key_servers_set.get(); +fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool { + let servers = key_server_set.get(); let total_servers_count = servers.len(); if total_servers_count == 0 { return false; @@ -514,10 +530,39 @@ fn is_processed_by_this_key_server(key_servers_set: &KeyServerSet, self_key_pair #[cfg(test)] mod tests { + use std::sync::Arc; + use std::sync::atomic::Ordering; use ethkey::{Random, Generator, KeyPair}; + use listener::service_contract::ServiceContract; + use listener::service_contract::tests::DummyServiceContract; + use key_server_cluster::DummyClusterClient; + use key_server::tests::DummyKeyServer; + use key_storage::tests::DummyKeyStorage; use key_server_set::tests::MapKeyServerSet; use PlainNodeKeyPair; - use super::is_processed_by_this_key_server; + use super::{ServiceTask, ServiceContractListener, ServiceContractListenerParams, is_processed_by_this_key_server}; + + fn make_service_contract_listener(contract: Option>, key_server: Option>) -> Arc { + let contract = contract.unwrap_or_else(|| Arc::new(DummyServiceContract::default())); + let key_server = key_server.unwrap_or_else(|| Arc::new(DummyKeyServer::default())); + let servers_set = Arc::new(MapKeyServerSet::new(vec![ + ("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(), + "127.0.0.1:8080".parse().unwrap()), + ("c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee51ae168fea63dc339a3c58419466ceaeef7f632653266d0e1236431a950cfe52a".parse().unwrap(), + "127.0.0.1:8080".parse().unwrap()), + ("f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9388f7b0f632de8140fe337e62a37f3566500a99934c2231b6cb9fd7584b8e672".parse().unwrap(), + "127.0.0.1:8080".parse().unwrap()), + ].into_iter().collect())); + let self_key_pair = Arc::new(PlainNodeKeyPair::new(KeyPair::from_secret("0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap())); + ServiceContractListener::new(ServiceContractListenerParams { + contract: contract, + key_server: key_server, + self_key_pair: self_key_pair, + key_server_set: servers_set, + cluster: Arc::new(DummyClusterClient::default()), + key_storage: Arc::new(DummyKeyStorage::default()), + }) + } #[test] fn is_not_processed_by_this_key_server_with_zero_servers() { @@ -661,4 +706,64 @@ mod tests { assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, &"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); } + + #[test] + fn no_tasks_scheduled_when_no_contract_events() { + let listener = make_service_contract_listener(None, None); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + } + + #[test] + fn server_key_generation_is_scheduled_when_requested_key_is_unknnown() { + let mut contract = DummyServiceContract::default(); + contract.logs.push(vec![Default::default(), Default::default(), Default::default()]); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::GenerateServerKey(Default::default(), Default::default()))); + } + + #[test] + fn no_new_tasks_scheduled_when_requested_key_is_unknown_and_request_belongs_to_other_key_server() { + let server_key_id = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap(); + let mut contract = DummyServiceContract::default(); + contract.logs.push(vec![Default::default(), server_key_id, Default::default()]); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + } + + #[test] + fn server_key_restore_is_scheduled_when_requested_key_is_knnown() { + let mut contract = DummyServiceContract::default(); + contract.logs.push(vec![Default::default(), Default::default(), Default::default()]); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None); + listener.data.key_storage.insert(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::RestoreServerKey(Default::default()))); + } + + #[test] + fn no_new_tasks_scheduled_when_wrong_number_of_topics_in_log() { + let mut contract = DummyServiceContract::default(); + contract.logs.push(vec![Default::default(), Default::default()]); + let listener = make_service_contract_listener(Some(Arc::new(contract)), None); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + listener.process_service_contract_events(Default::default(), Default::default()); + assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1); + } + + #[test] + fn generation_session_is_created_when_processing_generate_server_key_task() { + let key_server = Arc::new(DummyKeyServer::default()); + let listener = make_service_contract_listener(None, Some(key_server.clone())); + ServiceContractListener::process_service_task(&listener.data, ServiceTask::GenerateServerKey(Default::default(), Default::default())).unwrap_err(); + assert_eq!(key_server.generation_requests_count.load(Ordering::Relaxed), 1); + } }