// Copyright 2015-2017 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . //! Eth PUB-SUB rpc implementation. use std::sync::Arc; use std::collections::BTreeMap; use futures::{self, BoxFuture, Future}; use jsonrpc_core::Error; use jsonrpc_macros::Trailing; use jsonrpc_macros::pubsub::{Sink, Subscriber}; use jsonrpc_pubsub::SubscriptionId; use v1::helpers::{errors, Subscribers}; use v1::metadata::Metadata; use v1::traits::EthPubSub; use v1::types::{pubsub, RichHeader}; use ethcore::encoded; use ethcore::client::{BlockChainClient, ChainNotify, BlockId}; use light::client::{LightChainClient, LightChainNotify}; use parity_reactor::Remote; use util::{Mutex, H256, Bytes}; /// Eth PubSub implementation. pub struct EthPubSubClient { handler: Arc>, heads_subscribers: Arc>>>, } impl EthPubSubClient { /// Creates new `EthPubSubClient`. pub fn new(client: Arc, remote: Remote) -> Self { let heads_subscribers = Arc::new(Mutex::new(Subscribers::default())); EthPubSubClient { handler: Arc::new(ChainNotificationHandler { client: client, remote: remote, heads_subscribers: heads_subscribers.clone(), }), heads_subscribers: heads_subscribers, } } /// Returns a chain notification handler. pub fn handler(&self) -> Arc> { self.handler.clone() } } /// PubSub Notification handler. pub struct ChainNotificationHandler { client: Arc, remote: Remote, heads_subscribers: Arc>>>, } impl ChainNotificationHandler { fn notify(&self, blocks: Vec<(encoded::Header, BTreeMap)>) { for subscriber in self.heads_subscribers.lock().values() { for &(ref block, ref extra_info) in &blocks { self.remote.spawn(subscriber .notify(pubsub::Result::Header(RichHeader { inner: block.into(), extra_info: extra_info.clone(), })) .map(|_| ()) .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) ); } } } } impl LightChainNotify for ChainNotificationHandler { fn new_headers( &self, headers: &[H256], ) { let blocks = headers .iter() .filter_map(|hash| self.client.block_header(BlockId::Hash(*hash))) .map(|header| (header, Default::default())) .collect(); self.notify(blocks); } } impl ChainNotify for ChainNotificationHandler { fn new_blocks( &self, _imported: Vec, _invalid: Vec, enacted: Vec, _retracted: Vec, _sealed: Vec, // Block bytes. _proposed: Vec, _duration: u64, ) { const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; let blocks = enacted .into_iter() .filter_map(|hash| self.client.block_header(BlockId::Hash(hash))) .map(|header| { let hash = header.hash(); (header, self.client.block_extra_info(BlockId::Hash(hash)).expect(EXTRA_INFO_PROOF)) }) .collect(); self.notify(blocks); } } impl EthPubSub for EthPubSubClient { type Metadata = Metadata; fn subscribe( &self, _meta: Metadata, subscriber: Subscriber, kind: pubsub::Kind, params: Trailing, ) { match (kind, params.0) { (pubsub::Kind::NewHeads, pubsub::Params::None) => { self.heads_subscribers.lock().push(subscriber) }, _ => { let _ = subscriber.reject(errors::unimplemented(None)); }, } } fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture { let res = self.heads_subscribers.lock().remove(&id).is_some(); futures::future::ok(res).boxed() } }