* light client fixes * fix memory-lru-cache * clear pending reqs on disconnect
This commit is contained in:
parent
06be7271aa
commit
a554b81f32
@ -405,6 +405,7 @@ impl HeaderChain {
|
||||
|
||||
match id {
|
||||
BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.clone()),
|
||||
BlockId::Hash(hash) if hash == self.genesis_hash() => { Some(self.genesis_header.clone()) }
|
||||
BlockId::Hash(hash) => load_from_db(hash),
|
||||
BlockId::Number(num) => {
|
||||
if self.best_block.read().number < num { return None }
|
||||
@ -781,4 +782,18 @@ mod tests {
|
||||
assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 10);
|
||||
assert!(chain.candidates.read().get(&100).is_some())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn genesis_header_available() {
|
||||
let spec = Spec::new_test();
|
||||
let genesis_header = spec.genesis_header();
|
||||
let db = make_db();
|
||||
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
|
||||
|
||||
let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header), cache.clone()).unwrap();
|
||||
|
||||
assert!(chain.block_header(BlockId::Earliest).is_some());
|
||||
assert!(chain.block_header(BlockId::Number(0)).is_some());
|
||||
assert!(chain.block_header(BlockId::Hash(genesis_header.hash())).is_some());
|
||||
}
|
||||
}
|
||||
|
@ -282,6 +282,7 @@ impl Client {
|
||||
let mut good = Vec::new();
|
||||
for verified_header in self.queue.drain(MAX) {
|
||||
let (num, hash) = (verified_header.number(), verified_header.hash());
|
||||
trace!(target: "client", "importing block {}", num);
|
||||
|
||||
if self.verify_full && !self.check_header(&mut bad, &verified_header) {
|
||||
continue
|
||||
@ -381,13 +382,17 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
// return true if should skip, false otherwise. may push onto bad if
|
||||
// return false if should skip, true otherwise. may push onto bad if
|
||||
// should skip.
|
||||
fn check_header(&self, bad: &mut Vec<H256>, verified_header: &Header) -> bool {
|
||||
let hash = verified_header.hash();
|
||||
let parent_header = match self.chain.block_header(BlockId::Hash(*verified_header.parent_hash())) {
|
||||
Some(header) => header,
|
||||
None => return false, // skip import of block with missing parent.
|
||||
None => {
|
||||
trace!(target: "client", "No parent for block ({}, {})",
|
||||
verified_header.number(), hash);
|
||||
return false // skip import of block with missing parent.
|
||||
}
|
||||
};
|
||||
|
||||
// Verify Block Family
|
||||
|
@ -806,6 +806,9 @@ impl LightProtocol {
|
||||
trace!(target: "pip", "Connected peer with chain head {:?}", (status.head_hash, status.head_num));
|
||||
|
||||
if (status.network_id, status.genesis_hash) != (self.network_id, self.genesis_hash) {
|
||||
trace!(target: "pip", "peer {} wrong network: network_id is {} vs our {}, gh is {} vs our {}",
|
||||
peer, status.network_id, self.network_id, status.genesis_hash, self.genesis_hash);
|
||||
|
||||
return Err(Error::WrongNetwork);
|
||||
}
|
||||
|
||||
|
@ -285,6 +285,13 @@ impl<L: AsLightClient + Send + Sync> Handler for LightSync<L> {
|
||||
best.clone()
|
||||
};
|
||||
|
||||
{
|
||||
let mut pending_reqs = self.pending_reqs.lock();
|
||||
for unfulfilled in unfulfilled {
|
||||
pending_reqs.remove(&unfulfilled);
|
||||
}
|
||||
}
|
||||
|
||||
if new_best.is_none() {
|
||||
debug!(target: "sync", "No peers remain. Reverting to idle");
|
||||
*self.state.lock() = SyncState::Idle;
|
||||
@ -503,10 +510,12 @@ impl<L: AsLightClient> LightSync<L> {
|
||||
None
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let mut rng = self.rng.lock();
|
||||
let mut requested_from = HashSet::new();
|
||||
|
||||
// naive request dispatcher: just give to any peer which says it will
|
||||
// give us responses.
|
||||
// give us responses. but only one request per peer per state transition.
|
||||
let dispatcher = move |req: HeadersRequest| {
|
||||
rng.shuffle(&mut peer_ids);
|
||||
|
||||
@ -521,9 +530,12 @@ impl<L: AsLightClient> LightSync<L> {
|
||||
builder.build()
|
||||
};
|
||||
for peer in &peer_ids {
|
||||
if requested_from.contains(peer) { continue }
|
||||
match ctx.request_from(*peer, request.clone()) {
|
||||
Ok(id) => {
|
||||
self.pending_reqs.lock().insert(id.clone());
|
||||
requested_from.insert(peer.clone());
|
||||
|
||||
return Some(id)
|
||||
}
|
||||
Err(NetError::NoCredits) => {}
|
||||
|
@ -32,6 +32,11 @@ pub struct MemoryLruCache<K: Eq + Hash, V: HeapSizeOf> {
|
||||
max_size: usize,
|
||||
}
|
||||
|
||||
// amount of memory used when the item will be put on the heap.
|
||||
fn heap_size_of<T: HeapSizeOf>(val: &T) -> usize {
|
||||
::std::mem::size_of::<T>() + val.heap_size_of_children()
|
||||
}
|
||||
|
||||
impl<K: Eq + Hash, V: HeapSizeOf> MemoryLruCache<K, V> {
|
||||
/// Create a new cache with a maximum size in bytes.
|
||||
pub fn new(max_size: usize) -> Self {
|
||||
@ -52,15 +57,17 @@ impl<K: Eq + Hash, V: HeapSizeOf> MemoryLruCache<K, V> {
|
||||
self.inner.set_capacity(cap * 2);
|
||||
}
|
||||
|
||||
self.cur_size += heap_size_of(&val);
|
||||
|
||||
// account for any element displaced from the cache.
|
||||
if let Some(lru) = self.inner.insert(key, val) {
|
||||
self.cur_size -= lru.heap_size_of_children();
|
||||
self.cur_size -= heap_size_of(&lru);
|
||||
}
|
||||
|
||||
// remove elements until we are below the memory target.
|
||||
while self.cur_size > self.max_size {
|
||||
match self.inner.remove_lru() {
|
||||
Some((_, v)) => self.cur_size -= v.heap_size_of_children(),
|
||||
Some((_, v)) => self.cur_size -= heap_size_of(&v),
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
@ -77,3 +84,27 @@ impl<K: Eq + Hash, V: HeapSizeOf> MemoryLruCache<K, V> {
|
||||
self.cur_size
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn it_works() {
|
||||
let mut cache = MemoryLruCache::new(256);
|
||||
let val1 = vec![0u8; 100];
|
||||
let size1 = heap_size_of(&val1);
|
||||
cache.insert("hello", val1);
|
||||
|
||||
assert_eq!(cache.current_size(), size1);
|
||||
|
||||
let val2 = vec![0u8; 210];
|
||||
let size2 = heap_size_of(&val2);
|
||||
cache.insert("world", val2);
|
||||
|
||||
assert!(cache.get_mut(&"hello").is_none());
|
||||
assert!(cache.get_mut(&"world").is_some());
|
||||
|
||||
assert_eq!(cache.current_size(), size2);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user