Don't send to old subscriptions. (#5960)

This commit is contained in:
Tomasz Drwięga 2017-07-10 13:23:19 +02:00 committed by Arkadiy Paronyan
parent 67c1f71b6e
commit ed5efebec1

View File

@ -17,6 +17,7 @@
//! Generic poll manager for Pub-Sub. //! Generic poll manager for Pub-Sub.
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};
use util::Mutex; use util::Mutex;
use jsonrpc_core::futures::future::{self, Either}; use jsonrpc_core::futures::future::{self, Either};
@ -34,7 +35,8 @@ struct Subscription {
method: String, method: String,
params: core::Params, params: core::Params,
sink: mpsc::Sender<Result<core::Value, core::Error>>, sink: mpsc::Sender<Result<core::Value, core::Error>>,
last_result: Arc<Mutex<Option<core::Output>>>, /// a flag if subscription is still active and last returned value
last_result: Arc<(AtomicBool, Mutex<Option<core::Output>>)>,
} }
/// A struct managing all subscriptions. /// A struct managing all subscriptions.
@ -68,10 +70,10 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
{ {
let (sink, stream) = mpsc::channel(1); let (sink, stream) = mpsc::channel(1);
let subscription = Subscription { let subscription = Subscription {
metadata: metadata, metadata,
method: method, method,
params: params, params,
sink: sink, sink,
last_result: Default::default(), last_result: Default::default(),
}; };
let id = self.subscribers.insert(subscription); let id = self.subscribers.insert(subscription);
@ -80,7 +82,9 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
pub fn unsubscribe(&mut self, id: &SubscriptionId) -> bool { pub fn unsubscribe(&mut self, id: &SubscriptionId) -> bool {
debug!(target: "pubsub", "Removing subscription: {:?}", id); debug!(target: "pubsub", "Removing subscription: {:?}", id);
self.subscribers.remove(id).is_some() self.subscribers.remove(id).map(|subscription| {
subscription.last_result.0.store(true, atomic::Ordering::SeqCst);
}).is_some()
} }
pub fn tick(&self) -> BoxFuture<(), ()> { pub fn tick(&self) -> BoxFuture<(), ()> {
@ -100,7 +104,12 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
let sender = subscription.sink.clone(); let sender = subscription.sink.clone();
let result = result.and_then(move |response| { let result = result.and_then(move |response| {
let mut last_result = last_result.lock(); // quick check if the subscription is still valid
if last_result.0.load(atomic::Ordering::SeqCst) {
return Either::B(future::ok(()))
}
let mut last_result = last_result.1.lock();
if *last_result != response && response.is_some() { if *last_result != response && response.is_some() {
let output = response.expect("Existence proved by the condition."); let output = response.expect("Existence proved by the condition.");
debug!(target: "pubsub", "Got new response, sending: {:?}", output); debug!(target: "pubsub", "Got new response, sending: {:?}", output);