From f38cc8e1826c69151cdaa2c2b8e932daf4d99f03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 23 May 2017 12:26:39 +0200 Subject: [PATCH] Latest headers Pub-Sub (#5655) * Signer subscription. * Fixing RPC tests. * Block Headers eth-pubsub. * PubSub for light client. * Fixing tests. * Updating to proper jsonrpc version. * Update to correct tests. * Fixing tests. --- Cargo.lock | 38 +++++-- ethcore/light/src/client/mod.rs | 25 ++++- ethcore/src/client/chain_notify.rs | 6 +- parity/cli/mod.rs | 6 +- parity/cli/usage.txt | 2 +- parity/rpc.rs | 50 +-------- parity/rpc_apis.rs | 33 ++++-- parity/signer.rs | 2 +- rpc/src/v1/impls/eth_pubsub.rs | 153 ++++++++++++++++++++++++++ rpc/src/v1/impls/mod.rs | 2 + rpc/src/v1/impls/parity.rs | 22 +--- rpc/src/v1/mod.rs | 2 +- rpc/src/v1/tests/mocked/eth_pubsub.rs | 104 +++++++++++++++++ rpc/src/v1/tests/mocked/mod.rs | 1 + rpc/src/v1/traits/eth_pubsub.rs | 42 +++++++ rpc/src/v1/traits/mod.rs | 2 + rpc/src/v1/types/block.rs | 37 ++++++- rpc/src/v1/types/filter.rs | 4 +- rpc/src/v1/types/mod.rs | 2 + rpc/src/v1/types/pubsub.rs | 114 +++++++++++++++++++ 20 files changed, 551 insertions(+), 96 deletions(-) create mode 100644 rpc/src/v1/impls/eth_pubsub.rs create mode 100644 rpc/src/v1/tests/mocked/eth_pubsub.rs create mode 100644 rpc/src/v1/traits/eth_pubsub.rs create mode 100644 rpc/src/v1/types/pubsub.rs diff --git a/Cargo.lock b/Cargo.lock index 224b84439..342dee62e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -836,6 +836,11 @@ dependencies = [ "miniz-sys 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fnv" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "futures" version = "0.1.11" @@ -873,6 +878,18 @@ name = "glob" version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "globset" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hamming" version = "0.1.3" @@ -1040,7 +1057,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1052,7 +1069,7 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ "hyper 0.10.0-a.0 (git+https://github.com/paritytech/hyper)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1065,7 +1082,7 @@ dependencies = [ [[package]] name = "jsonrpc-ipc-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ "bytes 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1078,7 +1095,7 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-pubsub 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1088,7 +1105,7 @@ dependencies = [ [[package]] name = "jsonrpc-minihttp-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1102,7 +1119,7 @@ dependencies = [ [[package]] name = "jsonrpc-pubsub" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1112,8 +1129,9 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ + "globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1123,7 +1141,7 @@ dependencies = [ [[package]] name = "jsonrpc-tcp-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ "bytes 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -1137,7 +1155,7 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "7.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#8ed20d6e094e88f707045fca2d0959f46bfd23f9" +source = "git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7#900b528213ffd1aaaefd29e2b99dfab892b15ab4" dependencies = [ "jsonrpc-core 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", "jsonrpc-server-utils 7.0.0 (git+https://github.com/paritytech/jsonrpc.git?branch=parity-1.7)", @@ -2840,11 +2858,13 @@ dependencies = [ "checksum ethcore-bigint 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "5d237300af825a8d78f4c0dc835b0eab76a207e9df4aa088d91e162a173e0ca0" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum flate2 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "3eeb481e957304178d2e782f2da1257f1434dfecbae883bafb61ada2a9fea3bb" +"checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" "checksum futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8e51e7f9c150ba7fd4cee9df8bf6ea3dea5b63b68955ddad19ccd35b71dcfb4d" "checksum futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bb982bb25cd8fa5da6a8eb3a460354c984ff1113da82bcb4f0b0862b5795db82" "checksum gcc 0.3.43 (registry+https://github.com/rust-lang/crates.io-index)" = "c07c758b972368e703a562686adb39125707cc1ef3399da8c019fc6c2498a75d" "checksum gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0912515a8ff24ba900422ecda800b52f4016a56251922d397c576bf92c690518" "checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb" +"checksum globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "90d069fe6beb9be359ef505650b3f73228c5591a3c4b1f32be2f4f44459ffa3a" "checksum hamming 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "65043da274378d68241eb9a8f8f8aa54e349136f7b8e12f63e3ef44043cc30e1" "checksum heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "abb306abb8d398e053cfb1b3e7b72c2f580be048b85745c52652954f8ad1439c" "checksum heck 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6f807d2f64cc044a6bcf250ff23e59be0deec7a16612c014f962a06fa7e020f9" diff --git a/ethcore/light/src/client/mod.rs b/ethcore/light/src/client/mod.rs index 7e6213273..57cd61cec 100644 --- a/ethcore/light/src/client/mod.rs +++ b/ethcore/light/src/client/mod.rs @@ -16,7 +16,7 @@ //! Light client implementation. Stores data from light sync -use std::sync::Arc; +use std::sync::{Weak, Arc}; use ethcore::block_import_error::BlockImportError; use ethcore::block_status::BlockStatus; @@ -111,6 +111,12 @@ pub trait LightChainClient: Send + Sync { fn eip86_transition(&self) -> u64; } +/// An actor listening to light chain events. +pub trait LightChainNotify: Send + Sync { + /// Notifies about imported headers. + fn new_headers(&self, good: &[H256]); +} + /// Something which can be treated as a `LightChainClient`. pub trait AsLightClient { /// The kind of light client this can be treated as. @@ -134,6 +140,7 @@ pub struct Client { report: RwLock, import_lock: Mutex<()>, db: Arc, + listeners: RwLock>>, } impl Client { @@ -148,9 +155,15 @@ impl Client { report: RwLock::new(ClientReport::default()), import_lock: Mutex::new(()), db: db, + listeners: RwLock::new(vec![]), }) } + /// Adds a new `LightChainNotify` listener. + pub fn add_listener(&self, listener: Weak) { + self.listeners.write().push(listener); + } + /// Create a new `Client` backed purely in-memory. /// This will ignore all database options in the configuration. pub fn in_memory(config: Config, spec: &Spec, io_channel: IoChannel, cache: Arc>) -> Self { @@ -272,6 +285,8 @@ impl Client { self.queue.mark_as_bad(&bad); self.queue.mark_as_good(&good); + + self.notify(|listener| listener.new_headers(&good)); } /// Get a report about blocks imported. @@ -327,6 +342,14 @@ impl Client { Arc::new(v) } + + fn notify(&self, f: F) { + for listener in &*self.listeners.read() { + if let Some(listener) = listener.upgrade() { + f(&*listener) + } + } + } } impl LightChainClient for Client { diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 01f32edf7..0a9bff8d7 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -21,7 +21,8 @@ use util::{H256, Bytes}; #[ipc] pub trait ChainNotify : Send + Sync { /// fires when chain has new blocks. - fn new_blocks(&self, + fn new_blocks( + &self, _imported: Vec, _invalid: Vec, _enacted: Vec, @@ -29,7 +30,8 @@ pub trait ChainNotify : Send + Sync { _sealed: Vec, // Block bytes. _proposed: Vec, - _duration: u64) { + _duration: u64, + ) { // does nothing by default } diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index efd618ffb..45285fa4a 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -172,7 +172,7 @@ usage! { or |c: &Config| otry!(c.rpc).interface.clone(), flag_jsonrpc_cors: Option = None, or |c: &Config| otry!(c.rpc).cors.clone().map(Some), - flag_jsonrpc_apis: String = "web3,eth,net,parity,traces,rpc,secretstore", + flag_jsonrpc_apis: String = "web3,eth,pubsub,net,parity,traces,rpc,secretstore", or |c: &Config| otry!(c.rpc).apis.as_ref().map(|vec| vec.join(",")), flag_jsonrpc_hosts: String = "none", or |c: &Config| otry!(c.rpc).hosts.as_ref().map(|vec| vec.join(",")), @@ -186,7 +186,7 @@ usage! { or |c: &Config| otry!(c.websockets).port.clone(), flag_ws_interface: String = "local", or |c: &Config| otry!(c.websockets).interface.clone(), - flag_ws_apis: String = "web3,eth,net,parity,traces,rpc,secretstore", + flag_ws_apis: String = "web3,eth,pubsub,net,parity,traces,rpc,secretstore", or |c: &Config| otry!(c.websockets).apis.as_ref().map(|vec| vec.join(",")), flag_ws_origins: String = "none", or |c: &Config| otry!(c.websockets).origins.as_ref().map(|vec| vec.join(",")), @@ -198,7 +198,7 @@ usage! { or |c: &Config| otry!(c.ipc).disable.clone(), flag_ipc_path: String = if cfg!(windows) { r"\\.\pipe\jsonrpc.ipc" } else { "$BASE/jsonrpc.ipc" }, or |c: &Config| otry!(c.ipc).path.clone(), - flag_ipc_apis: String = "web3,eth,net,parity,parity_accounts,traces,rpc,secretstore", + flag_ipc_apis: String = "web3,eth,pubsub,net,parity,parity_accounts,traces,rpc,secretstore", or |c: &Config| otry!(c.ipc).apis.as_ref().map(|vec| vec.join(",")), // DAPPS diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index bd5ed1f84..34352450a 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -180,7 +180,7 @@ API and Console Options: all (all interfaces) or local (default: {flag_ws_interface}). --ws-apis APIS Specify the APIs available through the WebSockets interface. APIS is a comma-delimited list of API - name. Possible name are web3, eth, net, personal, + name. Possible name are web3, eth, pubsub, net, personal, parity, parity_set, traces, rpc, parity_accounts. (default: {flag_ws_apis}). --ws-origins URL Specify Origin header values allowed to connect. diff --git a/parity/rpc.rs b/parity/rpc.rs index eb8f5c279..ae0e08858 100644 --- a/parity/rpc.rs +++ b/parity/rpc.rs @@ -21,7 +21,7 @@ use dapps; use parity_rpc::informant::{RpcStats, Middleware}; use parity_rpc::{self as rpc, HttpServerError, Metadata, Origin, DomainsValidation}; use helpers::parity_ipc_path; -use jsonrpc_core::{futures, MetaIoHandler}; +use jsonrpc_core::MetaIoHandler; use parity_reactor::TokioRemote; use rpc_apis::{self, ApiSet}; @@ -129,53 +129,13 @@ impl rpc::IpcMetaExtractor for RpcExtractor { } } -struct Sender(rpc::ws::ws::Sender, futures::sync::mpsc::Receiver); - -impl futures::Future for Sender { - type Item = (); - type Error = (); - - fn poll(&mut self) -> futures::Poll { - use self::futures::Stream; - - let item = self.1.poll()?; - match item { - futures::Async::NotReady => { - Ok(futures::Async::NotReady) - }, - futures::Async::Ready(None) => { - Ok(futures::Async::Ready(())) - }, - futures::Async::Ready(Some(val)) => { - if let Err(e) = self.0.send(val) { - warn!("Error sending a subscription update: {:?}", e); - } - self.poll() - }, - } - } -} - -struct WsRpcExtractor { - remote: TokioRemote, -} - -impl WsRpcExtractor { - fn wrap_out(&self, out: rpc::ws::ws::Sender) -> futures::sync::mpsc::Sender { - let (sender, receiver) = futures::sync::mpsc::channel(8); - self.remote.spawn(move |_| Sender(out, receiver)); - sender - } -} - +struct WsRpcExtractor; impl rpc::ws::MetaExtractor for WsRpcExtractor { fn extract(&self, req: &rpc::ws::RequestContext) -> Metadata { let mut metadata = Metadata::default(); let id = req.session_id as u64; metadata.origin = Origin::Ws(id.into()); - metadata.session = Some(Arc::new(rpc::PubSubSession::new( - self.wrap_out(req.out.clone()) - ))); + metadata.session = Some(Arc::new(rpc::PubSubSession::new(req.sender()))); metadata } } @@ -221,9 +181,7 @@ pub fn new_ws( remote.clone(), allowed_origins, allowed_hosts, - WsRpcExtractor { - remote: remote, - }, + WsRpcExtractor, WsStats { stats: deps.stats.clone(), }, diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 8b89c179f..d738d1eee 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -18,7 +18,7 @@ use std::cmp::PartialEq; use std::collections::BTreeMap; use std::collections::HashSet; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Weak}; pub use parity_rpc::SignerService; @@ -46,6 +46,8 @@ pub enum Api { Net, /// Eth (Safe) Eth, + /// Eth Pub-Sub (Safe) + EthPubSub, /// Geth-compatible "personal" API (DEPRECATED; only used in `--geth` mode.) Personal, /// Signer - Confirm transactions in Signer (UNSAFE: Passwords, List of transactions) @@ -74,6 +76,7 @@ impl FromStr for Api { "web3" => Ok(Web3), "net" => Ok(Net), "eth" => Ok(Eth), + "pubsub" => Ok(EthPubSub), "personal" => Ok(Personal), "signer" => Ok(Signer), "parity" => Ok(Parity), @@ -153,6 +156,7 @@ fn to_modules(apis: &[Api]) -> BTreeMap { Api::Web3 => ("web3", "1.0"), Api::Net => ("net", "1.0"), Api::Eth => ("eth", "1.0"), + Api::EthPubSub => ("pubsub", "1.0"), Api::Personal => ("personal", "1.0"), Api::Signer => ("signer", "1.0"), Api::Parity => ("parity", "1.0"), @@ -254,6 +258,11 @@ impl FullDependencies { add_signing_methods!(EthSigning, handler, self); } }, + Api::EthPubSub => { + let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); + self.client.add_notify(client.handler()); + handler.extend_with(client.to_delegate()); + }, Api::Personal => { handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate()); }, @@ -410,6 +419,13 @@ impl Dependencies for LightDependencies { handler.extend_with(EthFilter::to_delegate(client)); add_signing_methods!(EthSigning, handler, self); }, + Api::EthPubSub => { + let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); + self.client.add_listener( + Arc::downgrade(&client.handler()) as Weak<::light::client::LightChainNotify> + ); + handler.extend_with(EthPubSub::to_delegate(client)); + }, Api::Personal => { let secret_store = Some(self.secret_store.clone()); handler.extend_with(PersonalClient::new(&secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate()); @@ -471,7 +487,7 @@ impl ApiSet { pub fn list_apis(&self) -> HashSet { let mut public_list = vec![ - Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Rpc, Api::SecretStore, ].into_iter().collect(); match *self { ApiSet::List(ref apis) => apis.clone(), @@ -522,6 +538,7 @@ mod test { assert_eq!(Api::Web3, "web3".parse().unwrap()); assert_eq!(Api::Net, "net".parse().unwrap()); assert_eq!(Api::Eth, "eth".parse().unwrap()); + assert_eq!(Api::EthPubSub, "pubsub".parse().unwrap()); assert_eq!(Api::Personal, "personal".parse().unwrap()); assert_eq!(Api::Signer, "signer".parse().unwrap()); assert_eq!(Api::Parity, "parity".parse().unwrap()); @@ -547,7 +564,7 @@ mod test { fn test_api_set_unsafe_context() { let expected = vec![ // make sure this list contains only SAFE methods - Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore ].into_iter().collect(); assert_eq!(ApiSet::UnsafeContext.list_apis(), expected); } @@ -556,7 +573,7 @@ mod test { fn test_api_set_ipc_context() { let expected = vec![ // safe - Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, // semi-safe Api::ParityAccounts ].into_iter().collect(); @@ -567,7 +584,7 @@ mod test { fn test_api_set_safe_context() { let expected = vec![ // safe - Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, // semi-safe Api::ParityAccounts, // Unsafe @@ -579,7 +596,7 @@ mod test { #[test] fn test_all_apis() { assert_eq!("all".parse::().unwrap(), ApiSet::List(vec![ - Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, Api::ParityAccounts, Api::ParitySet, Api::Signer, Api::Personal @@ -589,7 +606,7 @@ mod test { #[test] fn test_all_without_personal_apis() { assert_eq!("personal,all,-personal".parse::().unwrap(), ApiSet::List(vec![ - Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, Api::ParityAccounts, Api::ParitySet, Api::Signer, ].into_iter().collect())); @@ -598,7 +615,7 @@ mod test { #[test] fn test_safe_parsing() { assert_eq!("safe".parse::().unwrap(), ApiSet::List(vec![ - Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, + Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::EthPubSub, Api::Parity, Api::Traces, Api::Rpc, Api::SecretStore, ].into_iter().collect())); } } diff --git a/parity/signer.rs b/parity/signer.rs index 1ab53ea69..7f800f0e0 100644 --- a/parity/signer.rs +++ b/parity/signer.rs @@ -101,7 +101,7 @@ pub fn execute(cmd: Configuration) -> Result { } pub fn generate_token_and_url(conf: &Configuration) -> Result { - let code = generate_new_token(conf.signer_path.clone()).map_err(|err| format!("Error generating token: {:?}", err))?; + let code = generate_new_token(conf.signer_path.clone()).map_err(|err| format!("Error generating token: {}", err))?; let auth_url = format!("http://{}:{}/#/auth?token={}", conf.interface, conf.port, code); // And print in to the console Ok(NewToken { diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs new file mode 100644 index 000000000..202f2592a --- /dev/null +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -0,0 +1,153 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Eth PUB-SUB rpc implementation. + +use std::sync::Arc; +use std::collections::BTreeMap; + +use futures::{self, BoxFuture, Future}; +use jsonrpc_core::Error; +use jsonrpc_macros::Trailing; +use jsonrpc_macros::pubsub::{Sink, Subscriber}; +use jsonrpc_pubsub::SubscriptionId; + +use v1::helpers::{errors, Subscribers}; +use v1::metadata::Metadata; +use v1::traits::EthPubSub; +use v1::types::{pubsub, RichHeader}; + +use ethcore::encoded; +use ethcore::client::{BlockChainClient, ChainNotify, BlockId}; +use light::client::{LightChainClient, LightChainNotify}; +use parity_reactor::Remote; +use util::{Mutex, H256, Bytes}; + +/// Eth PubSub implementation. +pub struct EthPubSubClient { + handler: Arc>, + heads_subscribers: Arc>>>, +} + +impl EthPubSubClient { + /// Creates new `EthPubSubClient`. + pub fn new(client: Arc, remote: Remote) -> Self { + let heads_subscribers = Arc::new(Mutex::new(Subscribers::default())); + EthPubSubClient { + handler: Arc::new(ChainNotificationHandler { + client: client, + remote: remote, + heads_subscribers: heads_subscribers.clone(), + }), + heads_subscribers: heads_subscribers, + } + } + + /// Returns a chain notification handler. + pub fn handler(&self) -> Arc> { + self.handler.clone() + } +} + +/// PubSub Notification handler. +pub struct ChainNotificationHandler { + client: Arc, + remote: Remote, + heads_subscribers: Arc>>>, +} + +impl ChainNotificationHandler { + fn notify(&self, blocks: Vec<(encoded::Header, BTreeMap)>) { + for subscriber in self.heads_subscribers.lock().values() { + for &(ref block, ref extra_info) in &blocks { + self.remote.spawn(subscriber + .notify(pubsub::Result::Header(RichHeader { + inner: block.into(), + extra_info: extra_info.clone(), + })) + .map(|_| ()) + .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) + ); + } + } + } +} + +impl LightChainNotify for ChainNotificationHandler { + fn new_headers( + &self, + headers: &[H256], + ) { + let blocks = headers + .iter() + .filter_map(|hash| self.client.block_header(BlockId::Hash(*hash))) + .map(|header| (header, Default::default())) + .collect(); + + self.notify(blocks); + } +} + +impl ChainNotify for ChainNotificationHandler { + fn new_blocks( + &self, + _imported: Vec, + _invalid: Vec, + enacted: Vec, + _retracted: Vec, + _sealed: Vec, + // Block bytes. + _proposed: Vec, + _duration: u64, + ) { + const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; + let blocks = enacted + .into_iter() + .filter_map(|hash| self.client.block_header(BlockId::Hash(hash))) + .map(|header| { + let hash = header.hash(); + (header, self.client.block_extra_info(BlockId::Hash(hash)).expect(EXTRA_INFO_PROOF)) + }) + .collect(); + self.notify(blocks); + } +} + +impl EthPubSub for EthPubSubClient { + type Metadata = Metadata; + + fn subscribe( + &self, + _meta: Metadata, + subscriber: Subscriber, + kind: pubsub::Kind, + params: Trailing, + ) { + match (kind, params.0) { + (pubsub::Kind::NewHeads, pubsub::Params::None) => { + self.heads_subscribers.lock().push(subscriber) + }, + _ => { + let _ = subscriber.reject(errors::unimplemented(None)); + }, + } + } + + fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture { + let res = self.heads_subscribers.lock().remove(&id).is_some(); + futures::future::ok(res).boxed() + } +} diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index a8691b32b..d3e0554c2 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -18,6 +18,7 @@ mod eth; mod eth_filter; +mod eth_pubsub; mod net; mod parity; mod parity_accounts; @@ -36,6 +37,7 @@ pub mod light; pub use self::eth::{EthClient, EthClientOptions}; pub use self::eth_filter::EthFilterClient; +pub use self::eth_pubsub::EthPubSubClient; pub use self::net::NetClient; pub use self::parity::ParityClient; pub use self::parity_accounts::ParityAccountsClient; diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 9c3673ee4..ae91af54c 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -48,7 +48,7 @@ use v1::types::{ TransactionStats, LocalTransactionStatus, BlockNumber, ConsensusCapability, VersionInfo, OperationsInfo, DappId, ChainStatus, - AccountInfo, HwAccountInfo, Header, RichHeader + AccountInfo, HwAccountInfo, RichHeader }; /// Parity implementation. @@ -411,25 +411,7 @@ impl Parity for ParityClient where }; future::ok(RichHeader { - inner: Header { - hash: Some(encoded.hash().into()), - size: Some(encoded.rlp().as_raw().len().into()), - parent_hash: encoded.parent_hash().into(), - uncles_hash: encoded.uncles_hash().into(), - author: encoded.author().into(), - miner: encoded.author().into(), - state_root: encoded.state_root().into(), - transactions_root: encoded.transactions_root().into(), - receipts_root: encoded.receipts_root().into(), - number: Some(encoded.number().into()), - gas_used: encoded.gas_used().into(), - gas_limit: encoded.gas_limit().into(), - logs_bloom: encoded.log_bloom().into(), - timestamp: encoded.timestamp().into(), - difficulty: encoded.difficulty().into(), - seal_fields: encoded.seal().into_iter().map(Into::into).collect(), - extra_data: Bytes::new(encoded.extra_data()), - }, + inner: encoded.into(), extra_info: client.block_extra_info(id).expect(EXTRA_INFO_PROOF), }).boxed() } diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index 59aef84b3..bd8675196 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -58,7 +58,7 @@ pub mod traits; pub mod tests; pub mod types; -pub use self::traits::{Web3, Eth, EthFilter, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, PubSub, Signer, Personal, Traces, Rpc, SecretStore}; +pub use self::traits::{Web3, Eth, EthFilter, EthPubSub, EthSigning, Net, Parity, ParityAccounts, ParitySet, ParitySigning, PubSub, Signer, Personal, Traces, Rpc, SecretStore}; pub use self::impls::*; pub use self::helpers::{SigningQueue, SignerService, ConfirmationsQueue, NetworkSettings, block_import, informant, dispatch}; pub use self::metadata::Metadata; diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs new file mode 100644 index 000000000..ae1165068 --- /dev/null +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -0,0 +1,104 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::sync::Arc; + +use jsonrpc_core::MetaIoHandler; +use jsonrpc_core::futures::{self, Stream, Future}; +use jsonrpc_pubsub::Session; + +use v1::{EthPubSub, EthPubSubClient, Metadata}; + +use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify}; +use parity_reactor::EventLoop; + +#[test] +fn should_subscribe_to_new_heads() { + // given + let el = EventLoop::spawn(); + let mut client = TestBlockChainClient::new(); + // Insert some blocks + client.add_blocks(3, EachBlockWith::Nothing); + let h3 = client.block_hash_delta_minus(1); + let h2 = client.block_hash_delta_minus(2); + let h1 = client.block_hash_delta_minus(3); + + let pubsub = EthPubSubClient::new(Arc::new(client), el.remote()); + let handler = pubsub.handler(); + let pubsub = pubsub.to_delegate(); + + let mut io = MetaIoHandler::default(); + io.extend_with(pubsub); + + let mut metadata = Metadata::default(); + let (sender, receiver) = futures::sync::mpsc::channel(8); + metadata.session = Some(Arc::new(Session::new(sender))); + + // Subscribe + let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newHeads"], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":1,"id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); + + // Check notifications + handler.new_blocks(vec![], vec![], vec![h1], vec![], vec![], vec![], 0); + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#; + assert_eq!(res, Some(response.into())); + + // Notify about two blocks + handler.new_blocks(vec![], vec![], vec![h2, h3], vec![], vec![], vec![], 0); + + // Receive both + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x2","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x2","parentHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#; + assert_eq!(res, Some(response.into())); + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x3","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0xdf04a98bb0c6fa8441bd429822f65a46d0cb553f6bcef602b973e65c81497f8e","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x3","parentHash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#; + assert_eq!(res, Some(response.into())); + + // And unsubscribe + let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": [1], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned())); + + let (res, _receiver) = receiver.into_future().wait().unwrap(); + assert_eq!(res, None); +} + +#[test] +fn should_return_unimplemented() { + // given + let el = EventLoop::spawn(); + let client = TestBlockChainClient::new(); + let pubsub = EthPubSubClient::new(Arc::new(client), el.remote()); + let pubsub = pubsub.to_delegate(); + + let mut io = MetaIoHandler::default(); + io.extend_with(pubsub); + + let mut metadata = Metadata::default(); + let (sender, _receiver) = futures::sync::mpsc::channel(8); + metadata.session = Some(Arc::new(Session::new(sender))); + + // Subscribe + let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"This request is not implemented yet. Please create an issue on Github repo."},"id":1}"#; + let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions"], "id": 1}"#; + assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); + let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["logs"], "id": 1}"#; + assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); + let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["syncing"], "id": 1}"#; + assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); +} diff --git a/rpc/src/v1/tests/mocked/mod.rs b/rpc/src/v1/tests/mocked/mod.rs index e5a459633..ae51c2be6 100644 --- a/rpc/src/v1/tests/mocked/mod.rs +++ b/rpc/src/v1/tests/mocked/mod.rs @@ -18,6 +18,7 @@ //! method calls properly. mod eth; +mod eth_pubsub; mod manage_network; mod net; mod parity; diff --git a/rpc/src/v1/traits/eth_pubsub.rs b/rpc/src/v1/traits/eth_pubsub.rs new file mode 100644 index 000000000..794d12768 --- /dev/null +++ b/rpc/src/v1/traits/eth_pubsub.rs @@ -0,0 +1,42 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Eth PUB-SUB rpc interface. + +use jsonrpc_core::Error; +use jsonrpc_macros::Trailing; +use jsonrpc_macros::pubsub::Subscriber; +use jsonrpc_pubsub::SubscriptionId; +use futures::BoxFuture; + +use v1::types::pubsub; + +build_rpc_trait! { + /// Eth PUB-SUB rpc interface. + pub trait EthPubSub { + type Metadata; + + #[pubsub(name = "eth_subscription")] { + /// Subscribe to Eth subscription. + #[rpc(name = "eth_subscribe")] + fn subscribe(&self, Self::Metadata, Subscriber, pubsub::Kind, Trailing); + + /// Unsubscribe from existing Eth subscription. + #[rpc(name = "eth_unsubscribe")] + fn unsubscribe(&self, SubscriptionId) -> BoxFuture; + } + } +} diff --git a/rpc/src/v1/traits/mod.rs b/rpc/src/v1/traits/mod.rs index 9cef58ee7..528463a4a 100644 --- a/rpc/src/v1/traits/mod.rs +++ b/rpc/src/v1/traits/mod.rs @@ -18,6 +18,7 @@ pub mod web3; pub mod eth; +pub mod eth_pubsub; pub mod eth_signing; pub mod net; pub mod parity; @@ -33,6 +34,7 @@ pub mod secretstore; pub use self::web3::Web3; pub use self::eth::{Eth, EthFilter}; +pub use self::eth_pubsub::EthPubSub; pub use self::eth_signing::EthSigning; pub use self::net::Net; pub use self::parity::Parity; diff --git a/rpc/src/v1/types/block.rs b/rpc/src/v1/types/block.rs index 4077d7221..50a081356 100644 --- a/rpc/src/v1/types/block.rs +++ b/rpc/src/v1/types/block.rs @@ -16,6 +16,9 @@ use std::ops::Deref; use std::collections::BTreeMap; + +use ethcore::encoded::Header as EthHeader; + use serde::{Serialize, Serializer}; use serde::ser::Error; use v1::types::{Bytes, Transaction, H160, H256, H2048, U256}; @@ -97,7 +100,7 @@ pub struct Block { } /// Block header representation. -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, PartialEq, Eq)] pub struct Header { /// Hash of the block pub hash: Option, @@ -146,6 +149,36 @@ pub struct Header { pub size: Option, } +impl From for Header { + fn from(h: EthHeader) -> Self { + (&h).into() + } +} + +impl<'a> From<&'a EthHeader> for Header { + fn from(h: &'a EthHeader) -> Self { + Header { + hash: Some(h.hash().into()), + size: Some(h.rlp().as_raw().len().into()), + parent_hash: h.parent_hash().into(), + uncles_hash: h.uncles_hash().into(), + author: h.author().into(), + miner: h.author().into(), + state_root: h.state_root().into(), + transactions_root: h.transactions_root().into(), + receipts_root: h.receipts_root().into(), + number: Some(h.number().into()), + gas_used: h.gas_used().into(), + gas_limit: h.gas_limit().into(), + logs_bloom: h.log_bloom().into(), + timestamp: h.timestamp().into(), + difficulty: h.difficulty().into(), + seal_fields: h.seal().into_iter().map(Into::into).collect(), + extra_data: h.extra_data().into(), + } + } +} + /// Block representation with additional info. pub type RichBlock = Rich; @@ -153,7 +186,7 @@ pub type RichBlock = Rich; pub type RichHeader = Rich
; /// Value representation with additional info -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct Rich { /// Standard value. pub inner: T, diff --git a/rpc/src/v1/types/filter.rs b/rpc/src/v1/types/filter.rs index 8ccac7efd..cd1d43fcb 100644 --- a/rpc/src/v1/types/filter.rs +++ b/rpc/src/v1/types/filter.rs @@ -22,7 +22,7 @@ use ethcore::client::BlockId; use v1::types::{BlockNumber, H160, H256, Log}; /// Variadic value -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub enum VariadicValue where T: Deserialize { /// Single Single(T), @@ -53,7 +53,7 @@ pub type FilterAddress = VariadicValue; pub type Topic = VariadicValue; /// Filter -#[derive(Debug, PartialEq, Clone, Deserialize)] +#[derive(Debug, PartialEq, Clone, Deserialize, Eq, Hash)] #[serde(deny_unknown_fields)] pub struct Filter { /// From Block diff --git a/rpc/src/v1/types/mod.rs b/rpc/src/v1/types/mod.rs index 7d0ae0541..97f2eb2ae 100644 --- a/rpc/src/v1/types/mod.rs +++ b/rpc/src/v1/types/mod.rs @@ -43,6 +43,8 @@ mod transaction_condition; mod uint; mod work; +pub mod pubsub; + pub use self::account_info::{AccountInfo, HwAccountInfo}; pub use self::bytes::Bytes; pub use self::block::{RichBlock, Block, BlockTransactions, Header, RichHeader, Rich}; diff --git a/rpc/src/v1/types/pubsub.rs b/rpc/src/v1/types/pubsub.rs new file mode 100644 index 000000000..8bc4f9079 --- /dev/null +++ b/rpc/src/v1/types/pubsub.rs @@ -0,0 +1,114 @@ +// Copyright 2015-2017 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Pub-Sub types. + +use serde::{Serialize, Serializer}; +use v1::types::{RichHeader, Filter}; + +/// Subscription result. +#[derive(Debug, PartialEq, Eq)] +pub enum Result { + /// New block header. + Header(RichHeader), +} + +impl Serialize for Result { + fn serialize(&self, serializer: S) -> ::std::result::Result + where S: Serializer + { + match *self { + Result::Header(ref header) => header.serialize(serializer), + } + } +} + +/// Subscription kind. +#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)] +#[serde(deny_unknown_fields)] +pub enum Kind { + /// New block headers subscription. + #[serde(rename="newHeads")] + NewHeads, + /// Logs subscription. + #[serde(rename="logs")] + Logs, + /// New Pending Transactions subscription. + #[serde(rename="newPendingTransactions")] + NewPendingTransactions, + /// Node syncing status subscription. + #[serde(rename="syncing")] + Syncing, +} + +/// Subscription kind. +#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)] +#[serde(deny_unknown_fields)] +pub enum Params { + /// No parameters passed. + None, + /// Log parameters. + Logs(Filter), +} + +impl Default for Params { + fn default() -> Self { + Params::None + } +} + +#[cfg(test)] +mod tests { + use serde_json; + use super::{Result, Kind}; + use v1::types::{RichHeader, Header}; + + #[test] + fn should_deserialize_kind() { + assert_eq!(serde_json::from_str::(r#""newHeads""#).unwrap(), Kind::NewHeads); + assert_eq!(serde_json::from_str::(r#""logs""#).unwrap(), Kind::Logs); + assert_eq!(serde_json::from_str::(r#""newPendingTransactions""#).unwrap(), Kind::NewPendingTransactions); + assert_eq!(serde_json::from_str::(r#""syncing""#).unwrap(), Kind::Syncing); + } + + #[test] + fn should_serialize_header() { + let header = Result::Header(RichHeader { + extra_info: Default::default(), + inner: Header { + hash: Some(Default::default()), + parent_hash: Default::default(), + uncles_hash: Default::default(), + author: Default::default(), + miner: Default::default(), + state_root: Default::default(), + transactions_root: Default::default(), + receipts_root: Default::default(), + number: Some(Default::default()), + gas_used: Default::default(), + gas_limit: Default::default(), + extra_data: Default::default(), + logs_bloom: Default::default(), + timestamp: Default::default(), + difficulty: Default::default(), + seal_fields: vec![Default::default(), Default::default()], + size: Some(69.into()), + }, + }); + let expected = r#"{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x0","extraData":"0x","gasLimit":"0x0","gasUsed":"0x0","hash":"0x0000000000000000000000000000000000000000000000000000000000000000","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x0","parentHash":"0x0000000000000000000000000000000000000000000000000000000000000000","receiptsRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","sealFields":["0x","0x"],"sha3Uncles":"0x0000000000000000000000000000000000000000000000000000000000000000","size":"0x45","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","timestamp":"0x0","transactionsRoot":"0x0000000000000000000000000000000000000000000000000000000000000000"}"#; + assert_eq!(serde_json::to_string(&header).unwrap(), expected); + } +}