Use randomized subscription ids for PubSub (#5756)
* Use randomized subscription ids for PubSub * Use H64 instead of H128. * iflet instead of match. * Adding 0x.
This commit is contained in:
parent
6afe0b0612
commit
57479dac27
@ -16,45 +16,79 @@
|
||||
|
||||
//! A map of subscribers.
|
||||
|
||||
use std::ops;
|
||||
use std::{ops, str};
|
||||
use std::collections::HashMap;
|
||||
use jsonrpc_macros::pubsub::{Subscriber, Sink, SubscriptionId};
|
||||
use rand::{Rng, StdRng};
|
||||
use v1::types::H64;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
|
||||
pub struct Id(H64);
|
||||
impl str::FromStr for Id {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if s.starts_with("0x") {
|
||||
Ok(Id(s[2..].parse().map_err(|e| format!("{}", e))?))
|
||||
} else {
|
||||
Err("The id must start with 0x".into())
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Id {
|
||||
pub fn as_string(&self) -> String {
|
||||
format!("0x{:?}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Subscribers<T> {
|
||||
next_id: u64,
|
||||
subscriptions: HashMap<u64, T>,
|
||||
rand: StdRng,
|
||||
subscriptions: HashMap<Id, T>,
|
||||
}
|
||||
|
||||
impl<T> Default for Subscribers<T> {
|
||||
fn default() -> Self {
|
||||
Subscribers {
|
||||
next_id: 0,
|
||||
rand: StdRng::new().expect("Valid random source is required."),
|
||||
subscriptions: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Subscribers<T> {
|
||||
fn next_id(&mut self) -> u64 {
|
||||
self.next_id += 1;
|
||||
self.next_id
|
||||
/// Create a new Subscribers with given random source.
|
||||
#[cfg(test)]
|
||||
pub fn new_test() -> Self {
|
||||
Subscribers {
|
||||
rand: ::rand::SeedableRng::from_seed([0usize].as_ref()),
|
||||
subscriptions: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn next_id(&mut self) -> Id {
|
||||
let mut data = H64::default();
|
||||
self.rand.fill_bytes(&mut data.0);
|
||||
Id(data)
|
||||
}
|
||||
|
||||
/// Insert new subscription and return assigned id.
|
||||
pub fn insert(&mut self, val: T) -> SubscriptionId {
|
||||
let id = self.next_id();
|
||||
debug!(target: "pubsub", "Adding subscription id={}", id);
|
||||
debug!(target: "pubsub", "Adding subscription id={:?}", id);
|
||||
let s = id.as_string();
|
||||
self.subscriptions.insert(id, val);
|
||||
SubscriptionId::Number(id)
|
||||
SubscriptionId::String(s)
|
||||
}
|
||||
|
||||
/// Removes subscription with given id and returns it (if any).
|
||||
pub fn remove(&mut self, id: &SubscriptionId) -> Option<T> {
|
||||
trace!(target: "pubsub", "Removing subscription id={:?}", id);
|
||||
match *id {
|
||||
SubscriptionId::Number(id) => {
|
||||
self.subscriptions.remove(&id)
|
||||
SubscriptionId::String(ref id) => match id.parse() {
|
||||
Ok(id) => self.subscriptions.remove(&id),
|
||||
Err(_) => None,
|
||||
},
|
||||
_ => None,
|
||||
}
|
||||
@ -65,20 +99,15 @@ impl <T> Subscribers<Sink<T>> {
|
||||
/// Assigns id and adds a subscriber to the list.
|
||||
pub fn push(&mut self, sub: Subscriber<T>) {
|
||||
let id = self.next_id();
|
||||
match sub.assign_id(SubscriptionId::Number(id)) {
|
||||
Ok(sink) => {
|
||||
debug!(target: "pubsub", "Adding subscription id={:?}", id);
|
||||
self.subscriptions.insert(id, sink);
|
||||
},
|
||||
Err(_) => {
|
||||
self.next_id -= 1;
|
||||
},
|
||||
if let Ok(sink) = sub.assign_id(SubscriptionId::String(id.as_string())) {
|
||||
debug!(target: "pubsub", "Adding subscription id={:?}", id);
|
||||
self.subscriptions.insert(id, sink);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ops::Deref for Subscribers<T> {
|
||||
type Target = HashMap<u64, T>;
|
||||
type Target = HashMap<Id, T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.subscriptions
|
||||
|
@ -54,6 +54,14 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates new poll manager with deterministic ids.
|
||||
#[cfg(test)]
|
||||
pub fn new_test(rpc: MetaIoHandler<Metadata, S>) -> Self {
|
||||
let mut manager = Self::new(rpc);
|
||||
manager.subscribers = Subscribers::new_test();
|
||||
manager
|
||||
}
|
||||
|
||||
/// Subscribes to update from polling given method.
|
||||
pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params)
|
||||
-> (SubscriptionId, mpsc::Receiver<Result<core::Value, core::Error>>)
|
||||
@ -81,7 +89,7 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
|
||||
for (id, subscription) in self.subscribers.iter() {
|
||||
let call = core::MethodCall {
|
||||
jsonrpc: Some(core::Version::V2),
|
||||
id: core::Id::Num(*id as u64),
|
||||
id: core::Id::Str(id.as_string()),
|
||||
method: subscription.method.clone(),
|
||||
params: Some(subscription.params.clone()),
|
||||
};
|
||||
@ -139,7 +147,7 @@ mod tests {
|
||||
Ok(Value::String("world".into()))
|
||||
}
|
||||
});
|
||||
GenericPollManager::new(io)
|
||||
GenericPollManager::new_test(io)
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -148,7 +156,7 @@ mod tests {
|
||||
let mut el = reactor::Core::new().unwrap();
|
||||
let mut poll_manager = poll_manager();
|
||||
let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None);
|
||||
assert_eq!(id, SubscriptionId::Number(1));
|
||||
assert_eq!(id, SubscriptionId::String("0x416d77337e24399d".into()));
|
||||
|
||||
// then
|
||||
poll_manager.tick().wait().unwrap();
|
||||
|
@ -48,14 +48,22 @@ impl<C> EthPubSubClient<C> {
|
||||
let heads_subscribers = Arc::new(Mutex::new(Subscribers::default()));
|
||||
EthPubSubClient {
|
||||
handler: Arc::new(ChainNotificationHandler {
|
||||
client: client,
|
||||
remote: remote,
|
||||
client,
|
||||
remote,
|
||||
heads_subscribers: heads_subscribers.clone(),
|
||||
}),
|
||||
heads_subscribers: heads_subscribers,
|
||||
heads_subscribers,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates new `EthPubSubCient` with deterministic subscription ids.
|
||||
#[cfg(test)]
|
||||
pub fn new_test(client: Arc<C>, remote: Remote) -> Self {
|
||||
let client = Self::new(client, remote);
|
||||
*client.heads_subscribers.lock() = Subscribers::new_test();
|
||||
client
|
||||
}
|
||||
|
||||
/// Returns a chain notification handler.
|
||||
pub fn handler(&self) -> Arc<ChainNotificationHandler<C>> {
|
||||
self.handler.clone()
|
||||
|
@ -55,12 +55,22 @@ impl<S: core::Middleware<Metadata>> PubSubClient<S> {
|
||||
);
|
||||
|
||||
PubSubClient {
|
||||
poll_manager: poll_manager,
|
||||
remote: remote,
|
||||
poll_manager,
|
||||
remote,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PubSubClient<core::NoopMiddleware> {
|
||||
/// Creates new `PubSubClient` with deterministic ids.
|
||||
#[cfg(test)]
|
||||
pub fn new_test(rpc: MetaIoHandler<Metadata, core::NoopMiddleware>, remote: Remote) -> Self {
|
||||
let client = Self::new(MetaIoHandler::with_middleware(Default::default()), remote);
|
||||
*client.poll_manager.write() = GenericPollManager::new_test(rpc);
|
||||
client
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: core::Middleware<Metadata>> PubSub for PubSubClient<S> {
|
||||
type Metadata = Metadata;
|
||||
|
||||
|
@ -36,7 +36,7 @@ fn should_subscribe_to_new_heads() {
|
||||
let h2 = client.block_hash_delta_minus(2);
|
||||
let h1 = client.block_hash_delta_minus(3);
|
||||
|
||||
let pubsub = EthPubSubClient::new(Arc::new(client), el.remote());
|
||||
let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote());
|
||||
let handler = pubsub.handler();
|
||||
let pubsub = pubsub.to_delegate();
|
||||
|
||||
@ -49,13 +49,13 @@ fn should_subscribe_to_new_heads() {
|
||||
|
||||
// Subscribe
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newHeads"], "id": 1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":1,"id":1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#;
|
||||
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
||||
|
||||
// Check notifications
|
||||
handler.new_blocks(vec![], vec![], vec![h1], vec![], vec![], vec![], 0);
|
||||
let (res, receiver) = receiver.into_future().wait().unwrap();
|
||||
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#;
|
||||
assert_eq!(res, Some(response.into()));
|
||||
|
||||
// Notify about two blocks
|
||||
@ -63,14 +63,14 @@ fn should_subscribe_to_new_heads() {
|
||||
|
||||
// Receive both
|
||||
let (res, receiver) = receiver.into_future().wait().unwrap();
|
||||
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x2","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x2","parentHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x2","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x2","parentHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#;
|
||||
assert_eq!(res, Some(response.into()));
|
||||
let (res, receiver) = receiver.into_future().wait().unwrap();
|
||||
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x3","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0xdf04a98bb0c6fa8441bd429822f65a46d0cb553f6bcef602b973e65c81497f8e","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x3","parentHash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":1}}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x3","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0xdf04a98bb0c6fa8441bd429822f65a46d0cb553f6bcef602b973e65c81497f8e","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x3","parentHash":"0x44e5ecf454ea99af9d8a8f2ca0daba96964c90de05db7a78f59b84ae9e749706","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#;
|
||||
assert_eq!(res, Some(response.into()));
|
||||
|
||||
// And unsubscribe
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": [1], "id": 1}"#;
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": ["0x416d77337e24399d"], "id": 1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
|
||||
assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned()));
|
||||
|
||||
@ -83,7 +83,7 @@ fn should_return_unimplemented() {
|
||||
// given
|
||||
let el = EventLoop::spawn();
|
||||
let client = TestBlockChainClient::new();
|
||||
let pubsub = EthPubSubClient::new(Arc::new(client), el.remote());
|
||||
let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote());
|
||||
let pubsub = pubsub.to_delegate();
|
||||
|
||||
let mut io = MetaIoHandler::default();
|
||||
|
@ -42,7 +42,7 @@ fn should_subscribe_to_a_method() {
|
||||
// given
|
||||
let el = EventLoop::spawn();
|
||||
let rpc = rpc();
|
||||
let pubsub = PubSubClient::new(rpc, el.remote()).to_delegate();
|
||||
let pubsub = PubSubClient::new_test(rpc, el.remote()).to_delegate();
|
||||
|
||||
let mut io = MetaIoHandler::default();
|
||||
io.extend_with(pubsub);
|
||||
@ -53,20 +53,22 @@ fn should_subscribe_to_a_method() {
|
||||
|
||||
// Subscribe
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "parity_subscribe", "params": ["hello", []], "id": 1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":1,"id":1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#;
|
||||
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
||||
|
||||
// Check notifications
|
||||
let (res, receiver) = receiver.into_future().wait().unwrap();
|
||||
let response = r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"hello","subscription":1}}"#;
|
||||
let response =
|
||||
r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"hello","subscription":"0x416d77337e24399d"}}"#;
|
||||
assert_eq!(res, Some(response.into()));
|
||||
|
||||
let (res, receiver) = receiver.into_future().wait().unwrap();
|
||||
let response = r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"world","subscription":1}}"#;
|
||||
let response =
|
||||
r#"{"jsonrpc":"2.0","method":"parity_subscription","params":{"result":"world","subscription":"0x416d77337e24399d"}}"#;
|
||||
assert_eq!(res, Some(response.into()));
|
||||
|
||||
// And unsubscribe
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "parity_unsubscribe", "params": [1], "id": 1}"#;
|
||||
let request = r#"{"jsonrpc": "2.0", "method": "parity_unsubscribe", "params": ["0x416d77337e24399d"], "id": 1}"#;
|
||||
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
|
||||
assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned()));
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user