diff --git a/rpc/src/v1/impls/pubsub.rs b/rpc/src/v1/impls/pubsub.rs index 98ac5707e..193fec72b 100644 --- a/rpc/src/v1/impls/pubsub.rs +++ b/rpc/src/v1/impls/pubsub.rs @@ -21,7 +21,7 @@ use std::time::Duration; use parking_lot::RwLock; use jsonrpc_core::{self as core, Result, MetaIoHandler}; -use jsonrpc_core::futures::{Future, Stream, Sink}; +use jsonrpc_core::futures::{future, Future, Stream, Sink}; use jsonrpc_macros::Trailing; use jsonrpc_macros::pubsub::Subscriber; use jsonrpc_pubsub::SubscriptionId; @@ -42,7 +42,7 @@ impl> PubSubClient { /// Creates new `PubSubClient`. pub fn new(rpc: MetaIoHandler, executor: Executor) -> Self { let poll_manager = Arc::new(RwLock::new(GenericPollManager::new(rpc))); - let pm2 = poll_manager.clone(); + let pm2 = Arc::downgrade(&poll_manager); let timer = tokio_timer::wheel() .tick_duration(Duration::from_millis(500)) @@ -52,7 +52,13 @@ impl> PubSubClient { let interval = timer.interval(Duration::from_millis(1000)); executor.spawn(interval .map_err(|e| warn!("Polling timer error: {:?}", e)) - .for_each(move |_| pm2.read().tick()) + .for_each(move |_| { + if let Some(pm2) = pm2.upgrade() { + pm2.read().tick() + } else { + Box::new(future::err(())) + } + }) ); PubSubClient {