Merge pull request #678 from ethcore/reorgjdb
Rearrange journaldb infrastructure to make more extensible
This commit is contained in:
		
						commit
						c9f5a9bc9a
					
				@ -412,6 +412,7 @@ impl BlockQueue {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Optimise memory footprint of the heap fields.
 | 
			
		||||
	pub fn collect_garbage(&self) {
 | 
			
		||||
		{
 | 
			
		||||
			self.verification.unverified.lock().unwrap().shrink_to_fit();
 | 
			
		||||
 | 
			
		||||
@ -131,7 +131,8 @@ impl<V> Client<V> where V: Verifier {
 | 
			
		||||
		let mut dir = path.to_path_buf();
 | 
			
		||||
		dir.push(H64::from(spec.genesis_header().hash()).hex());
 | 
			
		||||
		//TODO: sec/fat: pruned/full versioning
 | 
			
		||||
		dir.push(format!("v{}-sec-{}", CLIENT_DB_VER_STR, if config.prefer_journal { "pruned" } else { "archive" }));
 | 
			
		||||
		// version here is a bit useless now, since it's controlled only be the pruning algo.
 | 
			
		||||
		dir.push(format!("v{}-sec-{}", CLIENT_DB_VER_STR, config.pruning));
 | 
			
		||||
		let path = dir.as_path();
 | 
			
		||||
		let gb = spec.genesis_block();
 | 
			
		||||
		let chain = Arc::new(BlockChain::new(config.blockchain, &gb, path));
 | 
			
		||||
@ -139,8 +140,10 @@ impl<V> Client<V> where V: Verifier {
 | 
			
		||||
		state_path.push("state");
 | 
			
		||||
 | 
			
		||||
		let engine = Arc::new(try!(spec.to_engine()));
 | 
			
		||||
		let mut state_db = Box::new(OptionOneDB::from_prefs(state_path.to_str().unwrap(), config.prefer_journal));
 | 
			
		||||
		if state_db.is_empty() && engine.spec().ensure_db_good(state_db.deref_mut()) {
 | 
			
		||||
		let state_path_str = state_path.to_str().unwrap();
 | 
			
		||||
		let mut state_db = journaldb::new(state_path_str, config.pruning);
 | 
			
		||||
 | 
			
		||||
		if state_db.is_empty() && engine.spec().ensure_db_good(state_db.as_hashdb_mut()) {
 | 
			
		||||
			state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
 | 
			
		||||
pub use block_queue::BlockQueueConfig;
 | 
			
		||||
pub use blockchain::BlockChainConfig;
 | 
			
		||||
use util::journaldb;
 | 
			
		||||
 | 
			
		||||
/// Client configuration. Includes configs for all sub-systems.
 | 
			
		||||
#[derive(Debug, Default)]
 | 
			
		||||
@ -24,8 +25,8 @@ pub struct ClientConfig {
 | 
			
		||||
	pub queue: BlockQueueConfig,
 | 
			
		||||
	/// Blockchain configuration.
 | 
			
		||||
	pub blockchain: BlockChainConfig,
 | 
			
		||||
	/// Prefer journal rather than archive.
 | 
			
		||||
	pub prefer_journal: bool,
 | 
			
		||||
	/// The JournalDB ("pruning") algorithm to use.
 | 
			
		||||
	pub pruning: journaldb::Algorithm,
 | 
			
		||||
	/// The name of the client instance.
 | 
			
		||||
	pub name: String,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -252,7 +252,7 @@ pub fn generate_dummy_empty_blockchain() -> GuardedTempResult<BlockChain> {
 | 
			
		||||
 | 
			
		||||
pub fn get_temp_journal_db() -> GuardedTempResult<Box<JournalDB>> {
 | 
			
		||||
	let temp = RandomTempPath::new();
 | 
			
		||||
	let journal_db: Box<JournalDB> = Box::new(OptionOneDB::new(temp.as_str()));
 | 
			
		||||
	let journal_db = journaldb::new(temp.as_str(), journaldb::Algorithm::EarlyMerge);
 | 
			
		||||
	GuardedTempResult {
 | 
			
		||||
		_temp: temp,
 | 
			
		||||
		result: Some(journal_db)
 | 
			
		||||
@ -269,7 +269,7 @@ pub fn get_temp_state() -> GuardedTempResult<State> {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn get_temp_journal_db_in(path: &Path) -> Box<JournalDB> {
 | 
			
		||||
	Box::new(OptionOneDB::new(path.to_str().unwrap()))
 | 
			
		||||
	journaldb::new(path.to_str().unwrap(), journaldb::Algorithm::EarlyMerge)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn get_temp_state_in(path: &Path) -> State {
 | 
			
		||||
 | 
			
		||||
@ -78,6 +78,11 @@ Usage:
 | 
			
		||||
Protocol Options:
 | 
			
		||||
  --chain CHAIN            Specify the blockchain type. CHAIN may be either a JSON chain specification file
 | 
			
		||||
                           or olympic, frontier, homestead, mainnet, morden, or testnet [default: homestead].
 | 
			
		||||
  --testnet                Equivalent to --chain testnet (geth-compatible).
 | 
			
		||||
  --networkid INDEX        Override the network identifier from the chain we are on.
 | 
			
		||||
  --pruning METHOD         Configure pruning of the state/storage trie. METHOD may be one of: archive,
 | 
			
		||||
                           light (experimental) [default: archive].
 | 
			
		||||
  -d --datadir PATH        Specify the database & configuration directory path [default: $HOME/.parity]
 | 
			
		||||
  --db-path PATH           Specify the database & configuration directory path [default: $HOME/.parity]
 | 
			
		||||
  --pruning                Client should prune the state/storage trie.
 | 
			
		||||
  --keys-path PATH         Specify the path for JSON key files to be found [default: $HOME/.web3/keys]
 | 
			
		||||
@ -99,7 +104,12 @@ API and Console Options:
 | 
			
		||||
  --jsonrpc-port PORT      Specify the port portion of the JSONRPC API server [default: 8545].
 | 
			
		||||
  --jsonrpc-cors URL       Specify CORS header for JSON-RPC API responses [default: null].
 | 
			
		||||
  --jsonrpc-apis APIS      Specify the APIs available through the JSONRPC interface. APIS is a comma-delimited
 | 
			
		||||
                           list of API name. Possible name are web3, eth and net. [default: web3,eth,net].
 | 
			
		||||
                           list of API name. Possible names are web3, eth and net. [default: web3,eth,net].
 | 
			
		||||
  --rpc                    Equivalent to --jsonrpc (geth-compatible).
 | 
			
		||||
  --rpcaddr HOST           Equivalent to --jsonrpc-addr HOST (geth-compatible).
 | 
			
		||||
  --rpcport PORT           Equivalent to --jsonrpc-port PORT (geth-compatible).
 | 
			
		||||
  --rpcapi APIS            Equivalent to --jsonrpc-apis APIS (geth-compatible).
 | 
			
		||||
  --rpccorsdomain URL      Equivalent to --jsonrpc-cors URL (geth-compatible).
 | 
			
		||||
 | 
			
		||||
Sealing/Mining Options:
 | 
			
		||||
  --author ADDRESS         Specify the block author (aka "coinbase") address for sending block rewards
 | 
			
		||||
@ -147,7 +157,7 @@ struct Args {
 | 
			
		||||
	flag_cache: Option<usize>,
 | 
			
		||||
	flag_keys_path: String,
 | 
			
		||||
	flag_bootnodes: Option<String>,
 | 
			
		||||
	flag_pruning: bool,
 | 
			
		||||
	flag_pruning: String,
 | 
			
		||||
	flag_no_bootstrap: bool,
 | 
			
		||||
	flag_port: u16,
 | 
			
		||||
	flag_peers: usize,
 | 
			
		||||
@ -409,7 +419,14 @@ impl Configuration {
 | 
			
		||||
				client_config.blockchain.max_cache_size = self.args.flag_cache_max_size;
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		client_config.prefer_journal = self.args.flag_pruning;
 | 
			
		||||
		client_config.pruning = match self.args.flag_pruning.as_str() {
 | 
			
		||||
			"" => journaldb::Algorithm::Archive,
 | 
			
		||||
			"archive" => journaldb::Algorithm::Archive,
 | 
			
		||||
			"pruned" => journaldb::Algorithm::EarlyMerge,
 | 
			
		||||
//			"fast" => journaldb::Algorithm::OverlayRecent,	// TODO: @arkpar uncomment this once option 2 is merged.
 | 
			
		||||
//			"slow" => journaldb::Algorithm::RefCounted,		// TODO: @gavofyork uncomment this once ref-count algo is merged.
 | 
			
		||||
			_ => { die!("Invalid pruning method given."); }
 | 
			
		||||
		};
 | 
			
		||||
		client_config.name = self.args.flag_identity.clone();
 | 
			
		||||
		client_config.queue.max_mem_use = self.args.flag_queue_max_size;
 | 
			
		||||
		let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap();
 | 
			
		||||
@ -430,7 +447,7 @@ impl Configuration {
 | 
			
		||||
				self.args.flag_rpcaddr.as_ref().unwrap_or(&self.args.flag_jsonrpc_addr),
 | 
			
		||||
				self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port)
 | 
			
		||||
			);
 | 
			
		||||
			SocketAddr::from_str(&url).unwrap_or_else(|_|die!("{}: Invalid JSONRPC listen host/port given.", url));
 | 
			
		||||
			SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid JSONRPC listen host/port given.", url));
 | 
			
		||||
			let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors);
 | 
			
		||||
			// TODO: use this as the API list.
 | 
			
		||||
			let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis);
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										388
									
								
								util/src/journaldb/archivedb.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										388
									
								
								util/src/journaldb/archivedb.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,388 @@
 | 
			
		||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
 | 
			
		||||
// This file is part of Parity.
 | 
			
		||||
 | 
			
		||||
// Parity is free software: you can redistribute it and/or modify
 | 
			
		||||
// it under the terms of the GNU General Public License as published by
 | 
			
		||||
// the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
// (at your option) any later version.
 | 
			
		||||
 | 
			
		||||
// Parity is distributed in the hope that it will be useful,
 | 
			
		||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
// GNU General Public License for more details.
 | 
			
		||||
 | 
			
		||||
// You should have received a copy of the GNU General Public License
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
//! Disk-backed HashDB implementation.
 | 
			
		||||
 | 
			
		||||
use common::*;
 | 
			
		||||
use rlp::*;
 | 
			
		||||
use hashdb::*;
 | 
			
		||||
use memorydb::*;
 | 
			
		||||
use super::traits::JournalDB;
 | 
			
		||||
use kvdb::{Database, DBTransaction, DatabaseConfig};
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
use std::env;
 | 
			
		||||
 | 
			
		||||
/// Implementation of the HashDB trait for a disk-backed database with a memory overlay
 | 
			
		||||
/// and latent-removal semantics.
 | 
			
		||||
///
 | 
			
		||||
/// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to
 | 
			
		||||
/// write operations out to disk. Unlike OverlayDB, `remove()` operations do not take effect
 | 
			
		||||
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
 | 
			
		||||
/// the removals actually take effect.
 | 
			
		||||
pub struct ArchiveDB {
 | 
			
		||||
	overlay: MemoryDB,
 | 
			
		||||
	backing: Arc<Database>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// all keys must be at least 12 bytes
 | 
			
		||||
const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
 | 
			
		||||
const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ];
 | 
			
		||||
const DB_VERSION : u32 = 259;
 | 
			
		||||
 | 
			
		||||
impl ArchiveDB {
 | 
			
		||||
	/// Create a new instance from file
 | 
			
		||||
	pub fn new(path: &str) -> ArchiveDB {
 | 
			
		||||
		let opts = DatabaseConfig {
 | 
			
		||||
			prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix
 | 
			
		||||
		};
 | 
			
		||||
		let backing = Database::open(&opts, path).unwrap_or_else(|e| {
 | 
			
		||||
			panic!("Error opening state db: {}", e);
 | 
			
		||||
		});
 | 
			
		||||
		if !backing.is_empty() {
 | 
			
		||||
			match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) {
 | 
			
		||||
				Ok(Some(DB_VERSION)) => {},
 | 
			
		||||
				v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		ArchiveDB {
 | 
			
		||||
			overlay: MemoryDB::new(),
 | 
			
		||||
			backing: Arc::new(backing),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Create a new instance with an anonymous temporary database.
 | 
			
		||||
	#[cfg(test)]
 | 
			
		||||
	fn new_temp() -> ArchiveDB {
 | 
			
		||||
		let mut dir = env::temp_dir();
 | 
			
		||||
		dir.push(H32::random().hex());
 | 
			
		||||
		Self::new(dir.to_str().unwrap())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn payload(&self, key: &H256) -> Option<Bytes> {
 | 
			
		||||
		self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl HashDB for ArchiveDB {
 | 
			
		||||
	fn keys(&self) -> HashMap<H256, i32> {
 | 
			
		||||
		let mut ret: HashMap<H256, i32> = HashMap::new();
 | 
			
		||||
		for (key, _) in self.backing.iter() {
 | 
			
		||||
			let h = H256::from_slice(key.deref());
 | 
			
		||||
			ret.insert(h, 1);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for (key, refs) in self.overlay.keys().into_iter() {
 | 
			
		||||
			let refs = *ret.get(&key).unwrap_or(&0) + refs;
 | 
			
		||||
			ret.insert(key, refs);
 | 
			
		||||
		}
 | 
			
		||||
		ret
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn lookup(&self, key: &H256) -> Option<&[u8]> {
 | 
			
		||||
		let k = self.overlay.raw(key);
 | 
			
		||||
		match k {
 | 
			
		||||
			Some(&(ref d, rc)) if rc > 0 => Some(d),
 | 
			
		||||
			_ => {
 | 
			
		||||
				if let Some(x) = self.payload(key) {
 | 
			
		||||
					Some(&self.overlay.denote(key, x).0)
 | 
			
		||||
				}
 | 
			
		||||
				else {
 | 
			
		||||
					None
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn exists(&self, key: &H256) -> bool {
 | 
			
		||||
		self.lookup(key).is_some()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn insert(&mut self, value: &[u8]) -> H256 {
 | 
			
		||||
		self.overlay.insert(value)
 | 
			
		||||
	}
 | 
			
		||||
	fn emplace(&mut self, key: H256, value: Bytes) {
 | 
			
		||||
		self.overlay.emplace(key, value);
 | 
			
		||||
	}
 | 
			
		||||
	fn kill(&mut self, key: &H256) {
 | 
			
		||||
		self.overlay.kill(key);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl JournalDB for ArchiveDB {
 | 
			
		||||
	fn spawn(&self) -> Box<JournalDB> {
 | 
			
		||||
		Box::new(ArchiveDB {
 | 
			
		||||
			overlay: MemoryDB::new(),
 | 
			
		||||
			backing: self.backing.clone(),
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn mem_used(&self) -> usize {
 | 
			
		||||
		self.overlay.mem_used()
 | 
			
		||||
 	}
 | 
			
		||||
 | 
			
		||||
	fn is_empty(&self) -> bool {
 | 
			
		||||
		self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn commit(&mut self, _: u64, _: &H256, _: Option<(u64, H256)>) -> Result<u32, UtilError> {
 | 
			
		||||
		let batch = DBTransaction::new();
 | 
			
		||||
		let mut inserts = 0usize;
 | 
			
		||||
		let mut deletes = 0usize;
 | 
			
		||||
		for i in self.overlay.drain().into_iter() {
 | 
			
		||||
			let (key, (value, rc)) = i;
 | 
			
		||||
			if rc > 0 {
 | 
			
		||||
				assert!(rc == 1);
 | 
			
		||||
				batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?");
 | 
			
		||||
				inserts += 1;
 | 
			
		||||
			}
 | 
			
		||||
			if rc < 0 {
 | 
			
		||||
				assert!(rc == -1);
 | 
			
		||||
				deletes += 1;
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		try!(self.backing.write(batch));
 | 
			
		||||
		Ok((inserts + deletes) as u32)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests {
 | 
			
		||||
	use common::*;
 | 
			
		||||
	use super::*;
 | 
			
		||||
	use hashdb::*;
 | 
			
		||||
	use journaldb::traits::JournalDB;
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn insert_same_in_fork() {
 | 
			
		||||
		// history is 1
 | 
			
		||||
		let mut jdb = ArchiveDB::new_temp();
 | 
			
		||||
 | 
			
		||||
		let x = jdb.insert(b"X");
 | 
			
		||||
		jdb.commit(1, &b"1".sha3(), None).unwrap();
 | 
			
		||||
		jdb.commit(2, &b"2".sha3(), None).unwrap();
 | 
			
		||||
		jdb.commit(3, &b"1002a".sha3(), Some((1, b"1".sha3()))).unwrap();
 | 
			
		||||
		jdb.commit(4, &b"1003a".sha3(), Some((2, b"2".sha3()))).unwrap();
 | 
			
		||||
 | 
			
		||||
		jdb.remove(&x);
 | 
			
		||||
		jdb.commit(3, &b"1002b".sha3(), Some((1, b"1".sha3()))).unwrap();
 | 
			
		||||
		let x = jdb.insert(b"X");
 | 
			
		||||
		jdb.commit(4, &b"1003b".sha3(), Some((2, b"2".sha3()))).unwrap();
 | 
			
		||||
 | 
			
		||||
		jdb.commit(5, &b"1004a".sha3(), Some((3, b"1002a".sha3()))).unwrap();
 | 
			
		||||
		jdb.commit(6, &b"1005a".sha3(), Some((4, b"1003a".sha3()))).unwrap();
 | 
			
		||||
 | 
			
		||||
		assert!(jdb.exists(&x));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn long_history() {
 | 
			
		||||
		// history is 3
 | 
			
		||||
		let mut jdb = ArchiveDB::new_temp();
 | 
			
		||||
		let h = jdb.insert(b"foo");
 | 
			
		||||
		jdb.commit(0, &b"0".sha3(), None).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&h));
 | 
			
		||||
		jdb.remove(&h);
 | 
			
		||||
		jdb.commit(1, &b"1".sha3(), None).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&h));
 | 
			
		||||
		jdb.commit(2, &b"2".sha3(), None).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&h));
 | 
			
		||||
		jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&h));
 | 
			
		||||
		jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn complex() {
 | 
			
		||||
		// history is 1
 | 
			
		||||
		let mut jdb = ArchiveDB::new_temp();
 | 
			
		||||
 | 
			
		||||
		let foo = jdb.insert(b"foo");
 | 
			
		||||
		let bar = jdb.insert(b"bar");
 | 
			
		||||
		jdb.commit(0, &b"0".sha3(), None).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
		assert!(jdb.exists(&bar));
 | 
			
		||||
 | 
			
		||||
		jdb.remove(&foo);
 | 
			
		||||
		jdb.remove(&bar);
 | 
			
		||||
		let baz = jdb.insert(b"baz");
 | 
			
		||||
		jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
		assert!(jdb.exists(&bar));
 | 
			
		||||
		assert!(jdb.exists(&baz));
 | 
			
		||||
 | 
			
		||||
		let foo = jdb.insert(b"foo");
 | 
			
		||||
		jdb.remove(&baz);
 | 
			
		||||
		jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
		assert!(jdb.exists(&baz));
 | 
			
		||||
 | 
			
		||||
		jdb.remove(&foo);
 | 
			
		||||
		jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
 | 
			
		||||
		jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn fork() {
 | 
			
		||||
		// history is 1
 | 
			
		||||
		let mut jdb = ArchiveDB::new_temp();
 | 
			
		||||
 | 
			
		||||
		let foo = jdb.insert(b"foo");
 | 
			
		||||
		let bar = jdb.insert(b"bar");
 | 
			
		||||
		jdb.commit(0, &b"0".sha3(), None).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
		assert!(jdb.exists(&bar));
 | 
			
		||||
 | 
			
		||||
		jdb.remove(&foo);
 | 
			
		||||
		let baz = jdb.insert(b"baz");
 | 
			
		||||
		jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
 | 
			
		||||
		jdb.remove(&bar);
 | 
			
		||||
		jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
		assert!(jdb.exists(&bar));
 | 
			
		||||
		assert!(jdb.exists(&baz));
 | 
			
		||||
 | 
			
		||||
		jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn overwrite() {
 | 
			
		||||
		// history is 1
 | 
			
		||||
		let mut jdb = ArchiveDB::new_temp();
 | 
			
		||||
 | 
			
		||||
		let foo = jdb.insert(b"foo");
 | 
			
		||||
		jdb.commit(0, &b"0".sha3(), None).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
 | 
			
		||||
		jdb.remove(&foo);
 | 
			
		||||
		jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
		jdb.insert(b"foo");
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
		jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
		jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn fork_same_key() {
 | 
			
		||||
		// history is 1
 | 
			
		||||
		let mut jdb = ArchiveDB::new_temp();
 | 
			
		||||
		jdb.commit(0, &b"0".sha3(), None).unwrap();
 | 
			
		||||
 | 
			
		||||
		let foo = jdb.insert(b"foo");
 | 
			
		||||
		jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
 | 
			
		||||
		jdb.insert(b"foo");
 | 
			
		||||
		jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
 | 
			
		||||
		jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap();
 | 
			
		||||
		assert!(jdb.exists(&foo));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn reopen() {
 | 
			
		||||
		let mut dir = ::std::env::temp_dir();
 | 
			
		||||
		dir.push(H32::random().hex());
 | 
			
		||||
		let bar = H256::random();
 | 
			
		||||
 | 
			
		||||
		let foo = {
 | 
			
		||||
			let mut jdb = ArchiveDB::new(dir.to_str().unwrap());
 | 
			
		||||
			// history is 1
 | 
			
		||||
			let foo = jdb.insert(b"foo");
 | 
			
		||||
			jdb.emplace(bar.clone(), b"bar".to_vec());
 | 
			
		||||
			jdb.commit(0, &b"0".sha3(), None).unwrap();
 | 
			
		||||
			foo
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			let mut jdb = ArchiveDB::new(dir.to_str().unwrap());
 | 
			
		||||
			jdb.remove(&foo);
 | 
			
		||||
			jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			let mut jdb = ArchiveDB::new(dir.to_str().unwrap());
 | 
			
		||||
			assert!(jdb.exists(&foo));
 | 
			
		||||
			assert!(jdb.exists(&bar));
 | 
			
		||||
			jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn reopen_remove() {
 | 
			
		||||
		let mut dir = ::std::env::temp_dir();
 | 
			
		||||
		dir.push(H32::random().hex());
 | 
			
		||||
 | 
			
		||||
		let foo = {
 | 
			
		||||
			let mut jdb = ArchiveDB::new(dir.to_str().unwrap());
 | 
			
		||||
			// history is 1
 | 
			
		||||
			let foo = jdb.insert(b"foo");
 | 
			
		||||
			jdb.commit(0, &b"0".sha3(), None).unwrap();
 | 
			
		||||
			jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
 | 
			
		||||
			// foo is ancient history.
 | 
			
		||||
 | 
			
		||||
			jdb.insert(b"foo");
 | 
			
		||||
			jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
 | 
			
		||||
			foo
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			let mut jdb = ArchiveDB::new(dir.to_str().unwrap());
 | 
			
		||||
			jdb.remove(&foo);
 | 
			
		||||
			jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
 | 
			
		||||
			assert!(jdb.exists(&foo));
 | 
			
		||||
			jdb.remove(&foo);
 | 
			
		||||
			jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
 | 
			
		||||
			jdb.commit(5, &b"5".sha3(), Some((4, b"4".sha3()))).unwrap();
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn reopen_fork() {
 | 
			
		||||
		let mut dir = ::std::env::temp_dir();
 | 
			
		||||
		dir.push(H32::random().hex());
 | 
			
		||||
		let (foo, bar, baz) = {
 | 
			
		||||
			let mut jdb = ArchiveDB::new(dir.to_str().unwrap());
 | 
			
		||||
			// history is 1
 | 
			
		||||
			let foo = jdb.insert(b"foo");
 | 
			
		||||
			let bar = jdb.insert(b"bar");
 | 
			
		||||
			jdb.commit(0, &b"0".sha3(), None).unwrap();
 | 
			
		||||
			jdb.remove(&foo);
 | 
			
		||||
			let baz = jdb.insert(b"baz");
 | 
			
		||||
			jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
 | 
			
		||||
			jdb.remove(&bar);
 | 
			
		||||
			jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
 | 
			
		||||
			(foo, bar, baz)
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			let mut jdb = ArchiveDB::new(dir.to_str().unwrap());
 | 
			
		||||
			jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
 | 
			
		||||
			assert!(jdb.exists(&foo));
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										78
									
								
								util/src/journaldb/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										78
									
								
								util/src/journaldb/mod.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,78 @@
 | 
			
		||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
 | 
			
		||||
// This file is part of Parity.
 | 
			
		||||
 | 
			
		||||
// Parity is free software: you can redistribute it and/or modify
 | 
			
		||||
// it under the terms of the GNU General Public License as published by
 | 
			
		||||
// the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
// (at your option) any later version.
 | 
			
		||||
 | 
			
		||||
// Parity is distributed in the hope that it will be useful,
 | 
			
		||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
// GNU General Public License for more details.
 | 
			
		||||
 | 
			
		||||
// You should have received a copy of the GNU General Public License
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
//! JournalDB interface and implementation.
 | 
			
		||||
 | 
			
		||||
use common::*;
 | 
			
		||||
 | 
			
		||||
/// Export the journaldb module.
 | 
			
		||||
pub mod traits;
 | 
			
		||||
mod archivedb;
 | 
			
		||||
mod optiononedb;
 | 
			
		||||
 | 
			
		||||
/// Export the JournalDB trait.
 | 
			
		||||
pub use self::traits::JournalDB;
 | 
			
		||||
 | 
			
		||||
/// A journal database algorithm.
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
pub enum Algorithm {
 | 
			
		||||
	/// Keep all keys forever.
 | 
			
		||||
	Archive,
 | 
			
		||||
 | 
			
		||||
	/// Ancient and recent history maintained separately; recent history lasts for particular
 | 
			
		||||
	/// number of blocks.
 | 
			
		||||
	///
 | 
			
		||||
	/// Inserts go into backing database, journal retains knowledge of whether backing DB key is
 | 
			
		||||
	/// ancient or recent. Non-canon inserts get explicitly reverted and removed from backing DB.
 | 
			
		||||
	EarlyMerge,
 | 
			
		||||
 | 
			
		||||
	/// Ancient and recent history maintained separately; recent history lasts for particular
 | 
			
		||||
	/// number of blocks.
 | 
			
		||||
	///
 | 
			
		||||
	/// Inserts go into memory overlay, which is tried for key fetches. Memory overlay gets
 | 
			
		||||
	/// flushed in backing only at end of recent history.
 | 
			
		||||
	OverlayRecent,
 | 
			
		||||
 | 
			
		||||
	/// Ancient and recent history maintained separately; recent history lasts for particular
 | 
			
		||||
	/// number of blocks.
 | 
			
		||||
	///
 | 
			
		||||
	/// References are counted in disk-backed DB.
 | 
			
		||||
	RefCounted,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for Algorithm {
 | 
			
		||||
	fn default() -> Algorithm { Algorithm::Archive }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl fmt::Display for Algorithm {
 | 
			
		||||
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
 | 
			
		||||
		write!(f, "{}", match self {
 | 
			
		||||
			&Algorithm::Archive => "archive",
 | 
			
		||||
			&Algorithm::EarlyMerge => "earlymerge",
 | 
			
		||||
			&Algorithm::OverlayRecent => "overlayrecent",
 | 
			
		||||
			&Algorithm::RefCounted => "refcounted",
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Create a new JournalDB trait object.
 | 
			
		||||
pub fn new(path: &str, algorithm: Algorithm) -> Box<JournalDB> {
 | 
			
		||||
	match algorithm {
 | 
			
		||||
		Algorithm::Archive => Box::new(archivedb::ArchiveDB::new(path)),
 | 
			
		||||
		Algorithm::EarlyMerge => Box::new(optiononedb::OptionOneDB::new(path)),
 | 
			
		||||
		_ => unimplemented!(),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -20,26 +20,11 @@ use common::*;
 | 
			
		||||
use rlp::*;
 | 
			
		||||
use hashdb::*;
 | 
			
		||||
use memorydb::*;
 | 
			
		||||
use super::traits::JournalDB;
 | 
			
		||||
use kvdb::{Database, DBTransaction, DatabaseConfig};
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
use std::env;
 | 
			
		||||
 | 
			
		||||
/// A HashDB which can manage a short-term journal potentially containing many forks of mutually
 | 
			
		||||
/// exclusive actions.
 | 
			
		||||
pub trait JournalDB : HashDB + Sync + Send {
 | 
			
		||||
	/// Return a copy of ourself, in a box.
 | 
			
		||||
	fn spawn(&self) -> Box<JournalDB>;
 | 
			
		||||
 | 
			
		||||
	/// Returns heap memory size used
 | 
			
		||||
	fn mem_used(&self) -> usize;
 | 
			
		||||
 | 
			
		||||
	/// Check if this database has any commits
 | 
			
		||||
	fn is_empty(&self) -> bool;
 | 
			
		||||
 | 
			
		||||
	/// Commit all recent insert operations.
 | 
			
		||||
	fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Implementation of the HashDB trait for a disk-backed database with a memory overlay
 | 
			
		||||
/// and latent-removal semantics.
 | 
			
		||||
///
 | 
			
		||||
@ -56,43 +41,28 @@ pub struct OptionOneDB {
 | 
			
		||||
// all keys must be at least 12 bytes
 | 
			
		||||
const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
 | 
			
		||||
const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ];
 | 
			
		||||
 | 
			
		||||
const DB_VERSION : u32 = 3;
 | 
			
		||||
const DB_VERSION_NO_JOURNAL : u32 = 3 + 256;
 | 
			
		||||
 | 
			
		||||
const PADDING : [u8; 10] = [ 0u8; 10 ];
 | 
			
		||||
 | 
			
		||||
impl OptionOneDB {
 | 
			
		||||
	/// Create a new instance from file
 | 
			
		||||
	pub fn new(path: &str) -> OptionOneDB {
 | 
			
		||||
		Self::from_prefs(path, true)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Create a new instance from file
 | 
			
		||||
	pub fn from_prefs(path: &str, prefer_journal: bool) -> OptionOneDB {
 | 
			
		||||
		let opts = DatabaseConfig {
 | 
			
		||||
			prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix
 | 
			
		||||
		};
 | 
			
		||||
		let backing = Database::open(&opts, path).unwrap_or_else(|e| {
 | 
			
		||||
			panic!("Error opening state db: {}", e);
 | 
			
		||||
		});
 | 
			
		||||
		let with_journal;
 | 
			
		||||
		if !backing.is_empty() {
 | 
			
		||||
			match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) {
 | 
			
		||||
				Ok(Some(DB_VERSION)) => { with_journal = true; },
 | 
			
		||||
				Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; },
 | 
			
		||||
				Ok(Some(DB_VERSION)) => {},
 | 
			
		||||
				v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database");
 | 
			
		||||
			with_journal = prefer_journal;
 | 
			
		||||
			backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		let counters = if with_journal {
 | 
			
		||||
			Some(Arc::new(RwLock::new(OptionOneDB::read_counters(&backing))))
 | 
			
		||||
		} else {
 | 
			
		||||
			None
 | 
			
		||||
		};
 | 
			
		||||
		let counters = Some(Arc::new(RwLock::new(OptionOneDB::read_counters(&backing))));
 | 
			
		||||
		OptionOneDB {
 | 
			
		||||
			overlay: MemoryDB::new(),
 | 
			
		||||
			backing: Arc::new(backing),
 | 
			
		||||
@ -108,34 +78,6 @@ impl OptionOneDB {
 | 
			
		||||
		Self::new(dir.to_str().unwrap())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Drain the overlay and place it into a batch for the DB.
 | 
			
		||||
	fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize {
 | 
			
		||||
		let mut inserts = 0usize;
 | 
			
		||||
		let mut deletes = 0usize;
 | 
			
		||||
		for i in overlay.drain().into_iter() {
 | 
			
		||||
			let (key, (value, rc)) = i;
 | 
			
		||||
			if rc > 0 {
 | 
			
		||||
				assert!(rc == 1);
 | 
			
		||||
				batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?");
 | 
			
		||||
				inserts += 1;
 | 
			
		||||
			}
 | 
			
		||||
			if rc < 0 {
 | 
			
		||||
				assert!(rc == -1);
 | 
			
		||||
				deletes += 1;
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes);
 | 
			
		||||
		inserts + deletes
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Just commit the overlay into the backing DB.
 | 
			
		||||
	fn commit_without_counters(&mut self) -> Result<u32, UtilError> {
 | 
			
		||||
		let batch = DBTransaction::new();
 | 
			
		||||
		let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch);
 | 
			
		||||
		try!(self.backing.write(batch));
 | 
			
		||||
		Ok(ret as u32)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn morph_key(key: &H256, index: u8) -> Bytes {
 | 
			
		||||
		let mut ret = key.bytes().to_owned();
 | 
			
		||||
		ret.push(index);
 | 
			
		||||
@ -217,9 +159,106 @@ impl OptionOneDB {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	/// Commit all recent insert operations and historical removals from the old era
 | 
			
		||||
	/// to the backing database.
 | 
			
		||||
	fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
 | 
			
		||||
	fn payload(&self, key: &H256) -> Option<Bytes> {
 | 
			
		||||
		self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn read_counters(db: &Database) -> HashMap<H256, i32> {
 | 
			
		||||
		let mut counters = HashMap::new();
 | 
			
		||||
		if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") {
 | 
			
		||||
			let mut era = decode::<u64>(&val);
 | 
			
		||||
			loop {
 | 
			
		||||
				let mut index = 0usize;
 | 
			
		||||
				while let Some(rlp_data) = db.get({
 | 
			
		||||
					let mut r = RlpStream::new_list(3);
 | 
			
		||||
					r.append(&era);
 | 
			
		||||
					r.append(&index);
 | 
			
		||||
					r.append(&&PADDING[..]);
 | 
			
		||||
					&r.drain()
 | 
			
		||||
				}).expect("Low-level database error.") {
 | 
			
		||||
					trace!("read_counters: era={}, index={}", era, index);
 | 
			
		||||
					let rlp = Rlp::new(&rlp_data);
 | 
			
		||||
					let inserts: Vec<H256> = rlp.val_at(1);
 | 
			
		||||
					Self::replay_keys(&inserts, db, &mut counters);
 | 
			
		||||
					index += 1;
 | 
			
		||||
				};
 | 
			
		||||
				if index == 0 || era == 0 {
 | 
			
		||||
					break;
 | 
			
		||||
				}
 | 
			
		||||
				era -= 1;
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		trace!("Recovered {} counters", counters.len());
 | 
			
		||||
		counters
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl HashDB for OptionOneDB {
 | 
			
		||||
	fn keys(&self) -> HashMap<H256, i32> {
 | 
			
		||||
		let mut ret: HashMap<H256, i32> = HashMap::new();
 | 
			
		||||
		for (key, _) in self.backing.iter() {
 | 
			
		||||
			let h = H256::from_slice(key.deref());
 | 
			
		||||
			ret.insert(h, 1);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for (key, refs) in self.overlay.keys().into_iter() {
 | 
			
		||||
			let refs = *ret.get(&key).unwrap_or(&0) + refs;
 | 
			
		||||
			ret.insert(key, refs);
 | 
			
		||||
		}
 | 
			
		||||
		ret
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn lookup(&self, key: &H256) -> Option<&[u8]> {
 | 
			
		||||
		let k = self.overlay.raw(key);
 | 
			
		||||
		match k {
 | 
			
		||||
			Some(&(ref d, rc)) if rc > 0 => Some(d),
 | 
			
		||||
			_ => {
 | 
			
		||||
				if let Some(x) = self.payload(key) {
 | 
			
		||||
					Some(&self.overlay.denote(key, x).0)
 | 
			
		||||
				}
 | 
			
		||||
				else {
 | 
			
		||||
					None
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn exists(&self, key: &H256) -> bool {
 | 
			
		||||
		self.lookup(key).is_some()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn insert(&mut self, value: &[u8]) -> H256 {
 | 
			
		||||
		self.overlay.insert(value)
 | 
			
		||||
	}
 | 
			
		||||
	fn emplace(&mut self, key: H256, value: Bytes) {
 | 
			
		||||
		self.overlay.emplace(key, value);
 | 
			
		||||
	}
 | 
			
		||||
	fn kill(&mut self, key: &H256) {
 | 
			
		||||
		self.overlay.kill(key);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl JournalDB for OptionOneDB {
 | 
			
		||||
	fn spawn(&self) -> Box<JournalDB> {
 | 
			
		||||
		Box::new(OptionOneDB {
 | 
			
		||||
			overlay: MemoryDB::new(),
 | 
			
		||||
			backing: self.backing.clone(),
 | 
			
		||||
			counters: self.counters.clone(),
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn mem_used(&self) -> usize {
 | 
			
		||||
		self.overlay.mem_used() + match self.counters {
 | 
			
		||||
			Some(ref c) => c.read().unwrap().heap_size_of_children(),
 | 
			
		||||
			None => 0
 | 
			
		||||
		}
 | 
			
		||||
 	}
 | 
			
		||||
 | 
			
		||||
	fn is_empty(&self) -> bool {
 | 
			
		||||
		self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
 | 
			
		||||
		// journal format:
 | 
			
		||||
		// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
 | 
			
		||||
		// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
 | 
			
		||||
@ -337,114 +376,6 @@ impl OptionOneDB {
 | 
			
		||||
//		trace!("OptionOneDB::commit() deleted {} nodes", deletes);
 | 
			
		||||
		Ok(0)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn payload(&self, key: &H256) -> Option<Bytes> {
 | 
			
		||||
		self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn read_counters(db: &Database) -> HashMap<H256, i32> {
 | 
			
		||||
		let mut counters = HashMap::new();
 | 
			
		||||
		if let Some(val) = db.get(&LATEST_ERA_KEY).expect("Low-level database error.") {
 | 
			
		||||
			let mut era = decode::<u64>(&val);
 | 
			
		||||
			loop {
 | 
			
		||||
				let mut index = 0usize;
 | 
			
		||||
				while let Some(rlp_data) = db.get({
 | 
			
		||||
					let mut r = RlpStream::new_list(3);
 | 
			
		||||
					r.append(&era);
 | 
			
		||||
					r.append(&index);
 | 
			
		||||
					r.append(&&PADDING[..]);
 | 
			
		||||
					&r.drain()
 | 
			
		||||
				}).expect("Low-level database error.") {
 | 
			
		||||
					trace!("read_counters: era={}, index={}", era, index);
 | 
			
		||||
					let rlp = Rlp::new(&rlp_data);
 | 
			
		||||
					let inserts: Vec<H256> = rlp.val_at(1);
 | 
			
		||||
					Self::replay_keys(&inserts, db, &mut counters);
 | 
			
		||||
					index += 1;
 | 
			
		||||
				};
 | 
			
		||||
				if index == 0 || era == 0 {
 | 
			
		||||
					break;
 | 
			
		||||
				}
 | 
			
		||||
				era -= 1;
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		trace!("Recovered {} counters", counters.len());
 | 
			
		||||
		counters
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl HashDB for OptionOneDB {
 | 
			
		||||
	fn keys(&self) -> HashMap<H256, i32> {
 | 
			
		||||
		let mut ret: HashMap<H256, i32> = HashMap::new();
 | 
			
		||||
		for (key, _) in self.backing.iter() {
 | 
			
		||||
			let h = H256::from_slice(key.deref());
 | 
			
		||||
			ret.insert(h, 1);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for (key, refs) in self.overlay.keys().into_iter() {
 | 
			
		||||
			let refs = *ret.get(&key).unwrap_or(&0) + refs;
 | 
			
		||||
			ret.insert(key, refs);
 | 
			
		||||
		}
 | 
			
		||||
		ret
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn lookup(&self, key: &H256) -> Option<&[u8]> {
 | 
			
		||||
		let k = self.overlay.raw(key);
 | 
			
		||||
		match k {
 | 
			
		||||
			Some(&(ref d, rc)) if rc > 0 => Some(d),
 | 
			
		||||
			_ => {
 | 
			
		||||
				if let Some(x) = self.payload(key) {
 | 
			
		||||
					Some(&self.overlay.denote(key, x).0)
 | 
			
		||||
				}
 | 
			
		||||
				else {
 | 
			
		||||
					None
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn exists(&self, key: &H256) -> bool {
 | 
			
		||||
		self.lookup(key).is_some()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn insert(&mut self, value: &[u8]) -> H256 {
 | 
			
		||||
		self.overlay.insert(value)
 | 
			
		||||
	}
 | 
			
		||||
	fn emplace(&mut self, key: H256, value: Bytes) {
 | 
			
		||||
		self.overlay.emplace(key, value);
 | 
			
		||||
	}
 | 
			
		||||
	fn kill(&mut self, key: &H256) {
 | 
			
		||||
		self.overlay.kill(key);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl JournalDB for OptionOneDB {
 | 
			
		||||
	fn spawn(&self) -> Box<JournalDB> {
 | 
			
		||||
		Box::new(OptionOneDB {
 | 
			
		||||
			overlay: MemoryDB::new(),
 | 
			
		||||
			backing: self.backing.clone(),
 | 
			
		||||
			counters: self.counters.clone(),
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn mem_used(&self) -> usize {
 | 
			
		||||
		self.overlay.mem_used() + match self.counters {
 | 
			
		||||
			Some(ref c) => c.read().unwrap().heap_size_of_children(),
 | 
			
		||||
			None => 0
 | 
			
		||||
		}
 | 
			
		||||
 	}
 | 
			
		||||
 | 
			
		||||
	fn is_empty(&self) -> bool {
 | 
			
		||||
		self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
 | 
			
		||||
		let have_counters = self.counters.is_some();
 | 
			
		||||
		if have_counters {
 | 
			
		||||
			self.commit_with_counters(now, id, end)
 | 
			
		||||
		} else {
 | 
			
		||||
			self.commit_without_counters()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
@ -452,6 +383,7 @@ mod tests {
 | 
			
		||||
	use common::*;
 | 
			
		||||
	use super::*;
 | 
			
		||||
	use hashdb::*;
 | 
			
		||||
	use journaldb::traits::JournalDB;
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn insert_same_in_fork() {
 | 
			
		||||
							
								
								
									
										37
									
								
								util/src/journaldb/traits.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								util/src/journaldb/traits.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,37 @@
 | 
			
		||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
 | 
			
		||||
// This file is part of Parity.
 | 
			
		||||
 | 
			
		||||
// Parity is free software: you can redistribute it and/or modify
 | 
			
		||||
// it under the terms of the GNU General Public License as published by
 | 
			
		||||
// the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
// (at your option) any later version.
 | 
			
		||||
 | 
			
		||||
// Parity is distributed in the hope that it will be useful,
 | 
			
		||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
// GNU General Public License for more details.
 | 
			
		||||
 | 
			
		||||
// You should have received a copy of the GNU General Public License
 | 
			
		||||
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
//! Disk-backed HashDB implementation.
 | 
			
		||||
 | 
			
		||||
use common::*;
 | 
			
		||||
use hashdb::*;
 | 
			
		||||
 | 
			
		||||
/// A HashDB which can manage a short-term journal potentially containing many forks of mutually
 | 
			
		||||
/// exclusive actions.
 | 
			
		||||
pub trait JournalDB : HashDB + Send + Sync {
 | 
			
		||||
	/// Return a copy of ourself, in a box.
 | 
			
		||||
	fn spawn(&self) -> Box<JournalDB>;
 | 
			
		||||
 | 
			
		||||
	/// Returns heap memory size used
 | 
			
		||||
	fn mem_used(&self) -> usize;
 | 
			
		||||
 | 
			
		||||
	/// Check if this database has any commits
 | 
			
		||||
	fn is_empty(&self) -> bool;
 | 
			
		||||
 | 
			
		||||
	/// Commit all recent insert operations and canonical historical commits' removals from the
 | 
			
		||||
	/// old era to the backing database, reverting any non-canonical historical commit's inserts.
 | 
			
		||||
	fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
 | 
			
		||||
}
 | 
			
		||||
@ -154,7 +154,7 @@ pub use rlp::*;
 | 
			
		||||
pub use hashdb::*;
 | 
			
		||||
pub use memorydb::*;
 | 
			
		||||
pub use overlaydb::*;
 | 
			
		||||
pub use journaldb::*;
 | 
			
		||||
pub use journaldb::JournalDB;
 | 
			
		||||
pub use math::*;
 | 
			
		||||
pub use crypto::*;
 | 
			
		||||
pub use triehash::*;
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user