diff --git a/Cargo.lock b/Cargo.lock index f59140ada..e92bae295 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1705,6 +1705,7 @@ dependencies = [ "jsonrpc-ipc-server 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-macros 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-minihttp-server 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", + "jsonrpc-pubsub 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-ws-server 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "multihash 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1722,6 +1723,7 @@ dependencies = [ "serde_json 0.9.5 (registry+https://github.com/rust-lang/crates.io-index)", "stats 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "transient-hashmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2554,6 +2556,15 @@ dependencies = [ "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-timer" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-uds" version = "0.1.4" @@ -2987,6 +2998,7 @@ dependencies = [ "checksum tokio-proto 0.1.0 (git+https://github.com/tomusdrw/tokio-proto)" = "" "checksum tokio-proto 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7c0d6031f94d78d7b4d509d4a7c5e1cdf524a17e7b08d1c188a83cf720e69808" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" +"checksum tokio-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "86f33def658c14724fc13ec6289b3875a8152ee8ae767a5b1ccbded363b03db8" "checksum tokio-uds 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bd209039933255ea77c6d7a1d18abc20b997d161acb900acca6eb74cdd049f31" "checksum toml 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "fcd27a04ca509aff336ba5eb2abc58d456f52c4ff64d9724d88acb85ead560b6" "checksum toml 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a442dfc13508e603c3f763274361db7f79d7469a0e95c411cde53662ab30fc72" diff --git a/parity/rpc.rs b/parity/rpc.rs index 1fc503767..2ac93baf7 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -22,7 +22,7 @@ use dir::default_data_path; use parity_rpc::informant::{RpcStats, Middleware}; use parity_rpc::{self as rpc, HttpServerError, Metadata, Origin, DomainsValidation}; use helpers::parity_ipc_path; -use jsonrpc_core::MetaIoHandler; +use jsonrpc_core::{futures, MetaIoHandler}; use parity_reactor::TokioRemote; use rpc_apis::{self, ApiSet}; @@ -126,11 +126,53 @@ impl rpc::IpcMetaExtractor for RpcExtractor { } } -impl rpc::ws::MetaExtractor for RpcExtractor { +struct Sender(rpc::ws::ws::Sender, futures::sync::mpsc::Receiver); + +impl futures::Future for Sender { + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll { + use self::futures::Stream; + + let item = self.1.poll()?; + match item { + futures::Async::NotReady => { + Ok(futures::Async::NotReady) + }, + futures::Async::Ready(None) => { + Ok(futures::Async::Ready(())) + }, + futures::Async::Ready(Some(val)) => { + if let Err(e) = self.0.send(val) { + warn!("Error sending a subscription update: {:?}", e); + } + self.poll() + }, + } + } +} + +struct WsRpcExtractor { + remote: TokioRemote, +} + +impl WsRpcExtractor { + fn wrap_out(&self, out: rpc::ws::ws::Sender) -> futures::sync::mpsc::Sender { + let (sender, receiver) = futures::sync::mpsc::channel(8); + self.remote.spawn(move |_| Sender(out, receiver)); + sender + } +} + +impl rpc::ws::MetaExtractor for WsRpcExtractor { fn extract(&self, req: &rpc::ws::RequestContext) -> Metadata { let mut metadata = Metadata::default(); let id = req.session_id as u64; metadata.origin = Origin::Ws(id.into()); + metadata.session = Some(Arc::new(rpc::PubSubSession::new( + self.wrap_out(req.out.clone()) + ))); metadata } } @@ -173,10 +215,12 @@ pub fn new_ws( let start_result = rpc::start_ws( &addr, handler, - remote, + remote.clone(), allowed_origins, allowed_hosts, - RpcExtractor, + WsRpcExtractor { + remote: remote, + }, WsStats { stats: deps.stats.clone(), }, @@ -247,7 +291,14 @@ pub fn new_ipc( let handler = setup_apis(conf.apis, dependencies); let remote = dependencies.remote.clone(); - match rpc::start_ipc(&conf.socket_addr, handler, remote, RpcExtractor) { + let ipc = rpc::start_ipc( + &conf.socket_addr, + handler, + remote, + RpcExtractor, + ); + + match ipc { Ok(server) => Ok(Some(server)), Err(io_error) => Err(format!("IPC error: {}", io_error)), } diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 0ff204aba..48e66e322 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -31,11 +31,12 @@ use parity_rpc::informant::{ActivityNotifier, Middleware, RpcStats, ClientNotifi use parity_rpc::dispatch::{FullDispatcher, LightDispatcher}; use ethsync::{ManageNetwork, SyncProvider, LightSync}; use hash_fetch::fetch::Client as FetchClient; -use jsonrpc_core::{MetaIoHandler}; +use jsonrpc_core::{self as core, MetaIoHandler}; use light::{TransactionQueue as LightTransactionQueue, Cache as LightDataCache}; use updater::Updater; use util::{Mutex, RwLock}; use ethcore_logger::RotatingLogger; +use parity_reactor; #[derive(Debug, PartialEq, Clone, Eq, Hash)] pub enum Api { @@ -195,18 +196,16 @@ pub struct FullDependencies { pub dapps_interface: Option, pub dapps_port: Option, pub fetch: FetchClient, + pub remote: parity_reactor::Remote, } -impl Dependencies for FullDependencies { - type Notifier = ClientNotifier; - - fn activity_notifier(&self) -> ClientNotifier { - ClientNotifier { - client: self.client.clone(), - } - } - - fn extend_with_set(&self, handler: &mut MetaIoHandler, apis: &[Api]) { +impl FullDependencies { + fn extend_api>( + &self, + handler: &mut MetaIoHandler, + apis: &[Api], + for_generic_pubsub: bool, + ) { use parity_rpc::v1::*; macro_rules! add_signing_methods { @@ -248,10 +247,12 @@ impl Dependencies for FullDependencies { ); handler.extend_with(client.to_delegate()); - let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone()); - handler.extend_with(filter_client.to_delegate()); + if !for_generic_pubsub { + let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone()); + handler.extend_with(filter_client.to_delegate()); - add_signing_methods!(EthSigning, handler, self); + add_signing_methods!(EthSigning, handler, self); + } }, Api::Personal => { handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate()); @@ -278,8 +279,14 @@ impl Dependencies for FullDependencies { self.dapps_port, ).to_delegate()); - add_signing_methods!(EthSigning, handler, self); - add_signing_methods!(ParitySigning, handler, self); + if !for_generic_pubsub { + let mut rpc = MetaIoHandler::default(); + self.extend_api(&mut rpc, apis, true); + handler.extend_with(PubSubClient::new(rpc, self.remote.clone()).to_delegate()); + + add_signing_methods!(EthSigning, handler, self); + add_signing_methods!(ParitySigning, handler, self); + } }, Api::ParityAccounts => { handler.extend_with(ParityAccountsClient::new(&self.secret_store).to_delegate()); @@ -308,6 +315,20 @@ impl Dependencies for FullDependencies { } } +impl Dependencies for FullDependencies { + type Notifier = ClientNotifier; + + fn activity_notifier(&self) -> ClientNotifier { + ClientNotifier { + client: self.client.clone(), + } + } + + fn extend_with_set(&self, handler: &mut MetaIoHandler>, apis: &[Api]) { + self.extend_api(handler, apis, false) + } +} + /// Light client notifier. Doesn't do anything yet, but might in the future. pub struct LightClientNotifier; diff --git a/parity/run.rs b/parity/run.rs index 9d90ba006..083cbe5ce 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -631,6 +631,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R false => None, }, fetch: fetch.clone(), + remote: event_loop.remote(), }); let dependencies = rpc::Dependencies { diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 0067bdeaa..b1e2d64aa 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -17,6 +17,7 @@ serde = "0.9" serde_derive = "0.9" serde_json = "0.9" time = "0.1" +tokio-timer = "0.1" transient-hashmap = "0.4" cid = "0.2.1" multihash = "0.5" @@ -29,6 +30,7 @@ jsonrpc-minihttp-server = { git = "https://github.com/paritytech/jsonrpc.git", b jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" } jsonrpc-ipc-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" } jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" } +jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.7" } ethcore-io = { path = "../util/io" } ethcore-ipc = { path = "../ipc/rpc" } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 291c5bcd8..4540d6b33 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -26,6 +26,7 @@ extern crate semver; extern crate serde; extern crate serde_json; extern crate time; +extern crate tokio_timer; extern crate transient_hashmap; extern crate cid; extern crate multihash; @@ -34,8 +35,9 @@ extern crate rand; extern crate jsonrpc_core; extern crate jsonrpc_http_server as http; -extern crate jsonrpc_minihttp_server as minihttp; extern crate jsonrpc_ipc_server as ipc; +extern crate jsonrpc_minihttp_server as minihttp; +extern crate jsonrpc_pubsub; extern crate ethash; extern crate ethcore; @@ -76,6 +78,7 @@ pub extern crate jsonrpc_ws_server as ws; mod metadata; pub mod v1; +pub use jsonrpc_pubsub::Session as PubSubSession; pub use ipc::{Server as IpcServer, MetaExtractor as IpcMetaExtractor, RequestContext as IpcRequestContext}; pub use http::{ hyper, diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs index 0a4265616..1e26de852 100644 --- a/rpc/src/v1/helpers/mod.rs +++ b/rpc/src/v1/helpers/mod.rs @@ -33,6 +33,7 @@ mod poll_filter; mod requests; mod signer; mod signing_queue; +mod subscription_manager; pub use self::dispatch::{Dispatcher, FullDispatcher}; pub use self::network_settings::NetworkSettings; @@ -46,3 +47,4 @@ pub use self::signing_queue::{ QUEUE_LIMIT as SIGNING_QUEUE_LIMIT, }; pub use self::signer::SignerService; +pub use self::subscription_manager::GenericPollManager; diff --git a/rpc/src/v1/helpers/subscription_manager.rs b/rpc/src/v1/helpers/subscription_manager.rs new file mode 100644 index 000000000..4b82fdcc4 --- /dev/null +++ b/rpc/src/v1/helpers/subscription_manager.rs @@ -0,0 +1,175 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Generic poll manager for Pub-Sub. + +use std::sync::Arc; +use std::collections::HashMap; +use util::Mutex; + +use jsonrpc_core::futures::future::{self, Either}; +use jsonrpc_core::futures::sync::mpsc; +use jsonrpc_core::futures::{Sink, Future, BoxFuture}; +use jsonrpc_core::{self as core, MetaIoHandler}; + +use v1::metadata::Metadata; + +#[derive(Debug)] +struct Subscription { + metadata: Metadata, + method: String, + params: core::Params, + sink: mpsc::Sender>, + last_result: Arc>>, +} + +/// A struct managing all subscriptions. +/// TODO [ToDr] Depending on the method decide on poll interval. +/// For most of the methods it will be enough to poll on new block instead of time-interval. +pub struct GenericPollManager> { + next_id: usize, + poll_subscriptions: HashMap, + rpc: MetaIoHandler, +} + +impl> GenericPollManager { + /// Creates new poll manager + pub fn new(rpc: MetaIoHandler) -> Self { + GenericPollManager { + next_id: 1, + poll_subscriptions: Default::default(), + rpc: rpc, + } + } + + /// Subscribes to update from polling given method. + pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params) + -> (usize, mpsc::Receiver>) + { + let id = self.next_id; + self.next_id += 1; + + let (sink, stream) = mpsc::channel(1); + + let subscription = Subscription { + metadata: metadata, + method: method, + params: params, + sink: sink, + last_result: Default::default(), + }; + + debug!(target: "pubsub", "Adding subscription id={:?}, {:?}", id, subscription); + self.poll_subscriptions.insert(id, subscription); + (id, stream) + } + + pub fn unsubscribe(&mut self, id: usize) -> bool { + debug!(target: "pubsub", "Removing subscription: {:?}", id); + self.poll_subscriptions.remove(&id).is_some() + } + + pub fn tick(&self) -> BoxFuture<(), ()> { + let mut futures = Vec::new(); + // poll all subscriptions + for (id, subscription) in self.poll_subscriptions.iter() { + let call = core::MethodCall { + jsonrpc: Some(core::Version::V2), + id: core::Id::Num(*id as u64), + method: subscription.method.clone(), + params: Some(subscription.params.clone()), + }; + trace!(target: "pubsub", "Polling method: {:?}", call); + let result = self.rpc.handle_call(call.into(), subscription.metadata.clone()); + + let last_result = subscription.last_result.clone(); + let sender = subscription.sink.clone(); + + let result = result.and_then(move |response| { + let mut last_result = last_result.lock(); + if *last_result != response && response.is_some() { + let output = response.expect("Existence proved by the condition."); + debug!(target: "pubsub", "Got new response, sending: {:?}", output); + *last_result = Some(output.clone()); + + let send = match output { + core::Output::Success(core::Success { result, .. }) => Ok(result), + core::Output::Failure(core::Failure { error, .. }) => Err(error), + }; + Either::A(sender.send(send).map(|_| ()).map_err(|_| ())) + } else { + trace!(target: "pubsub", "Response was not changed: {:?}", response); + Either::B(future::ok(())) + } + }); + + futures.push(result) + } + + // return a future represeting all the polls + future::join_all(futures).map(|_| ()).boxed() + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{self, AtomicBool}; + + use jsonrpc_core::{MetaIoHandler, NoopMiddleware, Value, Params}; + use jsonrpc_core::futures::{Future, Stream}; + use http::tokio_core::reactor; + + use super::GenericPollManager; + + fn poll_manager() -> GenericPollManager { + let mut io = MetaIoHandler::default(); + let called = AtomicBool::new(false); + io.add_method("hello", move |_| { + if !called.load(atomic::Ordering::SeqCst) { + called.store(true, atomic::Ordering::SeqCst); + Ok(Value::String("hello".into())) + } else { + Ok(Value::String("world".into())) + } + }); + GenericPollManager::new(io) + } + + #[test] + fn should_poll_subscribed_method() { + // given + let mut el = reactor::Core::new().unwrap(); + let mut poll_manager = poll_manager(); + let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None); + assert_eq!(id, 1); + + // then + poll_manager.tick().wait().unwrap(); + let (res, rx) = el.run(rx.into_future()).unwrap(); + assert_eq!(res, Some(Ok(Value::String("hello".into())))); + + // retrieve second item + poll_manager.tick().wait().unwrap(); + let (res, rx) = el.run(rx.into_future()).unwrap(); + assert_eq!(res, Some(Ok(Value::String("world".into())))); + + // and no more notifications + poll_manager.tick().wait().unwrap(); + // we need to unsubscribe otherwise the future will never finish. + poll_manager.unsubscribe(1); + assert_eq!(el.run(rx.into_future()).unwrap().0, None); + } +} diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index a128e7104..a8691b32b 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -23,6 +23,7 @@ mod parity; mod parity_accounts; mod parity_set; mod personal; +mod pubsub; mod signer; mod signing; mod signing_unsafe; @@ -33,7 +34,6 @@ mod web3; pub mod light; -pub use self::web3::Web3Client; pub use self::eth::{EthClient, EthClientOptions}; pub use self::eth_filter::EthFilterClient; pub use self::net::NetClient; @@ -41,9 +41,11 @@ pub use self::parity::ParityClient; pub use self::parity_accounts::ParityAccountsClient; pub use self::parity_set::ParitySetClient; pub use self::personal::PersonalClient; +pub use self::pubsub::PubSubClient; pub use self::signer::SignerClient; pub use self::signing::SigningQueueClient; pub use self::signing_unsafe::SigningUnsafeClient; pub use self::traces::TracesClient; +pub use self::web3::Web3Client; pub use self::rpc::RpcClient; pub use self::secretstore::SecretStoreClient; diff --git a/rpc/src/v1/impls/pubsub.rs b/rpc/src/v1/impls/pubsub.rs new file mode 100644 index 000000000..8badefdb8 --- /dev/null +++ b/rpc/src/v1/impls/pubsub.rs @@ -0,0 +1,100 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Parity-specific PUB-SUB rpc implementation. + +use std::sync::Arc; +use std::time::Duration; +use util::RwLock; + +use futures::{self, BoxFuture, Future, Stream, Sink}; +use jsonrpc_core::{self as core, Error, MetaIoHandler}; +use jsonrpc_macros::pubsub::Subscriber; +use jsonrpc_pubsub::SubscriptionId; +use tokio_timer; + +use parity_reactor::Remote; +use v1::helpers::GenericPollManager; +use v1::metadata::Metadata; +use v1::traits::PubSub; + +/// Parity PubSub implementation. +pub struct PubSubClient> { + poll_manager: Arc>>, + remote: Remote, +} + +impl> PubSubClient { + /// Creates new `PubSubClient`. + pub fn new(rpc: MetaIoHandler, remote: Remote) -> Self { + let poll_manager = Arc::new(RwLock::new(GenericPollManager::new(rpc))); + let pm2 = poll_manager.clone(); + + let timer = tokio_timer::wheel() + .tick_duration(Duration::from_millis(500)) + .build(); + + // Start ticking + let interval = timer.interval(Duration::from_millis(1000)); + remote.spawn(interval + .map_err(|e| warn!("Polling timer error: {:?}", e)) + .for_each(move |_| pm2.read().tick()) + ); + + PubSubClient { + poll_manager: poll_manager, + remote: remote, + } + } +} + +impl> PubSub for PubSubClient { + type Metadata = Metadata; + + fn parity_subscribe(&self, mut meta: Metadata, subscriber: Subscriber, method: String, params: core::Params) { + // Make sure to get rid of PubSub session otherwise it will never be dropped. + meta.session = None; + + let mut poll_manager = self.poll_manager.write(); + let (id, receiver) = poll_manager.subscribe(meta, method, params); + match subscriber.assign_id(SubscriptionId::Number(id as u64)) { + Ok(sink) => { + self.remote.spawn(receiver.map(|res| match res { + Ok(val) => val, + Err(error) => { + warn!(target: "pubsub", "Subscription error: {:?}", error); + core::Value::Null + }, + }).forward(sink.sink_map_err(|e| { + warn!("Cannot send notification: {:?}", e); + })).map(|_| ())); + }, + Err(_) => { + poll_manager.unsubscribe(id); + }, + } + } + + fn parity_unsubscribe(&self, id: SubscriptionId) -> BoxFuture { + let res = if let SubscriptionId::Number(id) = id { + self.poll_manager.write().unsubscribe(id as usize) + } else { + false + }; + + futures::future::ok(res).boxed() + } +} diff --git a/rpc/src/v1/metadata.rs b/rpc/src/v1/metadata.rs index 26c79d976..74567d510 100644 --- a/rpc/src/v1/metadata.rs +++ b/rpc/src/v1/metadata.rs @@ -14,20 +14,26 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::sync::Arc; + use jsonrpc_core; +use jsonrpc_pubsub::{Session, PubSubMetadata}; use v1::types::{DappId, Origin}; /// RPC methods metadata. -#[derive(Clone, Default, Debug, PartialEq)] +#[derive(Clone, Default, Debug)] pub struct Metadata { /// Request origin pub origin: Origin, + /// Request PubSub Session + pub session: Option>, } impl Metadata { - /// Get + /// Returns dapp id if this request is coming from a Dapp or default `DappId` otherwise. pub fn dapp_id(&self) -> DappId { + // TODO [ToDr] Extract dapp info from Ws connections. match self.origin { Origin::Dapps(ref dapp_id) => dapp_id.clone(), _ => DappId::default(), @@ -36,4 +42,8 @@ impl Metadata { } impl jsonrpc_core::Metadata for Metadata {} - +impl PubSubMetadata for Metadata { + fn session(&self) -> Option> { + self.session.clone() + } +} diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index e591cbd43..59aef84b3 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -58,7 +58,7 @@ pub mod traits; pub mod tests; pub mod types; -pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, Signer, Personal, Traces, Rpc, SecretStore}; +pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, PubSub, Signer, Personal, Traces, Rpc, SecretStore}; pub use self::impls::*; pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import, informant, dispatch}; pub use self::metadata::Metadata; diff --git a/rpc/src/v1/tests/mocked/mod.rs b/rpc/src/v1/tests/mocked/mod.rs index fed358574..e5a459633 100644 --- a/rpc/src/v1/tests/mocked/mod.rs +++ b/rpc/src/v1/tests/mocked/mod.rs @@ -24,6 +24,7 @@ mod parity; mod parity_accounts; mod parity_set; mod personal; +mod pubsub; mod rpc; mod secretstore; mod signer; diff --git a/rpc/src/v1/tests/mocked/pubsub.rs b/rpc/src/v1/tests/mocked/pubsub.rs new file mode 100644 index 000000000..b7c8963e5 --- /dev/null +++ b/rpc/src/v1/tests/mocked/pubsub.rs @@ -0,0 +1,76 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::sync::{atomic, Arc}; + +use jsonrpc_core::{self as core, MetaIoHandler}; +use jsonrpc_core::futures::{self, Stream, Future}; +use jsonrpc_pubsub::Session; + +use parity_reactor::EventLoop; +use v1::{PubSub, PubSubClient, Metadata}; + +fn rpc() -> MetaIoHandler { + let mut io = MetaIoHandler::default(); + let called = atomic::AtomicBool::new(false); + io.add_method("hello", move |_| { + if !called.load(atomic::Ordering::SeqCst) { + called.store(true, atomic::Ordering::SeqCst); + Ok(core::Value::String("hello".into())) + } else { + Ok(core::Value::String("world".into())) + } + }); + io +} + +#[test] +fn should_subscribe_to_a_method() { + // given + let el = EventLoop::spawn(); + let rpc = rpc(); + let pubsub = PubSubClient::new(rpc, el.remote()).to_delegate(); + + let mut io = MetaIoHandler::default(); + io.extend_with(pubsub); + + let mut metadata = Metadata::default(); + let (sender, receiver) = futures::sync::mpsc::channel(8); + metadata.session = Some(Arc::new(Session::new(sender))); + + // Subscribe + let request = r#"{"jsonrpc": "2.0", "method": "parity_subscribe", "params": ["hello", []], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":1,"id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); + + // Check notifications + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"hello","subscription":1}}"#; + assert_eq!(res, Some(response.into())); + + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"world","subscription":1}}"#; + assert_eq!(res, Some(response.into())); + + // And unsubscribe + let request = r#"{"jsonrpc": "2.0", "method": "parity_unsubscribe", "params": [1], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned())); + + let (res, _receiver) = receiver.into_future().wait().unwrap(); + assert_eq!(res, None); +} + diff --git a/rpc/src/v1/traits/mod.rs b/rpc/src/v1/traits/mod.rs index 3f4125341..9cef58ee7 100644 --- a/rpc/src/v1/traits/mod.rs +++ b/rpc/src/v1/traits/mod.rs @@ -25,6 +25,7 @@ pub mod parity_accounts; pub mod parity_set; pub mod parity_signing; pub mod personal; +pub mod pubsub; pub mod signer; pub mod traces; pub mod rpc; @@ -39,6 +40,7 @@ pub use self::parity_accounts::ParityAccounts; pub use self::parity_set::ParitySet; pub use self::parity_signing::ParitySigning; pub use self::personal::Personal; +pub use self::pubsub::PubSub; pub use self::signer::Signer; pub use self::traces::Traces; pub use self::rpc::Rpc; diff --git a/rpc/src/v1/traits/pubsub.rs b/rpc/src/v1/traits/pubsub.rs new file mode 100644 index 000000000..a4672403a --- /dev/null +++ b/rpc/src/v1/traits/pubsub.rs @@ -0,0 +1,39 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Parity-specific PUB-SUB rpc interface. + +use jsonrpc_core::{Error, Value, Params}; +use jsonrpc_pubsub::SubscriptionId; +use jsonrpc_macros::pubsub::Subscriber; +use futures::BoxFuture; + +build_rpc_trait! { + /// Parity-specific PUB-SUB rpc interface. + pub trait PubSub { + type Metadata; + + #[pubsub(name = "parity_subscription")] { + /// Subscribe to changes of any RPC method in Parity. + #[rpc(name = "parity_subscribe")] + fn parity_subscribe(&self, Self::Metadata, Subscriber, String, Params); + + /// Unsubscribe from existing Parity subscription. + #[rpc(name = "parity_unsubscribe")] + fn parity_unsubscribe(&self, SubscriptionId) -> BoxFuture; + } + } +}