diff --git a/Cargo.lock b/Cargo.lock index 4659ba0d4..4e46da6b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ [root] name = "parity" -version = "1.3.8" +version = "1.3.9" dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)", @@ -20,7 +20,7 @@ dependencies = [ "ethcore-logger 1.3.0", "ethcore-rpc 1.3.0", "ethcore-signer 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "ethsync 1.3.0", "fdlimit 0.1.0", "hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -35,7 +35,7 @@ dependencies = [ "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "syntex 0.36.0 (registry+https://github.com/rust-lang/crates.io-index)", + "syntex 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -270,7 +270,7 @@ dependencies = [ "ethcore-ipc 1.3.0", "ethcore-ipc-codegen 1.3.0", "ethcore-ipc-nano 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "ethjson 0.1.0", "ethstore 0.1.0", "evmjit 1.3.0", @@ -294,7 +294,7 @@ version = "1.3.0" dependencies = [ "clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-rpc 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 6.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git?branch=beta)", @@ -336,7 +336,7 @@ name = "ethcore-ipc" version = "1.3.0" dependencies = [ "ethcore-devtools 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -381,7 +381,7 @@ dependencies = [ "ethcore-ipc 1.3.0", "ethcore-ipc-codegen 1.3.0", "ethcore-ipc-nano 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -393,7 +393,7 @@ name = "ethcore-logger" version = "1.3.0" dependencies = [ "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -408,7 +408,7 @@ dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-devtools 1.3.0", "ethcore-io 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "igd 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -432,7 +432,7 @@ dependencies = [ "ethcore-devtools 1.3.0", "ethcore-io 1.3.0", "ethcore-ipc 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "ethjson 0.1.0", "ethsync 1.3.0", "json-ipc-server 0.2.4 
(git+https://github.com/ethcore/json-ipc-server.git?branch=beta)", @@ -455,7 +455,7 @@ dependencies = [ "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-io 1.3.0", "ethcore-rpc 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-dapps-signer 1.4.0 (git+https://github.com/ethcore/parity-ui.git)", @@ -466,7 +466,7 @@ dependencies = [ [[package]] name = "ethcore-util" -version = "1.3.8" +version = "1.3.9" dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "arrayvec 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", @@ -499,7 +499,7 @@ dependencies = [ name = "ethjson" version = "0.1.0" dependencies = [ - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", "serde_codegen 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -547,7 +547,7 @@ dependencies = [ "ethcore-ipc-codegen 1.3.0", "ethcore-ipc-nano 1.3.0", "ethcore-network 1.3.0", - "ethcore-util 1.3.8", + "ethcore-util 1.3.9", "heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1390,14 +1390,6 @@ dependencies = [ "syntex_syntax 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "syntex" -version = "0.36.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "syntex_syntax 0.36.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "syntex_syntax" version = "0.33.0" @@ -1411,19 +1403,6 @@ dependencies = [ "unicode-xid 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "syntex_syntax" -version = "0.36.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "bitflags 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", - "term 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", - "unicode-xid 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "table" version = "0.1.0" @@ -1765,9 +1744,7 @@ dependencies = [ "checksum stable-heap 0.1.0 (git+https://github.com/carllerche/stable-heap?rev=3c5cd1ca47)" = "" "checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e" "checksum syntex 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "393b6dd0889df2b064beeea954cfda6bc2571604ac460deeae0fed55a53988af" -"checksum syntex 0.36.0 (registry+https://github.com/rust-lang/crates.io-index)" = "61dc0bbe1e46dcd53ec50d6600e750152c22e0e9352cadbd413e86fb847ae899" "checksum syntex_syntax 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44bded3cabafc65c90b663b1071bd2d198a9ab7515e6ce729e4570aaf53c407e" -"checksum syntex_syntax 0.36.0 (registry+https://github.com/rust-lang/crates.io-index)" = 
"2b92a8c33fad2fa99e14fe499ec17e82b6c6496a7a38a499f33b584ffa1886fa" "checksum target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c63f48baada5c52e65a29eef93ab4f8982681b67f9e8d29c7b05abcfec2b9ffe" "checksum term 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "f2077e54d38055cf1ca0fd7933a2e00cd3ec8f6fed352b2a377f06dcdaaf3281" "checksum termios 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a" diff --git a/Cargo.toml b/Cargo.toml index 1a127bb3d..b8b7bd41d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] description = "Ethcore client." name = "parity" -version = "1.3.8" +version = "1.3.9" license = "GPL-3.0" authors = ["Ethcore "] build = "build.rs" diff --git a/ethcore/res/ethereum/tests b/ethcore/res/ethereum/tests index 862b4e3d4..97066e40c 160000 --- a/ethcore/res/ethereum/tests +++ b/ethcore/res/ethereum/tests @@ -1 +1 @@ -Subproject commit 862b4e3d4a9a7141af1b4aaf7dfe228a6a294614 +Subproject commit 97066e40ccd061f727deb5cd860e4d9135aa2551 diff --git a/ethcore/src/account.rs b/ethcore/src/account.rs index fa6fedae6..8511a5b4b 100644 --- a/ethcore/src/account.rs +++ b/ethcore/src/account.rs @@ -288,6 +288,15 @@ impl Account { /// Determine whether there are any un-`commit()`-ed storage-setting operations. pub fn storage_is_clean(&self) -> bool { self.storage_changes.is_empty() } + /// Check if account has zero nonce, balance, no code and no storage. + pub fn is_empty(&self) -> bool { + self.storage_changes.is_empty() && + self.balance.is_zero() && + self.nonce.is_zero() && + self.storage_root == SHA3_NULL_RLP && + self.code_hash == SHA3_EMPTY + } + #[cfg(test)] /// return the storage root associated with this account or None if it has been altered via the overlay. pub fn storage_root(&self) -> Option<&H256> { if self.storage_is_clean() {Some(&self.storage_root)} else {None} } diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 89a620493..b80ab8cde 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -17,7 +17,7 @@ //! A queue of blocks. Sits between network or other I/O and the `BlockChain`. //! Sorts them ready for blockchain insertion. use std::thread::{JoinHandle, self}; -use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}; use std::sync::{Condvar as SCondvar, Mutex as SMutex}; use util::*; use io::*; @@ -76,6 +76,13 @@ impl BlockQueueInfo { } } +// the internal queue sizes. +struct Sizes { + unverified: AtomicUsize, + verifying: AtomicUsize, + verified: AtomicUsize, +} + /// A queue of blocks. Sits between network or other I/O and the `BlockChain`. /// Sorts them ready for blockchain insertion. 
pub struct BlockQueue { @@ -110,7 +117,21 @@ struct QueueSignal { impl QueueSignal { #[cfg_attr(feature="dev", allow(bool_comparison))] - fn set(&self) { + fn set_sync(&self) { + // Do not signal when we are about to close + if self.deleting.load(AtomicOrdering::Relaxed) { + return; + } + + if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { + if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) { + debug!("Error sending BlockVerified message: {:?}", e); + } + } + } + + #[cfg_attr(feature="dev", allow(bool_comparison))] + fn set_async(&self) { // Do not signal when we are about to close if self.deleting.load(AtomicOrdering::Relaxed) { return; @@ -131,11 +152,12 @@ impl QueueSignal { struct Verification { // All locks must be captured in the order declared here. unverified: Mutex>, - verified: Mutex>, verifying: Mutex>, + verified: Mutex>, bad: Mutex>, more_to_verify: SMutex<()>, empty: SMutex<()>, + sizes: Sizes, } impl BlockQueue { @@ -143,12 +165,16 @@ impl BlockQueue { pub fn new(config: BlockQueueConfig, engine: Arc, message_channel: IoChannel) -> BlockQueue { let verification = Arc::new(Verification { unverified: Mutex::new(VecDeque::new()), - verified: Mutex::new(VecDeque::new()), verifying: Mutex::new(VecDeque::new()), + verified: Mutex::new(VecDeque::new()), bad: Mutex::new(HashSet::new()), more_to_verify: SMutex::new(()), empty: SMutex::new(()), - + sizes: Sizes { + unverified: AtomicUsize::new(0), + verifying: AtomicUsize::new(0), + verified: AtomicUsize::new(0), + } }); let more_to_verify = Arc::new(SCondvar::new()); let deleting = Arc::new(AtomicBool::new(false)); @@ -221,16 +247,18 @@ impl BlockQueue { } let mut verifying = verification.verifying.lock(); let block = unverified.pop_front().unwrap(); + verification.sizes.unverified.fetch_sub(block.heap_size_of_children(), AtomicOrdering::SeqCst); verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None }); block }; let block_hash = block.header.hash(); - match verify_block_unordered(block.header, block.bytes, &*engine) { + let is_ready = match verify_block_unordered(block.header, block.bytes, &*engine) { Ok(verified) => { let mut verifying = verification.verifying.lock(); for e in verifying.iter_mut() { if e.hash == block_hash { + verification.sizes.verifying.fetch_add(verified.heap_size_of_children(), AtomicOrdering::SeqCst); e.block = Some(verified); break; } @@ -239,8 +267,10 @@ impl BlockQueue { // we're next! 
let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); - BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); - ready.set(); + BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes); + true + } else { + false } }, Err(err) => { @@ -250,23 +280,39 @@ impl BlockQueue { warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err); bad.insert(block_hash.clone()); verifying.retain(|e| e.hash != block_hash); - BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); - ready.set(); + if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash { + BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes); + true + } else { + false + } } + }; + if is_ready { + // Import the block immediately + ready.set_sync(); } } } - fn drain_verifying(verifying: &mut VecDeque, verified: &mut VecDeque, bad: &mut HashSet) { + fn drain_verifying(verifying: &mut VecDeque, verified: &mut VecDeque, bad: &mut HashSet, sizes: &Sizes) { + let mut removed_size = 0; + let mut inserted_size = 0; while !verifying.is_empty() && verifying.front().unwrap().block.is_some() { let block = verifying.pop_front().unwrap().block.unwrap(); - if bad.contains(&block.header.parent_hash) { + let size = block.heap_size_of_children(); + removed_size += size; + + if bad.contains(&block.header.parent_hash()) { bad.insert(block.header.hash()); - } - else { + } else { + inserted_size += size; verified.push_back(block); } } + + sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst); + sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst); } /// Clear the queue and stop verification activity. @@ -277,6 +323,12 @@ impl BlockQueue { unverified.clear(); verifying.clear(); verified.clear(); + + let sizes = &self.verification.sizes; + sizes.unverified.store(0, AtomicOrdering::Release); + sizes.verifying.store(0, AtomicOrdering::Release); + sizes.verified.store(0, AtomicOrdering::Release); + self.processing.write().clear(); } @@ -322,7 +374,9 @@ impl BlockQueue { match verify_block_basic(&header, &bytes, &*self.engine) { Ok(()) => { self.processing.write().insert(h.clone()); - self.verification.unverified.lock().push_back(UnverifiedBlock { header: header, bytes: bytes }); + let block = UnverifiedBlock { header: header, bytes: bytes }; + self.verification.sizes.unverified.fetch_add(block.heap_size_of_children(), AtomicOrdering::SeqCst); + self.verification.unverified.lock().push_back(block); self.more_to_verify.notify_all(); Ok(h) }, @@ -350,26 +404,32 @@ impl BlockQueue { } let mut new_verified = VecDeque::new(); + let mut removed_size = 0; for block in verified.drain(..) { if bad.contains(&block.header.parent_hash) { + removed_size += block.heap_size_of_children(); bad.insert(block.header.hash()); processing.remove(&block.header.hash()); } else { new_verified.push_back(block); } } + + self.verification.sizes.verified.fetch_sub(removed_size, AtomicOrdering::SeqCst); *verified = new_verified; } - /// Mark given block as processed - pub fn mark_as_good(&self, block_hashes: &[H256]) { - if block_hashes.is_empty() { - return; + /// Mark given item as processed. + /// Returns true if the queue becomes empty. 
+ pub fn mark_as_good(&self, hashes: &[H256]) -> bool { + if hashes.is_empty() { + return self.processing.read().is_empty(); } let mut processing = self.processing.write(); - for hash in block_hashes { + for hash in hashes { processing.remove(hash); } + processing.is_empty() } /// Removes up to `max` verified blocks from the queue @@ -379,28 +439,34 @@ impl BlockQueue { let mut result = Vec::with_capacity(count); for _ in 0..count { let block = verified.pop_front().unwrap(); + self.verification.sizes.verified.fetch_sub(block.heap_size_of_children(), AtomicOrdering::SeqCst); result.push(block); } self.ready_signal.reset(); if !verified.is_empty() { - self.ready_signal.set(); + self.ready_signal.set_async(); } result } /// Get queue status. pub fn queue_info(&self) -> BlockQueueInfo { + use std::mem::size_of; let (unverified_len, unverified_bytes) = { - let v = self.verification.unverified.lock(); - (v.len(), v.heap_size_of_children()) + let len = self.verification.unverified.lock().len(); + let size = self.verification.sizes.unverified.load(AtomicOrdering::Acquire); + + (len, size + len * size_of::()) }; let (verifying_len, verifying_bytes) = { - let v = self.verification.verifying.lock(); - (v.len(), v.heap_size_of_children()) + let len = self.verification.verifying.lock().len(); + let size = self.verification.sizes.verifying.load(AtomicOrdering::Acquire); + (len, size + len * size_of::()) }; let (verified_len, verified_bytes) = { - let v = self.verification.verified.lock(); - (v.len(), v.heap_size_of_children()) + let len = self.verification.verified.lock().len(); + let size = self.verification.sizes.verified.load(AtomicOrdering::Acquire); + (len, size + len * size_of::()) }; BlockQueueInfo { unverified_queue_size: unverified_len, @@ -408,12 +474,9 @@ impl BlockQueue { verified_queue_size: verified_len, max_queue_size: self.max_queue_size, max_mem_use: self.max_mem_use, - mem_used: - unverified_bytes - + verifying_bytes - + verified_bytes - // TODO: https://github.com/servo/heapsize/pull/50 - //+ self.processing.read().heap_size_of_children(), + mem_used: unverified_bytes + + verifying_bytes + + verified_bytes } } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index c8519c365..28e5e7d7d 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -355,16 +355,19 @@ impl Client { /// This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self) -> usize { - let max_blocks_to_import = 64; - let (imported_blocks, import_results, invalid_blocks, imported, duration) = { + let max_blocks_to_import = 4; + let (imported_blocks, import_results, invalid_blocks, imported, duration, is_empty) = { let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); let mut invalid_blocks = HashSet::new(); let mut import_results = Vec::with_capacity(max_blocks_to_import); let _import_lock = self.import_lock.lock(); + let blocks = self.block_queue.drain(max_blocks_to_import); + if blocks.is_empty() { + return 0; + } let _timer = PerfTimer::new("import_verified_blocks"); let start = precise_time_ns(); - let blocks = self.block_queue.drain(max_blocks_to_import); for block in blocks { let header = &block.header; @@ -393,23 +396,19 @@ impl Client { let imported = imported_blocks.len(); let invalid_blocks = invalid_blocks.into_iter().collect::>(); - { - if !invalid_blocks.is_empty() { - self.block_queue.mark_as_bad(&invalid_blocks); - } - if !imported_blocks.is_empty() { - 
self.block_queue.mark_as_good(&imported_blocks); - } + if !invalid_blocks.is_empty() { + self.block_queue.mark_as_bad(&invalid_blocks); } + let is_empty = self.block_queue.mark_as_good(&imported_blocks); let duration_ns = precise_time_ns() - start; - (imported_blocks, import_results, invalid_blocks, imported, duration_ns) + (imported_blocks, import_results, invalid_blocks, imported, duration_ns, is_empty) }; { - if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() { + if !imported_blocks.is_empty() && is_empty { let (enacted, retracted) = self.calculate_enacted_retracted(&import_results); - if self.queue_info().is_empty() { + if is_empty { self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted); } diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index a6bf13637..4c9b9d9b5 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -278,6 +278,7 @@ impl Miner { trace!(target: "miner", "done recalibration."); } + let _timer = PerfTimer::new("prepare_block"); let (transactions, mut open_block, original_work_hash) = { let transactions = {self.transaction_queue.lock().top_transactions()}; let mut sealing_work = self.sealing_work.lock(); diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 21d03e51a..51cc82045 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -27,7 +27,7 @@ use ids::BlockID; use views::BlockView; use super::state_db::StateDB; -use util::{Bytes, Hashable, HashDB, snappy, TrieDB, TrieDBMut, TrieMut, BytesConvertable}; +use util::{Bytes, Hashable, HashDB, snappy, TrieDB, TrieDBMut, TrieMut, BytesConvertable, U256, Uint}; use util::Mutex; use util::hash::{FixedHash, H256}; use util::journaldb::{self, Algorithm, JournalDB}; @@ -39,6 +39,8 @@ use self::account::Account; use self::block::AbridgedBlock; use self::io::SnapshotWriter; +use super::account::Account as StateAccount; + use crossbeam::{scope, ScopedJoinHandle}; use rand::{Rng, OsRng}; @@ -417,6 +419,7 @@ impl StateRebuilder { /// Feed an uncompressed state chunk into the rebuilder. pub fn feed(&mut self, chunk: &[u8]) -> Result<(), ::error::Error> { let rlp = UntrustedRlp::new(chunk); + let empty_rlp = StateAccount::new_basic(U256::zero(), U256::zero()).rlp(); let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect(); let mut pairs = Vec::with_capacity(rlp.item_count()); let backing = self.db.backing().clone(); @@ -464,7 +467,9 @@ impl StateRebuilder { }; for (hash, thin_rlp) in pairs { - bloom.set(hash.as_slice()); + if &thin_rlp[..] != &empty_rlp[..] { + bloom.set(hash.as_slice()); + } try!(account_trie.insert(&hash, &thin_rlp)); } } diff --git a/ethcore/src/state.rs b/ethcore/src/state.rs index f77ba023f..d9f6e540c 100644 --- a/ethcore/src/state.rs +++ b/ethcore/src/state.rs @@ -335,18 +335,20 @@ impl State { /// Determine whether an account exists. pub fn exists(&self, a: &Address) -> bool { - self.ensure_cached(a, RequireCache::None, |a| a.is_some()) + // Bloom filter does not contain empty accounts, so it is important here to + // check if account exists in the database directly before EIP-158 is in effect. + self.ensure_cached(a, RequireCache::None, false, |a| a.is_some()) } /// Get the balance of account `a`. pub fn balance(&self, a: &Address) -> U256 { - self.ensure_cached(a, RequireCache::None, + self.ensure_cached(a, RequireCache::None, true, |a| a.as_ref().map_or(U256::zero(), |account| *account.balance())) } /// Get the nonce of account `a`. 
pub fn nonce(&self, a: &Address) -> U256 { - self.ensure_cached(a, RequireCache::None, + self.ensure_cached(a, RequireCache::None, true, |a| a.as_ref().map_or(self.account_start_nonce, |account| *account.nonce())) } @@ -403,18 +405,18 @@ impl State { /// Get accounts' code. pub fn code(&self, a: &Address) -> Option> { - self.ensure_cached(a, RequireCache::Code, + self.ensure_cached(a, RequireCache::Code, true, |a| a.as_ref().map_or(None, |a| a.code().clone())) } pub fn code_hash(&self, a: &Address) -> H256 { - self.ensure_cached(a, RequireCache::None, + self.ensure_cached(a, RequireCache::None, true, |a| a.as_ref().map_or(SHA3_EMPTY, |a| a.code_hash())) } /// Get accounts' code size. pub fn code_size(&self, a: &Address) -> Option { - self.ensure_cached(a, RequireCache::CodeSize, + self.ensure_cached(a, RequireCache::CodeSize, true, |a| a.as_ref().and_then(|a| a.code_size())) } @@ -492,7 +494,9 @@ impl State { for (address, ref mut a) in accounts.iter_mut().filter(|&(_, ref a)| a.is_dirty()) { match a.account { Some(ref mut account) => { - db.note_account_bloom(&address); + if !account.is_empty() { + db.note_account_bloom(&address); + } let mut account_db = AccountDBMut::from_hash(db.as_hashdb_mut(), account.address_hash(address)); account.commit_storage(trie_factory, &mut account_db); account.commit_code(&mut account_db); @@ -545,7 +549,6 @@ impl State { pub fn populate_from(&mut self, accounts: PodState) { assert!(self.snapshots.borrow().is_empty()); for (add, acc) in accounts.drain().into_iter() { - self.db.note_account_bloom(&add); self.cache.borrow_mut().insert(add, AccountEntry::new_dirty(Some(Account::from_pod(acc)))); } } @@ -565,7 +568,7 @@ impl State { fn query_pod(&mut self, query: &PodState) { for (address, pod_account) in query.get().into_iter() - .filter(|&(ref a, _)| self.ensure_cached(a, RequireCache::Code, |a| a.is_some())) + .filter(|&(ref a, _)| self.ensure_cached(a, RequireCache::Code, true, |a| a.is_some())) { // needs to be split into two parts for the refcell code here // to work. @@ -601,7 +604,7 @@ impl State { /// Check caches for required data /// First searches for account in the local, then the shared cache. /// Populates local cache if nothing found. 
- fn ensure_cached(&self, a: &Address, require: RequireCache, f: F) -> U + fn ensure_cached(&self, a: &Address, require: RequireCache, check_bloom: bool, f: F) -> U where F: Fn(Option<&Account>) -> U { // check local cache first if let Some(ref mut maybe_acc) = self.cache.borrow_mut().get_mut(a) { @@ -621,6 +624,8 @@ impl State { match result { Some(r) => r, None => { + // first check bloom if it is not in database for sure + if check_bloom && !self.db.check_account_bloom(a) { return f(None); } // not found in the global cache, get from the DB and insert into local if !self.db.check_account_bloom(a) { return f(None); } let db = self.trie_factory.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR); diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index f1960557d..58e7af005 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -152,7 +152,7 @@ fn can_handle_long_fork() { push_blocks_to_client(client, 49, 1201, 800); push_blocks_to_client(client, 53, 1201, 600); - for _ in 0..40 { + for _ in 0..400 { client.import_verified_blocks(); } assert_eq!(2000, client.chain_info().best_block_number); diff --git a/nsis/installer.nsi b/nsis/installer.nsi index 8682424ca..58a00c6a9 100644 --- a/nsis/installer.nsi +++ b/nsis/installer.nsi @@ -4,7 +4,7 @@ !define DESCRIPTION "Fast, light, robust Ethereum implementation" !define VERSIONMAJOR 1 !define VERSIONMINOR 3 -!define VERSIONBUILD 8 +!define VERSIONBUILD 9 !addplugindir .\ diff --git a/util/Cargo.toml b/util/Cargo.toml index 3ff359da2..e4995bc8a 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -3,7 +3,7 @@ description = "Ethcore utility library" homepage = "http://ethcore.io" license = "GPL-3.0" name = "ethcore-util" -version = "1.3.8" +version = "1.3.9" authors = ["Ethcore "] build = "build.rs" diff --git a/util/io/src/service.rs b/util/io/src/service.rs index a47e84e56..d06d34284 100644 --- a/util/io/src/service.rs +++ b/util/io/src/service.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::thread::{self, JoinHandle}; use std::collections::HashMap; use mio::*; @@ -75,12 +75,12 @@ pub enum IoMessage where Message: Send + Clone + Sized { } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct IoContext where Message: Send + Clone + 'static { +pub struct IoContext where Message: Send + Clone + Sync + 'static { channel: IoChannel, handler: HandlerId, } -impl IoContext where Message: Send + Clone + 'static { +impl IoContext where Message: Send + Clone + Sync + 'static { /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. pub fn new(channel: IoChannel, handler: HandlerId) -> IoContext { IoContext { @@ -165,7 +165,7 @@ struct UserTimer { /// Root IO handler. Manages user handlers, messages and IO timers. pub struct IoManager where Message: Send + Sync { timers: Arc>>, - handlers: Slab>, HandlerId>, + handlers: Arc>, HandlerId>>>, workers: Vec, worker_channel: chase_lev::Worker>, work_ready: Arc, @@ -173,7 +173,11 @@ pub struct IoManager where Message: Send + Sync { impl IoManager where Message: Send + Sync + Clone + 'static { /// Creates a new instance and registers it with the event loop. 
- pub fn start(panic_handler: Arc, event_loop: &mut EventLoop>) -> Result<(), IoError> { + pub fn start( + panic_handler: Arc, + event_loop: &mut EventLoop>, + handlers: Arc>, HandlerId>>> + ) -> Result<(), IoError> { let (worker, stealer) = chase_lev::deque(); let num_workers = 4; let work_ready_mutex = Arc::new(SMutex::new(())); @@ -182,7 +186,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { Worker::new( i, stealer.clone(), - IoChannel::new(event_loop.channel()), + IoChannel::new(event_loop.channel(), Arc::downgrade(&handlers)), work_ready.clone(), work_ready_mutex.clone(), panic_handler.clone(), @@ -191,7 +195,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { let mut io = IoManager { timers: Arc::new(RwLock::new(HashMap::new())), - handlers: Slab::new(MAX_HANDLERS), + handlers: handlers, worker_channel: worker, workers: workers, work_ready: work_ready, @@ -208,7 +212,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn ready(&mut self, _event_loop: &mut EventLoop, token: Token, events: EventSet) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - if let Some(handler) = self.handlers.get(handler_index) { + if let Some(handler) = self.handlers.read().get(handler_index) { if events.is_hup() { self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index }); } @@ -227,7 +231,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - if let Some(handler) = self.handlers.get(handler_index) { + if let Some(handler) = self.handlers.read().get(handler_index) { if let Some(timer) = self.timers.read().get(&token.as_usize()) { event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index }); @@ -243,12 +247,12 @@ impl Handler for IoManager where Message: Send + Clone + Sync event_loop.shutdown(); }, IoMessage::AddHandler { handler } => { - let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered")); - handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id)); + let handler_id = self.handlers.write().insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered")); + handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel(), Arc::downgrade(&self.handlers)), handler_id)); }, IoMessage::RemoveHandler { handler_id } => { // TODO: flush event loop - self.handlers.remove(handler_id); + self.handlers.write().remove(handler_id); // unregister timers let mut timers = self.timers.write(); let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect(); @@ -269,12 +273,12 @@ impl Handler for IoManager where Message: Send + Clone + Sync } }, IoMessage::RegisterStream { handler_id, token } => { - if let Some(handler) = self.handlers.get(handler_id) { + if let Some(handler) = self.handlers.read().get(handler_id) { handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); } }, IoMessage::DeregisterStream { handler_id, token } => { - if let Some(handler) = self.handlers.get(handler_id) { + if let 
Some(handler) = self.handlers.read().get(handler_id) { handler.deregister_stream(token, event_loop); // unregister a timer associated with the token (if any) let timer_id = token + handler_id * TOKENS_PER_HANDLER; @@ -284,14 +288,14 @@ impl Handler for IoManager where Message: Send + Clone + Sync } }, IoMessage::UpdateStreamRegistration { handler_id, token } => { - if let Some(handler) = self.handlers.get(handler_id) { + if let Some(handler) = self.handlers.read().get(handler_id) { handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); } }, IoMessage::UserMessage(data) => { //TODO: better way to iterate the slab for id in 0 .. MAX_HANDLERS { - if let Some(h) = self.handlers.get(id) { + if let Some(h) = self.handlers.read().get(id) { let handler = h.clone(); self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id }); } @@ -305,19 +309,21 @@ impl Handler for IoManager where Message: Send + Clone + Sync /// Allows sending messages into the event loop. All the IO handlers will get the message /// in the `message` callback. pub struct IoChannel where Message: Send + Clone{ - channel: Option>> + channel: Option>>, + handlers: Weak>, HandlerId>>>, } -impl Clone for IoChannel where Message: Send + Clone { +impl Clone for IoChannel where Message: Send + Clone + Sync + 'static { fn clone(&self) -> IoChannel { IoChannel { - channel: self.channel.clone() + channel: self.channel.clone(), + handlers: self.handlers.clone(), } } } -impl IoChannel where Message: Send + Clone { - /// Send a msessage through the channel +impl IoChannel where Message: Send + Clone + Sync + 'static { + /// Send a message through the channel pub fn send(&self, message: Message) -> Result<(), IoError> { if let Some(ref channel) = self.channel { try!(channel.send(IoMessage::UserMessage(message))); @@ -325,6 +331,19 @@ impl IoChannel where Message: Send + Clone { Ok(()) } + /// Send a message through the channel and handle it synchronously + pub fn send_sync(&self, message: Message) -> Result<(), IoError> { + if let Some(handlers) = self.handlers.upgrade() { + for id in 0 .. MAX_HANDLERS { + if let Some(h) = handlers.read().get(id) { + let handler = h.clone(); + handler.message(&IoContext::new(self.clone(), id), &message); + } + } + } + Ok(()) + } + /// Send low level io message pub fn send_io(&self, message: IoMessage) -> Result<(), IoError> { if let Some(ref channel) = self.channel { @@ -334,11 +353,17 @@ impl IoChannel where Message: Send + Clone { } /// Create a new channel to connected to event loop. 
pub fn disconnected() -> IoChannel { - IoChannel { channel: None } + IoChannel { + channel: None, + handlers: Weak::default(), + } } - fn new(channel: Sender>) -> IoChannel { - IoChannel { channel: Some(channel) } + fn new(channel: Sender>, handlers: Weak>, HandlerId>>>) -> IoChannel { + IoChannel { + channel: Some(channel), + handlers: handlers, + } } } @@ -348,6 +373,7 @@ pub struct IoService where Message: Send + Sync + Clone + 'static { panic_handler: Arc, thread: Option>, host_channel: Sender>, + handlers: Arc>, HandlerId>>>, } impl MayPanic for IoService where Message: Send + Sync + Clone + 'static { @@ -365,16 +391,19 @@ impl IoService where Message: Send + Sync + Clone + 'static { let mut event_loop = EventLoop::configured(config).expect("Error creating event loop"); let channel = event_loop.channel(); let panic = panic_handler.clone(); + let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS))); + let h = handlers.clone(); let thread = thread::spawn(move || { let p = panic.clone(); panic.catch_panic(move || { - IoManager::::start(p, &mut event_loop).unwrap(); + IoManager::::start(p, &mut event_loop, h).unwrap(); }).unwrap() }); Ok(IoService { panic_handler: panic_handler, thread: Some(thread), - host_channel: channel + host_channel: channel, + handlers: handlers, }) } @@ -394,7 +423,7 @@ impl IoService where Message: Send + Sync + Clone + 'static { /// Create a new message channel pub fn channel(&self) -> IoChannel { - IoChannel { channel: Some(self.host_channel.clone()) } + IoChannel::new(self.host_channel.clone(), Arc::downgrade(&self.handlers)) } } diff --git a/util/network/src/connection.rs b/util/network/src/connection.rs index d1e35e09b..491d1959d 100644 --- a/util/network/src/connection.rs +++ b/util/network/src/connection.rs @@ -104,7 +104,7 @@ impl GenericConnection { } /// Add a packet to send queue. - pub fn send(&mut self, io: &IoContext, data: Bytes) where Message: Send + Clone { + pub fn send(&mut self, io: &IoContext, data: Bytes) where Message: Send + Clone + Sync + 'static { if !data.is_empty() { self.send_queue.push_back(Cursor::new(data)); } @@ -120,7 +120,7 @@ impl GenericConnection { } /// Writable IO handler. Called when the socket is ready to send. - pub fn writable(&mut self, io: &IoContext) -> Result where Message: Send + Clone { + pub fn writable(&mut self, io: &IoContext) -> Result where Message: Send + Clone + Sync + 'static { if self.send_queue.is_empty() { return Ok(WriteStatus::Complete) } @@ -340,7 +340,7 @@ impl EncryptedConnection { } /// Send a packet - pub fn send_packet(&mut self, io: &IoContext, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone { + pub fn send_packet(&mut self, io: &IoContext, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { let mut header = RlpStream::new(); let len = payload.len() as usize; header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); @@ -435,7 +435,7 @@ impl EncryptedConnection { } /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. - pub fn readable(&mut self, io: &IoContext) -> Result, NetworkError> where Message: Send + Clone{ + pub fn readable(&mut self, io: &IoContext) -> Result, NetworkError> where Message: Send + Clone + Sync + 'static { try!(io.clear_timer(self.connection.token)); if let EncryptedConnectionState::Header = self.read_state { if let Some(data) = try!(self.connection.readable()) { @@ -458,7 +458,7 @@ impl EncryptedConnection { } /// Writable IO handler. 
Processes send queeue. - pub fn writable(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone { + pub fn writable(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { try!(self.connection.writable(io)); Ok(()) } diff --git a/util/network/src/handshake.rs b/util/network/src/handshake.rs index e87197683..f99fd8e7a 100644 --- a/util/network/src/handshake.rs +++ b/util/network/src/handshake.rs @@ -106,7 +106,7 @@ impl Handshake { } /// Start a handhsake - pub fn start(&mut self, io: &IoContext, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone{ + pub fn start(&mut self, io: &IoContext, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone+ Sync + 'static { self.originated = originated; io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok(); if originated { @@ -125,7 +125,7 @@ impl Handshake { } /// Readable IO handler. Drives the state change. - pub fn readable(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone { + pub fn readable(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { if !self.expired() { while let Some(data) = try!(self.connection.readable()) { match self.state { @@ -154,7 +154,7 @@ impl Handshake { } /// Writabe IO handler. - pub fn writable(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone { + pub fn writable(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { if !self.expired() { try!(self.connection.writable(io)); } @@ -172,7 +172,7 @@ impl Handshake { } /// Parse, validate and confirm auth message - fn read_auth(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone { + fn read_auth(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Received handshake auth from {:?}", self.connection.remote_addr_str()); if data.len() != V4_AUTH_PACKET_SIZE { debug!(target: "network", "Wrong auth packet size"); @@ -203,7 +203,7 @@ impl Handshake { Ok(()) } - fn read_auth_eip8(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone { + fn read_auth_eip8(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str()); self.auth_cipher.extend_from_slice(data); let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])); @@ -259,7 +259,7 @@ impl Handshake { } /// Sends auth message - fn write_auth(&mut self, io: &IoContext, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone { + fn write_auth(&mut self, io: &IoContext, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Sending handshake auth to {:?}", self.connection.remote_addr_str()); let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let len = data.len(); @@ -286,7 +286,7 @@ impl Handshake { } /// Sends ack message - fn write_ack(&mut self, io: &IoContext) 
-> Result<(), NetworkError> where Message: Send + Clone { + fn write_ack(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Sending handshake ack to {:?}", self.connection.remote_addr_str()); let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants let len = data.len(); @@ -305,7 +305,7 @@ impl Handshake { } /// Sends EIP8 ack message - fn write_ack_eip8(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone { + fn write_ack_eip8(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str()); let mut rlp = RlpStream::new_list(3); rlp.append(self.ecdhe.public()); diff --git a/util/network/src/session.rs b/util/network/src/session.rs index 4e6dd1c8a..af964bd78 100644 --- a/util/network/src/session.rs +++ b/util/network/src/session.rs @@ -125,7 +125,7 @@ impl Session { /// and leaves the handhsake in limbo to be deregistered from the event loop. pub fn new(io: &IoContext, socket: TcpStream, token: StreamToken, id: Option<&NodeId>, nonce: &H256, stats: Arc, host: &HostInfo) -> Result - where Message: Send + Clone { + where Message: Send + Clone + Sync + 'static { let originated = id.is_some(); let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake"); try!(handshake.start(io, host, originated)); diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 6eb26d6a7..350d84fa3 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -199,7 +199,12 @@ pub struct Database { write_opts: WriteOptions, cfs: Vec, read_opts: ReadOptions, + // Dirty values added with `write_buffered`. Cleaned on `flush`. overlay: RwLock, KeyState>>>, + // Values currently being flushed. Cleared when `flush` completes. + flushing: RwLock, KeyState>>>, + // Prevents concurrent flushes. + flushing_lock: Mutex<()>, } impl Database { @@ -290,7 +295,9 @@ impl Database { db: db, write_opts: write_opts, overlay: RwLock::new((0..(cfs.len() + 1)).map(|_| HashMap::new()).collect()), + flushing: RwLock::new((0..(cfs.len() + 1)).map(|_| HashMap::new()).collect()), cfs: cfs, + flushing_lock: Mutex::new(()), read_opts: read_opts, }) } @@ -330,13 +337,12 @@ impl Database { /// Commit buffered changes to database. 
pub fn flush(&self) -> Result<(), String> { + let _lock = self.flushing_lock.lock(); + mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); let batch = WriteBatch::new(); - let mut overlay = self.overlay.write(); - - for (c, column) in overlay.iter_mut().enumerate() { - let column_data = mem::replace(column, HashMap::new()); - for (key, state) in column_data.into_iter() { - match state { + for (c, column) in self.flushing.read().iter().enumerate() { + for (key, state) in column.iter() { + match *state { KeyState::Delete => { if c > 0 { try!(batch.delete_cf(self.cfs[c - 1], &key)); @@ -344,14 +350,14 @@ impl Database { try!(batch.delete(&key)); } }, - KeyState::Insert(value) => { + KeyState::Insert(ref value) => { if c > 0 { try!(batch.put_cf(self.cfs[c - 1], &key, &value)); } else { try!(batch.put(&key, &value)); } }, - KeyState::InsertCompressed(value) => { + KeyState::InsertCompressed(ref value) => { let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks); if c > 0 { try!(batch.put_cf(self.cfs[c - 1], &key, &compressed)); @@ -362,7 +368,11 @@ impl Database { } } } - self.db.write_opt(batch, &self.write_opts) + try!(self.db.write_opt(batch, &self.write_opts)); + for column in self.flushing.write().iter_mut() { + column.clear(); + } + Ok(()) } @@ -394,9 +404,16 @@ impl Database { Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())), Some(&KeyState::Delete) => Ok(None), None => { - col.map_or_else( - || self.db.get_opt(key, &self.read_opts).map(|r| r.map(|v| v.to_vec())), - |c| self.db.get_cf_opt(self.cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| v.to_vec()))) + let flushing = &self.flushing.read()[Self::to_overlay_column(col)]; + match flushing.get(key) { + Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())), + Some(&KeyState::Delete) => Ok(None), + None => { + col.map_or_else( + || self.db.get_opt(key, &self.read_opts).map(|r| r.map(|v| v.to_vec())), + |c| self.db.get_cf_opt(self.cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| v.to_vec()))) + }, + } }, } }
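
The kvdb.rs hunks above introduce a double-buffered write overlay: write_buffered puts dirty values into overlay, flush swaps overlay into flushing under flushing_lock, commits the batch, and only then clears flushing, while reads fall back from overlay to flushing to the backing store so pending values stay visible mid-flush. Below is a minimal, self-contained sketch of that pattern, assuming std::sync primitives and a plain HashMap in place of Parity's parking_lot wrappers and RocksDB; BufferedDb and Backing are illustrative names, not types from the patch.

use std::collections::HashMap;
use std::mem;
use std::sync::{Mutex, RwLock};

/// Stand-in for the persistent store (RocksDB in Parity).
#[derive(Default)]
struct Backing {
    data: RwLock<HashMap<Vec<u8>, Vec<u8>>>,
}

impl Backing {
    fn apply(&self, key: Vec<u8>, value: Option<Vec<u8>>) {
        let mut data = self.data.write().unwrap();
        match value {
            Some(v) => { data.insert(key, v); }
            None => { data.remove(&key); }
        }
    }

    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.data.read().unwrap().get(key).cloned()
    }
}

/// Buffered store: `None` in the overlays marks a pending delete.
#[derive(Default)]
struct BufferedDb {
    backing: Backing,
    overlay: RwLock<HashMap<Vec<u8>, Option<Vec<u8>>>>,  // dirty values from write_buffered
    flushing: RwLock<HashMap<Vec<u8>, Option<Vec<u8>>>>, // values currently being flushed
    flush_lock: Mutex<()>,                               // prevents concurrent flushes
}

impl BufferedDb {
    fn write_buffered(&self, key: Vec<u8>, value: Option<Vec<u8>>) {
        self.overlay.write().unwrap().insert(key, value);
    }

    fn flush(&self) {
        let _flush_guard = self.flush_lock.lock().unwrap();
        // O(1) hand-off: pending writes move to `flushing`, new writes keep
        // landing in the (now empty) `overlay`.
        mem::swap(
            &mut *self.overlay.write().unwrap(),
            &mut *self.flushing.write().unwrap(),
        );
        // Commit the snapshot to the backing store.
        for (key, value) in self.flushing.read().unwrap().iter() {
            self.backing.apply(key.clone(), value.clone());
        }
        // Only now drop the flushed values; readers could see them the whole time.
        self.flushing.write().unwrap().clear();
    }

    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        // Same lookup order as the patched Database::get: overlay, then
        // flushing, then the backing store.
        if let Some(state) = self.overlay.read().unwrap().get(key) {
            return state.clone();
        }
        if let Some(state) = self.flushing.read().unwrap().get(key) {
            return state.clone();
        }
        self.backing.get(key)
    }
}

fn main() {
    let db = BufferedDb::default();
    db.write_buffered(b"answer".to_vec(), Some(b"42".to_vec()));
    assert_eq!(db.get(b"answer"), Some(b"42".to_vec())); // visible before flush
    db.flush();
    assert_eq!(db.get(b"answer"), Some(b"42".to_vec())); // and after
}

The block_queue.rs changes take a similar approach to bookkeeping: the new Sizes struct holds one AtomicUsize per queue, updated as blocks are pushed, drained, or dropped, so queue_info() can report memory use from the counters instead of calling heap_size_of_children() on every queued block. A stripped-down sketch of that accounting follows, assuming a byte vector in place of the block types; SizedQueue and its capacity()-based size estimate are illustrative, not from the patch.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Queue that keeps an approximate heap-size counter next to the items.
struct SizedQueue {
    items: Mutex<VecDeque<Vec<u8>>>,
    heap_bytes: AtomicUsize,
}

impl SizedQueue {
    fn new() -> Self {
        SizedQueue { items: Mutex::new(VecDeque::new()), heap_bytes: AtomicUsize::new(0) }
    }

    fn push(&self, item: Vec<u8>) {
        // Approximate per-item heap use; the patch uses heap_size_of_children().
        self.heap_bytes.fetch_add(item.capacity(), Ordering::SeqCst);
        self.items.lock().unwrap().push_back(item);
    }

    fn pop(&self) -> Option<Vec<u8>> {
        let item = self.items.lock().unwrap().pop_front();
        if let Some(ref item) = item {
            self.heap_bytes.fetch_sub(item.capacity(), Ordering::SeqCst);
        }
        item
    }

    /// (length, approximate memory use) without walking the queue.
    fn info(&self) -> (usize, usize) {
        let len = self.items.lock().unwrap().len();
        let heap = self.heap_bytes.load(Ordering::Acquire);
        (len, heap + len * std::mem::size_of::<Vec<u8>>())
    }
}

fn main() {
    let q = SizedQueue::new();
    q.push(vec![0u8; 1024]);
    let (len, mem_used) = q.info();
    assert_eq!(len, 1);
    assert!(mem_used >= 1024);
    assert!(q.pop().is_some());
}

The counters are approximate by design (a push updates the counter and the queue in two steps), which matches their stats-only use in BlockQueueInfo.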