Merge pull request #180 from gavofyork/cacheman
Memory management for cache
This commit is contained in:
commit
3e7fc36357
@ -236,7 +236,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn storage_at() {
|
fn storage_at() {
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = MemoryDB::new();
|
||||||
let rlp = {
|
let rlp = {
|
||||||
let mut a = Account::new_contract(U256::from(69u8));
|
let mut a = Account::new_contract(U256::from(69u8));
|
||||||
a.set_storage(H256::from(&U256::from(0x00u64)), H256::from(&U256::from(0x1234u64)));
|
a.set_storage(H256::from(&U256::from(0x00u64)), H256::from(&U256::from(0x1234u64)));
|
||||||
@ -254,7 +254,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn note_code() {
|
fn note_code() {
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = MemoryDB::new();
|
||||||
|
|
||||||
let rlp = {
|
let rlp = {
|
||||||
let mut a = Account::new_contract(U256::from(69u8));
|
let mut a = Account::new_contract(U256::from(69u8));
|
||||||
@ -273,7 +273,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn commit_storage() {
|
fn commit_storage() {
|
||||||
let mut a = Account::new_contract(U256::from(69u8));
|
let mut a = Account::new_contract(U256::from(69u8));
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = MemoryDB::new();
|
||||||
a.set_storage(x!(0), x!(0x1234));
|
a.set_storage(x!(0), x!(0x1234));
|
||||||
assert_eq!(a.storage_root(), None);
|
assert_eq!(a.storage_root(), None);
|
||||||
a.commit_storage(&mut db);
|
a.commit_storage(&mut db);
|
||||||
@ -283,7 +283,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn commit_remove_commit_storage() {
|
fn commit_remove_commit_storage() {
|
||||||
let mut a = Account::new_contract(U256::from(69u8));
|
let mut a = Account::new_contract(U256::from(69u8));
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = MemoryDB::new();
|
||||||
a.set_storage(x!(0), x!(0x1234));
|
a.set_storage(x!(0), x!(0x1234));
|
||||||
a.commit_storage(&mut db);
|
a.commit_storage(&mut db);
|
||||||
a.set_storage(x!(1), x!(0x1234));
|
a.set_storage(x!(1), x!(0x1234));
|
||||||
@ -296,7 +296,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn commit_code() {
|
fn commit_code() {
|
||||||
let mut a = Account::new_contract(U256::from(69u8));
|
let mut a = Account::new_contract(U256::from(69u8));
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = MemoryDB::new();
|
||||||
a.init_code(vec![0x55, 0x44, 0xffu8]);
|
a.init_code(vec![0x55, 0x44, 0xffu8]);
|
||||||
assert_eq!(a.code_hash(), SHA3_EMPTY);
|
assert_eq!(a.code_hash(), SHA3_EMPTY);
|
||||||
a.commit_code(&mut db);
|
a.commit_code(&mut db);
|
||||||
|
@ -12,6 +12,7 @@ use util::*;
|
|||||||
use ethcore::client::*;
|
use ethcore::client::*;
|
||||||
use ethcore::service::ClientService;
|
use ethcore::service::ClientService;
|
||||||
use ethcore::ethereum;
|
use ethcore::ethereum;
|
||||||
|
use ethcore::blockchain::CacheSize;
|
||||||
use ethcore::sync::*;
|
use ethcore::sync::*;
|
||||||
|
|
||||||
fn setup_log() {
|
fn setup_log() {
|
||||||
@ -29,7 +30,7 @@ fn main() {
|
|||||||
setup_log();
|
setup_log();
|
||||||
let spec = ethereum::new_frontier();
|
let spec = ethereum::new_frontier();
|
||||||
let mut service = ClientService::start(spec).unwrap();
|
let mut service = ClientService::start(spec).unwrap();
|
||||||
let io_handler = Box::new(ClientIoHandler { client: service.client(), timer: 0 });
|
let io_handler = Box::new(ClientIoHandler { client: service.client(), timer: 0, info: Default::default() });
|
||||||
service.io().register_handler(io_handler).expect("Error registering IO handler");
|
service.io().register_handler(io_handler).expect("Error registering IO handler");
|
||||||
loop {
|
loop {
|
||||||
let mut cmd = String::new();
|
let mut cmd = String::new();
|
||||||
@ -40,10 +41,47 @@ fn main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
struct Informant {
|
||||||
|
chain_info: Option<BlockChainInfo>,
|
||||||
|
cache_info: Option<CacheSize>,
|
||||||
|
report: Option<ClientReport>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Informant {
|
||||||
|
pub fn tick(&mut self, client: &Client) {
|
||||||
|
// 5 seconds betwen calls. TODO: calculate this properly.
|
||||||
|
let dur = 5usize;
|
||||||
|
|
||||||
|
let chain_info = client.chain_info();
|
||||||
|
let cache_info = client.cache_info();
|
||||||
|
let report = client.report();
|
||||||
|
|
||||||
|
if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (&self.chain_info, &self.cache_info, &self.report) {
|
||||||
|
println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //···{}···// {} ({}) bl {} ({}) ex ]",
|
||||||
|
chain_info.best_block_number,
|
||||||
|
chain_info.best_block_hash,
|
||||||
|
(report.blocks_imported - last_report.blocks_imported) / dur,
|
||||||
|
(report.transactions_applied - last_report.transactions_applied) / dur,
|
||||||
|
(report.gas_processed - last_report.gas_processed) / From::from(dur),
|
||||||
|
0, // TODO: peers
|
||||||
|
cache_info.blocks,
|
||||||
|
cache_info.blocks as isize - last_cache_info.blocks as isize,
|
||||||
|
cache_info.block_details,
|
||||||
|
cache_info.block_details as isize - last_cache_info.block_details as isize
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.chain_info = Some(chain_info);
|
||||||
|
self.cache_info = Some(cache_info);
|
||||||
|
self.report = Some(report);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct ClientIoHandler {
|
struct ClientIoHandler {
|
||||||
client: Arc<RwLock<Client>>,
|
client: Arc<RwLock<Client>>,
|
||||||
timer: TimerToken,
|
timer: TimerToken,
|
||||||
|
info: Informant,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IoHandler<NetSyncMessage> for ClientIoHandler {
|
impl IoHandler<NetSyncMessage> for ClientIoHandler {
|
||||||
@ -53,7 +91,9 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
|
|||||||
|
|
||||||
fn timeout<'s>(&'s mut self, _io: &mut IoContext<'s, NetSyncMessage>, timer: TimerToken) {
|
fn timeout<'s>(&'s mut self, _io: &mut IoContext<'s, NetSyncMessage>, timer: TimerToken) {
|
||||||
if self.timer == timer {
|
if self.timer == timer {
|
||||||
println!("Chain info: {:?}", self.client.read().unwrap().deref().chain_info());
|
let client = self.client.read().unwrap();
|
||||||
|
client.tick();
|
||||||
|
self.info.tick(client.deref());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
20
src/block.rs
20
src/block.rs
@ -104,7 +104,7 @@ pub struct SealedBlock {
|
|||||||
|
|
||||||
impl<'x, 'y> OpenBlock<'x, 'y> {
|
impl<'x, 'y> OpenBlock<'x, 'y> {
|
||||||
/// Create a new OpenBlock ready for transaction pushing.
|
/// Create a new OpenBlock ready for transaction pushing.
|
||||||
pub fn new<'a, 'b>(engine: &'a Engine, db: OverlayDB, parent: &Header, last_hashes: &'b LastHashes, author: Address, extra_data: Bytes) -> OpenBlock<'a, 'b> {
|
pub fn new<'a, 'b>(engine: &'a Engine, db: JournalDB, parent: &Header, last_hashes: &'b LastHashes, author: Address, extra_data: Bytes) -> OpenBlock<'a, 'b> {
|
||||||
let mut r = OpenBlock {
|
let mut r = OpenBlock {
|
||||||
block: Block::new(State::from_existing(db, parent.state_root().clone(), engine.account_start_nonce())),
|
block: Block::new(State::from_existing(db, parent.state_root().clone(), engine.account_start_nonce())),
|
||||||
engine: engine,
|
engine: engine,
|
||||||
@ -242,7 +242,7 @@ impl<'x, 'y> ClosedBlock<'x, 'y> {
|
|||||||
pub fn reopen(self) -> OpenBlock<'x, 'y> { self.open_block }
|
pub fn reopen(self) -> OpenBlock<'x, 'y> { self.open_block }
|
||||||
|
|
||||||
/// Drop this object and return the underlieing database.
|
/// Drop this object and return the underlieing database.
|
||||||
pub fn drain(self) -> OverlayDB { self.open_block.block.state.drop().1 }
|
pub fn drain(self) -> JournalDB { self.open_block.block.state.drop().1 }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SealedBlock {
|
impl SealedBlock {
|
||||||
@ -257,7 +257,7 @@ impl SealedBlock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Drop this object and return the underlieing database.
|
/// Drop this object and return the underlieing database.
|
||||||
pub fn drain(self) -> OverlayDB { self.block.state.drop().1 }
|
pub fn drain(self) -> JournalDB { self.block.state.drop().1 }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IsBlock for SealedBlock {
|
impl IsBlock for SealedBlock {
|
||||||
@ -265,7 +265,7 @@ impl IsBlock for SealedBlock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Enact the block given by block header, transactions and uncles
|
/// Enact the block given by block header, transactions and uncles
|
||||||
pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[Header], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> {
|
pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[Header], engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> {
|
||||||
{
|
{
|
||||||
let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce());
|
let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce());
|
||||||
trace!("enact(): root={}, author={}, author_balance={}\n", s.root(), header.author(), s.balance(&header.author()));
|
trace!("enact(): root={}, author={}, author_balance={}\n", s.root(), header.author(), s.balance(&header.author()));
|
||||||
@ -281,20 +281,20 @@ pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[He
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
|
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
|
||||||
pub fn enact_bytes<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> {
|
pub fn enact_bytes<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> {
|
||||||
let block = BlockView::new(block_bytes);
|
let block = BlockView::new(block_bytes);
|
||||||
let header = block.header();
|
let header = block.header();
|
||||||
enact(&header, &block.transactions(), &block.uncles(), engine, db, parent, last_hashes)
|
enact(&header, &block.transactions(), &block.uncles(), engine, db, parent, last_hashes)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
|
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header
|
||||||
pub fn enact_verified<'x, 'y>(block: &PreVerifiedBlock, engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> {
|
pub fn enact_verified<'x, 'y>(block: &PreVerifiedBlock, engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result<ClosedBlock<'x, 'y>, Error> {
|
||||||
let view = BlockView::new(&block.bytes);
|
let view = BlockView::new(&block.bytes);
|
||||||
enact(&block.header, &block.transactions, &view.uncles(), engine, db, parent, last_hashes)
|
enact(&block.header, &block.transactions, &view.uncles(), engine, db, parent, last_hashes)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards
|
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards
|
||||||
pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: OverlayDB, parent: &Header, last_hashes: &LastHashes) -> Result<SealedBlock, Error> {
|
pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: JournalDB, parent: &Header, last_hashes: &LastHashes) -> Result<SealedBlock, Error> {
|
||||||
let header = BlockView::new(block_bytes).header_view();
|
let header = BlockView::new(block_bytes).header_view();
|
||||||
Ok(try!(try!(enact_bytes(block_bytes, engine, db, parent, last_hashes)).seal(header.seal())))
|
Ok(try!(try!(enact_bytes(block_bytes, engine, db, parent, last_hashes)).seal(header.seal())))
|
||||||
}
|
}
|
||||||
@ -304,7 +304,7 @@ fn open_block() {
|
|||||||
use spec::*;
|
use spec::*;
|
||||||
let engine = Spec::new_test().to_engine().unwrap();
|
let engine = Spec::new_test().to_engine().unwrap();
|
||||||
let genesis_header = engine.spec().genesis_header();
|
let genesis_header = engine.spec().genesis_header();
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = JournalDB::new_temp();
|
||||||
engine.spec().ensure_db_good(&mut db);
|
engine.spec().ensure_db_good(&mut db);
|
||||||
let last_hashes = vec![genesis_header.hash()];
|
let last_hashes = vec![genesis_header.hash()];
|
||||||
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]);
|
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]);
|
||||||
@ -318,13 +318,13 @@ fn enact_block() {
|
|||||||
let engine = Spec::new_test().to_engine().unwrap();
|
let engine = Spec::new_test().to_engine().unwrap();
|
||||||
let genesis_header = engine.spec().genesis_header();
|
let genesis_header = engine.spec().genesis_header();
|
||||||
|
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = JournalDB::new_temp();
|
||||||
engine.spec().ensure_db_good(&mut db);
|
engine.spec().ensure_db_good(&mut db);
|
||||||
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &vec![genesis_header.hash()], Address::zero(), vec![]).close().seal(vec![]).unwrap();
|
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &vec![genesis_header.hash()], Address::zero(), vec![]).close().seal(vec![]).unwrap();
|
||||||
let orig_bytes = b.rlp_bytes();
|
let orig_bytes = b.rlp_bytes();
|
||||||
let orig_db = b.drain();
|
let orig_db = b.drain();
|
||||||
|
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = JournalDB::new_temp();
|
||||||
engine.spec().ensure_db_good(&mut db);
|
engine.spec().ensure_db_good(&mut db);
|
||||||
let e = enact_and_seal(&orig_bytes, engine.deref(), db, &genesis_header, &vec![genesis_header.hash()]).unwrap();
|
let e = enact_and_seal(&orig_bytes, engine.deref(), db, &genesis_header, &vec![genesis_header.hash()]).unwrap();
|
||||||
|
|
||||||
|
@ -30,6 +30,11 @@ pub struct CacheSize {
|
|||||||
pub blocks_blooms: usize
|
pub blocks_blooms: usize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl CacheSize {
|
||||||
|
/// Total amount used by the cache.
|
||||||
|
fn total(&self) -> usize { self.blocks + self.block_details + self.transaction_addresses + self.block_logs + self.blocks_blooms }
|
||||||
|
}
|
||||||
|
|
||||||
/// Information about best block gathered together
|
/// Information about best block gathered together
|
||||||
struct BestBlock {
|
struct BestBlock {
|
||||||
pub hash: H256,
|
pub hash: H256,
|
||||||
@ -96,6 +101,17 @@ pub trait BlockProvider {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
|
||||||
|
enum CacheID {
|
||||||
|
Block(H256),
|
||||||
|
Extras(ExtrasIndex, H256),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct CacheManager {
|
||||||
|
cache_usage: VecDeque<HashSet<CacheID>>,
|
||||||
|
in_use: HashSet<CacheID>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Structure providing fast access to blockchain data.
|
/// Structure providing fast access to blockchain data.
|
||||||
///
|
///
|
||||||
/// **Does not do input data verification.**
|
/// **Does not do input data verification.**
|
||||||
@ -113,7 +129,9 @@ pub struct BlockChain {
|
|||||||
blocks_blooms: RwLock<HashMap<H256, BlocksBlooms>>,
|
blocks_blooms: RwLock<HashMap<H256, BlocksBlooms>>,
|
||||||
|
|
||||||
extras_db: DB,
|
extras_db: DB,
|
||||||
blocks_db: DB
|
blocks_db: DB,
|
||||||
|
|
||||||
|
cache_man: RwLock<CacheManager>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockProvider for BlockChain {
|
impl BlockProvider for BlockChain {
|
||||||
@ -136,6 +154,8 @@ impl BlockProvider for BlockChain {
|
|||||||
let opt = self.blocks_db.get(hash)
|
let opt = self.blocks_db.get(hash)
|
||||||
.expect("Low level database error. Some issue with disk?");
|
.expect("Low level database error. Some issue with disk?");
|
||||||
|
|
||||||
|
self.note_used(CacheID::Block(hash.clone()));
|
||||||
|
|
||||||
match opt {
|
match opt {
|
||||||
Some(b) => {
|
Some(b) => {
|
||||||
let bytes: Bytes = b.to_vec();
|
let bytes: Bytes = b.to_vec();
|
||||||
@ -158,6 +178,10 @@ impl BlockProvider for BlockChain {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const COLLECTION_QUEUE_SIZE: usize = 2;
|
||||||
|
const MIN_CACHE_SIZE: usize = 1;
|
||||||
|
const MAX_CACHE_SIZE: usize = 1024 * 1024 * 1;
|
||||||
|
|
||||||
impl BlockChain {
|
impl BlockChain {
|
||||||
/// Create new instance of blockchain from given Genesis
|
/// Create new instance of blockchain from given Genesis
|
||||||
///
|
///
|
||||||
@ -197,6 +221,9 @@ impl BlockChain {
|
|||||||
blocks_path.push("blocks");
|
blocks_path.push("blocks");
|
||||||
let blocks_db = DB::open_default(blocks_path.to_str().unwrap()).unwrap();
|
let blocks_db = DB::open_default(blocks_path.to_str().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let mut cache_man = CacheManager{cache_usage: VecDeque::new(), in_use: HashSet::new()};
|
||||||
|
(0..COLLECTION_QUEUE_SIZE).foreach(|_| cache_man.cache_usage.push_back(HashSet::new()));
|
||||||
|
|
||||||
let bc = BlockChain {
|
let bc = BlockChain {
|
||||||
best_block: RwLock::new(BestBlock::new()),
|
best_block: RwLock::new(BestBlock::new()),
|
||||||
blocks: RwLock::new(HashMap::new()),
|
blocks: RwLock::new(HashMap::new()),
|
||||||
@ -206,7 +233,8 @@ impl BlockChain {
|
|||||||
block_logs: RwLock::new(HashMap::new()),
|
block_logs: RwLock::new(HashMap::new()),
|
||||||
blocks_blooms: RwLock::new(HashMap::new()),
|
blocks_blooms: RwLock::new(HashMap::new()),
|
||||||
extras_db: extras_db,
|
extras_db: extras_db,
|
||||||
blocks_db: blocks_db
|
blocks_db: blocks_db,
|
||||||
|
cache_man: RwLock::new(cache_man),
|
||||||
};
|
};
|
||||||
|
|
||||||
// load best block
|
// load best block
|
||||||
@ -251,7 +279,7 @@ impl BlockChain {
|
|||||||
/// Ensure that the best block does indeed have a state_root in the state DB.
|
/// Ensure that the best block does indeed have a state_root in the state DB.
|
||||||
/// If it doesn't, then rewind down until we find one that does and delete data to ensure that
|
/// If it doesn't, then rewind down until we find one that does and delete data to ensure that
|
||||||
/// later blocks will be reimported.
|
/// later blocks will be reimported.
|
||||||
pub fn ensure_good(&mut self, _state: &OverlayDB) {
|
pub fn ensure_good(&mut self, _state: &JournalDB) {
|
||||||
unimplemented!();
|
unimplemented!();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -497,6 +525,10 @@ impl BlockChain {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(h) = hash.as_h256() {
|
||||||
|
self.note_used(CacheID::Extras(T::extras_index(), h.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
self.extras_db.get_extras(hash).map(| t: T | {
|
self.extras_db.get_extras(hash).map(| t: T | {
|
||||||
let mut write = cache.write().unwrap();
|
let mut write = cache.write().unwrap();
|
||||||
write.insert(hash.clone(), t.clone());
|
write.insert(hash.clone(), t.clone());
|
||||||
@ -537,6 +569,56 @@ impl BlockChain {
|
|||||||
self.block_logs.write().unwrap().squeeze(size.block_logs);
|
self.block_logs.write().unwrap().squeeze(size.block_logs);
|
||||||
self.blocks_blooms.write().unwrap().squeeze(size.blocks_blooms);
|
self.blocks_blooms.write().unwrap().squeeze(size.blocks_blooms);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Let the cache system know that a cacheable item has been used.
|
||||||
|
fn note_used(&self, id: CacheID) {
|
||||||
|
let mut cache_man = self.cache_man.write().unwrap();
|
||||||
|
if !cache_man.cache_usage[0].contains(&id) {
|
||||||
|
cache_man.cache_usage[0].insert(id.clone());
|
||||||
|
if cache_man.in_use.contains(&id) {
|
||||||
|
if let Some(c) = cache_man.cache_usage.iter_mut().skip(1).find(|e|e.contains(&id)) {
|
||||||
|
c.remove(&id);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cache_man.in_use.insert(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ticks our cache system and throws out any old data.
|
||||||
|
pub fn collect_garbage(&self, force: bool) {
|
||||||
|
// TODO: check time.
|
||||||
|
let timeout = true;
|
||||||
|
|
||||||
|
let t = self.cache_size().total();
|
||||||
|
if t < MIN_CACHE_SIZE || (!timeout && (!force || t < MAX_CACHE_SIZE)) { return; }
|
||||||
|
|
||||||
|
let mut cache_man = self.cache_man.write().unwrap();
|
||||||
|
let mut blocks = self.blocks.write().unwrap();
|
||||||
|
let mut block_details = self.block_details.write().unwrap();
|
||||||
|
let mut block_hashes = self.block_hashes.write().unwrap();
|
||||||
|
let mut transaction_addresses = self.transaction_addresses.write().unwrap();
|
||||||
|
let mut block_logs = self.block_logs.write().unwrap();
|
||||||
|
let mut blocks_blooms = self.blocks_blooms.write().unwrap();
|
||||||
|
|
||||||
|
for id in cache_man.cache_usage.pop_back().unwrap().into_iter() {
|
||||||
|
cache_man.in_use.remove(&id);
|
||||||
|
match id {
|
||||||
|
CacheID::Block(h) => { blocks.remove(&h); },
|
||||||
|
CacheID::Extras(ExtrasIndex::BlockDetails, h) => { block_details.remove(&h); },
|
||||||
|
CacheID::Extras(ExtrasIndex::TransactionAddress, h) => { transaction_addresses.remove(&h); },
|
||||||
|
CacheID::Extras(ExtrasIndex::BlockLogBlooms, h) => { block_logs.remove(&h); },
|
||||||
|
CacheID::Extras(ExtrasIndex::BlocksBlooms, h) => { blocks_blooms.remove(&h); },
|
||||||
|
_ => panic!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cache_man.cache_usage.push_front(HashSet::new());
|
||||||
|
|
||||||
|
// TODO: handle block_hashes properly.
|
||||||
|
block_hashes.clear();
|
||||||
|
|
||||||
|
// TODO: m_lastCollection = chrono::system_clock::now();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
use util::*;
|
use util::*;
|
||||||
use rocksdb::{Options, DB};
|
use rocksdb::{Options, DB};
|
||||||
use rocksdb::DBCompactionStyle::DBUniversalCompaction;
|
use blockchain::{BlockChain, BlockProvider, CacheSize};
|
||||||
use blockchain::{BlockChain, BlockProvider};
|
|
||||||
use views::BlockView;
|
use views::BlockView;
|
||||||
use error::*;
|
use error::*;
|
||||||
use header::BlockNumber;
|
use header::BlockNumber;
|
||||||
@ -41,6 +40,12 @@ pub struct BlockChainInfo {
|
|||||||
pub best_block_number: BlockNumber
|
pub best_block_number: BlockNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for BlockChainInfo {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "#{}.{}", self.best_block_number, self.best_block_hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Block queue status
|
/// Block queue status
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct BlockQueueStatus {
|
pub struct BlockQueueStatus {
|
||||||
@ -100,14 +105,32 @@ pub trait BlockChainClient : Sync + Send {
|
|||||||
fn chain_info(&self) -> BlockChainInfo;
|
fn chain_info(&self) -> BlockChainInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Clone, Debug, Eq, PartialEq)]
|
||||||
|
pub struct ClientReport {
|
||||||
|
pub blocks_imported: usize,
|
||||||
|
pub transactions_applied: usize,
|
||||||
|
pub gas_processed: U256,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClientReport {
|
||||||
|
pub fn accrue_block(&mut self, block: &PreVerifiedBlock) {
|
||||||
|
self.blocks_imported += 1;
|
||||||
|
self.transactions_applied += block.transactions.len();
|
||||||
|
self.gas_processed += block.header.gas_used;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
|
/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
chain: Arc<RwLock<BlockChain>>,
|
chain: Arc<RwLock<BlockChain>>,
|
||||||
engine: Arc<Box<Engine>>,
|
engine: Arc<Box<Engine>>,
|
||||||
state_db: OverlayDB,
|
state_db: JournalDB,
|
||||||
queue: BlockQueue,
|
queue: BlockQueue,
|
||||||
|
report: ClientReport,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const HISTORY: u64 = 1000;
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
/// Create a new client with given spec and DB path.
|
/// Create a new client with given spec and DB path.
|
||||||
pub fn new(spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Client, Error> {
|
pub fn new(spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Client, Error> {
|
||||||
@ -115,7 +138,7 @@ impl Client {
|
|||||||
let mut opts = Options::new();
|
let mut opts = Options::new();
|
||||||
opts.create_if_missing(true);
|
opts.create_if_missing(true);
|
||||||
opts.set_max_open_files(256);
|
opts.set_max_open_files(256);
|
||||||
opts.set_use_fsync(false);
|
/*opts.set_use_fsync(false);
|
||||||
opts.set_bytes_per_sync(8388608);
|
opts.set_bytes_per_sync(8388608);
|
||||||
opts.set_disable_data_sync(false);
|
opts.set_disable_data_sync(false);
|
||||||
opts.set_block_cache_size_mb(1024);
|
opts.set_block_cache_size_mb(1024);
|
||||||
@ -130,16 +153,17 @@ impl Client {
|
|||||||
opts.set_max_background_compactions(4);
|
opts.set_max_background_compactions(4);
|
||||||
opts.set_max_background_flushes(4);
|
opts.set_max_background_flushes(4);
|
||||||
opts.set_filter_deletes(false);
|
opts.set_filter_deletes(false);
|
||||||
opts.set_disable_auto_compactions(true);
|
opts.set_disable_auto_compactions(false);*/
|
||||||
|
|
||||||
let mut state_path = path.to_path_buf();
|
let mut state_path = path.to_path_buf();
|
||||||
state_path.push("state");
|
state_path.push("state");
|
||||||
let db = DB::open(&opts, state_path.to_str().unwrap()).unwrap();
|
let db = DB::open(&opts, state_path.to_str().unwrap()).unwrap();
|
||||||
let mut state_db = OverlayDB::new(db);
|
let mut state_db = JournalDB::new(db);
|
||||||
|
|
||||||
let engine = Arc::new(try!(spec.to_engine()));
|
let engine = Arc::new(try!(spec.to_engine()));
|
||||||
engine.spec().ensure_db_good(&mut state_db);
|
if engine.spec().ensure_db_good(&mut state_db) {
|
||||||
state_db.commit().expect("Error commiting genesis state to state DB");
|
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
|
||||||
|
}
|
||||||
|
|
||||||
// chain.write().unwrap().ensure_good(&state_db);
|
// chain.write().unwrap().ensure_good(&state_db);
|
||||||
|
|
||||||
@ -148,6 +172,7 @@ impl Client {
|
|||||||
engine: engine.clone(),
|
engine: engine.clone(),
|
||||||
state_db: state_db,
|
state_db: state_db,
|
||||||
queue: BlockQueue::new(engine, message_channel),
|
queue: BlockQueue::new(engine, message_channel),
|
||||||
|
report: Default::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -212,16 +237,34 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here?
|
self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here?
|
||||||
match result.drain().commit() {
|
let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None };
|
||||||
|
match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(target: "client", "State DB commit failed: {:?}", e);
|
warn!(target: "client", "State DB commit failed: {:?}", e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
self.report.accrue_block(&block);
|
||||||
|
|
||||||
|
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get info on the cache.
|
||||||
|
pub fn cache_info(&self) -> CacheSize {
|
||||||
|
self.chain.read().unwrap().cache_size()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the report.
|
||||||
|
pub fn report(&self) -> ClientReport {
|
||||||
|
self.report.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tick the client.
|
||||||
|
pub fn tick(&self) {
|
||||||
|
self.chain.read().unwrap().collect_garbage(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockChainClient for Client {
|
impl BlockChainClient for Client {
|
||||||
|
@ -215,7 +215,7 @@ fn on_close_block() {
|
|||||||
use super::*;
|
use super::*;
|
||||||
let engine = new_morden().to_engine().unwrap();
|
let engine = new_morden().to_engine().unwrap();
|
||||||
let genesis_header = engine.spec().genesis_header();
|
let genesis_header = engine.spec().genesis_header();
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = JournalDB::new_temp();
|
||||||
engine.spec().ensure_db_good(&mut db);
|
engine.spec().ensure_db_good(&mut db);
|
||||||
let last_hashes = vec![genesis_header.hash()];
|
let last_hashes = vec![genesis_header.hash()];
|
||||||
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]);
|
let b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]);
|
||||||
@ -228,7 +228,7 @@ fn on_close_block_with_uncle() {
|
|||||||
use super::*;
|
use super::*;
|
||||||
let engine = new_morden().to_engine().unwrap();
|
let engine = new_morden().to_engine().unwrap();
|
||||||
let genesis_header = engine.spec().genesis_header();
|
let genesis_header = engine.spec().genesis_header();
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = JournalDB::new_temp();
|
||||||
engine.spec().ensure_db_good(&mut db);
|
engine.spec().ensure_db_good(&mut db);
|
||||||
let last_hashes = vec![genesis_header.hash()];
|
let last_hashes = vec![genesis_header.hash()];
|
||||||
let mut b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]);
|
let mut b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]);
|
||||||
|
@ -40,7 +40,7 @@ mod tests {
|
|||||||
fn ensure_db_good() {
|
fn ensure_db_good() {
|
||||||
let engine = new_morden().to_engine().unwrap();
|
let engine = new_morden().to_engine().unwrap();
|
||||||
let genesis_header = engine.spec().genesis_header();
|
let genesis_header = engine.spec().genesis_header();
|
||||||
let mut db = OverlayDB::new_temp();
|
let mut db = JournalDB::new_temp();
|
||||||
engine.spec().ensure_db_good(&mut db);
|
engine.spec().ensure_db_good(&mut db);
|
||||||
let s = State::from_existing(db.clone(), genesis_header.state_root.clone(), engine.account_start_nonce());
|
let s = State::from_existing(db.clone(), genesis_header.state_root.clone(), engine.account_start_nonce());
|
||||||
assert_eq!(s.balance(&address_from_hex("0000000000000000000000000000000000000001")), U256::from(1u64));
|
assert_eq!(s.balance(&address_from_hex("0000000000000000000000000000000000000001")), U256::from(1u64));
|
||||||
|
@ -3,7 +3,7 @@ use header::BlockNumber;
|
|||||||
use rocksdb::{DB, Writable};
|
use rocksdb::{DB, Writable};
|
||||||
|
|
||||||
/// Represents index of extra data in database
|
/// Represents index of extra data in database
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Debug, Hash, Eq, PartialEq, Clone)]
|
||||||
pub enum ExtrasIndex {
|
pub enum ExtrasIndex {
|
||||||
BlockDetails = 0,
|
BlockDetails = 0,
|
||||||
BlockHash = 1,
|
BlockHash = 1,
|
||||||
@ -59,6 +59,7 @@ impl ExtrasReadable for DB {
|
|||||||
/// Implementations should convert arbitrary type to database key slice
|
/// Implementations should convert arbitrary type to database key slice
|
||||||
pub trait ExtrasSliceConvertable {
|
pub trait ExtrasSliceConvertable {
|
||||||
fn to_extras_slice(&self, i: ExtrasIndex) -> H264;
|
fn to_extras_slice(&self, i: ExtrasIndex) -> H264;
|
||||||
|
fn as_h256(&self) -> Option<&H256> { None }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExtrasSliceConvertable for H256 {
|
impl ExtrasSliceConvertable for H256 {
|
||||||
@ -67,6 +68,7 @@ impl ExtrasSliceConvertable for H256 {
|
|||||||
slice[32] = i as u8;
|
slice[32] = i as u8;
|
||||||
slice
|
slice
|
||||||
}
|
}
|
||||||
|
fn as_h256(&self) -> Option<&H256> { Some(self) }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExtrasSliceConvertable for U256 {
|
impl ExtrasSliceConvertable for U256 {
|
||||||
|
@ -220,7 +220,7 @@ impl FromJson for Spec {
|
|||||||
|
|
||||||
impl Spec {
|
impl Spec {
|
||||||
/// Ensure that the given state DB has the trie nodes in for the genesis state.
|
/// Ensure that the given state DB has the trie nodes in for the genesis state.
|
||||||
pub fn ensure_db_good(&self, db: &mut HashDB) {
|
pub fn ensure_db_good(&self, db: &mut HashDB) -> bool {
|
||||||
if !db.contains(&self.state_root()) {
|
if !db.contains(&self.state_root()) {
|
||||||
info!("Populating genesis state...");
|
info!("Populating genesis state...");
|
||||||
let mut root = H256::new();
|
let mut root = H256::new();
|
||||||
@ -232,7 +232,8 @@ impl Spec {
|
|||||||
}
|
}
|
||||||
assert!(db.contains(&self.state_root()));
|
assert!(db.contains(&self.state_root()));
|
||||||
info!("Genesis state is ready");
|
info!("Genesis state is ready");
|
||||||
}
|
true
|
||||||
|
} else { false }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new Spec from a JSON UTF-8 data resource `data`.
|
/// Create a new Spec from a JSON UTF-8 data resource `data`.
|
||||||
|
12
src/state.rs
12
src/state.rs
@ -10,7 +10,7 @@ pub type ApplyResult = Result<Receipt, Error>;
|
|||||||
/// Representation of the entire state of all accounts in the system.
|
/// Representation of the entire state of all accounts in the system.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct State {
|
pub struct State {
|
||||||
db: OverlayDB,
|
db: JournalDB,
|
||||||
root: H256,
|
root: H256,
|
||||||
cache: RefCell<HashMap<Address, Option<Account>>>,
|
cache: RefCell<HashMap<Address, Option<Account>>>,
|
||||||
|
|
||||||
@ -19,7 +19,7 @@ pub struct State {
|
|||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
/// Creates new state with empty state root
|
/// Creates new state with empty state root
|
||||||
pub fn new(mut db: OverlayDB, account_start_nonce: U256) -> State {
|
pub fn new(mut db: JournalDB, account_start_nonce: U256) -> State {
|
||||||
let mut root = H256::new();
|
let mut root = H256::new();
|
||||||
{
|
{
|
||||||
// init trie and reset root too null
|
// init trie and reset root too null
|
||||||
@ -35,7 +35,7 @@ impl State {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Creates new state with existing state root
|
/// Creates new state with existing state root
|
||||||
pub fn from_existing(db: OverlayDB, root: H256, account_start_nonce: U256) -> State {
|
pub fn from_existing(db: JournalDB, root: H256, account_start_nonce: U256) -> State {
|
||||||
{
|
{
|
||||||
// trie should panic! if root does not exist
|
// trie should panic! if root does not exist
|
||||||
let _ = SecTrieDB::new(&db, &root);
|
let _ = SecTrieDB::new(&db, &root);
|
||||||
@ -51,11 +51,11 @@ impl State {
|
|||||||
|
|
||||||
/// Create temporary state object
|
/// Create temporary state object
|
||||||
pub fn new_temp() -> State {
|
pub fn new_temp() -> State {
|
||||||
Self::new(OverlayDB::new_temp(), U256::from(0u8))
|
Self::new(JournalDB::new_temp(), U256::from(0u8))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Destroy the current object and return root and database.
|
/// Destroy the current object and return root and database.
|
||||||
pub fn drop(self) -> (H256, OverlayDB) {
|
pub fn drop(self) -> (H256, JournalDB) {
|
||||||
(self.root, self.db)
|
(self.root, self.db)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -65,7 +65,7 @@ impl State {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Expose the underlying database; good to use for calling `state.db().commit()`.
|
/// Expose the underlying database; good to use for calling `state.db().commit()`.
|
||||||
pub fn db(&mut self) -> &mut OverlayDB {
|
pub fn db(&mut self) -> &mut JournalDB {
|
||||||
&mut self.db
|
&mut self.db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -471,7 +471,7 @@ impl ChainSync {
|
|||||||
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
|
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
|
||||||
trace!(target: "sync", "== Disconnecting {}", peer);
|
trace!(target: "sync", "== Disconnecting {}", peer);
|
||||||
if self.peers.contains_key(&peer) {
|
if self.peers.contains_key(&peer) {
|
||||||
info!(target: "sync", "Disconneced {}:{}", peer, io.peer_info(peer));
|
info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer));
|
||||||
self.clear_peer_download(peer);
|
self.clear_peer_download(peer);
|
||||||
self.peers.remove(&peer);
|
self.peers.remove(&peer);
|
||||||
self.continue_sync(io);
|
self.continue_sync(io);
|
||||||
|
@ -22,6 +22,7 @@ pub enum UtilError {
|
|||||||
BaseData(BaseDataError),
|
BaseData(BaseDataError),
|
||||||
Network(NetworkError),
|
Network(NetworkError),
|
||||||
Decoder(DecoderError),
|
Decoder(DecoderError),
|
||||||
|
SimpleString(String),
|
||||||
BadSize,
|
BadSize,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,6 +74,12 @@ impl From<::rlp::DecoderError> for UtilError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<String> for UtilError {
|
||||||
|
fn from(err: String) -> UtilError {
|
||||||
|
UtilError::SimpleString(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: uncomment below once https://github.com/rust-lang/rust/issues/27336 sorted.
|
// TODO: uncomment below once https://github.com/rust-lang/rust/issues/27336 sorted.
|
||||||
/*#![feature(concat_idents)]
|
/*#![feature(concat_idents)]
|
||||||
macro_rules! assimilate {
|
macro_rules! assimilate {
|
||||||
|
214
util/src/journaldb.rs
Normal file
214
util/src/journaldb.rs
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
//! Disk-backed HashDB implementation.
|
||||||
|
|
||||||
|
use std::env;
|
||||||
|
use common::*;
|
||||||
|
use rlp::*;
|
||||||
|
use hashdb::*;
|
||||||
|
use overlaydb::*;
|
||||||
|
use rocksdb::{DB, Writable};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
/// Implementation of the HashDB trait for a disk-backed database with a memory overlay
|
||||||
|
/// and latent-removal semantics.
|
||||||
|
///
|
||||||
|
/// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to
|
||||||
|
/// write operations out to disk. Unlike OverlayDB, `remove()` operations do not take effect
|
||||||
|
/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before
|
||||||
|
/// the removals actually take effect.
|
||||||
|
pub struct JournalDB {
|
||||||
|
forward: OverlayDB,
|
||||||
|
backing: Arc<DB>,
|
||||||
|
inserts: Vec<H256>,
|
||||||
|
removes: Vec<H256>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JournalDB {
|
||||||
|
/// Create a new instance given a `backing` database.
|
||||||
|
pub fn new(backing: DB) -> JournalDB {
|
||||||
|
let db = Arc::new(backing);
|
||||||
|
JournalDB {
|
||||||
|
forward: OverlayDB::new_with_arc(db.clone()),
|
||||||
|
backing: db,
|
||||||
|
inserts: vec![],
|
||||||
|
removes: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new instance with an anonymous temporary database.
|
||||||
|
pub fn new_temp() -> JournalDB {
|
||||||
|
let mut dir = env::temp_dir();
|
||||||
|
dir.push(H32::random().hex());
|
||||||
|
Self::new(DB::open_default(dir.to_str().unwrap()).unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a clone of the overlay db portion of this.
|
||||||
|
pub fn to_overlaydb(&self) -> OverlayDB { self.forward.clone() }
|
||||||
|
|
||||||
|
/// Commit all recent insert operations and historical removals from the old era
|
||||||
|
/// to the backing database.
|
||||||
|
pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
|
||||||
|
// journal format:
|
||||||
|
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
|
||||||
|
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
|
||||||
|
// [era, n] => [ ... ]
|
||||||
|
|
||||||
|
// TODO: store last_era, reclaim_period.
|
||||||
|
|
||||||
|
// when we make a new commit, we journal the inserts and removes.
|
||||||
|
// for each end_era that we journaled that we are no passing by,
|
||||||
|
// we remove all of its removes assuming it is canonical and all
|
||||||
|
// of its inserts otherwise.
|
||||||
|
|
||||||
|
// record new commit's details.
|
||||||
|
{
|
||||||
|
let mut index = 0usize;
|
||||||
|
let mut last;
|
||||||
|
|
||||||
|
while try!(self.backing.get({
|
||||||
|
let mut r = RlpStream::new_list(2);
|
||||||
|
r.append(&now);
|
||||||
|
r.append(&index);
|
||||||
|
last = r.drain();
|
||||||
|
&last
|
||||||
|
})).is_some() {
|
||||||
|
index += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut r = RlpStream::new_list(3);
|
||||||
|
r.append(id);
|
||||||
|
r.append(&self.inserts);
|
||||||
|
r.append(&self.removes);
|
||||||
|
try!(self.backing.put(&last, r.as_raw()));
|
||||||
|
self.inserts.clear();
|
||||||
|
self.removes.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// apply old commits' details
|
||||||
|
if let Some((end_era, canon_id)) = end {
|
||||||
|
let mut index = 0usize;
|
||||||
|
let mut last;
|
||||||
|
while let Some(rlp_data) = try!(self.backing.get({
|
||||||
|
let mut r = RlpStream::new_list(2);
|
||||||
|
r.append(&end_era);
|
||||||
|
r.append(&index);
|
||||||
|
last = r.drain();
|
||||||
|
&last
|
||||||
|
})) {
|
||||||
|
let rlp = Rlp::new(&rlp_data);
|
||||||
|
let to_remove: Vec<H256> = rlp.val_at(if canon_id == rlp.val_at(0) {2} else {1});
|
||||||
|
for i in to_remove.iter() {
|
||||||
|
self.forward.remove(i);
|
||||||
|
}
|
||||||
|
try!(self.backing.delete(&last));
|
||||||
|
trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len());
|
||||||
|
index += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.forward.commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Revert all operations on this object (i.e. `insert()`s and `removes()`s) since the
|
||||||
|
/// last `commit()`.
|
||||||
|
pub fn revert(&mut self) { self.forward.revert(); self.removes.clear(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HashDB for JournalDB {
|
||||||
|
fn keys(&self) -> HashMap<H256, i32> { self.forward.keys() }
|
||||||
|
fn lookup(&self, key: &H256) -> Option<&[u8]> { self.forward.lookup(key) }
|
||||||
|
fn exists(&self, key: &H256) -> bool { self.forward.exists(key) }
|
||||||
|
fn insert(&mut self, value: &[u8]) -> H256 { let r = self.forward.insert(value); self.inserts.push(r.clone()); r }
|
||||||
|
fn emplace(&mut self, key: H256, value: Bytes) { self.inserts.push(key.clone()); self.forward.emplace(key, value); }
|
||||||
|
fn kill(&mut self, key: &H256) { self.removes.push(key.clone()); }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use common::*;
|
||||||
|
use super::*;
|
||||||
|
use hashdb::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn long_history() {
|
||||||
|
// history is 3
|
||||||
|
let mut jdb = JournalDB::new_temp();
|
||||||
|
let h = jdb.insert(b"foo");
|
||||||
|
jdb.commit(0, &b"0".sha3(), None).unwrap();
|
||||||
|
assert!(jdb.exists(&h));
|
||||||
|
jdb.remove(&h);
|
||||||
|
jdb.commit(1, &b"1".sha3(), None).unwrap();
|
||||||
|
assert!(jdb.exists(&h));
|
||||||
|
jdb.commit(2, &b"2".sha3(), None).unwrap();
|
||||||
|
assert!(jdb.exists(&h));
|
||||||
|
jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap();
|
||||||
|
assert!(jdb.exists(&h));
|
||||||
|
jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap();
|
||||||
|
assert!(!jdb.exists(&h));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn complex() {
|
||||||
|
// history is 1
|
||||||
|
let mut jdb = JournalDB::new_temp();
|
||||||
|
|
||||||
|
let foo = jdb.insert(b"foo");
|
||||||
|
let bar = jdb.insert(b"bar");
|
||||||
|
jdb.commit(0, &b"0".sha3(), None).unwrap();
|
||||||
|
assert!(jdb.exists(&foo));
|
||||||
|
assert!(jdb.exists(&bar));
|
||||||
|
|
||||||
|
jdb.remove(&foo);
|
||||||
|
jdb.remove(&bar);
|
||||||
|
let baz = jdb.insert(b"baz");
|
||||||
|
jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap();
|
||||||
|
assert!(jdb.exists(&foo));
|
||||||
|
assert!(jdb.exists(&bar));
|
||||||
|
assert!(jdb.exists(&baz));
|
||||||
|
|
||||||
|
let foo = jdb.insert(b"foo");
|
||||||
|
jdb.remove(&baz);
|
||||||
|
jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap();
|
||||||
|
assert!(jdb.exists(&foo));
|
||||||
|
assert!(!jdb.exists(&bar));
|
||||||
|
assert!(jdb.exists(&baz));
|
||||||
|
|
||||||
|
jdb.remove(&foo);
|
||||||
|
jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap();
|
||||||
|
assert!(jdb.exists(&foo));
|
||||||
|
assert!(!jdb.exists(&bar));
|
||||||
|
assert!(!jdb.exists(&baz));
|
||||||
|
|
||||||
|
jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap();
|
||||||
|
assert!(!jdb.exists(&foo));
|
||||||
|
assert!(!jdb.exists(&bar));
|
||||||
|
assert!(!jdb.exists(&baz));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn fork() {
|
||||||
|
// history is 1
|
||||||
|
let mut jdb = JournalDB::new_temp();
|
||||||
|
|
||||||
|
let foo = jdb.insert(b"foo");
|
||||||
|
let bar = jdb.insert(b"bar");
|
||||||
|
jdb.commit(0, &b"0".sha3(), None).unwrap();
|
||||||
|
assert!(jdb.exists(&foo));
|
||||||
|
assert!(jdb.exists(&bar));
|
||||||
|
|
||||||
|
jdb.remove(&foo);
|
||||||
|
let baz = jdb.insert(b"baz");
|
||||||
|
jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap();
|
||||||
|
|
||||||
|
jdb.remove(&bar);
|
||||||
|
jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap();
|
||||||
|
|
||||||
|
assert!(jdb.exists(&foo));
|
||||||
|
assert!(jdb.exists(&bar));
|
||||||
|
assert!(jdb.exists(&baz));
|
||||||
|
|
||||||
|
jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap();
|
||||||
|
assert!(jdb.exists(&foo));
|
||||||
|
assert!(!jdb.exists(&baz));
|
||||||
|
assert!(!jdb.exists(&bar));
|
||||||
|
}
|
||||||
|
}
|
@ -69,6 +69,7 @@ pub mod sha3;
|
|||||||
pub mod hashdb;
|
pub mod hashdb;
|
||||||
pub mod memorydb;
|
pub mod memorydb;
|
||||||
pub mod overlaydb;
|
pub mod overlaydb;
|
||||||
|
pub mod journaldb;
|
||||||
pub mod math;
|
pub mod math;
|
||||||
pub mod chainfilter;
|
pub mod chainfilter;
|
||||||
pub mod crypto;
|
pub mod crypto;
|
||||||
@ -88,6 +89,7 @@ pub use rlp::*;
|
|||||||
pub use hashdb::*;
|
pub use hashdb::*;
|
||||||
pub use memorydb::*;
|
pub use memorydb::*;
|
||||||
pub use overlaydb::*;
|
pub use overlaydb::*;
|
||||||
|
pub use journaldb::*;
|
||||||
pub use math::*;
|
pub use math::*;
|
||||||
pub use chainfilter::*;
|
pub use chainfilter::*;
|
||||||
pub use crypto::*;
|
pub use crypto::*;
|
||||||
|
@ -15,11 +15,11 @@ use rocksdb::{DB, Writable, IteratorMode};
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
/// Implementation of the HashDB trait for a disk-backed database with a memory overlay.
|
/// Implementation of the HashDB trait for a disk-backed database with a memory overlay.
|
||||||
///
|
///
|
||||||
/// The operations `insert()` and `kill()` take place on the memory overlay; batches of
|
/// The operations `insert()` and `remove()` take place on the memory overlay; batches of
|
||||||
/// such operations may be flushed to the disk-backed DB with `commit()` or discarded with
|
/// such operations may be flushed to the disk-backed DB with `commit()` or discarded with
|
||||||
/// `revert()`.
|
/// `revert()`.
|
||||||
///
|
///
|
||||||
/// `lookup()` and `exists()` maintain normal behaviour - all `insert()` and `kill()`
|
/// `lookup()` and `contains()` maintain normal behaviour - all `insert()` and `remove()`
|
||||||
/// queries have an immediate effect in terms of these functions.
|
/// queries have an immediate effect in terms of these functions.
|
||||||
pub struct OverlayDB {
|
pub struct OverlayDB {
|
||||||
overlay: MemoryDB,
|
overlay: MemoryDB,
|
||||||
@ -28,8 +28,11 @@ pub struct OverlayDB {
|
|||||||
|
|
||||||
impl OverlayDB {
|
impl OverlayDB {
|
||||||
/// Create a new instance of OverlayDB given a `backing` database.
|
/// Create a new instance of OverlayDB given a `backing` database.
|
||||||
pub fn new(backing: DB) -> OverlayDB {
|
pub fn new(backing: DB) -> OverlayDB { Self::new_with_arc(Arc::new(backing)) }
|
||||||
OverlayDB{ overlay: MemoryDB::new(), backing: Arc::new(backing) }
|
|
||||||
|
/// Create a new instance of OverlayDB given a `backing` database.
|
||||||
|
pub fn new_with_arc(backing: Arc<DB>) -> OverlayDB {
|
||||||
|
OverlayDB{ overlay: MemoryDB::new(), backing: backing }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new instance of OverlayDB with an anonymous temporary database.
|
/// Create a new instance of OverlayDB with an anonymous temporary database.
|
||||||
@ -68,11 +71,10 @@ impl OverlayDB {
|
|||||||
/// ```
|
/// ```
|
||||||
pub fn commit(&mut self) -> Result<u32, UtilError> {
|
pub fn commit(&mut self) -> Result<u32, UtilError> {
|
||||||
let mut ret = 0u32;
|
let mut ret = 0u32;
|
||||||
|
let mut deletes = 0usize;
|
||||||
for i in self.overlay.drain().into_iter() {
|
for i in self.overlay.drain().into_iter() {
|
||||||
let (key, (value, rc)) = i;
|
let (key, (value, rc)) = i;
|
||||||
// until we figure out state trie pruning, only commit stuff when it has a strictly positive delkta of RCs -
|
if rc != 0 {
|
||||||
// this prevents RCs being reduced to 0 where the DB would pretent that the node had been removed.
|
|
||||||
if rc > 0 {
|
|
||||||
match self.payload(&key) {
|
match self.payload(&key) {
|
||||||
Some(x) => {
|
Some(x) => {
|
||||||
let (back_value, back_rc) = x;
|
let (back_value, back_rc) = x;
|
||||||
@ -80,7 +82,7 @@ impl OverlayDB {
|
|||||||
if total_rc < 0 {
|
if total_rc < 0 {
|
||||||
return Err(From::from(BaseDataError::NegativelyReferencedHash));
|
return Err(From::from(BaseDataError::NegativelyReferencedHash));
|
||||||
}
|
}
|
||||||
self.put_payload(&key, (back_value, total_rc as u32));
|
deletes += if self.put_payload(&key, (back_value, total_rc as u32)) {1} else {0};
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
if rc < 0 {
|
if rc < 0 {
|
||||||
@ -92,6 +94,7 @@ impl OverlayDB {
|
|||||||
ret += 1;
|
ret += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
trace!("OverlayDB::commit() deleted {} nodes", deletes);
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,11 +132,18 @@ impl OverlayDB {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get the refs and value of the given key.
|
/// Get the refs and value of the given key.
|
||||||
fn put_payload(&self, key: &H256, payload: (Bytes, u32)) {
|
fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool {
|
||||||
|
if payload.1 > 0 {
|
||||||
let mut s = RlpStream::new_list(2);
|
let mut s = RlpStream::new_list(2);
|
||||||
s.append(&payload.1);
|
s.append(&payload.1);
|
||||||
s.append(&payload.0);
|
s.append(&payload.0);
|
||||||
self.backing.put(&key.bytes(), &s.out()).expect("Low-level database error. Some issue with your hard disk?");
|
self.backing.put(&key.bytes(), s.as_raw()).expect("Low-level database error. Some issue with your hard disk?");
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
self.backing.delete(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?");
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,6 +142,14 @@ impl RlpStream {
|
|||||||
self.note_appended(1);
|
self.note_appended(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drain the object and return the underlying ElasticArray.
|
||||||
|
pub fn drain(self) -> ElasticArray1024<u8> {
|
||||||
|
match self.is_finished() {
|
||||||
|
true => self.encoder.bytes,
|
||||||
|
false => panic!()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BasicEncoder {
|
struct BasicEncoder {
|
||||||
|
Loading…
Reference in New Issue
Block a user