From 203fd8a4711c105c15dc243f36813528bbfd09b8 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Fri, 20 Jan 2017 13:25:53 +0100
Subject: [PATCH] Memory-based pruning history size (#4114)

* prune states based on memory param

* pruning memory CLI and usage in sync

* return purged value from memorydb

* calculate memory used incrementally in overlayrecentdb

* refactor shared history pruning code in client

* Fixed usage alignment

* journal_size function for fast memory calculation
---
 ethcore/src/client/client.rs          | 69 ++++++++++++++++++---------
 ethcore/src/client/config.rs          |  4 +-
 ethcore/src/client/test_client.rs     |  4 +-
 ethcore/src/types/pruning_info.rs     |  2 -
 parity/blockchain.rs                  | 47 ++++++++++++++++--
 parity/cli/config.full.toml           |  1 +
 parity/cli/mod.rs                     |  5 ++
 parity/cli/usage.txt                  |  6 ++-
 parity/configuration.rs               | 11 +++++
 parity/helpers.rs                     |  3 ++
 parity/run.rs                         |  2 +
 parity/snapshot.rs                    | 16 ++++++-
 sync/src/block_sync.rs                | 37 +++++++++++---
 sync/src/chain.rs                     |  8 ++--
 util/src/journaldb/overlayrecentdb.rs | 39 +++++++++++++--
 util/src/journaldb/traits.rs          |  5 ++
 util/src/memorydb.rs                  | 14 ++++--
 17 files changed, 220 insertions(+), 53 deletions(-)

diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs
index 037106804..8e9b2c6d2 100644
--- a/ethcore/src/client/client.rs
+++ b/ethcore/src/client/client.rs
@@ -206,17 +206,6 @@ impl Client {
 			config.history
 		};
 
-		if let (Some(earliest), Some(latest)) = (state_db.journal_db().earliest_era(), state_db.journal_db().latest_era()) {
-			if latest > earliest && latest - earliest > history {
-				for era in earliest..(latest - history + 1) {
-					trace!("Removing era {}", era);
-					let mut batch = DBTransaction::new(&db);
-					state_db.mark_canonical(&mut batch, era, &chain.block_hash(era).expect("Old block not found in the database"))?;
-					db.write(batch).map_err(ClientError::Database)?;
-				}
-			}
-		}
-
 		if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) {
 			warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex());
 		}
@@ -257,6 +246,13 @@ impl Client {
 			on_mode_change: Mutex::new(None),
 			registrar: Mutex::new(None),
 		});
+
+		{
+			let state_db = client.state_db.lock().boxed_clone();
+			let chain = client.chain.read();
+			client.prune_ancient(state_db, &chain)?;
+		}
+
 		if let Some(reg_addr) = client.additional_params().get("registrar").and_then(|s| Address::from_str(s).ok()) {
 			trace!(target: "client", "Found registrar at {}", reg_addr);
 			let weak = Arc::downgrade(&client);
@@ -553,16 +549,6 @@ impl Client {
 		let mut state = block.drain();
 
 		state.journal_under(&mut batch, number, hash).expect("DB commit failed");
-
-		if number >= self.history {
-			let n = number - self.history;
-			if let Some(ancient_hash) = chain.block_hash(n) {
-				state.mark_canonical(&mut batch, n, &ancient_hash).expect("DB commit failed");
-			} else {
-				debug!(target: "client", "Missing expected hash for block {}", n);
-			}
-		}
-
 		let route = chain.insert_block(&mut batch, block_data, receipts);
 		self.tracedb.read().import(&mut batch, TraceImportRequest {
 			traces: traces.into(),
@@ -578,9 +564,49 @@ impl Client {
 		self.db.read().write_buffered(batch);
 		chain.commit();
 		self.update_last_hashes(&parent, hash);
+
+		if let Err(e) = self.prune_ancient(state, &chain) {
+			warn!("Failed to prune ancient state data: {}", e);
+		}
+
 		route
 	}
 
+	// Prune ancient states until below the memory limit, or until only the minimum number of states remains.
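+	// This is a no-op for archive databases, which never report `is_pruned()`,
+	// and only eras at least `history` blocks older than the latest era are eligible.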
+	fn prune_ancient(&self, mut state_db: StateDB, chain: &BlockChain) -> Result<(), ClientError> {
+		let number = match state_db.journal_db().latest_era() {
+			Some(n) => n,
+			None => return Ok(()),
+		};
+
+		// prune all ancient eras until we're below the memory target,
+		// but have at least the minimum number of states.
+		loop {
+			let needs_pruning = state_db.journal_db().is_pruned() &&
+				state_db.journal_db().journal_size() >= self.config.history_mem;
+
+			if !needs_pruning { break }
+			match state_db.journal_db().earliest_era() {
+				Some(era) if era + self.history <= number => {
+					trace!(target: "client", "Pruning state for ancient era {}", era);
+					match chain.block_hash(era) {
+						Some(ancient_hash) => {
+							let mut batch = DBTransaction::new(&self.db.read());
+							state_db.mark_canonical(&mut batch, era, &ancient_hash)?;
+							self.db.read().write_buffered(batch);
+							state_db.journal_db().flush();
+						}
+						None =>
+							debug!(target: "client", "Missing expected hash for block {}", era),
+					}
+				}
+				_ => break, // means that every era is kept, no pruning necessary.
+			}
+		}
+
+		Ok(())
+	}
+
 	fn update_last_hashes(&self, parent: &H256, hash: &H256) {
 		let mut hashes = self.last_hashes.write();
 		if hashes.front().map_or(false, |h| h == parent) {
@@ -1408,7 +1434,6 @@ impl BlockChainClient for Client {
 		PruningInfo {
 			earliest_chain: self.chain.read().first_block_number().unwrap_or(1),
 			earliest_state: self.state_db.lock().journal_db().earliest_era().unwrap_or(0),
-			state_history_size: Some(self.history),
 		}
 	}
 
diff --git a/ethcore/src/client/config.rs b/ethcore/src/client/config.rs
index 575d5ac31..9853309c2 100644
--- a/ethcore/src/client/config.rs
+++ b/ethcore/src/client/config.rs
@@ -129,8 +129,10 @@ pub struct ClientConfig {
 	pub state_cache_size: usize,
 	/// EVM jump-tables cache size.
 	pub jump_table_size: usize,
-	/// State pruning history size.
+	/// Minimum state pruning history size.
 	pub history: u64,
+	/// Ideal memory usage for state pruning history.
+	pub history_mem: usize,
 	/// Check seal validity on block import
 	pub check_seal: bool,
 }
diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs
index fe562536a..d14cc31a4 100644
--- a/ethcore/src/client/test_client.rs
+++ b/ethcore/src/client/test_client.rs
@@ -714,10 +714,10 @@ impl BlockChainClient for TestBlockChainClient {
 	fn disable(&self) { unimplemented!(); }
 
 	fn pruning_info(&self) -> PruningInfo {
+		let best_num = self.chain_info().best_block_number;
 		PruningInfo {
 			earliest_chain: 1,
-			earliest_state: 1,
-			state_history_size: *self.history.read(),
+			earliest_state: self.history.read().as_ref().map(|x| best_num - x).unwrap_or(0),
 		}
 	}
 
diff --git a/ethcore/src/types/pruning_info.rs b/ethcore/src/types/pruning_info.rs
index 443316865..80c3e0ce2 100644
--- a/ethcore/src/types/pruning_info.rs
+++ b/ethcore/src/types/pruning_info.rs
@@ -28,6 +28,4 @@ pub struct PruningInfo {
 	pub earliest_chain: u64,
 	/// The first block where state requests may be served.
 	pub earliest_state: u64,
-	/// State pruning history size.
-	pub state_history_size: Option<u64>,
 }
diff --git a/parity/blockchain.rs b/parity/blockchain.rs
index e5a5ddb3f..3eee9cbac 100644
--- a/parity/blockchain.rs
+++ b/parity/blockchain.rs
@@ -85,6 +85,7 @@ pub struct ImportBlockchain {
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub tracing: Switch,
@@ -104,6 +105,7 @@ pub struct ExportBlockchain {
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub fat_db: Switch,
@@ -122,6 +124,7 @@ pub struct ExportState {
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub fat_db: Switch,
@@ -196,6 +199,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
 		"".into(),
 		algorithm,
 		cmd.pruning_history,
+		cmd.pruning_memory,
 		cmd.check_seal
 	);
 
@@ -310,6 +314,7 @@ fn start_client(
 	spec: SpecType,
 	pruning: Pruning,
 	pruning_history: u64,
+	pruning_memory: usize,
 	tracing: Switch,
 	fat_db: Switch,
 	compaction: DatabaseCompactionProfile,
@@ -354,7 +359,20 @@ fn start_client(
 	dirs.create_dirs(false, false)?;
 
 	// prepare client config
-	let client_config = to_client_config(&cache_config, Mode::Active, tracing, fat_db, compaction, wal, VMType::default(), "".into(), algorithm, pruning_history, true);
+	let client_config = to_client_config(
+		&cache_config,
+		Mode::Active,
+		tracing,
+		fat_db,
+		compaction,
+		wal,
+		VMType::default(),
+		"".into(),
+		algorithm,
+		pruning_history,
+		pruning_memory,
+		true,
+	);
 
 	let service = ClientService::start(
 		client_config,
@@ -371,7 +389,18 @@ fn start_client(
 
 fn execute_export(cmd: ExportBlockchain) -> Result<(), String> {
 	// Setup panic handler
-	let service = start_client(cmd.dirs, cmd.spec, cmd.pruning, cmd.pruning_history, cmd.tracing, cmd.fat_db, cmd.compaction, cmd.wal, cmd.cache_config)?;
+	let service = start_client(
+		cmd.dirs,
+		cmd.spec,
+		cmd.pruning,
+		cmd.pruning_history,
+		cmd.pruning_memory,
+		cmd.tracing,
+		cmd.fat_db,
+		cmd.compaction,
+		cmd.wal,
+		cmd.cache_config
+	)?;
 	let panic_handler = PanicHandler::new_in_arc();
 	let format = cmd.format.unwrap_or_default();
 
@@ -403,7 +432,19 @@ fn execute_export(cmd: ExportBlockchain) -> Result<(), String> {
 
 fn execute_export_state(cmd: ExportState) -> Result<(), String> {
 	// Setup panic handler
-	let service = start_client(cmd.dirs, cmd.spec, cmd.pruning, cmd.pruning_history, cmd.tracing, cmd.fat_db, cmd.compaction, cmd.wal, cmd.cache_config)?;
+	let service = start_client(
+		cmd.dirs,
+		cmd.spec,
+		cmd.pruning,
+		cmd.pruning_history,
+		cmd.pruning_memory,
+		cmd.tracing,
+		cmd.fat_db,
+		cmd.compaction,
+		cmd.wal,
+		cmd.cache_config
+	)?;
+
 	let panic_handler = PanicHandler::new_in_arc();
 	panic_handler.forward_from(&service);
 
diff --git a/parity/cli/config.full.toml b/parity/cli/config.full.toml
index 67cba6a48..a3e662a16 100644
--- a/parity/cli/config.full.toml
+++ b/parity/cli/config.full.toml
@@ -95,6 +95,7 @@ notify_work = ["http://localhost:3001"]
 tracing = "auto"
 pruning = "auto"
 pruning_history = 1200
+pruning_memory = 500
 cache_size_db = 64
 cache_size_blocks = 8
 cache_size_queue = 50
diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs
index 0802cb67f..83407ec25 100644
--- a/parity/cli/mod.rs
+++ b/parity/cli/mod.rs
@@ -238,6 +238,8 @@ usage! {
 		or |c: &Config| otry!(c.footprint).pruning.clone(),
 		flag_pruning_history: u64 = 1200u64,
 		or |c: &Config| otry!(c.footprint).pruning_history.clone(),
+		flag_pruning_memory: usize = 150usize,
+		or |c: &Config| otry!(c.footprint).pruning_memory.clone(),
 		flag_cache_size_db: u32 = 64u32,
 		or |c: &Config| otry!(c.footprint).cache_size_db.clone(),
 		flag_cache_size_blocks: u32 = 8u32,
@@ -421,6 +423,7 @@ struct Footprint {
 	tracing: Option<String>,
 	pruning: Option<String>,
 	pruning_history: Option<u64>,
+	pruning_memory: Option<usize>,
 	fast_and_loose: Option<bool>,
 	cache_size: Option<u32>,
 	cache_size_db: Option<u32>,
@@ -635,6 +638,7 @@ mod tests {
 			flag_tracing: "auto".into(),
 			flag_pruning: "auto".into(),
 			flag_pruning_history: 1200u64,
+			flag_pruning_memory: 500usize,
 			flag_cache_size_db: 64u32,
 			flag_cache_size_blocks: 8u32,
 			flag_cache_size_queue: 50u32,
@@ -812,6 +816,7 @@ mod tests {
 				tracing: Some("on".into()),
 				pruning: Some("fast".into()),
 				pruning_history: Some(64),
+				pruning_memory: None,
 				fast_and_loose: None,
 				cache_size: None,
 				cache_size_db: Some(128),
diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt
index f75dda524..ebe12a9db 100644
--- a/parity/cli/usage.txt
+++ b/parity/cli/usage.txt
@@ -271,8 +271,12 @@ Footprint Options:
                                            fast - maintain journal overlay. Fast but 50MB used.
                                            auto - use the method most recently synced or
                                            default to fast if none synced (default: {flag_pruning}).
-  --pruning-history NUM                    Set a number of recent states to keep when pruning
+  --pruning-history NUM                    Set a minimum number of recent states to keep when pruning
                                            is active. (default: {flag_pruning_history}).
+  --pruning-memory MB                      The ideal amount of memory in megabytes to use to store
+                                           recent states. As many states as possible will be kept
+                                           within this limit, and at least --pruning-history states
+                                           will always be kept. (default: {flag_pruning_memory}).
   --cache-size-db MB                       Override database cache size (default: {flag_cache_size_db}).
   --cache-size-blocks MB                   Specify the preferred size of the blockchain cache in
                                            megabytes (default: {flag_cache_size_blocks}).
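(Illustration, not part of the patch: with the two flags documented above, a node started as

    parity --pruning fast --pruning-history 1200 --pruning-memory 500

always keeps the 1200 most recent states and retains further states as long as the journal stays under roughly 500 MB; the same values can be set via the pruning_history and pruning_memory keys shown in config.full.toml above.)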
diff --git a/parity/configuration.rs b/parity/configuration.rs
index 6d23579c1..4efff62a7 100644
--- a/parity/configuration.rs
+++ b/parity/configuration.rs
@@ -214,6 +214,7 @@ impl Configuration {
 				format: format,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				compaction: compaction,
 				wal: wal,
 				tracing: tracing,
@@ -234,6 +235,7 @@ impl Configuration {
 				format: format,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				compaction: compaction,
 				wal: wal,
 				tracing: tracing,
@@ -252,6 +254,7 @@ impl Configuration {
 				format: format,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				compaction: compaction,
 				wal: wal,
 				tracing: tracing,
@@ -273,6 +276,7 @@ impl Configuration {
 				spec: spec,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				tracing: tracing,
 				fat_db: fat_db,
 				compaction: compaction,
@@ -289,6 +293,7 @@ impl Configuration {
 				spec: spec,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				tracing: tracing,
 				fat_db: fat_db,
 				compaction: compaction,
@@ -313,6 +318,7 @@ impl Configuration {
 				spec: spec,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				daemon: daemon,
 				logger_config: logger_config.clone(),
 				miner_options: miner_options,
@@ -943,6 +949,7 @@ mod tests {
 			format: Default::default(),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			compaction: Default::default(),
 			wal: true,
 			tracing: Default::default(),
@@ -965,6 +972,7 @@ mod tests {
 			file_path: Some("blockchain.json".into()),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			format: Default::default(),
 			compaction: Default::default(),
 			wal: true,
@@ -987,6 +995,7 @@ mod tests {
 			file_path: Some("state.json".into()),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			format: Default::default(),
 			compaction: Default::default(),
 			wal: true,
@@ -1011,6 +1020,7 @@ mod tests {
 			file_path: Some("blockchain.json".into()),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			format: Some(DataFormat::Hex),
 			compaction: Default::default(),
 			wal: true,
@@ -1046,6 +1056,7 @@ mod tests {
 			spec: Default::default(),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			daemon: None,
 			logger_config: Default::default(),
 			miner_options: Default::default(),
diff --git a/parity/helpers.rs b/parity/helpers.rs
index 680306684..b09b114cc 100644
--- a/parity/helpers.rs
+++ b/parity/helpers.rs
@@ -224,6 +224,7 @@ pub fn to_client_config(
 	name: String,
 	pruning: Algorithm,
 	pruning_history: u64,
+	pruning_memory: usize,
 	check_seal: bool,
 ) -> ClientConfig {
 	let mut client_config = ClientConfig::default();
@@ -247,6 +248,8 @@ pub fn to_client_config(
 	client_config.state_cache_size = cache_config.state() as usize * mb;
 	// in bytes
 	client_config.jump_table_size = cache_config.jump_tables() as usize * mb;
+	// in bytes
+	client_config.history_mem = pruning_memory * mb;
 	client_config.mode = mode;
 	client_config.tracing.enabled = tracing;
diff --git a/parity/run.rs b/parity/run.rs
index 67bbf0a90..ba4236bac 100644
--- a/parity/run.rs
+++ b/parity/run.rs
@@ -68,6 +68,7 @@ pub struct RunCmd {
 	pub spec: SpecType,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	/// Some if execution should be daemonized. Contains pid_file path.
 	pub daemon: Option<String>,
 	pub logger_config: LogConfig,
@@ -273,6 +274,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<bool, String> {
 		cmd.name,
 		algorithm,
 		cmd.pruning_history,
+		cmd.pruning_memory,
 		cmd.check_seal,
 	);
 
diff --git a/parity/snapshot.rs b/parity/snapshot.rs
index 4118b6e9c..9034b73cf 100644
--- a/parity/snapshot.rs
+++ b/parity/snapshot.rs
@@ -54,6 +54,7 @@ pub struct SnapshotCommand {
 	pub spec: SpecType,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	pub tracing: Switch,
 	pub fat_db: Switch,
 	pub compaction: DatabaseCompactionProfile,
@@ -170,7 +171,20 @@ impl SnapshotCommand {
 		execute_upgrades(&self.dirs.base, &db_dirs, algorithm, self.compaction.compaction_profile(db_dirs.db_root_path().as_path()))?;
 
 		// prepare client config
-		let client_config = to_client_config(&self.cache_config, Mode::Active, tracing, fat_db, self.compaction, self.wal, VMType::default(), "".into(), algorithm, self.pruning_history, true);
+		let client_config = to_client_config(
+			&self.cache_config,
+			Mode::Active,
+			tracing,
+			fat_db,
+			self.compaction,
+			self.wal,
+			VMType::default(),
+			"".into(),
+			algorithm,
+			self.pruning_history,
+			self.pruning_memory,
+			true
+		);
 
 		let service = ClientService::start(
 			client_config,
diff --git a/sync/src/block_sync.rs b/sync/src/block_sync.rs
index 7c423f7c2..f62431d84 100644
--- a/sync/src/block_sync.rs
+++ b/sync/src/block_sync.rs
@@ -103,15 +103,16 @@ pub struct BlockDownloader {
 	download_receipts: bool,
 	/// Sync up to the block with this hash.
 	target_hash: Option<H256>,
-	/// Reorganize up to this many blocks. Up to genesis if `None`.
-	max_reorg_blocks: Option<BlockNumber>,
 	/// Probing range for seeking common best block.
 	retract_step: u64,
+	/// Whether reorg should be limited.
+	limit_reorg: bool,
 }
 
 impl BlockDownloader {
-	/// Create a new instance of syncing strategy.
-	pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber, max_reorg: Option<BlockNumber>) -> BlockDownloader {
+	/// Create a new instance of syncing strategy. This won't reorganize to before the
+	/// last kept state.
+	pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self {
 		BlockDownloader {
 			state: State::Idle,
 			highest_block: None,
 			last_imported_block: start_number,
 			last_imported_hash: start_hash.clone(),
 			last_round_start: start_number,
 			last_round_start_hash: start_hash.clone(),
 			blocks: BlockCollection::new(sync_receipts),
 			imported_this_round: None,
 			round_parents: VecDeque::new(),
 			download_receipts: sync_receipts,
 			target_hash: None,
-			max_reorg_blocks: max_reorg,
 			retract_step: 1,
+			limit_reorg: true,
 		}
 	}
 
+	/// Create a new instance of sync with unlimited reorg allowed.
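+	/// Used by the ancient block downloader, which imports blocks that predate
+	/// the earliest kept state (see `update_targets` in sync/src/chain.rs below).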
+	pub fn with_unlimited_reorg(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self {
+		BlockDownloader {
+			state: State::Idle,
+			highest_block: None,
+			last_imported_block: start_number,
+			last_imported_hash: start_hash.clone(),
+			last_round_start: start_number,
+			last_round_start_hash: start_hash.clone(),
+			blocks: BlockCollection::new(sync_receipts),
+			imported_this_round: None,
+			round_parents: VecDeque::new(),
+			download_receipts: sync_receipts,
+			target_hash: None,
+			retract_step: 1,
+			limit_reorg: false,
+		}
+	}
 
@@ -268,7 +288,9 @@ impl BlockDownloader {
 			return Ok(DownloadAction::Reset);
 		} else {
 			let best = io.chain().chain_info().best_block_number;
-			if best > self.last_imported_block && (self.last_imported_block == 0 || best - self.last_imported_block > self.max_reorg_blocks.unwrap_or(u64::max_value())) {
+			let oldest_reorg = io.chain().pruning_info().earliest_state;
+			let last = self.last_imported_block;
+			if self.limit_reorg && best > last && (last == 0 || last < oldest_reorg) {
 				trace!(target: "sync", "No common block, disabling peer");
 				return Err(BlockDownloaderImportError::Invalid);
 			}
@@ -359,7 +381,8 @@ impl BlockDownloader {
 			trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
 		} else {
 			let best = io.chain().chain_info().best_block_number;
-			if best > start && (start == 0 || best - start > self.max_reorg_blocks.unwrap_or(u64::max_value())) {
+			let oldest_reorg = io.chain().pruning_info().earliest_state;
+			if self.limit_reorg && best > start && start < oldest_reorg {
 				debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
 				self.reset();
 			} else {
diff --git a/sync/src/chain.rs b/sync/src/chain.rs
index 40ed60b1f..8b63bea5f 100644
--- a/sync/src/chain.rs
+++ b/sync/src/chain.rs
@@ -384,7 +384,6 @@ impl ChainSync {
 	/// Create a new instance of syncing strategy.
 	pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
 		let chain_info = chain.chain_info();
-		let pruning = chain.pruning_info();
 		let mut sync = ChainSync {
 			state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
 			starting_block: chain.chain_info().best_block_number,
@@ -392,7 +391,7 @@ impl ChainSync {
 			peers: HashMap::new(),
 			handshaking_peers: HashMap::new(),
 			active_peers: HashSet::new(),
-			new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number, pruning.state_history_size),
+			new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
 			old_blocks: None,
 			last_sent_block_number: 0,
 			network_id: config.network_id,
@@ -567,15 +566,14 @@ impl ChainSync {
 	/// Update sync after the blockchain has been changed externally.
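+	/// Both downloaders are rebuilt from the client's current chain info, so no
+	/// stale download state survives an external change.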
 	pub fn update_targets(&mut self, chain: &BlockChainClient) {
 		// Do not assume that the block queue/chain still has our last_imported_block
-		let pruning = chain.pruning_info();
 		let chain = chain.chain_info();
-		self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number, pruning.state_history_size);
+		self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
 		self.old_blocks = None;
 		if self.download_old_blocks {
 			if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) {
 				trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number);
-				let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number, None);
+				let mut downloader = BlockDownloader::with_unlimited_reorg(true, &ancient_block_hash, ancient_block_number);
 				if let Some(hash) = chain.first_block_hash {
 					trace!(target: "sync", "Downloader target set to {:?}", hash);
 					downloader.set_target(&hash);
diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs
index db320f07d..85728b38f 100644
--- a/util/src/journaldb/overlayrecentdb.rs
+++ b/util/src/journaldb/overlayrecentdb.rs
@@ -71,6 +71,7 @@ struct JournalOverlay {
 	journal: HashMap<u64, Vec<JournalEntry>>,
 	latest_era: Option<u64>,
 	earliest_era: Option<u64>,
+	cumulative_size: usize, // cumulative size of all entries.
 }
 
 #[derive(PartialEq)]
@@ -127,7 +128,8 @@ impl OverlayRecentDB {
 		journal_overlay.backing_overlay == reconstructed.backing_overlay &&
 		journal_overlay.pending_overlay == reconstructed.pending_overlay &&
 		journal_overlay.journal == reconstructed.journal &&
-		journal_overlay.latest_era == reconstructed.latest_era
+		journal_overlay.latest_era == reconstructed.latest_era &&
+		journal_overlay.cumulative_size == reconstructed.cumulative_size
 	}
 
 	fn payload(&self, key: &H256) -> Option<DBValue> {
@@ -140,6 +142,7 @@ impl OverlayRecentDB {
 		let mut count = 0;
 		let mut latest_era = None;
 		let mut earliest_era = None;
+		let mut cumulative_size = 0;
 		if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
 			let mut era = decode::<u64>(&val);
 			latest_era = Some(era);
@@ -161,7 +164,14 @@ impl OverlayRecentDB {
 				for r in insertions.iter() {
 					let k: H256 = r.val_at(0);
 					let v = r.at(1).data();
-					overlay.emplace(to_short_key(&k), DBValue::from_slice(v));
+
+					let short_key = to_short_key(&k);
+
+					if !overlay.contains(&short_key) {
+						cumulative_size += v.len();
+					}
+
+					overlay.emplace(short_key, DBValue::from_slice(v));
 					inserted_keys.push(k);
 					count += 1;
 				}
@@ -186,6 +196,7 @@ impl OverlayRecentDB {
 			journal: journal,
 			latest_era: latest_era,
 			earliest_era: earliest_era,
+			cumulative_size: cumulative_size,
 		}
 	}
 
@@ -207,12 +218,19 @@ impl JournalDB for OverlayRecentDB {
 	fn mem_used(&self) -> usize {
 		let mut mem = self.transaction_overlay.mem_used();
 		let overlay = self.journal_overlay.read();
+
 		mem += overlay.backing_overlay.mem_used();
 		mem += overlay.pending_overlay.heap_size_of_children();
 		mem += overlay.journal.heap_size_of_children();
+
 		mem
 	}
 
+	fn journal_size(&self) -> usize {
+		self.journal_overlay.read().cumulative_size
+	}
+
 	fn is_empty(&self) -> bool {
 		self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none()
 	}
@@ -256,7 +274,13 @@ impl JournalDB for OverlayRecentDB {
 			r.begin_list(2);
 			r.append(&k);
 			r.append(&&*v);
-			journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
+
+			let short_key = to_short_key(&k);
+			if !journal_overlay.backing_overlay.contains(&short_key) {
+				journal_overlay.cumulative_size += v.len();
+			}
+
+			journal_overlay.backing_overlay.emplace(short_key, v);
 		}
 		r.append(&removed_keys);
 
@@ -267,6 +291,7 @@ impl JournalDB for OverlayRecentDB {
 		k.append(&&PADDING[..]);
 		batch.put_vec(self.column, &k.drain(), r.out());
 		if journal_overlay.latest_era.map_or(true, |e| now > e) {
+			trace!(target: "journaldb", "Set latest era to {}", now);
 			batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec());
 			journal_overlay.latest_era = Some(now);
 		}
@@ -322,7 +347,9 @@ impl JournalDB for OverlayRecentDB {
 			}
 			// update the overlay
 			for k in overlay_deletions {
-				journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
+				if let Some(val) = journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k)) {
+					journal_overlay.cumulative_size -= val.len();
+				}
 			}
 			// apply canon deletions
 			for k in canon_deletions {
@@ -332,6 +359,10 @@ impl JournalDB for OverlayRecentDB {
 			}
 		}
 		journal_overlay.journal.remove(&end_era);
+		if !journal_overlay.journal.is_empty() {
+			trace!(target: "journaldb", "Set earliest_era to {}", end_era + 1);
+			journal_overlay.earliest_era = Some(end_era + 1);
+		}
 
 		Ok(ops as u32)
 	}
diff --git a/util/src/journaldb/traits.rs b/util/src/journaldb/traits.rs
index e32591d96..2ec73922a 100644
--- a/util/src/journaldb/traits.rs
+++ b/util/src/journaldb/traits.rs
@@ -29,6 +29,11 @@ pub trait JournalDB: HashDB {
 	/// Returns heap memory size used
 	fn mem_used(&self) -> usize;
 
+	/// Returns the size of journalled state in memory.
+	/// This function has a considerable speed requirement --
+	/// it must be fast enough to call several times per block imported.
+	fn journal_size(&self) -> usize { 0 }
+
 	/// Check if this database has any commits
 	fn is_empty(&self) -> bool;
 
diff --git a/util/src/memorydb.rs b/util/src/memorydb.rs
index 67f25922d..d880ca02d 100644
--- a/util/src/memorydb.rs
+++ b/util/src/memorydb.rs
@@ -133,19 +133,22 @@ impl MemoryDB {
 	}
 
 	/// Remove an element and delete it from storage if reference count reaches zero.
-	pub fn remove_and_purge(&mut self, key: &H256) {
+	/// If the value was purged, return the old value.
+	pub fn remove_and_purge(&mut self, key: &H256) -> Option<DBValue> {
 		if key == &SHA3_NULL_RLP {
-			return;
+			return None;
 		}
 
 		match self.data.entry(key.clone()) {
 			Entry::Occupied(mut entry) =>
 				if entry.get().1 == 1 {
-					entry.remove();
+					Some(entry.remove().0)
 				} else {
 					entry.get_mut().1 -= 1;
+					None
 				},
 			Entry::Vacant(entry) => {
 				entry.insert((DBValue::new(), -1));
+				None
 			}
 		}
 	}
@@ -265,13 +268,14 @@ fn memorydb_remove_and_purge() {
 	assert_eq!(m.raw(&hello_key), None);
 
 	let mut m = MemoryDB::new();
-	m.remove_and_purge(&hello_key);
+	assert!(m.remove_and_purge(&hello_key).is_none());
 	assert_eq!(m.raw(&hello_key).unwrap().1, -1);
 	m.insert(hello_bytes);
 	m.insert(hello_bytes);
 	assert_eq!(m.raw(&hello_key).unwrap().1, 1);
-	m.remove_and_purge(&hello_key);
+	assert_eq!(&*m.remove_and_purge(&hello_key).unwrap(), hello_bytes);
 	assert_eq!(m.raw(&hello_key), None);
+	assert!(m.remove_and_purge(&hello_key).is_none());
 }
 
 #[test]
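Illustration (not part of the patch): the pruning policy above boils down to a loop that drops the oldest journalled eras while the journal exceeds the memory target, but never past the minimum history. The standalone sketch below models this with standard-library types only; `Era`, `MIN_HISTORY`, and `MEM_TARGET` are stand-ins for Parity's journal bookkeeping, `--pruning-history`, and `--pruning-memory`, not actual Parity identifiers.

    use std::collections::VecDeque;

    // Stand-ins for --pruning-history and --pruning-memory (in bytes).
    const MIN_HISTORY: u64 = 1200;
    const MEM_TARGET: usize = 500 * 1024 * 1024;

    // One journalled era: a block number plus the heap size of its entries.
    struct Era {
        number: u64,
        size: usize,
    }

    // Drop eras from the front (oldest first) while the journal exceeds the
    // memory target and more than MIN_HISTORY eras would still remain -- the
    // same policy `Client::prune_ancient` applies using `journal_size` above.
    fn prune(journal: &mut VecDeque<Era>, journal_size: &mut usize) {
        while *journal_size >= MEM_TARGET && journal.len() as u64 > MIN_HISTORY {
            let era = journal.pop_front().expect("length checked above; qed");
            *journal_size -= era.size;
        }
    }

    fn main() {
        // 10_000 eras of 64 KiB each (~625 MiB) exceed the 500 MiB target.
        let mut size = 0usize;
        let mut journal: VecDeque<Era> = (0..10_000u64)
            .map(|number| {
                size += 64 * 1024;
                Era { number, size: 64 * 1024 }
            })
            .collect();

        prune(&mut journal, &mut size);
        let earliest = journal.front().map(|e| e.number).unwrap_or(0);
        println!("kept {} eras from era {} onward, {} bytes", journal.len(), earliest, size);
    }

Running the sketch keeps 7999 eras (8000 x 64 KiB is exactly the 500 MiB target), well above the 1200-state floor; shrink MEM_TARGET or grow MIN_HISTORY to watch the floor take over, just as --pruning-history does in the patch.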