From f86457ffd12a30cea9fefd8fef061ae3ac584a7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 9 Jun 2017 12:20:37 +0200 Subject: [PATCH] PubSub for IPC. (#5800) --- Cargo.lock | 18 +++++++++--------- rpc/src/v1/extractors.rs | 10 +++++----- rpc/src/v1/impls/eth_pubsub.rs | 4 ++-- rpc/src/v1/impls/pubsub.rs | 8 +------- rpc/src/v1/impls/signer.rs | 2 +- 5 files changed, 18 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff8b71780..c194b594d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1017,7 +1017,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1029,7 +1029,7 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1042,7 +1042,7 @@ dependencies = [ [[package]] name = "jsonrpc-ipc-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "bytes 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1055,7 +1055,7 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-pubsub 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1065,7 +1065,7 @@ dependencies = [ [[package]] name = "jsonrpc-minihttp-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1079,7 +1079,7 @@ dependencies = [ [[package]] name = "jsonrpc-pubsub" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1089,7 +1089,7 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1101,7 +1101,7 @@ dependencies = [ [[package]] name = "jsonrpc-tcp-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "bytes 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1115,7 +1115,7 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", diff --git a/rpc/src/v1/extractors.rs b/rpc/src/v1/extractors.rs index 1feaf4d9b..b93ca7b87 100644 --- a/rpc/src/v1/extractors.rs +++ b/rpc/src/v1/extractors.rs @@ -51,10 +51,10 @@ impl HttpMetaExtractor for RpcExtractor { } impl ipc::MetaExtractor for RpcExtractor { - fn extract(&self, _req: &ipc::RequestContext) -> Metadata { + fn extract(&self, req: &ipc::RequestContext) -> Metadata { let mut metadata = Metadata::default(); - // TODO [ToDr] Extract proper session id when it's available in context. - metadata.origin = Origin::Ipc(1.into()); + metadata.origin = Origin::Ipc(req.session_id.into()); + metadata.session = Some(Arc::new(Session::new(req.sender.clone()))); metadata } } @@ -77,8 +77,8 @@ impl ws::MetaExtractor for WsExtractor { fn extract(&self, req: &ws::RequestContext) -> Metadata { let mut metadata = Metadata::default(); let id = req.session_id as u64; - // TODO [ToDr] Extract dapp from Origin - let dapp = "".into(); + + let dapp = req.origin.as_ref().map(|origin| (&**origin).into()).unwrap_or_default(); metadata.origin = match self.authcodes_path { Some(ref path) => { let authorization = req.protocols.get(0).and_then(|p| auth_token_hash(&path, p)); diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 202f2592a..ce839428b 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -74,10 +74,10 @@ impl ChainNotificationHandler { for subscriber in self.heads_subscribers.lock().values() { for &(ref block, ref extra_info) in &blocks { self.remote.spawn(subscriber - .notify(pubsub::Result::Header(RichHeader { + .notify(Ok(pubsub::Result::Header(RichHeader { inner: block.into(), extra_info: extra_info.clone(), - })) + }))) .map(|_| ()) .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) ); diff --git a/rpc/src/v1/impls/pubsub.rs b/rpc/src/v1/impls/pubsub.rs index ea7bb4e91..215141e84 100644 --- a/rpc/src/v1/impls/pubsub.rs +++ b/rpc/src/v1/impls/pubsub.rs @@ -72,13 +72,7 @@ impl> PubSub for PubSubClient { let (id, receiver) = poll_manager.subscribe(meta, method, params); match subscriber.assign_id(id.clone()) { Ok(sink) => { - self.remote.spawn(receiver.map(|res| match res { - Ok(val) => val, - Err(error) => { - warn!(target: "pubsub", "Subscription error: {:?}", error); - core::Value::Null - }, - }).forward(sink.sink_map_err(|e| { + self.remote.spawn(receiver.forward(sink.sink_map_err(|e| { warn!("Cannot send notification: {:?}", e); })).map(|_| ())); }, diff --git a/rpc/src/v1/impls/signer.rs b/rpc/src/v1/impls/signer.rs index d596bde86..4667283c9 100644 --- a/rpc/src/v1/impls/signer.rs +++ b/rpc/src/v1/impls/signer.rs @@ -61,7 +61,7 @@ impl SignerClient { for subscription in subs.lock().values() { let subscription: &Sink<_> = subscription; remote.spawn(subscription - .notify(requests.clone()) + .notify(Ok(requests.clone())) .map(|_| ()) .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) );