New jsonrpc-core with futures and metadata support (#3859)

* Bumping serde & serde_json

* Super-initial usage of new jsonrpc

* Single event loop for jsonrpc

* Metadata

* Supporting metadata extraction for eth_accounts

* Fixing Cargo.lock

* Removing uneccessary clones

* Fixing unused import

* Unused import

* Fixing test
This commit is contained in:
Tomasz Drwięga
2017-01-11 20:02:27 +01:00
committed by Gav Wood
parent c4d96a64a2
commit 41da1a0a79
45 changed files with 706 additions and 642 deletions

View File

@@ -15,30 +15,6 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Ethcore Webapplications for Parity
//! ```
//! extern crate jsonrpc_core;
//! extern crate ethcore_dapps;
//!
//! use std::sync::Arc;
//! use jsonrpc_core::IoHandler;
//! use ethcore_dapps::*;
//!
//! struct SayHello;
//! impl MethodCommand for SayHello {
//! fn execute(&self, _params: Params) -> Result<Value, Error> {
//! Ok(Value::String("hello".to_string()))
//! }
//! }
//!
//! fn main() {
//! let io = IoHandler::new();
//! io.add_method("say_hello", SayHello);
//! let _server = Server::start_unsecure_http(
//! &"127.0.0.1:3030".parse().unwrap(),
//! Arc::new(io)
//! );
//! }
//! ```
//!
#![warn(missing_docs)]
#![cfg_attr(feature="nightly", plugin(clippy))]
@@ -93,11 +69,11 @@ use std::sync::{Arc, Mutex};
use std::net::SocketAddr;
use std::collections::HashMap;
use hash_fetch::urlhint::ContractClient;
use ethcore_rpc::Metadata;
use fetch::{Fetch, Client as FetchClient};
use jsonrpc_core::{IoHandler, IoDelegate};
use hash_fetch::urlhint::ContractClient;
use jsonrpc_core::reactor::RpcHandler;
use router::auth::{Authorization, NoAuth, HttpBasicAuth};
use ethcore_rpc::Extendable;
use parity_reactor::Remote;
use self::apps::{HOME_PAGE, DAPPS_DOMAIN};
@@ -126,32 +102,26 @@ impl<F> WebProxyTokens for F where F: Fn(String) -> bool + Send + Sync {
pub struct ServerBuilder<T: Fetch = FetchClient> {
dapps_path: PathBuf,
extra_dapps: Vec<PathBuf>,
handler: Arc<IoHandler>,
registrar: Arc<ContractClient>,
sync_status: Arc<SyncStatus>,
web_proxy_tokens: Arc<WebProxyTokens>,
signer_address: Option<(String, u16)>,
allowed_hosts: Option<Vec<String>>,
remote: Remote,
fetch: Option<T>,
}
impl<T: Fetch> Extendable for ServerBuilder<T> {
fn add_delegate<D: Send + Sync + 'static>(&self, delegate: IoDelegate<D>) {
self.handler.add_delegate(delegate);
}
}
impl ServerBuilder {
/// Construct new dapps server
pub fn new<P: AsRef<Path>>(dapps_path: P, registrar: Arc<ContractClient>, remote: Remote) -> Self {
ServerBuilder {
dapps_path: dapps_path.as_ref().to_owned(),
extra_dapps: vec![],
handler: Arc::new(IoHandler::new()),
registrar: registrar,
sync_status: Arc::new(|| false),
web_proxy_tokens: Arc::new(|_| false),
signer_address: None,
allowed_hosts: Some(vec![]),
remote: remote,
fetch: None,
}
@@ -164,11 +134,11 @@ impl<T: Fetch> ServerBuilder<T> {
ServerBuilder {
dapps_path: self.dapps_path,
extra_dapps: vec![],
handler: self.handler,
registrar: self.registrar,
sync_status: self.sync_status,
web_proxy_tokens: self.web_proxy_tokens,
signer_address: self.signer_address,
allowed_hosts: self.allowed_hosts,
remote: self.remote,
fetch: Some(fetch),
}
@@ -192,6 +162,14 @@ impl<T: Fetch> ServerBuilder<T> {
self
}
/// Change allowed hosts.
/// `None` - All hosts are allowed
/// `Some(whitelist)` - Allow only whitelisted hosts (+ listen address)
pub fn allowed_hosts(mut self, allowed_hosts: Option<Vec<String>>) -> Self {
self.allowed_hosts = allowed_hosts;
self
}
/// Change extra dapps paths (apart from `dapps_path`)
pub fn extra_dapps<P: AsRef<Path>>(mut self, extra_dapps: &[P]) -> Self {
self.extra_dapps = extra_dapps.iter().map(|p| p.as_ref().to_owned()).collect();
@@ -200,39 +178,41 @@ impl<T: Fetch> ServerBuilder<T> {
/// Asynchronously start server with no authentication,
/// returns result with `Server` handle on success or an error.
pub fn start_unsecured_http(self, addr: &SocketAddr, hosts: Option<Vec<String>>) -> Result<Server, ServerError> {
pub fn start_unsecured_http(self, addr: &SocketAddr, handler: RpcHandler<Metadata>) -> Result<Server, ServerError> {
let fetch = self.fetch_client()?;
Server::start_http(
addr,
hosts,
self.allowed_hosts,
NoAuth,
self.handler.clone(),
self.dapps_path.clone(),
self.extra_dapps.clone(),
self.signer_address.clone(),
self.registrar.clone(),
self.sync_status.clone(),
self.web_proxy_tokens.clone(),
self.remote.clone(),
self.fetch_client()?,
handler,
self.dapps_path,
self.extra_dapps,
self.signer_address,
self.registrar,
self.sync_status,
self.web_proxy_tokens,
self.remote,
fetch,
)
}
/// Asynchronously start server with `HTTP Basic Authentication`,
/// return result with `Server` handle on success or an error.
pub fn start_basic_auth_http(self, addr: &SocketAddr, hosts: Option<Vec<String>>, username: &str, password: &str) -> Result<Server, ServerError> {
pub fn start_basic_auth_http(self, addr: &SocketAddr, username: &str, password: &str, handler: RpcHandler<Metadata>) -> Result<Server, ServerError> {
let fetch = self.fetch_client()?;
Server::start_http(
addr,
hosts,
self.allowed_hosts,
HttpBasicAuth::single_user(username, password),
self.handler.clone(),
self.dapps_path.clone(),
self.extra_dapps.clone(),
self.signer_address.clone(),
self.registrar.clone(),
self.sync_status.clone(),
self.web_proxy_tokens.clone(),
self.remote.clone(),
self.fetch_client()?,
handler,
self.dapps_path,
self.extra_dapps,
self.signer_address,
self.registrar,
self.sync_status,
self.web_proxy_tokens,
self.remote,
fetch,
)
}
@@ -281,7 +261,7 @@ impl Server {
addr: &SocketAddr,
hosts: Option<Vec<String>>,
authorization: A,
handler: Arc<IoHandler>,
handler: RpcHandler<Metadata>,
dapps_path: PathBuf,
extra_dapps: Vec<PathBuf>,
signer_address: Option<(String, u16)>,

View File

@@ -17,13 +17,15 @@
use std::sync::{Arc, Mutex};
use hyper;
use jsonrpc_core::{IoHandler, ResponseHandler, Request, Response};
use jsonrpc_http_server::{ServerHandler, PanicHandler, AccessControlAllowOrigin, RpcHandler};
use ethcore_rpc::{Metadata, Origin};
use jsonrpc_core::reactor::RpcHandler;
use jsonrpc_http_server::{Rpc, ServerHandler, PanicHandler, AccessControlAllowOrigin, HttpMetaExtractor};
use endpoint::{Endpoint, EndpointPath, Handler};
pub fn rpc(handler: Arc<IoHandler>, panic_handler: Arc<Mutex<Option<Box<Fn() -> () + Send>>>>) -> Box<Endpoint> {
pub fn rpc(handler: RpcHandler<Metadata>, panic_handler: Arc<Mutex<Option<Box<Fn() -> () + Send>>>>) -> Box<Endpoint> {
Box::new(RpcEndpoint {
handler: Arc::new(RpcMiddleware::new(handler)),
handler: handler,
meta_extractor: Arc::new(MetadataExtractor),
panic_handler: panic_handler,
cors_domain: None,
// NOTE [ToDr] We don't need to do any hosts validation here. It's already done in router.
@@ -32,7 +34,8 @@ pub fn rpc(handler: Arc<IoHandler>, panic_handler: Arc<Mutex<Option<Box<Fn() ->
}
struct RpcEndpoint {
handler: Arc<RpcMiddleware>,
handler: RpcHandler<Metadata>,
meta_extractor: Arc<HttpMetaExtractor<Metadata>>,
panic_handler: Arc<Mutex<Option<Box<Fn() -> () + Send>>>>,
cors_domain: Option<Vec<AccessControlAllowOrigin>>,
allowed_hosts: Option<Vec<String>>,
@@ -42,7 +45,7 @@ impl Endpoint for RpcEndpoint {
fn to_async_handler(&self, _path: EndpointPath, control: hyper::Control) -> Box<Handler> {
let panic_handler = PanicHandler { handler: self.panic_handler.clone() };
Box::new(ServerHandler::new(
self.handler.clone(),
Rpc::new(self.handler.clone(), self.meta_extractor.clone()),
self.cors_domain.clone(),
self.allowed_hosts.clone(),
panic_handler,
@@ -51,85 +54,19 @@ impl Endpoint for RpcEndpoint {
}
}
struct RpcMiddleware {
handler: Arc<IoHandler>,
methods: Vec<String>,
}
impl RpcMiddleware {
fn new(handler: Arc<IoHandler>) -> Self {
RpcMiddleware {
handler: handler,
methods: vec!["eth_accounts".into(), "parity_accountsInfo".into()],
}
}
/// Appends additional parameter for specific calls.
fn augment_request(&self, request: &mut Request, meta: Option<Meta>) {
use jsonrpc_core::{Call, Params, to_value};
fn augment_call(call: &mut Call, meta: Option<&Meta>, methods: &Vec<String>) {
match (call, meta) {
(&mut Call::MethodCall(ref mut method_call), Some(meta)) if methods.contains(&method_call.method) => {
let session = to_value(&meta.app_id);
let params = match method_call.params {
Some(Params::Array(ref vec)) if vec.len() == 0 => Some(Params::Array(vec![session])),
// invalid params otherwise
_ => None,
};
method_call.params = params;
},
_ => {}
}
}
match *request {
Request::Single(ref mut call) => augment_call(call, meta.as_ref(), &self.methods),
Request::Batch(ref mut vec) => {
for mut call in vec {
augment_call(call, meta.as_ref(), &self.methods)
}
},
}
}
}
#[derive(Debug)]
struct Meta {
app_id: String,
}
impl RpcHandler for RpcMiddleware {
type Metadata = Meta;
fn read_metadata(&self, request: &hyper::server::Request<hyper::net::HttpStream>) -> Option<Self::Metadata> {
request.headers().get::<hyper::header::Referer>()
struct MetadataExtractor;
impl HttpMetaExtractor<Metadata> for MetadataExtractor {
fn read_metadata(&self, request: &hyper::server::Request<hyper::net::HttpStream>) -> Metadata {
let dapp_id = request.headers().get::<hyper::header::Referer>()
.and_then(|referer| hyper::Url::parse(referer).ok())
.and_then(|url| {
url.path_segments()
.and_then(|mut split| split.next())
.map(|app_id| Meta {
app_id: app_id.to_owned(),
})
})
}
fn handle_request<H>(&self, request_str: &str, response_handler: H, meta: Option<Self::Metadata>) where
H: ResponseHandler<Option<String>, Option<String>> + 'static
{
let handler = IoHandler::convert_handler(response_handler);
let request = IoHandler::read_request(request_str);
trace!(target: "rpc", "Request metadata: {:?}", meta);
match request {
Ok(mut request) => {
self.augment_request(&mut request, meta);
self.handler.request_handler().handle_request(request, handler, None)
},
Err(error) => handler.send(Some(Response::from(error))),
.map(|app_id| app_id.to_owned())
});
Metadata {
dapp_id: dapp_id,
origin: Origin::Dapps,
}
}
}

View File

@@ -16,8 +16,11 @@
use std::env;
use std::str;
use std::ops::Deref;
use std::sync::Arc;
use env_logger::LogBuilder;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_core::reactor::RpcEventLoop;
use ServerBuilder;
use Server;
@@ -42,7 +45,20 @@ fn init_logger() {
}
}
pub fn init_server<F, B>(hosts: Option<Vec<String>>, process: F, remote: Remote) -> (Server, Arc<FakeRegistrar>) where
pub struct ServerLoop {
pub server: Server,
pub event_loop: RpcEventLoop,
}
impl Deref for ServerLoop {
type Target = Server;
fn deref(&self) -> &Self::Target {
&self.server
}
}
pub fn init_server<F, B>(process: F, remote: Remote) -> (ServerLoop, Arc<FakeRegistrar>) where
F: FnOnce(ServerBuilder) -> ServerBuilder<B>,
B: Fetch,
{
@@ -50,60 +66,76 @@ pub fn init_server<F, B>(hosts: Option<Vec<String>>, process: F, remote: Remote)
let registrar = Arc::new(FakeRegistrar::new());
let mut dapps_path = env::temp_dir();
dapps_path.push("non-existent-dir-to-prevent-fs-files-from-loading");
// TODO [ToDr] When https://github.com/ethcore/jsonrpc/issues/26 is resolved
// this additional EventLoop wouldn't be needed, we should be able to re-use remote.
let event_loop = RpcEventLoop::spawn();
let handler = event_loop.handler(Arc::new(MetaIoHandler::default()));
let server = process(ServerBuilder::new(
&dapps_path, registrar.clone(), remote,
))
.signer_address(Some(("127.0.0.1".into(), SIGNER_PORT)))
.start_unsecured_http(&"127.0.0.1:0".parse().unwrap(), hosts).unwrap();
.start_unsecured_http(&"127.0.0.1:0".parse().unwrap(), handler).unwrap();
(
server,
ServerLoop { server: server, event_loop: event_loop },
registrar,
)
}
pub fn serve_with_auth(user: &str, pass: &str) -> Server {
pub fn serve_with_auth(user: &str, pass: &str) -> ServerLoop {
init_logger();
let registrar = Arc::new(FakeRegistrar::new());
let mut dapps_path = env::temp_dir();
dapps_path.push("non-existent-dir-to-prevent-fs-files-from-loading");
ServerBuilder::new(&dapps_path, registrar.clone(), Remote::new_sync())
let event_loop = RpcEventLoop::spawn();
let handler = event_loop.handler(Arc::new(MetaIoHandler::default()));
let server = ServerBuilder::new(&dapps_path, registrar, Remote::new(event_loop.remote()))
.signer_address(Some(("127.0.0.1".into(), SIGNER_PORT)))
.start_basic_auth_http(&"127.0.0.1:0".parse().unwrap(), None, user, pass).unwrap()
.allowed_hosts(None)
.start_basic_auth_http(&"127.0.0.1:0".parse().unwrap(), user, pass, handler).unwrap();
ServerLoop {
server: server,
event_loop: event_loop,
}
}
pub fn serve_hosts(hosts: Option<Vec<String>>) -> Server {
init_server(hosts, |builder| builder, Remote::new_sync()).0
pub fn serve_hosts(hosts: Option<Vec<String>>) -> ServerLoop {
init_server(|builder| builder.allowed_hosts(hosts), Remote::new_sync()).0
}
pub fn serve_with_registrar() -> (Server, Arc<FakeRegistrar>) {
init_server(None, |builder| builder, Remote::new_sync())
pub fn serve_with_registrar() -> (ServerLoop, Arc<FakeRegistrar>) {
init_server(|builder| builder.allowed_hosts(None), Remote::new_sync())
}
pub fn serve_with_registrar_and_sync() -> (Server, Arc<FakeRegistrar>) {
init_server(None, |builder| {
builder.sync_status(Arc::new(|| true))
pub fn serve_with_registrar_and_sync() -> (ServerLoop, Arc<FakeRegistrar>) {
init_server(|builder| {
builder
.sync_status(Arc::new(|| true))
.allowed_hosts(None)
}, Remote::new_sync())
}
pub fn serve_with_registrar_and_fetch() -> (Server, FakeFetch, Arc<FakeRegistrar>) {
pub fn serve_with_registrar_and_fetch() -> (ServerLoop, FakeFetch, Arc<FakeRegistrar>) {
serve_with_registrar_and_fetch_and_threads(false)
}
pub fn serve_with_registrar_and_fetch_and_threads(multi_threaded: bool) -> (Server, FakeFetch, Arc<FakeRegistrar>) {
pub fn serve_with_registrar_and_fetch_and_threads(multi_threaded: bool) -> (ServerLoop, FakeFetch, Arc<FakeRegistrar>) {
let fetch = FakeFetch::default();
let f = fetch.clone();
let (server, reg) = init_server(None, move |builder| {
builder.fetch(f.clone())
let (server, reg) = init_server(move |builder| {
builder.allowed_hosts(None).fetch(f.clone())
}, if multi_threaded { Remote::new_thread_per_future() } else { Remote::new_sync() });
(server, fetch, reg)
}
pub fn serve_with_fetch(web_token: &'static str) -> (Server, FakeFetch) {
pub fn serve_with_fetch(web_token: &'static str) -> (ServerLoop, FakeFetch) {
let fetch = FakeFetch::default();
let f = fetch.clone();
let (server, _) = init_server(None, move |builder| {
let (server, _) = init_server(move |builder| {
builder
.allowed_hosts(None)
.fetch(f.clone())
.web_proxy_tokens(Arc::new(move |token| &token == web_token))
}, Remote::new_sync());
@@ -111,11 +143,11 @@ pub fn serve_with_fetch(web_token: &'static str) -> (Server, FakeFetch) {
(server, fetch)
}
pub fn serve() -> Server {
init_server(None, |builder| builder, Remote::new_sync()).0
pub fn serve() -> ServerLoop {
init_server(|builder| builder.allowed_hosts(None), Remote::new_sync()).0
}
pub fn request(server: Server, request: &str) -> http_client::Response {
pub fn request(server: ServerLoop, request: &str) -> http_client::Response {
http_client::request(server.addr(), request)
}