Don't block sync when importing old blocks (#8530)

* Alter IO queueing.

* Don't require IoMessages to be Clone

* Ancient blocks imported via IoChannel.

* Get rid of private transactions io message.

* Get rid of deadlock and fix disconnected handler.

* Revert to old disconnect condition.

* Fix tests.

* Fix deadlock.
This commit is contained in:
Tomasz Drwięga 2018-05-09 08:49:34 +02:00 committed by Afri Schoedon
parent 7a00d97977
commit 24838bbcd3
23 changed files with 455 additions and 332 deletions

2
Cargo.lock generated
View File

@ -818,6 +818,7 @@ dependencies = [
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"stop-guard 0.1.0",
"tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"trace-time 0.1.0",
]
[[package]]
@ -866,6 +867,7 @@ dependencies = [
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"trace-time 0.1.0",
"triehash 0.1.0",
]

View File

@ -149,8 +149,8 @@ impl Provider where {
encryptor: Box<Encryptor>,
config: ProviderConfig,
channel: IoChannel<ClientIoMessage>,
) -> Result<Self, Error> {
Ok(Provider {
) -> Self {
Provider {
encryptor,
validator_accounts: config.validator_accounts.into_iter().collect(),
signer_account: config.signer_account,
@ -162,7 +162,7 @@ impl Provider where {
miner,
accounts,
channel,
})
}
}
// TODO [ToDr] Don't use `ChainNotify` here!
@ -243,50 +243,6 @@ impl Provider where {
Ok(original_transaction)
}
/// Process received private transaction
pub fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
trace!("Private transaction received");
let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?;
let contract = private_tx.contract;
let contract_validators = self.get_validators(BlockId::Latest, &contract)?;
let validation_account = contract_validators
.iter()
.find(|address| self.validator_accounts.contains(address));
match validation_account {
None => {
// TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool.
// Importing to pool verifies correctness and nonce; here we are just blindly forwarding.
//
// Not for verification, broadcast further to peers
self.broadcast_private_transaction(rlp.into());
return Ok(());
},
Some(&validation_account) => {
let hash = private_tx.hash();
trace!("Private transaction taken for verification");
let original_tx = self.extract_original_transaction(private_tx, &contract)?;
trace!("Validating transaction: {:?}", original_tx);
// Verify with the first account available
trace!("The following account will be used for verification: {:?}", validation_account);
let nonce_cache = Default::default();
self.transactions_for_verification.lock().add_transaction(
original_tx,
contract,
validation_account,
hash,
self.pool_client(&nonce_cache),
)?;
// NOTE This will just fire `on_private_transaction_queued` but from a client thread.
// It seems that a lot of heavy work (verification) is done in this thread anyway
// it might actually make sense to decouple it from clientService and just use dedicated thread
// for both verification and execution.
self.channel.send(ClientIoMessage::NewPrivateTransaction).map_err(|_| ErrorKind::ClientIsMalformed.into())
}
}
}
fn pool_client<'a>(&'a self, nonce_cache: &'a RwLock<HashMap<Address, U256>>) -> miner::pool_client::PoolClient<'a, Client> {
let engine = self.client.engine();
let refuse_service_transactions = true;
@ -299,11 +255,6 @@ impl Provider where {
)
}
/// Private transaction for validation added into queue
pub fn on_private_transaction_queued(&self) -> Result<(), Error> {
self.process_queue()
}
/// Retrieve and verify the first available private transaction for every sender
///
/// TODO [ToDr] It seems that:
@ -347,73 +298,6 @@ impl Provider where {
Ok(())
}
/// Add signed private transaction into the store
/// Creates corresponding public transaction if last required singature collected and sends it to the chain
pub fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?;
trace!("Signature for private transaction received: {:?}", tx);
let private_hash = tx.private_transaction_hash();
let desc = match self.transactions_for_signing.lock().get(&private_hash) {
None => {
// TODO [ToDr] Verification (we can't just blindly forward every transaction)
// Not our transaction, broadcast further to peers
self.broadcast_signed_private_transaction(rlp.into());
return Ok(());
},
Some(desc) => desc,
};
let last = self.last_required_signature(&desc, tx.signature())?;
if last {
let mut signatures = desc.received_signatures.clone();
signatures.push(tx.signature());
let rsv: Vec<Signature> = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
//Create public transaction
let public_tx = self.public_transaction(
desc.state.clone(),
&desc.original_transaction,
&rsv,
desc.original_transaction.nonce,
desc.original_transaction.gas_price
)?;
trace!("Last required signature received, public transaction created: {:?}", public_tx);
//Sign and add it to the queue
let chain_id = desc.original_transaction.chain_id();
let hash = public_tx.hash(chain_id);
let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?;
let password = find_account_password(&self.passwords, &*self.accounts, &signer_account);
let signature = self.accounts.sign(signer_account, password, hash)?;
let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?;
match self.miner.import_own_transaction(&*self.client, signed.into()) {
Ok(_) => trace!("Public transaction added to queue"),
Err(err) => {
trace!("Failed to add transaction to queue, error: {:?}", err);
bail!(err);
}
}
//Remove from store for signing
match self.transactions_for_signing.lock().remove(&private_hash) {
Ok(_) => {}
Err(err) => {
trace!("Failed to remove transaction from signing store, error: {:?}", err);
bail!(err);
}
}
} else {
//Add signature to the store
match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) {
Ok(_) => trace!("Signature stored for private transaction"),
Err(err) => {
trace!("Failed to add signature to signing store, error: {:?}", err);
bail!(err);
}
}
}
Ok(())
}
fn last_required_signature(&self, desc: &PrivateTransactionSigningDesc, sign: Signature) -> Result<bool, Error> {
if desc.received_signatures.contains(&sign) {
return Ok(false);
@ -657,6 +541,134 @@ impl Provider where {
}
}
pub trait Importer {
/// Process received private transaction
fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>;
/// Add signed private transaction into the store
///
/// Creates corresponding public transaction if last required signature collected and sends it to the chain
fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>;
}
// TODO [ToDr] Offload more heavy stuff to the IoService thread.
// It seems that a lot of heavy work (verification) is done in this thread anyway
// it might actually make sense to decouple it from clientService and just use dedicated thread
// for both verification and execution.
impl Importer for Arc<Provider> {
fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
trace!("Private transaction received");
let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?;
let contract = private_tx.contract;
let contract_validators = self.get_validators(BlockId::Latest, &contract)?;
let validation_account = contract_validators
.iter()
.find(|address| self.validator_accounts.contains(address));
match validation_account {
None => {
// TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool.
// Importing to pool verifies correctness and nonce; here we are just blindly forwarding.
//
// Not for verification, broadcast further to peers
self.broadcast_private_transaction(rlp.into());
return Ok(());
},
Some(&validation_account) => {
let hash = private_tx.hash();
trace!("Private transaction taken for verification");
let original_tx = self.extract_original_transaction(private_tx, &contract)?;
trace!("Validating transaction: {:?}", original_tx);
// Verify with the first account available
trace!("The following account will be used for verification: {:?}", validation_account);
let nonce_cache = Default::default();
self.transactions_for_verification.lock().add_transaction(
original_tx,
contract,
validation_account,
hash,
self.pool_client(&nonce_cache),
)?;
let provider = Arc::downgrade(self);
self.channel.send(ClientIoMessage::execute(move |_| {
if let Some(provider) = provider.upgrade() {
if let Err(e) = provider.process_queue() {
debug!("Unable to process the queue: {}", e);
}
}
})).map_err(|_| ErrorKind::ClientIsMalformed.into())
}
}
}
fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?;
trace!("Signature for private transaction received: {:?}", tx);
let private_hash = tx.private_transaction_hash();
let desc = match self.transactions_for_signing.lock().get(&private_hash) {
None => {
// TODO [ToDr] Verification (we can't just blindly forward every transaction)
// Not our transaction, broadcast further to peers
self.broadcast_signed_private_transaction(rlp.into());
return Ok(());
},
Some(desc) => desc,
};
let last = self.last_required_signature(&desc, tx.signature())?;
if last {
let mut signatures = desc.received_signatures.clone();
signatures.push(tx.signature());
let rsv: Vec<Signature> = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
//Create public transaction
let public_tx = self.public_transaction(
desc.state.clone(),
&desc.original_transaction,
&rsv,
desc.original_transaction.nonce,
desc.original_transaction.gas_price
)?;
trace!("Last required signature received, public transaction created: {:?}", public_tx);
//Sign and add it to the queue
let chain_id = desc.original_transaction.chain_id();
let hash = public_tx.hash(chain_id);
let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?;
let password = find_account_password(&self.passwords, &*self.accounts, &signer_account);
let signature = self.accounts.sign(signer_account, password, hash)?;
let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?;
match self.miner.import_own_transaction(&*self.client, signed.into()) {
Ok(_) => trace!("Public transaction added to queue"),
Err(err) => {
trace!("Failed to add transaction to queue, error: {:?}", err);
bail!(err);
}
}
//Remove from store for signing
match self.transactions_for_signing.lock().remove(&private_hash) {
Ok(_) => {}
Err(err) => {
trace!("Failed to remove transaction from signing store, error: {:?}", err);
bail!(err);
}
}
} else {
//Add signature to the store
match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) {
Ok(_) => trace!("Signature stored for private transaction"),
Err(err) => {
trace!("Failed to add signature to signing store, error: {:?}", err);
bail!(err);
}
}
}
Ok(())
}
}
/// Try to unlock account using stored password, return found password if any
fn find_account_password(passwords: &Vec<String>, account_provider: &AccountProvider, account: &Address) -> Option<String> {
for password in passwords {

View File

@ -74,7 +74,7 @@ fn private_contract() {
Box::new(NoopEncryptor::default()),
config,
io,
).unwrap());
));
let (address, _) = contract_address(CreateContractAddress::FromSenderAndNonce, &key1.address(), &0.into(), &[]);

View File

@ -13,6 +13,7 @@ ethcore-sync = { path = "../sync" }
kvdb = { path = "../../util/kvdb" }
log = "0.3"
stop-guard = { path = "../../util/stop-guard" }
trace-time = { path = "../../util/trace-time" }
[dev-dependencies]
tempdir = "0.3"

View File

@ -28,6 +28,9 @@ extern crate error_chain;
#[macro_use]
extern crate log;
#[macro_use]
extern crate trace_time;
#[cfg(test)]
extern crate tempdir;

View File

@ -33,7 +33,7 @@ use ethcore::snapshot::{RestorationStatus};
use ethcore::spec::Spec;
use ethcore::account_provider::AccountProvider;
use ethcore_private_tx;
use ethcore_private_tx::{self, Importer};
use Error;
pub struct PrivateTxService {
@ -112,14 +112,13 @@ impl ClientService {
account_provider,
encryptor,
private_tx_conf,
io_service.channel())?,
);
io_service.channel(),
));
let private_tx = Arc::new(PrivateTxService::new(provider));
let client_io = Arc::new(ClientIoHandler {
client: client.clone(),
snapshot: snapshot.clone(),
private_tx: private_tx.clone(),
});
io_service.register_handler(client_io)?;
@ -175,7 +174,6 @@ impl ClientService {
struct ClientIoHandler {
client: Arc<Client>,
snapshot: Arc<SnapshotService>,
private_tx: Arc<PrivateTxService>,
}
const CLIENT_TICK_TIMER: TimerToken = 0;
@ -191,6 +189,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
}
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
trace_time!("service::read");
match timer {
CLIENT_TICK_TIMER => {
use ethcore::snapshot::SnapshotService;
@ -203,20 +202,24 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
}
fn message(&self, _io: &IoContext<ClientIoMessage>, net_message: &ClientIoMessage) {
trace_time!("service::message");
use std::thread;
match *net_message {
ClientIoMessage::BlockVerified => { self.client.import_verified_blocks(); }
ClientIoMessage::NewTransactions(ref transactions, peer_id) => {
self.client.import_queued_transactions(transactions, peer_id);
ClientIoMessage::BlockVerified => {
self.client.import_verified_blocks();
}
ClientIoMessage::BeginRestoration(ref manifest) => {
if let Err(e) = self.snapshot.init_restore(manifest.clone(), true) {
warn!("Failed to initialize snapshot restoration: {}", e);
}
}
ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => self.snapshot.feed_state_chunk(*hash, chunk),
ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => self.snapshot.feed_block_chunk(*hash, chunk),
ClientIoMessage::FeedStateChunk(ref hash, ref chunk) => {
self.snapshot.feed_state_chunk(*hash, chunk)
}
ClientIoMessage::FeedBlockChunk(ref hash, ref chunk) => {
self.snapshot.feed_block_chunk(*hash, chunk)
}
ClientIoMessage::TakeSnapshot(num) => {
let client = self.client.clone();
let snapshot = self.snapshot.clone();
@ -231,12 +234,9 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
debug!(target: "snapshot", "Failed to initialize periodic snapshot thread: {:?}", e);
}
},
ClientIoMessage::NewMessage(ref message) => if let Err(e) = self.client.engine().handle_message(message) {
trace!(target: "poa", "Invalid message received: {}", e);
},
ClientIoMessage::NewPrivateTransaction => if let Err(e) = self.private_tx.provider.on_private_transaction_queued() {
warn!("Failed to handle private transaction {:?}", e);
},
ClientIoMessage::Execute(ref exec) => {
(*exec.0)(&self.client);
}
_ => {} // ignore other messages
}
}

View File

@ -32,16 +32,16 @@ const HEAVY_VERIFY_RATE: f32 = 0.02;
/// Ancient block verifier: import an ancient sequence of blocks in order from a starting
/// epoch.
pub struct AncientVerifier {
cur_verifier: RwLock<Box<EpochVerifier<EthereumMachine>>>,
cur_verifier: RwLock<Option<Box<EpochVerifier<EthereumMachine>>>>,
engine: Arc<EthEngine>,
}
impl AncientVerifier {
/// Create a new ancient block verifier with the given engine and initial verifier.
pub fn new(engine: Arc<EthEngine>, start_verifier: Box<EpochVerifier<EthereumMachine>>) -> Self {
/// Create a new ancient block verifier with the given engine.
pub fn new(engine: Arc<EthEngine>) -> Self {
AncientVerifier {
cur_verifier: RwLock::new(start_verifier),
engine: engine,
cur_verifier: RwLock::new(None),
engine,
}
}
@ -53,17 +53,49 @@ impl AncientVerifier {
header: &Header,
chain: &BlockChain,
) -> Result<(), ::error::Error> {
match rng.gen::<f32>() <= HEAVY_VERIFY_RATE {
true => self.cur_verifier.read().verify_heavy(header)?,
false => self.cur_verifier.read().verify_light(header)?,
// perform verification
let verified = if let Some(ref cur_verifier) = *self.cur_verifier.read() {
match rng.gen::<f32>() <= HEAVY_VERIFY_RATE {
true => cur_verifier.verify_heavy(header)?,
false => cur_verifier.verify_light(header)?,
}
true
} else {
false
};
// when there is no verifier initialize it.
// We use a bool flag to avoid double locking in the happy case
if !verified {
{
let mut cur_verifier = self.cur_verifier.write();
if cur_verifier.is_none() {
*cur_verifier = Some(self.initial_verifier(header, chain)?);
}
}
// Call again to verify.
return self.verify(rng, header, chain);
}
// ancient import will only use transitions obtained from the snapshot.
if let Some(transition) = chain.epoch_transition(header.number(), header.hash()) {
let v = self.engine.epoch_verifier(&header, &transition.proof).known_confirmed()?;
*self.cur_verifier.write() = v;
*self.cur_verifier.write() = Some(v);
}
Ok(())
}
fn initial_verifier(&self, header: &Header, chain: &BlockChain)
-> Result<Box<EpochVerifier<EthereumMachine>>, ::error::Error>
{
trace!(target: "client", "Initializing ancient block restoration.");
let current_epoch_data = chain.epoch_transitions()
.take_while(|&(_, ref t)| t.block_number < header.number())
.last()
.map(|(_, t)| t.proof)
.expect("At least one epoch entry (genesis) always stored; qed");
self.engine.epoch_verifier(&header, &current_epoch_data).known_confirmed()
}
}

View File

@ -15,15 +15,16 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Arc, Weak};
use std::time::{Instant, Duration};
use itertools::Itertools;
// util
use hash::keccak;
use bytes::Bytes;
use itertools::Itertools;
use journaldb;
use trie::{TrieSpec, TrieFactory, Trie};
use kvdb::{DBValue, KeyValueDB, DBTransaction};
@ -45,7 +46,8 @@ use client::{
use client::{
BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
TraceFilter, CallAnalytics, BlockImportError, Mode,
ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType
ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType,
IoClient,
};
use encoded;
use engines::{EthEngine, EpochTransition};
@ -55,7 +57,7 @@ use evm::Schedule;
use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::{Factories, VmFactory};
use header::{BlockNumber, Header};
use io::IoChannel;
use io::{IoChannel, IoError};
use log_entry::LocalizedLogEntry;
use miner::{Miner, MinerService};
use ethcore_miner::pool::VerifiedTransaction;
@ -85,6 +87,7 @@ pub use verification::queue::QueueInfo as BlockQueueInfo;
use_contract!(registry, "Registry", "res/contracts/registrar.json");
const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
const MIN_HISTORY_SIZE: u64 = 8;
@ -154,10 +157,7 @@ struct Importer {
pub miner: Arc<Miner>,
/// Ancient block verifier: import an ancient sequence of blocks in order from a starting epoch
pub ancient_verifier: Mutex<Option<AncientVerifier>>,
/// Random number generator used by `AncientVerifier`
pub rng: Mutex<OsRng>,
pub ancient_verifier: AncientVerifier,
/// Ethereum engine to be used during import
pub engine: Arc<EthEngine>,
@ -204,8 +204,13 @@ pub struct Client {
/// List of actors to be notified on certain chain events
notify: RwLock<Vec<Weak<ChainNotify>>>,
/// Count of pending transactions in the queue
queue_transactions: AtomicUsize,
/// Queued transactions from IO
queue_transactions: IoChannelQueue,
/// Ancient blocks import queue
queue_ancient_blocks: IoChannelQueue,
/// Consensus messages import queue
queue_consensus_message: IoChannelQueue,
last_hashes: RwLock<VecDeque<H256>>,
factories: Factories,
@ -239,8 +244,7 @@ impl Importer {
verifier: verification::new(config.verifier_type.clone()),
block_queue,
miner,
ancient_verifier: Mutex::new(None),
rng: Mutex::new(OsRng::new()?),
ancient_verifier: AncientVerifier::new(engine.clone()),
engine,
})
}
@ -416,55 +420,25 @@ impl Importer {
Ok(locked_block)
}
/// Import a block with transaction receipts.
///
/// The block is guaranteed to be the next best blocks in the
/// first block sequence. Does no sealing or transaction validation.
fn import_old_block(&self, header: &Header, block_bytes: Bytes, receipts_bytes: Bytes, db: &KeyValueDB, chain: &BlockChain) -> Result<H256, ::error::Error> {
let receipts = ::rlp::decode_list(&receipts_bytes);
fn import_old_block(&self, header: &Header, block_bytes: &[u8], receipts_bytes: &[u8], db: &KeyValueDB, chain: &BlockChain) -> Result<H256, ::error::Error> {
let receipts = ::rlp::decode_list(receipts_bytes);
let hash = header.hash();
let _import_lock = self.import_lock.lock();
{
trace_time!("import_old_block");
let mut ancient_verifier = self.ancient_verifier.lock();
{
// closure for verifying a block.
let verify_with = |verifier: &AncientVerifier| -> Result<(), ::error::Error> {
// verify the block, passing the chain for updating the epoch
// verifier.
let mut rng = OsRng::new().map_err(UtilError::from)?;
verifier.verify(&mut rng, &header, &chain)
};
// initialize the ancient block verifier if we don't have one already.
match &mut *ancient_verifier {
&mut Some(ref verifier) => {
verify_with(verifier)?
}
x @ &mut None => {
// load most recent epoch.
trace!(target: "client", "Initializing ancient block restoration.");
let current_epoch_data = chain.epoch_transitions()
.take_while(|&(_, ref t)| t.block_number < header.number())
.last()
.map(|(_, t)| t.proof)
.expect("At least one epoch entry (genesis) always stored; qed");
let current_verifier = self.engine.epoch_verifier(&header, &current_epoch_data)
.known_confirmed()?;
let current_verifier = AncientVerifier::new(self.engine.clone(), current_verifier);
verify_with(&current_verifier)?;
*x = Some(current_verifier);
}
}
}
// verify the block, passing the chain for updating the epoch verifier.
let mut rng = OsRng::new().map_err(UtilError::from)?;
self.ancient_verifier.verify(&mut rng, &header, &chain)?;
// Commit results
let mut batch = DBTransaction::new();
chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, false, true);
chain.insert_unordered_block(&mut batch, block_bytes, receipts, None, false, true);
// Final commit to the DB
db.write_buffered(batch);
chain.commit();
@ -734,7 +708,9 @@ impl Client {
report: RwLock::new(Default::default()),
io_channel: Mutex::new(message_channel),
notify: RwLock::new(Vec::new()),
queue_transactions: AtomicUsize::new(0),
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
last_hashes: RwLock::new(VecDeque::new()),
factories: factories,
history: history,
@ -820,7 +796,7 @@ impl Client {
}
fn notify<F>(&self, f: F) where F: Fn(&ChainNotify) {
for np in self.notify.read().iter() {
for np in &*self.notify.read() {
if let Some(n) = np.upgrade() {
f(&*n);
}
@ -954,24 +930,6 @@ impl Client {
}
}
/// Import transactions from the IO queue
pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: usize) -> usize {
trace_time!("import_queued_transactions");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let txs: Vec<UnverifiedTransaction> = transactions
.iter()
.filter_map(|bytes| self.engine().decode_transaction(bytes).ok())
.collect();
self.notify(|notify| {
notify.transactions_received(&txs, peer_id);
});
let results = self.importer.miner.import_external_transactions(self, txs);
results.len()
}
/// Get shared miner reference.
#[cfg(test)]
pub fn miner(&self) -> Arc<Miner> {
@ -1392,22 +1350,6 @@ impl ImportBlock for Client {
}
Ok(self.importer.block_queue.import(unverified)?)
}
fn import_block_with_receipts(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
{
// check block order
if self.chain.read().is_known(&header.hash()) {
bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
}
let status = self.block_status(BlockId::Hash(*header.parent_hash()));
if status == BlockStatus::Unknown || status == BlockStatus::Pending {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash())));
}
}
self.importer.import_old_block(&header, block_bytes, receipts_bytes, &**self.db.read(), &*self.chain.read()).map_err(Into::into)
}
}
impl StateClient for Client {
@ -1958,35 +1900,10 @@ impl BlockChainClient for Client {
(*self.build_last_hashes(&self.chain.read().best_block_hash())).clone()
}
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed);
trace!(target: "external_tx", "Queue size: {}", queue_size);
if queue_size > MAX_TX_QUEUE_SIZE {
debug!("Ignoring {} transactions: queue is full", transactions.len());
} else {
let len = transactions.len();
match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions, peer_id)) {
Ok(_) => {
self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
}
Err(e) => {
debug!("Ignoring {} transactions: error queueing: {}", len, e);
}
}
}
}
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.ready_transactions(self)
}
fn queue_consensus_message(&self, message: Bytes) {
let channel = self.io_channel.lock().clone();
if let Err(e) = channel.send(ClientIoMessage::NewMessage(message)) {
debug!("Ignoring the message, error queueing: {}", e);
}
}
fn signing_chain_id(&self) -> Option<u64> {
self.engine.signing_chain_id(&self.latest_env_info())
}
@ -2034,6 +1951,72 @@ impl BlockChainClient for Client {
}
}
impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
let len = transactions.len();
self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
trace_time!("import_queued_transactions");
let txs: Vec<UnverifiedTransaction> = transactions
.iter()
.filter_map(|bytes| client.engine.decode_transaction(bytes).ok())
.collect();
client.notify(|notify| {
notify.transactions_received(&txs, peer_id);
});
client.importer.miner.import_external_transactions(client, txs);
}).unwrap_or_else(|e| {
debug!(target: "client", "Ignoring {} transactions: {}", len, e);
});
}
fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
let hash = header.hash();
{
// check block order
if self.chain.read().is_known(&header.hash()) {
bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
}
let status = self.block_status(BlockId::Hash(*header.parent_hash()));
if status == BlockStatus::Unknown || status == BlockStatus::Pending {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash())));
}
}
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
client.importer.import_old_block(
&header,
&block_bytes,
&receipts_bytes,
&**client.db.read(),
&*client.chain.read()
).map(|_| ()).unwrap_or_else(|e| {
error!(target: "client", "Error importing ancient block: {}", e);
});
}) {
Ok(_) => Ok(hash),
Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))),
}
}
fn queue_consensus_message(&self, message: Bytes) {
match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
if let Err(e) = client.engine().handle_message(&message) {
debug!(target: "poa", "Invalid message received: {}", e);
}
}) {
Ok(_) => (),
Err(e) => {
debug!(target: "poa", "Ignoring the message, error queueing: {}", e);
}
}
}
}
impl ReopenBlock for Client {
fn reopen_block(&self, block: ClosedBlock) -> OpenBlock {
let engine = &*self.engine;
@ -2409,3 +2392,54 @@ mod tests {
});
}
}
#[derive(Debug)]
enum QueueError {
Channel(IoError),
Full(usize),
}
impl fmt::Display for QueueError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
QueueError::Channel(ref c) => fmt::Display::fmt(c, fmt),
QueueError::Full(limit) => write!(fmt, "The queue is full ({})", limit),
}
}
}
/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
limit: usize,
}
impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
IoChannelQueue {
currently_queued: Default::default(),
limit,
}
}
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
ensure!(queue_size < self.limit, QueueError::Full(self.limit));
let currently_queued = self.currently_queued.clone();
let result = channel.send(ClientIoMessage::execute(move |client| {
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client);
}));
match result {
Ok(_) => {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Err(e) => Err(QueueError::Channel(e)),
}
}
}

View File

@ -14,19 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethereum_types::H256;
use std::fmt;
use bytes::Bytes;
use client::Client;
use ethereum_types::H256;
use snapshot::ManifestData;
/// Message type for external and internal events
#[derive(Clone, PartialEq, Eq, Debug)]
#[derive(Debug)]
pub enum ClientIoMessage {
/// Best Block Hash in chain has been changed
NewChainHead,
/// A block is ready
BlockVerified,
/// New transaction RLPs are ready to be imported
NewTransactions(Vec<Bytes>, usize),
/// Begin snapshot restoration
BeginRestoration(ManifestData),
/// Feed a state chunk to the snapshot service
@ -35,9 +35,23 @@ pub enum ClientIoMessage {
FeedBlockChunk(H256, Bytes),
/// Take a snapshot for the block with given number.
TakeSnapshot(u64),
/// New consensus message received.
NewMessage(Bytes),
/// New private transaction arrived
NewPrivateTransaction,
/// Execute wrapped closure
Execute(Callback),
}
impl ClientIoMessage {
/// Create new `ClientIoMessage` that executes given procedure.
pub fn execute<F: Fn(&Client) + Send + Sync + 'static>(fun: F) -> Self {
ClientIoMessage::Execute(Callback(Box::new(fun)))
}
}
/// A function to invoke in the client thread.
pub struct Callback(pub Box<Fn(&Client) + Send + Sync>);
impl fmt::Debug for Callback {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "<callback>")
}
}

View File

@ -36,9 +36,8 @@ pub use self::traits::{
Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock,
StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter
};
//pub use self::private_notify::PrivateNotify;
pub use state::StateInfo;
pub use self::traits::{BlockChainClient, EngineClient, ProvingBlockChainClient};
pub use self::traits::{BlockChainClient, EngineClient, ProvingBlockChainClient, IoClient};
pub use types::ids::*;
pub use types::trace_filter::Filter as TraceFilter;

View File

@ -39,7 +39,7 @@ use client::{
PrepareOpenBlock, BlockChainClient, BlockChainInfo, BlockStatus, BlockId,
TransactionId, UncleId, TraceId, TraceFilter, LastHashes, CallAnalytics, BlockImportError,
ProvingBlockChainClient, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock, StateOrBlock,
Call, StateClient, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter
Call, StateClient, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter, IoClient
};
use db::{NUM_COLUMNS, COL_STATE};
use header::{Header as BlockHeader, BlockNumber};
@ -556,10 +556,6 @@ impl ImportBlock for TestBlockChainClient {
}
Ok(h)
}
fn import_block_with_receipts(&self, b: Bytes, _r: Bytes) -> Result<H256, BlockImportError> {
self.import_block(b)
}
}
impl Call for TestBlockChainClient {
@ -809,16 +805,6 @@ impl BlockChainClient for TestBlockChainClient {
self.traces.read().clone()
}
fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: usize) {
// import right here
let txs = transactions.into_iter().filter_map(|bytes| Rlp::new(&bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, txs);
}
fn queue_consensus_message(&self, message: Bytes) {
self.spec.engine.handle_message(&message).unwrap();
}
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self)
}
@ -863,6 +849,22 @@ impl BlockChainClient for TestBlockChainClient {
fn eip86_transition(&self) -> u64 { u64::max_value() }
}
impl IoClient for TestBlockChainClient {
fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: usize) {
// import right here
let txs = transactions.into_iter().filter_map(|bytes| Rlp::new(&bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, txs);
}
fn queue_ancient_block(&self, b: Bytes, _r: Bytes) -> Result<H256, BlockImportError> {
self.import_block(b)
}
fn queue_consensus_message(&self, message: Bytes) {
self.spec.engine.handle_message(&message).unwrap();
}
}
impl ProvingBlockChainClient for TestBlockChainClient {
fn prove_storage(&self, _: H256, _: H256, _: BlockId) -> Option<(Vec<Bytes>, H256)> {
None

View File

@ -168,9 +168,6 @@ pub trait RegistryInfo {
pub trait ImportBlock {
/// Import a block into the blockchain.
fn import_block(&self, bytes: Bytes) -> Result<H256, BlockImportError>;
/// Import a block with transaction receipts. Does no sealing and transaction validation.
fn import_block_with_receipts(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError>;
}
/// Provides `call_contract` method
@ -201,8 +198,21 @@ pub trait EngineInfo {
fn engine(&self) -> &EthEngine;
}
/// IO operations that should off-load heavy work to another thread.
pub trait IoClient: Sync + Send {
/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize);
/// Queue block import with transaction receipts. Does no sealing and transaction validation.
fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError>;
/// Queue conensus engine message.
fn queue_consensus_message(&self, message: Bytes);
}
/// Blockchain database client. Owns and manages a blockchain and a block queue.
pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContract + RegistryInfo + ImportBlock {
pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContract + RegistryInfo + ImportBlock
+ IoClient {
/// Look up the block number for the given block ID.
fn block_number(&self, id: BlockId) -> Option<BlockNumber>;
@ -310,12 +320,6 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
/// Get last hashes starting from best block.
fn last_hashes(&self) -> LastHashes;
/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize);
/// Queue conensus engine message.
fn queue_consensus_message(&self, message: Bytes);
/// List all transactions that are allowed into the next block.
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>>;

View File

@ -29,7 +29,6 @@ pub struct BlockView<'a> {
rlp: ViewRlp<'a>
}
impl<'a> BlockView<'a> {
/// Creates new view onto block from rlp.
/// Use the `view!` macro to create this view in order to capture debugging info.
@ -39,9 +38,9 @@ impl<'a> BlockView<'a> {
/// ```
/// #[macro_use]
/// extern crate ethcore;
///
///
/// use ethcore::views::{BlockView};
///
///
/// fn main() {
/// let bytes : &[u8] = &[];
/// let block_view = view!(BlockView, bytes);

View File

@ -30,6 +30,7 @@ heapsize = "0.4"
semver = "0.9"
smallvec = { version = "0.4", features = ["heapsizeof"] }
parking_lot = "0.5"
trace-time = { path = "../../util/trace-time" }
ipnetwork = "0.12.6"
[dev-dependencies]

View File

@ -379,10 +379,12 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
}
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
trace_time!("sync::read");
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
trace_time!("sync::connected");
// If warp protocol is supported only allow warp handshake
let warp_protocol = io.protocol_version(WARP_SYNC_PROTOCOL_ID, *peer).unwrap_or(0) != 0;
let warp_context = io.subprotocol_name() == WARP_SYNC_PROTOCOL_ID;
@ -392,12 +394,14 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
}
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
trace_time!("sync::disconnected");
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer);
}
}
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
trace_time!("sync::timeout");
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
self.sync.write().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut io);

View File

@ -496,7 +496,7 @@ impl BlockDownloader {
}
let result = if let Some(receipts) = receipts {
io.chain().import_block_with_receipts(block, receipts)
io.chain().queue_ancient_block(block, receipts)
} else {
io.chain().import_block(block)
};

View File

@ -1789,10 +1789,13 @@ impl ChainSync {
}
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
return;
}
let rlp = Rlp::new(data);
let result = match packet_id {
STATUS_PACKET => self.on_peer_status(io, peer, &rlp),
@ -1831,7 +1834,7 @@ impl ChainSync {
PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT,
};
if timeout {
trace!(target:"sync", "Timeout {}", peer_id);
debug!(target:"sync", "Timeout {}", peer_id);
io.disconnect_peer(*peer_id);
aborting.push(*peer_id);
}

View File

@ -54,6 +54,8 @@ extern crate macros;
extern crate log;
#[macro_use]
extern crate heapsize;
#[macro_use]
extern crate trace_time;
mod chain;
mod blocks;

View File

@ -520,11 +520,9 @@ impl TestIoHandler {
impl IoHandler<ClientIoMessage> for TestIoHandler {
fn message(&self, _io: &IoContext<ClientIoMessage>, net_message: &ClientIoMessage) {
match *net_message {
ClientIoMessage::NewMessage(ref message) => if let Err(e) = self.client.engine().handle_message(message) {
panic!("Invalid message received: {}", e);
},
ClientIoMessage::NewPrivateTransaction => {
ClientIoMessage::Execute(ref exec) => {
*self.private_tx_queued.lock() += 1;
(*exec.0)(&self.client);
},
_ => {} // ignore other messages
}

View File

@ -24,7 +24,7 @@ use ethcore::CreateContractAddress;
use transaction::{Transaction, Action};
use ethcore::executive::{contract_address};
use ethcore::test_helpers::{push_block_with_transactions};
use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor};
use ethcore_private_tx::{Provider, ProviderConfig, NoopEncryptor, Importer};
use ethcore::account_provider::AccountProvider;
use ethkey::{KeyPair};
use tests::helpers::{TestNet, TestIoHandler};
@ -84,7 +84,7 @@ fn send_private_transaction() {
Box::new(NoopEncryptor::default()),
signer_config,
IoChannel::to_handler(Arc::downgrade(&io_handler0)),
).unwrap());
));
pm0.add_notify(net.peers[0].clone());
let pm1 = Arc::new(Provider::new(
@ -94,7 +94,7 @@ fn send_private_transaction() {
Box::new(NoopEncryptor::default()),
validator_config,
IoChannel::to_handler(Arc::downgrade(&io_handler1)),
).unwrap());
));
pm1.add_notify(net.peers[1].clone());
// Create and deploy contract
@ -133,7 +133,6 @@ fn send_private_transaction() {
//process received private transaction message
let private_transaction = received_private_transactions[0].clone();
assert!(pm1.import_private_transaction(&private_transaction).is_ok());
assert!(pm1.on_private_transaction_queued().is_ok());
//send signed response
net.sync();
@ -147,4 +146,4 @@ fn send_private_transaction() {
assert!(pm0.import_signed_private_transaction(&signed_private_transaction).is_ok());
let local_transactions = net.peer(0).miner.local_transactions();
assert_eq!(local_transactions.len(), 1);
}
}

View File

@ -106,7 +106,7 @@ impl From<::std::io::Error> for IoError {
}
}
impl<Message> From<NotifyError<service::IoMessage<Message>>> for IoError where Message: Send + Clone {
impl<Message> From<NotifyError<service::IoMessage<Message>>> for IoError where Message: Send {
fn from(_err: NotifyError<service::IoMessage<Message>>) -> IoError {
IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error"))
}
@ -115,7 +115,7 @@ impl<Message> From<NotifyError<service::IoMessage<Message>>> for IoError where M
/// Generic IO handler.
/// All the handler function are called from within IO event loop.
/// `Message` type is used as notification data
pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + 'static {
pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + 'static {
/// Initialize the handler
fn initialize(&self, _io: &IoContext<Message>) {}
/// Timer function called after a timeout created with `HandlerIo::timeout`.

View File

@ -41,7 +41,7 @@ const MAX_HANDLERS: usize = 8;
/// Messages used to communicate with the event loop from other threads.
#[derive(Clone)]
pub enum IoMessage<Message> where Message: Send + Clone + Sized {
pub enum IoMessage<Message> where Message: Send + Sized {
/// Shutdown the event loop
Shutdown,
/// Register a new protocol handler.
@ -74,16 +74,16 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
token: StreamToken,
},
/// Broadcast a message across all protocol handlers.
UserMessage(Message)
UserMessage(Arc<Message>)
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct IoContext<Message> where Message: Send + Clone + Sync + 'static {
pub struct IoContext<Message> where Message: Send + Sync + 'static {
channel: IoChannel<Message>,
handler: HandlerId,
}
impl<Message> IoContext<Message> where Message: Send + Clone + Sync + 'static {
impl<Message> IoContext<Message> where Message: Send + Sync + 'static {
/// Create a new IO access point. Takes references to all the data that can be updated within the IO handler.
pub fn new(channel: IoChannel<Message>, handler: HandlerId) -> IoContext<Message> {
IoContext {
@ -187,7 +187,7 @@ pub struct IoManager<Message> where Message: Send + Sync {
work_ready: Arc<SCondvar>,
}
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
impl<Message> IoManager<Message> where Message: Send + Sync + 'static {
/// Creates a new instance and registers it with the event loop.
pub fn start(
event_loop: &mut EventLoop<IoManager<Message>>,
@ -219,7 +219,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
}
}
impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync + 'static {
impl<Message> Handler for IoManager<Message> where Message: Send + Sync + 'static {
type Timeout = Token;
type Message = IoMessage<Message>;
@ -317,7 +317,12 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
for id in 0 .. MAX_HANDLERS {
if let Some(h) = self.handlers.read().get(id) {
let handler = h.clone();
self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id });
self.worker_channel.push(Work {
work_type: WorkType::Message(data.clone()),
token: 0,
handler: handler,
handler_id: id
});
}
}
self.work_ready.notify_all();
@ -326,21 +331,30 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
}
}
#[derive(Clone)]
enum Handlers<Message> where Message: Send + Clone {
enum Handlers<Message> where Message: Send {
SharedCollection(Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>),
Single(Weak<IoHandler<Message>>),
}
/// Allows sending messages into the event loop. All the IO handlers will get the message
/// in the `message` callback.
pub struct IoChannel<Message> where Message: Send + Clone{
channel: Option<Sender<IoMessage<Message>>>,
handlers: Handlers<Message>,
impl<Message: Send> Clone for Handlers<Message> {
fn clone(&self) -> Self {
use self::Handlers::*;
match *self {
SharedCollection(ref w) => SharedCollection(w.clone()),
Single(ref w) => Single(w.clone()),
}
}
}
impl<Message> Clone for IoChannel<Message> where Message: Send + Clone + Sync + 'static {
/// Allows sending messages into the event loop. All the IO handlers will get the message
/// in the `message` callback.
pub struct IoChannel<Message> where Message: Send {
channel: Option<Sender<IoMessage<Message>>>,
handlers: Handlers<Message>,
}
impl<Message> Clone for IoChannel<Message> where Message: Send + Sync + 'static {
fn clone(&self) -> IoChannel<Message> {
IoChannel {
channel: self.channel.clone(),
@ -349,11 +363,11 @@ impl<Message> Clone for IoChannel<Message> where Message: Send + Clone + Sync +
}
}
impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
/// Send a message through the channel
pub fn send(&self, message: Message) -> Result<(), IoError> {
match self.channel {
Some(ref channel) => channel.send(IoMessage::UserMessage(message))?,
Some(ref channel) => channel.send(IoMessage::UserMessage(Arc::new(message)))?,
None => self.send_sync(message)?
}
Ok(())
@ -413,13 +427,13 @@ impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
/// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
pub struct IoService<Message> where Message: Send + Sync + 'static {
thread: Mutex<Option<JoinHandle<()>>>,
host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
}
impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
impl<Message> IoService<Message> where Message: Send + Sync + 'static {
/// Starts IO event loop
pub fn start() -> Result<IoService<Message>, IoError> {
let mut config = EventLoopBuilder::new();
@ -462,7 +476,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads.
pub fn send_message(&self, message: Message) -> Result<(), IoError> {
self.host_channel.lock().send(IoMessage::UserMessage(message))?;
self.host_channel.lock().send(IoMessage::UserMessage(Arc::new(message)))?;
Ok(())
}
@ -472,7 +486,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
}
}
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
impl<Message> Drop for IoService<Message> where Message: Send + Sync {
fn drop(&mut self) {
self.stop()
}

View File

@ -38,7 +38,7 @@ pub enum WorkType<Message> {
Writable,
Hup,
Timeout,
Message(Message)
Message(Arc<Message>)
}
pub struct Work<Message> {
@ -65,7 +65,7 @@ impl Worker {
wait: Arc<SCondvar>,
wait_mutex: Arc<SMutex<()>>,
) -> Worker
where Message: Send + Sync + Clone + 'static {
where Message: Send + Sync + 'static {
let deleting = Arc::new(AtomicBool::new(false));
let mut worker = Worker {
thread: None,
@ -86,7 +86,7 @@ impl Worker {
channel: IoChannel<Message>, wait: Arc<SCondvar>,
wait_mutex: Arc<SMutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + Clone + 'static {
where Message: Send + Sync + 'static {
loop {
{
let lock = wait_mutex.lock().expect("Poisoned work_loop mutex");
@ -105,7 +105,7 @@ impl Worker {
}
}
fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>) where Message: Send + Sync + Clone + 'static {
fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>) where Message: Send + Sync + 'static {
match work.work_type {
WorkType::Readable => {
work.handler.stream_readable(&IoContext::new(channel, work.handler_id), work.token);
@ -120,7 +120,7 @@ impl Worker {
work.handler.timeout(&IoContext::new(channel, work.handler_id), work.token);
}
WorkType::Message(message) => {
work.handler.message(&IoContext::new(channel, work.handler_id), &message);
work.handler.message(&IoContext::new(channel, work.handler_id), &*message);
}
}
}