From 57479dac27b1233127d888f7552763167e395951 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 13 Jun 2017 17:36:39 +0200 Subject: [PATCH] Use randomized subscription ids for PubSub (#5756) * Use randomized subscription ids for PubSub * Use H64 instead of H128. * iflet instead of match. * Adding 0x. --- rpc/src/v1/helpers/subscribers.rs | 71 +++++++++++++++------- rpc/src/v1/helpers/subscription_manager.rs | 14 ++++- rpc/src/v1/impls/eth_pubsub.rs | 14 ++++- rpc/src/v1/impls/pubsub.rs | 14 ++++- rpc/src/v1/tests/mocked/eth_pubsub.rs | 14 ++--- rpc/src/v1/tests/mocked/pubsub.rs | 12 ++-- 6 files changed, 98 insertions(+), 41 deletions(-) diff --git a/rpc/src/v1/helpers/subscribers.rs b/rpc/src/v1/helpers/subscribers.rs index a67dbb464..e1fc42ae4 100644 --- a/rpc/src/v1/helpers/subscribers.rs +++ b/rpc/src/v1/helpers/subscribers.rs @@ -16,45 +16,79 @@ //! A map of subscribers. -use std::ops; +use std::{ops, str}; use std::collections::HashMap; use jsonrpc_macros::pubsub::{Subscriber, Sink, SubscriptionId}; +use rand::{Rng, StdRng}; +use v1::types::H64; -#[derive(Clone, Debug)] + +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub struct Id(H64); +impl str::FromStr for Id { + type Err = String; + + fn from_str(s: &str) -> Result { + if s.starts_with("0x") { + Ok(Id(s[2..].parse().map_err(|e| format!("{}", e))?)) + } else { + Err("The id must start with 0x".into()) + } + } +} +impl Id { + pub fn as_string(&self) -> String { + format!("0x{:?}", self.0) + } +} + +#[derive(Clone)] pub struct Subscribers { - next_id: u64, - subscriptions: HashMap, + rand: StdRng, + subscriptions: HashMap, } impl Default for Subscribers { fn default() -> Self { Subscribers { - next_id: 0, + rand: StdRng::new().expect("Valid random source is required."), subscriptions: HashMap::new(), } } } impl Subscribers { - fn next_id(&mut self) -> u64 { - self.next_id += 1; - self.next_id + /// Create a new Subscribers with given random source. + #[cfg(test)] + pub fn new_test() -> Self { + Subscribers { + rand: ::rand::SeedableRng::from_seed([0usize].as_ref()), + subscriptions: HashMap::new(), + } + } + + fn next_id(&mut self) -> Id { + let mut data = H64::default(); + self.rand.fill_bytes(&mut data.0); + Id(data) } /// Insert new subscription and return assigned id. pub fn insert(&mut self, val: T) -> SubscriptionId { let id = self.next_id(); - debug!(target: "pubsub", "Adding subscription id={}", id); + debug!(target: "pubsub", "Adding subscription id={:?}", id); + let s = id.as_string(); self.subscriptions.insert(id, val); - SubscriptionId::Number(id) + SubscriptionId::String(s) } /// Removes subscription with given id and returns it (if any). pub fn remove(&mut self, id: &SubscriptionId) -> Option { trace!(target: "pubsub", "Removing subscription id={:?}", id); match *id { - SubscriptionId::Number(id) => { - self.subscriptions.remove(&id) + SubscriptionId::String(ref id) => match id.parse() { + Ok(id) => self.subscriptions.remove(&id), + Err(_) => None, }, _ => None, } @@ -65,20 +99,15 @@ impl Subscribers> { /// Assigns id and adds a subscriber to the list. pub fn push(&mut self, sub: Subscriber) { let id = self.next_id(); - match sub.assign_id(SubscriptionId::Number(id)) { - Ok(sink) => { - debug!(target: "pubsub", "Adding subscription id={:?}", id); - self.subscriptions.insert(id, sink); - }, - Err(_) => { - self.next_id -= 1; - }, + if let Ok(sink) = sub.assign_id(SubscriptionId::String(id.as_string())) { + debug!(target: "pubsub", "Adding subscription id={:?}", id); + self.subscriptions.insert(id, sink); } } } impl ops::Deref for Subscribers { - type Target = HashMap; + type Target = HashMap; fn deref(&self) -> &Self::Target { &self.subscriptions diff --git a/rpc/src/v1/helpers/subscription_manager.rs b/rpc/src/v1/helpers/subscription_manager.rs index 6f97b3462..1cf067901 100644 --- a/rpc/src/v1/helpers/subscription_manager.rs +++ b/rpc/src/v1/helpers/subscription_manager.rs @@ -54,6 +54,14 @@ impl> GenericPollManager { } } + /// Creates new poll manager with deterministic ids. + #[cfg(test)] + pub fn new_test(rpc: MetaIoHandler) -> Self { + let mut manager = Self::new(rpc); + manager.subscribers = Subscribers::new_test(); + manager + } + /// Subscribes to update from polling given method. pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params) -> (SubscriptionId, mpsc::Receiver>) @@ -81,7 +89,7 @@ impl> GenericPollManager { for (id, subscription) in self.subscribers.iter() { let call = core::MethodCall { jsonrpc: Some(core::Version::V2), - id: core::Id::Num(*id as u64), + id: core::Id::Str(id.as_string()), method: subscription.method.clone(), params: Some(subscription.params.clone()), }; @@ -139,7 +147,7 @@ mod tests { Ok(Value::String("world".into())) } }); - GenericPollManager::new(io) + GenericPollManager::new_test(io) } #[test] @@ -148,7 +156,7 @@ mod tests { let mut el = reactor::Core::new().unwrap(); let mut poll_manager = poll_manager(); let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None); - assert_eq!(id, SubscriptionId::Number(1)); + assert_eq!(id, SubscriptionId::String("0x416d77337e24399d".into())); // then poll_manager.tick().wait().unwrap(); diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index ce839428b..00df24731 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -48,14 +48,22 @@ impl EthPubSubClient { let heads_subscribers = Arc::new(Mutex::new(Subscribers::default())); EthPubSubClient { handler: Arc::new(ChainNotificationHandler { - client: client, - remote: remote, + client, + remote, heads_subscribers: heads_subscribers.clone(), }), - heads_subscribers: heads_subscribers, + heads_subscribers, } } + /// Creates new `EthPubSubCient` with deterministic subscription ids. + #[cfg(test)] + pub fn new_test(client: Arc, remote: Remote) -> Self { + let client = Self::new(client, remote); + *client.heads_subscribers.lock() = Subscribers::new_test(); + client + } + /// Returns a chain notification handler. pub fn handler(&self) -> Arc> { self.handler.clone() diff --git a/rpc/src/v1/impls/pubsub.rs b/rpc/src/v1/impls/pubsub.rs index 215141e84..eb39e5e89 100644 --- a/rpc/src/v1/impls/pubsub.rs +++ b/rpc/src/v1/impls/pubsub.rs @@ -55,12 +55,22 @@ impl> PubSubClient { ); PubSubClient { - poll_manager: poll_manager, - remote: remote, + poll_manager, + remote, } } } +impl PubSubClient { + /// Creates new `PubSubClient` with deterministic ids. + #[cfg(test)] + pub fn new_test(rpc: MetaIoHandler, remote: Remote) -> Self { + let client = Self::new(MetaIoHandler::with_middleware(Default::default()), remote); + *client.poll_manager.write() = GenericPollManager::new_test(rpc); + client + } +} + impl> PubSub for PubSubClient { type Metadata = Metadata; diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index ae1165068..b31d78796 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -36,7 +36,7 @@ fn should_subscribe_to_new_heads() { let h2 = client.block_hash_delta_minus(2); let h1 = client.block_hash_delta_minus(3); - let pubsub = EthPubSubClient::new(Arc::new(client), el.remote()); + let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote()); let handler = pubsub.handler(); let pubsub = pubsub.to_delegate(); @@ -49,13 +49,13 @@ fn should_subscribe_to_new_heads() { // Subscribe let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newHeads"], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":1,"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#; assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Check notifications handler.new_blocks(vec![], vec![], vec![h1], vec![], vec![], vec![], 0); let (res, receiver) = receiver.into_future().wait().unwrap(); - let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#; + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#; assert_eq!(res, Some(response.into())); // Notify about two blocks @@ -63,14 +63,14 @@ fn should_subscribe_to_new_heads() { // Receive both let (res, receiver) = receiver.into_future().wait().unwrap(); - let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x2","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x2","parentHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#; + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x2","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x2","parentHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#; assert_eq!(res, Some(response.into())); let (res, receiver) = receiver.into_future().wait().unwrap(); - let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x3","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0xdf04a98bb0c6fa8441bd429822f65a46d0cb553f6bcef602b973e65c81497f8e","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x3","parentHash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#; + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x3","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0xdf04a98bb0c6fa8441bd429822f65a46d0cb553f6bcef602b973e65c81497f8e","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x3","parentHash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#; assert_eq!(res, Some(response.into())); // And unsubscribe - let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": [1], "id": 1}"#; + let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": ["0x416d77337e24399d"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned())); @@ -83,7 +83,7 @@ fn should_return_unimplemented() { // given let el = EventLoop::spawn(); let client = TestBlockChainClient::new(); - let pubsub = EthPubSubClient::new(Arc::new(client), el.remote()); + let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote()); let pubsub = pubsub.to_delegate(); let mut io = MetaIoHandler::default(); diff --git a/rpc/src/v1/tests/mocked/pubsub.rs b/rpc/src/v1/tests/mocked/pubsub.rs index b7c8963e5..99b34366c 100644 --- a/rpc/src/v1/tests/mocked/pubsub.rs +++ b/rpc/src/v1/tests/mocked/pubsub.rs @@ -42,7 +42,7 @@ fn should_subscribe_to_a_method() { // given let el = EventLoop::spawn(); let rpc = rpc(); - let pubsub = PubSubClient::new(rpc, el.remote()).to_delegate(); + let pubsub = PubSubClient::new_test(rpc, el.remote()).to_delegate(); let mut io = MetaIoHandler::default(); io.extend_with(pubsub); @@ -53,20 +53,22 @@ fn should_subscribe_to_a_method() { // Subscribe let request = r#"{"jsonrpc": "2.0", "method": "parity_subscribe", "params": ["hello", []], "id": 1}"#; - let response = r#"{"jsonrpc":"2.0","result":1,"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#; assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Check notifications let (res, receiver) = receiver.into_future().wait().unwrap(); - let response = r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"hello","subscription":1}}"#; + let response = + r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"hello","subscription":"0x416d77337e24399d"}}"#; assert_eq!(res, Some(response.into())); let (res, receiver) = receiver.into_future().wait().unwrap(); - let response = r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"world","subscription":1}}"#; + let response = + r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"world","subscription":"0x416d77337e24399d"}}"#; assert_eq!(res, Some(response.into())); // And unsubscribe - let request = r#"{"jsonrpc": "2.0", "method": "parity_unsubscribe", "params": [1], "id": 1}"#; + let request = r#"{"jsonrpc": "2.0", "method": "parity_unsubscribe", "params": ["0x416d77337e24399d"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned()));