WebSockets RPC server (#5425)

* Basic WS server.

* CLI for WS server.

* Bump jsonrpc

* Fixing test.
This commit is contained in:
Tomasz Drwięga
2017-04-13 16:32:07 +02:00
committed by Marek Kotewicz
parent 1df30ee83e
commit ea09aa584d
28 changed files with 388 additions and 141 deletions

View File

@@ -52,6 +52,14 @@ cors = "null"
apis = ["web3", "eth", "net", "parity", "traces", "rpc"]
hosts = ["none"]
[websockets]
disable = false
port = 8546
interface = "local"
origins = ["none"]
apis = ["web3", "eth", "net", "parity", "traces", "rpc"]
hosts = ["none"]
[ipc]
disable = false
path = "$HOME/.parity/jsonrpc.ipc"

View File

@@ -24,6 +24,9 @@ allow_ips = "public"
reserved_only = true
reserved_peers = "./path/to/reserved_peers"
[websockets]
disable = true
origins = ["none"]
[rpc]
disable = true

View File

@@ -170,6 +170,20 @@ usage! {
flag_jsonrpc_threads: Option<usize> = None,
or |c: &Config| otry!(c.rpc).threads.map(Some),
// WS
flag_no_ws: bool = false,
or |c: &Config| otry!(c.websockets).disable.clone(),
flag_ws_port: u16 = 8546u16,
or |c: &Config| otry!(c.websockets).port.clone(),
flag_ws_interface: String = "local",
or |c: &Config| otry!(c.websockets).interface.clone(),
flag_ws_apis: String = "web3,eth,net,parity,traces,rpc",
or |c: &Config| otry!(c.websockets).apis.as_ref().map(|vec| vec.join(",")),
flag_ws_origins: String = "none",
or |c: &Config| otry!(c.websockets).origins.as_ref().map(|vec| vec.join(",")),
flag_ws_hosts: String = "none",
or |c: &Config| otry!(c.websockets).hosts.as_ref().map(|vec| vec.join(",")),
// IPC
flag_no_ipc: bool = false,
or |c: &Config| otry!(c.ipc).disable.clone(),
@@ -363,6 +377,7 @@ struct Config {
ui: Option<Ui>,
network: Option<Network>,
rpc: Option<Rpc>,
websockets: Option<Ws>,
ipc: Option<Ipc>,
dapps: Option<Dapps>,
secretstore: Option<SecretStore>,
@@ -440,6 +455,16 @@ struct Rpc {
threads: Option<usize>,
}
#[derive(Default, Debug, PartialEq, RustcDecodable)]
struct Ws {
disable: Option<bool>,
port: Option<u16>,
interface: Option<String>,
apis: Option<Vec<String>>,
origins: Option<Vec<String>>,
hosts: Option<Vec<String>>,
}
#[derive(Default, Debug, PartialEq, RustcDecodable)]
struct Ipc {
disable: Option<bool>,
@@ -554,7 +579,7 @@ struct Misc {
mod tests {
use super::{
Args, ArgsError,
Config, Operating, Account, Ui, Network, Rpc, Ipc, Dapps, Ipfs, Mining, Footprint,
Config, Operating, Account, Ui, Network, Ws, Rpc, Ipc, Dapps, Ipfs, Mining, Footprint,
Snapshots, VM, Misc, SecretStore,
};
use toml;
@@ -699,6 +724,14 @@ mod tests {
flag_jsonrpc_hosts: "none".into(),
flag_jsonrpc_threads: None,
// WS
flag_no_ws: false,
flag_ws_port: 8546u16,
flag_ws_interface: "local".into(),
flag_ws_apis: "web3,eth,net,parity,traces,rpc".into(),
flag_ws_origins: "none".into(),
flag_ws_hosts: "none".into(),
// IPC
flag_no_ipc: false,
flag_ipc_path: "$HOME/.parity/jsonrpc.ipc".into(),
@@ -899,6 +932,14 @@ mod tests {
reserved_only: Some(true),
no_serve_light: None,
}),
websockets: Some(Ws {
disable: Some(true),
port: None,
interface: None,
apis: None,
origins: Some(vec!["none".into()]),
hosts: None,
}),
rpc: Some(Rpc {
disable: Some(true),
port: Some(8180),

View File

@@ -157,7 +157,28 @@ API and Console Options:
vectors. Special options: "all", "none",
(default: {flag_jsonrpc_hosts}).
--jsonrpc-threads THREADS Enables experimental faster implementation of JSON-RPC server.
Requires Dapps server to be disabled using --no-dapps. (default: {flag_jsonrpc_threads:?})
Requires Dapps server to be disabled
using --no-dapps. (default: {flag_jsonrpc_threads:?})
--no-ws Disable the WebSockets server. (default: {flag_no_ws})
--ws-port PORT Specify the port portion of the WebSockets server
(default: {flag_ws_port}).
--ws-interface IP Specify the hostname portion of the WebSockets
server, IP should be an interface's IP address, or
all (all interfaces) or local (default: {flag_ws_interface}).
--ws-apis APIS Specify the APIs available through the WebSockets
interface. APIS is a comma-delimited list of API
name. Possible name are web3, eth, net, personal,
parity, parity_set, traces, rpc, parity_accounts.
(default: {flag_ws_apis}).
--ws-origins URL Specify Origin header values allowed to connect.
Special options: "all", "none".
(default: {flag_ws_origins})
--ws-hosts HOSTS List of allowed Host header values. This option will
validate the Host header sent by the browser, it
is additional security against some attack
vectors. Special options: "all", "none",
(default: {flag_ws_hosts}).
--no-ipc Disable JSON-RPC over IPC service. (default: {flag_no_ipc})
--ipc-path PATH Specify custom path for JSON-RPC over IPC service

View File

@@ -30,9 +30,9 @@ use ethcore::client::{VMType};
use ethcore::miner::{MinerOptions, Banning, StratumOptions};
use ethcore::verification::queue::VerifierSettings;
use rpc::{IpcConfiguration, HttpConfiguration};
use rpc::{IpcConfiguration, HttpConfiguration, WsConfiguration};
use rpc_apis::ApiSet;
use ethcore_rpc::NetworkSettings;
use parity_rpc::NetworkSettings;
use cache::CacheConfig;
use helpers::{to_duration, to_mode, to_block_id, to_u256, to_pending_set, to_price, replace_home, replace_home_for_db,
geth_ipc_path, parity_ipc_path, to_bootnodes, to_addresses, to_address, to_gas_limit, to_queue_strategy};
@@ -114,6 +114,7 @@ impl Configuration {
};
let update_policy = self.update_policy()?;
let logger_config = self.logger_config();
let ws_conf = self.ws_config()?;
let http_conf = self.http_config()?;
let ipc_conf = self.ipc_config()?;
let net_conf = self.net_config()?;
@@ -352,6 +353,7 @@ impl Configuration {
daemon: daemon,
logger_config: logger_config.clone(),
miner_options: miner_options,
ws_conf: ws_conf,
http_conf: http_conf,
ipc_conf: ipc_conf,
net_conf: net_conf,
@@ -757,6 +759,14 @@ impl Configuration {
Self::hosts(&self.args.flag_jsonrpc_hosts)
}
fn ws_hosts(&self) -> Option<Vec<String>> {
Self::hosts(&self.args.flag_ws_hosts)
}
fn ws_origins(&self) -> Option<Vec<String>> {
Self::hosts(&self.args.flag_ws_origins)
}
fn ipfs_hosts(&self) -> Option<Vec<String>> {
Self::hosts(&self.args.flag_ipfs_api_hosts)
}
@@ -801,6 +811,19 @@ impl Configuration {
Ok(conf)
}
fn ws_config(&self) -> Result<WsConfiguration, String> {
let conf = WsConfiguration {
enabled: self.ws_enabled(),
interface: self.ws_interface(),
port: self.args.flag_ws_port,
apis: self.args.flag_ws_apis.parse()?,
hosts: self.ws_hosts(),
origins: self.ws_origins()
};
Ok(conf)
}
fn network_settings(&self) -> NetworkSettings {
NetworkSettings {
name: self.args.flag_identity.clone(),
@@ -913,6 +936,10 @@ impl Configuration {
Self::interface(&self.network_settings().rpc_interface)
}
fn ws_interface(&self) -> String {
Self::interface(&self.args.flag_ws_interface)
}
fn ipfs_interface(&self) -> String {
Self::interface(&self.args.flag_ipfs_api_interface)
}
@@ -965,6 +992,10 @@ impl Configuration {
!self.args.flag_jsonrpc_off && !self.args.flag_no_jsonrpc
}
fn ws_enabled(&self) -> bool {
!self.args.flag_no_ws
}
fn dapps_enabled(&self) -> bool {
!self.args.flag_dapps_off && !self.args.flag_no_dapps && self.rpc_enabled() && cfg!(feature = "dapps")
}
@@ -1000,7 +1031,7 @@ impl Configuration {
mod tests {
use super::*;
use cli::Args;
use ethcore_rpc::NetworkSettings;
use parity_rpc::NetworkSettings;
use ethcore::client::{VMType, BlockId};
use ethcore::miner::{MinerOptions, PrioritizationStrategy};
use helpers::{default_network_config};
@@ -1204,6 +1235,7 @@ mod tests {
daemon: None,
logger_config: Default::default(),
miner_options: Default::default(),
ws_conf: Default::default(),
http_conf: Default::default(),
ipc_conf: Default::default(),
net_conf: default_network_config(),

View File

@@ -109,7 +109,7 @@ pub use self::server::{SyncStatus, Middleware, dapps_middleware};
mod server {
use super::Dependencies;
use std::path::PathBuf;
use ethcore_rpc::{hyper, RequestMiddleware, RequestMiddlewareAction};
use parity_rpc::{hyper, RequestMiddleware, RequestMiddlewareAction};
pub type SyncStatus = Fn() -> bool;

View File

@@ -30,8 +30,8 @@ use ethcore::service::ClientIoMessage;
use ethcore::snapshot::service::Service as SnapshotService;
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
use number_prefix::{binary_prefix, Standalone, Prefixed};
use ethcore_rpc::{is_major_importing};
use ethcore_rpc::informant::RpcStats;
use parity_rpc::{is_major_importing};
use parity_rpc::informant::RpcStats;
pub struct Informant {
report: RwLock<Option<ClientReport>>,

View File

@@ -51,18 +51,18 @@ extern crate ethcore_ipc_hypervisor as hypervisor;
extern crate ethcore_ipc_nano as nanoipc;
extern crate ethcore_light as light;
extern crate ethcore_logger;
extern crate ethcore_rpc;
extern crate ethcore_signer;
extern crate ethcore_util as util;
extern crate ethkey;
extern crate ethsync;
extern crate parity_hash_fetch as hash_fetch;
extern crate parity_ipfs_api;
extern crate parity_reactor;
extern crate parity_updater as updater;
extern crate parity_local_store as local_store;
extern crate rpc_cli;
extern crate parity_reactor;
extern crate parity_rpc;
extern crate parity_updater as updater;
extern crate path;
extern crate rpc_cli;
#[macro_use]
extern crate log as rlog;

View File

@@ -14,19 +14,20 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{io, fmt};
use std::io;
use std::sync::Arc;
use dapps;
use dir::default_data_path;
use ethcore_rpc::informant::{RpcStats, Middleware};
use ethcore_rpc::{self as rpc, HttpServerError, Metadata, Origin, AccessControlAllowOrigin, Host};
use parity_rpc::informant::{RpcStats, Middleware};
use parity_rpc::{self as rpc, HttpServerError, Metadata, Origin, DomainsValidation};
use helpers::parity_ipc_path;
use jsonrpc_core::MetaIoHandler;
use parity_reactor::TokioRemote;
use rpc_apis::{self, ApiSet};
pub use ethcore_rpc::{IpcServer, HttpServer, RequestMiddleware};
pub use parity_rpc::{IpcServer, HttpServer, RequestMiddleware};
pub use parity_rpc::ws::Server as WsServer;
#[derive(Debug, Clone, PartialEq)]
pub struct HttpConfiguration {
@@ -71,12 +72,25 @@ impl Default for IpcConfiguration {
}
}
impl fmt::Display for IpcConfiguration {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.enabled {
write!(f, "endpoint address [{}], api list [{:?}]", self.socket_addr, self.apis)
} else {
write!(f, "disabled")
#[derive(Debug, PartialEq)]
pub struct WsConfiguration {
pub enabled: bool,
pub interface: String,
pub port: u16,
pub apis: ApiSet,
pub origins: Option<Vec<String>>,
pub hosts: Option<Vec<String>>,
}
impl Default for WsConfiguration {
fn default() -> Self {
WsConfiguration {
enabled: true,
interface: "127.0.0.1".into(),
port: 8546,
apis: ApiSet::UnsafeContext,
origins: Some(Vec::new()),
hosts: Some(Vec::new()),
}
}
}
@@ -112,12 +126,71 @@ impl rpc::IpcMetaExtractor<Metadata> for RpcExtractor {
}
}
impl rpc::ws::MetaExtractor<Metadata> for RpcExtractor {
fn extract(&self, req: &rpc::ws::RequestContext) -> Metadata {
let mut metadata = Metadata::default();
let id = req.session_id as u64;
metadata.origin = Origin::Ws(id.into());
metadata
}
}
struct WsStats {
stats: Arc<RpcStats>,
}
impl rpc::ws::SessionStats for WsStats {
fn open_session(&self, _id: rpc::ws::SessionId) {
self.stats.open_session()
}
fn close_session(&self, _id: rpc::ws::SessionId) {
self.stats.close_session()
}
}
fn setup_apis<D>(apis: ApiSet, deps: &Dependencies<D>) -> MetaIoHandler<Metadata, Middleware<D::Notifier>>
where D: rpc_apis::Dependencies
{
rpc_apis::setup_rpc(deps.stats.clone(), &*deps.apis, apis)
}
pub fn new_ws<D: rpc_apis::Dependencies>(
conf: WsConfiguration,
deps: &Dependencies<D>,
) -> Result<Option<WsServer>, String> {
if !conf.enabled {
return Ok(None);
}
let url = format!("{}:{}", conf.interface, conf.port);
let addr = url.parse().map_err(|_| format!("Invalid WebSockets listen host/port given: {}", url))?;
let handler = setup_apis(conf.apis, deps);
let remote = deps.remote.clone();
let allowed_origins = into_domains(conf.origins);
let allowed_hosts = into_domains(conf.hosts);
let start_result = rpc::start_ws(
&addr,
handler,
remote,
allowed_origins,
allowed_hosts,
RpcExtractor,
WsStats {
stats: deps.stats.clone(),
},
);
match start_result {
Ok(server) => Ok(Some(server)),
Err(rpc::ws::Error::Io(ref err)) if err.kind() == io::ErrorKind::AddrInUse => Err(
format!("WebSockets address {} is already in use, make sure that another instance of an Ethereum client is not running or change the address using the --ws-port and --ws-interface options.", url)
),
Err(e) => Err(format!("WebSockets error: {:?}", e)),
}
}
pub fn new_http<D: rpc_apis::Dependencies>(
conf: HttpConfiguration,
deps: &Dependencies<D>,
@@ -128,17 +201,17 @@ pub fn new_http<D: rpc_apis::Dependencies>(
}
let url = format!("{}:{}", conf.interface, conf.port);
let addr = url.parse().map_err(|_| format!("Invalid JSONRPC listen host/port given: {}", url))?;
let addr = url.parse().map_err(|_| format!("Invalid HTTP JSON-RPC listen host/port given: {}", url))?;
let handler = setup_apis(conf.apis, deps);
let remote = deps.remote.clone();
let cors_domains: Option<Vec<_>> = conf.cors.map(|domains| domains.into_iter().map(AccessControlAllowOrigin::from).collect());
let allowed_hosts: Option<Vec<_>> = conf.hosts.map(|hosts| hosts.into_iter().map(Host::from).collect());
let cors_domains = into_domains(conf.cors);
let allowed_hosts = into_domains(conf.hosts);
let start_result = rpc::start_http(
&addr,
cors_domains.into(),
allowed_hosts.into(),
cors_domains,
allowed_hosts,
handler,
remote,
RpcExtractor,
@@ -153,16 +226,17 @@ pub fn new_http<D: rpc_apis::Dependencies>(
match start_result {
Ok(server) => Ok(Some(server)),
Err(HttpServerError::Io(err)) => match err.kind() {
io::ErrorKind::AddrInUse => Err(
format!("RPC address {} is already in use, make sure that another instance of an Ethereum client is not running or change the address using the --jsonrpc-port and --jsonrpc-interface options.", url)
),
_ => Err(format!("RPC io error: {}", err)),
},
Err(e) => Err(format!("RPC error: {:?}", e)),
Err(HttpServerError::Io(ref err)) if err.kind() == io::ErrorKind::AddrInUse => Err(
format!("HTTP address {} is already in use, make sure that another instance of an Ethereum client is not running or change the address using the --jsonrpc-port and --jsonrpc-interface options.", url)
),
Err(e) => Err(format!("HTTP error: {:?}", e)),
}
}
fn into_domains<T: From<String>>(items: Option<Vec<String>>) -> DomainsValidation<T> {
items.map(|vals| vals.into_iter().map(T::from).collect()).into()
}
pub fn new_ipc<D: rpc_apis::Dependencies>(
conf: IpcConfiguration,
dependencies: &Dependencies<D>
@@ -170,18 +244,19 @@ pub fn new_ipc<D: rpc_apis::Dependencies>(
if !conf.enabled {
return Ok(None);
}
let handler = setup_apis(conf.apis, dependencies);
let remote = dependencies.remote.clone();
match rpc::start_ipc(&conf.socket_addr, handler, remote, RpcExtractor) {
Ok(server) => Ok(Some(server)),
Err(io_error) => Err(format!("RPC io error: {}", io_error)),
Err(io_error) => Err(format!("IPC error: {}", io_error)),
}
}
#[cfg(test)]
mod tests {
use super::RpcExtractor;
use ethcore_rpc::{HttpMetaExtractor, Origin};
use parity_rpc::{HttpMetaExtractor, Origin};
#[test]
fn should_extract_rpc_origin() {

View File

@@ -20,15 +20,15 @@ use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
pub use ethcore_rpc::SignerService;
pub use parity_rpc::SignerService;
use ethcore::account_provider::AccountProvider;
use ethcore::client::Client;
use ethcore::miner::{Miner, ExternalMiner};
use ethcore::snapshot::SnapshotService;
use ethcore_rpc::{Metadata, NetworkSettings};
use ethcore_rpc::informant::{ActivityNotifier, Middleware, RpcStats, ClientNotifier};
use ethcore_rpc::dispatch::{FullDispatcher, LightDispatcher};
use parity_rpc::{Metadata, NetworkSettings};
use parity_rpc::informant::{ActivityNotifier, Middleware, RpcStats, ClientNotifier};
use parity_rpc::dispatch::{FullDispatcher, LightDispatcher};
use ethsync::{ManageNetwork, SyncProvider, LightSync};
use hash_fetch::fetch::Client as FetchClient;
use jsonrpc_core::{MetaIoHandler};
@@ -203,7 +203,7 @@ impl Dependencies for FullDependencies {
}
fn extend_with_set(&self, handler: &mut MetaIoHandler<Metadata, Middleware>, apis: &[Api]) {
use ethcore_rpc::v1::*;
use parity_rpc::v1::*;
macro_rules! add_signing_methods {
($namespace:ident, $handler:expr, $deps:expr) => {
@@ -331,7 +331,7 @@ impl Dependencies for LightDependencies {
fn activity_notifier(&self) -> Self::Notifier { LightClientNotifier }
fn extend_with_set(&self, handler: &mut MetaIoHandler<Metadata, Middleware<Self::Notifier>>, apis: &[Api]) {
use ethcore_rpc::v1::*;
use parity_rpc::v1::*;
let dispatcher = LightDispatcher::new(
self.sync.clone(),

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use std::net::{TcpListener};
use ctrlc::CtrlC;
use fdlimit::raise_fd_limit;
use ethcore_rpc::{NetworkSettings, informant, is_major_importing};
use parity_rpc::{NetworkSettings, informant, is_major_importing};
use ethsync::NetworkConfiguration;
use util::{Colour, version, Mutex, Condvar};
use io::{MayPanic, ForwardPanic, PanicHandler};
@@ -80,6 +80,7 @@ pub struct RunCmd {
pub daemon: Option<String>,
pub logger_config: LogConfig,
pub miner_options: MinerOptions,
pub ws_conf: rpc::WsConfiguration,
pub http_conf: rpc::HttpConfiguration,
pub ipc_conf: rpc::IpcConfiguration,
pub net_conf: NetworkConfiguration,
@@ -295,7 +296,8 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
};
// start rpc servers
let _http_server = rpc::new_http(cmd.http_conf, &dependencies, None)?;
let _ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?;
let _http_server = rpc::new_http(cmd.http_conf.clone(), &dependencies, None)?;
let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
// the signer server
@@ -636,6 +638,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps)?;
// start rpc servers
let ws_server = rpc::new_ws(cmd.ws_conf, &dependencies)?;
let http_server = rpc::new_http(cmd.http_conf.clone(), &dependencies, dapps_middleware)?;
let ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
@@ -716,7 +719,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
let restart = wait_for_exit(panic_handler, Some(updater), Some(client), can_restart);
// drop this stuff as soon as exit detected.
drop((http_server, ipc_server, signer_server, secretstore_key_server, ipfs_server, event_loop));
drop((ws_server, http_server, ipc_server, signer_server, secretstore_key_server, ipfs_server, event_loop));
info!("Finishing work, please wait...");

View File

@@ -22,8 +22,8 @@ pub use ethcore_signer::Server as SignerServer;
use ansi_term::Colour;
use dir::default_data_path;
use ethcore_rpc::informant::RpcStats;
use ethcore_rpc::{self, ConfirmationsQueue};
use parity_rpc::informant::RpcStats;
use parity_rpc::{self, ConfirmationsQueue};
use ethcore_signer as signer;
use helpers::replace_home;
use parity_reactor::TokioRemote;
@@ -69,10 +69,10 @@ pub struct NewToken {
#[derive(Debug, Default, Clone)]
pub struct StandardExtractor;
impl signer::MetaExtractor<ethcore_rpc::Metadata> for StandardExtractor {
fn extract_metadata(&self, session: &H256) -> ethcore_rpc::Metadata {
let mut metadata = ethcore_rpc::Metadata::default();
metadata.origin = ethcore_rpc::Origin::Signer((*session).into());
impl signer::MetaExtractor<parity_rpc::Metadata> for StandardExtractor {
fn extract_metadata(&self, session: &H256) -> parity_rpc::Metadata {
let mut metadata = parity_rpc::Metadata::default();
metadata.origin = parity_rpc::Origin::Signer((*session).into());
metadata
}
}