Merge remote-tracking branch 'parity/master' into auth-round-no-mocknet

This commit is contained in:
keorn
2016-11-02 13:01:31 +00:00
242 changed files with 3270 additions and 3967 deletions

View File

@@ -419,8 +419,8 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
let thread = thread::spawn(move || {
let p = panic.clone();
panic.catch_panic(move || {
IoManager::<Message>::start(p, &mut event_loop, h).unwrap();
}).unwrap()
IoManager::<Message>::start(p, &mut event_loop, h).expect("Error starting IO service");
}).expect("Error starting panic handler")
});
Ok(IoService {
panic_handler: panic_handler,
@@ -454,7 +454,11 @@ impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
self.thread.take().unwrap().join().ok();
if let Some(thread) = self.thread.take() {
thread.join().unwrap_or_else(|e| {
debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e);
});
}
trace!(target: "shutdown", "[IoService] Closed.");
}
}

View File

@@ -15,7 +15,6 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::mem;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev;
@@ -81,7 +80,7 @@ impl Worker {
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
panic_handler.catch_panic(move || {
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
}).unwrap()
}).expect("Error starting panic handler")
})
.expect("Error creating worker thread"));
worker
@@ -94,7 +93,7 @@ impl Worker {
where Message: Send + Sync + Clone + 'static {
loop {
{
let lock = wait_mutex.lock().unwrap();
let lock = wait_mutex.lock().expect("Poisoned work_loop mutex");
if deleting.load(AtomicOrdering::Acquire) {
return;
}
@@ -134,11 +133,12 @@ impl Worker {
impl Drop for Worker {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoWorker] Closing...");
let _ = self.wait_mutex.lock().unwrap();
let _ = self.wait_mutex.lock().expect("Poisoned work_loop mutex");
self.deleting.store(true, AtomicOrdering::Release);
self.wait.notify_all();
let thread = mem::replace(&mut self.thread, None).unwrap();
thread.join().ok();
if let Some(thread) = self.thread.take() {
thread.join().ok();
}
trace!(target: "shutdown", "[IoWorker] Closed");
}
}

View File

@@ -125,11 +125,11 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
/// Writable IO handler. Called when the socket is ready to send.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone + Sync + 'static {
if self.send_queue.is_empty() {
return Ok(WriteStatus::Complete)
}
{
let buf = self.send_queue.front_mut().unwrap();
let buf = match self.send_queue.front_mut() {
Some(buf) => buf,
None => return Ok(WriteStatus::Complete),
};
let send_size = buf.get_ref().len();
let pos = buf.position() as usize;
if (pos as usize) >= send_size {
@@ -439,7 +439,7 @@ impl EncryptedConnection {
let mut prev = H128::new();
mac.clone().finalize(&mut prev);
let mut enc = H128::new();
mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).unwrap();
mac_encoder.encrypt(&mut RefReadBuffer::new(&prev), &mut RefWriteBuffer::new(&mut enc), true).expect("Error updating MAC");
mac_encoder.reset();
enc = enc ^ if seed.is_empty() { prev } else { H128::from_slice(seed) };

View File

@@ -130,7 +130,7 @@ impl Discovery {
/// Add a new node to discovery table. Pings the node.
pub fn add_node(&mut self, e: NodeEntry) {
if e.endpoint.is_allowed(self.allow_ips) {
if self.is_allowed(&e) {
let endpoint = e.endpoint.clone();
self.update_node(e);
self.ping(&endpoint);
@@ -146,7 +146,7 @@ impl Discovery {
/// Add a list of known nodes to the table.
pub fn init_node_list(&mut self, mut nodes: Vec<NodeEntry>) {
for n in nodes.drain(..) {
if n.endpoint.is_allowed(self.allow_ips) {
if self.is_allowed(&n) {
self.update_node(n);
}
}
@@ -156,7 +156,7 @@ impl Discovery {
trace!(target: "discovery", "Inserting {:?}", &e);
let id_hash = e.id.sha3();
let ping = {
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id_hash) as usize).unwrap();
let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id_hash) as usize];
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.timeout = None;
@@ -169,8 +169,9 @@ impl Discovery {
if bucket.nodes.len() > BUCKET_SIZE {
//ping least active node
bucket.nodes.back_mut().unwrap().timeout = Some(time::precise_time_ns());
Some(bucket.nodes.back().unwrap().address.endpoint.clone())
let mut last = bucket.nodes.back_mut().expect("Last item is always present when len() > 0");
last.timeout = Some(time::precise_time_ns());
Some(last.address.endpoint.clone())
} else { None }
};
if let Some(endpoint) = ping {
@@ -179,7 +180,7 @@ impl Discovery {
}
fn clear_ping(&mut self, id: &NodeId) {
let mut bucket = self.node_buckets.get_mut(Discovery::distance(&self.id_hash, &id.sha3()) as usize).unwrap();
let mut bucket = &mut self.node_buckets[Discovery::distance(&self.id_hash, &id.sha3()) as usize];
if let Some(node) = bucket.nodes.iter_mut().find(|n| &n.address.id == id) {
node.timeout = None;
}
@@ -294,7 +295,7 @@ impl Discovery {
if count == BUCKET_SIZE {
// delete the most distant element
let remove = {
let (key, last) = found.iter_mut().next_back().unwrap();
let (key, last) = found.iter_mut().next_back().expect("Last element is always Some when count > 0");
last.pop();
if last.is_empty() { Some(key.clone()) } else { None }
};
@@ -316,8 +317,7 @@ impl Discovery {
}
pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
while !self.send_queue.is_empty() {
let data = self.send_queue.pop_front().unwrap();
while let Some(data) = self.send_queue.pop_front() {
match self.udp_socket.send_to(&data.payload, &data.address) {
Ok(Some(size)) if size == data.payload.len() => {
},
@@ -399,6 +399,10 @@ impl Discovery {
Ok(())
}
fn is_allowed(&self, entry: &NodeEntry) -> bool {
entry.endpoint.is_allowed(self.allow_ips) && entry.id != self.id
}
fn on_ping(&mut self, rlp: &UntrustedRlp, node: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
trace!(target: "discovery", "Got Ping from {:?}", &from);
let source = try!(NodeEndpoint::from_rlp(&try!(rlp.at(1))));
@@ -409,7 +413,7 @@ impl Discovery {
let entry = NodeEntry { id: node.clone(), endpoint: source.clone() };
if !entry.endpoint.is_valid() {
debug!(target: "discovery", "Got bad address: {:?}", entry);
} else if !entry.endpoint.is_allowed(self.allow_ips) {
} else if !self.is_allowed(&entry) {
debug!(target: "discovery", "Address not allowed: {:?}", entry);
} else {
self.update_node(entry.clone());
@@ -484,15 +488,15 @@ impl Discovery {
debug!(target: "discovery", "Bad address: {:?}", endpoint);
continue;
}
if !endpoint.is_allowed(self.allow_ips) {
debug!(target: "discovery", "Address not allowed: {:?}", endpoint);
continue;
}
let node_id: NodeId = try!(r.val_at(3));
if node_id == self.id {
continue;
}
let entry = NodeEntry { id: node_id.clone(), endpoint: endpoint };
if !self.is_allowed(&entry) {
debug!(target: "discovery", "Address not allowed: {:?}", entry);
continue;
}
added.insert(node_id, entry.clone());
self.ping(&entry.endpoint);
self.update_node(entry);

View File

@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::net::SocketAddr;
use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
@@ -48,6 +48,8 @@ type Slab<T> = ::slab::Slab<T, usize>;
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 1024;
const DEFAULT_PORT: u16 = 30303;
// Tokens
const TCP_ACCEPT: usize = SYS_TIMER + 1;
const IDLE: usize = SYS_TIMER + 2;
@@ -135,14 +137,14 @@ impl NetworkConfiguration {
/// Create new default configuration with sepcified listen port.
pub fn new_with_port(port: u16) -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap());
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)));
config
}
/// Create new default configuration for localhost-only connection with random port (usefull for testing)
pub fn new_local() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_address = Some(SocketAddr::from_str("127.0.0.1:0").unwrap());
config.listen_address = Some(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)));
config.nat_enabled = false;
config
}
@@ -259,7 +261,7 @@ impl<'s> NetworkContext<'s> {
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError> {
assert!(self.session.is_some(), "Respond called without network context");
self.send(self.session_id.unwrap(), packet_id, data)
self.session_id.map_or_else(|| Err(NetworkError::Expired), |id| self.send(id, packet_id, data))
}
/// Get an IoChannel.
@@ -382,16 +384,16 @@ impl Host {
trace!(target: "host", "Creating new Host object");
let mut listen_address = match config.listen_address {
None => SocketAddr::from_str("0.0.0.0:30304").unwrap(),
None => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), DEFAULT_PORT)),
Some(addr) => addr,
};
let keys = if let Some(ref secret) = config.use_secret {
KeyPair::from_secret(secret.clone()).unwrap()
try!(KeyPair::from_secret(secret.clone()))
} else {
config.config_path.clone().and_then(|ref p| load_key(Path::new(&p)))
.map_or_else(|| {
let key = Random.generate().unwrap();
let key = Random.generate().expect("Error generating random key pair");
if let Some(path) = config.config_path.clone() {
save_key(Path::new(&path), key.secret());
}
@@ -488,7 +490,7 @@ impl Host {
let mut s = e.lock();
{
let id = s.id();
if id.is_some() && reserved.contains(id.unwrap()) {
if id.map_or(false, |id| reserved.contains(id)) {
continue;
}
}
@@ -634,14 +636,14 @@ impl Host {
}
fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
let (min_peers, mut pin, max_handshakes, allow_ips) = {
let (min_peers, mut pin, max_handshakes, allow_ips, self_id) = {
let info = self.info.read();
if info.capabilities.is_empty() {
return;
}
let config = &info.config;
(config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny, config.max_handshakes as usize, config.allow_ips)
(config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny, config.max_handshakes as usize, config.allow_ips, info.id().clone())
};
let session_count = self.session_count();
@@ -672,7 +674,7 @@ impl Host {
let max_handshakes_per_round = max_handshakes / 2;
let mut started: usize = 0;
for id in nodes.filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
for id in nodes.filter(|id| !self.have_session(id) && !self.connecting_to(id) && *id != self_id)
.take(min(max_handshakes_per_round, max_handshakes - handshake_count)) {
self.connect_peer(&id, io);
started += 1;
@@ -814,11 +816,12 @@ impl Host {
(info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
};
// Check for the session limit. session_counts accounts for the new session.
if reserved_only ||
(s.info.originated && session_count >= min_peers) ||
(!s.info.originated && session_count >= max_peers) {
(s.info.originated && session_count > min_peers) ||
(!s.info.originated && session_count > max_peers) {
// only proceed if the connecting peer is reserved.
if !self.reserved_nodes.read().contains(s.id().unwrap()) {
if !self.reserved_nodes.read().contains(s.id().expect("Ready session always has id")) {
s.disconnect(io, DisconnectReason::TooManyPeers);
return;
}
@@ -827,7 +830,7 @@ impl Host {
// Add it to the node table
if !s.info.originated {
if let Ok(address) = s.remote_addr() {
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
let entry = NodeEntry { id: s.id().expect("Ready session always has id").clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
self.nodes.write().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.lock();
if let Some(ref mut discovery) = *discovery {
@@ -861,15 +864,17 @@ impl Host {
}
let handlers = self.handlers.read();
for p in ready_data {
let h = handlers.get(&p).unwrap().clone();
self.stats.inc_sessions();
let reserved = self.reserved_nodes.read();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
if let Some(h) = handlers.get(&p).clone() {
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
}
}
for (p, packet_id, data) in packet_data {
let h = handlers.get(&p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
if let Some(h) = handlers.get(&p).clone() {
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
}
}
}
@@ -909,9 +914,10 @@ impl Host {
}
}
for p in to_disconnect {
let h = self.handlers.read().get(&p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
if let Some(h) = self.handlers.read().get(&p).clone() {
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
}
if deregister {
io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
@@ -975,7 +981,7 @@ impl IoHandler<NetworkIoMessage> for Host {
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => {
let node_changes = { self.discovery.lock().as_mut().unwrap().readable(io) };
let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.readable(io)) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
@@ -992,7 +998,7 @@ impl IoHandler<NetworkIoMessage> for Host {
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => {
self.discovery.lock().as_mut().unwrap().writable(io);
self.discovery.lock().as_mut().map(|d| d.writable(io));
}
_ => panic!("Received unknown writable token"),
}
@@ -1006,11 +1012,11 @@ impl IoHandler<NetworkIoMessage> for Host {
IDLE => self.maintain_network(io),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
self.discovery.lock().as_mut().unwrap().refresh();
self.discovery.lock().as_mut().map(|d| d.refresh());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
DISCOVERY_ROUND => {
let node_changes = { self.discovery.lock().as_mut().unwrap().round() };
let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.round()) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
@@ -1102,7 +1108,7 @@ impl IoHandler<NetworkIoMessage> for Host {
session.lock().register_socket(reg, event_loop).expect("Error registering socket");
}
}
DISCOVERY => self.discovery.lock().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(event_loop).ok()).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
@@ -1130,7 +1136,7 @@ impl IoHandler<NetworkIoMessage> for Host {
connection.lock().update_socket(reg, event_loop).expect("Error updating socket");
}
}
DISCOVERY => self.discovery.lock().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(event_loop).ok()).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
@@ -1200,7 +1206,7 @@ fn key_save_load() {
#[test]
fn host_client_url() {
let mut config = NetworkConfiguration::new();
let mut config = NetworkConfiguration::new_local();
let key = "6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2".into();
config.use_secret = Some(key);
let host: Host = Host::new(config, Arc::new(NetworkStats::new())).unwrap();

View File

@@ -280,7 +280,7 @@ impl NodeTable {
json.push_str("\"nodes\": [\n");
let node_ids = self.nodes(AllowIP::All);
for i in 0 .. node_ids.len() {
let node = self.nodes.get(&node_ids[i]).unwrap();
let node = self.nodes.get(&node_ids[i]).expect("self.nodes() only returns node IDs from self.nodes");
json.push_str(&format!("\t{{ \"url\": \"{}\", \"failures\": {} }}{}\n", node, node.failures, if i == node_ids.len() - 1 {""} else {","}))
}
json.push_str("]\n");

View File

@@ -388,8 +388,9 @@ impl Session {
Ok(SessionData::Continue)
},
PACKET_PONG => {
self.pong_time_ns = Some(time::precise_time_ns());
self.info.ping_ms = Some((self.pong_time_ns.unwrap() - self.ping_time_ns) / 1000_000);
let time = time::precise_time_ns();
self.pong_time_ns = Some(time);
self.info.ping_ms = Some((time - self.ping_time_ns) / 1000_000);
Ok(SessionData::Continue)
},
PACKET_GET_PEERS => Ok(SessionData::None), //TODO;

View File

@@ -312,6 +312,8 @@ impl Database {
opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
let mut cf_options = Vec::with_capacity(config.columns.unwrap_or(0) as usize);
let cfnames: Vec<_> = (0..config.columns.unwrap_or(0)).map(|c| format!("col{}", c)).collect();
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
for col in 0 .. config.columns.unwrap_or(0) {
let mut opts = Options::new();
@@ -342,8 +344,6 @@ impl Database {
let mut cfs: Vec<Column> = Vec::new();
let db = match config.columns {
Some(columns) => {
let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect();
let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
match DB::open_cf(&opts, path, &cfnames, &cf_options) {
Ok(db) => {
cfs = cfnames.iter().map(|n| db.cf_handle(n)
@@ -365,13 +365,18 @@ impl Database {
},
None => DB::open(&opts, path)
};
let db = match db {
Ok(db) => db,
Err(ref s) if s.starts_with("Corruption:") => {
info!("{}", s);
info!("Attempting DB repair for {}", path);
try!(DB::repair(&opts, path));
try!(DB::open(&opts, path))
match cfnames.is_empty() {
true => try!(DB::open(&opts, path)),
false => try!(DB::open_cf(&opts, path, &cfnames, &cf_options))
}
},
Err(s) => { return Err(s); }
};

View File

@@ -144,6 +144,7 @@ pub mod semantic_version;
pub mod log;
pub mod path;
pub mod snappy;
pub mod stats;
pub mod cache;
mod timer;

View File

@@ -251,6 +251,7 @@ impl Manager {
let mut cur_db = Arc::new(try!(Database::open(&db_config, old_path_str).map_err(Error::Custom)));
for migration in migrations {
trace!(target: "migration", "starting migration to version {}", migration.version());
// Change number of columns in new db
let current_columns = db_config.columns;
db_config.columns = migration.columns();

70
util/src/stats.rs Normal file
View File

@@ -0,0 +1,70 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Statistical functions.
use bigint::uint::*;
/// Discretised histogram.
#[derive(Debug, PartialEq)]
pub struct Histogram {
/// Bounds of each bucket.
pub bucket_bounds: Vec<U256>,
/// Count within each bucket.
pub counts: Vec<u64>
}
impl Histogram {
/// Histogram if a sorted corpus is at least fills the buckets.
pub fn new(corpus: &[U256], bucket_number: usize) -> Option<Histogram> {
if corpus.len() < bucket_number { return None; }
let corpus_end = corpus.last().expect("there are at least bucket_number elements; qed").clone();
// If there are extremely few transactions, go from zero.
let corpus_start = corpus.first().expect("there are at least bucket_number elements; qed").clone();
let bucket_size = (corpus_end - corpus_start + 1.into()) / bucket_number.into();
let mut bucket_end = corpus_start + bucket_size;
let mut bucket_bounds = vec![corpus_start; bucket_number + 1];
let mut counts = vec![0; bucket_number];
let mut corpus_i = 0;
// Go through the corpus adding to buckets.
for bucket in 0..bucket_number {
while corpus[corpus_i] < bucket_end {
counts[bucket] += 1;
corpus_i += 1;
}
bucket_bounds[bucket + 1] = bucket_end;
bucket_end = bucket_end + bucket_size;
}
Some(Histogram { bucket_bounds: bucket_bounds, counts: counts })
}
}
#[cfg(test)]
mod tests {
use bigint::uint::U256;
use super::Histogram;
#[test]
fn check_histogram() {
let hist = Histogram::new(&vec_into![643,689,1408,2000,2296,2512,4250,4320,4842,4958,5804,6065,6098,6354,7002,7145,7845,8589,8593,8895], 5).unwrap();
let correct_bounds: Vec<U256> = vec_into![643,2293,3943,5593,7243,8893];
assert_eq!(Histogram { bucket_bounds: correct_bounds, counts: vec![4,2,4,6,3] }, hist);
assert!(Histogram::new(&vec_into![1, 2], 5).is_none());
}
}