SecretStore: cleaning up service contract listener

This commit is contained in:
Svyatoslav Nikolsky 2017-11-17 13:37:01 +03:00
parent 56875a83b3
commit 6618827d1a
3 changed files with 200 additions and 79 deletions

View File

@ -1,3 +1,3 @@
[
{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"}
{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"threshold","type":"uint256"}],"name":"generateServerKey","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[],"name":"serverKeyGenerationFee","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"serverKeyId","type":"bytes32"},{"name":"serverKeyPublic","type":"bytes"},{"name":"v","type":"uint8"},{"name":"r","type":"bytes32"},{"name":"s","type":"bytes32"}],"name":"serverKeyGenerated","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"}],"name":"ServerKeyRequested","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"serverKeyId","type":"bytes32"},{"indexed":false,"name":"serverKeyPublic","type":"bytes"}],"name":"ServerKeyGenerated","type":"event"}
]

View File

@ -287,7 +287,7 @@ mod tests {
fn http_listener_successfully_drops() {
let key_server = Arc::new(DummyKeyServer);
let address = NodeAddress { address: "127.0.0.1".into(), port: 9000 };
let listener = KeyServerHttpListener::start(Some(address), key_server).unwrap();
let listener = KeyServerHttpListener::start(address, key_server).unwrap();
drop(listener);
}

View File

@ -14,17 +14,20 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::VecDeque;
use std::sync::{Arc, Weak};
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use parking_lot::{RwLock, Mutex, Condvar};
use ethcore::filter::Filter;
use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify};
use native_contracts::SecretStoreService;
use ethkey::{Random, Generator, sign};
use ethkey::{Random, Generator, Public, Signature, sign};
use bytes::Bytes;
use hash::keccak;
use bigint::hash::H256;
use util::Address;
use {NodeKeyPair, KeyServer};
use {ServerKeyId, NodeKeyPair, KeyServer};
/// Name of the SecretStore contract in the registry.
const SERVICE_CONTRACT_REGISTRY_NAME: &'static str = "secretstore_service";
@ -36,103 +39,221 @@ lazy_static! {
static ref SERVER_KEY_REQUESTED_EVENT_NAME_HASH: H256 = keccak(SERVER_KEY_REQUESTED_EVENT_NAME);
}
/// SecretStore <-> Authority connector. Duties:
/// 1. Listen for new requests on SecretStore contract
/// 2. Redirects requests for key server
/// 3. Publishes response on SecretStore contract
/// SecretStore <-> Authority connector responsible for:
/// 1. listening for new requests on SecretStore contract
/// 2. redirecting requests to key server
/// 3. publishing response on SecretStore contract
pub struct ServiceContractListener {
/// Cached on-chain contract.
contract: Mutex<CachedContract>,
/// Service contract listener data.
data: Arc<ServiceContractListenerData>,
/// Service thread handle.
service_handle: Option<thread::JoinHandle<()>>,
}
/// Cached on-chain Key Server set contract.
struct CachedContract {
/// Service contract listener data.
struct ServiceContractListenerData {
/// Contract (currently used for parameters encoding only).
pub contract: RwLock<SecretStoreService>,
/// Blockchain client.
client: Weak<Client>,
/// Contract.
contract: SecretStoreService,
/// Contract address.
contract_addr: Option<Address>,
pub client: Weak<Client>,
/// Key server reference.
key_server: Arc<KeyServer>,
pub key_server: Arc<KeyServer>,
/// This node key pair.
self_key_pair: Arc<NodeKeyPair>,
pub self_key_pair: Arc<NodeKeyPair>,
/// Service tasks queue.
pub tasks_queue: Arc<TasksQueue>,
}
/// Service tasks queue.
struct TasksQueue {
/// Are we closing currently.
is_shutdown: AtomicBool,
/// Service event.
service_event: Condvar,
/// Service tasks queue.
service_tasks: Mutex<VecDeque<ServiceTask>>,
}
/// Service task.
enum ServiceTask {
/// Generate server key (server_key_id, threshold).
GenerateServerKey(H256, H256),
/// Shutdown listener.
Shutdown,
}
impl ServiceContractListener {
pub fn new(client: &Arc<Client>, key_server: Arc<KeyServer>, self_key_pair: Arc<NodeKeyPair>) -> Arc<ServiceContractListener> {
let contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()).unwrap_or_default();
let data = Arc::new(ServiceContractListenerData {
contract: RwLock::new(SecretStoreService::new(contract_addr)),
client: Arc::downgrade(client),
key_server: key_server,
self_key_pair: self_key_pair,
tasks_queue: Arc::new(TasksQueue::new()),
});
let service_thread_data = data.clone();
let service_handle = thread::spawn(move || Self::run_service_thread(service_thread_data));
let contract = Arc::new(ServiceContractListener {
contract: Mutex::new(CachedContract::new(client, key_server, self_key_pair)),
data: data,
service_handle: Some(service_handle),
});
client.add_notify(contract.clone());
contract
}
fn process_service_contract_events(&self, client: &Client, service_contract: Address, blocks: Vec<H256>) {
debug_assert!(!blocks.is_empty());
// TODO: is blocks guaranteed to be ordered here?
// TODO: logs() is called from notify() thread - is it ok?
let request_logs = client.logs(Filter {
from_block: BlockId::Hash(blocks.first().expect("!block.is_empty(); qed").clone()),
to_block: BlockId::Hash(blocks.last().expect("!block.is_empty(); qed").clone()),
address: Some(vec![service_contract]),
topics: vec![
Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: None,
});
self.data.tasks_queue.push(request_logs.into_iter()
.filter_map(|r| match r.entry.topics.len() {
3 => Some(ServiceTask::GenerateServerKey(
r.entry.topics[1],
r.entry.topics[2],
)),
l @ _ => {
warn!(target: "secretstore", "Ignoring ServerKeyRequested event with wrong number of params {}", l);
None
},
}));
}
fn run_service_thread(data: Arc<ServiceContractListenerData>) {
loop {
let task = data.tasks_queue.wait();
match task {
ServiceTask::GenerateServerKey(server_key_id, threshold) => {
match Self::generate_server_key(&data, &server_key_id, &threshold)
.and_then(|server_key| Self::publish_server_key(&data, &server_key_id, &server_key)) {
Ok(_) => trace!(target: "secretstore", "GenerateServerKey({}, {}) request has completed",
server_key_id, threshold),
Err(error) => warn!(target: "secretstore", "GenerateServerKey({}, {}) request has failed with: {}",
server_key_id, threshold, error),
}
},
ServiceTask::Shutdown => break,
}
}
}
fn generate_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, threshold: &H256) -> Result<Public, String> {
let threshold_num = threshold.low_u64();
if threshold != &threshold_num.into() || threshold_num >= ::std::usize::MAX as u64 {
return Err(format!("invalid threshold {:?}", threshold));
}
// TODO: if this server key is going to be used for document key generation later, author must
// be specified from outside
let author_key = Random.generate().map_err(|e| format!("{}", e))?;
let server_key_id_signature = sign(author_key.secret(), server_key_id).map_err(|e| format!("{}", e))?;
data.key_server.generate_key(server_key_id, &server_key_id_signature, threshold_num as usize)
.map_err(Into::into)
}
fn publish_server_key(data: &Arc<ServiceContractListenerData>, server_key_id: &ServerKeyId, server_key: &Public) -> Result<(), String> {
let server_key_hash = keccak(server_key);
let signed_server_key = data.self_key_pair.sign(&server_key_hash).map_err(|e| format!("{}", e))?;
let signed_server_key: Signature = signed_server_key.into_electrum().into();
let transaction_data = data.contract.read().encode_server_key_generated_input(server_key_id.clone(),
server_key.to_vec(),
signed_server_key.v(),
signed_server_key.r().into(),
signed_server_key.s().into()
)?;
let contract = data.contract.read();
if contract.address != Default::default() {
if let Some(client) = data.client.upgrade() {
client.transact_contract(
contract.address.clone(),
transaction_data
).map_err(|e| format!("{}", e))?;
} // else we will read this in the next refresh cycle
}
Ok(())
}
}
impl Drop for ServiceContractListener {
fn drop(&mut self) {
if let Some(service_handle) = self.service_handle.take() {
self.data.tasks_queue.shutdown();
// ignore error as we are already closing
let _ = service_handle.join();
}
}
}
impl ChainNotify for ServiceContractListener {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: u64) {
if !enacted.is_empty() {
self.contract.lock().update(enacted)
}
}
}
impl CachedContract {
pub fn new(client: &Arc<Client>, key_server: Arc<KeyServer>, self_key_pair: Arc<NodeKeyPair>) -> Self {
CachedContract {
client: Arc::downgrade(client),
contract: SecretStoreService::new(Default::default()), // we aren't going to call contract => could use default address
contract_addr: client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()),
key_server: key_server,
self_key_pair: self_key_pair,
}
}
pub fn update(&mut self, enacted: Vec<H256>) {
if let Some(client) = self.client.upgrade() {
// update contract address
self.contract_addr = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned());
// check for new key requests.
// NOTE: If contract is changed, or unregistered && there are several enacted blocks
// in single update call, some requests in old contract can be abandoned (we get contract_address from latest block)
// && check for requests in this contract for every enacted block.
// The opposite is also true (we can process requests of contract, before it actually becames a SS contract).
if let Some(contract_addr) = self.contract_addr.as_ref() {
// TODO: in case of reorgs we might process requests for free (maybe wait for several confirmations???) && publish keys without request
// TODO: in case of reorgs we might publish keys to forked branch (re-submit transaction???)
for block in enacted {
let request_logs = client.logs(Filter {
from_block: BlockId::Hash(block.clone()),
to_block: BlockId::Hash(block),
address: Some(vec![contract_addr.clone()]),
topics: vec![
Some(vec![*SERVER_KEY_REQUESTED_EVENT_NAME_HASH]),
None,
None,
None,
],
limit: None,
});
// TODO: it actually should queue tasks to separate thread
// + separate thread at the beginning should read all requests from contract
// and then start processing logs
for request in request_logs {
// TODO: check if we are selected to process this request
let key_id = request.entry.topics[1];
let key = Random.generate().unwrap();
let signature = sign(key.secret(), &key_id).unwrap();
let server_key = self.key_server.generate_key(&key_id, &signature, 0).unwrap();
println!("=== generated key: {:?}", server_key);
// publish generated key
let server_key_hash = keccak(server_key);
let signed_key = self.self_key_pair.sign(&server_key_hash).unwrap();
let transaction_data = self.contract.encode_server_key_generated_input(key_id, server_key.to_vec(), signed_key.v(), signed_key.r().into(), signed_key.s().into()).unwrap();
client.transact_contract(contract_addr.clone(), transaction_data).unwrap();
if let Some(client) = self.data.client.upgrade() {
if let Some(service_contract_addr) = client.registry_address(SERVICE_CONTRACT_REGISTRY_NAME.to_owned()) {
if self.data.contract.read().address != service_contract_addr {
*self.data.contract.write() = SecretStoreService::new(service_contract_addr.clone());
}
self.process_service_contract_events(&*client, service_contract_addr, enacted);
}
}
//self.contract.lock().update(enacted)
}
}
}
impl TasksQueue {
pub fn new() -> Self {
TasksQueue {
is_shutdown: AtomicBool::new(false),
service_event: Condvar::new(),
service_tasks: Mutex::new(VecDeque::new()),
}
}
pub fn shutdown(&self) {
self.is_shutdown.store(true, Ordering::Release);
self.service_event.notify_all();
}
pub fn push<I>(&self, tasks: I) where I: Iterator<Item=ServiceTask> {
let mut service_tasks = self.service_tasks.lock();
service_tasks.extend(tasks);
self.service_event.notify_all();
}
pub fn wait(&self) -> ServiceTask {
if self.is_shutdown.load(Ordering::Release) {
return ServiceTask::Shutdown;
}
let mut service_tasks = self.service_tasks.lock();
if service_tasks.is_empty() {
self.service_event.wait(&mut service_tasks);
if self.is_shutdown.load(Ordering::Release) {
return ServiceTask::Shutdown;
}
}
service_tasks.pop_front()
.expect("service_event is only fired when there are new tasks or is_shutdown == true; is_shutdown == false; qed")
}
}