From eca8fb74ae515483ec8a8a699249451a0670e8ef Mon Sep 17 00:00:00 2001 From: rakita Date: Wed, 10 Mar 2021 12:36:23 +0100 Subject: [PATCH] Strict memory order (#306) * Make MemoryOrdering more strict * fmt * Strict mem order for priority_tasks_gate --- bin/oe/stratum.rs | 2 +- crates/concensus/miner/price-info/src/lib.rs | 8 ++--- crates/concensus/miner/src/pool/queue.rs | 2 +- crates/ethcore/service/src/stop_guard.rs | 2 +- crates/ethcore/src/client/client.rs | 18 +++++------ crates/ethcore/src/client/test_client.rs | 8 ++--- crates/ethcore/src/snapshot/mod.rs | 18 +++++------ crates/ethcore/src/verification/queue/mod.rs | 32 +++++++------------ crates/ethcore/sync/src/chain/mod.rs | 4 +-- crates/net/network-devp2p/src/host.rs | 10 +++--- crates/net/network-devp2p/tests/tests.rs | 8 ++--- crates/rpc/src/v1/informant.rs | 2 +- crates/runtime/io/src/worker.rs | 6 ++-- .../util/cli-signer/rpc-client/src/client.rs | 2 +- 14 files changed, 56 insertions(+), 66 deletions(-) diff --git a/bin/oe/stratum.rs b/bin/oe/stratum.rs index 14e54aa90..d3bd6cc3a 100644 --- a/bin/oe/stratum.rs +++ b/bin/oe/stratum.rs @@ -39,7 +39,7 @@ struct StratumControlService { impl ControlService for StratumControlService { fn shutdown(&self) -> bool { trace!(target: "hypervisor", "Received shutdown from control service"); - self.stop.store(true, ::std::sync::atomic::Ordering::Relaxed); + self.stop.store(true, ::std::sync::atomic::Ordering::SeqCst); true } } diff --git a/crates/concensus/miner/price-info/src/lib.rs b/crates/concensus/miner/price-info/src/lib.rs index 51ad6a0bc..7cb422724 100644 --- a/crates/concensus/miner/price-info/src/lib.rs +++ b/crates/concensus/miner/price-info/src/lib.rs @@ -207,11 +207,11 @@ mod test { // when let bb = b.clone(); price_info.get(move |_| { - bb.store(true, Ordering::Relaxed); + bb.store(true, Ordering::SeqCst); }); // then - assert_eq!(b.load(Ordering::Relaxed), false); + assert_eq!(b.load(Ordering::SeqCst), false); } #[test] @@ -225,10 +225,10 @@ mod test { // when let bb = b.clone(); price_info.get(move |_| { - bb.store(true, Ordering::Relaxed); + bb.store(true, Ordering::SeqCst); }); // then - assert_eq!(b.load(Ordering::Relaxed), false); + assert_eq!(b.load(Ordering::SeqCst), false); } } diff --git a/crates/concensus/miner/src/pool/queue.rs b/crates/concensus/miner/src/pool/queue.rs index d9a72e3da..e2e0d1562 100644 --- a/crates/concensus/miner/src/pool/queue.rs +++ b/crates/concensus/miner/src/pool/queue.rs @@ -480,7 +480,7 @@ impl TransactionQueue { // We want to clear stale transactions from the queue as well. // (Transactions that are occuping the queue for a long time without being included) let stale_id = { - let current_id = self.insertion_id.load(atomic::Ordering::Relaxed); + let current_id = self.insertion_id.load(atomic::Ordering::SeqCst); // wait at least for half of the queue to be replaced let gap = self.pool.read().options().max_count / 2; // but never less than 100 transactions diff --git a/crates/ethcore/service/src/stop_guard.rs b/crates/ethcore/service/src/stop_guard.rs index 34438fd59..ce662ea5d 100644 --- a/crates/ethcore/service/src/stop_guard.rs +++ b/crates/ethcore/service/src/stop_guard.rs @@ -34,6 +34,6 @@ impl StopGuard { impl Drop for StopGuard { fn drop(&mut self) { - self.flag.store(true, Ordering::Relaxed) + self.flag.store(true, Ordering::SeqCst) } } diff --git a/crates/ethcore/src/client/client.rs b/crates/ethcore/src/client/client.rs index a6c3c7a97..5f183ba04 100644 --- a/crates/ethcore/src/client/client.rs +++ b/crates/ethcore/src/client/client.rs @@ -284,7 +284,7 @@ impl Importer { // t_nb 6.0 This is triggered by a message coming from a block queue when the block is ready for insertion pub fn import_verified_blocks(&self, client: &Client) -> usize { // Shortcut out if we know we're incapable of syncing the chain. - if !client.enabled.load(AtomicOrdering::Relaxed) { + if !client.enabled.load(AtomicOrdering::SeqCst) { return 0; } @@ -1445,18 +1445,18 @@ impl Client { } fn wake_up(&self) { - if !self.liveness.load(AtomicOrdering::Relaxed) { - self.liveness.store(true, AtomicOrdering::Relaxed); + if !self.liveness.load(AtomicOrdering::SeqCst) { + self.liveness.store(true, AtomicOrdering::SeqCst); self.notify(|n| n.start()); info!(target: "mode", "wake_up: Waking."); } } fn sleep(&self, force: bool) { - if self.liveness.load(AtomicOrdering::Relaxed) { + if self.liveness.load(AtomicOrdering::SeqCst) { // only sleep if the import queue is mostly empty. if force || (self.queue_info().total_queue_size() <= MAX_QUEUE_SIZE_TO_SLEEP_ON) { - self.liveness.store(false, AtomicOrdering::Relaxed); + self.liveness.store(false, AtomicOrdering::SeqCst); self.notify(|n| n.stop()); info!(target: "mode", "sleep: Sleeping."); } else { @@ -2058,13 +2058,13 @@ impl BlockChainClient for Client { fn disable(&self) { self.set_mode(Mode::Off); - self.enabled.store(false, AtomicOrdering::Relaxed); + self.enabled.store(false, AtomicOrdering::SeqCst); self.clear_queue(); } fn set_mode(&self, new_mode: Mode) { trace!(target: "mode", "Client::set_mode({:?})", new_mode); - if !self.enabled.load(AtomicOrdering::Relaxed) { + if !self.enabled.load(AtomicOrdering::SeqCst) { return; } { @@ -2095,7 +2095,7 @@ impl BlockChainClient for Client { fn set_spec_name(&self, new_spec_name: String) -> Result<(), ()> { trace!(target: "mode", "Client::set_spec_name({:?})", new_spec_name); - if !self.enabled.load(AtomicOrdering::Relaxed) { + if !self.enabled.load(AtomicOrdering::SeqCst) { return Err(()); } if let Some(ref h) = *self.exit_handler.lock() { @@ -3241,7 +3241,7 @@ impl IoChannelQueue { where F: Fn(&Client) + Send + Sync + 'static, { - let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); + let queue_size = self.currently_queued.load(AtomicOrdering::SeqCst); if queue_size >= self.limit { let err_limit = usize::try_from(self.limit).unwrap_or(usize::max_value()); bail!("The queue is full ({})", err_limit); diff --git a/crates/ethcore/src/client/test_client.rs b/crates/ethcore/src/client/test_client.rs index 28195da23..c0f62aff6 100644 --- a/crates/ethcore/src/client/test_client.rs +++ b/crates/ethcore/src/client/test_client.rs @@ -237,7 +237,7 @@ impl TestBlockChainClient { /// Set block queue size for testing pub fn set_queue_size(&self, size: usize) { - self.queue_size.store(size, AtomicOrder::Relaxed); + self.queue_size.store(size, AtomicOrder::SeqCst); } /// Set timestamp assigned to latest sealed block @@ -402,7 +402,7 @@ impl TestBlockChainClient { /// Returns true if the client has been disabled. pub fn is_disabled(&self) -> bool { - self.disabled.load(AtomicOrder::Relaxed) + self.disabled.load(AtomicOrder::SeqCst) } } @@ -959,7 +959,7 @@ impl BlockChainClient for TestBlockChainClient { fn queue_info(&self) -> QueueInfo { QueueInfo { - verified_queue_size: self.queue_size.load(AtomicOrder::Relaxed), + verified_queue_size: self.queue_size.load(AtomicOrder::SeqCst), unverified_queue_size: 0, verifying_queue_size: 0, max_queue_size: 0, @@ -1019,7 +1019,7 @@ impl BlockChainClient for TestBlockChainClient { } fn disable(&self) { - self.disabled.store(true, AtomicOrder::Relaxed); + self.disabled.store(true, AtomicOrder::SeqCst); } fn pruning_info(&self) -> PruningInfo { diff --git a/crates/ethcore/src/snapshot/mod.rs b/crates/ethcore/src/snapshot/mod.rs index ca9b7f7b0..444b49429 100644 --- a/crates/ethcore/src/snapshot/mod.rs +++ b/crates/ethcore/src/snapshot/mod.rs @@ -127,34 +127,34 @@ pub struct Progress { impl Progress { /// Reset the progress. pub fn reset(&self) { - self.accounts.store(0, Ordering::Release); - self.blocks.store(0, Ordering::Release); - self.size.store(0, Ordering::Release); - self.abort.store(false, Ordering::Release); + self.accounts.store(0, Ordering::SeqCst); + self.blocks.store(0, Ordering::SeqCst); + self.size.store(0, Ordering::SeqCst); + self.abort.store(false, Ordering::SeqCst); // atomic fence here to ensure the others are written first? // logs might very rarely get polluted if not. - self.done.store(false, Ordering::Release); + self.done.store(false, Ordering::SeqCst); } /// Get the number of accounts snapshotted thus far. pub fn accounts(&self) -> usize { - self.accounts.load(Ordering::Acquire) + self.accounts.load(Ordering::SeqCst) } /// Get the number of blocks snapshotted thus far. pub fn blocks(&self) -> usize { - self.blocks.load(Ordering::Acquire) + self.blocks.load(Ordering::SeqCst) } /// Get the written size of the snapshot in bytes. pub fn size(&self) -> u64 { - self.size.load(Ordering::Acquire) + self.size.load(Ordering::SeqCst) } /// Whether the snapshot is complete. pub fn done(&self) -> bool { - self.done.load(Ordering::Acquire) + self.done.load(Ordering::SeqCst) } } /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. diff --git a/crates/ethcore/src/verification/queue/mod.rs b/crates/ethcore/src/verification/queue/mod.rs index b6f2b8bba..2928ccc2e 100644 --- a/crates/ethcore/src/verification/queue/mod.rs +++ b/crates/ethcore/src/verification/queue/mod.rs @@ -175,18 +175,13 @@ struct QueueSignal { impl QueueSignal { fn set_sync(&self) { // Do not signal when we are about to close - if self.deleting.load(AtomicOrdering::Relaxed) { + if self.deleting.load(AtomicOrdering::SeqCst) { return; } if self .signalled - .compare_exchange( - false, - true, - AtomicOrdering::Relaxed, - AtomicOrdering::Relaxed, - ) + .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst) .is_ok() { let channel = self.message_channel.lock().clone(); @@ -198,18 +193,13 @@ impl QueueSignal { fn set_async(&self) { // Do not signal when we are about to close - if self.deleting.load(AtomicOrdering::Relaxed) { + if self.deleting.load(AtomicOrdering::SeqCst) { return; } if self .signalled - .compare_exchange( - false, - true, - AtomicOrdering::Relaxed, - AtomicOrdering::Relaxed, - ) + .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst) .is_ok() { let channel = self.message_channel.lock().clone(); @@ -220,7 +210,7 @@ impl QueueSignal { } fn reset(&self) { - self.signalled.store(false, AtomicOrdering::Relaxed); + self.signalled.store(false, AtomicOrdering::SeqCst); } } @@ -499,9 +489,9 @@ impl VerificationQueue { verified.clear(); let sizes = &self.verification.sizes; - sizes.unverified.store(0, AtomicOrdering::Release); - sizes.verifying.store(0, AtomicOrdering::Release); - sizes.verified.store(0, AtomicOrdering::Release); + sizes.unverified.store(0, AtomicOrdering::SeqCst); + sizes.verifying.store(0, AtomicOrdering::SeqCst); + sizes.verified.store(0, AtomicOrdering::SeqCst); *self.total_difficulty.write() = 0.into(); self.processing.write().clear(); @@ -728,7 +718,7 @@ impl VerificationQueue { .verification .sizes .unverified - .load(AtomicOrdering::Acquire); + .load(AtomicOrdering::SeqCst); (len, size + len * size_of::()) }; @@ -738,7 +728,7 @@ impl VerificationQueue { .verification .sizes .verifying - .load(AtomicOrdering::Acquire); + .load(AtomicOrdering::SeqCst); (len, size + len * size_of::>()) }; let (verified_len, verified_bytes) = { @@ -747,7 +737,7 @@ impl VerificationQueue { .verification .sizes .verified - .load(AtomicOrdering::Acquire); + .load(AtomicOrdering::SeqCst); (len, size + len * size_of::()) }; diff --git a/crates/ethcore/sync/src/chain/mod.rs b/crates/ethcore/sync/src/chain/mod.rs index d3854dd20..bf6103a2b 100644 --- a/crates/ethcore/sync/src/chain/mod.rs +++ b/crates/ethcore/sync/src/chain/mod.rs @@ -489,7 +489,7 @@ impl ChainSyncApi { if self .priority_tasks_gate - .compare_exchange(false, true, Ordering::Acquire, Ordering::Release) + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { return; @@ -552,7 +552,7 @@ impl ChainSyncApi { // Process as many items as we can until the deadline is reached. loop { if work().is_none() { - self.priority_tasks_gate.store(false, Ordering::Release); + self.priority_tasks_gate.store(false, Ordering::SeqCst); return; } } diff --git a/crates/net/network-devp2p/src/host.rs b/crates/net/network-devp2p/src/host.rs index cb36da9a9..9ac7221d3 100644 --- a/crates/net/network-devp2p/src/host.rs +++ b/crates/net/network-devp2p/src/host.rs @@ -464,7 +464,7 @@ impl Host { } pub fn stop(&self, io: &IoContext) { - self.stopping.store(true, AtomicOrdering::Release); + self.stopping.store(true, AtomicOrdering::SeqCst); let mut to_kill = Vec::new(); for e in self.sessions.read().iter() { let mut s = e.lock(); @@ -1168,7 +1168,7 @@ impl IoHandler for Host { } fn stream_readable(&self, io: &IoContext, stream: StreamToken) { - if self.stopping.load(AtomicOrdering::Acquire) { + if self.stopping.load(AtomicOrdering::SeqCst) { return; } match stream { @@ -1180,7 +1180,7 @@ impl IoHandler for Host { } fn stream_writable(&self, io: &IoContext, stream: StreamToken) { - if self.stopping.load(AtomicOrdering::Acquire) { + if self.stopping.load(AtomicOrdering::SeqCst) { return; } match stream { @@ -1191,7 +1191,7 @@ impl IoHandler for Host { } fn timeout(&self, io: &IoContext, token: TimerToken) { - if self.stopping.load(AtomicOrdering::Acquire) { + if self.stopping.load(AtomicOrdering::SeqCst) { return; } match token { @@ -1253,7 +1253,7 @@ impl IoHandler for Host { } fn message(&self, io: &IoContext, message: &NetworkIoMessage) { - if self.stopping.load(AtomicOrdering::Acquire) { + if self.stopping.load(AtomicOrdering::SeqCst) { return; } match *message { diff --git a/crates/net/network-devp2p/tests/tests.rs b/crates/net/network-devp2p/tests/tests.rs index ab97c2c15..eec2aad78 100644 --- a/crates/net/network-devp2p/tests/tests.rs +++ b/crates/net/network-devp2p/tests/tests.rs @@ -73,11 +73,11 @@ impl TestProtocol { } pub fn got_timeout(&self) -> bool { - self.got_timeout.load(AtomicOrdering::Relaxed) + self.got_timeout.load(AtomicOrdering::SeqCst) } pub fn got_disconnect(&self) -> bool { - self.got_disconnect.load(AtomicOrdering::Relaxed) + self.got_disconnect.load(AtomicOrdering::SeqCst) } } @@ -101,13 +101,13 @@ impl NetworkProtocolHandler for TestProtocol { } fn disconnected(&self, _io: &dyn NetworkContext, _peer: &PeerId) { - self.got_disconnect.store(true, AtomicOrdering::Relaxed); + self.got_disconnect.store(true, AtomicOrdering::SeqCst); } /// Timer function called after a timeout created with `NetworkContext::timeout`. fn timeout(&self, _io: &dyn NetworkContext, timer: TimerToken) { assert_eq!(timer, 0); - self.got_timeout.store(true, AtomicOrdering::Relaxed); + self.got_timeout.store(true, AtomicOrdering::SeqCst); } } diff --git a/crates/rpc/src/v1/informant.rs b/crates/rpc/src/v1/informant.rs index 0b146969d..7e64dc756 100644 --- a/crates/rpc/src/v1/informant.rs +++ b/crates/rpc/src/v1/informant.rs @@ -171,7 +171,7 @@ impl RpcStats { /// Returns number of open sessions pub fn sessions(&self) -> usize { - self.active_sessions.load(atomic::Ordering::Relaxed) + self.active_sessions.load(atomic::Ordering::SeqCst) } /// Returns requests rate diff --git a/crates/runtime/io/src/worker.rs b/crates/runtime/io/src/worker.rs index a3b4b7721..7d875121c 100644 --- a/crates/runtime/io/src/worker.rs +++ b/crates/runtime/io/src/worker.rs @@ -86,13 +86,13 @@ impl Worker { future::loop_fn(ini, |(stealer, channel, wait, wait_mutex, deleting)| { { let mut lock = wait_mutex.lock(); - if deleting.load(AtomicOrdering::Acquire) { + if deleting.load(AtomicOrdering::SeqCst) { return Ok(Loop::Break(())); } wait.wait(&mut lock); } - while !deleting.load(AtomicOrdering::Acquire) { + while !deleting.load(AtomicOrdering::SeqCst) { match stealer.steal() { deque::Steal::Data(work) => { Worker::do_work(work, channel.clone()) @@ -147,7 +147,7 @@ impl Drop for Worker { fn drop(&mut self) { trace!(target: "shutdown", "[IoWorker] Closing..."); let _ = self.wait_mutex.lock(); - self.deleting.store(true, AtomicOrdering::Release); + self.deleting.store(true, AtomicOrdering::SeqCst); self.wait.notify_all(); if let Some(thread) = self.thread.take() { thread.join().ok(); diff --git a/crates/util/cli-signer/rpc-client/src/client.rs b/crates/util/cli-signer/rpc-client/src/client.rs index 30c227bf9..d27862036 100644 --- a/crates/util/cli-signer/rpc-client/src/client.rs +++ b/crates/util/cli-signer/rpc-client/src/client.rs @@ -259,7 +259,7 @@ impl Rpc { { let (c, p) = oneshot::>(); - let id = self.counter.fetch_add(1, Ordering::Relaxed); + let id = self.counter.fetch_add(1, Ordering::SeqCst); self.pending.insert(id, c); let request = MethodCall {