Merge branch 'master' of github.com:ethcore/parity into thread

This commit is contained in:
arkpar
2016-02-29 18:11:59 +01:00
64 changed files with 3035 additions and 1103 deletions

View File

@@ -28,6 +28,31 @@ use service::*;
use client::BlockStatus;
use util::panics::*;
known_heap_size!(0, UnVerifiedBlock, VerifyingBlock, PreVerifiedBlock);
const MIN_MEM_LIMIT: usize = 16384;
const MIN_QUEUE_LIMIT: usize = 512;
/// Block queue configuration
#[derive(Debug)]
pub struct BlockQueueConfig {
/// Maximum number of blocks to keep in unverified queue.
/// When the limit is reached, is_full returns true.
pub max_queue_size: usize,
/// Maximum heap memory to use.
/// When the limit is reached, is_full returns true.
pub max_mem_use: usize,
}
impl Default for BlockQueueConfig {
fn default() -> Self {
BlockQueueConfig {
max_queue_size: 30000,
max_mem_use: 50 * 1024 * 1024,
}
}
}
/// Block queue status
#[derive(Debug)]
pub struct BlockQueueInfo {
@@ -37,6 +62,12 @@ pub struct BlockQueueInfo {
pub verified_queue_size: usize,
/// Number of blocks being verified
pub verifying_queue_size: usize,
/// Configured maximum number of blocks in the queue
pub max_queue_size: usize,
/// Configured maximum number of bytes to use
pub max_mem_use: usize,
/// Heap memory used in bytes
pub mem_used: usize,
}
impl BlockQueueInfo {
@@ -48,7 +79,8 @@ impl BlockQueueInfo {
/// Indicates that queue is full
pub fn is_full(&self) -> bool {
self.unverified_queue_size + self.verified_queue_size + self.verifying_queue_size > MAX_UNVERIFIED_QUEUE_SIZE
self.unverified_queue_size + self.verified_queue_size + self.verifying_queue_size > self.max_queue_size ||
self.mem_used > self.max_mem_use
}
/// Indicates that queue is empty
@@ -68,7 +100,9 @@ pub struct BlockQueue {
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
empty: Arc<Condvar>,
processing: RwLock<HashSet<H256>>
processing: RwLock<HashSet<H256>>,
max_queue_size: usize,
max_mem_use: usize,
}
struct UnVerifiedBlock {
@@ -106,11 +140,9 @@ struct Verification {
bad: Mutex<HashSet<H256>>,
}
const MAX_UNVERIFIED_QUEUE_SIZE: usize = 50000;
impl BlockQueue {
/// Creates a new queue instance.
pub fn new(engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue {
pub fn new(config: BlockQueueConfig, engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue {
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
@@ -154,6 +186,8 @@ impl BlockQueue {
deleting: deleting.clone(),
processing: RwLock::new(HashSet::new()),
empty: empty.clone(),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
}
}
@@ -297,19 +331,23 @@ impl BlockQueue {
}
/// Mark given block and all its children as bad. Stops verification.
pub fn mark_as_bad(&self, hash: &H256) {
pub fn mark_as_bad(&self, block_hashes: &[H256]) {
let mut verified_lock = self.verification.verified.lock().unwrap();
let mut verified = verified_lock.deref_mut();
let mut bad = self.verification.bad.lock().unwrap();
bad.insert(hash.clone());
self.processing.write().unwrap().remove(&hash);
let mut processing = self.processing.write().unwrap();
bad.reserve(block_hashes.len());
for hash in block_hashes {
bad.insert(hash.clone());
processing.remove(&hash);
}
let mut new_verified = VecDeque::new();
for block in verified.drain(..) {
if bad.contains(&block.header.parent_hash) {
bad.insert(block.header.hash());
self.processing.write().unwrap().remove(&block.header.hash());
}
else {
processing.remove(&block.header.hash());
} else {
new_verified.push_back(block);
}
}
@@ -317,10 +355,10 @@ impl BlockQueue {
}
/// Mark given block as processed
pub fn mark_as_good(&self, hashes: &[H256]) {
pub fn mark_as_good(&self, block_hashes: &[H256]) {
let mut processing = self.processing.write().unwrap();
for h in hashes {
processing.remove(&h);
for hash in block_hashes {
processing.remove(&hash);
}
}
@@ -342,12 +380,41 @@ impl BlockQueue {
/// Get queue status.
pub fn queue_info(&self) -> BlockQueueInfo {
let (unverified_len, unverified_bytes) = {
let v = self.verification.unverified.lock().unwrap();
(v.len(), v.heap_size_of_children())
};
let (verifying_len, verifying_bytes) = {
let v = self.verification.verifying.lock().unwrap();
(v.len(), v.heap_size_of_children())
};
let (verified_len, verified_bytes) = {
let v = self.verification.verified.lock().unwrap();
(v.len(), v.heap_size_of_children())
};
BlockQueueInfo {
unverified_queue_size: self.verification.unverified.lock().unwrap().len(),
verifying_queue_size: self.verification.verifying.lock().unwrap().len(),
verified_queue_size: self.verification.verified.lock().unwrap().len(),
unverified_queue_size: unverified_len,
verifying_queue_size: verifying_len,
verified_queue_size: verified_len,
max_queue_size: self.max_queue_size,
max_mem_use: self.max_mem_use,
mem_used:
unverified_bytes
+ verifying_bytes
+ verified_bytes
// TODO: https://github.com/servo/heapsize/pull/50
//+ self.processing.read().unwrap().heap_size_of_children(),
}
}
pub fn collect_garbage(&self) {
{
self.verification.unverified.lock().unwrap().shrink_to_fit();
self.verification.verifying.lock().unwrap().shrink_to_fit();
self.verification.verified.lock().unwrap().shrink_to_fit();
}
self.processing.write().unwrap().shrink_to_fit();
}
}
impl MayPanic for BlockQueue {
@@ -379,7 +446,7 @@ mod tests {
fn get_test_queue() -> BlockQueue {
let spec = get_test_spec();
let engine = spec.to_engine().unwrap();
BlockQueue::new(Arc::new(engine), IoChannel::disconnected())
BlockQueue::new(BlockQueueConfig::default(), Arc::new(engine), IoChannel::disconnected())
}
#[test]
@@ -387,7 +454,7 @@ mod tests {
// TODO better test
let spec = Spec::new_test();
let engine = spec.to_engine().unwrap();
let _ = BlockQueue::new(Arc::new(engine), IoChannel::disconnected());
let _ = BlockQueue::new(BlockQueueConfig::default(), Arc::new(engine), IoChannel::disconnected());
}
#[test]
@@ -443,4 +510,19 @@ mod tests {
assert!(queue.queue_info().is_empty());
}
#[test]
fn test_mem_limit() {
let spec = get_test_spec();
let engine = spec.to_engine().unwrap();
let mut config = BlockQueueConfig::default();
config.max_mem_use = super::MIN_MEM_LIMIT; // empty queue uses about 15000
let mut queue = BlockQueue::new(config, Arc::new(engine), IoChannel::disconnected());
assert!(!queue.queue_info().is_full());
let mut blocks = get_good_dummy_block_seq(50);
for b in blocks.drain(..) {
queue.import_block(b).unwrap();
}
assert!(queue.queue_info().is_full());
}
}