Avoid long state queries when serving GetNodeData requests (#11444)
* Remove dead bootnodes, add new geth bootnodes * More granular locking when fetching state Finish GetDataNode requests early if queries take too long * typo * Use latest kvdb-rocksdb * Cleanup * Update ethcore/sync/src/chain/supplier.rs Co-Authored-By: Andronik Ordian <write@reusable.software> * Address review grumbles * Fix compilation * Address review grumbles Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
parent
edf59a1394
commit
cf6fa63f55
@ -162,7 +162,12 @@ pub const PAR_PROTOCOL_VERSION_4: (u8, u8) = (4, 0x20);
|
|||||||
|
|
||||||
pub const MAX_BODIES_TO_SEND: usize = 256;
|
pub const MAX_BODIES_TO_SEND: usize = 256;
|
||||||
pub const MAX_HEADERS_TO_SEND: usize = 512;
|
pub const MAX_HEADERS_TO_SEND: usize = 512;
|
||||||
|
/// Maximum number of "entries" to include in a GetDataNode request.
|
||||||
pub const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
pub const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
||||||
|
/// Maximum allowed duration for serving a batch GetNodeData request.
|
||||||
|
const MAX_NODE_DATA_TOTAL_DURATION: Duration = Duration::from_secs(2);
|
||||||
|
/// Maximum allowed duration for serving a single GetNodeData request.
|
||||||
|
const MAX_NODE_DATA_SINGLE_DURATION: Duration = Duration::from_millis(100);
|
||||||
pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
|
pub const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
|
||||||
const MIN_PEERS_PROPAGATION: usize = 4;
|
const MIN_PEERS_PROPAGATION: usize = 4;
|
||||||
const MAX_PEERS_PROPAGATION: usize = 128;
|
const MAX_PEERS_PROPAGATION: usize = 128;
|
||||||
|
@ -15,13 +15,14 @@
|
|||||||
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use crate::sync_io::SyncIo;
|
use crate::sync_io::SyncIo;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use enum_primitive::FromPrimitive;
|
use enum_primitive::FromPrimitive;
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
use log::{debug, trace};
|
use log::{debug, trace, warn};
|
||||||
use network::{self, PeerId};
|
use network::{self, PeerId};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use rlp::{Rlp, RlpStream};
|
use rlp::{Rlp, RlpStream};
|
||||||
@ -56,6 +57,8 @@ use super::{
|
|||||||
MAX_BODIES_TO_SEND,
|
MAX_BODIES_TO_SEND,
|
||||||
MAX_HEADERS_TO_SEND,
|
MAX_HEADERS_TO_SEND,
|
||||||
MAX_NODE_DATA_TO_SEND,
|
MAX_NODE_DATA_TO_SEND,
|
||||||
|
MAX_NODE_DATA_TOTAL_DURATION,
|
||||||
|
MAX_NODE_DATA_SINGLE_DURATION,
|
||||||
MAX_RECEIPTS_HEADERS_TO_SEND,
|
MAX_RECEIPTS_HEADERS_TO_SEND,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -258,9 +261,9 @@ impl SyncSupplier {
|
|||||||
|
|
||||||
/// Respond to GetNodeData request
|
/// Respond to GetNodeData request
|
||||||
fn return_node_data(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
fn return_node_data(io: &dyn SyncIo, r: &Rlp, peer_id: PeerId) -> RlpResponseResult {
|
||||||
let payload_soft_limit = io.payload_soft_limit();
|
let payload_soft_limit = io.payload_soft_limit(); // 4Mb
|
||||||
let mut count = r.item_count().unwrap_or(0);
|
let mut count = r.item_count().unwrap_or(0);
|
||||||
trace!(target: "sync", "{} -> GetNodeData: {} entries", peer_id, count);
|
trace!(target: "sync", "{} -> GetNodeData: {} entries requested", peer_id, count);
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
debug!(target: "sync", "Empty GetNodeData request, ignoring.");
|
debug!(target: "sync", "Empty GetNodeData request, ignoring.");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@ -269,10 +272,20 @@ impl SyncSupplier {
|
|||||||
let mut added = 0usize;
|
let mut added = 0usize;
|
||||||
let mut data = Vec::new();
|
let mut data = Vec::new();
|
||||||
let mut total_bytes = 0;
|
let mut total_bytes = 0;
|
||||||
|
let mut total_elpsd = Duration::from_secs(0);
|
||||||
for i in 0..count {
|
for i in 0..count {
|
||||||
if let Some(node) = io.chain().state_data(&r.val_at::<H256>(i)?) {
|
let hash = &r.val_at(i)?;
|
||||||
|
let elpsd = Instant::now();
|
||||||
|
let state = io.chain().state_data(hash);
|
||||||
|
|
||||||
|
total_elpsd += elpsd.elapsed();
|
||||||
|
if elpsd.elapsed() > MAX_NODE_DATA_SINGLE_DURATION || total_elpsd > MAX_NODE_DATA_TOTAL_DURATION {
|
||||||
|
warn!(target: "sync", "{} -> GetNodeData: item {}/{} – slow state fetch for hash {:?}; took {:?}",
|
||||||
|
peer_id, i, count, hash, elpsd);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if let Some(node) = state {
|
||||||
total_bytes += node.len();
|
total_bytes += node.len();
|
||||||
// Check that the packet won't be oversized
|
|
||||||
if total_bytes > payload_soft_limit {
|
if total_bytes > payload_soft_limit {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -280,7 +293,8 @@ impl SyncSupplier {
|
|||||||
added += 1;
|
added += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
trace!(target: "sync", "{} -> GetNodeData: return {} entries", peer_id, added);
|
trace!(target: "sync", "{} -> GetNodeData: returning {}/{} entries ({} bytes total in {:?})",
|
||||||
|
peer_id, added, count, total_bytes, total_elpsd);
|
||||||
let mut rlp = RlpStream::new_list(added);
|
let mut rlp = RlpStream::new_list(added);
|
||||||
for d in data {
|
for d in data {
|
||||||
rlp.append(&d);
|
rlp.append(&d);
|
||||||
@ -540,7 +554,7 @@ mod test {
|
|||||||
let rlp_result = result.unwrap();
|
let rlp_result = result.unwrap();
|
||||||
assert!(rlp_result.is_some());
|
assert!(rlp_result.is_some());
|
||||||
|
|
||||||
// the length of one rlp-encoded hashe
|
// the length of one rlp-encoded hash
|
||||||
let rlp = rlp_result.unwrap().1.out();
|
let rlp = rlp_result.unwrap().1.out();
|
||||||
let rlp = Rlp::new(&rlp);
|
let rlp = Rlp::new(&rlp);
|
||||||
assert_eq!(Ok(1), rlp.item_count());
|
assert_eq!(Ok(1), rlp.item_count());
|
||||||
|
@ -20,6 +20,7 @@ use std::{
|
|||||||
collections::{HashMap, hash_map::Entry},
|
collections::{HashMap, hash_map::Entry},
|
||||||
io,
|
io,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
@ -279,11 +280,22 @@ impl JournalDB for OverlayRecentDB {
|
|||||||
fn earliest_era(&self) -> Option<u64> { self.journal_overlay.read().earliest_era }
|
fn earliest_era(&self) -> Option<u64> { self.journal_overlay.read().earliest_era }
|
||||||
|
|
||||||
fn state(&self, key: &H256) -> Option<Bytes> {
|
fn state(&self, key: &H256) -> Option<Bytes> {
|
||||||
let journal_overlay = self.journal_overlay.read();
|
|
||||||
let key = to_short_key(key);
|
let key = to_short_key(key);
|
||||||
journal_overlay.backing_overlay.get(&key, EMPTY_PREFIX)
|
// Hold the read lock for shortest possible amount of time.
|
||||||
.or_else(|| journal_overlay.pending_overlay.get(&key).map(|d| d.clone()))
|
let maybe_state_data = {
|
||||||
.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec()))
|
let journal_overlay = self.journal_overlay.read();
|
||||||
|
journal_overlay
|
||||||
|
.backing_overlay
|
||||||
|
.get(&key, EMPTY_PREFIX)
|
||||||
|
.or_else(|| journal_overlay.pending_overlay.get(&key).map(|d| d.clone()))
|
||||||
|
};
|
||||||
|
|
||||||
|
maybe_state_data.or_else(|| {
|
||||||
|
let pkey = &key[..DB_PREFIX_LEN];
|
||||||
|
self.backing
|
||||||
|
.get_by_prefix(self.column, &pkey)
|
||||||
|
.map(|b| b.to_vec())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> io::Result<u32> {
|
fn journal_under(&mut self, batch: &mut DBTransaction, now: u64, id: &H256) -> io::Result<u32> {
|
||||||
|
@ -40,11 +40,11 @@ use crate::handshake::Handshake;
|
|||||||
|
|
||||||
const ENCRYPTED_HEADER_LEN: usize = 32;
|
const ENCRYPTED_HEADER_LEN: usize = 32;
|
||||||
const RECEIVE_PAYLOAD: Duration = Duration::from_secs(30);
|
const RECEIVE_PAYLOAD: Duration = Duration::from_secs(30);
|
||||||
pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1;
|
pub const MAX_PAYLOAD_SIZE: usize = (1 << 24) - 1; // 16Mb
|
||||||
|
|
||||||
/// Network responses should try not to go over this limit.
|
/// Network responses should try not to go over this limit.
|
||||||
/// This should be lower than MAX_PAYLOAD_SIZE
|
/// This should be lower than MAX_PAYLOAD_SIZE
|
||||||
pub const PAYLOAD_SOFT_LIMIT: usize = (1 << 22) - 1;
|
pub const PAYLOAD_SOFT_LIMIT: usize = (1 << 22) - 1; // 4Mb
|
||||||
|
|
||||||
pub trait GenericSocket : Read + Write {
|
pub trait GenericSocket : Read + Write {
|
||||||
}
|
}
|
||||||
@ -97,7 +97,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
|
|||||||
else if self.rec_buf.len() > self.rec_size {
|
else if self.rec_buf.len() > self.rec_size {
|
||||||
warn!(target:"network", "Read past buffer {} bytes", self.rec_buf.len() - self.rec_size);
|
warn!(target:"network", "Read past buffer {} bytes", self.rec_buf.len() - self.rec_size);
|
||||||
return Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
|
return Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok(_) => return Ok(None),
|
Ok(_) => return Ok(None),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -105,7 +105,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
|
|||||||
return Err(e)
|
return Err(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a packet to send queue.
|
/// Add a packet to send queue.
|
||||||
@ -222,7 +222,7 @@ impl Connection {
|
|||||||
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
||||||
if self.registered.compare_and_swap(false, true, AtomicOrdering::SeqCst) {
|
if self.registered.compare_and_swap(false, true, AtomicOrdering::SeqCst) {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
trace!(target: "network", "connection register; token={:?}", reg);
|
trace!(target: "network", "connection register; token={:?}", reg);
|
||||||
if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */) { // TODO: oneshot is broken on windows
|
if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */) { // TODO: oneshot is broken on windows
|
||||||
trace!(target: "network", "Failed to register {:?}, {:?}", reg, e);
|
trace!(target: "network", "Failed to register {:?}, {:?}", reg, e);
|
||||||
@ -235,7 +235,7 @@ impl Connection {
|
|||||||
trace!(target: "network", "connection reregister; token={:?}", reg);
|
trace!(target: "network", "connection reregister; token={:?}", reg);
|
||||||
if !self.registered.load(AtomicOrdering::SeqCst) {
|
if !self.registered.load(AtomicOrdering::SeqCst) {
|
||||||
self.register_socket(reg, event_loop)
|
self.register_socket(reg, event_loop)
|
||||||
} else {
|
} else {
|
||||||
event_loop.reregister(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).unwrap_or_else(|e| { // TODO: oneshot is broken on windows
|
event_loop.reregister(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).unwrap_or_else(|e| { // TODO: oneshot is broken on windows
|
||||||
trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e);
|
trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e);
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user