Canonical state cache (master) (#2311)

* State cache

* Reduced copying data between caches

Whitespace and optional symbols

* Reduced copying data between caches

Whitespace and optional symbols

* Set a limit on storage cache

* Style and docs
This commit is contained in:
Arkadiy Paronyan
2016-09-27 18:02:11 +02:00
committed by GitHub
parent 9d4bee4922
commit ad63780b4d
20 changed files with 701 additions and 210 deletions

View File

@@ -23,7 +23,7 @@ use time::precise_time_ns;
// util
use util::{Bytes, PerfTimer, Itertools, Mutex, RwLock};
use util::journaldb::{self, JournalDB};
use util::journaldb;
use util::{U256, H256, Address, H2048, Uint};
use util::TrieFactory;
use util::kvdb::*;
@@ -65,7 +65,7 @@ use miner::{Miner, MinerService};
use snapshot::{self, io as snapshot_io};
use factory::Factories;
use rlp::{View, UntrustedRlp};
use state_db::StateDB;
// re-export
pub use types::blockchain_info::BlockChainInfo;
@@ -125,9 +125,9 @@ pub struct Client {
tracedb: RwLock<TraceDB<BlockChain>>,
engine: Arc<Engine>,
config: ClientConfig,
db: RwLock<Arc<Database>>,
pruning: journaldb::Algorithm,
state_db: RwLock<Box<JournalDB>>,
db: RwLock<Arc<Database>>,
state_db: Mutex<StateDB>,
block_queue: BlockQueue,
report: RwLock<ClientReport>,
import_lock: Mutex<()>,
@@ -171,14 +171,15 @@ impl Client {
let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone()));
let tracedb = RwLock::new(TraceDB::new(config.tracing.clone(), db.clone(), chain.clone()));
let mut state_db = journaldb::new(db.clone(), config.pruning, ::db::COL_STATE);
if state_db.is_empty() && try!(spec.ensure_db_good(state_db.as_hashdb_mut())) {
let journal_db = journaldb::new(db.clone(), config.pruning, ::db::COL_STATE);
let mut state_db = StateDB::new(journal_db);
if state_db.journal_db().is_empty() && try!(spec.ensure_db_good(&mut state_db)) {
let mut batch = DBTransaction::new(&db);
try!(state_db.commit(&mut batch, 0, &spec.genesis_header().hash(), None));
try!(db.write(batch).map_err(ClientError::Database));
}
if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.contains(h.state_root())) {
if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) {
warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex());
}
@@ -207,7 +208,7 @@ impl Client {
verifier: verification::new(config.verifier_type.clone()),
config: config,
db: RwLock::new(db),
state_db: RwLock::new(state_db),
state_db: Mutex::new(state_db),
block_queue: block_queue,
report: RwLock::new(Default::default()),
import_lock: Mutex::new(()),
@@ -298,7 +299,8 @@ impl Client {
// Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
let db = self.state_db.read().boxed_clone();
let is_canon = header.parent_hash() == &chain.best_block_hash();
let db = if is_canon { self.state_db.lock().boxed_clone_canon() } else { self.state_db.lock().boxed_clone() };
let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
if let Err(e) = enact_result {
@@ -441,7 +443,8 @@ impl Client {
// CHECK! I *think* this is fine, even if the state_root is equal to another
// already-imported block of the same number.
// TODO: Prove it with a test.
block.drain().commit(&mut batch, number, hash, ancient).expect("DB commit failed.");
let mut state = block.drain();
state.commit(&mut batch, number, hash, ancient).expect("DB commit failed.");
let route = chain.insert_block(&mut batch, block_data, receipts);
self.tracedb.read().import(&mut batch, TraceImportRequest {
@@ -454,7 +457,6 @@ impl Client {
// Final commit to the DB
self.db.read().write_buffered(batch);
chain.commit();
self.update_last_hashes(&parent, hash);
route
}
@@ -496,7 +498,7 @@ impl Client {
};
self.block_header(id).and_then(|header| {
let db = self.state_db.read().boxed_clone();
let db = self.state_db.lock().boxed_clone();
// early exit for pruned blocks
if db.is_pruned() && self.chain.read().best_block_number() >= block_number + HISTORY {
@@ -527,7 +529,7 @@ impl Client {
/// Get a copy of the best block's state.
pub fn state(&self) -> State {
State::from_existing(
self.state_db.read().boxed_clone(),
self.state_db.lock().boxed_clone(),
HeaderView::new(&self.best_block_header()).state_root(),
self.engine.account_start_nonce(),
self.factories.clone())
@@ -542,7 +544,7 @@ impl Client {
/// Get the report.
pub fn report(&self) -> ClientReport {
let mut report = self.report.read().clone();
report.state_db_mem = self.state_db.read().mem_used();
report.state_db_mem = self.state_db.lock().mem_used();
report
}
@@ -598,7 +600,7 @@ impl Client {
/// Take a snapshot at the given block.
/// If the ID given is "latest", this will default to 1000 blocks behind.
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockID, p: &snapshot::Progress) -> Result<(), EthcoreError> {
let db = self.state_db.read().boxed_clone();
let db = self.state_db.lock().journal_db().boxed_clone();
let best_block_number = self.chain_info().best_block_number;
let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at)));
@@ -677,14 +679,14 @@ impl snapshot::DatabaseRestore for Client {
trace!(target: "snapshot", "Replacing client database with {:?}", new_db);
let _import_lock = self.import_lock.lock();
let mut state_db = self.state_db.write();
let mut state_db = self.state_db.lock();
let mut chain = self.chain.write();
let mut tracedb = self.tracedb.write();
self.miner.clear();
let db = self.db.write();
try!(db.restore(new_db));
*state_db = journaldb::new(db.clone(), self.pruning, ::db::COL_STATE);
*state_db = StateDB::new(journaldb::new(db.clone(), self.pruning, ::db::COL_STATE));
*chain = Arc::new(BlockChain::new(self.config.blockchain.clone(), &[], db.clone()));
*tracedb = TraceDB::new(self.config.tracing.clone(), db.clone(), chain.clone());
Ok(())
@@ -908,7 +910,7 @@ impl BlockChainClient for Client {
}
fn state_data(&self, hash: &H256) -> Option<Bytes> {
self.state_db.read().state(hash)
self.state_db.lock().journal_db().state(hash)
}
fn block_receipts(&self, hash: &H256) -> Option<Bytes> {
@@ -1050,7 +1052,7 @@ impl MiningBlockChainClient for Client {
engine,
self.factories.clone(),
false, // TODO: this will need to be parameterised once we want to do immediate mining insertion.
self.state_db.read().boxed_clone(),
self.state_db.lock().boxed_clone(),
&chain.block_header(&h).expect("h is best block hash: so its header must exist: qed"),
self.build_last_hashes(h.clone()),
author,

View File

@@ -42,6 +42,7 @@ use block::{OpenBlock, SealedBlock};
use executive::Executed;
use error::CallError;
use trace::LocalizedTrace;
use state_db::StateDB;
/// Test client.
pub struct TestBlockChainClient {
@@ -283,13 +284,14 @@ impl TestBlockChainClient {
}
}
pub fn get_temp_journal_db() -> GuardedTempResult<Box<JournalDB>> {
pub fn get_temp_state_db() -> GuardedTempResult<StateDB> {
let temp = RandomTempPath::new();
let db = Database::open_default(temp.as_str()).unwrap();
let journal_db = journaldb::new(Arc::new(db), journaldb::Algorithm::EarlyMerge, None);
let state_db = StateDB::new(journal_db);
GuardedTempResult {
_temp: temp,
result: Some(journal_db)
result: Some(state_db)
}
}
@@ -297,9 +299,9 @@ impl MiningBlockChainClient for TestBlockChainClient {
fn prepare_open_block(&self, author: Address, gas_range_target: (U256, U256), extra_data: Bytes) -> OpenBlock {
let engine = &*self.spec.engine;
let genesis_header = self.spec.genesis_header();
let mut db_result = get_temp_journal_db();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
self.spec.ensure_db_good(db.as_hashdb_mut()).unwrap();
self.spec.ensure_db_good(&mut db).unwrap();
let last_hashes = vec![genesis_header.hash()];
let mut open_block = OpenBlock::new(