diff --git a/ethcore/res/ethereum/tests b/ethcore/res/ethereum/tests index 862b4e3d4..97066e40c 160000 --- a/ethcore/res/ethereum/tests +++ b/ethcore/res/ethereum/tests @@ -1 +1 @@ -Subproject commit 862b4e3d4a9a7141af1b4aaf7dfe228a6a294614 +Subproject commit 97066e40ccd061f727deb5cd860e4d9135aa2551 diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 2b13b0570..6f5558708 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -361,16 +361,19 @@ impl Client { /// This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self) -> usize { - let max_blocks_to_import = 64; - let (imported_blocks, import_results, invalid_blocks, imported, duration) = { + let max_blocks_to_import = 4; + let (imported_blocks, import_results, invalid_blocks, imported, duration, is_empty) = { let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); let mut invalid_blocks = HashSet::new(); let mut import_results = Vec::with_capacity(max_blocks_to_import); let _import_lock = self.import_lock.lock(); + let blocks = self.block_queue.drain(max_blocks_to_import); + if blocks.is_empty() { + return 0; + } let _timer = PerfTimer::new("import_verified_blocks"); let start = precise_time_ns(); - let blocks = self.block_queue.drain(max_blocks_to_import); for block in blocks { let header = &block.header; @@ -394,23 +397,19 @@ impl Client { let imported = imported_blocks.len(); let invalid_blocks = invalid_blocks.into_iter().collect::>(); - { - if !invalid_blocks.is_empty() { - self.block_queue.mark_as_bad(&invalid_blocks); - } - if !imported_blocks.is_empty() { - self.block_queue.mark_as_good(&imported_blocks); - } + if !invalid_blocks.is_empty() { + self.block_queue.mark_as_bad(&invalid_blocks); } + let is_empty = self.block_queue.mark_as_good(&imported_blocks); let duration_ns = precise_time_ns() - start; - (imported_blocks, import_results, invalid_blocks, imported, duration_ns) + (imported_blocks, import_results, invalid_blocks, imported, duration_ns, is_empty) }; { - if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() { + if !imported_blocks.is_empty() && is_empty { let (enacted, retracted) = self.calculate_enacted_retracted(&import_results); - if self.queue_info().is_empty() { + if is_empty { self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted); } diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 63bb367b6..8e88db29c 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -282,6 +282,7 @@ impl Miner { trace!(target: "miner", "prepare_block: done recalibration."); } + let _timer = PerfTimer::new("prepare_block"); let (transactions, mut open_block, original_work_hash) = { let transactions = {self.transaction_queue.lock().top_transactions()}; let mut sealing_work = self.sealing_work.lock(); diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index de99d6d05..082cd3c78 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -223,7 +223,7 @@ fn can_handle_long_fork() { push_blocks_to_client(client, 49, 1201, 800); push_blocks_to_client(client, 53, 1201, 600); - for _ in 0..40 { + for _ in 0..400 { client.import_verified_blocks(); } assert_eq!(2000, client.chain_info().best_block_number); diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 3f81d53ce..313ab02a6 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -107,7 +107,21 @@ struct QueueSignal { impl QueueSignal { #[cfg_attr(feature="dev", allow(bool_comparison))] - fn set(&self) { + fn set_sync(&self) { + // Do not signal when we are about to close + if self.deleting.load(AtomicOrdering::Relaxed) { + return; + } + + if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false { + if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) { + debug!("Error sending BlockVerified message: {:?}", e); + } + } + } + + #[cfg_attr(feature="dev", allow(bool_comparison))] + fn set_async(&self) { // Do not signal when we are about to close if self.deleting.load(AtomicOrdering::Relaxed) { return; @@ -128,8 +142,8 @@ impl QueueSignal { struct Verification { // All locks must be captured in the order declared here. unverified: Mutex>, - verified: Mutex>, verifying: Mutex>>, + verified: Mutex>, bad: Mutex>, more_to_verify: SMutex<()>, empty: SMutex<()>, @@ -140,8 +154,8 @@ impl VerificationQueue { pub fn new(config: Config, engine: Arc, message_channel: IoChannel) -> Self { let verification = Arc::new(Verification { unverified: Mutex::new(VecDeque::new()), - verified: Mutex::new(VecDeque::new()), verifying: Mutex::new(VecDeque::new()), + verified: Mutex::new(VecDeque::new()), bad: Mutex::new(HashSet::new()), more_to_verify: SMutex::new(()), empty: SMutex::new(()), @@ -226,7 +240,7 @@ impl VerificationQueue { }; let hash = item.hash(); - match K::verify(item, &*engine) { + let is_ready = match K::verify(item, &*engine) { Ok(verified) => { let mut verifying = verification.verifying.lock(); let mut idx = None; @@ -243,7 +257,9 @@ impl VerificationQueue { let mut verified = verification.verified.lock(); let mut bad = verification.bad.lock(); VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); - ready.set(); + true + } else { + false } }, Err(_) => { @@ -256,9 +272,15 @@ impl VerificationQueue { if verifying.front().map_or(false, |x| x.output.is_some()) { VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad); - ready.set(); + true + } else { + false } } + }; + if is_ready { + // Import the block immediately + ready.set_sync(); } } } @@ -366,15 +388,17 @@ impl VerificationQueue { *verified = new_verified; } - /// Mark given item as processed - pub fn mark_as_good(&self, hashes: &[H256]) { + /// Mark given item as processed. + /// Returns true if the queue becomes empty. + pub fn mark_as_good(&self, hashes: &[H256]) -> bool { if hashes.is_empty() { - return; + return self.processing.read().is_empty(); } let mut processing = self.processing.write(); for hash in hashes { processing.remove(hash); } + processing.is_empty() } /// Removes up to `max` verified items from the queue @@ -385,7 +409,7 @@ impl VerificationQueue { self.ready_signal.reset(); if !verified.is_empty() { - self.ready_signal.set(); + self.ready_signal.set_async(); } result } @@ -411,12 +435,9 @@ impl VerificationQueue { verified_queue_size: verified_len, max_queue_size: self.max_queue_size, max_mem_use: self.max_mem_use, - mem_used: - unverified_bytes - + verifying_bytes - + verified_bytes - // TODO: https://github.com/servo/heapsize/pull/50 - //+ self.processing.read().heap_size_of_children(), + mem_used: unverified_bytes + + verifying_bytes + + verified_bytes } } diff --git a/util/io/src/service.rs b/util/io/src/service.rs index a47e84e56..d06d34284 100644 --- a/util/io/src/service.rs +++ b/util/io/src/service.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::thread::{self, JoinHandle}; use std::collections::HashMap; use mio::*; @@ -75,12 +75,12 @@ pub enum IoMessage where Message: Send + Clone + Sized { } /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. -pub struct IoContext where Message: Send + Clone + 'static { +pub struct IoContext where Message: Send + Clone + Sync + 'static { channel: IoChannel, handler: HandlerId, } -impl IoContext where Message: Send + Clone + 'static { +impl IoContext where Message: Send + Clone + Sync + 'static { /// Create a new IO access point. Takes references to all the data that can be updated within the IO handler. pub fn new(channel: IoChannel, handler: HandlerId) -> IoContext { IoContext { @@ -165,7 +165,7 @@ struct UserTimer { /// Root IO handler. Manages user handlers, messages and IO timers. pub struct IoManager where Message: Send + Sync { timers: Arc>>, - handlers: Slab>, HandlerId>, + handlers: Arc>, HandlerId>>>, workers: Vec, worker_channel: chase_lev::Worker>, work_ready: Arc, @@ -173,7 +173,11 @@ pub struct IoManager where Message: Send + Sync { impl IoManager where Message: Send + Sync + Clone + 'static { /// Creates a new instance and registers it with the event loop. - pub fn start(panic_handler: Arc, event_loop: &mut EventLoop>) -> Result<(), IoError> { + pub fn start( + panic_handler: Arc, + event_loop: &mut EventLoop>, + handlers: Arc>, HandlerId>>> + ) -> Result<(), IoError> { let (worker, stealer) = chase_lev::deque(); let num_workers = 4; let work_ready_mutex = Arc::new(SMutex::new(())); @@ -182,7 +186,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { Worker::new( i, stealer.clone(), - IoChannel::new(event_loop.channel()), + IoChannel::new(event_loop.channel(), Arc::downgrade(&handlers)), work_ready.clone(), work_ready_mutex.clone(), panic_handler.clone(), @@ -191,7 +195,7 @@ impl IoManager where Message: Send + Sync + Clone + 'static { let mut io = IoManager { timers: Arc::new(RwLock::new(HashMap::new())), - handlers: Slab::new(MAX_HANDLERS), + handlers: handlers, worker_channel: worker, workers: workers, work_ready: work_ready, @@ -208,7 +212,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn ready(&mut self, _event_loop: &mut EventLoop, token: Token, events: EventSet) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - if let Some(handler) = self.handlers.get(handler_index) { + if let Some(handler) = self.handlers.read().get(handler_index) { if events.is_hup() { self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index }); } @@ -227,7 +231,7 @@ impl Handler for IoManager where Message: Send + Clone + Sync fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { let handler_index = token.as_usize() / TOKENS_PER_HANDLER; let token_id = token.as_usize() % TOKENS_PER_HANDLER; - if let Some(handler) = self.handlers.get(handler_index) { + if let Some(handler) = self.handlers.read().get(handler_index) { if let Some(timer) = self.timers.read().get(&token.as_usize()) { event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer"); self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index }); @@ -243,12 +247,12 @@ impl Handler for IoManager where Message: Send + Clone + Sync event_loop.shutdown(); }, IoMessage::AddHandler { handler } => { - let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered")); - handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id)); + let handler_id = self.handlers.write().insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered")); + handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel(), Arc::downgrade(&self.handlers)), handler_id)); }, IoMessage::RemoveHandler { handler_id } => { // TODO: flush event loop - self.handlers.remove(handler_id); + self.handlers.write().remove(handler_id); // unregister timers let mut timers = self.timers.write(); let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect(); @@ -269,12 +273,12 @@ impl Handler for IoManager where Message: Send + Clone + Sync } }, IoMessage::RegisterStream { handler_id, token } => { - if let Some(handler) = self.handlers.get(handler_id) { + if let Some(handler) = self.handlers.read().get(handler_id) { handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); } }, IoMessage::DeregisterStream { handler_id, token } => { - if let Some(handler) = self.handlers.get(handler_id) { + if let Some(handler) = self.handlers.read().get(handler_id) { handler.deregister_stream(token, event_loop); // unregister a timer associated with the token (if any) let timer_id = token + handler_id * TOKENS_PER_HANDLER; @@ -284,14 +288,14 @@ impl Handler for IoManager where Message: Send + Clone + Sync } }, IoMessage::UpdateStreamRegistration { handler_id, token } => { - if let Some(handler) = self.handlers.get(handler_id) { + if let Some(handler) = self.handlers.read().get(handler_id) { handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); } }, IoMessage::UserMessage(data) => { //TODO: better way to iterate the slab for id in 0 .. MAX_HANDLERS { - if let Some(h) = self.handlers.get(id) { + if let Some(h) = self.handlers.read().get(id) { let handler = h.clone(); self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id }); } @@ -305,19 +309,21 @@ impl Handler for IoManager where Message: Send + Clone + Sync /// Allows sending messages into the event loop. All the IO handlers will get the message /// in the `message` callback. pub struct IoChannel where Message: Send + Clone{ - channel: Option>> + channel: Option>>, + handlers: Weak>, HandlerId>>>, } -impl Clone for IoChannel where Message: Send + Clone { +impl Clone for IoChannel where Message: Send + Clone + Sync + 'static { fn clone(&self) -> IoChannel { IoChannel { - channel: self.channel.clone() + channel: self.channel.clone(), + handlers: self.handlers.clone(), } } } -impl IoChannel where Message: Send + Clone { - /// Send a msessage through the channel +impl IoChannel where Message: Send + Clone + Sync + 'static { + /// Send a message through the channel pub fn send(&self, message: Message) -> Result<(), IoError> { if let Some(ref channel) = self.channel { try!(channel.send(IoMessage::UserMessage(message))); @@ -325,6 +331,19 @@ impl IoChannel where Message: Send + Clone { Ok(()) } + /// Send a message through the channel and handle it synchronously + pub fn send_sync(&self, message: Message) -> Result<(), IoError> { + if let Some(handlers) = self.handlers.upgrade() { + for id in 0 .. MAX_HANDLERS { + if let Some(h) = handlers.read().get(id) { + let handler = h.clone(); + handler.message(&IoContext::new(self.clone(), id), &message); + } + } + } + Ok(()) + } + /// Send low level io message pub fn send_io(&self, message: IoMessage) -> Result<(), IoError> { if let Some(ref channel) = self.channel { @@ -334,11 +353,17 @@ impl IoChannel where Message: Send + Clone { } /// Create a new channel to connected to event loop. pub fn disconnected() -> IoChannel { - IoChannel { channel: None } + IoChannel { + channel: None, + handlers: Weak::default(), + } } - fn new(channel: Sender>) -> IoChannel { - IoChannel { channel: Some(channel) } + fn new(channel: Sender>, handlers: Weak>, HandlerId>>>) -> IoChannel { + IoChannel { + channel: Some(channel), + handlers: handlers, + } } } @@ -348,6 +373,7 @@ pub struct IoService where Message: Send + Sync + Clone + 'static { panic_handler: Arc, thread: Option>, host_channel: Sender>, + handlers: Arc>, HandlerId>>>, } impl MayPanic for IoService where Message: Send + Sync + Clone + 'static { @@ -365,16 +391,19 @@ impl IoService where Message: Send + Sync + Clone + 'static { let mut event_loop = EventLoop::configured(config).expect("Error creating event loop"); let channel = event_loop.channel(); let panic = panic_handler.clone(); + let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS))); + let h = handlers.clone(); let thread = thread::spawn(move || { let p = panic.clone(); panic.catch_panic(move || { - IoManager::::start(p, &mut event_loop).unwrap(); + IoManager::::start(p, &mut event_loop, h).unwrap(); }).unwrap() }); Ok(IoService { panic_handler: panic_handler, thread: Some(thread), - host_channel: channel + host_channel: channel, + handlers: handlers, }) } @@ -394,7 +423,7 @@ impl IoService where Message: Send + Sync + Clone + 'static { /// Create a new message channel pub fn channel(&self) -> IoChannel { - IoChannel { channel: Some(self.host_channel.clone()) } + IoChannel::new(self.host_channel.clone(), Arc::downgrade(&self.handlers)) } } diff --git a/util/network/src/connection.rs b/util/network/src/connection.rs index e12434a19..aadec31b9 100644 --- a/util/network/src/connection.rs +++ b/util/network/src/connection.rs @@ -104,7 +104,7 @@ impl GenericConnection { } /// Add a packet to send queue. - pub fn send(&mut self, io: &IoContext, data: Bytes) where Message: Send + Clone { + pub fn send(&mut self, io: &IoContext, data: Bytes) where Message: Send + Clone + Sync + 'static { if !data.is_empty() { trace!(target:"network", "{}: Sending {} bytes", self.token, data.len()); self.send_queue.push_back(Cursor::new(data)); @@ -121,7 +121,7 @@ impl GenericConnection { } /// Writable IO handler. Called when the socket is ready to send. - pub fn writable(&mut self, io: &IoContext) -> Result where Message: Send + Clone { + pub fn writable(&mut self, io: &IoContext) -> Result where Message: Send + Clone + Sync + 'static { if self.send_queue.is_empty() { return Ok(WriteStatus::Complete) } @@ -346,7 +346,7 @@ impl EncryptedConnection { } /// Send a packet - pub fn send_packet(&mut self, io: &IoContext, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone { + pub fn send_packet(&mut self, io: &IoContext, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { let mut header = RlpStream::new(); let len = payload.len() as usize; header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); @@ -441,7 +441,7 @@ impl EncryptedConnection { } /// Readable IO handler. Tracker receive status and returns decoded packet if avaialable. - pub fn readable(&mut self, io: &IoContext) -> Result, NetworkError> where Message: Send + Clone{ + pub fn readable(&mut self, io: &IoContext) -> Result, NetworkError> where Message: Send + Clone + Sync + 'static { try!(io.clear_timer(self.connection.token)); if let EncryptedConnectionState::Header = self.read_state { if let Some(data) = try!(self.connection.readable()) { @@ -464,7 +464,7 @@ impl EncryptedConnection { } /// Writable IO handler. Processes send queeue. - pub fn writable(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone { + pub fn writable(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { try!(self.connection.writable(io)); Ok(()) } diff --git a/util/network/src/handshake.rs b/util/network/src/handshake.rs index 1456cf801..d7950b383 100644 --- a/util/network/src/handshake.rs +++ b/util/network/src/handshake.rs @@ -106,7 +106,7 @@ impl Handshake { } /// Start a handhsake - pub fn start(&mut self, io: &IoContext, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone{ + pub fn start(&mut self, io: &IoContext, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone+ Sync + 'static { self.originated = originated; io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok(); if originated { @@ -125,7 +125,7 @@ impl Handshake { } /// Readable IO handler. Drives the state change. - pub fn readable(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone { + pub fn readable(&mut self, io: &IoContext, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { if !self.expired() { while let Some(data) = try!(self.connection.readable()) { match self.state { @@ -154,7 +154,7 @@ impl Handshake { } /// Writabe IO handler. - pub fn writable(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone { + pub fn writable(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { if !self.expired() { try!(self.connection.writable(io)); } @@ -172,7 +172,7 @@ impl Handshake { } /// Parse, validate and confirm auth message - fn read_auth(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone { + fn read_auth(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Received handshake auth from {:?}", self.connection.remote_addr_str()); if data.len() != V4_AUTH_PACKET_SIZE { debug!(target: "network", "Wrong auth packet size"); @@ -203,7 +203,7 @@ impl Handshake { Ok(()) } - fn read_auth_eip8(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone { + fn read_auth_eip8(&mut self, io: &IoContext, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str()); self.auth_cipher.extend_from_slice(data); let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..])); @@ -259,7 +259,7 @@ impl Handshake { } /// Sends auth message - fn write_auth(&mut self, io: &IoContext, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone { + fn write_auth(&mut self, io: &IoContext, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Sending handshake auth to {:?}", self.connection.remote_addr_str()); let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let len = data.len(); @@ -286,7 +286,7 @@ impl Handshake { } /// Sends ack message - fn write_ack(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone { + fn write_ack(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Sending handshake ack to {:?}", self.connection.remote_addr_str()); let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants let len = data.len(); @@ -305,7 +305,7 @@ impl Handshake { } /// Sends EIP8 ack message - fn write_ack_eip8(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone { + fn write_ack_eip8(&mut self, io: &IoContext) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static { trace!(target: "network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str()); let mut rlp = RlpStream::new_list(3); rlp.append(self.ecdhe.public()); diff --git a/util/network/src/session.rs b/util/network/src/session.rs index 50aa92168..c7f196680 100644 --- a/util/network/src/session.rs +++ b/util/network/src/session.rs @@ -144,7 +144,7 @@ impl Session { /// and leaves the handhsake in limbo to be deregistered from the event loop. pub fn new(io: &IoContext, socket: TcpStream, token: StreamToken, id: Option<&NodeId>, nonce: &H256, stats: Arc, host: &HostInfo) -> Result - where Message: Send + Clone { + where Message: Send + Clone + Sync + 'static { let originated = id.is_some(); let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake"); let local_addr = handshake.connection.local_addr_str(); diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 92b7e9fbd..e5507b78f 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -208,8 +208,13 @@ pub struct Database { config: DatabaseConfig, write_opts: WriteOptions, read_opts: ReadOptions, - overlay: RwLock, KeyState>>>, path: String, + // Dirty values added with `write_buffered`. Cleaned on `flush`. + overlay: RwLock, KeyState>>>, + // Values currently being flushed. Cleared when `flush` completes. + flushing: RwLock, KeyState>>>, + // Prevents concurrent flushes. + flushing_lock: Mutex<()>, } impl Database { @@ -310,6 +315,8 @@ impl Database { config: config.clone(), write_opts: write_opts, overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), + flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()), + flushing_lock: Mutex::new(()), path: path.to_owned(), read_opts: read_opts, }) @@ -351,39 +358,44 @@ impl Database { pub fn flush(&self) -> Result<(), String> { match *self.db.read() { Some(DBAndColumns { ref db, ref cfs }) => { + let _lock = self.flushing_lock.lock(); let batch = WriteBatch::new(); - let mut overlay = self.overlay.write(); - - for (c, column) in overlay.iter_mut().enumerate() { - let column_data = mem::replace(column, HashMap::new()); - for (key, state) in column_data.into_iter() { - match state { - KeyState::Delete => { - if c > 0 { - try!(batch.delete_cf(cfs[c - 1], &key)); - } else { - try!(batch.delete(&key)); - } - }, - KeyState::Insert(value) => { - if c > 0 { - try!(batch.put_cf(cfs[c - 1], &key, &value)); - } else { - try!(batch.put(&key, &value)); - } - }, - KeyState::InsertCompressed(value) => { - let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks); - if c > 0 { - try!(batch.put_cf(cfs[c - 1], &key, &compressed)); - } else { - try!(batch.put(&key, &value)); + mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); + { + for (c, column) in self.flushing.read().iter().enumerate() { + for (ref key, ref state) in column.iter() { + match **state { + KeyState::Delete => { + if c > 0 { + try!(batch.delete_cf(cfs[c - 1], &key)); + } else { + try!(batch.delete(&key)); + } + }, + KeyState::Insert(ref value) => { + if c > 0 { + try!(batch.put_cf(cfs[c - 1], &key, value)); + } else { + try!(batch.put(&key, &value)); + } + }, + KeyState::InsertCompressed(ref value) => { + let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks); + if c > 0 { + try!(batch.put_cf(cfs[c - 1], &key, &compressed)); + } else { + try!(batch.put(&key, &value)); + } } } } } } - db.write_opt(batch, &self.write_opts) + try!(db.write_opt(batch, &self.write_opts)); + for column in self.flushing.write().iter_mut() { + column.clear(); + } + Ok(()) }, None => Err("Database is closed".to_owned()) } @@ -425,9 +437,16 @@ impl Database { Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())), Some(&KeyState::Delete) => Ok(None), None => { - col.map_or_else( - || db.get_opt(key, &self.read_opts).map(|r| r.map(|v| v.to_vec())), - |c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| v.to_vec()))) + let flushing = &self.flushing.read()[Self::to_overlay_column(col)]; + match flushing.get(key) { + Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())), + Some(&KeyState::Delete) => Ok(None), + None => { + col.map_or_else( + || db.get_opt(key, &self.read_opts).map(|r| r.map(|v| v.to_vec())), + |c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| v.to_vec()))) + }, + } }, } }, @@ -468,6 +487,7 @@ impl Database { fn close(&self) { *self.db.write() = None; self.overlay.write().clear(); + self.flushing.write().clear(); } /// Restore the database from a copy at given path. @@ -507,6 +527,7 @@ impl Database { let db = try!(Self::open(&self.config, &self.path)); *self.db.write() = mem::replace(&mut *db.db.write(), None); *self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new()); + *self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new()); Ok(()) } }