Merge branch 'master' of github.com:ethcore/parity into fatdb

This commit is contained in:
debris
2016-07-01 10:17:08 +02:00
80 changed files with 1724 additions and 886 deletions

View File

@@ -132,15 +132,10 @@ macro_rules! impl_hash {
$size
}
// TODO: remove once slice::clone_from_slice is stable
#[inline]
fn clone_from_slice(&mut self, src: &[u8]) -> usize {
let min = ::std::cmp::min($size, src.len());
let dst = &mut self.deref_mut()[.. min];
let src = &src[.. min];
for i in 0..min {
dst[i] = src[i];
}
let min = cmp::min($size, src.len());
self.0[..min].copy_from_slice(&src[..min]);
min
}
@@ -151,7 +146,7 @@ macro_rules! impl_hash {
}
fn copy_to(&self, dest: &mut[u8]) {
let min = ::std::cmp::min($size, dest.len());
let min = cmp::min($size, dest.len());
dest[..min].copy_from_slice(&self.0[..min]);
}

View File

@@ -135,8 +135,9 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
}
/// Broadcast a message to other IO clients
pub fn message(&self, message: Message) {
self.channel.send(message).expect("Error seding message");
pub fn message(&self, message: Message) -> Result<(), UtilError> {
try!(self.channel.send(message));
Ok(())
}
/// Get message channel
@@ -351,7 +352,9 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop
pub fn start() -> Result<IoService<Message>, UtilError> {
let panic_handler = PanicHandler::new_in_arc();
let mut event_loop = EventLoop::new().unwrap();
let mut config = EventLoopConfig::new();
config.messages_per_tick(1024);
let mut event_loop = EventLoop::configured(config).expect("Error creating event loop");
let channel = event_loop.channel();
let panic = panic_handler.clone();
let thread = thread::spawn(move || {
@@ -390,7 +393,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
fn drop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
self.host_channel.send(IoMessage::Shutdown).unwrap();
self.host_channel.send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
self.thread.take().unwrap().join().ok();
trace!(target: "shutdown", "[IoService] Closed.");
}

View File

@@ -48,13 +48,8 @@ pub struct ArchiveDB {
impl ArchiveDB {
/// Create a new instance from file
pub fn new(path: &str, cache_size: Option<usize>) -> ArchiveDB {
let opts = DatabaseConfig {
// this must match account_db prefix
prefix_size: Some(DB_PREFIX_LEN),
max_open_files: 256,
cache_size: cache_size,
};
pub fn new(path: &str, config: DatabaseConfig) -> ArchiveDB {
let opts = config.prefix(DB_PREFIX_LEN);
let backing = Database::open(&opts, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
@@ -80,7 +75,7 @@ impl ArchiveDB {
fn new_temp() -> ArchiveDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
Self::new(dir.to_str().unwrap(), None)
Self::new(dir.to_str().unwrap(), DatabaseConfig::default())
}
fn payload(&self, key: &H256) -> Option<Bytes> {
@@ -222,6 +217,7 @@ mod tests {
use super::*;
use hashdb::*;
use journaldb::traits::JournalDB;
use kvdb::DatabaseConfig;
#[test]
fn insert_same_in_fork() {
@@ -363,7 +359,7 @@ mod tests {
let bar = H256::random();
let foo = {
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), None);
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
jdb.emplace(bar.clone(), b"bar".to_vec());
@@ -372,13 +368,13 @@ mod tests {
};
{
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), None);
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
}
{
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), None);
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
@@ -391,7 +387,7 @@ mod tests {
dir.push(H32::random().hex());
let foo = {
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), None);
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
@@ -405,7 +401,7 @@ mod tests {
};
{
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), None);
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.remove(&foo);
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
assert!(jdb.contains(&foo));
@@ -420,7 +416,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let (foo, _, _) = {
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), None);
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
@@ -435,7 +431,7 @@ mod tests {
};
{
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), None);
let mut jdb = ArchiveDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.contains(&foo));
}
@@ -446,14 +442,14 @@ mod tests {
let temp = ::devtools::RandomTempPath::new();
let key = {
let mut jdb = ArchiveDB::new(temp.as_str(), None);
let mut jdb = ArchiveDB::new(temp.as_str(), DatabaseConfig::default());
let key = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
key
};
{
let jdb = ArchiveDB::new(temp.as_str(), None);
let jdb = ArchiveDB::new(temp.as_str(), DatabaseConfig::default());
let state = jdb.state(&key);
assert!(state.is_some());
}

View File

@@ -73,13 +73,8 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl EarlyMergeDB {
/// Create a new instance from file
pub fn new(path: &str, cache_size: Option<usize>) -> EarlyMergeDB {
let opts = DatabaseConfig {
// this must match account_db prefix
prefix_size: Some(DB_PREFIX_LEN),
max_open_files: 256,
cache_size: cache_size,
};
pub fn new(path: &str, config: DatabaseConfig) -> EarlyMergeDB {
let opts = config.prefix(DB_PREFIX_LEN);
let backing = Database::open(&opts, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
@@ -107,7 +102,7 @@ impl EarlyMergeDB {
fn new_temp() -> EarlyMergeDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
Self::new(dir.to_str().unwrap(), None)
Self::new(dir.to_str().unwrap(), DatabaseConfig::default())
}
fn morph_key(key: &H256, index: u8) -> Bytes {
@@ -537,6 +532,7 @@ mod tests {
use super::super::traits::JournalDB;
use hashdb::*;
use log::init_log;
use kvdb::DatabaseConfig;
#[test]
fn insert_same_in_fork() {
@@ -714,7 +710,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
@@ -742,7 +738,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
@@ -770,7 +766,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
@@ -808,7 +804,7 @@ mod tests {
let bar = H256::random();
let foo = {
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
jdb.emplace(bar.clone(), b"bar".to_vec());
@@ -818,14 +814,14 @@ mod tests {
};
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
@@ -840,7 +836,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 4
let foo = jdb.insert(b"foo");
@@ -869,7 +865,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 4
let foo = jdb.insert(b"foo");
@@ -918,7 +914,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
@@ -949,7 +945,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
@@ -989,7 +985,7 @@ mod tests {
let foo = b"foo".sha3();
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
@@ -1010,7 +1006,7 @@ mod tests {
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((2, b"2".sha3()))).unwrap();
@@ -1018,14 +1014,14 @@ mod tests {
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(5, &b"5".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
}; { let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(6, &b"6".sha3(), Some((4, b"4".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
@@ -1038,7 +1034,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let (foo, bar, baz) = {
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
@@ -1056,7 +1052,7 @@ mod tests {
};
{
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), None);
let mut jdb = EarlyMergeDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));

View File

@@ -17,6 +17,7 @@
//! `JournalDB` interface and implementation.
use common::*;
use kvdb::DatabaseConfig;
/// Export the journaldb module.
pub mod traits;
@@ -71,12 +72,12 @@ impl fmt::Display for Algorithm {
}
/// Create a new `JournalDB` trait object.
pub fn new(path: &str, algorithm: Algorithm, cache_size: Option<usize>) -> Box<JournalDB> {
pub fn new(path: &str, algorithm: Algorithm, config: DatabaseConfig) -> Box<JournalDB> {
match algorithm {
Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(path, cache_size)),
Algorithm::EarlyMerge => Box::new(earlymergedb::EarlyMergeDB::new(path, cache_size)),
Algorithm::OverlayRecent => Box::new(overlayrecentdb::OverlayRecentDB::new(path, cache_size)),
Algorithm::RefCounted => Box::new(refcounteddb::RefCountedDB::new(path, cache_size)),
Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(path, config)),
Algorithm::EarlyMerge => Box::new(earlymergedb::EarlyMergeDB::new(path, config)),
Algorithm::OverlayRecent => Box::new(overlayrecentdb::OverlayRecentDB::new(path, config)),
Algorithm::RefCounted => Box::new(refcounteddb::RefCountedDB::new(path, config)),
}
}

View File

@@ -98,18 +98,13 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl OverlayRecentDB {
/// Create a new instance from file
pub fn new(path: &str, cache_size: Option<usize>) -> OverlayRecentDB {
Self::from_prefs(path, cache_size)
pub fn new(path: &str, config: DatabaseConfig) -> OverlayRecentDB {
Self::from_prefs(path, config)
}
/// Create a new instance from file
pub fn from_prefs(path: &str, cache_size: Option<usize>) -> OverlayRecentDB {
let opts = DatabaseConfig {
// this must match account_db prefix
prefix_size: Some(DB_PREFIX_LEN),
max_open_files: 256,
cache_size: cache_size,
};
pub fn from_prefs(path: &str, config: DatabaseConfig) -> OverlayRecentDB {
let opts = config.prefix(DB_PREFIX_LEN);
let backing = Database::open(&opts, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
@@ -135,7 +130,7 @@ impl OverlayRecentDB {
pub fn new_temp() -> OverlayRecentDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
Self::new(dir.to_str().unwrap(), None)
Self::new(dir.to_str().unwrap(), DatabaseConfig::default())
}
#[cfg(test)]
@@ -369,6 +364,7 @@ mod tests {
use hashdb::*;
use log::init_log;
use journaldb::JournalDB;
use kvdb::DatabaseConfig;
#[test]
fn insert_same_in_fork() {
@@ -526,7 +522,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
@@ -554,7 +550,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
@@ -582,7 +578,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(0, &b"0".sha3(), None).unwrap();
assert!(jdb.can_reconstruct_refs());
@@ -620,7 +616,7 @@ mod tests {
let bar = H256::random();
let foo = {
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
jdb.emplace(bar.clone(), b"bar".to_vec());
@@ -630,14 +626,14 @@ mod tests {
};
{
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.remove(&foo);
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
}
{
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
assert!(jdb.contains(&foo));
assert!(jdb.contains(&bar));
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
@@ -652,7 +648,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 4
let foo = jdb.insert(b"foo");
@@ -681,7 +677,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 4
let foo = jdb.insert(b"foo");
@@ -730,7 +726,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
@@ -761,7 +757,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 4
let foo = jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
@@ -801,7 +797,7 @@ mod tests {
let foo = b"foo".sha3();
{
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
jdb.insert(b"foo");
jdb.commit(0, &b"0".sha3(), None).unwrap();
@@ -822,7 +818,7 @@ mod tests {
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.remove(&foo);
jdb.commit(4, &b"4".sha3(), Some((2, b"2".sha3()))).unwrap();
@@ -830,14 +826,14 @@ mod tests {
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(5, &b"5".sha3(), Some((3, b"3".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));
// incantation to reopen the db
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
}; { let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(6, &b"6".sha3(), Some((4, b"4".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
@@ -850,7 +846,7 @@ mod tests {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
let (foo, bar, baz) = {
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
// history is 1
let foo = jdb.insert(b"foo");
let bar = jdb.insert(b"bar");
@@ -868,7 +864,7 @@ mod tests {
};
{
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), None);
let mut jdb = OverlayRecentDB::new(dir.to_str().unwrap(), DatabaseConfig::default());
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
assert!(jdb.can_reconstruct_refs());
assert!(jdb.contains(&foo));

View File

@@ -46,13 +46,8 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl RefCountedDB {
/// Create a new instance given a `backing` database.
pub fn new(path: &str, cache_size: Option<usize>) -> RefCountedDB {
let opts = DatabaseConfig {
// this must match account_db prefix
prefix_size: Some(DB_PREFIX_LEN),
max_open_files: 256,
cache_size: cache_size,
};
pub fn new(path: &str, config: DatabaseConfig) -> RefCountedDB {
let opts = config.prefix(DB_PREFIX_LEN);
let backing = Database::open(&opts, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
@@ -82,7 +77,7 @@ impl RefCountedDB {
fn new_temp() -> RefCountedDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
Self::new(dir.to_str().unwrap(), None)
Self::new(dir.to_str().unwrap(), DatabaseConfig::default())
}
}

View File

@@ -20,8 +20,6 @@ use std::default::Default;
use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector, DBIterator,
IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction};
const DB_FILE_SIZE_BASE: u64 = 16 * 1024 * 1024;
const DB_FILE_SIZE_MULTIPLIER: i32 = 5;
const DB_BACKGROUND_FLUSHES: i32 = 2;
const DB_BACKGROUND_COMPACTIONS: i32 = 2;
@@ -53,6 +51,36 @@ impl DBTransaction {
}
}
/// Compaction profile for the database settings
pub struct CompactionProfile {
/// L0-L1 target file size
pub initial_file_size: u64,
/// L2-LN target file size multiplier
pub file_size_multiplier: i32,
/// rate limiter for background flushes and compactions, bytes/sec, if any
pub write_rate_limit: Option<u64>,
}
impl CompactionProfile {
/// Default profile suitable for most storage
pub fn default() -> CompactionProfile {
CompactionProfile {
initial_file_size: 32 * 1024 * 1024,
file_size_multiplier: 2,
write_rate_limit: None,
}
}
/// Slow hdd compaction profile
pub fn hdd() -> CompactionProfile {
CompactionProfile {
initial_file_size: 192 * 1024 * 1024,
file_size_multiplier: 1,
write_rate_limit: Some(8 * 1024 * 1024),
}
}
}
/// Database configuration
pub struct DatabaseConfig {
/// Optional prefix size in bytes. Allows lookup by partial key.
@@ -61,6 +89,8 @@ pub struct DatabaseConfig {
pub max_open_files: i32,
/// Cache-size
pub cache_size: Option<usize>,
/// Compaction profile
pub compaction: CompactionProfile,
}
impl DatabaseConfig {
@@ -69,9 +99,22 @@ impl DatabaseConfig {
DatabaseConfig {
cache_size: Some(cache_size),
prefix_size: None,
max_open_files: -1,
max_open_files: 256,
compaction: CompactionProfile::default(),
}
}
/// Modify the compaction profile
pub fn compaction(mut self, profile: CompactionProfile) -> Self {
self.compaction = profile;
self
}
/// Modify the prefix of the db
pub fn prefix(mut self, prefix_size: usize) -> Self {
self.prefix_size = Some(prefix_size);
self
}
}
impl Default for DatabaseConfig {
@@ -79,7 +122,8 @@ impl Default for DatabaseConfig {
DatabaseConfig {
cache_size: None,
prefix_size: None,
max_open_files: -1,
max_open_files: 256,
compaction: CompactionProfile::default(),
}
}
}
@@ -111,13 +155,18 @@ impl Database {
/// Open database file. Creates if it does not exist.
pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> {
let mut opts = Options::new();
try!(opts.set_parsed_options("rate_limiter_bytes_per_sec=256000000"));
if let Some(rate_limit) = config.compaction.write_rate_limit {
try!(opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)));
}
opts.set_max_open_files(config.max_open_files);
opts.create_if_missing(true);
opts.set_use_fsync(false);
// compaction settings
opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
opts.set_target_file_size_base(DB_FILE_SIZE_BASE);
opts.set_target_file_size_multiplier(DB_FILE_SIZE_MULTIPLIER);
opts.set_target_file_size_base(config.compaction.initial_file_size);
opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
opts.set_max_background_flushes(DB_BACKGROUND_FLUSHES);
opts.set_max_background_compactions(DB_BACKGROUND_COMPACTIONS);
if let Some(cache_size) = config.cache_size {
@@ -150,7 +199,16 @@ impl Database {
opts.set_block_based_table_factory(&block_opts);
opts.set_prefix_extractor_fixed_size(size);
}
let db = try!(DB::open(&opts, path));
let db = match DB::open(&opts, path) {
Ok(db) => db,
Err(ref s) if s.starts_with("Corruption:") => {
info!("{}", s);
info!("Attempting DB repair for {}", path);
try!(DB::repair(&opts, path));
try!(DB::open(&opts, path))
},
Err(s) => { return Err(s); }
};
Ok(Database { db: db })
}
@@ -244,10 +302,10 @@ mod tests {
let path = RandomTempPath::create_dir();
let smoke = Database::open_default(path.as_path().to_str().unwrap()).unwrap();
assert!(smoke.is_empty());
test_db(&DatabaseConfig { prefix_size: None, max_open_files: 256, cache_size: None, });
test_db(&DatabaseConfig { prefix_size: Some(1), max_open_files: 256, cache_size: None, });
test_db(&DatabaseConfig { prefix_size: Some(8), max_open_files: 256, cache_size: None, });
test_db(&DatabaseConfig { prefix_size: Some(32), max_open_files: 256, cache_size: None, });
test_db(&DatabaseConfig::default());
test_db(&DatabaseConfig::default().prefix(12));
test_db(&DatabaseConfig::default().prefix(22));
test_db(&DatabaseConfig::default().prefix(8));
}
}

View File

@@ -117,6 +117,7 @@ extern crate libc;
extern crate target_info;
extern crate bigint;
extern crate chrono;
extern crate ansi_term;
pub mod standard;
#[macro_use]

View File

@@ -20,7 +20,21 @@ use std::env;
use rlog::{LogLevelFilter};
use env_logger::LogBuilder;
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::atomic::{Ordering, AtomicBool};
use arrayvec::ArrayVec;
pub use ansi_term::{Colour, Style};
lazy_static! {
static ref USE_COLOR: AtomicBool = AtomicBool::new(false);
}
/// Paint, using colour if desired.
pub fn paint(c: Style, t: String) -> String {
match USE_COLOR.load(Ordering::Relaxed) {
true => format!("{}", c.paint(t)),
false => t,
}
}
lazy_static! {
static ref LOG_DUMMY: bool = {
@@ -57,7 +71,8 @@ impl RotatingLogger {
/// Creates new `RotatingLogger` with given levels.
/// It does not enforce levels - it's just read only.
pub fn new(levels: String) -> Self {
pub fn new(levels: String, enable_color: bool) -> Self {
USE_COLOR.store(enable_color, Ordering::Relaxed);
RotatingLogger {
levels: levels,
logs: RwLock::new(ArrayVec::<[_; LOG_SIZE]>::new()),
@@ -86,7 +101,7 @@ mod test {
use super::RotatingLogger;
fn logger() -> RotatingLogger {
RotatingLogger::new("test".to_owned())
RotatingLogger::new("test".to_owned(), false)
}
#[test]

View File

@@ -28,7 +28,7 @@ use crypto::*;
use rlp::*;
use network::node_table::*;
use network::error::NetworkError;
use io::StreamToken;
use io::{StreamToken, IoContext};
use network::PROTOCOL_VERSION;
@@ -283,7 +283,7 @@ impl Discovery {
ret
}
pub fn writable(&mut self) {
pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
while !self.send_queue.is_empty() {
let data = self.send_queue.pop_front().unwrap();
match self.udp_socket.send_to(&data.payload, &data.address) {
@@ -302,15 +302,17 @@ impl Discovery {
}
}
}
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}
fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
self.send_queue.push_back(Datagramm { payload: payload, address: address });
}
pub fn readable(&mut self) -> Option<TableUpdates> {
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Option<TableUpdates> where Message: Send + Sync + Clone {
let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() };
match self.udp_socket.recv_from(&mut buf) {
let writable = !self.send_queue.is_empty();
let res = match self.udp_socket.recv_from(&mut buf) {
Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
debug!("Error processing UDP packet: {:?}", e);
None
@@ -320,7 +322,12 @@ impl Discovery {
debug!("Error reading UPD socket: {:?}", e);
None
}
};
let new_writable = !self.send_queue.is_empty();
if writable != new_writable {
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}
res
}
fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {

View File

@@ -32,6 +32,8 @@ use misc::version;
use crypto::*;
use sha3::Hashable;
use rlp::*;
use log::Colour::White;
use log::paint;
use network::session::{Session, SessionData};
use error::*;
use io::*;
@@ -236,8 +238,8 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
}
/// Send an IO message
pub fn message(&self, msg: Message) {
self.io.message(NetworkIoMessage::User(msg));
pub fn message(&self, msg: Message) -> Result<(), UtilError> {
self.io.message(NetworkIoMessage::User(msg))
}
/// Get an IoChannel.
@@ -248,12 +250,14 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
self.io.message(NetworkIoMessage::DisablePeer(peer));
self.io.message(NetworkIoMessage::DisablePeer(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
/// Disconnect peer. Reconnect can be attempted later.
pub fn disconnect_peer(&self, peer: PeerId) {
self.io.message(NetworkIoMessage::Disconnect(peer));
self.io.message(NetworkIoMessage::Disconnect(peer))
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
}
/// Check if the session is still active.
@@ -267,7 +271,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
token: token,
delay: ms,
protocol: self.protocol,
});
}).unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
Ok(())
}
@@ -341,6 +345,7 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
reserved_nodes: RwLock<HashSet<NodeId>>,
num_sessions: AtomicUsize,
stopping: AtomicBool,
first_time: AtomicBool,
}
impl<Message> Host<Message> where Message: Send + Sync + Clone {
@@ -396,6 +401,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
reserved_nodes: RwLock::new(HashSet::new()),
num_sessions: AtomicUsize::new(0),
stopping: AtomicBool::new(false),
first_time: AtomicBool::new(true),
};
for n in boot_nodes {
@@ -531,7 +537,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
};
self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone());
info!("Public node URL: {}", self.external_url().unwrap());
if self.first_time.load(AtomicOrdering::Relaxed) {
info!("Public node URL: {}", paint(White.bold(), format!("{}", self.external_url().unwrap())));
self.first_time.store(false, AtomicOrdering::Relaxed);
}
// Initialize discovery.
let discovery = {
@@ -714,7 +724,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
debug!(target: "network", "Can't accept connection: {:?}", e);
}
}
io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener");
}
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
@@ -910,11 +919,10 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => {
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable() };
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable(io) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
},
TCP_ACCEPT => self.accept(io),
_ => panic!("Received unknown readable token"),
@@ -928,8 +936,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => {
self.discovery.lock().unwrap().as_mut().unwrap().writable();
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
self.discovery.lock().unwrap().as_mut().unwrap().writable(io);
}
_ => panic!("Received unknown writable token"),
}
@@ -946,14 +953,14 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
self.discovery.lock().unwrap().as_mut().unwrap().refresh();
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
DISCOVERY_ROUND => {
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
NODE_TABLE => {
trace!(target: "network", "Refreshing node table");
@@ -1004,7 +1011,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
handler_token
};
self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
io.register_timer(handler_token, *delay).expect("Error registering timer");
io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
},
NetworkIoMessage::Disconnect(ref peer) => {
let session = { self.sessions.read().unwrap().get(*peer).cloned() };

View File

@@ -88,7 +88,7 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
/// Timer function called after a timeout created with `NetworkContext::timeout`.
fn timeout(&self, io: &NetworkContext<TestProtocolMessage>, timer: TimerToken) {
io.message(TestProtocolMessage { payload: 22 });
io.message(TestProtocolMessage { payload: 22 }).unwrap();
assert_eq!(timer, 0);
self.got_timeout.store(true, AtomicOrdering::Relaxed);
}

View File

@@ -258,14 +258,7 @@ impl <T>FromBytes for T where T: FixedHash {
Ordering::Equal => ()
};
unsafe {
use std::{mem, ptr};
let mut res: T = mem::uninitialized();
ptr::copy(bytes.as_ptr(), res.as_slice_mut().as_mut_ptr(), T::len());
Ok(res)
}
Ok(T::from_slice(bytes))
}
}

View File

@@ -27,6 +27,14 @@ pub struct UsingQueue<T> where T: Clone {
max_size: usize,
}
/// Take an item or just clone it?
pub enum GetAction {
/// Remove the item, faster but you can't get it back.
Take,
/// Clone the item, slower but you can get it again.
Clone,
}
impl<T> UsingQueue<T> where T: Clone {
/// Create a new struct with a maximum size of `max_size`.
pub fn new(max_size: usize) -> UsingQueue<T> {
@@ -74,6 +82,20 @@ impl<T> UsingQueue<T> where T: Clone {
self.in_use.iter().position(|r| predicate(r)).map(|i| self.in_use.remove(i))
}
/// Returns `Some` item which is the first that `f` returns `true` with a reference to it
/// as a parameter or `None` if no such item exists in the queue.
pub fn clone_used_if<P>(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool {
self.in_use.iter().find(|r| predicate(r)).cloned()
}
/// Fork-function for `take_used_if` and `clone_used_if`.
pub fn get_used_if<P>(&mut self, action: GetAction, predicate: P) -> Option<T> where P: Fn(&T) -> bool {
match action {
GetAction::Take => self.take_used_if(predicate),
GetAction::Clone => self.clone_used_if(predicate),
}
}
/// Returns the most recently pushed block if `f` returns `true` with a reference to it as
/// a parameter, otherwise `None`.
/// Will not destroy a block if a reference to it has previously been returned by `use_last_ref`,
@@ -94,18 +116,66 @@ impl<T> UsingQueue<T> where T: Clone {
}
#[test]
fn should_find_when_pushed() {
fn should_not_find_when_pushed() {
let mut q = UsingQueue::new(2);
q.push(1);
assert!(q.take_used_if(|i| i == &1).is_none());
}
#[test]
fn should_not_find_when_pushed_with_clone() {
let mut q = UsingQueue::new(2);
q.push(1);
assert!(q.clone_used_if(|i| i == &1).is_none());
}
#[test]
fn should_find_when_pushed_and_used() {
let mut q = UsingQueue::new(2);
q.push(1);
q.use_last_ref();
assert!(q.take_used_if(|i| i == &1).is_some());
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
}
#[test]
fn should_have_same_semantics_for_get_take_clone() {
let mut q = UsingQueue::new(2);
q.push(1);
assert!(q.get_used_if(GetAction::Clone, |i| i == &1).is_none());
assert!(q.get_used_if(GetAction::Take, |i| i == &1).is_none());
q.use_last_ref();
assert!(q.get_used_if(GetAction::Clone, |i| i == &1).unwrap() == 1);
assert!(q.get_used_if(GetAction::Clone, |i| i == &1).unwrap() == 1);
assert!(q.get_used_if(GetAction::Take, |i| i == &1).unwrap() == 1);
assert!(q.get_used_if(GetAction::Clone, |i| i == &1).is_none());
assert!(q.get_used_if(GetAction::Take, |i| i == &1).is_none());
}
#[test]
fn should_find_when_pushed_and_used_with_clone() {
let mut q = UsingQueue::new(2);
q.push(1);
q.use_last_ref();
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
}
#[test]
fn should_not_find_again_when_pushed_and_taken() {
let mut q = UsingQueue::new(2);
q.push(1);
q.use_last_ref();
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
assert!(q.clone_used_if(|i| i == &1).is_none());
}
#[test]
fn should_find_again_when_pushed_and_cloned() {
let mut q = UsingQueue::new(2);
q.push(1);
q.use_last_ref();
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
}
#[test]