Merge branch 'master' into lightsync
This commit is contained in:
@@ -1,9 +0,0 @@
|
||||
if ! type kcov > /dev/null; then
|
||||
echo "Install kcov first (details inside this file). Aborting."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
cargo test --no-run || exit $?
|
||||
mkdir -p target/coverage
|
||||
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1,sync/src/tests --include-pattern sync/src --verify target/coverage target/debug/ethsync*
|
||||
xdg-open target/coverage/index.html
|
||||
@@ -185,7 +185,7 @@ impl EthSync {
|
||||
};
|
||||
|
||||
let chain_sync = ChainSync::new(params.config, &*params.chain);
|
||||
let service = try!(NetworkService::new(try!(params.network_config.clone().into_basic())));
|
||||
let service = NetworkService::new(params.network_config.clone().into_basic()?)?;
|
||||
|
||||
let sync = Arc::new(EthSync {
|
||||
network: service,
|
||||
@@ -509,8 +509,8 @@ impl NetworkConfiguration {
|
||||
Ok(BasicNetworkConfiguration {
|
||||
config_path: self.config_path,
|
||||
net_config_path: self.net_config_path,
|
||||
listen_address: match self.listen_address { None => None, Some(addr) => Some(try!(SocketAddr::from_str(&addr))) },
|
||||
public_address: match self.public_address { None => None, Some(addr) => Some(try!(SocketAddr::from_str(&addr))) },
|
||||
listen_address: match self.listen_address { None => None, Some(addr) => Some(SocketAddr::from_str(&addr)?) },
|
||||
public_address: match self.public_address { None => None, Some(addr) => Some(SocketAddr::from_str(&addr)?) },
|
||||
udp_port: self.udp_port,
|
||||
nat_enabled: self.nat_enabled,
|
||||
discovery_enabled: self.discovery_enabled,
|
||||
|
||||
@@ -32,9 +32,8 @@ const MAX_HEADERS_TO_REQUEST: usize = 128;
|
||||
const MAX_BODIES_TO_REQUEST: usize = 64;
|
||||
const MAX_RECEPITS_TO_REQUEST: usize = 128;
|
||||
const SUBCHAIN_SIZE: u64 = 256;
|
||||
const MAX_ROUND_PARENTS: usize = 32;
|
||||
const MAX_ROUND_PARENTS: usize = 16;
|
||||
const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5;
|
||||
const MAX_REORG_BLOCKS: u64 = 20;
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||
/// Downloader state
|
||||
@@ -95,27 +94,38 @@ pub struct BlockDownloader {
|
||||
last_imported_hash: H256,
|
||||
/// Number of blocks imported this round
|
||||
imported_this_round: Option<usize>,
|
||||
/// Block number the last round started with.
|
||||
last_round_start: BlockNumber,
|
||||
last_round_start_hash: H256,
|
||||
/// Block parents imported this round (hash, parent)
|
||||
round_parents: VecDeque<(H256, H256)>,
|
||||
/// Do we need to download block recetips.
|
||||
download_receipts: bool,
|
||||
/// Sync up to the block with this hash.
|
||||
target_hash: Option<H256>,
|
||||
/// Reorganize up to this many blocks. Up to genesis if `None`,
|
||||
max_reorg_blocks: Option<BlockNumber>,
|
||||
/// Probing range for seeking common best block.
|
||||
retract_step: u64,
|
||||
}
|
||||
|
||||
impl BlockDownloader {
|
||||
/// Create a new instance of syncing strategy.
|
||||
pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> BlockDownloader {
|
||||
pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber, max_reorg: Option<BlockNumber>) -> BlockDownloader {
|
||||
BlockDownloader {
|
||||
state: State::Idle,
|
||||
highest_block: None,
|
||||
last_imported_block: start_number,
|
||||
last_imported_hash: start_hash.clone(),
|
||||
last_round_start: start_number,
|
||||
last_round_start_hash: start_hash.clone(),
|
||||
blocks: BlockCollection::new(sync_receipts),
|
||||
imported_this_round: None,
|
||||
round_parents: VecDeque::new(),
|
||||
download_receipts: sync_receipts,
|
||||
target_hash: None,
|
||||
max_reorg_blocks: max_reorg,
|
||||
retract_step: 1,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,9 +137,12 @@ impl BlockDownloader {
|
||||
|
||||
/// Mark a block as known in the chain
|
||||
pub fn mark_as_known(&mut self, hash: &H256, number: BlockNumber) {
|
||||
if number == self.last_imported_block + 1 {
|
||||
if number >= self.last_imported_block + 1 {
|
||||
self.last_imported_block = number;
|
||||
self.last_imported_hash = hash.clone();
|
||||
self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + 1);
|
||||
self.last_round_start = number;
|
||||
self.last_round_start_hash = hash.clone();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,12 +161,6 @@ impl BlockDownloader {
|
||||
self.target_hash = Some(hash.clone());
|
||||
}
|
||||
|
||||
/// Set starting sync block
|
||||
pub fn _set_start(&mut self, hash: &H256, number: BlockNumber) {
|
||||
self.last_imported_hash = hash.clone();
|
||||
self.last_imported_block = number;
|
||||
}
|
||||
|
||||
/// Unmark header as being downloaded.
|
||||
pub fn clear_header_download(&mut self, hash: &H256) {
|
||||
self.blocks.clear_header_download(hash)
|
||||
@@ -172,6 +179,7 @@ impl BlockDownloader {
|
||||
pub fn reset_to(&mut self, hashes: Vec<H256>) {
|
||||
self.reset();
|
||||
self.blocks.reset_to(hashes);
|
||||
self.state = State::Blocks;
|
||||
}
|
||||
|
||||
/// Returns used heap memory size.
|
||||
@@ -200,10 +208,10 @@ impl BlockDownloader {
|
||||
let mut valid_response = item_count == 0; //empty response is valid
|
||||
let mut any_known = false;
|
||||
for i in 0..item_count {
|
||||
let info: BlockHeader = try!(r.val_at(i).map_err(|e| {
|
||||
let info: BlockHeader = r.val_at(i).map_err(|e| {
|
||||
trace!(target: "sync", "Error decoding block header RLP: {:?}", e);
|
||||
BlockDownloaderImportError::Invalid
|
||||
}));
|
||||
})?;
|
||||
let number = BlockNumber::from(info.number());
|
||||
// Check if any of the headers matches the hash we requested
|
||||
if !valid_response {
|
||||
@@ -221,10 +229,10 @@ impl BlockDownloader {
|
||||
self.highest_block = Some(number);
|
||||
}
|
||||
let hash = info.hash();
|
||||
let hdr = try!(r.at(i).map_err(|e| {
|
||||
let hdr = r.at(i).map_err(|e| {
|
||||
trace!(target: "sync", "Error decoding block header RLP: {:?}", e);
|
||||
BlockDownloaderImportError::Invalid
|
||||
}));
|
||||
})?;
|
||||
match io.chain().block_status(BlockId::Hash(hash.clone())) {
|
||||
BlockStatus::InChain | BlockStatus::Queued => {
|
||||
match self.state {
|
||||
@@ -260,7 +268,7 @@ impl BlockDownloader {
|
||||
return Ok(DownloadAction::Reset);
|
||||
} else {
|
||||
let best = io.chain().chain_info().best_block_number;
|
||||
if best > self.last_imported_block && best - self.last_imported_block > MAX_REORG_BLOCKS {
|
||||
if best > self.last_imported_block && (self.last_imported_block == 0 || best - self.last_imported_block > self.max_reorg_blocks.unwrap_or(u64::max_value())) {
|
||||
trace!(target: "sync", "No common block, disabling peer");
|
||||
return Err(BlockDownloaderImportError::Invalid);
|
||||
}
|
||||
@@ -294,10 +302,10 @@ impl BlockDownloader {
|
||||
else {
|
||||
let mut bodies = Vec::with_capacity(item_count);
|
||||
for i in 0..item_count {
|
||||
let body = try!(r.at(i).map_err(|e| {
|
||||
let body = r.at(i).map_err(|e| {
|
||||
trace!(target: "sync", "Error decoding block boides RLP: {:?}", e);
|
||||
BlockDownloaderImportError::Invalid
|
||||
}));
|
||||
})?;
|
||||
bodies.push(body.as_raw().to_vec());
|
||||
}
|
||||
if self.blocks.insert_bodies(bodies) != item_count {
|
||||
@@ -320,10 +328,10 @@ impl BlockDownloader {
|
||||
else {
|
||||
let mut receipts = Vec::with_capacity(item_count);
|
||||
for i in 0..item_count {
|
||||
let receipt = try!(r.at(i).map_err(|e| {
|
||||
let receipt = r.at(i).map_err(|e| {
|
||||
trace!(target: "sync", "Error decoding block receipts RLP: {:?}", e);
|
||||
BlockDownloaderImportError::Invalid
|
||||
}));
|
||||
})?;
|
||||
receipts.push(receipt.as_raw().to_vec());
|
||||
}
|
||||
if self.blocks.insert_receipts(receipts) != item_count {
|
||||
@@ -336,39 +344,47 @@ impl BlockDownloader {
|
||||
|
||||
fn start_sync_round(&mut self, io: &mut SyncIo) {
|
||||
self.state = State::ChainHead;
|
||||
trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
|
||||
trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block);
|
||||
// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
|
||||
// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
|
||||
let start = self.last_round_start;
|
||||
let start_hash = self.last_round_start_hash;
|
||||
match self.imported_this_round {
|
||||
Some(n) if n == 0 && self.last_imported_block > 0 => {
|
||||
Some(n) if n == 0 && start > 0 => {
|
||||
// nothing was imported last round, step back to a previous block
|
||||
// search parent in last round known parents first
|
||||
if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) {
|
||||
self.last_imported_block -= 1;
|
||||
if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) {
|
||||
self.last_imported_block = start - 1;
|
||||
self.last_imported_hash = p.clone();
|
||||
trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
|
||||
} else {
|
||||
let best = io.chain().chain_info().best_block_number;
|
||||
if best > self.last_imported_block && best - self.last_imported_block > MAX_REORG_BLOCKS {
|
||||
debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
|
||||
if best > start && (start == 0 || best - start > self.max_reorg_blocks.unwrap_or(u64::max_value())) {
|
||||
debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
|
||||
self.reset();
|
||||
} else {
|
||||
match io.chain().block_hash(BlockId::Number(self.last_imported_block - 1)) {
|
||||
let n = start - min(self.retract_step, start);
|
||||
self.retract_step *= 2;
|
||||
match io.chain().block_hash(BlockId::Number(n)) {
|
||||
Some(h) => {
|
||||
self.last_imported_block -= 1;
|
||||
self.last_imported_block = n;
|
||||
self.last_imported_hash = h;
|
||||
trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash);
|
||||
trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
|
||||
}
|
||||
None => {
|
||||
debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
|
||||
debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
|
||||
self.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => (),
|
||||
_ => {
|
||||
self.retract_step = 1;
|
||||
},
|
||||
}
|
||||
self.last_round_start = self.last_imported_block;
|
||||
self.last_round_start_hash = self.last_imported_hash;
|
||||
self.imported_this_round = None;
|
||||
}
|
||||
|
||||
@@ -474,6 +490,9 @@ impl BlockDownloader {
|
||||
self.block_imported(&h, number, &parent);
|
||||
},
|
||||
Err(BlockImportError::Block(BlockError::UnknownParent(_))) if allow_out_of_order => {
|
||||
break;
|
||||
},
|
||||
Err(BlockImportError::Block(BlockError::UnknownParent(_))) => {
|
||||
trace!(target: "sync", "Unknown new block parent, restarting sync");
|
||||
break;
|
||||
},
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
use util::*;
|
||||
use rlp::*;
|
||||
use network::NetworkError;
|
||||
use ethcore::header::{ Header as BlockHeader};
|
||||
use ethcore::header::Header as BlockHeader;
|
||||
|
||||
known_heap_size!(0, HeaderId);
|
||||
|
||||
@@ -329,9 +329,9 @@ impl BlockCollection {
|
||||
fn insert_body(&mut self, b: Bytes) -> Result<(), NetworkError> {
|
||||
let header_id = {
|
||||
let body = UntrustedRlp::new(&b);
|
||||
let tx = try!(body.at(0));
|
||||
let tx = body.at(0)?;
|
||||
let tx_root = ordered_trie_root(tx.iter().map(|r| r.as_raw().to_vec())); //TODO: get rid of vectors here
|
||||
let uncles = try!(body.at(1)).as_raw().sha3();
|
||||
let uncles = body.at(1)?.as_raw().sha3();
|
||||
HeaderId {
|
||||
transactions_root: tx_root,
|
||||
uncles: uncles
|
||||
@@ -390,7 +390,7 @@ impl BlockCollection {
|
||||
}
|
||||
|
||||
fn insert_header(&mut self, header: Bytes) -> Result<H256, UtilError> {
|
||||
let info: BlockHeader = try!(UntrustedRlp::new(&header).as_val());
|
||||
let info: BlockHeader = UntrustedRlp::new(&header).as_val()?;
|
||||
let hash = info.hash();
|
||||
if self.blocks.contains_key(&hash) {
|
||||
return Ok(hash);
|
||||
@@ -511,7 +511,9 @@ mod test {
|
||||
let client = TestBlockChainClient::new();
|
||||
let nblocks = 200;
|
||||
client.add_blocks(nblocks, EachBlockWith::Nothing);
|
||||
let blocks: Vec<_> = (0 .. nblocks).map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap()).collect();
|
||||
let blocks: Vec<_> = (0..nblocks)
|
||||
.map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap().into_inner())
|
||||
.collect();
|
||||
let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect();
|
||||
let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
|
||||
let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect();
|
||||
@@ -564,7 +566,9 @@ mod test {
|
||||
let client = TestBlockChainClient::new();
|
||||
let nblocks = 200;
|
||||
client.add_blocks(nblocks, EachBlockWith::Nothing);
|
||||
let blocks: Vec<_> = (0 .. nblocks).map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap()).collect();
|
||||
let blocks: Vec<_> = (0..nblocks)
|
||||
.map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap().into_inner())
|
||||
.collect();
|
||||
let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect();
|
||||
let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
|
||||
let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect();
|
||||
@@ -586,7 +590,9 @@ mod test {
|
||||
let client = TestBlockChainClient::new();
|
||||
let nblocks = 200;
|
||||
client.add_blocks(nblocks, EachBlockWith::Nothing);
|
||||
let blocks: Vec<_> = (0 .. nblocks).map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap()).collect();
|
||||
let blocks: Vec<_> = (0..nblocks)
|
||||
.map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap().into_inner())
|
||||
.collect();
|
||||
let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect();
|
||||
let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
|
||||
let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect();
|
||||
|
||||
@@ -92,7 +92,6 @@
|
||||
use util::*;
|
||||
use rlp::*;
|
||||
use network::*;
|
||||
use ethcore::views::{HeaderView};
|
||||
use ethcore::header::{BlockNumber, Header as BlockHeader};
|
||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockImportError, BlockQueueInfo};
|
||||
use ethcore::error::*;
|
||||
@@ -372,6 +371,7 @@ impl ChainSync {
|
||||
/// Create a new instance of syncing strategy.
|
||||
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
|
||||
let chain_info = chain.chain_info();
|
||||
let pruning = chain.pruning_info();
|
||||
let mut sync = ChainSync {
|
||||
state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
|
||||
starting_block: chain.chain_info().best_block_number,
|
||||
@@ -379,7 +379,7 @@ impl ChainSync {
|
||||
peers: HashMap::new(),
|
||||
handshaking_peers: HashMap::new(),
|
||||
active_peers: HashSet::new(),
|
||||
new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
|
||||
new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number, pruning.state_history_size),
|
||||
old_blocks: None,
|
||||
last_sent_block_number: 0,
|
||||
network_id: config.network_id,
|
||||
@@ -459,6 +459,7 @@ impl ChainSync {
|
||||
fn reset(&mut self, io: &mut SyncIo) {
|
||||
self.new_blocks.reset();
|
||||
self.snapshot.clear();
|
||||
let chain_info = io.chain().chain_info();
|
||||
if self.state == SyncState::SnapshotData {
|
||||
debug!(target:"sync", "Aborting snapshot restore");
|
||||
io.snapshot_service().abort_restore();
|
||||
@@ -466,6 +467,10 @@ impl ChainSync {
|
||||
for (_, ref mut p) in &mut self.peers {
|
||||
if p.block_set != Some(BlockSet::OldBlocks) {
|
||||
p.reset_asking();
|
||||
if p.difficulty.is_none() {
|
||||
// assume peer has up to date difficulty
|
||||
p.difficulty = Some(chain_info.pending_total_difficulty);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.state = SyncState::Idle;
|
||||
@@ -557,14 +562,15 @@ impl ChainSync {
|
||||
/// Update sync after the blockchain has been changed externally.
|
||||
pub fn update_targets(&mut self, chain: &BlockChainClient) {
|
||||
// Do not assume that the block queue/chain still has our last_imported_block
|
||||
let pruning = chain.pruning_info();
|
||||
let chain = chain.chain_info();
|
||||
self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
|
||||
self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number, pruning.state_history_size);
|
||||
self.old_blocks = None;
|
||||
if self.download_old_blocks {
|
||||
if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) {
|
||||
|
||||
trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number);
|
||||
let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number);
|
||||
let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number, pruning.state_history_size);
|
||||
if let Some(hash) = chain.first_block_hash {
|
||||
trace!(target: "sync", "Downloader target set to {:?}", hash);
|
||||
downloader.set_target(&hash);
|
||||
@@ -577,14 +583,14 @@ impl ChainSync {
|
||||
/// Called by peer to report status
|
||||
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
self.handshaking_peers.remove(&peer_id);
|
||||
let protocol_version: u8 = try!(r.val_at(0));
|
||||
let protocol_version: u8 = r.val_at(0)?;
|
||||
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
|
||||
let peer = PeerInfo {
|
||||
protocol_version: protocol_version,
|
||||
network_id: try!(r.val_at(1)),
|
||||
difficulty: Some(try!(r.val_at(2))),
|
||||
latest_hash: try!(r.val_at(3)),
|
||||
genesis: try!(r.val_at(4)),
|
||||
network_id: r.val_at(1)?,
|
||||
difficulty: Some(r.val_at(2)?),
|
||||
latest_hash: r.val_at(3)?,
|
||||
genesis: r.val_at(4)?,
|
||||
asking: PeerAsking::Nothing,
|
||||
asking_blocks: Vec::new(),
|
||||
asking_hash: None,
|
||||
@@ -593,8 +599,8 @@ impl ChainSync {
|
||||
expired: false,
|
||||
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
||||
asking_snapshot_data: None,
|
||||
snapshot_hash: if warp_protocol { Some(try!(r.val_at(5))) } else { None },
|
||||
snapshot_number: if warp_protocol { Some(try!(r.val_at(6))) } else { None },
|
||||
snapshot_hash: if warp_protocol { Some(r.val_at(5)?) } else { None },
|
||||
snapshot_number: if warp_protocol { Some(r.val_at(6)?) } else { None },
|
||||
block_set: None,
|
||||
};
|
||||
|
||||
@@ -655,7 +661,7 @@ impl ChainSync {
|
||||
trace!(target: "sync", "{}: Chain is too short to confirm the block", peer_id);
|
||||
peer.confirmation = ForkConfirmation::TooShort;
|
||||
} else {
|
||||
let header = try!(r.at(0)).as_raw();
|
||||
let header = r.at(0)?.as_raw();
|
||||
if header.sha3() == fork_hash {
|
||||
trace!(target: "sync", "{}: Confirmed peer", peer_id);
|
||||
peer.confirmation = ForkConfirmation::Confirmed;
|
||||
@@ -860,11 +866,17 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
let block_rlp = try!(r.at(0));
|
||||
let header_rlp = try!(block_rlp.at(0));
|
||||
let difficulty: U256 = r.val_at(1)?;
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
|
||||
peer.difficulty = Some(difficulty);
|
||||
}
|
||||
}
|
||||
let block_rlp = r.at(0)?;
|
||||
let header_rlp = block_rlp.at(0)?;
|
||||
let h = header_rlp.as_raw().sha3();
|
||||
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
|
||||
let header: BlockHeader = try!(header_rlp.as_val());
|
||||
let header: BlockHeader = header_rlp.as_val()?;
|
||||
if header.number() > self.highest_block.unwrap_or(0) {
|
||||
self.highest_block = Some(header.number());
|
||||
}
|
||||
@@ -888,6 +900,8 @@ impl ChainSync {
|
||||
trace!(target: "sync", "New block already queued {:?}", h);
|
||||
},
|
||||
Ok(_) => {
|
||||
// abort current download of the same block
|
||||
self.complete_sync(io);
|
||||
self.new_blocks.mark_as_known(&header.hash(), header.number());
|
||||
trace!(target: "sync", "New block queued {:?} ({})", h, header.number());
|
||||
},
|
||||
@@ -906,16 +920,10 @@ impl ChainSync {
|
||||
} else {
|
||||
trace!(target: "sync", "New unknown block {:?}", h);
|
||||
//TODO: handle too many unknown blocks
|
||||
let difficulty: U256 = try!(r.val_at(1));
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
|
||||
peer.difficulty = Some(difficulty);
|
||||
trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
|
||||
}
|
||||
}
|
||||
self.sync_peer(io, peer_id, true);
|
||||
}
|
||||
}
|
||||
self.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -925,22 +933,30 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
let hashes: Vec<_> = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1))).collect();
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
// Peer has new blocks with unknown difficulty
|
||||
peer.difficulty = None;
|
||||
if let Some(&(Ok(ref h), _)) = hashes.last() {
|
||||
peer.latest_hash = h.clone();
|
||||
}
|
||||
}
|
||||
if self.state != SyncState::Idle {
|
||||
trace!(target: "sync", "Ignoring new hashes since we're already downloading.");
|
||||
let max = r.iter().take(MAX_NEW_HASHES).map(|item| item.val_at::<BlockNumber>(1).unwrap_or(0)).fold(0u64, max);
|
||||
if max > self.highest_block.unwrap_or(0) {
|
||||
self.highest_block = Some(max);
|
||||
}
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count());
|
||||
let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
|
||||
let mut max_height: BlockNumber = 0;
|
||||
let mut new_hashes = Vec::new();
|
||||
let last_imported_number = self.new_blocks.last_imported_block_number();
|
||||
for (rh, rn) in hashes {
|
||||
let hash = try!(rh);
|
||||
let number = try!(rn);
|
||||
let hash = rh?;
|
||||
let number = rn?;
|
||||
if number > self.highest_block.unwrap_or(0) {
|
||||
self.highest_block = Some(number);
|
||||
}
|
||||
@@ -982,6 +998,7 @@ impl ChainSync {
|
||||
self.state = SyncState::NewBlocks;
|
||||
self.sync_peer(io, peer_id, true);
|
||||
}
|
||||
self.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -998,7 +1015,7 @@ impl ChainSync {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let manifest_rlp = try!(r.at(0));
|
||||
let manifest_rlp = r.at(0)?;
|
||||
let manifest = match ManifestData::from_rlp(manifest_rlp.as_raw()) {
|
||||
Err(e) => {
|
||||
trace!(target: "sync", "{}: Ignored bad manifest: {:?}", peer_id, e);
|
||||
@@ -1046,7 +1063,7 @@ impl ChainSync {
|
||||
},
|
||||
}
|
||||
|
||||
let snapshot_data: Bytes = try!(r.val_at(0));
|
||||
let snapshot_data: Bytes = r.val_at(0)?;
|
||||
match self.snapshot.validate_chunk(&snapshot_data) {
|
||||
Ok(ChunkType::Block(hash)) => {
|
||||
trace!(target: "sync", "{}: Processing block chunk", peer_id);
|
||||
@@ -1106,7 +1123,7 @@ impl ChainSync {
|
||||
thread_rng().shuffle(&mut peers); //TODO: sort by rating
|
||||
// prefer peers with higher protocol version
|
||||
peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2));
|
||||
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
|
||||
trace!(target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), peers.len(), self.peers.len());
|
||||
for (p, _, _) in peers {
|
||||
if self.active_peers.contains(&p) {
|
||||
self.sync_peer(io, p, false);
|
||||
@@ -1135,12 +1152,13 @@ impl ChainSync {
|
||||
/// Find something to do for a peer. Called for a new peer or when a peer is done with its task.
|
||||
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
|
||||
if !self.active_peers.contains(&peer_id) {
|
||||
trace!(target: "sync", "Skipping deactivated peer");
|
||||
trace!(target: "sync", "Skipping deactivated peer {}", peer_id);
|
||||
return;
|
||||
}
|
||||
let (peer_latest, peer_difficulty, peer_snapshot_number, peer_snapshot_hash) = {
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
if peer.asking != PeerAsking::Nothing || !peer.can_sync() {
|
||||
trace!(target: "sync", "Skipping busy peer {}", peer_id);
|
||||
return;
|
||||
}
|
||||
if self.state == SyncState::Waiting {
|
||||
@@ -1161,7 +1179,7 @@ impl ChainSync {
|
||||
let num_active_peers = self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count();
|
||||
|
||||
let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty);
|
||||
if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() {
|
||||
if force || higher_difficulty || self.old_blocks.is_some() {
|
||||
match self.state {
|
||||
SyncState::WaitingPeers => {
|
||||
trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
|
||||
@@ -1174,9 +1192,10 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
let have_latest = io.chain().block_status(BlockId::Hash(peer_latest)) != BlockStatus::Unknown;
|
||||
trace!(target: "sync", "Considering peer {}, force={}, td={:?}, our td={}, latest={}, have_latest={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, peer_latest, have_latest, self.state);
|
||||
if !have_latest && (higher_difficulty || force || self.state == SyncState::NewBlocks) {
|
||||
// check if got new blocks to download
|
||||
trace!(target: "sync", "Syncing with {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
|
||||
trace!(target: "sync", "Syncing with peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
|
||||
if let Some(request) = self.new_blocks.request_blocks(io, num_active_peers) {
|
||||
self.request_blocks(io, peer_id, request, BlockSet::NewBlocks);
|
||||
if self.state == SyncState::Idle {
|
||||
@@ -1206,6 +1225,8 @@ impl ChainSync {
|
||||
SyncState::SnapshotManifest | //already downloading from other peer
|
||||
SyncState::Waiting | SyncState::SnapshotWaiting => ()
|
||||
}
|
||||
} else {
|
||||
trace!(target: "sync", "Skipping peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1419,7 +1440,7 @@ impl ChainSync {
|
||||
item_count = min(item_count, MAX_TX_TO_IMPORT);
|
||||
let mut transactions = Vec::with_capacity(item_count);
|
||||
for i in 0 .. item_count {
|
||||
let rlp = try!(r.at(i));
|
||||
let rlp = r.at(i)?;
|
||||
if rlp.as_raw().len() > MAX_TRANSACTION_SIZE {
|
||||
debug!("Skipped oversized transaction of {} bytes", rlp.as_raw().len());
|
||||
continue;
|
||||
@@ -1461,24 +1482,25 @@ impl ChainSync {
|
||||
fn return_block_headers(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
// Packet layout:
|
||||
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
|
||||
let max_headers: usize = try!(r.val_at(1));
|
||||
let skip: usize = try!(r.val_at(2));
|
||||
let reverse: bool = try!(r.val_at(3));
|
||||
let max_headers: usize = r.val_at(1)?;
|
||||
let skip: usize = r.val_at(2)?;
|
||||
let reverse: bool = r.val_at(3)?;
|
||||
let last = io.chain().chain_info().best_block_number;
|
||||
let number = if try!(r.at(0)).size() == 32 {
|
||||
let number = if r.at(0)?.size() == 32 {
|
||||
// id is a hash
|
||||
let hash: H256 = try!(r.val_at(0));
|
||||
let hash: H256 = r.val_at(0)?;
|
||||
trace!(target: "sync", "{} -> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", peer_id, hash, max_headers, skip, reverse);
|
||||
match io.chain().block_header(BlockId::Hash(hash)) {
|
||||
Some(hdr) => {
|
||||
let number = From::from(HeaderView::new(&hdr).number());
|
||||
debug_assert_eq!(HeaderView::new(&hdr).sha3(), hash);
|
||||
let number = hdr.number().into();
|
||||
debug_assert_eq!(hdr.sha3(), hash);
|
||||
|
||||
if max_headers == 1 || io.chain().block_hash(BlockId::Number(number)) != Some(hash) {
|
||||
// Non canonical header or single header requested
|
||||
// TODO: handle single-step reverse hashchains of non-canon hashes
|
||||
trace!(target:"sync", "Returning single header: {:?}", hash);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append_raw(&hdr, 1);
|
||||
rlp.append_raw(&hdr.into_inner(), 1);
|
||||
return Ok(Some((BLOCK_HEADERS_PACKET, rlp)));
|
||||
}
|
||||
number
|
||||
@@ -1486,8 +1508,8 @@ impl ChainSync {
|
||||
None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing
|
||||
}
|
||||
} else {
|
||||
trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, try!(r.val_at::<BlockNumber>(0)), max_headers, skip, reverse);
|
||||
try!(r.val_at(0))
|
||||
trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, r.val_at::<BlockNumber>(0)?, max_headers, skip, reverse);
|
||||
r.val_at(0)?
|
||||
};
|
||||
|
||||
let mut number = if reverse {
|
||||
@@ -1506,8 +1528,8 @@ impl ChainSync {
|
||||
trace!(target: "sync", "{}: Returning cached fork header", peer_id);
|
||||
data.extend_from_slice(hdr);
|
||||
count += 1;
|
||||
} else if let Some(mut hdr) = io.chain().block_header(BlockId::Number(number)) {
|
||||
data.append(&mut hdr);
|
||||
} else if let Some(hdr) = io.chain().block_header(BlockId::Number(number)) {
|
||||
data.append(&mut hdr.into_inner());
|
||||
count += 1;
|
||||
} else {
|
||||
// No required block.
|
||||
@@ -1540,8 +1562,8 @@ impl ChainSync {
|
||||
let mut added = 0usize;
|
||||
let mut data = Bytes::new();
|
||||
for i in 0..count {
|
||||
if let Some(mut hdr) = io.chain().block_body(BlockId::Hash(try!(r.val_at::<H256>(i)))) {
|
||||
data.append(&mut hdr);
|
||||
if let Some(body) = io.chain().block_body(BlockId::Hash(r.val_at::<H256>(i)?)) {
|
||||
data.append(&mut body.into_inner());
|
||||
added += 1;
|
||||
}
|
||||
}
|
||||
@@ -1563,8 +1585,8 @@ impl ChainSync {
|
||||
let mut added = 0usize;
|
||||
let mut data = Vec::new();
|
||||
for i in 0..count {
|
||||
if let Some(hdr) = io.chain().state_data(&try!(r.val_at::<H256>(i))) {
|
||||
data.push(hdr);
|
||||
if let Some(node) = io.chain().state_data(&r.val_at::<H256>(i)?) {
|
||||
data.push(node);
|
||||
added += 1;
|
||||
}
|
||||
}
|
||||
@@ -1588,7 +1610,7 @@ impl ChainSync {
|
||||
let mut added_receipts = 0usize;
|
||||
let mut data = Bytes::new();
|
||||
for i in 0..count {
|
||||
if let Some(mut receipts_bytes) = io.chain().block_receipts(&try!(rlp.val_at::<H256>(i))) {
|
||||
if let Some(mut receipts_bytes) = io.chain().block_receipts(&rlp.val_at::<H256>(i)?) {
|
||||
data.append(&mut receipts_bytes);
|
||||
added_receipts += receipts_bytes.len();
|
||||
added_headers += 1;
|
||||
@@ -1625,7 +1647,7 @@ impl ChainSync {
|
||||
|
||||
/// Respond to GetSnapshotData request
|
||||
fn return_snapshot_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let hash: H256 = try!(r.val_at(0));
|
||||
let hash: H256 = r.val_at(0)?;
|
||||
trace!(target: "sync", "{} -> GetSnapshotData {:?}", peer_id, hash);
|
||||
let rlp = match io.snapshot_service().chunk(hash) {
|
||||
Some(data) => {
|
||||
@@ -1797,8 +1819,8 @@ impl ChainSync {
|
||||
let mut rlp_stream = RlpStream::new_list(blocks.len());
|
||||
for block_hash in blocks {
|
||||
let mut hash_rlp = RlpStream::new_list(2);
|
||||
let number = HeaderView::new(&chain.block_header(BlockId::Hash(block_hash.clone()))
|
||||
.expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.")).number();
|
||||
let number = chain.block_header(BlockId::Hash(block_hash.clone()))
|
||||
.expect("chain.tree_route and chain.find_uncles only return hahses of blocks that are in the blockchain. qed.").number();
|
||||
hash_rlp.append(&block_hash);
|
||||
hash_rlp.append(&number);
|
||||
rlp_stream.append_raw(hash_rlp.as_raw(), 1);
|
||||
@@ -1822,7 +1844,8 @@ impl ChainSync {
|
||||
/// creates latest block rlp for the given client
|
||||
fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes {
|
||||
ChainSync::create_block_rlp(
|
||||
&chain.block(BlockId::Hash(chain.chain_info().best_block_hash)).expect("Best block always exists"),
|
||||
&chain.block(BlockId::Hash(chain.chain_info().best_block_hash))
|
||||
.expect("Best block always exists").into_inner(),
|
||||
chain.chain_info().total_difficulty
|
||||
)
|
||||
}
|
||||
@@ -1830,7 +1853,7 @@ impl ChainSync {
|
||||
/// creates given hash block rlp for the given client
|
||||
fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes {
|
||||
ChainSync::create_block_rlp(
|
||||
&chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed"),
|
||||
&chain.block(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed").into_inner(),
|
||||
chain.block_total_difficulty(BlockId::Hash(hash.clone())).expect("Block has just been sealed; qed.")
|
||||
)
|
||||
}
|
||||
@@ -1893,8 +1916,7 @@ impl ChainSync {
|
||||
fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize {
|
||||
trace!(target: "sync", "Sending NewHashes to {:?}", peers);
|
||||
let mut sent = 0;
|
||||
let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(chain_info.best_block_hash.clone()))
|
||||
.expect("Best block always exists")).parent_hash();
|
||||
let last_parent = &io.chain().best_block_header().parent_hash();
|
||||
for peer_id in peers {
|
||||
sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) {
|
||||
Some(rlp) => {
|
||||
@@ -2035,7 +2057,9 @@ impl ChainSync {
|
||||
|
||||
/// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
|
||||
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) {
|
||||
if io.is_chain_queue_empty() {
|
||||
let queue_info = io.chain().queue_info();
|
||||
if !self.status().is_syncing(queue_info) || !sealed.is_empty() {
|
||||
trace!(target: "sync", "Propagating blocks, state={:?}", self.state);
|
||||
self.propagate_latest_blocks(io, sealed);
|
||||
self.propagate_proposed_blocks(io, proposed);
|
||||
}
|
||||
@@ -2075,7 +2099,6 @@ mod tests {
|
||||
use super::*;
|
||||
use ::SyncConfig;
|
||||
use super::{PeerInfo, PeerAsking};
|
||||
use ethcore::views::BlockView;
|
||||
use ethcore::header::*;
|
||||
use ethcore::client::*;
|
||||
use ethcore::miner::MinerService;
|
||||
@@ -2229,7 +2252,8 @@ mod tests {
|
||||
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Nothing);
|
||||
let blocks: Vec<_> = (0 .. 100).map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).unwrap()).collect();
|
||||
let blocks: Vec<_> = (0 .. 100)
|
||||
.map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).map(|b| b.into_inner()).unwrap()).collect();
|
||||
let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect();
|
||||
let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
|
||||
|
||||
@@ -2420,7 +2444,7 @@ mod tests {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(2, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let block = client.block(BlockId::Latest).unwrap();
|
||||
let block = client.block(BlockId::Latest).unwrap().into_inner();
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client);
|
||||
sync.peers.insert(0,
|
||||
PeerInfo {
|
||||
@@ -2698,9 +2722,8 @@ mod tests {
|
||||
// Add some balance to clients and reset nonces
|
||||
for h in &[good_blocks[0], retracted_blocks[0]] {
|
||||
let block = client.block(BlockId::Hash(*h)).unwrap();
|
||||
let view = BlockView::new(&block);
|
||||
client.set_balance(view.transactions()[0].sender().unwrap(), U256::from(1_000_000_000));
|
||||
client.set_nonce(view.transactions()[0].sender().unwrap(), U256::from(0));
|
||||
client.set_balance(block.transactions()[0].sender().unwrap(), U256::from(1_000_000_000));
|
||||
client.set_nonce(block.transactions()[0].sender().unwrap(), U256::from(0));
|
||||
}
|
||||
|
||||
|
||||
@@ -2717,8 +2740,7 @@ mod tests {
|
||||
// We need to update nonce status (because we say that the block has been imported)
|
||||
for h in &[good_blocks[0]] {
|
||||
let block = client.block(BlockId::Hash(*h)).unwrap();
|
||||
let view = BlockView::new(&block);
|
||||
client.set_nonce(view.transactions()[0].sender().unwrap(), U256::from(1));
|
||||
client.set_nonce(block.transactions()[0].sender().unwrap(), U256::from(1));
|
||||
}
|
||||
{
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
|
||||
@@ -255,8 +255,12 @@ fn high_td_attach() {
|
||||
fn disconnect_on_unrelated_chain() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(2);
|
||||
net.peer(0).chain.add_blocks(200, EachBlockWith::Uncle);
|
||||
net.peer(1).chain.add_blocks(100, EachBlockWith::Nothing);
|
||||
net.peer(0).chain.set_history(Some(20));
|
||||
net.peer(1).chain.set_history(Some(20));
|
||||
net.restart_peer(0);
|
||||
net.restart_peer(1);
|
||||
net.peer(0).chain.add_blocks(500, EachBlockWith::Uncle);
|
||||
net.peer(1).chain.add_blocks(300, EachBlockWith::Nothing);
|
||||
net.sync();
|
||||
assert_eq!(net.disconnect_events, vec![(0, 0)]);
|
||||
}
|
||||
|
||||
@@ -132,37 +132,115 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
||||
}
|
||||
}
|
||||
|
||||
/// Abstract messages between peers.
|
||||
pub trait Message {
|
||||
/// The intended recipient of this message.
|
||||
fn recipient(&self) -> PeerId;
|
||||
}
|
||||
|
||||
/// Mock subprotocol packet
|
||||
pub struct TestPacket {
|
||||
pub data: Bytes,
|
||||
pub packet_id: PacketId,
|
||||
pub recipient: PeerId,
|
||||
}
|
||||
|
||||
pub struct TestPeer<C> where C: FlushingBlockChainClient {
|
||||
impl Message for TestPacket {
|
||||
fn recipient(&self) -> PeerId { self.recipient }
|
||||
}
|
||||
|
||||
/// A peer which can be a member of the `TestNet`.
|
||||
pub trait Peer {
|
||||
type Message: Message;
|
||||
|
||||
/// Called on connection to other indicated peer.
|
||||
fn on_connect(&self, other: PeerId);
|
||||
|
||||
/// Called on disconnect from other indicated peer.
|
||||
fn on_disconnect(&self, other: PeerId);
|
||||
|
||||
/// Receive a message from another peer. Return a set of peers to disconnect.
|
||||
fn receive_message(&self, from: PeerId, msg: Self::Message) -> HashSet<PeerId>;
|
||||
|
||||
/// Produce the next pending message to send to another peer.
|
||||
fn pending_message(&self) -> Option<Self::Message>;
|
||||
|
||||
/// Whether this peer is done syncing (has no messages to send).
|
||||
fn is_done(&self) -> bool;
|
||||
|
||||
/// Execute a "sync step". This is called for each peer after it sends a packet.
|
||||
fn sync_step(&self);
|
||||
|
||||
/// Restart sync for a peer.
|
||||
fn restart_sync(&self);
|
||||
}
|
||||
|
||||
pub struct EthPeer<C> where C: FlushingBlockChainClient {
|
||||
pub chain: Arc<C>,
|
||||
pub snapshot_service: Arc<TestSnapshotService>,
|
||||
pub sync: RwLock<ChainSync>,
|
||||
pub queue: RwLock<VecDeque<TestPacket>>,
|
||||
}
|
||||
|
||||
pub struct TestNet<C> where C: FlushingBlockChainClient {
|
||||
pub peers: Vec<Arc<TestPeer<C>>>,
|
||||
impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
|
||||
type Message = TestPacket;
|
||||
|
||||
fn on_connect(&self, other: PeerId) {
|
||||
self.sync.write().update_targets(&*self.chain);
|
||||
self.sync.write().on_peer_connected(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other)), other);
|
||||
}
|
||||
|
||||
fn on_disconnect(&self, other: PeerId) {
|
||||
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(other));
|
||||
self.sync.write().on_peer_aborting(&mut io, other);
|
||||
}
|
||||
|
||||
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
|
||||
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, Some(from));
|
||||
ChainSync::dispatch_packet(&self.sync, &mut io, from, msg.packet_id, &msg.data);
|
||||
self.chain.flush();
|
||||
io.to_disconnect.clone()
|
||||
}
|
||||
|
||||
fn pending_message(&self) -> Option<TestPacket> {
|
||||
self.chain.flush();
|
||||
self.queue.write().pop_front()
|
||||
}
|
||||
|
||||
fn is_done(&self) -> bool {
|
||||
self.queue.read().is_empty()
|
||||
}
|
||||
|
||||
fn sync_step(&self) {
|
||||
self.chain.flush();
|
||||
self.sync.write().maintain_peers(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
|
||||
self.sync.write().maintain_sync(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
|
||||
self.sync.write().propagate_new_transactions(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
|
||||
}
|
||||
|
||||
fn restart_sync(&self) {
|
||||
self.sync.write().restart(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestNet<P> {
|
||||
pub peers: Vec<Arc<P>>,
|
||||
pub started: bool,
|
||||
pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to)
|
||||
}
|
||||
|
||||
impl TestNet<TestBlockChainClient> {
|
||||
pub fn new(n: usize) -> TestNet<TestBlockChainClient> {
|
||||
impl TestNet<EthPeer<TestBlockChainClient>> {
|
||||
pub fn new(n: usize) -> Self {
|
||||
Self::new_with_config(n, SyncConfig::default())
|
||||
}
|
||||
|
||||
pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet<TestBlockChainClient> {
|
||||
pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> Self {
|
||||
let mut config = SyncConfig::default();
|
||||
config.fork_block = fork;
|
||||
Self::new_with_config(n, config)
|
||||
}
|
||||
|
||||
pub fn new_with_config(n: usize, config: SyncConfig) -> TestNet<TestBlockChainClient> {
|
||||
pub fn new_with_config(n: usize, config: SyncConfig) -> Self {
|
||||
let mut net = TestNet {
|
||||
peers: Vec::new(),
|
||||
started: false,
|
||||
@@ -172,7 +250,7 @@ impl TestNet<TestBlockChainClient> {
|
||||
let chain = TestBlockChainClient::new();
|
||||
let ss = Arc::new(TestSnapshotService::new());
|
||||
let sync = ChainSync::new(config.clone(), &chain);
|
||||
net.peers.push(Arc::new(TestPeer {
|
||||
net.peers.push(Arc::new(EthPeer {
|
||||
sync: RwLock::new(sync),
|
||||
snapshot_service: ss,
|
||||
chain: Arc::new(chain),
|
||||
@@ -183,8 +261,8 @@ impl TestNet<TestBlockChainClient> {
|
||||
}
|
||||
}
|
||||
|
||||
impl TestNet<EthcoreClient> {
|
||||
pub fn with_spec<F>(n: usize, config: SyncConfig, spec_factory: F) -> GuardedTempResult<TestNet<EthcoreClient>>
|
||||
impl TestNet<EthPeer<EthcoreClient>> {
|
||||
pub fn with_spec<F>(n: usize, config: SyncConfig, spec_factory: F) -> GuardedTempResult<Self>
|
||||
where F: Fn() -> Spec
|
||||
{
|
||||
let mut net = TestNet {
|
||||
@@ -211,7 +289,7 @@ impl TestNet<EthcoreClient> {
|
||||
|
||||
let ss = Arc::new(TestSnapshotService::new());
|
||||
let sync = ChainSync::new(config.clone(), &*client);
|
||||
let peer = Arc::new(TestPeer {
|
||||
let peer = Arc::new(EthPeer {
|
||||
sync: RwLock::new(sync),
|
||||
snapshot_service: ss,
|
||||
chain: client,
|
||||
@@ -220,22 +298,18 @@ impl TestNet<EthcoreClient> {
|
||||
peer.chain.add_notify(peer.clone());
|
||||
net.peers.push(peer);
|
||||
}
|
||||
GuardedTempResult::<TestNet<EthcoreClient>> {
|
||||
GuardedTempResult {
|
||||
_temp: dir,
|
||||
result: Some(net)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> TestNet<C> where C: FlushingBlockChainClient {
|
||||
pub fn peer(&self, i: usize) -> &TestPeer<C> {
|
||||
impl<P> TestNet<P> where P: Peer {
|
||||
pub fn peer(&self, i: usize) -> &P {
|
||||
&self.peers[i]
|
||||
}
|
||||
|
||||
pub fn peer_mut(&mut self, i: usize) -> &mut TestPeer<C> {
|
||||
Arc::get_mut(&mut self.peers[i]).unwrap()
|
||||
}
|
||||
|
||||
pub fn start(&mut self) {
|
||||
if self.started {
|
||||
return;
|
||||
@@ -243,9 +317,7 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
|
||||
for peer in 0..self.peers.len() {
|
||||
for client in 0..self.peers.len() {
|
||||
if peer != client {
|
||||
let p = &self.peers[peer];
|
||||
p.sync.write().update_targets(&*p.chain);
|
||||
p.sync.write().on_peer_connected(&mut TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(client as PeerId)), client as PeerId);
|
||||
self.peers[peer].on_connect(client as PeerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -254,31 +326,22 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
|
||||
|
||||
pub fn sync_step(&mut self) {
|
||||
for peer in 0..self.peers.len() {
|
||||
self.peers[peer].chain.flush();
|
||||
let packet = self.peers[peer].queue.write().pop_front();
|
||||
let packet = self.peers[peer].pending_message();
|
||||
if let Some(packet) = packet {
|
||||
let disconnecting = {
|
||||
let p = &self.peers[packet.recipient];
|
||||
trace!("--- {} -> {} ---", peer, packet.recipient);
|
||||
let to_disconnect = {
|
||||
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId));
|
||||
ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data);
|
||||
p.chain.flush();
|
||||
io.to_disconnect.clone()
|
||||
};
|
||||
let recipient = packet.recipient();
|
||||
trace!("--- {} -> {} ---", peer, recipient);
|
||||
let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet);
|
||||
for d in &to_disconnect {
|
||||
// notify this that disconnecting peers are disconnecting
|
||||
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(*d));
|
||||
p.sync.write().on_peer_aborting(&mut io, *d);
|
||||
self.peers[recipient].on_disconnect(*d as PeerId);
|
||||
self.disconnect_events.push((peer, *d));
|
||||
}
|
||||
to_disconnect
|
||||
};
|
||||
for d in &disconnecting {
|
||||
// notify other peers that this peer is disconnecting
|
||||
let p = &self.peers[*d];
|
||||
let mut io = TestIo::new(&*p.chain, &p.snapshot_service, &p.queue, Some(peer as PeerId));
|
||||
p.sync.write().on_peer_aborting(&mut io, peer as PeerId);
|
||||
self.peers[*d].on_disconnect(peer as PeerId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -287,16 +350,11 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
|
||||
}
|
||||
|
||||
pub fn sync_step_peer(&mut self, peer_num: usize) {
|
||||
let peer = self.peer(peer_num);
|
||||
peer.chain.flush();
|
||||
peer.sync.write().maintain_peers(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
|
||||
peer.sync.write().maintain_sync(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
|
||||
peer.sync.write().propagate_new_transactions(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
|
||||
self.peers[peer_num].sync_step();
|
||||
}
|
||||
|
||||
pub fn restart_peer(&mut self, i: usize) {
|
||||
let peer = self.peer(i);
|
||||
peer.sync.write().restart(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None));
|
||||
self.peers[i].restart_sync();
|
||||
}
|
||||
|
||||
pub fn sync(&mut self) -> u32 {
|
||||
@@ -317,16 +375,25 @@ impl<C> TestNet<C> where C: FlushingBlockChainClient {
|
||||
}
|
||||
|
||||
pub fn done(&self) -> bool {
|
||||
self.peers.iter().all(|p| p.queue.read().is_empty())
|
||||
self.peers.iter().all(|p| p.is_done())
|
||||
}
|
||||
}
|
||||
|
||||
impl TestNet<EthPeer<TestBlockChainClient>> {
|
||||
// relies on Arc uniqueness, which is only true when we haven't registered a ChainNotify.
|
||||
pub fn peer_mut(&mut self, i: usize) -> &mut EthPeer<TestBlockChainClient> {
|
||||
Arc::get_mut(&mut self.peers[i]).expect("Arc never exposed externally")
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> {
|
||||
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
|
||||
let peer = self.peer(peer_id);
|
||||
let peer = &mut self.peers[peer_id];
|
||||
peer.sync.write().chain_new_blocks(&mut TestIo::new(&*peer.chain, &peer.snapshot_service, &peer.queue, None), &[], &[], &[], &[], &[], &[]);
|
||||
}
|
||||
}
|
||||
|
||||
impl ChainNotify for TestPeer<EthcoreClient> {
|
||||
impl ChainNotify for EthPeer<EthcoreClient> {
|
||||
fn new_blocks(&self,
|
||||
imported: Vec<H256>,
|
||||
invalid: Vec<H256>,
|
||||
|
||||
Reference in New Issue
Block a user