Refactoring ethcore-sync
- Fixing warp-sync barrier (#8543)
* Start dividing sync chain : first supplier method * WIP - updated chain sync supplier * Finish refactoring the Chain Sync Supplier * Create Chain Sync Requester * Add Propagator for Chain Sync * Add the Chain Sync Handler * Move tests from mod -> handler * Move tests to propagator * Refactor SyncRequester arguments * Refactoring peer fork header handler * Fix wrong highest block number in snapshot sync * Small refactor... * Address PR grumbles * Retry failed CI job * Fix tests * PR Grumbles
This commit is contained in:
parent
b84682168d
commit
8b0ba97cf2
File diff suppressed because it is too large
Load Diff
828
ethcore/sync/src/chain/handler.rs
Normal file
828
ethcore/sync/src/chain/handler.rs
Normal file
@ -0,0 +1,828 @@
|
||||
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use api::WARP_SYNC_PROTOCOL_ID;
|
||||
use block_sync::{BlockDownloaderImportError as DownloaderImportError, DownloadAction};
|
||||
use bytes::Bytes;
|
||||
use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind};
|
||||
use ethcore::error::*;
|
||||
use ethcore::header::{BlockNumber, Header as BlockHeader};
|
||||
use ethcore::snapshot::{ManifestData, RestorationStatus};
|
||||
use ethereum_types::{H256, U256};
|
||||
use hash::keccak;
|
||||
use network::PeerId;
|
||||
use rlp::Rlp;
|
||||
use snapshot::ChunkType;
|
||||
use std::cmp;
|
||||
use std::collections::HashSet;
|
||||
use std::time::Instant;
|
||||
use sync_io::SyncIo;
|
||||
|
||||
use super::{
|
||||
BlockSet,
|
||||
ChainSync,
|
||||
ForkConfirmation,
|
||||
PacketDecodeError,
|
||||
PeerAsking,
|
||||
PeerInfo,
|
||||
SyncRequester,
|
||||
SyncState,
|
||||
ETH_PROTOCOL_VERSION_62,
|
||||
ETH_PROTOCOL_VERSION_63,
|
||||
MAX_NEW_BLOCK_AGE,
|
||||
MAX_NEW_HASHES,
|
||||
PAR_PROTOCOL_VERSION_1,
|
||||
PAR_PROTOCOL_VERSION_2,
|
||||
PAR_PROTOCOL_VERSION_3,
|
||||
BLOCK_BODIES_PACKET,
|
||||
BLOCK_HEADERS_PACKET,
|
||||
NEW_BLOCK_HASHES_PACKET,
|
||||
NEW_BLOCK_PACKET,
|
||||
PRIVATE_TRANSACTION_PACKET,
|
||||
RECEIPTS_PACKET,
|
||||
SIGNED_PRIVATE_TRANSACTION_PACKET,
|
||||
SNAPSHOT_DATA_PACKET,
|
||||
SNAPSHOT_MANIFEST_PACKET,
|
||||
STATUS_PACKET,
|
||||
TRANSACTIONS_PACKET,
|
||||
};
|
||||
|
||||
/// The Chain Sync Handler: handles responses from peers
|
||||
pub struct SyncHandler;
|
||||
|
||||
impl SyncHandler {
|
||||
/// Handle incoming packet from peer
|
||||
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
if packet_id != STATUS_PACKET && !sync.peers.contains_key(&peer) {
|
||||
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_info(peer));
|
||||
return;
|
||||
}
|
||||
let rlp = Rlp::new(data);
|
||||
let result = match packet_id {
|
||||
STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp),
|
||||
TRANSACTIONS_PACKET => SyncHandler::on_peer_transactions(sync, io, peer, &rlp),
|
||||
BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
|
||||
BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
|
||||
RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
|
||||
NEW_BLOCK_PACKET => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
|
||||
NEW_BLOCK_HASHES_PACKET => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
|
||||
SNAPSHOT_MANIFEST_PACKET => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
|
||||
SNAPSHOT_DATA_PACKET => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
|
||||
PRIVATE_TRANSACTION_PACKET => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
|
||||
SIGNED_PRIVATE_TRANSACTION_PACKET => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
|
||||
_ => {
|
||||
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
result.unwrap_or_else(|e| {
|
||||
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
|
||||
})
|
||||
}
|
||||
|
||||
/// Called when peer sends us new consensus packet
|
||||
pub fn on_consensus_packet(io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
trace!(target: "sync", "Received consensus packet from {:?}", peer_id);
|
||||
io.chain().queue_consensus_message(r.as_raw().to_vec());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called by peer when it is disconnecting
|
||||
pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId) {
|
||||
trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
|
||||
sync.handshaking_peers.remove(&peer);
|
||||
if sync.peers.contains_key(&peer) {
|
||||
debug!(target: "sync", "Disconnected {}", peer);
|
||||
sync.clear_peer_download(peer);
|
||||
sync.peers.remove(&peer);
|
||||
sync.active_peers.remove(&peer);
|
||||
sync.continue_sync(io);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when a new peer is connected
|
||||
pub fn on_peer_connected(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId) {
|
||||
trace!(target: "sync", "== Connected {}: {}", peer, io.peer_info(peer));
|
||||
if let Err(e) = sync.send_status(io, peer) {
|
||||
debug!(target:"sync", "Error sending status request: {:?}", e);
|
||||
io.disconnect_peer(peer);
|
||||
} else {
|
||||
sync.handshaking_peers.insert(peer, Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
/// Called by peer once it has new block bodies
|
||||
pub fn on_peer_new_block(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
let difficulty: U256 = r.val_at(1)?;
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||
if peer.difficulty.map_or(true, |pd| difficulty > pd) {
|
||||
peer.difficulty = Some(difficulty);
|
||||
}
|
||||
}
|
||||
let block_rlp = r.at(0)?;
|
||||
let header_rlp = block_rlp.at(0)?;
|
||||
let h = keccak(&header_rlp.as_raw());
|
||||
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
|
||||
let header: BlockHeader = header_rlp.as_val()?;
|
||||
if header.number() > sync.highest_block.unwrap_or(0) {
|
||||
sync.highest_block = Some(header.number());
|
||||
}
|
||||
let mut unknown = false;
|
||||
{
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||
peer.latest_hash = header.hash();
|
||||
}
|
||||
}
|
||||
let last_imported_number = sync.new_blocks.last_imported_block_number();
|
||||
if last_imported_number > header.number() && last_imported_number - header.number() > MAX_NEW_BLOCK_AGE {
|
||||
trace!(target: "sync", "Ignored ancient new block {:?}", h);
|
||||
io.disable_peer(peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
|
||||
Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => {
|
||||
trace!(target: "sync", "New block already in chain {:?}", h);
|
||||
},
|
||||
Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyQueued), _)) => {
|
||||
trace!(target: "sync", "New block already queued {:?}", h);
|
||||
},
|
||||
Ok(_) => {
|
||||
// abort current download of the same block
|
||||
sync.complete_sync(io);
|
||||
sync.new_blocks.mark_as_known(&header.hash(), header.number());
|
||||
trace!(target: "sync", "New block queued {:?} ({})", h, header.number());
|
||||
},
|
||||
Err(BlockImportError(BlockImportErrorKind::Block(BlockError::UnknownParent(p)), _)) => {
|
||||
unknown = true;
|
||||
trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h);
|
||||
},
|
||||
Err(e) => {
|
||||
debug!(target: "sync", "Bad new block {:?} : {:?}", h, e);
|
||||
io.disable_peer(peer_id);
|
||||
}
|
||||
};
|
||||
if unknown {
|
||||
if sync.state != SyncState::Idle {
|
||||
trace!(target: "sync", "NewBlock ignored while seeking");
|
||||
} else {
|
||||
trace!(target: "sync", "New unknown block {:?}", h);
|
||||
//TODO: handle too many unknown blocks
|
||||
sync.sync_peer(io, peer_id, true);
|
||||
}
|
||||
}
|
||||
sync.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles `NewHashes` packet. Initiates headers download for any unknown hashes.
|
||||
pub fn on_peer_new_hashes(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
let hashes: Vec<_> = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1))).collect();
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||
// Peer has new blocks with unknown difficulty
|
||||
peer.difficulty = None;
|
||||
if let Some(&(Ok(ref h), _)) = hashes.last() {
|
||||
peer.latest_hash = h.clone();
|
||||
}
|
||||
}
|
||||
if sync.state != SyncState::Idle {
|
||||
trace!(target: "sync", "Ignoring new hashes since we're already downloading.");
|
||||
let max = r.iter().take(MAX_NEW_HASHES).map(|item| item.val_at::<BlockNumber>(1).unwrap_or(0)).fold(0u64, cmp::max);
|
||||
if max > sync.highest_block.unwrap_or(0) {
|
||||
sync.highest_block = Some(max);
|
||||
}
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()?);
|
||||
let mut max_height: BlockNumber = 0;
|
||||
let mut new_hashes = Vec::new();
|
||||
let last_imported_number = sync.new_blocks.last_imported_block_number();
|
||||
for (rh, rn) in hashes {
|
||||
let hash = rh?;
|
||||
let number = rn?;
|
||||
if number > sync.highest_block.unwrap_or(0) {
|
||||
sync.highest_block = Some(number);
|
||||
}
|
||||
if sync.new_blocks.is_downloading(&hash) {
|
||||
continue;
|
||||
}
|
||||
if last_imported_number > number && last_imported_number - number > MAX_NEW_BLOCK_AGE {
|
||||
trace!(target: "sync", "Ignored ancient new block hash {:?}", hash);
|
||||
io.disable_peer(peer_id);
|
||||
continue;
|
||||
}
|
||||
match io.chain().block_status(BlockId::Hash(hash.clone())) {
|
||||
BlockStatus::InChain => {
|
||||
trace!(target: "sync", "New block hash already in chain {:?}", hash);
|
||||
},
|
||||
BlockStatus::Queued => {
|
||||
trace!(target: "sync", "New hash block already queued {:?}", hash);
|
||||
},
|
||||
BlockStatus::Unknown | BlockStatus::Pending => {
|
||||
new_hashes.push(hash.clone());
|
||||
if number > max_height {
|
||||
trace!(target: "sync", "New unknown block hash {:?}", hash);
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||
peer.latest_hash = hash.clone();
|
||||
}
|
||||
max_height = number;
|
||||
}
|
||||
},
|
||||
BlockStatus::Bad => {
|
||||
debug!(target: "sync", "Bad new block hash {:?}", hash);
|
||||
io.disable_peer(peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
if max_height != 0 {
|
||||
trace!(target: "sync", "Downloading blocks for new hashes");
|
||||
sync.new_blocks.reset_to(new_hashes);
|
||||
sync.state = SyncState::NewBlocks;
|
||||
sync.sync_peer(io, peer_id, true);
|
||||
}
|
||||
sync.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called by peer once it has new block bodies
|
||||
fn on_peer_block_bodies(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
sync.clear_peer_download(peer_id);
|
||||
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
|
||||
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) {
|
||||
trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
let item_count = r.item_count()?;
|
||||
trace!(target: "sync", "{} -> BlockBodies ({} entries), set = {:?}", peer_id, item_count, block_set);
|
||||
if item_count == 0 {
|
||||
sync.deactivate_peer(io, peer_id);
|
||||
}
|
||||
else if sync.state == SyncState::Waiting {
|
||||
trace!(target: "sync", "Ignored block bodies while waiting");
|
||||
}
|
||||
else
|
||||
{
|
||||
let result = {
|
||||
let downloader = match block_set {
|
||||
BlockSet::NewBlocks => &mut sync.new_blocks,
|
||||
BlockSet::OldBlocks => match sync.old_blocks {
|
||||
None => {
|
||||
trace!(target: "sync", "Ignored block headers while block download is inactive");
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
Some(ref mut blocks) => blocks,
|
||||
}
|
||||
};
|
||||
downloader.import_bodies(io, r)
|
||||
};
|
||||
|
||||
match result {
|
||||
Err(DownloaderImportError::Invalid) => {
|
||||
io.disable_peer(peer_id);
|
||||
sync.deactivate_peer(io, peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
Err(DownloaderImportError::Useless) => {
|
||||
sync.deactivate_peer(io, peer_id);
|
||||
},
|
||||
Ok(()) => (),
|
||||
}
|
||||
|
||||
sync.collect_blocks(io, block_set);
|
||||
sync.sync_peer(io, peer_id, false);
|
||||
}
|
||||
sync.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_peer_confirmed(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
||||
sync.sync_peer(io, peer_id, false);
|
||||
}
|
||||
|
||||
fn on_peer_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
{
|
||||
let peer = sync.peers.get_mut(&peer_id).expect("Is only called when peer is present in peers");
|
||||
peer.asking = PeerAsking::Nothing;
|
||||
let item_count = r.item_count()?;
|
||||
let (fork_number, fork_hash) = sync.fork_block.expect("ForkHeader request is sent only fork block is Some; qed").clone();
|
||||
|
||||
if item_count == 0 || item_count != 1 {
|
||||
trace!(target: "sync", "{}: Chain is too short to confirm the block", peer_id);
|
||||
io.disable_peer(peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let header = r.at(0)?.as_raw();
|
||||
if keccak(&header) != fork_hash {
|
||||
trace!(target: "sync", "{}: Fork mismatch", peer_id);
|
||||
io.disable_peer(peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
trace!(target: "sync", "{}: Confirmed peer", peer_id);
|
||||
peer.confirmation = ForkConfirmation::Confirmed;
|
||||
if !io.chain_overlay().read().contains_key(&fork_number) {
|
||||
io.chain_overlay().write().insert(fork_number, header.to_vec());
|
||||
}
|
||||
}
|
||||
SyncHandler::on_peer_confirmed(sync, io, peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
/// Called by peer once it has new block headers during sync
|
||||
fn on_peer_block_headers(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
let is_fork_header_request = match sync.peers.get(&peer_id) {
|
||||
Some(peer) if peer.asking == PeerAsking::ForkHeader => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if is_fork_header_request {
|
||||
return SyncHandler::on_peer_fork_header(sync, io, peer_id, r);
|
||||
}
|
||||
|
||||
sync.clear_peer_download(peer_id);
|
||||
let expected_hash = sync.peers.get(&peer_id).and_then(|p| p.asking_hash);
|
||||
let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false);
|
||||
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
|
||||
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) || expected_hash.is_none() || !allowed {
|
||||
trace!(target: "sync", "{}: Ignored unexpected headers, expected_hash = {:?}", peer_id, expected_hash);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
let item_count = r.item_count()?;
|
||||
trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, sync.state, block_set);
|
||||
if (sync.state == SyncState::Idle || sync.state == SyncState::WaitingPeers) && sync.old_blocks.is_none() {
|
||||
trace!(target: "sync", "Ignored unexpected block headers");
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
if sync.state == SyncState::Waiting {
|
||||
trace!(target: "sync", "Ignored block headers while waiting");
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let result = {
|
||||
let downloader = match block_set {
|
||||
BlockSet::NewBlocks => &mut sync.new_blocks,
|
||||
BlockSet::OldBlocks => {
|
||||
match sync.old_blocks {
|
||||
None => {
|
||||
trace!(target: "sync", "Ignored block headers while block download is inactive");
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
Some(ref mut blocks) => blocks,
|
||||
}
|
||||
}
|
||||
};
|
||||
downloader.import_headers(io, r, expected_hash)
|
||||
};
|
||||
|
||||
match result {
|
||||
Err(DownloaderImportError::Useless) => {
|
||||
sync.deactivate_peer(io, peer_id);
|
||||
},
|
||||
Err(DownloaderImportError::Invalid) => {
|
||||
io.disable_peer(peer_id);
|
||||
sync.deactivate_peer(io, peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
Ok(DownloadAction::Reset) => {
|
||||
// mark all outstanding requests as expired
|
||||
trace!("Resetting downloads for {:?}", block_set);
|
||||
for (_, ref mut p) in sync.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) {
|
||||
p.reset_asking();
|
||||
}
|
||||
|
||||
}
|
||||
Ok(DownloadAction::None) => {},
|
||||
}
|
||||
|
||||
sync.collect_blocks(io, block_set);
|
||||
// give a task to the same peer first if received valuable headers.
|
||||
sync.sync_peer(io, peer_id, false);
|
||||
// give tasks to other peers
|
||||
sync.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called by peer once it has new block receipts
|
||||
fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
sync.clear_peer_download(peer_id);
|
||||
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
|
||||
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) {
|
||||
trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
let item_count = r.item_count()?;
|
||||
trace!(target: "sync", "{} -> BlockReceipts ({} entries)", peer_id, item_count);
|
||||
if item_count == 0 {
|
||||
sync.deactivate_peer(io, peer_id);
|
||||
}
|
||||
else if sync.state == SyncState::Waiting {
|
||||
trace!(target: "sync", "Ignored block receipts while waiting");
|
||||
}
|
||||
else
|
||||
{
|
||||
let result = {
|
||||
let downloader = match block_set {
|
||||
BlockSet::NewBlocks => &mut sync.new_blocks,
|
||||
BlockSet::OldBlocks => match sync.old_blocks {
|
||||
None => {
|
||||
trace!(target: "sync", "Ignored block headers while block download is inactive");
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
Some(ref mut blocks) => blocks,
|
||||
}
|
||||
};
|
||||
downloader.import_receipts(io, r)
|
||||
};
|
||||
|
||||
match result {
|
||||
Err(DownloaderImportError::Invalid) => {
|
||||
io.disable_peer(peer_id);
|
||||
sync.deactivate_peer(io, peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
Err(DownloaderImportError::Useless) => {
|
||||
sync.deactivate_peer(io, peer_id);
|
||||
},
|
||||
Ok(()) => (),
|
||||
}
|
||||
|
||||
sync.collect_blocks(io, block_set);
|
||||
sync.sync_peer(io, peer_id, false);
|
||||
}
|
||||
sync.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when snapshot manifest is downloaded from a peer.
|
||||
fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
sync.clear_peer_download(peer_id);
|
||||
if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || sync.state != SyncState::SnapshotManifest {
|
||||
trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let manifest_rlp = r.at(0)?;
|
||||
let manifest = match ManifestData::from_rlp(manifest_rlp.as_raw()) {
|
||||
Err(e) => {
|
||||
trace!(target: "sync", "{}: Ignored bad manifest: {:?}", peer_id, e);
|
||||
io.disable_peer(peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
Ok(manifest) => manifest,
|
||||
};
|
||||
|
||||
let is_supported_version = io.snapshot_service().supported_versions()
|
||||
.map_or(false, |(l, h)| manifest.version >= l && manifest.version <= h);
|
||||
|
||||
if !is_supported_version {
|
||||
trace!(target: "sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version);
|
||||
io.disable_peer(peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
sync.snapshot.reset_to(&manifest, &keccak(manifest_rlp.as_raw()));
|
||||
io.snapshot_service().begin_restore(manifest);
|
||||
sync.state = SyncState::SnapshotData;
|
||||
|
||||
// give a task to the same peer first.
|
||||
sync.sync_peer(io, peer_id, false);
|
||||
// give tasks to other peers
|
||||
sync.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when snapshot data is downloaded from a peer.
|
||||
fn on_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
sync.clear_peer_download(peer_id);
|
||||
if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || (sync.state != SyncState::SnapshotData && sync.state != SyncState::SnapshotWaiting) {
|
||||
trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// check service status
|
||||
let status = io.snapshot_service().status();
|
||||
match status {
|
||||
RestorationStatus::Inactive | RestorationStatus::Failed => {
|
||||
trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
|
||||
sync.state = SyncState::WaitingPeers;
|
||||
|
||||
// only note bad if restoration failed.
|
||||
if let (Some(hash), RestorationStatus::Failed) = (sync.snapshot.snapshot_hash(), status) {
|
||||
trace!(target: "sync", "Noting snapshot hash {} as bad", hash);
|
||||
sync.snapshot.note_bad(hash);
|
||||
}
|
||||
|
||||
sync.snapshot.clear();
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
RestorationStatus::Ongoing { .. } => {
|
||||
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
|
||||
},
|
||||
}
|
||||
|
||||
let snapshot_data: Bytes = r.val_at(0)?;
|
||||
match sync.snapshot.validate_chunk(&snapshot_data) {
|
||||
Ok(ChunkType::Block(hash)) => {
|
||||
trace!(target: "sync", "{}: Processing block chunk", peer_id);
|
||||
io.snapshot_service().restore_block_chunk(hash, snapshot_data);
|
||||
}
|
||||
Ok(ChunkType::State(hash)) => {
|
||||
trace!(target: "sync", "{}: Processing state chunk", peer_id);
|
||||
io.snapshot_service().restore_state_chunk(hash, snapshot_data);
|
||||
}
|
||||
Err(()) => {
|
||||
trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id);
|
||||
io.disconnect_peer(peer_id);
|
||||
sync.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if sync.snapshot.is_complete() {
|
||||
// wait for snapshot restoration process to complete
|
||||
sync.state = SyncState::SnapshotWaiting;
|
||||
}
|
||||
// give a task to the same peer first.
|
||||
sync.sync_peer(io, peer_id, false);
|
||||
// give tasks to other peers
|
||||
sync.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called by peer to report status
|
||||
fn on_peer_status(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
sync.handshaking_peers.remove(&peer_id);
|
||||
let protocol_version: u8 = r.val_at(0)?;
|
||||
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
|
||||
let peer = PeerInfo {
|
||||
protocol_version: protocol_version,
|
||||
network_id: r.val_at(1)?,
|
||||
difficulty: Some(r.val_at(2)?),
|
||||
latest_hash: r.val_at(3)?,
|
||||
genesis: r.val_at(4)?,
|
||||
asking: PeerAsking::Nothing,
|
||||
asking_blocks: Vec::new(),
|
||||
asking_hash: None,
|
||||
ask_time: Instant::now(),
|
||||
last_sent_transactions: HashSet::new(),
|
||||
expired: false,
|
||||
confirmation: if sync.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
||||
asking_snapshot_data: None,
|
||||
snapshot_hash: if warp_protocol { Some(r.val_at(5)?) } else { None },
|
||||
snapshot_number: if warp_protocol { Some(r.val_at(6)?) } else { None },
|
||||
block_set: None,
|
||||
};
|
||||
|
||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})",
|
||||
peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number);
|
||||
if io.is_expired() {
|
||||
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if sync.peers.contains_key(&peer_id) {
|
||||
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
|
||||
return Ok(());
|
||||
}
|
||||
let chain_info = io.chain().chain_info();
|
||||
if peer.genesis != chain_info.genesis_hash {
|
||||
io.disable_peer(peer_id);
|
||||
trace!(target: "sync", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, chain_info.genesis_hash, peer.genesis);
|
||||
return Ok(());
|
||||
}
|
||||
if peer.network_id != sync.network_id {
|
||||
io.disable_peer(peer_id);
|
||||
trace!(target: "sync", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, sync.network_id, peer.network_id);
|
||||
return Ok(());
|
||||
}
|
||||
if (warp_protocol && peer.protocol_version != PAR_PROTOCOL_VERSION_1 && peer.protocol_version != PAR_PROTOCOL_VERSION_2 && peer.protocol_version != PAR_PROTOCOL_VERSION_3)
|
||||
|| (!warp_protocol && peer.protocol_version != ETH_PROTOCOL_VERSION_63 && peer.protocol_version != ETH_PROTOCOL_VERSION_62) {
|
||||
io.disable_peer(peer_id);
|
||||
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if sync.sync_start_time.is_none() {
|
||||
sync.sync_start_time = Some(Instant::now());
|
||||
}
|
||||
|
||||
sync.peers.insert(peer_id.clone(), peer);
|
||||
// Don't activate peer immediatelly when searching for common block.
|
||||
// Let the current sync round complete first.
|
||||
sync.active_peers.insert(peer_id.clone());
|
||||
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
|
||||
if let Some((fork_block, _)) = sync.fork_block {
|
||||
SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
|
||||
} else {
|
||||
SyncHandler::on_peer_confirmed(sync, io, peer_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when peer sends us new transactions
|
||||
fn on_peer_transactions(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
// Accept transactions only when fully synced
|
||||
if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
|
||||
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let item_count = r.item_count()?;
|
||||
trace!(target: "sync", "{:02} -> Transactions ({} entries)", peer_id, item_count);
|
||||
let mut transactions = Vec::with_capacity(item_count);
|
||||
for i in 0 .. item_count {
|
||||
let rlp = r.at(i)?;
|
||||
let tx = rlp.as_raw().to_vec();
|
||||
transactions.push(tx);
|
||||
}
|
||||
io.chain().queue_transactions(transactions, peer_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when peer sends us signed private transaction packet
|
||||
fn on_signed_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
trace!(target: "sync", "Received signed private transaction packet from {:?}", peer_id);
|
||||
if let Err(e) = sync.private_tx_handler.import_signed_private_transaction(r.as_raw()) {
|
||||
trace!(target: "sync", "Ignoring the message, error queueing: {}", e);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when peer sends us new private transaction packet
|
||||
fn on_private_transaction(sync: &ChainSync, _io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
|
||||
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "{} Ignoring packet from unconfirmed/unknown peer", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
trace!(target: "sync", "Received private transaction packet from {:?}", peer_id);
|
||||
|
||||
if let Err(e) = sync.private_tx_handler.import_private_transaction(r.as_raw()) {
|
||||
trace!(target: "sync", "Ignoring the message, error queueing: {}", e);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use ethcore::client::{ChainInfo, EachBlockWith, TestBlockChainClient};
|
||||
use parking_lot::RwLock;
|
||||
use rlp::{Rlp};
|
||||
use std::collections::{VecDeque};
|
||||
use tests::helpers::{TestIo};
|
||||
use tests::snapshot::TestSnapshotService;
|
||||
|
||||
use super::*;
|
||||
use super::super::tests::{
|
||||
dummy_sync_with_peer,
|
||||
get_dummy_block,
|
||||
get_dummy_blocks,
|
||||
get_dummy_hashes,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn handles_peer_new_hashes() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let hashes_data = get_dummy_hashes();
|
||||
let hashes_rlp = Rlp::new(&hashes_data);
|
||||
|
||||
let result = SyncHandler::on_peer_new_hashes(&mut sync, &mut io, 0, &hashes_rlp);
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handles_peer_new_block_malformed() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
|
||||
let block_data = get_dummy_block(11, client.chain_info().best_block_hash);
|
||||
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
//sync.have_common_block = true;
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let block = Rlp::new(&block_data);
|
||||
|
||||
let result = SyncHandler::on_peer_new_block(&mut sync, &mut io, 0, &block);
|
||||
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handles_peer_new_block() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
|
||||
let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);
|
||||
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let block = Rlp::new(&block_data);
|
||||
|
||||
let result = SyncHandler::on_peer_new_block(&mut sync, &mut io, 0, &block);
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handles_peer_new_block_empty() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let empty_data = vec![];
|
||||
let block = Rlp::new(&empty_data);
|
||||
|
||||
let result = SyncHandler::on_peer_new_block(&mut sync, &mut io, 0, &block);
|
||||
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handles_peer_new_hashes_empty() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let empty_hashes_data = vec![];
|
||||
let hashes_rlp = Rlp::new(&empty_hashes_data);
|
||||
|
||||
let result = SyncHandler::on_peer_new_hashes(&mut sync, &mut io, 0, &hashes_rlp);
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
}
|
1379
ethcore/sync/src/chain/mod.rs
Normal file
1379
ethcore/sync/src/chain/mod.rs
Normal file
File diff suppressed because it is too large
Load Diff
636
ethcore/sync/src/chain/propagator.rs
Normal file
636
ethcore/sync/src/chain/propagator.rs
Normal file
@ -0,0 +1,636 @@
|
||||
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use bytes::Bytes;
|
||||
use ethereum_types::H256;
|
||||
use ethcore::client::BlockChainInfo;
|
||||
use ethcore::header::BlockNumber;
|
||||
use network::{PeerId, PacketId};
|
||||
use rand::Rng;
|
||||
use rlp::{Encodable, RlpStream};
|
||||
use sync_io::SyncIo;
|
||||
use std::cmp;
|
||||
use std::collections::HashSet;
|
||||
use transaction::SignedTransaction;
|
||||
|
||||
use super::{
|
||||
random,
|
||||
ChainSync,
|
||||
MAX_PEER_LAG_PROPAGATION,
|
||||
MAX_PEERS_PROPAGATION,
|
||||
MAX_TRANSACTION_PACKET_SIZE,
|
||||
MAX_TRANSACTIONS_TO_PROPAGATE,
|
||||
MIN_PEERS_PROPAGATION,
|
||||
CONSENSUS_DATA_PACKET,
|
||||
NEW_BLOCK_HASHES_PACKET,
|
||||
NEW_BLOCK_PACKET,
|
||||
PRIVATE_TRANSACTION_PACKET,
|
||||
SIGNED_PRIVATE_TRANSACTION_PACKET,
|
||||
TRANSACTIONS_PACKET,
|
||||
};
|
||||
|
||||
/// Checks if peer is able to process service transactions
|
||||
fn accepts_service_transaction(client_id: &str) -> bool {
|
||||
// Parity versions starting from this will accept service-transactions
|
||||
const SERVICE_TRANSACTIONS_VERSION: (u32, u32) = (1u32, 6u32);
|
||||
// Parity client string prefix
|
||||
const PARITY_CLIENT_ID_PREFIX: &'static str = "Parity/v";
|
||||
|
||||
if !client_id.starts_with(PARITY_CLIENT_ID_PREFIX) {
|
||||
return false;
|
||||
}
|
||||
let ver: Vec<u32> = client_id[PARITY_CLIENT_ID_PREFIX.len()..].split('.')
|
||||
.take(2)
|
||||
.filter_map(|s| s.parse().ok())
|
||||
.collect();
|
||||
ver.len() == 2 && (ver[0] > SERVICE_TRANSACTIONS_VERSION.0 || (ver[0] == SERVICE_TRANSACTIONS_VERSION.0 && ver[1] >= SERVICE_TRANSACTIONS_VERSION.1))
|
||||
}
|
||||
|
||||
/// The Chain Sync Propagator: propagates data to peers
|
||||
pub struct SyncPropagator;
|
||||
|
||||
impl SyncPropagator {
|
||||
/// propagates latest block to a set of peers
|
||||
pub fn propagate_blocks(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, blocks: &[H256], peers: &[PeerId]) -> usize {
|
||||
trace!(target: "sync", "Sending NewBlocks to {:?}", peers);
|
||||
let mut sent = 0;
|
||||
for peer_id in peers {
|
||||
if blocks.is_empty() {
|
||||
let rlp = ChainSync::create_latest_block_rlp(io.chain());
|
||||
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
|
||||
} else {
|
||||
for h in blocks {
|
||||
let rlp = ChainSync::create_new_block_rlp(io.chain(), h);
|
||||
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
|
||||
}
|
||||
}
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
||||
peer.latest_hash = chain_info.best_block_hash.clone();
|
||||
}
|
||||
sent += 1;
|
||||
}
|
||||
sent
|
||||
}
|
||||
|
||||
/// propagates new known hashes to all peers
|
||||
pub fn propagate_new_hashes(sync: &mut ChainSync, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize {
|
||||
trace!(target: "sync", "Sending NewHashes to {:?}", peers);
|
||||
let mut sent = 0;
|
||||
let last_parent = *io.chain().best_block_header().parent_hash();
|
||||
for peer_id in peers {
|
||||
sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) {
|
||||
Some(rlp) => {
|
||||
{
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
||||
peer.latest_hash = chain_info.best_block_hash.clone();
|
||||
}
|
||||
}
|
||||
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp);
|
||||
1
|
||||
},
|
||||
None => 0
|
||||
}
|
||||
}
|
||||
sent
|
||||
}
|
||||
|
||||
/// propagates new transactions to all peers
|
||||
pub fn propagate_new_transactions(sync: &mut ChainSync, io: &mut SyncIo) -> usize {
|
||||
// Early out if nobody to send to.
|
||||
if sync.peers.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let transactions = io.chain().ready_transactions();
|
||||
if transactions.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let (transactions, service_transactions): (Vec<_>, Vec<_>) = transactions.iter()
|
||||
.map(|tx| tx.signed())
|
||||
.partition(|tx| !tx.gas_price.is_zero());
|
||||
|
||||
// usual transactions could be propagated to all peers
|
||||
let mut affected_peers = HashSet::new();
|
||||
if !transactions.is_empty() {
|
||||
let peers = SyncPropagator::select_peers_for_transactions(sync, |_| true);
|
||||
affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, peers, transactions);
|
||||
}
|
||||
|
||||
// most of times service_transactions will be empty
|
||||
// => there's no need to merge packets
|
||||
if !service_transactions.is_empty() {
|
||||
let service_transactions_peers = SyncPropagator::select_peers_for_transactions(sync, |peer_id| accepts_service_transaction(&io.peer_info(*peer_id)));
|
||||
let service_transactions_affected_peers = SyncPropagator::propagate_transactions_to_peers(sync, io, service_transactions_peers, service_transactions);
|
||||
affected_peers.extend(&service_transactions_affected_peers);
|
||||
}
|
||||
|
||||
affected_peers.len()
|
||||
}
|
||||
|
||||
fn propagate_transactions_to_peers(sync: &mut ChainSync, io: &mut SyncIo, peers: Vec<PeerId>, transactions: Vec<&SignedTransaction>) -> HashSet<PeerId> {
|
||||
let all_transactions_hashes = transactions.iter()
|
||||
.map(|tx| tx.hash())
|
||||
.collect::<HashSet<H256>>();
|
||||
let all_transactions_rlp = {
|
||||
let mut packet = RlpStream::new_list(transactions.len());
|
||||
for tx in &transactions { packet.append(&**tx); }
|
||||
packet.out()
|
||||
};
|
||||
|
||||
// Clear old transactions from stats
|
||||
sync.transactions_stats.retain(&all_transactions_hashes);
|
||||
|
||||
// sqrt(x)/x scaled to max u32
|
||||
let block_number = io.chain().chain_info().best_block_number;
|
||||
|
||||
let lucky_peers = {
|
||||
peers.into_iter()
|
||||
.filter_map(|peer_id| {
|
||||
let stats = &mut sync.transactions_stats;
|
||||
let peer_info = sync.peers.get_mut(&peer_id)
|
||||
.expect("peer_id is form peers; peers is result of select_peers_for_transactions; select_peers_for_transactions selects peers from self.peers; qed");
|
||||
|
||||
// Send all transactions
|
||||
if peer_info.last_sent_transactions.is_empty() {
|
||||
// update stats
|
||||
for hash in &all_transactions_hashes {
|
||||
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
|
||||
stats.propagated(hash, id, block_number);
|
||||
}
|
||||
peer_info.last_sent_transactions = all_transactions_hashes.clone();
|
||||
return Some((peer_id, all_transactions_hashes.len(), all_transactions_rlp.clone()));
|
||||
}
|
||||
|
||||
// Get hashes of all transactions to send to this peer
|
||||
let to_send = all_transactions_hashes.difference(&peer_info.last_sent_transactions)
|
||||
.take(MAX_TRANSACTIONS_TO_PROPAGATE)
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>();
|
||||
if to_send.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Construct RLP
|
||||
let (packet, to_send) = {
|
||||
let mut to_send = to_send;
|
||||
let mut packet = RlpStream::new();
|
||||
packet.begin_unbounded_list();
|
||||
let mut pushed = 0;
|
||||
for tx in &transactions {
|
||||
let hash = tx.hash();
|
||||
if to_send.contains(&hash) {
|
||||
let mut transaction = RlpStream::new();
|
||||
tx.rlp_append(&mut transaction);
|
||||
let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE);
|
||||
if !appended {
|
||||
// Maximal packet size reached just proceed with sending
|
||||
debug!("Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
|
||||
to_send = to_send.into_iter().take(pushed).collect();
|
||||
break;
|
||||
}
|
||||
pushed += 1;
|
||||
}
|
||||
}
|
||||
packet.complete_unbounded_list();
|
||||
(packet, to_send)
|
||||
};
|
||||
|
||||
// Update stats
|
||||
let id = io.peer_session_info(peer_id).and_then(|info| info.id);
|
||||
for hash in &to_send {
|
||||
// update stats
|
||||
stats.propagated(hash, id, block_number);
|
||||
}
|
||||
|
||||
peer_info.last_sent_transactions = all_transactions_hashes
|
||||
.intersection(&peer_info.last_sent_transactions)
|
||||
.chain(&to_send)
|
||||
.cloned()
|
||||
.collect();
|
||||
Some((peer_id, to_send.len(), packet.out()))
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
// Send RLPs
|
||||
let mut peers = HashSet::new();
|
||||
if lucky_peers.len() > 0 {
|
||||
let mut max_sent = 0;
|
||||
let lucky_peers_len = lucky_peers.len();
|
||||
for (peer_id, sent, rlp) in lucky_peers {
|
||||
peers.insert(peer_id);
|
||||
SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
|
||||
trace!(target: "sync", "{:02} <- Transactions ({} entries)", peer_id, sent);
|
||||
max_sent = cmp::max(max_sent, sent);
|
||||
}
|
||||
debug!(target: "sync", "Sent up to {} transactions to {} peers.", max_sent, lucky_peers_len);
|
||||
}
|
||||
|
||||
peers
|
||||
}
|
||||
|
||||
pub fn propagate_latest_blocks(sync: &mut ChainSync, io: &mut SyncIo, sealed: &[H256]) {
|
||||
let chain_info = io.chain().chain_info();
|
||||
if (((chain_info.best_block_number as i64) - (sync.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION {
|
||||
let mut peers = sync.get_lagging_peers(&chain_info);
|
||||
if sealed.is_empty() {
|
||||
let hashes = SyncPropagator::propagate_new_hashes(sync, &chain_info, io, &peers);
|
||||
peers = ChainSync::select_random_peers(&peers);
|
||||
let blocks = SyncPropagator::propagate_blocks(sync, &chain_info, io, sealed, &peers);
|
||||
if blocks != 0 || hashes != 0 {
|
||||
trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes);
|
||||
}
|
||||
} else {
|
||||
SyncPropagator::propagate_blocks(sync, &chain_info, io, sealed, &peers);
|
||||
SyncPropagator::propagate_new_hashes(sync, &chain_info, io, &peers);
|
||||
trace!(target: "sync", "Sent sealed block to all peers");
|
||||
};
|
||||
}
|
||||
sync.last_sent_block_number = chain_info.best_block_number;
|
||||
}
|
||||
|
||||
/// Distribute valid proposed blocks to subset of current peers.
|
||||
pub fn propagate_proposed_blocks(sync: &mut ChainSync, io: &mut SyncIo, proposed: &[Bytes]) {
|
||||
let peers = sync.get_consensus_peers();
|
||||
trace!(target: "sync", "Sending proposed blocks to {:?}", peers);
|
||||
for block in proposed {
|
||||
let rlp = ChainSync::create_block_rlp(
|
||||
block,
|
||||
io.chain().chain_info().total_difficulty
|
||||
);
|
||||
for peer_id in &peers {
|
||||
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast consensus message to peers.
|
||||
pub fn propagate_consensus_packet(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) {
|
||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers());
|
||||
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
|
||||
for peer_id in lucky_peers {
|
||||
SyncPropagator::send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast private transaction message to peers.
|
||||
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) {
|
||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers());
|
||||
trace!(target: "sync", "Sending private transaction packet to {:?}", lucky_peers);
|
||||
for peer_id in lucky_peers {
|
||||
SyncPropagator::send_packet(io, peer_id, PRIVATE_TRANSACTION_PACKET, packet.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast signed private transaction message to peers.
|
||||
pub fn propagate_signed_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, packet: Bytes) {
|
||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers());
|
||||
trace!(target: "sync", "Sending signed private transaction packet to {:?}", lucky_peers);
|
||||
for peer_id in lucky_peers {
|
||||
SyncPropagator::send_packet(io, peer_id, SIGNED_PRIVATE_TRANSACTION_PACKET, packet.clone());
|
||||
}
|
||||
}
|
||||
|
||||
fn select_peers_for_transactions<F>(sync: &ChainSync, filter: F) -> Vec<PeerId>
|
||||
where F: Fn(&PeerId) -> bool {
|
||||
// sqrt(x)/x scaled to max u32
|
||||
let fraction = ((sync.peers.len() as f64).powf(-0.5) * (u32::max_value() as f64).round()) as u32;
|
||||
let small = sync.peers.len() < MIN_PEERS_PROPAGATION;
|
||||
|
||||
let mut random = random::new();
|
||||
sync.peers.keys()
|
||||
.cloned()
|
||||
.filter(filter)
|
||||
.filter(|_| small || random.next_u32() < fraction)
|
||||
.take(MAX_PEERS_PROPAGATION)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Generic packet sender
|
||||
fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
|
||||
if let Err(e) = sync.send(peer_id, packet_id, packet) {
|
||||
debug!(target:"sync", "Error sending packet: {:?}", e);
|
||||
sync.disconnect_peer(peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use ethcore::client::{BlockInfo, ChainInfo, EachBlockWith, TestBlockChainClient};
|
||||
use parking_lot::RwLock;
|
||||
use private_tx::NoopPrivateTxHandler;
|
||||
use rlp::{Rlp};
|
||||
use std::collections::{VecDeque};
|
||||
use tests::helpers::{TestIo};
|
||||
use tests::snapshot::TestSnapshotService;
|
||||
|
||||
use super::{*, super::{*, tests::*}};
|
||||
|
||||
#[test]
|
||||
fn sends_new_hashes_to_lagging_peer() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let peers = sync.get_lagging_peers(&chain_info);
|
||||
let peer_count = SyncPropagator::propagate_new_hashes(&mut sync, &chain_info, &mut io, &peers);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.packets.len());
|
||||
// 1 peer should be updated
|
||||
assert_eq!(1, peer_count);
|
||||
// NEW_BLOCK_HASHES_PACKET
|
||||
assert_eq!(0x01, io.packets[0].packet_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sends_latest_block_to_lagging_peer() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
let peers = sync.get_lagging_peers(&chain_info);
|
||||
let peer_count = SyncPropagator::propagate_blocks(&mut sync, &chain_info, &mut io, &[], &peers);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.packets.len());
|
||||
// 1 peer should be updated
|
||||
assert_eq!(1, peer_count);
|
||||
// NEW_BLOCK_PACKET
|
||||
assert_eq!(0x07, io.packets[0].packet_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sends_sealed_block() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let hash = client.block_hash(BlockId::Number(99)).unwrap();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
let peers = sync.get_lagging_peers(&chain_info);
|
||||
let peer_count = SyncPropagator::propagate_blocks(&mut sync ,&chain_info, &mut io, &[hash.clone()], &peers);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.packets.len());
|
||||
// 1 peer should be updated
|
||||
assert_eq!(1, peer_count);
|
||||
// NEW_BLOCK_PACKET
|
||||
assert_eq!(0x07, io.packets[0].packet_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sends_proposed_block() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(2, EachBlockWith::Uncle);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let block = client.block(BlockId::Latest).unwrap().into_inner();
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
|
||||
sync.peers.insert(0,
|
||||
PeerInfo {
|
||||
// Messaging protocol
|
||||
protocol_version: 2,
|
||||
genesis: H256::zero(),
|
||||
network_id: 0,
|
||||
latest_hash: client.block_hash_delta_minus(1),
|
||||
difficulty: None,
|
||||
asking: PeerAsking::Nothing,
|
||||
asking_blocks: Vec::new(),
|
||||
asking_hash: None,
|
||||
ask_time: Instant::now(),
|
||||
last_sent_transactions: HashSet::new(),
|
||||
expired: false,
|
||||
confirmation: ForkConfirmation::Confirmed,
|
||||
snapshot_number: None,
|
||||
snapshot_hash: None,
|
||||
asking_snapshot_data: None,
|
||||
block_set: None,
|
||||
});
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
SyncPropagator::propagate_proposed_blocks(&mut sync, &mut io, &[block]);
|
||||
|
||||
// 1 message should be sent
|
||||
assert_eq!(1, io.packets.len());
|
||||
// NEW_BLOCK_PACKET
|
||||
assert_eq!(0x07, io.packets[0].packet_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn propagates_transactions() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
// Try to propagate same transactions for the second time
|
||||
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
// Even after new block transactions should not be propagated twice
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
|
||||
// Try to propagate same transactions for the third time
|
||||
let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
|
||||
// 1 message should be send
|
||||
assert_eq!(1, io.packets.len());
|
||||
// 1 peer should be updated but only once
|
||||
assert_eq!(1, peer_count);
|
||||
assert_eq!(0, peer_count2);
|
||||
assert_eq!(0, peer_count3);
|
||||
// TRANSACTIONS_PACKET
|
||||
assert_eq!(0x02, io.packets[0].packet_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn does_not_propagate_new_transactions_after_new_block() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
io.chain.insert_transaction_to_queue();
|
||||
// New block import should not trigger propagation.
|
||||
// (we only propagate on timeout)
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
|
||||
|
||||
// 2 message should be send
|
||||
assert_eq!(1, io.packets.len());
|
||||
// 1 peer should receive the message
|
||||
assert_eq!(1, peer_count);
|
||||
// TRANSACTIONS_PACKET
|
||||
assert_eq!(0x02, io.packets[0].packet_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn does_not_fail_for_no_peers() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
client.insert_transaction_to_queue();
|
||||
// Sync with no peers
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[], &[]);
|
||||
// Try to propagate same transactions for the second time
|
||||
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
|
||||
assert_eq!(0, io.packets.len());
|
||||
assert_eq!(0, peer_count);
|
||||
assert_eq!(0, peer_count2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn propagates_transactions_without_alternating() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
// should sent some
|
||||
{
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
let peer_count = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
assert_eq!(1, io.packets.len());
|
||||
assert_eq!(1, peer_count);
|
||||
}
|
||||
// Insert some more
|
||||
client.insert_transaction_to_queue();
|
||||
let (peer_count2, peer_count3) = {
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
// Propagate new transactions
|
||||
let peer_count2 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
// And now the peer should have all transactions
|
||||
let peer_count3 = SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
(peer_count2, peer_count3)
|
||||
};
|
||||
|
||||
// 2 message should be send (in total)
|
||||
assert_eq!(2, queue.read().len());
|
||||
// 1 peer should be updated but only once after inserting new transaction
|
||||
assert_eq!(1, peer_count2);
|
||||
assert_eq!(0, peer_count3);
|
||||
// TRANSACTIONS_PACKET
|
||||
assert_eq!(0x02, queue.read()[0].packet_id);
|
||||
assert_eq!(0x02, queue.read()[1].packet_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_maintain_transations_propagation_stats() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Uncle);
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
|
||||
let stats = sync.transactions_stats();
|
||||
assert_eq!(stats.len(), 1, "Should maintain stats for single transaction.")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_propagate_service_transaction_to_selected_peers_only() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.insert_transaction_with_gas_price_to_queue(U256::zero());
|
||||
let block_hash = client.block_hash_delta_minus(1);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
// when peer#1 is Geth
|
||||
insert_dummy_peer(&mut sync, 1, block_hash);
|
||||
io.peers_info.insert(1, "Geth".to_owned());
|
||||
// and peer#2 is Parity, accepting service transactions
|
||||
insert_dummy_peer(&mut sync, 2, block_hash);
|
||||
io.peers_info.insert(2, "Parity/v1.6".to_owned());
|
||||
// and peer#3 is Parity, discarding service transactions
|
||||
insert_dummy_peer(&mut sync, 3, block_hash);
|
||||
io.peers_info.insert(3, "Parity/v1.5".to_owned());
|
||||
// and peer#4 is Parity, accepting service transactions
|
||||
insert_dummy_peer(&mut sync, 4, block_hash);
|
||||
io.peers_info.insert(4, "Parity/v1.7.3-ABCDEFGH".to_owned());
|
||||
|
||||
// and new service transaction is propagated to peers
|
||||
SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
|
||||
// peer#2 && peer#4 are receiving service transaction
|
||||
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 2)); // TRANSACTIONS_PACKET
|
||||
assert!(io.packets.iter().any(|p| p.packet_id == 0x02 && p.recipient == 4)); // TRANSACTIONS_PACKET
|
||||
assert_eq!(io.packets.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_propagate_service_transaction_is_sent_as_separate_message() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let tx1_hash = client.insert_transaction_to_queue();
|
||||
let tx2_hash = client.insert_transaction_with_gas_price_to_queue(U256::zero());
|
||||
let block_hash = client.block_hash_delta_minus(1);
|
||||
let mut sync = ChainSync::new(SyncConfig::default(), &client, Arc::new(NoopPrivateTxHandler));
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
// when peer#1 is Parity, accepting service transactions
|
||||
insert_dummy_peer(&mut sync, 1, block_hash);
|
||||
io.peers_info.insert(1, "Parity/v1.6".to_owned());
|
||||
|
||||
// and service + non-service transactions are propagated to peers
|
||||
SyncPropagator::propagate_new_transactions(&mut sync, &mut io);
|
||||
|
||||
// two separate packets for peer are queued:
|
||||
// 1) with non-service-transaction
|
||||
// 2) with service transaction
|
||||
let sent_transactions: Vec<UnverifiedTransaction> = io.packets.iter()
|
||||
.filter_map(|p| {
|
||||
if p.packet_id != 0x02 || p.recipient != 1 { // TRANSACTIONS_PACKET
|
||||
return None;
|
||||
}
|
||||
|
||||
let rlp = Rlp::new(&*p.data);
|
||||
let item_count = rlp.item_count().unwrap_or(0);
|
||||
if item_count != 1 {
|
||||
return None;
|
||||
}
|
||||
|
||||
rlp.at(0).ok().and_then(|r| r.as_val().ok())
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(sent_transactions.len(), 2);
|
||||
assert!(sent_transactions.iter().any(|tx| tx.hash() == tx1_hash));
|
||||
assert!(sent_transactions.iter().any(|tx| tx.hash() == tx2_hash));
|
||||
}
|
||||
}
|
154
ethcore/sync/src/chain/requester.rs
Normal file
154
ethcore/sync/src/chain/requester.rs
Normal file
@ -0,0 +1,154 @@
|
||||
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use api::WARP_SYNC_PROTOCOL_ID;
|
||||
use block_sync::BlockRequest;
|
||||
use bytes::Bytes;
|
||||
use ethcore::header::BlockNumber;
|
||||
use ethereum_types::H256;
|
||||
use network::{PeerId, PacketId};
|
||||
use rlp::RlpStream;
|
||||
use std::time::Instant;
|
||||
use sync_io::SyncIo;
|
||||
|
||||
use super::{
|
||||
BlockSet,
|
||||
ChainSync,
|
||||
PeerAsking,
|
||||
ETH_PACKET_COUNT,
|
||||
GET_BLOCK_BODIES_PACKET,
|
||||
GET_BLOCK_HEADERS_PACKET,
|
||||
GET_RECEIPTS_PACKET,
|
||||
GET_SNAPSHOT_DATA_PACKET,
|
||||
GET_SNAPSHOT_MANIFEST_PACKET,
|
||||
};
|
||||
|
||||
/// The Chain Sync Requester: requesting data to other peers
|
||||
pub struct SyncRequester;
|
||||
|
||||
impl SyncRequester {
|
||||
/// Perform block download request`
|
||||
pub fn request_blocks(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, request: BlockRequest, block_set: BlockSet) {
|
||||
match request {
|
||||
BlockRequest::Headers { start, count, skip } => {
|
||||
SyncRequester::request_headers_by_hash(sync, io, peer_id, &start, count, skip, false, block_set);
|
||||
},
|
||||
BlockRequest::Bodies { hashes } => {
|
||||
SyncRequester::request_bodies(sync, io, peer_id, hashes, block_set);
|
||||
},
|
||||
BlockRequest::Receipts { hashes } => {
|
||||
SyncRequester::request_receipts(sync, io, peer_id, hashes, block_set);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Request block bodies from a peer
|
||||
fn request_bodies(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>, set: BlockSet) {
|
||||
let mut rlp = RlpStream::new_list(hashes.len());
|
||||
trace!(target: "sync", "{} <- GetBlockBodies: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set);
|
||||
for h in &hashes {
|
||||
rlp.append(&h.clone());
|
||||
}
|
||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out());
|
||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||
peer.asking_blocks = hashes;
|
||||
peer.block_set = Some(set);
|
||||
}
|
||||
|
||||
/// Request headers from a peer by block number
|
||||
pub fn request_fork_header(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, n: BlockNumber) {
|
||||
trace!(target: "sync", "{} <- GetForkHeader: at {}", peer_id, n);
|
||||
let mut rlp = RlpStream::new_list(4);
|
||||
rlp.append(&n);
|
||||
rlp.append(&1u32);
|
||||
rlp.append(&0u32);
|
||||
rlp.append(&0u32);
|
||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Find some headers or blocks to download for a peer.
|
||||
pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
||||
// find chunk data to download
|
||||
if let Some(hash) = sync.snapshot.needed_chunk() {
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||
peer.asking_snapshot_data = Some(hash.clone());
|
||||
}
|
||||
SyncRequester::request_snapshot_chunk(sync, io, peer_id, &hash);
|
||||
}
|
||||
}
|
||||
|
||||
/// Request snapshot manifest from a peer.
|
||||
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
||||
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
|
||||
let rlp = RlpStream::new_list(0);
|
||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Request headers from a peer by block hash
|
||||
fn request_headers_by_hash(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, h: &H256, count: u64, skip: u64, reverse: bool, set: BlockSet) {
|
||||
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}, set = {:?}", peer_id, count, h, set);
|
||||
let mut rlp = RlpStream::new_list(4);
|
||||
rlp.append(h);
|
||||
rlp.append(&count);
|
||||
rlp.append(&skip);
|
||||
rlp.append(&if reverse {1u32} else {0u32});
|
||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||
peer.asking_hash = Some(h.clone());
|
||||
peer.block_set = Some(set);
|
||||
}
|
||||
|
||||
/// Request block receipts from a peer
|
||||
fn request_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>, set: BlockSet) {
|
||||
let mut rlp = RlpStream::new_list(hashes.len());
|
||||
trace!(target: "sync", "{} <- GetBlockReceipts: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set);
|
||||
for h in &hashes {
|
||||
rlp.append(&h.clone());
|
||||
}
|
||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GET_RECEIPTS_PACKET, rlp.out());
|
||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||
peer.asking_blocks = hashes;
|
||||
peer.block_set = Some(set);
|
||||
}
|
||||
|
||||
/// Request snapshot chunk from a peer.
|
||||
fn request_snapshot_chunk(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, chunk: &H256) {
|
||||
trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append(chunk);
|
||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GET_SNAPSHOT_DATA_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Generic request sender
|
||||
fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
|
||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||
if peer.asking != PeerAsking::Nothing {
|
||||
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
||||
}
|
||||
peer.asking = asking;
|
||||
peer.ask_time = Instant::now();
|
||||
let result = if packet_id >= ETH_PACKET_COUNT {
|
||||
io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
|
||||
} else {
|
||||
io.send(peer_id, packet_id, packet)
|
||||
};
|
||||
if let Err(e) = result {
|
||||
debug!(target:"sync", "Error sending request: {:?}", e);
|
||||
io.disconnect_peer(peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
446
ethcore/sync/src/chain/supplier.rs
Normal file
446
ethcore/sync/src/chain/supplier.rs
Normal file
@ -0,0 +1,446 @@
|
||||
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use bytes::Bytes;
|
||||
use ethcore::client::BlockId;
|
||||
use ethcore::header::BlockNumber;
|
||||
use ethereum_types::H256;
|
||||
use network::{self, PeerId};
|
||||
use parking_lot::RwLock;
|
||||
use rlp::{Rlp, RlpStream};
|
||||
use std::cmp;
|
||||
use sync_io::SyncIo;
|
||||
|
||||
use super::{
|
||||
ChainSync,
|
||||
RlpResponseResult,
|
||||
PacketDecodeError,
|
||||
BLOCK_BODIES_PACKET,
|
||||
BLOCK_HEADERS_PACKET,
|
||||
CONSENSUS_DATA_PACKET,
|
||||
GET_BLOCK_BODIES_PACKET,
|
||||
GET_BLOCK_HEADERS_PACKET,
|
||||
GET_NODE_DATA_PACKET,
|
||||
GET_RECEIPTS_PACKET,
|
||||
GET_SNAPSHOT_DATA_PACKET,
|
||||
GET_SNAPSHOT_MANIFEST_PACKET,
|
||||
MAX_BODIES_TO_SEND,
|
||||
MAX_HEADERS_TO_SEND,
|
||||
MAX_NODE_DATA_TO_SEND,
|
||||
MAX_RECEIPTS_HEADERS_TO_SEND,
|
||||
MAX_RECEIPTS_TO_SEND,
|
||||
NODE_DATA_PACKET,
|
||||
RECEIPTS_PACKET,
|
||||
SNAPSHOT_DATA_PACKET,
|
||||
SNAPSHOT_MANIFEST_PACKET,
|
||||
};
|
||||
|
||||
/// The Chain Sync Supplier: answers requests from peers with available data
|
||||
pub struct SyncSupplier;
|
||||
|
||||
impl SyncSupplier {
|
||||
/// Dispatch incoming requests and responses
|
||||
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
|
||||
let rlp = Rlp::new(data);
|
||||
let result = match packet_id {
|
||||
GET_BLOCK_BODIES_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
||||
SyncSupplier::return_block_bodies,
|
||||
|e| format!("Error sending block bodies: {:?}", e)),
|
||||
|
||||
GET_BLOCK_HEADERS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
||||
SyncSupplier::return_block_headers,
|
||||
|e| format!("Error sending block headers: {:?}", e)),
|
||||
|
||||
GET_RECEIPTS_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
||||
SyncSupplier::return_receipts,
|
||||
|e| format!("Error sending receipts: {:?}", e)),
|
||||
|
||||
GET_NODE_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
||||
SyncSupplier::return_node_data,
|
||||
|e| format!("Error sending nodes: {:?}", e)),
|
||||
|
||||
GET_SNAPSHOT_MANIFEST_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
||||
SyncSupplier::return_snapshot_manifest,
|
||||
|e| format!("Error sending snapshot manifest: {:?}", e)),
|
||||
|
||||
GET_SNAPSHOT_DATA_PACKET => SyncSupplier::return_rlp(io, &rlp, peer,
|
||||
SyncSupplier::return_snapshot_data,
|
||||
|e| format!("Error sending snapshot data: {:?}", e)),
|
||||
CONSENSUS_DATA_PACKET => ChainSync::on_consensus_packet(io, peer, &rlp),
|
||||
_ => {
|
||||
sync.write().on_packet(io, peer, packet_id, data);
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
result.unwrap_or_else(|e| {
|
||||
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
|
||||
})
|
||||
}
|
||||
|
||||
/// Respond to GetBlockHeaders request
|
||||
fn return_block_headers(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
// Packet layout:
|
||||
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
|
||||
let max_headers: usize = r.val_at(1)?;
|
||||
let skip: usize = r.val_at(2)?;
|
||||
let reverse: bool = r.val_at(3)?;
|
||||
let last = io.chain().chain_info().best_block_number;
|
||||
let number = if r.at(0)?.size() == 32 {
|
||||
// id is a hash
|
||||
let hash: H256 = r.val_at(0)?;
|
||||
trace!(target: "sync", "{} -> GetBlockHeaders (hash: {}, max: {}, skip: {}, reverse:{})", peer_id, hash, max_headers, skip, reverse);
|
||||
match io.chain().block_header(BlockId::Hash(hash)) {
|
||||
Some(hdr) => {
|
||||
let number = hdr.number().into();
|
||||
debug_assert_eq!(hdr.hash(), hash);
|
||||
|
||||
if max_headers == 1 || io.chain().block_hash(BlockId::Number(number)) != Some(hash) {
|
||||
// Non canonical header or single header requested
|
||||
// TODO: handle single-step reverse hashchains of non-canon hashes
|
||||
trace!(target:"sync", "Returning single header: {:?}", hash);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append_raw(&hdr.into_inner(), 1);
|
||||
return Ok(Some((BLOCK_HEADERS_PACKET, rlp)));
|
||||
}
|
||||
number
|
||||
}
|
||||
None => return Ok(Some((BLOCK_HEADERS_PACKET, RlpStream::new_list(0)))) //no such header, return nothing
|
||||
}
|
||||
} else {
|
||||
trace!(target: "sync", "{} -> GetBlockHeaders (number: {}, max: {}, skip: {}, reverse:{})", peer_id, r.val_at::<BlockNumber>(0)?, max_headers, skip, reverse);
|
||||
r.val_at(0)?
|
||||
};
|
||||
|
||||
let mut number = if reverse {
|
||||
cmp::min(last, number)
|
||||
} else {
|
||||
cmp::max(0, number)
|
||||
};
|
||||
let max_count = cmp::min(MAX_HEADERS_TO_SEND, max_headers);
|
||||
let mut count = 0;
|
||||
let mut data = Bytes::new();
|
||||
let inc = (skip + 1) as BlockNumber;
|
||||
let overlay = io.chain_overlay().read();
|
||||
|
||||
while number <= last && count < max_count {
|
||||
if let Some(hdr) = overlay.get(&number) {
|
||||
trace!(target: "sync", "{}: Returning cached fork header", peer_id);
|
||||
data.extend_from_slice(hdr);
|
||||
count += 1;
|
||||
} else if let Some(hdr) = io.chain().block_header(BlockId::Number(number)) {
|
||||
data.append(&mut hdr.into_inner());
|
||||
count += 1;
|
||||
} else {
|
||||
// No required block.
|
||||
break;
|
||||
}
|
||||
if reverse {
|
||||
if number <= inc || number == 0 {
|
||||
break;
|
||||
}
|
||||
number -= inc;
|
||||
}
|
||||
else {
|
||||
number += inc;
|
||||
}
|
||||
}
|
||||
let mut rlp = RlpStream::new_list(count as usize);
|
||||
rlp.append_raw(&data, count as usize);
|
||||
trace!(target: "sync", "{} -> GetBlockHeaders: returned {} entries", peer_id, count);
|
||||
Ok(Some((BLOCK_HEADERS_PACKET, rlp)))
|
||||
}
|
||||
|
||||
/// Respond to GetBlockBodies request
|
||||
fn return_block_bodies(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let mut count = r.item_count().unwrap_or(0);
|
||||
if count == 0 {
|
||||
debug!(target: "sync", "Empty GetBlockBodies request, ignoring.");
|
||||
return Ok(None);
|
||||
}
|
||||
count = cmp::min(count, MAX_BODIES_TO_SEND);
|
||||
let mut added = 0usize;
|
||||
let mut data = Bytes::new();
|
||||
for i in 0..count {
|
||||
if let Some(body) = io.chain().block_body(BlockId::Hash(r.val_at::<H256>(i)?)) {
|
||||
data.append(&mut body.into_inner());
|
||||
added += 1;
|
||||
}
|
||||
}
|
||||
let mut rlp = RlpStream::new_list(added);
|
||||
rlp.append_raw(&data, added);
|
||||
trace!(target: "sync", "{} -> GetBlockBodies: returned {} entries", peer_id, added);
|
||||
Ok(Some((BLOCK_BODIES_PACKET, rlp)))
|
||||
}
|
||||
|
||||
/// Respond to GetNodeData request
|
||||
fn return_node_data(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let mut count = r.item_count().unwrap_or(0);
|
||||
trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count);
|
||||
if count == 0 {
|
||||
debug!(target: "sync", "Empty GetNodeData request, ignoring.");
|
||||
return Ok(None);
|
||||
}
|
||||
count = cmp::min(count, MAX_NODE_DATA_TO_SEND);
|
||||
let mut added = 0usize;
|
||||
let mut data = Vec::new();
|
||||
for i in 0..count {
|
||||
if let Some(node) = io.chain().state_data(&r.val_at::<H256>(i)?) {
|
||||
data.push(node);
|
||||
added += 1;
|
||||
}
|
||||
}
|
||||
trace!(target: "sync", "{} -> GetNodeData: return {} entries", peer_id, added);
|
||||
let mut rlp = RlpStream::new_list(added);
|
||||
for d in data {
|
||||
rlp.append(&d);
|
||||
}
|
||||
Ok(Some((NODE_DATA_PACKET, rlp)))
|
||||
}
|
||||
|
||||
fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let mut count = rlp.item_count().unwrap_or(0);
|
||||
trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count);
|
||||
if count == 0 {
|
||||
debug!(target: "sync", "Empty GetReceipts request, ignoring.");
|
||||
return Ok(None);
|
||||
}
|
||||
count = cmp::min(count, MAX_RECEIPTS_HEADERS_TO_SEND);
|
||||
let mut added_headers = 0usize;
|
||||
let mut added_receipts = 0usize;
|
||||
let mut data = Bytes::new();
|
||||
for i in 0..count {
|
||||
if let Some(mut receipts_bytes) = io.chain().block_receipts(&rlp.val_at::<H256>(i)?) {
|
||||
data.append(&mut receipts_bytes);
|
||||
added_receipts += receipts_bytes.len();
|
||||
added_headers += 1;
|
||||
if added_receipts > MAX_RECEIPTS_TO_SEND { break; }
|
||||
}
|
||||
}
|
||||
let mut rlp_result = RlpStream::new_list(added_headers);
|
||||
rlp_result.append_raw(&data, added_headers);
|
||||
Ok(Some((RECEIPTS_PACKET, rlp_result)))
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotManifest request
|
||||
fn return_snapshot_manifest(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let count = r.item_count().unwrap_or(0);
|
||||
trace!(target: "sync", "{} -> GetSnapshotManifest", peer_id);
|
||||
if count != 0 {
|
||||
debug!(target: "sync", "Invalid GetSnapshotManifest request, ignoring.");
|
||||
return Ok(None);
|
||||
}
|
||||
let rlp = match io.snapshot_service().manifest() {
|
||||
Some(manifest) => {
|
||||
trace!(target: "sync", "{} <- SnapshotManifest", peer_id);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append_raw(&manifest.into_rlp(), 1);
|
||||
rlp
|
||||
},
|
||||
None => {
|
||||
trace!(target: "sync", "{}: No manifest to return", peer_id);
|
||||
RlpStream::new_list(0)
|
||||
}
|
||||
};
|
||||
Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp)))
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotData request
|
||||
fn return_snapshot_data(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let hash: H256 = r.val_at(0)?;
|
||||
trace!(target: "sync", "{} -> GetSnapshotData {:?}", peer_id, hash);
|
||||
let rlp = match io.snapshot_service().chunk(hash) {
|
||||
Some(data) => {
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
trace!(target: "sync", "{} <- SnapshotData", peer_id);
|
||||
rlp.append(&data);
|
||||
rlp
|
||||
},
|
||||
None => {
|
||||
RlpStream::new_list(0)
|
||||
}
|
||||
};
|
||||
Ok(Some((SNAPSHOT_DATA_PACKET, rlp)))
|
||||
}
|
||||
|
||||
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &Rlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
where FRlp : Fn(&SyncIo, &Rlp, PeerId) -> RlpResponseResult,
|
||||
FError : FnOnce(network::Error) -> String
|
||||
{
|
||||
let response = rlp_func(io, rlp, peer);
|
||||
match response {
|
||||
Err(e) => Err(e),
|
||||
Ok(Some((packet_id, rlp_stream))) => {
|
||||
io.respond(packet_id, rlp_stream.out()).unwrap_or_else(
|
||||
|e| debug!(target: "sync", "{:?}", error_func(e)));
|
||||
Ok(())
|
||||
}
|
||||
_ => Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::{VecDeque};
|
||||
use tests::helpers::{TestIo};
|
||||
use tests::snapshot::TestSnapshotService;
|
||||
use ethereum_types::{H256};
|
||||
use parking_lot::RwLock;
|
||||
use bytes::Bytes;
|
||||
use rlp::{Rlp, RlpStream};
|
||||
use super::{*, super::tests::*};
|
||||
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
|
||||
|
||||
#[test]
|
||||
fn return_block_headers() {
|
||||
use ethcore::views::HeaderView;
|
||||
fn make_hash_req(h: &H256, count: usize, skip: usize, reverse: bool) -> Bytes {
|
||||
let mut rlp = RlpStream::new_list(4);
|
||||
rlp.append(h);
|
||||
rlp.append(&count);
|
||||
rlp.append(&skip);
|
||||
rlp.append(&if reverse {1u32} else {0u32});
|
||||
rlp.out()
|
||||
}
|
||||
|
||||
fn make_num_req(n: usize, count: usize, skip: usize, reverse: bool) -> Bytes {
|
||||
let mut rlp = RlpStream::new_list(4);
|
||||
rlp.append(&n);
|
||||
rlp.append(&count);
|
||||
rlp.append(&skip);
|
||||
rlp.append(&if reverse {1u32} else {0u32});
|
||||
rlp.out()
|
||||
}
|
||||
fn to_header_vec(rlp: ::chain::RlpResponseResult) -> Vec<Bytes> {
|
||||
Rlp::new(&rlp.unwrap().unwrap().1.out()).iter().map(|r| r.as_raw().to_vec()).collect()
|
||||
}
|
||||
|
||||
let mut client = TestBlockChainClient::new();
|
||||
client.add_blocks(100, EachBlockWith::Nothing);
|
||||
let blocks: Vec<_> = (0 .. 100)
|
||||
.map(|i| (&client as &BlockChainClient).block(BlockId::Number(i as BlockNumber)).map(|b| b.into_inner()).unwrap()).collect();
|
||||
let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).unwrap().as_raw().to_vec()).collect();
|
||||
let hashes: Vec<_> = headers.iter().map(|h| view!(HeaderView, h).hash()).collect();
|
||||
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let unknown: H256 = H256::new();
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&unknown, 1, 0, false)), 0);
|
||||
assert!(to_header_vec(result).is_empty());
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&unknown, 1, 0, true)), 0);
|
||||
assert!(to_header_vec(result).is_empty());
|
||||
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&hashes[2], 1, 0, true)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
|
||||
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&hashes[2], 1, 0, false)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
|
||||
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&hashes[50], 3, 5, false)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]);
|
||||
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_hash_req(&hashes[50], 3, 5, true)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
|
||||
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_num_req(2, 1, 0, true)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
|
||||
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_num_req(2, 1, 0, false)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[2].clone()]);
|
||||
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_num_req(50, 3, 5, false)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[56].clone(), headers[62].clone()]);
|
||||
|
||||
let result = SyncSupplier::return_block_headers(&io, &Rlp::new(&make_num_req(50, 3, 5, true)), 0);
|
||||
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn return_nodes() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let mut node_list = RlpStream::new_list(3);
|
||||
node_list.append(&H256::from("0000000000000000000000000000000000000000000000005555555555555555"));
|
||||
node_list.append(&H256::from("ffffffffffffffffffffffffffffffffffffffffffffaaaaaaaaaaaaaaaaaaaa"));
|
||||
node_list.append(&H256::from("aff0000000000000000000000000000000000000000000000000000000000000"));
|
||||
|
||||
let node_request = node_list.out();
|
||||
// it returns rlp ONLY for hashes started with "f"
|
||||
let result = SyncSupplier::return_node_data(&io, &Rlp::new(&node_request.clone()), 0);
|
||||
|
||||
assert!(result.is_ok());
|
||||
let rlp_result = result.unwrap();
|
||||
assert!(rlp_result.is_some());
|
||||
|
||||
// the length of one rlp-encoded hashe
|
||||
let rlp = rlp_result.unwrap().1.out();
|
||||
let rlp = Rlp::new(&rlp);
|
||||
assert_eq!(Ok(1), rlp.item_count());
|
||||
|
||||
io.sender = Some(2usize);
|
||||
|
||||
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_NODE_DATA_PACKET, &node_request);
|
||||
assert_eq!(1, io.packets.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn return_receipts_empty() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let ss = TestSnapshotService::new();
|
||||
let io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let result = SyncSupplier::return_receipts(&io, &Rlp::new(&[0xc0]), 0);
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn return_receipts() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &ss, &queue, None);
|
||||
|
||||
let mut receipt_list = RlpStream::new_list(4);
|
||||
receipt_list.append(&H256::from("0000000000000000000000000000000000000000000000005555555555555555"));
|
||||
receipt_list.append(&H256::from("ff00000000000000000000000000000000000000000000000000000000000000"));
|
||||
receipt_list.append(&H256::from("fff0000000000000000000000000000000000000000000000000000000000000"));
|
||||
receipt_list.append(&H256::from("aff0000000000000000000000000000000000000000000000000000000000000"));
|
||||
|
||||
let receipts_request = receipt_list.out();
|
||||
// it returns rlp ONLY for hashes started with "f"
|
||||
let result = SyncSupplier::return_receipts(&io, &Rlp::new(&receipts_request.clone()), 0);
|
||||
|
||||
assert!(result.is_ok());
|
||||
let rlp_result = result.unwrap();
|
||||
assert!(rlp_result.is_some());
|
||||
|
||||
// the length of two rlp-encoded receipts
|
||||
assert_eq!(603, rlp_result.unwrap().1.out().len());
|
||||
|
||||
io.sender = Some(2usize);
|
||||
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, GET_RECEIPTS_PACKET, &receipts_request);
|
||||
assert_eq!(1, io.packets.len());
|
||||
}
|
||||
}
|
@ -22,7 +22,7 @@ use parking_lot::Mutex;
|
||||
use bytes::Bytes;
|
||||
use ethcore::snapshot::{SnapshotService, ManifestData, RestorationStatus};
|
||||
use ethcore::header::BlockNumber;
|
||||
use ethcore::client::{EachBlockWith};
|
||||
use ethcore::client::EachBlockWith;
|
||||
use super::helpers::*;
|
||||
use {SyncConfig, WarpSync};
|
||||
|
||||
@ -99,7 +99,15 @@ impl SnapshotService for TestSnapshotService {
|
||||
}
|
||||
|
||||
fn begin_restore(&self, manifest: ManifestData) {
|
||||
*self.restoration_manifest.lock() = Some(manifest);
|
||||
let mut restoration_manifest = self.restoration_manifest.lock();
|
||||
|
||||
if let Some(ref c_manifest) = *restoration_manifest {
|
||||
if c_manifest.state_root == manifest.state_root {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
*restoration_manifest = Some(manifest);
|
||||
self.state_restoration_chunks.lock().clear();
|
||||
self.block_restoration_chunks.lock().clear();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user