SecretStore: started adding tests for ServiceContractListener

This commit is contained in:
Svyatoslav Nikolsky 2017-11-22 17:31:34 +03:00
parent 5d6abfe2f5
commit ea9c8a174c
10 changed files with 609 additions and 141 deletions

View File

@ -1 +1,6 @@
[{"constant":true,"inputs":[],"name":"serverKeyGenerationRequestsCount","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"authority","type":"address"},{"name":"index","type":"uint256"}],"name":"getServerKeyGenerationRequest","outputs":[{"name":"","type":"bytes32"},{"name":"","type":"uint256"},{"name":"","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"threshold","type":"uint256"}],"name":"generateServerKey","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[],"name":"serverKeyGenerationFee","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[],"name":"drain","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":true,"name":"threshold","type":"uint256"}],"name":"ServerKeyRequested","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":false,"name":"serverKeyPublic","type":"bytes"}],"name":"ServerKeyGenerated","type":"event"}]
[
{"constant":true,"inputs":[],"name":"serverKeyGenerationRequestsCount","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},
{"constant":true,"inputs":[{"name":"authority","type":"address"},{"name":"index","type":"uint256"}],"name":"getServerKeyGenerationRequest","outputs":[{"name":"","type":"bytes32"},{"name":"","type":"uint256"},{"name":"","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},
{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},
{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":true,"name":"threshold","type":"uint256"}],"name":"ServerKeyRequested","type":"event"}
]

View File

@ -204,6 +204,7 @@ pub mod tests {
use std::collections::BTreeSet;
use std::time;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::net::SocketAddr;
use std::collections::BTreeMap;
use ethcrypto;
@ -219,7 +220,10 @@ pub mod tests {
use traits::{AdminSessionsServer, ServerKeyGenerator, DocumentKeyServer, MessageSigner, KeyServer};
use super::KeyServerImpl;
pub struct DummyKeyServer;
#[derive(Default)]
pub struct DummyKeyServer {
pub generation_requests_count: AtomicUsize,
}
impl KeyServer for DummyKeyServer {}
@ -231,7 +235,8 @@ pub mod tests {
impl ServerKeyGenerator for DummyKeyServer {
fn generate_key(&self, _key_id: &ServerKeyId, _signature: &RequestSignature, _threshold: usize) -> Result<Public, Error> {
unimplemented!()
self.generation_requests_count.fetch_add(1, Ordering::Relaxed);
Err(Error::Internal("test error".into()))
}
}

View File

@ -1014,12 +1014,21 @@ pub mod tests {
use std::collections::{BTreeSet, VecDeque};
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use ethkey::{Random, Generator, Public, sign};
use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet, PlainNodeKeyPair};
use bigint::hash::H256;
use ethkey::{Random, Generator, Public, Signature, sign};
use key_server_cluster::{NodeId, SessionId, Error, DummyAclStorage, DummyKeyStorage, MapKeyServerSet, PlainNodeKeyPair, KeyStorage};
use key_server_cluster::message::Message;
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration};
use key_server_cluster::cluster_sessions::ClusterSession;
use key_server_cluster::generation_session::SessionState as GenerationSessionState;
use key_server_cluster::cluster::{Cluster, ClusterCore, ClusterConfiguration, ClusterClient, ClusterState};
use key_server_cluster::cluster_sessions::{ClusterSession, AdminSession, ClusterSessionsListener};
use key_server_cluster::generation_session::{SessionImpl as GenerationSession, SessionState as GenerationSessionState};
use key_server_cluster::decryption_session::{SessionImpl as DecryptionSession};
use key_server_cluster::encryption_session::{SessionImpl as EncryptionSession};
use key_server_cluster::signing_session::{SessionImpl as SigningSession};
use key_server_cluster::key_version_negotiation_session::{SessionImpl as KeyVersionNegotiationSession,
IsolatedSessionTransport as KeyVersionNegotiationSessionTransport, ContinueAction};
#[derive(Default)]
pub struct DummyClusterClient;
#[derive(Debug)]
pub struct DummyCluster {
@ -1033,6 +1042,23 @@ pub mod tests {
messages: VecDeque<(NodeId, Message)>,
}
impl ClusterClient for DummyClusterClient {
fn cluster_state(&self) -> ClusterState { unimplemented!() }
fn new_generation_session(&self, session_id: SessionId, author: Public, threshold: usize) -> Result<Arc<GenerationSession>, Error> { unimplemented!() }
fn new_encryption_session(&self, session_id: SessionId, requestor_signature: Signature, common_point: Public, encrypted_point: Public) -> Result<Arc<EncryptionSession>, Error> { unimplemented!() }
fn new_decryption_session(&self, session_id: SessionId, requestor_signature: Signature, version: Option<H256>, is_shadow_decryption: bool) -> Result<Arc<DecryptionSession>, Error> { unimplemented!() }
fn new_signing_session(&self, session_id: SessionId, requestor_signature: Signature, version: Option<H256>, message_hash: H256) -> Result<Arc<SigningSession>, Error> { unimplemented!() }
fn new_key_version_negotiation_session(&self, session_id: SessionId) -> Result<Arc<KeyVersionNegotiationSession<KeyVersionNegotiationSessionTransport>>, Error> { unimplemented!() }
fn new_servers_set_change_session(&self, session_id: Option<SessionId>, new_nodes_set: BTreeSet<NodeId>, old_set_signature: Signature, new_set_signature: Signature) -> Result<Arc<AdminSession>, Error> { unimplemented!() }
fn add_generation_listener(&self, listener: Arc<ClusterSessionsListener<GenerationSession>>) {}
fn make_faulty_generation_sessions(&self) { unimplemented!() }
fn generation_session(&self, session_id: &SessionId) -> Option<Arc<GenerationSession>> { unimplemented!() }
fn connect(&self) { unimplemented!() }
fn key_storage(&self) -> Arc<KeyStorage> { unimplemented!() }
}
impl DummyCluster {
pub fn new(id: NodeId) -> Self {
DummyCluster {

View File

@ -28,6 +28,8 @@ pub use super::key_server_set::KeyServerSet;
pub use super::serialization::{SerializableSignature, SerializableH256, SerializableSecret, SerializablePublic, SerializableMessageHash};
pub use self::cluster::{ClusterCore, ClusterConfiguration, ClusterClient};
pub use self::cluster_sessions::{ClusterSession, ClusterSessionsListener};
#[cfg(test)]
pub use self::cluster::tests::DummyClusterClient;
#[cfg(test)]
pub use super::node_key_pair::PlainNodeKeyPair;

View File

@ -35,6 +35,7 @@ type CurrentSerializableDocumentKeyVersion = SerializableDocumentKeyShareVersion
/// Encrypted key share, stored by key storage on the single key server.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(test, derive(Default))]
pub struct DocumentKeyShare {
/// Author of the entry.
pub author: Public,

View File

@ -85,15 +85,16 @@ pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, self_key_pair: Arc<No
Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?),
None => None,
};
let service_contract = Arc::new(listener::service_contract::OnChainServiceContract::new(&client, &sync, self_key_pair.clone()));
let contract_listener = listener::service_contract_listener::ServiceContractListener::new(listener::service_contract_listener::ServiceContractListenerParams {
client: Arc::downgrade(&client),
sync: Arc::downgrade(&sync),
contract: service_contract,
key_server: key_server.clone(),
self_key_pair: self_key_pair,
key_servers_set: key_server_set,
key_server_set: key_server_set,
cluster: cluster,
key_storage: key_storage,
});
client.add_notify(contract_listener.clone());
let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener));
Ok(Box::new(listener))
}

View File

@ -334,7 +334,7 @@ mod tests {
#[test]
fn http_listener_successfully_drops() {
let key_server = Arc::new(DummyKeyServer);
let key_server = Arc::new(DummyKeyServer::default());
let address = NodeAddress { address: "127.0.0.1".into(), port: 9000 };
let listener = KeyServerHttpListener::start(address, key_server).unwrap();
drop(listener);

View File

@ -1,4 +1,21 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
pub mod http_listener;
pub mod service_contract;
pub mod service_contract_listener;
use std::collections::BTreeSet;

View File

@ -0,0 +1,306 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{VecDeque, HashSet};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use futures::{future, Future};
use parking_lot::{RwLock, Mutex, Condvar};
use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
use ethkey::{Random, Generator, Public, Signature, sign, public_to_address};
use ethsync::SyncProvider;
use native_contracts::SecretStoreService;
use bytes::Bytes;
use hash::keccak;
use bigint::hash::H256;
use bigint::prelude::U256;
use util::Address;
use key_server_set::KeyServerSet;
use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession};
use key_server_cluster::generation_session::SessionImpl as GenerationSession;
use key_storage::KeyStorage;
use listener::service_contract_listener::{ServiceTask, ServiceContractListenerParams};
use {ServerKeyId, NodeKeyPair, KeyServer};
/// Name of the SecretStore contract in the registry.
const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service";
/// Key server has been added to the set.
const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32,uint256)";
lazy_static! {
static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME);
}
/// Service contract trait.
pub trait ServiceContract: Send + Sync {
/// Update contract.
fn update(&self);
/// Is contract installed && up-to-date (i.e. chain is synced)?
fn is_actual(&self) -> bool;
/// Read contract logs from given blocks. Returns topics of every entry.
fn read_logs(&self, first_block: H256, last_block: H256) -> Box<Iterator<Item=Vec<H256>>>;
/// Publish generated key.
fn read_pending_requests(&self) -> Box<Iterator<Item=(bool, ServiceTask)>>;
/// Publish server key.
fn publish_server_key(&self, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String>;
}
/// On-chain service contract.
pub struct OnChainServiceContract {
/// Blockchain client.
client: Weak<Client>,
/// Sync provider.
sync: Weak<SyncProvider>,
/// This node key pair.
self_key_pair: Arc<NodeKeyPair>,
/// Contract.
contract: RwLock<Arc<SecretStoreService>>,
}
/// Pending requests iterator.
struct PendingRequestsIterator {
/// Blockchain client.
client: Arc<Client>,
/// Contract.
contract: Arc<SecretStoreService>,
/// This node key pair.
self_key_pair: Arc<NodeKeyPair>,
/// Current request index.
index: U256,
/// Requests length.
length: U256,
}
impl OnChainServiceContract {
/// Create new on-chain service contract.
pub fn new(client: &Arc<Client>, sync: &Arc<SyncProvider>, self_key_pair: Arc<NodeKeyPair>) -> Self {
let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
.map(|address| {
trace!(target: "secretstore", "{}: installing service contract from address {}",
self_key_pair.public(), address);
address
})
.unwrap_or_default();
OnChainServiceContract {
client: Arc::downgrade(client),
sync: Arc::downgrade(sync),
self_key_pair: self_key_pair,
contract: RwLock::new(Arc::new(SecretStoreService::new(contract_addr))),
}
}
}
impl ServiceContract for OnChainServiceContract {
fn update(&self) {
if let (Some(client), Some(sync)) = (self.client.upgrade(), self.sync.upgrade()) {
// do nothing until synced
if sync.status().is_syncing(client.queue_info()) {
return;
}
// update contract address from registry
let service_contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()).unwrap_or_default();
if self.contract.read().address != service_contract_addr {
trace!(target: "secretstore", "{}: installing service contract from address {}",
self.self_key_pair.public(), service_contract_addr);
*self.contract.write() = Arc::new(SecretStoreService::new(service_contract_addr));
}
}
}
fn is_actual(&self) -> bool {
self.contract.read().address != Default::default()
&& match (self.client.upgrade(), self.sync.upgrade()) {
(Some(client), Some(sync)) => !sync.status().is_syncing(client.queue_info()),
_ => false,
}
}
fn read_logs(&self, first_block: H256, last_block: H256) -> Box<Iterator<Item=Vec<H256>>> {
let client = match self.client.upgrade() {
Some(client) => client,
None => {
warn!(target: "secretstore", "{}: client is offline during read_pending_requests call",
self.self_key_pair.public());
return Box::new(::std::iter::empty());
},
};
// read server key generation requests
let contract_address = self.contract.read().address.clone();
let request_logs = client.logs(Filter {
from_block: BlockId::Hash(first_block),
to_block: BlockId::Hash(last_block),
address: Some(vec![contract_address]),
topics: vec![
Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: None,
});
Box::new(request_logs.into_iter().map(|log| log.entry.topics))
}
fn read_pending_requests(&self) -> Box<Iterator<Item=(bool, ServiceTask)>> {
let client = match self.client.upgrade() {
Some(client) => client,
None => {
warn!(target: "secretstore", "{}: client is offline during read_pending_requests call",
self.self_key_pair.public());
return Box::new(::std::iter::empty());
},
};
let contract = self.contract.read();
let length = match contract.address == Default::default() {
true => 0.into(),
false => {
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
contract.server_key_generation_requests_count(&do_call).wait()
.map_err(|error| {
warn!(target: "secretstore", "{}: call to server_key_generation_requests_count failed: {}",
self.self_key_pair.public(), error);
error
})
.unwrap_or_default()
},
};
Box::new(PendingRequestsIterator {
client: client,
contract: contract.clone(),
self_key_pair: self.self_key_pair.clone(),
index: 0.into(),
length: length,
})
}
fn publish_server_key(&self, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> {
let server_key_hash = keccak(server_key);
let signed_server_key = self.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?;
let signed_server_key: Signature = signed_server_key.into_electrum().into();
let contract = self.contract.read();
let transaction_data = contract.encode_server_key_generated_input(server_key_id.clone(),
server_key.to_vec(),
signed_server_key.v(),
signed_server_key.r().into(),
signed_server_key.s().into()
)?;
if contract.address != Default::default() {
if let Some(client) = self.client.upgrade() {
client.transact_contract(
contract.address.clone(),
transaction_data
).map_err(|e| format!("{}", e))?;
} // else we will read this in the next refresh cycle
}
Ok(())
}
}
impl Iterator for PendingRequestsIterator {
type Item = (bool, ServiceTask);
fn next(&mut self) -> Option<(bool, ServiceTask)> {
if self.index >= self.length {
return None;
}
self.index = self.index + 1.into();
let do_call = |a, d| future::done(self.client.call_contract(BlockId::Latest, a, d));
let key_generation_request = self.contract.get_server_key_generation_request(&do_call,
public_to_address(self.self_key_pair.public()),
(self.index - 1.into()).clone().into()).wait();
let (server_key_id, threshold, is_confirmed) = match key_generation_request {
Ok((server_key_id, threshold, is_confirmed)) => {
(server_key_id, threshold, is_confirmed)
},
Err(error) => {
warn!(target: "secretstore", "{}: call to get_server_key_generation_request failed: {}",
self.self_key_pair.public(), error);
return None;
},
};
Some((is_confirmed, ServiceTask::GenerateServerKey(server_key_id, threshold.into())))
}
}
#[cfg(test)]
pub mod tests {
use std::collections::{VecDeque, HashSet};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use futures::{future, Future};
use parking_lot::{RwLock, Mutex, Condvar};
use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
use ethkey::{Random, Generator, Public, Signature, sign, public_to_address};
use ethsync::SyncProvider;
use native_contracts::SecretStoreService;
use bytes::Bytes;
use hash::keccak;
use bigint::hash::H256;
use bigint::prelude::U256;
use util::Address;
use key_server_set::KeyServerSet;
use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession};
use key_server_cluster::generation_session::SessionImpl as GenerationSession;
use key_storage::KeyStorage;
use listener::service_contract_listener::{ServiceTask, ServiceContractListenerParams};
use {ServerKeyId, NodeKeyPair, KeyServer};
use super::ServiceContract;
#[derive(Default)]
pub struct DummyServiceContract {
pub is_actual: bool,
pub logs: Vec<Vec<H256>>,
pub pending_requests: Vec<(bool, ServiceTask)>,
pub published_keys: Mutex<Vec<(ServerKeyId, Public)>>,
}
impl ServiceContract for DummyServiceContract {
fn update(&self) {
}
fn is_actual(&self) -> bool {
self.is_actual
}
fn read_logs(&self, first_block: H256, last_block: H256) -> Box<Iterator<Item=Vec<H256>>> {
Box::new(self.logs.clone().into_iter())
}
fn read_pending_requests(&self) -> Box<Iterator<Item=(bool, ServiceTask)>> {
Box::new(self.pending_requests.clone().into_iter())
}
fn publish_server_key(&self, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> {
self.published_keys.lock().push((server_key_id.clone(), server_key.clone()));
Ok(())
}
}
}

View File

@ -34,14 +34,9 @@ use key_server_set::KeyServerSet;
use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession};
use key_server_cluster::generation_session::SessionImpl as GenerationSession;
use key_storage::KeyStorage;
use listener::service_contract::ServiceContract;
use {ServerKeyId, NodeKeyPair, KeyServer};
/// Name of the SecretStore contract in the registry.
const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service";
/// Key server has been added to the set.
const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32,uint256)";
/// Retry interval (in blocks). Every RETRY_INTEVAL_BLOCKS blocks each KeyServer reads pending requests from
/// service contract && tries to re-execute. The reason to have this mechanism is primarily because keys
/// servers set change takes a lot of time + there could be some races, when blocks are coming to different
@ -52,10 +47,6 @@ const RETRY_INTEVAL_BLOCKS: usize = 30;
/// pending requests have failed, then most probably other will fail too.
const MAX_FAILED_RETRY_REQUESTS: usize = 1;
lazy_static! {
static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME);
}
/// SecretStore <-> Authority connector responsible for:
/// 1. listening for new requests on SecretStore contract
/// 2. redirecting requests to key server
@ -69,16 +60,14 @@ pub struct ServiceContractListener {
/// Service contract listener parameters.
pub struct ServiceContractListenerParams {
/// Blockchain client.
pub client: Weak<Client>,
/// Sync provider.
pub sync: Weak<SyncProvider>,
/// Service contract.
pub contract: Arc<ServiceContract>,
/// Key server reference.
pub key_server: Arc<KeyServer>,
/// This node key pair.
pub self_key_pair: Arc<NodeKeyPair>,
/// Key servers set.
pub key_servers_set: Arc<KeyServerSet>,
pub key_server_set: Arc<KeyServerSet>,
/// Cluster reference.
pub cluster: Arc<ClusterClient>,
/// Key storage reference.
@ -91,12 +80,19 @@ struct ServiceContractListenerData {
pub last_retry: AtomicUsize,
/// Retry-related data.
pub retry_data: Mutex<ServiceContractRetryData>,
/// Contract.
pub contract: RwLock<SecretStoreService>,
/// Service tasks queue.
pub tasks_queue: Arc<TasksQueue>,
/// Cluster params.
pub params: ServiceContractListenerParams,
/// Service contract.
pub contract: Arc<ServiceContract>,
/// Key server reference.
pub key_server: Arc<KeyServer>,
/// This node key pair.
pub self_key_pair: Arc<NodeKeyPair>,
/// Key servers set.
pub key_server_set: Arc<KeyServerSet>,
/// Key storage reference.
pub key_storage: Arc<KeyStorage>,
}
/// Retry-related data.
@ -115,8 +111,8 @@ struct TasksQueue {
}
/// Service task.
#[derive(Debug)]
enum ServiceTask {
#[derive(Debug, Clone, PartialEq)]
pub enum ServiceTask {
/// Retry all 'stalled' tasks.
Retry,
/// Generate server key (server_key_id, threshold).
@ -130,44 +126,58 @@ enum ServiceTask {
impl ServiceContractListener {
/// Create new service contract listener.
pub fn new(params: ServiceContractListenerParams) -> Arc<ServiceContractListener> {
let client = params.client.upgrade().expect("client is active in constructor; qed");
let sync = params.sync.upgrade().expect("sync is active in constructor; qed");
let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
.map(|address| {
trace!(target: "secretstore", "{}: installing service contract from address {}",
params.self_key_pair.public(), address);
address
})
.unwrap_or_default();
let is_syncing = sync.status().is_syncing(client.queue_info());
let data = Arc::new(ServiceContractListenerData {
last_retry: AtomicUsize::new(0),
retry_data: Default::default(),
contract: RwLock::new(SecretStoreService::new(contract_addr)),
tasks_queue: Arc::new(TasksQueue::new()),
params: params,
contract: params.contract,
key_server: params.key_server,
self_key_pair: params.self_key_pair,
key_server_set: params.key_server_set,
key_storage: params.key_storage,
});
data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
// retry on restart
if !is_syncing {
data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
}
let service_thread_data = data.clone();
let service_handle = thread::spawn(move || Self::run_service_thread(service_thread_data));
// we are not starting thread when in test mode
let service_handle = if cfg!(test) {
None
} else {
let service_thread_data = data.clone();
Some(thread::spawn(move || Self::run_service_thread(service_thread_data)))
};
let contract = Arc::new(ServiceContractListener {
data: data,
service_handle: Some(service_handle),
service_handle: service_handle,
});
client.add_notify(contract.clone());
contract.data.params.cluster.add_generation_listener(contract.clone());
params.cluster.add_generation_listener(contract.clone());
contract
}
/// Process incoming events of service contract.
fn process_service_contract_events(&self, client: &Client, service_contract: Address, blocks: Vec<H256>) {
debug_assert!(!blocks.is_empty());
fn process_service_contract_events(&self, first: H256, last: H256) {
self.data.tasks_queue.push(self.data.contract.read_logs(first, last)
.filter_map(|topics| match topics.len() {
// when key is already generated && we have this key
3 if self.data.key_storage.get(&topics[1]).map(|k| k.is_some()).unwrap_or_default() => {
Some(ServiceTask::RestoreServerKey(
topics[1],
))
}
// when key is not yet generated && this node should be master of this key generation session
3 if is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &topics[1]) => {
Some(ServiceTask::GenerateServerKey(
topics[1],
topics[2],
))
},
3 => None,
l @ _ => {
warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l);
None
},
}));
/* debug_assert!(!blocks.is_empty());
// read server key generation requests
let request_logs = client.logs(Filter {
@ -187,13 +197,13 @@ impl ServiceContractListener {
self.data.tasks_queue.push(request_logs.into_iter()
.filter_map(|r| match r.entry.topics.len() {
// when key is already generated && we have this key
3 if self.data.params.key_storage.get(&r.entry.topics[1]).map(|k| k.is_some()).unwrap_or_default() => {
3 if self.data.key_storage.get(&r.entry.topics[1]).map(|k| k.is_some()).unwrap_or_default() => {
Some(ServiceTask::RestoreServerKey(
r.entry.topics[1],
))
}
// when key is not yet generated && this node should be master of this key generation session
3 if is_processed_by_this_key_server(&*self.data.params.key_servers_set, &*self.data.params.self_key_pair, &r.entry.topics[1]) => {
3 if is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &r.entry.topics[1]) => {
Some(ServiceTask::GenerateServerKey(
r.entry.topics[1],
r.entry.topics[2],
@ -204,14 +214,14 @@ impl ServiceContractListener {
warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l);
None
},
}));
}));*/
}
/// Service thread procedure.
fn run_service_thread(data: Arc<ServiceContractListenerData>) {
loop {
let task = data.tasks_queue.wait();
trace!(target: "secretstore", "{}: processing {:?} task", data.params.self_key_pair.public(), task);
trace!(target: "secretstore", "{}: processing {:?} task", data.self_key_pair.public(), task);
match task {
ServiceTask::Shutdown => break,
@ -231,13 +241,13 @@ impl ServiceContractListener {
.map(|processed_requests| {
if processed_requests != 0 {
trace!(target: "secretstore", "{}: successfully retried {} pending requests",
data.params.self_key_pair.public(), processed_requests);
data.self_key_pair.public(), processed_requests);
}
()
})
.map_err(|error| {
warn!(target: "secretstore", "{}: retrying pending requests has failed with: {}",
data.params.self_key_pair.public(), error);
data.self_key_pair.public(), error);
error
}),
ServiceTask::RestoreServerKey(server_key_id) => {
@ -246,27 +256,27 @@ impl ServiceContractListener {
.and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key))
.map(|_| {
trace!(target: "secretstore", "{}: processed RestoreServerKey({}) request",
data.params.self_key_pair.public(), server_key_id);
data.self_key_pair.public(), server_key_id);
()
})
.map_err(|error| {
warn!(target: "secretstore", "{}: failed to process RestoreServerKey({}) request with: {}",
data.params.self_key_pair.public(), server_key_id, error);
data.self_key_pair.public(), server_key_id, error);
error
})
}
},
ServiceTask::GenerateServerKey(server_key_id, threshold) => {
data.retry_data.lock().generated_keys.insert(server_key_id.clone());
Self::generate_server_key(&data, &server_key_id, &threshold)
.and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key))
.map(|_| {
trace!(target: "secretstore", "{}: processed GenerateServerKey({}, {}) request",
data.params.self_key_pair.public(), server_key_id, threshold);
data.self_key_pair.public(), server_key_id, threshold);
()
})
.map_err(|error| {
warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}, {}) request with: {}",
data.params.self_key_pair.public(), server_key_id, threshold, error);
data.self_key_pair.public(), server_key_id, threshold, error);
error
})
},
@ -276,49 +286,33 @@ impl ServiceContractListener {
/// Retry processing pending requests.
fn retry_pending_requests(data: &Arc<ServiceContractListenerData>) -> Result<usize, String> {
let client = data.params.client.upgrade().ok_or("client is required".to_owned())?;
let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default());
let contract = data.contract.read();
// it is only possible when contract address is set
if contract.address == Default::default() {
return Ok(0);
}
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
let generate_server_key_requests_count = contract.server_key_generation_requests_count(&do_call).wait()?;
let mut generate_server_key_request_index = 0.into();
let mut failed_requests = 0;
let mut processed_requests = 0;
loop {
if generate_server_key_request_index >= generate_server_key_requests_count {
break;
}
// read request from the contract
let (server_key_id, threshold, is_confirmed) = contract.get_server_key_generation_request(&do_call,
public_to_address(data.params.self_key_pair.public()),
generate_server_key_request_index).wait()?;
generate_server_key_request_index = generate_server_key_request_index + 1.into();
let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default());
for (is_confirmed, task) in data.contract.read_pending_requests() {
// only process requests, which we haven't confirmed yet
if is_confirmed {
continue;
}
// only process request, which haven't been processed recently
// there could be a lag when we've just generated server key && retrying on the same block
// (or before our tx is mined) - state is not updated yet
if retry_data.generated_keys.contains(&server_key_id){
continue;
}
let request_result = match task {
ServiceTask::GenerateServerKey(server_key_id, threshold) => {
// only process request, which haven't been processed recently
// there could be a lag when we've just generated server key && retrying on the same block
// (or before our tx is mined) - state is not updated yet
if retry_data.generated_keys.contains(&server_key_id){
continue;
}
// process request
let is_own_request = is_processed_by_this_key_server(&*data.params.key_servers_set, &*data.params.self_key_pair, &server_key_id);
let request_result = Self::process_service_task(data, match is_own_request {
true => ServiceTask::GenerateServerKey(server_key_id, threshold.into()),
false => ServiceTask::RestoreServerKey(server_key_id),
});
// process request
let is_own_request = is_processed_by_this_key_server(&*data.key_server_set, &*data.self_key_pair, &server_key_id);
Self::process_service_task(data, match is_own_request {
true => ServiceTask::GenerateServerKey(server_key_id, threshold.into()),
false => ServiceTask::RestoreServerKey(server_key_id),
})
},
_ => Err("not supported".into()),
};
// process request result
match request_result {
@ -331,6 +325,7 @@ impl ServiceContractListener {
},
}
}
Ok(processed_requests)
}
@ -346,13 +341,13 @@ impl ServiceContractListener {
// => this API (server key generation) is not suitable for usage in encryption via contract endpoint
let author_key = Random.generate().map_err(|e| format!("{}", e))?;
let server_key_id_signature = sign(author_key.secret(), server_key_id).map_err(|e| format!("{}", e))?;
data.params.key_server.generate_key(server_key_id, &server_key_id_signature, threshold_num as usize)
data.key_server.generate_key(server_key_id, &server_key_id_signature, threshold_num as usize)
.map_err(Into::into)
}
/// Restore server key.
fn restore_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId) -> Result<Public, String> {
data.params.key_storage.get(server_key_id)
data.key_storage.get(server_key_id)
.map_err(|e| format!("{}", e))
.and_then(|ks| ks.ok_or("missing key".to_owned()))
.map(|ks| ks.public)
@ -360,8 +355,9 @@ impl ServiceContractListener {
/// Publish server key.
fn publish_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> {
let server_key_hash = keccak(server_key);
let signed_server_key = data.params.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?;
data.contract.publish_server_key(server_key_id, server_key)
/*let server_key_hash = keccak(server_key);
let signed_server_key = data.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?;
let signed_server_key: Signature = signed_server_key.into_electrum().into();
let transaction_data = data.contract.read().encode_server_key_generated_input(server_key_id.clone(),
server_key.to_vec(),
@ -372,7 +368,7 @@ impl ServiceContractListener {
let contract = data.contract.read();
if contract.address != Default::default() {
if let Some(client) = data.params.client.upgrade() {
if let Some(client) = data.client.upgrade() {
client.transact_contract(
contract.address.clone(),
transaction_data
@ -381,6 +377,7 @@ impl ServiceContractListener {
}
Ok(())
unimplemented!()*/
}
}
@ -397,33 +394,52 @@ impl Drop for ServiceContractListener {
impl ChainNotify for ServiceContractListener {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
let enacted_len = enacted.len();
if enacted_len != 0 {
if let (Some(client), Some(sync)) = (self.data.params.client.upgrade(), self.data.params.sync.upgrade()) {
// do nothing until synced
if sync.status().is_syncing(client.queue_info()) {
return;
}
// update contract address from registry
if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) {
if self.data.contract.read().address != service_contract_addr {
trace!(target: "secretstore", "{}: installing service contract from address {}",
self.data.params.self_key_pair.public(), service_contract_addr);
*self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone());
}
// and process contract events
self.process_service_contract_events(&*client, service_contract_addr, enacted);
}
// schedule retry if received enough blocks since last retry
// it maybe inaccurate when switching syncing/synced states, but that's ok
if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTEVAL_BLOCKS {
self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
self.data.last_retry.store(0, Ordering::Relaxed);
}
}
if enacted_len == 0 {
return;
}
if !self.data.contract.is_actual() {
return;
}
self.data.contract.update();
self.process_service_contract_events(
enacted.first().expect("TODO").clone(),
enacted.last().expect("TODO").clone());
// schedule retry if received enough blocks since last retry
// it maybe inaccurate when switching syncing/synced states, but that's ok
if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTEVAL_BLOCKS {
self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
self.data.last_retry.store(0, Ordering::Relaxed);
}
/* if let (Some(client), Some(sync)) = (self.data.client.upgrade(), self.data.sync.upgrade()) {
// do nothing until synced
if sync.status().is_syncing(client.queue_info()) {
return;
}
// update contract address from registry
if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) {
if self.data.contract.read().address != service_contract_addr {
trace!(target: "secretstore", "{}: installing service contract from address {}",
self.data.self_key_pair.public(), service_contract_addr);
*self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone());
}
// and process contract events
self.process_service_contract_events(&*client, service_contract_addr, enacted);
}
// schedule retry if received enough blocks since last retry
// it maybe inaccurate when switching syncing/synced states, but that's ok
if self.data.last_retry.fetch_add(enacted_len, Ordering::Relaxed) >= RETRY_INTEVAL_BLOCKS {
self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
self.data.last_retry.store(0, Ordering::Relaxed);
}
}*/
}
}
@ -433,7 +449,7 @@ impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
// only publish when the session is started by another node
// when it is started by this node, it is published from process_service_task
if !is_processed_by_this_key_server(&*self.data.params.key_servers_set, &*self.data.params.self_key_pair, &session.id()) {
if !is_processed_by_this_key_server(&*self.data.key_server_set, &*self.data.self_key_pair, &session.id()) {
// by this time sesion must already be completed - either successfully, or not
debug_assert!(session.is_finished());
@ -443,12 +459,12 @@ impl ClusterSessionsListener<GenerationSession> for ServiceContractListener {
.and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key))
.map(|_| {
trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request",
self.data.params.self_key_pair.public(), session.id());
self.data.self_key_pair.public(), session.id());
()
})
.map_err(|error| {
warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}",
self.data.params.self_key_pair.public(), session.id(), error);
self.data.self_key_pair.public(), session.id(), error);
error
});
}
@ -493,8 +509,8 @@ impl TasksQueue {
}
/// Returns true when session, related to `server_key_id` must be started on this KeyServer.
fn is_processed_by_this_key_server(key_servers_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
let servers = key_servers_set.get();
fn is_processed_by_this_key_server(key_server_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
let servers = key_server_set.get();
let total_servers_count = servers.len();
if total_servers_count == 0 {
return false;
@ -514,10 +530,39 @@ fn is_processed_by_this_key_server(key_servers_set: &KeyServerSet, self_key_pair
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::Ordering;
use ethkey::{Random, Generator, KeyPair};
use listener::service_contract::ServiceContract;
use listener::service_contract::tests::DummyServiceContract;
use key_server_cluster::DummyClusterClient;
use key_server::tests::DummyKeyServer;
use key_storage::tests::DummyKeyStorage;
use key_server_set::tests::MapKeyServerSet;
use PlainNodeKeyPair;
use super::is_processed_by_this_key_server;
use super::{ServiceTask, ServiceContractListener, ServiceContractListenerParams, is_processed_by_this_key_server};
fn make_service_contract_listener(contract: Option<Arc<ServiceContract>>, key_server: Option<Arc<DummyKeyServer>>) -> Arc<ServiceContractListener> {
let contract = contract.unwrap_or_else(|| Arc::new(DummyServiceContract::default()));
let key_server = key_server.unwrap_or_else(|| Arc::new(DummyKeyServer::default()));
let servers_set = Arc::new(MapKeyServerSet::new(vec![
("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
("c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee51ae168fea63dc339a3c58419466ceaeef7f632653266d0e1236431a950cfe52a".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
("f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9388f7b0f632de8140fe337e62a37f3566500a99934c2231b6cb9fd7584b8e672".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
].into_iter().collect()));
let self_key_pair = Arc::new(PlainNodeKeyPair::new(KeyPair::from_secret("0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap()));
ServiceContractListener::new(ServiceContractListenerParams {
contract: contract,
key_server: key_server,
self_key_pair: self_key_pair,
key_server_set: servers_set,
cluster: Arc::new(DummyClusterClient::default()),
key_storage: Arc::new(DummyKeyStorage::default()),
})
}
#[test]
fn is_not_processed_by_this_key_server_with_zero_servers() {
@ -661,4 +706,64 @@ mod tests {
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
}
#[test]
fn no_tasks_scheduled_when_no_contract_events() {
let listener = make_service_contract_listener(None, None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
}
#[test]
fn server_key_generation_is_scheduled_when_requested_key_is_unknnown() {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::GenerateServerKey(Default::default(), Default::default())));
}
#[test]
fn no_new_tasks_scheduled_when_requested_key_is_unknown_and_request_belongs_to_other_key_server() {
let server_key_id = "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap();
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), server_key_id, Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
}
#[test]
fn server_key_restore_is_scheduled_when_requested_key_is_knnown() {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None);
listener.data.key_storage.insert(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 2);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().pop_back(), Some(ServiceTask::RestoreServerKey(Default::default())));
}
#[test]
fn no_new_tasks_scheduled_when_wrong_number_of_topics_in_log() {
let mut contract = DummyServiceContract::default();
contract.logs.push(vec![Default::default(), Default::default()]);
let listener = make_service_contract_listener(Some(Arc::new(contract)), None);
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
listener.process_service_contract_events(Default::default(), Default::default());
assert_eq!(listener.data.tasks_queue.service_tasks.lock().len(), 1);
}
#[test]
fn generation_session_is_created_when_processing_generate_server_key_task() {
let key_server = Arc::new(DummyKeyServer::default());
let listener = make_service_contract_listener(None, Some(key_server.clone()));
ServiceContractListener::process_service_task(&listener.data, ServiceTask::GenerateServerKey(Default::default(), Default::default())).unwrap_err();
assert_eq!(key_server.generation_requests_count.load(Ordering::Relaxed), 1);
}
}