diff --git a/ethcore/native_contracts/res/secretstore_service.json b/ethcore/native_contracts/res/secretstore_service.json index 3c9510bb5..0189691a1 100644 --- a/ethcore/native_contracts/res/secretstore_service.json +++ b/ethcore/native_contracts/res/secretstore_service.json @@ -1,3 +1,3 @@ [ - {"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"} + {"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"threshold","type":"uint256"}],"name":"generateServerKey","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[],"name":"serverKeyGenerationFee","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"}],"name":"ServerKeyRequested","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":false,"name":"serverKeyPublic","type":"bytes"}],"name":"ServerKeyGenerated","type":"event"} ] \ No newline at end of file diff --git a/secret_store/src/listener/http_listener.rs b/secret_store/src/listener/http_listener.rs index cb6530f40..c1f5fc3fd 100644 --- a/secret_store/src/listener/http_listener.rs +++ b/secret_store/src/listener/http_listener.rs @@ -287,7 +287,7 @@ mod tests { fn http_listener_successfully_drops() { let key_server = Arc::new(DummyKeyServer); let address = NodeAddress { address: "127.0.0.1".into(), port: 9000 }; - let listener = KeyServerHttpListener::start(Some(address), key_server).unwrap(); + let listener = KeyServerHttpListener::start(address, key_server).unwrap(); drop(listener); } diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index d30e0100c..ae3ec0736 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -14,17 +14,20 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::collections::VecDeque; use std::sync::{Arc, Weak}; -use parking_lot::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use parking_lot::{RwLock, Mutex, Condvar}; use ethcore::filter::Filter; use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify}; use native_contracts::SecretStoreService; -use ethkey::{Random, Generator, sign}; +use ethkey::{Random, Generator, Public, Signature, sign}; use bytes::Bytes; use hash::keccak; use bigint::hash::H256; use util::Address; -use {NodeKeyPair, KeyServer}; +use {ServerKeyId, NodeKeyPair, KeyServer}; /// Name of the SecretStore contract in the registry. const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service"; @@ -36,103 +39,221 @@ lazy_static! { static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME); } -/// SecretStore <-> Authority connector. Duties: -/// 1. Listen for new requests on SecretStore contract -/// 2. Redirects requests for key server -/// 3. Publishes response on SecretStore contract +/// SecretStore <-> Authority connector responsible for: +/// 1. listening for new requests on SecretStore contract +/// 2. redirecting requests to key server +/// 3. publishing response on SecretStore contract pub struct ServiceContractListener { - /// Cached on-chain contract. - contract: Mutex, + /// Service contract listener data. + data: Arc, + /// Service thread handle. + service_handle: Option>, } -/// Cached on-chain Key Server set contract. -struct CachedContract { +/// Service contract listener data. +struct ServiceContractListenerData { + /// Contract (currently used for parameters encoding only). + pub contract: RwLock, /// Blockchain client. - client: Weak, - /// Contract. - contract: SecretStoreService, - /// Contract address. - contract_addr: Option
, + pub client: Weak, /// Key server reference. - key_server: Arc, + pub key_server: Arc, /// This node key pair. - self_key_pair: Arc, + pub self_key_pair: Arc, + /// Service tasks queue. + pub tasks_queue: Arc, +} + +/// Service tasks queue. +struct TasksQueue { + /// Are we closing currently. + is_shutdown: AtomicBool, + /// Service event. + service_event: Condvar, + /// Service tasks queue. + service_tasks: Mutex>, +} + +/// Service task. +enum ServiceTask { + /// Generate server key (server_key_id, threshold). + GenerateServerKey(H256, H256), + /// Shutdown listener. + Shutdown, } impl ServiceContractListener { pub fn new(client: &Arc, key_server: Arc, self_key_pair: Arc) -> Arc { + let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()).unwrap_or_default(); + let data = Arc::new(ServiceContractListenerData { + contract: RwLock::new(SecretStoreService::new(contract_addr)), + client: Arc::downgrade(client), + key_server: key_server, + self_key_pair: self_key_pair, + tasks_queue: Arc::new(TasksQueue::new()), + }); + + let service_thread_data = data.clone(); + let service_handle = thread::spawn(move || Self::run_service_thread(service_thread_data)); let contract = Arc::new(ServiceContractListener { - contract: Mutex::new(CachedContract::new(client, key_server, self_key_pair)), + data: data, + service_handle: Some(service_handle), }); client.add_notify(contract.clone()); contract } + + fn process_service_contract_events(&self, client: &Client, service_contract: Address, blocks: Vec) { + debug_assert!(!blocks.is_empty()); + + // TODO: is blocks guaranteed to be ordered here? + // TODO: logs() is called from notify() thread - is it ok? + let request_logs = client.logs(Filter { + from_block: BlockId::Hash(blocks.first().expect("!block.is_empty(); qed").clone()), + to_block: BlockId::Hash(blocks.last().expect("!block.is_empty(); qed").clone()), + address: Some(vec![service_contract]), + topics: vec![ + Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]), + None, + None, + None, + ], + limit: None, + }); + + self.data.tasks_queue.push(request_logs.into_iter() + .filter_map(|r| match r.entry.topics.len() { + 3 => Some(ServiceTask::GenerateServerKey( + r.entry.topics[1], + r.entry.topics[2], + )), + l @ _ => { + warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l); + None + }, + })); + } + + fn run_service_thread(data: Arc) { + loop { + let task = data.tasks_queue.wait(); + + match task { + ServiceTask::GenerateServerKey(server_key_id, threshold) => { + match Self::generate_server_key(&data, &server_key_id, &threshold) + .and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) { + Ok(_) => trace!(target: "secretstore", "GenerateServerKey({}, {}) request has completed", + server_key_id, threshold), + Err(error) => warn!(target: "secretstore", "GenerateServerKey({}, {}) request has failed with: {}", + server_key_id, threshold, error), + } + }, + ServiceTask::Shutdown => break, + } + } + } + + fn generate_server_key(data: &Arc, server_key_id: &ServerKeyId, threshold: &H256) -> Result { + let threshold_num = threshold.low_u64(); + if threshold != &threshold_num.into() || threshold_num >= ::std::usize::MAX as u64 { + return Err(format!("invalid threshold {:?}", threshold)); + } + + // TODO: if this server key is going to be used for document key generation later, author must + // be specified from outside + let author_key = Random.generate().map_err(|e| format!("{}", e))?; + let server_key_id_signature = sign(author_key.secret(), server_key_id).map_err(|e| format!("{}", e))?; + data.key_server.generate_key(server_key_id, &server_key_id_signature, threshold_num as usize) + .map_err(Into::into) + + } + + fn publish_server_key(data: &Arc, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> { + let server_key_hash = keccak(server_key); + let signed_server_key = data.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?; + let signed_server_key: Signature = signed_server_key.into_electrum().into(); + let transaction_data = data.contract.read().encode_server_key_generated_input(server_key_id.clone(), + server_key.to_vec(), + signed_server_key.v(), + signed_server_key.r().into(), + signed_server_key.s().into() + )?; + + let contract = data.contract.read(); + if contract.address != Default::default() { + if let Some(client) = data.client.upgrade() { + client.transact_contract( + contract.address.clone(), + transaction_data + ).map_err(|e| format!("{}", e))?; + } // else we will read this in the next refresh cycle + } + + Ok(()) + } +} + +impl Drop for ServiceContractListener { + fn drop(&mut self) { + if let Some(service_handle) = self.service_handle.take() { + self.data.tasks_queue.shutdown(); + // ignore error as we are already closing + let _ = service_handle.join(); + } + } } impl ChainNotify for ServiceContractListener { fn new_blocks(&self, _imported: Vec, _invalid: Vec, enacted: Vec, _retracted: Vec, _sealed: Vec, _proposed: Vec, _duration: u64) { if !enacted.is_empty() { - self.contract.lock().update(enacted) - } - } -} - -impl CachedContract { - pub fn new(client: &Arc, key_server: Arc, self_key_pair: Arc) -> Self { - CachedContract { - client: Arc::downgrade(client), - contract: SecretStoreService::new(Default::default()), // we aren't going to call contract => could use default address - contract_addr: client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()), - key_server: key_server, - self_key_pair: self_key_pair, - } - } - - pub fn update(&mut self, enacted: Vec) { - if let Some(client) = self.client.upgrade() { - // update contract address - self.contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()); - - // check for new key requests. - // NOTE: If contract is changed, or unregistered && there are several enacted blocks - // in single update call, some requests in old contract can be abandoned (we get contract_address from latest block) - // && check for requests in this contract for every enacted block. - // The opposite is also true (we can process requests of contract, before it actually becames a SS contract). - if let Some(contract_addr) = self.contract_addr.as_ref() { - // TODO: in case of reorgs we might process requests for free (maybe wait for several confirmations???) && publish keys without request - // TODO: in case of reorgs we might publish keys to forked branch (re-submit transaction???) - for block in enacted { - let request_logs = client.logs(Filter { - from_block: BlockId::Hash(block.clone()), - to_block: BlockId::Hash(block), - address: Some(vec![contract_addr.clone()]), - topics: vec![ - Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]), - None, - None, - None, - ], - limit: None, - }); - - // TODO: it actually should queue tasks to separate thread - // + separate thread at the beginning should read all requests from contract - // and then start processing logs - for request in request_logs { - // TODO: check if we are selected to process this request - let key_id = request.entry.topics[1]; - let key = Random.generate().unwrap(); - let signature = sign(key.secret(), &key_id).unwrap(); - let server_key = self.key_server.generate_key(&key_id, &signature, 0).unwrap(); -println!("=== generated key: {:?}", server_key); - // publish generated key - let server_key_hash = keccak(server_key); - let signed_key = self.self_key_pair.sign(&server_key_hash).unwrap(); - let transaction_data = self.contract.encode_server_key_generated_input(key_id, server_key.to_vec(), signed_key.v(), signed_key.r().into(), signed_key.s().into()).unwrap(); - client.transact_contract(contract_addr.clone(), transaction_data).unwrap(); + if let Some(client) = self.data.client.upgrade() { + if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) { + if self.data.contract.read().address != service_contract_addr { + *self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone()); } + self.process_service_contract_events(&*client, service_contract_addr, enacted); } } + + //self.contract.lock().update(enacted) } } } + +impl TasksQueue { + pub fn new() -> Self { + TasksQueue { + is_shutdown: AtomicBool::new(false), + service_event: Condvar::new(), + service_tasks: Mutex::new(VecDeque::new()), + } + } + + pub fn shutdown(&self) { + self.is_shutdown.store(true, Ordering::Release); + self.service_event.notify_all(); + } + + pub fn push(&self, tasks: I) where I: Iterator { + let mut service_tasks = self.service_tasks.lock(); + service_tasks.extend(tasks); + self.service_event.notify_all(); + } + + pub fn wait(&self) -> ServiceTask { + if self.is_shutdown.load(Ordering::Release) { + return ServiceTask::Shutdown; + } + + let mut service_tasks = self.service_tasks.lock(); + if service_tasks.is_empty() { + self.service_event.wait(&mut service_tasks); + if self.is_shutdown.load(Ordering::Release) { + return ServiceTask::Shutdown; + } + } + + service_tasks.pop_front() + .expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed") + } +}