diff --git a/src/account.rs b/src/account.rs index 8c36c7cbd..c6c4136df 100644 --- a/src/account.rs +++ b/src/account.rs @@ -236,7 +236,7 @@ mod tests { #[test] fn storage_at() { - let mut db = OverlayDB::new_temp(); + let mut db = MemoryDB::new(); let rlp = { let mut a = Account::new_contract(U256::from(69u8)); a.set_storage(H256::from(&U256::from(0x00u64)), H256::from(&U256::from(0x1234u64))); @@ -254,7 +254,7 @@ mod tests { #[test] fn note_code() { - let mut db = OverlayDB::new_temp(); + let mut db = MemoryDB::new(); let rlp = { let mut a = Account::new_contract(U256::from(69u8)); @@ -273,7 +273,7 @@ mod tests { #[test] fn commit_storage() { let mut a = Account::new_contract(U256::from(69u8)); - let mut db = OverlayDB::new_temp(); + let mut db = MemoryDB::new(); a.set_storage(x!(0), x!(0x1234)); assert_eq!(a.storage_root(), None); a.commit_storage(&mut db); @@ -283,7 +283,7 @@ mod tests { #[test] fn commit_remove_commit_storage() { let mut a = Account::new_contract(U256::from(69u8)); - let mut db = OverlayDB::new_temp(); + let mut db = MemoryDB::new(); a.set_storage(x!(0), x!(0x1234)); a.commit_storage(&mut db); a.set_storage(x!(1), x!(0x1234)); @@ -296,7 +296,7 @@ mod tests { #[test] fn commit_code() { let mut a = Account::new_contract(U256::from(69u8)); - let mut db = OverlayDB::new_temp(); + let mut db = MemoryDB::new(); a.init_code(vec![0x55, 0x44, 0xffu8]); assert_eq!(a.code_hash(), SHA3_EMPTY); a.commit_code(&mut db); diff --git a/src/bin/client.rs b/src/bin/client.rs index a862737be..3335d8a72 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -12,6 +12,7 @@ use util::*; use ethcore::client::*; use ethcore::service::ClientService; use ethcore::ethereum; +use ethcore::blockchain::CacheSize; use ethcore::sync::*; fn setup_log() { @@ -29,7 +30,7 @@ fn main() { setup_log(); let spec = ethereum::new_frontier(); let mut service = ClientService::start(spec).unwrap(); - let io_handler = Box::new(ClientIoHandler { client: service.client(), timer: 0 }); + let io_handler = Box::new(ClientIoHandler { client: service.client(), timer: 0, info: Default::default() }); service.io().register_handler(io_handler).expect("Error registering IO handler"); loop { let mut cmd = String::new(); @@ -40,10 +41,47 @@ fn main() { } } +#[derive(Default, Debug)] +struct Informant { + chain_info: Option, + cache_info: Option, + report: Option, +} + +impl Informant { + pub fn tick(&mut self, client: &Client) { + // 5 seconds betwen calls. TODO: calculate this properly. + let dur = 5usize; + + let chain_info = client.chain_info(); + let cache_info = client.cache_info(); + let report = client.report(); + + if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (&self.chain_info, &self.cache_info, &self.report) { + println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //···{}···// {} ({}) bl {} ({}) ex ]", + chain_info.best_block_number, + chain_info.best_block_hash, + (report.blocks_imported - last_report.blocks_imported) / dur, + (report.transactions_applied - last_report.transactions_applied) / dur, + (report.gas_processed - last_report.gas_processed) / From::from(dur), + 0, // TODO: peers + cache_info.blocks, + cache_info.blocks as isize - last_cache_info.blocks as isize, + cache_info.block_details, + cache_info.block_details as isize - last_cache_info.block_details as isize + ); + } + + self.chain_info = Some(chain_info); + self.cache_info = Some(cache_info); + self.report = Some(report); + } +} struct ClientIoHandler { client: Arc>, timer: TimerToken, + info: Informant, } impl IoHandler for ClientIoHandler { @@ -53,7 +91,9 @@ impl IoHandler for ClientIoHandler { fn timeout<'s>(&'s mut self, _io: &mut IoContext<'s, NetSyncMessage>, timer: TimerToken) { if self.timer == timer { - println!("Chain info: {:?}", self.client.read().unwrap().deref().chain_info()); + let client = self.client.read().unwrap(); + client.tick(); + self.info.tick(client.deref()); } } } diff --git a/src/block.rs b/src/block.rs index e5ca18e8c..1b578e3a3 100644 --- a/src/block.rs +++ b/src/block.rs @@ -104,7 +104,7 @@ pub struct SealedBlock { impl<'x, 'y> OpenBlock<'x, 'y> { /// Create a new OpenBlock ready for transaction pushing. - pub fn new<'a, 'b>(engine: &'a Engine, db: OverlayDB, parent: &Header, last_hashes: &'b LastHashes, author: Address, extra_data: Bytes) -> OpenBlock<'a, 'b> { + pub fn new<'a, 'b>(engine: &'a Engine, db: JournalDB, parent: &Header, last_hashes: &'b LastHashes, author: Address, extra_data: Bytes) -> OpenBlock<'a, 'b> { let mut r = OpenBlock { block: Block::new(State::from_existing(db, parent.state_root().clone(), engine.account_start_nonce())), engine: engine, @@ -242,7 +242,7 @@ impl<'x, 'y> ClosedBlock<'x, 'y> { pub fn reopen(self) -> OpenBlock<'x, 'y> { self.open_block } /// Drop this object and return the underlieing database. - pub fn drain(self) -> OverlayDB { self.open_block.block.state.drop().1 } + pub fn drain(self) -> JournalDB { self.open_block.block.state.drop().1 } } impl SealedBlock { @@ -257,7 +257,7 @@ impl SealedBlock { } /// Drop this object and return the underlieing database. - pub fn drain(self) -> OverlayDB { self.block.state.drop().1 } + pub fn drain(self) -> JournalDB { self.block.state.drop().1 } } impl IsBlock for SealedBlock { @@ -265,7 +265,7 @@ impl IsBlock for SealedBlock { } /// Enact the block given by block header, transactions and uncles -pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[Header], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { +pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[Header], engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { { let s = State::from_existing(db.clone(), parent.state_root().clone(), engine.account_start_nonce()); trace!("enact(): root={}, author={}, author_balance={}\n", s.root(), header.author(), s.balance(&header.author())); @@ -281,20 +281,20 @@ pub fn enact<'x, 'y>(header: &Header, transactions: &[Transaction], uncles: &[He } /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header -pub fn enact_bytes<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { +pub fn enact_bytes<'x, 'y>(block_bytes: &[u8], engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { let block = BlockView::new(block_bytes); let header = block.header(); enact(&header, &block.transactions(), &block.uncles(), engine, db, parent, last_hashes) } /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header -pub fn enact_verified<'x, 'y>(block: &PreVerifiedBlock, engine: &'x Engine, db: OverlayDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { +pub fn enact_verified<'x, 'y>(block: &PreVerifiedBlock, engine: &'x Engine, db: JournalDB, parent: &Header, last_hashes: &'y LastHashes) -> Result, Error> { let view = BlockView::new(&block.bytes); enact(&block.header, &block.transactions, &view.uncles(), engine, db, parent, last_hashes) } /// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header. Seal the block aferwards -pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: OverlayDB, parent: &Header, last_hashes: &LastHashes) -> Result { +pub fn enact_and_seal(block_bytes: &[u8], engine: &Engine, db: JournalDB, parent: &Header, last_hashes: &LastHashes) -> Result { let header = BlockView::new(block_bytes).header_view(); Ok(try!(try!(enact_bytes(block_bytes, engine, db, parent, last_hashes)).seal(header.seal()))) } @@ -304,7 +304,7 @@ fn open_block() { use spec::*; let engine = Spec::new_test().to_engine().unwrap(); let genesis_header = engine.spec().genesis_header(); - let mut db = OverlayDB::new_temp(); + let mut db = JournalDB::new_temp(); engine.spec().ensure_db_good(&mut db); let last_hashes = vec![genesis_header.hash()]; let b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]); @@ -318,13 +318,13 @@ fn enact_block() { let engine = Spec::new_test().to_engine().unwrap(); let genesis_header = engine.spec().genesis_header(); - let mut db = OverlayDB::new_temp(); + let mut db = JournalDB::new_temp(); engine.spec().ensure_db_good(&mut db); let b = OpenBlock::new(engine.deref(), db, &genesis_header, &vec![genesis_header.hash()], Address::zero(), vec![]).close().seal(vec![]).unwrap(); let orig_bytes = b.rlp_bytes(); let orig_db = b.drain(); - let mut db = OverlayDB::new_temp(); + let mut db = JournalDB::new_temp(); engine.spec().ensure_db_good(&mut db); let e = enact_and_seal(&orig_bytes, engine.deref(), db, &genesis_header, &vec![genesis_header.hash()]).unwrap(); diff --git a/src/blockchain.rs b/src/blockchain.rs index f08d15057..d0b97a0ff 100644 --- a/src/blockchain.rs +++ b/src/blockchain.rs @@ -30,6 +30,11 @@ pub struct CacheSize { pub blocks_blooms: usize } +impl CacheSize { + /// Total amount used by the cache. + fn total(&self) -> usize { self.blocks + self.block_details + self.transaction_addresses + self.block_logs + self.blocks_blooms } +} + /// Information about best block gathered together struct BestBlock { pub hash: H256, @@ -96,6 +101,17 @@ pub trait BlockProvider { } } +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +enum CacheID { + Block(H256), + Extras(ExtrasIndex, H256), +} + +struct CacheManager { + cache_usage: VecDeque>, + in_use: HashSet, +} + /// Structure providing fast access to blockchain data. /// /// **Does not do input data verification.** @@ -113,7 +129,9 @@ pub struct BlockChain { blocks_blooms: RwLock>, extras_db: DB, - blocks_db: DB + blocks_db: DB, + + cache_man: RwLock, } impl BlockProvider for BlockChain { @@ -136,6 +154,8 @@ impl BlockProvider for BlockChain { let opt = self.blocks_db.get(hash) .expect("Low level database error. Some issue with disk?"); + self.note_used(CacheID::Block(hash.clone())); + match opt { Some(b) => { let bytes: Bytes = b.to_vec(); @@ -158,6 +178,10 @@ impl BlockProvider for BlockChain { } } +const COLLECTION_QUEUE_SIZE: usize = 2; +const MIN_CACHE_SIZE: usize = 1; +const MAX_CACHE_SIZE: usize = 1024 * 1024 * 1; + impl BlockChain { /// Create new instance of blockchain from given Genesis /// @@ -197,6 +221,9 @@ impl BlockChain { blocks_path.push("blocks"); let blocks_db = DB::open_default(blocks_path.to_str().unwrap()).unwrap(); + let mut cache_man = CacheManager{cache_usage: VecDeque::new(), in_use: HashSet::new()}; + (0..COLLECTION_QUEUE_SIZE).foreach(|_| cache_man.cache_usage.push_back(HashSet::new())); + let bc = BlockChain { best_block: RwLock::new(BestBlock::new()), blocks: RwLock::new(HashMap::new()), @@ -206,7 +233,8 @@ impl BlockChain { block_logs: RwLock::new(HashMap::new()), blocks_blooms: RwLock::new(HashMap::new()), extras_db: extras_db, - blocks_db: blocks_db + blocks_db: blocks_db, + cache_man: RwLock::new(cache_man), }; // load best block @@ -251,7 +279,7 @@ impl BlockChain { /// Ensure that the best block does indeed have a state_root in the state DB. /// If it doesn't, then rewind down until we find one that does and delete data to ensure that /// later blocks will be reimported. - pub fn ensure_good(&mut self, _state: &OverlayDB) { + pub fn ensure_good(&mut self, _state: &JournalDB) { unimplemented!(); } @@ -497,6 +525,10 @@ impl BlockChain { } } + if let Some(h) = hash.as_h256() { + self.note_used(CacheID::Extras(T::extras_index(), h.clone())); + } + self.extras_db.get_extras(hash).map(| t: T | { let mut write = cache.write().unwrap(); write.insert(hash.clone(), t.clone()); @@ -537,6 +569,56 @@ impl BlockChain { self.block_logs.write().unwrap().squeeze(size.block_logs); self.blocks_blooms.write().unwrap().squeeze(size.blocks_blooms); } + + /// Let the cache system know that a cacheable item has been used. + fn note_used(&self, id: CacheID) { + let mut cache_man = self.cache_man.write().unwrap(); + if !cache_man.cache_usage[0].contains(&id) { + cache_man.cache_usage[0].insert(id.clone()); + if cache_man.in_use.contains(&id) { + if let Some(c) = cache_man.cache_usage.iter_mut().skip(1).find(|e|e.contains(&id)) { + c.remove(&id); + } + } else { + cache_man.in_use.insert(id); + } + } + } + + /// Ticks our cache system and throws out any old data. + pub fn collect_garbage(&self, force: bool) { + // TODO: check time. + let timeout = true; + + let t = self.cache_size().total(); + if t < MIN_CACHE_SIZE || (!timeout && (!force || t < MAX_CACHE_SIZE)) { return; } + + let mut cache_man = self.cache_man.write().unwrap(); + let mut blocks = self.blocks.write().unwrap(); + let mut block_details = self.block_details.write().unwrap(); + let mut block_hashes = self.block_hashes.write().unwrap(); + let mut transaction_addresses = self.transaction_addresses.write().unwrap(); + let mut block_logs = self.block_logs.write().unwrap(); + let mut blocks_blooms = self.blocks_blooms.write().unwrap(); + + for id in cache_man.cache_usage.pop_back().unwrap().into_iter() { + cache_man.in_use.remove(&id); + match id { + CacheID::Block(h) => { blocks.remove(&h); }, + CacheID::Extras(ExtrasIndex::BlockDetails, h) => { block_details.remove(&h); }, + CacheID::Extras(ExtrasIndex::TransactionAddress, h) => { transaction_addresses.remove(&h); }, + CacheID::Extras(ExtrasIndex::BlockLogBlooms, h) => { block_logs.remove(&h); }, + CacheID::Extras(ExtrasIndex::BlocksBlooms, h) => { blocks_blooms.remove(&h); }, + _ => panic!(), + } + } + cache_man.cache_usage.push_front(HashSet::new()); + + // TODO: handle block_hashes properly. + block_hashes.clear(); + + // TODO: m_lastCollection = chrono::system_clock::now(); + } } #[cfg(test)] diff --git a/src/client.rs b/src/client.rs index e02ab37d8..cf57e6a07 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,6 @@ use util::*; use rocksdb::{Options, DB}; -use rocksdb::DBCompactionStyle::DBUniversalCompaction; -use blockchain::{BlockChain, BlockProvider}; +use blockchain::{BlockChain, BlockProvider, CacheSize}; use views::BlockView; use error::*; use header::BlockNumber; @@ -41,6 +40,12 @@ pub struct BlockChainInfo { pub best_block_number: BlockNumber } +impl fmt::Display for BlockChainInfo { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "#{}.{}", self.best_block_number, self.best_block_hash) + } +} + /// Block queue status #[derive(Debug)] pub struct BlockQueueStatus { @@ -100,14 +105,32 @@ pub trait BlockChainClient : Sync + Send { fn chain_info(&self) -> BlockChainInfo; } +#[derive(Default, Clone, Debug, Eq, PartialEq)] +pub struct ClientReport { + pub blocks_imported: usize, + pub transactions_applied: usize, + pub gas_processed: U256, +} + +impl ClientReport { + pub fn accrue_block(&mut self, block: &PreVerifiedBlock) { + self.blocks_imported += 1; + self.transactions_applied += block.transactions.len(); + self.gas_processed += block.header.gas_used; + } +} + /// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue. pub struct Client { chain: Arc>, engine: Arc>, - state_db: OverlayDB, + state_db: JournalDB, queue: BlockQueue, + report: ClientReport, } +const HISTORY: u64 = 1000; + impl Client { /// Create a new client with given spec and DB path. pub fn new(spec: Spec, path: &Path, message_channel: IoChannel ) -> Result { @@ -115,7 +138,7 @@ impl Client { let mut opts = Options::new(); opts.create_if_missing(true); opts.set_max_open_files(256); - opts.set_use_fsync(false); + /*opts.set_use_fsync(false); opts.set_bytes_per_sync(8388608); opts.set_disable_data_sync(false); opts.set_block_cache_size_mb(1024); @@ -130,16 +153,17 @@ impl Client { opts.set_max_background_compactions(4); opts.set_max_background_flushes(4); opts.set_filter_deletes(false); - opts.set_disable_auto_compactions(true); + opts.set_disable_auto_compactions(false);*/ let mut state_path = path.to_path_buf(); state_path.push("state"); let db = DB::open(&opts, state_path.to_str().unwrap()).unwrap(); - let mut state_db = OverlayDB::new(db); + let mut state_db = JournalDB::new(db); let engine = Arc::new(try!(spec.to_engine())); - engine.spec().ensure_db_good(&mut state_db); - state_db.commit().expect("Error commiting genesis state to state DB"); + if engine.spec().ensure_db_good(&mut state_db) { + state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); + } // chain.write().unwrap().ensure_good(&state_db); @@ -148,6 +172,7 @@ impl Client { engine: engine.clone(), state_db: state_db, queue: BlockQueue::new(engine, message_channel), + report: Default::default(), }) } @@ -212,16 +237,34 @@ impl Client { } self.chain.write().unwrap().insert_block(&block.bytes); //TODO: err here? - match result.drain().commit() { + let ancient = if header.number() >= HISTORY { Some(header.number() - HISTORY) } else { None }; + match result.drain().commit(header.number(), &header.hash(), ancient.map(|n|(n, self.chain.read().unwrap().block_hash(n).unwrap()))) { Ok(_) => (), Err(e) => { warn!(target: "client", "State DB commit failed: {:?}", e); return; } } - info!(target: "client", "Imported #{} ({})", header.number(), header.hash()); + self.report.accrue_block(&block); + + trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } } + + /// Get info on the cache. + pub fn cache_info(&self) -> CacheSize { + self.chain.read().unwrap().cache_size() + } + + /// Get the report. + pub fn report(&self) -> ClientReport { + self.report.clone() + } + + /// Tick the client. + pub fn tick(&self) { + self.chain.read().unwrap().collect_garbage(false); + } } impl BlockChainClient for Client { diff --git a/src/ethereum/ethash.rs b/src/ethereum/ethash.rs index 8a86cf4e5..f559446ce 100644 --- a/src/ethereum/ethash.rs +++ b/src/ethereum/ethash.rs @@ -215,7 +215,7 @@ fn on_close_block() { use super::*; let engine = new_morden().to_engine().unwrap(); let genesis_header = engine.spec().genesis_header(); - let mut db = OverlayDB::new_temp(); + let mut db = JournalDB::new_temp(); engine.spec().ensure_db_good(&mut db); let last_hashes = vec![genesis_header.hash()]; let b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]); @@ -228,7 +228,7 @@ fn on_close_block_with_uncle() { use super::*; let engine = new_morden().to_engine().unwrap(); let genesis_header = engine.spec().genesis_header(); - let mut db = OverlayDB::new_temp(); + let mut db = JournalDB::new_temp(); engine.spec().ensure_db_good(&mut db); let last_hashes = vec![genesis_header.hash()]; let mut b = OpenBlock::new(engine.deref(), db, &genesis_header, &last_hashes, Address::zero(), vec![]); diff --git a/src/ethereum/mod.rs b/src/ethereum/mod.rs index 603a64e7d..e97ac79a3 100644 --- a/src/ethereum/mod.rs +++ b/src/ethereum/mod.rs @@ -40,7 +40,7 @@ mod tests { fn ensure_db_good() { let engine = new_morden().to_engine().unwrap(); let genesis_header = engine.spec().genesis_header(); - let mut db = OverlayDB::new_temp(); + let mut db = JournalDB::new_temp(); engine.spec().ensure_db_good(&mut db); let s = State::from_existing(db.clone(), genesis_header.state_root.clone(), engine.account_start_nonce()); assert_eq!(s.balance(&address_from_hex("0000000000000000000000000000000000000001")), U256::from(1u64)); diff --git a/src/extras.rs b/src/extras.rs index ed0032698..8052af791 100644 --- a/src/extras.rs +++ b/src/extras.rs @@ -3,7 +3,7 @@ use header::BlockNumber; use rocksdb::{DB, Writable}; /// Represents index of extra data in database -#[derive(Copy, Clone)] +#[derive(Copy, Debug, Hash, Eq, PartialEq, Clone)] pub enum ExtrasIndex { BlockDetails = 0, BlockHash = 1, @@ -59,6 +59,7 @@ impl ExtrasReadable for DB { /// Implementations should convert arbitrary type to database key slice pub trait ExtrasSliceConvertable { fn to_extras_slice(&self, i: ExtrasIndex) -> H264; + fn as_h256(&self) -> Option<&H256> { None } } impl ExtrasSliceConvertable for H256 { @@ -67,6 +68,7 @@ impl ExtrasSliceConvertable for H256 { slice[32] = i as u8; slice } + fn as_h256(&self) -> Option<&H256> { Some(self) } } impl ExtrasSliceConvertable for U256 { diff --git a/src/spec.rs b/src/spec.rs index b174b0e9f..e93b460c8 100644 --- a/src/spec.rs +++ b/src/spec.rs @@ -220,7 +220,7 @@ impl FromJson for Spec { impl Spec { /// Ensure that the given state DB has the trie nodes in for the genesis state. - pub fn ensure_db_good(&self, db: &mut HashDB) { + pub fn ensure_db_good(&self, db: &mut HashDB) -> bool { if !db.contains(&self.state_root()) { info!("Populating genesis state..."); let mut root = H256::new(); @@ -232,7 +232,8 @@ impl Spec { } assert!(db.contains(&self.state_root())); info!("Genesis state is ready"); - } + true + } else { false } } /// Create a new Spec from a JSON UTF-8 data resource `data`. diff --git a/src/state.rs b/src/state.rs index a186d6cd6..e325b8d34 100644 --- a/src/state.rs +++ b/src/state.rs @@ -10,7 +10,7 @@ pub type ApplyResult = Result; /// Representation of the entire state of all accounts in the system. #[derive(Clone)] pub struct State { - db: OverlayDB, + db: JournalDB, root: H256, cache: RefCell>>, @@ -19,7 +19,7 @@ pub struct State { impl State { /// Creates new state with empty state root - pub fn new(mut db: OverlayDB, account_start_nonce: U256) -> State { + pub fn new(mut db: JournalDB, account_start_nonce: U256) -> State { let mut root = H256::new(); { // init trie and reset root too null @@ -35,7 +35,7 @@ impl State { } /// Creates new state with existing state root - pub fn from_existing(db: OverlayDB, root: H256, account_start_nonce: U256) -> State { + pub fn from_existing(db: JournalDB, root: H256, account_start_nonce: U256) -> State { { // trie should panic! if root does not exist let _ = SecTrieDB::new(&db, &root); @@ -51,11 +51,11 @@ impl State { /// Create temporary state object pub fn new_temp() -> State { - Self::new(OverlayDB::new_temp(), U256::from(0u8)) + Self::new(JournalDB::new_temp(), U256::from(0u8)) } /// Destroy the current object and return root and database. - pub fn drop(self) -> (H256, OverlayDB) { + pub fn drop(self) -> (H256, JournalDB) { (self.root, self.db) } @@ -65,7 +65,7 @@ impl State { } /// Expose the underlying database; good to use for calling `state.db().commit()`. - pub fn db(&mut self) -> &mut OverlayDB { + pub fn db(&mut self) -> &mut JournalDB { &mut self.db } diff --git a/src/sync/chain.rs b/src/sync/chain.rs index 15fe6d1f9..dba9ae08d 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -471,7 +471,7 @@ impl ChainSync { pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { trace!(target: "sync", "== Disconnecting {}", peer); if self.peers.contains_key(&peer) { - info!(target: "sync", "Disconneced {}:{}", peer, io.peer_info(peer)); + info!(target: "sync", "Disconnected {}:{}", peer, io.peer_info(peer)); self.clear_peer_download(peer); self.peers.remove(&peer); self.continue_sync(io); diff --git a/util/src/error.rs b/util/src/error.rs index 04f7b96ce..d9687183d 100644 --- a/util/src/error.rs +++ b/util/src/error.rs @@ -22,6 +22,7 @@ pub enum UtilError { BaseData(BaseDataError), Network(NetworkError), Decoder(DecoderError), + SimpleString(String), BadSize, } @@ -73,6 +74,12 @@ impl From<::rlp::DecoderError> for UtilError { } } +impl From for UtilError { + fn from(err: String) -> UtilError { + UtilError::SimpleString(err) + } +} + // TODO: uncomment below once https://github.com/rust-lang/rust/issues/27336 sorted. /*#![feature(concat_idents)] macro_rules! assimilate { diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs new file mode 100644 index 000000000..ada9c0d2b --- /dev/null +++ b/util/src/journaldb.rs @@ -0,0 +1,214 @@ +//! Disk-backed HashDB implementation. + +use std::env; +use common::*; +use rlp::*; +use hashdb::*; +use overlaydb::*; +use rocksdb::{DB, Writable}; + +#[derive(Clone)] +/// Implementation of the HashDB trait for a disk-backed database with a memory overlay +/// and latent-removal semantics. +/// +/// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to +/// write operations out to disk. Unlike OverlayDB, `remove()` operations do not take effect +/// immediately. Rather some age (based on a linear but arbitrary metric) must pass before +/// the removals actually take effect. +pub struct JournalDB { + forward: OverlayDB, + backing: Arc, + inserts: Vec, + removes: Vec, +} + +impl JournalDB { + /// Create a new instance given a `backing` database. + pub fn new(backing: DB) -> JournalDB { + let db = Arc::new(backing); + JournalDB { + forward: OverlayDB::new_with_arc(db.clone()), + backing: db, + inserts: vec![], + removes: vec![], + } + } + + /// Create a new instance with an anonymous temporary database. + pub fn new_temp() -> JournalDB { + let mut dir = env::temp_dir(); + dir.push(H32::random().hex()); + Self::new(DB::open_default(dir.to_str().unwrap()).unwrap()) + } + + /// Get a clone of the overlay db portion of this. + pub fn to_overlaydb(&self) -> OverlayDB { self.forward.clone() } + + /// Commit all recent insert operations and historical removals from the old era + /// to the backing database. + pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { + // journal format: + // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] + // [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ] + // [era, n] => [ ... ] + + // TODO: store last_era, reclaim_period. + + // when we make a new commit, we journal the inserts and removes. + // for each end_era that we journaled that we are no passing by, + // we remove all of its removes assuming it is canonical and all + // of its inserts otherwise. + + // record new commit's details. + { + let mut index = 0usize; + let mut last; + + while try!(self.backing.get({ + let mut r = RlpStream::new_list(2); + r.append(&now); + r.append(&index); + last = r.drain(); + &last + })).is_some() { + index += 1; + } + + let mut r = RlpStream::new_list(3); + r.append(id); + r.append(&self.inserts); + r.append(&self.removes); + try!(self.backing.put(&last, r.as_raw())); + self.inserts.clear(); + self.removes.clear(); + } + + // apply old commits' details + if let Some((end_era, canon_id)) = end { + let mut index = 0usize; + let mut last; + while let Some(rlp_data) = try!(self.backing.get({ + let mut r = RlpStream::new_list(2); + r.append(&end_era); + r.append(&index); + last = r.drain(); + &last + })) { + let rlp = Rlp::new(&rlp_data); + let to_remove: Vec = rlp.val_at(if canon_id == rlp.val_at(0) {2} else {1}); + for i in to_remove.iter() { + self.forward.remove(i); + } + try!(self.backing.delete(&last)); + trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len()); + index += 1; + } + } + + self.forward.commit() + } + + /// Revert all operations on this object (i.e. `insert()`s and `removes()`s) since the + /// last `commit()`. + pub fn revert(&mut self) { self.forward.revert(); self.removes.clear(); } +} + +impl HashDB for JournalDB { + fn keys(&self) -> HashMap { self.forward.keys() } + fn lookup(&self, key: &H256) -> Option<&[u8]> { self.forward.lookup(key) } + fn exists(&self, key: &H256) -> bool { self.forward.exists(key) } + fn insert(&mut self, value: &[u8]) -> H256 { let r = self.forward.insert(value); self.inserts.push(r.clone()); r } + fn emplace(&mut self, key: H256, value: Bytes) { self.inserts.push(key.clone()); self.forward.emplace(key, value); } + fn kill(&mut self, key: &H256) { self.removes.push(key.clone()); } +} + +#[cfg(test)] +mod tests { + use common::*; + use super::*; + use hashdb::*; + + #[test] + fn long_history() { + // history is 3 + let mut jdb = JournalDB::new_temp(); + let h = jdb.insert(b"foo"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + assert!(jdb.exists(&h)); + jdb.remove(&h); + jdb.commit(1, &b"1".sha3(), None).unwrap(); + assert!(jdb.exists(&h)); + jdb.commit(2, &b"2".sha3(), None).unwrap(); + assert!(jdb.exists(&h)); + jdb.commit(3, &b"3".sha3(), Some((0, b"0".sha3()))).unwrap(); + assert!(jdb.exists(&h)); + jdb.commit(4, &b"4".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(!jdb.exists(&h)); + } + + #[test] + fn complex() { + // history is 1 + let mut jdb = JournalDB::new_temp(); + + let foo = jdb.insert(b"foo"); + let bar = jdb.insert(b"bar"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + + jdb.remove(&foo); + jdb.remove(&bar); + let baz = jdb.insert(b"baz"); + jdb.commit(1, &b"1".sha3(), Some((0, b"0".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + assert!(jdb.exists(&baz)); + + let foo = jdb.insert(b"foo"); + jdb.remove(&baz); + jdb.commit(2, &b"2".sha3(), Some((1, b"1".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(!jdb.exists(&bar)); + assert!(jdb.exists(&baz)); + + jdb.remove(&foo); + jdb.commit(3, &b"3".sha3(), Some((2, b"2".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(!jdb.exists(&bar)); + assert!(!jdb.exists(&baz)); + + jdb.commit(4, &b"4".sha3(), Some((3, b"3".sha3()))).unwrap(); + assert!(!jdb.exists(&foo)); + assert!(!jdb.exists(&bar)); + assert!(!jdb.exists(&baz)); + } + + #[test] + fn fork() { + // history is 1 + let mut jdb = JournalDB::new_temp(); + + let foo = jdb.insert(b"foo"); + let bar = jdb.insert(b"bar"); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + + jdb.remove(&foo); + let baz = jdb.insert(b"baz"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.remove(&bar); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + + assert!(jdb.exists(&foo)); + assert!(jdb.exists(&bar)); + assert!(jdb.exists(&baz)); + + jdb.commit(2, &b"2b".sha3(), Some((1, b"1b".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + assert!(!jdb.exists(&baz)); + assert!(!jdb.exists(&bar)); + } +} diff --git a/util/src/lib.rs b/util/src/lib.rs index 4bc47e61c..204266c54 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -69,6 +69,7 @@ pub mod sha3; pub mod hashdb; pub mod memorydb; pub mod overlaydb; +pub mod journaldb; pub mod math; pub mod chainfilter; pub mod crypto; @@ -88,6 +89,7 @@ pub use rlp::*; pub use hashdb::*; pub use memorydb::*; pub use overlaydb::*; +pub use journaldb::*; pub use math::*; pub use chainfilter::*; pub use crypto::*; diff --git a/util/src/overlaydb.rs b/util/src/overlaydb.rs index 1006cd28c..e8492091f 100644 --- a/util/src/overlaydb.rs +++ b/util/src/overlaydb.rs @@ -15,11 +15,11 @@ use rocksdb::{DB, Writable, IteratorMode}; #[derive(Clone)] /// Implementation of the HashDB trait for a disk-backed database with a memory overlay. /// -/// The operations `insert()` and `kill()` take place on the memory overlay; batches of +/// The operations `insert()` and `remove()` take place on the memory overlay; batches of /// such operations may be flushed to the disk-backed DB with `commit()` or discarded with /// `revert()`. /// -/// `lookup()` and `exists()` maintain normal behaviour - all `insert()` and `kill()` +/// `lookup()` and `contains()` maintain normal behaviour - all `insert()` and `remove()` /// queries have an immediate effect in terms of these functions. pub struct OverlayDB { overlay: MemoryDB, @@ -28,8 +28,11 @@ pub struct OverlayDB { impl OverlayDB { /// Create a new instance of OverlayDB given a `backing` database. - pub fn new(backing: DB) -> OverlayDB { - OverlayDB{ overlay: MemoryDB::new(), backing: Arc::new(backing) } + pub fn new(backing: DB) -> OverlayDB { Self::new_with_arc(Arc::new(backing)) } + + /// Create a new instance of OverlayDB given a `backing` database. + pub fn new_with_arc(backing: Arc) -> OverlayDB { + OverlayDB{ overlay: MemoryDB::new(), backing: backing } } /// Create a new instance of OverlayDB with an anonymous temporary database. @@ -68,11 +71,10 @@ impl OverlayDB { /// ``` pub fn commit(&mut self) -> Result { let mut ret = 0u32; + let mut deletes = 0usize; for i in self.overlay.drain().into_iter() { let (key, (value, rc)) = i; - // until we figure out state trie pruning, only commit stuff when it has a strictly positive delkta of RCs - - // this prevents RCs being reduced to 0 where the DB would pretent that the node had been removed. - if rc > 0 { + if rc != 0 { match self.payload(&key) { Some(x) => { let (back_value, back_rc) = x; @@ -80,7 +82,7 @@ impl OverlayDB { if total_rc < 0 { return Err(From::from(BaseDataError::NegativelyReferencedHash)); } - self.put_payload(&key, (back_value, total_rc as u32)); + deletes += if self.put_payload(&key, (back_value, total_rc as u32)) {1} else {0}; } None => { if rc < 0 { @@ -92,6 +94,7 @@ impl OverlayDB { ret += 1; } } + trace!("OverlayDB::commit() deleted {} nodes", deletes); Ok(ret) } @@ -129,11 +132,18 @@ impl OverlayDB { } /// Get the refs and value of the given key. - fn put_payload(&self, key: &H256, payload: (Bytes, u32)) { - let mut s = RlpStream::new_list(2); - s.append(&payload.1); - s.append(&payload.0); - self.backing.put(&key.bytes(), &s.out()).expect("Low-level database error. Some issue with your hard disk?"); + fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool { + if payload.1 > 0 { + let mut s = RlpStream::new_list(2); + s.append(&payload.1); + s.append(&payload.0); + self.backing.put(&key.bytes(), s.as_raw()).expect("Low-level database error. Some issue with your hard disk?"); + false + } else { + self.backing.delete(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?"); + true + } + } } diff --git a/util/src/rlp/rlpstream.rs b/util/src/rlp/rlpstream.rs index b8954ae6f..a30978f24 100644 --- a/util/src/rlp/rlpstream.rs +++ b/util/src/rlp/rlpstream.rs @@ -142,6 +142,14 @@ impl RlpStream { self.note_appended(1); } } + + /// Drain the object and return the underlying ElasticArray. + pub fn drain(self) -> ElasticArray1024 { + match self.is_finished() { + true => self.encoder.bytes, + false => panic!() + } + } } struct BasicEncoder {