SecretStore: mapping requests to KeyServer + requests retry

This commit is contained in:
Svyatoslav Nikolsky 2017-11-20 15:18:31 +03:00
parent 6618827d1a
commit 3945a29ee6
7 changed files with 370 additions and 45 deletions

1
Cargo.lock generated
View File

@ -678,6 +678,7 @@ dependencies = [
"ethcore-util 1.9.0", "ethcore-util 1.9.0",
"ethcrypto 0.1.0", "ethcrypto 0.1.0",
"ethkey 0.2.0", "ethkey 0.2.0",
"ethsync 1.8.0",
"futures 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"hash 0.1.0", "hash 0.1.0",

View File

@ -1,3 +1 @@
[ [{"constant":true,"inputs":[],"name":"serverKeyGenerationRequestsCount","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"authority","type":"address"},{"name":"index","type":"uint256"}],"name":"getServerKeyGenerationRequest","outputs":[{"name":"","type":"bytes32"},{"name":"","type":"uint256"},{"name":"","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"threshold","type":"uint256"}],"name":"generateServerKey","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[],"name":"serverKeyGenerationFee","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[],"name":"drain","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":true,"name":"threshold","type":"uint256"}],"name":"ServerKeyRequested","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":false,"name":"serverKeyPublic","type":"bytes"}],"name":"ServerKeyGenerated","type":"event"}]
{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"threshold","type":"uint256"}],"name":"generateServerKey","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[],"name":"serverKeyGenerationFee","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"}],"name":"ServerKeyRequested","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":false,"name":"serverKeyPublic","type":"bytes"}],"name":"ServerKeyGenerated","type":"event"}
]

View File

@ -785,6 +785,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
// secret store key server // secret store key server
let secretstore_deps = secretstore::Dependencies { let secretstore_deps = secretstore::Dependencies {
client: client.clone(), client: client.clone(),
sync: sync_provider.clone(),
account_provider: account_provider, account_provider: account_provider,
accounts_passwords: &passwords, accounts_passwords: &passwords,
}; };

View File

@ -20,6 +20,7 @@ use dir::default_data_path;
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::client::Client; use ethcore::client::Client;
use ethkey::{Secret, Public}; use ethkey::{Secret, Public};
use ethsync::SyncProvider;
use helpers::replace_home; use helpers::replace_home;
use util::Address; use util::Address;
@ -63,6 +64,8 @@ pub struct Configuration {
pub struct Dependencies<'a> { pub struct Dependencies<'a> {
/// Blockchain client. /// Blockchain client.
pub client: Arc<Client>, pub client: Arc<Client>,
/// Sync provider.
pub sync: Arc<SyncProvider>,
/// Account provider. /// Account provider.
pub account_provider: Arc<AccountProvider>, pub account_provider: Arc<AccountProvider>,
/// Passed accounts passwords. /// Passed accounts passwords.
@ -153,7 +156,7 @@ mod server {
cconf.cluster_config.nodes.insert(self_secret.public().clone(), cconf.cluster_config.listener_address.clone()); cconf.cluster_config.nodes.insert(self_secret.public().clone(), cconf.cluster_config.listener_address.clone());
let key_server = ethcore_secretstore::start(deps.client, self_secret, cconf) let key_server = ethcore_secretstore::start(deps.client, deps.sync, self_secret, cconf)
.map_err(|e| format!("Error starting KeyServer {}: {}", key_server_name, e))?; .map_err(|e| format!("Error starting KeyServer {}: {}", key_server_name, e))?;
Ok(KeyServer { Ok(KeyServer {

View File

@ -27,6 +27,7 @@ ethcore-bytes = { path = "../util/bytes" }
ethcore-devtools = { path = "../devtools" } ethcore-devtools = { path = "../devtools" }
ethcore-util = { path = "../util" } ethcore-util = { path = "../util" }
ethcore-bigint = { path = "../util/bigint" } ethcore-bigint = { path = "../util/bigint" }
ethsync = { path = "../sync" }
kvdb = { path = "../util/kvdb" } kvdb = { path = "../util/kvdb" }
kvdb-rocksdb = { path = "../util/kvdb-rocksdb" } kvdb-rocksdb = { path = "../util/kvdb-rocksdb" }
hash = { path = "../util/hash" } hash = { path = "../util/hash" }

View File

@ -44,6 +44,7 @@ extern crate ethcore_bigint as bigint;
extern crate ethcore_logger as logger; extern crate ethcore_logger as logger;
extern crate ethcrypto; extern crate ethcrypto;
extern crate ethkey; extern crate ethkey;
extern crate ethsync;
extern crate native_contracts; extern crate native_contracts;
extern crate hash; extern crate hash;
extern crate kvdb; extern crate kvdb;
@ -63,6 +64,7 @@ mod listener;
use std::sync::Arc; use std::sync::Arc;
use ethcore::client::Client; use ethcore::client::Client;
use ethsync::SyncProvider;
pub use types::all::{ServerKeyId, EncryptedDocumentKey, RequestSignature, Public, pub use types::all::{ServerKeyId, EncryptedDocumentKey, RequestSignature, Public,
Error, NodeAddress, ServiceConfiguration, ClusterConfiguration}; Error, NodeAddress, ServiceConfiguration, ClusterConfiguration};
@ -70,20 +72,20 @@ pub use traits::{NodeKeyPair, KeyServer};
pub use self::node_key_pair::{PlainNodeKeyPair, KeyStoreNodeKeyPair}; pub use self::node_key_pair::{PlainNodeKeyPair, KeyStoreNodeKeyPair};
/// Start new key server instance /// Start new key server instance
pub fn start(client: Arc<Client>, self_key_pair: Arc<NodeKeyPair>, config: ServiceConfiguration) -> Result<Box<KeyServer>, Error> { pub fn start(client: Arc<Client>, sync: Arc<SyncProvider>, self_key_pair: Arc<NodeKeyPair>, config: ServiceConfiguration) -> Result<Box<KeyServer>, Error> {
let acl_storage: Arc<acl_storage::AclStorage> = if config.acl_check_enabled { let acl_storage: Arc<acl_storage::AclStorage> = if config.acl_check_enabled {
acl_storage::OnChainAclStorage::new(&client) acl_storage::OnChainAclStorage::new(&client/*, &sync*/) // TODO: return false until fully synced
} else { } else {
Arc::new(acl_storage::DummyAclStorage::default()) Arc::new(acl_storage::DummyAclStorage::default())
}; };
let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, config.cluster_config.nodes.clone())?; let key_server_set = key_server_set::OnChainKeyServerSet::new(&client, /*&sync, */config.cluster_config.nodes.clone())?; // TODO: return empty set until fully synced
let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?); let key_storage = Arc::new(key_storage::PersistentKeyStorage::new(&config)?);
let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set, self_key_pair.clone(), acl_storage, key_storage)?); let key_server = Arc::new(key_server::KeyServerImpl::new(&config.cluster_config, key_server_set.clone(), self_key_pair.clone(), acl_storage, key_storage)?);
let http_listener = match config.listener_address { let http_listener = match config.listener_address {
Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?), Some(listener_address) => Some(listener::http_listener::KeyServerHttpListener::start(listener_address, key_server.clone())?),
None => None, None => None,
}; };
let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, key_server.clone(), self_key_pair); let contract_listener = listener::service_contract_listener::ServiceContractListener::new(&client, &sync, key_server.clone(), self_key_pair, key_server_set);
let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener)); let listener = listener::Listener::new(key_server, http_listener, Some(contract_listener));
Ok(Box::new(listener)) Ok(Box::new(listener))
} }

View File

@ -14,26 +14,40 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::VecDeque; use std::collections::{VecDeque, HashSet};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread; use std::thread;
use futures::{future, Future};
use parking_lot::{RwLock, Mutex, Condvar}; use parking_lot::{RwLock, Mutex, Condvar};
use ethcore::filter::Filter; use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify}; use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
use ethkey::{Random, Generator, Public, Signature, sign, public_to_address};
use ethsync::SyncProvider;
use native_contracts::SecretStoreService; use native_contracts::SecretStoreService;
use ethkey::{Random, Generator, Public, Signature, sign};
use bytes::Bytes; use bytes::Bytes;
use hash::keccak; use hash::keccak;
use bigint::hash::H256; use bigint::hash::H256;
use bigint::prelude::U256;
use util::Address; use util::Address;
use key_server_set::KeyServerSet;
use {ServerKeyId, NodeKeyPair, KeyServer}; use {ServerKeyId, NodeKeyPair, KeyServer};
/// Name of the SecretStore contract in the registry. /// Name of the SecretStore contract in the registry.
const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service"; const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service";
/// Key server has been added to the set. /// Key server has been added to the set.
const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32)"; const SERVER_KEY_REQUESTED_EVENT_NAME: &'static [u8] = &*b"ServerKeyRequested(bytes32,uint256)";
/// Retry interval (in blocks). Every RETRY_INTEVAL_BLOCKS blocks each KeyServer reads pending requests from
/// service contract && tries to re-execute. The reason to have this mechanism is primarily because keys
/// servers set change takes a lot of time + there could be some races, when blocks are coming to different
/// KS at different times. This isn't intended to fix && respond to general session errors!
const RETRY_INTEVAL_BLOCKS: usize = 30;
/// Max failed retry requests (in single retry interval). The reason behind this constant is that if several
/// pending requests have failed, then most probably other will fail too.
const MAX_FAILED_RETRY_REQUESTS: usize = 1;
lazy_static! { lazy_static! {
static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME); static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME);
@ -52,22 +66,35 @@ pub struct ServiceContractListener {
/// Service contract listener data. /// Service contract listener data.
struct ServiceContractListenerData { struct ServiceContractListenerData {
/// Contract (currently used for parameters encoding only). /// Blocks since last retry.
pub last_retry: AtomicUsize,
/// Retry-related data.
pub retry_data: Mutex<ServiceContractRetryData>,
/// Contract.
pub contract: RwLock<SecretStoreService>, pub contract: RwLock<SecretStoreService>,
/// Blockchain client. /// Blockchain client.
pub client: Weak<Client>, pub client: Weak<Client>,
/// Sync provider.
pub sync: Weak<SyncProvider>,
/// Key server reference. /// Key server reference.
pub key_server: Arc<KeyServer>, pub key_server: Arc<KeyServer>,
/// This node key pair. /// This node key pair.
pub self_key_pair: Arc<NodeKeyPair>, pub self_key_pair: Arc<NodeKeyPair>,
/// Key servers set.
pub key_servers_set: Arc<KeyServerSet>,
/// Service tasks queue. /// Service tasks queue.
pub tasks_queue: Arc<TasksQueue>, pub tasks_queue: Arc<TasksQueue>,
} }
/// Retry-related data.
#[derive(Default)]
struct ServiceContractRetryData {
/// Server keys, which we have generated (or tried to generate) since last retry moment.
pub generated_keys: HashSet<ServerKeyId>,
}
/// Service tasks queue. /// Service tasks queue.
struct TasksQueue { struct TasksQueue {
/// Are we closing currently.
is_shutdown: AtomicBool,
/// Service event. /// Service event.
service_event: Condvar, service_event: Condvar,
/// Service tasks queue. /// Service tasks queue.
@ -75,7 +102,10 @@ struct TasksQueue {
} }
/// Service task. /// Service task.
#[derive(Debug)]
enum ServiceTask { enum ServiceTask {
/// Retry all 'stalled' tasks.
Retry,
/// Generate server key (server_key_id, threshold). /// Generate server key (server_key_id, threshold).
GenerateServerKey(H256, H256), GenerateServerKey(H256, H256),
/// Shutdown listener. /// Shutdown listener.
@ -83,16 +113,32 @@ enum ServiceTask {
} }
impl ServiceContractListener { impl ServiceContractListener {
pub fn new(client: &Arc<Client>, key_server: Arc<KeyServer>, self_key_pair: Arc<NodeKeyPair>) -> Arc<ServiceContractListener> { pub fn new(client: &Arc<Client>, sync: &Arc<SyncProvider>, key_server: Arc<KeyServer>, self_key_pair: Arc<NodeKeyPair>, key_servers_set: Arc<KeyServerSet>) -> Arc<ServiceContractListener> {
let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()).unwrap_or_default(); let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned())
.map(|a| {
trace!(target: "secretstore", "Installing service contract from address {}", a);
a
})
.unwrap_or_default();
let is_syncing = sync.status().is_syncing(client.queue_info());
let data = Arc::new(ServiceContractListenerData { let data = Arc::new(ServiceContractListenerData {
last_retry: AtomicUsize::new(0),
retry_data: Default::default(),
contract: RwLock::new(SecretStoreService::new(contract_addr)), contract: RwLock::new(SecretStoreService::new(contract_addr)),
client: Arc::downgrade(client), client: Arc::downgrade(client),
sync: Arc::downgrade(sync),
key_server: key_server, key_server: key_server,
self_key_pair: self_key_pair, self_key_pair: self_key_pair,
key_servers_set: key_servers_set,
tasks_queue: Arc::new(TasksQueue::new()), tasks_queue: Arc::new(TasksQueue::new()),
}); });
// retry on restart
if !is_syncing {
data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
}
let service_thread_data = data.clone(); let service_thread_data = data.clone();
let service_handle = thread::spawn(move || Self::run_service_thread(service_thread_data)); let service_handle = thread::spawn(move || Self::run_service_thread(service_thread_data));
let contract = Arc::new(ServiceContractListener { let contract = Arc::new(ServiceContractListener {
@ -107,7 +153,8 @@ impl ServiceContractListener {
debug_assert!(!blocks.is_empty()); debug_assert!(!blocks.is_empty());
// TODO: is blocks guaranteed to be ordered here? // TODO: is blocks guaranteed to be ordered here?
// TODO: logs() is called from notify() thread - is it ok? // TODO: logs() is called from notify() thread - is it ok (doesn't 'logs')?
// read server key generation requests
let request_logs = client.logs(Filter { let request_logs = client.logs(Filter {
from_block: BlockId::Hash(blocks.first().expect("!block.is_empty(); qed").clone()), from_block: BlockId::Hash(blocks.first().expect("!block.is_empty(); qed").clone()),
to_block: BlockId::Hash(blocks.last().expect("!block.is_empty(); qed").clone()), to_block: BlockId::Hash(blocks.last().expect("!block.is_empty(); qed").clone()),
@ -121,12 +168,16 @@ impl ServiceContractListener {
limit: None, limit: None,
}); });
// schedule correct requests if they're intended to be processed by this KeyServer
self.data.tasks_queue.push(request_logs.into_iter() self.data.tasks_queue.push(request_logs.into_iter()
.filter_map(|r| match r.entry.topics.len() { .filter_map(|r| match r.entry.topics.len() {
3 => Some(ServiceTask::GenerateServerKey( 3 if is_processed_by_this_key_server(&*self.data.key_servers_set, &*self.data.self_key_pair, &r.entry.topics[1]) => {
Some(ServiceTask::GenerateServerKey(
r.entry.topics[1], r.entry.topics[1],
r.entry.topics[2], r.entry.topics[2],
)), ))
},
3 => None,
l @ _ => { l @ _ => {
warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l); warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l);
None None
@ -137,20 +188,106 @@ impl ServiceContractListener {
fn run_service_thread(data: Arc<ServiceContractListenerData>) { fn run_service_thread(data: Arc<ServiceContractListenerData>) {
loop { loop {
let task = data.tasks_queue.wait(); let task = data.tasks_queue.wait();
trace!(target: "secretstore", "Processing {:?} task", task);
match task { match task {
ServiceTask::Shutdown => break,
task @ _ => {
// the only possible reaction to an error is a trace && it is already happened
let _ = Self::process_service_task(&data, task);
},
};
}
}
fn process_service_task(data: &Arc<ServiceContractListenerData>, task: ServiceTask) -> Result<(), String> {
match task {
ServiceTask::Retry =>
Self::retry_pending_requests(&data)
.map(|processed_requests| {
if processed_requests != 0 {
trace!(target: "secretstore", "Successfully retried {} pending requests",
processed_requests);
}
()
})
.map_err(|error| {
warn!(target: "secretstore", "Retrying pending requests has failed with: {}",
error);
error
}),
ServiceTask::GenerateServerKey(server_key_id, threshold) => { ServiceTask::GenerateServerKey(server_key_id, threshold) => {
match Self::generate_server_key(&data, &server_key_id, &threshold) data.retry_data.lock().generated_keys.insert(server_key_id.clone());
.and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) { Self::generate_server_key(&data, &server_key_id, &threshold)
Ok(_) => trace!(target: "secretstore", "GenerateServerKey({}, {}) request has completed", .and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key))
server_key_id, threshold), .map(|_| {
Err(error) => warn!(target: "secretstore", "GenerateServerKey({}, {}) request has failed with: {}", trace!(target: "secretstore", "GenerateServerKey({}, {}) request has completed",
server_key_id, threshold, error), server_key_id, threshold);
()
})
.map_err(|error| {
warn!(target: "secretstore", "GenerateServerKey({}, {}) request has failed with: {}",
server_key_id, threshold, error);
error
})
},
ServiceTask::Shutdown => unreachable!("it must be checked outside"),
}
}
fn retry_pending_requests(data: &Arc<ServiceContractListenerData>) -> Result<usize, String> {
let client = data.client.upgrade().ok_or("client is required".to_owned())?;
let retry_data = ::std::mem::replace(&mut *data.retry_data.lock(), Default::default());
let contract = data.contract.read();
// it is only possible when contract address is set
if contract.address == Default::default() {
return Ok(0);
}
let do_call = |a, d| future::done(client.call_contract(BlockId::Latest, a, d));
let generate_server_key_requests_count = contract.server_key_generation_requests_count(&do_call).wait()?;
let mut generate_server_key_request_index = 0.into();
let mut failed_requests = 0;
let mut processed_requests = 0;
loop {
if generate_server_key_request_index >= generate_server_key_requests_count {
break;
}
// read request from the contract
let (server_key_id, threshold, is_confirmed) = contract.get_server_key_generation_request(&do_call,
public_to_address(data.self_key_pair.public()),
generate_server_key_request_index).wait()?;
generate_server_key_request_index = generate_server_key_request_index + 1.into();
// only process requests, which we haven't confirmed yet
if is_confirmed {
continue;
}
// only process request, which haven't been processed recently
// there could be a lag when we've just generated server key && retrying on the same block
// (or before our tx is mined) - state is not updated yet
if retry_data.generated_keys.contains(&server_key_id){
continue;
}
// only process requests that are intended to be processed by this server
if !is_processed_by_this_key_server(&*data.key_servers_set, &*data.self_key_pair, &server_key_id) {
continue;
}
// process request
match Self::process_service_task(data, ServiceTask::GenerateServerKey(server_key_id, threshold.into())) {
Ok(_) => processed_requests += 1,
Err(_) => {
failed_requests += 1;
if failed_requests > MAX_FAILED_RETRY_REQUESTS {
return Err("too many failed requests".into());
} }
}, },
ServiceTask::Shutdown => break,
} }
} }
Ok(processed_requests)
} }
fn generate_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, threshold: &H256) -> Result<Public, String> { fn generate_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, threshold: &H256) -> Result<Public, String> {
@ -159,6 +296,7 @@ impl ServiceContractListener {
return Err(format!("invalid threshold {:?}", threshold)); return Err(format!("invalid threshold {:?}", threshold));
} }
// TODO: check if key is already generated
// TODO: if this server key is going to be used for document key generation later, author must // TODO: if this server key is going to be used for document key generation later, author must
// be specified from outside // be specified from outside
let author_key = Random.generate().map_err(|e| format!("{}", e))?; let author_key = Random.generate().map_err(|e| format!("{}", e))?;
@ -205,17 +343,32 @@ impl Drop for ServiceContractListener {
impl ChainNotify for ServiceContractListener { impl ChainNotify for ServiceContractListener {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) { fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
if !enacted.is_empty() { let enacted_len = enacted.len();
if let Some(client) = self.data.client.upgrade() { if enacted_len != 0 {
if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) { if let (Some(client), Some(sync)) = (self.data.client.upgrade(), self.data.sync.upgrade()) {
if self.data.contract.read().address != service_contract_addr { // do nothing until synced
*self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone()); if sync.status().is_syncing(client.queue_info()) {
} return;
self.process_service_contract_events(&*client, service_contract_addr, enacted);
}
} }
//self.contract.lock().update(enacted) // update contract address from registry
if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) {
if self.data.contract.read().address != service_contract_addr {
trace!(target: "secretstore", "Installing service contract from address {}", service_contract_addr);
*self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone());
}
// and process contract events
self.process_service_contract_events(&*client, service_contract_addr, enacted);
}
// schedule retry if received enough blocks since last retry
// it maybe inaccurate when switching syncing/synced states, but that's ok
if self.data.last_retry.fetch_add(enacted_len, Ordering::AcqRel) >= RETRY_INTEVAL_BLOCKS {
self.data.tasks_queue.push(::std::iter::once(ServiceTask::Retry));
self.data.last_retry.store(0, Ordering::AcqRel);
}
}
} }
} }
} }
@ -223,37 +376,203 @@ impl ChainNotify for ServiceContractListener {
impl TasksQueue { impl TasksQueue {
pub fn new() -> Self { pub fn new() -> Self {
TasksQueue { TasksQueue {
is_shutdown: AtomicBool::new(false),
service_event: Condvar::new(), service_event: Condvar::new(),
service_tasks: Mutex::new(VecDeque::new()), service_tasks: Mutex::new(VecDeque::new()),
} }
} }
pub fn shutdown(&self) { pub fn shutdown(&self) {
self.is_shutdown.store(true, Ordering::Release); let mut service_tasks = self.service_tasks.lock();
service_tasks.push_front(ServiceTask::Shutdown);
self.service_event.notify_all(); self.service_event.notify_all();
} }
pub fn push<I>(&self, tasks: I) where I: Iterator<Item=ServiceTask> { pub fn push<I>(&self, tasks: I) where I: Iterator<Item=ServiceTask> {
let mut service_tasks = self.service_tasks.lock(); let mut service_tasks = self.service_tasks.lock();
service_tasks.extend(tasks); service_tasks.extend(tasks);
if !service_tasks.is_empty() {
self.service_event.notify_all(); self.service_event.notify_all();
} }
}
pub fn wait(&self) -> ServiceTask { pub fn wait(&self) -> ServiceTask {
if self.is_shutdown.load(Ordering::Release) {
return ServiceTask::Shutdown;
}
let mut service_tasks = self.service_tasks.lock(); let mut service_tasks = self.service_tasks.lock();
if service_tasks.is_empty() { if service_tasks.is_empty() {
self.service_event.wait(&mut service_tasks); self.service_event.wait(&mut service_tasks);
if self.is_shutdown.load(Ordering::Release) {
return ServiceTask::Shutdown;
}
} }
service_tasks.pop_front() service_tasks.pop_front()
.expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed") .expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed")
} }
} }
/// Returns true when session, related to `server_key_id` must be started on this KeyServer.
fn is_processed_by_this_key_server(key_servers_set: &KeyServerSet, self_key_pair: &NodeKeyPair, server_key_id: &H256) -> bool {
let servers = key_servers_set.get();
let total_servers_count = servers.len();
if total_servers_count == 0 {
return false;
}
let this_server_index = match servers.keys().enumerate().find(|&(_, s)| s == self_key_pair.public()) {
Some((index, _)) => index,
None => return false,
};
let server_key_id_value: U256 = server_key_id.into();
let range_interval = U256::max_value() / total_servers_count.into();
let range_begin = (range_interval + 1.into()) * this_server_index.into();
let range_end = range_begin.saturating_add(range_interval);
server_key_id_value >= range_begin && server_key_id_value <= range_end
}
#[cfg(test)]
mod tests {
use ethkey::{Random, Generator, KeyPair};
use key_server_set::tests::MapKeyServerSet;
use PlainNodeKeyPair;
use super::is_processed_by_this_key_server;
#[test]
fn is_not_processed_by_this_key_server_with_zero_servers() {
assert_eq!(is_processed_by_this_key_server(
&MapKeyServerSet::default(),
&PlainNodeKeyPair::new(Random.generate().unwrap()),
&Default::default()), false);
}
#[test]
fn is_not_processed_by_this_key_server_when_not_a_part_of_servers_set() {
assert_eq!(is_processed_by_this_key_server(
&MapKeyServerSet::new(vec![
(Random.generate().unwrap().public().clone(), "127.0.0.1:8080".parse().unwrap())
].into_iter().collect()),
&PlainNodeKeyPair::new(Random.generate().unwrap()),
&Default::default()), false);
}
#[test]
fn is_processed_by_this_key_server_in_set_of_3() {
// servers set is ordered && server range depends on index of this server
let servers_set = MapKeyServerSet::new(vec![
// secret: 0000000000000000000000000000000000000000000000000000000000000001
("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
// secret: 0000000000000000000000000000000000000000000000000000000000000002
("c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee51ae168fea63dc339a3c58419466ceaeef7f632653266d0e1236431a950cfe52a".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
// secret: 0000000000000000000000000000000000000000000000000000000000000003
("f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9388f7b0f632de8140fe337e62a37f3566500a99934c2231b6cb9fd7584b8e672".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
].into_iter().collect());
// 1st server: process hashes [0x0; 0x555...555]
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"0000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"3000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"5555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"5555555555555555555555555555555555555555555555555555555555555556".parse().unwrap()), false);
// 2nd server: process hashes from 0x555...556 to 0xaaa...aab
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"5555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"5555555555555555555555555555555555555555555555555555555555555556".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"7555555555555555555555555555555555555555555555555555555555555555".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), false);
// 3rd server: process hashes from 0x800...000 to 0xbff...ff
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000003".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"daaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
}
#[test]
fn is_processed_by_this_key_server_in_set_of_4() {
// servers set is ordered && server range depends on index of this server
let servers_set = MapKeyServerSet::new(vec![
// secret: 0000000000000000000000000000000000000000000000000000000000000001
("79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
// secret: 0000000000000000000000000000000000000000000000000000000000000002
("c6047f9441ed7d6d3045406e95c07cd85c778e4b8cef3ca7abac09b95c709ee51ae168fea63dc339a3c58419466ceaeef7f632653266d0e1236431a950cfe52a".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
// secret: 0000000000000000000000000000000000000000000000000000000000000004
("e493dbf1c10d80f3581e4904930b1404cc6c13900ee0758474fa94abe8c4cd1351ed993ea0d455b75642e2098ea51448d967ae33bfbdfe40cfe97bdc47739922".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
// secret: 0000000000000000000000000000000000000000000000000000000000000003
("f9308a019258c31049344f85f89d5229b531c845836f99b08601f113bce036f9388f7b0f632de8140fe337e62a37f3566500a99934c2231b6cb9fd7584b8e672".parse().unwrap(),
"127.0.0.1:8080".parse().unwrap()),
].into_iter().collect());
// 1st server: process hashes [0x0; 0x3ff...ff]
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000001".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"0000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"2000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"4000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false);
// 2nd server: process hashes from 0x400...000 to 0x7ff...ff
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000002".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"4000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"6000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"8000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false);
// 3rd server: process hashes from 0x800...000 to 0xbff...ff
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000004".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"8000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"a000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"bfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"c000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), false);
// 4th server: process hashes from 0xc00...000 to 0xfff...ff
let key_pair = PlainNodeKeyPair::new(KeyPair::from_secret(
"0000000000000000000000000000000000000000000000000000000000000003".parse().unwrap()).unwrap());
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"bfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), false);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"c000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"e000000000000000000000000000000000000000000000000000000000000000".parse().unwrap()), true);
assert_eq!(is_processed_by_this_key_server(&servers_set, &key_pair,
&"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff".parse().unwrap()), true);
}
}