Configurable history size in beta (#2587)

* Reduce DB history

* Configurable history size

* Set min history size

* Test

* Fixed a test and reduced the limit
This commit is contained in:
Arkadiy Paronyan 2016-10-12 20:44:17 +02:00 committed by GitHub
parent 13de1ebc8e
commit 6b4d0cea6b
13 changed files with 164 additions and 63 deletions

View File

@ -395,6 +395,10 @@ impl<'x> OpenBlock<'x> {
uncle_bytes: uncle_bytes, uncle_bytes: uncle_bytes,
} }
} }
#[cfg(test)]
/// Return mutable block reference. To be used in tests only.
pub fn block_mut (&mut self) -> &mut ExecutedBlock { &mut self.block }
} }
impl<'x> IsBlock for OpenBlock<'x> { impl<'x> IsBlock for OpenBlock<'x> {

View File

@ -17,6 +17,7 @@ use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::path::{Path}; use std::path::{Path};
use std::fmt; use std::fmt;
use std::cmp;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant}; use std::time::{Instant};
use time::precise_time_ns; use time::precise_time_ns;
@ -73,6 +74,7 @@ pub use blockchain::CacheSize as BlockChainCacheSize;
const MAX_TX_QUEUE_SIZE: usize = 4096; const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2; const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
const MIN_HISTORY_SIZE: u64 = 8;
impl fmt::Display for BlockChainInfo { impl fmt::Display for BlockChainInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@ -139,10 +141,9 @@ pub struct Client {
notify: RwLock<Vec<Weak<ChainNotify>>>, notify: RwLock<Vec<Weak<ChainNotify>>>,
queue_transactions: AtomicUsize, queue_transactions: AtomicUsize,
last_hashes: RwLock<VecDeque<H256>>, last_hashes: RwLock<VecDeque<H256>>,
history: u64,
} }
const HISTORY: u64 = 1200;
// database columns // database columns
/// Column for State /// Column for State
pub const DB_COL_STATE: Option<u32> = Some(0); pub const DB_COL_STATE: Option<u32> = Some(0);
@ -187,6 +188,19 @@ impl Client {
try!(db.write(batch).map_err(ClientError::Database)); try!(db.write(batch).map_err(ClientError::Database));
} }
trace!("Cleanup journal: DB Earliest = {:?}, Latest = {:?}", state_db.journal_db().earliest_era(), state_db.journal_db().latest_era());
let history = cmp::max(MIN_HISTORY_SIZE, config.history);
if let (Some(earliest), Some(latest)) = (state_db.journal_db().earliest_era(), state_db.journal_db().latest_era()) {
if latest > earliest && latest - earliest > history {
for era in earliest..(latest - history + 1) {
trace!("Removing era {}", era);
let batch = DBTransaction::new(&db);
try!(state_db.journal_db_mut().commit_old(&batch, era, &chain.block_hash(era).expect("Old block not found in the database")));
try!(db.write(batch).map_err(ClientError::Database));
}
}
}
if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) { if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) {
warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex()); warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex());
} }
@ -219,6 +233,7 @@ impl Client {
notify: RwLock::new(Vec::new()), notify: RwLock::new(Vec::new()),
queue_transactions: AtomicUsize::new(0), queue_transactions: AtomicUsize::new(0),
last_hashes: RwLock::new(VecDeque::new()), last_hashes: RwLock::new(VecDeque::new()),
history: history,
}; };
Ok(Arc::new(client)) Ok(Arc::new(client))
} }
@ -275,7 +290,7 @@ impl Client {
// Check the block isn't so old we won't be able to enact it. // Check the block isn't so old we won't be able to enact it.
let best_block_number = self.chain.best_block_number(); let best_block_number = self.chain.best_block_number();
if best_block_number >= HISTORY && header.number() <= best_block_number - HISTORY { if best_block_number >= self.history && header.number() <= best_block_number - self.history {
warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number); warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number);
return Err(()); return Err(());
} }
@ -419,8 +434,8 @@ impl Client {
let number = block.header().number(); let number = block.header().number();
let parent = block.header().parent_hash().clone(); let parent = block.header().parent_hash().clone();
// Are we committing an era? // Are we committing an era?
let ancient = if number >= HISTORY { let ancient = if number >= self.history {
let n = number - HISTORY; let n = number - self.history;
Some((n, self.chain.block_hash(n).expect("only verified blocks can be commited; verified block has hash; qed"))) Some((n, self.chain.block_hash(n).expect("only verified blocks can be commited; verified block has hash; qed")))
} else { } else {
None None
@ -498,7 +513,7 @@ impl Client {
let db = self.state_db.lock().boxed_clone(); let db = self.state_db.lock().boxed_clone();
// early exit for pruned blocks // early exit for pruned blocks
if db.is_pruned() && self.chain.best_block_number() >= block_number + HISTORY { if db.is_pruned() && self.chain.best_block_number() >= block_number + self.history {
return None; return None;
} }
@ -603,7 +618,7 @@ impl Client {
let best_block_number = self.chain_info().best_block_number; let best_block_number = self.chain_info().best_block_number;
let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at))); let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at)));
if best_block_number > HISTORY + block_number && db.is_pruned() { if best_block_number > self.history + block_number && db.is_pruned() {
return Err(snapshot::Error::OldBlockPrunedDB.into()); return Err(snapshot::Error::OldBlockPrunedDB.into());
} }
@ -615,8 +630,10 @@ impl Client {
0 0
}; };
self.block_hash(BlockID::Number(start_num)) match self.block_hash(BlockID::Number(start_num)) {
.expect("blocks within HISTORY are always stored.") Some(h) => h,
None => return Err(snapshot::Error::InvalidStartingBlock(at).into()),
}
} }
_ => match self.block_hash(at) { _ => match self.block_hash(at) {
Some(hash) => hash, Some(hash) => hash,

View File

@ -107,6 +107,8 @@ pub struct ClientConfig {
pub mode: Mode, pub mode: Mode,
/// Type of block verifier used by client. /// Type of block verifier used by client.
pub verifier_type: VerifierType, pub verifier_type: VerifierType,
/// State pruning history size.
pub history: u64,
} }
#[cfg(test)] #[cfg(test)]

View File

@ -323,6 +323,11 @@ impl StateDB {
&*self.db &*self.db
} }
/// Returns underlying `JournalDB`.
pub fn journal_db_mut(&mut self) -> &mut JournalDB {
&mut *self.db
}
/// Add pending cache change. /// Add pending cache change.
/// The change is queued to be applied in `commit`. /// The change is queued to be applied in `commit`.
pub fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) { pub fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) {

View File

@ -22,6 +22,7 @@ use tests::helpers::*;
use common::*; use common::*;
use devtools::*; use devtools::*;
use miner::Miner; use miner::Miner;
use spec::Spec;
#[test] #[test]
fn imports_from_empty() { fn imports_from_empty() {
@ -167,3 +168,26 @@ fn can_mine() {
assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3()); assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3());
} }
#[test]
fn change_history_size() {
let dir = RandomTempPath::new();
let test_spec = Spec::new_null();
let mut config = ClientConfig::default();
config.history = 2;
let address = Address::random();
{
let client = Client::new(ClientConfig::default(), &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected()).unwrap();
for _ in 0..20 {
let mut b = client.prepare_open_block(Address::default(), (3141562.into(), 31415620.into()), vec![]);
b.block_mut().fields_mut().state.add_balance(&address, &5.into());
b.block_mut().fields_mut().state.commit().unwrap();
let b = b.close_and_lock().seal(&*test_spec.engine, vec![]).unwrap();
client.import_sealed_block(b).unwrap(); // account change is in the journal overlay
}
}
let mut config = ClientConfig::default();
config.history = 10;
let client = Client::new(config, &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected()).unwrap();
assert_eq!(client.state().balance(&address), 100.into());
}

View File

@ -75,6 +75,7 @@ pub struct ImportBlockchain {
pub file_path: Option<String>, pub file_path: Option<String>,
pub format: Option<DataFormat>, pub format: Option<DataFormat>,
pub pruning: Pruning, pub pruning: Pruning,
pub pruning_history: u64,
pub compaction: DatabaseCompactionProfile, pub compaction: DatabaseCompactionProfile,
pub wal: bool, pub wal: bool,
pub mode: Mode, pub mode: Mode,
@ -91,6 +92,7 @@ pub struct ExportBlockchain {
pub file_path: Option<String>, pub file_path: Option<String>,
pub format: Option<DataFormat>, pub format: Option<DataFormat>,
pub pruning: Pruning, pub pruning: Pruning,
pub pruning_history: u64,
pub compaction: DatabaseCompactionProfile, pub compaction: DatabaseCompactionProfile,
pub wal: bool, pub wal: bool,
pub mode: Mode, pub mode: Mode,
@ -131,7 +133,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile())); try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
// prepare client config // prepare client config
let client_config = to_client_config(&cmd.cache_config, &cmd.dirs, genesis_hash, cmd.mode, cmd.tracing, cmd.pruning, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), spec.fork_name.as_ref()); let client_config = to_client_config(&cmd.cache_config, &cmd.dirs, genesis_hash, cmd.mode, cmd.tracing, cmd.pruning, cmd.pruning_history, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), spec.fork_name.as_ref());
// build client // build client
let service = try!(ClientService::start( let service = try!(ClientService::start(
@ -242,7 +244,7 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile())); try!(execute_upgrades(&cmd.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, cmd.compaction.compaction_profile()));
// prepare client config // prepare client config
let client_config = to_client_config(&cmd.cache_config, &cmd.dirs, genesis_hash, cmd.mode, cmd.tracing, cmd.pruning, cmd.compaction, cmd.wal, VMType::default(), "".into(), spec.fork_name.as_ref()); let client_config = to_client_config(&cmd.cache_config, &cmd.dirs, genesis_hash, cmd.mode, cmd.tracing, cmd.pruning, cmd.pruning_history, cmd.compaction, cmd.wal, VMType::default(), "".into(), spec.fork_name.as_ref());
let service = try!(ClientService::start( let service = try!(ClientService::start(
client_config, client_config,

View File

@ -223,6 +223,8 @@ Footprint Options:
fast - maintain journal overlay. Fast but 50MB used. fast - maintain journal overlay. Fast but 50MB used.
auto - use the method most recently synced or auto - use the method most recently synced or
default to fast if none synced [default: auto]. default to fast if none synced [default: auto].
--pruning-history NUM Set a number of recent states to keep when pruning
is active. [default: 64].
--cache-size-db MB Override database cache size [default: 64]. --cache-size-db MB Override database cache size [default: 64].
--cache-size-blocks MB Specify the prefered size of the blockchain cache in --cache-size-blocks MB Specify the prefered size of the blockchain cache in
megabytes [default: 8]. megabytes [default: 8].
@ -329,6 +331,7 @@ pub struct Args {
pub flag_bootnodes: Option<String>, pub flag_bootnodes: Option<String>,
pub flag_network_id: Option<String>, pub flag_network_id: Option<String>,
pub flag_pruning: String, pub flag_pruning: String,
pub flag_pruning_history: u64,
pub flag_tracing: String, pub flag_tracing: String,
pub flag_port: u16, pub flag_port: u16,
pub flag_min_peers: u16, pub flag_min_peers: u16,

View File

@ -73,6 +73,7 @@ impl Configuration {
pub fn into_command(self) -> Result<Cmd, String> { pub fn into_command(self) -> Result<Cmd, String> {
let dirs = self.directories(); let dirs = self.directories();
let pruning = try!(self.args.flag_pruning.parse()); let pruning = try!(self.args.flag_pruning.parse());
let pruning_history = self.args.flag_pruning_history;
let vm_type = try!(self.vm_type()); let vm_type = try!(self.vm_type());
let mode = try!(to_mode(&self.args.flag_mode, self.args.flag_mode_timeout, self.args.flag_mode_alarm)); let mode = try!(to_mode(&self.args.flag_mode, self.args.flag_mode_timeout, self.args.flag_mode_alarm));
let miner_options = try!(self.miner_options()); let miner_options = try!(self.miner_options());
@ -134,6 +135,7 @@ impl Configuration {
file_path: self.args.arg_file.clone(), file_path: self.args.arg_file.clone(),
format: format, format: format,
pruning: pruning, pruning: pruning,
pruning_history: pruning_history,
compaction: compaction, compaction: compaction,
wal: wal, wal: wal,
mode: mode, mode: mode,
@ -150,6 +152,7 @@ impl Configuration {
file_path: self.args.arg_file.clone(), file_path: self.args.arg_file.clone(),
format: format, format: format,
pruning: pruning, pruning: pruning,
pruning_history: pruning_history,
compaction: compaction, compaction: compaction,
wal: wal, wal: wal,
mode: mode, mode: mode,
@ -164,6 +167,7 @@ impl Configuration {
dirs: dirs, dirs: dirs,
spec: spec, spec: spec,
pruning: pruning, pruning: pruning,
pruning_history: pruning_history,
logger_config: logger_config, logger_config: logger_config,
mode: mode, mode: mode,
tracing: tracing, tracing: tracing,
@ -180,6 +184,7 @@ impl Configuration {
dirs: dirs, dirs: dirs,
spec: spec, spec: spec,
pruning: pruning, pruning: pruning,
pruning_history: pruning_history,
logger_config: logger_config, logger_config: logger_config,
mode: mode, mode: mode,
tracing: tracing, tracing: tracing,
@ -202,6 +207,7 @@ impl Configuration {
dirs: dirs, dirs: dirs,
spec: spec, spec: spec,
pruning: pruning, pruning: pruning,
pruning_history: pruning_history,
daemon: daemon, daemon: daemon,
logger_config: logger_config, logger_config: logger_config,
miner_options: miner_options, miner_options: miner_options,
@ -695,6 +701,7 @@ mod tests {
file_path: Some("blockchain.json".into()), file_path: Some("blockchain.json".into()),
format: Default::default(), format: Default::default(),
pruning: Default::default(), pruning: Default::default(),
pruning_history: 64,
compaction: Default::default(), compaction: Default::default(),
wal: true, wal: true,
mode: Default::default(), mode: Default::default(),
@ -714,6 +721,7 @@ mod tests {
dirs: Default::default(), dirs: Default::default(),
file_path: Some("blockchain.json".into()), file_path: Some("blockchain.json".into()),
pruning: Default::default(), pruning: Default::default(),
pruning_history: 64,
format: Default::default(), format: Default::default(),
compaction: Default::default(), compaction: Default::default(),
wal: true, wal: true,
@ -735,6 +743,7 @@ mod tests {
dirs: Default::default(), dirs: Default::default(),
file_path: Some("blockchain.json".into()), file_path: Some("blockchain.json".into()),
pruning: Default::default(), pruning: Default::default(),
pruning_history: 64,
format: Some(DataFormat::Hex), format: Some(DataFormat::Hex),
compaction: Default::default(), compaction: Default::default(),
wal: true, wal: true,
@ -762,6 +771,7 @@ mod tests {
dirs: Default::default(), dirs: Default::default(),
spec: Default::default(), spec: Default::default(),
pruning: Default::default(), pruning: Default::default(),
pruning_history: 64,
daemon: None, daemon: None,
logger_config: Default::default(), logger_config: Default::default(),
miner_options: Default::default(), miner_options: Default::default(),

View File

@ -202,6 +202,7 @@ pub fn to_client_config(
mode: Mode, mode: Mode,
tracing: Switch, tracing: Switch,
pruning: Pruning, pruning: Pruning,
pruning_history: u64,
compaction: DatabaseCompactionProfile, compaction: DatabaseCompactionProfile,
wal: bool, wal: bool,
vm_type: VMType, vm_type: VMType,
@ -229,6 +230,7 @@ pub fn to_client_config(
client_config.mode = mode; client_config.mode = mode;
client_config.tracing.enabled = tracing; client_config.tracing.enabled = tracing;
client_config.pruning = pruning.to_algorithm(dirs, genesis_hash, fork_name); client_config.pruning = pruning.to_algorithm(dirs, genesis_hash, fork_name);
client_config.history = pruning_history;
client_config.db_compaction = compaction; client_config.db_compaction = compaction;
client_config.db_wal = wal; client_config.db_wal = wal;
client_config.vm_type = vm_type; client_config.vm_type = vm_type;

View File

@ -52,6 +52,7 @@ pub struct RunCmd {
pub dirs: Directories, pub dirs: Directories,
pub spec: SpecType, pub spec: SpecType,
pub pruning: Pruning, pub pruning: Pruning,
pub pruning_history: u64,
/// Some if execution should be daemonized. Contains pid_file path. /// Some if execution should be daemonized. Contains pid_file path.
pub daemon: Option<String>, pub daemon: Option<String>,
pub logger_config: LogConfig, pub logger_config: LogConfig,
@ -149,6 +150,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
cmd.mode, cmd.mode,
cmd.tracing, cmd.tracing,
cmd.pruning, cmd.pruning,
cmd.pruning_history,
cmd.compaction, cmd.compaction,
cmd.wal, cmd.wal,
cmd.vm_type, cmd.vm_type,

View File

@ -52,6 +52,7 @@ pub struct SnapshotCommand {
pub dirs: Directories, pub dirs: Directories,
pub spec: SpecType, pub spec: SpecType,
pub pruning: Pruning, pub pruning: Pruning,
pub pruning_history: u64,
pub logger_config: LogConfig, pub logger_config: LogConfig,
pub mode: Mode, pub mode: Mode,
pub tracing: Switch, pub tracing: Switch,
@ -89,7 +90,7 @@ impl SnapshotCommand {
try!(execute_upgrades(&self.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, self.compaction.compaction_profile())); try!(execute_upgrades(&self.dirs, genesis_hash, spec.fork_name.as_ref(), algorithm, self.compaction.compaction_profile()));
// prepare client config // prepare client config
let client_config = to_client_config(&self.cache_config, &self.dirs, genesis_hash, self.mode, self.tracing, self.pruning, self.compaction, self.wal, VMType::default(), "".into(), spec.fork_name.as_ref()); let client_config = to_client_config(&self.cache_config, &self.dirs, genesis_hash, self.mode, self.tracing, self.pruning, self.pruning_history, self.compaction, self.wal, VMType::default(), "".into(), spec.fork_name.as_ref());
let service = try!(ClientService::start( let service = try!(ClientService::start(
client_config, client_config,

View File

@ -70,6 +70,7 @@ struct JournalOverlay {
pending_overlay: H256FastMap<Bytes>, // Nodes being transfered from backing_overlay to backing db pending_overlay: H256FastMap<Bytes>, // Nodes being transfered from backing_overlay to backing db
journal: HashMap<u64, Vec<JournalEntry>>, journal: HashMap<u64, Vec<JournalEntry>>,
latest_era: Option<u64>, latest_era: Option<u64>,
earliest_era: Option<u64>,
} }
#[derive(PartialEq)] #[derive(PartialEq)]
@ -123,7 +124,10 @@ impl OverlayRecentDB {
fn can_reconstruct_refs(&self) -> bool { fn can_reconstruct_refs(&self) -> bool {
let reconstructed = Self::read_overlay(&self.backing, self.column); let reconstructed = Self::read_overlay(&self.backing, self.column);
let journal_overlay = self.journal_overlay.read(); let journal_overlay = self.journal_overlay.read();
*journal_overlay == reconstructed journal_overlay.backing_overlay == reconstructed.backing_overlay &&
journal_overlay.pending_overlay == reconstructed.pending_overlay &&
journal_overlay.journal == reconstructed.journal &&
journal_overlay.latest_era == reconstructed.latest_era
} }
fn payload(&self, key: &H256) -> Option<Bytes> { fn payload(&self, key: &H256) -> Option<Bytes> {
@ -135,6 +139,7 @@ impl OverlayRecentDB {
let mut overlay = MemoryDB::new(); let mut overlay = MemoryDB::new();
let mut count = 0; let mut count = 0;
let mut latest_era = None; let mut latest_era = None;
let mut earliest_era = None;
if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") { if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
let mut era = decode::<u64>(&val); let mut era = decode::<u64>(&val);
latest_era = Some(era); latest_era = Some(era);
@ -166,6 +171,7 @@ impl OverlayRecentDB {
deletions: deletions, deletions: deletions,
}); });
index += 1; index += 1;
earliest_era = Some(era);
}; };
if index == 0 || era == 0 { if index == 0 || era == 0 {
break; break;
@ -178,9 +184,62 @@ impl OverlayRecentDB {
backing_overlay: overlay, backing_overlay: overlay,
pending_overlay: HashMap::default(), pending_overlay: HashMap::default(),
journal: journal, journal: journal,
latest_era: latest_era } latest_era: latest_era,
earliest_era: earliest_era,
}
} }
fn apply_old_commit(batch: &DBTransaction, journal_overlay: &mut JournalOverlay, column: Option<u32>, end_era: u64, canon_id: &H256) -> Result<(), UtilError> {
// apply old commits' details
if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) {
let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
let mut canon_deletions: Vec<H256> = Vec::new();
let mut overlay_deletions: Vec<H256> = Vec::new();
let mut index = 0usize;
for mut journal in records.drain(..) {
//delete the record from the db
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
try!(batch.delete(column, &r.drain()));
trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
{
if canon_id == &journal.id {
for h in &journal.insertions {
if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
if rc > 0 {
canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
}
}
}
canon_deletions = journal.deletions;
}
overlay_deletions.append(&mut journal.insertions);
}
index += 1;
}
// apply canon inserts first
for (k, v) in canon_insertions {
try!(batch.put(column, &k, &v));
journal_overlay.pending_overlay.insert(to_short_key(&k), v);
}
// update the overlay
for k in overlay_deletions {
journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
}
// apply canon deletions
for k in canon_deletions {
if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
try!(batch.delete(column, &k));
}
}
}
journal_overlay.journal.remove(&end_era);
Ok(())
}
} }
#[inline] #[inline]
@ -214,6 +273,8 @@ impl JournalDB for OverlayRecentDB {
fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era } fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }
fn earliest_era(&self) -> Option<u64> { self.journal_overlay.read().earliest_era }
fn state(&self, key: &H256) -> Option<Bytes> { fn state(&self, key: &H256) -> Option<Bytes> {
let journal_overlay = self.journal_overlay.read(); let journal_overlay = self.journal_overlay.read();
let key = to_short_key(key); let key = to_short_key(key);
@ -257,57 +318,16 @@ impl JournalDB for OverlayRecentDB {
} }
journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys }); journal_overlay.journal.entry(now).or_insert_with(Vec::new).push(JournalEntry { id: id.clone(), insertions: inserted_keys, deletions: removed_keys });
} }
let journal_overlay = &mut *journal_overlay;
// apply old commits' details
if let Some((end_era, canon_id)) = end { if let Some((end_era, canon_id)) = end {
if let Some(ref mut records) = journal_overlay.journal.get_mut(&end_era) { try!(Self::apply_old_commit(batch, &mut journal_overlay, self.column, end_era, &canon_id));
let mut canon_insertions: Vec<(H256, Bytes)> = Vec::new();
let mut canon_deletions: Vec<H256> = Vec::new();
let mut overlay_deletions: Vec<H256> = Vec::new();
let mut index = 0usize;
for mut journal in records.drain(..) {
//delete the record from the db
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
try!(batch.delete(self.column, &r.drain()));
trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): +{} -{} entries", end_era, index, journal.id, canon_id, journal.insertions.len(), journal.deletions.len());
{
if canon_id == journal.id {
for h in &journal.insertions {
if let Some((d, rc)) = journal_overlay.backing_overlay.raw(&to_short_key(h)) {
if rc > 0 {
canon_insertions.push((h.clone(), d.to_owned())); //TODO: optimize this to avoid data copy
}
}
}
canon_deletions = journal.deletions;
}
overlay_deletions.append(&mut journal.insertions);
}
index += 1;
}
// apply canon inserts first
for (k, v) in canon_insertions {
try!(batch.put(self.column, &k, &v));
journal_overlay.pending_overlay.insert(to_short_key(&k), v);
}
// update the overlay
for k in overlay_deletions {
journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
}
// apply canon deletions
for k in canon_deletions {
if !journal_overlay.backing_overlay.contains(&to_short_key(&k)) {
try!(batch.delete(self.column, &k));
}
}
}
journal_overlay.journal.remove(&end_era);
} }
Ok(0) Ok((0))
}
fn commit_old(&mut self, batch: &DBTransaction, end_era: u64, end_id: &H256) -> Result<(), UtilError> {
let mut journal_overlay = self.journal_overlay.write();
Self::apply_old_commit(batch, &mut journal_overlay, self.column, end_era, end_id)
} }
fn flush(&self) { fn flush(&self) {

View File

@ -35,10 +35,19 @@ pub trait JournalDB: HashDB {
/// Get the latest era in the DB. None if there isn't yet any data in there. /// Get the latest era in the DB. None if there isn't yet any data in there.
fn latest_era(&self) -> Option<u64>; fn latest_era(&self) -> Option<u64>;
/// Get the earliest era in the DB. None if there isn't yet any data in there.
fn earliest_era(&self) -> Option<u64> { None }
/// Commit all recent insert operations and canonical historical commits' removals from the /// Commit all recent insert operations and canonical historical commits' removals from the
/// old era to the backing database, reverting any non-canonical historical commit's inserts. /// old era to the backing database, reverting any non-canonical historical commit's inserts.
fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>; fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError>;
/// Commit canonical historical commits' removals from the
/// old era to the backing database, reverting any non-canonical historical commit's inserts.
fn commit_old(&mut self, _batch: &DBTransaction, _end_era: u64, _end_id: &H256) -> Result<(), UtilError> {
Ok(())
}
/// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions /// Commit all queued insert and delete operations without affecting any journalling -- this requires that all insertions
/// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated. /// and deletions are indeed canonical and will likely lead to an invalid database if that assumption is violated.
/// ///