From c4e2f650513f70f4e2f80592e8e191882ad84931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 27 May 2016 17:46:15 +0200 Subject: [PATCH] Exposing RPC over websockets --- Cargo.lock | 1 + parity/main.rs | 5 ++ parity/signer.rs | 47 ++++++++---- rpc/src/v1/types/transaction_request.rs | 2 +- signer/Cargo.toml | 1 + signer/src/lib.rs | 1 + signer/src/signing_queue.rs | 2 +- signer/src/{ws_server.rs => ws_server/mod.rs} | 73 ++++++++++--------- signer/src/ws_server/session.rs | 59 +++++++++++++++ 9 files changed, 140 insertions(+), 51 deletions(-) rename signer/src/{ws_server.rs => ws_server/mod.rs} (70%) create mode 100644 signer/src/ws_server/session.rs diff --git a/Cargo.lock b/Cargo.lock index 6298ef7ea..7950ecc4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -354,6 +354,7 @@ dependencies = [ "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-rpc 1.2.0", "ethcore-util 1.2.0", + "jsonrpc-core 2.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/parity/main.rs b/parity/main.rs index 1e9ab33f5..61aa8ab21 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -244,6 +244,11 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) port: conf.args.flag_signer_port, }, signer::Dependencies { panic_handler: panic_handler.clone(), + client: client.clone(), + sync: sync.clone(), + secret_store: account_service.clone(), + miner: miner.clone(), + external_miner: external_miner.clone(), }); // Register IO handler diff --git a/parity/signer.rs b/parity/signer.rs index e7f7a9cb9..c4a540721 100644 --- a/parity/signer.rs +++ b/parity/signer.rs @@ -15,6 +15,10 @@ // along with Parity. If not, see . use std::sync::Arc; +use ethcore::client::Client; +use ethsync::EthSync; +use ethminer::{Miner, ExternalMiner}; +use util::keys::store::AccountService; use util::panics::{PanicHandler, ForwardPanic}; use die::*; @@ -32,36 +36,51 @@ pub struct Configuration { pub struct Dependencies { pub panic_handler: Arc, + pub client: Arc, + pub sync: Arc, + pub secret_store: Arc, + pub miner: Arc, + pub external_miner: Arc, +} + +pub fn start(conf: Configuration, deps: Dependencies) -> Option { + if !conf.enabled { + None + } else { + Some(do_start(conf, deps)) + } } #[cfg(feature = "ethcore-signer")] -pub fn start(conf: Configuration, deps: Dependencies) -> Option { - if !conf.enabled { - return None; - } +fn do_start(conf: Configuration, deps: Dependencies) -> SignerServer { + let addr = format!("127.0.0.1:{}", conf.port).parse().unwrap_or_else(|_| { + die!("Invalid port specified: {}", conf.port) + }); - let addr = format!("127.0.0.1:{}", conf.port).parse().unwrap_or_else(|_| die!("Invalid port specified: {}", conf.port)); - let start_result = signer::Server::start(addr); + let start_result = { + use ethcore_rpc::v1::*; + let server = signer::ServerBuilder::new(); + server.add_delegate(Web3Client::new().to_delegate()); + server.add_delegate(NetClient::new(&deps.sync).to_delegate()); + server.add_delegate(EthClient::new(&deps.client, &deps.sync, &deps.secret_store, &deps.miner, &deps.external_miner).to_delegate()); + server.add_delegate(EthFilterClient::new(&deps.client, &deps.miner).to_delegate()); + server.add_delegate(PersonalClient::new(&deps.secret_store).to_delegate()); + server.start(addr) + }; match start_result { Err(signer::ServerError::IoError(err)) => die_with_io_error("Trusted Signer", err), Err(e) => die!("Trusted Signer: {:?}", e), Ok(server) => { deps.panic_handler.forward_from(&server); - Some(server) + server }, } } #[cfg(not(feature = "ethcore-signer"))] -pub fn start(conf: Configuration) -> !{ - if !conf.enabled { - return None; - } - +fn do_start(conf: Configuration) -> ! { die!("Your Parity version has been compiled without Trusted Signer support.") } - - diff --git a/rpc/src/v1/types/transaction_request.rs b/rpc/src/v1/types/transaction_request.rs index 2fc806912..1b51e6b12 100644 --- a/rpc/src/v1/types/transaction_request.rs +++ b/rpc/src/v1/types/transaction_request.rs @@ -47,7 +47,7 @@ mod tests { use serde_json; use util::numbers::{U256}; use util::hash::Address; - use types::bytes::Bytes; + use v1::types::bytes::Bytes; use super::*; #[test] diff --git a/signer/Cargo.toml b/signer/Cargo.toml index 59c7f90b2..51b1b1e8d 100644 --- a/signer/Cargo.toml +++ b/signer/Cargo.toml @@ -16,6 +16,7 @@ syntex = "^0.32.0" serde = "0.7.0" serde_json = "0.7.0" rustc-serialize = "0.3" +jsonrpc-core = "2.0" log = "0.3" env_logger = "0.3" ws = "0.4.7" diff --git a/signer/src/lib.rs b/signer/src/lib.rs index 4317338e0..a39fe68f0 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -50,6 +50,7 @@ extern crate rustc_serialize; extern crate ethcore_util as util; extern crate ethcore_rpc as rpc; +extern crate jsonrpc_core; extern crate ws; mod signing_queue; diff --git a/signer/src/signing_queue.rs b/signer/src/signing_queue.rs index d320d2375..611d467c2 100644 --- a/signer/src/signing_queue.rs +++ b/signer/src/signing_queue.rs @@ -45,7 +45,7 @@ mod test { use std::collections::HashSet; use util::hash::Address; use util::numbers::U256; - use rpc::v1::types::transaction_request::TransactionRequest; + use rpc::v1::types::TransactionRequest; use super::*; #[test] diff --git a/signer/src/ws_server.rs b/signer/src/ws_server/mod.rs similarity index 70% rename from signer/src/ws_server.rs rename to signer/src/ws_server/mod.rs index d2ab02d66..bb10bc5c1 100644 --- a/signer/src/ws_server.rs +++ b/signer/src/ws_server/mod.rs @@ -19,11 +19,14 @@ use ws; use std; use std::thread; +use std::default::Default; use std::ops::Drop; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::net::SocketAddr; use util::panics::{PanicHandler, OnPanicListener, MayPanic}; +use jsonrpc_core::{IoHandler, IoDelegate}; + +mod session; /// Signer startup error #[derive(Debug)] @@ -43,9 +46,40 @@ impl From for ServerError { } } +/// Builder for `WebSockets` server +pub struct ServerBuilder { + handler: Arc, +} + +impl Default for ServerBuilder { + fn default() -> Self { + ServerBuilder::new() + } +} + +impl ServerBuilder { + /// Creates new `ServerBuilder` + pub fn new() -> Self { + ServerBuilder { + handler: Arc::new(IoHandler::new()) + } + } + + /// Adds rpc delegate + pub fn add_delegate(&self, delegate: IoDelegate) where D: Send + Sync + 'static { + self.handler.add_delegate(delegate); + } + + /// Starts a new `WebSocket` server in separate thread. + /// Returns a `Server` handle which closes the server when droped. + pub fn start(self, addr: SocketAddr) -> Result { + Server::start(addr, self.handler) + } +} + /// `WebSockets` server implementation. pub struct Server { - handle: Option>>, + handle: Option>>, broadcaster: ws::Sender, panic_handler: Arc, } @@ -53,7 +87,7 @@ pub struct Server { impl Server { /// Starts a new `WebSocket` server in separate thread. /// Returns a `Server` handle which closes the server when droped. - pub fn start(addr: SocketAddr) -> Result { + pub fn start(addr: SocketAddr, handler: Arc) -> Result { let config = { let mut config = ws::Settings::default(); config.max_connections = 5; @@ -62,10 +96,7 @@ impl Server { }; // Create WebSocket - let session_id = Arc::new(AtomicUsize::new(1)); - let ws = try!(ws::Builder::new().with_settings(config).build(Factory { - session_id: session_id, - })); + let ws = try!(ws::Builder::new().with_settings(config).build(session::Factory::new(handler))); let panic_handler = PanicHandler::new_in_arc(); let ph = panic_handler.clone(); @@ -98,31 +129,3 @@ impl Drop for Server { self.handle.take().unwrap().join().unwrap(); } } - -struct Session { - id: usize, - out: ws::Sender, -} - -impl ws::Handler for Session { - fn on_open(&mut self, _shake: ws::Handshake) -> ws::Result<()> { - try!(self.out.send(format!("Hello client no: {}. We are not implemented yet.", self.id))); - try!(self.out.close(ws::CloseCode::Normal)); - Ok(()) - } -} - -struct Factory { - session_id: Arc, -} - -impl ws::Factory for Factory { - type Handler = Session; - - fn connection_made(&mut self, sender: ws::Sender) -> Self::Handler { - Session { - id: self.session_id.fetch_add(1, Ordering::SeqCst), - out: sender, - } - } -} diff --git a/signer/src/ws_server/session.rs b/signer/src/ws_server/session.rs new file mode 100644 index 000000000..258e05d5b --- /dev/null +++ b/signer/src/ws_server/session.rs @@ -0,0 +1,59 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Session handlers factory. + +use ws; +use std::sync::Arc; +use jsonrpc_core::IoHandler; + +pub struct Session { + out: ws::Sender, + handler: Arc, +} + +impl ws::Handler for Session { + fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { + let req = try!(msg.as_text()); + match self.handler.handle_request(req) { + Some(res) => self.out.send(res), + None => Ok(()), + } + } +} + +pub struct Factory { + handler: Arc, +} + +impl Factory { + pub fn new(handler: Arc) -> Self { + Factory { + handler: handler, + } + } +} + +impl ws::Factory for Factory { + type Handler = Session; + + fn connection_made(&mut self, sender: ws::Sender) -> Self::Handler { + Session { + out: sender, + handler: self.handler.clone(), + } + } +}