Configurable history size in master (#2606)
* split journaldb commit into two functions: journal_under and mark_canonical
* use new commit mechanism in client
* Configurable history size in master
* Reduce DB history
* Configurable history size
* Set min history size
* Test
* Fixed a test and reduced the limit
This commit is contained in:
parent 835cd13c0e
commit f28b8352c1
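To make the intent of the JournalDB split easier to follow: `journal_under` records a block's state changes in the journal, and `mark_canonical` later finalises the block that has dropped out of the (now configurable) history window, letting its journal entries be pruned. Below is a minimal, self-contained sketch of that two-phase flow using toy types; `ToyJournalDB` and its signatures are invented for illustration and are not the `StateDB`/`JournalDB` API touched by this diff.

use std::collections::BTreeMap;

/// Toy stand-in for the journal: era number -> journalled change sets.
#[derive(Default)]
struct ToyJournalDB {
	journal: BTreeMap<u64, Vec<String>>,
}

impl ToyJournalDB {
	/// Phase 1: record the changes made by block `era` without deciding finality.
	fn journal_under(&mut self, era: u64, changes: Vec<String>) {
		self.journal.entry(era).or_default().extend(changes);
	}

	/// Phase 2: once `era` has fallen out of the history window, mark it
	/// canonical and drop its journal entry (it can no longer be reverted).
	fn mark_canonical(&mut self, era: u64) {
		self.journal.remove(&era);
	}

	/// Oldest era still held in the journal, if any.
	fn earliest_era(&self) -> Option<u64> {
		self.journal.keys().next().copied()
	}
}

fn main() {
	// Number of recent states to keep, analogous to config.history / --pruning-history.
	let history: u64 = 8;
	let mut db = ToyJournalDB::default();

	for number in 0u64..20 {
		// Mirrors the client commit path in this diff: journal the new block,
		// then finalise the block that is `history` eras behind it.
		db.journal_under(number, vec![format!("state change at #{}", number)]);
		if number >= history {
			db.mark_canonical(number - history);
		}
	}

	// Only the most recent `history` eras (12..=19) are still journalled.
	assert_eq!(db.earliest_era(), Some(12));
	println!("earliest journalled era: {:?}", db.earliest_era());
}

Keeping journalling separate from canonicalisation is what lets the client clamp the window to MIN_HISTORY_SIZE and catch up at startup by canonicalising any eras older than the configured history, as the hunks below do.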
@@ -404,6 +404,10 @@ impl<'x> OpenBlock<'x> {
 			uncle_bytes: uncle_bytes,
 		}
 	}
+
+	#[cfg(test)]
+	/// Return mutable block reference. To be used in tests only.
+	pub fn block_mut (&mut self) -> &mut ExecutedBlock { &mut self.block }
 }
 
 impl<'x> IsBlock for OpenBlock<'x> {
@@ -74,6 +74,7 @@ pub use blockchain::CacheSize as BlockChainCacheSize;
 
 const MAX_TX_QUEUE_SIZE: usize = 4096;
 const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
+const MIN_HISTORY_SIZE: u64 = 8;
 
 impl fmt::Display for BlockChainInfo {
 	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -141,12 +142,9 @@ pub struct Client {
 	queue_transactions: AtomicUsize,
 	last_hashes: RwLock<VecDeque<H256>>,
 	factories: Factories,
+	history: u64,
 }
 
-/// The pruning constant -- how old blocks must be before we
-/// assume finality of a given candidate.
-pub const HISTORY: u64 = 1200;
-
 impl Client {
 	/// Create a new client with given spec and DB path and custom verifier.
 	pub fn new(
@@ -177,6 +175,28 @@ impl Client {
 			try!(db.write(batch).map_err(ClientError::Database));
 		}
 
+		trace!("Cleanup journal: DB Earliest = {:?}, Latest = {:?}", state_db.journal_db().earliest_era(), state_db.journal_db().latest_era());
+
+		let history = if config.history < MIN_HISTORY_SIZE {
+			info!(target: "client", "Ignoring pruning history parameter of {}\
+				, falling back to minimum of {}",
+				config.history, MIN_HISTORY_SIZE);
+			MIN_HISTORY_SIZE
+		} else {
+			config.history
+		};
+
+		if let (Some(earliest), Some(latest)) = (state_db.journal_db().earliest_era(), state_db.journal_db().latest_era()) {
+			if latest > earliest && latest - earliest > history {
+				for era in earliest..(latest - history + 1) {
+					trace!("Removing era {}", era);
+					let mut batch = DBTransaction::new(&db);
+					try!(state_db.mark_canonical(&mut batch, era, &chain.block_hash(era).expect("Old block not found in the database")));
+					try!(db.write(batch).map_err(ClientError::Database));
+				}
+			}
+		}
+
 		if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) {
 			warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex());
 		}
@@ -217,6 +237,7 @@ impl Client {
 			queue_transactions: AtomicUsize::new(0),
 			last_hashes: RwLock::new(VecDeque::new()),
 			factories: factories,
+			history: history,
 		};
 		Ok(Arc::new(client))
 	}
@@ -275,7 +296,7 @@ impl Client {
 		let chain = self.chain.read();
 		// Check the block isn't so old we won't be able to enact it.
 		let best_block_number = chain.best_block_number();
-		if best_block_number >= HISTORY && header.number() <= best_block_number - HISTORY {
+		if best_block_number >= self.history && header.number() <= best_block_number - self.history {
 			warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number);
 			return Err(());
 		}
@@ -432,8 +453,8 @@ impl Client {
 
 		state.journal_under(&mut batch, number, hash).expect("DB commit failed");
 
-		if number >= HISTORY {
-			let n = number - HISTORY;
+		if number >= self.history {
+			let n = number - self.history;
 			state.mark_canonical(&mut batch, n, &chain.block_hash(n).unwrap()).expect("DB commit failed");
 		}
 
@@ -495,7 +516,7 @@ impl Client {
 		let db = self.state_db.lock().boxed_clone();
 
 		// early exit for pruned blocks
-		if db.is_pruned() && self.chain.read().best_block_number() >= block_number + HISTORY {
+		if db.is_pruned() && self.chain.read().best_block_number() >= block_number + self.history {
 			return None;
 		}
 
@@ -600,7 +621,7 @@ impl Client {
 		let best_block_number = self.chain_info().best_block_number;
 		let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at)));
 
-		if best_block_number > HISTORY + block_number && db.is_pruned() {
+		if best_block_number > self.history + block_number && db.is_pruned() {
 			return Err(snapshot::Error::OldBlockPrunedDB.into());
 		}
 
@@ -612,8 +633,10 @@ impl Client {
 					0
 				};
 
-				self.block_hash(BlockID::Number(start_num))
-					.expect("blocks within HISTORY are always stored.")
+				match self.block_hash(BlockID::Number(start_num)) {
+					Some(h) => h,
+					None => return Err(snapshot::Error::InvalidStartingBlock(at).into()),
+				}
 			}
 			_ => match self.block_hash(at) {
 				Some(hash) => hash,
@@ -626,6 +649,11 @@ impl Client {
 		Ok(())
 	}
 
+	/// Ask the client what the history parameter is.
+	pub fn pruning_history(&self) -> u64 {
+		self.history
+	}
+
 	fn block_hash(chain: &BlockChain, id: BlockID) -> Option<H256> {
 		match id {
 			BlockID::Hash(hash) => Some(hash),
@@ -110,6 +110,8 @@ pub struct ClientConfig {
 	pub state_cache_size: usize,
 	/// EVM jump-tables cache size.
 	pub jump_table_size: usize,
+	/// State pruning history size.
+	pub history: u64,
 }
 
 #[cfg(test)]
@@ -346,7 +346,7 @@ impl Service {
 
 		self.taking_snapshot.store(false, Ordering::SeqCst);
 		if let Err(e) = res {
-			if client.chain_info().best_block_number >= num + ::client::HISTORY {
+			if client.chain_info().best_block_number >= num + client.pruning_history() {
 				// "Cancelled" is mincing words a bit -- what really happened
 				// is that the state we were snapshotting got pruned out
 				// before we could finish.
@@ -24,6 +24,7 @@ use common::*;
 use devtools::*;
 use miner::Miner;
 use rlp::{Rlp, View};
+use spec::Spec;
 
 #[test]
 fn imports_from_empty() {
@@ -238,3 +239,27 @@ fn can_mine() {
 
 	assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3());
 }
+
+#[test]
+fn change_history_size() {
+	let dir = RandomTempPath::new();
+	let test_spec = Spec::new_null();
+	let mut config = ClientConfig::default();
+	let db_config = DatabaseConfig::with_columns(::db::NUM_COLUMNS);
+	config.history = 2;
+	let address = Address::random();
+	{
+		let client = Client::new(ClientConfig::default(), &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected(), &db_config).unwrap();
+		for _ in 0..20 {
+			let mut b = client.prepare_open_block(Address::default(), (3141562.into(), 31415620.into()), vec![]);
+			b.block_mut().fields_mut().state.add_balance(&address, &5.into());
+			b.block_mut().fields_mut().state.commit().unwrap();
+			let b = b.close_and_lock().seal(&*test_spec.engine, vec![]).unwrap();
+			client.import_sealed_block(b).unwrap(); // account change is in the journal overlay
+		}
+	}
+	let mut config = ClientConfig::default();
+	config.history = 10;
+	let client = Client::new(config, &test_spec, dir.as_path(), Arc::new(Miner::with_spec(&test_spec)), IoChannel::disconnected(), &db_config).unwrap();
+	assert_eq!(client.state().balance(&address), 100.into());
+}
@@ -77,6 +77,7 @@ pub struct ImportBlockchain {
 	pub file_path: Option<String>,
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
+	pub pruning_history: u64,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub mode: Mode,
@@ -94,6 +95,7 @@ pub struct ExportBlockchain {
 	pub file_path: Option<String>,
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
+	pub pruning_history: u64,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub mode: Mode,
@@ -156,7 +158,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<String, String> {
 	try!(execute_upgrades(&db_dirs, algorithm, cmd.compaction.compaction_profile()));
 
 	// prepare client config
-	let client_config = to_client_config(&cmd.cache_config, cmd.mode, tracing, fat_db, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), algorithm);
+	let client_config = to_client_config(&cmd.cache_config, cmd.mode, tracing, fat_db, cmd.compaction, cmd.wal, cmd.vm_type, "".into(), algorithm, cmd.pruning_history);
 
 	// build client
 	let service = try!(ClientService::start(
@@ -307,7 +309,7 @@ fn execute_export(cmd: ExportBlockchain) -> Result<String, String> {
 	try!(execute_upgrades(&db_dirs, algorithm, cmd.compaction.compaction_profile()));
 
 	// prepare client config
-	let client_config = to_client_config(&cmd.cache_config, cmd.mode, tracing, fat_db, cmd.compaction, cmd.wal, VMType::default(), "".into(), algorithm);
+	let client_config = to_client_config(&cmd.cache_config, cmd.mode, tracing, fat_db, cmd.compaction, cmd.wal, VMType::default(), "".into(), algorithm, cmd.pruning_history);
 
 	let service = try!(ClientService::start(
 		client_config,
@@ -77,6 +77,7 @@ notify_work = ["http://localhost:3001"]
 [footprint]
 tracing = "auto"
 pruning = "auto"
+pruning_history = 64
 cache_size_db = 64
 cache_size_blocks = 8
 cache_size_queue = 50
@@ -46,6 +46,7 @@ tx_queue_gas = "auto"
 [footprint]
 tracing = "on"
 pruning = "fast"
+pruning_history = 64
 cache_size_db = 128
 cache_size_blocks = 16
 cache_size_queue = 100
@@ -207,6 +207,8 @@ usage! {
 			or |c: &Config| otry!(c.footprint).tracing.clone(),
 		flag_pruning: String = "auto",
 			or |c: &Config| otry!(c.footprint).pruning.clone(),
+		flag_pruning_history: u64 = 64u64,
+			or |c: &Config| otry!(c.footprint).pruning_history.clone(),
 		flag_cache_size_db: u32 = 64u32,
 			or |c: &Config| otry!(c.footprint).cache_size_db.clone(),
 		flag_cache_size_blocks: u32 = 8u32,
@@ -361,6 +363,7 @@ struct Mining {
 struct Footprint {
 	tracing: Option<String>,
 	pruning: Option<String>,
+	pruning_history: Option<u64>,
 	fast_and_loose: Option<bool>,
 	cache_size: Option<u32>,
 	cache_size_db: Option<u32>,
@@ -536,6 +539,7 @@ mod tests {
 			// -- Footprint Options
 			flag_tracing: "auto".into(),
 			flag_pruning: "auto".into(),
+			flag_pruning_history: 64u64,
 			flag_cache_size_db: 64u32,
 			flag_cache_size_blocks: 8u32,
 			flag_cache_size_queue: 50u32,
@@ -690,6 +694,7 @@ mod tests {
 			footprint: Some(Footprint {
 				tracing: Some("on".into()),
 				pruning: Some("fast".into()),
+				pruning_history: Some(64),
 				fast_and_loose: None,
 				cache_size: None,
 				cache_size_db: Some(128),
@@ -209,6 +209,8 @@ Footprint Options:
                            fast - maintain journal overlay. Fast but 50MB used.
                            auto - use the method most recently synced or
                            default to fast if none synced (default: {flag_pruning}).
+  --pruning-history NUM    Set a number of recent states to keep when pruning
+                           is active. [default: {flag_pruning_history}].
   --cache-size-db MB       Override database cache size (default: {flag_cache_size_db}).
   --cache-size-blocks MB   Specify the prefered size of the blockchain cache in
                            megabytes (default: {flag_cache_size_blocks}).
@@ -73,6 +73,7 @@ impl Configuration {
 	pub fn into_command(self) -> Result<Cmd, String> {
 		let dirs = self.directories();
 		let pruning = try!(self.args.flag_pruning.parse());
+		let pruning_history = self.args.flag_pruning_history;
 		let vm_type = try!(self.vm_type());
 		let mode = try!(to_mode(&self.args.flag_mode, self.args.flag_mode_timeout, self.args.flag_mode_alarm));
 		let miner_options = try!(self.miner_options());
@@ -145,6 +146,7 @@ impl Configuration {
 				file_path: self.args.arg_file.clone(),
 				format: format,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				compaction: compaction,
 				wal: wal,
 				mode: mode,
@@ -162,6 +164,7 @@ impl Configuration {
 				file_path: self.args.arg_file.clone(),
 				format: format,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				compaction: compaction,
 				wal: wal,
 				mode: mode,
@@ -177,6 +180,7 @@ impl Configuration {
 				dirs: dirs,
 				spec: spec,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				logger_config: logger_config,
 				mode: mode,
 				tracing: tracing,
@@ -194,6 +198,7 @@ impl Configuration {
 				dirs: dirs,
 				spec: spec,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				logger_config: logger_config,
 				mode: mode,
 				tracing: tracing,
@@ -217,6 +222,7 @@ impl Configuration {
 				dirs: dirs,
 				spec: spec,
 				pruning: pruning,
+				pruning_history: pruning_history,
 				daemon: daemon,
 				logger_config: logger_config,
 				miner_options: miner_options,
@@ -721,6 +727,7 @@ mod tests {
 			file_path: Some("blockchain.json".into()),
 			format: Default::default(),
 			pruning: Default::default(),
+			pruning_history: 64,
 			compaction: Default::default(),
 			wal: true,
 			mode: Default::default(),
@@ -741,6 +748,7 @@ mod tests {
 			dirs: Default::default(),
 			file_path: Some("blockchain.json".into()),
 			pruning: Default::default(),
+			pruning_history: 64,
 			format: Default::default(),
 			compaction: Default::default(),
 			wal: true,
@@ -763,6 +771,7 @@ mod tests {
 			dirs: Default::default(),
 			file_path: Some("blockchain.json".into()),
 			pruning: Default::default(),
+			pruning_history: 64,
 			format: Some(DataFormat::Hex),
 			compaction: Default::default(),
 			wal: true,
@@ -791,6 +800,7 @@ mod tests {
 			dirs: Default::default(),
 			spec: Default::default(),
 			pruning: Default::default(),
+			pruning_history: 64,
 			daemon: None,
 			logger_config: Default::default(),
 			miner_options: Default::default(),
@@ -205,6 +205,7 @@ pub fn to_client_config(
 	vm_type: VMType,
 	name: String,
 	pruning: Algorithm,
+	pruning_history: u64,
 ) -> ClientConfig {
 	let mut client_config = ClientConfig::default();
 
@@ -232,6 +233,7 @@ pub fn to_client_config(
 	client_config.tracing.enabled = tracing;
 	client_config.fat_db = fat_db;
 	client_config.pruning = pruning;
+	client_config.history = pruning_history;
 	client_config.db_compaction = compaction;
 	client_config.db_wal = wal;
 	client_config.vm_type = vm_type;
@@ -61,6 +61,7 @@ pub struct RunCmd {
 	pub dirs: Directories,
 	pub spec: SpecType,
 	pub pruning: Pruning,
+	pub pruning_history: u64,
 	/// Some if execution should be daemonized. Contains pid_file path.
 	pub daemon: Option<String>,
 	pub logger_config: LogConfig,
@@ -193,6 +194,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
 		cmd.vm_type,
 		cmd.name,
 		algorithm,
+		cmd.pruning_history,
 	);
 
 	// set up bootnodes
@@ -54,6 +54,7 @@ pub struct SnapshotCommand {
 	pub dirs: Directories,
 	pub spec: SpecType,
 	pub pruning: Pruning,
+	pub pruning_history: u64,
 	pub logger_config: LogConfig,
 	pub mode: Mode,
 	pub tracing: Switch,
@@ -162,7 +163,7 @@ impl SnapshotCommand {
 		try!(execute_upgrades(&db_dirs, algorithm, self.compaction.compaction_profile()));
 
 		// prepare client config
-		let client_config = to_client_config(&self.cache_config, self.mode, tracing, fat_db, self.compaction, self.wal, VMType::default(), "".into(), algorithm);
+		let client_config = to_client_config(&self.cache_config, self.mode, tracing, fat_db, self.compaction, self.wal, VMType::default(), "".into(), algorithm, self.pruning_history);
 
 		let service = try!(ClientService::start(
 			client_config,
@@ -70,6 +70,7 @@ struct JournalOverlay {
 	pending_overlay: H256FastMap<Bytes>, // Nodes being transfered from backing_overlay to backing db
 	journal: HashMap<u64, Vec<JournalEntry>>,
 	latest_era: Option<u64>,
+	earliest_era: Option<u64>,
 }
 
 #[derive(PartialEq)]
@@ -123,7 +124,10 @@ impl OverlayRecentDB {
 	fn can_reconstruct_refs(&self) -> bool {
 		let reconstructed = Self::read_overlay(&self.backing, self.column);
 		let journal_overlay = self.journal_overlay.read();
-		*journal_overlay == reconstructed
+		journal_overlay.backing_overlay == reconstructed.backing_overlay &&
+			journal_overlay.pending_overlay == reconstructed.pending_overlay &&
+			journal_overlay.journal == reconstructed.journal &&
+			journal_overlay.latest_era == reconstructed.latest_era
 	}
 
 	fn payload(&self, key: &H256) -> Option<Bytes> {
@@ -135,6 +139,7 @@ impl OverlayRecentDB {
 		let mut overlay = MemoryDB::new();
 		let mut count = 0;
 		let mut latest_era = None;
+		let mut earliest_era = None;
 		if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
 			let mut era = decode::<u64>(&val);
 			latest_era = Some(era);
@@ -166,6 +171,7 @@ impl OverlayRecentDB {
 					deletions: deletions,
 				});
 				index += 1;
+				earliest_era = Some(era);
 			};
 			if index == 0 || era == 0 {
 				break;
@@ -178,8 +184,11 @@ impl OverlayRecentDB {
 			backing_overlay: overlay,
 			pending_overlay: HashMap::default(),
 			journal: journal,
-			latest_era: latest_era }
+			latest_era: latest_era,
+			earliest_era: earliest_era,
+		}
 	}
 
 }
 
@@ -214,6 +223,8 @@ impl JournalDB for OverlayRecentDB {
 
 	fn latest_era(&self) -> Option<u64> { self.journal_overlay.read().latest_era }
 
+	fn earliest_era(&self) -> Option<u64> { self.journal_overlay.read().earliest_era }
+
 	fn state(&self, key: &H256) -> Option<Bytes> {
 		let journal_overlay = self.journal_overlay.read();
 		let key = to_short_key(key);
@@ -32,6 +32,9 @@ pub trait JournalDB: HashDB {
 	/// Check if this database has any commits
 	fn is_empty(&self) -> bool;
 
+	/// Get the earliest era in the DB. None if there isn't yet any data in there.
+	fn earliest_era(&self) -> Option<u64> { None }
+
 	/// Get the latest era in the DB. None if there isn't yet any data in there.
 	fn latest_era(&self) -> Option<u64>;
 