light client fixes (#6148)

* light client fixes

* fix memory-lru-cache

* clear pending reqs on disconnect
This commit is contained in:
Robert Habermeier 2017-07-26 15:48:00 +02:00 committed by Arkadiy Paronyan
parent ee1dfb5605
commit 7d348e2260
5 changed files with 71 additions and 5 deletions

View File

@ -405,6 +405,7 @@ impl HeaderChain {
match id { match id {
BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()), BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()),
BlockId::Hash(hash) if hash == self.genesis_hash() => { Some(self.genesis_header.clone()) }
BlockId::Hash(hash) => load_from_db(hash), BlockId::Hash(hash) => load_from_db(hash),
BlockId::Number(num) => { BlockId::Number(num) => {
if self.best_block.read().number < num { return None } if self.best_block.read().number < num { return None }
@ -781,4 +782,18 @@ mod tests {
assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 10); assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 10);
assert!(chain.candidates.read().get(&100).is_some()) assert!(chain.candidates.read().get(&100).is_some())
} }
#[test]
fn genesis_header_available() {
let spec = Spec::new_test();
let genesis_header = spec.genesis_header();
let db = make_db();
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache.clone()).unwrap();
assert!(chain.block_header(BlockId::Earliest).is_some());
assert!(chain.block_header(BlockId::Number(0)).is_some());
assert!(chain.block_header(BlockId::Hash(genesis_header.hash())).is_some());
}
} }

View File

@ -282,6 +282,7 @@ impl Client {
let mut good = Vec::new(); let mut good = Vec::new();
for verified_header in self.queue.drain(MAX) { for verified_header in self.queue.drain(MAX) {
let (num, hash) = (verified_header.number(), verified_header.hash()); let (num, hash) = (verified_header.number(), verified_header.hash());
trace!(target: "client", "importing block {}", num);
if self.verify_full && !self.check_header(&mut bad, &verified_header) { if self.verify_full && !self.check_header(&mut bad, &verified_header) {
continue continue
@ -381,13 +382,17 @@ impl Client {
} }
} }
// return true if should skip, false otherwise. may push onto bad if // return false if should skip, true otherwise. may push onto bad if
// should skip. // should skip.
fn check_header(&self, bad: &mut Vec<H256>, verified_header: &Header) -> bool { fn check_header(&self, bad: &mut Vec<H256>, verified_header: &Header) -> bool {
let hash = verified_header.hash(); let hash = verified_header.hash();
let parent_header = match self.chain.block_header(BlockId::Hash(*verified_header.parent_hash())) { let parent_header = match self.chain.block_header(BlockId::Hash(*verified_header.parent_hash())) {
Some(header) => header, Some(header) => header,
None => return false, // skip import of block with missing parent. None => {
trace!(target: "client", "No parent for block ({}, {})",
verified_header.number(), hash);
return false // skip import of block with missing parent.
}
}; };
// Verify Block Family // Verify Block Family

View File

@ -806,6 +806,9 @@ impl LightProtocol {
trace!(target: "pip", "Connected peer with chain head {:?}", (status.head_hash, status.head_num)); trace!(target: "pip", "Connected peer with chain head {:?}", (status.head_hash, status.head_num));
if (status.network_id, status.genesis_hash) != (self.network_id, self.genesis_hash) { if (status.network_id, status.genesis_hash) != (self.network_id, self.genesis_hash) {
trace!(target: "pip", "peer {} wrong network: network_id is {} vs our {}, gh is {} vs our {}",
peer, status.network_id, self.network_id, status.genesis_hash, self.genesis_hash);
return Err(Error::WrongNetwork); return Err(Error::WrongNetwork);
} }

View File

@ -285,6 +285,13 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
best.clone() best.clone()
}; };
{
let mut pending_reqs = self.pending_reqs.lock();
for unfulfilled in unfulfilled {
pending_reqs.remove(&unfulfilled);
}
}
if new_best.is_none() { if new_best.is_none() {
debug!(target: "sync", "No peers remain. Reverting to idle"); debug!(target: "sync", "No peers remain. Reverting to idle");
*self.state.lock() = SyncState::Idle; *self.state.lock() = SyncState::Idle;
@ -503,10 +510,12 @@ impl<L: AsLightClient> LightSync<L> {
None None
} }
}).collect(); }).collect();
let mut rng = self.rng.lock(); let mut rng = self.rng.lock();
let mut requested_from = HashSet::new();
// naive request dispatcher: just give to any peer which says it will // naive request dispatcher: just give to any peer which says it will
// give us responses. // give us responses. but only one request per peer per state transition.
let dispatcher = move |req: HeadersRequest| { let dispatcher = move |req: HeadersRequest| {
rng.shuffle(&mut peer_ids); rng.shuffle(&mut peer_ids);
@ -521,9 +530,12 @@ impl<L: AsLightClient> LightSync<L> {
builder.build() builder.build()
}; };
for peer in &peer_ids { for peer in &peer_ids {
if requested_from.contains(peer) { continue }
match ctx.request_from(*peer, request.clone()) { match ctx.request_from(*peer, request.clone()) {
Ok(id) => { Ok(id) => {
self.pending_reqs.lock().insert(id.clone()); self.pending_reqs.lock().insert(id.clone());
requested_from.insert(peer.clone());
return Some(id) return Some(id)
} }
Err(NetError::NoCredits) => {} Err(NetError::NoCredits) => {}

View File

@ -32,6 +32,11 @@ pub struct MemoryLruCache<K: Eq + Hash, V: HeapSizeOf> {
max_size: usize, max_size: usize,
} }
// amount of memory used when the item will be put on the heap.
fn heap_size_of<T: HeapSizeOf>(val: &T) -> usize {
::std::mem::size_of::<T>() + val.heap_size_of_children()
}
impl<K: Eq + Hash, V: HeapSizeOf> MemoryLruCache<K, V> { impl<K: Eq + Hash, V: HeapSizeOf> MemoryLruCache<K, V> {
/// Create a new cache with a maximum size in bytes. /// Create a new cache with a maximum size in bytes.
pub fn new(max_size: usize) -> Self { pub fn new(max_size: usize) -> Self {
@ -52,15 +57,17 @@ impl<K: Eq + Hash, V: HeapSizeOf> MemoryLruCache<K, V> {
self.inner.set_capacity(cap * 2); self.inner.set_capacity(cap * 2);
} }
self.cur_size += heap_size_of(&val);
// account for any element displaced from the cache. // account for any element displaced from the cache.
if let Some(lru) = self.inner.insert(key, val) { if let Some(lru) = self.inner.insert(key, val) {
self.cur_size -= lru.heap_size_of_children(); self.cur_size -= heap_size_of(&lru);
} }
// remove elements until we are below the memory target. // remove elements until we are below the memory target.
while self.cur_size > self.max_size { while self.cur_size > self.max_size {
match self.inner.remove_lru() { match self.inner.remove_lru() {
Some((_, v)) => self.cur_size -= v.heap_size_of_children(), Some((_, v)) => self.cur_size -= heap_size_of(&v),
_ => break, _ => break,
} }
} }
@ -77,3 +84,27 @@ impl<K: Eq + Hash, V: HeapSizeOf> MemoryLruCache<K, V> {
self.cur_size self.cur_size
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let mut cache = MemoryLruCache::new(256);
let val1 = vec![0u8; 100];
let size1 = heap_size_of(&val1);
cache.insert("hello", val1);
assert_eq!(cache.current_size(), size1);
let val2 = vec![0u8; 210];
let size2 = heap_size_of(&val2);
cache.insert("world", val2);
assert!(cache.get_mut(&"hello").is_none());
assert!(cache.get_mut(&"world").is_some());
assert_eq!(cache.current_size(), size2);
}
}