Make specification of protocol in SyncRequester::send_request explicit (#10295)
* Make the specification of the protocol to which a packet_id belongs to explicit when calling "SyncRequester::send_packet". * Remove "SyncIO::send" and leave only "SyncIO::send_protocol" * Adapt tests to new code. * Strengthen tests to check if packet_id and protocol match when sending a devp2p packet.
This commit is contained in:
parent
6fa4b2dec5
commit
046b8bbc8a
@ -577,9 +577,9 @@ impl ChainNotify for EthSync {
|
|||||||
match message_type {
|
match message_type {
|
||||||
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
|
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
|
||||||
ChainMessageType::PrivateTransaction(transaction_hash, message) =>
|
ChainMessageType::PrivateTransaction(transaction_hash, message) =>
|
||||||
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PRIVATE_TRANSACTION_PACKET, message),
|
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, message),
|
||||||
ChainMessageType::SignedPrivateTransaction(transaction_hash, message) =>
|
ChainMessageType::SignedPrivateTransaction(transaction_hash, message) =>
|
||||||
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, message),
|
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, message),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ use fastmap::{H256FastMap, H256FastSet};
|
|||||||
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
|
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use rlp::{RlpStream, DecoderError};
|
use rlp::{RlpStream, DecoderError};
|
||||||
use network::{self, PeerId, PacketId};
|
use network::{self, PeerId, PacketId, ProtocolId};
|
||||||
use network::client_version::ClientVersion;
|
use network::client_version::ClientVersion;
|
||||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
|
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
|
||||||
use ethcore::snapshot::{RestorationStatus};
|
use ethcore::snapshot::{RestorationStatus};
|
||||||
@ -112,7 +112,7 @@ use super::{WarpSync, SyncConfig};
|
|||||||
use block_sync::{BlockDownloader, DownloadAction};
|
use block_sync::{BlockDownloader, DownloadAction};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use snapshot::{Snapshot};
|
use snapshot::{Snapshot};
|
||||||
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask};
|
use api::{EthProtocolInfo as PeerInfoDigest, ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID, PriorityTask};
|
||||||
use private_tx::PrivateTxHandler;
|
use private_tx::PrivateTxHandler;
|
||||||
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
|
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
|
||||||
use types::transaction::UnverifiedTransaction;
|
use types::transaction::UnverifiedTransaction;
|
||||||
@ -154,7 +154,7 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024;
|
|||||||
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000;
|
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000;
|
||||||
const SNAPSHOT_MIN_PEERS: usize = 3;
|
const SNAPSHOT_MIN_PEERS: usize = 3;
|
||||||
|
|
||||||
const STATUS_PACKET: u8 = 0x00;
|
pub const STATUS_PACKET: u8 = 0x00;
|
||||||
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
||||||
const TRANSACTIONS_PACKET: u8 = 0x02;
|
const TRANSACTIONS_PACKET: u8 = 0x02;
|
||||||
pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03;
|
pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03;
|
||||||
@ -484,7 +484,7 @@ impl ChainSyncApi {
|
|||||||
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
|
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
|
||||||
check_deadline(deadline)?;
|
check_deadline(deadline)?;
|
||||||
for peer in peers {
|
for peer in peers {
|
||||||
SyncPropagator::send_packet(io, *peer, NEW_BLOCK_PACKET, rlp.clone());
|
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer, NEW_BLOCK_PACKET, rlp.clone());
|
||||||
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
|
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
|
||||||
peer.latest_hash = hash;
|
peer.latest_hash = hash;
|
||||||
}
|
}
|
||||||
@ -1331,8 +1331,8 @@ impl ChainSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Broadcast private transaction message to peers.
|
/// Broadcast private transaction message to peers.
|
||||||
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) {
|
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
|
||||||
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet);
|
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, protocol, packet_id, packet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,10 +17,11 @@
|
|||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use fastmap::H256FastSet;
|
use fastmap::H256FastSet;
|
||||||
use network::{PeerId, PacketId};
|
use network::{PeerId, PacketId, ProtocolId};
|
||||||
use network::client_version::ClientCapabilities;
|
use network::client_version::ClientCapabilities;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use rlp::{Encodable, RlpStream};
|
use rlp::{Encodable, RlpStream};
|
||||||
@ -42,7 +43,6 @@ use super::{
|
|||||||
TRANSACTIONS_PACKET,
|
TRANSACTIONS_PACKET,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/// The Chain Sync Propagator: propagates data to peers
|
/// The Chain Sync Propagator: propagates data to peers
|
||||||
pub struct SyncPropagator;
|
pub struct SyncPropagator;
|
||||||
|
|
||||||
@ -53,7 +53,7 @@ impl SyncPropagator {
|
|||||||
let sent = peers.len();
|
let sent = peers.len();
|
||||||
let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
|
let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
|
||||||
for peer_id in peers {
|
for peer_id in peers {
|
||||||
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
|
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
|
||||||
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
||||||
peer.latest_hash = chain_info.best_block_hash.clone();
|
peer.latest_hash = chain_info.best_block_hash.clone();
|
||||||
}
|
}
|
||||||
@ -88,7 +88,7 @@ impl SyncPropagator {
|
|||||||
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
|
||||||
peer.latest_hash = best_block_hash;
|
peer.latest_hash = best_block_hash;
|
||||||
}
|
}
|
||||||
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone());
|
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone());
|
||||||
}
|
}
|
||||||
sent
|
sent
|
||||||
}
|
}
|
||||||
@ -156,7 +156,7 @@ impl SyncPropagator {
|
|||||||
|
|
||||||
let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
|
let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
|
||||||
let size = rlp.len();
|
let size = rlp.len();
|
||||||
SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
|
SyncPropagator::send_packet(io, ETH_PROTOCOL, peer_id, TRANSACTIONS_PACKET, rlp);
|
||||||
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
|
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -275,7 +275,7 @@ impl SyncPropagator {
|
|||||||
io.chain().chain_info().total_difficulty
|
io.chain().chain_info().total_difficulty
|
||||||
);
|
);
|
||||||
for peer_id in &peers {
|
for peer_id in &peers {
|
||||||
SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
|
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -285,12 +285,12 @@ impl SyncPropagator {
|
|||||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers());
|
let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers());
|
||||||
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
|
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
|
||||||
for peer_id in lucky_peers {
|
for peer_id in lucky_peers {
|
||||||
SyncPropagator::send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone());
|
SyncPropagator::send_packet(io, WARP_SYNC_PROTOCOL_ID, peer_id, CONSENSUS_DATA_PACKET, packet.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Broadcast private transaction message to peers.
|
/// Broadcast private transaction message to peers.
|
||||||
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) {
|
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
|
||||||
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
|
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
|
||||||
if lucky_peers.is_empty() {
|
if lucky_peers.is_empty() {
|
||||||
error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
|
error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
|
||||||
@ -300,7 +300,7 @@ impl SyncPropagator {
|
|||||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||||
peer.last_sent_private_transactions.insert(transaction_hash);
|
peer.last_sent_private_transactions.insert(transaction_hash);
|
||||||
}
|
}
|
||||||
SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
|
SyncPropagator::send_packet(io, protocol, peer_id, packet_id, packet.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -321,8 +321,8 @@ impl SyncPropagator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Generic packet sender
|
/// Generic packet sender
|
||||||
pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
|
pub fn send_packet(sync: &mut SyncIo, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
|
||||||
if let Err(e) = sync.send(peer_id, packet_id, packet) {
|
if let Err(e) = sync.send_protocol(protocol, peer_id, packet_id, packet) {
|
||||||
debug!(target:"sync", "Error sending packet: {:?}", e);
|
debug!(target:"sync", "Error sending packet: {:?}", e);
|
||||||
sync.disconnect_peer(peer_id);
|
sync.disconnect_peer(peer_id);
|
||||||
}
|
}
|
||||||
|
@ -14,11 +14,11 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use api::WARP_SYNC_PROTOCOL_ID;
|
use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
|
||||||
use block_sync::BlockRequest;
|
use block_sync::BlockRequest;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use network::{PeerId, PacketId};
|
use network::{PeerId, PacketId, ProtocolId};
|
||||||
use rlp::RlpStream;
|
use rlp::RlpStream;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use sync_io::SyncIo;
|
use sync_io::SyncIo;
|
||||||
@ -28,7 +28,6 @@ use super::{
|
|||||||
BlockSet,
|
BlockSet,
|
||||||
ChainSync,
|
ChainSync,
|
||||||
PeerAsking,
|
PeerAsking,
|
||||||
ETH_PROTOCOL_VERSION_63,
|
|
||||||
GET_BLOCK_BODIES_PACKET,
|
GET_BLOCK_BODIES_PACKET,
|
||||||
GET_BLOCK_HEADERS_PACKET,
|
GET_BLOCK_HEADERS_PACKET,
|
||||||
GET_RECEIPTS_PACKET,
|
GET_RECEIPTS_PACKET,
|
||||||
@ -63,7 +62,8 @@ impl SyncRequester {
|
|||||||
rlp.append(&h.clone());
|
rlp.append(&h.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, ETH_PROTOCOL, GET_BLOCK_BODIES_PACKET, rlp.out());
|
||||||
|
|
||||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||||
peer.asking_blocks = hashes;
|
peer.asking_blocks = hashes;
|
||||||
peer.block_set = Some(set);
|
peer.block_set = Some(set);
|
||||||
@ -77,7 +77,7 @@ impl SyncRequester {
|
|||||||
rlp.append(&1u32);
|
rlp.append(&1u32);
|
||||||
rlp.append(&0u32);
|
rlp.append(&0u32);
|
||||||
rlp.append(&0u32);
|
rlp.append(&0u32);
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find some headers or blocks to download for a peer.
|
/// Find some headers or blocks to download for a peer.
|
||||||
@ -95,7 +95,7 @@ impl SyncRequester {
|
|||||||
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
|
||||||
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
|
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
|
||||||
let rlp = RlpStream::new_list(0);
|
let rlp = RlpStream::new_list(0);
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Request headers from a peer by block hash
|
/// Request headers from a peer by block hash
|
||||||
@ -106,7 +106,7 @@ impl SyncRequester {
|
|||||||
rlp.append(&count);
|
rlp.append(&count);
|
||||||
rlp.append(&skip);
|
rlp.append(&skip);
|
||||||
rlp.append(&if reverse {1u32} else {0u32});
|
rlp.append(&if reverse {1u32} else {0u32});
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
||||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||||
peer.asking_hash = Some(h.clone());
|
peer.asking_hash = Some(h.clone());
|
||||||
peer.block_set = Some(set);
|
peer.block_set = Some(set);
|
||||||
@ -119,7 +119,7 @@ impl SyncRequester {
|
|||||||
for h in &hashes {
|
for h in &hashes {
|
||||||
rlp.append(&h.clone());
|
rlp.append(&h.clone());
|
||||||
}
|
}
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GET_RECEIPTS_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, ETH_PROTOCOL, GET_RECEIPTS_PACKET, rlp.out());
|
||||||
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
|
||||||
peer.asking_blocks = hashes;
|
peer.asking_blocks = hashes;
|
||||||
peer.block_set = Some(set);
|
peer.block_set = Some(set);
|
||||||
@ -130,23 +130,20 @@ impl SyncRequester {
|
|||||||
trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk);
|
trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk);
|
||||||
let mut rlp = RlpStream::new_list(1);
|
let mut rlp = RlpStream::new_list(1);
|
||||||
rlp.append(chunk);
|
rlp.append(chunk);
|
||||||
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GET_SNAPSHOT_DATA_PACKET, rlp.out());
|
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_DATA_PACKET, rlp.out());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generic request sender
|
/// Generic request sender
|
||||||
fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) {
|
fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
|
||||||
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
|
||||||
if peer.asking != PeerAsking::Nothing {
|
if peer.asking != PeerAsking::Nothing {
|
||||||
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
||||||
}
|
}
|
||||||
peer.asking = asking;
|
peer.asking = asking;
|
||||||
peer.ask_time = Instant::now();
|
peer.ask_time = Instant::now();
|
||||||
// TODO [ToDr] This seems quite fragile. Be careful when protocol is updated.
|
|
||||||
let result = if packet_id >= ETH_PROTOCOL_VERSION_63.1 {
|
let result = io.send_protocol(protocol, peer_id, packet_id, packet);
|
||||||
io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
|
|
||||||
} else {
|
|
||||||
io.send(peer_id, packet_id, packet)
|
|
||||||
};
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
debug!(target:"sync", "Error sending request: {:?}", e);
|
debug!(target:"sync", "Error sending request: {:?}", e);
|
||||||
io.disconnect_peer(peer_id);
|
io.disconnect_peer(peer_id);
|
||||||
|
@ -33,8 +33,6 @@ pub trait SyncIo {
|
|||||||
fn disconnect_peer(&mut self, peer_id: PeerId);
|
fn disconnect_peer(&mut self, peer_id: PeerId);
|
||||||
/// Respond to current request with a packet. Can be called from an IO handler for incoming packet.
|
/// Respond to current request with a packet. Can be called from an IO handler for incoming packet.
|
||||||
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
|
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
|
||||||
/// Send a packet to a peer.
|
|
||||||
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
|
|
||||||
/// Send a packet to a peer using specified protocol.
|
/// Send a packet to a peer using specified protocol.
|
||||||
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
|
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>;
|
||||||
/// Get the blockchain
|
/// Get the blockchain
|
||||||
@ -99,10 +97,6 @@ impl<'s> SyncIo for NetSyncIo<'s> {
|
|||||||
self.network.respond(packet_id, data)
|
self.network.respond(packet_id, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>{
|
|
||||||
self.network.send(peer_id, packet_id, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>{
|
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), Error>{
|
||||||
self.network.send_protocol(protocol, peer_id, packet_id, data)
|
self.network.send_protocol(protocol, peer_id, packet_id, data)
|
||||||
}
|
}
|
||||||
|
@ -30,8 +30,8 @@ use ethcore::miner::Miner;
|
|||||||
use ethcore::test_helpers;
|
use ethcore::test_helpers;
|
||||||
use sync_io::SyncIo;
|
use sync_io::SyncIo;
|
||||||
use io::{IoChannel, IoContext, IoHandler};
|
use io::{IoChannel, IoContext, IoHandler};
|
||||||
use api::WARP_SYNC_PROTOCOL_ID;
|
use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
|
||||||
use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET, SyncSupplier};
|
use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SyncSupplier, STATUS_PACKET, RECEIPTS_PACKET, GET_SNAPSHOT_MANIFEST_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
|
||||||
use SyncConfig;
|
use SyncConfig;
|
||||||
use private_tx::SimplePrivateTxHandler;
|
use private_tx::SimplePrivateTxHandler;
|
||||||
use types::BlockNumber;
|
use types::BlockNumber;
|
||||||
@ -80,6 +80,16 @@ impl<'p, C> Drop for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn assert_packet_id_matches_protocol(protocol: &ProtocolId, packet_id: &PacketId) {
|
||||||
|
match packet_id {
|
||||||
|
STATUS_PACKET ... RECEIPTS_PACKET => assert_eq!(*protocol, ETH_PROTOCOL),
|
||||||
|
GET_SNAPSHOT_MANIFEST_PACKET ... SIGNED_PRIVATE_TRANSACTION_PACKET => assert_eq!(*protocol, WARP_SYNC_PROTOCOL_ID),
|
||||||
|
// What about light?
|
||||||
|
_ => assert!(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
||||||
fn disable_peer(&mut self, peer_id: PeerId) {
|
fn disable_peer(&mut self, peer_id: PeerId) {
|
||||||
self.disconnect_peer(peer_id);
|
self.disconnect_peer(peer_id);
|
||||||
@ -102,7 +112,9 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), network::Error> {
|
fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), network::Error> {
|
||||||
|
assert_packet_id_matches_protocol(&protocol, &packet_id);
|
||||||
|
|
||||||
self.packets.push(TestPacket {
|
self.packets.push(TestPacket {
|
||||||
data: data,
|
data: data,
|
||||||
packet_id: packet_id,
|
packet_id: packet_id,
|
||||||
@ -111,10 +123,6 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), network::Error> {
|
|
||||||
self.send(peer_id, packet_id, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn chain(&self) -> &BlockChainClient {
|
fn chain(&self) -> &BlockChainClient {
|
||||||
&*self.chain
|
&*self.chain
|
||||||
}
|
}
|
||||||
@ -236,9 +244,9 @@ impl<C> EthPeer<C> where C: FlushingBlockChainClient {
|
|||||||
match message {
|
match message {
|
||||||
ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data),
|
ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data),
|
||||||
ChainMessageType::PrivateTransaction(transaction_hash, data) =>
|
ChainMessageType::PrivateTransaction(transaction_hash, data) =>
|
||||||
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PRIVATE_TRANSACTION_PACKET, data),
|
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, data),
|
||||||
ChainMessageType::SignedPrivateTransaction(transaction_hash, data) =>
|
ChainMessageType::SignedPrivateTransaction(transaction_hash, data) =>
|
||||||
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, data),
|
self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, data),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user