diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs index 09d4ce774..a0af4f072 100644 --- a/secret_store/src/lib.rs +++ b/secret_store/src/lib.rs @@ -79,13 +79,21 @@ pub fn start(client: Arc, sync: Arc, self_key_pair: Arc Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?), None => None, }; - let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, &sync, key_server.clone(), cluster, self_key_pair, key_server_set); + let contract_listener = listener::service_contract_listener::ServiceContractListener::new(listener::service_contract_listener::ServiceContractListenerParams { + client: Arc::downgrade(&client), + sync: Arc::downgrade(&sync), + key_server: key_server.clone(), + self_key_pair: self_key_pair, + key_servers_set: key_server_set, + cluster: cluster, + key_storage: key_storage, + }); let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener)); Ok(Box::new(listener)) } diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index d53be7daa..33fde4eed 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -33,6 +33,7 @@ use util::Address; use key_server_set::KeyServerSet; use key_server_cluster::{ClusterClient, ClusterSessionsListener, ClusterSession}; use key_server_cluster::generation_session::SessionImpl as GenerationSession; +use key_storage::KeyStorage; use {ServerKeyId, NodeKeyPair, KeyServer}; /// Name of the SecretStore contract in the registry. @@ -66,14 +67,8 @@ pub struct ServiceContractListener { service_handle: Option>, } -/// Service contract listener data. -struct ServiceContractListenerData { - /// Blocks since last retry. - pub last_retry: AtomicUsize, - /// Retry-related data. - pub retry_data: Mutex, - /// Contract. - pub contract: RwLock, +/// Service contract listener parameters. +pub struct ServiceContractListenerParams { /// Blockchain client. pub client: Weak, /// Sync provider. @@ -84,8 +79,24 @@ struct ServiceContractListenerData { pub self_key_pair: Arc, /// Key servers set. pub key_servers_set: Arc, + /// Cluster reference. + pub cluster: Arc, + /// Key storage reference. + pub key_storage: Arc, +} + +/// Service contract listener data. +struct ServiceContractListenerData { + /// Blocks since last retry. + pub last_retry: AtomicUsize, + /// Retry-related data. + pub retry_data: Mutex, + /// Contract. + pub contract: RwLock, /// Service tasks queue. pub tasks_queue: Arc, + /// Cluster params. + pub params: ServiceContractListenerParams, } /// Retry-related data. @@ -111,18 +122,21 @@ enum ServiceTask { /// Generate server key (server_key_id, threshold). GenerateServerKey(H256, H256), /// Confirm server key (server_key_id). - ConfirmServerKey(H256), + RestoreServerKey(H256), /// Shutdown listener. Shutdown, } impl ServiceContractListener { /// Create new service contract listener. - pub fn new(client: &Arc, sync: &Arc, key_server: Arc, cluster: Arc, self_key_pair: Arc, key_servers_set: Arc) -> Arc { + pub fn new(params: ServiceContractListenerParams) -> Arc { + let client = params.client.upgrade().expect("client is active in constructor; qed"); + let sync = params.sync.upgrade().expect("sync is active in constructor; qed"); let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) - .map(|a| { - trace!(target: "secretstore", "{}: installing service contract from address {}", self_key_pair.public(), a); - a + .map(|address| { + trace!(target: "secretstore", "{}: installing service contract from address {}", + params.self_key_pair.public(), address); + address }) .unwrap_or_default(); @@ -131,12 +145,8 @@ impl ServiceContractListener { last_retry: AtomicUsize::new(0), retry_data: Default::default(), contract: RwLock::new(SecretStoreService::new(contract_addr)), - client: Arc::downgrade(client), - sync: Arc::downgrade(sync), - key_server: key_server, - self_key_pair: self_key_pair, - key_servers_set: key_servers_set, tasks_queue: Arc::new(TasksQueue::new()), + params: params, }); // retry on restart @@ -151,7 +161,7 @@ impl ServiceContractListener { service_handle: Some(service_handle), }); client.add_notify(contract.clone()); - cluster.add_generation_listener(contract.clone()); + contract.data.params.cluster.add_generation_listener(contract.clone()); contract } @@ -176,7 +186,14 @@ impl ServiceContractListener { // schedule correct requests if they're intended to be processed by this KeyServer self.data.tasks_queue.push(request_logs.into_iter() .filter_map(|r| match r.entry.topics.len() { - 3 if is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &r.entry.topics[1]) => { + // when key is already generated && we have this key + 3 if self.data.params.key_storage.get(&r.entry.topics[1]).map(|k| k.is_some()).unwrap_or_default() => { + Some(ServiceTask::RestoreServerKey( + r.entry.topics[1], + )) + } + // when key is not yet generated && this node should be master of this key generation session + 3 if is_processed_by_this_key_server(&*self.data.params.key_servers_set, &*self.data.params.self_key_pair, &r.entry.topics[1]) => { Some(ServiceTask::GenerateServerKey( r.entry.topics[1], r.entry.topics[2], @@ -194,7 +211,7 @@ impl ServiceContractListener { fn run_service_thread(data: Arc) { loop { let task = data.tasks_queue.wait(); - trace!(target: "secretstore", "{}: processing {:?} task",data.self_key_pair.public(), task); + trace!(target: "secretstore", "{}: processing {:?} task", data.params.self_key_pair.public(), task); match task { ServiceTask::Shutdown => break, @@ -214,28 +231,42 @@ impl ServiceContractListener { .map(|processed_requests| { if processed_requests != 0 { trace!(target: "secretstore", "{}: successfully retried {} pending requests", - data.self_key_pair.public(), processed_requests); + data.params.self_key_pair.public(), processed_requests); } () }) .map_err(|error| { warn!(target: "secretstore", "{}: retrying pending requests has failed with: {}", - data.self_key_pair.public(), error); + data.params.self_key_pair.public(), error); error }), - ServiceTask::ConfirmServerKey(_) => Err("not implemented".to_owned()), // TODO + ServiceTask::RestoreServerKey(server_key_id) => { + data.retry_data.lock().generated_keys.insert(server_key_id.clone()); + Self::restore_server_key(&data, &server_key_id) + .and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) + .map(|_| { + trace!(target: "secretstore", "{}: processed RestoreServerKey({}) request", + data.params.self_key_pair.public(), server_key_id); + () + }) + .map_err(|error| { + warn!(target: "secretstore", "{}: failed to process RestoreServerKey({}) request with: {}", + data.params.self_key_pair.public(), server_key_id, error); + error + }) + } ServiceTask::GenerateServerKey(server_key_id, threshold) => { data.retry_data.lock().generated_keys.insert(server_key_id.clone()); Self::generate_server_key(&data, &server_key_id, &threshold) .and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) .map(|_| { - trace!(target: "secretstore", "{}: started processing GenerateServerKey({}, {}) request", - data.self_key_pair.public(), server_key_id, threshold); + trace!(target: "secretstore", "{}: processed GenerateServerKey({}, {}) request", + data.params.self_key_pair.public(), server_key_id, threshold); () }) .map_err(|error| { - warn!(target: "secretstore", "{}: failed to start processing GenerateServerKey({}, {}) request with: {}", - data.self_key_pair.public(), server_key_id, threshold, error); + warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}, {}) request with: {}", + data.params.self_key_pair.public(), server_key_id, threshold, error); error }) }, @@ -245,7 +276,7 @@ impl ServiceContractListener { /// Retry processing pending requests. fn retry_pending_requests(data: &Arc) -> Result { - let client = data.client.upgrade().ok_or("client is required".to_owned())?; + let client = data.params.client.upgrade().ok_or("client is required".to_owned())?; let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default()); let contract = data.contract.read(); @@ -266,7 +297,7 @@ impl ServiceContractListener { // read request from the contract let (server_key_id, threshold, is_confirmed) = contract.get_server_key_generation_request(&do_call, - public_to_address(data.self_key_pair.public()), + public_to_address(data.params.self_key_pair.public()), generate_server_key_request_index).wait()?; generate_server_key_request_index = generate_server_key_request_index + 1.into(); @@ -283,10 +314,10 @@ impl ServiceContractListener { } // process request - let is_own_request = is_processed_by_this_key_server(&*data.key_servers_set, &*data.self_key_pair, &server_key_id); + let is_own_request = is_processed_by_this_key_server(&*data.params.key_servers_set, &*data.params.self_key_pair, &server_key_id); let request_result = Self::process_service_task(data, match is_own_request { true => ServiceTask::GenerateServerKey(server_key_id, threshold.into()), - false => ServiceTask::ConfirmServerKey(server_key_id), + false => ServiceTask::RestoreServerKey(server_key_id), }); // process request result @@ -315,14 +346,19 @@ impl ServiceContractListener { // => this API (server key generation) is not suitable for usage in encryption via contract endpoint let author_key = Random.generate().map_err(|e| format!("{}", e))?; let server_key_id_signature = sign(author_key.secret(), server_key_id).map_err(|e| format!("{}", e))?; - data.key_server.generate_key(server_key_id, &server_key_id_signature, threshold_num as usize) + data.params.key_server.generate_key(server_key_id, &server_key_id_signature, threshold_num as usize) .map_err(Into::into) } + /// Restore server key. + fn restore_server_key(data: &Arc, server_key_id: &ServerKeyId) -> Result { + unimplemented!() + } + /// Publish server key. fn publish_server_key(data: &Arc, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> { let server_key_hash = keccak(server_key); - let signed_server_key = data.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; + let signed_server_key = data.params.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; let signed_server_key: Signature = signed_server_key.into_electrum().into(); let transaction_data = data.contract.read().encode_server_key_generated_input(server_key_id.clone(), server_key.to_vec(), @@ -333,7 +369,7 @@ impl ServiceContractListener { let contract = data.contract.read(); if contract.address != Default::default() { - if let Some(client) = data.client.upgrade() { + if let Some(client) = data.params.client.upgrade() { client.transact_contract( contract.address.clone(), transaction_data @@ -359,7 +395,7 @@ impl ChainNotify for ServiceContractListener { fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, _retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { let enacted_len = enacted.len(); if enacted_len != 0 { - if let (Some(client), Some(sync)) = (self.data.client.upgrade(), self.data.sync.upgrade()) { + if let (Some(client), Some(sync)) = (self.data.params.client.upgrade(), self.data.params.sync.upgrade()) { // do nothing until synced if sync.status().is_syncing(client.queue_info()) { return; @@ -368,7 +404,8 @@ impl ChainNotify for ServiceContractListener { // update contract address from registry if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) { if self.data.contract.read().address != service_contract_addr { - trace!(target: "secretstore", "{}: installing service contract from address {}", self.data.self_key_pair.public(), service_contract_addr); + trace!(target: "secretstore", "{}: installing service contract from address {}", + self.data.params.self_key_pair.public(), service_contract_addr); *self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone()); } @@ -393,7 +430,7 @@ impl ClusterSessionsListener for ServiceContractListener { // only publish when the session is started by another node // when it is started by this node, it is published from process_service_task - if !is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &session.id()) { + if !is_processed_by_this_key_server(&*self.data.params.key_servers_set, &*self.data.params.self_key_pair, &session.id()) { // by this time sesion must already be completed - either successfully, or not debug_assert!(session.is_finished()); session.wait(Some(Default::default())) @@ -401,12 +438,12 @@ impl ClusterSessionsListener for ServiceContractListener { .and_then(|server_key| Self::publish_server_key(&self.data, &session.id(), &server_key)) .map(|_| { trace!(target: "secretstore", "{}: completed foreign GenerateServerKey({}) request", - self.data.self_key_pair.public(), session.id()); + self.data.params.self_key_pair.public(), session.id()); () }) .map_err(|error| { warn!(target: "secretstore", "{}: failed to process GenerateServerKey({}) request with: {}", - self.data.self_key_pair.public(), session.id(), error); + self.data.params.self_key_pair.public(), session.id(), error); error }); }