apply pending changes to chain after DB commit

This commit is contained in:
Robert Habermeier 2017-03-23 04:00:22 +01:00
parent b96eb45877
commit 0d110ed47c
2 changed files with 31 additions and 8 deletions

View File

@ -126,6 +126,11 @@ fn era_key(number: u64) -> String {
format!("candidates_{}", number) format!("candidates_{}", number)
} }
/// Pending changes from `insert` to be applied after the database write has finished.
pub struct PendingChanges {
best_block: Option<BlockDescriptor>, // new best block.
}
/// Header chain. See module docs for more details. /// Header chain. See module docs for more details.
pub struct HeaderChain { pub struct HeaderChain {
genesis_header: encoded::Header, // special-case the genesis. genesis_header: encoded::Header, // special-case the genesis.
@ -203,10 +208,15 @@ impl HeaderChain {
/// Insert a pre-verified header. /// Insert a pre-verified header.
/// ///
/// This blindly trusts that the data given to it is sensible. /// This blindly trusts that the data given to it is sensible.
pub fn insert(&self, transaction: &mut DBTransaction, header: Header) -> Result<(), BlockError> { /// Returns a set of pending changes to be applied with `apply_pending`
/// before the next call to insert and after the transaction has been written.
pub fn insert(&self, transaction: &mut DBTransaction, header: Header) -> Result<PendingChanges, BlockError> {
let hash = header.hash(); let hash = header.hash();
let number = header.number(); let number = header.number();
let parent_hash = *header.parent_hash(); let parent_hash = *header.parent_hash();
let mut pending = PendingChanges {
best_block: None,
};
// hold candidates the whole time to guard import order. // hold candidates the whole time to guard import order.
let mut candidates = self.candidates.write(); let mut candidates = self.candidates.write();
@ -286,11 +296,11 @@ impl HeaderChain {
} }
trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty); trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty);
*self.best_block.write() = BlockDescriptor { pending.best_block = Some(BlockDescriptor {
hash: hash, hash: hash,
number: number, number: number,
total_difficulty: total_difficulty, total_difficulty: total_difficulty,
}; });
// produce next CHT root if it's time. // produce next CHT root if it's time.
let earliest_era = *candidates.keys().next().expect("at least one era just created; qed"); let earliest_era = *candidates.keys().next().expect("at least one era just created; qed");
@ -334,7 +344,15 @@ impl HeaderChain {
stream.append(&best_num).append(&latest_num); stream.append(&best_num).append(&latest_num);
transaction.put(self.col, CURRENT_KEY, &stream.out()) transaction.put(self.col, CURRENT_KEY, &stream.out())
} }
Ok(()) Ok(pending)
}
/// Apply pending changes from a previous `insert` operation.
/// Must be done before the next `insert` call.
pub fn apply_pending(&self, pending: PendingChanges) {
if let Some(best_block) = pending.best_block {
*self.best_block.write() = best_block;
}
} }
/// Get a block header. In the case of query by number, only canonical blocks /// Get a block header. In the case of query by number, only canonical blocks
@ -360,6 +378,9 @@ impl HeaderChain {
.and_then(load_from_db) .and_then(load_from_db)
} }
BlockId::Latest | BlockId::Pending => { BlockId::Latest | BlockId::Pending => {
// hold candidates hear to prevent deletion of the header
// as we read it.
let _candidates = self.candidates.read();
let hash = { let hash = {
let best = self.best_block.read(); let best = self.best_block.read();
if best.number == 0 { if best.number == 0 {

View File

@ -223,18 +223,20 @@ impl Client {
let (num, hash) = (verified_header.number(), verified_header.hash()); let (num, hash) = (verified_header.number(), verified_header.hash());
let mut tx = self.db.transaction(); let mut tx = self.db.transaction();
match self.chain.insert(&mut tx, verified_header) { let pending = match self.chain.insert(&mut tx, verified_header) {
Ok(()) => { Ok(pending) => {
good.push(hash); good.push(hash);
self.report.write().blocks_imported += 1; self.report.write().blocks_imported += 1;
pending
} }
Err(e) => { Err(e) => {
debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e); debug!(target: "client", "Error importing header {:?}: {}", (num, hash), e);
bad.push(hash); bad.push(hash);
break;
} }
} };
self.db.write_buffered(tx); self.db.write_buffered(tx);
self.chain.apply_pending(pending);
if let Err(e) = self.db.flush() { if let Err(e) = self.db.flush() {
panic!("Database flush failed: {}. Check disk health and space.", e); panic!("Database flush failed: {}. Check disk health and space.", e);
} }