diff --git a/ethcore/src/block.rs b/ethcore/src/block.rs
index ebaeec0a3..41d299892 100644
--- a/ethcore/src/block.rs
+++ b/ethcore/src/block.rs
@@ -395,6 +395,10 @@ impl<'x> OpenBlock<'x> {
 			uncle_bytes: uncle_bytes,
 		}
 	}
+
+	#[cfg(test)]
+	/// Return mutable block reference. To be used in tests only.
+	pub fn block_mut(&mut self) -> &mut ExecutedBlock { &mut self.block }
 }
 
 impl<'x> IsBlock for OpenBlock<'x> {
diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs
index 6a479b09f..c8519c365 100644
--- a/ethcore/src/client/client.rs
+++ b/ethcore/src/client/client.rs
@@ -17,6 +17,7 @@ use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
 use std::sync::{Arc, Weak};
 use std::path::{Path};
 use std::fmt;
+use std::cmp;
 use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
 use std::time::{Instant};
 use time::precise_time_ns;
@@ -73,6 +74,7 @@ pub use blockchain::CacheSize as BlockChainCacheSize;
 
 const MAX_TX_QUEUE_SIZE: usize = 4096;
 const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
+const MIN_HISTORY_SIZE: u64 = 8;
 
 impl fmt::Display for BlockChainInfo {
 	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -139,10 +141,9 @@ pub struct Client {
 	notify: RwLock<Vec<Weak<ChainNotify>>>,
 	queue_transactions: AtomicUsize,
 	last_hashes: RwLock<VecDeque<H256>>,
+	history: u64,
 }
 
-const HISTORY: u64 = 1200;
-
 // database columns
 /// Column for State
 pub const DB_COL_STATE: Option<u32> = Some(0);
@@ -187,6 +188,19 @@ impl Client {
 			try!(db.write(batch).map_err(ClientError::Database));
 		}
 
+		trace!("Cleanup journal: DB Earliest = {:?}, Latest = {:?}", state_db.journal_db().earliest_era(), state_db.journal_db().latest_era());
+		let history = cmp::max(MIN_HISTORY_SIZE, config.history);
+		if let (Some(earliest), Some(latest)) = (state_db.journal_db().earliest_era(), state_db.journal_db().latest_era()) {
+			if latest > earliest && latest - earliest > history {
+				for era in earliest..(latest - history + 1) {
+					trace!("Removing era {}", era);
+					let batch = DBTransaction::new(&db);
+					try!(state_db.journal_db_mut().commit_old(&batch, era, &chain.block_hash(era).expect("Old block not found in the database")));
+					try!(db.write(batch).map_err(ClientError::Database));
+				}
+			}
+		}
+
 		if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) {
 			warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex());
 		}
@@ -219,6 +233,7 @@ impl Client {
 			notify: RwLock::new(Vec::new()),
 			queue_transactions: AtomicUsize::new(0),
 			last_hashes: RwLock::new(VecDeque::new()),
+			history: history,
 		};
 		Ok(Arc::new(client))
 	}
@@ -275,7 +290,7 @@ impl Client {
 
 		// Check the block isn't so old we won't be able to enact it.
 		let best_block_number = self.chain.best_block_number();
-		if best_block_number >= HISTORY && header.number() <= best_block_number - HISTORY {
+		if best_block_number >= self.history && header.number() <= best_block_number - self.history {
 			warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number);
 			return Err(());
 		}
@@ -419,8 +434,8 @@ impl Client {
 		let number = block.header().number();
 		let parent = block.header().parent_hash().clone();
 		// Are we committing an era?
-		let ancient = if number >= HISTORY {
-			let n = number - HISTORY;
+		let ancient = if number >= self.history {
+			let n = number - self.history;
 			Some((n, self.chain.block_hash(n).expect("only verified blocks can be committed; verified block has hash; qed")))
 		} else {
 			None
@@ -498,7 +513,7 @@ impl Client {
 		let db = self.state_db.lock().boxed_clone();
 
 		// early exit for pruned blocks
-		if db.is_pruned() && self.chain.best_block_number() >= block_number + HISTORY {
+		if db.is_pruned() && self.chain.best_block_number() >= block_number + self.history {
 			return None;
 		}
@@ -603,7 +618,7 @@ impl Client {
 		let best_block_number = self.chain_info().best_block_number;
 		let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at)));
 
-		if best_block_number > HISTORY + block_number && db.is_pruned() {
+		if best_block_number > self.history + block_number && db.is_pruned() {
 			return Err(snapshot::Error::OldBlockPrunedDB.into());
 		}
@@ -615,8 +630,10 @@ impl Client {
 					0
 				};
 
-				self.block_hash(BlockID::Number(start_num))
-					.expect("blocks within HISTORY are always stored.")
+				match self.block_hash(BlockID::Number(start_num)) {
+					Some(h) => h,
+					None => return Err(snapshot::Error::InvalidStartingBlock(at).into()),
+				}
 			}
 			_ => match self.block_hash(at) {
 				Some(hash) => hash,
diff --git a/ethcore/src/client/config.rs b/ethcore/src/client/config.rs
index 504ca4de7..4696e9240 100644
--- a/ethcore/src/client/config.rs
+++ b/ethcore/src/client/config.rs
@@ -107,6 +107,8 @@ pub struct ClientConfig {
 	pub mode: Mode,
 	/// Type of block verifier used by client.
 	pub verifier_type: VerifierType,
+	/// State pruning history size.
+	pub history: u64,
 }
 
 #[cfg(test)]
diff --git a/ethcore/src/state_db.rs b/ethcore/src/state_db.rs
index a1c756b32..6f779e968 100644
--- a/ethcore/src/state_db.rs
+++ b/ethcore/src/state_db.rs
@@ -323,6 +323,11 @@ impl StateDB {
 		&*self.db
 	}
 
+	/// Returns a mutable reference to the underlying `JournalDB`.
+	pub fn journal_db_mut(&mut self) -> &mut JournalDB {
+		&mut *self.db
+	}
+
 	/// Add pending cache change.
 	/// The change is queued to be applied in `commit`.
 	pub fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) {
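The startup cleanup added to `Client::new` is the core of the change: when the node restarts with a smaller `--pruning-history`, every journal era older than the retained window is flushed to the backing database via `commit_old`. Below is a minimal, self-contained sketch of the window arithmetic; the `MIN_HISTORY_SIZE` clamp and the era range mirror the hunk above, while `eras_to_trim` itself is an illustrative name, not ethcore API:

```rust
use std::cmp;
use std::ops::Range;

const MIN_HISTORY_SIZE: u64 = 8;

/// Which eras the startup cleanup would pass to `commit_old`:
/// everything older than the `history` most recent states.
fn eras_to_trim(earliest: u64, latest: u64, configured: u64) -> Option<Range<u64>> {
	// the client never keeps fewer than MIN_HISTORY_SIZE states
	let history = cmp::max(MIN_HISTORY_SIZE, configured);
	if latest > earliest && latest - earliest > history {
		Some(earliest..(latest - history + 1))
	} else {
		None
	}
}

fn main() {
	// A journal spanning eras 100..=1400, reopened with `--pruning-history 64`:
	// eras 100..=1336 are committed old, leaving the 64 states 1337..=1400.
	assert_eq!(eras_to_trim(100, 1400, 64), Some(100..1337));
	// A journal already inside the window is left untouched.
	assert_eq!(eras_to_trim(100, 105, 64), None);
}
```

The clamp presumably exists so that a misconfigured or very small history still leaves a handful of recent states available to survive short reorgs.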
diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs
index 1ac26c83b..f1960557d 100644
--- a/ethcore/src/tests/client.rs
+++ b/ethcore/src/tests/client.rs
@@ -22,6 +22,7 @@ use tests::helpers::*;
 use common::*;
 use devtools::*;
 use miner::Miner;
+use spec::Spec;
 
 #[test]
 fn imports_from_empty() {
@@ -167,3 +168,26 @@ fn can_mine() {
 
 	assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3());
 }
+
+#[test]
+fn change_history_size() {
+	let dir = RandomTempPath::new();
+	let test_spec = Spec::new_null();
+	let mut config = ClientConfig::default();
+	config.history = 2;
+	let address = Address::random();
+	{
+		let client = Client::new(config, &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected()).unwrap();
+		for _ in 0..20 {
+			let mut b = client.prepare_open_block(Address::default(), (3141562.into(), 31415620.into()), vec![]);
+			b.block_mut().fields_mut().state.add_balance(&address, &5.into());
+			b.block_mut().fields_mut().state.commit().unwrap();
+			let b = b.close_and_lock().seal(&*test_spec.engine, vec![]).unwrap();
+			client.import_sealed_block(b).unwrap(); // account change is in the journal overlay
+		}
+	}
+	let mut config = ClientConfig::default();
+	config.history = 10;
+	let client = Client::new(config, &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected()).unwrap();
+	assert_eq!(client.state().balance(&address), 100.into());
+}
diff --git a/parity/blockchain.rs b/parity/blockchain.rs
index b0c5d95a7..902642640 100644
--- a/parity/blockchain.rs
+++ b/parity/blockchain.rs
@@ -75,6 +75,7 @@ pub struct ImportBlockchain {
 	pub file_path: Option<String>,
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
+	pub pruning_history: u64,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub mode: Mode,
@@ -91,6 +92,7 @@ pub struct ExportBlockchain {
 	pub file_path: Option<String>,
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
+	pub pruning_history: u64,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub mode: Mode,
@@ -131,7 +133,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
 	try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
 
 	// prepare client config
-	let client_config = to_client_config(&cmd.cache_config, &cmd.dirs, genesis_hash, cmd.mode, cmd.tracing, cmd.pruning, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), spec.fork_name.as_ref());
+	let client_config = to_client_config(&cmd.cache_config, &cmd.dirs, genesis_hash, cmd.mode, cmd.tracing, cmd.pruning, cmd.pruning_history, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), spec.fork_name.as_ref());
 
 	// build client
 	let service = try!(ClientService::start(
@@ -242,7 +244,7 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
 	try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
 
 	// prepare client config
-	let client_config = to_client_config(&cmd.cache_config, &cmd.dirs, genesis_hash, cmd.mode, cmd.tracing, cmd.pruning, cmd.compaction, cmd.wal, VMType::default(), "".into(), spec.fork_name.as_ref());
+	let client_config = to_client_config(&cmd.cache_config, &cmd.dirs, genesis_hash, cmd.mode, cmd.tracing, cmd.pruning, cmd.pruning_history, cmd.compaction, cmd.wal, VMType::default(), "".into(), spec.fork_name.as_ref());
 
 	let service = try!(ClientService::start(
 		client_config,
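The new test imports 20 blocks that each credit the same account 5 wei, drops the client, and reopens the same database with a different history size; the closing assertion only needs the latest state, which pruning never discards. A toy model of that behaviour, using illustrative types rather than the ethcore ones:

```rust
use std::collections::BTreeMap;

/// A journal that keeps only the `history` most recent states.
struct ToyJournal {
	history: u64,
	states: BTreeMap<u64, u64>, // block number -> account balance at that block
}

impl ToyJournal {
	fn import(&mut self, number: u64, balance: u64) {
		self.states.insert(number, balance);
		// prune states that fall out of the retained window
		while self.states.len() as u64 > self.history {
			let oldest = *self.states.keys().next().expect("non-empty; just inserted");
			self.states.remove(&oldest);
		}
	}
}

fn main() {
	// history = 2 is clamped up to MIN_HISTORY_SIZE = 8 by Client::new
	let mut journal = ToyJournal { history: 8, states: BTreeMap::new() };
	for n in 1..=20 {
		journal.import(n, n * 5); // each block adds 5 wei, so block n holds n * 5
	}
	assert_eq!(journal.states.get(&20), Some(&100)); // the latest state survives
	assert_eq!(journal.states.get(&5), None);        // old states are pruned
}
```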
diff --git a/parity/cli.rs b/parity/cli.rs
index 0385828ea..fe648d29e 100644
--- a/parity/cli.rs
+++ b/parity/cli.rs
@@ -223,6 +223,8 @@ Footprint Options:
                            fast - maintain journal overlay. Fast but 50MB used.
                            auto - use the method most recently synced or
                            default to fast if none synced [default: auto].
+  --pruning-history NUM    Set the number of recent states to keep when pruning
+                           is active. [default: 64].
   --cache-size-db MB       Override database cache size [default: 64].
   --cache-size-blocks MB   Specify the preferred size of the blockchain cache in
                            megabytes [default: 8].
@@ -329,6 +331,7 @@ pub struct Args {
 	pub flag_bootnodes: Option<String>,
 	pub flag_network_id: Option<String>,
 	pub flag_pruning: String,
+	pub flag_pruning_history: u64,
 	pub flag_tracing: String,
 	pub flag_port: u16,
 	pub flag_min_peers: u16,
diff --git a/parity/configuration.rs b/parity/configuration.rs
index eac1baff1..e829b9721 100644
--- a/parity/configuration.rs
+++ b/parity/configuration.rs
@@ -73,6 +73,7 @@ impl Configuration {
 	pub fn into_command(self) -> Result<Cmd, String> {
 		let dirs = self.directories();
 		let pruning = try!(self.args.flag_pruning.parse());
+		let pruning_history = self.args.flag_pruning_history;
 		let vm_type = try!(self.vm_type());
 		let mode = try!(to_mode(&self.args.flag_mode, self.args.flag_mode_timeout, self.args.flag_mode_alarm));
 		let miner_options = try!(self.miner_options());
@@ -134,6 +135,7 @@ impl Configuration {
 				file_path: self.args.arg_file.clone(),
 				format: format,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				compaction: compaction,
 				wal: wal,
 				mode: mode,
@@ -150,6 +152,7 @@ impl Configuration {
 				file_path: self.args.arg_file.clone(),
 				format: format,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				compaction: compaction,
 				wal: wal,
 				mode: mode,
@@ -164,6 +167,7 @@ impl Configuration {
 				dirs: dirs,
 				spec: spec,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				logger_config: logger_config,
 				mode: mode,
 				tracing: tracing,
@@ -180,6 +184,7 @@ impl Configuration {
 				dirs: dirs,
 				spec: spec,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				logger_config: logger_config,
 				mode: mode,
 				tracing: tracing,
@@ -202,6 +207,7 @@ impl Configuration {
 			dirs: dirs,
 			spec: spec,
 			pruning: pruning,
+			pruning_history: pruning_history,
 			daemon: daemon,
 			logger_config: logger_config,
 			miner_options: miner_options,
@@ -695,6 +701,7 @@ mod tests {
 			file_path: Some("blockchain.json".into()),
 			format: Default::default(),
 			pruning: Default::default(),
+			pruning_history: 64,
 			compaction: Default::default(),
 			wal: true,
 			mode: Default::default(),
@@ -714,6 +721,7 @@ mod tests {
 			dirs: Default::default(),
 			file_path: Some("blockchain.json".into()),
 			pruning: Default::default(),
+			pruning_history: 64,
 			format: Default::default(),
 			compaction: Default::default(),
 			wal: true,
@@ -735,6 +743,7 @@ mod tests {
 			dirs: Default::default(),
 			file_path: Some("blockchain.json".into()),
 			pruning: Default::default(),
+			pruning_history: 64,
 			format: Some(DataFormat::Hex),
 			compaction: Default::default(),
 			wal: true,
@@ -762,6 +771,7 @@ mod tests {
 			dirs: Default::default(),
 			spec: Default::default(),
 			pruning: Default::default(),
+			pruning_history: 64,
 			daemon: None,
 			logger_config: Default::default(),
 			miner_options: Default::default(),
diff --git a/parity/helpers.rs b/parity/helpers.rs
index 15539881f..ee2211c3b 100644
--- a/parity/helpers.rs
+++ b/parity/helpers.rs
@@ -202,6 +202,7 @@ pub fn to_client_config(
 	mode: Mode,
 	tracing: Switch,
 	pruning: Pruning,
+	pruning_history: u64,
 	compaction: DatabaseCompactionProfile,
 	wal: bool,
 	vm_type: VMType,
@@ -229,6 +230,7 @@ pub fn to_client_config(
 	client_config.mode = mode;
 	client_config.tracing.enabled = tracing;
 	client_config.pruning = pruning.to_algorithm(dirs, genesis_hash, fork_name);
+	client_config.history = pruning_history;
 	client_config.db_compaction = compaction;
 	client_config.db_wal = wal;
 	client_config.vm_type = vm_type;
diff --git a/parity/run.rs b/parity/run.rs
index c659d4d25..fa59d2a73 100644
--- a/parity/run.rs
+++ b/parity/run.rs
@@ -52,6 +52,7 @@ pub struct RunCmd {
 	pub dirs: Directories,
 	pub spec: SpecType,
 	pub pruning: Pruning,
+	pub pruning_history: u64,
 	/// Some if execution should be daemonized. Contains pid_file path.
 	pub daemon: Option<String>,
 	pub logger_config: LogConfig,
@@ -149,6 +150,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
 		cmd.mode,
 		cmd.tracing,
 		cmd.pruning,
+		cmd.pruning_history,
 		cmd.compaction,
 		cmd.wal,
 		cmd.vm_type,
diff --git a/parity/snapshot.rs b/parity/snapshot.rs
index 2a3c12567..68b6e6bb6 100644
--- a/parity/snapshot.rs
+++ b/parity/snapshot.rs
@@ -52,6 +52,7 @@ pub struct SnapshotCommand {
 	pub dirs: Directories,
 	pub spec: SpecType,
 	pub pruning: Pruning,
+	pub pruning_history: u64,
 	pub logger_config: LogConfig,
 	pub mode: Mode,
 	pub tracing: Switch,
@@ -89,7 +90,7 @@ impl SnapshotCommand {
 		try!(execute_upgrades(&self.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, self.compaction.compaction_profile()));
 
 		// prepare client config
-		let client_config = to_client_config(&self.cache_config, &self.dirs, genesis_hash, self.mode, self.tracing, self.pruning, self.compaction, self.wal, VMType::default(), "".into(), spec.fork_name.as_ref());
+		let client_config = to_client_config(&self.cache_config, &self.dirs, genesis_hash, self.mode, self.tracing, self.pruning, self.pruning_history, self.compaction, self.wal, VMType::default(), "".into(), spec.fork_name.as_ref());
 
 		let service = try!(ClientService::start(
 			client_config,
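Everything in `parity/*` above is plumbing: the docopt flag `--pruning-history` (default 64) is parsed into `Args::flag_pruning_history`, copied into each command struct, and finally written into `ClientConfig::history` by `to_client_config`. A compressed sketch of that flow with stand-in types (the real structs carry many more fields):

```rust
// Stand-ins for parity's Args and ethcore's ClientConfig.
struct Args {
	flag_pruning_history: u64, // parsed by docopt, [default: 64]
}

struct ClientConfig {
	history: u64,
}

fn to_client_config(pruning_history: u64) -> ClientConfig {
	// mirrors `client_config.history = pruning_history;` in helpers.rs
	ClientConfig { history: pruning_history }
}

fn main() {
	let args = Args { flag_pruning_history: 128 }; // e.g. `parity --pruning-history 128`
	let config = to_client_config(args.flag_pruning_history);
	assert_eq!(config.history, 128);
}
```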
diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs
index 5143b9999..d4580aed9 100644
--- a/util/src/journaldb/overlayrecentdb.rs
+++ b/util/src/journaldb/overlayrecentdb.rs
@@ -70,6 +70,7 @@ struct JournalOverlay {
 	pending_overlay: H256FastMap<Bytes>, // Nodes being transferred from backing_overlay to backing db
 	journal: HashMap<u64, Vec<JournalEntry>>,
 	latest_era: Option<u64>,
+	earliest_era: Option<u64>,
 }
 
 #[derive(PartialEq)]
@@ -123,7 +124,10 @@ impl OverlayRecentDB {
 	fn can_reconstruct_refs(&self) -> bool {
 		let reconstructed = Self::read_overlay(&self.backing, self.column);
 		let journal_overlay = self.journal_overlay.read();
-		*journal_overlay == reconstructed
+		journal_overlay.backing_overlay == reconstructed.backing_overlay &&
+		journal_overlay.pending_overlay == reconstructed.pending_overlay &&
+		journal_overlay.journal == reconstructed.journal &&
+		journal_overlay.latest_era == reconstructed.latest_era
 	}
 
 	fn payload(&self, key: &H256) -> Option<Bytes> {
@@ -135,6 +139,7 @@ impl OverlayRecentDB {
 		let mut overlay = MemoryDB::new();
 		let mut count = 0;
 		let mut latest_era = None;
+		let mut earliest_era = None;
 		if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
 			let mut era = decode::<u64>(&val);
 			latest_era = Some(era);
@@ -166,6 +171,7 @@ impl OverlayRecentDB {
 						deletions: deletions,
 					});
 					index += 1;
+					earliest_era = Some(era);
 				};
 				if index == 0 || era == 0 {
 					break;
@@ -178,9 +184,62 @@ impl OverlayRecentDB {
 			backing_overlay: overlay,
 			pending_overlay: HashMap::default(),
 			journal: journal,
-			latest_era: latest_era }
+			latest_era: latest_era,
+			earliest_era: earliest_era,
+		}
 	}
 
+	fn apply_old_commit(batch: &DBTransaction, journal_overlay: &mut JournalOverlay, column: Option<u32>, end_era: u64, canon_id: &H256) -> Result<(), UtilError> {
+		// apply old commits' details
+		if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
+			let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
+			let mut canon_deletions: Vec<H256> = Vec::new();
+			let mut overlay_deletions: Vec<H256> = Vec::new();
+			let mut index = 0usize;
+			for mut journal in records.drain(..) {
+				//delete the record from the db
+				let mut r = RlpStream::new_list(3);
+				r.append(&end_era);
+				r.append(&index);
+				r.append(&&PADDING[..]);
+				try!(batch.delete(column, &r.drain()));
+				trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
+				{
+					if canon_id == &journal.id {
+						for h in &journal.insertions {
+							if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
+								if rc > 0 {
+									canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
+								}
+							}
+						}
+						canon_deletions = journal.deletions;
+					}
+					overlay_deletions.append(&mut journal.insertions);
+				}
+				index += 1;
+			}
+			// apply canon inserts first
+			for (k, v) in canon_insertions {
+				try!(batch.put(column, &k, &v));
+				journal_overlay.pending_overlay.insert(to_short_key(&k), v);
+			}
+			// update the overlay
+			for k in overlay_deletions {
+				journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
+			}
+			// apply canon deletions
+			for k in canon_deletions {
+				if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
+					try!(batch.delete(column, &k));
+				}
+			}
+		}
+		journal_overlay.journal.remove(&end_era);
+		Ok(())
+	}
+
 }
 
 #[inline]
@@ -214,6 +273,8 @@ impl JournalDB for OverlayRecentDB {
 	fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }
 
+	fn earliest_era(&self) -> Option<u64> { self.journal_overlay.read().earliest_era }
+
 	fn state(&self, key: &H256) -> Option<Bytes> {
 		let journal_overlay = self.journal_overlay.read();
 		let key = to_short_key(key);
@@ -257,57 +318,16 @@ impl JournalDB for OverlayRecentDB {
 			}
 			journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
 		}
-
-		let journal_overlay = &mut *journal_overlay;
-		// apply old commits' details
 		if let Some((end_era, canon_id)) = end {
-			if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
-				let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
-				let mut canon_deletions: Vec<H256> = Vec::new();
-				let mut overlay_deletions: Vec<H256> = Vec::new();
-				let mut index = 0usize;
-				for mut journal in records.drain(..) {
-					//delete the record from the db
-					let mut r = RlpStream::new_list(3);
-					r.append(&end_era);
-					r.append(&index);
-					r.append(&&PADDING[..]);
-					try!(batch.delete(self.column, &r.drain()));
-					trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
-					{
-						if canon_id == journal.id {
-							for h in &journal.insertions {
-								if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
-									if rc > 0 {
-										canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
-									}
-								}
-							}
-							canon_deletions = journal.deletions;
-						}
-						overlay_deletions.append(&mut journal.insertions);
-					}
-					index += 1;
-				}
-				// apply canon inserts first
-				for (k, v) in canon_insertions {
-					try!(batch.put(self.column, &k, &v));
-					journal_overlay.pending_overlay.insert(to_short_key(&k), v);
-				}
-				// update the overlay
-				for k in overlay_deletions {
-					journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
-				}
-				// apply canon deletions
-				for k in canon_deletions {
-					if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
-						try!(batch.delete(self.column, &k));
-					}
-				}
-			}
-			journal_overlay.journal.remove(&end_era);
+			try!(Self::apply_old_commit(batch, &mut journal_overlay, self.column, end_era, &canon_id));
 		}
 		Ok(0)
 	}
 
+	fn commit_old(&mut self, batch: &DBTransaction, end_era: u64, end_id: &H256) -> Result<(), UtilError> {
+		let mut journal_overlay = self.journal_overlay.write();
+		Self::apply_old_commit(batch, &mut journal_overlay, self.column, end_era, end_id)
+	}
+
 	fn flush(&self) {
diff --git a/util/src/journaldb/traits.rs b/util/src/journaldb/traits.rs
index 96715604e..3301b36a3 100644
--- a/util/src/journaldb/traits.rs
+++ b/util/src/journaldb/traits.rs
@@ -35,10 +35,19 @@ pub trait JournalDB: HashDB {
 	/// Get the latest era in the DB. None if there isn't yet any data in there.
 	fn latest_era(&self) -> Option<u64>;
 
+	/// Get the earliest era in the DB. None if there isn't yet any data in there.
+	fn earliest_era(&self) -> Option<u64> { None }
+
 	/// Commit all recent insert operations and canonical historical commits' removals from the
 	/// old era to the backing database, reverting any non-canonical historical commit's inserts.
 	fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
 
+	/// Commit canonical historical commits' removals from the
+	/// old era to the backing database, reverting any non-canonical historical commit's inserts.
+	fn commit_old(&mut self, _batch: &DBTransaction, _end_era: u64, _end_id: &H256) -> Result<(), UtilError> {
+		Ok(())
+	}
+
 	/// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions
 	/// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated.
 	///
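`earliest_era` and `commit_old` enter the `JournalDB` trait with no-op defaults, so only journal styles that actually hold a journal in memory (`OverlayRecentDB` here) need to override them; styles with nothing to trim, such as an archive database, inherit a do-nothing implementation. A self-contained sketch of that default-method pattern, with simplified signatures rather than the real trait:

```rust
/// Illustrative miniature of the JournalDB trait change.
trait Journal {
	/// Earliest journalled era; `None` when nothing is journalled.
	fn earliest_era(&self) -> Option<u64> { None }
	/// Flush the journal for `end_era` to the backing store; no-op by default.
	fn commit_old(&mut self, _end_era: u64) -> Result<(), String> { Ok(()) }
}

struct Archive; // keeps everything, so the defaults suffice

struct OverlayRecent {
	earliest: u64, // journals recent eras, so it overrides both methods
}

impl Journal for Archive {}

impl Journal for OverlayRecent {
	fn earliest_era(&self) -> Option<u64> { Some(self.earliest) }
	fn commit_old(&mut self, end_era: u64) -> Result<(), String> {
		// the real implementation writes canonical inserts and deletes the
		// journal records for `end_era`; this sketch only advances the marker
		self.earliest = self.earliest.max(end_era + 1);
		Ok(())
	}
}

fn main() {
	let mut archive = Archive;
	assert_eq!(archive.earliest_era(), None);
	assert!(archive.commit_old(5).is_ok()); // default no-op

	let mut overlay = OverlayRecent { earliest: 3 };
	overlay.commit_old(7).unwrap();
	assert_eq!(overlay.earliest_era(), Some(8));
}
```

Defaulting the new methods keeps the trait change backwards-compatible: existing `JournalDB` implementations compile unchanged, and `Client::new` can call `commit_old` unconditionally.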