diff --git a/rpc/src/v1/helpers/subscription_manager.rs b/rpc/src/v1/helpers/subscription_manager.rs index 1cf067901..c5e4216f4 100644 --- a/rpc/src/v1/helpers/subscription_manager.rs +++ b/rpc/src/v1/helpers/subscription_manager.rs @@ -17,6 +17,7 @@ //! Generic poll manager for Pub-Sub. use std::sync::Arc; +use std::sync::atomic::{self, AtomicBool}; use util::Mutex; use jsonrpc_core::futures::future::{self, Either}; @@ -34,7 +35,8 @@ struct Subscription { method: String, params: core::Params, sink: mpsc::Sender>, - last_result: Arc>>, + /// a flag if subscription is still active and last returned value + last_result: Arc<(AtomicBool, Mutex>)>, } /// A struct managing all subscriptions. @@ -68,10 +70,10 @@ impl> GenericPollManager { { let (sink, stream) = mpsc::channel(1); let subscription = Subscription { - metadata: metadata, - method: method, - params: params, - sink: sink, + metadata, + method, + params, + sink, last_result: Default::default(), }; let id = self.subscribers.insert(subscription); @@ -80,7 +82,9 @@ impl> GenericPollManager { pub fn unsubscribe(&mut self, id: &SubscriptionId) -> bool { debug!(target: "pubsub", "Removing subscription: {:?}", id); - self.subscribers.remove(id).is_some() + self.subscribers.remove(id).map(|subscription| { + subscription.last_result.0.store(true, atomic::Ordering::SeqCst); + }).is_some() } pub fn tick(&self) -> BoxFuture<(), ()> { @@ -100,7 +104,12 @@ impl> GenericPollManager { let sender = subscription.sink.clone(); let result = result.and_then(move |response| { - let mut last_result = last_result.lock(); + // quick check if the subscription is still valid + if last_result.0.load(atomic::Ordering::SeqCst) { + return Either::B(future::ok(())) + } + + let mut last_result = last_result.1.lock(); if *last_result != response && response.is_some() { let output = response.expect("Existence proved by the condition."); debug!(target: "pubsub", "Got new response, sending: {:?}", output);