diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 9374b3ff2..24f6a718a 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -577,9 +577,9 @@ impl ChainNotify for EthSync { match message_type { ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message), ChainMessageType::PrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PRIVATE_TRANSACTION_PACKET, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, message), ChainMessageType::SignedPrivateTransaction(transaction_hash, message) => - self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, message), + self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, message), } }); } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 5a144853d..123f2b9c8 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -103,7 +103,7 @@ use fastmap::{H256FastMap, H256FastSet}; use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use bytes::Bytes; use rlp::{RlpStream, DecoderError}; -use network::{self, PeerId, PacketId}; +use network::{self, PeerId, PacketId, ProtocolId}; use network::client_version::ClientVersion; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo}; use ethcore::snapshot::{RestorationStatus}; @@ -112,7 +112,7 @@ use super::{WarpSync, SyncConfig}; use block_sync::{BlockDownloader, DownloadAction}; use rand::Rng; use snapshot::{Snapshot}; -use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask}; +use api::{EthProtocolInfo as PeerInfoDigest, ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID, PriorityTask}; use private_tx::PrivateTxHandler; use transactions_stats::{TransactionsStats, Stats as TransactionStats}; use types::transaction::UnverifiedTransaction; @@ -154,7 +154,7 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024; const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000; const SNAPSHOT_MIN_PEERS: usize = 3; -const STATUS_PACKET: u8 = 0x00; +pub const STATUS_PACKET: u8 = 0x00; const NEW_BLOCK_HASHES_PACKET: u8 = 0x01; const TRANSACTIONS_PACKET: u8 = 0x02; pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03; @@ -484,7 +484,7 @@ impl ChainSyncApi { for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) { check_deadline(deadline)?; for peer in peers { - SyncPropagator::send_packet(io, *peer, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer, NEW_BLOCK_PACKET, rlp.clone()); if let Some(ref mut peer) = sync.peers.get_mut(peer) { peer.latest_hash = hash; } @@ -1331,8 +1331,8 @@ impl ChainSync { } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { - SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet); + pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { + SyncPropagator::propagate_private_transaction(self, io, transaction_hash, protocol, packet_id, packet); } } diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 9d50fafc1..77035387e 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -17,10 +17,11 @@ use std::cmp; use std::collections::HashSet; +use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; use bytes::Bytes; use ethereum_types::H256; use fastmap::H256FastSet; -use network::{PeerId, PacketId}; +use network::{PeerId, PacketId, ProtocolId}; use network::client_version::ClientCapabilities; use rand::Rng; use rlp::{Encodable, RlpStream}; @@ -42,7 +43,6 @@ use super::{ TRANSACTIONS_PACKET, }; - /// The Chain Sync Propagator: propagates data to peers pub struct SyncPropagator; @@ -53,7 +53,7 @@ impl SyncPropagator { let sent = peers.len(); let mut send_packet = |io: &mut SyncIo, rlp: Bytes| { for peer_id in peers { - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { peer.latest_hash = chain_info.best_block_hash.clone(); } @@ -88,7 +88,7 @@ impl SyncPropagator { if let Some(ref mut peer) = sync.peers.get_mut(peer_id) { peer.latest_hash = best_block_hash; } - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone()); } sent } @@ -156,7 +156,7 @@ impl SyncPropagator { let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| { let size = rlp.len(); - SyncPropagator::send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp); + SyncPropagator::send_packet(io, ETH_PROTOCOL, peer_id, TRANSACTIONS_PACKET, rlp); trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size); }; @@ -275,7 +275,7 @@ impl SyncPropagator { io.chain().chain_info().total_difficulty ); for peer_id in &peers { - SyncPropagator::send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); + SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone()); } } } @@ -285,12 +285,12 @@ impl SyncPropagator { let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers()); trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers); for peer_id in lucky_peers { - SyncPropagator::send_packet(io, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); + SyncPropagator::send_packet(io, WARP_SYNC_PROTOCOL_ID, peer_id, CONSENSUS_DATA_PACKET, packet.clone()); } } /// Broadcast private transaction message to peers. - pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: PacketId, packet: Bytes) { + pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash)); if lucky_peers.is_empty() { error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected"); @@ -300,7 +300,7 @@ impl SyncPropagator { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { peer.last_sent_private_transactions.insert(transaction_hash); } - SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone()); + SyncPropagator::send_packet(io, protocol, peer_id, packet_id, packet.clone()); } } } @@ -321,8 +321,8 @@ impl SyncPropagator { } /// Generic packet sender - pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { - if let Err(e) = sync.send(peer_id, packet_id, packet) { + pub fn send_packet(sync: &mut SyncIo, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { + if let Err(e) = sync.send_protocol(protocol, peer_id, packet_id, packet) { debug!(target:"sync", "Error sending packet: {:?}", e); sync.disconnect_peer(peer_id); } diff --git a/ethcore/sync/src/chain/requester.rs b/ethcore/sync/src/chain/requester.rs index 09eec748e..85f622ec8 100644 --- a/ethcore/sync/src/chain/requester.rs +++ b/ethcore/sync/src/chain/requester.rs @@ -14,11 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . -use api::WARP_SYNC_PROTOCOL_ID; +use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; use block_sync::BlockRequest; use bytes::Bytes; use ethereum_types::H256; -use network::{PeerId, PacketId}; +use network::{PeerId, PacketId, ProtocolId}; use rlp::RlpStream; use std::time::Instant; use sync_io::SyncIo; @@ -28,7 +28,6 @@ use super::{ BlockSet, ChainSync, PeerAsking, - ETH_PROTOCOL_VERSION_63, GET_BLOCK_BODIES_PACKET, GET_BLOCK_HEADERS_PACKET, GET_RECEIPTS_PACKET, @@ -63,7 +62,8 @@ impl SyncRequester { rlp.append(&h.clone()); } - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, ETH_PROTOCOL, GET_BLOCK_BODIES_PACKET, rlp.out()); + let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); @@ -77,7 +77,7 @@ impl SyncRequester { rlp.append(&1u32); rlp.append(&0u32); rlp.append(&0u32); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GET_BLOCK_HEADERS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out()); } /// Find some headers or blocks to download for a peer. @@ -95,7 +95,7 @@ impl SyncRequester { pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) { trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id); let rlp = RlpStream::new_list(0); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out()); } /// Request headers from a peer by block hash @@ -106,7 +106,7 @@ impl SyncRequester { rlp.append(&count); rlp.append(&skip); rlp.append(&if reverse {1u32} else {0u32}); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockHeaders, ETH_PROTOCOL, GET_BLOCK_HEADERS_PACKET, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_hash = Some(h.clone()); peer.block_set = Some(set); @@ -119,7 +119,7 @@ impl SyncRequester { for h in &hashes { rlp.append(&h.clone()); } - SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, GET_RECEIPTS_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockReceipts, ETH_PROTOCOL, GET_RECEIPTS_PACKET, rlp.out()); let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed"); peer.asking_blocks = hashes; peer.block_set = Some(set); @@ -130,23 +130,20 @@ impl SyncRequester { trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk); let mut rlp = RlpStream::new_list(1); rlp.append(chunk); - SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, GET_SNAPSHOT_DATA_PACKET, rlp.out()); + SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotData, WARP_SYNC_PROTOCOL_ID, GET_SNAPSHOT_DATA_PACKET, rlp.out()); } /// Generic request sender - fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { + fn send_request(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) { if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) { if peer.asking != PeerAsking::Nothing { warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); } peer.asking = asking; peer.ask_time = Instant::now(); - // TODO [ToDr] This seems quite fragile. Be careful when protocol is updated. - let result = if packet_id >= ETH_PROTOCOL_VERSION_63.1 { - io.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet) - } else { - io.send(peer_id, packet_id, packet) - }; + + let result = io.send_protocol(protocol, peer_id, packet_id, packet); + if let Err(e) = result { debug!(target:"sync", "Error sending request: {:?}", e); io.disconnect_peer(peer_id); diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index b4cd9a08f..1bf892e47 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -33,8 +33,6 @@ pub trait SyncIo { fn disconnect_peer(&mut self, peer_id: PeerId); /// Respond to current request with a packet. Can be called from an IO handler for incoming packet. fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), Error>; - /// Send a packet to a peer. - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>; /// Send a packet to a peer using specified protocol. fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>; /// Get the blockchain @@ -99,10 +97,6 @@ impl<'s> SyncIo for NetSyncIo<'s> { self.network.respond(packet_id, data) } - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>{ - self.network.send(peer_id, packet_id, data) - } - fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error>{ self.network.send_protocol(protocol, peer_id, packet_id, data) } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index cdc55fba0..04ceb3dd3 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -30,8 +30,8 @@ use ethcore::miner::Miner; use ethcore::test_helpers; use sync_io::SyncIo; use io::{IoChannel, IoContext, IoHandler}; -use api::WARP_SYNC_PROTOCOL_ID; -use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET, SyncSupplier}; +use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID}; +use chain::{ChainSync, ETH_PROTOCOL_VERSION_63, PAR_PROTOCOL_VERSION_3, PRIVATE_TRANSACTION_PACKET, SyncSupplier, STATUS_PACKET, RECEIPTS_PACKET, GET_SNAPSHOT_MANIFEST_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET}; use SyncConfig; use private_tx::SimplePrivateTxHandler; use types::BlockNumber; @@ -80,6 +80,16 @@ impl<'p, C> Drop for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { } } +fn assert_packet_id_matches_protocol(protocol: &ProtocolId, packet_id: &PacketId) { + match packet_id { + STATUS_PACKET ... RECEIPTS_PACKET => assert_eq!(*protocol, ETH_PROTOCOL), + GET_SNAPSHOT_MANIFEST_PACKET ... SIGNED_PRIVATE_TRANSACTION_PACKET => assert_eq!(*protocol, WARP_SYNC_PROTOCOL_ID), + // What about light? + _ => assert!(false) + } +} + + impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { fn disable_peer(&mut self, peer_id: PeerId) { self.disconnect_peer(peer_id); @@ -102,7 +112,9 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { + fn send_protocol(&mut self, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { + assert_packet_id_matches_protocol(&protocol, &packet_id); + self.packets.push(TestPacket { data: data, packet_id: packet_id, @@ -111,10 +123,6 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { Ok(()) } - fn send_protocol(&mut self, _protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), network::Error> { - self.send(peer_id, packet_id, data) - } - fn chain(&self) -> &BlockChainClient { &*self.chain } @@ -236,9 +244,9 @@ impl EthPeer where C: FlushingBlockChainClient { match message { ChainMessageType::Consensus(data) => self.sync.write().propagate_consensus_packet(&mut io, data), ChainMessageType::PrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, PRIVATE_TRANSACTION_PACKET, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, data), ChainMessageType::SignedPrivateTransaction(transaction_hash, data) => - self.sync.write().propagate_private_transaction(&mut io, transaction_hash, SIGNED_PRIVATE_TRANSACTION_PACKET, data), + self.sync.write().propagate_private_transaction(&mut io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, data), } }