Backports for beta (#2764)

* v1.3.9

* Block import optimization (#2748)

* Block import optimization

* whitespace

[ci:none]

* Don't add empty accounts to bloom (#2753)

* Incrementally calculate verification queue heap size (#2749)

* incrementally calculate queue heap size

* query the correct queue sizes
Arkadiy Paronyan 2016-10-20 20:40:50 +02:00 committed by GitHub
parent e5ae24dfb6
commit b3097c8036
17 changed files with 258 additions and 153 deletions

Cargo.lock (generated)

@@ -1,6 +1,6 @@
 [root]
 name = "parity"
-version = "1.3.8"
+version = "1.3.9"
 dependencies = [
 "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -20,7 +20,7 @@ dependencies = [
 "ethcore-logger 1.3.0",
 "ethcore-rpc 1.3.0",
 "ethcore-signer 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "ethsync 1.3.0",
 "fdlimit 0.1.0",
 "hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -35,7 +35,7 @@ dependencies = [
 "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
 "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
 "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
-"syntex 0.36.0 (registry+https://github.com/rust-lang/crates.io-index)",
+"syntex 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
 "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
@@ -270,7 +270,7 @@ dependencies = [
 "ethcore-ipc 1.3.0",
 "ethcore-ipc-codegen 1.3.0",
 "ethcore-ipc-nano 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "ethjson 0.1.0",
 "ethstore 0.1.0",
 "evmjit 1.3.0",
@@ -294,7 +294,7 @@ version = "1.3.0"
 dependencies = [
 "clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
 "ethcore-rpc 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "hyper 0.9.4 (git+https://github.com/ethcore/hyper)",
 "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "jsonrpc-http-server 6.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git?branch=beta)",
@@ -336,7 +336,7 @@ name = "ethcore-ipc"
 version = "1.3.0"
 dependencies = [
 "ethcore-devtools 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
 "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
@@ -381,7 +381,7 @@ dependencies = [
 "ethcore-ipc 1.3.0",
 "ethcore-ipc-codegen 1.3.0",
 "ethcore-ipc-nano 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)",
 "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -393,7 +393,7 @@ name = "ethcore-logger"
 version = "1.3.0"
 dependencies = [
 "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -408,7 +408,7 @@ dependencies = [
 "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "ethcore-devtools 1.3.0",
 "ethcore-io 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "igd 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
 "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -432,7 +432,7 @@ dependencies = [
 "ethcore-devtools 1.3.0",
 "ethcore-io 1.3.0",
 "ethcore-ipc 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "ethjson 0.1.0",
 "ethsync 1.3.0",
 "json-ipc-server 0.2.4 (git+https://github.com/ethcore/json-ipc-server.git?branch=beta)",
@@ -455,7 +455,7 @@ dependencies = [
 "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "ethcore-io 1.3.0",
 "ethcore-rpc 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "parity-dapps-signer 1.4.0 (git+https://github.com/ethcore/parity-ui.git)",
@@ -466,7 +466,7 @@ dependencies = [
 [[package]]
 name = "ethcore-util"
-version = "1.3.8"
+version = "1.3.9"
 dependencies = [
 "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "arrayvec 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -499,7 +499,7 @@ dependencies = [
 name = "ethjson"
 version = "0.1.0"
 dependencies = [
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde_codegen 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -547,7 +547,7 @@ dependencies = [
 "ethcore-ipc-codegen 1.3.0",
 "ethcore-ipc-nano 1.3.0",
 "ethcore-network 1.3.0",
-"ethcore-util 1.3.8",
+"ethcore-util 1.3.9",
 "heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1390,14 +1390,6 @@ dependencies = [
 "syntex_syntax 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
-[[package]]
-name = "syntex"
-version = "0.36.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "syntex_syntax 0.36.0 (registry+https://github.com/rust-lang/crates.io-index)",
-]
 [[package]]
 name = "syntex_syntax"
 version = "0.33.0"
@@ -1411,19 +1403,6 @@ dependencies = [
 "unicode-xid 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
-[[package]]
-name = "syntex_syntax"
-version = "0.36.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-dependencies = [
- "bitflags 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
- "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
- "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
- "term 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
- "unicode-xid 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
-]
 [[package]]
 name = "table"
 version = "0.1.0"
@@ -1765,9 +1744,7 @@ dependencies = [
 "checksum stable-heap 0.1.0 (git+https://github.com/carllerche/stable-heap?rev=3c5cd1ca47)" = "<none>"
 "checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e"
 "checksum syntex 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "393b6dd0889df2b064beeea954cfda6bc2571604ac460deeae0fed55a53988af"
-"checksum syntex 0.36.0 (registry+https://github.com/rust-lang/crates.io-index)" = "61dc0bbe1e46dcd53ec50d6600e750152c22e0e9352cadbd413e86fb847ae899"
 "checksum syntex_syntax 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44bded3cabafc65c90b663b1071bd2d198a9ab7515e6ce729e4570aaf53c407e"
-"checksum syntex_syntax 0.36.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2b92a8c33fad2fa99e14fe499ec17e82b6c6496a7a38a499f33b584ffa1886fa"
 "checksum target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c63f48baada5c52e65a29eef93ab4f8982681b67f9e8d29c7b05abcfec2b9ffe"
 "checksum term 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "f2077e54d38055cf1ca0fd7933a2e00cd3ec8f6fed352b2a377f06dcdaaf3281"
 "checksum termios 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a"


@@ -1,7 +1,7 @@
 [package]
 description = "Ethcore client."
 name = "parity"
-version = "1.3.8"
+version = "1.3.9"
 license = "GPL-3.0"
 authors = ["Ethcore <admin@ethcore.io>"]
 build = "build.rs"

@@ -1 +1 @@
-Subproject commit 862b4e3d4a9a7141af1b4aaf7dfe228a6a294614
+Subproject commit 97066e40ccd061f727deb5cd860e4d9135aa2551


@@ -288,6 +288,15 @@ impl Account {
 /// Determine whether there are any un-`commit()`-ed storage-setting operations.
 pub fn storage_is_clean(&self) -> bool { self.storage_changes.is_empty() }
+/// Check if account has zero nonce, balance, no code and no storage.
+pub fn is_empty(&self) -> bool {
+self.storage_changes.is_empty() &&
+self.balance.is_zero() &&
+self.nonce.is_zero() &&
+self.storage_root == SHA3_NULL_RLP &&
+self.code_hash == SHA3_EMPTY
+}
 #[cfg(test)]
 /// return the storage root associated with this account or None if it has been altered via the overlay.
 pub fn storage_root(&self) -> Option<&H256> { if self.storage_is_clean() {Some(&self.storage_root)} else {None} }
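
The is_empty() check added above treats an account as empty when it has zero nonce, zero balance, the empty code hash and the empty storage root, with no pending storage changes; only accounts failing this predicate get noted in the account bloom further down in this commit. A minimal, self-contained sketch of the same predicate, using simplified stand-in types rather than the ethcore structs:

    // Stand-ins for ethcore's SHA3_NULL_RLP / SHA3_EMPTY constants; the real code
    // compares keccak hashes, plain byte arrays are enough to illustrate the check.
    const EMPTY_STORAGE_ROOT: [u8; 32] = [0u8; 32];
    const EMPTY_CODE_HASH: [u8; 32] = [1u8; 32];

    struct Account {
        nonce: u64,
        balance: u64,
        storage_root: [u8; 32],
        code_hash: [u8; 32],
    }

    impl Account {
        // Empty in the EIP-161 sense: no nonce, no balance, no code, no storage.
        fn is_empty(&self) -> bool {
            self.nonce == 0
                && self.balance == 0
                && self.storage_root == EMPTY_STORAGE_ROOT
                && self.code_hash == EMPTY_CODE_HASH
        }
    }

    fn main() {
        let fresh = Account { nonce: 0, balance: 0, storage_root: EMPTY_STORAGE_ROOT, code_hash: EMPTY_CODE_HASH };
        let funded = Account { nonce: 0, balance: 10, storage_root: EMPTY_STORAGE_ROOT, code_hash: EMPTY_CODE_HASH };
        assert!(fresh.is_empty());
        assert!(!funded.is_empty());
    }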


@@ -17,7 +17,7 @@
 //! A queue of blocks. Sits between network or other I/O and the `BlockChain`.
 //! Sorts them ready for blockchain insertion.
 use std::thread::{JoinHandle, self};
-use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
 use std::sync::{Condvar as SCondvar, Mutex as SMutex};
 use util::*;
 use io::*;
@@ -76,6 +76,13 @@ impl BlockQueueInfo {
 }
 }
+// the internal queue sizes.
+struct Sizes {
+unverified: AtomicUsize,
+verifying: AtomicUsize,
+verified: AtomicUsize,
+}
 /// A queue of blocks. Sits between network or other I/O and the `BlockChain`.
 /// Sorts them ready for blockchain insertion.
 pub struct BlockQueue {
@@ -110,7 +117,21 @@ struct QueueSignal {
 impl QueueSignal {
 #[cfg_attr(feature="dev", allow(bool_comparison))]
-fn set(&self) {
+fn set_sync(&self) {
+// Do not signal when we are about to close
+if self.deleting.load(AtomicOrdering::Relaxed) {
+return;
+}
+if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
+if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) {
+debug!("Error sending BlockVerified message: {:?}", e);
+}
+}
+}
+#[cfg_attr(feature="dev", allow(bool_comparison))]
+fn set_async(&self) {
 // Do not signal when we are about to close
 if self.deleting.load(AtomicOrdering::Relaxed) {
 return;
@@ -131,11 +152,12 @@ impl QueueSignal {
 struct Verification {
 // All locks must be captured in the order declared here.
 unverified: Mutex<VecDeque<UnverifiedBlock>>,
-verified: Mutex<VecDeque<PreverifiedBlock>>,
 verifying: Mutex<VecDeque<VerifyingBlock>>,
+verified: Mutex<VecDeque<PreverifiedBlock>>,
 bad: Mutex<HashSet<H256>>,
 more_to_verify: SMutex<()>,
 empty: SMutex<()>,
+sizes: Sizes,
 }
 impl BlockQueue {
@@ -143,12 +165,16 @@ impl BlockQueue {
 pub fn new(config: BlockQueueConfig, engine: Arc<Engine>, message_channel: IoChannel<ClientIoMessage>) -> BlockQueue {
 let verification = Arc::new(Verification {
 unverified: Mutex::new(VecDeque::new()),
-verified: Mutex::new(VecDeque::new()),
 verifying: Mutex::new(VecDeque::new()),
+verified: Mutex::new(VecDeque::new()),
 bad: Mutex::new(HashSet::new()),
 more_to_verify: SMutex::new(()),
 empty: SMutex::new(()),
+sizes: Sizes {
+unverified: AtomicUsize::new(0),
+verifying: AtomicUsize::new(0),
+verified: AtomicUsize::new(0),
+}
 });
 let more_to_verify = Arc::new(SCondvar::new());
 let deleting = Arc::new(AtomicBool::new(false));
@@ -221,16 +247,18 @@ impl BlockQueue {
 }
 let mut verifying = verification.verifying.lock();
 let block = unverified.pop_front().unwrap();
+verification.sizes.unverified.fetch_sub(block.heap_size_of_children(), AtomicOrdering::SeqCst);
 verifying.push_back(VerifyingBlock{ hash: block.header.hash(), block: None });
 block
 };
 let block_hash = block.header.hash();
-match verify_block_unordered(block.header, block.bytes, &*engine) {
+let is_ready = match verify_block_unordered(block.header, block.bytes, &*engine) {
 Ok(verified) => {
 let mut verifying = verification.verifying.lock();
 for e in verifying.iter_mut() {
 if e.hash == block_hash {
+verification.sizes.verifying.fetch_add(verified.heap_size_of_children(), AtomicOrdering::SeqCst);
 e.block = Some(verified);
 break;
 }
@@ -239,8 +267,10 @@ impl BlockQueue {
 // we're next!
 let mut verified = verification.verified.lock();
 let mut bad = verification.bad.lock();
-BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
-ready.set();
+BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes);
+true
+} else {
+false
 }
 },
 Err(err) => {
@@ -250,23 +280,39 @@ impl BlockQueue {
 warn!(target: "client", "Stage 2 block verification failed for {}\nError: {:?}", block_hash, err);
 bad.insert(block_hash.clone());
 verifying.retain(|e| e.hash != block_hash);
-BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
-ready.set();
+if !verifying.is_empty() && verifying.front().unwrap().hash == block_hash {
+BlockQueue::drain_verifying(&mut verifying, &mut verified, &mut bad, &verification.sizes);
+true
+} else {
+false
 }
 }
+};
+if is_ready {
+// Import the block immediately
+ready.set_sync();
+}
 }
 }
-fn drain_verifying(verifying: &mut VecDeque<VerifyingBlock>, verified: &mut VecDeque<PreverifiedBlock>, bad: &mut HashSet<H256>) {
+fn drain_verifying(verifying: &mut VecDeque<VerifyingBlock>, verified: &mut VecDeque<PreverifiedBlock>, bad: &mut HashSet<H256>, sizes: &Sizes) {
+let mut removed_size = 0;
+let mut inserted_size = 0;
 while !verifying.is_empty() && verifying.front().unwrap().block.is_some() {
 let block = verifying.pop_front().unwrap().block.unwrap();
-if bad.contains(&block.header.parent_hash) {
+let size = block.heap_size_of_children();
+removed_size += size;
+if bad.contains(&block.header.parent_hash()) {
 bad.insert(block.header.hash());
-}
-else {
+} else {
+inserted_size += size;
 verified.push_back(block);
 }
 }
+sizes.verifying.fetch_sub(removed_size, AtomicOrdering::SeqCst);
+sizes.verified.fetch_add(inserted_size, AtomicOrdering::SeqCst);
 }
 /// Clear the queue and stop verification activity.
@@ -277,6 +323,12 @@ impl BlockQueue {
 unverified.clear();
 verifying.clear();
 verified.clear();
+let sizes = &self.verification.sizes;
+sizes.unverified.store(0, AtomicOrdering::Release);
+sizes.verifying.store(0, AtomicOrdering::Release);
+sizes.verified.store(0, AtomicOrdering::Release);
 self.processing.write().clear();
 }
@@ -322,7 +374,9 @@ impl BlockQueue {
 match verify_block_basic(&header, &bytes, &*self.engine) {
 Ok(()) => {
 self.processing.write().insert(h.clone());
-self.verification.unverified.lock().push_back(UnverifiedBlock { header: header, bytes: bytes });
+let block = UnverifiedBlock { header: header, bytes: bytes };
+self.verification.sizes.unverified.fetch_add(block.heap_size_of_children(), AtomicOrdering::SeqCst);
+self.verification.unverified.lock().push_back(block);
 self.more_to_verify.notify_all();
 Ok(h)
 },
@@ -350,26 +404,32 @@ impl BlockQueue {
 }
 let mut new_verified = VecDeque::new();
+let mut removed_size = 0;
 for block in verified.drain(..) {
 if bad.contains(&block.header.parent_hash) {
+removed_size += block.heap_size_of_children();
 bad.insert(block.header.hash());
 processing.remove(&block.header.hash());
 } else {
 new_verified.push_back(block);
 }
 }
+self.verification.sizes.verified.fetch_sub(removed_size, AtomicOrdering::SeqCst);
 *verified = new_verified;
 }
-/// Mark given block as processed
-pub fn mark_as_good(&self, block_hashes: &[H256]) {
-if block_hashes.is_empty() {
-return;
+/// Mark given item as processed.
+/// Returns true if the queue becomes empty.
+pub fn mark_as_good(&self, hashes: &[H256]) -> bool {
+if hashes.is_empty() {
+return self.processing.read().is_empty();
 }
 let mut processing = self.processing.write();
-for hash in block_hashes {
+for hash in hashes {
 processing.remove(hash);
 }
+processing.is_empty()
 }
 /// Removes up to `max` verified blocks from the queue
@@ -379,28 +439,34 @@ impl BlockQueue {
 let mut result = Vec::with_capacity(count);
 for _ in 0..count {
 let block = verified.pop_front().unwrap();
+self.verification.sizes.verified.fetch_sub(block.heap_size_of_children(), AtomicOrdering::SeqCst);
 result.push(block);
 }
 self.ready_signal.reset();
 if !verified.is_empty() {
-self.ready_signal.set();
+self.ready_signal.set_async();
 }
 result
 }
 /// Get queue status.
 pub fn queue_info(&self) -> BlockQueueInfo {
+use std::mem::size_of;
 let (unverified_len, unverified_bytes) = {
-let v = self.verification.unverified.lock();
-(v.len(), v.heap_size_of_children())
+let len = self.verification.unverified.lock().len();
+let size = self.verification.sizes.unverified.load(AtomicOrdering::Acquire);
+(len, size + len * size_of::<UnverifiedBlock>())
 };
 let (verifying_len, verifying_bytes) = {
-let v = self.verification.verifying.lock();
-(v.len(), v.heap_size_of_children())
+let len = self.verification.verifying.lock().len();
+let size = self.verification.sizes.verifying.load(AtomicOrdering::Acquire);
+(len, size + len * size_of::<VerifyingBlock>())
 };
 let (verified_len, verified_bytes) = {
-let v = self.verification.verified.lock();
-(v.len(), v.heap_size_of_children())
+let len = self.verification.verified.lock().len();
+let size = self.verification.sizes.verified.load(AtomicOrdering::Acquire);
+(len, size + len * size_of::<PreverifiedBlock>())
 };
 BlockQueueInfo {
 unverified_queue_size: unverified_len,
@@ -408,12 +474,9 @@ impl BlockQueue {
 verified_queue_size: verified_len,
 max_queue_size: self.max_queue_size,
 max_mem_use: self.max_mem_use,
-mem_used:
-unverified_bytes
+mem_used: unverified_bytes
 + verifying_bytes
 + verified_bytes
-// TODO: https://github.com/servo/heapsize/pull/50
-//+ self.processing.read().heap_size_of_children(),
 }
 }
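
The Sizes counters above replace a full heap_size_of_children() walk over each queue on every queue_info() call: the heap size is adjusted whenever a block enters or leaves a queue, so the status query becomes O(1). A reduced sketch of the same bookkeeping, with a plain Vec<u8> payload standing in for queued blocks and capacity() standing in for the heapsize crate:

    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicUsize, Ordering};

    struct SizedQueue {
        items: VecDeque<Vec<u8>>,
        // Running total of heap bytes held by queued items, maintained
        // incrementally instead of re-walking the whole queue on each query.
        // Atomic because BlockQueue updates it while holding only per-queue locks.
        heap_bytes: AtomicUsize,
    }

    impl SizedQueue {
        fn new() -> Self {
            SizedQueue { items: VecDeque::new(), heap_bytes: AtomicUsize::new(0) }
        }

        fn push(&mut self, item: Vec<u8>) {
            self.heap_bytes.fetch_add(item.capacity(), Ordering::SeqCst);
            self.items.push_back(item);
        }

        fn pop(&mut self) -> Option<Vec<u8>> {
            let item = self.items.pop_front();
            if let Some(ref i) = item {
                self.heap_bytes.fetch_sub(i.capacity(), Ordering::SeqCst);
            }
            item
        }

        // O(1) status query, analogous to BlockQueue::queue_info().
        fn mem_used(&self) -> usize {
            self.heap_bytes.load(Ordering::SeqCst)
        }
    }

    fn main() {
        let mut q = SizedQueue::new();
        q.push(vec![0u8; 1024]);
        q.push(vec![0u8; 512]);
        println!("mem_used after pushes: {}", q.mem_used());
        q.pop();
        println!("mem_used after pop: {}", q.mem_used());
    }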


@@ -355,16 +355,19 @@ impl Client {
 /// This is triggered by a message coming from a block queue when the block is ready for insertion
 pub fn import_verified_blocks(&self) -> usize {
-let max_blocks_to_import = 64;
+let max_blocks_to_import = 4;
-let (imported_blocks, import_results, invalid_blocks, imported, duration) = {
+let (imported_blocks, import_results, invalid_blocks, imported, duration, is_empty) = {
 let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
 let mut invalid_blocks = HashSet::new();
 let mut import_results = Vec::with_capacity(max_blocks_to_import);
 let _import_lock = self.import_lock.lock();
+let blocks = self.block_queue.drain(max_blocks_to_import);
+if blocks.is_empty() {
+return 0;
+}
 let _timer = PerfTimer::new("import_verified_blocks");
 let start = precise_time_ns();
-let blocks = self.block_queue.drain(max_blocks_to_import);
 for block in blocks {
 let header = &block.header;
@@ -393,23 +396,19 @@ impl Client {
 let imported = imported_blocks.len();
 let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();
-{
 if !invalid_blocks.is_empty() {
 self.block_queue.mark_as_bad(&invalid_blocks);
 }
-if !imported_blocks.is_empty() {
-self.block_queue.mark_as_good(&imported_blocks);
-}
-}
+let is_empty = self.block_queue.mark_as_good(&imported_blocks);
 let duration_ns = precise_time_ns() - start;
-(imported_blocks, import_results, invalid_blocks, imported, duration_ns)
+(imported_blocks, import_results, invalid_blocks, imported, duration_ns, is_empty)
 };
 {
-if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
+if !imported_blocks.is_empty() && is_empty {
 let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);
-if self.queue_info().is_empty() {
+if is_empty {
 self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
 }
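
A related detail in the hunks above: mark_as_good() now reports whether the processing set drained, so the client no longer re-locks the queue via queue_info() just to ask if it is empty. A small sketch of that "mutate and report emptiness" pattern, with a hypothetical Processing type in place of the real queue:

    use std::collections::HashSet;

    struct Processing {
        hashes: HashSet<u64>,
    }

    impl Processing {
        // Remove the finished items and report whether the set drained,
        // so callers do not need a separate "is it empty now?" query.
        fn mark_as_good(&mut self, done: &[u64]) -> bool {
            for h in done {
                self.hashes.remove(h);
            }
            self.hashes.is_empty()
        }
    }

    fn main() {
        let mut p = Processing { hashes: [1u64, 2, 3].iter().cloned().collect() };
        assert!(!p.mark_as_good(&[1, 2]));
        let is_empty = p.mark_as_good(&[3]);
        if is_empty {
            // In the client this is where the new-chain-head work
            // (chain_new_blocks etc.) fires, without re-locking the queue.
            println!("queue drained");
        }
    }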


@@ -278,6 +278,7 @@ impl Miner {
 trace!(target: "miner", "done recalibration.");
 }
+let _timer = PerfTimer::new("prepare_block");
 let (transactions, mut open_block, original_work_hash) = {
 let transactions = {self.transaction_queue.lock().top_transactions()};
 let mut sealing_work = self.sealing_work.lock();


@@ -27,7 +27,7 @@ use ids::BlockID;
 use views::BlockView;
 use super::state_db::StateDB;
-use util::{Bytes, Hashable, HashDB, snappy, TrieDB, TrieDBMut, TrieMut, BytesConvertable};
+use util::{Bytes, Hashable, HashDB, snappy, TrieDB, TrieDBMut, TrieMut, BytesConvertable, U256, Uint};
 use util::Mutex;
 use util::hash::{FixedHash, H256};
 use util::journaldb::{self, Algorithm, JournalDB};
@@ -39,6 +39,8 @@ use self::account::Account;
 use self::block::AbridgedBlock;
 use self::io::SnapshotWriter;
+use super::account::Account as StateAccount;
 use crossbeam::{scope, ScopedJoinHandle};
 use rand::{Rng, OsRng};
@@ -417,6 +419,7 @@ impl StateRebuilder {
 /// Feed an uncompressed state chunk into the rebuilder.
 pub fn feed(&mut self, chunk: &[u8]) -> Result<(), ::error::Error> {
 let rlp = UntrustedRlp::new(chunk);
+let empty_rlp = StateAccount::new_basic(U256::zero(), U256::zero()).rlp();
 let account_fat_rlps: Vec<_> = rlp.iter().map(|r| r.as_raw()).collect();
 let mut pairs = Vec::with_capacity(rlp.item_count());
 let backing = self.db.backing().clone();
@@ -464,7 +467,9 @@ impl StateRebuilder {
 };
 for (hash, thin_rlp) in pairs {
+if &thin_rlp[..] != &empty_rlp[..] {
 bloom.set(hash.as_slice());
+}
 try!(account_trie.insert(&hash, &thin_rlp));
 }
 }
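
The rebuilder change above precomputes the RLP of a fresh, empty account once per chunk and skips the bloom bit for any account whose thin RLP matches it. A rough sketch of that filtering step, with a hypothetical placeholder encoding instead of the real ethcore RLP types:

    use std::collections::HashSet;

    // Stand-in for the encoded "empty account" that the rebuilder precomputes once
    // per chunk; the bytes here are a placeholder, not real RLP.
    fn empty_account_encoding() -> Vec<u8> {
        vec![0xc0]
    }

    fn rebuild_bloom(pairs: &[([u8; 32], Vec<u8>)]) -> HashSet<[u8; 32]> {
        let empty = empty_account_encoding();
        let mut bloom = HashSet::new(); // stands in for the account bloom filter
        for &(hash, ref thin_rlp) in pairs {
            // Empty accounts are not inserted into the bloom, mirroring the
            // behaviour introduced for State::commit and StateRebuilder::feed.
            if &thin_rlp[..] != &empty[..] {
                bloom.insert(hash);
            }
        }
        bloom
    }

    fn main() {
        let pairs = vec![
            ([1u8; 32], empty_account_encoding()),
            ([2u8; 32], vec![0xc0, 0x01]),
        ];
        let bloom = rebuild_bloom(&pairs);
        assert!(!bloom.contains(&[1u8; 32]));
        assert!(bloom.contains(&[2u8; 32]));
        println!("bloom entries: {}", bloom.len());
    }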


@@ -335,18 +335,20 @@ impl State {
 /// Determine whether an account exists.
 pub fn exists(&self, a: &Address) -> bool {
-self.ensure_cached(a, RequireCache::None, |a| a.is_some())
+// Bloom filter does not contain empty accounts, so it is important here to
+// check if account exists in the database directly before EIP-158 is in effect.
+self.ensure_cached(a, RequireCache::None, false, |a| a.is_some())
 }
 /// Get the balance of account `a`.
 pub fn balance(&self, a: &Address) -> U256 {
-self.ensure_cached(a, RequireCache::None,
+self.ensure_cached(a, RequireCache::None, true,
 |a| a.as_ref().map_or(U256::zero(), |account| *account.balance()))
 }
 /// Get the nonce of account `a`.
 pub fn nonce(&self, a: &Address) -> U256 {
-self.ensure_cached(a, RequireCache::None,
+self.ensure_cached(a, RequireCache::None, true,
 |a| a.as_ref().map_or(self.account_start_nonce, |account| *account.nonce()))
 }
@@ -403,18 +405,18 @@ impl State {
 /// Get accounts' code.
 pub fn code(&self, a: &Address) -> Option<Arc<Bytes>> {
-self.ensure_cached(a, RequireCache::Code,
+self.ensure_cached(a, RequireCache::Code, true,
 |a| a.as_ref().map_or(None, |a| a.code().clone()))
 }
 pub fn code_hash(&self, a: &Address) -> H256 {
-self.ensure_cached(a, RequireCache::None,
+self.ensure_cached(a, RequireCache::None, true,
 |a| a.as_ref().map_or(SHA3_EMPTY, |a| a.code_hash()))
 }
 /// Get accounts' code size.
 pub fn code_size(&self, a: &Address) -> Option<u64> {
-self.ensure_cached(a, RequireCache::CodeSize,
+self.ensure_cached(a, RequireCache::CodeSize, true,
 |a| a.as_ref().and_then(|a| a.code_size()))
 }
@@ -492,7 +494,9 @@ impl State {
 for (address, ref mut a) in accounts.iter_mut().filter(|&(_, ref a)| a.is_dirty()) {
 match a.account {
 Some(ref mut account) => {
+if !account.is_empty() {
 db.note_account_bloom(&address);
+}
 let mut account_db = AccountDBMut::from_hash(db.as_hashdb_mut(), account.address_hash(address));
 account.commit_storage(trie_factory, &mut account_db);
 account.commit_code(&mut account_db);
@@ -545,7 +549,6 @@ impl State {
 pub fn populate_from(&mut self, accounts: PodState) {
 assert!(self.snapshots.borrow().is_empty());
 for (add, acc) in accounts.drain().into_iter() {
-self.db.note_account_bloom(&add);
 self.cache.borrow_mut().insert(add, AccountEntry::new_dirty(Some(Account::from_pod(acc))));
 }
 }
@@ -565,7 +568,7 @@ impl State {
 fn query_pod(&mut self, query: &PodState) {
 for (address, pod_account) in query.get().into_iter()
-.filter(|&(ref a, _)| self.ensure_cached(a, RequireCache::Code, |a| a.is_some()))
+.filter(|&(ref a, _)| self.ensure_cached(a, RequireCache::Code, true, |a| a.is_some()))
 {
 // needs to be split into two parts for the refcell code here
 // to work.
@@ -601,7 +604,7 @@ impl State {
 /// Check caches for required data
 /// First searches for account in the local, then the shared cache.
 /// Populates local cache if nothing found.
-fn ensure_cached<F, U>(&self, a: &Address, require: RequireCache, f: F) -> U
+fn ensure_cached<F, U>(&self, a: &Address, require: RequireCache, check_bloom: bool, f: F) -> U
 where F: Fn(Option<&Account>) -> U {
 // check local cache first
 if let Some(ref mut maybe_acc) = self.cache.borrow_mut().get_mut(a) {
@@ -621,6 +624,8 @@ impl State {
 match result {
 Some(r) => r,
 None => {
+// first check bloom if it is not in database for sure
+if check_bloom && !self.db.check_account_bloom(a) { return f(None); }
 // not found in the global cache, get from the DB and insert into local
-if !self.db.check_account_bloom(a) { return f(None); }
 let db = self.trie_factory.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR);
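
The check_bloom flag makes the intent above explicit: the account bloom is a may-exist filter over non-empty accounts, so a miss can short-circuit balance/nonce/code lookups, while exists() has to bypass it and query the backing store because empty accounts are no longer recorded in it. A compact sketch of that gated lookup, using an in-memory map as a stand-in for the state database and an exact set as a stand-in for the bloom:

    use std::collections::{HashMap, HashSet};

    struct StateDb {
        // Stand-in for the account bloom: only non-empty accounts are added,
        // so a miss proves absence only for non-empty accounts.
        bloom: HashSet<String>,
        // Stand-in for the backing trie/database: address -> balance.
        accounts: HashMap<String, u64>,
    }

    impl StateDb {
        fn ensure_cached(&self, addr: &str, check_bloom: bool) -> Option<u64> {
            if check_bloom && !self.bloom.contains(addr) {
                // Filter says "definitely not present as a non-empty account".
                return None;
            }
            self.accounts.get(addr).cloned()
        }

        fn balance(&self, addr: &str) -> u64 {
            // May trust the bloom: a missing non-empty account has balance 0 anyway.
            self.ensure_cached(addr, true).unwrap_or(0)
        }

        fn exists(&self, addr: &str) -> bool {
            // Must not trust the bloom: empty accounts are absent from it
            // but may still exist in the database.
            self.ensure_cached(addr, false).is_some()
        }
    }

    fn main() {
        let mut db = StateDb { bloom: HashSet::new(), accounts: HashMap::new() };
        db.accounts.insert("empty".into(), 0); // exists, but not in the bloom
        db.accounts.insert("funded".into(), 42);
        db.bloom.insert("funded".into());
        assert_eq!(db.balance("funded"), 42);
        assert!(db.exists("empty"));
        assert_eq!(db.balance("missing"), 0);
        println!("ok");
    }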


@@ -152,7 +152,7 @@ fn can_handle_long_fork() {
 push_blocks_to_client(client, 49, 1201, 800);
 push_blocks_to_client(client, 53, 1201, 600);
-for _ in 0..40 {
+for _ in 0..400 {
 client.import_verified_blocks();
 }
 assert_eq!(2000, client.chain_info().best_block_number);


@@ -4,7 +4,7 @@
 !define DESCRIPTION "Fast, light, robust Ethereum implementation"
 !define VERSIONMAJOR 1
 !define VERSIONMINOR 3
-!define VERSIONBUILD 8
+!define VERSIONBUILD 9
 !addplugindir .\


@ -3,7 +3,7 @@ description = "Ethcore utility library"
homepage = "http://ethcore.io" homepage = "http://ethcore.io"
license = "GPL-3.0" license = "GPL-3.0"
name = "ethcore-util" name = "ethcore-util"
version = "1.3.8" version = "1.3.9"
authors = ["Ethcore <admin@ethcore.io>"] authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs" build = "build.rs"


@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Parity. If not, see <http://www.gnu.org/licenses/>.
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use std::thread::{self, JoinHandle};
 use std::collections::HashMap;
 use mio::*;
@@ -75,12 +75,12 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
 }
 /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
-pub struct IoContext<Message> where Message: Send + Clone + 'static {
+pub struct IoContext<Message> where Message: Send + Clone + Sync + 'static {
 channel: IoChannel<Message>,
 handler: HandlerId,
 }
-impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
+impl<Message> IoContext<Message> where Message: Send + Clone + Sync + 'static {
 /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler.
 pub fn new(channel: IoChannel<Message>, handler: HandlerId) -> IoContext<Message> {
 IoContext {
@@ -165,7 +165,7 @@ struct UserTimer {
 /// Root IO handler. Manages user handlers, messages and IO timers.
 pub struct IoManager<Message> where Message: Send + Sync {
 timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
-handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
+handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
 workers: Vec<Worker>,
 worker_channel: chase_lev::Worker<Work<Message>>,
 work_ready: Arc<SCondvar>,
@@ -173,7 +173,11 @@ pub struct IoManager<Message> where Message: Send + Sync {
 impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
 /// Creates a new instance and registers it with the event loop.
-pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), IoError> {
+pub fn start(
+panic_handler: Arc<PanicHandler>,
+event_loop: &mut EventLoop<IoManager<Message>>,
+handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>
+) -> Result<(), IoError> {
 let (worker, stealer) = chase_lev::deque();
 let num_workers = 4;
 let work_ready_mutex = Arc::new(SMutex::new(()));
@@ -182,7 +186,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
 Worker::new(
 i,
 stealer.clone(),
-IoChannel::new(event_loop.channel()),
+IoChannel::new(event_loop.channel(), Arc::downgrade(&handlers)),
 work_ready.clone(),
 work_ready_mutex.clone(),
 panic_handler.clone(),
@@ -191,7 +195,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
 let mut io = IoManager {
 timers: Arc::new(RwLock::new(HashMap::new())),
-handlers: Slab::new(MAX_HANDLERS),
+handlers: handlers,
 worker_channel: worker,
 workers: workers,
 work_ready: work_ready,
@@ -208,7 +212,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
 fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
 let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
 let token_id = token.as_usize() % TOKENS_PER_HANDLER;
-if let Some(handler) = self.handlers.get(handler_index) {
+if let Some(handler) = self.handlers.read().get(handler_index) {
 if events.is_hup() {
 self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
 }
@@ -227,7 +231,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
 fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
 let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
 let token_id = token.as_usize() % TOKENS_PER_HANDLER;
-if let Some(handler) = self.handlers.get(handler_index) {
+if let Some(handler) = self.handlers.read().get(handler_index) {
 if let Some(timer) = self.timers.read().get(&token.as_usize()) {
 event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
 self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
@@ -243,12 +247,12 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
 event_loop.shutdown();
 },
 IoMessage::AddHandler { handler } => {
-let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
-handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id));
+let handler_id = self.handlers.write().insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
+handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel(), Arc::downgrade(&self.handlers)), handler_id));
 },
 IoMessage::RemoveHandler { handler_id } => {
 // TODO: flush event loop
-self.handlers.remove(handler_id);
+self.handlers.write().remove(handler_id);
 // unregister timers
 let mut timers = self.timers.write();
 let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect();
@@ -269,12 +273,12 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
 }
 },
 IoMessage::RegisterStream { handler_id, token } => {
-if let Some(handler) = self.handlers.get(handler_id) {
+if let Some(handler) = self.handlers.read().get(handler_id) {
 handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
 }
 },
 IoMessage::DeregisterStream { handler_id, token } => {
-if let Some(handler) = self.handlers.get(handler_id) {
+if let Some(handler) = self.handlers.read().get(handler_id) {
 handler.deregister_stream(token, event_loop);
 // unregister a timer associated with the token (if any)
 let timer_id = token + handler_id * TOKENS_PER_HANDLER;
@@ -284,14 +288,14 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
 }
 },
 IoMessage::UpdateStreamRegistration { handler_id, token } => {
-if let Some(handler) = self.handlers.get(handler_id) {
+if let Some(handler) = self.handlers.read().get(handler_id) {
 handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
 }
 },
 IoMessage::UserMessage(data) => {
 //TODO: better way to iterate the slab
 for id in 0 .. MAX_HANDLERS {
-if let Some(h) = self.handlers.get(id) {
+if let Some(h) = self.handlers.read().get(id) {
 let handler = h.clone();
 self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id });
 }
@@ -305,19 +309,21 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
 /// Allows sending messages into the event loop. All the IO handlers will get the message
 /// in the `message` callback.
 pub struct IoChannel<Message> where Message: Send + Clone{
-channel: Option<Sender<IoMessage<Message>>>
+channel: Option<Sender<IoMessage<Message>>>,
+handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
 }
-impl<Message> Clone for IoChannel<Message> where Message: Send + Clone {
+impl<Message> Clone for IoChannel<Message> where Message: Send + Clone + Sync + 'static {
 fn clone(&self) -> IoChannel<Message> {
 IoChannel {
-channel: self.channel.clone()
+channel: self.channel.clone(),
+handlers: self.handlers.clone(),
 }
 }
 }
-impl<Message> IoChannel<Message> where Message: Send + Clone {
-/// Send a msessage through the channel
+impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
+/// Send a message through the channel
 pub fn send(&self, message: Message) -> Result<(), IoError> {
 if let Some(ref channel) = self.channel {
 try!(channel.send(IoMessage::UserMessage(message)));
@@ -325,6 +331,19 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
 Ok(())
 }
+/// Send a message through the channel and handle it synchronously
+pub fn send_sync(&self, message: Message) -> Result<(), IoError> {
+if let Some(handlers) = self.handlers.upgrade() {
+for id in 0 .. MAX_HANDLERS {
+if let Some(h) = handlers.read().get(id) {
+let handler = h.clone();
+handler.message(&IoContext::new(self.clone(), id), &message);
+}
+}
+}
+Ok(())
+}
 /// Send low level io message
 pub fn send_io(&self, message: IoMessage<Message>) -> Result<(), IoError> {
 if let Some(ref channel) = self.channel {
@@ -334,11 +353,17 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
 }
 /// Create a new channel to connected to event loop.
 pub fn disconnected() -> IoChannel<Message> {
-IoChannel { channel: None }
+IoChannel {
+channel: None,
+handlers: Weak::default(),
+}
 }
-fn new(channel: Sender<IoMessage<Message>>) -> IoChannel<Message> {
-IoChannel { channel: Some(channel) }
+fn new(channel: Sender<IoMessage<Message>>, handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>) -> IoChannel<Message> {
+IoChannel {
+channel: Some(channel),
+handlers: handlers,
+}
 }
 }
@@ -348,6 +373,7 @@ pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
 panic_handler: Arc<PanicHandler>,
 thread: Option<JoinHandle<()>>,
 host_channel: Sender<IoMessage<Message>>,
+handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
 }
 impl<Message> MayPanic for IoService<Message> where Message: Send + Sync + Clone + 'static {
@@ -365,16 +391,19 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
 let mut event_loop = EventLoop::configured(config).expect("Error creating event loop");
 let channel = event_loop.channel();
 let panic = panic_handler.clone();
+let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS)));
+let h = handlers.clone();
 let thread = thread::spawn(move || {
 let p = panic.clone();
 panic.catch_panic(move || {
-IoManager::<Message>::start(p, &mut event_loop).unwrap();
+IoManager::<Message>::start(p, &mut event_loop, h).unwrap();
 }).unwrap()
 });
 Ok(IoService {
 panic_handler: panic_handler,
 thread: Some(thread),
-host_channel: channel
+host_channel: channel,
+handlers: handlers,
 })
 }
@@ -394,7 +423,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
 /// Create a new message channel
 pub fn channel(&self) -> IoChannel<Message> {
-IoChannel { channel: Some(self.host_channel.clone()) }
+IoChannel::new(self.host_channel.clone(), Arc::downgrade(&self.handlers))
 }
 }
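
The thread of changes above moves the handler slab into an Arc<RwLock<...>> owned by IoService, hands each IoChannel a Weak reference to it, and uses that in send_sync to invoke every live handler directly on the calling thread instead of round-tripping through the mio event loop, which is what lets the block queue trigger an immediate import. A reduced sketch of that ownership pattern, with a hypothetical Handler trait, std::sync::RwLock instead of parking_lot, and a plain Vec instead of the Slab:

    use std::sync::{Arc, RwLock, Weak};

    trait Handler: Send + Sync {
        fn message(&self, msg: &str);
    }

    type Handlers = Arc<RwLock<Vec<Arc<dyn Handler>>>>;

    #[derive(Clone)]
    struct Channel {
        // Weak so that a lingering channel does not keep the service's
        // handler table (and the handlers themselves) alive after shutdown.
        handlers: Weak<RwLock<Vec<Arc<dyn Handler>>>>,
    }

    impl Channel {
        // Synchronous dispatch: call every registered handler directly,
        // bypassing the event-loop queue (analogous to IoChannel::send_sync).
        fn send_sync(&self, msg: &str) {
            if let Some(handlers) = self.handlers.upgrade() {
                for h in handlers.read().unwrap().iter() {
                    h.message(msg);
                }
            }
        }
    }

    struct Printer;
    impl Handler for Printer {
        fn message(&self, msg: &str) { println!("got: {}", msg); }
    }

    fn main() {
        let handlers: Handlers = Arc::new(RwLock::new(Vec::new()));
        handlers.write().unwrap().push(Arc::new(Printer));
        let channel = Channel { handlers: Arc::downgrade(&handlers) };
        channel.send_sync("BlockVerified");
    }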


@@ -104,7 +104,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
 }
 /// Add a packet to send queue.
-pub fn send<Message>(&mut self, io: &IoContext<Message>, data: Bytes) where Message: Send + Clone {
+pub fn send<Message>(&mut self, io: &IoContext<Message>, data: Bytes) where Message: Send + Clone + Sync + 'static {
 if !data.is_empty() {
 self.send_queue.push_back(Cursor::new(data));
 }
@@ -120,7 +120,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
 }
 /// Writable IO handler. Called when the socket is ready to send.
-pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone {
+pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone + Sync + 'static {
 if self.send_queue.is_empty() {
 return Ok(WriteStatus::Complete)
 }
@@ -340,7 +340,7 @@ impl EncryptedConnection {
 }
 /// Send a packet
-pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone {
+pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 let mut header = RlpStream::new();
 let len = payload.len() as usize;
 header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
@@ -435,7 +435,7 @@ impl EncryptedConnection {
 }
 /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
-pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, NetworkError> where Message: Send + Clone{
+pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, NetworkError> where Message: Send + Clone + Sync + 'static {
 try!(io.clear_timer(self.connection.token));
 if let EncryptedConnectionState::Header = self.read_state {
 if let Some(data) = try!(self.connection.readable()) {
@@ -458,7 +458,7 @@ impl EncryptedConnection {
 }
 /// Writable IO handler. Processes send queeue.
-pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone {
+pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 try!(self.connection.writable(io));
 Ok(())
 }


@@ -106,7 +106,7 @@ impl Handshake {
 }
 /// Start a handhsake
-pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone{
+pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 self.originated = originated;
 io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok();
 if originated {
@@ -125,7 +125,7 @@ impl Handshake {
 }
 /// Readable IO handler. Drives the state change.
-pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone {
+pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 if !self.expired() {
 while let Some(data) = try!(self.connection.readable()) {
 match self.state {
@@ -154,7 +154,7 @@ impl Handshake {
 }
 /// Writabe IO handler.
-pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone {
+pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 if !self.expired() {
 try!(self.connection.writable(io));
 }
@@ -172,7 +172,7 @@ impl Handshake {
 }
 /// Parse, validate and confirm auth message
-fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone {
+fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 trace!(target: "network", "Received handshake auth from {:?}", self.connection.remote_addr_str());
 if data.len() != V4_AUTH_PACKET_SIZE {
 debug!(target: "network", "Wrong auth packet size");
@@ -203,7 +203,7 @@ impl Handshake {
 Ok(())
 }
-fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone {
+fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
 self.auth_cipher.extend_from_slice(data);
 let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..]));
@@ -259,7 +259,7 @@ impl Handshake {
 }
 /// Sends auth message
-fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone {
+fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 trace!(target: "network", "Sending handshake auth to {:?}", self.connection.remote_addr_str());
 let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
 let len = data.len();
@@ -286,7 +286,7 @@ impl Handshake {
 }
 /// Sends ack message
-fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone {
+fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 trace!(target: "network", "Sending handshake ack to {:?}", self.connection.remote_addr_str());
 let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
 let len = data.len();
@@ -305,7 +305,7 @@ impl Handshake {
 }
 /// Sends EIP8 ack message
-fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone {
+fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
 trace!(target: "network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str());
 let mut rlp = RlpStream::new_list(3);
 rlp.append(self.ecdhe.public());
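For context on the EIP8 messages handled by `read_auth_eip8` and `write_ack_eip8` above: the packet begins with a two-byte big-endian size prefix, and the remainder is ECIES ciphertext; the call `ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])` passes the prefix separately, which suggests it also serves as authenticated data for the decryption. A small sketch of that framing (`split_eip8_packet` is a hypothetical helper, not in the codebase):

    // Split an EIP8 handshake packet into its declared length and ciphertext.
    fn split_eip8_packet(packet: &[u8]) -> Option<(u16, &[u8])> {
        if packet.len() < 2 {
            return None;
        }
        // Two-byte big-endian length prefix, then the ECIES ciphertext.
        let declared_len = ((packet[0] as u16) << 8) | packet[1] as u16;
        Some((declared_len, &packet[2..]))
    }

    fn main() {
        let packet = [0x01, 0x2c, 0xde, 0xad, 0xbe, 0xef];
        let (len, ciphertext) = split_eip8_packet(&packet).unwrap();
        assert_eq!(len, 300);
        assert_eq!(ciphertext.len(), 4);
    }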


@@ -125,7 +125,7 @@ impl Session {
 /// and leaves the handhsake in limbo to be deregistered from the event loop.
 pub fn new<Message>(io: &IoContext<Message>, socket: TcpStream, token: StreamToken, id: Option<&NodeId>,
 nonce: &H256, stats: Arc<NetworkStats>, host: &HostInfo) -> Result<Session, NetworkError>
-where Message: Send + Clone {
+where Message: Send + Clone + Sync + 'static {
 let originated = id.is_some();
 let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake");
 try!(handshake.start(io, host, originated));


@@ -199,7 +199,12 @@ pub struct Database {
 write_opts: WriteOptions,
 cfs: Vec<Column>,
 read_opts: ReadOptions,
+// Dirty values added with `write_buffered`. Cleaned on `flush`.
 overlay: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
+// Values currently being flushed. Cleared when `flush` completes.
+flushing: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
+// Prevents concurrent flushes.
+flushing_lock: Mutex<()>,
 }
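The comments on the new fields describe a double-buffered write cache: `write_buffered` fills `overlay`, a flush moves that generation into `flushing`, and `flushing_lock` keeps flushes from overlapping. A simplified model of the write path, with plain `Vec<u8>` keys standing in for `ElasticArray32<u8>` and per-column maps as in the real struct; the flush protocol itself is sketched after the `flush()` hunk below:

    use std::collections::HashMap;
    use std::sync::RwLock;

    #[derive(Clone)]
    enum KeyState {
        Insert(Vec<u8>),
        Delete,
    }

    // Illustrative stand-in for the buffering part of `Database`.
    struct WriteCache {
        overlay: RwLock<Vec<HashMap<Vec<u8>, KeyState>>>,  // dirty, not yet flushed
        flushing: RwLock<Vec<HashMap<Vec<u8>, KeyState>>>, // generation being written out
    }

    impl WriteCache {
        // Buffered writes only ever touch `overlay`.
        fn buffer_insert(&self, column: usize, key: Vec<u8>, value: Vec<u8>) {
            self.overlay.write().unwrap()[column].insert(key, KeyState::Insert(value));
        }

        fn buffer_delete(&self, column: usize, key: Vec<u8>) {
            self.overlay.write().unwrap()[column].insert(key, KeyState::Delete);
        }
    }

    fn main() {
        let cache = WriteCache {
            overlay: RwLock::new(vec![HashMap::new(); 2]),
            flushing: RwLock::new(vec![HashMap::new(); 2]),
        };
        cache.buffer_insert(0, b"header".to_vec(), b"...".to_vec());
        cache.buffer_delete(1, b"stale".to_vec());
        assert_eq!(cache.overlay.read().unwrap()[0].len(), 1);
        assert!(cache.flushing.read().unwrap()[1].is_empty());
    }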
 impl Database {
@@ -290,7 +295,9 @@ impl Database {
 db: db,
 write_opts: write_opts,
 overlay: RwLock::new((0..(cfs.len() + 1)).map(|_| HashMap::new()).collect()),
+flushing: RwLock::new((0..(cfs.len() + 1)).map(|_| HashMap::new()).collect()),
 cfs: cfs,
+flushing_lock: Mutex::new(()),
 read_opts: read_opts,
 })
 }
@@ -330,13 +337,12 @@ impl Database {
 /// Commit buffered changes to database.
 pub fn flush(&self) -> Result<(), String> {
+let _lock = self.flushing_lock.lock();
+mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
 let batch = WriteBatch::new();
-let mut overlay = self.overlay.write();
-
-for (c, column) in overlay.iter_mut().enumerate() {
-let column_data = mem::replace(column, HashMap::new());
-for (key, state) in column_data.into_iter() {
-match state {
+for (c, column) in self.flushing.read().iter().enumerate() {
+for (key, state) in column.iter() {
+match *state {
 KeyState::Delete => {
 if c > 0 {
 try!(batch.delete_cf(self.cfs[c - 1], &key));
@@ -344,14 +350,14 @@ impl Database {
 try!(batch.delete(&key));
 }
 },
-KeyState::Insert(value) => {
+KeyState::Insert(ref value) => {
 if c > 0 {
 try!(batch.put_cf(self.cfs[c - 1], &key, &value));
 } else {
 try!(batch.put(&key, &value));
 }
 },
-KeyState::InsertCompressed(value) => {
+KeyState::InsertCompressed(ref value) => {
 let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
 if c > 0 {
 try!(batch.put_cf(self.cfs[c - 1], &key, &compressed));
@@ -362,7 +368,11 @@ impl Database {
 }
 }
 }
-self.db.write_opt(batch, &self.write_opts)
+try!(self.db.write_opt(batch, &self.write_opts));
+for column in self.flushing.write().iter_mut() {
+column.clear();
+}
+Ok(())
 }
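The rewritten `flush()` takes `flushing_lock`, swaps `overlay` into `flushing`, writes the batch, and only clears `flushing` once `write_opt` has succeeded, so readers can still find the buffered values until they are actually on disk. A self-contained sketch of that protocol; `Backend`, `NoopBackend`, `put` and `delete` are assumed stand-ins for the RocksDB write batch rather than the real kvdb API, and a single map replaces the per-column vectors:

    use std::collections::HashMap;
    use std::mem;
    use std::sync::{Mutex, RwLock};

    trait Backend {
        fn put(&self, key: &[u8], value: &[u8]);
        fn delete(&self, key: &[u8]);
    }

    struct NoopBackend;
    impl Backend for NoopBackend {
        fn put(&self, _key: &[u8], _value: &[u8]) {}
        fn delete(&self, _key: &[u8]) {}
    }

    enum KeyState {
        Insert(Vec<u8>),
        Delete,
    }

    struct Buffered<B: Backend> {
        overlay: RwLock<HashMap<Vec<u8>, KeyState>>,  // dirty values
        flushing: RwLock<HashMap<Vec<u8>, KeyState>>, // values being written out
        flushing_lock: Mutex<()>,                     // serialises flushes
        backend: B,
    }

    impl<B: Backend> Buffered<B> {
        fn flush(&self) {
            // Only one flush at a time.
            let _guard = self.flushing_lock.lock().unwrap();
            // Move the dirty generation aside; new writes keep accumulating
            // in the now-empty overlay.
            mem::swap(&mut *self.overlay.write().unwrap(), &mut *self.flushing.write().unwrap());
            // Write the moved generation to the backend...
            for (key, state) in self.flushing.read().unwrap().iter() {
                match *state {
                    KeyState::Insert(ref value) => self.backend.put(key, value),
                    KeyState::Delete => self.backend.delete(key),
                }
            }
            // ...and only then drop it, so concurrent readers never lose
            // sight of a value before it has reached the backend.
            self.flushing.write().unwrap().clear();
        }
    }

    fn main() {
        let db = Buffered {
            overlay: RwLock::new(HashMap::new()),
            flushing: RwLock::new(HashMap::new()),
            flushing_lock: Mutex::new(()),
            backend: NoopBackend,
        };
        db.overlay.write().unwrap().insert(b"live".to_vec(), KeyState::Insert(b"value".to_vec()));
        db.overlay.write().unwrap().insert(b"gone".to_vec(), KeyState::Delete);
        db.flush();
        assert!(db.overlay.read().unwrap().is_empty());
        assert!(db.flushing.read().unwrap().is_empty());
    }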
@@ -391,6 +401,11 @@ impl Database {
 pub fn get(&self, col: Option<u32>, key: &[u8]) -> Result<Option<Bytes>, String> {
 let overlay = &self.overlay.read()[Self::to_overlay_column(col)];
 match overlay.get(key) {
+Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
+Some(&KeyState::Delete) => Ok(None),
+None => {
+let flushing = &self.flushing.read()[Self::to_overlay_column(col)];
+match flushing.get(key) {
 Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
 Some(&KeyState::Delete) => Ok(None),
 None => {
@@ -399,6 +414,8 @@ impl Database {
 |c| self.db.get_cf_opt(self.cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| v.to_vec())))
 },
 }
+},
+}
 }
 /// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values.
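The extended `get()` establishes a read order that matches the write path: dirty `overlay` entries win, then entries still sitting in `flushing`, and only then the backing store is consulted; a `Delete` in either buffer masks whatever is on disk. A minimal sketch of that lookup order (`lookup` and `disk_miss` are hypothetical stand-ins; the real code also handles `InsertCompressed` and per-column maps):

    use std::collections::HashMap;

    enum KeyState {
        Insert(Vec<u8>),
        Delete,
    }

    // Check the dirty overlay, then the in-flight flushing buffer, then disk.
    fn lookup<F>(
        overlay: &HashMap<Vec<u8>, KeyState>,
        flushing: &HashMap<Vec<u8>, KeyState>,
        disk_get: F,
        key: &[u8],
    ) -> Option<Vec<u8>>
        where F: Fn(&[u8]) -> Option<Vec<u8>>
    {
        for buffer in &[overlay, flushing] {
            match buffer.get(key) {
                Some(&KeyState::Insert(ref value)) => return Some(value.clone()),
                Some(&KeyState::Delete) => return None, // tombstone masks disk
                None => {}
            }
        }
        disk_get(key)
    }

    fn disk_miss(_key: &[u8]) -> Option<Vec<u8>> {
        None
    }

    fn main() {
        let mut overlay = HashMap::new();
        let mut flushing = HashMap::new();
        overlay.insert(b"live".to_vec(), KeyState::Insert(b"value".to_vec()));
        flushing.insert(b"gone".to_vec(), KeyState::Delete);

        assert_eq!(lookup(&overlay, &flushing, disk_miss, b"live"), Some(b"value".to_vec()));
        assert_eq!(lookup(&overlay, &flushing, disk_miss, b"gone"), None);
    }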