Fetching dispatcher (HTTP, HTTPS)

This commit is contained in:
Tomasz Drwięga
2016-08-30 11:40:59 +02:00
parent 25fc919913
commit 61879ef144
8 changed files with 244 additions and 79 deletions

View File

@@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::str;
use std::thread;
use std::sync::mpsc;
use std::cell::RefCell;
use std::{fs, str, thread};
use std::path::PathBuf;
use std::io::{self, Write};
use std::collections::HashMap;
@@ -56,7 +56,7 @@ impl From<TlsClientError> for FetchError {
pub type FetchResult = Result<(), FetchError>;
pub enum ClientMessage {
Fetch(Url, Box<io::Write + Send>, mpsc::Sender<FetchResult>),
Fetch(Url, Box<io::Write + Send>, Box<FnMut(FetchResult) + Send>),
Shutdown,
}
@@ -67,9 +67,7 @@ pub struct Client {
impl Drop for Client {
fn drop(&mut self) {
if let Err(e) = self.channel.send(ClientMessage::Shutdown) {
warn!("Error while closing client: {:?}. Already stopped?", e);
}
self.close_internal();
if let Some(thread) = self.thread.take() {
thread.join().expect("Clean shutdown.");
}
@@ -95,10 +93,33 @@ impl Client {
})
}
pub fn fetch(&self, url: Url, writer: Box<io::Write + Send>) -> Result<mpsc::Receiver<FetchResult>, FetchError> {
let (tx, rx) = mpsc::channel();
try!(self.channel.send(ClientMessage::Fetch(url, writer, tx)));
Ok(rx)
pub fn fetch_to_file<F: FnOnce(FetchResult) + Send + 'static>(&self, url: Url, path: PathBuf, callback: F) -> Result<(), FetchError> {
let file = try!(fs::File::create(&path));
self.fetch(url, Box::new(file), move |result| {
if let Err(_) = result {
// remove temporary file
let _ = fs::remove_file(&path);
}
callback(result);
})
}
pub fn fetch<F: FnOnce(FetchResult) + Send + 'static>(&self, url: Url, writer: Box<io::Write + Send>, callback: F) -> Result<(), FetchError> {
let cell = RefCell::new(Some(callback));
try!(self.channel.send(ClientMessage::Fetch(url, writer, Box::new(move |res| {
cell.borrow_mut().take().expect("Called only once.")(res);
}))));
Ok(())
}
pub fn close(mut self) {
self.close_internal()
}
fn close_internal(&mut self) {
if let Err(e) = self.channel.send(ClientMessage::Shutdown) {
warn!("Error while closing client: {:?}. Already stopped?", e);
}
}
}
@@ -127,11 +148,11 @@ impl mio::Handler for ClientLoop {
fn notify(&mut self, event_loop: &mut mio::EventLoop<Self>, msg: Self::Message) {
match msg {
ClientMessage::Shutdown => event_loop.shutdown(),
ClientMessage::Fetch(url, writer, sender) => {
ClientMessage::Fetch(url, writer, callback) => {
let token = self.next_token;
self.next_token += 1;
if let Ok(mut tlsclient) = TlsClient::new(mio::Token(token), &url, writer, sender) {
if let Ok(mut tlsclient) = TlsClient::new(mio::Token(token), &url, writer, callback) {
let httpreq = format!(
"GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\nAccept-Encoding: identity\r\n\r\n",
url.path(),
@@ -183,3 +204,4 @@ fn should_successfuly_fetch_a_page() {
assert!(result.is_ok());
assert!(wrote.load(Ordering::Relaxed) > 0);
}

View File

@@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::str;
use std::sync::{mpsc, Arc};
use std::sync::Arc;
use std::io::{self, Read, Cursor, BufReader};
use mio;
@@ -43,7 +43,7 @@ pub struct TlsClient {
writer: Box<io::Write>,
error: Option<TlsClientError>,
closing: bool,
listener: mpsc::Sender<FetchResult>,
callback: Box<FnMut(FetchResult) + Send>,
}
impl io::Write for TlsClient {
@@ -81,7 +81,7 @@ impl TlsClient {
token: mio::Token,
url: &Url,
writer: Box<io::Write + Send>,
sender: mpsc::Sender<FetchResult>,
mut callback: Box<FnMut(FetchResult) + Send>,
) -> Result<Self, FetchError> {
let res = TlsClient::make_config().and_then(|cfg| {
TcpStream::connect(url.address()).map(|sock| {
@@ -97,10 +97,10 @@ impl TlsClient {
closing: false,
error: None,
tls_session: rustls::ClientSession::new(&cfg, url.hostname()),
listener: sender,
callback: callback,
}),
Err(e) => {
sender.send(Err(e)).unwrap_or_else(|e| warn!("Client initialization error: {:?}", e));
callback(Err(e));
Err(FetchError::Client(TlsClientError::Initialization))
}
}
@@ -121,14 +121,12 @@ impl TlsClient {
if self.is_closed() {
trace!("Connection closed");
let res = self.listener.send(match self.error.take() {
let callback = &mut self.callback;
callback(match self.error.take() {
Some(err) => Err(err.into()),
None => Ok(()),
});
if let Err(e) = res {
warn!("Finished fetching but listener is not available: {:?}", e);
}
return true;
}
@@ -207,6 +205,7 @@ impl TlsClient {
fn do_write(&mut self) {
self.tls_session.write_tls(&mut self.socket).unwrap_or_else(|e| {
warn!("TLS write error: {:?}", e);
0
});
}