diff --git a/Cargo.lock b/Cargo.lock
index c3922dc48..fd4cc2e5d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -678,6 +678,7 @@ dependencies = [
  "ethcore-util 1.9.0",
  "ethcrypto 0.1.0",
  "ethkey 0.2.0",
+ "ethsync 1.8.0",
  "futures 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures-cpupool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "hash 0.1.0",
diff --git a/ethcore/native_contracts/res/secretstore_service.json b/ethcore/native_contracts/res/secretstore_service.json
index 0189691a1..48d9adaa7 100644
--- a/ethcore/native_contracts/res/secretstore_service.json
+++ b/ethcore/native_contracts/res/secretstore_service.json
@@ -1,3 +1 @@
-[
-	{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"threshold","type":"uint256"}],"name":"generateServerKey","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[],"name":"serverKeyGenerationFee","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"}],"name":"ServerKeyRequested","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":false,"name":"serverKeyPublic","type":"bytes"}],"name":"ServerKeyGenerated","type":"event"}
-]
\ No newline at end of file
+[{"constant":true,"inputs":[],"name":"serverKeyGenerationRequestsCount","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"authority","type":"address"},{"name":"index","type":"uint256"}],"name":"getServerKeyGenerationRequest","outputs":[{"name":"","type":"bytes32"},{"name":"","type":"uint256"},{"name":"","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"threshold","type":"uint256"}],"name":"generateServerKey","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[],"name":"serverKeyGenerationFee","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[],"name":"drain","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":true,"name":"threshold","type":"uint256"}],"name":"ServerKeyRequested","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":false,"name":"serverKeyPublic","type":"bytes"}],"name":"ServerKeyGenerated","type":"event"}]
\ No newline at end of file
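Worth pausing on the ABI change above: `threshold` is now an indexed parameter of `ServerKeyRequested`, so the event's canonical signature, and therefore its topic-0 hash, changes. A minimal sketch (not part of the patch; it reuses the repo's `hash::keccak` helper) of why the listener's event-name constant further down must change in lockstep:

	extern crate hash;
	use hash::keccak;

	fn main() {
		// topic[0] of a Solidity event is keccak256 of its canonical signature,
		// so adding the indexed `threshold` parameter yields a different hash
		let old_topic = keccak(&b"ServerKeyRequested(bytes32)"[..]);
		let new_topic = keccak(&b"ServerKeyRequested(bytes32,uint256)"[..]);
		// a filter built from the old signature would silently miss new events
		assert_ne!(old_topic, new_topic);
	}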
diff --git a/parity/run.rs b/parity/run.rs
index 13c3575d9..fcad975c8 100644
--- a/parity/run.rs
+++ b/parity/run.rs
@@ -785,6 +785,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
 	// secret store key server
 	let secretstore_deps = secretstore::Dependencies {
 		client: client.clone(),
+		sync: sync_provider.clone(),
 		account_provider: account_provider,
 		accounts_passwords: &passwords,
 	};
diff --git a/parity/secretstore.rs b/parity/secretstore.rs
index 416c9d547..7e36ef5e0 100644
--- a/parity/secretstore.rs
+++ b/parity/secretstore.rs
@@ -20,6 +20,7 @@ use dir::default_data_path;
 use ethcore::account_provider::AccountProvider;
 use ethcore::client::Client;
 use ethkey::{Secret, Public};
+use ethsync::SyncProvider;
 use helpers::replace_home;
 use util::Address;
 
@@ -63,6 +64,8 @@ pub struct Configuration {
 pub struct Dependencies<'a> {
 	/// Blockchain client.
 	pub client: Arc<Client>,
+	/// Sync provider.
+	pub sync: Arc<SyncProvider>,
 	/// Account provider.
 	pub account_provider: Arc<AccountProvider>,
 	/// Passed accounts passwords.
@@ -153,7 +156,7 @@ mod server {
 			cconf.cluster_config.nodes.insert(self_secret.public().clone(), cconf.cluster_config.listener_address.clone());
 
-			let key_server = ethcore_secretstore::start(deps.client, self_secret, cconf)
+			let key_server = ethcore_secretstore::start(deps.client, deps.sync, self_secret, cconf)
 				.map_err(|e| format!("Error starting KeyServer {}: {}", key_server_name, e))?;
 
 			Ok(KeyServer {
diff --git a/secret_store/Cargo.toml b/secret_store/Cargo.toml
index 8357b530b..7f8d0bf8c 100644
--- a/secret_store/Cargo.toml
+++ b/secret_store/Cargo.toml
@@ -27,6 +27,7 @@ ethcore-bytes = { path = "../util/bytes" }
 ethcore-devtools = { path = "../devtools" }
 ethcore-util = { path = "../util" }
 ethcore-bigint = { path = "../util/bigint" }
+ethsync = { path = "../sync" }
 kvdb = { path = "../util/kvdb" }
 kvdb-rocksdb = { path = "../util/kvdb-rocksdb" }
 hash = { path = "../util/hash" }
diff --git a/secret_store/src/lib.rs b/secret_store/src/lib.rs
index c93c3c5dd..e71cf2adc 100644
--- a/secret_store/src/lib.rs
+++ b/secret_store/src/lib.rs
@@ -44,6 +44,7 @@ extern crate ethcore_bigint as bigint;
 extern crate ethcore_logger as logger;
 extern crate ethcrypto;
 extern crate ethkey;
+extern crate ethsync;
 extern crate native_contracts;
 extern crate hash;
 extern crate kvdb;
@@ -63,6 +64,7 @@ mod listener;
 
 use std::sync::Arc;
 use ethcore::client::Client;
+use ethsync::SyncProvider;
 
 pub use types::all::{ServerKeyId, EncryptedDocumentKey, RequestSignature, Public,
 	Error, NodeAddress, ServiceConfiguration, ClusterConfiguration};
@@ -70,20 +72,20 @@ pub use traits::{NodeKeyPair, KeyServer};
 pub use self::node_key_pair::{PlainNodeKeyPair, KeyStoreNodeKeyPair};
 
 /// Start new key server instance
-pub fn start(client: Arc<Client>, self_key_pair: Arc<NodeKeyPair>, config: ServiceConfiguration) -> Result<Box<KeyServer>, Error> {
+pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, self_key_pair: Arc<NodeKeyPair>, config: ServiceConfiguration) -> Result<Box<KeyServer>, Error> {
 	let acl_storage: Arc<acl_storage::AclStorage> = if config.acl_check_enabled {
-		acl_storage::OnChainAclStorage::new(&client)
+		acl_storage::OnChainAclStorage::new(&client/*, &sync*/) // TODO: return false until fully synced
 	} else {
 		Arc::new(acl_storage::DummyAclStorage::default())
 	};
 
-	let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, config.cluster_config.nodes.clone())?;
+	let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, /*&sync, */config.cluster_config.nodes.clone())?; // TODO: return empty set until fully synced
 	let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
-	let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set, self_key_pair.clone(), acl_storage, key_storage)?);
+	let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage, key_storage)?);
 	let http_listener = match config.listener_address {
 		Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?),
 		None => None,
 	};
-	let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, key_server.clone(), self_key_pair);
+	let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, &sync, key_server.clone(), self_key_pair, key_server_set);
 	let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener));
 	Ok(Box::new(listener))
 }
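The two TODOs in `start` mirror the gating that the listener below actually implements: on-chain reads (ACL checks, the key server set, pending service requests) should fall back to safe defaults while the node is still syncing. A sketch of the check itself, assuming the `SyncProvider` and `BlockChainClient` traits used elsewhere in this patch (the helper name is illustrative):

	use ethcore::client::{BlockChainClient, Client};
	use ethsync::SyncProvider;

	/// True once the node has caught up with the network, i.e. it is safe
	/// to trust chain state read via call_contract/registry_address.
	fn is_synced(client: &Client, sync: &SyncProvider) -> bool {
		!sync.status().is_syncing(client.queue_info())
	}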
diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs
index ae3ec0736..884c6d260 100644
--- a/secret_store/src/listener/service_contract_listener.rs
+++ b/secret_store/src/listener/service_contract_listener.rs
@@ -14,26 +14,40 @@
 // You should have received a copy of the GNU General Public License
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.
 
-use std::collections::VecDeque;
+use std::collections::{VecDeque, HashSet};
 use std::sync::{Arc, Weak};
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::thread;
+use futures::{future, Future};
 use parking_lot::{RwLock, Mutex, Condvar};
 use ethcore::filter::Filter;
 use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
+use ethkey::{Random, Generator, Public, Signature, sign, public_to_address};
+use ethsync::SyncProvider;
 use native_contracts::SecretStoreService;
-use ethkey::{Random, Generator, Public, Signature, sign};
 use bytes::Bytes;
 use hash::keccak;
 use bigint::hash::H256;
+use bigint::prelude::U256;
 use util::Address;
+use key_server_set::KeyServerSet;
 use {ServerKeyId, NodeKeyPair, KeyServer};
 
 /// Name of the SecretStore contract in the registry.
 const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service";
-/// Server key generation has been requested.
-const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32)";
+/// Server key generation has been requested.
+const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32,uint256)";
+
+/// Retry interval (in blocks). Every RETRY_INTERVAL_BLOCKS blocks, each KeyServer re-reads pending requests from
+/// the service contract && tries to re-execute them. The reason to have this mechanism is primarily that a key
+/// servers set change takes a lot of time + there could be races, when blocks arrive at different
+/// KeyServers at different times. This isn't intended to fix && respond to general session errors!
+const RETRY_INTERVAL_BLOCKS: usize = 30;
+
+/// Max failed retry requests (in a single retry interval). The reason behind this constant is that if several
+/// pending requests have failed, then most probably others will fail too.
+const MAX_FAILED_RETRY_REQUESTS: usize = 1;
 
 lazy_static! {
 	static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME);
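To make the retry pacing concrete, here is a standalone illustration (hypothetical helper, not part of the patch) of how `RETRY_INTERVAL_BLOCKS` is consumed by `new_blocks()` near the end of this file: a counter accumulates enacted block counts, and a single `Retry` task fires once the interval is crossed. At a ~15-second block time, 30 blocks means a retry roughly every 7.5 minutes.

	use std::sync::atomic::{AtomicUsize, Ordering};

	const RETRY_INTERVAL_BLOCKS: usize = 30;

	/// Returns true when enough blocks have accumulated to schedule a Retry
	/// task, resetting the counter as a side effect (mirrors the
	/// fetch_add/store pair in ChainNotify::new_blocks below).
	fn should_schedule_retry(last_retry: &AtomicUsize, enacted_len: usize) -> bool {
		if last_retry.fetch_add(enacted_len, Ordering::AcqRel) >= RETRY_INTERVAL_BLOCKS {
			last_retry.store(0, Ordering::AcqRel);
			return true;
		}
		false
	}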
@@ -52,22 +66,35 @@ pub struct ServiceContractListener {
 
 /// Service contract listener data.
 struct ServiceContractListenerData {
-	/// Contract (currently used for parameters encoding only).
+	/// Blocks since last retry.
+	pub last_retry: AtomicUsize,
+	/// Retry-related data.
+	pub retry_data: Mutex<ServiceContractRetryData>,
+	/// Contract.
 	pub contract: RwLock<SecretStoreService>,
 	/// Blockchain client.
 	pub client: Weak<Client>,
+	/// Sync provider.
+	pub sync: Weak<SyncProvider>,
 	/// Key server reference.
 	pub key_server: Arc<KeyServer>,
 	/// This node key pair.
 	pub self_key_pair: Arc<NodeKeyPair>,
+	/// Key servers set.
+	pub key_servers_set: Arc<KeyServerSet>,
 	/// Service tasks queue.
 	pub tasks_queue: Arc<TasksQueue>,
 }
 
+/// Retry-related data.
+#[derive(Default)]
+struct ServiceContractRetryData {
+	/// Server keys, which we have generated (or tried to generate) since the last retry moment.
+	pub generated_keys: HashSet<ServerKeyId>,
+}
+
 /// Service tasks queue.
 struct TasksQueue {
-	/// Are we closing currently.
-	is_shutdown: AtomicBool,
 	/// Service event.
 	service_event: Condvar,
 	/// Service tasks queue.
@@ -75,7 +102,10 @@ struct TasksQueue {
 }
 
 /// Service task.
+#[derive(Debug)]
 enum ServiceTask {
+	/// Retry all 'stalled' tasks.
+	Retry,
 	/// Generate server key (server_key_id, threshold).
 	GenerateServerKey(H256, H256),
 	/// Shutdown listener.
@@ -83,16 +113,32 @@ enum ServiceTask {
 }
 
 impl ServiceContractListener {
-	pub fn new(client: &Arc<Client>, key_server: Arc<KeyServer>, self_key_pair: Arc<NodeKeyPair>) -> Arc<ServiceContractListener> {
-		let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()).unwrap_or_default();
+	pub fn new(client: &Arc<Client>, sync: &Arc<SyncProvider>, key_server: Arc<KeyServer>, self_key_pair: Arc<NodeKeyPair>, key_servers_set: Arc<KeyServerSet>) -> Arc<ServiceContractListener> {
+		let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
+			.map(|a| {
+				trace!(target: "secretstore", "Installing service contract from address {}", a);
+				a
+			})
+			.unwrap_or_default();
+
+		let is_syncing = sync.status().is_syncing(client.queue_info());
 		let data = Arc::new(ServiceContractListenerData {
+			last_retry: AtomicUsize::new(0),
+			retry_data: Default::default(),
 			contract: RwLock::new(SecretStoreService::new(contract_addr)),
 			client: Arc::downgrade(client),
+			sync: Arc::downgrade(sync),
 			key_server: key_server,
 			self_key_pair: self_key_pair,
+			key_servers_set: key_servers_set,
 			tasks_queue: Arc::new(TasksQueue::new()),
 		});
+
+		// retry on restart
+		if !is_syncing {
+			data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
+		}
+
 		let service_thread_data = data.clone();
 		let service_handle = thread::spawn(move || Self::run_service_thread(service_thread_data));
 		let contract = Arc::new(ServiceContractListener {
@@ -107,7 +153,8 @@ impl ServiceContractListener {
 		debug_assert!(!blocks.is_empty());
 
 		// TODO: is blocks guaranteed to be ordered here?
-		// TODO: logs() is called from notify() thread - is it ok?
+		// TODO: logs() is called from notify() thread - is it ok (doesn't 'logs')?
+		// read server key generation requests
 		let request_logs = client.logs(Filter {
 			from_block: BlockId::Hash(blocks.first().expect("!block.is_empty(); qed").clone()),
 			to_block: BlockId::Hash(blocks.last().expect("!block.is_empty(); qed").clone()),
@@ -121,12 +168,16 @@ impl ServiceContractListener {
 			limit: None,
 		});
 
+		// schedule correct requests if they're intended to be processed by this KeyServer
 		self.data.tasks_queue.push(request_logs.into_iter()
 			.filter_map(|r| match r.entry.topics.len() {
-				3 => Some(ServiceTask::GenerateServerKey(
-					r.entry.topics[1],
-					r.entry.topics[2],
-				)),
+				3 if is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &r.entry.topics[1]) => {
+					Some(ServiceTask::GenerateServerKey(
+						r.entry.topics[1],
+						r.entry.topics[2],
+					))
+				},
+				3 => None,
 				l @ _ => {
 					warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l);
 					None
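Both parameters of the updated `ServerKeyRequested` event are indexed, which is why the filter above matches on exactly three topics: topic 0 is the signature hash, topics 1 and 2 carry `serverKeyId` and `threshold`, and nothing lives in the log data. A small sketch of that layout (hypothetical helper, not part of the patch):

	use bigint::hash::H256;

	// topics[0] = keccak("ServerKeyRequested(bytes32,uint256)")
	// topics[1] = serverKeyId (bytes32, indexed)
	// topics[2] = threshold (uint256, indexed)
	fn parse_server_key_request(topics: &[H256]) -> Option<(H256, H256)> {
		match topics.len() {
			3 => Some((topics[1], topics[2])), // (serverKeyId, threshold)
			_ => None,
		}
	}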
@@ -137,20 +188,106 @@ impl ServiceContractListener {
 	fn run_service_thread(data: Arc<ServiceContractListenerData>) {
 		loop {
 			let task = data.tasks_queue.wait();
+			trace!(target: "secretstore", "Processing {:?} task", task);
 
 			match task {
-				ServiceTask::GenerateServerKey(server_key_id, threshold) => {
-					match Self::generate_server_key(&data, &server_key_id, &threshold)
-						.and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) {
-						Ok(_) => trace!(target: "secretstore", "GenerateServerKey({}, {}) request has completed",
-							server_key_id, threshold),
-						Err(error) => warn!(target: "secretstore", "GenerateServerKey({}, {}) request has failed with: {}",
-							server_key_id, threshold, error),
+				ServiceTask::Shutdown => break,
+				task @ _ => {
+					// the only possible reaction to an error is a trace && it has already happened
+					let _ = Self::process_service_task(&data, task);
+				},
+			};
+		}
+	}
+
+	fn process_service_task(data: &Arc<ServiceContractListenerData>, task: ServiceTask) -> Result<(), String> {
+		match task {
+			ServiceTask::Retry =>
+				Self::retry_pending_requests(&data)
+					.map(|processed_requests| {
+						if processed_requests != 0 {
+							trace!(target: "secretstore", "Successfully retried {} pending requests",
+								processed_requests);
+						}
+						()
+					})
+					.map_err(|error| {
+						warn!(target: "secretstore", "Retrying pending requests has failed with: {}",
+							error);
+						error
+					}),
+			ServiceTask::GenerateServerKey(server_key_id, threshold) => {
+				data.retry_data.lock().generated_keys.insert(server_key_id.clone());
+				Self::generate_server_key(&data, &server_key_id, &threshold)
+					.and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key))
+					.map(|_| {
+						trace!(target: "secretstore", "GenerateServerKey({}, {}) request has completed",
+							server_key_id, threshold);
+						()
+					})
+					.map_err(|error| {
+						warn!(target: "secretstore", "GenerateServerKey({}, {}) request has failed with: {}",
+							server_key_id, threshold, error);
+						error
+					})
+			},
+			ServiceTask::Shutdown => unreachable!("it must be checked outside"),
+		}
+	}
+
+	fn retry_pending_requests(data: &Arc<ServiceContractListenerData>) -> Result<usize, String> {
+		let client = data.client.upgrade().ok_or("client is required".to_owned())?;
+		let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default());
+		let contract = data.contract.read();
+
+		// retrying is only possible when the contract address is set
+		if contract.address == Default::default() {
+			return Ok(0);
+		}
+
+		let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
+		let generate_server_key_requests_count = contract.server_key_generation_requests_count(&do_call).wait()?;
+		let mut generate_server_key_request_index = 0.into();
+		let mut failed_requests = 0;
+		let mut processed_requests = 0;
+		loop {
+			if generate_server_key_request_index >= generate_server_key_requests_count {
+				break;
+			}
+
+			// read request from the contract
+			let (server_key_id, threshold, is_confirmed) = contract.get_server_key_generation_request(&do_call,
+				public_to_address(data.self_key_pair.public()),
+				generate_server_key_request_index).wait()?;
+			generate_server_key_request_index = generate_server_key_request_index + 1.into();
+
+			// only process requests that we haven't confirmed yet
+			if is_confirmed {
+				continue;
+			}
+			// only process requests that haven't been processed recently:
+			// there could be a lag when we've just generated the server key && are retrying on the same block
+			// (or before our tx is mined) - state is not updated yet
+			if retry_data.generated_keys.contains(&server_key_id) {
+				continue;
+			}
+			// only process requests that are intended to be processed by this server
+			if !is_processed_by_this_key_server(&*data.key_servers_set, &*data.self_key_pair, &server_key_id) {
+				continue;
+			}
+
+			// process request
+			match Self::process_service_task(data, ServiceTask::GenerateServerKey(server_key_id, threshold.into())) {
+				Ok(_) => processed_requests += 1,
+				Err(_) => {
+					failed_requests += 1;
+					if failed_requests > MAX_FAILED_RETRY_REQUESTS {
+						return Err("too many failed requests".into());
 					}
 				},
 			}
 		}
+
+		Ok(processed_requests)
 	}
 
 	fn generate_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, threshold: &H256) -> Result<Public, String> {
@@ -159,6 +296,7 @@ impl ServiceContractListener {
 			return Err(format!("invalid threshold {:?}", threshold));
 		}
 
+		// TODO: check if key is already generated
 		// TODO: if this server key is going to be used for document key generation later, author must
 		// be specified from outside
 		let author_key = Random.generate().map_err(|e| format!("{}", e))?;
@@ -205,17 +343,32 @@ impl Drop for ServiceContractListener {
 
 impl ChainNotify for ServiceContractListener {
 	fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
-		if !enacted.is_empty() {
-			if let Some(client) = self.data.client.upgrade() {
+		let enacted_len = enacted.len();
+		if enacted_len != 0 {
+			if let (Some(client), Some(sync)) = (self.data.client.upgrade(), self.data.sync.upgrade()) {
+				// do nothing until synced
+				if sync.status().is_syncing(client.queue_info()) {
+					return;
+				}
+
+				// update contract address from registry
 				if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) {
 					if self.data.contract.read().address != service_contract_addr {
+						trace!(target: "secretstore", "Installing service contract from address {}", service_contract_addr);
 						*self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone());
 					}
+
+					// and process contract events
 					self.process_service_contract_events(&*client, service_contract_addr, enacted);
 				}
-			}
 
-			//self.contract.lock().update(enacted)
+				// schedule retry if received enough blocks since last retry
+				// it may be inaccurate when switching syncing/synced states, but that's ok
+				if self.data.last_retry.fetch_add(enacted_len, Ordering::AcqRel) >= RETRY_INTERVAL_BLOCKS {
+					self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
+					self.data.last_retry.store(0, Ordering::AcqRel);
+				}
+			}
 		}
 	}
 }
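The `TasksQueue` rework below drops the `is_shutdown` flag in favour of an in-band task: `Shutdown` is pushed to the *front* of the queue so it overtakes queued work, and `wait()` keeps a single wake-up path. A reduced sketch of the pattern (type names are illustrative; `parking_lot` primitives as in this file); the `while` loop additionally guards against spurious wake-ups:

	use std::collections::VecDeque;
	use parking_lot::{Condvar, Mutex};

	struct Queue<T> {
		event: Condvar,
		tasks: Mutex<VecDeque<T>>,
	}

	impl<T> Queue<T> {
		/// Shutdown is just another task; front position gives it priority.
		fn shutdown_with(&self, task: T) {
			self.tasks.lock().push_front(task);
			self.event.notify_all();
		}

		fn wait(&self) -> T {
			let mut tasks = self.tasks.lock();
			while tasks.is_empty() {
				self.event.wait(&mut tasks);
			}
			tasks.pop_front().expect("guarded by the loop above; qed")
		}
	}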
@@ -223,37 +376,203 @@ impl ChainNotify for ServiceContractListener {
 
 impl TasksQueue {
 	pub fn new() -> Self {
 		TasksQueue {
-			is_shutdown: AtomicBool::new(false),
 			service_event: Condvar::new(),
 			service_tasks: Mutex::new(VecDeque::new()),
 		}
 	}
 
 	pub fn shutdown(&self) {
-		self.is_shutdown.store(true, Ordering::Release);
+		let mut service_tasks = self.service_tasks.lock();
+		service_tasks.push_front(ServiceTask::Shutdown);
 		self.service_event.notify_all();
 	}
 
 	pub fn push<I>(&self, tasks: I) where I: Iterator<Item=ServiceTask> {
 		let mut service_tasks = self.service_tasks.lock();
 		service_tasks.extend(tasks);
-		self.service_event.notify_all();
+		if !service_tasks.is_empty() {
+			self.service_event.notify_all();
+		}
 	}
 
 	pub fn wait(&self) -> ServiceTask {
-		if self.is_shutdown.load(Ordering::Release) {
-			return ServiceTask::Shutdown;
-		}
-
 		let mut service_tasks = self.service_tasks.lock();
 		if service_tasks.is_empty() {
 			self.service_event.wait(&mut service_tasks);
-			if self.is_shutdown.load(Ordering::Release) {
-				return ServiceTask::Shutdown;
-			}
 		}
 
 		service_tasks.pop_front()
-			.expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed")
+			.expect("service_event is only fired when there are new tasks or a Shutdown task has been pushed to the front; qed")
 	}
 }
+
+/// Returns true when the session related to `server_key_id` must be started on this KeyServer.
+fn is_processed_by_this_key_server(key_servers_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
+	let servers = key_servers_set.get();
+	let total_servers_count = servers.len();
+	if total_servers_count == 0 {
+		return false;
+	}
+
+	let this_server_index = match servers.keys().enumerate().find(|&(_, s)| s == self_key_pair.public()) {
+		Some((index, _)) => index,
+		None => return false,
+	};
+
+	let server_key_id_value: U256 = server_key_id.into();
+	let range_interval = U256::max_value() / total_servers_count.into();
+	let range_begin = (range_interval + 1.into()) * this_server_index.into();
+	let range_end = range_begin.saturating_add(range_interval);
+
+	server_key_id_value >= range_begin && server_key_id_value <= range_end
+}
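The partitioning in `is_processed_by_this_key_server` splits the 2^256 key space into `total_servers_count` equal, contiguous ranges and lets the server at index `i` own the `i`-th range. A scaled-down worked example (a u8 key space instead of U256; the helper name is hypothetical) that mirrors the `0x55…`/`0xaa…` boundaries exercised in the tests below:

	// With 3 servers over a u8 key space: range_interval = 255 / 3 = 85, so
	// server 0 owns [0; 85], server 1 owns [86; 171], server 2 owns [172; 255]
	// (range_begin = (range_interval + 1) * index, range_end = range_begin + range_interval).
	fn owning_server_index(key: u8, total_servers_count: u8) -> u8 {
		let range_interval = u8::max_value() / total_servers_count;
		key / (range_interval + 1)
	}

	fn main() {
		assert_eq!(owning_server_index(0x55, 3), 0); // 0x55 = 85: last key of server 0
		assert_eq!(owning_server_index(0x56, 3), 1); // 0x56 = 86: first key of server 1
		assert_eq!(owning_server_index(0xff, 3), 2);
	}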
("f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9388f7b0f632de8140fe337e62a37f3566500a99934c2231b6cb9fd7584b8e672".parse().unwrap(), + "127.0.0.1:8080".parse().unwrap()), + ].into_iter().collect()); + + // 1st server: process hashes [0x0; 0x555...555] + let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( + "0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap()); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"0000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"3000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"5555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"5555555555555555555555555555555555555555555555555555555555555556".parse().unwrap()), false); + + // 2nd server: process hashes from 0x555...556 to 0xaaa...aab + let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( + "0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap()).unwrap()); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"5555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), false); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"5555555555555555555555555555555555555555555555555555555555555556".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"7555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), false); + + // 3rd server: process hashes from 0x800...000 to 0xbff...ff + let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( + "0000000000000000000000000000000000000000000000000000000000000003".parse().unwrap()).unwrap()); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab".parse().unwrap()), false); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"daaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); + } + + #[test] + fn is_processed_by_this_key_server_in_set_of_4() { + // servers set is ordered && server range depends on index of this server + let servers_set = MapKeyServerSet::new(vec![ + // secret: 0000000000000000000000000000000000000000000000000000000000000001 + ("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(), + "127.0.0.1:8080".parse().unwrap()), + // secret: 
+
+	#[test]
+	fn is_processed_by_this_key_server_in_set_of_4() {
+		// servers set is ordered && server range depends on index of this server
+		let servers_set = MapKeyServerSet::new(vec![
+			// secret: 0000000000000000000000000000000000000000000000000000000000000001
+			("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(),
+				"127.0.0.1:8080".parse().unwrap()),
+			// secret: 0000000000000000000000000000000000000000000000000000000000000002
+			("c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee51ae168fea63dc339a3c58419466ceaeef7f632653266d0e1236431a950cfe52a".parse().unwrap(),
+				"127.0.0.1:8080".parse().unwrap()),
+			// secret: 0000000000000000000000000000000000000000000000000000000000000004
+			("e493dbf1c10d80f3581e4904930b1404cc6c13900ee0758474fa94abe8c4cd1351ed993ea0d455b75642e2098ea51448d967ae33bfbdfe40cfe97bdc47739922".parse().unwrap(),
+				"127.0.0.1:8080".parse().unwrap()),
+			// secret: 0000000000000000000000000000000000000000000000000000000000000003
+			("f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9388f7b0f632de8140fe337e62a37f3566500a99934c2231b6cb9fd7584b8e672".parse().unwrap(),
+				"127.0.0.1:8080".parse().unwrap()),
+		].into_iter().collect());
+
+		// 1st server: process hashes [0x0; 0x3ff...ff]
+		let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
+			"0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap());
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"0000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"2000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"4000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false);
+
+		// 2nd server: process hashes from 0x400...000 to 0x7ff...ff
+		let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
+			"0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap()).unwrap());
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"4000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"6000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"8000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false);
+
+		// 3rd server: process hashes from 0x800...000 to 0xbff...ff
+		let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
+			"0000000000000000000000000000000000000000000000000000000000000004".parse().unwrap()).unwrap());
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"8000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
+			&"a000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
+		assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"bfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"c000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false); + + // 4th server: process hashes from 0xc00...000 to 0xfff...ff + let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret( + "0000000000000000000000000000000000000000000000000000000000000003".parse().unwrap()).unwrap()); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"bfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"c000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"e000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true); + assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair, + &"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true); + } +}