old message removal, avoid too many recoveries
This commit is contained in:
parent
61cf8b8b7e
commit
d0eab4a0d8
@ -353,14 +353,14 @@ impl Engine for Tendermint {
|
|||||||
|
|
||||||
fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> {
|
fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> {
|
||||||
let message: ConsensusMessage = try!(rlp.as_val());
|
let message: ConsensusMessage = try!(rlp.as_val());
|
||||||
|
// Check if the message is known.
|
||||||
|
if self.votes.is_known(&message) {
|
||||||
let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3())));
|
let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3())));
|
||||||
// TODO: Do not admit old messages.
|
|
||||||
if !self.is_authority(&sender) {
|
if !self.is_authority(&sender) {
|
||||||
try!(Err(BlockError::NotAuthorized(sender)));
|
try!(Err(BlockError::NotAuthorized(sender)));
|
||||||
}
|
}
|
||||||
|
self.votes.vote(message.clone(), sender);
|
||||||
|
|
||||||
// Check if the message is known.
|
|
||||||
if self.votes.vote(message.clone(), sender).is_none() {
|
|
||||||
trace!(target: "poa", "handle_message: Processing new authorized message: {:?}", &message);
|
trace!(target: "poa", "handle_message: Processing new authorized message: {:?}", &message);
|
||||||
self.broadcast_message(rlp.as_raw().to_vec());
|
self.broadcast_message(rlp.as_raw().to_vec());
|
||||||
let is_newer_than_lock = match *self.lock_change.read() {
|
let is_newer_than_lock = match *self.lock_change.read() {
|
||||||
@ -381,6 +381,8 @@ impl Engine for Tendermint {
|
|||||||
self.increment_round(1);
|
self.increment_round(1);
|
||||||
Some(Step::Propose)
|
Some(Step::Propose)
|
||||||
} else {
|
} else {
|
||||||
|
// Remove old messages.
|
||||||
|
self.votes.throw_out_old(&message);
|
||||||
Some(Step::Commit)
|
Some(Step::Commit)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -34,11 +34,40 @@ pub struct SealSignatures {
|
|||||||
|
|
||||||
impl VoteCollector {
|
impl VoteCollector {
|
||||||
pub fn new() -> VoteCollector {
|
pub fn new() -> VoteCollector {
|
||||||
VoteCollector { votes: RwLock::new(BTreeMap::new()) }
|
let mut collector = BTreeMap::new();
|
||||||
|
// Insert dummy message to fulfill invariant: "only messages newer than the oldest are inserted".
|
||||||
|
collector.insert(ConsensusMessage {
|
||||||
|
signature: H520::default(),
|
||||||
|
height: 0,
|
||||||
|
round: 0,
|
||||||
|
step: Step::Propose,
|
||||||
|
block_hash: None
|
||||||
|
},
|
||||||
|
Address::default());
|
||||||
|
VoteCollector { votes: RwLock::new(collector) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Insert vote if it is newer than the oldest one.
|
||||||
pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option<Address> {
|
pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option<Address> {
|
||||||
|
if {
|
||||||
|
let guard = self.votes.read();
|
||||||
|
guard.keys().next().map_or(true, |oldest| &message > oldest)
|
||||||
|
} {
|
||||||
self.votes.write().insert(message, voter)
|
self.votes.write().insert(message, voter)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_known(&self, message: &ConsensusMessage) -> bool {
|
||||||
|
self.votes.read().contains_key(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Throws out messages older than message, leaves message as marker for the oldest.
|
||||||
|
pub fn throw_out_old(&self, message: &ConsensusMessage) {
|
||||||
|
let mut guard = self.votes.write();
|
||||||
|
let new_collector = guard.split_off(message);
|
||||||
|
*guard = new_collector;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn seal_signatures(&self, height: Height, round: Round, block_hash: H256) -> Option<SealSignatures> {
|
pub fn seal_signatures(&self, height: Height, round: Round, block_hash: H256) -> Option<SealSignatures> {
|
||||||
@ -162,4 +191,26 @@ mod tests {
|
|||||||
};
|
};
|
||||||
assert_eq!(collector.count_aligned_votes(&message), 2);
|
assert_eq!(collector.count_aligned_votes(&message), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn remove_old() {
|
||||||
|
let collector = VoteCollector::new();
|
||||||
|
simple_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("0".sha3()));
|
||||||
|
simple_vote(&collector, H520::random(), 3, 1, Step::Prevote, Some("0".sha3()));
|
||||||
|
simple_vote(&collector, H520::random(), 3, 3, Step::Precommit, Some("0".sha3()));
|
||||||
|
simple_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("1".sha3()));
|
||||||
|
simple_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("1".sha3()));
|
||||||
|
simple_vote(&collector, H520::random(), 3, 2, Step::Prevote, Some("0".sha3()));
|
||||||
|
simple_vote(&collector, H520::random(), 2, 2, Step::Precommit, Some("2".sha3()));
|
||||||
|
|
||||||
|
let message = ConsensusMessage {
|
||||||
|
signature: H520::default(),
|
||||||
|
height: 3,
|
||||||
|
round: 2,
|
||||||
|
step: Step::Precommit,
|
||||||
|
block_hash: Some("1".sha3())
|
||||||
|
};
|
||||||
|
collector.throw_out_old(&message);
|
||||||
|
assert_eq!(collector.votes.read().len(), 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user