// Copyright 2015-2019 Parity Technologies (UK) Ltd. // This file is part of Parity Ethereum. // Parity Ethereum is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Ethereum is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Ethereum. If not, see . extern crate cid; extern crate multihash; extern crate unicase; extern crate ethcore; extern crate ethereum_types; extern crate jsonrpc_core; extern crate jsonrpc_http_server as http; extern crate parity_bytes as bytes; extern crate rlp; pub mod error; mod route; use std::{ net::{IpAddr, SocketAddr}, sync::{mpsc, Arc}, thread, }; use ethcore::client::BlockChainClient; use http::hyper::{ self, header::{self, HeaderValue}, server, Body, Method, StatusCode, }; use jsonrpc_core::futures::{ self, future::{self, FutureResult}, Future, }; use error::ServerError; use route::Out; pub use http::{AccessControlAllowOrigin, DomainsValidation, Host}; /// Request/response handler pub struct IpfsHandler { /// Allowed CORS domains cors_domains: Option>, /// Hostnames allowed in the `Host` request header allowed_hosts: Option>, /// Reference to the Blockchain Client client: Arc, } impl IpfsHandler { pub fn client(&self) -> &dyn BlockChainClient { &*self.client } pub fn new( cors: DomainsValidation, hosts: DomainsValidation, client: Arc, ) -> Self { IpfsHandler { cors_domains: cors.into(), allowed_hosts: hosts.into(), client, } } pub fn on_request(&self, req: hyper::Request) -> (Option, Out) { match *req.method() { Method::GET | Method::POST => {} _ => return (None, Out::Bad("Invalid Request")), } if !http::is_host_allowed(&req, &self.allowed_hosts) { return (None, Out::Bad("Disallowed Host header")); } let cors_header = http::cors_allow_origin(&req, &self.cors_domains); if cors_header == http::AllowCors::Invalid { return (None, Out::Bad("Disallowed Origin header")); } let path = req.uri().path(); let query = req.uri().query(); return (cors_header.into(), self.route(path, query)); } } impl hyper::service::Service for IpfsHandler { type ReqBody = Body; type ResBody = Body; type Error = hyper::Error; type Future = FutureResult, Self::Error>; fn call(&mut self, request: hyper::Request) -> Self::Future { let (cors_header, out) = self.on_request(request); let mut res = match out { Out::OctetStream(bytes) => hyper::Response::builder() .status(StatusCode::OK) .header( "content-type", HeaderValue::from_static("application/octet-stream"), ) .body(bytes.into()), Out::NotFound(reason) => hyper::Response::builder() .status(StatusCode::NOT_FOUND) .header( "content-type", HeaderValue::from_static("text/plain; charset=utf-8"), ) .body(reason.into()), Out::Bad(reason) => hyper::Response::builder() .status(StatusCode::BAD_REQUEST) .header( "content-type", HeaderValue::from_static("text/plain; charset=utf-8"), ) .body(reason.into()), } .expect("Response builder: Parsing 'content-type' header name will not fail; qed"); if let Some(cors_header) = cors_header { res.headers_mut() .append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_header); res.headers_mut() .append(header::VARY, HeaderValue::from_static("origin")); } future::ok(res) } } /// Add current interface (default: "") to list of allowed hosts fn include_current_interface(mut hosts: Vec, interface: String, port: u16) -> Vec { hosts.push( match port { 80 => interface, _ => format!("{}:{}", interface, port), } .into(), ); hosts } #[derive(Debug)] pub struct Listening { close: Option>, thread: Option>, } impl Drop for Listening { fn drop(&mut self) { self.close.take().unwrap().send(()).unwrap(); let _ = self.thread.take().unwrap().join(); } } pub fn start_server( port: u16, interface: String, cors: DomainsValidation, hosts: DomainsValidation, client: Arc, ) -> Result { let ip: IpAddr = interface .parse() .map_err(|_| ServerError::InvalidInterface)?; let addr = SocketAddr::new(ip, port); let hosts: Option> = hosts.into(); let hosts: DomainsValidation<_> = hosts .map(move |hosts| include_current_interface(hosts, interface, port)) .into(); let (close, shutdown_signal) = futures::sync::oneshot::channel::<()>(); let (tx, rx) = mpsc::sync_channel::>(1); let thread = thread::spawn(move || { let send = |res| tx.send(res).expect("rx end is never dropped; qed"); let server_bldr = match server::Server::try_bind(&addr) { Ok(s) => s, Err(err) => { send(Err(ServerError::from(err))); return; } }; let new_service = move || { Ok::<_, ServerError>(IpfsHandler::new( cors.clone(), hosts.clone(), client.clone(), )) }; let server = server_bldr .serve(new_service) .map_err(|_| ()) .select(shutdown_signal.map_err(|_| ())) .then(|_| Ok(())); hyper::rt::run(server); send(Ok(())); }); // Wait for server to start successfuly. rx.recv().expect("tx end is never dropped; qed")?; Ok(Listening { close: close.into(), thread: thread.into(), }) }