Simple WebSockets notification about new request (#1202)

* Splitting methods requiring signing into separate trait

* Single place where RPC apis are created.

* Separating eth_filter

* Separating eth_signing

* Stubs for Personal Signer methods

* Test for EthSigningQueueClient

* TransactionConfirmation API

* Exposing PersonalSigner API

* Defining ApiSets dependent on context

* Removing types

* Supporting sending notification to WS connected SystemUIs

* Sending a notification on every new messages

* Adding logs to signing queue

* Shutting down broadcaster

* Refactoring the signing queue

* Fixing wait loop in case of spurious wake-ups.
This commit is contained in:
Tomasz Drwięga
2016-06-02 17:05:13 +02:00
committed by Gav Wood
parent 35753f22f7
commit 18dac64abb
9 changed files with 348 additions and 69 deletions

View File

@@ -30,12 +30,15 @@
//!
//! ```
//! extern crate ethcore_signer;
//! extern crate ethcore_rpc;
//!
//! use std::sync::Arc;
//! use ethcore_signer::ServerBuilder;
//! use ethcore_rpc::ConfirmationsQueue;
//!
//! fn main() {
//! let builder = ServerBuilder::new();
//! let _server = builder.start("127.0.0.1:8084".parse().unwrap()).unwrap();
//! let queue = Arc::new(ConfirmationsQueue::default());
//! let _server = ServerBuilder::new(queue).start("127.0.0.1:8084".parse().unwrap());
//! }
//! ```

View File

@@ -25,7 +25,7 @@ use std::sync::Arc;
use std::net::SocketAddr;
use util::panics::{PanicHandler, OnPanicListener, MayPanic};
use jsonrpc_core::{IoHandler, IoDelegate};
use rpc::Extendable;
use rpc::{Extendable, ConfirmationsQueue};
mod session;
@@ -49,15 +49,10 @@ impl From<ws::Error> for ServerError {
/// Builder for `WebSockets` server
pub struct ServerBuilder {
queue: Arc<ConfirmationsQueue>,
handler: Arc<IoHandler>,
}
impl Default for ServerBuilder {
fn default() -> Self {
ServerBuilder::new()
}
}
impl Extendable for ServerBuilder {
fn add_delegate<D: Send + Sync + 'static>(&self, delegate: IoDelegate<D>) {
self.handler.add_delegate(delegate);
@@ -66,30 +61,32 @@ impl Extendable for ServerBuilder {
impl ServerBuilder {
/// Creates new `ServerBuilder`
pub fn new() -> Self {
pub fn new(queue: Arc<ConfirmationsQueue>) -> Self {
ServerBuilder {
handler: Arc::new(IoHandler::new())
queue: queue,
handler: Arc::new(IoHandler::new()),
}
}
/// Starts a new `WebSocket` server in separate thread.
/// Returns a `Server` handle which closes the server when droped.
pub fn start(self, addr: SocketAddr) -> Result<Server, ServerError> {
Server::start(addr, self.handler)
Server::start(addr, self.handler, self.queue)
}
}
/// `WebSockets` server implementation.
pub struct Server {
handle: Option<thread::JoinHandle<ws::WebSocket<session::Factory>>>,
broadcaster: ws::Sender,
broadcaster_handle: Option<thread::JoinHandle<()>>,
queue: Arc<ConfirmationsQueue>,
panic_handler: Arc<PanicHandler>,
}
impl Server {
/// Starts a new `WebSocket` server in separate thread.
/// Returns a `Server` handle which closes the server when droped.
pub fn start(addr: SocketAddr, handler: Arc<IoHandler>) -> Result<Server, ServerError> {
fn start(addr: SocketAddr, handler: Arc<IoHandler>, queue: Arc<ConfirmationsQueue>) -> Result<Server, ServerError> {
let config = {
let mut config = ws::Settings::default();
config.max_connections = 5;
@@ -103,6 +100,7 @@ impl Server {
let panic_handler = PanicHandler::new_in_arc();
let ph = panic_handler.clone();
let broadcaster = ws.broadcaster();
// Spawn a thread with event loop
let handle = thread::spawn(move || {
ph.catch_panic(move || {
@@ -110,10 +108,24 @@ impl Server {
}).unwrap()
});
// Spawn a thread for broadcasting
let ph = panic_handler.clone();
let q = queue.clone();
let broadcaster_handle = thread::spawn(move || {
ph.catch_panic(move || {
q.start_listening(|_message| {
// TODO [ToDr] Some better structure here for messages.
broadcaster.send("new_message").unwrap();
}).expect("It's the only place we are running start_listening. It shouldn't fail.");
broadcaster.shutdown().expect("Broadcaster should close gently.")
}).unwrap()
});
// Return a handle
Ok(Server {
handle: Some(handle),
broadcaster: broadcaster,
broadcaster_handle: Some(broadcaster_handle),
queue: queue,
panic_handler: panic_handler,
})
}
@@ -127,7 +139,8 @@ impl MayPanic for Server {
impl Drop for Server {
fn drop(&mut self) {
self.broadcaster.shutdown().expect("WsServer should close nicely.");
self.queue.finish();
self.broadcaster_handle.take().unwrap().join().unwrap();
self.handle.take().unwrap().join().unwrap();
}
}