Don't try to send oversized packets (#10042)

* Don't construct oversized packets

* Add test for payload limit

* [eth-sync] Fix wrongly computed data sizes

* Replace `MAX_RECEIPTS_TO_SEND` with overall softlimit
This commit is contained in:
Nicolas Gotchac 2019-01-04 19:58:21 +01:00 committed by Afri Schoedon
parent b180be7526
commit e435407080
8 changed files with 109 additions and 19 deletions

View File

@ -129,6 +129,8 @@ pub enum EachBlockWith {
Uncle, Uncle,
/// Block with a transaction. /// Block with a transaction.
Transaction, Transaction,
/// Block with multiple transactions.
Transactions(usize),
/// Block with an uncle and transaction. /// Block with an uncle and transaction.
UncleAndTransaction UncleAndTransaction
} }
@ -274,21 +276,31 @@ impl TestBlockChainClient {
_ => RlpStream::new_list(0) _ => RlpStream::new_list(0)
}; };
let txs = match with { let txs = match with {
EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction | EachBlockWith::Transactions(_) => {
let mut txs = RlpStream::new_list(1); let num_transactions = match with {
let keypair = Random.generate().unwrap(); EachBlockWith::Transactions(num) => num,
// Update nonces value _ => 1,
self.nonces.write().insert(keypair.address(), U256::one());
let tx = Transaction {
action: Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: U256::from(100_000),
gas_price: U256::from(200_000_000_000u64),
nonce: U256::zero()
}; };
let signed_tx = tx.sign(keypair.secret(), None); let mut txs = RlpStream::new_list(num_transactions);
txs.append(&signed_tx); let keypair = Random.generate().unwrap();
let mut nonce = U256::zero();
for _ in 0..num_transactions {
// Update nonces value
let tx = Transaction {
action: Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: U256::from(100_000),
gas_price: U256::from(200_000_000_000u64),
nonce: nonce
};
let signed_tx = tx.sign(keypair.secret(), None);
txs.append(&signed_tx);
nonce += U256::one();
}
self.nonces.write().insert(keypair.address(), nonce);
txs.out() txs.out()
}, },
_ => ::rlp::EMPTY_LIST_RLP.to_vec() _ => ::rlp::EMPTY_LIST_RLP.to_vec()

View File

@ -140,7 +140,6 @@ pub const PAR_PROTOCOL_VERSION_3: (u8, u8) = (3, 0x18);
pub const MAX_BODIES_TO_SEND: usize = 256; pub const MAX_BODIES_TO_SEND: usize = 256;
pub const MAX_HEADERS_TO_SEND: usize = 512; pub const MAX_HEADERS_TO_SEND: usize = 512;
pub const MAX_NODE_DATA_TO_SEND: usize = 1024; pub const MAX_NODE_DATA_TO_SEND: usize = 1024;
pub const MAX_RECEIPTS_TO_SEND: usize = 1024;
pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256; pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
const MIN_PEERS_PROPAGATION: usize = 4; const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128; const MAX_PEERS_PROPAGATION: usize = 128;

View File

@ -43,7 +43,6 @@ use super::{
MAX_HEADERS_TO_SEND, MAX_HEADERS_TO_SEND,
MAX_NODE_DATA_TO_SEND, MAX_NODE_DATA_TO_SEND,
MAX_RECEIPTS_HEADERS_TO_SEND, MAX_RECEIPTS_HEADERS_TO_SEND,
MAX_RECEIPTS_TO_SEND,
NODE_DATA_PACKET, NODE_DATA_PACKET,
RECEIPTS_PACKET, RECEIPTS_PACKET,
SNAPSHOT_DATA_PACKET, SNAPSHOT_DATA_PACKET,
@ -127,6 +126,7 @@ impl SyncSupplier {
/// Respond to GetBlockHeaders request /// Respond to GetBlockHeaders request
fn return_block_headers(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { fn return_block_headers(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
let payload_soft_limit = io.payload_soft_limit();
// Packet layout: // Packet layout:
// [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ]
let max_headers: usize = r.val_at(1)?; let max_headers: usize = r.val_at(1)?;
@ -182,6 +182,10 @@ impl SyncSupplier {
} else if let Some(hdr) = io.chain().block_header(BlockId::Number(number)) { } else if let Some(hdr) = io.chain().block_header(BlockId::Number(number)) {
data.append(&mut hdr.into_inner()); data.append(&mut hdr.into_inner());
count += 1; count += 1;
// Check that the packet won't be oversized
if data.len() > payload_soft_limit {
break;
}
} else { } else {
// No required block. // No required block.
break; break;
@ -203,6 +207,7 @@ impl SyncSupplier {
/// Respond to GetBlockBodies request /// Respond to GetBlockBodies request
fn return_block_bodies(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { fn return_block_bodies(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
let payload_soft_limit = io.payload_soft_limit();
let mut count = r.item_count().unwrap_or(0); let mut count = r.item_count().unwrap_or(0);
if count == 0 { if count == 0 {
debug!(target: "sync", "Empty GetBlockBodies request, ignoring."); debug!(target: "sync", "Empty GetBlockBodies request, ignoring.");
@ -215,6 +220,10 @@ impl SyncSupplier {
if let Some(body) = io.chain().block_body(BlockId::Hash(r.val_at::<H256>(i)?)) { if let Some(body) = io.chain().block_body(BlockId::Hash(r.val_at::<H256>(i)?)) {
data.append(&mut body.into_inner()); data.append(&mut body.into_inner());
added += 1; added += 1;
// Check that the packet won't be oversized
if data.len() > payload_soft_limit {
break;
}
} }
} }
let mut rlp = RlpStream::new_list(added); let mut rlp = RlpStream::new_list(added);
@ -225,6 +234,7 @@ impl SyncSupplier {
/// Respond to GetNodeData request /// Respond to GetNodeData request
fn return_node_data(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult { fn return_node_data(io: &SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
let payload_soft_limit = io.payload_soft_limit();
let mut count = r.item_count().unwrap_or(0); let mut count = r.item_count().unwrap_or(0);
trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count); trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count);
if count == 0 { if count == 0 {
@ -234,8 +244,14 @@ impl SyncSupplier {
count = cmp::min(count, MAX_NODE_DATA_TO_SEND); count = cmp::min(count, MAX_NODE_DATA_TO_SEND);
let mut added = 0usize; let mut added = 0usize;
let mut data = Vec::new(); let mut data = Vec::new();
let mut total_bytes = 0;
for i in 0..count { for i in 0..count {
if let Some(node) = io.chain().state_data(&r.val_at::<H256>(i)?) { if let Some(node) = io.chain().state_data(&r.val_at::<H256>(i)?) {
total_bytes += node.len();
// Check that the packet won't be oversized
if total_bytes > payload_soft_limit {
break;
}
data.push(node); data.push(node);
added += 1; added += 1;
} }
@ -249,6 +265,7 @@ impl SyncSupplier {
} }
fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult { fn return_receipts(io: &SyncIo, rlp: &Rlp, peer_id: PeerId) -> RlpResponseResult {
let payload_soft_limit = io.payload_soft_limit();
let mut count = rlp.item_count().unwrap_or(0); let mut count = rlp.item_count().unwrap_or(0);
trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count); trace!(target: "sync", "{} -> GetReceipts: {} entries", peer_id, count);
if count == 0 { if count == 0 {
@ -257,15 +274,15 @@ impl SyncSupplier {
} }
count = cmp::min(count, MAX_RECEIPTS_HEADERS_TO_SEND); count = cmp::min(count, MAX_RECEIPTS_HEADERS_TO_SEND);
let mut added_headers = 0usize; let mut added_headers = 0usize;
let mut added_receipts = 0usize;
let mut data = Bytes::new(); let mut data = Bytes::new();
let mut total_bytes = 0;
for i in 0..count { for i in 0..count {
if let Some(receipts) = io.chain().block_receipts(&rlp.val_at::<H256>(i)?) { if let Some(receipts) = io.chain().block_receipts(&rlp.val_at::<H256>(i)?) {
let mut receipts_bytes = ::rlp::encode(&receipts); let mut receipts_bytes = ::rlp::encode(&receipts);
total_bytes += receipts_bytes.len();
if total_bytes > payload_soft_limit { break; }
data.append(&mut receipts_bytes); data.append(&mut receipts_bytes);
added_receipts += receipts_bytes.len();
added_headers += 1; added_headers += 1;
if added_receipts > MAX_RECEIPTS_TO_SEND { break; }
} }
} }
let mut rlp_result = RlpStream::new_list(added_headers); let mut rlp_result = RlpStream::new_list(added_headers);
@ -410,6 +427,42 @@ mod test {
assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]); assert_eq!(to_header_vec(result), vec![headers[50].clone(), headers[44].clone(), headers[38].clone()]);
} }
#[test]
fn respect_packet_limit() {
let small_num_blocks = 10;
let large_num_blocks = 50;
let tx_per_block = 100;
let mut client = TestBlockChainClient::new();
client.add_blocks(large_num_blocks, EachBlockWith::Transactions(tx_per_block));
let mut small_rlp_request = RlpStream::new_list(small_num_blocks);
let mut large_rlp_request = RlpStream::new_list(large_num_blocks);
for i in 0..small_num_blocks {
let hash: H256 = client.block_hash(BlockId::Number(i as u64)).unwrap();
small_rlp_request.append(&hash);
large_rlp_request.append(&hash);
}
for i in small_num_blocks..large_num_blocks {
let hash: H256 = client.block_hash(BlockId::Number(i as u64)).unwrap();
large_rlp_request.append(&hash);
}
let queue = RwLock::new(VecDeque::new());
let ss = TestSnapshotService::new();
let io = TestIo::new(&mut client, &ss, &queue, None);
let small_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&small_rlp_request.out()), 0);
let small_result = small_result.unwrap().unwrap().1;
assert_eq!(Rlp::new(&small_result.out()).item_count().unwrap(), small_num_blocks);
let large_result = SyncSupplier::return_block_bodies(&io, &Rlp::new(&large_rlp_request.out()), 0);
let large_result = large_result.unwrap().unwrap().1;
assert!(Rlp::new(&large_result.out()).item_count().unwrap() < large_num_blocks);
}
#[test] #[test]
fn return_nodes() { fn return_nodes() {
let mut client = TestBlockChainClient::new(); let mut client = TestBlockChainClient::new();

View File

@ -58,6 +58,8 @@ pub trait SyncIo {
fn is_expired(&self) -> bool; fn is_expired(&self) -> bool;
/// Return sync overlay /// Return sync overlay
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>>; fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>>;
/// Returns the size the payload shouldn't exceed
fn payload_soft_limit(&self) -> usize;
} }
/// Wraps `NetworkContext` and the blockchain client /// Wraps `NetworkContext` and the blockchain client
@ -135,4 +137,8 @@ impl<'s> SyncIo for NetSyncIo<'s> {
fn peer_info(&self, peer_id: PeerId) -> String { fn peer_info(&self, peer_id: PeerId) -> String {
self.network.peer_client_version(peer_id) self.network.peer_client_version(peer_id)
} }
fn payload_soft_limit(&self) -> usize {
self.network.payload_soft_limit()
}
} }

View File

@ -144,6 +144,10 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> { fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
&self.overlay &self.overlay
} }
fn payload_soft_limit(&self) -> usize {
100_000
}
} }
/// Mock for emulution of async run of new blocks /// Mock for emulution of async run of new blocks

View File

@ -41,6 +41,10 @@ const ENCRYPTED_HEADER_LEN: usize = 32;
const RECEIVE_PAYLOAD: Duration = Duration::from_secs(30); const RECEIVE_PAYLOAD: Duration = Duration::from_secs(30);
pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1; pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1;
/// Network responses should try not to go over this limit.
/// This should be lower than MAX_PAYLOAD_SIZE
pub const PAYLOAD_SOFT_LIMIT: usize = (1 << 22) - 1;
pub trait GenericSocket : Read + Write { pub trait GenericSocket : Read + Write {
} }

View File

@ -46,6 +46,7 @@ use ip_utils::{map_external_address, select_public_address};
use parity_path::restrict_permissions_owner; use parity_path::restrict_permissions_owner;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use network::{ConnectionFilter, ConnectionDirection}; use network::{ConnectionFilter, ConnectionDirection};
use connection::PAYLOAD_SOFT_LIMIT;
type Slab<T> = ::slab::Slab<T, usize>; type Slab<T> = ::slab::Slab<T, usize>;
@ -200,6 +201,10 @@ impl<'s> NetworkContextTrait for NetworkContext<'s> {
.map(|node| self.reserved_peers.contains(&node)) .map(|node| self.reserved_peers.contains(&node))
.unwrap_or(false) .unwrap_or(false)
} }
fn payload_soft_limit(&self) -> usize {
PAYLOAD_SOFT_LIMIT
}
} }
/// Shared host information /// Shared host information

View File

@ -288,6 +288,9 @@ pub trait NetworkContext {
/// Returns whether the given peer ID is a reserved peer. /// Returns whether the given peer ID is a reserved peer.
fn is_reserved_peer(&self, peer: PeerId) -> bool; fn is_reserved_peer(&self, peer: PeerId) -> bool;
/// Returns the size the payload shouldn't exceed
fn payload_soft_limit(&self) -> usize;
} }
impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext { impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
@ -338,6 +341,10 @@ impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext {
fn is_reserved_peer(&self, peer: PeerId) -> bool { fn is_reserved_peer(&self, peer: PeerId) -> bool {
(**self).is_reserved_peer(peer) (**self).is_reserved_peer(peer)
} }
fn payload_soft_limit(&self) -> usize {
(**self).payload_soft_limit()
}
} }
/// Network IO protocol handler. This needs to be implemented for each new subprotocol. /// Network IO protocol handler. This needs to be implemented for each new subprotocol.