// Copyright 2015-2017 Parity Technologies (UK) Ltd. // This file is part of Parity. // Parity is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity. If not, see . use std::io; use std::sync::Arc; use dapps; use parity_rpc::informant::{RpcStats, Middleware}; use parity_rpc::{self as rpc, HttpServerError, Metadata, Origin, DomainsValidation}; use helpers::parity_ipc_path; use jsonrpc_core::{futures, MetaIoHandler}; use parity_reactor::TokioRemote; use rpc_apis::{self, ApiSet}; pub use parity_rpc::{IpcServer, HttpServer, RequestMiddleware}; pub use parity_rpc::ws::Server as WsServer; #[derive(Debug, Clone, PartialEq)] pub struct HttpConfiguration { pub enabled: bool, pub interface: String, pub port: u16, pub apis: ApiSet, pub cors: Option>, pub hosts: Option>, pub threads: Option, } impl Default for HttpConfiguration { fn default() -> Self { HttpConfiguration { enabled: true, interface: "127.0.0.1".into(), port: 8545, apis: ApiSet::UnsafeContext, cors: None, hosts: Some(Vec::new()), threads: None, } } } #[derive(Debug, PartialEq)] pub struct IpcConfiguration { pub enabled: bool, pub socket_addr: String, pub apis: ApiSet, } impl Default for IpcConfiguration { fn default() -> Self { IpcConfiguration { enabled: true, socket_addr: if cfg!(windows) { r"\\.\pipe\jsonrpc.ipc".into() } else { let data_dir = ::dir::default_data_path(); parity_ipc_path(&data_dir, "$BASE/jsonrpc.ipc", 0) }, apis: ApiSet::IpcContext, } } } #[derive(Debug, PartialEq)] pub struct WsConfiguration { pub enabled: bool, pub interface: String, pub port: u16, pub apis: ApiSet, pub origins: Option>, pub hosts: Option>, } impl Default for WsConfiguration { fn default() -> Self { WsConfiguration { enabled: true, interface: "127.0.0.1".into(), port: 8546, apis: ApiSet::UnsafeContext, origins: Some(Vec::new()), hosts: Some(Vec::new()), } } } pub struct Dependencies { pub apis: Arc, pub remote: TokioRemote, pub stats: Arc, } pub struct RpcExtractor; impl rpc::HttpMetaExtractor for RpcExtractor { type Metadata = Metadata; fn read_metadata(&self, origin: String, dapps_origin: Option) -> Metadata { let mut metadata = Metadata::default(); metadata.origin = match (origin.as_str(), dapps_origin) { ("null", Some(dapp)) => Origin::Dapps(dapp.into()), _ => Origin::Rpc(origin), }; metadata } } impl rpc::IpcMetaExtractor for RpcExtractor { fn extract(&self, _req: &rpc::IpcRequestContext) -> Metadata { let mut metadata = Metadata::default(); // TODO [ToDr] Extract proper session id when it's available in context. metadata.origin = Origin::Ipc(1.into()); metadata } } struct Sender(rpc::ws::ws::Sender, futures::sync::mpsc::Receiver); impl futures::Future for Sender { type Item = (); type Error = (); fn poll(&mut self) -> futures::Poll { use self::futures::Stream; let item = self.1.poll()?; match item { futures::Async::NotReady => { Ok(futures::Async::NotReady) }, futures::Async::Ready(None) => { Ok(futures::Async::Ready(())) }, futures::Async::Ready(Some(val)) => { if let Err(e) = self.0.send(val) { warn!("Error sending a subscription update: {:?}", e); } self.poll() }, } } } struct WsRpcExtractor { remote: TokioRemote, } impl WsRpcExtractor { fn wrap_out(&self, out: rpc::ws::ws::Sender) -> futures::sync::mpsc::Sender { let (sender, receiver) = futures::sync::mpsc::channel(8); self.remote.spawn(move |_| Sender(out, receiver)); sender } } impl rpc::ws::MetaExtractor for WsRpcExtractor { fn extract(&self, req: &rpc::ws::RequestContext) -> Metadata { let mut metadata = Metadata::default(); let id = req.session_id as u64; metadata.origin = Origin::Ws(id.into()); metadata.session = Some(Arc::new(rpc::PubSubSession::new( self.wrap_out(req.out.clone()) ))); metadata } } struct WsStats { stats: Arc, } impl rpc::ws::SessionStats for WsStats { fn open_session(&self, _id: rpc::ws::SessionId) { self.stats.open_session() } fn close_session(&self, _id: rpc::ws::SessionId) { self.stats.close_session() } } fn setup_apis(apis: ApiSet, deps: &Dependencies) -> MetaIoHandler> where D: rpc_apis::Dependencies { rpc_apis::setup_rpc(deps.stats.clone(), &*deps.apis, apis) } pub fn new_ws( conf: WsConfiguration, deps: &Dependencies, ) -> Result, String> { if !conf.enabled { return Ok(None); } let url = format!("{}:{}", conf.interface, conf.port); let addr = url.parse().map_err(|_| format!("Invalid WebSockets listen host/port given: {}", url))?; let handler = setup_apis(conf.apis, deps); let remote = deps.remote.clone(); let allowed_origins = into_domains(conf.origins); let allowed_hosts = into_domains(conf.hosts); let start_result = rpc::start_ws( &addr, handler, remote.clone(), allowed_origins, allowed_hosts, WsRpcExtractor { remote: remote, }, WsStats { stats: deps.stats.clone(), }, ); match start_result { Ok(server) => Ok(Some(server)), Err(rpc::ws::Error::Io(ref err)) if err.kind() == io::ErrorKind::AddrInUse => Err( format!("WebSockets address {} is already in use, make sure that another instance of an Ethereum client is not running or change the address using the --ws-port and --ws-interface options.", url) ), Err(e) => Err(format!("WebSockets error: {:?}", e)), } } pub fn new_http( conf: HttpConfiguration, deps: &Dependencies, middleware: Option ) -> Result, String> { if !conf.enabled { return Ok(None); } let url = format!("{}:{}", conf.interface, conf.port); let addr = url.parse().map_err(|_| format!("Invalid HTTP JSON-RPC listen host/port given: {}", url))?; let handler = setup_apis(conf.apis, deps); let remote = deps.remote.clone(); let cors_domains = into_domains(conf.cors); let allowed_hosts = into_domains(conf.hosts); let start_result = rpc::start_http( &addr, cors_domains, allowed_hosts, handler, remote, RpcExtractor, match (conf.threads, middleware) { (Some(threads), None) => rpc::HttpSettings::Threads(threads), (None, middleware) => rpc::HttpSettings::Dapps(middleware), (Some(_), Some(_)) => { return Err("Dapps and fast multi-threaded RPC server cannot be enabled at the same time.".into()) }, } ); match start_result { Ok(server) => Ok(Some(server)), Err(HttpServerError::Io(ref err)) if err.kind() == io::ErrorKind::AddrInUse => Err( format!("HTTP address {} is already in use, make sure that another instance of an Ethereum client is not running or change the address using the --jsonrpc-port and --jsonrpc-interface options.", url) ), Err(e) => Err(format!("HTTP error: {:?}", e)), } } fn into_domains>(items: Option>) -> DomainsValidation { items.map(|vals| vals.into_iter().map(T::from).collect()).into() } pub fn new_ipc( conf: IpcConfiguration, dependencies: &Dependencies ) -> Result, String> { if !conf.enabled { return Ok(None); } let handler = setup_apis(conf.apis, dependencies); let remote = dependencies.remote.clone(); let ipc = rpc::start_ipc( &conf.socket_addr, handler, remote, RpcExtractor, ); match ipc { Ok(server) => Ok(Some(server)), Err(io_error) => Err(format!("IPC error: {}", io_error)), } } #[cfg(test)] mod tests { use super::RpcExtractor; use parity_rpc::{HttpMetaExtractor, Origin}; #[test] fn should_extract_rpc_origin() { // given let extractor = RpcExtractor; // when let meta = extractor.read_metadata("http://parity.io".into(), None); let meta1 = extractor.read_metadata("http://parity.io".into(), Some("ignored".into())); // then assert_eq!(meta.origin, Origin::Rpc("http://parity.io".into())); assert_eq!(meta1.origin, Origin::Rpc("http://parity.io".into())); } #[test] fn should_dapps_origin() { // given let extractor = RpcExtractor; let dapp = "https://wallet.ethereum.org".to_owned(); // when let meta = extractor.read_metadata("null".into(), Some(dapp.clone())); // then assert_eq!(meta.origin, Origin::Dapps(dapp.into())); } }