Asynchronous RPC support (#2017)

* Async RPC

* Limiting number of transactions in queue

* Fixing tests

* Bumping serde and jsonrpc-core

* serde updated to 0.8

* fixed failing tests

* Bumping ipc server

* Fixing API for endpoints

* Experimenting with tests without --release mode
This commit is contained in:
Tomasz Drwięga
2016-09-01 12:00:00 +02:00
committed by Arkadiy Paronyan
parent ca03cfa58a
commit b4f3c4bd7a
43 changed files with 657 additions and 515 deletions

View File

@@ -26,7 +26,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool};
use rustc_serialize::hex::FromHex;
use hyper::Control;
use hyper;
use hyper::status::StatusCode;
use random_filename;
@@ -85,7 +85,7 @@ impl<R: URLHint> AppFetcher<R> {
}
}
pub fn to_handler(&self, path: EndpointPath, control: Control) -> Box<Handler> {
pub fn to_async_handler(&self, path: EndpointPath, control: hyper::Control) -> Box<Handler> {
let mut dapps = self.dapps.lock();
let app_id = path.app_id.clone();
@@ -94,7 +94,7 @@ impl<R: URLHint> AppFetcher<R> {
match status {
// Just server dapp
Some(&mut ContentStatus::Ready(ref endpoint)) => {
(None, endpoint.to_handler(path))
(None, endpoint.to_async_handler(path, control))
},
// App is already being fetched
Some(&mut ContentStatus::Fetching(_)) => {

View File

@@ -16,7 +16,7 @@
//! URL Endpoint traits
use hyper::{server, net};
use hyper::{self, server, net};
use std::collections::BTreeMap;
#[derive(Debug, PartialEq, Default, Clone)]
@@ -43,4 +43,8 @@ pub trait Endpoint : Send + Sync {
fn info(&self) -> Option<&EndpointInfo> { None }
fn to_handler(&self, path: EndpointPath) -> Box<Handler>;
fn to_async_handler(&self, path: EndpointPath, _control: hyper::Control) -> Box<Handler> {
self.to_handler(path)
}
}

View File

@@ -73,20 +73,20 @@ impl<A: Authorization + 'static> server::Handler<HttpStream> for Router<A> {
// Choose proper handler depending on path / domain
let url = extract_url(&req);
let endpoint = extract_endpoint(&url);
let control = self.control.take().expect("on_request is called only once; control is always defined at start; qed");
self.handler = match endpoint {
// First check special endpoints
(ref path, ref endpoint) if self.special.contains_key(endpoint) => {
self.special.get(endpoint).unwrap().to_handler(path.clone().unwrap_or_default())
self.special.get(endpoint).unwrap().to_async_handler(path.clone().unwrap_or_default(), control)
},
// Then delegate to dapp
(Some(ref path), _) if self.endpoints.contains_key(&path.app_id) => {
self.endpoints.get(&path.app_id).unwrap().to_handler(path.clone())
self.endpoints.get(&path.app_id).unwrap().to_async_handler(path.clone(), control)
},
// Try to resolve and fetch the dapp
(Some(ref path), _) if self.fetch.contains(&path.app_id) => {
let control = self.control.take().expect("on_request is called only once, thus control is always defined.");
self.fetch.to_handler(path.clone(), control)
self.fetch.to_async_handler(path.clone(), control)
},
// Redirection to main page (maybe 404 instead?)
(Some(ref path), _) if *req.method() == hyper::method::Method::Get => {
@@ -100,7 +100,7 @@ impl<A: Authorization + 'static> server::Handler<HttpStream> for Router<A> {
},
// RPC by default
_ => {
self.special.get(&SpecialEndpoint::Rpc).unwrap().to_handler(EndpointPath::default())
self.special.get(&SpecialEndpoint::Rpc).unwrap().to_async_handler(EndpointPath::default(), control)
}
};
@@ -135,7 +135,7 @@ impl<A: Authorization> Router<A> {
allowed_hosts: Option<Vec<String>>,
) -> Self {
let handler = special.get(&SpecialEndpoint::Rpc).unwrap().to_handler(EndpointPath::default());
let handler = special.get(&SpecialEndpoint::Api).unwrap().to_handler(EndpointPath::default());
Router {
control: Some(control),
main_page: main_page,

View File

@@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::{Arc, Mutex};
use hyper;
use jsonrpc_core::IoHandler;
use jsonrpc_http_server::{ServerHandler, PanicHandler, AccessControlAllowOrigin};
use endpoint::{Endpoint, EndpointPath, Handler};
@@ -38,7 +39,17 @@ struct RpcEndpoint {
impl Endpoint for RpcEndpoint {
fn to_handler(&self, _path: EndpointPath) -> Box<Handler> {
panic!("RPC Endpoint is asynchronous and requires Control object.");
}
fn to_async_handler(&self, _path: EndpointPath, control: hyper::Control) -> Box<Handler> {
let panic_handler = PanicHandler { handler: self.panic_handler.clone() };
Box::new(ServerHandler::new(self.handler.clone(), self.cors_domain.clone(), self.allowed_hosts.clone(), panic_handler))
Box::new(ServerHandler::new(
self.handler.clone(),
self.cors_domain.clone(),
self.allowed_hosts.clone(),
panic_handler,
control,
))
}
}