diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index b2e1e23f5..c0990b720 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -129,6 +129,8 @@ pub enum EachBlockWith { Uncle, /// Block with a transaction. Transaction, + /// Block with multiple transactions. + Transactions(usize), /// Block with an uncle and transaction. UncleAndTransaction } @@ -274,21 +276,31 @@ impl TestBlockChainClient { _ => RlpStream::new_list(0) }; let txs = match with { - EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { - let mut txs = RlpStream::new_list(1); - let keypair = Random.generate().unwrap(); - // Update nonces value - self.nonces.write().insert(keypair.address(), U256::one()); - let tx = Transaction { - action: Action::Create, - value: U256::from(100), - data: "3331600055".from_hex().unwrap(), - gas: U256::from(100_000), - gas_price: U256::from(200_000_000_000u64), - nonce: U256::zero() + EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction | EachBlockWith::Transactions(_) => { + let num_transactions = match with { + EachBlockWith::Transactions(num) => num, + _ => 1, }; - let signed_tx = tx.sign(keypair.secret(), None); - txs.append(&signed_tx); + let mut txs = RlpStream::new_list(num_transactions); + let keypair = Random.generate().unwrap(); + let mut nonce = U256::zero(); + + for _ in 0..num_transactions { + // Update nonces value + let tx = Transaction { + action: Action::Create, + value: U256::from(100), + data: "3331600055".from_hex().unwrap(), + gas: U256::from(100_000), + gas_price: U256::from(200_000_000_000u64), + nonce: nonce + }; + let signed_tx = tx.sign(keypair.secret(), None); + txs.append(&signed_tx); + nonce += U256::one(); + } + + self.nonces.write().insert(keypair.address(), nonce); txs.out() }, _ => ::rlp::EMPTY_LIST_RLP.to_vec() diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 63824752a..d5841dac5 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -140,7 +140,6 @@ pub const PAR_PROTOCOL_VERSION_3: (u8, u8) = (3, 0x18); pub const MAX_BODIES_TO_SEND: usize = 256; pub const MAX_HEADERS_TO_SEND: usize = 512; pub const MAX_NODE_DATA_TO_SEND: usize = 1024; -pub const MAX_RECEIPTS_TO_SEND: usize = 1024; pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; const MIN_PEERS_PROPAGATION: usize = 4; const MAX_PEERS_PROPAGATION: usize = 128; diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs index 066a586e1..ca6380416 100644 --- a/ethcore/sync/src/chain/supplier.rs +++ b/ethcore/sync/src/chain/supplier.rs @@ -43,7 +43,6 @@ use super::{ MAX_HEADERS_TO_SEND, MAX_NODE_DATA_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND, - MAX_RECEIPTS_TO_SEND, NODE_DATA_PACKET, RECEIPTS_PACKET, SNAPSHOT_DATA_PACKET, @@ -127,6 +126,7 @@ impl SyncSupplier { /// Respond to GetBlockHeaders request fn return_block_headers(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { + let payload_soft_limit = io.payload_soft_limit(); // Packet layout: // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] let max_headers: usize = r.val_at(1)?; @@ -182,6 +182,10 @@ impl SyncSupplier { } else if let Some(hdr) = io.chain().block_header(BlockId::Number(number)) { data.append(&mut hdr.into_inner()); count += 1; + // Check that the packet won't be oversized + if data.len() > payload_soft_limit { + break; + } } else { // No required block. break; @@ -203,6 +207,7 @@ impl SyncSupplier { /// Respond to GetBlockBodies request fn return_block_bodies(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { + let payload_soft_limit = io.payload_soft_limit(); let mut count = r.item_count().unwrap_or(0); if count == 0 { debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); @@ -215,6 +220,10 @@ impl SyncSupplier { if let Some(body) = io.chain().block_body(BlockId::Hash(r.val_at::(i)?)) { data.append(&mut body.into_inner()); added += 1; + // Check that the packet won't be oversized + if data.len() > payload_soft_limit { + break; + } } } let mut rlp = RlpStream::new_list(added); @@ -225,6 +234,7 @@ impl SyncSupplier { /// Respond to GetNodeData request fn return_node_data(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { + let payload_soft_limit = io.payload_soft_limit(); let mut count = r.item_count().unwrap_or(0); trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count); if count == 0 { @@ -234,8 +244,14 @@ impl SyncSupplier { count = cmp::min(count, MAX_NODE_DATA_TO_SEND); let mut added = 0usize; let mut data = Vec::new(); + let mut total_bytes = 0; for i in 0..count { if let Some(node) = io.chain().state_data(&r.val_at::(i)?) { + total_bytes += node.len(); + // Check that the packet won't be oversized + if total_bytes > payload_soft_limit { + break; + } data.push(node); added += 1; } @@ -249,6 +265,7 @@ impl SyncSupplier { } fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult { + let payload_soft_limit = io.payload_soft_limit(); let mut count = rlp.item_count().unwrap_or(0); trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count); if count == 0 { @@ -257,15 +274,15 @@ impl SyncSupplier { } count = cmp::min(count, MAX_RECEIPTS_HEADERS_TO_SEND); let mut added_headers = 0usize; - let mut added_receipts = 0usize; let mut data = Bytes::new(); + let mut total_bytes = 0; for i in 0..count { if let Some(receipts) = io.chain().block_receipts(&rlp.val_at::(i)?) { let mut receipts_bytes = ::rlp::encode(&receipts); + total_bytes += receipts_bytes.len(); + if total_bytes > payload_soft_limit { break; } data.append(&mut receipts_bytes); - added_receipts += receipts_bytes.len(); added_headers += 1; - if added_receipts > MAX_RECEIPTS_TO_SEND { break; } } } let mut rlp_result = RlpStream::new_list(added_headers); @@ -410,6 +427,42 @@ mod test { assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]); } + #[test] + fn respect_packet_limit() { + let small_num_blocks = 10; + let large_num_blocks = 50; + let tx_per_block = 100; + + let mut client = TestBlockChainClient::new(); + client.add_blocks(large_num_blocks, EachBlockWith::Transactions(tx_per_block)); + + let mut small_rlp_request = RlpStream::new_list(small_num_blocks); + let mut large_rlp_request = RlpStream::new_list(large_num_blocks); + + for i in 0..small_num_blocks { + let hash: H256 = client.block_hash(BlockId::Number(i as u64)).unwrap(); + small_rlp_request.append(&hash); + large_rlp_request.append(&hash); + } + + for i in small_num_blocks..large_num_blocks { + let hash: H256 = client.block_hash(BlockId::Number(i as u64)).unwrap(); + large_rlp_request.append(&hash); + } + + let queue = RwLock::new(VecDeque::new()); + let ss = TestSnapshotService::new(); + let io = TestIo::new(&mut client, &ss, &queue, None); + + let small_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&small_rlp_request.out()), 0); + let small_result = small_result.unwrap().unwrap().1; + assert_eq!(Rlp::new(&small_result.out()).item_count().unwrap(), small_num_blocks); + + let large_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&large_rlp_request.out()), 0); + let large_result = large_result.unwrap().unwrap().1; + assert!(Rlp::new(&large_result.out()).item_count().unwrap() < large_num_blocks); + } + #[test] fn return_nodes() { let mut client = TestBlockChainClient::new(); diff --git a/ethcore/sync/src/sync_io.rs b/ethcore/sync/src/sync_io.rs index 4516394aa..e3457ea55 100644 --- a/ethcore/sync/src/sync_io.rs +++ b/ethcore/sync/src/sync_io.rs @@ -58,6 +58,8 @@ pub trait SyncIo { fn is_expired(&self) -> bool; /// Return sync overlay fn chain_overlay(&self) -> &RwLock>; + /// Returns the size the payload shouldn't exceed + fn payload_soft_limit(&self) -> usize; } /// Wraps `NetworkContext` and the blockchain client @@ -135,4 +137,8 @@ impl<'s> SyncIo for NetSyncIo<'s> { fn peer_info(&self, peer_id: PeerId) -> String { self.network.peer_client_version(peer_id) } + + fn payload_soft_limit(&self) -> usize { + self.network.payload_soft_limit() + } } diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 60f49cd2d..549bfb7f2 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -144,6 +144,10 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p { fn chain_overlay(&self) -> &RwLock> { &self.overlay } + + fn payload_soft_limit(&self) -> usize { + 100_000 + } } /// Mock for emulution of async run of new blocks diff --git a/util/network-devp2p/src/connection.rs b/util/network-devp2p/src/connection.rs index eb663d379..be3f363aa 100644 --- a/util/network-devp2p/src/connection.rs +++ b/util/network-devp2p/src/connection.rs @@ -41,6 +41,10 @@ const ENCRYPTED_HEADER_LEN: usize = 32; const RECEIVE_PAYLOAD: Duration = Duration::from_secs(30); pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1; +/// Network responses should try not to go over this limit. +/// This should be lower than MAX_PAYLOAD_SIZE +pub const PAYLOAD_SOFT_LIMIT: usize = (1 << 22) - 1; + pub trait GenericSocket : Read + Write { } diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 8df7a89a3..55d6c08c2 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -46,6 +46,7 @@ use ip_utils::{map_external_address, select_public_address}; use parity_path::restrict_permissions_owner; use parking_lot::{Mutex, RwLock}; use network::{ConnectionFilter, ConnectionDirection}; +use connection::PAYLOAD_SOFT_LIMIT; type Slab = ::slab::Slab; @@ -200,6 +201,10 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> { .map(|node| self.reserved_peers.contains(&node)) .unwrap_or(false) } + + fn payload_soft_limit(&self) -> usize { + PAYLOAD_SOFT_LIMIT + } } /// Shared host information diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index 395075e9f..caff90a34 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -288,6 +288,9 @@ pub trait NetworkContext { /// Returns whether the given peer ID is a reserved peer. fn is_reserved_peer(&self, peer: PeerId) -> bool; + + /// Returns the size the payload shouldn't exceed + fn payload_soft_limit(&self) -> usize; } impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext { @@ -338,6 +341,10 @@ impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext { fn is_reserved_peer(&self, peer: PeerId) -> bool { (**self).is_reserved_peer(peer) } + + fn payload_soft_limit(&self) -> usize { + (**self).payload_soft_limit() + } } /// Network IO protocol handler. This needs to be implemented for each new subprotocol.