Adding logs to signing queue
This commit is contained in:
parent
84882922b4
commit
b4bc395c6e
@ -51,7 +51,7 @@ fn do_start(conf: Configuration, deps: Dependencies) -> SignerServer {
|
|||||||
});
|
});
|
||||||
|
|
||||||
let start_result = {
|
let start_result = {
|
||||||
let server = signer::ServerBuilder::new(daps.apis.signer_queue.clone());
|
let server = signer::ServerBuilder::new(deps.apis.signer_queue.clone());
|
||||||
let server = rpc_apis::setup_rpc(server, deps.apis, rpc_apis::ApiSet::SafeContext);
|
let server = rpc_apis::setup_rpc(server, deps.apis, rpc_apis::ApiSet::SafeContext);
|
||||||
server.start(addr)
|
server.start(addr)
|
||||||
};
|
};
|
||||||
|
@ -166,6 +166,7 @@ impl SigningQueue for ConfirmationsQueue {
|
|||||||
id: id,
|
id: id,
|
||||||
transaction: transaction,
|
transaction: transaction,
|
||||||
});
|
});
|
||||||
|
debug!(target: "own_tx", "Signer: New transaction ({:?}) in confirmation queue.", id);
|
||||||
}
|
}
|
||||||
// Notify listeners
|
// Notify listeners
|
||||||
self.notify(QueueMessage::NewRequest(id));
|
self.notify(QueueMessage::NewRequest(id));
|
||||||
@ -177,12 +178,14 @@ impl SigningQueue for ConfirmationsQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn request_rejected(&self, id: U256) -> Option<TransactionConfirmation> {
|
fn request_rejected(&self, id: U256) -> Option<TransactionConfirmation> {
|
||||||
|
debug!(target: "own_tx", "Signer: Transaction rejected ({:?}).", id);
|
||||||
let o = self.remove(id);
|
let o = self.remove(id);
|
||||||
self.update_status(id, QueueStatus::Rejected);
|
self.update_status(id, QueueStatus::Rejected);
|
||||||
o
|
o
|
||||||
}
|
}
|
||||||
|
|
||||||
fn request_confirmed(&self, id: U256, hash: H256) -> Option<TransactionConfirmation> {
|
fn request_confirmed(&self, id: U256, hash: H256) -> Option<TransactionConfirmation> {
|
||||||
|
debug!(target: "own_tx", "Signer: Transaction confirmed ({:?}).", id);
|
||||||
let o = self.remove(id);
|
let o = self.remove(id);
|
||||||
self.update_status(id, QueueStatus::Confirmed(hash));
|
self.update_status(id, QueueStatus::Confirmed(hash));
|
||||||
o
|
o
|
||||||
@ -213,6 +216,7 @@ impl SigningQueue for ConfirmationsQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!(target: "own_tx", "Signer: Awaiting transaction confirmation... ({:?}).", id);
|
||||||
// Now wait for a response
|
// Now wait for a response
|
||||||
let deadline = Instant::now() + Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC);
|
let deadline = Instant::now() + Duration::from_secs(QUEUE_TIMEOUT_DURATION_SEC);
|
||||||
while Instant::now() < deadline {
|
while Instant::now() < deadline {
|
||||||
@ -233,7 +237,9 @@ impl SigningQueue for ConfirmationsQueue {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// We reached the timeout. Just return `None`
|
// We reached the timeout. Just return `None` and make sure to remove waiting.
|
||||||
|
trace!(target: "own_tx", "Signer: Confirmation timeout reached... ({:?}).", id);
|
||||||
|
self.waiters.write().unwrap().remove(&id);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,11 +30,15 @@
|
|||||||
//!
|
//!
|
||||||
//! ```
|
//! ```
|
||||||
//! extern crate ethcore_signer;
|
//! extern crate ethcore_signer;
|
||||||
|
//! extern crate ethcore_rpc;
|
||||||
//!
|
//!
|
||||||
//! use ethcore_signer::Server;
|
//! use std::sync::Arc;
|
||||||
|
//! use ethcore_signer::ServerBuilder;
|
||||||
|
//! use ethcore_rpc::ConfirmationsQueue;
|
||||||
//!
|
//!
|
||||||
//! fn main() {
|
//! fn main() {
|
||||||
//! let _server = Server::start("127.0.0.1:8084".parse().unwrap());
|
//! let queue = Arc::new(ConfirmationsQueue::default());
|
||||||
|
//! let _server = ServerBuilder::new(queue).start("127.0.0.1:8084".parse().unwrap());
|
||||||
//! }
|
//! }
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
|
@ -71,30 +71,22 @@ impl ServerBuilder {
|
|||||||
/// Starts a new `WebSocket` server in separate thread.
|
/// Starts a new `WebSocket` server in separate thread.
|
||||||
/// Returns a `Server` handle which closes the server when droped.
|
/// Returns a `Server` handle which closes the server when droped.
|
||||||
pub fn start(self, addr: SocketAddr) -> Result<Server, ServerError> {
|
pub fn start(self, addr: SocketAddr) -> Result<Server, ServerError> {
|
||||||
Server::start(addr, self.handler).and_then(|(server, broadcaster)| {
|
Server::start(addr, self.handler, self.queue)
|
||||||
// Fire up queue notifications broadcasting
|
|
||||||
let queue = self.queue.clone();
|
|
||||||
thread::spawn(move || {
|
|
||||||
queue.start_listening(|_message| {
|
|
||||||
broadcaster.send("new_message").unwrap();
|
|
||||||
}).expect("It's the only place we are running start_listening. It shouldn't fail.");
|
|
||||||
}).expect("We should be able to create the thread");
|
|
||||||
|
|
||||||
Ok(server)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `WebSockets` server implementation.
|
/// `WebSockets` server implementation.
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
handle: Option<thread::JoinHandle<ws::WebSocket<session::Factory>>>,
|
handle: Option<thread::JoinHandle<ws::WebSocket<session::Factory>>>,
|
||||||
|
broadcaster_handle: Option<thread::JoinHandle<()>>,
|
||||||
|
queue: Arc<ConfirmationsQueue>,
|
||||||
panic_handler: Arc<PanicHandler>,
|
panic_handler: Arc<PanicHandler>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
/// Starts a new `WebSocket` server in separate thread.
|
/// Starts a new `WebSocket` server in separate thread.
|
||||||
/// Returns a `Server` handle which closes the server when droped.
|
/// Returns a `Server` handle which closes the server when droped.
|
||||||
fn start(addr: SocketAddr, handler: Arc<IoHandler>) -> Result<(Server, ws::Sender), ServerError> {
|
fn start(addr: SocketAddr, handler: Arc<IoHandler>, queue: Arc<ConfirmationsQueue>) -> Result<Server, ServerError> {
|
||||||
let config = {
|
let config = {
|
||||||
let mut config = ws::Settings::default();
|
let mut config = ws::Settings::default();
|
||||||
config.max_connections = 5;
|
config.max_connections = 5;
|
||||||
@ -108,6 +100,7 @@ impl Server {
|
|||||||
let panic_handler = PanicHandler::new_in_arc();
|
let panic_handler = PanicHandler::new_in_arc();
|
||||||
let ph = panic_handler.clone();
|
let ph = panic_handler.clone();
|
||||||
let broadcaster = ws.broadcaster();
|
let broadcaster = ws.broadcaster();
|
||||||
|
|
||||||
// Spawn a thread with event loop
|
// Spawn a thread with event loop
|
||||||
let handle = thread::spawn(move || {
|
let handle = thread::spawn(move || {
|
||||||
ph.catch_panic(move || {
|
ph.catch_panic(move || {
|
||||||
@ -115,11 +108,25 @@ impl Server {
|
|||||||
}).unwrap()
|
}).unwrap()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Spawn a thread for broadcasting
|
||||||
|
let ph = panic_handler.clone();
|
||||||
|
let q = queue.clone();
|
||||||
|
let broadcaster_handle = thread::spawn(move || {
|
||||||
|
ph.catch_panic(move || {
|
||||||
|
q.start_listening(|_message| {
|
||||||
|
// TODO [ToDr] Some better structure here for messages.
|
||||||
|
broadcaster.send("new_message").unwrap();
|
||||||
|
}).expect("It's the only place we are running start_listening. It shouldn't fail.")
|
||||||
|
}).unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
// Return a handle
|
// Return a handle
|
||||||
Ok((Server {
|
Ok(Server {
|
||||||
handle: Some(handle),
|
handle: Some(handle),
|
||||||
|
broadcaster_handle: Some(broadcaster_handle),
|
||||||
|
queue: queue,
|
||||||
panic_handler: panic_handler,
|
panic_handler: panic_handler,
|
||||||
}, broadcaster))
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,6 +138,8 @@ impl MayPanic for Server {
|
|||||||
|
|
||||||
impl Drop for Server {
|
impl Drop for Server {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
|
self.queue.finish();
|
||||||
|
self.broadcaster_handle.take().unwrap().join().unwrap();
|
||||||
self.handle.take().unwrap().join().unwrap();
|
self.handle.take().unwrap().join().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user