message broadcasting methods

This commit is contained in:
keorn 2016-11-17 12:18:20 +00:00
parent 45027ea306
commit 9563ccfbd2
3 changed files with 48 additions and 44 deletions

View File

@ -159,7 +159,7 @@ pub trait Engine : Sync + Send {
/// Handle any potential consensus messages; /// Handle any potential consensus messages;
/// updating consensus state and potentially issuing a new one. /// updating consensus state and potentially issuing a new one.
fn handle_message(&self, _message: UntrustedRlp) -> Result<Bytes, Error> { Err(EngineError::UnexpectedMessage.into()) } fn handle_message(&self, _message: UntrustedRlp) -> Result<(), Error> { Err(EngineError::UnexpectedMessage.into()) }
// TODO: builtin contract routing - to do this properly, it will require removing the built-in configuration-reading logic // TODO: builtin contract routing - to do this properly, it will require removing the built-in configuration-reading logic
// from Spec into here and removing the Spec::builtins field. // from Spec into here and removing the Spec::builtins field.

View File

@ -125,11 +125,37 @@ impl Tendermint {
} }
} }
fn broadcast_message(&self, message: Bytes) {
if let Some(ref channel) = *self.message_channel.lock() {
match channel.send(ClientIoMessage::BroadcastMessage(message)) {
Ok(_) => trace!(target: "poa", "timeout: BroadcastMessage message sent."),
Err(err) => warn!(target: "poa", "timeout: Could not send a sealing message {}.", err),
}
}
}
fn to_step(&self, step: Step) { fn to_step(&self, step: Step) {
*self.step.write() = step; *self.step.write() = step;
match step { match step {
Step::Propose => self.update_sealing(), Step::Propose => self.update_sealing(),
_ => {}, Step::Prevote => {
self.broadcast_message()
},
Step::Precommit => {
self.broadcast_message()
},
Step::Commit => {
// Commit the block using a complete signature set.
let round = self.round.load(AtomicOrdering::SeqCst);
if let Some((proposer, votes)) = self.votes.seal_signatures(header.number() as Height, round, Some(block_hash(header))) {
let seal = vec![
::rlp::encode(&round).to_vec(),
::rlp::encode(proposer).to_vec(),
::rlp::encode(&votes).to_vec()
];
self.submit_seal(seal)
}
},
} }
} }
@ -240,47 +266,27 @@ impl Engine for Tendermint {
} }
/// Attempt to seal the block internally using all available signatures. /// Attempt to seal the block internally using all available signatures.
///
/// None is returned if not enough signatures can be collected.
fn generate_seal(&self, block: &ExecutedBlock, accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> { fn generate_seal(&self, block: &ExecutedBlock, accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> {
if let (Some(ap), Some(step)) = (accounts, self.step.try_read()) { if let Some(ap) = accounts {
let header = block.header(); let header = block.header();
let author = header.author(); let author = header.author();
match *step {
Step::Commit => {
// Commit the block using a complete signature set.
let round = self.round.load(AtomicOrdering::SeqCst);
if let Some((proposer, votes)) = self.votes.seal_signatures(header.number() as Height, round, Some(block_hash(header))).split_first() {
if votes.len() + 1 > self.threshold() {
return Some(vec![
::rlp::encode(&round).to_vec(),
::rlp::encode(proposer).to_vec(),
::rlp::encode(&votes).to_vec()
]);
}
}
},
Step::Propose if self.is_proposer(author) =>
// Seal block with propose signature.
if let Ok(signature) = ap.sign(*author, None, block_hash(header)) { if let Ok(signature) = ap.sign(*author, None, block_hash(header)) {
return Some(vec![ Some(vec![
::rlp::encode(&self.round.load(AtomicOrdering::SeqCst)).to_vec(), ::rlp::encode(&self.round.load(AtomicOrdering::SeqCst)).to_vec(),
::rlp::encode(&H520::from(signature)).to_vec(), ::rlp::encode(&H520::from(signature)).to_vec(),
Vec::new() Vec::new()
]) ])
} else { } else {
trace!(target: "poa", "generate_seal: FAIL: accounts secret key unavailable"); warn!(target: "poa", "generate_seal: FAIL: accounts secret key unavailable");
},
_ => {},
}
} else {
trace!(target: "poa", "generate_seal: FAIL: accounts not provided");
}
None None
} }
} else {
warn!(target: "poa", "generate_seal: FAIL: accounts not provided");
None
}
}
fn handle_message(&self, rlp: UntrustedRlp) -> Result<Bytes, Error> { fn handle_message(&self, rlp: UntrustedRlp) -> Result<(), Error> {
let message: ConsensusMessage = try!(rlp.as_val()); let message: ConsensusMessage = try!(rlp.as_val());
let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3()))); let sender = public_to_address(&try!(recover(&message.signature.into(), &try!(rlp.at(1)).as_raw().sha3())));
// TODO: Do not admit old messages. // TODO: Do not admit old messages.
@ -288,10 +294,8 @@ impl Engine for Tendermint {
try!(Err(BlockError::InvalidSeal)); try!(Err(BlockError::InvalidSeal));
} }
self.votes.vote(message.clone(), sender); // Check if the message is known and should be handled right now.
if self.votes.vote(message.clone(), sender).is_none() && self.is_current(&message) {
// Check if the message should be handled right now.
if self.is_current(&message) {
let next_step = match *self.step.read() { let next_step = match *self.step.read() {
Step::Precommit if self.has_enough_aligned_votes(&message) => { Step::Precommit if self.has_enough_aligned_votes(&message) => {
if message.block_hash.is_none() { if message.block_hash.is_none() {
@ -319,8 +323,7 @@ impl Engine for Tendermint {
} }
} }
} }
Ok(())
Err(BlockError::InvalidSeal.into())
} }
fn verify_block_basic(&self, header: &Header, _block: Option<&[u8]>) -> Result<(), Error> { fn verify_block_basic(&self, header: &Header, _block: Option<&[u8]>) -> Result<(), Error> {

View File

@ -32,11 +32,11 @@ impl VoteCollector {
VoteCollector { votes: RwLock::new(BTreeMap::new()) } VoteCollector { votes: RwLock::new(BTreeMap::new()) }
} }
pub fn vote(&self, message: ConsensusMessage, voter: Address) { pub fn vote(&self, message: ConsensusMessage, voter: Address) -> Option<Address> {
self.votes.write().insert(message, voter); self.votes.write().insert(message, voter)
} }
pub fn seal_signatures(&self, height: Height, round: Round, block_hash: Option<H256>) -> Vec<H520> { pub fn seal_signatures(&self, height: Height, round: Round, block_hash: Option<H256>) -> (H520, Vec<H520>) {
self.votes self.votes
.read() .read()
.keys() .keys()
@ -44,6 +44,7 @@ impl VoteCollector {
.filter(|m| m.is_aligned(height, round, block_hash) && m.step != Step::Prevote) .filter(|m| m.is_aligned(height, round, block_hash) && m.step != Step::Prevote)
.map(|m| m.signature) .map(|m| m.signature)
.collect() .collect()
.split_first()
} }
pub fn aligned_signatures(&self, message: &ConsensusMessage) -> Vec<H520> { pub fn aligned_signatures(&self, message: &ConsensusMessage) -> Vec<H520> {