// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
//! Key-Value store abstraction with `RocksDB` backend.
use std::io::ErrorKind;
use common::*;
use elastic_array::*;
use std::default::Default;
use std::path::PathBuf;
use hashdb::DBValue;
use rlp::{UntrustedRlp, RlpType, View, Compressible};
use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
Options, DBCompactionStyle, BlockBasedOptions, Direction, Cache, Column, ReadOptions};
#[cfg(target_os = "linux")]
use regex::Regex;
#[cfg(target_os = "linux")]
use std::process::Command;
#[cfg(target_os = "linux")]
use std::fs::File;
const DB_BACKGROUND_FLUSHES: i32 = 2;
const DB_BACKGROUND_COMPACTIONS: i32 = 2;
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
pub struct DBTransaction {
ops: Vec,
}
enum DBOp {
Insert {
col: Option,
key: ElasticArray32,
value: DBValue,
},
InsertCompressed {
col: Option,
key: ElasticArray32,
value: DBValue,
},
Delete {
col: Option,
key: ElasticArray32,
}
}
impl DBTransaction {
/// Create new transaction.
pub fn new(_db: &Database) -> DBTransaction {
DBTransaction {
ops: Vec::with_capacity(256),
}
}
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
pub fn put(&mut self, col: Option, key: &[u8], value: &[u8]) {
let mut ekey = ElasticArray32::new();
ekey.append_slice(key);
self.ops.push(DBOp::Insert {
col: col,
key: ekey,
value: DBValue::from_slice(value),
});
}
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
pub fn put_vec(&mut self, col: Option, key: &[u8], value: Bytes) {
let mut ekey = ElasticArray32::new();
ekey.append_slice(key);
self.ops.push(DBOp::Insert {
col: col,
key: ekey,
value: DBValue::from_vec(value),
});
}
/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
/// Value will be RLP-compressed on flush
pub fn put_compressed(&mut self, col: Option, key: &[u8], value: Bytes) {
let mut ekey = ElasticArray32::new();
ekey.append_slice(key);
self.ops.push(DBOp::InsertCompressed {
col: col,
key: ekey,
value: DBValue::from_vec(value),
});
}
/// Delete value by key.
pub fn delete(&mut self, col: Option, key: &[u8]) {
let mut ekey = ElasticArray32::new();
ekey.append_slice(key);
self.ops.push(DBOp::Delete {
col: col,
key: ekey,
});
}
}
enum KeyState {
Insert(DBValue),
InsertCompressed(DBValue),
Delete,
}
/// Compaction profile for the database settings
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct CompactionProfile {
/// L0-L1 target file size
pub initial_file_size: u64,
/// L2-LN target file size multiplier
pub file_size_multiplier: i32,
/// rate limiter for background flushes and compactions, bytes/sec, if any
pub write_rate_limit: Option,
}
impl Default for CompactionProfile {
/// Default profile suitable for most storage
fn default() -> CompactionProfile {
CompactionProfile::ssd()
}
}
/// Given output of df command return Linux rotational flag file path.
#[cfg(target_os = "linux")]
pub fn rotational_from_df_output(df_out: Vec) -> Option {
str::from_utf8(df_out.as_slice())
.ok()
// Get the drive name.
.and_then(|df_str| Regex::new(r"/dev/(sd[:alpha:]{1,2})")
.ok()
.and_then(|re| re.captures(df_str))
.and_then(|captures| captures.at(1)))
// Generate path e.g. /sys/block/sda/queue/rotational
.map(|drive_path| {
let mut p = PathBuf::from("/sys/block");
p.push(drive_path);
p.push("queue/rotational");
p
})
}
impl CompactionProfile {
/// Attempt to determine the best profile automatically, only Linux for now.
#[cfg(target_os = "linux")]
pub fn auto(db_path: &Path) -> CompactionProfile {
let hdd_check_file = db_path
.to_str()
.and_then(|path_str| Command::new("df").arg(path_str).output().ok())
.and_then(|df_res| match df_res.status.success() {
true => Some(df_res.stdout),
false => None,
})
.and_then(rotational_from_df_output);
// Read out the file and match compaction profile.
if let Some(hdd_check) = hdd_check_file {
if let Ok(mut file) = File::open(hdd_check.as_path()) {
let mut buffer = [0; 1];
if file.read_exact(&mut buffer).is_ok() {
// 0 means not rotational.
if buffer == [48] { return Self::ssd(); }
// 1 means rotational.
if buffer == [49] { return Self::hdd(); }
}
}
}
// Fallback if drive type was not determined.
Self::default()
}
/// Just default for other platforms.
#[cfg(not(target_os = "linux"))]
pub fn auto(_db_path: &Path) -> CompactionProfile {
Self::default()
}
/// Default profile suitable for SSD storage
pub fn ssd() -> CompactionProfile {
CompactionProfile {
initial_file_size: 32 * 1024 * 1024,
file_size_multiplier: 2,
write_rate_limit: None,
}
}
/// Slow HDD compaction profile
pub fn hdd() -> CompactionProfile {
CompactionProfile {
initial_file_size: 192 * 1024 * 1024,
file_size_multiplier: 1,
write_rate_limit: Some(8 * 1024 * 1024),
}
}
}
/// Database configuration
#[derive(Clone)]
pub struct DatabaseConfig {
/// Max number of open files.
pub max_open_files: i32,
/// Cache sizes (in MiB) for specific columns.
pub cache_sizes: HashMap