Merge branch 'master' into lightrpc

This commit is contained in:
Robert Habermeier
2017-02-20 18:01:29 +01:00
181 changed files with 7675 additions and 901 deletions

View File

@@ -504,7 +504,6 @@ pub type H256FastSet = HashSet<H256, BuildHasherDefault<PlainHasher>>;
#[cfg(test)]
mod tests {
use hash::*;
use bigint::*;
use std::str::FromStr;
#[test]

View File

@@ -26,10 +26,12 @@ use mime::{self, Mime};
use parking_lot::RwLock;
use reqwest;
/// Fetch abort control
#[derive(Default, Debug, Clone)]
pub struct Abort(Arc<AtomicBool>);
impl Abort {
/// Returns `true` if request is aborted.
pub fn is_aborted(&self) -> bool {
self.0.load(atomic::Ordering::SeqCst)
}
@@ -41,9 +43,12 @@ impl From<Arc<AtomicBool>> for Abort {
}
}
/// Fetch
pub trait Fetch: Clone + Send + Sync + 'static {
/// Result type
type Result: Future<Item=Response, Error=Error> + Send + 'static;
/// Creates new Fetch object.
fn new() -> Result<Self, Error> where Self: Sized;
/// Spawn the future in context of this `Fetch` thread pool.
@@ -76,6 +81,7 @@ pub trait Fetch: Clone + Send + Sync + 'static {
const CLIENT_TIMEOUT_SECONDS: u64 = 5;
/// Fetch client
pub struct Client {
client: RwLock<(time::Instant, Arc<reqwest::Client>)>,
pool: CpuPool,
@@ -189,9 +195,12 @@ impl Future for FetchTask {
}
}
/// Fetch Error
#[derive(Debug)]
pub enum Error {
/// Internal fetch error
Fetch(reqwest::Error),
/// Request aborted
Aborted,
}
@@ -204,17 +213,20 @@ impl From<reqwest::Error> for Error {
enum ResponseInner {
Response(reqwest::Response),
Reader(Box<io::Read + Send>),
NotFound,
}
impl fmt::Debug for ResponseInner {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ResponseInner::Response(ref response) => response.fmt(f),
ResponseInner::NotFound => write!(f, "Not found"),
ResponseInner::Reader(_) => write!(f, "io Reader"),
}
}
}
/// A fetch response type.
#[derive(Debug)]
pub struct Response {
inner: ResponseInner,
@@ -224,6 +236,7 @@ pub struct Response {
}
impl Response {
/// Creates new successfuly response reading from a file.
pub fn from_reader<R: io::Read + Send + 'static>(reader: R) -> Self {
Response {
inner: ResponseInner::Reader(Box::new(reader)),
@@ -233,13 +246,31 @@ impl Response {
}
}
/// Creates 404 response (useful for tests)
pub fn not_found() -> Self {
Response {
inner: ResponseInner::NotFound,
abort: Abort::default(),
limit: None,
read: 0,
}
}
/// Returns status code of this response.
pub fn status(&self) -> reqwest::StatusCode {
match self.inner {
ResponseInner::Response(ref r) => *r.status(),
ResponseInner::NotFound => reqwest::StatusCode::NotFound,
_ => reqwest::StatusCode::Ok,
}
}
/// Returns `true` if response status code is successful.
pub fn is_success(&self) -> bool {
self.status() == reqwest::StatusCode::Ok
}
/// Returns `true` if content type of this response is `text/html`
pub fn is_html(&self) -> bool {
match self.content_type() {
Some(Mime(mime::TopLevel::Text, mime::SubLevel::Html, _)) => true,
@@ -247,6 +278,7 @@ impl Response {
}
}
/// Returns content type of this response (if present)
pub fn content_type(&self) -> Option<Mime> {
match self.inner {
ResponseInner::Response(ref r) => {
@@ -266,6 +298,7 @@ impl io::Read for Response {
let res = match self.inner {
ResponseInner::Response(ref mut response) => response.read(buf),
ResponseInner::NotFound => return Ok(0),
ResponseInner::Reader(ref mut reader) => reader.read(buf),
};

View File

@@ -16,6 +16,8 @@
//! A service to fetch any HTTP / HTTPS content.
#![warn(missing_docs)]
#[macro_use]
extern crate log;

View File

@@ -354,7 +354,10 @@ impl EncryptedConnection {
/// Send a packet
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
let mut header = RlpStream::new();
let len = payload.len() as usize;
let len = payload.len();
if len >= (1 << 24) {
return Err(NetworkError::OversizedPacket);
}
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
header.append_raw(&[0xc2u8, 0x80u8, 0x80u8], 1);
//TODO: ger rid of vectors here

View File

@@ -556,7 +556,6 @@ impl Discovery {
mod tests {
use super::*;
use std::net::{SocketAddr};
use util::sha3::Hashable;
use util::FixedHash;
use node_table::{Node, NodeId, NodeEndpoint};

View File

@@ -106,6 +106,8 @@ pub enum NetworkError {
AddressResolve(Option<::std::io::Error>),
/// Error concerning the Rust standard library's IO subsystem.
StdIo(::std::io::Error),
/// Packet size is over the protocol limit.
OversizedPacket,
}
impl fmt::Display for NetworkError {
@@ -124,6 +126,7 @@ impl fmt::Display for NetworkError {
AddressResolve(_) => "Failed to resolve network address.".into(),
StdIo(ref err) => format!("{}", err),
Util(ref err) => format!("{}", err),
OversizedPacket => "Packet is too large".into(),
};
f.write_fmt(format_args!("Network error ({})", msg))

View File

@@ -844,7 +844,8 @@ impl Host {
// only proceed if the connecting peer is reserved.
if !self.reserved_nodes.read().contains(&id) {
s.disconnect(io, DisconnectReason::TooManyPeers);
return;
kill = true;
break;
}
}
ready_id = Some(id);
@@ -895,6 +896,7 @@ impl Host {
if duplicate {
trace!(target: "network", "Rejected duplicate connection: {}", token);
session.lock().disconnect(io, DisconnectReason::DuplicatePeer);
self.kill_connection(token, io, false);
return;
}
for p in ready_data {
@@ -1159,8 +1161,11 @@ impl IoHandler<NetworkIoMessage> for Host {
FIRST_SESSION ... LAST_SESSION => {
let mut connections = self.sessions.write();
if let Some(connection) = connections.get(stream).cloned() {
connection.lock().deregister_socket(event_loop).expect("Error deregistering socket");
connections.remove(stream);
let c = connection.lock();
if c.expired() { // make sure it is the same connection that the event was generated for
c.deregister_socket(event_loop).expect("Error deregistering socket");
connections.remove(stream);
}
}
}
DISCOVERY => (),

View File

@@ -241,7 +241,7 @@ impl Session {
/// Check if this session is expired.
pub fn expired(&self) -> bool {
match self.state {
State::Handshake(ref h) => h.expired(),
State::Handshake(ref h) => self.expired || h.expired(),
_ => self.expired,
}
}
@@ -407,7 +407,7 @@ impl Session {
let rlp = UntrustedRlp::new(&packet.data[1..]);
let reason: u8 = rlp.val_at(0)?;
if self.had_hello {
debug!("Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason));
debug!(target:"network", "Disconnected: {}: {:?}", self.token(), DisconnectReason::from_u8(reason));
}
Err(From::from(NetworkError::Disconnect(DisconnectReason::from_u8(reason))))
}

View File

@@ -22,9 +22,7 @@ use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use kvdb::{KeyValueDB, DBTransaction};
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
@@ -35,14 +33,14 @@ use std::env;
/// that the states of any block the node has ever processed will be accessible.
pub struct ArchiveDB {
overlay: MemoryDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
latest_era: Option<u64>,
column: Option<u32>,
}
impl ArchiveDB {
/// Create a new instance from file
pub fn new(backing: Arc<Database>, col: Option<u32>) -> ArchiveDB {
/// Create a new instance from a key-value db.
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> ArchiveDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
ArchiveDB {
overlay: MemoryDB::new(),
@@ -55,9 +53,7 @@ impl ArchiveDB {
/// Create a new instance with an anonymous temporary database.
#[cfg(test)]
fn new_temp() -> ArchiveDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
@@ -186,7 +182,7 @@ impl JournalDB for ArchiveDB {
fn is_pruned(&self) -> bool { false }
fn backing(&self) -> &Arc<Database> {
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}

View File

@@ -22,9 +22,7 @@ use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use kvdb::{KeyValueDB, DBTransaction};
#[derive(Clone, PartialEq, Eq)]
struct RefInfo {
@@ -112,7 +110,7 @@ enum RemoveFrom {
/// TODO: `store_reclaim_period`
pub struct EarlyMergeDB {
overlay: MemoryDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
refs: Option<Arc<RwLock<HashMap<H256, RefInfo>>>>,
latest_era: Option<u64>,
column: Option<u32>,
@@ -122,8 +120,8 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl EarlyMergeDB {
/// Create a new instance from file
pub fn new(backing: Arc<Database>, col: Option<u32>) -> EarlyMergeDB {
let (latest_era, refs) = EarlyMergeDB::read_refs(&backing, col);
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> EarlyMergeDB {
let (latest_era, refs) = EarlyMergeDB::read_refs(&*backing, col);
let refs = Some(Arc::new(RwLock::new(refs)));
EarlyMergeDB {
overlay: MemoryDB::new(),
@@ -137,9 +135,7 @@ impl EarlyMergeDB {
/// Create a new instance with an anonymous temporary database.
#[cfg(test)]
fn new_temp() -> EarlyMergeDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
@@ -152,11 +148,11 @@ impl EarlyMergeDB {
// The next three are valid only as long as there is an insert operation of `key` in the journal.
fn set_already_in(batch: &mut DBTransaction, col: Option<u32>, key: &H256) { batch.put(col, &Self::morph_key(key, 0), &[1u8]); }
fn reset_already_in(batch: &mut DBTransaction, col: Option<u32>, key: &H256) { batch.delete(col, &Self::morph_key(key, 0)); }
fn is_already_in(backing: &Database, col: Option<u32>, key: &H256) -> bool {
fn is_already_in(backing: &KeyValueDB, col: Option<u32>, key: &H256) -> bool {
backing.get(col, &Self::morph_key(key, 0)).expect("Low-level database error. Some issue with your hard disk?").is_some()
}
fn insert_keys(inserts: &[(H256, DBValue)], backing: &Database, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, trace: bool) {
fn insert_keys(inserts: &[(H256, DBValue)], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>, batch: &mut DBTransaction, trace: bool) {
for &(ref h, ref d) in inserts {
if let Some(c) = refs.get_mut(h) {
// already counting. increment.
@@ -189,7 +185,7 @@ impl EarlyMergeDB {
}
}
fn replay_keys(inserts: &[H256], backing: &Database, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>) {
fn replay_keys(inserts: &[H256], backing: &KeyValueDB, col: Option<u32>, refs: &mut HashMap<H256, RefInfo>) {
trace!(target: "jdb.fine", "replay_keys: inserts={:?}, refs={:?}", inserts, refs);
for h in inserts {
if let Some(c) = refs.get_mut(h) {
@@ -262,7 +258,7 @@ impl EarlyMergeDB {
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let (latest_era, reconstructed) = Self::read_refs(&self.backing, self.column);
let (latest_era, reconstructed) = Self::read_refs(&*self.backing, self.column);
let refs = self.refs.as_ref().unwrap().write();
if *refs != reconstructed || latest_era != self.latest_era {
let clean_refs = refs.iter().filter_map(|(k, v)| if reconstructed.get(k) == Some(v) {None} else {Some((k.clone(), v.clone()))}).collect::<HashMap<_, _>>();
@@ -278,7 +274,7 @@ impl EarlyMergeDB {
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?")
}
fn read_refs(db: &Database, col: Option<u32>) -> (Option<u64>, HashMap<H256, RefInfo>) {
fn read_refs(db: &KeyValueDB, col: Option<u32>) -> (Option<u64>, HashMap<H256, RefInfo>) {
let mut refs = HashMap::new();
let mut latest_era = None;
if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
@@ -361,7 +357,7 @@ impl JournalDB for EarlyMergeDB {
self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn backing(&self) -> &Arc<Database> {
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}
@@ -432,7 +428,7 @@ impl JournalDB for EarlyMergeDB {
r.begin_list(inserts.len());
inserts.iter().foreach(|&(k, _)| {r.append(&k);});
r.append(&removes);
Self::insert_keys(&inserts, &self.backing, self.column, &mut refs, batch, trace);
Self::insert_keys(&inserts, &*self.backing, self.column, &mut refs, batch, trace);
let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
@@ -558,7 +554,7 @@ mod tests {
use super::*;
use super::super::traits::JournalDB;
use log::init_log;
use kvdb::{Database, DatabaseConfig};
use kvdb::{DatabaseConfig};
#[test]
fn insert_same_in_fork() {
@@ -817,7 +813,7 @@ mod tests {
fn new_db(path: &Path) -> EarlyMergeDB {
let config = DatabaseConfig::with_columns(Some(1));
let backing = Arc::new(Database::open(&config, path.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::Database::open(&config, path.to_str().unwrap()).unwrap());
EarlyMergeDB::new(backing, Some(0))
}

View File

@@ -17,7 +17,6 @@
//! `JournalDB` interface and implementation.
use common::*;
use kvdb::Database;
/// Export the journaldb module.
pub mod traits;
@@ -115,8 +114,8 @@ impl fmt::Display for Algorithm {
}
}
/// Create a new `JournalDB` trait object.
pub fn new(backing: Arc<Database>, algorithm: Algorithm, col: Option<u32>) -> Box<JournalDB> {
/// Create a new `JournalDB` trait object over a generic key-value database.
pub fn new(backing: Arc<::kvdb::KeyValueDB>, algorithm: Algorithm, col: Option<u32>) -> Box<JournalDB> {
match algorithm {
Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(backing, col)),
Algorithm::EarlyMerge => Box::new(earlymergedb::EarlyMergeDB::new(backing, col)),
@@ -184,4 +183,4 @@ mod tests {
assert_eq!(overlayrecent, 1);
assert_eq!(refcounted, 1);
}
}
}

View File

@@ -21,9 +21,7 @@ use rlp::*;
use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use kvdb::{KeyValueDB, DBTransaction};
use super::JournalDB;
/// Implementation of the `JournalDB` trait for a disk-backed database with a memory overlay
@@ -59,7 +57,7 @@ use super::JournalDB;
pub struct OverlayRecentDB {
transaction_overlay: MemoryDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
journal_overlay: Arc<RwLock<JournalOverlay>>,
column: Option<u32>,
}
@@ -102,8 +100,8 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl OverlayRecentDB {
/// Create a new instance.
pub fn new(backing: Arc<Database>, col: Option<u32>) -> OverlayRecentDB {
let journal_overlay = Arc::new(RwLock::new(OverlayRecentDB::read_overlay(&backing, col)));
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> OverlayRecentDB {
let journal_overlay = Arc::new(RwLock::new(OverlayRecentDB::read_overlay(&*backing, col)));
OverlayRecentDB {
transaction_overlay: MemoryDB::new(),
backing: backing,
@@ -115,15 +113,13 @@ impl OverlayRecentDB {
/// Create a new instance with an anonymous temporary database.
#[cfg(test)]
pub fn new_temp() -> OverlayRecentDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
#[cfg(test)]
fn can_reconstruct_refs(&self) -> bool {
let reconstructed = Self::read_overlay(&self.backing, self.column);
let reconstructed = Self::read_overlay(&*self.backing, self.column);
let journal_overlay = self.journal_overlay.read();
journal_overlay.backing_overlay == reconstructed.backing_overlay &&
journal_overlay.pending_overlay == reconstructed.pending_overlay &&
@@ -136,7 +132,7 @@ impl OverlayRecentDB {
self.backing.get(self.column, key).expect("Low-level database error. Some issue with your hard disk?")
}
fn read_overlay(db: &Database, col: Option<u32>) -> JournalOverlay {
fn read_overlay(db: &KeyValueDB, col: Option<u32>) -> JournalOverlay {
let mut journal = HashMap::new();
let mut overlay = MemoryDB::new();
let mut count = 0;
@@ -235,7 +231,7 @@ impl JournalDB for OverlayRecentDB {
self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none()
}
fn backing(&self) -> &Arc<Database> {
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}

View File

@@ -23,9 +23,7 @@ use overlaydb::OverlayDB;
use memorydb::MemoryDB;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
use super::traits::JournalDB;
use kvdb::{Database, DBTransaction};
#[cfg(test)]
use std::env;
use kvdb::{KeyValueDB, DBTransaction};
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
@@ -49,7 +47,7 @@ use std::env;
// TODO: store last_era, reclaim_period.
pub struct RefCountedDB {
forward: OverlayDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
latest_era: Option<u64>,
inserts: Vec<H256>,
removes: Vec<H256>,
@@ -60,7 +58,7 @@ const PADDING : [u8; 10] = [ 0u8; 10 ];
impl RefCountedDB {
/// Create a new instance given a `backing` database.
pub fn new(backing: Arc<Database>, col: Option<u32>) -> RefCountedDB {
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> RefCountedDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
RefCountedDB {
@@ -76,9 +74,7 @@ impl RefCountedDB {
/// Create a new instance with an anonymous temporary database.
#[cfg(test)]
fn new_temp() -> RefCountedDB {
let mut dir = env::temp_dir();
dir.push(H32::random().hex());
let backing = Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap());
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
}
@@ -112,7 +108,7 @@ impl JournalDB for RefCountedDB {
self.latest_era.is_none()
}
fn backing(&self) -> &Arc<Database> {
fn backing(&self) -> &Arc<KeyValueDB> {
&self.backing
}

View File

@@ -18,7 +18,7 @@
use common::*;
use hashdb::*;
use kvdb::{Database, DBTransaction};
use kvdb::{self, DBTransaction};
/// A `HashDB` which can manage a short-term journal potentially containing many forks of mutually
/// exclusive actions.
@@ -66,7 +66,7 @@ pub trait JournalDB: HashDB {
fn is_pruned(&self) -> bool { true }
/// Get backing database.
fn backing(&self) -> &Arc<Database>;
fn backing(&self) -> &Arc<kvdb::KeyValueDB>;
/// Clear internal strucutres. This should called after changes have been written
/// to the backing strage

View File

@@ -17,10 +17,11 @@
//! Key-Value store abstraction with `RocksDB` backend.
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::path::PathBuf;
use common::*;
use elastic_array::*;
use std::default::Default;
use std::path::PathBuf;
use hashdb::DBValue;
use rlp::{UntrustedRlp, RlpType, View, Compressible};
use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
@@ -36,10 +37,12 @@ const DB_BACKGROUND_FLUSHES: i32 = 2;
const DB_BACKGROUND_COMPACTIONS: i32 = 2;
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
#[derive(Default, Clone, PartialEq)]
pub struct DBTransaction {
ops: Vec<DBOp>,
}
#[derive(Clone, PartialEq)]
enum DBOp {
Insert {
col: Option<u32>,
@@ -59,9 +62,14 @@ enum DBOp {
impl DBTransaction {
/// Create new transaction.
pub fn new(_db: &Database) -> DBTransaction {
pub fn new() -> DBTransaction {
DBTransaction::with_capacity(256)
}
/// Create new transaction with capacity.
pub fn with_capacity(cap: usize) -> DBTransaction {
DBTransaction {
ops: Vec::with_capacity(256),
ops: Vec::with_capacity(cap)
}
}
@@ -116,6 +124,138 @@ enum KeyState {
Delete,
}
/// Generic key-value database.
///
/// This makes a distinction between "buffered" and "flushed" values. Values which have been
/// written can always be read, but may be present in an in-memory buffer. Values which have
/// been flushed have been moved to backing storage, like a RocksDB instance. There are certain
/// operations which are only guaranteed to operate on flushed data and not buffered,
/// although implementations may differ in this regard.
///
/// The contents of an interior buffer may be explicitly flushed using the `flush` method.
///
/// The `KeyValueDB` also deals in "column families", which can be thought of as distinct
/// stores within a database. Keys written in one column family will not be accessible from
/// any other. The number of column families must be specified at initialization, with a
/// differing interface for each database. The `None` argument in place of a column index
/// is always supported.
///
/// The API laid out here, along with the `Sync` bound implies interior synchronization for
/// implementation.
pub trait KeyValueDB: Sync + Send {
/// Helper to create a new transaction.
fn transaction(&self) -> DBTransaction { DBTransaction::new() }
/// Get a value by key.
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String>;
/// Get a value by partial key. Only works for flushed data.
fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>>;
/// Write a transaction of changes to the buffer.
fn write_buffered(&self, transaction: DBTransaction);
/// Write a transaction of changes to the backing store.
fn write(&self, transaction: DBTransaction) -> Result<(), String> {
self.write_buffered(transaction);
self.flush()
}
/// Flush all buffered data.
fn flush(&self) -> Result<(), String>;
/// Iterate over flushed data for a given column.
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a>;
/// Attempt to replace this database with a new one located at the given path.
fn restore(&self, new_db: &str) -> Result<(), UtilError>;
}
/// A key-value database fulfilling the `KeyValueDB` trait, living in memory.
/// This is generally intended for tests and is not particularly optimized.
pub struct InMemory {
columns: RwLock<HashMap<Option<u32>, BTreeMap<Vec<u8>, DBValue>>>,
}
/// Create an in-memory database with the given number of columns.
/// Columns will be indexable by 0..`num_cols`
pub fn in_memory(num_cols: u32) -> InMemory {
let mut cols = HashMap::new();
cols.insert(None, BTreeMap::new());
for idx in 0..num_cols {
cols.insert(Some(idx), BTreeMap::new());
}
InMemory {
columns: RwLock::new(cols)
}
}
impl KeyValueDB for InMemory {
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
let columns = self.columns.read();
match columns.get(&col) {
None => Err(format!("No such column family: {:?}", col)),
Some(map) => Ok(map.get(key).cloned()),
}
}
fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
let columns = self.columns.read();
match columns.get(&col) {
None => None,
Some(map) =>
map.iter()
.find(|&(ref k ,_)| k.starts_with(prefix))
.map(|(_, v)| (&**v).to_vec().into_boxed_slice())
}
}
fn write_buffered(&self, transaction: DBTransaction) {
let mut columns = self.columns.write();
let ops = transaction.ops;
for op in ops {
match op {
DBOp::Insert { col, key, value } => {
if let Some(mut col) = columns.get_mut(&col) {
col.insert(key.to_vec(), value);
}
},
DBOp::InsertCompressed { col, key, value } => {
if let Some(mut col) = columns.get_mut(&col) {
let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
let mut value = DBValue::new();
value.append_slice(&compressed);
col.insert(key.to_vec(), value);
}
},
DBOp::Delete { col, key } => {
if let Some(mut col) = columns.get_mut(&col) {
col.remove(&*key);
}
},
}
}
}
fn flush(&self) -> Result<(), String> { Ok(()) }
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
match self.columns.read().get(&col) {
Some(map) => Box::new( // TODO: worth optimizing at all?
map.clone()
.into_iter()
.map(|(k, v)| (k.into_boxed_slice(), v.to_vec().into_boxed_slice()))
),
None => Box::new(None.into_iter())
}
}
fn restore(&self, _new_db: &str) -> Result<(), UtilError> {
Err(UtilError::SimpleString("Attempted to restore in-memory database".into()))
}
}
/// Compaction profile for the database settings
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
@@ -248,12 +388,16 @@ impl Default for DatabaseConfig {
}
}
/// Database iterator for flushed data only
pub struct DatabaseIterator {
/// Database iterator (for flushed data only)
// The compromise of holding only a virtual borrow vs. holding a lock on the
// inner DB (to prevent closing via restoration) may be re-evaluated in the future.
//
pub struct DatabaseIterator<'a> {
iter: DBIterator,
_marker: PhantomData<&'a Database>,
}
impl<'a> Iterator for DatabaseIterator {
impl<'a> Iterator for DatabaseIterator<'a> {
type Item = (Box<[u8]>, Box<[u8]>);
fn next(&mut self) -> Option<Self::Item> {
@@ -393,9 +537,9 @@ impl Database {
})
}
/// Creates new transaction for this database.
/// Helper to create new transaction for this database.
pub fn transaction(&self) -> DBTransaction {
DBTransaction::new(self)
DBTransaction::new()
}
@@ -562,9 +706,16 @@ impl Database {
//TODO: iterate over overlay
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
col.map_or_else(|| DatabaseIterator { iter: db.iterator_opt(IteratorMode::Start, &self.read_opts) },
|c| DatabaseIterator { iter: db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts)
.expect("iterator params are valid; qed") })
let iter = col.map_or_else(
|| db.iterator_opt(IteratorMode::Start, &self.read_opts),
|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts)
.expect("iterator params are valid; qed")
);
DatabaseIterator {
iter: iter,
_marker: PhantomData,
}
},
None => panic!("Not supported yet") //TODO: return an empty iterator or change return type
}
@@ -619,6 +770,39 @@ impl Database {
}
}
// duplicate declaration of methods here to avoid trait import in certain existing cases
// at time of addition.
impl KeyValueDB for Database {
fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<DBValue>, String> {
Database::get(self, col, key)
}
fn get_by_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<Box<[u8]>> {
Database::get_by_prefix(self, col, prefix)
}
fn write_buffered(&self, transaction: DBTransaction) {
Database::write_buffered(self, transaction)
}
fn write(&self, transaction: DBTransaction) -> Result<(), String> {
Database::write(self, transaction)
}
fn flush(&self) -> Result<(), String> {
Database::flush(self)
}
fn iter<'a>(&'a self, col: Option<u32>) -> Box<Iterator<Item=(Box<[u8]>, Box<[u8]>)> + 'a> {
let unboxed = Database::iter(self, col);
Box::new(unboxed)
}
fn restore(&self, new_db: &str) -> Result<(), UtilError> {
Database::restore(self, new_db)
}
}
impl Drop for Database {
fn drop(&mut self) {
// write all buffered changes if we can.

View File

@@ -74,7 +74,7 @@ impl Batch {
pub fn commit(&mut self, dest: &mut Database) -> Result<(), Error> {
if self.inner.is_empty() { return Ok(()) }
let mut transaction = DBTransaction::new(dest);
let mut transaction = DBTransaction::new();
for keypair in &self.inner {
transaction.put(self.column, &keypair.0, &keypair.1);

View File

@@ -23,7 +23,7 @@ use hashdb::*;
use memorydb::*;
use std::sync::*;
use std::collections::HashMap;
use kvdb::{Database, DBTransaction};
use kvdb::{KeyValueDB, DBTransaction};
/// Implementation of the `HashDB` trait for a disk-backed database with a memory overlay.
///
@@ -36,22 +36,21 @@ use kvdb::{Database, DBTransaction};
#[derive(Clone)]
pub struct OverlayDB {
overlay: MemoryDB,
backing: Arc<Database>,
backing: Arc<KeyValueDB>,
column: Option<u32>,
}
impl OverlayDB {
/// Create a new instance of OverlayDB given a `backing` database.
pub fn new(backing: Arc<Database>, col: Option<u32>) -> OverlayDB {
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> OverlayDB {
OverlayDB{ overlay: MemoryDB::new(), backing: backing, column: col }
}
/// Create a new instance of OverlayDB with an anonymous temporary database.
#[cfg(test)]
pub fn new_temp() -> OverlayDB {
let mut dir = ::std::env::temp_dir();
dir.push(H32::random().hex());
Self::new(Arc::new(Database::open_default(dir.to_str().unwrap()).unwrap()), None)
let backing = Arc::new(::kvdb::in_memory(0));
Self::new(backing, None)
}
/// Commit all operations in a single batch.
@@ -295,23 +294,3 @@ fn overlaydb_complex() {
trie.commit().unwrap(); //
assert_eq!(trie.get(&hfoo), None);
}
#[test]
fn playpen() {
use std::fs;
{
let db = Database::open_default("/tmp/test").unwrap();
let mut batch = db.transaction();
batch.put(None, b"test", b"test2");
db.write(batch).unwrap();
match db.get(None, b"test") {
Ok(Some(value)) => println!("Got value {:?}", &*value),
Ok(None) => println!("No value for that key"),
Err(..) => println!("Gah"),
}
let mut batch = db.transaction();
batch.delete(None, b"test");
db.write(batch).unwrap();
}
fs::remove_dir_all("/tmp/test").unwrap();
}