Block import optimization (#2748)

* Block import optimization

* whitespace

[ci:none]
This commit is contained in:
Arkadiy Paronyan 2016-10-20 14:49:12 +02:00 committed by Gav Wood
parent 8ef598990a
commit 906dcd7bfe
10 changed files with 174 additions and 103 deletions

@ -1 +1 @@
Subproject commit 862b4e3d4a9a7141af1b4aaf7dfe228a6a294614
Subproject commit 97066e40ccd061f727deb5cd860e4d9135aa2551

View File

@ -361,16 +361,19 @@ impl Client {
/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
let max_blocks_to_import = 64;
let (imported_blocks, import_results, invalid_blocks, imported, duration) = {
let max_blocks_to_import = 4;
let (imported_blocks, import_results, invalid_blocks, imported, duration, is_empty) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);
let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.drain(max_blocks_to_import);
if blocks.is_empty() {
return 0;
}
let _timer = PerfTimer::new("import_verified_blocks");
let start = precise_time_ns();
let blocks = self.block_queue.drain(max_blocks_to_import);
for block in blocks {
let header = &block.header;
@ -394,23 +397,19 @@ impl Client {
let imported = imported_blocks.len();
let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();
{
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
if !imported_blocks.is_empty() {
self.block_queue.mark_as_good(&imported_blocks);
}
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
let is_empty = self.block_queue.mark_as_good(&imported_blocks);
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, imported, duration_ns)
(imported_blocks, import_results, invalid_blocks, imported, duration_ns, is_empty)
};
{
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);
if self.queue_info().is_empty() {
if is_empty {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}

View File

@ -282,6 +282,7 @@ impl Miner {
trace!(target: "miner", "prepare_block: done recalibration.");
}
let _timer = PerfTimer::new("prepare_block");
let (transactions, mut open_block, original_work_hash) = {
let transactions = {self.transaction_queue.lock().top_transactions()};
let mut sealing_work = self.sealing_work.lock();

View File

@ -223,7 +223,7 @@ fn can_handle_long_fork() {
push_blocks_to_client(client, 49, 1201, 800);
push_blocks_to_client(client, 53, 1201, 600);
for _ in 0..40 {
for _ in 0..400 {
client.import_verified_blocks();
}
assert_eq!(2000, client.chain_info().best_block_number);

View File

@ -107,7 +107,21 @@ struct QueueSignal {
impl QueueSignal {
#[cfg_attr(feature="dev", allow(bool_comparison))]
fn set(&self) {
fn set_sync(&self) {
// Do not signal when we are about to close
if self.deleting.load(AtomicOrdering::Relaxed) {
return;
}
if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) {
debug!("Error sending BlockVerified message: {:?}", e);
}
}
}
#[cfg_attr(feature="dev", allow(bool_comparison))]
fn set_async(&self) {
// Do not signal when we are about to close
if self.deleting.load(AtomicOrdering::Relaxed) {
return;
@ -128,8 +142,8 @@ impl QueueSignal {
struct Verification<K: Kind> {
// All locks must be captured in the order declared here.
unverified: Mutex<VecDeque<K::Unverified>>,
verified: Mutex<VecDeque<K::Verified>>,
verifying: Mutex<VecDeque<Verifying<K>>>,
verified: Mutex<VecDeque<K::Verified>>,
bad: Mutex<HashSet<H256>>,
more_to_verify: SMutex<()>,
empty: SMutex<()>,
@ -140,8 +154,8 @@ impl<K: Kind> VerificationQueue<K> {
pub fn new(config: Config, engine: Arc<Engine>, message_channel: IoChannel<ClientIoMessage>) -> Self {
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
verifying: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
bad: Mutex::new(HashSet::new()),
more_to_verify: SMutex::new(()),
empty: SMutex::new(()),
@ -226,7 +240,7 @@ impl<K: Kind> VerificationQueue<K> {
};
let hash = item.hash();
match K::verify(item, &*engine) {
let is_ready = match K::verify(item, &*engine) {
Ok(verified) => {
let mut verifying = verification.verifying.lock();
let mut idx = None;
@ -243,7 +257,9 @@ impl<K: Kind> VerificationQueue<K> {
let mut verified = verification.verified.lock();
let mut bad = verification.bad.lock();
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
true
} else {
false
}
},
Err(_) => {
@ -256,9 +272,15 @@ impl<K: Kind> VerificationQueue<K> {
if verifying.front().map_or(false, |x| x.output.is_some()) {
VerificationQueue::drain_verifying(&mut verifying, &mut verified, &mut bad);
ready.set();
true
} else {
false
}
}
};
if is_ready {
// Import the block immediately
ready.set_sync();
}
}
}
@ -366,15 +388,17 @@ impl<K: Kind> VerificationQueue<K> {
*verified = new_verified;
}
/// Mark given item as processed
pub fn mark_as_good(&self, hashes: &[H256]) {
/// Mark given item as processed.
/// Returns true if the queue becomes empty.
pub fn mark_as_good(&self, hashes: &[H256]) -> bool {
if hashes.is_empty() {
return;
return self.processing.read().is_empty();
}
let mut processing = self.processing.write();
for hash in hashes {
processing.remove(hash);
}
processing.is_empty()
}
/// Removes up to `max` verified items from the queue
@ -385,7 +409,7 @@ impl<K: Kind> VerificationQueue<K> {
self.ready_signal.reset();
if !verified.is_empty() {
self.ready_signal.set();
self.ready_signal.set_async();
}
result
}
@ -411,12 +435,9 @@ impl<K: Kind> VerificationQueue<K> {
verified_queue_size: verified_len,
max_queue_size: self.max_queue_size,
max_mem_use: self.max_mem_use,
mem_used:
unverified_bytes
+ verifying_bytes
+ verified_bytes
// TODO: https://github.com/servo/heapsize/pull/50
//+ self.processing.read().heap_size_of_children(),
mem_used: unverified_bytes
+ verifying_bytes
+ verified_bytes
}
}

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::thread::{self, JoinHandle};
use std::collections::HashMap;
use mio::*;
@ -75,12 +75,12 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
}
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct IoContext<Message> where Message: Send + Clone + 'static {
pub struct IoContext<Message> where Message: Send + Clone + Sync + 'static {
channel: IoChannel<Message>,
handler: HandlerId,
}
impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
impl<Message> IoContext<Message> where Message: Send + Clone + Sync + 'static {
/// Create a new IO access point. Takes references to all the data that can be updated within the IO handler.
pub fn new(channel: IoChannel<Message>, handler: HandlerId) -> IoContext<Message> {
IoContext {
@ -165,7 +165,7 @@ struct UserTimer {
/// Root IO handler. Manages user handlers, messages and IO timers.
pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>,
work_ready: Arc<SCondvar>,
@ -173,7 +173,11 @@ pub struct IoManager<Message> where Message: Send + Sync {
impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
/// Creates a new instance and registers it with the event loop.
pub fn start(panic_handler: Arc<PanicHandler>, event_loop: &mut EventLoop<IoManager<Message>>) -> Result<(), IoError> {
pub fn start(
panic_handler: Arc<PanicHandler>,
event_loop: &mut EventLoop<IoManager<Message>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>
) -> Result<(), IoError> {
let (worker, stealer) = chase_lev::deque();
let num_workers = 4;
let work_ready_mutex = Arc::new(SMutex::new(()));
@ -182,7 +186,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
Worker::new(
i,
stealer.clone(),
IoChannel::new(event_loop.channel()),
IoChannel::new(event_loop.channel(), Arc::downgrade(&handlers)),
work_ready.clone(),
work_ready_mutex.clone(),
panic_handler.clone(),
@ -191,7 +195,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
let mut io = IoManager {
timers: Arc::new(RwLock::new(HashMap::new())),
handlers: Slab::new(MAX_HANDLERS),
handlers: handlers,
worker_channel: worker,
workers: workers,
work_ready: work_ready,
@ -208,7 +212,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.get(handler_index) {
if let Some(handler) = self.handlers.read().get(handler_index) {
if events.is_hup() {
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
}
@ -227,7 +231,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.get(handler_index) {
if let Some(handler) = self.handlers.read().get(handler_index) {
if let Some(timer) = self.timers.read().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
@ -243,12 +247,12 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
event_loop.shutdown();
},
IoMessage::AddHandler { handler } => {
let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id));
let handler_id = self.handlers.write().insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel(), Arc::downgrade(&self.handlers)), handler_id));
},
IoMessage::RemoveHandler { handler_id } => {
// TODO: flush event loop
self.handlers.remove(handler_id);
self.handlers.write().remove(handler_id);
// unregister timers
let mut timers = self.timers.write();
let to_remove: Vec<_> = timers.keys().cloned().filter(|timer_id| timer_id / TOKENS_PER_HANDLER == handler_id).collect();
@ -269,12 +273,12 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
}
},
IoMessage::RegisterStream { handler_id, token } => {
if let Some(handler) = self.handlers.get(handler_id) {
if let Some(handler) = self.handlers.read().get(handler_id) {
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
}
},
IoMessage::DeregisterStream { handler_id, token } => {
if let Some(handler) = self.handlers.get(handler_id) {
if let Some(handler) = self.handlers.read().get(handler_id) {
handler.deregister_stream(token, event_loop);
// unregister a timer associated with the token (if any)
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
@ -284,14 +288,14 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
}
},
IoMessage::UpdateStreamRegistration { handler_id, token } => {
if let Some(handler) = self.handlers.get(handler_id) {
if let Some(handler) = self.handlers.read().get(handler_id) {
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
}
},
IoMessage::UserMessage(data) => {
//TODO: better way to iterate the slab
for id in 0 .. MAX_HANDLERS {
if let Some(h) = self.handlers.get(id) {
if let Some(h) = self.handlers.read().get(id) {
let handler = h.clone();
self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id });
}
@ -305,19 +309,21 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
/// Allows sending messages into the event loop. All the IO handlers will get the message
/// in the `message` callback.
pub struct IoChannel<Message> where Message: Send + Clone{
channel: Option<Sender<IoMessage<Message>>>
channel: Option<Sender<IoMessage<Message>>>,
handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
}
impl<Message> Clone for IoChannel<Message> where Message: Send + Clone {
impl<Message> Clone for IoChannel<Message> where Message: Send + Clone + Sync + 'static {
fn clone(&self) -> IoChannel<Message> {
IoChannel {
channel: self.channel.clone()
channel: self.channel.clone(),
handlers: self.handlers.clone(),
}
}
}
impl<Message> IoChannel<Message> where Message: Send + Clone {
/// Send a msessage through the channel
impl<Message> IoChannel<Message> where Message: Send + Clone + Sync + 'static {
/// Send a message through the channel
pub fn send(&self, message: Message) -> Result<(), IoError> {
if let Some(ref channel) = self.channel {
try!(channel.send(IoMessage::UserMessage(message)));
@ -325,6 +331,19 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
Ok(())
}
/// Send a message through the channel and handle it synchronously
pub fn send_sync(&self, message: Message) -> Result<(), IoError> {
if let Some(handlers) = self.handlers.upgrade() {
for id in 0 .. MAX_HANDLERS {
if let Some(h) = handlers.read().get(id) {
let handler = h.clone();
handler.message(&IoContext::new(self.clone(), id), &message);
}
}
}
Ok(())
}
/// Send low level io message
pub fn send_io(&self, message: IoMessage<Message>) -> Result<(), IoError> {
if let Some(ref channel) = self.channel {
@ -334,11 +353,17 @@ impl<Message> IoChannel<Message> where Message: Send + Clone {
}
/// Create a new channel to connected to event loop.
pub fn disconnected() -> IoChannel<Message> {
IoChannel { channel: None }
IoChannel {
channel: None,
handlers: Weak::default(),
}
}
fn new(channel: Sender<IoMessage<Message>>) -> IoChannel<Message> {
IoChannel { channel: Some(channel) }
fn new(channel: Sender<IoMessage<Message>>, handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>) -> IoChannel<Message> {
IoChannel {
channel: Some(channel),
handlers: handlers,
}
}
}
@ -348,6 +373,7 @@ pub struct IoService<Message> where Message: Send + Sync + Clone + 'static {
panic_handler: Arc<PanicHandler>,
thread: Option<JoinHandle<()>>,
host_channel: Sender<IoMessage<Message>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>, HandlerId>>>,
}
impl<Message> MayPanic for IoService<Message> where Message: Send + Sync + Clone + 'static {
@ -365,16 +391,19 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
let mut event_loop = EventLoop::configured(config).expect("Error creating event loop");
let channel = event_loop.channel();
let panic = panic_handler.clone();
let handlers = Arc::new(RwLock::new(Slab::new(MAX_HANDLERS)));
let h = handlers.clone();
let thread = thread::spawn(move || {
let p = panic.clone();
panic.catch_panic(move || {
IoManager::<Message>::start(p, &mut event_loop).unwrap();
IoManager::<Message>::start(p, &mut event_loop, h).unwrap();
}).unwrap()
});
Ok(IoService {
panic_handler: panic_handler,
thread: Some(thread),
host_channel: channel
host_channel: channel,
handlers: handlers,
})
}
@ -394,7 +423,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
/// Create a new message channel
pub fn channel(&self) -> IoChannel<Message> {
IoChannel { channel: Some(self.host_channel.clone()) }
IoChannel::new(self.host_channel.clone(), Arc::downgrade(&self.handlers))
}
}

View File

@ -104,7 +104,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
}
/// Add a packet to send queue.
pub fn send<Message>(&mut self, io: &IoContext<Message>, data: Bytes) where Message: Send + Clone {
pub fn send<Message>(&mut self, io: &IoContext<Message>, data: Bytes) where Message: Send + Clone + Sync + 'static {
if !data.is_empty() {
trace!(target:"network", "{}: Sending {} bytes", self.token, data.len());
self.send_queue.push_back(Cursor::new(data));
@ -121,7 +121,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
}
/// Writable IO handler. Called when the socket is ready to send.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone {
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<WriteStatus, NetworkError> where Message: Send + Clone + Sync + 'static {
if self.send_queue.is_empty() {
return Ok(WriteStatus::Complete)
}
@ -346,7 +346,7 @@ impl EncryptedConnection {
}
/// Send a packet
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone {
pub fn send_packet<Message>(&mut self, io: &IoContext<Message>, payload: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
let mut header = RlpStream::new();
let len = payload.len() as usize;
header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1);
@ -441,7 +441,7 @@ impl EncryptedConnection {
}
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, NetworkError> where Message: Send + Clone{
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Result<Option<Packet>, NetworkError> where Message: Send + Clone + Sync + 'static {
try!(io.clear_timer(self.connection.token));
if let EncryptedConnectionState::Header = self.read_state {
if let Some(data) = try!(self.connection.readable()) {
@ -464,7 +464,7 @@ impl EncryptedConnection {
}
/// Writable IO handler. Processes send queeue.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone {
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
try!(self.connection.writable(io));
Ok(())
}

View File

@ -106,7 +106,7 @@ impl Handshake {
}
/// Start a handhsake
pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone{
pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), NetworkError> where Message: Send + Clone+ Sync + 'static {
self.originated = originated;
io.register_timer(self.connection.token, HANDSHAKE_TIMEOUT).ok();
if originated {
@ -125,7 +125,7 @@ impl Handshake {
}
/// Readable IO handler. Drives the state change.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone {
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
if !self.expired() {
while let Some(data) = try!(self.connection.readable()) {
match self.state {
@ -154,7 +154,7 @@ impl Handshake {
}
/// Writabe IO handler.
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone {
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
if !self.expired() {
try!(self.connection.writable(io));
}
@ -172,7 +172,7 @@ impl Handshake {
}
/// Parse, validate and confirm auth message
fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone {
fn read_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Received handshake auth from {:?}", self.connection.remote_addr_str());
if data.len() != V4_AUTH_PACKET_SIZE {
debug!(target: "network", "Wrong auth packet size");
@ -203,7 +203,7 @@ impl Handshake {
Ok(())
}
fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone {
fn read_auth_eip8<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, data: &[u8]) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
self.auth_cipher.extend_from_slice(data);
let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..]));
@ -259,7 +259,7 @@ impl Handshake {
}
/// Sends auth message
fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone {
fn write_auth<Message>(&mut self, io: &IoContext<Message>, secret: &Secret, public: &Public) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending handshake auth to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
let len = data.len();
@ -286,7 +286,7 @@ impl Handshake {
}
/// Sends ack message
fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone {
fn write_ack<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending handshake ack to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
let len = data.len();
@ -305,7 +305,7 @@ impl Handshake {
}
/// Sends EIP8 ack message
fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone {
fn write_ack_eip8<Message>(&mut self, io: &IoContext<Message>) -> Result<(), NetworkError> where Message: Send + Clone + Sync + 'static {
trace!(target: "network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str());
let mut rlp = RlpStream::new_list(3);
rlp.append(self.ecdhe.public());

View File

@ -144,7 +144,7 @@ impl Session {
/// and leaves the handhsake in limbo to be deregistered from the event loop.
pub fn new<Message>(io: &IoContext<Message>, socket: TcpStream, token: StreamToken, id: Option<&NodeId>,
nonce: &H256, stats: Arc<NetworkStats>, host: &HostInfo) -> Result<Session, NetworkError>
where Message: Send + Clone {
where Message: Send + Clone + Sync + 'static {
let originated = id.is_some();
let mut handshake = Handshake::new(token, id, socket, nonce, stats).expect("Can't create handshake");
let local_addr = handshake.connection.local_addr_str();

View File

@ -208,8 +208,13 @@ pub struct Database {
config: DatabaseConfig,
write_opts: WriteOptions,
read_opts: ReadOptions,
overlay: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
path: String,
// Dirty values added with `write_buffered`. Cleaned on `flush`.
overlay: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
// Values currently being flushed. Cleared when `flush` completes.
flushing: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
// Prevents concurrent flushes.
flushing_lock: Mutex<()>,
}
impl Database {
@ -310,6 +315,8 @@ impl Database {
config: config.clone(),
write_opts: write_opts,
overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
flushing_lock: Mutex::new(()),
path: path.to_owned(),
read_opts: read_opts,
})
@ -351,39 +358,44 @@ impl Database {
pub fn flush(&self) -> Result<(), String> {
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
let _lock = self.flushing_lock.lock();
let batch = WriteBatch::new();
let mut overlay = self.overlay.write();
for (c, column) in overlay.iter_mut().enumerate() {
let column_data = mem::replace(column, HashMap::new());
for (key, state) in column_data.into_iter() {
match state {
KeyState::Delete => {
if c > 0 {
try!(batch.delete_cf(cfs[c - 1], &key));
} else {
try!(batch.delete(&key));
}
},
KeyState::Insert(value) => {
if c > 0 {
try!(batch.put_cf(cfs[c - 1], &key, &value));
} else {
try!(batch.put(&key, &value));
}
},
KeyState::InsertCompressed(value) => {
let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
if c > 0 {
try!(batch.put_cf(cfs[c - 1], &key, &compressed));
} else {
try!(batch.put(&key, &value));
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
{
for (c, column) in self.flushing.read().iter().enumerate() {
for (ref key, ref state) in column.iter() {
match **state {
KeyState::Delete => {
if c > 0 {
try!(batch.delete_cf(cfs[c - 1], &key));
} else {
try!(batch.delete(&key));
}
},
KeyState::Insert(ref value) => {
if c > 0 {
try!(batch.put_cf(cfs[c - 1], &key, value));
} else {
try!(batch.put(&key, &value));
}
},
KeyState::InsertCompressed(ref value) => {
let compressed = UntrustedRlp::new(&value).compress(RlpType::Blocks);
if c > 0 {
try!(batch.put_cf(cfs[c - 1], &key, &compressed));
} else {
try!(batch.put(&key, &value));
}
}
}
}
}
}
db.write_opt(batch, &self.write_opts)
try!(db.write_opt(batch, &self.write_opts));
for column in self.flushing.write().iter_mut() {
column.clear();
}
Ok(())
},
None => Err("Database is closed".to_owned())
}
@ -425,9 +437,16 @@ impl Database {
Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
Some(&KeyState::Delete) => Ok(None),
None => {
col.map_or_else(
|| db.get_opt(key, &self.read_opts).map(|r| r.map(|v| v.to_vec())),
|c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| v.to_vec())))
let flushing = &self.flushing.read()[Self::to_overlay_column(col)];
match flushing.get(key) {
Some(&KeyState::Insert(ref value)) | Some(&KeyState::InsertCompressed(ref value)) => Ok(Some(value.clone())),
Some(&KeyState::Delete) => Ok(None),
None => {
col.map_or_else(
|| db.get_opt(key, &self.read_opts).map(|r| r.map(|v| v.to_vec())),
|c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| v.to_vec())))
},
}
},
}
},
@ -468,6 +487,7 @@ impl Database {
fn close(&self) {
*self.db.write() = None;
self.overlay.write().clear();
self.flushing.write().clear();
}
/// Restore the database from a copy at given path.
@ -507,6 +527,7 @@ impl Database {
let db = try!(Self::open(&self.config, &self.path));
*self.db.write() = mem::replace(&mut *db.db.write(), None);
*self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new());
*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new());
Ok(())
}
}