diff --git a/Cargo.toml b/Cargo.toml index cea775ef3..0583aa78f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ time = "0.1" #interpolate_idents = { git = "https://github.com/SkylerLipthay/interpolate_idents" } evmjit = { path = "rust-evmjit", optional = true } ethash = { path = "ethash" } +num_cpus = "0.2" [features] jit = ["evmjit"] diff --git a/src/block.rs b/src/block.rs index d149d6132..e5ca18e8c 100644 --- a/src/block.rs +++ b/src/block.rs @@ -1,6 +1,7 @@ use common::*; use engine::*; use state::*; +use verification::PreVerifiedBlock; /// A transaction/receipt execution entry. pub struct Entry { @@ -263,30 +264,39 @@ impl IsBlock for SealedBlock { fn block(&self) -> &Block { &self.block } } -/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header -pub fn enact<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { +/// Enact the block given by block header, transactions and uncles +pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[Header], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { { - let header = BlockView::new(block_bytes).header_view(); let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce()); trace!("enact(): root={}, author={}, author_balance={}\n", s.root(), header.author(), s.balance(&header.author())); } - let block = BlockView::new(block_bytes); - let header = block.header_view(); - let mut b = OpenBlock::new(engine, db, parent, last_hashes, header.author(), header.extra_data()); - b.set_difficulty(header.difficulty()); - b.set_gas_limit(header.gas_limit()); + let mut b = OpenBlock::new(engine, db, parent, last_hashes, header.author().clone(), header.extra_data().clone()); + b.set_difficulty(*header.difficulty()); + b.set_gas_limit(*header.gas_limit()); b.set_timestamp(header.timestamp()); -// info!("enact: Enacting #{}. env_info={:?}", header.number(), b.env_info()); - for t in block.transactions().into_iter() { try!(b.push_transaction(t, None)); } - for u in block.uncles().into_iter() { try!(b.push_uncle(u)); } + for t in transactions { try!(b.push_transaction(t.clone(), None)); } + for u in uncles { try!(b.push_uncle(u.clone())); } Ok(b.close()) } +/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header +pub fn enact_bytes<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { + let block = BlockView::new(block_bytes); + let header = block.header(); + enact(&header, &block.transactions(), &block.uncles(), engine, db, parent, last_hashes) +} + +/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header +pub fn enact_verified<'x, 'y>(block: &PreVerifiedBlock, engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { + let view = BlockView::new(&block.bytes); + enact(&block.header, &block.transactions, &view.uncles(), engine, db, parent, last_hashes) +} + /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: OverlayDB, parent: &Header, last_hashes: &LastHashes) -> Result { let header = BlockView::new(block_bytes).header_view(); - Ok(try!(try!(enact(block_bytes, engine, db, parent, last_hashes)).seal(header.seal()))) + Ok(try!(try!(enact_bytes(block_bytes, engine, db, parent, last_hashes)).seal(header.seal()))) } #[test] diff --git a/src/client.rs b/src/client.rs index 3ee84ccd7..e02ab37d8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -88,7 +88,7 @@ pub trait BlockChainClient : Sync + Send { fn block_receipts(&self, hash: &H256) -> Option; /// Import a block into the blockchain. - fn import_block(&mut self, byte: &[u8]) -> ImportResult; + fn import_block(&mut self, bytes: Bytes) -> ImportResult; /// Get block queue information. fn queue_status(&self) -> BlockQueueStatus; @@ -152,58 +152,75 @@ impl Client { } /// This is triggered by a message coming from a block queue when the block is ready for insertion - pub fn import_verified_block(&mut self, bytes: Bytes) { - let block = BlockView::new(&bytes); - let header = block.header(); - if let Err(e) = verify_block_family(&header, &bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { - warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - self.queue.mark_as_bad(&header.hash()); + pub fn import_verified_blocks(&mut self) { + + let mut bad = HashSet::new(); + let blocks = self.queue.drain(128); + if blocks.is_empty() { return; - }; - let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) { - Some(p) => p, - None => { - warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); + } + + for block in blocks { + if bad.contains(&block.header.parent_hash) { + self.queue.mark_as_bad(&block.header.hash()); + bad.insert(block.header.hash()); + continue; + } + + let header = &block.header; + if let Err(e) = verify_block_family(&header, &block.bytes, self.engine.deref().deref(), self.chain.read().unwrap().deref()) { + warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); self.queue.mark_as_bad(&header.hash()); + bad.insert(block.header.hash()); return; - }, - }; - // build last hashes - let mut last_hashes = LastHashes::new(); - last_hashes.resize(256, H256::new()); - last_hashes[0] = header.parent_hash.clone(); - for i in 0..255 { - match self.chain.read().unwrap().block_details(&last_hashes[i]) { - Some(details) => { - last_hashes[i + 1] = details.parent.clone(); + }; + let parent = match self.chain.read().unwrap().block_header(&header.parent_hash) { + Some(p) => p, + None => { + warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); + self.queue.mark_as_bad(&header.hash()); + bad.insert(block.header.hash()); + return; }, - None => break, + }; + // build last hashes + let mut last_hashes = LastHashes::new(); + last_hashes.resize(256, H256::new()); + last_hashes[0] = header.parent_hash.clone(); + for i in 0..255 { + match self.chain.read().unwrap().block_details(&last_hashes[i]) { + Some(details) => { + last_hashes[i + 1] = details.parent.clone(); + }, + None => break, + } } - } - let result = match enact(&bytes, self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes) { - Ok(b) => b, - Err(e) => { - warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + let result = match enact_verified(&block, self.engine.deref().deref(), self.state_db.clone(), &parent, &last_hashes) { + Ok(b) => b, + Err(e) => { + warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + bad.insert(block.header.hash()); + self.queue.mark_as_bad(&header.hash()); + return; + } + }; + if let Err(e) = verify_block_final(&header, result.block().header()) { + warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); self.queue.mark_as_bad(&header.hash()); return; } - }; - if let Err(e) = verify_block_final(&header, result.block().header()) { - warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - self.queue.mark_as_bad(&header.hash()); - return; - } - self.chain.write().unwrap().insert_block(&bytes); //TODO: err here? - match result.drain().commit() { - Ok(_) => (), - Err(e) => { - warn!(target: "client", "State DB commit failed: {:?}", e); - return; + self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? + match result.drain().commit() { + Ok(_) => (), + Err(e) => { + warn!(target: "client", "State DB commit failed: {:?}", e); + return; + } } + info!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } - info!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } } @@ -261,8 +278,8 @@ impl BlockChainClient for Client { unimplemented!(); } - fn import_block(&mut self, bytes: &[u8]) -> ImportResult { - let header = BlockView::new(bytes).header(); + fn import_block(&mut self, bytes: Bytes) -> ImportResult { + let header = BlockView::new(&bytes).header(); if self.chain.read().unwrap().is_known(&header.hash()) { return Err(ImportError::AlreadyInChain); } diff --git a/src/ethereum/ethash.rs b/src/ethereum/ethash.rs index 99ffc3186..8a86cf4e5 100644 --- a/src/ethereum/ethash.rs +++ b/src/ethereum/ethash.rs @@ -146,6 +146,10 @@ impl Engine for Ethash { } Ok(()) } + + fn verify_transaction(&self, t: &Transaction, _header: &Header) -> Result<(), Error> { + t.sender().map(|_|()) // Perform EC recovery and cache sender + } } impl Ethash { diff --git a/src/lib.rs b/src/lib.rs index 7e4fdf33e..8180736a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ extern crate heapsize; extern crate crypto; extern crate time; extern crate env_logger; +extern crate num_cpus; #[cfg(feature = "jit" )] extern crate evmjit; #[macro_use] diff --git a/src/queue.rs b/src/queue.rs index 5ca361834..5803b3e5f 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,59 +1,245 @@ +use std::thread::{JoinHandle, self}; +use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use util::*; use verification::*; use error::*; use engine::Engine; use sync::*; use views::*; +use header::*; /// A queue of blocks. Sits between network or other I/O and the BlockChain. /// Sorts them ready for blockchain insertion. pub struct BlockQueue { engine: Arc>, + more_to_verify: Arc, + verification: Arc>, + verifiers: Vec>, + deleting: Arc, + ready_signal: Arc, + processing: HashSet +} + +struct UnVerifiedBlock { + header: Header, + bytes: Bytes, +} + +struct VerifyingBlock { + hash: H256, + block: Option, +} + +struct QueueSignal { + signalled: AtomicBool, message_channel: IoChannel, +} + +impl QueueSignal { + fn set(&self) { + if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { + self.message_channel.send(UserMessage(SyncMessage::BlockVerified)).expect("Error sending BlockVerified message"); + } + } + fn reset(&self) { + self.signalled.store(false, AtomicOrdering::Relaxed); + } +} + +#[derive(Default)] +struct Verification { + unverified: VecDeque, + verified: VecDeque, + verifying: VecDeque, bad: HashSet, } impl BlockQueue { /// Creates a new queue instance. pub fn new(engine: Arc>, message_channel: IoChannel) -> BlockQueue { + let verification = Arc::new(Mutex::new(Verification::default())); + let more_to_verify = Arc::new(Condvar::new()); + let ready_signal = Arc::new(QueueSignal { signalled: AtomicBool::new(false), message_channel: message_channel }); + let deleting = Arc::new(AtomicBool::new(false)); + + let mut verifiers: Vec> = Vec::new(); + let thread_count = max(::num_cpus::get(), 2) - 1; + for _ in 0..thread_count { + let verification = verification.clone(); + let engine = engine.clone(); + let more_to_verify = more_to_verify.clone(); + let ready_signal = ready_signal.clone(); + let deleting = deleting.clone(); + verifiers.push(thread::spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting))); + } BlockQueue { engine: engine, - message_channel: message_channel, - bad: HashSet::new(), + ready_signal: ready_signal.clone(), + more_to_verify: more_to_verify.clone(), + verification: verification.clone(), + verifiers: verifiers, + deleting: deleting.clone(), + processing: HashSet::new(), + } + } + + fn verify(verification: Arc>, engine: Arc>, wait: Arc, ready: Arc, deleting: Arc) { + while !deleting.load(AtomicOrdering::Relaxed) { + { + let mut lock = verification.lock().unwrap(); + while lock.unverified.is_empty() && !deleting.load(AtomicOrdering::Relaxed) { + lock = wait.wait(lock).unwrap(); + } + + if deleting.load(AtomicOrdering::Relaxed) { + return; + } + } + + let block = { + let mut v = verification.lock().unwrap(); + if v.unverified.is_empty() { + continue; + } + let block = v.unverified.pop_front().unwrap(); + v.verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); + block + }; + + let block_hash = block.header.hash(); + match verify_block_unordered(block.header, block.bytes, engine.deref().deref()) { + Ok(verified) => { + let mut v = verification.lock().unwrap(); + for e in &mut v.verifying { + if e.hash == block_hash { + e.block = Some(verified); + break; + } + } + if !v.verifying.is_empty() && v.verifying.front().unwrap().hash == block_hash { + // we're next! + let mut vref = v.deref_mut(); + BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); + ready.set(); + } + }, + Err(err) => { + let mut v = verification.lock().unwrap(); + warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); + v.bad.insert(block_hash.clone()); + v.verifying.retain(|e| e.hash != block_hash); + let mut vref = v.deref_mut(); + BlockQueue::drain_verifying(&mut vref.verifying, &mut vref.verified, &mut vref.bad); + ready.set(); + } + } + } + } + + fn drain_verifying(verifying: &mut VecDeque, verified: &mut VecDeque, bad: &mut HashSet) { + while !verifying.is_empty() && verifying.front().unwrap().block.is_some() { + let block = verifying.pop_front().unwrap().block.unwrap(); + if bad.contains(&block.header.parent_hash) { + bad.insert(block.header.hash()); + } + else { + verified.push_back(block); + } } } /// Clear the queue and stop verification activity. pub fn clear(&mut self) { + let mut verification = self.verification.lock().unwrap(); + verification.unverified.clear(); + verification.verifying.clear(); } /// Add a block to the queue. - pub fn import_block(&mut self, bytes: &[u8]) -> ImportResult { - let header = BlockView::new(bytes).header(); - if self.bad.contains(&header.hash()) { - return Err(ImportError::Bad(None)); + pub fn import_block(&mut self, bytes: Bytes) -> ImportResult { + let header = BlockView::new(&bytes).header(); + if self.processing.contains(&header.hash()) { + return Err(ImportError::AlreadyQueued); + } + { + let mut verification = self.verification.lock().unwrap(); + if verification.bad.contains(&header.hash()) { + return Err(ImportError::Bad(None)); + } + + if verification.bad.contains(&header.parent_hash) { + verification.bad.insert(header.hash()); + return Err(ImportError::Bad(None)); + } } - if self.bad.contains(&header.parent_hash) { - self.bad.insert(header.hash()); - return Err(ImportError::Bad(None)); + match verify_block_basic(&header, &bytes, self.engine.deref().deref()) { + Ok(()) => { + self.processing.insert(header.hash()); + self.verification.lock().unwrap().unverified.push_back(UnVerifiedBlock { header: header, bytes: bytes }); + self.more_to_verify.notify_all(); + }, + Err(err) => { + warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), err); + self.verification.lock().unwrap().bad.insert(header.hash()); + } } - - try!(verify_block_basic(&header, bytes, self.engine.deref().deref()).map_err(|e| { - warn!(target: "client", "Stage 1 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), e); - e - })); - try!(verify_block_unordered(&header, bytes, self.engine.deref().deref()).map_err(|e| { - warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", BlockView::new(&bytes).header_view().sha3(), e); - e - })); - try!(self.message_channel.send(UserMessage(SyncMessage::BlockVerified(bytes.to_vec()))).map_err(|e| Error::from(e))); Ok(()) } + /// Mark given block and all its children as bad. Stops verification. pub fn mark_as_bad(&mut self, hash: &H256) { - self.bad.insert(hash.clone()); - //TODO: walk the queue + let mut verification_lock = self.verification.lock().unwrap(); + let mut verification = verification_lock.deref_mut(); + verification.bad.insert(hash.clone()); + let mut new_verified = VecDeque::new(); + for block in verification.verified.drain(..) { + if verification.bad.contains(&block.header.parent_hash) { + verification.bad.insert(block.header.hash()); + } + else { + new_verified.push_back(block); + } + } + verification.verified = new_verified; + } + + pub fn drain(&mut self, max: usize) -> Vec { + let mut verification = self.verification.lock().unwrap(); + let count = min(max, verification.verified.len()); + let mut result = Vec::with_capacity(count); + for _ in 0..count { + let block = verification.verified.pop_front().unwrap(); + self.processing.remove(&block.header.hash()); + result.push(block); + } + self.ready_signal.reset(); + result } } +impl Drop for BlockQueue { + fn drop(&mut self) { + self.clear(); + self.deleting.store(true, AtomicOrdering::Relaxed); + self.more_to_verify.notify_all(); + for t in self.verifiers.drain(..) { + t.join().unwrap(); + } + } +} + +#[cfg(test)] +mod tests { + use util::*; + use spec::*; + use queue::*; + + #[test] + fn test_block_queue() { + // TODO better test + let spec = Spec::new_test(); + let engine = spec.to_engine().unwrap(); + let _ = BlockQueue::new(Arc::new(engine), IoChannel::disconnected()); + } +} diff --git a/src/service.rs b/src/service.rs index 3bc137c9c..036c99bc4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -55,8 +55,8 @@ impl IoHandler for ClientIoHandler { match net_message { &mut UserMessage(ref mut message) => { match message { - &mut SyncMessage::BlockVerified(ref mut bytes) => { - self.client.write().unwrap().import_verified_block(mem::replace(bytes, Bytes::new())); + &mut SyncMessage::BlockVerified => { + self.client.write().unwrap().import_verified_blocks(); }, _ => {}, // ignore other messages } diff --git a/src/sync/chain.rs b/src/sync/chain.rs index 43f5968f4..15fe6d1f9 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -401,7 +401,7 @@ impl ChainSync { let header_view = HeaderView::new(header_rlp.as_raw()); // TODO: Decompose block and add to self.headers and self.bodies instead if header_view.number() == From::from(self.last_imported_block + 1) { - match io.chain().import_block(block_rlp.as_raw()) { + match io.chain().import_block(block_rlp.as_raw().to_vec()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "New block already in chain {:?}", h); }, @@ -655,7 +655,7 @@ impl ChainSync { block_rlp.append_raw(body.at(0).as_raw(), 1); block_rlp.append_raw(body.at(1).as_raw(), 1); let h = &headers.1[i].hash; - match io.chain().import_block(&block_rlp.out()) { + match io.chain().import_block(block_rlp.out()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {:?}", h); self.last_imported_block = headers.0 + i as BlockNumber; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 491fa8e40..da91a6889 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -43,7 +43,7 @@ pub enum SyncMessage { /// New block has been imported into the blockchain NewChainBlock(Bytes), /// A block is ready - BlockVerified(Bytes), + BlockVerified, } pub type NetSyncMessage = NetworkIoMessage; diff --git a/src/sync/tests.rs b/src/sync/tests.rs index 84a8bf21f..05d7ac317 100644 --- a/src/sync/tests.rs +++ b/src/sync/tests.rs @@ -43,7 +43,7 @@ impl TestBlockChainClient { rlp.append(&header); rlp.append_raw(&rlp::NULL_RLP, 1); rlp.append_raw(uncles.as_raw(), 1); - self.import_block(rlp.as_raw()).unwrap(); + self.import_block(rlp.as_raw().to_vec()).unwrap(); } } } @@ -110,7 +110,7 @@ impl BlockChainClient for TestBlockChainClient { None } - fn import_block(&mut self, b: &[u8]) -> ImportResult { + fn import_block(&mut self, b: Bytes) -> ImportResult { let header = Rlp::new(&b).val_at::(0); let number: usize = header.number as usize; if number > self.blocks.len() { @@ -132,7 +132,7 @@ impl BlockChainClient for TestBlockChainClient { if number == self.numbers.len() { self.difficulty = self.difficulty + header.difficulty; self.last_hash = header.hash(); - self.blocks.insert(header.hash(), b.to_vec()); + self.blocks.insert(header.hash(), b); self.numbers.insert(number, header.hash()); let mut parent_hash = header.parent_hash; if number > 0 { diff --git a/src/verification.rs b/src/verification.rs index 6df49ac31..3d852dc3e 100644 --- a/src/verification.rs +++ b/src/verification.rs @@ -9,6 +9,16 @@ use common::*; use engine::Engine; use blockchain::*; +/// Preprocessed block data gathered in `verify_block_unordered` call +pub struct PreVerifiedBlock { + /// Populated block header + pub header: Header, + /// Populated block transactions + pub transactions: Vec, + /// Block bytes + pub bytes: Bytes, +} + /// Phase 1 quick block verification. Only does checks that are cheap. Operates on a single block pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Result<(), Error> { try!(verify_header(&header, engine)); @@ -29,19 +39,26 @@ pub fn verify_block_basic(header: &Header, bytes: &[u8], engine: &Engine) -> Res /// Phase 2 verification. Perform costly checks such as transaction signatures and block nonce for ethash. /// Still operates on a individual block -/// TODO: return cached transactions, header hash. -pub fn verify_block_unordered(header: &Header, bytes: &[u8], engine: &Engine) -> Result<(), Error> { - try!(engine.verify_block_unordered(&header, Some(bytes))); - for u in Rlp::new(bytes).at(2).iter().map(|rlp| rlp.as_val::
()) { +/// Returns a PreVerifiedBlock structure populated with transactions +pub fn verify_block_unordered(header: Header, bytes: Bytes, engine: &Engine) -> Result { + try!(engine.verify_block_unordered(&header, Some(&bytes))); + for u in Rlp::new(&bytes).at(2).iter().map(|rlp| rlp.as_val::
()) { try!(engine.verify_block_unordered(&u, None)); } - // Verify transactions. - // TODO: pass in pre-recovered transactions - maybe verify_transaction wants to call `sender()`. - let v = BlockView::new(bytes); - for t in v.transactions() { - try!(engine.verify_transaction(&t, &header)); + // Verify transactions. + let mut transactions = Vec::new(); + { + let v = BlockView::new(&bytes); + for t in v.transactions() { + try!(engine.verify_transaction(&t, &header)); + transactions.push(t); + } } - Ok(()) + Ok(PreVerifiedBlock { + header: header, + transactions: transactions, + bytes: bytes, + }) } /// Phase 3 verification. Check block information against parent and uncles. diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 4ccfb2407..4a96d19a7 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -151,14 +151,22 @@ impl Handler for IoManager where Message: Send + 'static { /// Allows sending messages into the event loop. All the IO handlers will get the message /// in the `message` callback. pub struct IoChannel where Message: Send { - channel: Sender> + channel: Option>> } impl IoChannel where Message: Send { - pub fn send(&mut self, message: Message) -> Result<(), IoError> { - try!(self.channel.send(IoMessage::UserMessage(message))); + /// Send a msessage through the channel + pub fn send(&self, message: Message) -> Result<(), IoError> { + if let Some(ref channel) = self.channel { + try!(channel.send(IoMessage::UserMessage(message))); + } Ok(()) } + + /// Create a new channel to connected to event loop. + pub fn disconnected() -> IoChannel { + IoChannel { channel: None } + } } /// General IO Service. Starts an event loop and dispatches IO requests. @@ -198,7 +206,7 @@ impl IoService where Message: Send + 'static { /// Create a new message channel pub fn channel(&mut self) -> IoChannel { - IoChannel { channel: self.host_channel.clone() } + IoChannel { channel: Some(self.host_channel.clone()) } } }