Sync channel for consensus test

This commit is contained in:
arkpar
2016-12-11 12:32:01 +01:00
parent f3af0f46be
commit c777362d02
11 changed files with 232 additions and 168 deletions

View File

@@ -329,11 +329,18 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
}
}
#[derive(Clone)]
enum Handlers<Message> where Message: Send + Clone {
SharedCollection(Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>),
Single(Weak<IoHandler<Message>>),
}
/// Allows sending messages into the event loop. All the IO handlers will get the message
/// in the `message` callback.
pub struct IoChannel<Message> where Message: Send + Clone{
channel: Option<Sender<IoMessage<Message>>>,
handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
handlers: Handlers<Message>,
}
impl<Message> Clone for IoChannel<Message> where Message: Send + Clone + Sync + 'static {
@@ -348,19 +355,29 @@ impl<Message> Clone for IoChannel<Message> where Message: Send + Clone + Sync +
impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
/// Send a message through the channel
pub fn send(&self, message: Message) -> Result<(), IoError> {
if let Some(ref channel) = self.channel {
try!(channel.send(IoMessage::UserMessage(message)));
match self.channel {
Some(ref channel) => try!(channel.send(IoMessage::UserMessage(message))),
None => try!(self.send_sync(message))
}
Ok(())
}
/// Send a message through the channel and handle it synchronously
pub fn send_sync(&self, message: Message) -> Result<(), IoError> {
if let Some(handlers) = self.handlers.upgrade() {
for id in 0 .. MAX_HANDLERS {
if let Some(h) = handlers.read().get(id) {
let handler = h.clone();
handler.message(&IoContext::new(self.clone(), id), &message);
match self.handlers {
Handlers::SharedCollection(ref handlers) => {
if let Some(handlers) = handlers.upgrade() {
for id in 0 .. MAX_HANDLERS {
if let Some(h) = handlers.read().get(id) {
let handler = h.clone();
handler.message(&IoContext::new(self.clone(), id), &message);
}
}
}
},
Handlers::Single(ref handler) => {
if let Some(handler) = handler.upgrade() {
handler.message(&IoContext::new(self.clone(), 0), &message);
}
}
}
@@ -378,14 +395,21 @@ impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
pub fn disconnected() -> IoChannel<Message> {
IoChannel {
channel: None,
handlers: Weak::default(),
handlers: Handlers::SharedCollection(Weak::default()),
}
}
/// Create a new synchronous channel to a given handler.
pub fn to_handler(handler: Weak<IoHandler<Message>>) -> IoChannel<Message> {
IoChannel {
channel: None,
handlers: Handlers::Single(handler),
}
}
fn new(channel: Sender<IoMessage<Message>>, handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>) -> IoChannel<Message> {
IoChannel {
channel: Some(channel),
handlers: handlers,
handlers: Handlers::SharedCollection(handlers),
}
}
}