From cf6fa63f55e0cddb77b5f7da41620e8723bc6bb9 Mon Sep 17 00:00:00 2001
From: David
Date: Tue, 4 Feb 2020 20:59:16 +0100
Subject: [PATCH] Avoid long state queries when serving GetNodeData requests (#11444)

* Remove dead bootnodes, add new geth bootnodes

* More granular locking when fetching state

Finish GetNodeData requests early if queries take too long

* typo

* Use latest kvdb-rocksdb

* Cleanup

* Update ethcore/sync/src/chain/supplier.rs

Co-Authored-By: Andronik Ordian

* Address review grumbles

* Fix compilation

* Address review grumbles

Co-authored-by: Andronik Ordian
---
 ethcore/sync/src/chain/mod.rs         |  5 +++++
 ethcore/sync/src/chain/supplier.rs    | 28 ++++++++++++++++++++-------
 util/journaldb/src/overlayrecentdb.rs | 20 +++++++++++++++----
 util/network-devp2p/src/connection.rs | 12 ++++++------
 4 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs
index 8f9e296c0..2b506c04f 100644
--- a/ethcore/sync/src/chain/mod.rs
+++ b/ethcore/sync/src/chain/mod.rs
@@ -162,7 +162,12 @@ pub const PAR_PROTOCOL_VERSION_4: (u8, u8) = (4, 0x20);
 
 pub const MAX_BODIES_TO_SEND: usize = 256;
 pub const MAX_HEADERS_TO_SEND: usize = 512;
+/// Maximum number of entries to include in a GetNodeData response.
 pub const MAX_NODE_DATA_TO_SEND: usize = 1024;
+/// Maximum allowed duration for serving a whole GetNodeData request.
+const MAX_NODE_DATA_TOTAL_DURATION: Duration = Duration::from_secs(2);
+/// Maximum allowed duration for a single state lookup while serving a GetNodeData request.
+const MAX_NODE_DATA_SINGLE_DURATION: Duration = Duration::from_millis(100);
 pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
 const MIN_PEERS_PROPAGATION: usize = 4;
 const MAX_PEERS_PROPAGATION: usize = 128;
diff --git a/ethcore/sync/src/chain/supplier.rs b/ethcore/sync/src/chain/supplier.rs
index 43e1148fc..6596506ed 100644
--- a/ethcore/sync/src/chain/supplier.rs
+++ b/ethcore/sync/src/chain/supplier.rs
@@ -15,13 +15,14 @@
 // along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
 
 use std::cmp;
+use std::time::{Duration, Instant};
 
 use crate::sync_io::SyncIo;
 use bytes::Bytes;
 use enum_primitive::FromPrimitive;
 use ethereum_types::H256;
-use log::{debug, trace};
+use log::{debug, trace, warn};
 use network::{self, PeerId};
 use parking_lot::RwLock;
 use rlp::{Rlp, RlpStream};
@@ -56,6 +57,8 @@ use super::{
 	MAX_BODIES_TO_SEND,
 	MAX_HEADERS_TO_SEND,
 	MAX_NODE_DATA_TO_SEND,
+	MAX_NODE_DATA_TOTAL_DURATION,
+	MAX_NODE_DATA_SINGLE_DURATION,
 	MAX_RECEIPTS_HEADERS_TO_SEND,
 };
@@ -258,9 +261,9 @@ impl SyncSupplier {
 	/// Respond to GetNodeData request
 	fn return_node_data(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
-		let payload_soft_limit = io.payload_soft_limit();
+		let payload_soft_limit = io.payload_soft_limit(); // 4 MiB
 		let mut count = r.item_count().unwrap_or(0);
-		trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count);
+		trace!(target: "sync", "{} -> GetNodeData: {} entries requested", peer_id, count);
 		if count == 0 {
 			debug!(target: "sync", "Empty GetNodeData request, ignoring.");
 			return Ok(None);
 		}
@@ -269,10 +272,20 @@
 		let mut added = 0usize;
 		let mut data = Vec::new();
 		let mut total_bytes = 0;
+		let mut total_elpsd = Duration::from_secs(0);
 		for i in 0..count {
-			if let Some(node) = io.chain().state_data(&r.val_at::<H256>(i)?) {
+			let hash = &r.val_at(i)?;
+			let elpsd = Instant::now();
+			let state = io.chain().state_data(hash);
+
+			let elpsd = elpsd.elapsed();
+			total_elpsd += elpsd;
+			if elpsd > MAX_NODE_DATA_SINGLE_DURATION || total_elpsd > MAX_NODE_DATA_TOTAL_DURATION {
+				warn!(target: "sync", "{} -> GetNodeData: item {}/{} – slow state fetch for hash {:?}; took {:?}",
+					peer_id, i, count, hash, elpsd);
+				break;
+			}
+			if let Some(node) = state {
 				total_bytes += node.len();
-				// Check that the packet won't be oversized
 				if total_bytes > payload_soft_limit { break; }
 				data.push(node);
@@ -280,7 +293,8 @@
 				added += 1;
 			}
 		}
-		trace!(target: "sync", "{} -> GetNodeData: return {} entries", peer_id, added);
+		trace!(target: "sync", "{} -> GetNodeData: returning {}/{} entries ({} bytes total in {:?})",
+			peer_id, added, count, total_bytes, total_elpsd);
 		let mut rlp = RlpStream::new_list(added);
 		for d in data {
 			rlp.append(&d);
@@ -540,7 +554,7 @@ mod test {
 		let rlp_result = result.unwrap();
 		assert!(rlp_result.is_some());
 
-		// the length of one rlp-encoded hashe
+		// the length of one rlp-encoded hash
 		let rlp = rlp_result.unwrap().1.out();
 		let rlp = Rlp::new(&rlp);
 		assert_eq!(Ok(1), rlp.item_count());
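The supplier change above is a time-budget pattern: every state_data lookup is timed, the per-item and cumulative durations are checked after each fetch, and the loop bails out early instead of stalling the sync service, which is safe because a shorter GetNodeData response is still protocol-valid. A minimal standalone sketch of the same pattern; serve_batch, fetch_item, MAX_SINGLE and MAX_TOTAL are hypothetical names, not part of this codebase:

    use std::time::{Duration, Instant};

    const MAX_SINGLE: Duration = Duration::from_millis(100);
    const MAX_TOTAL: Duration = Duration::from_secs(2);

    // Serve as many items as the time budget allows; a truncated result is
    // still a valid response, so slow lookups shorten the batch instead of
    // blocking the caller.
    fn serve_batch<T>(requests: &[u64], fetch_item: impl Fn(u64) -> Option<T>) -> Vec<T> {
        let mut served = Vec::new();
        let mut total = Duration::from_secs(0);
        for &req in requests {
            let started = Instant::now();
            let item = fetch_item(req);
            let elapsed = started.elapsed();
            total += elapsed;
            // Give up early if this lookup was slow or the batch ran over budget.
            if elapsed > MAX_SINGLE || total > MAX_TOTAL {
                break;
            }
            if let Some(item) = item {
                served.push(item);
            }
        }
        served
    }

    fn main() {
        // Fast lookups stay within budget, so everything is served.
        let served = serve_batch(&[1, 2, 3], |n| Some(n * 10));
        assert_eq!(served, vec![10, 20, 30]);
    }

As in the patch, an item fetched during the over-budget iteration is deliberately dropped: the break happens before the push, so the response never includes a result that blew the budget.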
diff --git a/util/journaldb/src/overlayrecentdb.rs b/util/journaldb/src/overlayrecentdb.rs
index ed4c5bb25..8f1763770 100644
--- a/util/journaldb/src/overlayrecentdb.rs
+++ b/util/journaldb/src/overlayrecentdb.rs
@@ -20,6 +20,7 @@ use std::{
 	collections::{HashMap, hash_map::Entry},
 	io,
 	sync::Arc,
+	time::Duration,
 };
 
 use ethereum_types::H256;
@@ -279,11 +280,22 @@ impl JournalDB for OverlayRecentDB {
 	fn earliest_era(&self) -> Option<u64> { self.journal_overlay.read().earliest_era }
 
 	fn state(&self, key: &H256) -> Option<Bytes> {
-		let journal_overlay = self.journal_overlay.read();
 		let key = to_short_key(key);
-		journal_overlay.backing_overlay.get(&key, EMPTY_PREFIX)
-			.or_else(|| journal_overlay.pending_overlay.get(&key).map(|d| d.clone()))
-			.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
+		// Hold the read lock for the shortest possible amount of time.
+		let maybe_state_data = {
+			let journal_overlay = self.journal_overlay.read();
+			journal_overlay
+				.backing_overlay
+				.get(&key, EMPTY_PREFIX)
+				.or_else(|| journal_overlay.pending_overlay.get(&key).map(|d| d.clone()))
+		};
+
+		maybe_state_data.or_else(|| {
+			let pkey = &key[..DB_PREFIX_LEN];
+			self.backing
+				.get_by_prefix(self.column, &pkey)
+				.map(|b| b.to_vec())
+		})
 	}
 
 	fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> io::Result<u32> {
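The journaldb change is about lock scope rather than timing: the read guard on journal_overlay is now confined to a block that only probes the two in-memory overlays, so the potentially slow on-disk get_by_prefix runs after the guard has been dropped and no longer blocks writers. A sketch of the same guard-scoping idiom, using parking_lot as the codebase does; the Db type, its fields, and slow_backing_lookup are illustrative names, not the crate's API:

    use std::collections::HashMap;

    use parking_lot::RwLock;

    // Illustrative container: an in-memory overlay guarded by a lock, plus a
    // slow backing store that must be queried without holding that lock.
    struct Db {
        overlay: RwLock<HashMap<u64, Vec<u8>>>,
    }

    impl Db {
        fn get(&self, key: u64) -> Option<Vec<u8>> {
            // Scope the read guard tightly: it is dropped at the end of this
            // block, before any slow fallback work begins.
            let cached = {
                let overlay = self.overlay.read();
                overlay.get(&key).cloned()
            };

            cached.or_else(|| self.slow_backing_lookup(key))
        }

        // Stand-in for the on-disk read; runs with no lock held.
        fn slow_backing_lookup(&self, _key: u64) -> Option<Vec<u8>> {
            None
        }
    }

    fn main() {
        let db = Db { overlay: RwLock::new(HashMap::new()) };
        assert_eq!(db.get(42), None); // falls through to the backing lookup
    }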
diff --git a/util/network-devp2p/src/connection.rs b/util/network-devp2p/src/connection.rs
index 0ddeb3737..2f88efb2d 100644
--- a/util/network-devp2p/src/connection.rs
+++ b/util/network-devp2p/src/connection.rs
@@ -40,11 +40,11 @@ use crate::handshake::Handshake;
 
 const ENCRYPTED_HEADER_LEN: usize = 32;
 const RECEIVE_PAYLOAD: Duration = Duration::from_secs(30);
-pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1;
+pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1; // 16 MiB
 
 /// Network responses should try not to go over this limit.
 /// This should be lower than MAX_PAYLOAD_SIZE.
-pub const PAYLOAD_SOFT_LIMIT: usize = (1 << 22) - 1;
+pub const PAYLOAD_SOFT_LIMIT: usize = (1 << 22) - 1; // 4 MiB
 
 pub trait GenericSocket : Read + Write { }
@@ -97,7 +97,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
 					else if self.rec_buf.len() > self.rec_size {
 						warn!(target:"network", "Read past buffer {} bytes", self.rec_buf.len() - self.rec_size);
 						return Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
-					}
+					}
 				},
 				Ok(_) => return Ok(None),
 				Err(e) => {
@@ -105,7 +105,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
 					return Err(e)
 				}
 			}
-		}
+		}
 	}
 
 	/// Add a packet to send queue.
@@ -222,7 +222,7 @@ impl Connection {
 	pub fn register_socket<H: Handler>(&self, reg: Token, event_loop: &mut EventLoop<H>) -> io::Result<()> {
 		if self.registered.compare_and_swap(false, true, AtomicOrdering::SeqCst) {
 			return Ok(());
-		}
+		}
 		trace!(target: "network", "connection register; token={:?}", reg);
 		if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */) { // TODO: oneshot is broken on windows
 			trace!(target: "network", "Failed to register {:?}, {:?}", reg, e);
@@ -235,7 +235,7 @@ impl Connection {
 		trace!(target: "network", "connection reregister; token={:?}", reg);
 		if !self.registered.load(AtomicOrdering::SeqCst) {
 			self.register_socket(reg, event_loop)
-		} else {
+		} else {
 			event_loop.reregister(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).unwrap_or_else(|e| { // TODO: oneshot is broken on windows
 				trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e);
 			});
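PAYLOAD_SOFT_LIMIT is only meaningful while it stays below MAX_PAYLOAD_SIZE, as its doc comment says. On recent Rust toolchains (1.57+) that relationship can be enforced at compile time; the guard below is a sketch added by this edit, not part of the patch:

    pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1; // 16 MiB
    pub const PAYLOAD_SOFT_LIMIT: usize = (1 << 22) - 1; // 4 MiB

    // Compile-time guard: the build fails if the soft limit ever reaches
    // the hard limit, turning the doc-comment invariant into a check.
    const _: () = assert!(PAYLOAD_SOFT_LIMIT < MAX_PAYLOAD_SIZE);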