Memory-based pruning history size (#4114)

* prune states based on memory param

* pruning memory CLI and usage in sync

* return purged value from memorydb

* calculate memory used incrementally in overlayrecentdb

* refactor shared history pruning code in client

* Fixed usage alignment

* journal_size function for fast memory calculation
Robert Habermeier 2017-01-20 13:25:53 +01:00 committed by Gav Wood
parent 97a60ceab1
commit 203fd8a471
17 changed files with 220 additions and 53 deletions
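
In short: instead of eagerly pruning everything older than a fixed window, the client now keeps journalling states until the journal's in-memory size exceeds a configurable target, while never keeping fewer than `--pruning-history` states. A condensed sketch of the decision (a hypothetical free function; the real check in `Client::prune_ancient` below also requires a pruning-capable journal DB via `is_pruned()`):

fn should_prune(journal_size: usize, history_mem: usize, earliest_era: u64, latest_era: u64, min_history: u64) -> bool {
	// over the memory target...
	journal_size >= history_mem
		// ...and the oldest journalled era is outside the minimum kept history.
		&& earliest_era + min_history <= latest_era
}

fn main() {
	// 200 MB journalled vs. a 150 MB target, eras 1000..=3000, keep >= 1200 states:
	assert!(should_prune(200 << 20, 150 << 20, 1000, 3000, 1200));
	// under the memory target: nothing is pruned, however many states exist.
	assert!(!should_prune(100 << 20, 150 << 20, 1000, 3000, 1200));
	// over the target but only the minimum history remains: nothing is pruned.
	assert!(!should_prune(200 << 20, 150 << 20, 1801, 3000, 1200));
}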

View File

@@ -206,17 +206,6 @@ impl Client {
 			config.history
 		};
-		if let (Some(earliest), Some(latest)) = (state_db.journal_db().earliest_era(), state_db.journal_db().latest_era()) {
-			if latest > earliest && latest - earliest > history {
-				for era in earliest..(latest - history + 1) {
-					trace!("Removing era {}", era);
-					let mut batch = DBTransaction::new(&db);
-					state_db.mark_canonical(&mut batch, era, &chain.block_hash(era).expect("Old block not found in the database"))?;
-					db.write(batch).map_err(ClientError::Database)?;
-				}
-			}
-		}
 		if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) {
 			warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex());
 		}
@@ -257,6 +246,13 @@ impl Client {
 			on_mode_change: Mutex::new(None),
 			registrar: Mutex::new(None),
 		});
+
+		{
+			let state_db = client.state_db.lock().boxed_clone();
+			let chain = client.chain.read();
+			client.prune_ancient(state_db, &chain)?;
+		}
+
 		if let Some(reg_addr) = client.additional_params().get("registrar").and_then(|s| Address::from_str(s).ok()) {
 			trace!(target: "client", "Found registrar at {}", reg_addr);
 			let weak = Arc::downgrade(&client);
@@ -553,16 +549,6 @@ impl Client {
 		let mut state = block.drain();

 		state.journal_under(&mut batch, number, hash).expect("DB commit failed");
-		if number >= self.history {
-			let n = number - self.history;
-			if let Some(ancient_hash) = chain.block_hash(n) {
-				state.mark_canonical(&mut batch, n, &ancient_hash).expect("DB commit failed");
-			} else {
-				debug!(target: "client", "Missing expected hash for block {}", n);
-			}
-		}

 		let route = chain.insert_block(&mut batch, block_data, receipts);
 		self.tracedb.read().import(&mut batch, TraceImportRequest {
 			traces: traces.into(),
@@ -578,9 +564,49 @@ impl Client {
 		self.db.read().write_buffered(batch);
 		chain.commit();
 		self.update_last_hashes(&parent, hash);
+
+		if let Err(e) = self.prune_ancient(state, &chain) {
+			warn!("Failed to prune ancient state data: {}", e);
+		}
+
 		route
 	}

+	// prune ancient states until below the memory limit or only the minimum amount remain.
+	fn prune_ancient(&self, mut state_db: StateDB, chain: &BlockChain) -> Result<(), ClientError> {
+		let number = match state_db.journal_db().latest_era() {
+			Some(n) => n,
+			None => return Ok(()),
+		};
+
+		// prune all ancient eras until we're below the memory target,
+		// but have at least the minimum number of states.
+		loop {
+			let needs_pruning = state_db.journal_db().is_pruned() &&
+				state_db.journal_db().journal_size() >= self.config.history_mem;
+
+			if !needs_pruning { break }
+			match state_db.journal_db().earliest_era() {
+				Some(era) if era + self.history <= number => {
+					trace!(target: "client", "Pruning state for ancient era {}", era);
+					match chain.block_hash(era) {
+						Some(ancient_hash) => {
+							let mut batch = DBTransaction::new(&self.db.read());
+							state_db.mark_canonical(&mut batch, era, &ancient_hash)?;
+							self.db.read().write_buffered(batch);
+							state_db.journal_db().flush();
+						}
+						None =>
+							debug!(target: "client", "Missing expected hash for block {}", era),
+					}
+				}
+				_ => break, // means that every era is kept, no pruning necessary.
+			}
+		}
+
+		Ok(())
+	}
+
 	fn update_last_hashes(&self, parent: &H256, hash: &H256) {
 		let mut hashes = self.last_hashes.write();
 		if hashes.front().map_or(false, |h| h == parent) {
@@ -1408,7 +1434,6 @@ impl BlockChainClient for Client {
 		PruningInfo {
 			earliest_chain: self.chain.read().first_block_number().unwrap_or(1),
 			earliest_state: self.state_db.lock().journal_db().earliest_era().unwrap_or(0),
-			state_history_size: Some(self.history),
 		}
 	}
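
Taken together, these client changes consolidate pruning in one place: the deleted startup catch-up loop and the deleted per-import `mark_canonical` block are both subsumed by the new `prune_ancient`, which is called once when the client is constructed and again after every committed block.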

View File

@@ -129,8 +129,10 @@ pub struct ClientConfig {
 	pub state_cache_size: usize,
 	/// EVM jump-tables cache size.
 	pub jump_table_size: usize,
-	/// State pruning history size.
+	/// Minimum state pruning history size.
 	pub history: u64,
+	/// Ideal memory usage for state pruning history.
+	pub history_mem: usize,
 	/// Check seal valididity on block import
 	pub check_seal: bool,
 }

View File

@@ -714,10 +714,10 @@ impl BlockChainClient for TestBlockChainClient {
 	fn disable(&self) { unimplemented!(); }

 	fn pruning_info(&self) -> PruningInfo {
+		let best_num = self.chain_info().best_block_number;
 		PruningInfo {
 			earliest_chain: 1,
-			earliest_state: 1,
-			state_history_size: *self.history.read(),
+			earliest_state: self.history.read().as_ref().map(|x| best_num - x).unwrap_or(0),
 		}
 	}

View File

@@ -28,6 +28,4 @@ pub struct PruningInfo {
 	pub earliest_chain: u64,
 	/// The first block where state requests may be served.
 	pub earliest_state: u64,
-	/// State pruning history size.
-	pub state_history_size: Option<u64>,
 }

View File

@@ -85,6 +85,7 @@ pub struct ImportBlockchain {
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub tracing: Switch,
@@ -104,6 +105,7 @@ pub struct ExportBlockchain {
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub fat_db: Switch,
@@ -122,6 +124,7 @@ pub struct ExportState {
 	pub format: Option<DataFormat>,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	pub compaction: DatabaseCompactionProfile,
 	pub wal: bool,
 	pub fat_db: Switch,
@@ -196,6 +199,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
 		"".into(),
 		algorithm,
 		cmd.pruning_history,
+		cmd.pruning_memory,
 		cmd.check_seal
 	);
@@ -310,6 +314,7 @@ fn start_client(
 	spec: SpecType,
 	pruning: Pruning,
 	pruning_history: u64,
+	pruning_memory: usize,
 	tracing: Switch,
 	fat_db: Switch,
 	compaction: DatabaseCompactionProfile,
@@ -354,7 +359,20 @@ fn start_client(
 	dirs.create_dirs(false, false)?;

 	// prepare client config
-	let client_config = to_client_config(&cache_config, Mode::Active, tracing, fat_db, compaction, wal, VMType::default(), "".into(), algorithm, pruning_history, true);
+	let client_config = to_client_config(
+		&cache_config,
+		Mode::Active,
+		tracing,
+		fat_db,
+		compaction,
+		wal,
+		VMType::default(),
+		"".into(),
+		algorithm,
+		pruning_history,
+		pruning_memory,
+		true,
+	);

 	let service = ClientService::start(
 		client_config,
@@ -371,7 +389,18 @@ fn start_client(
 fn execute_export(cmd: ExportBlockchain) -> Result<(), String> {
 	// Setup panic handler
-	let service = start_client(cmd.dirs, cmd.spec, cmd.pruning, cmd.pruning_history, cmd.tracing, cmd.fat_db, cmd.compaction, cmd.wal, cmd.cache_config)?;
+	let service = start_client(
+		cmd.dirs,
+		cmd.spec,
+		cmd.pruning,
+		cmd.pruning_history,
+		cmd.pruning_memory,
+		cmd.tracing,
+		cmd.fat_db,
+		cmd.compaction,
+		cmd.wal,
+		cmd.cache_config
+	)?;
 	let panic_handler = PanicHandler::new_in_arc();
 	let format = cmd.format.unwrap_or_default();
@@ -403,7 +432,19 @@ fn execute_export(cmd: ExportBlockchain) -> Result<(), String> {
 fn execute_export_state(cmd: ExportState) -> Result<(), String> {
 	// Setup panic handler
-	let service = start_client(cmd.dirs, cmd.spec, cmd.pruning, cmd.pruning_history, cmd.tracing, cmd.fat_db, cmd.compaction, cmd.wal, cmd.cache_config)?;
+	let service = start_client(
+		cmd.dirs,
+		cmd.spec,
+		cmd.pruning,
+		cmd.pruning_history,
+		cmd.pruning_memory,
+		cmd.tracing,
+		cmd.fat_db,
+		cmd.compaction,
+		cmd.wal,
+		cmd.cache_config
+	)?;
 	let panic_handler = PanicHandler::new_in_arc();

 	panic_handler.forward_from(&service);

View File

@@ -95,6 +95,7 @@ notify_work = ["http://localhost:3001"]
 tracing = "auto"
 pruning = "auto"
 pruning_history = 1200
+pruning_memory = 500
 cache_size_db = 64
 cache_size_blocks = 8
 cache_size_queue = 50

View File

@@ -238,6 +238,8 @@ usage! {
 			or |c: &Config| otry!(c.footprint).pruning.clone(),
 		flag_pruning_history: u64 = 1200u64,
 			or |c: &Config| otry!(c.footprint).pruning_history.clone(),
+		flag_pruning_memory: usize = 150usize,
+			or |c: &Config| otry!(c.footprint).pruning_memory.clone(),
 		flag_cache_size_db: u32 = 64u32,
 			or |c: &Config| otry!(c.footprint).cache_size_db.clone(),
 		flag_cache_size_blocks: u32 = 8u32,
@@ -421,6 +423,7 @@ struct Footprint {
 	tracing: Option<String>,
 	pruning: Option<String>,
 	pruning_history: Option<u64>,
+	pruning_memory: Option<usize>,
 	fast_and_loose: Option<bool>,
 	cache_size: Option<u32>,
 	cache_size_db: Option<u32>,
@@ -635,6 +638,7 @@ mod tests {
 			flag_tracing: "auto".into(),
 			flag_pruning: "auto".into(),
 			flag_pruning_history: 1200u64,
+			flag_pruning_memory: 500usize,
 			flag_cache_size_db: 64u32,
 			flag_cache_size_blocks: 8u32,
 			flag_cache_size_queue: 50u32,
@@ -812,6 +816,7 @@ mod tests {
 				tracing: Some("on".into()),
 				pruning: Some("fast".into()),
 				pruning_history: Some(64),
+				pruning_memory: None,
 				fast_and_loose: None,
 				cache_size: None,
 				cache_size_db: Some(128),

View File

@@ -271,8 +271,12 @@ Footprint Options:
                                  fast - maintain journal overlay. Fast but 50MB used.
                                  auto - use the method most recently synced or
                                  default to fast if none synced (default: {flag_pruning}).
-  --pruning-history NUM          Set a number of recent states to keep when pruning
+  --pruning-history NUM          Set a minimum number of recent states to keep when pruning
                                  is active. (default: {flag_pruning_history}).
+  --pruning-memory MB            The ideal amount of memory in megabytes to use to store
+                                 recent states. As many states as possible will be kept
+                                 within this limit, and at least --pruning-history states
+                                 will always be kept. (default: {flag_pruning_memory})
  --cache-size-db MB             Override database cache size (default: {flag_cache_size_db}).
  --cache-size-blocks MB         Specify the prefered size of the blockchain cache in
                                 megabytes (default: {flag_cache_size_blocks}).
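
For example (illustrative invocation; the binary name is assumed): `parity --pruning fast --pruning-history 1200 --pruning-memory 500` keeps at least 1200 recent states, and as many more as fit in roughly 500 MB of journal memory.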

View File

@@ -214,6 +214,7 @@ impl Configuration {
 				format: format,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				compaction: compaction,
 				wal: wal,
 				tracing: tracing,
@@ -234,6 +235,7 @@ impl Configuration {
 				format: format,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				compaction: compaction,
 				wal: wal,
 				tracing: tracing,
@@ -252,6 +254,7 @@ impl Configuration {
 				format: format,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				compaction: compaction,
 				wal: wal,
 				tracing: tracing,
@@ -273,6 +276,7 @@ impl Configuration {
 				spec: spec,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				tracing: tracing,
 				fat_db: fat_db,
 				compaction: compaction,
@@ -289,6 +293,7 @@ impl Configuration {
 				spec: spec,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				tracing: tracing,
 				fat_db: fat_db,
 				compaction: compaction,
@@ -313,6 +318,7 @@ impl Configuration {
 				spec: spec,
 				pruning: pruning,
 				pruning_history: pruning_history,
+				pruning_memory: self.args.flag_pruning_memory,
 				daemon: daemon,
 				logger_config: logger_config.clone(),
 				miner_options: miner_options,
@@ -943,6 +949,7 @@ mod tests {
 			format: Default::default(),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			compaction: Default::default(),
 			wal: true,
 			tracing: Default::default(),
@@ -965,6 +972,7 @@ mod tests {
 			file_path: Some("blockchain.json".into()),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			format: Default::default(),
 			compaction: Default::default(),
 			wal: true,
@@ -987,6 +995,7 @@ mod tests {
 			file_path: Some("state.json".into()),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			format: Default::default(),
 			compaction: Default::default(),
 			wal: true,
@@ -1011,6 +1020,7 @@ mod tests {
 			file_path: Some("blockchain.json".into()),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			format: Some(DataFormat::Hex),
 			compaction: Default::default(),
 			wal: true,
@@ -1046,6 +1056,7 @@ mod tests {
 			spec: Default::default(),
 			pruning: Default::default(),
 			pruning_history: 1200,
+			pruning_memory: 150,
 			daemon: None,
 			logger_config: Default::default(),
 			miner_options: Default::default(),

View File

@@ -224,6 +224,7 @@ pub fn to_client_config(
 	name: String,
 	pruning: Algorithm,
 	pruning_history: u64,
+	pruning_memory: usize,
 	check_seal: bool,
 ) -> ClientConfig {
 	let mut client_config = ClientConfig::default();
@@ -247,6 +248,8 @@ pub fn to_client_config(
 	client_config.state_cache_size = cache_config.state() as usize * mb;
 	// in bytes
 	client_config.jump_table_size = cache_config.jump_tables() as usize * mb;
+	// in bytes
+	client_config.history_mem = pruning_memory * mb;
 	client_config.mode = mode;
 	client_config.tracing.enabled = tracing;

View File

@@ -68,6 +68,7 @@ pub struct RunCmd {
 	pub spec: SpecType,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	/// Some if execution should be daemonized. Contains pid_file path.
 	pub daemon: Option<String>,
 	pub logger_config: LogConfig,
@@ -273,6 +274,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
 		cmd.name,
 		algorithm,
 		cmd.pruning_history,
+		cmd.pruning_memory,
 		cmd.check_seal,
 	);

View File

@@ -54,6 +54,7 @@ pub struct SnapshotCommand {
 	pub spec: SpecType,
 	pub pruning: Pruning,
 	pub pruning_history: u64,
+	pub pruning_memory: usize,
 	pub tracing: Switch,
 	pub fat_db: Switch,
 	pub compaction: DatabaseCompactionProfile,
@@ -170,7 +171,20 @@ impl SnapshotCommand {
 		execute_upgrades(&self.dirs.base, &db_dirs, algorithm, self.compaction.compaction_profile(db_dirs.db_root_path().as_path()))?;

 		// prepare client config
-		let client_config = to_client_config(&self.cache_config, Mode::Active, tracing, fat_db, self.compaction, self.wal, VMType::default(), "".into(), algorithm, self.pruning_history, true);
+		let client_config = to_client_config(
+			&self.cache_config,
+			Mode::Active,
+			tracing,
+			fat_db,
+			self.compaction,
+			self.wal,
+			VMType::default(),
+			"".into(),
+			algorithm,
+			self.pruning_history,
+			self.pruning_memory,
+			true
+		);

 		let service = ClientService::start(
 			client_config,

View File

@@ -103,15 +103,16 @@ pub struct BlockDownloader {
 	download_receipts: bool,
 	/// Sync up to the block with this hash.
 	target_hash: Option<H256>,
-	/// Reorganize up to this many blocks. Up to genesis if `None`,
-	max_reorg_blocks: Option<BlockNumber>,
 	/// Probing range for seeking common best block.
 	retract_step: u64,
+	/// Whether reorg should be limited.
+	limit_reorg: bool,
 }

 impl BlockDownloader {
-	/// Create a new instance of syncing strategy.
-	pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber, max_reorg: Option<BlockNumber>) -> BlockDownloader {
+	/// Create a new instance of syncing strategy. This won't reorganize to before the
+	/// last kept state.
+	pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self {
 		BlockDownloader {
 			state: State::Idle,
 			highest_block: None,
@@ -124,8 +125,27 @@ impl BlockDownloader {
 			round_parents: VecDeque::new(),
 			download_receipts: sync_receipts,
 			target_hash: None,
-			max_reorg_blocks: max_reorg,
 			retract_step: 1,
+			limit_reorg: true,
+		}
+	}
+
+	/// Create a new instance of sync with unlimited reorg allowed.
+	pub fn with_unlimited_reorg(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self {
+		BlockDownloader {
+			state: State::Idle,
+			highest_block: None,
+			last_imported_block: start_number,
+			last_imported_hash: start_hash.clone(),
+			last_round_start: start_number,
+			last_round_start_hash: start_hash.clone(),
+			blocks: BlockCollection::new(sync_receipts),
+			imported_this_round: None,
+			round_parents: VecDeque::new(),
+			download_receipts: sync_receipts,
+			target_hash: None,
+			retract_step: 1,
+			limit_reorg: false,
 		}
 	}
@@ -268,7 +288,9 @@ impl BlockDownloader {
 					return Ok(DownloadAction::Reset);
 				} else {
 					let best = io.chain().chain_info().best_block_number;
-					if best > self.last_imported_block && (self.last_imported_block == 0 || best - self.last_imported_block > self.max_reorg_blocks.unwrap_or(u64::max_value())) {
+					let oldest_reorg = io.chain().pruning_info().earliest_state;
+					let last = self.last_imported_block;
+					if self.limit_reorg && best > last && (last == 0 || last < oldest_reorg) {
 						trace!(target: "sync", "No common block, disabling peer");
 						return Err(BlockDownloaderImportError::Invalid);
 					}
@@ -359,7 +381,8 @@ impl BlockDownloader {
 			trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
 		} else {
 			let best = io.chain().chain_info().best_block_number;
-			if best > start && (start == 0 || best - start > self.max_reorg_blocks.unwrap_or(u64::max_value())) {
+			let oldest_reorg = io.chain().pruning_info().earliest_state;
+			if self.limit_reorg && best > start && start < oldest_reorg {
 				debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
 				self.reset();
 			} else {
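
The `limit_reorg` flag replaces the old numeric window with a direct question to the client: how far back is state still available? A minimal sketch of the new retraction check (a hypothetical helper; `oldest_reorg` corresponds to `pruning_info().earliest_state` above):

fn can_retract(limit_reorg: bool, start: u64, oldest_reorg: u64) -> bool {
	// a limited downloader may not retract past the earliest retained state;
	// an unlimited one (used for ancient-block sync) may go back to genesis.
	!limit_reorg || start >= oldest_reorg
}

fn main() {
	// earliest retained state is block 9000; retracting to 8500 is refused:
	assert!(!can_retract(true, 8500, 9000));
	// the ancient-block downloader ignores the limit:
	assert!(can_retract(false, 8500, 9000));
}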

View File

@@ -384,7 +384,6 @@ impl ChainSync {
 	/// Create a new instance of syncing strategy.
 	pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
 		let chain_info = chain.chain_info();
-		let pruning = chain.pruning_info();
 		let mut sync = ChainSync {
 			state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
 			starting_block: chain.chain_info().best_block_number,
@@ -392,7 +391,7 @@ impl ChainSync {
 			peers: HashMap::new(),
 			handshaking_peers: HashMap::new(),
 			active_peers: HashSet::new(),
-			new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number, pruning.state_history_size),
+			new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
 			old_blocks: None,
 			last_sent_block_number: 0,
 			network_id: config.network_id,
@@ -567,15 +566,14 @@ impl ChainSync {
 	/// Update sync after the blockchain has been changed externally.
 	pub fn update_targets(&mut self, chain: &BlockChainClient) {
 		// Do not assume that the block queue/chain still has our last_imported_block
-		let pruning = chain.pruning_info();
 		let chain = chain.chain_info();
-		self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number, pruning.state_history_size);
+		self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
 		self.old_blocks = None;
 		if self.download_old_blocks {
 			if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) {
 				trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number);
-				let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number, None);
+				let mut downloader = BlockDownloader::with_unlimited_reorg(true, &ancient_block_hash, ancient_block_number);
 				if let Some(hash) = chain.first_block_hash {
 					trace!(target: "sync", "Downloader target set to {:?}", hash);
 					downloader.set_target(&hash);

View File

@@ -71,6 +71,7 @@ struct JournalOverlay {
 	journal: HashMap<u64, Vec<JournalEntry>>,
 	latest_era: Option<u64>,
 	earliest_era: Option<u64>,
+	cumulative_size: usize, // cumulative size of all entries.
 }

 #[derive(PartialEq)]
@@ -127,7 +128,8 @@ impl OverlayRecentDB {
 		journal_overlay.backing_overlay == reconstructed.backing_overlay &&
 		journal_overlay.pending_overlay == reconstructed.pending_overlay &&
 		journal_overlay.journal == reconstructed.journal &&
-		journal_overlay.latest_era == reconstructed.latest_era
+		journal_overlay.latest_era == reconstructed.latest_era &&
+		journal_overlay.cumulative_size == reconstructed.cumulative_size
 	}

 	fn payload(&self, key: &H256) -> Option<DBValue> {
@@ -140,6 +142,7 @@ impl OverlayRecentDB {
 		let mut count = 0;
 		let mut latest_era = None;
 		let mut earliest_era = None;
+		let mut cumulative_size = 0;
 		if let Some(val) = db.get(col, &LATEST_ERA_KEY).expect("Low-level database error.") {
 			let mut era = decode::<u64>(&val);
 			latest_era = Some(era);
@@ -161,7 +164,14 @@ impl OverlayRecentDB {
 				for r in insertions.iter() {
 					let k: H256 = r.val_at(0);
 					let v = r.at(1).data();
-					overlay.emplace(to_short_key(&k), DBValue::from_slice(v));
+
+					let short_key = to_short_key(&k);
+					if !overlay.contains(&short_key) {
+						cumulative_size += v.len();
+					}
+
+					overlay.emplace(short_key, DBValue::from_slice(v));
 					inserted_keys.push(k);
 					count += 1;
 				}
@@ -186,6 +196,7 @@ impl OverlayRecentDB {
 			journal: journal,
 			latest_era: latest_era,
 			earliest_era: earliest_era,
+			cumulative_size: cumulative_size,
 		}
 	}
@@ -207,12 +218,19 @@ impl JournalDB for OverlayRecentDB {
 	fn mem_used(&self) -> usize {
 		let mut mem = self.transaction_overlay.mem_used();
 		let overlay = self.journal_overlay.read();

 		mem += overlay.backing_overlay.mem_used();
 		mem += overlay.pending_overlay.heap_size_of_children();
 		mem += overlay.journal.heap_size_of_children();
 		mem
 	}

+	fn journal_size(&self) -> usize {
+		self.journal_overlay.read().cumulative_size
+	}
+
 	fn is_empty(&self) -> bool {
 		self.backing.get(self.column, &LATEST_ERA_KEY).expect("Low level database error").is_none()
 	}
@@ -256,7 +274,13 @@ impl JournalDB for OverlayRecentDB {
 				r.begin_list(2);
 				r.append(&k);
 				r.append(&&*v);
-				journal_overlay.backing_overlay.emplace(to_short_key(&k), v);
+
+				let short_key = to_short_key(&k);
+				if !journal_overlay.backing_overlay.contains(&short_key) {
+					journal_overlay.cumulative_size += v.len();
+				}
+
+				journal_overlay.backing_overlay.emplace(short_key, v);
 			}
 			r.append(&removed_keys);
@@ -267,6 +291,7 @@ impl JournalDB for OverlayRecentDB {
 			k.append(&&PADDING[..]);
 			batch.put_vec(self.column, &k.drain(), r.out());
 			if journal_overlay.latest_era.map_or(true, |e| now > e) {
+				trace!(target: "journaldb", "Set latest era to {}", now);
 				batch.put_vec(self.column, &LATEST_ERA_KEY, encode(&now).to_vec());
 				journal_overlay.latest_era = Some(now);
 			}
@@ -322,7 +347,9 @@ impl JournalDB for OverlayRecentDB {
 			}
 			// update the overlay
 			for k in overlay_deletions {
-				journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k));
+				if let Some(val) = journal_overlay.backing_overlay.remove_and_purge(&to_short_key(&k)) {
+					journal_overlay.cumulative_size -= val.len();
+				}
 			}
 			// apply canon deletions
 			for k in canon_deletions {
@@ -332,6 +359,10 @@ impl JournalDB for OverlayRecentDB {
 				}
 			}
 			journal_overlay.journal.remove(&end_era);
+
+			if !journal_overlay.journal.is_empty() {
+				trace!(target: "journaldb", "Set earliest_era to {}", end_era + 1);
+				journal_overlay.earliest_era = Some(end_era + 1);
+			}
 		Ok(ops as u32)
 	}

View File

@@ -29,6 +29,11 @@ pub trait JournalDB: HashDB {
 	/// Returns heap memory size used
 	fn mem_used(&self) -> usize;

+	/// Returns the size of journalled state in memory.
+	/// This function has a considerable speed requirement --
+	/// it must be fast enough to call several times per block imported.
+	fn journal_size(&self) -> usize { 0 }
+
 	/// Check if this database has any commits
 	fn is_empty(&self) -> bool;
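
The trait method defaults to `0`, so backends without an in-memory journal are simply never memory-pruned, while `OverlayRecentDB` answers in O(1) from the `cumulative_size` counter maintained above. A self-contained sketch of that incremental accounting pattern (hypothetical simplified types, no reference counting; it assumes a re-inserted key carries a value of the same length, as journal entries do):

use std::collections::HashMap;

struct Overlay {
	data: HashMap<u64, Vec<u8>>,
	cumulative_size: usize, // running byte count of stored values
}

impl Overlay {
	fn emplace(&mut self, key: u64, value: Vec<u8>) {
		// only count the bytes if the key wasn't already present
		if !self.data.contains_key(&key) {
			self.cumulative_size += value.len();
		}
		self.data.insert(key, value);
	}

	fn remove_and_purge(&mut self, key: u64) -> Option<Vec<u8>> {
		let old = self.data.remove(&key);
		if let Some(ref v) = old {
			self.cumulative_size -= v.len();
		}
		old
	}

	// O(1): cheap enough to call on every block import.
	fn journal_size(&self) -> usize {
		self.cumulative_size
	}
}

fn main() {
	let mut o = Overlay { data: HashMap::new(), cumulative_size: 0 };
	o.emplace(1, vec![0u8; 100]);
	o.emplace(1, vec![0u8; 100]); // duplicate insert is not double-counted
	assert_eq!(o.journal_size(), 100);
	o.remove_and_purge(1);
	assert_eq!(o.journal_size(), 0);
}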

View File

@@ -133,19 +133,22 @@ impl MemoryDB {
 	}

 	/// Remove an element and delete it from storage if reference count reaches zero.
-	pub fn remove_and_purge(&mut self, key: &H256) {
+	/// If the value was purged, return the old value.
+	pub fn remove_and_purge(&mut self, key: &H256) -> Option<DBValue> {
 		if key == &SHA3_NULL_RLP {
-			return;
+			return None;
 		}
 		match self.data.entry(key.clone()) {
 			Entry::Occupied(mut entry) =>
 				if entry.get().1 == 1 {
-					entry.remove();
+					Some(entry.remove().0)
 				} else {
 					entry.get_mut().1 -= 1;
+					None
 				},
 			Entry::Vacant(entry) => {
 				entry.insert((DBValue::new(), -1));
+				None
 			}
 		}
 	}
@@ -265,13 +268,14 @@ fn memorydb_remove_and_purge() {
 	assert_eq!(m.raw(&hello_key), None);

 	let mut m = MemoryDB::new();
-	m.remove_and_purge(&hello_key);
+	assert!(m.remove_and_purge(&hello_key).is_none());
 	assert_eq!(m.raw(&hello_key).unwrap().1, -1);
 	m.insert(hello_bytes);
 	m.insert(hello_bytes);
 	assert_eq!(m.raw(&hello_key).unwrap().1, 1);
-	m.remove_and_purge(&hello_key);
+	assert_eq!(&*m.remove_and_purge(&hello_key).unwrap(), hello_bytes);
 	assert_eq!(m.raw(&hello_key), None);
+	assert!(m.remove_and_purge(&hello_key).is_none());
 }

 #[test]