// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
//! Key-Value store abstraction with `RocksDB` backend.
use common::*;
use elastic_array::*;
use std::default::Default;
use rlp::{UntrustedRlp, RlpType, View, Compressible};
use rocksdb::{DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
	Options, DBCompactionStyle, BlockBasedOptions, Direction, Cache, Column};
const DB_BACKGROUND_FLUSHES: i32 = 2;
const DB_BACKGROUND_COMPACTIONS: i32 = 2;
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
pub struct DBTransaction {
	ops: Mutex>,
}
enum DBOp {
	Insert {
		col: Option,
		key: ElasticArray32,
		value: Bytes,
	},
	InsertCompressed {
		col: Option,
		key: ElasticArray32,
		value: Bytes,
	},
	Delete {
		col: Option,
		key: ElasticArray32,
	}
}
impl DBTransaction {
	/// Create new transaction.
	pub fn new(_db: &Database) -> DBTransaction {
		DBTransaction {
			ops: Mutex::new(Vec::with_capacity(256)),
		}
	}
	/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
	pub fn put(&self, col: Option, key: &[u8], value: &[u8]) -> Result<(), String> {
		let mut ekey = ElasticArray32::new();
		ekey.append_slice(key);
		self.ops.lock().push(DBOp::Insert {
			col: col,
			key: ekey,
			value: value.to_vec(),
		});
		Ok(())
	}
	/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
	pub fn put_vec(&self, col: Option, key: &[u8], value: Bytes) -> Result<(), String> {
		let mut ekey = ElasticArray32::new();
		ekey.append_slice(key);
		self.ops.lock().push(DBOp::Insert {
			col: col,
			key: ekey,
			value: value,
		});
		Ok(())
	}
	/// Insert a key-value pair in the transaction. Any existing value value will be overwritten upon write.
	/// Value will be RLP-compressed on  flush
	pub fn put_compressed(&self, col: Option, key: &[u8], value: Bytes) -> Result<(), String> {
		let mut ekey = ElasticArray32::new();
		ekey.append_slice(key);
		self.ops.lock().push(DBOp::InsertCompressed {
			col: col,
			key: ekey,
			value: value,
		});
		Ok(())
	}
	/// Delete value by key.
	pub fn delete(&self, col: Option, key: &[u8]) -> Result<(), String> {
		let mut ekey = ElasticArray32::new();
		ekey.append_slice(key);
		self.ops.lock().push(DBOp::Delete {
			col: col,
			key: ekey,
		});
		Ok(())
	}
}
/// State of a key in the in-memory overlay, applied to the backing
/// database when `Database::flush` is called.
enum KeyState {
	/// Value to be written as-is.
	Insert(Bytes),
	/// Value to be RLP-compressed before writing.
	InsertCompressed(Bytes),
	/// Key to be removed.
	Delete,
}
/// Compaction profile for the database settings
#[derive(Clone, Copy)]
pub struct CompactionProfile {
	/// L0-L1 target file size
	pub initial_file_size: u64,
	/// L2-LN target file size multiplier
	pub file_size_multiplier: i32,
	/// rate limiter for background flushes and compactions, bytes/sec, if any
	pub write_rate_limit: Option<u64>,
}
impl Default for CompactionProfile {
	/// Default profile suitable for most storage
	fn default() -> CompactionProfile {
		// 32 MiB initial files, doubling each level, no background rate limit.
		CompactionProfile {
			write_rate_limit: None,
			file_size_multiplier: 2,
			initial_file_size: 32 * 1024 * 1024,
		}
	}
}
impl CompactionProfile {
	/// Slow hdd compaction profile
	pub fn hdd() -> CompactionProfile {
		// Larger files, flat level sizing, and an 8 MiB/s cap so background
		// compaction does not saturate a spinning disk.
		CompactionProfile {
			write_rate_limit: Some(8 * 1024 * 1024),
			file_size_multiplier: 1,
			initial_file_size: 192 * 1024 * 1024,
		}
	}
}
/// Database configuration
#[derive(Clone, Copy)]
pub struct DatabaseConfig {
	/// Max number of open files.
	pub max_open_files: i32,
	/// Cache-size
	pub cache_size: Option,
	/// Compaction profile
	pub compaction: CompactionProfile,
	/// Set number of columns
	pub columns: Option,
	/// Should we keep WAL enabled?
	pub wal: bool,
}
impl DatabaseConfig {
	/// Create new `DatabaseConfig` with default parameters and specified set of columns.
	pub fn with_columns(columns: Option) -> Self {
		let mut config = Self::default();
		config.columns = columns;
		config
	}
}
impl Default for DatabaseConfig {
	// Defaults: no read cache, 512 file handles, default compaction,
	// a single (default) column, and the write-ahead log enabled.
	fn default() -> DatabaseConfig {
		DatabaseConfig {
			wal: true,
			columns: None,
			compaction: CompactionProfile::default(),
			max_open_files: 512,
			cache_size: None,
		}
	}
}
/// Database iterator for flushed data only
pub struct DatabaseIterator {
	// Underlying RocksDB iterator; overlay (unflushed) entries are not visible.
	iter: DBIterator,
}
impl<'a> Iterator for DatabaseIterator {
	type Item = (Box<[u8]>, Box<[u8]>);
    fn next(&mut self) -> Option {
		self.iter.next()
	}
}
/// Key-Value database.
pub struct Database {
	db: DB,
	write_opts: WriteOptions,
	cfs: Vec,
	overlay: RwLock, KeyState>>>,
}
impl Database {
	/// Open database with default settings.
	pub fn open_default(path: &str) -> Result {
		Database::open(&DatabaseConfig::default(), path)
	}
	/// Open database file. Creates if it does not exist.
	pub fn open(config: &DatabaseConfig, path: &str) -> Result {
		let mut opts = Options::new();
		if let Some(rate_limit) = config.compaction.write_rate_limit {
			try!(opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)));
		}
		try!(opts.set_parsed_options(&format!("max_total_wal_size={}", 64 * 1024 * 1024)));
		opts.set_max_open_files(config.max_open_files);
		opts.create_if_missing(true);
		opts.set_use_fsync(false);
		opts.set_max_background_flushes(DB_BACKGROUND_FLUSHES);
		opts.set_max_background_compactions(DB_BACKGROUND_COMPACTIONS);
		// compaction settings
		opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
		opts.set_target_file_size_base(config.compaction.initial_file_size);
		opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
		let mut cf_options = Vec::with_capacity(config.columns.unwrap_or(0) as usize);
		for _ in 0 .. config.columns.unwrap_or(0) {
			let mut opts = Options::new();
			opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
			opts.set_target_file_size_base(config.compaction.initial_file_size);
			opts.set_target_file_size_multiplier(config.compaction.file_size_multiplier);
			if let Some(cache_size) = config.cache_size {
				let mut block_opts = BlockBasedOptions::new();
				// all goes to read cache
				block_opts.set_cache(Cache::new(cache_size * 1024 * 1024));
				opts.set_block_based_table_factory(&block_opts);
			}
			cf_options.push(opts);
		}
		let mut write_opts = WriteOptions::new();
		if !config.wal {
			write_opts.disable_wal(true);
		}
		let mut cfs: Vec = Vec::new();
		let db = match config.columns {
			Some(columns) => {
				let cfnames: Vec<_> = (0..columns).map(|c| format!("col{}", c)).collect();
				let cfnames: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
				match DB::open_cf(&opts, path, &cfnames, &cf_options) {
					Ok(db) => {
						cfs = cfnames.iter().map(|n| db.cf_handle(n).unwrap()).collect();
						assert!(cfs.len() == columns as usize);
						Ok(db)
					}
					Err(_) => {
						// retry and create CFs
						match DB::open_cf(&opts, path, &[], &[]) {
							Ok(mut db) => {
								cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i]).unwrap()).collect();
								Ok(db)
							},
							err @ Err(_) => err,
						}
					}
				}
			},
			None => DB::open(&opts, path)
		};
		let db = match db {
			Ok(db) => db,
			Err(ref s) if s.starts_with("Corruption:") => {
				info!("{}", s);
				info!("Attempting DB repair for {}", path);
				try!(DB::repair(&opts, path));
				try!(DB::open(&opts, path))
			},
			Err(s) => { return Err(s); }
		};
		Ok(Database {
			db: db,
			write_opts: write_opts,
			overlay: RwLock::new((0..(cfs.len() + 1)).map(|_| HashMap::new()).collect()),
			cfs: cfs,
		})
	}
	/// Creates new transaction for this database.
	pub fn transaction(&self) -> DBTransaction {
		DBTransaction::new(self)
	}
	/// Map a column identifier to its slot in the overlay vector:
	/// `None` (the default column) maps to 0, column `c` maps to `c + 1`.
	fn to_overlay_column(col: Option<u32>) -> usize {
		col.map_or(0, |c| (c + 1) as usize)
	}
	/// Commit transaction to database.
	///
	/// Operations are only staged in the in-memory overlay; nothing reaches
	/// RocksDB until `flush` is called. A later op on the same key replaces
	/// an earlier one.
	pub fn write_buffered(&self, tr: DBTransaction) -> Result<(), String> {
		let mut overlay = self.overlay.write();
		for op in tr.ops.into_inner() {
			match op {
				DBOp::Insert { col, key, value } => {
					overlay[Self::to_overlay_column(col)].insert(key, KeyState::Insert(value));
				},
				DBOp::InsertCompressed { col, key, value } => {
					overlay[Self::to_overlay_column(col)].insert(key, KeyState::InsertCompressed(value));
				},
				DBOp::Delete { col, key } => {
					overlay[Self::to_overlay_column(col)].insert(key, KeyState::Delete);
				},
			}
		}
		Ok(())
	}
	/// Commit buffered changes to database.
	///
	/// Drains the overlay into a single atomic `WriteBatch`.
	pub fn flush(&self) -> Result<(), String> {
		let batch = WriteBatch::new();
		let mut overlay = self.overlay.write();
		for (c, column) in overlay.iter_mut().enumerate() {
			// Take the column's pending entries, leaving an empty map behind.
			let column_data = mem::replace(column, HashMap::new());
			for (key, state) in column_data.into_iter() {
				match state {
					KeyState::Delete => {
						if c > 0 {
							try!(batch.delete_cf(self.cfs[c - 1], &key));
						} else {
							try!(batch.delete(&key));
						}
					},
					KeyState::Insert(value) => {
						if c > 0 {
							try!(batch.put_cf(self.cfs[c - 1], &key, &value));
						} else {
							try!(batch.put(&key, &value));
						}
					},
					KeyState::InsertCompressed(value) => {
						let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
						if c > 0 {
							try!(batch.put_cf(self.cfs[c - 1], &key, &compressed));
						} else {
							// Bug fix: previously wrote the uncompressed `value` here,
							// diverging from the column-family branch above and from
							// the direct `write` path.
							try!(batch.put(&key, &compressed));
						}
					}
				}
			}
		}
		self.db.write_opt(batch, &self.write_opts)
	}
	/// Commit transaction to database.
	///
	/// Writes directly via a `WriteBatch`, bypassing the overlay used by
	/// `write_buffered`/`flush`.
	pub fn write(&self, tr: DBTransaction) -> Result<(), String> {
		let batch = WriteBatch::new();
		for op in tr.ops.into_inner() {
			match op {
				DBOp::Insert { col, key, value } => {
					match col {
						Some(c) => try!(batch.put_cf(self.cfs[c as usize], &key, &value)),
						None => try!(batch.put(&key, &value)),
					}
				},
				DBOp::InsertCompressed { col, key, value } => {
					let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
					match col {
						Some(c) => try!(batch.put_cf(self.cfs[c as usize], &key, &compressed)),
						None => try!(batch.put(&key, &compressed)),
					}
				},
				DBOp::Delete { col, key } => {
					match col {
						Some(c) => try!(batch.delete_cf(self.cfs[c as usize], &key)),
						None => try!(batch.delete(&key)),
					}
				},
			}
		}
		self.db.write_opt(batch, &self.write_opts)
	}
	/// Get value by key.
	pub fn get(&self, col: Option, key: &[u8]) -> Result