From d013a13be66c0e259c069771b083b12a6832d827 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 21 Mar 2017 19:45:52 +0100 Subject: [PATCH] header_chain writes to database --- ethcore/light/src/client/header_chain.rs | 236 +++++++++++++++++++---- ethcore/light/src/client/mod.rs | 9 +- 2 files changed, 207 insertions(+), 38 deletions(-) diff --git a/ethcore/light/src/client/header_chain.rs b/ethcore/light/src/client/header_chain.rs index 9dcd25888..676142b17 100644 --- a/ethcore/light/src/client/header_chain.rs +++ b/ethcore/light/src/client/header_chain.rs @@ -23,9 +23,9 @@ //! This is separate from the `BlockChain` for two reasons: //! - It stores only headers (and a pruned subset of them) //! - To allow for flexibility in the database layout once that's incorporated. -// TODO: use DB instead of memory. DB Layout: just the contents of `candidates`/`headers` use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; use cht; @@ -34,7 +34,10 @@ use ethcore::error::BlockError; use ethcore::encoded; use ethcore::header::Header; use ethcore::ids::BlockId; -use util::{H256, U256, HeapSizeOf, Mutex, RwLock}; + +use rlp::{Encodable, Decodable, Decoder, DecoderError, RlpStream, View}; +use util::{H256, U256, HeapSizeOf, RwLock}; +use util::kvdb::{DBTransaction, KeyValueDB}; use smallvec::SmallVec; @@ -43,6 +46,9 @@ use smallvec::SmallVec; /// relevant to any blocks we've got in memory. const HISTORY: u64 = 2048; +/// The best block key. Maps to a `u64` best block number. +const BEST_KEY: &'static [u8] = &*b"best_block_key"; + /// Information about a block. #[derive(Debug, Clone)] pub struct BlockDescriptor { @@ -75,39 +81,130 @@ impl HeapSizeOf for Entry { } } +impl Encodable for Entry { + fn rlp_append(&self, s: &mut RlpStream) { + s.begin_list(self.candidates.len()); + + for candidate in &self.candidates { + s.begin_list(3) + .append(&candidate.hash) + .append(&candidate.parent_hash) + .append(&candidate.total_difficulty); + } + } +} + +impl Decodable for Entry { + fn decode(decoder: &D) -> Result { + let rlp = decoder.as_rlp(); + + let mut candidates = SmallVec::<[Candidate; 3]>::new(); + + for item in rlp.iter() { + candidates.push(Candidate { + hash: item.val_at(0)?, + parent_hash: item.val_at(1)?, + total_difficulty: item.val_at(2)?, + }) + } + + if candidates.is_empty() { return Err(DecoderError::Custom("Empty candidates vector submitted.")) } + + // rely on the invariant that the canonical entry is always first. + let canon_hash = candidates[0].hash; + Ok(Entry { + candidates: candidates, + canonical_hash: canon_hash, + }) + } +} + +fn cht_key(number: u64) -> String { + format!("canonical_{}", number) +} + +fn era_key(number: u64) -> String { + format!("candidates_{}", number) +} + /// Header chain. See module docs for more details. pub struct HeaderChain { genesis_header: encoded::Header, // special-case the genesis. candidates: RwLock>, headers: RwLock>, best_block: RwLock, - cht_roots: Mutex>, + db: Arc, + col: Option, } impl HeaderChain { - /// Create a new header chain given this genesis block. - pub fn new(genesis: &[u8]) -> Self { + /// Create a new header chain given this genesis block and database to read from. + pub fn new(db: Arc, col: Option, genesis: &[u8]) -> Result { use ethcore::views::HeaderView; - let g_view = HeaderView::new(genesis); + let chain = if let Some(best_number) = db.get(col, BEST_KEY)?.map(|x| ::rlp::decode(&x)) { + let mut cur_number = best_number; + let mut candidates = BTreeMap::new(); + let mut headers = HashMap::new(); - HeaderChain { - genesis_header: encoded::Header::new(genesis.to_owned()), - best_block: RwLock::new(BlockDescriptor { - hash: g_view.hash(), - number: 0, - total_difficulty: g_view.difficulty(), - }), - candidates: RwLock::new(BTreeMap::new()), - headers: RwLock::new(HashMap::new()), - cht_roots: Mutex::new(Vec::new()), - } + // load all era entries and referenced headers within them. + while let Some(entry) = db.get(col, era_key(cur_number).as_bytes())? { + let entry: Entry = ::rlp::decode(&entry); + for candidate in &entry.candidates { + match db.get(col, &*candidate.hash)? { + Some(hdr) => headers.insert(candidate.hash, encoded::Header::new(hdr.to_vec())), + None => return Err(format!("Database missing referenced header: {}", candidate.hash)), + }; + } + candidates.insert(cur_number, entry); + + cur_number -= 1; + } + + // fill best block block descriptor. + if candidates.is_empty() { return Err(format!("Database corrupt: best block referenced but no data.")) } + let best_block = { + let era = candidates.get(&best_number) + .expect("candidates non-empty; filled in loop starting at best_number; qed"); + let best = &era.candidates[0]; + BlockDescriptor { + hash: best.hash, + number: best_number, + total_difficulty: best.total_difficulty, + } + }; + + HeaderChain { + genesis_header: encoded::Header::new(genesis.to_owned()), + best_block: RwLock::new(best_block), + candidates: RwLock::new(candidates), + headers: RwLock::new(headers), + db: db, + col: col, + } + } else { + let g_view = HeaderView::new(genesis); + HeaderChain { + genesis_header: encoded::Header::new(genesis.to_owned()), + best_block: RwLock::new(BlockDescriptor { + hash: g_view.hash(), + number: 0, + total_difficulty: g_view.difficulty(), + }), + candidates: RwLock::new(BTreeMap::new()), + headers: RwLock::new(HashMap::new()), + db: db, + col: col, + } + }; + + Ok(chain) } /// Insert a pre-verified header. /// /// This blindly trusts that the data given to it is sensible. - pub fn insert(&self, header: Header) -> Result<(), BlockError> { + pub fn insert(&self, transaction: &mut DBTransaction, header: Header) -> Result<(), BlockError> { let hash = header.hash(); let number = header.number(); let parent_hash = *header.parent_hash(); @@ -129,15 +226,19 @@ impl HeaderChain { let total_difficulty = parent_td + *header.difficulty(); // insert headers and candidates entries. - candidates.entry(number).or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash }) - .candidates.push(Candidate { + { + let cur_era = candidates.entry(number) + .or_insert_with(|| Entry { candidates: SmallVec::new(), canonical_hash: hash }); + cur_era.candidates.push(Candidate { hash: hash, parent_hash: parent_hash, total_difficulty: total_difficulty, - }); + }); + } - let raw = ::rlp::encode(&header).to_vec(); - self.headers.write().insert(hash, encoded::Header::new(raw)); + let raw = ::rlp::encode(&header); + transaction.put(self.col, &hash[..], &*raw); + self.headers.write().insert(hash, encoded::Header::new(raw.to_vec())); // reorganize ancestors so canonical entries are first in their // respective candidates vectors. @@ -160,6 +261,10 @@ impl HeaderChain { // what about reorgs > cht::SIZE + HISTORY? // resetting to the last block of a given CHT should be possible. canon_hash = entry.candidates[0].parent_hash; + + // write altered era to disk. + let rlp_era = ::rlp::encode(&*entry); + transaction.put(self.col, era_key(height).as_bytes(), &rlp_era); } trace!(target: "chain", "New best block: ({}, {}), TD {}", number, hash, total_difficulty); @@ -168,13 +273,13 @@ impl HeaderChain { number: number, total_difficulty: total_difficulty, }; + transaction.put(self.col, BEST_KEY, &*::rlp::encode(&number)); // produce next CHT root if it's time. let earliest_era = *candidates.keys().next().expect("at least one era just created; qed"); if earliest_era + HISTORY + cht::SIZE <= number { let cht_num = cht::block_to_cht_number(earliest_era) .expect("fails only for number == 0; genesis never imported; qed"); - debug_assert_eq!(cht_num as usize, self.cht_roots.lock().len()); let mut headers = self.headers.write(); @@ -186,10 +291,13 @@ impl HeaderChain { let iter = || { let era_entry = candidates.remove(&i) .expect("all eras are sequential with no gaps; qed"); + transaction.delete(self.col, era_key(i).as_bytes()); + i += 1; for ancient in &era_entry.candidates { headers.remove(&ancient.hash); + transaction.delete(self.col, &ancient.hash); } let canon = &era_entry.candidates[0]; @@ -199,9 +307,9 @@ impl HeaderChain { .expect("fails only when too few items; this is checked; qed") }; + // write the CHT root to the database. debug!(target: "chain", "Produced CHT {} root: {:?}", cht_num, cht_root); - - self.cht_roots.lock().push(cht_root); + transaction.put(self.col, cht_key(cht_num).as_bytes(), &::rlp::encode(&cht_root)); } } @@ -257,7 +365,13 @@ impl HeaderChain { /// This is because it's assumed that the genesis hash is known, /// so including it within a CHT would be redundant. pub fn cht_root(&self, n: usize) -> Option { - self.cht_roots.lock().get(n).map(|h| h.clone()) + match self.db.get(self.col, cht_key(n as u64).as_bytes()) { + Ok(val) => val.map(|x| ::rlp::decode(&x)), + Err(e) => { + warn!(target: "chain", "Error reading from database: {}", e); + None + } + } } /// Get the genesis hash. @@ -297,8 +411,7 @@ impl HeaderChain { impl HeapSizeOf for HeaderChain { fn heap_size_of_children(&self) -> usize { self.candidates.read().heap_size_of_children() + - self.headers.read().heap_size_of_children() + - self.cht_roots.lock().heap_size_of_children() + self.headers.read().heap_size_of_children() } } @@ -324,16 +437,23 @@ impl<'a> Iterator for AncestryIter<'a> { #[cfg(test)] mod tests { use super::HeaderChain; + use std::sync::Arc; + use ethcore::ids::BlockId; use ethcore::header::Header; use ethcore::spec::Spec; + fn make_db() -> Arc<::util::KeyValueDB> { + Arc::new(::util::kvdb::in_memory(0)) + } + #[test] fn basic_chain() { let spec = Spec::new_test(); let genesis_header = spec.genesis_header(); + let db = make_db(); - let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); let mut parent_hash = genesis_header.hash(); let mut rolling_timestamp = genesis_header.timestamp(); @@ -345,7 +465,9 @@ mod tests { header.set_difficulty(*genesis_header.difficulty() * i.into()); parent_hash = header.hash(); - chain.insert(header).unwrap(); + let mut tx = db.transaction(); + chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); rolling_timestamp += 10; } @@ -361,7 +483,8 @@ mod tests { let spec = Spec::new_test(); let genesis_header = spec.genesis_header(); - let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + let db = make_db(); + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); let mut parent_hash = genesis_header.hash(); let mut rolling_timestamp = genesis_header.timestamp(); @@ -373,7 +496,9 @@ mod tests { header.set_difficulty(*genesis_header.difficulty() * i.into()); parent_hash = header.hash(); - chain.insert(header).unwrap(); + let mut tx = db.transaction(); + chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); rolling_timestamp += 10; } @@ -389,7 +514,9 @@ mod tests { header.set_difficulty(*genesis_header.difficulty() * i.into()); parent_hash = header.hash(); - chain.insert(header).unwrap(); + let mut tx = db.transaction(); + chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); rolling_timestamp += 10; } @@ -410,7 +537,9 @@ mod tests { header.set_difficulty(*genesis_header.difficulty() * (i * i).into()); parent_hash = header.hash(); - chain.insert(header).unwrap(); + let mut tx = db.transaction(); + chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); rolling_timestamp += 11; } @@ -432,11 +561,46 @@ mod tests { fn earliest_is_latest() { let spec = Spec::new_test(); let genesis_header = spec.genesis_header(); + let db = make_db(); - let chain = HeaderChain::new(&::rlp::encode(&genesis_header)); + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); assert!(chain.block_header(BlockId::Earliest).is_some()); assert!(chain.block_header(BlockId::Latest).is_some()); assert!(chain.block_header(BlockId::Pending).is_some()); } + + #[test] + fn restore_from_db() { + let spec = Spec::new_test(); + let genesis_header = spec.genesis_header(); + let db = make_db(); + + { + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); + let mut parent_hash = genesis_header.hash(); + let mut rolling_timestamp = genesis_header.timestamp(); + for i in 1..10000 { + let mut header = Header::new(); + header.set_parent_hash(parent_hash); + header.set_number(i); + header.set_timestamp(rolling_timestamp); + header.set_difficulty(*genesis_header.difficulty() * i.into()); + parent_hash = header.hash(); + + let mut tx = db.transaction(); + chain.insert(&mut tx, header).unwrap(); + db.write(tx).unwrap(); + + rolling_timestamp += 10; + } + } + + let chain = HeaderChain::new(db.clone(), None, &::rlp::encode(&genesis_header)).unwrap(); + assert!(chain.block_header(BlockId::Number(10)).is_none()); + assert!(chain.block_header(BlockId::Number(9000)).is_some()); + assert!(chain.cht_root(2).is_some()); + assert!(chain.cht_root(3).is_none()); + assert_eq!(chain.block_header(BlockId::Latest).unwrap().number(), 9999); + } } diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index c791caed1..23242f407 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -111,10 +111,14 @@ pub struct Client { impl Client { /// Create a new `Client`. pub fn new(config: Config, spec: &Spec, io_channel: IoChannel) -> Self { + // TODO: use real DB. + let db = ::util::kvdb::in_memory(0); + let gh = ::rlp::encode(&spec.genesis_header()); + Client { queue: HeaderQueue::new(config.queue, spec.engine.clone(), io_channel, true), engine: spec.engine.clone(), - chain: HeaderChain::new(&::rlp::encode(&spec.genesis_header())), + chain: HeaderChain::new(Arc::new(db), None, &gh).expect("new db every time"), report: RwLock::new(ClientReport::default()), import_lock: Mutex::new(()), } @@ -201,7 +205,8 @@ impl Client { for verified_header in self.queue.drain(MAX) { let (num, hash) = (verified_header.number(), verified_header.hash()); - match self.chain.insert(verified_header) { + let mut tx = unimplemented!(); + match self.chain.insert(&mut tx, verified_header) { Ok(()) => { good.push(hash); self.report.write().blocks_imported += 1;