Merge branch 'master' into journaldb_commit

This commit is contained in:
Robert Habermeier
2016-10-13 12:59:32 +02:00
198 changed files with 6496 additions and 2398 deletions

View File

@@ -34,6 +34,7 @@ using_queue = { path = "using_queue" }
table = { path = "table" }
ansi_term = "0.7"
tiny-keccak= "1.0"
ethcore-bloom-journal = { path = "bloom" }
[features]
default = []

View File

@@ -4,7 +4,7 @@ homepage = "http://ethcore.io"
repository = "https://github.com/ethcore/parity"
license = "GPL-3.0"
name = "ethcore-bigint"
version = "0.1.0"
version = "0.1.1"
authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs"

View File

@@ -64,11 +64,11 @@ pub fn clean_0x(s: &str) -> &str {
macro_rules! impl_hash {
($from: ident, $size: expr) => {
#[derive(Eq)]
#[repr(C)]
/// Unformatted binary data of fixed length.
pub struct $from (pub [u8; $size]);
impl From<[u8; $size]> for $from {
fn from(bytes: [u8; $size]) -> Self {
$from(bytes)
@@ -210,6 +210,8 @@ macro_rules! impl_hash {
}
}
impl Eq for $from {}
impl PartialEq for $from {
fn eq(&self, other: &Self) -> bool {
for i in 0..$size {

9
util/bloom/Cargo.toml Normal file
View File

@@ -0,0 +1,9 @@
[project]
name = "ethcore-bloom-journal"
version = "0.1.0"
authors = ["Ethcore<admin@ethcore.io>"]
description = "Journaling bloom filter"
license = "GPL3"
[lib]
path = "src/lib.rs"

247
util/bloom/src/lib.rs Normal file
View File

@@ -0,0 +1,247 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::cmp;
use std::mem;
use std::f64;
use std::hash::{Hash, Hasher, SipHasher};
use std::collections::HashSet;
/// BitVec structure with journalling
/// Every time any of the blocks is getting set it's index is tracked
/// and can be then drained by `drain` method
struct BitVecJournal {
elems: Vec<u64>,
journal: HashSet<usize>,
}
impl BitVecJournal {
pub fn new(size: usize) -> BitVecJournal {
let extra = if size % 8 > 0 { 1 } else { 0 };
BitVecJournal {
elems: vec![0u64; size / 8 + extra],
journal: HashSet::new(),
}
}
pub fn from_parts(parts: &[u64]) -> BitVecJournal {
BitVecJournal {
elems: parts.to_vec(),
journal: HashSet::new(),
}
}
pub fn set(&mut self, index: usize) {
let e_index = index / 64;
let bit_index = index % 64;
let val = self.elems.get_mut(e_index).unwrap();
*val |= 1u64 << bit_index;
self.journal.insert(e_index);
}
pub fn get(&self, index: usize) -> bool {
let e_index = index / 64;
let bit_index = index % 64;
self.elems[e_index] & (1 << bit_index) != 0
}
pub fn drain(&mut self) -> Vec<(usize, u64)> {
let journal = mem::replace(&mut self.journal, HashSet::new()).into_iter();
journal.map(|idx| (idx, self.elems[idx])).collect::<Vec<(usize, u64)>>()
}
pub fn saturation(&self) -> f64 {
self.elems.iter().fold(0u64, |acc, e| acc + e.count_ones() as u64) as f64 / (self.elems.len() * 64) as f64
}
}
/// Bloom filter structure
pub struct Bloom {
bitmap: BitVecJournal,
bitmap_bits: u64,
k_num: u32,
sips: [SipHasher; 2],
}
impl Bloom {
/// Create a new bloom filter structure.
/// bitmap_size is the size in bytes (not bits) that will be allocated in memory
/// items_count is an estimation of the maximum number of items to store.
pub fn new(bitmap_size: usize, items_count: usize) -> Bloom {
assert!(bitmap_size > 0 && items_count > 0);
let bitmap_bits = (bitmap_size as u64) * 8u64;
let k_num = Bloom::optimal_k_num(bitmap_bits, items_count);
let bitmap = BitVecJournal::new(bitmap_bits as usize);
let sips = [Bloom::sip_new(), Bloom::sip_new()];
Bloom {
bitmap: bitmap,
bitmap_bits: bitmap_bits,
k_num: k_num,
sips: sips,
}
}
/// Initializes bloom filter from saved state
pub fn from_parts(parts: &[u64], k_num: u32) -> Bloom {
let bitmap_size = parts.len() * 8;
let bitmap_bits = (bitmap_size as u64) * 8u64;
let bitmap = BitVecJournal::from_parts(parts);
let sips = [Bloom::sip_new(), Bloom::sip_new()];
Bloom {
bitmap: bitmap,
bitmap_bits: bitmap_bits,
k_num: k_num,
sips: sips,
}
}
/// Create a new bloom filter structure.
/// items_count is an estimation of the maximum number of items to store.
/// fp_p is the wanted rate of false positives, in ]0.0, 1.0[
pub fn new_for_fp_rate(items_count: usize, fp_p: f64) -> Bloom {
let bitmap_size = Bloom::compute_bitmap_size(items_count, fp_p);
Bloom::new(bitmap_size, items_count)
}
/// Compute a recommended bitmap size for items_count items
/// and a fp_p rate of false positives.
/// fp_p obviously has to be within the ]0.0, 1.0[ range.
pub fn compute_bitmap_size(items_count: usize, fp_p: f64) -> usize {
assert!(items_count > 0);
assert!(fp_p > 0.0 && fp_p < 1.0);
let log2 = f64::consts::LN_2;
let log2_2 = log2 * log2;
((items_count as f64) * f64::ln(fp_p) / (-8.0 * log2_2)).ceil() as usize
}
/// Records the presence of an item.
pub fn set<T>(&mut self, item: T)
where T: Hash
{
let mut hashes = [0u64, 0u64];
for k_i in 0..self.k_num {
let bit_offset = (self.bloom_hash(&mut hashes, &item, k_i) % self.bitmap_bits) as usize;
self.bitmap.set(bit_offset);
}
}
/// Check if an item is present in the set.
/// There can be false positives, but no false negatives.
pub fn check<T>(&self, item: T) -> bool
where T: Hash
{
let mut hashes = [0u64, 0u64];
for k_i in 0..self.k_num {
let bit_offset = (self.bloom_hash(&mut hashes, &item, k_i) % self.bitmap_bits) as usize;
if !self.bitmap.get(bit_offset) {
return false;
}
}
true
}
/// Return the number of bits in the filter
pub fn number_of_bits(&self) -> u64 {
self.bitmap_bits
}
/// Return the number of hash functions used for `check` and `set`
pub fn number_of_hash_functions(&self) -> u32 {
self.k_num
}
fn optimal_k_num(bitmap_bits: u64, items_count: usize) -> u32 {
let m = bitmap_bits as f64;
let n = items_count as f64;
let k_num = (m / n * f64::ln(2.0f64)).ceil() as u32;
cmp::max(k_num, 1)
}
fn bloom_hash<T>(&self, hashes: &mut [u64; 2], item: &T, k_i: u32) -> u64
where T: Hash
{
if k_i < 2 {
let sip = &mut self.sips[k_i as usize].clone();
item.hash(sip);
let hash = sip.finish();
hashes[k_i as usize] = hash;
hash
} else {
hashes[0].wrapping_add((k_i as u64).wrapping_mul(hashes[1]) % 0xffffffffffffffc5)
}
}
fn sip_new() -> SipHasher {
SipHasher::new()
}
/// Drains the bloom journal returning the updated bloom part
pub fn drain_journal(&mut self) -> BloomJournal {
BloomJournal {
entries: self.bitmap.drain(),
hash_functions: self.k_num,
}
}
/// Returns the ratio of set bits in the bloom filter to the total bits
pub fn saturation(&self) -> f64 {
self.bitmap.saturation()
}
}
/// Bloom journal
/// Returns the tuple of (bloom part index, bloom part value) where each one is representing
/// an index of bloom parts that was updated since the last drain
pub struct BloomJournal {
pub hash_functions: u32,
pub entries: Vec<(usize, u64)>,
}
#[cfg(test)]
mod tests {
use super::Bloom;
#[test]
fn get_set() {
let mut bloom = Bloom::new(10, 80);
let key = vec![115u8, 99];
assert!(!bloom.check(&key));
bloom.set(&key);
assert!(bloom.check(&key));
}
#[test]
fn journalling() {
let initial = vec![0u64; 8];
let mut bloom = Bloom::from_parts(&initial, 3);
bloom.set(&vec![5u8, 4]);
let drain = bloom.drain_journal();
assert_eq!(2, drain.entries.len())
}
#[test]
fn saturation() {
let initial = vec![0u64; 8];
let mut bloom = Bloom::from_parts(&initial, 3);
bloom.set(&vec![5u8, 4]);
let full = bloom.saturation();
// 2/8/64 = 0.00390625
assert!(full >= 0.0039f64 && full <= 0.004f64);
}
}

18
util/fetch/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
description = "HTTP/HTTPS fetching library"
homepage = "http://ethcore.io"
license = "GPL-3.0"
name = "fetch"
version = "0.1.0"
authors = ["Ethcore <admin@ethcore.io>"]
[dependencies]
log = "0.3"
rand = "0.3"
hyper = { default-features = false, git = "https://github.com/ethcore/hyper" }
https-fetch = { path = "../https-fetch" }
clippy = { version = "0.0.90", optional = true}
[features]
default = []
dev = ["clippy"]

146
util/fetch/src/client.rs Normal file
View File

@@ -0,0 +1,146 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Fetching
use std::{env, io};
use std::sync::{mpsc, Arc};
use std::sync::atomic::AtomicBool;
use std::path::PathBuf;
use hyper;
use https_fetch as https;
use fetch_file::{FetchHandler, Error as HttpFetchError};
pub type FetchResult = Result<PathBuf, FetchError>;
#[derive(Debug)]
pub enum FetchError {
InvalidUrl,
Http(HttpFetchError),
Https(https::FetchError),
Io(io::Error),
Other(String),
}
impl From<HttpFetchError> for FetchError {
fn from(e: HttpFetchError) -> Self {
FetchError::Http(e)
}
}
impl From<io::Error> for FetchError {
fn from(e: io::Error) -> Self {
FetchError::Io(e)
}
}
pub trait Fetch: Default + Send {
/// Fetch URL and get the result in callback.
fn request_async(&mut self, url: &str, abort: Arc<AtomicBool>, on_done: Box<Fn(FetchResult) + Send>) -> Result<(), FetchError>;
/// Fetch URL and get a result Receiver. You will be notified when receiver is ready by `on_done` callback.
fn request(&mut self, url: &str, abort: Arc<AtomicBool>, on_done: Box<Fn() + Send>) -> Result<mpsc::Receiver<FetchResult>, FetchError> {
let (tx, rx) = mpsc::channel();
try!(self.request_async(url, abort, Box::new(move |result| {
let res = tx.send(result);
if let Err(_) = res {
warn!("Fetch finished, but no one was listening");
}
on_done();
})));
Ok(rx)
}
/// Closes this client
fn close(self) {}
/// Returns a random filename
fn random_filename() -> String {
use ::rand::Rng;
let mut rng = ::rand::OsRng::new().unwrap();
rng.gen_ascii_chars().take(12).collect()
}
}
pub struct Client {
http_client: hyper::Client<FetchHandler>,
https_client: https::Client,
limit: Option<usize>,
}
impl Default for Client {
fn default() -> Self {
// Max 15MB will be downloaded.
Client::with_limit(Some(15*1024*1024))
}
}
impl Client {
fn with_limit(limit: Option<usize>) -> Self {
Client {
http_client: hyper::Client::new().expect("Unable to initialize http client."),
https_client: https::Client::with_limit(limit).expect("Unable to initialize https client."),
limit: limit,
}
}
fn convert_url(url: hyper::Url) -> Result<https::Url, FetchError> {
let host = format!("{}", try!(url.host().ok_or(FetchError::InvalidUrl)));
let port = try!(url.port_or_known_default().ok_or(FetchError::InvalidUrl));
https::Url::new(&host, port, url.path()).map_err(|_| FetchError::InvalidUrl)
}
fn temp_path() -> PathBuf {
let mut dir = env::temp_dir();
dir.push(Self::random_filename());
dir
}
}
impl Fetch for Client {
fn close(self) {
self.http_client.close();
self.https_client.close();
}
fn request_async(&mut self, url: &str, abort: Arc<AtomicBool>, on_done: Box<Fn(FetchResult) + Send>) -> Result<(), FetchError> {
let is_https = url.starts_with("https://");
let url = try!(url.parse().map_err(|_| FetchError::InvalidUrl));
let temp_path = Self::temp_path();
trace!(target: "fetch", "Fetching from: {:?}", url);
if is_https {
let url = try!(Self::convert_url(url));
try!(self.https_client.fetch_to_file(
url,
temp_path.clone(),
abort,
move |result| on_done(result.map(|_| temp_path).map_err(FetchError::Https)),
).map_err(|e| FetchError::Other(format!("{:?}", e))));
} else {
try!(self.http_client.request(
url,
FetchHandler::new(temp_path, abort, Box::new(move |result| on_done(result)), self.limit.map(|v| v as u64).clone()),
).map_err(|e| FetchError::Other(format!("{:?}", e))));
}
Ok(())
}
}

View File

@@ -0,0 +1,176 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Hyper Client Handler to Fetch File
use std::{io, fs, fmt};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use hyper::status::StatusCode;
use hyper::client::{Request, Response, DefaultTransport as HttpStream};
use hyper::header::Connection;
use hyper::{self, Decoder, Encoder, Next};
use super::FetchError;
#[derive(Debug)]
pub enum Error {
Aborted,
NotStarted,
SizeLimit,
UnexpectedStatus(StatusCode),
IoError(io::Error),
HyperError(hyper::Error),
}
pub type FetchResult = Result<PathBuf, FetchError>;
pub type OnDone = Box<Fn(FetchResult) + Send>;
pub struct FetchHandler {
path: PathBuf,
abort: Arc<AtomicBool>,
file: Option<fs::File>,
result: Option<FetchResult>,
on_done: Option<OnDone>,
size_limit: Option<u64>,
}
impl fmt::Debug for FetchHandler {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Fetch {{ path: {:?}, file: {:?}, result: {:?} }}", self.path, self.file, self.result)
}
}
impl Drop for FetchHandler {
fn drop(&mut self) {
let res = self.result.take().unwrap_or(Err(Error::NotStarted.into()));
// Remove file if there was an error
if res.is_err() || self.is_aborted() {
if let Some(file) = self.file.take() {
drop(file);
// Remove file
let _ = fs::remove_file(&self.path);
}
}
// send result
if let Some(f) = self.on_done.take() {
f(res);
}
}
}
impl FetchHandler {
pub fn new(path: PathBuf, abort: Arc<AtomicBool>, on_done: OnDone, size_limit: Option<u64>) -> Self {
FetchHandler {
path: path,
abort: abort,
file: None,
result: None,
on_done: Some(on_done),
size_limit: size_limit,
}
}
fn is_aborted(&self) -> bool {
self.abort.load(Ordering::SeqCst)
}
fn mark_aborted(&mut self) -> Next {
self.result = Some(Err(Error::Aborted.into()));
Next::end()
}
}
impl hyper::client::Handler<HttpStream> for FetchHandler {
fn on_request(&mut self, req: &mut Request) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
req.headers_mut().set(Connection::close());
read()
}
fn on_request_writable(&mut self, _encoder: &mut Encoder<HttpStream>) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
read()
}
fn on_response(&mut self, res: Response) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
if *res.status() != StatusCode::Ok {
self.result = Some(Err(Error::UnexpectedStatus(*res.status()).into()));
return Next::end();
}
// Open file to write
match fs::File::create(&self.path) {
Ok(file) => {
self.file = Some(file);
self.result = Some(Ok(self.path.clone()));
read()
},
Err(err) => {
self.result = Some(Err(Error::IoError(err).into()));
Next::end()
},
}
}
fn on_response_readable(&mut self, decoder: &mut Decoder<HttpStream>) -> Next {
if self.is_aborted() {
return self.mark_aborted();
}
match io::copy(decoder, self.file.as_mut().expect("File is there because on_response has created it.")) {
Ok(0) => Next::end(),
Ok(bytes_read) => match self.size_limit {
None => read(),
// Check limit
Some(limit) if limit > bytes_read => {
self.size_limit = Some(limit - bytes_read);
read()
},
// Size limit reached
_ => {
self.result = Some(Err(Error::SizeLimit.into()));
Next::end()
},
},
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => Next::read(),
_ => {
self.result = Some(Err(Error::IoError(e).into()));
Next::end()
}
}
}
}
fn on_error(&mut self, err: hyper::Error) -> Next {
self.result = Some(Err(Error::HyperError(err).into()));
Next::remove()
}
}
fn read() -> Next {
Next::read().timeout(Duration::from_secs(15))
}

29
util/fetch/src/lib.rs Normal file
View File

@@ -0,0 +1,29 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! A service to fetch any HTTP / HTTPS content.
#[macro_use]
extern crate log;
extern crate hyper;
extern crate https_fetch;
extern crate rand;
pub mod client;
pub mod fetch_file;
pub use self::client::{Client, Fetch, FetchError, FetchResult};

View File

@@ -78,6 +78,10 @@ impl Drop for Client {
impl Client {
pub fn new() -> Result<Self, FetchError> {
Self::with_limit(None)
}
pub fn with_limit(size_limit: Option<usize>) -> Result<Self, FetchError> {
let mut event_loop = try!(mio::EventLoop::new());
let channel = event_loop.channel();
@@ -85,6 +89,7 @@ impl Client {
let mut client = ClientLoop {
next_token: 0,
sessions: HashMap::new(),
size_limit: size_limit,
};
event_loop.run(&mut client).unwrap();
});
@@ -128,6 +133,7 @@ impl Client {
pub struct ClientLoop {
next_token: usize,
sessions: HashMap<usize, TlsClient>,
size_limit: Option<usize>,
}
impl mio::Handler for ClientLoop {
@@ -154,7 +160,7 @@ impl mio::Handler for ClientLoop {
let token = self.next_token;
self.next_token += 1;
if let Ok(mut tlsclient) = TlsClient::new(mio::Token(token), &url, writer, abort, callback) {
if let Ok(mut tlsclient) = TlsClient::new(mio::Token(token), &url, writer, abort, callback, self.size_limit.clone()) {
let httpreq = format!(
"GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\nAccept-Encoding: identity\r\n\r\n",
url.path(),

View File

@@ -35,18 +35,20 @@ pub struct HttpProcessor {
status: Option<String>,
headers: Vec<String>,
body_writer: io::BufWriter<Box<io::Write>>,
size_limit: Option<usize>,
}
const BREAK_LEN: usize = 2;
impl HttpProcessor {
pub fn new(body_writer: Box<io::Write>) -> Self {
pub fn new(body_writer: Box<io::Write>, size_limit: Option<usize>) -> Self {
HttpProcessor {
state: State::WaitingForStatus,
buffer: Cursor::new(Vec::new()),
status: None,
headers: Vec::new(),
body_writer: io::BufWriter::new(body_writer)
body_writer: io::BufWriter::new(body_writer),
size_limit: size_limit,
}
}
@@ -140,6 +142,15 @@ impl HttpProcessor {
},
State::WritingBody => {
let len = self.buffer.get_ref().len();
match self.size_limit {
None => {},
Some(limit) if limit > len => {},
_ => {
warn!("Finishing file fetching because limit was reached.");
self.set_state(State::Finished);
continue;
}
}
try!(self.body_writer.write_all(self.buffer.get_ref()));
self.buffer_consume(len);
return Ok(());
@@ -167,6 +178,17 @@ impl HttpProcessor {
},
// Buffers the data until we have a full chunk
State::WritingChunk(left) if self.buffer.get_ref().len() >= left => {
match self.size_limit {
None => {},
Some(limit) if limit > left => {
self.size_limit = Some(limit - left);
},
_ => {
warn!("Finishing file fetching because limit was reached.");
self.set_state(State::Finished);
continue;
}
}
try!(self.body_writer.write_all(&self.buffer.get_ref()[0..left]));
self.buffer_consume(left + BREAK_LEN);
@@ -230,7 +252,7 @@ mod tests {
#[test]
fn should_be_able_to_process_status_line() {
// given
let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())));
let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())), None);
// when
let out =
@@ -249,7 +271,7 @@ mod tests {
#[test]
fn should_be_able_to_process_headers() {
// given
let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())));
let mut http = HttpProcessor::new(Box::new(Cursor::new(Vec::new())), None);
// when
let out =
@@ -274,7 +296,7 @@ mod tests {
fn should_be_able_to_consume_body() {
// given
let (writer, data) = Writer::new();
let mut http = HttpProcessor::new(Box::new(writer));
let mut http = HttpProcessor::new(Box::new(writer), None);
// when
let out =
@@ -301,7 +323,7 @@ mod tests {
fn should_correctly_handle_chunked_content() {
// given
let (writer, data) = Writer::new();
let mut http = HttpProcessor::new(Box::new(writer));
let mut http = HttpProcessor::new(Box::new(writer), None);
// when
let out =
@@ -331,4 +353,40 @@ mod tests {
assert_eq!(data.borrow().get_ref()[..], b"Parity in\r\n\r\nchunks."[..]);
assert_eq!(http.state(), State::Finished);
}
#[test]
fn should_stop_fetching_when_limit_is_reached() {
// given
let (writer, data) = Writer::new();
let mut http = HttpProcessor::new(Box::new(writer), Some(5));
// when
let out =
"\
HTTP/1.1 200 OK\r\n\
Host: 127.0.0.1:8080\r\n\
Transfer-Encoding: chunked\r\n\
Connection: close\r\n\
\r\n\
4\r\n\
Pari\r\n\
3\r\n\
ty \r\n\
D\r\n\
in\r\n\
\r\n\
chunks.\r\n\
0\r\n\
\r\n\
";
http.write_all(out.as_bytes()).unwrap();
http.flush().unwrap();
// then
assert_eq!(http.status().unwrap(), "HTTP/1.1 200 OK");
assert_eq!(http.headers().len(), 3);
assert_eq!(data.borrow().get_ref()[..], b"Pari"[..]);
assert_eq!(http.state(), State::Finished);
}
}

View File

@@ -87,6 +87,7 @@ impl TlsClient {
writer: Box<io::Write + Send>,
abort: Arc<AtomicBool>,
mut callback: Box<FnMut(FetchResult) + Send>,
size_limit: Option<usize>,
) -> Result<Self, FetchError> {
let res = TlsClient::make_config().and_then(|cfg| {
TcpStream::connect(url.address()).map(|sock| {
@@ -98,7 +99,7 @@ impl TlsClient {
Ok((cfg, sock)) => Ok(TlsClient {
abort: abort,
token: token,
writer: HttpProcessor::new(writer),
writer: HttpProcessor::new(writer, size_limit),
socket: sock,
closing: false,
error: None,

View File

@@ -68,6 +68,8 @@ mod panics;
use mio::{EventLoop, Token};
use std::fmt;
pub use worker::LOCAL_STACK_SIZE;
#[derive(Debug)]
/// IO Error
pub enum IoError {

View File

@@ -22,9 +22,19 @@ use crossbeam::sync::chase_lev;
use service::{HandlerId, IoChannel, IoContext};
use IoHandler;
use panics::*;
use std::cell::Cell;
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
const STACK_SIZE: usize = 16*1024*1024;
thread_local! {
/// Stack size
/// Should be modified if it is changed in Rust since it is no way
/// to know or get it
pub static LOCAL_STACK_SIZE: Cell<usize> = Cell::new(::std::env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024));
}
pub enum WorkType<Message> {
Readable,
Writable,
@@ -66,8 +76,9 @@ impl Worker {
deleting: deleting.clone(),
wait_mutex: wait_mutex.clone(),
};
worker.thread = Some(thread::Builder::new().name(format!("IO Worker #{}", index)).spawn(
worker.thread = Some(thread::Builder::new().stack_size(STACK_SIZE).name(format!("IO Worker #{}", index)).spawn(
move || {
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
panic_handler.catch_panic(move || {
Worker::work_loop(stealer, channel.clone(), wait, wait_mutex.clone(), deleting)
}).unwrap()

View File

@@ -191,6 +191,11 @@ impl Connection {
self.socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "Unknown".to_owned())
}
/// Get local peer address string
pub fn local_addr_str(&self) -> String {
self.socket.local_addr().map(|a| a.to_string()).unwrap_or_else(|_| "Unknown".to_owned())
}
/// Clone this connection. Clears the receiving buffer of the returned connection.
pub fn try_clone(&self) -> io::Result<Self> {
Ok(Connection {

View File

@@ -31,7 +31,7 @@ use util::hash::*;
use util::Hashable;
use util::version;
use rlp::*;
use session::{Session, SessionData};
use session::{Session, SessionInfo, SessionData};
use error::*;
use io::*;
use {NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION};
@@ -47,7 +47,24 @@ type Slab<T> = ::slab::Slab<T, usize>;
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 80;
const MAX_HANDSHAKES_PER_ROUND: usize = 32;
// Tokens
const TCP_ACCEPT: usize = SYS_TIMER + 1;
const IDLE: usize = SYS_TIMER + 2;
const DISCOVERY: usize = SYS_TIMER + 3;
const DISCOVERY_REFRESH: usize = SYS_TIMER + 4;
const DISCOVERY_ROUND: usize = SYS_TIMER + 5;
const NODE_TABLE: usize = SYS_TIMER + 6;
const FIRST_SESSION: usize = 0;
const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1;
const USER_TIMER: usize = LAST_SESSION + 256;
const SYS_TIMER: usize = LAST_SESSION + 1;
// Timeouts
const MAINTENANCE_TIMEOUT: u64 = 1000;
const DISCOVERY_REFRESH_TIMEOUT: u64 = 7200;
const DISCOVERY_ROUND_TIMEOUT: u64 = 300;
const NODE_TABLE_TIMEOUT: u64 = 300_000;
#[derive(Debug, PartialEq, Clone)]
/// Network service configuration
@@ -122,22 +139,10 @@ impl NetworkConfiguration {
}
}
// Tokens
const TCP_ACCEPT: usize = SYS_TIMER + 1;
const IDLE: usize = SYS_TIMER + 2;
const DISCOVERY: usize = SYS_TIMER + 3;
const DISCOVERY_REFRESH: usize = SYS_TIMER + 4;
const DISCOVERY_ROUND: usize = SYS_TIMER + 5;
const NODE_TABLE: usize = SYS_TIMER + 6;
const FIRST_SESSION: usize = 0;
const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1;
const USER_TIMER: usize = LAST_SESSION + 256;
const SYS_TIMER: usize = LAST_SESSION + 1;
/// Protocol handler level packet id
pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = &'static str;
pub type ProtocolId = [u8; 3];
/// Messages used to communitate with the event loop from other threads.
#[derive(Clone)]
@@ -185,7 +190,7 @@ pub struct CapabilityInfo {
impl Encodable for CapabilityInfo {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(2);
s.append(&self.protocol);
s.append(&&self.protocol[..]);
s.append(&self.version);
}
}
@@ -275,19 +280,23 @@ impl<'s> NetworkContext<'s> {
}
/// Returns peer identification string
pub fn peer_info(&self, peer: PeerId) -> String {
let session = self.resolve_session(peer);
if let Some(session) = session {
return session.lock().info.client_version.clone()
}
"unknown".to_owned()
pub fn peer_client_version(&self, peer: PeerId) -> String {
self.resolve_session(peer).map_or("unknown".to_owned(), |s| s.lock().info.client_version.clone())
}
/// Returns information on p2p session
pub fn session_info(&self, peer: PeerId) -> Option<SessionInfo> {
self.resolve_session(peer).map(|s| s.lock().info.clone())
}
/// Returns max version for a given protocol.
pub fn protocol_version(&self, peer: PeerId, protocol: &str) -> Option<u8> {
pub fn protocol_version(&self, peer: PeerId, protocol: ProtocolId) -> Option<u8> {
let session = self.resolve_session(peer);
session.and_then(|s| s.lock().capability_version(protocol))
}
/// Returns this object's subprotocol name.
pub fn subprotocol_name(&self) -> ProtocolId { self.protocol }
}
/// Shared host information
@@ -561,11 +570,11 @@ impl Host {
discovery.init_node_list(self.nodes.read().unordered_entries());
discovery.add_node_list(self.nodes.read().unordered_entries());
*self.discovery.lock() = Some(discovery);
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
try!(io.register_stream(DISCOVERY));
try!(io.register_timer(DISCOVERY_REFRESH, DISCOVERY_REFRESH_TIMEOUT));
try!(io.register_timer(DISCOVERY_ROUND, DISCOVERY_ROUND_TIMEOUT));
}
try!(io.register_timer(NODE_TABLE, 300_000));
try!(io.register_timer(NODE_TABLE, NODE_TABLE_TIMEOUT));
try!(io.register_stream(TCP_ACCEPT));
Ok(())
}
@@ -588,7 +597,8 @@ impl Host {
}
fn handshake_count(&self) -> usize {
self.sessions.read().count() - self.session_count()
// session_count < total_count is possible because of the data race.
self.sessions.read().count().saturating_sub(self.session_count())
}
fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) {
@@ -801,8 +811,8 @@ impl Host {
}
}
for (p, _) in self.handlers.read().iter() {
if s.have_capability(p) {
ready_data.push(p);
if s.have_capability(*p) {
ready_data.push(*p);
}
}
},
@@ -811,7 +821,7 @@ impl Host {
protocol,
packet_id,
}) => {
match self.handlers.read().get(protocol) {
match self.handlers.read().get(&protocol) {
None => { warn!(target: "network", "No handler found for protocol: {:?}", protocol) },
Some(_) => packet_data.push((protocol, packet_id, data)),
}
@@ -826,13 +836,13 @@ impl Host {
}
let handlers = self.handlers.read();
for p in ready_data {
let h = handlers.get(p).unwrap().clone();
let h = handlers.get(&p).unwrap().clone();
self.stats.inc_sessions();
let reserved = self.reserved_nodes.read();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
}
for (p, packet_id, data) in packet_data {
let h = handlers.get(p).unwrap().clone();
let h = handlers.get(&p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
}
@@ -857,8 +867,8 @@ impl Host {
if s.is_ready() {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.read().iter() {
if s.have_capability(p) {
to_disconnect.push(p);
if s.have_capability(*p) {
to_disconnect.push(*p);
}
}
}
@@ -874,7 +884,7 @@ impl Host {
}
}
for p in to_disconnect {
let h = self.handlers.read().get(p).unwrap().clone();
let h = self.handlers.read().get(&p).unwrap().clone();
let reserved = self.reserved_nodes.read();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
@@ -909,6 +919,13 @@ impl Host {
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);
action(&context);
}
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F) -> T where F: Fn(&NetworkContext) -> T {
let reserved = { self.reserved_nodes.read() };
let context = NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved);
action(&context)
}
}
impl IoHandler<NetworkIoMessage> for Host {
@@ -978,9 +995,10 @@ impl IoHandler<NetworkIoMessage> for Host {
NODE_TABLE => {
trace!(target: "network", "Refreshing node table");
self.nodes.write().clear_useless();
self.nodes.write().save();
},
_ => match self.timers.read().get(&token).cloned() {
Some(timer) => match self.handlers.read().get(timer.protocol).cloned() {
Some(timer) => match self.handlers.read().get(&timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => {
let reserved = self.reserved_nodes.read();
@@ -1004,11 +1022,11 @@ impl IoHandler<NetworkIoMessage> for Host {
} => {
let h = handler.clone();
let reserved = self.reserved_nodes.read();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved));
self.handlers.write().insert(protocol, h);
h.initialize(&NetworkContext::new(io, *protocol, None, self.sessions.clone(), &reserved));
self.handlers.write().insert(*protocol, h);
let mut info = self.info.write();
for v in versions {
info.capabilities.push(CapabilityInfo { protocol: protocol, version: *v, packet_count:0 });
info.capabilities.push(CapabilityInfo { protocol: *protocol, version: *v, packet_count:0 });
}
},
NetworkIoMessage::AddTimer {
@@ -1023,7 +1041,7 @@ impl IoHandler<NetworkIoMessage> for Host {
*counter += 1;
handler_token
};
self.timers.write().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
self.timers.write().insert(handler_token, ProtocolTimer { protocol: *protocol, token: *token });
io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
},
NetworkIoMessage::Disconnect(ref peer) => {

View File

@@ -45,7 +45,7 @@
//!
//! fn main () {
//! let mut service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service");
//! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
//! service.register_protocol(Arc::new(MyHandler), *b"myp", &[1u8]);
//! service.start().expect("Error starting service");
//!
//! // Wait for quit condition
@@ -99,6 +99,7 @@ pub use host::NetworkIoMessage;
pub use error::NetworkError;
pub use host::NetworkConfiguration;
pub use stats::NetworkStats;
pub use session::SessionInfo;
use io::TimerToken;
pub use node_table::is_valid_node_url;

View File

@@ -266,7 +266,8 @@ impl NodeTable {
self.useless_nodes.clear();
}
fn save(&self) {
/// Save the nodes.json file.
pub fn save(&self) {
if let Some(ref path) = self.path {
let mut path_buf = PathBuf::from(path);
if let Err(e) = fs::create_dir_all(path_buf.as_path()) {
@@ -292,7 +293,7 @@ impl NodeTable {
}
};
if let Err(e) = file.write(&json.into_bytes()) {
warn!("Error writing node table file: {:?}", e);
warn!("Error writing node table file: {:?}", e);
}
}
}

View File

@@ -178,6 +178,13 @@ impl NetworkService {
host.with_context(protocol, &io, action);
};
}
/// Evaluates function in the network context
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, action: F) -> Option<T> where F: Fn(&NetworkContext) -> T {
let io = IoContext::new(self.io_service.channel(), 0);
let host = self.host.read();
host.as_ref().map(|ref host| host.with_context_eval(protocol, &io, action))
}
}
impl MayPanic for NetworkService {

View File

@@ -14,8 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{str, io};
use std::net::SocketAddr;
use std::io;
use std::sync::*;
use mio::*;
use mio::tcp::*;
@@ -63,7 +63,7 @@ pub enum SessionData {
/// Packet data
data: Vec<u8>,
/// Packet protocol ID
protocol: &'static str,
protocol: [u8; 3],
/// Zero based packet ID
packet_id: u8,
},
@@ -72,6 +72,7 @@ pub enum SessionData {
}
/// Shared session information
#[derive(Debug, Clone)]
pub struct SessionInfo {
/// Peer public key
pub id: Option<NodeId>,
@@ -79,33 +80,51 @@ pub struct SessionInfo {
pub client_version: String,
/// Peer RLPx protocol version
pub protocol_version: u32,
/// Session protocol capabilities
pub capabilities: Vec<SessionCapabilityInfo>,
/// Peer protocol capabilities
capabilities: Vec<SessionCapabilityInfo>,
pub peer_capabilities: Vec<PeerCapabilityInfo>,
/// Peer ping delay in milliseconds
pub ping_ms: Option<u64>,
/// True if this session was originated by us.
pub originated: bool,
/// Remote endpoint address of the session
pub remote_address: String,
/// Local endpoint address of the session
pub local_address: String,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerCapabilityInfo {
pub protocol: String,
pub protocol: ProtocolId,
pub version: u8,
}
impl Decodable for PeerCapabilityInfo {
fn decode<D>(decoder: &D) -> Result<Self, DecoderError> where D: Decoder {
let c = decoder.as_rlp();
let p: Vec<u8> = try!(c.val_at(0));
if p.len() != 3 {
return Err(DecoderError::Custom("Invalid subprotocol string length. Should be 3"));
}
let mut p2: ProtocolId = [0u8; 3];
p2.clone_from_slice(&p);
Ok(PeerCapabilityInfo {
protocol: try!(c.val_at(0)),
protocol: p2,
version: try!(c.val_at(1))
})
}
}
#[derive(Debug)]
struct SessionCapabilityInfo {
pub protocol: &'static str,
impl ToString for PeerCapabilityInfo {
fn to_string(&self) -> String {
format!("{}/{}", str::from_utf8(&self.protocol[..]).unwrap_or("???"), self.version)
}
}
#[derive(Debug, Clone)]
pub struct SessionCapabilityInfo {
pub protocol: [u8; 3],
pub version: u8,
pub packet_count: u8,
pub id_offset: u8,
@@ -128,6 +147,7 @@ impl Session {
where Message: Send + Clone {
let originated = id.is_some();
let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake");
let local_addr = handshake.connection.local_addr_str();
try!(handshake.start(io, host, originated));
Ok(Session {
state: State::Handshake(handshake),
@@ -137,8 +157,11 @@ impl Session {
client_version: String::new(),
protocol_version: 0,
capabilities: Vec::new(),
peer_capabilities: Vec::new(),
ping_ms: None,
originated: originated,
remote_address: "Handshake".to_owned(),
local_address: local_addr,
},
ping_time_ns: 0,
pong_time_ns: None,
@@ -149,6 +172,7 @@ impl Session {
fn complete_handshake<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Sync + Clone {
let connection = if let State::Handshake(ref mut h) = self.state {
self.info.id = Some(h.id.clone());
self.info.remote_address = h.connection.remote_addr_str();
try!(EncryptedConnection::new(h))
} else {
panic!("Unexpected state");
@@ -239,12 +263,12 @@ impl Session {
}
/// Checks if peer supports given capability
pub fn have_capability(&self, protocol: &str) -> bool {
pub fn have_capability(&self, protocol: [u8; 3]) -> bool {
self.info.capabilities.iter().any(|c| c.protocol == protocol)
}
/// Checks if peer supports given capability
pub fn capability_version(&self, protocol: &str) -> Option<u8> {
pub fn capability_version(&self, protocol: [u8; 3]) -> Option<u8> {
self.info.capabilities.iter().filter_map(|c| if c.protocol == protocol { Some(c.version) } else { None }).max()
}
@@ -270,10 +294,10 @@ impl Session {
}
/// Send a protocol packet to peer.
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), NetworkError>
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, protocol: [u8; 3], packet_id: u8, data: &[u8]) -> Result<(), NetworkError>
where Message: Send + Sync + Clone {
if self.info.capabilities.is_empty() || !self.had_hello {
debug!(target: "network", "Sending to unconfirmed session {}, protocol: {}, packet: {}", self.token(), protocol, packet_id);
debug!(target: "network", "Sending to unconfirmed session {}, protocol: {}, packet: {}", self.token(), str::from_utf8(&protocol[..]).unwrap_or("??"), packet_id);
return Err(From::from(NetworkError::BadProtocol));
}
if self.expired() {
@@ -425,8 +449,10 @@ impl Session {
i += 1;
}
trace!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
self.info.protocol_version = protocol;
self.info.client_version = client_version;
self.info.capabilities = caps;
self.info.peer_capabilities = peer_caps;
if self.info.capabilities.is_empty() {
trace!(target: "network", "No common capabilities with peer.");
return Err(From::from(self.disconnect(io, DisconnectReason::UselessPeer)));

View File

@@ -41,7 +41,7 @@ impl TestProtocol {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
let handler = Arc::new(TestProtocol::new(drop_session));
service.register_protocol(handler.clone(), "test", &[42u8, 43u8]).expect("Error registering test protocol handler");
service.register_protocol(handler.clone(), *b"tst", &[42u8, 43u8]).expect("Error registering test protocol handler");
handler
}
@@ -69,7 +69,7 @@ impl NetworkProtocolHandler for TestProtocol {
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
assert!(io.peer_info(*peer).contains("Parity"));
assert!(io.peer_client_version(*peer).contains("Parity"));
if self.drop_session {
io.disconnect_peer(*peer)
} else {
@@ -93,7 +93,7 @@ impl NetworkProtocolHandler for TestProtocol {
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local()).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[1u8]).unwrap();
}
#[test]

View File

@@ -143,12 +143,12 @@ impl CompactionProfile {
}
/// Database configuration
#[derive(Clone, Copy)]
#[derive(Clone)]
pub struct DatabaseConfig {
/// Max number of open files.
pub max_open_files: i32,
/// Cache-size
pub cache_size: Option<usize>,
/// Cache sizes (in MiB) for specific columns.
pub cache_sizes: HashMap<Option<u32>, usize>,
/// Compaction profile
pub compaction: CompactionProfile,
/// Set number of columns
@@ -159,17 +159,23 @@ pub struct DatabaseConfig {
impl DatabaseConfig {
/// Create new `DatabaseConfig` with default parameters and specified set of columns.
/// Note that cache sizes must be explicitly set.
pub fn with_columns(columns: Option<u32>) -> Self {
let mut config = Self::default();
config.columns = columns;
config
}
/// Set the column cache size in MiB.
pub fn set_cache(&mut self, col: Option<u32>, size: usize) {
self.cache_sizes.insert(col, size);
}
}
impl Default for DatabaseConfig {
fn default() -> DatabaseConfig {
DatabaseConfig {
cache_size: None,
cache_sizes: HashMap::new(),
max_open_files: 512,
compaction: CompactionProfile::default(),
columns: None,
@@ -213,6 +219,9 @@ impl Database {
/// Open database file. Creates if it does not exist.
pub fn open(config: &DatabaseConfig, path: &str) -> Result<Database, String> {
// default cache size for columns not specified.
const DEFAULT_CACHE: usize = 2;
let mut opts = Options::new();
if let Some(rate_limit) = config.compaction.write_rate_limit {
try!(opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)));
@@ -232,17 +241,22 @@ impl Database {
let mut cf_options = Vec::with_capacity(config.columns.unwrap_or(0) as usize);
for _ in 0 .. config.columns.unwrap_or(0) {
for col in 0 .. config.columns.unwrap_or(0) {
let mut opts = Options::new();
opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
opts.set_target_file_size_base(config.compaction.initial_file_size);
opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
if let Some(cache_size) = config.cache_size {
let col_opt = config.columns.map(|_| col);
{
let cache_size = config.cache_sizes.get(&col_opt).cloned().unwrap_or(DEFAULT_CACHE);
let mut block_opts = BlockBasedOptions::new();
// all goes to read cache
// all goes to read cache.
block_opts.set_cache(Cache::new(cache_size * 1024 * 1024));
opts.set_block_based_table_factory(&block_opts);
}
cf_options.push(opts);
}

View File

@@ -20,7 +20,9 @@ mod tests;
use std::collections::BTreeMap;
use std::fs;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use ::kvdb::{CompactionProfile, Database, DatabaseConfig, DBTransaction};
@@ -96,20 +98,39 @@ pub enum Error {
Custom(String),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Error::CannotAddMigration => write!(f, "Cannot add migration"),
Error::MigrationImpossible => write!(f, "Migration impossible"),
Error::Io(ref err) => write!(f, "{}", err),
Error::Custom(ref err) => write!(f, "{}", err),
}
}
}
impl From<::std::io::Error> for Error {
fn from(e: ::std::io::Error) -> Self {
Error::Io(e)
}
}
impl From<String> for Error {
fn from(e: String) -> Self {
Error::Custom(e)
}
}
/// A generalized migration from the given db to a destination db.
pub trait Migration: 'static {
/// Number of columns in the database before the migration.
fn pre_columns(&self) -> Option<u32> { self.columns() }
/// Number of columns in database after the migration.
fn columns(&self) -> Option<u32>;
/// Version of the database after the migration.
fn version(&self) -> u32;
/// Migrate a source to a destination.
fn migrate(&mut self, source: &Database, config: &Config, destination: &mut Database, col: Option<u32>) -> Result<(), Error>;
fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: Option<u32>) -> Result<(), Error>;
}
/// A simple migration over key-value pairs.
@@ -128,7 +149,7 @@ impl<T: SimpleMigration> Migration for T {
fn version(&self) -> u32 { SimpleMigration::version(self) }
fn migrate(&mut self, source: &Database, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
let mut batch = Batch::new(config, col);
for (key, value) in source.iter(col) {
@@ -195,6 +216,7 @@ impl Manager {
Some(last) => migration.version() > last.version(),
None => true,
};
match is_new {
true => Ok(self.migrations.push(Box::new(migration))),
false => Err(Error::CannotAddMigration),
@@ -205,12 +227,16 @@ impl Manager {
/// and producing a path where the final migration lives.
pub fn execute(&mut self, old_path: &Path, version: u32) -> Result<PathBuf, Error> {
let config = self.config.clone();
let columns = self.no_of_columns_at(version);
let migrations = self.migrations_from(version);
trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len());
if migrations.is_empty() { return Err(Error::MigrationImpossible) };
let columns = migrations.iter().nth(0).and_then(|m| m.pre_columns());
trace!(target: "migration", "Expecting database to contain {:?} columns", columns);
let mut db_config = DatabaseConfig {
max_open_files: 64,
cache_size: None,
cache_sizes: Default::default(),
compaction: config.compaction_profile,
columns: columns,
wal: true,
@@ -222,7 +248,7 @@ impl Manager {
// start with the old db.
let old_path_str = try!(old_path.to_str().ok_or(Error::MigrationImpossible));
let mut cur_db = try!(Database::open(&db_config, old_path_str).map_err(Error::Custom));
let mut cur_db = Arc::new(try!(Database::open(&db_config, old_path_str).map_err(Error::Custom)));
for migration in migrations {
// Change number of columns in new db
@@ -237,16 +263,16 @@ impl Manager {
// perform the migration from cur_db to new_db.
match current_columns {
// migrate only default column
None => try!(migration.migrate(&cur_db, &config, &mut new_db, None)),
None => try!(migration.migrate(cur_db.clone(), &config, &mut new_db, None)),
Some(v) => {
// Migrate all columns in previous DB
for col in 0..v {
try!(migration.migrate(&cur_db, &config, &mut new_db, Some(col)))
try!(migration.migrate(cur_db.clone(), &config, &mut new_db, Some(col)))
}
}
}
// next iteration, we will migrate from this db into the other temp.
cur_db = new_db;
cur_db = Arc::new(new_db);
temp_idx.swap();
// remove the other temporary migration database.
@@ -267,14 +293,6 @@ impl Manager {
fn migrations_from(&mut self, version: u32) -> Vec<&mut Box<Migration>> {
self.migrations.iter_mut().filter(|m| m.version() > version).collect()
}
fn no_of_columns_at(&self, version: u32) -> Option<u32> {
let migration = self.migrations.iter().find(|m| m.version() == version);
match migration {
Some(m) => m.columns(),
None => None
}
}
}
/// Prints a dot every `max` ticks

View File

@@ -19,7 +19,7 @@
//! are performed in temp sub-directories.
use common::*;
use migration::{Config, SimpleMigration, Manager};
use migration::{Batch, Config, Error, SimpleMigration, Migration, Manager};
use kvdb::Database;
use devtools::RandomTempPath;
@@ -62,11 +62,10 @@ impl SimpleMigration for Migration0 {
fn version(&self) -> u32 { 1 }
fn simple_migrate(&mut self, key: Vec<u8>, value: Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)> {
let mut key = key;
fn simple_migrate(&mut self, mut key: Vec<u8>, mut value: Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)> {
key.push(0x11);
let mut value = value;
value.push(0x22);
Some((key, value))
}
}
@@ -83,6 +82,31 @@ impl SimpleMigration for Migration1 {
}
}
struct AddsColumn;
impl Migration for AddsColumn {
fn pre_columns(&self) -> Option<u32> { None }
fn columns(&self) -> Option<u32> { Some(1) }
fn version(&self) -> u32 { 1 }
fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: Option<u32>) -> Result<(), Error> {
let mut batch = Batch::new(config, col);
for (key, value) in source.iter(col) {
try!(batch.insert(key.to_vec(), value.to_vec(), dest));
}
if col == Some(1) {
try!(batch.insert(vec![1, 2, 3], vec![4, 5, 6], dest));
}
batch.commit(dest)
}
}
#[test]
fn one_simple_migration() {
let dir = RandomTempPath::create_dir();
@@ -189,3 +213,16 @@ fn is_migration_needed() {
assert!(manager.is_needed(1));
assert!(!manager.is_needed(2));
}
#[test]
fn pre_columns() {
let mut manager = Manager::new(Config::default());
manager.add_migration(AddsColumn).unwrap();
let dir = RandomTempPath::create_dir();
let db_path = db_path(dir.as_path());
// this shouldn't fail to open the database even though it's one column
// short of the one before it.
manager.execute(&db_path, 0).unwrap();
}

View File

@@ -23,7 +23,7 @@ use target_info::Target;
include!(concat!(env!("OUT_DIR"), "/version.rs"));
include!(concat!(env!("OUT_DIR"), "/rustc_version.rs"));
#[derive(PartialEq,Eq,Clone,Copy)]
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
/// Boolean type for clean/dirty status.
pub enum Filth {
/// Data has not been changed.

View File

@@ -46,4 +46,4 @@ pub use rustc_serialize::hex::{FromHex, FromHexError};
pub use heapsize::HeapSizeOf;
pub use itertools::Itertools;
pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};

View File

@@ -233,4 +233,7 @@ impl TrieFactory {
TrieSpec::Fat => Ok(Box::new(try!(FatDBMut::from_existing(db, root)))),
}
}
/// Returns true iff the trie DB is a fat DB (allows enumeration of keys).
pub fn is_fat(&self) -> bool { self.spec == TrieSpec::Fat }
}