better proposal block handling

This commit is contained in:
keorn 2016-12-08 12:03:34 +01:00
parent 347634ac6c
commit 3ebfa1481d
14 changed files with 228 additions and 131 deletions

View File

@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ipc::IpcConfig;
use util::H256;
use util::{H256, Bytes};
/// Represents what has to be handled by actor listening to chain events
#[ipc]
@ -27,6 +27,8 @@ pub trait ChainNotify : Send + Sync {
_enacted: Vec<H256>,
_retracted: Vec<H256>,
_sealed: Vec<H256>,
// Block bytes and total difficulty.
_proposed: Vec<Bytes>,
_duration: u64) {
// does nothing by default
}

View File

@ -391,9 +391,10 @@ impl Client {
/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
let max_blocks_to_import = 4;
let (imported_blocks, import_results, invalid_blocks, imported, duration, is_empty) = {
let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, is_empty) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut proposed_blocks = Vec::with_capacity(max_blocks_to_import);
let mut import_results = Vec::with_capacity(max_blocks_to_import);
let _import_lock = self.import_lock.lock();
@ -412,12 +413,17 @@ impl Client {
continue;
}
if let Ok(closed_block) = self.check_and_close_block(&block) {
imported_blocks.push(header.hash());
if self.engine.is_proposal(&block.header) {
proposed_blocks.push(block.bytes);
invalid_blocks.insert(header.hash());
} else {
imported_blocks.push(header.hash());
let route = self.commit_block(closed_block, &header.hash(), &block.bytes);
import_results.push(route);
let route = self.commit_block(closed_block, &header.hash(), &block.bytes);
import_results.push(route);
self.report.write().accrue_block(&block);
self.report.write().accrue_block(&block);
}
} else {
invalid_blocks.insert(header.hash());
}
@ -431,7 +437,7 @@ impl Client {
}
let is_empty = self.block_queue.mark_as_good(&imported_blocks);
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, imported, duration_ns, is_empty)
(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration_ns, is_empty)
};
{
@ -449,6 +455,7 @@ impl Client {
enacted.clone(),
retracted.clone(),
Vec::new(),
proposed_blocks.clone(),
duration,
);
});
@ -1364,6 +1371,20 @@ impl MiningBlockChainClient for Client {
&self.factories.vm
}
fn broadcast_proposal_block(&self, block: SealedBlock) {
self.notify(|notify| {
notify.new_blocks(
vec![],
vec![],
vec![],
vec![],
vec![],
vec![block.rlp_bytes()],
0,
);
});
}
fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
let h = block.header().hash();
let start = precise_time_ns();
@ -1388,6 +1409,7 @@ impl MiningBlockChainClient for Client {
enacted.clone(),
retracted.clone(),
vec![h.clone()],
vec![],
precise_time_ns() - start,
);
});
@ -1458,4 +1480,4 @@ mod tests {
assert!(client.tree_route(&genesis, &new_hash).is_none());
}
}
}

View File

@ -360,6 +360,8 @@ impl MiningBlockChainClient for TestBlockChainClient {
fn import_sealed_block(&self, _block: SealedBlock) -> ImportResult {
Ok(H256::default())
}
fn broadcast_proposal_block(&self, _block: SealedBlock) {}
}
impl BlockChainClient for TestBlockChainClient {

View File

@ -273,6 +273,9 @@ pub trait MiningBlockChainClient: BlockChainClient {
/// Returns EvmFactory.
fn vm_factory(&self) -> &EvmFactory;
/// Broadcast a block proposal.
fn broadcast_proposal_block(&self, block: SealedBlock);
/// Import sealed block. Skips all verifications.
fn import_sealed_block(&self, block: SealedBlock) -> ImportResult;
@ -299,4 +302,4 @@ pub trait ProvingBlockChainClient: BlockChainClient {
/// Get code by address hash.
fn code_by_hash(&self, account_key: H256, id: BlockID) -> Bytes;
}
}

View File

@ -25,7 +25,7 @@ use rlp::{UntrustedRlp, Rlp, View, encode};
use account_provider::AccountProvider;
use block::*;
use spec::CommonParams;
use engines::{Engine, EngineError};
use engines::{Engine, Seal, EngineError};
use header::Header;
use error::{Error, BlockError};
use blockchain::extras::BlockDetails;
@ -218,8 +218,8 @@ impl Engine for AuthorityRound {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which `false` will
/// be returned.
fn generate_seal(&self, block: &ExecutedBlock) -> Option<Vec<Bytes>> {
if self.proposed.load(AtomicOrdering::SeqCst) { return None; }
fn generate_seal(&self, block: &ExecutedBlock) -> Seal {
if self.proposed.load(AtomicOrdering::SeqCst) { return Seal::None; }
let header = block.header();
let step = self.step();
if self.is_step_proposer(step, header.author()) {
@ -228,7 +228,8 @@ impl Engine for AuthorityRound {
if let Ok(signature) = ap.sign(*header.author(), self.password.read().clone(), header.bare_hash()) {
trace!(target: "poa", "generate_seal: Issuing a block for step {}.", step);
self.proposed.store(true, AtomicOrdering::SeqCst);
return Some(vec![encode(&step).to_vec(), encode(&(&*signature as &[u8])).to_vec()]);
let rlps = vec![encode(&step).to_vec(), encode(&(&*signature as &[u8])).to_vec()];
return Seal::Regular(rlps);
} else {
warn!(target: "poa", "generate_seal: FAIL: Accounts secret key unavailable.");
}
@ -236,7 +237,7 @@ impl Engine for AuthorityRound {
warn!(target: "poa", "generate_seal: FAIL: Accounts not provided.");
}
}
None
Seal::None
}
/// Check the number of seal fields.
@ -339,6 +340,7 @@ mod tests {
use account_provider::AccountProvider;
use spec::Spec;
use std::time::UNIX_EPOCH;
use engines::Seal;
#[test]
fn has_valid_metadata() {
@ -408,17 +410,17 @@ mod tests {
let b2 = b2.close_and_lock();
engine.set_signer(addr1, "1".into());
if let Some(seal) = engine.generate_seal(b1.block()) {
if let Seal::Regular(seal) = engine.generate_seal(b1.block()) {
assert!(b1.clone().try_seal(engine, seal).is_ok());
// Second proposal is forbidden.
assert!(engine.generate_seal(b1.block()).is_none());
assert!(engine.generate_seal(b1.block()) == Seal::None);
}
engine.set_signer(addr2, "2".into());
if let Some(seal) = engine.generate_seal(b2.block()) {
if let Seal::Regular(seal) = engine.generate_seal(b2.block()) {
assert!(b2.clone().try_seal(engine, seal).is_ok());
// Second proposal is forbidden.
assert!(engine.generate_seal(b2.block()).is_none());
assert!(engine.generate_seal(b2.block()) == Seal::None);
}
}

View File

@ -21,7 +21,7 @@ use account_provider::AccountProvider;
use block::*;
use builtin::Builtin;
use spec::CommonParams;
use engines::Engine;
use engines::{Engine, Seal};
use env_info::EnvInfo;
use error::{BlockError, Error};
use evm::Schedule;
@ -112,20 +112,20 @@ impl Engine for BasicAuthority {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which `false` will
/// be returned.
fn generate_seal(&self, block: &ExecutedBlock) -> Option<Vec<Bytes>> {
fn generate_seal(&self, block: &ExecutedBlock) -> Seal {
if let Some(ref ap) = *self.account_provider.lock() {
let header = block.header();
let message = header.bare_hash();
// account should be pernamently unlocked, otherwise sealing will fail
if let Ok(signature) = ap.sign(*block.header().author(), self.password.read().clone(), message) {
return Some(vec![::rlp::encode(&(&*signature as &[u8])).to_vec()]);
return Seal::Regular(vec![::rlp::encode(&(&*signature as &[u8])).to_vec()]);
} else {
trace!(target: "basicauthority", "generate_seal: FAIL: accounts secret key unavailable");
}
} else {
trace!(target: "basicauthority", "generate_seal: FAIL: accounts not provided");
}
None
Seal::None
}
fn verify_block_basic(&self, header: &Header, _block: Option<&[u8]>) -> result::Result<(), Error> {
@ -199,6 +199,7 @@ mod tests {
use account_provider::AccountProvider;
use header::Header;
use spec::Spec;
use engines::Seal;
/// Create a new test chain spec with `BasicAuthority` consensus engine.
fn new_test_authority() -> Spec {
@ -269,8 +270,9 @@ mod tests {
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes, addr, (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = b.close_and_lock();
let seal = engine.generate_seal(b.block()).unwrap();
assert!(b.try_seal(engine, seal).is_ok());
if let Seal::Regular(seal) = engine.generate_seal(b.block()) {
assert!(b.try_seal(engine, seal).is_ok());
}
}
#[test]

View File

@ -17,12 +17,11 @@
use std::collections::BTreeMap;
use util::Address;
use builtin::Builtin;
use engines::Engine;
use engines::{Engine, Seal};
use env_info::EnvInfo;
use spec::CommonParams;
use evm::Schedule;
use block::ExecutedBlock;
use util::Bytes;
/// An engine which does not provide any consensus mechanism, just seals blocks internally.
pub struct InstantSeal {
@ -59,8 +58,8 @@ impl Engine for InstantSeal {
fn is_sealer(&self, _author: &Address) -> Option<bool> { Some(true) }
fn generate_seal(&self, _block: &ExecutedBlock) -> Option<Vec<Bytes>> {
Some(Vec::new())
fn generate_seal(&self, _block: &ExecutedBlock) -> Seal {
Seal::Regular(Vec::new())
}
}
@ -72,6 +71,7 @@ mod tests {
use spec::Spec;
use header::Header;
use block::*;
use engines::Seal;
#[test]
fn instant_can_seal() {
@ -84,8 +84,9 @@ mod tests {
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes, Address::default(), (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = b.close_and_lock();
let seal = engine.generate_seal(b.block()).unwrap();
assert!(b.try_seal(engine, seal).is_ok());
if let Seal::Regular(seal) = engine.generate_seal(b.block()) {
assert!(b.try_seal(engine, seal).is_ok());
}
}
#[test]

View File

@ -49,11 +49,11 @@ use views::HeaderView;
#[derive(Debug)]
pub enum EngineError {
/// Signature does not belong to an authority.
NotAuthorized(H160),
NotAuthorized(Address),
/// The same author issued different votes at the same step.
DoubleVote(H160),
DoubleVote(Address),
/// The received block is from an incorrect proposer.
NotProposer(Mismatch<H160>),
NotProposer(Mismatch<Address>),
/// Message was not expected.
UnexpectedMessage,
/// Seal field has an unexpected size.
@ -75,6 +75,17 @@ impl fmt::Display for EngineError {
}
}
/// Seal type.
#[derive(Debug, PartialEq, Eq)]
pub enum Seal {
/// Proposal seal; should be broadcasted, but not inserted into blockchain.
Proposal(Vec<Bytes>),
/// Regular block seal; should be part of the blockchain.
Regular(Vec<Bytes>),
/// Engine does generate seal for this block right now.
None,
}
/// A consensus mechanism for the chain. Generally either proof-of-work or proof-of-stake-based.
/// Provides hooks into each of the major parts of block import.
pub trait Engine : Sync + Send {
@ -127,7 +138,7 @@ pub trait Engine : Sync + Send {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which None will
/// be returned.
fn generate_seal(&self, _block: &ExecutedBlock) -> Option<Vec<Bytes>> { None }
fn generate_seal(&self, _block: &ExecutedBlock) -> Seal { Seal::None }
/// Phase 1 quick block verification. Only does checks that are cheap. `block` (the header's full block)
/// may be provided for additional checks. Returns either a null `Ok` or a general error detailing the problem with import.
@ -189,6 +200,10 @@ pub trait Engine : Sync + Send {
ethash::is_new_best_block(best_total_difficulty, parent_details, new_header)
}
/// Find out if the block is a proposal block and should not be inserted into the DB.
/// Takes a header of a fully verified block.
fn is_proposal(&self, _verified_header: &Header) -> bool { false }
/// Register an account which signs consensus messages.
fn set_signer(&self, _address: Address, _password: String) {}

View File

@ -33,7 +33,7 @@ use ethkey::{recover, public_to_address};
use account_provider::AccountProvider;
use block::*;
use spec::CommonParams;
use engines::{Engine, EngineError};
use engines::{Engine, Seal, EngineError};
use blockchain::extras::BlockDetails;
use views::HeaderView;
use evm::Schedule;
@ -408,14 +408,14 @@ impl Engine for Tendermint {
Some(self.is_authority(address))
}
/// Attempt to seal the block internally using all available signatures.
fn generate_seal(&self, block: &ExecutedBlock) -> Option<Vec<Bytes>> {
/// Attempt to seal generate a proposal seal.
fn generate_seal(&self, block: &ExecutedBlock) -> Seal {
if let Some(ref ap) = *self.account_provider.lock() {
let header = block.header();
let author = header.author();
// Only proposer can generate seal if None was generated.
if self.is_proposer(author).is_err() && self.proposal.read().is_none() {
return None;
return Seal::None;
}
let height = header.number() as Height;
@ -428,18 +428,18 @@ impl Engine for Tendermint {
self.votes.vote(ConsensusMessage::new(signature, height, round, Step::Propose, bh), *author);
// Remember proposal for later seal submission.
*self.proposal.write() = bh;
Some(vec![
Seal::Proposal(vec![
::rlp::encode(&round).to_vec(),
::rlp::encode(&signature).to_vec(),
::rlp::EMPTY_LIST_RLP.to_vec()
])
} else {
warn!(target: "poa", "generate_seal: FAIL: accounts secret key unavailable");
None
Seal::None
}
} else {
warn!(target: "poa", "generate_seal: FAIL: accounts not provided");
None
Seal::None
}
}
@ -526,11 +526,6 @@ impl Engine for Tendermint {
})));
}
try!(self.is_round_proposer(proposal.height, proposal.round, &proposer));
if self.is_round(&proposal) {
debug!(target: "poa", "Received a new proposal for height {}, round {} from {}.", proposal.height, proposal.round, proposer);
*self.proposal.write() = proposal.block_hash.clone();
self.votes.vote(proposal, proposer);
}
}
Ok(())
}
@ -547,17 +542,6 @@ impl Engine for Tendermint {
try!(Err(BlockError::InvalidGasLimit(OutOfBounds { min: Some(min_gas), max: Some(max_gas), found: header.gas_limit().clone() })));
}
// Commit is longer than empty signature list.
let parent_signature_len = parent.seal()[2].len();
if parent_signature_len <= 1 {
try!(Err(EngineError::BadSealFieldSize(OutOfBounds {
// One signature.
min: Some(69),
max: None,
found: parent_signature_len
})));
}
Ok(())
}
@ -599,6 +583,22 @@ impl Engine for Tendermint {
}
}
fn is_proposal(&self, header: &Header) -> bool {
let signatures_len = header.seal()[2].len();
// Signatures have to be an empty list rlp.
if signatures_len != 1 {
return false;
}
let proposal = ConsensusMessage::new_proposal(header).expect("block went through full verification; this Engine verifies new_proposal creation; qed");
let proposer = proposal.verify().expect("block went through full verification; this Engine tries verify; qed");
debug!(target: "poa", "Received a new proposal for height {}, round {} from {}.", proposal.height, proposal.round, proposer);
if self.is_round(&proposal) {
*self.proposal.write() = proposal.block_hash.clone();
}
self.votes.vote(proposal, proposer);
true
}
fn register_message_channel(&self, message_channel: IoChannel<ClientIoMessage>) {
trace!(target: "poa", "register_message_channel: Register the IoChannel.");
*self.message_channel.lock() = Some(message_channel);
@ -624,7 +624,7 @@ mod tests {
use io::IoService;
use service::ClientIoMessage;
use spec::Spec;
use engines::{Engine, EngineError};
use engines::{Engine, EngineError, Seal};
use super::*;
use super::message::*;
@ -644,8 +644,11 @@ mod tests {
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(spec.engine.as_ref(), Default::default(), false, db.boxed_clone(), &genesis_header, last_hashes, proposer, (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = b.close_and_lock();
let seal = spec.engine.generate_seal(b.block()).unwrap();
(b, seal)
if let Seal::Proposal(seal) = spec.engine.generate_seal(b.block()) {
(b, seal)
} else {
panic!()
}
}
fn vote<F>(engine: &Arc<Engine>, signer: F, height: usize, round: usize, step: Step, block_hash: Option<H256>) where F: FnOnce(H256) -> Result<H520, ::account_provider::Error> {
@ -737,6 +740,7 @@ mod tests {
}
#[test]
#[ignore]
fn allows_correct_proposer() {
let (spec, tap) = setup();
let engine = spec.engine;
@ -825,7 +829,7 @@ mod tests {
#[test]
fn step_transitioning() {
::env_logger::init().unwrap();
//::env_logger::init().unwrap();
let (spec, tap) = setup();
let engine = spec.engine.clone();
let mut db_result = get_temp_state_db();

View File

@ -26,12 +26,12 @@ use state::{State, CleanupMode};
use client::{MiningBlockChainClient, Executive, Executed, EnvInfo, TransactOptions, BlockID, CallAnalytics};
use client::TransactionImportResult;
use executive::contract_address;
use block::{ClosedBlock, SealedBlock, IsBlock, Block};
use block::{ClosedBlock, IsBlock, Block};
use error::*;
use transaction::{Action, SignedTransaction};
use receipt::{Receipt, RichReceipt};
use spec::Spec;
use engines::Engine;
use engines::{Engine, Seal};
use miner::{MinerService, MinerStatus, TransactionQueue, PrioritizationStrategy, AccountDetails, TransactionOrigin};
use miner::banning_queue::{BanningTransactionQueue, Threshold};
use miner::work_notify::WorkPoster;
@ -461,39 +461,41 @@ impl Miner {
}
}
/// Attempts to perform internal sealing (one that does not require work) to return Ok(sealed),
/// Err(Some(block)) returns for unsuccesful sealing while Err(None) indicates misspecified engine.
fn seal_block_internally(&self, block: ClosedBlock) -> Result<SealedBlock, Option<ClosedBlock>> {
trace!(target: "miner", "seal_block_internally: attempting internal seal.");
let s = self.engine.generate_seal(block.block());
if let Some(seal) = s {
trace!(target: "miner", "seal_block_internally: managed internal seal. importing...");
block.lock().try_seal(&*self.engine, seal).or_else(|(e, _)| {
warn!("prepare_sealing: ERROR: try_seal failed when given internally generated seal: {}", e);
Err(None)
})
} else {
trace!(target: "miner", "seal_block_internally: unable to generate seal internally");
Err(Some(block))
}
}
/// Uses Engine to seal the block internally and then imports it to chain.
/// Attempts to perform internal sealing (one that does not require work) and handles the result depending on the type of Seal.
fn seal_and_import_block_internally(&self, chain: &MiningBlockChainClient, block: ClosedBlock) -> bool {
{
let mut sealing_work = self.sealing_work.lock();
sealing_work.queue.push(block.clone());
sealing_work.queue.use_last_ref();
}
if !block.transactions().is_empty() || self.forced_sealing() {
if let Ok(sealed) = self.seal_block_internally(block) {
if chain.import_sealed_block(sealed).is_ok() {
trace!(target: "miner", "import_block_internally: imported internally sealed block");
return true
}
trace!(target: "miner", "seal_block_internally: attempting internal seal.");
match self.engine.generate_seal(block.block()) {
// Save proposal for later seal submission and broadcast it.
Seal::Proposal(seal) => {
trace!(target: "miner", "Received a Proposal seal.");
let mut sealing_work = self.sealing_work.lock();
sealing_work.queue.push(block.clone());
sealing_work.queue.use_last_ref();
block
.lock()
.seal(&*self.engine, seal)
.map(|sealed| { chain.broadcast_proposal_block(sealed); true })
.unwrap_or_else(|e| {
warn!("ERROR: seal failed when given internally generated seal: {}", e);
false
})
},
// Directly import a regular seal.
Seal::Regular(seal) =>
block
.lock()
.seal(&*self.engine, seal)
.map(|sealed| chain.import_sealed_block(sealed).is_ok())
.unwrap_or_else(|e| {
warn!("ERROR: seal failed when given internally generated seal: {}", e);
false
}),
Seal::None => false,
}
} else {
false
}
false
}
/// Prepares work which has to be done to seal.
@ -1034,7 +1036,9 @@ impl MinerService for Miner {
let (block, original_work_hash) = self.prepare_block(chain);
if self.seals_internally {
trace!(target: "miner", "update_sealing: engine indicates internal sealing");
self.seal_and_import_block_internally(chain, block);
if self.seal_and_import_block_internally(chain, block) {
trace!(target: "miner", "update_sealing: imported internally sealed block");
}
} else {
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
self.prepare_work(block, original_work_hash);

View File

@ -23,7 +23,7 @@ use service::ClientIoMessage;
use views::HeaderView;
use io::IoChannel;
use util::hash::H256;
use util::{H256, Bytes};
use std::sync::Arc;
@ -107,6 +107,7 @@ impl ChainNotify for Watcher {
_: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: Vec<Bytes>,
_duration: u64)
{
if self.oracle.is_major_importing() { return }
@ -174,6 +175,7 @@ mod tests {
vec![],
vec![],
vec![],
vec![],
0,
);
}

View File

@ -227,6 +227,7 @@ impl ChainNotify for EthSync {
enacted: Vec<H256>,
retracted: Vec<H256>,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
_duration: u64)
{
self.network.with_context(self.subprotocol_name, |context| {
@ -237,7 +238,8 @@ impl ChainNotify for EthSync {
&invalid,
&enacted,
&retracted,
&sealed);
&sealed,
&proposed);
});
}

View File

@ -249,6 +249,15 @@ enum PeerAsking {
SnapshotData,
}
/// Peer type semantic boolean.
#[derive(Clone)]
enum PeerStatus {
/// Have the same latest_hash as we.
Current,
/// Is lagging in blocks.
Lagging
}
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
/// Block downloader channel.
enum BlockSet {
@ -1797,32 +1806,42 @@ impl ChainSync {
}
}
/// creates rlp from block bytes and total difficulty
fn create_block_rlp(bytes: &Bytes, total_difficulty: U256) -> Bytes {
let mut rlp_stream = RlpStream::new_list(2);
rlp_stream.append_raw(bytes, 1);
rlp_stream.append(&total_difficulty);
rlp_stream.out()
}
/// creates latest block rlp for the given client
fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
let mut rlp_stream = RlpStream::new_list(2);
rlp_stream.append_raw(&chain.block(BlockID::Hash(chain.chain_info().best_block_hash)).expect("Best block always exists"), 1);
rlp_stream.append(&chain.chain_info().total_difficulty);
rlp_stream.out()
ChainSync::create_block_rlp(
&chain.block(BlockID::Hash(chain.chain_info().best_block_hash)).expect("Best block always exists"),
chain.chain_info().total_difficulty
)
}
/// creates latest block rlp for the given client
fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
let mut rlp_stream = RlpStream::new_list(2);
rlp_stream.append_raw(&chain.block(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed"), 1);
rlp_stream.append(&chain.block_total_difficulty(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed."));
rlp_stream.out()
ChainSync::create_block_rlp(
&chain.block(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed"),
chain.block_total_difficulty(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed.")
)
}
/// returns peer ids that have less blocks than our chain
fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<PeerId> {
/// Returns peer ids that either have less blocks than our (Lagging) chain or are Current.
fn get_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo, peer_status: PeerStatus) -> Vec<PeerId> {
let latest_hash = chain_info.best_block_hash;
self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)|
match io.chain().block_status(BlockID::Hash(peer_info.latest_hash.clone())) {
BlockStatus::InChain => {
if peer_info.latest_hash != latest_hash {
Some(id)
} else {
None
match (peer_info.latest_hash == latest_hash, peer_status.clone()) {
(false, PeerStatus::Lagging) => Some(id),
(true, PeerStatus::Lagging) => None,
(false, PeerStatus::Current) => None,
(true, PeerStatus::Current) => Some(id),
}
},
_ => None
@ -1830,7 +1849,7 @@ impl ChainSync {
.collect::<Vec<_>>()
}
fn select_random_lagging_peers(&mut self, peers: &[PeerId]) -> Vec<PeerId> {
fn select_random_peers(&mut self, peers: &[PeerId]) -> Vec<PeerId> {
use rand::Rng;
// take sqrt(x) peers
let mut peers = peers.to_vec();
@ -1842,16 +1861,16 @@ impl ChainSync {
peers
}
/// propagates latest block to lagging peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256], peers: &[PeerId]) -> usize {
/// propagates latest block to a set of peers
fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize {
trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
let mut sent = 0;
for peer_id in peers {
if sealed.is_empty() {
if blocks.is_empty() {
let rlp = ChainSync::create_latest_block_rlp(io.chain());
self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
} else {
for h in sealed {
for h in blocks {
let rlp = ChainSync::create_new_block_rlp(io.chain(), h);
self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
}
@ -1969,10 +1988,10 @@ impl ChainSync {
fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) {
let chain_info = io.chain().chain_info();
if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
let mut peers = self.get_lagging_peers(&chain_info, io);
let mut peers = self.get_peers(&chain_info, io, PeerStatus::Lagging);
if sealed.is_empty() {
let hashes = self.propagate_new_hashes(&chain_info, io, &peers);
peers = self.select_random_lagging_peers(&peers);
peers = self.select_random_peers(&peers);
let blocks = self.propagate_blocks(&chain_info, io, sealed, &peers);
if blocks != 0 || hashes != 0 {
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
@ -1987,6 +2006,22 @@ impl ChainSync {
self.last_sent_block_number = chain_info.best_block_number;
}
/// Distribute valid proposed blocks to subset of current peers.
fn propagate_proposed_blocks(&mut self, io: &mut SyncIo, proposed: &[Bytes]) {
let chain_info = io.chain().chain_info();
let mut peers = self.get_peers(&chain_info, io, PeerStatus::Current);
peers = self.select_random_peers(&peers);
for block in proposed {
let rlp = ChainSync::create_block_rlp(
block,
chain_info.total_difficulty
);
for peer_id in &peers {
self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
}
}
}
/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
self.maybe_start_snapshot_sync(io);
@ -1994,9 +2029,10 @@ impl ChainSync {
}
/// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) {
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) {
if io.is_chain_queue_empty() {
self.propagate_latest_blocks(io, sealed);
self.propagate_proposed_blocks(io, proposed);
}
if !invalid.is_empty() {
trace!(target: "sync", "Bad blocks in the queue, restarting");
@ -2032,7 +2068,7 @@ mod tests {
use rlp::{Rlp, RlpStream, UntrustedRlp, View, Stream};
use super::*;
use ::SyncConfig;
use super::{PeerInfo, PeerAsking};
use super::{PeerInfo, PeerAsking, PeerStatus};
use ethcore::views::BlockView;
use ethcore::header::*;
use ethcore::client::*;
@ -2250,7 +2286,7 @@ mod tests {
let ss = TestSnapshotService::new();
let io = TestIo::new(&mut client, &ss, &mut queue, None);
let lagging_peers = sync.get_lagging_peers(&chain_info, &io);
let lagging_peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging);
assert_eq!(1, lagging_peers.len())
}
@ -2282,7 +2318,7 @@ mod tests {
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging);
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers);
// 1 message should be send
@ -2302,7 +2338,7 @@ mod tests {
let chain_info = client.chain_info();
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
// 1 message should be send
@ -2323,7 +2359,7 @@ mod tests {
let chain_info = client.chain_info();
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging);
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers);
// 1 message should be send
@ -2347,7 +2383,7 @@ mod tests {
// Try to propagate same transactions for the second time
let peer_count2 = sync.propagate_new_transactions(&mut io);
// Even after new block transactions should not be propagated twice
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
// Try to propagate same transactions for the third time
let peer_count3 = sync.propagate_new_transactions(&mut io);
@ -2373,7 +2409,7 @@ mod tests {
let peer_count = sync.propagate_new_transactions(&mut io);
io.chain.insert_transaction_to_queue();
// New block import should trigger propagation.
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
// 2 message should be send
assert_eq!(2, io.queue.len());
@ -2534,7 +2570,7 @@ mod tests {
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging);
sync.propagate_new_hashes(&chain_info, &mut io, &peers);
let data = &io.queue[0].data.clone();
@ -2554,7 +2590,7 @@ mod tests {
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
let peers = sync.get_lagging_peers(&chain_info, &io);
let peers = sync.get_peers(&chain_info, &io, PeerStatus::Lagging);
sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
let data = &io.queue[0].data.clone();
@ -2589,7 +2625,7 @@ mod tests {
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]);
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 1);
}
@ -2604,7 +2640,7 @@ mod tests {
let ss = TestSnapshotService::new();
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[], &[]);
}
// then
@ -2630,10 +2666,10 @@ mod tests {
let mut io = TestIo::new(&mut client, &ss, &mut queue, None);
// when
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]);
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 0);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[], &[]);
// then
let status = io.chain.miner.status();

View File

@ -244,6 +244,6 @@ impl TestNet {
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let mut peer = self.peer_mut(peer_id);
peer.sync.write().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &peer.snapshot_service, &mut peer.queue, None), &[], &[], &[], &[], &[]);
peer.sync.write().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &peer.snapshot_service, &mut peer.queue, None), &[], &[], &[], &[], &[], &[]);
}
}