Generic PubSub implementation (#5456)

* Generic PubSub

* Adding more tests.

* Fix submodules.

* Remove PartialEq

* Actually remove the implementation.

* Update mod.rs

* Update mod.rs
This commit is contained in:
Tomasz Drwięga
2017-05-06 13:24:18 +02:00
committed by Gav Wood
parent 91d6f14e3c
commit 1617264b69
16 changed files with 524 additions and 27 deletions

View File

@@ -26,6 +26,7 @@ extern crate semver;
extern crate serde;
extern crate serde_json;
extern crate time;
extern crate tokio_timer;
extern crate transient_hashmap;
extern crate cid;
extern crate multihash;
@@ -34,8 +35,9 @@ extern crate rand;
extern crate jsonrpc_core;
extern crate jsonrpc_http_server as http;
extern crate jsonrpc_minihttp_server as minihttp;
extern crate jsonrpc_ipc_server as ipc;
extern crate jsonrpc_minihttp_server as minihttp;
extern crate jsonrpc_pubsub;
extern crate ethash;
extern crate ethcore;
@@ -76,6 +78,7 @@ pub extern crate jsonrpc_ws_server as ws;
mod metadata;
pub mod v1;
pub use jsonrpc_pubsub::Session as PubSubSession;
pub use ipc::{Server as IpcServer, MetaExtractor as IpcMetaExtractor, RequestContext as IpcRequestContext};
pub use http::{
hyper,

View File

@@ -33,6 +33,7 @@ mod poll_filter;
mod requests;
mod signer;
mod signing_queue;
mod subscription_manager;
pub use self::dispatch::{Dispatcher, FullDispatcher};
pub use self::network_settings::NetworkSettings;
@@ -46,3 +47,4 @@ pub use self::signing_queue::{
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
};
pub use self::signer::SignerService;
pub use self::subscription_manager::GenericPollManager;

View File

@@ -0,0 +1,175 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Generic poll manager for Pub-Sub.
use std::sync::Arc;
use std::collections::HashMap;
use util::Mutex;
use jsonrpc_core::futures::future::{self, Either};
use jsonrpc_core::futures::sync::mpsc;
use jsonrpc_core::futures::{Sink, Future, BoxFuture};
use jsonrpc_core::{self as core, MetaIoHandler};
use v1::metadata::Metadata;
#[derive(Debug)]
struct Subscription {
metadata: Metadata,
method: String,
params: core::Params,
sink: mpsc::Sender<Result<core::Value, core::Error>>,
last_result: Arc<Mutex<Option<core::Output>>>,
}
/// A struct managing all subscriptions.
/// TODO [ToDr] Depending on the method decide on poll interval.
/// For most of the methods it will be enough to poll on new block instead of time-interval.
pub struct GenericPollManager<S: core::Middleware<Metadata>> {
next_id: usize,
poll_subscriptions: HashMap<usize, Subscription>,
rpc: MetaIoHandler<Metadata, S>,
}
impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
/// Creates new poll manager
pub fn new(rpc: MetaIoHandler<Metadata, S>) -> Self {
GenericPollManager {
next_id: 1,
poll_subscriptions: Default::default(),
rpc: rpc,
}
}
/// Subscribes to update from polling given method.
pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params)
-> (usize, mpsc::Receiver<Result<core::Value, core::Error>>)
{
let id = self.next_id;
self.next_id += 1;
let (sink, stream) = mpsc::channel(1);
let subscription = Subscription {
metadata: metadata,
method: method,
params: params,
sink: sink,
last_result: Default::default(),
};
debug!(target: "pubsub", "Adding subscription id={:?}, {:?}", id, subscription);
self.poll_subscriptions.insert(id, subscription);
(id, stream)
}
pub fn unsubscribe(&mut self, id: usize) -> bool {
debug!(target: "pubsub", "Removing subscription: {:?}", id);
self.poll_subscriptions.remove(&id).is_some()
}
pub fn tick(&self) -> BoxFuture<(), ()> {
let mut futures = Vec::new();
// poll all subscriptions
for (id, subscription) in self.poll_subscriptions.iter() {
let call = core::MethodCall {
jsonrpc: Some(core::Version::V2),
id: core::Id::Num(*id as u64),
method: subscription.method.clone(),
params: Some(subscription.params.clone()),
};
trace!(target: "pubsub", "Polling method: {:?}", call);
let result = self.rpc.handle_call(call.into(), subscription.metadata.clone());
let last_result = subscription.last_result.clone();
let sender = subscription.sink.clone();
let result = result.and_then(move |response| {
let mut last_result = last_result.lock();
if *last_result != response && response.is_some() {
let output = response.expect("Existence proved by the condition.");
debug!(target: "pubsub", "Got new response, sending: {:?}", output);
*last_result = Some(output.clone());
let send = match output {
core::Output::Success(core::Success { result, .. }) => Ok(result),
core::Output::Failure(core::Failure { error, .. }) => Err(error),
};
Either::A(sender.send(send).map(|_| ()).map_err(|_| ()))
} else {
trace!(target: "pubsub", "Response was not changed: {:?}", response);
Either::B(future::ok(()))
}
});
futures.push(result)
}
// return a future represeting all the polls
future::join_all(futures).map(|_| ()).boxed()
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{self, AtomicBool};
use jsonrpc_core::{MetaIoHandler, NoopMiddleware, Value, Params};
use jsonrpc_core::futures::{Future, Stream};
use http::tokio_core::reactor;
use super::GenericPollManager;
fn poll_manager() -> GenericPollManager<NoopMiddleware> {
let mut io = MetaIoHandler::default();
let called = AtomicBool::new(false);
io.add_method("hello", move |_| {
if !called.load(atomic::Ordering::SeqCst) {
called.store(true, atomic::Ordering::SeqCst);
Ok(Value::String("hello".into()))
} else {
Ok(Value::String("world".into()))
}
});
GenericPollManager::new(io)
}
#[test]
fn should_poll_subscribed_method() {
// given
let mut el = reactor::Core::new().unwrap();
let mut poll_manager = poll_manager();
let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None);
assert_eq!(id, 1);
// then
poll_manager.tick().wait().unwrap();
let (res, rx) = el.run(rx.into_future()).unwrap();
assert_eq!(res, Some(Ok(Value::String("hello".into()))));
// retrieve second item
poll_manager.tick().wait().unwrap();
let (res, rx) = el.run(rx.into_future()).unwrap();
assert_eq!(res, Some(Ok(Value::String("world".into()))));
// and no more notifications
poll_manager.tick().wait().unwrap();
// we need to unsubscribe otherwise the future will never finish.
poll_manager.unsubscribe(1);
assert_eq!(el.run(rx.into_future()).unwrap().0, None);
}
}

View File

@@ -23,6 +23,7 @@ mod parity;
mod parity_accounts;
mod parity_set;
mod personal;
mod pubsub;
mod signer;
mod signing;
mod signing_unsafe;
@@ -33,7 +34,6 @@ mod web3;
pub mod light;
pub use self::web3::Web3Client;
pub use self::eth::{EthClient, EthClientOptions};
pub use self::eth_filter::EthFilterClient;
pub use self::net::NetClient;
@@ -41,9 +41,11 @@ pub use self::parity::ParityClient;
pub use self::parity_accounts::ParityAccountsClient;
pub use self::parity_set::ParitySetClient;
pub use self::personal::PersonalClient;
pub use self::pubsub::PubSubClient;
pub use self::signer::SignerClient;
pub use self::signing::SigningQueueClient;
pub use self::signing_unsafe::SigningUnsafeClient;
pub use self::traces::TracesClient;
pub use self::web3::Web3Client;
pub use self::rpc::RpcClient;
pub use self::secretstore::SecretStoreClient;

100
rpc/src/v1/impls/pubsub.rs Normal file
View File

@@ -0,0 +1,100 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Parity-specific PUB-SUB rpc implementation.
use std::sync::Arc;
use std::time::Duration;
use util::RwLock;
use futures::{self, BoxFuture, Future, Stream, Sink};
use jsonrpc_core::{self as core, Error, MetaIoHandler};
use jsonrpc_macros::pubsub::Subscriber;
use jsonrpc_pubsub::SubscriptionId;
use tokio_timer;
use parity_reactor::Remote;
use v1::helpers::GenericPollManager;
use v1::metadata::Metadata;
use v1::traits::PubSub;
/// Parity PubSub implementation.
pub struct PubSubClient<S: core::Middleware<Metadata>> {
poll_manager: Arc<RwLock<GenericPollManager<S>>>,
remote: Remote,
}
impl<S: core::Middleware<Metadata>> PubSubClient<S> {
/// Creates new `PubSubClient`.
pub fn new(rpc: MetaIoHandler<Metadata, S>, remote: Remote) -> Self {
let poll_manager = Arc::new(RwLock::new(GenericPollManager::new(rpc)));
let pm2 = poll_manager.clone();
let timer = tokio_timer::wheel()
.tick_duration(Duration::from_millis(500))
.build();
// Start ticking
let interval = timer.interval(Duration::from_millis(1000));
remote.spawn(interval
.map_err(|e| warn!("Polling timer error: {:?}", e))
.for_each(move |_| pm2.read().tick())
);
PubSubClient {
poll_manager: poll_manager,
remote: remote,
}
}
}
impl<S: core::Middleware<Metadata>> PubSub for PubSubClient<S> {
type Metadata = Metadata;
fn parity_subscribe(&self, mut meta: Metadata, subscriber: Subscriber<core::Value>, method: String, params: core::Params) {
// Make sure to get rid of PubSub session otherwise it will never be dropped.
meta.session = None;
let mut poll_manager = self.poll_manager.write();
let (id, receiver) = poll_manager.subscribe(meta, method, params);
match subscriber.assign_id(SubscriptionId::Number(id as u64)) {
Ok(sink) => {
self.remote.spawn(receiver.map(|res| match res {
Ok(val) => val,
Err(error) => {
warn!(target: "pubsub", "Subscription error: {:?}", error);
core::Value::Null
},
}).forward(sink.sink_map_err(|e| {
warn!("Cannot send notification: {:?}", e);
})).map(|_| ()));
},
Err(_) => {
poll_manager.unsubscribe(id);
},
}
}
fn parity_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
let res = if let SubscriptionId::Number(id) = id {
self.poll_manager.write().unsubscribe(id as usize)
} else {
false
};
futures::future::ok(res).boxed()
}
}

View File

@@ -14,20 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use jsonrpc_core;
use jsonrpc_pubsub::{Session, PubSubMetadata};
use v1::types::{DappId, Origin};
/// RPC methods metadata.
#[derive(Clone, Default, Debug, PartialEq)]
#[derive(Clone, Default, Debug)]
pub struct Metadata {
/// Request origin
pub origin: Origin,
/// Request PubSub Session
pub session: Option<Arc<Session>>,
}
impl Metadata {
/// Get
/// Returns dapp id if this request is coming from a Dapp or default `DappId` otherwise.
pub fn dapp_id(&self) -> DappId {
// TODO [ToDr] Extract dapp info from Ws connections.
match self.origin {
Origin::Dapps(ref dapp_id) => dapp_id.clone(),
_ => DappId::default(),
@@ -36,4 +42,8 @@ impl Metadata {
}
impl jsonrpc_core::Metadata for Metadata {}
impl PubSubMetadata for Metadata {
fn session(&self) -> Option<Arc<Session>> {
self.session.clone()
}
}

View File

@@ -58,7 +58,7 @@ pub mod traits;
pub mod tests;
pub mod types;
pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, Signer, Personal, Traces, Rpc, SecretStore};
pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, PubSub, Signer, Personal, Traces, Rpc, SecretStore};
pub use self::impls::*;
pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import, informant, dispatch};
pub use self::metadata::Metadata;

View File

@@ -24,6 +24,7 @@ mod parity;
mod parity_accounts;
mod parity_set;
mod personal;
mod pubsub;
mod rpc;
mod secretstore;
mod signer;

View File

@@ -0,0 +1,76 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::{atomic, Arc};
use jsonrpc_core::{self as core, MetaIoHandler};
use jsonrpc_core::futures::{self, Stream, Future};
use jsonrpc_pubsub::Session;
use parity_reactor::EventLoop;
use v1::{PubSub, PubSubClient, Metadata};
fn rpc() -> MetaIoHandler<Metadata, core::NoopMiddleware> {
let mut io = MetaIoHandler::default();
let called = atomic::AtomicBool::new(false);
io.add_method("hello", move |_| {
if !called.load(atomic::Ordering::SeqCst) {
called.store(true, atomic::Ordering::SeqCst);
Ok(core::Value::String("hello".into()))
} else {
Ok(core::Value::String("world".into()))
}
});
io
}
#[test]
fn should_subscribe_to_a_method() {
// given
let el = EventLoop::spawn();
let rpc = rpc();
let pubsub = PubSubClient::new(rpc, el.remote()).to_delegate();
let mut io = MetaIoHandler::default();
io.extend_with(pubsub);
let mut metadata = Metadata::default();
let (sender, receiver) = futures::sync::mpsc::channel(8);
metadata.session = Some(Arc::new(Session::new(sender)));
// Subscribe
let request = r#"{"jsonrpc": "2.0", "method": "parity_subscribe", "params": ["hello", []], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":1,"id":1}"#;
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
// Check notifications
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"hello","subscription":1}}"#;
assert_eq!(res, Some(response.into()));
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"world","subscription":1}}"#;
assert_eq!(res, Some(response.into()));
// And unsubscribe
let request = r#"{"jsonrpc": "2.0", "method": "parity_unsubscribe", "params": [1], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned()));
let (res, _receiver) = receiver.into_future().wait().unwrap();
assert_eq!(res, None);
}

View File

@@ -25,6 +25,7 @@ pub mod parity_accounts;
pub mod parity_set;
pub mod parity_signing;
pub mod personal;
pub mod pubsub;
pub mod signer;
pub mod traces;
pub mod rpc;
@@ -39,6 +40,7 @@ pub use self::parity_accounts::ParityAccounts;
pub use self::parity_set::ParitySet;
pub use self::parity_signing::ParitySigning;
pub use self::personal::Personal;
pub use self::pubsub::PubSub;
pub use self::signer::Signer;
pub use self::traces::Traces;
pub use self::rpc::Rpc;

View File

@@ -0,0 +1,39 @@
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Parity-specific PUB-SUB rpc interface.
use jsonrpc_core::{Error, Value, Params};
use jsonrpc_pubsub::SubscriptionId;
use jsonrpc_macros::pubsub::Subscriber;
use futures::BoxFuture;
build_rpc_trait! {
/// Parity-specific PUB-SUB rpc interface.
pub trait PubSub {
type Metadata;
#[pubsub(name = "parity_subscription")] {
/// Subscribe to changes of any RPC method in Parity.
#[rpc(name = "parity_subscribe")]
fn parity_subscribe(&self, Self::Metadata, Subscriber<Value>, String, Params);
/// Unsubscribe from existing Parity subscription.
#[rpc(name = "parity_unsubscribe")]
fn parity_unsubscribe(&self, SubscriptionId) -> BoxFuture<bool, Error>;
}
}
}