PubSub for IPC. (#5800)

This commit is contained in:
Tomasz Drwięga 2017-06-09 12:20:37 +02:00 committed by Gav Wood
parent d368006526
commit f86457ffd1
5 changed files with 18 additions and 24 deletions

18
Cargo.lock generated
View File

@ -1017,7 +1017,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "jsonrpc-core" name = "jsonrpc-core"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1029,7 +1029,7 @@ dependencies = [
[[package]] [[package]]
name = "jsonrpc-http-server" name = "jsonrpc-http-server"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)", "hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)",
"jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
@ -1042,7 +1042,7 @@ dependencies = [
[[package]] [[package]]
name = "jsonrpc-ipc-server" name = "jsonrpc-ipc-server"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"bytes 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
@ -1055,7 +1055,7 @@ dependencies = [
[[package]] [[package]]
name = "jsonrpc-macros" name = "jsonrpc-macros"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
"jsonrpc-pubsub 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-pubsub 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
@ -1065,7 +1065,7 @@ dependencies = [
[[package]] [[package]]
name = "jsonrpc-minihttp-server" name = "jsonrpc-minihttp-server"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
"jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
@ -1079,7 +1079,7 @@ dependencies = [
[[package]] [[package]]
name = "jsonrpc-pubsub" name = "jsonrpc-pubsub"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1089,7 +1089,7 @@ dependencies = [
[[package]] [[package]]
name = "jsonrpc-server-utils" name = "jsonrpc-server-utils"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
@ -1101,7 +1101,7 @@ dependencies = [
[[package]] [[package]]
name = "jsonrpc-tcp-server" name = "jsonrpc-tcp-server"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"bytes 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
@ -1115,7 +1115,7 @@ dependencies = [
[[package]] [[package]]
name = "jsonrpc-ws-server" name = "jsonrpc-ws-server"
version = "7.0.0" version = "7.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#87db29043826f152cce171351fa34fada287764d"
dependencies = [ dependencies = [
"jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",
"jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)",

View File

@ -51,10 +51,10 @@ impl HttpMetaExtractor for RpcExtractor {
} }
impl ipc::MetaExtractor<Metadata> for RpcExtractor { impl ipc::MetaExtractor<Metadata> for RpcExtractor {
fn extract(&self, _req: &ipc::RequestContext) -> Metadata { fn extract(&self, req: &ipc::RequestContext) -> Metadata {
let mut metadata = Metadata::default(); let mut metadata = Metadata::default();
// TODO [ToDr] Extract proper session id when it's available in context. metadata.origin = Origin::Ipc(req.session_id.into());
metadata.origin = Origin::Ipc(1.into()); metadata.session = Some(Arc::new(Session::new(req.sender.clone())));
metadata metadata
} }
} }
@ -77,8 +77,8 @@ impl ws::MetaExtractor<Metadata> for WsExtractor {
fn extract(&self, req: &ws::RequestContext) -> Metadata { fn extract(&self, req: &ws::RequestContext) -> Metadata {
let mut metadata = Metadata::default(); let mut metadata = Metadata::default();
let id = req.session_id as u64; let id = req.session_id as u64;
// TODO [ToDr] Extract dapp from Origin
let dapp = "".into(); let dapp = req.origin.as_ref().map(|origin| (&**origin).into()).unwrap_or_default();
metadata.origin = match self.authcodes_path { metadata.origin = match self.authcodes_path {
Some(ref path) => { Some(ref path) => {
let authorization = req.protocols.get(0).and_then(|p| auth_token_hash(&path, p)); let authorization = req.protocols.get(0).and_then(|p| auth_token_hash(&path, p));

View File

@ -74,10 +74,10 @@ impl<C> ChainNotificationHandler<C> {
for subscriber in self.heads_subscribers.lock().values() { for subscriber in self.heads_subscribers.lock().values() {
for &(ref block, ref extra_info) in &blocks { for &(ref block, ref extra_info) in &blocks {
self.remote.spawn(subscriber self.remote.spawn(subscriber
.notify(pubsub::Result::Header(RichHeader { .notify(Ok(pubsub::Result::Header(RichHeader {
inner: block.into(), inner: block.into(),
extra_info: extra_info.clone(), extra_info: extra_info.clone(),
})) })))
.map(|_| ()) .map(|_| ())
.map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e))
); );

View File

@ -72,13 +72,7 @@ impl<S: core::Middleware<Metadata>> PubSub for PubSubClient<S> {
let (id, receiver) = poll_manager.subscribe(meta, method, params); let (id, receiver) = poll_manager.subscribe(meta, method, params);
match subscriber.assign_id(id.clone()) { match subscriber.assign_id(id.clone()) {
Ok(sink) => { Ok(sink) => {
self.remote.spawn(receiver.map(|res| match res { self.remote.spawn(receiver.forward(sink.sink_map_err(|e| {
Ok(val) => val,
Err(error) => {
warn!(target: "pubsub", "Subscription error: {:?}", error);
core::Value::Null
},
}).forward(sink.sink_map_err(|e| {
warn!("Cannot send notification: {:?}", e); warn!("Cannot send notification: {:?}", e);
})).map(|_| ())); })).map(|_| ()));
}, },

View File

@ -61,7 +61,7 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
for subscription in subs.lock().values() { for subscription in subs.lock().values() {
let subscription: &Sink<_> = subscription; let subscription: &Sink<_> = subscription;
remote.spawn(subscription remote.spawn(subscription
.notify(requests.clone()) .notify(Ok(requests.clone()))
.map(|_| ()) .map(|_| ())
.map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e))
); );