fix deadlock
This commit is contained in:
parent
294e89e5c0
commit
7929a145e7
@ -354,14 +354,15 @@ impl Engine for Tendermint {
|
|||||||
fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> {
|
fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> {
|
||||||
let message: ConsensusMessage = try!(rlp.as_val());
|
let message: ConsensusMessage = try!(rlp.as_val());
|
||||||
// Check if the message is known.
|
// Check if the message is known.
|
||||||
if self.votes.is_known(&message) {
|
if !self.votes.is_known(&message) {
|
||||||
let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3())));
|
let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3())));
|
||||||
if !self.is_authority(&sender) {
|
if !self.is_authority(&sender) {
|
||||||
try!(Err(EngineError::NotAuthorized(sender)));
|
try!(Err(EngineError::NotAuthorized(sender)));
|
||||||
}
|
}
|
||||||
self.votes.vote(message.clone(), sender);
|
|
||||||
|
|
||||||
trace!(target: "poa", "handle_message: Processing new authorized message: {:?}", &message);
|
trace!(target: "poa", "handle_message: Processing new authorized message: {:?}", &message);
|
||||||
|
self.votes.vote(message.clone(), sender);
|
||||||
|
|
||||||
self.broadcast_message(rlp.as_raw().to_vec());
|
self.broadcast_message(rlp.as_raw().to_vec());
|
||||||
let is_newer_than_lock = match *self.lock_change.read() {
|
let is_newer_than_lock = match *self.lock_change.read() {
|
||||||
Some(ref lock) => &message > lock,
|
Some(ref lock) => &message > lock,
|
||||||
@ -381,8 +382,6 @@ impl Engine for Tendermint {
|
|||||||
self.increment_round(1);
|
self.increment_round(1);
|
||||||
Some(Step::Propose)
|
Some(Step::Propose)
|
||||||
} else {
|
} else {
|
||||||
// Remove old messages.
|
|
||||||
self.votes.throw_out_old(&message);
|
|
||||||
Some(Step::Commit)
|
Some(Step::Commit)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -504,7 +503,6 @@ mod tests {
|
|||||||
use spec::Spec;
|
use spec::Spec;
|
||||||
use engines::{Engine, EngineError};
|
use engines::{Engine, EngineError};
|
||||||
use super::*;
|
use super::*;
|
||||||
use super::params::TendermintParams;
|
|
||||||
use super::message::*;
|
use super::message::*;
|
||||||
|
|
||||||
/// Accounts inserted with "1" and "2" are authorities. First proposer is "0".
|
/// Accounts inserted with "1" and "2" are authorities. First proposer is "0".
|
||||||
@ -712,5 +710,6 @@ mod tests {
|
|||||||
|
|
||||||
::std::thread::sleep(::std::time::Duration::from_millis(500));
|
::std::thread::sleep(::std::time::Duration::from_millis(500));
|
||||||
assert_eq!(test_io.received.read()[5], ClientIoMessage::SubmitSeal(proposal.unwrap(), seal));
|
assert_eq!(test_io.received.read()[5], ClientIoMessage::SubmitSeal(proposal.unwrap(), seal));
|
||||||
|
println!("{:?}", *test_io.received.read());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -58,12 +58,14 @@ impl VoteCollector {
|
|||||||
|
|
||||||
/// Insert vote if it is newer than the oldest one.
|
/// Insert vote if it is newer than the oldest one.
|
||||||
pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option<Address> {
|
pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option<Address> {
|
||||||
if {
|
let is_new = {
|
||||||
let guard = self.votes.read();
|
let guard = self.votes.read();
|
||||||
guard.keys().next().map_or(true, |oldest| &message > oldest)
|
guard.keys().next().map_or(true, |oldest| &message > oldest)
|
||||||
} {
|
};
|
||||||
|
if is_new {
|
||||||
self.votes.write().insert(message, voter)
|
self.votes.write().insert(message, voter)
|
||||||
} else {
|
} else {
|
||||||
|
trace!(target: "poa", "vote: Old message ignored {:?}.", message);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -80,17 +82,22 @@ impl VoteCollector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn seal_signatures(&self, height: Height, round: Round, block_hash: H256) -> Option<SealSignatures> {
|
pub fn seal_signatures(&self, height: Height, round: Round, block_hash: H256) -> Option<SealSignatures> {
|
||||||
let guard = self.votes.read();
|
|
||||||
let bh = Some(block_hash);
|
let bh = Some(block_hash);
|
||||||
let mut current_signatures = guard.keys()
|
let (proposal, votes) = {
|
||||||
.skip_while(|m| !m.is_block_hash(height, round, Step::Propose, bh));
|
let guard = self.votes.read();
|
||||||
current_signatures.next().map(|proposal| SealSignatures {
|
let mut current_signatures = guard.keys().skip_while(|m| !m.is_block_hash(height, round, Step::Propose, bh));
|
||||||
proposal: proposal.signature,
|
let proposal = current_signatures.next().cloned();
|
||||||
votes: current_signatures
|
let votes = current_signatures
|
||||||
.skip_while(|m| !m.is_block_hash(height, round, Step::Precommit, bh))
|
.skip_while(|m| !m.is_block_hash(height, round, Step::Precommit, bh))
|
||||||
.filter(|m| m.is_block_hash(height, round, Step::Precommit, bh))
|
.filter(|m| m.is_block_hash(height, round, Step::Precommit, bh))
|
||||||
.map(|m| m.signature.clone())
|
.cloned()
|
||||||
.collect()
|
.collect::<Vec<_>>();
|
||||||
|
(proposal, votes)
|
||||||
|
};
|
||||||
|
votes.last().map(|m| self.throw_out_old(m));
|
||||||
|
proposal.map(|p| SealSignatures {
|
||||||
|
proposal: p.signature,
|
||||||
|
votes: votes.into_iter().map(|m| m.signature).collect()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user