diff --git a/Cargo.lock b/Cargo.lock index 64e468e67..b02fcbd24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -818,6 +818,7 @@ dependencies = [ "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "stop-guard 0.1.0", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "trace-time 0.1.0", ] [[package]] @@ -866,6 +867,7 @@ dependencies = [ "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "trace-time 0.1.0", "triehash 0.1.0", ] diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 723d49182..7aca4c85d 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -149,8 +149,8 @@ impl Provider where { encryptor: Box, config: ProviderConfig, channel: IoChannel, - ) -> Result { - Ok(Provider { + ) -> Self { + Provider { encryptor, validator_accounts: config.validator_accounts.into_iter().collect(), signer_account: config.signer_account, @@ -162,7 +162,7 @@ impl Provider where { miner, accounts, channel, - }) + } } // TODO [ToDr] Don't use `ChainNotify` here! @@ -243,50 +243,6 @@ impl Provider where { Ok(original_transaction) } - /// Process received private transaction - pub fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { - trace!("Private transaction received"); - let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?; - let contract = private_tx.contract; - let contract_validators = self.get_validators(BlockId::Latest, &contract)?; - - let validation_account = contract_validators - .iter() - .find(|address| self.validator_accounts.contains(address)); - - match validation_account { - None => { - // TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool. - // Importing to pool verifies correctness and nonce; here we are just blindly forwarding. - // - // Not for verification, broadcast further to peers - self.broadcast_private_transaction(rlp.into()); - return Ok(()); - }, - Some(&validation_account) => { - let hash = private_tx.hash(); - trace!("Private transaction taken for verification"); - let original_tx = self.extract_original_transaction(private_tx, &contract)?; - trace!("Validating transaction: {:?}", original_tx); - // Verify with the first account available - trace!("The following account will be used for verification: {:?}", validation_account); - let nonce_cache = Default::default(); - self.transactions_for_verification.lock().add_transaction( - original_tx, - contract, - validation_account, - hash, - self.pool_client(&nonce_cache), - )?; - // NOTE This will just fire `on_private_transaction_queued` but from a client thread. - // It seems that a lot of heavy work (verification) is done in this thread anyway - // it might actually make sense to decouple it from clientService and just use dedicated thread - // for both verification and execution. - self.channel.send(ClientIoMessage::NewPrivateTransaction).map_err(|_| ErrorKind::ClientIsMalformed.into()) - } - } - } - fn pool_client<'a>(&'a self, nonce_cache: &'a RwLock>) -> miner::pool_client::PoolClient<'a, Client> { let engine = self.client.engine(); let refuse_service_transactions = true; @@ -299,11 +255,6 @@ impl Provider where { ) } - /// Private transaction for validation added into queue - pub fn on_private_transaction_queued(&self) -> Result<(), Error> { - self.process_queue() - } - /// Retrieve and verify the first available private transaction for every sender /// /// TODO [ToDr] It seems that: @@ -347,73 +298,6 @@ impl Provider where { Ok(()) } - /// Add signed private transaction into the store - /// Creates corresponding public transaction if last required singature collected and sends it to the chain - pub fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { - let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?; - trace!("Signature for private transaction received: {:?}", tx); - let private_hash = tx.private_transaction_hash(); - let desc = match self.transactions_for_signing.lock().get(&private_hash) { - None => { - // TODO [ToDr] Verification (we can't just blindly forward every transaction) - - // Not our transaction, broadcast further to peers - self.broadcast_signed_private_transaction(rlp.into()); - return Ok(()); - }, - Some(desc) => desc, - }; - - let last = self.last_required_signature(&desc, tx.signature())?; - - if last { - let mut signatures = desc.received_signatures.clone(); - signatures.push(tx.signature()); - let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); - //Create public transaction - let public_tx = self.public_transaction( - desc.state.clone(), - &desc.original_transaction, - &rsv, - desc.original_transaction.nonce, - desc.original_transaction.gas_price - )?; - trace!("Last required signature received, public transaction created: {:?}", public_tx); - //Sign and add it to the queue - let chain_id = desc.original_transaction.chain_id(); - let hash = public_tx.hash(chain_id); - let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?; - let password = find_account_password(&self.passwords, &*self.accounts, &signer_account); - let signature = self.accounts.sign(signer_account, password, hash)?; - let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; - match self.miner.import_own_transaction(&*self.client, signed.into()) { - Ok(_) => trace!("Public transaction added to queue"), - Err(err) => { - trace!("Failed to add transaction to queue, error: {:?}", err); - bail!(err); - } - } - //Remove from store for signing - match self.transactions_for_signing.lock().remove(&private_hash) { - Ok(_) => {} - Err(err) => { - trace!("Failed to remove transaction from signing store, error: {:?}", err); - bail!(err); - } - } - } else { - //Add signature to the store - match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) { - Ok(_) => trace!("Signature stored for private transaction"), - Err(err) => { - trace!("Failed to add signature to signing store, error: {:?}", err); - bail!(err); - } - } - } - Ok(()) - } - fn last_required_signature(&self, desc: &PrivateTransactionSigningDesc, sign: Signature) -> Result { if desc.received_signatures.contains(&sign) { return Ok(false); @@ -657,6 +541,134 @@ impl Provider where { } } +pub trait Importer { + /// Process received private transaction + fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>; + + /// Add signed private transaction into the store + /// + /// Creates corresponding public transaction if last required signature collected and sends it to the chain + fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>; +} + +// TODO [ToDr] Offload more heavy stuff to the IoService thread. +// It seems that a lot of heavy work (verification) is done in this thread anyway +// it might actually make sense to decouple it from clientService and just use dedicated thread +// for both verification and execution. + +impl Importer for Arc { + fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { + trace!("Private transaction received"); + let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?; + let contract = private_tx.contract; + let contract_validators = self.get_validators(BlockId::Latest, &contract)?; + + let validation_account = contract_validators + .iter() + .find(|address| self.validator_accounts.contains(address)); + + match validation_account { + None => { + // TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool. + // Importing to pool verifies correctness and nonce; here we are just blindly forwarding. + // + // Not for verification, broadcast further to peers + self.broadcast_private_transaction(rlp.into()); + return Ok(()); + }, + Some(&validation_account) => { + let hash = private_tx.hash(); + trace!("Private transaction taken for verification"); + let original_tx = self.extract_original_transaction(private_tx, &contract)?; + trace!("Validating transaction: {:?}", original_tx); + // Verify with the first account available + trace!("The following account will be used for verification: {:?}", validation_account); + let nonce_cache = Default::default(); + self.transactions_for_verification.lock().add_transaction( + original_tx, + contract, + validation_account, + hash, + self.pool_client(&nonce_cache), + )?; + let provider = Arc::downgrade(self); + self.channel.send(ClientIoMessage::execute(move |_| { + if let Some(provider) = provider.upgrade() { + if let Err(e) = provider.process_queue() { + debug!("Unable to process the queue: {}", e); + } + } + })).map_err(|_| ErrorKind::ClientIsMalformed.into()) + } + } + } + + fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> { + let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?; + trace!("Signature for private transaction received: {:?}", tx); + let private_hash = tx.private_transaction_hash(); + let desc = match self.transactions_for_signing.lock().get(&private_hash) { + None => { + // TODO [ToDr] Verification (we can't just blindly forward every transaction) + + // Not our transaction, broadcast further to peers + self.broadcast_signed_private_transaction(rlp.into()); + return Ok(()); + }, + Some(desc) => desc, + }; + + let last = self.last_required_signature(&desc, tx.signature())?; + + if last { + let mut signatures = desc.received_signatures.clone(); + signatures.push(tx.signature()); + let rsv: Vec = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect(); + //Create public transaction + let public_tx = self.public_transaction( + desc.state.clone(), + &desc.original_transaction, + &rsv, + desc.original_transaction.nonce, + desc.original_transaction.gas_price + )?; + trace!("Last required signature received, public transaction created: {:?}", public_tx); + //Sign and add it to the queue + let chain_id = desc.original_transaction.chain_id(); + let hash = public_tx.hash(chain_id); + let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?; + let password = find_account_password(&self.passwords, &*self.accounts, &signer_account); + let signature = self.accounts.sign(signer_account, password, hash)?; + let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?; + match self.miner.import_own_transaction(&*self.client, signed.into()) { + Ok(_) => trace!("Public transaction added to queue"), + Err(err) => { + trace!("Failed to add transaction to queue, error: {:?}", err); + bail!(err); + } + } + //Remove from store for signing + match self.transactions_for_signing.lock().remove(&private_hash) { + Ok(_) => {} + Err(err) => { + trace!("Failed to remove transaction from signing store, error: {:?}", err); + bail!(err); + } + } + } else { + //Add signature to the store + match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) { + Ok(_) => trace!("Signature stored for private transaction"), + Err(err) => { + trace!("Failed to add signature to signing store, error: {:?}", err); + bail!(err); + } + } + } + Ok(()) + } +} + /// Try to unlock account using stored password, return found password if any fn find_account_password(passwords: &Vec, account_provider: &AccountProvider, account: &Address) -> Option { for password in passwords { diff --git a/ethcore/private-tx/tests/private_contract.rs b/ethcore/private-tx/tests/private_contract.rs index e53ad5e5f..e7e608c2b 100644 --- a/ethcore/private-tx/tests/private_contract.rs +++ b/ethcore/private-tx/tests/private_contract.rs @@ -74,7 +74,7 @@ fn private_contract() { Box::new(NoopEncryptor::default()), config, io, - ).unwrap()); + )); let (address, _) = contract_address(CreateContractAddress::FromSenderAndNonce, &key1.address(), &0.into(), &[]); diff --git a/ethcore/service/Cargo.toml b/ethcore/service/Cargo.toml index b612baf56..3a10849b6 100644 --- a/ethcore/service/Cargo.toml +++ b/ethcore/service/Cargo.toml @@ -13,6 +13,7 @@ ethcore-sync = { path = "../sync" } kvdb = { path = "../../util/kvdb" } log = "0.3" stop-guard = { path = "../../util/stop-guard" } +trace-time = { path = "../../util/trace-time" } [dev-dependencies] tempdir = "0.3" diff --git a/ethcore/service/src/lib.rs b/ethcore/service/src/lib.rs index 1604e84b1..d85a377cd 100644 --- a/ethcore/service/src/lib.rs +++ b/ethcore/service/src/lib.rs @@ -28,6 +28,9 @@ extern crate error_chain; #[macro_use] extern crate log; +#[macro_use] +extern crate trace_time; + #[cfg(test)] extern crate tempdir; diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index b60d4194c..5f4679979 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -33,7 +33,7 @@ use ethcore::snapshot::{RestorationStatus}; use ethcore::spec::Spec; use ethcore::account_provider::AccountProvider; -use ethcore_private_tx; +use ethcore_private_tx::{self, Importer}; use Error; pub struct PrivateTxService { @@ -112,14 +112,13 @@ impl ClientService { account_provider, encryptor, private_tx_conf, - io_service.channel())?, - ); + io_service.channel(), + )); let private_tx = Arc::new(PrivateTxService::new(provider)); let client_io = Arc::new(ClientIoHandler { client: client.clone(), snapshot: snapshot.clone(), - private_tx: private_tx.clone(), }); io_service.register_handler(client_io)?; @@ -175,7 +174,6 @@ impl ClientService { struct ClientIoHandler { client: Arc, snapshot: Arc, - private_tx: Arc, } const CLIENT_TICK_TIMER: TimerToken = 0; @@ -191,6 +189,7 @@ impl IoHandler for ClientIoHandler { } fn timeout(&self, _io: &IoContext, timer: TimerToken) { + trace_time!("service::read"); match timer { CLIENT_TICK_TIMER => { use ethcore::snapshot::SnapshotService; @@ -203,20 +202,24 @@ impl IoHandler for ClientIoHandler { } fn message(&self, _io: &IoContext, net_message: &ClientIoMessage) { + trace_time!("service::message"); use std::thread; match *net_message { - ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); } - ClientIoMessage::NewTransactions(ref transactions, peer_id) => { - self.client.import_queued_transactions(transactions, peer_id); + ClientIoMessage::BlockVerified => { + self.client.import_verified_blocks(); } ClientIoMessage::BeginRestoration(ref manifest) => { if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) { warn!("Failed to initialize snapshot restoration: {}", e); } } - ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk), - ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk), + ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => { + self.snapshot.feed_state_chunk(*hash, chunk) + } + ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => { + self.snapshot.feed_block_chunk(*hash, chunk) + } ClientIoMessage::TakeSnapshot(num) => { let client = self.client.clone(); let snapshot = self.snapshot.clone(); @@ -231,12 +234,9 @@ impl IoHandler for ClientIoHandler { debug!(target: "snapshot", "Failed to initialize periodic snapshot thread: {:?}", e); } }, - ClientIoMessage::NewMessage(ref message) => if let Err(e) = self.client.engine().handle_message(message) { - trace!(target: "poa", "Invalid message received: {}", e); - }, - ClientIoMessage::NewPrivateTransaction => if let Err(e) = self.private_tx.provider.on_private_transaction_queued() { - warn!("Failed to handle private transaction {:?}", e); - }, + ClientIoMessage::Execute(ref exec) => { + (*exec.0)(&self.client); + } _ => {} // ignore other messages } } diff --git a/ethcore/src/client/ancient_import.rs b/ethcore/src/client/ancient_import.rs index 13699ea5a..c2523a13a 100644 --- a/ethcore/src/client/ancient_import.rs +++ b/ethcore/src/client/ancient_import.rs @@ -32,16 +32,16 @@ const HEAVY_VERIFY_RATE: f32 = 0.02; /// Ancient block verifier: import an ancient sequence of blocks in order from a starting /// epoch. pub struct AncientVerifier { - cur_verifier: RwLock>>, + cur_verifier: RwLock>>>, engine: Arc, } impl AncientVerifier { - /// Create a new ancient block verifier with the given engine and initial verifier. - pub fn new(engine: Arc, start_verifier: Box>) -> Self { + /// Create a new ancient block verifier with the given engine. + pub fn new(engine: Arc) -> Self { AncientVerifier { - cur_verifier: RwLock::new(start_verifier), - engine: engine, + cur_verifier: RwLock::new(None), + engine, } } @@ -53,17 +53,49 @@ impl AncientVerifier { header: &Header, chain: &BlockChain, ) -> Result<(), ::error::Error> { - match rng.gen::() <= HEAVY_VERIFY_RATE { - true => self.cur_verifier.read().verify_heavy(header)?, - false => self.cur_verifier.read().verify_light(header)?, + // perform verification + let verified = if let Some(ref cur_verifier) = *self.cur_verifier.read() { + match rng.gen::() <= HEAVY_VERIFY_RATE { + true => cur_verifier.verify_heavy(header)?, + false => cur_verifier.verify_light(header)?, + } + true + } else { + false + }; + + // when there is no verifier initialize it. + // We use a bool flag to avoid double locking in the happy case + if !verified { + { + let mut cur_verifier = self.cur_verifier.write(); + if cur_verifier.is_none() { + *cur_verifier = Some(self.initial_verifier(header, chain)?); + } + } + // Call again to verify. + return self.verify(rng, header, chain); } // ancient import will only use transitions obtained from the snapshot. if let Some(transition) = chain.epoch_transition(header.number(), header.hash()) { let v = self.engine.epoch_verifier(&header, &transition.proof).known_confirmed()?; - *self.cur_verifier.write() = v; + *self.cur_verifier.write() = Some(v); } Ok(()) } + + fn initial_verifier(&self, header: &Header, chain: &BlockChain) + -> Result>, ::error::Error> + { + trace!(target: "client", "Initializing ancient block restoration."); + let current_epoch_data = chain.epoch_transitions() + .take_while(|&(_, ref t)| t.block_number < header.number()) + .last() + .map(|(_, t)| t.proof) + .expect("At least one epoch entry (genesis) always stored; qed"); + + self.engine.epoch_verifier(&header, ¤t_epoch_data).known_confirmed() + } } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 8119ebd35..bffa4e38b 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -15,15 +15,16 @@ // along with Parity. If not, see . use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque}; +use std::fmt; use std::str::FromStr; -use std::sync::{Arc, Weak}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; +use std::sync::{Arc, Weak}; use std::time::{Instant, Duration}; -use itertools::Itertools; // util use hash::keccak; use bytes::Bytes; +use itertools::Itertools; use journaldb; use trie::{TrieSpec, TrieFactory, Trie}; use kvdb::{DBValue, KeyValueDB, DBTransaction}; @@ -45,7 +46,8 @@ use client::{ use client::{ BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient, TraceFilter, CallAnalytics, BlockImportError, Mode, - ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType + ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType, + IoClient, }; use encoded; use engines::{EthEngine, EpochTransition}; @@ -55,7 +57,7 @@ use evm::Schedule; use executive::{Executive, Executed, TransactOptions, contract_address}; use factory::{Factories, VmFactory}; use header::{BlockNumber, Header}; -use io::IoChannel; +use io::{IoChannel, IoError}; use log_entry::LocalizedLogEntry; use miner::{Miner, MinerService}; use ethcore_miner::pool::VerifiedTransaction; @@ -85,6 +87,7 @@ pub use verification::queue::QueueInfo as BlockQueueInfo; use_contract!(registry, "Registry", "res/contracts/registrar.json"); const MAX_TX_QUEUE_SIZE: usize = 4096; +const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096; const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; const MIN_HISTORY_SIZE: u64 = 8; @@ -154,10 +157,7 @@ struct Importer { pub miner: Arc, /// Ancient block verifier: import an ancient sequence of blocks in order from a starting epoch - pub ancient_verifier: Mutex>, - - /// Random number generator used by `AncientVerifier` - pub rng: Mutex, + pub ancient_verifier: AncientVerifier, /// Ethereum engine to be used during import pub engine: Arc, @@ -204,8 +204,13 @@ pub struct Client { /// List of actors to be notified on certain chain events notify: RwLock>>, - /// Count of pending transactions in the queue - queue_transactions: AtomicUsize, + /// Queued transactions from IO + queue_transactions: IoChannelQueue, + /// Ancient blocks import queue + queue_ancient_blocks: IoChannelQueue, + /// Consensus messages import queue + queue_consensus_message: IoChannelQueue, + last_hashes: RwLock>, factories: Factories, @@ -239,8 +244,7 @@ impl Importer { verifier: verification::new(config.verifier_type.clone()), block_queue, miner, - ancient_verifier: Mutex::new(None), - rng: Mutex::new(OsRng::new()?), + ancient_verifier: AncientVerifier::new(engine.clone()), engine, }) } @@ -416,55 +420,25 @@ impl Importer { Ok(locked_block) } + /// Import a block with transaction receipts. /// /// The block is guaranteed to be the next best blocks in the /// first block sequence. Does no sealing or transaction validation. - fn import_old_block(&self, header: &Header, block_bytes: Bytes, receipts_bytes: Bytes, db: &KeyValueDB, chain: &BlockChain) -> Result { - let receipts = ::rlp::decode_list(&receipts_bytes); + fn import_old_block(&self, header: &Header, block_bytes: &[u8], receipts_bytes: &[u8], db: &KeyValueDB, chain: &BlockChain) -> Result { + let receipts = ::rlp::decode_list(receipts_bytes); let hash = header.hash(); let _import_lock = self.import_lock.lock(); { trace_time!("import_old_block"); - let mut ancient_verifier = self.ancient_verifier.lock(); - - { - // closure for verifying a block. - let verify_with = |verifier: &AncientVerifier| -> Result<(), ::error::Error> { - // verify the block, passing the chain for updating the epoch - // verifier. - let mut rng = OsRng::new().map_err(UtilError::from)?; - verifier.verify(&mut rng, &header, &chain) - }; - - // initialize the ancient block verifier if we don't have one already. - match &mut *ancient_verifier { - &mut Some(ref verifier) => { - verify_with(verifier)? - } - x @ &mut None => { - // load most recent epoch. - trace!(target: "client", "Initializing ancient block restoration."); - let current_epoch_data = chain.epoch_transitions() - .take_while(|&(_, ref t)| t.block_number < header.number()) - .last() - .map(|(_, t)| t.proof) - .expect("At least one epoch entry (genesis) always stored; qed"); - - let current_verifier = self.engine.epoch_verifier(&header, ¤t_epoch_data) - .known_confirmed()?; - let current_verifier = AncientVerifier::new(self.engine.clone(), current_verifier); - - verify_with(¤t_verifier)?; - *x = Some(current_verifier); - } - } - } + // verify the block, passing the chain for updating the epoch verifier. + let mut rng = OsRng::new().map_err(UtilError::from)?; + self.ancient_verifier.verify(&mut rng, &header, &chain)?; // Commit results let mut batch = DBTransaction::new(); - chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, false, true); + chain.insert_unordered_block(&mut batch, block_bytes, receipts, None, false, true); // Final commit to the DB db.write_buffered(batch); chain.commit(); @@ -734,7 +708,9 @@ impl Client { report: RwLock::new(Default::default()), io_channel: Mutex::new(message_channel), notify: RwLock::new(Vec::new()), - queue_transactions: AtomicUsize::new(0), + queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), + queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), + queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), factories: factories, history: history, @@ -820,7 +796,7 @@ impl Client { } fn notify(&self, f: F) where F: Fn(&ChainNotify) { - for np in self.notify.read().iter() { + for np in &*self.notify.read() { if let Some(n) = np.upgrade() { f(&*n); } @@ -954,24 +930,6 @@ impl Client { } } - /// Import transactions from the IO queue - pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: usize) -> usize { - trace_time!("import_queued_transactions"); - self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst); - - let txs: Vec = transactions - .iter() - .filter_map(|bytes| self.engine().decode_transaction(bytes).ok()) - .collect(); - - self.notify(|notify| { - notify.transactions_received(&txs, peer_id); - }); - - let results = self.importer.miner.import_external_transactions(self, txs); - results.len() - } - /// Get shared miner reference. #[cfg(test)] pub fn miner(&self) -> Arc { @@ -1392,22 +1350,6 @@ impl ImportBlock for Client { } Ok(self.importer.block_queue.import(unverified)?) } - - fn import_block_with_receipts(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result { - let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?; - { - // check block order - if self.chain.read().is_known(&header.hash()) { - bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain)); - } - let status = self.block_status(BlockId::Hash(*header.parent_hash())); - if status == BlockStatus::Unknown || status == BlockStatus::Pending { - bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash()))); - } - } - - self.importer.import_old_block(&header, block_bytes, receipts_bytes, &**self.db.read(), &*self.chain.read()).map_err(Into::into) - } } impl StateClient for Client { @@ -1958,35 +1900,10 @@ impl BlockChainClient for Client { (*self.build_last_hashes(&self.chain.read().best_block_hash())).clone() } - fn queue_transactions(&self, transactions: Vec, peer_id: usize) { - let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed); - trace!(target: "external_tx", "Queue size: {}", queue_size); - if queue_size > MAX_TX_QUEUE_SIZE { - debug!("Ignoring {} transactions: queue is full", transactions.len()); - } else { - let len = transactions.len(); - match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, peer_id)) { - Ok(_) => { - self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); - } - Err(e) => { - debug!("Ignoring {} transactions: error queueing: {}", len, e); - } - } - } - } - fn ready_transactions(&self) -> Vec> { self.importer.miner.ready_transactions(self) } - fn queue_consensus_message(&self, message: Bytes) { - let channel = self.io_channel.lock().clone(); - if let Err(e) = channel.send(ClientIoMessage::NewMessage(message)) { - debug!("Ignoring the message, error queueing: {}", e); - } - } - fn signing_chain_id(&self) -> Option { self.engine.signing_chain_id(&self.latest_env_info()) } @@ -2034,6 +1951,72 @@ impl BlockChainClient for Client { } } +impl IoClient for Client { + fn queue_transactions(&self, transactions: Vec, peer_id: usize) { + let len = transactions.len(); + self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { + trace_time!("import_queued_transactions"); + + let txs: Vec = transactions + .iter() + .filter_map(|bytes| client.engine.decode_transaction(bytes).ok()) + .collect(); + + client.notify(|notify| { + notify.transactions_received(&txs, peer_id); + }); + + client.importer.miner.import_external_transactions(client, txs); + }).unwrap_or_else(|e| { + debug!(target: "client", "Ignoring {} transactions: {}", len, e); + }); + } + + fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result { + let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?; + let hash = header.hash(); + + { + // check block order + if self.chain.read().is_known(&header.hash()) { + bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain)); + } + let status = self.block_status(BlockId::Hash(*header.parent_hash())); + if status == BlockStatus::Unknown || status == BlockStatus::Pending { + bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash()))); + } + } + + match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { + client.importer.import_old_block( + &header, + &block_bytes, + &receipts_bytes, + &**client.db.read(), + &*client.chain.read() + ).map(|_| ()).unwrap_or_else(|e| { + error!(target: "client", "Error importing ancient block: {}", e); + }); + }) { + Ok(_) => Ok(hash), + Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))), + } + } + + fn queue_consensus_message(&self, message: Bytes) { + match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| { + if let Err(e) = client.engine().handle_message(&message) { + debug!(target: "poa", "Invalid message received: {}", e); + } + }) { + Ok(_) => (), + Err(e) => { + debug!(target: "poa", "Ignoring the message, error queueing: {}", e); + } + } + } +} + impl ReopenBlock for Client { fn reopen_block(&self, block: ClosedBlock) -> OpenBlock { let engine = &*self.engine; @@ -2409,3 +2392,54 @@ mod tests { }); } } + +#[derive(Debug)] +enum QueueError { + Channel(IoError), + Full(usize), +} + +impl fmt::Display for QueueError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match *self { + QueueError::Channel(ref c) => fmt::Display::fmt(c, fmt), + QueueError::Full(limit) => write!(fmt, "The queue is full ({})", limit), + } + } +} + +/// Queue some items to be processed by IO client. +struct IoChannelQueue { + currently_queued: Arc, + limit: usize, +} + +impl IoChannelQueue { + pub fn new(limit: usize) -> Self { + IoChannelQueue { + currently_queued: Default::default(), + limit, + } + } + + pub fn queue(&self, channel: &mut IoChannel, count: usize, fun: F) -> Result<(), QueueError> where + F: Fn(&Client) + Send + Sync + 'static, + { + let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); + ensure!(queue_size < self.limit, QueueError::Full(self.limit)); + + let currently_queued = self.currently_queued.clone(); + let result = channel.send(ClientIoMessage::execute(move |client| { + currently_queued.fetch_sub(count, AtomicOrdering::SeqCst); + fun(client); + })); + + match result { + Ok(_) => { + self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst); + Ok(()) + }, + Err(e) => Err(QueueError::Channel(e)), + } + } +} diff --git a/ethcore/src/client/io_message.rs b/ethcore/src/client/io_message.rs index e19d3054f..817c72602 100644 --- a/ethcore/src/client/io_message.rs +++ b/ethcore/src/client/io_message.rs @@ -14,19 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use ethereum_types::H256; +use std::fmt; use bytes::Bytes; +use client::Client; +use ethereum_types::H256; use snapshot::ManifestData; /// Message type for external and internal events -#[derive(Clone, PartialEq, Eq, Debug)] +#[derive(Debug)] pub enum ClientIoMessage { /// Best Block Hash in chain has been changed NewChainHead, /// A block is ready BlockVerified, - /// New transaction RLPs are ready to be imported - NewTransactions(Vec, usize), /// Begin snapshot restoration BeginRestoration(ManifestData), /// Feed a state chunk to the snapshot service @@ -35,9 +35,23 @@ pub enum ClientIoMessage { FeedBlockChunk(H256, Bytes), /// Take a snapshot for the block with given number. TakeSnapshot(u64), - /// New consensus message received. - NewMessage(Bytes), - /// New private transaction arrived - NewPrivateTransaction, + /// Execute wrapped closure + Execute(Callback), +} + +impl ClientIoMessage { + /// Create new `ClientIoMessage` that executes given procedure. + pub fn execute(fun: F) -> Self { + ClientIoMessage::Execute(Callback(Box::new(fun))) + } +} + +/// A function to invoke in the client thread. +pub struct Callback(pub Box); + +impl fmt::Debug for Callback { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "") + } } diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 05e201825..4c410d301 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -36,9 +36,8 @@ pub use self::traits::{ Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock, StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter }; -//pub use self::private_notify::PrivateNotify; pub use state::StateInfo; -pub use self::traits::{BlockChainClient, EngineClient, ProvingBlockChainClient}; +pub use self::traits::{BlockChainClient, EngineClient, ProvingBlockChainClient, IoClient}; pub use types::ids::*; pub use types::trace_filter::Filter as TraceFilter; diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index c2e06009b..b22915966 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -39,7 +39,7 @@ use client::{ PrepareOpenBlock, BlockChainClient, BlockChainInfo, BlockStatus, BlockId, TransactionId, UncleId, TraceId, TraceFilter, LastHashes, CallAnalytics, BlockImportError, ProvingBlockChainClient, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock, StateOrBlock, - Call, StateClient, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter + Call, StateClient, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter, IoClient }; use db::{NUM_COLUMNS, COL_STATE}; use header::{Header as BlockHeader, BlockNumber}; @@ -556,10 +556,6 @@ impl ImportBlock for TestBlockChainClient { } Ok(h) } - - fn import_block_with_receipts(&self, b: Bytes, _r: Bytes) -> Result { - self.import_block(b) - } } impl Call for TestBlockChainClient { @@ -809,16 +805,6 @@ impl BlockChainClient for TestBlockChainClient { self.traces.read().clone() } - fn queue_transactions(&self, transactions: Vec, _peer_id: usize) { - // import right here - let txs = transactions.into_iter().filter_map(|bytes| Rlp::new(&bytes).as_val().ok()).collect(); - self.miner.import_external_transactions(self, txs); - } - - fn queue_consensus_message(&self, message: Bytes) { - self.spec.engine.handle_message(&message).unwrap(); - } - fn ready_transactions(&self) -> Vec> { self.miner.ready_transactions(self) } @@ -863,6 +849,22 @@ impl BlockChainClient for TestBlockChainClient { fn eip86_transition(&self) -> u64 { u64::max_value() } } +impl IoClient for TestBlockChainClient { + fn queue_transactions(&self, transactions: Vec, _peer_id: usize) { + // import right here + let txs = transactions.into_iter().filter_map(|bytes| Rlp::new(&bytes).as_val().ok()).collect(); + self.miner.import_external_transactions(self, txs); + } + + fn queue_ancient_block(&self, b: Bytes, _r: Bytes) -> Result { + self.import_block(b) + } + + fn queue_consensus_message(&self, message: Bytes) { + self.spec.engine.handle_message(&message).unwrap(); + } +} + impl ProvingBlockChainClient for TestBlockChainClient { fn prove_storage(&self, _: H256, _: H256, _: BlockId) -> Option<(Vec, H256)> { None diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 7d4d5846c..358e24fa9 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -168,9 +168,6 @@ pub trait RegistryInfo { pub trait ImportBlock { /// Import a block into the blockchain. fn import_block(&self, bytes: Bytes) -> Result; - - /// Import a block with transaction receipts. Does no sealing and transaction validation. - fn import_block_with_receipts(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result; } /// Provides `call_contract` method @@ -201,8 +198,21 @@ pub trait EngineInfo { fn engine(&self) -> &EthEngine; } +/// IO operations that should off-load heavy work to another thread. +pub trait IoClient: Sync + Send { + /// Queue transactions for importing. + fn queue_transactions(&self, transactions: Vec, peer_id: usize); + + /// Queue block import with transaction receipts. Does no sealing and transaction validation. + fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result; + + /// Queue conensus engine message. + fn queue_consensus_message(&self, message: Bytes); +} + /// Blockchain database client. Owns and manages a blockchain and a block queue. -pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContract + RegistryInfo + ImportBlock { +pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContract + RegistryInfo + ImportBlock ++ IoClient { /// Look up the block number for the given block ID. fn block_number(&self, id: BlockId) -> Option; @@ -310,12 +320,6 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra /// Get last hashes starting from best block. fn last_hashes(&self) -> LastHashes; - /// Queue transactions for importing. - fn queue_transactions(&self, transactions: Vec, peer_id: usize); - - /// Queue conensus engine message. - fn queue_consensus_message(&self, message: Bytes); - /// List all transactions that are allowed into the next block. fn ready_transactions(&self) -> Vec>; diff --git a/ethcore/src/views/block.rs b/ethcore/src/views/block.rs index f610504d8..3bed1818f 100644 --- a/ethcore/src/views/block.rs +++ b/ethcore/src/views/block.rs @@ -29,7 +29,6 @@ pub struct BlockView<'a> { rlp: ViewRlp<'a> } - impl<'a> BlockView<'a> { /// Creates new view onto block from rlp. /// Use the `view!` macro to create this view in order to capture debugging info. @@ -39,9 +38,9 @@ impl<'a> BlockView<'a> { /// ``` /// #[macro_use] /// extern crate ethcore; - /// + /// /// use ethcore::views::{BlockView}; - /// + /// /// fn main() { /// let bytes : &[u8] = &[]; /// let block_view = view!(BlockView, bytes); diff --git a/ethcore/sync/Cargo.toml b/ethcore/sync/Cargo.toml index d2d060e68..ba03075d0 100644 --- a/ethcore/sync/Cargo.toml +++ b/ethcore/sync/Cargo.toml @@ -30,6 +30,7 @@ heapsize = "0.4" semver = "0.9" smallvec = { version = "0.4", features = ["heapsizeof"] } parking_lot = "0.5" +trace-time = { path = "../../util/trace-time" } ipnetwork = "0.12.6" [dev-dependencies] diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 7690eeb86..4fd0cbb54 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -379,10 +379,12 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + trace_time!("sync::read"); ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data); } fn connected(&self, io: &NetworkContext, peer: &PeerId) { + trace_time!("sync::connected"); // If warp protocol is supported only allow warp handshake let warp_protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0) != 0; let warp_context = io.subprotocol_name() == WARP_SYNC_PROTOCOL_ID; @@ -392,12 +394,14 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + trace_time!("sync::disconnected"); if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer); } } fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + trace_time!("sync::timeout"); let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); self.sync.write().maintain_peers(&mut io); self.sync.write().maintain_sync(&mut io); diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 4a5acae52..7411fa30c 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -496,7 +496,7 @@ impl BlockDownloader { } let result = if let Some(receipts) = receipts { - io.chain().import_block_with_receipts(block, receipts) + io.chain().queue_ancient_block(block, receipts) } else { io.chain().import_block(block) }; diff --git a/ethcore/sync/src/chain.rs b/ethcore/sync/src/chain.rs index 25d9a09f6..1a6af1011 100644 --- a/ethcore/sync/src/chain.rs +++ b/ethcore/sync/src/chain.rs @@ -1789,10 +1789,13 @@ impl ChainSync { } pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { + debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id); + if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) { debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer)); return; } + let rlp = Rlp::new(data); let result = match packet_id { STATUS_PACKET => self.on_peer_status(io, peer, &rlp), @@ -1831,7 +1834,7 @@ impl ChainSync { PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT, }; if timeout { - trace!(target:"sync", "Timeout {}", peer_id); + debug!(target:"sync", "Timeout {}", peer_id); io.disconnect_peer(*peer_id); aborting.push(*peer_id); } diff --git a/ethcore/sync/src/lib.rs b/ethcore/sync/src/lib.rs index a3e24bdb8..3eb2e8332 100644 --- a/ethcore/sync/src/lib.rs +++ b/ethcore/sync/src/lib.rs @@ -54,6 +54,8 @@ extern crate macros; extern crate log; #[macro_use] extern crate heapsize; +#[macro_use] +extern crate trace_time; mod chain; mod blocks; diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index dc52fdd8b..3a4697cc0 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -520,11 +520,9 @@ impl TestIoHandler { impl IoHandler for TestIoHandler { fn message(&self, _io: &IoContext, net_message: &ClientIoMessage) { match *net_message { - ClientIoMessage::NewMessage(ref message) => if let Err(e) = self.client.engine().handle_message(message) { - panic!("Invalid message received: {}", e); - }, - ClientIoMessage::NewPrivateTransaction => { + ClientIoMessage::Execute(ref exec) => { *self.private_tx_queued.lock() += 1; + (*exec.0)(&self.client); }, _ => {} // ignore other messages } diff --git a/ethcore/sync/src/tests/private.rs b/ethcore/sync/src/tests/private.rs index a9e8718e5..b54240bfb 100644 --- a/ethcore/sync/src/tests/private.rs +++ b/ethcore/sync/src/tests/private.rs @@ -24,7 +24,7 @@ use ethcore::CreateContractAddress; use transaction::{Transaction, Action}; use ethcore::executive::{contract_address}; use ethcore::test_helpers::{push_block_with_transactions}; -use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor}; +use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer}; use ethcore::account_provider::AccountProvider; use ethkey::{KeyPair}; use tests::helpers::{TestNet, TestIoHandler}; @@ -84,7 +84,7 @@ fn send_private_transaction() { Box::new(NoopEncryptor::default()), signer_config, IoChannel::to_handler(Arc::downgrade(&io_handler0)), - ).unwrap()); + )); pm0.add_notify(net.peers[0].clone()); let pm1 = Arc::new(Provider::new( @@ -94,7 +94,7 @@ fn send_private_transaction() { Box::new(NoopEncryptor::default()), validator_config, IoChannel::to_handler(Arc::downgrade(&io_handler1)), - ).unwrap()); + )); pm1.add_notify(net.peers[1].clone()); // Create and deploy contract @@ -133,7 +133,6 @@ fn send_private_transaction() { //process received private transaction message let private_transaction = received_private_transactions[0].clone(); assert!(pm1.import_private_transaction(&private_transaction).is_ok()); - assert!(pm1.on_private_transaction_queued().is_ok()); //send signed response net.sync(); @@ -147,4 +146,4 @@ fn send_private_transaction() { assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok()); let local_transactions = net.peer(0).miner.local_transactions(); assert_eq!(local_transactions.len(), 1); -} \ No newline at end of file +} diff --git a/util/io/src/lib.rs b/util/io/src/lib.rs index 20b908ac9..9232b2a90 100644 --- a/util/io/src/lib.rs +++ b/util/io/src/lib.rs @@ -106,7 +106,7 @@ impl From<::std::io::Error> for IoError { } } -impl From>> for IoError where Message: Send + Clone { +impl From>> for IoError where Message: Send { fn from(_err: NotifyError>) -> IoError { IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) } @@ -115,7 +115,7 @@ impl From>> for IoError where M /// Generic IO handler. /// All the handler function are called from within IO event loop. /// `Message` type is used as notification data -pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + 'static { +pub trait IoHandler: Send + Sync where Message: Send + Sync + 'static { /// Initialize the handler fn initialize(&self, _io: &IoContext) {} /// Timer function called after a timeout created with `HandlerIo::timeout`. diff --git a/util/io/src/service.rs b/util/io/src/service.rs index 19f2d4b3b..0de674ae1 100644 --- a/util/io/src/service.rs +++ b/util/io/src/service.rs @@ -41,7 +41,7 @@ const MAX_HANDLERS: usize = 8; /// Messages used to communicate with the event loop from other threads. #[derive(Clone)] -pub enum IoMessage where Message: Send + Clone + Sized { +pub enum IoMessage where Message: Send + Sized { /// Shutdown the event loop Shutdown, /// Register a new protocol handler. @@ -74,16 +74,16 @@ pub enum IoMessage where Message: Send + Clone + Sized { token: StreamToken, }, /// Broadcast a message across all protocol handlers. - UserMessage(Message) + UserMessage(Arc) } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct IoContext where Message: Send + Clone + Sync + 'static { +pub struct IoContext where Message: Send + Sync + 'static { channel: IoChannel, handler: HandlerId, } -impl IoContext where Message: Send + Clone + Sync + 'static { +impl IoContext where Message: Send + Sync + 'static { /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. pub fn new(channel: IoChannel, handler: HandlerId) -> IoContext { IoContext { @@ -187,7 +187,7 @@ pub struct IoManager where Message: Send + Sync { work_ready: Arc, } -impl IoManager where Message: Send + Sync + Clone + 'static { +impl IoManager where Message: Send + Sync + 'static { /// Creates a new instance and registers it with the event loop. pub fn start( event_loop: &mut EventLoop>, @@ -219,7 +219,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { } } -impl Handler for IoManager where Message: Send + Clone + Sync + 'static { +impl Handler for IoManager where Message: Send + Sync + 'static { type Timeout = Token; type Message = IoMessage; @@ -317,7 +317,12 @@ impl Handler for IoManager where Message: Send + Clone + Sync for id in 0 .. MAX_HANDLERS { if let Some(h) = self.handlers.read().get(id) { let handler = h.clone(); - self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id }); + self.worker_channel.push(Work { + work_type: WorkType::Message(data.clone()), + token: 0, + handler: handler, + handler_id: id + }); } } self.work_ready.notify_all(); @@ -326,21 +331,30 @@ impl Handler for IoManager where Message: Send + Clone + Sync } } -#[derive(Clone)] -enum Handlers where Message: Send + Clone { +enum Handlers where Message: Send { SharedCollection(Weak>, HandlerId>>>), Single(Weak>), } -/// Allows sending messages into the event loop. All the IO handlers will get the message -/// in the `message` callback. -pub struct IoChannel where Message: Send + Clone{ - channel: Option>>, - handlers: Handlers, +impl Clone for Handlers { + fn clone(&self) -> Self { + use self::Handlers::*; + match *self { + SharedCollection(ref w) => SharedCollection(w.clone()), + Single(ref w) => Single(w.clone()), + } + } } -impl Clone for IoChannel where Message: Send + Clone + Sync + 'static { +/// Allows sending messages into the event loop. All the IO handlers will get the message +/// in the `message` callback. +pub struct IoChannel where Message: Send { + channel: Option>>, + handlers: Handlers, +} + +impl Clone for IoChannel where Message: Send + Sync + 'static { fn clone(&self) -> IoChannel { IoChannel { channel: self.channel.clone(), @@ -349,11 +363,11 @@ impl Clone for IoChannel where Message: Send + Clone + Sync + } } -impl IoChannel where Message: Send + Clone + Sync + 'static { +impl IoChannel where Message: Send + Sync + 'static { /// Send a message through the channel pub fn send(&self, message: Message) -> Result<(), IoError> { match self.channel { - Some(ref channel) => channel.send(IoMessage::UserMessage(message))?, + Some(ref channel) => channel.send(IoMessage::UserMessage(Arc::new(message)))?, None => self.send_sync(message)? } Ok(()) @@ -413,13 +427,13 @@ impl IoChannel where Message: Send + Clone + Sync + 'static { /// General IO Service. Starts an event loop and dispatches IO requests. /// 'Message' is a notification message type -pub struct IoService where Message: Send + Sync + Clone + 'static { +pub struct IoService where Message: Send + Sync + 'static { thread: Mutex>>, host_channel: Mutex>>, handlers: Arc>, HandlerId>>>, } -impl IoService where Message: Send + Sync + Clone + 'static { +impl IoService where Message: Send + Sync + 'static { /// Starts IO event loop pub fn start() -> Result, IoError> { let mut config = EventLoopBuilder::new(); @@ -462,7 +476,7 @@ impl IoService where Message: Send + Sync + Clone + 'static { /// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads. pub fn send_message(&self, message: Message) -> Result<(), IoError> { - self.host_channel.lock().send(IoMessage::UserMessage(message))?; + self.host_channel.lock().send(IoMessage::UserMessage(Arc::new(message)))?; Ok(()) } @@ -472,7 +486,7 @@ impl IoService where Message: Send + Sync + Clone + 'static { } } -impl Drop for IoService where Message: Send + Sync + Clone { +impl Drop for IoService where Message: Send + Sync { fn drop(&mut self) { self.stop() } diff --git a/util/io/src/worker.rs b/util/io/src/worker.rs index 79570d361..0f0d448ec 100644 --- a/util/io/src/worker.rs +++ b/util/io/src/worker.rs @@ -38,7 +38,7 @@ pub enum WorkType { Writable, Hup, Timeout, - Message(Message) + Message(Arc) } pub struct Work { @@ -65,7 +65,7 @@ impl Worker { wait: Arc, wait_mutex: Arc>, ) -> Worker - where Message: Send + Sync + Clone + 'static { + where Message: Send + Sync + 'static { let deleting = Arc::new(AtomicBool::new(false)); let mut worker = Worker { thread: None, @@ -86,7 +86,7 @@ impl Worker { channel: IoChannel, wait: Arc, wait_mutex: Arc>, deleting: Arc) - where Message: Send + Sync + Clone + 'static { + where Message: Send + Sync + 'static { loop { { let lock = wait_mutex.lock().expect("Poisoned work_loop mutex"); @@ -105,7 +105,7 @@ impl Worker { } } - fn do_work(work: Work, channel: IoChannel) where Message: Send + Sync + Clone + 'static { + fn do_work(work: Work, channel: IoChannel) where Message: Send + Sync + 'static { match work.work_type { WorkType::Readable => { work.handler.stream_readable(&IoContext::new(channel, work.handler_id), work.token); @@ -120,7 +120,7 @@ impl Worker { work.handler.timeout(&IoContext::new(channel, work.handler_id), work.token); } WorkType::Message(message) => { - work.handler.message(&IoContext::new(channel, work.handler_id), &message); + work.handler.message(&IoContext::new(channel, work.handler_id), &*message); } } }